logstash将mysql 数据写入 kafka 中, 然后通过Python脚本, 消费kafka 写入到es中, 现在进入 es 中的数据条数, 比 mysql中的多, 谁遇到过?
Logstash | 作者 clean | 发布于2018年04月11日 | 阅读数:4248
logstash将mysql 数据写入 kafka 中, 然后通过Python脚本, 消费kafka 写入到es中, 现在进入 es 中的数据条数, 比 mysql中的多, 谁遇到过?
是kafka 重复消费吗?
数据将近多了一倍。
还是logstash写的sql语句就会产生重复?
sql 语句如下:
select * from t_car_drive_2016 a where a.createdtime > :sql_last_value
logstatsh 配置文件如下:
input {
stdin {
}
jdbc {
# mysql 数据库链接,test为数据库名
jdbc_connection_string => "jdbc:mysql://zzzzzzzzz8:3306/vw"
# 用户名和密码
jdbc_user => "rdxxx"
jdbc_password => "Ptim4xxx!Mxxxxxx"
# 驱动
jdbc_driver_library => "/opt/logstash6/mysql/mysql-connector-java-5.1.30-bin.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "10000"
#使用其它字段追踪,而不是用时间
use_column_value => true
##追踪的字段
tracking_column => createdtime
record_last_run => true
##上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
last_run_metadata_path => "/opt/logstash6/inputtxt/y.txt"
# 执行的sql 文件路径+名称
#statement_filepath => "/opt/logstash6/inputtxt/test.sql"
statement => "select * from t_car_drive_2016 a where a.createdtime > :sql_last_value"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
# 索引类型
type => "jdbc"
codec=>json
}
}
output{
if [type] == 'jdbc' {
kafka{
bootstrap_servers => "xxxxxxxxxxxxx:9092"
topic_id => "jdbc_demo_2016_topic"
#compression_type=>"gzip"
codec => json { charset => "UTF-8" }
}
stdout {
codec => rubydebug
}
}
}
~
~
这种问题应该如何排查
是kafka 重复消费吗?
数据将近多了一倍。
还是logstash写的sql语句就会产生重复?
sql 语句如下:
select * from t_car_drive_2016 a where a.createdtime > :sql_last_value
logstatsh 配置文件如下:
input {
stdin {
}
jdbc {
# mysql 数据库链接,test为数据库名
jdbc_connection_string => "jdbc:mysql://zzzzzzzzz8:3306/vw"
# 用户名和密码
jdbc_user => "rdxxx"
jdbc_password => "Ptim4xxx!Mxxxxxx"
# 驱动
jdbc_driver_library => "/opt/logstash6/mysql/mysql-connector-java-5.1.30-bin.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "10000"
#使用其它字段追踪,而不是用时间
use_column_value => true
##追踪的字段
tracking_column => createdtime
record_last_run => true
##上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
last_run_metadata_path => "/opt/logstash6/inputtxt/y.txt"
# 执行的sql 文件路径+名称
#statement_filepath => "/opt/logstash6/inputtxt/test.sql"
statement => "select * from t_car_drive_2016 a where a.createdtime > :sql_last_value"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
# 索引类型
type => "jdbc"
codec=>json
}
}
output{
if [type] == 'jdbc' {
kafka{
bootstrap_servers => "xxxxxxxxxxxxx:9092"
topic_id => "jdbc_demo_2016_topic"
#compression_type=>"gzip"
codec => json { charset => "UTF-8" }
}
stdout {
codec => rubydebug
}
}
}
~
~
这种问题应该如何排查
3 个回复
strglee
赞同来自:
jlhde123
赞同来自:
ESWorker
赞同来自: