ingest pipeline
Day 25 - Elasticsearch Ingest节点数据管道处理器
Advent • bindiego 发表了文章 • 1 个评论 • 6567 次浏览 • 2018-12-24 18:18
首先还是祝大家圣诞快乐,既然是节日,我们就讨论一个比较轻松的话题。如何使用6.5引入数据管道处理器来更好的治理预定义好的数据管道。
背景
2018这一年来拜访了很多用户,其中有相当一部分在数据摄取时遇到包括性能在内的各种各样的问题,那么大多数在我们做了ingest节点的调整后得到了很好的解决。Ingest节点不是万能的,但是使用起来简单,而且抛开后面数据节点来看性能提升趋于线性。所以我一直本着能用ingest节点解决的问题,绝不麻烦其他组件的大体原则 :-)
下面快速回顾一下ingest节点的角色定位。
使用场景
通过上面的图纸我们很容易看到ingest节点可以在数据被索引之前,通过预定义好的处理管道对其进行治理。但这里一直存在一个局限性,就是只能通过一条管道。那么一直以来应对这个不便的方案就是把所有的处理器和细节全部配置到当前管道下。那么带来的问题也是比较明显的:
- 复制、粘贴很多相同的管道配置在不同数据管道里
- 非常难管理、维护冗长的管道
- 如果要更新一个处理细节的话要找到定位所有使用过这个逻辑的管道
其实这块对于开发的同学们很好理解,当你经常复制、粘贴代码的时候,就是时候好好思考一下了。我想说到这里大家其实已经明白了,这个管道处理器实际就是提供了一个允许你在一个管道内调用其他管道的方案。
他的使用非常简单,就像函数调用一样只有一个必要参数name
:
{
"pipeline": {
"name": "<其他管道的名称 - 英文字符>"
}
}
当然,也像其他处理器一样提供了on_failure
参数来处理错误,并且还有一个非常实用的if
参数来判断是否执行这个管道,这里就不做详细介绍了。
举例
这里我们用一个非常简单的案例来看看如何使用管道处理器。
假设在Elastic公司,我们使用员工卡来作为进入公司和各个部门以及房间的钥匙,并且这些刷卡事件也会被记录下来。那么由于上班卡机和门禁供应商不同,数据格式也不一样。但是最后都有一个通用的逻辑,就是除了事件发生的时间,我们还会记录下数据录入到Elasticsearch的时间。
首先我们看一下原始数据:
# 公司正门卡机数据
2018-12-25T08:59:59.312Z,front_door,binw,entered
# 架构部门禁数据
@timestamp=2018-12-25T09:15:34.414Z device_id=recreation_hall user=binw event=entered
那如果在6.5之前,我们定义2条管道是这个样子
-
正门卡机管道
- grok 解析数据
- 打上数据录入的时间戳
- 明确录入时间戳的处理器
- 门禁数据管道
- KV 解析数据
- 打上数据录入的时间戳
- 明确录入时间戳的处理器
很明显又66.67%的配置都是重复的,所以这里我们可以更优雅的解决这个问题
- 统一的数据录入时间戳处理器
- 打上数据录入的时间戳
- 明确录入时间戳的处理器
PUT _ingest/pipeline/pl_cmn
{
"description": "刷卡数据通用管道",
"processors": [
{
"set": {
"field": "ingest_timestamp",
"value": "{{_ingest.timestamp}}"
}
},
{
"set": {
"field": "cmn_processed",
"value": "yes"
}
}
]
}
- 正门卡机管道
- grok 解析数据
- <调用管道 pl_cmn>
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "正门打卡机数据处理管道",
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{TIMESTAMP_ISO8601:@timestamp},%{WORD:device_id},%{USER:user},%{WORD:event}"
]
}
},
{
"pipeline": {
"name": "pl_cmn"
}
}
]
},
"docs": [
{
"_source": {
"message": "2018-12-25T08:59:59.312Z,front_door,binw,entered"
}
}
]
}
- 门禁数据管道
- KV 解析数据
- <调用管道 pl_cmn>
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "架构部门禁数据处理管道",
"processors": [
{
"kv": {
"field": "message",
"field_split": " ",
"value_split": "="
}
},
{
"pipeline": {
"name": "pl_cmn"
}
}
]
},
"docs": [
{
"_source": {
"message": "@timestamp=2018-12-25T09:15:34.414Z device_id=recreation_hall user=binw event=entered"
}
}
]
}
好啦,这个例子非常简单。但当面对复杂业务场景的时候,会让你整个数据管道的管理比以前整齐很多。再结合合理的架构和数据治理,ingest节点也可以让你的整个数据处理能力有所提升。
写在最后
在文章的例子里,我们往索引里灌注的是一个个的事件数据。那要如何对数据中的实体进行有效的分析呢?那不得不说到面向实体的数据模型设计。Elasticsearch本身也提供了工具能让我们快速实现,让我们明年有机会的时候再与大家分享吧。最后还是祝愿大家度过一个愉快的圣诞节和元旦!
Day 25 - Elasticsearch Ingest节点数据管道处理器
Advent • bindiego 发表了文章 • 1 个评论 • 6567 次浏览 • 2018-12-24 18:18
首先还是祝大家圣诞快乐,既然是节日,我们就讨论一个比较轻松的话题。如何使用6.5引入数据管道处理器来更好的治理预定义好的数据管道。
背景
2018这一年来拜访了很多用户,其中有相当一部分在数据摄取时遇到包括性能在内的各种各样的问题,那么大多数在我们做了ingest节点的调整后得到了很好的解决。Ingest节点不是万能的,但是使用起来简单,而且抛开后面数据节点来看性能提升趋于线性。所以我一直本着能用ingest节点解决的问题,绝不麻烦其他组件的大体原则 :-)
下面快速回顾一下ingest节点的角色定位。
使用场景
通过上面的图纸我们很容易看到ingest节点可以在数据被索引之前,通过预定义好的处理管道对其进行治理。但这里一直存在一个局限性,就是只能通过一条管道。那么一直以来应对这个不便的方案就是把所有的处理器和细节全部配置到当前管道下。那么带来的问题也是比较明显的:
- 复制、粘贴很多相同的管道配置在不同数据管道里
- 非常难管理、维护冗长的管道
- 如果要更新一个处理细节的话要找到定位所有使用过这个逻辑的管道
其实这块对于开发的同学们很好理解,当你经常复制、粘贴代码的时候,就是时候好好思考一下了。我想说到这里大家其实已经明白了,这个管道处理器实际就是提供了一个允许你在一个管道内调用其他管道的方案。
他的使用非常简单,就像函数调用一样只有一个必要参数name
:
{
"pipeline": {
"name": "<其他管道的名称 - 英文字符>"
}
}
当然,也像其他处理器一样提供了on_failure
参数来处理错误,并且还有一个非常实用的if
参数来判断是否执行这个管道,这里就不做详细介绍了。
举例
这里我们用一个非常简单的案例来看看如何使用管道处理器。
假设在Elastic公司,我们使用员工卡来作为进入公司和各个部门以及房间的钥匙,并且这些刷卡事件也会被记录下来。那么由于上班卡机和门禁供应商不同,数据格式也不一样。但是最后都有一个通用的逻辑,就是除了事件发生的时间,我们还会记录下数据录入到Elasticsearch的时间。
首先我们看一下原始数据:
# 公司正门卡机数据
2018-12-25T08:59:59.312Z,front_door,binw,entered
# 架构部门禁数据
@timestamp=2018-12-25T09:15:34.414Z device_id=recreation_hall user=binw event=entered
那如果在6.5之前,我们定义2条管道是这个样子
-
正门卡机管道
- grok 解析数据
- 打上数据录入的时间戳
- 明确录入时间戳的处理器
- 门禁数据管道
- KV 解析数据
- 打上数据录入的时间戳
- 明确录入时间戳的处理器
很明显又66.67%的配置都是重复的,所以这里我们可以更优雅的解决这个问题
- 统一的数据录入时间戳处理器
- 打上数据录入的时间戳
- 明确录入时间戳的处理器
PUT _ingest/pipeline/pl_cmn
{
"description": "刷卡数据通用管道",
"processors": [
{
"set": {
"field": "ingest_timestamp",
"value": "{{_ingest.timestamp}}"
}
},
{
"set": {
"field": "cmn_processed",
"value": "yes"
}
}
]
}
- 正门卡机管道
- grok 解析数据
- <调用管道 pl_cmn>
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "正门打卡机数据处理管道",
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"%{TIMESTAMP_ISO8601:@timestamp},%{WORD:device_id},%{USER:user},%{WORD:event}"
]
}
},
{
"pipeline": {
"name": "pl_cmn"
}
}
]
},
"docs": [
{
"_source": {
"message": "2018-12-25T08:59:59.312Z,front_door,binw,entered"
}
}
]
}
- 门禁数据管道
- KV 解析数据
- <调用管道 pl_cmn>
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description": "架构部门禁数据处理管道",
"processors": [
{
"kv": {
"field": "message",
"field_split": " ",
"value_split": "="
}
},
{
"pipeline": {
"name": "pl_cmn"
}
}
]
},
"docs": [
{
"_source": {
"message": "@timestamp=2018-12-25T09:15:34.414Z device_id=recreation_hall user=binw event=entered"
}
}
]
}
好啦,这个例子非常简单。但当面对复杂业务场景的时候,会让你整个数据管道的管理比以前整齐很多。再结合合理的架构和数据治理,ingest节点也可以让你的整个数据处理能力有所提升。
写在最后
在文章的例子里,我们往索引里灌注的是一个个的事件数据。那要如何对数据中的实体进行有效的分析呢?那不得不说到面向实体的数据模型设计。Elasticsearch本身也提供了工具能让我们快速实现,让我们明年有机会的时候再与大家分享吧。最后还是祝愿大家度过一个愉快的圣诞节和元旦!