谈谈 ES 6.8 到 7.10 的功能变迁(1)- 性能优化篇
INFINI Labs 小助手 发表了文章 • 0 个评论 • 123 次浏览 • 10 小时前
## 前言
ES 7.10 可能是现在比较常见的 ES 版本。但是对于一些相迭代比较慢的早期业务系统来说,ES 6.8 是一个名副其实的“钉子户”。
借着工作内升级调研的任务东风,我整理从 ES 6.8 到 ES 7.10 **ELastic 重点列出**的新增功能和优化内容。将分为 6 个篇幅给大家详细阐述。
本系列文章主要针对 **Elasticsearch 传统的使用功能和基础的模块**,像是集群任务的管理、搜索、聚合还有字段类型这样的功能。对于付费功能或者全新的模块,比如:CCR、机器学习和数据流,这里不去深入探讨。
内容的主要来源于 Elastic [各个版本的发布信息](https://www.elastic.co/cn/blog/category/releases),这里主要比对 ES 6.8 版本到 7.10 版本的差异,并不一一枚举各个新的功能点出现的时间版本。
下面是第一篇:关于 ES 性能的优化
## ES 7.10 的性能优化
### 集群协调算法升级
基于 Elastic 博客提供的资料,Elasticsearch 7.0 的核心改进在于**集群协调层的彻底重构**,取代了旧版 Zen Discovery 的局限性,引入更健壮、自动化的分布式共识机制。从理论上来说这次优化有着不少的进步,可以显著提升了高可用性与运维效率
主要的优化点有下面三点:
1. **消除分裂脑(Split Brain)风险**:通过**自动化计算**,确保集群状态更新的安全性。旧版 `minimum_master_nodes` 的手动配置被移除,避免人为误操作。
2. **提升集群稳定性与恢复速度**:节点故障时,集群更快达成一致,减少服务中断窗口。
3. **简化运维复杂度**:可以动态扩缩容无需手动调整配置,系统自动管理选举配置。同时提供更清晰的日志和错误提示,加速故障诊断。
| **旧版配置** | **ES 7.0 配置** | **作用** |
| ------------------------------------ | ---------------------- | ----------------------------------------- |
| `discovery.zen.ping.unicast.hosts` | `discovery.seed_hosts` | 定义初始发现的种子节点列表(IP 或主机名) |
| `discovery.zen.minimum_master_nodes` | **已移除** | 由系统自动管理法定人数 |
而在优化的原则里,Elastic 更强调安全第一。比如,在半数以上主节点永久丢失的风险场景下,ES 7.0 之前的集群会静默等待恢复,允许通过启动新空节点强制恢复,这样可能会导致数据不一致或丢失。在 Elasticsearch 7.0 以及更高版本中,这种不安全活动受到了更多限制。集群宁愿保持不可用状态,也不会冒这种风险(除非使用 elasticsearch-node 恢复工具)。
这次优化显著**降低了人为错误的风险**:移除脆弱的手动配置,减少运维使用的理解成本。同时**提升关键业务连续性**:快速故障恢复与明确的容错机制,能适合更多场景需求。
当然也并不是尽善尽美的,也会存在大集群下投票节点过多导致竞争激烈而[无法选主的问题](https://mp.weixin.qq.com/s/jU8HCEf2E6hkz_1ZVH_GaQ),这种情况下,建议部署独立的主节点,并且可以考虑适当增大 cluster.election.duration 的配置。
### Top K 对检索的加速
这里的 Top K 主要是指在普通检索时展示前列的数据 Top K。也就是说 Elasticsearch 7.0 对检索数据的**查询性能**做了明显的改善。那是做了所有查询场景的提升么?
ELastic 做了这么一个场景假设:如果用户通常只关注搜索结果的第一页,且并不关心具体匹配的文档总数,对于超出一定数量的数据搜索引擎可以展示“超过 10,000 条结果”并提供分页浏览来优化搜索效率。但是在实际过程中用户常在查询中使用高频词(如“the”或“a”),这迫使 Elasticsearch 为大量文档计算评分,明显占用了查询资源的使用,即使这些常见词对相关性排序贡献甚微。

而现在,Elasticsearch 现在可以**跳过那些在早期阶段就被判定为不会进入结果集顶部的低排名记录的评分计算**,从而显著提升查询速度。这里主要涉及了 **block-max WAND 算法**的实现。这是一个复杂且漫长的优化过程,有兴趣的同学可以阅读一下这段[Magic WAND: Faster Retrieval of Top Hits in Elasticsearch](https://www.elastic.co/blog/fa ... x-wand)。

从 Elastic 的测试结果来看,新算法的优化**让 term 查询加速了 3-7 倍**。当然从场景背景可以看出,这个优化主要在大数据量下有明显效果(小数据量也不会有太多的日常高频词)。
### 默认开启 soft-delete 减少 translog
从 Elasticsearch 7.4 开始,副本的数据恢复,不再完全依赖 translog 了,而是通过索引的 soft-delete 特性(Elasticsearch 7.0 起所有新索引默认启用软删除 soft-deletes)。这样就可以缩小 translog 的使用场景,从而 **translog 的保留大小也可以减少**了。
那原来使用 translog 是什么样的呢?
translog 是 ES 用于保证数据安全性的重要工具。同时副分片进行恢复时,它也起着重要作用,**只要副分片待获取的差异数据是在 translog 所保留的数据范围内**,就可以只从 trasnlog 复制差异的部分数据,而不用拖取整个分片。在之前的版本中,Elasticsearch 默认会保留 512M 或 12 小时的 translog 用于副本恢复。
那现在使用的 soft-delete 是什么呢?
soft-deletes 是 Lucene 中实现的特性。这个软删除有时候会和 lucene 本身的标记删除概念发生混淆。为了方便理解,我们在这里归纳一下,lucene 实现删除的方式是一种标记删除的方式,而这种标记删除可以分为硬删除和软删除。软删除和硬删除有一个明显的区分点是:硬删除,被删除的文档对应的文档号用索引文件 \.liv 来描述。软删除 soft-delete,被标记为删除的文档不使用索引文件.liv 来描述,**而是通过索引文件 \.dvd \.dvm 来描述**。
这里再扩展一下,\.liv 文件主要实现 fixedbitset 数据结构。而 \.dvd \.dvm 则组合实现了 docvalue 这种正排数据结构。
正排索引的数据结构助力了 translog 的‘减负’,副本可以相对简便的通过软删除中的数据标记来实现数据恢复的处理。

相比较简洁高效的位图索引,docvalue 虽然实现了更多的功能,满足更多的场景,也会带来更多的问题。最明显的就是对于 update 操作,会导致 refresh 变得慢,有些压力场景下 refresh 会达到 10s 以上。
### 数值/日期排序查询加速
Elasticsearch 7.6 版本提升了按**日期或数值(即任何存储为有符号 64 位整数(long 类型)的字段)进行排序**的查询性能。
这背后的优化原理和之前 top K 使用的 Block-Max WAND 算法有点相似,都是利用算法**跳过非竞争性文档**来实现加速。
实际效果可能因环境而异,受多种参数影响。在 Elastic 进行的测试场景下,可以达到 35 倍的速度优化。
### FST 内存使用迁移到堆外
Elastic 7.3 版本实现了这个优化,是藏在 release note 里的彩蛋。
> Also mmap terms index (.tip) files for hybridfs #43150 (issue: #42838)
看似不经意的一行,但是带来效果却不小。FST 从堆内转移到堆外后,**JVM 的空间可以空余出很客观的一部分**。

> 一直以来,ES 堆中常驻内存中占据比重最大是 FST,即 tip(terms index) 文件占据的空间,1TB 索引大约占用 2GB 或者更多的内存,因此为了节点稳定运行,业界通常认为一个节点 open 的索引不超过 5TB。现在,从 ES 7.3 版本开始,将 tip 文件修改为通过 mmap 的方式加载,这使 FST 占据的内存从堆内转移到了堆外由操作系统的 pagecache 管理。
### 存储字段压缩优化
Elasticsearch 7.10 基于 Apache Lucene 8.7 引入了对存储字段(stored fields)的更高压缩率优化。不管是对于基于 DEFLATE 的 `index.codec: best_compression` 还是基于 LZ4 的`index.codec: default`都有不错的表现,在 Elastic 的测试场景下,最大可达到 10%的存储空间减少。
对于数据压缩 lucene 这次主要做了两个优化。
1. Elastic 研究发现在存储数据的时候,底层的 block 越大,压缩效果越好,因为中间被压缩的重复数据可能越多。但是大块的 block 也可能因为解码重复数据降低查询速度。
2. block 间通过共享字典来维持检索效率和数据压缩之间的平衡。
2.1. 首先为压缩算法提供一个数据字典,它也可以用于字符串重复数据删除。如果在要压缩的数据流和字典之间有许多重复的字符串,那么最终可以得到更好的压缩比。在解压缩时也通过字典来快速补足。

2.2. 同时,ES 使用更大的数据块,这些数据块本身被分成一个字典和 10 个子块,这些子块使用这个字典进行压缩。

而对于实际业务场景中,日志和监控数据的重复率往往会很好,因此在这两个场景中的压缩效果也是最明显的。
## 小结
当然,除了这几项外,ES 在各个版本中也做了不少优化,比如:调整 search.max_buckets 增加到 65534;Date histogram 聚合性能优化等等。有兴趣的同学可以参照各个版本的 [release highlight](https://www.elastic.co/guide/e ... s.html)
参考资料:
1. [Save space and money with improved storage efficiency in Elasticsearch 7.10](https://www.elastic.co/blog/sa ... h-7-10)
2. [Elasticsearch 7.3 的 offheap 原理](https://mp.weixin.qq.com/s/QviC_9ElknSS9kxXSMjjbg)
3. [Elasticsearch 7.4 的 soft-deletes 是个什么鬼](https://mp.weixin.qq.com/s/_l8JAtqK_NOSP8b7OqSVDg)
## 推荐阅读
- [谈谈 ES 6.8 到 7.10 的功能变迁(2)- 字段类型篇](https://infinilabs.cn/blog/202 ... part-2)
- [谈谈 ES 6.8 到 7.10 的功能变迁(3)- 查询方法篇](https://infinilabs.cn/blog/202 ... part-3)
- [谈谈 ES 6.8 到 7.10 的功能变迁(4)- 聚合功能篇](https://infinilabs.cn/blog/202 ... part-4)
- [谈谈 ES 6.8 到 7.10 的功能变迁(5)- 任务和集群管理](https://infinilabs.cn/blog/202 ... part-5)
- [谈谈 ES 6.8 到 7.10 的功能变迁(6)- 其他](https://infinilabs.cn/blog/202 ... part-6)
> 作者:金多安,极限科技(INFINI Labs)搜索运维专家,Elastic 认证专家,搜索客社区日报责任编辑。一直从事与搜索运维相关的工作,日常会去挖掘 ES / Lucene 方向的搜索技术原理,保持搜索相关技术发展的关注。
> 原文:https://infinilabs.cn/blog/202 ... rt-1/
代理 Elasticsearch 服务:INFINI Gateway VS Nginx
INFINI Labs 小助手 发表了文章 • 0 个评论 • 719 次浏览 • 4 天前

## INFINI Gateway 简介
[INFINI Gateway](https://infinilabs.cn/products/gateway/) 是一款面向 Elasticsearch 的高性能应用网关,专为提升 Elasticsearch 集群的性能、安全性和可管理性而设计。它作为 Elasticsearch 的前置网关,能够处理所有客户端请求,并将其转发到后端的 Elasticsearch 集群,同时提供丰富的功能来优化请求处理和管理。此外还支持代理 Opensearch、[Easysearch](https://infinilabs.cn/products/easysearch/) 服务。
## Nginx 简介
Nginx 是一个高性能的 HTTP 和反向代理服务器,以其高并发处理能力、低内存消耗和稳定性著称,广泛应用于 Web 服务器、负载均衡和反向代理等场景。在 Elasticsearch 的使用场景里,也有小伙伴使用 Nginx 来代理 Elasticsearch 的服务,利用 Nginx 的负载均衡能力,将请求转发到多个 Elasticsearch 节点。
这两个软件都能代理 Elasticsearch 服务,但是他们有什么区别?我们来一起分析分析。
## 负载均衡
Elasticsearch 是分布式系统,提倡使用 round-robin 方式将请求发送到多个节点。不管是 Nginx 还是 INFINI Gateway 都默认使用 round-boin 方式转发请求,也都支持 weighted round-robin(加权轮询)方式进行请求转发,这点两者相当。
## 节点自动更新
Elasticsearch 集群可能会遇到添加、删除节点的情况,代理程序能否感知 Elasticsearch 节点的变化将变得非常关键。
在 Nginx 中,所有转发节点的 IP 地址都必须写入到配置文件中。如果 Elasticsearch 集群加入了新的节点进行请求处理,则需要 Nginx 编辑配置文件把新节点的 IP 地址加入其中,然后重启或重载 Nginx 服务,才能将请求分发到新的节点。反之如果有节点下线,也要编辑 Nginx 配置文件并重载服务。
INFINI Gateway 是面向 Elasticsearch 设计的应用网关,具有后端节点发现和更新的功能,能够感知 Elasticsearch 集群节点加入、离开的情况。开启节点发现和更新功能后,Gateway 会定期自动更新节点列表,将请求均匀转发到列表中的节点。可参考之前的[博客](https://infinilabs.cn/blog/202 ... teway/)开启节点自动更新。
⚠️ 注意:INFINI Gateway 默认后端节点发现和更新的功能为关闭状态。
## 定向转发请求
使用 Elasticsearch 集群的场景多种多样,如果想对转发的节点做进一步控制,可能需要根据不同条件进行节点筛选:
- IP 地址
- 节点角色
- 节点标签
## Nginx
Nginx 支持根据 IP 地址进行转发的,将需要转发的节点 IP 地址写入配置文件即可。
```plain
upstream es-cluster {
server 192.168.56.102:9200;
server 192.168.56.102:9201;
server 192.168.56.102:9202;
}
```
但不支持按节点角色、节点标签进行筛选,因为 Nginx 中并没有这种概念。
## INFINI Gateway
INFINI Gateway 支持按 IP 地址进行筛选:
- 不开节点发现:只转发到配置文件指定的节点(IP 地址)
- 开启节点发现:转发到所有发现的节点
```plain
flow:
- name: cache_first
filter:
- elasticsearch:
elasticsearch: prod
refresh:
enabled: true
interval: 30s
filter:
hosts:
exclude:
- 192.168.3.201:9200
include:
- 192.168.3.202:9200
- 192.168.3.203:9200
```
此外 Gateway 还支持通过节点角色、节点标签筛选转发节点。
```plain
flow:
- name: cache_first
filter:
- elasticsearch:
elasticsearch: prod
refresh:
enabled: true
interval: 30s
filter:
tags:
exclude:
- temp: cold
include:
- disk: ssd
roles:
exclude:
- master
include:
- data
- ingest
```
多种筛选条件可以同时使用,详细信息请查看官方[文档](https://docs.infinilabs.com/ga ... earch/)。
> 作者:**杨帆**,极限科技(INFINI Labs)高级解决方案架构师、《老杨玩搜索》栏目 B 站 UP 主,拥有十余年金融行业服务工作经验,熟悉 Linux、数据库、网络等领域。目前主要从事 Easysearch、Elasticsearch 等搜索引擎的技术支持工作,服务国内私有化部署的客户。
> 原文:https://infinilabs.cn/blog/202 ... ginx/
是否可以为es的slowlog传入多个id值
kin122 回复了问题 • 2 人关注 • 1 个回复 • 2042 次浏览 • 2025-03-12 08:57
有什么 es 开源工具可以推荐使用
INFINI Labs 小助手 回复了问题 • 2 人关注 • 1 个回复 • 4049 次浏览 • 2025-01-15 14:48
是否可以在forcemerge时指定只清理一个段里的deleted文档,不合并2个段
Fred2000 回复了问题 • 2 人关注 • 1 个回复 • 2852 次浏览 • 2025-01-14 10:14
Elasticseach Ingest 模块&&漏洞分析
Radiancebobo 发表了文章 • 0 个评论 • 2578 次浏览 • 2024-12-23 12:58
本文基于Elasticsearch7.10.2分析
0.Ingest 节点 概述
在实际进行文档index之前,使用采集节点(默认情况下,每个es节点都是ingest)对文档进行预处理。采集节点会拦截bulk和index请求,进行转换,然后将文档传回index或bulk API。
每个索引都有index.default_pipeline 和 index.final_pipeline 两个配置。
他们都是用于指定 Elasticsearch index 或者bulk 文档时要执行的预处理逻辑。
- index.default_pipeline 定义了默认管道,它会在索引文档时首先执行。但如果索引请求中指定了 pipeline 参数,则该参数指定的管道会覆盖默认管道。如果设置了 index.default_pipeline 但对应的管道不存在,索引请求会失败。特殊值 _none 表示不运行任何摄取管道。
- index.final_pipeline 定义了最终管道,它总是在请求管道(如果指定)和默认管道(如果存在)之后执行。如果设置了 index.final_pipeline 但对应的管道不存在,索引请求会失败。特殊值 _none 表示不运行最终管道。
简而言之,default_pipeline 先执行,可被覆盖;final_pipeline 后执行,不可被覆盖。两者都可设为 _none 以禁用。
一个Pipeline可以有多个Processor组成,每个Processor有着各自的不同功能,官方支持的Processor可参考:https://www.elastic.co/guide/e ... .html
一个简单的例子,利用Set Processor 对新增的文档中加入新的字段和值:
java<br /> PUT _ingest/pipeline/set_os<br /> {<br /> "description": "sets the value of host.os.name from the field os",<br /> "processors": [<br /> {<br /> "set": {<br /> "field": "host.os.name", // 增加的属性<br /> "value": "{{os}}" // 这里引用了文档原先的os属性, 这里可以直接填写其他值<br /> }<br /> }<br /> ]<br /> }<br /> <br /> POST _ingest/pipeline/set_os/_simulate<br /> {<br /> "docs": [<br /> {<br /> "_source": {<br /> "os": "Ubuntu"<br /> }<br /> }<br /> ]<br /> }<br />
这样转换之后,文档内容就变成了:
java<br /> {<br /> "host" : {<br /> "os" : {<br /> "name" : "Ubuntu"<br /> }<br /> },<br /> "os" : "Ubuntu"<br /> }<br />
写单个文档的流程概述
当请求,或者索引本身配置有pipline的时候,协调节点就会转发到ingest节点
java<br /> PSOT source_index/_doc?pipeline=set_os<br /> {<br /> "os": "xxxx"<br /> }<br />
【注】 并不是一定会发生内部rpc的请求转发,如果本地节点能接受当前的请求则不会转发到其他节点。
1. 模块总体概述
本小节关注IngestService中重要的相关类,对这些类有一个整体的了解有助于理解该模块。
- ClusterService
- IngestService 实现了 ClusterStateApplier 接口, 这样就能监听和响应集群的状态变化,当集群状态更新时,IngestService可以调整其内部 pipelines完成CRUD。
- 另外IngestService还有List<Consumer
>用来对提供给对集群状态变更之后需要最新状态的插件。
- IngestService 实现了 ClusterStateApplier 接口, 这样就能监听和响应集群的状态变化,当集群状态更新时,IngestService可以调整其内部 pipelines完成CRUD。
- ScriptService
- 某些Processor需要其用于管理和执行脚本,比如Script Processor。
- 某些Processor需要其用于管理和执行脚本,比如Script Processor。
- AnalysisRegistry
- 某些需要对文档内容进行分词处理的Processor。
- 某些需要对文档内容进行分词处理的Processor。
- ThreadPool
- Processor都是异步执行的,实际执行线程池取决于调用上下文(如 write 或 management)
- bulk API 时发生的pipeline 处理使用的是write线程池。
- pipeline/_simulate API 使用的是management线程池,模拟执行通常是短时间的、低频的任务,不需要高并发支持而且为了不影响实际的文档处理或其他重要任务。
- bulk API 时发生的pipeline 处理使用的是write线程池。
- 另外为了避免Grok Processor运行时间过长,使用了Generic线程做定时调度检查执行时间
- Processor都是异步执行的,实际执行线程池取决于调用上下文(如 write 或 management)
- IngestMetric
- 通过实现ReportingService接口来做到展示ingest内部的执行情况。
- GET _nodes/stats?filter_path=nodes.*.ingest 可以查看到ingest 中的每个Pipeline中的执行的次数、失败次数以及总耗时
- 通过实现ReportingService接口来做到展示ingest内部的执行情况。
- IngestPlugin
- Ingest支持加载自定义的Processor插件,系统内置的所有Processor以及自定义的都通过IngestService 中的Map<String, Processor.Factory> processorFactories来进行管理。
- Ingest支持加载自定义的Processor插件,系统内置的所有Processor以及自定义的都通过IngestService 中的Map<String, Processor.Factory> processorFactories来进行管理。
- IngestDocument
- 其包含文档的源数据,提供了修改和查询文档的字段的能力,为Processor灵活操作文档数据提供基础,在后续pipeline的执行中,也是由其的executePipeline方法驱动的。
2. Processor 实现机制
抽象工厂设计模式的应用
Processor接口设计
每个Processor都有核心方法execute,使得处理器能够以统一的方式操作 IngestDocument,并通过多态实现不同处理逻辑。
Processor.Factory的设计
Processor.Factory 是 Processor 的抽象工厂,负责动态创建处理器实例。其主要职责包括:
- 其包含文档的源数据,提供了修改和查询文档的字段的能力,为Processor灵活操作文档数据提供基础,在后续pipeline的执行中,也是由其的executePipeline方法驱动的。
- 动态实例化:
- Processor create() 方法接收处理器配置并创建具体的处理器。
- 支持递归创建,例如ConditionalProcessor可以嵌套其他处理器。
- Processor create() 方法接收处理器配置并创建具体的处理器。
- 依赖注入:
- Processor.Parameters 提供了一组服务和工具(如 ScriptService),工厂可以利用这些依赖创建复杂的Processor。
Processor.Factory的集中管理
在 IngestService 中,通过 Map<String, Processor.Factory> processorFactories 集中管理所有处理器工厂。这种管理方式提供了以下优势:
动态扩展:
- Processor.Parameters 提供了一组服务和工具(如 ScriptService),工厂可以利用这些依赖创建复杂的Processor。
- 插件可以注册自定义Processor工厂。
- 新处理器类型的注册仅需添加到 processorFactories。
组合以及装饰器设计模式的应用
Processor经典的几个类的关系如下:
CompoundProcessor是经典的组合设计模式,Pipeline这个类可以像使用单个 Processor 一样调用 CompoundProcessor,无需关注其内部具体细节。
而ConditionalProcessor 以及TrackingResultProcessor则体现了装饰器模式,在不改变原有对象的情况下扩展功能:
- ConditionalProcessor 在执行Processor前会调用evaluate方法判断是否需要执行。
- TrackingResultProcessor中decorate是为 CompoundProcessor 及其内部的 Processor 添加跟踪功能。
如何自定义Processor 插件
自定义 Processor 插件的注册方式为实现 IngestPlugin (Elasticsearch 提供不同的插件接口用来扩展不同类型的功能)的 getProcessors 方法,该方法返回一个工厂列表,IngestService 会将这些工厂注入到 processorFactories 中。
分析完代码之后,我们回到实战中来,简单起见,我们实现类似Append的Processor,但是我们这个更简单,输入的是字符串,然后我们用,分割一下将其作为数组设为值。
java<br /> import org.elasticsearch.ingest.Processor;<br /> import org.elasticsearch.plugins.IngestPlugin;<br /> import org.elasticsearch.plugins.Plugin;<br /> import java.util.HashMap;<br /> import java.util.Map;<br /> public class AddArrayProcessorPlugin extends Plugin implements IngestPlugin {<br /> @Override<br /> public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {<br /> Map<String, Processor.Factory> processors = new HashMap<>();<br /> processors.put(AddArrayProcessor.TYPE, new AddArrayProcessor.Factory());<br /> return processors;<br /> }<br /> }<br />
java<br /> import org.elasticsearch.ingest.AbstractProcessor;<br /> import org.elasticsearch.ingest.ConfigurationUtils;<br /> import org.elasticsearch.ingest.IngestDocument;<br /> import org.elasticsearch.ingest.Processor;<br /> import java.util.*;<br /> <br /> public class AddArrayProcessor extends AbstractProcessor {<br /> public static final String TYPE = "add_array";<br /> public String field;<br /> public String value;<br /> protected AddArrayProcessor(String tag, String description, String field, String value) {<br /> super(tag, description);<br /> this.field = field;<br /> this.value = value;<br /> }<br /> @Override<br /> public IngestDocument execute(IngestDocument ingestDocument) throws Exception {<br /> List valueList = new ArrayList<>(Arrays.asList(value.split(",")));<br /> ingestDocument.setFieldValue(field, valueList);<br /> return ingestDocument;<br /> }<br /> @Override<br /> public String getType() {<br /> return TYPE;<br /> }<br /> public static final class Factory implements Processor.Factory {<br /> @Override<br /> public Processor create(Map<String, Processor.Factory> processorFactories, String tag,<br /> String description, Map<String, Object> config) throws Exception {<br /> String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");<br /> String value = ConfigurationUtils.readStringProperty(TYPE, tag, config, "value");<br /> return new AddArrayProcessor(tag, description, field, value);<br /> }<br /> }<br /> }<br />
打包之后,我们去install我们的Processor插件:
java<br /> bin/elasticsearch-plugin install file:///home/hcb/data/data/es/data_standalone/elasticsearch-7.10.2-SNAPSHOT/AddArrayProcessor-1.0-SNAPSHOT.zip <br /> warning: no-jdk distributions that do not bundle a JDK are deprecated and will be removed in a future release<br /> -> Installing file:///home/hcb/data/data/es/data_standalone/elasticsearch-7.10.2-SNAPSHOT/AddArrayProcessor-1.0-SNAPSHOT.zip<br /> -> Downloading file:///home/hcb/data/data/es/data_standalone/elasticsearch-7.10.2-SNAPSHOT/AddArrayProcessor-1.0-SNAPSHOT.zip<br /> [=================================================] 100% <br /> -> Installed AddArrayProcess<br />
测试一下:
java<br /> POST /_ingest/pipeline/_simulate<br /> {<br /> "pipeline": {<br /> "processors": [<br /> {<br /> "add_array": {<br /> "field": "test_arr",<br /> "value": "a,b,c,d,e,f"<br /> }<br /> }<br /> ]<br /> },<br /> "docs": [<br /> {<br /> "_index": "test",<br /> "_id": "1",<br /> "_source": {<br /> "field": "value"<br /> }<br /> }<br /> ]<br /> }<br /> docs结果:<br /> "_source" : {<br /> "field" : "value",<br /> "test_arr" : [<br /> "a",<br /> "b",<br /> "c",<br /> "d",<br /> "e",<br /> "f"<br /> ]<br /> }<br />
3. Pipeline设计
如何管理Pipeline
在IngestServcie中有一个Map存储所有的Pipeline实例,private volatile Map<String, PipelineHolder> pipelines = Collections.emptyMap();
这里并没有将Pipeline实例存储在IngestMetadata,这样做的原因有2个:
- 在 Elasticsearch 的启动过程中,插件和节点服务的初始化发生在 ClusterState 加载之后, 只有等所有插件完成加载后,所有的Processor工厂才会被注册到系统中,这意味着在集群状态初始化之前,Processor工厂并不可用。
- ClusterState 中的元数据结构是静态注册的,即在类加载时已经确定,如果要将运行时示例存储进去,必须改变 ClusterState 的元数据结构存储格式,这个很不方便维护,而且这在集群状态同步的时候也会带来不必要的序列化以及反序列化。
所以Pipeline的管理逻辑是:
- 在 Elasticsearch 的启动过程中,插件和节点服务的初始化发生在 ClusterState 加载之后, 只有等所有插件完成加载后,所有的Processor工厂才会被注册到系统中,这意味着在集群状态初始化之前,Processor工厂并不可用。
- ClusterState 中的IngestMetadata 只存储 JSON 格式的管道定义,描述 Pipeline 的配置(例如,包含哪些 Processor,它们的参数等)。
- IngestService 中维护的运行时实例,将 JSON 配置解析为完整的 Pipeline 对象,包括处理器链
在集群变更时, pipeline的CRUD的实现在innerUpdatePipelines方法中 。
责任链设计模式的应用
管道的执行通过IngestDocument的org.elasticsearch.ingest.IngestDocument#executePipeline 去驱动,每个文档的每个Pipeline都会进入到这个函数, 而每个Pipeline有组装好的CompoundProcessor,实际的链式调用是在CompoundProcessor中。
简图大致如下:
这里拿其和Zookeeper(3.6.3)中RequestProcessor 做一些对比:
| 特点 | ES中的CompoundProcessor | Zookeeper中RequestProcessor |
| --- | --- | --- |
| 链的存储定义 | 由两条列表组成,分别保存正常流程以及失败流程的Processor列表。 | 固定的链式结构。 |
| 异步执行 | 支持异步回调,可以异步执行链中的处理器。 | 同步执行 。 |
| 失败处理 | 提供专门的 onFailureProcessors 作为失败处理链。如果不忽略异常并且onFailureProcessors 不为空则会执行失败处理链逻辑。 | 没有专门的失败处理链,异常直接交由上层捕获。 |
| 可配置性 | 可动态调整处理器链和配置(如 ignoreFailure)。 | 代码中写死,无法配置 |
这里并没有说Zookeeper的设计就差于Elasticsearch, 只是设计目标有所不同,RequestProcessor就是适合集中式的强一致、其中Processor并不需要灵活变化,而CompoundProcessor就是适合高并发而Procesor灵活变化场景。
4. Ingest实战建议
回到实战中来,这里结合目前所分析的内容给出相应的实战建议。
- 建立监控
- 对于关键的Piepline,我们需要通过GET _nodes/stats?filter_path=nodes.*.ingest 获取其运行状况,识别延迟或失败的 Processor。
- 文档写入的时候使用的是write线程池,我们也需要监控GET _nodes/stats?filter_path=nodes.*.thread_pool.write 的queue和write_rejections 判断需要扩展线程池。
- 对于关键的Piepline,我们需要通过GET _nodes/stats?filter_path=nodes.*.ingest 获取其运行状况,识别延迟或失败的 Processor。
- 优化Processor
- 减少高开销操作,优先使用内置 Processor,比如script Processor 可适时替换set, append,避免过度复杂的正则表达式或嵌套逻辑。
- 自定义的Processor尽量优化,比如如果涉及查询外部系统可考虑引入缓存。
- 减少高开销操作,优先使用内置 Processor,比如script Processor 可适时替换set, append,避免过度复杂的正则表达式或嵌套逻辑。
- 建立单独的Ingest节点
- 如果有大量的Pipeline需要执行,则可以考虑增加专用 Ingest 节点,避免与数据节点争夺资源。
5. 漏洞&&修复分析
这里分析7.10.2版本Ingest模块存在的漏洞以及官方是如何修复的。
CVE-2021-22144
https://discuss.elastic.co/t/e ... 78100
Elasticsearch Grok 解析器中发现了一个不受控制的递归漏洞,该漏洞可能导致拒绝服务攻击。能够向 Elasticsearch 提交任意查询的用户可能会创建恶意 Grok 查询,从而导致 Elasticsearch 节点崩溃。
漏洞复现
发起这个请求:
- 如果有大量的Pipeline需要执行,则可以考虑增加专用 Ingest 节点,避免与数据节点争夺资源。
- patterns: 处理字段时使用的 Grok 模式,这里设置为 %{INT}。
- pattern_definitions: 定义自定义 Grok 模式,这里故意让 INT 模式递归引用自身,导致循环引用问题。
go<br /> POST /_ingest/pipeline/_simulate<br /> {<br /> "pipeline": {<br /> "processors": [<br /> {<br /> "grok": {<br /> "field": "message",<br /> "patterns": [<br /> "%{INT}"<br /> ],<br /> "pattern_definitions": {<br /> "INT": "%{INT}"<br /> }<br /> }<br /> }<br /> ]<br /> },<br /> "docs": [<br /> {<br /> "_source": {<br /> "message": "test"<br /> }<br /> }<br /> ]<br /> }<br /> <br />
当执行之后会使得节点直接StackOverflow中止进程。
修复逻辑
这个问题的关键在于原先的逻辑中,只会对间接的递归引用(pattern1 => pattern2 => pattern3 => pattern1)做了检测,但是没有对直接的自引用(pattern1 => pattern1 )做检测。
java<br /> private void forbidCircularReferences() {<br /> // 这个是增加的逻辑,检测直接的自引用<br /> for (Map.Entry<String, String> entry : patternBank.entrySet()) {<br /> if (patternReferencesItself(entry.getValue(), entry.getKey())) {<br /> throw new IllegalArgumentException("circular reference in pattern [" + entry.getKey() + "][" + entry.getValue() + "]");<br /> }<br /> }<br /> <br /> // 间接递归引用检测(这个是原先的逻辑)<br /> for (Map.Entry<String, String> entry : patternBank.entrySet()) {<br /> String name = entry.getKey();<br /> String pattern = entry.getValue();<br /> innerForbidCircularReferences(name, new ArrayList<>(), pattern);<br /> }<br /> }<br />
CVE-2023-46673
https://discuss.elastic.co/t/e ... 47708
漏洞复现
尝试了很多已有的Processor都没有复现,我们这使用自定义的Processor来复现,将之前的自定义AddArrayProcessor加一行代码:
java<br /> @Override<br /> public IngestDocument execute(IngestDocument ingestDocument) throws Exception {<br /> List valueList = new ArrayList<>(Arrays.asList(value.split(",")));<br /> valueList.add(valueList); // 增加的代码<br /> ingestDocument.setFieldValue(field, valueList);<br /> return ingestDocument;<br /> }<br />
重新编译再安装插件之后,执行改Processor 将会StackOverflow。
修复逻辑
这个问题的关键在于在IngestDocument的deepCopyMap的方法之前没有判断这样的无限引用的情况:
那么在此之前做一个检测就好了,这个方法在原本的ES代码中就存在:org.elasticsearch.common.util.CollectionUtils#ensureNoSelfReferences(java.lang.Object, java.lang.String) ,其利用 IdentityHashMap 记录已访问对象的引用,检测并防止对象间的循环引用。
CVE-2024-23450
https://discuss.elastic.co/t/e ... 56314
漏洞复现
虽然我们的索引只有2个Pipeline的配置,但是由于Pipeline Processor的存在,所以实际上一个文档其实能被很多Pipeline处理,当需要执行足够多个的pipline个数时,则会发生StackOverflow。
修复逻辑
这个问题的关键在于对Pipeline的个数并没有限制,添加一个配置项,当超出该个数则直接抛出异常。
java<br /> public static final int MAX_PIPELINES = Integer.parseInt(System.getProperty("es.ingest.max_pipelines", "100"));<br />
IngestDocument的org.elasticsearch.ingest.IngestDocument#executePipeline 添加逻辑:
java<br /> public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {<br /> if (executedPipelines.size() >= MAX_PIPELINES) {<br /> handler.accept(<br /> null,<br /> new IllegalStateException(PIPELINE_TOO_MANY_ERROR_MESSAGE + MAX_PIPELINES + " nested pipelines")<br /> );<br /> } <br />
思考: 这里判断pipeline是否超出100个限制是用已经执行的pipeline个数来计算的。 假设已经超出100个pipeline,那这100个pipeline是会白跑的, 如果能在真正执行之前分析需要执行的Pipeline个数会更好。
6. 总结
Ingest 模块作为 Elasticsearch 数据处理流程的重要组成部分,提供了灵活的管道化能力,使得用户能够在数据写入前进行丰富的预处理操作。然而,在实际场景中,Ingest 模块也面临性能瓶颈、资源竞争等挑战,需要结合业务需求和系统现状进行精细化调优,通过优化 Processor 执行效率、合理规划集群架构以及增强监控与诊断手段,我们可以充分释放 Ingest 模块的能力,提升 Elasticsearch 的整体数据处理能力。
ES官方版本为什么到现在为止没有提供限流的功能?
Fred2000 回复了问题 • 2 人关注 • 1 个回复 • 2483 次浏览 • 2024-12-19 19:18
es filter script获取nested结构数据如何实现?
duanxiaobiao 回复了问题 • 2 人关注 • 1 个回复 • 3935 次浏览 • 2024-11-16 22:04
目前市面上常用的ES压测工具是啥
wu370324 回复了问题 • 3 人关注 • 2 个回复 • 3947 次浏览 • 2024-10-31 11:51
ES的_sql查询不返回长度超过ingore_above记录
kin122 回复了问题 • 2 人关注 • 1 个回复 • 4490 次浏览 • 2024-12-07 23:08
es 中keyword查询构建bitSet成本
Fred2000 回复了问题 • 2 人关注 • 1 个回复 • 5553 次浏览 • 2024-09-25 18:30
_reindex时怎样判断当目标记录中某个字段值大于原记录字段时跳过不更新?
kin122 回复了问题 • 3 人关注 • 2 个回复 • 4633 次浏览 • 2024-12-07 23:13
Elastic 宣布修改开源协议为 AGPL:Elasticsearch 再次成为开源软件
searchkit 发表了文章 • 0 个评论 • 6077 次浏览 • 2024-08-30 11:26
今日快讯!就在刚刚,开源搜索领域行业巨头 Elastic 官方博客发表了一篇最新公告《Elasticsearch is Open Source, Again》,Elastic 创始人& CTO Shay Banon 宣布 Elasticsearch 和 Kibana 许可证协议修改为 AGPL。
以下为搜索客社区从 Elastic 官方博客翻译的原文内容:
---
Elasticsearch 再次成为开源软件
[D.N.A] Elasticsearch 和 Kibana 现在可以再次被称为开源软件了。这句话让我感到无比的兴奋。真的忍不住跳起来庆祝!我们所有在 Elastic 的人都很高兴。开源精神是我的DNA,也是 Elastic 的DNA。能够再次称 Elasticsearch 为开源软件,真的是一种纯粹的快乐。
[LOVE.] 简而言之,我们将在接下来的几周内,除了 ELv2 和 SSPL 之外,增加 AGPL 作为另一个许可选项。在更改许可后,我们从未停止过像一个开源社区一样相信和行动。但通过使用 AGPL 这一获得开放源码促进会(OSI)批准的许可,消除了人们可能存在的任何疑问或混淆。
[Not Like Us] 我们在 Elastic 从未停止过对开源的信仰。我个人对开源的信仰也从未动摇,至今已有 25 年了。那么为什么三年前我们要做出改变呢?我们遇到了与 AWS 相关的问题,以及他们的服务引发的市场混乱。在尝试了所有能想到的其他选项后,我们更改了许可协议,明知道这会导致 Elasticsearch 被分叉成另一个名称并走上不同的发展轨迹。这是一个漫长的故事。
[Like That] 好消息是,虽然过程痛苦,但结果奏效了。三年后,亚马逊已经完全投入到了他们的分叉项目中,市场的混乱(大部分)得到了缓解,我们与 AWS 的合作伙伴关系比以往更强。我们甚至被评为 AWS 的年度合作伙伴。我一直希望时间能过去得足够久,以至于我们可以安全地回到开源项目的状态——现在终于到了。
[All The Stars] 我们希望尽可能简化用户的使用体验。我们有用户非常喜欢 ELv2(一个受 BSD 启发的许可)。我们也有用户已经批准使用 SSPL(通过 MongoDB 使用)。这就是为什么我们只是增加了一个选项,而不是移除任何东西。如果你已经在使用并喜欢 Elasticsearch,请继续使用,没有任何变化。对于其他人,现在你也可以选择使用 AGPL。
[LOYALTY.] 我们选择 AGPL 而不是其他许可,是因为我们希望通过与 OSI 的合作,能在开源许可领域创造更多的选项。随着我们更改许可以来的发展(例如 Grafana 从 Apache2 转移到 AGPL),也许 AGPL 已经足够适用于像我们这样的基础设施软件了。我们致力于找到最合适的解决方案。
[euphoria] 我非常高兴能够再次称 Elasticsearch 为开源软件。
[Alright] 任何改变都可能引发混淆,当然也可能引来一些网络喷子。(网络喷子总是存在的,对吧?)让我们愉快地回答一些可能出现的问题吧。我可以想象到的一些问题如下,但我们会继续补充。
“更改许可是个错误,Elastic 现在在回撤”:我们在三年前更改许可时消除了很多市场混乱。由于我们的行动,很多事情已经改变。现在是一个完全不同的市场环境。我们不生活在过去。我们希望为用户打造一个更好的未来。正是因为我们当时采取了行动,现在我们才有能力采取新的行动。
“AGPL 不是真正的开源许可,X 才是”:AGPL 是获得 OSI 批准的许可,并且被广泛采用。例如,MongoDB 曾经使用 AGPL,Grafana 现在也是 AGPL。这表明 AGPL 并不影响使用或流行度。我们选择 AGPL 是因为我们认为这是与 OSI 一起为世界开辟更多开源道路的最佳方式,而不是减少开源。
“Elastic 更改许可是因为他们表现不好”:我要说的是,我今天对 Elastic 的未来依然充满期待。我为我们的产品和团队的执行力感到无比自豪。我们发布了无状态的 Elasticsearch ES|QL 和大量用于 GenAI 用例的向量数据库/混合搜索改进。我们在日志记录和可观察性方面大力投入 OTel。我们的安全产品 SIEM 不断添加令人惊叹的功能,并且是市场上增长最快的产品之一。用户的反应让我们感到非常谦卑。股市总有起伏,但我可以向你保证,我们始终着眼于长远发展,而这次的变更就是其中的一部分。
如果我们看到更多问题,会在上面继续添加,以期减少混淆。
[HUMBLE.] 为未来而构建真是令人兴奋。Elasticsearch 回归开源。万岁!这真是一件美妙的事情。今天真是美好的一天。
Forever :elasticheart: Open Source
Shay 2024-08-30
---
原文地址:https://www.elastic.co/blog/el ... gain/
社区热评
Elasticsearch 再次回归开源的消息迅速引发了技术社区的广泛关注。这不仅是 Elastic 对自身开源信念的重申,也是其在激烈市场竞争中精心策划的一次战略调整。
三年前,Elastic 因与 AWS 的市场竞争而选择更改许可协议,这一决定在当时引发了不小的争议。尽管如此,事实证明,这一策略有效地减少了市场上的混淆,也为 Elastic 与 AWS 的合作奠定了更坚实的基础。如今,Elastic 再度选择开源,并新增 AGPL 作为许可选项,这一举措无疑展现了 Elastic 在市场中更加成熟的定位和对未来发展的自信。
这一变化不仅仅是一个公司的商业决策,更是开源生态系统的一次重要信号。Elastic 的回归开源,传递出一个明确的信息:即使在商业竞争中,开源仍然是企业实现长远发展的重要路径。随着这一决定的落地,其他软件公司可能也会重新审视自身的许可策略,推动更多开源项目的发展与创新。
此外,Elastic 选择 AGPL 作为新许可选项,也显示出其对开源生态未来走向的深刻洞察。AGPL 的引入,表明 Elastic 希望在开源社区中保持灵活性和多样性,同时推动整个行业向更加开放和透明的方向迈进。
总体而言,Elastic 重返开源的举动,不仅意在巩固其在开源社区中的地位,也为行业树立了一个新的标杆。这一事件无疑将成为开源软件发展史上的重要篇章,未来可能会激发更多企业重新考虑其开源战略,进而推动整个技术行业的进一步繁荣与进步。
让我们拭目以待!
ES在匹配结果数据量较少时,使用sort排序占用资源吗?
Fred2000 回复了问题 • 2 人关注 • 1 个回复 • 3572 次浏览 • 2024-08-28 10:16
【第3期】2024 搜索客 Meetup | Elasticsearch 的代码结构和写入查询流程的解读 - 下篇
searchkit 发表了文章 • 0 个评论 • 3931 次浏览 • 2024-08-21 13:00
本次活动由 搜索客社区、极限科技(INFINI Labs)联合举办,活动主题将深入探讨 Elasticsearch 的两个核心方面:代码结构以及写入和查询的关键流程。本次活动将为 Elasticsearch 初学者和有经验的用户提供宝贵的见解,欢迎大家报名参加、交流学习。
活动主题:Elasticsearch 的代码结构和写入查询流程的解读 - 下篇
活动时间:2024 年 8 月 28 日 19:00-20:00(周三)
活动形式:微信视频号(极限实验室)直播
报名方式:关注或扫码海报中的二维码进行预约

嘉宾介绍
张磊,极限科技搜索引擎研发负责人,对 Elasticsearch 和 Lucene 源码比较熟悉,目前主要负责公司的 Easysearch 产品的研发以及客户服务工作。
主题摘要
本次分享将探讨 Elasticsearch 的代码结构及其写入和查询流程。内容包括:项目架构、核心模块、插件系统,以及文档索引和查询的各个阶段与分布式查询协调。
活动亮点
- 深入解析 Elasticsearch 代码结构:
- 项目的整体结构:了解 Elasticsearch 项目的组织方式。
- 核心模块及其职责:学习 Elasticsearch 的核心模块及其在系统中的具体角色。
- 插件系统的设计:探索 Elasticsearch 灵活的插件系统设计及其扩展方式。
- 项目的整体结构:了解 Elasticsearch 项目的组织方式。
- 详细解读 Elasticsearch 写入和查询流程:
- 文档索引过程的各个阶段:跟随文档从初始接收至最终存储的索引过程。
- 查询解析和执行的步骤:理解 Elasticsearch 中查询解析和执行的各个步骤。
- 分布式查询的协调过程:学习 Elasticsearch 如何在分布式架构中协调查询,以提供高效且准确的搜索结果。
参与有奖
直播中将设有福袋抽奖环节,参与就有机会获得 INFINI Labs 周边纪念品,包括 T 恤、鸭舌帽、咖啡杯、指甲刀套件等等(图片仅供参考,款式、颜色与尺码随机)。

活动交流
本活动设有 Meetup 技术交流群,可添加小助手微信入群。

本次 Meetup 是深入了解 Elasticsearch 内部工作机制、提升使用技能的绝佳机会。不要错过!
Meetup 讲师招募

搜索客社区 Meetup 的成功举办,离不开社区小伙伴的热情参与。目前社区讲师招募计划也在持续进行中,我们诚挚邀请各位技术大咖、行业精英踊跃提交演讲议题,与大家分享您的经验。
讲师报名链接:[http://cfp.searchkit.cn](http://cfp.searchkit.cn)
或扫描下方二维码,立刻报名成为讲师!

Meetup 活动聚焦 AI 与搜索领域的最新动态,以及数据实时搜索分析、向量检索、技术实践与案例分析、日志分析、安全等领域的深度探讨。
我们热切期待您的精彩分享!
往期回顾
- [【第 2 期】2024 搜索客 Meetup | Elasticsearch 的代码结构和写入查询流程的解读 - 上篇](https://mp.weixin.qq.com/s/oQNej6aDMzLp64_AKxSONw)
- [【第 1 期】2024 搜索客 Meetup | Easysearch 结合大模型实现 RAG](https://mp.weixin.qq.com/s/7fpPFGKltJASspmIaUqJMg)
关于 搜索客(SearchKit)社区
搜索客社区由 Elasticsearch 中文社区进行全新的品牌升级,以新的 Slogan:“搜索人自己的社区” 为宣言。汇集搜索领域最新动态、精选干货文章、精华讨论、文档资料、翻译与版本发布等,为广大搜索领域从业者提供更为丰富便捷的学习和交流平台。
社区官网:[https://searchkit.cn](https://searchkit.cn) 。
- 文档索引过程的各个阶段:跟随文档从初始接收至最终存储的索引过程。