ELK 收集业务日志的来源,除了应用服务器以外,还有很大一部分来自客户端。考虑到客户端网络流量的因素,一般实现上都不会要求实时上报数据,而是攒一批,等到手机连上 WIFI 网络了,再统一发送出来。所以,这类客户端日志一般都有几个特点:
在处理这类数据的时候,第一关是别让数据超长直接给丢弃了(说的就是你啊,Rsyslog);第二关就是拆分 JSON 数组,把几十 MB 数据扔 ES 字段里,显然是不利于搜索和统计需求的。今天我们就来说说怎么拆分 JSON 数组。
假设收到的是这么一段日志:
- 预先已经记录成 JSON 了;
- 日志主体内容是一个巨大无比的数组,数据元素才是实际的单次日志记录;
- 一次 POST 会有几 MB 到几十 MB 大小。
在处理这类数据的时候,第一关是别让数据超长直接给丢弃了(说的就是你啊,Rsyslog);第二关就是拆分 JSON 数组,把几十 MB 数据扔 ES 字段里,显然是不利于搜索和统计需求的。今天我们就来说说怎么拆分 JSON 数组。
假设收到的是这么一段日志:
{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
首先我们知道可以在读取的时候把 JSON 数据解析成 LogStash::Event 对象:input {
tcp {
codec => json
}
}
但是怎么把解析出来的 logs 字段拆分成多个 event 呢?这里我们可以用一个已有插件:logstash-filter-split。filter {
split {
field => "logs"
}
date {
match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
remove_fields => ["logs", "timestamp"]
}
}
这样,就可以得到两个 event 了:{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"crash","@timestamp":"2015-12-10T09:55:00Z","reason":"****"}
{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"network_error","@timestamp":"2015-12-10T09:56:12Z","tracert":"****"}
看起来可能跟这个插件的文档描述不太一样。文档上写的是通过 terminator 字符,切割 field 字符串成多个 event。但实际上,field 设置是会自动判断的,如果 field 内容是字符串,就切割字符串成为数组再循环;如果内容已经是数组了,直接循环: original_value = event[@field]
if original_value.is_a?(Array)
splits = original_value
elsif original_value.is_a?(String)
splits = original_value.split(@terminator, -1)
else
raise LogStash::ConfigurationError, "Only String and Array types are splittable. field:#{@field} is of type = #{original_value.class}"
end
return if splits.length == 1
splits.each do |value|
next if value.empty?
event_split = event.clone
@logger.debug("Split event", :value => value, :field => @field)
event_split[(@target || @field)] = value
filter_matched(event_split)
yield event_split
end
event.cancel
顺带提一句:这里 yield 在 Logstash 1.5.0 之前,实现有问题,生成的新事件,不会继续执行后续 filter,直接进入到 output 阶段。也就是说,如果你用 Logstash 1.4.2 来执行上面那段配置,生成的两个事件会是这样的:{"@timestamp":"2015-12-10T09:38:13Z","uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
{"@timestamp":"2015-12-10T09:38:13Z","uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"network_error","@timestamp":"2015-12-10 17:56:12","tracert":"****","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
[尊重社区原创,转载请保留或注明出处]
本文地址:http://elasticsearch.cn/article/23
本文地址:http://elasticsearch.cn/article/23
1 个评论
能问下大神,json格式的日志,怎么做图表处理呢?设置的时候k,v是1对1的,显示不出效果