bulk update 重复的文档id 导致更新性能下降?

ES 5.4 bulk update 5000数据,这批数据里面很多更新是重复的文档id,导致性能下降(不停创建segment ),请问有什么办法解决这个问题?
比如我的数据格式是
{id:1,name:aaa}
{id:1,name:bbb}
{id:1,name:ccc}
{id:2,name:aaa}
{id:2,name:bbb}
{id:2,name:ccc}
已邀请:

kennywu76 - wood@Ctrip

赞同来自: famoss Cheetah 240475588 ningbohezhijun

@240475588 终于找到答案了。
 
update操作是分为两个步骤进行,即先根据文档ID做一次GET,得到旧版本的文档,然后在其基础上做更新再写回去,问题就出在这个GET上面。
 
在 core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java 这个类里面,get函数会根据一个realtime参数(默认是true),决定如何拿到文档。 
public GetResult get(Get get, Function<String, Searcher> searcherFactory, LongConsumer onRefresh) throws EngineException {
assert Objects.equals(get.uid().field(), uidField) : get.uid().field();
try (ReleasableLock lock = readLock.acquire()) {
ensureOpen();
if (get.realtime()) {
VersionValue versionValue = versionMap.getUnderLock(get.uid());
if (versionValue != null) {
if (versionValue.isDelete()) {
return GetResult.NOT_EXISTS;
}
if (get.versionType().isVersionConflictForReads(versionValue.getVersion(), get.version())) {
throw new VersionConflictEngineException(shardId, get.type(), get.id(),
get.versionType().explainConflictForReads(versionValue.getVersion(), get.version()));
}
long time = System.nanoTime();
refresh("realtime_get");
onRefresh.accept(System.nanoTime() - time);
}
}

// no version, get the version from the index, we know that we refresh on flush
return getFromSearcher(get, searcherFactory);
}

 
可以看到realtime参数决定了GET到的数据的实施性。 如果设置为false,就从searcher里面拿,而searcher只能访问refresh过的数据,即刚写入的数据存在于index writter buffer里,暂时无法搜索到,所以这种方式拿到的数据是准实时的。 而默认的realtime:true,则决定了获取到的数据需要是实时的,也就是说需要也能够访问到writter buffer里的数据。  所以,中间有一个 refresh("realtime_get"); 的函数调用,这个函数调用会检查,GET的doc id是否都是可以被搜索到。 如果已经写入了但无法搜索,也就是写入到writter buffer里这种情况,就会强制执行一次refresh操作,让数据可以被searcher检索到,保证之后从getFromSearcher调用拿的是完全实时的数据。
 
实际上测试下来,也是这样,关闭自动刷新,写入一条文档,然后对该文档ID执行一个GET操作,就会看到有一个新的segment生成。 默认的realtime GET 强制刷新后,生成了新的segment file。
 
查了下文档,GET api调用时,url里可以带可选参数realtime=false,关闭实时功能: https://www.elastic.co/guide/e ... ltime
 
然而,不幸的是,update API的文档和源码都没有提供一个“禁用”实时性的参数。 update对GET的调用,传入的realtime是写死为true的。
 
至于为什么update一定需要实时GET,想了一下,是因为update允许对文档做部分字段更新。如果有2个请求分别更新了不同的字段, 可能先更新的数据只在writter buffer里,searcher里看不到,那后面的更新还是在老版本文档上做的,造成部分更新丢失。 
 
另外一个问题,为啥之前的版本不是这种行为?  看了下2.4的GET方法源码,其没有采用refresh的方式让数据实时,而是直接访问的translog来保证GET的实时性。官方在这个变更里 https://github.com/elastic/ela ... 20102   将其更新方式改为了refresh。理由是之前ES里有很多地方用translog维护数据的位置,使得很多操作变得很慢,去掉对translog的依赖可以提高性能。
 
但是很不幸,这个更改,对于短时间反复大量更新相同doc id的操作,会因为过于频繁的refresh短时间生成很多小segment,继而不断做短合产生性能损耗。 从上面链接里的讨论看,官方认为,在提升大多数应用场景性能情况下,对于这种较少见的场景下的性能损失是值得的,应该在应用层面解决。
 
因此,对于你的特定问题,要么优化应用数据架构,在应用层面合并相同doc id的更新,要么只能使用ES 2.x了。
 
 

kennywu76 - wood@Ctrip

赞同来自: huzhoahui168

bulk更新文档用的什么方法?   index/create/update? 

kennywu76 - wood@Ctrip

赞同来自: 240475588

我用python测试了一下,能够复现你提到的问题。 测试过程中,观察到一些现象可能可以说明为什么这种测试方法会导致更新比较慢。
 
测试步骤:
1.  设置一个空的索引,设置一个shard,关闭refresh, 即设置refresh_interval=-1
PUT test
{
"settings": {
"index.number_of_shards": 1,
"refresh_interval": "-1"
},
"mappings": {
"en": {
"properties": {
"content": {
"type": "keyword"
}
}
}
}
}

2. 用python脚本写入了1万条测试数据, refresh过后,看到生成了一个包含1万条数据的segment:
xgwu@Kennys-Mac:~|⇒  python index.py
Performed 10000 actions
index shard prirep ip        segment generation docs.count docs.deleted    size size.memory committed searchable version compound
test 0 p 127.0.0.1 _0 0 10000 0 133.4kb 3086 false true 6.5.0 true
3. 用如下python脚本,循环10次,每次更新1000条同样的数据:
  1 # coding=utf-8
2 from elasticsearch import Elasticsearch
3 from elasticsearch.helpers import bulk
4 import time
5 def bulk_update(es, index_name="test", doc_type_name="en"):
6 for i in range(10):
7 ACTIONS=
8 for j in range(1000):
9 action = {
10 "_index": index_name,
11 "_type": doc_type_name,
12 "_id": j,
13 "_op_type": "update",
14 "detect_noop": "false",
15 "doc": {
16 "content":"aaa" }
17 }
18 ACTIONS.append(action)
19 #ACTIONS.append(action)
20
21 start = time.time()
22 success, _ = bulk(es, ACTIONS, index=index_name, raise_on_error=True)
23 end = time.time()
24 print('Performed %d actions in %.03f s' % (success,(end-start)))
25
26 if __name__ == '__main__':
27 es = Elasticsearch(hosts=["localhost:9200"], timeout=30000)
28 bulk_update(es)
写入速度非常快:
xgwu@Kennys-Mac:~|⇒  python update.py
Performed 1000 actions in 0.103 s
Performed 1000 actions in 0.102 s
Performed 1000 actions in 0.100 s
Performed 1000 actions in 0.109 s
Performed 1000 actions in 0.102 s
Performed 1000 actions in 0.112 s
Performed 1000 actions in 0.102 s
Performed 1000 actions in 0.110 s
Performed 1000 actions in 0.105 s
Performed 1000 actions in 0.115 s
观察到这时候,即使refresh是关闭的,上面的bulk update操作仍然生成了新的segment
index shard prirep ip        segment generation docs.count docs.deleted    size size.memory committed searchable version compound
test 0 p 127.0.0.1 _0 0 9000 1000 133.4kb 3086 false true 6.5.0 true
test 0 p 127.0.0.1 _9 9 1000 0 16.9kb 1847 false true 6.5.0 true










其他观察到的重要信息:
1) 之前包含1万条文档的segment, 现在docs.count变成了9000, 其中1000被标记为delete。而新生成的segment docs.count是1000,也就是更新操作都写到了这个segment里面。这个符合预期。
2) 新生成的segment,  generation是9, 意味着程序里10次bulk操作,每次都生成了一个新的segment,但因为每次生成的中间segment文件内容完全相同,只有最新的generation保留下来,中间状态的删除掉了。
 
现在模拟产生问题的操作,也就是拼接bulk操作的时候,同一个action追加2次
ACTIONS.append(action)
ACTIONS.append(action)

复现了更新操作非常缓慢:
xgwu@Kennys-Mac:~|⇒  python update.py
Performed 2000 actions in 9.300 s
Performed 2000 actions in 9.395 s
Performed 2000 actions in 9.708 s
Performed 2000 actions in 8.950 s
Performed 2000 actions in 8.558 s
Performed 2000 actions in 8.689 s
Performed 2000 actions in 8.369 s
Performed 2000 actions in 8.811 s
Performed 2000 actions in 8.642 s
Performed 2000 actions in 8.452 s

 
同时观察到,不断有新的只包含一个文档的segment生成,generation顺序加1.
index shard prirep ip        segment generation docs.count docs.deleted    size size.memory committed searchable version compound
test 0 p 127.0.0.1 _0 0 10000 1000 134.7kb 0 true false 6.5.0 true
test 0 p 127.0.0.1 _a 10 1000 0 16.9kb 0 true false 6.5.0 true
test 0 p 127.0.0.1 _og 880 9988 12 135.9kb 3134 false true 6.5.0 false
test 0 p 127.0.0.1 _oh 881 1 1 2.9kb 1737 false true 6.5.0 true
test 0 p 127.0.0.1 _oi 882 1 1 2.9kb 1737 false true 6.5.0 true
test 0 p 127.0.0.1 _oj 883 1 1 2.9kb 1737 false true 6.5.0 true
test 0 p 127.0.0.1 _ok 884 1 1 2.9kb 1737 false true 6.5.0 true
test 0 p 127.0.0.1 _ol 885 1 1 2.9kb 1737 false true 6.5.0 true
test 0 p 127.0.0.1 _om 886 1 1 2.9kb 1737 false true 6.5.0 true
test 0 p 127.0.0.1 _on 887 1 1 2.9kb 1737 false true 6.5.0 true
test 0 p 127.0.0.1 _oo 888 1 1 2.9kb 1737 false true 6.5.0 true
test 0 p 127.0.0.1 _op 889 1 1 2.9kb 1737 false true 6.5.0 true
test 0 p 127.0.0.1 _or 891 1 1 2.9kb 1737 false true 6.5.0 true
test 0 p 127.0.0.1 _os 892 2 0 2.9kb 1737 false true 6.5.0 true

 
反复查看segment生成情况,会看到这些包含1个文档的segment, docs.deleted也是1,都只是中间状态,随着不断写入,会被合并掉。 另外,这些segment的commit状态都还是false,也就是并未fsync到磁盘,只是存在于文件缓冲区里。 上面测试过程中,也能观察到cpu利用率明显增高。
 
问题看起来和测试方法有关,因为在一个bulk request里加入了1000对同样doc_id的文档,并且都是相邻的。而看起来ES处理update的方式是将已经包含该doc id的segment seal起来,并将其中旧版本的doc标记为deleted。 这样包含大量两两doc id相同的bulk request就造成不断的生成包含2个doc的新segment,然后并且标记其中一个doc为deleted,反复循环。猜测这是ES解决并发更新冲突的一种机制,具体是否是这样还需要再研究底层才能知晓。

实际的应用场景里,大量短时间并发对同一个doc_id更新的情况应该不是很常见,应该不会有什么影响。其实想想如果是数据库应用,大量并发更新同一条记录,也会导致大量的行级锁产生,性能也会比较差。

huzhoahui168

赞同来自:

目前我想到的就是把这5000条数据去重,{id:1,name:ccc} {id:2,name:ccc}(通过更新时间保留时间最大的name ccc),当然这按自己条件是可以过滤掉的,不知道ES还有没其它的方法。

Cheetah

赞同来自:

肯定是你自己去重最好啊,ES对于更新数据都是先标注删除,再新增,自己去重吧

huzhoahui168

赞同来自:

我测试的结果是一批数据5000更新需要30-40S(文档id有重复),如果是没有重复文档id 更新5000数据只要6-8S

白衬衣 - 金桥

赞同来自:

wood叔威武。

要回复问题请先登录注册