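How to collect logs from different sources with Filebeat, then filter and output them to different indices by source
Logstash | Author: huangjiangong | Posted 2018-05-11 | Views: 6367
I am collecting different business logs from different machines: one is an nginx log, the other is the log of a service written in Go. The overall ELK architecture is Filebeat -> Logstash -> Elasticsearch -> Kibana.
nginx log format: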
log_format timed_combined '$remote_addr - $remote_user [$time_local] '
'"$request" $status $body_bytes_sent '
'"$http_referer" "$http_user_agent" '
'{$http_sc-id} {$http_sc-mode} {$http_sc-product} {$http_sc-timer} '
'$request_time $upstream_response_time';
Log sample: 192.168.31.121 - - [11/May/2018:15:58:25 +0800] "POST /api/courtNotice/detail HTTP/1.0" 200 26398 "-" "python-requests/2.9.1" {--id} {--mode} {--product} {--timer} 0.195 0.194
Go service log format: "time" level "data" lineno "filename"
Log sample: "2018-05-11T10:26:24+08:00" info "Save Quantum data to S3" 77 "/usr/app/GoProjects/src/dataApi/apis/saicQuantum/thirdDataService.go"
The Logstash configuration file is as follows:
input {
  beats {
    port => 5044
  }
}
filter {
  if "local_nginx" in [tags] {
    grok {
      match => { "message" => "%{IP:remote_addr} (?:%{DATA:remote_user}|-) \[%{HTTPDATE:timestamp}\] \"%{DATA:request_method} %{DATA:request_uri}\" %{NUMBER:status} (?:%{NUMBER:body_bytes_sent}|-) \"(?:%{DATA:http_referer}|-)\" \"%{DATA:http_user_agent}\" \{%{DATA:http_sc-id}\} \{%{DATA:http_sc-mode}\} \{%{DATA:http_sc-product}\} \{%{DATA:http_sc-timer}\} (?:%{DATA:request_time}|-) (?:%{DATA:upstream_response_time}|-)"}
    }
    mutate {
      convert => ["status","integer"]
      convert => ["body_bytes_sent","integer"]
      convert => ["request_time","float"]
    }
    geoip {
      source => "remote_addr"
    }
    date {
      match => [ "timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
    }
    useragent {
      source => "http_user_agent"
    }
  }
  else if "local_go_log" in [tags] {
    grok {
      match => { "message" => " \"%{HTTPDATE:time}\" %{DATA:level} \"%{DATA:data}\" %{NUMBER:lineno} \"%{DATA:filename}\""}
    }
  }
}
output {
  if "local_go_log" in [tags] {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "local_go_service"
    }
  }
  else if "local_nginx" in [tags] {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "local23-nginx"
    }
  }
}
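The problem: the nginx logs are split into the expected fields, but the Go service logs still arrive as a single message and are not split into the time, level, data, lineno, and filename fields. How can I get this working? Is something wrong in the configuration file?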
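A possible explanation, offered as a sketch rather than a confirmed diagnosis: the grok pattern in the `local_go_log` branch differs from the sample Go log line in two ways. `HTTPDATE` describes timestamps such as `11/May/2018:15:58:25 +0800`, whereas the Go service writes an ISO 8601 timestamp (`2018-05-11T10:26:24+08:00`). The pattern also begins with a literal space before the first quote, while the sample line begins directly with the quote. The filter below assumes the sample line is representative of every Go log line. The switch to `TIMESTAMP_ISO8601` and `LOGLEVEL`, the `^`/`$` anchors, and the extra `date` and `mutate` blocks are suggestions that do not appear in the original configuration.

```conf
filter {
  if "local_go_log" in [tags] {
    grok {
      # Assumption: every line looks like the sample, starting with the quoted timestamp.
      # No leading space, and TIMESTAMP_ISO8601 instead of HTTPDATE.
      match => { "message" => "^\"%{TIMESTAMP_ISO8601:time}\" %{LOGLEVEL:level} \"%{DATA:data}\" %{NUMBER:lineno} \"%{DATA:filename}\"$" }
    }
    # Suggested addition: use the log's own timestamp as @timestamp.
    date {
      match => [ "time", "ISO8601" ]
    }
    # Suggested addition: store the line number as an integer.
    mutate {
      convert => { "lineno" => "integer" }
    }
  }
}
```

When a grok pattern fails to match, Logstash by default adds a `_grokparsefailure` tag to the event, so checking the Go events in Kibana for that tag is a quick way to tell whether the pattern is still the problem.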
0 replies