Elasticsearch 异步创建索引时报错 "Connection closed unexpectedly",而同步创建没有问题
Elasticsearch | 作者 yinianhuakai | 发布于2020年02月13日 | 阅读数:6021
/**
 * Sample service that indexes a fixed demo document into Elasticsearch, once through the
 * blocking API and once through the callback-based asynchronous API.
 */
@Service
public class EsApiServiceImpl implements EsApiService {

    /** Client used for every Elasticsearch call made by this service. */
    @Autowired
    private RestHighLevelClient client;

    /** Shared JSON mapper; not referenced by the methods of this class. */
    private static final ObjectMapper mapper = new ObjectMapper();

    /**
     * Synchronously indexes the demo document into {@code index-8} under id {@code 1}.
     *
     * <p>The request is sent with op type {@code CREATE}, a one-second timeout and the
     * {@code WAIT_UNTIL} refresh policy.
     *
     * @return the response from the client, or {@code null} when the call failed with an
     *     {@link IOException} (the stack trace is printed in that case)
     */
    @Override
    public IndexResponse createIndex() {
        // Index name, document id, document body.
        IndexRequest request =
                new IndexRequest("index-8").id("1").source(buildSampleDocument());

        // Optional parameters.
        // Routing is only read and reported; writing the same value back would be a no-op.
        String routing = request.routing();
        System.out.println("默认routing是: " + routing);

        // Timeout to wait for primary shard to become available as a TimeValue
        request.timeout(TimeValue.timeValueSeconds(1));
        // Refresh policy (reference: https://blog.csdn.net/hanchao5 ... 51166).
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        request.opType(DocWriteRequest.OpType.CREATE);

        IndexResponse response = null;
        try {
            response = client.index(request, RequestOptions.DEFAULT);
            System.out.println("Result is: " + JsonParser.bean2Json(response));
        } catch (IOException e) {
            // Best effort: report the failure and fall through to return null.
            e.printStackTrace();
        }
        return response;
    }

    /**
     * Asynchronously indexes the demo document into {@code index-9} under id {@code 1}.
     *
     * <p>This method returns as soon as the request has been handed to the client; the outcome
     * is reported later through the listener. NOTE: the client has to stay open until the
     * listener fires. A short-lived caller (for example a unit test that finishes immediately)
     * may otherwise see the connection closed before the response arrives.
     */
    @Override
    public void createIndexAsync() {
        // Index name, document id, document body.
        IndexRequest request =
                new IndexRequest("index-9").id("1").source(buildSampleDocument());

        // The listener receives the success or failure callback.
        ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
            @Override
            public void onResponse(IndexResponse indexResponse) {
                System.out.println("Result is: " + JsonParser.bean2Json(indexResponse));
            }

            @Override
            public void onFailure(Exception e) {
                System.out.println("Exception is: " + e.getMessage());
                // The message alone hides the exception type and cause; print the full trace
                // so failures such as a closed connection can actually be diagnosed.
                e.printStackTrace();
            }
        };
        client.indexAsync(request, RequestOptions.DEFAULT, listener);
    }

    /** Builds the demo document shared by the synchronous and asynchronous index calls. */
    private static Map<String, Object> buildSampleDocument() {
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("user", "kimchy");
        jsonMap.put("postDate", new Date());
        jsonMap.put("message", "trying out Elasticsearch");
        return jsonMap;
    }
}
/**
 * Sample service that indexes a fixed demo document into Elasticsearch, once through the
 * blocking API and once through the callback-based asynchronous API.
 */
public class EsApiServiceImpl implements EsApiService {

    /** Client used for every Elasticsearch call made by this service. */
    @Autowired
    private RestHighLevelClient client;

    /** Shared JSON mapper; not referenced by the methods of this class. */
    private static final ObjectMapper mapper = new ObjectMapper();

    /**
     * Synchronously indexes the demo document into {@code index-8} under id {@code 1}.
     *
     * <p>The request is sent with op type {@code CREATE}, a one-second timeout and the
     * {@code WAIT_UNTIL} refresh policy.
     *
     * @return the response from the client, or {@code null} when the call failed with an
     *     {@link IOException} (the stack trace is printed in that case)
     */
    @Override
    public IndexResponse createIndex() {
        // Index name, document id, document body.
        IndexRequest request =
                new IndexRequest("index-8").id("1").source(buildSampleDocument());

        // Optional parameters.
        // Routing is only read and reported; writing the same value back would be a no-op.
        String routing = request.routing();
        System.out.println("默认routing是: " + routing);

        // Timeout to wait for primary shard to become available as a TimeValue
        request.timeout(TimeValue.timeValueSeconds(1));
        // Refresh policy (reference: https://blog.csdn.net/hanchao5 ... 51166).
        request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
        request.opType(DocWriteRequest.OpType.CREATE);

        IndexResponse response = null;
        try {
            response = client.index(request, RequestOptions.DEFAULT);
            System.out.println("Result is: " + JsonParser.bean2Json(response));
        } catch (IOException e) {
            // Best effort: report the failure and fall through to return null.
            e.printStackTrace();
        }
        return response;
    }

    /**
     * Asynchronously indexes the demo document into {@code index-9} under id {@code 1}.
     *
     * <p>This method returns as soon as the request has been handed to the client; the outcome
     * is reported later through the listener. NOTE: the client has to stay open until the
     * listener fires. A short-lived caller (for example a unit test that finishes immediately)
     * may otherwise see the connection closed before the response arrives.
     */
    @Override
    public void createIndexAsync() {
        // Index name, document id, document body.
        IndexRequest request =
                new IndexRequest("index-9").id("1").source(buildSampleDocument());

        // The listener receives the success or failure callback.
        ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
            @Override
            public void onResponse(IndexResponse indexResponse) {
                System.out.println("Result is: " + JsonParser.bean2Json(indexResponse));
            }

            @Override
            public void onFailure(Exception e) {
                System.out.println("Exception is: " + e.getMessage());
                // The message alone hides the exception type and cause; print the full trace
                // so failures such as a closed connection can actually be diagnosed.
                e.printStackTrace();
            }
        };
        client.indexAsync(request, RequestOptions.DEFAULT, listener);
    }

    /** Builds the demo document shared by the synchronous and asynchronous index calls. */
    private static Map<String, Object> buildSampleDocument() {
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("user", "kimchy");
        jsonMap.put("postDate", new Date());
        jsonMap.put("message", "trying out Elasticsearch");
        return jsonMap;
    }
}
3 个回复
Charele - Cisco4321
赞同来自:
你可以把报错信息贴全一点吗?
yinianhuakai
赞同来自:
shylock - 软件工程师
赞同来自:
Thread.sleep(1000);
出现这个错误的原因是:异步调用会立即返回,索引创建是否成功的结果还没有返回,client 连接就已经被断开了。这时需要等待一小会儿(例如上面的 Thread.sleep(1000)),或者直接以 Spring Boot 方式启动应用,这样连接就不会被断开;如果是在单元测试中调用,则需要加上这样的等待!