如同磁铁吸引四周的铁粉,热情也能吸引周围的人,改变周围的情况。

ES重索引(reindex)时如何不停止写入服务(业务存在少量物理删除文档操作)?

Elasticsearch | 作者 the_best | 发布于2018年06月02日 | 阅读数:15730

当索引比较大,reindex操作是比较耗时的,如果不停止写入服务,这个过程会有热数据(包括文档的删除)流入老索引中,ES的索引别名在一定程度上实现零停机。但是当存在物理删除操作时(reindex无法识别已删除的文档),由于业务上实时性要求较高,如何保证重索引过程中在不停止写服务的情况下,保证新老索引间数据的一致性?由于我们是使用数据库的日志(如binlog、oplog)来同步数据写入到ES,除了重置日志的消费位点,重放一遍reindex期间的文档操作(但这样还是一定程度上会影响实时性),不知有没有更好的方法?望不吝赐教~
已邀请:

kennywu76 - Wood

赞同来自: the_best

ES的reindex在索引有实时的update/delete的情况下,即使借助alias,也没有办法实现真正的zero down time。  增加新文档比较好办,通过alias切换写入到新索引,同时reindex做旧->新索引的数据传输即可。 但是update/delete操作针对的文档如果还未从旧索引传输过来,直接对新索引操作会导致两个索引数据不一致。
 
我能够想到的(一个未经实际验证)的方案,前提是数据库里的文档有一个类似last_update_time字段记录文档最后更新的时间,用作写入ES文档的版本号。 然后数据写入新索引的时候,url里带上下面这样的参数:
version_type=external_gt&version=xxxxxx
其中version_type=external_gt表示写入文档的版本号大于已有的文档版本号,或者文档不存在,写入才会成功, 否则会抛版本冲突的异常。   另外delete操作都要转换成index操作,index的内容可以是一个空文档。
 
这样实时数据写入新索引和reindex可以同时进行。 实时写入的数据应该具有更高的版本,总是能够成功。  reindex如果遇到版本冲突,说明该文档被实时部分更新过了,已经过时,可以直接放弃跳过。 
 
该方案的缺陷:
1. 要求数据源里的数据具有版本信息,可能因为各种局限,不太容易更改。
2. delete操作必须转化为写入一个空文档。 delete实际上是一个标记文档,并且本身也有版本信息。但是如果后端发生了segment merge,  delete可能会被合并以后物理清除。 这样delete和对应的版本信息丢失,之后reindex如果写入了旧版本的文档,仍然会有一致性问题。  但是空文档会增加索引文件的大小,有额外的消耗。  一个可能的缓解办法是在reindex全部做完以后,再做一次空文档的删除。

laoyang360 - 《一本书讲透Elasticsearch》作者,Elastic认证工程师 [死磕Elasitcsearch]知识星球地址:http://t.cn/RmwM3N9;微信公众号:铭毅天下; 博客:https://elastic.blog.csdn.net

赞同来自:

1.ES是非实时的。Elasticsearch中,Index的实时性是由refresh控制的,默认是1s,最快可到100ms,那么也就意味着Index doc成功后,需要等待一秒钟后才可以被搜索到。
2.reindex是耗费性能的。借助:scroll+bulk实现。
优化建议:
 
{
"source": {
"index": "foo",
"size": 5000 <---- batch size 这里在机器资源允许的条件下,搞大点
},
"dest": {
"index": "bar"
}
}

 
参考讨论:https://stackoverflow.com/ques ... earch
https://discuss.elastic.co/t/r ... 52949

hw_cloudsearch - 云Elasticsearch:https://www.huaweicloud.com/product/es.html

赞同来自:

你们从binlog同步数据到ES是怎么同步的?为什么业务删除数据,不能增量同步到ES?

typuc - 80后IT男,乒乓球爱好者

赞同来自:

配置别名试试。新老索引配置相同别名,业务第一次上线,写入新索引,然后通过别名查询;然后导出旧索引到新索引;移除老索引别名。 这样可能在导入数据期间查询返回相同的2条数据。

HelloClyde

赞同来自:

使用kafka做增量就可以了
 
输入源 --> kafka --> es
 
1.reindex前先记录kafka偏移
2.reindex v1索引到v2
3.别名切到v2
4.reindex完kafka seek到reindex前记录的offset
5.等待增量同步完成

hufuman

赞同来自:

别名绑定在新老两个索引上,新数据和更新都写到新索引上

fanmo3yuan

赞同来自:

有个方案,需要两个前提:
1. 业务支持双写
2. 删除操作转为index/update操作,逻辑删除或者是空doc
 
先对新老集群进行双写,然后使用reindex同步数据,reindex需要配置op_type:create,这样如果新集群已经有了某个doc,则不使用老集群中的数据覆盖,由于delete变为了update,那么doc本身还是存在的,即使增量操作中删除了某个doc,reindex也不会使用老集群中的doc覆盖。
 
在集群迁移完成后,可以统一处理delete的文档

要回复问题请先登录注册