Day 13 - Elasticsearch-Hadoop打通Elasticsearch和Hadoop
Advent | 作者 Jasonbian | 发布于2018年12月13日 | | 阅读数:9478ES-Hadoop打通Elasticsearch和Hadoop
介绍
Elasticsearch作为强大的搜索引擎,Hadoop HDFS是分布式文件系统。
ES-Hadoop是一个深度集成Hadoop和ElasticSearch的项目,也是ES官方来维护的一个子项目。Elasticsearch可以将自身的Document导入到HDFS中用作备份;同时也可以将存储在HDFS上的结构化文件导入为ES中的Document,通过实现Hadoop和ES之间的输入输出,可以在Hadoop里面对ES集群的数据进行读取和写入,充分发挥Map-Reduce并行处理的优势,为Hadoop数据带来实时搜索的可能。
ES-Hadoop插件支持Map-Reduce、Cascading、Hive、Pig、Spark、Storm、yarn等组件。
ES-Hadoop整个数据流转图如下:
环境配置
- Elasticsearch 5.0.2
- Centos 7
- elasticsearch-hadoop 5.0.2
- repository-hdfs-5.0.2
Elasticsearch备份数据到HDFS
介绍
Elasticsearch副本提供了数据高可靠性,在部分节点丢失的情况下不中断服务;但是副本并不提供对灾难性故障的保护,同时在运维人员误操作情况下也不能保障数据的可恢复性。对于这种情况,我们需要对Elasticsearch集群数据的真正备份。
通过快照的方式,将Elasticsearch集群中的数据备份到HDFS上,这样数据既存在于Elasticsearch集群中,有存在于HDFS上。当ES集群出现不可恢复的故障时,可以将数据从HDFS上快速恢复。
操作步骤
-
下载插件 https://artifacts.elastic.co/downloads/elasticsearch-plugins/repository-hdfs/repository-hdfs-5.0.2.zip 保存在/usr/local下
-
安装插件
cd /usr/local/es/elasticsearch-5.0.2/bin ./elasticsearch-plugin install file:///usr/local/repository-hdfs-5.0.2.zip
- 安装成功后需要重启Elasticsearch
备份与恢复
-
构建一个仓库
PUT http://192.168.10.74:9200/_snapshot/backup { "type": "hdfs", "settings": { "uri": "hdfs://192.168.10.170:9000", "path": "/es", "conf_location": "/usr/local/hadoop/etc/hadoop/hdfs-site.xml" } }
-
备份快照
PUT http://192.168.10.74:9200/_snapshot/backup/snapshot_users?wait_for_completion=true { "indices": "users", //备份users的index,注意不设置这个属性,默认是备份所有index "ignore_unavailable": true, "include_global_state": false }
-
恢复快照
POST http://192.168.10.74:9200/_snapshot/backup/snapshot_users/_restore { "indices": "users", //指定索引恢复,不指定就是所有 "ignore_unavailable": true, //忽略恢复时异常索引 "include_global_state": false //是否存储全局转态信息,fasle代表有一个或几个失败,不会导致整个任务失败 }
整合Spark与Elasticsearch
整体思路
- 数据首先存储在HDFS上,可以通过Spark SQL直接导入到ES中
- Spark SQL可以直接通过建立Dataframe或者临时表连接ES,达到搜索优化、减少数据量和数据筛选的目的,此时数据只在ES内存中而不再Spark SQL中
- 筛选后的数据重新导入到Spark SQL中进行查询
引入依赖
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>5.0.2</version>
</dependency>
具体流程
- 数据在HDFS上,数据存储在HDFS的每个DataNode的block上
-
数据加载到Spark SQL
- 数据从HDFS加载到Spark SQL中,以RDD形式存储
JavaRDD<String> textFile = spark.read().textFile("hdfs://192.168.10.170:9000/csv/user.csv")
- 添加数据结构信息转换为新的RDD
JavaRDD<UserItem> dataSplits = textFile.map(line -> { String records = line.toString().trim(); String record = records.substring(0,records.length() - 1).trim(); String[] parts = record.split("\\|"); UserItem u = new UserItem(); u.setName(parts[0]); u.setAge(parts[1]); u.setHeight(parts[2]); return u; });
- 根据新的RDD创建DataFrame
DataSet<Row> ds = spark.createDataFrame(dataSplits, UserItem.class);
-
由Dataset
创建索引,并写入ES
JavaEsSparkSQL.saveToEs(ds, "es_spark/users");
- 数据在ES中建立索引
-
Spark SQL通过索引对ES中的数据进行查询
SparkSession spark = SparkSession.builder().appName("es-spark").master("local").config("es.index.auto.create", true).getOrCreate(); Map<String, String> options = new HashMap<>(); options.put("pushdown", "true"); options.put("es.nodes","192.168.10.74:9200"); Dataset<Row> df = spark.read().options(options).format("org.elasticsearch.spark.sql").load("es_spark/users"); df.createOrReplaceTempView("users"); Dataset<Row> userSet = spark.sql("SELECT name FORM users WHERE age >=10 AND age <= 20"); userSet.show();
结束
ES-Hadoop无缝打通了ES和Hadoop两个非常优秀的框架,从而让ES的强大检索性能帮助我们快速分析海量数据。
本文地址:http://elasticsearch.cn/article/6194