
Flink fails to access a Kerberos-authenticated Elasticsearch cluster

Elasticsearch | Author: CatchYa | Posted 2020-09-15 | Views: 2388

Flink 1.7 -> Elasticsearch 6.7
addSink(esSinkBuilder.build())
How can I do Kerberos authentication when writing through the connector? This is the sink setup:
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<String>() {
        public IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);

            return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    }
);

// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be buffered
esSinkBuilder.setBulkFlushMaxActions(1);

// provide a RestClientFactory for custom configuration on the internally created REST client
esSinkBuilder.setRestClientFactory(
    restClientBuilder -> {
        restClientBuilder.setDefaultHeaders(...);
        restClientBuilder.setMaxRetryTimeoutMillis(...);
        restClientBuilder.setPathPrefix(...);
        restClientBuilder.setHttpClientConfigCallback(...);
    }
);


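The official Flink documentation says: to use a custom configuration for the REST client, users can provide a RestClientFactory implementation when setting up the ElasticsearchClient.Builder that builds the sink.
I don't know how to implement this for Kerberos.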
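One piece that can be sketched with the JDK alone is the Kerberos login configuration that the HTTP client's SPNEGO negotiation would rely on. The sketch below is not from the Flink documentation. It assumes keytab-based login on a HotSpot/OpenJDK JVM. The class name `KeytabJaasConfiguration` and its `install` helper are hypothetical, and the principal, keytab path, and krb5.conf path are placeholders to replace with real values.

```java
import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;

/** Hypothetical helper: programmatic JAAS configuration for a keytab-based Kerberos login. */
class KeytabJaasConfiguration extends Configuration {
    // JAAS entry name that the JDK's GSS-API layer looks up when a client initiates a security context.
    static final String JGSS_INITIATE = "com.sun.security.jgss.initiate";

    private final String principal;   // placeholder, e.g. "flink@EXAMPLE.COM"
    private final String keytabPath;  // placeholder path to the keytab file

    KeytabJaasConfiguration(String principal, String keytabPath) {
        this.principal = principal;
        this.keytabPath = keytabPath;
    }

    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
        // Only answer for the GSS initiator entry; other names have no configuration here.
        if (!JGSS_INITIATE.equals(name)) {
            return null;
        }
        Map<String, String> options = new HashMap<>();
        options.put("useKeyTab", "true");
        options.put("keyTab", keytabPath);
        options.put("principal", principal);
        options.put("storeKey", "true");
        options.put("doNotPrompt", "true");
        return new AppConfigurationEntry[] {
            new AppConfigurationEntry(
                "com.sun.security.auth.module.Krb5LoginModule",
                LoginModuleControlFlag.REQUIRED,
                options)
        };
    }

    /** Installs the configuration JVM-wide; must run on the JVM that actually opens the sink. */
    static void install(String principal, String keytabPath, String krb5ConfPath) {
        System.setProperty("java.security.krb5.conf", krb5ConfPath);
        // Let GSS-API obtain credentials through JAAS instead of requiring a pre-populated Subject.
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        Configuration.setConfiguration(new KeytabJaasConfiguration(principal, keytabPath));
    }
}
```

A possible way to use this is to call `KeytabJaasConfiguration.install(...)` at the start of the `RestClientFactory` body, since that code should run on the TaskManager that creates the REST client. The HTTP side is not shown here. `setHttpClientConfigCallback` hands over Apache's `HttpAsyncClientBuilder`, on which a SPNEGO scheme such as `SPNegoSchemeFactory` can be registered through `setDefaultAuthSchemeRegistry`. Whether that works as-is depends on the HttpClient version bundled with the connector and on the cluster's Kerberos realm setup.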