Logstash cannot consume data from Kafka

Logstash | Author: xiaoxiao_1110 | Posted: October 18, 2019 | Views: 7584

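When I run Logstash from the console, I can see that it connects to Kafka normally, but no data is ever consumed. After about 5 minutes the connection drops, and this warning is printed: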
[WARN ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=logstsh-0, groupId=gc] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
 
After Logstash connects to Kafka, I checked the offsets of the gc consumer group in Kafka, and they do not change:
GROUP  TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID  HOST         CLIENT-ID
gc     gc-mysql-slow  0          666730          708688          41958  logstash-0   /10.255.0.2  logstash-0
gc     gc-mysql-slow  1          669991          710558          40567  logstash-0   /10.255.0.2  logstash-0
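
For readers unfamiliar with the listing, the LAG column is simply LOG-END-OFFSET minus CURRENT-OFFSET. A minimal Python sketch using the two rows quoted in this post (the function names are made up for illustration):

```python
# Offsets copied from the consumer-group listing in this post.
ROWS = [
    # (topic, partition, current_offset, log_end_offset)
    ("gc-mysql-slow", 0, 666730, 708688),
    ("gc-mysql-slow", 1, 669991, 710558),
]


def partition_lag(current_offset, log_end_offset):
    """Messages written to the partition but not yet committed by the group."""
    return log_end_offset - current_offset


def total_lag(rows):
    """Sum of the per-partition lag over all listed partitions."""
    return sum(partition_lag(cur, end) for _, _, cur, end in rows)


if __name__ == "__main__":
    for topic, part, cur, end in ROWS:
        print(f"{topic}-{part}: lag={partition_lag(cur, end)}")
    print("total lag:", total_lag(ROWS))
```

If CURRENT-OFFSET stays fixed while LOG-END-OFFSET keeps growing, the lag only increases, which matches the backlog described in this post.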
 
This is what Logstash prints to the console:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [kafka-node1:26200, kafka-node2:26201, kafka-node3:26202]
        check.crcs = true
        client.dns.lookup = default
        client.id = logstsh-1-0
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = gc
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes =
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 1
        metadata.max.age.ms = 300000
        metric.reporters =
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 52428800
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
 [2019-10-18T04:08:57,480][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka version: 2.3.0
[2019-10-18T04:08:57,480][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka commitId: fc1aaa116b661c8a
[2019-10-18T04:08:57,480][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka startTimeMs: 1571386137478
[2019-10-18T04:08:57,516][INFO ][org.apache.kafka.clients.consumer.KafkaConsumer][main] [Consumer clientId=logstash-0, groupId=gc] Subscribed to topic(s): gc-mysql-slow
[2019-10-18T04:08:57,754][INFO ][org.apache.kafka.clients.Metadata][main] [Consumer clientId=logstash-0, groupId=gc] Cluster ID: 7LJ29pB8TW2zpdihaoyDbg
[2019-10-18T04:08:57,755][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] Discovered group coordinator kafka-node3:26202 (id: 2147483644 rack: null)
[2019-10-18T04:08:57,764][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] Revoking previously assigned partitions
[2019-10-18T04:08:57,764][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] (Re-)joining group
[2019-10-18T04:08:57,788][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] (Re-)joining group
[2019-10-18T04:08:57,814][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] Successfully joined group with generation 297
[2019-10-18T04:08:57,817][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] Setting newly assigned partitions: gc-mysql-slow-0, gc-mysql-slow-1
[2019-10-18T04:08:57,831][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] Setting offset for partition gc-mysql-slow-0 to the committed offset FetchPosition{offset=666730, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=kafka-node3:26202 (id: 3 rack: null), epoch=6}}
[2019-10-18T04:08:57,841][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] Setting offset for partition gc-mysql-slow-1 to the committed offset FetchPosition{offset=669991, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=kafka-node1:26200 (id: 1 rack: null), epoch=11}}
[2019-10-18T04:08:57,866][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
 
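I also wrote a short, simple Python script, and it fetches data from Kafka without any problem.
These are the subscription parameters of the Python consumer: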
{
'bootstrap_servers': ['kafka-node1:26200'],
 'client_id': 'kafka-python-1.4.7',
 'group_id': 'gc',
 'key_deserializer': None,
 'value_deserializer': None,
 'fetch_max_wait_ms': 500,
 'fetch_min_bytes': 1,
 'fetch_max_bytes': 52428800,
 'max_partition_fetch_bytes': 1048576,
 'request_timeout_ms': 305000,
 'retry_backoff_ms': 100,
 'reconnect_backoff_ms': 50,
 'reconnect_backoff_max_ms': 1000,
 'max_in_flight_requests_per_connection': 5,
 'auto_offset_reset': 'latest',
 'enable_auto_commit': True,
 'auto_commit_interval_ms': 5000,
 'default_offset_commit_callback': <function KafkaConsumer.<lambda> at 0x000001355B4FCE58>,
 'check_crcs': True,
 'metadata_max_age_ms': 300000,
 'partition_assignment_strategy': (<class 'kafka.coordinator.assignors.range.RangePartitionAssignor'>, <class 'kafka.coordinator.assignors.roundrobin.RoundRobinPartitionAssignor'>),
 'max_poll_records': 500,
 'max_poll_interval_ms': 300000,
 'session_timeout_ms': 10000,
 'heartbeat_interval_ms': 3000,
 'receive_buffer_bytes': None,
 'send_buffer_bytes': None,
 'socket_options': [(6, 1, 1)],
 'sock_chunk_bytes': 4096,
 'sock_chunk_buffer_count': 1000,
 'consumer_timeout_ms': inf,
 'security_protocol': 'PLAINTEXT',
 'ssl_context': None,
 'ssl_check_hostname': True,
 'ssl_cafile': None,
 'ssl_certfile': None,
 'ssl_keyfile': None,
 'ssl_crlfile': None,
 'ssl_password': None,
 'ssl_ciphers': None,
 'api_version': (1, 0, 0),
 'api_version_auto_timeout_ms': 2000,
 'connections_max_idle_ms': 540000,
 'metric_reporters': ,
 'metrics_num_samples': 2,
 'metrics_sample_window_ms': 30000,
 'metric_group_prefix': 'consumer',
 'selector': <class 'selectors.SelectSelector'>,
 'exclude_internal_topics': True,
 'sasl_mechanism': None,
 'sasl_plain_username': None,
 'sasl_plain_password': None,
 'sasl_kerberos_service_name': 'kafka',
 'sasl_kerberos_domain_name': None,
 'sasl_oauth_token_provider': None,
 'legacy_iterator': False}
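
To make the two dumps easier to compare, here is a minimal sketch that diffs a handful of the settings quoted in this post. Only a subset of keys is copied in, and the helper names are invented for illustration. It assumes that replacing dots with underscores is enough to line up the Java consumer's names with the kafka-python ones.

```python
# A subset of the Logstash (Java client) consumer settings quoted above.
LOGSTASH = {
    "bootstrap.servers": ["kafka-node1:26200", "kafka-node2:26201", "kafka-node3:26202"],
    "group.id": "gc",
    "auto.offset.reset": "latest",
    "enable.auto.commit": True,
    "max.poll.records": 1,
    "max.poll.interval.ms": 300000,
    "request.timeout.ms": 30000,
    "session.timeout.ms": 10000,
    "heartbeat.interval.ms": 3000,
}

# The same settings as reported by the kafka-python script.
KAFKA_PYTHON = {
    "bootstrap_servers": ["kafka-node1:26200"],
    "group_id": "gc",
    "auto_offset_reset": "latest",
    "enable_auto_commit": True,
    "max_poll_records": 500,
    "max_poll_interval_ms": 300000,
    "request_timeout_ms": 305000,
    "session_timeout_ms": 10000,
    "heartbeat_interval_ms": 3000,
}


def normalize(key):
    """Map a Java-style name (max.poll.records) to the kafka-python style."""
    return key.replace(".", "_")


def diff_settings(a, b):
    """Return {key: (value_in_a, value_in_b)} for shared keys whose values differ."""
    a_norm = {normalize(k): v for k, v in a.items()}
    b_norm = {normalize(k): v for k, v in b.items()}
    shared = sorted(a_norm.keys() & b_norm.keys())
    return {k: (a_norm[k], b_norm[k]) for k in shared if a_norm[k] != b_norm[k]}


if __name__ == "__main__":
    for key, (ls_value, py_value) in diff_settings(LOGSTASH, KAFKA_PYTHON).items():
        print(f"{key}: logstash={ls_value!r} kafka-python={py_value!r}")
```

Among these keys, only the broker list, max_poll_records, and request_timeout_ms differ. The group, offset-reset, auto-commit, and poll-interval settings are identical, so the difference in behavior may not come from these consumer settings alone.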
 
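I really cannot figure out the cause. Could someone point me in the right direction? Everything was fine last night, and it has been broken all day today. Tens of thousands of messages have already piled up unconsumed.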
input kafka
output elasticsearch

xiaoxiao_1110

When I set max_poll_records => "10",
every start writes 1345 documents to ES, and then 5 minutes later the consumer is removed from the gc consumer group.
 
[2019-10-18T05:20:00,563][WARN ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
[2019-10-18T05:20:00,564][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator][main] [Consumer clientId=logstash-0, groupId=gc] Member logstash-0-adabac6d-22f0-4da3-b89e-2016684a9770 sending LeaveGroup request to coordinator kafka-node3:26202 (id: 2147483644 rack: null)
 
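However, the latest offsets are never committed, so every restart consumes the same data again from the previous position. The commit does not succeed, and I do not know why.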
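
As a rough sketch of what the warning describes, the consumer leaves the group when the gap between two poll() calls exceeds max.poll.interval.ms (300000 ms in the dump above, which matches the 5 minutes observed). The functions below are illustrative only and are not part of Logstash or the Kafka client:

```python
MAX_POLL_INTERVAL_MS = 300000  # max.poll.interval.ms from the config dump


def per_record_budget_ms(max_poll_interval_ms, max_poll_records):
    """Average processing time available per record before the timeout fires."""
    return max_poll_interval_ms / max_poll_records


def poll_timeout_expired(ms_since_last_poll, max_poll_interval_ms=MAX_POLL_INTERVAL_MS):
    """True when the gap between two poll() calls exceeds the configured limit."""
    return ms_since_last_poll > max_poll_interval_ms


if __name__ == "__main__":
    for records in (1, 10):
        budget = per_record_budget_ms(MAX_POLL_INTERVAL_MS, records)
        print(f"max_poll_records={records}: {budget:.0f} ms per record")
```

With max_poll_records at 10, each record gets about 30 seconds on average, and with 1 it gets the full 5 minutes. That much headroom suggests the pipeline may be stalling completely after the first events rather than merely processing slowly, although this is only a guess without the pipeline configuration.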

chachabusi - A newbie ops girl, please be kind to me

Bump.

medcl - Hunting tigers tonight.

How is Logstash configured? Please post the pipeline configuration so we can take a look.
