filebeat 7.3 和logstash7.3 无法发送或接收filebe module模块的日志且无日志错误
Logstash | 作者 yoling1985 | 发布于2019年08月18日 | 阅读数:2777
filebeat7.3 filebeat.yml配置如下:
filebeat.inputs:
- type: log
enabled: true
paths:
- /usr/local/openresty/nginx/logs/access.log
fields:
log_source: nginx
- type: log
enabled: true
paths:
- /usr/local/openresty/nginx/logs/error.log
fields:
log_source: nginx-error
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: true
output.logstash:
hosts: ["10.0.0.160:5044"]
logstash7.3 配置:
input {
beats {
port => 5044
}
}
filter {
if [fields][log_source] == "nginx" {
grok {
match => [ "message","%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}"]
overwrite => [ "message"]
}
mutate {
convert => ["response","integer"]
convert => ["bytes","integer"]
convert => ["responsetime","float"]
}
geoip {
source => "clientip"
target => "geoip"
database => "/etc/logstash/GeoLite2-City.mmdb"
add_field => ["[geoip][coordinates]","%{[geoip][longitude]}"]
add_field => ["[geoip][coordinates]","%{[geoip][latitude]}"]
}
date {
match => [ "timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
remove_field => [ "timestamp"]
}
useragent {
source=>"user_agent"
}
}
if [fields][log_source] == "nginx-error" {
grok {
match => [ "message","%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}"]
overwrite => [ "message"]
}
mutate {
convert => ["response","integer"]
convert => ["bytes","integer"]
convert => ["responsetime","float"]
}
date {
match => [ "timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
remove_field => [ "timestamp"]
}
useragent {
source=>"user_agent"
}
}
}
output {
if [fields][log_source] == "nginx" {
elasticsearch {
hosts => ["10.0.0.160:9200"]
manage_template => false
index => "logstash-access-%{+YYYY.MM.dd}"
}
}
if [fields][log_source] == "nginx-error" {
elasticsearch {
hosts => ["10.0.0.160:9200"]
manage_template => false
index => "logstash-nginxerror-%{+YYYY.MM.dd}"
}
}
}
以上配置信息可以生成日志,如下:
==========================================================
以下配置启用system 模块和nginx模块配置(无法生成日志信息)
1、启用模块配置
filebeat module enable system
filebeat module enable nginx
2、配置system.yml和nginx.yml
cd /etc/filebeat/modules.d/
system.yml 配置:
nginx.yml配置:
3、filebeat.yml 配置如上,未做修改。
4、logstash配置
input {
beats {
port => 5044
}
}
filter {
if [fields][log_source] == "nginx" {
grok {
match => [ "message","%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}"]
overwrite => [ "message"]
}
mutate {
convert => ["response","integer"]
convert => ["bytes","integer"]
convert => ["responsetime","float"]
}
geoip {
source => "clientip"
target => "geoip"
database => "/etc/logstash/GeoLite2-City.mmdb"
add_field => ["[geoip][coordinates]","%{[geoip][longitude]}"]
add_field => ["[geoip][coordinates]","%{[geoip][latitude]}"]
}
date {
match => [ "timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
remove_field => [ "timestamp"]
}
useragent {
source=>"user_agent"
}
}
if [fields][log_source] == "nginx-error" {
grok {
match => [ "message","%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}"]
overwrite => [ "message"]
}
mutate {
convert => ["response","integer"]
convert => ["bytes","integer"]
convert => ["responsetime","float"]
}
date {
match => [ "timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
remove_field => [ "timestamp"]
}
useragent {
source=>"user_agent"
}
}
if [fileset][module] == "system" {
if [fileset][name] == "auth" {
grok {
match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][useradd][name]}, UID=%{NUMBER:[system][auth][useradd][uid]}, GID=%{NUMBER:[system][auth][useradd][gid]}, home=%{DATA:[system][auth][useradd][home]}, shell=%{DATA:[system][auth][useradd][shell]}$",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
pattern_definitions => {
"GREEDYMULTILINE"=> "(.|\n)*"
}
remove_field => "message"
}
date {
match => [ "[system][auth][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
geoip {
source => "[system][auth][ssh][ip]"
target => "[system][auth][ssh][geoip]"
}
}
else if [fileset][name] == "syslog" {
grok {
match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
remove_field => "message"
}
date {
match => [ "[system][syslog][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}
if [fileset][module] == "nginx" {
if [fileset][name] == "access" {
grok {
match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] }
remove_field => "message"
}
mutate {
add_field => { "read_timestamp" => "%{@timestamp}" }
}
date {
match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
remove_field => "[nginx][access][time]"
}
useragent {
source => "[nginx][access][agent]"
target => "[nginx][access][user_agent]"
remove_field => "[nginx][access][agent]"
}
geoip {
source => "[nginx][access][remote_ip]"
target => "[nginx][access][geoip]"
}
}
else if [fileset][name] == "error" {
grok {
match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
remove_field => "message"
}
mutate {
rename => { "@timestamp" => "read_timestamp" }
}
date {
match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]
remove_field => "[nginx][error][time]"
}
}
}
}
output {
if [fields][log_source] == "nginx" {
elasticsearch {
hosts => ["10.0.0.160:9200"]
manage_template => false
index => "logstash-access-%{+YYYY.MM.dd}"
}
}
if [fields][log_source] == "nginx-error" {
elasticsearch {
hosts => ["10.0.0.160:9200"]
manage_template => false
index => "logstash-nginxerror-%{+YYYY.MM.dd}"
}
}
if [fileset][module] == "system" {
if [fileset][name] == "auth" {
elasticsearch {
hosts => ["10.0.0.160:9200"]
manage_template => false
index => "logstash-systemlog-%{+YYYY.MM.dd}"
}
}
else if [fileset][name] == "syslog" {
elasticsearch {
hosts => ["10.0.0.160:9200"]
manage_template => false
index => "logstash-systemlog-%{+YYYY.MM.dd}"
}
}
}
if [fileset][module] == "nginx" {
if [fileset][name] == "access" {
elasticsearch {
hosts => ["10.0.0.160:9200"]
manage_template => false
index => "logstash-moduleaccess-%{+YYYY.MM.dd}"
}
}
else if [fileset][name] == "error" {
elasticsearch {
hosts => ["10.0.0.160:9200"]
manage_template => false
index => "logstash-moduleerror-%{+YYYY.MM.dd}"
}
}
}
}
5、日志输出:
filebeat 日志输出
logstash:日志输出:
elasticsearch日志输出:
filebeat debug日志输出:
filebeat.inputs:
- type: log
enabled: true
paths:
- /usr/local/openresty/nginx/logs/access.log
fields:
log_source: nginx
- type: log
enabled: true
paths:
- /usr/local/openresty/nginx/logs/error.log
fields:
log_source: nginx-error
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: true
output.logstash:
hosts: ["10.0.0.160:5044"]
logstash7.3 配置:
input {
beats {
port => 5044
}
}
filter {
if [fields][log_source] == "nginx" {
grok {
match => [ "message","%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}"]
overwrite => [ "message"]
}
mutate {
convert => ["response","integer"]
convert => ["bytes","integer"]
convert => ["responsetime","float"]
}
geoip {
source => "clientip"
target => "geoip"
database => "/etc/logstash/GeoLite2-City.mmdb"
add_field => ["[geoip][coordinates]","%{[geoip][longitude]}"]
add_field => ["[geoip][coordinates]","%{[geoip][latitude]}"]
}
date {
match => [ "timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
remove_field => [ "timestamp"]
}
useragent {
source=>"user_agent"
}
}
if [fields][log_source] == "nginx-error" {
grok {
match => [ "message","%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}"]
overwrite => [ "message"]
}
mutate {
convert => ["response","integer"]
convert => ["bytes","integer"]
convert => ["responsetime","float"]
}
date {
match => [ "timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
remove_field => [ "timestamp"]
}
useragent {
source=>"user_agent"
}
}
}
output {
if [fields][log_source] == "nginx" {
elasticsearch {
hosts => ["10.0.0.160:9200"]
manage_template => false
index => "logstash-access-%{+YYYY.MM.dd}"
}
}
if [fields][log_source] == "nginx-error" {
elasticsearch {
hosts => ["10.0.0.160:9200"]
manage_template => false
index => "logstash-nginxerror-%{+YYYY.MM.dd}"
}
}
}
以上配置信息可以生成日志,如下:
==========================================================
以下配置启用system 模块和nginx模块配置(无法生成日志信息)
1、启用模块配置
filebeat module enable system
filebeat module enable nginx
2、配置system.yml和nginx.yml
cd /etc/filebeat/modules.d/
system.yml 配置:
nginx.yml配置:
3、filebeat.yml 配置如上,未做修改。
4、logstash配置
input {
beats {
port => 5044
}
}
filter {
if [fields][log_source] == "nginx" {
grok {
match => [ "message","%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}"]
overwrite => [ "message"]
}
mutate {
convert => ["response","integer"]
convert => ["bytes","integer"]
convert => ["responsetime","float"]
}
geoip {
source => "clientip"
target => "geoip"
database => "/etc/logstash/GeoLite2-City.mmdb"
add_field => ["[geoip][coordinates]","%{[geoip][longitude]}"]
add_field => ["[geoip][coordinates]","%{[geoip][latitude]}"]
}
date {
match => [ "timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
remove_field => [ "timestamp"]
}
useragent {
source=>"user_agent"
}
}
if [fields][log_source] == "nginx-error" {
grok {
match => [ "message","%{COMBINEDAPACHELOG}+%{GREEDYDATA:extra_fields}"]
overwrite => [ "message"]
}
mutate {
convert => ["response","integer"]
convert => ["bytes","integer"]
convert => ["responsetime","float"]
}
date {
match => [ "timestamp","dd/MMM/YYYY:HH:mm:ss Z"]
remove_field => [ "timestamp"]
}
useragent {
source=>"user_agent"
}
}
if [fileset][module] == "system" {
if [fileset][name] == "auth" {
grok {
match => { "message" => ["%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} %{DATA:[system][auth][ssh][method]} for (invalid user )?%{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]} port %{NUMBER:[system][auth][ssh][port]} ssh2(: %{GREEDYDATA:[system][auth][ssh][signature]})?",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: %{DATA:[system][auth][ssh][event]} user %{DATA:[system][auth][user]} from %{IPORHOST:[system][auth][ssh][ip]}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sshd(?:\[%{POSINT:[system][auth][pid]}\])?: Did not receive identification string from %{IPORHOST:[system][auth][ssh][dropped_ip]}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} sudo(?:\[%{POSINT:[system][auth][pid]}\])?: \s*%{DATA:[system][auth][user]} :( %{DATA:[system][auth][sudo][error]} ;)? TTY=%{DATA:[system][auth][sudo][tty]} ; PWD=%{DATA:[system][auth][sudo][pwd]} ; USER=%{DATA:[system][auth][sudo][user]} ; COMMAND=%{GREEDYDATA:[system][auth][sudo][command]}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} groupadd(?:\[%{POSINT:[system][auth][pid]}\])?: new group: name=%{DATA:system.auth.groupadd.name}, GID=%{NUMBER:system.auth.groupadd.gid}",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} useradd(?:\[%{POSINT:[system][auth][pid]}\])?: new user: name=%{DATA:[system][auth][useradd][name]}, UID=%{NUMBER:[system][auth][useradd][uid]}, GID=%{NUMBER:[system][auth][useradd][gid]}, home=%{DATA:[system][auth][useradd][home]}, shell=%{DATA:[system][auth][useradd][shell]}$",
"%{SYSLOGTIMESTAMP:[system][auth][timestamp]} %{SYSLOGHOST:[system][auth][hostname]} %{DATA:[system][auth][program]}(?:\[%{POSINT:[system][auth][pid]}\])?: %{GREEDYMULTILINE:[system][auth][message]}"] }
pattern_definitions => {
"GREEDYMULTILINE"=> "(.|\n)*"
}
remove_field => "message"
}
date {
match => [ "[system][auth][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
geoip {
source => "[system][auth][ssh][ip]"
target => "[system][auth][ssh][geoip]"
}
}
else if [fileset][name] == "syslog" {
grok {
match => { "message" => ["%{SYSLOGTIMESTAMP:[system][syslog][timestamp]} %{SYSLOGHOST:[system][syslog][hostname]} %{DATA:[system][syslog][program]}(?:\[%{POSINT:[system][syslog][pid]}\])?: %{GREEDYMULTILINE:[system][syslog][message]}"] }
pattern_definitions => { "GREEDYMULTILINE" => "(.|\n)*" }
remove_field => "message"
}
date {
match => [ "[system][syslog][timestamp]", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}
if [fileset][module] == "nginx" {
if [fileset][name] == "access" {
grok {
match => { "message" => ["%{IPORHOST:[nginx][access][remote_ip]} - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] }
remove_field => "message"
}
mutate {
add_field => { "read_timestamp" => "%{@timestamp}" }
}
date {
match => [ "[nginx][access][time]", "dd/MMM/YYYY:H:m:s Z" ]
remove_field => "[nginx][access][time]"
}
useragent {
source => "[nginx][access][agent]"
target => "[nginx][access][user_agent]"
remove_field => "[nginx][access][agent]"
}
geoip {
source => "[nginx][access][remote_ip]"
target => "[nginx][access][geoip]"
}
}
else if [fileset][name] == "error" {
grok {
match => { "message" => ["%{DATA:[nginx][error][time]} \[%{DATA:[nginx][error][level]}\] %{NUMBER:[nginx][error][pid]}#%{NUMBER:[nginx][error][tid]}: (\*%{NUMBER:[nginx][error][connection_id]} )?%{GREEDYDATA:[nginx][error][message]}"] }
remove_field => "message"
}
mutate {
rename => { "@timestamp" => "read_timestamp" }
}
date {
match => [ "[nginx][error][time]", "YYYY/MM/dd H:m:s" ]
remove_field => "[nginx][error][time]"
}
}
}
}
output {
if [fields][log_source] == "nginx" {
elasticsearch {
hosts => ["10.0.0.160:9200"]
manage_template => false
index => "logstash-access-%{+YYYY.MM.dd}"
}
}
if [fields][log_source] == "nginx-error" {
elasticsearch {
hosts => ["10.0.0.160:9200"]
manage_template => false
index => "logstash-nginxerror-%{+YYYY.MM.dd}"
}
}
if [fileset][module] == "system" {
if [fileset][name] == "auth" {
elasticsearch {
hosts => ["10.0.0.160:9200"]
manage_template => false
index => "logstash-systemlog-%{+YYYY.MM.dd}"
}
}
else if [fileset][name] == "syslog" {
elasticsearch {
hosts => ["10.0.0.160:9200"]
manage_template => false
index => "logstash-systemlog-%{+YYYY.MM.dd}"
}
}
}
if [fileset][module] == "nginx" {
if [fileset][name] == "access" {
elasticsearch {
hosts => ["10.0.0.160:9200"]
manage_template => false
index => "logstash-moduleaccess-%{+YYYY.MM.dd}"
}
}
else if [fileset][name] == "error" {
elasticsearch {
hosts => ["10.0.0.160:9200"]
manage_template => false
index => "logstash-moduleerror-%{+YYYY.MM.dd}"
}
}
}
}
5、日志输出:
filebeat 日志输出
logstash:日志输出:
elasticsearch日志输出:
filebeat debug日志输出:
2 个回复
yoling1985
赞同来自:
yoling1985
赞同来自:
}
if [fileset][module] == "system" {
if [fileset][name] == "auth" {
}
}
修改为去除if [fileset][module] == "system" {
}
直接改成
if [fileset][name] == "auth" {
}
修改后,logstash数据写入elasticsearch正常,