
Logstash cannot consume Kafka data

Logstash | Author: muou | Posted: 2018-07-20 | Views: 9369

Consuming from the Kafka console consumer works normally, but the Logstash kafka input does not receive anything. Could someone more experienced take a look?
 

[Image: 1111111111111.jpg]

 
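The configuration is very simple. Logstash starts without errors and everything looks normal, but no consumed data is ever output.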
 
The kafka output works: Logstash can write data to Kafka.
 
With debug logging enabled for the Kafka client in Logstash, the following errors are printed:
[2018-07-20T14:45:44,361][DEBUG][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash] Sending metadata request (type=MetadataRequest, topics=logstashTopic) to node 10.14.123.119:9092 (id: 2 rack: null)
[2018-07-20T14:45:44,363][DEBUG][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash] Using older server API v2 to send METADATA {topics=[logstashTopic]} with correlation id 188 to node 2
[2018-07-20T14:45:44,367][DEBUG][org.apache.kafka.clients.Metadata] Updated cluster metadata version 92 to Cluster(id = 6TvaqPIkQtaoKobVH1zNYA, nodes = [10.14.123.117:9092 (id: 0 rack: null), 10.14.123.119:9092 (id: 2 rack: null), 10.14.123.118:9092 (id: 1 rack: null)], partitions = [Partition(topic = logstashTopic, partition = 0, leader = 2, replicas = [2], isr = [2], offlineReplicas = [])])
[2018-07-20T14:45:44,369][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Sending FindCoordinator request to broker 10.14.123.117:9092 (id: 0 rack: null)
[2018-07-20T14:45:44,373][DEBUG][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash] Using older server API v0 to send FIND_COORDINATOR {group_id=logstash} with correlation id 189 to node 0
[2018-07-20T14:45:44,380][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Received FindCoordinator response ClientResponse(receivedTimeMs=1532069144379, latencyMs=6, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=0, clientId=logstash-0, correlationId=189), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null)))
[2018-07-20T14:45:44,386][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Group coordinator lookup failed: The coordinator is not available.
[2018-07-20T14:45:44,387][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Coordinator discovery failed, refreshing metadata
[2018-07-20T14:45:44,468][DEBUG][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash] Sending metadata request (type=MetadataRequest, topics=logstashTopic) to node 10.14.123.119:9092 (id: 2 rack: null)
[2018-07-20T14:45:44,468][DEBUG][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash] Using older server API v2 to send METADATA {topics=[logstashTopic]} with correlation id 190 to node 2
[2018-07-20T14:45:44,474][DEBUG][org.apache.kafka.clients.Metadata] Updated cluster metadata version 93 to Cluster(id = 6TvaqPIkQtaoKobVH1zNYA, nodes = [10.14.123.119:9092 (id: 2 rack: null), 10.14.123.117:9092 (id: 0 rack: null), 10.14.123.118:9092 (id: 1 rack: null)], partitions = [Partition(topic = logstashTopic, partition = 0, leader = 2, replicas = [2], isr = [2], offlineReplicas = [])])
[2018-07-20T14:45:44,478][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Sending FindCoordinator request to broker 10.14.123.118:9092 (id: 1 rack: null)
[2018-07-20T14:45:44,540][DEBUG][org.apache.kafka.clients.NetworkClient] [Consumer clientId=logstash-0, groupId=logstash] Using older server API v0 to send FIND_COORDINATOR {group_id=logstash} with correlation id 191 to node 1
[2018-07-20T14:45:44,547][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Received FindCoordinator response ClientResponse(receivedTimeMs=1532069144547, latencyMs=66, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=0, clientId=logstash-0, correlationId=191), responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null)))
[2018-07-20T14:45:44,549][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Group coordinator lookup failed: The coordinator is not available.
[2018-07-20T14:45:44,550][DEBUG][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Coordinator discovery failed, refreshing metadata
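
The log above repeats the same cycle: a FindCoordinator request goes to one broker, the response carries error=COORDINATOR_NOT_AVAILABLE, and the consumer refreshes metadata and tries another broker. To make that pattern easier to see in a longer log, here is a minimal Python sketch that tallies FindCoordinator error codes per broker. The function name `tally_coordinator_errors` and the `SAMPLE` list are hypothetical; the sample lines are shortened excerpts of the log above, and the regular expressions assume the log format shown there.

```python
import re
from collections import Counter

# Matches the request line and captures the broker address being asked.
REQUEST_RE = re.compile(r"Sending FindCoordinator request to broker (\S+)")
# Matches the error code inside a FindCoordinatorResponse
# ("errorMessage=" is skipped because the pattern requires "error=").
RESPONSE_RE = re.compile(r"FindCoordinatorResponse\(.*?error=(\w+)")


def tally_coordinator_errors(lines):
    """Count (broker, error code) pairs, pairing each response
    with the most recent FindCoordinator request before it."""
    counts = Counter()
    last_broker = None
    for line in lines:
        request = REQUEST_RE.search(line)
        if request:
            last_broker = request.group(1)
            continue
        response = RESPONSE_RE.search(line)
        if response and last_broker is not None:
            counts[(last_broker, response.group(1))] += 1
            last_broker = None
    return counts


# Shortened excerpts of the log lines above (assumption: same format).
SAMPLE = [
    "Sending FindCoordinator request to broker 10.14.123.117:9092 (id: 0 rack: null)",
    "responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', "
    "error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null)))",
    "Sending FindCoordinator request to broker 10.14.123.118:9092 (id: 1 rack: null)",
    "responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', "
    "error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null)))",
]

if __name__ == "__main__":
    for (broker, error), count in tally_coordinator_errors(SAMPLE).items():
        print(broker, error, count)
```

If every broker in the tally reports the same error, as in this excerpt, the problem is unlikely to be specific to one node. The sketch only summarizes the log and does not diagnose the cause.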