ES6.* join类型的可行性

ES6将_parent移除缓存join了
{
"产品id":"",
"产品名字":"",
"产品SKU":[
"skuid":""
"sku库存":"",
"sku销量":"",
"SKU经销权":[
{"经销商编号":"1","购买量":"","发货工厂":"","销售组织"},
{"经销商编号":"2","购买量":"","发货工厂":"","销售组织"}
]
],
"产品总销量":"",
"产品总库存":"",
]
}
想通过ES6.* 建立这种一对多 多对多的数据模型  有用过到生产环境的吗  现在测试已经满足子查父  同时根据子的排序  孙节点还没测过
继续阅读 »
ES6将_parent移除缓存join了
{
"产品id":"",
"产品名字":"",
"产品SKU":[
"skuid":""
"sku库存":"",
"sku销量":"",
"SKU经销权":[
{"经销商编号":"1","购买量":"","发货工厂":"","销售组织"},
{"经销商编号":"2","购买量":"","发货工厂":"","销售组织"}
]
],
"产品总销量":"",
"产品总库存":"",
]
}
想通过ES6.* 建立这种一对多 多对多的数据模型  有用过到生产环境的吗  现在测试已经满足子查父  同时根据子的排序  孙节点还没测过 收起阅读 »

百度智能云招聘ElasticSearch研发工程师

工作职责:
  • 负责ElasticSearch相关产品的设计、研发、维护
  • 与相关大数据产品制定大数据解决方案
  •  

职责要求:
  • 211或985大学本科毕业,计算机相关专业
  • 熟悉Java/Go语言开发,熟悉常见的数据结构与算法
  • 掌握网络编程和多线程开发
  • 对ElasticSearch、Lucene、Slor的原理有较深入了解。
  • 有对开源组件有过二次开发经验者优先
  • 由社区贡献经验者优先

联系方式:shouchunbai@baidu.com
 
继续阅读 »
工作职责:
  • 负责ElasticSearch相关产品的设计、研发、维护
  • 与相关大数据产品制定大数据解决方案
  •  

职责要求:
  • 211或985大学本科毕业,计算机相关专业
  • 熟悉Java/Go语言开发,熟悉常见的数据结构与算法
  • 掌握网络编程和多线程开发
  • 对ElasticSearch、Lucene、Slor的原理有较深入了解。
  • 有对开源组件有过二次开发经验者优先
  • 由社区贡献经验者优先

联系方式:shouchunbai@baidu.com
  收起阅读 »

ELK 使用小技巧(第 5 期)

ELK Tips 主要介绍一些 ELK 使用过程中的小技巧,内容主要来源为 Elastic 中文社区。

一、Logstash

1、Logstash 性能调优主要参数

  • pipeline.workers:设置启动多少个线程执行 fliter 和 output;当 input 的内容出现堆积而 CPU 使用率还比较充足时,可以考虑增加该参数的大小;
  • pipeline.batch.size:设置单个工作线程在执行过滤器和输出之前收集的最大事件数,较大的批量大小通常更高效,但会增加内存开销。输出插件会将每个批处理作为一个输出单元。;例如,ES 输出会为收到的每个批次发出批量请求;调整 pipeline.batch.size 可调整发送到 ES 的批量请求(Bulk)的大小;
  • pipeline.batch.delay:设置 Logstash 管道的延迟时间, 管道批处理延迟是 Logstash 在当前管道工作线程中接收事件后等待新消息的最长时间(以毫秒为单位);简单来说,当 pipeline.batch.size 不满足时,会等待 pipeline.batch.delay 设置的时间,超时后便开始执行 filter 和 output 操作。

2、'reader' unacceptable character ' ' (0x0)

logstash 执行使用 Jdbc input plugin 后报错:

[main]-pipeline-manager] ERROR logstash.agent - Pipeline aborted due to error {
:exception=>#<Psych::SyntaxError: (): 'reader' unacceptable character ' ' (0x0) special characters are not allowed in "'reader'", 
position 0 at line 0 column 0>, :backtrace=>["org/jruby/ext/psych/PsychParser.java:232:in parse'"

解决方案:删除 $USER_HOME/.logstash_jdbc_last_run 文件即可。

二、Elasticsearch

1、TermsQuery 与多个 TermQuery 的区别

当 terms 的个数较少的时候,TermsQuery 等效为 ConstantScoreQuery 内部包含多个 TermQuery:

Query q1 = new TermInSetQuery(new Term("field", "foo"), new Term("field", "bar"));
// 等效为下面的语句
BooleanQuery bq = new BooleanQuery();
bq.add(new TermQuery(new Term("field", "foo")), Occur.SHOULD);
bq.add(new TermQuery(new Term("field", "bar")), Occur.SHOULD);
Query q2 = new ConstantScoreQuery(bq);

当 terms 较多的时候,它将使用匹配的文档组合成一个位集,并在该位集上进行评分;此时查询效率比普通的 Bool 合并要更加高效。

当 terms 的个数较多时,TermsQuery 比多个 TermQuery 组合的查询效率更高。

2、ES 借助 nginx 配置域名

upstream /data/ {
    server 192.168.187.xxx:9200;
    keepalive 300 ;
}

server {
    listen 80;
    server_name testelk.xx.com;
    keepalive_timeout 120s 120s;
    location /data {
        proxy_pass http://data/;
        proxy_http_version 1.1;
        proxy_set_header Connection "Keep-Alive";
        proxy_set_header Proxy-Connection "Keep-Alive";
        proxy_set_header X-Real-IP $remote_addr;
        proxy_pass_header remote_user 
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_set_header Host $http_host;
        proxy_set_header X-Nginx-Proxy true;
    }
}

3、ES Reindex 时如何不停止写入服务

方案一:kennywu76

ES 的 reindex 在索引有实时的 update/delete 的情况下,即使借助 alias,也没有办法实现真正的 zero down time。

增加新文档比较好办,通过 alias 切换写入到新索引,同时 reindex 做旧->新索引的数据传输即可;但是 update/delete 操作针对的文档如果还未从旧索引传输过来,直接对新索引操作会导致两个索引数据不一致。

我能够想到的(一个未经实际验证)的方案,前提是数据库里的文档有一个类似 last_update_time 字段记录文档最后更新的时间,用作写入 ES 文档的版本号,然后数据写入新索引的时候,url 里带上下面这样的参数:version_type=external_gt&version=xxxxxx

其中 version_type=external_gt 表示写入文档的版本号大于已有的文档版本号,或者文档不存在,写入才会成功,否则会抛版本冲突的异常。另外 delete 操作都要转换成 index 操作,index 的内容可以是一个空文档

这样实时数据写入新索引和 reindex 可以同时进行,实时写入的数据应该具有更高的版本,总是能够成功,reindex 如果遇到版本冲突,说明该文档被实时部分更新过了,已经过时,可以直接放弃跳过。

该方案的缺陷:

  • 要求数据源里的数据具有版本信息,可能因为各种局限,不太容易更改;
  • delete 操作必须转化为写入一个空文档,delete 实际上是一个标记文档,并且本身也有版本信息。但是如果后端发生了 segment merge,delete 可能会被合并以后物理清除。这样 delete 和对应的版本信息丢失,之后 reindex 如果写入了旧版本的文档,仍然会有一致性问题;但是空文档会增加索引文件的大小,有额外的消耗,一个可能的缓解办法是在 reindex 全部做完以后,再做一次空文档的删除。

改进方案:the_best

重建索引步骤如下:

  1. 保证 delete 操作都要转换成 index 操作,index 的内容可以是一个空文档;
  2. 对老索引 old_index(业务上的别名还是挂在老索引上)进行重索引操作(version_type=external);
    curl -X POST 'http://<hostname>:9200/_reindex'
    {
    "conflicts": "proceed",
    "source": {
        "index": "old_index",
        "size": 1000
    },
    "dest": {
        "index": "new_index",
        "version_type": "external"
    }
    }
  3. 将别名切到 newIndex;
  4. 将重索引时间段内 old_index 产生的热数据,再捞一次到 new_index 中(conflicts=proceed&version_type=external);
    curl -X POST /_reindex
    {
    "conflicts": "proceed",
    "source": {
        "index": "old_index"
        "query": {
            "constant_score" : {
                "filter" : {
                    "range" : {
                        "data_update_time" : {
                            "gte" : <reindex开始时刻前的毫秒时间戳>
                        }
                    }
                }
            }
        }
    },
    "dest": {
        "index": "new_index",
        "version_type": "external"
    }
    }
  5. 手动做一次空文档的删除。

这种方式取决于重索引期间产生的数据量大小(会影响步骤4的用时),不过我们可以视具体业务情况灵活操作。比如说数据量比较大重索引我们用了10个小时(这10个小时内新产生了200多万的数据),在切别名前,我们可以按步骤(4)的调用方式,把近10个小时的数据再捞一遍到新索引中,如此迭代个几次,直到别名切完后,我们能保证最后一次的步骤(4)可以在较短时间内完成。

4、ES 节点通讯配置

http.port: 9200
http.bind_host: 127.0.0.1
transport.tcp.port: 9300
transport.bind_host: 127.0.0.1

5、把 Lucene 的原生 query 传给 ES

SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(typeName);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.from(0);
sourceBuilder.size(10);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

//q为Lucene检索表达式, 直接输入关键词匹配_all或者*字段, 字段匹配user:kimchy, 
//多字段匹配user:kimchy AND message:Elasticsearch
QueryStringQueryBuilder queryStringQueryBuilder = QueryBuilders.queryStringQuery(q); 
sourceBuilder.query(queryStringQueryBuilder);
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest);
SearchHits searchHits = searchResponse.getHits();

6、ES 文档字段个数限制

ES 文档默认不允许文档字段超过 1000,超过 1000 会报如下错误:

failed to put mappings on indices [[[nfvoemspm/srjL3cMMRUqa7DgOrYqX-A]]], type [log]
java.lang.IllegalArgumentException: Limit of total fields [1000] in index [xxx] has been exceeded

可以通过修改索引配置来修改字段个数限制,不过还是推荐从业务上进行优化:

修改settings
{
  "index.mapping.total_fields.limit": 2000
}

7、将 DSL 字符串转换为 QueryBuilder

## wrapper 案例
GET /_search
{
    "query" : {
        "wrapper": {
            "query" : "eyJ0ZXJtIiA6IHsgInVzZXIiIDogIktpbWNoeSIgfX0=" 
        }
    }
}

## RestClient
QueryBuilders.wrapperQuery("{\"term\": {\"field\":\"value\"}}")

8、ES 集群重启后 Slice Scroll 速度变慢

重启机器之后,pagecache 都没有了,所有数据都要重新从磁盘加载。

9、ES 开启索引新建删除日志

PUT _cluster/settings
{
  "persistent": {
    "logger.cluster.service": "DEBUG"
  }
}

10、慢日志全局级别设定

  1. 对已经存在的索引可以通过 PUT _settings 做存量设置
  2. 对之后新增的索引,可以使用类似于下面的template
    PUT _template/global-slowlog_template
    {
    "order": -1,
    "version": 0,
    "template": "*",
    "settings": {
        "index.indexing.slowlog.threshold.index.debug" : "10ms",
        "index.indexing.slowlog.threshold.index.info" : "50ms",
        "index.indexing.slowlog.threshold.index.warn" : "100ms",
        "index.search.slowlog.threshold.fetch.debug" : "100ms",
        "index.search.slowlog.threshold.fetch.info" : "200ms",
        "index.search.slowlog.threshold.fetch.warn" : "500ms",
        "index.search.slowlog.threshold.query.debug" : "100ms",
        "index.search.slowlog.threshold.query.info" : "200ms",
        "index.search.slowlog.threshold.query.warn" : "1s"
    }
    }

11、TCP 设置多个端口的用途

transport.tcp.port 这个参数不写,默认为 9300-9399,开放那么多 端口有用么?

  • 如果设置一个端口,假设这个端口占用了程序就无法正常启动;
  • 如果设置多个端口,一个端口占用会寻找下一个端口,直至找到可用端口。

12、ES 临时重启,设置分片延迟分配策略

PUT _all/_settings
{
  "settings": {
    "index.unassigned.node_left.delayed_timeout": "5m"
  }
}

三、Kibana

1、kibana 图表自定义标注

可以用 TSVB,支持标注。

Kibana TSVB 注解的使用:https://elasticsearch.cn/article/701

2、Kibana discover 导出 csv 文件

请参考文章:如何快速把 Kibana Discover 页的 Document Table 导出成 CSV

3、修改 kibana 的默认主页

https://elasticsearch.cn/article/6335

四、社区文章精选


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

继续阅读 »

ELK Tips 主要介绍一些 ELK 使用过程中的小技巧,内容主要来源为 Elastic 中文社区。

一、Logstash

1、Logstash 性能调优主要参数

  • pipeline.workers:设置启动多少个线程执行 fliter 和 output;当 input 的内容出现堆积而 CPU 使用率还比较充足时,可以考虑增加该参数的大小;
  • pipeline.batch.size:设置单个工作线程在执行过滤器和输出之前收集的最大事件数,较大的批量大小通常更高效,但会增加内存开销。输出插件会将每个批处理作为一个输出单元。;例如,ES 输出会为收到的每个批次发出批量请求;调整 pipeline.batch.size 可调整发送到 ES 的批量请求(Bulk)的大小;
  • pipeline.batch.delay:设置 Logstash 管道的延迟时间, 管道批处理延迟是 Logstash 在当前管道工作线程中接收事件后等待新消息的最长时间(以毫秒为单位);简单来说,当 pipeline.batch.size 不满足时,会等待 pipeline.batch.delay 设置的时间,超时后便开始执行 filter 和 output 操作。

2、'reader' unacceptable character ' ' (0x0)

logstash 执行使用 Jdbc input plugin 后报错:

[main]-pipeline-manager] ERROR logstash.agent - Pipeline aborted due to error {
:exception=>#<Psych::SyntaxError: (): 'reader' unacceptable character ' ' (0x0) special characters are not allowed in "'reader'", 
position 0 at line 0 column 0>, :backtrace=>["org/jruby/ext/psych/PsychParser.java:232:in parse'"

解决方案:删除 $USER_HOME/.logstash_jdbc_last_run 文件即可。

二、Elasticsearch

1、TermsQuery 与多个 TermQuery 的区别

当 terms 的个数较少的时候,TermsQuery 等效为 ConstantScoreQuery 内部包含多个 TermQuery:

Query q1 = new TermInSetQuery(new Term("field", "foo"), new Term("field", "bar"));
// 等效为下面的语句
BooleanQuery bq = new BooleanQuery();
bq.add(new TermQuery(new Term("field", "foo")), Occur.SHOULD);
bq.add(new TermQuery(new Term("field", "bar")), Occur.SHOULD);
Query q2 = new ConstantScoreQuery(bq);

当 terms 较多的时候,它将使用匹配的文档组合成一个位集,并在该位集上进行评分;此时查询效率比普通的 Bool 合并要更加高效。

当 terms 的个数较多时,TermsQuery 比多个 TermQuery 组合的查询效率更高。

2、ES 借助 nginx 配置域名

upstream /data/ {
    server 192.168.187.xxx:9200;
    keepalive 300 ;
}

server {
    listen 80;
    server_name testelk.xx.com;
    keepalive_timeout 120s 120s;
    location /data {
        proxy_pass http://data/;
        proxy_http_version 1.1;
        proxy_set_header Connection "Keep-Alive";
        proxy_set_header Proxy-Connection "Keep-Alive";
        proxy_set_header X-Real-IP $remote_addr;
        proxy_pass_header remote_user 
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_set_header Host $http_host;
        proxy_set_header X-Nginx-Proxy true;
    }
}

3、ES Reindex 时如何不停止写入服务

方案一:kennywu76

ES 的 reindex 在索引有实时的 update/delete 的情况下,即使借助 alias,也没有办法实现真正的 zero down time。

增加新文档比较好办,通过 alias 切换写入到新索引,同时 reindex 做旧->新索引的数据传输即可;但是 update/delete 操作针对的文档如果还未从旧索引传输过来,直接对新索引操作会导致两个索引数据不一致。

我能够想到的(一个未经实际验证)的方案,前提是数据库里的文档有一个类似 last_update_time 字段记录文档最后更新的时间,用作写入 ES 文档的版本号,然后数据写入新索引的时候,url 里带上下面这样的参数:version_type=external_gt&version=xxxxxx

其中 version_type=external_gt 表示写入文档的版本号大于已有的文档版本号,或者文档不存在,写入才会成功,否则会抛版本冲突的异常。另外 delete 操作都要转换成 index 操作,index 的内容可以是一个空文档

这样实时数据写入新索引和 reindex 可以同时进行,实时写入的数据应该具有更高的版本,总是能够成功,reindex 如果遇到版本冲突,说明该文档被实时部分更新过了,已经过时,可以直接放弃跳过。

该方案的缺陷:

  • 要求数据源里的数据具有版本信息,可能因为各种局限,不太容易更改;
  • delete 操作必须转化为写入一个空文档,delete 实际上是一个标记文档,并且本身也有版本信息。但是如果后端发生了 segment merge,delete 可能会被合并以后物理清除。这样 delete 和对应的版本信息丢失,之后 reindex 如果写入了旧版本的文档,仍然会有一致性问题;但是空文档会增加索引文件的大小,有额外的消耗,一个可能的缓解办法是在 reindex 全部做完以后,再做一次空文档的删除。

改进方案:the_best

重建索引步骤如下:

  1. 保证 delete 操作都要转换成 index 操作,index 的内容可以是一个空文档;
  2. 对老索引 old_index(业务上的别名还是挂在老索引上)进行重索引操作(version_type=external);
    curl -X POST 'http://<hostname>:9200/_reindex'
    {
    "conflicts": "proceed",
    "source": {
        "index": "old_index",
        "size": 1000
    },
    "dest": {
        "index": "new_index",
        "version_type": "external"
    }
    }
  3. 将别名切到 newIndex;
  4. 将重索引时间段内 old_index 产生的热数据,再捞一次到 new_index 中(conflicts=proceed&version_type=external);
    curl -X POST /_reindex
    {
    "conflicts": "proceed",
    "source": {
        "index": "old_index"
        "query": {
            "constant_score" : {
                "filter" : {
                    "range" : {
                        "data_update_time" : {
                            "gte" : <reindex开始时刻前的毫秒时间戳>
                        }
                    }
                }
            }
        }
    },
    "dest": {
        "index": "new_index",
        "version_type": "external"
    }
    }
  5. 手动做一次空文档的删除。

这种方式取决于重索引期间产生的数据量大小(会影响步骤4的用时),不过我们可以视具体业务情况灵活操作。比如说数据量比较大重索引我们用了10个小时(这10个小时内新产生了200多万的数据),在切别名前,我们可以按步骤(4)的调用方式,把近10个小时的数据再捞一遍到新索引中,如此迭代个几次,直到别名切完后,我们能保证最后一次的步骤(4)可以在较短时间内完成。

4、ES 节点通讯配置

http.port: 9200
http.bind_host: 127.0.0.1
transport.tcp.port: 9300
transport.bind_host: 127.0.0.1

5、把 Lucene 的原生 query 传给 ES

SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(typeName);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.from(0);
sourceBuilder.size(10);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

//q为Lucene检索表达式, 直接输入关键词匹配_all或者*字段, 字段匹配user:kimchy, 
//多字段匹配user:kimchy AND message:Elasticsearch
QueryStringQueryBuilder queryStringQueryBuilder = QueryBuilders.queryStringQuery(q); 
sourceBuilder.query(queryStringQueryBuilder);
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest);
SearchHits searchHits = searchResponse.getHits();

6、ES 文档字段个数限制

ES 文档默认不允许文档字段超过 1000,超过 1000 会报如下错误:

failed to put mappings on indices [[[nfvoemspm/srjL3cMMRUqa7DgOrYqX-A]]], type [log]
java.lang.IllegalArgumentException: Limit of total fields [1000] in index [xxx] has been exceeded

可以通过修改索引配置来修改字段个数限制,不过还是推荐从业务上进行优化:

修改settings
{
  "index.mapping.total_fields.limit": 2000
}

7、将 DSL 字符串转换为 QueryBuilder

## wrapper 案例
GET /_search
{
    "query" : {
        "wrapper": {
            "query" : "eyJ0ZXJtIiA6IHsgInVzZXIiIDogIktpbWNoeSIgfX0=" 
        }
    }
}

## RestClient
QueryBuilders.wrapperQuery("{\"term\": {\"field\":\"value\"}}")

8、ES 集群重启后 Slice Scroll 速度变慢

重启机器之后,pagecache 都没有了,所有数据都要重新从磁盘加载。

9、ES 开启索引新建删除日志

PUT _cluster/settings
{
  "persistent": {
    "logger.cluster.service": "DEBUG"
  }
}

10、慢日志全局级别设定

  1. 对已经存在的索引可以通过 PUT _settings 做存量设置
  2. 对之后新增的索引,可以使用类似于下面的template
    PUT _template/global-slowlog_template
    {
    "order": -1,
    "version": 0,
    "template": "*",
    "settings": {
        "index.indexing.slowlog.threshold.index.debug" : "10ms",
        "index.indexing.slowlog.threshold.index.info" : "50ms",
        "index.indexing.slowlog.threshold.index.warn" : "100ms",
        "index.search.slowlog.threshold.fetch.debug" : "100ms",
        "index.search.slowlog.threshold.fetch.info" : "200ms",
        "index.search.slowlog.threshold.fetch.warn" : "500ms",
        "index.search.slowlog.threshold.query.debug" : "100ms",
        "index.search.slowlog.threshold.query.info" : "200ms",
        "index.search.slowlog.threshold.query.warn" : "1s"
    }
    }

11、TCP 设置多个端口的用途

transport.tcp.port 这个参数不写,默认为 9300-9399,开放那么多 端口有用么?

  • 如果设置一个端口,假设这个端口占用了程序就无法正常启动;
  • 如果设置多个端口,一个端口占用会寻找下一个端口,直至找到可用端口。

12、ES 临时重启,设置分片延迟分配策略

PUT _all/_settings
{
  "settings": {
    "index.unassigned.node_left.delayed_timeout": "5m"
  }
}

三、Kibana

1、kibana 图表自定义标注

可以用 TSVB,支持标注。

Kibana TSVB 注解的使用:https://elasticsearch.cn/article/701

2、Kibana discover 导出 csv 文件

请参考文章:如何快速把 Kibana Discover 页的 Document Table 导出成 CSV

3、修改 kibana 的默认主页

https://elasticsearch.cn/article/6335

四、社区文章精选


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

收起阅读 »

ClusterBlockException SERVICE_UNAVAILABLE/1/state

截图.PNG
图中为_cluster/health 和_cat/pending_tasks的执行结果,现在ES节点全部故障了,请教下如何恢复?   gateway.expected_nodes配置的为节点个数的80% 

 Caused by: org.elasticsearch.discovery.MasterNotDiscoveredException: ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction$4.onTimeout(TransportMasterNodeAction.java:213) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ContextPreservingListener.onTimeout(ClusterStateObserver.java:317) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ObserverClusterStateListener.onTimeout(ClusterStateObserver.java:244) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.service.ClusterApplierService$NotifyTimeout.run(ClusterApplierService.java:578) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:568) [elasticsearch-6.1.3.jar:6.1.3]
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
                at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
        Caused by: org.elasticsearch.cluster.block.ClusterBlockException: blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];
                at org.elasticsearch.cluster.block.ClusterBlocks.indexBlockedException(ClusterBlocks.java:182) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:64) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:39) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.doStart(TransportMasterNodeAction.java:135) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.start(TransportMasterNodeAction.java:127) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:105) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:55) ~[elasticsearch-6.1.3.jar:6.1.3]
继续阅读 »

截图.PNG
图中为_cluster/health 和_cat/pending_tasks的执行结果,现在ES节点全部故障了,请教下如何恢复?   gateway.expected_nodes配置的为节点个数的80% 

 Caused by: org.elasticsearch.discovery.MasterNotDiscoveredException: ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction$4.onTimeout(TransportMasterNodeAction.java:213) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ContextPreservingListener.onTimeout(ClusterStateObserver.java:317) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ObserverClusterStateListener.onTimeout(ClusterStateObserver.java:244) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.service.ClusterApplierService$NotifyTimeout.run(ClusterApplierService.java:578) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:568) [elasticsearch-6.1.3.jar:6.1.3]
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
                at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
        Caused by: org.elasticsearch.cluster.block.ClusterBlockException: blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];
                at org.elasticsearch.cluster.block.ClusterBlocks.indexBlockedException(ClusterBlocks.java:182) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:64) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:39) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.doStart(TransportMasterNodeAction.java:135) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.start(TransportMasterNodeAction.java:127) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:105) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:55) ~[elasticsearch-6.1.3.jar:6.1.3]
收起阅读 »

Hive 与 ElasticSearch 的数据交互

本文将详细介绍利用 ES 与 Hive 直接的数据交互;通过 Hive 外部表的方式,可以快速将 ES 索引数据映射到 Hive 中,使用易于上手的 Hive SQL 实现对数据的进一步加工。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • ES 版本:6.5.1
  • Hive 版本:2.1.1
  • ES-Hadoop 版本:6.5.1

2、Hive 简介

Hive 在 Hadoop 生态系统中扮演着数据仓库的角色,借助 Hive 可以方便地进行数据汇总、即席查询以及分析存储在 Hadoop 文件系统中的大型数据集。

Hive 通过类 SQL 语言(HSQL)对 Hadoop 上的数据进行抽象,这样用户可以通过 SQL 语句对数据进行定义、组织、操作和分析;在 Hive 中,数据集是通过表(定义了数据类型相关信息)进行定义的,用户可以通过内置运算符或用户自定义函数(UDF)对数据进行加载、查询和转换。

3、Hive 安装 ES-Hadoop

官方推荐的安装方式:

使用 add jar

add jar /path/elasticsearch-hadoop.jar

使用 hive.aux.jars.path

$ bin/hive --auxpath=/path/elasticsearch-hadoop.jar

修改配置(hive-site.xml)

<property>
  <name>hive.aux.jars.path</name>
  <value>/path/elasticsearch-hadoop.jar</value>
  <description>A comma separated list (with no spaces) of the jar files</description>
</property>

CDH6.X 推荐的安装方法

elasticsearch-hadoop.jar 复制到 Hive 的 auxlib 目录中,然后重启 Hive 即可。

cp elasticsearch-hadoop.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/

二、Hive 与 ElasticSearch 的数据交互

1、数据类型对照表

请务必注意,ES 中的类型是 index/_mapping 中对应的数据类型,非 _source 里面数据的类型。

Hive type Elasticsearch type
void null
boolean boolean
tinyint byte
smallint short
int int
bigint long
double double
float float
string string
binary binary
timestamp date
struct map
map map
array array
union not supported (yet)
decimal string
date date
varchar string
char string

2、建立 Hive 外部表

CREATE EXTERNAL TABLE default.surface(
    water_type STRING,
    water_level STRING,
    monitor_time TIMESTAMP,
    sitecode STRING,
    p492 DOUBLE,
    p311 DOUBLE,
    status STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.resource'='ods_data_day_surface*/doc',
    'es.query'='?q=status:001'
    'es.nodes'='sky-01','es.port'='9200',
    'es.net.http.auth.user'='sky',
    'es.net.http.auth.pass'='jointsky',
    'es.date.format'='yyyy-MM-dd HH:mm:ss',
    'es.ser.reader.value.class'='com.jointsky.bigdata.hive.EsValueReader'
    'es.mapping.names'='waterType:water_type,monitortime:monitor_time'
);

3、配置项说明

es.resource

es.resource 用于设置 ES 资源的位置,默认该配置项同时设置了读和写的索引,当然也可以分别设置读写索引名称:

  • es.resource.read:设置读取位置;
  • es.resource.write:设置写入位置。

es.query

es.query 设置查询过滤条件,目前支持 uri queryquery dslexternal resource 三种设置方式。

# uri (or parameter) query
es.query = ?q=costinl

# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }

# external resource
es.query = org/mypackage/myquery.json

es.mapping.names

es.mapping.names 用于设置 Hive 与 ES 的字段映射关系,如果不设置,则默认字段名不发生变化(即为 data type 区域定义的字段名);此外该部分还用于定义 Hive 到 ES 的数据映射类型。

'es.mapping.names' = 'date:@timestamp , url:url_123 ')

其他通用字段的说明请参考文章:使用 ES-Hadoop 将 Spark Streaming 流数据写入 ES

4、自定义日期类型解析

目前将 ES 的 date 类型映射到 Hive 的 TIMESTAMP 类型时,ES-Hadoop 组件只能识别时间戳格式或者标准的 XSD 格式的日期字符串:

@Override
protected Object parseDate(Long value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(value)) : processLong(value));
}

@Override
protected Object parseDate(String value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(DatatypeConverter.parseDateTime(value).getTimeInMillis())) : parseString(value));
}

关于 XSD(XML Schema Date/Time Datatypes)可用参考文章:https://www.w3schools.com/xml/schema_dtypes_date.asp

为了兼容自定义的日期格式,需要编写自定义的日期读取类:


import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.hive.HiveValueReader;

import java.sql.Timestamp;
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EsValueReader extends HiveValueReader {
    private String dateFormat;
    private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String DEFAULT_DATE_FORMAT_MIN = "yyyy-MM-dd HH:mm";
    private static final String DEFAULT_DATE_FORMAT_HOUR = "yyyy-MM-dd HH";
    private static final String DEFAULT_DATE_FORMAT_DAY = "yyyy-MM-dd";

    @Override
    public void setSettings(Settings settings) {
        super.setSettings(settings);
        dateFormat = settings.getProperty("es.date.format");
    }

    @Override
    protected Object parseDate(String value, boolean richDate) {
        if (value != null && value.trim().length() > 0 && DEFAULT_DATE_FORMAT.equalsIgnoreCase(dateFormat)) {
            if (richDate){
                if (value.length() == 16){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_MIN).getTime()));
                }
                if (value.length() == 13){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_HOUR).getTime()));
                }
                if (value.length() == 10){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_DAY).getTime()));
                }
                return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT).getTime()));
            }
            return parseString(value);
        }
        return super.parseDate(value, richDate);
    }

    /**
     * 解析日期,根據指定的格式進行解析.<br>
     * 如果解析錯誤,則返回null
     * @param stringDate 日期字串
     * @param format 日期格式
     * @return 日期型別
     */
    private static Date parseDate(String stringDate, String format) {
        if (stringDate == null) {
            return null;
        }
        try {
            return parseDate(stringDate, new String[] { format });
        } catch (ParseException e) {
            return null;
        }
    }

    public static Date parseDate(String str, String... parsePatterns) throws ParseException {
        return parseDateWithLeniency(str, parsePatterns, true);
    }

    private static Date parseDateWithLeniency(
            String str, String[] parsePatterns, boolean lenient) throws ParseException {
        if (str == null || parsePatterns == null) {
            throw new IllegalArgumentException("Date and Patterns must not be null");
        }

        SimpleDateFormat parser = new SimpleDateFormat();
        parser.setLenient(lenient);
        ParsePosition pos = new ParsePosition(0);
        for (String parsePattern : parsePatterns) {
            String pattern = parsePattern;
            if (parsePattern.endsWith("ZZ")) {
                pattern = pattern.substring(0, pattern.length() - 1);
            }
            parser.applyPattern(pattern);
            pos.setIndex(0);
            String str2 = str;
            if (parsePattern.endsWith("ZZ")) {
                str2 = str.replaceAll("([-+][0-9][0-9]):([0-9][0-9])$", "$1$2");
            }
            Date date = parser.parse(str2, pos);
            if (date != null && pos.getIndex() == str2.length()) {
                return date;
            }
        }
        throw new ParseException("Unable to parse the date: " + str, -1);
    }
}

上述代码的 Maven 依赖

<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>6.5.4</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

自定义日期解析包的部署

代码编写完成后,将代码进行打包,然后将打包好的 jar 包放置到 Hive 的 auxlib 目录中,然后重启 Hive 即可;该步骤与 ES-Hadoop 的安装步骤一样。

在编写 Spark 程序从 Hive 中读取数据的时候,需要添加对该包的依赖以及对 ES-Hadoop 的依赖。

三、总结

经过上述的步骤,Hive 与 ES 的映射已经不成问题,如果想从 ES 中导出数据,可用借助 HSQL insert into table XXX select * from XXXXX; 的方式从 ES 中读取数据写入到 HDFS;当然通过更为复杂的 HSQL 可以将数据进行处理,并将数据重新写入到 ES 或者存储到 HDFS。

充分利用 ES 的查询、过滤和聚合,可以很好的去服务数据标准化、数据清洗、数据分布情况等 ETL 流程。


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

继续阅读 »

本文将详细介绍利用 ES 与 Hive 直接的数据交互;通过 Hive 外部表的方式,可以快速将 ES 索引数据映射到 Hive 中,使用易于上手的 Hive SQL 实现对数据的进一步加工。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • ES 版本:6.5.1
  • Hive 版本:2.1.1
  • ES-Hadoop 版本:6.5.1

2、Hive 简介

Hive 在 Hadoop 生态系统中扮演着数据仓库的角色,借助 Hive 可以方便地进行数据汇总、即席查询以及分析存储在 Hadoop 文件系统中的大型数据集。

Hive 通过类 SQL 语言(HSQL)对 Hadoop 上的数据进行抽象,这样用户可以通过 SQL 语句对数据进行定义、组织、操作和分析;在 Hive 中,数据集是通过表(定义了数据类型相关信息)进行定义的,用户可以通过内置运算符或用户自定义函数(UDF)对数据进行加载、查询和转换。

3、Hive 安装 ES-Hadoop

官方推荐的安装方式:

使用 add jar

add jar /path/elasticsearch-hadoop.jar

使用 hive.aux.jars.path

$ bin/hive --auxpath=/path/elasticsearch-hadoop.jar

修改配置(hive-site.xml)

<property>
  <name>hive.aux.jars.path</name>
  <value>/path/elasticsearch-hadoop.jar</value>
  <description>A comma separated list (with no spaces) of the jar files</description>
</property>

CDH6.X 推荐的安装方法

elasticsearch-hadoop.jar 复制到 Hive 的 auxlib 目录中,然后重启 Hive 即可。

cp elasticsearch-hadoop.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/

二、Hive 与 ElasticSearch 的数据交互

1、数据类型对照表

请务必注意,ES 中的类型是 index/_mapping 中对应的数据类型,非 _source 里面数据的类型。

Hive type Elasticsearch type
void null
boolean boolean
tinyint byte
smallint short
int int
bigint long
double double
float float
string string
binary binary
timestamp date
struct map
map map
array array
union not supported (yet)
decimal string
date date
varchar string
char string

2、建立 Hive 外部表

CREATE EXTERNAL TABLE default.surface(
    water_type STRING,
    water_level STRING,
    monitor_time TIMESTAMP,
    sitecode STRING,
    p492 DOUBLE,
    p311 DOUBLE,
    status STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.resource'='ods_data_day_surface*/doc',
    'es.query'='?q=status:001'
    'es.nodes'='sky-01','es.port'='9200',
    'es.net.http.auth.user'='sky',
    'es.net.http.auth.pass'='jointsky',
    'es.date.format'='yyyy-MM-dd HH:mm:ss',
    'es.ser.reader.value.class'='com.jointsky.bigdata.hive.EsValueReader'
    'es.mapping.names'='waterType:water_type,monitortime:monitor_time'
);

3、配置项说明

es.resource

es.resource 用于设置 ES 资源的位置,默认该配置项同时设置了读和写的索引,当然也可以分别设置读写索引名称:

  • es.resource.read:设置读取位置;
  • es.resource.write:设置写入位置。

es.query

es.query 设置查询过滤条件,目前支持 uri queryquery dslexternal resource 三种设置方式。

# uri (or parameter) query
es.query = ?q=costinl

# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }

# external resource
es.query = org/mypackage/myquery.json

es.mapping.names

es.mapping.names 用于设置 Hive 与 ES 的字段映射关系,如果不设置,则默认字段名不发生变化(即为 data type 区域定义的字段名);此外该部分还用于定义 Hive 到 ES 的数据映射类型。

'es.mapping.names' = 'date:@timestamp , url:url_123 ')

其他通用字段的说明请参考文章:使用 ES-Hadoop 将 Spark Streaming 流数据写入 ES

4、自定义日期类型解析

目前将 ES 的 date 类型映射到 Hive 的 TIMESTAMP 类型时,ES-Hadoop 组件只能识别时间戳格式或者标准的 XSD 格式的日期字符串:

@Override
protected Object parseDate(Long value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(value)) : processLong(value));
}

@Override
protected Object parseDate(String value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(DatatypeConverter.parseDateTime(value).getTimeInMillis())) : parseString(value));
}

关于 XSD(XML Schema Date/Time Datatypes)可用参考文章:https://www.w3schools.com/xml/schema_dtypes_date.asp

为了兼容自定义的日期格式,需要编写自定义的日期读取类:


import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.hive.HiveValueReader;

import java.sql.Timestamp;
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EsValueReader extends HiveValueReader {
    private String dateFormat;
    private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String DEFAULT_DATE_FORMAT_MIN = "yyyy-MM-dd HH:mm";
    private static final String DEFAULT_DATE_FORMAT_HOUR = "yyyy-MM-dd HH";
    private static final String DEFAULT_DATE_FORMAT_DAY = "yyyy-MM-dd";

    @Override
    public void setSettings(Settings settings) {
        super.setSettings(settings);
        dateFormat = settings.getProperty("es.date.format");
    }

    @Override
    protected Object parseDate(String value, boolean richDate) {
        if (value != null && value.trim().length() > 0 && DEFAULT_DATE_FORMAT.equalsIgnoreCase(dateFormat)) {
            if (richDate){
                if (value.length() == 16){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_MIN).getTime()));
                }
                if (value.length() == 13){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_HOUR).getTime()));
                }
                if (value.length() == 10){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_DAY).getTime()));
                }
                return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT).getTime()));
            }
            return parseString(value);
        }
        return super.parseDate(value, richDate);
    }

    /**
     * 解析日期,根據指定的格式進行解析.<br>
     * 如果解析錯誤,則返回null
     * @param stringDate 日期字串
     * @param format 日期格式
     * @return 日期型別
     */
    private static Date parseDate(String stringDate, String format) {
        if (stringDate == null) {
            return null;
        }
        try {
            return parseDate(stringDate, new String[] { format });
        } catch (ParseException e) {
            return null;
        }
    }

    public static Date parseDate(String str, String... parsePatterns) throws ParseException {
        return parseDateWithLeniency(str, parsePatterns, true);
    }

    private static Date parseDateWithLeniency(
            String str, String[] parsePatterns, boolean lenient) throws ParseException {
        if (str == null || parsePatterns == null) {
            throw new IllegalArgumentException("Date and Patterns must not be null");
        }

        SimpleDateFormat parser = new SimpleDateFormat();
        parser.setLenient(lenient);
        ParsePosition pos = new ParsePosition(0);
        for (String parsePattern : parsePatterns) {
            String pattern = parsePattern;
            if (parsePattern.endsWith("ZZ")) {
                pattern = pattern.substring(0, pattern.length() - 1);
            }
            parser.applyPattern(pattern);
            pos.setIndex(0);
            String str2 = str;
            if (parsePattern.endsWith("ZZ")) {
                str2 = str.replaceAll("([-+][0-9][0-9]):([0-9][0-9])$", "$1$2");
            }
            Date date = parser.parse(str2, pos);
            if (date != null && pos.getIndex() == str2.length()) {
                return date;
            }
        }
        throw new ParseException("Unable to parse the date: " + str, -1);
    }
}

上述代码的 Maven 依赖

<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>6.5.4</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

自定义日期解析包的部署

代码编写完成后,将代码进行打包,然后将打包好的 jar 包放置到 Hive 的 auxlib 目录中,然后重启 Hive 即可;该步骤与 ES-Hadoop 的安装步骤一样。

在编写 Spark 程序从 Hive 中读取数据的时候,需要添加对该包的依赖以及对 ES-Hadoop 的依赖。

三、总结

经过上述的步骤,Hive 与 ES 的映射已经不成问题,如果想从 ES 中导出数据,可用借助 HSQL insert into table XXX select * from XXXXX; 的方式从 ES 中读取数据写入到 HDFS;当然通过更为复杂的 HSQL 可以将数据进行处理,并将数据重新写入到 ES 或者存储到 HDFS。

充分利用 ES 的查询、过滤和聚合,可以很好的去服务数据标准化、数据清洗、数据分布情况等 ETL 流程。


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

收起阅读 »

自研基于StanfordNLP的ES分词插件

为ES构建Stanford NLP分词插件

Stanford NLP?

Stanford分词器是斯坦福大学NLP团队维护的一个开源分词器,支持了包括中文、英文…的语言,而且除了分词之外,它还支持了包括词性分析、情感分析…的各种功能。\ 这俩是这个project的项目主页

Why Stanford core NLP?

  市面上确实会有很多很有名的开源分词器,比如IK、Jieba,还有一些其他团队和公司提供的开源/商用的分词器,他们各有优劣。但是在各种分词器上比较了一大堆的分词case之后,我们发现Stanford NLP似乎是最适合我们当前需求的一个,因为我们不仅仅需要分词,还需要一些包括情感分析之类在内的更多的一些功能。

我们公司是做金融数据的搜索推荐的,在对比了各家分词器之后我们老板觉得Stanford NLP的效果最好,但是作为算法出身的人,他实现了一套非常重的分词、排序、搜索的服务。

在对比如研报、财报之类的信息进行搜索的时候确实会比较有效,但是在对经济类的新闻进行搜索的时候就会显得十分的笨重。

基于这个背景,我开始试图在ES里面引入老板推崇的Stanford 分词器来适应他的搜索、分词的需要,同时也能够不通过他那个笨重的分词排序服务来对我们系统中大量的经济、金融类的新闻进行分词、索引,并提供和他自己分词效果类似的分词和检索服务。

Why this project

我在包括百度、某谷姓404网站、GitHub以及国内的中文社区(Elastic中文社区)在内的各种地方搜过也问过了,但是似乎没有一个直接开箱可用的分词插件。所以,我只剩一条路了,就是搭建一个自己的插件来引用这个分词器。

How

对ES来说,插件主要分为两个部分:

  1. 让ES可以看到的部分(class extends Plugin)
  2. 自己行使职能的部分(functional part)

plugin

  1. 为了让ES可以加载我们的plugin,我们需要先继承Plugin类,然后我们这个是个分词器插件,所以还要实现AnalysisPlugin类
  2. 看过ES源码或者其他分词器源码的同学应该会知道,分词器插件需要实现两个方法,一个用来提供tokenizer,一个是analyzer分别对应分词器中的这俩。
    • 重写Map<String, AnalysisModule.AnalysisProvider<TokenizerFactory>>是为了可以提供搜索用分词器
    • 重写Map<String, AnalysisModule.AnalysisProvider<AnalyzerProvider<? extends Analyzer>>>是为了可以提供索引用分词器
  3. 在这个分词器里面我们主要是依靠Tokenizer来实现分词的

functional class

分词器,特别是Tokenizer主要是靠重写三个方法来实现分词的

  1. incrementToken:用来确定每一个词元,输出每一个单词(字)以及它的位置、长度等
  2. reset:用来重制分词结果
  3. end:用来告诉ES,这段文本的分词已经结束了

所以我们主要需要重写的就是这仨方法,当然了,为了能让分词器正确的使用,我们还需要添加一些分词器的配置和初始化的内容,具体代码不写了可以参考我的git,主要讲两个坑:

  1. ES是通过配置文件里的路径来寻找对应的插件类
  2. 然后通过配置文件里的key和刚才提到的代码里的key来寻找对应的分词器,所以这俩地方不要写错了 #plugin-descriptor.properties: classname=org.elasticsearch.plugin.analysis.AnalysisSDPlugin #plugin-descriptor.properties: name=stanford-core-nlp
  3. 在开发过程中由于有java-security的存在,所以需要通过AccessController来调用和加载我们需要的外部jar包

odds and ends

  1. Stanford分词器里面包含了很多功能,目前我使用了分词的部分
  2. 分词器自带词典文件,不过如果要做词典的修改可能需要解包,修改,再重新打包
  3. 我现在hardcode了一大堆的标点符号在里面,后面可能会去优化一下部分逻辑
  4. 待完成的功能还有其他功能包括情感分析之类的

also see

GitHub 地址

继续阅读 »

为ES构建Stanford NLP分词插件

Stanford NLP?

Stanford分词器是斯坦福大学NLP团队维护的一个开源分词器,支持了包括中文、英文…的语言,而且除了分词之外,它还支持了包括词性分析、情感分析…的各种功能。\ 这俩是这个project的项目主页

Why Stanford core NLP?

  市面上确实会有很多很有名的开源分词器,比如IK、Jieba,还有一些其他团队和公司提供的开源/商用的分词器,他们各有优劣。但是在各种分词器上比较了一大堆的分词case之后,我们发现Stanford NLP似乎是最适合我们当前需求的一个,因为我们不仅仅需要分词,还需要一些包括情感分析之类在内的更多的一些功能。

我们公司是做金融数据的搜索推荐的,在对比了各家分词器之后我们老板觉得Stanford NLP的效果最好,但是作为算法出身的人,他实现了一套非常重的分词、排序、搜索的服务。

在对比如研报、财报之类的信息进行搜索的时候确实会比较有效,但是在对经济类的新闻进行搜索的时候就会显得十分的笨重。

基于这个背景,我开始试图在ES里面引入老板推崇的Stanford 分词器来适应他的搜索、分词的需要,同时也能够不通过他那个笨重的分词排序服务来对我们系统中大量的经济、金融类的新闻进行分词、索引,并提供和他自己分词效果类似的分词和检索服务。

Why this project

我在包括百度、某谷姓404网站、GitHub以及国内的中文社区(Elastic中文社区)在内的各种地方搜过也问过了,但是似乎没有一个直接开箱可用的分词插件。所以,我只剩一条路了,就是搭建一个自己的插件来引用这个分词器。

How

对ES来说,插件主要分为两个部分:

  1. 让ES可以看到的部分(class extends Plugin)
  2. 自己行使职能的部分(functional part)

plugin

  1. 为了让ES可以加载我们的plugin,我们需要先继承Plugin类,然后我们这个是个分词器插件,所以还要实现AnalysisPlugin类
  2. 看过ES源码或者其他分词器源码的同学应该会知道,分词器插件需要实现两个方法,一个用来提供tokenizer,一个是analyzer分别对应分词器中的这俩。
    • 重写Map<String, AnalysisModule.AnalysisProvider<TokenizerFactory>>是为了可以提供搜索用分词器
    • 重写Map<String, AnalysisModule.AnalysisProvider<AnalyzerProvider<? extends Analyzer>>>是为了可以提供索引用分词器
  3. 在这个分词器里面我们主要是依靠Tokenizer来实现分词的

functional class

分词器,特别是Tokenizer主要是靠重写三个方法来实现分词的

  1. incrementToken:用来确定每一个词元,输出每一个单词(字)以及它的位置、长度等
  2. reset:用来重制分词结果
  3. end:用来告诉ES,这段文本的分词已经结束了

所以我们主要需要重写的就是这仨方法,当然了,为了能让分词器正确的使用,我们还需要添加一些分词器的配置和初始化的内容,具体代码不写了可以参考我的git,主要讲两个坑:

  1. ES是通过配置文件里的路径来寻找对应的插件类
  2. 然后通过配置文件里的key和刚才提到的代码里的key来寻找对应的分词器,所以这俩地方不要写错了 #plugin-descriptor.properties: classname=org.elasticsearch.plugin.analysis.AnalysisSDPlugin #plugin-descriptor.properties: name=stanford-core-nlp
  3. 在开发过程中由于有java-security的存在,所以需要通过AccessController来调用和加载我们需要的外部jar包

odds and ends

  1. Stanford分词器里面包含了很多功能,目前我使用了分词的部分
  2. 分词器自带词典文件,不过如果要做词典的修改可能需要解包,修改,再重新打包
  3. 我现在hardcode了一大堆的标点符号在里面,后面可能会去优化一下部分逻辑
  4. 待完成的功能还有其他功能包括情感分析之类的

also see

GitHub 地址

收起阅读 »

ELK 使用小技巧(第 4 期)

ELK Tips 主要介绍一些 ELK 使用过程中的小技巧,内容主要来源为 Elastic 中文社区。

一、Logstash

1、Logstash 性能调优主要参数

  • pipeline.workers:设置启动多少个线程执行 fliter 和 output;当 input 的内容出现堆积而 CPU 使用率还比较充足时,可以考虑增加该参数的大小;
  • pipeline.batch.size:设置单个工作线程在执行过滤器和输出之前收集的最大事件数,较大的批量大小通常更高效,但会增加内存开销。输出插件会将每个批处理作为一个输出单元。;例如,ES 输出会为收到的每个批次发出批量请求;调整 pipeline.batch.size 可调整发送到 ES 的批量请求(Bulk)的大小;
  • pipeline.batch.delay:设置 Logstash 管道的延迟时间, 管道批处理延迟是 Logstash 在当前管道工作线程中接收事件后等待新消息的最长时间(以毫秒为单位);简单来说,当 pipeline.batch.size 不满足时,会等待 pipeline.batch.delay 设置的时间,超时后便开始执行 filter 和 output 操作。

2、使用 Ruby Filter 根据现有字段计算一个新字段

filter {
    ruby {
           code => "event.set('kpi', ((event.get('a') + event.get('b'))/(event.get('c')+event.get('d'))).round(2))"
     }
}

3、logstash filter 如何判断字段是够为空或者 null

if ![updateTime]

4、Date Filter 设置多种日期格式

date {
  match => ["logtime", "yyyy-MM-dd HH:mm:ss.SSS","yyyy-MM-dd HH:mm:ss,SSS"]
  target => "logtime_utc"
}

二、Elasticsearch

1、高效翻页 Search After

通常情况下我们会使用 from 和 size 的方式实现查询结果的翻页,但是当达到深度分页时,成本变得过高(堆内存占用和时间耗费与 from+size 的大小成正比),因此 ES 设置了限制(index.max_result_window),默认值为 10000,防止用户进行过于深入的翻页。

推荐使用 Scroll api 进行高效深度滚动,但滚动上下文代价很高,因此不要将 Scroll 用于实时用户请求。search_after 参数通过提供实时游标来解决深度滚动的问题,其主要思路是使用上一页的结果来帮助检索下一页。

GET twitter/_search
{
    "size": 10,
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    },
    "search_after": [1463538857, "654323"],
    "sort": [
        {"date": "asc"},
        {"tie_breaker_id": "asc"}
    ]
}

2、ES 文档相似度 BM25 参数设置

ES2.X 默认是以 TF/IDF 算法计算文档相似度,从 ES5.X 开始,BM25 作为默认的相似度计算算法。

PUT /index
{
    "settings" : {
        "index" : {
            "similarity" : {
              "my_similarity" : {
                "type" : "DFR",
                "basic_model" : "g",
                "after_effect" : "l",
                "normalization" : "h2",
                "normalization.h2.c" : "3.0"
              }
            }
        }
    }
}

PUT /index/_mapping/_doc
{
  "properties" : {
    "title" : { "type" : "text", "similarity" : "my_similarity" }
  }
}

3、ES2.X 得分计算

得分计算脚本:

double tf = Math.sqrt(doc.freq); 
double idf = Math.log((field.docCount+1.0)/(term.docFreq+1.0)) + 1.0; 
double norm = 1/Math.sqrt(doc.length); 
return query.boost * tf * idf * norm;
  • 忽略词频统计及词频位置:将字段的 index_options 设置为 docs;
  • 忽略字段长度:设置字段的 "norms": { "enabled": false }

4、CircuitBreakingException: [parent] Data too large

报错信息:

[WARN ][r.suppressed             ] path: /, params: {}
org.elasticsearch.common.breaker.CircuitBreakingException: [parent] Data too large, data for [<http_request>] would be [1454565650/1.3gb], which is larger than the limit of [1454427340/1.3gb], usages [request=0/0b, fielddata=568/568b, in_flight_requests=0/0b, accounting=1454565082/1.3gb]

jvm 堆内存不够当前查询加载数据所以会报 data too large, 请求被熔断,indices.breaker.request.limit默认为 jvm heap 的 60%,因此可以通过调整 ES 的 Heap Size 来解决该问题。

5、ES 免费的自动化运维工具推荐

6、elasticsearch-hanlp 分词插件包

核心功能:

  • 内置多种分词模式,适合不同场景;
  • 内置词典,无需额外配置即可使用;
  • 支持外置词典,用户可自定义分词算法,基于词典或是模型;
  • 支持分词器级别的自定义词典,便于用于多租户场景;
  • 支持远程词典热更新(待开发);
  • 拼音过滤器、繁简体过滤器(待开发);
  • 基于词语或单字的 ngram 切分分词(待开发)。

https://github.com/AnyListen/elasticsearch-analysis-hanlp

7、节点重启时延迟索引分片重分配

当某个节点短时间离开集群时,一般是不会影响整体系统运行的,可以通过下面的请求延迟索引分片的再分配。

PUT _all/_settings
{
  "settings": {
    "index.unassigned.node_left.delayed_timeout": "5m"
  }
}

8、ES 数据修改后,查询还是未修改前的数据

默认是 1 秒可见,如果你的需求一定要写完就可见,那在写的时候增加 refresh 参数,强制刷新即可,但强烈建议不这么干,因为这样会把整个集群拖垮。

9、Terms Query 从另一个索引获取 terms

当 Terms Query 需要指定很多 terms 的时候,如果手动设置还是相当麻烦的,可以通过 terms-lookup 的方式从另外一个索引加载需要匹配的 terms。

PUT /users/_doc/2
{
    "followers" : ["1", "3"]
}

PUT /tweets/_doc/1
{
    "user" : "1"
}

GET /tweets/_search
{
    "query" : {
        "terms" : {
            "user" : {
                "index" : "users",
                "type" : "_doc",
                "id" : "2",
                "path" : "followers"
            }
        }
    }
}

-----------等效下面的语句--------------

PUT /users/_doc/2
{
 "followers" : [
   {
     "id" : "1"
   },
   {
     "id" : "2"
   }
 ]
}

10、ES 备份路径设置

报错信息:

doesn't match any of the locations specified by path.repo because this setting is empty

结局方案,修改 ES 的配置文件:

# 在 elasticsearch.yml 中添加下面配置来设置备份仓库路径 
path.repo: ["/home/test/backup/zty_logstash"]

11、Query cache 和 Filter cache 的区别

Filter cache 被重命名为 Node Query Cache,也就是说 Query cache 等同于 Filter cache;Query Cache 采用了 LRU 的缓存方式(当缓存满的时候,淘汰旧的不用的缓存数据),Query Cache 只缓存被用于 filter 上下文的内容。

12、Shard 大小需要考虑的因素有哪些?

Lucene 底层没有这个大小的限制,20-40GB 的这个区间范围本身就比较大,经验值有时候就是拍脑袋,不一定都好使。

  • Elasticsearch 对数据的隔离和迁移是以分片为单位进行的,分片太大,会加大迁移成本;
  • 一个分片就是一个 Lucene 的库,一个 Lucene 目录里面包含很多 Segment,每个 Segment 有文档数的上限,Segment 内部的文档 ID 目前使用的是 Java 的整型,也就是 2 的 31 次方,所以能够表示的总的文档数为 Integer.MAX_VALUE - 128 = 2^31 - 128 = 2147483647 - 1 = 2,147,483,519,也就是21.4亿条;
  • 同样,如果你不 force merge 成一个 Segment,单个 shard 的文档数能超过这个数;
  • 单个 Lucene 越大,索引会越大,查询的操作成本自然要越高,IO 压力越大,自然会影响查询体验;
  • 具体一个分片多少数据合适,还是需要结合实际的业务数据和实际的查询来进行测试以进行评估。

13、ES 索引更新时通过 mapping 限制指定字段更新

Elasticsearch 默认是 Dynamic Mapping,新字段会自动猜测数据类型,并自动 merge 到之前的 Mapping,你可以在 Mapping 里面可以配置字段是否支持动态加入,设置参数dynamic即可:true,默认,表示支持动态加入新字段;false,表示忽略该字段的后续索引等操作,但是索引还是成功的;strict支持不支持未知字段,直接抛错。

14、ES 数据快照到 HDFS

ES 做快照和使用 ES-Hadoop 导数据是完全的两种不同的方式,使用 ES-Hadoopp 后期导入的成本可能也不小。

  • 如果要恢复快,当然是做快照和还原的方式最快,速度完全取决于网络和磁盘的速度;
  • 如果为了节省磁盘,快照的时候,可以选 6.5 最新支持的 source_only 模式,导出的快照要小很多,不过恢复的时候要进行重建,速度慢。

15、segment.memory 简介

segment 的大小,和 indexing buffer 有关,有三种方式会生成 segment:

  • 一种是 indexing buffer 写满了会生成 segment 文件,默认是堆内存的10%,是节点共享的;
  • 一种是 index buffer 有文档,但是还没满,但是 refresh 时间到了,这个时候就会把 buffer 里面的生成 segment 文件;
  • 还有最后一种就是 es 自动的会将小的 segment 文件定期合并产生新的 segment 文件。

三、社区文章精选


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

继续阅读 »

ELK Tips 主要介绍一些 ELK 使用过程中的小技巧,内容主要来源为 Elastic 中文社区。

一、Logstash

1、Logstash 性能调优主要参数

  • pipeline.workers:设置启动多少个线程执行 fliter 和 output;当 input 的内容出现堆积而 CPU 使用率还比较充足时,可以考虑增加该参数的大小;
  • pipeline.batch.size:设置单个工作线程在执行过滤器和输出之前收集的最大事件数,较大的批量大小通常更高效,但会增加内存开销。输出插件会将每个批处理作为一个输出单元。;例如,ES 输出会为收到的每个批次发出批量请求;调整 pipeline.batch.size 可调整发送到 ES 的批量请求(Bulk)的大小;
  • pipeline.batch.delay:设置 Logstash 管道的延迟时间, 管道批处理延迟是 Logstash 在当前管道工作线程中接收事件后等待新消息的最长时间(以毫秒为单位);简单来说,当 pipeline.batch.size 不满足时,会等待 pipeline.batch.delay 设置的时间,超时后便开始执行 filter 和 output 操作。

2、使用 Ruby Filter 根据现有字段计算一个新字段

filter {
    ruby {
           code => "event.set('kpi', ((event.get('a') + event.get('b'))/(event.get('c')+event.get('d'))).round(2))"
     }
}

3、logstash filter 如何判断字段是够为空或者 null

if ![updateTime]

4、Date Filter 设置多种日期格式

date {
  match => ["logtime", "yyyy-MM-dd HH:mm:ss.SSS","yyyy-MM-dd HH:mm:ss,SSS"]
  target => "logtime_utc"
}

二、Elasticsearch

1、高效翻页 Search After

通常情况下我们会使用 from 和 size 的方式实现查询结果的翻页,但是当达到深度分页时,成本变得过高(堆内存占用和时间耗费与 from+size 的大小成正比),因此 ES 设置了限制(index.max_result_window),默认值为 10000,防止用户进行过于深入的翻页。

推荐使用 Scroll api 进行高效深度滚动,但滚动上下文代价很高,因此不要将 Scroll 用于实时用户请求。search_after 参数通过提供实时游标来解决深度滚动的问题,其主要思路是使用上一页的结果来帮助检索下一页。

GET twitter/_search
{
    "size": 10,
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    },
    "search_after": [1463538857, "654323"],
    "sort": [
        {"date": "asc"},
        {"tie_breaker_id": "asc"}
    ]
}

2、ES 文档相似度 BM25 参数设置

ES2.X 默认是以 TF/IDF 算法计算文档相似度,从 ES5.X 开始,BM25 作为默认的相似度计算算法。

PUT /index
{
    "settings" : {
        "index" : {
            "similarity" : {
              "my_similarity" : {
                "type" : "DFR",
                "basic_model" : "g",
                "after_effect" : "l",
                "normalization" : "h2",
                "normalization.h2.c" : "3.0"
              }
            }
        }
    }
}

PUT /index/_mapping/_doc
{
  "properties" : {
    "title" : { "type" : "text", "similarity" : "my_similarity" }
  }
}

3、ES2.X 得分计算

得分计算脚本:

double tf = Math.sqrt(doc.freq); 
double idf = Math.log((field.docCount+1.0)/(term.docFreq+1.0)) + 1.0; 
double norm = 1/Math.sqrt(doc.length); 
return query.boost * tf * idf * norm;
  • 忽略词频统计及词频位置:将字段的 index_options 设置为 docs;
  • 忽略字段长度:设置字段的 "norms": { "enabled": false }

4、CircuitBreakingException: [parent] Data too large

报错信息:

[WARN ][r.suppressed             ] path: /, params: {}
org.elasticsearch.common.breaker.CircuitBreakingException: [parent] Data too large, data for [<http_request>] would be [1454565650/1.3gb], which is larger than the limit of [1454427340/1.3gb], usages [request=0/0b, fielddata=568/568b, in_flight_requests=0/0b, accounting=1454565082/1.3gb]

jvm 堆内存不够当前查询加载数据所以会报 data too large, 请求被熔断,indices.breaker.request.limit默认为 jvm heap 的 60%,因此可以通过调整 ES 的 Heap Size 来解决该问题。

5、ES 免费的自动化运维工具推荐

6、elasticsearch-hanlp 分词插件包

核心功能:

  • 内置多种分词模式,适合不同场景;
  • 内置词典,无需额外配置即可使用;
  • 支持外置词典,用户可自定义分词算法,基于词典或是模型;
  • 支持分词器级别的自定义词典,便于用于多租户场景;
  • 支持远程词典热更新(待开发);
  • 拼音过滤器、繁简体过滤器(待开发);
  • 基于词语或单字的 ngram 切分分词(待开发)。

https://github.com/AnyListen/elasticsearch-analysis-hanlp

7、节点重启时延迟索引分片重分配

当某个节点短时间离开集群时,一般是不会影响整体系统运行的,可以通过下面的请求延迟索引分片的再分配。

PUT _all/_settings
{
  "settings": {
    "index.unassigned.node_left.delayed_timeout": "5m"
  }
}

8、ES 数据修改后,查询还是未修改前的数据

默认是 1 秒可见,如果你的需求一定要写完就可见,那在写的时候增加 refresh 参数,强制刷新即可,但强烈建议不这么干,因为这样会把整个集群拖垮。

9、Terms Query 从另一个索引获取 terms

当 Terms Query 需要指定很多 terms 的时候,如果手动设置还是相当麻烦的,可以通过 terms-lookup 的方式从另外一个索引加载需要匹配的 terms。

PUT /users/_doc/2
{
    "followers" : ["1", "3"]
}

PUT /tweets/_doc/1
{
    "user" : "1"
}

GET /tweets/_search
{
    "query" : {
        "terms" : {
            "user" : {
                "index" : "users",
                "type" : "_doc",
                "id" : "2",
                "path" : "followers"
            }
        }
    }
}

-----------等效下面的语句--------------

PUT /users/_doc/2
{
 "followers" : [
   {
     "id" : "1"
   },
   {
     "id" : "2"
   }
 ]
}

10、ES 备份路径设置

报错信息:

doesn't match any of the locations specified by path.repo because this setting is empty

结局方案,修改 ES 的配置文件:

# 在 elasticsearch.yml 中添加下面配置来设置备份仓库路径 
path.repo: ["/home/test/backup/zty_logstash"]

11、Query cache 和 Filter cache 的区别

Filter cache 被重命名为 Node Query Cache,也就是说 Query cache 等同于 Filter cache;Query Cache 采用了 LRU 的缓存方式(当缓存满的时候,淘汰旧的不用的缓存数据),Query Cache 只缓存被用于 filter 上下文的内容。

12、Shard 大小需要考虑的因素有哪些?

Lucene 底层没有这个大小的限制,20-40GB 的这个区间范围本身就比较大,经验值有时候就是拍脑袋,不一定都好使。

  • Elasticsearch 对数据的隔离和迁移是以分片为单位进行的,分片太大,会加大迁移成本;
  • 一个分片就是一个 Lucene 的库,一个 Lucene 目录里面包含很多 Segment,每个 Segment 有文档数的上限,Segment 内部的文档 ID 目前使用的是 Java 的整型,也就是 2 的 31 次方,所以能够表示的总的文档数为 Integer.MAX_VALUE - 128 = 2^31 - 128 = 2147483647 - 1 = 2,147,483,519,也就是21.4亿条;
  • 同样,如果你不 force merge 成一个 Segment,单个 shard 的文档数能超过这个数;
  • 单个 Lucene 越大,索引会越大,查询的操作成本自然要越高,IO 压力越大,自然会影响查询体验;
  • 具体一个分片多少数据合适,还是需要结合实际的业务数据和实际的查询来进行测试以进行评估。

13、ES 索引更新时通过 mapping 限制指定字段更新

Elasticsearch 默认是 Dynamic Mapping,新字段会自动猜测数据类型,并自动 merge 到之前的 Mapping,你可以在 Mapping 里面可以配置字段是否支持动态加入,设置参数dynamic即可:true,默认,表示支持动态加入新字段;false,表示忽略该字段的后续索引等操作,但是索引还是成功的;strict支持不支持未知字段,直接抛错。

14、ES 数据快照到 HDFS

ES 做快照和使用 ES-Hadoop 导数据是完全的两种不同的方式,使用 ES-Hadoopp 后期导入的成本可能也不小。

  • 如果要恢复快,当然是做快照和还原的方式最快,速度完全取决于网络和磁盘的速度;
  • 如果为了节省磁盘,快照的时候,可以选 6.5 最新支持的 source_only 模式,导出的快照要小很多,不过恢复的时候要进行重建,速度慢。

15、segment.memory 简介

segment 的大小,和 indexing buffer 有关,有三种方式会生成 segment:

  • 一种是 indexing buffer 写满了会生成 segment 文件,默认是堆内存的10%,是节点共享的;
  • 一种是 index buffer 有文档,但是还没满,但是 refresh 时间到了,这个时候就会把 buffer 里面的生成 segment 文件;
  • 还有最后一种就是 es 自动的会将小的 segment 文件定期合并产生新的 segment 文件。

三、社区文章精选


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

收起阅读 »

使用 ES-Hadoop 将 Spark Streaming 流数据写入 ES

本文将详细介绍利用 ES-Hadoop 将 Spark 处理的数据写入到 ES 中。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • Spark 版本:2.2.0
  • Kafka 版本:1.0.1
  • ES 版本:6.5.1

2、Maven 依赖

<!-- scala -->
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.11.8</version>
</dependency>

<!-- spark 基础依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming 相关依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming-kafka 相关依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- zookeeper 相关依赖 -->
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.5-cdh6.0.1</version>
</dependency>

<!-- Spark-ES 相关依赖 -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.5.4</version>
</dependency>

<!-- Spark-ES 依赖的 HTTP 传输组件 -->
<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
</dependency>

3、注意事项

如果使用 CDH 版本的 Spark,则在调试及实际部署运行的时候会出现下面的错误:

java.lang.ClassNotFoundException: org.apache.commons.httpclient.protocol.Protocol

很显然是缺少 httpclient 相关依赖造成的,对比开源版本与 CDH 版本的 Spark,发现开源版本多出了 commons-httpclient-3.1.jar,因此上述 Maven 的 pom 文件添加上对其依赖即可。

二、ES-Hadoop

1、简介

ES-Hadoop 实现了 Hadoop 生态(Hive、Spark、Pig、Storm 等)与 ElasticSearch 之间的数据交互,借助该组件可以将 Hadoop 生态的数据写入到 ES 中,然后借助 ES 对数据快速进行搜索、过滤、聚合等分析,进一步可以通过 Kibana 来实现数据的可视化。

同时,也可以借助 ES 作为数据存储层(类似数仓的 Stage 层或者 ODS 层),然后借助 Hadoop 生态的数据处理工具(Hive、MR、Spark 等)将处理后的数据写入到 HDFS 中。

使用 ES 做为原始数据的存储层,可以很好的进行数据去重、数据质量分析,还可以提供一些即时的数据服务,例如趋势展示、汇总分析等。

对 Hadoop 数据进行交互分析

2、组成

ES-Hadoop 是一个整合性质的组件,它封装了 Hadoop 生态的多种组件与 ES 交互的 API,如果你只需要部分功能,可以使用细分的组件:

  • elasticsearch-hadoop-mr
  • elasticsearch-hadoop-hive
  • elasticsearch-hadoop-pig
  • elasticsearch-spark-20_2.10
  • elasticsearch-hadoop-cascading
  • elasticsearch-storm

三、elasticsearch-spark

1、配置

es-hadoop 核心是通过 es 提供的 restful 接口来进行数据交互,下面是几个重要配置项,更多配置信息请参阅官方说明

  • es.nodes:需要连接的 es 节点(不需要配置全部节点,默认会自动发现其他可用节点);
  • es.port:节点 http 通讯端口;
  • es.nodes.discovery:默认为 true,表示自动发现集群可用节点;
  • es.nodes.wan.only:默认为 false,设置为 true 之后,会关闭节点的自动 discovery,只使用 es.nodes 声明的节点进行数据读写操作;如果你需要通过域名进行数据访问,则设置该选项为 true,否则请务必设置为 false;
  • es.index.auto.create:是否自动创建不存在的索引,默认为 true;
  • es.net.http.auth.user:Basic 认证的用户名;
  • es.net.http.auth.pass:Basic 认证的密码。
val conf = new SparkConf().setIfMissing("spark.app.name","rt-data-loader").setIfMissing("spark.master", "local[5]")
conf.set(ConfigurationOptions.ES_NODES, esNodes)
conf.set(ConfigurationOptions.ES_PORT, esPort)
conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
conf.set("es.write.rest.error.handlers", "ignoreConflict")
conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")

特别需要注意的配置项为 es.nodes.wan.only,由于在云服务器环境中,配置文件使用的一般为内网地址,而本地调试的时候一般使用外网地址,这样将 es.nodes 配置为外网地址后,最后会出现节点找不到的问题(由于会使用节点配置的内网地址去进行连接):

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available; 
node discovery is disabled and none of nodes specified fit the criterion [xxx.xx.x.xx:9200]

此时将 es.nodes.wan.only 设置为 true 即可。推荐开发测试时使用域名,集群部署的时候将该选项置为 false

2、屏蔽写入冲突

如果数据存在重复,写入 ES 时往往会出现数据写入冲突的错误,此时有两种解决方法。

方法一:设置 es.write.operation 为 upsert,这样达到的效果为如果存在则更新,不存在则进行插入,该配置项默认值为 index。

方法二:自定义冲突处理类,类似上述配置中设置了自定义的 error.handlers,通过自定义类来处理相关错误,例如忽略冲突等:

public class IgnoreConflictsHandler extends BulkWriteErrorHandler {
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) throws Exception {
        if (entry.getResponseCode() == 409) {
            StaticLog.warn("Encountered conflict response. Ignoring old data.");
            return HandlerResult.HANDLED;
        }
        return collector.pass("Not a conflict response code.");
    }
}

方法二可以屏蔽写入版本比预期的小之类的版本冲突问题。

3、RDD 写入 ES

EsSpark 提供了两种主要方法来实现数据写入:

  • saveToEs :RDD 内容为 Seq[Map],即一个 Map 对象集合,每个 Map 对应一个文档;
  • saveJsonToEs:RDD 内容为 Seq[String],即一个 String 集合,每个 String 是一个 JSON 字符串,代表一条记录(对应 ES 的 _source)。

数据写入可以指定很多配置信息,例如:

  • es.resource:设置写入的索引和类型,索引和类型名均支持动态变量
  • es.mapping.id:设置文档 _id 对应的字段名;
  • es.mapping.exclude:设置写入时忽略的字段,支持通配符。
val itemRdd = rdd.flatMap(line => {
    val topic = line.topic()
    println("正在处理:" + topic + " - " + line.partition() + " : " + line.offset())
    val jsonArray = JSON.parseArray(line.value()).toJavaList(classOf[JSONObject]).asScala
    val resultMap = jsonArray.map(jsonObj =>{
      var tmpId = "xxx"
      var tmpIndex = "xxxxxx"
      jsonObj.put("myTmpId", tmpId)
      jsonObj.put("myTmpIndex", tmpIndex)
      jsonObj.getInnerMap
    })
    resultMap
})
val mapConf = Map(
    ("es.resource" , "{myTmpIndex}/doc"),
    ("es.write.operation" , "upsert"),
    ("es.mapping.id" , "myTmpId"),
    ("es.mapping.exclude" , "myTmp*")
)
EsSpark.saveToEs(itemRdd, mapConf)

es.mapping.exclude 只支持 RDD 为 Map 集合(saveToEs),当为 Json 字符串集合时(saveJsonToEs)会提示不支持的错误信息;这个配置项非常有用,例如 myTmpId 作为文档 id,因此没有必要重复存储到 _source 里面了,可以配置到这个配置项,将其从 _source 中排除。


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

继续阅读 »

本文将详细介绍利用 ES-Hadoop 将 Spark 处理的数据写入到 ES 中。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • Spark 版本:2.2.0
  • Kafka 版本:1.0.1
  • ES 版本:6.5.1

2、Maven 依赖

<!-- scala -->
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.11.8</version>
</dependency>

<!-- spark 基础依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming 相关依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming-kafka 相关依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- zookeeper 相关依赖 -->
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.5-cdh6.0.1</version>
</dependency>

<!-- Spark-ES 相关依赖 -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.5.4</version>
</dependency>

<!-- Spark-ES 依赖的 HTTP 传输组件 -->
<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
</dependency>

3、注意事项

如果使用 CDH 版本的 Spark,则在调试及实际部署运行的时候会出现下面的错误:

java.lang.ClassNotFoundException: org.apache.commons.httpclient.protocol.Protocol

很显然是缺少 httpclient 相关依赖造成的,对比开源版本与 CDH 版本的 Spark,发现开源版本多出了 commons-httpclient-3.1.jar,因此上述 Maven 的 pom 文件添加上对其依赖即可。

二、ES-Hadoop

1、简介

ES-Hadoop 实现了 Hadoop 生态(Hive、Spark、Pig、Storm 等)与 ElasticSearch 之间的数据交互,借助该组件可以将 Hadoop 生态的数据写入到 ES 中,然后借助 ES 对数据快速进行搜索、过滤、聚合等分析,进一步可以通过 Kibana 来实现数据的可视化。

同时,也可以借助 ES 作为数据存储层(类似数仓的 Stage 层或者 ODS 层),然后借助 Hadoop 生态的数据处理工具(Hive、MR、Spark 等)将处理后的数据写入到 HDFS 中。

使用 ES 做为原始数据的存储层,可以很好的进行数据去重、数据质量分析,还可以提供一些即时的数据服务,例如趋势展示、汇总分析等。

对 Hadoop 数据进行交互分析

2、组成

ES-Hadoop 是一个整合性质的组件,它封装了 Hadoop 生态的多种组件与 ES 交互的 API,如果你只需要部分功能,可以使用细分的组件:

  • elasticsearch-hadoop-mr
  • elasticsearch-hadoop-hive
  • elasticsearch-hadoop-pig
  • elasticsearch-spark-20_2.10
  • elasticsearch-hadoop-cascading
  • elasticsearch-storm

三、elasticsearch-spark

1、配置

es-hadoop 核心是通过 es 提供的 restful 接口来进行数据交互,下面是几个重要配置项,更多配置信息请参阅官方说明

  • es.nodes:需要连接的 es 节点(不需要配置全部节点,默认会自动发现其他可用节点);
  • es.port:节点 http 通讯端口;
  • es.nodes.discovery:默认为 true,表示自动发现集群可用节点;
  • es.nodes.wan.only:默认为 false,设置为 true 之后,会关闭节点的自动 discovery,只使用 es.nodes 声明的节点进行数据读写操作;如果你需要通过域名进行数据访问,则设置该选项为 true,否则请务必设置为 false;
  • es.index.auto.create:是否自动创建不存在的索引,默认为 true;
  • es.net.http.auth.user:Basic 认证的用户名;
  • es.net.http.auth.pass:Basic 认证的密码。
val conf = new SparkConf().setIfMissing("spark.app.name","rt-data-loader").setIfMissing("spark.master", "local[5]")
conf.set(ConfigurationOptions.ES_NODES, esNodes)
conf.set(ConfigurationOptions.ES_PORT, esPort)
conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
conf.set("es.write.rest.error.handlers", "ignoreConflict")
conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")

特别需要注意的配置项为 es.nodes.wan.only,由于在云服务器环境中,配置文件使用的一般为内网地址,而本地调试的时候一般使用外网地址,这样将 es.nodes 配置为外网地址后,最后会出现节点找不到的问题(由于会使用节点配置的内网地址去进行连接):

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available; 
node discovery is disabled and none of nodes specified fit the criterion [xxx.xx.x.xx:9200]

此时将 es.nodes.wan.only 设置为 true 即可。推荐开发测试时使用域名,集群部署的时候将该选项置为 false

2、屏蔽写入冲突

如果数据存在重复,写入 ES 时往往会出现数据写入冲突的错误,此时有两种解决方法。

方法一:设置 es.write.operation 为 upsert,这样达到的效果为如果存在则更新,不存在则进行插入,该配置项默认值为 index。

方法二:自定义冲突处理类,类似上述配置中设置了自定义的 error.handlers,通过自定义类来处理相关错误,例如忽略冲突等:

public class IgnoreConflictsHandler extends BulkWriteErrorHandler {
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) throws Exception {
        if (entry.getResponseCode() == 409) {
            StaticLog.warn("Encountered conflict response. Ignoring old data.");
            return HandlerResult.HANDLED;
        }
        return collector.pass("Not a conflict response code.");
    }
}

方法二可以屏蔽写入版本比预期的小之类的版本冲突问题。

3、RDD 写入 ES

EsSpark 提供了两种主要方法来实现数据写入:

  • saveToEs :RDD 内容为 Seq[Map],即一个 Map 对象集合,每个 Map 对应一个文档;
  • saveJsonToEs:RDD 内容为 Seq[String],即一个 String 集合,每个 String 是一个 JSON 字符串,代表一条记录(对应 ES 的 _source)。

数据写入可以指定很多配置信息,例如:

  • es.resource:设置写入的索引和类型,索引和类型名均支持动态变量
  • es.mapping.id:设置文档 _id 对应的字段名;
  • es.mapping.exclude:设置写入时忽略的字段,支持通配符。
val itemRdd = rdd.flatMap(line => {
    val topic = line.topic()
    println("正在处理:" + topic + " - " + line.partition() + " : " + line.offset())
    val jsonArray = JSON.parseArray(line.value()).toJavaList(classOf[JSONObject]).asScala
    val resultMap = jsonArray.map(jsonObj =>{
      var tmpId = "xxx"
      var tmpIndex = "xxxxxx"
      jsonObj.put("myTmpId", tmpId)
      jsonObj.put("myTmpIndex", tmpIndex)
      jsonObj.getInnerMap
    })
    resultMap
})
val mapConf = Map(
    ("es.resource" , "{myTmpIndex}/doc"),
    ("es.write.operation" , "upsert"),
    ("es.mapping.id" , "myTmpId"),
    ("es.mapping.exclude" , "myTmp*")
)
EsSpark.saveToEs(itemRdd, mapConf)

es.mapping.exclude 只支持 RDD 为 Map 集合(saveToEs),当为 Json 字符串集合时(saveJsonToEs)会提示不支持的错误信息;这个配置项非常有用,例如 myTmpId 作为文档 id,因此没有必要重复存储到 _source 里面了,可以配置到这个配置项,将其从 _source 中排除。


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

收起阅读 »

Day 21 - ECE 版本升级扫雷实战

Elastic Cloud Enterprise 出来也有一年多的时间了。对于这类的开源软件企业版本,基本都是在可管理性和稳定性上面下功夫。但是新产品免不了需要经历一下bug的打磨才会变得成熟。

下面分享的这个案例是当我们在把集群从5.4.1 升级到5.6.12 的过程中,遇到节点关闭受阻,升级不完整等情景。以及对应的处理方法。
首先在ECE中,版本是通过stack的方式管理
 
QQ20181221-104539@2x.png

Ref : https://www.elastic.co/guide/e ... .html

这些版本都是以docker images的形式存储

QQ20181221-104619@2x.png

因此,ECE根据不同的版本,然后选择对应的docker image就可以创建一个节点了. 那么升级的过程就可以简单分成几个步骤

1.exclude准备升级的节点。

2.停止节点ES进程,更换container 版本。

3.重新启动节点,加入集群。

4.在其他节点上重复以上流程。

在这个过程中, 实际使用的时候发现有一些需要注意的雷区.
扫雷一:使用UI触发升级,必须保证集群没有自定义插件和bundles

ECE 里面的集群操作是通过plan来控制的

QQ20181221-104650@2x.png

任何的集群操作最终都会生成一个plan的diff。如上图,把集群从5.4.1 升级到 5.6.12 会产生以上diff.

正常情况下是没有问题的.

如果集群配置了自定义bundles, 比如LDAP bundles, ref:https://www.elastic.co/guide/e ... .html

那么在集群的plan里面就会存在这么一段配置

QQ20181221-104718@2x.png

那么当我们在按下升级按钮的时候

QQ20181221-104744@2x.png

ECE 只在plan中修改了集群大版本的配置, 但是并没有修改自定义bundles中的版本号(仍然是5.4.1).

在这种情况下去执行升级,会直接产生报错.

QQ20181221-104810@2x.png

界面上没有显示原因, 但是这是因为plan里面大版本和bundle中的版本不一致,然后会导致新增的节点无法启动. 于是ECE 就认为集群升级失败了.
解决方法是手动编辑plan,把自定义bundles中的版本号改成和集群版本一致

QQ20181221-104848@2x.png

然后使用ECE 提供的一种手动使用plan进行集群升级的方式进行升级.

扫雷二:节点无法关闭

ECE 控制container 是通过一个叫做 constructor的服务。constructor 通过接收集群的更改需求,制定具体的更改计划与步骤,指导allocator对container进行操作。同时也负责保证集群高可靠性,通过Availability Zone的数量在不同的AZ上面部署节点。

当Allocator接受到关闭container命令的时候,会尝试去关闭container,如果container处于一个阻塞状态无法响应, 那么关闭命令无法执行成功。这个时候constructor会等待节点关闭,但是allocator又认为节点已经接受到关闭命令了。又或者constructor发送给allocator的过程中网络丢包, 这个时候allocator 没有正确接受关闭container的命令. 整个升级进程就卡住了。 这种情况十分罕见,通常发现一个container如果处于”正在关闭”时间太久了, 那么通常就是中间的通信出现问题了.

QQ20181221-104926@2x.png

解决办法是可以通过手动停止container, 在对应的allocator上面找到container,使用

docker stop <container_name>

停止container,这样可以出发allocator更新container的健康状态,上报这个container已经关闭了, 从而打通流程并执行下一步。
扫雷三: 多版本并存

如果使用上面的方式强行关闭docker container, 虽然可以让升级进程继续进行下去. 但是被手动关闭的节点会保留原来的版本。于是在升级后查看各个节点的版本,会发现部分节点是5.4.1, 部分是5.6.12.

因为节点是强制关闭的, ECE直接认为节点已经完成升级,并重新启动这个container. 而在这个处理中,跳过了升级docker image的一步.

为什么不是生成一个新的container呢? 因为从plan里面可以看到

QQ20181221-104951@2x.png

在默认情况下, ECE 处理版本升级是使用rolling 策略 Ref: https://www.elastic.co/guide/e ... .html

在这个策略下,ECE会停止当前container并直接修改重启。

如果ECE集群容量允许, 可以改成grow_and_shrink 策略, 这样ECE 会创建新的container并且销毁旧的container, 避免集群出现多版本.
如果出现了多版本的集群,可以通过更改集群任意一个配置来触发 grow_and_shrink 同样可以使到版本回归一致.
总结来说ECE 在版本升级方面还是有很多需要改进的地方. 对于ECE用户再说在使用ECE的版本升级功能的时候主要有以下建议

1. 自己学会手动修改plan. 这也是每一个ECE support engineer 都会干的事情.

2.如果集群容量允许,尽量使用 grow_and_shrink的策略来进行集群操作.
 
继续阅读 »
Elastic Cloud Enterprise 出来也有一年多的时间了。对于这类的开源软件企业版本,基本都是在可管理性和稳定性上面下功夫。但是新产品免不了需要经历一下bug的打磨才会变得成熟。

下面分享的这个案例是当我们在把集群从5.4.1 升级到5.6.12 的过程中,遇到节点关闭受阻,升级不完整等情景。以及对应的处理方法。
首先在ECE中,版本是通过stack的方式管理
 
QQ20181221-104539@2x.png

Ref : https://www.elastic.co/guide/e ... .html

这些版本都是以docker images的形式存储

QQ20181221-104619@2x.png

因此,ECE根据不同的版本,然后选择对应的docker image就可以创建一个节点了. 那么升级的过程就可以简单分成几个步骤

1.exclude准备升级的节点。

2.停止节点ES进程,更换container 版本。

3.重新启动节点,加入集群。

4.在其他节点上重复以上流程。

在这个过程中, 实际使用的时候发现有一些需要注意的雷区.
扫雷一:使用UI触发升级,必须保证集群没有自定义插件和bundles

ECE 里面的集群操作是通过plan来控制的

QQ20181221-104650@2x.png

任何的集群操作最终都会生成一个plan的diff。如上图,把集群从5.4.1 升级到 5.6.12 会产生以上diff.

正常情况下是没有问题的.

如果集群配置了自定义bundles, 比如LDAP bundles, ref:https://www.elastic.co/guide/e ... .html

那么在集群的plan里面就会存在这么一段配置

QQ20181221-104718@2x.png

那么当我们在按下升级按钮的时候

QQ20181221-104744@2x.png

ECE 只在plan中修改了集群大版本的配置, 但是并没有修改自定义bundles中的版本号(仍然是5.4.1).

在这种情况下去执行升级,会直接产生报错.

QQ20181221-104810@2x.png

界面上没有显示原因, 但是这是因为plan里面大版本和bundle中的版本不一致,然后会导致新增的节点无法启动. 于是ECE 就认为集群升级失败了.
解决方法是手动编辑plan,把自定义bundles中的版本号改成和集群版本一致

QQ20181221-104848@2x.png

然后使用ECE 提供的一种手动使用plan进行集群升级的方式进行升级.

扫雷二:节点无法关闭

ECE 控制container 是通过一个叫做 constructor的服务。constructor 通过接收集群的更改需求,制定具体的更改计划与步骤,指导allocator对container进行操作。同时也负责保证集群高可靠性,通过Availability Zone的数量在不同的AZ上面部署节点。

当Allocator接受到关闭container命令的时候,会尝试去关闭container,如果container处于一个阻塞状态无法响应, 那么关闭命令无法执行成功。这个时候constructor会等待节点关闭,但是allocator又认为节点已经接受到关闭命令了。又或者constructor发送给allocator的过程中网络丢包, 这个时候allocator 没有正确接受关闭container的命令. 整个升级进程就卡住了。 这种情况十分罕见,通常发现一个container如果处于”正在关闭”时间太久了, 那么通常就是中间的通信出现问题了.

QQ20181221-104926@2x.png

解决办法是可以通过手动停止container, 在对应的allocator上面找到container,使用

docker stop <container_name>

停止container,这样可以出发allocator更新container的健康状态,上报这个container已经关闭了, 从而打通流程并执行下一步。
扫雷三: 多版本并存

如果使用上面的方式强行关闭docker container, 虽然可以让升级进程继续进行下去. 但是被手动关闭的节点会保留原来的版本。于是在升级后查看各个节点的版本,会发现部分节点是5.4.1, 部分是5.6.12.

因为节点是强制关闭的, ECE直接认为节点已经完成升级,并重新启动这个container. 而在这个处理中,跳过了升级docker image的一步.

为什么不是生成一个新的container呢? 因为从plan里面可以看到

QQ20181221-104951@2x.png

在默认情况下, ECE 处理版本升级是使用rolling 策略 Ref: https://www.elastic.co/guide/e ... .html

在这个策略下,ECE会停止当前container并直接修改重启。

如果ECE集群容量允许, 可以改成grow_and_shrink 策略, 这样ECE 会创建新的container并且销毁旧的container, 避免集群出现多版本.
如果出现了多版本的集群,可以通过更改集群任意一个配置来触发 grow_and_shrink 同样可以使到版本回归一致.
总结来说ECE 在版本升级方面还是有很多需要改进的地方. 对于ECE用户再说在使用ECE的版本升级功能的时候主要有以下建议

1. 自己学会手动修改plan. 这也是每一个ECE support engineer 都会干的事情.

2.如果集群容量允许,尽量使用 grow_and_shrink的策略来进行集群操作.
  收起阅读 »

Day 17 - 关于日志型数据管理策略的思考

近两年随着Elastic Stack的愈发火热,其近乎成为构建实时日志应用的工业标准。在小型数据应用场景,最新的6.5版本已经可以做到开箱即用,无需过多考虑架构上的设计工作。 然而一旦应用规模扩大到数百TB甚至PB的数据量级,整个系统的架构和后期维护工作则显得非常重要。借着2018 Elastic Advent写文的机会,结合过去几年架构和运维公司日志集群的实践经验,对于大规模日志型数据的管理策略,在此做一个总结性的思考。 文中抛出的观点,有些已经在我们的集群中有所应用并取得比较好的效果,有些则还待实践的检验。抛砖引玉,不尽成熟的地方,还请社区各位专家指正。

对于日志系统,最终用户通常有以下几个基本要求:

  1. 数据从产生到可检索的实时性要求高,可接受的延迟通常要求控制在数秒,至多不超过数十秒
  2. 新鲜数据(当天至过去几天)的查询和统计频率高,返回速度要快(毫秒级,至多几秒)
  3. 历史数据保留得越久越好。

针对这些需求,加上对成本控制的必要性,大家通常想到的第一个架构设计就是冷热数据分离。

冷热数据分离

冷热分离的概念比较好理解,热结点做数据的写入,保存近期热数据,冷数据定期迁移到冷数据结点,就这么简单。不过实际操作起来可能还是碰到一些具体需要考虑的细节问题。

  1. 冷热结点集群配置的JVM heap配置要差异化。热结点无需存放太多数据,对于heap的要求通常不是太高,在够用的情况下尽量配置小一点。可以配置在26GB左右甚至更小,而不是大多数人知道的经验值31GB。原因在于这个size的heap,可以启用zero based Compressed Oops,JVM运行效率是最高的, 参考: heap-size。而冷结点存在的目的是尽量放更多的数据,性能不是首要的,因此heap可以配置在31GB。
  2. 数据迁移过程有一定资源消耗,为避免对数据写入产生显著影响,通常定时在业务低峰期,日志产出量比较低的时候进行,比如半夜。
  3. 索引是否应该启用压缩,如何启用?最初我们对于热结点上的索引是不启用压缩的,为了节省CPU消耗。只在冷结点配置里,增加了索引压缩选项。这样索引迁移到冷结点后,执行force merge操作的时候,ES会自动将结点上配置的索引压缩属性套用到merge过后新生成的segment,这样就实现了热结点不压缩,冷结点merge过后压缩的功能。极大节省了冷结点的磁盘空间。后来随着硬件的升级,我们发现服务器的cpu基本都是过剩的,磁盘IO通常先到瓶颈。 因此尝试在热结点上一开始创建索引的时候,就启用压缩选项。实际对比测试并没有发现显著的索引吞吐量下降,并且因为索引压缩后磁盘文件size的大幅减少,每天夜间的数据迁移工作可以节省大量的时间。至此我们的日志集群索引默认就是压缩的。
  4. 冷结点上留做系统缓存的内存一般不多,加上数据量非常巨大。索引默认的mmapfs读取方式,很容易因为系统缓存不够,导致数据在内存和磁盘之间频繁换入换出。严重的情况下,整个结点甚至会因为io持续在100%无法响应。 实践中我们发现对冷结点使用niofs效果会更好。

实现了冷热结点分离以后,集群的资源利用率提升了不少,可管理性也要好很多了. 但是随着接入日志的类型越来越多(我们生产上有差不多400种类型的日志),各种日志的速率差异又很大,让ES自己管理shard的分布很容易产生写入热点问题。 针对这个问题,可以采用对集群结点进行分组管理的策略来解决。

热结点分组管理

所谓分组管理,就是通过在结点的配置文件中增加自定义的标签属性,将服务器区分到不同的组别中。然后通过设置索引的index.routing.allocation.include属性,控制改索引分布在哪个组别。同时配合设置index.routing.allocation.total_shards_per_node,可以做到某个索引的shard在某个group的结点之间绝对均匀分布。

比如一个分组有10台机器,对一个5 primary ,1 replica的索引,让该索引分布在该分组的同时,设置total_shards_per_node为1,让每个节点上只能有一个分片,这样就避免了写入热点问题。 该方案的缺陷也显而易见,一旦有结点挂掉,不会自动recovery,某个shard将一直处于unassigned状态,集群状态变成yellow。 但我认为,热数据的恢复开销是非常高的,与其立即在其他结点开始复制,之后再重新rebalance,不如就让集群暂时处于yellow状态。 通过监控报警的手段,及时通知运维人员解决结点故障。 待故障解决之后,直接从恢复后的结点开始数据复制,开销要低得多。

在我们的生产环境主要有两种类型的结点分组,分别是10台机器一个分组,和2台机器一个分组。10台机器的分组用于应对速率非常高,shard划分比较多的索引,2台机器的分组用于速率很低,一个shard(加一个复制片)就可以应对的索引。

这种分组策略在我们的生产环境中经过验证,非常好的解决了写入热点问题。那么冷数据怎么管理? 冷数据不做写入,不存在写入热点问题,查询频率也比较低,业务需求方面对查询响应要求也不那么严苛,所以查询热点问题也不是那么突出。因此为了简化管理,冷结点我们是不做shard分布的精细控制,所有数据迁移到冷数据结点之后,由ES默认的shard分布则略去控制数据的分布。

不过如果想进一步提高冷数据结点服务器资源的利用率,还是可以有进一步挖掘的的空间。我们知道ES默认的shard分布策略,只是保证一个索引的shard尽量分布在不同的结点,同时保证每个节点上shard数量差不多。但是如果采用默认按天创建索引的策略,由于索引速率差异很大,不同索引之间shard的大小差异可能是1-2个数量级的。如果每个shard的size差异不大就好了,那么默认的分布策略,基本上可以保证冷结点之间数据量分布的大致均匀。 能实现类似功能的是ES的rollover特性。

索引的Rollover

Rollover api可以让索引根据预先定义的时间跨度,或者索引大小来自动切分出新索引,从而将索引的大小控制在计划的范围内。合理的应用rollver api可以保证集群shard大小差别不会太大。 只是集群索引类别比较多的时候,rollover全部手动管理负担比较大,需要借助额外的管理工具和监控工具。我们出于管理简便的考虑,暂时没有应用到这个特性。

索引的Rollup

我们发现生产有些用户写入的“日志”,实际上是多维的metrcis数据,使用的时候不是为了查询日志的详情,仅仅是为了做各种维度组合的过滤和聚合。对于这种类型的数据,保留历史数据过多一来浪费存储空间,二来每次聚合都要在裸数据上跑,非常浪费资源。 ES从6.3开始,在x-pack里推出了rollup api,通过定期对裸数据做预先聚合,大大缩减了保存在磁盘上的数据量。对于不需要查询裸日志的应用场景,合理应用该特性,可以将历史数据的磁盘消耗降低几个数量级,同时rollup search也可以大大提升聚合速度。不过rollup也有其局限性,即他的实现是通过定期任务,对间隔期数据跑聚合完成的,有一定的计算开销。 如果数据写入速率非常高,集群压力很大,rollup可能无法跟上写入速率,而不具有实用性。 所以实际环境中,还是需要根据应用场景和资源使用情况,进行灵活的取舍。

多集群的便利性

数据量大到一定程度以后,单集群由于master node单点的限制,会遇到各种集群状态数据更新时得性能问题。 由此现在一些大规模的应用已经开始利用到多集群互联和cross cluster search的特性。 这种结构除了解决单集群数据容量限制问题以外,我们还发现在做容量均衡方面还有比较好的便利性。应用日志写入量通常随着业务变化也会剧烈变化,好不容易规划好的容量,不久就被业务的增长给打破,数倍或者数10倍的流量增长很可能就让一组结点过载出现写入延迟。 如果只有一个集群,在结点之间重新平衡shard比较费力,涉及到数据的迁移,可能非常缓慢,还会影响写入。 但如果有多集群互联,切换就可以做到非常的快速和简单。 原理上只需要在新集群中加入对应的索引配置模版,然后更新写入程序的配置,写入目标指向新集群,重启写入程序即可。并且,可以进一步将整个流程工具化,在GUI上完成一键切换。

继续阅读 »

近两年随着Elastic Stack的愈发火热,其近乎成为构建实时日志应用的工业标准。在小型数据应用场景,最新的6.5版本已经可以做到开箱即用,无需过多考虑架构上的设计工作。 然而一旦应用规模扩大到数百TB甚至PB的数据量级,整个系统的架构和后期维护工作则显得非常重要。借着2018 Elastic Advent写文的机会,结合过去几年架构和运维公司日志集群的实践经验,对于大规模日志型数据的管理策略,在此做一个总结性的思考。 文中抛出的观点,有些已经在我们的集群中有所应用并取得比较好的效果,有些则还待实践的检验。抛砖引玉,不尽成熟的地方,还请社区各位专家指正。

对于日志系统,最终用户通常有以下几个基本要求:

  1. 数据从产生到可检索的实时性要求高,可接受的延迟通常要求控制在数秒,至多不超过数十秒
  2. 新鲜数据(当天至过去几天)的查询和统计频率高,返回速度要快(毫秒级,至多几秒)
  3. 历史数据保留得越久越好。

针对这些需求,加上对成本控制的必要性,大家通常想到的第一个架构设计就是冷热数据分离。

冷热数据分离

冷热分离的概念比较好理解,热结点做数据的写入,保存近期热数据,冷数据定期迁移到冷数据结点,就这么简单。不过实际操作起来可能还是碰到一些具体需要考虑的细节问题。

  1. 冷热结点集群配置的JVM heap配置要差异化。热结点无需存放太多数据,对于heap的要求通常不是太高,在够用的情况下尽量配置小一点。可以配置在26GB左右甚至更小,而不是大多数人知道的经验值31GB。原因在于这个size的heap,可以启用zero based Compressed Oops,JVM运行效率是最高的, 参考: heap-size。而冷结点存在的目的是尽量放更多的数据,性能不是首要的,因此heap可以配置在31GB。
  2. 数据迁移过程有一定资源消耗,为避免对数据写入产生显著影响,通常定时在业务低峰期,日志产出量比较低的时候进行,比如半夜。
  3. 索引是否应该启用压缩,如何启用?最初我们对于热结点上的索引是不启用压缩的,为了节省CPU消耗。只在冷结点配置里,增加了索引压缩选项。这样索引迁移到冷结点后,执行force merge操作的时候,ES会自动将结点上配置的索引压缩属性套用到merge过后新生成的segment,这样就实现了热结点不压缩,冷结点merge过后压缩的功能。极大节省了冷结点的磁盘空间。后来随着硬件的升级,我们发现服务器的cpu基本都是过剩的,磁盘IO通常先到瓶颈。 因此尝试在热结点上一开始创建索引的时候,就启用压缩选项。实际对比测试并没有发现显著的索引吞吐量下降,并且因为索引压缩后磁盘文件size的大幅减少,每天夜间的数据迁移工作可以节省大量的时间。至此我们的日志集群索引默认就是压缩的。
  4. 冷结点上留做系统缓存的内存一般不多,加上数据量非常巨大。索引默认的mmapfs读取方式,很容易因为系统缓存不够,导致数据在内存和磁盘之间频繁换入换出。严重的情况下,整个结点甚至会因为io持续在100%无法响应。 实践中我们发现对冷结点使用niofs效果会更好。

实现了冷热结点分离以后,集群的资源利用率提升了不少,可管理性也要好很多了. 但是随着接入日志的类型越来越多(我们生产上有差不多400种类型的日志),各种日志的速率差异又很大,让ES自己管理shard的分布很容易产生写入热点问题。 针对这个问题,可以采用对集群结点进行分组管理的策略来解决。

热结点分组管理

所谓分组管理,就是通过在结点的配置文件中增加自定义的标签属性,将服务器区分到不同的组别中。然后通过设置索引的index.routing.allocation.include属性,控制改索引分布在哪个组别。同时配合设置index.routing.allocation.total_shards_per_node,可以做到某个索引的shard在某个group的结点之间绝对均匀分布。

比如一个分组有10台机器,对一个5 primary ,1 replica的索引,让该索引分布在该分组的同时,设置total_shards_per_node为1,让每个节点上只能有一个分片,这样就避免了写入热点问题。 该方案的缺陷也显而易见,一旦有结点挂掉,不会自动recovery,某个shard将一直处于unassigned状态,集群状态变成yellow。 但我认为,热数据的恢复开销是非常高的,与其立即在其他结点开始复制,之后再重新rebalance,不如就让集群暂时处于yellow状态。 通过监控报警的手段,及时通知运维人员解决结点故障。 待故障解决之后,直接从恢复后的结点开始数据复制,开销要低得多。

在我们的生产环境主要有两种类型的结点分组,分别是10台机器一个分组,和2台机器一个分组。10台机器的分组用于应对速率非常高,shard划分比较多的索引,2台机器的分组用于速率很低,一个shard(加一个复制片)就可以应对的索引。

这种分组策略在我们的生产环境中经过验证,非常好的解决了写入热点问题。那么冷数据怎么管理? 冷数据不做写入,不存在写入热点问题,查询频率也比较低,业务需求方面对查询响应要求也不那么严苛,所以查询热点问题也不是那么突出。因此为了简化管理,冷结点我们是不做shard分布的精细控制,所有数据迁移到冷数据结点之后,由ES默认的shard分布则略去控制数据的分布。

不过如果想进一步提高冷数据结点服务器资源的利用率,还是可以有进一步挖掘的的空间。我们知道ES默认的shard分布策略,只是保证一个索引的shard尽量分布在不同的结点,同时保证每个节点上shard数量差不多。但是如果采用默认按天创建索引的策略,由于索引速率差异很大,不同索引之间shard的大小差异可能是1-2个数量级的。如果每个shard的size差异不大就好了,那么默认的分布策略,基本上可以保证冷结点之间数据量分布的大致均匀。 能实现类似功能的是ES的rollover特性。

索引的Rollover

Rollover api可以让索引根据预先定义的时间跨度,或者索引大小来自动切分出新索引,从而将索引的大小控制在计划的范围内。合理的应用rollver api可以保证集群shard大小差别不会太大。 只是集群索引类别比较多的时候,rollover全部手动管理负担比较大,需要借助额外的管理工具和监控工具。我们出于管理简便的考虑,暂时没有应用到这个特性。

索引的Rollup

我们发现生产有些用户写入的“日志”,实际上是多维的metrcis数据,使用的时候不是为了查询日志的详情,仅仅是为了做各种维度组合的过滤和聚合。对于这种类型的数据,保留历史数据过多一来浪费存储空间,二来每次聚合都要在裸数据上跑,非常浪费资源。 ES从6.3开始,在x-pack里推出了rollup api,通过定期对裸数据做预先聚合,大大缩减了保存在磁盘上的数据量。对于不需要查询裸日志的应用场景,合理应用该特性,可以将历史数据的磁盘消耗降低几个数量级,同时rollup search也可以大大提升聚合速度。不过rollup也有其局限性,即他的实现是通过定期任务,对间隔期数据跑聚合完成的,有一定的计算开销。 如果数据写入速率非常高,集群压力很大,rollup可能无法跟上写入速率,而不具有实用性。 所以实际环境中,还是需要根据应用场景和资源使用情况,进行灵活的取舍。

多集群的便利性

数据量大到一定程度以后,单集群由于master node单点的限制,会遇到各种集群状态数据更新时得性能问题。 由此现在一些大规模的应用已经开始利用到多集群互联和cross cluster search的特性。 这种结构除了解决单集群数据容量限制问题以外,我们还发现在做容量均衡方面还有比较好的便利性。应用日志写入量通常随着业务变化也会剧烈变化,好不容易规划好的容量,不久就被业务的增长给打破,数倍或者数10倍的流量增长很可能就让一组结点过载出现写入延迟。 如果只有一个集群,在结点之间重新平衡shard比较费力,涉及到数据的迁移,可能非常缓慢,还会影响写入。 但如果有多集群互联,切换就可以做到非常的快速和简单。 原理上只需要在新集群中加入对应的索引配置模版,然后更新写入程序的配置,写入目标指向新集群,重启写入程序即可。并且,可以进一步将整个流程工具化,在GUI上完成一键切换。

收起阅读 »

Day 15 - 基于海量公司分词ES中文分词插件

介绍

本次想和大家分享一款Elasticsearch分词插件,该插件是基于天津海量信息股份有限公司的中文分词核心开发的。海量分词针对大数据检索场景专门做了定制和优化,更贴近搜索需求,整体分词的性能也是非常高效。

本文章有广告成分。但希望将公司研究成果分享出来,给大家实际工作中多一种选择...

海量分词检索优化点

  • 地名方面海量分词5.0可以识别并检索出关于地名后缀的结果

    可以通过搜索“河南”得到“河南省”的结果,搜索“天津”得到“天津市”的搜索结果,而不是简单河南、天津的识别。

  • 著名人物的人名识别更精准,如刘翔、傅莹等

    部分分词器处理中文分词只有两种方式:一种是单字(unigrams)形式,即简单粗暴的将中文的每一个汉字作为一个词(token)分开;另一种是两字(bigrams)的,也就是任意相邻的两个汉字作为一个词分开。这种简单粗暴的切分方式无法实现时效性较新的人名识别,如刘翔、傅莹等会被识别为单字切开。

  • 外国人名识别方面海量可以将人名识别智能识别

    “玛利亚 凯利”、“乔治·史密斯”、“玛丽·戴维斯”将完整的外国人名识别出姓氏和名,如“乔治·史密斯”可以被识别为“乔治”和 “史密斯”。

  • 常见词的品牌名称识别方面,海量分词5.0识别的结果中包含实际意义的品牌名称

    如“乐高”,“吉米作为简单的词,可以被识别,但是词放在文档语境中有其品牌的属性,海量分词识别的结果中可以准确搜索出品牌的结果。

  • 机构名识别方面

    海量分词5.0可以识别完整的机构名称,如“天津海量信息技术股份有限公司”,可以完整的识别出全称。

海量分词性能评测

评测用例

本次评测选取的语料一共三个。一个是2MB的海量测试语料,一个是4MB的北大语料(新版旧版各2MB),一个是9.4GB海量的线上实际数据

评测指标

本次评测是在开源评测程序上修改而来,评测指标有分词速度、行数完美率、字数完美率(该指标仅供参考)、内存消耗

评测结果

2MB海量测试语料

分词器 分词模式 分词速度(字符/毫秒) 行数完美率 字数完美率 占用内存(MB)
海量 / 1049.0212 74.11% 65.97% 85
ltp / 33.748833 55.68% 45.23% 201
IctClass 普通分词 208.69612 48.77% 37.10% 51
IctClass 细粒度分词 691.5951 38.33% 27.95% 51
Jieba SEARCH分词 592.697 47.64% 36.25% 236
FudanNLP / 121.7537 42.99% 31.59% 99
HanLP 标准分词 212.74121 45.30% 34.00% 63
HanLP NLP分词 378.23676 44.09% 32.55% 71
HanLP N-最短路径分词 189.29959 44.19% 32.22% 60
HanLP 最短路径分词 415.63605 43.19% 31.28% 59
HanLP 极速词典分词 6735.1934 36.78% 25.10% 18
THULAC / 0.20857348 54.49% 43.79% 110
Stanford CTB 0.13520464 44.43% 33.25% 1101
Stanford PKU 0.12508623 45.15% 34.01% 1065

可以看到海量分词的行数完美率是最高的,而且速度十分优异;仅有的一个比海量分词速度快的算法是一个追求极限性能舍弃准确率的算法

4MB北大语料

词器 分词模式 分词速度(字符/毫秒) 行数完美率 字数完美率 占用内存(MB)
海量 / 1121.7269 85.94% 48.28% 85
ltp / 35.81329 87.37% 49.37% 201
IctClass 普通分词 226.11554 78.55% 42.04% 51
IctClass 细粒度分词 756.5135 59.06% 30.61% 51
Jieba SEARCH分词 957.52826 47.07% 20.01% 236
FudanNLP / 126.09879 58.54% 27.78% 99
HanLP 标准分词 369.66 65.46% 35.04% 63
HanLP NLP分词 439.75632 61.93% 31.37% 71
HanLP N-最短路径分词 223.30482 69.20% 35.07% 60
HanLP 最短路径分词 440.72244 67.74% 33.83% 59
HanLP 极速词典分词 7522.581 58.09% 27.82% 18

(注:THULAC和stanford由于速度问题,不纳入评测)

可以看到海量的速度和行数完美率都很优异而且达到了兼顾,行数完美率只落后更高的ltp算法1.4个百分点,速度却是它的三十多倍

9.4GB线上数据

分词器 分词模式 分词速度(字符/毫秒)
ltp / 33.592
海量 / 960.611
IctClass 普通分词 198.094
HanLP N-最短路径分词 201.735
HanLP 最短路径分词 425.482
HanLP 标准分词 473.400
HanLP NLP分词 361.842
IctClass 细粒度分词 689.183
FudanNLP / 120.860
HanLP 极速词典分词 6238.916
Jieba SEARCH分词 568.262

(注:THULAC和stanford由于速度问题,不纳入评测)

本表格中分词顺序按(4MB北大语料的)行数完美率进行排序,越靠前的(4MB北大语料的)行数完美率越高

可以看出海量的分词速度十分优秀,分词速度拉开了大多数分词数倍,相比于行数完美率小幅领先的ltp要快几十倍

海量分词插件使用方法

安装使用

  • 下载安装 - 地址: https://github.com/HylandaOpen/elasticsearch-analysis-hlseg/releases

    unzip plugin to folder `your-es-root/plugins/`
  • 使用 elasticsearch-plugin 安装

    ./bin/elasticsearch-plugin install https://github.com/HylandaOpen/elasticsearch-analysis-hlseg/releases/download/v6.4.2/elasticsearch-analysis-hlseg-6.4.2.zip
  • 重启es集群

实例(借用github-ik分词插件的实例)

1.创建index

curl -XPUT http://localhost:9200/hylanda_seg

2.配置mapping

curl -XPOST http://localhost:9200/hylanda_seg/data/_mapping -H 'Content-Type:application/json' -d'
{
  "properties": {
    "msg": {
      "type": "text",
      "analyzer": "hlseg_search"
    }
  }
}'

3.插入测试数据

curl -XPOST http://localhost:9200/hylanda_seg/data/1 -H 'Content-Type:application/json' -d'
{"content":"美国留给伊拉克的是个烂摊子吗"}
'
curl -XPOST http://localhost:9200/hylanda_seg/data/2 -H 'Content-Type:application/json' -d'
{"content":"公安部:各地校车将享最高路权"}
'
curl -XPOST http://localhost:9200/hylanda_seg/data/3 -H 'Content-Type:application/json' -d'
{"content":"中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"}
'
curl -XPOST http://localhost:9200/hylanda_seg/data/4 -H 'Content-Type:application/json' -d'
{"content":"中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"}
'

4.查询

curl -XPOST http://localhost:9200/hylanda_seg/data/_search  -H 'Content-Type:application/json' -d'
{
  "query": {
    "match": {
      "content": "中国"
    }
  },
  "highlight": {
    "fields": {
      "content": {}
    }
  }
}
'

返回结果

{
  "took" : 11,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 0.5754429,
    "hits" : [
      {
        "_index" : "hylanda_seg",
        "_type" : "data",
        "_id" : "4",
        "_score" : 0.5754429,
        "_source" : {
          "content" : "中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"
        },
        "highlight" : {
          "content" : [
            "中韩渔警冲突调查:韩警平均每天扣1艘<em>中国</em>渔船"
          ]
        }
      },
      {
        "_index" : "hylanda_seg",
        "_type" : "data",
        "_id" : "5",
        "_score" : 0.2876821,
        "_source" : {
          "content" : "中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"
        },
        "highlight" : {
          "content" : [
            "<em>中国</em>驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"
          ]
        }
      }
    ]
  }
}

字典配置

海量分词分为基础词词典CoreDict.dat和自定义词典userDict_utf8.txt。基础词词典在dictionary目录下,需要将CoreDict.zip解压后放在config目录下,可以通过修改config下的userDict_utf8.txt来更新自定义词典

自定义词典格式如下


1.用户自定义词典采用文本格式,utf-8编码,每行一个词

2.每个词包含三列属性,分别是词串、词的属性以及idf值的加权等级,并以Tab作为分隔,其中除了词串必填外,其他列可以不填,不填写则系统采用默认值

3.“#”表示注释,会在加载时被忽略

4.词的属性以西文逗号分隔,可以是词性、停止词标志或者自定义属性

5.词性标记参考北大标准,用于词性标注时参考,该项不填则默认为名词

6.停止词标志为:stopword,由SegOption.outputStopWord来控制是否输出停止词

7.自定义属性不参与分词过程,分词结果中若Token.userTag不为空,则可以获取到该词的自定义属性。

8.idf值的加权分5级,从低到高的定义是idf-lv1 — idf-lv5,等级越高则该词在关键词计算时的权重会越大,若不填写该值则系统默认是idf-lv3(中等权重)
继续阅读 »

介绍

本次想和大家分享一款Elasticsearch分词插件,该插件是基于天津海量信息股份有限公司的中文分词核心开发的。海量分词针对大数据检索场景专门做了定制和优化,更贴近搜索需求,整体分词的性能也是非常高效。

本文章有广告成分。但希望将公司研究成果分享出来,给大家实际工作中多一种选择...

海量分词检索优化点

  • 地名方面海量分词5.0可以识别并检索出关于地名后缀的结果

    可以通过搜索“河南”得到“河南省”的结果,搜索“天津”得到“天津市”的搜索结果,而不是简单河南、天津的识别。

  • 著名人物的人名识别更精准,如刘翔、傅莹等

    部分分词器处理中文分词只有两种方式:一种是单字(unigrams)形式,即简单粗暴的将中文的每一个汉字作为一个词(token)分开;另一种是两字(bigrams)的,也就是任意相邻的两个汉字作为一个词分开。这种简单粗暴的切分方式无法实现时效性较新的人名识别,如刘翔、傅莹等会被识别为单字切开。

  • 外国人名识别方面海量可以将人名识别智能识别

    “玛利亚 凯利”、“乔治·史密斯”、“玛丽·戴维斯”将完整的外国人名识别出姓氏和名,如“乔治·史密斯”可以被识别为“乔治”和 “史密斯”。

  • 常见词的品牌名称识别方面,海量分词5.0识别的结果中包含实际意义的品牌名称

    如“乐高”,“吉米作为简单的词,可以被识别,但是词放在文档语境中有其品牌的属性,海量分词识别的结果中可以准确搜索出品牌的结果。

  • 机构名识别方面

    海量分词5.0可以识别完整的机构名称,如“天津海量信息技术股份有限公司”,可以完整的识别出全称。

海量分词性能评测

评测用例

本次评测选取的语料一共三个。一个是2MB的海量测试语料,一个是4MB的北大语料(新版旧版各2MB),一个是9.4GB海量的线上实际数据

评测指标

本次评测是在开源评测程序上修改而来,评测指标有分词速度、行数完美率、字数完美率(该指标仅供参考)、内存消耗

评测结果

2MB海量测试语料

分词器 分词模式 分词速度(字符/毫秒) 行数完美率 字数完美率 占用内存(MB)
海量 / 1049.0212 74.11% 65.97% 85
ltp / 33.748833 55.68% 45.23% 201
IctClass 普通分词 208.69612 48.77% 37.10% 51
IctClass 细粒度分词 691.5951 38.33% 27.95% 51
Jieba SEARCH分词 592.697 47.64% 36.25% 236
FudanNLP / 121.7537 42.99% 31.59% 99
HanLP 标准分词 212.74121 45.30% 34.00% 63
HanLP NLP分词 378.23676 44.09% 32.55% 71
HanLP N-最短路径分词 189.29959 44.19% 32.22% 60
HanLP 最短路径分词 415.63605 43.19% 31.28% 59
HanLP 极速词典分词 6735.1934 36.78% 25.10% 18
THULAC / 0.20857348 54.49% 43.79% 110
Stanford CTB 0.13520464 44.43% 33.25% 1101
Stanford PKU 0.12508623 45.15% 34.01% 1065

可以看到海量分词的行数完美率是最高的,而且速度十分优异;仅有的一个比海量分词速度快的算法是一个追求极限性能舍弃准确率的算法

4MB北大语料

词器 分词模式 分词速度(字符/毫秒) 行数完美率 字数完美率 占用内存(MB)
海量 / 1121.7269 85.94% 48.28% 85
ltp / 35.81329 87.37% 49.37% 201
IctClass 普通分词 226.11554 78.55% 42.04% 51
IctClass 细粒度分词 756.5135 59.06% 30.61% 51
Jieba SEARCH分词 957.52826 47.07% 20.01% 236
FudanNLP / 126.09879 58.54% 27.78% 99
HanLP 标准分词 369.66 65.46% 35.04% 63
HanLP NLP分词 439.75632 61.93% 31.37% 71
HanLP N-最短路径分词 223.30482 69.20% 35.07% 60
HanLP 最短路径分词 440.72244 67.74% 33.83% 59
HanLP 极速词典分词 7522.581 58.09% 27.82% 18

(注:THULAC和stanford由于速度问题,不纳入评测)

可以看到海量的速度和行数完美率都很优异而且达到了兼顾,行数完美率只落后更高的ltp算法1.4个百分点,速度却是它的三十多倍

9.4GB线上数据

分词器 分词模式 分词速度(字符/毫秒)
ltp / 33.592
海量 / 960.611
IctClass 普通分词 198.094
HanLP N-最短路径分词 201.735
HanLP 最短路径分词 425.482
HanLP 标准分词 473.400
HanLP NLP分词 361.842
IctClass 细粒度分词 689.183
FudanNLP / 120.860
HanLP 极速词典分词 6238.916
Jieba SEARCH分词 568.262

(注:THULAC和stanford由于速度问题,不纳入评测)

本表格中分词顺序按(4MB北大语料的)行数完美率进行排序,越靠前的(4MB北大语料的)行数完美率越高

可以看出海量的分词速度十分优秀,分词速度拉开了大多数分词数倍,相比于行数完美率小幅领先的ltp要快几十倍

海量分词插件使用方法

安装使用

  • 下载安装 - 地址: https://github.com/HylandaOpen/elasticsearch-analysis-hlseg/releases

    unzip plugin to folder `your-es-root/plugins/`
  • 使用 elasticsearch-plugin 安装

    ./bin/elasticsearch-plugin install https://github.com/HylandaOpen/elasticsearch-analysis-hlseg/releases/download/v6.4.2/elasticsearch-analysis-hlseg-6.4.2.zip
  • 重启es集群

实例(借用github-ik分词插件的实例)

1.创建index

curl -XPUT http://localhost:9200/hylanda_seg

2.配置mapping

curl -XPOST http://localhost:9200/hylanda_seg/data/_mapping -H 'Content-Type:application/json' -d'
{
  "properties": {
    "msg": {
      "type": "text",
      "analyzer": "hlseg_search"
    }
  }
}'

3.插入测试数据

curl -XPOST http://localhost:9200/hylanda_seg/data/1 -H 'Content-Type:application/json' -d'
{"content":"美国留给伊拉克的是个烂摊子吗"}
'
curl -XPOST http://localhost:9200/hylanda_seg/data/2 -H 'Content-Type:application/json' -d'
{"content":"公安部:各地校车将享最高路权"}
'
curl -XPOST http://localhost:9200/hylanda_seg/data/3 -H 'Content-Type:application/json' -d'
{"content":"中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"}
'
curl -XPOST http://localhost:9200/hylanda_seg/data/4 -H 'Content-Type:application/json' -d'
{"content":"中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"}
'

4.查询

curl -XPOST http://localhost:9200/hylanda_seg/data/_search  -H 'Content-Type:application/json' -d'
{
  "query": {
    "match": {
      "content": "中国"
    }
  },
  "highlight": {
    "fields": {
      "content": {}
    }
  }
}
'

返回结果

{
  "took" : 11,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 0.5754429,
    "hits" : [
      {
        "_index" : "hylanda_seg",
        "_type" : "data",
        "_id" : "4",
        "_score" : 0.5754429,
        "_source" : {
          "content" : "中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"
        },
        "highlight" : {
          "content" : [
            "中韩渔警冲突调查:韩警平均每天扣1艘<em>中国</em>渔船"
          ]
        }
      },
      {
        "_index" : "hylanda_seg",
        "_type" : "data",
        "_id" : "5",
        "_score" : 0.2876821,
        "_source" : {
          "content" : "中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"
        },
        "highlight" : {
          "content" : [
            "<em>中国</em>驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"
          ]
        }
      }
    ]
  }
}

字典配置

海量分词分为基础词词典CoreDict.dat和自定义词典userDict_utf8.txt。基础词词典在dictionary目录下,需要将CoreDict.zip解压后放在config目录下,可以通过修改config下的userDict_utf8.txt来更新自定义词典

自定义词典格式如下


1.用户自定义词典采用文本格式,utf-8编码,每行一个词

2.每个词包含三列属性,分别是词串、词的属性以及idf值的加权等级,并以Tab作为分隔,其中除了词串必填外,其他列可以不填,不填写则系统采用默认值

3.“#”表示注释,会在加载时被忽略

4.词的属性以西文逗号分隔,可以是词性、停止词标志或者自定义属性

5.词性标记参考北大标准,用于词性标注时参考,该项不填则默认为名词

6.停止词标志为:stopword,由SegOption.outputStopWord来控制是否输出停止词

7.自定义属性不参与分词过程,分词结果中若Token.userTag不为空,则可以获取到该词的自定义属性。

8.idf值的加权分5级,从低到高的定义是idf-lv1 — idf-lv5,等级越高则该词在关键词计算时的权重会越大,若不填写该值则系统默认是idf-lv3(中等权重)
收起阅读 »

Day 14 - 订单中心基于elasticsearch 的解决方案

       ElasticSearch分布式搜索储存集群的引入,主要是为了解决订单数据的存储与搜索的问题。

项目背景:
      15年去哪儿网酒店日均订单量达到30w+,随着多平台订单的聚合日均订单能达到100w左右。原来采用的热表分库方式,即将最近6个月的订单的放置在一张表中,将历史订单放在在history表中。history表存储全量的数据,当用户查询的下单时间跨度超过6个月即查询历史订单表,此分表方式热表的数据量为4000w左右,当时能解决的问题。但是显然不能满足携程艺龙订单接入的需求。如果继续按照热表方式,数据量将超过1亿条。全量数据表保存2年的可能就超过4亿的数据量。所以寻找有效途径解决此问题迫在眉睫。由于对这预计4亿的数据量还需按照预定日期、入住日期、离店日期、订单号、联系人姓名、电话、酒店名称、订单状态……等多个条件查询。所以简单按照某一个维度进行分表操作没有意义。ElasticSearch分布式搜索储存集群的引入,就是为了解决订单数据的存储与搜索的问题。

具体解决方案:

1、系统性能
        对订单模型进行抽象和分类,将常用搜索字段和基础属性字段剥离DB做分库分表。存储订单详情,ElasticSearch存储搜素字段。订单复杂查询直接走ElasticSearch。如下图:

elasticsearch1.png

       通用数据存储模型

elasticsearch2.png


关键字段
    ■ 业务核心字段,用于查询过滤
系统字段
    ■ version 避免高并发操作导致数据覆盖
大字段
    ■ order_data订单详情数据(JSON)
    ■ 可灵活需要索引的字段返回的字段

 
 
2、系统可用性
     系统可用性保障:双机房高可用如下图。

       
elasticsearch3.png

     数据可用性保障:
            一、异步多写保证数据一致性。

                
二、数据补充机制:
  1、每天凌晨task扫描数据库热表数据与es数据版本进行比较。
  2、将第三方推送过来数据中的,订单号即时插入订单同步队列表中。如果数据模型解析转换、持久化成功。删除队列中订单号。同时设置1分钟一次的task 扫描队列表。
  3、推送第三方的数据也采用同样的方式。保证给第三方数据的准确性。


elasticsearch4.png


3、系统伸缩性
      elasticSearch中索引设置了8个分片,目前Es单个索引的文档达到到1.4亿,合计达到2亿条数据占磁盘大小64G,集群机器磁盘容量240G。

 
 
 
继续阅读 »
       ElasticSearch分布式搜索储存集群的引入,主要是为了解决订单数据的存储与搜索的问题。

项目背景:
      15年去哪儿网酒店日均订单量达到30w+,随着多平台订单的聚合日均订单能达到100w左右。原来采用的热表分库方式,即将最近6个月的订单的放置在一张表中,将历史订单放在在history表中。history表存储全量的数据,当用户查询的下单时间跨度超过6个月即查询历史订单表,此分表方式热表的数据量为4000w左右,当时能解决的问题。但是显然不能满足携程艺龙订单接入的需求。如果继续按照热表方式,数据量将超过1亿条。全量数据表保存2年的可能就超过4亿的数据量。所以寻找有效途径解决此问题迫在眉睫。由于对这预计4亿的数据量还需按照预定日期、入住日期、离店日期、订单号、联系人姓名、电话、酒店名称、订单状态……等多个条件查询。所以简单按照某一个维度进行分表操作没有意义。ElasticSearch分布式搜索储存集群的引入,就是为了解决订单数据的存储与搜索的问题。

具体解决方案:

1、系统性能
        对订单模型进行抽象和分类,将常用搜索字段和基础属性字段剥离DB做分库分表。存储订单详情,ElasticSearch存储搜素字段。订单复杂查询直接走ElasticSearch。如下图:

elasticsearch1.png

       通用数据存储模型

elasticsearch2.png


关键字段
    ■ 业务核心字段,用于查询过滤
系统字段
    ■ version 避免高并发操作导致数据覆盖
大字段
    ■ order_data订单详情数据(JSON)
    ■ 可灵活需要索引的字段返回的字段

 
 
2、系统可用性
     系统可用性保障:双机房高可用如下图。

       
elasticsearch3.png

     数据可用性保障:
            一、异步多写保证数据一致性。

                
二、数据补充机制:
  1、每天凌晨task扫描数据库热表数据与es数据版本进行比较。
  2、将第三方推送过来数据中的,订单号即时插入订单同步队列表中。如果数据模型解析转换、持久化成功。删除队列中订单号。同时设置1分钟一次的task 扫描队列表。
  3、推送第三方的数据也采用同样的方式。保证给第三方数据的准确性。


elasticsearch4.png


3、系统伸缩性
      elasticSearch中索引设置了8个分片,目前Es单个索引的文档达到到1.4亿,合计达到2亿条数据占磁盘大小64G,集群机器磁盘容量240G。

 
 
  收起阅读 »

Day 11 -父子关系维护检索实战一 - Elasticsearch 5.x-父子关系维护

本次分享包括两篇文章
  • 父子关系维护检索实战一 Elasticsearch 5.x 父子关系维护检索实战
  • 父子关系维护检索实战二 Elasticsearch 6.x 父子关系维护检索实战

本文是其中第一篇- Elasticsearch 5.x 父子关系维护检索实战,涵盖以下部分内容:
  1. Elasticsearch 5.x 中父子关系mapping结构设计
  2. Elasticsearch 5.x 中维护父子关系数据
  3. Elasticsearch 5.x 中has_child和has_parent查询的基本用法
  4. Elasticsearch 5.x 中如何在检索中同时返回父子数据

案例说明
以一个体检记录相关的数据来介绍本文涉及的相关功能,体检数据包括客户基本信息basic和客户医疗记录medical、客户体检记录exam、客户体检结果分析记录diagnosis,它们之间的关系图如下:
parent.png


我们采用Elasticsearch java客户端 bboss-elastic 来实现本文相关功能。

1.准备工作
参考文档《高性能elasticsearch ORM开发库使用介绍》导入和配置bboss客户端

2.定义mapping结构-Elasticsearch 5.x 中父子关系mapping结构设计
Elasticsearch 5.x中一个indice mapping支持多个mapping type,通过在子类型mapping中指定父类型的mapping type名字来设置父子关系,例如:
父类型
"basic": {
....
}
子类型:
"medical": { 
      "_parent": { "type": "basic" },
     .................
}
新建dsl配置文件-esmapper/Client_Info.xml,定义完整的mapping结构:createClientIndice
<properties>

<!--
创建客户信息索引索引表
-->
<property name="createClientIndice">
<![CDATA[{
"settings": {
"number_of_shards": 6,
"index.refresh_interval": "5s"
},
"mappings": {
"basic": { ##基本信息
"properties": {
"party_id": {
"type": "keyword"
},
"sex": {
"type": "keyword"
},
"mari_sts": {
"type": "keyword"
},
"ethnic": {
"type": "text"
},
"prof": {
"type": "text"
},
"province": {
"type": "text"
},
"city": {
"type": "text"
},
"client_type": {
"type": "keyword"
},
"client_name": {
"type": "text"
},
"age": {
"type": "integer"
},
"id_type": {
"type": "keyword"
},
"idno": {
"type": "keyword"
},
"education": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"birth_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"diagnosis": { ##结果分析
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"provider": {
"type": "text"
},
"subject": {
"type": "text"
},
"diagnosis_type": {
"type": "text"
},
"icd10_code": {
"type": "text",
"type": "keyword"
},
"sd_disease_name": {
"type": "text",
"type": "keyword"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"medical": { ##医疗情况
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"hos_name_yb": {
"type": "text"
},
"eivisions_name": {
"type": "text"
},
"medical_type": {
"type": "text"
},
"medical_common_name": {
"type": "text"
},
"medical_sale_name": {
"type": "text"
},
"medical_code": {
"type": "text"
},
"specification": {
"type": "text"
},
"usage_num": {
"type": "text"
},
"unit": {
"type": "text"
},
"usage_times": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"exam": { ##检查结果
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"hospital": {
"type": "text"
},
"dept": {
"type": "text"
},
"is_ok": {
"type": "text"
},
"exam_result": {
"type": "text"
},
"fld1": {
"type": "text"
},
"fld2": {
"type": "text"
},
"fld3": {
"type": "text"
},
"fld4": {
"type": "text"
},
"fld5": {
"type": "text"
},
"fld901": {
"type": "text"
},
"fld6": {
"type": "text"
},
"fld902": {
"type": "text"
},
"fld14": {
"type": "text"
},
"fld20": {
"type": "text"
},
"fld21": {
"type": "text"
},
"fld23": {
"type": "text"
},
"fld24": {
"type": "text"
},
"fld65": {
"type": "text"
},
"fld66": {
"type": "text"
},
"fld67": {
"type": "text"
},
"fld68": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
}
}
}]]>
</property>
</properties>

这个mapping中定义了4个索引类型:basic,exam,medical,diagnosis,其中basic是其他类型的父类型。
通过bboss客户端创建名称为client_info 的索引:
	public void createClientIndice(){
//定义客户端实例,加载上面建立的dsl配置文件
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
try {
//client_info存在返回true,不存在返回false
boolean exist = clientUtil.existIndice("client_info");

//如果索引表client_info已经存在先删除mapping
if(exist) {//先删除mapping client_info
clientUtil.dropIndice("client_info");
}
} catch (ElasticSearchException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//创建mapping client_info
clientUtil.createIndiceMapping("client_info","createClientIndice");
String client_info = clientUtil.getIndice("client_info");//获取最新建立的索引表结构client_info
System.out.println("after createClientIndice clientUtil.getIndice(\"client_info\") response:"+client_info);
}


3.维护父子关系数据-Elasticsearch 5.x 中维护父子关系数据
  • 定义对象

首先定义四个对象,分别对应mapping中的四个索引类型,篇幅关系只列出主要属性
  • Basic
  • Medical
  • Exam
  • Diagnosis

通过注解@ESId指定基本信息文档_id
public class Basic extends ESBaseData {
/**
* 索引_id
*/
@ESId
private String party_id;
private String sex; // 性别
......
}
通过注解@ESParentId指定Medical关联的基本信息文档_id,Medical文档_id由ElasticSearch自动生成
public class Medical extends ESBaseData {
@ESParentId
private String party_id; //父id
private String hos_name_yb; //就诊医院
...
}
通过注解@ESParentId指定Exam关联的基本信息文档_id,Exam文档_id由ElasticSearch自动生成
public class Exam extends ESBaseData {
@ESParentId
private String party_id; //父id
private String hospital; // 就诊医院
....
}
通过注解@ESParentId指定Diagnosis关联的基本信息文档_id,Diagnosis文档_id由ElasticSearch自动生成
public class Diagnosis extends ESBaseData {
@ESParentId
private String party_id; //父id
private String provider; //诊断医院
private String subject; //科室
......
}

  • 通过api维护测试数据

对象定义好了后,通过bboss客户数据到之前建立好的索引client_info中。
	/**
* 录入体检医疗信息
*/
public void importClientInfoDataFromBeans() {
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();

//导入基本信息,并且实时刷新,测试需要,实际环境不要带refresh
List<Basic> basics = buildBasics();
clientUtil.addDocuments("client_info","basic",basics,"refresh");

//导入医疗信息,并且实时刷新,测试需要,实际环境不要带refresh
List<Medical> medicals = buildMedicals();
clientUtil.addDocuments("client_info","medical",medicals,"refresh");

//导入体检结果数据,并且实时刷新,测试需要,实际环境不要带refresh
List<Exam> exams = buildExams();
clientUtil.addDocuments("client_info","exam",exams,"refresh");

//导入结果诊断数据,并且实时刷新,测试需要,实际环境不要带refresh
List<Diagnosis> diagnosiss = buildDiagnosiss();
clientUtil.addDocuments("client_info","diagnosis",diagnosiss,"refresh");
}
//构建基本信息集合
private List<Basic> buildBasics() {
List<Basic> basics = new ArrayList<Basic>();
Basic basic = new Basic();
basic.setParty_id("1");
basic.setAge(60);
basics.add(basic);
//继续添加其他数据
return basics;

}
//
构建医疗信息集合
private List<Medical> buildMedicals() {
List<Medical> medicals = new ArrayList<Medical>();
Medical medical = new Medical();
medical.setParty_id("1");//设置父文档id-基本信息文档_id
medical.setCreated_date(new Date());
medicals.add(medical);
//继续添加其他数据
return medicals;

}
//构建体检结果数据集合
private List<Exam> buildExams() {
List<Exam> exams = new ArrayList<Exam>();
Exam exam = new Exam();
exam.setParty_id("1");//设置父文档id-基本信息文档_id
exams.add(exam);
//继续添加其他数据
return exams;
}
//构建结果诊断数据集合
private List<Diagnosis> buildDiagnosiss() {
List<Diagnosis> diagnosiss = new ArrayList<Diagnosis>();
Diagnosis diagnosis = new Diagnosis();
diagnosis.setParty_id("1");//设置父文档id-基本信息文档_id
diagnosiss.add(diagnosis);
//继续添加其他数据
return diagnosiss;
}

  • 通过json报文批量导入测试数据

除了通过addDocuments录入数据,还可以通过json报文批量导入数据
在配置文件esmapper/Client_Info.xml增加以下内容:
    <!--
导入基本信息:
-->
<property name="bulkImportBasicData" trim="false">
<![CDATA[
{ "index": { "_id": "1" }}
{ "party_id":"1", "sex":"男", "mari_sts":"不详", "ethnic":"蒙古族", "prof":"放牧","birth_date":"1966-2-14 00:00:00", "province":"内蒙古", "city":"赤峰市","client_type":"1", "client_name":"安", "age":52,"id_type":"1", "idno":"1", "education":"初中","created_date":"2013-04-24 00:00:00","last_modified_date":"2013-04-24 00:00:00", "etl_date":"2013-04-24 00:00:00"}
{ "index": { "_id": "2" }}
{ "party_id":"2", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"公务员","birth_date":"1986-07-06 00:00:00", "province":"广东", "city":"深圳","client_type":"1", "client_name":"彭", "age":32,"id_type":"1", "idno":"2", "education":"本科", "created_date":"2013-05-09 15:49:47","last_modified_date":"2013-05-09 15:49:47", "etl_date":"2013-05-09 15:49:47"}
{ "index": { "_id": "3" }}
{ "party_id":"3", "sex":"男", "mari_sts":"未婚", "ethnic":"汉族", "prof":"无业","birth_date":"2000-08-15 00:00:00", "province":"广东", "city":"佛山","client_type":"1", "client_name":"浩", "age":18,"id_type":"1", "idno":"3", "education":"高中", "created_date":"2014-09-01 09:49:27","last_modified_date":"2014-09-01 09:49:27", "etl_date":"2014-09-01 09:49:27" }
{ "index": { "_id": "4" }}
{ "party_id":"4", "sex":"女", "mari_sts":"未婚", "ethnic":"满族", "prof":"工人","birth_date":"1996-03-14 00:00:00", "province":"江苏", "city":"扬州","client_type":"1", "client_name":"慧", "age":22,"id_type":"1", "idno":"4", "education":"高中", "created_date":"2014-09-16 09:30:37","last_modified_date":"2014-09-16 09:30:37", "etl_date":"2014-09-16 09:30:37" }
{ "index": { "_id": "5" }}
{ "party_id":"5", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"教师","birth_date":"1983-08-14 00:00:00", "province":"宁夏", "city":"灵武","client_type":"1", "client_name":"英", "age":35,"id_type":"1", "idno":"5", "education":"本科", "created_date":"2015-09-16 09:30:37","last_modified_date":"2015-09-16 09:30:37", "etl_date":"2015-09-16 09:30:37" }
{ "index": { "_id": "6" }}
{ "party_id":"6", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"工人","birth_date":"1959-07-04 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"岭", "age":59,"id_type":"1", "idno":"6", "education":"小学", "created_date":"2015-09-01 09:49:27","last_modified_date":"2015-09-01 09:49:27", "etl_date":"2015-09-01 09:49:27" }
{ "index": { "_id": "7" }}
{ "party_id":"7", "sex":"女", "mari_sts":"未婚", "ethnic":"汉族", "prof":"学生","birth_date":"1999-02-18 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"欣", "age":19,"id_type":"1", "idno":"7", "education":"高中", "created_date":"2016-12-01 09:49:27","last_modified_date":"2016-12-01 09:49:27", "etl_date":"2016-12-01 09:49:27" }
{ "index": { "_id": "8" }}
{ "party_id":"8", "sex":"女", "mari_sts":"未婚", "ethnic":"汉族", "prof":"学生","birth_date":"2007-11-18 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"梅", "age":10,"id_type":"1", "idno":"8", "education":"小学", "created_date":"2016-11-21 09:49:27","last_modified_date":"2016-11-21 09:49:27", "etl_date":"2016-11-21 09:49:27" }
{ "index": { "_id": "9" }}
{ "party_id":"9", "sex":"男", "mari_sts":"不详", "ethnic":"回族", "prof":"个体户","birth_date":"1978-03-29 00:00:00", "province":"北京", "city":"北京","client_type":"1", "client_name":"磊", "age":40,"id_type":"1", "idno":"9", "education":"高中", "created_date":"2017-09-01 09:49:27","last_modified_date":"2017-09-01 09:49:27", "etl_date":"2017-09-01 09:49:27" }
{ "index": { "_id": "10" }}
{ "party_id":"10", "sex":"男", "mari_sts":"已婚", "ethnic":"汉族", "prof":"农民","birth_date":"1970-11-14 00:00:00", "province":"浙江", "city":"台州","client_type":"1", "client_name":"强", "age":47,"id_type":"1", "idno":"10", "education":"初中", "created_date":"2018-09-01 09:49:27","last_modified_date":"2018-09-01 09:49:27", "etl_date":"2018-09-01 09:49:27" }
]]>
</property>
<!--
导入诊断信息
-->
<property name="bulkImportDiagnosisData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"J31.0", "sd_disease_name":"鼻炎","created_date":"2013-07-23 20:56:44", "last_modified_date":"2013-07-23 20:56:44", "etl_date":"2013-07-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"M47.8", "sd_disease_name":"颈椎病","created_date":"2013-09-23 20:56:44", "last_modified_date":"2013-09-23 20:56:44", "etl_date":"2013-09-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"E78.1", "sd_disease_name":"甘油三脂增高","created_date":"2018-09-20 09:27:44", "last_modified_date":"2018-09-20 09:27:44", "etl_date":"2018-09-20 09:27:44" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "provider":"江苏医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2011-05-19 15:52:55", "last_modified_date":"2011-05-19 15:52:55", "etl_date":"2011-05-19 15:52:55" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"H44", "sd_disease_name":"眼疾","created_date":"2016-04-08 10:42:18", "last_modified_date":"2016-04-08 10:42:18", "etl_date":"2016-04-08 10:42:18" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"M47.8", "sd_disease_name":"颈椎病","created_date":"2016-04-08 10:42:18", "last_modified_date":"2016-04-08 10:42:18", "etl_date":"2016-04-08 10:42:18" }

{ "index": { "parent": "7" }}
{ "party_id":"7", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2017-04-08 10:42:18", "last_modified_date":"2017-04-08 10:42:18", "etl_date":"2017-04-08 10:42:18" }

{ "index": { "parent": "8" }}
{ "party_id":"8", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2018-04-08 10:42:18", "last_modified_date":"2018-04-08 10:42:18", "etl_date":"2018-04-08 10:42:18" }

{ "index": { "parent": "9" }}
{ "party_id":"9", "provider":"朝阳医院", "subject":"","diagnosis_type":"","icd10_code":"A03.901", "sd_disease_name":"急性细菌性痢疾","created_date":"2015-06-08 10:42:18", "last_modified_date":"2015-06-08 10:42:18", "etl_date":"2015-06-08 10:42:18" }
]]>
</property>

<!--
导入医疗信息
-->
<property name="bulkImportMedicalData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"氟化钠", "medical_sale_name":"", "medical_code":"A01AA01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"四环素", "medical_sale_name":"", "medical_code":"A01AB13", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2016-05-31 00:00:00", "last_modified_date":"2016-05-31 00:00:00", "etl_date":"2016-05-31 00:00:00" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"", "medical_sale_name":"盐酸多西环素胶丸", "medical_code":"A01AB22", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2016-03-18 00:00:00", "last_modified_date":"2016-03-18 00:00:00", "etl_date":"2016-03-18 00:00:00" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"盐酸多西环素分散片", "medical_sale_name":"", "medical_code":"A01AB22", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2013-07-23 20:56:44", "last_modified_date":"2013-07-23 20:56:44", "etl_date":"2013-07-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"地塞米松", "medical_sale_name":"", "medical_code":"A01AC02", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2013-09-23 20:56:44", "last_modified_date":"2013-09-23 20:56:44", "etl_date":"2013-09-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"肾上腺素", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-09-20 09:27:44", "last_modified_date":"2018-09-20 09:27:44", "etl_date":"2018-09-20 09:27:44" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"地塞米松", "medical_sale_name":"", "medical_code":"A01AC02", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2011-05-19 15:52:55", "last_modified_date":"2011-05-19 15:52:55", "etl_date":"2011-05-19 15:52:55" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"四环素", "medical_sale_name":"", "medical_code":"A01AB13", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-04-08 10:42:18", "last_modified_date":"2018-04-08 10:42:18", "etl_date":"2018-04-08 10:42:18" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"诺氟沙星胶囊", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2015-06-08 10:42:18", "last_modified_date":"2015-06-08 10:42:18", "etl_date":"2015-06-08 10:42:18" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "hos_name_yb":"山东医院", "eivisions_name":"", "medical_type":"","medical_common_name":"盐酸异丙肾上腺素片", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-01-23 20:56:44", "last_modified_date":"2014-01-23 20:56:44", "etl_date":"2014-01-23 20:56:44" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "hos_name_yb":"山东医院", "eivisions_name":"", "medical_type":"","medical_common_name":"甲硝唑栓", "medical_sale_name":"", "medical_code":"A01AB17", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-06-08 10:42:18", "last_modified_date":"2018-06-08 10:42:18", "etl_date":"2018-06-08 10:42:18" }

{ "index": { "parent": "9" }}
{ "party_id":"9", "hos_name_yb":"朝阳医院", "eivisions_name":"", "medical_type":"","medical_common_name":"复方克霉唑乳膏", "medical_sale_name":"", "medical_code":"A01AB18", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-01-23 20:56:44", "last_modified_date":"2014-01-23 20:56:44", "etl_date":"2014-01-23 20:56:44"}
]]>
</property>

<!--
导入体检信息
-->
<property name="bulkImportExamData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"高血压","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "2" }}
{ "party_id":"2", "hospital":"", "dept":"", "is_ok":"Y", "exam_result":"轻度脂肪肝","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "3" }}
{ "party_id":"3", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"急性细菌性痢疾","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "5" }}
{ "party_id":"5", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "7" }}
{ "party_id":"7", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "8" }}
{ "party_id":"1", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "9" }}
{ "party_id":"9", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "10" }}
{ "party_id":"10", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
]]>
</property>







通过bboss提供的通用api,导入上面定义的数据:
	/**
* 通过读取配置文件中的dsl json数据导入医疗数据
*/
public void importClientInfoFromJsonData(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");

clientUtil.executeHttp("client_info/basic/_bulk?refresh","bulkImportBasicData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/diagnosis/_bulk?refresh","bulkImportDiagnosisData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/medical/_bulk?refresh","bulkImportMedicalData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/exam/_bulk?refresh","bulkImportExamData",ClientUtil.HTTP_POST);
统计导入的数据
		
long basiccount = clientUtil.countAll("client_info/basic");
System.out.println(basiccount);
long medicalcount = clientUtil.countAll("client_info/medical");
System.out.println(medicalcount);
long examcount = clientUtil.countAll("client_info/exam");
System.out.println(examcount);
long diagnosiscount = clientUtil.countAll("client_info/diagnosis");
System.out.println(diagnosiscount);
}
4.父子关系查询-Elasticsearch 5.x 中has_child和has_parent查询的基本用法​
  • 根据父查子-通过客户名称信息查询客户端体检结果

在配置文件esmapper/Client_Info.xml增加dsl语句:queryExamSearchByClientName
   <!--根据客户名称查询客户体检报告-->
<property name="queryExamSearchByClientName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_parent": {
"type": "basic",
"query": {
"match": {
"client_name": #[clientName] ## 通过变量clientName设置客户名称
}
}
}
}
}
]]>
</property>

 执行查询,通过bboss的searchList 方法获取符合条件的体检报告以及总记录数据,返回size对应的1000条数据
	/**
* 根据客户名称查询客户体检报告
*/
public void queryExamSearchByClientName(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("clientName","张三");
params.put("size",1000);
ESDatas<Exam> exams = clientUtil.searchList("client_info/exam/_search","queryExamSearchByClientName",params,Exam.class);
List<Exam> examList = exams.getDatas();//获取符合条件的体检数据
long totalSize = exams.getTotalSize();//符合条件的总记录数据
}
 
  • 根据子查父数据-通过医疗信息编码查找客户基本数据

在配置文件esmapper/Client_Info.xml增加查询dsl语句:queryClientInfoByMedicalName
    <!--通过医疗信息编码查找客户基本数据-->
<property name="queryClientInfoByMedicalName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_child": {
"type": "medical",
"score_mode": "max",
"query": {
"match": {
"medical_code": #[medicalCode] ## 通过变量medicalCode设置医疗编码
}
}
}
}
}
]]>
</property>
执行查询,通过bboss的searchList 方法获取符合条件的客户端基本信息以及总记录数据
	/**
* 通过医疗信息编码查找客户基本数据
*/
public void queryClientInfoByMedicalName(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("medicalCode","A01AA01"); //通过变量medicalCode设置医疗编码
params.put("size",1000); //最多返回size变量对应的记录条数
ESDatas<Basic> bascis = clientUtil.searchList("client_info/basic/_search","queryClientInfoByMedicalName",params,Basic.class);
List<Basic> bascisList = bascis.getDatas();//获取符合条件的客户信息
long totalSize = bascis.getTotalSize();
}
5.同时返回父子数据-Elasticsearch 5.x 中如何在检索中同时返回父子数据
这一节中我们介绍同时返回父子数据的玩法 :inner_hits的妙用
  • 根据父条件查询所有子数据集合并返回父数据,根据客户名称查询所有体检数据,同时返回客户信息

在配置文件esmapper/Client_Info.xml增加检索dsl-queryDiagnosisByClientName
    <!--根据客户名称获取客户体检诊断数据,并返回客户信息-->
<property name="queryDiagnosisByClientName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_parent": {
"type": "basic",
"query": {
"match": {
"client_name": #[clientName] ## 通过变量clientName设置客户名称
}
},
"inner_hits": {} ## 通过变量inner_hits表示要返回对应的客户信息
}
}
}
]]>
</property>
执行检索并遍历结果
	/**
* 根据客户名称获取客户体检诊断数据,并返回客户数据
*/
public void queryDiagnosisByClientName(){

ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("clientName","张三");
params.put("size",1000);

try {
ESInnerHitSerialThreadLocal.setESInnerTypeReferences(Basic.class);//指定inner查询结果对应的客户基本信息类型,Basic只有一个文档类型,索引不需要显示指定basic对应的mapping type名称
ESDatas<Diagnosis> diagnosiss = clientUtil.searchList("client_info/diagnosis/_search",
"queryDiagnosisByClientName",params,Diagnosis.class);
List<Diagnosis> diagnosisList = diagnosiss.getDatas();//获取符合条件的体检报告数据
long totalSize = diagnosiss.getTotalSize();
//遍历诊断报告信息,并查看报告对应的客户基本信息
for(int i = 0; diagnosisList != null && i < diagnosisList.size(); i ++) {
Diagnosis diagnosis = diagnosisList.get(i);
List<Basic> basics = ResultUtil.getInnerHits(diagnosis.getInnerHits(), "basic");
if(basics != null) {
System.out.println(basics.size());
}
}
}
finally{
ESInnerHitSerialThreadLocal.clean();//清空inner查询结果对应的客户基本信息类型
}
}

  •  根据子条件查询父数据并返回符合条件的父的子数据集合,查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录

在配置文件esmapper/Client_Info.xml增加检索dsl-queryClientAndAllSons
    <!--查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录-->
<property name="queryClientAndAllSons">
<![CDATA[
{
"query": {
"bool": {
"should": [
{
"match_all":{}
}
]
,"must": [
{
"has_child": {
"score_mode": "none",
"type": "diagnosis"
,"query": {
"bool": {
"must": [
{
"term": {
"icd10_code": {
"value": "J00"
}
}
}
]
}
},"inner_hits":{}
}
}
]
,"should": [
{
"has_child": {
"score_mode": "none",
"type": "medical"
,"query": {
"match_all": {}

},"inner_hits":{}
}
}
]
,"should": [
{
"has_child": {
"type": "exam",
"query": {
"match_all": {}
},"inner_hits":{}
}
}
]
}
}
}
]]>
</property>
执行查询:
	/**
* 查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录
*/
public void queryClientAndAllSons(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
Map<String,Object> params = null;//没有检索条件,构造一个空的参数对象

try {
//设置子文档的类型和对象映射关系
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("exam",Exam.class);//指定inner查询结果对于exam类型和对应的对象类型Exam
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("diagnosis",Diagnosis.class);//指定inner查询结果对于diagnosis类型和对应的对象类型Diagnosis
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("medical",Medical.class);//指定inner查询结果对于medical类型和对应的对象类型Medical
ESDatas<Basic> escompanys = clientUtil.searchList("client_info/basic/_search",
"queryClientAndAllSons",params,Basic.class);
//String response = clientUtil.executeRequest("client_info/basic/_search","queryClientAndAllSons",params);直接获取原始的json报文
// escompanys = clientUtil.searchAll("client_info",Basic.class);
long totalSize = escompanys.getTotalSize();
List<Basic> clientInfos = escompanys.getDatas();//获取符合条件的数据
//查看公司下面的雇员信息(符合检索条件的雇员信息)
for (int i = 0; clientInfos != null && i < clientInfos.size(); i++) {
Basic clientInfo = clientInfos.get(i);
List<Exam> exams = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "exam");
if(exams != null)
System.out.println(exams.size());
List<Diagnosis> diagnosiss = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "diagnosis");
if(diagnosiss != null)
System.out.println(diagnosiss.size());
List<Medical> medicals = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "medical");
if(medicals != null)
System.out.println(medicals.size());

}
}
finally{
ESInnerHitSerialThreadLocal.clean();//清空inner查询结果对于各种类型信息
}
}
最后我们按顺序执行所有方法,验证功能:
	@Test
public void testMutil(){
this.createClientIndice();//创建indice client_info
// this.importClientInfoDataFromBeans(); //通过api添加测试数据
this.importClientInfoFromJsonData();//导入测试数据
this.queryExamSearchByClientName(); //根据客户端名称查询提交报告
this.queryClientInfoByMedicalName();//通过医疗信息编码查找客户基本数据
this.queryDiagnosisByClientName();//根据客户名称获取客户体检诊断数据,并返回客户数据
this.queryClientAndAllSons();//查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录
}
可以下载完整的demo工程运行本文中的测试用例方法,地址见相关资料。
到此Elasticsearch 5.x 父子关系维护检索实战介绍完毕,谢谢大家!

相关资料
完整demo工程  https://github.com/bbossgroups/eshelloword-booter
对应的类文件和配置文件
org.bboss.elasticsearchtest.parentchild.ParentChildTest
esmapper/Client_Info.xml
 
开发交流
bboss交流群 166471282
bboss公众号
getqrcode.jpg

 
敬请关注:父子关系维护检索实战二 Elasticsearch 6.x 父子关系维护检
继续阅读 »
本次分享包括两篇文章
  • 父子关系维护检索实战一 Elasticsearch 5.x 父子关系维护检索实战
  • 父子关系维护检索实战二 Elasticsearch 6.x 父子关系维护检索实战

本文是其中第一篇- Elasticsearch 5.x 父子关系维护检索实战,涵盖以下部分内容:
  1. Elasticsearch 5.x 中父子关系mapping结构设计
  2. Elasticsearch 5.x 中维护父子关系数据
  3. Elasticsearch 5.x 中has_child和has_parent查询的基本用法
  4. Elasticsearch 5.x 中如何在检索中同时返回父子数据

案例说明
以一个体检记录相关的数据来介绍本文涉及的相关功能,体检数据包括客户基本信息basic和客户医疗记录medical、客户体检记录exam、客户体检结果分析记录diagnosis,它们之间的关系图如下:
parent.png


我们采用Elasticsearch java客户端 bboss-elastic 来实现本文相关功能。

1.准备工作
参考文档《高性能elasticsearch ORM开发库使用介绍》导入和配置bboss客户端

2.定义mapping结构-Elasticsearch 5.x 中父子关系mapping结构设计
Elasticsearch 5.x中一个indice mapping支持多个mapping type,通过在子类型mapping中指定父类型的mapping type名字来设置父子关系,例如:
父类型
"basic": {
....
}
子类型:
"medical": { 
      "_parent": { "type": "basic" },
     .................
}
新建dsl配置文件-esmapper/Client_Info.xml,定义完整的mapping结构:createClientIndice
<properties>

<!--
创建客户信息索引索引表
-->
<property name="createClientIndice">
<![CDATA[{
"settings": {
"number_of_shards": 6,
"index.refresh_interval": "5s"
},
"mappings": {
"basic": { ##基本信息
"properties": {
"party_id": {
"type": "keyword"
},
"sex": {
"type": "keyword"
},
"mari_sts": {
"type": "keyword"
},
"ethnic": {
"type": "text"
},
"prof": {
"type": "text"
},
"province": {
"type": "text"
},
"city": {
"type": "text"
},
"client_type": {
"type": "keyword"
},
"client_name": {
"type": "text"
},
"age": {
"type": "integer"
},
"id_type": {
"type": "keyword"
},
"idno": {
"type": "keyword"
},
"education": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"birth_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"diagnosis": { ##结果分析
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"provider": {
"type": "text"
},
"subject": {
"type": "text"
},
"diagnosis_type": {
"type": "text"
},
"icd10_code": {
"type": "text",
"type": "keyword"
},
"sd_disease_name": {
"type": "text",
"type": "keyword"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"medical": { ##医疗情况
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"hos_name_yb": {
"type": "text"
},
"eivisions_name": {
"type": "text"
},
"medical_type": {
"type": "text"
},
"medical_common_name": {
"type": "text"
},
"medical_sale_name": {
"type": "text"
},
"medical_code": {
"type": "text"
},
"specification": {
"type": "text"
},
"usage_num": {
"type": "text"
},
"unit": {
"type": "text"
},
"usage_times": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"exam": { ##检查结果
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"hospital": {
"type": "text"
},
"dept": {
"type": "text"
},
"is_ok": {
"type": "text"
},
"exam_result": {
"type": "text"
},
"fld1": {
"type": "text"
},
"fld2": {
"type": "text"
},
"fld3": {
"type": "text"
},
"fld4": {
"type": "text"
},
"fld5": {
"type": "text"
},
"fld901": {
"type": "text"
},
"fld6": {
"type": "text"
},
"fld902": {
"type": "text"
},
"fld14": {
"type": "text"
},
"fld20": {
"type": "text"
},
"fld21": {
"type": "text"
},
"fld23": {
"type": "text"
},
"fld24": {
"type": "text"
},
"fld65": {
"type": "text"
},
"fld66": {
"type": "text"
},
"fld67": {
"type": "text"
},
"fld68": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
}
}
}]]>
</property>
</properties>

这个mapping中定义了4个索引类型:basic,exam,medical,diagnosis,其中basic是其他类型的父类型。
通过bboss客户端创建名称为client_info 的索引:
	public void createClientIndice(){
//定义客户端实例,加载上面建立的dsl配置文件
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
try {
//client_info存在返回true,不存在返回false
boolean exist = clientUtil.existIndice("client_info");

//如果索引表client_info已经存在先删除mapping
if(exist) {//先删除mapping client_info
clientUtil.dropIndice("client_info");
}
} catch (ElasticSearchException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//创建mapping client_info
clientUtil.createIndiceMapping("client_info","createClientIndice");
String client_info = clientUtil.getIndice("client_info");//获取最新建立的索引表结构client_info
System.out.println("after createClientIndice clientUtil.getIndice(\"client_info\") response:"+client_info);
}


3.维护父子关系数据-Elasticsearch 5.x 中维护父子关系数据
  • 定义对象

首先定义四个对象,分别对应mapping中的四个索引类型,篇幅关系只列出主要属性
  • Basic
  • Medical
  • Exam
  • Diagnosis

通过注解@ESId指定基本信息文档_id
public class Basic extends ESBaseData {
/**
* 索引_id
*/
@ESId
private String party_id;
private String sex; // 性别
......
}
通过注解@ESParentId指定Medical关联的基本信息文档_id,Medical文档_id由ElasticSearch自动生成
public class Medical extends ESBaseData {
@ESParentId
private String party_id; //父id
private String hos_name_yb; //就诊医院
...
}
通过注解@ESParentId指定Exam关联的基本信息文档_id,Exam文档_id由ElasticSearch自动生成
public class Exam extends ESBaseData {
@ESParentId
private String party_id; //父id
private String hospital; // 就诊医院
....
}
通过注解@ESParentId指定Diagnosis关联的基本信息文档_id,Diagnosis文档_id由ElasticSearch自动生成
public class Diagnosis extends ESBaseData {
@ESParentId
private String party_id; //父id
private String provider; //诊断医院
private String subject; //科室
......
}

  • 通过api维护测试数据

对象定义好了后,通过bboss客户数据到之前建立好的索引client_info中。
	/**
* 录入体检医疗信息
*/
public void importClientInfoDataFromBeans() {
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();

//导入基本信息,并且实时刷新,测试需要,实际环境不要带refresh
List<Basic> basics = buildBasics();
clientUtil.addDocuments("client_info","basic",basics,"refresh");

//导入医疗信息,并且实时刷新,测试需要,实际环境不要带refresh
List<Medical> medicals = buildMedicals();
clientUtil.addDocuments("client_info","medical",medicals,"refresh");

//导入体检结果数据,并且实时刷新,测试需要,实际环境不要带refresh
List<Exam> exams = buildExams();
clientUtil.addDocuments("client_info","exam",exams,"refresh");

//导入结果诊断数据,并且实时刷新,测试需要,实际环境不要带refresh
List<Diagnosis> diagnosiss = buildDiagnosiss();
clientUtil.addDocuments("client_info","diagnosis",diagnosiss,"refresh");
}
//构建基本信息集合
private List<Basic> buildBasics() {
List<Basic> basics = new ArrayList<Basic>();
Basic basic = new Basic();
basic.setParty_id("1");
basic.setAge(60);
basics.add(basic);
//继续添加其他数据
return basics;

}
//
构建医疗信息集合
private List<Medical> buildMedicals() {
List<Medical> medicals = new ArrayList<Medical>();
Medical medical = new Medical();
medical.setParty_id("1");//设置父文档id-基本信息文档_id
medical.setCreated_date(new Date());
medicals.add(medical);
//继续添加其他数据
return medicals;

}
//构建体检结果数据集合
private List<Exam> buildExams() {
List<Exam> exams = new ArrayList<Exam>();
Exam exam = new Exam();
exam.setParty_id("1");//设置父文档id-基本信息文档_id
exams.add(exam);
//继续添加其他数据
return exams;
}
//构建结果诊断数据集合
private List<Diagnosis> buildDiagnosiss() {
List<Diagnosis> diagnosiss = new ArrayList<Diagnosis>();
Diagnosis diagnosis = new Diagnosis();
diagnosis.setParty_id("1");//设置父文档id-基本信息文档_id
diagnosiss.add(diagnosis);
//继续添加其他数据
return diagnosiss;
}

  • 通过json报文批量导入测试数据

除了通过addDocuments录入数据,还可以通过json报文批量导入数据
在配置文件esmapper/Client_Info.xml增加以下内容:
    <!--
导入基本信息:
-->
<property name="bulkImportBasicData" trim="false">
<![CDATA[
{ "index": { "_id": "1" }}
{ "party_id":"1", "sex":"男", "mari_sts":"不详", "ethnic":"蒙古族", "prof":"放牧","birth_date":"1966-2-14 00:00:00", "province":"内蒙古", "city":"赤峰市","client_type":"1", "client_name":"安", "age":52,"id_type":"1", "idno":"1", "education":"初中","created_date":"2013-04-24 00:00:00","last_modified_date":"2013-04-24 00:00:00", "etl_date":"2013-04-24 00:00:00"}
{ "index": { "_id": "2" }}
{ "party_id":"2", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"公务员","birth_date":"1986-07-06 00:00:00", "province":"广东", "city":"深圳","client_type":"1", "client_name":"彭", "age":32,"id_type":"1", "idno":"2", "education":"本科", "created_date":"2013-05-09 15:49:47","last_modified_date":"2013-05-09 15:49:47", "etl_date":"2013-05-09 15:49:47"}
{ "index": { "_id": "3" }}
{ "party_id":"3", "sex":"男", "mari_sts":"未婚", "ethnic":"汉族", "prof":"无业","birth_date":"2000-08-15 00:00:00", "province":"广东", "city":"佛山","client_type":"1", "client_name":"浩", "age":18,"id_type":"1", "idno":"3", "education":"高中", "created_date":"2014-09-01 09:49:27","last_modified_date":"2014-09-01 09:49:27", "etl_date":"2014-09-01 09:49:27" }
{ "index": { "_id": "4" }}
{ "party_id":"4", "sex":"女", "mari_sts":"未婚", "ethnic":"满族", "prof":"工人","birth_date":"1996-03-14 00:00:00", "province":"江苏", "city":"扬州","client_type":"1", "client_name":"慧", "age":22,"id_type":"1", "idno":"4", "education":"高中", "created_date":"2014-09-16 09:30:37","last_modified_date":"2014-09-16 09:30:37", "etl_date":"2014-09-16 09:30:37" }
{ "index": { "_id": "5" }}
{ "party_id":"5", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"教师","birth_date":"1983-08-14 00:00:00", "province":"宁夏", "city":"灵武","client_type":"1", "client_name":"英", "age":35,"id_type":"1", "idno":"5", "education":"本科", "created_date":"2015-09-16 09:30:37","last_modified_date":"2015-09-16 09:30:37", "etl_date":"2015-09-16 09:30:37" }
{ "index": { "_id": "6" }}
{ "party_id":"6", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"工人","birth_date":"1959-07-04 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"岭", "age":59,"id_type":"1", "idno":"6", "education":"小学", "created_date":"2015-09-01 09:49:27","last_modified_date":"2015-09-01 09:49:27", "etl_date":"2015-09-01 09:49:27" }
{ "index": { "_id": "7" }}
{ "party_id":"7", "sex":"女", "mari_sts":"未婚", "ethnic":"汉族", "prof":"学生","birth_date":"1999-02-18 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"欣", "age":19,"id_type":"1", "idno":"7", "education":"高中", "created_date":"2016-12-01 09:49:27","last_modified_date":"2016-12-01 09:49:27", "etl_date":"2016-12-01 09:49:27" }
{ "index": { "_id": "8" }}
{ "party_id":"8", "sex":"女", "mari_sts":"未婚", "ethnic":"汉族", "prof":"学生","birth_date":"2007-11-18 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"梅", "age":10,"id_type":"1", "idno":"8", "education":"小学", "created_date":"2016-11-21 09:49:27","last_modified_date":"2016-11-21 09:49:27", "etl_date":"2016-11-21 09:49:27" }
{ "index": { "_id": "9" }}
{ "party_id":"9", "sex":"男", "mari_sts":"不详", "ethnic":"回族", "prof":"个体户","birth_date":"1978-03-29 00:00:00", "province":"北京", "city":"北京","client_type":"1", "client_name":"磊", "age":40,"id_type":"1", "idno":"9", "education":"高中", "created_date":"2017-09-01 09:49:27","last_modified_date":"2017-09-01 09:49:27", "etl_date":"2017-09-01 09:49:27" }
{ "index": { "_id": "10" }}
{ "party_id":"10", "sex":"男", "mari_sts":"已婚", "ethnic":"汉族", "prof":"农民","birth_date":"1970-11-14 00:00:00", "province":"浙江", "city":"台州","client_type":"1", "client_name":"强", "age":47,"id_type":"1", "idno":"10", "education":"初中", "created_date":"2018-09-01 09:49:27","last_modified_date":"2018-09-01 09:49:27", "etl_date":"2018-09-01 09:49:27" }
]]>
</property>
<!--
导入诊断信息
-->
<property name="bulkImportDiagnosisData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"J31.0", "sd_disease_name":"鼻炎","created_date":"2013-07-23 20:56:44", "last_modified_date":"2013-07-23 20:56:44", "etl_date":"2013-07-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"M47.8", "sd_disease_name":"颈椎病","created_date":"2013-09-23 20:56:44", "last_modified_date":"2013-09-23 20:56:44", "etl_date":"2013-09-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"E78.1", "sd_disease_name":"甘油三脂增高","created_date":"2018-09-20 09:27:44", "last_modified_date":"2018-09-20 09:27:44", "etl_date":"2018-09-20 09:27:44" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "provider":"江苏医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2011-05-19 15:52:55", "last_modified_date":"2011-05-19 15:52:55", "etl_date":"2011-05-19 15:52:55" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"H44", "sd_disease_name":"眼疾","created_date":"2016-04-08 10:42:18", "last_modified_date":"2016-04-08 10:42:18", "etl_date":"2016-04-08 10:42:18" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"M47.8", "sd_disease_name":"颈椎病","created_date":"2016-04-08 10:42:18", "last_modified_date":"2016-04-08 10:42:18", "etl_date":"2016-04-08 10:42:18" }

{ "index": { "parent": "7" }}
{ "party_id":"7", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2017-04-08 10:42:18", "last_modified_date":"2017-04-08 10:42:18", "etl_date":"2017-04-08 10:42:18" }

{ "index": { "parent": "8" }}
{ "party_id":"8", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2018-04-08 10:42:18", "last_modified_date":"2018-04-08 10:42:18", "etl_date":"2018-04-08 10:42:18" }

{ "index": { "parent": "9" }}
{ "party_id":"9", "provider":"朝阳医院", "subject":"","diagnosis_type":"","icd10_code":"A03.901", "sd_disease_name":"急性细菌性痢疾","created_date":"2015-06-08 10:42:18", "last_modified_date":"2015-06-08 10:42:18", "etl_date":"2015-06-08 10:42:18" }
]]>
</property>

<!--
导入医疗信息
-->
<property name="bulkImportMedicalData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"氟化钠", "medical_sale_name":"", "medical_code":"A01AA01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"四环素", "medical_sale_name":"", "medical_code":"A01AB13", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2016-05-31 00:00:00", "last_modified_date":"2016-05-31 00:00:00", "etl_date":"2016-05-31 00:00:00" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"", "medical_sale_name":"盐酸多西环素胶丸", "medical_code":"A01AB22", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2016-03-18 00:00:00", "last_modified_date":"2016-03-18 00:00:00", "etl_date":"2016-03-18 00:00:00" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"盐酸多西环素分散片", "medical_sale_name":"", "medical_code":"A01AB22", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2013-07-23 20:56:44", "last_modified_date":"2013-07-23 20:56:44", "etl_date":"2013-07-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"地塞米松", "medical_sale_name":"", "medical_code":"A01AC02", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2013-09-23 20:56:44", "last_modified_date":"2013-09-23 20:56:44", "etl_date":"2013-09-23 20:56:44" }

{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"肾上腺素", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-09-20 09:27:44", "last_modified_date":"2018-09-20 09:27:44", "etl_date":"2018-09-20 09:27:44" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"地塞米松", "medical_sale_name":"", "medical_code":"A01AC02", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2011-05-19 15:52:55", "last_modified_date":"2011-05-19 15:52:55", "etl_date":"2011-05-19 15:52:55" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"四环素", "medical_sale_name":"", "medical_code":"A01AB13", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-04-08 10:42:18", "last_modified_date":"2018-04-08 10:42:18", "etl_date":"2018-04-08 10:42:18" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"诺氟沙星胶囊", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2015-06-08 10:42:18", "last_modified_date":"2015-06-08 10:42:18", "etl_date":"2015-06-08 10:42:18" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "hos_name_yb":"山东医院", "eivisions_name":"", "medical_type":"","medical_common_name":"盐酸异丙肾上腺素片", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-01-23 20:56:44", "last_modified_date":"2014-01-23 20:56:44", "etl_date":"2014-01-23 20:56:44" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "hos_name_yb":"山东医院", "eivisions_name":"", "medical_type":"","medical_common_name":"甲硝唑栓", "medical_sale_name":"", "medical_code":"A01AB17", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-06-08 10:42:18", "last_modified_date":"2018-06-08 10:42:18", "etl_date":"2018-06-08 10:42:18" }

{ "index": { "parent": "9" }}
{ "party_id":"9", "hos_name_yb":"朝阳医院", "eivisions_name":"", "medical_type":"","medical_common_name":"复方克霉唑乳膏", "medical_sale_name":"", "medical_code":"A01AB18", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-01-23 20:56:44", "last_modified_date":"2014-01-23 20:56:44", "etl_date":"2014-01-23 20:56:44"}
]]>
</property>

<!--
导入体检信息
-->
<property name="bulkImportExamData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"高血压","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "2" }}
{ "party_id":"2", "hospital":"", "dept":"", "is_ok":"Y", "exam_result":"轻度脂肪肝","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "3" }}
{ "party_id":"3", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"急性细菌性痢疾","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "4" }}
{ "party_id":"4", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "5" }}
{ "party_id":"5", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "6" }}
{ "party_id":"6", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "7" }}
{ "party_id":"7", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "8" }}
{ "party_id":"1", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "9" }}
{ "party_id":"9", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }

{ "index": { "parent": "10" }}
{ "party_id":"10", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
]]>
</property>







通过bboss提供的通用api,导入上面定义的数据:
	/**
* 通过读取配置文件中的dsl json数据导入医疗数据
*/
public void importClientInfoFromJsonData(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");

clientUtil.executeHttp("client_info/basic/_bulk?refresh","bulkImportBasicData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/diagnosis/_bulk?refresh","bulkImportDiagnosisData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/medical/_bulk?refresh","bulkImportMedicalData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/exam/_bulk?refresh","bulkImportExamData",ClientUtil.HTTP_POST);
统计导入的数据
		
long basiccount = clientUtil.countAll("client_info/basic");
System.out.println(basiccount);
long medicalcount = clientUtil.countAll("client_info/medical");
System.out.println(medicalcount);
long examcount = clientUtil.countAll("client_info/exam");
System.out.println(examcount);
long diagnosiscount = clientUtil.countAll("client_info/diagnosis");
System.out.println(diagnosiscount);
}
4.父子关系查询-Elasticsearch 5.x 中has_child和has_parent查询的基本用法​
  • 根据父查子-通过客户名称信息查询客户端体检结果

在配置文件esmapper/Client_Info.xml增加dsl语句:queryExamSearchByClientName
   <!--根据客户名称查询客户体检报告-->
<property name="queryExamSearchByClientName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_parent": {
"type": "basic",
"query": {
"match": {
"client_name": #[clientName] ## 通过变量clientName设置客户名称
}
}
}
}
}
]]>
</property>

 执行查询,通过bboss的searchList 方法获取符合条件的体检报告以及总记录数据,返回size对应的1000条数据
	/**
* 根据客户名称查询客户体检报告
*/
public void queryExamSearchByClientName(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("clientName","张三");
params.put("size",1000);
ESDatas<Exam> exams = clientUtil.searchList("client_info/exam/_search","queryExamSearchByClientName",params,Exam.class);
List<Exam> examList = exams.getDatas();//获取符合条件的体检数据
long totalSize = exams.getTotalSize();//符合条件的总记录数据
}
 
  • 根据子查父数据-通过医疗信息编码查找客户基本数据

在配置文件esmapper/Client_Info.xml增加查询dsl语句:queryClientInfoByMedicalName
    <!--通过医疗信息编码查找客户基本数据-->
<property name="queryClientInfoByMedicalName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_child": {
"type": "medical",
"score_mode": "max",
"query": {
"match": {
"medical_code": #[medicalCode] ## 通过变量medicalCode设置医疗编码
}
}
}
}
}
]]>
</property>
执行查询,通过bboss的searchList 方法获取符合条件的客户端基本信息以及总记录数据
	/**
* 通过医疗信息编码查找客户基本数据
*/
public void queryClientInfoByMedicalName(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("medicalCode","A01AA01"); //通过变量medicalCode设置医疗编码
params.put("size",1000); //最多返回size变量对应的记录条数
ESDatas<Basic> bascis = clientUtil.searchList("client_info/basic/_search","queryClientInfoByMedicalName",params,Basic.class);
List<Basic> bascisList = bascis.getDatas();//获取符合条件的客户信息
long totalSize = bascis.getTotalSize();
}
5.同时返回父子数据-Elasticsearch 5.x 中如何在检索中同时返回父子数据
这一节中我们介绍同时返回父子数据的玩法 :inner_hits的妙用
  • 根据父条件查询所有子数据集合并返回父数据,根据客户名称查询所有体检数据,同时返回客户信息

在配置文件esmapper/Client_Info.xml增加检索dsl-queryDiagnosisByClientName
    <!--根据客户名称获取客户体检诊断数据,并返回客户信息-->
<property name="queryDiagnosisByClientName">
<![CDATA[
{
"query": {
## 最多返回size变量对应的记录条数
"size":#[size],
"has_parent": {
"type": "basic",
"query": {
"match": {
"client_name": #[clientName] ## 通过变量clientName设置客户名称
}
},
"inner_hits": {} ## 通过变量inner_hits表示要返回对应的客户信息
}
}
}
]]>
</property>
执行检索并遍历结果
	/**
* 根据客户名称获取客户体检诊断数据,并返回客户数据
*/
public void queryDiagnosisByClientName(){

ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("clientName","张三");
params.put("size",1000);

try {
ESInnerHitSerialThreadLocal.setESInnerTypeReferences(Basic.class);//指定inner查询结果对应的客户基本信息类型,Basic只有一个文档类型,索引不需要显示指定basic对应的mapping type名称
ESDatas<Diagnosis> diagnosiss = clientUtil.searchList("client_info/diagnosis/_search",
"queryDiagnosisByClientName",params,Diagnosis.class);
List<Diagnosis> diagnosisList = diagnosiss.getDatas();//获取符合条件的体检报告数据
long totalSize = diagnosiss.getTotalSize();
//遍历诊断报告信息,并查看报告对应的客户基本信息
for(int i = 0; diagnosisList != null && i < diagnosisList.size(); i ++) {
Diagnosis diagnosis = diagnosisList.get(i);
List<Basic> basics = ResultUtil.getInnerHits(diagnosis.getInnerHits(), "basic");
if(basics != null) {
System.out.println(basics.size());
}
}
}
finally{
ESInnerHitSerialThreadLocal.clean();//清空inner查询结果对应的客户基本信息类型
}
}

  •  根据子条件查询父数据并返回符合条件的父的子数据集合,查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录

在配置文件esmapper/Client_Info.xml增加检索dsl-queryClientAndAllSons
    <!--查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录-->
<property name="queryClientAndAllSons">
<![CDATA[
{
"query": {
"bool": {
"should": [
{
"match_all":{}
}
]
,"must": [
{
"has_child": {
"score_mode": "none",
"type": "diagnosis"
,"query": {
"bool": {
"must": [
{
"term": {
"icd10_code": {
"value": "J00"
}
}
}
]
}
},"inner_hits":{}
}
}
]
,"should": [
{
"has_child": {
"score_mode": "none",
"type": "medical"
,"query": {
"match_all": {}

},"inner_hits":{}
}
}
]
,"should": [
{
"has_child": {
"type": "exam",
"query": {
"match_all": {}
},"inner_hits":{}
}
}
]
}
}
}
]]>
</property>
执行查询:
	/**
* 查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录
*/
public void queryClientAndAllSons(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
Map<String,Object> params = null;//没有检索条件,构造一个空的参数对象

try {
//设置子文档的类型和对象映射关系
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("exam",Exam.class);//指定inner查询结果对于exam类型和对应的对象类型Exam
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("diagnosis",Diagnosis.class);//指定inner查询结果对于diagnosis类型和对应的对象类型Diagnosis
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("medical",Medical.class);//指定inner查询结果对于medical类型和对应的对象类型Medical
ESDatas<Basic> escompanys = clientUtil.searchList("client_info/basic/_search",
"queryClientAndAllSons",params,Basic.class);
//String response = clientUtil.executeRequest("client_info/basic/_search","queryClientAndAllSons",params);直接获取原始的json报文
// escompanys = clientUtil.searchAll("client_info",Basic.class);
long totalSize = escompanys.getTotalSize();
List<Basic> clientInfos = escompanys.getDatas();//获取符合条件的数据
//查看公司下面的雇员信息(符合检索条件的雇员信息)
for (int i = 0; clientInfos != null && i < clientInfos.size(); i++) {
Basic clientInfo = clientInfos.get(i);
List<Exam> exams = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "exam");
if(exams != null)
System.out.println(exams.size());
List<Diagnosis> diagnosiss = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "diagnosis");
if(diagnosiss != null)
System.out.println(diagnosiss.size());
List<Medical> medicals = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "medical");
if(medicals != null)
System.out.println(medicals.size());

}
}
finally{
ESInnerHitSerialThreadLocal.clean();//清空inner查询结果对于各种类型信息
}
}
最后我们按顺序执行所有方法,验证功能:
	@Test
public void testMutil(){
this.createClientIndice();//创建indice client_info
// this.importClientInfoDataFromBeans(); //通过api添加测试数据
this.importClientInfoFromJsonData();//导入测试数据
this.queryExamSearchByClientName(); //根据客户端名称查询提交报告
this.queryClientInfoByMedicalName();//通过医疗信息编码查找客户基本数据
this.queryDiagnosisByClientName();//根据客户名称获取客户体检诊断数据,并返回客户数据
this.queryClientAndAllSons();//查询客户信息,同时返回客户对应的所有体检报告、医疗记录、诊断记录
}
可以下载完整的demo工程运行本文中的测试用例方法,地址见相关资料。
到此Elasticsearch 5.x 父子关系维护检索实战介绍完毕,谢谢大家!

相关资料
完整demo工程  https://github.com/bbossgroups/eshelloword-booter
对应的类文件和配置文件
org.bboss.elasticsearchtest.parentchild.ParentChildTest
esmapper/Client_Info.xml
 
开发交流
bboss交流群 166471282
bboss公众号
getqrcode.jpg

 
敬请关注:父子关系维护检索实战二 Elasticsearch 6.x 父子关系维护检 收起阅读 »

es创建索引失败

#10001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/10001/logs/mstore.log
  fields:
    env: uat-10001-log
  include_lines: ['ERROR']
#10001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/10001/logs/catalina.out
  fields:
    env: uat-10001-catalina
  include_lines: ['ERROR']
#11001-log
- type: log
  enabled: true
  paths:
    - /mkt/tomcat/8.5.32/11001/logs/delivery.log
  include_lines: ['ERROR']
  fields:
    env: uat-11001-log
#11001-catalina.out
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/11001/logs/catalina.out
  fields:
    env: uat-11001-catalina
  include_lines: ['ERROR']
#12001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/12001/logs/mstore.log
  fields:
    env: uat-12001-log
  include_lines: ['ERROR']
#12001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/12001/logs/catalina.out
  fields:
    env: uat-12001-catalina
  include_lines: ['ERROR']
#13001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/13001/logs/pay-web-boss.log
  fields:
    env: uat-13001-log
    include_lines: ['ERROR']
#13001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/13001/logs/catalina.out
  fields:
    env: uat-13001-catalina
  include_lines: ['ERROR']
#14001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/14001/logs/pay-web-gateway.log
  fields:
    env: uat-14001-log
  include_lines: ['ERROR']
#14001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/14001/logs/catalina.out
  fields:
    env: uat-14001-catalina
  include_lines: ['ERROR']
#15001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/15001/logs/roncoo-pay-web-merchant.log
  fields:
    env: uat-15001-log
  include_lines: ['ERROR']
#15001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/15001/logs/catalina.out
  fields:
    env: uat-15001-catalina
  include_lines: ['ERROR']      每次创建索引会少uat-11001-log和uat-12001-log,filebeat读取了这个两个日志文件
继续阅读 »
#10001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/10001/logs/mstore.log
  fields:
    env: uat-10001-log
  include_lines: ['ERROR']
#10001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/10001/logs/catalina.out
  fields:
    env: uat-10001-catalina
  include_lines: ['ERROR']
#11001-log
- type: log
  enabled: true
  paths:
    - /mkt/tomcat/8.5.32/11001/logs/delivery.log
  include_lines: ['ERROR']
  fields:
    env: uat-11001-log
#11001-catalina.out
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/11001/logs/catalina.out
  fields:
    env: uat-11001-catalina
  include_lines: ['ERROR']
#12001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/12001/logs/mstore.log
  fields:
    env: uat-12001-log
  include_lines: ['ERROR']
#12001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/12001/logs/catalina.out
  fields:
    env: uat-12001-catalina
  include_lines: ['ERROR']
#13001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/13001/logs/pay-web-boss.log
  fields:
    env: uat-13001-log
    include_lines: ['ERROR']
#13001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/13001/logs/catalina.out
  fields:
    env: uat-13001-catalina
  include_lines: ['ERROR']
#14001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/14001/logs/pay-web-gateway.log
  fields:
    env: uat-14001-log
  include_lines: ['ERROR']
#14001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/14001/logs/catalina.out
  fields:
    env: uat-14001-catalina
  include_lines: ['ERROR']
#15001-log
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/15001/logs/roncoo-pay-web-merchant.log
  fields:
    env: uat-15001-log
  include_lines: ['ERROR']
#15001-catalina
- type: log
  enable: true
  paths:
    - /mkt/tomcat/8.5.32/15001/logs/catalina.out
  fields:
    env: uat-15001-catalina
  include_lines: ['ERROR']      每次创建索引会少uat-11001-log和uat-12001-log,filebeat读取了这个两个日志文件 收起阅读 »

【 报名开启】2018 Elastic & 袋鼠云 & 阿里云技术沙龙(杭州)

互联网时代,十亿、百亿、千亿的数据日志呈井喷式增长,基于日志搜索分析的需求也越来越强烈。Elasticsearch 作为一个分布式、可扩展、实时的搜索与数据分析引擎, 在大体量的数据处理上,无论实在全文搜索,还是在结构化数据统计中,都有非常大的优势。然而在真正实践过程中海量数据如何高效采集,如何合理优化分配索引,如何规划集群,如何满足业务分析需求都是我们可能会面临的问题。

本次袋鼠云联合阿里云、Elastic 中文社区,共同邀请滴滴、有赞等行业技术专家一同分享和探讨各自领域Elastic的实践。
 
本次活动时间12月15日 周六人数限制100人,大家抓紧报名哈,报名链接https://meetup.elasticsearch.c ... .html

参与线下互动还有机会获得技术书籍与精美礼品哦!!!
不出意外,这应该是2018年Elastic在杭州的最后一次沙龙,小伙伴们抓紧今年的尾巴,不放过任何学习的机会哦!!!
继续阅读 »
互联网时代,十亿、百亿、千亿的数据日志呈井喷式增长,基于日志搜索分析的需求也越来越强烈。Elasticsearch 作为一个分布式、可扩展、实时的搜索与数据分析引擎, 在大体量的数据处理上,无论实在全文搜索,还是在结构化数据统计中,都有非常大的优势。然而在真正实践过程中海量数据如何高效采集,如何合理优化分配索引,如何规划集群,如何满足业务分析需求都是我们可能会面临的问题。

本次袋鼠云联合阿里云、Elastic 中文社区,共同邀请滴滴、有赞等行业技术专家一同分享和探讨各自领域Elastic的实践。
 
本次活动时间12月15日 周六人数限制100人,大家抓紧报名哈,报名链接https://meetup.elasticsearch.c ... .html

参与线下互动还有机会获得技术书籍与精美礼品哦!!!
不出意外,这应该是2018年Elastic在杭州的最后一次沙龙,小伙伴们抓紧今年的尾巴,不放过任何学习的机会哦!!! 收起阅读 »