elastic6.5.1(datatype join 实现关联查询)

Elasticsearch | 作者 json_111 | 发布于2019年01月09日 | 阅读数:115

如题所示,我想通过es实现类mysql中的关联关系,目的通过关联关系实现多表数据整合
例如我有两个表user(用户表)和active_user表
user表包含id, reg_time, tg_id三个字段
active_user表包含id(自增),uid(用户ID), add_time 三个字段
两个表的关系如下:active_user表中的uid对应user表中的id
我想在es中实现这样的查询结果
id(user表)  uid  reg_time tg_id add_time
1 1 2018-11-11 09:10:32 asdf  2018-11-11 09:11:11
因为上面两个表实际环境中数据量比较大,如果在导入前通过sql实现宽表化,对数据库的压力比较大,请问各位大神、专家有没有比较好的方案,我是刚接触ELK的小白,还望各位指教
 
我看了网上的一些教程,利用es6+ 的新类型 join ,建立index 通过relations指定父子文档的关系, 但是查询时并不能得到关联结果
1.索引的mapping 如下
PUT testjoin_new
{
  "mappings": {
    "doc": {
      "dynamic": "strict",
      "properties": {
        "type": {
          "type": "text"
        },
         "id": {
          "type": "integer"
        },
         "tg_id": {
          "type": "keyword"
        },
        "reg_time": {
          "type": "date"
        },
        "uid": {
          "type": "integer"
        },
        "add_time": {
          "type": "date"
        },
        "user_list": {
          "type": "join",
          "relations": {
            "axh_user": "axh_active_user"
          }
        }
      }
    }
  }
}
 
2. logstash的配置文件如下
input {
    jdbc {
      type => "axh_active_user"
      jdbc_connection_string => "jdbc:mysql://xxxx:xxxx/bigdata" 
      jdbc_user => "xxxx"    
      jdbc_password => "xxxx"
      jdbc_driver_library => "/home/elk1/software/logstash-6.5.3/logstash-core/lib/jars/mysql-connector-java-8.0.13.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      #last_run_metadata_path => "/app/logstash/logstash_jdbc_last_run"
      #use_column_value => true
      #tracking_column => "ability_id"
      statement => "select uid ,add_time from axh_active_user limit 5000"
      schedule => "*/3 * * * *"
    }
 
    jdbc {
      type => "axh_user"
      jdbc_connection_string => "jdbc:mysql://xxxx:xxxx/bigdata" 
      jdbc_user => "xxxx"    
      jdbc_password => "xxxx"
      jdbc_driver_library =>  "/home/elk1/software/logstash-6.5.3/logstash-core/lib/jars/mysql-connector-java-8.0.13.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      #last_run_metadata_path => "/app/logstash/logstash_jdbc_last_run_02"
      #use_column_value => true
      #tracking_column => "product_id"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      statement => "select id,reg_time,tg_id from axh_user limit 5000"
      schedule => "*/2 * * * *"
    }
}
filter {
        if[type] == "axh_user"{
                  alter {
                    add_field  => {
                          "[user_list][name]"=> "axh_user"
                    }
                  }
        }
        if[type] == "axh_active_user"{
                  alter {
                    add_field  => {
                         "[user_list][name]" => "axh_active_user"
                         "[user_list][parent]" =>  "%{id}"
                    }
                  }
        }
         mutate {
            remove_field => [ "@timestamp","@version"]
          }
}
output{ 
        if [type] == "axh_user" { 
                elasticsearch { 
                        hosts => "xxxx:9200" 
                        index => "testjoin" 
                        document_id => "axh_user_%{id}"
                } 
        }
        if [type]== "axh_active_user"  { 
                        elasticsearch { 
                                hosts => "xxxx:9200" 
                                index => "testjoin" 
                                document_id => "axh_active_user_%{uid}"
                                routing => "axh_user_%{id}"
                                  } 
                                } 
     } 
 
上面的mapping 或 配置文件可能存在问题,请了解这个的大神们给指导指导,非常感谢!
 
已邀请:

laoyang360 - [死磕Elasitcsearch]知识星球地址:http://t.cn/RmwM3N9;微信公众号:铭毅天下; 博客:blog.csdn.net/laoyang360

赞同来自: pengallengao

要回复问题请先登录注册