
logstash.filters.json warning: org.jruby.RubyArray cannot be cast to org.jruby.RubyIO

Logstash | Author: Ocean | Posted: 2018-11-26 | Views: 5180

elasticsearch version : 6.4.3
logstash version : 6.4.3
 
Goal: import relational data from MySQL into Elasticsearch through Logstash.
 
Problem:
MySQL has two tables: origin_resume (resumes) and origin_user_education (education history).
The SQL is:
 
SELECT origin.main_id rid,origin.resume_id resumeid, oue.main_id eid,oue.origin_resume_id orid,oue.school_name schoolname 
FROM origin_resume origin
JOIN origin_user_education oue on oue.origin_resume_id = origin.main_id


The query returns:
rid resumeid eid orid schoolname
53 3649931 60 53 xxxUniversity
53 3649931 61 53 yyyUniversity
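
To make the intended outcome concrete, here is a minimal plain-Ruby sketch of the grouping the aggregate filter is meant to perform on these two rows. It is not Logstash code: `aggregate_rows` is a hypothetical helper written only for illustration, and the rows are copied from the result above. The expectation is a single document for rid 53 whose `education` array holds both schools.

```ruby
# Rows as returned by the JOIN query (values copied from the result above).
rows = [
  { 'rid' => 53, 'resumeid' => 3649931, 'eid' => 60, 'orid' => 53, 'schoolname' => 'xxxUniversity' },
  { 'rid' => 53, 'resumeid' => 3649931, 'eid' => 61, 'orid' => 53, 'schoolname' => 'yyyUniversity' }
]

# Hypothetical helper: fold all rows that share a rid into one document,
# mirroring what the aggregate filter's code block does per task_id.
def aggregate_rows(rows)
  maps = {}
  rows.each do |row|
    map = (maps[row['rid']] ||= {})
    map['rid'] = row['rid']
    map['resumeid'] = row['resumeid']
    map['education'] ||= []
    map['education'] << {
      'eid' => row['eid'],
      'orid' => row['orid'],
      'schoolname' => row['schoolname']
    }
  end
  maps.values
end

docs = aggregate_rows(rows)
puts docs.inspect
```

Both rows must pass through the same map for this to work, so the sketch assumes they are processed in order by a single worker.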

But after the import finishes, the document in ES is wrong:
{
"_index": "resumeindex",
"_type": "_doc",
"_id": "53",
"_version": 406,
"_score": 1,
"_source": {
"@timestamp": "2018-11-23T02:50:00.356Z",
"resumeid": 3649931,
"rid": 53,
"tags": [
"_jsonparsefailure"
],
"education": [
{
"eid": 61,
"orid": 53,
"schoolname": "yyyUniversity"
}
],
"@version": "1"
},
"fields": {
"@timestamp": [
"2018-11-23T02:50:00.356Z"
]
}
}
As you can see, education does not contain the "xxxUniversity" entry.
The console also prints the following warning:
[2018-11-22T11:35:01,980][WARN ][logstash.filters.json ] 
Error parsing json
{:source=>"education", :raw=>[{"orid"=>53, "schoolname"=>"xxxUniversity", "eid"=>60}],
:exception=>java.lang.ClassCastException:
org.jruby.RubyArray cannot be cast to org.jruby.RubyIO}
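
One possible reading of this warning, offered as an assumption rather than a confirmed diagnosis: `:raw` shows that `education` is already an array of hashes when it reaches the json filter, while a JSON parser expects text. The plain-Ruby sketch below is only an analogue using the standard `json` library, not Logstash's own parser; `try_parse` is a hypothetical helper. It shows that parsing an Array fails, while parsing the serialized string form of the same data succeeds.

```ruby
require 'json'

# The value from :raw in the warning: already an Array of Hashes, not a String.
education = [{ 'orid' => 53, 'schoolname' => 'xxxUniversity', 'eid' => 60 }]

# Hypothetical helper: return the parsed value, or the exception if parsing fails.
def try_parse(value)
  JSON.parse(value)
rescue StandardError => e
  e
end

# Passing the Array itself: the parser wants text, so this yields an exception.
array_result = try_parse(education)

# Passing the JSON text for the same data: this parses back to the original Array.
string_result = try_parse(JSON.generate(education))

puts array_result.class
puts string_result.inspect
```

If that reading is right, the json filter in the config below may be unnecessary, since the aggregate filter already produces a structured array.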

logstash conf :
input {
jdbc {
jdbc_driver_library => "/Users/pangyang/Documents/study/elasticSearch/mysql-connector-java-5.1.47.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/resume"
jdbc_user => "xxxx"
jdbc_password => "xxxxxx"
schedule => "*/5 * * * *"
use_column_value => true
tracking_column_type => "numeric"
tracking_column => rid
record_last_run => true
codec => json { charset => "UTF-8"}
jdbc_default_timezone => "Asia/Shanghai"
statement => "SELECT origin.main_id rid,origin.resume_id resumeid,
oue.main_id eid,oue.origin_resume_id orid,oue.school_name schoolname
FROM origin_resume origin
JOIN origin_user_education oue on oue.origin_resume_id = origin.main_id"
}
}
filter{
aggregate{
task_id => "%{rid}"
code => "
map['rid']=event.get('rid')
map['resumeid']=event.get('resumeid')
map['education'] ||= []
map['education'] << {'eid'=>event.get('eid'),
'orid'=>event.get('orid'),'schoolname' =>event.get('schoolname')}
event.cancel()
"
push_previous_map_as_event => true
timeout => 3
}
json {
source => "education"
target => "education"
}
}
output {
elasticsearch {
index => "resumeindex"
document_type => "_doc"
document_id => "%{rid}"
hosts => "http://localhost:9200"
}
stdout {
codec => json { charset => "UTF-8"}
}
}
elasticsearch mapping :
{
"mappings": {
"_doc": {
"properties": {
"rid": { "type": "long" },
"resumeid": { "type": "long" },
"education": {
"type":"nested",
"properties": {
"eid": { "type": "long" },
"orid": {"type": "long"},
"schoolname": { "type": "text" }
}
}
}
}
}
}
Could anyone help me figure out what is causing this? I have been stuck on it for several days.