Well,不要刷屏了

社区日报 第533期 (2019-02-21)

1.如何构建实时全文检索引擎
http://t.cn/EVD0sgm
2.elasticsearch深入--地理位置
http://t.cn/EVe00vL
3.Elasticsearch Painless 脚本语言入门
http://t.cn/EVevxBw

编辑:金桥
归档:https://elasticsearch.cn/article/6363
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1.如何构建实时全文检索引擎
http://t.cn/EVD0sgm
2.elasticsearch深入--地理位置
http://t.cn/EVe00vL
3.Elasticsearch Painless 脚本语言入门
http://t.cn/EVevxBw

编辑:金桥
归档:https://elasticsearch.cn/article/6363
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第532期 (2019-02-20)

1.有限状态机与 Lucene 的那些事
http://t.cn/EVmHHhp
2.HBase 2.0 协处理器实现 Elasticsearch 数据同步
http://t.cn/EVmHg8V
3.Elasticsearch 官方 Go client
http://t.cn/RlyFLzy
 
编辑:江水
归档:https://elasticsearch.cn/article/6362
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1.有限状态机与 Lucene 的那些事
http://t.cn/EVmHHhp
2.HBase 2.0 协处理器实现 Elasticsearch 数据同步
http://t.cn/EVmHg8V
3.Elasticsearch 官方 Go client
http://t.cn/RlyFLzy
 
编辑:江水
归档:https://elasticsearch.cn/article/6362
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第531期 (2019-02-19)

1、(自备梯子)如何使用Elasticsearch,Logstash和Kibana实时显示Python中的日志。
http://t.cn/EtnXpxL
2、Elasticsearch自动补全实践。
http://t.cn/EVl8L8O
​3、如何为旧应用集成ElasticSearch。
http://t.cn/EVl8ItS

编辑:叮咚光军
归档:https://elasticsearch.cn/article/6361
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1、(自备梯子)如何使用Elasticsearch,Logstash和Kibana实时显示Python中的日志。
http://t.cn/EtnXpxL
2、Elasticsearch自动补全实践。
http://t.cn/EVl8L8O
​3、如何为旧应用集成ElasticSearch。
http://t.cn/EVl8ItS

编辑:叮咚光军
归档:https://elasticsearch.cn/article/6361
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第530期 (2019-02-18)

1.Elasticsearch 6.3 SQL功能使用案例分享
http://t.cn/EVCAAgJ
2.23个最有用的Elasticsearch检索技巧
http://t.cn/EVC20yd
3.在kibana V6.5.1上开发认证插件的踩坑记录
http://t.cn/EVCyNFH

编辑:cyberdak
归档:https://elasticsearch.cn/article/6360
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1.Elasticsearch 6.3 SQL功能使用案例分享
http://t.cn/EVCAAgJ
2.23个最有用的Elasticsearch检索技巧
http://t.cn/EVC20yd
3.在kibana V6.5.1上开发认证插件的踩坑记录
http://t.cn/EVCyNFH

编辑:cyberdak
归档:https://elasticsearch.cn/article/6360
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

【同花顺招聘】AI事业部搜索项目负责人/技术专家/产品总监 30-60K

【公司介绍】
同花顺成立于1995年,国内第一家互联网金融信息服务行业上市公司。目前拥有 3000 多个员工,主要
业务包括四大部分:网上行情交易系统,移动金融信息服务,基金销售,金融大数据处理及云服务等,另
外还有目前大力发展的 AI 项目(问财事业部:智能投顾,智能外呼等业务),欢迎大家加入我们~ 
搜索项目负责人
【岗位职责】
1、负责搜索整体领域架构设计,结合业务领域分析梳理搜索领域解决方案,设计全局搜索整体领域架构,支撑业务场景应用;
2、负责搜索领域生态架构设计:构建搜索领域体系架构设计,含控制架构、数据架构、接口及工具链的系统架构设计;
3、负责搜索领域竞争力演进,架构演进路标规划及架构看护:分析行业动态,并制定架构演进路标,按路标完成架构目标落地,及架构演进看护。
【岗位要求】
1、主持或者参与过业界搜索引擎产品设计;
2、熟悉搜索领域关键技术,精通搜索引擎架构原理、排序算法、索引处理、分词算法和索引数据结构等搜索引擎核心技术;
3、精通ElasticSearch、Lucene、Solr等搜索基础框架,有大中型互联网垂直搜索引擎系统开发经验,或者全站搜索引擎的优先;
4、具备深厚的特性方法和架构设计方法论,高级领域建模、需求建模能力,以及成功的领域设计经验。
 
【我们的福利】
AI领域的全面发展,已经取得部分成效;
30人搜索团队规模,行业前沿的技术研发;
Boss的大力支持,看你想怎么做;
美女多,薪资高,欢迎你来撩~
请联系小姐姐:陈 18810574062(同微信)
 
 
 
继续阅读 »
【公司介绍】
同花顺成立于1995年,国内第一家互联网金融信息服务行业上市公司。目前拥有 3000 多个员工,主要
业务包括四大部分:网上行情交易系统,移动金融信息服务,基金销售,金融大数据处理及云服务等,另
外还有目前大力发展的 AI 项目(问财事业部:智能投顾,智能外呼等业务),欢迎大家加入我们~ 
搜索项目负责人
【岗位职责】
1、负责搜索整体领域架构设计,结合业务领域分析梳理搜索领域解决方案,设计全局搜索整体领域架构,支撑业务场景应用;
2、负责搜索领域生态架构设计:构建搜索领域体系架构设计,含控制架构、数据架构、接口及工具链的系统架构设计;
3、负责搜索领域竞争力演进,架构演进路标规划及架构看护:分析行业动态,并制定架构演进路标,按路标完成架构目标落地,及架构演进看护。
【岗位要求】
1、主持或者参与过业界搜索引擎产品设计;
2、熟悉搜索领域关键技术,精通搜索引擎架构原理、排序算法、索引处理、分词算法和索引数据结构等搜索引擎核心技术;
3、精通ElasticSearch、Lucene、Solr等搜索基础框架,有大中型互联网垂直搜索引擎系统开发经验,或者全站搜索引擎的优先;
4、具备深厚的特性方法和架构设计方法论,高级领域建模、需求建模能力,以及成功的领域设计经验。
 
【我们的福利】
AI领域的全面发展,已经取得部分成效;
30人搜索团队规模,行业前沿的技术研发;
Boss的大力支持,看你想怎么做;
美女多,薪资高,欢迎你来撩~
请联系小姐姐:陈 18810574062(同微信)
 
 
  收起阅读 »

【最新】Elasticsearch 6.6 Index Lifecycle Management 尝鲜

1月29日,Elastic Stack 迎来 6.6 版本的发布,该版本带来很多新功能,比如:

  • Index Lifecycle Management
  • Frozen Index
  • Geoshape based on Bkd Tree
  • SQL adds support for Date histograms
  • ......

在这些众多功能中,Index Lifecycle Management(索引生命周期管理,后文简称 ILM) 是最受社区欢迎的。今天我们从以下几方面来快速了解下该功能:

  1. 为什么索引会有生命?什么是索引生命周期?
  2. ILM 是如何划分索引生命周期的?
  3. ILM 是如何管理索引生命周期的?
  4. 实战

1. Index Lifecycle 索引生命周期

先来看第一个问题:

为什么索引有生命?

索引(Index)是 Elasticsearch 中数据组织的一个逻辑概念,是具有相同或相似字段的文档组合。它由众多分片(Shard)组成,比如 bookpeople都可以用作索引名称,可以简单类比为关系型数据库的表(table)。

所谓生命,即生与死;索引的便是创建删除了。

在我们日常使用 Elasticsearch 的时候,索引的创建与删除似乎是很简单的事情,用的时候便创建,不用了删除即可,有什么好管理的呢?

这就要从 Elasticsearch 的应用场景来看了。

业务搜索场景,用户会将业务数据存储在 Elasticsearch 中,比如商品数据、订单数据、用户数据等,实现快速的全文检索功能。像这类数据基本都是累加的,不会删除。一般删除的话,要么是业务升级,要么是业务歇菜了。此种场景下,基本只有生,没有死,也就不存在管理一说。

而在日志业务场景中,用户会将各种日志,如系统、防火墙、中间件、数据库、web 服务器、应用日志等全部实时地存入 Elasticsearch 中,进行即席日志查询与分析。这种类型的数据都会有时间维度,也就是时序性的数据。由于日志的数据量过大,用户一般不会存储全量的数据,一般会在 Elasticsearch 中存储热数据,比如最近7天、30天的数据等,而在7天或者30天之前的数据都会被删除。为了便于操作,用户一般会按照日期来建立索引,比如 nginx 的日志索引名可能为 nginx_log-2018.11.12nginx_log-2018.11.13等,当要查询或删除某一天的日志时,只需要针对对应日期的索引做操作就可以了。那么在该场景下,每天都会上演大量索引的生与死。

一个索引由生到死的过程,即为一个生命周期。举例如下:

  • 生:在 2019年2月5日 创建 nginx_log-2019.02.05的索引,开始处理日志数据的读写请求
  • 生:在 2019年2月6日 nginx_log-2019.02.05 索引便不再处理写请求,只处理读请求
  • 死:在 2019年3月5日 删除 nginx_log-2018.02.05的索引

其他的索引,如 nginx_log-2019.02.06 等也会经过相同的一个生命周期。

2. ILM 是如何划分索引生命周期的?

我们现在已经了解何为生命周期了,而最简单的生命周期只需要两个阶段即可。但在实际使用中生命周期是有多个阶段的,我们来看下 ILM 是如何划分生命周期的。

ILM 一共将索引生命周期分为四个阶段(Phase):

  1. Hot 阶段
  2. Warm 阶段
  3. Cold 阶段
  4. Delete 阶段

如果我们拿一个人的生命周期来做类比的话,大概如下图所示:

Index Lifecycle

Hot 阶段

Hot 阶段可类比为人类婴儿到青年的阶段,在这个阶段,它会不断地进行知识的输入与输出(数据读写),不断地长高长大(数据量增加)成有用的青年。

由于该阶段需要进行大量的数据读写,因此需要高配置的节点,一般建议将节点内存与磁盘比控制在 32 左右,比如 64GB 内存搭配 2TB 的 SSD 硬盘。

Warm 阶段

Warm 阶段可类比为人类青年到中年的阶段,在这个阶段,它基本不会再进行知识的输入(数据写入),主要进行知识输出(数据读取),为社会贡献价值。

由于该阶段主要负责数据的读取,中等配置的节点即可满足需求,可以将节点内存与磁盘比提高到 64~96 之间,比如 64GB 内存搭配 4~6TB 的 HDD 磁盘。

Cold 阶段

Cold 阶段可类别比为人类中年到老年的阶段,在这个阶段,它退休了,在社会有需要的时候才出来输出下知识(数据读取),大部分情况都是静静地待着。

由于该阶段只负责少量的数据读取工作,低等配置的节点即可满足要求,可以将节点内存与磁盘比进一步提高到 96 以上,比如128,即 64GB 内存搭配 8 TB 的 HDD 磁盘。

Delete 阶段

Delete 阶段可类比为人类寿终正寝的阶段,在发光发热之后,静静地逝去,Rest in Peace~

ILM 对于索引的生命周期进行了非常详细的划分,但它并不强制要求必须有这个4个阶段,用户可以根据自己的需求组合成自己的生命周期。

3. ILM 是如何管理索引生命周期的?

所谓生命周期的管理就是控制 4 个生命阶段转换,何时由 Hot 转为 Warm,何时由 Warm 转为 Cold,何时 Delete 等。

阶段的转换必然是需要时机的,而对于时序数据来说,时间必然是最好的维度,而 ILM 也是以时间为转换的衡量单位。比如下面这张转换的示意图,即默认是 Hot 阶段,在索引创建 3 天后转为 Warm 阶段,7 天后转为 Cold 阶段,30 天后删除。这个设置的相关字段为 min_age,后文会详细讲解。

ILM 将不同的生命周期管理策略称为 Policy,而所谓的 Policy 是由不同阶段(Phase)的不同动作(Action)组成的。

Action是一系列操作索引的动作,比如 Rollover、Shrink、Force Merge等,不同阶段可用的 Action 不同,详情如下:

  • Hot Phase
    • Rollover 滚动索引操作,可用在索引大小或者文档数达到某设定值时,创建新的索引用于数据读写,从而控制单个索引的大小。这里要注意的一点是,如果启用了 Rollover,那么所有阶段的时间不再以索引创建时间为准,而是以该索引 Rollover 的时间为准。
  • Warm Phase
    • Allocate 设定副本数、修改分片分配规则(如将分片迁移到中等配置的节点上)等
    • Read-Onlly 设定当前索引为只读状态
    • Force Merge 合并 segment 操作
    • Shrink 缩小 shard 分片数
  • Cold Phase
    • Allocate 同上
  • Delete Phase
    • Delete 删除

从上面看下来整体操作还是很简单的,Kibana 也提供了一套 UI 界面来设置这些策略,如下所示:

kibana ilm

从上图看下来 ILM 的设置是不是一目了然呢?

当然,ILM 是有自己的 api 的,比如上面图片对应的 api 请求如下:

PUT /_ilm/policy/test_ilm2
{
    "policy": {
        "phases": {
            "hot": {
                "actions": {
                    "rollover": {
                        "max_age": "30d",
                        "max_size": "50gb"
                    }
                }
            },
            "warm": {
                "min_age": "3d",
                "actions": {
                    "allocate": {
                        "require": {
                            "box_type": "warm"
                        },
                        "number_of_replicas": 0
                    },
                    "forcemerge": {
                        "max_num_segments": 1
                    },
                    "shrink": {
                        "number_of_shards": 1
                    }
                }
            },
            "cold": {
                "min_age": "7d",
                "actions": {
                    "allocate": {
                        "require": {
                            "box_type": "cold"
                        }
                    }
                }
            },
            "delete": {
                "min_age": "30d",
                "actions": {
                    "delete": {}
                }
            }
        }
    }
}

这里不展开讲了,感兴趣的同学可以自行查看官方手册。

现在管理策略(Policy)已经有了,那么如何应用到索引(Index)上面呢?

方法为设定如下的索引配置:

  • index.lifecycle.name 设定 Policy 名称,比如上面的 test_ilm2
  • index.lifecycle.rollover_alias 如果使用了 Rollover,那么还需要指定该别名

修改索引配置可以直接修改(`PUT index_name/_settings)或者通过索引模板(Index Template)来实现。

我们这里不展开讲了,大家参考下面的实战就明白了。

4. 实战

下面我们来实际演练一把!

目标

现在需要收集 nginx 日志,只需保留最近30天的日志,但要保证最近7天的日志有良好的查询性能,搜索响应时间在 100ms 以内。

为了让大家可以快速看到效果,下面实际操作的时候会将 30天7天 替换为 40秒20秒

ES 集群架构

这里我们简单介绍下这次实战所用 ES 集群的构成。该 ES 集群一共有 3个节点组成,每个节点都有名为 box_type 的属性,如下所示:

GET _cat/nodeattrs?s=attr
es01_hot  172.24.0.5 172.24.0.5 box_type          hot
es02_warm 172.24.0.4 172.24.0.4 box_type          warm
es03_cold 172.24.0.3 172.24.0.3 box_type          cold

由上可见我们有 1 个 hot 节点、1 个 warm 节点、1 个 cold 节点,分别用于对应 ILM 的阶段,即 Hot 阶段的索引都位于 hot 上,Warm 阶段的索引都位于 warm 上,Cold 阶段的索引都位于 cold 上。

创建 ILM Policy

根据要求,我们的 Policy 设定如下:

  • 索引名以 nginx_logs 为前缀,且以每10个文档做一次 Rollover
  • Rollover 后 5 秒转为 Warm 阶段
  • Rollover 后 20 秒转为 Cold 阶段
  • Rollover 后 40 秒删除

API 请求如下:

PUT /_ilm/policy/nginx_ilm_policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_docs": "10"
          }
        }
      },
      "warm": {
        "min_age": "5s",
        "actions": {
          "allocate": {
            "include": {
              "box_type": "warm"
            }
          }
        }
      },
      "cold": {
        "min_age": "20s",
        "actions": {
          "allocate": {
            "include": {
              "box_type": "cold"
            }
          }
        }
      },
      "delete": {
        "min_age": "40s",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

创建 Index Template

我们基于索引模板来创建所需的索引,如下所示:

PUT /_template/nginx_ilm_template
{
  "index_patterns": ["nginx_logs-*"],                 
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "index.lifecycle.name": "nginx_ilm_policy",      
    "index.lifecycle.rollover_alias": "nginx_logs",
    "index.routing.allocation.include.box_type": "hot"
  }
}

上述配置解释如下:

  • index.lifecycle.name 指明该索引应用的 ILM Policy
  • index.lifecycle.rollover_alias 指明在 Rollover 的时候使用的 alias
  • index.routing.allocation.include.box_type 指明新建的索引都分配在 hot 节点上

创建初始索引 Index

ILM 的第一个索引需要我们手动来创建,另外启动 Rollover 必须以数值类型结尾,比如 nginx_logs-000001。索引创建的 api 如下:

PUT nginx_logs-000001
{
  "aliases": {
    "nginx_logs": {
      "is_write_index":true
    }
  }
}

此时索引分布如下所示:

修改 ILM Polling Interval

ILM Service 会在后台轮询执行 Policy,默认间隔时间为 10 分钟,为了更快地看到效果,我们将其修改为 1 秒。

PUT _cluster/settings
{
  "persistent": {
    "indices.lifecycle.poll_interval":"1s"
  }
}

开始吧

一切准备就绪,我们开始吧!

首先执行下面的新建文档操作 10 次。

POST nginx_logs/_doc
{
  "name":"abbc"
}

之后 Rollover 执行,新的索引创建,如下所示:

5 秒后,nginx_logs-000001 转到 Warm 阶段

15 秒后(20 秒是指距离 Rollover 的时间,因为上面已经过去5秒了,所以这里只需要15秒),nginx_logs-00001转到 Cold 阶段

25 秒后,nginx_logs-00001删除

至此,一个完整的 ILM Policy 执行的流程就结束了,而后续 nginx_logs-000002 也会按照这个设定进行流转。

总结

ILM 是 Elastic 团队将多年 Elasticsearch 在日志场景领域的最佳实践进行的一次总结归纳和落地实施,极大地降低了用好 Elasticsearch 的门槛。掌握了 ILM 的核心概念,也就意味着掌握了 Elasticsearch 的最佳实践。希望本文能对大家入门 ILM 有所帮助。

在线研讨会

一篇文章所能承载的信息量和演示效果终究是有限的,2月份我们会针对 ILM 做一次线上研讨会,感兴趣的同学可以点击下面的链接注册报名。

https://jinshuju.net/f/TjBbvx

为了提高研讨会的质量,我们本次引入了审核机制,报名的同学请耐心等待,待我们审核通过后,您会收到我们的邮件邀请。

参考资料:

  1. https://www.elastic.co/guide/en/elasticsearch/reference/6.6/index-lifecycle-management.html
  2. https://www.elastic.co/blog/hot-warm-architecture-in-elasticsearch-5-x
继续阅读 »

1月29日,Elastic Stack 迎来 6.6 版本的发布,该版本带来很多新功能,比如:

  • Index Lifecycle Management
  • Frozen Index
  • Geoshape based on Bkd Tree
  • SQL adds support for Date histograms
  • ......

在这些众多功能中,Index Lifecycle Management(索引生命周期管理,后文简称 ILM) 是最受社区欢迎的。今天我们从以下几方面来快速了解下该功能:

  1. 为什么索引会有生命?什么是索引生命周期?
  2. ILM 是如何划分索引生命周期的?
  3. ILM 是如何管理索引生命周期的?
  4. 实战

1. Index Lifecycle 索引生命周期

先来看第一个问题:

为什么索引有生命?

索引(Index)是 Elasticsearch 中数据组织的一个逻辑概念,是具有相同或相似字段的文档组合。它由众多分片(Shard)组成,比如 bookpeople都可以用作索引名称,可以简单类比为关系型数据库的表(table)。

所谓生命,即生与死;索引的便是创建删除了。

在我们日常使用 Elasticsearch 的时候,索引的创建与删除似乎是很简单的事情,用的时候便创建,不用了删除即可,有什么好管理的呢?

这就要从 Elasticsearch 的应用场景来看了。

业务搜索场景,用户会将业务数据存储在 Elasticsearch 中,比如商品数据、订单数据、用户数据等,实现快速的全文检索功能。像这类数据基本都是累加的,不会删除。一般删除的话,要么是业务升级,要么是业务歇菜了。此种场景下,基本只有生,没有死,也就不存在管理一说。

而在日志业务场景中,用户会将各种日志,如系统、防火墙、中间件、数据库、web 服务器、应用日志等全部实时地存入 Elasticsearch 中,进行即席日志查询与分析。这种类型的数据都会有时间维度,也就是时序性的数据。由于日志的数据量过大,用户一般不会存储全量的数据,一般会在 Elasticsearch 中存储热数据,比如最近7天、30天的数据等,而在7天或者30天之前的数据都会被删除。为了便于操作,用户一般会按照日期来建立索引,比如 nginx 的日志索引名可能为 nginx_log-2018.11.12nginx_log-2018.11.13等,当要查询或删除某一天的日志时,只需要针对对应日期的索引做操作就可以了。那么在该场景下,每天都会上演大量索引的生与死。

一个索引由生到死的过程,即为一个生命周期。举例如下:

  • 生:在 2019年2月5日 创建 nginx_log-2019.02.05的索引,开始处理日志数据的读写请求
  • 生:在 2019年2月6日 nginx_log-2019.02.05 索引便不再处理写请求,只处理读请求
  • 死:在 2019年3月5日 删除 nginx_log-2018.02.05的索引

其他的索引,如 nginx_log-2019.02.06 等也会经过相同的一个生命周期。

2. ILM 是如何划分索引生命周期的?

我们现在已经了解何为生命周期了,而最简单的生命周期只需要两个阶段即可。但在实际使用中生命周期是有多个阶段的,我们来看下 ILM 是如何划分生命周期的。

ILM 一共将索引生命周期分为四个阶段(Phase):

  1. Hot 阶段
  2. Warm 阶段
  3. Cold 阶段
  4. Delete 阶段

如果我们拿一个人的生命周期来做类比的话,大概如下图所示:

Index Lifecycle

Hot 阶段

Hot 阶段可类比为人类婴儿到青年的阶段,在这个阶段,它会不断地进行知识的输入与输出(数据读写),不断地长高长大(数据量增加)成有用的青年。

由于该阶段需要进行大量的数据读写,因此需要高配置的节点,一般建议将节点内存与磁盘比控制在 32 左右,比如 64GB 内存搭配 2TB 的 SSD 硬盘。

Warm 阶段

Warm 阶段可类比为人类青年到中年的阶段,在这个阶段,它基本不会再进行知识的输入(数据写入),主要进行知识输出(数据读取),为社会贡献价值。

由于该阶段主要负责数据的读取,中等配置的节点即可满足需求,可以将节点内存与磁盘比提高到 64~96 之间,比如 64GB 内存搭配 4~6TB 的 HDD 磁盘。

Cold 阶段

Cold 阶段可类别比为人类中年到老年的阶段,在这个阶段,它退休了,在社会有需要的时候才出来输出下知识(数据读取),大部分情况都是静静地待着。

由于该阶段只负责少量的数据读取工作,低等配置的节点即可满足要求,可以将节点内存与磁盘比进一步提高到 96 以上,比如128,即 64GB 内存搭配 8 TB 的 HDD 磁盘。

Delete 阶段

Delete 阶段可类比为人类寿终正寝的阶段,在发光发热之后,静静地逝去,Rest in Peace~

ILM 对于索引的生命周期进行了非常详细的划分,但它并不强制要求必须有这个4个阶段,用户可以根据自己的需求组合成自己的生命周期。

3. ILM 是如何管理索引生命周期的?

所谓生命周期的管理就是控制 4 个生命阶段转换,何时由 Hot 转为 Warm,何时由 Warm 转为 Cold,何时 Delete 等。

阶段的转换必然是需要时机的,而对于时序数据来说,时间必然是最好的维度,而 ILM 也是以时间为转换的衡量单位。比如下面这张转换的示意图,即默认是 Hot 阶段,在索引创建 3 天后转为 Warm 阶段,7 天后转为 Cold 阶段,30 天后删除。这个设置的相关字段为 min_age,后文会详细讲解。

ILM 将不同的生命周期管理策略称为 Policy,而所谓的 Policy 是由不同阶段(Phase)的不同动作(Action)组成的。

Action是一系列操作索引的动作,比如 Rollover、Shrink、Force Merge等,不同阶段可用的 Action 不同,详情如下:

  • Hot Phase
    • Rollover 滚动索引操作,可用在索引大小或者文档数达到某设定值时,创建新的索引用于数据读写,从而控制单个索引的大小。这里要注意的一点是,如果启用了 Rollover,那么所有阶段的时间不再以索引创建时间为准,而是以该索引 Rollover 的时间为准。
  • Warm Phase
    • Allocate 设定副本数、修改分片分配规则(如将分片迁移到中等配置的节点上)等
    • Read-Onlly 设定当前索引为只读状态
    • Force Merge 合并 segment 操作
    • Shrink 缩小 shard 分片数
  • Cold Phase
    • Allocate 同上
  • Delete Phase
    • Delete 删除

从上面看下来整体操作还是很简单的,Kibana 也提供了一套 UI 界面来设置这些策略,如下所示:

kibana ilm

从上图看下来 ILM 的设置是不是一目了然呢?

当然,ILM 是有自己的 api 的,比如上面图片对应的 api 请求如下:

PUT /_ilm/policy/test_ilm2
{
    "policy": {
        "phases": {
            "hot": {
                "actions": {
                    "rollover": {
                        "max_age": "30d",
                        "max_size": "50gb"
                    }
                }
            },
            "warm": {
                "min_age": "3d",
                "actions": {
                    "allocate": {
                        "require": {
                            "box_type": "warm"
                        },
                        "number_of_replicas": 0
                    },
                    "forcemerge": {
                        "max_num_segments": 1
                    },
                    "shrink": {
                        "number_of_shards": 1
                    }
                }
            },
            "cold": {
                "min_age": "7d",
                "actions": {
                    "allocate": {
                        "require": {
                            "box_type": "cold"
                        }
                    }
                }
            },
            "delete": {
                "min_age": "30d",
                "actions": {
                    "delete": {}
                }
            }
        }
    }
}

这里不展开讲了,感兴趣的同学可以自行查看官方手册。

现在管理策略(Policy)已经有了,那么如何应用到索引(Index)上面呢?

方法为设定如下的索引配置:

  • index.lifecycle.name 设定 Policy 名称,比如上面的 test_ilm2
  • index.lifecycle.rollover_alias 如果使用了 Rollover,那么还需要指定该别名

修改索引配置可以直接修改(`PUT index_name/_settings)或者通过索引模板(Index Template)来实现。

我们这里不展开讲了,大家参考下面的实战就明白了。

4. 实战

下面我们来实际演练一把!

目标

现在需要收集 nginx 日志,只需保留最近30天的日志,但要保证最近7天的日志有良好的查询性能,搜索响应时间在 100ms 以内。

为了让大家可以快速看到效果,下面实际操作的时候会将 30天7天 替换为 40秒20秒

ES 集群架构

这里我们简单介绍下这次实战所用 ES 集群的构成。该 ES 集群一共有 3个节点组成,每个节点都有名为 box_type 的属性,如下所示:

GET _cat/nodeattrs?s=attr
es01_hot  172.24.0.5 172.24.0.5 box_type          hot
es02_warm 172.24.0.4 172.24.0.4 box_type          warm
es03_cold 172.24.0.3 172.24.0.3 box_type          cold

由上可见我们有 1 个 hot 节点、1 个 warm 节点、1 个 cold 节点,分别用于对应 ILM 的阶段,即 Hot 阶段的索引都位于 hot 上,Warm 阶段的索引都位于 warm 上,Cold 阶段的索引都位于 cold 上。

创建 ILM Policy

根据要求,我们的 Policy 设定如下:

  • 索引名以 nginx_logs 为前缀,且以每10个文档做一次 Rollover
  • Rollover 后 5 秒转为 Warm 阶段
  • Rollover 后 20 秒转为 Cold 阶段
  • Rollover 后 40 秒删除

API 请求如下:

PUT /_ilm/policy/nginx_ilm_policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_docs": "10"
          }
        }
      },
      "warm": {
        "min_age": "5s",
        "actions": {
          "allocate": {
            "include": {
              "box_type": "warm"
            }
          }
        }
      },
      "cold": {
        "min_age": "20s",
        "actions": {
          "allocate": {
            "include": {
              "box_type": "cold"
            }
          }
        }
      },
      "delete": {
        "min_age": "40s",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

创建 Index Template

我们基于索引模板来创建所需的索引,如下所示:

PUT /_template/nginx_ilm_template
{
  "index_patterns": ["nginx_logs-*"],                 
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "index.lifecycle.name": "nginx_ilm_policy",      
    "index.lifecycle.rollover_alias": "nginx_logs",
    "index.routing.allocation.include.box_type": "hot"
  }
}

上述配置解释如下:

  • index.lifecycle.name 指明该索引应用的 ILM Policy
  • index.lifecycle.rollover_alias 指明在 Rollover 的时候使用的 alias
  • index.routing.allocation.include.box_type 指明新建的索引都分配在 hot 节点上

创建初始索引 Index

ILM 的第一个索引需要我们手动来创建,另外启动 Rollover 必须以数值类型结尾,比如 nginx_logs-000001。索引创建的 api 如下:

PUT nginx_logs-000001
{
  "aliases": {
    "nginx_logs": {
      "is_write_index":true
    }
  }
}

此时索引分布如下所示:

修改 ILM Polling Interval

ILM Service 会在后台轮询执行 Policy,默认间隔时间为 10 分钟,为了更快地看到效果,我们将其修改为 1 秒。

PUT _cluster/settings
{
  "persistent": {
    "indices.lifecycle.poll_interval":"1s"
  }
}

开始吧

一切准备就绪,我们开始吧!

首先执行下面的新建文档操作 10 次。

POST nginx_logs/_doc
{
  "name":"abbc"
}

之后 Rollover 执行,新的索引创建,如下所示:

5 秒后,nginx_logs-000001 转到 Warm 阶段

15 秒后(20 秒是指距离 Rollover 的时间,因为上面已经过去5秒了,所以这里只需要15秒),nginx_logs-00001转到 Cold 阶段

25 秒后,nginx_logs-00001删除

至此,一个完整的 ILM Policy 执行的流程就结束了,而后续 nginx_logs-000002 也会按照这个设定进行流转。

总结

ILM 是 Elastic 团队将多年 Elasticsearch 在日志场景领域的最佳实践进行的一次总结归纳和落地实施,极大地降低了用好 Elasticsearch 的门槛。掌握了 ILM 的核心概念,也就意味着掌握了 Elasticsearch 的最佳实践。希望本文能对大家入门 ILM 有所帮助。

在线研讨会

一篇文章所能承载的信息量和演示效果终究是有限的,2月份我们会针对 ILM 做一次线上研讨会,感兴趣的同学可以点击下面的链接注册报名。

https://jinshuju.net/f/TjBbvx

为了提高研讨会的质量,我们本次引入了审核机制,报名的同学请耐心等待,待我们审核通过后,您会收到我们的邮件邀请。

参考资料:

  1. https://www.elastic.co/guide/en/elasticsearch/reference/6.6/index-lifecycle-management.html
  2. https://www.elastic.co/blog/hot-warm-architecture-in-elasticsearch-5-x
收起阅读 »

ClusterBlockException SERVICE_UNAVAILABLE/1/state

截图.PNG
图中为_cluster/health 和_cat/pending_tasks的执行结果,现在ES节点全部故障了,请教下如何恢复?   gateway.expected_nodes配置的为节点个数的80% 

 Caused by: org.elasticsearch.discovery.MasterNotDiscoveredException: ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction$4.onTimeout(TransportMasterNodeAction.java:213) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ContextPreservingListener.onTimeout(ClusterStateObserver.java:317) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ObserverClusterStateListener.onTimeout(ClusterStateObserver.java:244) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.service.ClusterApplierService$NotifyTimeout.run(ClusterApplierService.java:578) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:568) [elasticsearch-6.1.3.jar:6.1.3]
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
                at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
        Caused by: org.elasticsearch.cluster.block.ClusterBlockException: blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];
                at org.elasticsearch.cluster.block.ClusterBlocks.indexBlockedException(ClusterBlocks.java:182) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:64) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:39) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.doStart(TransportMasterNodeAction.java:135) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.start(TransportMasterNodeAction.java:127) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:105) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:55) ~[elasticsearch-6.1.3.jar:6.1.3]
继续阅读 »

截图.PNG
图中为_cluster/health 和_cat/pending_tasks的执行结果,现在ES节点全部故障了,请教下如何恢复?   gateway.expected_nodes配置的为节点个数的80% 

 Caused by: org.elasticsearch.discovery.MasterNotDiscoveredException: ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction$4.onTimeout(TransportMasterNodeAction.java:213) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ContextPreservingListener.onTimeout(ClusterStateObserver.java:317) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ObserverClusterStateListener.onTimeout(ClusterStateObserver.java:244) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.service.ClusterApplierService$NotifyTimeout.run(ClusterApplierService.java:578) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:568) [elasticsearch-6.1.3.jar:6.1.3]
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
                at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
        Caused by: org.elasticsearch.cluster.block.ClusterBlockException: blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];
                at org.elasticsearch.cluster.block.ClusterBlocks.indexBlockedException(ClusterBlocks.java:182) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:64) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:39) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.doStart(TransportMasterNodeAction.java:135) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.start(TransportMasterNodeAction.java:127) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:105) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:55) ~[elasticsearch-6.1.3.jar:6.1.3]
收起阅读 »

社区日报 第529期 (2019-02-03)

1.在Elasticsearch数据库上公开2400万个信用和抵押记录。
http://t.cn/EtdTYAs
2.使用Elasticsearch构建长期安全运营平台。
http://t.cn/EtdHXAV
3.(自备梯子)飞行汽车比您想象的更接近现实。
http://t.cn/EtdQtfV

编辑:至尊宝
归档:https://elasticsearch.cn/article/6356
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1.在Elasticsearch数据库上公开2400万个信用和抵押记录。
http://t.cn/EtdTYAs
2.使用Elasticsearch构建长期安全运营平台。
http://t.cn/EtdHXAV
3.(自备梯子)飞行汽车比您想象的更接近现实。
http://t.cn/EtdQtfV

编辑:至尊宝
归档:https://elasticsearch.cn/article/6356
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第528期 (2019-02-02)

  1. 使用ELK实时展示python日志(自备梯子) http://t.cn/EtnXpxL

2.使用logstash的grok自定义数据格式(自备梯子) http://t.cn/Etna8mi

  1. docker部署ELK监控日志 http://t.cn/EtnTn3O
继续阅读 »
  1. 使用ELK实时展示python日志(自备梯子) http://t.cn/EtnXpxL

2.使用logstash的grok自定义数据格式(自备梯子) http://t.cn/Etna8mi

  1. docker部署ELK监控日志 http://t.cn/EtnTn3O
收起阅读 »

社区日报 第527期 (2019-02-01)

1、Spring Boot  + Elasticsearch开源实现
http://t.cn/EtRjzdC
2、docker一键搭建elasticsearch6.5.0集群
http://t.cn/EtRjy4M
3、深入研究Elasticsearch聚合的性能
http://t.cn/EtRjtEe

编辑:铭毅天下
归档:https://elasticsearch.cn/article/6354
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1、Spring Boot  + Elasticsearch开源实现
http://t.cn/EtRjzdC
2、docker一键搭建elasticsearch6.5.0集群
http://t.cn/EtRjy4M
3、深入研究Elasticsearch聚合的性能
http://t.cn/EtRjtEe

编辑:铭毅天下
归档:https://elasticsearch.cn/article/6354
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第526期 (2019-01-31)

1.Elastic Stack 6.6.0发布
https://www.elastic.co/blog/el ... eased
2.基于Lucene查询原理分析Elasticsearch的性能
http://t.cn/EwZO5to
3.基于ELK的uber实时预测引擎
http://t.cn/Et0qg2x

编辑:金桥
归档:https://elasticsearch.cn/article/6353
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1.Elastic Stack 6.6.0发布
https://www.elastic.co/blog/el ... eased
2.基于Lucene查询原理分析Elasticsearch的性能
http://t.cn/EwZO5to
3.基于ELK的uber实时预测引擎
http://t.cn/Et0qg2x

编辑:金桥
归档:https://elasticsearch.cn/article/6353
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第525期 (2019-01-30)

1.Elasticsearch DSL 的 Rust 封装
http://t.cn/EtpIMiG
2.Elasticsearch & Hadoop
http://t.cn/EtpImun
3.Lucene索引结构漫谈
http://t.cn/Re1Dp6g

编辑:江水
归档:https://elasticsearch.cn/article/6352
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1.Elasticsearch DSL 的 Rust 封装
http://t.cn/EtpIMiG
2.Elasticsearch & Hadoop
http://t.cn/EtpImun
3.Lucene索引结构漫谈
http://t.cn/Re1Dp6g

编辑:江水
归档:https://elasticsearch.cn/article/6352
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第524期 (2019-01-29)

1、Elasticsearch分布式一致性原理分析之元数据。
http://t.cn/EtbgJxx
2、使用Amazon Elasticsearch Service和AWS Lambda对PCI-DSS感知进行警报、监控和报告。
http://t.cn/Etbgorf
3、Node.js和浏览器官方Elasticsearch客户端介绍。
http://t.cn/Etbg04v

编辑:叮咚光军
归档:https://elasticsearch.cn/article/6351
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1、Elasticsearch分布式一致性原理分析之元数据。
http://t.cn/EtbgJxx
2、使用Amazon Elasticsearch Service和AWS Lambda对PCI-DSS感知进行警报、监控和报告。
http://t.cn/Etbgorf
3、Node.js和浏览器官方Elasticsearch客户端介绍。
http://t.cn/Etbg04v

编辑:叮咚光军
归档:https://elasticsearch.cn/article/6351
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

社区日报 第523期 (2019-01-28)

1.大数据搜索选开源还是商业软件?ElasticSearch 对比 Splunk
http://t.cn/EtVI8va
2.ELK做数据挖掘的优缺点
http://t.cn/EtVMwlq
3.导入CSV 和 日志文件到ES中,并使用可视化展示
http://t.cn/EtVM6TN

编辑:cyberdak
归档:https://elasticsearch.cn/article/6350
订阅:https://tinyletter.com/elastic-daily
继续阅读 »
1.大数据搜索选开源还是商业软件?ElasticSearch 对比 Splunk
http://t.cn/EtVI8va
2.ELK做数据挖掘的优缺点
http://t.cn/EtVMwlq
3.导入CSV 和 日志文件到ES中,并使用可视化展示
http://t.cn/EtVM6TN

编辑:cyberdak
归档:https://elasticsearch.cn/article/6350
订阅:https://tinyletter.com/elastic-daily 收起阅读 »

Hive 与 ElasticSearch 的数据交互

本文将详细介绍利用 ES 与 Hive 直接的数据交互;通过 Hive 外部表的方式,可以快速将 ES 索引数据映射到 Hive 中,使用易于上手的 Hive SQL 实现对数据的进一步加工。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • ES 版本:6.5.1
  • Hive 版本:2.1.1
  • ES-Hadoop 版本:6.5.1

2、Hive 简介

Hive 在 Hadoop 生态系统中扮演着数据仓库的角色,借助 Hive 可以方便地进行数据汇总、即席查询以及分析存储在 Hadoop 文件系统中的大型数据集。

Hive 通过类 SQL 语言(HSQL)对 Hadoop 上的数据进行抽象,这样用户可以通过 SQL 语句对数据进行定义、组织、操作和分析;在 Hive 中,数据集是通过表(定义了数据类型相关信息)进行定义的,用户可以通过内置运算符或用户自定义函数(UDF)对数据进行加载、查询和转换。

3、Hive 安装 ES-Hadoop

官方推荐的安装方式:

使用 add jar

add jar /path/elasticsearch-hadoop.jar

使用 hive.aux.jars.path

$ bin/hive --auxpath=/path/elasticsearch-hadoop.jar

修改配置(hive-site.xml)

<property>
  <name>hive.aux.jars.path</name>
  <value>/path/elasticsearch-hadoop.jar</value>
  <description>A comma separated list (with no spaces) of the jar files</description>
</property>

CDH6.X 推荐的安装方法

elasticsearch-hadoop.jar 复制到 Hive 的 auxlib 目录中,然后重启 Hive 即可。

cp elasticsearch-hadoop.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/

二、Hive 与 ElasticSearch 的数据交互

1、数据类型对照表

请务必注意,ES 中的类型是 index/_mapping 中对应的数据类型,非 _source 里面数据的类型。

Hive type Elasticsearch type
void null
boolean boolean
tinyint byte
smallint short
int int
bigint long
double double
float float
string string
binary binary
timestamp date
struct map
map map
array array
union not supported (yet)
decimal string
date date
varchar string
char string

2、建立 Hive 外部表

CREATE EXTERNAL TABLE default.surface(
    water_type STRING,
    water_level STRING,
    monitor_time TIMESTAMP,
    sitecode STRING,
    p492 DOUBLE,
    p311 DOUBLE,
    status STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.resource'='ods_data_day_surface*/doc',
    'es.query'='?q=status:001'
    'es.nodes'='sky-01','es.port'='9200',
    'es.net.http.auth.user'='sky',
    'es.net.http.auth.pass'='jointsky',
    'es.date.format'='yyyy-MM-dd HH:mm:ss',
    'es.ser.reader.value.class'='com.jointsky.bigdata.hive.EsValueReader'
    'es.mapping.names'='waterType:water_type,monitortime:monitor_time'
);

3、配置项说明

es.resource

es.resource 用于设置 ES 资源的位置,默认该配置项同时设置了读和写的索引,当然也可以分别设置读写索引名称:

  • es.resource.read:设置读取位置;
  • es.resource.write:设置写入位置。

es.query

es.query 设置查询过滤条件,目前支持 uri queryquery dslexternal resource 三种设置方式。

# uri (or parameter) query
es.query = ?q=costinl

# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }

# external resource
es.query = org/mypackage/myquery.json

es.mapping.names

es.mapping.names 用于设置 Hive 与 ES 的字段映射关系,如果不设置,则默认字段名不发生变化(即为 data type 区域定义的字段名);此外该部分还用于定义 Hive 到 ES 的数据映射类型。

'es.mapping.names' = 'date:@timestamp , url:url_123 ')

其他通用字段的说明请参考文章:使用 ES-Hadoop 将 Spark Streaming 流数据写入 ES

4、自定义日期类型解析

目前将 ES 的 date 类型映射到 Hive 的 TIMESTAMP 类型时,ES-Hadoop 组件只能识别时间戳格式或者标准的 XSD 格式的日期字符串:

@Override
protected Object parseDate(Long value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(value)) : processLong(value));
}

@Override
protected Object parseDate(String value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(DatatypeConverter.parseDateTime(value).getTimeInMillis())) : parseString(value));
}

关于 XSD(XML Schema Date/Time Datatypes)可用参考文章:https://www.w3schools.com/xml/schema_dtypes_date.asp

为了兼容自定义的日期格式,需要编写自定义的日期读取类:


import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.hive.HiveValueReader;

import java.sql.Timestamp;
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EsValueReader extends HiveValueReader {
    private String dateFormat;
    private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String DEFAULT_DATE_FORMAT_MIN = "yyyy-MM-dd HH:mm";
    private static final String DEFAULT_DATE_FORMAT_HOUR = "yyyy-MM-dd HH";
    private static final String DEFAULT_DATE_FORMAT_DAY = "yyyy-MM-dd";

    @Override
    public void setSettings(Settings settings) {
        super.setSettings(settings);
        dateFormat = settings.getProperty("es.date.format");
    }

    @Override
    protected Object parseDate(String value, boolean richDate) {
        if (value != null && value.trim().length() > 0 && DEFAULT_DATE_FORMAT.equalsIgnoreCase(dateFormat)) {
            if (richDate){
                if (value.length() == 16){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_MIN).getTime()));
                }
                if (value.length() == 13){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_HOUR).getTime()));
                }
                if (value.length() == 10){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_DAY).getTime()));
                }
                return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT).getTime()));
            }
            return parseString(value);
        }
        return super.parseDate(value, richDate);
    }

    /**
     * 解析日期,根據指定的格式進行解析.<br>
     * 如果解析錯誤,則返回null
     * @param stringDate 日期字串
     * @param format 日期格式
     * @return 日期型別
     */
    private static Date parseDate(String stringDate, String format) {
        if (stringDate == null) {
            return null;
        }
        try {
            return parseDate(stringDate, new String[] { format });
        } catch (ParseException e) {
            return null;
        }
    }

    public static Date parseDate(String str, String... parsePatterns) throws ParseException {
        return parseDateWithLeniency(str, parsePatterns, true);
    }

    private static Date parseDateWithLeniency(
            String str, String[] parsePatterns, boolean lenient) throws ParseException {
        if (str == null || parsePatterns == null) {
            throw new IllegalArgumentException("Date and Patterns must not be null");
        }

        SimpleDateFormat parser = new SimpleDateFormat();
        parser.setLenient(lenient);
        ParsePosition pos = new ParsePosition(0);
        for (String parsePattern : parsePatterns) {
            String pattern = parsePattern;
            if (parsePattern.endsWith("ZZ")) {
                pattern = pattern.substring(0, pattern.length() - 1);
            }
            parser.applyPattern(pattern);
            pos.setIndex(0);
            String str2 = str;
            if (parsePattern.endsWith("ZZ")) {
                str2 = str.replaceAll("([-+][0-9][0-9]):([0-9][0-9])$", "$1$2");
            }
            Date date = parser.parse(str2, pos);
            if (date != null && pos.getIndex() == str2.length()) {
                return date;
            }
        }
        throw new ParseException("Unable to parse the date: " + str, -1);
    }
}

上述代码的 Maven 依赖

<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>6.5.4</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

自定义日期解析包的部署

代码编写完成后,将代码进行打包,然后将打包好的 jar 包放置到 Hive 的 auxlib 目录中,然后重启 Hive 即可;该步骤与 ES-Hadoop 的安装步骤一样。

在编写 Spark 程序从 Hive 中读取数据的时候,需要添加对该包的依赖以及对 ES-Hadoop 的依赖。

三、总结

经过上述的步骤,Hive 与 ES 的映射已经不成问题,如果想从 ES 中导出数据,可用借助 HSQL insert into table XXX select * from XXXXX; 的方式从 ES 中读取数据写入到 HDFS;当然通过更为复杂的 HSQL 可以将数据进行处理,并将数据重新写入到 ES 或者存储到 HDFS。

充分利用 ES 的查询、过滤和聚合,可以很好的去服务数据标准化、数据清洗、数据分布情况等 ETL 流程。


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

继续阅读 »

本文将详细介绍利用 ES 与 Hive 直接的数据交互;通过 Hive 外部表的方式,可以快速将 ES 索引数据映射到 Hive 中,使用易于上手的 Hive SQL 实现对数据的进一步加工。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • ES 版本:6.5.1
  • Hive 版本:2.1.1
  • ES-Hadoop 版本:6.5.1

2、Hive 简介

Hive 在 Hadoop 生态系统中扮演着数据仓库的角色,借助 Hive 可以方便地进行数据汇总、即席查询以及分析存储在 Hadoop 文件系统中的大型数据集。

Hive 通过类 SQL 语言(HSQL)对 Hadoop 上的数据进行抽象,这样用户可以通过 SQL 语句对数据进行定义、组织、操作和分析;在 Hive 中,数据集是通过表(定义了数据类型相关信息)进行定义的,用户可以通过内置运算符或用户自定义函数(UDF)对数据进行加载、查询和转换。

3、Hive 安装 ES-Hadoop

官方推荐的安装方式:

使用 add jar

add jar /path/elasticsearch-hadoop.jar

使用 hive.aux.jars.path

$ bin/hive --auxpath=/path/elasticsearch-hadoop.jar

修改配置(hive-site.xml)

<property>
  <name>hive.aux.jars.path</name>
  <value>/path/elasticsearch-hadoop.jar</value>
  <description>A comma separated list (with no spaces) of the jar files</description>
</property>

CDH6.X 推荐的安装方法

elasticsearch-hadoop.jar 复制到 Hive 的 auxlib 目录中,然后重启 Hive 即可。

cp elasticsearch-hadoop.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/

二、Hive 与 ElasticSearch 的数据交互

1、数据类型对照表

请务必注意,ES 中的类型是 index/_mapping 中对应的数据类型,非 _source 里面数据的类型。

Hive type Elasticsearch type
void null
boolean boolean
tinyint byte
smallint short
int int
bigint long
double double
float float
string string
binary binary
timestamp date
struct map
map map
array array
union not supported (yet)
decimal string
date date
varchar string
char string

2、建立 Hive 外部表

CREATE EXTERNAL TABLE default.surface(
    water_type STRING,
    water_level STRING,
    monitor_time TIMESTAMP,
    sitecode STRING,
    p492 DOUBLE,
    p311 DOUBLE,
    status STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.resource'='ods_data_day_surface*/doc',
    'es.query'='?q=status:001'
    'es.nodes'='sky-01','es.port'='9200',
    'es.net.http.auth.user'='sky',
    'es.net.http.auth.pass'='jointsky',
    'es.date.format'='yyyy-MM-dd HH:mm:ss',
    'es.ser.reader.value.class'='com.jointsky.bigdata.hive.EsValueReader'
    'es.mapping.names'='waterType:water_type,monitortime:monitor_time'
);

3、配置项说明

es.resource

es.resource 用于设置 ES 资源的位置,默认该配置项同时设置了读和写的索引,当然也可以分别设置读写索引名称:

  • es.resource.read:设置读取位置;
  • es.resource.write:设置写入位置。

es.query

es.query 设置查询过滤条件,目前支持 uri queryquery dslexternal resource 三种设置方式。

# uri (or parameter) query
es.query = ?q=costinl

# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }

# external resource
es.query = org/mypackage/myquery.json

es.mapping.names

es.mapping.names 用于设置 Hive 与 ES 的字段映射关系,如果不设置,则默认字段名不发生变化(即为 data type 区域定义的字段名);此外该部分还用于定义 Hive 到 ES 的数据映射类型。

'es.mapping.names' = 'date:@timestamp , url:url_123 ')

其他通用字段的说明请参考文章:使用 ES-Hadoop 将 Spark Streaming 流数据写入 ES

4、自定义日期类型解析

目前将 ES 的 date 类型映射到 Hive 的 TIMESTAMP 类型时,ES-Hadoop 组件只能识别时间戳格式或者标准的 XSD 格式的日期字符串:

@Override
protected Object parseDate(Long value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(value)) : processLong(value));
}

@Override
protected Object parseDate(String value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(DatatypeConverter.parseDateTime(value).getTimeInMillis())) : parseString(value));
}

关于 XSD(XML Schema Date/Time Datatypes)可用参考文章:https://www.w3schools.com/xml/schema_dtypes_date.asp

为了兼容自定义的日期格式,需要编写自定义的日期读取类:


import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.hive.HiveValueReader;

import java.sql.Timestamp;
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EsValueReader extends HiveValueReader {
    private String dateFormat;
    private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String DEFAULT_DATE_FORMAT_MIN = "yyyy-MM-dd HH:mm";
    private static final String DEFAULT_DATE_FORMAT_HOUR = "yyyy-MM-dd HH";
    private static final String DEFAULT_DATE_FORMAT_DAY = "yyyy-MM-dd";

    @Override
    public void setSettings(Settings settings) {
        super.setSettings(settings);
        dateFormat = settings.getProperty("es.date.format");
    }

    @Override
    protected Object parseDate(String value, boolean richDate) {
        if (value != null && value.trim().length() > 0 && DEFAULT_DATE_FORMAT.equalsIgnoreCase(dateFormat)) {
            if (richDate){
                if (value.length() == 16){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_MIN).getTime()));
                }
                if (value.length() == 13){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_HOUR).getTime()));
                }
                if (value.length() == 10){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_DAY).getTime()));
                }
                return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT).getTime()));
            }
            return parseString(value);
        }
        return super.parseDate(value, richDate);
    }

    /**
     * 解析日期,根據指定的格式進行解析.<br>
     * 如果解析錯誤,則返回null
     * @param stringDate 日期字串
     * @param format 日期格式
     * @return 日期型別
     */
    private static Date parseDate(String stringDate, String format) {
        if (stringDate == null) {
            return null;
        }
        try {
            return parseDate(stringDate, new String[] { format });
        } catch (ParseException e) {
            return null;
        }
    }

    public static Date parseDate(String str, String... parsePatterns) throws ParseException {
        return parseDateWithLeniency(str, parsePatterns, true);
    }

    private static Date parseDateWithLeniency(
            String str, String[] parsePatterns, boolean lenient) throws ParseException {
        if (str == null || parsePatterns == null) {
            throw new IllegalArgumentException("Date and Patterns must not be null");
        }

        SimpleDateFormat parser = new SimpleDateFormat();
        parser.setLenient(lenient);
        ParsePosition pos = new ParsePosition(0);
        for (String parsePattern : parsePatterns) {
            String pattern = parsePattern;
            if (parsePattern.endsWith("ZZ")) {
                pattern = pattern.substring(0, pattern.length() - 1);
            }
            parser.applyPattern(pattern);
            pos.setIndex(0);
            String str2 = str;
            if (parsePattern.endsWith("ZZ")) {
                str2 = str.replaceAll("([-+][0-9][0-9]):([0-9][0-9])$", "$1$2");
            }
            Date date = parser.parse(str2, pos);
            if (date != null && pos.getIndex() == str2.length()) {
                return date;
            }
        }
        throw new ParseException("Unable to parse the date: " + str, -1);
    }
}

上述代码的 Maven 依赖

<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>6.5.4</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

自定义日期解析包的部署

代码编写完成后,将代码进行打包,然后将打包好的 jar 包放置到 Hive 的 auxlib 目录中,然后重启 Hive 即可;该步骤与 ES-Hadoop 的安装步骤一样。

在编写 Spark 程序从 Hive 中读取数据的时候,需要添加对该包的依赖以及对 ES-Hadoop 的依赖。

三、总结

经过上述的步骤,Hive 与 ES 的映射已经不成问题,如果想从 ES 中导出数据,可用借助 HSQL insert into table XXX select * from XXXXX; 的方式从 ES 中读取数据写入到 HDFS;当然通过更为复杂的 HSQL 可以将数据进行处理,并将数据重新写入到 ES 或者存储到 HDFS。

充分利用 ES 的查询、过滤和聚合,可以很好的去服务数据标准化、数据清洗、数据分布情况等 ETL 流程。


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

收起阅读 »