是时候用 ES 拯救发际线啦

为什么用java重写logstash

Logstash | 作者 whiletrue | 发布于2017年03月01日 | | 阅读数:14638


写之前这里先打个广告,java版本的logstash已经开源。git地址:https://github.com/dtstack,下面进入正题。


jlogstash性能:

当时袋鼠云的云日志系统的日志接收端是ruby版本的logstash,存储使用的是elasticsearch,前端的展示没有使用原生的kibana,而是自己写了一套前端。

本人是负责日志接收端的logstash开发人员,主要负责基于ruby版本的logstash编写一些公司业务需要的插件。

当时为了提升性能做了各种优化,比如用java重写了一些模块,再用ruby调用这些模块,比如ip的解析模块,但是最终优化的结果只是单机4core、4g的虚拟机每小时最多能处理800万的数据而已(我们的场景跟大部分人一样都是订阅kafka的消息,再经过一些filter(瓶颈主要是这里比较耗cpu)写入elasticsearch)。

因为logstash的核心代码是用ruby语言开发,虽然运行在jruby上,但是由于中间涉及到数据结构的转化,性能跟原生的class运行在jvm上肯定是有所差距的。

所以当时抱着尽可能最大程度上提升性能,更好地满足用户需求的目的,用java重写了logstash,并把需要用到的插件也进行了重写。在同样的4core、4g虚拟机环境下,每小时能处理4000万数据,性能有了近5倍的提升。


下面是java logstash 和 ruby logstash(2.3.2版本)按照logstash官方测试方案做的性能对比:
 

performance.png



git 地址(https://github.com/DTStack/jlo ... sting) 



以上三种场景的处理效率

java版本logstash性能分别是ruby版本logstash的2.99倍、4.15倍、3.49倍。


jlogstash尽可能保证数据不丢失:

ruby 版本的logstash,对保证数据防丢失这块没做太多的设计。


举个列子:数据从kafka消费再output到elasticsearch,一旦elasticsearch集群不可用,ruby logstash会自动重试几次,如果还不成功就会放弃继续消费kafka里的数据,而且重试的动作也是elasticsearch插件自身来完成的,logstash本身并没有对数据防丢失做设计。


而java 版本logstash 的BaseOutput 这个抽象类里面有个failedMsgQueue队列,每个output实例维护一个,output 插件需要自身判断哪些数据失败了,再调用addFailedMsg方法把失败的数据写入到failedMsgQueue队列里。java logstash一旦发现failedMsgQueue里面有数据就会调用sendFailedMsg这个方法来消费这里的数据,直到数据消费完成才会去消费input里的数据。这个逻辑是可以通过consistency这个属性来控制的。该属性默认是关闭的。

还有一点是input和output插件都提供了release方法,这个主要是为了jvm退出时要执行一些动作而设计的。

因为大部分的input和output插件在获取和发送数据时都会先放在一个集合里面,再去慢慢消耗集合里面的数据。这样jvm退出时,插件就可以各自实现自己的逻辑,从而保证jvm退出前,集合里面的数据彻底消耗完。当然如果你强制杀死该进程(kill -9)那就没法保证了。

现在我们的elasticsearch插件已经实现了数据防丢失逻辑,并且已经在我们的生产环境稳定的跑了很长时间了。


jlogstash可以是分布式应用,而不只是单机应用:

我们这边开发了kafkadistributed插件,通过zookeeper把jlogstash变成分布式应用,因为从客户端采集上来的数据分发到kafka集群中数据是无序的,比如要分析jvm1.8 gc日志,cms的gc日志分成5个步骤,这5个步骤是一个整体,中间有可能夹杂着yonggc的日志,这样数据采集到kafka的时候就会乱序,后端又有多台jlogstash去订阅kafka,这样多台单机版本的jlogstash是没法保证一个完整的cms日志进入到同一个jvm上进行统一分析,所以我们通过zookeeper把jlogstash变成分布式应用,会把同一个文件的日志分发到同一个jlogstash上,这样就能保证cms数据的完整性。

最后希望jlogstash能为一些开发者解决一些问题,也希望有更多的人参与到jlogstash的插件开发里来。

注释:有人问jlogstash跟hangout有什么区别,这里就不做说明了,有兴趣的同学可以看看这两个的源码就知道区别了。






 

[尊重社区原创,转载请保留或注明出处]
本文地址:http://elasticsearch.cn/article/137


8 个评论

jlogstash 前期的有部分代码是引用了hangout项目里的代码,里面的有些代码确实引用了也没有加上原作者,这个是本人失误后面会加上,但是看过hangout原代 码的人知道,hangout所适用的范围只是从kafka到elasticsearch这么一种场景,也没法扩展,因为它工作的线程都是取决于kafka的comsumer数量,filter和 output都是在一个线程里面的。而jlogstash的线程模型是input,filter,output都是独立的,这跟ruby版本的logstash是一致的。在github上也加了这个注释。
厉害了,膜拜
hangout早期版本也是模仿logstash为input, filter,output设置独立的线程,连接成一个pipeline。 但后来发现这种架构方式会导致数据在管道中多次拷贝,性能损耗不小。考虑到海量数据的传输处理过程中,kafka基本上是标准配置,hangout作者(childe)才决定改成现在这种架构,舍弃普适性,降低cpu消耗。

我们线上每天要处理700亿/30TB size的日志,通过mesos/marathon运行和管理一批hangout实例,目前看来吞吐量和稳定性都还是不错的。

当然你写的这个版本适用性更广,对比ruby logstash性能提升也很大,值得赞一下。
性能和适用性是要有个平衡,最近我们这边也在尝试用disruptor 来代替现有的队列模型看会不会性能更进一步提升
不说什么是开源精神吧,只说文中讲的设计考虑,在我看来,hangout的改法恰恰是优点,而你们却抄回了缺点。
事实上你们继续跟踪elastic的logstash发展就知道,logstash从2.3版本开始(也就是从这个版本开始彻底放弃了MRI兼容性改走Java化路线了),也改成了filter和output共用一个pipeline worker线程。
类似的还有rsyslog的main queue和action queue设计。
可以说,这才是高性能的恰当做法。
jlogstash一开始的版本是input独立线程,filter和output在同一个线程,这样可以少经过一个queue,性能会好些,但是在一些性能瓶颈出现在filter上时必须要提高filter并发数来提高性能,output的并发也要相应提高,这样有时候对后端的存储集群反而造成压力。后来把input,filter,output都设置成独立的线程,并且每个filter和output线程都会各自维护一个queue,这样主要是防止在高并发下锁竞争加剧导致性能下降;我们也做过测试比filter和output在同一个线程的性能会差些,但是差距比较小。当然每个人设计都有自己的初衷,我们想要的是性能和适用性的一个平衡。
大神,能不能写篇文章说下jlogstash的设计思路,看源码的能力还不够
从我个人角度,你这个库和flumeng差不多啊。。。。。 就是可能mutative哪里差一些。但是扩展flumeng也可以啊。

要回复文章请先登录注册