最近看到 INFINI Gateway 新增了一个 bulk_reshuffle filter, 于是便简单地测试一下这个功能。(Gateway 下载地址 以及 参考文档)
测试机器配置
系统 | 处理器 | 内存 |
---|---|---|
Macos | 2 GHz 四核Intel Core i5 | 16 GB |
测试所需软件及版本
- Elasticsearch 7.10
- Kibana 7.10
- INFINI Gateway 最新版本
- Logstash 7.10
- Metricbeat 7.10
本文就省略以上软件的下载和安装步骤了。 另外本文中测试 Elasticsearch 集群含两个节点,每个节点配置内存都为 1GB ,其他参数均为默认。
测试步骤
准备测试数据文件
本文测试数据文件 nginx_mock_log ,文件中每行结构如下:
{"timestamp":1611540661651,"method":"POST","msg":"mock log"}
大概一千多万条
Logstatsh 使用 Input file 模式直接输出数据到 Elasticsearch
编辑 Logstash 配置 test.conf 如下:
input{
file {
path => ["/test/nginx_mock_log"]
type => "file_monitor"
start_position => "beginning"
}
}
output{
elasticsearch {
hosts => ["localhost:9200"]
index => "nginx_mock_log"
http_compression => false
}
}
在 kibana 中创建索引 nginx_mock_log ,将主分片设置为2(为了体现出 Gateway的性能优势, 主分片数应设置大于1), 配置如下:
PUT nginx_mock_log
{
"mappings" : {
"properties" : {
"@timestamp" : {
"type" : "date"
},
"@version" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"host" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"message" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"path" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"type" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
},
"settings" : {
"number_of_shards" : "2"
}
}
运行 Logstash
/usr/local/logstash/bin/logstash -f test.conf
打开 Kibana Stack Monitorning 查看 Indexing Rate 监控指标如下图:
从图中可以看到索引速率基本保持在4300/s 上下
Logstatsh 使用 Input file 模式输出数据到 Gateway
进入 Kibana 删除索引 nginx_mock_log 并重建
DELETE nginx_mock_log
PUT nginx_mock_log
{
"mappings" : {
"properties" : {
"@timestamp" : {
"type" : "date"
},
"@version" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"host" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"message" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"path" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"type" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
},
"settings" : {
"number_of_shards" : "2"
}
}
修改 Logstash 配置 test.conf 如下:
input{
file {
path => ["/test/nginx_mock_log"]
type => "file_monitor"
start_position => "beginning"
}
}
output{
elasticsearch {
hosts => ["localhost:8000"]
index => "nginx_mock_log"
http_compression => false
}
}
修改 Gateway 配置文件 gateway.yaml 如下:
path.data: data
path.logs: log
entry:
- name: es_gateway #your gateway endpoint
enabled: true
router: default
network:
binding: localhost:8000
reuse_port: true #you can start multi gateway instance, they share same port, to full utilize system's resources
flow:
- name: bulk_es_test
filter: #comment out any filter sections, like you don't need cache or rate-limiter
- name: bulk_reshuffle
parameters:
elasticsearch: dev
level: node
mode: async
- name: elasticsearch
parameters:
elasticsearch: dev
refresh:
enabled: true
interval: 30s
- name: request_logging
filter:
- name: request_logging
parameters:
queue_name: request_logging
router:
- name: default
default_flow: bulk_es_test
tracing_flow: request_logging
elasticsearch:
- name: dev
enabled: true
endpoint: http://localhost:9200 # if your elasticsearch is using https, your gateway should be listen on as https as well
basic_auth: #used to discovery full cluster nodes, or check elasticsearch's health and versions
username: elastic
password: yV6syH3KLt4DxqMlCyag
discovery: # auto discovery elasticsearch cluster nodes
enabled: true
refresh:
enabled: true
modules:
- name: elastic
enabled: true
elasticsearch: dev
store:
enabled: true
orm:
enabled: true
init_template: true
template_name: ".infini-default1"
index_prefix: "gateway_"
- name: pipeline
enabled: true
runners:
- name: nodes_index
enabled: true
max_go_routine: 2
threshold_in_ms: 0
timeout_in_ms: 5000
pipeline_id: bulk_request_ingest
- name: request_logging_test_name
enabled: true
max_go_routine: 2
threshold_in_ms: 0
timeout_in_ms: 5000
pipeline_id: request_logging_index
pipelines:
- name: bulk_request_ingest
start:
joint: bulk_indexing
enabled: true
parameters:
elasticsearch: "dev"
timeout: "5s"
worker_size: 10
bulk_size_in_mb: 1 #in MB
- name: request_logging_index
start:
joint: json_indexing
enabled: true
parameters:
index_name: "gateway_requests"
elasticsearch: "dev"
input_queue: "request_logging"
timeout: "5s"
worker_size: 10
bulk_size_in_mb: 1 #in MB
queue:
min_msg_size: 1
max_msg_size: 5000000000
max_bytes_per_file: 53687091200
sync_every_records: 100000 # sync by records count
sync_timeout_in_ms: 10000 # sync by time in million seconds
write_chan_buffer: 1000
read_chan_buffer: 1000
以上各配置节点含义,请参考 Gateway 文档
启动 Gateway
./gateway
删除 Logstash data 目录
rm -rf /usr/local/logstash/data
启动 Logstash
/usr/local/logstash/bin/logstash -f test.conf
打开 Kibana Stack Monitorning 查看 Indexing Rate 监控指标如下图:
从上图后半部分可以看到索引速率可以保持在 25000/s 上下(一会儿的功夫,一千多万条数据导入ES完事了)
前面看到 Gateway 配置开启了 request_logging,因此可以在 Kibana Dashboard 里面的 INFINI Gateway Dashboard 查看请求信息,如下图:
注意,上面图中的请求速率是 _bulk 请求的速率,不是索引速率
总结
从测试结果来看,相同环境下,用 Logstash elasticsearch output 输出数据到 Gateway 的方式比 Logstash elasticsearch output 直接到 ES 的方式速率快了4倍,不得不说这速率是真的杠杠的。至于能不能通过参数调优再提升速率呢?大家有兴趣的自己下载测试吧!最后感谢 medcl 大神出品。
本文地址:http://elasticsearch.cn/article/14228
14 个评论
👍👍 ,很高兴看到索引速度有提升,目前看来瓶颈在 Logstash,可以试着调优一下 Logstash 再看看。
medcl 回复 XiongYing86
为啥我用spark写入并没有明显的提升呢?