请各位老师帮我看看这是什么问题
[2019-09-23T10:32:00,375][INFO ][logstash.inputs.jdbc ] (0.008332s) SELECT * FROM (select id,filename,url from test where update_date>'2019-09-23 02:30:00') AS `t1` LIMIT 50000 OFFSET 0
{"@version":"1","filename":"1/txt","data":"d2VsY29tZSB0byBiZWlqaW5nCg==","type":"doc","id":"1","@timestamp":"2019-09-23T02:32:00.376Z","url":"/opt/test.txt"}
{"@version":"1","filename":"3.txt","data":"d2VsY29tZSB0byBiZWlqaW5nCg==","type":"doc","id":"3","@timestamp":"2019-09-23T02:32:00.376Z","url":"/opt/test.txt"}
{"@version":"1","filename":"2.txt","data":"d2VsY29tZSB0byBiZWlqaW5nCg==","type":"doc","id":"2","@timestamp":"2019-09-23T02:32:00.376Z","url":"/opt/test.txt"}
{"@version":"1","filename":"3.txt","data":"d2VsY29tZSB0byBiZWlqaW5nCg==","type":"doc","id":"4","@timestamp":"2019-09-23T02:32:00.376Z","url":"/opt/test.txt"}
[2019-09-23T10:32:14,502][INFO ][logstash.outputs.elasticsearch] retrying failed action with response code: 500 ({"type"=>"exception", "reason"=>"java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: field [data] not present as part of path [data]", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"java.lang.IllegalArgumentException: field [data] not present as part of path [data]", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"field [data] not present as part of path [data]"}}, "header"=>{"processor_type"=>"remove"}})
[2019-09-23T10:32:14,503][INFO ][logstash.outputs.elasticsearch] Retrying individual bulk actions that failed or were rejected by the previous bulk request. {:count=>1}
jing
从mysql 读取数据库数据,在logstash 的filter 里根据数据的路径url ,读取文件内容转成base64,输出到elasticsearch:
input {
stdin {
}
jdbc {
# 数据库
jdbc_connection_string => "jdbc:mysql://139.199.31.xxx:3306/test"
# 用户名密码
jdbc_user => "xxxx"
jdbc_password => "xxxx"
# jar包的位置
jdbc_driver_library => "/opt/logstash/logstash-6.2.4/piugins-conf/mysql/mysql-connector-java-5.1.40.jar"
# mysql的Driver
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#statement_filepath => "config-mysql/test02.sql"
use_column_value => false
tracking_column => "update_date"
last_run_metadata_path => "/opt/logstash/logstash-6.2.4/piugins-conf/mysql/last.txt"
statement => "select id,filename,,url from test where update_date>:sql_last_value"
schedule => "*/2 * * * *"
#索引的类型
type => "doc"
}
}
filter {
java_filter_encode{
source => "message"
}
}
output {
elasticsearch {
hosts => "10.10.20.7:9200"
# index名
index => "test"
pipeline => "attachment"
# 需要关联的数据库中有有一个id字段,对应索引的id号
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
附件是异常信息和filter的截图
[2019-09-23T10:32:00,375][INFO ][logstash.inputs.jdbc ] (0.008332s) SELECT * FROM (select id,filename,url from test where update_date>'2019-09-23 02:30:00') AS `t1` LIMIT 50000 OFFSET 0
{"@version":"1","filename":"1/txt","data":"d2VsY29tZSB0byBiZWlqaW5nCg==","type":"doc","id":"1","@timestamp":"2019-09-23T02:32:00.376Z","url":"/opt/test.txt"}
{"@version":"1","filename":"3.txt","data":"d2VsY29tZSB0byBiZWlqaW5nCg==","type":"doc","id":"3","@timestamp":"2019-09-23T02:32:00.376Z","url":"/opt/test.txt"}
{"@version":"1","filename":"2.txt","data":"d2VsY29tZSB0byBiZWlqaW5nCg==","type":"doc","id":"2","@timestamp":"2019-09-23T02:32:00.376Z","url":"/opt/test.txt"}
{"@version":"1","filename":"3.txt","data":"d2VsY29tZSB0byBiZWlqaW5nCg==","type":"doc","id":"4","@timestamp":"2019-09-23T02:32:00.376Z","url":"/opt/test.txt"}
[2019-09-23T10:32:14,502][INFO ][logstash.outputs.elasticsearch] retrying failed action with response code: 500 ({"type"=>"exception", "reason"=>"java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: field [data] not present as part of path [data]", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"java.lang.IllegalArgumentException: field [data] not present as part of path [data]", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"field [data] not present as part of path [data]"}}, "header"=>{"processor_type"=>"remove"}})
[2019-09-23T10:32:14,503][INFO ][logstash.outputs.elasticsearch] Retrying individual bulk actions that failed or were rejected by the previous bulk request. {:count=>1}
jing
从mysql 读取数据库数据,在logstash 的filter 里根据数据的路径url ,读取文件内容转成base64,输出到elasticsearch:
input {
stdin {
}
jdbc {
# 数据库
jdbc_connection_string => "jdbc:mysql://139.199.31.xxx:3306/test"
# 用户名密码
jdbc_user => "xxxx"
jdbc_password => "xxxx"
# jar包的位置
jdbc_driver_library => "/opt/logstash/logstash-6.2.4/piugins-conf/mysql/mysql-connector-java-5.1.40.jar"
# mysql的Driver
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#statement_filepath => "config-mysql/test02.sql"
use_column_value => false
tracking_column => "update_date"
last_run_metadata_path => "/opt/logstash/logstash-6.2.4/piugins-conf/mysql/last.txt"
statement => "select id,filename,,url from test where update_date>:sql_last_value"
schedule => "*/2 * * * *"
#索引的类型
type => "doc"
}
}
filter {
java_filter_encode{
source => "message"
}
}
output {
elasticsearch {
hosts => "10.10.20.7:9200"
# index名
index => "test"
pipeline => "attachment"
# 需要关联的数据库中有有一个id字段,对应索引的id号
document_id => "%{id}"
}
stdout {
codec => json_lines
}
}
附件是异常信息和filter的截图
2 个回复
anran
赞同来自:
doom
赞同来自:
"date": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis" }