ElasticTalk

ElasticTalk

【最新】Elasticsearch 6.6 Index Lifecycle Management 尝鲜

资讯动态rockybean 发表了文章 • 2 个评论 • 587 次浏览 • 2019-02-06 11:32 • 来自相关话题

1月29日,Elastic Stack 迎来 6.6 版本的发布,该版本带来很多新功能,比如:

  • Index Lifecycle Management
  • Frozen Index
  • Geoshape based on Bkd Tree
  • SQL adds support for Date histograms
  • ......

在这些众多功能中,Index Lifecycle Management(索引生命周期管理,后文简称 ILM) 是最受社区欢迎的。今天我们从以下几方面来快速了解下该功能:

  1. 为什么索引会有生命?什么是索引生命周期?
  2. ILM 是如何划分索引生命周期的?
  3. ILM 是如何管理索引生命周期的?
  4. 实战

1. Index Lifecycle 索引生命周期

先来看第一个问题:

为什么索引有生命?

索引(Index)是 Elasticsearch 中数据组织的一个逻辑概念,是具有相同或相似字段的文档组合。它由众多分片(Shard)组成,比如 bookpeople都可以用作索引名称,可以简单类比为关系型数据库的表(table)。

所谓生命,即生与死;索引的便是创建删除了。

在我们日常使用 Elasticsearch 的时候,索引的创建与删除似乎是很简单的事情,用的时候便创建,不用了删除即可,有什么好管理的呢?

这就要从 Elasticsearch 的应用场景来看了。

业务搜索场景,用户会将业务数据存储在 Elasticsearch 中,比如商品数据、订单数据、用户数据等,实现快速的全文检索功能。像这类数据基本都是累加的,不会删除。一般删除的话,要么是业务升级,要么是业务歇菜了。此种场景下,基本只有生,没有死,也就不存在管理一说。

而在日志业务场景中,用户会将各种日志,如系统、防火墙、中间件、数据库、web 服务器、应用日志等全部实时地存入 Elasticsearch 中,进行即席日志查询与分析。这种类型的数据都会有时间维度,也就是时序性的数据。由于日志的数据量过大,用户一般不会存储全量的数据,一般会在 Elasticsearch 中存储热数据,比如最近7天、30天的数据等,而在7天或者30天之前的数据都会被删除。为了便于操作,用户一般会按照日期来建立索引,比如 nginx 的日志索引名可能为 nginx_log-2018.11.12nginx_log-2018.11.13等,当要查询或删除某一天的日志时,只需要针对对应日期的索引做操作就可以了。那么在该场景下,每天都会上演大量索引的生与死。

一个索引由生到死的过程,即为一个生命周期。举例如下:

  • 生:在 2019年2月5日 创建 nginx_log-2019.02.05的索引,开始处理日志数据的读写请求
  • 生:在 2019年2月6日 nginx_log-2019.02.05 索引便不再处理写请求,只处理读请求
  • 死:在 2019年3月5日 删除 nginx_log-2018.02.05的索引

其他的索引,如 nginx_log-2019.02.06 等也会经过相同的一个生命周期。

2. ILM 是如何划分索引生命周期的?

我们现在已经了解何为生命周期了,而最简单的生命周期只需要两个阶段即可。但在实际使用中生命周期是有多个阶段的,我们来看下 ILM 是如何划分生命周期的。

ILM 一共将索引生命周期分为四个阶段(Phase):

  1. Hot 阶段
  2. Warm 阶段
  3. Cold 阶段
  4. Delete 阶段

如果我们拿一个人的生命周期来做类比的话,大概如下图所示:

Index Lifecycle

Hot 阶段

Hot 阶段可类比为人类婴儿到青年的阶段,在这个阶段,它会不断地进行知识的输入与输出(数据读写),不断地长高长大(数据量增加)成有用的青年。

由于该阶段需要进行大量的数据读写,因此需要高配置的节点,一般建议将节点内存与磁盘比控制在 32 左右,比如 64GB 内存搭配 2TB 的 SSD 硬盘。

Warm 阶段

Warm 阶段可类比为人类青年到中年的阶段,在这个阶段,它基本不会再进行知识的输入(数据写入),主要进行知识输出(数据读取),为社会贡献价值。

由于该阶段主要负责数据的读取,中等配置的节点即可满足需求,可以将节点内存与磁盘比提高到 64~96 之间,比如 64GB 内存搭配 4~6TB 的 HDD 磁盘。

Cold 阶段

Cold 阶段可类别比为人类中年到老年的阶段,在这个阶段,它退休了,在社会有需要的时候才出来输出下知识(数据读取),大部分情况都是静静地待着。

由于该阶段只负责少量的数据读取工作,低等配置的节点即可满足要求,可以将节点内存与磁盘比进一步提高到 96 以上,比如128,即 64GB 内存搭配 8 TB 的 HDD 磁盘。

Delete 阶段

Delete 阶段可类比为人类寿终正寝的阶段,在发光发热之后,静静地逝去,Rest in Peace~

ILM 对于索引的生命周期进行了非常详细的划分,但它并不强制要求必须有这个4个阶段,用户可以根据自己的需求组合成自己的生命周期。

3. ILM 是如何管理索引生命周期的?

所谓生命周期的管理就是控制 4 个生命阶段转换,何时由 Hot 转为 Warm,何时由 Warm 转为 Cold,何时 Delete 等。

阶段的转换必然是需要时机的,而对于时序数据来说,时间必然是最好的维度,而 ILM 也是以时间为转换的衡量单位。比如下面这张转换的示意图,即默认是 Hot 阶段,在索引创建 3 天后转为 Warm 阶段,7 天后转为 Cold 阶段,30 天后删除。这个设置的相关字段为 min_age,后文会详细讲解。

ILM 将不同的生命周期管理策略称为 Policy,而所谓的 Policy 是由不同阶段(Phase)的不同动作(Action)组成的。

Action是一系列操作索引的动作,比如 Rollover、Shrink、Force Merge等,不同阶段可用的 Action 不同,详情如下:

  • Hot Phase
    • Rollover 滚动索引操作,可用在索引大小或者文档数达到某设定值时,创建新的索引用于数据读写,从而控制单个索引的大小。这里要注意的一点是,如果启用了 Rollover,那么所有阶段的时间不再以索引创建时间为准,而是以该索引 Rollover 的时间为准。
  • Warm Phase
    • Allocate 设定副本数、修改分片分配规则(如将分片迁移到中等配置的节点上)等
    • Read-Onlly 设定当前索引为只读状态
    • Force Merge 合并 segment 操作
    • Shrink 缩小 shard 分片数
  • Cold Phase
    • Allocate 同上
  • Delete Phase
    • Delete 删除

从上面看下来整体操作还是很简单的,Kibana 也提供了一套 UI 界面来设置这些策略,如下所示:

kibana ilm

从上图看下来 ILM 的设置是不是一目了然呢?

当然,ILM 是有自己的 api 的,比如上面图片对应的 api 请求如下:

PUT /_ilm/policy/test_ilm2
{
    "policy": {
        "phases": {
            "hot": {
                "actions": {
                    "rollover": {
                        "max_age": "30d",
                        "max_size": "50gb"
                    }
                }
            },
            "warm": {
                "min_age": "3d",
                "actions": {
                    "allocate": {
                        "require": {
                            "box_type": "warm"
                        },
                        "number_of_replicas": 0
                    },
                    "forcemerge": {
                        "max_num_segments": 1
                    },
                    "shrink": {
                        "number_of_shards": 1
                    }
                }
            },
            "cold": {
                "min_age": "7d",
                "actions": {
                    "allocate": {
                        "require": {
                            "box_type": "cold"
                        }
                    }
                }
            },
            "delete": {
                "min_age": "30d",
                "actions": {
                    "delete": {}
                }
            }
        }
    }
}

这里不展开讲了,感兴趣的同学可以自行查看官方手册。

现在管理策略(Policy)已经有了,那么如何应用到索引(Index)上面呢?

方法为设定如下的索引配置:

  • index.lifecycle.name 设定 Policy 名称,比如上面的 test_ilm2
  • index.lifecycle.rollover_alias 如果使用了 Rollover,那么还需要指定该别名

修改索引配置可以直接修改(`PUT index_name/_settings)或者通过索引模板(Index Template)来实现。

我们这里不展开讲了,大家参考下面的实战就明白了。

4. 实战

下面我们来实际演练一把!

目标

现在需要收集 nginx 日志,只需保留最近30天的日志,但要保证最近7天的日志有良好的查询性能,搜索响应时间在 100ms 以内。

为了让大家可以快速看到效果,下面实际操作的时候会将 30天7天 替换为 40秒20秒

ES 集群架构

这里我们简单介绍下这次实战所用 ES 集群的构成。该 ES 集群一共有 3个节点组成,每个节点都有名为 box_type 的属性,如下所示:

GET _cat/nodeattrs?s=attr
es01_hot  172.24.0.5 172.24.0.5 box_type          hot
es02_warm 172.24.0.4 172.24.0.4 box_type          warm
es03_cold 172.24.0.3 172.24.0.3 box_type          cold

由上可见我们有 1 个 hot 节点、1 个 warm 节点、1 个 cold 节点,分别用于对应 ILM 的阶段,即 Hot 阶段的索引都位于 hot 上,Warm 阶段的索引都位于 warm 上,Cold 阶段的索引都位于 cold 上。

创建 ILM Policy

根据要求,我们的 Policy 设定如下:

  • 索引名以 nginx_logs 为前缀,且以每10个文档做一次 Rollover
  • Rollover 后 5 秒转为 Warm 阶段
  • Rollover 后 20 秒转为 Cold 阶段
  • Rollover 后 40 秒删除

API 请求如下:

PUT /_ilm/policy/nginx_ilm_policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_docs": "10"
          }
        }
      },
      "warm": {
        "min_age": "5s",
        "actions": {
          "allocate": {
            "include": {
              "box_type": "warm"
            }
          }
        }
      },
      "cold": {
        "min_age": "20s",
        "actions": {
          "allocate": {
            "include": {
              "box_type": "cold"
            }
          }
        }
      },
      "delete": {
        "min_age": "40s",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

创建 Index Template

我们基于索引模板来创建所需的索引,如下所示:

PUT /_template/nginx_ilm_template
{
  "index_patterns": ["nginx_logs-*"],                 
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "index.lifecycle.name": "nginx_ilm_policy",      
    "index.lifecycle.rollover_alias": "nginx_logs",
    "index.routing.allocation.include.box_type": "hot"
  }
}

上述配置解释如下:

  • index.lifecycle.name 指明该索引应用的 ILM Policy
  • index.lifecycle.rollover_alias 指明在 Rollover 的时候使用的 alias
  • index.routing.allocation.include.box_type 指明新建的索引都分配在 hot 节点上

创建初始索引 Index

ILM 的第一个索引需要我们手动来创建,另外启动 Rollover 必须以数值类型结尾,比如 nginx_logs-000001。索引创建的 api 如下:

PUT nginx_logs-000001
{
  "aliases": {
    "nginx_logs": {
      "is_write_index":true
    }
  }
}

此时索引分布如下所示:

修改 ILM Polling Interval

ILM Service 会在后台轮询执行 Policy,默认间隔时间为 10 分钟,为了更快地看到效果,我们将其修改为 1 秒。

PUT _cluster/settings
{
  "persistent": {
    "indices.lifecycle.poll_interval":"1s"
  }
}

开始吧

一切准备就绪,我们开始吧!

首先执行下面的新建文档操作 10 次。

POST nginx_logs/_doc
{
  "name":"abbc"
}

之后 Rollover 执行,新的索引创建,如下所示:

5 秒后,nginx_logs-000001 转到 Warm 阶段

15 秒后(20 秒是指距离 Rollover 的时间,因为上面已经过去5秒了,所以这里只需要15秒),nginx_logs-00001转到 Cold 阶段

25 秒后,nginx_logs-00001删除

至此,一个完整的 ILM Policy 执行的流程就结束了,而后续 nginx_logs-000002 也会按照这个设定进行流转。

总结

ILM 是 Elastic 团队将多年 Elasticsearch 在日志场景领域的最佳实践进行的一次总结归纳和落地实施,极大地降低了用好 Elasticsearch 的门槛。掌握了 ILM 的核心概念,也就意味着掌握了 Elasticsearch 的最佳实践。希望本文能对大家入门 ILM 有所帮助。

在线研讨会

一篇文章所能承载的信息量和演示效果终究是有限的,2月份我们会针对 ILM 做一次线上研讨会,感兴趣的同学可以点击下面的链接注册报名。

https://jinshuju.net/f/TjBbvx

为了提高研讨会的质量,我们本次引入了审核机制,报名的同学请耐心等待,待我们审核通过后,您会收到我们的邮件邀请。

参考资料:

  1. https://www.elastic.co/guide/en/elasticsearch/reference/6.6/index-lifecycle-management.html
  2. https://www.elastic.co/blog/hot-warm-architecture-in-elasticsearch-5-x

ET007 ElasticStack 6.5 介绍

ElasticsearchLeon J 发表了文章 • 1 个评论 • 915 次浏览 • 2018-11-19 09:18 • 来自相关话题

就在 11月14日,ElasticStack 6.5.0 发布了,此次发布带来了许多激动人心的特性,我们一起来体验一下:

WX20181118-120551@2x

如果没有任何数据,kibana会提示我们导入sample数据,这边我选择Try our sample data, 然后导入全部3个样例数据,这可以让我们在没有数据的情况下快速体验新特性。

Infrastructure & Logs UI

很多用户使用 ElasticStack 收集基础架构的日志和指标,比如系统日志、安全日志、CPU指标,内存指标等等。在6.5中,kibana 侧边栏中增加了 Infrastructure 和 Logs 两个新的 tab,让用户更简单地查看自己的基础架构,和每台主机或者容器里的日志。

logs

进入logs标签页,如果当前没有数据,kibana会引导我们添加数据

WX20181118-121032@2x

我们选择 system logs

WX20181118-121047@2x

根据指示,我们安装部署好filebeat并启动,再次进入 logs 标签页便可以看到收集到的系统日志了

image-20181118185158451

  1. 搜索过滤框:在这里可以像在 discover 里一样写query string,并且会有输入提示
  2. 时间选择框:可以选择需要查看的时间点,如果点了 Stream live,会持续监听尾部新输出的日志内容,类似 linux 命令中的tail -f
  3. 日志时间轴:高亮的部位是当前查看日志所在的时间范围,对应的区域图标识了日志量

假如我想实现 tail -f /var/log/system.log | grep google.com 一样的效果,可以打开 Stream live,并在搜索过滤框中这样输入:

WX20181118-173432@2x

很简单,很方便有木有?

Infrastructure

同样在kibana的引导下安装 Metric beat,并开启system模块,启动后进入 infrastructure 标签页:

image-20181118190614385

这里可以直观地看到所有基础架构的指标状况,深色的内层代表主机,颜色代表了健康状况。浅灰色的外层代表了group,因为我只在自己的笔记本上做了部署,所以只能看到一个host。

image-20181118191527060

点击主机会弹出菜单

  • View logs : 跳转到 logs 标签页,并通过搜索过滤框指定host,只查看这台主机的日志。
  • View metrics : 跳转到这台主机的指标详情,可以查看历史数据 shoot

APM

Java 和 Go

不负众望,继 Nodejs、Python、Ruby、Javascript 之后,Elastic APM 5.6.0 新增了对 Java 和 Golang 的支持!

Distributed Tracing

在 SOA 和 MSA 大行其道的年代,如何追踪请求在各个系统之间的流动成为了apm的关键问题。

Elastic APM 支持 OpenTracing 标准,并在各个agent里内置了 OpenTracing 兼容的bridge

以下是官网上该特性的截图:

distributed_tracing

APM Server 监控

如 ElasticStack的其他产品一般,APM也支持了监控,并可以在 Kinbana Montoring下查看监控信息:

apm_monitoring

APM Server 内存占用优化

通过新的基于NDJSON的协议,agent可以在采集信息后通过事件流立即发往APM server,这样 APM Server可以一个接一个地处理接收到的事件,而不是一次性地收到一大块(chunk),这样在很大程度上减少了APM Server的内存占用。

Elasticsearch

Cross-cluster replication

这里的副本并非我们平时常见的分片副本,而是通过在集群B配置一个副本indexB来追随集群A中的indexA,indexA中发生的任何变化都会同步到indexB中来。另外也可以配置一个pattern,当集群A出现符合pattern的索引,自动在集群B创建他的副本,这听起来很酷。值得一提的是,这将是白金版里新增的一个特性。

Minimal Snapshots

snapshot 是 es 中用来创建索引副本的特性,在之前的版本中,snapshot会把完整的 index 都保存下来,包括原始数据和索引数据等等。新的 Minimal Snapshots 提供了一种只备份 _source 内容和 index metadata,当需要恢复时,需要通过 reindex 操作来完成。最小快照最多可能帮你节省50%的磁盘占用,但是会花费更多的时间来恢复。这个特性可能并不适合所有人,但给恢复窗口比较长,且磁盘容量有限的用户多了一种选择。

SQL / ODBC

现在可以使用 支持 ODBC 的第三方工具来连接 elasticsearch 了!我想可以找时间试试用 tableau 直连 elasticsearch会是啥效果。

Java 11

Java11 是一个 LTS 版本,相信会有越来越多的用户升级到 java11

G1GC支持

经过无数的测试,Elasticsearch官方宣布了在 JDK 10+ 上支持 G1GC。G1GC 相比 CMS有诸多优势,如今可以放心地使用G1GC了。(期待对ZGC的支持!)

Authorization realm

X-Pack Security中的新特性,可以对用户认证和用户授权分别配置 realm,比如使用内置的用户体系来认证,再去ldap中获取用户的角色、权限等信息。这也是白金版新增的特性。

机器学习的新特性

  • 支持在同一个机器学习任务中分析多个时间系列
  • 为机器学习任务添加了新的多分桶(multi-bucket) 分析

Kibana

Canvas

Canvas ! 我在做数据分析师的同学看到之后说太酷了,像 PPT。

点击侧边栏的 canvas 标签,可以看到我们先前导入的样本数据也包含了 canvas 样例:

WX20181118-210126@2x

在 11月的 深圳开发者大会上,上海普翔 也用 canvas 对填写调查问卷的参会人员做了分析:

UNADJUSTEDNONRAW_thumb_1adc

https://github.com/alexfrancoeur/kibana_canvas_examples 这里有很多非常不错的 canvas 样例供大家学习,把json文件直接拖到 canvas 页面就可以导入学习了!

Spaces

把 kibana 对象(比如 visualizations、dashboards)组织到独立的 space 里,并且通过 RBAC 来控制哪些用户可以访问哪些 space。这实在是太棒了,想象在一个企业里,多个部门通过kibana查询、分析数据,大家关注的dashboard肯定是不一样的,在6.5之前,我们只能通过社区插件来实现这样的需求,而大版本的升级可能直接导致插件不可用,有了 Space,我们不必再担心!

image-20181118212404768

Rollups UI

Rollup 是 es6.4 中新增的一个特性,用来把一些历史数据压缩归档,用作以后的分析。6.5.0 中 kibana 增加了一个界面用来查看和管理 Rollup 任务。

image9

Data visualizer for files

通过可视化的方式查看文件的结构,查看其中出现最频繁的内容:

highlights_6_5_viz-logs

Beats

Beats Central Management

Beats 终于也支持中心化配置管理了!我们只需按照往常一样安装filebeat、metricbeat,然后使用 filebeat enroll <kibana-url> <token>,便可以通过kibana来管理beats的配置、甚至给他们打上tag:

Image from iOS

想一想,假如我们在上千台机器上部署filebeat,如果哪天需要批量变更配置文件,只需要通过脚本调用配置管理的API就可以了

Functionbeat

Functionbeat是一种新的beat类型,可以被部署为一个方法,而不需要跑在服务器环境上,比如 AWS Lambda function。

以上就是 6.5.0 版本的主要特性,更详细的内容可以查看 https://www.elastic.co/blog/elastic-stack-6-5-0-released ,希望通过我的介绍,可以让大家了解到新版本所带来的激动人心的特性。

Image from iOS

你看懂 Elasticsearch Log 中的 GC 日志了吗?

Elasticsearchrockybean 发表了文章 • 0 个评论 • 858 次浏览 • 2018-09-22 23:29 • 来自相关话题

如果你关注过 elasticsearch 的日志,可能会看到如下类似的内容:

[2018-06-30T17:57:23,848][WARN ][o.e.m.j.JvmGcMonitorService] [qoo--eS] [gc][228384] overhead, spent [2.2s] collecting in the last [2.3s]

[2018-06-30T17:57:29,020][INFO ][o.e.m.j.JvmGcMonitorService] [qoo--eS] [gc][old][228385][160772] duration [5s], collections [1]/[5.1s], total [5s]/[4.4d], memory [945.4mb]->[958.5mb]/[1007.3mb], all_pools {[young] [87.8mb]->[100.9mb]/[133.1mb]}{[survivor] [0b]->[0b]/[16.6mb]}{[old] [857.6mb]->[857.6mb]/[857.6mb]}

看到其中的[gc]关键词你也猜到了这是与 GC 相关的日志,那么你了解每一部分的含义吗?如果不了解,你可以继续往下看了。

我们先从最简单的看起:

  1. 第一部分是日志发生的时间
  2. 第二部分是日志级别,这里分别是WARNINFO
  3. 第三部分是输出日志的类,我们后面也会讲到这个类
  4. 第四部分是当前 ES 节点名称
  5. 第五部分是 gc 关键词,我们就从这个关键词聊起。

友情提示:对 GC 已经了如指掌的同学,可以直接翻到最后看答案。

1. 什么是 GC?

GC,全称是 Garbage Collection (垃圾收集)或者 Garbage Collector(垃圾收集器)。

在使用 C语言编程的时候,我们要手动的通过 mallocfree来申请和释放数据需要的内存,如果忘记释放内存,就会发生内存泄露的情况,即无用的数据占用了宝贵的内存资源。而Java 语言编程不需要显示的申请和释放内存,因为 JVM 可以自动管理内存,这其中最重要的一部分就是 GC,即 JVM 可以自主地去释放无用数据(垃圾)占用的内存。

我们研究 GC 的主要原因是 GC 的过程会有 Stop The World(STW)的情况发生,即此时用户线程会停止工作,如果 STW 的时间过长,则应用的可用性、实时性等就下降的很厉害。

GC主要解决如下3个问题:

  1. 如何找到垃圾?
  2. 如何回收垃圾?
  3. 何时回收垃圾?

我们一个个来看下。

1.1 如何找到垃圾?

所谓垃圾,指的是不再被使用(引用)的对象。Java 的对象都是在堆(Heap)上创建的,我们这里默认也只讨论堆。那么现在问题就变为如何判定一个对象是否还有被引用,思路主要有如下两种:

  1. 引用计数法,即在对象被引用时加1,去除引用时减1,如果引用值为0,即表明该对象可回收了。
  2. 可达性分析法,即通过遍历已知的存活对象(GC Roots)的引用链来标记出所有存活对象

方法1简单粗暴效率高,但准确度不行,尤其是面对互相引用的垃圾对象时无能为力。

方法2是目前常用的方法,这里有一个关键是 GC Roots,它是判定的源头,感兴趣的同学可以自己去研究下,这里就不展开讲了。

1.2 如何回收垃圾?

垃圾找到了,该怎么回收呢?看起来似乎是个很傻的问题。直接收起来扔掉不就好了?!对应到程序的操作,就是直接将这些对象占用的空间标记为空闲不就好了吗?那我们就来看一下这个基础的回收算法:标记-清除(Mark-Sweep)算法。

1.2.1 标记-清除 算法(Mark Sweep)

该算法很简单,使用通过可达性分析分析方法标记出垃圾,然后直接回收掉垃圾区域。它的一个显著问题是一段时间后,内存会出现大量碎片,导致虽然碎片总和很大,但无法满足一个大对象的内存申请,从而导致 OOM,而过多的内存碎片(需要类似链表的数据结构维护),也会导致标记和清除的操作成本高,效率低下,如下图所示:

1.2.2 复制算法(Copying)

为了解决上面算法的效率问题,有人提出了复制算法。它将可用内存一分为二,每次只用一块,当这一块内存不够用时,便触发 GC,将当前存活对象复制(Copy)到另一块上,以此往复。这种算法高效的原因在于分配内存时只需要将指针后移,不需要维护链表等。但它最大的问题是对内存的浪费,使用率只有 50%。

但这种算法在一种情况下会很高效:Java 对象的存活时间极短。据 IBM 研究,Java 对象高达 98% 是朝生夕死的,这也意味着每次 GC 可以回收大部分的内存,需要复制的数据量也很小,这样它的执行效率就会很高。

1.2.3 标记-整理算法(Mark Compact)

该算法解决了第1中算法的内存碎片问题,它会在回收阶段将所有内存做整理,如下图所示:

但它的问题也在于增加了整理阶段,也就增加了 GC 的时间。

1.2.4 分代收集算法(Generation Collection)

既然大部分 Java 对象是朝生夕死的,那么我们将内存按照 Java 生存时间分为 新生代(Young)老年代(Old),前者存放短命僧,后者存放长寿佛,当然长寿佛也是由短命僧升级上来的。然后针对两者可以采用不同的回收算法,比如对于新生代采用复制算法会比较高效,而对老年代可以采用标记-清除或者标记-整理算法。这种算法也是最常用的。JVM Heap 分代后的划分一般如下所示,新生代一般会分为 Eden、Survivor0、Survivor1区,便于使用复制算法。

将内存分代后的 GC 过程一般类似下图所示:

  1. 对象一般都是先在 Eden区创建
  2. Eden区满,触发 Young GC,此时将 Eden中还存活的对象复制到 S0中,并清空 Eden区后继续为新的对象分配内存
  3. Eden区再次满后,触发又一次的 Young GC,此时会将 EdenS0中存活的对象复制到 S1中,然后清空EdenS0后继续为新的对象分配内存
  4. 每经过一次 Young GC,存活下来的对象都会将自己存活次数加1,当达到一定次数后,会随着一次 Young GC 晋升到 Old
  5. Old区也会在合适的时机进行自己的 GC

1.2.5 常见的垃圾收集器

前面我们讲了众多的垃圾收集算法,那么其具体的实现就是垃圾收集器,也是我们实际使用中会具体用到的。现代的垃圾收集机制基本都是分代收集算法,而 YoungOld区分别有不同的垃圾收集器,简单总结如下图:

从上图我们可以看到 YoungOld区有不同的垃圾收集器,实际使用时会搭配使用,也就是上图中两两连线的收集器是可以搭配使用的。这些垃圾收集器按照运行原理大概可以分为如下几类:

  • Serial GC串行,单线程的收集器,运行 GC 时需要停止所有的用户线程,且只有一个 GC 线程
  • Parallel GC并行,多线程的收集器,是 Serial 的多线程版,运行时也需要停止所有用户线程,但同时运行多个 GC 线程,所以效率高一些
  • Concurrent GC并发,多线程收集器,GC 分多阶段执行,部分阶段允许用户线程与 GC 线程同时运行,这也就是并发的意思,大家要和并行做一个区分。
  • 其他

我们下面简单看一下他们的运行机制。

1.2.5.1 Serial GC

该类 Young区的为 Serial GCOld区的为Serial Old GC。执行大致如下所示:

1.2.5.2 Parallel GC

该类Young 区的有 ParNewParallel ScavengeOld 区的有Parallel Old。其运行机制如下,相比 Serial GC ,其最大特点在于 GC 线程是并行的,效率高很多:

1.2.5.3 Concurrent Mark-Sweep GC

该类目前只是针对 Old 区,最常见就是CMS GC,它的执行分为多个阶段,只有部分阶段需要停止用户进程,这里不详细介绍了,感兴趣可以去找相关文章来看,大体执行如下:

1.2.5.4 其他

目前最新的 GC 有G1GCZGC,其运行机制与上述均不相同,虽然他们也是分代收集算法,但会把 Heap 分成多个 region 来做处理,这里不展开讲,感兴趣的可以参看最后参考资料的内容。

1.2.6 Elasticsearch 的 GC 组合

Elasticsearch 默认的 GC 配置是CMS GC ,其 Young 区ParNewOld 区CMS,大家可以在 config/jvm.options中看到如下的配置:

## GC configuration
-XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly

1.3 何时进行回收?

现在我们已经知道如何找到和回收垃圾了,那么什么时候回收呢?简单总结如下:

  1. Young 区的GC 都是在 Eden 区满时触发
  2. Serial Old 和 Parallel Old 在 Old 区是在 Young GC 时预测Old 区是否可以为 young 区 promote 到 old 区 的 object 分配空间,如果不可用则触发 Old GC。这个也可以理解为是 Old区满时。
  3. CMS GC 是在 Old 区大小超过一定比例后触发,而不是 Old 区满。这个原因在于 CMS GC 是并发的算法,也就是说在 GC 线程收集垃圾的时候,用户线程也在运行,因此需要预留一些 Heap 空间给用户线程使用,防止由于无法分配空间而导致 Full GC 发生。

2. GC Log 如何阅读?

前面讲了这么多,终于可以回到开篇的问题了,我们直接来看答案

[2018-06-30T17:57:23,848][WARN ][o.e.m.j.JvmGcMonitorService] [qoo--eS] [gc][228384] overhead, spent [2.2s] collecting in the last [2.3s]

[gc][这是第228384次GC 检查] 在最近 2.3 s 内花了 2.2s 用来做垃圾收集,这占比似乎有些过了,请抓紧来关注下。

[2018-06-30T17:57:29,020][INFO ][o.e.m.j.JvmGcMonitorService] [qoo--eS] [gc][old][228385][160772] duration [5s], collections [1]/[5.1s], total [5s]/[4.4d], memory [945.4mb]->[958.5mb]/[1007.3mb], all_pools {[young] [87.8mb]->[100.9mb]/[133.1mb]}{[survivor] [0b]->[0b]/[16.6mb]}{[old] [857.6mb]->[857.6mb]/[857.6mb]}

我们直接来看具体的含义好了,相信有了前面的 GC 基础知识,大家在看这里解释的时候就非常清楚了。

  • [gc][本次是 old GC][这是第228385次 GC 检查][从 JVM 启动至今发生的第 160772次 GC]

  • duration [本次检查到的 GC 总耗时 5 秒,可能是多次的加和],

  • collections [从上次检查至今总共发生1次 GC]/[从上次检查至今已过去 5.1 秒],

  • total [本次检查到的 GC 总耗时为 5 秒]/[从 JVM 启动至今发生的 GC 总耗时为 4.4 天],

  • memory [ GC 前 Heap memory 空间]->[GC 后 Heap memory 空间]/[Heap memory 总空间],

  • all_pools(分代部分的详情) {[young 区][GC 前 Memory ]->[GC后 Memory]/[young区 Memory 总大小] } {[survivor 区][GC 前 Memory ]->[GC后 Memory]/[survivor区 Memory 总大小] }{[old 区][GC 前 Memory ]->[GC后 Memory]/[old区 Memory 总大小] }

3. 看看源码

从日志中我们可以看到输出这些日志的类名叫做JvmGcMonitorService,我们去源码中搜索很快会找到它/Users/rockybean/code/elasticsearch/core/src/main/java/org/elasticsearch/monitor/jvm/JvmGcMonitorService.java,这里就不详细展开讲解源码了,它执行的内容大概如下图所示:

关于打印日志的格式在源码也有,如下所示:

private static final String SLOW_GC_LOG_MESSAGE =
"[gc][{}][{}][{}] duration [{}], collections [{}]/[{}], total [{}]/[{}], memory [{}]->[{}]/[{}], all_pools {}";
private static final String OVERHEAD_LOG_MESSAGE = "[gc][{}] overhead, spent [{}] collecting in the last [{}]";

另外细心的同学会发现输出的日志中 gc 只分了 young 和 old ,原因在于 ES 对 GC Name 做了封装,封装的类为:org.elasticsearch.monitor.jvm.GCNames,相关代码如下:

    public static String getByMemoryPoolName(String poolName, String defaultName) {
        if ("Eden Space".equals(poolName) || "PS Eden Space".equals(poolName) || "Par Eden Space".equals(poolName) || "G1 Eden Space".equals(poolName)) {
            return YOUNG;
        }
        if ("Survivor Space".equals(poolName) || "PS Survivor Space".equals(poolName) || "Par Survivor Space".equals(poolName) || "G1 Survivor Space".equals(poolName)) {
            return SURVIVOR;
        }
        if ("Tenured Gen".equals(poolName) || "PS Old Gen".equals(poolName) || "CMS Old Gen".equals(poolName) || "G1 Old Gen".equals(poolName)) {
            return OLD;
        }
        return defaultName;
    }

    public static String getByGcName(String gcName, String defaultName) {
        if ("Copy".equals(gcName) || "PS Scavenge".equals(gcName) || "ParNew".equals(gcName) || "G1 Young Generation".equals(gcName)) {
            return YOUNG;
        }
        if ("MarkSweepCompact".equals(gcName) || "PS MarkSweep".equals(gcName) || "ConcurrentMarkSweep".equals(gcName) || "G1 Old Generation".equals(gcName)) {
            return OLD;
        }
        return defaultName;
    }

在上面的代码中,你会看到很多我们在上一节中提到的 GC 算法的名称。

至此,源码相关部分也讲解完毕,感兴趣的大家可以自行去查阅。

4. 总结

讲解 GC 的文章已经很多,本文又唠唠叨叨地讲一遍基础知识,是希望对于第一次了解 GC 的同学有所帮助。因为只有了解了这些基础知识,你才不至于被这些 GC 的输出吓懵。希望本文对你理解 ES 的 GC 日志 有所帮助。

5. 参考资料

  1. Java Hotspot G1 GC的一些关键技术(https://mp.weixin.qq.com/s/4ufdCXCwO56WAJnzng_-ow
  2. Understanding Java Garbage Collection(https://www.cubrid.org/blog/understanding-java-garbage-collection
  3. 《深入理解Java虚拟机:JVM高级特性与最佳实践》

6. 相关推荐

如果你想深入的了解 JAVA GC 的知识,可以关注 ElasticTalk 公众号,回复 GC关键词后即可获取作者推荐的电子书等资料。

elasticTalk,qrcode

通过 metadata 使logstash配置更简洁

LogstashLeon J 发表了文章 • 0 个评论 • 738 次浏览 • 2018-09-04 13:17 • 来自相关话题

从Logstash 1.5开始,我们可以在logstash配置中使用metadata。metadata不会在output中被序列化输出,这样我们便可以在metadata中添加一些临时的中间数据,而不需要去删除它。

我们可以通过以下方式来访问metadata:

[@metadata][foo]

用例

假设我们有这样一条日志:

[2017-04-01 22:21:21] production.INFO: this is a test log message by leon

我们可以在filter中使用grok来做解析:

grok {
      match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{DATA:env}\.%{DATA:log_level}: %{DATA:content}" }
    }

解析的结果为

{
      "env" => "production",
      "timestamp" => "2017-04-01 22:21:21",
      "log_level" => "INFO",
      "content" => "{\"message\":\"[2017-04-01 22:21:21] production.INFO: this is a test log message by leon\"}"
}

假设我们希望

  1. 能把log_level为INFO的日志丢弃掉,但又不想让该字段出现在最终的输出中
  2. 输出的索引名中能体现出env,但也不想让该字段出现在输出结果里

对于1,一种方案是在输出之前通过mutate插件把不需要的字段删除掉,但是一旦这样的处理多了,会让配置文件变得“不干净”。

通过 metadata,我们可以轻松地处理这些问题:

grok {
    match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{DATA:[@metadata][env]}\.%{DATA:[@metadata][log_level]}: %{DATA:content}" }
}

if [@metadata][log_level] == "INFO"{
    drop{}    
}

output{
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "%{[@metadata][env]}-log-%{+YYYY.MM}"
        document_type => "_doc"
    }
}

除了简化我们的配置文件、减少冗余字段意外,同时也能提高logstash的处理速度。

Elasticsearch input插件

有些插件会用到metadata这个特性,比如elasticsearch input插件:

input {
  elasticsearch {
    host => "127.0.0.1"
    # 把 ES document metadata (_index, _type, _id) 包存到 @metadata 中
    docinfo_in_metadata => true
  }
}

filter{
    ......
}

output {
  elasticsearch {
    document_id => "%{[@metadata][_id]}"
    index => "transformed-%{[@metadata][_index]}"
    type => "%{[@metadata][_type]}"
  }
}

调试

一般来说metadata是不会出现在输出中的,除非使用 rubydebug codec 的方式输出:

output { 
  stdout { 
    codec  => rubydebug {
      metadata => true
    }
  }
}

日志经过处理后输出中会包含:

{
    ....,
    "@metadata" => {
        "env" => "production",
        "log_level" => "INFO"
    }
}

总结

由上可见,metadata提供了一种简单、方便的方式来保存中间数据。这样一方面减少了logstash配置文件的复杂性:避免调用remove_field,另一方面也减少了输出中的一些不必要的数据。通过这篇对metadata的介绍,希望能对大家有所帮助。

elasticTalk,qrcode

Curator从入门到实战

ElasticsearchLeon J 发表了文章 • 2 个评论 • 1722 次浏览 • 2018-08-30 22:05 • 来自相关话题

Curator 是elasticsearch 官方的一个索引管理工具,可以通过配置文件的方式帮助我们对指定的一批索引进行创建/删除、打开/关闭、快照/恢复等管理操作。

场景

比如,出于读写性能的考虑,我们通常会把基于时间的数据按时间来创建索引。

indices当数据量到达一定量级时,为了节省内存或者磁盘空间,我们往往会根据实际情况选择关闭或者删除一定时间之前的索引。通常我们会写一段脚本调用elasticsearch的api,放到crontab中定期执行。这样虽然可以达到目的,但是脚本多了之后会变得难以维护。

Curator是如何解决这类问题的呢?我们一步一步来:

安装

首先,Curator是基于python实现的,我们可以直接通过pip来安装,这种方式最简单。

pip install elasticsearch-curator

基本配置

接下来,需要为 Curator 配置es连接:

# ~/.curator/curator.yml

client:
  hosts:
    - 127.0.0.1
  port: 9200

logging:
  loglevel: INFO

其中hosts 允许配置多个地址,但是只能属于同一个集群。

这边只列举了最基本的配置,官方文档中包含了更详细的配置。

动作配置

然后需要配置我们需要执行的动作,每个动作会按顺序执行:

# /etc/curator/actions/maintain_log.yml

actions:
  1:
    #创建第二天的索引
    action: create_index
    description: "create new time-based index for log-*"
    options:
      name: '<log-{now/d+1d}>'
  2:
    #删除3天前的索引
    action: delete_indices
    description: "delete outdated indices for log-*"
    filters:
    - filtertype: pattern
      kind: prefix
      value: log
    - filtertype: age
      source: name
      direction: older
      timestring: '%Y.%m.%d'
      unit: days
      unit_count: 3

action 定义了需要执行的动作,curator支持十多种动作,可以在官方文档查看完整的动作列表。

options 定义了执行动作所需的参数,不同动作的参数也不尽相同,具体文档中都有写明。

filters 定义了动作的执行对象,通过设置filter,可以过滤出我们需要操作的索引。同一个action下的filter之间是的关系。比如在上面的定义中,delete_indices下定义了两个filters:

  • 模式匹配:匹配前缀为log的索引
  • “年龄”匹配:根据索引名中“%Y.%m.%d”时间格式,过滤出3天以前的索引

curator支持十多种filter,可以在官方文档查看完整列表。

执行

最后,我们通过curator命令行工具来执行:

curator --config /etc/curator/curator.yml /etc/curator/actions/maintain_log.yml

得到命令行输出:

2018-08-30 12:31:26,829 INFO      Preparing Action ID: 1, "create_index"
2018-08-30 12:31:26,841 INFO      Trying Action ID: 1, "create_index": create new time-based index for log-*
2018-08-30 12:31:26,841 INFO      "<log-{now/d+1d}>" is using Elasticsearch date math.
2018-08-30 12:31:26,841 INFO      Creating index "<log-{now/d+1d}>" with settings: {}
2018-08-30 12:31:27,049 INFO      Action ID: 1, "create_index" completed.
2018-08-30 12:31:27,050 INFO      Preparing Action ID: 2, "delete_indices"
2018-08-30 12:31:27,058 INFO      Trying Action ID: 2, "delete_indices": delete outdated indices for log-*
2018-08-30 12:31:27,119 INFO      Deleting selected indices: ['log-2018.08.24', 'log-2018.08.25', 'log-2018.08.27', 'log-2018.08.26', 'log-2018.08.23']
2018-08-30 12:31:27,119 INFO      ---deleting index log-2018.08.24
2018-08-30 12:31:27,120 INFO      ---deleting index log-2018.08.25
2018-08-30 12:31:27,120 INFO      ---deleting index log-2018.08.27
2018-08-30 12:31:27,120 INFO      ---deleting index log-2018.08.26
2018-08-30 12:31:27,120 INFO      ---deleting index log-2018.08.23
2018-08-30 12:31:27,282 INFO      Action ID: 2, "delete_indices" completed.
2018-08-30 12:31:27,283 INFO      Job completed.

从日志中可以看到,我们已经成功创建了隔天的索引,并删除了28号以前的索引。

定时执行

配置好curator后,还需要配置定时任务

使用crontab -e编辑crontab,

添加一行:

0 23 * * * /usr/local/bin/curator --config /root/.curator/curator.yml /etc/curator/actions/maintain_log.yml >> /var/curator.log 2>&1

crontab配置中的第一段是执行的周期,6个值分别是“分 时 日 月 周”,*表示全部。所以这段配置的含义是在每天23点执行我们的这段脚本。

单个执行

除了定时任务,我们也可以在不依赖action配置文件的情况下用curator执行一些临时的批量操作。curator提供了curator_cli的命令来执行单个action,比如我们想对所有log开头的索引做快照,使用一条命令即可完成:

curator_cli snapshot --repository repo_name --filter_list {"filtertype": "pattern","kind": "prefix", "value": "log"}

是不是特别方便?

执行流程

image-20180830200126973

在命令执行过程中,Curator 会进行以下几步操作:

  1. 从ES拉取所有的索引信息
  2. 根据设置的过滤条件过滤出需要操作的索引
  3. 对过滤后的索引执行指定的动作

复杂需求

实际生产中,会有一些更复杂的需求,简单的action和filter组合并不能满足我们的业务。Curator还提供了python包,方便我们自己写脚本时调用它提供的actions和filters,减少我们的开发工作量。

以上通过一个实际的场景向大家介绍了Curator的使用方式,但是只用到了它一小部分的功能。大家可以通过文中的链接查看官方文档,发掘出更多的使用姿势。希望对大家有所帮助!

elasticTalk,qrcode

听说你还没掌握 Normalizer 的使用方法?

Elasticsearchrockybean 发表了文章 • 1 个评论 • 413 次浏览 • 2018-08-28 12:43 • 来自相关话题

在 Elasticsearch 中处理字符串类型的数据时,如果我们想把整个字符串作为一个完整的 term 存储,我们通常会将其类型 type 设定为 keyword。但有时这种设定又会给我们带来麻烦,比如同一个数据再写入时由于没有做好清洗,导致大小写不一致,比如 appleApple两个实际都是 apple,但当我们去搜索 apple时却无法返回 Apple的文档。要解决这个问题,就需要 Normalizer出场了。废话不多说,直接上手看!

1. 上手

我们先来重现一下开篇的问题

PUT test_normalizer
{
  "mappings": {
    "doc":{
      "properties": {
        "type":{
          "type":"keyword"
        }
      }
    }
  }
}

PUT test_normalizer/doc/1
{
  "type":"apple"
}

PUT test_normalizer/doc/2
{
  "type":"Apple"
}

# 查询一 
GET test_normalizer/_search
{
  "query": {
    "match":{
      "type":"apple"
    }
  }
}

# 查询二
GET test_normalizer/_search
{
  "query": {
    "match":{
      "type":"aPple"
    }
  }
}

大家执行后会发现 查询一返回了文档1,而 查询二没有文档返回,原因如下图所示:

  1. Docs写入 Elasticsearch时由于 typekeyword,分词结果为原始字符串
  2. 查询 Query 时分词默认是采用和字段写时相同的配置,因此这里也是 keyword,因此分词结果也是原始字符
  3. 两边的分词进行匹对,便得出了我们上面的结果

2. Normalizer

normalizerkeyword的一个属性,可以对 keyword生成的单一 Term再做进一步的处理,比如 lowercase,即做小写变换。使用方法和自定义分词器有些类似,需要自定义,如下所示:

DELETE test_normalizer
# 自定义 normalizer
PUT test_normalizer
{
  "settings": {
    "analysis": {
      "normalizer": {
        "lowercase": {
          "type": "custom",
          "filter": [
            "lowercase"
          ]
        }
      }
    }
  },
  "mappings": {
    "doc": {
      "properties": {
        "type": {
          "type": "keyword"
        },
        "type_normalizer": {
          "type": "keyword",
          "normalizer": "lowercase"
        }
      }
    }
  }
}

PUT test_normalizer/doc/1
{
  "type": "apple",
  "type_normalizer": "apple"
}

PUT test_normalizer/doc/2
{
  "type": "Apple",
  "type_normalizer": "Apple"
}
# 查询三
GET test_normalizer/_search
{
  "query": {
    "term":{
      "type":"aPple"
    }
  }
}

# 查询四
GET test_normalizer/_search
{
  "query": {
    "term":{
      "type_normalizer":"aPple"
    }
  }
}

我们第一步是自定义了名为 lowercase的 normalizer,其中filter 类似自定义分词器中的 filter ,但是可用的种类很少,详情大家可以查看官方文档。然后通过 normalizer属性设定到字段type_normalizer中,然后插入相同的2条文档。执行发现,查询三无结果返回,查询四返回2条文档。

问题解决了!我们来看下是如何解决的

  1. 文档写入时由于加入了 normalizer,所有的 term都会被做小写处理
  2. 查询时搜索词同样采用有 normalizer的配置,因此处理后的 term也是小写的
  3. 两边分词匹对,就得到了我们上面的结果

3. 总结

本文通过一个实例来给大家讲解了 Normalizer的实际使用场景,希望对大家有所帮助!

掌握 analyze API,一举搞定 Elasticsearch 分词难题

Elasticsearchrockybean 发表了文章 • 1 个评论 • 1332 次浏览 • 2018-08-25 22:44 • 来自相关话题

初次接触 Elasticsearch 的同学经常会遇到分词相关的难题,比如如下这些场景:

  1. 为什么明明有包含搜索关键词的文档,但结果里面就没有相关文档呢?
  2. 我存进去的文档到底被分成哪些词(term)了?
  3. 我得自定义分词规则,但感觉好麻烦呢,无从下手

如果你遇到过类似的问题,希望本文可以解决你的疑惑。

1. 上手

让我们从一个实例出发,如下创建一个文档:

PUT test/doc/1
{
  "msg":"Eating an apple a day keeps doctor away"
}

然后我们做一个查询,我们试图通过搜索 eat这个关键词来搜索这个文档

POST test/_search
{
  "query":{
    "match":{
      "msg":"eat"
    }
  }
}

ES的返回结果为0。这不太对啊,我们用最基本的字符串查找也应该能匹配到上面新建的文档才对啊!

各位不要急,我们先来看看什么是分词。

2. 分词

搜索引擎的核心是倒排索引(这里不展开讲),而倒排索引的基础就是分词。所谓分词可以简单理解为将一个完整的句子切割为一个个单词的过程。在 es 中单词对应英文为 term。我们简单看个例子:

ES 的倒排索引即是根据分词后的单词创建,即 北京天安门这4个单词。这也意味着你在搜索的时候也只能搜索这4个单词才能命中该文档。

实际上 ES 的分词不仅仅发生在文档创建的时候,也发生在搜索的时候,如下图所示:

读时分词发生在用户查询时,ES 会即时地对用户输入的关键词进行分词,分词结果只存在内存中,当查询结束时,分词结果也会随即消失。而写时分词发生在文档写入时,ES 会对文档进行分词后,将结果存入倒排索引,该部分最终会以文件的形式存储于磁盘上,不会因查询结束或者 ES 重启而丢失。

ES 中处理分词的部分被称作分词器,英文是Analyzer,它决定了分词的规则。ES 自带了很多默认的分词器,比如StandardKeywordWhitespace等等,默认是 Standard。当我们在读时或者写时分词时可以指定要使用的分词器。

3. 写时分词结果

回到上手阶段,我们来看下写入的文档最终分词结果是什么。通过如下 api 可以查看:

POST test/_analyze
{
  "field": "msg",
  "text": "Eating an apple a day keeps doctor away"
}

其中 test为索引名,_analyze 为查看分词结果的 endpoint,请求体中 field 为要查看的字段名,text为具体值。该 api 的作用就是请告诉我在 test 索引使用 msg 字段存储一段文本时,es 会如何分词。

返回结果如下:

{
  "tokens": [
    {
      "token": "eating",
      "start_offset": 0,
      "end_offset": 6,
      "type": "<ALPHANUM>",
      "position": 0
    },
    {
      "token": "an",
      "start_offset": 7,
      "end_offset": 9,
      "type": "<ALPHANUM>",
      "position": 1
    },
    {
      "token": "apple",
      "start_offset": 10,
      "end_offset": 15,
      "type": "<ALPHANUM>",
      "position": 2
    },
    {
      "token": "a",
      "start_offset": 16,
      "end_offset": 17,
      "type": "<ALPHANUM>",
      "position": 3
    },
    {
      "token": "day",
      "start_offset": 18,
      "end_offset": 21,
      "type": "<ALPHANUM>",
      "position": 4
    },
    {
      "token": "keeps",
      "start_offset": 22,
      "end_offset": 27,
      "type": "<ALPHANUM>",
      "position": 5
    },
    {
      "token": "doctor",
      "start_offset": 28,
      "end_offset": 34,
      "type": "<ALPHANUM>",
      "position": 6
    },
    {
      "token": "away",
      "start_offset": 35,
      "end_offset": 39,
      "type": "<ALPHANUM>",
      "position": 7
    }
  ]
}

返回结果中的每一个 token即为分词后的每一个单词,我们可以看到这里是没有 eat 这个单词的,这也解释了在上手中我们搜索 eat 没有结果的情况。如果你去搜索 eating ,会有结果返回。

写时分词器需要在 mapping 中指定,而且一经指定就不能再修改,若要修改必须新建索引。如下所示我们新建一个名为ms_english 的字段,指定其分词器为 english

PUT test/_mapping/doc
{
  "properties": {
    "msg_english":{
      "type":"text",
      "analyzer": "english"
    }
  }
}

4. 读时分词结果

由于读时分词器默认与写时分词器默认保持一致,拿 上手 中的例子,你搜索 msg 字段,那么读时分词器为 Standard ,搜索 msg_english 时分词器则为 english。这种默认设定也是非常容易理解的,读写采用一致的分词器,才能尽最大可能保证分词的结果是可以匹配的。

然后 ES 允许读时分词器单独设置,如下所示:

POST test/_search
  {
    "query":{
      "match":{
        "msg":{
          "query": "eating",
          "analyzer": "english"
        }
      }
    }
  }

如上 analyzer 字段即可以自定义读时分词器,一般来讲不需要特别指定读时分词器。

如果不单独设置分词器,那么读时分词器的验证方法与写时一致;如果是自定义分词器,那么可以使用如下的 api 来自行验证结果。

POST _analyze
  {
    "text":"eating",
    "analyzer":"english"
  }

返回结果如下:

{
  "tokens": [
    {
      "token": "eat",
      "start_offset": 0,
      "end_offset": 6,
      "type": "<ALPHANUM>",
      "position": 0
    }
  ]
}

由上可知 english分词器会将 eating处理为 eat,大家可以再测试下默认的 standard分词器,它没有做任何处理。

5. 解释问题

现在我们再来看下 上手 中所遇问题的解决思路。

  1. 查看文档写时分词结果
  2. 查看查询关键词的读时分词结果
  3. 匹对两者是否有命中

我们简单分析如下:

由上图可以定位问题的原因了。

6. 解决需求

由于 eating只是 eat的一个变形,我们依然希望输入 eat时可以匹配包含 eating的文档,那么该如何解决呢?

答案很简单,既然原因是在分词结果不匹配,那么我们就换一个分词器呗~ 我们可以先试下 ES 自带的 english分词器,如下:

# 增加字段 msg_english,与 msg 做对比
PUT test/_mapping/doc
{
  "properties": {
    "msg_english":{
      "type":"text",
      "analyzer": "english"
    }
  }
}

# 写入相同文档
PUT test/doc/1
{
  "msg":"Eating an apple a day keeps doctor away",
  "msg_english":"Eating an apple a day keeps doctor away"
}

# 搜索 msg_english 字段
POST test/_search
{
  "query": {
    "match": {
      "msg_english": "eat"
    }
  }
}

执行上面的内容,我们会发现结果有内容了,原因也很简单,如下图所示:

由上图可见 english分词器会将 eating分词为 eat,此时我们搜索 eat或者 eating肯定都可以匹配对应的文档了。至此,需求解决。

7. 深入分析

最后我们来看下为什么english分词器可以解决我们遇到的问题。一个分词器由三部分组成:char filter、tokenizer 和 token filter。各部分的作用我们这里就不展开了,我们来看下 standardenglish分词器的区别。

从上图可以看出,english分词器在 Token Filter 中和 Standard不同,而发挥主要作用的就是 stemmer,感兴趣的同学可以自行去看起它的作用。

8. 自定义分词

如果我们不使用 english分词器,自定义一个分词器来实现上述需求也是完全可行的,这里不详细讲解了,只给大家讲一个快速验证自定义分词器效果的方法,如下:

POST _analyze
{
  "char_filter": [], 
  "tokenizer": "standard",
  "filter": [
    "stop",
    "lowercase",
    "stemmer"
  ],
  "text": "Eating an apple a day keeps doctor away"
}

通过上面的 api 你可以快速验证自己要定制的分词器,当达到自己需求后,再将这一部分配置加入索引的配置。

至此,我们再看开篇的三个问题,相信你已经心里有答案了,赶紧上手去自行测试下吧!

【视频】ElasticTalk#4 Elastic认证考试那些事儿

资讯动态rockybean 发表了文章 • 2 个评论 • 765 次浏览 • 2018-07-27 07:56 • 来自相关话题

Elastic 在今年6月29日推出了面向 Elasticsearch 工程师的认证考试,官方描述如下: The Elastic Certification Program was created to recognize individuals who have demonstrated a high-level of knowledge, competence and expertise with Elasticsearch. Elastic Certified Professionals demonstrate these skills by completing challenging and relevant real-world tasks on a live Elastic Stack cluster in our hands-on, performance-based certification exams. 我们此次直播便邀请中国第1位通过该认证的工程师 rockybean 来分享下认证考试的一些信息,通过这次直播,你可以了解如下信息:
  1. 如何注册考试?费用?
  2. 如何准备考试?
  3. 考试的形式是怎样的?有哪些类型的考题?
本次直播由于设备问题,声音有些卡顿,大家见谅! 视频链接如下: http://v.qq.com/x/page/f073779epxd.html  

【直播预告】ElasticTalk #4 Elastic 官方认证考试那些事儿

资料分享rockybean 发表了文章 • 0 个评论 • 477 次浏览 • 2018-07-23 08:33 • 来自相关话题

大家好,ElasticTalk 第4次直播将于本周进行,主题是关于 Elastic 官方认证考试的。   我于7月初成功通过了 Elastic Certified Engineer 的考试,拿到下面的徽章。
bin_certificate.png
  感兴趣的同学可以扫下面海报中的二维码或者搜索 elastic-talk 微信号,添加好友后进入直播群。  
phone_post.001_.jpeg
 
大家好,ElasticTalk 第4次直播将于本周进行,主题是关于 Elastic 官方认证考试的。   我于7月初成功通过了 Elastic Certified Engineer 的考试,拿到下面的徽章。
bin_certificate.png
  感兴趣的同学可以扫下面海报中的二维码或者搜索 elastic-talk 微信号,添加好友后进入直播群。  
phone_post.001_.jpeg
 

ET001 不可不掌握的 Logstash 使用技巧

Logstashrockybean 发表了文章 • 3 个评论 • 616 次浏览 • 2018-07-21 11:53 • 来自相关话题

Logstash 是 Elastic Stack 中功能最强大的 ETL 工具,相较于 beats 家族,虽然它略显臃肿,但是强在功能丰富、处理能力强大。大家在使用的过程中肯定也体验过其启动时的慢吞吞,那么有什么办法可以减少等待 Logstash 的启动时间,提高编写其处理配置文件的效率呢?本文给大家推荐一个小技巧,帮助大家解决如下两个问题,让大家更好地与这个笨重的大家伙相处。

  1. 减少 Logstash 重启的次数,也就节省宝贵的时间
  2. 方便快捷地向 Logstash 输入需要处理的内容

1. 打开 reload 配置开关

Logstash 启动的时候可以加上 -r 的参数来做到配置文件热加载,效果是:

  • 当你修改了配置文件后,无需重启 Logstash 即可让新配置文件生效。

它的含义如下:

当你写好配置文件,比如 test.conf ,启动命令如下:

bin/logstash -f test.conf -r

启动完毕,修改 test.conf 的内容并保存后,过 1 秒钟,你会发现 Logstash 端有类似如下日志输出(注意红色框标记的部分),此时说明 reload 的成功。

如果你修改的配置文件有错误,会看到报错的日志,你可以根据错误提示修改。

至此,第一个问题解决!

2. 使用 HTTP INPUT

编写配置文件的另一个痛点是需要针对不同格式的输入内容进行详细的测试,以防解析报错的情况出现。此时大家常用标准输入来解决这个问题(stdin input),但是标准输入对于文字编辑支持不太友好,而且配置文件热更新的功能也不支持标准输入。

在这里向大家推荐使用 http input 插件,配置如下:

input{
    http{
        port => 7474
        codec => "json"
    }
}

然后大家再用自己喜欢的 http 请求工具,比如 POSTMan、Insomnia 等向 http://loclahost:7474发送待测试内容即可,如下是 Insomnia 的截图。

至此,第二个问题也解决了。

3. 总结

相信看到这里,大家一定是跃跃欲试了,赶紧打开电脑,找到 Logstash,然后编辑 test.conf,输入如下内容:

input{
    http{
        port => 7474
        codec => "json"
    }
}

filter{

}

output{
        stdout{
        codec => rubydebug{
            metadata => true
        }
    }
}

然后执行启动命令:

bin/logstash -f test.conf -r

打开 Insomnia ,输入要测试的内容,点击发送,开始舒爽流畅的配置文件编写之旅吧!

ElasticTalk #3 Elasticsearch压测实战 II esrally 进阶实战

资料分享rockybean 发表了文章 • 0 个评论 • 298 次浏览 • 2018-07-20 19:51 • 来自相关话题

ElasticTalk 第3期 直播的内容是 Elasticsearch 压测实战之 esrally 进阶实战。 本次我们主要讲解了 esrally 如何自定义测试集群、自定义数据集和报告,最后还讲了三步上手 esrally 的方法。   视频地址如下: http://www.bilibili.com/video/av27117279/
ElasticTalk 第3期 直播的内容是 Elasticsearch 压测实战之 esrally 进阶实战。 本次我们主要讲解了 esrally 如何自定义测试集群、自定义数据集和报告,最后还讲了三步上手 esrally 的方法。   视频地址如下: http://www.bilibili.com/video/av27117279/

ElasticTalk #2 Elasticsearch压测实战 I esrally 入门与实战

资料分享rockybean 发表了文章 • 0 个评论 • 323 次浏览 • 2018-07-20 19:49 • 来自相关话题

  ElasticTalk 第2期 直播的内容是 Elasticsearch 压测实战之 esrally 入门和实战。希望这次直播可以帮助大家快速掌握 esrally 这款优秀的 es 压测工具。   视频地址如下: https://www.bilibili.com/video/av27114309/
  ElasticTalk 第2期 直播的内容是 Elasticsearch 压测实战之 esrally 入门和实战。希望这次直播可以帮助大家快速掌握 esrally 这款优秀的 es 压测工具。   视频地址如下: https://www.bilibili.com/video/av27114309/

ElasticTalk #1 用 ElasticStack 快速收集和分析 Nginx 日志

资料分享rockybean 发表了文章 • 2 个评论 • 358 次浏览 • 2018-07-20 19:45 • 来自相关话题

  去年做了3期 ElasticTalk 的直播节目,预计下周开始恢复。现在放出相关的视频内容,希望对大家有所帮助。   第1期的课程内容为用 ElasticStack 快速收集分析 Nginx 日志,其中详细讲解了如何使用 filebeat 的 module 功能。     视频地址如下: https://www.bilibili.com/video/av27123368/
  去年做了3期 ElasticTalk 的直播节目,预计下周开始恢复。现在放出相关的视频内容,希望对大家有所帮助。   第1期的课程内容为用 ElasticStack 快速收集分析 Nginx 日志,其中详细讲解了如何使用 filebeat 的 module 功能。     视频地址如下: https://www.bilibili.com/video/av27123368/

Elasticsearch snapshot 备份的使用方法

Elasticsearchrockybean 发表了文章 • 1 个评论 • 1479 次浏览 • 2018-05-31 23:23 • 来自相关话题

常见的数据库都会提供备份的机制,以解决在数据库无法使用的情况下,可以开启新的实例,然后通过备份来恢复数据减少损失。虽然 Elasticsearch 有良好的容灾性,但由于以下原因,其依然需要备份机制。

  1. 数据灾备。在整个集群无法正常工作时,可以及时从备份中恢复数据。
  2. 归档数据。随着数据的积累,比如日志类的数据,集群的存储压力会越来越大,不管是内存还是磁盘都要承担数据增多带来的压力,此时我们往往会选择只保留最近一段时间的数据,比如1个月,而将1个月之前的数据删除。如果你不想删除这些数据,以备后续有查看的需求,那么你就可以将这些数据以备份的形式归档。
  3. 迁移数据。当你需要将数据从一个集群迁移到另一个集群时,也可以用备份的方式来实现。

Elasticsearch 做备份有两种方式,一是将数据导出成文本文件,比如通过 elasticdumpesm 等工具将存储在 Elasticsearch 中的数据导出到文件中。二是以备份 elasticsearch data 目录中文件的形式来做快照,也就是 Elasticsearch 中 snapshot 接口实现的功能。第一种方式相对简单,在数据量小的时候比较实用,当应对大数据量场景效率就大打折扣。我们今天就着重讲解下第二种备份的方式,即 snapshot api 的使用。

备份要解决备份到哪里、如何备份、何时备份和如何恢复的问题,那么我们接下来一个个解决。

1. 备份到哪里

在 Elasticsearch 中通过 repository 定义备份存储类型和位置,存储类型有共享文件系统、AWS 的 S3存储、HDFS、微软 Azure的存储、Google Cloud 的存储等,当然你也可以自己写代码实现国内阿里云的存储。我们这里以最简单的共享文件系统为例,你也可以在本地做实验。

首先,你要在 elasticsearch.yml 的配置文件中注明可以用作备份路径 path.repo ,如下所示:

path.repo: ["/mount/backups", "/mount/longterm_backups"]

配置好后,就可以使用 snapshot api 来创建一个 repository 了,如下我们创建一个名为 my_backup 的 repository。

PUT /_snapshot/my_backup
{
  "type": "fs",
  "settings": {
    "location": "/mount/backups/my_backup"
  }
}

之后我们就可以在这个 repository 中来备份数据了。

2. 如何备份

有了 repostiroy 后,我们就可以做备份了,也叫快照,也就是记录当下数据的状态。如下所示我们创建一个名为 snapshot_1 的快照。

PUT /_snapshot/my_backup/snapshot_1?wait_for_completion=true

wait_for_completion 为 true 是指该 api 在备份执行完毕后再返回结果,否则默认是异步执行的,我们这里为了立刻看到效果,所以设置了该参数,线上执行时不用设置该参数,让其在后台异步执行即可。

执行成功后会返回如下结果,用于说明备份的情况:

{
  "snapshots": [
    {
      "snapshot": "snapshot_1",
      "uuid": "52Lr4aFuQYGjMEv5ZFeFEg",
      "version_id": 6030099,
      "version": "6.3.0",
      "indices": [
        ".monitoring-kibana-6-2018.05.30",
        ".monitoring-es-6-2018.05.28",
        ".watcher-history-7-2018.05.30",
        ".monitoring-beats-6-2018.05.29",
        "metricbeat-6.2.4-2018.05.28",
        ".monitoring-alerts-6",
        "metricbeat-6.2.4-2018.05.30"
      ],
      "include_global_state": true,
      "state": "SUCCESS",
      "start_time": "2018-05-31T12:45:57.492Z",
      "start_time_in_millis": 1527770757492,
      "end_time": "2018-05-31T12:46:15.214Z",
      "end_time_in_millis": 1527770775214,
      "duration_in_millis": 17722,
      "failures": [],
      "shards": {
        "total": 28,
        "failed": 0,
        "successful": 28
      }
    }
  ]
}

返回结果的参数意义都是比较直观的,比如 indices 指明此次备份涉及到的索引名称,由于我们没有指定需要备份的索引,这里备份了所有索引;state 指明状态;duration_in_millis 指明备份任务执行时长等。

我们可以通过 GET _snapshot/my_backup/snapshot_1获取 snapshot_1 的执行状态。

此时如果去 /mount/backups/my_backup 查看,会发现里面多了很多文件,这些文件其实都是基于 elasticsearch data 目录中的文件生成的压缩存储的备份文件。大家可以通过 du -sh . 命令看一下该目录的大小,方便后续做对比。

3. 何时备份

通过上面的步骤我们成功创建了一个备份,但随着数据的新增,我们需要对新增的数据也做备份,那么我们如何做呢?方法很简单,只要再创建一个快照 snapshot_2 就可以了。

PUT /_snapshot/my_backup/snapshot_2?wait_for_completion=true

当执行完毕后,你会发现 /mount/backups/my_backup 体积变大了。这说明新数据备份进来了。要说明的一点是,当你在同一个 repository 中做多次 snapshot 时,elasticsearch 会检查要备份的数据 segment 文件是否有变化,如果没有变化则不处理,否则只会把发生变化的 segment file 备份下来。这其实就实现了增量备份。

elasticsearch 的资深用户应该了解 force merge 功能,即可以强行将一个索引的 segment file 合并成指定数目,这里要注意的是如果你主动调用 force merge api,那么 snapshot 功能的增量备份功能就失效了,因为 api 调用完毕后,数据目录中的所有 segment file 都发生变化了。

另一个就是备份时机的问题,虽然 snapshot 不会占用太多的 cpu、磁盘和网络资源,但还是建议大家尽量在闲时做备份。

4. 如何恢复

所谓“养兵千日,用兵一时”,我们该演练下备份的成果,将其恢复出来。通过调用如下 api 即可快速实现恢复功能。

POST /_snapshot/my_backup/snapshot_1/_restore?wait_for_completion=true
{
  "indices": "index_1",
  "rename_replacement": "restored_index_1"
}

通过上面的 api,我们可以将 index_1 索引恢复到 restored_index_1 中。这个恢复过程完全是基于文件的,因此效率会比较高。

虽然我们这里演示的是在同一个集群做备份与恢复,你也可以在另一个集群上连接该 repository 做恢复。我们这里就不做说明了。

5. 其他

由于 Elasticsearch 版本更新比较快,因此大家在做备份与恢复的时候,要注意版本问题,同一个大版本之间的备份与恢复是没有问题的,比如都是 5.1 和 5.6 之间可以互相备份恢复。但你不能把一个高版本的备份在低版本恢复,比如将 6.x 的备份在 5.x 中恢复。而低版本备份在高版本恢复有一定要求:

1) 5.x 可以在 6.x 恢复

2) 2.x 可以在 5.x 恢复

3) 1.x 可以在 2.x 恢复

其他跨大版本的升级都是不可用的,比如1.x 的无法在 5.x 恢复。这里主要原因还是 Lucene 版本问题导致的,每一次 ES 的大版本升级都会伴随 Lucene 的大版本,而 Lucene 的版本是尽量保证向前兼容,即新版可以读旧版的文件,但版本跨越太多,无法实现兼容的情况也在所难免了。

6. 继续学习

本文只是简单对 snapshot 功能做了一个演示,希望这足够引起你的兴趣。如果你想进一步深入的了解该功能,比如备份的时候如何指定部分索引、如何查询备份和还原的进度、如何跨集群恢复数据、如何备份到 HDFS 等,可以详细阅读官方手册https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html,如果在使用的过程中遇到了问题,欢迎留言讨论。

Elasticsearch如何实现 SQL语句中 Group By 和 Limit 的功能

Elasticsearchrockybean 发表了文章 • 2 个评论 • 862 次浏览 • 2018-05-21 07:45 • 来自相关话题

有 SQL 背景的同学在学习 Elasticsearch 时,面对一个查询需求,不由自主地会先思考如何用 SQL 来实现,然后再去想 Elasticsearch 的 Query DSL 如何实现。那么本篇就给大家讲一条常见的 SQL 语句如何用 Elasticsearch 的查询语言实现。

1. SQL语句

假设我们有一个汽车的数据集,每个汽车都有车型、颜色等字段,我希望获取颜色种类大于1个的前2车型。假设汽车的数据模型如下:

{
    "model":"modelA",
    "color":"red"
}

假设我们有一个 cars 表,通过如下语句创建测试数据。

INSERT INTO cars (model,color) VALUES ('A','red'); 
INSERT INTO cars (model,color) VALUES ('A','white'); 
INSERT INTO cars (model,color) VALUES ('A','black'); 
INSERT INTO cars (model,color) VALUES ('A','yellow'); 
INSERT INTO cars (model,color) VALUES ('B','red'); 
INSERT INTO cars (model,color) VALUES ('B','white'); 
INSERT INTO cars (model,color) VALUES ('C','black'); 
INSERT INTO cars (model,color) VALUES ('C','red'); 
INSERT INTO cars (model,color) VALUES ('C','white'); 
INSERT INTO cars (model,color) VALUES ('C','yellow'); 
INSERT INTO cars (model,color) VALUES ('C','blue'); 
INSERT INTO cars (model,color) VALUES ('D','red');
INSERT INTO cars (model,color) VALUES ('A','red'); 

那么实现我们需求的 SQL 语句也比较简单,实现如下:

SELECT model,COUNT(DISTINCT color) color_count FROM cars GROUP BY model HAVING color_count > 1 ORDER BY color_count desc LIMIT 2;

这条查询语句中 Group By 是按照 model 做分组, Having color_count>1 限定了车型颜色种类大于1,ORDER BY color_count desc 限定结果按照颜色种类倒序排列,而 LIMIT 2 限定只返回前3条数据。

那么在 Elasticsearch 中如何实现这个需求呢?

2. 在 Elasticsearch 模拟测试数据

首先我们需要先在 elasticsearch 中插入测试的数据,这里我们使用 bulk 接口 ,如下所示:

POST _bulk
{"index":{"_index":"cars","_type":"doc","_id":"1"}}
{"model":"A","color":"red"}
{"index":{"_index":"cars","_type":"doc","_id":"2"}}
{"model":"A","color":"white"}
{"index":{"_index":"cars","_type":"doc","_id":"3"}}
{"model":"A","color":"black"}
{"index":{"_index":"cars","_type":"doc","_id":"4"}}
{"model":"A","color":"yellow"}
{"index":{"_index":"cars","_type":"doc","_id":"5"}}
{"model":"B","color":"red"}
{"index":{"_index":"cars","_type":"doc","_id":"6"}}
{"model":"B","color":"white"}
{"index":{"_index":"cars","_type":"doc","_id":"7"}}
{"model":"C","color":"black"}
{"index":{"_index":"cars","_type":"doc","_id":"8"}}
{"model":"C","color":"red"}
{"index":{"_index":"cars","_type":"doc","_id":"9"}}
{"model":"C","color":"white"}
{"index":{"_index":"cars","_type":"doc","_id":"10"}}
{"model":"C","color":"yellow"}
{"index":{"_index":"cars","_type":"doc","_id":"11"}}
{"model":"C","color":"blue"}
{"index":{"_index":"cars","_type":"doc","_id":"12"}}
{"model":"D","color":"red"}
{"index":{"_index":"cars","_type":"doc","_id":"13"}}
{"model":"A","color":"red"}

其中 index 为 cars,type 为 doc,所有数据与mysql 数据保持一致。大家可以在 Kibana 的 Dev Tools 中执行上面的命令,然后执行下面的查询语句验证数据是否已经成功存入。

GET cars/_search

3. Group By VS Terms/Metric Aggregation

SQL 中 Group By 语句在 Elasticsearch 中对应的是 Terms Aggregation,即分桶聚合,对应 Group By color 的语句如下所示:

GET cars/_search
{
  "size":0,
  "aggs":{
    "models":{
      "terms":{
        "field":"model.keyword"
      }
    }
  }
}

结果如下:

{
  "took": 161,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 13,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "models": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "A",
          "doc_count": 5
        },
        {
          "key": "C",
          "doc_count": 5
        },
        {
          "key": "B",
          "doc_count": 2
        },
        {
          "key": "D",
          "doc_count": 1
        }
      ]
    }
  }
}

我们看 aggregations 这个 key 下面的即为返回结果。

SQL 语句中还有一项是 COUNT(DISTINCT color) color_count 用于计算每个 model 的颜色数,在 Elasticsearch 中我们需要使用一个指标类聚合 Cardinality ,进行不同值计数。语句如下:

GET cars/_search
{
  "size": 0,
  "aggs": {
    "models": {
      "terms": {
        "field": "model.keyword"
      },
      "aggs": {
        "color_count": {
          "cardinality": {
            "field": "color.keyword"
          }
        }
      }
    }
  }
}

其返回结果如下:

{
  "took": 74,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 13,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "models": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "A",
          "doc_count": 5,
          "color_count": {
            "value": 4
          }
        },
        {
          "key": "C",
          "doc_count": 5,
          "color_count": {
            "value": 5
          }
        },
        {
          "key": "B",
          "doc_count": 2,
          "color_count": {
            "value": 2
          }
        },
        {
          "key": "D",
          "doc_count": 1,
          "color_count": {
            "value": 1
          }
        }
      ]
    }
  }
}

结果中 color_count 即为每个 model 的颜色数,但这里所有的模型都返回了,我们只想要颜色数大于1的模型,因此这里还要加一个过滤条件。

4. Having Condition VS Bucket Filter Aggregation

Having color_count > 1 在 Elasticsearch 中对应的是 Bucket Filter 聚合,语句如下所示:

GET cars/_search
{
  "size": 0,
  "aggs": {
    "models": {
      "terms": {
        "field": "model.keyword"
      },
      "aggs": {
        "color_count": {
          "cardinality": {
            "field": "color.keyword"
          }
        },
        "color_count_filter": {
          "bucket_selector": {
            "buckets_path": {
              "colorCount": "color_count"
            },
            "script": "params.colorCount>1"
          }
        }
      }
    }
  }
}

返回结果如下:

{
  "took": 39,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 13,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "models": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "A",
          "doc_count": 5,
          "color_count": {
            "value": 4
          }
        },
        {
          "key": "C",
          "doc_count": 5,
          "color_count": {
            "value": 5
          }
        },
        {
          "key": "B",
          "doc_count": 2,
          "color_count": {
            "value": 2
          }
        }
      ]
    }
  }
}

此时返回结果只包含颜色数大于1的模型,但大家会发现颜色数多的 C 不是在第一个位置,我们还需要做排序处理。

5. Order By Limit VS Bucket Sort Aggregation

ORDER BY color_count desc LIMIT 3 在 Elasticsearch 中可以使用 Bucket Sort 聚合实现,语句如下所示:

GET cars/_search
{
  "size": 0,
  "aggs": {
    "models": {
      "terms": {
        "field": "model.keyword"
      },
      "aggs": {
        "color_count": {
          "cardinality": {
            "field": "color.keyword"
          }
        },
        "color_count_filter": {
          "bucket_selector": {
            "buckets_path": {
              "colorCount": "color_count"
            },
            "script": "params.colorCount>1"
          }
        },
        "color_count_sort": {
          "bucket_sort": {
            "sort": {
              "color_count": "desc"
            },
            "size": 2
          }
        }
      }
    }
  }
}

返回结果如下:

{
  "took": 32,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 13,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "models": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "C",
          "doc_count": 5,
          "color_count": {
            "value": 5
          }
        },
        {
          "key": "A",
          "doc_count": 5,
          "color_count": {
            "value": 4
          }
        }
      ]
    }
  }
}

至此我们便将 SQL 语句实现的功能用 Elasticsearch 查询语句实现了。对比 SQL 语句与 Elasticsearch 的查询语句,大家会发现后者复杂了很多,但并非无章可循,随着大家对常见语法越来越熟悉,相信一定会越写越得心应手!

【最新】Elasticsearch 6.6 Index Lifecycle Management 尝鲜

资讯动态rockybean 发表了文章 • 2 个评论 • 587 次浏览 • 2019-02-06 11:32 • 来自相关话题

1月29日,Elastic Stack 迎来 6.6 版本的发布,该版本带来很多新功能,比如:

  • Index Lifecycle Management
  • Frozen Index
  • Geoshape based on Bkd Tree
  • SQL adds support for Date histograms
  • ......

在这些众多功能中,Index Lifecycle Management(索引生命周期管理,后文简称 ILM) 是最受社区欢迎的。今天我们从以下几方面来快速了解下该功能:

  1. 为什么索引会有生命?什么是索引生命周期?
  2. ILM 是如何划分索引生命周期的?
  3. ILM 是如何管理索引生命周期的?
  4. 实战

1. Index Lifecycle 索引生命周期

先来看第一个问题:

为什么索引有生命?

索引(Index)是 Elasticsearch 中数据组织的一个逻辑概念,是具有相同或相似字段的文档组合。它由众多分片(Shard)组成,比如 bookpeople都可以用作索引名称,可以简单类比为关系型数据库的表(table)。

所谓生命,即生与死;索引的便是创建删除了。

在我们日常使用 Elasticsearch 的时候,索引的创建与删除似乎是很简单的事情,用的时候便创建,不用了删除即可,有什么好管理的呢?

这就要从 Elasticsearch 的应用场景来看了。

业务搜索场景,用户会将业务数据存储在 Elasticsearch 中,比如商品数据、订单数据、用户数据等,实现快速的全文检索功能。像这类数据基本都是累加的,不会删除。一般删除的话,要么是业务升级,要么是业务歇菜了。此种场景下,基本只有生,没有死,也就不存在管理一说。

而在日志业务场景中,用户会将各种日志,如系统、防火墙、中间件、数据库、web 服务器、应用日志等全部实时地存入 Elasticsearch 中,进行即席日志查询与分析。这种类型的数据都会有时间维度,也就是时序性的数据。由于日志的数据量过大,用户一般不会存储全量的数据,一般会在 Elasticsearch 中存储热数据,比如最近7天、30天的数据等,而在7天或者30天之前的数据都会被删除。为了便于操作,用户一般会按照日期来建立索引,比如 nginx 的日志索引名可能为 nginx_log-2018.11.12nginx_log-2018.11.13等,当要查询或删除某一天的日志时,只需要针对对应日期的索引做操作就可以了。那么在该场景下,每天都会上演大量索引的生与死。

一个索引由生到死的过程,即为一个生命周期。举例如下:

  • 生:在 2019年2月5日 创建 nginx_log-2019.02.05的索引,开始处理日志数据的读写请求
  • 生:在 2019年2月6日 nginx_log-2019.02.05 索引便不再处理写请求,只处理读请求
  • 死:在 2019年3月5日 删除 nginx_log-2018.02.05的索引

其他的索引,如 nginx_log-2019.02.06 等也会经过相同的一个生命周期。

2. ILM 是如何划分索引生命周期的?

我们现在已经了解何为生命周期了,而最简单的生命周期只需要两个阶段即可。但在实际使用中生命周期是有多个阶段的,我们来看下 ILM 是如何划分生命周期的。

ILM 一共将索引生命周期分为四个阶段(Phase):

  1. Hot 阶段
  2. Warm 阶段
  3. Cold 阶段
  4. Delete 阶段

如果我们拿一个人的生命周期来做类比的话,大概如下图所示:

Index Lifecycle

Hot 阶段

Hot 阶段可类比为人类婴儿到青年的阶段,在这个阶段,它会不断地进行知识的输入与输出(数据读写),不断地长高长大(数据量增加)成有用的青年。

由于该阶段需要进行大量的数据读写,因此需要高配置的节点,一般建议将节点内存与磁盘比控制在 32 左右,比如 64GB 内存搭配 2TB 的 SSD 硬盘。

Warm 阶段

Warm 阶段可类比为人类青年到中年的阶段,在这个阶段,它基本不会再进行知识的输入(数据写入),主要进行知识输出(数据读取),为社会贡献价值。

由于该阶段主要负责数据的读取,中等配置的节点即可满足需求,可以将节点内存与磁盘比提高到 64~96 之间,比如 64GB 内存搭配 4~6TB 的 HDD 磁盘。

Cold 阶段

Cold 阶段可类别比为人类中年到老年的阶段,在这个阶段,它退休了,在社会有需要的时候才出来输出下知识(数据读取),大部分情况都是静静地待着。

由于该阶段只负责少量的数据读取工作,低等配置的节点即可满足要求,可以将节点内存与磁盘比进一步提高到 96 以上,比如128,即 64GB 内存搭配 8 TB 的 HDD 磁盘。

Delete 阶段

Delete 阶段可类比为人类寿终正寝的阶段,在发光发热之后,静静地逝去,Rest in Peace~

ILM 对于索引的生命周期进行了非常详细的划分,但它并不强制要求必须有这个4个阶段,用户可以根据自己的需求组合成自己的生命周期。

3. ILM 是如何管理索引生命周期的?

所谓生命周期的管理就是控制 4 个生命阶段转换,何时由 Hot 转为 Warm,何时由 Warm 转为 Cold,何时 Delete 等。

阶段的转换必然是需要时机的,而对于时序数据来说,时间必然是最好的维度,而 ILM 也是以时间为转换的衡量单位。比如下面这张转换的示意图,即默认是 Hot 阶段,在索引创建 3 天后转为 Warm 阶段,7 天后转为 Cold 阶段,30 天后删除。这个设置的相关字段为 min_age,后文会详细讲解。

ILM 将不同的生命周期管理策略称为 Policy,而所谓的 Policy 是由不同阶段(Phase)的不同动作(Action)组成的。

Action是一系列操作索引的动作,比如 Rollover、Shrink、Force Merge等,不同阶段可用的 Action 不同,详情如下:

  • Hot Phase
    • Rollover 滚动索引操作,可用在索引大小或者文档数达到某设定值时,创建新的索引用于数据读写,从而控制单个索引的大小。这里要注意的一点是,如果启用了 Rollover,那么所有阶段的时间不再以索引创建时间为准,而是以该索引 Rollover 的时间为准。
  • Warm Phase
    • Allocate 设定副本数、修改分片分配规则(如将分片迁移到中等配置的节点上)等
    • Read-Onlly 设定当前索引为只读状态
    • Force Merge 合并 segment 操作
    • Shrink 缩小 shard 分片数
  • Cold Phase
    • Allocate 同上
  • Delete Phase
    • Delete 删除

从上面看下来整体操作还是很简单的,Kibana 也提供了一套 UI 界面来设置这些策略,如下所示:

kibana ilm

从上图看下来 ILM 的设置是不是一目了然呢?

当然,ILM 是有自己的 api 的,比如上面图片对应的 api 请求如下:

PUT /_ilm/policy/test_ilm2
{
    "policy": {
        "phases": {
            "hot": {
                "actions": {
                    "rollover": {
                        "max_age": "30d",
                        "max_size": "50gb"
                    }
                }
            },
            "warm": {
                "min_age": "3d",
                "actions": {
                    "allocate": {
                        "require": {
                            "box_type": "warm"
                        },
                        "number_of_replicas": 0
                    },
                    "forcemerge": {
                        "max_num_segments": 1
                    },
                    "shrink": {
                        "number_of_shards": 1
                    }
                }
            },
            "cold": {
                "min_age": "7d",
                "actions": {
                    "allocate": {
                        "require": {
                            "box_type": "cold"
                        }
                    }
                }
            },
            "delete": {
                "min_age": "30d",
                "actions": {
                    "delete": {}
                }
            }
        }
    }
}

这里不展开讲了,感兴趣的同学可以自行查看官方手册。

现在管理策略(Policy)已经有了,那么如何应用到索引(Index)上面呢?

方法为设定如下的索引配置:

  • index.lifecycle.name 设定 Policy 名称,比如上面的 test_ilm2
  • index.lifecycle.rollover_alias 如果使用了 Rollover,那么还需要指定该别名

修改索引配置可以直接修改(`PUT index_name/_settings)或者通过索引模板(Index Template)来实现。

我们这里不展开讲了,大家参考下面的实战就明白了。

4. 实战

下面我们来实际演练一把!

目标

现在需要收集 nginx 日志,只需保留最近30天的日志,但要保证最近7天的日志有良好的查询性能,搜索响应时间在 100ms 以内。

为了让大家可以快速看到效果,下面实际操作的时候会将 30天7天 替换为 40秒20秒

ES 集群架构

这里我们简单介绍下这次实战所用 ES 集群的构成。该 ES 集群一共有 3个节点组成,每个节点都有名为 box_type 的属性,如下所示:

GET _cat/nodeattrs?s=attr
es01_hot  172.24.0.5 172.24.0.5 box_type          hot
es02_warm 172.24.0.4 172.24.0.4 box_type          warm
es03_cold 172.24.0.3 172.24.0.3 box_type          cold

由上可见我们有 1 个 hot 节点、1 个 warm 节点、1 个 cold 节点,分别用于对应 ILM 的阶段,即 Hot 阶段的索引都位于 hot 上,Warm 阶段的索引都位于 warm 上,Cold 阶段的索引都位于 cold 上。

创建 ILM Policy

根据要求,我们的 Policy 设定如下:

  • 索引名以 nginx_logs 为前缀,且以每10个文档做一次 Rollover
  • Rollover 后 5 秒转为 Warm 阶段
  • Rollover 后 20 秒转为 Cold 阶段
  • Rollover 后 40 秒删除

API 请求如下:

PUT /_ilm/policy/nginx_ilm_policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_docs": "10"
          }
        }
      },
      "warm": {
        "min_age": "5s",
        "actions": {
          "allocate": {
            "include": {
              "box_type": "warm"
            }
          }
        }
      },
      "cold": {
        "min_age": "20s",
        "actions": {
          "allocate": {
            "include": {
              "box_type": "cold"
            }
          }
        }
      },
      "delete": {
        "min_age": "40s",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

创建 Index Template

我们基于索引模板来创建所需的索引,如下所示:

PUT /_template/nginx_ilm_template
{
  "index_patterns": ["nginx_logs-*"],                 
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0,
    "index.lifecycle.name": "nginx_ilm_policy",      
    "index.lifecycle.rollover_alias": "nginx_logs",
    "index.routing.allocation.include.box_type": "hot"
  }
}

上述配置解释如下:

  • index.lifecycle.name 指明该索引应用的 ILM Policy
  • index.lifecycle.rollover_alias 指明在 Rollover 的时候使用的 alias
  • index.routing.allocation.include.box_type 指明新建的索引都分配在 hot 节点上

创建初始索引 Index

ILM 的第一个索引需要我们手动来创建,另外启动 Rollover 必须以数值类型结尾,比如 nginx_logs-000001。索引创建的 api 如下:

PUT nginx_logs-000001
{
  "aliases": {
    "nginx_logs": {
      "is_write_index":true
    }
  }
}

此时索引分布如下所示:

修改 ILM Polling Interval

ILM Service 会在后台轮询执行 Policy,默认间隔时间为 10 分钟,为了更快地看到效果,我们将其修改为 1 秒。

PUT _cluster/settings
{
  "persistent": {
    "indices.lifecycle.poll_interval":"1s"
  }
}

开始吧

一切准备就绪,我们开始吧!

首先执行下面的新建文档操作 10 次。

POST nginx_logs/_doc
{
  "name":"abbc"
}

之后 Rollover 执行,新的索引创建,如下所示:

5 秒后,nginx_logs-000001 转到 Warm 阶段

15 秒后(20 秒是指距离 Rollover 的时间,因为上面已经过去5秒了,所以这里只需要15秒),nginx_logs-00001转到 Cold 阶段

25 秒后,nginx_logs-00001删除

至此,一个完整的 ILM Policy 执行的流程就结束了,而后续 nginx_logs-000002 也会按照这个设定进行流转。

总结

ILM 是 Elastic 团队将多年 Elasticsearch 在日志场景领域的最佳实践进行的一次总结归纳和落地实施,极大地降低了用好 Elasticsearch 的门槛。掌握了 ILM 的核心概念,也就意味着掌握了 Elasticsearch 的最佳实践。希望本文能对大家入门 ILM 有所帮助。

在线研讨会

一篇文章所能承载的信息量和演示效果终究是有限的,2月份我们会针对 ILM 做一次线上研讨会,感兴趣的同学可以点击下面的链接注册报名。

https://jinshuju.net/f/TjBbvx

为了提高研讨会的质量,我们本次引入了审核机制,报名的同学请耐心等待,待我们审核通过后,您会收到我们的邮件邀请。

参考资料:

  1. https://www.elastic.co/guide/en/elasticsearch/reference/6.6/index-lifecycle-management.html
  2. https://www.elastic.co/blog/hot-warm-architecture-in-elasticsearch-5-x

ET007 ElasticStack 6.5 介绍

ElasticsearchLeon J 发表了文章 • 1 个评论 • 915 次浏览 • 2018-11-19 09:18 • 来自相关话题

就在 11月14日,ElasticStack 6.5.0 发布了,此次发布带来了许多激动人心的特性,我们一起来体验一下:

WX20181118-120551@2x

如果没有任何数据,kibana会提示我们导入sample数据,这边我选择Try our sample data, 然后导入全部3个样例数据,这可以让我们在没有数据的情况下快速体验新特性。

Infrastructure & Logs UI

很多用户使用 ElasticStack 收集基础架构的日志和指标,比如系统日志、安全日志、CPU指标,内存指标等等。在6.5中,kibana 侧边栏中增加了 Infrastructure 和 Logs 两个新的 tab,让用户更简单地查看自己的基础架构,和每台主机或者容器里的日志。

logs

进入logs标签页,如果当前没有数据,kibana会引导我们添加数据

WX20181118-121032@2x

我们选择 system logs

WX20181118-121047@2x

根据指示,我们安装部署好filebeat并启动,再次进入 logs 标签页便可以看到收集到的系统日志了

image-20181118185158451

  1. 搜索过滤框:在这里可以像在 discover 里一样写query string,并且会有输入提示
  2. 时间选择框:可以选择需要查看的时间点,如果点了 Stream live,会持续监听尾部新输出的日志内容,类似 linux 命令中的tail -f
  3. 日志时间轴:高亮的部位是当前查看日志所在的时间范围,对应的区域图标识了日志量

假如我想实现 tail -f /var/log/system.log | grep google.com 一样的效果,可以打开 Stream live,并在搜索过滤框中这样输入:

WX20181118-173432@2x

很简单,很方便有木有?

Infrastructure

同样在kibana的引导下安装 Metric beat,并开启system模块,启动后进入 infrastructure 标签页:

image-20181118190614385

这里可以直观地看到所有基础架构的指标状况,深色的内层代表主机,颜色代表了健康状况。浅灰色的外层代表了group,因为我只在自己的笔记本上做了部署,所以只能看到一个host。

image-20181118191527060

点击主机会弹出菜单

  • View logs : 跳转到 logs 标签页,并通过搜索过滤框指定host,只查看这台主机的日志。
  • View metrics : 跳转到这台主机的指标详情,可以查看历史数据 shoot

APM

Java 和 Go

不负众望,继 Nodejs、Python、Ruby、Javascript 之后,Elastic APM 5.6.0 新增了对 Java 和 Golang 的支持!

Distributed Tracing

在 SOA 和 MSA 大行其道的年代,如何追踪请求在各个系统之间的流动成为了apm的关键问题。

Elastic APM 支持 OpenTracing 标准,并在各个agent里内置了 OpenTracing 兼容的bridge

以下是官网上该特性的截图:

distributed_tracing

APM Server 监控

如 ElasticStack的其他产品一般,APM也支持了监控,并可以在 Kinbana Montoring下查看监控信息:

apm_monitoring

APM Server 内存占用优化

通过新的基于NDJSON的协议,agent可以在采集信息后通过事件流立即发往APM server,这样 APM Server可以一个接一个地处理接收到的事件,而不是一次性地收到一大块(chunk),这样在很大程度上减少了APM Server的内存占用。

Elasticsearch

Cross-cluster replication

这里的副本并非我们平时常见的分片副本,而是通过在集群B配置一个副本indexB来追随集群A中的indexA,indexA中发生的任何变化都会同步到indexB中来。另外也可以配置一个pattern,当集群A出现符合pattern的索引,自动在集群B创建他的副本,这听起来很酷。值得一提的是,这将是白金版里新增的一个特性。

Minimal Snapshots

snapshot 是 es 中用来创建索引副本的特性,在之前的版本中,snapshot会把完整的 index 都保存下来,包括原始数据和索引数据等等。新的 Minimal Snapshots 提供了一种只备份 _source 内容和 index metadata,当需要恢复时,需要通过 reindex 操作来完成。最小快照最多可能帮你节省50%的磁盘占用,但是会花费更多的时间来恢复。这个特性可能并不适合所有人,但给恢复窗口比较长,且磁盘容量有限的用户多了一种选择。

SQL / ODBC

现在可以使用 支持 ODBC 的第三方工具来连接 elasticsearch 了!我想可以找时间试试用 tableau 直连 elasticsearch会是啥效果。

Java 11

Java11 是一个 LTS 版本,相信会有越来越多的用户升级到 java11

G1GC支持

经过无数的测试,Elasticsearch官方宣布了在 JDK 10+ 上支持 G1GC。G1GC 相比 CMS有诸多优势,如今可以放心地使用G1GC了。(期待对ZGC的支持!)

Authorization realm

X-Pack Security中的新特性,可以对用户认证和用户授权分别配置 realm,比如使用内置的用户体系来认证,再去ldap中获取用户的角色、权限等信息。这也是白金版新增的特性。

机器学习的新特性

  • 支持在同一个机器学习任务中分析多个时间系列
  • 为机器学习任务添加了新的多分桶(multi-bucket) 分析

Kibana

Canvas

Canvas ! 我在做数据分析师的同学看到之后说太酷了,像 PPT。

点击侧边栏的 canvas 标签,可以看到我们先前导入的样本数据也包含了 canvas 样例:

WX20181118-210126@2x

在 11月的 深圳开发者大会上,上海普翔 也用 canvas 对填写调查问卷的参会人员做了分析:

UNADJUSTEDNONRAW_thumb_1adc

https://github.com/alexfrancoeur/kibana_canvas_examples 这里有很多非常不错的 canvas 样例供大家学习,把json文件直接拖到 canvas 页面就可以导入学习了!

Spaces

把 kibana 对象(比如 visualizations、dashboards)组织到独立的 space 里,并且通过 RBAC 来控制哪些用户可以访问哪些 space。这实在是太棒了,想象在一个企业里,多个部门通过kibana查询、分析数据,大家关注的dashboard肯定是不一样的,在6.5之前,我们只能通过社区插件来实现这样的需求,而大版本的升级可能直接导致插件不可用,有了 Space,我们不必再担心!

image-20181118212404768

Rollups UI

Rollup 是 es6.4 中新增的一个特性,用来把一些历史数据压缩归档,用作以后的分析。6.5.0 中 kibana 增加了一个界面用来查看和管理 Rollup 任务。

image9

Data visualizer for files

通过可视化的方式查看文件的结构,查看其中出现最频繁的内容:

highlights_6_5_viz-logs

Beats

Beats Central Management

Beats 终于也支持中心化配置管理了!我们只需按照往常一样安装filebeat、metricbeat,然后使用 filebeat enroll <kibana-url> <token>,便可以通过kibana来管理beats的配置、甚至给他们打上tag:

Image from iOS

想一想,假如我们在上千台机器上部署filebeat,如果哪天需要批量变更配置文件,只需要通过脚本调用配置管理的API就可以了

Functionbeat

Functionbeat是一种新的beat类型,可以被部署为一个方法,而不需要跑在服务器环境上,比如 AWS Lambda function。

以上就是 6.5.0 版本的主要特性,更详细的内容可以查看 https://www.elastic.co/blog/elastic-stack-6-5-0-released ,希望通过我的介绍,可以让大家了解到新版本所带来的激动人心的特性。

Image from iOS

你看懂 Elasticsearch Log 中的 GC 日志了吗?

Elasticsearchrockybean 发表了文章 • 0 个评论 • 858 次浏览 • 2018-09-22 23:29 • 来自相关话题

如果你关注过 elasticsearch 的日志,可能会看到如下类似的内容:

[2018-06-30T17:57:23,848][WARN ][o.e.m.j.JvmGcMonitorService] [qoo--eS] [gc][228384] overhead, spent [2.2s] collecting in the last [2.3s]

[2018-06-30T17:57:29,020][INFO ][o.e.m.j.JvmGcMonitorService] [qoo--eS] [gc][old][228385][160772] duration [5s], collections [1]/[5.1s], total [5s]/[4.4d], memory [945.4mb]->[958.5mb]/[1007.3mb], all_pools {[young] [87.8mb]->[100.9mb]/[133.1mb]}{[survivor] [0b]->[0b]/[16.6mb]}{[old] [857.6mb]->[857.6mb]/[857.6mb]}

看到其中的[gc]关键词你也猜到了这是与 GC 相关的日志,那么你了解每一部分的含义吗?如果不了解,你可以继续往下看了。

我们先从最简单的看起:

  1. 第一部分是日志发生的时间
  2. 第二部分是日志级别,这里分别是WARNINFO
  3. 第三部分是输出日志的类,我们后面也会讲到这个类
  4. 第四部分是当前 ES 节点名称
  5. 第五部分是 gc 关键词,我们就从这个关键词聊起。

友情提示:对 GC 已经了如指掌的同学,可以直接翻到最后看答案。

1. 什么是 GC?

GC,全称是 Garbage Collection (垃圾收集)或者 Garbage Collector(垃圾收集器)。

在使用 C语言编程的时候,我们要手动的通过 mallocfree来申请和释放数据需要的内存,如果忘记释放内存,就会发生内存泄露的情况,即无用的数据占用了宝贵的内存资源。而Java 语言编程不需要显示的申请和释放内存,因为 JVM 可以自动管理内存,这其中最重要的一部分就是 GC,即 JVM 可以自主地去释放无用数据(垃圾)占用的内存。

我们研究 GC 的主要原因是 GC 的过程会有 Stop The World(STW)的情况发生,即此时用户线程会停止工作,如果 STW 的时间过长,则应用的可用性、实时性等就下降的很厉害。

GC主要解决如下3个问题:

  1. 如何找到垃圾?
  2. 如何回收垃圾?
  3. 何时回收垃圾?

我们一个个来看下。

1.1 如何找到垃圾?

所谓垃圾,指的是不再被使用(引用)的对象。Java 的对象都是在堆(Heap)上创建的,我们这里默认也只讨论堆。那么现在问题就变为如何判定一个对象是否还有被引用,思路主要有如下两种:

  1. 引用计数法,即在对象被引用时加1,去除引用时减1,如果引用值为0,即表明该对象可回收了。
  2. 可达性分析法,即通过遍历已知的存活对象(GC Roots)的引用链来标记出所有存活对象

方法1简单粗暴效率高,但准确度不行,尤其是面对互相引用的垃圾对象时无能为力。

方法2是目前常用的方法,这里有一个关键是 GC Roots,它是判定的源头,感兴趣的同学可以自己去研究下,这里就不展开讲了。

1.2 如何回收垃圾?

垃圾找到了,该怎么回收呢?看起来似乎是个很傻的问题。直接收起来扔掉不就好了?!对应到程序的操作,就是直接将这些对象占用的空间标记为空闲不就好了吗?那我们就来看一下这个基础的回收算法:标记-清除(Mark-Sweep)算法。

1.2.1 标记-清除 算法(Mark Sweep)

该算法很简单,使用通过可达性分析分析方法标记出垃圾,然后直接回收掉垃圾区域。它的一个显著问题是一段时间后,内存会出现大量碎片,导致虽然碎片总和很大,但无法满足一个大对象的内存申请,从而导致 OOM,而过多的内存碎片(需要类似链表的数据结构维护),也会导致标记和清除的操作成本高,效率低下,如下图所示:

1.2.2 复制算法(Copying)

为了解决上面算法的效率问题,有人提出了复制算法。它将可用内存一分为二,每次只用一块,当这一块内存不够用时,便触发 GC,将当前存活对象复制(Copy)到另一块上,以此往复。这种算法高效的原因在于分配内存时只需要将指针后移,不需要维护链表等。但它最大的问题是对内存的浪费,使用率只有 50%。

但这种算法在一种情况下会很高效:Java 对象的存活时间极短。据 IBM 研究,Java 对象高达 98% 是朝生夕死的,这也意味着每次 GC 可以回收大部分的内存,需要复制的数据量也很小,这样它的执行效率就会很高。

1.2.3 标记-整理算法(Mark Compact)

该算法解决了第1中算法的内存碎片问题,它会在回收阶段将所有内存做整理,如下图所示:

但它的问题也在于增加了整理阶段,也就增加了 GC 的时间。

1.2.4 分代收集算法(Generation Collection)

既然大部分 Java 对象是朝生夕死的,那么我们将内存按照 Java 生存时间分为 新生代(Young)老年代(Old),前者存放短命僧,后者存放长寿佛,当然长寿佛也是由短命僧升级上来的。然后针对两者可以采用不同的回收算法,比如对于新生代采用复制算法会比较高效,而对老年代可以采用标记-清除或者标记-整理算法。这种算法也是最常用的。JVM Heap 分代后的划分一般如下所示,新生代一般会分为 Eden、Survivor0、Survivor1区,便于使用复制算法。

将内存分代后的 GC 过程一般类似下图所示:

  1. 对象一般都是先在 Eden区创建
  2. Eden区满,触发 Young GC,此时将 Eden中还存活的对象复制到 S0中,并清空 Eden区后继续为新的对象分配内存
  3. Eden区再次满后,触发又一次的 Young GC,此时会将 EdenS0中存活的对象复制到 S1中,然后清空EdenS0后继续为新的对象分配内存
  4. 每经过一次 Young GC,存活下来的对象都会将自己存活次数加1,当达到一定次数后,会随着一次 Young GC 晋升到 Old
  5. Old区也会在合适的时机进行自己的 GC

1.2.5 常见的垃圾收集器

前面我们讲了众多的垃圾收集算法,那么其具体的实现就是垃圾收集器,也是我们实际使用中会具体用到的。现代的垃圾收集机制基本都是分代收集算法,而 YoungOld区分别有不同的垃圾收集器,简单总结如下图:

从上图我们可以看到 YoungOld区有不同的垃圾收集器,实际使用时会搭配使用,也就是上图中两两连线的收集器是可以搭配使用的。这些垃圾收集器按照运行原理大概可以分为如下几类:

  • Serial GC串行,单线程的收集器,运行 GC 时需要停止所有的用户线程,且只有一个 GC 线程
  • Parallel GC并行,多线程的收集器,是 Serial 的多线程版,运行时也需要停止所有用户线程,但同时运行多个 GC 线程,所以效率高一些
  • Concurrent GC并发,多线程收集器,GC 分多阶段执行,部分阶段允许用户线程与 GC 线程同时运行,这也就是并发的意思,大家要和并行做一个区分。
  • 其他

我们下面简单看一下他们的运行机制。

1.2.5.1 Serial GC

该类 Young区的为 Serial GCOld区的为Serial Old GC。执行大致如下所示:

1.2.5.2 Parallel GC

该类Young 区的有 ParNewParallel ScavengeOld 区的有Parallel Old。其运行机制如下,相比 Serial GC ,其最大特点在于 GC 线程是并行的,效率高很多:

1.2.5.3 Concurrent Mark-Sweep GC

该类目前只是针对 Old 区,最常见就是CMS GC,它的执行分为多个阶段,只有部分阶段需要停止用户进程,这里不详细介绍了,感兴趣可以去找相关文章来看,大体执行如下:

1.2.5.4 其他

目前最新的 GC 有G1GCZGC,其运行机制与上述均不相同,虽然他们也是分代收集算法,但会把 Heap 分成多个 region 来做处理,这里不展开讲,感兴趣的可以参看最后参考资料的内容。

1.2.6 Elasticsearch 的 GC 组合

Elasticsearch 默认的 GC 配置是CMS GC ,其 Young 区ParNewOld 区CMS,大家可以在 config/jvm.options中看到如下的配置:

## GC configuration
-XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly

1.3 何时进行回收?

现在我们已经知道如何找到和回收垃圾了,那么什么时候回收呢?简单总结如下:

  1. Young 区的GC 都是在 Eden 区满时触发
  2. Serial Old 和 Parallel Old 在 Old 区是在 Young GC 时预测Old 区是否可以为 young 区 promote 到 old 区 的 object 分配空间,如果不可用则触发 Old GC。这个也可以理解为是 Old区满时。
  3. CMS GC 是在 Old 区大小超过一定比例后触发,而不是 Old 区满。这个原因在于 CMS GC 是并发的算法,也就是说在 GC 线程收集垃圾的时候,用户线程也在运行,因此需要预留一些 Heap 空间给用户线程使用,防止由于无法分配空间而导致 Full GC 发生。

2. GC Log 如何阅读?

前面讲了这么多,终于可以回到开篇的问题了,我们直接来看答案

[2018-06-30T17:57:23,848][WARN ][o.e.m.j.JvmGcMonitorService] [qoo--eS] [gc][228384] overhead, spent [2.2s] collecting in the last [2.3s]

[gc][这是第228384次GC 检查] 在最近 2.3 s 内花了 2.2s 用来做垃圾收集,这占比似乎有些过了,请抓紧来关注下。

[2018-06-30T17:57:29,020][INFO ][o.e.m.j.JvmGcMonitorService] [qoo--eS] [gc][old][228385][160772] duration [5s], collections [1]/[5.1s], total [5s]/[4.4d], memory [945.4mb]->[958.5mb]/[1007.3mb], all_pools {[young] [87.8mb]->[100.9mb]/[133.1mb]}{[survivor] [0b]->[0b]/[16.6mb]}{[old] [857.6mb]->[857.6mb]/[857.6mb]}

我们直接来看具体的含义好了,相信有了前面的 GC 基础知识,大家在看这里解释的时候就非常清楚了。

  • [gc][本次是 old GC][这是第228385次 GC 检查][从 JVM 启动至今发生的第 160772次 GC]

  • duration [本次检查到的 GC 总耗时 5 秒,可能是多次的加和],

  • collections [从上次检查至今总共发生1次 GC]/[从上次检查至今已过去 5.1 秒],

  • total [本次检查到的 GC 总耗时为 5 秒]/[从 JVM 启动至今发生的 GC 总耗时为 4.4 天],

  • memory [ GC 前 Heap memory 空间]->[GC 后 Heap memory 空间]/[Heap memory 总空间],

  • all_pools(分代部分的详情) {[young 区][GC 前 Memory ]->[GC后 Memory]/[young区 Memory 总大小] } {[survivor 区][GC 前 Memory ]->[GC后 Memory]/[survivor区 Memory 总大小] }{[old 区][GC 前 Memory ]->[GC后 Memory]/[old区 Memory 总大小] }

3. 看看源码

从日志中我们可以看到输出这些日志的类名叫做JvmGcMonitorService,我们去源码中搜索很快会找到它/Users/rockybean/code/elasticsearch/core/src/main/java/org/elasticsearch/monitor/jvm/JvmGcMonitorService.java,这里就不详细展开讲解源码了,它执行的内容大概如下图所示:

关于打印日志的格式在源码也有,如下所示:

private static final String SLOW_GC_LOG_MESSAGE =
"[gc][{}][{}][{}] duration [{}], collections [{}]/[{}], total [{}]/[{}], memory [{}]->[{}]/[{}], all_pools {}";
private static final String OVERHEAD_LOG_MESSAGE = "[gc][{}] overhead, spent [{}] collecting in the last [{}]";

另外细心的同学会发现输出的日志中 gc 只分了 young 和 old ,原因在于 ES 对 GC Name 做了封装,封装的类为:org.elasticsearch.monitor.jvm.GCNames,相关代码如下:

    public static String getByMemoryPoolName(String poolName, String defaultName) {
        if ("Eden Space".equals(poolName) || "PS Eden Space".equals(poolName) || "Par Eden Space".equals(poolName) || "G1 Eden Space".equals(poolName)) {
            return YOUNG;
        }
        if ("Survivor Space".equals(poolName) || "PS Survivor Space".equals(poolName) || "Par Survivor Space".equals(poolName) || "G1 Survivor Space".equals(poolName)) {
            return SURVIVOR;
        }
        if ("Tenured Gen".equals(poolName) || "PS Old Gen".equals(poolName) || "CMS Old Gen".equals(poolName) || "G1 Old Gen".equals(poolName)) {
            return OLD;
        }
        return defaultName;
    }

    public static String getByGcName(String gcName, String defaultName) {
        if ("Copy".equals(gcName) || "PS Scavenge".equals(gcName) || "ParNew".equals(gcName) || "G1 Young Generation".equals(gcName)) {
            return YOUNG;
        }
        if ("MarkSweepCompact".equals(gcName) || "PS MarkSweep".equals(gcName) || "ConcurrentMarkSweep".equals(gcName) || "G1 Old Generation".equals(gcName)) {
            return OLD;
        }
        return defaultName;
    }

在上面的代码中,你会看到很多我们在上一节中提到的 GC 算法的名称。

至此,源码相关部分也讲解完毕,感兴趣的大家可以自行去查阅。

4. 总结

讲解 GC 的文章已经很多,本文又唠唠叨叨地讲一遍基础知识,是希望对于第一次了解 GC 的同学有所帮助。因为只有了解了这些基础知识,你才不至于被这些 GC 的输出吓懵。希望本文对你理解 ES 的 GC 日志 有所帮助。

5. 参考资料

  1. Java Hotspot G1 GC的一些关键技术(https://mp.weixin.qq.com/s/4ufdCXCwO56WAJnzng_-ow
  2. Understanding Java Garbage Collection(https://www.cubrid.org/blog/understanding-java-garbage-collection
  3. 《深入理解Java虚拟机:JVM高级特性与最佳实践》

6. 相关推荐

如果你想深入的了解 JAVA GC 的知识,可以关注 ElasticTalk 公众号,回复 GC关键词后即可获取作者推荐的电子书等资料。

elasticTalk,qrcode

通过 metadata 使logstash配置更简洁

LogstashLeon J 发表了文章 • 0 个评论 • 738 次浏览 • 2018-09-04 13:17 • 来自相关话题

从Logstash 1.5开始,我们可以在logstash配置中使用metadata。metadata不会在output中被序列化输出,这样我们便可以在metadata中添加一些临时的中间数据,而不需要去删除它。

我们可以通过以下方式来访问metadata:

[@metadata][foo]

用例

假设我们有这样一条日志:

[2017-04-01 22:21:21] production.INFO: this is a test log message by leon

我们可以在filter中使用grok来做解析:

grok {
      match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{DATA:env}\.%{DATA:log_level}: %{DATA:content}" }
    }

解析的结果为

{
      "env" => "production",
      "timestamp" => "2017-04-01 22:21:21",
      "log_level" => "INFO",
      "content" => "{\"message\":\"[2017-04-01 22:21:21] production.INFO: this is a test log message by leon\"}"
}

假设我们希望

  1. 能把log_level为INFO的日志丢弃掉,但又不想让该字段出现在最终的输出中
  2. 输出的索引名中能体现出env,但也不想让该字段出现在输出结果里

对于1,一种方案是在输出之前通过mutate插件把不需要的字段删除掉,但是一旦这样的处理多了,会让配置文件变得“不干净”。

通过 metadata,我们可以轻松地处理这些问题:

grok {
    match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{DATA:[@metadata][env]}\.%{DATA:[@metadata][log_level]}: %{DATA:content}" }
}

if [@metadata][log_level] == "INFO"{
    drop{}    
}

output{
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "%{[@metadata][env]}-log-%{+YYYY.MM}"
        document_type => "_doc"
    }
}

除了简化我们的配置文件、减少冗余字段意外,同时也能提高logstash的处理速度。

Elasticsearch input插件

有些插件会用到metadata这个特性,比如elasticsearch input插件:

input {
  elasticsearch {
    host => "127.0.0.1"
    # 把 ES document metadata (_index, _type, _id) 包存到 @metadata 中
    docinfo_in_metadata => true
  }
}

filter{
    ......
}

output {
  elasticsearch {
    document_id => "%{[@metadata][_id]}"
    index => "transformed-%{[@metadata][_index]}"
    type => "%{[@metadata][_type]}"
  }
}

调试

一般来说metadata是不会出现在输出中的,除非使用 rubydebug codec 的方式输出:

output { 
  stdout { 
    codec  => rubydebug {
      metadata => true
    }
  }
}

日志经过处理后输出中会包含:

{
    ....,
    "@metadata" => {
        "env" => "production",
        "log_level" => "INFO"
    }
}

总结

由上可见,metadata提供了一种简单、方便的方式来保存中间数据。这样一方面减少了logstash配置文件的复杂性:避免调用remove_field,另一方面也减少了输出中的一些不必要的数据。通过这篇对metadata的介绍,希望能对大家有所帮助。

elasticTalk,qrcode

Curator从入门到实战

ElasticsearchLeon J 发表了文章 • 2 个评论 • 1722 次浏览 • 2018-08-30 22:05 • 来自相关话题

Curator 是elasticsearch 官方的一个索引管理工具,可以通过配置文件的方式帮助我们对指定的一批索引进行创建/删除、打开/关闭、快照/恢复等管理操作。

场景

比如,出于读写性能的考虑,我们通常会把基于时间的数据按时间来创建索引。

indices当数据量到达一定量级时,为了节省内存或者磁盘空间,我们往往会根据实际情况选择关闭或者删除一定时间之前的索引。通常我们会写一段脚本调用elasticsearch的api,放到crontab中定期执行。这样虽然可以达到目的,但是脚本多了之后会变得难以维护。

Curator是如何解决这类问题的呢?我们一步一步来:

安装

首先,Curator是基于python实现的,我们可以直接通过pip来安装,这种方式最简单。

pip install elasticsearch-curator

基本配置

接下来,需要为 Curator 配置es连接:

# ~/.curator/curator.yml

client:
  hosts:
    - 127.0.0.1
  port: 9200

logging:
  loglevel: INFO

其中hosts 允许配置多个地址,但是只能属于同一个集群。

这边只列举了最基本的配置,官方文档中包含了更详细的配置。

动作配置

然后需要配置我们需要执行的动作,每个动作会按顺序执行:

# /etc/curator/actions/maintain_log.yml

actions:
  1:
    #创建第二天的索引
    action: create_index
    description: "create new time-based index for log-*"
    options:
      name: '<log-{now/d+1d}>'
  2:
    #删除3天前的索引
    action: delete_indices
    description: "delete outdated indices for log-*"
    filters:
    - filtertype: pattern
      kind: prefix
      value: log
    - filtertype: age
      source: name
      direction: older
      timestring: '%Y.%m.%d'
      unit: days
      unit_count: 3

action 定义了需要执行的动作,curator支持十多种动作,可以在官方文档查看完整的动作列表。

options 定义了执行动作所需的参数,不同动作的参数也不尽相同,具体文档中都有写明。

filters 定义了动作的执行对象,通过设置filter,可以过滤出我们需要操作的索引。同一个action下的filter之间是的关系。比如在上面的定义中,delete_indices下定义了两个filters:

  • 模式匹配:匹配前缀为log的索引
  • “年龄”匹配:根据索引名中“%Y.%m.%d”时间格式,过滤出3天以前的索引

curator支持十多种filter,可以在官方文档查看完整列表。

执行

最后,我们通过curator命令行工具来执行:

curator --config /etc/curator/curator.yml /etc/curator/actions/maintain_log.yml

得到命令行输出:

2018-08-30 12:31:26,829 INFO      Preparing Action ID: 1, "create_index"
2018-08-30 12:31:26,841 INFO      Trying Action ID: 1, "create_index": create new time-based index for log-*
2018-08-30 12:31:26,841 INFO      "<log-{now/d+1d}>" is using Elasticsearch date math.
2018-08-30 12:31:26,841 INFO      Creating index "<log-{now/d+1d}>" with settings: {}
2018-08-30 12:31:27,049 INFO      Action ID: 1, "create_index" completed.
2018-08-30 12:31:27,050 INFO      Preparing Action ID: 2, "delete_indices"
2018-08-30 12:31:27,058 INFO      Trying Action ID: 2, "delete_indices": delete outdated indices for log-*
2018-08-30 12:31:27,119 INFO      Deleting selected indices: ['log-2018.08.24', 'log-2018.08.25', 'log-2018.08.27', 'log-2018.08.26', 'log-2018.08.23']
2018-08-30 12:31:27,119 INFO      ---deleting index log-2018.08.24
2018-08-30 12:31:27,120 INFO      ---deleting index log-2018.08.25
2018-08-30 12:31:27,120 INFO      ---deleting index log-2018.08.27
2018-08-30 12:31:27,120 INFO      ---deleting index log-2018.08.26
2018-08-30 12:31:27,120 INFO      ---deleting index log-2018.08.23
2018-08-30 12:31:27,282 INFO      Action ID: 2, "delete_indices" completed.
2018-08-30 12:31:27,283 INFO      Job completed.

从日志中可以看到,我们已经成功创建了隔天的索引,并删除了28号以前的索引。

定时执行

配置好curator后,还需要配置定时任务

使用crontab -e编辑crontab,

添加一行:

0 23 * * * /usr/local/bin/curator --config /root/.curator/curator.yml /etc/curator/actions/maintain_log.yml >> /var/curator.log 2>&1

crontab配置中的第一段是执行的周期,6个值分别是“分 时 日 月 周”,*表示全部。所以这段配置的含义是在每天23点执行我们的这段脚本。

单个执行

除了定时任务,我们也可以在不依赖action配置文件的情况下用curator执行一些临时的批量操作。curator提供了curator_cli的命令来执行单个action,比如我们想对所有log开头的索引做快照,使用一条命令即可完成:

curator_cli snapshot --repository repo_name --filter_list {"filtertype": "pattern","kind": "prefix", "value": "log"}

是不是特别方便?

执行流程

image-20180830200126973

在命令执行过程中,Curator 会进行以下几步操作:

  1. 从ES拉取所有的索引信息
  2. 根据设置的过滤条件过滤出需要操作的索引
  3. 对过滤后的索引执行指定的动作

复杂需求

实际生产中,会有一些更复杂的需求,简单的action和filter组合并不能满足我们的业务。Curator还提供了python包,方便我们自己写脚本时调用它提供的actions和filters,减少我们的开发工作量。

以上通过一个实际的场景向大家介绍了Curator的使用方式,但是只用到了它一小部分的功能。大家可以通过文中的链接查看官方文档,发掘出更多的使用姿势。希望对大家有所帮助!

elasticTalk,qrcode

听说你还没掌握 Normalizer 的使用方法?

Elasticsearchrockybean 发表了文章 • 1 个评论 • 413 次浏览 • 2018-08-28 12:43 • 来自相关话题

在 Elasticsearch 中处理字符串类型的数据时,如果我们想把整个字符串作为一个完整的 term 存储,我们通常会将其类型 type 设定为 keyword。但有时这种设定又会给我们带来麻烦,比如同一个数据再写入时由于没有做好清洗,导致大小写不一致,比如 appleApple两个实际都是 apple,但当我们去搜索 apple时却无法返回 Apple的文档。要解决这个问题,就需要 Normalizer出场了。废话不多说,直接上手看!

1. 上手

我们先来重现一下开篇的问题

PUT test_normalizer
{
  "mappings": {
    "doc":{
      "properties": {
        "type":{
          "type":"keyword"
        }
      }
    }
  }
}

PUT test_normalizer/doc/1
{
  "type":"apple"
}

PUT test_normalizer/doc/2
{
  "type":"Apple"
}

# 查询一 
GET test_normalizer/_search
{
  "query": {
    "match":{
      "type":"apple"
    }
  }
}

# 查询二
GET test_normalizer/_search
{
  "query": {
    "match":{
      "type":"aPple"
    }
  }
}

大家执行后会发现 查询一返回了文档1,而 查询二没有文档返回,原因如下图所示:

  1. Docs写入 Elasticsearch时由于 typekeyword,分词结果为原始字符串
  2. 查询 Query 时分词默认是采用和字段写时相同的配置,因此这里也是 keyword,因此分词结果也是原始字符
  3. 两边的分词进行匹对,便得出了我们上面的结果

2. Normalizer

normalizerkeyword的一个属性,可以对 keyword生成的单一 Term再做进一步的处理,比如 lowercase,即做小写变换。使用方法和自定义分词器有些类似,需要自定义,如下所示:

DELETE test_normalizer
# 自定义 normalizer
PUT test_normalizer
{
  "settings": {
    "analysis": {
      "normalizer": {
        "lowercase": {
          "type": "custom",
          "filter": [
            "lowercase"
          ]
        }
      }
    }
  },
  "mappings": {
    "doc": {
      "properties": {
        "type": {
          "type": "keyword"
        },
        "type_normalizer": {
          "type": "keyword",
          "normalizer": "lowercase"
        }
      }
    }
  }
}

PUT test_normalizer/doc/1
{
  "type": "apple",
  "type_normalizer": "apple"
}

PUT test_normalizer/doc/2
{
  "type": "Apple",
  "type_normalizer": "Apple"
}
# 查询三
GET test_normalizer/_search
{
  "query": {
    "term":{
      "type":"aPple"
    }
  }
}

# 查询四
GET test_normalizer/_search
{
  "query": {
    "term":{
      "type_normalizer":"aPple"
    }
  }
}

我们第一步是自定义了名为 lowercase的 normalizer,其中filter 类似自定义分词器中的 filter ,但是可用的种类很少,详情大家可以查看官方文档。然后通过 normalizer属性设定到字段type_normalizer中,然后插入相同的2条文档。执行发现,查询三无结果返回,查询四返回2条文档。

问题解决了!我们来看下是如何解决的

  1. 文档写入时由于加入了 normalizer,所有的 term都会被做小写处理
  2. 查询时搜索词同样采用有 normalizer的配置,因此处理后的 term也是小写的
  3. 两边分词匹对,就得到了我们上面的结果

3. 总结

本文通过一个实例来给大家讲解了 Normalizer的实际使用场景,希望对大家有所帮助!

掌握 analyze API,一举搞定 Elasticsearch 分词难题

Elasticsearchrockybean 发表了文章 • 1 个评论 • 1332 次浏览 • 2018-08-25 22:44 • 来自相关话题

初次接触 Elasticsearch 的同学经常会遇到分词相关的难题,比如如下这些场景:

  1. 为什么明明有包含搜索关键词的文档,但结果里面就没有相关文档呢?
  2. 我存进去的文档到底被分成哪些词(term)了?
  3. 我得自定义分词规则,但感觉好麻烦呢,无从下手

如果你遇到过类似的问题,希望本文可以解决你的疑惑。

1. 上手

让我们从一个实例出发,如下创建一个文档:

PUT test/doc/1
{
  "msg":"Eating an apple a day keeps doctor away"
}

然后我们做一个查询,我们试图通过搜索 eat这个关键词来搜索这个文档

POST test/_search
{
  "query":{
    "match":{
      "msg":"eat"
    }
  }
}

ES的返回结果为0。这不太对啊,我们用最基本的字符串查找也应该能匹配到上面新建的文档才对啊!

各位不要急,我们先来看看什么是分词。

2. 分词

搜索引擎的核心是倒排索引(这里不展开讲),而倒排索引的基础就是分词。所谓分词可以简单理解为将一个完整的句子切割为一个个单词的过程。在 es 中单词对应英文为 term。我们简单看个例子:

ES 的倒排索引即是根据分词后的单词创建,即 北京天安门这4个单词。这也意味着你在搜索的时候也只能搜索这4个单词才能命中该文档。

实际上 ES 的分词不仅仅发生在文档创建的时候,也发生在搜索的时候,如下图所示:

读时分词发生在用户查询时,ES 会即时地对用户输入的关键词进行分词,分词结果只存在内存中,当查询结束时,分词结果也会随即消失。而写时分词发生在文档写入时,ES 会对文档进行分词后,将结果存入倒排索引,该部分最终会以文件的形式存储于磁盘上,不会因查询结束或者 ES 重启而丢失。

ES 中处理分词的部分被称作分词器,英文是Analyzer,它决定了分词的规则。ES 自带了很多默认的分词器,比如StandardKeywordWhitespace等等,默认是 Standard。当我们在读时或者写时分词时可以指定要使用的分词器。

3. 写时分词结果

回到上手阶段,我们来看下写入的文档最终分词结果是什么。通过如下 api 可以查看:

POST test/_analyze
{
  "field": "msg",
  "text": "Eating an apple a day keeps doctor away"
}

其中 test为索引名,_analyze 为查看分词结果的 endpoint,请求体中 field 为要查看的字段名,text为具体值。该 api 的作用就是请告诉我在 test 索引使用 msg 字段存储一段文本时,es 会如何分词。

返回结果如下:

{
  "tokens": [
    {
      "token": "eating",
      "start_offset": 0,
      "end_offset": 6,
      "type": "<ALPHANUM>",
      "position": 0
    },
    {
      "token": "an",
      "start_offset": 7,
      "end_offset": 9,
      "type": "<ALPHANUM>",
      "position": 1
    },
    {
      "token": "apple",
      "start_offset": 10,
      "end_offset": 15,
      "type": "<ALPHANUM>",
      "position": 2
    },
    {
      "token": "a",
      "start_offset": 16,
      "end_offset": 17,
      "type": "<ALPHANUM>",
      "position": 3
    },
    {
      "token": "day",
      "start_offset": 18,
      "end_offset": 21,
      "type": "<ALPHANUM>",
      "position": 4
    },
    {
      "token": "keeps",
      "start_offset": 22,
      "end_offset": 27,
      "type": "<ALPHANUM>",
      "position": 5
    },
    {
      "token": "doctor",
      "start_offset": 28,
      "end_offset": 34,
      "type": "<ALPHANUM>",
      "position": 6
    },
    {
      "token": "away",
      "start_offset": 35,
      "end_offset": 39,
      "type": "<ALPHANUM>",
      "position": 7
    }
  ]
}

返回结果中的每一个 token即为分词后的每一个单词,我们可以看到这里是没有 eat 这个单词的,这也解释了在上手中我们搜索 eat 没有结果的情况。如果你去搜索 eating ,会有结果返回。

写时分词器需要在 mapping 中指定,而且一经指定就不能再修改,若要修改必须新建索引。如下所示我们新建一个名为ms_english 的字段,指定其分词器为 english

PUT test/_mapping/doc
{
  "properties": {
    "msg_english":{
      "type":"text",
      "analyzer": "english"
    }
  }
}

4. 读时分词结果

由于读时分词器默认与写时分词器默认保持一致,拿 上手 中的例子,你搜索 msg 字段,那么读时分词器为 Standard ,搜索 msg_english 时分词器则为 english。这种默认设定也是非常容易理解的,读写采用一致的分词器,才能尽最大可能保证分词的结果是可以匹配的。

然后 ES 允许读时分词器单独设置,如下所示:

POST test/_search
  {
    "query":{
      "match":{
        "msg":{
          "query": "eating",
          "analyzer": "english"
        }
      }
    }
  }

如上 analyzer 字段即可以自定义读时分词器,一般来讲不需要特别指定读时分词器。

如果不单独设置分词器,那么读时分词器的验证方法与写时一致;如果是自定义分词器,那么可以使用如下的 api 来自行验证结果。

POST _analyze
  {
    "text":"eating",
    "analyzer":"english"
  }

返回结果如下:

{
  "tokens": [
    {
      "token": "eat",
      "start_offset": 0,
      "end_offset": 6,
      "type": "<ALPHANUM>",
      "position": 0
    }
  ]
}

由上可知 english分词器会将 eating处理为 eat,大家可以再测试下默认的 standard分词器,它没有做任何处理。

5. 解释问题

现在我们再来看下 上手 中所遇问题的解决思路。

  1. 查看文档写时分词结果
  2. 查看查询关键词的读时分词结果
  3. 匹对两者是否有命中

我们简单分析如下:

由上图可以定位问题的原因了。

6. 解决需求

由于 eating只是 eat的一个变形,我们依然希望输入 eat时可以匹配包含 eating的文档,那么该如何解决呢?

答案很简单,既然原因是在分词结果不匹配,那么我们就换一个分词器呗~ 我们可以先试下 ES 自带的 english分词器,如下:

# 增加字段 msg_english,与 msg 做对比
PUT test/_mapping/doc
{
  "properties": {
    "msg_english":{
      "type":"text",
      "analyzer": "english"
    }
  }
}

# 写入相同文档
PUT test/doc/1
{
  "msg":"Eating an apple a day keeps doctor away",
  "msg_english":"Eating an apple a day keeps doctor away"
}

# 搜索 msg_english 字段
POST test/_search
{
  "query": {
    "match": {
      "msg_english": "eat"
    }
  }
}

执行上面的内容,我们会发现结果有内容了,原因也很简单,如下图所示:

由上图可见 english分词器会将 eating分词为 eat,此时我们搜索 eat或者 eating肯定都可以匹配对应的文档了。至此,需求解决。

7. 深入分析

最后我们来看下为什么english分词器可以解决我们遇到的问题。一个分词器由三部分组成:char filter、tokenizer 和 token filter。各部分的作用我们这里就不展开了,我们来看下 standardenglish分词器的区别。

从上图可以看出,english分词器在 Token Filter 中和 Standard不同,而发挥主要作用的就是 stemmer,感兴趣的同学可以自行去看起它的作用。

8. 自定义分词

如果我们不使用 english分词器,自定义一个分词器来实现上述需求也是完全可行的,这里不详细讲解了,只给大家讲一个快速验证自定义分词器效果的方法,如下:

POST _analyze
{
  "char_filter": [], 
  "tokenizer": "standard",
  "filter": [
    "stop",
    "lowercase",
    "stemmer"
  ],
  "text": "Eating an apple a day keeps doctor away"
}

通过上面的 api 你可以快速验证自己要定制的分词器,当达到自己需求后,再将这一部分配置加入索引的配置。

至此,我们再看开篇的三个问题,相信你已经心里有答案了,赶紧上手去自行测试下吧!

【视频】ElasticTalk#4 Elastic认证考试那些事儿

资讯动态rockybean 发表了文章 • 2 个评论 • 765 次浏览 • 2018-07-27 07:56 • 来自相关话题

Elastic 在今年6月29日推出了面向 Elasticsearch 工程师的认证考试,官方描述如下: The Elastic Certification Program was created to recognize individuals who have demonstrated a high-level of knowledge, competence and expertise with Elasticsearch. Elastic Certified Professionals demonstrate these skills by completing challenging and relevant real-world tasks on a live Elastic Stack cluster in our hands-on, performance-based certification exams. 我们此次直播便邀请中国第1位通过该认证的工程师 rockybean 来分享下认证考试的一些信息,通过这次直播,你可以了解如下信息:
  1. 如何注册考试?费用?
  2. 如何准备考试?
  3. 考试的形式是怎样的?有哪些类型的考题?
本次直播由于设备问题,声音有些卡顿,大家见谅! 视频链接如下: http://v.qq.com/x/page/f073779epxd.html  

【直播预告】ElasticTalk #4 Elastic 官方认证考试那些事儿

资料分享rockybean 发表了文章 • 0 个评论 • 477 次浏览 • 2018-07-23 08:33 • 来自相关话题

大家好,ElasticTalk 第4次直播将于本周进行,主题是关于 Elastic 官方认证考试的。   我于7月初成功通过了 Elastic Certified Engineer 的考试,拿到下面的徽章。
bin_certificate.png
  感兴趣的同学可以扫下面海报中的二维码或者搜索 elastic-talk 微信号,添加好友后进入直播群。  
phone_post.001_.jpeg
 
大家好,ElasticTalk 第4次直播将于本周进行,主题是关于 Elastic 官方认证考试的。   我于7月初成功通过了 Elastic Certified Engineer 的考试,拿到下面的徽章。
bin_certificate.png
  感兴趣的同学可以扫下面海报中的二维码或者搜索 elastic-talk 微信号,添加好友后进入直播群。  
phone_post.001_.jpeg
 

ET001 不可不掌握的 Logstash 使用技巧

Logstashrockybean 发表了文章 • 3 个评论 • 616 次浏览 • 2018-07-21 11:53 • 来自相关话题

Logstash 是 Elastic Stack 中功能最强大的 ETL 工具,相较于 beats 家族,虽然它略显臃肿,但是强在功能丰富、处理能力强大。大家在使用的过程中肯定也体验过其启动时的慢吞吞,那么有什么办法可以减少等待 Logstash 的启动时间,提高编写其处理配置文件的效率呢?本文给大家推荐一个小技巧,帮助大家解决如下两个问题,让大家更好地与这个笨重的大家伙相处。

  1. 减少 Logstash 重启的次数,也就节省宝贵的时间
  2. 方便快捷地向 Logstash 输入需要处理的内容

1. 打开 reload 配置开关

Logstash 启动的时候可以加上 -r 的参数来做到配置文件热加载,效果是:

  • 当你修改了配置文件后,无需重启 Logstash 即可让新配置文件生效。

它的含义如下:

当你写好配置文件,比如 test.conf ,启动命令如下:

bin/logstash -f test.conf -r

启动完毕,修改 test.conf 的内容并保存后,过 1 秒钟,你会发现 Logstash 端有类似如下日志输出(注意红色框标记的部分),此时说明 reload 的成功。

如果你修改的配置文件有错误,会看到报错的日志,你可以根据错误提示修改。

至此,第一个问题解决!

2. 使用 HTTP INPUT

编写配置文件的另一个痛点是需要针对不同格式的输入内容进行详细的测试,以防解析报错的情况出现。此时大家常用标准输入来解决这个问题(stdin input),但是标准输入对于文字编辑支持不太友好,而且配置文件热更新的功能也不支持标准输入。

在这里向大家推荐使用 http input 插件,配置如下:

input{
    http{
        port => 7474
        codec => "json"
    }
}

然后大家再用自己喜欢的 http 请求工具,比如 POSTMan、Insomnia 等向 http://loclahost:7474发送待测试内容即可,如下是 Insomnia 的截图。

至此,第二个问题也解决了。

3. 总结

相信看到这里,大家一定是跃跃欲试了,赶紧打开电脑,找到 Logstash,然后编辑 test.conf,输入如下内容:

input{
    http{
        port => 7474
        codec => "json"
    }
}

filter{

}

output{
        stdout{
        codec => rubydebug{
            metadata => true
        }
    }
}

然后执行启动命令:

bin/logstash -f test.conf -r

打开 Insomnia ,输入要测试的内容,点击发送,开始舒爽流畅的配置文件编写之旅吧!

ElasticTalk #3 Elasticsearch压测实战 II esrally 进阶实战

资料分享rockybean 发表了文章 • 0 个评论 • 298 次浏览 • 2018-07-20 19:51 • 来自相关话题

ElasticTalk 第3期 直播的内容是 Elasticsearch 压测实战之 esrally 进阶实战。 本次我们主要讲解了 esrally 如何自定义测试集群、自定义数据集和报告,最后还讲了三步上手 esrally 的方法。   视频地址如下: http://www.bilibili.com/video/av27117279/
ElasticTalk 第3期 直播的内容是 Elasticsearch 压测实战之 esrally 进阶实战。 本次我们主要讲解了 esrally 如何自定义测试集群、自定义数据集和报告,最后还讲了三步上手 esrally 的方法。   视频地址如下: http://www.bilibili.com/video/av27117279/

ElasticTalk #2 Elasticsearch压测实战 I esrally 入门与实战

资料分享rockybean 发表了文章 • 0 个评论 • 323 次浏览 • 2018-07-20 19:49 • 来自相关话题

  ElasticTalk 第2期 直播的内容是 Elasticsearch 压测实战之 esrally 入门和实战。希望这次直播可以帮助大家快速掌握 esrally 这款优秀的 es 压测工具。   视频地址如下: https://www.bilibili.com/video/av27114309/
  ElasticTalk 第2期 直播的内容是 Elasticsearch 压测实战之 esrally 入门和实战。希望这次直播可以帮助大家快速掌握 esrally 这款优秀的 es 压测工具。   视频地址如下: https://www.bilibili.com/video/av27114309/

ElasticTalk #1 用 ElasticStack 快速收集和分析 Nginx 日志

资料分享rockybean 发表了文章 • 2 个评论 • 358 次浏览 • 2018-07-20 19:45 • 来自相关话题

  去年做了3期 ElasticTalk 的直播节目,预计下周开始恢复。现在放出相关的视频内容,希望对大家有所帮助。   第1期的课程内容为用 ElasticStack 快速收集分析 Nginx 日志,其中详细讲解了如何使用 filebeat 的 module 功能。     视频地址如下: https://www.bilibili.com/video/av27123368/
  去年做了3期 ElasticTalk 的直播节目,预计下周开始恢复。现在放出相关的视频内容,希望对大家有所帮助。   第1期的课程内容为用 ElasticStack 快速收集分析 Nginx 日志,其中详细讲解了如何使用 filebeat 的 module 功能。     视频地址如下: https://www.bilibili.com/video/av27123368/

Elasticsearch snapshot 备份的使用方法

Elasticsearchrockybean 发表了文章 • 1 个评论 • 1479 次浏览 • 2018-05-31 23:23 • 来自相关话题

常见的数据库都会提供备份的机制,以解决在数据库无法使用的情况下,可以开启新的实例,然后通过备份来恢复数据减少损失。虽然 Elasticsearch 有良好的容灾性,但由于以下原因,其依然需要备份机制。

  1. 数据灾备。在整个集群无法正常工作时,可以及时从备份中恢复数据。
  2. 归档数据。随着数据的积累,比如日志类的数据,集群的存储压力会越来越大,不管是内存还是磁盘都要承担数据增多带来的压力,此时我们往往会选择只保留最近一段时间的数据,比如1个月,而将1个月之前的数据删除。如果你不想删除这些数据,以备后续有查看的需求,那么你就可以将这些数据以备份的形式归档。
  3. 迁移数据。当你需要将数据从一个集群迁移到另一个集群时,也可以用备份的方式来实现。

Elasticsearch 做备份有两种方式,一是将数据导出成文本文件,比如通过 elasticdumpesm 等工具将存储在 Elasticsearch 中的数据导出到文件中。二是以备份 elasticsearch data 目录中文件的形式来做快照,也就是 Elasticsearch 中 snapshot 接口实现的功能。第一种方式相对简单,在数据量小的时候比较实用,当应对大数据量场景效率就大打折扣。我们今天就着重讲解下第二种备份的方式,即 snapshot api 的使用。

备份要解决备份到哪里、如何备份、何时备份和如何恢复的问题,那么我们接下来一个个解决。

1. 备份到哪里

在 Elasticsearch 中通过 repository 定义备份存储类型和位置,存储类型有共享文件系统、AWS 的 S3存储、HDFS、微软 Azure的存储、Google Cloud 的存储等,当然你也可以自己写代码实现国内阿里云的存储。我们这里以最简单的共享文件系统为例,你也可以在本地做实验。

首先,你要在 elasticsearch.yml 的配置文件中注明可以用作备份路径 path.repo ,如下所示:

path.repo: ["/mount/backups", "/mount/longterm_backups"]

配置好后,就可以使用 snapshot api 来创建一个 repository 了,如下我们创建一个名为 my_backup 的 repository。

PUT /_snapshot/my_backup
{
  "type": "fs",
  "settings": {
    "location": "/mount/backups/my_backup"
  }
}

之后我们就可以在这个 repository 中来备份数据了。

2. 如何备份

有了 repostiroy 后,我们就可以做备份了,也叫快照,也就是记录当下数据的状态。如下所示我们创建一个名为 snapshot_1 的快照。

PUT /_snapshot/my_backup/snapshot_1?wait_for_completion=true

wait_for_completion 为 true 是指该 api 在备份执行完毕后再返回结果,否则默认是异步执行的,我们这里为了立刻看到效果,所以设置了该参数,线上执行时不用设置该参数,让其在后台异步执行即可。

执行成功后会返回如下结果,用于说明备份的情况:

{
  "snapshots": [
    {
      "snapshot": "snapshot_1",
      "uuid": "52Lr4aFuQYGjMEv5ZFeFEg",
      "version_id": 6030099,
      "version": "6.3.0",
      "indices": [
        ".monitoring-kibana-6-2018.05.30",
        ".monitoring-es-6-2018.05.28",
        ".watcher-history-7-2018.05.30",
        ".monitoring-beats-6-2018.05.29",
        "metricbeat-6.2.4-2018.05.28",
        ".monitoring-alerts-6",
        "metricbeat-6.2.4-2018.05.30"
      ],
      "include_global_state": true,
      "state": "SUCCESS",
      "start_time": "2018-05-31T12:45:57.492Z",
      "start_time_in_millis": 1527770757492,
      "end_time": "2018-05-31T12:46:15.214Z",
      "end_time_in_millis": 1527770775214,
      "duration_in_millis": 17722,
      "failures": [],
      "shards": {
        "total": 28,
        "failed": 0,
        "successful": 28
      }
    }
  ]
}

返回结果的参数意义都是比较直观的,比如 indices 指明此次备份涉及到的索引名称,由于我们没有指定需要备份的索引,这里备份了所有索引;state 指明状态;duration_in_millis 指明备份任务执行时长等。

我们可以通过 GET _snapshot/my_backup/snapshot_1获取 snapshot_1 的执行状态。

此时如果去 /mount/backups/my_backup 查看,会发现里面多了很多文件,这些文件其实都是基于 elasticsearch data 目录中的文件生成的压缩存储的备份文件。大家可以通过 du -sh . 命令看一下该目录的大小,方便后续做对比。

3. 何时备份

通过上面的步骤我们成功创建了一个备份,但随着数据的新增,我们需要对新增的数据也做备份,那么我们如何做呢?方法很简单,只要再创建一个快照 snapshot_2 就可以了。

PUT /_snapshot/my_backup/snapshot_2?wait_for_completion=true

当执行完毕后,你会发现 /mount/backups/my_backup 体积变大了。这说明新数据备份进来了。要说明的一点是,当你在同一个 repository 中做多次 snapshot 时,elasticsearch 会检查要备份的数据 segment 文件是否有变化,如果没有变化则不处理,否则只会把发生变化的 segment file 备份下来。这其实就实现了增量备份。

elasticsearch 的资深用户应该了解 force merge 功能,即可以强行将一个索引的 segment file 合并成指定数目,这里要注意的是如果你主动调用 force merge api,那么 snapshot 功能的增量备份功能就失效了,因为 api 调用完毕后,数据目录中的所有 segment file 都发生变化了。

另一个就是备份时机的问题,虽然 snapshot 不会占用太多的 cpu、磁盘和网络资源,但还是建议大家尽量在闲时做备份。

4. 如何恢复

所谓“养兵千日,用兵一时”,我们该演练下备份的成果,将其恢复出来。通过调用如下 api 即可快速实现恢复功能。

POST /_snapshot/my_backup/snapshot_1/_restore?wait_for_completion=true
{
  "indices": "index_1",
  "rename_replacement": "restored_index_1"
}

通过上面的 api,我们可以将 index_1 索引恢复到 restored_index_1 中。这个恢复过程完全是基于文件的,因此效率会比较高。

虽然我们这里演示的是在同一个集群做备份与恢复,你也可以在另一个集群上连接该 repository 做恢复。我们这里就不做说明了。

5. 其他

由于 Elasticsearch 版本更新比较快,因此大家在做备份与恢复的时候,要注意版本问题,同一个大版本之间的备份与恢复是没有问题的,比如都是 5.1 和 5.6 之间可以互相备份恢复。但你不能把一个高版本的备份在低版本恢复,比如将 6.x 的备份在 5.x 中恢复。而低版本备份在高版本恢复有一定要求:

1) 5.x 可以在 6.x 恢复

2) 2.x 可以在 5.x 恢复

3) 1.x 可以在 2.x 恢复

其他跨大版本的升级都是不可用的,比如1.x 的无法在 5.x 恢复。这里主要原因还是 Lucene 版本问题导致的,每一次 ES 的大版本升级都会伴随 Lucene 的大版本,而 Lucene 的版本是尽量保证向前兼容,即新版可以读旧版的文件,但版本跨越太多,无法实现兼容的情况也在所难免了。

6. 继续学习

本文只是简单对 snapshot 功能做了一个演示,希望这足够引起你的兴趣。如果你想进一步深入的了解该功能,比如备份的时候如何指定部分索引、如何查询备份和还原的进度、如何跨集群恢复数据、如何备份到 HDFS 等,可以详细阅读官方手册https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-snapshots.html,如果在使用的过程中遇到了问题,欢迎留言讨论。

Elasticsearch如何实现 SQL语句中 Group By 和 Limit 的功能

Elasticsearchrockybean 发表了文章 • 2 个评论 • 862 次浏览 • 2018-05-21 07:45 • 来自相关话题

有 SQL 背景的同学在学习 Elasticsearch 时,面对一个查询需求,不由自主地会先思考如何用 SQL 来实现,然后再去想 Elasticsearch 的 Query DSL 如何实现。那么本篇就给大家讲一条常见的 SQL 语句如何用 Elasticsearch 的查询语言实现。

1. SQL语句

假设我们有一个汽车的数据集,每个汽车都有车型、颜色等字段,我希望获取颜色种类大于1个的前2车型。假设汽车的数据模型如下:

{
    "model":"modelA",
    "color":"red"
}

假设我们有一个 cars 表,通过如下语句创建测试数据。

INSERT INTO cars (model,color) VALUES ('A','red'); 
INSERT INTO cars (model,color) VALUES ('A','white'); 
INSERT INTO cars (model,color) VALUES ('A','black'); 
INSERT INTO cars (model,color) VALUES ('A','yellow'); 
INSERT INTO cars (model,color) VALUES ('B','red'); 
INSERT INTO cars (model,color) VALUES ('B','white'); 
INSERT INTO cars (model,color) VALUES ('C','black'); 
INSERT INTO cars (model,color) VALUES ('C','red'); 
INSERT INTO cars (model,color) VALUES ('C','white'); 
INSERT INTO cars (model,color) VALUES ('C','yellow'); 
INSERT INTO cars (model,color) VALUES ('C','blue'); 
INSERT INTO cars (model,color) VALUES ('D','red');
INSERT INTO cars (model,color) VALUES ('A','red'); 

那么实现我们需求的 SQL 语句也比较简单,实现如下:

SELECT model,COUNT(DISTINCT color) color_count FROM cars GROUP BY model HAVING color_count > 1 ORDER BY color_count desc LIMIT 2;

这条查询语句中 Group By 是按照 model 做分组, Having color_count>1 限定了车型颜色种类大于1,ORDER BY color_count desc 限定结果按照颜色种类倒序排列,而 LIMIT 2 限定只返回前3条数据。

那么在 Elasticsearch 中如何实现这个需求呢?

2. 在 Elasticsearch 模拟测试数据

首先我们需要先在 elasticsearch 中插入测试的数据,这里我们使用 bulk 接口 ,如下所示:

POST _bulk
{"index":{"_index":"cars","_type":"doc","_id":"1"}}
{"model":"A","color":"red"}
{"index":{"_index":"cars","_type":"doc","_id":"2"}}
{"model":"A","color":"white"}
{"index":{"_index":"cars","_type":"doc","_id":"3"}}
{"model":"A","color":"black"}
{"index":{"_index":"cars","_type":"doc","_id":"4"}}
{"model":"A","color":"yellow"}
{"index":{"_index":"cars","_type":"doc","_id":"5"}}
{"model":"B","color":"red"}
{"index":{"_index":"cars","_type":"doc","_id":"6"}}
{"model":"B","color":"white"}
{"index":{"_index":"cars","_type":"doc","_id":"7"}}
{"model":"C","color":"black"}
{"index":{"_index":"cars","_type":"doc","_id":"8"}}
{"model":"C","color":"red"}
{"index":{"_index":"cars","_type":"doc","_id":"9"}}
{"model":"C","color":"white"}
{"index":{"_index":"cars","_type":"doc","_id":"10"}}
{"model":"C","color":"yellow"}
{"index":{"_index":"cars","_type":"doc","_id":"11"}}
{"model":"C","color":"blue"}
{"index":{"_index":"cars","_type":"doc","_id":"12"}}
{"model":"D","color":"red"}
{"index":{"_index":"cars","_type":"doc","_id":"13"}}
{"model":"A","color":"red"}

其中 index 为 cars,type 为 doc,所有数据与mysql 数据保持一致。大家可以在 Kibana 的 Dev Tools 中执行上面的命令,然后执行下面的查询语句验证数据是否已经成功存入。

GET cars/_search

3. Group By VS Terms/Metric Aggregation

SQL 中 Group By 语句在 Elasticsearch 中对应的是 Terms Aggregation,即分桶聚合,对应 Group By color 的语句如下所示:

GET cars/_search
{
  "size":0,
  "aggs":{
    "models":{
      "terms":{
        "field":"model.keyword"
      }
    }
  }
}

结果如下:

{
  "took": 161,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 13,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "models": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "A",
          "doc_count": 5
        },
        {
          "key": "C",
          "doc_count": 5
        },
        {
          "key": "B",
          "doc_count": 2
        },
        {
          "key": "D",
          "doc_count": 1
        }
      ]
    }
  }
}

我们看 aggregations 这个 key 下面的即为返回结果。

SQL 语句中还有一项是 COUNT(DISTINCT color) color_count 用于计算每个 model 的颜色数,在 Elasticsearch 中我们需要使用一个指标类聚合 Cardinality ,进行不同值计数。语句如下:

GET cars/_search
{
  "size": 0,
  "aggs": {
    "models": {
      "terms": {
        "field": "model.keyword"
      },
      "aggs": {
        "color_count": {
          "cardinality": {
            "field": "color.keyword"
          }
        }
      }
    }
  }
}

其返回结果如下:

{
  "took": 74,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 13,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "models": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "A",
          "doc_count": 5,
          "color_count": {
            "value": 4
          }
        },
        {
          "key": "C",
          "doc_count": 5,
          "color_count": {
            "value": 5
          }
        },
        {
          "key": "B",
          "doc_count": 2,
          "color_count": {
            "value": 2
          }
        },
        {
          "key": "D",
          "doc_count": 1,
          "color_count": {
            "value": 1
          }
        }
      ]
    }
  }
}

结果中 color_count 即为每个 model 的颜色数,但这里所有的模型都返回了,我们只想要颜色数大于1的模型,因此这里还要加一个过滤条件。

4. Having Condition VS Bucket Filter Aggregation

Having color_count > 1 在 Elasticsearch 中对应的是 Bucket Filter 聚合,语句如下所示:

GET cars/_search
{
  "size": 0,
  "aggs": {
    "models": {
      "terms": {
        "field": "model.keyword"
      },
      "aggs": {
        "color_count": {
          "cardinality": {
            "field": "color.keyword"
          }
        },
        "color_count_filter": {
          "bucket_selector": {
            "buckets_path": {
              "colorCount": "color_count"
            },
            "script": "params.colorCount>1"
          }
        }
      }
    }
  }
}

返回结果如下:

{
  "took": 39,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 13,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "models": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "A",
          "doc_count": 5,
          "color_count": {
            "value": 4
          }
        },
        {
          "key": "C",
          "doc_count": 5,
          "color_count": {
            "value": 5
          }
        },
        {
          "key": "B",
          "doc_count": 2,
          "color_count": {
            "value": 2
          }
        }
      ]
    }
  }
}

此时返回结果只包含颜色数大于1的模型,但大家会发现颜色数多的 C 不是在第一个位置,我们还需要做排序处理。

5. Order By Limit VS Bucket Sort Aggregation

ORDER BY color_count desc LIMIT 3 在 Elasticsearch 中可以使用 Bucket Sort 聚合实现,语句如下所示:

GET cars/_search
{
  "size": 0,
  "aggs": {
    "models": {
      "terms": {
        "field": "model.keyword"
      },
      "aggs": {
        "color_count": {
          "cardinality": {
            "field": "color.keyword"
          }
        },
        "color_count_filter": {
          "bucket_selector": {
            "buckets_path": {
              "colorCount": "color_count"
            },
            "script": "params.colorCount>1"
          }
        },
        "color_count_sort": {
          "bucket_sort": {
            "sort": {
              "color_count": "desc"
            },
            "size": 2
          }
        }
      }
    }
  }
}

返回结果如下:

{
  "took": 32,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 13,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "models": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "C",
          "doc_count": 5,
          "color_count": {
            "value": 5
          }
        },
        {
          "key": "A",
          "doc_count": 5,
          "color_count": {
            "value": 4
          }
        }
      ]
    }
  }
}

至此我们便将 SQL 语句实现的功能用 Elasticsearch 查询语句实现了。对比 SQL 语句与 Elasticsearch 的查询语句,大家会发现后者复杂了很多,但并非无章可循,随着大家对常见语法越来越熟悉,相信一定会越写越得心应手!