Exception when writing data to Elasticsearch from Spark

Elasticsearch | Author: ggchangan | Posted: 2016-03-22 | Views: 20790

The source code is as follows:
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // brings saveToEs into scope on RDDs

object ElasticsearchSpark {
  def main(args: Array[String]): Unit = {
    val appName = "ElasticsearchSpark"
    val master = "local"
    val conf = new SparkConf().setAppName(appName).setMaster(master)
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "/127.0.0.1")
    val sc = new SparkContext(conf)
    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

    sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")
  }
}
Elasticsearch uses its default configuration, version 2.1.1.
Spark uses its default configuration, version 1.4.1.

The project dependencies are as follows:

<groupId>groupId</groupId>
<artifactId>spark-scala</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.4.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark_2.10</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>

The exception is as follows:
org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[/127.0.0.1:9200]] 
    at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:317)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:301)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:305)
    at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:119)
    at org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:101)
    at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:58)
    at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:374)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/03/22 15:11:58 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[/127.0.0.1:9200]] 
    at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:317)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:301)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:305)
    at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:119)
    at org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:101)
    at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:58)
    at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:374)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

16/03/22 15:11:58 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
16/03/22 15:11:58 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
16/03/22 15:11:58 INFO TaskSchedulerImpl: Cancelling stage 1
16/03/22 15:11:58 INFO DAGScheduler: ResultStage 1 (runJob at EsSpark.scala:67) failed in 0.132 s
16/03/22 15:11:58 INFO DAGScheduler: Job 1 failed: runJob at EsSpark.scala:67, took 0.144979 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[/127.0.0.1:9200]] 
    at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:317)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:301)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:305)
    at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:119)
    at org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:101)
    at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:58)
    at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:374)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
16/03/22 15:11:58 INFO SparkContext: Invoking stop() from shutdown hook

stab - freshman

conf.set("es.nodes", "/127.0.0.1")

What on earth is that slash doing there?

ggchangan - Interested in Elasticsearch internals, source code, and applications

Without the slash I get an address resolution error. I added the slash after seeing it in the exception message.

hlstudo

Do you have an ES node running on your local machine?
Can you access it normally?
If so, you can delete the line conf.set("es.nodes", "/127.0.0.1"); the slash is indeed redundant.
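
A note on where the slash may come from: Java's InetAddress.toString prints "hostname/address" and leaves the hostname part empty when none is known, which yields "/127.0.0.1". That is probably why the exception message shows [[/127.0.0.1:9200]], although this is an assumption about how the connector formats its log and is not confirmed in this thread. A minimal Scala sketch using only the JDK; EsNodes and normalize are hypothetical names, not part of elasticsearch-hadoop:

```scala
import java.net.InetAddress

object EsNodes {
  // Hypothetical helper: drop a leading "/" copied from a log message,
  // so "/127.0.0.1" becomes "127.0.0.1". Other values pass through unchanged.
  def normalize(host: String): String = host.trim.stripPrefix("/")

  def main(args: Array[String]): Unit = {
    // An InetAddress built from raw bytes carries no hostname,
    // so its string form starts with a slash.
    val loopback = InetAddress.getByAddress(Array[Byte](127, 0, 0, 1))
    println(loopback.toString)            // string form as it might appear in a log
    println(normalize(loopback.toString)) // value without the leading slash
  }
}
```

The normalized value could then be passed on, for example conf.set("es.nodes", EsNodes.normalize("/127.0.0.1")).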

angel2017 - Technology exists to solve problems

I ran into a similar problem. My code is in Scala and does not use the slash, but the error still occurs. Does anyone have a solution?

hanbj

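This error means the client could not connect to the cluster. There are only three possible causes:
1. The cluster's service process is not available (check it).
2. The port in the code is wrong; note that it must be the HTTP port (default 9200).
3. In elasticsearch.yml, make sure the network.host setting is enabled; whatever address it is set to, use that same IP in the code.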
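
The first two points can be checked from the machine that runs the Spark job before Spark is involved at all. A rough Scala sketch using only the JDK; EsReachability and canConnect are hypothetical names, and the host, port, and timeout are assumptions to be replaced with your own values. It only tests that a TCP connection can be opened, not that Elasticsearch is what answers on that port:

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Try

object EsReachability {
  // Returns true if a TCP connection to host:port opens within timeoutMs.
  def canConnect(host: String, port: Int, timeoutMs: Int = 2000): Boolean = {
    val socket = new Socket()
    try {
      // Any connection failure (refused, timed out, unknown host) yields false.
      Try(socket.connect(new InetSocketAddress(host, port), timeoutMs)).isSuccess
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // 9200 is the default HTTP port mentioned above; the host should match network.host.
    val ok = canConnect("127.0.0.1", 9200)
    println(s"127.0.0.1:9200 reachable: $ok")
  }
}
```

If this check fails, the problem lies with the node or the network rather than with the Spark code.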

joe23_2006

How was this problem finally resolved? I am running into it too.
