logstash作为一个数据管道中间件,支持对各种类型数据的采集与转换,并将数据发送到各种类型的存储库,比如实现消费kafka数据并且写入到Elasticsearch, 日志文件同步到对象存储S3等,mysql数据同步到Elasticsearch等。
logstash内部主要包含三个模块:
* input: 从数据源获取数据
* filter: 过滤、转换数据
* output: 输出数据
不同类型的数据都可以通过对应的input-plugin, output-plugin完成数据的输入与输出。如需要消费kafka中的数据并写入到Elasticsearch中,则需要使用logstash的kafka-input-plugin完成数据输入,logstash-output-elasticsearch完成数据输出。如果需要对输入数据进行过滤或者转换,比如根据关键词过滤掉不需要的内容,或者时间字段的格式转换,就需要又filter-plugin完成了。
logstash的input插件目前已经有几十种了,支持大多数比较通用或开源的数据源的输入。但如果公司内部开发的数据库或其它存储类的服务不能和开源产品在接口协议上兼容,比如腾讯自研的消息队列服务CMQ不依赖于其它的开源消息队列产品,所以不能直接使用logstash的logstash-input-kafka或logstash-input-rabbitmq同步CMQ中的数据;腾讯云对象存储服务COS, 在鉴权方式上和AWS的S3存在差异,也不能直接使用logstash-input-s3插件从COS中读取数据,对于这种情况,就需要自己开发logstash的input插件了。
本文以开发logstash的cos input插件为例,介绍如何开发logstash的input插件。
logstash官方提供了有个简单的input plugin example可供参考: https://github.com/logstash-plugins/logstash-input-example/
环境准备
logstash使用jruby开发,首先要配置jruby环境:
-
安装rvm:
rvm是一个ruby管理器,可以安装并管理ruby环境,也可以通过命令行切换到不同的ruby版本。
gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 7D2BAF1CF37B13E2069D6956105BD0E739499BDB
\curl -sSL https://get.rvm.io | bash -s stable
source /etc/profile.d/rvm.sh
-
安装jruby
rvm install jruby
rvm use jruby
-
安装包管理工具bundle和测试工具rspec
gem install bundle gem install rspec
从example开始
-
clone logstash-input-example
git clone https://github.com/logstash-plugins/logstash-input-example.git
-
将clone出来的logstash-input-example源码copy到logstash-input-cos目录,并删除.git文件夹,目的是以logstash-input-example的源码为参考进行开发,同时把需要改动名称的地方修改一下:
mv logstash-input-example.gemspec logstash-input-cos.gemspec mv lib/logstash/inputs/example.rb lib/logstash/inputs/cos.rb mv spec/inputs/example_spec.rb spec/inputs/cos_spec.rb
- 建立的源码目录结构如图所示:
其中,重要文件的作用说明如下:
- cos.rb: 主文件,在该文件中编写logstash配置文件的读写与源数据获取的代码,需要继承LogStash::Inputs::Base基类
- cos_spec.rb: 单元测试文件,通过rspec可以对cos.rb中的代码进行测试
- logstash-input-cos.gemspec: 类似于maven中的pom.xml文件,配置工程的版本、名称、licene,包依赖等,通过bundle命令可以下载依赖包
配置并下载依赖
因为腾讯云COS服务没有ruby sdk, 因为只能依赖其Java sdk进行开发,首先添加对cos java sdk的依赖。在logstash-input-cos.gemspec中Gem dependencies配置栏中增加以下内容:
# Gem dependencies
s.requirements << "jar 'com.qcloud:cos_api', '5.4.4'"
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'stud', '>= 0.0.22'
s.add_runtime_dependency 'jar-dependencies'
s.add_development_dependency 'logstash-devutils', '1.3.6'
相比logstash-input-example.gemspec,增加了对com.qcloud:cos_api包以及jar-dependencies包的依赖,jar-dependencies用于在ruby环境中管理jar包,并且可以跟踪jar包的加载状态。
然后,在logstash-input-cos.gemspec中增加配置:
s.platform = 'java'
这样可以成功下载java依赖包,并且可以在ruby代码中直接调用java代码。
最后,执行以下命令下载依赖:
bundle install
编写代码
logstash-input-cos的代码逻辑其实比较简单,主要是通过执行定时任务,调用cos java sdk中的listObjects方法,获取到指定bucket里的数据,并在每次定时任务执行结束后设置marker保存在本地,再次执行时从marker位置获取数据,以实现数据的增量同步。
jar包的引用
因为要调用cos java sdk中的代码,先引用该jar包:
require 'cos_api-5.4.4.jar'
java_import com.qcloud.cos.COSClient;
java_import com.qcloud.cos.ClientConfig;
java_import com.qcloud.cos.auth.BasicCOSCredentials;
java_import com.qcloud.cos.auth.COSCredentials;
java_import com.qcloud.cos.exception.CosClientException;
java_import com.qcloud.cos.exception.CosServiceException;
java_import com.qcloud.cos.model.COSObjectSummary;
java_import com.qcloud.cos.model.ListObjectsRequest;
java_import com.qcloud.cos.model.ObjectListing;
java_import com.qcloud.cos.region.Region;
读取配置文件
logstash配置文件读取的代码如图所示:
config_name为cos,其它的配置项读取代码按照ruby的代码规范编写,添加类型校验与默认值,就可以从以下配置文件中读取配置项:
input {
cos {
"endpoint" => "cos.ap-guangzhou.myqcloud.com"
"access_key_id" => "*****"
"access_key_secret" => "****"
"bucket" => "******"
"region" => "ap-guangzhou"
"appId" => "**********"
"interval" => 60
}
}
output {
stdout {
codec=>rubydebug
}
}
实现register方法
logstash input插件必须实现另个方法:register 和run
register方法类似于初始化方法,在该方法中可以直接使用从配置文件读取并赋值的变量,完成cos client的初始化,代码如下:
# 1 初始化用户身份信息(appid, secretId, secretKey)
cred = com.qcloud.cos.auth.BasicCOSCredentials.new(@access_key_id, @access_key_secret)
# 2 设置bucket的区域, COS地域的简称请参照 https://www.qcloud.com/document/product/436/6224
clientConfig = com.qcloud.cos.ClientConfig.new(com.qcloud.cos.region.Region.new(@region))
# 3 生成cos客户端
@cosclient = com.qcloud.cos.COSClient.new(cred, clientConfig)
# bucket名称, 需包含appid
bucketName = @bucket + "-"+ @appId
@bucketName = bucketName
@listObjectsRequest = com.qcloud.cos.model.ListObjectsRequest.new()
# 设置bucket名称
@listObjectsRequest.setBucketName(bucketName)
# prefix表示列出的object的key以prefix开始
@listObjectsRequest.setPrefix(@prefix)
# 设置最大遍历出多少个对象, 一次listobject最大支持1000
@listObjectsRequest.setMaxKeys(1000)
@listObjectsRequest.setMarker(@markerConfig.getMarker)
示例代码中设置了@cosclient和@listObjectRequest为全局变量, 因为在run方法中会用到这两个变量。
注意在ruby中调用java代码的方式:没有变量描述符;不能直接new Object(),而只能Object.new().
实现run方法
run方法获取数据并将数据流转换成event事件
最简单的run方法为:
def run(queue)
Stud.interval(@interval) do
event = LogStash::Event.new("message" => @message, "host" => @host)
decorate(event)
queue << event
end # loop
end # def run
代码说明:
- 通过Stud ruby模块执行定时任务,interval可自定义,从配置文件中读取
- 生成event, 示例代码生成了一个包含两个字段数据的event
- 调用decorate()方法, 给该event打上tag,如果配置的话
- queue<<event, 将event插入到数据管道中,发送给filter处理
logstash-input-cos的run方法实现为:
def run(queue)
@current_thread = Thread.current
Stud.interval(@interval) do
process(queue)
end
end
def process(queue)
@logger.info('Marker from: ' + @markerConfig.getMarker)
objectListing = @cosclient.listObjects(@listObjectsRequest)
nextMarker = objectListing.getNextMarker()
cosObjectSummaries = objectListing.getObjectSummaries()
cosObjectSummaries.each do |obj|
# 文件的路径key
key = obj.getKey()
if stop?
@logger.info("stop while attempting to read log file")
break
end
# 根据key获取内容
getObject(key) { |log|
# 发送消息
@codec.decode(log) do |event|
decorate(event)
queue << event
end
}
#记录 marker
@markerConfig.setMarker(key)
@logger.info('Marker end: ' + @markerConfig.getMarker)
end
end
# 获取下载输入流
def getObject(key, &block)
getObjectRequest = com.qcloud.cos.model.GetObjectRequest.new(@bucketName, key)
cosObject = @cosclient.getObject(getObjectRequest)
cosObjectInput = cosObject.getObjectContent()
buffered =BufferedReader.new(InputStreamReader.new(cosObjectInput))
while (line = buffered.readLine())
block.call(line)
end
end
测试代码
在spec/inputs/cos_spec.rb中增加如下测试代码:
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/cos"
describe LogStash::Inputs::Cos do
it_behaves_like "an interruptible input plugin" do
let(:config) { {
"endpoint" => 'cos.ap-guangzhou.myqcloud.com',
"access_key_id" => '*',
"access_key_secret" => '*',
"bucket" => '*',
"region" => 'ap-guangzhou',
"appId" => '*',
"interval" => 60 } }
end
end
rspec是一个ruby测试库,通过bundle命令执行rspec:
bundle exec rspec
如果cos.rb中的代码没有语法或运行时错误,则会出现如果信息表明测试成功:
Finished in 0.8022 seconds (files took 3.45 seconds to load)
1 example, 0 failures
构建并测试input-plugin-cos
build
使用gem对input-plugin-cos插件源码进行build:
gem build logstash-input-cos.gemspec
构建完成后会生成一个名为logstash-input-cos-0.0.1-java.gem的文件
test
在logstash的解压目录下,执行一下命令安装logstash-input-cos plugin:
./bin/logstash-plugin install /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem
执行结果为:
Validating /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem
Installing logstash-input-cos
Installation successful
另外,可以通过./bin/logstash-plugin list命令查看logstash已经安装的所有input/output/filter/codec插件。
生成配置文件cos.logstash.conf,内容为:
input {
cos {
"endpoint" => "cos.ap-guangzhou.myqcloud.com"
"access_key_id" => "*****"
"access_key_secret" => "****"
"bucket" => "******"
"region" => "ap-guangzhou"
"appId" => "**********"
"interval" => 60
}
}
output {
stdout {
codec=>rubydebug
}
}
该配置文件使用腾讯云官网账号的secret_id和secret_key进行权限验证,拉取指定bucket里的数据,为了测试,将output设置为标准输出。
执行logstash:
./bin/logstash -f cos.logstash.conf
输出结果为:
Sending Logstash's logs to /root/logstash-5.6.4/logs which is now configured via log4j2.properties
[2018-07-30T19:26:17,039][WARN ][logstash.runner ] --config.debug was specified, but log.level was not set to 'debug'! No config info will be logged.
[2018-07-30T19:26:17,048][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.4/modules/netflow/configuration"}
[2018-07-30T19:26:17,049][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.4/modules/fb_apache/configuration"}
[2018-07-30T19:26:17,252][INFO ][logstash.inputs.cos ] Using version 0.1.x input plugin 'cos'. This plugin isn't well supported by the community and likely has no maintainer.
[2018-07-30T19:26:17,341][INFO ][logstash.pipeline ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2018-07-30T19:26:17,362][INFO ][logstash.inputs.cos ] Registering cos input {:bucket=>"bellengao", :region=>"ap-guangzhou"}
[2018-07-30T19:26:17,528][INFO ][logstash.pipeline ] Pipeline main started
[2018-07-30T19:26:17,530][INFO ][logstash.inputs.cos ] Marker from:
log4j:WARN No appenders could be found for logger (org.apache.http.client.protocol.RequestAddCookies).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[2018-07-30T19:26:17,574][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-07-30T19:26:17,714][INFO ][logstash.inputs.cos ] Marker end: access.log
{
"message" => "77.179.66.156 - - [25/Oct/2016:14:49:33 +0200] \"GET / HTTP/1.1\" 200 612 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
"@version" => "1",
"@timestamp" => 2018-07-30T11:26:17.710Z
}
{
"message" => "77.179.66.156 - - [25/Oct/2016:14:49:34 +0200] \"GET /favicon.ico HTTP/1.1\" 404 571 \"http://localhost:8080/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
"@version" => "1",
"@timestamp" => 2018-07-30T11:26:17.711Z
}
在cos中的bucket里上传了名为access.log的nginx日志,上述输出结果中最后打印出来的每个json结构体构成一个event, 其中message消息即为access.log中每一条日志。
本文地址:http://elasticsearch.cn/article/6185