ELK 使用小技巧(第 4 期)

ELK Tips 主要介绍一些 ELK 使用过程中的小技巧,内容主要来源为 Elastic 中文社区。

一、Logstash

1、Logstash 性能调优主要参数

  • pipeline.workers:设置启动多少个线程执行 fliter 和 output;当 input 的内容出现堆积而 CPU 使用率还比较充足时,可以考虑增加该参数的大小;
  • pipeline.batch.size:设置单个工作线程在执行过滤器和输出之前收集的最大事件数,较大的批量大小通常更高效,但会增加内存开销。输出插件会将每个批处理作为一个输出单元。;例如,ES 输出会为收到的每个批次发出批量请求;调整 pipeline.batch.size 可调整发送到 ES 的批量请求(Bulk)的大小;
  • pipeline.batch.delay:设置 Logstash 管道的延迟时间, 管道批处理延迟是 Logstash 在当前管道工作线程中接收事件后等待新消息的最长时间(以毫秒为单位);简单来说,当 pipeline.batch.size 不满足时,会等待 pipeline.batch.delay 设置的时间,超时后便开始执行 filter 和 output 操作。

2、使用 Ruby Filter 根据现有字段计算一个新字段

filter {
    ruby {
           code => "event.set('kpi', ((event.get('a') + event.get('b'))/(event.get('c')+event.get('d'))).round(2))"
     }
}

3、logstash filter 如何判断字段是够为空或者 null

if ![updateTime]

4、Date Filter 设置多种日期格式

date {
  match => ["logtime", "yyyy-MM-dd HH:mm:ss.SSS","yyyy-MM-dd HH:mm:ss,SSS"]
  target => "logtime_utc"
}

二、Elasticsearch

1、高效翻页 Search After

通常情况下我们会使用 from 和 size 的方式实现查询结果的翻页,但是当达到深度分页时,成本变得过高(堆内存占用和时间耗费与 from+size 的大小成正比),因此 ES 设置了限制(index.max_result_window),默认值为 10000,防止用户进行过于深入的翻页。

推荐使用 Scroll api 进行高效深度滚动,但滚动上下文代价很高,因此不要将 Scroll 用于实时用户请求。search_after 参数通过提供实时游标来解决深度滚动的问题,其主要思路是使用上一页的结果来帮助检索下一页。

GET twitter/_search
{
    "size": 10,
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    },
    "search_after": [1463538857, "654323"],
    "sort": [
        {"date": "asc"},
        {"tie_breaker_id": "asc"}
    ]
}

2、ES 文档相似度 BM25 参数设置

ES2.X 默认是以 TF/IDF 算法计算文档相似度,从 ES5.X 开始,BM25 作为默认的相似度计算算法。

PUT /index
{
    "settings" : {
        "index" : {
            "similarity" : {
              "my_similarity" : {
                "type" : "DFR",
                "basic_model" : "g",
                "after_effect" : "l",
                "normalization" : "h2",
                "normalization.h2.c" : "3.0"
              }
            }
        }
    }
}

PUT /index/_mapping/_doc
{
  "properties" : {
    "title" : { "type" : "text", "similarity" : "my_similarity" }
  }
}

3、ES2.X 得分计算

得分计算脚本:

double tf = Math.sqrt(doc.freq); 
double idf = Math.log((field.docCount+1.0)/(term.docFreq+1.0)) + 1.0; 
double norm = 1/Math.sqrt(doc.length); 
return query.boost * tf * idf * norm;
  • 忽略词频统计及词频位置:将字段的 index_options 设置为 docs;
  • 忽略字段长度:设置字段的 "norms": { "enabled": false }

4、CircuitBreakingException: [parent] Data too large

报错信息:

[WARN ][r.suppressed             ] path: /, params: {}
org.elasticsearch.common.breaker.CircuitBreakingException: [parent] Data too large, data for [<http_request>] would be [1454565650/1.3gb], which is larger than the limit of [1454427340/1.3gb], usages [request=0/0b, fielddata=568/568b, in_flight_requests=0/0b, accounting=1454565082/1.3gb]

jvm 堆内存不够当前查询加载数据所以会报 data too large, 请求被熔断,indices.breaker.request.limit默认为 jvm heap 的 60%,因此可以通过调整 ES 的 Heap Size 来解决该问题。

5、ES 免费的自动化运维工具推荐

6、elasticsearch-hanlp 分词插件包

核心功能:

  • 内置多种分词模式,适合不同场景;
  • 内置词典,无需额外配置即可使用;
  • 支持外置词典,用户可自定义分词算法,基于词典或是模型;
  • 支持分词器级别的自定义词典,便于用于多租户场景;
  • 支持远程词典热更新(待开发);
  • 拼音过滤器、繁简体过滤器(待开发);
  • 基于词语或单字的 ngram 切分分词(待开发)。

https://github.com/AnyListen/elasticsearch-analysis-hanlp

7、节点重启时延迟索引分片重分配

当某个节点短时间离开集群时,一般是不会影响整体系统运行的,可以通过下面的请求延迟索引分片的再分配。

PUT _all/_settings
{
  "settings": {
    "index.unassigned.node_left.delayed_timeout": "5m"
  }
}

8、ES 数据修改后,查询还是未修改前的数据

默认是 1 秒可见,如果你的需求一定要写完就可见,那在写的时候增加 refresh 参数,强制刷新即可,但强烈建议不这么干,因为这样会把整个集群拖垮。

9、Terms Query 从另一个索引获取 terms

当 Terms Query 需要指定很多 terms 的时候,如果手动设置还是相当麻烦的,可以通过 terms-lookup 的方式从另外一个索引加载需要匹配的 terms。

PUT /users/_doc/2
{
    "followers" : ["1", "3"]
}

PUT /tweets/_doc/1
{
    "user" : "1"
}

GET /tweets/_search
{
    "query" : {
        "terms" : {
            "user" : {
                "index" : "users",
                "type" : "_doc",
                "id" : "2",
                "path" : "followers"
            }
        }
    }
}

-----------等效下面的语句--------------

PUT /users/_doc/2
{
 "followers" : [
   {
     "id" : "1"
   },
   {
     "id" : "2"
   }
 ]
}

10、ES 备份路径设置

报错信息:

doesn't match any of the locations specified by path.repo because this setting is empty

结局方案,修改 ES 的配置文件:

# 在 elasticsearch.yml 中添加下面配置来设置备份仓库路径 
path.repo: ["/home/test/backup/zty_logstash"]

11、Query cache 和 Filter cache 的区别

Filter cache 被重命名为 Node Query Cache,也就是说 Query cache 等同于 Filter cache;Query Cache 采用了 LRU 的缓存方式(当缓存满的时候,淘汰旧的不用的缓存数据),Query Cache 只缓存被用于 filter 上下文的内容。

12、Shard 大小需要考虑的因素有哪些?

Lucene 底层没有这个大小的限制,20-40GB 的这个区间范围本身就比较大,经验值有时候就是拍脑袋,不一定都好使。

  • Elasticsearch 对数据的隔离和迁移是以分片为单位进行的,分片太大,会加大迁移成本;
  • 一个分片就是一个 Lucene 的库,一个 Lucene 目录里面包含很多 Segment,每个 Segment 有文档数的上限,Segment 内部的文档 ID 目前使用的是 Java 的整型,也就是 2 的 31 次方,所以能够表示的总的文档数为 Integer.MAX_VALUE - 128 = 2^31 - 128 = 2147483647 - 1 = 2,147,483,519,也就是21.4亿条;
  • 同样,如果你不 force merge 成一个 Segment,单个 shard 的文档数能超过这个数;
  • 单个 Lucene 越大,索引会越大,查询的操作成本自然要越高,IO 压力越大,自然会影响查询体验;
  • 具体一个分片多少数据合适,还是需要结合实际的业务数据和实际的查询来进行测试以进行评估。

13、ES 索引更新时通过 mapping 限制指定字段更新

Elasticsearch 默认是 Dynamic Mapping,新字段会自动猜测数据类型,并自动 merge 到之前的 Mapping,你可以在 Mapping 里面可以配置字段是否支持动态加入,设置参数dynamic即可:true,默认,表示支持动态加入新字段;false,表示忽略该字段的后续索引等操作,但是索引还是成功的;strict支持不支持未知字段,直接抛错。

14、ES 数据快照到 HDFS

ES 做快照和使用 ES-Hadoop 导数据是完全的两种不同的方式,使用 ES-Hadoopp 后期导入的成本可能也不小。

  • 如果要恢复快,当然是做快照和还原的方式最快,速度完全取决于网络和磁盘的速度;
  • 如果为了节省磁盘,快照的时候,可以选 6.5 最新支持的 source_only 模式,导出的快照要小很多,不过恢复的时候要进行重建,速度慢。

15、segment.memory 简介

segment 的大小,和 indexing buffer 有关,有三种方式会生成 segment:

  • 一种是 indexing buffer 写满了会生成 segment 文件,默认是堆内存的10%,是节点共享的;
  • 一种是 index buffer 有文档,但是还没满,但是 refresh 时间到了,这个时候就会把 buffer 里面的生成 segment 文件;
  • 还有最后一种就是 es 自动的会将小的 segment 文件定期合并产生新的 segment 文件。

三、社区文章精选


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

继续阅读 »

ELK Tips 主要介绍一些 ELK 使用过程中的小技巧,内容主要来源为 Elastic 中文社区。

一、Logstash

1、Logstash 性能调优主要参数

  • pipeline.workers:设置启动多少个线程执行 fliter 和 output;当 input 的内容出现堆积而 CPU 使用率还比较充足时,可以考虑增加该参数的大小;
  • pipeline.batch.size:设置单个工作线程在执行过滤器和输出之前收集的最大事件数,较大的批量大小通常更高效,但会增加内存开销。输出插件会将每个批处理作为一个输出单元。;例如,ES 输出会为收到的每个批次发出批量请求;调整 pipeline.batch.size 可调整发送到 ES 的批量请求(Bulk)的大小;
  • pipeline.batch.delay:设置 Logstash 管道的延迟时间, 管道批处理延迟是 Logstash 在当前管道工作线程中接收事件后等待新消息的最长时间(以毫秒为单位);简单来说,当 pipeline.batch.size 不满足时,会等待 pipeline.batch.delay 设置的时间,超时后便开始执行 filter 和 output 操作。

2、使用 Ruby Filter 根据现有字段计算一个新字段

filter {
    ruby {
           code => "event.set('kpi', ((event.get('a') + event.get('b'))/(event.get('c')+event.get('d'))).round(2))"
     }
}

3、logstash filter 如何判断字段是够为空或者 null

if ![updateTime]

4、Date Filter 设置多种日期格式

date {
  match => ["logtime", "yyyy-MM-dd HH:mm:ss.SSS","yyyy-MM-dd HH:mm:ss,SSS"]
  target => "logtime_utc"
}

二、Elasticsearch

1、高效翻页 Search After

通常情况下我们会使用 from 和 size 的方式实现查询结果的翻页,但是当达到深度分页时,成本变得过高(堆内存占用和时间耗费与 from+size 的大小成正比),因此 ES 设置了限制(index.max_result_window),默认值为 10000,防止用户进行过于深入的翻页。

推荐使用 Scroll api 进行高效深度滚动,但滚动上下文代价很高,因此不要将 Scroll 用于实时用户请求。search_after 参数通过提供实时游标来解决深度滚动的问题,其主要思路是使用上一页的结果来帮助检索下一页。

GET twitter/_search
{
    "size": 10,
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    },
    "search_after": [1463538857, "654323"],
    "sort": [
        {"date": "asc"},
        {"tie_breaker_id": "asc"}
    ]
}

2、ES 文档相似度 BM25 参数设置

ES2.X 默认是以 TF/IDF 算法计算文档相似度,从 ES5.X 开始,BM25 作为默认的相似度计算算法。

PUT /index
{
    "settings" : {
        "index" : {
            "similarity" : {
              "my_similarity" : {
                "type" : "DFR",
                "basic_model" : "g",
                "after_effect" : "l",
                "normalization" : "h2",
                "normalization.h2.c" : "3.0"
              }
            }
        }
    }
}

PUT /index/_mapping/_doc
{
  "properties" : {
    "title" : { "type" : "text", "similarity" : "my_similarity" }
  }
}

3、ES2.X 得分计算

得分计算脚本:

double tf = Math.sqrt(doc.freq); 
double idf = Math.log((field.docCount+1.0)/(term.docFreq+1.0)) + 1.0; 
double norm = 1/Math.sqrt(doc.length); 
return query.boost * tf * idf * norm;
  • 忽略词频统计及词频位置:将字段的 index_options 设置为 docs;
  • 忽略字段长度:设置字段的 "norms": { "enabled": false }

4、CircuitBreakingException: [parent] Data too large

报错信息:

[WARN ][r.suppressed             ] path: /, params: {}
org.elasticsearch.common.breaker.CircuitBreakingException: [parent] Data too large, data for [<http_request>] would be [1454565650/1.3gb], which is larger than the limit of [1454427340/1.3gb], usages [request=0/0b, fielddata=568/568b, in_flight_requests=0/0b, accounting=1454565082/1.3gb]

jvm 堆内存不够当前查询加载数据所以会报 data too large, 请求被熔断,indices.breaker.request.limit默认为 jvm heap 的 60%,因此可以通过调整 ES 的 Heap Size 来解决该问题。

5、ES 免费的自动化运维工具推荐

6、elasticsearch-hanlp 分词插件包

核心功能:

  • 内置多种分词模式,适合不同场景;
  • 内置词典,无需额外配置即可使用;
  • 支持外置词典,用户可自定义分词算法,基于词典或是模型;
  • 支持分词器级别的自定义词典,便于用于多租户场景;
  • 支持远程词典热更新(待开发);
  • 拼音过滤器、繁简体过滤器(待开发);
  • 基于词语或单字的 ngram 切分分词(待开发)。

https://github.com/AnyListen/elasticsearch-analysis-hanlp

7、节点重启时延迟索引分片重分配

当某个节点短时间离开集群时,一般是不会影响整体系统运行的,可以通过下面的请求延迟索引分片的再分配。

PUT _all/_settings
{
  "settings": {
    "index.unassigned.node_left.delayed_timeout": "5m"
  }
}

8、ES 数据修改后,查询还是未修改前的数据

默认是 1 秒可见,如果你的需求一定要写完就可见,那在写的时候增加 refresh 参数,强制刷新即可,但强烈建议不这么干,因为这样会把整个集群拖垮。

9、Terms Query 从另一个索引获取 terms

当 Terms Query 需要指定很多 terms 的时候,如果手动设置还是相当麻烦的,可以通过 terms-lookup 的方式从另外一个索引加载需要匹配的 terms。

PUT /users/_doc/2
{
    "followers" : ["1", "3"]
}

PUT /tweets/_doc/1
{
    "user" : "1"
}

GET /tweets/_search
{
    "query" : {
        "terms" : {
            "user" : {
                "index" : "users",
                "type" : "_doc",
                "id" : "2",
                "path" : "followers"
            }
        }
    }
}

-----------等效下面的语句--------------

PUT /users/_doc/2
{
 "followers" : [
   {
     "id" : "1"
   },
   {
     "id" : "2"
   }
 ]
}

10、ES 备份路径设置

报错信息:

doesn't match any of the locations specified by path.repo because this setting is empty

结局方案,修改 ES 的配置文件:

# 在 elasticsearch.yml 中添加下面配置来设置备份仓库路径 
path.repo: ["/home/test/backup/zty_logstash"]

11、Query cache 和 Filter cache 的区别

Filter cache 被重命名为 Node Query Cache,也就是说 Query cache 等同于 Filter cache;Query Cache 采用了 LRU 的缓存方式(当缓存满的时候,淘汰旧的不用的缓存数据),Query Cache 只缓存被用于 filter 上下文的内容。

12、Shard 大小需要考虑的因素有哪些?

Lucene 底层没有这个大小的限制,20-40GB 的这个区间范围本身就比较大,经验值有时候就是拍脑袋,不一定都好使。

  • Elasticsearch 对数据的隔离和迁移是以分片为单位进行的,分片太大,会加大迁移成本;
  • 一个分片就是一个 Lucene 的库,一个 Lucene 目录里面包含很多 Segment,每个 Segment 有文档数的上限,Segment 内部的文档 ID 目前使用的是 Java 的整型,也就是 2 的 31 次方,所以能够表示的总的文档数为 Integer.MAX_VALUE - 128 = 2^31 - 128 = 2147483647 - 1 = 2,147,483,519,也就是21.4亿条;
  • 同样,如果你不 force merge 成一个 Segment,单个 shard 的文档数能超过这个数;
  • 单个 Lucene 越大,索引会越大,查询的操作成本自然要越高,IO 压力越大,自然会影响查询体验;
  • 具体一个分片多少数据合适,还是需要结合实际的业务数据和实际的查询来进行测试以进行评估。

13、ES 索引更新时通过 mapping 限制指定字段更新

Elasticsearch 默认是 Dynamic Mapping,新字段会自动猜测数据类型,并自动 merge 到之前的 Mapping,你可以在 Mapping 里面可以配置字段是否支持动态加入,设置参数dynamic即可:true,默认,表示支持动态加入新字段;false,表示忽略该字段的后续索引等操作,但是索引还是成功的;strict支持不支持未知字段,直接抛错。

14、ES 数据快照到 HDFS

ES 做快照和使用 ES-Hadoop 导数据是完全的两种不同的方式,使用 ES-Hadoopp 后期导入的成本可能也不小。

  • 如果要恢复快,当然是做快照和还原的方式最快,速度完全取决于网络和磁盘的速度;
  • 如果为了节省磁盘,快照的时候,可以选 6.5 最新支持的 source_only 模式,导出的快照要小很多,不过恢复的时候要进行重建,速度慢。

15、segment.memory 简介

segment 的大小,和 indexing buffer 有关,有三种方式会生成 segment:

  • 一种是 indexing buffer 写满了会生成 segment 文件,默认是堆内存的10%,是节点共享的;
  • 一种是 index buffer 有文档,但是还没满,但是 refresh 时间到了,这个时候就会把 buffer 里面的生成 segment 文件;
  • 还有最后一种就是 es 自动的会将小的 segment 文件定期合并产生新的 segment 文件。

三、社区文章精选


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

收起阅读 »

使用 ES-Hadoop 将 Spark Streaming 流数据写入 ES

本文将详细介绍利用 ES-Hadoop 将 Spark 处理的数据写入到 ES 中。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • Spark 版本:2.2.0
  • Kafka 版本:1.0.1
  • ES 版本:6.5.1

2、Maven 依赖

<!-- scala -->
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.11.8</version>
</dependency>

<!-- spark 基础依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming 相关依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming-kafka 相关依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- zookeeper 相关依赖 -->
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.5-cdh6.0.1</version>
</dependency>

<!-- Spark-ES 相关依赖 -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.5.4</version>
</dependency>

<!-- Spark-ES 依赖的 HTTP 传输组件 -->
<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
</dependency>

3、注意事项

如果使用 CDH 版本的 Spark,则在调试及实际部署运行的时候会出现下面的错误:

java.lang.ClassNotFoundException: org.apache.commons.httpclient.protocol.Protocol

很显然是缺少 httpclient 相关依赖造成的,对比开源版本与 CDH 版本的 Spark,发现开源版本多出了 commons-httpclient-3.1.jar,因此上述 Maven 的 pom 文件添加上对其依赖即可。

二、ES-Hadoop

1、简介

ES-Hadoop 实现了 Hadoop 生态(Hive、Spark、Pig、Storm 等)与 ElasticSearch 之间的数据交互,借助该组件可以将 Hadoop 生态的数据写入到 ES 中,然后借助 ES 对数据快速进行搜索、过滤、聚合等分析,进一步可以通过 Kibana 来实现数据的可视化。

同时,也可以借助 ES 作为数据存储层(类似数仓的 Stage 层或者 ODS 层),然后借助 Hadoop 生态的数据处理工具(Hive、MR、Spark 等)将处理后的数据写入到 HDFS 中。

使用 ES 做为原始数据的存储层,可以很好的进行数据去重、数据质量分析,还可以提供一些即时的数据服务,例如趋势展示、汇总分析等。

对 Hadoop 数据进行交互分析

2、组成

ES-Hadoop 是一个整合性质的组件,它封装了 Hadoop 生态的多种组件与 ES 交互的 API,如果你只需要部分功能,可以使用细分的组件:

  • elasticsearch-hadoop-mr
  • elasticsearch-hadoop-hive
  • elasticsearch-hadoop-pig
  • elasticsearch-spark-20_2.10
  • elasticsearch-hadoop-cascading
  • elasticsearch-storm

三、elasticsearch-spark

1、配置

es-hadoop 核心是通过 es 提供的 restful 接口来进行数据交互,下面是几个重要配置项,更多配置信息请参阅官方说明

  • es.nodes:需要连接的 es 节点(不需要配置全部节点,默认会自动发现其他可用节点);
  • es.port:节点 http 通讯端口;
  • es.nodes.discovery:默认为 true,表示自动发现集群可用节点;
  • es.nodes.wan.only:默认为 false,设置为 true 之后,会关闭节点的自动 discovery,只使用 es.nodes 声明的节点进行数据读写操作;如果你需要通过域名进行数据访问,则设置该选项为 true,否则请务必设置为 false;
  • es.index.auto.create:是否自动创建不存在的索引,默认为 true;
  • es.net.http.auth.user:Basic 认证的用户名;
  • es.net.http.auth.pass:Basic 认证的密码。
val conf = new SparkConf().setIfMissing("spark.app.name","rt-data-loader").setIfMissing("spark.master", "local[5]")
conf.set(ConfigurationOptions.ES_NODES, esNodes)
conf.set(ConfigurationOptions.ES_PORT, esPort)
conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
conf.set("es.write.rest.error.handlers", "ignoreConflict")
conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")

特别需要注意的配置项为 es.nodes.wan.only,由于在云服务器环境中,配置文件使用的一般为内网地址,而本地调试的时候一般使用外网地址,这样将 es.nodes 配置为外网地址后,最后会出现节点找不到的问题(由于会使用节点配置的内网地址去进行连接):

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available; 
node discovery is disabled and none of nodes specified fit the criterion [xxx.xx.x.xx:9200]

此时将 es.nodes.wan.only 设置为 true 即可。推荐开发测试时使用域名,集群部署的时候将该选项置为 false

2、屏蔽写入冲突

如果数据存在重复,写入 ES 时往往会出现数据写入冲突的错误,此时有两种解决方法。

方法一:设置 es.write.operation 为 upsert,这样达到的效果为如果存在则更新,不存在则进行插入,该配置项默认值为 index。

方法二:自定义冲突处理类,类似上述配置中设置了自定义的 error.handlers,通过自定义类来处理相关错误,例如忽略冲突等:

public class IgnoreConflictsHandler extends BulkWriteErrorHandler {
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) throws Exception {
        if (entry.getResponseCode() == 409) {
            StaticLog.warn("Encountered conflict response. Ignoring old data.");
            return HandlerResult.HANDLED;
        }
        return collector.pass("Not a conflict response code.");
    }
}

方法二可以屏蔽写入版本比预期的小之类的版本冲突问题。

3、RDD 写入 ES

EsSpark 提供了两种主要方法来实现数据写入:

  • saveToEs :RDD 内容为 Seq[Map],即一个 Map 对象集合,每个 Map 对应一个文档;
  • saveJsonToEs:RDD 内容为 Seq[String],即一个 String 集合,每个 String 是一个 JSON 字符串,代表一条记录(对应 ES 的 _source)。

数据写入可以指定很多配置信息,例如:

  • es.resource:设置写入的索引和类型,索引和类型名均支持动态变量
  • es.mapping.id:设置文档 _id 对应的字段名;
  • es.mapping.exclude:设置写入时忽略的字段,支持通配符。
val itemRdd = rdd.flatMap(line => {
    val topic = line.topic()
    println("正在处理:" + topic + " - " + line.partition() + " : " + line.offset())
    val jsonArray = JSON.parseArray(line.value()).toJavaList(classOf[JSONObject]).asScala
    val resultMap = jsonArray.map(jsonObj =>{
      var tmpId = "xxx"
      var tmpIndex = "xxxxxx"
      jsonObj.put("myTmpId", tmpId)
      jsonObj.put("myTmpIndex", tmpIndex)
      jsonObj.getInnerMap
    })
    resultMap
})
val mapConf = Map(
    ("es.resource" , "{myTmpIndex}/doc"),
    ("es.write.operation" , "upsert"),
    ("es.mapping.id" , "myTmpId"),
    ("es.mapping.exclude" , "myTmp*")
)
EsSpark.saveToEs(itemRdd, mapConf)

es.mapping.exclude 只支持 RDD 为 Map 集合(saveToEs),当为 Json 字符串集合时(saveJsonToEs)会提示不支持的错误信息;这个配置项非常有用,例如 myTmpId 作为文档 id,因此没有必要重复存储到 _source 里面了,可以配置到这个配置项,将其从 _source 中排除。


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

继续阅读 »

本文将详细介绍利用 ES-Hadoop 将 Spark 处理的数据写入到 ES 中。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • Spark 版本:2.2.0
  • Kafka 版本:1.0.1
  • ES 版本:6.5.1

2、Maven 依赖

<!-- scala -->
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.11.8</version>
</dependency>

<!-- spark 基础依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming 相关依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming-kafka 相关依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- zookeeper 相关依赖 -->
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.5-cdh6.0.1</version>
</dependency>

<!-- Spark-ES 相关依赖 -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.5.4</version>
</dependency>

<!-- Spark-ES 依赖的 HTTP 传输组件 -->
<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
</dependency>

3、注意事项

如果使用 CDH 版本的 Spark,则在调试及实际部署运行的时候会出现下面的错误:

java.lang.ClassNotFoundException: org.apache.commons.httpclient.protocol.Protocol

很显然是缺少 httpclient 相关依赖造成的,对比开源版本与 CDH 版本的 Spark,发现开源版本多出了 commons-httpclient-3.1.jar,因此上述 Maven 的 pom 文件添加上对其依赖即可。

二、ES-Hadoop

1、简介

ES-Hadoop 实现了 Hadoop 生态(Hive、Spark、Pig、Storm 等)与 ElasticSearch 之间的数据交互,借助该组件可以将 Hadoop 生态的数据写入到 ES 中,然后借助 ES 对数据快速进行搜索、过滤、聚合等分析,进一步可以通过 Kibana 来实现数据的可视化。

同时,也可以借助 ES 作为数据存储层(类似数仓的 Stage 层或者 ODS 层),然后借助 Hadoop 生态的数据处理工具(Hive、MR、Spark 等)将处理后的数据写入到 HDFS 中。

使用 ES 做为原始数据的存储层,可以很好的进行数据去重、数据质量分析,还可以提供一些即时的数据服务,例如趋势展示、汇总分析等。

对 Hadoop 数据进行交互分析

2、组成

ES-Hadoop 是一个整合性质的组件,它封装了 Hadoop 生态的多种组件与 ES 交互的 API,如果你只需要部分功能,可以使用细分的组件:

  • elasticsearch-hadoop-mr
  • elasticsearch-hadoop-hive
  • elasticsearch-hadoop-pig
  • elasticsearch-spark-20_2.10
  • elasticsearch-hadoop-cascading
  • elasticsearch-storm

三、elasticsearch-spark

1、配置

es-hadoop 核心是通过 es 提供的 restful 接口来进行数据交互,下面是几个重要配置项,更多配置信息请参阅官方说明

  • es.nodes:需要连接的 es 节点(不需要配置全部节点,默认会自动发现其他可用节点);
  • es.port:节点 http 通讯端口;
  • es.nodes.discovery:默认为 true,表示自动发现集群可用节点;
  • es.nodes.wan.only:默认为 false,设置为 true 之后,会关闭节点的自动 discovery,只使用 es.nodes 声明的节点进行数据读写操作;如果你需要通过域名进行数据访问,则设置该选项为 true,否则请务必设置为 false;
  • es.index.auto.create:是否自动创建不存在的索引,默认为 true;
  • es.net.http.auth.user:Basic 认证的用户名;
  • es.net.http.auth.pass:Basic 认证的密码。
val conf = new SparkConf().setIfMissing("spark.app.name","rt-data-loader").setIfMissing("spark.master", "local[5]")
conf.set(ConfigurationOptions.ES_NODES, esNodes)
conf.set(ConfigurationOptions.ES_PORT, esPort)
conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
conf.set("es.write.rest.error.handlers", "ignoreConflict")
conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")

特别需要注意的配置项为 es.nodes.wan.only,由于在云服务器环境中,配置文件使用的一般为内网地址,而本地调试的时候一般使用外网地址,这样将 es.nodes 配置为外网地址后,最后会出现节点找不到的问题(由于会使用节点配置的内网地址去进行连接):

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available; 
node discovery is disabled and none of nodes specified fit the criterion [xxx.xx.x.xx:9200]

此时将 es.nodes.wan.only 设置为 true 即可。推荐开发测试时使用域名,集群部署的时候将该选项置为 false

2、屏蔽写入冲突

如果数据存在重复,写入 ES 时往往会出现数据写入冲突的错误,此时有两种解决方法。

方法一:设置 es.write.operation 为 upsert,这样达到的效果为如果存在则更新,不存在则进行插入,该配置项默认值为 index。

方法二:自定义冲突处理类,类似上述配置中设置了自定义的 error.handlers,通过自定义类来处理相关错误,例如忽略冲突等:

public class IgnoreConflictsHandler extends BulkWriteErrorHandler {
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) throws Exception {
        if (entry.getResponseCode() == 409) {
            StaticLog.warn("Encountered conflict response. Ignoring old data.");
            return HandlerResult.HANDLED;
        }
        return collector.pass("Not a conflict response code.");
    }
}

方法二可以屏蔽写入版本比预期的小之类的版本冲突问题。

3、RDD 写入 ES

EsSpark 提供了两种主要方法来实现数据写入:

  • saveToEs :RDD 内容为 Seq[Map],即一个 Map 对象集合,每个 Map 对应一个文档;
  • saveJsonToEs:RDD 内容为 Seq[String],即一个 String 集合,每个 String 是一个 JSON 字符串,代表一条记录(对应 ES 的 _source)。

数据写入可以指定很多配置信息,例如:

  • es.resource:设置写入的索引和类型,索引和类型名均支持动态变量
  • es.mapping.id:设置文档 _id 对应的字段名;
  • es.mapping.exclude:设置写入时忽略的字段,支持通配符。
val itemRdd = rdd.flatMap(line => {
    val topic = line.topic()
    println("正在处理:" + topic + " - " + line.partition() + " : " + line.offset())
    val jsonArray = JSON.parseArray(line.value()).toJavaList(classOf[JSONObject]).asScala
    val resultMap = jsonArray.map(jsonObj =>{
      var tmpId = "xxx"
      var tmpIndex = "xxxxxx"
      jsonObj.put("myTmpId", tmpId)
      jsonObj.put("myTmpIndex", tmpIndex)
      jsonObj.getInnerMap
    })
    resultMap
})
val mapConf = Map(
    ("es.resource" , "{myTmpIndex}/doc"),
    ("es.write.operation" , "upsert"),
    ("es.mapping.id" , "myTmpId"),
    ("es.mapping.exclude" , "myTmp*")
)
EsSpark.saveToEs(itemRdd, mapConf)

es.mapping.exclude 只支持 RDD 为 Map 集合(saveToEs),当为 Json 字符串集合时(saveJsonToEs)会提示不支持的错误信息;这个配置项非常有用,例如 myTmpId 作为文档 id,因此没有必要重复存储到 _source 里面了,可以配置到这个配置项,将其从 _source 中排除。


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

收起阅读 »

Day 21 - ECE 版本升级扫雷实战

Elastic Cloud Enterprise 出来也有一年多的时间了。对于这类的开源软件企业版本,基本都是在可管理性和稳定性上面下功夫。但是新产品免不了需要经历一下bug的打磨才会变得成熟。

下面分享的这个案例是当我们在把集群从5.4.1 升级到5.6.12 的过程中,遇到节点关闭受阻,升级不完整等情景。以及对应的处理方法。
首先在ECE中,版本是通过stack的方式管理
 
QQ20181221-104539@2x.png

Ref : https://www.elastic.co/guide/e ... .html

这些版本都是以docker images的形式存储

QQ20181221-104619@2x.png

因此,ECE根据不同的版本,然后选择对应的docker image就可以创建一个节点了. 那么升级的过程就可以简单分成几个步骤

1.exclude准备升级的节点。

2.停止节点ES进程,更换container 版本。

3.重新启动节点,加入集群。

4.在其他节点上重复以上流程。

在这个过程中, 实际使用的时候发现有一些需要注意的雷区.
扫雷一:使用UI触发升级,必须保证集群没有自定义插件和bundles

ECE 里面的集群操作是通过plan来控制的

QQ20181221-104650@2x.png

任何的集群操作最终都会生成一个plan的diff。如上图,把集群从5.4.1 升级到 5.6.12 会产生以上diff.

正常情况下是没有问题的.

如果集群配置了自定义bundles, 比如LDAP bundles, ref:https://www.elastic.co/guide/e ... .html

那么在集群的plan里面就会存在这么一段配置

QQ20181221-104718@2x.png

那么当我们在按下升级按钮的时候

QQ20181221-104744@2x.png

ECE 只在plan中修改了集群大版本的配置, 但是并没有修改自定义bundles中的版本号(仍然是5.4.1).

在这种情况下去执行升级,会直接产生报错.

QQ20181221-104810@2x.png

界面上没有显示原因, 但是这是因为plan里面大版本和bundle中的版本不一致,然后会导致新增的节点无法启动. 于是ECE 就认为集群升级失败了.
解决方法是手动编辑plan,把自定义bundles中的版本号改成和集群版本一致

QQ20181221-104848@2x.png

然后使用ECE 提供的一种手动使用plan进行集群升级的方式进行升级.

扫雷二:节点无法关闭

ECE 控制container 是通过一个叫做 constructor的服务。constructor 通过接收集群的更改需求,制定具体的更改计划与步骤,指导allocator对container进行操作。同时也负责保证集群高可靠性,通过Availability Zone的数量在不同的AZ上面部署节点。

当Allocator接受到关闭container命令的时候,会尝试去关闭container,如果container处于一个阻塞状态无法响应, 那么关闭命令无法执行成功。这个时候constructor会等待节点关闭,但是allocator又认为节点已经接受到关闭命令了。又或者constructor发送给allocator的过程中网络丢包, 这个时候allocator 没有正确接受关闭container的命令. 整个升级进程就卡住了。 这种情况十分罕见,通常发现一个container如果处于”正在关闭”时间太久了, 那么通常就是中间的通信出现问题了.

QQ20181221-104926@2x.png

解决办法是可以通过手动停止container, 在对应的allocator上面找到container,使用

docker stop <container_name>

停止container,这样可以出发allocator更新container的健康状态,上报这个container已经关闭了, 从而打通流程并执行下一步。
扫雷三: 多版本并存

如果使用上面的方式强行关闭docker container, 虽然可以让升级进程继续进行下去. 但是被手动关闭的节点会保留原来的版本。于是在升级后查看各个节点的版本,会发现部分节点是5.4.1, 部分是5.6.12.

因为节点是强制关闭的, ECE直接认为节点已经完成升级,并重新启动这个container. 而在这个处理中,跳过了升级docker image的一步.

为什么不是生成一个新的container呢? 因为从plan里面可以看到

QQ20181221-104951@2x.png

在默认情况下, ECE 处理版本升级是使用rolling 策略 Ref: https://www.elastic.co/guide/e ... .html

在这个策略下,ECE会停止当前container并直接修改重启。

如果ECE集群容量允许, 可以改成grow_and_shrink 策略, 这样ECE 会创建新的container并且销毁旧的container, 避免集群出现多版本.
如果出现了多版本的集群,可以通过更改集群任意一个配置来触发 grow_and_shrink 同样可以使到版本回归一致.
总结来说ECE 在版本升级方面还是有很多需要改进的地方. 对于ECE用户再说在使用ECE的版本升级功能的时候主要有以下建议

1. 自己学会手动修改plan. 这也是每一个ECE support engineer 都会干的事情.

2.如果集群容量允许,尽量使用 grow_and_shrink的策略来进行集群操作.
 
继续阅读 »
Elastic Cloud Enterprise 出来也有一年多的时间了。对于这类的开源软件企业版本,基本都是在可管理性和稳定性上面下功夫。但是新产品免不了需要经历一下bug的打磨才会变得成熟。

下面分享的这个案例是当我们在把集群从5.4.1 升级到5.6.12 的过程中,遇到节点关闭受阻,升级不完整等情景。以及对应的处理方法。
首先在ECE中,版本是通过stack的方式管理
 
QQ20181221-104539@2x.png

Ref : https://www.elastic.co/guide/e ... .html

这些版本都是以docker images的形式存储

QQ20181221-104619@2x.png

因此,ECE根据不同的版本,然后选择对应的docker image就可以创建一个节点了. 那么升级的过程就可以简单分成几个步骤

1.exclude准备升级的节点。

2.停止节点ES进程,更换container 版本。

3.重新启动节点,加入集群。

4.在其他节点上重复以上流程。

在这个过程中, 实际使用的时候发现有一些需要注意的雷区.
扫雷一:使用UI触发升级,必须保证集群没有自定义插件和bundles

ECE 里面的集群操作是通过plan来控制的

QQ20181221-104650@2x.png

任何的集群操作最终都会生成一个plan的diff。如上图,把集群从5.4.1 升级到 5.6.12 会产生以上diff.

正常情况下是没有问题的.

如果集群配置了自定义bundles, 比如LDAP bundles, ref:https://www.elastic.co/guide/e ... .html

那么在集群的plan里面就会存在这么一段配置

QQ20181221-104718@2x.png

那么当我们在按下升级按钮的时候

QQ20181221-104744@2x.png

ECE 只在plan中修改了集群大版本的配置, 但是并没有修改自定义bundles中的版本号(仍然是5.4.1).

在这种情况下去执行升级,会直接产生报错.

QQ20181221-104810@2x.png

界面上没有显示原因, 但是这是因为plan里面大版本和bundle中的版本不一致,然后会导致新增的节点无法启动. 于是ECE 就认为集群升级失败了.
解决方法是手动编辑plan,把自定义bundles中的版本号改成和集群版本一致

QQ20181221-104848@2x.png

然后使用ECE 提供的一种手动使用plan进行集群升级的方式进行升级.

扫雷二:节点无法关闭

ECE 控制container 是通过一个叫做 constructor的服务。constructor 通过接收集群的更改需求,制定具体的更改计划与步骤,指导allocator对container进行操作。同时也负责保证集群高可靠性,通过Availability Zone的数量在不同的AZ上面部署节点。

当Allocator接受到关闭container命令的时候,会尝试去关闭container,如果container处于一个阻塞状态无法响应, 那么关闭命令无法执行成功。这个时候constructor会等待节点关闭,但是allocator又认为节点已经接受到关闭命令了。又或者constructor发送给allocator的过程中网络丢包, 这个时候allocator 没有正确接受关闭container的命令. 整个升级进程就卡住了。 这种情况十分罕见,通常发现一个container如果处于”正在关闭”时间太久了, 那么通常就是中间的通信出现问题了.

QQ20181221-104926@2x.png

解决办法是可以通过手动停止container, 在对应的allocator上面找到container,使用

docker stop <container_name>

停止container,这样可以出发allocator更新container的健康状态,上报这个container已经关闭了, 从而打通流程并执行下一步。
扫雷三: 多版本并存

如果使用上面的方式强行关闭docker container, 虽然可以让升级进程继续进行下去. 但是被手动关闭的节点会保留原来的版本。于是在升级后查看各个节点的版本,会发现部分节点是5.4.1, 部分是5.6.12.

因为节点是强制关闭的, ECE直接认为节点已经完成升级,并重新启动这个container. 而在这个处理中,跳过了升级docker image的一步.

为什么不是生成一个新的container呢? 因为从plan里面可以看到

QQ20181221-104951@2x.png

在默认情况下, ECE 处理版本升级是使用rolling 策略 Ref: https://www.elastic.co/guide/e ... .html

在这个策略下,ECE会停止当前container并直接修改重启。

如果ECE集群容量允许, 可以改成grow_and_shrink 策略, 这样ECE 会创建新的container并且销毁旧的container, 避免集群出现多版本.
如果出现了多版本的集群,可以通过更改集群任意一个配置来触发 grow_and_shrink 同样可以使到版本回归一致.
总结来说ECE 在版本升级方面还是有很多需要改进的地方. 对于ECE用户再说在使用ECE的版本升级功能的时候主要有以下建议

1. 自己学会手动修改plan. 这也是每一个ECE support engineer 都会干的事情.

2.如果集群容量允许,尽量使用 grow_and_shrink的策略来进行集群操作.
  收起阅读 »

Day 17 - 关于日志型数据管理策略的思考

近两年随着Elastic Stack的愈发火热,其近乎成为构建实时日志应用的工业标准。在小型数据应用场景,最新的6.5版本已经可以做到开箱即用,无需过多考虑架构上的设计工作。 然而一旦应用规模扩大到数百TB甚至PB的数据量级,整个系统的架构和后期维护工作则显得非常重要。借着2018 Elastic Advent写文的机会,结合过去几年架构和运维公司日志集群的实践经验,对于大规模日志型数据的管理策略,在此做一个总结性的思考。 文中抛出的观点,有些已经在我们的集群中有所应用并取得比较好的效果,有些则还待实践的检验。抛砖引玉,不尽成熟的地方,还请社区各位专家指正。

对于日志系统,最终用户通常有以下几个基本要求:

  1. 数据从产生到可检索的实时性要求高,可接受的延迟通常要求控制在数秒,至多不超过数十秒
  2. 新鲜数据(当天至过去几天)的查询和统计频率高,返回速度要快(毫秒级,至多几秒)
  3. 历史数据保留得越久越好。

针对这些需求,加上对成本控制的必要性,大家通常想到的第一个架构设计就是冷热数据分离。

冷热数据分离

冷热分离的概念比较好理解,热结点做数据的写入,保存近期热数据,冷数据定期迁移到冷数据结点,就这么简单。不过实际操作起来可能还是碰到一些具体需要考虑的细节问题。

  1. 冷热结点集群配置的JVM heap配置要差异化。热结点无需存放太多数据,对于heap的要求通常不是太高,在够用的情况下尽量配置小一点。可以配置在26GB左右甚至更小,而不是大多数人知道的经验值31GB。原因在于这个size的heap,可以启用zero based Compressed Oops,JVM运行效率是最高的, 参考: heap-size。而冷结点存在的目的是尽量放更多的数据,性能不是首要的,因此heap可以配置在31GB。
  2. 数据迁移过程有一定资源消耗,为避免对数据写入产生显著影响,通常定时在业务低峰期,日志产出量比较低的时候进行,比如半夜。
  3. 索引是否应该启用压缩,如何启用?最初我们对于热结点上的索引是不启用压缩的,为了节省CPU消耗。只在冷结点配置里,增加了索引压缩选项。这样索引迁移到冷结点后,执行force merge操作的时候,ES会自动将结点上配置的索引压缩属性套用到merge过后新生成的segment,这样就实现了热结点不压缩,冷结点merge过后压缩的功能。极大节省了冷结点的磁盘空间。后来随着硬件的升级,我们发现服务器的cpu基本都是过剩的,磁盘IO通常先到瓶颈。 因此尝试在热结点上一开始创建索引的时候,就启用压缩选项。实际对比测试并没有发现显著的索引吞吐量下降,并且因为索引压缩后磁盘文件size的大幅减少,每天夜间的数据迁移工作可以节省大量的时间。至此我们的日志集群索引默认就是压缩的。
  4. 冷结点上留做系统缓存的内存一般不多,加上数据量非常巨大。索引默认的mmapfs读取方式,很容易因为系统缓存不够,导致数据在内存和磁盘之间频繁换入换出。严重的情况下,整个结点甚至会因为io持续在100%无法响应。 实践中我们发现对冷结点使用niofs效果会更好。

实现了冷热结点分离以后,集群的资源利用率提升了不少,可管理性也要好很多了. 但是随着接入日志的类型越来越多(我们生产上有差不多400种类型的日志),各种日志的速率差异又很大,让ES自己管理shard的分布很容易产生写入热点问题。 针对这个问题,可以采用对集群结点进行分组管理的策略来解决。

热结点分组管理

所谓分组管理,就是通过在结点的配置文件中增加自定义的标签属性,将服务器区分到不同的组别中。然后通过设置索引的index.routing.allocation.include属性,控制改索引分布在哪个组别。同时配合设置index.routing.allocation.total_shards_per_node,可以做到某个索引的shard在某个group的结点之间绝对均匀分布。

比如一个分组有10台机器,对一个5 primary ,1 replica的索引,让该索引分布在该分组的同时,设置total_shards_per_node为1,让每个节点上只能有一个分片,这样就避免了写入热点问题。 该方案的缺陷也显而易见,一旦有结点挂掉,不会自动recovery,某个shard将一直处于unassigned状态,集群状态变成yellow。 但我认为,热数据的恢复开销是非常高的,与其立即在其他结点开始复制,之后再重新rebalance,不如就让集群暂时处于yellow状态。 通过监控报警的手段,及时通知运维人员解决结点故障。 待故障解决之后,直接从恢复后的结点开始数据复制,开销要低得多。

在我们的生产环境主要有两种类型的结点分组,分别是10台机器一个分组,和2台机器一个分组。10台机器的分组用于应对速率非常高,shard划分比较多的索引,2台机器的分组用于速率很低,一个shard(加一个复制片)就可以应对的索引。

这种分组策略在我们的生产环境中经过验证,非常好的解决了写入热点问题。那么冷数据怎么管理? 冷数据不做写入,不存在写入热点问题,查询频率也比较低,业务需求方面对查询响应要求也不那么严苛,所以查询热点问题也不是那么突出。因此为了简化管理,冷结点我们是不做shard分布的精细控制,所有数据迁移到冷数据结点之后,由ES默认的shard分布则略去控制数据的分布。

不过如果想进一步提高冷数据结点服务器资源的利用率,还是可以有进一步挖掘的的空间。我们知道ES默认的shard分布策略,只是保证一个索引的shard尽量分布在不同的结点,同时保证每个节点上shard数量差不多。但是如果采用默认按天创建索引的策略,由于索引速率差异很大,不同索引之间shard的大小差异可能是1-2个数量级的。如果每个shard的size差异不大就好了,那么默认的分布策略,基本上可以保证冷结点之间数据量分布的大致均匀。 能实现类似功能的是ES的rollover特性。

索引的Rollover

Rollover api可以让索引根据预先定义的时间跨度,或者索引大小来自动切分出新索引,从而将索引的大小控制在计划的范围内。合理的应用rollver api可以保证集群shard大小差别不会太大。 只是集群索引类别比较多的时候,rollover全部手动管理负担比较大,需要借助额外的管理工具和监控工具。我们出于管理简便的考虑,暂时没有应用到这个特性。

索引的Rollup

我们发现生产有些用户写入的“日志”,实际上是多维的metrcis数据,使用的时候不是为了查询日志的详情,仅仅是为了做各种维度组合的过滤和聚合。对于这种类型的数据,保留历史数据过多一来浪费存储空间,二来每次聚合都要在裸数据上跑,非常浪费资源。 ES从6.3开始,在x-pack里推出了rollup api,通过定期对裸数据做预先聚合,大大缩减了保存在磁盘上的数据量。对于不需要查询裸日志的应用场景,合理应用该特性,可以将历史数据的磁盘消耗降低几个数量级,同时rollup search也可以大大提升聚合速度。不过rollup也有其局限性,即他的实现是通过定期任务,对间隔期数据跑聚合完成的,有一定的计算开销。 如果数据写入速率非常高,集群压力很大,rollup可能无法跟上写入速率,而不具有实用性。 所以实际环境中,还是需要根据应用场景和资源使用情况,进行灵活的取舍。

多集群的便利性

数据量大到一定程度以后,单集群由于master node单点的限制,会遇到各种集群状态数据更新时得性能问题。 由此现在一些大规模的应用已经开始利用到多集群互联和cross cluster search的特性。 这种结构除了解决单集群数据容量限制问题以外,我们还发现在做容量均衡方面还有比较好的便利性。应用日志写入量通常随着业务变化也会剧烈变化,好不容易规划好的容量,不久就被业务的增长给打破,数倍或者数10倍的流量增长很可能就让一组结点过载出现写入延迟。 如果只有一个集群,在结点之间重新平衡shard比较费力,涉及到数据的迁移,可能非常缓慢,还会影响写入。 但如果有多集群互联,切换就可以做到非常的快速和简单。 原理上只需要在新集群中加入对应的索引配置模版,然后更新写入程序的配置,写入目标指向新集群,重启写入程序即可。并且,可以进一步将整个流程工具化,在GUI上完成一键切换。

继续阅读 »

近两年随着Elastic Stack的愈发火热,其近乎成为构建实时日志应用的工业标准。在小型数据应用场景,最新的6.5版本已经可以做到开箱即用,无需过多考虑架构上的设计工作。 然而一旦应用规模扩大到数百TB甚至PB的数据量级,整个系统的架构和后期维护工作则显得非常重要。借着2018 Elastic Advent写文的机会,结合过去几年架构和运维公司日志集群的实践经验,对于大规模日志型数据的管理策略,在此做一个总结性的思考。 文中抛出的观点,有些已经在我们的集群中有所应用并取得比较好的效果,有些则还待实践的检验。抛砖引玉,不尽成熟的地方,还请社区各位专家指正。

对于日志系统,最终用户通常有以下几个基本要求:

  1. 数据从产生到可检索的实时性要求高,可接受的延迟通常要求控制在数秒,至多不超过数十秒
  2. 新鲜数据(当天至过去几天)的查询和统计频率高,返回速度要快(毫秒级,至多几秒)
  3. 历史数据保留得越久越好。

针对这些需求,加上对成本控制的必要性,大家通常想到的第一个架构设计就是冷热数据分离。

冷热数据分离

冷热分离的概念比较好理解,热结点做数据的写入,保存近期热数据,冷数据定期迁移到冷数据结点,就这么简单。不过实际操作起来可能还是碰到一些具体需要考虑的细节问题。

  1. 冷热结点集群配置的JVM heap配置要差异化。热结点无需存放太多数据,对于heap的要求通常不是太高,在够用的情况下尽量配置小一点。可以配置在26GB左右甚至更小,而不是大多数人知道的经验值31GB。原因在于这个size的heap,可以启用zero based Compressed Oops,JVM运行效率是最高的, 参考: heap-size。而冷结点存在的目的是尽量放更多的数据,性能不是首要的,因此heap可以配置在31GB。
  2. 数据迁移过程有一定资源消耗,为避免对数据写入产生显著影响,通常定时在业务低峰期,日志产出量比较低的时候进行,比如半夜。
  3. 索引是否应该启用压缩,如何启用?最初我们对于热结点上的索引是不启用压缩的,为了节省CPU消耗。只在冷结点配置里,增加了索引压缩选项。这样索引迁移到冷结点后,执行force merge操作的时候,ES会自动将结点上配置的索引压缩属性套用到merge过后新生成的segment,这样就实现了热结点不压缩,冷结点merge过后压缩的功能。极大节省了冷结点的磁盘空间。后来随着硬件的升级,我们发现服务器的cpu基本都是过剩的,磁盘IO通常先到瓶颈。 因此尝试在热结点上一开始创建索引的时候,就启用压缩选项。实际对比测试并没有发现显著的索引吞吐量下降,并且因为索引压缩后磁盘文件size的大幅减少,每天夜间的数据迁移工作可以节省大量的时间。至此我们的日志集群索引默认就是压缩的。
  4. 冷结点上留做系统缓存的内存一般不多,加上数据量非常巨大。索引默认的mmapfs读取方式,很容易因为系统缓存不够,导致数据在内存和磁盘之间频繁换入换出。严重的情况下,整个结点甚至会因为io持续在100%无法响应。 实践中我们发现对冷结点使用niofs效果会更好。

实现了冷热结点分离以后,集群的资源利用率提升了不少,可管理性也要好很多了. 但是随着接入日志的类型越来越多(我们生产上有差不多400种类型的日志),各种日志的速率差异又很大,让ES自己管理shard的分布很容易产生写入热点问题。 针对这个问题,可以采用对集群结点进行分组管理的策略来解决。

热结点分组管理

所谓分组管理,就是通过在结点的配置文件中增加自定义的标签属性,将服务器区分到不同的组别中。然后通过设置索引的index.routing.allocation.include属性,控制改索引分布在哪个组别。同时配合设置index.routing.allocation.total_shards_per_node,可以做到某个索引的shard在某个group的结点之间绝对均匀分布。

比如一个分组有10台机器,对一个5 primary ,1 replica的索引,让该索引分布在该分组的同时,设置total_shards_per_node为1,让每个节点上只能有一个分片,这样就避免了写入热点问题。 该方案的缺陷也显而易见,一旦有结点挂掉,不会自动recovery,某个shard将一直处于unassigned状态,集群状态变成yellow。 但我认为,热数据的恢复开销是非常高的,与其立即在其他结点开始复制,之后再重新rebalance,不如就让集群暂时处于yellow状态。 通过监控报警的手段,及时通知运维人员解决结点故障。 待故障解决之后,直接从恢复后的结点开始数据复制,开销要低得多。

在我们的生产环境主要有两种类型的结点分组,分别是10台机器一个分组,和2台机器一个分组。10台机器的分组用于应对速率非常高,shard划分比较多的索引,2台机器的分组用于速率很低,一个shard(加一个复制片)就可以应对的索引。

这种分组策略在我们的生产环境中经过验证,非常好的解决了写入热点问题。那么冷数据怎么管理? 冷数据不做写入,不存在写入热点问题,查询频率也比较低,业务需求方面对查询响应要求也不那么严苛,所以查询热点问题也不是那么突出。因此为了简化管理,冷结点我们是不做shard分布的精细控制,所有数据迁移到冷数据结点之后,由ES默认的shard分布则略去控制数据的分布。

不过如果想进一步提高冷数据结点服务器资源的利用率,还是可以有进一步挖掘的的空间。我们知道ES默认的shard分布策略,只是保证一个索引的shard尽量分布在不同的结点,同时保证每个节点上shard数量差不多。但是如果采用默认按天创建索引的策略,由于索引速率差异很大,不同索引之间shard的大小差异可能是1-2个数量级的。如果每个shard的size差异不大就好了,那么默认的分布策略,基本上可以保证冷结点之间数据量分布的大致均匀。 能实现类似功能的是ES的rollover特性。

索引的Rollover

Rollover api可以让索引根据预先定义的时间跨度,或者索引大小来自动切分出新索引,从而将索引的大小控制在计划的范围内。合理的应用rollver api可以保证集群shard大小差别不会太大。 只是集群索引类别比较多的时候,rollover全部手动管理负担比较大,需要借助额外的管理工具和监控工具。我们出于管理简便的考虑,暂时没有应用到这个特性。

索引的Rollup

我们发现生产有些用户写入的“日志”,实际上是多维的metrcis数据,使用的时候不是为了查询日志的详情,仅仅是为了做各种维度组合的过滤和聚合。对于这种类型的数据,保留历史数据过多一来浪费存储空间,二来每次聚合都要在裸数据上跑,非常浪费资源。 ES从6.3开始,在x-pack里推出了rollup api,通过定期对裸数据做预先聚合,大大缩减了保存在磁盘上的数据量。对于不需要查询裸日志的应用场景,合理应用该特性,可以将历史数据的磁盘消耗降低几个数量级,同时rollup search也可以大大提升聚合速度。不过rollup也有其局限性,即他的实现是通过定期任务,对间隔期数据跑聚合完成的,有一定的计算开销。 如果数据写入速率非常高,集群压力很大,rollup可能无法跟上写入速率,而不具有实用性。 所以实际环境中,还是需要根据应用场景和资源使用情况,进行灵活的取舍。

多集群的便利性

数据量大到一定程度以后,单集群由于master node单点的限制,会遇到各种集群状态数据更新时得性能问题。 由此现在一些大规模的应用已经开始利用到多集群互联和cross cluster search的特性。 这种结构除了解决单集群数据容量限制问题以外,我们还发现在做容量均衡方面还有比较好的便利性。应用日志写入量通常随着业务变化也会剧烈变化,好不容易规划好的容量,不久就被业务的增长给打破,数倍或者数10倍的流量增长很可能就让一组结点过载出现写入延迟。 如果只有一个集群,在结点之间重新平衡shard比较费力,涉及到数据的迁移,可能非常缓慢,还会影响写入。 但如果有多集群互联,切换就可以做到非常的快速和简单。 原理上只需要在新集群中加入对应的索引配置模版,然后更新写入程序的配置,写入目标指向新集群,重启写入程序即可。并且,可以进一步将整个流程工具化,在GUI上完成一键切换。

收起阅读 »

Day 15 - 基于海量公司分词ES中文分词插件

介绍

本次想和大家分享一款Elasticsearch分词插件,该插件是基于天津海量信息股份有限公司的中文分词核心开发的。海量分词针对大数据检索场景专门做了定制和优化,更贴近搜索需求,整体分词的性能也是非常高效。

本文章有广告成分。但希望将公司研究成果分享出来,给大家实际工作中多一种选择...

海量分词检索优化点

  • 地名方面海量分词5.0可以识别并检索出关于地名后缀的结果

    可以通过搜索“河南”得到“河南省”的结果,搜索“天津”得到“天津市”的搜索结果,而不是简单河南、天津的识别。

  • 著名人物的人名识别更精准,如刘翔、傅莹等

    部分分词器处理中文分词只有两种方式:一种是单字(unigrams)形式,即简单粗暴的将中文的每一个汉字作为一个词(token)分开;另一种是两字(bigrams)的,也就是任意相邻的两个汉字作为一个词分开。这种简单粗暴的切分方式无法实现时效性较新的人名识别,如刘翔、傅莹等会被识别为单字切开。

  • 外国人名识别方面海量可以将人名识别智能识别

    “玛利亚 凯利”、“乔治·史密斯”、“玛丽·戴维斯”将完整的外国人名识别出姓氏和名,如“乔治·史密斯”可以被识别为“乔治”和 “史密斯”。

  • 常见词的品牌名称识别方面,海量分词5.0识别的结果中包含实际意义的品牌名称

    如“乐高”,“吉米作为简单的词,可以被识别,但是词放在文档语境中有其品牌的属性,海量分词识别的结果中可以准确搜索出品牌的结果。

  • 机构名识别方面

    海量分词5.0可以识别完整的机构名称,如“天津海量信息技术股份有限公司”,可以完整的识别出全称。

海量分词性能评测

评测用例

本次评测选取的语料一共三个。一个是2MB的海量测试语料,一个是4MB的北大语料(新版旧版各2MB),一个是9.4GB海量的线上实际数据

评测指标

本次评测是在开源评测程序上修改而来,评测指标有分词速度、行数完美率、字数完美率(该指标仅供参考)、内存消耗

评测结果

2MB海量测试语料

分词器 分词模式 分词速度(字符/毫秒) 行数完美率 字数完美率 占用内存(MB)
海量 / 1049.0212 74.11% 65.97% 85
ltp / 33.748833 55.68% 45.23% 201
IctClass 普通分词 208.69612 48.77% 37.10% 51
IctClass 细粒度分词 691.5951 38.33% 27.95% 51
Jieba SEARCH分词 592.697 47.64% 36.25% 236
FudanNLP / 121.7537 42.99% 31.59% 99
HanLP 标准分词 212.74121 45.30% 34.00% 63
HanLP NLP分词 378.23676 44.09% 32.55% 71
HanLP N-最短路径分词 189.29959 44.19% 32.22% 60
HanLP 最短路径分词 415.63605 43.19% 31.28% 59
HanLP 极速词典分词 6735.1934 36.78% 25.10% 18
THULAC / 0.20857348 54.49% 43.79% 110
Stanford CTB 0.13520464 44.43% 33.25% 1101
Stanford PKU 0.12508623 45.15% 34.01% 1065

可以看到海量分词的行数完美率是最高的,而且速度十分优异;仅有的一个比海量分词速度快的算法是一个追求极限性能舍弃准确率的算法

4MB北大语料

词器 分词模式 分词速度(字符/毫秒) 行数完美率 字数完美率 占用内存(MB)
海量 / 1121.7269 85.94% 48.28% 85
ltp / 35.81329 87.37% 49.37% 201
IctClass 普通分词 226.11554 78.55% 42.04% 51
IctClass 细粒度分词 756.5135 59.06% 30.61% 51
Jieba SEARCH分词 957.52826 47.07% 20.01% 236
FudanNLP / 126.09879 58.54% 27.78% 99
HanLP 标准分词 369.66 65.46% 35.04% 63
HanLP NLP分词 439.75632 61.93% 31.37% 71
HanLP N-最短路径分词 223.30482 69.20% 35.07% 60
HanLP 最短路径分词 440.72244 67.74% 33.83% 59
HanLP 极速词典分词 7522.581 58.09% 27.82% 18

(注:THULAC和stanford由于速度问题,不纳入评测)

可以看到海量的速度和行数完美率都很优异而且达到了兼顾,行数完美率只落后更高的ltp算法1.4个百分点,速度却是它的三十多倍

9.4GB线上数据

分词器 分词模式 分词速度(字符/毫秒)
ltp / 33.592
海量 / 960.611
IctClass 普通分词 198.094
HanLP N-最短路径分词 201.735
HanLP 最短路径分词 425.482
HanLP 标准分词 473.400
HanLP NLP分词 361.842
IctClass 细粒度分词 689.183
FudanNLP / 120.860
HanLP 极速词典分词 6238.916
Jieba SEARCH分词 568.262

(注:THULAC和stanford由于速度问题,不纳入评测)

本表格中分词顺序按(4MB北大语料的)行数完美率进行排序,越靠前的(4MB北大语料的)行数完美率越高

可以看出海量的分词速度十分优秀,分词速度拉开了大多数分词数倍,相比于行数完美率小幅领先的ltp要快几十倍

海量分词插件使用方法

安装使用

  • 下载安装 - 地址: https://github.com/HylandaOpen/elasticsearch-analysis-hlseg/releases

    unzip plugin to folder `your-es-root/plugins/`
  • 使用 elasticsearch-plugin 安装

    ./bin/elasticsearch-plugin install https://github.com/HylandaOpen/elasticsearch-analysis-hlseg/releases/download/v6.4.2/elasticsearch-analysis-hlseg-6.4.2.zip
  • 重启es集群

实例(借用github-ik分词插件的实例)

1.创建index

curl -XPUT http://localhost:9200/hylanda_seg

2.配置mapping

curl -XPOST http://localhost:9200/hylanda_seg/data/_mapping -H 'Content-Type:application/json' -d'
{
  "properties": {
    "msg": {
      "type": "text",
      "analyzer": "hlseg_search"
    }
  }
}'

3.插入测试数据

curl -XPOST http://localhost:9200/hylanda_seg/data/1 -H 'Content-Type:application/json' -d'
{"content":"美国留给伊拉克的是个烂摊子吗"}
'
curl -XPOST http://localhost:9200/hylanda_seg/data/2 -H 'Content-Type:application/json' -d'
{"content":"公安部:各地校车将享最高路权"}
'
curl -XPOST http://localhost:9200/hylanda_seg/data/3 -H 'Content-Type:application/json' -d'
{"content":"中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"}
'
curl -XPOST http://localhost:9200/hylanda_seg/data/4 -H 'Content-Type:application/json' -d'
{"content":"中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"}
'

4.查询

curl -XPOST http://localhost:9200/hylanda_seg/data/_search  -H 'Content-Type:application/json' -d'
{
  "query": {
    "match": {
      "content": "中国"
    }
  },
  "highlight": {
    "fields": {
      "content": {}
    }
  }
}
'

返回结果

{
  "took" : 11,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 0.5754429,
    "hits" : [
      {
        "_index" : "hylanda_seg",
        "_type" : "data",
        "_id" : "4",
        "_score" : 0.5754429,
        "_source" : {
          "content" : "中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"
        },
        "highlight" : {
          "content" : [
            "中韩渔警冲突调查:韩警平均每天扣1艘<em>中国</em>渔船"
          ]
        }
      },
      {
        "_index" : "hylanda_seg",
        "_type" : "data",
        "_id" : "5",
        "_score" : 0.2876821,
        "_source" : {
          "content" : "中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"
        },
        "highlight" : {
          "content" : [
            "<em>中国</em>驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"
          ]
        }
      }
    ]
  }
}

字典配置

海量分词分为基础词词典CoreDict.dat和自定义词典userDict_utf8.txt。基础词词典在dictionary目录下,需要将CoreDict.zip解压后放在config目录下,可以通过修改config下的userDict_utf8.txt来更新自定义词典

自定义词典格式如下


1.用户自定义词典采用文本格式,utf-8编码,每行一个词

2.每个词包含三列属性,分别是词串、词的属性以及idf值的加权等级,并以Tab作为分隔,其中除了词串必填外,其他列可以不填,不填写则系统采用默认值

3.“#”表示注释,会在加载时被忽略

4.词的属性以西文逗号分隔,可以是词性、停止词标志或者自定义属性

5.词性标记参考北大标准,用于词性标注时参考,该项不填则默认为名词

6.停止词标志为:stopword,由SegOption.outputStopWord来控制是否输出停止词

7.自定义属性不参与分词过程,分词结果中若Token.userTag不为空,则可以获取到该词的自定义属性。

8.idf值的加权分5级,从低到高的定义是idf-lv1 — idf-lv5,等级越高则该词在关键词计算时的权重会越大,若不填写该值则系统默认是idf-lv3(中等权重)
继续阅读 »

介绍

本次想和大家分享一款Elasticsearch分词插件,该插件是基于天津海量信息股份有限公司的中文分词核心开发的。海量分词针对大数据检索场景专门做了定制和优化,更贴近搜索需求,整体分词的性能也是非常高效。

本文章有广告成分。但希望将公司研究成果分享出来,给大家实际工作中多一种选择...

海量分词检索优化点

  • 地名方面海量分词5.0可以识别并检索出关于地名后缀的结果

    可以通过搜索“河南”得到“河南省”的结果,搜索“天津”得到“天津市”的搜索结果,而不是简单河南、天津的识别。

  • 著名人物的人名识别更精准,如刘翔、傅莹等

    部分分词器处理中文分词只有两种方式:一种是单字(unigrams)形式,即简单粗暴的将中文的每一个汉字作为一个词(token)分开;另一种是两字(bigrams)的,也就是任意相邻的两个汉字作为一个词分开。这种简单粗暴的切分方式无法实现时效性较新的人名识别,如刘翔、傅莹等会被识别为单字切开。

  • 外国人名识别方面海量可以将人名识别智能识别

    “玛利亚 凯利”、“乔治·史密斯”、“玛丽·戴维斯”将完整的外国人名识别出姓氏和名,如“乔治·史密斯”可以被识别为“乔治”和 “史密斯”。

  • 常见词的品牌名称识别方面,海量分词5.0识别的结果中包含实际意义的品牌名称

    如“乐高”,“吉米作为简单的词,可以被识别,但是词放在文档语境中有其品牌的属性,海量分词识别的结果中可以准确搜索出品牌的结果。

  • 机构名识别方面

    海量分词5.0可以识别完整的机构名称,如“天津海量信息技术股份有限公司”,可以完整的识别出全称。

海量分词性能评测

评测用例

本次评测选取的语料一共三个。一个是2MB的海量测试语料,一个是4MB的北大语料(新版旧版各2MB),一个是9.4GB海量的线上实际数据

评测指标

本次评测是在开源评测程序上修改而来,评测指标有分词速度、行数完美率、字数完美率(该指标仅供参考)、内存消耗

评测结果

2MB海量测试语料

分词器 分词模式 分词速度(字符/毫秒) 行数完美率 字数完美率 占用内存(MB)
海量 / 1049.0212 74.11% 65.97% 85
ltp / 33.748833 55.68% 45.23% 201
IctClass 普通分词 208.69612 48.77% 37.10% 51
IctClass 细粒度分词 691.5951 38.33% 27.95% 51
Jieba SEARCH分词 592.697 47.64% 36.25% 236
FudanNLP / 121.7537 42.99% 31.59% 99
HanLP 标准分词 212.74121 45.30% 34.00% 63
HanLP NLP分词 378.23676 44.09% 32.55% 71
HanLP N-最短路径分词 189.29959 44.19% 32.22% 60
HanLP 最短路径分词 415.63605 43.19% 31.28% 59
HanLP 极速词典分词 6735.1934 36.78% 25.10% 18
THULAC / 0.20857348 54.49% 43.79% 110
Stanford CTB 0.13520464 44.43% 33.25% 1101
Stanford PKU 0.12508623 45.15% 34.01% 1065

可以看到海量分词的行数完美率是最高的,而且速度十分优异;仅有的一个比海量分词速度快的算法是一个追求极限性能舍弃准确率的算法

4MB北大语料

词器 分词模式 分词速度(字符/毫秒) 行数完美率 字数完美率 占用内存(MB)
海量 / 1121.7269 85.94% 48.28% 85
ltp / 35.81329 87.37% 49.37% 201
IctClass 普通分词 226.11554 78.55% 42.04% 51
IctClass 细粒度分词 756.5135 59.06% 30.61% 51
Jieba SEARCH分词 957.52826 47.07% 20.01% 236
FudanNLP / 126.09879 58.54% 27.78% 99
HanLP 标准分词 369.66 65.46% 35.04% 63
HanLP NLP分词 439.75632 61.93% 31.37% 71
HanLP N-最短路径分词 223.30482 69.20% 35.07% 60
HanLP 最短路径分词 440.72244 67.74% 33.83% 59
HanLP 极速词典分词 7522.581 58.09% 27.82% 18

(注:THULAC和stanford由于速度问题,不纳入评测)

可以看到海量的速度和行数完美率都很优异而且达到了兼顾,行数完美率只落后更高的ltp算法1.4个百分点,速度却是它的三十多倍

9.4GB线上数据

分词器 分词模式 分词速度(字符/毫秒)
ltp / 33.592
海量 / 960.611
IctClass 普通分词 198.094
HanLP N-最短路径分词 201.735
HanLP 最短路径分词 425.482
HanLP 标准分词 473.400
HanLP NLP分词 361.842
IctClass 细粒度分词 689.183
FudanNLP / 120.860
HanLP 极速词典分词 6238.916
Jieba SEARCH分词 568.262

(注:THULAC和stanford由于速度问题,不纳入评测)

本表格中分词顺序按(4MB北大语料的)行数完美率进行排序,越靠前的(4MB北大语料的)行数完美率越高

可以看出海量的分词速度十分优秀,分词速度拉开了大多数分词数倍,相比于行数完美率小幅领先的ltp要快几十倍

海量分词插件使用方法

安装使用

  • 下载安装 - 地址: https://github.com/HylandaOpen/elasticsearch-analysis-hlseg/releases

    unzip plugin to folder `your-es-root/plugins/`
  • 使用 elasticsearch-plugin 安装

    ./bin/elasticsearch-plugin install https://github.com/HylandaOpen/elasticsearch-analysis-hlseg/releases/download/v6.4.2/elasticsearch-analysis-hlseg-6.4.2.zip
  • 重启es集群

实例(借用github-ik分词插件的实例)

1.创建index

curl -XPUT http://localhost:9200/hylanda_seg

2.配置mapping

curl -XPOST http://localhost:9200/hylanda_seg/data/_mapping -H 'Content-Type:application/json' -d'
{
  "properties": {
    "msg": {
      "type": "text",
      "analyzer": "hlseg_search"
    }
  }
}'

3.插入测试数据

curl -XPOST http://localhost:9200/hylanda_seg/data/1 -H 'Content-Type:application/json' -d'
{"content":"美国留给伊拉克的是个烂摊子吗"}
'
curl -XPOST http://localhost:9200/hylanda_seg/data/2 -H 'Content-Type:application/json' -d'
{"content":"公安部:各地校车将享最高路权"}
'
curl -XPOST http://localhost:9200/hylanda_seg/data/3 -H 'Content-Type:application/json' -d'
{"content":"中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"}
'
curl -XPOST http://localhost:9200/hylanda_seg/data/4 -H 'Content-Type:application/json' -d'
{"content":"中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"}
'

4.查询

curl -XPOST http://localhost:9200/hylanda_seg/data/_search  -H 'Content-Type:application/json' -d'
{
  "query": {
    "match": {
      "content": "中国"
    }
  },
  "highlight": {
    "fields": {
      "content": {}
    }
  }
}
'

返回结果

{
  "took" : 11,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 0.5754429,
    "hits" : [
      {
        "_index" : "hylanda_seg",
        "_type" : "data",
        "_id" : "4",
        "_score" : 0.5754429,
        "_source" : {
          "content" : "中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"
        },
        "highlight" : {
          "content" : [
            "中韩渔警冲突调查:韩警平均每天扣1艘<em>中国</em>渔船"
          ]
        }
      },
      {
        "_index" : "hylanda_seg",
        "_type" : "data",
        "_id" : "5",
        "_score" : 0.2876821,
        "_source" : {
          "content" : "中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"
        },
        "highlight" : {
          "content" : [
            "<em>中国</em>驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"
          ]
        }
      }
    ]
  }
}

字典配置

海量分词分为基础词词典CoreDict.dat和自定义词典userDict_utf8.txt。基础词词典在dictionary目录下,需要将CoreDict.zip解压后放在config目录下,可以通过修改config下的userDict_utf8.txt来更新自定义词典

自定义词典格式如下


1.用户自定义词典采用文本格式,utf-8编码,每行一个词

2.每个词包含三列属性,分别是词串、词的属性以及idf值的加权等级,并以Tab作为分隔,其中除了词串必填外,其他列可以不填,不填写则系统采用默认值

3.“#”表示注释,会在加载时被忽略

4.词的属性以西文逗号分隔,可以是词性、停止词标志或者自定义属性

5.词性标记参考北大标准,用于词性标注时参考,该项不填则默认为名词

6.停止词标志为:stopword,由SegOption.outputStopWord来控制是否输出停止词

7.自定义属性不参与分词过程,分词结果中若Token.userTag不为空,则可以获取到该词的自定义属性。

8.idf值的加权分5级,从低到高的定义是idf-lv1 — idf-lv5,等级越高则该词在关键词计算时的权重会越大,若不填写该值则系统默认是idf-lv3(中等权重)
收起阅读 »

Day 14 - 订单中心基于elasticsearch 的解决方案

       ElasticSearch分布式搜索储存集群的引入,主要是为了解决订单数据的存储与搜索的问题。

项目背景:
      15年去哪儿网酒店日均订单量达到30w+,随着多平台订单的聚合日均订单能达到100w左右。原来采用的热表分库方式,即将最近6个月的订单的放置在一张表中,将历史订单放在在history表中。history表存储全量的数据,当用户查询的下单时间跨度超过6个月即查询历史订单表,此分表方式热表的数据量为4000w左右,当时能解决的问题。但是显然不能满足携程艺龙订单接入的需求。如果继续按照热表方式,数据量将超过1亿条。全量数据表保存2年的可能就超过4亿的数据量。所以寻找有效途径解决此问题迫在眉睫。由于对这预计4亿的数据量还需按照预定日期、入住日期、离店日期、订单号、联系人姓名、电话、酒店名称、订单状态……等多个条件查询。所以简单按照某一个维度进行分表操作没有意义。ElasticSearch分布式搜索储存集群的引入,就是为了解决订单数据的存储与搜索的问题。

具体解决方案:

1、系统性能
        对订单模型进行抽象和分类,将常用搜索字段和基础属性字段剥离DB做分库分表。存储订单详情,ElasticSearch存储搜素字段。订单复杂查询直接走ElasticSearch。如下图:

elasticsearch1.png

       通用数据存储模型

elasticsearch2.png


关键字段
    ■ 业务核心字段,用于查询过滤
系统字段
    ■ version 避免高并发操作导致数据覆盖
大字段
    ■ order_data订单详情数据(JSON)
    ■ 可灵活需要索引的字段返回的字段

 
 
2、系统可用性
     系统可用性保障:双机房高可用如下图。

       
elasticsearch3.png

     数据可用性保障:
            一、异步多写保证数据一致性。

                
二、数据补充机制:
  1、每天凌晨task扫描数据库热表数据与es数据版本进行比较。
  2、将第三方推送过来数据中的,订单号即时插入订单同步队列表中。如果数据模型解析转换、持久化成功。删除队列中订单号。同时设置1分钟一次的task 扫描队列表。
  3、推送第三方的数据也采用同样的方式。保证给第三方数据的准确性。


elasticsearch4.png


3、系统伸缩性
      elasticSearch中索引设置了8个分片,目前Es单个索引的文档达到到1.4亿,合计达到2亿条数据占磁盘大小64G,集群机器磁盘容量240G。

 
 
 
继续阅读 »
       ElasticSearch分布式搜索储存集群的引入,主要是为了解决订单数据的存储与搜索的问题。

项目背景:
      15年去哪儿网酒店日均订单量达到30w+,随着多平台订单的聚合日均订单能达到100w左右。原来采用的热表分库方式,即将最近6个月的订单的放置在一张表中,将历史订单放在在history表中。history表存储全量的数据,当用户查询的下单时间跨度超过6个月即查询历史订单表,此分表方式热表的数据量为4000w左右,当时能解决的问题。但是显然不能满足携程艺龙订单接入的需求。如果继续按照热表方式,数据量将超过1亿条。全量数据表保存2年的可能就超过4亿的数据量。所以寻找有效途径解决此问题迫在眉睫。由于对这预计4亿的数据量还需按照预定日期、入住日期、离店日期、订单号、联系人姓名、电话、酒店名称、订单状态……等多个条件查询。所以简单按照某一个维度进行分表操作没有意义。ElasticSearch分布式搜索储存集群的引入,就是为了解决订单数据的存储与搜索的问题。

具体解决方案:

1、系统性能
        对订单模型进行抽象和分类,将常用搜索字段和基础属性字段剥离DB做分库分表。存储订单详情,ElasticSearch存储搜素字段。订单复杂查询直接走ElasticSearch。如下图:

elasticsearch1.png

       通用数据存储模型

elasticsearch2.png


关键字段
    ■ 业务核心字段,用于查询过滤
系统字段
    ■ version 避免高并发操作导致数据覆盖
大字段
    ■ order_data订单详情数据(JSON)
    ■ 可灵活需要索引的字段返回的字段

 
 
2、系统可用性
     系统可用性保障:双机房高可用如下图。

       
elasticsearch3.png

     数据可用性保障:
            一、异步多写保证数据一致性。

                
二、数据补充机制:
  1、每天凌晨task扫描数据库热表数据与es数据版本进行比较。
  2、将第三方推送过来数据中的,订单号即时插入订单同步队列表中。如果数据模型解析转换、持久化成功。删除队列中订单号。同时设置1分钟一次的task 扫描队列表。
  3、推送第三方的数据也采用同样的方式。保证给第三方数据的准确性。


elasticsearch4.png


3、系统伸缩性
      elasticSearch中索引设置了8个分片,目前Es单个索引的文档达到到1.4亿,合计达到2亿条数据占磁盘大小64G,集群机器磁盘容量240G。

 
 
  收起阅读 »

Day 11 -父子关系维护检索实战一 - Elasticsearch 5.x-父子关系维护

本次分享包括两篇文章
  • 父子关系维护检索实战一 Elasticsearch 5.x 父子关系维护检索实战
  • 父子关系维护检索实战二 Elasticsearch 6.x 父子关系维护检索实战

本文是其中第一篇- Elasticsearch 5.x 父子关系维护检索实战,涵盖以下部分内容:
  1. Elasticsearch 5.x 中父子关系mapping结构设计
  2. Elasticsearch 5.x 中维护父子关系数据
  3. Elasticsearch 5.x 中has_child和has_parent查询的基本用法
  4. Elasticsearch 5.x 中如何在检索中同时返回父子数据

案例说明
以一个体检记录相关的数据来介绍本文涉及的相关功能,体检数据包括客户基本信息basic和客户医疗记录medical、客户体检记录exam、客户体检结果分析记录diagnosis,它们之间的关系图如下:
parent.png


我们采用Elasticsearch java客户端 bboss-elastic 来实现本文相关功能。

1.准备工作
参考文档《高性能elasticsearch ORM开发库使用介绍》导入和配置bboss客户端

2.定义mapping结构-Elasticsearch 5.x 中父子关系mapping结构设计
Elasticsearch 5.x中一个indice mapping支持多个mapping type,通过在子类型mapping中指定父类型的mapping type名字来设置父子关系,例如:
父类型
"basic": {
....
}
子类型:
"medical": { 
      "_parent": { "type": "basic" },
     .................
}
新建dsl配置文件-esmapper/Client_Info.xml,定义完整的mapping结构:createClientIndice
<properties>

<!--
创建客户信息索引索引表
-->
<property name="createClientIndice">
<![CDATA[{
"settings": {
"number_of_shards": 6,
"index.refresh_interval": "5s"
},
"mappings": {
"basic": { ##基本信息
"properties": {
"party_id": {
"type": "keyword"
},
"sex": {
"type": "keyword"
},
"mari_sts": {
"type": "keyword"
},
"ethnic": {
"type": "text"
},
"prof": {
"type": "text"
},
"province": {
"type": "text"
},
"city": {
"type": "text"
},
"client_type": {
"type": "keyword"
},
"client_name": {
"type": "text"
},
"age": {
"type": "integer"
},
"id_type": {
"type": "keyword"
},
"idno": {
"type": "keyword"
},
"education": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"birth_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"diagnosis": { ##结果分析
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"provider": {
"type": "text"
},
"subject": {
"type": "text"
},
"diagnosis_type": {
"type": "text"
},
"icd10_code": {
"type": "text",
"type": "keyword"
},
"sd_disease_name": {
"type": "text",
"type": "keyword"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"medical": { ##医疗情况
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"hos_name_yb": {
"type": "text"
},
"eivisions_name": {
"type": "text"
},
"medical_type": {
"type": "text"
},
"medical_common_name": {
"type": "text"
},
"medical_sale_name": {
"type": "text"
},
"medical_code": {
"type": "text"
},
"specification": {
"type": "text"
},
"usage_num": {
"type": "text"
},
"unit": {
"type": "text"
},
"usage_times": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"exam": { ##检查结果
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"hospital": {
"type": "text"
},
"dept": {
"type": "text"
},
"is_ok": {
"type": "text"
},
"exam_result": {
"type": "text"
},
"fld1": {
"type": "text"
},
"fld2": {
"type": "text"
},
"fld3": {
"type": "text"
},
"fld4": {
"type": "text"
},
"fld5": {
"type": "text"
},
"fld901": {
"type": "text"
},
"fld6": {
"type": "text"
},
"fld902": {
"type": "text"
},
"fld14": {
"type": "text"
},
"fld20": {
"type": "text"
},
"fld21": {
"type": "text"
},
"fld23": {
"type": "text"
},
"fld24": {
"type": "text"
},
"fld65": {
"type": "text"
},
"fld66": {
"type": "text"
},
"fld67": {
"type": "text"
},
"fld68": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
}
}
}]]>
</property>
</properties>

这个mapping中定义了4个索引类型:basic,exam,medical,diagnosis,其中basic是其他类型的父类型。
通过bboss客户端创建名称为client_info 的索引:
	public void createClientIndice(){
//定义客户端实例,加载上面建立的dsl配置文件
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
try {
//client_info存在返回true,不存在返回false
boolean exist = clientUtil.existIndice("client_info");

//如果索引表client_info已经存在先删除mapping
if(exist) {//先删除mapping client_info
clientUtil.dropIndice("client_info");
}
} catch (ElasticSearchException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//创建mapping client_info
clientUtil.createIndiceMapping("client_info","createClientIndice");
String client_info = clientUtil.getIndice("client_info");//获取最新建立的索引表结构client_info
System.out.println("after createClientIndice clientUtil.getIndice(\"client_info\") response:"+client_info);
}


3.维护父子关系数据-Elasticsearch 5.x 中维护父子关系数据
  • 定义对象

首先定义四个对象,分别对应mapping中的四个索引类型,篇幅关系只列出主要属性
  • Basic
  • Medical
  • Exam
  • Diagnosis

通过注解@ESId指定基本信息文档_id
public class Basic extends ESBaseData {
/**
* 索引_id
*/
@ESId
private String party_id;
private String sex; // 性别
......
}
通过注解@ESParentId指定Medical关联的基本信息文档_id,Medical文档_id由ElasticSearch自动生成
public class Medical extends ESBaseData {
@ESParentId
private String party_id; //父id
private String hos_name_yb; //就诊医院
...
}
通过注解@ESParentId指定Exam关联的基本信息文档_id,Exam文档_id由ElasticSearch自动生成
public class Exam extends ESBaseData {
@ESParentId
private String party_id; //父id
private String hospital; // 就诊医院
....
}
通过注解@ESParentId指定Diagnosis关联的基本信息文档_id,Diagnosis文档_id由ElasticSearch自动生成
public class Diagnosis extends ESBaseData {
@ESParentId
private String party_id; //父id
private String provider; //诊断医院
private String subject; //科室
......
}

  • 通过api维护测试数据

对象定义好了后,通过bboss客户数据到之前建立好的索引client_info中。
	/**
* 录入体检医疗信息
*/
public void importClientInfoDataFromBeans() {
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();

//导入基本信息,并且实时刷新,测试需要,实际环境不要带refresh
List<Basic> basics = buildBasics();
clientUtil.addDocuments("client_info","basic",basics,"refresh");

//导入医疗信息,并且实时刷新,测试需要,实际环境不要带refresh
List<Medical> medicals = buildMedicals();
clientUtil.addDocuments("client_info","medical",medicals,"refresh");

//导入体检结果数据,并且实时刷新,测试需要,实际环境不要带refresh
List<Exam> exams = buildExams();
clientUtil.addDocuments("client_info","exam",exams,"refresh");

//导入结果诊断数据,并且实时刷新,测试需要,实际环境不要带refresh
List<Diagnosis> diagnosiss = buildDiagnosiss();
clientUtil.addDocuments("client_info","diagnosis",diagnosiss,"refresh");
}
//构建基本信息集合
private List<Basic> buildBasics() {
List<Basic> basics = new ArrayList<Basic>();
Basic basic = new Basic();
basic.setParty_id("1");
basic.setAge(60);
basics.add(basic);
//继续添加其他数据
return basics;

}
//
构建医疗信息集合
private List<Medical> buildMedicals() {
List<Medical> medicals = new ArrayList<Medical>();
Medical medical = new Medical();
medical.setParty_id("1");//设置父文档id-基本信息文档_id
medical.setCreated_date(new Date());
medicals.add(medical);
//继续添加其他数据
return medicals;

}
//构建体检结果数据集合
private List<Exam> buildExams() {
List<Exam> exams = new ArrayList<Exam>();
Exam exam = new Exam();
exam.setParty_id("1");//设置父文档id-基本信息文档_id
exams.add(exam);
//继续添加其他数据
return exams;
}
//构建结果诊断数据集合
private List<Diagnosis> buildDiagnosiss() {
List<Diagnosis> diagnosiss = new ArrayList<Diagnosis>();
Diagnosis diagnosis = new Diagnosis();
diagnosis.setParty_id("1");//设置父文档id-基本信息文档_id
diagnosiss.add(diagnosis);
//继续添加其他数据
return diagnosiss;
}

  • 通过json报文批量导入测试数据

除了通过addDocuments录入数据,还可以通过json报文批量导入数据
在配置文件esmapper/Client_Info.xml增加以下内容:
    <!--
导入基本信息:
-->
<property name="bulkImportBasicData" trim="false">
<![CDATA[
{ "index": { "_id": "1" }}
{ "party_id":"1", "sex":"男", "mari_sts":"不详", "ethnic":"蒙古族", "prof":"放牧","birth_date":"1966-2-14 00:00:00", "province":"内蒙古", "city":"赤峰市","client_type":"1", "client_name":"安", "age":52,"id_type":"1", "idno":"1", "education":"初中","created_date":"2013-04-24 00:00:00","last_modified_date":"2013-04-24 00:00:00", "etl_date":"2013-04-24 00:00:00"}
{ "index": { "_id": "2" }}
{ "party_id":"2", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"公务员","birth_date":"1986-07-06 00:00:00", "province":"广东", "city":"深圳","client_type":"1", "client_name":"彭", "age":32,"id_type":"1", "idno":"2", "education":"本科", "created_date":"2013-05-09 15:49:47","last_modified_date":"2013-05-09 15:49:47", "etl_date":"2013-05-09 15:49:47"}
{ "index": { "_id": "3" }}
{ "party_id":"3", "sex":"男", "mari_sts":"未婚", "ethnic":"汉族", "prof":"无业","birth_date":"2000-08-15 00:00:00", "province":"广东", "city":"佛山","client_type":"1", "client_name":"浩", "age":18,"id_type":"1", "idno":"3", "education":"高中", "created_date":"2014-09-01 09:49:27","last_modified_date":"2014-09-01 09:49:27", "etl_date":"2014-09-01 09:49:27" }
{ "index": { "_id": "4" }}
{ "party_id":"4", "sex":"女", "mari_sts":"未婚", "ethnic":"满族", "prof":"工人","birth_date":"1996-03-14 00:00:00", "province":"江苏", "city":"扬州","client_type":"1", "client_name":"慧", "age":22,"id_type":"1", "idno":"4", "education":"高中", "created_date":"2014-09-16 09:30:37","last_modified_date":"2014-09-16 09:30:37", "etl_date":"2014-09-16 09:30:37" }
{ "index": { "_id": "5" }}
{ "party_id":"5", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"教师","birth_date":"1983-08-14 00:00:00", "province":"宁夏", "city":"灵武","client_type":"1", "client_name":"英", "age":35,"id_type":"1", "idno":"5", "education":"本科", "created_date":"2015-09-16 09:30:37","last_modified_date":"2015-09-16 09:30:37", "etl_date":"2015-09-16 09:30:37" }
{ "index": { "_id": "6" }}
{ "party_id":"6", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"工人","birth_date":"1959-07-04 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"岭", "age":59,"id_type":"1", "idno":"6", "education":"小学", "created_date":"2015-09-01 09:49:27","last_modified_date":"2015-09-01 09:49:27", "etl_date":"2015-09-01 09:49:27" }
{ "index": { "_id": "7" }}
{ "party_id":"7", "sex":"女", "mari_sts":"未婚", "ethnic":"汉族", "prof":"学生","birth_date":"1999-02-18 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"欣", "age":19,"id_type":"1", "idno":"7", "education":"高中", "created_date":"2016-12-01 09:49:27","last_modified_date":"2016-12-01 09:49:27", "etl_date":"2016-12-01 09:49:27" }
{ "index": { "_id": "8" }}
{ "party_id":"8", "sex":"女", "mari_sts":"未婚", "ethnic":"汉族", "prof":"学生","birth_date":"2007-11-18 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"梅", "age":10,"id_type":"1", "idno":"8", "education":"小学", "created_date":"2016-11-21 09:49:27","last_modified_date":"2016-11-21 09:49:27", "etl_date":"2016-11-21 09:49:27" }
{ "index": { "_id": "9" }}
{ "party_id":"9", "sex":"男", "mari_sts":"不详", "ethnic":"回族", "prof":"个体户","birth_date":"1978-03-29 00:00:00", "province":"北京", "city":"北京","client_type":"1", "client_name":"磊", "age":40,"id_type":"1", "idno":"9", "education":"高中", "created_date":"2017-09-01 09:49:27","last_modified_date":"2017-09-01 09:49:27", "etl_date":"2017-09-01 09:49:27" }
{ "index": { "_id": "10" }}
{ "party_id":"10", "sex":"男", "mari_sts":"已婚", "ethnic":"汉族", "prof":"农民","birth_date":"1970-11-14 00:00:00", "province":"浙江", "city":"台州","client_type":"1", "client_name":"强", "age":47,"id_type":"1", "idno":"10", "education":"初中", "created_date":"2018-09-01 09:49:27","last_modified_date":"2018-09-01 09:49:27", "etl_date":"2018-09-01 09:49:27" }
]]>
</property>
<!--
导入诊断信息
-->
<property name="bulkImportDiagnosisData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"J31.0", "sd_disease_name":"鼻炎","created_date":"2013-07-23 20:56:44", "last_modified_date":"2013-07-23 20:56:44", "etl_date":"2013-07-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"M47.8", "sd_disease_name":"颈椎病","created_date":"2013-09-23 20:56:44", "last_modified_date":"2013-09-23 20:56:44", "etl_date":"2013-09-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"E78.1", "sd_disease_name":"甘油三脂增高","created_date":"2018-09-20 09:27:44", "last_modified_date":"2018-09-20 09:27:44", "etl_date":"2018-09-20 09:27:44" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "provider":"江苏医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2011-05-19 15:52:55", "last_modified_date":"2011-05-19 15:52:55", "etl_date":"2011-05-19 15:52:55" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"H44", "sd_disease_name":"眼疾","created_date":"2016-04-08 10:42:18", "last_modified_date":"2016-04-08 10:42:18", "etl_date":"2016-04-08 10:42:18" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"M47.8", "sd_disease_name":"颈椎病","created_date":"2016-04-08 10:42:18", "last_modified_date":"2016-04-08 10:42:18", "etl_date":"2016-04-08 10:42:18" }

{ "index": { "parent": "7" }}
{ "party_id":"7", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2017-04-08 10:42:18", "last_modified_date":"2017-04-08 10:42:18", "etl_date":"2017-04-08 10:42:18" }

{ "index": { "parent": "8" }}
{ "party_id":"8", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2018-04-08 10:42:18", "last_modified_date":"2018-04-08 10:42:18", "etl_date":"2018-04-08 10:42:18" }

{ "index": { "parent": "9" }}
{ "party_id":"9", "provider":"朝阳医院", "subject":"","diagnosis_type":"","icd10_code":"A03.901", "sd_disease_name":"急性细菌性痢疾","created_date":"2015-06-08 10:42:18", "last_modified_date":"2015-06-08 10:42:18", "etl_date":"2015-06-08 10:42:18" }
]]>
</property>

<!--
导入医疗信息
-->
<property name="bulkImportMedicalData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"氟化钠", "medical_sale_name":"", "medical_code":"A01AA01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"四环素", "medical_sale_name":"", "medical_code":"A01AB13", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2016-05-31 00:00:00", "last_modified_date":"2016-05-31 00:00:00", "etl_date":"2016-05-31 00:00:00" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"", "medical_sale_name":"盐酸多西环素胶丸", "medical_code":"A01AB22", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2016-03-18 00:00:00", "last_modified_date":"2016-03-18 00:00:00", "etl_date":"2016-03-18 00:00:00" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"盐酸多西环素分散片", "medical_sale_name":"", "medical_code":"A01AB22", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2013-07-23 20:56:44", "last_modified_date":"2013-07-23 20:56:44", "etl_date":"2013-07-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"地塞米松", "medical_sale_name":"", "medical_code":"A01AC02", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2013-09-23 20:56:44", "last_modified_date":"2013-09-23 20:56:44", "etl_date":"2013-09-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"肾上腺素", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-09-20 09:27:44", "last_modified_date":"2018-09-20 09:27:44", "etl_date":"2018-09-20 09:27:44" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"地塞米松", "medical_sale_name":"", "medical_code":"A01AC02", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2011-05-19 15:52:55", "last_modified_date":"2011-05-19 15:52:55", "etl_date":"2011-05-19 15:52:55" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"四环素", "medical_sale_name":"", "medical_code":"A01AB13", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-04-08 10:42:18", "last_modified_date":"2018-04-08 10:42:18", "etl_date":"2018-04-08 10:42:18" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"诺氟沙星胶囊", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2015-06-08 10:42:18", "last_modified_date":"2015-06-08 10:42:18", "etl_date":"2015-06-08 10:42:18" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "hos_name_yb":"山东医院", "eivisions_name":"", "medical_type":"","medical_common_name":"盐酸异丙肾上腺素片", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-01-23 20:56:44", "last_modified_date":"2014-01-23 20:56:44", "etl_date":"2014-01-23 20:56:44" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "hos_name_yb":"山东医院", "eivisions_name":"", "medical_type":"","medical_common_name":"甲硝唑栓", "medical_sale_name":"", "medical_code":"A01AB17", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-06-08 10:42:18", "last_modified_date":"2018-06-08 10:42:18", "etl_date":"2018-06-08 10:42:18" }

{ "index": { "parent": "9" }}
{ "party_id":"9", "hos_name_yb":"朝阳医院", "eivisions_name":"", "medical_type":"","medical_common_name":"复方克霉唑乳膏", "medical_sale_name":"", "medical_code":"A01AB18", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-01-23 20:56:44", "last_modified_date":"2014-01-23 20:56:44", "etl_date":"2014-01-23 20:56:44"}
]]>
</property>

<!--
导入体检信息
-->
<property name="bulkImportExamData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"高血压","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "2" }}
{ "party_id":"2", "hospital":"", "dept":"", "is_ok":"Y", "exam_result":"轻度脂肪肝","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "3" }}
{ "party_id":"3", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"急性细菌性痢疾","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "5" }}
{ "party_id":"5", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "7" }}
{ "party_id":"7", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "8" }}
{ "party_id":"1", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "9" }}
{ "party_id":"9", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "10" }}
{ "party_id":"10", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
]]>
</property>







通过bboss提供的通用api,导入上面定义的数据:
	/**
* 通过读取配置文件中的dsl json数据导入医疗数据
*/
public void importClientInfoFromJsonData(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");

clientUtil.executeHttp("client_info/basic/_bulk?refresh","bulkImportBasicData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/diagnosis/_bulk?refresh","bulkImportDiagnosisData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/medical/_bulk?refresh","bulkImportMedicalData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/exam/_bulk?refresh","bulkImportExamData",ClientUtil.HTTP_POST);
统计导入的数据
		
long basiccount = clientUtil.countAll("client_info/basic");
System.out.println(basiccount);
long medicalcount = clientUtil.countAll("client_info/medical");
System.out.println(medicalcount);
long examcount = clientUtil.countAll("client_info/exam");
System.out.println(examcount);
long diagnosiscount = clientUtil.countAll("client_info/diagnosis");
System.out.println(diagnosiscount);
}
4.父子关系查询-Elasticsearch 5.x 中has_child和has_parent查询的基本用法​
  • 根据父查子-通过客户名称信息查询客户端体检结果

在配置文件esmapper/Client_Info.xml增加dsl语句:queryExamSearchByClientName
   <!--根据客户名称查询客户体检报告-->
<property name="queryExamSearchByClientName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_parent": {
"type": "basic",
"query": {
"match": {
"client_name": #[clientName] ## 通过变量clientName设置客户名称
}
}
}
}
}
]]>
</property>

 执行查询,通过bboss的searchList 方法获取符合条件的体检报告以及总记录数据,返回size对应的1000条数据
	/**
* 根据客户名称查询客户体检报告
*/
public void queryExamSearchByClientName(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("clientName","张三");
params.put("size",1000);
ESDatas<Exam> exams = clientUtil.searchList("client_info/exam/_search","queryExamSearchByClientName",params,Exam.class);
List<Exam> examList = exams.getDatas();//获取符合条件的体检数据
long totalSize = exams.getTotalSize();//符合条件的总记录数据
}
 
  • 根据子查父数据-通过医疗信息编码查找客户基本数据

在配置文件esmapper/Client_Info.xml增加查询dsl语句:queryClientInfoByMedicalName
    <!--通过医疗信息编码查找客户基本数据-->
<property name="queryClientInfoByMedicalName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_child": {
"type": "medical",
"score_mode": "max",
"query": {
"match": {
"medical_code": #[medicalCode] ## 通过变量medicalCode设置医疗编码
}
}
}
}
}
]]>
</property>
执行查询,通过bboss的searchList 方法获取符合条件的客户端基本信息以及总记录数据
	/**
* 通过医疗信息编码查找客户基本数据
*/
public void queryClientInfoByMedicalName(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("medicalCode","A01AA01"); //通过变量medicalCode设置医疗编码
params.put("size",1000); //最多返回size变量对应的记录条数
ESDatas<Basic> bascis = clientUtil.searchList("client_info/basic/_search","queryClientInfoByMedicalName",params,Basic.class);
List<Basic> bascisList = bascis.getDatas();//获取符合条件的客户信息
long totalSize = bascis.getTotalSize();
}
5.同时返回父子数据-Elasticsearch 5.x 中如何在检索中同时返回父子数据
这一节中我们介绍同时返回父子数据的玩法 :inner_hits的妙用
  • 根据父条件查询所有子数据集合并返回父数据,根据客户名称查询所有体检数据,同时返回客户信息

在配置文件esmapper/Client_Info.xml增加检索dsl-queryDiagnosisByClientName
    <!--根据客户名称获取客户体检诊断数据,并返回客户信息-->
<property name="queryDiagnosisByClientName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_parent": {
"type": "basic",
"query": {
"match": {
"client_name": #[clientName] ## 通过变量clientName设置客户名称
}
},
"inner_hits": {} ## 通过变量inner_hits表示要返回对应的客户信息
}
}
}
]]>
</property>
执行检索并遍历结果
	/**
* 根据客户名称获取客户体检诊断数据,并返回客户数据
*/
public void queryDiagnosisByClientName(){

ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("clientName","张三");
params.put("size",1000);

try {
ESInnerHitSerialThreadLocal.setESInnerTypeReferences(Basic.class);//指定inner查询结果对应的客户基本信息类型,Basic只有一个文档类型,索引不需要显示指定basic对应的mapping type名称
ESDatas<Diagnosis> diagnosiss = clientUtil.searchList("client_info/diagnosis/_search",
"queryDiagnosisByClientName",params,Diagnosis.class);
List<Diagnosis> diagnosisList = diagnosiss.getDatas();//获取符合条件的体检报告数据
long totalSize = diagnosiss.getTotalSize();
//遍历诊断报告信息,并查看报告对应的客户基本信息
for(int i = 0; diagnosisList != null && i < diagnosisList.size(); i ++) {
Diagnosis diagnosis = diagnosisList.get(i);
List<Basic> basics = ResultUtil.getInnerHits(diagnosis.getInnerHits(), "basic");
if(basics != null) {
System.out.println(basics.size());
}
}
}
finally{
ESInnerHitSerialThreadLocal.clean();//清空inner查询结果对应的客户基本信息类型
}
}

  •  根据子条件查询父数据并返回符合条件的父的子数据集合,查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录

在配置文件esmapper/Client_Info.xml增加检索dsl-queryClientAndAllSons
    <!--查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录-->
<property name="queryClientAndAllSons">
<![CDATA[
{
"query": {
"bool": {
"should": [
{
"match_all":{}
}
]
,"must": [
{
"has_child": {
"score_mode": "none",
"type": "diagnosis"
,"query": {
"bool": {
"must": [
{
"term": {
"icd10_code": {
"value": "J00"
}
}
}
]
}
},"inner_hits":{}
}
}
]
,"should": [
{
"has_child": {
"score_mode": "none",
"type": "medical"
,"query": {
"match_all": {}

},"inner_hits":{}
}
}
]
,"should": [
{
"has_child": {
"type": "exam",
"query": {
"match_all": {}
},"inner_hits":{}
}
}
]
}
}
}
]]>
</property>
执行查询:
	/**
* 查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录
*/
public void queryClientAndAllSons(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
Map<String,Object> params = null;//没有检索条件,构造一个空的参数对象

try {
//设置子文档的类型和对象映射关系
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("exam",Exam.class);//指定inner查询结果对于exam类型和对应的对象类型Exam
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("diagnosis",Diagnosis.class);//指定inner查询结果对于diagnosis类型和对应的对象类型Diagnosis
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("medical",Medical.class);//指定inner查询结果对于medical类型和对应的对象类型Medical
ESDatas<Basic> escompanys = clientUtil.searchList("client_info/basic/_search",
"queryClientAndAllSons",params,Basic.class);
//String response = clientUtil.executeRequest("client_info/basic/_search","queryClientAndAllSons",params);直接获取原始的json报文
// escompanys = clientUtil.searchAll("client_info",Basic.class);
long totalSize = escompanys.getTotalSize();
List<Basic> clientInfos = escompanys.getDatas();//获取符合条件的数据
//查看公司下面的雇员信息(符合检索条件的雇员信息)
for (int i = 0; clientInfos != null && i < clientInfos.size(); i++) {
Basic clientInfo = clientInfos.get(i);
List<Exam> exams = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "exam");
if(exams != null)
System.out.println(exams.size());
List<Diagnosis> diagnosiss = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "diagnosis");
if(diagnosiss != null)
System.out.println(diagnosiss.size());
List<Medical> medicals = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "medical");
if(medicals != null)
System.out.println(medicals.size());

}
}
finally{
ESInnerHitSerialThreadLocal.clean();//清空inner查询结果对于各种类型信息
}
}
最后我们按顺序执行所有方法,验证功能:
	@Test
public void testMutil(){
this.createClientIndice();//创建indice client_info
// this.importClientInfoDataFromBeans(); //通过api添加测试数据
this.importClientInfoFromJsonData();//导入测试数据
this.queryExamSearchByClientName(); //根据客户端名称查询提交报告
this.queryClientInfoByMedicalName();//通过医疗信息编码查找客户基本数据
this.queryDiagnosisByClientName();//根据客户名称获取客户体检诊断数据,并返回客户数据
this.queryClientAndAllSons();//查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录
}
可以下载完整的demo工程运行本文中的测试用例方法,地址见相关资料。
到此Elasticsearch 5.x 父子关系维护检索实战介绍完毕,谢谢大家!

相关资料
完整demo工程  https://github.com/bbossgroups/eshelloword-booter
对应的类文件和配置文件
org.bboss.elasticsearchtest.parentchild.ParentChildTest
esmapper/Client_Info.xml
 
开发交流
bboss交流群 166471282
bboss公众号
getqrcode.jpg

 
敬请关注:父子关系维护检索实战二 Elasticsearch 6.x 父子关系维护检
继续阅读 »
本次分享包括两篇文章
  • 父子关系维护检索实战一 Elasticsearch 5.x 父子关系维护检索实战
  • 父子关系维护检索实战二 Elasticsearch 6.x 父子关系维护检索实战

本文是其中第一篇- Elasticsearch 5.x 父子关系维护检索实战,涵盖以下部分内容:
  1. Elasticsearch 5.x 中父子关系mapping结构设计
  2. Elasticsearch 5.x 中维护父子关系数据
  3. Elasticsearch 5.x 中has_child和has_parent查询的基本用法
  4. Elasticsearch 5.x 中如何在检索中同时返回父子数据

案例说明
以一个体检记录相关的数据来介绍本文涉及的相关功能,体检数据包括客户基本信息basic和客户医疗记录medical、客户体检记录exam、客户体检结果分析记录diagnosis,它们之间的关系图如下:
parent.png


我们采用Elasticsearch java客户端 bboss-elastic 来实现本文相关功能。

1.准备工作
参考文档《高性能elasticsearch ORM开发库使用介绍》导入和配置bboss客户端

2.定义mapping结构-Elasticsearch 5.x 中父子关系mapping结构设计
Elasticsearch 5.x中一个indice mapping支持多个mapping type,通过在子类型mapping中指定父类型的mapping type名字来设置父子关系,例如:
父类型
"basic": {
....
}
子类型:
"medical": { 
      "_parent": { "type": "basic" },
     .................
}
新建dsl配置文件-esmapper/Client_Info.xml,定义完整的mapping结构:createClientIndice
<properties>

<!--
创建客户信息索引索引表
-->
<property name="createClientIndice">
<![CDATA[{
"settings": {
"number_of_shards": 6,
"index.refresh_interval": "5s"
},
"mappings": {
"basic": { ##基本信息
"properties": {
"party_id": {
"type": "keyword"
},
"sex": {
"type": "keyword"
},
"mari_sts": {
"type": "keyword"
},
"ethnic": {
"type": "text"
},
"prof": {
"type": "text"
},
"province": {
"type": "text"
},
"city": {
"type": "text"
},
"client_type": {
"type": "keyword"
},
"client_name": {
"type": "text"
},
"age": {
"type": "integer"
},
"id_type": {
"type": "keyword"
},
"idno": {
"type": "keyword"
},
"education": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"birth_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"diagnosis": { ##结果分析
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"provider": {
"type": "text"
},
"subject": {
"type": "text"
},
"diagnosis_type": {
"type": "text"
},
"icd10_code": {
"type": "text",
"type": "keyword"
},
"sd_disease_name": {
"type": "text",
"type": "keyword"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"medical": { ##医疗情况
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"hos_name_yb": {
"type": "text"
},
"eivisions_name": {
"type": "text"
},
"medical_type": {
"type": "text"
},
"medical_common_name": {
"type": "text"
},
"medical_sale_name": {
"type": "text"
},
"medical_code": {
"type": "text"
},
"specification": {
"type": "text"
},
"usage_num": {
"type": "text"
},
"unit": {
"type": "text"
},
"usage_times": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"exam": { ##检查结果
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"hospital": {
"type": "text"
},
"dept": {
"type": "text"
},
"is_ok": {
"type": "text"
},
"exam_result": {
"type": "text"
},
"fld1": {
"type": "text"
},
"fld2": {
"type": "text"
},
"fld3": {
"type": "text"
},
"fld4": {
"type": "text"
},
"fld5": {
"type": "text"
},
"fld901": {
"type": "text"
},
"fld6": {
"type": "text"
},
"fld902": {
"type": "text"
},
"fld14": {
"type": "text"
},
"fld20": {
"type": "text"
},
"fld21": {
"type": "text"
},
"fld23": {
"type": "text"
},
"fld24": {
"type": "text"
},
"fld65": {
"type": "text"
},
"fld66": {
"type": "text"
},
"fld67": {
"type": "text"
},
"fld68": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
}
}
}]]>
</property>
</properties>

这个mapping中定义了4个索引类型:basic,exam,medical,diagnosis,其中basic是其他类型的父类型。
通过bboss客户端创建名称为client_info 的索引:
	public void createClientIndice(){
//定义客户端实例,加载上面建立的dsl配置文件
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
try {
//client_info存在返回true,不存在返回false
boolean exist = clientUtil.existIndice("client_info");

//如果索引表client_info已经存在先删除mapping
if(exist) {//先删除mapping client_info
clientUtil.dropIndice("client_info");
}
} catch (ElasticSearchException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//创建mapping client_info
clientUtil.createIndiceMapping("client_info","createClientIndice");
String client_info = clientUtil.getIndice("client_info");//获取最新建立的索引表结构client_info
System.out.println("after createClientIndice clientUtil.getIndice(\"client_info\") response:"+client_info);
}


3.维护父子关系数据-Elasticsearch 5.x 中维护父子关系数据
  • 定义对象

首先定义四个对象,分别对应mapping中的四个索引类型,篇幅关系只列出主要属性
  • Basic
  • Medical
  • Exam
  • Diagnosis

通过注解@ESId指定基本信息文档_id
public class Basic extends ESBaseData {
/**
* 索引_id
*/
@ESId
private String party_id;
private String sex; // 性别
......
}
通过注解@ESParentId指定Medical关联的基本信息文档_id,Medical文档_id由ElasticSearch自动生成
public class Medical extends ESBaseData {
@ESParentId
private String party_id; //父id
private String hos_name_yb; //就诊医院
...
}
通过注解@ESParentId指定Exam关联的基本信息文档_id,Exam文档_id由ElasticSearch自动生成
public class Exam extends ESBaseData {
@ESParentId
private String party_id; //父id
private String hospital; // 就诊医院
....
}
通过注解@ESParentId指定Diagnosis关联的基本信息文档_id,Diagnosis文档_id由ElasticSearch自动生成
public class Diagnosis extends ESBaseData {
@ESParentId
private String party_id; //父id
private String provider; //诊断医院
private String subject; //科室
......
}

  • 通过api维护测试数据

对象定义好了后,通过bboss客户数据到之前建立好的索引client_info中。
	/**
* 录入体检医疗信息
*/
public void importClientInfoDataFromBeans() {
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();

//导入基本信息,并且实时刷新,测试需要,实际环境不要带refresh
List<Basic> basics = buildBasics();
clientUtil.addDocuments("client_info","basic",basics,"refresh");

//导入医疗信息,并且实时刷新,测试需要,实际环境不要带refresh
List<Medical> medicals = buildMedicals();
clientUtil.addDocuments("client_info","medical",medicals,"refresh");

//导入体检结果数据,并且实时刷新,测试需要,实际环境不要带refresh
List<Exam> exams = buildExams();
clientUtil.addDocuments("client_info","exam",exams,"refresh");

//导入结果诊断数据,并且实时刷新,测试需要,实际环境不要带refresh
List<Diagnosis> diagnosiss = buildDiagnosiss();
clientUtil.addDocuments("client_info","diagnosis",diagnosiss,"refresh");
}
//构建基本信息集合
private List<Basic> buildBasics() {
List<Basic> basics = new ArrayList<Basic>();
Basic basic = new Basic();
basic.setParty_id("1");
basic.setAge(60);
basics.add(basic);
//继续添加其他数据
return basics;

}
//
构建医疗信息集合
private List<Medical> buildMedicals() {
List<Medical> medicals = new ArrayList<Medical>();
Medical medical = new Medical();
medical.setParty_id("1");//设置父文档id-基本信息文档_id
medical.setCreated_date(new Date());
medicals.add(medical);
//继续添加其他数据
return medicals;

}
//构建体检结果数据集合
private List<Exam> buildExams() {
List<Exam> exams = new ArrayList<Exam>();
Exam exam = new Exam();
exam.setParty_id("1");//设置父文档id-基本信息文档_id
exams.add(exam);
//继续添加其他数据
return exams;
}
//构建结果诊断数据集合
private List<Diagnosis> buildDiagnosiss() {
List<Diagnosis> diagnosiss = new ArrayList<Diagnosis>();
Diagnosis diagnosis = new Diagnosis();
diagnosis.setParty_id("1");//设置父文档id-基本信息文档_id
diagnosiss.add(diagnosis);
//继续添加其他数据
return diagnosiss;
}

  • 通过json报文批量导入测试数据

除了通过addDocuments录入数据,还可以通过json报文批量导入数据
在配置文件esmapper/Client_Info.xml增加以下内容:
    <!--
导入基本信息:
-->
<property name="bulkImportBasicData" trim="false">
<![CDATA[
{ "index": { "_id": "1" }}
{ "party_id":"1", "sex":"男", "mari_sts":"不详", "ethnic":"蒙古族", "prof":"放牧","birth_date":"1966-2-14 00:00:00", "province":"内蒙古", "city":"赤峰市","client_type":"1", "client_name":"安", "age":52,"id_type":"1", "idno":"1", "education":"初中","created_date":"2013-04-24 00:00:00","last_modified_date":"2013-04-24 00:00:00", "etl_date":"2013-04-24 00:00:00"}
{ "index": { "_id": "2" }}
{ "party_id":"2", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"公务员","birth_date":"1986-07-06 00:00:00", "province":"广东", "city":"深圳","client_type":"1", "client_name":"彭", "age":32,"id_type":"1", "idno":"2", "education":"本科", "created_date":"2013-05-09 15:49:47","last_modified_date":"2013-05-09 15:49:47", "etl_date":"2013-05-09 15:49:47"}
{ "index": { "_id": "3" }}
{ "party_id":"3", "sex":"男", "mari_sts":"未婚", "ethnic":"汉族", "prof":"无业","birth_date":"2000-08-15 00:00:00", "province":"广东", "city":"佛山","client_type":"1", "client_name":"浩", "age":18,"id_type":"1", "idno":"3", "education":"高中", "created_date":"2014-09-01 09:49:27","last_modified_date":"2014-09-01 09:49:27", "etl_date":"2014-09-01 09:49:27" }
{ "index": { "_id": "4" }}
{ "party_id":"4", "sex":"女", "mari_sts":"未婚", "ethnic":"满族", "prof":"工人","birth_date":"1996-03-14 00:00:00", "province":"江苏", "city":"扬州","client_type":"1", "client_name":"慧", "age":22,"id_type":"1", "idno":"4", "education":"高中", "created_date":"2014-09-16 09:30:37","last_modified_date":"2014-09-16 09:30:37", "etl_date":"2014-09-16 09:30:37" }
{ "index": { "_id": "5" }}
{ "party_id":"5", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"教师","birth_date":"1983-08-14 00:00:00", "province":"宁夏", "city":"灵武","client_type":"1", "client_name":"英", "age":35,"id_type":"1", "idno":"5", "education":"本科", "created_date":"2015-09-16 09:30:37","last_modified_date":"2015-09-16 09:30:37", "etl_date":"2015-09-16 09:30:37" }
{ "index": { "_id": "6" }}
{ "party_id":"6", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"工人","birth_date":"1959-07-04 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"岭", "age":59,"id_type":"1", "idno":"6", "education":"小学", "created_date":"2015-09-01 09:49:27","last_modified_date":"2015-09-01 09:49:27", "etl_date":"2015-09-01 09:49:27" }
{ "index": { "_id": "7" }}
{ "party_id":"7", "sex":"女", "mari_sts":"未婚", "ethnic":"汉族", "prof":"学生","birth_date":"1999-02-18 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"欣", "age":19,"id_type":"1", "idno":"7", "education":"高中", "created_date":"2016-12-01 09:49:27","last_modified_date":"2016-12-01 09:49:27", "etl_date":"2016-12-01 09:49:27" }
{ "index": { "_id": "8" }}
{ "party_id":"8", "sex":"女", "mari_sts":"未婚", "ethnic":"汉族", "prof":"学生","birth_date":"2007-11-18 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"梅", "age":10,"id_type":"1", "idno":"8", "education":"小学", "created_date":"2016-11-21 09:49:27","last_modified_date":"2016-11-21 09:49:27", "etl_date":"2016-11-21 09:49:27" }
{ "index": { "_id": "9" }}
{ "party_id":"9", "sex":"男", "mari_sts":"不详", "ethnic":"回族", "prof":"个体户","birth_date":"1978-03-29 00:00:00", "province":"北京", "city":"北京","client_type":"1", "client_name":"磊", "age":40,"id_type":"1", "idno":"9", "education":"高中", "created_date":"2017-09-01 09:49:27","last_modified_date":"2017-09-01 09:49:27", "etl_date":"2017-09-01 09:49:27" }
{ "index": { "_id": "10" }}
{ "party_id":"10", "sex":"男", "mari_sts":"已婚", "ethnic":"汉族", "prof":"农民","birth_date":"1970-11-14 00:00:00", "province":"浙江", "city":"台州","client_type":"1", "client_name":"强", "age":47,"id_type":"1", "idno":"10", "education":"初中", "created_date":"2018-09-01 09:49:27","last_modified_date":"2018-09-01 09:49:27", "etl_date":"2018-09-01 09:49:27" }
]]>
</property>
<!--
导入诊断信息
-->
<property name="bulkImportDiagnosisData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"J31.0", "sd_disease_name":"鼻炎","created_date":"2013-07-23 20:56:44", "last_modified_date":"2013-07-23 20:56:44", "etl_date":"2013-07-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"M47.8", "sd_disease_name":"颈椎病","created_date":"2013-09-23 20:56:44", "last_modified_date":"2013-09-23 20:56:44", "etl_date":"2013-09-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"E78.1", "sd_disease_name":"甘油三脂增高","created_date":"2018-09-20 09:27:44", "last_modified_date":"2018-09-20 09:27:44", "etl_date":"2018-09-20 09:27:44" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "provider":"江苏医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2011-05-19 15:52:55", "last_modified_date":"2011-05-19 15:52:55", "etl_date":"2011-05-19 15:52:55" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"H44", "sd_disease_name":"眼疾","created_date":"2016-04-08 10:42:18", "last_modified_date":"2016-04-08 10:42:18", "etl_date":"2016-04-08 10:42:18" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"M47.8", "sd_disease_name":"颈椎病","created_date":"2016-04-08 10:42:18", "last_modified_date":"2016-04-08 10:42:18", "etl_date":"2016-04-08 10:42:18" }

{ "index": { "parent": "7" }}
{ "party_id":"7", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2017-04-08 10:42:18", "last_modified_date":"2017-04-08 10:42:18", "etl_date":"2017-04-08 10:42:18" }

{ "index": { "parent": "8" }}
{ "party_id":"8", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2018-04-08 10:42:18", "last_modified_date":"2018-04-08 10:42:18", "etl_date":"2018-04-08 10:42:18" }

{ "index": { "parent": "9" }}
{ "party_id":"9", "provider":"朝阳医院", "subject":"","diagnosis_type":"","icd10_code":"A03.901", "sd_disease_name":"急性细菌性痢疾","created_date":"2015-06-08 10:42:18", "last_modified_date":"2015-06-08 10:42:18", "etl_date":"2015-06-08 10:42:18" }
]]>
</property>

<!--
导入医疗信息
-->
<property name="bulkImportMedicalData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"氟化钠", "medical_sale_name":"", "medical_code":"A01AA01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"四环素", "medical_sale_name":"", "medical_code":"A01AB13", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2016-05-31 00:00:00", "last_modified_date":"2016-05-31 00:00:00", "etl_date":"2016-05-31 00:00:00" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"", "medical_sale_name":"盐酸多西环素胶丸", "medical_code":"A01AB22", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2016-03-18 00:00:00", "last_modified_date":"2016-03-18 00:00:00", "etl_date":"2016-03-18 00:00:00" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"盐酸多西环素分散片", "medical_sale_name":"", "medical_code":"A01AB22", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2013-07-23 20:56:44", "last_modified_date":"2013-07-23 20:56:44", "etl_date":"2013-07-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"地塞米松", "medical_sale_name":"", "medical_code":"A01AC02", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2013-09-23 20:56:44", "last_modified_date":"2013-09-23 20:56:44", "etl_date":"2013-09-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"肾上腺素", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-09-20 09:27:44", "last_modified_date":"2018-09-20 09:27:44", "etl_date":"2018-09-20 09:27:44" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"地塞米松", "medical_sale_name":"", "medical_code":"A01AC02", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2011-05-19 15:52:55", "last_modified_date":"2011-05-19 15:52:55", "etl_date":"2011-05-19 15:52:55" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"四环素", "medical_sale_name":"", "medical_code":"A01AB13", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-04-08 10:42:18", "last_modified_date":"2018-04-08 10:42:18", "etl_date":"2018-04-08 10:42:18" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"诺氟沙星胶囊", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2015-06-08 10:42:18", "last_modified_date":"2015-06-08 10:42:18", "etl_date":"2015-06-08 10:42:18" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "hos_name_yb":"山东医院", "eivisions_name":"", "medical_type":"","medical_common_name":"盐酸异丙肾上腺素片", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-01-23 20:56:44", "last_modified_date":"2014-01-23 20:56:44", "etl_date":"2014-01-23 20:56:44" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "hos_name_yb":"山东医院", "eivisions_name":"", "medical_type":"","medical_common_name":"甲硝唑栓", "medical_sale_name":"", "medical_code":"A01AB17", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-06-08 10:42:18", "last_modified_date":"2018-06-08 10:42:18", "etl_date":"2018-06-08 10:42:18" }

{ "index": { "parent": "9" }}
{ "party_id":"9", "hos_name_yb":"朝阳医院", "eivisions_name":"", "medical_type":"","medical_common_name":"复方克霉唑乳膏", "medical_sale_name":"", "medical_code":"A01AB18", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-01-23 20:56:44", "last_modified_date":"2014-01-23 20:56:44", "etl_date":"2014-01-23 20:56:44"}
]]>
</property>

<!--
导入体检信息
-->
<property name="bulkImportExamData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"高血压","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "2" }}
{ "party_id":"2", "hospital":"", "dept":"", "is_ok":"Y", "exam_result":"轻度脂肪肝","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "3" }}
{ "party_id":"3", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"急性细菌性痢疾","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "5" }}
{ "party_id":"5", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "7" }}
{ "party_id":"7", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "8" }}
{ "party_id":"1", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "9" }}
{ "party_id":"9", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "10" }}
{ "party_id":"10", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
]]>
</property>







通过bboss提供的通用api,导入上面定义的数据:
	/**
* 通过读取配置文件中的dsl json数据导入医疗数据
*/
public void importClientInfoFromJsonData(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");

clientUtil.executeHttp("client_info/basic/_bulk?refresh","bulkImportBasicData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/diagnosis/_bulk?refresh","bulkImportDiagnosisData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/medical/_bulk?refresh","bulkImportMedicalData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/exam/_bulk?refresh","bulkImportExamData",ClientUtil.HTTP_POST);
统计导入的数据
		
long basiccount = clientUtil.countAll("client_info/basic");
System.out.println(basiccount);
long medicalcount = clientUtil.countAll("client_info/medical");
System.out.println(medicalcount);
long examcount = clientUtil.countAll("client_info/exam");
System.out.println(examcount);
long diagnosiscount = clientUtil.countAll("client_info/diagnosis");
System.out.println(diagnosiscount);
}
4.父子关系查询-Elasticsearch 5.x 中has_child和has_parent查询的基本用法​
  • 根据父查子-通过客户名称信息查询客户端体检结果

在配置文件esmapper/Client_Info.xml增加dsl语句:queryExamSearchByClientName
   <!--根据客户名称查询客户体检报告-->
<property name="queryExamSearchByClientName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_parent": {
"type": "basic",
"query": {
"match": {
"client_name": #[clientName] ## 通过变量clientName设置客户名称
}
}
}
}
}
]]>
</property>

 执行查询,通过bboss的searchList 方法获取符合条件的体检报告以及总记录数据,返回size对应的1000条数据
	/**
* 根据客户名称查询客户体检报告
*/
public void queryExamSearchByClientName(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("clientName","张三");
params.put("size",1000);
ESDatas<Exam> exams = clientUtil.searchList("client_info/exam/_search","queryExamSearchByClientName",params,Exam.class);
List<Exam> examList = exams.getDatas();//获取符合条件的体检数据
long totalSize = exams.getTotalSize();//符合条件的总记录数据
}
 
  • 根据子查父数据-通过医疗信息编码查找客户基本数据

在配置文件esmapper/Client_Info.xml增加查询dsl语句:queryClientInfoByMedicalName
    <!--通过医疗信息编码查找客户基本数据-->
<property name="queryClientInfoByMedicalName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_child": {
"type": "medical",
"score_mode": "max",
"query": {
"match": {
"medical_code": #[medicalCode] ## 通过变量medicalCode设置医疗编码
}
}
}
}
}
]]>
</property>
执行查询,通过bboss的searchList 方法获取符合条件的客户端基本信息以及总记录数据
	/**
* 通过医疗信息编码查找客户基本数据
*/
public void queryClientInfoByMedicalName(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("medicalCode","A01AA01"); //通过变量medicalCode设置医疗编码
params.put("size",1000); //最多返回size变量对应的记录条数
ESDatas<Basic> bascis = clientUtil.searchList("client_info/basic/_search","queryClientInfoByMedicalName",params,Basic.class);
List<Basic> bascisList = bascis.getDatas();//获取符合条件的客户信息
long totalSize = bascis.getTotalSize();
}
5.同时返回父子数据-Elasticsearch 5.x 中如何在检索中同时返回父子数据
这一节中我们介绍同时返回父子数据的玩法 :inner_hits的妙用
  • 根据父条件查询所有子数据集合并返回父数据,根据客户名称查询所有体检数据,同时返回客户信息

在配置文件esmapper/Client_Info.xml增加检索dsl-queryDiagnosisByClientName
    <!--根据客户名称获取客户体检诊断数据,并返回客户信息-->
<property name="queryDiagnosisByClientName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_parent": {
"type": "basic",
"query": {
"match": {
"client_name": #[clientName] ## 通过变量clientName设置客户名称
}
},
"inner_hits": {} ## 通过变量inner_hits表示要返回对应的客户信息
}
}
}
]]>
</property>
执行检索并遍历结果
	/**
* 根据客户名称获取客户体检诊断数据,并返回客户数据
*/
public void queryDiagnosisByClientName(){

ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("clientName","张三");
params.put("size",1000);

try {
ESInnerHitSerialThreadLocal.setESInnerTypeReferences(Basic.class);//指定inner查询结果对应的客户基本信息类型,Basic只有一个文档类型,索引不需要显示指定basic对应的mapping type名称
ESDatas<Diagnosis> diagnosiss = clientUtil.searchList("client_info/diagnosis/_search",
"queryDiagnosisByClientName",params,Diagnosis.class);
List<Diagnosis> diagnosisList = diagnosiss.getDatas();//获取符合条件的体检报告数据
long totalSize = diagnosiss.getTotalSize();
//遍历诊断报告信息,并查看报告对应的客户基本信息
for(int i = 0; diagnosisList != null && i < diagnosisList.size(); i ++) {
Diagnosis diagnosis = diagnosisList.get(i);
List<Basic> basics = ResultUtil.getInnerHits(diagnosis.getInnerHits(), "basic");
if(basics != null) {
System.out.println(basics.size());
}
}
}
finally{
ESInnerHitSerialThreadLocal.clean();//清空inner查询结果对应的客户基本信息类型
}
}

  •  根据子条件查询父数据并返回符合条件的父的子数据集合,查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录

在配置文件esmapper/Client_Info.xml增加检索dsl-queryClientAndAllSons
    <!--查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录-->
<property name="queryClientAndAllSons">
<![CDATA[
{
"query": {
"bool": {
"should": [
{
"match_all":{}
}
]
,"must": [
{
"has_child": {
"score_mode": "none",
"type": "diagnosis"
,"query": {
"bool": {
"must": [
{
"term": {
"icd10_code": {
"value": "J00"
}
}
}
]
}
},"inner_hits":{}
}
}
]
,"should": [
{
"has_child": {
"score_mode": "none",
"type": "medical"
,"query": {
"match_all": {}

},"inner_hits":{}
}
}
]
,"should": [
{
"has_child": {
"type": "exam",
"query": {
"match_all": {}
},"inner_hits":{}
}
}
]
}
}
}
]]>
</property>
执行查询:
	/**
* 查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录
*/
public void queryClientAndAllSons(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
Map<String,Object> params = null;//没有检索条件,构造一个空的参数对象

try {
//设置子文档的类型和对象映射关系
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("exam",Exam.class);//指定inner查询结果对于exam类型和对应的对象类型Exam
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("diagnosis",Diagnosis.class);//指定inner查询结果对于diagnosis类型和对应的对象类型Diagnosis
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("medical",Medical.class);//指定inner查询结果对于medical类型和对应的对象类型Medical
ESDatas<Basic> escompanys = clientUtil.searchList("client_info/basic/_search",
"queryClientAndAllSons",params,Basic.class);
//String response = clientUtil.executeRequest("client_info/basic/_search","queryClientAndAllSons",params);直接获取原始的json报文
// escompanys = clientUtil.searchAll("client_info",Basic.class);
long totalSize = escompanys.getTotalSize();
List<Basic> clientInfos = escompanys.getDatas();//获取符合条件的数据
//查看公司下面的雇员信息(符合检索条件的雇员信息)
for (int i = 0; clientInfos != null && i < clientInfos.size(); i++) {
Basic clientInfo = clientInfos.get(i);
List<Exam> exams = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "exam");
if(exams != null)
System.out.println(exams.size());
List<Diagnosis> diagnosiss = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "diagnosis");
if(diagnosiss != null)
System.out.println(diagnosiss.size());
List<Medical> medicals = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "medical");
if(medicals != null)
System.out.println(medicals.size());

}
}
finally{
ESInnerHitSerialThreadLocal.clean();//清空inner查询结果对于各种类型信息
}
}
最后我们按顺序执行所有方法,验证功能:
	@Test
public void testMutil(){
this.createClientIndice();//创建indice client_info
// this.importClientInfoDataFromBeans(); //通过api添加测试数据
this.importClientInfoFromJsonData();//导入测试数据
this.queryExamSearchByClientName(); //根据客户端名称查询提交报告
this.queryClientInfoByMedicalName();//通过医疗信息编码查找客户基本数据
this.queryDiagnosisByClientName();//根据客户名称获取客户体检诊断数据,并返回客户数据
this.queryClientAndAllSons();//查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录
}
可以下载完整的demo工程运行本文中的测试用例方法,地址见相关资料。
到此Elasticsearch 5.x 父子关系维护检索实战介绍完毕,谢谢大家!

相关资料
完整demo工程  https://github.com/bbossgroups/eshelloword-booter
对应的类文件和配置文件
org.bboss.elasticsearchtest.parentchild.ParentChildTest
esmapper/Client_Info.xml
 
开发交流
bboss交流群 166471282
bboss公众号
getqrcode.jpg

 
敬请关注:父子关系维护检索实战二 Elasticsearch 6.x 父子关系维护检 收起阅读 »

es创建索引失败

#10001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/10001/logs/mstore.log
  fields:
    env: uat-10001-log
  include_lines: ['ERROR']
#10001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/10001/logs/catalina.out
  fields:
    env: uat-10001-catalina
  include_lines: ['ERROR']
#11001-log
- type: log
  enabled: true
  paths:
    - /mkt/tomcat/8.5.32/11001/logs/delivery.log
  include_lines: ['ERROR']
  fields:
    env: uat-11001-log
#11001-catalina.out
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/11001/logs/catalina.out
  fields:
    env: uat-11001-catalina
  include_lines: ['ERROR']
#12001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/12001/logs/mstore.log
  fields:
    env: uat-12001-log
  include_lines: ['ERROR']
#12001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/12001/logs/catalina.out
  fields:
    env: uat-12001-catalina
  include_lines: ['ERROR']
#13001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/13001/logs/pay-web-boss.log
  fields:
    env: uat-13001-log
    include_lines: ['ERROR']
#13001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/13001/logs/catalina.out
  fields:
    env: uat-13001-catalina
  include_lines: ['ERROR']
#14001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/14001/logs/pay-web-gateway.log
  fields:
    env: uat-14001-log
  include_lines: ['ERROR']
#14001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/14001/logs/catalina.out
  fields:
    env: uat-14001-catalina
  include_lines: ['ERROR']
#15001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/15001/logs/roncoo-pay-web-merchant.log
  fields:
    env: uat-15001-log
  include_lines: ['ERROR']
#15001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/15001/logs/catalina.out
  fields:
    env: uat-15001-catalina
  include_lines: ['ERROR']      每次创建索引会少uat-11001-log和uat-12001-log,filebeat读取了这个两个日志文件
继续阅读 »
#10001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/10001/logs/mstore.log
  fields:
    env: uat-10001-log
  include_lines: ['ERROR']
#10001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/10001/logs/catalina.out
  fields:
    env: uat-10001-catalina
  include_lines: ['ERROR']
#11001-log
- type: log
  enabled: true
  paths:
    - /mkt/tomcat/8.5.32/11001/logs/delivery.log
  include_lines: ['ERROR']
  fields:
    env: uat-11001-log
#11001-catalina.out
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/11001/logs/catalina.out
  fields:
    env: uat-11001-catalina
  include_lines: ['ERROR']
#12001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/12001/logs/mstore.log
  fields:
    env: uat-12001-log
  include_lines: ['ERROR']
#12001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/12001/logs/catalina.out
  fields:
    env: uat-12001-catalina
  include_lines: ['ERROR']
#13001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/13001/logs/pay-web-boss.log
  fields:
    env: uat-13001-log
    include_lines: ['ERROR']
#13001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/13001/logs/catalina.out
  fields:
    env: uat-13001-catalina
  include_lines: ['ERROR']
#14001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/14001/logs/pay-web-gateway.log
  fields:
    env: uat-14001-log
  include_lines: ['ERROR']
#14001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/14001/logs/catalina.out
  fields:
    env: uat-14001-catalina
  include_lines: ['ERROR']
#15001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/15001/logs/roncoo-pay-web-merchant.log
  fields:
    env: uat-15001-log
  include_lines: ['ERROR']
#15001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/15001/logs/catalina.out
  fields:
    env: uat-15001-catalina
  include_lines: ['ERROR']      每次创建索引会少uat-11001-log和uat-12001-log,filebeat读取了这个两个日志文件 收起阅读 »

【 报名开启】2018 Elastic & 袋鼠云 & 阿里云技术沙龙(杭州)

互联网时代,十亿、百亿、千亿的数据日志呈井喷式增长,基于日志搜索分析的需求也越来越强烈。Elasticsearch 作为一个分布式、可扩展、实时的搜索与数据分析引擎, 在大体量的数据处理上,无论实在全文搜索,还是在结构化数据统计中,都有非常大的优势。然而在真正实践过程中海量数据如何高效采集,如何合理优化分配索引,如何规划集群,如何满足业务分析需求都是我们可能会面临的问题。

本次袋鼠云联合阿里云、Elastic 中文社区,共同邀请滴滴、有赞等行业技术专家一同分享和探讨各自领域Elastic的实践。
 
本次活动时间12月15日 周六人数限制100人,大家抓紧报名哈,报名链接https://meetup.elasticsearch.c ... .html

参与线下互动还有机会获得技术书籍与精美礼品哦!!!
不出意外,这应该是2018年Elastic在杭州的最后一次沙龙,小伙伴们抓紧今年的尾巴,不放过任何学习的机会哦!!!
继续阅读 »
互联网时代,十亿、百亿、千亿的数据日志呈井喷式增长,基于日志搜索分析的需求也越来越强烈。Elasticsearch 作为一个分布式、可扩展、实时的搜索与数据分析引擎, 在大体量的数据处理上,无论实在全文搜索,还是在结构化数据统计中,都有非常大的优势。然而在真正实践过程中海量数据如何高效采集,如何合理优化分配索引,如何规划集群,如何满足业务分析需求都是我们可能会面临的问题。

本次袋鼠云联合阿里云、Elastic 中文社区,共同邀请滴滴、有赞等行业技术专家一同分享和探讨各自领域Elastic的实践。
 
本次活动时间12月15日 周六人数限制100人,大家抓紧报名哈,报名链接https://meetup.elasticsearch.c ... .html

参与线下互动还有机会获得技术书籍与精美礼品哦!!!
不出意外,这应该是2018年Elastic在杭州的最后一次沙龙,小伙伴们抓紧今年的尾巴,不放过任何学习的机会哦!!! 收起阅读 »

关于es映射mapping中的enabled,store,index参数的理解

    因为刚才的一个问题,去看了一下es的文档中关于mapping参数的部分,发现了几个比较有意思的参数,index,store和enabled。下面简要说一下这几个参数的作用,有理解错和不足的地方希望大家指正。
enabled参数:
    默认是true。只用于mapping中的object字段类型。当设置为false时,其作用是使es不去解析该字段,并且该字段不能被查询和store,只有在_source中才能看到(即查询结果中会显示的_source数据)。设置enabled为false,可以不设置字段类型,默认为object
index参数:
    默认是true。当设置为false,表明该字段不能被查询,如果查询会报错。但是可以被store。当该文档被查出来时,在_source中也会显示出该字段。
store参数:
    默认false。store参数的功能和_source有一些相似。我们的数据默认都会在_source中存在。但我们也可以将数据store起来,不过大部分时候这个功能都很鸡肋。不过有一个例外,当我们使用copy_to参数时,copy_to的目标字段并不会在_source中存储,此时store就派上用场了。
三者能否同时存在:
    首先设置了enabled为false就不能设置store为true了,这两者冲突。而index和store是不冲突的。最后index和enabled之间的问题:enabled需要字段类型为object,而当字段类型为object时,好像不能设置index参数,试了几次都会报错。
PUT mindex/
{
"mappings": {
"type": {
"properties": {
"name": {
"type": "text",
"copy_to": "name_title",
"store": true,
"index": false
},
"title": {
"type": "text",
"copy_to": "name_title"
},
"name_title": {
"type": "text",
"store": true
},
"notenabled": {
"type": "object",
"enabled": false
}
}
}
}
}
PUT mindex/type/1
{
"name":"zz",
"title":"zxx",
"notenabled":"baby"
}
在搜索中使用了stored_fields之后,_source不会自己出现了,要手动指定字段。stored_fields里面不能出现notenabled。
GET /mindex/_search
{
"query": {
"match": {
"title": "zxx"
}
},
"stored_fields": ["name_title","title","name"],
"_source": ["name_title","title","name","notenabled"]
}
结果
"_source": {
"name": "zz",
"title": "zxx",
"notenabled": "baby"
},
"fields": {
"name": [
"zz"
],
"name_title": [
"zz",
"zxx"
]
}
看到结果,与上面的分析吻合。
继续阅读 »
    因为刚才的一个问题,去看了一下es的文档中关于mapping参数的部分,发现了几个比较有意思的参数,index,store和enabled。下面简要说一下这几个参数的作用,有理解错和不足的地方希望大家指正。
enabled参数:
    默认是true。只用于mapping中的object字段类型。当设置为false时,其作用是使es不去解析该字段,并且该字段不能被查询和store,只有在_source中才能看到(即查询结果中会显示的_source数据)。设置enabled为false,可以不设置字段类型,默认为object
index参数:
    默认是true。当设置为false,表明该字段不能被查询,如果查询会报错。但是可以被store。当该文档被查出来时,在_source中也会显示出该字段。
store参数:
    默认false。store参数的功能和_source有一些相似。我们的数据默认都会在_source中存在。但我们也可以将数据store起来,不过大部分时候这个功能都很鸡肋。不过有一个例外,当我们使用copy_to参数时,copy_to的目标字段并不会在_source中存储,此时store就派上用场了。
三者能否同时存在:
    首先设置了enabled为false就不能设置store为true了,这两者冲突。而index和store是不冲突的。最后index和enabled之间的问题:enabled需要字段类型为object,而当字段类型为object时,好像不能设置index参数,试了几次都会报错。
PUT mindex/
{
"mappings": {
"type": {
"properties": {
"name": {
"type": "text",
"copy_to": "name_title",
"store": true,
"index": false
},
"title": {
"type": "text",
"copy_to": "name_title"
},
"name_title": {
"type": "text",
"store": true
},
"notenabled": {
"type": "object",
"enabled": false
}
}
}
}
}
PUT mindex/type/1
{
"name":"zz",
"title":"zxx",
"notenabled":"baby"
}
在搜索中使用了stored_fields之后,_source不会自己出现了,要手动指定字段。stored_fields里面不能出现notenabled。
GET /mindex/_search
{
"query": {
"match": {
"title": "zxx"
}
},
"stored_fields": ["name_title","title","name"],
"_source": ["name_title","title","name","notenabled"]
}
结果
"_source": {
"name": "zz",
"title": "zxx",
"notenabled": "baby"
},
"fields": {
"name": [
"zz"
],
"name_title": [
"zz",
"zxx"
]
}
看到结果,与上面的分析吻合。 收起阅读 »

CentOS 7.4 下安装 ES 6.5.1 搜索集群

一、准备安装

1、修改系统 hosts

vi /etc/hosts   # 修改 hosts 文件,添加下面的内容

192.168.11.1    sky-00
192.168.11.2    sky-01
192.168.11.3    sky-02
192.168.11.4    sky-03
192.168.11.5    sky-04
192.168.11.6    sky-05
192.168.11.7    sky-06

2、角色分配

主机名 角色 内存分配
sky-00 Master 4G
sky-01 Master 8G
sky-02 Master+Data 12G
sky-03 Data 12G
sky-04 Data 12G
sky-05 Data 12G
sky-06 Data 12G

3、创建 ES 用户

adduser elastic  # 新增用户
passwd elastic   # 修改用户密码

4、创建 ES 数据和日志目录

cd /data/
mkdir elastic
cd elastic
mkdir data      # 创建数据目录
mkdir log       # 创建日志目录
chown -R elastic /data/elastic/  # 修改拥有着

5、调整文件句柄数以及可用进程数

Elasticsearch 要求其可用的文件句柄至少为 65536,同时要求其进程数限制至少为 2048,可用按照下面的指令进行修改。

分别对应以下两个报错信息:

  • max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]
  • max number of threads [1024] for user [es] is too low, increase to at least [2048]
vi /etc/security/limits.conf

*     soft   nofile  100001
*     hard   nofile  100002
*     soft   nproc   4096
*     hard   nproc   8192
elastic soft memlock unlimited
elastic hard memlock unlimited

6、设置内核交换

为了避免不必要的磁盘和内存交换,影响效率,需要将 vm.swappiness 修改为 1(进行最少量的交换,而不禁用交换)或者 10(当系统存在足够内存时,推荐设置为该值以提高性能),其默认值为 60。

此外需要修改最大虚拟内存 vm.max_map_count 防止启动时报错:max virtual memory areas vm.max_map_count [65530] likely too low, increase to at least [262144]

vi /etc/sysctl.conf

vm.swappiness = 1
vm.max_map_count = 262144

7、下载安装文件

mkdir /opt/downloads/
mkdir /opt/soft/
cd /opt/downloads/

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.5.1.tar.gz
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.5.1-linux-x86_64.tar.gz
wget http://download.oracle.com/otn/java/jdk/xxxxxx/jdk-8u191-linux-x64.tar.gz

tar -zxvf elasticsearch-6.5.1.tar.gz -C /opt/soft/
tar -zxvf jdk-8u191-linux-x64.tar.gz -C /opt/soft/
tar -zxvf kibana-6.5.1-linux-x86_64.tar.gz -C /opt/soft/

chown -R elastic /opt/soft/elasticsearch-6.5.1/
chown -R elastic /opt/soft/kibana-6.5.1/

二、开始安装

1、配置 Java 环境

su elastic             #切换到 elastic 用户
vi ~/.bashrc          #只修改 elastic 用户自己的环境变量

export JAVA_HOME=/opt/soft/jdk1.8.0_191
export JRE_HOME=/opt/soft/jdk1.8.0_191/jre
export CLASSPATH=.:/opt/soft/jdk1.8.0_191/lib:/opt/soft/jdk1.8.0_191/jre/lib
export PATH=$PATH:/opt/soft/jdk1.8.0_191/bin:/opt/soft/jdk1.8.0_191/jre/bin

2、配置 ES 内存占用

cd /opt/soft/elasticsearch-6.5.1/config/
vi jvm.options 

-Xms4g      # 请根据自己机器配置调整
-Xmx4g

3、配置 Elasticsearch

下面的配置已经过多个生产环境验证,具体设置值仅供参考,请务必根据实际情况进行调整。

# ---------------------------------- Cluster -----------------------------------
#
# 设置集群名
cluster.name: cluster-name
#
# ------------------------------------ Node ------------------------------------
#
# 设置节点名
node.name: node01

# 设置角色
node.master: true   
node.data: false
node.ingest: true

# 设置机架信息
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# 设置数据路径
path.data: /data/elastic/data

# 设置日志路径
path.logs: /data/elastic/log
#
# ----------------------------------- Memory -----------------------------------
#
# 设置内存锁定
bootstrap.memory_lock: true
bootstrap.system_call_filter: false
#
# ---------------------------------- Network -----------------------------------
#
# 设置ip和端口
network.bind_host: sky-00
network.publish_host: 0.0.0.0
http.port: 9200

# 设置跨域访问
http.cors.enabled: true
http.cors.allow-origin: "*"
http.max_content_length: 500mb

# --------------------------------- Discovery ----------------------------------

# 设置zen发现范围(只需要填写主节点的 ip 即可)
discovery.zen.ping.unicast.hosts: ["sky-00", "sky-01", "sky-02"]

discovery.zen.no_master_block: write
discovery.zen.fd.ping_timeout: 10s

# 设置最小主节点个数,一般为:(master_node_count+1)/2
discovery.zen.minimum_master_nodes: 2

# ---------------------------------- Gateway -----------------------------------
#
# 设置在有4个节点后进行数据恢复
gateway.recover_after_nodes: 4
gateway.expected_nodes: 7
gateway.recover_after_time: 1m
#
# ---------------------------------- Various -----------------------------------
# 禁止通配符模式删除索引
action.destructive_requires_name: true

indices.recovery.max_bytes_per_sec: 200mb
indices.memory.index_buffer_size: 20%

# 默认开启全部类型脚本,可以通过下面配置进行限制
#script.allowed_types: inline
#script.allowed_contexts: search, update

# 关闭xpack的安全校验
xpack.security.enabled: false

# 开启 monitoring
xpack.monitoring.enabled: true
xpack.monitoring.collection.enabled: true

# 设置 monitoring 写入信息
xpack.monitoring.exporters:
  sky:
    type: http
    host: ["sky-02", "sky-03", "sky-04", "sky-05", "sky-06"]
    # 设置 monitoring 索引格式,默认是 YYYY-MM-DD(按天新建)
    index.name.time_format: YYYY-MM
    headers:
      # 设置 Basic 认证信息(详见插件安装部分说明)
      Authorization: "Basic XXXXXXXXXXXXXXX"

三、安装插件

1、安装插件

推荐安装的插件有:

  • IK 中文分词插件
  • Readonlyrest 安全认证插件
  • elasticsearch-head 集群监控管理插件(chrome 插件)

插件下载链接: https://pan.baidu.com/s/1r_322unsIjoWlhY8u7pkBA 提取码: aupq

使用下面命令即可安装。

$ES_HOME/bin/elasticsearch-plugin -install file:///data/downloads/elasticsearch-analysis-ik-6.5.1.zip
$ES_HOME/bin/elasticsearch-plugin -install file:///data/downloads/readonlyrest-1.16.29_es6.5.1.zip

2、配置 Readonlyrest 安全认证

下面只简单介绍 Readonlyrest 的 Basic 认证,更高级的用法可以去官方网站查看,在 ES 安装目录的 conf 目录下新建文件 readonlyrest.yml,并添加下面内容。

readonlyrest:
    access_control_rules:
    - name: "Require HTTP Basic Auth"
      type: allow
      auth_key: 用户名:密码

3、启动 ES

全部安装完成后,即可使用 elastic 用户启动 ES。

# 默认 ES 不支持 root 用户启动
su elastic
cd /opt/soft/elasticsearch-6.5.1/bin
./elasticsearch -d

四、在 Kibana 里面监控

在安装 ES 的时候,我们配置了 ES 的监控信息,这样我们就可以在 Kibana 中查看 ES 索引信息、node 信息等。

1、配置 Kibana

进入 Kibana 的解压目录下的 conf 文件夹,打开配置文件 kibana.yml

# 配置 kibana ui 的端口
server.port: 5601

# 配置 kibana 访问 ip
server.host: "0.0.0.0"

# 设置 ES 地址
elasticsearch.url: "http://sky-00:9200"

# dashboards. Kibana creates a new index if the index doesn't already exist.
#kibana.index: ".kibana"

# 打开 kibana 时默认页面
#kibana.defaultAppId: "home"

# ES Basic 认证信息
elasticsearch.username: "用户名"
elasticsearch.password: "密码"

# 设置时区信息
#i18n.locale: "en"

# 开启监控
xpack.monitoring.enabled: true

# 关闭 kibana 监控,默认为 true
xpack.monitoring.kibana.collection.enabled: false

2、对 Kibana 配置文件的说明

  • ES Basic 认证信息配置(在启动时对 Kibana 索引进行维护)完成后,登陆 kibana 时,依旧需要输入认证信息;
  • 由于 kibana 的 monitoring 无法设置新建的索引的索引名(无法配置 index.name.time_format),这样 kibana 每天会新建一个索引,由于 kibana 只是作为管理查看工具,因此关闭了 kibana 监控;
  • elasticsearch.url 该配置项无法设置多个 es 地址;如果你想实现类似负载均衡的功能,最简单的方法就是在 Kibana 机器上运行一个协调(Coordinating)节点。

3、监控界面

全部配置完成后,启动 kibana,打开 monitoring 即可开始监控 node、index 等。

使用 Kibana 监控节点状态

五、设置索引模板

具体请参考之前发布的文章基于 IK 分词器的 ES 通用索引模板


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

继续阅读 »

一、准备安装

1、修改系统 hosts

vi /etc/hosts   # 修改 hosts 文件,添加下面的内容

192.168.11.1    sky-00
192.168.11.2    sky-01
192.168.11.3    sky-02
192.168.11.4    sky-03
192.168.11.5    sky-04
192.168.11.6    sky-05
192.168.11.7    sky-06

2、角色分配

主机名 角色 内存分配
sky-00 Master 4G
sky-01 Master 8G
sky-02 Master+Data 12G
sky-03 Data 12G
sky-04 Data 12G
sky-05 Data 12G
sky-06 Data 12G

3、创建 ES 用户

adduser elastic  # 新增用户
passwd elastic   # 修改用户密码

4、创建 ES 数据和日志目录

cd /data/
mkdir elastic
cd elastic
mkdir data      # 创建数据目录
mkdir log       # 创建日志目录
chown -R elastic /data/elastic/  # 修改拥有着

5、调整文件句柄数以及可用进程数

Elasticsearch 要求其可用的文件句柄至少为 65536,同时要求其进程数限制至少为 2048,可用按照下面的指令进行修改。

分别对应以下两个报错信息:

  • max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]
  • max number of threads [1024] for user [es] is too low, increase to at least [2048]
vi /etc/security/limits.conf

*     soft   nofile  100001
*     hard   nofile  100002
*     soft   nproc   4096
*     hard   nproc   8192
elastic soft memlock unlimited
elastic hard memlock unlimited

6、设置内核交换

为了避免不必要的磁盘和内存交换,影响效率,需要将 vm.swappiness 修改为 1(进行最少量的交换,而不禁用交换)或者 10(当系统存在足够内存时,推荐设置为该值以提高性能),其默认值为 60。

此外需要修改最大虚拟内存 vm.max_map_count 防止启动时报错:max virtual memory areas vm.max_map_count [65530] likely too low, increase to at least [262144]

vi /etc/sysctl.conf

vm.swappiness = 1
vm.max_map_count = 262144

7、下载安装文件

mkdir /opt/downloads/
mkdir /opt/soft/
cd /opt/downloads/

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.5.1.tar.gz
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.5.1-linux-x86_64.tar.gz
wget http://download.oracle.com/otn/java/jdk/xxxxxx/jdk-8u191-linux-x64.tar.gz

tar -zxvf elasticsearch-6.5.1.tar.gz -C /opt/soft/
tar -zxvf jdk-8u191-linux-x64.tar.gz -C /opt/soft/
tar -zxvf kibana-6.5.1-linux-x86_64.tar.gz -C /opt/soft/

chown -R elastic /opt/soft/elasticsearch-6.5.1/
chown -R elastic /opt/soft/kibana-6.5.1/

二、开始安装

1、配置 Java 环境

su elastic             #切换到 elastic 用户
vi ~/.bashrc          #只修改 elastic 用户自己的环境变量

export JAVA_HOME=/opt/soft/jdk1.8.0_191
export JRE_HOME=/opt/soft/jdk1.8.0_191/jre
export CLASSPATH=.:/opt/soft/jdk1.8.0_191/lib:/opt/soft/jdk1.8.0_191/jre/lib
export PATH=$PATH:/opt/soft/jdk1.8.0_191/bin:/opt/soft/jdk1.8.0_191/jre/bin

2、配置 ES 内存占用

cd /opt/soft/elasticsearch-6.5.1/config/
vi jvm.options 

-Xms4g      # 请根据自己机器配置调整
-Xmx4g

3、配置 Elasticsearch

下面的配置已经过多个生产环境验证,具体设置值仅供参考,请务必根据实际情况进行调整。

# ---------------------------------- Cluster -----------------------------------
#
# 设置集群名
cluster.name: cluster-name
#
# ------------------------------------ Node ------------------------------------
#
# 设置节点名
node.name: node01

# 设置角色
node.master: true   
node.data: false
node.ingest: true

# 设置机架信息
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# 设置数据路径
path.data: /data/elastic/data

# 设置日志路径
path.logs: /data/elastic/log
#
# ----------------------------------- Memory -----------------------------------
#
# 设置内存锁定
bootstrap.memory_lock: true
bootstrap.system_call_filter: false
#
# ---------------------------------- Network -----------------------------------
#
# 设置ip和端口
network.bind_host: sky-00
network.publish_host: 0.0.0.0
http.port: 9200

# 设置跨域访问
http.cors.enabled: true
http.cors.allow-origin: "*"
http.max_content_length: 500mb

# --------------------------------- Discovery ----------------------------------

# 设置zen发现范围(只需要填写主节点的 ip 即可)
discovery.zen.ping.unicast.hosts: ["sky-00", "sky-01", "sky-02"]

discovery.zen.no_master_block: write
discovery.zen.fd.ping_timeout: 10s

# 设置最小主节点个数,一般为:(master_node_count+1)/2
discovery.zen.minimum_master_nodes: 2

# ---------------------------------- Gateway -----------------------------------
#
# 设置在有4个节点后进行数据恢复
gateway.recover_after_nodes: 4
gateway.expected_nodes: 7
gateway.recover_after_time: 1m
#
# ---------------------------------- Various -----------------------------------
# 禁止通配符模式删除索引
action.destructive_requires_name: true

indices.recovery.max_bytes_per_sec: 200mb
indices.memory.index_buffer_size: 20%

# 默认开启全部类型脚本,可以通过下面配置进行限制
#script.allowed_types: inline
#script.allowed_contexts: search, update

# 关闭xpack的安全校验
xpack.security.enabled: false

# 开启 monitoring
xpack.monitoring.enabled: true
xpack.monitoring.collection.enabled: true

# 设置 monitoring 写入信息
xpack.monitoring.exporters:
  sky:
    type: http
    host: ["sky-02", "sky-03", "sky-04", "sky-05", "sky-06"]
    # 设置 monitoring 索引格式,默认是 YYYY-MM-DD(按天新建)
    index.name.time_format: YYYY-MM
    headers:
      # 设置 Basic 认证信息(详见插件安装部分说明)
      Authorization: "Basic XXXXXXXXXXXXXXX"

三、安装插件

1、安装插件

推荐安装的插件有:

  • IK 中文分词插件
  • Readonlyrest 安全认证插件
  • elasticsearch-head 集群监控管理插件(chrome 插件)

插件下载链接: https://pan.baidu.com/s/1r_322unsIjoWlhY8u7pkBA 提取码: aupq

使用下面命令即可安装。

$ES_HOME/bin/elasticsearch-plugin -install file:///data/downloads/elasticsearch-analysis-ik-6.5.1.zip
$ES_HOME/bin/elasticsearch-plugin -install file:///data/downloads/readonlyrest-1.16.29_es6.5.1.zip

2、配置 Readonlyrest 安全认证

下面只简单介绍 Readonlyrest 的 Basic 认证,更高级的用法可以去官方网站查看,在 ES 安装目录的 conf 目录下新建文件 readonlyrest.yml,并添加下面内容。

readonlyrest:
    access_control_rules:
    - name: "Require HTTP Basic Auth"
      type: allow
      auth_key: 用户名:密码

3、启动 ES

全部安装完成后,即可使用 elastic 用户启动 ES。

# 默认 ES 不支持 root 用户启动
su elastic
cd /opt/soft/elasticsearch-6.5.1/bin
./elasticsearch -d

四、在 Kibana 里面监控

在安装 ES 的时候,我们配置了 ES 的监控信息,这样我们就可以在 Kibana 中查看 ES 索引信息、node 信息等。

1、配置 Kibana

进入 Kibana 的解压目录下的 conf 文件夹,打开配置文件 kibana.yml

# 配置 kibana ui 的端口
server.port: 5601

# 配置 kibana 访问 ip
server.host: "0.0.0.0"

# 设置 ES 地址
elasticsearch.url: "http://sky-00:9200"

# dashboards. Kibana creates a new index if the index doesn't already exist.
#kibana.index: ".kibana"

# 打开 kibana 时默认页面
#kibana.defaultAppId: "home"

# ES Basic 认证信息
elasticsearch.username: "用户名"
elasticsearch.password: "密码"

# 设置时区信息
#i18n.locale: "en"

# 开启监控
xpack.monitoring.enabled: true

# 关闭 kibana 监控,默认为 true
xpack.monitoring.kibana.collection.enabled: false

2、对 Kibana 配置文件的说明

  • ES Basic 认证信息配置(在启动时对 Kibana 索引进行维护)完成后,登陆 kibana 时,依旧需要输入认证信息;
  • 由于 kibana 的 monitoring 无法设置新建的索引的索引名(无法配置 index.name.time_format),这样 kibana 每天会新建一个索引,由于 kibana 只是作为管理查看工具,因此关闭了 kibana 监控;
  • elasticsearch.url 该配置项无法设置多个 es 地址;如果你想实现类似负载均衡的功能,最简单的方法就是在 Kibana 机器上运行一个协调(Coordinating)节点。

3、监控界面

全部配置完成后,启动 kibana,打开 monitoring 即可开始监控 node、index 等。

使用 Kibana 监控节点状态

五、设置索引模板

具体请参考之前发布的文章基于 IK 分词器的 ES 通用索引模板


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

收起阅读 »

海量科技股份有限公司ES中文插件

海量分词是天津海量信息技术股份有限公司自主研发的中文分词核心,已于2018年7月将分词5.0版免费开放,欢迎试用。
 
海量分词演示界面 http://bigdata.hylanda.com/smartCenter2018/index

另外,海量提供免费API接口,文档详见http://bigdata.hylanda.com/smartCenter2018/doc,欢迎大家试用,如有疑问,请联系nlp@hylanda.com

Analyzer: hlseg_search , hlseg_large , hlseg_normal, Tokenizer: hlseg_search , hlseg_large , hlseg_normal
 
github地址:https://github.com/HylandaOpen ... ME.md
继续阅读 »
海量分词是天津海量信息技术股份有限公司自主研发的中文分词核心,已于2018年7月将分词5.0版免费开放,欢迎试用。
 
海量分词演示界面 http://bigdata.hylanda.com/smartCenter2018/index

另外,海量提供免费API接口,文档详见http://bigdata.hylanda.com/smartCenter2018/doc,欢迎大家试用,如有疑问,请联系nlp@hylanda.com

Analyzer: hlseg_search , hlseg_large , hlseg_normal, Tokenizer: hlseg_search , hlseg_large , hlseg_normal
 
github地址:https://github.com/HylandaOpen ... ME.md 收起阅读 »

ET007 ElasticStack 6.5 介绍

就在 11月14日,ElasticStack 6.5.0 发布了,此次发布带来了许多激动人心的特性,我们一起来体验一下:

WX20181118-120551@2x

如果没有任何数据,kibana会提示我们导入sample数据,这边我选择Try our sample data, 然后导入全部3个样例数据,这可以让我们在没有数据的情况下快速体验新特性。

Infrastructure & Logs UI

很多用户使用 ElasticStack 收集基础架构的日志和指标,比如系统日志、安全日志、CPU指标,内存指标等等。在6.5中,kibana 侧边栏中增加了 Infrastructure 和 Logs 两个新的 tab,让用户更简单地查看自己的基础架构,和每台主机或者容器里的日志。

logs

进入logs标签页,如果当前没有数据,kibana会引导我们添加数据

WX20181118-121032@2x

我们选择 system logs

WX20181118-121047@2x

根据指示,我们安装部署好filebeat并启动,再次进入 logs 标签页便可以看到收集到的系统日志了

image-20181118185158451

  1. 搜索过滤框:在这里可以像在 discover 里一样写query string,并且会有输入提示
  2. 时间选择框:可以选择需要查看的时间点,如果点了 Stream live,会持续监听尾部新输出的日志内容,类似 linux 命令中的tail -f
  3. 日志时间轴:高亮的部位是当前查看日志所在的时间范围,对应的区域图标识了日志量

假如我想实现 tail -f /var/log/system.log | grep google.com 一样的效果,可以打开 Stream live,并在搜索过滤框中这样输入:

WX20181118-173432@2x

很简单,很方便有木有?

Infrastructure

同样在kibana的引导下安装 Metric beat,并开启system模块,启动后进入 infrastructure 标签页:

image-20181118190614385

这里可以直观地看到所有基础架构的指标状况,深色的内层代表主机,颜色代表了健康状况。浅灰色的外层代表了group,因为我只在自己的笔记本上做了部署,所以只能看到一个host。

image-20181118191527060

点击主机会弹出菜单

  • View logs : 跳转到 logs 标签页,并通过搜索过滤框指定host,只查看这台主机的日志。
  • View metrics : 跳转到这台主机的指标详情,可以查看历史数据 shoot

APM

Java 和 Go

不负众望,继 Nodejs、Python、Ruby、Javascript 之后,Elastic APM 5.6.0 新增了对 Java 和 Golang 的支持!

Distributed Tracing

在 SOA 和 MSA 大行其道的年代,如何追踪请求在各个系统之间的流动成为了apm的关键问题。

Elastic APM 支持 OpenTracing 标准,并在各个agent里内置了 OpenTracing 兼容的bridge

以下是官网上该特性的截图:

distributed_tracing

APM Server 监控

如 ElasticStack的其他产品一般,APM也支持了监控,并可以在 Kinbana Montoring下查看监控信息:

apm_monitoring

APM Server 内存占用优化

通过新的基于NDJSON的协议,agent可以在采集信息后通过事件流立即发往APM server,这样 APM Server可以一个接一个地处理接收到的事件,而不是一次性地收到一大块(chunk),这样在很大程度上减少了APM Server的内存占用。

Elasticsearch

Cross-cluster replication

这里的副本并非我们平时常见的分片副本,而是通过在集群B配置一个副本indexB来追随集群A中的indexA,indexA中发生的任何变化都会同步到indexB中来。另外也可以配置一个pattern,当集群A出现符合pattern的索引,自动在集群B创建他的副本,这听起来很酷。值得一提的是,这将是白金版里新增的一个特性。

Minimal Snapshots

snapshot 是 es 中用来创建索引副本的特性,在之前的版本中,snapshot会把完整的 index 都保存下来,包括原始数据和索引数据等等。新的 Minimal Snapshots 提供了一种只备份 _source 内容和 index metadata,当需要恢复时,需要通过 reindex 操作来完成。最小快照最多可能帮你节省50%的磁盘占用,但是会花费更多的时间来恢复。这个特性可能并不适合所有人,但给恢复窗口比较长,且磁盘容量有限的用户多了一种选择。

SQL / ODBC

现在可以使用 支持 ODBC 的第三方工具来连接 elasticsearch 了!我想可以找时间试试用 tableau 直连 elasticsearch会是啥效果。

Java 11

Java11 是一个 LTS 版本,相信会有越来越多的用户升级到 java11

G1GC支持

经过无数的测试,Elasticsearch官方宣布了在 JDK 10+ 上支持 G1GC。G1GC 相比 CMS有诸多优势,如今可以放心地使用G1GC了。(期待对ZGC的支持!)

Authorization realm

X-Pack Security中的新特性,可以对用户认证和用户授权分别配置 realm,比如使用内置的用户体系来认证,再去ldap中获取用户的角色、权限等信息。这也是白金版新增的特性。

机器学习的新特性

  • 支持在同一个机器学习任务中分析多个时间系列
  • 为机器学习任务添加了新的多分桶(multi-bucket) 分析

Kibana

Canvas

Canvas ! 我在做数据分析师的同学看到之后说太酷了,像 PPT。

点击侧边栏的 canvas 标签,可以看到我们先前导入的样本数据也包含了 canvas 样例:

WX20181118-210126@2x

在 11月的 深圳开发者大会上,上海普翔 也用 canvas 对填写调查问卷的参会人员做了分析:

UNADJUSTEDNONRAW_thumb_1adc

https://github.com/alexfrancoeur/kibana_canvas_examples 这里有很多非常不错的 canvas 样例供大家学习,把json文件直接拖到 canvas 页面就可以导入学习了!

Spaces

把 kibana 对象(比如 visualizations、dashboards)组织到独立的 space 里,并且通过 RBAC 来控制哪些用户可以访问哪些 space。这实在是太棒了,想象在一个企业里,多个部门通过kibana查询、分析数据,大家关注的dashboard肯定是不一样的,在6.5之前,我们只能通过社区插件来实现这样的需求,而大版本的升级可能直接导致插件不可用,有了 Space,我们不必再担心!

image-20181118212404768

Rollups UI

Rollup 是 es6.4 中新增的一个特性,用来把一些历史数据压缩归档,用作以后的分析。6.5.0 中 kibana 增加了一个界面用来查看和管理 Rollup 任务。

image9

Data visualizer for files

通过可视化的方式查看文件的结构,查看其中出现最频繁的内容:

highlights_6_5_viz-logs

Beats

Beats Central Management

Beats 终于也支持中心化配置管理了!我们只需按照往常一样安装filebeat、metricbeat,然后使用 filebeat enroll <kibana-url> <token>,便可以通过kibana来管理beats的配置、甚至给他们打上tag:

Image from iOS

想一想,假如我们在上千台机器上部署filebeat,如果哪天需要批量变更配置文件,只需要通过脚本调用配置管理的API就可以了

Functionbeat

Functionbeat是一种新的beat类型,可以被部署为一个方法,而不需要跑在服务器环境上,比如 AWS Lambda function。

以上就是 6.5.0 版本的主要特性,更详细的内容可以查看 https://www.elastic.co/blog/elastic-stack-6-5-0-released ,希望通过我的介绍,可以让大家了解到新版本所带来的激动人心的特性。

Image from iOS

继续阅读 »

就在 11月14日,ElasticStack 6.5.0 发布了,此次发布带来了许多激动人心的特性,我们一起来体验一下:

WX20181118-120551@2x

如果没有任何数据,kibana会提示我们导入sample数据,这边我选择Try our sample data, 然后导入全部3个样例数据,这可以让我们在没有数据的情况下快速体验新特性。

Infrastructure & Logs UI

很多用户使用 ElasticStack 收集基础架构的日志和指标,比如系统日志、安全日志、CPU指标,内存指标等等。在6.5中,kibana 侧边栏中增加了 Infrastructure 和 Logs 两个新的 tab,让用户更简单地查看自己的基础架构,和每台主机或者容器里的日志。

logs

进入logs标签页,如果当前没有数据,kibana会引导我们添加数据

WX20181118-121032@2x

我们选择 system logs

WX20181118-121047@2x

根据指示,我们安装部署好filebeat并启动,再次进入 logs 标签页便可以看到收集到的系统日志了

image-20181118185158451

  1. 搜索过滤框:在这里可以像在 discover 里一样写query string,并且会有输入提示
  2. 时间选择框:可以选择需要查看的时间点,如果点了 Stream live,会持续监听尾部新输出的日志内容,类似 linux 命令中的tail -f
  3. 日志时间轴:高亮的部位是当前查看日志所在的时间范围,对应的区域图标识了日志量

假如我想实现 tail -f /var/log/system.log | grep google.com 一样的效果,可以打开 Stream live,并在搜索过滤框中这样输入:

WX20181118-173432@2x

很简单,很方便有木有?

Infrastructure

同样在kibana的引导下安装 Metric beat,并开启system模块,启动后进入 infrastructure 标签页:

image-20181118190614385

这里可以直观地看到所有基础架构的指标状况,深色的内层代表主机,颜色代表了健康状况。浅灰色的外层代表了group,因为我只在自己的笔记本上做了部署,所以只能看到一个host。

image-20181118191527060

点击主机会弹出菜单

  • View logs : 跳转到 logs 标签页,并通过搜索过滤框指定host,只查看这台主机的日志。
  • View metrics : 跳转到这台主机的指标详情,可以查看历史数据 shoot

APM

Java 和 Go

不负众望,继 Nodejs、Python、Ruby、Javascript 之后,Elastic APM 5.6.0 新增了对 Java 和 Golang 的支持!

Distributed Tracing

在 SOA 和 MSA 大行其道的年代,如何追踪请求在各个系统之间的流动成为了apm的关键问题。

Elastic APM 支持 OpenTracing 标准,并在各个agent里内置了 OpenTracing 兼容的bridge

以下是官网上该特性的截图:

distributed_tracing

APM Server 监控

如 ElasticStack的其他产品一般,APM也支持了监控,并可以在 Kinbana Montoring下查看监控信息:

apm_monitoring

APM Server 内存占用优化

通过新的基于NDJSON的协议,agent可以在采集信息后通过事件流立即发往APM server,这样 APM Server可以一个接一个地处理接收到的事件,而不是一次性地收到一大块(chunk),这样在很大程度上减少了APM Server的内存占用。

Elasticsearch

Cross-cluster replication

这里的副本并非我们平时常见的分片副本,而是通过在集群B配置一个副本indexB来追随集群A中的indexA,indexA中发生的任何变化都会同步到indexB中来。另外也可以配置一个pattern,当集群A出现符合pattern的索引,自动在集群B创建他的副本,这听起来很酷。值得一提的是,这将是白金版里新增的一个特性。

Minimal Snapshots

snapshot 是 es 中用来创建索引副本的特性,在之前的版本中,snapshot会把完整的 index 都保存下来,包括原始数据和索引数据等等。新的 Minimal Snapshots 提供了一种只备份 _source 内容和 index metadata,当需要恢复时,需要通过 reindex 操作来完成。最小快照最多可能帮你节省50%的磁盘占用,但是会花费更多的时间来恢复。这个特性可能并不适合所有人,但给恢复窗口比较长,且磁盘容量有限的用户多了一种选择。

SQL / ODBC

现在可以使用 支持 ODBC 的第三方工具来连接 elasticsearch 了!我想可以找时间试试用 tableau 直连 elasticsearch会是啥效果。

Java 11

Java11 是一个 LTS 版本,相信会有越来越多的用户升级到 java11

G1GC支持

经过无数的测试,Elasticsearch官方宣布了在 JDK 10+ 上支持 G1GC。G1GC 相比 CMS有诸多优势,如今可以放心地使用G1GC了。(期待对ZGC的支持!)

Authorization realm

X-Pack Security中的新特性,可以对用户认证和用户授权分别配置 realm,比如使用内置的用户体系来认证,再去ldap中获取用户的角色、权限等信息。这也是白金版新增的特性。

机器学习的新特性

  • 支持在同一个机器学习任务中分析多个时间系列
  • 为机器学习任务添加了新的多分桶(multi-bucket) 分析

Kibana

Canvas

Canvas ! 我在做数据分析师的同学看到之后说太酷了,像 PPT。

点击侧边栏的 canvas 标签,可以看到我们先前导入的样本数据也包含了 canvas 样例:

WX20181118-210126@2x

在 11月的 深圳开发者大会上,上海普翔 也用 canvas 对填写调查问卷的参会人员做了分析:

UNADJUSTEDNONRAW_thumb_1adc

https://github.com/alexfrancoeur/kibana_canvas_examples 这里有很多非常不错的 canvas 样例供大家学习,把json文件直接拖到 canvas 页面就可以导入学习了!

Spaces

把 kibana 对象(比如 visualizations、dashboards)组织到独立的 space 里,并且通过 RBAC 来控制哪些用户可以访问哪些 space。这实在是太棒了,想象在一个企业里,多个部门通过kibana查询、分析数据,大家关注的dashboard肯定是不一样的,在6.5之前,我们只能通过社区插件来实现这样的需求,而大版本的升级可能直接导致插件不可用,有了 Space,我们不必再担心!

image-20181118212404768

Rollups UI

Rollup 是 es6.4 中新增的一个特性,用来把一些历史数据压缩归档,用作以后的分析。6.5.0 中 kibana 增加了一个界面用来查看和管理 Rollup 任务。

image9

Data visualizer for files

通过可视化的方式查看文件的结构,查看其中出现最频繁的内容:

highlights_6_5_viz-logs

Beats

Beats Central Management

Beats 终于也支持中心化配置管理了!我们只需按照往常一样安装filebeat、metricbeat,然后使用 filebeat enroll <kibana-url> <token>,便可以通过kibana来管理beats的配置、甚至给他们打上tag:

Image from iOS

想一想,假如我们在上千台机器上部署filebeat,如果哪天需要批量变更配置文件,只需要通过脚本调用配置管理的API就可以了

Functionbeat

Functionbeat是一种新的beat类型,可以被部署为一个方法,而不需要跑在服务器环境上,比如 AWS Lambda function。

以上就是 6.5.0 版本的主要特性,更详细的内容可以查看 https://www.elastic.co/blog/elastic-stack-6-5-0-released ,希望通过我的介绍,可以让大家了解到新版本所带来的激动人心的特性。

Image from iOS

收起阅读 »

ELK 使用小技巧(第 2 期)

ELK Tips 主要介绍一些 ELK 使用过程中的小技巧,内容主要来源为 Elastic 中文社区。

一、Logstash

1、Filebeat :Non-zero metrics in the last 30s

  • 问题表现:Filebeat 无法向 Elasticsearch 发送日志数据;
  • 错误信息:INFO [monitoring] 1og/log.go:124 Non-zero metrics in the last 30s
  • 社区反馈:在 input 和 output 下面添加属性 enabled:true。
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log

output.elasticsearch:
  hosts: ["https://localhost:9200"]
  username: "filebeat_internal"
  password: "YOUR_PASSWORD"
  enabled: true

input 和 output 下 enabled 属性默认值为 true,因此怀疑另有其因。

2、Logstash 按月生成索引

output {
    if [type] == "typeA"{
        elasticsearch {
            hosts  => "127.0.0.1:9200"
            index => "log_%{+YYYY_MM}"
        }
    }
}

按照日的原理类似:%{+YYYY.MM.dd}

3、Filebeat 通过配置删除特定字段

Filebeat 实现了类似 Logstash 中 filter 的功能,叫做处理器(processors),processors 种类不多,尽可能在保持 Filebeat 轻量化的基础上提供更多常用的功能。

下面列几种常用的 processors:

  • add_cloud_metadata:添加云服务器的 meta 信息;
  • add_locale:添加本地时区;
  • decode_json_fields:解析并处理包含 Json 字符串的字段;
  • drop_event:丢弃符合条件的消息事件;
  • drop_fields:删除符合条件的字段;
  • include_fields:选择符合条件的字段;
  • rename:字段重命名;
  • add_kubernetes_metadata:添加 k8s 的 meta 信息;
  • add_docker_metadata:添加容器的 meta 信息;
  • add_host_metadata:添加操作系统的 meta 信息;
  • dissect:类似与 gork 的正则匹配字段的功能;
  • dns:配置 filebeat 独立的 dns 解析方式;
  • add_process_metadata:添加进程的元信息。

processors 的使用方式:

- type: <input_type>
  processors:
  - <processor_name>:
      when:
        <condition>
      <parameters>
...

4、LogStash 采集 FTP 日志文件

exec {
    codec => plain { }
    command => "curl ftp://server/logs.log"
    interval => 3000}
}

5、Logstash docker-compose 启动失败(Permission denied)

在 docker-compose 中使用 user 选项设置使用 root 用户启动 docker,能解决权限问题。

$ cat docker-compose.yml

version: '2'
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:6.4.2
    user: root
    command: id

6、Metricize filter plugin

将一条消息拆分为多条消息。

# 原始信息
{
    type => "type A"
    metric1 => "value1"
    metric2 => "value2"
}

# 配置信息
filter {
  metricize {
    metrics => [ "metric1", "metric2" ]
  }
}

# 最终输出
{                               {
    type => "type A"                type => "type A"
    metric => "metric1"             metric => "metric2"
    value => "value1"               value => "value2"
}                               }

二、Elasticsearch

1、ES 倒排索引内部结构

Lucene 的倒排索引都是按照字段(field)来存储对应的文档信息的,如果 docName 和 docContent 中有“苹果”这个 term,就会有这两个索引链,如下所示:

docName:
"苹果" -> "doc1, doc2, doc3..."

docContent:
"苹果" -> "doc2, doc4, doc6..."

2、Jest 和 RestHighLevelClient 哪个好用点

RestHighLevelClient 是官方组件,会一直得到官方的支持,且会与 ES 保持同步更新,推荐使用官方的高阶 API。

Jest 由于是社区维护,所以更新会有一定延迟,目前最新版对接 ES6.3.1,近一个月只有四个 issue,说明整体活跃度较低,因此不推荐使用。

此外推荐一份 TransportClient 的中文使用手册,翻译的很不错:https://github.com/jackiehff/elasticsearch-client-java-api-cn

3、ES 单分片使用 From/Size 分页遇到重复数据

常规情况下 ES 单分片使用 From/Size 是不会遇到数据重复的,数据重复的可能原因有:

  • 没有添加排序;
  • 添加了按得分排序,但是查询语句全部为 filter 过滤条件(此时得分都一致);
  • 添加了排序,但是有索引中文档的新增、修改、删除等操作。

对于多分片,推荐添加 preference 参数来实现分页结果的一致性。

4、The number of object passed must be even but was [1]

ES 在调用 setSource 的时候传入 Json 对象后会报错:The number of object passed must be even but was [1],此时可以推荐将 Json 对象转为 Map 集合,或者把 Json 对象转为 json 字符串,不过传入字符串的时候需要设置类型。

IndexRequest indexRequest = new IndexRequest("index", "type", "id");
JSONObject doc = new JSONObject();
//indexRequest.source(jsonObject); 错误的使用方法
//转为 Map 对象
indexRequest.source(JSONObject.parseObject((String) doc.get("json"), Map.class));
//转为 Json 字符串(声明字符串类型)
indexRequest.source(JSON.toJSONString(doc), XContentType.JSON);

5、跨集群搜索

ES 6.X 原生支持跨集群搜索,具体配置请参考:https://www.elastic.co/guide/en/kibana/current/management-cross-cluster-search.html

PUT _cluster/settings
{
  "persistent": {
    "cluster": {
      "remote": {
        "cluster_one": {
          "seeds": [
            "127.0.0.1:9300"
          ]
        },
        "cluster_two": {
          "seeds": [
            "127.0.0.1:9301"
          ]
        },
        "cluster_three": {
          "seeds": [
            "127.0.0.1:9302"
          ]
        }
      }
    }
  }
}

ES 6.5 推出了新功能,跨集群同步(Cross-cluster replication),感兴趣的可以自行了解。

6、ES 排序时设置空值排序位置

GET /_search
{
    "sort" : [
        { "price" : {"missing" : "_last"} }
    ],
    "query" : {
        "term" : { "product" : "chocolate" }
    }
}

7、ES 冷归档数据如何处理

使用相对低配的大磁盘机器配置为 ES 的 Warm Nodes,可以通过 index.routing.allocation.require.box_type 来设置索引是冷数据或者热数据。如果索引极少使用,可以 close 索引,然后在需要搜索的时候 open 即可。

8、ES 相似文章检测

对于大文本的去重,可以参考 SimHash 算法,通过 SimHash 可以提取到文档指纹(64位),两篇文章通过 SimHash 计算海明距离即可判断是否重复。海明距离计算,可以通过插件实现:https://github.com/joway/elasticsearch-hamming-plugin

9、Terms 聚合查询优化

  • 如果只需要聚合后前 N 条记录,推荐在 Terms 聚合时添加上 "collect_mode": "breadth_first"
  • 此外可以通过设置 "min_doc_count": 10来限制最小匹配文档数;
  • 如果对返回的 Term 有所要求,可以通过设置 includeexclude 来过滤 Term;
  • 如果想获取全部 Term 聚合结果,但是聚合结果又很多,可以考虑将聚合分成多个批次分别取回(Filtering Values with partitions)。

10、Tomcat 字符集造成的 ES 查询无结果

两个系统连接同一个 ES 服务,配置和代码完全一致,同一个搜索条件,一个能够搜索出来东西,一个什么都搜索不出来,排查结果是因为其中一个系统的 tomcat 配置有问题,导致请求的时候乱码了,所以搜不到数据。

11、ES 索引设置默认分词器

默认情况下,如果字段不指定分词器,ES 或使用 standard 分词器进行分词;可以通过下面的设置更改默认的分词器。

2.X 支持设置默认的索引分词器(default_index)和默认的查询分词器(default_search),6.X 已经不再支持。

PUT /index
{
  "settings": {
    "analysis": {
      "analyzer": {
        "default": {
          "type": "ik_max_word",
          "tokenizer": "ik_max_word"
        }
      }
    }
  }
}

12、ES 中的魔法参数

  • 索引名:_index
  • 类型名:_type
  • 文档Id:_id
  • 得分:_score
  • 索引排序:_doc

如果你对排序没有特别的需求,推荐使用 _doc 进行排序,例如执行 Scroll 操作时。

13、ES 延迟执行数据上卷(Rollup )

Rollup job 有个 delay 参数控制 job 执行的延迟时间,默认情况下不延迟执行,这样如果某个 interval 的数据已经聚合好了,该 interval 迟到的数据是不会处理的。

好在 rollup api 可以支持同时搜索裸索引和 rollup 过的索引,所以如果数据经常有延迟的话,可以考虑设置一个合适的 delay,比如 1h、6h 甚至 24h,这样 rollup 的索引产生会有延迟,但是能确保迟到的数据被处理。

从应用场景上看,rollup 一般是为了对历史数据做聚合存放,减少存储空间,所以延迟几个小时,甚至几天都是合理的。搜索的时候,同时搜索最近的裸索引和历史的 rollup 索引,就能将两者的数据组合起来,在给出正确的聚合结果的情况下,又兼顾了性能。

Rollup 是实验性功能,不过非常有用,特别是使用 ES 做数据仓库的场景。

14、ES6.x 获取所有的聚合结果

ES2.x 版本中,在聚合查询时,通过设置 setSize(0) 就可以获取所有的聚合结果,在ES6.x 中直接设置 setSize(Integer.MAX_VALUE) 等效于 2.x 中设置为 0。

15、ES Jar 包冲突问题

经常会遇到 ES 与业务集成时出现 Jar 包冲突问题,推荐的解决方法是使用 maven-shade-plugin 插件,该插件通过将冲突的 Jar 包更换一个命名空间的方式来解决 Jar 包的冲突问题,具体使用可以参考文章:https://www.jianshu.com/p/d9fb7afa634d

<plugins>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.1</version>
        <configuration>
            <createDependencyReducedPom>false</createDependencyReducedPom>
        </configuration>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                    <relocations>
                        <relocation>
                            <pattern>com.google.guava</pattern>
                            <shadedPattern>net.luculent.elasticsearch.guava</shadedPattern>
                        </relocation>
                        <relocation>
                            <pattern>com.fasterxml.jackson</pattern>
                            <shadedPattern>net.luculent.elasticsearch.jackson</shadedPattern>
                        </relocation>
                        <relocation>
                            <pattern>org.joda</pattern>
                            <shadedPattern>net.luculent.elasticsearch.joda</shadedPattern>
                        </relocation>
                        <relocation>
                            <pattern>com.google.common</pattern>
                            <shadedPattern>net.luculent.elasticsearch.common</shadedPattern>
                        </relocation>
                        <relocation>
                            <pattern>com.google.thirdparty</pattern>
                            <shadedPattern>net.luculent.elasticsearch.thirdparty</shadedPattern>
                        </relocation>
                    </relocations>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" />
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    </transformers>
                </configuration>
            </execution>
        </executions>
    </plugin>
</plugins>

16、ES 如何选择 Shard 存储文档?

ES 采用 djb2 哈希算法对要索引文档的指定(或者默认随机生成的)_id 进行哈希,得到哈希结果后对索引 shard 数目 n 取模,公式如下:hash(_id) % n;根据取模结果决定存储到哪一个 shard 。

三、Kibana

1、在 Kiabana 的 Discovery 界面显示自定义字段

Kibana 的 Discovery 界面默认只显示 time 和 _source 两个字段,这个界面的左半部分,在 Popular 下面展示了很多,你只需要在你需要展示的字段后面点击 add 即可将自定义的字段添加到 discovery 界面。

在 Kiabana 的 Discovery 界面显示自定义字段

2、filebeat 的 monitor 指标的说明

  • Total:'All events newly created in the publishing pipeline'
  • Emitted: 'Events processed by the output (including retries)'
  • Acknowledged:'Events acknowledged by the output (includes events dropped by the output)'
  • Queued:'Events added to the event pipeline queue'

四、社区文章精选


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

继续阅读 »

ELK Tips 主要介绍一些 ELK 使用过程中的小技巧,内容主要来源为 Elastic 中文社区。

一、Logstash

1、Filebeat :Non-zero metrics in the last 30s

  • 问题表现:Filebeat 无法向 Elasticsearch 发送日志数据;
  • 错误信息:INFO [monitoring] 1og/log.go:124 Non-zero metrics in the last 30s
  • 社区反馈:在 input 和 output 下面添加属性 enabled:true。
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/*.log

output.elasticsearch:
  hosts: ["https://localhost:9200"]
  username: "filebeat_internal"
  password: "YOUR_PASSWORD"
  enabled: true

input 和 output 下 enabled 属性默认值为 true,因此怀疑另有其因。

2、Logstash 按月生成索引

output {
    if [type] == "typeA"{
        elasticsearch {
            hosts  => "127.0.0.1:9200"
            index => "log_%{+YYYY_MM}"
        }
    }
}

按照日的原理类似:%{+YYYY.MM.dd}

3、Filebeat 通过配置删除特定字段

Filebeat 实现了类似 Logstash 中 filter 的功能,叫做处理器(processors),processors 种类不多,尽可能在保持 Filebeat 轻量化的基础上提供更多常用的功能。

下面列几种常用的 processors:

  • add_cloud_metadata:添加云服务器的 meta 信息;
  • add_locale:添加本地时区;
  • decode_json_fields:解析并处理包含 Json 字符串的字段;
  • drop_event:丢弃符合条件的消息事件;
  • drop_fields:删除符合条件的字段;
  • include_fields:选择符合条件的字段;
  • rename:字段重命名;
  • add_kubernetes_metadata:添加 k8s 的 meta 信息;
  • add_docker_metadata:添加容器的 meta 信息;
  • add_host_metadata:添加操作系统的 meta 信息;
  • dissect:类似与 gork 的正则匹配字段的功能;
  • dns:配置 filebeat 独立的 dns 解析方式;
  • add_process_metadata:添加进程的元信息。

processors 的使用方式:

- type: <input_type>
  processors:
  - <processor_name>:
      when:
        <condition>
      <parameters>
...

4、LogStash 采集 FTP 日志文件

exec {
    codec => plain { }
    command => "curl ftp://server/logs.log"
    interval => 3000}
}

5、Logstash docker-compose 启动失败(Permission denied)

在 docker-compose 中使用 user 选项设置使用 root 用户启动 docker,能解决权限问题。

$ cat docker-compose.yml

version: '2'
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:6.4.2
    user: root
    command: id

6、Metricize filter plugin

将一条消息拆分为多条消息。

# 原始信息
{
    type => "type A"
    metric1 => "value1"
    metric2 => "value2"
}

# 配置信息
filter {
  metricize {
    metrics => [ "metric1", "metric2" ]
  }
}

# 最终输出
{                               {
    type => "type A"                type => "type A"
    metric => "metric1"             metric => "metric2"
    value => "value1"               value => "value2"
}                               }

二、Elasticsearch

1、ES 倒排索引内部结构

Lucene 的倒排索引都是按照字段(field)来存储对应的文档信息的,如果 docName 和 docContent 中有“苹果”这个 term,就会有这两个索引链,如下所示:

docName:
"苹果" -> "doc1, doc2, doc3..."

docContent:
"苹果" -> "doc2, doc4, doc6..."

2、Jest 和 RestHighLevelClient 哪个好用点

RestHighLevelClient 是官方组件,会一直得到官方的支持,且会与 ES 保持同步更新,推荐使用官方的高阶 API。

Jest 由于是社区维护,所以更新会有一定延迟,目前最新版对接 ES6.3.1,近一个月只有四个 issue,说明整体活跃度较低,因此不推荐使用。

此外推荐一份 TransportClient 的中文使用手册,翻译的很不错:https://github.com/jackiehff/elasticsearch-client-java-api-cn

3、ES 单分片使用 From/Size 分页遇到重复数据

常规情况下 ES 单分片使用 From/Size 是不会遇到数据重复的,数据重复的可能原因有:

  • 没有添加排序;
  • 添加了按得分排序,但是查询语句全部为 filter 过滤条件(此时得分都一致);
  • 添加了排序,但是有索引中文档的新增、修改、删除等操作。

对于多分片,推荐添加 preference 参数来实现分页结果的一致性。

4、The number of object passed must be even but was [1]

ES 在调用 setSource 的时候传入 Json 对象后会报错:The number of object passed must be even but was [1],此时可以推荐将 Json 对象转为 Map 集合,或者把 Json 对象转为 json 字符串,不过传入字符串的时候需要设置类型。

IndexRequest indexRequest = new IndexRequest("index", "type", "id");
JSONObject doc = new JSONObject();
//indexRequest.source(jsonObject); 错误的使用方法
//转为 Map 对象
indexRequest.source(JSONObject.parseObject((String) doc.get("json"), Map.class));
//转为 Json 字符串(声明字符串类型)
indexRequest.source(JSON.toJSONString(doc), XContentType.JSON);

5、跨集群搜索

ES 6.X 原生支持跨集群搜索,具体配置请参考:https://www.elastic.co/guide/en/kibana/current/management-cross-cluster-search.html

PUT _cluster/settings
{
  "persistent": {
    "cluster": {
      "remote": {
        "cluster_one": {
          "seeds": [
            "127.0.0.1:9300"
          ]
        },
        "cluster_two": {
          "seeds": [
            "127.0.0.1:9301"
          ]
        },
        "cluster_three": {
          "seeds": [
            "127.0.0.1:9302"
          ]
        }
      }
    }
  }
}

ES 6.5 推出了新功能,跨集群同步(Cross-cluster replication),感兴趣的可以自行了解。

6、ES 排序时设置空值排序位置

GET /_search
{
    "sort" : [
        { "price" : {"missing" : "_last"} }
    ],
    "query" : {
        "term" : { "product" : "chocolate" }
    }
}

7、ES 冷归档数据如何处理

使用相对低配的大磁盘机器配置为 ES 的 Warm Nodes,可以通过 index.routing.allocation.require.box_type 来设置索引是冷数据或者热数据。如果索引极少使用,可以 close 索引,然后在需要搜索的时候 open 即可。

8、ES 相似文章检测

对于大文本的去重,可以参考 SimHash 算法,通过 SimHash 可以提取到文档指纹(64位),两篇文章通过 SimHash 计算海明距离即可判断是否重复。海明距离计算,可以通过插件实现:https://github.com/joway/elasticsearch-hamming-plugin

9、Terms 聚合查询优化

  • 如果只需要聚合后前 N 条记录,推荐在 Terms 聚合时添加上 "collect_mode": "breadth_first"
  • 此外可以通过设置 "min_doc_count": 10来限制最小匹配文档数;
  • 如果对返回的 Term 有所要求,可以通过设置 includeexclude 来过滤 Term;
  • 如果想获取全部 Term 聚合结果,但是聚合结果又很多,可以考虑将聚合分成多个批次分别取回(Filtering Values with partitions)。

10、Tomcat 字符集造成的 ES 查询无结果

两个系统连接同一个 ES 服务,配置和代码完全一致,同一个搜索条件,一个能够搜索出来东西,一个什么都搜索不出来,排查结果是因为其中一个系统的 tomcat 配置有问题,导致请求的时候乱码了,所以搜不到数据。

11、ES 索引设置默认分词器

默认情况下,如果字段不指定分词器,ES 或使用 standard 分词器进行分词;可以通过下面的设置更改默认的分词器。

2.X 支持设置默认的索引分词器(default_index)和默认的查询分词器(default_search),6.X 已经不再支持。

PUT /index
{
  "settings": {
    "analysis": {
      "analyzer": {
        "default": {
          "type": "ik_max_word",
          "tokenizer": "ik_max_word"
        }
      }
    }
  }
}

12、ES 中的魔法参数

  • 索引名:_index
  • 类型名:_type
  • 文档Id:_id
  • 得分:_score
  • 索引排序:_doc

如果你对排序没有特别的需求,推荐使用 _doc 进行排序,例如执行 Scroll 操作时。

13、ES 延迟执行数据上卷(Rollup )

Rollup job 有个 delay 参数控制 job 执行的延迟时间,默认情况下不延迟执行,这样如果某个 interval 的数据已经聚合好了,该 interval 迟到的数据是不会处理的。

好在 rollup api 可以支持同时搜索裸索引和 rollup 过的索引,所以如果数据经常有延迟的话,可以考虑设置一个合适的 delay,比如 1h、6h 甚至 24h,这样 rollup 的索引产生会有延迟,但是能确保迟到的数据被处理。

从应用场景上看,rollup 一般是为了对历史数据做聚合存放,减少存储空间,所以延迟几个小时,甚至几天都是合理的。搜索的时候,同时搜索最近的裸索引和历史的 rollup 索引,就能将两者的数据组合起来,在给出正确的聚合结果的情况下,又兼顾了性能。

Rollup 是实验性功能,不过非常有用,特别是使用 ES 做数据仓库的场景。

14、ES6.x 获取所有的聚合结果

ES2.x 版本中,在聚合查询时,通过设置 setSize(0) 就可以获取所有的聚合结果,在ES6.x 中直接设置 setSize(Integer.MAX_VALUE) 等效于 2.x 中设置为 0。

15、ES Jar 包冲突问题

经常会遇到 ES 与业务集成时出现 Jar 包冲突问题,推荐的解决方法是使用 maven-shade-plugin 插件,该插件通过将冲突的 Jar 包更换一个命名空间的方式来解决 Jar 包的冲突问题,具体使用可以参考文章:https://www.jianshu.com/p/d9fb7afa634d

<plugins>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.1</version>
        <configuration>
            <createDependencyReducedPom>false</createDependencyReducedPom>
        </configuration>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                    <relocations>
                        <relocation>
                            <pattern>com.google.guava</pattern>
                            <shadedPattern>net.luculent.elasticsearch.guava</shadedPattern>
                        </relocation>
                        <relocation>
                            <pattern>com.fasterxml.jackson</pattern>
                            <shadedPattern>net.luculent.elasticsearch.jackson</shadedPattern>
                        </relocation>
                        <relocation>
                            <pattern>org.joda</pattern>
                            <shadedPattern>net.luculent.elasticsearch.joda</shadedPattern>
                        </relocation>
                        <relocation>
                            <pattern>com.google.common</pattern>
                            <shadedPattern>net.luculent.elasticsearch.common</shadedPattern>
                        </relocation>
                        <relocation>
                            <pattern>com.google.thirdparty</pattern>
                            <shadedPattern>net.luculent.elasticsearch.thirdparty</shadedPattern>
                        </relocation>
                    </relocations>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" />
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    </transformers>
                </configuration>
            </execution>
        </executions>
    </plugin>
</plugins>

16、ES 如何选择 Shard 存储文档?

ES 采用 djb2 哈希算法对要索引文档的指定(或者默认随机生成的)_id 进行哈希,得到哈希结果后对索引 shard 数目 n 取模,公式如下:hash(_id) % n;根据取模结果决定存储到哪一个 shard 。

三、Kibana

1、在 Kiabana 的 Discovery 界面显示自定义字段

Kibana 的 Discovery 界面默认只显示 time 和 _source 两个字段,这个界面的左半部分,在 Popular 下面展示了很多,你只需要在你需要展示的字段后面点击 add 即可将自定义的字段添加到 discovery 界面。

在 Kiabana 的 Discovery 界面显示自定义字段

2、filebeat 的 monitor 指标的说明

  • Total:'All events newly created in the publishing pipeline'
  • Emitted: 'Events processed by the output (including retries)'
  • Acknowledged:'Events acknowledged by the output (includes events dropped by the output)'
  • Queued:'Events added to the event pipeline queue'

四、社区文章精选


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

收起阅读 »

当Elasticsearch遇见Kafka--Kafka Connect

本文同步发布在腾讯云+社区Elasticsearch专栏中:https://cloud.tencent.com/developer/column/4008
在“当Elasticsearch遇见Kafka--Logstash kafka input插件”一文中,我对Logstash的Kafka input插件进行了简单的介绍,并通过实际操作的方式,为大家呈现了使用该方式实现Kafka与Elastisearch整合的基本过程。可以看出使用Logstash input插件的方式,具有配置简单,数据处理方便等优点。然而使用Logstash Kafka插件并不是Kafka与Elsticsearch整合的唯一方案,另一种比较常见的方案是使用Kafka的开源组件Kafka Connect。

Confluent实现Kafka与Elasticsearch的连接

1 Kafka Connect简介

Kafka Connect是Kafka的开源组件Confluent提供的功能,用于实现Kafka与外部系统的连接。Kafka Connect同时支持分布式模式和单机模式,另外提供了一套完整的REST接口,用于查看和管理Kafka Connectors,还具有offset自动管理,可扩展等优点。

Kafka connect分为企业版和开源版,企业版在开源版的基础之上提供了监控,负载均衡,副本等功能,实际生产环境中建议使用企业版。(本测试使用开源版)

Kafka connect workers有两种工作模式,单机模式和分布式模式。在开发和适合使用单机模式的场景下,可以使用standalone模式, 在实际生产环境下由于单个worker的数据压力会比较大,distributed模式对负载均和和扩展性方面会有很大帮助。(本测试使用standalone模式)

关于Kafka Connect的详细情况可以参考[Kafka Connect]

2 使用Kafka Connect连接Kafka和Elasticsearch

2.1 测试环境准备

本文与使用Logstash Kafka input插件环境一样[传送门],组件列表如下

服务 ip port
Elasticsearch service 192.168.0.8 9200
Ckafka 192.168.13.10 9092
CVM 192.168.0.13 -

kafka topic也复用原来了的kafka_es_test

2.2 Kafka Connect 安装

[Kafka Connec下载地址]

本文下载的为开源版本confluent-oss-5.0.1-2.11.tar.gz,下载后解压

2.3 Worker配置

1) 配置参考

如前文所说,worker分为Standalone和Distributed两种模式,针对两种模式的配置,参考如下

[通用配置]

[Standalone Woker配置]

[Distributed Worker配置]

此处需要注意的是Kafka Connect默认使用AvroConverter,使用该AvroConverter时需要注意必须启动Schema Registry服务

2) 实际操作

本测试使用standalone模式,因此修改/root/confluent-5.0.1/etc/schema-registry/connect-avro-standalone.properties

bootstrap.servers=192.168.13.10:9092

2.4 Elasticsearch Connector配置

1) 配置参考

[Connectors通用配置]

[Elasticsearch Configuration Options]

2) 实际操作

修改/root/confluent-5.0.1/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=kafka_es_test
key.ignore=true
connection.url=http://192.168.0.8:9200
type.name=kafka-connect

注意: 其中topics不仅对应Kafka的topic名称,同时也是Elasticsearch的索引名,当然也可以通过topic.index.map来设置从topic名到Elasticsearch索引名的映射

2.5 启动connector

1 注意事项

1) 由于配置文件中jar包位置均采用的相对路径,因此建议在confluent根目录下执行命令和启动程序,以避免不必要的问题

2) 如果前面没有修改converter,仍采用AvroConverter, 注意需要在启动connertor前启动Schema Registry服务

2 启动Schema Registry服务

正如前文所说,由于在配置worker时指定使用了AvroConverter,因此需要启动Schema Registry服务。而该服务需要指定一个zookeeper地址或Kafka地址,以存储schema数据。由于CKafka不支持用户通过接口形式创建topic,因此需要在本机起一个kafka以创建名为_schema的topic。

1) 启动Zookeeper

./bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties

2) 启动kafka

./bin/kafka-server-start -daemon etc/kafka/server.properties

3) 启动schema Registry

./bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties

4) 使用netstat -natpl 查看各服务端口是否正常启动

zookeeper 2181

kafka 9092

schema registry 8081

3 启动Connector

./bin/connect-standalone -daemon  etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

ps:以上启动各服务均可在logs目录下找到对应日志

2.6 启动Kafka Producer

由于我们采用的是AvroConverter,因此不能采用Kafka工具包中的producer。Kafka Connector bin目录下提供了Avro Producer

1) 启动Producer

./bin/kafka-avro-console-producer --broker-list 192.168.13.10:9092 --topic kafka_es_test --property value.schema='{"type":"record","name":"person","fields":[{"name":"nickname","type":"string"}]}'

2) 输入如下数据

{"nickname":"michel"}
{"nickname":"mushao"}

2.7 Kibana验证结果

1) 查看索引

在kibana Dev Tools的Console中输入

GET _cat/indices

结果

green open kafka_es_test 36QtDP6vQOG7ubOa161wGQ 5 1 1 0 7.9kb 3.9kb
green open .kibana       QUw45tN0SHqeHbF9-QVU6A 1 1 1 0 5.5kb 2.7kb

可以看到名为kafka_es_test的索引被成功创建

2) 查看数据

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 1,
    "hits": [
      {
        "_index": "kafka_es_test",
        "_type": "kafka-connect",
        "_id": "kafka_es_test+0+0",
        "_score": 1,
        "_source": {
          "nickname": "michel"
        }
      },
      {
        "_index": "kafka_es_test",
        "_type": "kafka-connect",
        "_id": "kafka_es_test+0+1",
        "_score": 1,
        "_source": {
          "nickname": "mushao"
        }
      }
    ]
  }
}

可以看到数据已经被成功写入

3 Confluent CLI

3.1 简介

查阅资料时发现很多文章都是使用Confluent CLI启动Kafka Connect,然而官方文档已经明确说明了该CLI只是适用于开发阶段,不能用于生产环境。

它可以一键启动包括zookeeper,kafka,schema registry, kafka rest, connect等在内的多个服务。但是这些服务对于Kafka Connect都不是必须的,如果不使用AvroConverter,则只需要启动Connect即可。即使使用了AvroConverter, 也只需要启动schema registry,将schema保存在远端的kafka中。Kafka Connect REST API也只是为用户提供一个管理connector的接口,也不是必选的。

另外使用CLI启动默认配置为启动Distributed的Connector,需要通过环境变量来修改配置

3.2 使用Confluent CLI

confluent CLI提供了丰富的命令,包括服务启动,服务停止,状态查询,日志查看等,详情参考如下简介视频 [Introducing the Confluent CLI | Screencast]

1) 启动

./bin/confluent start

2) 检查confluent运行状态

./bin/confluent status

当得到如下结果则说明confluent启动成功

ksql-server is [UP]
connect is [UP]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]

3) 问题定位

如果第二步出现问题,可以使用log命令查看,如connect未启动成功则

./bin/confluent log connect

4) 加载Elasticsearch Connector

a) 查看connector

./bin/confluent list connectors

结果

Bundled Predefined Connectors (edit configuration under etc/):
elasticsearch-sink
file-source
file-sink
jdbc-source
jdbc-sink
hdfs-sink
s3-sink

b) 加载Elasticsearch connector

./bin/confluent load elasticsearch-sink

结果

{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "kafka_es_test",
        "key.ignore": "true",
        "connection.url": "http://192.168.0.8:9200",
        "type.name": "kafka-connect",
        "name": "elasticsearch-sink"
    },
    "tasks": [],
    "type": null
}

5) 使用producer生产数据,并使用kibana验证是否写入成功

4 Kafka Connect Rest API

Kafka Connect提供了一套完成的管理Connector的接口,详情参考[Kafka Connect REST Interface]。该接口可以实现对Connector的创建,销毁,修改,查询等操作

1) GET connectors 获取运行中的connector列表

2) POST connectors 使用指定的名称和配置创建connector

3) GET connectors/(string:name) 获取connector的详细信息

4) GET connectors/(string:name)/config 获取connector的配置

5) PUT connectors/(string:name)/config 设置connector的配置

6) GET connectors/(string:name)/status 获取connector状态

7) POST connectors/(stirng:name)/restart 重启connector

8) PUT connectors/(string:name)/pause 暂停connector

9) PUT connectors/(string:name)/resume 恢复connector

10) DELETE connectors/(string:name)/ 删除connector

11) GET connectors/(string:name)/tasks 获取connectors任务列表

12) GET /connectors/(string: name)/tasks/(int: taskid)/status 获取任务状态

13) POST /connectors/(string: name)/tasks/(int: taskid)/restart 重启任务

14) GET /connector-plugins/ 获取已安装插件列表

15) PUT /connector-plugins/(string: name)/config/validate 验证配置

5 总结

Kafka Connect是Kafka一个功能强大的组件,为kafka提供了与外部系统连接的一套完整方案,包括数据传输,连接管理,监控,多副本等。相对于Logstash Kafka插件,功能更为全面,但配置也相对为复杂些。有文章提到其性能也优于Logstash Kafka Input插件,如果对写入性能比较敏感的场景,可以在实际压测的基础上进行选择。另外由于直接将数据从Kafka写入Elasticsearch, 如果需要对文档进行处理时,选择Logstash可能更为方便。

继续阅读 »

本文同步发布在腾讯云+社区Elasticsearch专栏中:https://cloud.tencent.com/developer/column/4008
在“当Elasticsearch遇见Kafka--Logstash kafka input插件”一文中,我对Logstash的Kafka input插件进行了简单的介绍,并通过实际操作的方式,为大家呈现了使用该方式实现Kafka与Elastisearch整合的基本过程。可以看出使用Logstash input插件的方式,具有配置简单,数据处理方便等优点。然而使用Logstash Kafka插件并不是Kafka与Elsticsearch整合的唯一方案,另一种比较常见的方案是使用Kafka的开源组件Kafka Connect。

Confluent实现Kafka与Elasticsearch的连接

1 Kafka Connect简介

Kafka Connect是Kafka的开源组件Confluent提供的功能,用于实现Kafka与外部系统的连接。Kafka Connect同时支持分布式模式和单机模式,另外提供了一套完整的REST接口,用于查看和管理Kafka Connectors,还具有offset自动管理,可扩展等优点。

Kafka connect分为企业版和开源版,企业版在开源版的基础之上提供了监控,负载均衡,副本等功能,实际生产环境中建议使用企业版。(本测试使用开源版)

Kafka connect workers有两种工作模式,单机模式和分布式模式。在开发和适合使用单机模式的场景下,可以使用standalone模式, 在实际生产环境下由于单个worker的数据压力会比较大,distributed模式对负载均和和扩展性方面会有很大帮助。(本测试使用standalone模式)

关于Kafka Connect的详细情况可以参考[Kafka Connect]

2 使用Kafka Connect连接Kafka和Elasticsearch

2.1 测试环境准备

本文与使用Logstash Kafka input插件环境一样[传送门],组件列表如下

服务 ip port
Elasticsearch service 192.168.0.8 9200
Ckafka 192.168.13.10 9092
CVM 192.168.0.13 -

kafka topic也复用原来了的kafka_es_test

2.2 Kafka Connect 安装

[Kafka Connec下载地址]

本文下载的为开源版本confluent-oss-5.0.1-2.11.tar.gz,下载后解压

2.3 Worker配置

1) 配置参考

如前文所说,worker分为Standalone和Distributed两种模式,针对两种模式的配置,参考如下

[通用配置]

[Standalone Woker配置]

[Distributed Worker配置]

此处需要注意的是Kafka Connect默认使用AvroConverter,使用该AvroConverter时需要注意必须启动Schema Registry服务

2) 实际操作

本测试使用standalone模式,因此修改/root/confluent-5.0.1/etc/schema-registry/connect-avro-standalone.properties

bootstrap.servers=192.168.13.10:9092

2.4 Elasticsearch Connector配置

1) 配置参考

[Connectors通用配置]

[Elasticsearch Configuration Options]

2) 实际操作

修改/root/confluent-5.0.1/etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=kafka_es_test
key.ignore=true
connection.url=http://192.168.0.8:9200
type.name=kafka-connect

注意: 其中topics不仅对应Kafka的topic名称,同时也是Elasticsearch的索引名,当然也可以通过topic.index.map来设置从topic名到Elasticsearch索引名的映射

2.5 启动connector

1 注意事项

1) 由于配置文件中jar包位置均采用的相对路径,因此建议在confluent根目录下执行命令和启动程序,以避免不必要的问题

2) 如果前面没有修改converter,仍采用AvroConverter, 注意需要在启动connertor前启动Schema Registry服务

2 启动Schema Registry服务

正如前文所说,由于在配置worker时指定使用了AvroConverter,因此需要启动Schema Registry服务。而该服务需要指定一个zookeeper地址或Kafka地址,以存储schema数据。由于CKafka不支持用户通过接口形式创建topic,因此需要在本机起一个kafka以创建名为_schema的topic。

1) 启动Zookeeper

./bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties

2) 启动kafka

./bin/kafka-server-start -daemon etc/kafka/server.properties

3) 启动schema Registry

./bin/schema-registry-start -daemon etc/schema-registry/schema-registry.properties

4) 使用netstat -natpl 查看各服务端口是否正常启动

zookeeper 2181

kafka 9092

schema registry 8081

3 启动Connector

./bin/connect-standalone -daemon  etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

ps:以上启动各服务均可在logs目录下找到对应日志

2.6 启动Kafka Producer

由于我们采用的是AvroConverter,因此不能采用Kafka工具包中的producer。Kafka Connector bin目录下提供了Avro Producer

1) 启动Producer

./bin/kafka-avro-console-producer --broker-list 192.168.13.10:9092 --topic kafka_es_test --property value.schema='{"type":"record","name":"person","fields":[{"name":"nickname","type":"string"}]}'

2) 输入如下数据

{"nickname":"michel"}
{"nickname":"mushao"}

2.7 Kibana验证结果

1) 查看索引

在kibana Dev Tools的Console中输入

GET _cat/indices

结果

green open kafka_es_test 36QtDP6vQOG7ubOa161wGQ 5 1 1 0 7.9kb 3.9kb
green open .kibana       QUw45tN0SHqeHbF9-QVU6A 1 1 1 0 5.5kb 2.7kb

可以看到名为kafka_es_test的索引被成功创建

2) 查看数据

{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 2,
    "max_score": 1,
    "hits": [
      {
        "_index": "kafka_es_test",
        "_type": "kafka-connect",
        "_id": "kafka_es_test+0+0",
        "_score": 1,
        "_source": {
          "nickname": "michel"
        }
      },
      {
        "_index": "kafka_es_test",
        "_type": "kafka-connect",
        "_id": "kafka_es_test+0+1",
        "_score": 1,
        "_source": {
          "nickname": "mushao"
        }
      }
    ]
  }
}

可以看到数据已经被成功写入

3 Confluent CLI

3.1 简介

查阅资料时发现很多文章都是使用Confluent CLI启动Kafka Connect,然而官方文档已经明确说明了该CLI只是适用于开发阶段,不能用于生产环境。

它可以一键启动包括zookeeper,kafka,schema registry, kafka rest, connect等在内的多个服务。但是这些服务对于Kafka Connect都不是必须的,如果不使用AvroConverter,则只需要启动Connect即可。即使使用了AvroConverter, 也只需要启动schema registry,将schema保存在远端的kafka中。Kafka Connect REST API也只是为用户提供一个管理connector的接口,也不是必选的。

另外使用CLI启动默认配置为启动Distributed的Connector,需要通过环境变量来修改配置

3.2 使用Confluent CLI

confluent CLI提供了丰富的命令,包括服务启动,服务停止,状态查询,日志查看等,详情参考如下简介视频 [Introducing the Confluent CLI | Screencast]

1) 启动

./bin/confluent start

2) 检查confluent运行状态

./bin/confluent status

当得到如下结果则说明confluent启动成功

ksql-server is [UP]
connect is [UP]
kafka-rest is [UP]
schema-registry is [UP]
kafka is [UP]
zookeeper is [UP]

3) 问题定位

如果第二步出现问题,可以使用log命令查看,如connect未启动成功则

./bin/confluent log connect

4) 加载Elasticsearch Connector

a) 查看connector

./bin/confluent list connectors

结果

Bundled Predefined Connectors (edit configuration under etc/):
elasticsearch-sink
file-source
file-sink
jdbc-source
jdbc-sink
hdfs-sink
s3-sink

b) 加载Elasticsearch connector

./bin/confluent load elasticsearch-sink

结果

{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "kafka_es_test",
        "key.ignore": "true",
        "connection.url": "http://192.168.0.8:9200",
        "type.name": "kafka-connect",
        "name": "elasticsearch-sink"
    },
    "tasks": [],
    "type": null
}

5) 使用producer生产数据,并使用kibana验证是否写入成功

4 Kafka Connect Rest API

Kafka Connect提供了一套完成的管理Connector的接口,详情参考[Kafka Connect REST Interface]。该接口可以实现对Connector的创建,销毁,修改,查询等操作

1) GET connectors 获取运行中的connector列表

2) POST connectors 使用指定的名称和配置创建connector

3) GET connectors/(string:name) 获取connector的详细信息

4) GET connectors/(string:name)/config 获取connector的配置

5) PUT connectors/(string:name)/config 设置connector的配置

6) GET connectors/(string:name)/status 获取connector状态

7) POST connectors/(stirng:name)/restart 重启connector

8) PUT connectors/(string:name)/pause 暂停connector

9) PUT connectors/(string:name)/resume 恢复connector

10) DELETE connectors/(string:name)/ 删除connector

11) GET connectors/(string:name)/tasks 获取connectors任务列表

12) GET /connectors/(string: name)/tasks/(int: taskid)/status 获取任务状态

13) POST /connectors/(string: name)/tasks/(int: taskid)/restart 重启任务

14) GET /connector-plugins/ 获取已安装插件列表

15) PUT /connector-plugins/(string: name)/config/validate 验证配置

5 总结

Kafka Connect是Kafka一个功能强大的组件,为kafka提供了与外部系统连接的一套完整方案,包括数据传输,连接管理,监控,多副本等。相对于Logstash Kafka插件,功能更为全面,但配置也相对为复杂些。有文章提到其性能也优于Logstash Kafka Input插件,如果对写入性能比较敏感的场景,可以在实际压测的基础上进行选择。另外由于直接将数据从Kafka写入Elasticsearch, 如果需要对文档进行处理时,选择Logstash可能更为方便。

收起阅读 »