ES Java API bulk insert: how can I capture the documents that failed to insert? Right now I can only capture the id.

Elasticsearch | Author: anzhiruo | Posted: October 31, 2019 | Views: 4329

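When some documents fail to insert, I can only capture their id, not the whole document. Because the id is auto-generated, I can't use it to fetch the whole document either.
Error message: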
[1273]: index [detail_test], type [all], id [AW37-ZedyEGvZfaBx9wW], message [MapperParsingException[failed to parse [appliedDate]]; nested: IllegalArgumentException[Invalid format: "2015-12-03 2015-10-29" is malformed at " 2015-10-29"];]

Marquezzzz

When inserting, use your own id instead of the id that ES generates automatically.
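
One way to apply this suggestion is to generate the id on the client and keep each document in a map keyed by that id until the bulk response arrives, so the id reported for a failed item can be resolved back to the full source. This is a minimal sketch, assuming random UUIDs are acceptable as document ids; `PendingDocs` and its methods are hypothetical names, not part of the Elasticsearch client.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers each document under a client-generated id. */
final class PendingDocs {
    // id -> source of every document that was sent but not yet acknowledged
    private final Map<String, Map<String, Object>> pending = new ConcurrentHashMap<>();

    /** Stores the source and returns the id to set on the index request. */
    String register(Map<String, Object> source) {
        String id = UUID.randomUUID().toString();
        pending.put(id, source);
        return id;
    }

    /** Call for an item that succeeded: its source no longer needs to be kept. */
    void succeeded(String id) {
        pending.remove(id);
    }

    /** Call for an item that failed: returns (and forgets) the full source, if known. */
    Optional<Map<String, Object>> failed(String id) {
        return Optional.ofNullable(pending.remove(id));
    }

    /** Number of documents still waiting for a result. */
    int size() {
        return pending.size();
    }
}
```

With the Java client, the id returned by `register(...)` would be set on the request (for example through `IndexRequest.id(...)`), and the id reported by the failed bulk item would then be passed to `failed(...)` to get the document back.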

God_lockin

If you are using BulkProcessor, you can try this:
// Imports for the Elasticsearch and JDK types used below. log, JSON, DataUtils,
// ESService, initESClient() and ZhuangbilityException come from the poster's own
// project and are not shown in the original post.
import java.util.Map;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;

private BulkProcessor.Listener getBPListener() {
    return new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            log.info("Start to handle bulk commit executionId:[{}] for {} requests", executionId, request.numberOfActions());
        }

        // Called when the bulk request reached the cluster; individual items may still have failed.
        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            log.info("Finished handling bulk commit executionId:[{}] for {} requests", executionId, request.numberOfActions());

            if (response.hasFailures()) {
                int count = 0;
                for (BulkItemResponse x : response) {
                    if (x.isFailed()) {
                        BulkItemResponse.Failure failure = x.getFailure();
                        String msg = String.format(
                                "Index:[%s], type:[%s], id:[%s], itemId:[%s], opt:[%s], version:[%s], errMsg:%s",
                                x.getIndex(),
                                x.getType(),
                                x.getId(),
                                x.getItemId(),
                                x.getOpType().getLowercase(),
                                x.getVersion(),
                                failure.getCause().getMessage());
                        log.error("Bulk executionId:[{}] has error messages:\t{}", executionId, msg);
                        count++;
                    }
                }
                log.info("Finished handling bulk commit executionId:[{}] for {} requests with {} errors", executionId, request.numberOfActions(), count);
            }
        }

        // Called when the whole bulk request failed, e.g. the client could not reach the cluster.
        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            // Passing the throwable as the last argument makes the logger print its stack trace.
            log.error("Bulk [{}] finished with [{}] requests of error:{}:-[{}]", executionId,
                    request.numberOfActions(),
                    failure.getClass().getName(),
                    failure.getMessage(),
                    failure);

            // Dump every index request of the failed bulk, including its full source.
            request.requests().stream().filter(x -> x instanceof IndexRequest)
                    .forEach(x -> {
                        Map<String, Object> source = ((IndexRequest) x).sourceAsMap();
                        // "id" here is a field inside the document, not the ES document id.
                        String pk = DataUtils.getNotNullValue(source, "id", String.class, "");
                        log.error("Failure to handle index:[{}], type:[{}] id:[{}], data:[{}]", x.index(), x.type(), pk, JSON.toJSONString(source));
                    });

            // Rebuild the client when it has ended up in an illegal state.
            if (failure instanceof IllegalStateException) {
                synchronized (ESService.class) {
                    try {
                        initESClient();
                    } catch (ZhuangbilityException e) {
                        log.error("Re init ES client failure", e);
                    }
                }
            }
        }
    };
}
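
For per-item failures, the listener above logs only the metadata of each failed item. Since `afterBulk` also receives the original `BulkRequest`, and `getItemId()` reports the item's position within that request, it should be possible to look the failed document up by position, along the lines of `request.requests().get(x.getItemId())`, cast to `IndexRequest` and read with `sourceAsMap()`. The sketch below models only that lookup in plain Java so it runs without the Elasticsearch dependency; `FailedDocLookup`, `failedSources` and their parameters are hypothetical stand-ins, not client API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: maps failed bulk item positions back to the submitted sources. */
final class FailedDocLookup {
    /**
     * @param submitted     sources in the order they were added to the bulk request
     * @param failedItemIds positions of the failed items (the values getItemId() would report)
     * @return position -> source for each failed item, in the order given;
     *         positions outside the submitted list are skipped
     */
    static Map<Integer, Map<String, Object>> failedSources(
            List<Map<String, Object>> submitted, List<Integer> failedItemIds) {
        Map<Integer, Map<String, Object>> result = new LinkedHashMap<>();
        for (int itemId : failedItemIds) {
            if (itemId >= 0 && itemId < submitted.size()) {
                result.put(itemId, submitted.get(itemId));
            }
        }
        return result;
    }
}
```

This approach works with auto-generated ids as well, because it relies on the order of items in the request rather than on the id.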
