【 报名开启】2018 Elastic & 袋鼠云 & 阿里云技术沙龙(杭州)

ElasticsearchAllwang 发布了置顶文章 • 1 个评论 • 446 次浏览 • 2018-12-02 20:40 • 来自相关话题

2018 年 Elastic Advent Calendar 分享活动开启了🎤

Adventmedcl 发布了置顶文章 • 35 个评论 • 1690 次浏览 • 2018-11-20 22:33 • 来自相关话题

【线下活动 - 分享主题征集 - 上海】2018 Elastic & 东方航空大数据技术沙龙

活动kennywu76 发布了置顶文章 • 5 个评论 • 845 次浏览 • 2018-10-18 11:10 • 来自相关话题

elastic 过滤器缓存问题

ElasticsearchJackGe 回复了问题 • 3 人关注 • 1 个回复 • 41 次浏览 • 8 小时前 • 来自相关话题

TF/IDF 计算fieldNorm,不同文档字段的词数(fieldNum)不一样,为什么得分结果都一样(版本2.3)

Elasticsearchmedcl 回复了问题 • 2 人关注 • 2 个回复 • 26 次浏览 • 9 小时前 • 来自相关话题

kafka不出数

Beatsmedcl 回复了问题 • 2 人关注 • 1 个回复 • 51 次浏览 • 11 小时前 • 来自相关话题

filebeat如何为原生模块添加字段

Beatsxuebin 回复了问题 • 2 人关注 • 4 个回复 • 1184 次浏览 • 11 小时前 • 来自相关话题

logstash input插件开发

Logstashbellengao 发表了文章 • 0 个评论 • 36 次浏览 • 11 小时前 • 来自相关话题

logstash作为一个数据管道中间件,支持对各种类型数据的采集与转换,并将数据发送到各种类型的存储库,比如实现消费kafka数据并且写入到Elasticsearch, 日志文件同步到对象存储S3等,mysql数据同步到Elasticsearch等。

logstash内部主要包含三个模块:

  • input: 从数据源获取数据
  • filter: 过滤、转换数据
  • output: 输出数据

    ![](https://main.qcloudimg.com/raw ... ee.png)

    不同类型的数据都可以通过对应的input-plugin, output-plugin完成数据的输入与输出。如需要消费kafka中的数据并写入到Elasticsearch中,则需要使用logstash的kafka-input-plugin完成数据输入,logstash-output-elasticsearch完成数据输出。如果需要对输入数据进行过滤或者转换,比如根据关键词过滤掉不需要的内容,或者时间字段的格式转换,就需要又filter-plugin完成了。

    logstash的input插件目前已经有几十种了,支持大多数比较通用或开源的数据源的输入。但如果公司内部开发的数据库或其它存储类的服务不能和开源产品在接口协议上兼容,比如腾讯自研的消息队列服务CMQ不依赖于其它的开源消息队列产品,所以不能直接使用logstash的logstash-input-kafka或logstash-input-rabbitmq同步CMQ中的数据;腾讯云对象存储服务COS, 在鉴权方式上和AWS的S3存在差异,也不能直接使用logstash-input-s3插件从COS中读取数据,对于这种情况,就需要自己开发logstash的input插件了。

    本文以开发logstash的cos input插件为例,介绍如何开发logstash的input插件。

    logstash官方提供了有个简单的input plugin example可供参考:
    [https://github.com/logstash-pl ... mple/](https://github.com/logstash-pl ... ample/)


    环境准备


    logstash使用jruby开发,首先要配置jruby环境:

    1. 安装rvm:

      rvm是一个ruby管理器,可以安装并管理ruby环境,也可以通过命令行切换到不同的ruby版本。

      <br /> gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 7D2BAF1CF37B13E2069D6956105BD0E739499BDB<br />

      <br /> \curl -sSL <a href="https://get.rvm.io" rel="nofollow" target="_blank">https://get.rvm.io</a> | bash -s stable<br />

      <br /> source /etc/profile.d/rvm.sh<br />
    2. 安装jruby

      <br /> rvm install jruby<br />

      <br /> rvm use jruby<br />
    3. 安装包管理工具bundle和测试工具rspec

      <br /> gem install bundle<br /> gem install rspec<br />

      从example开始


    4. clone logstash-input-example

      <br /> git clone <a href="https://github.com/logstash-plugins/logstash-input-example.git" rel="nofollow" target="_blank">https://github.com/logstash-pl ... e.git</a><br />
    5. 将clone出来的logstash-input-example源码copy到logstash-input-cos目录,并删除.git文件夹,目的是以logstash-input-example的源码为参考进行开发,同时把需要改动名称的地方修改一下:

      <br /> mv logstash-input-example.gemspec logstash-input-cos.gemspec<br /> mv lib/logstash/inputs/example.rb lib/logstash/inputs/cos.rb<br /> mv spec/inputs/example_spec.rb spec/inputs/cos_spec.rb<br />
    6. 建立的源码目录结构如图所示:

      ![](https://main.qcloudimg.com/raw ... 7d.png)

      其中,重要文件的作用说明如下:

      • cos.rb: 主文件,在该文件中编写logstash配置文件的读写与源数据获取的代码,需要继承LogStash::Inputs::Base基类
      • cos_spec.rb: 单元测试文件,通过rspec可以对cos.rb中的代码进行测试
      • logstash-input-cos.gemspec: 类似于maven中的pom.xml文件,配置工程的版本、名称、licene,包依赖等,通过bundle命令可以下载依赖包

        配置并下载依赖


        因为腾讯云COS服务没有ruby sdk, 因为只能依赖其Java sdk进行开发,首先添加对cos java sdk的依赖。在logstash-input-cos.gemspec中Gem dependencies配置栏中增加以下内容:

        ```

        Gem dependencies

        s.requirements << "jar 'com.qcloud:cos_api', '5.4.4'"
        s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
        s.add_runtime_dependency 'logstash-codec-plain'
        s.add_runtime_dependency 'stud', '>= 0.0.22'
        s.add_runtime_dependency 'jar-dependencies'
        s.add_development_dependency 'logstash-devutils', '1.3.6'
        <br /> 相比logstash-input-example.gemspec,增加了对com.qcloud:cos_api包以及jar-dependencies包的依赖,jar-dependencies用于在ruby环境中管理jar包,并且可以跟踪jar包的加载状态。<br /> <br /> 然后,在logstash-input-cos.gemspec中增加配置:<br /> <br />
        s.platform = 'java'
        <br /> 这样可以成功下载java依赖包,并且可以在ruby代码中直接调用java代码。<br /> <br /> 最后,执行以下命令下载依赖:<br /> <br />
        bundle install
        ```

        编写代码


        logstash-input-cos的代码逻辑其实比较简单,主要是通过执行定时任务,调用cos java sdk中的listObjects方法,获取到指定bucket里的数据,并在每次定时任务执行结束后设置marker保存在本地,再次执行时从marker位置获取数据,以实现数据的增量同步。

        jar包的引用

        因为要调用cos java sdk中的代码,先引用该jar包:

        <br /> require 'cos_api-5.4.4.jar'<br /> java_import com.qcloud.cos.COSClient;<br /> java_import com.qcloud.cos.ClientConfig;<br /> java_import com.qcloud.cos.auth.BasicCOSCredentials;<br /> java_import com.qcloud.cos.auth.COSCredentials;<br /> java_import com.qcloud.cos.exception.CosClientException;<br /> java_import com.qcloud.cos.exception.CosServiceException;<br /> java_import com.qcloud.cos.model.COSObjectSummary;<br /> java_import com.qcloud.cos.model.ListObjectsRequest;<br /> java_import com.qcloud.cos.model.ObjectListing;<br /> java_import com.qcloud.cos.region.Region;<br />

        读取配置文件


        logstash配置文件读取的代码如图所示:
        ![](https://main.qcloudimg.com/raw ... 12.png)

        config_name为cos,其它的配置项读取代码按照ruby的代码规范编写,添加类型校验与默认值,就可以从以下配置文件中读取配置项:

        <br /> input {<br /> cos {<br /> "endpoint" => "cos.ap-guangzhou.myqcloud.com"<br /> "access_key_id" => "*****"<br /> "access_key_secret" => "****"<br /> "bucket" => "******"<br /> "region" => "ap-guangzhou"<br /> "appId" => "**********"<br /> "interval" => 60<br /> }<br /> }<br /> <br /> output {<br /> stdout {<br /> codec=>rubydebug<br /> }<br /> }<br />

        实现register方法


        logstash input插件必须实现另个方法:register 和run

        register方法类似于初始化方法,在该方法中可以直接使用从配置文件读取并赋值的变量,完成cos client的初始化,代码如下:

        ```

        1 初始化用户身份信息(appid, secretId, secretKey)

        cred = com.qcloud.cos.auth.BasicCOSCredentials.new(@access_key_id, @access_key_secret)

        2 设置bucket的区域, COS地域的简称请参照 https://www.qcloud.com/document/product/436/6224

        clientConfig = com.qcloud.cos.ClientConfig.new(com.qcloud.cos.region.Region.new(@region))

        3 生成cos客户端

        @cosclient = com.qcloud.cos.COSClient.new(cred, clientConfig)

        bucket名称, 需包含appid

        bucketName = @bucket + "-"+ @appId
        @bucketName = bucketName

        @listObjectsRequest = com.qcloud.cos.model.ListObjectsRequest.new()

        设置bucket名称

        @listObjectsRequest.setBucketName(bucketName)

        prefix表示列出的object的key以prefix开始

        @listObjectsRequest.setPrefix(@prefix)

        设置最大遍历出多少个对象, 一次listobject最大支持1000

        @listObjectsRequest.setMaxKeys(1000)
        @listObjectsRequest.setMarker(@markerConfig.getMarker)
        ```
        示例代码中设置了@cosclient和@listObjectRequest为全局变量, 因为在run方法中会用到这两个变量。

        注意在ruby中调用java代码的方式:没有变量描述符;不能直接new Object(),而只能Object.new().

        实现run方法


        run方法获取数据并将数据流转换成event事件

        最简单的run方法为:

        <br /> def run(queue)<br /> Stud.interval(@interval) do<br /> event = LogStash::Event.new("message" => @message, "host" => @host)<br /> decorate(event)<br /> queue << event<br /> end # loop<br /> end # def run<br />
        代码说明:

      • 通过Stud ruby模块执行定时任务,interval可自定义,从配置文件中读取
      • 生成event, 示例代码生成了一个包含两个字段数据的event
      • 调用decorate()方法, 给该event打上tag,如果配置的话
      • queue<<event, 将event插入到数据管道中,发送给filter处理


        logstash-input-cos的run方法实现为:

        ```
        def run(queue)
        @current_thread = Thread.current
        Stud.interval(@interval) do
        process(queue)
        end
        end

        def process(queue)
        @logger.info('Marker from: ' + @markerConfig.getMarker)

        objectListing = @cosclient.listObjects(@listObjectsRequest)
        nextMarker = objectListing.getNextMarker()
        cosObjectSummaries = objectListing.getObjectSummaries()
        cosObjectSummaries.each do |obj|

        文件的路径key

        key = obj.getKey()

        if stop?
        @logger.info("stop while attempting to read log file")
        break
        end

        根据key获取内容

        getObject(key) { |log|

        发送消息

        @codec.decode(log) do |event|
        decorate(event)
        queue << event
        end
        }

        记录 marker

        @markerConfig.setMarker(key)
        @logger.info('Marker end: ' + @markerConfig.getMarker)
        end
        end


        获取下载输入流

        def getObject(key, &block)
        getObjectRequest = com.qcloud.cos.model.GetObjectRequest.new(@bucketName, key)
        cosObject = @cosclient.getObject(getObjectRequest)
        cosObjectInput = cosObject.getObjectContent()
        buffered =BufferedReader.new(InputStreamReader.new(cosObjectInput))
        while (line = buffered.readLine())
        block.call(line)
        end
        end
        ```

        测试代码

        在spec/inputs/cos_spec.rb中增加如下测试代码:

        ```

        encoding: utf-8

        require "logstash/devutils/rspec/spec_helper"
        require "logstash/inputs/cos"

        describe LogStash::Inputs::Cos do

        it_behaves_like "an interruptible input plugin" do
        let(:config) { {
        "endpoint" => 'cos.ap-guangzhou.myqcloud.com',
        "access_key_id" => '',
        "access_key_secret" => '
        ',
        "bucket" => '',
        "region" => 'ap-guangzhou',
        "appId" => '
        ',
        "interval" => 60 } }
        end
        end

        <br /> <br /> rspec是一个ruby测试库,通过bundle命令执行rspec:<br /> <br />
        bundle exec rspec
        <br /> 如果cos.rb中的代码没有语法或运行时错误,则会出现如果信息表明测试成功:<br /> <br />
        Finished in 0.8022 seconds (files took 3.45 seconds to load)
        1 example, 0 failures
        ```

        构建并测试input-plugin-cos


        build


        使用gem对input-plugin-cos插件源码进行build:

        <br /> gem build logstash-input-cos.gemspec<br />
        构建完成后会生成一个名为logstash-input-cos-0.0.1-java.gem的文件

        test

        在logstash的解压目录下,执行一下命令安装logstash-input-cos plugin:

        <br /> ./bin/logstash-plugin install /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem<br />
        执行结果为:

        <br /> Validating /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem<br /> Installing logstash-input-cos<br /> Installation successful<br />

        另外,可以通过./bin/logstash-plugin list命令查看logstash已经安装的所有input/output/filter/codec插件。

        生成配置文件cos.logstash.conf,内容为:

        <br /> input {<br /> cos {<br /> "endpoint" => "cos.ap-guangzhou.myqcloud.com"<br /> "access_key_id" => "*****"<br /> "access_key_secret" => "****"<br /> "bucket" => "******"<br /> "region" => "ap-guangzhou"<br /> "appId" => "**********"<br /> "interval" => 60<br /> }<br /> }<br /> <br /> output {<br /> stdout {<br /> codec=>rubydebug<br /> }<br /> }<br />
        该配置文件使用腾讯云官网账号的secret_id和secret_key进行权限验证,拉取指定bucket里的数据,为了测试,将output设置为标准输出。

        执行logstash:

        <br /> ./bin/logstash -f cos.logstash.conf<br />

        输出结果为:

        <br /> Sending Logstash's logs to /root/logstash-5.6.4/logs which is now configured via log4j2.properties<br /> [2018-07-30T19:26:17,039][WARN ][logstash.runner ] --config.debug was specified, but log.level was not set to 'debug'! No config info will be logged.<br /> [2018-07-30T19:26:17,048][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.4/modules/netflow/configuration"}<br /> [2018-07-30T19:26:17,049][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.4/modules/fb_apache/configuration"}<br /> [2018-07-30T19:26:17,252][INFO ][logstash.inputs.cos ] Using version 0.1.x input plugin 'cos'. This plugin isn't well supported by the community and likely has no maintainer.<br /> [2018-07-30T19:26:17,341][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}<br /> [2018-07-30T19:26:17,362][INFO ][logstash.inputs.cos ] Registering cos input {:bucket=>"bellengao", :region=>"ap-guangzhou"}<br /> [2018-07-30T19:26:17,528][INFO ][logstash.pipeline ] Pipeline main started<br /> [2018-07-30T19:26:17,530][INFO ][logstash.inputs.cos ] Marker from:<br /> log4j:WARN No appenders could be found for logger (org.apache.http.client.protocol.RequestAddCookies).<br /> log4j:WARN Please initialize the log4j system properly.<br /> log4j:WARN See <a href="http://logging.apache.org/log4j/1.2/faq.html#noconfig" rel="nofollow" target="_blank">http://logging.apache.org/log4 ... onfig</a> for more info.<br /> [2018-07-30T19:26:17,574][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}<br /> [2018-07-30T19:26:17,714][INFO ][logstash.inputs.cos ] Marker end: access.log<br /> {<br /> "message" => "77.179.66.156 - - [25/Oct/2016:14:49:33 +0200] \"GET / HTTP/1.1\" 200 612 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",<br /> "@version" => "1",<br /> "@timestamp" => 2018-07-30T11:26:17.710Z<br /> }<br /> {<br /> "message" => "77.179.66.156 - - [25/Oct/2016:14:49:34 +0200] \"GET /favicon.ico HTTP/1.1\" 404 571 \"<a href="http://localhost:8080/" rel="nofollow" target="_blank">http://localhost:8080/</a>\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",<br /> "@version" => "1",<br /> "@timestamp" => 2018-07-30T11:26:17.711Z<br /> }<br />
        在cos中的bucket里上传了名为access.log的nginx日志,上述输出结果中最后打印出来的每个json结构体构成一个event, 其中message消息即为access.log中每一条日志。


jdbc oracle驱动问题

回复

LogstashGLC 回复了问题 • 1 人关注 • 1 个回复 • 40 次浏览 • 13 小时前 • 来自相关话题

ES参数cluster.routing.allocation.disk.watermark.high值,疑惑

Elasticsearchmedcl 回复了问题 • 2 人关注 • 1 个回复 • 99 次浏览 • 14 小时前 • 来自相关话题

build_scorer很长时间

Elasticsearchmedcl 回复了问题 • 2 人关注 • 1 个回复 • 52 次浏览 • 15 小时前 • 来自相关话题

安装search guard后,使用客户端代码连接ES。ES版本是6.5.1。

回复

Elasticsearchly898197688 发起了问题 • 1 人关注 • 0 个回复 • 66 次浏览 • 15 小时前 • 来自相关话题

filebeat -> logstash 失败的问题

Beatsmedcl 回复了问题 • 3 人关注 • 2 个回复 • 72 次浏览 • 16 小时前 • 来自相关话题

kibana 作图 advanced 里的json input怎么使用

Kibanamedcl 回复了问题 • 2 人关注 • 1 个回复 • 95 次浏览 • 16 小时前 • 来自相关话题

ES数据快照到HDFS

Elasticsearchmedcl 回复了问题 • 2 人关注 • 1 个回复 • 62 次浏览 • 15 小时前 • 来自相关话题

关于滚动模式,关注terms耗时的化是不是就不需要缩小分片了

Elasticsearchmedcl 回复了问题 • 2 人关注 • 1 个回复 • 31 次浏览 • 16 小时前 • 来自相关话题

在filebeat 的source上取值

Logstashmedcl 回复了问题 • 2 人关注 • 1 个回复 • 111 次浏览 • 16 小时前 • 来自相关话题

elasticsearch的索引消费慢问题。

Elasticsearchgodma 回复了问题 • 4 人关注 • 7 个回复 • 227 次浏览 • 16 小时前 • 来自相关话题