从MySQL建立Elasticsearch索引-索引

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-索引

接上文从MySQL建立Elasticsearch索引-引言,本文介绍一下我们实现索引创建时的一些思路。

目标

框架以Jar包的形式提供,通过配置(XML)提供规则,支持插件开发,支持全量、增量等。

因为希望尽量减少开发量,所以不要求使用方提供平表,因此需要考虑如何将平表

概念

  • 全量:即使用全部数据创建一个新的索引
  • 增量:按照时间范围或者给定的规则,把变化的数据同步到ES
  • 插件:本框架内定义了一个插件接口,用于特定需求自行开发,并接入到现有的配置文件中
  • 分批参数:SQL中由框架填充的参数,类似${id}等

配置

以用户(users)为例,配置有三种文件:

  1. users.mapping,保存了索引的配置,每次全量的时候会读取本文件进行索引创建
  2. rule,规则文件,里面主要定义了对应SQL和插件的配置
    1. users-all.rule,全量规则,规则中以主表为主体,支持以Top、Limit的形式对数据进行分批获取,分批参数在SQL里可以定义为${id},框架会自动填充该值,以保证能够拉取到全量的数据
    2. users-inc.rule,按照时间范围进行增量,分批参数支持在SQL中使用${startTime}和${endTime}
    3. users-spec.rule,按照指定的主表的主键,获取数据及更新索引的操作
    4. users-partial.rule,框架接入了阿里的Canal,本规则定义了各种表变化时,如果取到主表Id,以使用spec进行数据更新
  3. users.plugin,以上的rule主要提供了主表数据的获取方法,plugin文件则提供了各种关联表的信息获取方式,可以使用SQL,或者自定义的插件

全量索引实现关键点

为了保证索引的性能、监控、正确性等,实现时进行了以下设计。

(一)索引的维护

ES使用分布式、Replica、Snapshot机制保证索引的有效及集群稳定性。我们综合考虑后决定放弃Snapshot机制,通过定时/不定时创建全新全量索引,索引名字以${indexName}-{yyyy-MM-dd}的格式定义。正进行全量的索引关联上${indexName}_F的别名,正在使用的索引关联上${indexName}的别名,这样代码里可以不用关心应该读取或者使用哪个索引,合适的场景使用合适的别名即可。

(二)索引的性能

此处专指创建索引的性能,ES的性能是老话题了。

首先,为了保证全量的性能,创建索引时会调整mapping参数,类似refresh_interval改成-1,number_of_replicas改成0,添加默认slowlog设置。

索引过程中,控制bulk请求体的总大小,保证合适的分批大小的数据一并提交到ES。如果失败简单重试三次,如果三次都失败则认为失败,退出并删除当前索引,以保证线上ES数据干净准确。

(三)索引的变更

索引完成后,检查当前索引文档数和正使用索引文档数差异,如果大于配置的阈值,则认为此次索引创建有问题,删除索引并退出;做强制合并,减少segments数,提高搜索性能;恢复refresh_interval、number_of_replicas等设置;等待新索引状态变成GREEN后,将${indexName}切换到新的索引上,以使新索引生效。

最后,检查以${indexName}-{yyyy-MM-dd}格式命名的索引有几份,将多于指定个数的较老的索引删除,将多于指定个数的索引状态改为Close。这样可以尽可能提供磁盘内存利用率,减少不必要的损耗,又能在一定程度上保证数据可用。

以上,即完成了全量索引的创建。

增量索引实现关键点

增量索引因为场景比较多,所以规则也分了几种:

  • inc,使用${startTime}和${endTime}在SQL中代替时间范围,框架会自动保存上次执行时间,以使整个增量整体不断滚动更新。实际使用时,因为有多个关联表导致SQL写起来比较复杂,以及部分增量数据过大,对DB有一定压力,因此不推荐使用此种方式。数据相对简单的场景可以使用。
  • spec,在知道主表数据变化范围的时候使用,也是目前我们推荐的一种方式。使用主键查找数据,所有的表都是主键查询,影响范围也很明确。
  • partial,针对canal做了单独封装的规则,用户获取canal的变化类型(更新、删除),变化的主表Id,用于使用spec的方式进行数据更新。

暂时没有使用部分更新的原因是,关联表与主表的映射关系比较复杂,有1:1,1:N,M:N;并且目前按照主键更新数据的方式,对DB等压力不大;加上spec的方式逻辑清晰,有利于维护和开发。

扩展性

SQL不是万能的,比方说我们部分数据需要从其他微服务取,或者SQL逻辑复杂建议代码实现,这时候就可以使用我们的插件机制了。默认框架提供了一个很强大的插件,支持Distinct、Merge、Mapping等功能,可以满足80%的关联数据的场景了。其他的场景以及需要微服务的场景,支持用户自己实现指定插件,通过配置文件,即可接入现有的框架体系中。以此能支持目前我们全部需求。

下一篇,我们将分享我们搜索模块的实现思路。

继续阅读 »

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-索引

接上文从MySQL建立Elasticsearch索引-引言,本文介绍一下我们实现索引创建时的一些思路。

目标

框架以Jar包的形式提供,通过配置(XML)提供规则,支持插件开发,支持全量、增量等。

因为希望尽量减少开发量,所以不要求使用方提供平表,因此需要考虑如何将平表

概念

  • 全量:即使用全部数据创建一个新的索引
  • 增量:按照时间范围或者给定的规则,把变化的数据同步到ES
  • 插件:本框架内定义了一个插件接口,用于特定需求自行开发,并接入到现有的配置文件中
  • 分批参数:SQL中由框架填充的参数,类似${id}等

配置

以用户(users)为例,配置有三种文件:

  1. users.mapping,保存了索引的配置,每次全量的时候会读取本文件进行索引创建
  2. rule,规则文件,里面主要定义了对应SQL和插件的配置
    1. users-all.rule,全量规则,规则中以主表为主体,支持以Top、Limit的形式对数据进行分批获取,分批参数在SQL里可以定义为${id},框架会自动填充该值,以保证能够拉取到全量的数据
    2. users-inc.rule,按照时间范围进行增量,分批参数支持在SQL中使用${startTime}和${endTime}
    3. users-spec.rule,按照指定的主表的主键,获取数据及更新索引的操作
    4. users-partial.rule,框架接入了阿里的Canal,本规则定义了各种表变化时,如果取到主表Id,以使用spec进行数据更新
  3. users.plugin,以上的rule主要提供了主表数据的获取方法,plugin文件则提供了各种关联表的信息获取方式,可以使用SQL,或者自定义的插件

全量索引实现关键点

为了保证索引的性能、监控、正确性等,实现时进行了以下设计。

(一)索引的维护

ES使用分布式、Replica、Snapshot机制保证索引的有效及集群稳定性。我们综合考虑后决定放弃Snapshot机制,通过定时/不定时创建全新全量索引,索引名字以${indexName}-{yyyy-MM-dd}的格式定义。正进行全量的索引关联上${indexName}_F的别名,正在使用的索引关联上${indexName}的别名,这样代码里可以不用关心应该读取或者使用哪个索引,合适的场景使用合适的别名即可。

(二)索引的性能

此处专指创建索引的性能,ES的性能是老话题了。

首先,为了保证全量的性能,创建索引时会调整mapping参数,类似refresh_interval改成-1,number_of_replicas改成0,添加默认slowlog设置。

索引过程中,控制bulk请求体的总大小,保证合适的分批大小的数据一并提交到ES。如果失败简单重试三次,如果三次都失败则认为失败,退出并删除当前索引,以保证线上ES数据干净准确。

(三)索引的变更

索引完成后,检查当前索引文档数和正使用索引文档数差异,如果大于配置的阈值,则认为此次索引创建有问题,删除索引并退出;做强制合并,减少segments数,提高搜索性能;恢复refresh_interval、number_of_replicas等设置;等待新索引状态变成GREEN后,将${indexName}切换到新的索引上,以使新索引生效。

最后,检查以${indexName}-{yyyy-MM-dd}格式命名的索引有几份,将多于指定个数的较老的索引删除,将多于指定个数的索引状态改为Close。这样可以尽可能提供磁盘内存利用率,减少不必要的损耗,又能在一定程度上保证数据可用。

以上,即完成了全量索引的创建。

增量索引实现关键点

增量索引因为场景比较多,所以规则也分了几种:

  • inc,使用${startTime}和${endTime}在SQL中代替时间范围,框架会自动保存上次执行时间,以使整个增量整体不断滚动更新。实际使用时,因为有多个关联表导致SQL写起来比较复杂,以及部分增量数据过大,对DB有一定压力,因此不推荐使用此种方式。数据相对简单的场景可以使用。
  • spec,在知道主表数据变化范围的时候使用,也是目前我们推荐的一种方式。使用主键查找数据,所有的表都是主键查询,影响范围也很明确。
  • partial,针对canal做了单独封装的规则,用户获取canal的变化类型(更新、删除),变化的主表Id,用于使用spec的方式进行数据更新。

暂时没有使用部分更新的原因是,关联表与主表的映射关系比较复杂,有1:1,1:N,M:N;并且目前按照主键更新数据的方式,对DB等压力不大;加上spec的方式逻辑清晰,有利于维护和开发。

扩展性

SQL不是万能的,比方说我们部分数据需要从其他微服务取,或者SQL逻辑复杂建议代码实现,这时候就可以使用我们的插件机制了。默认框架提供了一个很强大的插件,支持Distinct、Merge、Mapping等功能,可以满足80%的关联数据的场景了。其他的场景以及需要微服务的场景,支持用户自己实现指定插件,通过配置文件,即可接入现有的框架体系中。以此能支持目前我们全部需求。

下一篇,我们将分享我们搜索模块的实现思路。

收起阅读 »

从MySQL建立Elasticsearch索引-引言

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-引言

日常工作里,因业务需要大量使用了Elasticsearch。为了简化索引的开发工作,我们需要一个易用可扩展的MySQL到ES的同步框架,在比较了可以找到的各种开源框架&工具后,我们还是选择自行研发了一个,名字简单粗暴:es-common。

背景

16年我接手了并负责了部门所有业务的搜索系统,旧搜索系统是基于Lucene自研实现的一个搜索框架,包含了平表创建、全量索引、增量索引、搜索引擎四个部分基础功能的封装;另有一个管理系统,用于配置索引、字典等信息。框架的实现思路是通过建立平表,简化索引创建的过程。

接手该系统以后,各业务出现井喷式发展,各种需求铺天盖地,同时该系统的不稳定性也开始作妖,最早三个人的小团队每天都疲于奔命。解决现有问题的同时,也在查找更好的解决方案。于是Elasticsearch进入视野。 基于JSON的交互、分布式、数据与逻辑隔离、开箱即用、稳定等特性,使我们确定了向ES转型的计划。每个点都是现有系统没有解决的问题,具体的点就不吐槽了。

选择

我们的需求有:

  1. 支持全量、增量建立索引,以使数据变更较快体现
  2. 支持更新指定文档,以处理突发问题
  3. 索引有版本控制,以防止出现问题无法快速回滚
  4. 尽量减少代码开发,减少出错概率,更专注业务
  5. 简单的出错处理,失败检测等
  6. 支持插件化开发,提供额外数据
  7. 支持简单的规则,类似合并同主键数据
  8. 域名、AUTH不可见,以满足安全要求
  9. 支持非MySQL来源的数据,因为微服务的存在,不是所有数据都能从DB获取到

当时的调用结果已经找不到了,那么按照现在可以搜索到的框架和工具来重新做调研好了。简单搜索了一下MySQL到ES的同步框架或者工具,有以下几个:

这几个工具通过简单的配置,即可建立索引,但分析我们的需求,有以下需求无法满足:

  1. 需要提供DB的URL、User和Password等,我们公司由于安全的考虑,是无法拿到这些参数的;
  2. 有部分数据是通过微服务获取的,单纯的SQL是无法访问微服务的;
  3. 有的表数据量比较大,需要分片多JOB同时处理,JDBC在这点上操作比较麻烦;
  4. 增量是需要按照主键进行更新的,按照时间扫描对表压力比较大;
  5. 类似工具无法走常规发布,有一定的发布风险

基于以上原因,我们选择了自己研发了一个框架,用来满足我们的需求。下篇文章讲介绍es-common的实现思路。

继续阅读 »

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-引言

日常工作里,因业务需要大量使用了Elasticsearch。为了简化索引的开发工作,我们需要一个易用可扩展的MySQL到ES的同步框架,在比较了可以找到的各种开源框架&工具后,我们还是选择自行研发了一个,名字简单粗暴:es-common。

背景

16年我接手了并负责了部门所有业务的搜索系统,旧搜索系统是基于Lucene自研实现的一个搜索框架,包含了平表创建、全量索引、增量索引、搜索引擎四个部分基础功能的封装;另有一个管理系统,用于配置索引、字典等信息。框架的实现思路是通过建立平表,简化索引创建的过程。

接手该系统以后,各业务出现井喷式发展,各种需求铺天盖地,同时该系统的不稳定性也开始作妖,最早三个人的小团队每天都疲于奔命。解决现有问题的同时,也在查找更好的解决方案。于是Elasticsearch进入视野。 基于JSON的交互、分布式、数据与逻辑隔离、开箱即用、稳定等特性,使我们确定了向ES转型的计划。每个点都是现有系统没有解决的问题,具体的点就不吐槽了。

选择

我们的需求有:

  1. 支持全量、增量建立索引,以使数据变更较快体现
  2. 支持更新指定文档,以处理突发问题
  3. 索引有版本控制,以防止出现问题无法快速回滚
  4. 尽量减少代码开发,减少出错概率,更专注业务
  5. 简单的出错处理,失败检测等
  6. 支持插件化开发,提供额外数据
  7. 支持简单的规则,类似合并同主键数据
  8. 域名、AUTH不可见,以满足安全要求
  9. 支持非MySQL来源的数据,因为微服务的存在,不是所有数据都能从DB获取到

当时的调用结果已经找不到了,那么按照现在可以搜索到的框架和工具来重新做调研好了。简单搜索了一下MySQL到ES的同步框架或者工具,有以下几个:

这几个工具通过简单的配置,即可建立索引,但分析我们的需求,有以下需求无法满足:

  1. 需要提供DB的URL、User和Password等,我们公司由于安全的考虑,是无法拿到这些参数的;
  2. 有部分数据是通过微服务获取的,单纯的SQL是无法访问微服务的;
  3. 有的表数据量比较大,需要分片多JOB同时处理,JDBC在这点上操作比较麻烦;
  4. 增量是需要按照主键进行更新的,按照时间扫描对表压力比较大;
  5. 类似工具无法走常规发布,有一定的发布风险

基于以上原因,我们选择了自己研发了一个框架,用来满足我们的需求。下篇文章讲介绍es-common的实现思路。

收起阅读 »

Elastic日报 第661期 (2019-07-06)

1.利用logstash同步关系型数据库和es https://0x7.me/R4OFV

2.针对英语和俄语的词形分析插件 https://0x7.me/9S0J

3.一周热点:GitHub上的那些奇葩项目 https://0x7.me/8B5T

继续阅读 »

1.利用logstash同步关系型数据库和es https://0x7.me/R4OFV

2.针对英语和俄语的词形分析插件 https://0x7.me/9S0J

3.一周热点:GitHub上的那些奇葩项目 https://0x7.me/8B5T

收起阅读 »

邀请参加 Elasticsearch 中文分词器需求调查问卷


Jietu20190624-202208.gif

 亲爱的 Elasticsearch 中文用户:
 
为了能够充分了解您对 Elasticsearch 中文分词需求的基本情况,以便我们更有针对性地改进中文分词下的使用体验,并优化相关分词器,请您抽出几分钟的时间填写以下问卷。问卷题目中的选项没有对错之分,请您根据实际情况或想法进行填写。您的信息将被严格保密,请放心填写!

调研结束后,我们将随机抽出 5 名同学,每人送 Elastic 纪念 T 恤一件。谢谢您对本次调查的参与和支持~
 
问卷填写地址:http://elasticsearch.mikecrm.com/nz0FxmK
 
谢谢!
继续阅读 »

Jietu20190624-202208.gif

 亲爱的 Elasticsearch 中文用户:
 
为了能够充分了解您对 Elasticsearch 中文分词需求的基本情况,以便我们更有针对性地改进中文分词下的使用体验,并优化相关分词器,请您抽出几分钟的时间填写以下问卷。问卷题目中的选项没有对错之分,请您根据实际情况或想法进行填写。您的信息将被严格保密,请放心填写!

调研结束后,我们将随机抽出 5 名同学,每人送 Elastic 纪念 T 恤一件。谢谢您对本次调查的参与和支持~
 
问卷填写地址:http://elasticsearch.mikecrm.com/nz0FxmK
 
谢谢! 收起阅读 »

Elastic日报 第632期 (2019-06-04)

1、23条好用的Elasticsearch查询策略。
http://t.cn/R9Sjxf5
2、如何使用Nginx为Elasticsearch设置代理。
http://t.cn/Ai9Oc9Pd
3、Telegraf Elasticsearch插件使用教程。
http://t.cn/Ai9OcpDY

编辑:叮咚光军

归档:https://ela.st/cn-daily-all
订阅:https://ela.st/cn-daily-sub
沙龙:https://ela.st/cn-meetup
继续阅读 »
1、23条好用的Elasticsearch查询策略。
http://t.cn/R9Sjxf5
2、如何使用Nginx为Elasticsearch设置代理。
http://t.cn/Ai9Oc9Pd
3、Telegraf Elasticsearch插件使用教程。
http://t.cn/Ai9OcpDY

编辑:叮咚光军

归档:https://ela.st/cn-daily-all
订阅:https://ela.st/cn-daily-sub
沙龙:https://ela.st/cn-meetup 收起阅读 »

ES6.* join类型的可行性

ES6将_parent移除缓存join了
{
"产品id":"",
"产品名字":"",
"产品SKU":[
"skuid":""
"sku库存":"",
"sku销量":"",
"SKU经销权":[
{"经销商编号":"1","购买量":"","发货工厂":"","销售组织"},
{"经销商编号":"2","购买量":"","发货工厂":"","销售组织"}
]
],
"产品总销量":"",
"产品总库存":"",
]
}
想通过ES6.* 建立这种一对多 多对多的数据模型  有用过到生产环境的吗  现在测试已经满足子查父  同时根据子的排序  孙节点还没测过
继续阅读 »
ES6将_parent移除缓存join了
{
"产品id":"",
"产品名字":"",
"产品SKU":[
"skuid":""
"sku库存":"",
"sku销量":"",
"SKU经销权":[
{"经销商编号":"1","购买量":"","发货工厂":"","销售组织"},
{"经销商编号":"2","购买量":"","发货工厂":"","销售组织"}
]
],
"产品总销量":"",
"产品总库存":"",
]
}
想通过ES6.* 建立这种一对多 多对多的数据模型  有用过到生产环境的吗  现在测试已经满足子查父  同时根据子的排序  孙节点还没测过 收起阅读 »

百度智能云招聘ElasticSearch研发工程师

工作职责:
  • 负责ElasticSearch相关产品的设计、研发、维护
  • 与相关大数据产品制定大数据解决方案
  •  

职责要求:
  • 211或985大学本科毕业,计算机相关专业
  • 熟悉Java/Go语言开发,熟悉常见的数据结构与算法
  • 掌握网络编程和多线程开发
  • 对ElasticSearch、Lucene、Slor的原理有较深入了解。
  • 有对开源组件有过二次开发经验者优先
  • 由社区贡献经验者优先

联系方式:shouchunbai@baidu.com
 
继续阅读 »
工作职责:
  • 负责ElasticSearch相关产品的设计、研发、维护
  • 与相关大数据产品制定大数据解决方案
  •  

职责要求:
  • 211或985大学本科毕业,计算机相关专业
  • 熟悉Java/Go语言开发,熟悉常见的数据结构与算法
  • 掌握网络编程和多线程开发
  • 对ElasticSearch、Lucene、Slor的原理有较深入了解。
  • 有对开源组件有过二次开发经验者优先
  • 由社区贡献经验者优先

联系方式:shouchunbai@baidu.com
  收起阅读 »

ELK 使用小技巧(第 5 期)

ELK Tips 主要介绍一些 ELK 使用过程中的小技巧,内容主要来源为 Elastic 中文社区。

一、Logstash

1、Logstash 性能调优主要参数

  • pipeline.workers:设置启动多少个线程执行 fliter 和 output;当 input 的内容出现堆积而 CPU 使用率还比较充足时,可以考虑增加该参数的大小;
  • pipeline.batch.size:设置单个工作线程在执行过滤器和输出之前收集的最大事件数,较大的批量大小通常更高效,但会增加内存开销。输出插件会将每个批处理作为一个输出单元。;例如,ES 输出会为收到的每个批次发出批量请求;调整 pipeline.batch.size 可调整发送到 ES 的批量请求(Bulk)的大小;
  • pipeline.batch.delay:设置 Logstash 管道的延迟时间, 管道批处理延迟是 Logstash 在当前管道工作线程中接收事件后等待新消息的最长时间(以毫秒为单位);简单来说,当 pipeline.batch.size 不满足时,会等待 pipeline.batch.delay 设置的时间,超时后便开始执行 filter 和 output 操作。

2、'reader' unacceptable character ' ' (0x0)

logstash 执行使用 Jdbc input plugin 后报错:

[main]-pipeline-manager] ERROR logstash.agent - Pipeline aborted due to error {
:exception=>#<Psych::SyntaxError: (): 'reader' unacceptable character ' ' (0x0) special characters are not allowed in "'reader'", 
position 0 at line 0 column 0>, :backtrace=>["org/jruby/ext/psych/PsychParser.java:232:in parse'"

解决方案:删除 $USER_HOME/.logstash_jdbc_last_run 文件即可。

二、Elasticsearch

1、TermsQuery 与多个 TermQuery 的区别

当 terms 的个数较少的时候,TermsQuery 等效为 ConstantScoreQuery 内部包含多个 TermQuery:

Query q1 = new TermInSetQuery(new Term("field", "foo"), new Term("field", "bar"));
// 等效为下面的语句
BooleanQuery bq = new BooleanQuery();
bq.add(new TermQuery(new Term("field", "foo")), Occur.SHOULD);
bq.add(new TermQuery(new Term("field", "bar")), Occur.SHOULD);
Query q2 = new ConstantScoreQuery(bq);

当 terms 较多的时候,它将使用匹配的文档组合成一个位集,并在该位集上进行评分;此时查询效率比普通的 Bool 合并要更加高效。

当 terms 的个数较多时,TermsQuery 比多个 TermQuery 组合的查询效率更高。

2、ES 借助 nginx 配置域名

upstream /data/ {
    server 192.168.187.xxx:9200;
    keepalive 300 ;
}

server {
    listen 80;
    server_name testelk.xx.com;
    keepalive_timeout 120s 120s;
    location /data {
        proxy_pass http://data/;
        proxy_http_version 1.1;
        proxy_set_header Connection "Keep-Alive";
        proxy_set_header Proxy-Connection "Keep-Alive";
        proxy_set_header X-Real-IP $remote_addr;
        proxy_pass_header remote_user 
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_set_header Host $http_host;
        proxy_set_header X-Nginx-Proxy true;
    }
}

3、ES Reindex 时如何不停止写入服务

方案一:kennywu76

ES 的 reindex 在索引有实时的 update/delete 的情况下,即使借助 alias,也没有办法实现真正的 zero down time。

增加新文档比较好办,通过 alias 切换写入到新索引,同时 reindex 做旧->新索引的数据传输即可;但是 update/delete 操作针对的文档如果还未从旧索引传输过来,直接对新索引操作会导致两个索引数据不一致。

我能够想到的(一个未经实际验证)的方案,前提是数据库里的文档有一个类似 last_update_time 字段记录文档最后更新的时间,用作写入 ES 文档的版本号,然后数据写入新索引的时候,url 里带上下面这样的参数:version_type=external_gt&version=xxxxxx

其中 version_type=external_gt 表示写入文档的版本号大于已有的文档版本号,或者文档不存在,写入才会成功,否则会抛版本冲突的异常。另外 delete 操作都要转换成 index 操作,index 的内容可以是一个空文档

这样实时数据写入新索引和 reindex 可以同时进行,实时写入的数据应该具有更高的版本,总是能够成功,reindex 如果遇到版本冲突,说明该文档被实时部分更新过了,已经过时,可以直接放弃跳过。

该方案的缺陷:

  • 要求数据源里的数据具有版本信息,可能因为各种局限,不太容易更改;
  • delete 操作必须转化为写入一个空文档,delete 实际上是一个标记文档,并且本身也有版本信息。但是如果后端发生了 segment merge,delete 可能会被合并以后物理清除。这样 delete 和对应的版本信息丢失,之后 reindex 如果写入了旧版本的文档,仍然会有一致性问题;但是空文档会增加索引文件的大小,有额外的消耗,一个可能的缓解办法是在 reindex 全部做完以后,再做一次空文档的删除。

改进方案:the_best

重建索引步骤如下:

  1. 保证 delete 操作都要转换成 index 操作,index 的内容可以是一个空文档;
  2. 对老索引 old_index(业务上的别名还是挂在老索引上)进行重索引操作(version_type=external);
    curl -X POST 'http://<hostname>:9200/_reindex'
    {
    "conflicts": "proceed",
    "source": {
        "index": "old_index",
        "size": 1000
    },
    "dest": {
        "index": "new_index",
        "version_type": "external"
    }
    }
  3. 将别名切到 newIndex;
  4. 将重索引时间段内 old_index 产生的热数据,再捞一次到 new_index 中(conflicts=proceed&version_type=external);
    curl -X POST /_reindex
    {
    "conflicts": "proceed",
    "source": {
        "index": "old_index"
        "query": {
            "constant_score" : {
                "filter" : {
                    "range" : {
                        "data_update_time" : {
                            "gte" : <reindex开始时刻前的毫秒时间戳>
                        }
                    }
                }
            }
        }
    },
    "dest": {
        "index": "new_index",
        "version_type": "external"
    }
    }
  5. 手动做一次空文档的删除。

这种方式取决于重索引期间产生的数据量大小(会影响步骤4的用时),不过我们可以视具体业务情况灵活操作。比如说数据量比较大重索引我们用了10个小时(这10个小时内新产生了200多万的数据),在切别名前,我们可以按步骤(4)的调用方式,把近10个小时的数据再捞一遍到新索引中,如此迭代个几次,直到别名切完后,我们能保证最后一次的步骤(4)可以在较短时间内完成。

4、ES 节点通讯配置

http.port: 9200
http.bind_host: 127.0.0.1
transport.tcp.port: 9300
transport.bind_host: 127.0.0.1

5、把 Lucene 的原生 query 传给 ES

SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(typeName);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.from(0);
sourceBuilder.size(10);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

//q为Lucene检索表达式, 直接输入关键词匹配_all或者*字段, 字段匹配user:kimchy, 
//多字段匹配user:kimchy AND message:Elasticsearch
QueryStringQueryBuilder queryStringQueryBuilder = QueryBuilders.queryStringQuery(q); 
sourceBuilder.query(queryStringQueryBuilder);
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest);
SearchHits searchHits = searchResponse.getHits();

6、ES 文档字段个数限制

ES 文档默认不允许文档字段超过 1000,超过 1000 会报如下错误:

failed to put mappings on indices [[[nfvoemspm/srjL3cMMRUqa7DgOrYqX-A]]], type [log]
java.lang.IllegalArgumentException: Limit of total fields [1000] in index [xxx] has been exceeded

可以通过修改索引配置来修改字段个数限制,不过还是推荐从业务上进行优化:

修改settings
{
  "index.mapping.total_fields.limit": 2000
}

7、将 DSL 字符串转换为 QueryBuilder

## wrapper 案例
GET /_search
{
    "query" : {
        "wrapper": {
            "query" : "eyJ0ZXJtIiA6IHsgInVzZXIiIDogIktpbWNoeSIgfX0=" 
        }
    }
}

## RestClient
QueryBuilders.wrapperQuery("{\"term\": {\"field\":\"value\"}}")

8、ES 集群重启后 Slice Scroll 速度变慢

重启机器之后,pagecache 都没有了,所有数据都要重新从磁盘加载。

9、ES 开启索引新建删除日志

PUT _cluster/settings
{
  "persistent": {
    "logger.cluster.service": "DEBUG"
  }
}

10、慢日志全局级别设定

  1. 对已经存在的索引可以通过 PUT _settings 做存量设置
  2. 对之后新增的索引,可以使用类似于下面的template
    PUT _template/global-slowlog_template
    {
    "order": -1,
    "version": 0,
    "template": "*",
    "settings": {
        "index.indexing.slowlog.threshold.index.debug" : "10ms",
        "index.indexing.slowlog.threshold.index.info" : "50ms",
        "index.indexing.slowlog.threshold.index.warn" : "100ms",
        "index.search.slowlog.threshold.fetch.debug" : "100ms",
        "index.search.slowlog.threshold.fetch.info" : "200ms",
        "index.search.slowlog.threshold.fetch.warn" : "500ms",
        "index.search.slowlog.threshold.query.debug" : "100ms",
        "index.search.slowlog.threshold.query.info" : "200ms",
        "index.search.slowlog.threshold.query.warn" : "1s"
    }
    }

11、TCP 设置多个端口的用途

transport.tcp.port 这个参数不写,默认为 9300-9399,开放那么多 端口有用么?

  • 如果设置一个端口,假设这个端口占用了程序就无法正常启动;
  • 如果设置多个端口,一个端口占用会寻找下一个端口,直至找到可用端口。

12、ES 临时重启,设置分片延迟分配策略

PUT _all/_settings
{
  "settings": {
    "index.unassigned.node_left.delayed_timeout": "5m"
  }
}

三、Kibana

1、kibana 图表自定义标注

可以用 TSVB,支持标注。

Kibana TSVB 注解的使用:https://elasticsearch.cn/article/701

2、Kibana discover 导出 csv 文件

请参考文章:如何快速把 Kibana Discover 页的 Document Table 导出成 CSV

3、修改 kibana 的默认主页

https://elasticsearch.cn/article/6335

四、社区文章精选


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

继续阅读 »

ELK Tips 主要介绍一些 ELK 使用过程中的小技巧,内容主要来源为 Elastic 中文社区。

一、Logstash

1、Logstash 性能调优主要参数

  • pipeline.workers:设置启动多少个线程执行 fliter 和 output;当 input 的内容出现堆积而 CPU 使用率还比较充足时,可以考虑增加该参数的大小;
  • pipeline.batch.size:设置单个工作线程在执行过滤器和输出之前收集的最大事件数,较大的批量大小通常更高效,但会增加内存开销。输出插件会将每个批处理作为一个输出单元。;例如,ES 输出会为收到的每个批次发出批量请求;调整 pipeline.batch.size 可调整发送到 ES 的批量请求(Bulk)的大小;
  • pipeline.batch.delay:设置 Logstash 管道的延迟时间, 管道批处理延迟是 Logstash 在当前管道工作线程中接收事件后等待新消息的最长时间(以毫秒为单位);简单来说,当 pipeline.batch.size 不满足时,会等待 pipeline.batch.delay 设置的时间,超时后便开始执行 filter 和 output 操作。

2、'reader' unacceptable character ' ' (0x0)

logstash 执行使用 Jdbc input plugin 后报错:

[main]-pipeline-manager] ERROR logstash.agent - Pipeline aborted due to error {
:exception=>#<Psych::SyntaxError: (): 'reader' unacceptable character ' ' (0x0) special characters are not allowed in "'reader'", 
position 0 at line 0 column 0>, :backtrace=>["org/jruby/ext/psych/PsychParser.java:232:in parse'"

解决方案:删除 $USER_HOME/.logstash_jdbc_last_run 文件即可。

二、Elasticsearch

1、TermsQuery 与多个 TermQuery 的区别

当 terms 的个数较少的时候,TermsQuery 等效为 ConstantScoreQuery 内部包含多个 TermQuery:

Query q1 = new TermInSetQuery(new Term("field", "foo"), new Term("field", "bar"));
// 等效为下面的语句
BooleanQuery bq = new BooleanQuery();
bq.add(new TermQuery(new Term("field", "foo")), Occur.SHOULD);
bq.add(new TermQuery(new Term("field", "bar")), Occur.SHOULD);
Query q2 = new ConstantScoreQuery(bq);

当 terms 较多的时候,它将使用匹配的文档组合成一个位集,并在该位集上进行评分;此时查询效率比普通的 Bool 合并要更加高效。

当 terms 的个数较多时,TermsQuery 比多个 TermQuery 组合的查询效率更高。

2、ES 借助 nginx 配置域名

upstream /data/ {
    server 192.168.187.xxx:9200;
    keepalive 300 ;
}

server {
    listen 80;
    server_name testelk.xx.com;
    keepalive_timeout 120s 120s;
    location /data {
        proxy_pass http://data/;
        proxy_http_version 1.1;
        proxy_set_header Connection "Keep-Alive";
        proxy_set_header Proxy-Connection "Keep-Alive";
        proxy_set_header X-Real-IP $remote_addr;
        proxy_pass_header remote_user 
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_set_header Host $http_host;
        proxy_set_header X-Nginx-Proxy true;
    }
}

3、ES Reindex 时如何不停止写入服务

方案一:kennywu76

ES 的 reindex 在索引有实时的 update/delete 的情况下,即使借助 alias,也没有办法实现真正的 zero down time。

增加新文档比较好办,通过 alias 切换写入到新索引,同时 reindex 做旧->新索引的数据传输即可;但是 update/delete 操作针对的文档如果还未从旧索引传输过来,直接对新索引操作会导致两个索引数据不一致。

我能够想到的(一个未经实际验证)的方案,前提是数据库里的文档有一个类似 last_update_time 字段记录文档最后更新的时间,用作写入 ES 文档的版本号,然后数据写入新索引的时候,url 里带上下面这样的参数:version_type=external_gt&version=xxxxxx

其中 version_type=external_gt 表示写入文档的版本号大于已有的文档版本号,或者文档不存在,写入才会成功,否则会抛版本冲突的异常。另外 delete 操作都要转换成 index 操作,index 的内容可以是一个空文档

这样实时数据写入新索引和 reindex 可以同时进行,实时写入的数据应该具有更高的版本,总是能够成功,reindex 如果遇到版本冲突,说明该文档被实时部分更新过了,已经过时,可以直接放弃跳过。

该方案的缺陷:

  • 要求数据源里的数据具有版本信息,可能因为各种局限,不太容易更改;
  • delete 操作必须转化为写入一个空文档,delete 实际上是一个标记文档,并且本身也有版本信息。但是如果后端发生了 segment merge,delete 可能会被合并以后物理清除。这样 delete 和对应的版本信息丢失,之后 reindex 如果写入了旧版本的文档,仍然会有一致性问题;但是空文档会增加索引文件的大小,有额外的消耗,一个可能的缓解办法是在 reindex 全部做完以后,再做一次空文档的删除。

改进方案:the_best

重建索引步骤如下:

  1. 保证 delete 操作都要转换成 index 操作,index 的内容可以是一个空文档;
  2. 对老索引 old_index(业务上的别名还是挂在老索引上)进行重索引操作(version_type=external);
    curl -X POST 'http://<hostname>:9200/_reindex'
    {
    "conflicts": "proceed",
    "source": {
        "index": "old_index",
        "size": 1000
    },
    "dest": {
        "index": "new_index",
        "version_type": "external"
    }
    }
  3. 将别名切到 newIndex;
  4. 将重索引时间段内 old_index 产生的热数据,再捞一次到 new_index 中(conflicts=proceed&version_type=external);
    curl -X POST /_reindex
    {
    "conflicts": "proceed",
    "source": {
        "index": "old_index"
        "query": {
            "constant_score" : {
                "filter" : {
                    "range" : {
                        "data_update_time" : {
                            "gte" : <reindex开始时刻前的毫秒时间戳>
                        }
                    }
                }
            }
        }
    },
    "dest": {
        "index": "new_index",
        "version_type": "external"
    }
    }
  5. 手动做一次空文档的删除。

这种方式取决于重索引期间产生的数据量大小(会影响步骤4的用时),不过我们可以视具体业务情况灵活操作。比如说数据量比较大重索引我们用了10个小时(这10个小时内新产生了200多万的数据),在切别名前,我们可以按步骤(4)的调用方式,把近10个小时的数据再捞一遍到新索引中,如此迭代个几次,直到别名切完后,我们能保证最后一次的步骤(4)可以在较短时间内完成。

4、ES 节点通讯配置

http.port: 9200
http.bind_host: 127.0.0.1
transport.tcp.port: 9300
transport.bind_host: 127.0.0.1

5、把 Lucene 的原生 query 传给 ES

SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(typeName);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.from(0);
sourceBuilder.size(10);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

//q为Lucene检索表达式, 直接输入关键词匹配_all或者*字段, 字段匹配user:kimchy, 
//多字段匹配user:kimchy AND message:Elasticsearch
QueryStringQueryBuilder queryStringQueryBuilder = QueryBuilders.queryStringQuery(q); 
sourceBuilder.query(queryStringQueryBuilder);
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest);
SearchHits searchHits = searchResponse.getHits();

6、ES 文档字段个数限制

ES 文档默认不允许文档字段超过 1000,超过 1000 会报如下错误:

failed to put mappings on indices [[[nfvoemspm/srjL3cMMRUqa7DgOrYqX-A]]], type [log]
java.lang.IllegalArgumentException: Limit of total fields [1000] in index [xxx] has been exceeded

可以通过修改索引配置来修改字段个数限制,不过还是推荐从业务上进行优化:

修改settings
{
  "index.mapping.total_fields.limit": 2000
}

7、将 DSL 字符串转换为 QueryBuilder

## wrapper 案例
GET /_search
{
    "query" : {
        "wrapper": {
            "query" : "eyJ0ZXJtIiA6IHsgInVzZXIiIDogIktpbWNoeSIgfX0=" 
        }
    }
}

## RestClient
QueryBuilders.wrapperQuery("{\"term\": {\"field\":\"value\"}}")

8、ES 集群重启后 Slice Scroll 速度变慢

重启机器之后,pagecache 都没有了,所有数据都要重新从磁盘加载。

9、ES 开启索引新建删除日志

PUT _cluster/settings
{
  "persistent": {
    "logger.cluster.service": "DEBUG"
  }
}

10、慢日志全局级别设定

  1. 对已经存在的索引可以通过 PUT _settings 做存量设置
  2. 对之后新增的索引,可以使用类似于下面的template
    PUT _template/global-slowlog_template
    {
    "order": -1,
    "version": 0,
    "template": "*",
    "settings": {
        "index.indexing.slowlog.threshold.index.debug" : "10ms",
        "index.indexing.slowlog.threshold.index.info" : "50ms",
        "index.indexing.slowlog.threshold.index.warn" : "100ms",
        "index.search.slowlog.threshold.fetch.debug" : "100ms",
        "index.search.slowlog.threshold.fetch.info" : "200ms",
        "index.search.slowlog.threshold.fetch.warn" : "500ms",
        "index.search.slowlog.threshold.query.debug" : "100ms",
        "index.search.slowlog.threshold.query.info" : "200ms",
        "index.search.slowlog.threshold.query.warn" : "1s"
    }
    }

11、TCP 设置多个端口的用途

transport.tcp.port 这个参数不写,默认为 9300-9399,开放那么多 端口有用么?

  • 如果设置一个端口,假设这个端口占用了程序就无法正常启动;
  • 如果设置多个端口,一个端口占用会寻找下一个端口,直至找到可用端口。

12、ES 临时重启,设置分片延迟分配策略

PUT _all/_settings
{
  "settings": {
    "index.unassigned.node_left.delayed_timeout": "5m"
  }
}

三、Kibana

1、kibana 图表自定义标注

可以用 TSVB,支持标注。

Kibana TSVB 注解的使用:https://elasticsearch.cn/article/701

2、Kibana discover 导出 csv 文件

请参考文章:如何快速把 Kibana Discover 页的 Document Table 导出成 CSV

3、修改 kibana 的默认主页

https://elasticsearch.cn/article/6335

四、社区文章精选


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

收起阅读 »

ClusterBlockException SERVICE_UNAVAILABLE/1/state

截图.PNG
图中为_cluster/health 和_cat/pending_tasks的执行结果,现在ES节点全部故障了,请教下如何恢复?   gateway.expected_nodes配置的为节点个数的80% 

 Caused by: org.elasticsearch.discovery.MasterNotDiscoveredException: ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction$4.onTimeout(TransportMasterNodeAction.java:213) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ContextPreservingListener.onTimeout(ClusterStateObserver.java:317) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ObserverClusterStateListener.onTimeout(ClusterStateObserver.java:244) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.service.ClusterApplierService$NotifyTimeout.run(ClusterApplierService.java:578) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:568) [elasticsearch-6.1.3.jar:6.1.3]
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
                at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
        Caused by: org.elasticsearch.cluster.block.ClusterBlockException: blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];
                at org.elasticsearch.cluster.block.ClusterBlocks.indexBlockedException(ClusterBlocks.java:182) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:64) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:39) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.doStart(TransportMasterNodeAction.java:135) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.start(TransportMasterNodeAction.java:127) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:105) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:55) ~[elasticsearch-6.1.3.jar:6.1.3]
继续阅读 »

截图.PNG
图中为_cluster/health 和_cat/pending_tasks的执行结果,现在ES节点全部故障了,请教下如何恢复?   gateway.expected_nodes配置的为节点个数的80% 

 Caused by: org.elasticsearch.discovery.MasterNotDiscoveredException: ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction$4.onTimeout(TransportMasterNodeAction.java:213) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ContextPreservingListener.onTimeout(ClusterStateObserver.java:317) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ObserverClusterStateListener.onTimeout(ClusterStateObserver.java:244) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.service.ClusterApplierService$NotifyTimeout.run(ClusterApplierService.java:578) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:568) [elasticsearch-6.1.3.jar:6.1.3]
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
                at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
        Caused by: org.elasticsearch.cluster.block.ClusterBlockException: blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];
                at org.elasticsearch.cluster.block.ClusterBlocks.indexBlockedException(ClusterBlocks.java:182) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:64) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:39) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.doStart(TransportMasterNodeAction.java:135) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.start(TransportMasterNodeAction.java:127) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:105) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:55) ~[elasticsearch-6.1.3.jar:6.1.3]
收起阅读 »

Hive 与 ElasticSearch 的数据交互

本文将详细介绍利用 ES 与 Hive 直接的数据交互;通过 Hive 外部表的方式,可以快速将 ES 索引数据映射到 Hive 中,使用易于上手的 Hive SQL 实现对数据的进一步加工。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • ES 版本:6.5.1
  • Hive 版本:2.1.1
  • ES-Hadoop 版本:6.5.1

2、Hive 简介

Hive 在 Hadoop 生态系统中扮演着数据仓库的角色,借助 Hive 可以方便地进行数据汇总、即席查询以及分析存储在 Hadoop 文件系统中的大型数据集。

Hive 通过类 SQL 语言(HSQL)对 Hadoop 上的数据进行抽象,这样用户可以通过 SQL 语句对数据进行定义、组织、操作和分析;在 Hive 中,数据集是通过表(定义了数据类型相关信息)进行定义的,用户可以通过内置运算符或用户自定义函数(UDF)对数据进行加载、查询和转换。

3、Hive 安装 ES-Hadoop

官方推荐的安装方式:

使用 add jar

add jar /path/elasticsearch-hadoop.jar

使用 hive.aux.jars.path

$ bin/hive --auxpath=/path/elasticsearch-hadoop.jar

修改配置(hive-site.xml)

<property>
  <name>hive.aux.jars.path</name>
  <value>/path/elasticsearch-hadoop.jar</value>
  <description>A comma separated list (with no spaces) of the jar files</description>
</property>

CDH6.X 推荐的安装方法

elasticsearch-hadoop.jar 复制到 Hive 的 auxlib 目录中,然后重启 Hive 即可。

cp elasticsearch-hadoop.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/

二、Hive 与 ElasticSearch 的数据交互

1、数据类型对照表

请务必注意,ES 中的类型是 index/_mapping 中对应的数据类型,非 _source 里面数据的类型。

Hive type Elasticsearch type
void null
boolean boolean
tinyint byte
smallint short
int int
bigint long
double double
float float
string string
binary binary
timestamp date
struct map
map map
array array
union not supported (yet)
decimal string
date date
varchar string
char string

2、建立 Hive 外部表

CREATE EXTERNAL TABLE default.surface(
    water_type STRING,
    water_level STRING,
    monitor_time TIMESTAMP,
    sitecode STRING,
    p492 DOUBLE,
    p311 DOUBLE,
    status STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.resource'='ods_data_day_surface*/doc',
    'es.query'='?q=status:001'
    'es.nodes'='sky-01','es.port'='9200',
    'es.net.http.auth.user'='sky',
    'es.net.http.auth.pass'='jointsky',
    'es.date.format'='yyyy-MM-dd HH:mm:ss',
    'es.ser.reader.value.class'='com.jointsky.bigdata.hive.EsValueReader'
    'es.mapping.names'='waterType:water_type,monitortime:monitor_time'
);

3、配置项说明

es.resource

es.resource 用于设置 ES 资源的位置,默认该配置项同时设置了读和写的索引,当然也可以分别设置读写索引名称:

  • es.resource.read:设置读取位置;
  • es.resource.write:设置写入位置。

es.query

es.query 设置查询过滤条件,目前支持 uri queryquery dslexternal resource 三种设置方式。

# uri (or parameter) query
es.query = ?q=costinl

# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }

# external resource
es.query = org/mypackage/myquery.json

es.mapping.names

es.mapping.names 用于设置 Hive 与 ES 的字段映射关系,如果不设置,则默认字段名不发生变化(即为 data type 区域定义的字段名);此外该部分还用于定义 Hive 到 ES 的数据映射类型。

'es.mapping.names' = 'date:@timestamp , url:url_123 ')

其他通用字段的说明请参考文章:使用 ES-Hadoop 将 Spark Streaming 流数据写入 ES

4、自定义日期类型解析

目前将 ES 的 date 类型映射到 Hive 的 TIMESTAMP 类型时,ES-Hadoop 组件只能识别时间戳格式或者标准的 XSD 格式的日期字符串:

@Override
protected Object parseDate(Long value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(value)) : processLong(value));
}

@Override
protected Object parseDate(String value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(DatatypeConverter.parseDateTime(value).getTimeInMillis())) : parseString(value));
}

关于 XSD(XML Schema Date/Time Datatypes)可用参考文章:https://www.w3schools.com/xml/schema_dtypes_date.asp

为了兼容自定义的日期格式,需要编写自定义的日期读取类:


import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.hive.HiveValueReader;

import java.sql.Timestamp;
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EsValueReader extends HiveValueReader {
    private String dateFormat;
    private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String DEFAULT_DATE_FORMAT_MIN = "yyyy-MM-dd HH:mm";
    private static final String DEFAULT_DATE_FORMAT_HOUR = "yyyy-MM-dd HH";
    private static final String DEFAULT_DATE_FORMAT_DAY = "yyyy-MM-dd";

    @Override
    public void setSettings(Settings settings) {
        super.setSettings(settings);
        dateFormat = settings.getProperty("es.date.format");
    }

    @Override
    protected Object parseDate(String value, boolean richDate) {
        if (value != null && value.trim().length() > 0 && DEFAULT_DATE_FORMAT.equalsIgnoreCase(dateFormat)) {
            if (richDate){
                if (value.length() == 16){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_MIN).getTime()));
                }
                if (value.length() == 13){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_HOUR).getTime()));
                }
                if (value.length() == 10){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_DAY).getTime()));
                }
                return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT).getTime()));
            }
            return parseString(value);
        }
        return super.parseDate(value, richDate);
    }

    /**
     * 解析日期,根據指定的格式進行解析.<br>
     * 如果解析錯誤,則返回null
     * @param stringDate 日期字串
     * @param format 日期格式
     * @return 日期型別
     */
    private static Date parseDate(String stringDate, String format) {
        if (stringDate == null) {
            return null;
        }
        try {
            return parseDate(stringDate, new String[] { format });
        } catch (ParseException e) {
            return null;
        }
    }

    public static Date parseDate(String str, String... parsePatterns) throws ParseException {
        return parseDateWithLeniency(str, parsePatterns, true);
    }

    private static Date parseDateWithLeniency(
            String str, String[] parsePatterns, boolean lenient) throws ParseException {
        if (str == null || parsePatterns == null) {
            throw new IllegalArgumentException("Date and Patterns must not be null");
        }

        SimpleDateFormat parser = new SimpleDateFormat();
        parser.setLenient(lenient);
        ParsePosition pos = new ParsePosition(0);
        for (String parsePattern : parsePatterns) {
            String pattern = parsePattern;
            if (parsePattern.endsWith("ZZ")) {
                pattern = pattern.substring(0, pattern.length() - 1);
            }
            parser.applyPattern(pattern);
            pos.setIndex(0);
            String str2 = str;
            if (parsePattern.endsWith("ZZ")) {
                str2 = str.replaceAll("([-+][0-9][0-9]):([0-9][0-9])$", "$1$2");
            }
            Date date = parser.parse(str2, pos);
            if (date != null && pos.getIndex() == str2.length()) {
                return date;
            }
        }
        throw new ParseException("Unable to parse the date: " + str, -1);
    }
}

上述代码的 Maven 依赖

<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>6.5.4</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

自定义日期解析包的部署

代码编写完成后,将代码进行打包,然后将打包好的 jar 包放置到 Hive 的 auxlib 目录中,然后重启 Hive 即可;该步骤与 ES-Hadoop 的安装步骤一样。

在编写 Spark 程序从 Hive 中读取数据的时候,需要添加对该包的依赖以及对 ES-Hadoop 的依赖。

三、总结

经过上述的步骤,Hive 与 ES 的映射已经不成问题,如果想从 ES 中导出数据,可用借助 HSQL insert into table XXX select * from XXXXX; 的方式从 ES 中读取数据写入到 HDFS;当然通过更为复杂的 HSQL 可以将数据进行处理,并将数据重新写入到 ES 或者存储到 HDFS。

充分利用 ES 的查询、过滤和聚合,可以很好的去服务数据标准化、数据清洗、数据分布情况等 ETL 流程。


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

继续阅读 »

本文将详细介绍利用 ES 与 Hive 直接的数据交互;通过 Hive 外部表的方式,可以快速将 ES 索引数据映射到 Hive 中,使用易于上手的 Hive SQL 实现对数据的进一步加工。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • ES 版本:6.5.1
  • Hive 版本:2.1.1
  • ES-Hadoop 版本:6.5.1

2、Hive 简介

Hive 在 Hadoop 生态系统中扮演着数据仓库的角色,借助 Hive 可以方便地进行数据汇总、即席查询以及分析存储在 Hadoop 文件系统中的大型数据集。

Hive 通过类 SQL 语言(HSQL)对 Hadoop 上的数据进行抽象,这样用户可以通过 SQL 语句对数据进行定义、组织、操作和分析;在 Hive 中,数据集是通过表(定义了数据类型相关信息)进行定义的,用户可以通过内置运算符或用户自定义函数(UDF)对数据进行加载、查询和转换。

3、Hive 安装 ES-Hadoop

官方推荐的安装方式:

使用 add jar

add jar /path/elasticsearch-hadoop.jar

使用 hive.aux.jars.path

$ bin/hive --auxpath=/path/elasticsearch-hadoop.jar

修改配置(hive-site.xml)

<property>
  <name>hive.aux.jars.path</name>
  <value>/path/elasticsearch-hadoop.jar</value>
  <description>A comma separated list (with no spaces) of the jar files</description>
</property>

CDH6.X 推荐的安装方法

elasticsearch-hadoop.jar 复制到 Hive 的 auxlib 目录中,然后重启 Hive 即可。

cp elasticsearch-hadoop.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/

二、Hive 与 ElasticSearch 的数据交互

1、数据类型对照表

请务必注意,ES 中的类型是 index/_mapping 中对应的数据类型,非 _source 里面数据的类型。

Hive type Elasticsearch type
void null
boolean boolean
tinyint byte
smallint short
int int
bigint long
double double
float float
string string
binary binary
timestamp date
struct map
map map
array array
union not supported (yet)
decimal string
date date
varchar string
char string

2、建立 Hive 外部表

CREATE EXTERNAL TABLE default.surface(
    water_type STRING,
    water_level STRING,
    monitor_time TIMESTAMP,
    sitecode STRING,
    p492 DOUBLE,
    p311 DOUBLE,
    status STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.resource'='ods_data_day_surface*/doc',
    'es.query'='?q=status:001'
    'es.nodes'='sky-01','es.port'='9200',
    'es.net.http.auth.user'='sky',
    'es.net.http.auth.pass'='jointsky',
    'es.date.format'='yyyy-MM-dd HH:mm:ss',
    'es.ser.reader.value.class'='com.jointsky.bigdata.hive.EsValueReader'
    'es.mapping.names'='waterType:water_type,monitortime:monitor_time'
);

3、配置项说明

es.resource

es.resource 用于设置 ES 资源的位置,默认该配置项同时设置了读和写的索引,当然也可以分别设置读写索引名称:

  • es.resource.read:设置读取位置;
  • es.resource.write:设置写入位置。

es.query

es.query 设置查询过滤条件,目前支持 uri queryquery dslexternal resource 三种设置方式。

# uri (or parameter) query
es.query = ?q=costinl

# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }

# external resource
es.query = org/mypackage/myquery.json

es.mapping.names

es.mapping.names 用于设置 Hive 与 ES 的字段映射关系,如果不设置,则默认字段名不发生变化(即为 data type 区域定义的字段名);此外该部分还用于定义 Hive 到 ES 的数据映射类型。

'es.mapping.names' = 'date:@timestamp , url:url_123 ')

其他通用字段的说明请参考文章:使用 ES-Hadoop 将 Spark Streaming 流数据写入 ES

4、自定义日期类型解析

目前将 ES 的 date 类型映射到 Hive 的 TIMESTAMP 类型时,ES-Hadoop 组件只能识别时间戳格式或者标准的 XSD 格式的日期字符串:

@Override
protected Object parseDate(Long value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(value)) : processLong(value));
}

@Override
protected Object parseDate(String value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(DatatypeConverter.parseDateTime(value).getTimeInMillis())) : parseString(value));
}

关于 XSD(XML Schema Date/Time Datatypes)可用参考文章:https://www.w3schools.com/xml/schema_dtypes_date.asp

为了兼容自定义的日期格式,需要编写自定义的日期读取类:


import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.hive.HiveValueReader;

import java.sql.Timestamp;
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EsValueReader extends HiveValueReader {
    private String dateFormat;
    private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String DEFAULT_DATE_FORMAT_MIN = "yyyy-MM-dd HH:mm";
    private static final String DEFAULT_DATE_FORMAT_HOUR = "yyyy-MM-dd HH";
    private static final String DEFAULT_DATE_FORMAT_DAY = "yyyy-MM-dd";

    @Override
    public void setSettings(Settings settings) {
        super.setSettings(settings);
        dateFormat = settings.getProperty("es.date.format");
    }

    @Override
    protected Object parseDate(String value, boolean richDate) {
        if (value != null && value.trim().length() > 0 && DEFAULT_DATE_FORMAT.equalsIgnoreCase(dateFormat)) {
            if (richDate){
                if (value.length() == 16){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_MIN).getTime()));
                }
                if (value.length() == 13){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_HOUR).getTime()));
                }
                if (value.length() == 10){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_DAY).getTime()));
                }
                return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT).getTime()));
            }
            return parseString(value);
        }
        return super.parseDate(value, richDate);
    }

    /**
     * 解析日期,根據指定的格式進行解析.<br>
     * 如果解析錯誤,則返回null
     * @param stringDate 日期字串
     * @param format 日期格式
     * @return 日期型別
     */
    private static Date parseDate(String stringDate, String format) {
        if (stringDate == null) {
            return null;
        }
        try {
            return parseDate(stringDate, new String[] { format });
        } catch (ParseException e) {
            return null;
        }
    }

    public static Date parseDate(String str, String... parsePatterns) throws ParseException {
        return parseDateWithLeniency(str, parsePatterns, true);
    }

    private static Date parseDateWithLeniency(
            String str, String[] parsePatterns, boolean lenient) throws ParseException {
        if (str == null || parsePatterns == null) {
            throw new IllegalArgumentException("Date and Patterns must not be null");
        }

        SimpleDateFormat parser = new SimpleDateFormat();
        parser.setLenient(lenient);
        ParsePosition pos = new ParsePosition(0);
        for (String parsePattern : parsePatterns) {
            String pattern = parsePattern;
            if (parsePattern.endsWith("ZZ")) {
                pattern = pattern.substring(0, pattern.length() - 1);
            }
            parser.applyPattern(pattern);
            pos.setIndex(0);
            String str2 = str;
            if (parsePattern.endsWith("ZZ")) {
                str2 = str.replaceAll("([-+][0-9][0-9]):([0-9][0-9])$", "$1$2");
            }
            Date date = parser.parse(str2, pos);
            if (date != null && pos.getIndex() == str2.length()) {
                return date;
            }
        }
        throw new ParseException("Unable to parse the date: " + str, -1);
    }
}

上述代码的 Maven 依赖

<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>6.5.4</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

自定义日期解析包的部署

代码编写完成后,将代码进行打包,然后将打包好的 jar 包放置到 Hive 的 auxlib 目录中,然后重启 Hive 即可;该步骤与 ES-Hadoop 的安装步骤一样。

在编写 Spark 程序从 Hive 中读取数据的时候,需要添加对该包的依赖以及对 ES-Hadoop 的依赖。

三、总结

经过上述的步骤,Hive 与 ES 的映射已经不成问题,如果想从 ES 中导出数据,可用借助 HSQL insert into table XXX select * from XXXXX; 的方式从 ES 中读取数据写入到 HDFS;当然通过更为复杂的 HSQL 可以将数据进行处理,并将数据重新写入到 ES 或者存储到 HDFS。

充分利用 ES 的查询、过滤和聚合,可以很好的去服务数据标准化、数据清洗、数据分布情况等 ETL 流程。


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

收起阅读 »

自研基于StanfordNLP的ES分词插件

为ES构建Stanford NLP分词插件

Stanford NLP?

Stanford分词器是斯坦福大学NLP团队维护的一个开源分词器,支持了包括中文、英文…的语言,而且除了分词之外,它还支持了包括词性分析、情感分析…的各种功能。\ 这俩是这个project的项目主页

Why Stanford core NLP?

  市面上确实会有很多很有名的开源分词器,比如IK、Jieba,还有一些其他团队和公司提供的开源/商用的分词器,他们各有优劣。但是在各种分词器上比较了一大堆的分词case之后,我们发现Stanford NLP似乎是最适合我们当前需求的一个,因为我们不仅仅需要分词,还需要一些包括情感分析之类在内的更多的一些功能。

我们公司是做金融数据的搜索推荐的,在对比了各家分词器之后我们老板觉得Stanford NLP的效果最好,但是作为算法出身的人,他实现了一套非常重的分词、排序、搜索的服务。

在对比如研报、财报之类的信息进行搜索的时候确实会比较有效,但是在对经济类的新闻进行搜索的时候就会显得十分的笨重。

基于这个背景,我开始试图在ES里面引入老板推崇的Stanford 分词器来适应他的搜索、分词的需要,同时也能够不通过他那个笨重的分词排序服务来对我们系统中大量的经济、金融类的新闻进行分词、索引,并提供和他自己分词效果类似的分词和检索服务。

Why this project

我在包括百度、某谷姓404网站、GitHub以及国内的中文社区(Elastic中文社区)在内的各种地方搜过也问过了,但是似乎没有一个直接开箱可用的分词插件。所以,我只剩一条路了,就是搭建一个自己的插件来引用这个分词器。

How

对ES来说,插件主要分为两个部分:

  1. 让ES可以看到的部分(class extends Plugin)
  2. 自己行使职能的部分(functional part)

plugin

  1. 为了让ES可以加载我们的plugin,我们需要先继承Plugin类,然后我们这个是个分词器插件,所以还要实现AnalysisPlugin类
  2. 看过ES源码或者其他分词器源码的同学应该会知道,分词器插件需要实现两个方法,一个用来提供tokenizer,一个是analyzer分别对应分词器中的这俩。
    • 重写Map<String, AnalysisModule.AnalysisProvider<TokenizerFactory>>是为了可以提供搜索用分词器
    • 重写Map<String, AnalysisModule.AnalysisProvider<AnalyzerProvider<? extends Analyzer>>>是为了可以提供索引用分词器
  3. 在这个分词器里面我们主要是依靠Tokenizer来实现分词的

functional class

分词器,特别是Tokenizer主要是靠重写三个方法来实现分词的

  1. incrementToken:用来确定每一个词元,输出每一个单词(字)以及它的位置、长度等
  2. reset:用来重制分词结果
  3. end:用来告诉ES,这段文本的分词已经结束了

所以我们主要需要重写的就是这仨方法,当然了,为了能让分词器正确的使用,我们还需要添加一些分词器的配置和初始化的内容,具体代码不写了可以参考我的git,主要讲两个坑:

  1. ES是通过配置文件里的路径来寻找对应的插件类
  2. 然后通过配置文件里的key和刚才提到的代码里的key来寻找对应的分词器,所以这俩地方不要写错了 #plugin-descriptor.properties: classname=org.elasticsearch.plugin.analysis.AnalysisSDPlugin #plugin-descriptor.properties: name=stanford-core-nlp
  3. 在开发过程中由于有java-security的存在,所以需要通过AccessController来调用和加载我们需要的外部jar包

odds and ends

  1. Stanford分词器里面包含了很多功能,目前我使用了分词的部分
  2. 分词器自带词典文件,不过如果要做词典的修改可能需要解包,修改,再重新打包
  3. 我现在hardcode了一大堆的标点符号在里面,后面可能会去优化一下部分逻辑
  4. 待完成的功能还有其他功能包括情感分析之类的

also see

GitHub 地址

继续阅读 »

为ES构建Stanford NLP分词插件

Stanford NLP?

Stanford分词器是斯坦福大学NLP团队维护的一个开源分词器,支持了包括中文、英文…的语言,而且除了分词之外,它还支持了包括词性分析、情感分析…的各种功能。\ 这俩是这个project的项目主页

Why Stanford core NLP?

  市面上确实会有很多很有名的开源分词器,比如IK、Jieba,还有一些其他团队和公司提供的开源/商用的分词器,他们各有优劣。但是在各种分词器上比较了一大堆的分词case之后,我们发现Stanford NLP似乎是最适合我们当前需求的一个,因为我们不仅仅需要分词,还需要一些包括情感分析之类在内的更多的一些功能。

我们公司是做金融数据的搜索推荐的,在对比了各家分词器之后我们老板觉得Stanford NLP的效果最好,但是作为算法出身的人,他实现了一套非常重的分词、排序、搜索的服务。

在对比如研报、财报之类的信息进行搜索的时候确实会比较有效,但是在对经济类的新闻进行搜索的时候就会显得十分的笨重。

基于这个背景,我开始试图在ES里面引入老板推崇的Stanford 分词器来适应他的搜索、分词的需要,同时也能够不通过他那个笨重的分词排序服务来对我们系统中大量的经济、金融类的新闻进行分词、索引,并提供和他自己分词效果类似的分词和检索服务。

Why this project

我在包括百度、某谷姓404网站、GitHub以及国内的中文社区(Elastic中文社区)在内的各种地方搜过也问过了,但是似乎没有一个直接开箱可用的分词插件。所以,我只剩一条路了,就是搭建一个自己的插件来引用这个分词器。

How

对ES来说,插件主要分为两个部分:

  1. 让ES可以看到的部分(class extends Plugin)
  2. 自己行使职能的部分(functional part)

plugin

  1. 为了让ES可以加载我们的plugin,我们需要先继承Plugin类,然后我们这个是个分词器插件,所以还要实现AnalysisPlugin类
  2. 看过ES源码或者其他分词器源码的同学应该会知道,分词器插件需要实现两个方法,一个用来提供tokenizer,一个是analyzer分别对应分词器中的这俩。
    • 重写Map<String, AnalysisModule.AnalysisProvider<TokenizerFactory>>是为了可以提供搜索用分词器
    • 重写Map<String, AnalysisModule.AnalysisProvider<AnalyzerProvider<? extends Analyzer>>>是为了可以提供索引用分词器
  3. 在这个分词器里面我们主要是依靠Tokenizer来实现分词的

functional class

分词器,特别是Tokenizer主要是靠重写三个方法来实现分词的

  1. incrementToken:用来确定每一个词元,输出每一个单词(字)以及它的位置、长度等
  2. reset:用来重制分词结果
  3. end:用来告诉ES,这段文本的分词已经结束了

所以我们主要需要重写的就是这仨方法,当然了,为了能让分词器正确的使用,我们还需要添加一些分词器的配置和初始化的内容,具体代码不写了可以参考我的git,主要讲两个坑:

  1. ES是通过配置文件里的路径来寻找对应的插件类
  2. 然后通过配置文件里的key和刚才提到的代码里的key来寻找对应的分词器,所以这俩地方不要写错了 #plugin-descriptor.properties: classname=org.elasticsearch.plugin.analysis.AnalysisSDPlugin #plugin-descriptor.properties: name=stanford-core-nlp
  3. 在开发过程中由于有java-security的存在,所以需要通过AccessController来调用和加载我们需要的外部jar包

odds and ends

  1. Stanford分词器里面包含了很多功能,目前我使用了分词的部分
  2. 分词器自带词典文件,不过如果要做词典的修改可能需要解包,修改,再重新打包
  3. 我现在hardcode了一大堆的标点符号在里面,后面可能会去优化一下部分逻辑
  4. 待完成的功能还有其他功能包括情感分析之类的

also see

GitHub 地址

收起阅读 »

ELK 使用小技巧(第 4 期)

ELK Tips 主要介绍一些 ELK 使用过程中的小技巧,内容主要来源为 Elastic 中文社区。

一、Logstash

1、Logstash 性能调优主要参数

  • pipeline.workers:设置启动多少个线程执行 fliter 和 output;当 input 的内容出现堆积而 CPU 使用率还比较充足时,可以考虑增加该参数的大小;
  • pipeline.batch.size:设置单个工作线程在执行过滤器和输出之前收集的最大事件数,较大的批量大小通常更高效,但会增加内存开销。输出插件会将每个批处理作为一个输出单元。;例如,ES 输出会为收到的每个批次发出批量请求;调整 pipeline.batch.size 可调整发送到 ES 的批量请求(Bulk)的大小;
  • pipeline.batch.delay:设置 Logstash 管道的延迟时间, 管道批处理延迟是 Logstash 在当前管道工作线程中接收事件后等待新消息的最长时间(以毫秒为单位);简单来说,当 pipeline.batch.size 不满足时,会等待 pipeline.batch.delay 设置的时间,超时后便开始执行 filter 和 output 操作。

2、使用 Ruby Filter 根据现有字段计算一个新字段

filter {
    ruby {
           code => "event.set('kpi', ((event.get('a') + event.get('b'))/(event.get('c')+event.get('d'))).round(2))"
     }
}

3、logstash filter 如何判断字段是够为空或者 null

if ![updateTime]

4、Date Filter 设置多种日期格式

date {
  match => ["logtime", "yyyy-MM-dd HH:mm:ss.SSS","yyyy-MM-dd HH:mm:ss,SSS"]
  target => "logtime_utc"
}

二、Elasticsearch

1、高效翻页 Search After

通常情况下我们会使用 from 和 size 的方式实现查询结果的翻页,但是当达到深度分页时,成本变得过高(堆内存占用和时间耗费与 from+size 的大小成正比),因此 ES 设置了限制(index.max_result_window),默认值为 10000,防止用户进行过于深入的翻页。

推荐使用 Scroll api 进行高效深度滚动,但滚动上下文代价很高,因此不要将 Scroll 用于实时用户请求。search_after 参数通过提供实时游标来解决深度滚动的问题,其主要思路是使用上一页的结果来帮助检索下一页。

GET twitter/_search
{
    "size": 10,
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    },
    "search_after": [1463538857, "654323"],
    "sort": [
        {"date": "asc"},
        {"tie_breaker_id": "asc"}
    ]
}

2、ES 文档相似度 BM25 参数设置

ES2.X 默认是以 TF/IDF 算法计算文档相似度,从 ES5.X 开始,BM25 作为默认的相似度计算算法。

PUT /index
{
    "settings" : {
        "index" : {
            "similarity" : {
              "my_similarity" : {
                "type" : "DFR",
                "basic_model" : "g",
                "after_effect" : "l",
                "normalization" : "h2",
                "normalization.h2.c" : "3.0"
              }
            }
        }
    }
}

PUT /index/_mapping/_doc
{
  "properties" : {
    "title" : { "type" : "text", "similarity" : "my_similarity" }
  }
}

3、ES2.X 得分计算

得分计算脚本:

double tf = Math.sqrt(doc.freq); 
double idf = Math.log((field.docCount+1.0)/(term.docFreq+1.0)) + 1.0; 
double norm = 1/Math.sqrt(doc.length); 
return query.boost * tf * idf * norm;
  • 忽略词频统计及词频位置:将字段的 index_options 设置为 docs;
  • 忽略字段长度:设置字段的 "norms": { "enabled": false }

4、CircuitBreakingException: [parent] Data too large

报错信息:

[WARN ][r.suppressed             ] path: /, params: {}
org.elasticsearch.common.breaker.CircuitBreakingException: [parent] Data too large, data for [<http_request>] would be [1454565650/1.3gb], which is larger than the limit of [1454427340/1.3gb], usages [request=0/0b, fielddata=568/568b, in_flight_requests=0/0b, accounting=1454565082/1.3gb]

jvm 堆内存不够当前查询加载数据所以会报 data too large, 请求被熔断,indices.breaker.request.limit默认为 jvm heap 的 60%,因此可以通过调整 ES 的 Heap Size 来解决该问题。

5、ES 免费的自动化运维工具推荐

6、elasticsearch-hanlp 分词插件包

核心功能:

  • 内置多种分词模式,适合不同场景;
  • 内置词典,无需额外配置即可使用;
  • 支持外置词典,用户可自定义分词算法,基于词典或是模型;
  • 支持分词器级别的自定义词典,便于用于多租户场景;
  • 支持远程词典热更新(待开发);
  • 拼音过滤器、繁简体过滤器(待开发);
  • 基于词语或单字的 ngram 切分分词(待开发)。

https://github.com/AnyListen/elasticsearch-analysis-hanlp

7、节点重启时延迟索引分片重分配

当某个节点短时间离开集群时,一般是不会影响整体系统运行的,可以通过下面的请求延迟索引分片的再分配。

PUT _all/_settings
{
  "settings": {
    "index.unassigned.node_left.delayed_timeout": "5m"
  }
}

8、ES 数据修改后,查询还是未修改前的数据

默认是 1 秒可见,如果你的需求一定要写完就可见,那在写的时候增加 refresh 参数,强制刷新即可,但强烈建议不这么干,因为这样会把整个集群拖垮。

9、Terms Query 从另一个索引获取 terms

当 Terms Query 需要指定很多 terms 的时候,如果手动设置还是相当麻烦的,可以通过 terms-lookup 的方式从另外一个索引加载需要匹配的 terms。

PUT /users/_doc/2
{
    "followers" : ["1", "3"]
}

PUT /tweets/_doc/1
{
    "user" : "1"
}

GET /tweets/_search
{
    "query" : {
        "terms" : {
            "user" : {
                "index" : "users",
                "type" : "_doc",
                "id" : "2",
                "path" : "followers"
            }
        }
    }
}

-----------等效下面的语句--------------

PUT /users/_doc/2
{
 "followers" : [
   {
     "id" : "1"
   },
   {
     "id" : "2"
   }
 ]
}

10、ES 备份路径设置

报错信息:

doesn't match any of the locations specified by path.repo because this setting is empty

结局方案,修改 ES 的配置文件:

# 在 elasticsearch.yml 中添加下面配置来设置备份仓库路径 
path.repo: ["/home/test/backup/zty_logstash"]

11、Query cache 和 Filter cache 的区别

Filter cache 被重命名为 Node Query Cache,也就是说 Query cache 等同于 Filter cache;Query Cache 采用了 LRU 的缓存方式(当缓存满的时候,淘汰旧的不用的缓存数据),Query Cache 只缓存被用于 filter 上下文的内容。

12、Shard 大小需要考虑的因素有哪些?

Lucene 底层没有这个大小的限制,20-40GB 的这个区间范围本身就比较大,经验值有时候就是拍脑袋,不一定都好使。

  • Elasticsearch 对数据的隔离和迁移是以分片为单位进行的,分片太大,会加大迁移成本;
  • 一个分片就是一个 Lucene 的库,一个 Lucene 目录里面包含很多 Segment,每个 Segment 有文档数的上限,Segment 内部的文档 ID 目前使用的是 Java 的整型,也就是 2 的 31 次方,所以能够表示的总的文档数为 Integer.MAX_VALUE - 128 = 2^31 - 128 = 2147483647 - 1 = 2,147,483,519,也就是21.4亿条;
  • 同样,如果你不 force merge 成一个 Segment,单个 shard 的文档数能超过这个数;
  • 单个 Lucene 越大,索引会越大,查询的操作成本自然要越高,IO 压力越大,自然会影响查询体验;
  • 具体一个分片多少数据合适,还是需要结合实际的业务数据和实际的查询来进行测试以进行评估。

13、ES 索引更新时通过 mapping 限制指定字段更新

Elasticsearch 默认是 Dynamic Mapping,新字段会自动猜测数据类型,并自动 merge 到之前的 Mapping,你可以在 Mapping 里面可以配置字段是否支持动态加入,设置参数dynamic即可:true,默认,表示支持动态加入新字段;false,表示忽略该字段的后续索引等操作,但是索引还是成功的;strict支持不支持未知字段,直接抛错。

14、ES 数据快照到 HDFS

ES 做快照和使用 ES-Hadoop 导数据是完全的两种不同的方式,使用 ES-Hadoopp 后期导入的成本可能也不小。

  • 如果要恢复快,当然是做快照和还原的方式最快,速度完全取决于网络和磁盘的速度;
  • 如果为了节省磁盘,快照的时候,可以选 6.5 最新支持的 source_only 模式,导出的快照要小很多,不过恢复的时候要进行重建,速度慢。

15、segment.memory 简介

segment 的大小,和 indexing buffer 有关,有三种方式会生成 segment:

  • 一种是 indexing buffer 写满了会生成 segment 文件,默认是堆内存的10%,是节点共享的;
  • 一种是 index buffer 有文档,但是还没满,但是 refresh 时间到了,这个时候就会把 buffer 里面的生成 segment 文件;
  • 还有最后一种就是 es 自动的会将小的 segment 文件定期合并产生新的 segment 文件。

三、社区文章精选


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

继续阅读 »

ELK Tips 主要介绍一些 ELK 使用过程中的小技巧,内容主要来源为 Elastic 中文社区。

一、Logstash

1、Logstash 性能调优主要参数

  • pipeline.workers:设置启动多少个线程执行 fliter 和 output;当 input 的内容出现堆积而 CPU 使用率还比较充足时,可以考虑增加该参数的大小;
  • pipeline.batch.size:设置单个工作线程在执行过滤器和输出之前收集的最大事件数,较大的批量大小通常更高效,但会增加内存开销。输出插件会将每个批处理作为一个输出单元。;例如,ES 输出会为收到的每个批次发出批量请求;调整 pipeline.batch.size 可调整发送到 ES 的批量请求(Bulk)的大小;
  • pipeline.batch.delay:设置 Logstash 管道的延迟时间, 管道批处理延迟是 Logstash 在当前管道工作线程中接收事件后等待新消息的最长时间(以毫秒为单位);简单来说,当 pipeline.batch.size 不满足时,会等待 pipeline.batch.delay 设置的时间,超时后便开始执行 filter 和 output 操作。

2、使用 Ruby Filter 根据现有字段计算一个新字段

filter {
    ruby {
           code => "event.set('kpi', ((event.get('a') + event.get('b'))/(event.get('c')+event.get('d'))).round(2))"
     }
}

3、logstash filter 如何判断字段是够为空或者 null

if ![updateTime]

4、Date Filter 设置多种日期格式

date {
  match => ["logtime", "yyyy-MM-dd HH:mm:ss.SSS","yyyy-MM-dd HH:mm:ss,SSS"]
  target => "logtime_utc"
}

二、Elasticsearch

1、高效翻页 Search After

通常情况下我们会使用 from 和 size 的方式实现查询结果的翻页,但是当达到深度分页时,成本变得过高(堆内存占用和时间耗费与 from+size 的大小成正比),因此 ES 设置了限制(index.max_result_window),默认值为 10000,防止用户进行过于深入的翻页。

推荐使用 Scroll api 进行高效深度滚动,但滚动上下文代价很高,因此不要将 Scroll 用于实时用户请求。search_after 参数通过提供实时游标来解决深度滚动的问题,其主要思路是使用上一页的结果来帮助检索下一页。

GET twitter/_search
{
    "size": 10,
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    },
    "search_after": [1463538857, "654323"],
    "sort": [
        {"date": "asc"},
        {"tie_breaker_id": "asc"}
    ]
}

2、ES 文档相似度 BM25 参数设置

ES2.X 默认是以 TF/IDF 算法计算文档相似度,从 ES5.X 开始,BM25 作为默认的相似度计算算法。

PUT /index
{
    "settings" : {
        "index" : {
            "similarity" : {
              "my_similarity" : {
                "type" : "DFR",
                "basic_model" : "g",
                "after_effect" : "l",
                "normalization" : "h2",
                "normalization.h2.c" : "3.0"
              }
            }
        }
    }
}

PUT /index/_mapping/_doc
{
  "properties" : {
    "title" : { "type" : "text", "similarity" : "my_similarity" }
  }
}

3、ES2.X 得分计算

得分计算脚本:

double tf = Math.sqrt(doc.freq); 
double idf = Math.log((field.docCount+1.0)/(term.docFreq+1.0)) + 1.0; 
double norm = 1/Math.sqrt(doc.length); 
return query.boost * tf * idf * norm;
  • 忽略词频统计及词频位置:将字段的 index_options 设置为 docs;
  • 忽略字段长度:设置字段的 "norms": { "enabled": false }

4、CircuitBreakingException: [parent] Data too large

报错信息:

[WARN ][r.suppressed             ] path: /, params: {}
org.elasticsearch.common.breaker.CircuitBreakingException: [parent] Data too large, data for [<http_request>] would be [1454565650/1.3gb], which is larger than the limit of [1454427340/1.3gb], usages [request=0/0b, fielddata=568/568b, in_flight_requests=0/0b, accounting=1454565082/1.3gb]

jvm 堆内存不够当前查询加载数据所以会报 data too large, 请求被熔断,indices.breaker.request.limit默认为 jvm heap 的 60%,因此可以通过调整 ES 的 Heap Size 来解决该问题。

5、ES 免费的自动化运维工具推荐

6、elasticsearch-hanlp 分词插件包

核心功能:

  • 内置多种分词模式,适合不同场景;
  • 内置词典,无需额外配置即可使用;
  • 支持外置词典,用户可自定义分词算法,基于词典或是模型;
  • 支持分词器级别的自定义词典,便于用于多租户场景;
  • 支持远程词典热更新(待开发);
  • 拼音过滤器、繁简体过滤器(待开发);
  • 基于词语或单字的 ngram 切分分词(待开发)。

https://github.com/AnyListen/elasticsearch-analysis-hanlp

7、节点重启时延迟索引分片重分配

当某个节点短时间离开集群时,一般是不会影响整体系统运行的,可以通过下面的请求延迟索引分片的再分配。

PUT _all/_settings
{
  "settings": {
    "index.unassigned.node_left.delayed_timeout": "5m"
  }
}

8、ES 数据修改后,查询还是未修改前的数据

默认是 1 秒可见,如果你的需求一定要写完就可见,那在写的时候增加 refresh 参数,强制刷新即可,但强烈建议不这么干,因为这样会把整个集群拖垮。

9、Terms Query 从另一个索引获取 terms

当 Terms Query 需要指定很多 terms 的时候,如果手动设置还是相当麻烦的,可以通过 terms-lookup 的方式从另外一个索引加载需要匹配的 terms。

PUT /users/_doc/2
{
    "followers" : ["1", "3"]
}

PUT /tweets/_doc/1
{
    "user" : "1"
}

GET /tweets/_search
{
    "query" : {
        "terms" : {
            "user" : {
                "index" : "users",
                "type" : "_doc",
                "id" : "2",
                "path" : "followers"
            }
        }
    }
}

-----------等效下面的语句--------------

PUT /users/_doc/2
{
 "followers" : [
   {
     "id" : "1"
   },
   {
     "id" : "2"
   }
 ]
}

10、ES 备份路径设置

报错信息:

doesn't match any of the locations specified by path.repo because this setting is empty

结局方案,修改 ES 的配置文件:

# 在 elasticsearch.yml 中添加下面配置来设置备份仓库路径 
path.repo: ["/home/test/backup/zty_logstash"]

11、Query cache 和 Filter cache 的区别

Filter cache 被重命名为 Node Query Cache,也就是说 Query cache 等同于 Filter cache;Query Cache 采用了 LRU 的缓存方式(当缓存满的时候,淘汰旧的不用的缓存数据),Query Cache 只缓存被用于 filter 上下文的内容。

12、Shard 大小需要考虑的因素有哪些?

Lucene 底层没有这个大小的限制,20-40GB 的这个区间范围本身就比较大,经验值有时候就是拍脑袋,不一定都好使。

  • Elasticsearch 对数据的隔离和迁移是以分片为单位进行的,分片太大,会加大迁移成本;
  • 一个分片就是一个 Lucene 的库,一个 Lucene 目录里面包含很多 Segment,每个 Segment 有文档数的上限,Segment 内部的文档 ID 目前使用的是 Java 的整型,也就是 2 的 31 次方,所以能够表示的总的文档数为 Integer.MAX_VALUE - 128 = 2^31 - 128 = 2147483647 - 1 = 2,147,483,519,也就是21.4亿条;
  • 同样,如果你不 force merge 成一个 Segment,单个 shard 的文档数能超过这个数;
  • 单个 Lucene 越大,索引会越大,查询的操作成本自然要越高,IO 压力越大,自然会影响查询体验;
  • 具体一个分片多少数据合适,还是需要结合实际的业务数据和实际的查询来进行测试以进行评估。

13、ES 索引更新时通过 mapping 限制指定字段更新

Elasticsearch 默认是 Dynamic Mapping,新字段会自动猜测数据类型,并自动 merge 到之前的 Mapping,你可以在 Mapping 里面可以配置字段是否支持动态加入,设置参数dynamic即可:true,默认,表示支持动态加入新字段;false,表示忽略该字段的后续索引等操作,但是索引还是成功的;strict支持不支持未知字段,直接抛错。

14、ES 数据快照到 HDFS

ES 做快照和使用 ES-Hadoop 导数据是完全的两种不同的方式,使用 ES-Hadoopp 后期导入的成本可能也不小。

  • 如果要恢复快,当然是做快照和还原的方式最快,速度完全取决于网络和磁盘的速度;
  • 如果为了节省磁盘,快照的时候,可以选 6.5 最新支持的 source_only 模式,导出的快照要小很多,不过恢复的时候要进行重建,速度慢。

15、segment.memory 简介

segment 的大小,和 indexing buffer 有关,有三种方式会生成 segment:

  • 一种是 indexing buffer 写满了会生成 segment 文件,默认是堆内存的10%,是节点共享的;
  • 一种是 index buffer 有文档,但是还没满,但是 refresh 时间到了,这个时候就会把 buffer 里面的生成 segment 文件;
  • 还有最后一种就是 es 自动的会将小的 segment 文件定期合并产生新的 segment 文件。

三、社区文章精选


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

收起阅读 »

使用 ES-Hadoop 将 Spark Streaming 流数据写入 ES

本文将详细介绍利用 ES-Hadoop 将 Spark 处理的数据写入到 ES 中。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • Spark 版本:2.2.0
  • Kafka 版本:1.0.1
  • ES 版本:6.5.1

2、Maven 依赖

<!-- scala -->
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.11.8</version>
</dependency>

<!-- spark 基础依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming 相关依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming-kafka 相关依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- zookeeper 相关依赖 -->
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.5-cdh6.0.1</version>
</dependency>

<!-- Spark-ES 相关依赖 -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.5.4</version>
</dependency>

<!-- Spark-ES 依赖的 HTTP 传输组件 -->
<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
</dependency>

3、注意事项

如果使用 CDH 版本的 Spark,则在调试及实际部署运行的时候会出现下面的错误:

java.lang.ClassNotFoundException: org.apache.commons.httpclient.protocol.Protocol

很显然是缺少 httpclient 相关依赖造成的,对比开源版本与 CDH 版本的 Spark,发现开源版本多出了 commons-httpclient-3.1.jar,因此上述 Maven 的 pom 文件添加上对其依赖即可。

二、ES-Hadoop

1、简介

ES-Hadoop 实现了 Hadoop 生态(Hive、Spark、Pig、Storm 等)与 ElasticSearch 之间的数据交互,借助该组件可以将 Hadoop 生态的数据写入到 ES 中,然后借助 ES 对数据快速进行搜索、过滤、聚合等分析,进一步可以通过 Kibana 来实现数据的可视化。

同时,也可以借助 ES 作为数据存储层(类似数仓的 Stage 层或者 ODS 层),然后借助 Hadoop 生态的数据处理工具(Hive、MR、Spark 等)将处理后的数据写入到 HDFS 中。

使用 ES 做为原始数据的存储层,可以很好的进行数据去重、数据质量分析,还可以提供一些即时的数据服务,例如趋势展示、汇总分析等。

对 Hadoop 数据进行交互分析

2、组成

ES-Hadoop 是一个整合性质的组件,它封装了 Hadoop 生态的多种组件与 ES 交互的 API,如果你只需要部分功能,可以使用细分的组件:

  • elasticsearch-hadoop-mr
  • elasticsearch-hadoop-hive
  • elasticsearch-hadoop-pig
  • elasticsearch-spark-20_2.10
  • elasticsearch-hadoop-cascading
  • elasticsearch-storm

三、elasticsearch-spark

1、配置

es-hadoop 核心是通过 es 提供的 restful 接口来进行数据交互,下面是几个重要配置项,更多配置信息请参阅官方说明

  • es.nodes:需要连接的 es 节点(不需要配置全部节点,默认会自动发现其他可用节点);
  • es.port:节点 http 通讯端口;
  • es.nodes.discovery:默认为 true,表示自动发现集群可用节点;
  • es.nodes.wan.only:默认为 false,设置为 true 之后,会关闭节点的自动 discovery,只使用 es.nodes 声明的节点进行数据读写操作;如果你需要通过域名进行数据访问,则设置该选项为 true,否则请务必设置为 false;
  • es.index.auto.create:是否自动创建不存在的索引,默认为 true;
  • es.net.http.auth.user:Basic 认证的用户名;
  • es.net.http.auth.pass:Basic 认证的密码。
val conf = new SparkConf().setIfMissing("spark.app.name","rt-data-loader").setIfMissing("spark.master", "local[5]")
conf.set(ConfigurationOptions.ES_NODES, esNodes)
conf.set(ConfigurationOptions.ES_PORT, esPort)
conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
conf.set("es.write.rest.error.handlers", "ignoreConflict")
conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")

特别需要注意的配置项为 es.nodes.wan.only,由于在云服务器环境中,配置文件使用的一般为内网地址,而本地调试的时候一般使用外网地址,这样将 es.nodes 配置为外网地址后,最后会出现节点找不到的问题(由于会使用节点配置的内网地址去进行连接):

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available; 
node discovery is disabled and none of nodes specified fit the criterion [xxx.xx.x.xx:9200]

此时将 es.nodes.wan.only 设置为 true 即可。推荐开发测试时使用域名,集群部署的时候将该选项置为 false

2、屏蔽写入冲突

如果数据存在重复,写入 ES 时往往会出现数据写入冲突的错误,此时有两种解决方法。

方法一:设置 es.write.operation 为 upsert,这样达到的效果为如果存在则更新,不存在则进行插入,该配置项默认值为 index。

方法二:自定义冲突处理类,类似上述配置中设置了自定义的 error.handlers,通过自定义类来处理相关错误,例如忽略冲突等:

public class IgnoreConflictsHandler extends BulkWriteErrorHandler {
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) throws Exception {
        if (entry.getResponseCode() == 409) {
            StaticLog.warn("Encountered conflict response. Ignoring old data.");
            return HandlerResult.HANDLED;
        }
        return collector.pass("Not a conflict response code.");
    }
}

方法二可以屏蔽写入版本比预期的小之类的版本冲突问题。

3、RDD 写入 ES

EsSpark 提供了两种主要方法来实现数据写入:

  • saveToEs :RDD 内容为 Seq[Map],即一个 Map 对象集合,每个 Map 对应一个文档;
  • saveJsonToEs:RDD 内容为 Seq[String],即一个 String 集合,每个 String 是一个 JSON 字符串,代表一条记录(对应 ES 的 _source)。

数据写入可以指定很多配置信息,例如:

  • es.resource:设置写入的索引和类型,索引和类型名均支持动态变量
  • es.mapping.id:设置文档 _id 对应的字段名;
  • es.mapping.exclude:设置写入时忽略的字段,支持通配符。
val itemRdd = rdd.flatMap(line => {
    val topic = line.topic()
    println("正在处理:" + topic + " - " + line.partition() + " : " + line.offset())
    val jsonArray = JSON.parseArray(line.value()).toJavaList(classOf[JSONObject]).asScala
    val resultMap = jsonArray.map(jsonObj =>{
      var tmpId = "xxx"
      var tmpIndex = "xxxxxx"
      jsonObj.put("myTmpId", tmpId)
      jsonObj.put("myTmpIndex", tmpIndex)
      jsonObj.getInnerMap
    })
    resultMap
})
val mapConf = Map(
    ("es.resource" , "{myTmpIndex}/doc"),
    ("es.write.operation" , "upsert"),
    ("es.mapping.id" , "myTmpId"),
    ("es.mapping.exclude" , "myTmp*")
)
EsSpark.saveToEs(itemRdd, mapConf)

es.mapping.exclude 只支持 RDD 为 Map 集合(saveToEs),当为 Json 字符串集合时(saveJsonToEs)会提示不支持的错误信息;这个配置项非常有用,例如 myTmpId 作为文档 id,因此没有必要重复存储到 _source 里面了,可以配置到这个配置项,将其从 _source 中排除。


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

继续阅读 »

本文将详细介绍利用 ES-Hadoop 将 Spark 处理的数据写入到 ES 中。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • Spark 版本:2.2.0
  • Kafka 版本:1.0.1
  • ES 版本:6.5.1

2、Maven 依赖

<!-- scala -->
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.11.8</version>
</dependency>

<!-- spark 基础依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming 相关依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming-kafka 相关依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- zookeeper 相关依赖 -->
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.5-cdh6.0.1</version>
</dependency>

<!-- Spark-ES 相关依赖 -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.5.4</version>
</dependency>

<!-- Spark-ES 依赖的 HTTP 传输组件 -->
<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
</dependency>

3、注意事项

如果使用 CDH 版本的 Spark,则在调试及实际部署运行的时候会出现下面的错误:

java.lang.ClassNotFoundException: org.apache.commons.httpclient.protocol.Protocol

很显然是缺少 httpclient 相关依赖造成的,对比开源版本与 CDH 版本的 Spark,发现开源版本多出了 commons-httpclient-3.1.jar,因此上述 Maven 的 pom 文件添加上对其依赖即可。

二、ES-Hadoop

1、简介

ES-Hadoop 实现了 Hadoop 生态(Hive、Spark、Pig、Storm 等)与 ElasticSearch 之间的数据交互,借助该组件可以将 Hadoop 生态的数据写入到 ES 中,然后借助 ES 对数据快速进行搜索、过滤、聚合等分析,进一步可以通过 Kibana 来实现数据的可视化。

同时,也可以借助 ES 作为数据存储层(类似数仓的 Stage 层或者 ODS 层),然后借助 Hadoop 生态的数据处理工具(Hive、MR、Spark 等)将处理后的数据写入到 HDFS 中。

使用 ES 做为原始数据的存储层,可以很好的进行数据去重、数据质量分析,还可以提供一些即时的数据服务,例如趋势展示、汇总分析等。

对 Hadoop 数据进行交互分析

2、组成

ES-Hadoop 是一个整合性质的组件,它封装了 Hadoop 生态的多种组件与 ES 交互的 API,如果你只需要部分功能,可以使用细分的组件:

  • elasticsearch-hadoop-mr
  • elasticsearch-hadoop-hive
  • elasticsearch-hadoop-pig
  • elasticsearch-spark-20_2.10
  • elasticsearch-hadoop-cascading
  • elasticsearch-storm

三、elasticsearch-spark

1、配置

es-hadoop 核心是通过 es 提供的 restful 接口来进行数据交互,下面是几个重要配置项,更多配置信息请参阅官方说明

  • es.nodes:需要连接的 es 节点(不需要配置全部节点,默认会自动发现其他可用节点);
  • es.port:节点 http 通讯端口;
  • es.nodes.discovery:默认为 true,表示自动发现集群可用节点;
  • es.nodes.wan.only:默认为 false,设置为 true 之后,会关闭节点的自动 discovery,只使用 es.nodes 声明的节点进行数据读写操作;如果你需要通过域名进行数据访问,则设置该选项为 true,否则请务必设置为 false;
  • es.index.auto.create:是否自动创建不存在的索引,默认为 true;
  • es.net.http.auth.user:Basic 认证的用户名;
  • es.net.http.auth.pass:Basic 认证的密码。
val conf = new SparkConf().setIfMissing("spark.app.name","rt-data-loader").setIfMissing("spark.master", "local[5]")
conf.set(ConfigurationOptions.ES_NODES, esNodes)
conf.set(ConfigurationOptions.ES_PORT, esPort)
conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
conf.set("es.write.rest.error.handlers", "ignoreConflict")
conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")

特别需要注意的配置项为 es.nodes.wan.only,由于在云服务器环境中,配置文件使用的一般为内网地址,而本地调试的时候一般使用外网地址,这样将 es.nodes 配置为外网地址后,最后会出现节点找不到的问题(由于会使用节点配置的内网地址去进行连接):

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available; 
node discovery is disabled and none of nodes specified fit the criterion [xxx.xx.x.xx:9200]

此时将 es.nodes.wan.only 设置为 true 即可。推荐开发测试时使用域名,集群部署的时候将该选项置为 false

2、屏蔽写入冲突

如果数据存在重复,写入 ES 时往往会出现数据写入冲突的错误,此时有两种解决方法。

方法一:设置 es.write.operation 为 upsert,这样达到的效果为如果存在则更新,不存在则进行插入,该配置项默认值为 index。

方法二:自定义冲突处理类,类似上述配置中设置了自定义的 error.handlers,通过自定义类来处理相关错误,例如忽略冲突等:

public class IgnoreConflictsHandler extends BulkWriteErrorHandler {
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) throws Exception {
        if (entry.getResponseCode() == 409) {
            StaticLog.warn("Encountered conflict response. Ignoring old data.");
            return HandlerResult.HANDLED;
        }
        return collector.pass("Not a conflict response code.");
    }
}

方法二可以屏蔽写入版本比预期的小之类的版本冲突问题。

3、RDD 写入 ES

EsSpark 提供了两种主要方法来实现数据写入:

  • saveToEs :RDD 内容为 Seq[Map],即一个 Map 对象集合,每个 Map 对应一个文档;
  • saveJsonToEs:RDD 内容为 Seq[String],即一个 String 集合,每个 String 是一个 JSON 字符串,代表一条记录(对应 ES 的 _source)。

数据写入可以指定很多配置信息,例如:

  • es.resource:设置写入的索引和类型,索引和类型名均支持动态变量
  • es.mapping.id:设置文档 _id 对应的字段名;
  • es.mapping.exclude:设置写入时忽略的字段,支持通配符。
val itemRdd = rdd.flatMap(line => {
    val topic = line.topic()
    println("正在处理:" + topic + " - " + line.partition() + " : " + line.offset())
    val jsonArray = JSON.parseArray(line.value()).toJavaList(classOf[JSONObject]).asScala
    val resultMap = jsonArray.map(jsonObj =>{
      var tmpId = "xxx"
      var tmpIndex = "xxxxxx"
      jsonObj.put("myTmpId", tmpId)
      jsonObj.put("myTmpIndex", tmpIndex)
      jsonObj.getInnerMap
    })
    resultMap
})
val mapConf = Map(
    ("es.resource" , "{myTmpIndex}/doc"),
    ("es.write.operation" , "upsert"),
    ("es.mapping.id" , "myTmpId"),
    ("es.mapping.exclude" , "myTmp*")
)
EsSpark.saveToEs(itemRdd, mapConf)

es.mapping.exclude 只支持 RDD 为 Map 集合(saveToEs),当为 Json 字符串集合时(saveJsonToEs)会提示不支持的错误信息;这个配置项非常有用,例如 myTmpId 作为文档 id,因此没有必要重复存储到 _source 里面了,可以配置到这个配置项,将其从 _source 中排除。


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

收起阅读 »

Day 21 - ECE 版本升级扫雷实战

Elastic Cloud Enterprise 出来也有一年多的时间了。对于这类的开源软件企业版本,基本都是在可管理性和稳定性上面下功夫。但是新产品免不了需要经历一下bug的打磨才会变得成熟。

下面分享的这个案例是当我们在把集群从5.4.1 升级到5.6.12 的过程中,遇到节点关闭受阻,升级不完整等情景。以及对应的处理方法。
首先在ECE中,版本是通过stack的方式管理
 
QQ20181221-104539@2x.png

Ref : https://www.elastic.co/guide/e ... .html

这些版本都是以docker images的形式存储

QQ20181221-104619@2x.png

因此,ECE根据不同的版本,然后选择对应的docker image就可以创建一个节点了. 那么升级的过程就可以简单分成几个步骤

1.exclude准备升级的节点。

2.停止节点ES进程,更换container 版本。

3.重新启动节点,加入集群。

4.在其他节点上重复以上流程。

在这个过程中, 实际使用的时候发现有一些需要注意的雷区.
扫雷一:使用UI触发升级,必须保证集群没有自定义插件和bundles

ECE 里面的集群操作是通过plan来控制的

QQ20181221-104650@2x.png

任何的集群操作最终都会生成一个plan的diff。如上图,把集群从5.4.1 升级到 5.6.12 会产生以上diff.

正常情况下是没有问题的.

如果集群配置了自定义bundles, 比如LDAP bundles, ref:https://www.elastic.co/guide/e ... .html

那么在集群的plan里面就会存在这么一段配置

QQ20181221-104718@2x.png

那么当我们在按下升级按钮的时候

QQ20181221-104744@2x.png

ECE 只在plan中修改了集群大版本的配置, 但是并没有修改自定义bundles中的版本号(仍然是5.4.1).

在这种情况下去执行升级,会直接产生报错.

QQ20181221-104810@2x.png

界面上没有显示原因, 但是这是因为plan里面大版本和bundle中的版本不一致,然后会导致新增的节点无法启动. 于是ECE 就认为集群升级失败了.
解决方法是手动编辑plan,把自定义bundles中的版本号改成和集群版本一致

QQ20181221-104848@2x.png

然后使用ECE 提供的一种手动使用plan进行集群升级的方式进行升级.

扫雷二:节点无法关闭

ECE 控制container 是通过一个叫做 constructor的服务。constructor 通过接收集群的更改需求,制定具体的更改计划与步骤,指导allocator对container进行操作。同时也负责保证集群高可靠性,通过Availability Zone的数量在不同的AZ上面部署节点。

当Allocator接受到关闭container命令的时候,会尝试去关闭container,如果container处于一个阻塞状态无法响应, 那么关闭命令无法执行成功。这个时候constructor会等待节点关闭,但是allocator又认为节点已经接受到关闭命令了。又或者constructor发送给allocator的过程中网络丢包, 这个时候allocator 没有正确接受关闭container的命令. 整个升级进程就卡住了。 这种情况十分罕见,通常发现一个container如果处于”正在关闭”时间太久了, 那么通常就是中间的通信出现问题了.

QQ20181221-104926@2x.png

解决办法是可以通过手动停止container, 在对应的allocator上面找到container,使用

docker stop <container_name>

停止container,这样可以出发allocator更新container的健康状态,上报这个container已经关闭了, 从而打通流程并执行下一步。
扫雷三: 多版本并存

如果使用上面的方式强行关闭docker container, 虽然可以让升级进程继续进行下去. 但是被手动关闭的节点会保留原来的版本。于是在升级后查看各个节点的版本,会发现部分节点是5.4.1, 部分是5.6.12.

因为节点是强制关闭的, ECE直接认为节点已经完成升级,并重新启动这个container. 而在这个处理中,跳过了升级docker image的一步.

为什么不是生成一个新的container呢? 因为从plan里面可以看到

QQ20181221-104951@2x.png

在默认情况下, ECE 处理版本升级是使用rolling 策略 Ref: https://www.elastic.co/guide/e ... .html

在这个策略下,ECE会停止当前container并直接修改重启。

如果ECE集群容量允许, 可以改成grow_and_shrink 策略, 这样ECE 会创建新的container并且销毁旧的container, 避免集群出现多版本.
如果出现了多版本的集群,可以通过更改集群任意一个配置来触发 grow_and_shrink 同样可以使到版本回归一致.
总结来说ECE 在版本升级方面还是有很多需要改进的地方. 对于ECE用户再说在使用ECE的版本升级功能的时候主要有以下建议

1. 自己学会手动修改plan. 这也是每一个ECE support engineer 都会干的事情.

2.如果集群容量允许,尽量使用 grow_and_shrink的策略来进行集群操作.
 
继续阅读 »
Elastic Cloud Enterprise 出来也有一年多的时间了。对于这类的开源软件企业版本,基本都是在可管理性和稳定性上面下功夫。但是新产品免不了需要经历一下bug的打磨才会变得成熟。

下面分享的这个案例是当我们在把集群从5.4.1 升级到5.6.12 的过程中,遇到节点关闭受阻,升级不完整等情景。以及对应的处理方法。
首先在ECE中,版本是通过stack的方式管理
 
QQ20181221-104539@2x.png

Ref : https://www.elastic.co/guide/e ... .html

这些版本都是以docker images的形式存储

QQ20181221-104619@2x.png

因此,ECE根据不同的版本,然后选择对应的docker image就可以创建一个节点了. 那么升级的过程就可以简单分成几个步骤

1.exclude准备升级的节点。

2.停止节点ES进程,更换container 版本。

3.重新启动节点,加入集群。

4.在其他节点上重复以上流程。

在这个过程中, 实际使用的时候发现有一些需要注意的雷区.
扫雷一:使用UI触发升级,必须保证集群没有自定义插件和bundles

ECE 里面的集群操作是通过plan来控制的

QQ20181221-104650@2x.png

任何的集群操作最终都会生成一个plan的diff。如上图,把集群从5.4.1 升级到 5.6.12 会产生以上diff.

正常情况下是没有问题的.

如果集群配置了自定义bundles, 比如LDAP bundles, ref:https://www.elastic.co/guide/e ... .html

那么在集群的plan里面就会存在这么一段配置

QQ20181221-104718@2x.png

那么当我们在按下升级按钮的时候

QQ20181221-104744@2x.png

ECE 只在plan中修改了集群大版本的配置, 但是并没有修改自定义bundles中的版本号(仍然是5.4.1).

在这种情况下去执行升级,会直接产生报错.

QQ20181221-104810@2x.png

界面上没有显示原因, 但是这是因为plan里面大版本和bundle中的版本不一致,然后会导致新增的节点无法启动. 于是ECE 就认为集群升级失败了.
解决方法是手动编辑plan,把自定义bundles中的版本号改成和集群版本一致

QQ20181221-104848@2x.png

然后使用ECE 提供的一种手动使用plan进行集群升级的方式进行升级.

扫雷二:节点无法关闭

ECE 控制container 是通过一个叫做 constructor的服务。constructor 通过接收集群的更改需求,制定具体的更改计划与步骤,指导allocator对container进行操作。同时也负责保证集群高可靠性,通过Availability Zone的数量在不同的AZ上面部署节点。

当Allocator接受到关闭container命令的时候,会尝试去关闭container,如果container处于一个阻塞状态无法响应, 那么关闭命令无法执行成功。这个时候constructor会等待节点关闭,但是allocator又认为节点已经接受到关闭命令了。又或者constructor发送给allocator的过程中网络丢包, 这个时候allocator 没有正确接受关闭container的命令. 整个升级进程就卡住了。 这种情况十分罕见,通常发现一个container如果处于”正在关闭”时间太久了, 那么通常就是中间的通信出现问题了.

QQ20181221-104926@2x.png

解决办法是可以通过手动停止container, 在对应的allocator上面找到container,使用

docker stop <container_name>

停止container,这样可以出发allocator更新container的健康状态,上报这个container已经关闭了, 从而打通流程并执行下一步。
扫雷三: 多版本并存

如果使用上面的方式强行关闭docker container, 虽然可以让升级进程继续进行下去. 但是被手动关闭的节点会保留原来的版本。于是在升级后查看各个节点的版本,会发现部分节点是5.4.1, 部分是5.6.12.

因为节点是强制关闭的, ECE直接认为节点已经完成升级,并重新启动这个container. 而在这个处理中,跳过了升级docker image的一步.

为什么不是生成一个新的container呢? 因为从plan里面可以看到

QQ20181221-104951@2x.png

在默认情况下, ECE 处理版本升级是使用rolling 策略 Ref: https://www.elastic.co/guide/e ... .html

在这个策略下,ECE会停止当前container并直接修改重启。

如果ECE集群容量允许, 可以改成grow_and_shrink 策略, 这样ECE 会创建新的container并且销毁旧的container, 避免集群出现多版本.
如果出现了多版本的集群,可以通过更改集群任意一个配置来触发 grow_and_shrink 同样可以使到版本回归一致.
总结来说ECE 在版本升级方面还是有很多需要改进的地方. 对于ECE用户再说在使用ECE的版本升级功能的时候主要有以下建议

1. 自己学会手动修改plan. 这也是每一个ECE support engineer 都会干的事情.

2.如果集群容量允许,尽量使用 grow_and_shrink的策略来进行集群操作.
  收起阅读 »

Day 17 - 关于日志型数据管理策略的思考

近两年随着Elastic Stack的愈发火热,其近乎成为构建实时日志应用的工业标准。在小型数据应用场景,最新的6.5版本已经可以做到开箱即用,无需过多考虑架构上的设计工作。 然而一旦应用规模扩大到数百TB甚至PB的数据量级,整个系统的架构和后期维护工作则显得非常重要。借着2018 Elastic Advent写文的机会,结合过去几年架构和运维公司日志集群的实践经验,对于大规模日志型数据的管理策略,在此做一个总结性的思考。 文中抛出的观点,有些已经在我们的集群中有所应用并取得比较好的效果,有些则还待实践的检验。抛砖引玉,不尽成熟的地方,还请社区各位专家指正。

对于日志系统,最终用户通常有以下几个基本要求:

  1. 数据从产生到可检索的实时性要求高,可接受的延迟通常要求控制在数秒,至多不超过数十秒
  2. 新鲜数据(当天至过去几天)的查询和统计频率高,返回速度要快(毫秒级,至多几秒)
  3. 历史数据保留得越久越好。

针对这些需求,加上对成本控制的必要性,大家通常想到的第一个架构设计就是冷热数据分离。

冷热数据分离

冷热分离的概念比较好理解,热结点做数据的写入,保存近期热数据,冷数据定期迁移到冷数据结点,就这么简单。不过实际操作起来可能还是碰到一些具体需要考虑的细节问题。

  1. 冷热结点集群配置的JVM heap配置要差异化。热结点无需存放太多数据,对于heap的要求通常不是太高,在够用的情况下尽量配置小一点。可以配置在26GB左右甚至更小,而不是大多数人知道的经验值31GB。原因在于这个size的heap,可以启用zero based Compressed Oops,JVM运行效率是最高的, 参考: heap-size。而冷结点存在的目的是尽量放更多的数据,性能不是首要的,因此heap可以配置在31GB。
  2. 数据迁移过程有一定资源消耗,为避免对数据写入产生显著影响,通常定时在业务低峰期,日志产出量比较低的时候进行,比如半夜。
  3. 索引是否应该启用压缩,如何启用?最初我们对于热结点上的索引是不启用压缩的,为了节省CPU消耗。只在冷结点配置里,增加了索引压缩选项。这样索引迁移到冷结点后,执行force merge操作的时候,ES会自动将结点上配置的索引压缩属性套用到merge过后新生成的segment,这样就实现了热结点不压缩,冷结点merge过后压缩的功能。极大节省了冷结点的磁盘空间。后来随着硬件的升级,我们发现服务器的cpu基本都是过剩的,磁盘IO通常先到瓶颈。 因此尝试在热结点上一开始创建索引的时候,就启用压缩选项。实际对比测试并没有发现显著的索引吞吐量下降,并且因为索引压缩后磁盘文件size的大幅减少,每天夜间的数据迁移工作可以节省大量的时间。至此我们的日志集群索引默认就是压缩的。
  4. 冷结点上留做系统缓存的内存一般不多,加上数据量非常巨大。索引默认的mmapfs读取方式,很容易因为系统缓存不够,导致数据在内存和磁盘之间频繁换入换出。严重的情况下,整个结点甚至会因为io持续在100%无法响应。 实践中我们发现对冷结点使用niofs效果会更好。

实现了冷热结点分离以后,集群的资源利用率提升了不少,可管理性也要好很多了. 但是随着接入日志的类型越来越多(我们生产上有差不多400种类型的日志),各种日志的速率差异又很大,让ES自己管理shard的分布很容易产生写入热点问题。 针对这个问题,可以采用对集群结点进行分组管理的策略来解决。

热结点分组管理

所谓分组管理,就是通过在结点的配置文件中增加自定义的标签属性,将服务器区分到不同的组别中。然后通过设置索引的index.routing.allocation.include属性,控制改索引分布在哪个组别。同时配合设置index.routing.allocation.total_shards_per_node,可以做到某个索引的shard在某个group的结点之间绝对均匀分布。

比如一个分组有10台机器,对一个5 primary ,1 replica的索引,让该索引分布在该分组的同时,设置total_shards_per_node为1,让每个节点上只能有一个分片,这样就避免了写入热点问题。 该方案的缺陷也显而易见,一旦有结点挂掉,不会自动recovery,某个shard将一直处于unassigned状态,集群状态变成yellow。 但我认为,热数据的恢复开销是非常高的,与其立即在其他结点开始复制,之后再重新rebalance,不如就让集群暂时处于yellow状态。 通过监控报警的手段,及时通知运维人员解决结点故障。 待故障解决之后,直接从恢复后的结点开始数据复制,开销要低得多。

在我们的生产环境主要有两种类型的结点分组,分别是10台机器一个分组,和2台机器一个分组。10台机器的分组用于应对速率非常高,shard划分比较多的索引,2台机器的分组用于速率很低,一个shard(加一个复制片)就可以应对的索引。

这种分组策略在我们的生产环境中经过验证,非常好的解决了写入热点问题。那么冷数据怎么管理? 冷数据不做写入,不存在写入热点问题,查询频率也比较低,业务需求方面对查询响应要求也不那么严苛,所以查询热点问题也不是那么突出。因此为了简化管理,冷结点我们是不做shard分布的精细控制,所有数据迁移到冷数据结点之后,由ES默认的shard分布则略去控制数据的分布。

不过如果想进一步提高冷数据结点服务器资源的利用率,还是可以有进一步挖掘的的空间。我们知道ES默认的shard分布策略,只是保证一个索引的shard尽量分布在不同的结点,同时保证每个节点上shard数量差不多。但是如果采用默认按天创建索引的策略,由于索引速率差异很大,不同索引之间shard的大小差异可能是1-2个数量级的。如果每个shard的size差异不大就好了,那么默认的分布策略,基本上可以保证冷结点之间数据量分布的大致均匀。 能实现类似功能的是ES的rollover特性。

索引的Rollover

Rollover api可以让索引根据预先定义的时间跨度,或者索引大小来自动切分出新索引,从而将索引的大小控制在计划的范围内。合理的应用rollver api可以保证集群shard大小差别不会太大。 只是集群索引类别比较多的时候,rollover全部手动管理负担比较大,需要借助额外的管理工具和监控工具。我们出于管理简便的考虑,暂时没有应用到这个特性。

索引的Rollup

我们发现生产有些用户写入的“日志”,实际上是多维的metrcis数据,使用的时候不是为了查询日志的详情,仅仅是为了做各种维度组合的过滤和聚合。对于这种类型的数据,保留历史数据过多一来浪费存储空间,二来每次聚合都要在裸数据上跑,非常浪费资源。 ES从6.3开始,在x-pack里推出了rollup api,通过定期对裸数据做预先聚合,大大缩减了保存在磁盘上的数据量。对于不需要查询裸日志的应用场景,合理应用该特性,可以将历史数据的磁盘消耗降低几个数量级,同时rollup search也可以大大提升聚合速度。不过rollup也有其局限性,即他的实现是通过定期任务,对间隔期数据跑聚合完成的,有一定的计算开销。 如果数据写入速率非常高,集群压力很大,rollup可能无法跟上写入速率,而不具有实用性。 所以实际环境中,还是需要根据应用场景和资源使用情况,进行灵活的取舍。

多集群的便利性

数据量大到一定程度以后,单集群由于master node单点的限制,会遇到各种集群状态数据更新时得性能问题。 由此现在一些大规模的应用已经开始利用到多集群互联和cross cluster search的特性。 这种结构除了解决单集群数据容量限制问题以外,我们还发现在做容量均衡方面还有比较好的便利性。应用日志写入量通常随着业务变化也会剧烈变化,好不容易规划好的容量,不久就被业务的增长给打破,数倍或者数10倍的流量增长很可能就让一组结点过载出现写入延迟。 如果只有一个集群,在结点之间重新平衡shard比较费力,涉及到数据的迁移,可能非常缓慢,还会影响写入。 但如果有多集群互联,切换就可以做到非常的快速和简单。 原理上只需要在新集群中加入对应的索引配置模版,然后更新写入程序的配置,写入目标指向新集群,重启写入程序即可。并且,可以进一步将整个流程工具化,在GUI上完成一键切换。

继续阅读 »

近两年随着Elastic Stack的愈发火热,其近乎成为构建实时日志应用的工业标准。在小型数据应用场景,最新的6.5版本已经可以做到开箱即用,无需过多考虑架构上的设计工作。 然而一旦应用规模扩大到数百TB甚至PB的数据量级,整个系统的架构和后期维护工作则显得非常重要。借着2018 Elastic Advent写文的机会,结合过去几年架构和运维公司日志集群的实践经验,对于大规模日志型数据的管理策略,在此做一个总结性的思考。 文中抛出的观点,有些已经在我们的集群中有所应用并取得比较好的效果,有些则还待实践的检验。抛砖引玉,不尽成熟的地方,还请社区各位专家指正。

对于日志系统,最终用户通常有以下几个基本要求:

  1. 数据从产生到可检索的实时性要求高,可接受的延迟通常要求控制在数秒,至多不超过数十秒
  2. 新鲜数据(当天至过去几天)的查询和统计频率高,返回速度要快(毫秒级,至多几秒)
  3. 历史数据保留得越久越好。

针对这些需求,加上对成本控制的必要性,大家通常想到的第一个架构设计就是冷热数据分离。

冷热数据分离

冷热分离的概念比较好理解,热结点做数据的写入,保存近期热数据,冷数据定期迁移到冷数据结点,就这么简单。不过实际操作起来可能还是碰到一些具体需要考虑的细节问题。

  1. 冷热结点集群配置的JVM heap配置要差异化。热结点无需存放太多数据,对于heap的要求通常不是太高,在够用的情况下尽量配置小一点。可以配置在26GB左右甚至更小,而不是大多数人知道的经验值31GB。原因在于这个size的heap,可以启用zero based Compressed Oops,JVM运行效率是最高的, 参考: heap-size。而冷结点存在的目的是尽量放更多的数据,性能不是首要的,因此heap可以配置在31GB。
  2. 数据迁移过程有一定资源消耗,为避免对数据写入产生显著影响,通常定时在业务低峰期,日志产出量比较低的时候进行,比如半夜。
  3. 索引是否应该启用压缩,如何启用?最初我们对于热结点上的索引是不启用压缩的,为了节省CPU消耗。只在冷结点配置里,增加了索引压缩选项。这样索引迁移到冷结点后,执行force merge操作的时候,ES会自动将结点上配置的索引压缩属性套用到merge过后新生成的segment,这样就实现了热结点不压缩,冷结点merge过后压缩的功能。极大节省了冷结点的磁盘空间。后来随着硬件的升级,我们发现服务器的cpu基本都是过剩的,磁盘IO通常先到瓶颈。 因此尝试在热结点上一开始创建索引的时候,就启用压缩选项。实际对比测试并没有发现显著的索引吞吐量下降,并且因为索引压缩后磁盘文件size的大幅减少,每天夜间的数据迁移工作可以节省大量的时间。至此我们的日志集群索引默认就是压缩的。
  4. 冷结点上留做系统缓存的内存一般不多,加上数据量非常巨大。索引默认的mmapfs读取方式,很容易因为系统缓存不够,导致数据在内存和磁盘之间频繁换入换出。严重的情况下,整个结点甚至会因为io持续在100%无法响应。 实践中我们发现对冷结点使用niofs效果会更好。

实现了冷热结点分离以后,集群的资源利用率提升了不少,可管理性也要好很多了. 但是随着接入日志的类型越来越多(我们生产上有差不多400种类型的日志),各种日志的速率差异又很大,让ES自己管理shard的分布很容易产生写入热点问题。 针对这个问题,可以采用对集群结点进行分组管理的策略来解决。

热结点分组管理

所谓分组管理,就是通过在结点的配置文件中增加自定义的标签属性,将服务器区分到不同的组别中。然后通过设置索引的index.routing.allocation.include属性,控制改索引分布在哪个组别。同时配合设置index.routing.allocation.total_shards_per_node,可以做到某个索引的shard在某个group的结点之间绝对均匀分布。

比如一个分组有10台机器,对一个5 primary ,1 replica的索引,让该索引分布在该分组的同时,设置total_shards_per_node为1,让每个节点上只能有一个分片,这样就避免了写入热点问题。 该方案的缺陷也显而易见,一旦有结点挂掉,不会自动recovery,某个shard将一直处于unassigned状态,集群状态变成yellow。 但我认为,热数据的恢复开销是非常高的,与其立即在其他结点开始复制,之后再重新rebalance,不如就让集群暂时处于yellow状态。 通过监控报警的手段,及时通知运维人员解决结点故障。 待故障解决之后,直接从恢复后的结点开始数据复制,开销要低得多。

在我们的生产环境主要有两种类型的结点分组,分别是10台机器一个分组,和2台机器一个分组。10台机器的分组用于应对速率非常高,shard划分比较多的索引,2台机器的分组用于速率很低,一个shard(加一个复制片)就可以应对的索引。

这种分组策略在我们的生产环境中经过验证,非常好的解决了写入热点问题。那么冷数据怎么管理? 冷数据不做写入,不存在写入热点问题,查询频率也比较低,业务需求方面对查询响应要求也不那么严苛,所以查询热点问题也不是那么突出。因此为了简化管理,冷结点我们是不做shard分布的精细控制,所有数据迁移到冷数据结点之后,由ES默认的shard分布则略去控制数据的分布。

不过如果想进一步提高冷数据结点服务器资源的利用率,还是可以有进一步挖掘的的空间。我们知道ES默认的shard分布策略,只是保证一个索引的shard尽量分布在不同的结点,同时保证每个节点上shard数量差不多。但是如果采用默认按天创建索引的策略,由于索引速率差异很大,不同索引之间shard的大小差异可能是1-2个数量级的。如果每个shard的size差异不大就好了,那么默认的分布策略,基本上可以保证冷结点之间数据量分布的大致均匀。 能实现类似功能的是ES的rollover特性。

索引的Rollover

Rollover api可以让索引根据预先定义的时间跨度,或者索引大小来自动切分出新索引,从而将索引的大小控制在计划的范围内。合理的应用rollver api可以保证集群shard大小差别不会太大。 只是集群索引类别比较多的时候,rollover全部手动管理负担比较大,需要借助额外的管理工具和监控工具。我们出于管理简便的考虑,暂时没有应用到这个特性。

索引的Rollup

我们发现生产有些用户写入的“日志”,实际上是多维的metrcis数据,使用的时候不是为了查询日志的详情,仅仅是为了做各种维度组合的过滤和聚合。对于这种类型的数据,保留历史数据过多一来浪费存储空间,二来每次聚合都要在裸数据上跑,非常浪费资源。 ES从6.3开始,在x-pack里推出了rollup api,通过定期对裸数据做预先聚合,大大缩减了保存在磁盘上的数据量。对于不需要查询裸日志的应用场景,合理应用该特性,可以将历史数据的磁盘消耗降低几个数量级,同时rollup search也可以大大提升聚合速度。不过rollup也有其局限性,即他的实现是通过定期任务,对间隔期数据跑聚合完成的,有一定的计算开销。 如果数据写入速率非常高,集群压力很大,rollup可能无法跟上写入速率,而不具有实用性。 所以实际环境中,还是需要根据应用场景和资源使用情况,进行灵活的取舍。

多集群的便利性

数据量大到一定程度以后,单集群由于master node单点的限制,会遇到各种集群状态数据更新时得性能问题。 由此现在一些大规模的应用已经开始利用到多集群互联和cross cluster search的特性。 这种结构除了解决单集群数据容量限制问题以外,我们还发现在做容量均衡方面还有比较好的便利性。应用日志写入量通常随着业务变化也会剧烈变化,好不容易规划好的容量,不久就被业务的增长给打破,数倍或者数10倍的流量增长很可能就让一组结点过载出现写入延迟。 如果只有一个集群,在结点之间重新平衡shard比较费力,涉及到数据的迁移,可能非常缓慢,还会影响写入。 但如果有多集群互联,切换就可以做到非常的快速和简单。 原理上只需要在新集群中加入对应的索引配置模版,然后更新写入程序的配置,写入目标指向新集群,重启写入程序即可。并且,可以进一步将整个流程工具化,在GUI上完成一键切换。

收起阅读 »