logstash

logstash

logstash 给elasticsearch 发送自定义日志数据,出现错误:Could not index event to Elasticsearch...有大神知道问题所在吗?

Logstashsailershen 回复了问题 • 4 人关注 • 4 个回复 • 1446 次浏览 • 1 天前 • 来自相关话题

ES集群同步数据以及使用

ElasticsearchGod_lockin 回复了问题 • 3 人关注 • 2 个回复 • 52 次浏览 • 2 天前 • 来自相关话题

logstash 接收 http 数据

Logstashrochy 回复了问题 • 2 人关注 • 3 个回复 • 104 次浏览 • 6 天前 • 来自相关话题

logstash从mysql同步数据到es

Logstashlaoyang360 回复了问题 • 2 人关注 • 1 个回复 • 121 次浏览 • 2018-10-13 10:50 • 来自相关话题

logstash 分割问题

Logstashchzhty001 回复了问题 • 2 人关注 • 2 个回复 • 74 次浏览 • 2018-10-12 16:56 • 来自相关话题

logstash 输出乱码

Logstashzqc0512 回复了问题 • 3 人关注 • 2 个回复 • 1551 次浏览 • 2018-09-29 08:45 • 来自相关话题

logstash截取字段前几个字符

Logstashrochy 回复了问题 • 2 人关注 • 1 个回复 • 78 次浏览 • 2018-09-27 15:23 • 来自相关话题

logstash日志重复,每次修改文件,都是从头读取一遍

Logstashlibinjale2008 回复了问题 • 3 人关注 • 3 个回复 • 472 次浏览 • 2018-09-26 16:53 • 来自相关话题

logstash-input-http

Logstashrochy 回复了问题 • 2 人关注 • 1 个回复 • 95 次浏览 • 2018-09-26 10:04 • 来自相关话题

请教下 filebeats-logstash-elasticsearch 实现日志收集并展示的方案

Elasticsearchlei13427312420 回复了问题 • 3 人关注 • 2 个回复 • 517 次浏览 • 2018-09-25 17:44 • 来自相关话题

logstash6.3配置文件的问题

Logstashrochy 回复了问题 • 3 人关注 • 2 个回复 • 354 次浏览 • 2018-09-25 13:06 • 来自相关话题

logstash到ES的数据更新问题

Logstashlaoyang360 回复了问题 • 3 人关注 • 2 个回复 • 212 次浏览 • 2018-09-24 19:47 • 来自相关话题

logstash mutate 插件 split 功能的使用

回复

Logstashhelloworld1128 发起了问题 • 1 人关注 • 0 个回复 • 111 次浏览 • 2018-09-21 21:36 • 来自相关话题

logstash大并发写入es报错

Logstashzqc0512 回复了问题 • 2 人关注 • 2 个回复 • 154 次浏览 • 2018-09-19 16:50 • 来自相关话题

logstash file插件有办法读取中文路劲的文件吗?

回复

Logstashchienx 发起了问题 • 1 人关注 • 0 个回复 • 81 次浏览 • 2018-09-19 10:30 • 来自相关话题

条新动态, 点击查看
kennywu76

kennywu76 回答了问题 • 2014-12-02 18:12 • 2 个回复 不感兴趣

logstash获取时间的问题

赞同来自:

产生这个错误的原因是系统的locale设置不是english,因此logstash用到的joda无法理解Dec这样的月份格式。

解决办法为在date filter里加一个locale=>"en"的设置:
filter {
d... 显示全部 »
产生这个错误的原因是系统的locale设置不是english,因此logstash用到的joda无法理解Dec这样的月份格式。

解决办法为在date filter里加一个locale=>"en"的设置:
filter {
date {
locale => "en"
..............
}
}
can

can 回答了问题 • 2014-12-23 19:42 • 3 个回复 不感兴趣

请教个关于logstash output 到es protocol的问题

赞同来自:

感谢@wood大神的指点,我这里总结下哈!
理论上来说,transport没有http的header带来的额外处理开销,所以速度应该要比http快。
node protocol和transport protocol的区别,主要就是node的话,logstash... 显示全部 »
感谢@wood大神的指点,我这里总结下哈!
理论上来说,transport没有http的header带来的额外处理开销,所以速度应该要比http快。
node protocol和transport protocol的区别,主要就是node的话,logstash会成为一个ES node加入集群,理论上性能比transport protocol更好,因为数据少一层转发。但是集群规模大了后,node protocol有一定副作用。 因为每个node上都有集群的状态信息,集群大了后,任何集群状态信息的改变都会在所有node同步,node数量越多,同步的代价越高
关于ES nodes过多可能导致的集群水平扩展问题可以参照这里:
http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/finite-scale.html
ES的transport protocol支持多个host,只是logstash的插件不支持,官网java client api有demo,依葫芦画瓢,改下logstash的ES插件就可以了:
http://www.elasticsearch.org/guide/en/elasticsearch/client/java-api/current/client.html
下面是http与transport比较的讨论帖,有兴趣的也可以看看,多了解了解:
http://elasticsearch-users.115913.n3.nabble.com/TransportClient-vs-HTTP-TransportClient-advantages-td4023014.html
差不多就这些了,希望对后面的同学有帮助!

ES集群同步数据以及使用

回复

ElasticsearchGod_lockin 回复了问题 • 3 人关注 • 2 个回复 • 52 次浏览 • 2 天前 • 来自相关话题

logstash 接收 http 数据

回复

Logstashrochy 回复了问题 • 2 人关注 • 3 个回复 • 104 次浏览 • 6 天前 • 来自相关话题

logstash从mysql同步数据到es

回复

Logstashlaoyang360 回复了问题 • 2 人关注 • 1 个回复 • 121 次浏览 • 2018-10-13 10:50 • 来自相关话题

logstash 分割问题

回复

Logstashchzhty001 回复了问题 • 2 人关注 • 2 个回复 • 74 次浏览 • 2018-10-12 16:56 • 来自相关话题

logstash 输出乱码

回复

Logstashzqc0512 回复了问题 • 3 人关注 • 2 个回复 • 1551 次浏览 • 2018-09-29 08:45 • 来自相关话题

logstash截取字段前几个字符

回复

Logstashrochy 回复了问题 • 2 人关注 • 1 个回复 • 78 次浏览 • 2018-09-27 15:23 • 来自相关话题

logstash日志重复,每次修改文件,都是从头读取一遍

回复

Logstashlibinjale2008 回复了问题 • 3 人关注 • 3 个回复 • 472 次浏览 • 2018-09-26 16:53 • 来自相关话题

logstash-input-http

回复

Logstashrochy 回复了问题 • 2 人关注 • 1 个回复 • 95 次浏览 • 2018-09-26 10:04 • 来自相关话题

请教下 filebeats-logstash-elasticsearch 实现日志收集并展示的方案

回复

Elasticsearchlei13427312420 回复了问题 • 3 人关注 • 2 个回复 • 517 次浏览 • 2018-09-25 17:44 • 来自相关话题

logstash6.3配置文件的问题

回复

Logstashrochy 回复了问题 • 3 人关注 • 2 个回复 • 354 次浏览 • 2018-09-25 13:06 • 来自相关话题

logstash到ES的数据更新问题

回复

Logstashlaoyang360 回复了问题 • 3 人关注 • 2 个回复 • 212 次浏览 • 2018-09-24 19:47 • 来自相关话题

logstash mutate 插件 split 功能的使用

回复

Logstashhelloworld1128 发起了问题 • 1 人关注 • 0 个回复 • 111 次浏览 • 2018-09-21 21:36 • 来自相关话题

logstash大并发写入es报错

回复

Logstashzqc0512 回复了问题 • 2 人关注 • 2 个回复 • 154 次浏览 • 2018-09-19 16:50 • 来自相关话题

logstash file插件有办法读取中文路劲的文件吗?

回复

Logstashchienx 发起了问题 • 1 人关注 • 0 个回复 • 81 次浏览 • 2018-09-19 10:30 • 来自相关话题

通过 metadata 使logstash配置更简洁

LogstashLeon J 发表了文章 • 0 个评论 • 141 次浏览 • 2018-09-04 13:17 • 来自相关话题

从Logstash 1.5开始,我们可以在logstash配置中使用metadata。metadata不会在output中被序列化输出,这样我们便可以在metadata中添加一些临时的中间数据,而不需要去删除它。

我们可以通过以下方式来访问metadata:

[@metadata][foo]

用例

假设我们有这样一条日志:

[2017-04-01 22:21:21] production.INFO: this is a test log message by leon

我们可以在filter中使用grok来做解析:

grok {
      match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{DATA:env}\.%{DATA:log_level}: %{DATA:content}" }
    }

解析的结果为

{
      "env" => "production",
      "timestamp" => "2017-04-01 22:21:21",
      "log_level" => "INFO",
      "content" => "{\"message\":\"[2017-04-01 22:21:21] production.INFO: this is a test log message by leon\"}"
}

假设我们希望

  1. 能把log_level为INFO的日志丢弃掉,但又不想让该字段出现在最终的输出中
  2. 输出的索引名中能体现出env,但也不想让该字段出现在输出结果里

对于1,一种方案是在输出之前通过mutate插件把不需要的字段删除掉,但是一旦这样的处理多了,会让配置文件变得“不干净”。

通过 metadata,我们可以轻松地处理这些问题:

grok {
    match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{DATA:[@metadata][env]}\.%{DATA:[@metadata][log_level]}: %{DATA:content}" }
}

if [@metadata][log_level] == "INFO"{
    drop{}    
}

output{
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "%{[@metadata][env]}-log-%{+YYYY.MM}"
        document_type => "_doc"
    }
}

除了简化我们的配置文件、减少冗余字段意外,同时也能提高logstash的处理速度。

Elasticsearch input插件

有些插件会用到metadata这个特性,比如elasticsearch input插件:

input {
  elasticsearch {
    host => "127.0.0.1"
    # 把 ES document metadata (_index, _type, _id) 包存到 @metadata 中
    docinfo_in_metadata => true
  }
}

filter{
    ......
}

output {
  elasticsearch {
    document_id => "%{[@metadata][_id]}"
    index => "transformed-%{[@metadata][_index]}"
    type => "%{[@metadata][_type]}"
  }
}

调试

一般来说metadata是不会出现在输出中的,除非使用 rubydebug codec 的方式输出:

output { 
  stdout { 
    codec  => rubydebug {
      metadata => true
    }
  }
}

日志经过处理后输出中会包含:

{
    ....,
    "@metadata" => {
        "env" => "production",
        "log_level" => "INFO"
    }
}

总结

由上可见,metadata提供了一种简单、方便的方式来保存中间数据。这样一方面减少了logstash配置文件的复杂性:避免调用remove_field,另一方面也减少了输出中的一些不必要的数据。通过这篇对metadata的介绍,希望能对大家有所帮助。

elasticTalk,qrcode

ET001 不可不掌握的 Logstash 使用技巧

Logstashrockybean 发表了文章 • 3 个评论 • 389 次浏览 • 2018-07-21 11:53 • 来自相关话题

Logstash 是 Elastic Stack 中功能最强大的 ETL 工具,相较于 beats 家族,虽然它略显臃肿,但是强在功能丰富、处理能力强大。大家在使用的过程中肯定也体验过其启动时的慢吞吞,那么有什么办法可以减少等待 Logstash 的启动时间,提高编写其处理配置文件的效率呢?本文给大家推荐一个小技巧,帮助大家解决如下两个问题,让大家更好地与这个笨重的大家伙相处。

  1. 减少 Logstash 重启的次数,也就节省宝贵的时间
  2. 方便快捷地向 Logstash 输入需要处理的内容

1. 打开 reload 配置开关

Logstash 启动的时候可以加上 -r 的参数来做到配置文件热加载,效果是:

  • 当你修改了配置文件后,无需重启 Logstash 即可让新配置文件生效。

它的含义如下:

当你写好配置文件,比如 test.conf ,启动命令如下:

bin/logstash -f test.conf -r

启动完毕,修改 test.conf 的内容并保存后,过 1 秒钟,你会发现 Logstash 端有类似如下日志输出(注意红色框标记的部分),此时说明 reload 的成功。

如果你修改的配置文件有错误,会看到报错的日志,你可以根据错误提示修改。

至此,第一个问题解决!

2. 使用 HTTP INPUT

编写配置文件的另一个痛点是需要针对不同格式的输入内容进行详细的测试,以防解析报错的情况出现。此时大家常用标准输入来解决这个问题(stdin input),但是标准输入对于文字编辑支持不太友好,而且配置文件热更新的功能也不支持标准输入。

在这里向大家推荐使用 http input 插件,配置如下:

input{
    http{
        port => 7474
        codec => "json"
    }
}

然后大家再用自己喜欢的 http 请求工具,比如 POSTMan、Insomnia 等向 http://loclahost:7474发送待测试内容即可,如下是 Insomnia 的截图。

至此,第二个问题也解决了。

3. 总结

相信看到这里,大家一定是跃跃欲试了,赶紧打开电脑,找到 Logstash,然后编辑 test.conf,输入如下内容:

input{
    http{
        port => 7474
        codec => "json"
    }
}

filter{

}

output{
        stdout{
        codec => rubydebug{
            metadata => true
        }
    }
}

然后执行启动命令:

bin/logstash -f test.conf -r

打开 Insomnia ,输入要测试的内容,点击发送,开始舒爽流畅的配置文件编写之旅吧!

logstash-filter-elasticsearch的简易安装

Logstashziyou 发表了文章 • 0 个评论 • 341 次浏览 • 2018-05-29 09:38 • 来自相关话题

不同版本的logstash集成的插件不一样,在5.6版本就未集成logstash-filter-elasticsearch插件,所以需要自己安装。 官方提供的方法因为需要联网,并且需要调整插件管理源,比较麻烦,针对logstash-filter-elasticsearch插件,使用下面这种方式安装。 logstash-filter-elasticsearch插件安装 1、在git上下载logstash-filter-elasticsearch压缩包,logstash-filter-elasticsearch.zip, 2、在logstash的目录下新建plugins目录,解压logstash-filter-elasticsearch.zip到此目录下。 3、在logstash目录下的Gemfile中添加一行:
gem "logstash-filter-elasticsearch", :path => "./plugins/logstash-filter-elasticsearch"
4、重启logstash即可。   此方法适用logstash-filter-elasticsearch,但不适用全部logstash插件。

我想只记录这一行,如果写logstash规则

Logstashsunjj 发表了文章 • 0 个评论 • 650 次浏览 • 2018-03-15 10:26 • 来自相关话题

2018-03-14 22:23:56,833 ERROR [FrontShopController.java:45] : ==dianchou.app.boss.pageController.FrontShopControl lerjava.lang.NullPointerException       at dianchou.app.boss.pageController.FrontShopController.projectDetail(FrontShopController.java:40)       at sun.reflect.GeneratedMethodAccessor495.invoke(Unknown Source)       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)       at java.lang.reflect.Method.invoke(Method.java:606)       at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)

kibana使用geoip插件展示地图热力图

Kibanaziyou 发表了文章 • 7 个评论 • 3882 次浏览 • 2018-02-09 16:56 • 来自相关话题

geoip1.png

上图是我们最终的地图效果。

总体步骤:

一、使用logstash geoip插件解析IP字段;

二、配置geoip.location字段为geo_point类型。

三、使用kibana的Coordinate map作图。

具体步骤:

一、解析IP字段

使用logstash的geoip插件 logstash-filter-geoip 解析IP字段,需要在logstash的配置文件中配置geoip的解析配置,配置如下:

geoip {
       source => "ip" //需要解析的IP地址
      }

解析出的效果如下:

"geoip": {
      "city_name": "Wuhan",
      "timezone": "Asia/Shanghai",
      "ip": "117.136.52.200",
      "latitude": 30.5801,
      "country_name": "China",
      "country_code2": "CN",
      "continent_code": "AS",
      "country_code3": "CN",
      "region_name": "Hubei",
      "location": {
        "lon": 114.2734,
        "lat": 30.5801
      },
      "region_code": "42",
      "longitude": 114.2734
    }

备注:这个只是geoip的配置,解析日志的时候需要先解析出ip字段

二、配置geoip字段类型

IP经过logstash解析后就可以使用IP的所有解析信息了,但是如果想要在kibana中作图,就必须把geoip里面的相应信息配置成相应的字段类型,才能够被kibana识别,然后经过聚合作图。 需要配置的字段:geoip.location 需要配置的类型:geo_point 在mapping中的配置为:

"geoip" : {
          "properties" : {
            "location" : {
              "type" : "geo_point",
              "ignore_malformed": "true"  
            }
          }
        }

备注1: ignore_malformed 如果true,格式错误的地理位置被忽略。如果false(默认),格式错误的地理位置引发异常并拒绝整个文档。 此字段需要配置成true,以防地理格式错误导致文档被拒绝。 也可以在所以级别进行设置: "settings": { "index.mapping.ignore_malformed": true }

备注2:需要先设置mapping,再导入数据mapping才会生效,如果先导入数据,再设置mapping,则第二天八点后才会生效(北京时间)。

三、kibana作图

1、在kibana中打开visualize->coordinate map

geoip2.png

2、选择相应的索引进行画图

geoip3.png

3、选择geoip.location作为聚合字段,然后设置Options,调整地图效果即可。

geoip4.png

logstash date不支持BJT时区

Logstashqvitt 发表了文章 • 2 个评论 • 734 次浏览 • 2017-12-18 10:29 • 来自相关话题

人生日历截图20171218102703.png
收集网络设备的日志,但是格式不统一,很苦恼,而且仅仅时间戳格式就有快10种不同格式,测试发现date不支持BJT时区
人生日历截图20171218102703.png
收集网络设备的日志,但是格式不统一,很苦恼,而且仅仅时间戳格式就有快10种不同格式,测试发现date不支持BJT时区

ELK学习资料整理

经验分享lsyoung 发表了文章 • 0 个评论 • 4020 次浏览 • 2017-04-14 10:17 • 来自相关话题

刚开始学习使用ELK,整理一个学习资料列表,当做备忘录。   1.第一个当然是官方文档
  • ElasticSearch参考手册,学习 DSL查询语法,包括查找(query)、过滤(filter)和聚合(aggs)等。
  • Logstash参考手册,学习数据导入,包括输入(input)、过滤(filter)和输出( output)等,主要是filter中如何对复杂文本 进行拆分和类型 转化。
  • Kibana参考手册,使用Kibana提供的前端界面对数据进行快速展示,主要是对Visulize 模块的使
2.中文文档   欢迎补充……

logstash多时间(date)字段文档导入

Logstashlsyoung 发表了文章 • 1 个评论 • 2262 次浏览 • 2017-04-13 20:43 • 来自相关话题

我们都知道如果文档中有表示时间的字段时可以用如下方法将字段转化为date(timestamp)格式导入
date {
        match => [ "submission_time", "yyyyMMdd" ]
    }
但是如果一条记录中有多个字段需要使用date类型呢?有人遇到了 同样的问题How to parse multiple date fields?其实多次使用date{}语法就行了,例如:
date {
        match => [ "submission_time", "UNIX_MS" ]
        target => "@timestamp"
        }
date {
        match => [ "submission_time", "UNIX_MS" ]
        target => "submission_time"
        }
date {
        match => [ "start_time", "UNIX_MS" ]
        target => "start_time"
        }
date {
        match => [ "end_time", "UNIX_MS" ]
        target => "end_time"
        }
查看官方文档后, 发现一直使用的date是省去了参数target的,而target默认值为@timestamp。

如何反向设置es mapping template

Elasticsearchrunc 发表了文章 • 1 个评论 • 2738 次浏览 • 2016-08-31 14:26 • 来自相关话题

比如,由logstash打到es中的数据,除了其中一个字段比如message,其余字段都想设置为not analyzed,这种情况如何设置?貌似目前es只支持设置那些具体的字段为not analyzed,而不能反过来设置啊?
比如,由logstash打到es中的数据,除了其中一个字段比如message,其余字段都想设置为not analyzed,这种情况如何设置?貌似目前es只支持设置那些具体的字段为not analyzed,而不能反过来设置啊?

Day14: percolator接口在logstash中的运用

Advent三斗室 发表了文章 • 0 个评论 • 3550 次浏览 • 2015-12-16 23:10 • 来自相关话题

我们都知道 Elasticsearch 除了普通的 search 接口以外,还有另一个 Percolator 接口,天生用来做实时过滤告警的。但是由于接口比较复杂,在目前的 ELK 体系中不是很容易运用。 而单纯从 Logstash 来做实时过滤报警,规则又不是很灵活。toplog.io 公司开发了一个 logstash-output-percolator插件,在有一定既定条件的情况下,成功运用上了 Percolator 方案。 这个插件的设计逻辑是:
  1. 通过 logstash-filter-checksum 自主生成 ES 文档的 _id;
  2. 使用上一步生成的 _id 同时发送 logstash-output-elasticsearch 和 logstash-output-percolator
  3. Percolator 接口一旦过滤成功,将 _id 发送给 Redis 服务器
  4. 其他系统从 Redis 服务器中获取 _id 即可从 ES 里拿到实际数据
Percolator 接口的用法简单说是这样: 创建接口:
curl -XPUT 'localhost:9200/patterns/.percolator/my-pattern-id' -d '{"query" : {"match" : {"message" : "ERROR"} } }'
过滤测试:
curl -XGET 'localhost:9200/my-index/my-type/_percolate' -d '{"doc" : {"message" : "ERROR: Service Apache failed to connect to MySQL"} }'
要点就是把文档放在 doc 属性里发送到 _percolate 里。 对应的 Logstash 配置如下:
filter {
    checksum {
        algorithm => "md5"
        keys => ["message"]
    }
}
output {
    elasticsearch {
        host => "localhost"
        cluster => "my-cluster"
        document_id => "%{logstash_checksum}"
        index => "my-index"
    }
    percolator {
        host => "es-balancer"
        redis_host => ["localhost"]
        document_id => "%{logstash_checksum}"
        pattern_index => "patterns"
    }
}
连接上对应的 Redis,就可以看到报警信息了:
$ redis-cli
127.0.0.1:6379> lrange percolator 0 1
1) "{\"matches\":[\"2\"],\"document_id\":\"a5d5c5f69b26ac0597370c9b1e7a8111\"}"
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。

Day13: ipip.net介绍

Advent三斗室 发表了文章 • 1 个评论 • 3131 次浏览 • 2015-12-16 23:05 • 来自相关话题

Geo 定位在 ELK 应用中是非常重要和有用的一个环节。不幸的是:GeoIP 本身在国内的准确度实在堪忧。高春辉近年成立了一个项目,专注收集细化 IP 地址在国内的数据:http://www.ipip.net。数据分为免费版和收费版两种。项目提供了不少客户端,有趣的是,有社区贡献了一个 Logstash 插件:https://github.com/bittopaz/logstash-filter-ipip。 用法很简单:
filter {
    ipip {
        source => "clientip"
        target => "ipip"
    }
}
生成的 JSON 数据结构类似下面这样:
{
    "clientip" : "",
    "ipip" : {
        "country" : "",
        "city" : "",
        "carrier" : "",
        "province" : ""
    }
}
不过这个插件只实现了收费版的数据库基础格式。免费版的支持,收费版高级的经纬度、基站位置等,都没有随着更新。事实上,我们可以通过 ipip 官方的 Java 库,实现一个更灵活的 logstash-filter-ipip_java 插件出来,下期见。 想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。

Day10: 如何处理数组形式的JSON日志

Advent三斗室 发表了文章 • 1 个评论 • 5376 次浏览 • 2015-12-16 22:57 • 来自相关话题

ELK 收集业务日志的来源,除了应用服务器以外,还有很大一部分来自客户端。考虑到客户端网络流量的因素,一般实现上都不会要求实时上报数据,而是攒一批,等到手机连上 WIFI 网络了,再统一发送出来。所以,这类客户端日志一般都有几个特点:
  1. 预先已经记录成 JSON 了;
  2. 日志主体内容是一个巨大无比的数组,数据元素才是实际的单次日志记录;
  3. 一次 POST 会有几 MB 到几十 MB 大小。
在处理这类数据的时候,第一关是别让数据超长直接给丢弃了(说的就是你啊,Rsyslog);第二关就是拆分 JSON 数组,把几十 MB 数据扔 ES 字段里,显然是不利于搜索和统计需求的。今天我们就来说说怎么拆分 JSON 数组。 假设收到的是这么一段日志:
{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
首先我们知道可以在读取的时候把 JSON 数据解析成 LogStash::Event 对象:
input {
    tcp {
        codec => json
    }
}
但是怎么把解析出来的 logs 字段拆分成多个 event 呢?这里我们可以用一个已有插件:logstash-filter-split。
filter {
    split {
        field => "logs"
    }
    date {
        match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
        remove_fields => ["logs", "timestamp"]
    }
}
这样,就可以得到两个 event 了:
{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"crash","@timestamp":"2015-12-10T09:55:00Z","reason":"****"}
{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"network_error","@timestamp":"2015-12-10T09:56:12Z","tracert":"****"}
看起来可能跟这个插件的文档描述不太一样。文档上写的是通过 terminator 字符,切割 field 字符串成多个 event。但实际上,field 设置是会自动判断的,如果 field 内容是字符串,就切割字符串成为数组再循环;如果内容已经是数组了,直接循环:
    original_value = event[@field]

    if original_value.is_a?(Array)
        splits = original_value
    elsif original_value.is_a?(String)
        splits = original_value.split(@terminator, -1)
    else
        raise LogStash::ConfigurationError, "Only String and Array types are splittable. field:#{@field} is of type = #{original_value.class}"
    end

    return if splits.length == 1

    splits.each do |value|
        next if value.empty?

        event_split = event.clone
        @logger.debug("Split event", :value => value, :field => @field)
        event_split[(@target || @field)] = value
        filter_matched(event_split)

        yield event_split
    end
    event.cancel
顺带提一句:这里 yield 在 Logstash 1.5.0 之前,实现有问题,生成的新事件,不会继续执行后续 filter,直接进入到 output 阶段。也就是说,如果你用 Logstash 1.4.2 来执行上面那段配置,生成的两个事件会是这样的:
{"@timestamp":"2015-12-10T09:38:13Z","uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
{"@timestamp":"2015-12-10T09:38:13Z","uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"network_error","@timestamp":"2015-12-10 17:56:12","tracert":"****","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。

Day9: Elasticsearch template的order

Advent三斗室 发表了文章 • 2 个评论 • 5116 次浏览 • 2015-12-10 01:13 • 来自相关话题

ELK Stack 在入门学习过程中,必然会碰到自己修改定制索引映射(mapping)乃至模板(template)的问题。 这时候,不少比较认真看 Logstash 文档的新用户会通过下面这段配置来制定自己的模板策略:
output {
    elasticsearch {
        host => "127.0.0.1"
        manage_template => true
        template => "/path/to/mytemplate"
        template_name => "myname"
    }
}
然而随后就发现,自己辛辛苦苦修改出来的模板,通过 curl -XGET 'http://127.0.0.1:9200/_template/myname' 看也确实上传成功了,但实际新数据索引创建出来,就是没生效! 这个原因是:Logstash 默认会上传一个名叫 logstash 的模板到 ES 里。如果你在使用上面这个配置之前,曾经运行过 Logstash(一般来说都会),那么 ES 里就已经存在这么一个模板了。你可以curl -XGET 'http://127.0.0.1:9200/_template/logstash' 验证。 这个时候,ES 里就变成有两个模板,logstash 和 myname,都匹配 logstash-* 索引名,要求设置一定的映射规则了。 ES 会按照一定的规则来尝试自动 merge 多个都匹配上了的模板规则,最终运用到索引上:https://www.elastic.co/guide/e ... lates 其中要点就是:template 是可以设置 order 参数的!而不写这个参数,默认的 order 值就是 0。order 值越大,在 merge 规则的时候优先级越高。 所以,解决这个问题的办法很简单:在你自定义的 template 里,加一行,变成这样:
{
    "template" : "logstash-*",
    "order" : 1,
    "settings" : { ... },
    "mappings" : { ... }
}
当然,其实如果只从 Logstash 配置角度出发,其实更简单的办法是:直接修改原来默认的 logstash 模板,然后模板名称也不要改,就好了:
output {
    elasticsearch {
        host => "127.0.0.1"
        manage_template => true
        template_overwrite => true
    }
}
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。

Day7: hangout 替代 logstash-input-kafka

Advent三斗室 发表了文章 • 2 个评论 • 6645 次浏览 • 2015-12-08 00:54 • 来自相关话题

用 Logstash 接收 Kafka 里的业务日志再写入 Elasticsearch 已经成为一个常见的选择。但是大多数人随后就会碰到一个问题:logstash-input-kafka 的性能上不去! 这个问题,主要是由于 Logstash 用 JRuby 实现,所以数据从 Kafka 下来到最后流转进 Logstash 里,要经过四五次 Ruby 和 Java 之间的数据结构转换,大大浪费和消耗了 CPU 资源。作为优化,我们可以通过修改默认的 logstash-input-kafka 的 codec 配置为 line,把 Jrjackson 处理流程挪到 logstash-filter-json 里多线程处理,但是也只能提高一倍性能而已。 Logstash 开发组目前也在实现纯 Java 版的 logstash-core-event,但是最终能提高多少,也是未知数。 那么在 Logstash 性能提上去之前,围绕 Kafka 还有什么办法能高效又不失灵活的做到数据处理并写入 Elasticsearch 呢?今天给大家推荐一下携程网开源的 hangout。 hangout 采用 YAML 格式配置语法,跟 Elasticsearch 一样,省去了 Logstash 解析 DSL 的复杂度。下面一段配置是 repo 中自带的 example 示例:
inputs:
  - Kafka:
    codec: plain
    encoding: UTF8 # defaut UTF8
    topic: 
      app: 2
    consumer_settings:
      group.id: hangout
      zookeeper.connect: 192.168.1.200:2181
      auto.commit.interval.ms: "1000"
      socket.receive.buffer.bytes: "1048576"
      fetch.message.max.bytes: "1048576"
      num.consumer.fetchers: "4"
  - Kafka:
    codec: json
    topic: 
      web: 1
    consumer_settings:
      group.id: hangout
      zookeeper.connect: 192.168.1.201:2181
      auto.commit.interval.ms: "5000"

filters:
  - Grok:
    match:
      - '^(?<logtime>\S+) (?<user>.+) (-|(?<level>\w+)) %{DATA:msg}$'
    remove_fields: ['message']
  - Add:
    fields:
      test: 'abcd'
    if:
      - '<#if message??>true</#if>'
      - '<#if message?contains("liu")>true<#elseif message?contains("warn")>true</#if>'
  - Date:
    src: logtime
    formats:
      - 'ISO8601'
    remove_fields: ['logtime']
  - Lowercase:
    fields: ['user']
  - Add:
    fields:
      me: 'I am ${user}'
  - Remove:
    fields:
      - logtime
  - Trim:
    fields:
      - user
  - Rename:
    fields:
      me: he
      user: she
  - Gsub:
    fields:
      she: ['c','CCC']
      he: ['(^\w+)|(\w+$)','XXX']
  - Translate:
    source: user
    target: nick
    dictionary_path: /tmp/app.dic
  - KV:
    source: msg
    target: kv
    field_split: ' '
    value_split: '='
    trim: '\t\"'
    trimkey: '\"'
    include_keys: ["a","b","xyz","12"]
    exclude_keys: ["b","c"] # b in excluded
    tag_on_failure: "KVfail"
    remove_fields: ['msg']
  - Convert:
    fields:
      cs_bytes: integer
      time_taken: float
  - URLDecode:
    fields: ["query1","query2"]

outputs:
  - Stdout:
    if:
      - '<#if user=="childe">true</#if>'
  - Elasticsearch:
    cluster: hangoutcluster
    hosts:
      - 192.168.1.200
    index: 'hangout-%{user}-%{+YYYY.MM.dd}'
    index_type: logs # default logs
    bulk_actions: 20000 #default 20000
    bulk_size: 15 # default 15 MB
    flush_interval: 10 # default 10 seconds
    concurrent_requests: 0 # default 0, concurrent_requests设置成大于0的数, 意思着多线程处理, 以我应用的经验,还有是一定OOM风险的,强烈建议设置为0
  - Kafka:
    broker_list: 192.168.1.200:9092
    topic: test2
其 pipeline 设计和 Logstash 不同的是:整个 filter 和 output 流程,都在 Kafka 的 consumer 线程中完成。所以,并发线程数完全是有 Kafka 的 partitions 设置来控制的。 实际运行下来,hangout 比 Logstash 确实在处理能力,尤其是 CPU 资源消耗方面,性价比要高出很多。 想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
  Logstash是一个开源的日志收集管理工具。