Logstash can't pull data from Kafka

Author: catddd | Posted: 2018-08-16 | Views: 361

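hi all:
   When my Logstash collects data from Kafka, data only comes in for a while after a restart, and then it stops. Other teams also consume this topic, so I've already changed group_id and client_id to values different from theirs. I don't see any errors in the Logstash log either.
The Logstash conf is as follows: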
input {
    tcp {
        port => 6688
        type => "vbs"
    }
    kafka {
        bootstrap_servers => "lol:9092,lol:9092"
        consumer_threads => 3
        topics => ["dev2-RMCloudAPILog"]
        group_id => "fdf"
        client_id => "fdf"
        codec => plain
        type => "dev2rmcloud"
        auto_offset_reset => "earliest"
    }
}
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}
filter {
    # Note: remove_field in the json filter above runs only when parsing succeeds,
    # so this grok only sees "message" on events whose payload was not valid JSON.
    grok {
        match => {"message" => "(?<logdate>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})"}
    }
    date {
        match => ["logdate", "yyyy-MM-dd HH:mm:ss.SSS", "yyyy-MM-dd HH:mm:ss,SSS"]
        target => "@timestamp"
    }
    mutate {
        # also drops failure tags such as _jsonparsefailure / _grokparsefailure
        remove_field => ["logdate", "tags"]
    }
}
output {
    if "dev2rmcloud" in [type] {
        elasticsearch {
            hosts => ["balabala:9200", "balabala:9200"]
            index => "dev2-rmcloudapilog_%{+YYYY.MM.dd}"
            user => "balabala"
            password => "balabala"
        }
    }
}
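To see what the two filter blocks do to a single event, here is a simplified Python model of them applied in config order. It is a sketch, not Logstash's implementation: the regex is copied from the grok match, the strptime formats are assumed equivalents of the date filter's Joda patterns, and run_filters / parse_logdate are hypothetical names. It illustrates that a successfully parsed JSON event has no message field left for grok, and that the final mutate discards the failure tags that would reveal this.

```python
import json
import re
from datetime import datetime

# Regex copied from the grok "match" (Python spells the named group (?P<...>)).
# Its unescaped "." also matches ",", so both date formats get through.
LOGDATE = re.compile(r"(?P<logdate>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})")

# Assumed strptime equivalents of the date filter's Joda patterns
# "yyyy-MM-dd HH:mm:ss.SSS" and "yyyy-MM-dd HH:mm:ss,SSS".
FORMATS = ["%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S,%f"]


def parse_logdate(logdate):
    """Try each pattern in order, like the date filter's match list."""
    for fmt in FORMATS:
        try:
            return datetime.strptime(logdate, fmt)
        except (TypeError, ValueError):
            continue
    return None


def run_filters(raw):
    """Simplified model of the two filter blocks, applied in config order."""
    event = {"message": raw}
    tags = []

    # json { source => "message" remove_field => ["message"] }
    try:
        parsed = json.loads(raw)
    except ValueError:
        parsed = None
    if isinstance(parsed, dict):
        event.update(parsed)
        event.pop("message", None)  # remove_field is applied only on success
    else:
        tags.append("_jsonparsefailure")

    # grok on "message": there is nothing to match once json removed it
    match = LOGDATE.search(event.get("message", ""))
    if match is None:
        tags.append("_grokparsefailure")
    else:
        event["logdate"] = match.group("logdate")

    # date { match => ["logdate", ...] target => "@timestamp" }
    if "logdate" in event:
        ts = parse_logdate(event["logdate"])
        if ts is None:
            tags.append("_dateparsefailure")
        else:
            event["@timestamp"] = ts  # otherwise Logstash keeps the ingest time

    # mutate { remove_field => ["logdate", "tags"] }: the tags collected
    # above are thrown away, so the indexed document never shows them.
    event.pop("logdate", None)
    return event
```

In this model a JSON line keeps its ingest time as @timestamp and carries no trace of the grok failure, while a plain-text line gets the time parsed from its body.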

 
The Logstash log is as follows:

[2018-08-16T16:16:08,674][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values: 
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [balabala:9092, balabala:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = fdf-1
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = fdf
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest


catddd

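The log volume on this Kafka topic is fairly large, and some individual log messages are also very large. I enabled debug logging in Logstash,
and it just keeps printing the following:
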
[2018-08-16T19:17:37,091][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2018-08-16T19:17:42,091][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2018-08-16T19:17:47,090][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2018-08-16T19:17:52,090][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2018-08-16T19:17:57,089][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2018-08-16T19:18:02,089][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2018-08-16T19:18:07,088][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2018-08-16T19:18:12,088][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
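The ConsumerConfig dump in the question has no max.poll.interval.ms and shows max.poll.records = 2147483647, which suggests an older (0.10.0-era) Kafka client. Two behaviours of that client are worth checking against the large messages mentioned here, although nothing in this thread confirms either one as the cause. First, its documentation warns that a message larger than max.partition.fetch.bytes (1048576 in the dump) can leave the consumer stuck fetching that partition. Second, it sends heartbeats only from poll(), so if handing one poll() batch to the pipeline (for example while a slow ES is applying back-pressure) takes longer than session.timeout.ms (30000 in the dump), the group treats the consumer as dead and rebalances. A rough sketch of both checks using the limits from the dump; oversized and batch_outlasts_session are hypothetical helpers, and the sizes and per-record timings you feed in are assumptions you would have to measure.

```python
# Limits copied from the ConsumerConfig dump in the question.
MAX_PARTITION_FETCH_BYTES = 1048576  # max.partition.fetch.bytes (1 MiB)
SESSION_TIMEOUT_MS = 30000           # session.timeout.ms


def oversized(record_sizes, limit=MAX_PARTITION_FETCH_BYTES):
    """Indices of records larger than the per-partition fetch limit."""
    return [i for i, size in enumerate(record_sizes) if size > limit]


def batch_outlasts_session(records_in_batch, ms_per_record,
                           timeout_ms=SESSION_TIMEOUT_MS):
    """True if handling one poll() batch takes longer than the session timeout.

    Assumes a client that heartbeats only from poll(), so the whole batch
    must be handed off before the next heartbeat can go out.
    """
    return records_in_batch * ms_per_record > timeout_ms
```

With a hypothetical 5 ms per record, a batch of 10,000 records (50 s) already exceeds the 30 s session timeout, whereas 500 records (2.5 s) does not. If either check fires, the kafka input exposes the corresponding client settings (max_partition_fetch_bytes, max_poll_records, session_timeout_ms); check the plugin documentation for your version.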

catddd

Could it be that ES can't absorb the sudden surge of data?

JackGe

First, consume the topic with kafka-console-consumer.sh --zookeeper 127.0.0.1:2181/online --topic topicName to check whether data keeps arriving in it.
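The console consumer shows whether messages exist, but not whether Logstash's group has stopped moving. Since the dump shows enable.auto.commit = true, one way to tell "no new data on the topic" apart from "group fdf stopped consuming" is to compare each partition's log-end offset with the group's committed offset at two points in time. A minimal sketch, assuming you have already copied those two numbers per partition from Kafka's consumer-group tooling; diagnose and its input format are hypothetical, not part of any Kafka API.

```python
def diagnose(samples):
    """Classify one partition from (log_end_offset, committed_offset) samples.

    samples: list of tuples, oldest first (hypothetical input, e.g. copied
    by hand from consumer-group tooling at two different times).
    """
    (end_first, committed_first) = samples[0]
    (end_last, committed_last) = samples[-1]
    produced = end_last - end_first
    consumed = committed_last - committed_first
    lag = end_last - committed_last
    if produced == 0:
        return "no new data on the topic"
    if consumed == 0:
        return f"consumer stalled, lag {lag}"
    return f"consuming, lag {lag}"
```

A growing log-end offset with a flat committed offset points at the consumer side (Logstash) rather than at the producers.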
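Then check the ES logs for any write errors. Also check how the id field of the documents written to the ES index is set, in case the same id is being written repeatedly.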
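On the id question: the posted elasticsearch output sets no document_id, so Elasticsearch should generate a fresh id for each event. Duplicate ids only matter if an id is supplied, for example through the output's document_id option; each repeated id then overwrites the earlier document, so the index count stops growing even while Logstash keeps sending. A minimal offline check over a sample of decoded messages, assuming id_field names whichever field would be used as the id (the helpers and field name are hypothetical):

```python
from collections import Counter


def duplicate_ids(events, id_field):
    """Map each id that occurs more than once to its number of occurrences."""
    counts = Counter(e[id_field] for e in events if id_field in e)
    return {doc_id: n for doc_id, n in counts.items() if n > 1}


def docs_after_indexing(events, id_field):
    """Documents left if every repeated id overwrites the previous one.

    Events without the id field are assumed to get an auto-generated id.
    """
    with_id = {e[id_field] for e in events if id_field in e}
    without_id = sum(1 for e in events if id_field not in e)
    return len(with_id) + without_id
```

For a sample such as [{"req_id": "a"}, {"req_id": "b"}, {"req_id": "a"}, {"msg": "x"}], four events would leave three documents, so a flat document count in ES is not on its own proof that Logstash stopped reading.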
 
