logstash 从MySQL导入es,有的导入成功,有的不成功,报异常 undefined method `time` for nil:NilClass
Logstash | 作者 super9du | 发布于2019年12月28日 | 阅读数:3718
环境:logstash7.3, es7.3, centos7, mysql 5.7
背景:
单独导入过一次一张MySQL表到es,后来删除了,使用一个 conf 统一导入多张表到ES,但是有的表可以导入,有的表不能导入。曾经怀疑过是 es 自动转换时间格式的字段造成的,配置了相关 index 的 mappings 的 date_detected 为 false。测试后仍然不行。
另外使用单独的配置导入出问题的每个表,不存在报错和不能导入的情况。
补充:导入不成功的表都含有一个varchar类型的times字段(有的字段中间使用逗号分隔)。表都是前辈们设计的,我只负责导入导出……
有问题的配置文件部分大致如下所示,略长
背景:
单独导入过一次一张MySQL表到es,后来删除了,使用一个 conf 统一导入多张表到ES,但是有的表可以导入,有的表不能导入。曾经怀疑过是 es 自动转换时间格式的字段造成的,配置了相关 index 的 mappings 的 date_detected 为 false。测试后仍然不行。
另外使用单独的配置导入出问题的每个表,不存在报错和不能导入的情况。
补充:导入不成功的表都含有一个varchar类型的times字段(有的字段中间使用逗号分隔)。表都是前辈们设计的,我只负责导入导出……
有问题的配置文件部分大致如下所示,略长
# Pipeline input: one stdin (keeps the process interactive) plus four scheduled
# jdbc inputs, one per MySQL table. Each jdbc block tags its events with
# `type => "<table>"` so the filter/output sections can route them.
# NOTE(review): per the thread's resolution, if a source table itself has a
# `type` column, the SELECT result overwrites this `type` setting and the
# downstream [type] conditionals silently stop matching — the accepted fix was
# to use `add_field` instead; verify against the tables' schemas.
input {
stdin {
}
jdbc {
# Event type used by the filter/output conditionals below.
type => "proj__address"
# NOTE(review): serverTimezone=CTT is a deprecated 3-letter zone alias;
# Connector/J 8.x may reject it — `Asia/Shanghai` is the safe spelling. Confirm.
jdbc_connection_string => "jdbc:mysql://localhost:3306/cntd?characterEncoding=UTF-8&useSSL=false&serverTimezone=CTT"
jdbc_user => "root"
jdbc_password => "111"
jdbc_driver_library => "/home/logstash-7.3.1/mysql/mysql-connector-java-8.0.18.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# Fetch in pages of 1000 rows to bound memory use.
jdbc_paging_enabled => true
jdbc_page_size => 1000
# Incremental import: remember the last seen numeric `id` between runs so the
# scheduled query (every minute) only pulls new rows via :sql_last_value.
record_last_run => true
use_column_value => true
tracking_column => "id"
tracking_column_type => "numeric"
last_run_metadata_path => "/home/logstash-7.3.1/data/proj__address.txt"
statement => "select * from proj__address where id > :sql_last_value"
schedule => "* * * * *"
}
jdbc {
# Same settings as above, targeting table proj_l_address2.
type => "proj_l_address2"
jdbc_connection_string => "jdbc:mysql://localhost:3306/cntd?characterEncoding=UTF-8&useSSL=false&serverTimezone=CTT"
jdbc_user => "root"
jdbc_password => "111"
jdbc_driver_library => "/home/logstash-7.3.1/mysql/mysql-connector-java-8.0.18.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_paging_enabled => true
jdbc_page_size => 1000
record_last_run => true
use_column_value => true
tracking_column => "id"
tracking_column_type => "numeric"
last_run_metadata_path => "/home/logstash-7.3.1/data/proj_l_address2.txt"
statement => "select * from proj_l_address2 where id > :sql_last_value"
schedule => "* * * * *"
}
jdbc {
# NOTE(review): type/index say "proj_maddress" but the SELECT below reads
# table `proj_malicious_address` — presumably a deliberate short alias for the
# ES index name; confirm this is intended and not a copy-paste slip.
type => "proj_maddress"
jdbc_connection_string => "jdbc:mysql://localhost:3306/cntd?characterEncoding=UTF-8&useSSL=false&serverTimezone=CTT"
jdbc_user => "root"
jdbc_password => "111"
jdbc_driver_library => "/home/logstash-7.3.1/mysql/mysql-connector-java-8.0.18.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_paging_enabled => true
jdbc_page_size => 1000
record_last_run => true
use_column_value => true
tracking_column => "id"
tracking_column_type => "numeric"
last_run_metadata_path => "/home/logstash-7.3.1/data/proj_maddress.txt"
statement => "select * from proj_malicious_address where id > :sql_last_value"
schedule => "* * * * *"
}
jdbc {
# Same settings as above, targeting table proj_ses_address.
type => "proj_ses_address"
jdbc_connection_string => "jdbc:mysql://localhost:3306/cntd?characterEncoding=UTF-8&useSSL=false&serverTimezone=CTT"
jdbc_user => "root"
jdbc_password => "111"
jdbc_driver_library => "/home/logstash-7.3.1/mysql/mysql-connector-java-8.0.18.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_paging_enabled => true
jdbc_page_size => 1000
record_last_run => true
use_column_value => true
tracking_column => "id"
tracking_column_type => "numeric"
last_run_metadata_path => "/home/logstash-7.3.1/data/proj_ses_address.txt"
statement => "select * from proj_ses_address where id > :sql_last_value"
schedule => "* * * * *"
}
}
# Filter section: shifts the jdbc date columns by +8h (UTC -> CST) and strips
# Logstash bookkeeping fields before indexing.
#
# Fix for the reported crash (`undefined method 'time' for nil:NilClass`):
# every ruby block below called `.time` directly on `event.get(...)`. When a
# row has a NULL date column (or the column arrives as a plain string rather
# than a LogStash::Timestamp), `event.get` yields nil/String, neither of which
# responds to `.time`, and the whole event fails. Each ruby block now guards
# with `respond_to?(:time)` so such rows pass through unshifted instead of
# aborting the import.
filter {
# NOTE(review): this first branch can never match — `in` tests exact list
# membership, not a prefix, and no input above emits type "proj_" or "aaa".
# Kept (guarded) in case other pipelines feed this filter; confirm and remove.
if [type] in ["proj_", "aaa"] {
date {
match => [ "add_date", "yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd"]
target => "@timestamp"
}
ruby {
code => "t = event.get('@timestamp'); event.set('add_date', t.time.localtime + 8*60*60) if t.respond_to?(:time)"
}
ruby {
code => "t = event.get('create_date'); event.set('create_date', t.time.localtime + 8*60*60) if t.respond_to?(:time)"
}
ruby {
code => "t = event.get('modify_date'); event.set('modify_date', t.time.localtime + 8*60*60) if t.respond_to?(:time)"
}
mutate {
remove_field => ["message","@version","@timestamp"]
}
}
# Active branch: covers all four jdbc types declared in the input section.
if [type] in ["proj__address", "proj_l_address2", "proj_maddress", "proj_ses_address"] {
ruby {
code => "t = event.get('create_date'); event.set('create_date', t.time.localtime + 8*60*60) if t.respond_to?(:time)"
}
ruby {
code => "t = event.get('modify_date'); event.set('modify_date', t.time.localtime + 8*60*60) if t.respond_to?(:time)"
}
mutate {
remove_field => ["message","@version","@timestamp"]
}
}
}
# Output section: route each table type to its own ES index (index name equals
# the event's [type]) and echo the document to stdout as JSON lines.
# The four original stanzas were identical except for the index name, so a
# single membership conditional with a sprintf index reference covers them all
# with the same behavior.
output {
if [type] in ["proj__address", "proj_l_address2", "proj_maddress", "proj_ses_address"] {
elasticsearch {
action => "index"
hosts => ["localhost:9201","localhost:9202"]
# [type] is constant within each matching event, so this resolves to the
# same per-table index name the original four stanzas hard-coded.
index => "%{type}"
# Reuse the MySQL primary key as the document id (idempotent re-imports).
document_id => "%{id}"
template_overwrite => true
manage_template => false
}
stdout {
codec => json_lines
}
}
}
3 个回复
tacsklet - 公司有用到es
赞同来自: everything
ziyou - 一个学习ELK的Java程序员
赞同来自:
super9du
赞同来自:
解决方案:
最容易想到的是使用 sql 为 type 字段起个别名。但是这样需要一个一个列出其他数据库字段,而我有多张表,这样做工作量相当大。
在 jdbc input 的官方介绍 里,有这么几个通用字段 id, type, add_field ,因为 type 字段数据库已经有了,id表里也有。所以想到使用 `add_field` 添加字段,经实验有效。add_field 是 hash 类型,所以添加了如下代码:
全部配置如下: