三人行必有我师
mysql

mysql

使用 Logstash 同步 MySQL 到 Easysearch

Easysearchyangmf2040 发表了文章 • 0 个评论 • 1382 次浏览 • 2023-08-17 00:49 • 来自相关话题

从 MySQL 同步数据到 ES 有多种方案,这次我们使用 ELK 技术栈中的 Logstash 来将数据从 MySQL 同步到 Easysearch 。

方案前提

  1. MySQL 表记录必须有主键,比如 id 字段。通过该字段,可将 Easysearch 索引数据与 MySQL 表数据形成一对一映射关系,支持修改。
  2. MySQL 表记录必须有时间字段,以支持增量同步。

如果上述条件具备,便可使用 logstash 定期同步新写入或修改后的数据到 Easysearch 中。

方案演示

版本信息

MySQL: 5.7
Logstash: 7.10.2
Easysearch: 1.5.0

MySQL 设置

创建演示用的表。

CREATE DATABASE es_db;
USE es_db;
DROP TABLE IF EXISTS es_table;
CREATE TABLE es_table (
  id BIGINT(20) UNSIGNED NOT NULL,
  PRIMARY KEY (id),
  UNIQUE KEY unique_id (id),
  client_name VARCHAR(32) NOT NULL,
  modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

说明

  • id 字段: 主键、唯一键,将作为 Easysearch 索引中的 doc id 字段。
  • modification_time 字段: 表记录的插入和修改都会记录在此。
  • client_name: 代表用户数据。
  • insertion_time: 可省略,用来记录数据插入到 MySQL 数据的时间。

    插入数据

    INSERT INTO es_table (id, client_name) VALUES (1, 'test 1');
    INSERT INTO es_table (id, client_name) VALUES (2, 'test 2');
    INSERT INTO es_table (id, client_name) VALUES (3, 'test 3');

    Logstash

    配置文件

    input {
    jdbc {
    jdbc_driver_library => "./mysql-connector-j-8.1.0/mysql-connector-j-8.1.0.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.56.3:3306/es_db"
    jdbc_user => "root"
    jdbc_password => "password"
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    last_run_metadata_path => "./.mysql-es_table-sql_last_value.yml"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
    }
    jdbc {
    jdbc_driver_library => "./mysql-connector-j-8.1.0/mysql-connector-j-8.1.0.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.56.3:3306/es_db"
    jdbc_user => "root"
    jdbc_password => "password"
    schedule => "*/5 * * * * *"
    statement => "SELECT count(*) AS count,'es_table' AS table_name  from es_table"
    }
    }
    filter {
    if ![table_name] {
        mutate {
           copy => { "id" => "[@metadata][_id]"}
           remove_field => ["@version", "unix_ts_in_secs","@timestamp"]
           add_field => { "[@metadata][target_index]" => "mysql_es_table" } }
      } else {
        mutate { 
           add_field => { "[@metadata][target_index]" => "table_counts" } 
           remove_field => ["@version"]
               }
        uuid {
           target  => "[@metadata][_id]"
           overwrite => true
             }
      }
    }
    output {
    elasticsearch {
      hosts => ["https://localhost:9200"]
      user => "admin"
      password => "f0c6fc61fe5f7b084c00"
      ssl_certificate_verification => "false"
      index => "%{[@metadata][target_index]}"
      manage_template => "false"
      document_id => "%{[@metadata][_id]}"
    }
    }
  • 每 5 秒钟同步一次 es_table 表的数据到 mysql_sync_idx 索引。
  • 每 5 秒统计一次 es_table 表的记录条数到 table_counts 索引,用于监控。

    启动 logstash

    ./bin/logstash -f sync_es_table.conf

    查看同步结果, 3 条数据都已同步到索引。 Mysql 数据库新增记录

    INSERT INTO es_table (id, client_name) VALUES (4, 'test 4');

    Easysearch 确认新增

Mysql 数据库修改记录

UPDATE es_table SET client_name = 'test 0001' WHERE id=1;

Easysearch 确认修改

删除数据

Logstash 无法直接删除操作到 ES ,有两个方案:

  1. 在表中增加 is_deleted 字段,实现软删除,可达到同步的目的。查询过滤掉 is_deleted : true 的记录,后续通过脚本等方式定期清理 is_deleted : true 的数据。
  2. 执行删除操作的程序,删除完 MySQL 中的记录后,继续删除 Easysearch 中的记录。

同步监控

数据已经在 ES 中了,我们可利用 INFINI Console 的数据看板来监控数据是否同步,展示表记录数、索引记录数及其变化。

类比mysql查询,适合新手学习Elasticsearch的DSL查询语句

Elasticsearch 发表了文章 • 0 个评论 • 7691 次浏览 • 2020-04-29 10:44 • 来自相关话题

Mysql查询与Elasticsearch的DSL查询语句对照

作者:

小森同学,互联网公司搜索开发工程师。

前言

作为新入门的后端开发人员,一般对Mysql,SqlServer这类的关系型数据库或多或少都有了解。当入门Elasticsearch时,发现其DSL语句与关系型数据库的查询完全不一样,不再是那熟悉的语法,顿感门槛有点高。为了方便熟悉关系型数据库查询的同学,更加容易,快捷的理解并掌握DSL基础语法,本文将进行Mysql与DSL语句进行类比。

一、Mysql数据库与Elasticsearch的类比

关系型数据库(比如Mysql) 非关系型数据库(Elasticsearch)
数据库 Database 索引 Index
表 Table 类型 Type
数据行 Row 文档 Document
数据列 Column 字段 Field
约束 Schema 映射 Mapping

二、Mysql查询语句与DSL查询类比

Mysql查询语句与Elasticsearch的DSL查询类比,主要通过mysql库中的search_lexicon表和es中的search_lexicon_v1索引进行比较。

2.1 search_lexicon 表结构

CREATE TABLE `search_lexicon` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `keyword` varchar(50) NOT NULL DEFAULT '' COMMENT '关键词',
  `keyword_crc32` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '关键词校验',
  `search_type` tinyint(1) NOT NULL DEFAULT '0' COMMENT '类型',
  `consumer_id` varchar(50) NOT NULL DEFAULT '' COMMENT '消费者ID',
  `num` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '文档数',
  `views` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '搜索次数',
  `state` tinyint(1) unsigned NOT NULL DEFAULT '1' COMMENT '状态 0 关闭 1 开启',
  `is_del` tinyint(1) unsigned NOT NULL DEFAULT '0' COMMENT '是否删除 0 正常 1 删除',
  `createtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间',
  `updatetime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据最后更新时间',
  PRIMARY KEY (`id`),
  KEY `idx_search_lexicon_views` (`views`),
  KEY `idx_search_lexicon_updatetime` (`updatetime`) USING BTREE,
  KEY `idx_search_lexicon_keyword_type` (`keyword_crc32`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='搜索词库';

2.2 search_lexicon_v1 索引结构

{
  "search_lexicon_v1" : {
    "mappings" : {
      "_doc" : {
        "properties" : {
          "@timestamp" : {
            "type" : "date"
          },
          "@version" : {
            "type" : "long"
          },
          "consumer_id" : {
            "type" : "keyword"
          },
          "createtime" : {
            "type" : "date",
            "format" : "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"
          },
          "id" : {
            "type" : "integer"
          },
          "is_del" : {
            "type" : "integer"
          },
          "keyword" : {
            "type" : "text",
            "fields" : {
              "standard" : {
                "type" : "text",
                "analyzer" : "by_standard_no_synonym"
              }
            },
            "analyzer" : "by_max_word_pinyin_no_synonym"
          },
          "num" : {
            "type" : "long"
          },
          "search_type" : {
            "type" : "integer"
          },
          "state" : {
            "type" : "integer"
          },
          "updatetime" : {
            "type" : "date",
            "format" : "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"
          },
          "views" : {
            "type" : "long"
          }
        }
      }
    }
  }
}

2.3 查询语句对照

注意:dsl查询,每次默认展示10(size默认为10)条

以下的查询条件,是为了写查询而构造的,无任何实质性的意义,仅供mysql查询与dsl查询对比用

布尔查询支持的子查询类型共有四种,分别是:must,should,must_not和filter:

查询字句 说明 类型
must 文档必须符合must中所有的条件,会影响相关性得分 数组
should 文档应该匹配should子句查询的一个或多个 数组
must_not 文档必须不符合must_not 中的所有条件 数组
filter 过滤器,文档必须匹配该过滤条件,跟must子句的唯一区别是,filter不影响查询的score ,会缓存 字典

A、查询所有数据

mysql

SELECT * FROM search_lexicon

dsl

GET search_lexicon/_search
{

}
或
GET search_lexicon/_search
{
  "query": {
    "match_all": {}
  }
}

B、 查询一个条件且条件只有一个值(consumer_id=demo)的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id='demo'

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": {
        "term": {
          "consumer_id": "demo"
        }
      }
    }
  }
}
或
GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": "demo"
          }
        }
      ]
    }
  }
}

两者的区别在于前一个filter是一个对象,filter中只能放一个条件,后者filter是一个数组,里面可以放多个对象(多个查询条件),后续都将按照第二种方式查询

C、 查询一个条件且条件有多个值(consumer_id的值为demo,demo2)的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id in('demo','demo2')

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "terms": {
            "consumer_id": [
              "demo",
              "demo2"
            ]
          }
        }
      ]
    }
  }
}

D、 查询consumer_id=demo 且 state=1的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id ='demo' and state=1

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": "demo"
          }
        },
         {
          "term": {
            "state": 1
          }
        }
      ]
    }
  }
}

E、 查询consumer_id=demo , state=1 且 is_del<>1的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id ='demo' and state=1 and is_del <>1

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": "demo"
          }
        },
         {
          "term": {
            "state": 1
          }
        }
      ],
      "must_not": [
        {
          "term": {
            "is_del": {
              "value": 1
            }
          }
        }
      ]
    }
  }
}

F、查询Sconsumer_id ='demo' or (state=1 and is_del =0)的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id ='demo' or (state=1 and is_del =0)

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        },
        {
          "bool": {
            "filter": [
              {
                "term": {
                  "state": 1
                }
              },
              {
                "term": {
                  "is_del": 0
                }
              }
            ]
          }
        }
      ]
    }
  }
}

G、在F的基础上,查询指定字段

mysql

SELECT id,keyword,consumer_id,num,views,state,is_del FROM search_lexicon WHERE consumer_id ='demo' or (state=1 and is_del =0)

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        },
        {
          "bool": {
            "filter": [
              {
                "term": {
                  "state": 1
                }
              },
              {
                "term": {
                  "is_del": 0
                }
              }
            ]
          }
        }
      ]
    }
  },
  "_source": {
    "includes": [
      "id",
      "keyword",
      "num",
      "is_del",
      "state",
      "consumer_id",
      "views"
    ]
  }
}

H、在G的基础上,增加排序

mysql

SELECT id,keyword,consumer_id,num,views,state,is_del FROM search_lexicon WHERE consumer_id ='demo' or (state=1 and is_del =0) ORDER BY state DESC,id DESC

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        },
        {
          "bool": {
            "filter": [
              {
                "term": {
                  "state": 1
                }
              },
              {
                "term": {
                  "is_del": 0
                }
              }
            ]
          }
        }
      ]
    }
  },
  "_source": {
    "includes": [
      "id",
      "keyword",
      "num",
      "is_del",
      "state",
      "consumer_id",
      "views"
    ]
  },
  "sort": [
    {
      "state": {
        "order": "desc"
      }
    },
    {
      "id": {
        "order": "desc"
      }
    }
  ]
}

I、在H的基础上,添加分页

mysql

SELECT id,keyword,consumer_id,num,views,state,is_del FROM search_lexicon WHERE consumer_id ='demo' or (state=1 and is_del =0) ORDER BY state DESC,id DESC LIMIT 0,20

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        },
        {
          "bool": {
            "filter": [
              {
                "term": {
                  "state": 1
                }
              },
              {
                "term": {
                  "is_del": 0
                }
              }
            ]
          }
        }
      ]
    }
  },
  "_source": {
    "includes": [
      "id",
      "keyword",
      "num",
      "is_del",
      "state",
      "consumer_id",
      "views"
    ]
  },
  "sort": [
    {
      "state": {
        "order": "desc"
      }
    },
    {
      "id": {
        "order": "desc"
      }
    }
  ],
  "from": 0,
  "size": 20
}

# from 是一个偏移量,size为每页显示条数

J、去重查询

mysql

SELECT DISTINCT state FROM search_lexicon WHERE consumer_id = 'demo'

dsl

# 通过折叠去重查询
GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        }
      ]
    }
  },
  "collapse": {
    "field": "state"
  }
}

K、分组查询

mysql

SELECT  * FROM search_lexicon WHERE consumer_id = 'demo' GROUP BY state

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        }
      ]
    }
  },
  "size": 0, 
  "aggs": {
    "aaa": {
      "terms": {
        "field": "state",
        "size": 10
      }
    }
  }
}

L、模糊匹配

mysql

SELECT * FROM search_lexicon WHERE consumer_id="demo" and keyword LIKE '%渴望%'

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        }
      ],
      "must": [
        {
          "match": {
            "keyword": "渴望"
          }
        }
      ]
    }
  }
}

三、总结

Mysql查询与DSL查询对照,用心体会二者之间,上下文之间,各查询条件的差异与相似,快速掌握DSL的语法结构,You can do it!

声明:

本文版权归作者所有,未经许可不得擅自转载或引用。 原文地址:https://elasticsearch.cn/article/13760

logstash同步多张相同表结构的数据表到同一个索引

Logstashzm 回复了问题 • 4 人关注 • 4 个回复 • 4815 次浏览 • 2019-10-20 17:09 • 来自相关话题

logstash向es中导入mysql数据,tinyint字段导入过程中抛出mapper_parsing_exception类型异常

Logstashguotenv 回复了问题 • 3 人关注 • 3 个回复 • 9500 次浏览 • 2019-10-17 14:52 • 来自相关话题

从MySQL建立Elasticsearch索引-索引

Elasticsearchhufuman 发表了文章 • 0 个评论 • 4729 次浏览 • 2019-07-16 23:24 • 来自相关话题

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-索引

接上文从MySQL建立Elasticsearch索引-引言,本文介绍一下我们实现索引创建时的一些思路。

目标

框架以Jar包的形式提供,通过配置(XML)提供规则,支持插件开发,支持全量、增量等。

因为希望尽量减少开发量,所以不要求使用方提供平表,因此需要考虑如何将平表

概念

  • 全量:即使用全部数据创建一个新的索引
  • 增量:按照时间范围或者给定的规则,把变化的数据同步到ES
  • 插件:本框架内定义了一个插件接口,用于特定需求自行开发,并接入到现有的配置文件中
  • 分批参数:SQL中由框架填充的参数,类似${id}等

配置

以用户(users)为例,配置有三种文件:

  1. users.mapping,保存了索引的配置,每次全量的时候会读取本文件进行索引创建
  2. rule,规则文件,里面主要定义了对应SQL和插件的配置
    1. users-all.rule,全量规则,规则中以主表为主体,支持以Top、Limit的形式对数据进行分批获取,分批参数在SQL里可以定义为${id},框架会自动填充该值,以保证能够拉取到全量的数据
    2. users-inc.rule,按照时间范围进行增量,分批参数支持在SQL中使用${startTime}和${endTime}
    3. users-spec.rule,按照指定的主表的主键,获取数据及更新索引的操作
    4. users-partial.rule,框架接入了阿里的Canal,本规则定义了各种表变化时,如果取到主表Id,以使用spec进行数据更新
  3. users.plugin,以上的rule主要提供了主表数据的获取方法,plugin文件则提供了各种关联表的信息获取方式,可以使用SQL,或者自定义的插件

全量索引实现关键点

为了保证索引的性能、监控、正确性等,实现时进行了以下设计。

(一)索引的维护

ES使用分布式、Replica、Snapshot机制保证索引的有效及集群稳定性。我们综合考虑后决定放弃Snapshot机制,通过定时/不定时创建全新全量索引,索引名字以${indexName}-{yyyy-MM-dd}的格式定义。正进行全量的索引关联上${indexName}_F的别名,正在使用的索引关联上${indexName}的别名,这样代码里可以不用关心应该读取或者使用哪个索引,合适的场景使用合适的别名即可。

(二)索引的性能

此处专指创建索引的性能,ES的性能是老话题了。

首先,为了保证全量的性能,创建索引时会调整mapping参数,类似refresh_interval改成-1,number_of_replicas改成0,添加默认slowlog设置。

索引过程中,控制bulk请求体的总大小,保证合适的分批大小的数据一并提交到ES。如果失败简单重试三次,如果三次都失败则认为失败,退出并删除当前索引,以保证线上ES数据干净准确。

(三)索引的变更

索引完成后,检查当前索引文档数和正使用索引文档数差异,如果大于配置的阈值,则认为此次索引创建有问题,删除索引并退出;做强制合并,减少segments数,提高搜索性能;恢复refresh_interval、number_of_replicas等设置;等待新索引状态变成GREEN后,将${indexName}切换到新的索引上,以使新索引生效。

最后,检查以${indexName}-{yyyy-MM-dd}格式命名的索引有几份,将多于指定个数的较老的索引删除,将多于指定个数的索引状态改为Close。这样可以尽可能提供磁盘内存利用率,减少不必要的损耗,又能在一定程度上保证数据可用。

以上,即完成了全量索引的创建。

增量索引实现关键点

增量索引因为场景比较多,所以规则也分了几种:

  • inc,使用${startTime}和${endTime}在SQL中代替时间范围,框架会自动保存上次执行时间,以使整个增量整体不断滚动更新。实际使用时,因为有多个关联表导致SQL写起来比较复杂,以及部分增量数据过大,对DB有一定压力,因此不推荐使用此种方式。数据相对简单的场景可以使用。
  • spec,在知道主表数据变化范围的时候使用,也是目前我们推荐的一种方式。使用主键查找数据,所有的表都是主键查询,影响范围也很明确。
  • partial,针对canal做了单独封装的规则,用户获取canal的变化类型(更新、删除),变化的主表Id,用于使用spec的方式进行数据更新。

暂时没有使用部分更新的原因是,关联表与主表的映射关系比较复杂,有1:1,1:N,M:N;并且目前按照主键更新数据的方式,对DB等压力不大;加上spec的方式逻辑清晰,有利于维护和开发。

扩展性

SQL不是万能的,比方说我们部分数据需要从其他微服务取,或者SQL逻辑复杂建议代码实现,这时候就可以使用我们的插件机制了。默认框架提供了一个很强大的插件,支持Distinct、Merge、Mapping等功能,可以满足80%的关联数据的场景了。其他的场景以及需要微服务的场景,支持用户自己实现指定插件,通过配置文件,即可接入现有的框架体系中。以此能支持目前我们全部需求。

下一篇,我们将分享我们搜索模块的实现思路。

从MySQL建立Elasticsearch索引-引言

Elasticsearchhufuman 发表了文章 • 0 个评论 • 2688 次浏览 • 2019-07-16 23:19 • 来自相关话题

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-引言

日常工作里,因业务需要大量使用了Elasticsearch。为了简化索引的开发工作,我们需要一个易用可扩展的MySQL到ES的同步框架,在比较了可以找到的各种开源框架&工具后,我们还是选择自行研发了一个,名字简单粗暴:es-common。

背景

16年我接手了并负责了部门所有业务的搜索系统,旧搜索系统是基于Lucene自研实现的一个搜索框架,包含了平表创建、全量索引、增量索引、搜索引擎四个部分基础功能的封装;另有一个管理系统,用于配置索引、字典等信息。框架的实现思路是通过建立平表,简化索引创建的过程。

接手该系统以后,各业务出现井喷式发展,各种需求铺天盖地,同时该系统的不稳定性也开始作妖,最早三个人的小团队每天都疲于奔命。解决现有问题的同时,也在查找更好的解决方案。于是Elasticsearch进入视野。 基于JSON的交互、分布式、数据与逻辑隔离、开箱即用、稳定等特性,使我们确定了向ES转型的计划。每个点都是现有系统没有解决的问题,具体的点就不吐槽了。

选择

我们的需求有:

  1. 支持全量、增量建立索引,以使数据变更较快体现
  2. 支持更新指定文档,以处理突发问题
  3. 索引有版本控制,以防止出现问题无法快速回滚
  4. 尽量减少代码开发,减少出错概率,更专注业务
  5. 简单的出错处理,失败检测等
  6. 支持插件化开发,提供额外数据
  7. 支持简单的规则,类似合并同主键数据
  8. 域名、AUTH不可见,以满足安全要求
  9. 支持非MySQL来源的数据,因为微服务的存在,不是所有数据都能从DB获取到

当时的调用结果已经找不到了,那么按照现在可以搜索到的框架和工具来重新做调研好了。简单搜索了一下MySQL到ES的同步框架或者工具,有以下几个:

这几个工具通过简单的配置,即可建立索引,但分析我们的需求,有以下需求无法满足:

  1. 需要提供DB的URL、User和Password等,我们公司由于安全的考虑,是无法拿到这些参数的;
  2. 有部分数据是通过微服务获取的,单纯的SQL是无法访问微服务的;
  3. 有的表数据量比较大,需要分片多JOB同时处理,JDBC在这点上操作比较麻烦;
  4. 增量是需要按照主键进行更新的,按照时间扫描对表压力比较大;
  5. 类似工具无法走常规发布,有一定的发布风险

基于以上原因,我们选择了自己研发了一个框架,用来满足我们的需求。下篇文章讲介绍es-common的实现思路。

MySQL慢日志 制图去掉重复SQL 问题

Kibanakkk 回复了问题 • 2 人关注 • 3 个回复 • 2358 次浏览 • 2019-04-23 16:28 • 来自相关话题

logstash导入mysql上亿级别数据的效率问题

LogstashJea 回复了问题 • 14 人关注 • 7 个回复 • 20721 次浏览 • 2019-03-08 17:33 • 来自相关话题

用go-mysql-elasticsearch同步mysql数据到es5.5.0的时候,字符串怎么转日期

Elasticsearchbellengao 回复了问题 • 2 人关注 • 1 个回复 • 2836 次浏览 • 2018-12-15 11:05 • 来自相关话题

filebeat只把系统日志发到ELK,没有发送mysql日志到ELK

Beatssailershen 回复了问题 • 3 人关注 • 3 个回复 • 4059 次浏览 • 2018-11-05 01:15 • 来自相关话题

应使用哪个beats同步mysql数据到es

Beatschienx 回复了问题 • 4 人关注 • 2 个回复 • 5861 次浏览 • 2018-09-18 08:57 • 来自相关话题

推荐一个同步Mysql数据到Elasticsearch的工具

ElasticsearchMCTW 发表了文章 • 13 个评论 • 30414 次浏览 • 2018-08-14 15:47 • 来自相关话题

把Mysql的数据同步到Elasticsearch是个很常见的需求,但在Github里找到的同步工具用起来或多或少都有些别扭。 例如:某记录内容为"aaa|bbb|ccc",将其按|分割成数组同步到es,这样的简单任务都难以实现,再加上配置繁琐,文档语焉不详... 所以我写了个同步工具MysqlsMom:力求用最简单的配置完成复杂的同步任务。目前除了我所在的部门,也有越来越多的互联网公司在生产环境中使用该工具了。 欢迎各位大佬进行试用并提出意见,任何建议、鼓励、批评都受到欢迎。 github: https://github.com/m358807551/mysqlsmom Alt text

简介:同步 Mysql 数据到 elasticsearch 的工具; QQ、微信:358807551

特点

  1. Python 编写;
  2. 支持基于 sql 语句的全量同步,基于 binlog 的增量同步,基于更新字段的增量同步三种同步方式;
  3. 全量更新只占用少量内存;支持通过sql语句同步数据;
  4. 增量更新自动断点续传;
  5. 取自 Mysql 的数据可经过一系列自定义函数的处理后再同步至 Elasticsearch
  6. 能用非常简单的配置完成复杂的同步任务;

环境

  • python2.7;
  • 增量同步需开启 redis
  • 分析 binlog 的增量同步需要 Mysql 开启 binlogbinlog-format=row);

快速开始

全量同步MySql数据到es

  1. clone 项目到本地;

  2. 安装依赖;

    cd mysqlsmom
    pip install -r requirements.txt

    默认支持 elasticsearch-2.4版本,支持其它版本请运行(将5.4换成需要的elasticsearch版本)

    pip install --upgrade elasticsearch==5.4
  3. 编辑 ./config/example_init.py,按注释提示修改配置;

    # coding=utf-8
    
    STREAM = "INIT"
    
    # 修改数据库连接
    CONNECTION = {
       'host': '127.0.0.1',
       'port': 3306,
       'user': 'root',
       'passwd': ''
    }
    
    # 修改elasticsearch节点
    NODES = [{"host": "127.0.0.1", "port": 9200}]
    
    TASKS = [
       {
           "stream": {
               "database": "test_db",  # 在此数据库执行sql语句
               "sql": "select * from person"  # 将该sql语句选中的数据同步到 elasticsearch
           },
           "jobs": [
               {
                   "actions": ["insert", "update"],
                   "pipeline": [
                       {"set_id": {"field": "id"}}  # 默认设置 id字段的值 为elasticsearch中的文档id
                   ],
                   "dest": {
                       "es": {
                           "action": "upsert",
                           "index": "test_index",   # 设置 index
                           "type": "test",          # 设置 type
                           "nodes": NODES
                       }
                   }
               }
           ]
       }
    ]
  4. 运行

    cd mysqlsmom
    python mysqlsmom.py ./config/example_init.py

    等待同步完成即可;

分析 binlog 的增量同步

  1. 确保要增量同步的MySql数据库开启binlog,且开启redis(为了存储最后一次读到的binlog文件名及读到的位置。未来可能支持本地文件存储该信息。)

  2. 下载项目到本地,且安装好依赖后,编辑 ./config/example_init.py,按注释提示修改配置;

    # coding=utf-8
    
    STREAM = "BINLOG"
    SERVER_ID = 99  # 确保每个用于binlog同步的配置文件的SERVER_ID不同;
    SLAVE_UUID = __name__
    
    # 配置开启binlog权限的MySql连接
    BINLOG_CONNECTION = {
       'host': '127.0.0.1',
       'port': 3306,
       'user': 'root',
       'passwd': ''
    }
    
    # 配置es节点
    NODES = [{"host": "127.0.0.1", "port": 9200}]
    
    TASKS = [
       {
           "stream": {
               "database": "test_db",  # [table]所在的数据库
               "table": "person"  # 监控该表的binlog
           },
           "jobs": [
               {
                   "actions": ["insert", "update"],
                   "pipeline": [
                       {"only_fields": {"fields": ["id", "name", "age"]}},  # 只同步这些字段到es,注释掉该行则同步全部字段的值到es
                       {"set_id": {"field": "id"}}  # 设置es中文档_id的值取自 id(或根据需要更改)字段
                   ],
                   "dest": {
                       "es": {
                           "action": "upsert",
                           "index": "test_index",  # 设置 index
                           "type": "test",         # 设置 type
                           "nodes": NODES
                       }
                   }
               }
           ]
       }
    ]
  3. 运行

    cd mysqlsmom
    python mysqlsmom.py ./config/example_binlog.py

    该进程会一直运行,实时同步新增和更改后的数据到elasticsearch;

    注意:第一次运行该进程时不会同步MySql中已存在的数据,从第二次运行开始,将接着上次同步停止时的位置继续同步;

    同步旧数据请看全量同步MySql数据到es

基于更新时间的增量同步

Mysql 表中有类似 update_time 的时间字段,且在每次插入、更新数据后将该字段的值设置为操作时间,则可在不用开启 binlog 的情况下进行增量同步。

  1. 下载项目到本地,且安装好依赖后,编辑 ./config/example_cron.py,按注释提示修改配置;

    # coding=utf-8
    
    STREAM = "CRON"
    
    # 修改数据库连接
    CONNECTION = {
       'host': '127.0.0.1',
       'port': 3306,
       'user': 'root',
       'passwd': ''
    }
    
    # redis存储上次同步时间等信息
    REDIS = {
       "host": "127.0.0.1",
       "port": 6379,
       "db": 0,
       "password": "password",  # 不需要密码则注释或删掉该行
    }
    
    # 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
    BULK_SIZE = 1
    
    # 修改elasticsearch节点
    NODES = [{"host": "127.0.0.1", "port": 9200}]
    
    TASKS = [
       {
           "stream": {
               "database": "test_db",  # 在此数据库执行sql语句
               "sql": "select id, name from person where update_time >= ?",  # 将该sql语句选中的数据同步到 elasticsearch
               "seconds": 10,  # 每隔 seconds 秒同步一次,
               "init_time": "2018-08-15 18:05:47"  # 只有第一次同步会加载
           },
           "jobs": [
               {
                   "pipeline": [
                       {"set_id": {"field": "id"}}  # 默认设置 id字段的值 为 es 中的文档id
                   ],
                   "dest": {
                       "es": {
                           "action": "upsert",
                           "index": "test_index",   # 设置 index
                           "type": "test"          # 设置 type
                       }
                   }
               }
           ]
       }
    ]
  2. 运行

    cd mysqlsmom
    python mysqlsmom.py ./config/example_cron.py

组织架构

Alt text

Mysqlsmom 使用实战

Mysqlsmom 的灵活性依赖于:

  • row_handlers.py 中添加自定义函数对取自Mysql的数据进行二次加工。
  • row_filters.py 中添加自定义函数决定是否要同步某一条数据。
  • config/ 目录下的任意配置文件应用上面的函数。

如果不了解 Python 也没关系,上述两个文件中自带的函数足以应付大多数种情况,遇到特殊的同步需求可以在 Github 发起 issue 或通过微信、QQ联系作者。

同步多张表

在一个配置文件中即可完成:

...
TASKS = [
    # 同步表1
    {
        "stream": {
            "database": "数据库名1",
            "table": "表名1"
        },
        "jobs": [...]
    }
    # 同步表2
    {
        "stream": {
            "database": "数据库名2",
            "table": "表名2"
        },
        "jobs": [...]
    }
]

一个 Mysql Connection 对应一个配置文件。

一张表同步到多个索引

分为两种情况。

一种是把相同的数据同步到不同的索引,配置如下:

...
TASKS = [
    {
        "stream": {...},
        "jobs": [
            {
                "actions": [...],
                "pipeline": [...],
                "dest": [
                    # 同步到索引1
                    {
                        "es": {"action": "upsert", "index": "索引1", "type": "类型1", "nodes": NODES},
                    },
                    # 同步到索引2
                    {
                        "es": {"action": "upsert", "index": "索引2", "type": "类型2", "nodes": NODES},
                    }
                ]
            }
        ]
    },
    ...
]

另一种是把同一个表产生的数据经过不同的 pipeline 同步到不同的索引:

...
TASKS = [
    {
        "stream": {...},
        "jobs": [
            {
                "actions": {...},
                "pipeline": [...],  # 对数据经过一系列处理
                "dest": {"es": {"index": "索引1", ...}}  # 同步到索引1
            },
            {
                "actions": {...},
                "pipeline": [...],  # 与上面的pipeline不同
                "dest": {"es": {"index": "索引2", ...}}  # 同步到索引2
            }
        ]
    }
]
  • TASKS 中的每一项对应一张要同步的表。
  • jobs 中的每一项对应对一条记录的一种处理方式。
  • dest 中的每一项对应一个es索引类型。

只同步某些字段

对每条来自 Mysql 的 记录的处理都在 pipeline 中进行处理。

"pipeline": [
    {"only_fields": {"fields": ["id", "name"]}},  # 只同步 id 和 name字段
    {"set_id": {"field": "id"}}  # 然后设置 id 字段为es中文档的_id
]

字段重命名

对于 Mysql 中的字段名和 elasticsearch 中的域名不一致的情况:

"pipeline": [
    # 将name重命名为name1,age 重命名为age1
    {"replace_fields": {"name": ["name1"], "age": ["age1"]}},
    {"set_id": {"field": "id"}}
]

pipeline 会依次执行处理函数,上面的例子等价于:

"pipeline": [
    # 先重命名 name 为 name1
    {"replace_fields": {"name": ["name1"]}},
    # 再重命名 age 为 age1
    {"replace_fields": {"age": ["age1"]}},
    {"set_id": {"field": "id"}}
]

还有一种特殊情形,es 中两个字段存相同的数据,但是分词方式不同。

例如 name_default 的分析器为 defaultname_raw 设置为不分词,需要将 name 的值同时同步到这两个域:

"pipeline": [
    {"replace_fields": {"name": ["name_default", "name_raw"]}},
    {"set_id": {"field": "id"}}
]

当然上述问题有一个更好的解决方案,在 esmappings 中配置 name 字段的 fields 属性即可,这超出了本文档的内容。

切分字符串为数组

有时 Mysql 存储字符串类似:"aaa|bbb|ccc",希望转化成数组: ["aaa", "bbb", "ccc"] 再进行同步

"pipeline": [
    # tags 存储类似"aaa|bbb|ccc"的字符串,将 tags 字段的值按符号 `|` 切分成数组
    {"split": {"field": "tags", "flag": "|"}},
    {"set_id": {"field": "id"}}
] 

同步删除文档

只有 binlog 同步 能实现删除 elasticsearch 中的文档,配置如下:

TASKS = [
    {
        "stream": {
            "database": "test_db",
            "table": "person"
        },
        "jobs": [
            # 插入、更新
            {
                "actions": ["insert", "update"],
                "pipeline": [
                    {"set_id": {"field": "id"}}  # 设置 id 字段的值为 es 中文档 _id
                ],
                "dest": {
                    "es": {
                        "action": "upsert",
                        ...
                    }
                }
            },
            # 重点在这里,配置删除
            {
                "actions": ["delete"],  # 当读取到 binlog 中该表的删除操作时
                "pipeline": [{"set_id": {"field": "id"}}],  # 要删除的文档 _id
                "dest": {
                    "es": {
                        "action": "delete",  # 在 es 中执行删除操作
                        ...  # 与上面的 index 和 type 相同
                    }
                }
            }
        ]
    },
    ...
]

更多示例正在更新

常见问题

为什么我的增量同步不及时?

  1. 连接本地数据库增量同步不及时

    该情况暂未收到过反馈,如能复现请联系作者。

  2. 连接线上数据库发现增量同步不及时

    2.1 推荐使用内网IP连接数据库。连接线上数据库(如开启在阿里、腾讯服务器上的Mysql)时,推荐使用内网IP地址,因为外网IP会受到带宽等限制导致获取binlog数据速度受限,最终可能造成同步延时。

待改进

  1. 据部分用户反馈,全量同步百万级以上的数据性能不佳。

未完待续

文档近期会较频繁更新,任何问题、建议都收到欢迎,请在issues留言,会在24小时内回复;或联系QQ、微信: 358807551;

logstash的input和output的statement => 有个需求怎么设置

Logstashheyddo 回复了问题 • 4 人关注 • 4 个回复 • 6602 次浏览 • 2018-08-08 14:12 • 来自相关话题

请教各位大神,我的logstash提取数据后,是无限自循环的数据,怎么办呢?

Logstashlaoyang360 回复了问题 • 2 人关注 • 1 个回复 • 3869 次浏览 • 2018-08-06 22:14 • 来自相关话题

我的logstash input jdbc 配置时间格式不一样怎么办

Logstashmedcl 回复了问题 • 4 人关注 • 2 个回复 • 6153 次浏览 • 2018-08-06 10:58 • 来自相关话题

条新动态, 点击查看
有几种方法,看着选吧:
1.在业务系统里做双写
 
2.用elasticsearch-jdbc之类的工具来做全量和增量同步
 
3.用阿里的canal来做数据库binlog->kafka->es的同步,需要开发,而且依赖比较多,小公司选择还是慎重些... 显示全部 »
有几种方法,看着选吧:
1.在业务系统里做双写
 
2.用elasticsearch-jdbc之类的工具来做全量和增量同步
 
3.用阿里的canal来做数据库binlog->kafka->es的同步,需要开发,而且依赖比较多,小公司选择还是慎重些吧
hexiaohong

hexiaohong 回答了问题 • 2018-05-16 16:08 • 7 个回复 不感兴趣

logstash导入mysql上亿级别数据的效率问题

赞同来自:

我也遇到你同样问题,可以这么解决:
1.给updated_ts时间字段加上索引。
2.分批处理原则
(1)你的SQL:每批处理100000个
SELECT a.party_id AS id ,a.* FROM PARTY_ALL_1 a WHERE a.upd... 显示全部 »
我也遇到你同样问题,可以这么解决:
1.给updated_ts时间字段加上索引。
2.分批处理原则
(1)你的SQL:每批处理100000个
SELECT a.party_id AS id ,a.* FROM PARTY_ALL_1 a WHERE a.updated_ts > '2011-11-17 13:23:58' order by a.updated_ts asc LIMIT 100000;
 
(2)logstash分页的时候每次处理50000个
SELECT * FROM (SELECT a.party_id AS id ,a.* FROM PARTY_ALL_1 a WHERE a.updated_ts > '2011-11-17 13:23:58' order by a.updated_ts asc LIMIT 100000) AS `t1` LIMIT 50000 OFFSET 0;
 
SELECT * FROM (SELECT a.party_id AS id ,a.* FROM PARTY_ALL_1 a WHERE a.updated_ts > '2011-11-17 13:23:58' order by a.updated_ts asc LIMIT 100000) AS `t1` LIMIT 50000 OFFSET 50000;
 
(3)处理两次就写一个最后一条记录的updated_ts时间到指定文件。下一个定时任务启动的时候进行循环处理就行,因为每批处理updated_ts都会改变
 
好问题
好问题

logstash同步多张相同表结构的数据表到同一个索引

回复

Logstashzm 回复了问题 • 4 人关注 • 4 个回复 • 4815 次浏览 • 2019-10-20 17:09 • 来自相关话题

logstash向es中导入mysql数据,tinyint字段导入过程中抛出mapper_parsing_exception类型异常

回复

Logstashguotenv 回复了问题 • 3 人关注 • 3 个回复 • 9500 次浏览 • 2019-10-17 14:52 • 来自相关话题

MySQL慢日志 制图去掉重复SQL 问题

回复

Kibanakkk 回复了问题 • 2 人关注 • 3 个回复 • 2358 次浏览 • 2019-04-23 16:28 • 来自相关话题

logstash导入mysql上亿级别数据的效率问题

回复

LogstashJea 回复了问题 • 14 人关注 • 7 个回复 • 20721 次浏览 • 2019-03-08 17:33 • 来自相关话题

用go-mysql-elasticsearch同步mysql数据到es5.5.0的时候,字符串怎么转日期

回复

Elasticsearchbellengao 回复了问题 • 2 人关注 • 1 个回复 • 2836 次浏览 • 2018-12-15 11:05 • 来自相关话题

filebeat只把系统日志发到ELK,没有发送mysql日志到ELK

回复

Beatssailershen 回复了问题 • 3 人关注 • 3 个回复 • 4059 次浏览 • 2018-11-05 01:15 • 来自相关话题

应使用哪个beats同步mysql数据到es

回复

Beatschienx 回复了问题 • 4 人关注 • 2 个回复 • 5861 次浏览 • 2018-09-18 08:57 • 来自相关话题

logstash的input和output的statement => 有个需求怎么设置

回复

Logstashheyddo 回复了问题 • 4 人关注 • 4 个回复 • 6602 次浏览 • 2018-08-08 14:12 • 来自相关话题

请教各位大神,我的logstash提取数据后,是无限自循环的数据,怎么办呢?

回复

Logstashlaoyang360 回复了问题 • 2 人关注 • 1 个回复 • 3869 次浏览 • 2018-08-06 22:14 • 来自相关话题

我的logstash input jdbc 配置时间格式不一样怎么办

回复

Logstashmedcl 回复了问题 • 4 人关注 • 2 个回复 • 6153 次浏览 • 2018-08-06 10:58 • 来自相关话题

logstash-input-jdbc 和 logstash-output-jdbc 中间如何过滤改变一下时间字段数据格式

回复

默认分类ljx95315 发起了问题 • 1 人关注 • 0 个回复 • 3232 次浏览 • 2018-08-05 01:25 • 来自相关话题

我的logstash配置文件input和output检索数据表字段和获取数据表字段如何配置??

回复

Logstash匿名用户 发起了问题 • 1 人关注 • 0 个回复 • 960 次浏览 • 2018-08-02 21:52 • 来自相关话题

请教一个logstash简单的获取数据的问题,获取的数据错误

回复

Logstashmedcl 回复了问题 • 2 人关注 • 2 个回复 • 3247 次浏览 • 2018-08-02 10:29 • 来自相关话题

请求指导技术解决方向

回复

Elasticsearchyayg2008 回复了问题 • 3 人关注 • 2 个回复 • 2957 次浏览 • 2018-05-10 21:38 • 来自相关话题

各位做过 将 mysql表导入到 kafka 中吗, 然后从kafka 中导入到es 中, 有什么方案吗?

回复

Elasticsearchclean 回复了问题 • 4 人关注 • 4 个回复 • 5905 次浏览 • 2018-04-04 10:10 • 来自相关话题

使用 Logstash 同步 MySQL 到 Easysearch

Easysearchyangmf2040 发表了文章 • 0 个评论 • 1382 次浏览 • 2023-08-17 00:49 • 来自相关话题

从 MySQL 同步数据到 ES 有多种方案,这次我们使用 ELK 技术栈中的 Logstash 来将数据从 MySQL 同步到 Easysearch 。

方案前提

  1. MySQL 表记录必须有主键,比如 id 字段。通过该字段,可将 Easysearch 索引数据与 MySQL 表数据形成一对一映射关系,支持修改。
  2. MySQL 表记录必须有时间字段,以支持增量同步。

如果上述条件具备,便可使用 logstash 定期同步新写入或修改后的数据到 Easysearch 中。

方案演示

版本信息

MySQL: 5.7
Logstash: 7.10.2
Easysearch: 1.5.0

MySQL 设置

创建演示用的表。

CREATE DATABASE es_db;
USE es_db;
DROP TABLE IF EXISTS es_table;
CREATE TABLE es_table (
  id BIGINT(20) UNSIGNED NOT NULL,
  PRIMARY KEY (id),
  UNIQUE KEY unique_id (id),
  client_name VARCHAR(32) NOT NULL,
  modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

说明

  • id 字段: 主键、唯一键,将作为 Easysearch 索引中的 doc id 字段。
  • modification_time 字段: 表记录的插入和修改都会记录在此。
  • client_name: 代表用户数据。
  • insertion_time: 可省略,用来记录数据插入到 MySQL 数据的时间。

    插入数据

    INSERT INTO es_table (id, client_name) VALUES (1, 'test 1');
    INSERT INTO es_table (id, client_name) VALUES (2, 'test 2');
    INSERT INTO es_table (id, client_name) VALUES (3, 'test 3');

    Logstash

    配置文件

    input {
    jdbc {
    jdbc_driver_library => "./mysql-connector-j-8.1.0/mysql-connector-j-8.1.0.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.56.3:3306/es_db"
    jdbc_user => "root"
    jdbc_password => "password"
    jdbc_paging_enabled => true
    tracking_column => "unix_ts_in_secs"
    use_column_value => true
    tracking_column_type => "numeric"
    last_run_metadata_path => "./.mysql-es_table-sql_last_value.yml"
    schedule => "*/5 * * * * *"
    statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC"
    }
    jdbc {
    jdbc_driver_library => "./mysql-connector-j-8.1.0/mysql-connector-j-8.1.0.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.56.3:3306/es_db"
    jdbc_user => "root"
    jdbc_password => "password"
    schedule => "*/5 * * * * *"
    statement => "SELECT count(*) AS count,'es_table' AS table_name  from es_table"
    }
    }
    filter {
    if ![table_name] {
        mutate {
           copy => { "id" => "[@metadata][_id]"}
           remove_field => ["@version", "unix_ts_in_secs","@timestamp"]
           add_field => { "[@metadata][target_index]" => "mysql_es_table" } }
      } else {
        mutate { 
           add_field => { "[@metadata][target_index]" => "table_counts" } 
           remove_field => ["@version"]
               }
        uuid {
           target  => "[@metadata][_id]"
           overwrite => true
             }
      }
    }
    output {
    elasticsearch {
      hosts => ["https://localhost:9200"]
      user => "admin"
      password => "f0c6fc61fe5f7b084c00"
      ssl_certificate_verification => "false"
      index => "%{[@metadata][target_index]}"
      manage_template => "false"
      document_id => "%{[@metadata][_id]}"
    }
    }
  • 每 5 秒钟同步一次 es_table 表的数据到 mysql_sync_idx 索引。
  • 每 5 秒统计一次 es_table 表的记录条数到 table_counts 索引,用于监控。

    启动 logstash

    ./bin/logstash -f sync_es_table.conf

    查看同步结果, 3 条数据都已同步到索引。 Mysql 数据库新增记录

    INSERT INTO es_table (id, client_name) VALUES (4, 'test 4');

    Easysearch 确认新增

Mysql 数据库修改记录

UPDATE es_table SET client_name = 'test 0001' WHERE id=1;

Easysearch 确认修改

删除数据

Logstash 无法直接删除操作到 ES ,有两个方案:

  1. 在表中增加 is_deleted 字段,实现软删除,可达到同步的目的。查询过滤掉 is_deleted : true 的记录,后续通过脚本等方式定期清理 is_deleted : true 的数据。
  2. 执行删除操作的程序,删除完 MySQL 中的记录后,继续删除 Easysearch 中的记录。

同步监控

数据已经在 ES 中了,我们可利用 INFINI Console 的数据看板来监控数据是否同步,展示表记录数、索引记录数及其变化。

类比mysql查询,适合新手学习Elasticsearch的DSL查询语句

Elasticsearch 发表了文章 • 0 个评论 • 7691 次浏览 • 2020-04-29 10:44 • 来自相关话题

Mysql查询与Elasticsearch的DSL查询语句对照

作者:

小森同学,互联网公司搜索开发工程师。

前言

作为新入门的后端开发人员,一般对Mysql,SqlServer这类的关系型数据库或多或少都有了解。当入门Elasticsearch时,发现其DSL语句与关系型数据库的查询完全不一样,不再是那熟悉的语法,顿感门槛有点高。为了方便熟悉关系型数据库查询的同学,更加容易,快捷的理解并掌握DSL基础语法,本文将进行Mysql与DSL语句进行类比。

一、Mysql数据库与Elasticsearch的类比

关系型数据库(比如Mysql) 非关系型数据库(Elasticsearch)
数据库 Database 索引 Index
表 Table 类型 Type
数据行 Row 文档 Document
数据列 Column 字段 Field
约束 Schema 映射 Mapping

二、Mysql查询语句与DSL查询类比

Mysql查询语句与Elasticsearch的DSL查询类比,主要通过mysql库中的search_lexicon表和es中的search_lexicon_v1索引进行比较。

2.1 search_lexicon 表结构

CREATE TABLE `search_lexicon` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `keyword` varchar(50) NOT NULL DEFAULT '' COMMENT '关键词',
  `keyword_crc32` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '关键词校验',
  `search_type` tinyint(1) NOT NULL DEFAULT '0' COMMENT '类型',
  `consumer_id` varchar(50) NOT NULL DEFAULT '' COMMENT '消费者ID',
  `num` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '文档数',
  `views` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '搜索次数',
  `state` tinyint(1) unsigned NOT NULL DEFAULT '1' COMMENT '状态 0 关闭 1 开启',
  `is_del` tinyint(1) unsigned NOT NULL DEFAULT '0' COMMENT '是否删除 0 正常 1 删除',
  `createtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间',
  `updatetime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据最后更新时间',
  PRIMARY KEY (`id`),
  KEY `idx_search_lexicon_views` (`views`),
  KEY `idx_search_lexicon_updatetime` (`updatetime`) USING BTREE,
  KEY `idx_search_lexicon_keyword_type` (`keyword_crc32`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='搜索词库';

2.2 search_lexicon_v1 索引结构

{
  "search_lexicon_v1" : {
    "mappings" : {
      "_doc" : {
        "properties" : {
          "@timestamp" : {
            "type" : "date"
          },
          "@version" : {
            "type" : "long"
          },
          "consumer_id" : {
            "type" : "keyword"
          },
          "createtime" : {
            "type" : "date",
            "format" : "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"
          },
          "id" : {
            "type" : "integer"
          },
          "is_del" : {
            "type" : "integer"
          },
          "keyword" : {
            "type" : "text",
            "fields" : {
              "standard" : {
                "type" : "text",
                "analyzer" : "by_standard_no_synonym"
              }
            },
            "analyzer" : "by_max_word_pinyin_no_synonym"
          },
          "num" : {
            "type" : "long"
          },
          "search_type" : {
            "type" : "integer"
          },
          "state" : {
            "type" : "integer"
          },
          "updatetime" : {
            "type" : "date",
            "format" : "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"
          },
          "views" : {
            "type" : "long"
          }
        }
      }
    }
  }
}

2.3 查询语句对照

注意:dsl查询,每次默认展示10(size默认为10)条

以下的查询条件,是为了写查询而构造的,无任何实质性的意义,仅供mysql查询与dsl查询对比用

布尔查询支持的子查询类型共有四种,分别是:must,should,must_not和filter:

查询字句 说明 类型
must 文档必须符合must中所有的条件,会影响相关性得分 数组
should 文档应该匹配should子句查询的一个或多个 数组
must_not 文档必须不符合must_not 中的所有条件 数组
filter 过滤器,文档必须匹配该过滤条件,跟must子句的唯一区别是,filter不影响查询的score ,会缓存 字典

A、查询所有数据

mysql

SELECT * FROM search_lexicon

dsl

GET search_lexicon/_search
{

}
或
GET search_lexicon/_search
{
  "query": {
    "match_all": {}
  }
}

B、 查询一个条件且条件只有一个值(consumer_id=demo)的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id='demo'

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": {
        "term": {
          "consumer_id": "demo"
        }
      }
    }
  }
}
或
GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": "demo"
          }
        }
      ]
    }
  }
}

两者的区别在于前一个filter是一个对象,filter中只能放一个条件,后者filter是一个数组,里面可以放多个对象(多个查询条件),后续都将按照第二种方式查询

C、 查询一个条件且条件有多个值(consumer_id的值为demo,demo2)的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id in('demo','demo2')

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "terms": {
            "consumer_id": [
              "demo",
              "demo2"
            ]
          }
        }
      ]
    }
  }
}

D、 查询consumer_id=demo 且 state=1的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id ='demo' and state=1

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": "demo"
          }
        },
         {
          "term": {
            "state": 1
          }
        }
      ]
    }
  }
}

E、 查询consumer_id=demo , state=1 且 is_del<>1的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id ='demo' and state=1 and is_del <>1

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": "demo"
          }
        },
         {
          "term": {
            "state": 1
          }
        }
      ],
      "must_not": [
        {
          "term": {
            "is_del": {
              "value": 1
            }
          }
        }
      ]
    }
  }
}

F、查询Sconsumer_id ='demo' or (state=1 and is_del =0)的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id ='demo' or (state=1 and is_del =0)

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        },
        {
          "bool": {
            "filter": [
              {
                "term": {
                  "state": 1
                }
              },
              {
                "term": {
                  "is_del": 0
                }
              }
            ]
          }
        }
      ]
    }
  }
}

G、在F的基础上,查询指定字段

mysql

SELECT id,keyword,consumer_id,num,views,state,is_del FROM search_lexicon WHERE consumer_id ='demo' or (state=1 and is_del =0)

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        },
        {
          "bool": {
            "filter": [
              {
                "term": {
                  "state": 1
                }
              },
              {
                "term": {
                  "is_del": 0
                }
              }
            ]
          }
        }
      ]
    }
  },
  "_source": {
    "includes": [
      "id",
      "keyword",
      "num",
      "is_del",
      "state",
      "consumer_id",
      "views"
    ]
  }
}

H、在G的基础上,增加排序

mysql

SELECT id,keyword,consumer_id,num,views,state,is_del FROM search_lexicon WHERE consumer_id ='demo' or (state=1 and is_del =0) ORDER BY state DESC,id DESC

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        },
        {
          "bool": {
            "filter": [
              {
                "term": {
                  "state": 1
                }
              },
              {
                "term": {
                  "is_del": 0
                }
              }
            ]
          }
        }
      ]
    }
  },
  "_source": {
    "includes": [
      "id",
      "keyword",
      "num",
      "is_del",
      "state",
      "consumer_id",
      "views"
    ]
  },
  "sort": [
    {
      "state": {
        "order": "desc"
      }
    },
    {
      "id": {
        "order": "desc"
      }
    }
  ]
}

I、在H的基础上,添加分页

mysql

SELECT id,keyword,consumer_id,num,views,state,is_del FROM search_lexicon WHERE consumer_id ='demo' or (state=1 and is_del =0) ORDER BY state DESC,id DESC LIMIT 0,20

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        },
        {
          "bool": {
            "filter": [
              {
                "term": {
                  "state": 1
                }
              },
              {
                "term": {
                  "is_del": 0
                }
              }
            ]
          }
        }
      ]
    }
  },
  "_source": {
    "includes": [
      "id",
      "keyword",
      "num",
      "is_del",
      "state",
      "consumer_id",
      "views"
    ]
  },
  "sort": [
    {
      "state": {
        "order": "desc"
      }
    },
    {
      "id": {
        "order": "desc"
      }
    }
  ],
  "from": 0,
  "size": 20
}

# from 是一个偏移量,size为每页显示条数

J、去重查询

mysql

SELECT DISTINCT state FROM search_lexicon WHERE consumer_id = 'demo'

dsl

# 通过折叠去重查询
GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        }
      ]
    }
  },
  "collapse": {
    "field": "state"
  }
}

K、分组查询

mysql

SELECT  * FROM search_lexicon WHERE consumer_id = 'demo' GROUP BY state

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        }
      ]
    }
  },
  "size": 0, 
  "aggs": {
    "aaa": {
      "terms": {
        "field": "state",
        "size": 10
      }
    }
  }
}

L、模糊匹配

mysql

SELECT * FROM search_lexicon WHERE consumer_id="demo" and keyword LIKE '%渴望%'

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        }
      ],
      "must": [
        {
          "match": {
            "keyword": "渴望"
          }
        }
      ]
    }
  }
}

三、总结

Mysql查询与DSL查询对照,用心体会二者之间,上下文之间,各查询条件的差异与相似,快速掌握DSL的语法结构,You can do it!

声明:

本文版权归作者所有,未经许可不得擅自转载或引用。 原文地址:https://elasticsearch.cn/article/13760

从MySQL建立Elasticsearch索引-索引

Elasticsearchhufuman 发表了文章 • 0 个评论 • 4729 次浏览 • 2019-07-16 23:24 • 来自相关话题

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-索引

接上文从MySQL建立Elasticsearch索引-引言,本文介绍一下我们实现索引创建时的一些思路。

目标

框架以Jar包的形式提供,通过配置(XML)提供规则,支持插件开发,支持全量、增量等。

因为希望尽量减少开发量,所以不要求使用方提供平表,因此需要考虑如何将平表

概念

  • 全量:即使用全部数据创建一个新的索引
  • 增量:按照时间范围或者给定的规则,把变化的数据同步到ES
  • 插件:本框架内定义了一个插件接口,用于特定需求自行开发,并接入到现有的配置文件中
  • 分批参数:SQL中由框架填充的参数,类似${id}等

配置

以用户(users)为例,配置有三种文件:

  1. users.mapping,保存了索引的配置,每次全量的时候会读取本文件进行索引创建
  2. rule,规则文件,里面主要定义了对应SQL和插件的配置
    1. users-all.rule,全量规则,规则中以主表为主体,支持以Top、Limit的形式对数据进行分批获取,分批参数在SQL里可以定义为${id},框架会自动填充该值,以保证能够拉取到全量的数据
    2. users-inc.rule,按照时间范围进行增量,分批参数支持在SQL中使用${startTime}和${endTime}
    3. users-spec.rule,按照指定的主表的主键,获取数据及更新索引的操作
    4. users-partial.rule,框架接入了阿里的Canal,本规则定义了各种表变化时,如果取到主表Id,以使用spec进行数据更新
  3. users.plugin,以上的rule主要提供了主表数据的获取方法,plugin文件则提供了各种关联表的信息获取方式,可以使用SQL,或者自定义的插件

全量索引实现关键点

为了保证索引的性能、监控、正确性等,实现时进行了以下设计。

(一)索引的维护

ES使用分布式、Replica、Snapshot机制保证索引的有效及集群稳定性。我们综合考虑后决定放弃Snapshot机制,通过定时/不定时创建全新全量索引,索引名字以${indexName}-{yyyy-MM-dd}的格式定义。正进行全量的索引关联上${indexName}_F的别名,正在使用的索引关联上${indexName}的别名,这样代码里可以不用关心应该读取或者使用哪个索引,合适的场景使用合适的别名即可。

(二)索引的性能

此处专指创建索引的性能,ES的性能是老话题了。

首先,为了保证全量的性能,创建索引时会调整mapping参数,类似refresh_interval改成-1,number_of_replicas改成0,添加默认slowlog设置。

索引过程中,控制bulk请求体的总大小,保证合适的分批大小的数据一并提交到ES。如果失败简单重试三次,如果三次都失败则认为失败,退出并删除当前索引,以保证线上ES数据干净准确。

(三)索引的变更

索引完成后,检查当前索引文档数和正使用索引文档数差异,如果大于配置的阈值,则认为此次索引创建有问题,删除索引并退出;做强制合并,减少segments数,提高搜索性能;恢复refresh_interval、number_of_replicas等设置;等待新索引状态变成GREEN后,将${indexName}切换到新的索引上,以使新索引生效。

最后,检查以${indexName}-{yyyy-MM-dd}格式命名的索引有几份,将多于指定个数的较老的索引删除,将多于指定个数的索引状态改为Close。这样可以尽可能提供磁盘内存利用率,减少不必要的损耗,又能在一定程度上保证数据可用。

以上,即完成了全量索引的创建。

增量索引实现关键点

增量索引因为场景比较多,所以规则也分了几种:

  • inc,使用${startTime}和${endTime}在SQL中代替时间范围,框架会自动保存上次执行时间,以使整个增量整体不断滚动更新。实际使用时,因为有多个关联表导致SQL写起来比较复杂,以及部分增量数据过大,对DB有一定压力,因此不推荐使用此种方式。数据相对简单的场景可以使用。
  • spec,在知道主表数据变化范围的时候使用,也是目前我们推荐的一种方式。使用主键查找数据,所有的表都是主键查询,影响范围也很明确。
  • partial,针对canal做了单独封装的规则,用户获取canal的变化类型(更新、删除),变化的主表Id,用于使用spec的方式进行数据更新。

暂时没有使用部分更新的原因是,关联表与主表的映射关系比较复杂,有1:1,1:N,M:N;并且目前按照主键更新数据的方式,对DB等压力不大;加上spec的方式逻辑清晰,有利于维护和开发。

扩展性

SQL不是万能的,比方说我们部分数据需要从其他微服务取,或者SQL逻辑复杂建议代码实现,这时候就可以使用我们的插件机制了。默认框架提供了一个很强大的插件,支持Distinct、Merge、Mapping等功能,可以满足80%的关联数据的场景了。其他的场景以及需要微服务的场景,支持用户自己实现指定插件,通过配置文件,即可接入现有的框架体系中。以此能支持目前我们全部需求。

下一篇,我们将分享我们搜索模块的实现思路。

从MySQL建立Elasticsearch索引-引言

Elasticsearchhufuman 发表了文章 • 0 个评论 • 2688 次浏览 • 2019-07-16 23:19 • 来自相关话题

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-引言

日常工作里,因业务需要大量使用了Elasticsearch。为了简化索引的开发工作,我们需要一个易用可扩展的MySQL到ES的同步框架,在比较了可以找到的各种开源框架&工具后,我们还是选择自行研发了一个,名字简单粗暴:es-common。

背景

16年我接手了并负责了部门所有业务的搜索系统,旧搜索系统是基于Lucene自研实现的一个搜索框架,包含了平表创建、全量索引、增量索引、搜索引擎四个部分基础功能的封装;另有一个管理系统,用于配置索引、字典等信息。框架的实现思路是通过建立平表,简化索引创建的过程。

接手该系统以后,各业务出现井喷式发展,各种需求铺天盖地,同时该系统的不稳定性也开始作妖,最早三个人的小团队每天都疲于奔命。解决现有问题的同时,也在查找更好的解决方案。于是Elasticsearch进入视野。 基于JSON的交互、分布式、数据与逻辑隔离、开箱即用、稳定等特性,使我们确定了向ES转型的计划。每个点都是现有系统没有解决的问题,具体的点就不吐槽了。

选择

我们的需求有:

  1. 支持全量、增量建立索引,以使数据变更较快体现
  2. 支持更新指定文档,以处理突发问题
  3. 索引有版本控制,以防止出现问题无法快速回滚
  4. 尽量减少代码开发,减少出错概率,更专注业务
  5. 简单的出错处理,失败检测等
  6. 支持插件化开发,提供额外数据
  7. 支持简单的规则,类似合并同主键数据
  8. 域名、AUTH不可见,以满足安全要求
  9. 支持非MySQL来源的数据,因为微服务的存在,不是所有数据都能从DB获取到

当时的调用结果已经找不到了,那么按照现在可以搜索到的框架和工具来重新做调研好了。简单搜索了一下MySQL到ES的同步框架或者工具,有以下几个:

这几个工具通过简单的配置,即可建立索引,但分析我们的需求,有以下需求无法满足:

  1. 需要提供DB的URL、User和Password等,我们公司由于安全的考虑,是无法拿到这些参数的;
  2. 有部分数据是通过微服务获取的,单纯的SQL是无法访问微服务的;
  3. 有的表数据量比较大,需要分片多JOB同时处理,JDBC在这点上操作比较麻烦;
  4. 增量是需要按照主键进行更新的,按照时间扫描对表压力比较大;
  5. 类似工具无法走常规发布,有一定的发布风险

基于以上原因,我们选择了自己研发了一个框架,用来满足我们的需求。下篇文章讲介绍es-common的实现思路。

推荐一个同步Mysql数据到Elasticsearch的工具

ElasticsearchMCTW 发表了文章 • 13 个评论 • 30414 次浏览 • 2018-08-14 15:47 • 来自相关话题

把Mysql的数据同步到Elasticsearch是个很常见的需求,但在Github里找到的同步工具用起来或多或少都有些别扭。 例如:某记录内容为"aaa|bbb|ccc",将其按|分割成数组同步到es,这样的简单任务都难以实现,再加上配置繁琐,文档语焉不详... 所以我写了个同步工具MysqlsMom:力求用最简单的配置完成复杂的同步任务。目前除了我所在的部门,也有越来越多的互联网公司在生产环境中使用该工具了。 欢迎各位大佬进行试用并提出意见,任何建议、鼓励、批评都受到欢迎。 github: https://github.com/m358807551/mysqlsmom Alt text

简介:同步 Mysql 数据到 elasticsearch 的工具; QQ、微信:358807551

特点

  1. Python 编写;
  2. 支持基于 sql 语句的全量同步,基于 binlog 的增量同步,基于更新字段的增量同步三种同步方式;
  3. 全量更新只占用少量内存;支持通过sql语句同步数据;
  4. 增量更新自动断点续传;
  5. 取自 Mysql 的数据可经过一系列自定义函数的处理后再同步至 Elasticsearch
  6. 能用非常简单的配置完成复杂的同步任务;

环境

  • python2.7;
  • 增量同步需开启 redis
  • 分析 binlog 的增量同步需要 Mysql 开启 binlogbinlog-format=row);

快速开始

全量同步MySql数据到es

  1. clone 项目到本地;

  2. 安装依赖;

    cd mysqlsmom
    pip install -r requirements.txt

    默认支持 elasticsearch-2.4版本,支持其它版本请运行(将5.4换成需要的elasticsearch版本)

    pip install --upgrade elasticsearch==5.4
  3. 编辑 ./config/example_init.py,按注释提示修改配置;

    # coding=utf-8
    
    STREAM = "INIT"
    
    # 修改数据库连接
    CONNECTION = {
       'host': '127.0.0.1',
       'port': 3306,
       'user': 'root',
       'passwd': ''
    }
    
    # 修改elasticsearch节点
    NODES = [{"host": "127.0.0.1", "port": 9200}]
    
    TASKS = [
       {
           "stream": {
               "database": "test_db",  # 在此数据库执行sql语句
               "sql": "select * from person"  # 将该sql语句选中的数据同步到 elasticsearch
           },
           "jobs": [
               {
                   "actions": ["insert", "update"],
                   "pipeline": [
                       {"set_id": {"field": "id"}}  # 默认设置 id字段的值 为elasticsearch中的文档id
                   ],
                   "dest": {
                       "es": {
                           "action": "upsert",
                           "index": "test_index",   # 设置 index
                           "type": "test",          # 设置 type
                           "nodes": NODES
                       }
                   }
               }
           ]
       }
    ]
  4. 运行

    cd mysqlsmom
    python mysqlsmom.py ./config/example_init.py

    等待同步完成即可;

分析 binlog 的增量同步

  1. 确保要增量同步的MySql数据库开启binlog,且开启redis(为了存储最后一次读到的binlog文件名及读到的位置。未来可能支持本地文件存储该信息。)

  2. 下载项目到本地,且安装好依赖后,编辑 ./config/example_init.py,按注释提示修改配置;

    # coding=utf-8
    
    STREAM = "BINLOG"
    SERVER_ID = 99  # 确保每个用于binlog同步的配置文件的SERVER_ID不同;
    SLAVE_UUID = __name__
    
    # 配置开启binlog权限的MySql连接
    BINLOG_CONNECTION = {
       'host': '127.0.0.1',
       'port': 3306,
       'user': 'root',
       'passwd': ''
    }
    
    # 配置es节点
    NODES = [{"host": "127.0.0.1", "port": 9200}]
    
    TASKS = [
       {
           "stream": {
               "database": "test_db",  # [table]所在的数据库
               "table": "person"  # 监控该表的binlog
           },
           "jobs": [
               {
                   "actions": ["insert", "update"],
                   "pipeline": [
                       {"only_fields": {"fields": ["id", "name", "age"]}},  # 只同步这些字段到es,注释掉该行则同步全部字段的值到es
                       {"set_id": {"field": "id"}}  # 设置es中文档_id的值取自 id(或根据需要更改)字段
                   ],
                   "dest": {
                       "es": {
                           "action": "upsert",
                           "index": "test_index",  # 设置 index
                           "type": "test",         # 设置 type
                           "nodes": NODES
                       }
                   }
               }
           ]
       }
    ]
  3. 运行

    cd mysqlsmom
    python mysqlsmom.py ./config/example_binlog.py

    该进程会一直运行,实时同步新增和更改后的数据到elasticsearch;

    注意:第一次运行该进程时不会同步MySql中已存在的数据,从第二次运行开始,将接着上次同步停止时的位置继续同步;

    同步旧数据请看全量同步MySql数据到es

基于更新时间的增量同步

Mysql 表中有类似 update_time 的时间字段,且在每次插入、更新数据后将该字段的值设置为操作时间,则可在不用开启 binlog 的情况下进行增量同步。

  1. 下载项目到本地,且安装好依赖后,编辑 ./config/example_cron.py,按注释提示修改配置;

    # coding=utf-8
    
    STREAM = "CRON"
    
    # 修改数据库连接
    CONNECTION = {
       'host': '127.0.0.1',
       'port': 3306,
       'user': 'root',
       'passwd': ''
    }
    
    # redis存储上次同步时间等信息
    REDIS = {
       "host": "127.0.0.1",
       "port": 6379,
       "db": 0,
       "password": "password",  # 不需要密码则注释或删掉该行
    }
    
    # 一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1
    BULK_SIZE = 1
    
    # 修改elasticsearch节点
    NODES = [{"host": "127.0.0.1", "port": 9200}]
    
    TASKS = [
       {
           "stream": {
               "database": "test_db",  # 在此数据库执行sql语句
               "sql": "select id, name from person where update_time >= ?",  # 将该sql语句选中的数据同步到 elasticsearch
               "seconds": 10,  # 每隔 seconds 秒同步一次,
               "init_time": "2018-08-15 18:05:47"  # 只有第一次同步会加载
           },
           "jobs": [
               {
                   "pipeline": [
                       {"set_id": {"field": "id"}}  # 默认设置 id字段的值 为 es 中的文档id
                   ],
                   "dest": {
                       "es": {
                           "action": "upsert",
                           "index": "test_index",   # 设置 index
                           "type": "test"          # 设置 type
                       }
                   }
               }
           ]
       }
    ]
  2. 运行

    cd mysqlsmom
    python mysqlsmom.py ./config/example_cron.py

组织架构

Alt text

Mysqlsmom 使用实战

Mysqlsmom 的灵活性依赖于:

  • row_handlers.py 中添加自定义函数对取自Mysql的数据进行二次加工。
  • row_filters.py 中添加自定义函数决定是否要同步某一条数据。
  • config/ 目录下的任意配置文件应用上面的函数。

如果不了解 Python 也没关系,上述两个文件中自带的函数足以应付大多数种情况,遇到特殊的同步需求可以在 Github 发起 issue 或通过微信、QQ联系作者。

同步多张表

在一个配置文件中即可完成:

...
TASKS = [
    # 同步表1
    {
        "stream": {
            "database": "数据库名1",
            "table": "表名1"
        },
        "jobs": [...]
    }
    # 同步表2
    {
        "stream": {
            "database": "数据库名2",
            "table": "表名2"
        },
        "jobs": [...]
    }
]

一个 Mysql Connection 对应一个配置文件。

一张表同步到多个索引

分为两种情况。

一种是把相同的数据同步到不同的索引,配置如下:

...
TASKS = [
    {
        "stream": {...},
        "jobs": [
            {
                "actions": [...],
                "pipeline": [...],
                "dest": [
                    # 同步到索引1
                    {
                        "es": {"action": "upsert", "index": "索引1", "type": "类型1", "nodes": NODES},
                    },
                    # 同步到索引2
                    {
                        "es": {"action": "upsert", "index": "索引2", "type": "类型2", "nodes": NODES},
                    }
                ]
            }
        ]
    },
    ...
]

另一种是把同一个表产生的数据经过不同的 pipeline 同步到不同的索引:

...
TASKS = [
    {
        "stream": {...},
        "jobs": [
            {
                "actions": {...},
                "pipeline": [...],  # 对数据经过一系列处理
                "dest": {"es": {"index": "索引1", ...}}  # 同步到索引1
            },
            {
                "actions": {...},
                "pipeline": [...],  # 与上面的pipeline不同
                "dest": {"es": {"index": "索引2", ...}}  # 同步到索引2
            }
        ]
    }
]
  • TASKS 中的每一项对应一张要同步的表。
  • jobs 中的每一项对应对一条记录的一种处理方式。
  • dest 中的每一项对应一个es索引类型。

只同步某些字段

对每条来自 Mysql 的 记录的处理都在 pipeline 中进行处理。

"pipeline": [
    {"only_fields": {"fields": ["id", "name"]}},  # 只同步 id 和 name字段
    {"set_id": {"field": "id"}}  # 然后设置 id 字段为es中文档的_id
]

字段重命名

对于 Mysql 中的字段名和 elasticsearch 中的域名不一致的情况:

"pipeline": [
    # 将name重命名为name1,age 重命名为age1
    {"replace_fields": {"name": ["name1"], "age": ["age1"]}},
    {"set_id": {"field": "id"}}
]

pipeline 会依次执行处理函数,上面的例子等价于:

"pipeline": [
    # 先重命名 name 为 name1
    {"replace_fields": {"name": ["name1"]}},
    # 再重命名 age 为 age1
    {"replace_fields": {"age": ["age1"]}},
    {"set_id": {"field": "id"}}
]

还有一种特殊情形,es 中两个字段存相同的数据,但是分词方式不同。

例如 name_default 的分析器为 defaultname_raw 设置为不分词,需要将 name 的值同时同步到这两个域:

"pipeline": [
    {"replace_fields": {"name": ["name_default", "name_raw"]}},
    {"set_id": {"field": "id"}}
]

当然上述问题有一个更好的解决方案,在 esmappings 中配置 name 字段的 fields 属性即可,这超出了本文档的内容。

切分字符串为数组

有时 Mysql 存储字符串类似:"aaa|bbb|ccc",希望转化成数组: ["aaa", "bbb", "ccc"] 再进行同步

"pipeline": [
    # tags 存储类似"aaa|bbb|ccc"的字符串,将 tags 字段的值按符号 `|` 切分成数组
    {"split": {"field": "tags", "flag": "|"}},
    {"set_id": {"field": "id"}}
] 

同步删除文档

只有 binlog 同步 能实现删除 elasticsearch 中的文档,配置如下:

TASKS = [
    {
        "stream": {
            "database": "test_db",
            "table": "person"
        },
        "jobs": [
            # 插入、更新
            {
                "actions": ["insert", "update"],
                "pipeline": [
                    {"set_id": {"field": "id"}}  # 设置 id 字段的值为 es 中文档 _id
                ],
                "dest": {
                    "es": {
                        "action": "upsert",
                        ...
                    }
                }
            },
            # 重点在这里,配置删除
            {
                "actions": ["delete"],  # 当读取到 binlog 中该表的删除操作时
                "pipeline": [{"set_id": {"field": "id"}}],  # 要删除的文档 _id
                "dest": {
                    "es": {
                        "action": "delete",  # 在 es 中执行删除操作
                        ...  # 与上面的 index 和 type 相同
                    }
                }
            }
        ]
    },
    ...
]

更多示例正在更新

常见问题

为什么我的增量同步不及时?

  1. 连接本地数据库增量同步不及时

    该情况暂未收到过反馈,如能复现请联系作者。

  2. 连接线上数据库发现增量同步不及时

    2.1 推荐使用内网IP连接数据库。连接线上数据库(如开启在阿里、腾讯服务器上的Mysql)时,推荐使用内网IP地址,因为外网IP会受到带宽等限制导致获取binlog数据速度受限,最终可能造成同步延时。

待改进

  1. 据部分用户反馈,全量同步百万级以上的数据性能不佳。

未完待续

文档近期会较频繁更新,任何问题、建议都收到欢迎,请在issues留言,会在24小时内回复;或联系QQ、微信: 358807551;

mysql协议解析扩展

Beatsggg 发表了文章 • 5 个评论 • 3046 次浏览 • 2018-05-08 15:40 • 来自相关话题

elastic/beats项目中支持mysql协议的解析,但实际使用过程中发现不支持预编译和压缩通信协议的解析,所以扩展了预编译SQL和压缩通信协议的支持,目前已稳定运行在生产环境,所有SQL都能完美解析,已提交PR,有相同需求的同学可参考。
elastic/beats项目中支持mysql协议的解析,但实际使用过程中发现不支持预编译和压缩通信协议的解析,所以扩展了预编译SQL和压缩通信协议的支持,目前已稳定运行在生产环境,所有SQL都能完美解析,已提交PR,有相同需求的同学可参考。

一个把数据从MySQL同步到Elasticsearch的工具

Elasticsearchwindfarer 发表了文章 • 2 个评论 • 9674 次浏览 • 2016-01-13 16:34 • 来自相关话题

https://github.com/zhongbiaode ... -sync 这个工具用python实现,主要使用了mysqldump输出xml进行初次同步,以及binlog进行增量同步,欢迎试用以及提出修改意见。 最近刚刚更新了中文文档。
https://github.com/zhongbiaode ... -sync 这个工具用python实现,主要使用了mysqldump输出xml进行初次同步,以及binlog进行增量同步,欢迎试用以及提出修改意见。 最近刚刚更新了中文文档。