你可以的,加油

elasticsearch 已死,但是 subsys 被锁

rochy 回复了问题 • 3 人关注 • 1 个回复 • 2246 次浏览 • 2018-08-14 20:32 • 来自相关话题

QueryString的问题

rochy 回复了问题 • 4 人关注 • 3 个回复 • 2612 次浏览 • 2018-08-14 20:28 • 来自相关话题

elasticsearch 尽可能少的遍历index得到查询结果

rochy 回复了问题 • 3 人关注 • 2 个回复 • 1469 次浏览 • 2018-08-14 20:23 • 来自相关话题

bulk传输的数据多少合适?目前es里有很多bulk,每个执行几百毫秒左右,大量的bulk占用了线程池。

rochy 回复了问题 • 4 人关注 • 2 个回复 • 6551 次浏览 • 2018-08-14 20:22 • 来自相关话题

推荐一个同步Mysql数据到Elasticsearch的工具

MCTW 发表了文章 • 13 个评论 • 29765 次浏览 • 2018-08-14 15:47 • 来自相关话题

把Mysql的数据同步到Elasticsearch是个很常见的需求,但在Github里找到的同步工具用起来或多或少都有些别扭。
例如:某记录内容为"aaa|bbb|ccc",将其按|分割成数组同步到es,这样的简单任务都难以实现,再加上配置繁琐,文档语焉不详...
所以我写了个同步工具MysqlsMom:力求用最简单的配置完成复杂的同步任务。目前除了我所在的部门,也有越来越多的互联网公司在生产环境中使用该工具了。
欢迎各位大佬进行试用并提出意见,任何建议、鼓励、批评都受到欢迎。
github: https://github.com/m358807551/mysqlsmom
![Alt text](https://github.com/m358807551/ ... 3Dtrue)

简介:同步 Mysql 数据到 elasticsearch 的工具;
QQ、微信:358807551

特点


  1. Python 编写;
  2. 支持基于 sql 语句的全量同步,基于 binlog 的增量同步,基于更新字段的增量同步三种同步方式;
  3. 全量更新只占用少量内存;支持通过sql语句同步数据;
  4. 增量更新自动断点续传;
  5. 取自 Mysql 的数据可经过一系列自定义函数的处理后再同步至 Elasticsearch
  6. 能用非常简单的配置完成复杂的同步任务;

    环境


    • python2.7;
    • 增量同步需开启 redis
    • 分析 binlog 的增量同步需要 Mysql 开启 binlogbinlog-format=row);

      快速开始


      全量同步MySql数据到es


  7. clone 项目到本地;

  8. 安装依赖;

    <br /> cd mysqlsmom<br /> pip install -r requirements.txt<br />

    默认支持 elasticsearch-2.4版本,支持其它版本请运行(将5.4换成需要的elasticsearch版本)

    <br /> pip install --upgrade elasticsearch==5.4<br />

  9. 编辑 ./config/example_init.py,按注释提示修改配置;

    ```python

    coding=utf-8


    STREAM = "INIT"

    修改数据库连接

    CONNECTION = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': ''
    }

    修改elasticsearch节点

    NODES = [{"host": "127.0.0.1", "port": 9200}]

    TASKS = [
    {
    "stream": {
    "database": "test_db", # 在此数据库执行sql语句
    "sql": "select * from person" # 将该sql语句选中的数据同步到 elasticsearch
    },
    "jobs": [
    {
    "actions": ["insert", "update"],
    "pipeline": [
    {"set_id": {"field": "id"}} # 默认设置 id字段的值 为elasticsearch中的文档id
    ],
    "dest": {
    "es": {
    "action": "upsert",
    "index": "test_index", # 设置 index
    "type": "test", # 设置 type
    "nodes": NODES
    }
    }
    }
    ]
    }
    ]
    ```

  10. 运行

    <br /> cd mysqlsmom<br /> python mysqlsmom.py ./config/example_init.py<br />

    等待同步完成即可;

    分析 binlog 的增量同步


  11. 确保要增量同步的MySql数据库开启binlog,且开启redis(为了存储最后一次读到的binlog文件名及读到的位置。未来可能支持本地文件存储该信息。)

  12. 下载项目到本地,且安装好依赖后,编辑 ./config/example_init.py,按注释提示修改配置;

    ```python

    coding=utf-8


    STREAM = "BINLOG"
    SERVER_ID = 99 # 确保每个用于binlog同步的配置文件的SERVER_ID不同;
    SLAVE_UUID = name

    配置开启binlog权限的MySql连接

    BINLOG_CONNECTION = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': ''
    }

    配置es节点

    NODES = [{"host": "127.0.0.1", "port": 9200}]

    TASKS = [
    {
    "stream": {
    "database": "test_db", # [table]所在的数据库
    "table": "person" # 监控该表的binlog
    },
    "jobs": [
    {
    "actions": ["insert", "update"],
    "pipeline": [
    {"only_fields": {"fields": ["id", "name", "age"]}}, # 只同步这些字段到es,注释掉该行则同步全部字段的值到es
    {"set_id": {"field": "id"}} # 设置es中文档_id的值取自 id(或根据需要更改)字段
    ],
    "dest": {
    "es": {
    "action": "upsert",
    "index": "test_index", # 设置 index
    "type": "test", # 设置 type
    "nodes": NODES
    }
    }
    }
    ]
    }
    ]
    ```

  13. 运行

    shell<br /> cd mysqlsmom<br /> python mysqlsmom.py ./config/example_binlog.py<br />

    该进程会一直运行,实时同步新增和更改后的数据到elasticsearch;

    注意:第一次运行该进程时不会同步MySql中已存在的数据,从第二次运行开始,将接着上次同步停止时的位置继续同步;

    同步旧数据请看全量同步MySql数据到es

    基于更新时间的增量同步


    Mysql 表中有类似 update_time 的时间字段,且在每次插入、更新数据后将该字段的值设置为操作时间,则可在不用开启 binlog 的情况下进行增量同步。

  14. 下载项目到本地,且安装好依赖后,编辑 ./config/example_cron.py,按注释提示修改配置;

    ```python

    coding=utf-8


    STREAM = "CRON"

    修改数据库连接

    CONNECTION = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': ''
    }

    redis存储上次同步时间等信息

    REDIS = {
    "host": "127.0.0.1",
    "port": 6379,
    "db": 0,
    "password": "password", # 不需要密码则注释或删掉该行
    }

    一次同步 BULK_SIZE 条数据到elasticsearch,不设置该配置项默认为1

    BULK_SIZE = 1

    修改elasticsearch节点

    NODES = [{"host": "127.0.0.1", "port": 9200}]

    TASKS = [
    {
    "stream": {
    "database": "test_db", # 在此数据库执行sql语句
    "sql": "select id, name from person where update_time >= ?", # 将该sql语句选中的数据同步到 elasticsearch
    "seconds": 10, # 每隔 seconds 秒同步一次,
    "init_time": "2018-08-15 18:05:47" # 只有第一次同步会加载
    },
    "jobs": [
    {
    "pipeline": [
    {"set_id": {"field": "id"}} # 默认设置 id字段的值 为 es 中的文档id
    ],
    "dest": {
    "es": {
    "action": "upsert",
    "index": "test_index", # 设置 index
    "type": "test" # 设置 type
    }
    }
    }
    ]
    }
    ]
    ```

  15. 运行

    shell<br /> cd mysqlsmom<br /> python mysqlsmom.py ./config/example_cron.py<br />

    组织架构

    ![Alt text](https://github.com/m358807551/ ... 3Dtrue)

    Mysqlsmom 使用实战


    Mysqlsmom 的灵活性依赖于:

    • row_handlers.py 中添加自定义函数对取自Mysql的数据进行二次加工。
    • row_filters.py 中添加自定义函数决定是否要同步某一条数据。
    • config/ 目录下的任意配置文件应用上面的函数。

      如果不了解 Python 也没关系,上述两个文件中自带的函数足以应付大多数种情况,遇到特殊的同步需求可以在 Github 发起 issue 或通过微信、QQ联系作者。

      同步多张表


      在一个配置文件中即可完成:

      ```python
      ...
      TASKS = [

      同步表1

      {
      "stream": {
      "database": "数据库名1",
      "table": "表名1"
      },
      "jobs": [...]
      }

      同步表2

      {
      "stream": {
      "database": "数据库名2",
      "table": "表名2"
      },
      "jobs": [...]
      }
      ]
      ```

      一个 Mysql Connection 对应一个配置文件。

      一张表同步到多个索引


      分为两种情况。

      一种是把相同的数据同步到不同的索引,配置如下:

      ```python
      ...
      TASKS = [
      {
      "stream": {...},
      "jobs": [
      {
      "actions": [...],
      "pipeline": [...],
      "dest": [

      同步到索引1

              {<br />
                  "es": {"action": "upsert", "index": "索引1", "type": "类型1", "nodes": NODES},<br />
              },<br />
              # 同步到索引2<br />
              {<br />
                  "es": {"action": "upsert", "index": "索引2", "type": "类型2", "nodes": NODES},<br />
              }<br />
          ]<br />
      }<br />

      ]
      },
      ...
      ]
      <br /> <br /> 另一种是把同一个表产生的数据经过不同的 *pipeline* 同步到不同的索引:<br /> <br /> python
      ...
      TASKS = [
      {
      "stream": {...},
      "jobs": [
      {
      "actions": {...},
      "pipeline": [...], # 对数据经过一系列处理
      "dest": {"es": {"index": "索引1", ...}} # 同步到索引1
      },
      {
      "actions": {...},
      "pipeline": [...], # 与上面的pipeline不同
      "dest": {"es": {"index": "索引2", ...}} # 同步到索引2
      }
      ]
      }
      ]
      ```

    • TASKS 中的每一项对应一张要同步的表。
    • jobs 中的每一项对应对一条记录的一种处理方式。
    • dest 中的每一项对应一个es索引类型。

      只同步某些字段


      对每条来自 Mysql 的 记录的处理都在 pipeline 中进行处理。

      python<br /> "pipeline": [<br /> {"only_fields": {"fields": ["id", "name"]}}, # 只同步 id 和 name字段<br /> {"set_id": {"field": "id"}} # 然后设置 id 字段为es中文档的_id<br /> ]<br />

      字段重命名


      对于 Mysql 中的字段名和 elasticsearch 中的域名不一致的情况:

      ```python
      "pipeline": [

      将name重命名为name1,age 重命名为age1

      {"replace_fields": {"name": ["name1"], "age": ["age1"]}},
      {"set_id": {"field": "id"}}
      ]
      <br /> <br /> *pipeline* 会依次执行处理函数,上面的例子等价于:<br /> <br /> python
      "pipeline": [

      先重命名 name 为 name1

      {"replace_fields": {"name": ["name1"]}},

      再重命名 age 为 age1

      {"replace_fields": {"age": ["age1"]}},
      {"set_id": {"field": "id"}}
      ]
      <br /> <br /> 还有一种特殊情形,es 中两个字段存相同的数据,但是分词方式不同。<br /> <br /> 例如 *name_default* 的分析器为 *default*,*name_raw* 设置为不分词,需要将 *name* 的值同时同步到这两个域:<br /> <br /> python
      "pipeline": [
      {"replace_fields": {"name": ["name_default", "name_raw"]}},
      {"set_id": {"field": "id"}}
      ]
      ```

      当然上述问题有一个更好的解决方案,在 esmappings 中配置 name 字段的 fields 属性即可,这超出了本文档的内容。

      切分字符串为数组


      有时 Mysql 存储字符串类似:"aaa|bbb|ccc",希望转化成数组: ["aaa", "bbb", "ccc"] 再进行同步

      ```python
      "pipeline": [

      tags 存储类似"aaa|bbb|ccc"的字符串,将 tags 字段的值按符号 | 切分成数组

      {"split": {"field": "tags", "flag": "|"}},
      {"set_id": {"field": "id"}}
      ]
      ```

      同步删除文档


      只有 binlog 同步 能实现删除 elasticsearch 中的文档,配置如下:

      ```python
      TASKS = [
      {
      "stream": {
      "database": "test_db",
      "table": "person"
      },
      "jobs": [

      插入、更新

      {<br />
          "actions": ["insert", "update"],<br />
          "pipeline": [<br />
              {"set_id": {"field": "id"}}  # 设置 id 字段的值为 es 中文档 _id<br />
          ],<br />
          "dest": {<br />
              "es": {<br />
                  "action": "upsert",<br />
                  ...<br />
              }<br />
          }<br />
      },<br />
      # 重点在这里,配置删除<br />
      {<br />
          "actions": ["delete"],  # 当读取到 binlog 中该表的删除操作时<br />
          "pipeline": [{"set_id": {"field": "id"}}],  # 要删除的文档 _id<br />
          "dest": {<br />
              "es": {<br />
                  "action": "delete",  # 在 es 中执行删除操作<br />
                  ...  # 与上面的 index 和 type 相同<br />
              }<br />
          }<br />
      }<br />

      ]
      },
      ...
      ]
      ```



      更多示例正在更新


      常见问题


      为什么我的增量同步不及时?


  16. 连接本地数据库增量同步不及时

    该情况暂未收到过反馈,如能复现请联系作者。

  17. 连接线上数据库发现增量同步不及时

    2.1 推荐使用内网IP连接数据库。连接线上数据库(如开启在阿里、腾讯服务器上的Mysql)时,推荐使用内网IP地址,因为外网IP会受到带宽等限制导致获取binlog数据速度受限,最终可能造成同步延时。

    待改进


  18. 据部分用户反馈,全量同步百万级以上的数据性能不佳。

    未完待续


    文档近期会较频繁更新,任何问题、建议都收到欢迎,请在issues留言,会在24小时内回复;或联系QQ、微信: 358807551;

elasticsearch 已死,但是 subsys 被锁

wajika 回复了问题 • 2 人关注 • 3 个回复 • 3319 次浏览 • 2018-08-14 14:04 • 来自相关话题

elastic 6.3 源码 导入 eclipse 没有_core. 怎么运行?

God_lockin 回复了问题 • 2 人关注 • 1 个回复 • 4259 次浏览 • 2018-08-14 10:44 • 来自相关话题

failed to execute bulk item

rochy 回复了问题 • 2 人关注 • 1 个回复 • 6651 次浏览 • 2018-08-14 09:18 • 来自相关话题

elasticsearch 之 LockObtainFailedException

rochy 回复了问题 • 2 人关注 • 1 个回复 • 1609 次浏览 • 2018-08-14 08:47 • 来自相关话题

es集群删除某个索引,会导致relocate吗

laoyang360 回复了问题 • 4 人关注 • 1 个回复 • 1021 次浏览 • 2018-08-13 23:19 • 来自相关话题

Mmap fs可能让大索引访问变得缓慢

kennywu76 发表了文章 • 2 个评论 • 6606 次浏览 • 2018-08-13 17:52 • 来自相关话题

在一年多以前,我写过Elasticsearch 5 入坑指南一文,其中提到将生产的某个ES集群从2.4升级到5.0以后, 冷数据结点搜索性能变差,对大索引进行搜索的时候,io read会长时间飙高,导致系统load很重,甚至到无法响应的程度。
 
通过进一步分析,用Linux下的Sar -B命令,可以看到有大量的数据被pagein到内存。 虽然通过“试”的方法,定位到这个问题和5.0开始使用的mmap fs有关联,并且通过更改为nio fs以后得到解决,但问题的底层根源一直没找到。 
 
近期有空重新去看了一下这个问题, 在Github上发现一个对os底层更熟悉的人提交并分析了类似的问题 Avoid file cache trashing on Linux with mmapfs by using madvise 。  细读之后,感觉该文抓到了问题的本质,以下基于该文做个总结:
  1. mmap fs对比nio fs,省去了磁盘io上的系统调用,并且不需要在jvm内部做io缓存,也减轻了GC压力。 所以通常来说,mmapfs的性能应该更高。 这也是为什么lucene推荐使用mmap fs,并且ES从5.0开始做为默认的store配置的原因。
  2. 然而,mmap系统调用,在内核层面默认会有一个2MB的预读大小设置,也就是说,当映射了一个大文件以后,即使读取其中1k个字节,mmap也会预读取2MB的数据到缓存。 这种策略是基于文件的访问大多数是顺序的假设。
  3. 在ES这个特定的应用场景,如果某数据结点上索引不是很大,系统剩余缓存也足够,一般不会有问题。但是如果是大数据应用场景,典型的如海量的日志ELK应用,则可能对大索引的搜索聚合,产生较多的随机磁盘访问。 从而mmap的预读策略,可能会导致大量的无用数据从磁盘读取到系统缓存。 在系统可用的缓存不是非常宽裕的情况下,某些极端场景下,会导致热数据被过于频繁的踢出内存,再反复读入,让磁盘IO不堪重负。
  4. Lucene有一个NativePosixUtil.madvise(buffer,NativePosixUtil.RANDOM)的native调用,可以用于指导内核对mmap过的文件做读取的时候,禁用预读。 上文作者将该调用hack进lucene代码,做搜索对比测试。 结论是对于磁盘io和cache的消耗,niofs都要好于mmapfs,而patch过的mmapfs则比niofs更好。
  5. 作者的测试仅限于搜索,对于其他类型的io操作,如写入,merge没有做过详尽测试,因此不清楚利弊。
  6. ES官方开发人员认为这是一个有趣的发现,值得深入去探究。对于用户报告的mmap fs性能比nio fs更差的问题,猜测可能是在大索引读取的场景下,预读带来的额外开销,抵消了相对niofs节省的系统调用等开销。 
  7. ES官方提到Lucene已经有一种类似功能的store,叫做NativeUnixDirectory(显然ES目前还没有对这种store的支持),用户动手能力强的话,应该可以利用这个store自己写一个ES plugin。 另外提到JAVA 10提供了O_DIRECT to streams / channels ,似乎官方打算等这个出来以后再看怎么处理这个问题。
  8. 要注意,这个预读是mmap层面的,和块设备的预读是两回事。 我们曾经尝试过使用 blockdev --setra 这个linux命令取消块设备预读,证实无法解决这个问题。

 
结论: 如果ES结点上会存放海量的索引数据,经常会有大索引(如1TB+)的搜索聚合操作,使用NIOFS会更安全,可以避免很多怪异的性能问题。
 

ElasticSearch如何开发自己的Tokenzer 和TokenizerFilter

novia 回复了问题 • 6 人关注 • 4 个回复 • 1790 次浏览 • 2018-08-13 17:37 • 来自相关话题

怎么删除包含中文字符的索引,如图:

rochy 回复了问题 • 3 人关注 • 2 个回复 • 2677 次浏览 • 2018-08-13 17:13 • 来自相关话题

kibana自动建了好多索引会自动删除吗?

18811051022zj 回复了问题 • 5 人关注 • 3 个回复 • 8629 次浏览 • 2018-08-13 13:54 • 来自相关话题

ES6.3.0英文分词问题

aimerwhy 回复了问题 • 5 人关注 • 4 个回复 • 4546 次浏览 • 2018-08-13 09:47 • 来自相关话题