使用 shuf 来打乱一个文件中的行或是选择文件中一个随机的行。

beats如何通过配置删除host字段

Beatsrochy 回复了问题 • 2 人关注 • 1 个回复 • 3671 次浏览 • 2018-11-17 14:49 • 来自相关话题

jest 和restClient 哪个好用点,有没有demo推荐?

Elasticsearchrochy 回复了问题 • 4 人关注 • 3 个回复 • 3356 次浏览 • 2018-11-17 13:55 • 来自相关话题

一文快速上手Logstash

Logstashmushao999 发表了文章 • 3 个评论 • 11037 次浏览 • 2018-11-17 11:24 • 来自相关话题

本文同步发布在腾讯云+社区Elasticsearch专栏:https://cloud.tencent.com/developer/column/4008
Elasticsearch是当前主流的分布式大数据存储和搜索引擎,可以为用户提供强大的全文本检索能力,广泛应用于日志检索,全站搜索等领域。Logstash作为Elasicsearch常用的实时数据采集引擎,可以采集来自不同数据源的数据,并对数据进行处理后输出到多种输出源,是Elastic Stack 的重要组成部分。本文从Logstash的工作原理,使用示例,部署方式及性能调优等方面入手,为大家提供一个快速入门Logstash的方式。文章最后也给出了一些深入了解Logstash的的链接,以方便大家根据需要详细了解。

![Logstash简介](https://main.qcloudimg.com/raw ... f8.png)

1 Logstash工作原理


1.1 处理过程


![Logstash处理过程](https://main.qcloudimg.com/raw ... 72.png)

如上图,Logstash的数据处理过程主要包括:Inputs, Filters, Outputs 三部分, 另外在Inputs和Outputs中可以使用Codecs对数据格式进行处理。这四个部分均以插件形式存在,用户通过定义pipeline配置文件,设置需要使用的input,filter,output, codec插件,以实现特定的数据采集,数据处理,数据输出等功能

  • (1)Inputs:用于从数据源获取数据,常见的插件如file, syslog, redis, beats 等[[详细参考](https://www.elastic.co/guide/e ... s.html)]
  • (2)Filters:用于处理数据如格式转换,数据派生等,常见的插件如grok, mutate, drop, clone, geoip等[[详细参考](https://www.elastic.co/guide/e ... s.html)]
  • (3)Outputs:用于数据输出,常见的插件如elastcisearch,file, graphite, statsd等[[详细参考](https://www.elastic.co/guide/e ... s.html)]
  • (4)Codecs:Codecs不是一个单独的流程,而是在输入和输出等插件中用于数据转换的模块,用于对数据进行编码处理,常见的插件如json,multiline[[详细参考](https://www.elastic.co/guide/e ... s.html)]

    可以点击每个模块后面的_详细参考_链接了解该模块的插件列表及对应功能

    1.2 执行模型:


  • (1)每个Input启动一个线程,从对应数据源获取数据
  • (2)Input会将数据写入一个队列:默认为内存中的有界队列(意外停止会导致数据丢失)。为了防止数丢失Logstash提供了两个特性:
    [Persistent Queues](https://www.elastic.co/guide/e ... s.html):通过磁盘上的queue来防止数据丢失
    [Dead Letter Queues](https://www.elastic.co/guide/e ... s.html):保存无法处理的event(仅支持Elasticsearch作为输出源)
  • (3)Logstash会有多个pipeline worker, 每一个pipeline worker会从队列中取一批数据,然后执行filter和output(worker数目及每次处理的数据量均由配置确定)

    2 Logstash使用示例


    2.1 Logstash Hello world


    第一个示例Logstash将采用标准输入和标准输出作为input和output,并且不指定filter

  • (1)下载Logstash并解压(需要预先安装JDK8)
  • (2)cd到Logstash的根目录,并执行启动命令如下:

    <br /> cd logstash-6.4.0<br /> bin/logstash -e 'input { stdin { } } output { stdout {} }'<br />

  • (3)此时Logstash已经启动成功,-e表示在启动时直接指定pipeline配置,当然也可以将该配置写入一个配置文件中,然后通过指定配置文件来启动
  • (4)在控制台输入:hello world,可以看到如下输出:

    <br /> {<br /> "@version" => "1",<br /> "host" => "localhost",<br /> "@timestamp" => 2018-09-18T12:39:38.514Z,<br /> "message" => "hello world"<br /> } <br />

    Logstash会自动为数据添加@version, host, @timestamp等字段

    在这个示例中Logstash从标准输入中获得数据,仅在数据中添加一些简单字段后将其输出到标准输出。

    2.2 日志采集


    这个示例将采用Filebeat input插件(Elastic Stack中的轻量级数据采集程序)采集本地日志,然后将结果输出到标准输出

  • (1)下载示例使用的日志文件[[地址](https://download.elastic.co/de ... log.gz)],解压并将日志放在一个确定位置
  • (2)安装filebeat,配置并启动[[参考](https://www.elastic.co/guide/e ... d.html)]

    filebeat.yml配置如下(paths改为日志实际位置,不同版本beats配置可能略有变化,请根据情况调整)

    ```
    filebeat.prospectors:
    • input_type: log
      paths:
      • /path/to/file/logstash-tutorial.log
        output.logstash:
        hosts: "localhost:5044"
        <br /> <br /> 启动命令: <br /> <br />
        ./filebeat -e -c filebeat.yml -d "publish"
        ```

  • (3)配置logstash并启动

    1)创建first-pipeline.conf文件内容如下(该文件为pipeline配置文件,用于指定input,filter, output等):

    ```
    input {
    beats {
    port => "5044"
    }
    }

    filter {

    }

    output {
    stdout { codec => rubydebug }
    }
    <br /> <br /> codec => rubydebug用于美化输出[[参考](<a href="https://www.elastic.co/guide/en/logstash/6.4/plugins-codecs-rubydebug.htm" rel="nofollow" target="_blank">https://www.elastic.co/guide/e ... g.htm</a>l)] <br /> <br /> 2)验证配置(注意指定配置文件的路径):<br /> <br />
    ./bin/logstash -f first-pipeline.conf --config.test_and_exit
    <br /> <br /> 3)启动命令:<br /> <br />
    ./bin/logstash -f first-pipeline.conf --config.reload.automatic
    <br /> <br /> --config.reload.automatic选项启用动态重载配置功能<br /> <br /> 4)预期结果:<br /> <br /> 可以在Logstash的终端显示中看到,日志文件被读取并处理为如下格式的多条数据 <br /> <br />
    {
    "@timestamp" => 2018-10-09T12:22:39.742Z,
    "offset" => 24464,
    "@version" => "1",
    "input_type" => "log",
    "beat" => {
    "name" => "VM_136_9_centos",
    "hostname" => "VM_136_9_centos",
    "version" => "5.6.10"
    },
    "host" => "VM_136_9_centos",
    "source" => "/data/home/michelmu/workspace/logstash-tutorial.log",
    "message" => "86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /style2.css HTTP/1.1\" 200 4877 \"http://www.semicomplete.com/projects/xdotool/\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",
    "type" => "log",
    "tags" => [
    [0] "beats_input_codec_plain_applied"
    ]
    }
    ```

    相对于示例2.1,该示例使用了filebeat input插件从日志中获取一行记录,这也是Elastic stack获取日志数据最常见的一种方式。另外该示例还采用了rubydebug codec 对输出的数据进行显示美化。

    2.3 日志格式处理


    可以看到虽然示例2.2使用filebeat从日志中读取数据,并将数据输出到标准输出,但是日志内容作为一个整体被存放在message字段中,这样对后续存储及查询都极为不便。可以为该pipeline指定一个[grok filter](https://www.elastic.co/guide/e ... k.html)来对日志格式进行处理

  • (1)在first-pipeline.conf中增加filter配置如下

    <br /> input {<br /> beats {<br /> port => "5044"<br /> }<br /> }<br /> filter {<br /> grok {<br /> match => { "message" => "%{COMBINEDAPACHELOG}"}<br /> }<br /> }<br /> output {<br /> stdout { codec => rubydebug }<br /> }<br />

  • (2)到filebeat的根目录下删除之前上报的数据历史(以便重新上报数据),并重启filebeat

    <br /> sudo rm data/registry<br /> sudo ./filebeat -e -c filebeat.yml -d "publish"<br />

  • (3)由于之前启动Logstash设置了自动更新配置,因此Logstash不需要重新启动,这个时候可以获取到的日志数据如下:

    <br /> {<br /> "request" => "/style2.css",<br /> "agent" => "\"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",<br /> "offset" => 24464,<br /> "auth" => "-",<br /> "ident" => "-",<br /> "input_type" => "log",<br /> "verb" => "GET",<br /> "source" => "/data/home/michelmu/workspace/logstash-tutorial.log",<br /> "message" => "86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /style2.css HTTP/1.1\" 200 4877 \"<a href="http://www.semicomplete.com/projects/xdotool/" rel="nofollow" target="_blank">http://www.semicomplete.com/projects/xdotool/</a>\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",<br /> "type" => "log",<br /> "tags" => [<br /> [0] "beats_input_codec_plain_applied"<br /> ],<br /> "referrer" => "\"<a href="http://www.semicomplete.com/projects/xdotool/" rel="nofollow" target="_blank">http://www.semicomplete.com/projects/xdotool/</a>\"",<br /> "@timestamp" => 2018-10-09T12:24:21.276Z,<br /> "response" => "200",<br /> "bytes" => "4877",<br /> "clientip" => "86.1.76.62",<br /> "@version" => "1",<br /> "beat" => {<br /> "name" => "VM_136_9_centos",<br /> "hostname" => "VM_136_9_centos",<br /> "version" => "5.6.10"<br /> },<br /> "host" => "VM_136_9_centos",<br /> "httpversion" => "1.1",<br /> "timestamp" => "04/Jan/2015:05:30:37 +0000"<br /> }<br />

    可以看到message中的数据被详细解析出来了

    2.4 数据派生和增强


    Logstash中的一些filter可以根据现有数据生成一些新的数据,如[geoip](https://www.elastic.co/guide/e ... p.html)可以根据ip生成经纬度信息

  • (1)在first-pipeline.conf中增加geoip配置如下

    <br /> input {<br /> beats {<br /> port => "5044"<br /> }<br /> }<br /> filter {<br /> grok {<br /> match => { "message" => "%{COMBINEDAPACHELOG}"}<br /> }<br /> geoip {<br /> source => "clientip"<br /> }<br /> }<br /> output {<br /> stdout { codec => rubydebug }<br /> }<br />

  • (2)如2.3一样清空filebeat历史数据,并重启
  • (3)当然Logstash仍然不需要重启,可以看到输出变为如下:

    <br /> {<br /> "request" => "/style2.css",<br /> "agent" => "\"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",<br /> "geoip" => {<br /> "timezone" => "Europe/London",<br /> "ip" => "86.1.76.62",<br /> "latitude" => 51.5333,<br /> "continent_code" => "EU",<br /> "city_name" => "Willesden",<br /> "country_name" => "United Kingdom",<br /> "country_code2" => "GB",<br /> "country_code3" => "GB",<br /> "region_name" => "Brent",<br /> "location" => {<br /> "lon" => -0.2333,<br /> "lat" => 51.5333<br /> },<br /> "postal_code" => "NW10",<br /> "region_code" => "BEN",<br /> "longitude" => -0.2333<br /> },<br /> "offset" => 24464,<br /> "auth" => "-",<br /> "ident" => "-",<br /> "input_type" => "log",<br /> "verb" => "GET",<br /> "source" => "/data/home/michelmu/workspace/logstash-tutorial.log",<br /> "message" => "86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] \"GET /style2.css HTTP/1.1\" 200 4877 \"<a href="http://www.semicomplete.com/projects/xdotool/" rel="nofollow" target="_blank">http://www.semicomplete.com/projects/xdotool/</a>\" \"Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0\"",<br /> "type" => "log",<br /> "tags" => [<br /> [0] "beats_input_codec_plain_applied"<br /> ],<br /> "referrer" => "\"<a href="http://www.semicomplete.com/projects/xdotool/" rel="nofollow" target="_blank">http://www.semicomplete.com/projects/xdotool/</a>\"",<br /> "@timestamp" => 2018-10-09T12:37:46.686Z,<br /> "response" => "200",<br /> "bytes" => "4877",<br /> "clientip" => "86.1.76.62",<br /> "@version" => "1",<br /> "beat" => {<br /> "name" => "VM_136_9_centos",<br /> "hostname" => "VM_136_9_centos",<br /> "version" => "5.6.10"<br /> },<br /> "host" => "VM_136_9_centos",<br /> "httpversion" => "1.1",<br /> "timestamp" => "04/Jan/2015:05:30:37 +0000"<br /> }<br />

    可以看到根据ip派生出了许多地理位置信息数据

    2.5 将数据导入Elasticsearch


    Logstash作为Elastic stack的重要组成部分,其最常用的功能是将数据导入到Elasticssearch中。将Logstash中的数据导入到Elasticsearch中操作也非常的方便,只需要在pipeline配置文件中增加Elasticsearch的output即可。

  • (1)首先要有一个已经部署好的Logstash,当然可以使用腾讯云快速创建一个Elasticsearch[创建地址](https://console.cloud.tencent.com/es)
  • (2)在first-pipeline.conf中增加Elasticsearch的配置,如下

    <br /> input {<br /> beats {<br /> port => "5044"<br /> }<br /> }<br /> filter {<br /> grok {<br /> match => { "message" => "%{COMBINEDAPACHELOG}"}<br /> }<br /> geoip {<br /> source => "clientip"<br /> }<br /> }<br /> output {<br /> elasticsearch {<br /> hosts => [ "localhost:9200" ]<br /> }<br /> }<br />

  • (3)清理filebeat历史数据,并重启
  • (4)查询Elasticsearch确认数据是否正常上传(注意替换查询语句中的日期)

    <br /> curl -XGET 'http://172.16.16.17:9200/logstash-2018.10.09/_search?pretty&q=response=200'<br />

  • (5)如果Elasticsearch关联了Kibana也可以使用kibana查看数据是否正常上报

    ![kibana图示](https://main.qcloudimg.com/raw ... e5.png)

    Logstash提供了大量的Input, filter, output, codec的插件,用户可以根据自己的需要,使用一个或多个组件实现自己的功能,当然用户也可以自定义插件以实现更为定制化的功能。自定义插件可以参考[[logstash input插件开发](https://cloud.tencent.com/developer/article/1171033)]

    3 部署Logstash


    演示过如何快速使用Logstash后,现在详细讲述一下Logstash的部署方式。

    3.1 安装


  • 安装JDK:Logstash采用JRuby编写,运行需要JDK环境,因此安装Logstash前需要先安装JDK。(当前6.4仅支持JDK8)
  • 安装Logstash:可以采用直接下载压缩包方式安装,也通过APT或YUM安装,另外Logstash支持安装到Docker中。[[Logstash安装参考](https://www.elastic.co/guide/e ... h.html)]
  • 安装X-PACK:在6.3及之后版本X-PACK会随Logstash安装,在此之前需要手动安装[[参考链接](https://www.elastic.co/guide/e ... g.html)]

    3.2 目录结构


    logstash的目录主要包括:根目录bin目录配置目录日志目录插件目录数据目录

    不同安装方式各目录的默认位置参考[[此处](https://www.elastic.co/guide/e ... t.html)]

    3.3 配置文件


  • Pipeline配置文件,名称可以自定义,在启动Logstash时显式指定,编写方式可以参考前面示例,对于具体插件的配置方式参见具体插件的说明(使用Logstash时必须配置):
    用于定义一个pipeline,数据处理方式和输出源
  • Settings配置文件(可以使用默认配置):
    在使用Logstash时可以不用设置,用于性能调优,日志记录等
    • logstash.yml:用于控制logstash的执行过程[[参考链接](https://www.elastic.co/guide/e ... e.html)]
    • pipelines.yml: 如果有多个pipeline时使用该配置来配置多pipeline执行[[参考链接](https://www.elastic.co/guide/e ... s.html)]
    • jvm.options:jvm的配置
    • log4j2.properties:log4j 2的配置,用于记录logstash运行日志[[参考链接](https://www.elastic.co/guide/e ... log4j2)]
    • startup.options: 仅适用于Lniux系统,用于设置系统启动项目!
  • 为了保证敏感配置的安全性,logstash提供了配置加密功能[[参考链接](https://www.elastic.co/guide/e ... e.html)]

    3.4 启动关闭方式


    3.4.1 启动


  • [命令行启动](https://www.elastic.co/guide/e ... e.html)
  • [在debian和rpm上以服务形式启动](https://www.elastic.co/guide/e ... h.html)
  • [在docker中启动](https://www.elastic.co/guide/e ... r.html)3.4.2 关闭
  • [关闭Logstash](https://www.elastic.co/guide/e ... n.html)
  • Logstash的关闭时会先关闭input停止输入,然后处理完所有进行中的事件,然后才完全停止,以防止数据丢失,但这也导致停止过程出现延迟或失败的情况。

    3.5 扩展Logstash


    当单个Logstash无法满足性能需求时,可以采用横向扩展的方式来提高Logstash的处理能力。横向扩展的多个Logstash相互独立,采用相同的pipeline配置,另外可以在这多个Logstash前增加一个LoadBalance,以实现多个Logstash的负载均衡。

    4 性能调优


    [[详细调优参考](https://www.elastic.co/guide/e ... g.html)]

  • (1)Inputs和Outputs的性能:当输入输出源的性能已经达到上限,那么性能瓶颈不在Logstash,应优先对输入输出源的性能进行调优。
  • (2)系统性能指标
    • CPU:确定CPU使用率是否过高,如果CPU过高则先查看JVM堆空间使用率部分,确认是否为GC频繁导致,如果GC正常,则可以通过调节Logstash worker相关配置来解决。
    • 内存:由于Logstash运行在JVM上,因此注意调整JVM堆空间上限,以便其有足够的运行空间。另外注意Logstash所在机器上是否有其他应用占用了大量内存,导致Logstash内存磁盘交换频繁。
    • I/O使用率
      1)磁盘IO
      磁盘IO饱和可能是因为使用了会导致磁盘IO饱和的创建(如file output),另外Logstash中出现错误产生大量错误日志时也会导致磁盘IO饱和。Linux下可以通过iostat, dstat等查看磁盘IO情况
      2)网络IO
      网络IO饱和一般发生在使用有大量网络操作的插件时。linux下可以使用dstat或iftop等查看网络IO情况
  • (3)JVM堆检查
    • 如果JVM堆大小设置过小会导致GC频繁,从而导致CPU使用率过高
    • 快速验证这个问题的方法是double堆大小,看性能是否有提升。注意要给系统至少预留1GB的空间。
    • 为了精确查找问题可以使用jmap或VisualVM。[[参考](https://www.elastic.co/guide/e ... e-heap)]
    • 设置Xms和Xmx为相同值,防止堆大小在运行时调整,这个过程非常消耗性能。
  • (4)Logstash worker设置
    worker相关配置在logstash.yml中,主要包括如下三个:
    • pipeline.workers
      该参数用以指定Logstash中执行filter和output的线程数,当如果发现CPU使用率尚未达到上限,可以通过调整该参数,为Logstash提供更高的性能。建议将Worker数设置适当超过CPU核数可以减少IO等待时间对处理过程的影响。实际调优中可以先通过-w指定该参数,当确定好数值后再写入配置文件中。
    • pipeline.batch.size:
      该指标用于指定单个worker线程一次性执行flilter和output的event批量数。增大该值可以减少IO次数,提高处理速度,但是也以为这增加内存等资源的消耗。当与Elasticsearch联用时,该值可以用于指定Elasticsearch一次bluck操作的大小。
    • pipeline.batch.delay:
      该指标用于指定worker等待时间的超时时间,如果worker在该时间内没有等到pipeline.batch.size个事件,那么将直接开始执行filter和output而不再等待。

      结束语


      Logstash作为Elastic Stack的重要组成部分,在Elasticsearch数据采集和处理过程中扮演着重要的角色。本文通过简单示例的演示和Logstash基础知识的铺陈,希望可以帮助初次接触Logstash的用户对Logstash有一个整体认识,并能较为快速上手。对于Logstash的高阶使用,仍需要用户在使用过程中结合实际情况查阅相关资源深入研究。当然也欢迎大家积极交流,并对文中的错误提出宝贵意见。

      MORE:


  • [Logstash数据处理常见示例](https://www.elastic.co/guide/e ... n.html)
  • [Logstash日志相关配置参考](https://www.elastic.co/guide/e ... g.html)
  • [Kibana管理Logstash pipeline配置](https://www.elastic.co/guide/e ... t.html)
  • [LogstashModule](https://www.elastic.co/guide/e ... s.html)
  • [监控Logstash](https://www.elastic.co/guide/e ... h.html)

当Elasticsearch遇见Kafka--Kafka Connect

Elasticsearchmushao999 发表了文章 • 2 个评论 • 13168 次浏览 • 2018-11-17 11:15 • 来自相关话题

本文同步发布在腾讯云+社区Elasticsearch专栏中:https://cloud.tencent.com/developer/column/4008
在“[当Elasticsearch遇见Kafka--Logstash kafka input插件](https://cloud.tencent.com/developer/article/1362320)”一文中,我对Logstash的Kafka input插件进行了简单的介绍,并通过实际操作的方式,为大家呈现了使用该方式实现Kafka与Elastisearch整合的基本过程。可以看出使用Logstash input插件的方式,具有配置简单,数据处理方便等优点。然而使用Logstash Kafka插件并不是Kafka与Elsticsearch整合的唯一方案,另一种比较常见的方案是使用Kafka的开源组件Kafka Connect。

![Confluent实现Kafka与Elasticsearch的连接](https://main.qcloudimg.com/raw ... a3.png)

1 Kafka Connect简介


Kafka Connect是Kafka的开源组件Confluent提供的功能,用于实现Kafka与外部系统的连接。Kafka Connect同时支持分布式模式和单机模式,另外提供了一套完整的REST接口,用于查看和管理Kafka Connectors,还具有offset自动管理,可扩展等优点。

Kafka connect分为企业版和开源版,企业版在开源版的基础之上提供了监控,负载均衡,副本等功能,实际生产环境中建议使用企业版。(本测试使用开源版)

Kafka connect workers有两种工作模式,单机模式和分布式模式。在开发和适合使用单机模式的场景下,可以使用standalone模式, 在实际生产环境下由于单个worker的数据压力会比较大,distributed模式对负载均和和扩展性方面会有很大帮助。(本测试使用standalone模式)

关于Kafka Connect的详细情况可以参考[[Kafka Connect](https://docs.confluent.io/curr ... x.html)]

2 使用Kafka Connect连接Kafka和Elasticsearch


2.1 测试环境准备


本文与使用Logstash Kafka input插件环境一样[[传送门](https://cloud.tencent.com/developer/article/1362320)],组件列表如下

| 服务 | ip | port |
|:----|:----|:----|
| Elasticsearch service | 192.168.0.8 | 9200 |
| Ckafka | 192.168.13.10 | 9092 |
| CVM | 192.168.0.13 | - |


kafka topic也复用原来了的kafka_es_test

2.2 Kafka Connect 安装


[[Kafka Connec下载地址](https://www.confluent.io/download/)]

本文下载的为开源版本confluent-oss-5.0.1-2.11.tar.gz,下载后解压

2.3 Worker配置


1) 配置参考

如前文所说,worker分为Standalone和Distributed两种模式,针对两种模式的配置,参考如下

[[通用配置](https://www.confluent.io/download/)]

[[Standalone Woker配置](https://www.confluent.io/download/)]

[[Distributed Worker配置](https://www.confluent.io/download/)]

此处需要注意的是Kafka Connect默认使用AvroConverter,使用该AvroConverter时需要注意必须启动Schema Registry服务

2) 实际操作

本测试使用standalone模式,因此修改/root/confluent-5.0.1/etc/schema-registry/connect-avro-standalone.properties

<br /> bootstrap.servers=192.168.13.10:9092<br />

2.4 Elasticsearch Connector配置


1) 配置参考

[[Connectors通用配置](https://docs.confluent.io/curr ... ectors)]

[[Elasticsearch Configuration Options](https://docs.confluent.io/curr ... s.html)]

2) 实际操作

修改/root/confluent-5.0.1/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

<br /> name=elasticsearch-sink<br /> connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector<br /> tasks.max=1<br /> topics=kafka_es_test<br /> key.ignore=true<br /> connection.url=http://192.168.0.8:9200<br /> type.name=kafka-connect<br />

注意: 其中topics不仅对应Kafka的topic名称,同时也是Elasticsearch的索引名,当然也可以通过topic.index.map来设置从topic名到Elasticsearch索引名的映射

2.5 启动connector


1 注意事项

1) 由于配置文件中jar包位置均采用的相对路径,因此建议在confluent根目录下执行命令和启动程序,以避免不必要的问题

2) 如果前面没有修改converter,仍采用AvroConverter, 注意需要在启动connertor前启动Schema Registry服务

2 启动Schema Registry服务

正如前文所说,由于在配置worker时指定使用了AvroConverter,因此需要启动Schema Registry服务。而该服务需要指定一个zookeeper地址或Kafka地址,以存储schema数据。由于CKafka不支持用户通过接口形式创建topic,因此需要在本机起一个kafka以创建名为_schema的topic。

1) 启动Zookeeper

<br /> ./bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties<br />

2) 启动kafka

<br /> ./bin/kafka-server-start -daemon etc/kafka/server.properties<br />

3) 启动schema Registry

<br /> ./bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties<br />

4) 使用netstat -natpl 查看各服务端口是否正常启动

zookeeper 2181

kafka 9092

schema registry 8081

3 启动Connector

<br /> ./bin/connect-standalone -daemon etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties<br />

ps:以上启动各服务均可在logs目录下找到对应日志

2.6 启动Kafka Producer


由于我们采用的是AvroConverter,因此不能采用Kafka工具包中的producer。Kafka Connector bin目录下提供了Avro Producer

1) 启动Producer

<br /> ./bin/kafka-avro-console-producer --broker-list 192.168.13.10:9092 --topic kafka_es_test --property value.schema='{"type":"record","name":"person","fields":[{"name":"nickname","type":"string"}]}'<br />

2) 输入如下数据

<br /> {"nickname":"michel"}<br /> {"nickname":"mushao"}<br />

2.7 Kibana验证结果


1) 查看索引

在kibana Dev Tools的Console中输入

<br /> GET _cat/indices<br />

结果

<br /> green open kafka_es_test 36QtDP6vQOG7ubOa161wGQ 5 1 1 0 7.9kb 3.9kb<br /> green open .kibana QUw45tN0SHqeHbF9-QVU6A 1 1 1 0 5.5kb 2.7kb<br />

可以看到名为kafka_es_test的索引被成功创建

2) 查看数据

<br /> {<br /> "took": 0,<br /> "timed_out": false,<br /> "_shards": {<br /> "total": 5,<br /> "successful": 5,<br /> "skipped": 0,<br /> "failed": 0<br /> },<br /> "hits": {<br /> "total": 2,<br /> "max_score": 1,<br /> "hits": [<br /> {<br /> "_index": "kafka_es_test",<br /> "_type": "kafka-connect",<br /> "_id": "kafka_es_test+0+0",<br /> "_score": 1,<br /> "_source": {<br /> "nickname": "michel"<br /> }<br /> },<br /> {<br /> "_index": "kafka_es_test",<br /> "_type": "kafka-connect",<br /> "_id": "kafka_es_test+0+1",<br /> "_score": 1,<br /> "_source": {<br /> "nickname": "mushao"<br /> }<br /> }<br /> ]<br /> }<br /> }<br />

可以看到数据已经被成功写入

3 Confluent CLI


3.1 简介


查阅资料时发现很多文章都是使用Confluent CLI启动Kafka Connect,然而官方文档已经明确说明了该CLI只是适用于开发阶段,不能用于生产环境。

它可以一键启动包括zookeeper,kafka,schema registry, kafka rest, connect等在内的多个服务。但是这些服务对于Kafka Connect都不是必须的,如果不使用AvroConverter,则只需要启动Connect即可。即使使用了AvroConverter, 也只需要启动schema registry,将schema保存在远端的kafka中。Kafka Connect REST API也只是为用户提供一个管理connector的接口,也不是必选的。

另外使用CLI启动默认配置为启动Distributed的Connector,需要通过环境变量来修改配置

3.2 使用Confluent CLI


confluent CLI提供了丰富的命令,包括服务启动,服务停止,状态查询,日志查看等,详情参考如下简介视频 [[Introducing the Confluent CLI | Screencast](https://www.youtube.com/watch?v=ZKqBptBHZTg)]

1) 启动

<br /> ./bin/confluent start<br />

2) 检查confluent运行状态

<br /> ./bin/confluent status<br />

当得到如下结果则说明confluent启动成功

<br /> ksql-server is [UP]<br /> connect is [UP]<br /> kafka-rest is [UP]<br /> schema-registry is [UP]<br /> kafka is [UP]<br /> zookeeper is [UP]<br />

3) 问题定位

如果第二步出现问题,可以使用log命令查看,如connect未启动成功则

<br /> ./bin/confluent log connect<br />

4) 加载Elasticsearch Connector

a) 查看connector

<br /> ./bin/confluent list connectors<br />

结果

<br /> Bundled Predefined Connectors (edit configuration under etc/):<br /> elasticsearch-sink<br /> file-source<br /> file-sink<br /> jdbc-source<br /> jdbc-sink<br /> hdfs-sink<br /> s3-sink<br />

b) 加载Elasticsearch connector

<br /> ./bin/confluent load elasticsearch-sink<br />

结果

<br /> {<br /> "name": "elasticsearch-sink",<br /> "config": {<br /> "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",<br /> "tasks.max": "1",<br /> "topics": "kafka_es_test",<br /> "key.ignore": "true",<br /> "connection.url": "<a href="http://192.168.0.8:9200"" rel="nofollow" target="_blank">http://192.168.0.8:9200"</a>,<br /> "type.name": "kafka-connect",<br /> "name": "elasticsearch-sink"<br /> },<br /> "tasks": [],<br /> "type": null<br /> }<br />

5) 使用producer生产数据,并使用kibana验证是否写入成功

4 Kafka Connect Rest API


Kafka Connect提供了一套完成的管理Connector的接口,详情参考[[Kafka Connect REST Interface](https://docs.confluent.io/curr ... i.html)]。该接口可以实现对Connector的创建,销毁,修改,查询等操作

1) GET connectors 获取运行中的connector列表

2) POST connectors 使用指定的名称和配置创建connector

3) GET connectors/(string:name) 获取connector的详细信息

4) GET connectors/(string:name)/config 获取connector的配置

5) PUT connectors/(string:name)/config 设置connector的配置

6) GET connectors/(string:name)/status 获取connector状态

7) POST connectors/(stirng:name)/restart 重启connector

8) PUT connectors/(string:name)/pause 暂停connector

9) PUT connectors/(string:name)/resume 恢复connector

10) DELETE connectors/(string:name)/ 删除connector

11) GET connectors/(string:name)/tasks 获取connectors任务列表

12) GET /connectors/(string: name)/tasks/(int: taskid)/status 获取任务状态

13) POST /connectors/(string: name)/tasks/(int: taskid)/restart 重启任务

14) GET /connector-plugins/ 获取已安装插件列表

15) PUT /connector-plugins/(string: name)/config/validate 验证配置

5 总结


Kafka Connect是Kafka一个功能强大的组件,为kafka提供了与外部系统连接的一套完整方案,包括数据传输,连接管理,监控,多副本等。相对于Logstash Kafka插件,功能更为全面,但配置也相对为复杂些。有文章提到其性能也优于Logstash Kafka Input插件,如果对写入性能比较敏感的场景,可以在实际压测的基础上进行选择。另外由于直接将数据从Kafka写入Elasticsearch, 如果需要对文档进行处理时,选择Logstash可能更为方便。

社区日报 第451期 (2018-11-17)

社区日报bsll 发表了文章 • 0 个评论 • 1119 次浏览 • 2018-11-17 10:48 • 来自相关话题

  1. 如何迁移到kibana空间
    [http://t.cn/E2yMhZi](http://t.cn/E2yMhZi)

  2. 利用kibana空间优化管理权限。
    [http://t.cn/E2yJq1b](http://t.cn/E2yJq1b)

  3. Elasitcsearch索引优化。
    [http://t.cn/E2y6afZ](http://t.cn/E2y6afZ)

关于ik_max_word分词的疑问

Elasticsearchgreenjim20 回复了问题 • 2 人关注 • 3 个回复 • 5616 次浏览 • 2018-11-16 18:55 • 来自相关话题

filebeat无法向Elasticsearch 发送日志数据

Beatsccsy 回复了问题 • 4 人关注 • 3 个回复 • 3329 次浏览 • 2018-11-16 18:15 • 来自相关话题

filebeat 发送日志到logstash ,logstash 报错

Logstashccsy 回复了问题 • 2 人关注 • 1 个回复 • 2976 次浏览 • 2018-11-16 17:09 • 来自相关话题

社区日报 第450期 (2018-11-16)

社区日报laoyang360 发表了文章 • 0 个评论 • 1112 次浏览 • 2018-11-16 13:27 • 来自相关话题

1、喜大普奔!Elastic6.5发布
http://t.cn/E2PPJH2
2、Elastic开启了大数据应用新时代
http://t.cn/E2PPCmn
3、图解elasticsearch原理
http://t.cn/E2PPThd

编辑:铭毅天下
归档:https://elasticsearch.cn/article/6138
订阅:https://tinyletter.com/elastic-daily

es建立倒排索引时如何区分字段的?倒排索引是token对应文档的集合,里面是否区分token匹配到一篇文档的哪些字段?

Elasticsearchweizijun 回复了问题 • 2 人关注 • 1 个回复 • 5077 次浏览 • 2018-11-16 12:46 • 来自相关话题

使用es 前端分页 有好的方法吗?深度分页不行,数据量太大

ElasticsearchErza 回复了问题 • 6 人关注 • 3 个回复 • 3533 次浏览 • 2018-11-16 09:16 • 来自相关话题

社区日报 第449期 (2018-11-15)

社区日报白衬衣 发表了文章 • 0 个评论 • 2068 次浏览 • 2018-11-15 18:49 • 来自相关话题

1.基于Lucene查询原理分析Elasticsearch的性能
http://t.cn/EwZO5to
2.一个让elastalert报警更简单的UI
http://t.cn/EAgg8WQ
3.Filebeat优化实践
http://t.cn/EAge74i
 
编辑:金桥
归档:https://elasticsearch.cn/article/6137
订阅:https://tinyletter.com/elastic-daily

es multi_match 索引优化

Elasticsearchrochy 回复了问题 • 2 人关注 • 1 个回复 • 2001 次浏览 • 2018-11-15 18:17 • 来自相关话题

请教一问题,elasticsearch如何实现句内检索或者段内检索

Elasticsearchrochy 回复了问题 • 4 人关注 • 2 个回复 • 1809 次浏览 • 2018-11-15 18:13 • 来自相关话题

elasticsearch去重

Elasticsearchzz_hello 回复了问题 • 3 人关注 • 8 个回复 • 4184 次浏览 • 2018-11-15 18:01 • 来自相关话题