极限网关
如何防止 Elasticsearch 服务 OOM?
Easysearch • yangmf2040 发表了文章 • 0 个评论 • 1868 次浏览 • 2024-02-26 10:12
Elasticsearch(简称:ES) 和传统关系型数据库有很多区别, 比如传统数据中普遍都有一个叫“最大连接数”的设置。目的是使数据库系统工作在可控的负载下,避免出现负载过高,资源耗尽,谁也无法登录的局面。
那 ES 在这方面有类似参数吗?答案是没有,这也是为何 ES 会被流量打爆的原因之一。
针对大并发访问 ES 服务,造成 ES 节点 OOM,服务中断的情况,极限科技旗下的 INFINI Gateway 产品(以下简称 “极限网关”)可从两个方面入手,保障 ES 服务的可用性。
- 限制最大并发访问连接数。
- 限制非重要索引的请求速度,保障重要业务索引的访问速度。
下面我们来详细聊聊。
架构图
所有访问 ES 的请求都发给网关,可部署多个网关。
限制最大连接数
在网关配置文件中,默认有最大并发连接数限制,默认最大 10000。
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 10000
network:
binding: $[[env.GW_BINDING]]
# See `gateway.disable_reuse_port_by_default` for more information.
reuse_port: true
使用压测程序测试,看看到达10000个连接后,能否限制新的连接。 超过的连接请求,被丢弃。更多信息参考官方文档。
限制索引写入速度
我们先看看不做限制的时候,测试环境的写入速度,在 9w - 15w docs/s 之间波动。虽然峰值很高,但不稳定。 接下来,我们通过网关把写入速度控制在最大 1w docs/s 。 对网关的配置文件 gateway.yml ,做以下修改。
env: # env 下添加
THROTTLE_BULK_INDEXING_MAX_BYTES: 40485760 #40MB/s
THROTTLE_BULK_INDEXING_MAX_REQUESTS: 10000 #10k docs/s
THROTTLE_BULK_INDEXING_ACTION: retry #retry,drop
THROTTLE_BULK_INDEXING_MAX_RETRY_TIMES: 10 #1000
THROTTLE_BULK_INDEXING_RETRY_DELAY_IN_MS: 100 #10
router: # route 部分修改 flow
- name: my_router
default_flow: default_flow
tracing_flow: logging_flow
rules:
- method:
- "*"
pattern:
- "/_bulk"
- "/{any_index}/_bulk"
flow:
- write_flow
flow: #flow 部分增加下面两段
- name: write_flow
filter:
- flow:
flows:
- bulking_indexing_limit
- elasticsearch:
elasticsearch: prod
max_connection_per_node: 1000
- name: bulking_indexing_limit
filter:
- bulk_request_throttle:
indices:
"test-index":
max_bytes: $[[env.THROTTLE_BULK_INDEXING_MAX_BYTES]]
max_requests: $[[env.THROTTLE_BULK_INDEXING_MAX_REQUESTS]]
action: $[[env.THROTTLE_BULK_INDEXING_ACTION]]
retry_delay_in_ms: $[[env.THROTTLE_BULK_INDEXING_RETRY_DELAY_IN_MS]]
max_retry_times: $[[env.THROTTLE_BULK_INDEXING_MAX_RETRY_TIMES]]
message: "bulk writing too fast" #触发限流告警message自定义
log_warn_message: true
再次压测,test-index 索引写入速度被限制在了 1w docs/s 。
限制多个索引写入速度
上面的配置是针对 test-index 索引的写入速度控制。如果想添加其他的索引,新增一段配置即可。 比如,我允许 abc 索引写入达到 2w docs/s,test-index 索引最多不超过 1w docs/s ,可配置如下。
- name: bulking_indexing_limit
filter:
- bulk_request_throttle:
indices:
"abc":
max_requests: 20000
action: drop
message: "abc doc写入超阈值" #触发限流告警message自定义
log_warn_message: true
"test-index":
max_bytes: $[[env.THROTTLE_BULK_INDEXING_MAX_BYTES]]
max_requests: $[[env.THROTTLE_BULK_INDEXING_MAX_REQUESTS]]
action: $[[env.THROTTLE_BULK_INDEXING_ACTION]]
retry_delay_in_ms: $[[env.THROTTLE_BULK_INDEXING_RETRY_DELAY_IN_MS]]
max_retry_times: $[[env.THROTTLE_BULK_INDEXING_MAX_RETRY_TIMES]]
message: "bulk writing too fast" #触发限流告警message自定义
log_warn_message: true
限速效果如下
限制读请求速度
我们先看看不做限制的时候,测试环境的读取速度,7w qps 。 接下来我们通过网关把读取速度控制在最大 1w qps 。 继续对网关的配置文件 gateway.yml 做以下修改。
- name: default_flow
filter:
- request_path_limiter:
message: "Hey, You just reached our request limit!" rules:
- pattern: "/(?P<index_name>abc)/_search"
max_qps: 10000
group: index_name
- elasticsearch:
elasticsearch: prod
max_connection_per_node: 1000
再次进行测试,读取速度被限制在了 1w qps 。
限制多个索引读取速度
上面的配置是针对 abc 索引的写入速度控制。如果想添加其他的索引,新增一段配置即可。 比如,我允许 abc 索引读取达到 1w qps,test-index 索引最多不超过 2w qps ,可配置如下。
- name: default_flow
filter:
- request_path_limiter:
message: "Hey, You just reached our request limit!"
rules:
- pattern: "/(?P<index_name>abc)/_search"
max_qps: 10000
group: index_name
- pattern: "/(?P<index_name>test-index)/_search"
max_qps: 20000
group: index_name
- elasticsearch:
elasticsearch: prod
max_connection_per_node: 1000
多个网关限速
限速是每个网关自身的控制,如果有多个网关,那么后端 ES 集群收到的请求数等于多个网关限速的总和。 本次介绍就到这里了。相信大家在使用 ES 的过程中也遇到过各种各样的问题。欢迎大家来我们这个平台分享自己的问题、解决方案等。如有任何问题,请随时联系我,期待与您交流!
开启安全功能 ES 集群就安全了吗?
Easysearch • yangmf2040 发表了文章 • 0 个评论 • 2917 次浏览 • 2023-12-27 10:38
背景
经常跟 ES 打交道的朋友都知道,现在主流的 ES 集群安全方案是:RBAC + TLS for Internal + HTTPS 。
作为终端用户一般只需要关心用户名和密码就行了。作为管理和运维 ES 的人员来说,可能希望 ES 能提供密码策略来强制密码强度和密码使用周期。遗憾的是 ES 对密码强度和密码使用周期没有任何强制要求。如果不注意,可能我们天天都在使用“弱密码”或从不修改的密码(直到无法登录)。而且 ES 对连续的认证失败,不会做任何处理,这让 ES 很容易遭受暴力破解的入侵。
那还有没有别的办法,进一步提高安全呢? 其实,网关可以来帮忙。
虽然网关无法强制提高密码复杂度,但可以提高 ES 集群被暴力破解的难度。
大家都知道,暴力破解--本质就是不停的“猜”你的密码。以现在的 CPU 算力,一秒钟“猜”个几千上万次不过是洒洒水,而且 CPU 监控都不带波动的,很难发现异常。从这里入手,一方面,网关可以延长认证失败的过程--延迟返回结果,让破解不再暴力。另一方面,网关可以记录认证失败的情况,做到实时监控,有条件的告警。一旦出现苗头,可以使用网关阻断该 IP 或用户发来的任何请求。
场景模拟
首先,用网关代理 ES 集群,并在 default_flow 中增加一段 response_status_filter 过滤器配置,对返回码是 401 的请求,跳转到 rate_limit_flow 进行降速,延迟 5 秒返回。
- name: default_flow
filter:
- elasticsearch:
elasticsearch: prod
max_connection_per_node: 1000
- response_status_filter:
exclude:
- 401
action: redirect_flow
flow: rate_limit_flow
- name: rate_limit_flow
filter:
- sleep:
sleep_in_million_seconds: 5000
其次,对于失败的认证,我们可以通过 Console 来做个看板实时分析,展示。
折线图、饼图图、柱状图等,多种展示方式,大家可充分发挥。
最后,可在 Console 的告警中心,配置对应的告警规则,实时监控该类事件,方便及时跟进处置。
效果测试
先带上正确的用户名密码测试,看看返回速度。
0.011 秒返回。再使用错误的密码测试。
整整 5 秒多后,才回返结果。如果要暴力破解,每 5 秒钟甚至更久才尝试一个密码,这还叫暴力吗?
看板示例
此处仅仅是抛砖引玉,欢迎大家发挥想象。
告警示例
建立告警规则,用户 1 分钟内超过 3 次登录失败,就产生告警。
可在告警中心查看详情,也可将告警推送至微信、钉钉、飞书、邮件等。
查看告警详情,是 es 用户触发了告警。
最后,剩下的工作就是对该账号的处置了。如果有需要可以考虑阻止该用户或 IP 的请求,对应的过滤器文档在这里,老规矩加到 default_flow 里就行了。
如果小伙伴有其他办法提升 ES 集群安全,欢迎和我们一起讨论、交流。我们的宗旨是:让搜索更简单!
用极限网关实现ES容灾,简单!
Elasticsearch • yangmf2040 发表了文章 • 0 个评论 • 2021 次浏览 • 2023-07-20 10:33
身为 IT 人士,大伙身边的各种系统肯定不少吧。系统虽多,但最最最重要的那套、那几套,大伙肯定是捧在手心,关怀备至。如此重要的系统,万一发生故障了且短期无法恢复,该如何保障业务持续运行? 有过这方面思考或经验的同学,肯定脱口而出--切灾备啊。 是的,接下来我来介绍下我们的 ES 灾备方案。当然如果你有更好的,请使用各种可用的渠道联系我们。
总体设计
通过极限网关将应用对主集群的写操作,复制到灾备集群。应用发送的读请求则直接转发到主集群,并将响应结果转发给应用。应用对网关无感知,访问方式与访问 ES 集群一样。
方案优势
- 轻量级
极限网关使用 Golang 编写,安装包很小,只有 10MB 左右,没有任何外部环境依赖,部署安装都非常简单,只需要下载对应平台的二进制可执行文件,启动网关程序的二进制程序文件执行即可。
- 跨版本支持
极限网关针对不同的 Elasticsearch 版本做了兼容和针对性处理,能够让业务代码无缝的进行适配,后端 Elasticsearch 集群版本升级能够做到无缝过渡,降低版本升级和数据迁移的复杂度。
- 高可用
极限网关内置多种高可用解决方案,前端请求入口支持基于虚拟 IP 的双机热备,后端集群支持集群拓扑的自动感知,节点上下线能自动发现,自动处理后端故障,自动进行请求的重试和迁移。
- 灵活性
主备集群都是可读可写,切换迅速,只需切换网关到另一套配置即可。回切灵活,恢复使用原配置即可。
架构图
网关程序部署
下载
根据操作系统和平台选择下面相应的安装包: 解压到指定目录:
mkdir gateway
tar -zxf xxx.gz -C gateway
修改网关配置
在此 下载 网关配置,默认网关会加载配置文件 gateway.yml ,如果要指定其他配置文件使用 -config 选项指定。 网关配置文件内容较多,下面展示必要部分。
#primary
PRIMARY_ENDPOINT: http://192.168.56.3:7171
PRIMARY_USERNAME: elastic
PRIMARY_PASSWORD: password
PRIMARY_MAX_QPS_PER_NODE: 10000
PRIMARY_MAX_BYTES_PER_NODE: 104857600 #100MB/s
PRIMARY_MAX_CONNECTION_PER_NODE: 200
PRIMARY_DISCOVERY_ENABLED: false
PRIMARY_DISCOVERY_REFRESH_ENABLED: false
#backup
BACKUP_ENDPOINT: http://192.168.56.3:9200
BACKUP_USERNAME: admin
BACKUP_PASSWORD: admin
BACKUP_MAX_QPS_PER_NODE: 10000
BACKUP_MAX_BYTES_PER_NODE: 104857600 #100MB/s
BACKUP_MAX_CONNECTION_PER_NODE: 200
BACKUP_DISCOVERY_ENABLED: false
BACKUP_DISCOVERY_REFRESH_ENABLED: false
PRIMARY_ENDPOINT:配置主集群地址和端口
PRIMARY_USERNAME、PRIMARY_PASSWORD: 访问主集群的用户信息
BACKUP_ENDPOINT:配置备集群地址和端口
BACKUP_USERNAME、BACKUP_PASSWORD: 访问备集群的用户信息
运行网关
前台运行 直接运行网关程序即可启动极限网关了,如下:
./gateway-linux-amd64
后台运行
./gateway-linux-amd64 -service install
Success
./gateway-linux-amd64 -service start
Success
卸载服务
./gateway-linux-amd64 -service stop
Success
./gateway-linux-amd64 -service uninstall
Success
灾备功能测试
在灾备场景下,为保证数据一致性,对集群的访问操作都通过网关进行。注意只有 bulk API 的操作才会被复制到备集群。 在此次测试中,网关灾备配置功能为:
- 主备集群正常时
读写请求正常执行; 写请求被记录到队列,备集群实时消费队列数据。
- 当主集群故障时
写入请求报错,主备集群都不写入数据; 查询请求转到备集群执行,并返回结果给客户端。
- 当备集群故障时
读写请求都正常执行; 写操作记录到磁盘队列,待备集群恢复后,自动消费队列数据直到两个集群一致。
主备集群正常时写入、查询测试
写入数据
# 通过网关写入数据
curl -X POST "localhost:18000/_bulk?pretty" -H 'Content-Type: application/json' -uelastic:password -d'
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "create" : { "_index" : "test", "_id" : "2" } }
{ "field2" : "value2" }
'
查询数据
# 查询主集群
curl 192.168.56.3:7171/test/_search?pretty -uelastic:password
# 查询备集群
curl 192.168.56.3:9200/test/_search?pretty -uadmin:admin
# 查询网关,网关转发给主集群执行
curl 192.168.56.3:18000/test/_search?pretty -uelastic:password
主备集群都已写入数据,且数据一致。通过网关查询,也正常返回。
删除和更新文档
# 通过网关删除和更新文档
curl -X POST "192.168.56.3:18000/_bulk?pretty" -H 'Content-Type: application/json' -uelastic:password -d'
{ "delete" : { "_index" : "test", "_id" : "1" } }
{ "update" : {"_id" : "2", "_index" : "test"} }
{ "doc" : {"field2" : "value2-updated"} }
'
查询数据
# 查询主集群
curl 192.168.56.3:7171/test/_search?pretty -uelastic:password
# 查询备集群
curl 192.168.56.3:9200/test/_search?pretty -uadmin:admin
两个集群都已执行删除和更新操作,数据一致。
主集群故障时写入、查询测试
为模拟主集群故障,直接关闭主集群。
写入数据
# 通过网关写入数据
curl -X POST "192.168.56.3:18000/_bulk?pretty" -H 'Content-Type: application/json' -uelastic:password -d'
{ "index" : { "_index" : "test", "_id" : "3" } }
{ "field3" : "value3" }
{ "create" : { "_index" : "test", "_id" : "4" } }
{ "field4" : "value4" }
'
写入数据报错
查询数据
# 通过网关查询,因为主集群不可用,网关将查询转发到备集群执行
curl 192.168.56.3:18000/test/_search?pretty -uelastic:password
正常查询到数据,说明请求被转发到了备集群执行。
备集群故障时写入、查询测试
为模拟备集群故障,直接关闭备集群。
写入数据
# 通过网关写入数据
curl -X POST "192.168.56.3:18000/_bulk?pretty" -H 'Content-Type: application/json' -uelastic:password -d'
{ "index" : { "_index" : "test", "_id" : "5" } }
{ "field5" : "value5" }
{ "create" : { "_index" : "test", "_id" : "6" } }
{ "field6" : "value6" }
'
数据正常写入。
查询数据
# 通过网关查询
curl 192.168.56.3:18000/test/_search?pretty -uelastic:password
查询成功返回。主集群成功写入了两条新数据。同时此数据会被记录到备集群的队列中,待备集群恢复后,会消费此队列追数据。
恢复备集群
启动备集群。
查询数据
等待片刻或通过 INFINI Console 确定网关队列消费完毕后,查询备集群的数据。 (生产和消费 offset 相同,说明消费完毕。)
# 查询备集群的数据
curl 192.168.56.3:9200/test/_search?pretty -uadmin:admin
备集群启动后自动消费队列数据,消费完后备集群数据达到与主集群数据一致。
灾备切换
测试了这么多,终于到切换的时刻了。切换前我们判断下主系统是否短期无法修复。
如果我们判断主用系统无法短时间恢复,要执行切换。非常简单,我们直接将配置文件中定义的主备集群互换,然后重启网关程序就行了。但我们推荐在相同主机上另部署一套网关程序--网关B,先前那套用网关A指代。网关B中的配置文件把原备集群定义为主集群,原主集群定义为备集群。若要执行切换,我们先停止网关A,然后启动网关B,此时应用连接到网关(端口不变),就把原备系统当作主系统使用,把原主系统当作备系统,也就完成了主备系统的切换。
灾备回切
当原主集群修复后,正常启动,就会从消费队列追写修复期间产生数据直到主备数据一致,同样我们可通过 INFINI Console 查看消费的进度。如果大家还是担心数据的一致性,INFINI Console 还能帮大家做[校验数据]()任务,做到数据完全一致后(文档数量及文档内容一致),才进行回切。
回切也非常简单,停止网关B,启动网关A即可。
网关高可用
网关自带浮动 IP 模块,可进行双机热备。客户端通过 VIP 连接网关,网关出现故障时,VIP 漂移到备网关。 视频教程戳这里。
这样的优点是简单,不足是只有一个网关在线提供服务。如果想多个网关在线提供服务,则需搭配分布式消息系统一起工作,架构如下。
前端通过负载均衡将流量分散到多个在线网关,网关将消息存入分布式消息系统。此时,网关可看作无状态应用,可根据需要扩缩规模。
以上就是我介绍的ES灾备方案,是不是相当灵活了。有问题还是那句话 Call me 。
关于极限网关
INFINI Gateway 是一个面向搜索场景的高性能数据网关,所有请求都经过网关处理后再转发到后端的搜索业务集群。基于 INFINI Gateway,可以实现索引级别的限速限流、常见查询的缓存加速、查询请求的审计、查询结果的动态修改等等。
在 Kibana 里统一访问来自不同集群的索引
资料分享 • medcl 发表了文章 • 0 个评论 • 1965 次浏览 • 2022-04-21 15:29
在 Kibana 里统一访问来自不同集群的索引
现在有这么一个需求,客户根据需要将数据按照业务维度划分,将索引分别存放在了不同的三个集群, 将一个大集群拆分成多个小集群有很多好处,比如降低了耦合,带来了集群可用性和稳定性方面的好处,也避免了单个业务的热点访问造成其他业务的影响, 尽管拆分集群是很常见的玩法,但是管理起来不是那么方便了,尤其是在查询的时候,可能要分别访问三套集群各自的 API,甚至要切换三套不同的 Kibana 来访问集群的数据, 那么有没有办法将他们无缝的联合在一起呢?
极限网关!
答案自然是有的,通过将 Kibana 访问 Elasticsearch 的地址切换为极限网关的地址,我们可以将请求按照索引来进行智能的路由, 也就是当访问不同的业务索引时会智能的路由到不同的集群,如下图:
上图,我们分别有 3 个不同的索引:
- apm-*
- erp-*
- mall-*
分别对应不同的三套 Elasticsearch 集群:
- ES1-APM
- ES2-ERP
- ES3-MALL
接下来我们来看如何在极限网关里面进行相应的配置来满足这个业务需求。
配置集群信息
首先配置 3 个集群的连接信息。
elasticsearch:
- name: es1-apm
enabled: true
endpoints:
- http://192.168.3.188:9206
- name: es2-erp
enabled: true
endpoints:
- http://192.168.3.188:9207
- name: es3-mall
enabled: true
endpoints:
- http://192.168.3.188:9208
配置服务 Flow
然后,我们定义 3 个 Flow,分别对应用来访问 3 个不同的 Elasticsearch 集群,如下:
flow:
- name: es1-flow
filter:
- elasticsearch:
elasticsearch: es1-apm
- name: es2-flow
filter:
- elasticsearch:
elasticsearch: es2-erp
- name: es3-flow
filter:
- elasticsearch:
elasticsearch: es3-mall
然后再定义一个 flow 用来进行路径的判断和转发,如下:
- name: default-flow
filter:
- switch:
remove_prefix: false
path_rules:
- prefix: apm-
flow: es1-flow
- prefix: erp-
flow: es2-flow
- prefix: mall-
flow: es3-flow
- flow: #default flow
flows:
- es1-flow
根据请求路径里面的索引前缀来匹配不同的索引,并转发到不同的 Flow。
配置路由信息
接下来,我们定义路由信息,具体配置如下:
router:
- name: my_router
default_flow: default-flow
指向上面定义的默认 flow 来统一请求的处理。
定义服务及关联路由
最后,我们定义一个监听为 8000 端口的服务,用来提供给 Kibana 来进行统一的入口访问,如下:
entry:
- name: es_entry
enabled: true
router: my_router
max_concurrency: 10000
network:
binding: 0.0.0.0:8000
完整配置
最后的完整配置如下:
path.data: data
path.logs: log
entry:
- name: es_entry
enabled: true
router: my_router
max_concurrency: 10000
network:
binding: 0.0.0.0:8000
flow:
- name: default-flow
filter:
- switch:
remove_prefix: false
path_rules:
- prefix: apm-
flow: es1-flow
- prefix: erp-
flow: es2-flow
- prefix: mall-
flow: es3-flow
- flow: #default flow
flows:
- es1-flow
- name: es1-flow
filter:
- elasticsearch:
elasticsearch: es1-apm
- name: es2-flow
filter:
- elasticsearch:
elasticsearch: es2-erp
- name: es3-flow
filter:
- elasticsearch:
elasticsearch: es3-mall
router:
- name: my_router
default_flow: default-flow
elasticsearch:
- name: es1-apm
enabled: true
endpoints:
- http://192.168.3.188:9206
- name: es2-erp
enabled: true
endpoints:
- http://192.168.3.188:9207
- name: es3-mall
enabled: true
endpoints:
- http://192.168.3.188:9208
启动网关
直接启动网关,如下:
➜ gateway git:(master) ✗ ./bin/gateway -config sample-configs/elasticsearch-route-by-index.yml
___ _ _____ __ __ __ _
/ _ \ /_\ /__ \/__\/ / /\ \ \/_\ /\_/\
/ /_\///_\\ / /\/_\ \ \/ \/ //_\\\_ _/
/ /_\\/ _ \/ / //__ \ /\ / _ \/ \
\____/\_/ \_/\/ \__/ \/ \/\_/ \_/\_/
[GATEWAY] A light-weight, powerful and high-performance elasticsearch gateway.
[GATEWAY] 1.0.0_SNAPSHOT, 2022-04-20 08:23:56, 2023-12-31 10:10:10, 51650a5c3d6aaa436f3c8a8828ea74894c3524b9
[04-21 13:41:21] [INF] [app.go:174] initializing gateway.
[04-21 13:41:21] [INF] [app.go:175] using config: /Users/medcl/go/src/infini.sh/gateway/sample-configs/elasticsearch-route-by-index.yml.
[04-21 13:41:21] [INF] [instance.go:72] workspace: /Users/medcl/go/src/infini.sh/gateway/data/gateway/nodes/c9bpg0ai4h931o4ngs3g
[04-21 13:41:21] [INF] [app.go:283] gateway is up and running now.
[04-21 13:41:21] [INF] [api.go:262] api listen at: http://0.0.0.0:2900
[04-21 13:41:21] [INF] [reverseproxy.go:255] elasticsearch [es1-apm] hosts: [] => [192.168.3.188:9206]
[04-21 13:41:21] [INF] [reverseproxy.go:255] elasticsearch [es2-erp] hosts: [] => [192.168.3.188:9207]
[04-21 13:41:21] [INF] [reverseproxy.go:255] elasticsearch [es3-mall] hosts: [] => [192.168.3.188:9208]
[04-21 13:41:21] [INF] [actions.go:349] elasticsearch [es2-erp] is available
[04-21 13:41:21] [INF] [actions.go:349] elasticsearch [es1-apm] is available
[04-21 13:41:21] [INF] [entry.go:312] entry [es_entry] listen at: http://0.0.0.0:8000
[04-21 13:41:21] [INF] [module.go:116] all modules are started
[04-21 13:41:21] [INF] [actions.go:349] elasticsearch [es3-mall] is available
[04-21 13:41:55] [INF] [reverseproxy.go:255] elasticsearch [es1-apm] hosts: [] => [192.168.3.188:9206]
网关启动成功之后,就可以通过网关的 IP+8000 端口来访问目标 Elasticsearch 集群了。
测试访问
首先通过 API 来访问测试一下,如下:
➜ ~ curl http://localhost:8000/apm-2022/_search -v
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8000 (#0)
> GET /apm-2022/_search HTTP/1.1
> Host: localhost:8000
> User-Agent: curl/7.54.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Date: Thu, 21 Apr 2022 05:45:44 GMT
< content-type: application/json; charset=UTF-8
< Content-Length: 162
< X-elastic-product: Elasticsearch
< X-Backend-Cluster: es1-apm
< X-Backend-Server: 192.168.3.188:9206
< X-Filters: filters->elasticsearch
<
* Connection #0 to host localhost left intact
{"took":142,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":0,"relation":"eq"},"max_score":null,"hits":[]}}%
可以看到 apm-2022 指向了后端的 es1-apm
集群。
继续测试,erp 索引的访问,如下:
➜ ~ curl http://localhost:8000/erp-2022/_search -v
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8000 (#0)
> GET /erp-2022/_search HTTP/1.1
> Host: localhost:8000
> User-Agent: curl/7.54.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Date: Thu, 21 Apr 2022 06:24:46 GMT
< content-type: application/json; charset=UTF-8
< Content-Length: 161
< X-Backend-Cluster: es2-erp
< X-Backend-Server: 192.168.3.188:9207
< X-Filters: filters->switch->filters->elasticsearch->skipped
<
* Connection #0 to host localhost left intact
{"took":12,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":0,"relation":"eq"},"max_score":null,"hits":[]}}%
继续测试,mall 索引的访问,如下:
➜ ~ curl http://localhost:8000/mall-2022/_search -v
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8000 (#0)
> GET /mall-2022/_search HTTP/1.1
> Host: localhost:8000
> User-Agent: curl/7.54.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Date: Thu, 21 Apr 2022 06:25:08 GMT
< content-type: application/json; charset=UTF-8
< Content-Length: 134
< X-Backend-Cluster: es3-mall
< X-Backend-Server: 192.168.3.188:9208
< X-Filters: filters->switch->filters->elasticsearch->skipped
<
* Connection #0 to host localhost left intact
{"took":8,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":0,"max_score":null,"hits":[]}}%
完美转发。
修改 Kibana 配置
修改 Kibana 的配置文件: kibana.yml
,替换 Elasticsearch 的地址为网关地址(http://192.168.3.200:8000
),如下:
elasticsearch.hosts: ["http://192.168.3.200:8000"]
重启 Kibana 让配置生效。
效果如下
可以看到,在一个 Kibana 的开发者工具里面,我们已经可以像操作一个集群一样来同时读写实际上来自三个不同集群的索引数据了。
展望
通过极限网关,我们还可以非常灵活的进行在线请求的流量编辑,动态组合不同集群的操作。
在极限网关里面使用 JavaScript 脚本来进行复杂的查询改写
资料分享 • medcl 发表了文章 • 1 个评论 • 1192 次浏览 • 2022-04-19 11:55
使用 JavaScript 脚本来进行复杂的查询改写
有这么一个需求:
网关里怎样对跨集群搜索进行支持的呢?我想实现: 输入的搜索请求是 lp:9200/index1/_search 这个索引在3个集群上,需要跨集群检索,也就是网关能否改成 lp:9200/cluster01:index1,cluster02,index1,cluster03:index1/_search 呢? 索引有一百多个,名称不一定是 app, 还可能多个索引一起的。
极限网关自带的过滤器 content_regex_replace
虽然可以实现字符正则替换,但是这个需求是带参数的变量替换,稍微复杂一点,没有办法直接用这个正则替换实现,有什么其他办法实现么?
使用脚本过滤器
当然有的,上面的这个需求,理论上我们只需要将其中的索引 index1
匹配之后,替换为 cluster01:index1,cluster02,index1,cluster03:index1
就行了。
答案就是使用自定义脚本来做,再复杂的业务逻辑都不是问题,都能通过自定义脚本来实现,一行脚本不行,那就两行。
使用极限网关提供的 JavaScript 过滤器可以很灵活的实现这个功能,具体继续看。
定义过滤器
首先创建一个脚本文件,放在网关数据目录的 scripts
子目录下面,如下:
➜ gateway ✗ tree data
data
└── gateway
└── nodes
└── c9bpg0ai4h931o4ngs3g
├── kvdb
├── queue
├── scripts
│ └── index_path_rewrite.js
└── stats
这个脚本的内容如下:
function process(context) {
var originalPath = context.Get("_ctx.request.path");
var matches = originalPath.match(/\/?(.*?)\/_search/)
var indexNames = [];
if(matches && matches.length > 1) {
indexNames = matches[1].split(",")
}
var resultNames = []
var clusterNames = ["cluster01", "cluster02"]
if(indexNames.length > 0) {
for(var i=0; i<indexNames.length; i++){
if(indexNames[i].length > 0) {
for(var j=0; j<clusterNames.length; j++){
resultNames.push(clusterNames[j]+":"+indexNames[i])
}
}
}
}
if (resultNames.length>0){
var newPath="/"+resultNames.join(",")+"/_search";
context.Put("_ctx.request.path",newPath);
}
}
和普通的 JavaScript 一样,定义一个特定的函数 process
来处理请求里面的上下文信息,_ctx.request.path
是网关内置上下文的一个变量,用来获取请求的路径,通过 context.Get("_ctx.request.path")
在脚本里面进行访问。
中间我们使用了 JavaScript 的正则匹配和字符处理,做了一些字符拼接,得到新的路径 newPath
变量,最后使用 context.Put("_ctx.request.path",newPath)
更新网关请求的路径信息,从而实现查询条件里面的参数替换。
有关网关内置上下文的变量列表,请访问 Request Context
接下来,创建一个网关配置,并使用 javascript
过滤器调用该脚本,如下:
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 10000
network:
binding: 0.0.0.0:8000
flow:
- name: default_flow
filter:
- dump:
context:
- _ctx.request.path
- javascript:
file: index_path_rewrite.js
- dump:
context:
- _ctx.request.path
- elasticsearch:
elasticsearch: dev
router:
- name: my_router
default_flow: default_flow
elasticsearch:
- name: dev
enabled: true
schema: http
hosts:
- 192.168.3.188:9206
上面的例子中,使用了一个 javascript
过滤器,并且指定了加载的脚本文件为 index_path_rewrite.js
,并使用了两个 dump
过滤器来输出脚本运行前后的路径信息,最后再使用一个 elasticsearch
过滤器来转发请求给 Elasticsearch 进行查询。
我们启动网关测试一下,如下:
➜ gateway ✗ ./bin/gateway
___ _ _____ __ __ __ _
/ _ \ /_\ /__ \/__\/ / /\ \ \/_\ /\_/\
/ /_\///_\\ / /\/_\ \ \/ \/ //_\\\_ _/
/ /_\\/ _ \/ / //__ \ /\ / _ \/ \
\____/\_/ \_/\/ \__/ \/ \/\_/ \_/\_/
[GATEWAY] A light-weight, powerful and high-performance elasticsearch gateway.
[GATEWAY] 1.0.0_SNAPSHOT, 2022-04-18 07:11:09, 2023-12-31 10:10:10, 8062c4bc6e57a3fefcce71c0628d2d4141e46953
[04-19 11:41:29] [INF] [app.go:174] initializing gateway.
[04-19 11:41:29] [INF] [app.go:175] using config: /Users/medcl/go/src/infini.sh/gateway/gateway.yml.
[04-19 11:41:29] [INF] [instance.go:72] workspace: /Users/medcl/go/src/infini.sh/gateway/data/gateway/nodes/c9bpg0ai4h931o4ngs3g
[04-19 11:41:29] [INF] [app.go:283] gateway is up and running now.
[04-19 11:41:30] [INF] [api.go:262] api listen at: http://0.0.0.0:2900
[04-19 11:41:30] [INF] [entry.go:312] entry [my_es_entry] listen at: http://0.0.0.0:8000
[04-19 11:41:30] [INF] [module.go:116] all modules are started
[04-19 11:41:30] [INF] [actions.go:349] elasticsearch [dev] is available
运行下面的查询来验证查询结果,如下:
curl localhost:8000/abc,efg/_search
可以看到网关通过 dump
过滤器输出的调试信息:
---- DUMPING CONTEXT ----
_ctx.request.path : /abc,efg/_search
---- DUMPING CONTEXT ----
_ctx.request.path : /cluster01:abc,cluster02:abc,cluster01:efg,cluster02:efg/_search
查询条件按照我们的需求进行了改写,Nice!
重写 DSL 查询语句
好吧,我们刚刚只是修改了查询的索引而已,那么查询请求的 DSL 呢?行不行?
那自然是可以的嘛,瞧下面的例子:
function process(context) {
var originalDSL = context.Get("_ctx.request.body");
if (originalDSL.length >0){
var jsonObj=JSON.parse(originalDSL);
jsonObj.size=123;
jsonObj.aggs= {
"test1": {
"terms": {
"field": "abc",
"size": 10
}
}
}
context.Put("_ctx.request.body",JSON.stringify(jsonObj));
}
}
先是获取查询请求,然后转换成 JSON 对象,之后任意修改查询对象就行了,保存回去,搞掂。
测试一下:
curl -XPOST localhost:8000/abc,efg/_search -d'{"query":{}}'
输出:
---- DUMPING CONTEXT ----
_ctx.request.path : /abc,efg/_search
_ctx.request.body : {"query":{}}
[04-19 18:14:24] [INF] [reverseproxy.go:255] elasticsearch [dev] hosts: [] => [192.168.3.188:9206]
---- DUMPING CONTEXT ----
_ctx.request.path : /abc,efg/_search
_ctx.request.body : {"query":{},"size":123,"aggs":{"test1":{"terms":{"field":"abc","size":10}}}}
是不是感觉解锁了新的世界?
结论
通过使用 Javascript 脚本过滤器,我们可以非常灵活的进行复杂逻辑的操作来满足我们的业务需求。
极限网关初探(2)配置
Elasticsearch • xushuhui 发表了文章 • 0 个评论 • 1802 次浏览 • 2022-04-06 17:03
配置
上一篇我们先学习了极限网关的安装和启动,今天学习配置。
读写分离
现在我们遇到读写分离的需求,用网关该怎么做呢? 假设服务端现在从 http://127.0.0.1:8000 写入数据,从 http://127.0.0.1:9000 读取数据,怎么设计呢?
首先查看文档配置文档
我们在 gateway.yml 中定义两个 entry,分别绑定不同的端口,配置不同的 router
entry:
- name: write_es
enabled: true
router: write_router
network:
binding: 0.0.0.0:8000
- name: read_es
enabled: true
router: read_router
network:
binding: 0.0.0.0:9000
router:
- name: write_router
default_flow: default_flow
tracing_flow: logging
- name: read_router
default_flow: default_flow
tracing_flow: logging
为了演示效果,只配置一个 Elasticsearch
elasticsearch:
- name: dev
enabled: true
schema: http
hosts:
- 192.168.3.188:9206
启动项目
我们从 http://127.0.0.1:8000 写入一条数据,再从 http://127.0.0.1:9000 读取该条数据
添加接口
返回字符串
我们想自定义添加一个接口,怎么在不写代码的情况下通过配置实现返回字符串
flow:
- name: hello_flow
filter:
- echo:
message: "hello flow"
router:
- name: read_router
default_flow: hello_flow
修改配置后启动
返回 json 数据
返回字符串不符合标准的 restful 接口规范,怎么返回给调用方标准 json 数据?
filter:
- set_response:
content_type: application/json
body: '{"message":"hello world"}'
修改配置后启动
修改路由
我们已经新加了接口,返回 json 数据,但是接口是直接定义在 http://127.0.0.1:9000 中,之前网关的接口就无法使用,所以我们需要单独为自定义的接口指定单独的路由
router:
- name: read_router
default_flow: default_flow
tracing_flow: logging
rules:
- method:
- GET
pattern:
- "/hello"
flow:
- hello_flow
default_flow: 默认的处理流,也就是业务处理的主流程,请求转发、过滤、缓存等操作都在这里面进行
tracing_flow:用于追踪请求状态的流,用于记录请求日志、统计等
如果我们有过开发经验,了解 MVC 模式,flow 就类似 MVC 中的 Controller,rules 中类似路由规则,当请求匹配到配置中的路由规则时,由配置的 flow 处理业务逻辑。
数据整体流向,从服务端发到网关,网关为每个 Elasticsearch 绑定不同的 IP 地址,每个 Elasticsearch 都有唯一一个 router 和它对应,根据请求的 method 和 path 匹配到 router 中的一个 flow,flow 中包含多个 filter 处理对数据进行流式处理。
如下图所示
流式处理是什么,假设水从一个管子里面流出来,管子旁边每一段依次站了几个人,第一个人往水里放点鱼,鱼和水到了第二个人,第二个人往水里放点草,鱼、水和草到了第三人等等,每个人对水做一定的操作,水经过这些操作后最后到达水池里。
我们可以把数据当成水,filter 是管子旁边的人,水池就是 Elasticsearch
总结
在学习了router/flow/filter后,我们已经对极限网关的配置有了初步的了解,后续开发的时候直接查阅文档。
极限网关初探(1) 安装启动
Elasticsearch • xushuhui 发表了文章 • 0 个评论 • 1806 次浏览 • 2022-04-06 16:54
产品介绍
极限网关(INFINI Gateway)是一个面向 Elasticsearch 的高性能应用网关。特性丰富,使用简单。
它和其他业务型网关最大的区别是业务网关把请求转发给各个底层微服务,而它把请求转发给 Elasticsearch,更多是类似 Mycat 的中间件的作用。
没有使用网关之前,服务端请求多个节点
使用网关后
下载地址
打开 下载地址,根据操作系统版本选择。
Windows 安装和启动
安装
下载 gateway-1.6.0_SNAPSHOT-597-windows-amd64.zip,解压如下。 gateway-windows-amd64.exe 是启动文件,gateway.yml 是默认配置文件。
启动失败
当 gateway.yml 的 elasticsearch 选项中的 hosts 不能正常响应请求的时候,启动界面如下。
为什么 elasticsearch 不能访问的时候,网关还要继续提供服务呢,为什么不像业务接口启动时在基础业务组件如 MySQL/Redis 不能正常响应就直接 panic?
一方面网关作为 elasticsearch 抵挡流量冲击的城墙,在 elasticsearch 不能提供服务的时候,对之前成功的请求缓存结果,继续提供有限度的服务,为 elasticsearch 修复后上线争取时间。
另一方面业务接口和基础组件是强耦合关系,没有基础组件就完全无法对外提供数据读写服务,而网关与 elasticsearch 是松耦合关系,网关在没有 elasticsearch 的情况下也能对外提供有限度的服务。
在 gateway.yml 的 elasticsearch 选项中的 hosts 改成能够正常响应的 elasticsearch 请求地址。
启动成功
双击 gateway-windows-amd64.exe 文件,启动成功界面如下
访问
API 访问
由启动后终端显示可知,网关的 API 接口地址是 http://localhost:2900
[api.go:262] api listen at: http://0.0.0.0:2900
打开浏览器输入 http://localhost:2900,显示所有可以对外提供的 API 接口
我们选择其中一个,在浏览器中输入 http://localhost:2900/_framework/api/_version 从路由上看该接口是查询产品的版本信息,显示如下
gateway.yml 中可以看到有被注释掉的一段配置,看起来应该是配置 api 地址的地方。
#api:
# enabled: true
# network:
# binding: 127.0.0.1:2900
把注释去掉后尝试把端口改成 2901。
api:
enabled: true
network:
binding: 127.0.0.1:2901
改完后启动 打开浏览器先输入 http://localhost:2900,无法正常响应请求,再输入 http://localhost:2901,可以正常响应,界面和修改配置前访问 http://localhost:2900 的界面一样,说明 API 请求地址成功修改
Elasticsearch 访问
启动日志中显示监听 8000 端口,猜测应该是 elasticsearch 请求地址,打开浏览器输入 http://127.0.0.1:8000/
entry [my_es_entry] listen at: http://0.0.0.0:8000
gateway.yml 中可以看到 my_es_entry 的 network 绑定 8000 端口,显而易见的这部分就是配置代理转发给 elasticsearch 的地址,所以安装后只需要把以前请求 elasticsearch 的地址修改为该地址。
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 10000
network:
binding: 0.0.0.0:8000
总结
我们成功安装和启动极限网关,接下来我们学习怎么根据需求修改配置。
兼容不同版本的查询响应结果的 Count 结构
Elasticsearch • medcl 发表了文章 • 0 个评论 • 3 次浏览 • 2022-02-21 16:08
使用极限网关来代理 Kibana
Kibana • medcl 发表了文章 • 0 个评论 • 1825 次浏览 • 2022-02-15 20:29
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 10000
network:
binding: 0.0.0.0:8000
skip_occupied_port: true
tls:
enabled: true
flow:
- name: default_flow
filter:
- basic_auth:
valid_users:
medcl: passwd
- http:
schema: "http" #https or http
host: "192.168.3.98:5601"
router:
- name: my_router
default_flow: default_flow
修改里面的 ip 为你实际的 Kibana IP 地址,如需多个,使用 hosts 参数,详见文档手册:
https://极限网关.com/docs/references/filters/http/
启动网关,因为自动开启了 https,访问网关的服务地址:https://localhost:8000,如下:
输入配置的用户名密码,即可。
简直无语,升级到 Elasticsearch 8.0 之后,我的数据居然写不进去了
Elasticsearch • medcl 发表了文章 • 5 个评论 • 7470 次浏览 • 2022-02-11 19:45
{
"error": {
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "Action/metadata line [1] contains an unknown parameter [_type]"
}
],
"type": "illegal_argument_exception",
"reason": "Action/metadata line [1] contains an unknown parameter [_type]"
},
"status": 400
}
熟悉的 type,不一样的问题,大爷的,还是大意了。
type 这厮在 8.0 里面已经被正式干掉了,而我还傻傻的等在原地。
怎么办,我几万个 agent、上千个 Filebeat 和 Logstash 大军总不能现在就升级吧,还不知道有没有幺蛾子在前方等待我,心里有点方。
恩,这个时候我想到了鲁迅说的,用 Elasitcsearch 之前先看看极限网关(文档地址:http://极限网关.com) ,所以我立马就去南山寺问了问极限网关,他说可以, O 了。
首先,从这里下载最新的极限网关 523 版本:http://release.elasticsearch.cn/gateway/snapshot/
然后,解压,可以看到立马有一个配置,佛祖显灵,居然有一个配置文件`sample-configs/v8-bulk-indexing-compatibility.yml` 在闪光,就是它:
恩,看来是这个 filter 可以帮我去掉了当前 whatever 版本生成的多余的不要的 type,修改配置我就试试看。
启动启动启动,如下:
没有报错!yes。
查看索引的统计指标:
O 了,数据都正常写入了,采集端啥都不用动,就能用上新版的 Elasticsearch,真是快哉。晚上自己加个卤蛋。
以上情节,纯属虚构,如有雷同,哪又咋地。
使用极限网关来处置 Elasticsearch 的 Apache Log4j 漏洞
Elasticsearch • medcl 发表了文章 • 2 个评论 • 5220 次浏览 • 2021-12-11 03:57
昨日爆出的 Log4j 安全漏洞,业界一片哗然,今天给大家介绍一下,如何使用极限网关来快速处置 Elasticsearch 的 Apache Log4j 漏洞。
【CVE 地址】
https://github.com/advisories/GHSA-jfh8-c2jp-5v3q
【漏洞描述】
Apache Log4j 是一款非常流行的开源的用于 Java 运行环境的日志记录工具包,大量的 Java 框架包括 Elasticsearch 的最新版本都使用了该组件,故影响范围非常之大。
近日, 随着 Apache Log4j 的远程代码执行最新漏洞细节被公开,攻击者可通过构造恶意请求利用该漏洞实现在目标服务器上执行任意代码。可导致服务器被黑客控制,从而进行页面篡改、数据窃取、挖矿、勒索等行为。建议使用该组件的用户第一时间启动应急响应进行修复。
简单总结一下就是,在使用 Log4j 打印输出的日志中,如果发现日志内容中包含关键词 ${
,那么这个里面包含的内容会当做变量来进行替换和执行,导致攻击者可以通过恶意构造日志内容来让 Java 进程来执行任意命令,达到攻击的效果。
【漏洞等级】:非常紧急
此次漏洞是用于 Log4j2 提供的 lookup 功能造成的,该功能允许开发者通过一些协议去读取相应环境中的配置。但在实现的过程中,并未对输入进行严格的判断,从而造成漏洞的发生。
【影响范围】:Java 类产品:Apache Log4j 2.x < 2.15.0-rc2,Elasticsearch 当前所有版本。
【攻击检测】
可以通过检查日志中是否存在 jndi:ldap://
、jndi:rmi
等字符来发现可能的攻击行为。
处理办法
最简单的办法是通过修改 config/jvm.options
,新增以下参数,重启集群所有节点即可。
-Dlog4j2.formatMsgNoLookups=true
不过,如果集群规模较大,数据较多,业务不能中断,不能通过修改 Elasticsearch 配置、或者替换 Log4j 的最新 jar 包来重启集群的情况,可以考虑使用极限网关来进行拦截或者参数替换甚至是直接阻断请求。
通过在网关层对发往 Elasticsearch 的请求统一进行参数检测,将包含的敏感关键词 ${
进行替换或者直接拒绝,
可以防止带攻击的请求到达 Elasticsearch 服务端而被 Log4j 打印相关日志的时候执行恶意攻击命令,从而避免被攻击。
极限网关是透明代理,只需要在应用端,将以往配置指向 Elasticsearch 的地址替换为现在网关的地址即可,其他都不用动。
参考配置
下载最新的 1.5.0-SNAPSHOT
版本http://release.elasticsearch.cn/gateway/snapshot/
使用极限网关的 context_filter
过滤器,对请求上下文 _ctx.request.to_string
进行关键字检测,过滤掉恶意流量,从而阻断攻击。
新增一个配置文件 gateway.yml
path.data: data
path.logs: log
entry:
- name: es_entrypoint
enabled: true
router: default
max_concurrency: 20000
network:
binding: 0.0.0.0:8000
router:
- name: default
default_flow: main_flow
flow:
- name: main_flow
filter:
- context_filter:
context: _ctx.request.to_string
action: redirect_flow
status: 403
flow: log4j_matched_flow
must_not: # any match will be filtered
regex:
- \$\{.*?\}
- "%24%7B.*?%7D" #urlencode
contain:
- "jndi:"
- "jndi:ldap:"
- "jndi:rmi:"
- "jndi%3A" #urlencode
- "jndi%3Aldap%3A" #urlencode
- "jndi%3Armi%3A" #urlencode
- elasticsearch:
elasticsearch: es-server
- name: log4j_matched_flow
filter:
- echo:
message: 'Apache Log4j 2, Boom!'
elasticsearch:
- name: es-server
enabled: true
endpoints:
- http://localhost:9200
启动网关:
➜ ./bin/gateway -config /tmp/gateway.yml
___ _ _____ __ __ __ _
/ _ \ /_\ /__ \/__\/ / /\ \ \/_\ /\_/\
/ /_\///_\\ / /\/_\ \ \/ \/ //_\\\_ _/
/ /_\\/ _ \/ / //__ \ /\ / _ \/ \
\____/\_/ \_/\/ \__/ \/ \/\_/ \_/\_/
[GATEWAY] A light-weight, powerful and high-performance elasticsearch gateway.
[GATEWAY] 1.0.0_SNAPSHOT, 2021-12-10 23:55:34, 2212aff
[12-11 01:55:49] [INF] [app.go:250] initializing gateway.
[12-11 01:55:49] [INF] [instance.go:26] workspace: /Users/medcl/go/src/infini.sh/gateway/data/gateway/nodes/0
[12-11 01:55:49] [INF] [api.go:261] api listen at: http://0.0.0.0:2900
[12-11 01:55:49] [INF] [reverseproxy.go:253] elasticsearch [es-server] hosts: [] => [localhost:9200]
[12-11 01:55:49] [INF] [entry.go:296] entry [es_entrypoint] listen at: http://0.0.0.0:8000
[12-11 01:55:49] [INF] [module.go:116] all modules started
[12-11 01:55:49] [INF] [app.go:357] gateway is running now.
[12-11 01:55:49] [INF] [actions.go:236] elasticsearch [es-server] is available
将要使用的测试命令 ${java:os}
使用 urlencode 转码为 %24%7Bjava%3Aos%7D
,构造查询语句,分别测试。
不走网关:
~% curl 'http://localhost:9200/index1/_search?q=%24%7Bjava%3Aos%7D'
{"error":{"root_cause":[{"type":"index_not_found_exception","reason":"no such index","resource.type":"index_or_alias","resource.id":"index1","index_uuid":"_na_","index":"index1"}],"type":"index_not_found_exception","reason":"no such index","resource.type":"index_or_alias","resource.id":"index1","index_uuid":"_na_","index":"index1"},"status":404}%
查看 Elasticsearch 端日志为:
[2021-12-11T01:49:50,303][DEBUG][r.suppressed ] path: /index1/_search, params: {q=Mac OS X 10.13.4 unknown, architecture: x86_64-64, index=index1}
org.elasticsearch.index.IndexNotFoundException: no such index
at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver$WildcardExpressionResolver.infe(IndexNameExpressionResolver.java:678) ~[elasticsearch-5.6.15.jar:5.6.15]
at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver$WildcardExpressionResolver.innerResolve(IndexNameExpressionResolver.java:632) ~[elasticsearch-5.6.15.jar:5.6.15]
at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver$WildcardExpressionResolver.resolve(IndexNameExpressionResolver.java:580) ~[elasticsearch-5.6.15.jar:5.6.15]
可以看到查询条件里面的 q=${java:os}
被执行了,变成了 q=Mac OS X 10.13.4 unknown, architecture: x86_64-64, index=index1
,说明变量被解析且执行,存在漏洞利用的风险。
那走网关之后呢:
~% curl 'http://localhost:8000/index1/_search?q=%24%7Bjava%3Aos%7D'
Apache Log4j 2, Boom!%
可以看到请求被过滤掉了,返回了自定义的信息。
还有一些其他测试命令,大家也可以试试:
#{java:vm}
~% curl 'http://localhost:9200/index/_search?q=%24%7Bjava%3Avm%7D'
[2021-12-11T02:36:04,764][DEBUG][r.suppressed ] [INFINI-2.local] path: /index/_search, params: {q=OpenJDK 64-Bit Server VM (build 25.72-b15, mixed mode), index=index}
~% curl 'http://localhost:8000/index/_search?q=%24%7Bjava%3Avm%7D'
Apache Log4j 2, Boom!%
#{jndi:rmi://localhost:1099/api}
~% curl 'http://localhost:9200/index/_search?q=%24%7Bjndi%3Armi%3A%2F%2Flocalhost%3A1099%2Fapi%7D'
2021-12-11 03:35:06,493 elasticsearch[YOmFJsW][search][T#3] ERROR An exception occurred processing Appender console java.lang.SecurityException: attempt to add a Permission to a readonly Permissions object
~% curl 'http://localhost:8000/index/_search?q=%24%7Bjndi%3Armi%3A%2F%2Flocalhost%3A1099%2Fapi%7D'
Apache Log4j 2, Boom!%
另外不同版本的 Elasticsearch 对于攻击的复现程度参差不齐,因为 es 不同版本是否有 Java Security Manager 、不同版本 JDK 、以及默认配置也不相同,新一点的 es 其实同样可以触发恶意请求,只不过网络调用被默认的网络策略给拒绝了,相对安全,当然如果设置不当同样存在风险,见过很多用户一上来就关默认安全配置的,甚至还放开很多暂时用不上的权限,另外未知的攻击方式也一定有,比如大量日志产生的系统调用可能会拖垮机器造成服务不可用,所以要么还是尽快改配置换 log4j 包重启集群,或者走网关来过滤阻断请求吧。
使用极限网关处置类似安全事件的好处是,Elasticsearch 服务器不用做任何变动,尤其是大规模集群的场景,可以节省大量的工作,提升效率,非常灵活,缩短安全处置的时间,降低企业风险。
求教极限网关的endpoins如何配置多个es节点的ip
Elasticsearch • zqc0512 回复了问题 • 2 人关注 • 1 个回复 • 2743 次浏览 • 2021-11-25 11:27
极限网关入门视频教程已发布
Elasticsearch • medcl 发表了文章 • 4 个评论 • 1687 次浏览 • 2021-11-21 11:41
使用极限网关来进行 Elasticsearch 跨集群跨版本查询及所有其它请求
Elasticsearch • medcl 发表了文章 • 7 个评论 • 3720 次浏览 • 2021-10-16 11:31
使用场景
如果你的业务需要用到有多个集群,并且版本还不一样,是不是管理起来很麻烦,如果能够通过一个 API 来进行查询就方便了,聪明的你相信已经想到了 CCS,没错用 CCS 可以实现跨集群的查询,不过 Elasticsearch 提供的 CCS 对版本有一点的限制,并且需要提前做好 mTLS,也就是需要提前配置好两个集群之间的证书互信,这个免不了要重启维护,好像有点麻烦,那么问题来咯,有没有更好的方案呢?
😁 有办法,今天我就给大家介绍一个基于极限网关的方案,极限网关的网址:http://极限网关.com/。
假设现在有两个集群,一个集群是 v2.4.6,有不少业务数据,舍不得删,里面有很多好东西 :)还有一个集群是 v7.14.0,版本还算比较新,业务正在做的一个新的试点,没什么好东西,但是也得用 :(,现在老板们的的需求是希望通过在一个统一的接口就能访问这些数据,程序员懒得很,懂得都懂。
集群信息
- v2.4.6 集群的访问入口地址:192.168.3.188:9202
- v7.14.0 集群的访问入口地址:192.168.3.188:9206
这两个集群都是 http 协议的。
实现步骤
今天用到的是极限网关的 switch 过滤器:https://极限网关.com/docs/references/filters/switch/
网关下载下来就两个文件,一个主程序,一个配置文件,记得下载对应操作系统的包。
定义两个集群资源
elasticsearch:
- name: v2
enabled: true
endpoint: http://192.168.3.188:9202
- name: v7
enabled: true
endpoint: http://192.168.3.188:9206
上面定义了两个集群,分别命名为 v2
和 v7
,待会会用到这些资源。
定义一个服务入口
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 1000
network:
binding: 0.0.0.0:8000
tls:
enabled: true
这里定义了一个名为 my_es_entry
的资源入口,并引用了一个名为 my_router
的请求转发路由,同时绑定了网卡的 0.0.0.0:8000
也就是所有本地网卡监听 IP 的 8000
端口,访问任意 IP 的 8000
端口就能访问到这个网关了。
另外老板也说了,Elasticsearch 用 HTTP 协议简直就是裸奔,通过这里开启 tls
,可以让网关对外提供的是 HTTPS 协议,这样用户连接的 Elasticsearch 服务就自带 buffer 了,后端的 es 集群和网关直接可以做好网络流量隔离,集群不用动,简直完美。
为什么定义 TLS 不用指定证书,好用的软件不需要这么麻烦,就这样,不解释。
最后,通过设置 max_concurrency
为 1000,限制下并发数,避免野猴子把我们的后端的 Elasticsearch 给压挂了。
定义一个请求路由
router:
- name: my_router
default_flow: cross-cluster-search
这里的名称 my_router
就是表示上面的服务入口的router
参数指定的值。
另外设置一个 default_flow
来将所有的请求都转发给一个名为 cross-cluster-search
的请求处理流程,还没定义,别急,马上。
定义请求处理流程
来啦,来啦,先定义两个 flow,如下,分别名为 v2-flow
和 v7-flow
,每节配置的 filter
定义了一系列过滤器,用来对请求进行处理,这里就用了一个 elasticsearch
过滤器,也就是转发请求给指定的 Elasticsearch 后端服务器,了否?
flow:
- name: v2-flow
filter:
- elasticsearch:
elasticsearch: v2
- name: v7-flow
filter:
- elasticsearch:
elasticsearch: v7
然后,在定义额外一个名为 cross-cluster-search
的 flow,如下:
- name: cross-cluster-search
filter:
- switch:
path_rules:
- prefix: "v2:"
flow: v2-flow
- prefix: "v7:"
flow: v7-flow
这个 flow 就是通过请求的路径的前缀来进行路由的过滤器,如果是 v2:
开头的请求,则转发给 v2-flow
继续处理,如果是 v7:
开头的请求,则转发给 v7-flow
来处理,使用的用法和 CCS 是一样的。so easy!
对了,那是不是每个请求都需要加前缀啊,费事啊,没事,在这个 cross-cluster-search
的 filter 最后再加上一个 elasticsearch
filter,前面前缀匹配不上的都会走它,假设默认都走 v7
,最后完整的 flow 配置如下:
flow:
- name: v2-flow
filter:
- elasticsearch:
elasticsearch: v2
- name: v7-flow
filter:
- elasticsearch:
elasticsearch: v7
- name: cross-cluster-search
filter:
- switch:
path_rules:
- prefix: "v2:"
flow: v2-flow
- prefix: "v7:"
flow: v7-flow
- elasticsearch:
elasticsearch: v7
然后就没有然后了,因为就配置这些就行了。
启动网关
假设配置文件的路径为 sample-configs/cross-cluster-search.yml
,运行如下命令:
➜ gateway git:(master) ✗ ./bin/gateway -config sample-configs/cross-cluster-search.yml
___ _ _____ __ __ __ _
/ _ \ /_\ /__ \/__\/ / /\ \ \/_\ /\_/\
/ /_\///_\\ / /\/_\ \ \/ \/ //_\\\_ _/
/ /_\\/ _ \/ / //__ \ /\ / _ \/ \
\____/\_/ \_/\/ \__/ \/ \/\_/ \_/\_/
[GATEWAY] A light-weight, powerful and high-performance elasticsearch gateway.
[GATEWAY] 1.0.0_SNAPSHOT, 2021-10-15 16:25:56, 3d0a1cd
[10-16 11:00:52] [INF] [app.go:228] initializing gateway.
[10-16 11:00:52] [INF] [instance.go:24] workspace: data/gateway/nodes/0
[10-16 11:00:52] [INF] [api.go:260] api listen at: http://0.0.0.0:2900
[10-16 11:00:52] [INF] [reverseproxy.go:257] elasticsearch [v7] hosts: [] => [192.168.3.188:9206]
[10-16 11:00:52] [INF] [entry.go:225] auto generating cert files
[10-16 11:00:52] [INF] [actions.go:223] elasticsearch [v2] is available
[10-16 11:00:52] [INF] [actions.go:223] elasticsearch [v7] is available
[10-16 11:00:53] [INF] [entry.go:296] entry [my_es_entry] listen at: https://0.0.0.0:8000
[10-16 11:00:53] [INF] [app.go:309] gateway is running now.
可以看到网关输出了启动成功的日志,网关服务监听在 https://0.0.0.0:8000
。
试试访问网关
直接访问网关的 8000 端口,因为是网关自签的证书,加上 -k 来跳过证书的校验,如下:
➜ loadgen git:(master) ✗ curl -k https://localhost:8000
{
"name" : "LENOVO",
"cluster_name" : "es-v7140",
"cluster_uuid" : "npWjpIZmS8iP_p3GK01-xg",
"version" : {
"number" : "7.14.0",
"build_flavor" : "default",
"build_type" : "zip",
"build_hash" : "dd5a0a2acaa2045ff9624f3729fc8a6f40835aa1",
"build_date" : "2021-07-29T20:49:32.864135063Z",
"build_snapshot" : false,
"lucene_version" : "8.9.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
正如前面配置所配置的一样,默认请求访问的就是 v7 集群。
访问 v2 集群
➜ loadgen git:(master) ✗ curl -k https://localhost:8000/v2:/
{
"name" : "Solomon O'Sullivan",
"cluster_name" : "es-v246",
"cluster_uuid" : "cqlpjByvQVWDAv6VvRwPAw",
"version" : {
"number" : "2.4.6",
"build_hash" : "5376dca9f70f3abef96a77f4bb22720ace8240fd",
"build_timestamp" : "2017-07-18T12:17:44Z",
"build_snapshot" : false,
"lucene_version" : "5.5.4"
},
"tagline" : "You Know, for Search"
}
查看集群信息:
➜ loadgen git:(master) ✗ curl -k https://localhost:8000/v2:_cluster/health\?pretty
{
"cluster_name" : "es-v246",
"status" : "yellow",
"timed_out" : false,
"number_of_nodes" : 1,
"number_of_data_nodes" : 1,
"active_primary_shards" : 5,
"active_shards" : 5,
"relocating_shards" : 0,
"initializing_shards" : 0,
"unassigned_shards" : 5,
"delayed_unassigned_shards" : 0,
"number_of_pending_tasks" : 0,
"number_of_in_flight_fetch" : 0,
"task_max_waiting_in_queue_millis" : 0,
"active_shards_percent_as_number" : 50.0
}
插入一条文档:
➜ loadgen git:(master) ✗ curl-json -k https://localhost:8000/v2:medcl/doc/1 -d '{"name":"hello world"}'
{"_index":"medcl","_type":"doc","_id":"1","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"created":true}%
执行一个查询
➜ loadgen git:(master) ✗ curl -k https://localhost:8000/v2:medcl/_search\?q\=name:hello
{"took":78,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":0.19178301,"hits":[{"_index":"medcl","_type":"doc","_id":"1","_score":0.19178301,"_source":{"name":"hello world"}}]}}%
可以看到,所有的请求,不管是集群的操作,还是索引的增删改查都可以,而 Elasticsearch 自带的 CCS 是只读的,只能进行查询。
访问 v7 集群
➜ loadgen git:(master) ✗ curl -k https://localhost:8000/v7:/
{
"name" : "LENOVO",
"cluster_name" : "es-v7140",
"cluster_uuid" : "npWjpIZmS8iP_p3GK01-xg",
"version" : {
"number" : "7.14.0",
"build_flavor" : "default",
"build_type" : "zip",
"build_hash" : "dd5a0a2acaa2045ff9624f3729fc8a6f40835aa1",
"build_date" : "2021-07-29T20:49:32.864135063Z",
"build_snapshot" : false,
"lucene_version" : "8.9.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
Kibana 里面访问
完全没问题,有图有真相:
其他操作也类似,就不重复了。
完整的配置
path.data: data
path.logs: log
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 10000
network:
binding: 0.0.0.0:8000
tls:
enabled: true
flow:
- name: v2-flow
filter:
- elasticsearch:
elasticsearch: v2
- name: v7-flow
filter:
- elasticsearch:
elasticsearch: v7
- name: cross-cluster-search
filter:
- switch:
path_rules:
- prefix: "v2:"
flow: v2-flow
- prefix: "v7:"
flow: v7-flow
- elasticsearch:
elasticsearch: v7
router:
- name: my_router
default_flow: cross-cluster-search
elasticsearch:
- name: v2
enabled: true
endpoint: http://192.168.3.188:9202
- name: v7
enabled: true
endpoint: http://192.168.3.188:9206
小结
好了,今天给大家分享的如何使用极限网关来进行 Elasticsearch 跨集群跨版本的操作就到这里了,希望大家周末玩的开心。😁
INFINI Gateway 的使用方法和使用心得分享
Elasticsearch • tianqi 发表了文章 • 7 个评论 • 5074 次浏览 • 2020-12-19 16:46
[program:gateway]
command = /app/logger/gateway/gateway-linux64
username = appuser
autostart=false
autorestart=true
startsecs=3
priority=1007
stdout_logfile=/app/logger/gateway/log/infini_gateway.log
注意这里由于我的测试服务器kibana、gateway和es节点都部署在一起,统一都是用supervisord进行纳管,由于网关必须等到es节点启动以后才可以启动成功,所以将gateway 的autostart设置为false。另外网关节点由于流量、资源不足等问题会有一定风险自己挂掉(测试过程中就遇到过),所以推荐将autorestart设置为true,当网关节点意外挂掉以后能够马上重启不影响应用使用。
配置文件
这块可能是我比较重点想分享的地方,因为可能由于目前项目刚刚发布,medcl大神一直忙于功能开发无暇顾及使用说明的介绍(我妄自揣测,请medcl大神不要见怪^_^),所以配置文件目前在安装介质中只有一个模板,外加模板中的一些参数注释的介绍,可能对于新手进行配置还是有比较大的难度(比如我),所以在自己摸索外加medcl大神的指导下我初步成功配置并实现了预期效果,现在分享给大家。
Path模块:path.data: data
path.logs: log
这块没什么好说的,就是这是data和log的相对路径,一般也不要改
entry模块:entry:
- name: es_gateway #your gateway endpoint
enable: true
router: default #configure your gateway's routing flow
network:
binding: 0.0.0.0:8000
reuse_port: true #you can start multi gateway instance, they share same port, to full utilize system's resources
tls:
enabled: false #if your es is using https, the gateway entrypoint should enable https too
这里实际上就是定义gateway的总体模块,指定了gateway网关名字,绑定的地址端口和网关是否启用的开关,这个开关就是指enable参数,我发现后面的版本有的模板文件中是不带enable这个参数的,这里其实有问题,因为测试时发现如果不设置enable为true的话默认值时false,也就是说网关不生效,为了当时排查这个问题我花了不少时间。这里划重点哈,entry模块里定义最重要的参数就是router,也就是网关gateway的路由策略。gateway的配置文件包含关系是这样的,gateway的entry指定了router,router中指定了tracing_flow和default_flow以及默认flow选择策略,tracing_flow是指定网关自己的流,也就是网关监控的流的处理逻辑,和业务查询和写入是无关的,default_flow实际上是真实网关的业务流,这个流一定是要走cache的。然后flow里面才会包含filter,filter顾名思义就是网关流的过滤筛选条件,filter里会有cache逻辑、rate限流条件、filter条件、elasticsearch属性。因为网关的监控数据要单独向es写入,需要设置一些写入属性和参数,所以需要module模块去指定模板、运行态参数和pipeline,然后再pipeline模块中指定写入es属性,包括es地址、索引名、队列名、worksize、bulksize等等,另外网关自己的一些队列参数设置在queue模块中,网关监控信息参数设置在statsd模块中。
flow模块:- name: cache_first
filter: #comment out any filter sections, like you don't need cache or rate-limiter
- name: get_cache_1
type: get_cache
- name: rate_limit_1
type: rate_limit
parameters:
message: "Hey, You just reached our request limit!"
rules: #configure match rules against request's PATH, eg: /_cluster/health, match the first rule and return
- pattern: "/(?P<index_name>test.*?)/_search" #use regex pattern to match index, will match any /$index/_search, and limit each index with max_qps ~=100
max_qps: 1000
group: index_name
- name: elasticsearch_1
type: elasticsearch
parameters:
elasticsearch: default #elasticsearch configure reference name
max_connection: 1000 #max tcp connection to upstream, default for all nodes
max_response_size: -1 #default for all nodes
balancer: weight
- host: 192.168.3.201:9200 #the format is host:port
weight: 100
- host: 192.168.3.202:9200
weight: 100
discovery:
enabled: false
- name: set_cache_1
type: set_cache
parameters:
cache_ttl: 1000s
# max_cache_items: 100000
- name: request_logging # this flow is used for request logging, refer to `router`'s `tracing_flow`
filter:
- name: request_path_filter_1
type: request_path_filter
parameters:
must: #must match all rules to continue
prefix:
- /test
- name: request_logging_1
type: request_logging
parameters:
queue_name: request_logging
如上,flow模块就是定义了两个flow,一个是cache_first,一个是request_logging,cache_first是承接业务流,所以在filter中一定要设置get_cache和set_cache强制走缓存策略,这里注意set_cache中要有缓存失效时间cache_ttl,默认是10s,这个我根据我的业务需求直接增加到了1000s,因为我想将缓存多保留一段时间,max_cache_items我直接注释掉了,不限制缓存数量的大小,如果一定要设置这个值要注意一下这个值针对的是每一个查询语句的而不是总共的。另外在cache_first中可以设置限流策略,/(?P<index_name>test.*?)表示test开头的索引全部应用限流策略。这里还有一个关键点是指定elasticsearch地址时可以配置连接权重,这个参数对我还是蛮有用的,因为我想我的应用查询通过网关只从coor节点进入而不是datanode和masternode,做到读写节点分离同时降低大查询灾难蔓延扩散的风险。所以我会将es集群中两个coor节点配置到这里,weight尽量设置的大一点,这里要注意如果这样配置必须把discovery的enable设置为false,否则网关还是会从master或者data节点进入。
request_logging的流其实就是网关trace监控自己用的,由于后面进行了filter强匹配,所以采样sample参数我就没有配置。注意这里其实我们在cache_first中并没有配置filter规则,所以这里任何过网关的查询都会进入缓存,包括查询tracing_flow自己创建的索引。在request_logging尽量按照自己的需求去配置一些filter规则来减少监控写入的索引,如果不配置,写入的量级会非常多,后续用仪表盘进行监控时响应会非常慢。目前gateway网关支持的filter类型非常全面,包括request_path_filter根据索引名或者路径去筛选,request_header_filter根据请求头部信息去筛选索引,request_method_filter根据请求的方法类型去筛选,注意如果使用request_header_filter根据请求头部信息去筛选索引,需要在应用在请求的头里加入特殊标识,比如如果要通过request_header_filter方式把所有kibana的请求全部过滤掉,就需要在kibana配置文件中增加头部参数并自定义值:elasticsearch.customHeaders: { "app": "kibana" }
然后在request_header_filter中加过滤条件:- name: request_header_filter1 # filter out the requests that we are not interested, reduce tracing pressure
type: request_header_filter
parameters:
exclude: # any rule match will marked request as filtered
- app: kibana # in order to filter kibana's access log, config `elasticsearch.customHeaders: { "app": "kibana" }` to your kibana's config `/config/kibana.yml`
我目前的需求是只想监控某些固定索引前缀的请求,所以我只配置了request_path_filter并must强配置了比如test索引前缀的索引,这样已经满足我得需求并最大限度的减少了监控请求数量。
这里还有一个点,就是在filter类型中还有个特殊的类型,叫做request_logging,这个是专门针对tracing_flow设计的,其中有一个重要参数是queue_name,他会在gateway网关所在的服务器磁盘上创建一个队列用来加速写入,减少tracing_flow对网关所造成的性能影响,所以这个队列在这里创建之后会在后面的pipelines中去指定写入磁盘队列。
router模块:router:
- name: default
tracing_flow: request_logging #a flow will execute after request finish
default_flow: cache_first
rules: #rules can't be conflicted with each other, will be improved in the future
- id: 1 # this rule means match every requests, and sent to `cache_first` flow
method:
- "*"
pattern:
- /
# priority: 1
flow:
- cache_first # after match, which processing flow will go through
router模块最重要的就是指定了tracing_flow和default_flow,这里可以定义路由规则,按照默认全部匹配就走缓存可以,同时最后在flow参数指定默认要走的flow流,也就是cache_first业务请求流。
elasticsearch模块:elasticsearch:
- name: default
enabled: true
endpoint: http://localhost:9200 # if your elasticsearch is using https, your gateway should be listen on as https as well
version: 7.9.1 #optional, used to select es adaptor, can be done automatically after connect to es
index_prefix: gateway_
basic_auth: #used to discovery full cluster nodes, or check elasticsearch's health and versions
username: elastic
password: pass
这里需要指定es地址和端口,可以配置多个,也就是说可以将业务的es和监控索引存储的es进行分离,这里有个参数index_prefix,我的理解是这个会在kibana中去创建这个索引模式,用来后续进行仪表盘监控,但是真实测试结果是这个设置没有生效,索引模式前缀都是gateway_requests,这个可能需要后续和Medcl大神再确认一下。
modules模块:modules:
- name: elastic
enabled: true
elasticsearch: default
init_template: true
- name: pipeline
enabled: true
runners:
- name: primary
enabled: true
max_go_routine: 1
threshold_in_ms: 0
timeout_in_ms: 5000
pipeline_id: request_logging_index
modules需要配置两个模块,一个是es模块,一个是pipeline模块,主要是为
request_logging要往es中写数做准备,es模块使用默认模板,pipeline设置一些写入需要的参数,我全部没改使用默认值。
pipelines模块:pipelines:
- name: request_logging_index
start:
joint: json_indexing
enabled: true
parameters:
index_name: "gateway_requests"
elasticsearch: "default"
input_queue: "request_logging"
timeout: "60s"
worker_size: 1
bulk_size_in_mb: 10 #in MB
process:
因为要向es中写入trace数据,所以需要在pipeline中配置写入参数,包括json方式传输,写入的es(如果es有多个需要拆分),使用的磁盘队列queue,超时时间,写入worker数以及bulksize,这个我觉得后续可以根据监控的量级进行优化。
queue模块:queue:
min_msg_size: 1
max_msg_size: 50000000
max_bytes_per_file: 53687091200
sync_every_records: 100000 # sync by records count
sync_timeout_in_ms: 10000 # sync by time in million seconds
read_chan_buffer: 0
这个就是那个trace数据本次磁盘队列的一些写入参数,包括消息大小,磁盘文件大小,同步消息数,read buffer等等,如果像我目前的应用场景已经严格匹配索引的量级有限可以不用改,但是我觉得默认max_bytes_per_file默认值是50G,这个我觉得单个文件有点大,生产环境可以考虑调小。
statsd模块:enabled: false
host: 127.0.0.1
port: 8125
namespace: gateway.
这个模块目前我没用到,应该是本身监控gateway状态的模块,指定了监控端口,后续可以进行验证。
使用对比结果:
通过gateway中自带仪表盘可以对使用情况和效果进行方便的监控,通过比对可以发现一条复用的查询语句走网关缓存和不走网关缓存性能差距在100倍以上,使用网关进行查询优势巨大特别明显。
未来展望:
首先感谢Medcl大神把这么好的东西发布出来,使用网关对于业务查询有百倍上的性能提升确实十分诱人。而且目前虽然INFINI Gateway刚刚发布出来,但是已经陆陆续续迭代了好几个版本,可以明显看到一些bug修复、功能增强和性能优化提高,目前最新的版本又增加了请求灰度切换、流量迁移和流量复制功能,可以实现双写和多写,这个功能对于我来说后续很可能会应用到,因为实际上是一种变相的多集群数据同步方式的实现。所以可以看出INFINI Gateway网关不仅仅定位于查询缓存网关,而是集查询和写入功能为一体的综合性大网关,期待Medcl大神后续的精彩表演,让我们拭目以待。 极限网关初探(2)配置
Elasticsearch • xushuhui 发表了文章 • 0 个评论 • 1802 次浏览 • 2022-04-06 17:03
配置
上一篇我们先学习了极限网关的安装和启动,今天学习配置。
读写分离
现在我们遇到读写分离的需求,用网关该怎么做呢? 假设服务端现在从 http://127.0.0.1:8000 写入数据,从 http://127.0.0.1:9000 读取数据,怎么设计呢?
首先查看文档配置文档
我们在 gateway.yml 中定义两个 entry,分别绑定不同的端口,配置不同的 router
entry:
- name: write_es
enabled: true
router: write_router
network:
binding: 0.0.0.0:8000
- name: read_es
enabled: true
router: read_router
network:
binding: 0.0.0.0:9000
router:
- name: write_router
default_flow: default_flow
tracing_flow: logging
- name: read_router
default_flow: default_flow
tracing_flow: logging
为了演示效果,只配置一个 Elasticsearch
elasticsearch:
- name: dev
enabled: true
schema: http
hosts:
- 192.168.3.188:9206
启动项目
我们从 http://127.0.0.1:8000 写入一条数据,再从 http://127.0.0.1:9000 读取该条数据
添加接口
返回字符串
我们想自定义添加一个接口,怎么在不写代码的情况下通过配置实现返回字符串
flow:
- name: hello_flow
filter:
- echo:
message: "hello flow"
router:
- name: read_router
default_flow: hello_flow
修改配置后启动
返回 json 数据
返回字符串不符合标准的 restful 接口规范,怎么返回给调用方标准 json 数据?
filter:
- set_response:
content_type: application/json
body: '{"message":"hello world"}'
修改配置后启动
修改路由
我们已经新加了接口,返回 json 数据,但是接口是直接定义在 http://127.0.0.1:9000 中,之前网关的接口就无法使用,所以我们需要单独为自定义的接口指定单独的路由
router:
- name: read_router
default_flow: default_flow
tracing_flow: logging
rules:
- method:
- GET
pattern:
- "/hello"
flow:
- hello_flow
default_flow: 默认的处理流,也就是业务处理的主流程,请求转发、过滤、缓存等操作都在这里面进行
tracing_flow:用于追踪请求状态的流,用于记录请求日志、统计等
如果我们有过开发经验,了解 MVC 模式,flow 就类似 MVC 中的 Controller,rules 中类似路由规则,当请求匹配到配置中的路由规则时,由配置的 flow 处理业务逻辑。
数据整体流向,从服务端发到网关,网关为每个 Elasticsearch 绑定不同的 IP 地址,每个 Elasticsearch 都有唯一一个 router 和它对应,根据请求的 method 和 path 匹配到 router 中的一个 flow,flow 中包含多个 filter 处理对数据进行流式处理。
如下图所示
流式处理是什么,假设水从一个管子里面流出来,管子旁边每一段依次站了几个人,第一个人往水里放点鱼,鱼和水到了第二个人,第二个人往水里放点草,鱼、水和草到了第三人等等,每个人对水做一定的操作,水经过这些操作后最后到达水池里。
我们可以把数据当成水,filter 是管子旁边的人,水池就是 Elasticsearch
总结
在学习了router/flow/filter后,我们已经对极限网关的配置有了初步的了解,后续开发的时候直接查阅文档。
极限网关初探(1) 安装启动
Elasticsearch • xushuhui 发表了文章 • 0 个评论 • 1806 次浏览 • 2022-04-06 16:54
产品介绍
极限网关(INFINI Gateway)是一个面向 Elasticsearch 的高性能应用网关。特性丰富,使用简单。
它和其他业务型网关最大的区别是业务网关把请求转发给各个底层微服务,而它把请求转发给 Elasticsearch,更多是类似 Mycat 的中间件的作用。
没有使用网关之前,服务端请求多个节点
使用网关后
下载地址
打开 下载地址,根据操作系统版本选择。
Windows 安装和启动
安装
下载 gateway-1.6.0_SNAPSHOT-597-windows-amd64.zip,解压如下。 gateway-windows-amd64.exe 是启动文件,gateway.yml 是默认配置文件。
启动失败
当 gateway.yml 的 elasticsearch 选项中的 hosts 不能正常响应请求的时候,启动界面如下。
为什么 elasticsearch 不能访问的时候,网关还要继续提供服务呢,为什么不像业务接口启动时在基础业务组件如 MySQL/Redis 不能正常响应就直接 panic?
一方面网关作为 elasticsearch 抵挡流量冲击的城墙,在 elasticsearch 不能提供服务的时候,对之前成功的请求缓存结果,继续提供有限度的服务,为 elasticsearch 修复后上线争取时间。
另一方面业务接口和基础组件是强耦合关系,没有基础组件就完全无法对外提供数据读写服务,而网关与 elasticsearch 是松耦合关系,网关在没有 elasticsearch 的情况下也能对外提供有限度的服务。
在 gateway.yml 的 elasticsearch 选项中的 hosts 改成能够正常响应的 elasticsearch 请求地址。
启动成功
双击 gateway-windows-amd64.exe 文件,启动成功界面如下
访问
API 访问
由启动后终端显示可知,网关的 API 接口地址是 http://localhost:2900
[api.go:262] api listen at: http://0.0.0.0:2900
打开浏览器输入 http://localhost:2900,显示所有可以对外提供的 API 接口
我们选择其中一个,在浏览器中输入 http://localhost:2900/_framework/api/_version 从路由上看该接口是查询产品的版本信息,显示如下
gateway.yml 中可以看到有被注释掉的一段配置,看起来应该是配置 api 地址的地方。
#api:
# enabled: true
# network:
# binding: 127.0.0.1:2900
把注释去掉后尝试把端口改成 2901。
api:
enabled: true
network:
binding: 127.0.0.1:2901
改完后启动 打开浏览器先输入 http://localhost:2900,无法正常响应请求,再输入 http://localhost:2901,可以正常响应,界面和修改配置前访问 http://localhost:2900 的界面一样,说明 API 请求地址成功修改
Elasticsearch 访问
启动日志中显示监听 8000 端口,猜测应该是 elasticsearch 请求地址,打开浏览器输入 http://127.0.0.1:8000/
entry [my_es_entry] listen at: http://0.0.0.0:8000
gateway.yml 中可以看到 my_es_entry 的 network 绑定 8000 端口,显而易见的这部分就是配置代理转发给 elasticsearch 的地址,所以安装后只需要把以前请求 elasticsearch 的地址修改为该地址。
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 10000
network:
binding: 0.0.0.0:8000
总结
我们成功安装和启动极限网关,接下来我们学习怎么根据需求修改配置。
使用极限网关来处置 Elasticsearch 的 Apache Log4j 漏洞
Elasticsearch • medcl 发表了文章 • 2 个评论 • 5220 次浏览 • 2021-12-11 03:57
昨日爆出的 Log4j 安全漏洞,业界一片哗然,今天给大家介绍一下,如何使用极限网关来快速处置 Elasticsearch 的 Apache Log4j 漏洞。
【CVE 地址】
https://github.com/advisories/GHSA-jfh8-c2jp-5v3q
【漏洞描述】
Apache Log4j 是一款非常流行的开源的用于 Java 运行环境的日志记录工具包,大量的 Java 框架包括 Elasticsearch 的最新版本都使用了该组件,故影响范围非常之大。
近日, 随着 Apache Log4j 的远程代码执行最新漏洞细节被公开,攻击者可通过构造恶意请求利用该漏洞实现在目标服务器上执行任意代码。可导致服务器被黑客控制,从而进行页面篡改、数据窃取、挖矿、勒索等行为。建议使用该组件的用户第一时间启动应急响应进行修复。
简单总结一下就是,在使用 Log4j 打印输出的日志中,如果发现日志内容中包含关键词 ${
,那么这个里面包含的内容会当做变量来进行替换和执行,导致攻击者可以通过恶意构造日志内容来让 Java 进程来执行任意命令,达到攻击的效果。
【漏洞等级】:非常紧急
此次漏洞是用于 Log4j2 提供的 lookup 功能造成的,该功能允许开发者通过一些协议去读取相应环境中的配置。但在实现的过程中,并未对输入进行严格的判断,从而造成漏洞的发生。
【影响范围】:Java 类产品:Apache Log4j 2.x < 2.15.0-rc2,Elasticsearch 当前所有版本。
【攻击检测】
可以通过检查日志中是否存在 jndi:ldap://
、jndi:rmi
等字符来发现可能的攻击行为。
处理办法
最简单的办法是通过修改 config/jvm.options
,新增以下参数,重启集群所有节点即可。
-Dlog4j2.formatMsgNoLookups=true
不过,如果集群规模较大,数据较多,业务不能中断,不能通过修改 Elasticsearch 配置、或者替换 Log4j 的最新 jar 包来重启集群的情况,可以考虑使用极限网关来进行拦截或者参数替换甚至是直接阻断请求。
通过在网关层对发往 Elasticsearch 的请求统一进行参数检测,将包含的敏感关键词 ${
进行替换或者直接拒绝,
可以防止带攻击的请求到达 Elasticsearch 服务端而被 Log4j 打印相关日志的时候执行恶意攻击命令,从而避免被攻击。
极限网关是透明代理,只需要在应用端,将以往配置指向 Elasticsearch 的地址替换为现在网关的地址即可,其他都不用动。
参考配置
下载最新的 1.5.0-SNAPSHOT
版本http://release.elasticsearch.cn/gateway/snapshot/
使用极限网关的 context_filter
过滤器,对请求上下文 _ctx.request.to_string
进行关键字检测,过滤掉恶意流量,从而阻断攻击。
新增一个配置文件 gateway.yml
path.data: data
path.logs: log
entry:
- name: es_entrypoint
enabled: true
router: default
max_concurrency: 20000
network:
binding: 0.0.0.0:8000
router:
- name: default
default_flow: main_flow
flow:
- name: main_flow
filter:
- context_filter:
context: _ctx.request.to_string
action: redirect_flow
status: 403
flow: log4j_matched_flow
must_not: # any match will be filtered
regex:
- \$\{.*?\}
- "%24%7B.*?%7D" #urlencode
contain:
- "jndi:"
- "jndi:ldap:"
- "jndi:rmi:"
- "jndi%3A" #urlencode
- "jndi%3Aldap%3A" #urlencode
- "jndi%3Armi%3A" #urlencode
- elasticsearch:
elasticsearch: es-server
- name: log4j_matched_flow
filter:
- echo:
message: 'Apache Log4j 2, Boom!'
elasticsearch:
- name: es-server
enabled: true
endpoints:
- http://localhost:9200
启动网关:
➜ ./bin/gateway -config /tmp/gateway.yml
___ _ _____ __ __ __ _
/ _ \ /_\ /__ \/__\/ / /\ \ \/_\ /\_/\
/ /_\///_\\ / /\/_\ \ \/ \/ //_\\\_ _/
/ /_\\/ _ \/ / //__ \ /\ / _ \/ \
\____/\_/ \_/\/ \__/ \/ \/\_/ \_/\_/
[GATEWAY] A light-weight, powerful and high-performance elasticsearch gateway.
[GATEWAY] 1.0.0_SNAPSHOT, 2021-12-10 23:55:34, 2212aff
[12-11 01:55:49] [INF] [app.go:250] initializing gateway.
[12-11 01:55:49] [INF] [instance.go:26] workspace: /Users/medcl/go/src/infini.sh/gateway/data/gateway/nodes/0
[12-11 01:55:49] [INF] [api.go:261] api listen at: http://0.0.0.0:2900
[12-11 01:55:49] [INF] [reverseproxy.go:253] elasticsearch [es-server] hosts: [] => [localhost:9200]
[12-11 01:55:49] [INF] [entry.go:296] entry [es_entrypoint] listen at: http://0.0.0.0:8000
[12-11 01:55:49] [INF] [module.go:116] all modules started
[12-11 01:55:49] [INF] [app.go:357] gateway is running now.
[12-11 01:55:49] [INF] [actions.go:236] elasticsearch [es-server] is available
将要使用的测试命令 ${java:os}
使用 urlencode 转码为 %24%7Bjava%3Aos%7D
,构造查询语句,分别测试。
不走网关:
~% curl 'http://localhost:9200/index1/_search?q=%24%7Bjava%3Aos%7D'
{"error":{"root_cause":[{"type":"index_not_found_exception","reason":"no such index","resource.type":"index_or_alias","resource.id":"index1","index_uuid":"_na_","index":"index1"}],"type":"index_not_found_exception","reason":"no such index","resource.type":"index_or_alias","resource.id":"index1","index_uuid":"_na_","index":"index1"},"status":404}%
查看 Elasticsearch 端日志为:
[2021-12-11T01:49:50,303][DEBUG][r.suppressed ] path: /index1/_search, params: {q=Mac OS X 10.13.4 unknown, architecture: x86_64-64, index=index1}
org.elasticsearch.index.IndexNotFoundException: no such index
at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver$WildcardExpressionResolver.infe(IndexNameExpressionResolver.java:678) ~[elasticsearch-5.6.15.jar:5.6.15]
at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver$WildcardExpressionResolver.innerResolve(IndexNameExpressionResolver.java:632) ~[elasticsearch-5.6.15.jar:5.6.15]
at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver$WildcardExpressionResolver.resolve(IndexNameExpressionResolver.java:580) ~[elasticsearch-5.6.15.jar:5.6.15]
可以看到查询条件里面的 q=${java:os}
被执行了,变成了 q=Mac OS X 10.13.4 unknown, architecture: x86_64-64, index=index1
,说明变量被解析且执行,存在漏洞利用的风险。
那走网关之后呢:
~% curl 'http://localhost:8000/index1/_search?q=%24%7Bjava%3Aos%7D'
Apache Log4j 2, Boom!%
可以看到请求被过滤掉了,返回了自定义的信息。
还有一些其他测试命令,大家也可以试试:
#{java:vm}
~% curl 'http://localhost:9200/index/_search?q=%24%7Bjava%3Avm%7D'
[2021-12-11T02:36:04,764][DEBUG][r.suppressed ] [INFINI-2.local] path: /index/_search, params: {q=OpenJDK 64-Bit Server VM (build 25.72-b15, mixed mode), index=index}
~% curl 'http://localhost:8000/index/_search?q=%24%7Bjava%3Avm%7D'
Apache Log4j 2, Boom!%
#{jndi:rmi://localhost:1099/api}
~% curl 'http://localhost:9200/index/_search?q=%24%7Bjndi%3Armi%3A%2F%2Flocalhost%3A1099%2Fapi%7D'
2021-12-11 03:35:06,493 elasticsearch[YOmFJsW][search][T#3] ERROR An exception occurred processing Appender console java.lang.SecurityException: attempt to add a Permission to a readonly Permissions object
~% curl 'http://localhost:8000/index/_search?q=%24%7Bjndi%3Armi%3A%2F%2Flocalhost%3A1099%2Fapi%7D'
Apache Log4j 2, Boom!%
另外不同版本的 Elasticsearch 对于攻击的复现程度参差不齐,因为 es 不同版本是否有 Java Security Manager 、不同版本 JDK 、以及默认配置也不相同,新一点的 es 其实同样可以触发恶意请求,只不过网络调用被默认的网络策略给拒绝了,相对安全,当然如果设置不当同样存在风险,见过很多用户一上来就关默认安全配置的,甚至还放开很多暂时用不上的权限,另外未知的攻击方式也一定有,比如大量日志产生的系统调用可能会拖垮机器造成服务不可用,所以要么还是尽快改配置换 log4j 包重启集群,或者走网关来过滤阻断请求吧。
使用极限网关处置类似安全事件的好处是,Elasticsearch 服务器不用做任何变动,尤其是大规模集群的场景,可以节省大量的工作,提升效率,非常灵活,缩短安全处置的时间,降低企业风险。
使用极限网关来进行 Elasticsearch 跨集群跨版本查询及所有其它请求
Elasticsearch • medcl 发表了文章 • 7 个评论 • 3720 次浏览 • 2021-10-16 11:31
使用场景
如果你的业务需要用到有多个集群,并且版本还不一样,是不是管理起来很麻烦,如果能够通过一个 API 来进行查询就方便了,聪明的你相信已经想到了 CCS,没错用 CCS 可以实现跨集群的查询,不过 Elasticsearch 提供的 CCS 对版本有一点的限制,并且需要提前做好 mTLS,也就是需要提前配置好两个集群之间的证书互信,这个免不了要重启维护,好像有点麻烦,那么问题来咯,有没有更好的方案呢?
😁 有办法,今天我就给大家介绍一个基于极限网关的方案,极限网关的网址:http://极限网关.com/。
假设现在有两个集群,一个集群是 v2.4.6,有不少业务数据,舍不得删,里面有很多好东西 :)还有一个集群是 v7.14.0,版本还算比较新,业务正在做的一个新的试点,没什么好东西,但是也得用 :(,现在老板们的的需求是希望通过在一个统一的接口就能访问这些数据,程序员懒得很,懂得都懂。
集群信息
- v2.4.6 集群的访问入口地址:192.168.3.188:9202
- v7.14.0 集群的访问入口地址:192.168.3.188:9206
这两个集群都是 http 协议的。
实现步骤
今天用到的是极限网关的 switch 过滤器:https://极限网关.com/docs/references/filters/switch/
网关下载下来就两个文件,一个主程序,一个配置文件,记得下载对应操作系统的包。
定义两个集群资源
elasticsearch:
- name: v2
enabled: true
endpoint: http://192.168.3.188:9202
- name: v7
enabled: true
endpoint: http://192.168.3.188:9206
上面定义了两个集群,分别命名为 v2
和 v7
,待会会用到这些资源。
定义一个服务入口
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 1000
network:
binding: 0.0.0.0:8000
tls:
enabled: true
这里定义了一个名为 my_es_entry
的资源入口,并引用了一个名为 my_router
的请求转发路由,同时绑定了网卡的 0.0.0.0:8000
也就是所有本地网卡监听 IP 的 8000
端口,访问任意 IP 的 8000
端口就能访问到这个网关了。
另外老板也说了,Elasticsearch 用 HTTP 协议简直就是裸奔,通过这里开启 tls
,可以让网关对外提供的是 HTTPS 协议,这样用户连接的 Elasticsearch 服务就自带 buffer 了,后端的 es 集群和网关直接可以做好网络流量隔离,集群不用动,简直完美。
为什么定义 TLS 不用指定证书,好用的软件不需要这么麻烦,就这样,不解释。
最后,通过设置 max_concurrency
为 1000,限制下并发数,避免野猴子把我们的后端的 Elasticsearch 给压挂了。
定义一个请求路由
router:
- name: my_router
default_flow: cross-cluster-search
这里的名称 my_router
就是表示上面的服务入口的router
参数指定的值。
另外设置一个 default_flow
来将所有的请求都转发给一个名为 cross-cluster-search
的请求处理流程,还没定义,别急,马上。
定义请求处理流程
来啦,来啦,先定义两个 flow,如下,分别名为 v2-flow
和 v7-flow
,每节配置的 filter
定义了一系列过滤器,用来对请求进行处理,这里就用了一个 elasticsearch
过滤器,也就是转发请求给指定的 Elasticsearch 后端服务器,了否?
flow:
- name: v2-flow
filter:
- elasticsearch:
elasticsearch: v2
- name: v7-flow
filter:
- elasticsearch:
elasticsearch: v7
然后,在定义额外一个名为 cross-cluster-search
的 flow,如下:
- name: cross-cluster-search
filter:
- switch:
path_rules:
- prefix: "v2:"
flow: v2-flow
- prefix: "v7:"
flow: v7-flow
这个 flow 就是通过请求的路径的前缀来进行路由的过滤器,如果是 v2:
开头的请求,则转发给 v2-flow
继续处理,如果是 v7:
开头的请求,则转发给 v7-flow
来处理,使用的用法和 CCS 是一样的。so easy!
对了,那是不是每个请求都需要加前缀啊,费事啊,没事,在这个 cross-cluster-search
的 filter 最后再加上一个 elasticsearch
filter,前面前缀匹配不上的都会走它,假设默认都走 v7
,最后完整的 flow 配置如下:
flow:
- name: v2-flow
filter:
- elasticsearch:
elasticsearch: v2
- name: v7-flow
filter:
- elasticsearch:
elasticsearch: v7
- name: cross-cluster-search
filter:
- switch:
path_rules:
- prefix: "v2:"
flow: v2-flow
- prefix: "v7:"
flow: v7-flow
- elasticsearch:
elasticsearch: v7
然后就没有然后了,因为就配置这些就行了。
启动网关
假设配置文件的路径为 sample-configs/cross-cluster-search.yml
,运行如下命令:
➜ gateway git:(master) ✗ ./bin/gateway -config sample-configs/cross-cluster-search.yml
___ _ _____ __ __ __ _
/ _ \ /_\ /__ \/__\/ / /\ \ \/_\ /\_/\
/ /_\///_\\ / /\/_\ \ \/ \/ //_\\\_ _/
/ /_\\/ _ \/ / //__ \ /\ / _ \/ \
\____/\_/ \_/\/ \__/ \/ \/\_/ \_/\_/
[GATEWAY] A light-weight, powerful and high-performance elasticsearch gateway.
[GATEWAY] 1.0.0_SNAPSHOT, 2021-10-15 16:25:56, 3d0a1cd
[10-16 11:00:52] [INF] [app.go:228] initializing gateway.
[10-16 11:00:52] [INF] [instance.go:24] workspace: data/gateway/nodes/0
[10-16 11:00:52] [INF] [api.go:260] api listen at: http://0.0.0.0:2900
[10-16 11:00:52] [INF] [reverseproxy.go:257] elasticsearch [v7] hosts: [] => [192.168.3.188:9206]
[10-16 11:00:52] [INF] [entry.go:225] auto generating cert files
[10-16 11:00:52] [INF] [actions.go:223] elasticsearch [v2] is available
[10-16 11:00:52] [INF] [actions.go:223] elasticsearch [v7] is available
[10-16 11:00:53] [INF] [entry.go:296] entry [my_es_entry] listen at: https://0.0.0.0:8000
[10-16 11:00:53] [INF] [app.go:309] gateway is running now.
可以看到网关输出了启动成功的日志,网关服务监听在 https://0.0.0.0:8000
。
试试访问网关
直接访问网关的 8000 端口,因为是网关自签的证书,加上 -k 来跳过证书的校验,如下:
➜ loadgen git:(master) ✗ curl -k https://localhost:8000
{
"name" : "LENOVO",
"cluster_name" : "es-v7140",
"cluster_uuid" : "npWjpIZmS8iP_p3GK01-xg",
"version" : {
"number" : "7.14.0",
"build_flavor" : "default",
"build_type" : "zip",
"build_hash" : "dd5a0a2acaa2045ff9624f3729fc8a6f40835aa1",
"build_date" : "2021-07-29T20:49:32.864135063Z",
"build_snapshot" : false,
"lucene_version" : "8.9.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
正如前面配置所配置的一样,默认请求访问的就是 v7 集群。
访问 v2 集群
➜ loadgen git:(master) ✗ curl -k https://localhost:8000/v2:/
{
"name" : "Solomon O'Sullivan",
"cluster_name" : "es-v246",
"cluster_uuid" : "cqlpjByvQVWDAv6VvRwPAw",
"version" : {
"number" : "2.4.6",
"build_hash" : "5376dca9f70f3abef96a77f4bb22720ace8240fd",
"build_timestamp" : "2017-07-18T12:17:44Z",
"build_snapshot" : false,
"lucene_version" : "5.5.4"
},
"tagline" : "You Know, for Search"
}
查看集群信息:
➜ loadgen git:(master) ✗ curl -k https://localhost:8000/v2:_cluster/health\?pretty
{
"cluster_name" : "es-v246",
"status" : "yellow",
"timed_out" : false,
"number_of_nodes" : 1,
"number_of_data_nodes" : 1,
"active_primary_shards" : 5,
"active_shards" : 5,
"relocating_shards" : 0,
"initializing_shards" : 0,
"unassigned_shards" : 5,
"delayed_unassigned_shards" : 0,
"number_of_pending_tasks" : 0,
"number_of_in_flight_fetch" : 0,
"task_max_waiting_in_queue_millis" : 0,
"active_shards_percent_as_number" : 50.0
}
插入一条文档:
➜ loadgen git:(master) ✗ curl-json -k https://localhost:8000/v2:medcl/doc/1 -d '{"name":"hello world"}'
{"_index":"medcl","_type":"doc","_id":"1","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"created":true}%
执行一个查询
➜ loadgen git:(master) ✗ curl -k https://localhost:8000/v2:medcl/_search\?q\=name:hello
{"took":78,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":0.19178301,"hits":[{"_index":"medcl","_type":"doc","_id":"1","_score":0.19178301,"_source":{"name":"hello world"}}]}}%
可以看到,所有的请求,不管是集群的操作,还是索引的增删改查都可以,而 Elasticsearch 自带的 CCS 是只读的,只能进行查询。
访问 v7 集群
➜ loadgen git:(master) ✗ curl -k https://localhost:8000/v7:/
{
"name" : "LENOVO",
"cluster_name" : "es-v7140",
"cluster_uuid" : "npWjpIZmS8iP_p3GK01-xg",
"version" : {
"number" : "7.14.0",
"build_flavor" : "default",
"build_type" : "zip",
"build_hash" : "dd5a0a2acaa2045ff9624f3729fc8a6f40835aa1",
"build_date" : "2021-07-29T20:49:32.864135063Z",
"build_snapshot" : false,
"lucene_version" : "8.9.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
Kibana 里面访问
完全没问题,有图有真相:
其他操作也类似,就不重复了。
完整的配置
path.data: data
path.logs: log
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 10000
network:
binding: 0.0.0.0:8000
tls:
enabled: true
flow:
- name: v2-flow
filter:
- elasticsearch:
elasticsearch: v2
- name: v7-flow
filter:
- elasticsearch:
elasticsearch: v7
- name: cross-cluster-search
filter:
- switch:
path_rules:
- prefix: "v2:"
flow: v2-flow
- prefix: "v7:"
flow: v7-flow
- elasticsearch:
elasticsearch: v7
router:
- name: my_router
default_flow: cross-cluster-search
elasticsearch:
- name: v2
enabled: true
endpoint: http://192.168.3.188:9202
- name: v7
enabled: true
endpoint: http://192.168.3.188:9206
小结
好了,今天给大家分享的如何使用极限网关来进行 Elasticsearch 跨集群跨版本的操作就到这里了,希望大家周末玩的开心。😁
求教极限网关的endpoins如何配置多个es节点的ip
回复Elasticsearch • zqc0512 回复了问题 • 2 人关注 • 1 个回复 • 2743 次浏览 • 2021-11-25 11:27
如何防止 Elasticsearch 服务 OOM?
Easysearch • yangmf2040 发表了文章 • 0 个评论 • 1868 次浏览 • 2024-02-26 10:12
Elasticsearch(简称:ES) 和传统关系型数据库有很多区别, 比如传统数据中普遍都有一个叫“最大连接数”的设置。目的是使数据库系统工作在可控的负载下,避免出现负载过高,资源耗尽,谁也无法登录的局面。
那 ES 在这方面有类似参数吗?答案是没有,这也是为何 ES 会被流量打爆的原因之一。
针对大并发访问 ES 服务,造成 ES 节点 OOM,服务中断的情况,极限科技旗下的 INFINI Gateway 产品(以下简称 “极限网关”)可从两个方面入手,保障 ES 服务的可用性。
- 限制最大并发访问连接数。
- 限制非重要索引的请求速度,保障重要业务索引的访问速度。
下面我们来详细聊聊。
架构图
所有访问 ES 的请求都发给网关,可部署多个网关。
限制最大连接数
在网关配置文件中,默认有最大并发连接数限制,默认最大 10000。
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 10000
network:
binding: $[[env.GW_BINDING]]
# See `gateway.disable_reuse_port_by_default` for more information.
reuse_port: true
使用压测程序测试,看看到达10000个连接后,能否限制新的连接。 超过的连接请求,被丢弃。更多信息参考官方文档。
限制索引写入速度
我们先看看不做限制的时候,测试环境的写入速度,在 9w - 15w docs/s 之间波动。虽然峰值很高,但不稳定。 接下来,我们通过网关把写入速度控制在最大 1w docs/s 。 对网关的配置文件 gateway.yml ,做以下修改。
env: # env 下添加
THROTTLE_BULK_INDEXING_MAX_BYTES: 40485760 #40MB/s
THROTTLE_BULK_INDEXING_MAX_REQUESTS: 10000 #10k docs/s
THROTTLE_BULK_INDEXING_ACTION: retry #retry,drop
THROTTLE_BULK_INDEXING_MAX_RETRY_TIMES: 10 #1000
THROTTLE_BULK_INDEXING_RETRY_DELAY_IN_MS: 100 #10
router: # route 部分修改 flow
- name: my_router
default_flow: default_flow
tracing_flow: logging_flow
rules:
- method:
- "*"
pattern:
- "/_bulk"
- "/{any_index}/_bulk"
flow:
- write_flow
flow: #flow 部分增加下面两段
- name: write_flow
filter:
- flow:
flows:
- bulking_indexing_limit
- elasticsearch:
elasticsearch: prod
max_connection_per_node: 1000
- name: bulking_indexing_limit
filter:
- bulk_request_throttle:
indices:
"test-index":
max_bytes: $[[env.THROTTLE_BULK_INDEXING_MAX_BYTES]]
max_requests: $[[env.THROTTLE_BULK_INDEXING_MAX_REQUESTS]]
action: $[[env.THROTTLE_BULK_INDEXING_ACTION]]
retry_delay_in_ms: $[[env.THROTTLE_BULK_INDEXING_RETRY_DELAY_IN_MS]]
max_retry_times: $[[env.THROTTLE_BULK_INDEXING_MAX_RETRY_TIMES]]
message: "bulk writing too fast" #触发限流告警message自定义
log_warn_message: true
再次压测,test-index 索引写入速度被限制在了 1w docs/s 。
限制多个索引写入速度
上面的配置是针对 test-index 索引的写入速度控制。如果想添加其他的索引,新增一段配置即可。 比如,我允许 abc 索引写入达到 2w docs/s,test-index 索引最多不超过 1w docs/s ,可配置如下。
- name: bulking_indexing_limit
filter:
- bulk_request_throttle:
indices:
"abc":
max_requests: 20000
action: drop
message: "abc doc写入超阈值" #触发限流告警message自定义
log_warn_message: true
"test-index":
max_bytes: $[[env.THROTTLE_BULK_INDEXING_MAX_BYTES]]
max_requests: $[[env.THROTTLE_BULK_INDEXING_MAX_REQUESTS]]
action: $[[env.THROTTLE_BULK_INDEXING_ACTION]]
retry_delay_in_ms: $[[env.THROTTLE_BULK_INDEXING_RETRY_DELAY_IN_MS]]
max_retry_times: $[[env.THROTTLE_BULK_INDEXING_MAX_RETRY_TIMES]]
message: "bulk writing too fast" #触发限流告警message自定义
log_warn_message: true
限速效果如下
限制读请求速度
我们先看看不做限制的时候,测试环境的读取速度,7w qps 。 接下来我们通过网关把读取速度控制在最大 1w qps 。 继续对网关的配置文件 gateway.yml 做以下修改。
- name: default_flow
filter:
- request_path_limiter:
message: "Hey, You just reached our request limit!" rules:
- pattern: "/(?P<index_name>abc)/_search"
max_qps: 10000
group: index_name
- elasticsearch:
elasticsearch: prod
max_connection_per_node: 1000
再次进行测试,读取速度被限制在了 1w qps 。
限制多个索引读取速度
上面的配置是针对 abc 索引的写入速度控制。如果想添加其他的索引,新增一段配置即可。 比如,我允许 abc 索引读取达到 1w qps,test-index 索引最多不超过 2w qps ,可配置如下。
- name: default_flow
filter:
- request_path_limiter:
message: "Hey, You just reached our request limit!"
rules:
- pattern: "/(?P<index_name>abc)/_search"
max_qps: 10000
group: index_name
- pattern: "/(?P<index_name>test-index)/_search"
max_qps: 20000
group: index_name
- elasticsearch:
elasticsearch: prod
max_connection_per_node: 1000
多个网关限速
限速是每个网关自身的控制,如果有多个网关,那么后端 ES 集群收到的请求数等于多个网关限速的总和。 本次介绍就到这里了。相信大家在使用 ES 的过程中也遇到过各种各样的问题。欢迎大家来我们这个平台分享自己的问题、解决方案等。如有任何问题,请随时联系我,期待与您交流!
开启安全功能 ES 集群就安全了吗?
Easysearch • yangmf2040 发表了文章 • 0 个评论 • 2917 次浏览 • 2023-12-27 10:38
背景
经常跟 ES 打交道的朋友都知道,现在主流的 ES 集群安全方案是:RBAC + TLS for Internal + HTTPS 。
作为终端用户一般只需要关心用户名和密码就行了。作为管理和运维 ES 的人员来说,可能希望 ES 能提供密码策略来强制密码强度和密码使用周期。遗憾的是 ES 对密码强度和密码使用周期没有任何强制要求。如果不注意,可能我们天天都在使用“弱密码”或从不修改的密码(直到无法登录)。而且 ES 对连续的认证失败,不会做任何处理,这让 ES 很容易遭受暴力破解的入侵。
那还有没有别的办法,进一步提高安全呢? 其实,网关可以来帮忙。
虽然网关无法强制提高密码复杂度,但可以提高 ES 集群被暴力破解的难度。
大家都知道,暴力破解--本质就是不停的“猜”你的密码。以现在的 CPU 算力,一秒钟“猜”个几千上万次不过是洒洒水,而且 CPU 监控都不带波动的,很难发现异常。从这里入手,一方面,网关可以延长认证失败的过程--延迟返回结果,让破解不再暴力。另一方面,网关可以记录认证失败的情况,做到实时监控,有条件的告警。一旦出现苗头,可以使用网关阻断该 IP 或用户发来的任何请求。
场景模拟
首先,用网关代理 ES 集群,并在 default_flow 中增加一段 response_status_filter 过滤器配置,对返回码是 401 的请求,跳转到 rate_limit_flow 进行降速,延迟 5 秒返回。
- name: default_flow
filter:
- elasticsearch:
elasticsearch: prod
max_connection_per_node: 1000
- response_status_filter:
exclude:
- 401
action: redirect_flow
flow: rate_limit_flow
- name: rate_limit_flow
filter:
- sleep:
sleep_in_million_seconds: 5000
其次,对于失败的认证,我们可以通过 Console 来做个看板实时分析,展示。
折线图、饼图图、柱状图等,多种展示方式,大家可充分发挥。
最后,可在 Console 的告警中心,配置对应的告警规则,实时监控该类事件,方便及时跟进处置。
效果测试
先带上正确的用户名密码测试,看看返回速度。
0.011 秒返回。再使用错误的密码测试。
整整 5 秒多后,才回返结果。如果要暴力破解,每 5 秒钟甚至更久才尝试一个密码,这还叫暴力吗?
看板示例
此处仅仅是抛砖引玉,欢迎大家发挥想象。
告警示例
建立告警规则,用户 1 分钟内超过 3 次登录失败,就产生告警。
可在告警中心查看详情,也可将告警推送至微信、钉钉、飞书、邮件等。
查看告警详情,是 es 用户触发了告警。
最后,剩下的工作就是对该账号的处置了。如果有需要可以考虑阻止该用户或 IP 的请求,对应的过滤器文档在这里,老规矩加到 default_flow 里就行了。
如果小伙伴有其他办法提升 ES 集群安全,欢迎和我们一起讨论、交流。我们的宗旨是:让搜索更简单!
用极限网关实现ES容灾,简单!
Elasticsearch • yangmf2040 发表了文章 • 0 个评论 • 2021 次浏览 • 2023-07-20 10:33
身为 IT 人士,大伙身边的各种系统肯定不少吧。系统虽多,但最最最重要的那套、那几套,大伙肯定是捧在手心,关怀备至。如此重要的系统,万一发生故障了且短期无法恢复,该如何保障业务持续运行? 有过这方面思考或经验的同学,肯定脱口而出--切灾备啊。 是的,接下来我来介绍下我们的 ES 灾备方案。当然如果你有更好的,请使用各种可用的渠道联系我们。
总体设计
通过极限网关将应用对主集群的写操作,复制到灾备集群。应用发送的读请求则直接转发到主集群,并将响应结果转发给应用。应用对网关无感知,访问方式与访问 ES 集群一样。
方案优势
- 轻量级
极限网关使用 Golang 编写,安装包很小,只有 10MB 左右,没有任何外部环境依赖,部署安装都非常简单,只需要下载对应平台的二进制可执行文件,启动网关程序的二进制程序文件执行即可。
- 跨版本支持
极限网关针对不同的 Elasticsearch 版本做了兼容和针对性处理,能够让业务代码无缝的进行适配,后端 Elasticsearch 集群版本升级能够做到无缝过渡,降低版本升级和数据迁移的复杂度。
- 高可用
极限网关内置多种高可用解决方案,前端请求入口支持基于虚拟 IP 的双机热备,后端集群支持集群拓扑的自动感知,节点上下线能自动发现,自动处理后端故障,自动进行请求的重试和迁移。
- 灵活性
主备集群都是可读可写,切换迅速,只需切换网关到另一套配置即可。回切灵活,恢复使用原配置即可。
架构图
网关程序部署
下载
根据操作系统和平台选择下面相应的安装包: 解压到指定目录:
mkdir gateway
tar -zxf xxx.gz -C gateway
修改网关配置
在此 下载 网关配置,默认网关会加载配置文件 gateway.yml ,如果要指定其他配置文件使用 -config 选项指定。 网关配置文件内容较多,下面展示必要部分。
#primary
PRIMARY_ENDPOINT: http://192.168.56.3:7171
PRIMARY_USERNAME: elastic
PRIMARY_PASSWORD: password
PRIMARY_MAX_QPS_PER_NODE: 10000
PRIMARY_MAX_BYTES_PER_NODE: 104857600 #100MB/s
PRIMARY_MAX_CONNECTION_PER_NODE: 200
PRIMARY_DISCOVERY_ENABLED: false
PRIMARY_DISCOVERY_REFRESH_ENABLED: false
#backup
BACKUP_ENDPOINT: http://192.168.56.3:9200
BACKUP_USERNAME: admin
BACKUP_PASSWORD: admin
BACKUP_MAX_QPS_PER_NODE: 10000
BACKUP_MAX_BYTES_PER_NODE: 104857600 #100MB/s
BACKUP_MAX_CONNECTION_PER_NODE: 200
BACKUP_DISCOVERY_ENABLED: false
BACKUP_DISCOVERY_REFRESH_ENABLED: false
PRIMARY_ENDPOINT:配置主集群地址和端口
PRIMARY_USERNAME、PRIMARY_PASSWORD: 访问主集群的用户信息
BACKUP_ENDPOINT:配置备集群地址和端口
BACKUP_USERNAME、BACKUP_PASSWORD: 访问备集群的用户信息
运行网关
前台运行 直接运行网关程序即可启动极限网关了,如下:
./gateway-linux-amd64
后台运行
./gateway-linux-amd64 -service install
Success
./gateway-linux-amd64 -service start
Success
卸载服务
./gateway-linux-amd64 -service stop
Success
./gateway-linux-amd64 -service uninstall
Success
灾备功能测试
在灾备场景下,为保证数据一致性,对集群的访问操作都通过网关进行。注意只有 bulk API 的操作才会被复制到备集群。 在此次测试中,网关灾备配置功能为:
- 主备集群正常时
读写请求正常执行; 写请求被记录到队列,备集群实时消费队列数据。
- 当主集群故障时
写入请求报错,主备集群都不写入数据; 查询请求转到备集群执行,并返回结果给客户端。
- 当备集群故障时
读写请求都正常执行; 写操作记录到磁盘队列,待备集群恢复后,自动消费队列数据直到两个集群一致。
主备集群正常时写入、查询测试
写入数据
# 通过网关写入数据
curl -X POST "localhost:18000/_bulk?pretty" -H 'Content-Type: application/json' -uelastic:password -d'
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "create" : { "_index" : "test", "_id" : "2" } }
{ "field2" : "value2" }
'
查询数据
# 查询主集群
curl 192.168.56.3:7171/test/_search?pretty -uelastic:password
# 查询备集群
curl 192.168.56.3:9200/test/_search?pretty -uadmin:admin
# 查询网关,网关转发给主集群执行
curl 192.168.56.3:18000/test/_search?pretty -uelastic:password
主备集群都已写入数据,且数据一致。通过网关查询,也正常返回。
删除和更新文档
# 通过网关删除和更新文档
curl -X POST "192.168.56.3:18000/_bulk?pretty" -H 'Content-Type: application/json' -uelastic:password -d'
{ "delete" : { "_index" : "test", "_id" : "1" } }
{ "update" : {"_id" : "2", "_index" : "test"} }
{ "doc" : {"field2" : "value2-updated"} }
'
查询数据
# 查询主集群
curl 192.168.56.3:7171/test/_search?pretty -uelastic:password
# 查询备集群
curl 192.168.56.3:9200/test/_search?pretty -uadmin:admin
两个集群都已执行删除和更新操作,数据一致。
主集群故障时写入、查询测试
为模拟主集群故障,直接关闭主集群。
写入数据
# 通过网关写入数据
curl -X POST "192.168.56.3:18000/_bulk?pretty" -H 'Content-Type: application/json' -uelastic:password -d'
{ "index" : { "_index" : "test", "_id" : "3" } }
{ "field3" : "value3" }
{ "create" : { "_index" : "test", "_id" : "4" } }
{ "field4" : "value4" }
'
写入数据报错
查询数据
# 通过网关查询,因为主集群不可用,网关将查询转发到备集群执行
curl 192.168.56.3:18000/test/_search?pretty -uelastic:password
正常查询到数据,说明请求被转发到了备集群执行。
备集群故障时写入、查询测试
为模拟备集群故障,直接关闭备集群。
写入数据
# 通过网关写入数据
curl -X POST "192.168.56.3:18000/_bulk?pretty" -H 'Content-Type: application/json' -uelastic:password -d'
{ "index" : { "_index" : "test", "_id" : "5" } }
{ "field5" : "value5" }
{ "create" : { "_index" : "test", "_id" : "6" } }
{ "field6" : "value6" }
'
数据正常写入。
查询数据
# 通过网关查询
curl 192.168.56.3:18000/test/_search?pretty -uelastic:password
查询成功返回。主集群成功写入了两条新数据。同时此数据会被记录到备集群的队列中,待备集群恢复后,会消费此队列追数据。
恢复备集群
启动备集群。
查询数据
等待片刻或通过 INFINI Console 确定网关队列消费完毕后,查询备集群的数据。 (生产和消费 offset 相同,说明消费完毕。)
# 查询备集群的数据
curl 192.168.56.3:9200/test/_search?pretty -uadmin:admin
备集群启动后自动消费队列数据,消费完后备集群数据达到与主集群数据一致。
灾备切换
测试了这么多,终于到切换的时刻了。切换前我们判断下主系统是否短期无法修复。
如果我们判断主用系统无法短时间恢复,要执行切换。非常简单,我们直接将配置文件中定义的主备集群互换,然后重启网关程序就行了。但我们推荐在相同主机上另部署一套网关程序--网关B,先前那套用网关A指代。网关B中的配置文件把原备集群定义为主集群,原主集群定义为备集群。若要执行切换,我们先停止网关A,然后启动网关B,此时应用连接到网关(端口不变),就把原备系统当作主系统使用,把原主系统当作备系统,也就完成了主备系统的切换。
灾备回切
当原主集群修复后,正常启动,就会从消费队列追写修复期间产生数据直到主备数据一致,同样我们可通过 INFINI Console 查看消费的进度。如果大家还是担心数据的一致性,INFINI Console 还能帮大家做[校验数据]()任务,做到数据完全一致后(文档数量及文档内容一致),才进行回切。
回切也非常简单,停止网关B,启动网关A即可。
网关高可用
网关自带浮动 IP 模块,可进行双机热备。客户端通过 VIP 连接网关,网关出现故障时,VIP 漂移到备网关。 视频教程戳这里。
这样的优点是简单,不足是只有一个网关在线提供服务。如果想多个网关在线提供服务,则需搭配分布式消息系统一起工作,架构如下。
前端通过负载均衡将流量分散到多个在线网关,网关将消息存入分布式消息系统。此时,网关可看作无状态应用,可根据需要扩缩规模。
以上就是我介绍的ES灾备方案,是不是相当灵活了。有问题还是那句话 Call me 。
关于极限网关
INFINI Gateway 是一个面向搜索场景的高性能数据网关,所有请求都经过网关处理后再转发到后端的搜索业务集群。基于 INFINI Gateway,可以实现索引级别的限速限流、常见查询的缓存加速、查询请求的审计、查询结果的动态修改等等。
在 Kibana 里统一访问来自不同集群的索引
资料分享 • medcl 发表了文章 • 0 个评论 • 1965 次浏览 • 2022-04-21 15:29
在 Kibana 里统一访问来自不同集群的索引
现在有这么一个需求,客户根据需要将数据按照业务维度划分,将索引分别存放在了不同的三个集群, 将一个大集群拆分成多个小集群有很多好处,比如降低了耦合,带来了集群可用性和稳定性方面的好处,也避免了单个业务的热点访问造成其他业务的影响, 尽管拆分集群是很常见的玩法,但是管理起来不是那么方便了,尤其是在查询的时候,可能要分别访问三套集群各自的 API,甚至要切换三套不同的 Kibana 来访问集群的数据, 那么有没有办法将他们无缝的联合在一起呢?
极限网关!
答案自然是有的,通过将 Kibana 访问 Elasticsearch 的地址切换为极限网关的地址,我们可以将请求按照索引来进行智能的路由, 也就是当访问不同的业务索引时会智能的路由到不同的集群,如下图:
上图,我们分别有 3 个不同的索引:
- apm-*
- erp-*
- mall-*
分别对应不同的三套 Elasticsearch 集群:
- ES1-APM
- ES2-ERP
- ES3-MALL
接下来我们来看如何在极限网关里面进行相应的配置来满足这个业务需求。
配置集群信息
首先配置 3 个集群的连接信息。
elasticsearch:
- name: es1-apm
enabled: true
endpoints:
- http://192.168.3.188:9206
- name: es2-erp
enabled: true
endpoints:
- http://192.168.3.188:9207
- name: es3-mall
enabled: true
endpoints:
- http://192.168.3.188:9208
配置服务 Flow
然后,我们定义 3 个 Flow,分别对应用来访问 3 个不同的 Elasticsearch 集群,如下:
flow:
- name: es1-flow
filter:
- elasticsearch:
elasticsearch: es1-apm
- name: es2-flow
filter:
- elasticsearch:
elasticsearch: es2-erp
- name: es3-flow
filter:
- elasticsearch:
elasticsearch: es3-mall
然后再定义一个 flow 用来进行路径的判断和转发,如下:
- name: default-flow
filter:
- switch:
remove_prefix: false
path_rules:
- prefix: apm-
flow: es1-flow
- prefix: erp-
flow: es2-flow
- prefix: mall-
flow: es3-flow
- flow: #default flow
flows:
- es1-flow
根据请求路径里面的索引前缀来匹配不同的索引,并转发到不同的 Flow。
配置路由信息
接下来,我们定义路由信息,具体配置如下:
router:
- name: my_router
default_flow: default-flow
指向上面定义的默认 flow 来统一请求的处理。
定义服务及关联路由
最后,我们定义一个监听为 8000 端口的服务,用来提供给 Kibana 来进行统一的入口访问,如下:
entry:
- name: es_entry
enabled: true
router: my_router
max_concurrency: 10000
network:
binding: 0.0.0.0:8000
完整配置
最后的完整配置如下:
path.data: data
path.logs: log
entry:
- name: es_entry
enabled: true
router: my_router
max_concurrency: 10000
network:
binding: 0.0.0.0:8000
flow:
- name: default-flow
filter:
- switch:
remove_prefix: false
path_rules:
- prefix: apm-
flow: es1-flow
- prefix: erp-
flow: es2-flow
- prefix: mall-
flow: es3-flow
- flow: #default flow
flows:
- es1-flow
- name: es1-flow
filter:
- elasticsearch:
elasticsearch: es1-apm
- name: es2-flow
filter:
- elasticsearch:
elasticsearch: es2-erp
- name: es3-flow
filter:
- elasticsearch:
elasticsearch: es3-mall
router:
- name: my_router
default_flow: default-flow
elasticsearch:
- name: es1-apm
enabled: true
endpoints:
- http://192.168.3.188:9206
- name: es2-erp
enabled: true
endpoints:
- http://192.168.3.188:9207
- name: es3-mall
enabled: true
endpoints:
- http://192.168.3.188:9208
启动网关
直接启动网关,如下:
➜ gateway git:(master) ✗ ./bin/gateway -config sample-configs/elasticsearch-route-by-index.yml
___ _ _____ __ __ __ _
/ _ \ /_\ /__ \/__\/ / /\ \ \/_\ /\_/\
/ /_\///_\\ / /\/_\ \ \/ \/ //_\\\_ _/
/ /_\\/ _ \/ / //__ \ /\ / _ \/ \
\____/\_/ \_/\/ \__/ \/ \/\_/ \_/\_/
[GATEWAY] A light-weight, powerful and high-performance elasticsearch gateway.
[GATEWAY] 1.0.0_SNAPSHOT, 2022-04-20 08:23:56, 2023-12-31 10:10:10, 51650a5c3d6aaa436f3c8a8828ea74894c3524b9
[04-21 13:41:21] [INF] [app.go:174] initializing gateway.
[04-21 13:41:21] [INF] [app.go:175] using config: /Users/medcl/go/src/infini.sh/gateway/sample-configs/elasticsearch-route-by-index.yml.
[04-21 13:41:21] [INF] [instance.go:72] workspace: /Users/medcl/go/src/infini.sh/gateway/data/gateway/nodes/c9bpg0ai4h931o4ngs3g
[04-21 13:41:21] [INF] [app.go:283] gateway is up and running now.
[04-21 13:41:21] [INF] [api.go:262] api listen at: http://0.0.0.0:2900
[04-21 13:41:21] [INF] [reverseproxy.go:255] elasticsearch [es1-apm] hosts: [] => [192.168.3.188:9206]
[04-21 13:41:21] [INF] [reverseproxy.go:255] elasticsearch [es2-erp] hosts: [] => [192.168.3.188:9207]
[04-21 13:41:21] [INF] [reverseproxy.go:255] elasticsearch [es3-mall] hosts: [] => [192.168.3.188:9208]
[04-21 13:41:21] [INF] [actions.go:349] elasticsearch [es2-erp] is available
[04-21 13:41:21] [INF] [actions.go:349] elasticsearch [es1-apm] is available
[04-21 13:41:21] [INF] [entry.go:312] entry [es_entry] listen at: http://0.0.0.0:8000
[04-21 13:41:21] [INF] [module.go:116] all modules are started
[04-21 13:41:21] [INF] [actions.go:349] elasticsearch [es3-mall] is available
[04-21 13:41:55] [INF] [reverseproxy.go:255] elasticsearch [es1-apm] hosts: [] => [192.168.3.188:9206]
网关启动成功之后,就可以通过网关的 IP+8000 端口来访问目标 Elasticsearch 集群了。
测试访问
首先通过 API 来访问测试一下,如下:
➜ ~ curl http://localhost:8000/apm-2022/_search -v
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8000 (#0)
> GET /apm-2022/_search HTTP/1.1
> Host: localhost:8000
> User-Agent: curl/7.54.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Date: Thu, 21 Apr 2022 05:45:44 GMT
< content-type: application/json; charset=UTF-8
< Content-Length: 162
< X-elastic-product: Elasticsearch
< X-Backend-Cluster: es1-apm
< X-Backend-Server: 192.168.3.188:9206
< X-Filters: filters->elasticsearch
<
* Connection #0 to host localhost left intact
{"took":142,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":0,"relation":"eq"},"max_score":null,"hits":[]}}%
可以看到 apm-2022 指向了后端的 es1-apm
集群。
继续测试,erp 索引的访问,如下:
➜ ~ curl http://localhost:8000/erp-2022/_search -v
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8000 (#0)
> GET /erp-2022/_search HTTP/1.1
> Host: localhost:8000
> User-Agent: curl/7.54.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Date: Thu, 21 Apr 2022 06:24:46 GMT
< content-type: application/json; charset=UTF-8
< Content-Length: 161
< X-Backend-Cluster: es2-erp
< X-Backend-Server: 192.168.3.188:9207
< X-Filters: filters->switch->filters->elasticsearch->skipped
<
* Connection #0 to host localhost left intact
{"took":12,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":0,"relation":"eq"},"max_score":null,"hits":[]}}%
继续测试,mall 索引的访问,如下:
➜ ~ curl http://localhost:8000/mall-2022/_search -v
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8000 (#0)
> GET /mall-2022/_search HTTP/1.1
> Host: localhost:8000
> User-Agent: curl/7.54.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Date: Thu, 21 Apr 2022 06:25:08 GMT
< content-type: application/json; charset=UTF-8
< Content-Length: 134
< X-Backend-Cluster: es3-mall
< X-Backend-Server: 192.168.3.188:9208
< X-Filters: filters->switch->filters->elasticsearch->skipped
<
* Connection #0 to host localhost left intact
{"took":8,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":0,"max_score":null,"hits":[]}}%
完美转发。
修改 Kibana 配置
修改 Kibana 的配置文件: kibana.yml
,替换 Elasticsearch 的地址为网关地址(http://192.168.3.200:8000
),如下:
elasticsearch.hosts: ["http://192.168.3.200:8000"]
重启 Kibana 让配置生效。
效果如下
可以看到,在一个 Kibana 的开发者工具里面,我们已经可以像操作一个集群一样来同时读写实际上来自三个不同集群的索引数据了。
展望
通过极限网关,我们还可以非常灵活的进行在线请求的流量编辑,动态组合不同集群的操作。
在极限网关里面使用 JavaScript 脚本来进行复杂的查询改写
资料分享 • medcl 发表了文章 • 1 个评论 • 1192 次浏览 • 2022-04-19 11:55
使用 JavaScript 脚本来进行复杂的查询改写
有这么一个需求:
网关里怎样对跨集群搜索进行支持的呢?我想实现: 输入的搜索请求是 lp:9200/index1/_search 这个索引在3个集群上,需要跨集群检索,也就是网关能否改成 lp:9200/cluster01:index1,cluster02,index1,cluster03:index1/_search 呢? 索引有一百多个,名称不一定是 app, 还可能多个索引一起的。
极限网关自带的过滤器 content_regex_replace
虽然可以实现字符正则替换,但是这个需求是带参数的变量替换,稍微复杂一点,没有办法直接用这个正则替换实现,有什么其他办法实现么?
使用脚本过滤器
当然有的,上面的这个需求,理论上我们只需要将其中的索引 index1
匹配之后,替换为 cluster01:index1,cluster02,index1,cluster03:index1
就行了。
答案就是使用自定义脚本来做,再复杂的业务逻辑都不是问题,都能通过自定义脚本来实现,一行脚本不行,那就两行。
使用极限网关提供的 JavaScript 过滤器可以很灵活的实现这个功能,具体继续看。
定义过滤器
首先创建一个脚本文件,放在网关数据目录的 scripts
子目录下面,如下:
➜ gateway ✗ tree data
data
└── gateway
└── nodes
└── c9bpg0ai4h931o4ngs3g
├── kvdb
├── queue
├── scripts
│ └── index_path_rewrite.js
└── stats
这个脚本的内容如下:
function process(context) {
var originalPath = context.Get("_ctx.request.path");
var matches = originalPath.match(/\/?(.*?)\/_search/)
var indexNames = [];
if(matches && matches.length > 1) {
indexNames = matches[1].split(",")
}
var resultNames = []
var clusterNames = ["cluster01", "cluster02"]
if(indexNames.length > 0) {
for(var i=0; i<indexNames.length; i++){
if(indexNames[i].length > 0) {
for(var j=0; j<clusterNames.length; j++){
resultNames.push(clusterNames[j]+":"+indexNames[i])
}
}
}
}
if (resultNames.length>0){
var newPath="/"+resultNames.join(",")+"/_search";
context.Put("_ctx.request.path",newPath);
}
}
和普通的 JavaScript 一样,定义一个特定的函数 process
来处理请求里面的上下文信息,_ctx.request.path
是网关内置上下文的一个变量,用来获取请求的路径,通过 context.Get("_ctx.request.path")
在脚本里面进行访问。
中间我们使用了 JavaScript 的正则匹配和字符处理,做了一些字符拼接,得到新的路径 newPath
变量,最后使用 context.Put("_ctx.request.path",newPath)
更新网关请求的路径信息,从而实现查询条件里面的参数替换。
有关网关内置上下文的变量列表,请访问 Request Context
接下来,创建一个网关配置,并使用 javascript
过滤器调用该脚本,如下:
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 10000
network:
binding: 0.0.0.0:8000
flow:
- name: default_flow
filter:
- dump:
context:
- _ctx.request.path
- javascript:
file: index_path_rewrite.js
- dump:
context:
- _ctx.request.path
- elasticsearch:
elasticsearch: dev
router:
- name: my_router
default_flow: default_flow
elasticsearch:
- name: dev
enabled: true
schema: http
hosts:
- 192.168.3.188:9206
上面的例子中,使用了一个 javascript
过滤器,并且指定了加载的脚本文件为 index_path_rewrite.js
,并使用了两个 dump
过滤器来输出脚本运行前后的路径信息,最后再使用一个 elasticsearch
过滤器来转发请求给 Elasticsearch 进行查询。
我们启动网关测试一下,如下:
➜ gateway ✗ ./bin/gateway
___ _ _____ __ __ __ _
/ _ \ /_\ /__ \/__\/ / /\ \ \/_\ /\_/\
/ /_\///_\\ / /\/_\ \ \/ \/ //_\\\_ _/
/ /_\\/ _ \/ / //__ \ /\ / _ \/ \
\____/\_/ \_/\/ \__/ \/ \/\_/ \_/\_/
[GATEWAY] A light-weight, powerful and high-performance elasticsearch gateway.
[GATEWAY] 1.0.0_SNAPSHOT, 2022-04-18 07:11:09, 2023-12-31 10:10:10, 8062c4bc6e57a3fefcce71c0628d2d4141e46953
[04-19 11:41:29] [INF] [app.go:174] initializing gateway.
[04-19 11:41:29] [INF] [app.go:175] using config: /Users/medcl/go/src/infini.sh/gateway/gateway.yml.
[04-19 11:41:29] [INF] [instance.go:72] workspace: /Users/medcl/go/src/infini.sh/gateway/data/gateway/nodes/c9bpg0ai4h931o4ngs3g
[04-19 11:41:29] [INF] [app.go:283] gateway is up and running now.
[04-19 11:41:30] [INF] [api.go:262] api listen at: http://0.0.0.0:2900
[04-19 11:41:30] [INF] [entry.go:312] entry [my_es_entry] listen at: http://0.0.0.0:8000
[04-19 11:41:30] [INF] [module.go:116] all modules are started
[04-19 11:41:30] [INF] [actions.go:349] elasticsearch [dev] is available
运行下面的查询来验证查询结果,如下:
curl localhost:8000/abc,efg/_search
可以看到网关通过 dump
过滤器输出的调试信息:
---- DUMPING CONTEXT ----
_ctx.request.path : /abc,efg/_search
---- DUMPING CONTEXT ----
_ctx.request.path : /cluster01:abc,cluster02:abc,cluster01:efg,cluster02:efg/_search
查询条件按照我们的需求进行了改写,Nice!
重写 DSL 查询语句
好吧,我们刚刚只是修改了查询的索引而已,那么查询请求的 DSL 呢?行不行?
那自然是可以的嘛,瞧下面的例子:
function process(context) {
var originalDSL = context.Get("_ctx.request.body");
if (originalDSL.length >0){
var jsonObj=JSON.parse(originalDSL);
jsonObj.size=123;
jsonObj.aggs= {
"test1": {
"terms": {
"field": "abc",
"size": 10
}
}
}
context.Put("_ctx.request.body",JSON.stringify(jsonObj));
}
}
先是获取查询请求,然后转换成 JSON 对象,之后任意修改查询对象就行了,保存回去,搞掂。
测试一下:
curl -XPOST localhost:8000/abc,efg/_search -d'{"query":{}}'
输出:
---- DUMPING CONTEXT ----
_ctx.request.path : /abc,efg/_search
_ctx.request.body : {"query":{}}
[04-19 18:14:24] [INF] [reverseproxy.go:255] elasticsearch [dev] hosts: [] => [192.168.3.188:9206]
---- DUMPING CONTEXT ----
_ctx.request.path : /abc,efg/_search
_ctx.request.body : {"query":{},"size":123,"aggs":{"test1":{"terms":{"field":"abc","size":10}}}}
是不是感觉解锁了新的世界?
结论
通过使用 Javascript 脚本过滤器,我们可以非常灵活的进行复杂逻辑的操作来满足我们的业务需求。
极限网关初探(2)配置
Elasticsearch • xushuhui 发表了文章 • 0 个评论 • 1802 次浏览 • 2022-04-06 17:03
配置
上一篇我们先学习了极限网关的安装和启动,今天学习配置。
读写分离
现在我们遇到读写分离的需求,用网关该怎么做呢? 假设服务端现在从 http://127.0.0.1:8000 写入数据,从 http://127.0.0.1:9000 读取数据,怎么设计呢?
首先查看文档配置文档
我们在 gateway.yml 中定义两个 entry,分别绑定不同的端口,配置不同的 router
entry:
- name: write_es
enabled: true
router: write_router
network:
binding: 0.0.0.0:8000
- name: read_es
enabled: true
router: read_router
network:
binding: 0.0.0.0:9000
router:
- name: write_router
default_flow: default_flow
tracing_flow: logging
- name: read_router
default_flow: default_flow
tracing_flow: logging
为了演示效果,只配置一个 Elasticsearch
elasticsearch:
- name: dev
enabled: true
schema: http
hosts:
- 192.168.3.188:9206
启动项目
我们从 http://127.0.0.1:8000 写入一条数据,再从 http://127.0.0.1:9000 读取该条数据
添加接口
返回字符串
我们想自定义添加一个接口,怎么在不写代码的情况下通过配置实现返回字符串
flow:
- name: hello_flow
filter:
- echo:
message: "hello flow"
router:
- name: read_router
default_flow: hello_flow
修改配置后启动
返回 json 数据
返回字符串不符合标准的 restful 接口规范,怎么返回给调用方标准 json 数据?
filter:
- set_response:
content_type: application/json
body: '{"message":"hello world"}'
修改配置后启动
修改路由
我们已经新加了接口,返回 json 数据,但是接口是直接定义在 http://127.0.0.1:9000 中,之前网关的接口就无法使用,所以我们需要单独为自定义的接口指定单独的路由
router:
- name: read_router
default_flow: default_flow
tracing_flow: logging
rules:
- method:
- GET
pattern:
- "/hello"
flow:
- hello_flow
default_flow: 默认的处理流,也就是业务处理的主流程,请求转发、过滤、缓存等操作都在这里面进行
tracing_flow:用于追踪请求状态的流,用于记录请求日志、统计等
如果我们有过开发经验,了解 MVC 模式,flow 就类似 MVC 中的 Controller,rules 中类似路由规则,当请求匹配到配置中的路由规则时,由配置的 flow 处理业务逻辑。
数据整体流向,从服务端发到网关,网关为每个 Elasticsearch 绑定不同的 IP 地址,每个 Elasticsearch 都有唯一一个 router 和它对应,根据请求的 method 和 path 匹配到 router 中的一个 flow,flow 中包含多个 filter 处理对数据进行流式处理。
如下图所示
流式处理是什么,假设水从一个管子里面流出来,管子旁边每一段依次站了几个人,第一个人往水里放点鱼,鱼和水到了第二个人,第二个人往水里放点草,鱼、水和草到了第三人等等,每个人对水做一定的操作,水经过这些操作后最后到达水池里。
我们可以把数据当成水,filter 是管子旁边的人,水池就是 Elasticsearch
总结
在学习了router/flow/filter后,我们已经对极限网关的配置有了初步的了解,后续开发的时候直接查阅文档。
极限网关初探(1) 安装启动
Elasticsearch • xushuhui 发表了文章 • 0 个评论 • 1806 次浏览 • 2022-04-06 16:54
产品介绍
极限网关(INFINI Gateway)是一个面向 Elasticsearch 的高性能应用网关。特性丰富,使用简单。
它和其他业务型网关最大的区别是业务网关把请求转发给各个底层微服务,而它把请求转发给 Elasticsearch,更多是类似 Mycat 的中间件的作用。
没有使用网关之前,服务端请求多个节点
使用网关后
下载地址
打开 下载地址,根据操作系统版本选择。
Windows 安装和启动
安装
下载 gateway-1.6.0_SNAPSHOT-597-windows-amd64.zip,解压如下。 gateway-windows-amd64.exe 是启动文件,gateway.yml 是默认配置文件。
启动失败
当 gateway.yml 的 elasticsearch 选项中的 hosts 不能正常响应请求的时候,启动界面如下。
为什么 elasticsearch 不能访问的时候,网关还要继续提供服务呢,为什么不像业务接口启动时在基础业务组件如 MySQL/Redis 不能正常响应就直接 panic?
一方面网关作为 elasticsearch 抵挡流量冲击的城墙,在 elasticsearch 不能提供服务的时候,对之前成功的请求缓存结果,继续提供有限度的服务,为 elasticsearch 修复后上线争取时间。
另一方面业务接口和基础组件是强耦合关系,没有基础组件就完全无法对外提供数据读写服务,而网关与 elasticsearch 是松耦合关系,网关在没有 elasticsearch 的情况下也能对外提供有限度的服务。
在 gateway.yml 的 elasticsearch 选项中的 hosts 改成能够正常响应的 elasticsearch 请求地址。
启动成功
双击 gateway-windows-amd64.exe 文件,启动成功界面如下
访问
API 访问
由启动后终端显示可知,网关的 API 接口地址是 http://localhost:2900
[api.go:262] api listen at: http://0.0.0.0:2900
打开浏览器输入 http://localhost:2900,显示所有可以对外提供的 API 接口
我们选择其中一个,在浏览器中输入 http://localhost:2900/_framework/api/_version 从路由上看该接口是查询产品的版本信息,显示如下
gateway.yml 中可以看到有被注释掉的一段配置,看起来应该是配置 api 地址的地方。
#api:
# enabled: true
# network:
# binding: 127.0.0.1:2900
把注释去掉后尝试把端口改成 2901。
api:
enabled: true
network:
binding: 127.0.0.1:2901
改完后启动 打开浏览器先输入 http://localhost:2900,无法正常响应请求,再输入 http://localhost:2901,可以正常响应,界面和修改配置前访问 http://localhost:2900 的界面一样,说明 API 请求地址成功修改
Elasticsearch 访问
启动日志中显示监听 8000 端口,猜测应该是 elasticsearch 请求地址,打开浏览器输入 http://127.0.0.1:8000/
entry [my_es_entry] listen at: http://0.0.0.0:8000
gateway.yml 中可以看到 my_es_entry 的 network 绑定 8000 端口,显而易见的这部分就是配置代理转发给 elasticsearch 的地址,所以安装后只需要把以前请求 elasticsearch 的地址修改为该地址。
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 10000
network:
binding: 0.0.0.0:8000
总结
我们成功安装和启动极限网关,接下来我们学习怎么根据需求修改配置。
兼容不同版本的查询响应结果的 Count 结构
Elasticsearch • medcl 发表了文章 • 0 个评论 • 3 次浏览 • 2022-02-21 16:08
使用极限网关来代理 Kibana
Kibana • medcl 发表了文章 • 0 个评论 • 1825 次浏览 • 2022-02-15 20:29
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 10000
network:
binding: 0.0.0.0:8000
skip_occupied_port: true
tls:
enabled: true
flow:
- name: default_flow
filter:
- basic_auth:
valid_users:
medcl: passwd
- http:
schema: "http" #https or http
host: "192.168.3.98:5601"
router:
- name: my_router
default_flow: default_flow
修改里面的 ip 为你实际的 Kibana IP 地址,如需多个,使用 hosts 参数,详见文档手册:
https://极限网关.com/docs/references/filters/http/
启动网关,因为自动开启了 https,访问网关的服务地址:https://localhost:8000,如下:
输入配置的用户名密码,即可。
简直无语,升级到 Elasticsearch 8.0 之后,我的数据居然写不进去了
Elasticsearch • medcl 发表了文章 • 5 个评论 • 7470 次浏览 • 2022-02-11 19:45
{
"error": {
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "Action/metadata line [1] contains an unknown parameter [_type]"
}
],
"type": "illegal_argument_exception",
"reason": "Action/metadata line [1] contains an unknown parameter [_type]"
},
"status": 400
}
熟悉的 type,不一样的问题,大爷的,还是大意了。
type 这厮在 8.0 里面已经被正式干掉了,而我还傻傻的等在原地。
怎么办,我几万个 agent、上千个 Filebeat 和 Logstash 大军总不能现在就升级吧,还不知道有没有幺蛾子在前方等待我,心里有点方。
恩,这个时候我想到了鲁迅说的,用 Elasitcsearch 之前先看看极限网关(文档地址:http://极限网关.com) ,所以我立马就去南山寺问了问极限网关,他说可以, O 了。
首先,从这里下载最新的极限网关 523 版本:http://release.elasticsearch.cn/gateway/snapshot/
然后,解压,可以看到立马有一个配置,佛祖显灵,居然有一个配置文件`sample-configs/v8-bulk-indexing-compatibility.yml` 在闪光,就是它:
恩,看来是这个 filter 可以帮我去掉了当前 whatever 版本生成的多余的不要的 type,修改配置我就试试看。
启动启动启动,如下:
没有报错!yes。
查看索引的统计指标:
O 了,数据都正常写入了,采集端啥都不用动,就能用上新版的 Elasticsearch,真是快哉。晚上自己加个卤蛋。
以上情节,纯属虚构,如有雷同,哪又咋地。
使用极限网关来处置 Elasticsearch 的 Apache Log4j 漏洞
Elasticsearch • medcl 发表了文章 • 2 个评论 • 5220 次浏览 • 2021-12-11 03:57
昨日爆出的 Log4j 安全漏洞,业界一片哗然,今天给大家介绍一下,如何使用极限网关来快速处置 Elasticsearch 的 Apache Log4j 漏洞。
【CVE 地址】
https://github.com/advisories/GHSA-jfh8-c2jp-5v3q
【漏洞描述】
Apache Log4j 是一款非常流行的开源的用于 Java 运行环境的日志记录工具包,大量的 Java 框架包括 Elasticsearch 的最新版本都使用了该组件,故影响范围非常之大。
近日, 随着 Apache Log4j 的远程代码执行最新漏洞细节被公开,攻击者可通过构造恶意请求利用该漏洞实现在目标服务器上执行任意代码。可导致服务器被黑客控制,从而进行页面篡改、数据窃取、挖矿、勒索等行为。建议使用该组件的用户第一时间启动应急响应进行修复。
简单总结一下就是,在使用 Log4j 打印输出的日志中,如果发现日志内容中包含关键词 ${
,那么这个里面包含的内容会当做变量来进行替换和执行,导致攻击者可以通过恶意构造日志内容来让 Java 进程来执行任意命令,达到攻击的效果。
【漏洞等级】:非常紧急
此次漏洞是用于 Log4j2 提供的 lookup 功能造成的,该功能允许开发者通过一些协议去读取相应环境中的配置。但在实现的过程中,并未对输入进行严格的判断,从而造成漏洞的发生。
【影响范围】:Java 类产品:Apache Log4j 2.x < 2.15.0-rc2,Elasticsearch 当前所有版本。
【攻击检测】
可以通过检查日志中是否存在 jndi:ldap://
、jndi:rmi
等字符来发现可能的攻击行为。
处理办法
最简单的办法是通过修改 config/jvm.options
,新增以下参数,重启集群所有节点即可。
-Dlog4j2.formatMsgNoLookups=true
不过,如果集群规模较大,数据较多,业务不能中断,不能通过修改 Elasticsearch 配置、或者替换 Log4j 的最新 jar 包来重启集群的情况,可以考虑使用极限网关来进行拦截或者参数替换甚至是直接阻断请求。
通过在网关层对发往 Elasticsearch 的请求统一进行参数检测,将包含的敏感关键词 ${
进行替换或者直接拒绝,
可以防止带攻击的请求到达 Elasticsearch 服务端而被 Log4j 打印相关日志的时候执行恶意攻击命令,从而避免被攻击。
极限网关是透明代理,只需要在应用端,将以往配置指向 Elasticsearch 的地址替换为现在网关的地址即可,其他都不用动。
参考配置
下载最新的 1.5.0-SNAPSHOT
版本http://release.elasticsearch.cn/gateway/snapshot/
使用极限网关的 context_filter
过滤器,对请求上下文 _ctx.request.to_string
进行关键字检测,过滤掉恶意流量,从而阻断攻击。
新增一个配置文件 gateway.yml
path.data: data
path.logs: log
entry:
- name: es_entrypoint
enabled: true
router: default
max_concurrency: 20000
network:
binding: 0.0.0.0:8000
router:
- name: default
default_flow: main_flow
flow:
- name: main_flow
filter:
- context_filter:
context: _ctx.request.to_string
action: redirect_flow
status: 403
flow: log4j_matched_flow
must_not: # any match will be filtered
regex:
- \$\{.*?\}
- "%24%7B.*?%7D" #urlencode
contain:
- "jndi:"
- "jndi:ldap:"
- "jndi:rmi:"
- "jndi%3A" #urlencode
- "jndi%3Aldap%3A" #urlencode
- "jndi%3Armi%3A" #urlencode
- elasticsearch:
elasticsearch: es-server
- name: log4j_matched_flow
filter:
- echo:
message: 'Apache Log4j 2, Boom!'
elasticsearch:
- name: es-server
enabled: true
endpoints:
- http://localhost:9200
启动网关:
➜ ./bin/gateway -config /tmp/gateway.yml
___ _ _____ __ __ __ _
/ _ \ /_\ /__ \/__\/ / /\ \ \/_\ /\_/\
/ /_\///_\\ / /\/_\ \ \/ \/ //_\\\_ _/
/ /_\\/ _ \/ / //__ \ /\ / _ \/ \
\____/\_/ \_/\/ \__/ \/ \/\_/ \_/\_/
[GATEWAY] A light-weight, powerful and high-performance elasticsearch gateway.
[GATEWAY] 1.0.0_SNAPSHOT, 2021-12-10 23:55:34, 2212aff
[12-11 01:55:49] [INF] [app.go:250] initializing gateway.
[12-11 01:55:49] [INF] [instance.go:26] workspace: /Users/medcl/go/src/infini.sh/gateway/data/gateway/nodes/0
[12-11 01:55:49] [INF] [api.go:261] api listen at: http://0.0.0.0:2900
[12-11 01:55:49] [INF] [reverseproxy.go:253] elasticsearch [es-server] hosts: [] => [localhost:9200]
[12-11 01:55:49] [INF] [entry.go:296] entry [es_entrypoint] listen at: http://0.0.0.0:8000
[12-11 01:55:49] [INF] [module.go:116] all modules started
[12-11 01:55:49] [INF] [app.go:357] gateway is running now.
[12-11 01:55:49] [INF] [actions.go:236] elasticsearch [es-server] is available
将要使用的测试命令 ${java:os}
使用 urlencode 转码为 %24%7Bjava%3Aos%7D
,构造查询语句,分别测试。
不走网关:
~% curl 'http://localhost:9200/index1/_search?q=%24%7Bjava%3Aos%7D'
{"error":{"root_cause":[{"type":"index_not_found_exception","reason":"no such index","resource.type":"index_or_alias","resource.id":"index1","index_uuid":"_na_","index":"index1"}],"type":"index_not_found_exception","reason":"no such index","resource.type":"index_or_alias","resource.id":"index1","index_uuid":"_na_","index":"index1"},"status":404}%
查看 Elasticsearch 端日志为:
[2021-12-11T01:49:50,303][DEBUG][r.suppressed ] path: /index1/_search, params: {q=Mac OS X 10.13.4 unknown, architecture: x86_64-64, index=index1}
org.elasticsearch.index.IndexNotFoundException: no such index
at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver$WildcardExpressionResolver.infe(IndexNameExpressionResolver.java:678) ~[elasticsearch-5.6.15.jar:5.6.15]
at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver$WildcardExpressionResolver.innerResolve(IndexNameExpressionResolver.java:632) ~[elasticsearch-5.6.15.jar:5.6.15]
at org.elasticsearch.cluster.metadata.IndexNameExpressionResolver$WildcardExpressionResolver.resolve(IndexNameExpressionResolver.java:580) ~[elasticsearch-5.6.15.jar:5.6.15]
可以看到查询条件里面的 q=${java:os}
被执行了,变成了 q=Mac OS X 10.13.4 unknown, architecture: x86_64-64, index=index1
,说明变量被解析且执行,存在漏洞利用的风险。
那走网关之后呢:
~% curl 'http://localhost:8000/index1/_search?q=%24%7Bjava%3Aos%7D'
Apache Log4j 2, Boom!%
可以看到请求被过滤掉了,返回了自定义的信息。
还有一些其他测试命令,大家也可以试试:
#{java:vm}
~% curl 'http://localhost:9200/index/_search?q=%24%7Bjava%3Avm%7D'
[2021-12-11T02:36:04,764][DEBUG][r.suppressed ] [INFINI-2.local] path: /index/_search, params: {q=OpenJDK 64-Bit Server VM (build 25.72-b15, mixed mode), index=index}
~% curl 'http://localhost:8000/index/_search?q=%24%7Bjava%3Avm%7D'
Apache Log4j 2, Boom!%
#{jndi:rmi://localhost:1099/api}
~% curl 'http://localhost:9200/index/_search?q=%24%7Bjndi%3Armi%3A%2F%2Flocalhost%3A1099%2Fapi%7D'
2021-12-11 03:35:06,493 elasticsearch[YOmFJsW][search][T#3] ERROR An exception occurred processing Appender console java.lang.SecurityException: attempt to add a Permission to a readonly Permissions object
~% curl 'http://localhost:8000/index/_search?q=%24%7Bjndi%3Armi%3A%2F%2Flocalhost%3A1099%2Fapi%7D'
Apache Log4j 2, Boom!%
另外不同版本的 Elasticsearch 对于攻击的复现程度参差不齐,因为 es 不同版本是否有 Java Security Manager 、不同版本 JDK 、以及默认配置也不相同,新一点的 es 其实同样可以触发恶意请求,只不过网络调用被默认的网络策略给拒绝了,相对安全,当然如果设置不当同样存在风险,见过很多用户一上来就关默认安全配置的,甚至还放开很多暂时用不上的权限,另外未知的攻击方式也一定有,比如大量日志产生的系统调用可能会拖垮机器造成服务不可用,所以要么还是尽快改配置换 log4j 包重启集群,或者走网关来过滤阻断请求吧。
使用极限网关处置类似安全事件的好处是,Elasticsearch 服务器不用做任何变动,尤其是大规模集群的场景,可以节省大量的工作,提升效率,非常灵活,缩短安全处置的时间,降低企业风险。
极限网关入门视频教程已发布
Elasticsearch • medcl 发表了文章 • 4 个评论 • 1687 次浏览 • 2021-11-21 11:41
使用极限网关来进行 Elasticsearch 跨集群跨版本查询及所有其它请求
Elasticsearch • medcl 发表了文章 • 7 个评论 • 3720 次浏览 • 2021-10-16 11:31
使用场景
如果你的业务需要用到有多个集群,并且版本还不一样,是不是管理起来很麻烦,如果能够通过一个 API 来进行查询就方便了,聪明的你相信已经想到了 CCS,没错用 CCS 可以实现跨集群的查询,不过 Elasticsearch 提供的 CCS 对版本有一点的限制,并且需要提前做好 mTLS,也就是需要提前配置好两个集群之间的证书互信,这个免不了要重启维护,好像有点麻烦,那么问题来咯,有没有更好的方案呢?
😁 有办法,今天我就给大家介绍一个基于极限网关的方案,极限网关的网址:http://极限网关.com/。
假设现在有两个集群,一个集群是 v2.4.6,有不少业务数据,舍不得删,里面有很多好东西 :)还有一个集群是 v7.14.0,版本还算比较新,业务正在做的一个新的试点,没什么好东西,但是也得用 :(,现在老板们的的需求是希望通过在一个统一的接口就能访问这些数据,程序员懒得很,懂得都懂。
集群信息
- v2.4.6 集群的访问入口地址:192.168.3.188:9202
- v7.14.0 集群的访问入口地址:192.168.3.188:9206
这两个集群都是 http 协议的。
实现步骤
今天用到的是极限网关的 switch 过滤器:https://极限网关.com/docs/references/filters/switch/
网关下载下来就两个文件,一个主程序,一个配置文件,记得下载对应操作系统的包。
定义两个集群资源
elasticsearch:
- name: v2
enabled: true
endpoint: http://192.168.3.188:9202
- name: v7
enabled: true
endpoint: http://192.168.3.188:9206
上面定义了两个集群,分别命名为 v2
和 v7
,待会会用到这些资源。
定义一个服务入口
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 1000
network:
binding: 0.0.0.0:8000
tls:
enabled: true
这里定义了一个名为 my_es_entry
的资源入口,并引用了一个名为 my_router
的请求转发路由,同时绑定了网卡的 0.0.0.0:8000
也就是所有本地网卡监听 IP 的 8000
端口,访问任意 IP 的 8000
端口就能访问到这个网关了。
另外老板也说了,Elasticsearch 用 HTTP 协议简直就是裸奔,通过这里开启 tls
,可以让网关对外提供的是 HTTPS 协议,这样用户连接的 Elasticsearch 服务就自带 buffer 了,后端的 es 集群和网关直接可以做好网络流量隔离,集群不用动,简直完美。
为什么定义 TLS 不用指定证书,好用的软件不需要这么麻烦,就这样,不解释。
最后,通过设置 max_concurrency
为 1000,限制下并发数,避免野猴子把我们的后端的 Elasticsearch 给压挂了。
定义一个请求路由
router:
- name: my_router
default_flow: cross-cluster-search
这里的名称 my_router
就是表示上面的服务入口的router
参数指定的值。
另外设置一个 default_flow
来将所有的请求都转发给一个名为 cross-cluster-search
的请求处理流程,还没定义,别急,马上。
定义请求处理流程
来啦,来啦,先定义两个 flow,如下,分别名为 v2-flow
和 v7-flow
,每节配置的 filter
定义了一系列过滤器,用来对请求进行处理,这里就用了一个 elasticsearch
过滤器,也就是转发请求给指定的 Elasticsearch 后端服务器,了否?
flow:
- name: v2-flow
filter:
- elasticsearch:
elasticsearch: v2
- name: v7-flow
filter:
- elasticsearch:
elasticsearch: v7
然后,在定义额外一个名为 cross-cluster-search
的 flow,如下:
- name: cross-cluster-search
filter:
- switch:
path_rules:
- prefix: "v2:"
flow: v2-flow
- prefix: "v7:"
flow: v7-flow
这个 flow 就是通过请求的路径的前缀来进行路由的过滤器,如果是 v2:
开头的请求,则转发给 v2-flow
继续处理,如果是 v7:
开头的请求,则转发给 v7-flow
来处理,使用的用法和 CCS 是一样的。so easy!
对了,那是不是每个请求都需要加前缀啊,费事啊,没事,在这个 cross-cluster-search
的 filter 最后再加上一个 elasticsearch
filter,前面前缀匹配不上的都会走它,假设默认都走 v7
,最后完整的 flow 配置如下:
flow:
- name: v2-flow
filter:
- elasticsearch:
elasticsearch: v2
- name: v7-flow
filter:
- elasticsearch:
elasticsearch: v7
- name: cross-cluster-search
filter:
- switch:
path_rules:
- prefix: "v2:"
flow: v2-flow
- prefix: "v7:"
flow: v7-flow
- elasticsearch:
elasticsearch: v7
然后就没有然后了,因为就配置这些就行了。
启动网关
假设配置文件的路径为 sample-configs/cross-cluster-search.yml
,运行如下命令:
➜ gateway git:(master) ✗ ./bin/gateway -config sample-configs/cross-cluster-search.yml
___ _ _____ __ __ __ _
/ _ \ /_\ /__ \/__\/ / /\ \ \/_\ /\_/\
/ /_\///_\\ / /\/_\ \ \/ \/ //_\\\_ _/
/ /_\\/ _ \/ / //__ \ /\ / _ \/ \
\____/\_/ \_/\/ \__/ \/ \/\_/ \_/\_/
[GATEWAY] A light-weight, powerful and high-performance elasticsearch gateway.
[GATEWAY] 1.0.0_SNAPSHOT, 2021-10-15 16:25:56, 3d0a1cd
[10-16 11:00:52] [INF] [app.go:228] initializing gateway.
[10-16 11:00:52] [INF] [instance.go:24] workspace: data/gateway/nodes/0
[10-16 11:00:52] [INF] [api.go:260] api listen at: http://0.0.0.0:2900
[10-16 11:00:52] [INF] [reverseproxy.go:257] elasticsearch [v7] hosts: [] => [192.168.3.188:9206]
[10-16 11:00:52] [INF] [entry.go:225] auto generating cert files
[10-16 11:00:52] [INF] [actions.go:223] elasticsearch [v2] is available
[10-16 11:00:52] [INF] [actions.go:223] elasticsearch [v7] is available
[10-16 11:00:53] [INF] [entry.go:296] entry [my_es_entry] listen at: https://0.0.0.0:8000
[10-16 11:00:53] [INF] [app.go:309] gateway is running now.
可以看到网关输出了启动成功的日志,网关服务监听在 https://0.0.0.0:8000
。
试试访问网关
直接访问网关的 8000 端口,因为是网关自签的证书,加上 -k 来跳过证书的校验,如下:
➜ loadgen git:(master) ✗ curl -k https://localhost:8000
{
"name" : "LENOVO",
"cluster_name" : "es-v7140",
"cluster_uuid" : "npWjpIZmS8iP_p3GK01-xg",
"version" : {
"number" : "7.14.0",
"build_flavor" : "default",
"build_type" : "zip",
"build_hash" : "dd5a0a2acaa2045ff9624f3729fc8a6f40835aa1",
"build_date" : "2021-07-29T20:49:32.864135063Z",
"build_snapshot" : false,
"lucene_version" : "8.9.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
正如前面配置所配置的一样,默认请求访问的就是 v7 集群。
访问 v2 集群
➜ loadgen git:(master) ✗ curl -k https://localhost:8000/v2:/
{
"name" : "Solomon O'Sullivan",
"cluster_name" : "es-v246",
"cluster_uuid" : "cqlpjByvQVWDAv6VvRwPAw",
"version" : {
"number" : "2.4.6",
"build_hash" : "5376dca9f70f3abef96a77f4bb22720ace8240fd",
"build_timestamp" : "2017-07-18T12:17:44Z",
"build_snapshot" : false,
"lucene_version" : "5.5.4"
},
"tagline" : "You Know, for Search"
}
查看集群信息:
➜ loadgen git:(master) ✗ curl -k https://localhost:8000/v2:_cluster/health\?pretty
{
"cluster_name" : "es-v246",
"status" : "yellow",
"timed_out" : false,
"number_of_nodes" : 1,
"number_of_data_nodes" : 1,
"active_primary_shards" : 5,
"active_shards" : 5,
"relocating_shards" : 0,
"initializing_shards" : 0,
"unassigned_shards" : 5,
"delayed_unassigned_shards" : 0,
"number_of_pending_tasks" : 0,
"number_of_in_flight_fetch" : 0,
"task_max_waiting_in_queue_millis" : 0,
"active_shards_percent_as_number" : 50.0
}
插入一条文档:
➜ loadgen git:(master) ✗ curl-json -k https://localhost:8000/v2:medcl/doc/1 -d '{"name":"hello world"}'
{"_index":"medcl","_type":"doc","_id":"1","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"created":true}%
执行一个查询
➜ loadgen git:(master) ✗ curl -k https://localhost:8000/v2:medcl/_search\?q\=name:hello
{"took":78,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":0.19178301,"hits":[{"_index":"medcl","_type":"doc","_id":"1","_score":0.19178301,"_source":{"name":"hello world"}}]}}%
可以看到,所有的请求,不管是集群的操作,还是索引的增删改查都可以,而 Elasticsearch 自带的 CCS 是只读的,只能进行查询。
访问 v7 集群
➜ loadgen git:(master) ✗ curl -k https://localhost:8000/v7:/
{
"name" : "LENOVO",
"cluster_name" : "es-v7140",
"cluster_uuid" : "npWjpIZmS8iP_p3GK01-xg",
"version" : {
"number" : "7.14.0",
"build_flavor" : "default",
"build_type" : "zip",
"build_hash" : "dd5a0a2acaa2045ff9624f3729fc8a6f40835aa1",
"build_date" : "2021-07-29T20:49:32.864135063Z",
"build_snapshot" : false,
"lucene_version" : "8.9.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}
Kibana 里面访问
完全没问题,有图有真相:
其他操作也类似,就不重复了。
完整的配置
path.data: data
path.logs: log
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 10000
network:
binding: 0.0.0.0:8000
tls:
enabled: true
flow:
- name: v2-flow
filter:
- elasticsearch:
elasticsearch: v2
- name: v7-flow
filter:
- elasticsearch:
elasticsearch: v7
- name: cross-cluster-search
filter:
- switch:
path_rules:
- prefix: "v2:"
flow: v2-flow
- prefix: "v7:"
flow: v7-flow
- elasticsearch:
elasticsearch: v7
router:
- name: my_router
default_flow: cross-cluster-search
elasticsearch:
- name: v2
enabled: true
endpoint: http://192.168.3.188:9202
- name: v7
enabled: true
endpoint: http://192.168.3.188:9206
小结
好了,今天给大家分享的如何使用极限网关来进行 Elasticsearch 跨集群跨版本的操作就到这里了,希望大家周末玩的开心。😁
INFINI Gateway 的使用方法和使用心得分享
Elasticsearch • tianqi 发表了文章 • 7 个评论 • 5074 次浏览 • 2020-12-19 16:46
[program:gateway]
command = /app/logger/gateway/gateway-linux64
username = appuser
autostart=false
autorestart=true
startsecs=3
priority=1007
stdout_logfile=/app/logger/gateway/log/infini_gateway.log
注意这里由于我的测试服务器kibana、gateway和es节点都部署在一起,统一都是用supervisord进行纳管,由于网关必须等到es节点启动以后才可以启动成功,所以将gateway 的autostart设置为false。另外网关节点由于流量、资源不足等问题会有一定风险自己挂掉(测试过程中就遇到过),所以推荐将autorestart设置为true,当网关节点意外挂掉以后能够马上重启不影响应用使用。
配置文件
这块可能是我比较重点想分享的地方,因为可能由于目前项目刚刚发布,medcl大神一直忙于功能开发无暇顾及使用说明的介绍(我妄自揣测,请medcl大神不要见怪^_^),所以配置文件目前在安装介质中只有一个模板,外加模板中的一些参数注释的介绍,可能对于新手进行配置还是有比较大的难度(比如我),所以在自己摸索外加medcl大神的指导下我初步成功配置并实现了预期效果,现在分享给大家。
Path模块:path.data: data
path.logs: log
这块没什么好说的,就是这是data和log的相对路径,一般也不要改
entry模块:entry:
- name: es_gateway #your gateway endpoint
enable: true
router: default #configure your gateway's routing flow
network:
binding: 0.0.0.0:8000
reuse_port: true #you can start multi gateway instance, they share same port, to full utilize system's resources
tls:
enabled: false #if your es is using https, the gateway entrypoint should enable https too
这里实际上就是定义gateway的总体模块,指定了gateway网关名字,绑定的地址端口和网关是否启用的开关,这个开关就是指enable参数,我发现后面的版本有的模板文件中是不带enable这个参数的,这里其实有问题,因为测试时发现如果不设置enable为true的话默认值时false,也就是说网关不生效,为了当时排查这个问题我花了不少时间。这里划重点哈,entry模块里定义最重要的参数就是router,也就是网关gateway的路由策略。gateway的配置文件包含关系是这样的,gateway的entry指定了router,router中指定了tracing_flow和default_flow以及默认flow选择策略,tracing_flow是指定网关自己的流,也就是网关监控的流的处理逻辑,和业务查询和写入是无关的,default_flow实际上是真实网关的业务流,这个流一定是要走cache的。然后flow里面才会包含filter,filter顾名思义就是网关流的过滤筛选条件,filter里会有cache逻辑、rate限流条件、filter条件、elasticsearch属性。因为网关的监控数据要单独向es写入,需要设置一些写入属性和参数,所以需要module模块去指定模板、运行态参数和pipeline,然后再pipeline模块中指定写入es属性,包括es地址、索引名、队列名、worksize、bulksize等等,另外网关自己的一些队列参数设置在queue模块中,网关监控信息参数设置在statsd模块中。
flow模块:- name: cache_first
filter: #comment out any filter sections, like you don't need cache or rate-limiter
- name: get_cache_1
type: get_cache
- name: rate_limit_1
type: rate_limit
parameters:
message: "Hey, You just reached our request limit!"
rules: #configure match rules against request's PATH, eg: /_cluster/health, match the first rule and return
- pattern: "/(?P<index_name>test.*?)/_search" #use regex pattern to match index, will match any /$index/_search, and limit each index with max_qps ~=100
max_qps: 1000
group: index_name
- name: elasticsearch_1
type: elasticsearch
parameters:
elasticsearch: default #elasticsearch configure reference name
max_connection: 1000 #max tcp connection to upstream, default for all nodes
max_response_size: -1 #default for all nodes
balancer: weight
- host: 192.168.3.201:9200 #the format is host:port
weight: 100
- host: 192.168.3.202:9200
weight: 100
discovery:
enabled: false
- name: set_cache_1
type: set_cache
parameters:
cache_ttl: 1000s
# max_cache_items: 100000
- name: request_logging # this flow is used for request logging, refer to `router`'s `tracing_flow`
filter:
- name: request_path_filter_1
type: request_path_filter
parameters:
must: #must match all rules to continue
prefix:
- /test
- name: request_logging_1
type: request_logging
parameters:
queue_name: request_logging
如上,flow模块就是定义了两个flow,一个是cache_first,一个是request_logging,cache_first是承接业务流,所以在filter中一定要设置get_cache和set_cache强制走缓存策略,这里注意set_cache中要有缓存失效时间cache_ttl,默认是10s,这个我根据我的业务需求直接增加到了1000s,因为我想将缓存多保留一段时间,max_cache_items我直接注释掉了,不限制缓存数量的大小,如果一定要设置这个值要注意一下这个值针对的是每一个查询语句的而不是总共的。另外在cache_first中可以设置限流策略,/(?P<index_name>test.*?)表示test开头的索引全部应用限流策略。这里还有一个关键点是指定elasticsearch地址时可以配置连接权重,这个参数对我还是蛮有用的,因为我想我的应用查询通过网关只从coor节点进入而不是datanode和masternode,做到读写节点分离同时降低大查询灾难蔓延扩散的风险。所以我会将es集群中两个coor节点配置到这里,weight尽量设置的大一点,这里要注意如果这样配置必须把discovery的enable设置为false,否则网关还是会从master或者data节点进入。
request_logging的流其实就是网关trace监控自己用的,由于后面进行了filter强匹配,所以采样sample参数我就没有配置。注意这里其实我们在cache_first中并没有配置filter规则,所以这里任何过网关的查询都会进入缓存,包括查询tracing_flow自己创建的索引。在request_logging尽量按照自己的需求去配置一些filter规则来减少监控写入的索引,如果不配置,写入的量级会非常多,后续用仪表盘进行监控时响应会非常慢。目前gateway网关支持的filter类型非常全面,包括request_path_filter根据索引名或者路径去筛选,request_header_filter根据请求头部信息去筛选索引,request_method_filter根据请求的方法类型去筛选,注意如果使用request_header_filter根据请求头部信息去筛选索引,需要在应用在请求的头里加入特殊标识,比如如果要通过request_header_filter方式把所有kibana的请求全部过滤掉,就需要在kibana配置文件中增加头部参数并自定义值:elasticsearch.customHeaders: { "app": "kibana" }
然后在request_header_filter中加过滤条件:- name: request_header_filter1 # filter out the requests that we are not interested, reduce tracing pressure
type: request_header_filter
parameters:
exclude: # any rule match will marked request as filtered
- app: kibana # in order to filter kibana's access log, config `elasticsearch.customHeaders: { "app": "kibana" }` to your kibana's config `/config/kibana.yml`
我目前的需求是只想监控某些固定索引前缀的请求,所以我只配置了request_path_filter并must强配置了比如test索引前缀的索引,这样已经满足我得需求并最大限度的减少了监控请求数量。
这里还有一个点,就是在filter类型中还有个特殊的类型,叫做request_logging,这个是专门针对tracing_flow设计的,其中有一个重要参数是queue_name,他会在gateway网关所在的服务器磁盘上创建一个队列用来加速写入,减少tracing_flow对网关所造成的性能影响,所以这个队列在这里创建之后会在后面的pipelines中去指定写入磁盘队列。
router模块:router:
- name: default
tracing_flow: request_logging #a flow will execute after request finish
default_flow: cache_first
rules: #rules can't be conflicted with each other, will be improved in the future
- id: 1 # this rule means match every requests, and sent to `cache_first` flow
method:
- "*"
pattern:
- /
# priority: 1
flow:
- cache_first # after match, which processing flow will go through
router模块最重要的就是指定了tracing_flow和default_flow,这里可以定义路由规则,按照默认全部匹配就走缓存可以,同时最后在flow参数指定默认要走的flow流,也就是cache_first业务请求流。
elasticsearch模块:elasticsearch:
- name: default
enabled: true
endpoint: http://localhost:9200 # if your elasticsearch is using https, your gateway should be listen on as https as well
version: 7.9.1 #optional, used to select es adaptor, can be done automatically after connect to es
index_prefix: gateway_
basic_auth: #used to discovery full cluster nodes, or check elasticsearch's health and versions
username: elastic
password: pass
这里需要指定es地址和端口,可以配置多个,也就是说可以将业务的es和监控索引存储的es进行分离,这里有个参数index_prefix,我的理解是这个会在kibana中去创建这个索引模式,用来后续进行仪表盘监控,但是真实测试结果是这个设置没有生效,索引模式前缀都是gateway_requests,这个可能需要后续和Medcl大神再确认一下。
modules模块:modules:
- name: elastic
enabled: true
elasticsearch: default
init_template: true
- name: pipeline
enabled: true
runners:
- name: primary
enabled: true
max_go_routine: 1
threshold_in_ms: 0
timeout_in_ms: 5000
pipeline_id: request_logging_index
modules需要配置两个模块,一个是es模块,一个是pipeline模块,主要是为
request_logging要往es中写数做准备,es模块使用默认模板,pipeline设置一些写入需要的参数,我全部没改使用默认值。
pipelines模块:pipelines:
- name: request_logging_index
start:
joint: json_indexing
enabled: true
parameters:
index_name: "gateway_requests"
elasticsearch: "default"
input_queue: "request_logging"
timeout: "60s"
worker_size: 1
bulk_size_in_mb: 10 #in MB
process:
因为要向es中写入trace数据,所以需要在pipeline中配置写入参数,包括json方式传输,写入的es(如果es有多个需要拆分),使用的磁盘队列queue,超时时间,写入worker数以及bulksize,这个我觉得后续可以根据监控的量级进行优化。
queue模块:queue:
min_msg_size: 1
max_msg_size: 50000000
max_bytes_per_file: 53687091200
sync_every_records: 100000 # sync by records count
sync_timeout_in_ms: 10000 # sync by time in million seconds
read_chan_buffer: 0
这个就是那个trace数据本次磁盘队列的一些写入参数,包括消息大小,磁盘文件大小,同步消息数,read buffer等等,如果像我目前的应用场景已经严格匹配索引的量级有限可以不用改,但是我觉得默认max_bytes_per_file默认值是50G,这个我觉得单个文件有点大,生产环境可以考虑调小。
statsd模块:enabled: false
host: 127.0.0.1
port: 8125
namespace: gateway.
这个模块目前我没用到,应该是本身监控gateway状态的模块,指定了监控端口,后续可以进行验证。
使用对比结果:
通过gateway中自带仪表盘可以对使用情况和效果进行方便的监控,通过比对可以发现一条复用的查询语句走网关缓存和不走网关缓存性能差距在100倍以上,使用网关进行查询优势巨大特别明显。
未来展望:
首先感谢Medcl大神把这么好的东西发布出来,使用网关对于业务查询有百倍上的性能提升确实十分诱人。而且目前虽然INFINI Gateway刚刚发布出来,但是已经陆陆续续迭代了好几个版本,可以明显看到一些bug修复、功能增强和性能优化提高,目前最新的版本又增加了请求灰度切换、流量迁移和流量复制功能,可以实现双写和多写,这个功能对于我来说后续很可能会应用到,因为实际上是一种变相的多集群数据同步方式的实现。所以可以看出INFINI Gateway网关不仅仅定位于查询缓存网关,而是集查询和写入功能为一体的综合性大网关,期待Medcl大神后续的精彩表演,让我们拭目以待。 极限网关 INFINI Gateway 初体验
Elasticsearch • liaosy 发表了文章 • 3 个评论 • 6841 次浏览 • 2020-12-09 00:57
#切换该路径下(路径自定)
cd /Users/shiyang/code/elastic/gateway
#下载
wget https://github.com/medcl/infin ... ar.gz
#下载完后解压
tar -zxvf GATEWAY-darwin64.tar.gz
#解压后能看到两个新文件,一个可执行二进制文件,一个yml配置文件
ls
#gateway-darwin64 gateway.yml
安装部署
在run之前需要先运行elastisearch,否则会报错,如图所示:
接下来先启动es集群(如果你本地还没有部署es,建议先参考官网的es安装教程下载部署)
本机用的es版本为7.9.0,如下图表示启动es成功:
接下来再启动gateway,yml配置文件可以先默认,后续可根据需要再修改。#启动
./gateway-darwin64
启动成功如下图所示:
成功启动后,我们就可以直接访问gateway了。curl http://0.0.0.0:8000
到此,gateway就算本地部署完毕了。
是不是很简单?嗯,下载即使用,简单方便。
(接下来可以试用一下gateway的特性了。将发布在下一篇文章。)