Q:非洲食人族的酋长吃什么?

解决Elasticsearch HTTP方式查询报SocketTimeoutException的问题(待验证)

Elasticsearch | 作者 trycatchfinal | 发布于2020年05月18日 | | 阅读数:2382

注意: 此解决方案,短时间内没有复现,还需要长时间验证是否有效。

现象

在使用HTTP方式,Elasticsearch 长时间不查询后,再次查询会出现抛出SocketTimeoutException的问题。

原因

基本逻辑

Elasticsearch 客户端会根据服务器返回的HTTP报文内容,来决定客户端保持HTTP连接Keep-Alive状态的策略。
如果结果如下,那么保持HTTP连接 Keep-Alive状态为120s

Connection: Keep-Alive
Keep-Alive: max=5, timeout = 120

如果不包含上述内容,那么客户端将保持Keep-Alive状态的时间为永久。
事实上,Elasticsearch服务器返回的报文,并没有上述HTTP头内容,所以客户端所有的HTTP连接都为永久保持Keep-Alive。
如果客户端长时间没有发送请求,服务器或者防火墙已经close了HTTP底层的TCP链接,但是此时客户端并不知道,由于Keep Alive是无限期,那么并不会重新建立连接,而是直接发送请求,此时就会得到SocketTimeout异常。

阅读源码

我使用的Elasticsearch的客户端下面的版本

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>rest</artifactId>
    <version>5.4.1</version>
</dependency>

其HTTP的发送依赖Maven包httpasyncclient.

这个包中的接口ConnectionKeepAliveStrategy,抽象了处理 HTTP Keepalive 的策略,其默认实现为:

@Contract(threading = ThreadingBehavior.IMMUTABLE)
public class DefaultConnectionKeepAliveStrategy implements ConnectionKeepAliveStrategy {

    public static final DefaultConnectionKeepAliveStrategy INSTANCE = new DefaultConnectionKeepAliveStrategy();

    @Override
    public long getKeepAliveDuration(final HttpResponse response, final HttpContext context) {
        Args.notNull(response, "HTTP response");
        final HeaderElementIterator it = new BasicHeaderElementIterator(
                response.headerIterator(HTTP.CONN_KEEP_ALIVE));
        while (it.hasNext()) {
            final HeaderElement he = it.nextElement();
            final String param = he.getName();
            final String value = he.getValue();
            if (value != null && param.equalsIgnoreCase("timeout")) {
                try {
                    return Long.parseLong(value) * 1000;
                } catch(final NumberFormatException ignore) {
                }
            }
        }
        return -1;
    }
}

-1代表多长时间,接口说明不是很清楚。
PoolingNHttpClientConnectionManager 类中的代码,实现了上述对待KeepAlive的逻辑,可以看到-1表示为:无限期

@Override
    public void releaseConnection(
            final NHttpClientConnection managedConn,
            final Object state,
            final long keepalive,
            final TimeUnit tunit) {
        Args.notNull(managedConn, "Managed connection");
        synchronized (managedConn) {
            final CPoolEntry entry = CPoolProxy.detach(managedConn);
            if (entry == null) {
                return;
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Releasing connection: " + format(entry) + formatStats(entry.getRoute()));
            }
            final NHttpClientConnection conn = entry.getConnection();
            try {
                if (conn.isOpen()) {
                    entry.setState(state);
                    entry.updateExpiry(keepalive, tunit != null ? tunit : TimeUnit.MILLISECONDS);
                    if (this.log.isDebugEnabled()) {
                        final String s;
                       // keepalive 就是上面接口 ConnectionKeepAliveStrategy.getKeepAliveDuration()的返回值
                        if (keepalive > 0) {
                            s = "for " + (double) keepalive / 1000 + " seconds";
                        } else {
                       // 如果小于0 ,那么策略为indefinitely:无限期。
                            s = "indefinitely";
                        }
                        this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
                    }
                }
            } finally {
                this.pool.release(entry, conn.isOpen() && entry.isRouteComplete());
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
                }
            }
        }
    }

解决方式

自定义类实现ConnectionKeepAliveStrategy接口:

public class CustomConnectionKeepAliveStrategy extends DefaultConnectionKeepAliveStrategy {

    public static final CustomConnectionKeepAliveStrategy INSTANCE = new CustomConnectionKeepAliveStrategy();

    private CustomConnectionKeepAliveStrategy() {
        super();
    }

    /**
     * 最大keep alive的时间(分钟)
     * 这里默认为10分钟,可以根据实际情况设置。可以观察客户端机器状态为TIME_WAIT的TCP连接数,如果太多,可以增大此值。
     */
    private final long MAX_KEEP_ALIVE_MINUTES = 10;

    @Override
    public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
        long keepAliveDuration = super.getKeepAliveDuration(response, context);
        // <0 为无限期keepalive
        // 将无限期替换成一个默认的时间
        if(keepAliveDuration < 0){
            return TimeUnit.MINUTES.toMillis(MAX_KEEP_ALIVE_MINUTES);
        }
        return keepAliveDuration;
    }
}

在创建Elasticserach Client时,配置

RestClientBuilder builder = RestClient.builder(hosts);
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
    @Override
    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
        httpClientBuilder.setKeepAliveStrategy(CustomConnectionKeepAliveStrategy.INSTANCE);
        return httpClientBuilder;
    }
});

[尊重社区原创,转载请保留或注明出处]
本文地址:http://elasticsearch.cn/article/13778


3 个评论

连接断了直接发送请求,不是会重新建立连接吗? 为什么会出现SocketTimeout异常?
我也遇到了这个问题,一旦长时间不使用连接,后续所有查询都会SocketTimeout。(调整超时时间无效)
目前的解决方案是使用心跳,隔一段时间就去获取一次es的连接状态,以此来保持连接的活跃
es版本6.8.4
使用的是RestHighLevelClient

要回复文章请先登录注册