Hello,World

Logstach 将Mysql表导入 kafka,mysql表是 YYYYMMdd格式, 且每年增加一个表, 如何在配置文件中指定表名,logstash 自动导入?

Logstash | 作者 clean | 发布于2018年04月10日 | 阅读数:4545

现在 logstash 配置文件如下: 
 
 
 
# 同步mysql数据到kafka
input {
    stdin {
    }
    jdbc {
      # mysql 数据库链接,test为数据库名
      jdbc_connection_string => "jdbc:mysql://10.1.11.120:3306/test"
      # 用户名和密码
      jdbc_user => "root"
      jdbc_password => "root"
      # 驱动
      jdbc_driver_library => "/opt/logstash6/mysql/mysql-connector-java-5.1.30-bin.jar"
      # 驱动类名
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"

      #使用其它字段追踪,而不是用时间
      use_column_value => true
      ##追踪的字段
      tracking_column => time 
      record_last_run => true
      ##上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值
      last_run_metadata_path => "/opt/logstash6/inputtxt/x.txt" 

      # 执行的sql 文件路径+名称
      statement_filepath => "/opt/logstash6/inputtxt/test.sql"
      # 设置监听间隔  各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
      schedule => "* * * * *"
      # 索引类型
      type => "jdbc"
    codec=>json
    }
    jdbc {
      jdbc_connection_string => "jdbc:mysql://10.1.11.120:3306/test"
      jdbc_user => "root"
      jdbc_password => "root" 
      jdbc_driver_library => "/opt/logstash6/mysql/mysql-connector-java-5.1.30-bin.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      statement => "SELECT * from b"
      schedule => "* * * * *"
      type => "jdbc_demo"
     codec=>json
    }
}


#filter {
#    json {
#        source => "message"
#        remove_field => ["message"]
#    }
#}




output{
    if [type] == 'jdbc_demo' {

        kafka{
    
                #  codec => plain {
                #    format => "%{message}"
                #}
            bootstrap_servers => "10.1.11.120:9092"
            topic_id => "jdbc_demo_topic"
                        #compression_type=>"gzip"
            codec => json { charset => "UTF-8" }
            
    }

         stdout {
                codec => rubydebug
            }
   }

       if [type] == 'jdbc' {
                kafka{

            
                 # codec => plain {
                 #   format => "%{message}"
                #}
                        bootstrap_servers => "10.1.11.120:9092"
                        topic_id => "jdbc_topic"
                        #compression_type=>"gzip"
            codec => json { charset => "UTF-8" }

        }

     stdout {
                codec => rubydebug
            }
   }
}
 
 
 
 
 
sql 文件如下, 直接指定了表名, 现在需要动态的 修改表名, 形如 : YYYYMMdd , 如 20171111:
 
select * from a where a.time > :sql_last_value
 
现在需要将 表名a, 替换成动态的时间作为表名, 形式为  YYYYMMdd 格式
 
已邀请:

clean

赞同来自:

已经解决: 
 
 
select * from "${test_table}" as a  where a.time > :sql_last_value

 
 
直接将 sql语句写入 logstash 配置文件中,然后在linux 下执行
 
export test_table=a

codxiao - 90后

赞同来自:

楼主你好,我们有个项目时分析用户上传的json文档。用户上传后会新建相应的MySQL表。现在想如果新建表就自动同步到es中,这个需求有想法吗

要回复问题请先登录注册