mysql

mysql

推荐一个同步Mysql数据到Elasticsearch的工具

ElasticsearchMCTW 发表了文章 • 7 个评论 • 250 次浏览 • 3 天前 • 来自相关话题

把Mysql的数据同步到Elasticsearch是个很常见的需求,但在Github里找到的同步工具用起来或多或少都有些别扭。 例如:某记录内容为"aaa|bbb|ccc",将其按|分割成数组同步到es,这样的简单任务都难以实现,再加上配置繁琐,文档语焉不详... 所以我写了个同步工具MysqlsMom:力求用最简单的配置完成复杂的同步任务。目前除了我所在的部门,也有越来越多的互联网公司在生产环境中使用该工具了。 欢迎各位大佬进行试用并提出意见,任何建议、鼓励、批评都受到欢迎。 github: https://github.com/m358807551/mysqlsmom MysqlsMom.jpeg

简介:同步mysql数据到elasticsearch的工具; QQ、微信:358807551

特点

  1. 纯Python编写;
  2. 有全量、增量更新两种模式;
  3. 全量更新只占用少量内存;支持通过sql语句同步数据;
  4. 增量更新自动断点续传;
  5. 取自mysql的数据可经过一系列自定义函数的处理后再同步至elasticsearch;
  6. 能用非常简单的配置完成复杂的同步任务;

环境

  • python2.7;
  • 如需增量同步,需要mysql开启binlog(binlog-format=row)且本地开启redis;

快速开始

全量同步MySql数据到es

  1. clone 项目到本地;

  2. 安装依赖;

    cd mysqlsmom
    pip install -r requirements.txt

    默认支持 elasticsearch-2.4版本,支持其它版本请运行(将5.4换成需要的elasticsearch版本)

    pip install --upgrade elasticsearch==5.4
  3. 编辑 ./config/example_init.py,按注释提示修改配置;

    # coding=utf-8
    
    STREAM = "INIT"
    
    # 修改数据库连接
    CONNECTION = {
       'host': '127.0.0.1',
       'port': 3306,
       'user': 'root',
       'passwd': ''
    }
    
    # 修改elasticsearch节点
    NODES = [{"host": "127.0.0.1", "port": 9200}]
    
    TASKS = [
       {
           "stream": {
               "database": "test_db",  # 在此数据库执行sql语句
               "sql": "select * from person"  # 将该sql语句选中的数据同步到 elasticsearch
           },
           "jobs": [
               {
                   "actions": ["insert", "update"],
                   "pipeline": [
                       {"set_id": {"field": "id"}}  # 默认设置 id字段的值 为elasticsearch中的文档id
                   ],
                   "dest": {
                       "es": {
                           "action": "upsert",
                           "index": "test_index",   # 设置 index
                           "type": "test",          # 设置 type
                           "nodes": NODES
                       }
                   }
               }
           ]
       }
    ]
  4. 运行

    cd mysqlsmom
    python mysqlsmom.py ./config/example_init.py

    等待同步完成即可;

增量同步MySql数据到es

  1. 确保要增量同步的MySql数据库开启binlog,且本地开启redis(为了存储最后一次读到的binlog文件名及读到的位置。未来可能支持本地文件存储该信息。)

  2. 下载项目到本地,且安装好依赖后,编辑 ./config/example_init.py,按注释提示修改配置;

    # coding=utf-8
    
    STREAM = "BINLOG"
    SERVER_ID = 99  # 确保每个用于binlog同步的配置文件的SERVER_ID不同;
    SLAVE_UUID = __name__
    
    # 配置开启binlog权限的MySql连接
    BINLOG_CONNECTION = {
       'host': '127.0.0.1',
       'port': 3306,
       'user': 'root',
       'passwd': ''
    }
    
    # 配置es节点
    NODES = [{"host": "127.0.0.1", "port": 9200}]
    
    TASKS = [
       {
           "stream": {
               "database": "test_db",  # [table]所在的数据库
               "table": "person"  # 监控该表的binlog
           },
           "jobs": [
               {
                   "actions": ["insert", "update"],
                   "pipeline": [
                       {"only_fields": {"fields": ["id", "name", "age"]}},  # 只同步这些字段到es,注释掉该行则同步全部字段的值到es
                       {"set_id": {"field": "id"}}  # 设置es中文档_id的值取自 id(或根据需要更改)字段
                   ],
                   "dest": {
                       "es": {
                           "action": "upsert",
                           "index": "test_index",  # 设置 index
                           "type": "test",         # 设置 type
                           "nodes": NODES
                       }
                   }
               }
           ]
       }
    ]
  3. 运行

    cd mysqlsmom
    python mysqlsmom.py ./config/example_binlog.py

    该进程会一直运行,实时同步新增和更改后的数据到elasticsearch;

    注意:第一次运行该进程时不会同步MySql中已存在的数据,从第二次运行开始,将接着上次同步停止时的位置继续同步;

    同步旧数据请看全量同步MySql数据到es

组织架构

all.png

Pipeline

如果需要从Mysql获取数据再进行特殊处理再同步到elasticsearch,pipeline组件会派上用场。

无论数据来自于全量同步的Sql语句或是通过实时分析binlog。

例如:

  • 只同步某些字段到es

    "pipeline": [
    {"only_fields": {"fields": ["id", "name"]}}, # 只同步 id 和 name字段
      ...
    ]
  • 重命名字段

    "pipeline": [
    {"replace_fields": {"name": ["name1", "name2"]}}, # 将name重命名为name1和name2
      ...
    ]
  • 甚至可以执行跨库数据库查询

    "pipeline": [
    {
        "do_sql": {
            "database": "db2",
            "connection": CONNECTION2,
            "sql": "select company, personid from company_manager where personid = {id}"  # id 的值会自动替换
        }
    }
      ...
    ]

支持编写自定义函数,只需在 row_handlers.py 中加入,之后可在pipeline中配置调用。

row_handlers.py中预定义了一些数据处理函数,但可能需要自定义的情况更多。

常见问题

能否把数据同步到多个es索引?

目前增量同步支持,只需修改配置文件中的[dest]

"dest": [
        {
            "es": {
            "action": "upsert",
            "index": "index1",  # 同步到 es index1.type1
            "type": "type1",
            "nodes": NODES
            }
        },
        {
            "es": {
            "action": "upsert",
            "index": "index2",  # 同时同步到 es index1.type1
            "type": "type2",
            "nodes": NODES
            }
        }
 ]

全量同步很快会支持该功能;

为什么我的增量同步不及时?

  1. 连接本地数据库增量同步不及时

    该情况暂未收到过反馈,如能复现请联系作者。

  2. 连接线上数据库发现增量同步不及时

    2.1 推荐使用内网IP连接数据库。连接线上数据库(如开启在阿里、腾讯服务器上的Mysql)时,推荐使用内网IP地址,因为外网IP会受到带宽等限制导致获取binlog数据速度受限,最终可能造成同步延时。

待改进

  1. 据部分用户反馈,全量同步百万级以上的数据性能不佳。

未完待续

文档近期会大幅度更新完善,任何问题、建议都收到欢迎,请在issues留言,会在24小时内回复;或联系QQ、微信: 358807551;

logstash的input和output的statement => 有个需求怎么设置

Logstashheyddo 回复了问题 • 4 人关注 • 4 个回复 • 166 次浏览 • 2018-08-08 14:12 • 来自相关话题

请教各位大神,我的logstash提取数据后,是无限自循环的数据,怎么办呢?

Logstashlaoyang360 回复了问题 • 2 人关注 • 1 个回复 • 106 次浏览 • 2018-08-06 22:14 • 来自相关话题

我的logstash input jdbc 配置时间格式不一样怎么办

Logstashmedcl 回复了问题 • 3 人关注 • 2 个回复 • 106 次浏览 • 2018-08-06 10:58 • 来自相关话题

logstash-input-jdbc 和 logstash-output-jdbc 中间如何过滤改变一下时间字段数据格式

回复

默认分类ljx95315 发起了问题 • 1 人关注 • 0 个回复 • 86 次浏览 • 2018-08-05 01:25 • 来自相关话题

我的logstash配置文件input和output检索数据表字段和获取数据表字段如何配置??

回复

Logstash匿名用户 发起了问题 • 1 人关注 • 0 个回复 • 47 次浏览 • 2018-08-02 21:52 • 来自相关话题

请教一个logstash简单的获取数据的问题,获取的数据错误

Logstashmedcl 回复了问题 • 2 人关注 • 2 个回复 • 95 次浏览 • 2018-08-02 10:29 • 来自相关话题

应使用哪个beats同步mysql数据到es

回复

Beatsliudamu 发起了问题 • 2 人关注 • 0 个回复 • 183 次浏览 • 2018-07-10 18:04 • 来自相关话题

logstash导入mysql上亿级别数据的效率问题

Logstashhexiaohong 回复了问题 • 8 人关注 • 5 个回复 • 2949 次浏览 • 2018-05-16 16:08 • 来自相关话题

请求指导技术解决方向

Elasticsearchyayg2008 回复了问题 • 3 人关注 • 2 个回复 • 301 次浏览 • 2018-05-10 21:38 • 来自相关话题

mysql协议解析扩展

Beatsggg 发表了文章 • 5 个评论 • 262 次浏览 • 2018-05-08 15:40 • 来自相关话题

elastic/beats项目中支持mysql协议的解析,但实际使用过程中发现不支持预编译和压缩通信协议的解析,所以扩展了预编译SQL和压缩通信协议的支持,目前已稳定运行在生产环境,所有SQL都能完美解析,已提交PR,有相同需求的同学可参考。
elastic/beats项目中支持mysql协议的解析,但实际使用过程中发现不支持预编译和压缩通信协议的解析,所以扩展了预编译SQL和压缩通信协议的支持,目前已稳定运行在生产环境,所有SQL都能完美解析,已提交PR,有相同需求的同学可参考。

各位做过 将 mysql表导入到 kafka 中吗, 然后从kafka 中导入到es 中, 有什么方案吗?

Elasticsearchclean 回复了问题 • 4 人关注 • 4 个回复 • 589 次浏览 • 2018-04-04 10:10 • 来自相关话题

logstash5.1.2 filter插件,使用jdbc访问mysql,报错 Sequel::PoolTimeout

回复

Logstashjuneryang 发起了问题 • 1 人关注 • 0 个回复 • 315 次浏览 • 2018-03-15 12:02 • 来自相关话题

问下各位logstash如何全量同步mysql数据,每天晚上12点更新,每次更新前将之前的数据删掉

Logstashsmiling 回复了问题 • 3 人关注 • 4 个回复 • 1142 次浏览 • 2018-01-22 17:12 • 来自相关话题

logstash向es中导入mysql数据,tinyint字段导入过程中抛出mapper_parsing_exception类型异常

Logstashpiggyci 回复了问题 • 2 人关注 • 2 个回复 • 624 次浏览 • 2017-12-21 10:00 • 来自相关话题

条新动态, 点击查看
有几种方法,看着选吧:
1.在业务系统里做双写
 
2.用elasticsearch-jdbc之类的工具来做全量和增量同步
 
3.用阿里的canal来做数据库binlog->kafka->es的同步,需要开发,而且依赖比较多,小公司选择还是慎重些... 显示全部 »
有几种方法,看着选吧:
1.在业务系统里做双写
 
2.用elasticsearch-jdbc之类的工具来做全量和增量同步
 
3.用阿里的canal来做数据库binlog->kafka->es的同步,需要开发,而且依赖比较多,小公司选择还是慎重些吧

logstash的input和output的statement => 有个需求怎么设置

回复

Logstashheyddo 回复了问题 • 4 人关注 • 4 个回复 • 166 次浏览 • 2018-08-08 14:12 • 来自相关话题

请教各位大神,我的logstash提取数据后,是无限自循环的数据,怎么办呢?

回复

Logstashlaoyang360 回复了问题 • 2 人关注 • 1 个回复 • 106 次浏览 • 2018-08-06 22:14 • 来自相关话题

我的logstash input jdbc 配置时间格式不一样怎么办

回复

Logstashmedcl 回复了问题 • 3 人关注 • 2 个回复 • 106 次浏览 • 2018-08-06 10:58 • 来自相关话题

logstash-input-jdbc 和 logstash-output-jdbc 中间如何过滤改变一下时间字段数据格式

回复

默认分类ljx95315 发起了问题 • 1 人关注 • 0 个回复 • 86 次浏览 • 2018-08-05 01:25 • 来自相关话题

我的logstash配置文件input和output检索数据表字段和获取数据表字段如何配置??

回复

Logstash匿名用户 发起了问题 • 1 人关注 • 0 个回复 • 47 次浏览 • 2018-08-02 21:52 • 来自相关话题

请教一个logstash简单的获取数据的问题,获取的数据错误

回复

Logstashmedcl 回复了问题 • 2 人关注 • 2 个回复 • 95 次浏览 • 2018-08-02 10:29 • 来自相关话题

应使用哪个beats同步mysql数据到es

回复

Beatsliudamu 发起了问题 • 2 人关注 • 0 个回复 • 183 次浏览 • 2018-07-10 18:04 • 来自相关话题

logstash导入mysql上亿级别数据的效率问题

回复

Logstashhexiaohong 回复了问题 • 8 人关注 • 5 个回复 • 2949 次浏览 • 2018-05-16 16:08 • 来自相关话题

请求指导技术解决方向

回复

Elasticsearchyayg2008 回复了问题 • 3 人关注 • 2 个回复 • 301 次浏览 • 2018-05-10 21:38 • 来自相关话题

各位做过 将 mysql表导入到 kafka 中吗, 然后从kafka 中导入到es 中, 有什么方案吗?

回复

Elasticsearchclean 回复了问题 • 4 人关注 • 4 个回复 • 589 次浏览 • 2018-04-04 10:10 • 来自相关话题

logstash5.1.2 filter插件,使用jdbc访问mysql,报错 Sequel::PoolTimeout

回复

Logstashjuneryang 发起了问题 • 1 人关注 • 0 个回复 • 315 次浏览 • 2018-03-15 12:02 • 来自相关话题

问下各位logstash如何全量同步mysql数据,每天晚上12点更新,每次更新前将之前的数据删掉

回复

Logstashsmiling 回复了问题 • 3 人关注 • 4 个回复 • 1142 次浏览 • 2018-01-22 17:12 • 来自相关话题

logstash向es中导入mysql数据,tinyint字段导入过程中抛出mapper_parsing_exception类型异常

回复

Logstashpiggyci 回复了问题 • 2 人关注 • 2 个回复 • 624 次浏览 • 2017-12-21 10:00 • 来自相关话题

elasticsearch-jdbc导入数据时下面这个错误,请问是什么原因呢?

回复

Elasticsearchliuyueyue 回复了问题 • 3 人关注 • 2 个回复 • 3318 次浏览 • 2017-05-24 14:36 • 来自相关话题

Elasticsearch做站内搜索,索引与mysql数据表同步问题

回复

Elasticsearchyqcute 回复了问题 • 7 人关注 • 4 个回复 • 7270 次浏览 • 2016-08-24 23:53 • 来自相关话题

推荐一个同步Mysql数据到Elasticsearch的工具

ElasticsearchMCTW 发表了文章 • 7 个评论 • 250 次浏览 • 3 天前 • 来自相关话题

把Mysql的数据同步到Elasticsearch是个很常见的需求,但在Github里找到的同步工具用起来或多或少都有些别扭。 例如:某记录内容为"aaa|bbb|ccc",将其按|分割成数组同步到es,这样的简单任务都难以实现,再加上配置繁琐,文档语焉不详... 所以我写了个同步工具MysqlsMom:力求用最简单的配置完成复杂的同步任务。目前除了我所在的部门,也有越来越多的互联网公司在生产环境中使用该工具了。 欢迎各位大佬进行试用并提出意见,任何建议、鼓励、批评都受到欢迎。 github: https://github.com/m358807551/mysqlsmom MysqlsMom.jpeg

简介:同步mysql数据到elasticsearch的工具; QQ、微信:358807551

特点

  1. 纯Python编写;
  2. 有全量、增量更新两种模式;
  3. 全量更新只占用少量内存;支持通过sql语句同步数据;
  4. 增量更新自动断点续传;
  5. 取自mysql的数据可经过一系列自定义函数的处理后再同步至elasticsearch;
  6. 能用非常简单的配置完成复杂的同步任务;

环境

  • python2.7;
  • 如需增量同步,需要mysql开启binlog(binlog-format=row)且本地开启redis;

快速开始

全量同步MySql数据到es

  1. clone 项目到本地;

  2. 安装依赖;

    cd mysqlsmom
    pip install -r requirements.txt

    默认支持 elasticsearch-2.4版本,支持其它版本请运行(将5.4换成需要的elasticsearch版本)

    pip install --upgrade elasticsearch==5.4
  3. 编辑 ./config/example_init.py,按注释提示修改配置;

    # coding=utf-8
    
    STREAM = "INIT"
    
    # 修改数据库连接
    CONNECTION = {
       'host': '127.0.0.1',
       'port': 3306,
       'user': 'root',
       'passwd': ''
    }
    
    # 修改elasticsearch节点
    NODES = [{"host": "127.0.0.1", "port": 9200}]
    
    TASKS = [
       {
           "stream": {
               "database": "test_db",  # 在此数据库执行sql语句
               "sql": "select * from person"  # 将该sql语句选中的数据同步到 elasticsearch
           },
           "jobs": [
               {
                   "actions": ["insert", "update"],
                   "pipeline": [
                       {"set_id": {"field": "id"}}  # 默认设置 id字段的值 为elasticsearch中的文档id
                   ],
                   "dest": {
                       "es": {
                           "action": "upsert",
                           "index": "test_index",   # 设置 index
                           "type": "test",          # 设置 type
                           "nodes": NODES
                       }
                   }
               }
           ]
       }
    ]
  4. 运行

    cd mysqlsmom
    python mysqlsmom.py ./config/example_init.py

    等待同步完成即可;

增量同步MySql数据到es

  1. 确保要增量同步的MySql数据库开启binlog,且本地开启redis(为了存储最后一次读到的binlog文件名及读到的位置。未来可能支持本地文件存储该信息。)

  2. 下载项目到本地,且安装好依赖后,编辑 ./config/example_init.py,按注释提示修改配置;

    # coding=utf-8
    
    STREAM = "BINLOG"
    SERVER_ID = 99  # 确保每个用于binlog同步的配置文件的SERVER_ID不同;
    SLAVE_UUID = __name__
    
    # 配置开启binlog权限的MySql连接
    BINLOG_CONNECTION = {
       'host': '127.0.0.1',
       'port': 3306,
       'user': 'root',
       'passwd': ''
    }
    
    # 配置es节点
    NODES = [{"host": "127.0.0.1", "port": 9200}]
    
    TASKS = [
       {
           "stream": {
               "database": "test_db",  # [table]所在的数据库
               "table": "person"  # 监控该表的binlog
           },
           "jobs": [
               {
                   "actions": ["insert", "update"],
                   "pipeline": [
                       {"only_fields": {"fields": ["id", "name", "age"]}},  # 只同步这些字段到es,注释掉该行则同步全部字段的值到es
                       {"set_id": {"field": "id"}}  # 设置es中文档_id的值取自 id(或根据需要更改)字段
                   ],
                   "dest": {
                       "es": {
                           "action": "upsert",
                           "index": "test_index",  # 设置 index
                           "type": "test",         # 设置 type
                           "nodes": NODES
                       }
                   }
               }
           ]
       }
    ]
  3. 运行

    cd mysqlsmom
    python mysqlsmom.py ./config/example_binlog.py

    该进程会一直运行,实时同步新增和更改后的数据到elasticsearch;

    注意:第一次运行该进程时不会同步MySql中已存在的数据,从第二次运行开始,将接着上次同步停止时的位置继续同步;

    同步旧数据请看全量同步MySql数据到es

组织架构

all.png

Pipeline

如果需要从Mysql获取数据再进行特殊处理再同步到elasticsearch,pipeline组件会派上用场。

无论数据来自于全量同步的Sql语句或是通过实时分析binlog。

例如:

  • 只同步某些字段到es

    "pipeline": [
    {"only_fields": {"fields": ["id", "name"]}}, # 只同步 id 和 name字段
      ...
    ]
  • 重命名字段

    "pipeline": [
    {"replace_fields": {"name": ["name1", "name2"]}}, # 将name重命名为name1和name2
      ...
    ]
  • 甚至可以执行跨库数据库查询

    "pipeline": [
    {
        "do_sql": {
            "database": "db2",
            "connection": CONNECTION2,
            "sql": "select company, personid from company_manager where personid = {id}"  # id 的值会自动替换
        }
    }
      ...
    ]

支持编写自定义函数,只需在 row_handlers.py 中加入,之后可在pipeline中配置调用。

row_handlers.py中预定义了一些数据处理函数,但可能需要自定义的情况更多。

常见问题

能否把数据同步到多个es索引?

目前增量同步支持,只需修改配置文件中的[dest]

"dest": [
        {
            "es": {
            "action": "upsert",
            "index": "index1",  # 同步到 es index1.type1
            "type": "type1",
            "nodes": NODES
            }
        },
        {
            "es": {
            "action": "upsert",
            "index": "index2",  # 同时同步到 es index1.type1
            "type": "type2",
            "nodes": NODES
            }
        }
 ]

全量同步很快会支持该功能;

为什么我的增量同步不及时?

  1. 连接本地数据库增量同步不及时

    该情况暂未收到过反馈,如能复现请联系作者。

  2. 连接线上数据库发现增量同步不及时

    2.1 推荐使用内网IP连接数据库。连接线上数据库(如开启在阿里、腾讯服务器上的Mysql)时,推荐使用内网IP地址,因为外网IP会受到带宽等限制导致获取binlog数据速度受限,最终可能造成同步延时。

待改进

  1. 据部分用户反馈,全量同步百万级以上的数据性能不佳。

未完待续

文档近期会大幅度更新完善,任何问题、建议都收到欢迎,请在issues留言,会在24小时内回复;或联系QQ、微信: 358807551;

mysql协议解析扩展

Beatsggg 发表了文章 • 5 个评论 • 262 次浏览 • 2018-05-08 15:40 • 来自相关话题

elastic/beats项目中支持mysql协议的解析,但实际使用过程中发现不支持预编译和压缩通信协议的解析,所以扩展了预编译SQL和压缩通信协议的支持,目前已稳定运行在生产环境,所有SQL都能完美解析,已提交PR,有相同需求的同学可参考。
elastic/beats项目中支持mysql协议的解析,但实际使用过程中发现不支持预编译和压缩通信协议的解析,所以扩展了预编译SQL和压缩通信协议的支持,目前已稳定运行在生产环境,所有SQL都能完美解析,已提交PR,有相同需求的同学可参考。

一个把数据从MySQL同步到Elasticsearch的工具

Elasticsearchwindfarer 发表了文章 • 2 个评论 • 6183 次浏览 • 2016-01-13 16:34 • 来自相关话题

https://github.com/zhongbiaode ... -sync 这个工具用python实现,主要使用了mysqldump输出xml进行初次同步,以及binlog进行增量同步,欢迎试用以及提出修改意见。 最近刚刚更新了中文文档。
https://github.com/zhongbiaode ... -sync 这个工具用python实现,主要使用了mysqldump输出xml进行初次同步,以及binlog进行增量同步,欢迎试用以及提出修改意见。 最近刚刚更新了中文文档。