使用 shuf 来打乱一个文件中的行或是选择文件中一个随机的行。

ELK日志回填问题

默认分类 | 作者 BirdZhang | 发布于2017年03月03日 | 阅读数:5652

现在架构是 filebeat - > kafka -> logstash -> elasticsearch
kafka保留消息时间为24小时
问题:
    如果一段时间内数据采集有遗漏,再想把数据补充回去,并保证数据几乎不重复,有什么方式?
 
想法:
    elastic查询出指定时间段的offset,删除指定时间段的,补全数据后logstash根据offset值重新消费
 
但是上面的想法实现起来太困难,不管是从kafka还是elastic获取offset都不靠谱,而且重新消费也消耗性能,目前ye也没找到logstash重新消费的方式
已邀请:

BirdZhang

赞同来自: leighton_buaa medcl

再次感谢 @leighton_buaa
总结一下吧,logstash 的filter跟out配置如下:
应该用fingerprint也可以
filter {
json {
source => "message"
}
ruby {
code => "require 'digest/md5';event.set('[@metadata][computed_id]',Digest::MD5.hexdigest(event.get('message') + event.get('type') + event.get('source')))"
}
...
}
output {
elasticsearch {
index => "accesslog-%{+YYYY.MM.dd}"
flush_size => 60000
document_id => "%{[@metadata][computed_id]}"
idle_flush_time => 10
}
}

leighton_buaa

赞同来自:

我觉得不用那么麻烦,可以在logstash -> elasticsearch 这一段控制
每条日志都指定一个字段(例如timestamp)作为doc ID索引到es中,当你的补全数据入库时,如果es中没有该日志,那就是添加,如果有就会覆盖原有日志记录。

要回复问题请先登录注册