
Logstash stops consuming after one node in the Kafka cluster fails

Logstash | Author: qq89267388 | Posted: 2020-03-14 | Views: 5496

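My environment is as follows:

rsyslog/filebeat ---> provider Logstash (7.4.0) ---> Kafka (3 nodes) ---> consumer Logstash (7.2.0, 3 nodes) ---> ES (3 nodes)

The consumer Logstash instances and ES are deployed on three servers, with one consumer and one ES node on each server.

My problem: when one Kafka node fails or is stopped, data can still be produced to Kafka normally, but the consumer Logstash instances can no longer fetch messages from Kafka.

I set metadata_max_age_ms to 3000, i.e. the metadata is refreshed every 3 seconds so that the client picks up the current topic layout in Kafka, but the behavior is still the same.

I tried consuming with a Python client and found that consumption works normally with one Kafka node missing, which suggests the problem is on the Logstash side.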
 
Here is my Logstash configuration.

Configuration of the Logstash that writes to Kafka:
 
output {
  if [topic] {
    kafka {
      bootstrap_servers => "ip1:9092,ip2:9092,ip3:9092"    # producer
      topic_id => "%{topic}"
      codec => json
      metadata_max_age_ms => 3000
    }
  }
}

 
Configuration of the Logstash that consumes from Kafka:

input {
  kafka {
    bootstrap_servers => "ip1:9092,ip2:9092,ip3:9092"
    group_id => "logstash-input"
    metadata_max_age_ms => 3000
    auto_offset_reset => "latest"
    consumer_threads => 3
    decorate_events => true
    topics => [topic list ...]
    codec => json
  }
}

 
In the Logstash log I can see the following errors after the Kafka node disconnects:
 


[2020-03-14T23:48:25,268][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash-input] Group coordinator <failed Kafka node IP>:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
[2020-03-14T23:48:25,308][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash-input] Group coordinator <failed Kafka node IP>:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
[2020-03-14T23:48:25,308][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-2, groupId=logstash-input] Group coordinator <failed Kafka node IP>:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
[2020-03-14T23:48:25,309][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash-input] Discovered group coordinator <failed Kafka node IP>:9092 (id: 2147483647 rack: null)
[2020-03-14T23:48:25,310][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-2, groupId=logstash-input] Discovered group coordinator <failed Kafka node IP>:9092 (id: 2147483647 rack: null)
[2020-03-14T23:48:25,310][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-2, groupId=logstash-input] Group coordinator <failed Kafka node IP>:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
[2020-03-14T23:48:25,310][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash-input] Group coordinator <failed Kafka node IP>:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
[2020-03-14T23:48:25,411][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash-input] Discovered group coordinator <failed Kafka node IP>:9092 (id: 2147483647 rack: null)
[2020-03-14T23:48:35,449][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash-input] Group coordinator <failed Kafka node IP>:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
[2020-03-14T23:48:49,630][INFO ][org.apache.kafka.clients.FetchSessionHandler] [Consumer clientId=logstash-0, groupId=logstash-input] Error sending fetch request (sessionId=2036742827, epoch=226) to node 0: org.apache.kafka.common.errors.DisconnectException.

 

yang4210


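"I tried consuming with a Python client and found that consumption works normally with one Kafka node missing, which suggests the problem is on the Logstash side." In that case, why not simply replace the consumer Logstash with Python? A consumer written in Python is several times faster than Logstash.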
 

from elasticsearch import Elasticsearch 
from kafka import KafkaConsumer
 
Then bulk-write to ES.
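The reply gives only the two imports. A rough sketch of the loop it hints at, assuming kafka-python and the elasticsearch Python client, might look like the following. The group id, index name, batch size and function names are placeholders rather than anything from the thread, and error handling is kept minimal.

```python
import json


def to_actions(docs, index):
    """Wrap decoded Kafka messages as actions for elasticsearch.helpers.bulk."""
    return [{"_index": index, "_source": doc} for doc in docs]


def run(topics, bootstrap_servers, es_hosts, index,
        group_id="python-consumer", batch_size=500):
    """Poll Kafka in batches and bulk-index each batch into Elasticsearch."""
    # Imported lazily so to_actions() can be used without either client installed.
    from elasticsearch import Elasticsearch, helpers
    from kafka import KafkaConsumer

    es = Elasticsearch(es_hosts)
    consumer = KafkaConsumer(
        *topics,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        enable_auto_commit=False,  # commit only after a successful bulk write
        value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
    )
    try:
        while True:
            polled = consumer.poll(timeout_ms=1000, max_records=batch_size)
            docs = [record.value for batch in polled.values() for record in batch]
            if not docs:
                continue
            # helpers.bulk raises on indexing errors by default, so offsets
            # are not committed for a batch that failed to index.
            helpers.bulk(es, to_actions(docs, index))
            consumer.commit()
    finally:
        consumer.close()
```

Committing offsets only after the bulk call returns gives at-least-once delivery: a crash between the bulk write and the commit can index a batch twice, but it will not silently drop one.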

 
