
ES数据没了?谁动了我的数据?
背景
我们在使用 Elasticsearch 的时候,可能会遇到数据“丢”了的情况。有可能是数据没成功写入 ES 集群,也可能是数据被误删了。
针对数据被误删,有没有好的解决办法呢?
其实我们可以把“删除数据”这个操作管理起来。当 ES 集群接收到删除数据命令的时候,先不执行该命令,而是生成一条删除数据的记录,经过管理人员批准后,该命令才会执行。这样不仅可以管理数据的删除,还可以进行删除操作的追踪:什么人,什么时间,发送了什么样的删除指令,从哪个 IP 发送的,以什么身份登录的等等。
要实现这个解决办法,我们可借助 INFINI Gateway 和 Console 的帮助。
方案架构
方案效果
- INFINI Gateway 作为 ES 集群的代理,接收所有请求
- INFINI Gateway 对删除数据操作进行拦截,在 Console UI 界面生成记录
- 管理人员 在 Console UI 界面审批操作记录,审批通过操作被执行
方案演示
测试数据准备
测试索引 test1,一共有 3 条数据。message 内容分别是"line 1","line 2"和"line 3"。
启动 INFINI Gateway 及 Console
网关配置新增内容
增加对 DELETE 操作的捕获,不直接执行,写入队列中。后续由队列生成特定的记录。
router:
- name: my_router
default_flow: default_flow
tracing_flow: logging_flow
rules:
- method:
- "DELETE"
pattern:
- "/{any_index}"
- "/{any_index}/{any_type}"
- "/{any_index}/{any_type}/{any_docid}"
flow:
- audit_flow
- method:
- "*"
pattern:
- "/{any_index}/_delete_by_query"
- "/_delete_by_query"
flow:
- audit_flow
flow:
- name: audit_flow
filter:
- logging:
queue_name: del_queue
pipeline:
- name: del_queue_ingest
auto_start: true
keep_running: true
processor:
- json_indexing:
input_queue: "del_queue"
idle_timeout_in_seconds: 1
elasticsearch: "logging-server"
index_name: "del_requests"
worker_size: 1
bulk_size_in_kb: 1
执行删除操作
ES 支持多种删除操作,简单总结归纳如下:
- 删除指定文档 id
- 删除索引
- 根据查询删除指定数据(_delete_by_query)
执行删除操作之前,先通过 INFINI Gateway 访问 ES 集群,证明可正常访问数据。
执行上述的几种删除命令,注意要发给 INFINI Gateway 的 8000 端口。
数据查询验证数据还在
Console 界面查看未批准的删除记录
所有删除操作,都被记录,待审批
Console 界面进行审批通过
选择一条记录,批准执行。Operation-approve
数据查询验证数据
"message": "line 2"的文档已被删除。
Console 界面查看历史记录
继续批准测试
批准删除一条文档
"message": "line 1" 的文档不在了。
批准删除索引
索引不在了。
至此我们演示了如何利用 INFINI Gateway 和 Console 对 ES 集群删除操作进行管控,本文只是抛砖引玉,相信还有更多有意思的场景等待大家发掘。
背景
我们在使用 Elasticsearch 的时候,可能会遇到数据“丢”了的情况。有可能是数据没成功写入 ES 集群,也可能是数据被误删了。
针对数据被误删,有没有好的解决办法呢?
其实我们可以把“删除数据”这个操作管理起来。当 ES 集群接收到删除数据命令的时候,先不执行该命令,而是生成一条删除数据的记录,经过管理人员批准后,该命令才会执行。这样不仅可以管理数据的删除,还可以进行删除操作的追踪:什么人,什么时间,发送了什么样的删除指令,从哪个 IP 发送的,以什么身份登录的等等。
要实现这个解决办法,我们可借助 INFINI Gateway 和 Console 的帮助。
方案架构
方案效果
- INFINI Gateway 作为 ES 集群的代理,接收所有请求
- INFINI Gateway 对删除数据操作进行拦截,在 Console UI 界面生成记录
- 管理人员 在 Console UI 界面审批操作记录,审批通过操作被执行
方案演示
测试数据准备
测试索引 test1,一共有 3 条数据。message 内容分别是"line 1","line 2"和"line 3"。
启动 INFINI Gateway 及 Console
网关配置新增内容
增加对 DELETE 操作的捕获,不直接执行,写入队列中。后续由队列生成特定的记录。
router:
- name: my_router
default_flow: default_flow
tracing_flow: logging_flow
rules:
- method:
- "DELETE"
pattern:
- "/{any_index}"
- "/{any_index}/{any_type}"
- "/{any_index}/{any_type}/{any_docid}"
flow:
- audit_flow
- method:
- "*"
pattern:
- "/{any_index}/_delete_by_query"
- "/_delete_by_query"
flow:
- audit_flow
flow:
- name: audit_flow
filter:
- logging:
queue_name: del_queue
pipeline:
- name: del_queue_ingest
auto_start: true
keep_running: true
processor:
- json_indexing:
input_queue: "del_queue"
idle_timeout_in_seconds: 1
elasticsearch: "logging-server"
index_name: "del_requests"
worker_size: 1
bulk_size_in_kb: 1
执行删除操作
ES 支持多种删除操作,简单总结归纳如下:
- 删除指定文档 id
- 删除索引
- 根据查询删除指定数据(_delete_by_query)
执行删除操作之前,先通过 INFINI Gateway 访问 ES 集群,证明可正常访问数据。
执行上述的几种删除命令,注意要发给 INFINI Gateway 的 8000 端口。
数据查询验证数据还在
Console 界面查看未批准的删除记录
所有删除操作,都被记录,待审批
Console 界面进行审批通过
选择一条记录,批准执行。Operation-approve
数据查询验证数据
"message": "line 2"的文档已被删除。
Console 界面查看历史记录
继续批准测试
批准删除一条文档
"message": "line 1" 的文档不在了。
批准删除索引
索引不在了。
至此我们演示了如何利用 INFINI Gateway 和 Console 对 ES 集群删除操作进行管控,本文只是抛砖引玉,相信还有更多有意思的场景等待大家发掘。
收起阅读 »
Elasticsearch:如何在 Elastic 中实现图片相似度搜索
原文: Elasticsearch:如何在 Elastic 中实现图片相似度搜索
在本文章,我们将了解如何通过几个步骤在 Elastic 中实施相似图像搜索。 开始设置应用程序环境,然后导入 NLP 模型,最后完成为你的图像集生成嵌入。
Elasticsearch:如何在 Elastic 中实现图片相似度搜索
如何设置环境
第一步是为你的应用程序设置环境。 一般要求包括:
- Git
- Python 3.9
- Docker
- 数百张图片
使用数百张图像以确保获得最佳效果非常重要。
转到工作文件夹并检查创建的存储库代码。 然后导航到存储库文件夹。
1. git clone https://github.com/radoondas/flask-elastic-image-search.git
2. cd flask-elastic-image-search
1. $ git clone https://github.com/radoondas/flask-elastic-image-search.git
2. Cloning into 'flask-elastic-image-search'...
3. remote: Enumerating objects: 105, done.
4. remote: Counting objects: 100% (105/105), done.
5. remote: Compressing objects: 100% (72/72), done.
6. remote: Total 105 (delta 37), reused 94 (delta 27), pack-reused 0
7. Receiving objects: 100% (105/105), 20.72 MiB | 9.75 MiB/s, done.
8. Resolving deltas: 100% (37/37), done.
9. $ cd flask-elastic-image-search/
10. $ pwd
11. /Users/liuxg/python/flask-elastic-image-search
因为你将使用 Python 来运行代码,所以你需要确保满足所有要求并且环境已准备就绪。 现在创建虚拟环境并安装所有依赖项。
1. python3 -m venv .venv
2. source .venv/bin/activate
3. pip install -r requirements.txt
安装
如果你还没有安装好自己的 Elasticsearch 及 Kibana,请参考如下的文章来进行安装:
特别注意的是:我们将以最新的 Elastic Stack 8.6.1 来进行展示。请参考 Elastic Stack 8.x 的文章进行安装。
启动白金版试用功能
由于上传模型是一个白金版的功能,我们需要启动试用功能。更多关于订阅的信息,请参考网址:订阅 | Elastic Stack 产品和支持 | Elastic。
这样我们就成功地启动了白金版试用功能。
Elasticsearch 集群和嵌入模型
登录到你的帐户以启动 Elasticsearch 集群。 设置一个小型集群:
- 一个具有 2GB 内存的 HOT 节点
- 一个具有 4GB 内存的 ML(机器学习)节点(此节点的大小很重要,因为你将导入 Elasticsearch 的 NLP 模型会消耗约 1.5GB 的内存。)
部署准备就绪后,转到 Kibana 并检查机器学习节点的容量。 你将在视图中看到一个机器学习节点。 目前没有加载模型。
使用 Eland 库从 OpenAI 上传 CLIP 嵌入模型。 Eland 是一个 Python Elasticsearch 客户端,用于在 Elasticsearch 中探索和分析数据,能够处理文本和图像。 您将使用此模型从文本输入生成嵌入并查询匹配图像。 在 Eland 库的文档中找到更多详细信息。
对于下一步,你将需要 Elasticsearch 端点。 你可以从部署详细信息部分的 Elasticsearch 云控制台获取它。
在本示例中,我们将使用本地部署来进行展示,所以,我们并不必要完成上面的步骤。
Eland
Eland 可以通过 pip 从 PyPI 安装。在安装之前,我们需要安装好自己的 Python。
1. $ python --version
2. Python 3.10.2
可以使用 Pip 从 PyPI 安装 Eland:
python -m pip install eland
也可以使用 Conda 从 Conda Forge 安装 Eland:
conda install -c conda-forge eland
希望在不安装 Eland 的情况下使用它的用户,为了只运行可用的脚本,可以构建 Docker 容器:
1. git clone https://github.com/elastic/eland
2. cd eland
3. docker build -t elastic/eland .
Eland 将 Hugging Face 转换器模型到其 TorchScript 表示的转换和分块过程封装在一个 Python 方法中; 因此,这是推荐的导入方法。
- 安装 Eland Python 客户端。
- 运行 eland_import_hub_model 脚本。 例如:
1. eland_import_hub_model --url <clusterUrl> \
2. --hub-model-id elastic/distilbert-base-cased-finetuned-conll03-english \
3. --task-type ner
- 指定 URL 以访问你的集群。 例如,https://user>:
@ : 。 - 在 Hugging Face 模型中心中指定模型的标识符。
- 指定 NLP 任务的类型。 支持的值为 fill_mask、ner、text_classification、text_embedding, question_answering 和 zero_shot_classification。
上传模型
我们使用如下的命令来进行上传模型:
1. eland_import_hub_model --url https://<user>:<password>@<hostname>:<port> \
2. --hub-model-id sentence-transformers/clip-ViT-B-32-multilingual-v1 \
3. --task-type text_embedding \
4. --ca-certs <your certificate> \
5. --start
针对我的情况:
1. eland_import_hub_model --url https://elastic:ZgzSt2vHNwA6yPn-fllr@localhost:9200 \
2. --hub-model-id sentence-transformers/clip-ViT-B-32-multilingual-v1 \
3. --task-type text_embedding \
4. --ca-certs /Users/liuxg/elastic/elasticsearch-8.6.1/config/certs/http_ca.crt \
5. --start
请注意: 你需要根据自己的 Elasticsearch 访问端点,用户名及密码来修改上面的设置,同时你需要根据自己的配置修改上面的证书路径。
运行上面的命令:
上面显示,我们已经成功地上传了模型。我们可以到 Kibana 中进行查看:
上面显示我们已经上传了所需要的 CLIP 模型,并且它的状态是 started。
如何创建图像嵌入
在设置 Elasticsearch 集群并导入嵌入模型后,你需要矢量化图像数据并为数据集中的每个图像创建图像嵌入。
要创建图像嵌入,请使用简单的 Python 脚本。 你可以在此处找到该脚本:create-image-embeddings.py。 该脚本将遍历你的图像目录并生成单独的图像嵌入。 它将使用名称和相对路径创建文档,并使用提供的映射将其保存到 Elasticsearch 索引 my-image-embeddings 中。
将所有图像(照片)放入文件夹 app/static/images。 使用带有子文件夹的目录结构来组织图像。 所有图像准备就绪后,使用几个参数执行脚本。
至少要有几百张图像才能获得合理的结果,这一点至关重要。 图像太少不会产生预期的结果,因为你要搜索的空间非常小,而且到搜索向量的距离也非常相似。我尝试在网上下载很多的照片,但是感觉一张一张地下载非常麻烦。你可以在谷歌浏览器中添加插件 Image downloader - Imageye。它可以方便地把很多照片一次下载下来。
在 image_embeddings 文件夹中,运行脚本并为变量使用你的值。
1. cd image_embeddings
2. python3 create-image-embeddings.py \
3. --es_host='https://localhost:9200' \
4. --es_user='elastic' --es_password='ZgzSt2vHNwA6yPn-fllr' \
5. --ca_certs='/Users/liuxg/elastic/elasticsearch-8.6.1/config/certs/http_ca.crt'
根据图像的数量、它们的大小、你的 CPU 和你的网络连接,此任务将需要一些时间。 在尝试处理完整数据集之前,先试验少量图像。脚本完成后,你可以使用 Kibana 开发工具验证索引 my-image-embeddings 是否存在并具有相应的文档。
我们在Kibana 中进行查看:
GET _cat/indices/my-image-embeddings?v
上面命令的响应为:
1. health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
2. yellow open my-image-embeddings h6oUBdHCScWmXOZaf57oWg 1 1 145 0 1.4mb 1.4mb
查看文档,你会看到非常相似的 JSON 对象(如示例)。 你将在图像文件夹中看到图像名称、图像 ID 和相对路径。 此路径用于前端应用程序以在搜索时正确显示图像。JSON 文档中最重要的部分是包含 CLIP 模型生成的密集矢量的 image_embedding。 当应用程序正在搜索图像或类似图像时使用此矢量。
GET my-image-embeddings/_search
1. {
2. "_index": "my-image-embeddings",
3. "_id": "_g9ACIUBMEjlQge4tztV",
4. "_score": 6.703597,
5. "_source": {
6. "image_id": "IMG_4032",
7. "image_name": "IMG_4032.jpeg",
8. "image_embedding": [
9. -0.3415695130825043,
10. 0.1906963288784027,
11. .....
12. -0.10289803147315979,
13. -0.15871885418891907
14. ],
15. "relative_path": "phone/IMG_4032.jpeg"
16. }
17. }
使用 Flask 应用程序搜索图像
现在你的环境已全部设置完毕,你可以进行下一步,使用我们作为概念证明提供的 Flask 应用程序,使用自然语言实际搜索图像并查找相似图像。 该 Web 应用程序具有简单的 UI,使图像搜索变得简单。 你可以在此 GitHub 存储库中访问原型 Flask 应用程序。
后台应用程序执行两个任务。 在搜索框中输入搜索字符串后,文本将使用机器学习 _infer 端点进行矢量化。 然后,针对带有向量的索引 my-image-embeddings 执行带有密集向量的查询。
你可以在示例中看到这两个查询。 第一个 API 调用使用 _infer 端点,结果是一个密集矢量。
1. POST _ml/trained_models/sentence-transformers__clip-vit-b-32-multilingual-v1/_infer
2. {
3. "docs" : [
4. {"text_field": "Yellow mountain is the most beautiful mountain in China"}
5. ]
6. }
上面的响应如下:
在第二个任务中,搜索查询,我们将使用密集矢量并获得按分数排序的图像。
`
1. GET my-image-embeddings/_search
2. {
3. "fields": [
4. "image_id",
5. "image_name",
6. "relative_path"
7. ],
8. "_source": false,
9. "knn": {
10. "field": "image_embedding",
11. "k": 5,
12. "num_candidates": 10,
13. "query_vector": [
14. 0.03395160660147667,
15. 0.007704082876443863,
16. 0.14996188879013062,
17. -0.10693030804395676,
18. ...
19. 0.05140634626150131,
20. 0.07114913314580917
21. ]
22. }
23. }
`
要启动并运行 Flask 应用程序,请导航到存储库的根文件夹并配置 .env 文件。 配置文件中的值用于连接到 Elasticsearch 集群。 你需要为以下变量插入值。 这些与图像嵌入生成中使用的值相同。
.env
1. ES_HOST='URL:PORT'
2. ES_USER='elastic'
3. ES_PWD='password'
为了能够使得我们自构建的 Elasticsearch 集群能够被正确地访问,我们必须把 Elasticsearch 的根证书拷贝到 Flask 应用的相应目录中:
flask-elastic-image-search/app/conf/ca.crt
1. (.venv) $ pwd
2. /Users/liuxg/python/flask-elastic-image-search/app/conf
3. (.venv) $ cp ~/elastic/elasticsearch-8.6.1/config/certs/http_ca.crt ca.crt
4. overwrite ca.crt? (y/n [n]) y
在上面,我们替换了仓库中原有的证书文件 ca.crt。
准备就绪后,运行主文件夹中的 flask 应用程序并等待它启动。
1. # In the main directory
2. $ flask run --port=5001
如果应用程序启动,你将看到类似于下面的输出,它在末尾指示你需要访问哪个 URL 才能访问该应用程序。
恭喜! 你的应用程序现在应该已启动并正在运行,并且可以通过互联网浏览器在 http://127.0.0.1:5001 上访问。
导航到图像搜索选项卡并输入描述你最佳图像的文本。 尝试使用非关键字或描述性文字。
在下面的示例中,输入的文本是 “Yellow mountain is the most beautiful mountain in China”。 结果显示在我们的数据集中。 如果用户喜欢结果集中的一张特定图像,只需单击它旁边的按钮,就会显示类似的图像。 用户可以无限次地这样做,并通过图像数据集构建自己的路径。
我们尝试另外的一个例子。这次我们输入:I love beautiful girls。
搜索也可以通过简单地上传图像来进行。 该应用程序会将图像转换为矢量并在数据集中搜索相似的图像。 为此,导航到第三个选项卡 “Similar Image”,从磁盘上传图像,然后点击 “Search”。
我们可以看到相似的图片。我们尝试使用一个女孩的照片再试试:
因为我们在 Elasticsearch 中使用的 NLP(sentence-transformers/clip-ViT-B-32-multilingual-v1)模型是多语言的,支持多语言推理,所以尽量搜索自己语言的图片。 然后也使用英文文本验证结果。我们尝试使用 “黄山是中国最漂亮的山”:
请务必注意,使用的模型是通用模型,这些模型非常准确,但你获得的结果会因用例或其他因素而异。 如果你需要更高的精度,则必须采用通用模型或开发自己的模型 —— CLIP 模型只是一个起点。
代码摘要
你可以在 GitHub 存储库中找到完整的代码。 你可能正在检查 routes.py 中的代码,它实现了应用程序的主要逻辑。 除了明显的路线定义之外,你还应该关注定义 _infer 和 _search 端点(infer_trained_model 和 knn_search_images)的方法。 生成图像嵌入的代码位于 create-image-embeddings.py文件中。
总结
现在你已经设置了 Flask 应用程序,你可以轻松地搜索你自己的图像集! Elastic 在平台内提供了矢量搜索的原生集成,避免了与外部进程的通信。 你可以灵活地开发和使用你可能使用 PyTorch 开发的自定义嵌入模型。
语义图像搜索具有其他传统图像搜索方法的以下优点:
- 更高的准确度:向量相似性捕获上下文和关联,而不依赖于图像的文本元描述。
- 增强的用户体验:与猜测哪些关键字可能相关相比,描述你正在寻找的内容或提供示例图像。
- 图像数据库的分类:不用担心对图像进行分类——相似性搜索可以在一堆图像中找到相关图像,而无需对它们进行组织。
如果你的用例更多地依赖于文本数据,你可以在以前的博客中了解更多关于实现语义搜索和将自然语言处理应用于文本的信息。 对于文本数据,向量相似度与传统关键词评分的结合呈现了两全其美的效果。
原文: Elasticsearch:如何在 Elastic 中实现图片相似度搜索
在本文章,我们将了解如何通过几个步骤在 Elastic 中实施相似图像搜索。 开始设置应用程序环境,然后导入 NLP 模型,最后完成为你的图像集生成嵌入。
Elasticsearch:如何在 Elastic 中实现图片相似度搜索
如何设置环境
第一步是为你的应用程序设置环境。 一般要求包括:
- Git
- Python 3.9
- Docker
- 数百张图片
使用数百张图像以确保获得最佳效果非常重要。
转到工作文件夹并检查创建的存储库代码。 然后导航到存储库文件夹。
1. git clone https://github.com/radoondas/flask-elastic-image-search.git
2. cd flask-elastic-image-search
1. $ git clone https://github.com/radoondas/flask-elastic-image-search.git
2. Cloning into 'flask-elastic-image-search'...
3. remote: Enumerating objects: 105, done.
4. remote: Counting objects: 100% (105/105), done.
5. remote: Compressing objects: 100% (72/72), done.
6. remote: Total 105 (delta 37), reused 94 (delta 27), pack-reused 0
7. Receiving objects: 100% (105/105), 20.72 MiB | 9.75 MiB/s, done.
8. Resolving deltas: 100% (37/37), done.
9. $ cd flask-elastic-image-search/
10. $ pwd
11. /Users/liuxg/python/flask-elastic-image-search
因为你将使用 Python 来运行代码,所以你需要确保满足所有要求并且环境已准备就绪。 现在创建虚拟环境并安装所有依赖项。
1. python3 -m venv .venv
2. source .venv/bin/activate
3. pip install -r requirements.txt
安装
如果你还没有安装好自己的 Elasticsearch 及 Kibana,请参考如下的文章来进行安装:
特别注意的是:我们将以最新的 Elastic Stack 8.6.1 来进行展示。请参考 Elastic Stack 8.x 的文章进行安装。
启动白金版试用功能
由于上传模型是一个白金版的功能,我们需要启动试用功能。更多关于订阅的信息,请参考网址:订阅 | Elastic Stack 产品和支持 | Elastic。
这样我们就成功地启动了白金版试用功能。
Elasticsearch 集群和嵌入模型
登录到你的帐户以启动 Elasticsearch 集群。 设置一个小型集群:
- 一个具有 2GB 内存的 HOT 节点
- 一个具有 4GB 内存的 ML(机器学习)节点(此节点的大小很重要,因为你将导入 Elasticsearch 的 NLP 模型会消耗约 1.5GB 的内存。)
部署准备就绪后,转到 Kibana 并检查机器学习节点的容量。 你将在视图中看到一个机器学习节点。 目前没有加载模型。
使用 Eland 库从 OpenAI 上传 CLIP 嵌入模型。 Eland 是一个 Python Elasticsearch 客户端,用于在 Elasticsearch 中探索和分析数据,能够处理文本和图像。 您将使用此模型从文本输入生成嵌入并查询匹配图像。 在 Eland 库的文档中找到更多详细信息。
对于下一步,你将需要 Elasticsearch 端点。 你可以从部署详细信息部分的 Elasticsearch 云控制台获取它。
在本示例中,我们将使用本地部署来进行展示,所以,我们并不必要完成上面的步骤。
Eland
Eland 可以通过 pip 从 PyPI 安装。在安装之前,我们需要安装好自己的 Python。
1. $ python --version
2. Python 3.10.2
可以使用 Pip 从 PyPI 安装 Eland:
python -m pip install eland
也可以使用 Conda 从 Conda Forge 安装 Eland:
conda install -c conda-forge eland
希望在不安装 Eland 的情况下使用它的用户,为了只运行可用的脚本,可以构建 Docker 容器:
1. git clone https://github.com/elastic/eland
2. cd eland
3. docker build -t elastic/eland .
Eland 将 Hugging Face 转换器模型到其 TorchScript 表示的转换和分块过程封装在一个 Python 方法中; 因此,这是推荐的导入方法。
- 安装 Eland Python 客户端。
- 运行 eland_import_hub_model 脚本。 例如:
1. eland_import_hub_model --url <clusterUrl> \
2. --hub-model-id elastic/distilbert-base-cased-finetuned-conll03-english \
3. --task-type ner
- 指定 URL 以访问你的集群。 例如,https://user>:
@ : 。 - 在 Hugging Face 模型中心中指定模型的标识符。
- 指定 NLP 任务的类型。 支持的值为 fill_mask、ner、text_classification、text_embedding, question_answering 和 zero_shot_classification。
上传模型
我们使用如下的命令来进行上传模型:
1. eland_import_hub_model --url https://<user>:<password>@<hostname>:<port> \
2. --hub-model-id sentence-transformers/clip-ViT-B-32-multilingual-v1 \
3. --task-type text_embedding \
4. --ca-certs <your certificate> \
5. --start
针对我的情况:
1. eland_import_hub_model --url https://elastic:ZgzSt2vHNwA6yPn-fllr@localhost:9200 \
2. --hub-model-id sentence-transformers/clip-ViT-B-32-multilingual-v1 \
3. --task-type text_embedding \
4. --ca-certs /Users/liuxg/elastic/elasticsearch-8.6.1/config/certs/http_ca.crt \
5. --start
请注意: 你需要根据自己的 Elasticsearch 访问端点,用户名及密码来修改上面的设置,同时你需要根据自己的配置修改上面的证书路径。
运行上面的命令:
上面显示,我们已经成功地上传了模型。我们可以到 Kibana 中进行查看:
上面显示我们已经上传了所需要的 CLIP 模型,并且它的状态是 started。
如何创建图像嵌入
在设置 Elasticsearch 集群并导入嵌入模型后,你需要矢量化图像数据并为数据集中的每个图像创建图像嵌入。
要创建图像嵌入,请使用简单的 Python 脚本。 你可以在此处找到该脚本:create-image-embeddings.py。 该脚本将遍历你的图像目录并生成单独的图像嵌入。 它将使用名称和相对路径创建文档,并使用提供的映射将其保存到 Elasticsearch 索引 my-image-embeddings 中。
将所有图像(照片)放入文件夹 app/static/images。 使用带有子文件夹的目录结构来组织图像。 所有图像准备就绪后,使用几个参数执行脚本。
至少要有几百张图像才能获得合理的结果,这一点至关重要。 图像太少不会产生预期的结果,因为你要搜索的空间非常小,而且到搜索向量的距离也非常相似。我尝试在网上下载很多的照片,但是感觉一张一张地下载非常麻烦。你可以在谷歌浏览器中添加插件 Image downloader - Imageye。它可以方便地把很多照片一次下载下来。
在 image_embeddings 文件夹中,运行脚本并为变量使用你的值。
1. cd image_embeddings
2. python3 create-image-embeddings.py \
3. --es_host='https://localhost:9200' \
4. --es_user='elastic' --es_password='ZgzSt2vHNwA6yPn-fllr' \
5. --ca_certs='/Users/liuxg/elastic/elasticsearch-8.6.1/config/certs/http_ca.crt'
根据图像的数量、它们的大小、你的 CPU 和你的网络连接,此任务将需要一些时间。 在尝试处理完整数据集之前,先试验少量图像。脚本完成后,你可以使用 Kibana 开发工具验证索引 my-image-embeddings 是否存在并具有相应的文档。
我们在Kibana 中进行查看:
GET _cat/indices/my-image-embeddings?v
上面命令的响应为:
1. health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
2. yellow open my-image-embeddings h6oUBdHCScWmXOZaf57oWg 1 1 145 0 1.4mb 1.4mb
查看文档,你会看到非常相似的 JSON 对象(如示例)。 你将在图像文件夹中看到图像名称、图像 ID 和相对路径。 此路径用于前端应用程序以在搜索时正确显示图像。JSON 文档中最重要的部分是包含 CLIP 模型生成的密集矢量的 image_embedding。 当应用程序正在搜索图像或类似图像时使用此矢量。
GET my-image-embeddings/_search
1. {
2. "_index": "my-image-embeddings",
3. "_id": "_g9ACIUBMEjlQge4tztV",
4. "_score": 6.703597,
5. "_source": {
6. "image_id": "IMG_4032",
7. "image_name": "IMG_4032.jpeg",
8. "image_embedding": [
9. -0.3415695130825043,
10. 0.1906963288784027,
11. .....
12. -0.10289803147315979,
13. -0.15871885418891907
14. ],
15. "relative_path": "phone/IMG_4032.jpeg"
16. }
17. }
使用 Flask 应用程序搜索图像
现在你的环境已全部设置完毕,你可以进行下一步,使用我们作为概念证明提供的 Flask 应用程序,使用自然语言实际搜索图像并查找相似图像。 该 Web 应用程序具有简单的 UI,使图像搜索变得简单。 你可以在此 GitHub 存储库中访问原型 Flask 应用程序。
后台应用程序执行两个任务。 在搜索框中输入搜索字符串后,文本将使用机器学习 _infer 端点进行矢量化。 然后,针对带有向量的索引 my-image-embeddings 执行带有密集向量的查询。
你可以在示例中看到这两个查询。 第一个 API 调用使用 _infer 端点,结果是一个密集矢量。
1. POST _ml/trained_models/sentence-transformers__clip-vit-b-32-multilingual-v1/_infer
2. {
3. "docs" : [
4. {"text_field": "Yellow mountain is the most beautiful mountain in China"}
5. ]
6. }
上面的响应如下:
在第二个任务中,搜索查询,我们将使用密集矢量并获得按分数排序的图像。
`
1. GET my-image-embeddings/_search
2. {
3. "fields": [
4. "image_id",
5. "image_name",
6. "relative_path"
7. ],
8. "_source": false,
9. "knn": {
10. "field": "image_embedding",
11. "k": 5,
12. "num_candidates": 10,
13. "query_vector": [
14. 0.03395160660147667,
15. 0.007704082876443863,
16. 0.14996188879013062,
17. -0.10693030804395676,
18. ...
19. 0.05140634626150131,
20. 0.07114913314580917
21. ]
22. }
23. }
`
要启动并运行 Flask 应用程序,请导航到存储库的根文件夹并配置 .env 文件。 配置文件中的值用于连接到 Elasticsearch 集群。 你需要为以下变量插入值。 这些与图像嵌入生成中使用的值相同。
.env
1. ES_HOST='URL:PORT'
2. ES_USER='elastic'
3. ES_PWD='password'
为了能够使得我们自构建的 Elasticsearch 集群能够被正确地访问,我们必须把 Elasticsearch 的根证书拷贝到 Flask 应用的相应目录中:
flask-elastic-image-search/app/conf/ca.crt
1. (.venv) $ pwd
2. /Users/liuxg/python/flask-elastic-image-search/app/conf
3. (.venv) $ cp ~/elastic/elasticsearch-8.6.1/config/certs/http_ca.crt ca.crt
4. overwrite ca.crt? (y/n [n]) y
在上面,我们替换了仓库中原有的证书文件 ca.crt。
准备就绪后,运行主文件夹中的 flask 应用程序并等待它启动。
1. # In the main directory
2. $ flask run --port=5001
如果应用程序启动,你将看到类似于下面的输出,它在末尾指示你需要访问哪个 URL 才能访问该应用程序。
恭喜! 你的应用程序现在应该已启动并正在运行,并且可以通过互联网浏览器在 http://127.0.0.1:5001 上访问。
导航到图像搜索选项卡并输入描述你最佳图像的文本。 尝试使用非关键字或描述性文字。
在下面的示例中,输入的文本是 “Yellow mountain is the most beautiful mountain in China”。 结果显示在我们的数据集中。 如果用户喜欢结果集中的一张特定图像,只需单击它旁边的按钮,就会显示类似的图像。 用户可以无限次地这样做,并通过图像数据集构建自己的路径。
我们尝试另外的一个例子。这次我们输入:I love beautiful girls。
搜索也可以通过简单地上传图像来进行。 该应用程序会将图像转换为矢量并在数据集中搜索相似的图像。 为此,导航到第三个选项卡 “Similar Image”,从磁盘上传图像,然后点击 “Search”。
我们可以看到相似的图片。我们尝试使用一个女孩的照片再试试:
因为我们在 Elasticsearch 中使用的 NLP(sentence-transformers/clip-ViT-B-32-multilingual-v1)模型是多语言的,支持多语言推理,所以尽量搜索自己语言的图片。 然后也使用英文文本验证结果。我们尝试使用 “黄山是中国最漂亮的山”:
请务必注意,使用的模型是通用模型,这些模型非常准确,但你获得的结果会因用例或其他因素而异。 如果你需要更高的精度,则必须采用通用模型或开发自己的模型 —— CLIP 模型只是一个起点。
代码摘要
你可以在 GitHub 存储库中找到完整的代码。 你可能正在检查 routes.py 中的代码,它实现了应用程序的主要逻辑。 除了明显的路线定义之外,你还应该关注定义 _infer 和 _search 端点(infer_trained_model 和 knn_search_images)的方法。 生成图像嵌入的代码位于 create-image-embeddings.py文件中。
总结
现在你已经设置了 Flask 应用程序,你可以轻松地搜索你自己的图像集! Elastic 在平台内提供了矢量搜索的原生集成,避免了与外部进程的通信。 你可以灵活地开发和使用你可能使用 PyTorch 开发的自定义嵌入模型。
语义图像搜索具有其他传统图像搜索方法的以下优点:
- 更高的准确度:向量相似性捕获上下文和关联,而不依赖于图像的文本元描述。
- 增强的用户体验:与猜测哪些关键字可能相关相比,描述你正在寻找的内容或提供示例图像。
- 图像数据库的分类:不用担心对图像进行分类——相似性搜索可以在一堆图像中找到相关图像,而无需对它们进行组织。
如果你的用例更多地依赖于文本数据,你可以在以前的博客中了解更多关于实现语义搜索和将自然语言处理应用于文本的信息。 对于文本数据,向量相似度与传统关键词评分的结合呈现了两全其美的效果。
准备好开始了吗? 在我们的虚拟活动中心报名参加矢量搜索实践研讨会,并在我们的在线论坛中与社区互动。
收起阅读 »
Web Scraper + Elasticsearch + Kibana + SearchKit 打造的豆瓣电影top250 搜索演示系统
Web Scraper + Elasticsearch + Kibana + SearchKit 打造的豆瓣电影top250 搜索演示系统
作者:小森同学
声明:电影数据来源于“豆瓣电影”,如有侵权,请联系删除
Web Scraper
{
"_id": "top250",
"startUrl": ["https://movie.douban.com/top250?start=[0-225:25]&filter="],
"selectors": [{
"id": "container",
"multiple": true,
"parentSelectors": ["_root"],
"selector": ".grid_view li",
"type": "SelectorElement"
}, {
"id": "name",
"multiple": false,
"parentSelectors": ["container"],
"regex": "",
"selector": "span.title:nth-of-type(1)",
"type": "SelectorText"
}, {
"id": "number",
"multiple": false,
"parentSelectors": ["container"],
"regex": "",
"selector": "em",
"type": "SelectorText"
}, {
"id": "score",
"multiple": false,
"parentSelectors": ["container"],
"regex": "",
"selector": "span.rating_num",
"type": "SelectorText"
}, {
"id": "review",
"multiple": false,
"parentSelectors": ["container"],
"regex": "",
"selector": "span.inq",
"type": "SelectorText"
}, {
"id": "year",
"multiple": false,
"parentSelectors": ["container"],
"regex": "\\d{4}",
"selector": "p:nth-of-type(1)",
"type": "SelectorText"
}, {
"id": "tour_guide",
"multiple": false,
"parentSelectors": ["container"],
"regex": "^导演: \\S*",
"selector": "p:nth-of-type(1)",
"type": "SelectorText"
}, {
"id": "type",
"multiple": false,
"parentSelectors": ["container"],
"regex": "[^/]+$",
"selector": "p:nth-of-type(1)",
"type": "SelectorText"
}, {
"id": "area",
"multiple": false,
"parentSelectors": ["container"],
"regex": "[^\\/]+(?=\\/[^\\/]*$)",
"selector": "p:nth-of-type(1)",
"type": "SelectorText"
}, {
"id": "detail_link",
"multiple": false,
"parentSelectors": ["container"],
"selector": ".hd a",
"type": "SelectorLink"
}, {
"id": "director",
"multiple": false,
"parentSelectors": ["detail_link"],
"regex": "",
"selector": "span:nth-of-type(1) .attrs a",
"type": "SelectorText"
}, {
"id": "screenwriter",
"multiple": false,
"parentSelectors": ["detail_link"],
"regex": "(?<=编剧: )[\\u4e00-\\u9fa5A-Za-z0-9/()\\·\\s]+(?=主演)",
"selector": "div#info",
"type": "SelectorText"
}, {
"id": "film_length",
"multiple": false,
"parentSelectors": ["detail_link"],
"regex": "\\d+",
"selector": "span[property='v:runtime']",
"type": "SelectorText"
}, {
"id": "IMDb",
"multiple": false,
"parentSelectors": ["detail_link"],
"regex": "(?<=[IMDb:\\s+])\\S*(?=\\d*$)",
"selector": "div#info",
"type": "SelectorText"
}, {
"id": "language",
"multiple": false,
"parentSelectors": ["detail_link"],
"regex": "(?<=语言: )\\S+",
"selector": "div#info",
"type": "SelectorText"
}, {
"id": "alias",
"multiple": false,
"parentSelectors": ["detail_link"],
"regex": "(?<=又名: )[\\u4e00-\\u9fa5A-Za-z0-9/()\\s]+(?=IMDb)",
"selector": "div#info",
"type": "SelectorText"
}, {
"id": "pic",
"multiple": false,
"parentSelectors": ["container"],
"selector": "img",
"type": "SelectorImage"
}]
}
elasticsearch
{
"mappings": {
"properties": {
"IMDb": {
"type": "keyword",
"copy_to": [
"all"
]
},
"alias": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"copy_to": [
"all"
],
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"all": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"area": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"copy_to": [
"all"
],
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"director": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"copy_to": [
"all"
],
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"film_length": {
"type": "long"
},
"id": {
"type": "keyword"
},
"language": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"copy_to": [
"all"
],
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"link": {
"type": "keyword"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"copy_to": [
"all"
],
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"number": {
"type": "long"
},
"photo": {
"type": "keyword"
},
"review": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"copy_to": [
"all"
],
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"score": {
"type": "double"
},
"screenwriter": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"copy_to": [
"all"
],
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"type": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"copy_to": [
"all"
],
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"year": {
"type": "long"
}
}
}
}
kibana
需要使用pipeline对索引字段进行处理,如对type 通过空格进行分割为数组等,可以参照官方文档或其他博客。
制作仪表板省略, 请自行搜索
SearchKit
Web Scraper + Elasticsearch + Kibana + SearchKit 打造的豆瓣电影top250 搜索演示系统
作者:小森同学
声明:电影数据来源于“豆瓣电影”,如有侵权,请联系删除
Web Scraper
{
"_id": "top250",
"startUrl": ["https://movie.douban.com/top250?start=[0-225:25]&filter="],
"selectors": [{
"id": "container",
"multiple": true,
"parentSelectors": ["_root"],
"selector": ".grid_view li",
"type": "SelectorElement"
}, {
"id": "name",
"multiple": false,
"parentSelectors": ["container"],
"regex": "",
"selector": "span.title:nth-of-type(1)",
"type": "SelectorText"
}, {
"id": "number",
"multiple": false,
"parentSelectors": ["container"],
"regex": "",
"selector": "em",
"type": "SelectorText"
}, {
"id": "score",
"multiple": false,
"parentSelectors": ["container"],
"regex": "",
"selector": "span.rating_num",
"type": "SelectorText"
}, {
"id": "review",
"multiple": false,
"parentSelectors": ["container"],
"regex": "",
"selector": "span.inq",
"type": "SelectorText"
}, {
"id": "year",
"multiple": false,
"parentSelectors": ["container"],
"regex": "\\d{4}",
"selector": "p:nth-of-type(1)",
"type": "SelectorText"
}, {
"id": "tour_guide",
"multiple": false,
"parentSelectors": ["container"],
"regex": "^导演: \\S*",
"selector": "p:nth-of-type(1)",
"type": "SelectorText"
}, {
"id": "type",
"multiple": false,
"parentSelectors": ["container"],
"regex": "[^/]+$",
"selector": "p:nth-of-type(1)",
"type": "SelectorText"
}, {
"id": "area",
"multiple": false,
"parentSelectors": ["container"],
"regex": "[^\\/]+(?=\\/[^\\/]*$)",
"selector": "p:nth-of-type(1)",
"type": "SelectorText"
}, {
"id": "detail_link",
"multiple": false,
"parentSelectors": ["container"],
"selector": ".hd a",
"type": "SelectorLink"
}, {
"id": "director",
"multiple": false,
"parentSelectors": ["detail_link"],
"regex": "",
"selector": "span:nth-of-type(1) .attrs a",
"type": "SelectorText"
}, {
"id": "screenwriter",
"multiple": false,
"parentSelectors": ["detail_link"],
"regex": "(?<=编剧: )[\\u4e00-\\u9fa5A-Za-z0-9/()\\·\\s]+(?=主演)",
"selector": "div#info",
"type": "SelectorText"
}, {
"id": "film_length",
"multiple": false,
"parentSelectors": ["detail_link"],
"regex": "\\d+",
"selector": "span[property='v:runtime']",
"type": "SelectorText"
}, {
"id": "IMDb",
"multiple": false,
"parentSelectors": ["detail_link"],
"regex": "(?<=[IMDb:\\s+])\\S*(?=\\d*$)",
"selector": "div#info",
"type": "SelectorText"
}, {
"id": "language",
"multiple": false,
"parentSelectors": ["detail_link"],
"regex": "(?<=语言: )\\S+",
"selector": "div#info",
"type": "SelectorText"
}, {
"id": "alias",
"multiple": false,
"parentSelectors": ["detail_link"],
"regex": "(?<=又名: )[\\u4e00-\\u9fa5A-Za-z0-9/()\\s]+(?=IMDb)",
"selector": "div#info",
"type": "SelectorText"
}, {
"id": "pic",
"multiple": false,
"parentSelectors": ["container"],
"selector": "img",
"type": "SelectorImage"
}]
}
elasticsearch
{
"mappings": {
"properties": {
"IMDb": {
"type": "keyword",
"copy_to": [
"all"
]
},
"alias": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"copy_to": [
"all"
],
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"all": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"area": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"copy_to": [
"all"
],
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"director": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"copy_to": [
"all"
],
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"film_length": {
"type": "long"
},
"id": {
"type": "keyword"
},
"language": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"copy_to": [
"all"
],
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"link": {
"type": "keyword"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"copy_to": [
"all"
],
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"number": {
"type": "long"
},
"photo": {
"type": "keyword"
},
"review": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"copy_to": [
"all"
],
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"score": {
"type": "double"
},
"screenwriter": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"copy_to": [
"all"
],
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"type": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"copy_to": [
"all"
],
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"year": {
"type": "long"
}
}
}
}
kibana
需要使用pipeline对索引字段进行处理,如对type 通过空格进行分割为数组等,可以参照官方文档或其他博客。
制作仪表板省略, 请自行搜索
SearchKit
参考 https://github.com/searchkit/searchkit-starter-app
收起阅读 »
如何正确处理 Elasticsearch 摄取管道故障

Elasticsearch:使用 pipelines 路由文档到想要的 Elasticsearch 索引中去
原文地址 elasticstack.blog.csdn.net
路由文件
当应用程序需要向 Elasticsearch 添加文档时,它们首先要知道目标索引是什么。在很多的应用案例中,特别是针对时序数据,我们想把每个月的数据写入到一个特定的索引中。一方面便于管理索引,另外一方面在将来搜索的时候可以按照每个月的索引来进行搜索,这样速度更快,更便捷。
当你处于某种类型的文档总是转到特定索引的琐碎情况时,这似乎很明显,但当你的索引名称可能根据杂项参数(无论它们是否在你的系统外部 - 当前例如日期 - 或者你尝试存储的文档的固有属性 - 大多数时候是文档字段之一的值)。
当发生最后一种情况时(我们指的是索引名称可以变化的情况),在向 Elasticsearch 发出索引命令之前,你的应用程序需要计算目标索引的名称。
此外 —— 即使一开始这看起来像是一种反模式 —— 你可以有多个应用程序需要在索引中索引相同类型的文档,这些文档的名称可能会发生变化。 现在你必须维护跨多个组件重复的索引名称计算逻辑:就可维护性和敏捷性而言,这不是好消息。
Logstash —— Elastic Stack 的一个知名成员 —— 可以帮助集中这样的逻辑,但代价是维护另一个正在运行的软件,这需要配置、知识等。
我们想要在本文中展示的是通过将索引名称计算委托给 Elasticsearch 而不是我们的应用程序来解决此问题的解决方案。
Date index name processor 介绍
该处理器的目的是通过使用日期数学索引名称支持,根据文档中的日期或时间戳字段将文档指向基于正确时间的索引。
处理器根据提供的索引名称前缀、正在处理的文档中的日期或时间戳字段以及提供的日期舍入,使用日期数学索引名称表达式设置 _index 元数据字段。
首先,此处理器从正在处理的文档中的字段中获取日期或时间戳。 或者,可以根据字段值应如何解析为日期来配置日期格式。 然后这个日期、提供的索引名称前缀和提供的日期舍入被格式化为日期数学索引名称表达式。 此处还可以选择日期格式指定日期应如何格式化为日期数学索引名称表达式。
将文档指向每月索引的示例管道,该索引以基于 date1 字段中的日期的 my-index-prefix 开头:
1. PUT _ingest/pipeline/monthlyindex
2. {
3. "description": "monthly date-time index naming",
4. "processors" : [
5. {
6. "date_index_name" : {
7. "field" : "date1",
8. "index_name_prefix" : "my-",
9. "date_rounding" : "M"
10. }
11. }
12. ]
13. }
使用该管道进行索引请求:
1. PUT /my-index/_doc/1?pipeline=monthlyindex
2. {
3. "date1" : "2016-04-25T12:02:01.789Z"
4. }
上面命令运行的结果是:
1. {
2. "_index": "my-index-2016-04-01",
3. "_id": "1",
4. "_version": 1,
5. "result": "created",
6. "_shards": {
7. "total": 2,
8. "successful": 1,
9. "failed": 0
10. },
11. "_seq_no": 0,
12. "_primary_term": 1
13. }
上面的请求不会将这个文档索引到 my-index 索引中,而是索引到 my-index-2016-04-01 索引中,因为它是按月取整的。 这是因为 date-index-name-processor 覆盖了文档的 _index 属性。
要查看导致上述文档被索引到 my-index-2016-04-01 中的实际索引请求中提供的索引的日期数学(date-math)值,我们可以使用模拟请求检查处理器的效果。
1. POST _ingest/pipeline/_simulate
2. {
3. "pipeline" :
4. {
5. "description": "monthly date-time index naming",
6. "processors" : [
7. {
8. "date_index_name" : {
9. "field" : "date1",
10. "index_name_prefix" : "my-",
11. "date_rounding" : "M"
12. }
13. }
14. ]
15. },
16. "docs": [
17. {
18. "_source": {
19. "date1": "2016-04-25T12:02:01.789Z"
20. }
21. }
22. ]
23. }
上面命令返回结果:
`
1. {
2. "docs": [
3. {
4. "doc": {
5. "_index": "<my-index-{2016-04-25||/M{yyyy-MM-dd|UTC}}>",
6. "_id": "_id",
7. "_version": "-3",
8. "_source": {
9. "date1": "2016-04-25T12:02:01.789Z"
10. },
11. "_ingest": {
12. "timestamp": "2023-02-23T01:15:52.214364Z"
13. }
14. }
15. }
16. ]
17. }
`
以上示例显示 _index 设置为 <my-index-{2016-04-25||/M{yyyy-MM-dd|UTC}}>。 Elasticsearch 将其理解为 2016-04-01,如日期数学索引名称文档中所述。
日期索引名称选项
以下是使用 date index name 的一些选项
no | - | 处理器的描述。 用于描述处理器的用途或其配置。 | |
if | no | - | 有条件地执行处理器。 请参阅有条件地运行处理器。 |
ignore_failure | no | false | 忽略处理器的故障。 请参阅处理管道故障。 |
on_failure | no | - | 处理处理器的故障。 请参阅处理管道故障。 |
tag | no | - | 处理器的标识符。 用于调试和指标 |
使用案例 - 基于时间的时序索引
这是一个众所周知的用例,通常在您要处理日志时发现。这个想法是索引文档,索引的名称由根名称和根据日志事件的日期计算的值组成。 该日期实际上是你要索引的文档的一个字段。
这不是本文的重点,但以这种方式索引文档有几个优点,包括更容易的数据管理、启用冷/暖架构等。让我们举个例子。
假设我们必须处理来自多个来源的数据 —— 例如物联网。 我们的每个对象每分钟都会向不同的后端发送一些数据(是的,这真的很可悲,但我们的对象不通过相同的网络进行通信,因此选择通过多个系统来处理这个问题)。
对象发送的数据被转换成如下所示的 JSON 文档:
1. POST data/_doc?pipeline=compute-index-name
2. {
3. "objectId": 1234,
4. "manufacturer": "SHIELD",
5. "payload": "some_data",
6. "date": "2019-04-01T12:10:30.000Z"
7. }
我们有一个用于传输数据的对象的 UID、一个制造商 ID、一个有效负载部分和一个日期字段。
索引名称计算
假设我们要将对象的数据存储在名为 data-{YYYYMMDD} 的索引中,其中根名称是数据后跟日期模式。基于上面的例子,后端收到这个文档应该怎么办呢?
首先它必须解析它以提取日期字段的值,然后它必须根据它在文档中找到的日期计算目标索引名称。 最后,它向 Elasticsearch 向刚刚计算出名称的索引发出索引请求。
1. document.date = "2019-04-01T12:10:30Z"
2. index.name = "" + "20190401"
在我们的例子中,我们有几个后端必须知道如何计算索引名称,因此必须知道索引的命名逻辑。
如果索引名的计算直接由 Elasticsearch 进行,岂不是更聪明?
Ingest pipeline 的力量
从 Elasticsearch 的第 5 版开始,我们现在有了一种称为摄取的节点。默认情况下,集群的所有节点都具有 ingest 类型。这些节点有权在索引文档之前执行所谓的管道。 管道是一组处理器,每个处理器都可以以某种特定方式转换输入文档。当一个文档被摄入到 Elasticsearch 集群时,它的工作流是这样的。
从上面,我们可以看出来,在文档被写入之前,它必须经过 ingest node 进行处理。我们可以通过 date index name processor 来获得我们想要的 index 名称,进而写入到我们想要的索引中去。 这里有用的是,管道不仅可以转换文档的固有数据,还可以修改文档元数据,特别是它的 _index 属性。
现在让我们回到我们的例子。我们建议定义一个管道来完成这项工作,而不是将索引名称计算委托给应用程序。根据文档,此处理器允许你定义包含日期的字段名称、索引的根名称(前缀)以及计算附加到此前缀的日期的舍入方法。
如果我们想将 IoT 数据添加到模式为 data-{YYYYMMDD} 的索引中,我们只需创建如下所示的管道:
1. PUT _ingest/pipeline/compute-index-name
2. {
3. "description": "Set the document destination index by appending a prefix and its 'date' field",
4. "processors": [
5. {
6. "date_index_name": {
7. "field": "date",
8. "index_name_prefix": "",
9. "date_rounding": "d",
10. "index_name_format": "yyyyMMdd"
11. }
12. }
13. ]
14. }
一个索引 = 一个管道?
好的,现在我们知道如何定义一个管道来为特定的目标索引建立一个名称。 但是我们可以通过操纵文档元数据来做更多的事情!
假设我们有不同类型的文档,每个文档都有一个日期字段,但需要在不同的索引中进行索引。计算目标索引名称的逻辑对于每种文档类型都是相同的,但使用上述策略将导致创建多个管道。
让我们试着做一些更简单和可重用的东西。
回到我们的示例,我们现在有两种文档类型:一种需要在 adata-{YYYYMMDD} 索引(和以前一样)中建立索引,另一种其目的地是名为 new_data-{YYYYMMDD} 的索引。
目标为 new_data 的文档具有以下结构:
1. {
2. "newObjectId": 1234,
3. "source": "HYDRA",
4. "payload": "some_data",
5. "date": "2019-04-02T13:10:30.000Z"
6. }
该结构与标准 IoT 文档略有不同,但重要的是日期字段存在于两个映射中。
现在我们要定义一个管道来计算我们两种文档类型的目标索引。 我们所要做的就是通过分析通过索引 API 发出的请求目的地来构建目的地索引名称。
1. PUT _ingest/pipeline/compute-index-name
2. {
3. "description": "Set the document destination index by appending the requested index and its 'date' field",
4. "processors": [
5. {
6. "date_index_name": {
7. "field": "date",
8. "index_name_prefix": "{{ _index }}-",
9. "date_rounding": "d",
10. "index_name_format": "yyyyMMdd"
11. }
12. }
13. ]
14. }
请注意,索引名称前缀现在位于名为_index 的索引元数据字段中。 通过使用这个字段,我们的管道现在是通用的并且可以与任何索引一起使用 —— 假设目标索引是根据相同的规则计算的。
使用我们的 “路由” 管道
现在我们有了一个能够根据文档的日期字段计算目标索引名称的通用管道,让我们看看如何让 Elasticsearch 使用它。
我们可以通过两种方式告诉 Elasticsearch 使用管道,让我们评估一下。
Index API 调用
第一个 —— 也是直接的解决方案——是使用 Index API 的管道参数。
换句话说:每次你想索引一个文档,你必须告诉 Elasticsearch 要使用的管道。
1. POST data/_doc?pipeline=compute-index-name
2. {
3. "objectId": 1234,
4. "manufacturer": "SHIELD",
5. "payload": "some_data",
6. "date": "2019-04-01T12:10:30.000Z"
7. }
现在,每次我们通过指示 compute-index-name 管道将文档添加到索引中时,该文档将被添加到正确的基于时间的索引中。 在此示例中,目标索引将为 data-20190401 。
我们提供给 Index API 的 data 索引名称呢? 它可以被看作是一个假索引:它只是用来执行 API 调用并且是真正目标索引的根,它不一定存在!
默认管道:引入 “虚拟索引”
索引默认管道(default pipeline)是使用管道的另一种有用方式:当你创建索引时,有一个名为 index.default_pipeline 的设置可以设置为管道的名称,只要你将文档添加到相应的索引就会执行该管道并且没有管道被添加到 API 调用中。 你还可以在索引文档时使用特殊管道名称 _none 来绕过此默认索引。 通过使用此功能,你可以定义我称之为 “虚拟索引” 的内容,并将其与默认管道相关联,该默认管道将充当我们上面看到的路由管道。
让我们将其应用到我们的示例中。
我们假设我们的通用路由管道 compute-index-name 已经创建。 我们现在可以创建一个名为 data 的索引,它将使用此管道作为其默认管道。
1. PUT data
2. {
3. "settings" : {
4. "index" : {
5. "number_of_shards" : 3,
6. "number_of_replicas" : 1,
7. "default_pipeline" : "compute-index-name"
8. }
9. }
10. }
现在,每次我们要求 Elasticsearch 为数据索引中的文档编制索引时,计算索引名称管道将负责该文档的实际路由。 因此,数据索引中永远不会有单个文档被索引,但我们将调用管道的责任完全委托给 Elasticsearch。
运行完上面的命令后,我们来尝试写入一个文档:
1. POST data/_doc
2. {
3. "objectId": 1234,
4. "manufacturer": "SHIELD",
5. "payload": "some_data",
6. "date": "2019-04-01T12:10:30.000Z"
7. }
上面的命令返回的结果是:
1. {
2. "_index": "data-20190401",
3. "_id": "2DMGfIYBaog4blQ55Qr7",
4. "_version": 1,
5. "result": "created",
6. "_shards": {
7. "total": 2,
8. "successful": 1,
9. "failed": 0
10. },
11. "_seq_no": 1,
12. "_primary_term": 1
13. }
结论
我们刚刚在这里看到了如何利用 Elasticsearch 中的管道功能根据文档的固有属性来路由文档。Ingest pipeline 不仅仅可以替代 Logstash 过滤器:你可以定义复杂的管道,使用多个处理器(一个特定的处理器甚至允许你调用另一个管道)、条件等。有关 ingest pipeline 的更多文章,请参阅 “Elastic:开发者上手指南” 文章中的 “Ingest pipeline” 章节。
在我看来,本文末尾看到的 “虚拟索引” 非常有趣。 包含创建这样一个并非真正的索引的索引只是为了创建路由管道的入口点的功能甚至可以成为 Elasticsearch 的一个新的和有用的功能,为什么不呢?
原文地址 elasticstack.blog.csdn.net
路由文件
当应用程序需要向 Elasticsearch 添加文档时,它们首先要知道目标索引是什么。在很多的应用案例中,特别是针对时序数据,我们想把每个月的数据写入到一个特定的索引中。一方面便于管理索引,另外一方面在将来搜索的时候可以按照每个月的索引来进行搜索,这样速度更快,更便捷。
当你处于某种类型的文档总是转到特定索引的琐碎情况时,这似乎很明显,但当你的索引名称可能根据杂项参数(无论它们是否在你的系统外部 - 当前例如日期 - 或者你尝试存储的文档的固有属性 - 大多数时候是文档字段之一的值)。
当发生最后一种情况时(我们指的是索引名称可以变化的情况),在向 Elasticsearch 发出索引命令之前,你的应用程序需要计算目标索引的名称。
此外 —— 即使一开始这看起来像是一种反模式 —— 你可以有多个应用程序需要在索引中索引相同类型的文档,这些文档的名称可能会发生变化。 现在你必须维护跨多个组件重复的索引名称计算逻辑:就可维护性和敏捷性而言,这不是好消息。
Logstash —— Elastic Stack 的一个知名成员 —— 可以帮助集中这样的逻辑,但代价是维护另一个正在运行的软件,这需要配置、知识等。
我们想要在本文中展示的是通过将索引名称计算委托给 Elasticsearch 而不是我们的应用程序来解决此问题的解决方案。
Date index name processor 介绍
该处理器的目的是通过使用日期数学索引名称支持,根据文档中的日期或时间戳字段将文档指向基于正确时间的索引。
处理器根据提供的索引名称前缀、正在处理的文档中的日期或时间戳字段以及提供的日期舍入,使用日期数学索引名称表达式设置 _index 元数据字段。
首先,此处理器从正在处理的文档中的字段中获取日期或时间戳。 或者,可以根据字段值应如何解析为日期来配置日期格式。 然后这个日期、提供的索引名称前缀和提供的日期舍入被格式化为日期数学索引名称表达式。 此处还可以选择日期格式指定日期应如何格式化为日期数学索引名称表达式。
将文档指向每月索引的示例管道,该索引以基于 date1 字段中的日期的 my-index-prefix 开头:
1. PUT _ingest/pipeline/monthlyindex
2. {
3. "description": "monthly date-time index naming",
4. "processors" : [
5. {
6. "date_index_name" : {
7. "field" : "date1",
8. "index_name_prefix" : "my-",
9. "date_rounding" : "M"
10. }
11. }
12. ]
13. }
使用该管道进行索引请求:
1. PUT /my-index/_doc/1?pipeline=monthlyindex
2. {
3. "date1" : "2016-04-25T12:02:01.789Z"
4. }
上面命令运行的结果是:
1. {
2. "_index": "my-index-2016-04-01",
3. "_id": "1",
4. "_version": 1,
5. "result": "created",
6. "_shards": {
7. "total": 2,
8. "successful": 1,
9. "failed": 0
10. },
11. "_seq_no": 0,
12. "_primary_term": 1
13. }
上面的请求不会将这个文档索引到 my-index 索引中,而是索引到 my-index-2016-04-01 索引中,因为它是按月取整的。 这是因为 date-index-name-processor 覆盖了文档的 _index 属性。
要查看导致上述文档被索引到 my-index-2016-04-01 中的实际索引请求中提供的索引的日期数学(date-math)值,我们可以使用模拟请求检查处理器的效果。
1. POST _ingest/pipeline/_simulate
2. {
3. "pipeline" :
4. {
5. "description": "monthly date-time index naming",
6. "processors" : [
7. {
8. "date_index_name" : {
9. "field" : "date1",
10. "index_name_prefix" : "my-",
11. "date_rounding" : "M"
12. }
13. }
14. ]
15. },
16. "docs": [
17. {
18. "_source": {
19. "date1": "2016-04-25T12:02:01.789Z"
20. }
21. }
22. ]
23. }
上面命令返回结果:
`
1. {
2. "docs": [
3. {
4. "doc": {
5. "_index": "<my-index-{2016-04-25||/M{yyyy-MM-dd|UTC}}>",
6. "_id": "_id",
7. "_version": "-3",
8. "_source": {
9. "date1": "2016-04-25T12:02:01.789Z"
10. },
11. "_ingest": {
12. "timestamp": "2023-02-23T01:15:52.214364Z"
13. }
14. }
15. }
16. ]
17. }
`
以上示例显示 _index 设置为 <my-index-{2016-04-25||/M{yyyy-MM-dd|UTC}}>。 Elasticsearch 将其理解为 2016-04-01,如日期数学索引名称文档中所述。
日期索引名称选项
以下是使用 date index name 的一些选项
no | - | 处理器的描述。 用于描述处理器的用途或其配置。 | |
if | no | - | 有条件地执行处理器。 请参阅有条件地运行处理器。 |
ignore_failure | no | false | 忽略处理器的故障。 请参阅处理管道故障。 |
on_failure | no | - | 处理处理器的故障。 请参阅处理管道故障。 |
tag | no | - | 处理器的标识符。 用于调试和指标 |
使用案例 - 基于时间的时序索引
这是一个众所周知的用例,通常在您要处理日志时发现。这个想法是索引文档,索引的名称由根名称和根据日志事件的日期计算的值组成。 该日期实际上是你要索引的文档的一个字段。
这不是本文的重点,但以这种方式索引文档有几个优点,包括更容易的数据管理、启用冷/暖架构等。让我们举个例子。
假设我们必须处理来自多个来源的数据 —— 例如物联网。 我们的每个对象每分钟都会向不同的后端发送一些数据(是的,这真的很可悲,但我们的对象不通过相同的网络进行通信,因此选择通过多个系统来处理这个问题)。
对象发送的数据被转换成如下所示的 JSON 文档:
1. POST data/_doc?pipeline=compute-index-name
2. {
3. "objectId": 1234,
4. "manufacturer": "SHIELD",
5. "payload": "some_data",
6. "date": "2019-04-01T12:10:30.000Z"
7. }
我们有一个用于传输数据的对象的 UID、一个制造商 ID、一个有效负载部分和一个日期字段。
索引名称计算
假设我们要将对象的数据存储在名为 data-{YYYYMMDD} 的索引中,其中根名称是数据后跟日期模式。基于上面的例子,后端收到这个文档应该怎么办呢?
首先它必须解析它以提取日期字段的值,然后它必须根据它在文档中找到的日期计算目标索引名称。 最后,它向 Elasticsearch 向刚刚计算出名称的索引发出索引请求。
1. document.date = "2019-04-01T12:10:30Z"
2. index.name = "" + "20190401"
在我们的例子中,我们有几个后端必须知道如何计算索引名称,因此必须知道索引的命名逻辑。
如果索引名的计算直接由 Elasticsearch 进行,岂不是更聪明?
Ingest pipeline 的力量
从 Elasticsearch 的第 5 版开始,我们现在有了一种称为摄取的节点。默认情况下,集群的所有节点都具有 ingest 类型。这些节点有权在索引文档之前执行所谓的管道。 管道是一组处理器,每个处理器都可以以某种特定方式转换输入文档。当一个文档被摄入到 Elasticsearch 集群时,它的工作流是这样的。
从上面,我们可以看出来,在文档被写入之前,它必须经过 ingest node 进行处理。我们可以通过 date index name processor 来获得我们想要的 index 名称,进而写入到我们想要的索引中去。 这里有用的是,管道不仅可以转换文档的固有数据,还可以修改文档元数据,特别是它的 _index 属性。
现在让我们回到我们的例子。我们建议定义一个管道来完成这项工作,而不是将索引名称计算委托给应用程序。根据文档,此处理器允许你定义包含日期的字段名称、索引的根名称(前缀)以及计算附加到此前缀的日期的舍入方法。
如果我们想将 IoT 数据添加到模式为 data-{YYYYMMDD} 的索引中,我们只需创建如下所示的管道:
1. PUT _ingest/pipeline/compute-index-name
2. {
3. "description": "Set the document destination index by appending a prefix and its 'date' field",
4. "processors": [
5. {
6. "date_index_name": {
7. "field": "date",
8. "index_name_prefix": "",
9. "date_rounding": "d",
10. "index_name_format": "yyyyMMdd"
11. }
12. }
13. ]
14. }
一个索引 = 一个管道?
好的,现在我们知道如何定义一个管道来为特定的目标索引建立一个名称。 但是我们可以通过操纵文档元数据来做更多的事情!
假设我们有不同类型的文档,每个文档都有一个日期字段,但需要在不同的索引中进行索引。计算目标索引名称的逻辑对于每种文档类型都是相同的,但使用上述策略将导致创建多个管道。
让我们试着做一些更简单和可重用的东西。
回到我们的示例,我们现在有两种文档类型:一种需要在 adata-{YYYYMMDD} 索引(和以前一样)中建立索引,另一种其目的地是名为 new_data-{YYYYMMDD} 的索引。
目标为 new_data 的文档具有以下结构:
1. {
2. "newObjectId": 1234,
3. "source": "HYDRA",
4. "payload": "some_data",
5. "date": "2019-04-02T13:10:30.000Z"
6. }
该结构与标准 IoT 文档略有不同,但重要的是日期字段存在于两个映射中。
现在我们要定义一个管道来计算我们两种文档类型的目标索引。 我们所要做的就是通过分析通过索引 API 发出的请求目的地来构建目的地索引名称。
1. PUT _ingest/pipeline/compute-index-name
2. {
3. "description": "Set the document destination index by appending the requested index and its 'date' field",
4. "processors": [
5. {
6. "date_index_name": {
7. "field": "date",
8. "index_name_prefix": "{{ _index }}-",
9. "date_rounding": "d",
10. "index_name_format": "yyyyMMdd"
11. }
12. }
13. ]
14. }
请注意,索引名称前缀现在位于名为_index 的索引元数据字段中。 通过使用这个字段,我们的管道现在是通用的并且可以与任何索引一起使用 —— 假设目标索引是根据相同的规则计算的。
使用我们的 “路由” 管道
现在我们有了一个能够根据文档的日期字段计算目标索引名称的通用管道,让我们看看如何让 Elasticsearch 使用它。
我们可以通过两种方式告诉 Elasticsearch 使用管道,让我们评估一下。
Index API 调用
第一个 —— 也是直接的解决方案——是使用 Index API 的管道参数。
换句话说:每次你想索引一个文档,你必须告诉 Elasticsearch 要使用的管道。
1. POST data/_doc?pipeline=compute-index-name
2. {
3. "objectId": 1234,
4. "manufacturer": "SHIELD",
5. "payload": "some_data",
6. "date": "2019-04-01T12:10:30.000Z"
7. }
现在,每次我们通过指示 compute-index-name 管道将文档添加到索引中时,该文档将被添加到正确的基于时间的索引中。 在此示例中,目标索引将为 data-20190401 。
我们提供给 Index API 的 data 索引名称呢? 它可以被看作是一个假索引:它只是用来执行 API 调用并且是真正目标索引的根,它不一定存在!
默认管道:引入 “虚拟索引”
索引默认管道(default pipeline)是使用管道的另一种有用方式:当你创建索引时,有一个名为 index.default_pipeline 的设置可以设置为管道的名称,只要你将文档添加到相应的索引就会执行该管道并且没有管道被添加到 API 调用中。 你还可以在索引文档时使用特殊管道名称 _none 来绕过此默认索引。 通过使用此功能,你可以定义我称之为 “虚拟索引” 的内容,并将其与默认管道相关联,该默认管道将充当我们上面看到的路由管道。
让我们将其应用到我们的示例中。
我们假设我们的通用路由管道 compute-index-name 已经创建。 我们现在可以创建一个名为 data 的索引,它将使用此管道作为其默认管道。
1. PUT data
2. {
3. "settings" : {
4. "index" : {
5. "number_of_shards" : 3,
6. "number_of_replicas" : 1,
7. "default_pipeline" : "compute-index-name"
8. }
9. }
10. }
现在,每次我们要求 Elasticsearch 为数据索引中的文档编制索引时,计算索引名称管道将负责该文档的实际路由。 因此,数据索引中永远不会有单个文档被索引,但我们将调用管道的责任完全委托给 Elasticsearch。
运行完上面的命令后,我们来尝试写入一个文档:
1. POST data/_doc
2. {
3. "objectId": 1234,
4. "manufacturer": "SHIELD",
5. "payload": "some_data",
6. "date": "2019-04-01T12:10:30.000Z"
7. }
上面的命令返回的结果是:
1. {
2. "_index": "data-20190401",
3. "_id": "2DMGfIYBaog4blQ55Qr7",
4. "_version": 1,
5. "result": "created",
6. "_shards": {
7. "total": 2,
8. "successful": 1,
9. "failed": 0
10. },
11. "_seq_no": 1,
12. "_primary_term": 1
13. }
结论
我们刚刚在这里看到了如何利用 Elasticsearch 中的管道功能根据文档的固有属性来路由文档。Ingest pipeline 不仅仅可以替代 Logstash 过滤器:你可以定义复杂的管道,使用多个处理器(一个特定的处理器甚至允许你调用另一个管道)、条件等。有关 ingest pipeline 的更多文章,请参阅 “Elastic:开发者上手指南” 文章中的 “Ingest pipeline” 章节。
在我看来,本文末尾看到的 “虚拟索引” 非常有趣。 包含创建这样一个并非真正的索引的索引只是为了创建路由管道的入口点的功能甚至可以成为 Elasticsearch 的一个新的和有用的功能,为什么不呢?
收起阅读 »
Elasticsearch:如何在 Elasticsearch 中正确使用同义词功能
原文地址 elasticstack.blog.csdn.net
同义词用于提高搜索质量并扩大匹配范围。 例如,搜索 England 的用户可能希望找到包含 British 或 UK 的文档,尽管这三个词完全不同。
Elasticsearch 中的同义词功能非常强大,如果实施得当,可以使你的搜索引擎更加健壮和强大。 在本文中,我们将通过简单的代码片段介绍在实践中实现同义词功能的要点。 特别是,我们将介绍如何更新现有索引的同义词,这是一个相对高级的话题。
在今天的展示中,我将使用最新的 Elastic Stack 8.6.0,尽管版本不对我们的展示有任何的影响。
准备
我们将使用 Docker 在本地启动一个 Elasticsearch 服务器,并使用 Kibana 来管理索引和运行命令。 如果你以前从未使用过 Elasticsearch 或想快速复习一下,这篇文章可能会对你有所帮助。 如果你在 Docker 中运行 Elasticsearch 时遇到问题,这篇文章很可能会帮助你解决问题。在今天的文章中,我们尝试使用 docker 来部署一个没有安全功能的 Elasticsearch 集群。
准备就绪后,让我们开始探索 Elasticsearch 中的同义词功能的旅程。
我们将在本文中使用的 docker-compose.yaml 文件包含以下内容,稍后我们将向其添加更多功能:
docker-compose.yml
`
1. version: "3.9"
2. services:
3. elasticsearch:
4. image: elasticsearch:8.6.0
5. environment:
6. - discovery.type=single-node
7. - ES_JAVA_OPTS=-Xms1g -Xmx1g
8. - xpack.security.enabled=false
9. volumes:
10. - type: volume
11. source: es_data
12. target: /usr/share/elasticsearch/data
13. ports:
14. - target: 9200
15. published: 9200
16. networks:
17. - elastic
19. kibana:
20. image: kibana:8.6.0
21. ports:
22. - target: 5601
23. published: 5601
24. depends_on:
25. - elasticsearch
26. networks:
27. - elastic
29. volumes:
30. es_data:
31. driver: local
33. networks:
34. elastic:
35. name: elastic
36. driver: bridge
`
你可以使用以下命令之一启动 Elasticsearch 和 Kibana:
docker-compose up
或者
docker-compose up -d
如果加上 -d 选项的话,Elasticsearch 会以 daemon 的形式来运行。上面是一种最为简单的方式来启动 Elasticsearch 集群及 Kibana。由于它没有设置安全,我们无需输入任何凭证就可以直接进入到 Kibana 了。
使用带有同义词列表的标准同义词 token 过滤器
让我们首先使用带有同义词列表的标准同义词标记过滤器创建一个索引。 在 Kibana 中运行以下命令,我们将在稍后解释详细信息:
`
1. PUT synonyms
2. {
3. "settings": {
4. "index": {
5. "analysis": {
6. "analyzer": {
7. "index_analyzer": {
8. "tokenizer": "standard",
9. "filter": [
10. "lowercase",
11. "synonym_filter"
12. ]
13. }
14. },
15. "filter": {
16. "synonym_filter": {
17. "type": "synonym",
18. "synonyms": [
19. "elk => Elastic Stack",
20. "elkb => Elastic Stack"
21. ]
22. }
23. }
24. }
25. }
26. },
27. "mappings": {
28. "properties": {
29. "name": {
30. "type": "text",
31. "analyzer": "index_analyzer"
32. }
33. }
34. }
35. }
`
这里的要点:
- 请注意设置键的嵌套级别。 settings => index => analysis => analyzer/filter 都是内置关键字。 但是,index_analyzer 和 synonym_filter 分别是自定义分析器和过滤器的自定义名称。
- 我们需要创建一个 type 为 synonym 的自定义过滤器。 synonym 选项明确提供了同义词列表。 这通常应该只用于测试,因为更新同义词列表不方便,我们稍后会看到。
- 本文中使用了 Solr 同义词。 对于此示例,使用了显式映射,这意味着 => 左侧的标记将替换为右侧的标记。 稍后我们将使用等同的同义词,这意味着提供的 token 被等同对待。
- synonym_filter 添加到名为 index_analyzer 的新自定义分析器的过滤器列表中。 通常过滤器的顺序很重要。 然而,对于同义词过滤器来说,它有点特殊,可能会让我们中的许多人感到惊讶。 在此示例中,即使 synonym_filter 过滤器放在小写过滤器之后,此过滤器返回的标记也会传递给小写过滤器,因此也会变成小写。 因此,你不需要在同义词列表或同义词文件中提供小写 token。
- 最后,在文档的映射中,为名称字段指定了自定义分析器。
我们知道在早期的 Elastic 产品中 elk 就是 Elastic Stack 的代名词。之后随着 Beats 的加入,很多开发者也把 elkb 当做 Elastic Stack 的代名词。要测试在索引中创建的分析器,我们可以调用 _analyze 端点:
1. GET /synonyms/_analyze
2. {
3. "analyzer": "index_analyzer",
4. "text": "elk is powerful"
5. }
上面命令的输出为:
`
1. {
2. "tokens": [
3. {
4. "token": "elastic",
5. "start_offset": 0,
6. "end_offset": 3,
7. "type": "SYNONYM",
8. "position": 0
9. },
10. {
11. "token": "is",
12. "start_offset": 4,
13. "end_offset": 6,
14. "type": "<ALPHANUM>",
15. "position": 1
16. },
17. {
18. "token": "stack",
19. "start_offset": 4,
20. "end_offset": 6,
21. "type": "SYNONYM",
22. "position": 1
23. },
24. {
25. "token": "powerful",
26. "start_offset": 7,
27. "end_offset": 15,
28. "type": "<ALPHANUM>",
29. "position": 2
30. }
31. ]
32. }
`
从上面的输出中,我们可以看到 type 为 SNONYM 的 token 为 elastic 及 stack。让我们向索引中添加一些文档并测试它在搜索中是否正常工作:
1. PUT /synonyms/_doc/1
2. {
3. "name": "elk is very powerful"
4. }
6. PUT /synonyms/_doc/2
7. {
8. "name": "elkb is useful"
9. }
11. PUT /synonyms/_doc/3
12. {
13. "name": "Elastic Stack is so widely used"
14. }
我们可以使用 match 关键字进行简单的搜索:
1. GET /synonyms/_search?filter_path=**.hits
2. {
3. "query": {
4. "match": {
5. "name": "elk"
6. }
7. }
8. }
如果没有问题,所有三个文件都应该被搜索到:
`
1. {
2. "hits": {
3. "hits": [
4. {
5. "_index": "synonyms",
6. "_id": "2",
7. "_score": 0.31931418,
8. "_source": {
9. "name": "elkb is useful"
10. }
11. },
12. {
13. "_index": "synonyms",
14. "_id": "1",
15. "_score": 0.29086044,
16. "_source": {
17. "name": "elk is very powerful"
18. }
19. },
20. {
21. "_index": "synonyms",
22. "_id": "3",
23. "_score": 0.24686477,
24. "_source": {
25. "name": "Elastic Stack is so widely used"
26. }
27. }
28. ]
29. }
30. }
`
索引时间 vs 搜索时间进行同义词操作
如你所见,在上面的示例中,只创建了一个分析器,它用于索引和搜索。
不鼓励在索引(indexing)步骤中对所有文档应用同义词,因为它有一些主要缺点:
- 如果不重新索引所有内容,就无法更新同义词列表,这在实践中是非常低效的。
- 搜索分数会受到影响,因为同义词 token 也会被计算在内。
- 索引过程变得更加耗时并且索引将变得更大。 对于小数据集来说可以忽略不计,但对于大数据集来说非常重要。
因此,最好在搜索步骤中只应用同义词,这样可以克服所有三个缺点。 为此,我们需要创建一个用于搜索的新分析器。
使用 search_analyzer 并应用搜索时间同义词
在 Kibana 中运行以下命令以创建具有搜索时同义词的新索引:
`
1. PUT synonym_graph
2. {
3. "settings": {
4. "index": {
5. "analysis": {
6. "analyzer": {
7. "index_analyzer": {
8. "tokenizer": "standard",
9. "filter": [
10. "lowercase"
11. ]
12. },
13. "search_analyzer": {
14. "tokenizer": "standard",
15. "filter": [
16. "lowercase",
17. "synonym_filter"
18. ]
19. }
20. },
21. "filter": {
22. "synonym_filter": {
23. "type": "synonym_graph",
24. "synonyms": [
25. "elk => Elastic Stack",
26. "elkb => Elastic Stack"
27. ]
28. }
29. }
30. }
31. }
32. },
33. "mappings": {
34. "properties": {
35. "name": {
36. "type": "text",
37. "analyzer": "index_analyzer",
38. "search_analyzer": "search_analyzer"
39. }
40. }
41. }
42. }
`
关键点:
- 该类型现在更改为 synonym_graph,这是一个更复杂的同义词过滤器,旨在仅用作搜索分析器的一部分。 它可以更恰当地处理多词同义词,推荐用于搜索时分析。 但是,你可以继续使用原来的 synonym 类型,它在这篇文章中的表现是一样的。
- 同义词过滤器从索引时间分析器中删除并添加到搜索时间分析器中。
- search_analyzer 是为 name 字段明确指定的。 如果未指定,则相同的分析器 (index_analyzer) 将用于索引和搜索。
分析器应该返回与以前相同的 token。 然而,当你用这些命令为三个文档建立索引并再次执行相同的搜索后,结果会有所不同:
1. PUT /synonym_graph/_doc/1
2. {
3. "name": "elk is very powerful"
4. }
6. PUT /synonym_graph/_doc/2
7. {
8. "name": "elkb is useful"
9. }
11. PUT /synonym_graph/_doc/3
12. {
13. "name": "Elastic Stack is so widely used"
14. }
我们使用如下的命令来进行搜索:
1. GET /synonym_graph/_search?filter_path=**.hits
2. {
3. "query": {
4. "match": {
5. "name": "elk"
6. }
7. }
8. }
这一次,只有如下的结果返回。甚至 “elk is very powerful” 这个文档也没有被返回:
1. {
2. "hits": {
3. "hits": [
4. {
5. "_index": "synonym_graph",
6. "_id": "3",
7. "_score": 2.3589978,
8. "_source": {
9. "name": "Elastic Stack is so widely used"
10. }
11. }
12. ]
13. }
14. }
原因是同义词过滤器仅在搜索时应用。 搜索查询 elk 被替换为同义词标记 “Elastic Stack”。 然而,索引中的文档没有被同义词过滤器(synonym_filter)过滤,因此 “elk” 只是被标记为 elk 而没有被 Elastic Stack 替换。 类似于 elkb。 结果,只能匹配 “Elastic Stack is so widely used”。
为了使其像前面的示例一样正常工作,我们需要将同义词规则从显式映射更改为等效同义词。 让我们按如下方式更新同义词过滤器:
1. ......
2. "filter": {
3. "synonym_filter": {
4. "type": "synonym_graph",
5. "synonyms": [
6. "elk, elkb, Elastic Stack"
7. ]
8. }
9. }
10. ......
要更改现有索引的同义词,我们可以重新创建索引并重新索引所有文档,这是愚蠢且低效的。
更好的方法是更新索引的设置。 但是,我们需要在更新设置之前关闭索引,然后重新打开它才能访问它:
1. POST /synonym_graph/_close
4. PUT /synonym_graph/_settings
5. {
6. "settings": {
7. "index.analysis.filter.synonym_filter.synonyms": [
8. "elk, elkb, Elastic Stack"
9. ]
10. }
11. }
13. POST /synonym_graph/_open
请注意更新索引设置的特殊语法。
运行上述命令后,我们可以通过如下命令的返回值来进行验证:
GET synonym_graph
上面的命令返回:
`
1. {
2. "synonym_graph": {
3. "aliases": {},
4. "mappings": {
5. "properties": {
6. "name": {
7. "type": "text",
8. "analyzer": "index_analyzer",
9. "search_analyzer": "search_analyzer"
10. }
11. }
12. },
13. "settings": {
14. "index": {
15. "routing": {
16. "allocation": {
17. "include": {
18. "_tier_preference": "data_content"
19. }
20. }
21. },
22. "number_of_shards": "1",
23. "provided_name": "synonym_graph",
24. "creation_date": "1673501061514",
25. "analysis": {
26. "filter": {
27. "synonym_filter": {
28. "type": "synonym_graph",
29. "synonyms": [
30. "elk, elkb, Elastic Stack"
31. ]
32. }
33. },
34. "analyzer": {
35. "index_analyzer": {
36. "filter": [
37. "lowercase"
38. ],
39. "tokenizer": "standard"
40. },
41. "search_analyzer": {
42. "filter": [
43. "lowercase",
44. "synonym_filter"
45. ],
46. "tokenizer": "standard"
47. }
48. }
49. },
50. "number_of_replicas": "1",
51. "uuid": "UCIWtpQMTsCc1TwnvsywHA",
52. "version": {
53. "created": "8060099"
54. }
55. }
56. }
57. }
58. }
`
让我们使用 _analyzer 端点测试 search_analyzer 并查看生成的 token:
1. GET /synonym_graph/_analyze
2. {
3. "analyzer": "search_analyzer",
4. "text": "elk"
5. }
上述命令返回:
它表明 elk 搜索查询被三个同义词的 token 替换和扩展(由 expand 选项控制)。 它还证明,如果在索引时应用等效同义词,则结果索引的大小可以显着增加。
然后当我们再次执行相同的搜索时:
1. GET /synonym_graph/_search?filter_path=**.hits
2. {
3. "query": {
4. "match": {
5. "name": "elk"
6. }
7. }
8. }
这次搜索的结果是:
`
1. {
2. "hits": {
3. "hits": [
4. {
5. "_index": "synonym_graph",
6. "_id": "3",
7. "_score": 1.6949677,
8. "_source": {
9. "name": "Elastic Stack is so widely used"
10. }
11. },
12. {
13. "_index": "synonym_graph",
14. "_id": "2",
15. "_score": 1.1220688,
16. "_source": {
17. "name": "elkb is useful"
18. }
19. },
20. {
21. "_index": "synonym_graph",
22. "_id": "1",
23. "_score": 1.0126972,
24. "_source": {
25. "name": "elk is very powerful"
26. }
27. }
28. ]
29. }
30. }
`
我们可以看到三个文档都被搜索出来了。
使用同义词文件
上面我们一直在创建索引时直接指定同义词列表。 但是,当你有大量同义词时,将它们全部添加到索引中会很麻烦。 更好的方法是将它们存储在一个文件中,然后动态地将它们加载到索引中。 使用同义词文件有很多好处,其中包括:
- 方便维护大量的同义词。
- 可以被不同的索引使用。
- 可以在不关闭索引的情况下动态重新加载。
首先,我们需要先将同义词放入一个文件中。 每行都是一个同义词规则,与上面演示的相同。 更多细节可以在官方文档中找到。
我们将创建的同义词文件称为 synonyms.txt,但可以任意命名。 它具有以下内容:
1. $ pwd
2. /Users/liuxg/data/docker8
3. $ ls
4. docker-compose.yml synonyms.txt
5. $ cat synonyms.txt
6. # This is a comment! The file is named synonyms.txt.
7. elk,elkb,Elastic Stack
然后我们需要将同义词文件绑定到 Docker 容器中。 更新 docker-compose.yaml 如下:
docker-compose.yml
`
1. version: "3.9"
2. services:
3. elasticsearch:
4. image: elasticsearch:8.6.0
5. environment:
6. - discovery.type=single-node
7. - ES_JAVA_OPTS=-Xms1g -Xmx1g
8. - xpack.security.enabled=false
9. volumes:
10. - type: volume
11. source: es_data
12. target: /usr/share/elasticsearch/data
13. - type: bind
14. source: ./synonyms.txt
15. target: /usr/share/elasticsearch/config/synonyms.txt
16. ports:
17. - target: 9200
18. published: 9200
19. networks:
20. - elastic
22. kibana:
23. image: kibana:8.6.0
24. ports:
25. - target: 5601
26. published: 5601
27. depends_on:
28. - elasticsearch
29. networks:
30. - elastic
32. volumes:
33. es_data:
34. driver: local
36. networks:
37. elastic:
38. name: elastic
39. driver: bridge
`
我们可以使用 CTRL+C 来终止之前运行的 docker,然后再次使用如下命令来启动:
docker-compose up
请注意,同义词文件已加载到容器中的 config 文件夹中。你可以进入容器并使用以下两个命令之一检查它:
1. # User docker
2. docker exec -it elasticsearch-1 bash
4. # User docker-compose
5. docker-compose exec elasticsearch bash
现在我们需要停止并重新启动服务以使更改生效。 请注意,仅重新启动服务将不起作用。
1. docker-compose stop elasticsearch
2. docker-compose up -d elasticsearch
1. $ docker ps
2. CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
3. 3ae4b728dd44 kibana:8.6.0 "/bin/tini -- /usr/l…" 23 seconds ago Up 21 seconds 0.0.0.0:5601->5601/tcp docker8-kibana-1
4. 878c82384761 elasticsearch:8.6.0 "/bin/tini -- /usr/l…" 23 seconds ago Up 22 seconds 0.0.0.0:9200->9200/tcp, 9300/tcp docker8-elasticsearch-1
5. $ docker exec -it docker8-elasticsearch-1 bash
6. elasticsearch@878c82384761:~$ pwd
7. /usr/share/elasticsearch
8. elasticsearch@878c82384761:~$ ls
9. LICENSE.txt NOTICE.txt README.asciidoc bin config data jdk lib logs modules plugins
10. elasticsearch@878c82384761:~$ cd config/
11. elasticsearch@878c82384761:~/config$ ls
12. elasticsearch-plugins.example.yml jvm.options log4j2.properties synonyms.txt
13. elasticsearch.keystore jvm.options.d role_mapping.yml users
14. elasticsearch.yml log4j2.file.properties roles.yml users_roles
从上面的输出中,我们可以看到 synonyms.txt 已经被成功地加载到容器里了。
然后我们可以使用同义词文件创建一个新索引:
`
1. PUT /synonym_graph_file
2. {
3. "settings": {
4. "index": {
5. "analysis": {
6. "analyzer": {
7. "index_analyzer": {
8. "tokenizer": "standard",
9. "filter": [
10. "lowercase"
11. ]
12. },
13. "search_analyzer": {
14. "tokenizer": "standard",
15. "filter": [
16. "lowercase",
17. "synonym_filter"
18. ]
19. }
20. },
21. "filter": {
22. "synonym_filter": {
23. "type": "synonym_graph",
24. "synonyms_path": "synonyms.txt",
25. "updateable": true
26. }
27. }
28. }
29. }
30. },
31. "mappings": {
32. "properties": {
33. "name": {
34. "type": "text",
35. "analyzer": "index_analyzer",
36. "search_analyzer": "search_analyzer"
37. }
38. }
39. }
40. }
`
关键点:
- 对于 synonyms_path,它是同义词文件相对于 Elasticsearch 服务器中 config 文件夹的路径。
- 添加了一个新的 updateable 字段,它指定相应的过滤器是否可更新。 我们很快就会看到如何在不关闭和打开索引的情况下重新加载搜索分析器。
这个新索引 synonym_graph_file 的行为应该与前一个 synonym_graph 的行为相同。
现在让我们在同义词文件中添加更多的同义词,其内容如下:
1. $ pwd
2. /Users/liuxg/data/docker8
3. $ ls
4. docker-compose.yml synonyms.txt
5. $ cat synonyms.txt
6. # This is a comment! The file is named synonyms.txt.
7. elk,elkb,Elastic Stack
8. JS => JavaScript
9. TS => TypeScript
10. Py => Python
添加同义词后,我们可以关闭并打开索引使其生效。 然而,由于我们将同义词过滤器标记为可更新,我们可以重新加载搜索分析器以使更改立即生效,而无需关闭索引,因此无需停机。
要重新加载索引的搜索分析器,我们需要调用 _reload_search_analyzers 端点:
POST /synonym_graph_file/_reload_search_analyzers
上面的命令输出为:
`
1. {
2. "_shards": {
3. "total": 2,
4. "successful": 1,
5. "failed": 0
6. },
7. "reload_details": [
8. {
9. "index": "synonym_graph_file",
10. "reloaded_analyzers": [
11. "search_analyzer"
12. ],
13. "reloaded_node_ids": [
14. "tZLy82KRTaiCdpsbkEYnuA"
15. ]
16. }
17. ]
18. }
`
现在,当我们分析 JS 字符串时,我们将看到返回的 javascript token。
1. GET /synonym_graph_file/_analyze
2. {
3. "analyzer": "search_analyzer",
4. "text": "JS"
5. }
上面的命令返回:
1. {
2. "tokens": [
3. {
4. "token": "javascript",
5. "start_offset": 0,
6. "end_offset": 2,
7. "type": "SYNONYM",
8. "position": 0
9. }
10. ]
11. }
这里应该注意两件重要的事情:
- 如果同义词过滤器的 updateable 设置为true,那么对应的分析器只能作为 search_analyzer 使用,不能用于索引,即使类型是同义词。
- updateable 选项只能在同义词文件与 synonyms_path 选项一起使用时使用,而不是在同义词直接通过 synonyms 选项提供时使用。
恭喜你到达这里! 我们已经涵盖了在 Elasticsearch 中使用同义词功能的所有要点。
我们已经分别介绍了如何在索引时间和搜索时间分析步骤中使用同义词。 此外,还介绍了如何直接提供同义词列表,以及如何通过文件提供。 最后但同样重要的是,介绍了关于如何更新现有索引的同义词列表的不同方法。 建议重新加载索引的搜索分析器,因为它不会给服务带来停机时间。
原文地址 elasticstack.blog.csdn.net
同义词用于提高搜索质量并扩大匹配范围。 例如,搜索 England 的用户可能希望找到包含 British 或 UK 的文档,尽管这三个词完全不同。
Elasticsearch 中的同义词功能非常强大,如果实施得当,可以使你的搜索引擎更加健壮和强大。 在本文中,我们将通过简单的代码片段介绍在实践中实现同义词功能的要点。 特别是,我们将介绍如何更新现有索引的同义词,这是一个相对高级的话题。
在今天的展示中,我将使用最新的 Elastic Stack 8.6.0,尽管版本不对我们的展示有任何的影响。
准备
我们将使用 Docker 在本地启动一个 Elasticsearch 服务器,并使用 Kibana 来管理索引和运行命令。 如果你以前从未使用过 Elasticsearch 或想快速复习一下,这篇文章可能会对你有所帮助。 如果你在 Docker 中运行 Elasticsearch 时遇到问题,这篇文章很可能会帮助你解决问题。在今天的文章中,我们尝试使用 docker 来部署一个没有安全功能的 Elasticsearch 集群。
准备就绪后,让我们开始探索 Elasticsearch 中的同义词功能的旅程。
我们将在本文中使用的 docker-compose.yaml 文件包含以下内容,稍后我们将向其添加更多功能:
docker-compose.yml
`
1. version: "3.9"
2. services:
3. elasticsearch:
4. image: elasticsearch:8.6.0
5. environment:
6. - discovery.type=single-node
7. - ES_JAVA_OPTS=-Xms1g -Xmx1g
8. - xpack.security.enabled=false
9. volumes:
10. - type: volume
11. source: es_data
12. target: /usr/share/elasticsearch/data
13. ports:
14. - target: 9200
15. published: 9200
16. networks:
17. - elastic
19. kibana:
20. image: kibana:8.6.0
21. ports:
22. - target: 5601
23. published: 5601
24. depends_on:
25. - elasticsearch
26. networks:
27. - elastic
29. volumes:
30. es_data:
31. driver: local
33. networks:
34. elastic:
35. name: elastic
36. driver: bridge
`
你可以使用以下命令之一启动 Elasticsearch 和 Kibana:
docker-compose up
或者
docker-compose up -d
如果加上 -d 选项的话,Elasticsearch 会以 daemon 的形式来运行。上面是一种最为简单的方式来启动 Elasticsearch 集群及 Kibana。由于它没有设置安全,我们无需输入任何凭证就可以直接进入到 Kibana 了。
使用带有同义词列表的标准同义词 token 过滤器
让我们首先使用带有同义词列表的标准同义词标记过滤器创建一个索引。 在 Kibana 中运行以下命令,我们将在稍后解释详细信息:
`
1. PUT synonyms
2. {
3. "settings": {
4. "index": {
5. "analysis": {
6. "analyzer": {
7. "index_analyzer": {
8. "tokenizer": "standard",
9. "filter": [
10. "lowercase",
11. "synonym_filter"
12. ]
13. }
14. },
15. "filter": {
16. "synonym_filter": {
17. "type": "synonym",
18. "synonyms": [
19. "elk => Elastic Stack",
20. "elkb => Elastic Stack"
21. ]
22. }
23. }
24. }
25. }
26. },
27. "mappings": {
28. "properties": {
29. "name": {
30. "type": "text",
31. "analyzer": "index_analyzer"
32. }
33. }
34. }
35. }
`
这里的要点:
- 请注意设置键的嵌套级别。 settings => index => analysis => analyzer/filter 都是内置关键字。 但是,index_analyzer 和 synonym_filter 分别是自定义分析器和过滤器的自定义名称。
- 我们需要创建一个 type 为 synonym 的自定义过滤器。 synonym 选项明确提供了同义词列表。 这通常应该只用于测试,因为更新同义词列表不方便,我们稍后会看到。
- 本文中使用了 Solr 同义词。 对于此示例,使用了显式映射,这意味着 => 左侧的标记将替换为右侧的标记。 稍后我们将使用等同的同义词,这意味着提供的 token 被等同对待。
- synonym_filter 添加到名为 index_analyzer 的新自定义分析器的过滤器列表中。 通常过滤器的顺序很重要。 然而,对于同义词过滤器来说,它有点特殊,可能会让我们中的许多人感到惊讶。 在此示例中,即使 synonym_filter 过滤器放在小写过滤器之后,此过滤器返回的标记也会传递给小写过滤器,因此也会变成小写。 因此,你不需要在同义词列表或同义词文件中提供小写 token。
- 最后,在文档的映射中,为名称字段指定了自定义分析器。
我们知道在早期的 Elastic 产品中 elk 就是 Elastic Stack 的代名词。之后随着 Beats 的加入,很多开发者也把 elkb 当做 Elastic Stack 的代名词。要测试在索引中创建的分析器,我们可以调用 _analyze 端点:
1. GET /synonyms/_analyze
2. {
3. "analyzer": "index_analyzer",
4. "text": "elk is powerful"
5. }
上面命令的输出为:
`
1. {
2. "tokens": [
3. {
4. "token": "elastic",
5. "start_offset": 0,
6. "end_offset": 3,
7. "type": "SYNONYM",
8. "position": 0
9. },
10. {
11. "token": "is",
12. "start_offset": 4,
13. "end_offset": 6,
14. "type": "<ALPHANUM>",
15. "position": 1
16. },
17. {
18. "token": "stack",
19. "start_offset": 4,
20. "end_offset": 6,
21. "type": "SYNONYM",
22. "position": 1
23. },
24. {
25. "token": "powerful",
26. "start_offset": 7,
27. "end_offset": 15,
28. "type": "<ALPHANUM>",
29. "position": 2
30. }
31. ]
32. }
`
从上面的输出中,我们可以看到 type 为 SNONYM 的 token 为 elastic 及 stack。让我们向索引中添加一些文档并测试它在搜索中是否正常工作:
1. PUT /synonyms/_doc/1
2. {
3. "name": "elk is very powerful"
4. }
6. PUT /synonyms/_doc/2
7. {
8. "name": "elkb is useful"
9. }
11. PUT /synonyms/_doc/3
12. {
13. "name": "Elastic Stack is so widely used"
14. }
我们可以使用 match 关键字进行简单的搜索:
1. GET /synonyms/_search?filter_path=**.hits
2. {
3. "query": {
4. "match": {
5. "name": "elk"
6. }
7. }
8. }
如果没有问题,所有三个文件都应该被搜索到:
`
1. {
2. "hits": {
3. "hits": [
4. {
5. "_index": "synonyms",
6. "_id": "2",
7. "_score": 0.31931418,
8. "_source": {
9. "name": "elkb is useful"
10. }
11. },
12. {
13. "_index": "synonyms",
14. "_id": "1",
15. "_score": 0.29086044,
16. "_source": {
17. "name": "elk is very powerful"
18. }
19. },
20. {
21. "_index": "synonyms",
22. "_id": "3",
23. "_score": 0.24686477,
24. "_source": {
25. "name": "Elastic Stack is so widely used"
26. }
27. }
28. ]
29. }
30. }
`
索引时间 vs 搜索时间进行同义词操作
如你所见,在上面的示例中,只创建了一个分析器,它用于索引和搜索。
不鼓励在索引(indexing)步骤中对所有文档应用同义词,因为它有一些主要缺点:
- 如果不重新索引所有内容,就无法更新同义词列表,这在实践中是非常低效的。
- 搜索分数会受到影响,因为同义词 token 也会被计算在内。
- 索引过程变得更加耗时并且索引将变得更大。 对于小数据集来说可以忽略不计,但对于大数据集来说非常重要。
因此,最好在搜索步骤中只应用同义词,这样可以克服所有三个缺点。 为此,我们需要创建一个用于搜索的新分析器。
使用 search_analyzer 并应用搜索时间同义词
在 Kibana 中运行以下命令以创建具有搜索时同义词的新索引:
`
1. PUT synonym_graph
2. {
3. "settings": {
4. "index": {
5. "analysis": {
6. "analyzer": {
7. "index_analyzer": {
8. "tokenizer": "standard",
9. "filter": [
10. "lowercase"
11. ]
12. },
13. "search_analyzer": {
14. "tokenizer": "standard",
15. "filter": [
16. "lowercase",
17. "synonym_filter"
18. ]
19. }
20. },
21. "filter": {
22. "synonym_filter": {
23. "type": "synonym_graph",
24. "synonyms": [
25. "elk => Elastic Stack",
26. "elkb => Elastic Stack"
27. ]
28. }
29. }
30. }
31. }
32. },
33. "mappings": {
34. "properties": {
35. "name": {
36. "type": "text",
37. "analyzer": "index_analyzer",
38. "search_analyzer": "search_analyzer"
39. }
40. }
41. }
42. }
`
关键点:
- 该类型现在更改为 synonym_graph,这是一个更复杂的同义词过滤器,旨在仅用作搜索分析器的一部分。 它可以更恰当地处理多词同义词,推荐用于搜索时分析。 但是,你可以继续使用原来的 synonym 类型,它在这篇文章中的表现是一样的。
- 同义词过滤器从索引时间分析器中删除并添加到搜索时间分析器中。
- search_analyzer 是为 name 字段明确指定的。 如果未指定,则相同的分析器 (index_analyzer) 将用于索引和搜索。
分析器应该返回与以前相同的 token。 然而,当你用这些命令为三个文档建立索引并再次执行相同的搜索后,结果会有所不同:
1. PUT /synonym_graph/_doc/1
2. {
3. "name": "elk is very powerful"
4. }
6. PUT /synonym_graph/_doc/2
7. {
8. "name": "elkb is useful"
9. }
11. PUT /synonym_graph/_doc/3
12. {
13. "name": "Elastic Stack is so widely used"
14. }
我们使用如下的命令来进行搜索:
1. GET /synonym_graph/_search?filter_path=**.hits
2. {
3. "query": {
4. "match": {
5. "name": "elk"
6. }
7. }
8. }
这一次,只有如下的结果返回。甚至 “elk is very powerful” 这个文档也没有被返回:
1. {
2. "hits": {
3. "hits": [
4. {
5. "_index": "synonym_graph",
6. "_id": "3",
7. "_score": 2.3589978,
8. "_source": {
9. "name": "Elastic Stack is so widely used"
10. }
11. }
12. ]
13. }
14. }
原因是同义词过滤器仅在搜索时应用。 搜索查询 elk 被替换为同义词标记 “Elastic Stack”。 然而,索引中的文档没有被同义词过滤器(synonym_filter)过滤,因此 “elk” 只是被标记为 elk 而没有被 Elastic Stack 替换。 类似于 elkb。 结果,只能匹配 “Elastic Stack is so widely used”。
为了使其像前面的示例一样正常工作,我们需要将同义词规则从显式映射更改为等效同义词。 让我们按如下方式更新同义词过滤器:
1. ......
2. "filter": {
3. "synonym_filter": {
4. "type": "synonym_graph",
5. "synonyms": [
6. "elk, elkb, Elastic Stack"
7. ]
8. }
9. }
10. ......
要更改现有索引的同义词,我们可以重新创建索引并重新索引所有文档,这是愚蠢且低效的。
更好的方法是更新索引的设置。 但是,我们需要在更新设置之前关闭索引,然后重新打开它才能访问它:
1. POST /synonym_graph/_close
4. PUT /synonym_graph/_settings
5. {
6. "settings": {
7. "index.analysis.filter.synonym_filter.synonyms": [
8. "elk, elkb, Elastic Stack"
9. ]
10. }
11. }
13. POST /synonym_graph/_open
请注意更新索引设置的特殊语法。
运行上述命令后,我们可以通过如下命令的返回值来进行验证:
GET synonym_graph
上面的命令返回:
`
1. {
2. "synonym_graph": {
3. "aliases": {},
4. "mappings": {
5. "properties": {
6. "name": {
7. "type": "text",
8. "analyzer": "index_analyzer",
9. "search_analyzer": "search_analyzer"
10. }
11. }
12. },
13. "settings": {
14. "index": {
15. "routing": {
16. "allocation": {
17. "include": {
18. "_tier_preference": "data_content"
19. }
20. }
21. },
22. "number_of_shards": "1",
23. "provided_name": "synonym_graph",
24. "creation_date": "1673501061514",
25. "analysis": {
26. "filter": {
27. "synonym_filter": {
28. "type": "synonym_graph",
29. "synonyms": [
30. "elk, elkb, Elastic Stack"
31. ]
32. }
33. },
34. "analyzer": {
35. "index_analyzer": {
36. "filter": [
37. "lowercase"
38. ],
39. "tokenizer": "standard"
40. },
41. "search_analyzer": {
42. "filter": [
43. "lowercase",
44. "synonym_filter"
45. ],
46. "tokenizer": "standard"
47. }
48. }
49. },
50. "number_of_replicas": "1",
51. "uuid": "UCIWtpQMTsCc1TwnvsywHA",
52. "version": {
53. "created": "8060099"
54. }
55. }
56. }
57. }
58. }
`
让我们使用 _analyzer 端点测试 search_analyzer 并查看生成的 token:
1. GET /synonym_graph/_analyze
2. {
3. "analyzer": "search_analyzer",
4. "text": "elk"
5. }
上述命令返回:
它表明 elk 搜索查询被三个同义词的 token 替换和扩展(由 expand 选项控制)。 它还证明,如果在索引时应用等效同义词,则结果索引的大小可以显着增加。
然后当我们再次执行相同的搜索时:
1. GET /synonym_graph/_search?filter_path=**.hits
2. {
3. "query": {
4. "match": {
5. "name": "elk"
6. }
7. }
8. }
这次搜索的结果是:
`
1. {
2. "hits": {
3. "hits": [
4. {
5. "_index": "synonym_graph",
6. "_id": "3",
7. "_score": 1.6949677,
8. "_source": {
9. "name": "Elastic Stack is so widely used"
10. }
11. },
12. {
13. "_index": "synonym_graph",
14. "_id": "2",
15. "_score": 1.1220688,
16. "_source": {
17. "name": "elkb is useful"
18. }
19. },
20. {
21. "_index": "synonym_graph",
22. "_id": "1",
23. "_score": 1.0126972,
24. "_source": {
25. "name": "elk is very powerful"
26. }
27. }
28. ]
29. }
30. }
`
我们可以看到三个文档都被搜索出来了。
使用同义词文件
上面我们一直在创建索引时直接指定同义词列表。 但是,当你有大量同义词时,将它们全部添加到索引中会很麻烦。 更好的方法是将它们存储在一个文件中,然后动态地将它们加载到索引中。 使用同义词文件有很多好处,其中包括:
- 方便维护大量的同义词。
- 可以被不同的索引使用。
- 可以在不关闭索引的情况下动态重新加载。
首先,我们需要先将同义词放入一个文件中。 每行都是一个同义词规则,与上面演示的相同。 更多细节可以在官方文档中找到。
我们将创建的同义词文件称为 synonyms.txt,但可以任意命名。 它具有以下内容:
1. $ pwd
2. /Users/liuxg/data/docker8
3. $ ls
4. docker-compose.yml synonyms.txt
5. $ cat synonyms.txt
6. # This is a comment! The file is named synonyms.txt.
7. elk,elkb,Elastic Stack
然后我们需要将同义词文件绑定到 Docker 容器中。 更新 docker-compose.yaml 如下:
docker-compose.yml
`
1. version: "3.9"
2. services:
3. elasticsearch:
4. image: elasticsearch:8.6.0
5. environment:
6. - discovery.type=single-node
7. - ES_JAVA_OPTS=-Xms1g -Xmx1g
8. - xpack.security.enabled=false
9. volumes:
10. - type: volume
11. source: es_data
12. target: /usr/share/elasticsearch/data
13. - type: bind
14. source: ./synonyms.txt
15. target: /usr/share/elasticsearch/config/synonyms.txt
16. ports:
17. - target: 9200
18. published: 9200
19. networks:
20. - elastic
22. kibana:
23. image: kibana:8.6.0
24. ports:
25. - target: 5601
26. published: 5601
27. depends_on:
28. - elasticsearch
29. networks:
30. - elastic
32. volumes:
33. es_data:
34. driver: local
36. networks:
37. elastic:
38. name: elastic
39. driver: bridge
`
我们可以使用 CTRL+C 来终止之前运行的 docker,然后再次使用如下命令来启动:
docker-compose up
请注意,同义词文件已加载到容器中的 config 文件夹中。你可以进入容器并使用以下两个命令之一检查它:
1. # User docker
2. docker exec -it elasticsearch-1 bash
4. # User docker-compose
5. docker-compose exec elasticsearch bash
现在我们需要停止并重新启动服务以使更改生效。 请注意,仅重新启动服务将不起作用。
1. docker-compose stop elasticsearch
2. docker-compose up -d elasticsearch
1. $ docker ps
2. CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
3. 3ae4b728dd44 kibana:8.6.0 "/bin/tini -- /usr/l…" 23 seconds ago Up 21 seconds 0.0.0.0:5601->5601/tcp docker8-kibana-1
4. 878c82384761 elasticsearch:8.6.0 "/bin/tini -- /usr/l…" 23 seconds ago Up 22 seconds 0.0.0.0:9200->9200/tcp, 9300/tcp docker8-elasticsearch-1
5. $ docker exec -it docker8-elasticsearch-1 bash
6. elasticsearch@878c82384761:~$ pwd
7. /usr/share/elasticsearch
8. elasticsearch@878c82384761:~$ ls
9. LICENSE.txt NOTICE.txt README.asciidoc bin config data jdk lib logs modules plugins
10. elasticsearch@878c82384761:~$ cd config/
11. elasticsearch@878c82384761:~/config$ ls
12. elasticsearch-plugins.example.yml jvm.options log4j2.properties synonyms.txt
13. elasticsearch.keystore jvm.options.d role_mapping.yml users
14. elasticsearch.yml log4j2.file.properties roles.yml users_roles
从上面的输出中,我们可以看到 synonyms.txt 已经被成功地加载到容器里了。
然后我们可以使用同义词文件创建一个新索引:
`
1. PUT /synonym_graph_file
2. {
3. "settings": {
4. "index": {
5. "analysis": {
6. "analyzer": {
7. "index_analyzer": {
8. "tokenizer": "standard",
9. "filter": [
10. "lowercase"
11. ]
12. },
13. "search_analyzer": {
14. "tokenizer": "standard",
15. "filter": [
16. "lowercase",
17. "synonym_filter"
18. ]
19. }
20. },
21. "filter": {
22. "synonym_filter": {
23. "type": "synonym_graph",
24. "synonyms_path": "synonyms.txt",
25. "updateable": true
26. }
27. }
28. }
29. }
30. },
31. "mappings": {
32. "properties": {
33. "name": {
34. "type": "text",
35. "analyzer": "index_analyzer",
36. "search_analyzer": "search_analyzer"
37. }
38. }
39. }
40. }
`
关键点:
- 对于 synonyms_path,它是同义词文件相对于 Elasticsearch 服务器中 config 文件夹的路径。
- 添加了一个新的 updateable 字段,它指定相应的过滤器是否可更新。 我们很快就会看到如何在不关闭和打开索引的情况下重新加载搜索分析器。
这个新索引 synonym_graph_file 的行为应该与前一个 synonym_graph 的行为相同。
现在让我们在同义词文件中添加更多的同义词,其内容如下:
1. $ pwd
2. /Users/liuxg/data/docker8
3. $ ls
4. docker-compose.yml synonyms.txt
5. $ cat synonyms.txt
6. # This is a comment! The file is named synonyms.txt.
7. elk,elkb,Elastic Stack
8. JS => JavaScript
9. TS => TypeScript
10. Py => Python
添加同义词后,我们可以关闭并打开索引使其生效。 然而,由于我们将同义词过滤器标记为可更新,我们可以重新加载搜索分析器以使更改立即生效,而无需关闭索引,因此无需停机。
要重新加载索引的搜索分析器,我们需要调用 _reload_search_analyzers 端点:
POST /synonym_graph_file/_reload_search_analyzers
上面的命令输出为:
`
1. {
2. "_shards": {
3. "total": 2,
4. "successful": 1,
5. "failed": 0
6. },
7. "reload_details": [
8. {
9. "index": "synonym_graph_file",
10. "reloaded_analyzers": [
11. "search_analyzer"
12. ],
13. "reloaded_node_ids": [
14. "tZLy82KRTaiCdpsbkEYnuA"
15. ]
16. }
17. ]
18. }
`
现在,当我们分析 JS 字符串时,我们将看到返回的 javascript token。
1. GET /synonym_graph_file/_analyze
2. {
3. "analyzer": "search_analyzer",
4. "text": "JS"
5. }
上面的命令返回:
1. {
2. "tokens": [
3. {
4. "token": "javascript",
5. "start_offset": 0,
6. "end_offset": 2,
7. "type": "SYNONYM",
8. "position": 0
9. }
10. ]
11. }
这里应该注意两件重要的事情:
- 如果同义词过滤器的 updateable 设置为true,那么对应的分析器只能作为 search_analyzer 使用,不能用于索引,即使类型是同义词。
- updateable 选项只能在同义词文件与 synonyms_path 选项一起使用时使用,而不是在同义词直接通过 synonyms 选项提供时使用。
恭喜你到达这里! 我们已经涵盖了在 Elasticsearch 中使用同义词功能的所有要点。
我们已经分别介绍了如何在索引时间和搜索时间分析步骤中使用同义词。 此外,还介绍了如何直接提供同义词列表,以及如何通过文件提供。 最后但同样重要的是,介绍了关于如何更新现有索引的同义词列表的不同方法。 建议重新加载索引的搜索分析器,因为它不会给服务带来停机时间。
收起阅读 »
回顾 2022 年 — 回顾 Elastic 这一年
2022 年对 Elastic 来说是非凡的一年,我们在可观察性、安全性和企业搜索解决方案、新客户和深化合作伙伴关系方面引入了数十项创新。
在我们花点时间回顾过去的一年时,我们汇总了 2022 年的热门博文。希望你喜欢!
Elastic 8.0
在年初之际,我们还推出了 Elastic 8.0 的新篇章:速度、规模、相关性和简单性的新时代。 Elastic 8.0 是 7.x 系列多年投资的结晶,旨在减少内存使用和查询开销,并引入新功能以增强相关性。
例如,我们提高了日期直方图和搜索聚合的速度,增强了页面缓存的性能,并创建了一个新的 “预过滤” 搜索阶段。 此外,我们通过减少内存堆减少了资源需求以降低客户的总拥有成本,引入了使用更少存储的新方法,并使我们的客户能够通过新的冻结层和可搜索快照轻松地将计算与存储分离。
在 8.0 中,我们还为 Elasticsearch 带来了一整套原生矢量搜索功能,使客户和员工能够使用他们自己的词汇和语言搜索和接收高度相关的结果。
Machine Learning Relevance Tuning
作为 7.x 发布版中两年多工作的结晶,使向量搜索的实现更加实用,我们还引入了对近似最近邻搜索的原生支持 — 使得可以将基于向量的查询作用于基于向量词库进行比较快速、大规模地比较。 查看我们的自然语言处理 (NLP) 博客系列,获取部署矢量搜索的快速入门指南。
Elastic Security
随着 Elastic Security for Cloud 的推出,这也是 Elastic Security 具有里程碑意义的一年。 Elastic Security for Cloud 扩展了我们的 SIEM、安全分析和端点安全功能,在单个统一平台中提供了风险和状态管理、威胁监控和工作负载保护的新功能。
我们通过再次加倍承诺公开和透明的安全来结束这一年。 我们对开放式安全的承诺在 Forrester Wave™:安全分析平台,2022 年第 4 季度中得到进一步认可,其中 Elastic 被评为领导者,并指出“Elastic 在开放式产品中提供了令人难以置信的灵活性和可视化。”
更多阅读:https://elasticstack.blog.csdn ... 94458
2022 年对 Elastic 来说是非凡的一年,我们在可观察性、安全性和企业搜索解决方案、新客户和深化合作伙伴关系方面引入了数十项创新。
在我们花点时间回顾过去的一年时,我们汇总了 2022 年的热门博文。希望你喜欢!
Elastic 8.0
在年初之际,我们还推出了 Elastic 8.0 的新篇章:速度、规模、相关性和简单性的新时代。 Elastic 8.0 是 7.x 系列多年投资的结晶,旨在减少内存使用和查询开销,并引入新功能以增强相关性。
例如,我们提高了日期直方图和搜索聚合的速度,增强了页面缓存的性能,并创建了一个新的 “预过滤” 搜索阶段。 此外,我们通过减少内存堆减少了资源需求以降低客户的总拥有成本,引入了使用更少存储的新方法,并使我们的客户能够通过新的冻结层和可搜索快照轻松地将计算与存储分离。
在 8.0 中,我们还为 Elasticsearch 带来了一整套原生矢量搜索功能,使客户和员工能够使用他们自己的词汇和语言搜索和接收高度相关的结果。
Machine Learning Relevance Tuning
作为 7.x 发布版中两年多工作的结晶,使向量搜索的实现更加实用,我们还引入了对近似最近邻搜索的原生支持 — 使得可以将基于向量的查询作用于基于向量词库进行比较快速、大规模地比较。 查看我们的自然语言处理 (NLP) 博客系列,获取部署矢量搜索的快速入门指南。
Elastic Security
随着 Elastic Security for Cloud 的推出,这也是 Elastic Security 具有里程碑意义的一年。 Elastic Security for Cloud 扩展了我们的 SIEM、安全分析和端点安全功能,在单个统一平台中提供了风险和状态管理、威胁监控和工作负载保护的新功能。
我们通过再次加倍承诺公开和透明的安全来结束这一年。 我们对开放式安全的承诺在 Forrester Wave™:安全分析平台,2022 年第 4 季度中得到进一步认可,其中 Elastic 被评为领导者,并指出“Elastic 在开放式产品中提供了令人难以置信的灵活性和可视化。”
更多阅读:https://elasticstack.blog.csdn ... 94458 收起阅读 »

Elasticsearch:使用 Node.js 将实时数据提取到 Elasticsearch 中(一)
如果你选择的编程语言是 JavaScript,并且你需要使用 RESTful API 方法从第三方应用程序获取数据,那么使用 Node.js 获取数据是一个不错的选择。 你还可以托管服务器,让它持续实时摄取数据。 该演示将向您展示如何设置一个 Node.js + Express.js 服务器,该服务器实时将数据提取到 Elasticsearch 中,然后可以对这些数据进行分析并以有意义的方式采取行动。
对于此演示,我们将使用 USGS 实时发布的公开可用的全球地震数据。
更多阅读 https://elasticstack.blog.csdn ... 05743
如果你选择的编程语言是 JavaScript,并且你需要使用 RESTful API 方法从第三方应用程序获取数据,那么使用 Node.js 获取数据是一个不错的选择。 你还可以托管服务器,让它持续实时摄取数据。 该演示将向您展示如何设置一个 Node.js + Express.js 服务器,该服务器实时将数据提取到 Elasticsearch 中,然后可以对这些数据进行分析并以有意义的方式采取行动。
对于此演示,我们将使用 USGS 实时发布的公开可用的全球地震数据。
更多阅读 https://elasticstack.blog.csdn ... 05743 收起阅读 »

Observability:从零开始创建 Java 微服务并监控它 (一)
- 创建示例 Java 应用程序。
- 使用 Filebeat 提取日志并在 Kibana 中查看你的日志。
- 使用 Metricbeat Prometheus 模块获取指标并在 Kibana 中查看你的指标。
- 使用 Elastic APM Java 代理检测你的应用程序。
- 使用 Heartbeat 监控您的服务并在 Kibana 中查看您的正常运行时间数据。
在下面的展示中,我将使用最新的 Elastic Stack 8.5.2 来进行展示。为了方便大家的学习,源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。
更多阅读:https://elasticstack.blog.csdn ... 13010
- 创建示例 Java 应用程序。
- 使用 Filebeat 提取日志并在 Kibana 中查看你的日志。
- 使用 Metricbeat Prometheus 模块获取指标并在 Kibana 中查看你的指标。
- 使用 Elastic APM Java 代理检测你的应用程序。
- 使用 Heartbeat 监控您的服务并在 Kibana 中查看您的正常运行时间数据。
在下面的展示中,我将使用最新的 Elastic Stack 8.5.2 来进行展示。为了方便大家的学习,源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。
更多阅读:https://elasticstack.blog.csdn ... 13010 收起阅读 »

Elasticsearch:将关系数据库中的数据提取到 Elasticsearch 集群中
此处提供的代码和方法已经过 MySQL 测试。 他们应该也适用于其他关系数据库。
Logstash Java 数据库连接 (JDBC) 输入插件使你能够从许多流行的关系数据库(包括 MySQL 和 Postgres)中提取数据。 从概念上讲,JDBC 输入插件运行一个循环,该循环定期轮询关系数据库以查找自该循环的最后一次迭代以来插入或修改的记录。
原文链接:https://elasticstack.blog.csdn ... 63743
此处提供的代码和方法已经过 MySQL 测试。 他们应该也适用于其他关系数据库。
Logstash Java 数据库连接 (JDBC) 输入插件使你能够从许多流行的关系数据库(包括 MySQL 和 Postgres)中提取数据。 从概念上讲,JDBC 输入插件运行一个循环,该循环定期轮询关系数据库以查找自该循环的最后一次迭代以来插入或修改的记录。
原文链接:https://elasticstack.blog.csdn ... 63743 收起阅读 »

ES7.5升级7.17后在写多读少场景下CPU、IO飙升
背景
1.ES PAAS管理的集群升级了100+,从7.5升级到7.17 (保证每个大版本最终仅维护一个小版本集群)
2.由于业务使用差异大,也出了不少问题,前面的文章也有提到过Integer类型字段使用terms查询效率低的情况
3.这里再分析一个CPU、IO飙升的场景
现象
1.用户报障:“ES集群写入吞吐量变小了”
2.观察下来发现确实CPU高了,IO也有明显抖动
排查与分析
1.发现YoungGC频率变高了一些,猜测可能是G1GC的问题(我们使用JDK11重新打了ES镜像),经过版本替换,没有明显变化。
参考issue:https://github.com/elastic/elasticsearch/pull/46169
这可能是另一个场景的case,经过测试,不属于我们的场景。
2.多次执行hot_threads API观察, 发现时不时会出现 update相关函数 消耗的 CPU多。
3.继续使用arthas抓取一段时间的数据,发现是 FST、DocID 读取慢
从图中可以看到Bulk请求执行过程中的getDocID方法占有大量CPU。
4.集群写多读少,使用的是niofs。可知,7.5版本的FST是在堆外,但是_id字段是在堆内。升级到7.17版本后,FST在堆外,该字段也放到了堆外(官方版本应该是7.9就开始放到堆外了)。数据放到堆外,其实也就是文件放到磁盘,读一次之后放到pagecache。
这样也就可以解释了,在upsert类请求多的时候会频繁查询docId,此时如果_id字段没有进入pageCache或者被踢出pageCache,那么就会出现响应慢,并且CPU高、IO高的情况。
5.mmapfs、hybridfs实测是什么情况暂时不明确,目前没有收到搜索类集群CPU、IO方面的报障。
测试验证
将FST、BKD等全部改成放到堆内(开源版需要改造)
可以看到,CPU有显著下降,也相对均衡。(之前蓝色线高,是因为该节点有大量的主分片)
结论
1.update、upsert、get等请求如果十分频繁,_id使用offheap将不会是个好的选择,除非给足够的堆外内存,并且保证尽可能常驻内存。
2.不同的业务场景下使用ES的同一版本也会有不一样的效果。
3.mmapfs、hybridfs在频繁update情况下,实测是什么情况暂时不明确,目前没有收到搜索类集群CPU、IO方面的报障,可能不会有这么明显的差距。(官方描述写入速度仅降低了1.8%)
4.最后吐槽一下,写入不停的情况下,translog的恢复实在是太慢了,由于大分片恢复/rebalance时,translog不会被清理,导致恢复/迁移速度急剧下降...目前各个版本也没什么好的解决方式。
背景
1.ES PAAS管理的集群升级了100+,从7.5升级到7.17 (保证每个大版本最终仅维护一个小版本集群)
2.由于业务使用差异大,也出了不少问题,前面的文章也有提到过Integer类型字段使用terms查询效率低的情况
3.这里再分析一个CPU、IO飙升的场景
现象
1.用户报障:“ES集群写入吞吐量变小了”
2.观察下来发现确实CPU高了,IO也有明显抖动
排查与分析
1.发现YoungGC频率变高了一些,猜测可能是G1GC的问题(我们使用JDK11重新打了ES镜像),经过版本替换,没有明显变化。
参考issue:https://github.com/elastic/elasticsearch/pull/46169
这可能是另一个场景的case,经过测试,不属于我们的场景。
2.多次执行hot_threads API观察, 发现时不时会出现 update相关函数 消耗的 CPU多。
3.继续使用arthas抓取一段时间的数据,发现是 FST、DocID 读取慢
从图中可以看到Bulk请求执行过程中的getDocID方法占有大量CPU。
4.集群写多读少,使用的是niofs。可知,7.5版本的FST是在堆外,但是_id字段是在堆内。升级到7.17版本后,FST在堆外,该字段也放到了堆外(官方版本应该是7.9就开始放到堆外了)。数据放到堆外,其实也就是文件放到磁盘,读一次之后放到pagecache。
这样也就可以解释了,在upsert类请求多的时候会频繁查询docId,此时如果_id字段没有进入pageCache或者被踢出pageCache,那么就会出现响应慢,并且CPU高、IO高的情况。
5.mmapfs、hybridfs实测是什么情况暂时不明确,目前没有收到搜索类集群CPU、IO方面的报障。
测试验证
将FST、BKD等全部改成放到堆内(开源版需要改造)
可以看到,CPU有显著下降,也相对均衡。(之前蓝色线高,是因为该节点有大量的主分片)
结论
1.update、upsert、get等请求如果十分频繁,_id使用offheap将不会是个好的选择,除非给足够的堆外内存,并且保证尽可能常驻内存。
2.不同的业务场景下使用ES的同一版本也会有不一样的效果。
3.mmapfs、hybridfs在频繁update情况下,实测是什么情况暂时不明确,目前没有收到搜索类集群CPU、IO方面的报障,可能不会有这么明显的差距。(官方描述写入速度仅降低了1.8%)
4.最后吐槽一下,写入不停的情况下,translog的恢复实在是太慢了,由于大分片恢复/rebalance时,translog不会被清理,导致恢复/迁移速度急剧下降...目前各个版本也没什么好的解决方式。
收起阅读 »
Elasticsearch:Hadoop 大数据集成 (Hadoop => Elasticsearch)
如上所示,我们在左边的 macOS 中安装 Elasticsearch 及 Kibana,而在 Ubuntu OS 中安装 Hadoop。我们将以最新的 Elastic Stack 8.4.2 来进行展示。
Hadoop 是什么?
当我们需要收集、处理/转换和/或存储数千 GB、数千 TB 甚至更多的数据时,Hadoop 可能是完成这项工作的合适工具。它是从头开始构建的,考虑到这样的想法:
- 一次使用多台计算机(形成一个集群),以便它可以并行处理数据,从而更快地完成工作。我们可以这样想。如果一台服务器需要处理 100 TB 的数据,它可能会在 500 小时内完成。但是如果我们有 100 台服务器,每台只能取一部分数据,例如 server1 可以取第一个 TB,server2 可以取第二个 TB,以此类推。现在他们每个人都只有 1 TB 的数据要处理,而且他们都可以同时处理自己的数据部分。这样,工作可以在 5 小时内完成,而不是 500 小时。当然,这是理论上的和想象的,因为在实践中我们不会减少 100 倍所需的时间,但我们可以非常接近如果条件理想。
- 在需要时可以很容易地调整计算能力。有更多的数据要处理,而问题要复杂得多?将更多计算机添加到集群。从某种意义上说,这就像在超级计算机上增加了更多的 CPU 内核。
- 数据不断增长,因此 Hadoop 也必须能够轻松灵活地扩展其存储容量,以满足需求。我们添加到集群的每台计算机都会扩展 Hadoop 分布式文件系统 (HDFS) 的可用总存储空间。
- 与其他软件不同,它不仅会在硬件故障发生时尝试从硬件故障中恢复。设计理念实际上假设某些硬件肯定会失败。当有数千台计算机并行工作时,可以保证某处某处会不时出现故障。因此,默认情况下,Hadoop 创建数据块的副本并将它们分布在单独的硬件上,因此当偶尔的服务器起火或硬盘或 SSD 死机时,不会丢失任何内容。
总而言之,Hadoop 非常擅长摄取和处理大量信息。它将数据分布在集群中可用的多个节点上,并使用 MapReduce 编程模型在多台机器上同时处理数据(并行处理)。
但这听起来可能有点类似于 Elasticsearch 数据摄取工具所做的事情。尽管它们是为处理相当不同的场景而设计的,但它们有时可能会有些重叠。那么我们为什么以及何时使用其中一个而不是另一个呢?
Hadoop vs Logstash/Elasticsearch
首先,我们不应该考虑哪个比哪个更好。 每个人都擅长为其创造的工作。 每个都有优点和缺点。
为了尝试给你绘制一个图片并让你了解我们何时使用其中一个,让我们考虑以下场景:
- 当我们需要从数十亿个网站中提取数据时,就像谷歌这样的搜索引擎所做的那样,我们会发现像 Elasticsearch 及 Hadoop 这样的工具非常有用和高效。
- 当我们需要以这样一种方式存储数据并对其进行索引以便以后可以快速有效地搜索时,我们会发现像 Elasticsearch 这样的东西非常有用。
- 最后,当我们想要收集实时数据时,例如来自互联网上许多交易所的美元/欧元价格,我们会发现像 Logstash 这样的工具非常适合这项工作。
更多阅读,请参阅 https://elasticstack.blog.csdn ... 97392
如上所示,我们在左边的 macOS 中安装 Elasticsearch 及 Kibana,而在 Ubuntu OS 中安装 Hadoop。我们将以最新的 Elastic Stack 8.4.2 来进行展示。
Hadoop 是什么?
当我们需要收集、处理/转换和/或存储数千 GB、数千 TB 甚至更多的数据时,Hadoop 可能是完成这项工作的合适工具。它是从头开始构建的,考虑到这样的想法:
- 一次使用多台计算机(形成一个集群),以便它可以并行处理数据,从而更快地完成工作。我们可以这样想。如果一台服务器需要处理 100 TB 的数据,它可能会在 500 小时内完成。但是如果我们有 100 台服务器,每台只能取一部分数据,例如 server1 可以取第一个 TB,server2 可以取第二个 TB,以此类推。现在他们每个人都只有 1 TB 的数据要处理,而且他们都可以同时处理自己的数据部分。这样,工作可以在 5 小时内完成,而不是 500 小时。当然,这是理论上的和想象的,因为在实践中我们不会减少 100 倍所需的时间,但我们可以非常接近如果条件理想。
- 在需要时可以很容易地调整计算能力。有更多的数据要处理,而问题要复杂得多?将更多计算机添加到集群。从某种意义上说,这就像在超级计算机上增加了更多的 CPU 内核。
- 数据不断增长,因此 Hadoop 也必须能够轻松灵活地扩展其存储容量,以满足需求。我们添加到集群的每台计算机都会扩展 Hadoop 分布式文件系统 (HDFS) 的可用总存储空间。
- 与其他软件不同,它不仅会在硬件故障发生时尝试从硬件故障中恢复。设计理念实际上假设某些硬件肯定会失败。当有数千台计算机并行工作时,可以保证某处某处会不时出现故障。因此,默认情况下,Hadoop 创建数据块的副本并将它们分布在单独的硬件上,因此当偶尔的服务器起火或硬盘或 SSD 死机时,不会丢失任何内容。
总而言之,Hadoop 非常擅长摄取和处理大量信息。它将数据分布在集群中可用的多个节点上,并使用 MapReduce 编程模型在多台机器上同时处理数据(并行处理)。
但这听起来可能有点类似于 Elasticsearch 数据摄取工具所做的事情。尽管它们是为处理相当不同的场景而设计的,但它们有时可能会有些重叠。那么我们为什么以及何时使用其中一个而不是另一个呢?
Hadoop vs Logstash/Elasticsearch
首先,我们不应该考虑哪个比哪个更好。 每个人都擅长为其创造的工作。 每个都有优点和缺点。
为了尝试给你绘制一个图片并让你了解我们何时使用其中一个,让我们考虑以下场景:
- 当我们需要从数十亿个网站中提取数据时,就像谷歌这样的搜索引擎所做的那样,我们会发现像 Elasticsearch 及 Hadoop 这样的工具非常有用和高效。
- 当我们需要以这样一种方式存储数据并对其进行索引以便以后可以快速有效地搜索时,我们会发现像 Elasticsearch 这样的东西非常有用。
- 最后,当我们想要收集实时数据时,例如来自互联网上许多交易所的美元/欧元价格,我们会发现像 Logstash 这样的工具非常适合这项工作。
更多阅读,请参阅 https://elasticstack.blog.csdn ... 97392 收起阅读 »

ES7.17版本terms查询性能问题
背景
1.对于7版本(大版本)集群希望只维护一个版本,最终选择7.17,对同大版本的7.5版本集群进行升级
2.根据官方描述,_id放到堆外性能损失非常小可以忽略,且对BKD进行了优化
3.升级完成,一段时间之后,收到用户报障
4.抽样检查了下部分升级的集群,其中部分受到影响,部分不受影响。且每个集群内存均有一定优化(预期内)
调查&分析
1.发现is_deleted文档特别多,怀疑是7.17版本对于碎片过于敏感。做forcemerge,没什么效果。
2.GET _nodes/hot_threads 查看耗时部分,结果展示笼统,没得到关键信息。
3.给语句加上profile,查看耗时部分。
GET index-v1/_search
{"profile":"true","query":{"bool":{"filter":[{"term":{"xid":{"value":"11111111","boost":1.0}}},{"terms":{"status":[2,3,4],"boost":1.0}},{"terms":{"platform":["aaa","bbb"],"boost":1.0}},{"terms":{"pId":[1,2],"boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"sort":[{"time":{"order":"desc"}}]}
从脱敏的简化结果中可以看出来,主要是 status、pId 字段耗时高,这两个字段都是integer类型,并且使用了terms查询。
{
"took": 554,
"timed_out": false,
"_shards": {
"total": 3,
"successful": 3,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 5,
"relation": "eq"
},
"max_score": null,
"hits": [
...
]
},
"profile": {
"shards": [
{
"id": "[APxxxxxxxxxxxxxxQ][index-v1][0]",
"searches": [
{
"query": [
{
"type": "BooleanQuery",
"description": "#xid:111111111 #status:{2 3 4} #ConstantScore(platform:aaa platform:bbb) #pId:{1 2}",
"time_in_nanos": 415205306,
"breakdown": {
...
"build_scorer": 415028271
},
"children": [
{
"type": "TermQuery",
"description": "xid:111111111",
"time_in_nanos": 102656,
"breakdown": {
.....
"build_scorer": 86264
}
},
{
"type": "PointInSetQuery",
"description": "status:{2 3 4}",
"time_in_nanos": 220394978,
"breakdown": {
....
"build_scorer": 220385119
}
},
{
"type": "ConstantScoreQuery",
"description": "ConstantScore(platform:aaa platform:bbb)",
"time_in_nanos": 341845,
"breakdown": {
.....
"build_scorer": 282277
},
"children": [
{
"type": "BooleanQuery",
"description": "platform:aaa platform:bbb",
"time_in_nanos": 329042,
"breakdown": {
.....
"build_scorer": 277752
},
"children": [
{
"type": "TermQuery",
"description": "platform:aaa",
"time_in_nanos": 62446,
"breakdown": {
.....
"build_scorer": 37931
}
},
{
"type": "TermQuery",
"description": "platform:bbb",
"time_in_nanos": 15093,
"breakdown": {
.....
"build_scorer": 6981
}
}
]
}
]
},
{
"type": "PointInSetQuery",
"description": "pId:{1 2}",
"time_in_nanos": 194164297,
"breakdown": {
....
"build_scorer": 194160452
}
}
]
}
],
"rewrite_time": 40044,
"collector": [
{
"name": "SimpleFieldCollector",
"reason": "search_top_hits",
"time_in_nanos": 144012
}
]
}
]
4.单个的profile无法说明问题,进一步排查:使用arthas工具获取一段时间内的火焰图
可以看到主要就是BKD数据结构占用的CPU。
5.参考官方论坛相似问题:https://discuss.elastic.co/t/very-slow-search-performance-after-upgrade-to-7-16-1/296152/3
6.integer类型的terms查询性能较差,看起来官方描述的BKD相关优化指的是range
7.测试验证,将字段改成keyword,查看结果,CPU查询耗时恢复到正常范围
背景
1.对于7版本(大版本)集群希望只维护一个版本,最终选择7.17,对同大版本的7.5版本集群进行升级
2.根据官方描述,_id放到堆外性能损失非常小可以忽略,且对BKD进行了优化
3.升级完成,一段时间之后,收到用户报障
4.抽样检查了下部分升级的集群,其中部分受到影响,部分不受影响。且每个集群内存均有一定优化(预期内)
调查&分析
1.发现is_deleted文档特别多,怀疑是7.17版本对于碎片过于敏感。做forcemerge,没什么效果。
2.GET _nodes/hot_threads 查看耗时部分,结果展示笼统,没得到关键信息。
3.给语句加上profile,查看耗时部分。
GET index-v1/_search
{"profile":"true","query":{"bool":{"filter":[{"term":{"xid":{"value":"11111111","boost":1.0}}},{"terms":{"status":[2,3,4],"boost":1.0}},{"terms":{"platform":["aaa","bbb"],"boost":1.0}},{"terms":{"pId":[1,2],"boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"sort":[{"time":{"order":"desc"}}]}
从脱敏的简化结果中可以看出来,主要是 status、pId 字段耗时高,这两个字段都是integer类型,并且使用了terms查询。
{
"took": 554,
"timed_out": false,
"_shards": {
"total": 3,
"successful": 3,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 5,
"relation": "eq"
},
"max_score": null,
"hits": [
...
]
},
"profile": {
"shards": [
{
"id": "[APxxxxxxxxxxxxxxQ][index-v1][0]",
"searches": [
{
"query": [
{
"type": "BooleanQuery",
"description": "#xid:111111111 #status:{2 3 4} #ConstantScore(platform:aaa platform:bbb) #pId:{1 2}",
"time_in_nanos": 415205306,
"breakdown": {
...
"build_scorer": 415028271
},
"children": [
{
"type": "TermQuery",
"description": "xid:111111111",
"time_in_nanos": 102656,
"breakdown": {
.....
"build_scorer": 86264
}
},
{
"type": "PointInSetQuery",
"description": "status:{2 3 4}",
"time_in_nanos": 220394978,
"breakdown": {
....
"build_scorer": 220385119
}
},
{
"type": "ConstantScoreQuery",
"description": "ConstantScore(platform:aaa platform:bbb)",
"time_in_nanos": 341845,
"breakdown": {
.....
"build_scorer": 282277
},
"children": [
{
"type": "BooleanQuery",
"description": "platform:aaa platform:bbb",
"time_in_nanos": 329042,
"breakdown": {
.....
"build_scorer": 277752
},
"children": [
{
"type": "TermQuery",
"description": "platform:aaa",
"time_in_nanos": 62446,
"breakdown": {
.....
"build_scorer": 37931
}
},
{
"type": "TermQuery",
"description": "platform:bbb",
"time_in_nanos": 15093,
"breakdown": {
.....
"build_scorer": 6981
}
}
]
}
]
},
{
"type": "PointInSetQuery",
"description": "pId:{1 2}",
"time_in_nanos": 194164297,
"breakdown": {
....
"build_scorer": 194160452
}
}
]
}
],
"rewrite_time": 40044,
"collector": [
{
"name": "SimpleFieldCollector",
"reason": "search_top_hits",
"time_in_nanos": 144012
}
]
}
]
4.单个的profile无法说明问题,进一步排查:使用arthas工具获取一段时间内的火焰图
可以看到主要就是BKD数据结构占用的CPU。
5.参考官方论坛相似问题:https://discuss.elastic.co/t/very-slow-search-performance-after-upgrade-to-7-16-1/296152/3
6.integer类型的terms查询性能较差,看起来官方描述的BKD相关优化指的是range
7.测试验证,将字段改成keyword,查看结果,CPU查询耗时恢复到正常范围
收起阅读 »
API 网关 Apache APISIX 集成 Elasticsearch 实现实时日志监控
本文将为你介绍 Apache APISIX 的 elasticsearch-logger 插件的相关信息,以及如何通过此插件获取 APISIX 的实时日志。
背景信息
Apache APISIX 是一个动态、实时、高性能的 API 网关,提供了负载均衡、动态上游、灰度发布、服务熔断、身份认证、可观测性等丰富的流量管理功能。作为 API 网关,Apache APISIX 不仅拥有丰富的插件,而且支持插件的热加载。
Elasticsearch 是一个基于 Lucene 库的搜索引擎。它提供了分布式、RESTful 风格的搜索和数据分析引擎,具有可扩展性、可分布式部署和可进行相关度搜索等特点,能够解决不断涌现出的各种用例。同时还可以集中存储用户数据,帮助用户发现意料之中以及意料之外的情况。
插件介绍
APISIX 以 HTTP 请求的方式向 Elasticsearch 发送 APISIX 的 Runtime 日志。插件 elasticsearch-logger
采用 bulk 的格式进行日志上报,这允许 APISIX 可以将多条日志合并后再进行上报,这使得 APISIX 在对 Elasticsearch 进行日志上报方面更加灵活并且具有较好的性能。你可以参考文档 APISIX 批处理器 对日志合进行更加细致的配置。
配置步骤
首先,你需要安装完成 APISIX,本文所有步骤基于 Centos 7.5 系统进行。详细的安装步骤参考 APISIX 安装指南。
步骤1:启动 Elasticsearch
本示例只演示了通过 docker-compose
启动 Elasticsearch 单节点的方式,其它启动方式可参考 Elasticsearch 官方文档。
# 使用 docker-compose 启动 1 个 Elasticsearch 节点, 1 个 kibana
version: '3.8'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.1
container_name: elasticsearch
environment:
ES_JAVA_OPTS: -Xms512m -Xmx512m
discovery.type: single-node
xpack.security.enabled: 'false'
networks:
- es-net
ports:
- "9200:9200"
- "9300:9300"
kibana:
image: docker.elastic.co/kibana/kibana:7.17.1
container_name: kibana
environment:
ELASTICSEARCH_HOSTS: http://elasticsearch:9200
I18N_LOCALE: zh-CN
networks:
- es-net
depends_on:
- elasticsearch
ports:
- "5601:5601"
networks:
es-net:
driver: bridge
步骤2:创建路由并配置插件
APISIX 默认配置文件中已启用 elasticsearch-logger
插件,所以你只需要通过下方命令创建路由并配置 elasticsearch-logger
插件就可以在 APISIX 中正常使用了。
curl http://127.0.0.1:9180/apisix/admin/routes/1 \
-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"plugins":{
"elasticsearch-logger":{
"endpoint_addr":"http://127.0.0.1:9200",
"field":{
"index":"services",
"type":"collector"
},
"ssl_verify":false,
"retry_delay":1,
"buffer_duration":60,
"max_retry_count":0,
"batch_max_size":1000,
"inactive_timeout":5,
"name":"elasticsearch-logger"
}
},
"upstream":{
"type":"roundrobin",
"nodes":{
"127.0.0.1:1980":1
}
},
"uri":"/elasticsearch.do"
}'
上述代码中配置了 Elasticsearch 地址、目标 field
,用户名与密码。
通过上述设置,就可以实现将 /elasticsearch.do
路径的 API 请求日志发送至 Elasticsearch 的功能。
步骤3:发送请求
接下来我们通过 API 发送一些请求。
curl -i http://127.0.0.1:9080/elasticsearch.do\?q\=hello
HTTP/1.1 200 OK
...
hello, world
此时你可以登录 Kibana 控制台检索查看相关日志:
自定义日志结构
当然,在使用过程中我们也可以通过 elasticsearch-logger
插件提供的元数据配置,来设置发送至 Elasticsearch 的日志数据结构。通过设置 log_format
数据,可以控制发送的数据类型。
比如以下数据中的 $host
、$time_iso8601
等,都是来自于 NGINX 提供的内置变量;也支持如 $route_id
和 $service_id
等 Apache APISIX 提供的变量配置。
curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/elasticsearch-logger \
-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"log_format": {
"host": "$host",
"@timestamp": "$time_iso8601",
"client_ip": "$remote_addr"
}
}'
通过发送请求进行简单测试,可以看到上述日志结构设置已生效。目前 Apache APISIX 提供多种日志格式模板,在配置上具有极大的灵活性,更多日志格式细节可参考 Apache APISIX 官方文档。
此时你可以登录 Kibana 控制台检索查看相关自定义日志:
如需关闭自定义日志结构,可参考下方操作。
curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/elasticsearch-logger \
-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X DELETE
此时,插件 elasticsearch-logger
将使用默认格式上报日志。
关闭插件
如使用完毕,只需移除路由配置中 elasticsearch-logger
插件相关的配置并保存,即可关闭路由上的插件。得益于 Apache APISIX 的动态化优势,开启和关闭插件的过程都不需要重启 Apache APISIX。
curl http://127.0.0.1:9080/apisix/admin/routes/1 \
-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"methods": ["GET"],
"uri": "/hello",
"plugins": {},
"upstream": {
"type": "roundrobin",
"nodes": {
"127.0.0.1:1980": 1
}
}
}'
总结
本文为大家介绍了关于 elasticsearch-logger 插件的功能与使用步骤,更多关于 elasticsearch-logger 插件说明和完整配置列表,可以参考官方文档。
也欢迎随时在 GitHub Discussions 中发起讨论,或通过邮件列表进行交流。
本文将为你介绍 Apache APISIX 的 elasticsearch-logger 插件的相关信息,以及如何通过此插件获取 APISIX 的实时日志。
背景信息
Apache APISIX 是一个动态、实时、高性能的 API 网关,提供了负载均衡、动态上游、灰度发布、服务熔断、身份认证、可观测性等丰富的流量管理功能。作为 API 网关,Apache APISIX 不仅拥有丰富的插件,而且支持插件的热加载。
Elasticsearch 是一个基于 Lucene 库的搜索引擎。它提供了分布式、RESTful 风格的搜索和数据分析引擎,具有可扩展性、可分布式部署和可进行相关度搜索等特点,能够解决不断涌现出的各种用例。同时还可以集中存储用户数据,帮助用户发现意料之中以及意料之外的情况。
插件介绍
APISIX 以 HTTP 请求的方式向 Elasticsearch 发送 APISIX 的 Runtime 日志。插件 elasticsearch-logger
采用 bulk 的格式进行日志上报,这允许 APISIX 可以将多条日志合并后再进行上报,这使得 APISIX 在对 Elasticsearch 进行日志上报方面更加灵活并且具有较好的性能。你可以参考文档 APISIX 批处理器 对日志合进行更加细致的配置。
配置步骤
首先,你需要安装完成 APISIX,本文所有步骤基于 Centos 7.5 系统进行。详细的安装步骤参考 APISIX 安装指南。
步骤1:启动 Elasticsearch
本示例只演示了通过 docker-compose
启动 Elasticsearch 单节点的方式,其它启动方式可参考 Elasticsearch 官方文档。
# 使用 docker-compose 启动 1 个 Elasticsearch 节点, 1 个 kibana
version: '3.8'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.17.1
container_name: elasticsearch
environment:
ES_JAVA_OPTS: -Xms512m -Xmx512m
discovery.type: single-node
xpack.security.enabled: 'false'
networks:
- es-net
ports:
- "9200:9200"
- "9300:9300"
kibana:
image: docker.elastic.co/kibana/kibana:7.17.1
container_name: kibana
environment:
ELASTICSEARCH_HOSTS: http://elasticsearch:9200
I18N_LOCALE: zh-CN
networks:
- es-net
depends_on:
- elasticsearch
ports:
- "5601:5601"
networks:
es-net:
driver: bridge
步骤2:创建路由并配置插件
APISIX 默认配置文件中已启用 elasticsearch-logger
插件,所以你只需要通过下方命令创建路由并配置 elasticsearch-logger
插件就可以在 APISIX 中正常使用了。
curl http://127.0.0.1:9180/apisix/admin/routes/1 \
-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"plugins":{
"elasticsearch-logger":{
"endpoint_addr":"http://127.0.0.1:9200",
"field":{
"index":"services",
"type":"collector"
},
"ssl_verify":false,
"retry_delay":1,
"buffer_duration":60,
"max_retry_count":0,
"batch_max_size":1000,
"inactive_timeout":5,
"name":"elasticsearch-logger"
}
},
"upstream":{
"type":"roundrobin",
"nodes":{
"127.0.0.1:1980":1
}
},
"uri":"/elasticsearch.do"
}'
上述代码中配置了 Elasticsearch 地址、目标 field
,用户名与密码。
通过上述设置,就可以实现将 /elasticsearch.do
路径的 API 请求日志发送至 Elasticsearch 的功能。
步骤3:发送请求
接下来我们通过 API 发送一些请求。
curl -i http://127.0.0.1:9080/elasticsearch.do\?q\=hello
HTTP/1.1 200 OK
...
hello, world
此时你可以登录 Kibana 控制台检索查看相关日志:
自定义日志结构
当然,在使用过程中我们也可以通过 elasticsearch-logger
插件提供的元数据配置,来设置发送至 Elasticsearch 的日志数据结构。通过设置 log_format
数据,可以控制发送的数据类型。
比如以下数据中的 $host
、$time_iso8601
等,都是来自于 NGINX 提供的内置变量;也支持如 $route_id
和 $service_id
等 Apache APISIX 提供的变量配置。
curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/elasticsearch-logger \
-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"log_format": {
"host": "$host",
"@timestamp": "$time_iso8601",
"client_ip": "$remote_addr"
}
}'
通过发送请求进行简单测试,可以看到上述日志结构设置已生效。目前 Apache APISIX 提供多种日志格式模板,在配置上具有极大的灵活性,更多日志格式细节可参考 Apache APISIX 官方文档。
此时你可以登录 Kibana 控制台检索查看相关自定义日志:
如需关闭自定义日志结构,可参考下方操作。
curl http://127.0.0.1:9180/apisix/admin/plugin_metadata/elasticsearch-logger \
-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X DELETE
此时,插件 elasticsearch-logger
将使用默认格式上报日志。
关闭插件
如使用完毕,只需移除路由配置中 elasticsearch-logger
插件相关的配置并保存,即可关闭路由上的插件。得益于 Apache APISIX 的动态化优势,开启和关闭插件的过程都不需要重启 Apache APISIX。
curl http://127.0.0.1:9080/apisix/admin/routes/1 \
-H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"methods": ["GET"],
"uri": "/hello",
"plugins": {},
"upstream": {
"type": "roundrobin",
"nodes": {
"127.0.0.1:1980": 1
}
}
}'
总结
本文为大家介绍了关于 elasticsearch-logger 插件的功能与使用步骤,更多关于 elasticsearch-logger 插件说明和完整配置列表,可以参考官方文档。
也欢迎随时在 GitHub Discussions 中发起讨论,或通过邮件列表进行交流。
收起阅读 »
Observability:使用 Elastic Agent 来进行 Uptime 监控
Beats:使用 Heartbeat 进行 Uptime 监控
Observability:使用 Elastic Agent 来摄入日志及指标 - Elastic Stack 8.0
Observability:如何使用 Elastic Agents 把微服务的数据摄入到 Elasticsearch 中
更多阅读,请参阅 https://elasticstack.blog.csdn ... 29912
Beats:使用 Heartbeat 进行 Uptime 监控
Observability:使用 Elastic Agent 来摄入日志及指标 - Elastic Stack 8.0
Observability:如何使用 Elastic Agents 把微服务的数据摄入到 Elasticsearch 中
更多阅读,请参阅 https://elasticstack.blog.csdn ... 29912 收起阅读 »