环境: `CentOS7.2`, `Logstash5.x`, `Kafka-0.10.1`, `logstash-codec-avro-3.0.0`, `logstash-output-kafka-6.1.3`
在日志采集方面使用了Logstash进行收集,由于考虑到性能的问题,采用了apache avro进行序列化,发现网上也有人提供此插件,于是把搭建了个简单的环境进行测试,验证是可以实现的。不过在压测的时候发现个问题,后面数据处理的速度并不快,跟踪下来发现Logstash是line的形式发送消息的,不知道能否实现用批量的形式发送呢?查看过`logstash-codec-avro`的源码,但不知道如何修改这块,有人遇过到类似的问题吗?求指教。
`logstash-codec-avro`的源码:
```
public
def encode(event)
dw = Avro::IO::DatumWriter.new(@schema)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
dw.write(event.to_hash, encoder)
@on_event.call(event, Base64.strict_encode64(buffer.string))
end
```
Logstash配置文件:
```
input {
file {
path => "/data/tmp/*.log"
start_position => beginning
codec => "json"
}
}
filter {
json {
remove_field => [ "path","@timestamp","@version","host" ]
source => message
}
}
output {
stdout {
codec => plain {
format => "%{message}"
}
}
kafka {
client_id => "logstash-test"
bootstrap_servers => "xxx:9092,xxx:9093"
topic_id => "logstash-test"
compression_type => "snappy"
retries => 5
codec => avro {
schema_uri => "/root/User.avsc"
}
}
}
```
在日志采集方面使用了Logstash进行收集,由于考虑到性能的问题,采用了apache avro进行序列化,发现网上也有人提供此插件,于是把搭建了个简单的环境进行测试,验证是可以实现的。不过在压测的时候发现个问题,后面数据处理的速度并不快,跟踪下来发现Logstash是line的形式发送消息的,不知道能否实现用批量的形式发送呢?查看过`logstash-codec-avro`的源码,但不知道如何修改这块,有人遇过到类似的问题吗?求指教。
`logstash-codec-avro`的源码:
```
public
def encode(event)
dw = Avro::IO::DatumWriter.new(@schema)
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new(buffer)
dw.write(event.to_hash, encoder)
@on_event.call(event, Base64.strict_encode64(buffer.string))
end
```
Logstash配置文件:
```
input {
file {
path => "/data/tmp/*.log"
start_position => beginning
codec => "json"
}
}
filter {
json {
remove_field => [ "path","@timestamp","@version","host" ]
source => message
}
}
output {
stdout {
codec => plain {
format => "%{message}"
}
}
kafka {
client_id => "logstash-test"
bootstrap_servers => "xxx:9092,xxx:9093"
topic_id => "logstash-test"
compression_type => "snappy"
retries => 5
codec => avro {
schema_uri => "/root/User.avsc"
}
}
}
```
0 个回复