kafka
filebeat写数据到kafka乱序
Beats • shaonianlang 回复了问题 • 5 人关注 • 6 个回复 • 3544 次浏览 • 2021-03-04 19:20
你们elasticsearch用什么方式消费kafka?
Elasticsearch • laoyang360 回复了问题 • 2 人关注 • 1 个回复 • 2910 次浏览 • 2020-07-23 08:23
logstash 消费kafka数据时,partition持续rebalancing
Logstash • pony_maggie 回复了问题 • 10 人关注 • 8 个回复 • 12948 次浏览 • 2020-07-07 09:16
Kafka 消费端突然不进行消费
资讯动态 • zhtanjy 回复了问题 • 5 人关注 • 4 个回复 • 5901 次浏览 • 2020-04-17 14:20
kafka集群宕机一台后无法消费
默认分类 • micmouse521 回复了问题 • 2 人关注 • 1 个回复 • 3618 次浏览 • 2019-11-28 17:38
logstash无法消费kafka数据
Logstash • medcl 回复了问题 • 4 人关注 • 3 个回复 • 7584 次浏览 • 2019-11-13 14:35
filebeat -> kafka -> logstash -> elasticsearch || (kafka -> ApplicationDB ),第二个kafka消费的时候消息顺序错乱
Logstash • juneryang 回复了问题 • 4 人关注 • 2 个回复 • 3575 次浏览 • 2019-07-04 14:28
kafka新增分区对单分区时序性影响
默认分类 • Bobbao 回复了问题 • 4 人关注 • 2 个回复 • 2807 次浏览 • 2019-06-26 16:11
logstash消费kafka的问题
Logstash • zhangxinhong 回复了问题 • 8 人关注 • 4 个回复 • 8051 次浏览 • 2019-05-08 09:21
grok解析,出现一个字段值 重复匹配 ,message字段传过来的是正常的
回复Logstash • changxiangshimian/changxiangshimian.github.io 发起了问题 • 2 人关注 • 0 个回复 • 4938 次浏览 • 2019-05-05 18:17
请问这个时什么工具啊
默认分类 • gomatu 回复了问题 • 3 人关注 • 3 个回复 • 3645 次浏览 • 2019-04-18 14:41
如何选择在ES之前处理写入的中间件?
Elasticsearch • God_lockin 回复了问题 • 4 人关注 • 2 个回复 • 4079 次浏览 • 2018-12-18 14:48
用elasitc stack监控kafka
Kibana • 点火三周 发表了文章 • 0 个评论 • 4612 次浏览 • 2018-12-12 11:28
当我们搭建elasitc stack集群时,大多数时候会在我们的架构中加入kafka作为消息缓冲区,即从beats -> kafka -> logstash -> elasticsearch这样的一个消息流。使用kafka可以给我们带来很多便利,但是也让我们需要额外多维护一套组件,elasitc stack本身已经提供了monitoring的功能,我们可以方便的从kibana上监控各个组件中各节点的可用性,吞吐和性能等各种指标,但kafka作为架构中的组件之一却游离在监控之外,相当不合理。
幸而elastic真的是迭代的相当快,在metricbeat上很早就有了对kafka的监控,但一直没有一个直观的dashboard,终于在6.5版本上,上新了kafka dashboard。我们来看一下吧。
安装和配置metricbeat
安装包下载地址,下载后,自己安装。
然后,将/etc/metricbeat/modules.d/kafka.yml.disable
文件重命名为/etc/metricbeat/modules.d/kafka.yml
。(即打开kafka的监控)。稍微修改一下文件内容, 注意,这里需填入所有你需要监控的kafka服务器的地址:
# Module: kafka
# Docs: https://www.elastic.co/guide/en/beats/metricbeat/6.4/metricbeat-module-kafka.html
- module: kafka
metricsets:
- partition
- consumergroup
period: 20s
hosts: ["10.*.*.*:9092","10.*.*.*:9092","10.*.*.*:9092","10.*.*.*:9092"]
#client_id: metricbeat
#retries: 3
#backoff: 250ms
# List of Topics to query metadata for. If empty, all topics will be queried.
#topics: []
# Optional SSL. By default is off.
# List of root certificates for HTTPS server verifications
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]
# Certificate for SSL client authentication
#ssl.certificate: "/etc/pki/client/cert.pem"
# Client Certificate Key
#ssl.key: "/etc/pki/client/cert.key"
# SASL authentication
#username: ""
#password: ""
运行metricbeat,这里,一定要注意enable kibana dashboard。
然后就可以在kibana里面看到:
这样,我们就可以通过sentinl等类似的插件,自动做kafka的告警等功能了
你们elasticsearch用什么方式消费kafka?
回复Elasticsearch • laoyang360 回复了问题 • 2 人关注 • 1 个回复 • 2910 次浏览 • 2020-07-23 08:23
logstash 消费kafka数据时,partition持续rebalancing
回复Logstash • pony_maggie 回复了问题 • 10 人关注 • 8 个回复 • 12948 次浏览 • 2020-07-07 09:16
filebeat -> kafka -> logstash -> elasticsearch || (kafka -> ApplicationDB ),第二个kafka消费的时候消息顺序错乱
回复Logstash • juneryang 回复了问题 • 4 人关注 • 2 个回复 • 3575 次浏览 • 2019-07-04 14:28
grok解析,出现一个字段值 重复匹配 ,message字段传过来的是正常的
回复Logstash • changxiangshimian/changxiangshimian.github.io 发起了问题 • 2 人关注 • 0 个回复 • 4938 次浏览 • 2019-05-05 18:17
如何选择在ES之前处理写入的中间件?
回复Elasticsearch • God_lockin 回复了问题 • 4 人关注 • 2 个回复 • 4079 次浏览 • 2018-12-18 14:48
用elasitc stack监控kafka
Kibana • 点火三周 发表了文章 • 0 个评论 • 4612 次浏览 • 2018-12-12 11:28
当我们搭建elasitc stack集群时,大多数时候会在我们的架构中加入kafka作为消息缓冲区,即从beats -> kafka -> logstash -> elasticsearch这样的一个消息流。使用kafka可以给我们带来很多便利,但是也让我们需要额外多维护一套组件,elasitc stack本身已经提供了monitoring的功能,我们可以方便的从kibana上监控各个组件中各节点的可用性,吞吐和性能等各种指标,但kafka作为架构中的组件之一却游离在监控之外,相当不合理。
幸而elastic真的是迭代的相当快,在metricbeat上很早就有了对kafka的监控,但一直没有一个直观的dashboard,终于在6.5版本上,上新了kafka dashboard。我们来看一下吧。
安装和配置metricbeat
安装包下载地址,下载后,自己安装。
然后,将/etc/metricbeat/modules.d/kafka.yml.disable
文件重命名为/etc/metricbeat/modules.d/kafka.yml
。(即打开kafka的监控)。稍微修改一下文件内容, 注意,这里需填入所有你需要监控的kafka服务器的地址:
# Module: kafka
# Docs: https://www.elastic.co/guide/en/beats/metricbeat/6.4/metricbeat-module-kafka.html
- module: kafka
metricsets:
- partition
- consumergroup
period: 20s
hosts: ["10.*.*.*:9092","10.*.*.*:9092","10.*.*.*:9092","10.*.*.*:9092"]
#client_id: metricbeat
#retries: 3
#backoff: 250ms
# List of Topics to query metadata for. If empty, all topics will be queried.
#topics: []
# Optional SSL. By default is off.
# List of root certificates for HTTPS server verifications
#ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]
# Certificate for SSL client authentication
#ssl.certificate: "/etc/pki/client/cert.pem"
# Client Certificate Key
#ssl.key: "/etc/pki/client/cert.key"
# SASL authentication
#username: ""
#password: ""
运行metricbeat,这里,一定要注意enable kibana dashboard。
然后就可以在kibana里面看到:
这样,我们就可以通过sentinl等类似的插件,自动做kafka的告警等功能了
当Elasticsearch遇见Kafka--Kafka Connect
Elasticsearch • mushao999 发表了文章 • 2 个评论 • 13881 次浏览 • 2018-11-17 11:15
本文同步发布在腾讯云+社区Elasticsearch专栏中:https://cloud.tencent.com/developer/column/4008
在“当Elasticsearch遇见Kafka--Logstash kafka input插件”一文中,我对Logstash的Kafka input插件进行了简单的介绍,并通过实际操作的方式,为大家呈现了使用该方式实现Kafka与Elastisearch整合的基本过程。可以看出使用Logstash input插件的方式,具有配置简单,数据处理方便等优点。然而使用Logstash Kafka插件并不是Kafka与Elsticsearch整合的唯一方案,另一种比较常见的方案是使用Kafka的开源组件Kafka Connect。
1 Kafka Connect简介
Kafka Connect是Kafka的开源组件Confluent提供的功能,用于实现Kafka与外部系统的连接。Kafka Connect同时支持分布式模式和单机模式,另外提供了一套完整的REST接口,用于查看和管理Kafka Connectors,还具有offset自动管理,可扩展等优点。
Kafka connect分为企业版和开源版,企业版在开源版的基础之上提供了监控,负载均衡,副本等功能,实际生产环境中建议使用企业版。(本测试使用开源版)
Kafka connect workers有两种工作模式,单机模式和分布式模式。在开发和适合使用单机模式的场景下,可以使用standalone模式, 在实际生产环境下由于单个worker的数据压力会比较大,distributed模式对负载均和和扩展性方面会有很大帮助。(本测试使用standalone模式)
关于Kafka Connect的详细情况可以参考[Kafka Connect]
2 使用Kafka Connect连接Kafka和Elasticsearch
2.1 测试环境准备
本文与使用Logstash Kafka input插件环境一样[传送门],组件列表如下
服务 | ip | port |
---|---|---|
Elasticsearch service | 192.168.0.8 | 9200 |
Ckafka | 192.168.13.10 | 9092 |
CVM | 192.168.0.13 | - |
kafka topic也复用原来了的kafka_es_test
2.2 Kafka Connect 安装
本文下载的为开源版本confluent-oss-5.0.1-2.11.tar.gz,下载后解压
2.3 Worker配置
1) 配置参考
如前文所说,worker分为Standalone和Distributed两种模式,针对两种模式的配置,参考如下
[通用配置]
此处需要注意的是Kafka Connect默认使用AvroConverter,使用该AvroConverter时需要注意必须启动Schema Registry服务
2) 实际操作
本测试使用standalone模式,因此修改/root/confluent-5.0.1/etc/schema-registry/connect-avro-standalone.properties
bootstrap.servers=192.168.13.10:9092
2.4 Elasticsearch Connector配置
1) 配置参考
[Elasticsearch Configuration Options]
2) 实际操作
修改/root/confluent-5.0.1/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=kafka_es_test
key.ignore=true
connection.url=http://192.168.0.8:9200
type.name=kafka-connect
注意: 其中topics不仅对应Kafka的topic名称,同时也是Elasticsearch的索引名,当然也可以通过topic.index.map来设置从topic名到Elasticsearch索引名的映射
2.5 启动connector
1 注意事项
1) 由于配置文件中jar包位置均采用的相对路径,因此建议在confluent根目录下执行命令和启动程序,以避免不必要的问题
2) 如果前面没有修改converter,仍采用AvroConverter, 注意需要在启动connertor前启动Schema Registry服务
2 启动Schema Registry服务
正如前文所说,由于在配置worker时指定使用了AvroConverter,因此需要启动Schema Registry服务。而该服务需要指定一个zookeeper地址或Kafka地址,以存储schema数据。由于CKafka不支持用户通过接口形式创建topic,因此需要在本机起一个kafka以创建名为_schema的topic。
1) 启动Zookeeper
./bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties
2) 启动kafka
./bin/kafka-server-start -daemon etc/kafka/server.properties
3) 启动schema Registry
./bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties
4) 使用netstat -natpl 查看各服务端口是否正常启动
zookeeper 2181
kafka 9092
schema registry 8081
3 启动Connector
./bin/connect-standalone -daemon etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
ps:以上启动各服务均可在logs目录下找到对应日志
2.6 启动Kafka Producer
由于我们采用的是AvroConverter,因此不能采用Kafka工具包中的producer。Kafka Connector bin目录下提供了Avro Producer
1) 启动Producer
./bin/kafka-avro-console-producer --broker-list 192.168.13.10:9092 --topic kafka_es_test --property value.schema='{"type":"record","name":"person","fields":[{"name":"nickname","type":"string"}]}'
2) 输入如下数据
{"nickname":"michel"}
{"nickname":"mushao"}
2.7 Kibana验证结果
1) 查看索引
在kibana Dev Tools的Console中输入
GET _cat/indices
结果
green open kafka_es_test 36QtDP6vQOG7ubOa161wGQ 5 1 1 0 7.9kb 3.9kb
green open .kibana QUw45tN0SHqeHbF9-QVU6A 1 1 1 0 5.5kb 2.7kb
可以看到名为kafka_es_test的索引被成功创建
2) 查看数据
{
"took": 0,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": 2,
"max_score": 1,
"hits": [
{
"_index": "kafka_es_test",
"_type": "kafka-connect",
"_id": "kafka_es_test+0+0",
"_score": 1,
"_source": {
"nickname": "michel"
}
},
{
"_index": "kafka_es_test",
"_type": "kafka-connect",
"_id": "kafka_es_test+0+1",
"_score": 1,
"_source": {
"nickname": "mushao"
}
}
]
}
}
可以看到数据已经被成功写入
3 Confluent CLI
3.1 简介
查阅资料时发现很多文章都是使用Confluent CLI启动Kafka Connect,然而官方文档已经明确说明了该CLI只是适用于开发阶段,不能用于生产环境。
它可以一键启动包括zookeeper,kafka,schema registry, kafka rest, connect等在内的多个服务。但是这些服务对于Kafka Connect都不是必须的,如果不使用AvroConverter,则只需要启动Connect即可。即使使用了AvroConverter, 也只需要启动schema registry,将schema保存在远端的kafka中。Kafka Connect REST API也只是为用户提供一个管理connector的接口,也不是必选的。
另外使用CLI启动默认配置为启动Distributed的Connector,需要通过环境变量来修改配置
3.2 使用Confluent CLI
confluent CLI提供了丰富的命令,包括服务启动,服务停止,状态查询,日志查看等,详情参考如下简介视频 [Introducing the Confluent CLI | Screencast]
1) 启动
./bin/confluent start
2) 检查confluent运行状态
./bin/confluent status
当得到如下结果则说明confluent启动成功
ksql-server is [UP]
connect is [UP]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]
3) 问题定位
如果第二步出现问题,可以使用log命令查看,如connect未启动成功则
./bin/confluent log connect
4) 加载Elasticsearch Connector
a) 查看connector
./bin/confluent list connectors
结果
Bundled Predefined Connectors (edit configuration under etc/):
elasticsearch-sink
file-source
file-sink
jdbc-source
jdbc-sink
hdfs-sink
s3-sink
b) 加载Elasticsearch connector
./bin/confluent load elasticsearch-sink
结果
{
"name": "elasticsearch-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "kafka_es_test",
"key.ignore": "true",
"connection.url": "http://192.168.0.8:9200",
"type.name": "kafka-connect",
"name": "elasticsearch-sink"
},
"tasks": [],
"type": null
}
5) 使用producer生产数据,并使用kibana验证是否写入成功
4 Kafka Connect Rest API
Kafka Connect提供了一套完成的管理Connector的接口,详情参考[Kafka Connect REST Interface]。该接口可以实现对Connector的创建,销毁,修改,查询等操作
1) GET connectors 获取运行中的connector列表
2) POST connectors 使用指定的名称和配置创建connector
3) GET connectors/(string:name) 获取connector的详细信息
4) GET connectors/(string:name)/config 获取connector的配置
5) PUT connectors/(string:name)/config 设置connector的配置
6) GET connectors/(string:name)/status 获取connector状态
7) POST connectors/(stirng:name)/restart 重启connector
8) PUT connectors/(string:name)/pause 暂停connector
9) PUT connectors/(string:name)/resume 恢复connector
10) DELETE connectors/(string:name)/ 删除connector
11) GET connectors/(string:name)/tasks 获取connectors任务列表
12) GET /connectors/(string: name)/tasks/(int: taskid)/status 获取任务状态
13) POST /connectors/(string: name)/tasks/(int: taskid)/restart 重启任务
14) GET /connector-plugins/ 获取已安装插件列表
15) PUT /connector-plugins/(string: name)/config/validate 验证配置
5 总结
Kafka Connect是Kafka一个功能强大的组件,为kafka提供了与外部系统连接的一套完整方案,包括数据传输,连接管理,监控,多副本等。相对于Logstash Kafka插件,功能更为全面,但配置也相对为复杂些。有文章提到其性能也优于Logstash Kafka Input插件,如果对写入性能比较敏感的场景,可以在实际压测的基础上进行选择。另外由于直接将数据从Kafka写入Elasticsearch, 如果需要对文档进行处理时,选择Logstash可能更为方便。
新的kafka集群监控系统使用golang开发
Elasticsearch • kppotato 发表了文章 • 0 个评论 • 3432 次浏览 • 2018-05-04 17:04
【急聘】搜索推荐系统研发工程师 12-20K
求职招聘 • man429 发表了文章 • 0 个评论 • 3986 次浏览 • 2017-07-26 15:38
Day7: hangout 替代 logstash-input-kafka
Advent • 三斗室 发表了文章 • 2 个评论 • 13102 次浏览 • 2015-12-08 00:54
inputs:
- Kafka:
codec: plain
encoding: UTF8 # defaut UTF8
topic:
app: 2
consumer_settings:
group.id: hangout
zookeeper.connect: 192.168.1.200:2181
auto.commit.interval.ms: "1000"
socket.receive.buffer.bytes: "1048576"
fetch.message.max.bytes: "1048576"
num.consumer.fetchers: "4"
- Kafka:
codec: json
topic:
web: 1
consumer_settings:
group.id: hangout
zookeeper.connect: 192.168.1.201:2181
auto.commit.interval.ms: "5000"
filters:
- Grok:
match:
- '^(?<logtime>\S+) (?<user>.+) (-|(?<level>\w+)) %{DATA:msg}$'
remove_fields: ['message']
- Add:
fields:
test: 'abcd'
if:
- '<#if message??>true</#if>'
- '<#if message?contains("liu")>true<#elseif message?contains("warn")>true</#if>'
- Date:
src: logtime
formats:
- 'ISO8601'
remove_fields: ['logtime']
- Lowercase:
fields: ['user']
- Add:
fields:
me: 'I am ${user}'
- Remove:
fields:
- logtime
- Trim:
fields:
- user
- Rename:
fields:
me: he
user: she
- Gsub:
fields:
she: ['c','CCC']
he: ['(^\w+)|(\w+$)','XXX']
- Translate:
source: user
target: nick
dictionary_path: /tmp/app.dic
- KV:
source: msg
target: kv
field_split: ' '
value_split: '='
trim: '\t\"'
trimkey: '\"'
include_keys: ["a","b","xyz","12"]
exclude_keys: ["b","c"] # b in excluded
tag_on_failure: "KVfail"
remove_fields: ['msg']
- Convert:
fields:
cs_bytes: integer
time_taken: float
- URLDecode:
fields: ["query1","query2"]
outputs:
- Stdout:
if:
- '<#if user=="childe">true</#if>'
- Elasticsearch:
cluster: hangoutcluster
hosts:
- 192.168.1.200
index: 'hangout-%{user}-%{+YYYY.MM.dd}'
index_type: logs # default logs
bulk_actions: 20000 #default 20000
bulk_size: 15 # default 15 MB
flush_interval: 10 # default 10 seconds
concurrent_requests: 0 # default 0, concurrent_requests设置成大于0的数, 意思着多线程处理, 以我应用的经验,还有是一定OOM风险的,强烈建议设置为0
- Kafka:
broker_list: 192.168.1.200:9092
topic: test2
其 pipeline 设计和 Logstash 不同的是:整个 filter 和 output 流程,都在 Kafka 的 consumer 线程中完成。所以,并发线程数完全是有 Kafka 的 partitions 设置来控制的。
实际运行下来,hangout 比 Logstash 确实在处理能力,尤其是 CPU 资源消耗方面,性价比要高出很多。
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。