三人行必有我师

logstash将mysql 数据写入 kafka 中, 然后通过Python脚本, 消费kafka 写入到es中, 现在进入 es 中的数据条数, 比 mysql中的多, 谁遇到过?

Logstash | 作者 clean | 发布于2018年04月11日 | 阅读数:4187

 logstash将mysql 数据写入 kafka 中, 然后通过Python脚本, 消费kafka 写入到es中, 现在进入 es 中的数据条数, 比 mysql中的多, 谁遇到过? 
 
 
是kafka 重复消费吗?
 
数据将近多了一倍。 
 
还是logstash写的sql语句就会产生重复?
 
sql 语句如下: 
 
select * from t_car_drive_2016 a where a.createdtime > :sql_last_value
 
 
logstatsh 配置文件如下:
 
 
input {
    stdin {
    }

    jdbc {
      # mysql 数据库链接,test为数据库名
      jdbc_connection_string => "jdbc:mysql://zzzzzzzzz8:3306/vw"
      # 用户名和密码
      jdbc_user => "rdxxx"
      jdbc_password => "Ptim4xxx!Mxxxxxx"
      # 驱动
      jdbc_driver_library => "/opt/logstash6/mysql/mysql-connector-java-5.1.30-bin.jar"
      # 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "10000"

      #使用其它字段追踪,而不是用时间
      use_column_value => true
      ##追踪的字段
      tracking_column => createdtime
      record_last_run => true
      ##上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
      last_run_metadata_path => "/opt/logstash6/inputtxt/y.txt"

          # 执行的sql 文件路径+名称
      #statement_filepath => "/opt/logstash6/inputtxt/test.sql"
          statement => "select * from t_car_drive_2016 a where a.createdtime > :sql_last_value"
      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
          schedule => "* * * * *"
      # 索引类型
          type => "jdbc"
        codec=>json
    }
}


output{
        if [type] == 'jdbc' {

                kafka{
                        bootstrap_servers => "xxxxxxxxxxxxx:9092"
                        topic_id => "jdbc_demo_2016_topic"
                        #compression_type=>"gzip"
                        codec => json { charset => "UTF-8" }

        }

                 stdout {
                        codec => rubydebug
                }
   }
}
~
~
 
这种问题应该如何排查
已邀请:

strglee

赞同来自:

消费kafka写入到es时,es文档的_id你定义的是什么规则?

jlhde123

赞同来自:

用mysql的做为ES的ID会覆盖的,通过index动作,然后就不会多了

ESWorker

赞同来自:

主键一定要定义好,在ES里主键一般就是_id

要回复问题请先登录注册