即使是不成熟的尝试,也胜于胎死腹中的策略。
PacketBeat

PacketBeat

packetbeat运行一段时间后自动停止

回复

Beatsihategg 发起了问题 • 1 人关注 • 0 个回复 • 1803 次浏览 • 2021-08-15 21:16 • 来自相关话题

packetbeat 支持https吗

Beatsxx_zhang 回复了问题 • 4 人关注 • 3 个回复 • 3773 次浏览 • 2021-07-12 15:09 • 来自相关话题

packetbeat性能数据

BeatsWTF 回复了问题 • 7 人关注 • 6 个回复 • 8277 次浏览 • 2021-04-06 20:50 • 来自相关话题

packetbeat抓取redis命令,丢包率较大

回复

Beatswjj_ah 发起了问题 • 1 人关注 • 0 个回复 • 3159 次浏览 • 2019-11-14 09:31 • 来自相关话题

packetbeat 报http_parser.go:156: WARN Failed to understand HTTP response status: 200

Beatsnotliulk 回复了问题 • 3 人关注 • 2 个回复 • 3649 次浏览 • 2019-10-31 17:56 • 来自相关话题

packetbeat af_packet丢包

Beatsrochy 回复了问题 • 2 人关注 • 1 个回复 • 6385 次浏览 • 2018-10-27 00:08 • 来自相关话题

packetbeat不能持续发送数据,只是重启的时候发送一下数据

Beatsggg 回复了问题 • 2 人关注 • 3 个回复 • 2874 次浏览 • 2018-10-17 11:14 • 来自相关话题

应使用哪个beats同步mysql数据到es

Beatschienx 回复了问题 • 4 人关注 • 2 个回复 • 5865 次浏览 • 2018-09-18 08:57 • 来自相关话题

mysql协议解析扩展

Beatsggg 发表了文章 • 5 个评论 • 3052 次浏览 • 2018-05-08 15:40 • 来自相关话题

elastic/beats项目中支持mysql协议的解析,但实际使用过程中发现不支持预编译和压缩通信协议的解析,所以扩展了预编译SQL和压缩通信协议的支持,目前已稳定运行在生产环境,所有SQL都能完美解析,已提交PR,有相同需求的同学可参考。
elastic/beats项目中支持mysql协议的解析,但实际使用过程中发现不支持预编译和压缩通信协议的解析,所以扩展了预编译SQL和压缩通信协议的支持,目前已稳定运行在生产环境,所有SQL都能完美解析,已提交PR,有相同需求的同学可参考。

packetbeat flow数据不正确

Beatsggg 回复了问题 • 2 人关注 • 1 个回复 • 3836 次浏览 • 2018-03-26 17:58 • 来自相关话题

packetbeat监控同一个域名下的3个接口,一个可以监控到两个监控不到,大神求解

Beatsggg 回复了问题 • 2 人关注 • 1 个回复 • 3548 次浏览 • 2018-03-26 11:24 • 来自相关话题

packetbeat测试的时候遇到这样的错误,请问如何处理?

回复

Beatshongyang 回复了问题 • 1 人关注 • 1 个回复 • 2831 次浏览 • 2018-01-24 10:17 • 来自相关话题

packetbeat 抓取vpn tun网卡流量问题

Beatsggg 回复了问题 • 3 人关注 • 2 个回复 • 5179 次浏览 • 2017-12-13 11:35 • 来自相关话题

packetbeat 发到es 时区问题

Beatsggg 回复了问题 • 2 人关注 • 2 个回复 • 3693 次浏览 • 2017-12-11 17:11 • 来自相关话题

网卡流量过大 packetbeat 频繁重启

Beatsmedcl 回复了问题 • 3 人关注 • 2 个回复 • 4120 次浏览 • 2017-10-27 12:02 • 来自相关话题

packetbeat运行一段时间后自动停止

回复

Beatsihategg 发起了问题 • 1 人关注 • 0 个回复 • 1803 次浏览 • 2021-08-15 21:16 • 来自相关话题

packetbeat 支持https吗

回复

Beatsxx_zhang 回复了问题 • 4 人关注 • 3 个回复 • 3773 次浏览 • 2021-07-12 15:09 • 来自相关话题

packetbeat性能数据

回复

BeatsWTF 回复了问题 • 7 人关注 • 6 个回复 • 8277 次浏览 • 2021-04-06 20:50 • 来自相关话题

packetbeat抓取redis命令,丢包率较大

回复

Beatswjj_ah 发起了问题 • 1 人关注 • 0 个回复 • 3159 次浏览 • 2019-11-14 09:31 • 来自相关话题

packetbeat 报http_parser.go:156: WARN Failed to understand HTTP response status: 200

回复

Beatsnotliulk 回复了问题 • 3 人关注 • 2 个回复 • 3649 次浏览 • 2019-10-31 17:56 • 来自相关话题

packetbeat af_packet丢包

回复

Beatsrochy 回复了问题 • 2 人关注 • 1 个回复 • 6385 次浏览 • 2018-10-27 00:08 • 来自相关话题

packetbeat不能持续发送数据,只是重启的时候发送一下数据

回复

Beatsggg 回复了问题 • 2 人关注 • 3 个回复 • 2874 次浏览 • 2018-10-17 11:14 • 来自相关话题

应使用哪个beats同步mysql数据到es

回复

Beatschienx 回复了问题 • 4 人关注 • 2 个回复 • 5865 次浏览 • 2018-09-18 08:57 • 来自相关话题

packetbeat flow数据不正确

回复

Beatsggg 回复了问题 • 2 人关注 • 1 个回复 • 3836 次浏览 • 2018-03-26 17:58 • 来自相关话题

packetbeat监控同一个域名下的3个接口,一个可以监控到两个监控不到,大神求解

回复

Beatsggg 回复了问题 • 2 人关注 • 1 个回复 • 3548 次浏览 • 2018-03-26 11:24 • 来自相关话题

packetbeat测试的时候遇到这样的错误,请问如何处理?

回复

Beatshongyang 回复了问题 • 1 人关注 • 1 个回复 • 2831 次浏览 • 2018-01-24 10:17 • 来自相关话题

packetbeat 抓取vpn tun网卡流量问题

回复

Beatsggg 回复了问题 • 3 人关注 • 2 个回复 • 5179 次浏览 • 2017-12-13 11:35 • 来自相关话题

packetbeat 发到es 时区问题

回复

Beatsggg 回复了问题 • 2 人关注 • 2 个回复 • 3693 次浏览 • 2017-12-11 17:11 • 来自相关话题

网卡流量过大 packetbeat 频繁重启

回复

Beatsmedcl 回复了问题 • 3 人关注 • 2 个回复 • 4120 次浏览 • 2017-10-27 12:02 • 来自相关话题

关于af_packet启动失败的分析

回复

Beatsggg 发起了问题 • 1 人关注 • 0 个回复 • 5125 次浏览 • 2017-10-25 14:57 • 来自相关话题

mysql协议解析扩展

Beatsggg 发表了文章 • 5 个评论 • 3052 次浏览 • 2018-05-08 15:40 • 来自相关话题

elastic/beats项目中支持mysql协议的解析,但实际使用过程中发现不支持预编译和压缩通信协议的解析,所以扩展了预编译SQL和压缩通信协议的支持,目前已稳定运行在生产环境,所有SQL都能完美解析,已提交PR,有相同需求的同学可参考。
elastic/beats项目中支持mysql协议的解析,但实际使用过程中发现不支持预编译和压缩通信协议的解析,所以扩展了预编译SQL和压缩通信协议的支持,目前已稳定运行在生产环境,所有SQL都能完美解析,已提交PR,有相同需求的同学可参考。

packetbeat的oracle协议扩展

Beatsggg 发表了文章 • 40 个评论 • 7399 次浏览 • 2017-05-10 15:01 • 来自相关话题

oracle由于是商用软件,协议并不公开,而且相比mysql等开源数据库软件,协议复杂度加了不止一个量级。 出于版权考虑,packetbeat并没有加入oracle协议的支持,只能自己动手。 好在beats充分考虑了扩展性,把公共的基础工作抽象成框架,新协议的扩展只需要专注于协议的分析和解码。 tns协议是oracle客户端和服务端通信协议,应用可以通过OCI、JDBC等接口去访问数据库。 tns协议有多个版本,不同版本之间差异也比较大,11g是主流tns版本为314。 目前完成308、310、313、314、315版本的解析   packetbeat支持pcap、pf_ring等抓包方式,通过kafka+es+kibana展示,效果如图  
oracle由于是商用软件,协议并不公开,而且相比mysql等开源数据库软件,协议复杂度加了不止一个量级。 出于版权考虑,packetbeat并没有加入oracle协议的支持,只能自己动手。 好在beats充分考虑了扩展性,把公共的基础工作抽象成框架,新协议的扩展只需要专注于协议的分析和解码。 tns协议是oracle客户端和服务端通信协议,应用可以通过OCI、JDBC等接口去访问数据库。 tns协议有多个版本,不同版本之间差异也比较大,11g是主流tns版本为314。 目前完成308、310、313、314、315版本的解析   packetbeat支持pcap、pf_ring等抓包方式,通过kafka+es+kibana展示,效果如图  

Packetbeat的Cassandra协议扩展

Beatsmedcl 发表了文章 • 1 个评论 • 4699 次浏览 • 2016-07-05 23:16 • 来自相关话题

论坛有多少人在用Cassandra的啊?弄了一个Cassandra的协议,有在用的Cassandra么?帮忙测试一下,看看有没有bug, 欢迎反馈。   https://github.com/elastic/beats/pull/1959  
论坛有多少人在用Cassandra的啊?弄了一个Cassandra的协议,有在用的Cassandra么?帮忙测试一下,看看有没有bug, 欢迎反馈。   https://github.com/elastic/beats/pull/1959  

Packetbeat协议扩展开发教程(3)

Beatsmedcl 发表了文章 • 12 个评论 • 11946 次浏览 • 2016-01-15 18:32 • 来自相关话题

 书接上回:http://elasticsearch.cn/article/53   前面介绍了Packetbeat的项目结构,今天终于要开始写代码了,想想还是有点小激动呢。(你快点吧,拖半天了) 网络传输两大协议TCP和UDP,我们的所有协议都不离这两种,HTTP、MySQL走的是TCP传输协议,DNS走的是UDP协议,在Packetbeat里面,实现一个自己的协议非常简单,继承并实现这两者对应的接口就行了,我们看一下长什么样: 打开一个现有的UDP和HTTP协议接口定义: /~/go/src/github.com/elastic/beats/packetbeat/protos/protos.go
// Functions to be exported by a protocol plugin
type ProtocolPlugin interface {
	// Called to initialize the Plugin
	Init(test_mode bool, results publisher.Client) error
 
	// Called to return the configured ports
	GetPorts() int
}
 
type TcpProtocolPlugin interface {
	ProtocolPlugin
 
	// Called when TCP payload data is available for parsing.
	Parse(pkt *Packet, tcptuple *common.TcpTuple,
		dir uint8, private ProtocolData) ProtocolData
 
	// Called when the FIN flag is seen in the TCP stream.
	ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
		private ProtocolData) ProtocolData
 
	// Called when a packets are missing from the tcp
	// stream.
	GapInStream(tcptuple *common.TcpTuple, dir uint8, nbytes int,
		private ProtocolData) (priv ProtocolData, drop bool)
 
	// ConnectionTimeout returns the per stream connection timeout.
	// Return <=0 to set default tcp module transaction timeout.
	ConnectionTimeout() time.Duration
}
 
type UdpProtocolPlugin interface {
	ProtocolPlugin
 
	// ParseUdp is invoked when UDP payload data is available for parsing.
	ParseUdp(pkt *Packet)
}
TcpProtocolPlugin:TCP协议插件的接口定义,依次是:Parse() 解析Packet,ReceivedFin()处理TCP断开连接,GapInStream()处理空包丢包,ConnectionTimeout()超时时间; UdpProtocolPlugin: UDP协议的接口定义,UDP协议是不需要握手和保障数据可靠性的,扔出去就结束,速度快,不保证数据可靠送达,所以只有ParseUdp一个方法需要实现,比较简单; ProtocolPlugin:TCP和UDP都需要实现ProtocolPlugin的基础接口,其实就定义了获取端口和初始化接口。 请问: Packetbeat怎么工作的? 回答: 每一个协议都有一个固定的端口用于通信,你要做的事情就是定义协议端口,然后按协议是TCP还是UDP来实现对应的接口,Packetbeat将会截获指定端口的数据包(Packet),然后如果交给你定义的方法来进行解析,TCP是Parse,UDP是ParseUdp,都在上面的接口定义好的,然后将解析出来的结构化数据封装成Json,然后扔给Elasticsearch,后续的就的如何对这些数据做一些有趣的分析和应用了。 貌似很简单嘛! 进入每个端口的数据包,我们假设是一个自来水管,拧开80端口,哗啦啦出来的全是HTTP请求的数据包,Packetbeat里面Http协议监听的是80端口啊,所有这些包统统都交给Packetbeat里面的Http协议模块来进行解析,Http协议会一个个的检查这些数据包,也就是每个数据包都会调用一次Parse接口,到这里提到了传过来一个Packet,我们看看它的数据结构长什么样?
type Packet struct {
	Ts      time.Time
	Tuple   common.IpPortTuple
	Payload byte
}
Packet结构简单, Ts是收到数据包的时间戳; Tuple是一个来源IP+来源端口和目的IP+目的端口的元组; Payload就是这个包里面的传输的有用的数据,应用层的字节数据,不包括IP和TCP/UDP头信息,是不是处理起来简单许多。 首选我们确定SMTP协议的配置,每个协议在packetbeat.yml的protocol下面都应该有一个配置节点,如下:
protocols:
  smtp:
    # Configure the ports where to listen for Smtp traffic. You can disable
    # the Smtp protocol by commenting out the list of ports.
    ports: [25]
还需要在对应的config类文件:packetbeat/config/config.go,增加SMTP的结构体,目前只支持一个端口参数,继承基类ProtocolCommon就行,如下:
git diff config/config.go
@@ -42,6 +42,7 @@ type Protocols struct {
        Pgsql    Pgsql
        Redis    Redis
        Thrift   Thrift
+       Smtp     Smtp
 }
 
 type Dns struct {
@@ -118,5 +119,9 @@ type Redis struct {
        Send_response *bool
 }
 
+type Smtp struct {
+	ProtocolCommon        `yaml:",inline"`
+}
+
 // Config Singleton
 var ConfigSingleton Config
在protos文件夹下面,新增smtp目录,并新增空白文件smtp.go,路径:packetbeat/protos/smtp/smtp.go, 这里就是解析SMTP协议的地方,也是我们扩展协议的主要的工作。
...TODO...
修改protos/protos.go,增加SMTP协议枚举,这里记得保证顺序一致,并且protocol名称必须和配置的节点名称一致,如这里都是smtp。
git diff protos/protos.go
@@ -103,6 +103,7 @@ const (
        MongodbProtocol
        DnsProtocol
        MemcacheProtocol
+       SmtpProtocol
 )
 
 // Protocol names
@@ -116,6 +117,7 @@ var ProtocolNames = string{
        "mongodb",
        "dns",
        "memcache",
+       "smtp",
 }

继续修改packetbeat.go主文件,允许SMTP协议并加载。
git diff packetbeat.go
@@ -27,6 +27,7 @@ import (
        "github.com/elastic/packetbeat/protos/tcp"
        "github.com/elastic/packetbeat/protos/thrift"
        "github.com/elastic/packetbeat/protos/udp"
+       "github.com/elastic/packetbeat/protos/smtp"
        "github.com/elastic/packetbeat/sniffer"
 )
 
@@ -43,6 +44,7 @@ var EnabledProtocolPlugins map[protos.Protocol]protos.ProtocolPlugin = map[proto
        protos.ThriftProtocol:   new(thrift.Thrift),
        protos.MongodbProtocol:  new(mongodb.Mongodb),
        protos.DnsProtocol:      new(dns.Dns),
+       protos.SmtpProtocol:      new(smtp.Smtp),
 }

做完上面一系列修改之后,一个空白的SMTP协议的插件的架子就搭好了,并且插件也注册到了Packetbeat里面了,接下来我们再把packetbeat/protos/smtp/smtp.go按照TCPplugin接口的要求实现一下。 说实话TCP处理起来很难,开始之前,我们先明确几个概念,TCP协议是有状态的,并且是流式的,我们关注的是七层应用层的消息,如HTTP里面的一个HTTP请求和返回,但是TCP底层都是一系列数据包,并且不同的请求的数据包是混杂在一起的,也就是说一个数据包里面可能只是一个HTTP请求的一部分也可能包含多条HTTP请求的一部分,所以Parse()里面需要处理跨数据包的状态信息,我们要把这些数据包和具体的七层的应用层的消息关联起来。 现在我们仔细看看Parse()接口的各个参数定义是做什么用的
Parse(pkt *Packet, tcptuple *common.TcpTuple,
		dir uint8, private ProtocolData) ProtocolData

pkt不用说了,是送进来的数据包,前面已经介绍了其数据结构,tcptuple是该数据包所属的TCP数据流所在的唯一标示(一个未关闭的TCP数据量包含若干数据包,直到TCP链接关闭),使用tcptuple.Hashable()获取唯一值;dir参数标示数据包在TCP数据流中的流向,和第一个TCP数据包方向一致是TcpDirectionOriginal,否则是TcpDirectionReverse;private参数可用来在TCP流中存储状态信息,可在运行时转换成具体的强类型,任意修改和传递给下一个Parse方法,简单来说就是进行中间数据的共享。 下面看段MySQL模块里面的例子
 priv := mysqlPrivateData{}
        if private != nil {
                var ok bool
                priv, ok = private.(mysqlPrivateData)
                if !ok {
                        priv = mysqlPrivateData{}
                }
        }
 
        [ ... ]
 
        return priv
上面的代码就是将private强制转换成mysqlPrivateData结构,然后再使用。 我们再继续看后续怎么处理这些包的一个逻辑例子
ok, complete := mysqlMessageParser(priv.Data[dir])
                if !ok {
                        // drop this tcp stream. Will retry parsing with the next
                        // segment in it
                        priv.Data[dir] = nil
                        logp.Debug("mysql", "Ignore MySQL message. Drop tcp stream.")
                        return priv
                }
 
                if complete {
                        mysql.messageComplete(tcptuple, dir, stream)
                } else {
                        // wait for more data
                        break
                }
                
mysqlMessageParser是一个解析mysql消息的方法,细节我们忽略,我们只需要关心它的返回,ok标示成功或者失败,true则继续处理,false表示数据包不能用,那就直接忽略;第二个参数complete表示判断这一个MySQL消息是否已经完整了,如果完整了,我们就可以扔出去了,否则继续等待剩下的消息内容。 好的,我们看看SMTP协议怎么折腾吧,先看看一个邮件交互的流程图,来自RFC5321 由上图可见,发送端和邮件服务器通过一系列命令来执行邮件的发送,下面看看一个具体的命令操作流程(来源:简单邮件传输协议)[/url]
S: 220 www.example.com ESMTP Postfix
C: HELO mydomain.com
S: 250 Hello mydomain.com
C: MAIL FROM: 
S: 250 Ok
C: RCPT TO: 
S: 250 Ok
C: DATA
S: 354 End data with .
C: Subject: test message
C: From:""< sender@mydomain.com>
C: To:""< friend@example.com>
C:
C: Hello,
C: This is a test.
C: Goodbye.
C: .
S: 250 Ok: queued as 12345
C: quit
S: 221 Bye
上面的过程可以看到就几个命令就能将邮件发送出去,但是其实SMTP协议比较复杂,还包括身份认证、附件、多媒体编码等等,我们今天精简一下,我们目前只关心谁给谁发了邮件,发送内容先不管,这样相比完整的SMTP协议(RFC5321),我们只需要关注以下几个命令: MAIL:开始一份邮件 mail from: xxx@xx.com RCPT: 标识单个的邮件接收人;常在mail命令后面 可有多个rcpt to: xx@xx.com QUIT:结束SMTP会话,不一定发送了邮件,注意 RESET:重置会话,当前传输被取消  最终希望通过Packetbeat将这些数据解析并处理成我们想要的如下JSON数据,即大功告成:
{
"timestamp":"2016-1-15 12:00:00",
"from":"medcl@example.co",
"to":["lcdem@example.co"]
}
我们还需要一个测试数据,这里有一个下载各种协议测试数据包的地方,由wireshark站点提供:https://wiki.wireshark.org/SampleCaptures/ Ctrl+F找到SMTP的下载地址:smtp.pcap 用wireshark打开我们刚刚下载的smtp.pcap文件,然后再输入过滤条件:tcp.port == 25,只看25端口的数据,如下图: 上图可以看到25端口的跑的数据有很多,不过我们只关心我们需要的那几个命令就好了。 打开/~/go/src/github.com/elastic/beats/packetbeat/protos/smtp/smtp.go 定义smtpPrivateData,里面的Data是一个数组,分别是TCP两个方向的数据,SmtpMessage是解析出来的邮件信息
type smtpPrivateData struct{
	Data [2]*SmtpStream
}

type SmtpStream struct {
	tcptuple *common.TcpTuple

	data byte

	parseOffset int
	isClient    bool
	message *SmtpMessage
}

type SmtpMessage struct {
	Ts   time.Time
	From string
	To string
}
然后参照MySQL协议,定义相应的方法,最终如下:
package smtp

import (
	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/logp"
	"github.com/elastic/beats/libbeat/publisher"
	"github.com/elastic/beats/packetbeat/config"
	"github.com/elastic/beats/packetbeat/protos"
	"github.com/elastic/beats/packetbeat/protos/tcp"
	"bytes"
	"time"
	"strings"
)

type smtpPrivateData struct{
	Data [2]*SmtpStream
}

type SmtpStream struct {
	tcptuple *common.TcpTuple

	data byte

	parseOffset int
	isClient    bool

	message *SmtpMessage
}

type SmtpMessage struct {
	start int
	end   int

	Ts   time.Time
	From string
	To string
	IgnoreMessage bool
}

type Smtp struct {
	SendRequest         bool
	SendResponse        bool
	transactionTimeout time.Duration
	Ports         int
	results publisher.Client
}

func (smtp *Smtp) initDefaults() {
	smtp.SendRequest = false
	smtp.SendResponse = false
	smtp.transactionTimeout = protos.DefaultTransactionExpiration
}

func (smtp *Smtp) setFromConfig(config config.Smtp) error {
	smtp.Ports = config.Ports
	if config.SendRequest != nil {
		smtp.SendRequest = *config.SendRequest
	}
	if config.SendResponse != nil {
		smtp.SendResponse = *config.SendResponse
	}

	if config.TransactionTimeout != nil && *config.TransactionTimeout > 0 {
		smtp.transactionTimeout = time.Duration(*config.TransactionTimeout) * time.Second
	}

	return nil
}

func (smtp *Smtp) GetPorts() int {
	return smtp.Ports
}

func (smtp *Smtp) Init(test_mode bool, results publisher.Client) error {
	smtp.initDefaults()

	if !test_mode {
		err := smtp.setFromConfig(config.ConfigSingleton.Protocols.Smtp)
		if err != nil {
			return err
		}
	}
	smtp.results = results

	return nil
}

func readLine(data byte, offset int) (bool, string, int) {
	q := bytes.Index(data[offset:], byte("\r\n"))
	if q == -1 {
		return false, "", 0
	}
	return true, string(data[offset : offset+q]), offset + q + 2
}

func (smtp *Smtp) Parse(pkt *protos.Packet, tcptuple *common.TcpTuple, dir uint8, private protos.ProtocolData, ) protos.ProtocolData {

	defer logp.Recover("ParseSmtp exception")

	priv := smtpPrivateData{}
	if private != nil {
		var ok bool
		priv, ok = private.(smtpPrivateData)
		if !ok {
			priv = smtpPrivateData{}
		}
	}

	if priv.Data[dir] == nil {
		priv.Data[dir] = &SmtpStream{
			tcptuple: tcptuple,
			data:     pkt.Payload,
			message:  &SmtpMessage{Ts: pkt.Ts},
		}
	} else {
		// concatenate bytes
		priv.Data[dir].data = append(priv.Data[dir].data, pkt.Payload...)
		if len(priv.Data[dir].data) > tcp.TCP_MAX_DATA_IN_STREAM {
			logp.Debug("smtp", "Stream data too large, dropping TCP stream")
			priv.Data[dir] = nil
			return priv
		}
	}

	stream := priv.Data[dir]
	for len(stream.data) > 0 {
		if stream.message == nil {
			stream.message = &SmtpMessage{Ts: pkt.Ts}
		}

		ok, complete := stmpMessageParser(priv.Data[dir])
		if !ok {
			// drop this tcp stream. Will retry parsing with the next
			// segment in it
			priv.Data[dir] = nil
			logp.Debug("smtp", "Ignore SMTP message. Drop tcp stream. Try parsing with the next segment")
			return priv
		}

		if complete {
			smtp.messageComplete(tcptuple, dir, stream)
		} else {
			logp.Debug("smtp","still wait message...")
			// wait for more data
			break
		}
	}

	return priv
}

func (smtp *Smtp) ConnectionTimeout() time.Duration {
	return smtp.transactionTimeout
}

func stmpMessageParser(s *SmtpStream) (bool, bool) {

	var value string=""

	for s.parseOffset < len(s.data) {


		logp.Debug("smtp", "Parse message: %s", string(s.data[s.parseOffset]))


		if strings.HasPrefix(string(s.data[s.parseOffset]),"MAIL" ) {

			logp.Debug("smtp", "Hit MAIL command: %s", string(s.data[s.parseOffset]))

			found, line, off := readLine(s.data, s.parseOffset)
			if !found {
				return true, false
			}

			value = line[1:]
			logp.Debug("smtp", "value  %s", value)

			s.parseOffset = off
		} else {
			logp.Debug("smtp", "Unexpected message starting with %s", s.data[s.parseOffset:])
			return false, false
		}
	}

	return true, false
}

func handleSmtp(stmp *Smtp, m *SmtpMessage, tcptuple *common.TcpTuple,
dir uint8, raw_msg byte) {
	logp.Info("smtp","handle smtp message...")

	//TODO

}

// Called when the parser has identified a full message.
func (smtp *Smtp) messageComplete(tcptuple *common.TcpTuple, dir uint8, stream *SmtpStream) {

	logp.Info("smtp","message completed...")

	// all ok, ship it
	msg := stream.data[stream.message.start:stream.message.end]

	if !stream.message.IgnoreMessage {
		handleSmtp(smtp, stream.message, tcptuple, dir, msg)
	}

	// and reset message
	stream.PrepareForNewMessage()
}

func (stream *SmtpStream) PrepareForNewMessage() {
	logp.Info("smtp","prepare for new message...")

	stream.data = stream.data[stream.parseOffset:]
	stream.parseOffset = 0
	stream.isClient = false
	stream.message = nil
}



func (smtp *Smtp) GapInStream(tcptuple *common.TcpTuple, dir uint8,
nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) {

	defer logp.Recover("GapInStream(smtp) exception")

	if private == nil {
		return private, false
	}

	return private, true
}

func (smtp *Smtp) ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private protos.ProtocolData) protos.ProtocolData {

	logp.Info("smtp","stream closed...")

	// TODO: check if we have data pending and either drop it to free
	// memory or send it up the stack.
	return private
}

现在切换到命令行,编译一下
cd ~/go/src/github.com/elastic/beats/packetbeat
make

编译成功,一个滚烫的packetbeat可执行文件就躺在当前目录下了,运行一下先,参数-I 指定pcap文件(还记得前面下载的那个测试文件吧)
./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap  -e -N

运行查看控制台输出结果:
➜  packetbeat git:(smtpbeat) ✗ ./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap  -e -N 
2016/01/15 10:12:19.058535 publish.go:191: INFO Dry run mode. All output types except the file based one are disabled.
2016/01/15 10:12:19.058570 geolite.go:24: INFO GeoIP disabled: No paths were set under output.geoip.paths
2016/01/15 10:12:19.058592 publish.go:262: INFO Publisher name: medcls-MacBook.local
2016/01/15 10:12:19.058724 beat.go:145: INFO Init Beat: packetbeat; Version: 1.0.0
2016/01/15 10:12:19.059758 beat.go:171: INFO packetbeat sucessfully setup. Start running.
2016/01/15 10:12:20.155335 smtp.go:163: DBG  Parse message: 2
2016/01/15 10:12:20.155416 smtp.go:180: DBG  Unexpected message starting with 250-xc90.websitewelcome.com Hello GP [122.162.143.157]
250-SIZE 52428800
250-PIPELINING
250-AUTH PLAIN LOGIN
250-STARTTLS
250 HELP
2016/01/15 10:12:22.310974 smtp.go:163: DBG  Parse message: F
2016/01/15 10:12:22.311025 smtp.go:180: DBG  Unexpected message starting with From: "Gurpartap Singh" 
To: 
Subject: SMTP
Date: Mon, 5 Oct 2009 11:36:07 +0530
Message-ID: <000301ca4581$ef9e57f0$cedb07d0$@in>
MIME-Version: 1.0
...

成功了,邮件内容都在控制台输出了,但这还不是我们要的最终结果,我需要里面的关键信息,我们继续修改smtp.go这个文件。 留待下回分解。

Packetbeat协议扩展开发教程(2)

Beatsmedcl 发表了文章 • 1 个评论 • 8664 次浏览 • 2016-01-15 18:23 • 来自相关话题

书接上回:http://elasticsearch.cn/article/48 我们打开Packetbeat项目,看看里面长什么样: 现在beats项目都合并在一起了,第一级可以看到各个子项目: /libbeat: 公共依赖; /filebeat: 替代Logstash-forwarder,处理日志类型数据; /packetbeat: 本文扩展重点,网络抓包; /topbeat: 监控系统性能; /winlogbeat: 监控windows下面的日志信息; /vender: 依赖的第三方库; /tests: 用于测试的pcamp抓包文件,非常有用; /scripts: 一些用于开发和测试的Docker脚本文件; 现在重点看看/packetbeat下面目录都有些什么: /packetbeat/main.go: 启动入口,里面没有什么逻辑; /packetbeat/beat/: 里面就一个packetbeat.go文件,packetbeat主程序,处理配置和命令行参数,协议需要在这里进行注册; /packetbeat/config/: 里面就一个config.go文件,定义了所有的配置相关的struct结构体,新协议需要在这里定义其配置的结构体; /packetbeat/debian/: debian打包相关; /packetbeat/decoder/: 解码类,网络传输层包的解码; /packetbeat/docs/: 项目的相关文档; /packetbeat/etc/: 示例配置文件; /packetbeat/procs/: 获取系统内核运作状态与进程信息的工具类; /packetbeat/protos/:自定义协议类,每个目录对应一个应用协议,我们需要在此新增我们的协议,如SMTP; /packetbeat/sniffer/: 三种不同抓包方式的实现:pcap、af_packet、pf_ring,关于这三者的区别,请参照文档:Traffic Capturing Options; /packetbeat/tests/: 测试相关的文件,里面有每一个协议的pcab抓包样板,还有一堆Python测试脚本; 知道项目的大概架构就知道从哪下手了,下节分解。

Packetbeat协议扩展开发教程(1)

Beatsmedcl 发表了文章 • 1 个评论 • 10177 次浏览 • 2015-12-30 21:02 • 来自相关话题

Packetbeat(https://www.elastic.co/products/beats/packetbeat) 是一个开源的网络抓包与分析框架,内置了很多常见的协议解析,如HTPP、MySQL、Thrift等。但是网络协议有很多,如何扩展一个自己的协议呢,本文将为您介绍如何在Packetbeat基础上扩展实现您自己的协议。 开发环境: 1.Go语言 Packetbeat是由Go语言编写,具有高性能和易部署的特点,有关Go语言的更多信息请访问:https://golang.org/。 2.Git 源码管理,相信大家都比较熟悉了。 3.Tcpdump *nix下的抓包分析,可选,用于调试。 4.Mac本一台 Windows太伤,不建议。 5.IDE 推荐idea,其它只要你顺手都行。 这个教程给大家介绍的是编写一个SMTP协议的扩展,SMTP就是我们发邮件使用的协议,加密的比较麻烦,为了方便,本教程使用不加密的名文传输的SMTP协议,默认对应端口是25。 A.源码签出 登陆Github打开https://github.com/elastic/beats  fork后得到你自己的仓库,比如我的:https://github.com/medcl/packetbeat 
#创建相应目录
mkdir -p $GOPATH/src/github.com/elastic/ 
cd $GOPATH/src/github.com/elastic

#签出源码
git clone https://github.com/elastic/beats.git
cd beats

#修改官方仓库为upstream源,设置自己的仓库为origin源
git remote rename origin upstream
git remote add origin git@github.com:medcl/packetbeat.git

#获取上游最新的代码,如果是刚fork的话可不用管
git pull upstream master

#签出一个名为smtpbeat的分支,用于开发这个功能
git checkout -b smtpbeat

#切换到packetbeat模块
cd packetbeat

#获取依赖信息
(mkdir -p $GOPATH/src/golang.org/x/&&cd $GOPATH/src/golang.org/x &&git clone https://github.com/golang/tools.git )
go get github.com/tools/godep

#编译
make
编译出来的文件:packetbeat就在根目录 现在我们测试一下 修改etc/packetbeat.yml,在output下面的elasticsearch下面添加enabled: true,默认是不启用的,另外如果你的Elasticsearch安装了Shield,比如我的Elasticsearch的用户名和密码都是tribe_user,哦,忘了说了,我们的Elasticsearch跑在本机。 packetbeat.yml的详细配置可参见:https://www.elastic.co/guide/e ... .html 
output:
  elasticsearch:
    enabled: true
    hosts: ["localhost:9200"]
    username: "tribe_user"
    password: "tribe_user"
现在可以运行命令启动packetbeat了,默认会监听所有内置的协议,如HTTP、DNS等。
./packetbeat -e -c etc/packetbeat.yml  -d "publish"
介绍一下常用的参数: -N dry run模式,不实际output存储日志 -e 控制台输出调试日志 -d 仅显示对应logger的日志 好的,我们打开几个网页,控制台会有相应的输出,如下:
2015/12/29 14:24:39.965037 preprocess.go:37: DBG  Start Preprocessing
2015/12/29 14:24:39.965366 publish.go:98: DBG  Publish: {
  "@timestamp": "2015-12-29T14:24:39.709Z",
  "beat": {
    "hostname": "medcls-MacBook.local",
    "name": "medcls-MacBook.local"
  },
  "bytes_in": 31,
  "bytes_out": 115,
  "client_ip": "192.168.3.10",
  "client_port": 53669,
  "client_proc": "",
  "client_server": "",
  "count": 1,
  "direction": "out",
  "dns": {
    "additionals_count": 0,
    "answers": [
      {
        "class": "IN",
        "data": "www.a.shifen.com",
        "name": "sp2.baidu.com",
        "ttl": 333,
        "type": "CNAME"
      }
    ],
    "answers_count": 1,
    "authorities": [
      {
        "class": "IN",
        "data": "ns1.a.shifen.com",
        "expire": 86400,
        "minimum": 3600,
        "name": "a.shifen.com",
        "refresh": 5,
        "retry": 5,
        "rname": "baidu_dns_master.baidu.com",
        "serial": 1512240003,
        "ttl": 12,
        "type": "SOA"
      }
    ],
    "authorities_count": 1,
    "flags": {
      "authoritative": false,
      "recursion_allowed": true,
      "recursion_desired": true,
      "truncated_response": false
    },
    "id": 7435,
    "op_code": "QUERY",
    "question": {
      "class": "IN",
      "name": "sp2.baidu.com",
      "type": "AAAA"
    },
    "response_code": "NOERROR"
  },
  "ip": "192.168.3.1",
  "method": "QUERY",
  "port": 53,
  "proc": "",
  "query": "class IN, type AAAA, sp2.baidu.com",
  "resource": "sp2.baidu.com",
  "responsetime": 18,
  "server": "",
  "status": "OK",
  "transport": "udp",
  "type": "dns"
}
2015/12/29 14:24:39.965774 preprocess.go:94: DBG  Forward preprocessed events
2015/12/29 14:24:39.965796 async.go:42: DBG  async forward to outputers (1)
2015/12/29 14:24:40.099973 output.go:103: DBG  output worker: publish 2 events
然后Elasticsearch应该就会有数据进去了,我们看看:
curl http://localhost:9200/_cat/indices\?pretty\=true -u tribe_user:tribe_user
yellow open packetbeat-2015.12.29  5 1   135  0 561.2kb 561.2kb
至此,packetbeat源码的build成功,我们整个开发流程已经跑通了,下一节正式开始介绍SMTP协议的扩展。

Day15:Beats是什么东西?

Adventmedcl 发表了文章 • 5 个评论 • 11395 次浏览 • 2015-12-17 22:34 • 来自相关话题

Advent接力传到我这里了,今天我给大家介绍一下Beats,刚好前几天也有好多人问我它是干嘛的,之前的上海我有分享过Beats的内容,PPT在这里: https://pan.baidu.com/s/1eS157 ... -6-18  事实上Beats是一系列产品的统称,属于ElasticStack里面收集数据的这一层:Data Shipper Layer,包括以下若干Beats:
  1. PacketBeat,用来嗅探和分析网络流量,如HTTP、MySQL、Redis等
  2. TopBeat,用来收集系统的监控信息,功能如其名,类似*nix下的top命令,只不过所有的信息都会发送给后端的集中存储:Elasticsearch,这样你就可以很方便的监控所有的服务器的运行情况了
  3. FileBeat,用来收集数据源是文件的数据,比如常见的系统日志、应用日志、网站日志等等,FIleBeat思路来自Logstash-forwarder,Beats团队加入之后重构改写而成,解决的就是Logstash作为Agent采集时占用太多被收集系统资源的问题,Beats家族都是Golang编写,效率高,占用内存和CPU比较少,非常适合作为agent跑着服务器上
  4. 。。。
所以Beats其实是一套框架,另外的一个子项目Libbeat,就是所有beats都共用的模块,封装了所有的公共的组件,如配置管理、公共基础类、协议的解析处理、与Elasticsearch的操作等等,你可以很方便基于它实现你自己的beats,这也是Beats的目标,希望将来会出现更多的Beats,做各种各样的事情。   另外PacketBeat比较特殊,它又是网络协议抓包和处理的一个框架,目前支持了常见的一些协议,要扩展未知的协议其实非常简单,PacketBeat作为一个框架,数据抓包和后续的存储已经帮你处理好了,你只需要实现你的协议的解码操作就行了,当然这块也是最难和最业务相关的。   关于PacketBeat我回头再单独写一篇文章来介绍怎样编写一个PacketBeat的协议扩展吧,PacketBeat扩展的其它协议最终还是需要和PacketBeat集成在一起,也就是最终你的代码是要和PacketBeat的代码在一个工程里面的,而其它的Beats使用Libbeat完全是单独的Beat,如Filebeat和TopBeat,完全是独立打包和独立运行,这个也是两大Beats的主要区别。   随便提一下,现在所有的这些Beats已经合并到一个项目里面来方便管理了,golang,you know:https://github.com/elastic/beats   现在社区已经提交了的Beats: https://www.elastic.co/guide/e ... .html   明后天在Beijing的ArchSummit2015,我将在Elastic展台,欢迎过来骚扰,领取Elastic的各种贴纸,还有限量的印有Elastic的T恤,数量有限哦   今天的Advent就这些吧。 Advent接力活动,规则:http://elasticsearch.cn/article/20