Individual ES nodes hit Out of Memory errors: is segment merging the cause?

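The ES log reports an OOM. Judging from the log, was it caused by a segment merge that happened while data was being written? The log is as follows: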
org.apache.lucene.store.AlreadyClosedException: this IndexWriter is closed
at org.apache.lucene.index.IndexWriter.ensureOpen(IndexWriter.java:719)
at org.apache.lucene.index.IndexWriter.ensureOpen(IndexWriter.java:733)
at org.apache.lucene.index.IndexWriter.ramBytesUsed(IndexWriter.java:474)
at org.elasticsearch.index.engine.InternalEngine.indexWriterRAMBytesUsed(InternalEngine.java:949)
at org.elasticsearch.index.shard.IndexShard.updateBufferSize(IndexShard.java:1077)
at org.elasticsearch.indices.memory.IndexingMemoryController.updateShardBuffers(IndexingMemoryController.java:232)
at org.elasticsearch.indices.memory.IndexingMemoryController$ShardsIndicesStatusChecker.run(IndexingMemoryController.java:286)
at org.elasticsearch.indices.memory.IndexingMemoryController.forceCheck(IndexingMemoryController.java:245)
at org.elasticsearch.index.shard.IndexShard.markLastWrite(IndexShard.java:990)
at org.elasticsearch.index.shard.IndexShard.index(IndexShard.java:564)
at org.elasticsearch.index.engine.Engine$Index.execute(Engine.java:836)
at org.elasticsearch.action.support.replication.TransportReplicationAction.executeIndexRequestOnPrimary(TransportReplicationAction.java:1073)
at org.elasticsearch.action.bulk.TransportShardBulkAction.shardIndexOperation(TransportShardBulkAction.java:338)
at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:131)
at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase.performOnPrimary(TransportReplicationAction.java:579)
at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase$1.doRun(TransportReplicationAction.java:452)
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at org.apache.lucene.index.ConcurrentMergeScheduler.merge(ConcurrentMergeScheduler.java:517)
at org.apache.lucene.index.IndexWriter.maybeMerge(IndexWriter.java:1929)
at org.apache.lucene.index.IndexWriter.getReader(IndexWriter.java:454)
at org.apache.lucene.index.StandardDirectoryReader.doOpenFromWriter(StandardDirectoryReader.java:286)
at org.apache.lucene.index.StandardDirectoryReader.doOpenIfChanged(StandardDirectoryReader.java:261)
at org.apache.lucene.index.StandardDirectoryReader.doOpenIfChanged(StandardDirectoryReader.java:251)
at org.apache.lucene.index.FilterDirectoryReader.doOpenIfChanged(FilterDirectoryReader.java:104)
at org.apache.lucene.index.DirectoryReader.openIfChanged(DirectoryReader.java:123)
at org.apache.lucene.search.SearcherManager.refreshIfNeeded(SearcherManager.java:137)
at org.apache.lucene.search.SearcherManager.refreshIfNeeded(SearcherManager.java:58)
at org.apache.lucene.search.ReferenceManager.doMaybeRefresh(ReferenceManager.java:176)
at org.apache.lucene.search.ReferenceManager.maybeRefreshBlocking(ReferenceManager.java:253)
at org.elasticsearch.index.engine.InternalEngine.refresh(InternalEngine.java:678)
at org.elasticsearch.index.shard.IndexShard.refresh(IndexShard.java:615)
at org.elasticsearch.index.shard.IndexShard$EngineRefresher$1.run(IndexShard.java:1255)
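According to the stack trace, the refresh path calls IndexWriter.maybeMerge, which in turn starts a ConcurrentMergeScheduler.merge operation. I have not studied the ES source code, so can I conclude that every refresh checks whether a merge should be run? The ES version is 2.3.5.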

jianjianhe

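Let me give a preliminary answer myself. On each refresh, when getReader runs, one section checks whether anything has changed, and if so it calls maybeMerge. My guess is that pending operations on the segments are carried out during the merge, because ES only marks some operations at first and actually executes them when segments are merged. The code is as follows: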
if (anyChanges) {
  maybeMerge(config.getMergePolicy(), MergeTrigger.FULL_FLUSH, UNBOUNDED_MAX_MERGE_SEGMENTS);
}
anyChanges is updated by: anyChanges |= maybeApplyDeletes(applyAllDeletes);
applyAllDeletes is passed in as true. The maybeApplyDeletes method is as follows:
final synchronized boolean maybeApplyDeletes(boolean applyAllDeletes) throws IOException {
  if (applyAllDeletes) {
    if (infoStream.isEnabled("IW")) {
      infoStream.message("IW", "apply all deletes during flush");
    }
    return applyAllDeletesAndUpdates();
  } else if (infoStream.isEnabled("IW")) {
    infoStream.message("IW", "don't apply deletes now delTermCount=" + bufferedUpdatesStream.numTerms() + " bytesUsed=" + bufferedUpdatesStream.ramBytesUsed());
  }

  return false;
}
In other words, it runs applyAllDeletesAndUpdates, which is why a segment merge check follows it.
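The decision described above can be sketched as a small model. This is only an illustration, not the Lucene code: the class and method names are hypothetical, and it assumes that anyChanges already holds the result of the preceding flush step, which the quoted snippet does not show (the `|=` only suggests that an earlier assignment exists).

```java
// Simplified, hypothetical model of the check quoted above; NOT the Lucene source.
final class RefreshMergeModel {

    /**
     * @param flushChangedAnything assumed initial value of anyChanges (not shown in the post)
     * @param deletesApplied       stand-in for the return value of maybeApplyDeletes(true)
     * @return true if the model would go on to call maybeMerge
     */
    static boolean shouldCallMaybeMerge(boolean flushChangedAnything, boolean deletesApplied) {
        boolean anyChanges = flushChangedAnything;
        anyChanges |= deletesApplied; // same operator as in the quoted line
        return anyChanges;
    }

    public static void main(String[] args) {
        System.out.println(shouldCallMaybeMerge(false, false));
        System.out.println(shouldCallMaybeMerge(true, false));
        System.out.println(shouldCallMaybeMerge(false, true));
    }
}
```

Under that assumption, maybeMerge is skipped only when neither the flush nor the delete application changed anything. Reaching maybeMerge still does not mean a merge runs: it asks the configured merge policy whether any merge is due.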
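I still have two questions and would appreciate answers:
1. Isn't each refresh, up to and including getReader, just the process of moving data from memory to the file system cache? Why does it involve segments?
2. If there were no delete or update operations in the meantime, is the check still performed? Isn't a segment merge supposed to run only once the segments reach a certain number or size?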

jianjianhe

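Let me add a bit more.
The official ES documentation describes refresh as follows:
In Elasticsearch, this lightweight process of writing and opening a new segment is called a refresh.
So a refresh increases the number of segments, while segment merging is run by ES automatically in the background. During a merge, old deleted documents are purged from the file system. The official wording: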
1.While indexing, the refresh process creates new segments and opens them for search.
2.The merge process selects a few segments of similar size and merges them into a new bigger segment in the background. This does not interrupt indexing and searching.
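The interplay of the two quoted statements can be illustrated with a toy model. Everything here is an assumption made for illustration: the class name, the MERGE_FACTOR of 3, and the rule "merge as soon as three segments have the same document count" are hypothetical and far simpler than the merge policy ES actually uses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model only: each refresh adds one small segment, and segments of equal
// size are merged into a bigger one. Not the real ES/Lucene merge policy.
final class ToySegments {
    static final int MERGE_FACTOR = 3; // hypothetical threshold

    final List<Integer> segmentDocCounts = new ArrayList<>();

    /** A refresh turns the buffered documents into one new segment. */
    void refresh(int bufferedDocs) {
        if (bufferedDocs > 0) {
            segmentDocCounts.add(bufferedDocs);
        }
        maybeMerge();
    }

    /** Merge MERGE_FACTOR equal-sized segments into one, repeating until none qualify. */
    void maybeMerge() {
        boolean mergedSomething;
        do {
            mergedSomething = false;
            for (Integer size : new ArrayList<>(segmentDocCounts)) {
                if (Collections.frequency(segmentDocCounts, size) >= MERGE_FACTOR) {
                    for (int i = 0; i < MERGE_FACTOR; i++) {
                        segmentDocCounts.remove(size); // Integer argument: removes by value
                    }
                    segmentDocCounts.add(size * MERGE_FACTOR);
                    mergedSomething = true;
                    break;
                }
            }
        } while (mergedSomething);
    }

    public static void main(String[] args) {
        ToySegments shard = new ToySegments();
        for (int i = 1; i <= 9; i++) {
            shard.refresh(10);
            System.out.println("after refresh " + i + ": " + shard.segmentDocCounts);
        }
    }
}
```

In this model most refreshes only add a segment, and a merge happens on the few refreshes where enough similar-sized segments have accumulated.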
That is, if the applyAllDeletesAndUpdates() call inside getReader returns false, maybeMerge is not executed. Here is the applyAllDeletesAndUpdates() method:
final synchronized boolean applyAllDeletesAndUpdates() throws IOException {
  flushDeletesCount.incrementAndGet();
  final BufferedUpdatesStream.ApplyDeletesResult result;
  if (infoStream.isEnabled("IW")) {
    infoStream.message("IW", "now apply all deletes for all segments maxDoc=" + (docWriter.getNumDocs() + segmentInfos.totalMaxDoc()));
  }
  result = bufferedUpdatesStream.applyDeletesAndUpdates(readerPool, segmentInfos.asList());
  if (result.anyDeletes) {
    checkpoint();
  }
  if (!keepFullyDeletedSegments && result.allDeleted != null) {
    if (infoStream.isEnabled("IW")) {
      infoStream.message("IW", "drop 100% deleted segments: " + segString(result.allDeleted));
    }
    for (SegmentCommitInfo info : result.allDeleted) {
      // If a merge has already registered for this
      // segment, we leave it in the readerPool; the
      // merge will skip merging it and will then drop
      // it once it's done:
      if (!mergingSegments.contains(info)) {
        segmentInfos.remove(info);
        pendingNumDocs.addAndGet(-info.info.maxDoc());
        readerPool.drop(info);
      }
    }
    checkpoint();
  }
  bufferedUpdatesStream.prune(segmentInfos);
  return result.anyDeletes;
}
anyDeletes is set in the ApplyDeletesResult constructor, from the value computed by applyDeletesAndUpdates:
ApplyDeletesResult(boolean anyDeletes, long gen, List<SegmentCommitInfo> allDeleted) {
  this.anyDeletes = anyDeletes;
  this.gen = gen;
  this.allDeleted = allDeleted;
}

result = bufferedUpdatesStream.applyDeletesAndUpdates(readerPool, segmentInfos.asList());

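Possibly, because the cluster's refresh_interval is at its default of 1s, the frequent refreshes produce too many segments, so a segment merge is triggered on each refresh. Still, I personally doubt that running a segment merge would by itself cause an OOM. Would it?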
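One detail in the log is worth separating out: the "Caused by" line is java.lang.OutOfMemoryError: unable to create new native thread, raised from Thread.start inside ConcurrentMergeScheduler.merge. That variant of OutOfMemoryError is generally reported when the JVM cannot obtain another OS thread (for example because of an operating-system limit on threads or processes per user, or too little native memory for thread stacks), not when the heap is full. The merge may therefore simply be where the limit was hit. The sketch below shows one way to read a JVM's own thread counts through the standard ThreadMXBean. The class name is hypothetical, and it only reports on the JVM it runs in, so for an ES node the same figures would have to come from inside that process or from a thread dump of it.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

// Hypothetical helper: reports thread counts of the JVM it runs in.
final class ThreadCountProbe {

    /** Number of live threads (daemon and non-daemon) right now. */
    static int liveThreads() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        return threads.getThreadCount();
    }

    /** Highest live thread count since the JVM started (or since the peak was reset). */
    static int peakThreads() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        return threads.getPeakThreadCount();
    }

    public static void main(String[] args) {
        System.out.println("live threads: " + liveThreads());
        System.out.println("peak threads: " + peakThreads());
    }
}
```

Comparing such numbers with the operating system's per-user thread limit on the affected nodes would help tell a thread-limit problem apart from a merge-volume problem.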
