本文基于Elasticsearch7.10.2分析
0.Ingest 节点 概述
在实际进行文档index之前,使用采集节点(默认情况下,每个es节点都是ingest)对文档进行预处理。采集节点会拦截bulk和index请求,进行转换,然后将文档传回index或bulk API。
每个索引都有index.default_pipeline 和 index.final_pipeline 两个配置。
他们都是用于指定 Elasticsearch index 或者bulk 文档时要执行的预处理逻辑。
- index.default_pipeline 定义了默认管道,它会在索引文档时首先执行。但如果索引请求中指定了 pipeline 参数,则该参数指定的管道会覆盖默认管道。如果设置了 index.default_pipeline 但对应的管道不存在,索引请求会失败。特殊值 _none 表示不运行任何摄取管道。
- index.final_pipeline 定义了最终管道,它总是在请求管道(如果指定)和默认管道(如果存在)之后执行。如果设置了 index.final_pipeline 但对应的管道不存在,索引请求会失败。特殊值 _none 表示不运行最终管道。
简而言之,default_pipeline 先执行,可被覆盖;final_pipeline 后执行,不可被覆盖。两者都可设为 _none 以禁用。
一个Pipeline可以有多个Processor组成,每个Processor有着各自的不同功能,官方支持的Processor可参考:https://www.elastic.co/guide/en/elasticsearch/reference/7.10/ingest-processors.html
一个简单的例子,利用Set Processor 对新增的文档中加入新的字段和值:
PUT _ingest/pipeline/set_os
{
"description": "sets the value of host.os.name from the field os",
"processors": [
{
"set": {
"field": "host.os.name", // 增加的属性
"value": "{{os}}" // 这里引用了文档原先的os属性, 这里可以直接填写其他值
}
}
]
}
POST _ingest/pipeline/set_os/_simulate
{
"docs": [
{
"_source": {
"os": "Ubuntu"
}
}
]
}
这样转换之后,文档内容就变成了:
{
"host" : {
"os" : {
"name" : "Ubuntu"
}
},
"os" : "Ubuntu"
}
写单个文档的流程概述
当请求,或者索引本身配置有pipline的时候,协调节点就会转发到ingest节点
PSOT source_index/_doc?pipeline=set_os
{
"os": "xxxx"
}
【注】 并不是一定会发生内部rpc的请求转发,如果本地节点能接受当前的请求则不会转发到其他节点。
1. 模块总体概述
本小节关注IngestService中重要的相关类,对这些类有一个整体的了解有助于理解该模块。
- ClusterService
- IngestService 实现了 ClusterStateApplier 接口, 这样就能监听和响应集群的状态变化,当集群状态更新时,IngestService可以调整其内部 pipelines完成CRUD。
- 另外IngestService还有List<Consumer
>用来对提供给对集群状态变更之后需要最新状态的插件。
- ScriptService
- 某些Processor需要其用于管理和执行脚本,比如Script Processor。
- AnalysisRegistry
- 某些需要对文档内容进行分词处理的Processor。
- ThreadPool
- Processor都是异步执行的,实际执行线程池取决于调用上下文(如 write 或 management)
- bulk API 时发生的pipeline 处理使用的是write线程池。
- pipeline/_simulate API 使用的是management线程池,模拟执行通常是短时间的、低频的任务,不需要高并发支持而且为了不影响实际的文档处理或其他重要任务。
- 另外为了避免Grok Processor运行时间过长,使用了Generic线程做定时调度检查执行时间
- Processor都是异步执行的,实际执行线程池取决于调用上下文(如 write 或 management)
- IngestMetric
- 通过实现ReportingService接口来做到展示ingest内部的执行情况。
- GET _nodes/stats?filter_path=nodes.*.ingest 可以查看到ingest 中的每个Pipeline中的执行的次数、失败次数以及总耗时
- IngestPlugin
- Ingest支持加载自定义的Processor插件,系统内置的所有Processor以及自定义的都通过IngestService 中的Map<String, Processor.Factory> processorFactories来进行管理。
- IngestDocument
- 其包含文档的源数据,提供了修改和查询文档的字段的能力,为Processor灵活操作文档数据提供基础,在后续pipeline的执行中,也是由其的executePipeline方法驱动的。
2. Processor 实现机制
抽象工厂设计模式的应用
Processor接口设计
每个Processor都有核心方法execute,使得处理器能够以统一的方式操作 IngestDocument,并通过多态实现不同处理逻辑。
Processor.Factory的设计
Processor.Factory 是 Processor 的抽象工厂,负责动态创建处理器实例。其主要职责包括:
- 动态实例化:
- Processor create() 方法接收处理器配置并创建具体的处理器。
- 支持递归创建,例如ConditionalProcessor可以嵌套其他处理器。
- 依赖注入:
- Processor.Parameters 提供了一组服务和工具(如 ScriptService),工厂可以利用这些依赖创建复杂的Processor。
Processor.Factory的集中管理
在 IngestService 中,通过 Map<String, Processor.Factory> processorFactories 集中管理所有处理器工厂。这种管理方式提供了以下优势:
动态扩展:
- 插件可以注册自定义Processor工厂。
- 新处理器类型的注册仅需添加到 processorFactories。
组合以及装饰器设计模式的应用
Processor经典的几个类的关系如下:
CompoundProcessor是经典的组合设计模式,Pipeline这个类可以像使用单个 Processor 一样调用 CompoundProcessor,无需关注其内部具体细节。
而ConditionalProcessor 以及TrackingResultProcessor则体现了装饰器模式,在不改变原有对象的情况下扩展功能:
- ConditionalProcessor 在执行Processor前会调用evaluate方法判断是否需要执行。
- TrackingResultProcessor中decorate是为 CompoundProcessor 及其内部的 Processor 添加跟踪功能。
如何自定义Processor 插件
自定义 Processor 插件的注册方式为实现 IngestPlugin (Elasticsearch 提供不同的插件接口用来扩展不同类型的功能)的 getProcessors 方法,该方法返回一个工厂列表,IngestService 会将这些工厂注入到 processorFactories 中。
分析完代码之后,我们回到实战中来,简单起见,我们实现类似Append的Processor,但是我们这个更简单,输入的是字符串,然后我们用,分割一下将其作为数组设为值。
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import java.util.HashMap;
import java.util.Map;
public class AddArrayProcessorPlugin extends Plugin implements IngestPlugin {
@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
Map<String, Processor.Factory> processors = new HashMap<>();
processors.put(AddArrayProcessor.TYPE, new AddArrayProcessor.Factory());
return processors;
}
}
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import java.util.*;
public class AddArrayProcessor extends AbstractProcessor {
public static final String TYPE = "add_array";
public String field;
public String value;
protected AddArrayProcessor(String tag, String description, String field, String value) {
super(tag, description);
this.field = field;
this.value = value;
}
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
List valueList = new ArrayList<>(Arrays.asList(value.split(",")));
ingestDocument.setFieldValue(field, valueList);
return ingestDocument;
}
@Override
public String getType() {
return TYPE;
}
public static final class Factory implements Processor.Factory {
@Override
public Processor create(Map<String, Processor.Factory> processorFactories, String tag,
String description, Map<String, Object> config) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
String value = ConfigurationUtils.readStringProperty(TYPE, tag, config, "value");
return new AddArrayProcessor(tag, description, field, value);
}
}
}
打包之后,我们去install我们的Processor插件:
bin/elasticsearch-plugin install file:///home/hcb/data/data/es/data_standalone/elasticsearch-7.10.2-SNAPSHOT/AddArrayProcessor-1.0-SNAPSHOT.zip
warning: no-jdk distributions that do not bundle a JDK are deprecated and will be removed in a future release
-> Installing file:///home/hcb/data/data/es/data_standalone/elasticsearch-7.10.2-SNAPSHOT/AddArrayProcessor-1.0-SNAPSHOT.zip
-> Downloading file:///home/hcb/data/data/es/data_standalone/elasticsearch-7.10.2-SNAPSHOT/AddArrayProcessor-1.0-SNAPSHOT.zip
[=================================================] 100%
-> Installed AddArrayProcess
测试一下:
POST /_ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
{
"add_array": {
"field": "test_arr",
"value": "a,b,c,d,e,f"
}
}
]
},
"docs": [
{
"_index": "test",
"_id": "1",
"_source": {
"field": "value"
}
}
]
}
docs结果:
"_source" : {
"field" : "value",
"test_arr" : [
"a",
"b",
"c",
"d",
"e",
"f"
]
}
3. Pipeline设计
如何管理Pipeline
在IngestServcie中有一个Map存储所有的Pipeline实例,private volatile Map<String, PipelineHolder> pipelines = Collections.emptyMap();
这里并没有将Pipeline实例存储在IngestMetadata,这样做的原因有2个:
- 在 Elasticsearch 的启动过程中,插件和节点服务的初始化发生在 ClusterState 加载之后, 只有等所有插件完成加载后,所有的Processor工厂才会被注册到系统中,这意味着在集群状态初始化之前,Processor工厂并不可用。
- ClusterState 中的元数据结构是静态注册的,即在类加载时已经确定,如果要将运行时示例存储进去,必须改变 ClusterState 的元数据结构存储格式,这个很不方便维护,而且这在集群状态同步的时候也会带来不必要的序列化以及反序列化。
所以Pipeline的管理逻辑是:
- ClusterState 中的IngestMetadata 只存储 JSON 格式的管道定义,描述 Pipeline 的配置(例如,包含哪些 Processor,它们的参数等)。
- IngestService 中维护的运行时实例,将 JSON 配置解析为完整的 Pipeline 对象,包括处理器链
在集群变更时, pipeline的CRUD的实现在innerUpdatePipelines方法中 。
责任链设计模式的应用
管道的执行通过IngestDocument的org.elasticsearch.ingest.IngestDocument#executePipeline 去驱动,每个文档的每个Pipeline都会进入到这个函数, 而每个Pipeline有组装好的CompoundProcessor,实际的链式调用是在CompoundProcessor中。
简图大致如下:
这里拿其和Zookeeper(3.6.3)中RequestProcessor 做一些对比:
特点 | ES中的CompoundProcessor | Zookeeper中RequestProcessor |
---|---|---|
链的存储定义 | 由两条列表组成,分别保存正常流程以及失败流程的Processor列表。 | 固定的链式结构。 |
异步执行 | 支持异步回调,可以异步执行链中的处理器。 | 同步执行 。 |
失败处理 | 提供专门的 onFailureProcessors 作为失败处理链。如果不忽略异常并且onFailureProcessors 不为空则会执行失败处理链逻辑。 | 没有专门的失败处理链,异常直接交由上层捕获。 |
可配置性 | 可动态调整处理器链和配置(如 ignoreFailure)。 | 代码中写死,无法配置 |
这里并没有说Zookeeper的设计就差于Elasticsearch, 只是设计目标有所不同,RequestProcessor就是适合集中式的强一致、其中Processor并不需要灵活变化,而CompoundProcessor就是适合高并发而Procesor灵活变化场景。
4. Ingest实战建议
回到实战中来,这里结合目前所分析的内容给出相应的实战建议。
- 建立监控
- 对于关键的Piepline,我们需要通过GET _nodes/stats?filter_path=nodes.*.ingest 获取其运行状况,识别延迟或失败的 Processor。
- 文档写入的时候使用的是write线程池,我们也需要监控GET _nodes/stats?filter_path=nodes.*.thread_pool.write 的queue和write_rejections 判断需要扩展线程池。
- 优化Processor
- 减少高开销操作,优先使用内置 Processor,比如script Processor 可适时替换set, append,避免过度复杂的正则表达式或嵌套逻辑。
- 自定义的Processor尽量优化,比如如果涉及查询外部系统可考虑引入缓存。
- 建立单独的Ingest节点
- 如果有大量的Pipeline需要执行,则可以考虑增加专用 Ingest 节点,避免与数据节点争夺资源。
5. 漏洞&&修复分析
这里分析7.10.2版本Ingest模块存在的漏洞以及官方是如何修复的。
CVE-2021-22144
https://discuss.elastic.co/t/elasticsearch-7-13-3-and-6-8-17-security-update/278100
Elasticsearch Grok 解析器中发现了一个不受控制的递归漏洞,该漏洞可能导致拒绝服务攻击。能够向 Elasticsearch 提交任意查询的用户可能会创建恶意 Grok 查询,从而导致 Elasticsearch 节点崩溃。
漏洞复现
发起这个请求:
- patterns: 处理字段时使用的 Grok 模式,这里设置为 %{INT}。
- pattern_definitions: 定义自定义 Grok 模式,这里故意让 INT 模式递归引用自身,导致循环引用问题。
POST /_ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{INT}"
],
"pattern_definitions": {
"INT": "%{INT}"
}
}
}
]
},
"docs": [
{
"_source": {
"message": "test"
}
}
]
}
当执行之后会使得节点直接StackOverflow中止进程。
修复逻辑
这个问题的关键在于原先的逻辑中,只会对间接的递归引用(pattern1 => pattern2 => pattern3 => pattern1)做了检测,但是没有对直接的自引用(pattern1 => pattern1 )做检测。
private void forbidCircularReferences() {
// 这个是增加的逻辑,检测直接的自引用
for (Map.Entry<String, String> entry : patternBank.entrySet()) {
if (patternReferencesItself(entry.getValue(), entry.getKey())) {
throw new IllegalArgumentException("circular reference in pattern [" + entry.getKey() + "][" + entry.getValue() + "]");
}
}
// 间接递归引用检测(这个是原先的逻辑)
for (Map.Entry<String, String> entry : patternBank.entrySet()) {
String name = entry.getKey();
String pattern = entry.getValue();
innerForbidCircularReferences(name, new ArrayList<>(), pattern);
}
}
CVE-2023-46673
https://discuss.elastic.co/t/elasticsearch-7-17-14-8-10-3-security-update-esa-2023-24/347708
漏洞复现
尝试了很多已有的Processor都没有复现,我们这使用自定义的Processor来复现,将之前的自定义AddArrayProcessor加一行代码:
@Override
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
List valueList = new ArrayList<>(Arrays.asList(value.split(",")));
valueList.add(valueList); // 增加的代码
ingestDocument.setFieldValue(field, valueList);
return ingestDocument;
}
重新编译再安装插件之后,执行改Processor 将会StackOverflow。
修复逻辑
这个问题的关键在于在IngestDocument的deepCopyMap的方法之前没有判断这样的无限引用的情况:
那么在此之前做一个检测就好了,这个方法在原本的ES代码中就存在:org.elasticsearch.common.util.CollectionUtils#ensureNoSelfReferences(java.lang.Object, java.lang.String) ,其利用 IdentityHashMap 记录已访问对象的引用,检测并防止对象间的循环引用。
CVE-2024-23450
https://discuss.elastic.co/t/elasticsearch-8-13-0-7-17-19-security-update-esa-2024-06/356314
漏洞复现
虽然我们的索引只有2个Pipeline的配置,但是由于Pipeline Processor的存在,所以实际上一个文档其实能被很多Pipeline处理,当需要执行足够多个的pipline个数时,则会发生StackOverflow。
修复逻辑
这个问题的关键在于对Pipeline的个数并没有限制,添加一个配置项,当超出该个数则直接抛出异常。
public static final int MAX_PIPELINES = Integer.parseInt(System.getProperty("es.ingest.max_pipelines", "100"));
IngestDocument的org.elasticsearch.ingest.IngestDocument#executePipeline 添加逻辑:
public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
if (executedPipelines.size() >= MAX_PIPELINES) {
handler.accept(
null,
new IllegalStateException(PIPELINE_TOO_MANY_ERROR_MESSAGE + MAX_PIPELINES + " nested pipelines")
);
}
思考: 这里判断pipeline是否超出100个限制是用已经执行的pipeline个数来计算的。 假设已经超出100个pipeline,那这100个pipeline是会白跑的, 如果能在真正执行之前分析需要执行的Pipeline个数会更好。
6. 总结
Ingest 模块作为 Elasticsearch 数据处理流程的重要组成部分,提供了灵活的管道化能力,使得用户能够在数据写入前进行丰富的预处理操作。然而,在实际场景中,Ingest 模块也面临性能瓶颈、资源竞争等挑战,需要结合业务需求和系统现状进行精细化调优,通过优化 Processor 执行效率、合理规划集群架构以及增强监控与诊断手段,我们可以充分释放 Ingest 模块的能力,提升 Elasticsearch 的整体数据处理能力。
本文地址:http://elasticsearch.cn/article/15331