愚者求师之过,智者从师之长。

我发现用bulkProcessor似乎有问题,重复add多次才能成功创建索引

Elasticsearch | 作者 aoeiuv | 发布于2017年12月04日 | 阅读数:9642

// 创建BulkPorcessor对象
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
public void beforeBulk(long paramLong, BulkRequest paramBulkRequest) {
System.out.println("---尝试操作" + paramBulkRequest.numberOfActions() + "条数据---");
}

// 执行出错时执行
public void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) {
System.out.println("---尝试操作" + paramBulkRequest.numberOfActions() + "条数据成功---");
}

public void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) {
System.out.println("---尝试操作" + paramBulkRequest.numberOfActions() + "条数据失败---");
}
})
// 1w次请求执行一次bulk
.setBulkActions(10000)
// 1gb的数据刷新一次bulk
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
// 固定5s必须刷新一次
.setFlushInterval(TimeValue.timeValueSeconds(5))
// 并发请求数量, 0不并发, 1并发允许执行
.setConcurrentRequests(1)
// 设置退避, 100ms后执行, 最大请求3次
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();

// 添加单次请求
Map<String, Object> m = new HashMap<>();
m.put("test2", "999999");
bulkProcessor.add(new IndexRequest("dw", "test", "1").source(m));
bulkProcessor.add(new DeleteRequest("dw", "test", "2"));

// 关闭
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
// 或者
bulkProcessor.close();

一次运行无法成功更新或者插入,而我在idea中使用alt+f8
执行这个bulkProcessor.add(new IndexRequest("dw", "test", "1").source(m));
,执行多次elasticsearch的索引version才发生变化。
我的运行是jdk1.8 es6.0
已邀请:

medcl - 今晚打老虎。

赞同来自:

每次添加完所有的文档手动 flush 一次呢?
bulkProcessor.flush();

novia - 1&0

赞同来自:

1w次请求执行一次bulk .setBulkActions(10000) // 1gb的数据刷新一次
bulk .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
 
你设置了flush buffer,所以到了buffer后才会刷新提交数据把

要回复问题请先登录注册