你不会是程序猿吧?

logstash消费kafka的消息求解

Logstash | 作者 rm | 发布于2017年06月08日 | 阅读数:33430

filebeat 5.4 output kafka-0.10.2.1
产生消息正常。
  kafka:
    hosts: ["127.0.0.1:9092", "127.0.0.1:9093", "127.0.0.1:9094"]
    topic: 'test'
    partition.round_robin:
      reachable_only: true
    required_acks: 1
    compression: gzip
    max_message_bytes: 1000000
logstash 5.4 input kafka-0.10.2.1
消费消息正常。
input {
    kafka {
        bootstrap_servers => "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"
        topics => ["test"]
    auto_offset_reset => "latest"
    }
}
 
但在测试中发现一些问题,求解惑:
1.filebeat和logstash全部停止,删除掉kafka里面test这个topic;
2.启动filebeat运行一阵后停止filebeat;
3.用命令查看kafka相关topic信息,正常;
4.启动logstash消费完test这个topic里面所有的消息后停止logstash;
5.用命令查看kafka相关"logstash"这个group信息(./kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server 127.0.0.1:9092 --describe --group logstash)正常;
6.启动filebeat运行一阵后停止filebeat;
7.重复第5步命令可以看相关topic里面的offset均正常,已积压一定数量的消息;
8.启动logstash消费部分消息后(控制台已输出消息日志)迅速停止logstash;
9.重复第5步命令发现offset没有发生任何变化;
10.启动logstash至所有消息消费完成,查看logstash日志发现消息总数和产生消息总数不匹配,出现部分重复数据;
这个问题如何解决,查看官网没有找到相关配置说明,很郁闷。。。
 
已邀请:

rm

赞同来自:

或者说kafka的消息有至少一次、最多一次、正好一次三种消费模式,logstash-input-kafka默认应该是至少一次的策略,怎么实现正好一次这种策略了。

leighton_buaa

赞同来自:

貌似只能自己实现了,设置auto.commit为false,等消息消费完了之后再commit

kennywu76 - Wood

赞同来自:

插件有一个参数auto_commit_interval_ms设置多久向kafka提交一次消费的offset,默认是5000ms,如果启动后迅速停止logstash,有可能已经消费的数据还没来得及提交offset。  
 
这个是至少一次的消费模式,符合kafka的设计原则。

要回复问题请先登录注册