kafka

kafka

请问这个时什么工具啊

默认分类HelloClyde 回复了问题 • 2 人关注 • 2 个回复 • 115 次浏览 • 2018-11-26 11:06 • 来自相关话题

当Elasticsearch遇见Kafka--Kafka Connect

Elasticsearchmushao999 发表了文章 • 0 个评论 • 148 次浏览 • 2018-11-17 11:15 • 来自相关话题

本文同步发布在腾讯云+社区Elasticsearch专栏中:https://cloud.tencent.com/developer/column/4008
在“当Elasticsearch遇见Kafka--Logstash kafka input插件”一文中,我对Logstash的Kafka input插件进行了简单的介绍,并通过实际操作的方式,为大家呈现了使用该方式实现Kafka与Elastisearch整合的基本过程。可以看出使用Logstash input插件的方式,具有配置简单,数据处理方便等优点。然而使用Logstash Kafka插件并不是Kafka与Elsticsearch整合的唯一方案,另一种比较常见的方案是使用Kafka的开源组件Kafka Connect。

Confluent实现Kafka与Elasticsearch的连接

1 Kafka Connect简介

Kafka Connect是Kafka的开源组件Confluent提供的功能,用于实现Kafka与外部系统的连接。Kafka Connect同时支持分布式模式和单机模式,另外提供了一套完整的REST接口,用于查看和管理Kafka Connectors,还具有offset自动管理,可扩展等优点。

Kafka connect分为企业版和开源版,企业版在开源版的基础之上提供了监控,负载均衡,副本等功能,实际生产环境中建议使用企业版。(本测试使用开源版)

Kafka connect workers有两种工作模式,单机模式和分布式模式。在开发和适合使用单机模式的场景下,可以使用standalone模式, 在实际生产环境下由于单个worker的数据压力会比较大,distributed模式对负载均和和扩展性方面会有很大帮助。(本测试使用standalone模式)

关于Kafka Connect的详细情况可以参考[Kafka Connect]

2 使用Kafka Connect连接Kafka和Elasticsearch

2.1 测试环境准备

本文与使用Logstash Kafka input插件环境一样[传送门],组件列表如下

服务 ip port
Elasticsearch service 192.168.0.8 9200
Ckafka 192.168.13.10 9092
CVM 192.168.0.13 -

kafka topic也复用原来了的kafka_es_test

2.2 Kafka Connect 安装

[Kafka Connec下载地址]

本文下载的为开源版本confluent-oss-5.0.1-2.11.tar.gz,下载后解压

2.3 Worker配置

1) 配置参考

如前文所说,worker分为Standalone和Distributed两种模式,针对两种模式的配置,参考如下

[通用配置]

[Standalone Woker配置]

[Distributed Worker配置]

此处需要注意的是Kafka Connect默认使用AvroConverter,使用该AvroConverter时需要注意必须启动Schema Registry服务

2) 实际操作

本测试使用standalone模式,因此修改/root/confluent-5.0.1/etc/schema-registry/connect-avro-standalone.properties

bootstrap.servers=192.168.13.10:9092

2.4 Elasticsearch Connector配置

1) 配置参考

[Connectors通用配置]

[Elasticsearch Configuration Options]

2) 实际操作

修改/root/confluent-5.0.1/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=kafka_es_test
key.ignore=true
connection.url=http://192.168.0.8:9200
type.name=kafka-connect

注意: 其中topics不仅对应Kafka的topic名称,同时也是Elasticsearch的索引名,当然也可以通过topic.index.map来设置从topic名到Elasticsearch索引名的映射

2.5 启动connector

1 注意事项

1) 由于配置文件中jar包位置均采用的相对路径,因此建议在confluent根目录下执行命令和启动程序,以避免不必要的问题

2) 如果前面没有修改converter,仍采用AvroConverter, 注意需要在启动connertor前启动Schema Registry服务

2 启动Schema Registry服务

正如前文所说,由于在配置worker时指定使用了AvroConverter,因此需要启动Schema Registry服务。而该服务需要指定一个zookeeper地址或Kafka地址,以存储schema数据。由于CKafka不支持用户通过接口形式创建topic,因此需要在本机起一个kafka以创建名为_schema的topic。

1) 启动Zookeeper

./bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties

2) 启动kafka

./bin/kafka-server-start -daemon etc/kafka/server.properties

3) 启动schema Registry

./bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties

4) 使用netstat -natpl 查看各服务端口是否正常启动

zookeeper 2181

kafka 9092

schema registry 8081

3 启动Connector

./bin/connect-standalone -daemon  etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

ps:以上启动各服务均可在logs目录下找到对应日志

2.6 启动Kafka Producer

由于我们采用的是AvroConverter,因此不能采用Kafka工具包中的producer。Kafka Connector bin目录下提供了Avro Producer

1) 启动Producer

./bin/kafka-avro-console-producer --broker-list 192.168.13.10:9092 --topic kafka_es_test --property value.schema='{"type":"record","name":"person","fields":[{"name":"nickname","type":"string"}]}'

2) 输入如下数据

{"nickname":"michel"}
{"nickname":"mushao"}

2.7 Kibana验证结果

1) 查看索引

在kibana Dev Tools的Console中输入

GET _cat/indices

结果

green open kafka_es_test 36QtDP6vQOG7ubOa161wGQ 5 1 1 0 7.9kb 3.9kb
green open .kibana       QUw45tN0SHqeHbF9-QVU6A 1 1 1 0 5.5kb 2.7kb

可以看到名为kafka_es_test的索引被成功创建

2) 查看数据

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 1,
    "hits": [
      {
        "_index": "kafka_es_test",
        "_type": "kafka-connect",
        "_id": "kafka_es_test+0+0",
        "_score": 1,
        "_source": {
          "nickname": "michel"
        }
      },
      {
        "_index": "kafka_es_test",
        "_type": "kafka-connect",
        "_id": "kafka_es_test+0+1",
        "_score": 1,
        "_source": {
          "nickname": "mushao"
        }
      }
    ]
  }
}

可以看到数据已经被成功写入

3 Confluent CLI

3.1 简介

查阅资料时发现很多文章都是使用Confluent CLI启动Kafka Connect,然而官方文档已经明确说明了该CLI只是适用于开发阶段,不能用于生产环境。

它可以一键启动包括zookeeper,kafka,schema registry, kafka rest, connect等在内的多个服务。但是这些服务对于Kafka Connect都不是必须的,如果不使用AvroConverter,则只需要启动Connect即可。即使使用了AvroConverter, 也只需要启动schema registry,将schema保存在远端的kafka中。Kafka Connect REST API也只是为用户提供一个管理connector的接口,也不是必选的。

另外使用CLI启动默认配置为启动Distributed的Connector,需要通过环境变量来修改配置

3.2 使用Confluent CLI

confluent CLI提供了丰富的命令,包括服务启动,服务停止,状态查询,日志查看等,详情参考如下简介视频 [Introducing the Confluent CLI | Screencast]

1) 启动

./bin/confluent start

2) 检查confluent运行状态

./bin/confluent status

当得到如下结果则说明confluent启动成功

ksql-server is [UP]
connect is [UP]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]

3) 问题定位

如果第二步出现问题,可以使用log命令查看,如connect未启动成功则

./bin/confluent log connect

4) 加载Elasticsearch Connector

a) 查看connector

./bin/confluent list connectors

结果

Bundled Predefined Connectors (edit configuration under etc/):
elasticsearch-sink
file-source
file-sink
jdbc-source
jdbc-sink
hdfs-sink
s3-sink

b) 加载Elasticsearch connector

./bin/confluent load elasticsearch-sink

结果

{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "kafka_es_test",
        "key.ignore": "true",
        "connection.url": "http://192.168.0.8:9200",
        "type.name": "kafka-connect",
        "name": "elasticsearch-sink"
    },
    "tasks": [],
    "type": null
}

5) 使用producer生产数据,并使用kibana验证是否写入成功

4 Kafka Connect Rest API

Kafka Connect提供了一套完成的管理Connector的接口,详情参考[Kafka Connect REST Interface]。该接口可以实现对Connector的创建,销毁,修改,查询等操作

1) GET connectors 获取运行中的connector列表

2) POST connectors 使用指定的名称和配置创建connector

3) GET connectors/(string:name) 获取connector的详细信息

4) GET connectors/(string:name)/config 获取connector的配置

5) PUT connectors/(string:name)/config 设置connector的配置

6) GET connectors/(string:name)/status 获取connector状态

7) POST connectors/(stirng:name)/restart 重启connector

8) PUT connectors/(string:name)/pause 暂停connector

9) PUT connectors/(string:name)/resume 恢复connector

10) DELETE connectors/(string:name)/ 删除connector

11) GET connectors/(string:name)/tasks 获取connectors任务列表

12) GET /connectors/(string: name)/tasks/(int: taskid)/status 获取任务状态

13) POST /connectors/(string: name)/tasks/(int: taskid)/restart 重启任务

14) GET /connector-plugins/ 获取已安装插件列表

15) PUT /connector-plugins/(string: name)/config/validate 验证配置

5 总结

Kafka Connect是Kafka一个功能强大的组件,为kafka提供了与外部系统连接的一套完整方案,包括数据传输,连接管理,监控,多副本等。相对于Logstash Kafka插件,功能更为全面,但配置也相对为复杂些。有文章提到其性能也优于Logstash Kafka Input插件,如果对写入性能比较敏感的场景,可以在实际压测的基础上进行选择。另外由于直接将数据从Kafka写入Elasticsearch, 如果需要对文档进行处理时,选择Logstash可能更为方便。

Filebeat连接Kafka ssl 报错

Beatsmaojing 回复了问题 • 3 人关注 • 2 个回复 • 553 次浏览 • 2018-09-19 15:15 • 来自相关话题

filebeat可以发送到多个kafka集群吗?

Beatszqc0512 回复了问题 • 2 人关注 • 2 个回复 • 262 次浏览 • 2018-09-19 08:54 • 来自相关话题

logstash与kafka topic可以使用变量吗?

Logstashluohuanfeng 回复了问题 • 4 人关注 • 3 个回复 • 223 次浏览 • 2018-09-11 17:03 • 来自相关话题

filebeat不能自动创建topic

BeatsJackGe 回复了问题 • 4 人关注 • 4 个回复 • 418 次浏览 • 2018-09-10 22:20 • 来自相关话题

测试logstatsh 接收kafka topic 性能有这么低嘛?

回复

Logstashzhangshuai 发起了问题 • 1 人关注 • 0 个回复 • 165 次浏览 • 2018-09-04 11:54 • 来自相关话题

logstash作为kafka的消费者的时候,获取不到kafka中组的信息

Logstashaslan 回复了问题 • 2 人关注 • 2 个回复 • 395 次浏览 • 2018-08-17 09:54 • 来自相关话题

kafka消费时报错

默认分类JackGe 回复了问题 • 2 人关注 • 1 个回复 • 527 次浏览 • 2018-08-15 20:57 • 来自相关话题

filebeat-> Kafka 如何保留host字段?

Logstashluohuanfeng 回复了问题 • 4 人关注 • 3 个回复 • 340 次浏览 • 2018-08-13 16:51 • 来自相关话题

找能在武汉培训 Kfaka的老师,给客户讲几天课,报酬私聊。

求职招聘wendy 回复了问题 • 3 人关注 • 2 个回复 • 1274 次浏览 • 2018-07-31 08:33 • 来自相关话题

logstash无法消费kafka数据

回复

Logstashmuou 发起了问题 • 1 人关注 • 0 个回复 • 770 次浏览 • 2018-07-20 14:59 • 来自相关话题

filebeat到logstash之间加一层Kafka后的日志格式问题

Logstashmuou 回复了问题 • 3 人关注 • 2 个回复 • 709 次浏览 • 2018-07-19 13:59 • 来自相关话题

kafka manager 中的Brokers skew 什么含义啊?

回复

开源项目sterne vencel 回复了问题 • 1 人关注 • 2 个回复 • 596 次浏览 • 2018-06-25 12:21 • 来自相关话题

logstash往kafka里同步数据的时候kafka经常超时,然后logstash会停止

回复

Logstashaoliao_paopao 发起了问题 • 1 人关注 • 0 个回复 • 594 次浏览 • 2018-05-30 14:50 • 来自相关话题

请问这个时什么工具啊

回复

默认分类HelloClyde 回复了问题 • 2 人关注 • 2 个回复 • 115 次浏览 • 2018-11-26 11:06 • 来自相关话题

Filebeat连接Kafka ssl 报错

回复

Beatsmaojing 回复了问题 • 3 人关注 • 2 个回复 • 553 次浏览 • 2018-09-19 15:15 • 来自相关话题

filebeat可以发送到多个kafka集群吗?

回复

Beatszqc0512 回复了问题 • 2 人关注 • 2 个回复 • 262 次浏览 • 2018-09-19 08:54 • 来自相关话题

logstash与kafka topic可以使用变量吗?

回复

Logstashluohuanfeng 回复了问题 • 4 人关注 • 3 个回复 • 223 次浏览 • 2018-09-11 17:03 • 来自相关话题

filebeat不能自动创建topic

回复

BeatsJackGe 回复了问题 • 4 人关注 • 4 个回复 • 418 次浏览 • 2018-09-10 22:20 • 来自相关话题

测试logstatsh 接收kafka topic 性能有这么低嘛?

回复

Logstashzhangshuai 发起了问题 • 1 人关注 • 0 个回复 • 165 次浏览 • 2018-09-04 11:54 • 来自相关话题

logstash作为kafka的消费者的时候,获取不到kafka中组的信息

回复

Logstashaslan 回复了问题 • 2 人关注 • 2 个回复 • 395 次浏览 • 2018-08-17 09:54 • 来自相关话题

kafka消费时报错

回复

默认分类JackGe 回复了问题 • 2 人关注 • 1 个回复 • 527 次浏览 • 2018-08-15 20:57 • 来自相关话题

filebeat-> Kafka 如何保留host字段?

回复

Logstashluohuanfeng 回复了问题 • 4 人关注 • 3 个回复 • 340 次浏览 • 2018-08-13 16:51 • 来自相关话题

找能在武汉培训 Kfaka的老师,给客户讲几天课,报酬私聊。

回复

求职招聘wendy 回复了问题 • 3 人关注 • 2 个回复 • 1274 次浏览 • 2018-07-31 08:33 • 来自相关话题

logstash无法消费kafka数据

回复

Logstashmuou 发起了问题 • 1 人关注 • 0 个回复 • 770 次浏览 • 2018-07-20 14:59 • 来自相关话题

filebeat到logstash之间加一层Kafka后的日志格式问题

回复

Logstashmuou 回复了问题 • 3 人关注 • 2 个回复 • 709 次浏览 • 2018-07-19 13:59 • 来自相关话题

kafka manager 中的Brokers skew 什么含义啊?

回复

开源项目sterne vencel 回复了问题 • 1 人关注 • 2 个回复 • 596 次浏览 • 2018-06-25 12:21 • 来自相关话题

logstash往kafka里同步数据的时候kafka经常超时,然后logstash会停止

回复

Logstashaoliao_paopao 发起了问题 • 1 人关注 • 0 个回复 • 594 次浏览 • 2018-05-30 14:50 • 来自相关话题

filebeat->kafka没反应。

回复

Logstashrockybean 回复了问题 • 3 人关注 • 2 个回复 • 1227 次浏览 • 2018-05-25 15:14 • 来自相关话题

当Elasticsearch遇见Kafka--Kafka Connect

Elasticsearchmushao999 发表了文章 • 0 个评论 • 148 次浏览 • 2018-11-17 11:15 • 来自相关话题

本文同步发布在腾讯云+社区Elasticsearch专栏中:https://cloud.tencent.com/developer/column/4008
在“当Elasticsearch遇见Kafka--Logstash kafka input插件”一文中,我对Logstash的Kafka input插件进行了简单的介绍,并通过实际操作的方式,为大家呈现了使用该方式实现Kafka与Elastisearch整合的基本过程。可以看出使用Logstash input插件的方式,具有配置简单,数据处理方便等优点。然而使用Logstash Kafka插件并不是Kafka与Elsticsearch整合的唯一方案,另一种比较常见的方案是使用Kafka的开源组件Kafka Connect。

Confluent实现Kafka与Elasticsearch的连接

1 Kafka Connect简介

Kafka Connect是Kafka的开源组件Confluent提供的功能,用于实现Kafka与外部系统的连接。Kafka Connect同时支持分布式模式和单机模式,另外提供了一套完整的REST接口,用于查看和管理Kafka Connectors,还具有offset自动管理,可扩展等优点。

Kafka connect分为企业版和开源版,企业版在开源版的基础之上提供了监控,负载均衡,副本等功能,实际生产环境中建议使用企业版。(本测试使用开源版)

Kafka connect workers有两种工作模式,单机模式和分布式模式。在开发和适合使用单机模式的场景下,可以使用standalone模式, 在实际生产环境下由于单个worker的数据压力会比较大,distributed模式对负载均和和扩展性方面会有很大帮助。(本测试使用standalone模式)

关于Kafka Connect的详细情况可以参考[Kafka Connect]

2 使用Kafka Connect连接Kafka和Elasticsearch

2.1 测试环境准备

本文与使用Logstash Kafka input插件环境一样[传送门],组件列表如下

服务 ip port
Elasticsearch service 192.168.0.8 9200
Ckafka 192.168.13.10 9092
CVM 192.168.0.13 -

kafka topic也复用原来了的kafka_es_test

2.2 Kafka Connect 安装

[Kafka Connec下载地址]

本文下载的为开源版本confluent-oss-5.0.1-2.11.tar.gz,下载后解压

2.3 Worker配置

1) 配置参考

如前文所说,worker分为Standalone和Distributed两种模式,针对两种模式的配置,参考如下

[通用配置]

[Standalone Woker配置]

[Distributed Worker配置]

此处需要注意的是Kafka Connect默认使用AvroConverter,使用该AvroConverter时需要注意必须启动Schema Registry服务

2) 实际操作

本测试使用standalone模式,因此修改/root/confluent-5.0.1/etc/schema-registry/connect-avro-standalone.properties

bootstrap.servers=192.168.13.10:9092

2.4 Elasticsearch Connector配置

1) 配置参考

[Connectors通用配置]

[Elasticsearch Configuration Options]

2) 实际操作

修改/root/confluent-5.0.1/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=kafka_es_test
key.ignore=true
connection.url=http://192.168.0.8:9200
type.name=kafka-connect

注意: 其中topics不仅对应Kafka的topic名称,同时也是Elasticsearch的索引名,当然也可以通过topic.index.map来设置从topic名到Elasticsearch索引名的映射

2.5 启动connector

1 注意事项

1) 由于配置文件中jar包位置均采用的相对路径,因此建议在confluent根目录下执行命令和启动程序,以避免不必要的问题

2) 如果前面没有修改converter,仍采用AvroConverter, 注意需要在启动connertor前启动Schema Registry服务

2 启动Schema Registry服务

正如前文所说,由于在配置worker时指定使用了AvroConverter,因此需要启动Schema Registry服务。而该服务需要指定一个zookeeper地址或Kafka地址,以存储schema数据。由于CKafka不支持用户通过接口形式创建topic,因此需要在本机起一个kafka以创建名为_schema的topic。

1) 启动Zookeeper

./bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties

2) 启动kafka

./bin/kafka-server-start -daemon etc/kafka/server.properties

3) 启动schema Registry

./bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties

4) 使用netstat -natpl 查看各服务端口是否正常启动

zookeeper 2181

kafka 9092

schema registry 8081

3 启动Connector

./bin/connect-standalone -daemon  etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

ps:以上启动各服务均可在logs目录下找到对应日志

2.6 启动Kafka Producer

由于我们采用的是AvroConverter,因此不能采用Kafka工具包中的producer。Kafka Connector bin目录下提供了Avro Producer

1) 启动Producer

./bin/kafka-avro-console-producer --broker-list 192.168.13.10:9092 --topic kafka_es_test --property value.schema='{"type":"record","name":"person","fields":[{"name":"nickname","type":"string"}]}'

2) 输入如下数据

{"nickname":"michel"}
{"nickname":"mushao"}

2.7 Kibana验证结果

1) 查看索引

在kibana Dev Tools的Console中输入

GET _cat/indices

结果

green open kafka_es_test 36QtDP6vQOG7ubOa161wGQ 5 1 1 0 7.9kb 3.9kb
green open .kibana       QUw45tN0SHqeHbF9-QVU6A 1 1 1 0 5.5kb 2.7kb

可以看到名为kafka_es_test的索引被成功创建

2) 查看数据

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 1,
    "hits": [
      {
        "_index": "kafka_es_test",
        "_type": "kafka-connect",
        "_id": "kafka_es_test+0+0",
        "_score": 1,
        "_source": {
          "nickname": "michel"
        }
      },
      {
        "_index": "kafka_es_test",
        "_type": "kafka-connect",
        "_id": "kafka_es_test+0+1",
        "_score": 1,
        "_source": {
          "nickname": "mushao"
        }
      }
    ]
  }
}

可以看到数据已经被成功写入

3 Confluent CLI

3.1 简介

查阅资料时发现很多文章都是使用Confluent CLI启动Kafka Connect,然而官方文档已经明确说明了该CLI只是适用于开发阶段,不能用于生产环境。

它可以一键启动包括zookeeper,kafka,schema registry, kafka rest, connect等在内的多个服务。但是这些服务对于Kafka Connect都不是必须的,如果不使用AvroConverter,则只需要启动Connect即可。即使使用了AvroConverter, 也只需要启动schema registry,将schema保存在远端的kafka中。Kafka Connect REST API也只是为用户提供一个管理connector的接口,也不是必选的。

另外使用CLI启动默认配置为启动Distributed的Connector,需要通过环境变量来修改配置

3.2 使用Confluent CLI

confluent CLI提供了丰富的命令,包括服务启动,服务停止,状态查询,日志查看等,详情参考如下简介视频 [Introducing the Confluent CLI | Screencast]

1) 启动

./bin/confluent start

2) 检查confluent运行状态

./bin/confluent status

当得到如下结果则说明confluent启动成功

ksql-server is [UP]
connect is [UP]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]

3) 问题定位

如果第二步出现问题,可以使用log命令查看,如connect未启动成功则

./bin/confluent log connect

4) 加载Elasticsearch Connector

a) 查看connector

./bin/confluent list connectors

结果

Bundled Predefined Connectors (edit configuration under etc/):
elasticsearch-sink
file-source
file-sink
jdbc-source
jdbc-sink
hdfs-sink
s3-sink

b) 加载Elasticsearch connector

./bin/confluent load elasticsearch-sink

结果

{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "kafka_es_test",
        "key.ignore": "true",
        "connection.url": "http://192.168.0.8:9200",
        "type.name": "kafka-connect",
        "name": "elasticsearch-sink"
    },
    "tasks": [],
    "type": null
}

5) 使用producer生产数据,并使用kibana验证是否写入成功

4 Kafka Connect Rest API

Kafka Connect提供了一套完成的管理Connector的接口,详情参考[Kafka Connect REST Interface]。该接口可以实现对Connector的创建,销毁,修改,查询等操作

1) GET connectors 获取运行中的connector列表

2) POST connectors 使用指定的名称和配置创建connector

3) GET connectors/(string:name) 获取connector的详细信息

4) GET connectors/(string:name)/config 获取connector的配置

5) PUT connectors/(string:name)/config 设置connector的配置

6) GET connectors/(string:name)/status 获取connector状态

7) POST connectors/(stirng:name)/restart 重启connector

8) PUT connectors/(string:name)/pause 暂停connector

9) PUT connectors/(string:name)/resume 恢复connector

10) DELETE connectors/(string:name)/ 删除connector

11) GET connectors/(string:name)/tasks 获取connectors任务列表

12) GET /connectors/(string: name)/tasks/(int: taskid)/status 获取任务状态

13) POST /connectors/(string: name)/tasks/(int: taskid)/restart 重启任务

14) GET /connector-plugins/ 获取已安装插件列表

15) PUT /connector-plugins/(string: name)/config/validate 验证配置

5 总结

Kafka Connect是Kafka一个功能强大的组件,为kafka提供了与外部系统连接的一套完整方案,包括数据传输,连接管理,监控,多副本等。相对于Logstash Kafka插件,功能更为全面,但配置也相对为复杂些。有文章提到其性能也优于Logstash Kafka Input插件,如果对写入性能比较敏感的场景,可以在实际压测的基础上进行选择。另外由于直接将数据从Kafka写入Elasticsearch, 如果需要对文档进行处理时,选择Logstash可能更为方便。

新的kafka集群监控系统使用golang开发

Elasticsearchkppotato 发表了文章 • 0 个评论 • 487 次浏览 • 2018-05-04 17:04 • 来自相关话题

开源地址:https://github.com/kppotato/kafka_monitor   项目使用:golang开发,数据库:prometheus 图形:grafana
开源地址:https://github.com/kppotato/kafka_monitor   项目使用:golang开发,数据库:prometheus 图形:grafana

【急聘】搜索推荐系统研发工程师 12-20K

求职招聘man429 发表了文章 • 0 个评论 • 1188 次浏览 • 2017-07-26 15:38 • 来自相关话题

岗位职责: 1,负责个性化推荐系统的算法和架构研发, 实现在相关产品中的精准推荐; 2,负责产品、内容的推荐与其他场景的基础数据挖掘; 3,根据海量用户行为的分析和挖掘,构建用户画像、标签系统等。   任职要求: 1、两年以上相关工作经验; 2、有推荐系统或搜索排序研发经验, 熟悉常用的推荐算法,有实际算法调优经验; 3、熟悉Hadoop、HBase、Spark、Kafka等计算平台和工具; 4、掌握自然语言处理、协同推荐算法方面的基本知识; 5、良好的沟通和学习能力,团队合作精神,能独立承担工作。   加分项: 1,有大规模海量数据机器学习、数据挖掘、计算广告、搜索引擎相关经验; 2,有互联网电商行业数据经验。 易所试集团(Www.liketry.com),新三板上市公司,市值10亿左右,组建北京研发中心,13薪起,正常基数五险一金并提供商业保险(补充医疗+意外等),10天年假起,弹性工作制,薪资可根据能力商议。工作地点:北京望京SOHO,简历请发送至邮箱:hang.song@liketry.com。
岗位职责: 1,负责个性化推荐系统的算法和架构研发, 实现在相关产品中的精准推荐; 2,负责产品、内容的推荐与其他场景的基础数据挖掘; 3,根据海量用户行为的分析和挖掘,构建用户画像、标签系统等。   任职要求: 1、两年以上相关工作经验; 2、有推荐系统或搜索排序研发经验, 熟悉常用的推荐算法,有实际算法调优经验; 3、熟悉Hadoop、HBase、Spark、Kafka等计算平台和工具; 4、掌握自然语言处理、协同推荐算法方面的基本知识; 5、良好的沟通和学习能力,团队合作精神,能独立承担工作。   加分项: 1,有大规模海量数据机器学习、数据挖掘、计算广告、搜索引擎相关经验; 2,有互联网电商行业数据经验。 易所试集团(Www.liketry.com),新三板上市公司,市值10亿左右,组建北京研发中心,13薪起,正常基数五险一金并提供商业保险(补充医疗+意外等),10天年假起,弹性工作制,薪资可根据能力商议。工作地点:北京望京SOHO,简历请发送至邮箱:hang.song@liketry.com。

Day7: hangout 替代 logstash-input-kafka

Advent三斗室 发表了文章 • 2 个评论 • 6945 次浏览 • 2015-12-08 00:54 • 来自相关话题

用 Logstash 接收 Kafka 里的业务日志再写入 Elasticsearch 已经成为一个常见的选择。但是大多数人随后就会碰到一个问题:logstash-input-kafka 的性能上不去! 这个问题,主要是由于 Logstash 用 JRuby 实现,所以数据从 Kafka 下来到最后流转进 Logstash 里,要经过四五次 Ruby 和 Java 之间的数据结构转换,大大浪费和消耗了 CPU 资源。作为优化,我们可以通过修改默认的 logstash-input-kafka 的 codec 配置为 line,把 Jrjackson 处理流程挪到 logstash-filter-json 里多线程处理,但是也只能提高一倍性能而已。 Logstash 开发组目前也在实现纯 Java 版的 logstash-core-event,但是最终能提高多少,也是未知数。 那么在 Logstash 性能提上去之前,围绕 Kafka 还有什么办法能高效又不失灵活的做到数据处理并写入 Elasticsearch 呢?今天给大家推荐一下携程网开源的 hangout。 hangout 采用 YAML 格式配置语法,跟 Elasticsearch 一样,省去了 Logstash 解析 DSL 的复杂度。下面一段配置是 repo 中自带的 example 示例:
inputs:
  - Kafka:
    codec: plain
    encoding: UTF8 # defaut UTF8
    topic: 
      app: 2
    consumer_settings:
      group.id: hangout
      zookeeper.connect: 192.168.1.200:2181
      auto.commit.interval.ms: "1000"
      socket.receive.buffer.bytes: "1048576"
      fetch.message.max.bytes: "1048576"
      num.consumer.fetchers: "4"
  - Kafka:
    codec: json
    topic: 
      web: 1
    consumer_settings:
      group.id: hangout
      zookeeper.connect: 192.168.1.201:2181
      auto.commit.interval.ms: "5000"

filters:
  - Grok:
    match:
      - '^(?<logtime>\S+) (?<user>.+) (-|(?<level>\w+)) %{DATA:msg}$'
    remove_fields: ['message']
  - Add:
    fields:
      test: 'abcd'
    if:
      - '<#if message??>true</#if>'
      - '<#if message?contains("liu")>true<#elseif message?contains("warn")>true</#if>'
  - Date:
    src: logtime
    formats:
      - 'ISO8601'
    remove_fields: ['logtime']
  - Lowercase:
    fields: ['user']
  - Add:
    fields:
      me: 'I am ${user}'
  - Remove:
    fields:
      - logtime
  - Trim:
    fields:
      - user
  - Rename:
    fields:
      me: he
      user: she
  - Gsub:
    fields:
      she: ['c','CCC']
      he: ['(^\w+)|(\w+$)','XXX']
  - Translate:
    source: user
    target: nick
    dictionary_path: /tmp/app.dic
  - KV:
    source: msg
    target: kv
    field_split: ' '
    value_split: '='
    trim: '\t\"'
    trimkey: '\"'
    include_keys: ["a","b","xyz","12"]
    exclude_keys: ["b","c"] # b in excluded
    tag_on_failure: "KVfail"
    remove_fields: ['msg']
  - Convert:
    fields:
      cs_bytes: integer
      time_taken: float
  - URLDecode:
    fields: ["query1","query2"]

outputs:
  - Stdout:
    if:
      - '<#if user=="childe">true</#if>'
  - Elasticsearch:
    cluster: hangoutcluster
    hosts:
      - 192.168.1.200
    index: 'hangout-%{user}-%{+YYYY.MM.dd}'
    index_type: logs # default logs
    bulk_actions: 20000 #default 20000
    bulk_size: 15 # default 15 MB
    flush_interval: 10 # default 10 seconds
    concurrent_requests: 0 # default 0, concurrent_requests设置成大于0的数, 意思着多线程处理, 以我应用的经验,还有是一定OOM风险的,强烈建议设置为0
  - Kafka:
    broker_list: 192.168.1.200:9092
    topic: test2
其 pipeline 设计和 Logstash 不同的是:整个 filter 和 output 流程,都在 Kafka 的 consumer 线程中完成。所以,并发线程数完全是有 Kafka 的 partitions 设置来控制的。 实际运行下来,hangout 比 Logstash 确实在处理能力,尤其是 CPU 资源消耗方面,性价比要高出很多。 想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。