Writing to Elasticsearch from Spark Streaming (with repartition) throws exceptions: the job can connect and can write, but a large number of failures occur, all reporting io.netty errors. Has anyone run into this?
Here is the exception reported by one of the nodes:
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. Set system property 'org.apache.logging.log4j.simplelog.StatusLogger.level' to TRACE to show Log4j2 internal initialization logging.
18/04/11 15:37:27 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[elasticsearch[_client_][generic][T#1],5,main]
java.lang.NoSuchMethodError: io.netty.bootstrap.Bootstrap.config()Lio/netty/bootstrap/BootstrapConfig;
at org.elasticsearch.transport.netty4.Netty4Transport.lambda$stopInternal$4(Netty4Transport.java:460)
at org.apache.lucene.util.IOUtils.close(IOUtils.java:89)
at org.elasticsearch.common.lease.Releasables.close(Releasables.java:36)
at org.elasticsearch.common.lease.Releasables.close(Releasables.java:46)
at org.elasticsearch.common.lease.Releasables.close(Releasables.java:51)
at org.elasticsearch.transport.netty4.Netty4Transport.stopInternal(Netty4Transport.java:443)
at org.elasticsearch.transport.TcpTransport.lambda$doStop$4(TcpTransport.java:936)
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:569)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
18/04/11 15:37:27 INFO storage.DiskBlockManager: Shutdown hook called
18/04/11 15:37:27 INFO util.ShutdownHookManager: Shutdown hook called
Source snippet (the part that submits the writes has been removed):

ngfwDS1.foreachRDD { rdd =>
  rdd.foreachPartition { iterRecords: Iterator[Map[String, String]] =>
    // NOTE: Iterator.size consumes the iterator, so the records
    // cannot be traversed again for writing after this call.
    println(iterRecords.size)
    // Must be set before the transport client is constructed.
    System.setProperty("es.set.netty.runtime.available.processors", "false")
    val settings = Settings.builder()
      .put("cluster.name", "bdp-clpp-log")
      .put("client.transport.sniff", false)
      .build()
    val client = new PreBuiltTransportClient(settings)
    // ... bulk-write code removed by the poster ...
    client.close()
  }
}
4 replies
JackGe
Upvoted by: stupidsky
A NoSuchMethodError like this is caused by a Netty version conflict on the classpath. Spark bundles Netty 4.0.x, while the Elasticsearch 5.x transport client depends on Netty 4.1.x; Bootstrap.config() only exists in Netty 4.1, so when the older jar from Spark wins on the classpath, the method is missing at runtime. Run mvn dependency:tree to inspect the dependency tree, find the netty artifact causing the conflict, and exclude the conflicting version via <exclusions> in the pom file.
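As a rough sketch (the artifact names and versions below are assumptions; verify the actual conflict in your own project with `mvn dependency:tree -Dincludes=io.netty`), the exclusion could look like this in the pom:

```xml
<!-- Hypothetical pom fragment: exclude the Netty bundled with Spark so
     the Netty 4.1.x required by the ES transport client is the one on
     the classpath. Adjust groupId/artifactId/version to match what
     `mvn dependency:tree` reports for your build. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.2.0</version>
  <exclusions>
    <exclusion>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```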
zhuo
Upvoted by:
ESWorker
Upvoted by:
elasticmsj1
Upvoted by: