找到问题的解决办法了么?
beats

beats

filebeat中work配置

回复

BeatsFatGirl_Spring 发起了问题 • 1 人关注 • 0 个回复 • 1819 次浏览 • 2020-05-13 21:01 • 来自相关话题

想购买 elastic 的产品,有没有中国区的相关销售?

默认分类pengallengao 回复了问题 • 5 人关注 • 3 个回复 • 8625 次浏览 • 2019-07-22 10:38 • 来自相关话题

【线下活动-分享主题征集-武汉】 2019年3月 Elastic&尚德机构技术沙龙

活动medcl 回复了问题 • 3 人关注 • 1 个回复 • 5136 次浏览 • 2019-02-22 15:42 • 来自相关话题

目前beats工具只有官网列出来的8种吗?

Beatsmedcl 回复了问题 • 3 人关注 • 1 个回复 • 2299 次浏览 • 2018-12-17 12:05 • 来自相关话题

应使用哪个beats同步mysql数据到es

Beatschienx 回复了问题 • 4 人关注 • 2 个回复 • 4789 次浏览 • 2018-09-18 08:57 • 来自相关话题

kibana时区问题

Kibanaa807257775 回复了问题 • 2 人关注 • 2 个回复 • 8739 次浏览 • 2018-07-10 10:01 • 来自相关话题

filebeat新增字段后,ES可以查出字段数据,KIBANA显示异常,求大神!

Kibanaa807257775 回复了问题 • 3 人关注 • 4 个回复 • 4270 次浏览 • 2018-07-09 17:00 • 来自相关话题

filebeat 跨主机间发送日志,logstash 和elasticsearch 均接收不到,且不报错

Beatsmedcl 回复了问题 • 4 人关注 • 4 个回复 • 4363 次浏览 • 2018-07-05 13:47 • 来自相关话题

filebeat发送日志到kafka可以只发送原始日志吗,也就是message中的内容

Beatszcheneng 回复了问题 • 4 人关注 • 3 个回复 • 6371 次浏览 • 2018-07-03 14:13 • 来自相关话题

请问那种形式的beat可以完成数据的拉取(pull操作)

Beatsmedcl 回复了问题 • 2 人关注 • 1 个回复 • 2119 次浏览 • 2018-06-06 10:16 • 来自相关话题

对于写入非常频繁且数据量很大的iis日志,如何优化filebeat采集速度?

Beatsmedcl 回复了问题 • 3 人关注 • 1 个回复 • 3435 次浏览 • 2018-06-05 13:53 • 来自相关话题

AIX系统下,如何采集实时文件日志

回复

Beatszhan 发起了问题 • 1 人关注 • 0 个回复 • 2726 次浏览 • 2018-03-21 15:04 • 来自相关话题

winlogbeat 采集的windows日志host字段里为什么是主机名而不是IP?

BeatsRonnie 回复了问题 • 3 人关注 • 2 个回复 • 5313 次浏览 • 2018-01-25 16:14 • 来自相关话题

Filebeat直接向elasticsearch发送时使用自定义的模板json格式么

Beatshhee 回复了问题 • 2 人关注 • 1 个回复 • 3360 次浏览 • 2017-12-05 13:46 • 来自相关话题

filebeat nginx module配置问题

Beatsmedcl 回复了问题 • 2 人关注 • 1 个回复 • 3438 次浏览 • 2017-10-18 14:34 • 来自相关话题

【线下活动-分享主题征集-武汉】 2019年3月 Elastic&尚德机构技术沙龙

活动medcl 回复了问题 • 3 人关注 • 1 个回复 • 5136 次浏览 • 2019-02-22 15:42 • 来自相关话题

filebeat中work配置

回复

BeatsFatGirl_Spring 发起了问题 • 1 人关注 • 0 个回复 • 1819 次浏览 • 2020-05-13 21:01 • 来自相关话题

想购买 elastic 的产品,有没有中国区的相关销售?

回复

默认分类pengallengao 回复了问题 • 5 人关注 • 3 个回复 • 8625 次浏览 • 2019-07-22 10:38 • 来自相关话题

【线下活动-分享主题征集-武汉】 2019年3月 Elastic&尚德机构技术沙龙

回复

活动medcl 回复了问题 • 3 人关注 • 1 个回复 • 5136 次浏览 • 2019-02-22 15:42 • 来自相关话题

目前beats工具只有官网列出来的8种吗?

回复

Beatsmedcl 回复了问题 • 3 人关注 • 1 个回复 • 2299 次浏览 • 2018-12-17 12:05 • 来自相关话题

应使用哪个beats同步mysql数据到es

回复

Beatschienx 回复了问题 • 4 人关注 • 2 个回复 • 4789 次浏览 • 2018-09-18 08:57 • 来自相关话题

kibana时区问题

回复

Kibanaa807257775 回复了问题 • 2 人关注 • 2 个回复 • 8739 次浏览 • 2018-07-10 10:01 • 来自相关话题

filebeat新增字段后,ES可以查出字段数据,KIBANA显示异常,求大神!

回复

Kibanaa807257775 回复了问题 • 3 人关注 • 4 个回复 • 4270 次浏览 • 2018-07-09 17:00 • 来自相关话题

filebeat 跨主机间发送日志,logstash 和elasticsearch 均接收不到,且不报错

回复

Beatsmedcl 回复了问题 • 4 人关注 • 4 个回复 • 4363 次浏览 • 2018-07-05 13:47 • 来自相关话题

filebeat发送日志到kafka可以只发送原始日志吗,也就是message中的内容

回复

Beatszcheneng 回复了问题 • 4 人关注 • 3 个回复 • 6371 次浏览 • 2018-07-03 14:13 • 来自相关话题

请问那种形式的beat可以完成数据的拉取(pull操作)

回复

Beatsmedcl 回复了问题 • 2 人关注 • 1 个回复 • 2119 次浏览 • 2018-06-06 10:16 • 来自相关话题

对于写入非常频繁且数据量很大的iis日志,如何优化filebeat采集速度?

回复

Beatsmedcl 回复了问题 • 3 人关注 • 1 个回复 • 3435 次浏览 • 2018-06-05 13:53 • 来自相关话题

AIX系统下,如何采集实时文件日志

回复

Beatszhan 发起了问题 • 1 人关注 • 0 个回复 • 2726 次浏览 • 2018-03-21 15:04 • 来自相关话题

winlogbeat 采集的windows日志host字段里为什么是主机名而不是IP?

回复

BeatsRonnie 回复了问题 • 3 人关注 • 2 个回复 • 5313 次浏览 • 2018-01-25 16:14 • 来自相关话题

Filebeat直接向elasticsearch发送时使用自定义的模板json格式么

回复

Beatshhee 回复了问题 • 2 人关注 • 1 个回复 • 3360 次浏览 • 2017-12-05 13:46 • 来自相关话题

filebeat nginx module配置问题

回复

Beatsmedcl 回复了问题 • 2 人关注 • 1 个回复 • 3438 次浏览 • 2017-10-18 14:34 • 来自相关话题

Day5: 《PacketBeat奇妙的OOM小记》

Adventkira8565 发表了文章 • 0 个评论 • 5083 次浏览 • 2016-12-05 23:00 • 来自相关话题

Beats这个项目的确很好用,几行命令下来,一个成型的Agent就出来了。使用者只需要关注采集什么数据就好,后续的事情libbeat基本都处理完了。不过值得吐槽的是,Beat太散了,管理起来东一个西一个的,产品化的时候对客户说,我们要在机器上放n个Agent不知道客户会是什么样的表情。
d7d0a529244b57acb6ce3796da132df8.jpg
不过轻量级、已部署的特点还是极大的吸引了我,于是就有了后面的事情了。 PacketBeat不明原因的OOM 某天我把PacketBeat放到了我的服务器上面,这台服务器上面有个MongoDB,MongoDB主要是拿来存放ES的元数据的。ES2.x的时候并没有很好的元数据管理,为了能让ES的索引分配的比较均匀,并且有元数据辅助查询,设计好一个元数据管理的仓库是必要的。然后我打开了对MongoDB的抓包功能,恩,一切都很好,接着我打开了日志管理页面,看到了一条一条的MongoDB的包被抓回来,解码,然后塞到了ES。可是第二天一看,咦??Packet跪了?不是吧,ElasticSearch做的产品这么不稳定么。我不信。
06170826_dLgU.png
然后我又启动了第二次,紧接着熟练的top了一下,观察了PacketBeat半个多小时,在被观察的这段时间里面,PacketBeat的表现非常的正常,看不出有什么异样。好吧,那上一次的OOM可能只是个意外,Windows也经常蓝屏嘛,OOM一次也正常。结果第二天我再次打开终端,发现这货居然又OOM了!!
06170909_irst.png
好吧好吧,我感觉我已经踩到Bug了,拿了开源社区这么多东西,总得贡献一下的,好吧,提个Issue去 https://github.com/elastic/beats/issues/2867 真相只有一个 微信群里面聊起这个奇妙的OOM,Medcl大神问是不是因为采集了ES的日志,(我的这台服务器和日志服务器有关系)然后导致滚雪球把PacketBeat给滚死了。咦?说不定真的是这个原因耶!但是看了看PacketBeat,我并没有抓ES的包,而且假如我采集了ES的包,应该一下就OOM掉了,不应该等那么久。不过这么一说,却仿佛打开了新世界的大门
06171040_FVEG.png
我把这台服务器在日志服务器中的角色重新梳理了下,终于发现了这次OOM的原因了。。 由于2.X的ES没有比较好的元数据信息,所以当日志送到LogServer的时候,我做了些额外的操作,让LogServer持久化到ES一定量的时候就会往Mongo写一下元数据信息(当然也有其他服务会往里面做CRUD啦),开始的时候访问Mongo的次数其实是很少的,假设按1W来算。那么问题来了,由于我们的PacketBeat抓了Mongo的包,那么LogServer往ES的CRUD操作都会被PacketBeat给抓走,然后再送回给LogServer
06171248_tcdl.png
那么一个隐藏的滚雪球事件就产生了,刚开始的那段时间,Mongo被抓包的次数只有1W,然后就往LogServer多送了1W条日志,不。。应该多很多,毕竟网络包嘛,然后就导致LogServer因为要管理元数据的频率开始逐渐地提高,逐渐提高CRUD的频率后抓包的内容也越来越多,紧接着到这发生到LogServer的频率也越来越高。。。。。每次PacketBeat崩掉的时候,都送了80W左右的日志量出去,然后它就OOM掉了(因为我那台机器就只剩下2G的空闲内存给它用,被系统给干掉了)。。我居然发现了这样的场景
06171336_PbWI.png
结论 使用PacketBeat的时候,记得要留意一下有没这种反馈型滚雪球的情况,多发生在自己的日志服务器上面。当然那种直接抓ES的就没什么好说了,估计启动了之后没多久就崩溃掉了

Day3: 《创建一个你自己的 Beat》

Adventmedcl 发表了文章 • 0 个评论 • 7585 次浏览 • 2016-12-03 22:19 • 来自相关话题

Elastic Advent 第三篇, 手头上事情实在太多,这两天正在进行权威指南翻译的冲刺阶段,临时填下坑,翻译官网的一篇文章吧(原文:https://www.elastic.co/blog/build-your-own-beat),Advent 规则很自由的,没说不能翻译文章啊,嘿嘿嘿,另外号召大家踊跃报名,大家一起玩才有意思。   活动地址:http://elasticsearch.cn/article/107   言归正传!  Beat 是一个开源的用来构建轻量级数据汇集的平台,可用于将各种类型的数据发送至Elasticsearch 与 Logstash。我们有 Packetbeat 用于监控局域网内服务器之间的网络流量信息,有 Filebeat 收集服务器上的日志信息,还有新推出的 Metricbeat 可以定期获取外部系统的监控指标信息,除此以外,你还可以非常方便的基于 libbeat 框架来构建你属于自己的专属 Beat,目前 beas 社区已经有超过25个 Community Beats 了。 Elastic 还提供一个 Beat generator(Beat 生成器)来帮你快速构建属于你自己的 Beat。通过这篇博客你将会看到如何通过 Beat 生成器来快速创建一个你自己的 Beat。今天我们创建的是一个叫做 lsbeat 的 Beat,lsbeat 非常类似 Unix 系统下的命令行 ls,我们用 lsbeat 来索引目录和文件信息。本篇文章环境基于 Unix 系统,如果你是 Windows 或是其它系统,相关操作可能需要根据实际情况进行调整。 第一步 – 配置 Golang 环境 Beats 是用 Golang写的,显然,要创建和开发一个 beat,Golang 环境必不可少,关于这方面的文章很多,建议查看这篇 Golang 的安装向导: install Golang。当前 Beats 需要的最低版本是 Golang 1.6。另外请确保正确设置了你的 $GOPATH 环境变量。 另外 Golang Glide 被用来进行包的依赖管理,所以也需要确保正确安装,最低版本是 Glide 0.10.0,安装说明点这里。 让我们先来看看 lsbeat 将会用到的一段代码吧,这是一个简单的 golang 程序,通过命令行接收一个目录参数,然后列出该目录下的文件和子目录信息。
package main
 
import (
    "fmt"
    "io/ioutil"
    "os"
)
 
func main() {
    //apply run path "." without argument.
    if len(os.Args) == 1 {
        listDir(".")
    } else {
        listDir(os.Args[1])
    }
}
 
func listDir(dirFile string) {
    files, _ := ioutil.ReadDir(dirFile)
    for _, f := range files {
        t := f.ModTime()
        fmt.Println(f.Name(), dirFile+"/"+f.Name(), f.IsDir(), t, f.Size())
 
        if f.IsDir() {
            listDir(dirFile + "/" + f.Name())
        }
    }
}
后面我们将使用到这段代码和 listDir 函数。 第二步 – 生成项目 要生成一个你自己的 Beat,就要用到 beat-generator 了,首先你必须安装 cookiecutter。安装的详细说明看这里。安装好 cookiecutter 之后,我们要给自己的 Beat 起一个好听的名字,最好是小写的英文字母,我们今天这个例子就叫 lsbeat 吧。 生成项目模板之前,我们需要下载 Beats generator 包文件,就在 beats 仓库。安装好 GoLang 之后,你就可以很方便的使用 go get 命令来下载 Beats generator 包文件了。 当你执行下面的这个命令后,所有的源码文件都会下载到 $GOPATH/src 目录。
$ go get github.com/elastic/beats
在 GOPATH 下创建一个以你自己github账号名称命名的目录,并切换过去,然后执行 cookiecutter 命令并指向 Beat Generator 源码路径。
$ cd $GOPATH/src/github.com/{user}
$ cookiecutter $GOPATH/src/github.com/elastic/beats/generate/beat
Cookiecutter 接下来会问你几个问题,比如项目名称,我们输入:lsbeat;你的 github 用户名,输入你自己的 github 账户;还有两个关于beat和beat_path应该会自动识别,默认回车就好;最后的问题,你可以输入你的姓和名。
project_name [Examplebeat]: lsbeat
github_name [your-github-name]: {username}
beat [lsbeat]:
beat_path [github.com/{github id}]:
full_name [Firstname Lastname]: {Full Name}
现在应该已经创建好了一个名为 lsbeat 的目录,并且里面应该会生成一些文件,让我们一起来看一下吧,结构如下:
$ cd lsbeat
$ tree
.
├── CONTRIBUTING.md
├── LICENSE
├── Makefile
├── README.md
├── beater
│   └── lsbeat.go
├── config
│   ├── config.go
│   └── config_test.go
├── dev-tools
│   └── packer
│       ├── Makefile
│       ├── beats
│       │   └── lsbeat.yml
│       └── version.yml
├── docs
│   └── index.asciidoc
├── etc
│   ├── beat.yml
│   └── fields.yml
├── glide.yaml
├── lsbeat.template.json
├── main.go
├── main_test.go
└── tests
    └── system
        ├── config
        │   └── lsbeat.yml.j2
        ├── lsbeat.py
        ├── requirements.txt
        └── test_base.py
我们刚刚已经生成好了一个原始的 Beat 模板了,但是你还需要获取相关的依赖和设置好 git 仓库。 首先,你需要拉取依赖的相关包信息,我们的这个例子是 lsbeat,我们先做一些的基本的配置,回头再看看详细看看其它的模板和配置文件,只需要执行 make setup 就可以自动获取依赖。
$ make setup
当你创建好了自己的 Beat 之后,记得上传到 github 仓库,并和社区进行分享哦,如下:
beats.png
要 push lsbeat 到你的 git 仓库,只需要执行如下命令:
$ git remote add origin git@github.com:{username}/lsbeat.git
$ git push -u origin master
恭喜你,现在你已经完成了一个 Beat ,并且发布了第一个版本到了 Github,不过里面还没有什么具体内容,现在让我们进一步看看里面的代码吧。 第四步 – 配置 执行过上面一系列命令之后,项目里将会自动创建名为 lsbeat.yml 和 lsbeat.template.json 的配置文件。所有的基本配置项都已经生成在了里面。
lsbeat.yml
lsbeat:
# Defines how often an event is sent to the output
period: 1s
Period 参数包含在每一个生成的 Beats 里面,它表示 lsbeat 将会每隔 1 秒钟轮询一次,这里我们修改 period 时间间隔为 10 秒。还可以在修改 etc/ 目录下面的 beat.yml 文件,这里新增一个 path 参数表示我们具体要监听哪个目录。
lsbeat:
# Defines how often an event is sent to the output
period: 10s
path: "."
参数添加好了之后,我们只需要运行 make update 命令就能让这些修改应用到配置文件lsbeat.yml。
$ make update
$ cat lsbeat.yml
 
################### Lsbeat Configuration Example #########################
 
############################# Lsbeat ######################################
 
lsbeat:
  # Defines how often an event is sent to the output
  period: 10s
  path: "."
###############################################################################
修改完 yml 文件,记得修改 config/config.go文件,添加一个path 参数。
package config
 
import "time"
 
type Config struct {
Period time.Duration `config:"period"`
Path   string        `config:"path"`
}
 
var DefaultConfig = Config{
Period: 10 * time.Second,
Path:   ".",
}
同时我们修改 period 默认时间间隔为 10 秒,默认监听的是当前目录 (.) 。. 第五步 – 添加代码 每一个 Beat 需要实现 Beater 接口,里面定义了 Run() 和 Stop() 函数。.  我们可以定义一个名为 Lsbeat 的结构体,然后用这个对象实现 Beater 接口。然后添加字段 lastIndexTime 来保存最后运行的时间戳信息。
type Lsbeat struct {
done   chan struct{}
config config.Config
client publisher.Client
 
lastIndexTime time.Time
...
}
另外,每个 Beat 还需要实现 New() 方法来接收 Beat 配置信息和返回 Lsbeat 的具体实例。
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
config := config.DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("Error reading config file: %v", err)
}
 
ls := &Lsbeat{
done:   make(chan struct{}),
config: config,
}
return ls, nil
}
在我们的 lsbeat 例子中,我们要做的就是扩展默认的 Run() 函数来导出指定目录的文件和子目录信息。 在修改 Run() 函数之前,我们先在 lsbeat.go 增加 listDir() 函数,就是我们前面最开始测试的那段代码,用于收集目录和文件信息的简单例子稍微修改一下。另外我们还要生成以下字段信息:
"@timestamp": common.Time(time.Now()),
"type": beatname,
"modtime": common.Time(t),
"filename": f.Name(),
"path": dirFile + "/" + f.Name(),
"directory": f.IsDir(),
"filesize": f.Size(),
第一次运行的时候我们将索引所有的文件和目录信息,然后我们再定期检查是否有新文件被创建或者修改,再索引这些新创建的文件和目录。每次定期检查的时间戳都会保存在 lasIndexTime 变量,完整代码如下:
func (bt *Lsbeat) listDir(dirFile string, beatname string, init bool) {
files, _ := ioutil.ReadDir(dirFile)
for _, f := range files {
t := f.ModTime()
 
event := common.MapStr{
"@timestamp": common.Time(time.Now()),
"type":       beatname,
"modtime":    common.Time(t),
"filename":   f.Name(),
"path":       dirFile + "/" + f.Name(),
"directory":  f.IsDir(),
"filesize":   f.Size(),
}
if init {
// index all files and directories on init
bt.client.PublishEvent(event)
} else {
// Index only changed files since last run.
if t.After(bt.lastIndexTime) {
bt.client.PublishEvent(event)
}
}
 
if f.IsDir() {
bt.listDir(dirFile+"/"+f.Name(), beatname, init)
}
}
}
记住在最开始需要导入 “io/ioutil” 包。
import (
"fmt"
"io/ioutil"
"time"
)
现在,让我们看看如何在 Run() 函数里面调用 listDir() 函数,并且保存时间戳到 lasIndexTime 变量。
func (bt *Lsbeat) Run(b *beat.Beat) error {
logp.Info("lsbeat is running! Hit CTRL-C to stop it.")
 
bt.client = b.Publisher.Connect()
ticker := time.NewTicker(bt.config.Period)
counter := 1
for {
 
select {
case <-bt.done:
return nil
case <-ticker.C:
}
 
bt.listDir(bt.config.Path, b.Name, true)   // call lsDir
bt.lastIndexTime = time.Now()               // mark Timestamp
 
logp.Info("Event sent")
counter++
}
 
}
函数 Stop() 用来中断 run 的循环执行,保持默认生成的就行。
func (bt *Lsbeat) Stop() {
bt.client.Close()
close(bt.done)
}
到这里,编码部分基本就完成了。我们接下来添加新字段到 mapping 中,修改文件 etc/fields.yml。.
- key: lsbeat
  title: LS Beat
  description:
  fields:
    - name: counter
      type: integer
      required: true
      description: >
        PLEASE UPDATE DOCUMENTATION
    #new fiels added lsbeat
    - name: modtime
      type: date
    - name: filename
      type: text
    - name: path
    - name: directory
      type: boolean
    - name: filesize
      type: long

重新应用新的配置。 $ make update 字段 file_name 将使用 nGram 分词,我们还需要在文件 lsbeat.template.json 的 “settings” 节点添加一个自定义的 analyzer。
{
  "mappings": {
        ...
  },
  "order": 0,
  "settings": {
    "index.refresh_interval": "5s",
    "analysis": {
      "analyzer": {
        "ls_ngram_analyzer": {
          "tokenizer": "ls_ngram_tokenizer"
        }
      },
      "tokenizer": {
        "ls_ngram_tokenizer": {
          "type": "ngram",
          "min_gram": "2",
          "token_chars": [
            "letter",
            "digit"
          ]
        }
      }
    }
  },
  "template": "lsbeat-*"
}

第六步 – 编译和运行 现在我们可以编译和运行了,只需要执行 make 命令就可以编译出可执行文件 lsbeat (lsbeat.exe on windows) 。 $ make 修改 lsbeat.yml 文件,设置需要监听的目录,如: “/Users/ec2-user/go”,记住是全路径。
lsbeat:
  # Defines how often an event is sent to the output
  period: 10s
  path: "/Users/ec2-user/go"
同时确保你的 elasticsearch 和 kibana 正常运行。现在运行一下 lsbeat 命令看看会发生什么事情吧。 $ ./lsbeat 打开Kibana,通过调用 _cat 接口我们看看的索引是不是创建了。
beats-1.png
可以看到创建了一个名为 lsbeat-2016.06.03 的索引,并且看到已经有了一些文档了。现在对 filename 字段查询一下,由于使用的是 nGram 分词,支持模糊匹配,我们使用 lsbe 关键字搜一下。
beats-2.png
大功告成! 恭喜你,你已经完成了第一个属于你自己的 beat。

Packetbeat协议扩展开发教程(3)

Beatsmedcl 发表了文章 • 12 个评论 • 10749 次浏览 • 2016-01-15 18:32 • 来自相关话题

 书接上回:http://elasticsearch.cn/article/53   前面介绍了Packetbeat的项目结构,今天终于要开始写代码了,想想还是有点小激动呢。(你快点吧,拖半天了) 网络传输两大协议TCP和UDP,我们的所有协议都不离这两种,HTTP、MySQL走的是TCP传输协议,DNS走的是UDP协议,在Packetbeat里面,实现一个自己的协议非常简单,继承并实现这两者对应的接口就行了,我们看一下长什么样: 打开一个现有的UDP和HTTP协议接口定义: /~/go/src/github.com/elastic/beats/packetbeat/protos/protos.go
// Functions to be exported by a protocol plugin
type ProtocolPlugin interface {
	// Called to initialize the Plugin
	Init(test_mode bool, results publisher.Client) error
 
	// Called to return the configured ports
	GetPorts() int
}
 
type TcpProtocolPlugin interface {
	ProtocolPlugin
 
	// Called when TCP payload data is available for parsing.
	Parse(pkt *Packet, tcptuple *common.TcpTuple,
		dir uint8, private ProtocolData) ProtocolData
 
	// Called when the FIN flag is seen in the TCP stream.
	ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
		private ProtocolData) ProtocolData
 
	// Called when a packets are missing from the tcp
	// stream.
	GapInStream(tcptuple *common.TcpTuple, dir uint8, nbytes int,
		private ProtocolData) (priv ProtocolData, drop bool)
 
	// ConnectionTimeout returns the per stream connection timeout.
	// Return <=0 to set default tcp module transaction timeout.
	ConnectionTimeout() time.Duration
}
 
type UdpProtocolPlugin interface {
	ProtocolPlugin
 
	// ParseUdp is invoked when UDP payload data is available for parsing.
	ParseUdp(pkt *Packet)
}
TcpProtocolPlugin:TCP协议插件的接口定义,依次是:Parse() 解析Packet,ReceivedFin()处理TCP断开连接,GapInStream()处理空包丢包,ConnectionTimeout()超时时间; UdpProtocolPlugin: UDP协议的接口定义,UDP协议是不需要握手和保障数据可靠性的,扔出去就结束,速度快,不保证数据可靠送达,所以只有ParseUdp一个方法需要实现,比较简单; ProtocolPlugin:TCP和UDP都需要实现ProtocolPlugin的基础接口,其实就定义了获取端口和初始化接口。 请问: Packetbeat怎么工作的? 回答: 每一个协议都有一个固定的端口用于通信,你要做的事情就是定义协议端口,然后按协议是TCP还是UDP来实现对应的接口,Packetbeat将会截获指定端口的数据包(Packet),然后如果交给你定义的方法来进行解析,TCP是Parse,UDP是ParseUdp,都在上面的接口定义好的,然后将解析出来的结构化数据封装成Json,然后扔给Elasticsearch,后续的就的如何对这些数据做一些有趣的分析和应用了。 貌似很简单嘛! 进入每个端口的数据包,我们假设是一个自来水管,拧开80端口,哗啦啦出来的全是HTTP请求的数据包,Packetbeat里面Http协议监听的是80端口啊,所有这些包统统都交给Packetbeat里面的Http协议模块来进行解析,Http协议会一个个的检查这些数据包,也就是每个数据包都会调用一次Parse接口,到这里提到了传过来一个Packet,我们看看它的数据结构长什么样?
type Packet struct {
	Ts      time.Time
	Tuple   common.IpPortTuple
	Payload byte
}
Packet结构简单, Ts是收到数据包的时间戳; Tuple是一个来源IP+来源端口和目的IP+目的端口的元组; Payload就是这个包里面的传输的有用的数据,应用层的字节数据,不包括IP和TCP/UDP头信息,是不是处理起来简单许多。 首选我们确定SMTP协议的配置,每个协议在packetbeat.yml的protocol下面都应该有一个配置节点,如下:
protocols:
  smtp:
    # Configure the ports where to listen for Smtp traffic. You can disable
    # the Smtp protocol by commenting out the list of ports.
    ports: [25]
还需要在对应的config类文件:packetbeat/config/config.go,增加SMTP的结构体,目前只支持一个端口参数,继承基类ProtocolCommon就行,如下:
git diff config/config.go
@@ -42,6 +42,7 @@ type Protocols struct {
        Pgsql    Pgsql
        Redis    Redis
        Thrift   Thrift
+       Smtp     Smtp
 }
 
 type Dns struct {
@@ -118,5 +119,9 @@ type Redis struct {
        Send_response *bool
 }
 
+type Smtp struct {
+	ProtocolCommon        `yaml:",inline"`
+}
+
 // Config Singleton
 var ConfigSingleton Config
在protos文件夹下面,新增smtp目录,并新增空白文件smtp.go,路径:packetbeat/protos/smtp/smtp.go, 这里就是解析SMTP协议的地方,也是我们扩展协议的主要的工作。
...TODO...
修改protos/protos.go,增加SMTP协议枚举,这里记得保证顺序一致,并且protocol名称必须和配置的节点名称一致,如这里都是smtp。
git diff protos/protos.go
@@ -103,6 +103,7 @@ const (
        MongodbProtocol
        DnsProtocol
        MemcacheProtocol
+       SmtpProtocol
 )
 
 // Protocol names
@@ -116,6 +117,7 @@ var ProtocolNames = string{
        "mongodb",
        "dns",
        "memcache",
+       "smtp",
 }

继续修改packetbeat.go主文件,允许SMTP协议并加载。
git diff packetbeat.go
@@ -27,6 +27,7 @@ import (
        "github.com/elastic/packetbeat/protos/tcp"
        "github.com/elastic/packetbeat/protos/thrift"
        "github.com/elastic/packetbeat/protos/udp"
+       "github.com/elastic/packetbeat/protos/smtp"
        "github.com/elastic/packetbeat/sniffer"
 )
 
@@ -43,6 +44,7 @@ var EnabledProtocolPlugins map[protos.Protocol]protos.ProtocolPlugin = map[proto
        protos.ThriftProtocol:   new(thrift.Thrift),
        protos.MongodbProtocol:  new(mongodb.Mongodb),
        protos.DnsProtocol:      new(dns.Dns),
+       protos.SmtpProtocol:      new(smtp.Smtp),
 }

做完上面一系列修改之后,一个空白的SMTP协议的插件的架子就搭好了,并且插件也注册到了Packetbeat里面了,接下来我们再把packetbeat/protos/smtp/smtp.go按照TCPplugin接口的要求实现一下。 说实话TCP处理起来很难,开始之前,我们先明确几个概念,TCP协议是有状态的,并且是流式的,我们关注的是七层应用层的消息,如HTTP里面的一个HTTP请求和返回,但是TCP底层都是一系列数据包,并且不同的请求的数据包是混杂在一起的,也就是说一个数据包里面可能只是一个HTTP请求的一部分也可能包含多条HTTP请求的一部分,所以Parse()里面需要处理跨数据包的状态信息,我们要把这些数据包和具体的七层的应用层的消息关联起来。 现在我们仔细看看Parse()接口的各个参数定义是做什么用的
Parse(pkt *Packet, tcptuple *common.TcpTuple,
		dir uint8, private ProtocolData) ProtocolData

pkt不用说了,是送进来的数据包,前面已经介绍了其数据结构,tcptuple是该数据包所属的TCP数据流所在的唯一标示(一个未关闭的TCP数据量包含若干数据包,直到TCP链接关闭),使用tcptuple.Hashable()获取唯一值;dir参数标示数据包在TCP数据流中的流向,和第一个TCP数据包方向一致是TcpDirectionOriginal,否则是TcpDirectionReverse;private参数可用来在TCP流中存储状态信息,可在运行时转换成具体的强类型,任意修改和传递给下一个Parse方法,简单来说就是进行中间数据的共享。 下面看段MySQL模块里面的例子
 priv := mysqlPrivateData{}
        if private != nil {
                var ok bool
                priv, ok = private.(mysqlPrivateData)
                if !ok {
                        priv = mysqlPrivateData{}
                }
        }
 
        [ ... ]
 
        return priv
上面的代码就是将private强制转换成mysqlPrivateData结构,然后再使用。 我们再继续看后续怎么处理这些包的一个逻辑例子
ok, complete := mysqlMessageParser(priv.Data[dir])
                if !ok {
                        // drop this tcp stream. Will retry parsing with the next
                        // segment in it
                        priv.Data[dir] = nil
                        logp.Debug("mysql", "Ignore MySQL message. Drop tcp stream.")
                        return priv
                }
 
                if complete {
                        mysql.messageComplete(tcptuple, dir, stream)
                } else {
                        // wait for more data
                        break
                }
                
mysqlMessageParser是一个解析mysql消息的方法,细节我们忽略,我们只需要关心它的返回,ok标示成功或者失败,true则继续处理,false表示数据包不能用,那就直接忽略;第二个参数complete表示判断这一个MySQL消息是否已经完整了,如果完整了,我们就可以扔出去了,否则继续等待剩下的消息内容。 好的,我们看看SMTP协议怎么折腾吧,先看看一个邮件交互的流程图,来自RFC5321 由上图可见,发送端和邮件服务器通过一系列命令来执行邮件的发送,下面看看一个具体的命令操作流程(来源:简单邮件传输协议)[/url]
S: 220 www.example.com ESMTP Postfix
C: HELO mydomain.com
S: 250 Hello mydomain.com
C: MAIL FROM: 
S: 250 Ok
C: RCPT TO: 
S: 250 Ok
C: DATA
S: 354 End data with .
C: Subject: test message
C: From:""< sender@mydomain.com>
C: To:""< friend@example.com>
C:
C: Hello,
C: This is a test.
C: Goodbye.
C: .
S: 250 Ok: queued as 12345
C: quit
S: 221 Bye
上面的过程可以看到就几个命令就能将邮件发送出去,但是其实SMTP协议比较复杂,还包括身份认证、附件、多媒体编码等等,我们今天精简一下,我们目前只关心谁给谁发了邮件,发送内容先不管,这样相比完整的SMTP协议(RFC5321),我们只需要关注以下几个命令: MAIL:开始一份邮件 mail from: xxx@xx.com RCPT: 标识单个的邮件接收人;常在mail命令后面 可有多个rcpt to: xx@xx.com QUIT:结束SMTP会话,不一定发送了邮件,注意 RESET:重置会话,当前传输被取消  最终希望通过Packetbeat将这些数据解析并处理成我们想要的如下JSON数据,即大功告成:
{
"timestamp":"2016-1-15 12:00:00",
"from":"medcl@example.co",
"to":["lcdem@example.co"]
}
我们还需要一个测试数据,这里有一个下载各种协议测试数据包的地方,由wireshark站点提供:https://wiki.wireshark.org/SampleCaptures/ Ctrl+F找到SMTP的下载地址:smtp.pcap 用wireshark打开我们刚刚下载的smtp.pcap文件,然后再输入过滤条件:tcp.port == 25,只看25端口的数据,如下图: 上图可以看到25端口的跑的数据有很多,不过我们只关心我们需要的那几个命令就好了。 打开/~/go/src/github.com/elastic/beats/packetbeat/protos/smtp/smtp.go 定义smtpPrivateData,里面的Data是一个数组,分别是TCP两个方向的数据,SmtpMessage是解析出来的邮件信息
type smtpPrivateData struct{
	Data [2]*SmtpStream
}

type SmtpStream struct {
	tcptuple *common.TcpTuple

	data byte

	parseOffset int
	isClient    bool
	message *SmtpMessage
}

type SmtpMessage struct {
	Ts   time.Time
	From string
	To string
}
然后参照MySQL协议,定义相应的方法,最终如下:
package smtp

import (
	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/logp"
	"github.com/elastic/beats/libbeat/publisher"
	"github.com/elastic/beats/packetbeat/config"
	"github.com/elastic/beats/packetbeat/protos"
	"github.com/elastic/beats/packetbeat/protos/tcp"
	"bytes"
	"time"
	"strings"
)

type smtpPrivateData struct{
	Data [2]*SmtpStream
}

type SmtpStream struct {
	tcptuple *common.TcpTuple

	data byte

	parseOffset int
	isClient    bool

	message *SmtpMessage
}

type SmtpMessage struct {
	start int
	end   int

	Ts   time.Time
	From string
	To string
	IgnoreMessage bool
}

type Smtp struct {
	SendRequest         bool
	SendResponse        bool
	transactionTimeout time.Duration
	Ports         int
	results publisher.Client
}

func (smtp *Smtp) initDefaults() {
	smtp.SendRequest = false
	smtp.SendResponse = false
	smtp.transactionTimeout = protos.DefaultTransactionExpiration
}

func (smtp *Smtp) setFromConfig(config config.Smtp) error {
	smtp.Ports = config.Ports
	if config.SendRequest != nil {
		smtp.SendRequest = *config.SendRequest
	}
	if config.SendResponse != nil {
		smtp.SendResponse = *config.SendResponse
	}

	if config.TransactionTimeout != nil && *config.TransactionTimeout > 0 {
		smtp.transactionTimeout = time.Duration(*config.TransactionTimeout) * time.Second
	}

	return nil
}

func (smtp *Smtp) GetPorts() int {
	return smtp.Ports
}

func (smtp *Smtp) Init(test_mode bool, results publisher.Client) error {
	smtp.initDefaults()

	if !test_mode {
		err := smtp.setFromConfig(config.ConfigSingleton.Protocols.Smtp)
		if err != nil {
			return err
		}
	}
	smtp.results = results

	return nil
}

func readLine(data byte, offset int) (bool, string, int) {
	q := bytes.Index(data[offset:], byte("\r\n"))
	if q == -1 {
		return false, "", 0
	}
	return true, string(data[offset : offset+q]), offset + q + 2
}

func (smtp *Smtp) Parse(pkt *protos.Packet, tcptuple *common.TcpTuple, dir uint8, private protos.ProtocolData, ) protos.ProtocolData {

	defer logp.Recover("ParseSmtp exception")

	priv := smtpPrivateData{}
	if private != nil {
		var ok bool
		priv, ok = private.(smtpPrivateData)
		if !ok {
			priv = smtpPrivateData{}
		}
	}

	if priv.Data[dir] == nil {
		priv.Data[dir] = &SmtpStream{
			tcptuple: tcptuple,
			data:     pkt.Payload,
			message:  &SmtpMessage{Ts: pkt.Ts},
		}
	} else {
		// concatenate bytes
		priv.Data[dir].data = append(priv.Data[dir].data, pkt.Payload...)
		if len(priv.Data[dir].data) > tcp.TCP_MAX_DATA_IN_STREAM {
			logp.Debug("smtp", "Stream data too large, dropping TCP stream")
			priv.Data[dir] = nil
			return priv
		}
	}

	stream := priv.Data[dir]
	for len(stream.data) > 0 {
		if stream.message == nil {
			stream.message = &SmtpMessage{Ts: pkt.Ts}
		}

		ok, complete := stmpMessageParser(priv.Data[dir])
		if !ok {
			// drop this tcp stream. Will retry parsing with the next
			// segment in it
			priv.Data[dir] = nil
			logp.Debug("smtp", "Ignore SMTP message. Drop tcp stream. Try parsing with the next segment")
			return priv
		}

		if complete {
			smtp.messageComplete(tcptuple, dir, stream)
		} else {
			logp.Debug("smtp","still wait message...")
			// wait for more data
			break
		}
	}

	return priv
}

func (smtp *Smtp) ConnectionTimeout() time.Duration {
	return smtp.transactionTimeout
}

func stmpMessageParser(s *SmtpStream) (bool, bool) {

	var value string=""

	for s.parseOffset < len(s.data) {


		logp.Debug("smtp", "Parse message: %s", string(s.data[s.parseOffset]))


		if strings.HasPrefix(string(s.data[s.parseOffset]),"MAIL" ) {

			logp.Debug("smtp", "Hit MAIL command: %s", string(s.data[s.parseOffset]))

			found, line, off := readLine(s.data, s.parseOffset)
			if !found {
				return true, false
			}

			value = line[1:]
			logp.Debug("smtp", "value  %s", value)

			s.parseOffset = off
		} else {
			logp.Debug("smtp", "Unexpected message starting with %s", s.data[s.parseOffset:])
			return false, false
		}
	}

	return true, false
}

func handleSmtp(stmp *Smtp, m *SmtpMessage, tcptuple *common.TcpTuple,
dir uint8, raw_msg byte) {
	logp.Info("smtp","handle smtp message...")

	//TODO

}

// Called when the parser has identified a full message.
func (smtp *Smtp) messageComplete(tcptuple *common.TcpTuple, dir uint8, stream *SmtpStream) {

	logp.Info("smtp","message completed...")

	// all ok, ship it
	msg := stream.data[stream.message.start:stream.message.end]

	if !stream.message.IgnoreMessage {
		handleSmtp(smtp, stream.message, tcptuple, dir, msg)
	}

	// and reset message
	stream.PrepareForNewMessage()
}

func (stream *SmtpStream) PrepareForNewMessage() {
	logp.Info("smtp","prepare for new message...")

	stream.data = stream.data[stream.parseOffset:]
	stream.parseOffset = 0
	stream.isClient = false
	stream.message = nil
}



func (smtp *Smtp) GapInStream(tcptuple *common.TcpTuple, dir uint8,
nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) {

	defer logp.Recover("GapInStream(smtp) exception")

	if private == nil {
		return private, false
	}

	return private, true
}

func (smtp *Smtp) ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private protos.ProtocolData) protos.ProtocolData {

	logp.Info("smtp","stream closed...")

	// TODO: check if we have data pending and either drop it to free
	// memory or send it up the stack.
	return private
}

现在切换到命令行,编译一下
cd ~/go/src/github.com/elastic/beats/packetbeat
make

编译成功,一个滚烫的packetbeat可执行文件就躺在当前目录下了,运行一下先,参数-I 指定pcap文件(还记得前面下载的那个测试文件吧)
./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap  -e -N

运行查看控制台输出结果:
➜  packetbeat git:(smtpbeat) ✗ ./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap  -e -N 
2016/01/15 10:12:19.058535 publish.go:191: INFO Dry run mode. All output types except the file based one are disabled.
2016/01/15 10:12:19.058570 geolite.go:24: INFO GeoIP disabled: No paths were set under output.geoip.paths
2016/01/15 10:12:19.058592 publish.go:262: INFO Publisher name: medcls-MacBook.local
2016/01/15 10:12:19.058724 beat.go:145: INFO Init Beat: packetbeat; Version: 1.0.0
2016/01/15 10:12:19.059758 beat.go:171: INFO packetbeat sucessfully setup. Start running.
2016/01/15 10:12:20.155335 smtp.go:163: DBG  Parse message: 2
2016/01/15 10:12:20.155416 smtp.go:180: DBG  Unexpected message starting with 250-xc90.websitewelcome.com Hello GP [122.162.143.157]
250-SIZE 52428800
250-PIPELINING
250-AUTH PLAIN LOGIN
250-STARTTLS
250 HELP
2016/01/15 10:12:22.310974 smtp.go:163: DBG  Parse message: F
2016/01/15 10:12:22.311025 smtp.go:180: DBG  Unexpected message starting with From: "Gurpartap Singh" 
To: 
Subject: SMTP
Date: Mon, 5 Oct 2009 11:36:07 +0530
Message-ID: <000301ca4581$ef9e57f0$cedb07d0$@in>
MIME-Version: 1.0
...

成功了,邮件内容都在控制台输出了,但这还不是我们要的最终结果,我需要里面的关键信息,我们继续修改smtp.go这个文件。 留待下回分解。

Packetbeat协议扩展开发教程(2)

Beatsmedcl 发表了文章 • 1 个评论 • 7671 次浏览 • 2016-01-15 18:23 • 来自相关话题

书接上回:http://elasticsearch.cn/article/48 我们打开Packetbeat项目,看看里面长什么样: 现在beats项目都合并在一起了,第一级可以看到各个子项目: /libbeat: 公共依赖; /filebeat: 替代Logstash-forwarder,处理日志类型数据; /packetbeat: 本文扩展重点,网络抓包; /topbeat: 监控系统性能; /winlogbeat: 监控windows下面的日志信息; /vender: 依赖的第三方库; /tests: 用于测试的pcamp抓包文件,非常有用; /scripts: 一些用于开发和测试的Docker脚本文件; 现在重点看看/packetbeat下面目录都有些什么: /packetbeat/main.go: 启动入口,里面没有什么逻辑; /packetbeat/beat/: 里面就一个packetbeat.go文件,packetbeat主程序,处理配置和命令行参数,协议需要在这里进行注册; /packetbeat/config/: 里面就一个config.go文件,定义了所有的配置相关的struct结构体,新协议需要在这里定义其配置的结构体; /packetbeat/debian/: debian打包相关; /packetbeat/decoder/: 解码类,网络传输层包的解码; /packetbeat/docs/: 项目的相关文档; /packetbeat/etc/: 示例配置文件; /packetbeat/procs/: 获取系统内核运作状态与进程信息的工具类; /packetbeat/protos/:自定义协议类,每个目录对应一个应用协议,我们需要在此新增我们的协议,如SMTP; /packetbeat/sniffer/: 三种不同抓包方式的实现:pcap、af_packet、pf_ring,关于这三者的区别,请参照文档:Traffic Capturing Options; /packetbeat/tests/: 测试相关的文件,里面有每一个协议的pcab抓包样板,还有一堆Python测试脚本; 知道项目的大概架构就知道从哪下手了,下节分解。

elasticsearch logstash kibana beats 资料分享

资料分享abcdef 发表了文章 • 1 个评论 • 7607 次浏览 • 2015-12-02 14:40 • 来自相关话题

ELK系列文章推荐 http://www.ttlsa.com/log-system/elk/    写的还不错。
ELK系列文章推荐 http://www.ttlsa.com/log-system/elk/    写的还不错。