找到问题的解决办法了么?

各位做过 将 mysql表导入到 kafka 中吗, 然后从kafka 中导入到es 中, 有什么方案吗?

Elasticsearch | 作者 clean | 发布于2018年04月02日 | 阅读数:5928

    
    各位做过 将 mysql表导入到 kafka 中吗, 然后从kafka 中导入到es 中, 有什么方案吗?
 
mysql数据库做层层防护, 并且与 es 不在一个网段内, 需要通过kafka 进行中转
已邀请:

strglee

赞同来自: clean

用binlog或者用logstsh的logstash-input-jdbc写入到kafka

anthony2018 - 90后 程序猿

赞同来自: clean

logstash从kafka里拉数据存到es 很简单 搜一下就有了

laoyang360 - 《一本书讲透Elasticsearch》作者,Elastic认证工程师 [死磕Elasitcsearch]知识星球地址:http://t.cn/RmwM3N9;微信公众号:铭毅天下; 博客:https://elastic.blog.csdn.net

赞同来自: clean

kafka到Es的除了借助logstash_input_kafka之外,还可以使用kafka_connector 实现。具体参考:https://blog.csdn.net/laoyang3 ... 68806

clean

赞同来自:

# 同步mysql数据到kafka, logstash 配置文件
input {
    stdin {
    }
    jdbc {
      # mysql 数据库链接,test为数据库名
      jdbc_connection_string => "jdbc:mysql://10.1.11.120:3306/test"
      # 用户名和密码
      jdbc_user => "root"
      jdbc_password => "root"
      # 驱动
      jdbc_driver_library => "/opt/logstash6/mysql/mysql-connector-java-5.1.30-bin.jar"
      # 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"

      #使用其它字段追踪,而不是用时间
      use_column_value => true
      ##追踪的字段
      tracking_column => time
      record_last_run => true
      ##上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
      last_run_metadata_path => "/opt/logstash6/inputtxt/x.txt"

          # 执行的sql 文件路径+名称
      statement_filepath => "/opt/logstash6/inputtxt/test.sql"
      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
          schedule => "* * * * *"
      # 索引类型
          type => "jdbc"
        codec=>json
    }
    jdbc {
      jdbc_connection_string => "jdbc:mysql://10.1.11.120:3306/test"
      jdbc_user => "root"
      jdbc_password => "root"
      jdbc_driver_library => "/opt/logstash6/mysql/mysql-connector-java-5.1.30-bin.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      statement => "SELECT * from b"
      schedule => "* * * * *"
      type => "jdbc_demo"
         codec=>json
    }
}


#filter {
#    json {
#        source => "message"
#        remove_field => ["message"]
#    }
#}




output{
        if [type] == 'jdbc_demo' {

                kafka{

                #  codec => plain {
                #    format => "%{message}"
                #}
                        bootstrap_servers => "10.1.11.120:9092"
                        topic_id => "jdbc_demo_topic"
                        #compression_type=>"gzip"
                        codec => json { charset => "UTF-8" }

        }

                 stdout {
                        codec => rubydebug
                }
   }

       if [type] == 'jdbc' {
                kafka{


                 # codec => plain {
                 #   format => "%{message}"
                #}
                        bootstrap_servers => "10.1.11.120:9092"
                        topic_id => "jdbc_topic"
                        #compression_type=>"gzip"
                        codec => json { charset => "UTF-8" }

        }

         stdout {
                        codec => rubydebug
                }
   }
}
       

要回复问题请先登录注册