// 创建BulkPorcessor对象
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
public void beforeBulk(long paramLong, BulkRequest paramBulkRequest) {
System.out.println("---尝试操作" + paramBulkRequest.numberOfActions() + "条数据---");
}
// 执行出错时执行
public void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) {
System.out.println("---尝试操作" + paramBulkRequest.numberOfActions() + "条数据成功---");
}
public void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) {
System.out.println("---尝试操作" + paramBulkRequest.numberOfActions() + "条数据失败---");
}
})
// 1w次请求执行一次bulk
.setBulkActions(10000)
// 1gb的数据刷新一次bulk
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
// 固定5s必须刷新一次
.setFlushInterval(TimeValue.timeValueSeconds(5))
// 并发请求数量, 0不并发, 1并发允许执行
.setConcurrentRequests(1)
// 设置退避, 100ms后执行, 最大请求3次
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
// 添加单次请求
Map<String, Object> m = new HashMap<>();
m.put("test2", "999999");
bulkProcessor.add(new IndexRequest("dw", "test", "1").source(m));
bulkProcessor.add(new DeleteRequest("dw", "test", "2"));
// 关闭
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
// 或者
bulkProcessor.close();
一次运行无法成功更新或者插入,而我在idea中使用alt+f8
执行这个bulkProcessor.add(new IndexRequest("dw", "test", "1").source(m));
,执行多次elasticsearch的索引version才发生变化。
我的运行是jdk1.8 es6.0
2 个回复
medcl - 今晚打老虎。
赞同来自:
novia - 1&0
赞同来自:
bulk .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
你设置了flush buffer,所以到了buffer后才会刷新提交数据把