使用 shuf 来打乱一个文件中的行或是选择文件中一个随机的行。
elasticsearch_hadoop

elasticsearch_hadoop

Day 13 - Elasticsearch-Hadoop打通Elasticsearch和Hadoop

AdventJasonbian 发表了文章 • 3 个评论 • 9810 次浏览 • 2018-12-13 09:34 • 来自相关话题

ES-Hadoop打通Elasticsearch和Hadoop

介绍

Elasticsearch作为强大的搜索引擎,Hadoop HDFS是分布式文件系统。

ES-Hadoop是一个深度集成Hadoop和ElasticSearch的项目,也是ES官方来维护的一个子项目。Elasticsearch可以将自身的Document导入到HDFS中用作备份;同时也可以将存储在HDFS上的结构化文件导入为ES中的Document,通过实现Hadoop和ES之间的输入输出,可以在Hadoop里面对ES集群的数据进行读取和写入,充分发挥Map-Reduce并行处理的优势,为Hadoop数据带来实时搜索的可能。

ES-Hadoop插件支持Map-Reduce、Cascading、Hive、Pig、Spark、Storm、yarn等组件。

ES-Hadoop整个数据流转图如下:

ES-Hadoop.jpg

环境配置

  • Elasticsearch 5.0.2
  • Centos 7
  • elasticsearch-hadoop 5.0.2
  • repository-hdfs-5.0.2

Elasticsearch备份数据到HDFS

介绍

Elasticsearch副本提供了数据高可靠性,在部分节点丢失的情况下不中断服务;但是副本并不提供对灾难性故障的保护,同时在运维人员误操作情况下也不能保障数据的可恢复性。对于这种情况,我们需要对Elasticsearch集群数据的真正备份。

通过快照的方式,将Elasticsearch集群中的数据备份到HDFS上,这样数据既存在于Elasticsearch集群中,有存在于HDFS上。当ES集群出现不可恢复的故障时,可以将数据从HDFS上快速恢复。

操作步骤

备份与恢复

  • 构建一个仓库

    PUT http://192.168.10.74:9200/_snapshot/backup
    {  
    "type": "hdfs",  
      "settings": {  
              "uri": "hdfs://192.168.10.170:9000",  
              "path": "/es",  
              "conf_location": "/usr/local/hadoop/etc/hadoop/hdfs-site.xml"  
      }
    }
  • 备份快照

    PUT http://192.168.10.74:9200/_snapshot/backup/snapshot_users?wait_for_completion=true
    {
    "indices": "users",  //备份users的index,注意不设置这个属性,默认是备份所有index
    "ignore_unavailable": true,
    "include_global_state": false
    }
  • 恢复快照

    POST http://192.168.10.74:9200/_snapshot/backup/snapshot_users/_restore
    {
    "indices": "users",    //指定索引恢复,不指定就是所有
    "ignore_unavailable": true,     //忽略恢复时异常索引
    "include_global_state": false    //是否存储全局转态信息,fasle代表有一个或几个失败,不会导致整个任务失败
    }

整合Spark与Elasticsearch

整体思路

  • 数据首先存储在HDFS上,可以通过Spark SQL直接导入到ES中
  • Spark SQL可以直接通过建立Dataframe或者临时表连接ES,达到搜索优化、减少数据量和数据筛选的目的,此时数据只在ES内存中而不再Spark SQL中
  • 筛选后的数据重新导入到Spark SQL中进行查询

引入依赖

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>5.0.2</version>
</dependency>

具体流程

  • 数据在HDFS上,数据存储在HDFS的每个DataNode的block上

image-20181211115144840.png

  • 数据加载到Spark SQL

    • 数据从HDFS加载到Spark SQL中,以RDD形式存储
    JavaRDD<String> textFile = spark.read().textFile("hdfs://192.168.10.170:9000/csv/user.csv")
    • 添加数据结构信息转换为新的RDD
    JavaRDD<UserItem> dataSplits = textFile.map(line -> {
        String records = line.toString().trim();
        String record = records.substring(0,records.length() - 1).trim();
        String[] parts = record.split("\\|");
        UserItem u = new UserItem();
        u.setName(parts[0]);
        u.setAge(parts[1]);
        u.setHeight(parts[2]);
        return u;
    });
    • 根据新的RDD创建DataFrame
    DataSet<Row> ds = spark.createDataFrame(dataSplits, UserItem.class);

image-20181211120610726.png

  • 由Dataset创建索引,并写入ES

    JavaEsSparkSQL.saveToEs(ds, "es_spark/users");
  • 数据在ES中建立索引

image-20181211140747391.png

  • Spark SQL通过索引对ES中的数据进行查询

    SparkSession spark = SparkSession.builder().appName("es-spark").master("local").config("es.index.auto.create", true).getOrCreate();
    Map<String, String> options = new HashMap<>();
    options.put("pushdown", "true");
    options.put("es.nodes","192.168.10.74:9200");
    
    Dataset<Row> df = spark.read().options(options).format("org.elasticsearch.spark.sql").load("es_spark/users");
    df.createOrReplaceTempView("users");
    
    Dataset<Row> userSet = spark.sql("SELECT name FORM users WHERE age >=10 AND age <= 20");
    userSet.show();

结束

ES-Hadoop无缝打通了ES和Hadoop两个非常优秀的框架,从而让ES的强大检索性能帮助我们快速分析海量数据。

elasticsearch-hadoopp hive导入数据到es中的总是version conflict?

Elasticsearchzyb1994111 回复了问题 • 3 人关注 • 2 个回复 • 7151 次浏览 • 2018-04-03 10:23 • 来自相关话题

关于从hadoop导数据到ES的日期格式问题

回复

Elasticsearch啊喔额 发起了问题 • 1 人关注 • 0 个回复 • 2562 次浏览 • 2018-01-26 19:24 • 来自相关话题

elasticsearch-hadoop中spark查询的原理

回复

ElasticsearchJasonbian 发起了问题 • 1 人关注 • 0 个回复 • 5232 次浏览 • 2016-11-14 16:20 • 来自相关话题

elasticsearch-hadoopp hive导入数据到es中的总是version conflict?

回复

Elasticsearchzyb1994111 回复了问题 • 3 人关注 • 2 个回复 • 7151 次浏览 • 2018-04-03 10:23 • 来自相关话题

关于从hadoop导数据到ES的日期格式问题

回复

Elasticsearch啊喔额 发起了问题 • 1 人关注 • 0 个回复 • 2562 次浏览 • 2018-01-26 19:24 • 来自相关话题

elasticsearch-hadoop中spark查询的原理

回复

ElasticsearchJasonbian 发起了问题 • 1 人关注 • 0 个回复 • 5232 次浏览 • 2016-11-14 16:20 • 来自相关话题

Day 13 - Elasticsearch-Hadoop打通Elasticsearch和Hadoop

AdventJasonbian 发表了文章 • 3 个评论 • 9810 次浏览 • 2018-12-13 09:34 • 来自相关话题

ES-Hadoop打通Elasticsearch和Hadoop

介绍

Elasticsearch作为强大的搜索引擎,Hadoop HDFS是分布式文件系统。

ES-Hadoop是一个深度集成Hadoop和ElasticSearch的项目,也是ES官方来维护的一个子项目。Elasticsearch可以将自身的Document导入到HDFS中用作备份;同时也可以将存储在HDFS上的结构化文件导入为ES中的Document,通过实现Hadoop和ES之间的输入输出,可以在Hadoop里面对ES集群的数据进行读取和写入,充分发挥Map-Reduce并行处理的优势,为Hadoop数据带来实时搜索的可能。

ES-Hadoop插件支持Map-Reduce、Cascading、Hive、Pig、Spark、Storm、yarn等组件。

ES-Hadoop整个数据流转图如下:

ES-Hadoop.jpg

环境配置

  • Elasticsearch 5.0.2
  • Centos 7
  • elasticsearch-hadoop 5.0.2
  • repository-hdfs-5.0.2

Elasticsearch备份数据到HDFS

介绍

Elasticsearch副本提供了数据高可靠性,在部分节点丢失的情况下不中断服务;但是副本并不提供对灾难性故障的保护,同时在运维人员误操作情况下也不能保障数据的可恢复性。对于这种情况,我们需要对Elasticsearch集群数据的真正备份。

通过快照的方式,将Elasticsearch集群中的数据备份到HDFS上,这样数据既存在于Elasticsearch集群中,有存在于HDFS上。当ES集群出现不可恢复的故障时,可以将数据从HDFS上快速恢复。

操作步骤

备份与恢复

  • 构建一个仓库

    PUT http://192.168.10.74:9200/_snapshot/backup
    {  
    "type": "hdfs",  
      "settings": {  
              "uri": "hdfs://192.168.10.170:9000",  
              "path": "/es",  
              "conf_location": "/usr/local/hadoop/etc/hadoop/hdfs-site.xml"  
      }
    }
  • 备份快照

    PUT http://192.168.10.74:9200/_snapshot/backup/snapshot_users?wait_for_completion=true
    {
    "indices": "users",  //备份users的index,注意不设置这个属性,默认是备份所有index
    "ignore_unavailable": true,
    "include_global_state": false
    }
  • 恢复快照

    POST http://192.168.10.74:9200/_snapshot/backup/snapshot_users/_restore
    {
    "indices": "users",    //指定索引恢复,不指定就是所有
    "ignore_unavailable": true,     //忽略恢复时异常索引
    "include_global_state": false    //是否存储全局转态信息,fasle代表有一个或几个失败,不会导致整个任务失败
    }

整合Spark与Elasticsearch

整体思路

  • 数据首先存储在HDFS上,可以通过Spark SQL直接导入到ES中
  • Spark SQL可以直接通过建立Dataframe或者临时表连接ES,达到搜索优化、减少数据量和数据筛选的目的,此时数据只在ES内存中而不再Spark SQL中
  • 筛选后的数据重新导入到Spark SQL中进行查询

引入依赖

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>5.0.2</version>
</dependency>

具体流程

  • 数据在HDFS上,数据存储在HDFS的每个DataNode的block上

image-20181211115144840.png

  • 数据加载到Spark SQL

    • 数据从HDFS加载到Spark SQL中,以RDD形式存储
    JavaRDD<String> textFile = spark.read().textFile("hdfs://192.168.10.170:9000/csv/user.csv")
    • 添加数据结构信息转换为新的RDD
    JavaRDD<UserItem> dataSplits = textFile.map(line -> {
        String records = line.toString().trim();
        String record = records.substring(0,records.length() - 1).trim();
        String[] parts = record.split("\\|");
        UserItem u = new UserItem();
        u.setName(parts[0]);
        u.setAge(parts[1]);
        u.setHeight(parts[2]);
        return u;
    });
    • 根据新的RDD创建DataFrame
    DataSet<Row> ds = spark.createDataFrame(dataSplits, UserItem.class);

image-20181211120610726.png

  • 由Dataset创建索引,并写入ES

    JavaEsSparkSQL.saveToEs(ds, "es_spark/users");
  • 数据在ES中建立索引

image-20181211140747391.png

  • Spark SQL通过索引对ES中的数据进行查询

    SparkSession spark = SparkSession.builder().appName("es-spark").master("local").config("es.index.auto.create", true).getOrCreate();
    Map<String, String> options = new HashMap<>();
    options.put("pushdown", "true");
    options.put("es.nodes","192.168.10.74:9200");
    
    Dataset<Row> df = spark.read().options(options).format("org.elasticsearch.spark.sql").load("es_spark/users");
    df.createOrReplaceTempView("users");
    
    Dataset<Row> userSet = spark.sql("SELECT name FORM users WHERE age >=10 AND age <= 20");
    userSet.show();

结束

ES-Hadoop无缝打通了ES和Hadoop两个非常优秀的框架,从而让ES的强大检索性能帮助我们快速分析海量数据。