行动是治愈恐惧的良药,而犹豫、拖延将不断滋养恐惧。

ES创建client连接出错:java.io.StreamCorruptedException: invalid internal transport message

Elasticsearch | 作者 su123456 | 发布于2017年10月19日 | 阅读数:21483

我在启动web的java程序时,在创建ES的client端连接时,出现了以下的问题:
 elasticsearch 是5.4.1版本。
 
[2017-10-19 16:26:21,371] [INFO] [TransportClientNodesService] [elasticsearch[_client_][generic][T#3]] ache.logging.slf4j.SLF4JLogger.logMessage():243 - failed to get local cluster state for {esnode1}{pvCRd71tQmCjxMN38uJqGA}{-SooQOMDRxKyssDS6YkFcw}{10.202.77.204}{10.202.77.204:9300}, disconnecting...
ReceiveTimeoutTransportException[[esnode1][10.202.77.204:9300][cluster:monitor/state] request_id [12] timed out after [20619ms]]
at org.elasticsearch.transport.TransportService$TimeoutHandler.run(TransportService.java:934)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:569)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-10-19 16:26:39,439] [INFO] [ClientCnxn] [RMI TCP Connection(10.202.8.21:2181)] ookeeper.ClientCnxn$SendThread.run():1096 - Client session timed out, have not heard from server in 17944ms for sessionid 0x35f313787c90111, closing socket connection and attempting reconnect
[2017-10-19 16:26:39,471] [WARN] [Netty4Transport] [elasticsearch[_client_][transport_client_boss][T#7]] ache.logging.slf4j.SLF4JLogger.logMessage():246 - exception caught on transport layer [[id: 0xdd7017fd, L:/10.118.44.28:49317 - R:/10.202.77.204:9300]], closing connection
io.netty.handler.codec.DecoderException: java.io.StreamCorruptedException: invalid internal transport message format, got (0,0,0,0)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:272)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:544)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:498)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.StreamCorruptedException: invalid internal transport message format, got (0,0,0,0)
at org.elasticsearch.transport.TcpTransport.validateMessageHeader(TcpTransport.java:1266)
at org.elasticsearch.transport.netty4.Netty4SizeHeaderFrameDecoder.decode(Netty4SizeHeaderFrameDecoder.java:36)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:241)
... 15 more

 
 pom.xml内容
        <dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.8.2</version>
</dependency>

 
client端的连接代码
    public ElasticSearchNewVersion() {
TransportClient tclient ;
try {
if (client == null) {
// ip地址加到客户端中

Settings settings = Settings.builder().put("cluster.name", "sf_bdp")
.put("client.transport.sniff", true).put("client.transport.ping_timeout", "20s").build();
tclient = new PreBuiltTransportClient(settings);
String[] nodes = IP.split(",");
for (String node : nodes) {
if (node.length() > 0) {
// 跳过为空的node(当开头、结尾有逗号或多个连续逗号时会出现空node)
String[] hostPort = node.split(":");
try {
client =tclient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
} catch (UnknownHostException e) {
System.out.println("Initialize ElasticSearch Client fails."+ e);
throw new RuntimeException(e);
}

}
}


}
} catch (Exception e) {
System.out.println(e);
}

}

求大神指教,谢了。
已邀请:

su123456

赞同来自:

此问题已解决.问题的原因是项目中有使用netty其他版本的JAR包,导致JAR包冲突了.
 
解决方案:把低版本的JAR包excule掉就OK了.

要回复问题请先登录注册