如何使用 DataX 连接 Easysearch
yangmf2040 发表了文章 • 0 个评论 • 1014 次浏览 • 6 天前
DataX
DataX 是阿里开源的一款离线数据同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。
本篇主要介绍 DataX 如何将数据写入到 Easysearch,对于各种数据源的连接不会做深入的探讨,感兴趣的小伙伴可以访问 [DataX](https://github.com/alibaba/Dat ... ion.md) 的 Github 仓库查看详情。
下载与安装
DataX 无需安装,下载后解压即可使用。
系统需求:
- JDK 1.8 及以上
- Python2 或 3
创建任务配置文件
每个数据同步的操作可称为一个任务,任务的配置文件定义了数据源(reader)、数据目的(writer) ,以及任务的设置信息,如并发数、速度控制等。DataX 集成了如此多的数据源,如果靠纯手工编写任务配置显然不现实。官方也出了个命令可以根据指定的数据源和数据目的帮助大家生成任务配置。
shell<br /> python datax.py -r {YOUR_READER} -w {YOUR_WRITER}<br />
测试配置文件
此次演示使用 streamreader 和 elasticsearchwriter 作为数据源和数据目的,任务配置如下:
json<br /> {<br /> "job": {<br /> "content": [<br /> {<br /> "reader": {<br /> "name": "streamreader",<br /> "parameter": {<br /> "sliceRecordCount": 10000,<br /> "column": [<br /> {<br /> "type": "long",<br /> "value": "10"<br /> },<br /> {<br /> "type": "string",<br /> "value": "hello,你好,世界-DataX"<br /> },<br /> {<br /> "type": "string",<br /> "value": "hello,你好,Easysearch"<br /> }<br /> ]<br /> }<br /> },<br /> "writer": {<br /> "name": "elasticsearchwriter",<br /> "parameter": {<br /> "endpoint": "<a href="http://localhost:9200"" rel="nofollow" target="_blank">http://localhost:9200"</a>,<br /> "accessId": "admin",<br /> "accessKey": "1ef0c661d8562aaa06be",<br /> "index": "yf-test",<br /> "column": [<br /> { "name": "no", "type": "long" },<br /> { "name": "content", "type": "keyword" },<br /> { "name": "content2", "type": "keyword" }<br /> ]<br /> }<br /> }<br /> }<br /> ],<br /> "setting": {<br /> "speed": {<br /> "channel": 50<br /> }<br /> }<br /> }<br /> }<br />
streamreader 是一个从内存读取数据的插件, 它主要用来快速生成期望的数据并对写入插件进行测试。
我们用 streamreader 构造了 10000 个文档,文档含三个字段,任务启动了 50 个 channel 进行数据发送,结果就是共计发送 50w 个文档。
elasticssearchwriter 指定了 Easysearch 的连接信息:
- endpoint: Easysearch 的地址和端口
- accessId: 用户名
- accessKey: 密码
- index: 写入索引名
- column: 对 reader 发来数据的 schema 定义
- batchsize: 默认 1000
这次我们 Easysearch 开启的 http 服务,因为 DataX 的 elasticsearchwriter 无法跳过证书验证。对于必须使用 https 的场景,可使用 [INFINI Gateway](https://infinilabs.cn/docs/latest/gateway/) 代理 ES 服务,提供 http 通道给离线数据同步专用。
⚠️ 注意:
不同的 reader、writer 对 sliceRecordCount 和 channel 会有不同的行为。
Easysearch
本次测试使用的 [Easysearch](https://infinilabs.cn/download/) 版本是 1.9.0,需要注意是 Easysearch 要开启兼容性参数:
yml<br /> elasticsearch.api_compatibility: true<br />
否则创建索引报错退出。(实际索引创建成功了但是 mapping 信息是空的)

运行任务
编辑好任务配置文件后,下一步就是执行任务。
shell<br /> python3 datax.py yf-test.json<br />

写入数据时索引不存在,Datax 根据 schema 定义创建了索引。

OK 任务执行完毕,写入 50w 个文档耗时 10 秒。
如果有其他问题欢迎与我联系。

- endpoint: Easysearch 的地址和端口
引爆知识革命!Easysearch+携手+DeepSeek+打造下一代智能问答系统
yangmf2040 发表了文章 • 0 个评论 • 419 次浏览 • 2 天前
去年我们尝试过使用 Easysearch + 千问 2 大模型打造一个[企业内部知识问答系统](https://infinilabs.cn/blog/202 ... earch/),今年又有更加给力的大模型出现了--DeepSeek,性能对标 OpenAI o1 正式版。而且 Easysearch 对比去年也有了不少进步,是时候让我们升级下问答系统了。
DeepSeek
2025 年 1 月 20 日,人工智能领域迎来里程碑式突破!深度求索(DeepSeek)正式发布新一代推理大模型 DeepSeek-R1,不仅实现与 OpenAI 最新 o1 正式版的性能对标,更以全栈开放的生态布局引发行业震动。DeepSeek-R1 是首个遵循 MIT License 开源协议的高性能推理模型,完全开源,不限制商用,无需申请,极大地推动了 AI 技术的开放与共享。

下载模型
我们使用 ollama 下载运行 DeepSeek-R1,根据本地资源情况选择一个大小合适的版本:8b。

- 8b 蒸馏模型源自 Llama3.1-8B-Base
- 7b 蒸馏模型源自 Qwen-2.5 系列
这两个可能是个人用户使用最多的选择,大家资源充足的可以都下载下来对比下效果。

由于是升级,我们只需在原有程序基础上替换新版本的 Easysearch 和集成 DeepSeek 即可,[Easysearch](https://infinilabs.cn/download/) 升级成新版本 1.10.1,程序框架和 embedding 模型 (mxbai-embed-large:latest) 仍然保持不变。

数据准备
跟上次一样,使用 "INFINI 产品安装手册.PDF" 作为知识内容,通过程序将文档内容切片、转换成向量后写入 Easysearch 存储,然后结合大模型对其中的内容进行提问。
程序调整
程序代码需要调整 LLM 为 deepseek-r1:8b。另外本地主机资源有限,为节约时间,取消上个版本的用户问题改写功能(注释部分)。定义新的 retriever 和 qa_chain 直接将用户问题和 context 信息发送给大模型。
```python实例化一个大模型工具
from langchain_community.chat_models import ChatOllama
llm = ChatOllama(model="deepseek-r1:8b")
from langchain.prompts import PromptTemplate
my_template = PromptTemplate(
input_variables=["question"],
template="""You are an AI language model assistant. Your task is
to generate 3 different versions of the given user
question in Chinese to retrieve relevant documents from a vector database.
By generating multiple perspectives on the user question,
your goal is to help the user overcome some of the limitations
of distance-based similarity search. Provide these alternative
questions separated by newlines. Original question: {question}""",
)
实例化一个MultiQueryRetriever
retriever_from_llm = MultiQueryRetriever.from_llm(
retriever=docsearch.as_retriever(),
llm=llm,
prompt=my_template,
include_original=True)
retriever = docsearch.as_retriever()实例化一个RetrievalQA链
qa_chain = RetrievalQA.from_chain_type(llm, retriever=retriever)
```
至此程序修改已经完成,原程序算上注释也不过 100 来行,大家感兴趣的可以去查看[原博客](https://infinilabs.cn/blog/202 ... earch/)。
效果测试
模拟用户提问:网关运行后监听哪个端口。

系统回答如下。

在回答中,可以看到 DeepSeek 的"思考"过程,另外回答结果也非常正确,文档中原文还是用的英语 INFINI Gateway 表示网关。

模拟用户提问:LOGGING_ES_ENDPOINT 有什么用。

系统回答如下。

文档原文内容如下。

好了,我对 DeepSeek 的表现很满意,至此知识问答系统就升级完了。
如有任何问题,请随时联系我,期待与您交流!
关于 Easysearch

INFINI Easysearch 是一个分布式的搜索型数据库,实现非结构化数据检索、全文检索、向量检索、地理位置信息查询、组合索引查询、多语种支持、聚合分析等。Easysearch 可以完美替代 Elasticsearch,同时添加和完善多项企业级功能。Easysearch 助您拥有简洁、高效、易用的搜索体验。
官网文档:<https://infinilabs.cn/docs/latest/easysearch>
作者:杨帆,极限科技(INFINI Labs)高级解决方案架构师、《老杨玩搜索》栏目 B 站 UP 主,拥有十余年金融行业服务工作经验,熟悉 Linux、数据库、网络等领域。目前主要从事 Easysearch、Elasticsearch 等搜索引擎的技术支持工作,服务国内私有化部署的客户。
Easysearch 磁盘水位线注意事项
yangmf2040 发表了文章 • 0 个评论 • 404 次浏览 • 2 天前
[Easyearch](https://infinilabs.cn/products/easysearch/) 为了防止索引将磁盘空间完全占满,使用磁盘水位线进行磁盘空间控制。具体来说有三条磁盘水位线:low、high、flood。
低水位线
通过参数 cluster.routing.allocation.disk.watermark.low
进行设置,默认值 85%。也可设置成一个具体值,比如:400mb,代表须保留 400mb 空闲磁盘空间,否则就算超水位线。
一旦节点磁盘使用率超过了低水位线,Easysearch 集群不会将分片分配至该节点,但是不影响新建索引的主分片分配到该节点,新建索引的副本分配不能分配到该节点。
如果所有节点都超过高水位线,此时创建新索引会导致集群状态变成 yellow。
高水位线
通过参数 cluster.routing.allocation.disk.watermark.high
进行设置,默认值 90%。也可设置成一个具体值,比如:300mb,代表须保留 300mb 空闲磁盘空间,否则就算超水位线。
一旦节点磁盘使用率超过了高水位线,Easysearch 集群会尝试将分片移动到其他节点,不允许任何分片分配到该节点。
如果所有节点都超过高水位线,此时创建新索引会导致集群状态变成 red。

洪水位线
通过参数 cluster.routing.allocation.disk.watermark.flood_stage
进行设置,默认值 95%。也可设置成一个具体值,比如:200mb,代表须保留 200mb 空闲磁盘空间,否则就算超水位线。
一旦节点磁盘使用率超过了洪水位线,Easysearch 集群会为该节点上的所有索引添加只读锁,包括系统索引。只读锁会阻止新数据写入,当磁盘利用率低于高水位线时,只读锁会自动释放。
针对节点磁盘使用率,我们可以使用 [INFINI Console 进行节点磁盘使用率告警](https://docs.infinilabs.com/co ... usage/),便于我们及时发现问题苗头,提前进行处理。有任何问题,欢迎加我微信沟通。
关于 Easysearch

INFINI Easysearch 是一个分布式的搜索型数据库,实现非结构化数据检索、全文检索、向量检索、地理位置信息查询、组合索引查询、多语种支持、聚合分析等。Easysearch 可以完美替代 Elasticsearch,同时添加和完善多项企业级功能。Easysearch 助您拥有简洁、高效、易用的搜索体验。
官网文档:<https://infinilabs.cn/docs/latest/easysearch>
作者:杨帆,极限科技(INFINI Labs)高级解决方案架构师、《老杨玩搜索》栏目 B 站 UP 主,拥有十余年金融行业服务工作经验,熟悉 Linux、数据库、网络等领域。目前主要从事 Easysearch、Elasticsearch 等搜索引擎的技术支持工作,服务国内私有化部署的客户。