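Hi all,
My Logstash only receives data from Kafka for a short while after a restart; after that, nothing more comes in. Other departments also consume this topic, and I have already changed group_id and client_id so that they differ from theirs. I don't see any errors in the Logstash log either.
The Logstash conf is as follows: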
input {
  tcp {
    port => 6688
    type => "vbs"
  }
  kafka {
    bootstrap_servers => ["lol:9092,lol:9092"]
    consumer_threads => 3
    topics => ["dev2-RMCloudAPILog"]
    group_id => "fdf"
    client_id => "fdf"
    codec => plain
    type => "dev2rmcloud"
    auto_offset_reset => "earliest"
  }
}
filter {
  json {
    source => "message"
    remove_field => ["message"]
  }
}
filter {
  grok {
    match => {"message" => "(?<logdate>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})"}
  }
  date {
    match => ["logdate", "yyyy-MM-dd HH:mm:ss.SSS","yyyy-MM-dd HH:mm:ss,SSS"]
    target => "@timestamp"
  }
  mutate {
    remove_field => ["logdate"]
    remove_field => ["tags"]
  }
}
output {
  if "dev2rmcloud" in [type] {
    elasticsearch {
      hosts => ["balabala:9200","balabala:9200"]
      index => "dev2-rmcloudapilog_%{+YYYY.MM.dd}"
      user => balabala
      password => balabala
    }
  }
}
The Logstash log is as follows:
[2018-08-16T16:16:08,674][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [balabala:9092, balabala:9092]
ssl.keystore.type = JKS
enable.auto.commit = true
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id = fdf-1
ssl.endpoint.identification.algorithm = null
max.poll.records = 2147483647
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = fdf
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest
3 replies
catddd
After that, it just keeps printing the following log lines:
[2018-08-16T19:17:37,091][DEBUG][logstash.pipeline ] Pushing flush onto pipeline
[2018-08-16T19:17:42,091][DEBUG][logstash.pipeline ] Pushing flush onto pipeline
[2018-08-16T19:17:47,090][DEBUG][logstash.pipeline ] Pushing flush onto pipeline
[2018-08-16T19:17:52,090][DEBUG][logstash.pipeline ] Pushing flush onto pipeline
[2018-08-16T19:17:57,089][DEBUG][logstash.pipeline ] Pushing flush onto pipeline
[2018-08-16T19:18:02,089][DEBUG][logstash.pipeline ] Pushing flush onto pipeline
[2018-08-16T19:18:07,088][DEBUG][logstash.pipeline ] Pushing flush onto pipeline
[2018-08-16T19:18:12,088][DEBUG][logstash.pipeline ] Pushing flush onto pipeline
catddd
JackGe
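Then check the Elasticsearch logs for any write errors. Also check how the id field is set for documents written to the ES index, in case the same id is being written repeatedly.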
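One way to follow up on this is sketched below, assuming the rest of the pipeline stays exactly as posted. The stdout output with the rubydebug codec is not part of the original config; it is added here only as a temporary check on whether events keep arriving from Kafka after the first few minutes. The posted elasticsearch output does not set document_id, so as far as the config shows, Elasticsearch should be generating a fresh id for each event rather than overwriting an existing one.

```
output {
  if "dev2rmcloud" in [type] {
    # Temporary debug output (an addition, not in the original config):
    # prints every event, so it shows whether Kafka events still reach
    # the output stage once the index stops growing.
    stdout { codec => rubydebug }

    elasticsearch {
      hosts => ["balabala:9200","balabala:9200"]
      index => "dev2-rmcloudapilog_%{+YYYY.MM.dd}"
      user => balabala
      password => balabala
      # document_id is not set here, so Elasticsearch assigns the id itself.
    }
  }
}
```

If events keep showing up on stdout but not in the index, the problem is more likely on the Elasticsearch side; if stdout goes quiet as well, the Kafka consumer is the place to look.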