ET001 不可不掌握的 Logstash 使用技巧

Logstash 是 Elastic Stack 中功能最强大的 ETL 工具,相较于 beats 家族,虽然它略显臃肿,但是强在功能丰富、处理能力强大。大家在使用的过程中肯定也体验过其启动时的慢吞吞,那么有什么办法可以减少等待 Logstash 的启动时间,提高编写其处理配置文件的效率呢?本文给大家推荐一个小技巧,帮助大家解决如下两个问题,让大家更好地与这个笨重的大家伙相处。

  1. 减少 Logstash 重启的次数,也就节省宝贵的时间
  2. 方便快捷地向 Logstash 输入需要处理的内容

1. 打开 reload 配置开关

Logstash 启动的时候可以加上 -r 的参数来做到配置文件热加载,效果是:

  • 当你修改了配置文件后,无需重启 Logstash 即可让新配置文件生效。

它的含义如下:

当你写好配置文件,比如 test.conf ,启动命令如下:

bin/logstash -f test.conf -r

启动完毕,修改 test.conf 的内容并保存后,过 1 秒钟,你会发现 Logstash 端有类似如下日志输出(注意红色框标记的部分),此时说明 reload 的成功。

如果你修改的配置文件有错误,会看到报错的日志,你可以根据错误提示修改。

至此,第一个问题解决!

2. 使用 HTTP INPUT

编写配置文件的另一个痛点是需要针对不同格式的输入内容进行详细的测试,以防解析报错的情况出现。此时大家常用标准输入来解决这个问题(stdin input),但是标准输入对于文字编辑支持不太友好,而且配置文件热更新的功能也不支持标准输入。

在这里向大家推荐使用 http input 插件,配置如下:

input{
    http{
        port => 7474
        codec => "json"
    }
}

然后大家再用自己喜欢的 http 请求工具,比如 POSTMan、Insomnia 等向 http://loclahost:7474发送待测试内容即可,如下是 Insomnia 的截图。

至此,第二个问题也解决了。

3. 总结

相信看到这里,大家一定是跃跃欲试了,赶紧打开电脑,找到 Logstash,然后编辑 test.conf,输入如下内容:

input{
    http{
        port => 7474
        codec => "json"
    }
}

filter{

}

output{
        stdout{
        codec => rubydebug{
            metadata => true
        }
    }
}

然后执行启动命令:

bin/logstash -f test.conf -r

打开 Insomnia ,输入要测试的内容,点击发送,开始舒爽流畅的配置文件编写之旅吧!

继续阅读 »

Logstash 是 Elastic Stack 中功能最强大的 ETL 工具,相较于 beats 家族,虽然它略显臃肿,但是强在功能丰富、处理能力强大。大家在使用的过程中肯定也体验过其启动时的慢吞吞,那么有什么办法可以减少等待 Logstash 的启动时间,提高编写其处理配置文件的效率呢?本文给大家推荐一个小技巧,帮助大家解决如下两个问题,让大家更好地与这个笨重的大家伙相处。

  1. 减少 Logstash 重启的次数,也就节省宝贵的时间
  2. 方便快捷地向 Logstash 输入需要处理的内容

1. 打开 reload 配置开关

Logstash 启动的时候可以加上 -r 的参数来做到配置文件热加载,效果是:

  • 当你修改了配置文件后,无需重启 Logstash 即可让新配置文件生效。

它的含义如下:

当你写好配置文件,比如 test.conf ,启动命令如下:

bin/logstash -f test.conf -r

启动完毕,修改 test.conf 的内容并保存后,过 1 秒钟,你会发现 Logstash 端有类似如下日志输出(注意红色框标记的部分),此时说明 reload 的成功。

如果你修改的配置文件有错误,会看到报错的日志,你可以根据错误提示修改。

至此,第一个问题解决!

2. 使用 HTTP INPUT

编写配置文件的另一个痛点是需要针对不同格式的输入内容进行详细的测试,以防解析报错的情况出现。此时大家常用标准输入来解决这个问题(stdin input),但是标准输入对于文字编辑支持不太友好,而且配置文件热更新的功能也不支持标准输入。

在这里向大家推荐使用 http input 插件,配置如下:

input{
    http{
        port => 7474
        codec => "json"
    }
}

然后大家再用自己喜欢的 http 请求工具,比如 POSTMan、Insomnia 等向 http://loclahost:7474发送待测试内容即可,如下是 Insomnia 的截图。

至此,第二个问题也解决了。

3. 总结

相信看到这里,大家一定是跃跃欲试了,赶紧打开电脑,找到 Logstash,然后编辑 test.conf,输入如下内容:

input{
    http{
        port => 7474
        codec => "json"
    }
}

filter{

}

output{
        stdout{
        codec => rubydebug{
            metadata => true
        }
    }
}

然后执行启动命令:

bin/logstash -f test.conf -r

打开 Insomnia ,输入要测试的内容,点击发送,开始舒爽流畅的配置文件编写之旅吧!

收起阅读 »

logstash5.X 时差8小时问题

在filter中处理
 ruby {   
   code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"   
 }  
 ruby {  
   code => "event.set('@timestamp',event.get('timestamp'))"  
 }  
 mutate {  
   remove_field => ["timestamp"]  
 } 
继续阅读 »
在filter中处理
 ruby {   
   code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"   
 }  
 ruby {  
   code => "event.set('@timestamp',event.get('timestamp'))"  
 }  
 mutate {  
   remove_field => ["timestamp"]  
 }  收起阅读 »

logstash-filter-elasticsearch的简易安装

不同版本的logstash集成的插件不一样,在5.6版本就未集成logstash-filter-elasticsearch插件,所以需要自己安装。

官方提供的方法因为需要联网,并且需要调整插件管理源,比较麻烦,针对logstash-filter-elasticsearch插件,使用下面这种方式安装。

logstash-filter-elasticsearch插件安装

1、在git上下载logstash-filter-elasticsearch压缩包,logstash-filter-elasticsearch.zip,

2、在logstash的目录下新建plugins目录,解压logstash-filter-elasticsearch.zip到此目录下。

3、在logstash目录下的Gemfile中添加一行:
gem "logstash-filter-elasticsearch", :path => "./plugins/logstash-filter-elasticsearch"

4、重启logstash即可。
 
此方法适用logstash-filter-elasticsearch,但不适用全部logstash插件。
继续阅读 »
不同版本的logstash集成的插件不一样,在5.6版本就未集成logstash-filter-elasticsearch插件,所以需要自己安装。

官方提供的方法因为需要联网,并且需要调整插件管理源,比较麻烦,针对logstash-filter-elasticsearch插件,使用下面这种方式安装。

logstash-filter-elasticsearch插件安装

1、在git上下载logstash-filter-elasticsearch压缩包,logstash-filter-elasticsearch.zip,

2、在logstash的目录下新建plugins目录,解压logstash-filter-elasticsearch.zip到此目录下。

3、在logstash目录下的Gemfile中添加一行:
gem "logstash-filter-elasticsearch", :path => "./plugins/logstash-filter-elasticsearch"

4、重启logstash即可。
 
此方法适用logstash-filter-elasticsearch,但不适用全部logstash插件。 收起阅读 »

Grok Debugger

官网的在线调试地址:http://grokdebug.herokuapp.com/
 
Grok Debugger中文站:http://grok.qiexun.net/
 
自己本地搭建:http://blog.51cto.com/fengwan/1758845
继续阅读 »
官网的在线调试地址:http://grokdebug.herokuapp.com/
 
Grok Debugger中文站:http://grok.qiexun.net/
 
自己本地搭建:http://blog.51cto.com/fengwan/1758845 收起阅读 »

logstash输出到文件

使用filebeat采集数据,使用document_type 区分不同的类型的日志 logstash 输入日志到文件,这样方面查看,也方便将怎么相同的服务运行在不同的服务器里面日志汇总

logstash 配置如下

input{
  beats{
    port => 5044
    codec => "json"
  }
}

output{
   if [type]  == "123_server" {
    file {
      path => "/home/logs/123-server.log"
      codec => plain{ charset => "GBK" }
      gzip => true
        }
  }
}

其实需要2个条件,输入的日志尽量保持和原来的日志一样 我这个代码输出全部是乱码,无论怎么修改编码格式 求大神指点下

继续阅读 »

使用filebeat采集数据,使用document_type 区分不同的类型的日志 logstash 输入日志到文件,这样方面查看,也方便将怎么相同的服务运行在不同的服务器里面日志汇总

logstash 配置如下

input{
  beats{
    port => 5044
    codec => "json"
  }
}

output{
   if [type]  == "123_server" {
    file {
      path => "/home/logs/123-server.log"
      codec => plain{ charset => "GBK" }
      gzip => true
        }
  }
}

其实需要2个条件,输入的日志尽量保持和原来的日志一样 我这个代码输出全部是乱码,无论怎么修改编码格式 求大神指点下

收起阅读 »

我想只记录这一行,如果写logstash规则


2018-03-14 22:23:56,833 ERROR [FrontShopController.java:45] : ==dianchou.app.boss.pageController.FrontShopControl
lerjava.lang.NullPointerException
      at dianchou.app.boss.pageController.FrontShopController.projectDetail(FrontShopController.java:40)
      at sun.reflect.GeneratedMethodAccessor495.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:606)
      at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)

继续阅读 »

2018-03-14 22:23:56,833 ERROR [FrontShopController.java:45] : ==dianchou.app.boss.pageController.FrontShopControl
lerjava.lang.NullPointerException
      at dianchou.app.boss.pageController.FrontShopController.projectDetail(FrontShopController.java:40)
      at sun.reflect.GeneratedMethodAccessor495.invoke(Unknown Source)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:606)
      at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)

收起阅读 »

logstash date不支持BJT时区


人生日历截图20171218102703.png

收集网络设备的日志,但是格式不统一,很苦恼,而且仅仅时间戳格式就有快10种不同格式,测试发现date不支持BJT时区

人生日历截图20171218102703.png

收集网络设备的日志,但是格式不统一,很苦恼,而且仅仅时间戳格式就有快10种不同格式,测试发现date不支持BJT时区

中文值的字段是string的类型吗?我有一个字段类型一直显示unknown

我新增加了一个字段,对应的值是中文的

QQ图片20170928162123.png

 如上图 我的这个字段类型一直是unknown,下图是配置方式:

QQ图片20170928162329.png

 es 是5.4的
logstash 5.4
kibana 5.4
 
 
 
继续阅读 »
我新增加了一个字段,对应的值是中文的

QQ图片20170928162123.png

 如上图 我的这个字段类型一直是unknown,下图是配置方式:

QQ图片20170928162329.png

 es 是5.4的
logstash 5.4
kibana 5.4
 
 
  收起阅读 »

請問該 日誌 如何寫 Grok 匹配。

日誌格式如下
 
<30>Sep 11 11:57:24 dnsmasq-dhcp[15643]: DHCPACK(eth1) 192.168.2.22 1c:77:f6:64:99:d3 android-c5a782dc5af0b478
 
Grok
%{SYSLOG5424PRI:ID}%{CISCOTIMESTAMP:Date} %{URIHOST:Method}%{NAGIOSTIME:EventID}: %{CISCO_REASON:Content} %{IP:IP} %{MAC:MAC} %{HOSTNAME:DevName}
黃色部份可以解析出 字段 但是 後面   %{IP:IP} %{MAC:MAC} %{HOSTNAME:DevName} 解析不出來。
 
Error.gif

 
不知道如何解析 (eth1)  該字段...
 
 
继续阅读 »
日誌格式如下
 
<30>Sep 11 11:57:24 dnsmasq-dhcp[15643]: DHCPACK(eth1) 192.168.2.22 1c:77:f6:64:99:d3 android-c5a782dc5af0b478
 
Grok
%{SYSLOG5424PRI:ID}%{CISCOTIMESTAMP:Date} %{URIHOST:Method}%{NAGIOSTIME:EventID}: %{CISCO_REASON:Content} %{IP:IP} %{MAC:MAC} %{HOSTNAME:DevName}
黃色部份可以解析出 字段 但是 後面   %{IP:IP} %{MAC:MAC} %{HOSTNAME:DevName} 解析不出來。
 
Error.gif

 
不知道如何解析 (eth1)  該字段...
 
  收起阅读 »

用logstash导入ES且自定义mapping时踩的坑

问题发生背景:

1.本来我是使用logstash的默认配置向ES导入日志的。然后很嗨皮,发现一切OK,后来我开始对日志进行聚合统计,发现terms聚合时的key很奇怪,后来查询这奇怪的key,发现这些关键字都是源字符串的一段,而且全部复现场景都是出现"xxxx-xxxxxx"时就会截断,感觉像是分词器搞的鬼。所以想自己定制mapping。下面是原来的logstash配置
output{
elasticsearch{
action => "index"
hosts => ["xxxxxx:9200"]
index => "xxxxx"
document_type => "haha"
}
}



说干就干:

开始四处查阅文档,发现可以定制mapping,很开心。
output{
elasticsearch{
action => "index"
hosts => ["xxx"]
index => "logstashlog"
template => "xx/http-logstash.json"
template_name => "http-log-logstash"
template_overwrite => true
}
stdout{
codec => rubydebug
}

}没有什么一帆风顺:
问题1:
但是我发现我已经上传了自定义的template,但是就是不能生效。
这时知道了,这个要设置order才能覆盖,默认的order是0,必须更大才行,参考
http://elasticsearch.cn/article/21
问题2:
我看到自己上传的template的order已经是1了,怎么还是不生效呢?
原来自己的索引名称不匹配自己的template的名称,所以不能使用,就又用了默认的template。
改成下面后OK,终于生效了。(注意index名称变化)output{
elasticsearch{
action => "index"
hosts => ["xxx"]
index => "http-log-logstash"
document_type => "haha"
template => "xxx/http-logstash.json"
template_name => "http-log-logstash"
template_overwrite => true
}
stdout{
codec => rubydebug
}

}问题3:
发现导入失败,原来自己的时间字符串不能用默认的date的format匹配,
如2017-04-11 00:07:25   不能用 { "type" : "date"} 的默认format匹配,
改成:"format": "yyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}, 
这样就能解析了。
一切OK,谢谢社区,谢谢Google(你是我见过的除了书籍和老师之后最提升生产力的工具)

附上我的模板
{ 
"template" : "qmpsearchlog",
"order":1,
"settings" : { "index.refresh_interval" : "60s" },
"mappings" : {
"_default_" : {
"_all" : { "enabled" : false },
"dynamic_templates" : [{
"message_field" : {
"match" : "message",
"match_mapping_type" : "string",
"mapping" : { "type" : "string", "index" : "not_analyzed" }
}
}, {
"string_fields" : {
"match" : "*",
"match_mapping_type" : "string",
"mapping" : { "type" : "string", "index" : "not_analyzed" }
}
}],
"properties" : {
"@timestamp" : { "type" : "date"},
"@version" : { "type" : "integer", "index" : "not_analyzed" },
"path" : { "type" : "string", "index" : "not_analyzed" },
"host" : { "type" : "string", "index" : "not_analyzed" },
"record_time":{"type":"date","format": "yyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},
"method":{"type":"string","index" : "not_analyzed"},
"unionid":{"type":"string","index" : "not_analyzed"},
"user_name":{"type":"string","index" : "not_analyzed"},
"query":{"type":"string","index" : "not_analyzed"},
"ip":{ "type" : "ip"},
"webbrower":{"type":"string","index" : "not_analyzed"},
"os":{"type":"string","index" : "not_analyzed"},
"device":{"type":"string","index" : "not_analyzed"},
"ptype":{"type":"string","index" : "not_analyzed"},
"serarch_time":{"type":"date","format": "yyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},
"have_ok":{"type":"string","index" : "not_analyzed"},
"legal":{"type":"string","index" : "not_analyzed"}
}
}
}
}



 
继续阅读 »
问题发生背景:

1.本来我是使用logstash的默认配置向ES导入日志的。然后很嗨皮,发现一切OK,后来我开始对日志进行聚合统计,发现terms聚合时的key很奇怪,后来查询这奇怪的key,发现这些关键字都是源字符串的一段,而且全部复现场景都是出现"xxxx-xxxxxx"时就会截断,感觉像是分词器搞的鬼。所以想自己定制mapping。下面是原来的logstash配置
output{
elasticsearch{
action => "index"
hosts => ["xxxxxx:9200"]
index => "xxxxx"
document_type => "haha"
}
}



说干就干:

开始四处查阅文档,发现可以定制mapping,很开心。
output{
elasticsearch{
action => "index"
hosts => ["xxx"]
index => "logstashlog"
template => "xx/http-logstash.json"
template_name => "http-log-logstash"
template_overwrite => true
}
stdout{
codec => rubydebug
}

}没有什么一帆风顺:
问题1:
但是我发现我已经上传了自定义的template,但是就是不能生效。
这时知道了,这个要设置order才能覆盖,默认的order是0,必须更大才行,参考
http://elasticsearch.cn/article/21
问题2:
我看到自己上传的template的order已经是1了,怎么还是不生效呢?
原来自己的索引名称不匹配自己的template的名称,所以不能使用,就又用了默认的template。
改成下面后OK,终于生效了。(注意index名称变化)output{
elasticsearch{
action => "index"
hosts => ["xxx"]
index => "http-log-logstash"
document_type => "haha"
template => "xxx/http-logstash.json"
template_name => "http-log-logstash"
template_overwrite => true
}
stdout{
codec => rubydebug
}

}问题3:
发现导入失败,原来自己的时间字符串不能用默认的date的format匹配,
如2017-04-11 00:07:25   不能用 { "type" : "date"} 的默认format匹配,
改成:"format": "yyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}, 
这样就能解析了。
一切OK,谢谢社区,谢谢Google(你是我见过的除了书籍和老师之后最提升生产力的工具)

附上我的模板
{ 
"template" : "qmpsearchlog",
"order":1,
"settings" : { "index.refresh_interval" : "60s" },
"mappings" : {
"_default_" : {
"_all" : { "enabled" : false },
"dynamic_templates" : [{
"message_field" : {
"match" : "message",
"match_mapping_type" : "string",
"mapping" : { "type" : "string", "index" : "not_analyzed" }
}
}, {
"string_fields" : {
"match" : "*",
"match_mapping_type" : "string",
"mapping" : { "type" : "string", "index" : "not_analyzed" }
}
}],
"properties" : {
"@timestamp" : { "type" : "date"},
"@version" : { "type" : "integer", "index" : "not_analyzed" },
"path" : { "type" : "string", "index" : "not_analyzed" },
"host" : { "type" : "string", "index" : "not_analyzed" },
"record_time":{"type":"date","format": "yyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},
"method":{"type":"string","index" : "not_analyzed"},
"unionid":{"type":"string","index" : "not_analyzed"},
"user_name":{"type":"string","index" : "not_analyzed"},
"query":{"type":"string","index" : "not_analyzed"},
"ip":{ "type" : "ip"},
"webbrower":{"type":"string","index" : "not_analyzed"},
"os":{"type":"string","index" : "not_analyzed"},
"device":{"type":"string","index" : "not_analyzed"},
"ptype":{"type":"string","index" : "not_analyzed"},
"serarch_time":{"type":"date","format": "yyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"},
"have_ok":{"type":"string","index" : "not_analyzed"},
"legal":{"type":"string","index" : "not_analyzed"}
}
}
}
}



  收起阅读 »

logstash多时间(date)字段文档导入

我们都知道如果文档中有表示时间的字段时可以用如下方法将字段转化为date(timestamp)格式导入
date {
match => [ "submission_time", "yyyyMMdd" ]
}
但是如果一条记录中有多个字段需要使用date类型呢?有人遇到了 同样的问题How to parse multiple date fields?其实多次使用date{}语法就行了,例如:
date {
match => [ "submission_time", "UNIX_MS" ]
target => "@timestamp"
}
date {
match => [ "submission_time", "UNIX_MS" ]
target => "submission_time"
}
date {
match => [ "start_time", "UNIX_MS" ]
target => "start_time"
}
date {
match => [ "end_time", "UNIX_MS" ]
target => "end_time"
}
查看官方文档后, 发现一直使用的date是省去了参数target的,而target默认值为@timestamp。
继续阅读 »
我们都知道如果文档中有表示时间的字段时可以用如下方法将字段转化为date(timestamp)格式导入
date {
match => [ "submission_time", "yyyyMMdd" ]
}
但是如果一条记录中有多个字段需要使用date类型呢?有人遇到了 同样的问题How to parse multiple date fields?其实多次使用date{}语法就行了,例如:
date {
match => [ "submission_time", "UNIX_MS" ]
target => "@timestamp"
}
date {
match => [ "submission_time", "UNIX_MS" ]
target => "submission_time"
}
date {
match => [ "start_time", "UNIX_MS" ]
target => "start_time"
}
date {
match => [ "end_time", "UNIX_MS" ]
target => "end_time"
}
查看官方文档后, 发现一直使用的date是省去了参数target的,而target默认值为@timestamp。 收起阅读 »

为什么用java重写logstash


写之前这里先打个广告,java版本的logstash已经开源。git地址:https://github.com/dtstack,下面进入正题。


jlogstash性能:

当时袋鼠云的云日志系统的日志接收端是ruby版本的logstash,存储使用的是elasticsearch,前端的展示没有使用原生的kibana,而是自己写了一套前端。

本人是负责日志接收端的logstash开发人员,主要负责基于ruby版本的logstash编写一些公司业务需要的插件。

当时为了提升性能做了各种优化,比如用java重写了一些模块,再用ruby调用这些模块,比如ip的解析模块,但是最终优化的结果只是单机4core、4g的虚拟机每小时最多能处理800万的数据而已(我们的场景跟大部分人一样都是订阅kafka的消息,再经过一些filter(瓶颈主要是这里比较耗cpu)写入elasticsearch)。

因为logstash的核心代码是用ruby语言开发,虽然运行在jruby上,但是由于中间涉及到数据结构的转化,性能跟原生的class运行在jvm上肯定是有所差距的。

所以当时抱着尽可能最大程度上提升性能,更好地满足用户需求的目的,用java重写了logstash,并把需要用到的插件也进行了重写。在同样的4core、4g虚拟机环境下,每小时能处理4000万数据,性能有了近5倍的提升。


下面是java logstash 和 ruby logstash(2.3.2版本)按照logstash官方测试方案做的性能对比:
 

performance.png



git 地址(https://github.com/DTStack/jlo ... sting) 



以上三种场景的处理效率

java版本logstash性能分别是ruby版本logstash的2.99倍、4.15倍、3.49倍。


jlogstash尽可能保证数据不丢失:

ruby 版本的logstash,对保证数据防丢失这块没做太多的设计。


举个列子:数据从kafka消费再output到elasticsearch,一旦elasticsearch集群不可用,ruby logstash会自动重试几次,如果还不成功就会放弃继续消费kafka里的数据,而且重试的动作也是elasticsearch插件自身来完成的,logstash本身并没有对数据防丢失做设计。


而java 版本logstash 的BaseOutput 这个抽象类里面有个failedMsgQueue队列,每个output实例维护一个,output 插件需要自身判断哪些数据失败了,再调用addFailedMsg方法把失败的数据写入到failedMsgQueue队列里。java logstash一旦发现failedMsgQueue里面有数据就会调用sendFailedMsg这个方法来消费这里的数据,直到数据消费完成才会去消费input里的数据。这个逻辑是可以通过consistency这个属性来控制的。该属性默认是关闭的。

还有一点是input和output插件都提供了release方法,这个主要是为了jvm退出时要执行一些动作而设计的。

因为大部分的input和output插件在获取和发送数据时都会先放在一个集合里面,再去慢慢消耗集合里面的数据。这样jvm退出时,插件就可以各自实现自己的逻辑,从而保证jvm退出前,集合里面的数据彻底消耗完。当然如果你强制杀死该进程(kill -9)那就没法保证了。

现在我们的elasticsearch插件已经实现了数据防丢失逻辑,并且已经在我们的生产环境稳定的跑了很长时间了。


jlogstash可以是分布式应用,而不只是单机应用:

我们这边开发了kafkadistributed插件,通过zookeeper把jlogstash变成分布式应用,因为从客户端采集上来的数据分发到kafka集群中数据是无序的,比如要分析jvm1.8 gc日志,cms的gc日志分成5个步骤,这5个步骤是一个整体,中间有可能夹杂着yonggc的日志,这样数据采集到kafka的时候就会乱序,后端又有多台jlogstash去订阅kafka,这样多台单机版本的jlogstash是没法保证一个完整的cms日志进入到同一个jvm上进行统一分析,所以我们通过zookeeper把jlogstash变成分布式应用,会把同一个文件的日志分发到同一个jlogstash上,这样就能保证cms数据的完整性。

最后希望jlogstash能为一些开发者解决一些问题,也希望有更多的人参与到jlogstash的插件开发里来。

注释:有人问jlogstash跟hangout有什么区别,这里就不做说明了,有兴趣的同学可以看看这两个的源码就知道区别了。






 
继续阅读 »

写之前这里先打个广告,java版本的logstash已经开源。git地址:https://github.com/dtstack,下面进入正题。


jlogstash性能:

当时袋鼠云的云日志系统的日志接收端是ruby版本的logstash,存储使用的是elasticsearch,前端的展示没有使用原生的kibana,而是自己写了一套前端。

本人是负责日志接收端的logstash开发人员,主要负责基于ruby版本的logstash编写一些公司业务需要的插件。

当时为了提升性能做了各种优化,比如用java重写了一些模块,再用ruby调用这些模块,比如ip的解析模块,但是最终优化的结果只是单机4core、4g的虚拟机每小时最多能处理800万的数据而已(我们的场景跟大部分人一样都是订阅kafka的消息,再经过一些filter(瓶颈主要是这里比较耗cpu)写入elasticsearch)。

因为logstash的核心代码是用ruby语言开发,虽然运行在jruby上,但是由于中间涉及到数据结构的转化,性能跟原生的class运行在jvm上肯定是有所差距的。

所以当时抱着尽可能最大程度上提升性能,更好地满足用户需求的目的,用java重写了logstash,并把需要用到的插件也进行了重写。在同样的4core、4g虚拟机环境下,每小时能处理4000万数据,性能有了近5倍的提升。


下面是java logstash 和 ruby logstash(2.3.2版本)按照logstash官方测试方案做的性能对比:
 

performance.png



git 地址(https://github.com/DTStack/jlo ... sting) 



以上三种场景的处理效率

java版本logstash性能分别是ruby版本logstash的2.99倍、4.15倍、3.49倍。


jlogstash尽可能保证数据不丢失:

ruby 版本的logstash,对保证数据防丢失这块没做太多的设计。


举个列子:数据从kafka消费再output到elasticsearch,一旦elasticsearch集群不可用,ruby logstash会自动重试几次,如果还不成功就会放弃继续消费kafka里的数据,而且重试的动作也是elasticsearch插件自身来完成的,logstash本身并没有对数据防丢失做设计。


而java 版本logstash 的BaseOutput 这个抽象类里面有个failedMsgQueue队列,每个output实例维护一个,output 插件需要自身判断哪些数据失败了,再调用addFailedMsg方法把失败的数据写入到failedMsgQueue队列里。java logstash一旦发现failedMsgQueue里面有数据就会调用sendFailedMsg这个方法来消费这里的数据,直到数据消费完成才会去消费input里的数据。这个逻辑是可以通过consistency这个属性来控制的。该属性默认是关闭的。

还有一点是input和output插件都提供了release方法,这个主要是为了jvm退出时要执行一些动作而设计的。

因为大部分的input和output插件在获取和发送数据时都会先放在一个集合里面,再去慢慢消耗集合里面的数据。这样jvm退出时,插件就可以各自实现自己的逻辑,从而保证jvm退出前,集合里面的数据彻底消耗完。当然如果你强制杀死该进程(kill -9)那就没法保证了。

现在我们的elasticsearch插件已经实现了数据防丢失逻辑,并且已经在我们的生产环境稳定的跑了很长时间了。


jlogstash可以是分布式应用,而不只是单机应用:

我们这边开发了kafkadistributed插件,通过zookeeper把jlogstash变成分布式应用,因为从客户端采集上来的数据分发到kafka集群中数据是无序的,比如要分析jvm1.8 gc日志,cms的gc日志分成5个步骤,这5个步骤是一个整体,中间有可能夹杂着yonggc的日志,这样数据采集到kafka的时候就会乱序,后端又有多台jlogstash去订阅kafka,这样多台单机版本的jlogstash是没法保证一个完整的cms日志进入到同一个jvm上进行统一分析,所以我们通过zookeeper把jlogstash变成分布式应用,会把同一个文件的日志分发到同一个jlogstash上,这样就能保证cms数据的完整性。

最后希望jlogstash能为一些开发者解决一些问题,也希望有更多的人参与到jlogstash的插件开发里来。

注释:有人问jlogstash跟hangout有什么区别,这里就不做说明了,有兴趣的同学可以看看这两个的源码就知道区别了。






  收起阅读 »