Logstach 将Mysql表导入 kafka,mysql表是 YYYYMMdd格式, 且每年增加一个表, 如何在配置文件中指定表名,logstash 自动导入?
Logstash | 作者 clean | 发布于2018年04月10日 | 阅读数:4545
现在 logstash 配置文件如下:
# 同步mysql数据到kafka
input {
stdin {
}
jdbc {
# mysql 数据库链接,test为数据库名
jdbc_connection_string => "jdbc:mysql://10.1.11.120:3306/test"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "root"
# 驱动
jdbc_driver_library => "/opt/logstash6/mysql/mysql-connector-java-5.1.30-bin.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#使用其它字段追踪,而不是用时间
use_column_value => true
##追踪的字段
tracking_column => time
record_last_run => true
##上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
last_run_metadata_path => "/opt/logstash6/inputtxt/x.txt"
# 执行的sql 文件路径+名称
statement_filepath => "/opt/logstash6/inputtxt/test.sql"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
# 索引类型
type => "jdbc"
codec=>json
}
jdbc {
jdbc_connection_string => "jdbc:mysql://10.1.11.120:3306/test"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "/opt/logstash6/mysql/mysql-connector-java-5.1.30-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement => "SELECT * from b"
schedule => "* * * * *"
type => "jdbc_demo"
codec=>json
}
}
#filter {
# json {
# source => "message"
# remove_field => ["message"]
# }
#}
output{
if [type] == 'jdbc_demo' {
kafka{
# codec => plain {
# format => "%{message}"
#}
bootstrap_servers => "10.1.11.120:9092"
topic_id => "jdbc_demo_topic"
#compression_type=>"gzip"
codec => json { charset => "UTF-8" }
}
stdout {
codec => rubydebug
}
}
if [type] == 'jdbc' {
kafka{
# codec => plain {
# format => "%{message}"
#}
bootstrap_servers => "10.1.11.120:9092"
topic_id => "jdbc_topic"
#compression_type=>"gzip"
codec => json { charset => "UTF-8" }
}
stdout {
codec => rubydebug
}
}
}
sql 文件如下, 直接指定了表名, 现在需要动态的 修改表名, 形如 : YYYYMMdd , 如 20171111:
select * from a where a.time > :sql_last_value
现在需要将 表名a, 替换成动态的时间作为表名, 形式为 YYYYMMdd 格式
# 同步mysql数据到kafka
input {
stdin {
}
jdbc {
# mysql 数据库链接,test为数据库名
jdbc_connection_string => "jdbc:mysql://10.1.11.120:3306/test"
# 用户名和密码
jdbc_user => "root"
jdbc_password => "root"
# 驱动
jdbc_driver_library => "/opt/logstash6/mysql/mysql-connector-java-5.1.30-bin.jar"
# 驱动类名
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
#使用其它字段追踪,而不是用时间
use_column_value => true
##追踪的字段
tracking_column => time
record_last_run => true
##上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
last_run_metadata_path => "/opt/logstash6/inputtxt/x.txt"
# 执行的sql 文件路径+名称
statement_filepath => "/opt/logstash6/inputtxt/test.sql"
# 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
# 索引类型
type => "jdbc"
codec=>json
}
jdbc {
jdbc_connection_string => "jdbc:mysql://10.1.11.120:3306/test"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "/opt/logstash6/mysql/mysql-connector-java-5.1.30-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement => "SELECT * from b"
schedule => "* * * * *"
type => "jdbc_demo"
codec=>json
}
}
#filter {
# json {
# source => "message"
# remove_field => ["message"]
# }
#}
output{
if [type] == 'jdbc_demo' {
kafka{
# codec => plain {
# format => "%{message}"
#}
bootstrap_servers => "10.1.11.120:9092"
topic_id => "jdbc_demo_topic"
#compression_type=>"gzip"
codec => json { charset => "UTF-8" }
}
stdout {
codec => rubydebug
}
}
if [type] == 'jdbc' {
kafka{
# codec => plain {
# format => "%{message}"
#}
bootstrap_servers => "10.1.11.120:9092"
topic_id => "jdbc_topic"
#compression_type=>"gzip"
codec => json { charset => "UTF-8" }
}
stdout {
codec => rubydebug
}
}
}
sql 文件如下, 直接指定了表名, 现在需要动态的 修改表名, 形如 : YYYYMMdd , 如 20171111:
select * from a where a.time > :sql_last_value
现在需要将 表名a, 替换成动态的时间作为表名, 形式为 YYYYMMdd 格式
2 个回复
clean
赞同来自:
select * from "${test_table}" as a where a.time > :sql_last_value
直接将 sql语句写入 logstash 配置文件中,然后在linux 下执行
export test_table=a
codxiao - 90后
赞同来自: