
ELK + Kafka cluster: Logstash cannot write data from Kafka into ES, with no errors anywhere

Logstash | Author: xsq95 | Published 2019-09-26 | Views: 2923

ELK version is 7.3.2, Kafka version is 2.12.
On 6.4.2 writes also failed, so I upgraded to the latest version. Without Kafka in the path, ELK works fine end to end, but as soon as Kafka is added, no data gets written.
 
Filebeat config:
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages

output.kafka:
  enabled: true
  hosts: ["172.23.0.41:9092", "172.23.0.42:9092", "172.23.0.43:9092"]
  topic: 'kafka'

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

setup.template.settings:
  index.number_of_shards: 3

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
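Before blaming Logstash, it may help to confirm that Filebeat is actually producing into the topic by consuming it directly from a broker. This is a diagnostic sketch using the standard Kafka CLI tool; the topic and broker address are taken from the config above, but the install path (KAFKA_HOME) is an assumption, so adjust it to your layout:

```shell
# Consume a few messages from the 'kafka' topic to verify that
# Filebeat is actually producing into it. If this prints nothing
# while Filebeat is running, the problem is on the Filebeat side,
# not in Logstash. (KAFKA_HOME is assumed; adjust to your install.)
$KAFKA_HOME/bin/kafka-console-consumer.sh \
  --bootstrap-server 172.23.0.41:9092 \
  --topic kafka \
  --from-beginning \
  --max-messages 5
```

If messages do appear here, the Filebeat-to-Kafka leg is fine and the fault lies between Logstash and the topic or between Logstash and ES.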

 
 
Logstash config:
input {
  kafka {
    bootstrap_servers => "172.23.0.41:9092"
    topics => ["kafka"]
    #group_id => "logstash"
    consumer_threads => 5
    decorate_events => true
    auto_offset_reset => "latest"
    codec => "json"
  }
}

output {
  stdout {
    codec => "json"
  }
  elasticsearch {
    hosts => ["172.23.0.71:9200","172.23.0.72:9200","172.23.0.73:9200"]
    index => "kafka-%{+YYYY.MM.dd}"
  }
}
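One thing worth knowing about this config: auto_offset_reset => "latest" only takes effect when the consumer group has no committed offsets. Since the group "logstash" already exists (the logs below show it rejoining), Logstash resumes from its committed position, so data already in the topic will never be re-read. A way to isolate this is a one-off pipeline with a fresh group that reads from the beginning and prints to stdout, bypassing ES entirely. This is a sketch; the Logstash binary path is an assumption:

```shell
# One-off pipeline: read the topic from the beginning with a fresh
# consumer group and print events to stdout, bypassing Elasticsearch.
# If nothing prints, the problem is on the Kafka/Filebeat side;
# if events print, the problem is between Logstash and ES.
# (/usr/share/logstash is assumed; adjust to your install path.)
/usr/share/logstash/bin/logstash -e '
input {
  kafka {
    bootstrap_servers => "172.23.0.41:9092"
    topics => ["kafka"]
    group_id => "logstash-debug"
    auto_offset_reset => "earliest"
    codec => "json"
  }
}
output { stdout { codec => rubydebug } }'
```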

 
 
 
 
 
 
After restarting Logstash (systemctl restart logstash) there are no errors, and ES does not create a new index (nothing is written):
[root@elk6 conf.d]# tail /var/log/messages
Sep 26 08:33:40 elk6 logstash: [2019-09-26T08:33:40,063][WARN ][logstash.runner          ] SIGTERM received. Shutting down.
Sep 26 08:33:40 elk6 logstash: [2019-09-26T08:33:40,155][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-2, groupId=logstash] Sending LeaveGroup request to coordinator kafka1.dev2:9092 (id: 2147483646 rack: null)
Sep 26 08:33:40 elk6 logstash: [2019-09-26T08:33:40,155][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash] Sending LeaveGroup request to coordinator kafka1.dev2:9092 (id: 2147483646 rack: null)
Sep 26 08:33:40 elk6 logstash: [2019-09-26T08:33:40,155][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Sending LeaveGroup request to coordinator kafka1.dev2:9092 (id: 2147483646 rack: null)
Sep 26 08:33:40 elk6 logstash: [2019-09-26T08:33:40,156][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-3, groupId=logstash] Sending LeaveGroup request to coordinator kafka1.dev2:9092 (id: 2147483646 rack: null)
Sep 26 08:33:40 elk6 logstash: [2019-09-26T08:33:40,156][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-4, groupId=logstash] Sending LeaveGroup request to coordinator kafka1.dev2:9092 (id: 2147483646 rack: null)
Sep 26 08:33:40 elk6 logstash: [2019-09-26T08:33:40,344][INFO ][logstash.javapipeline    ] Pipeline terminated {"pipeline.id"=>"main"}
Sep 26 08:33:40 elk6 logstash: [2019-09-26T08:33:40,681][INFO ][logstash.runner          ] Logstash shut down.
Sep 26 08:33:40 elk6 systemd: Started logstash.
Sep 26 08:33:40 elk6 systemd: Starting logstash...
[root@elk6 conf.d]# tailf /var/log/messages
Sep 26 08:33:57 elk6 logstash: Thread.exclusive is deprecated, use Thread::Mutex
Sep 26 08:34:00 elk6 logstash: Sending Logstash logs to /home/logstash/logs which is now configured via log4j2.properties
Sep 26 08:34:01 elk6 logstash: [2019-09-26T08:34:01,578][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
Sep 26 08:34:01 elk6 logstash: [2019-09-26T08:34:01,598][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.3.2"}
Sep 26 08:34:03 elk6 logstash: [2019-09-26T08:34:03,889][INFO ][org.reflections.Reflections] Reflections took 50 ms to scan 1 urls, producing 19 keys and 39 values
Sep 26 08:34:05 elk6 logstash: [2019-09-26T08:34:05,037][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://172.23.0.71:9200/, http://172.23.0.72:9200/, http://172.23.0.73:9200/]}}
Sep 26 08:34:05 elk6 logstash: [2019-09-26T08:34:05,413][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://172.23.0.71:9200/"}
Sep 26 08:34:05 elk6 logstash: [2019-09-26T08:34:05,487][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>7}
Sep 26 08:34:05 elk6 logstash: [2019-09-26T08:34:05,492][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
Sep 26 08:34:05 elk6 logstash: [2019-09-26T08:34:05,504][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://172.23.0.72:9200/"}
Sep 26 08:34:05 elk6 logstash: [2019-09-26T08:34:05,526][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://172.23.0.73:9200/"}
Sep 26 08:34:05 elk6 logstash: [2019-09-26T08:34:05,567][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//172.23.0.71:9200", "//172.23.0.72:9200", "//172.23.0.73:9200"]}
Sep 26 08:34:05 elk6 logstash: [2019-09-26T08:34:05,650][INFO ][logstash.outputs.elasticsearch] Using default mapping template
Sep 26 08:34:05 elk6 logstash: [2019-09-26T08:34:05,698][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge] A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization.  It is recommended to log an issue to the responsible developer/development team.
Sep 26 08:34:05 elk6 logstash: [2019-09-26T08:34:05,706][INFO ][logstash.javapipeline    ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, :thread=>"#<Thread:0x779ce661 run>"}
Sep 26 08:34:05 elk6 logstash: [2019-09-26T08:34:05,770][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
Sep 26 08:34:05 elk6 logstash: [2019-09-26T08:34:05,782][INFO ][logstash.javapipeline    ] Pipeline started {"pipeline.id"=>"main"}
Sep 26 08:34:05 elk6 logstash: [2019-09-26T08:34:05,945][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
Sep 26 08:34:05 elk6 logstash: [2019-09-26T08:34:05,966][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
Sep 26 08:34:05 elk6 logstash: auto.commit.interval.ms = 5000
Sep 26 08:34:05 elk6 logstash: auto.offset.reset = latest
Sep 26 08:34:05 elk6 logstash: bootstrap.servers = [172.23.0.41:9092]
Sep 26 08:34:05 elk6 logstash: check.crcs = true
Sep 26 08:34:05 elk6 logstash: client.dns.lookup = default
Sep 26 08:34:05 elk6 logstash: client.id = logstash-0
Sep 26 08:34:05 elk6 logstash: connections.max.idle.ms = 540000
Sep 26 08:34:05 elk6 logstash: default.api.timeout.ms = 60000
Sep 26 08:34:05 elk6 logstash: enable.auto.commit = true
Sep 26 08:34:05 elk6 logstash: exclude.internal.topics = true
Sep 26 08:34:05 elk6 logstash: fetch.max.bytes = 52428800
Sep 26 08:34:05 elk6 logstash: fetch.max.wait.ms = 500
Sep 26 08:34:05 elk6 logstash: fetch.min.bytes = 1
Sep 26 08:34:05 elk6 logstash: group.id = logstash
Sep 26 08:34:05 elk6 logstash: heartbeat.interval.ms = 3000
Sep 26 08:34:05 elk6 logstash: interceptor.classes = []
Sep 26 08:34:05 elk6 logstash: internal.leave.group.on.close = true
Sep 26 08:34:05 elk6 logstash: isolation.level = read_uncommitted
Sep 26 08:34:05 elk6 logstash: key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
Sep 26 08:34:05 elk6 logstash: max.partition.fetch.bytes = 1048576
Sep 26 08:34:05 elk6 logstash: max.poll.interval.ms = 300000
Sep 26 08:34:05 elk6 logstash: max.poll.records = 500
Sep 26 08:34:05 elk6 logstash: metadata.max.age.ms = 300000
Sep 26 08:34:05 elk6 logstash: metric.reporters = []
Sep 26 08:34:05 elk6 logstash: metrics.num.samples = 2
Sep 26 08:34:05 elk6 logstash: metrics.recording.level = INFO
Sep 26 08:34:05 elk6 logstash: metrics.sample.window.ms = 30000
Sep 26 08:34:05 elk6 logstash: partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
Sep 26 08:34:05 elk6 logstash: receive.buffer.bytes = 65536
Sep 26 08:34:05 elk6 logstash: reconnect.backoff.max.ms = 1000
Sep 26 08:34:05 elk6 logstash: reconnect.backoff.ms = 50
Sep 26 08:34:05 elk6 logstash: request.timeout.ms = 30000
Sep 26 08:34:05 elk6 logstash: retry.backoff.ms = 100
Sep 26 08:34:05 elk6 logstash: sasl.client.callback.handler.class = null
Sep 26 08:34:05 elk6 logstash: sasl.jaas.config = null
Sep 26 08:34:05 elk6 logstash: sasl.kerberos.kinit.cmd = /usr/bin/kinit
Sep 26 08:34:05 elk6 logstash: sasl.kerberos.min.time.before.relogin = 60000
Sep 26 08:34:05 elk6 logstash: sasl.kerberos.service.name = null
Sep 26 08:34:05 elk6 logstash: sasl.kerberos.ticket.renew.jitter = 0.05
Sep 26 08:34:05 elk6 logstash: sasl.kerberos.ticket.renew.window.factor = 0.8
Sep 26 08:34:05 elk6 logstash: sasl.login.callback.handler.class = null
Sep 26 08:34:05 elk6 logstash: sasl.login.class = null
Sep 26 08:34:05 elk6 logstash: sasl.login.refresh.buffer.seconds = 300
Sep 26 08:34:05 elk6 logstash: sasl.login.refresh.min.period.seconds = 60
Sep 26 08:34:05 elk6 logstash: sasl.login.refresh.window.factor = 0.8
Sep 26 08:34:05 elk6 logstash: sasl.login.refresh.window.jitter = 0.05
Sep 26 08:34:05 elk6 logstash: sasl.mechanism = GSSAPI
Sep 26 08:34:05 elk6 logstash: security.protocol = PLAINTEXT
Sep 26 08:34:05 elk6 logstash: send.buffer.bytes = 131072
Sep 26 08:34:05 elk6 logstash: session.timeout.ms = 10000
Sep 26 08:34:05 elk6 logstash: ssl.cipher.suites = null
Sep 26 08:34:05 elk6 logstash: ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
Sep 26 08:34:05 elk6 logstash: ssl.endpoint.identification.algorithm = https
Sep 26 08:34:05 elk6 logstash: ssl.key.password = null
Sep 26 08:34:05 elk6 logstash: ssl.keymanager.algorithm = SunX509
Sep 26 08:34:05 elk6 logstash: ssl.keystore.location = null
Sep 26 08:34:05 elk6 logstash: ssl.keystore.password = null
Sep 26 08:34:05 elk6 logstash: ssl.keystore.type = JKS
Sep 26 08:34:05 elk6 logstash: ssl.protocol = TLS
Sep 26 08:34:05 elk6 logstash: ssl.provider = null
Sep 26 08:34:05 elk6 logstash: ssl.secure.random.implementation = null
Sep 26 08:34:05 elk6 logstash: ssl.trustmanager.algorithm = PKIX
Sep 26 08:34:05 elk6 logstash: ssl.truststore.location = null
Sep 26 08:34:05 elk6 logstash: ssl.truststore.password = null
Sep 26 08:34:05 elk6 logstash: ssl.truststore.type = JKS
Sep 26 08:34:05 elk6 logstash: value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,107][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 2.1.0
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,107][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : eec43959745f444f
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,112][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
(the ConsumerConfig dump above repeats four more times, once per consumer thread, for client.id = logstash-1 through logstash-4; only client.id differs, and each dump is followed by the same "Kafka version : 2.1.0" / "Kafka commitId : eec43959745f444f" lines)
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,526][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: H4zxWFXARWugn1cNdQMhIg
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,526][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: H4zxWFXARWugn1cNdQMhIg
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,527][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: H4zxWFXARWugn1cNdQMhIg
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,528][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: H4zxWFXARWugn1cNdQMhIg
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,530][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: H4zxWFXARWugn1cNdQMhIg
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,536][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Discovered group coordinator kafka1.dev2:9092 (id: 2147483646 rack: null)
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,537][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-2, groupId=logstash] Discovered group coordinator kafka1.dev2:9092 (id: 2147483646 rack: null)
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,537][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-4, groupId=logstash] Discovered group coordinator kafka1.dev2:9092 (id: 2147483646 rack: null)
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,537][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash] Discovered group coordinator kafka1.dev2:9092 (id: 2147483646 rack: null)
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,537][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-3, groupId=logstash] Discovered group coordinator kafka1.dev2:9092 (id: 2147483646 rack: null)
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,555][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-1, groupId=logstash] Revoking previously assigned partitions []
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,555][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-2, groupId=logstash] Revoking previously assigned partitions []
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,556][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash] (Re-)joining group
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,556][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-2, groupId=logstash] (Re-)joining group
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,558][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-4, groupId=logstash] Revoking previously assigned partitions []
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,558][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-4, groupId=logstash] (Re-)joining group
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,560][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Revoking previously assigned partitions []
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,560][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] (Re-)joining group
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,562][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-3, groupId=logstash] Revoking previously assigned partitions []
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,563][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-3, groupId=logstash] (Re-)joining group
Sep 26 08:34:06 elk6 logstash: [2019-09-26T08:34:06,652][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,588][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Successfully joined group with generation 64
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,588][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-4, groupId=logstash] Successfully joined group with generation 64
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,588][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-2, groupId=logstash] Successfully joined group with generation 64
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,588][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-1, groupId=logstash] Successfully joined group with generation 64
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,589][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-3, groupId=logstash] Successfully joined group with generation 64
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,593][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions [kafka-4, kafka-5, kafka-2, kafka-3, kafka-0, kafka-1]
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,593][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-1, groupId=logstash] Setting newly assigned partitions [kafka-6, kafka-7, kafka-10, kafka-11, kafka-8, kafka-9]
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,594][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-2, groupId=logstash] Setting newly assigned partitions [kafka-16, kafka-17, kafka-14, kafka-15, kafka-12, kafka-13]
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,594][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-4, groupId=logstash] Setting newly assigned partitions [kafka-28, kafka-29, kafka-26, kafka-27, kafka-24, kafka-25]
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,597][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-3, groupId=logstash] Setting newly assigned partitions [kafka-22, kafka-23, kafka-20, kafka-21, kafka-18, kafka-19]
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,632][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-2, groupId=logstash] Resetting offset for partition kafka-17 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,633][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-2, groupId=logstash] Resetting offset for partition kafka-14 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,634][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-3, groupId=logstash] Resetting offset for partition kafka-22 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,635][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-3, groupId=logstash] Resetting offset for partition kafka-19 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,635][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash] Resetting offset for partition kafka-5 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,635][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-4, groupId=logstash] Resetting offset for partition kafka-29 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,635][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-4, groupId=logstash] Resetting offset for partition kafka-26 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,636][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-4, groupId=logstash] Resetting offset for partition kafka-28 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,636][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-4, groupId=logstash] Resetting offset for partition kafka-25 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,636][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash] Resetting offset for partition kafka-2 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,636][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-1, groupId=logstash] Resetting offset for partition kafka-11 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,637][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-1, groupId=logstash] Resetting offset for partition kafka-8 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,641][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-1, groupId=logstash] Resetting offset for partition kafka-7 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,641][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash] Resetting offset for partition kafka-4 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,641][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-1, groupId=logstash] Resetting offset for partition kafka-10 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,641][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash] Resetting offset for partition kafka-1 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,641][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-2, groupId=logstash] Resetting offset for partition kafka-15 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,641][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-2, groupId=logstash] Resetting offset for partition kafka-12 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,642][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-2, groupId=logstash] Resetting offset for partition kafka-16 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,642][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-2, groupId=logstash] Resetting offset for partition kafka-13 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,643][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-4, groupId=logstash] Resetting offset for partition kafka-27 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,643][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-4, groupId=logstash] Resetting offset for partition kafka-24 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,644][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-1, groupId=logstash] Resetting offset for partition kafka-6 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,644][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-1, groupId=logstash] Resetting offset for partition kafka-9 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,644][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash] Resetting offset for partition kafka-3 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,645][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=logstash] Resetting offset for partition kafka-0 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,652][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-3, groupId=logstash] Resetting offset for partition kafka-23 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,652][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-3, groupId=logstash] Resetting offset for partition kafka-20 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,652][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-3, groupId=logstash] Resetting offset for partition kafka-21 to offset 0.
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,652][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-3, groupId=logstash] Resetting offset for partition kafka-18 to offset 0.
Sep 26 08:40:01 elk6 systemd: Started Session 988 of user root.
Sep 26 08:40:01 elk6 systemd: Starting Session 988 of user root.
Sep 26 08:50:01 elk6 systemd: Started Session 989 of user root.
Sep 26 08:50:01 elk6 systemd: Starting Session 989 of user root.
Sep 26 09:00:01 elk6 systemd: Started Session 990 of user root.
Sep 26 09:00:01 elk6 systemd: Starting Session 990 of user root.
Sep 26 09:01:01 elk6 systemd: Started Session 991 of user root.
Sep 26 09:01:01 elk6 systemd: Starting Session 991 of user root.
Sep 26 09:10:01 elk6 systemd: Started Session 992 of user root.
Sep 26 09:10:01 elk6 systemd: Starting Session 992 of user root.

xsq95


Could someone please point me in the right direction?

stone_xy


[Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions [kafka-4, kafka-5, kafka-2, kafka-3, kafka-0, kafka-1]
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,593][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-1, groupId=logstash] Setting newly assigned partitions [kafka-6, kafka-7, kafka-10, kafka-11, kafka-8, kafka-9]
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,594][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-2, groupId=logstash] Setting newly assigned partitions [kafka-16, kafka-17, kafka-14, kafka-15, kafka-12, kafka-13]
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,594][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-4, groupId=logstash] Setting newly assigned partitions [kafka-28, kafka-29, kafka-26, kafka-27, kafka-24, kafka-25]
Sep 26 08:34:09 elk6 logstash: [2019-09-26T08:34:09,597][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-3, groupId=logstash] Setting newly assigned partitions [kafka-22, kafka-23, kafka-20, kafka-21, kafka-18, kafka-19]
Judging from this log, Logstash has, as a Kafka consumer, been assigned its partitions normally. The problem is most likely not on the Logstash side.
 
Are you sure Filebeat is actually delivering data to Kafka? Consume the topic with kafka-console-consumer and check whether any messages come through.
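That check can be run with the stock console consumer. A sketch, assuming the broker address from the question's Filebeat config and a standard Kafka installation layout:

```shell
# Consume the 'kafka' topic from the beginning so any existing messages
# show up too; run from the Kafka installation directory.
bin/kafka-console-consumer.sh \
  --bootstrap-server 172.23.0.41:9092 \
  --topic kafka \
  --from-beginning
```

If this prints nothing, the break is on the Filebeat-to-Kafka side rather than in Logstash.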
 
 
 

taeyeon


Troubleshooting steps:
1. First switch Filebeat's output to the console (output.console: pretty: true) and check whether any data shows up.
 
2. Then check whether the data can be consumed from Kafka: bin/kafka-console-consumer.sh --bootstrap-server host:9092 --topic xxx. Work through it in that order.
 
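Step 1 can be sketched as the following fragment in filebeat.yml. Note this is a temporary debug setup: Filebeat allows only one active output, so output.kafka must be commented out first.

```yaml
# Temporary debug output in filebeat.yml -- disable output.kafka while
# this is enabled, since Filebeat supports only a single output at a time.
output.console:
  pretty: true
```

Run Filebeat in the foreground afterwards and confirm events for /var/log/messages are printed before re-enabling the Kafka output.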
 

zqc0512 - andy zhou


Does the topic named "kafka" actually contain any data?
Try a topic that is known to have data.
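One way to check whether the "kafka" topic actually holds messages, without consuming it, is to list its end offsets. A sketch, assuming the broker address from the question and a standard Kafka install:

```shell
# Print the latest offset per partition of the 'kafka' topic;
# any non-zero value means that partition has received data.
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list 172.23.0.41:9092 \
  --topic kafka \
  --time -1
```

If every partition reports offset 0, Filebeat has never written anything to the topic, which matches the "Resetting offset ... to offset 0" lines in the Logstash log.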
 
