
Logstash importing from MySQL into ES: some tables import successfully, others fail with the exception undefined method `time' for nil:NilClass

Logstash | by super9du | published 2019-12-28 | views: 3414

Environment: Logstash 7.3, ES 7.3, CentOS 7, MySQL 5.7
 
Background:
I had previously imported a single MySQL table into ES, then deleted it. Now I use one conf to import several tables into ES at once, but some tables import fine while others fail. I suspected ES's automatic date detection on time-formatted fields, so I set date_detection to false in the mappings of the affected indices; testing showed it still failed.
Also, when I import each of the problematic tables with its own standalone config, there are no errors and nothing fails to import.
Addendum: every table that fails to import contains a varchar column named times (in some rows the value is comma-separated). The tables were designed by my predecessors; I am only responsible for the import/export…
 
The relevant part of the problematic config file is roughly as follows (a bit long):
input {
  stdin {
  }
  jdbc {
    type => "proj__address"

    jdbc_connection_string => "jdbc:mysql://localhost:3306/cntd?characterEncoding=UTF-8&useSSL=false&serverTimezone=CTT"
    jdbc_user => "root"
    jdbc_password => "111"
    jdbc_driver_library => "/home/logstash-7.3.1/mysql/mysql-connector-java-8.0.18.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => 1000

    record_last_run => true
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    last_run_metadata_path => "/home/logstash-7.3.1/data/proj__address.txt"

    statement => "select * from proj__address where id > :sql_last_value"
    schedule => "* * * * *"
  }
  jdbc {
    type => "proj_l_address2"

    jdbc_connection_string => "jdbc:mysql://localhost:3306/cntd?characterEncoding=UTF-8&useSSL=false&serverTimezone=CTT"
    jdbc_user => "root"
    jdbc_password => "111"
    jdbc_driver_library => "/home/logstash-7.3.1/mysql/mysql-connector-java-8.0.18.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => 1000

    record_last_run => true
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    last_run_metadata_path => "/home/logstash-7.3.1/data/proj_l_address2.txt"

    statement => "select * from proj_l_address2 where id > :sql_last_value"
    schedule => "* * * * *"
  }
  jdbc {
    type => "proj_maddress"

    jdbc_connection_string => "jdbc:mysql://localhost:3306/cntd?characterEncoding=UTF-8&useSSL=false&serverTimezone=CTT"
    jdbc_user => "root"
    jdbc_password => "111"
    jdbc_driver_library => "/home/logstash-7.3.1/mysql/mysql-connector-java-8.0.18.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => 1000

    record_last_run => true
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    last_run_metadata_path => "/home/logstash-7.3.1/data/proj_maddress.txt"

    statement => "select * from proj_malicious_address where id > :sql_last_value"
    schedule => "* * * * *"
  }
  jdbc {
    type => "proj_ses_address"

    jdbc_connection_string => "jdbc:mysql://localhost:3306/cntd?characterEncoding=UTF-8&useSSL=false&serverTimezone=CTT"
    jdbc_user => "root"
    jdbc_password => "111"
    jdbc_driver_library => "/home/logstash-7.3.1/mysql/mysql-connector-java-8.0.18.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => 1000

    record_last_run => true
    use_column_value => true
    tracking_column => "id"
    tracking_column_type => "numeric"
    last_run_metadata_path => "/home/logstash-7.3.1/data/proj_ses_address.txt"

    statement => "select * from proj_ses_address where id > :sql_last_value"
    schedule => "* * * * *"
  }
}

filter {
  if [type] in ["proj_", "aaa"] {
    date {
      match => [ "add_date", "yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd" ]
      target => "@timestamp"
    }
    ruby {
      code => "event.set('add_date', event.get('@timestamp').time.localtime + 8*60*60)"
    }
    ruby {
      code => "event.set('create_date', event.get('create_date').time.localtime + 8*60*60)"
    }
    ruby {
      code => "event.set('modify_date', event.get('modify_date').time.localtime + 8*60*60)"
    }
    mutate {
      remove_field => ["message","@version","@timestamp"]
    }
  }
  if [type] in ["proj__address", "proj_l_address2", "proj_maddress", "proj_ses_address"] {
    ruby {
      code => "event.set('create_date', event.get('create_date').time.localtime + 8*60*60)"
    }
    ruby {
      code => "event.set('modify_date', event.get('modify_date').time.localtime + 8*60*60)"
    }
    mutate {
      remove_field => ["message","@version","@timestamp"]
    }
  }
}

output {
  if [type] == "proj__address" {
    elasticsearch {
      action => "index"
      hosts => ["localhost:9201","localhost:9202"]
      index => "proj__address"
      document_id => "%{id}"
      template_overwrite => true
      manage_template => false
    }
    stdout {
      codec => json_lines
    }
  }
  if [type] == "proj_l_address2" {
    elasticsearch {
      action => "index"
      hosts => ["localhost:9201","localhost:9202"]
      index => "proj_l_address2"
      document_id => "%{id}"
      template_overwrite => true
      manage_template => false
    }
    stdout {
      codec => json_lines
    }
  }
  if [type] == "proj_maddress" {
    elasticsearch {
      action => "index"
      hosts => ["localhost:9201","localhost:9202"]
      index => "proj_maddress"
      document_id => "%{id}"
      template_overwrite => true
      manage_template => false
    }
    stdout {
      codec => json_lines
    }
  }
  if [type] == "proj_ses_address" {
    elasticsearch {
      action => "index"
      hosts => ["localhost:9201","localhost:9202"]
      index => "proj_ses_address"
      document_id => "%{id}"
      template_overwrite => true
      manage_template => false
    }
    stdout {
      codec => json_lines
    }
  }
}


tacsklet - our company uses ES

Upvoted by: everything

The error is raised by the ruby part of the filter. It looks as if some rows have no time value to read: check whether modify_date and create_date are present in every row and are time-typed. Also, what does the standalone per-table config you mentioned look like?
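For example, a nil guard along these lines would avoid the crash (a sketch based on the filter above, not a verified fix; the field list is an assumption):

ruby {
  code => "
    ['create_date', 'modify_date'].each do |f|
      v = event.get(f)
      # A NULL column arrives as nil, and calling nil.time raises exactly
      # undefined method `time' for nil:NilClass
      event.set(f, v.time.localtime + 8*60*60) unless v.nil?
    end
  "
}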

ziyou - a Java programmer learning ELK

Upvoted by:

Please paste the actual error output; the problem can't be diagnosed from what's shown here.

super9du

Upvoted by:

Thanks everyone, I found the problem! The MySQL tables have a type column of their own, which conflicts with the type field set in the config. The type configured in the conf therefore never takes effect, and if [type] ends up matching against the type column coming from the database.
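A quick way to confirm what actually lands in [type] is to dump raw events while testing, e.g.:

output {
  stdout { codec => rubydebug }
}

The rubydebug codec prints every field on the event, including the type value carried over from the database row.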
 
Solution:
The most obvious fix is to alias the type column in SQL. But since SQL has no way to rename a single column inside select *, that means listing every other column explicitly, and with this many tables the workload would be substantial; for a single table it would look like the sketch below.
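Roughly (column names here are illustrative and db_type is a made-up alias):

statement => "select id, create_date, modify_date, type as db_type from proj__address where id > :sql_last_value"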
The official documentation of the jdbc input lists a few options common to all inputs: id, type, add_field. Since the tables already have a type column (and an id column too), I turned to add_field to add a marker field instead, and it worked. add_field takes a hash, so I added the following:
add_field => { "xxx_type" => "xxx" }

The full config is as follows:
input {
  jdbc {
    add_field => { "xxx_type" => "xxx" }
    jdbc_connection_string => "jdbc:mysql://localhost:xxx/xxx?characterEncoding=UTF-8&useSSL=false&serverTimezone=CTT"
    jdbc_user => "root"
    jdbc_password => "xxx"
    jdbc_driver_library => "/home/insatalls/logstash-7.3.1/mysql/mysql-connector-java-8.0.18.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => 4000
    jdbc_default_timezone => "Asia/Shanghai"

    record_last_run => true
    use_column_value => false
    tracking_column => "modify_date"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/home/logstash-7.3.1/data/xxx"

    statement => "select * from xxx
                  where modify_date >= :sql_last_value
                  ORDER BY `modify_date`
                  limit 20000"
    schedule => "* * * * *"
  }
}

filter {
  mutate {
    remove_field => ["message","@version","@timestamp"]
  }
}

output {
  if [xxx_type] == "xxx" {
    elasticsearch {
      hosts => ["localhost:xxx","localhost:xxx"]
      index => "xxx"
      document_id => "%{id}"
    }
  }
}
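With each pipeline carrying its own marker field, the output conditional matches reliably again. The pipeline is then started the usual way, e.g. (config path illustrative):

bin/logstash -f /path/to/xxx.conf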
