找到问题的解决办法了么?

【搜索客社区日报】第1963期 (2024-12-31)


1. 在SpringBoot里用注解生成ES index(需要梯子)
https://serdaralkancode.medium ... 807a8
2. ES 里的数据摄取管道(需要梯子)
https://medium.com/%40imadsadd ... c97ae
3. Docker 里的ES和kibana的安全设置(需要梯子)
https://umasrinivask.medium.co ... cb3fd
编辑:斯蒂文
更多资讯:http://news.searchkit.cn
 
继续阅读 »

1. 在SpringBoot里用注解生成ES index(需要梯子)
https://serdaralkancode.medium ... 807a8
2. ES 里的数据摄取管道(需要梯子)
https://medium.com/%40imadsadd ... c97ae
3. Docker 里的ES和kibana的安全设置(需要梯子)
https://umasrinivask.medium.co ... cb3fd
编辑:斯蒂文
更多资讯:http://news.searchkit.cn
  收起阅读 »

从 Elastic 迁移到 Easysearch 指引

从 Elasticsearch 迁移到 Easysearch 需要考虑多个方面,这取决于当前使用的 Elasticsearch 版本、能容忍的停机时间、应用需求等。在此背景下,我们梳理了一下通用的升级指引,方便大家进行迁移工作。

迁移路径

Elasticsearch 版本 快照兼容 推荐升级方法
5.x 使用 INFINI Console 迁移
6.x 快照恢复迁移
7.0.0 - 7.10.2 快照恢复迁移
>7.11.0 使用 INFINI Console 迁移

之前有同事做过相关测试,详情请移步这里

快照恢复迁移

  1. 部署新的 Easysearch 集群,如果有使用插件(如 IK),也一并安装。
  2. 将备份仓库注册到 Easysearch 集群。
  3. 在 Easysearch 中设置需要使用的用户名和密码信息。
  4. 原 Elasticsearch 集群进行快照备份。
  5. 在 Easysearch 集群中进行备份还原。
  6. 另部署一套应用连接 Easysearch 集群,进行数据、功能验证。
  7. 停止应用写入新的数据到 Elasticsearch。
  8. 原 Elasticsearch 集群进行快照备份。
  9. 在 Easysearch 集群中进行备份还原。
  10. 再次使用应用验证数据、功能正常。
  11. 切换,老应用下线使用新应用或者老应用修改地址连接 Easysearch 集群。

INFINI Console 迁移

  1. 部署新的 Easysearch 集群及其插件(如 IK)。
  2. 部署 INFINI Console、Gateway 程序。
  3. 将 Elasticsearch 和 Easysearch 注册到 INFINI Console 中。
  4. 在 Easysearch 中设置需要使用的用户名和密码信息。
  5. 建立数据迁移任务,对业务索引进行迁移,建议启用压缩功能。
  6. 另部署一套应用连接 Easysearch 集群,进行数据、功能验证。
  7. 停止应用写入新的数据到 Elasticsearch。
  8. 再次建立数据迁移任务,设置条件,只迁移增量数据。
  9. 再次使用应用验证数据、功能正常。
  10. 切换,老应用下线使用新应用或者老应用修改地址连接 Easysearch 集群。

客户端调整

如果要继续使用 Java High Level REST Client,建议将版本调整到 7.10.2 。当然更建议的是使用 Easysearch 的客户端,更轻更快,构建查询,跟搭积木一样简单。

开源事业

极限科技(INFINI Labs) 一直致力于为开发者和企业提供优质的开源工具,提升整个技术生态的活力。除了维护国内最流行的分词器 analysis-ik 和 analysis-pinyin ,也在不断推动更多高质量开源产品的诞生。

在极限科技成立三周年之际,公司宣布以下产品和工具已全面开源:

以上开源软件都可以在 Github 上面找到:https://github.com/infinilabs

欢迎大家一起参与到开源工具的维护、贡献当中来,别忘了 Star🌟 支持一下!!!

如果您对迁移过程有任何疑问,欢迎与我讨论。

继续阅读 »

从 Elasticsearch 迁移到 Easysearch 需要考虑多个方面,这取决于当前使用的 Elasticsearch 版本、能容忍的停机时间、应用需求等。在此背景下,我们梳理了一下通用的升级指引,方便大家进行迁移工作。

迁移路径

Elasticsearch 版本 快照兼容 推荐升级方法
5.x 使用 INFINI Console 迁移
6.x 快照恢复迁移
7.0.0 - 7.10.2 快照恢复迁移
>7.11.0 使用 INFINI Console 迁移

之前有同事做过相关测试,详情请移步这里

快照恢复迁移

  1. 部署新的 Easysearch 集群,如果有使用插件(如 IK),也一并安装。
  2. 将备份仓库注册到 Easysearch 集群。
  3. 在 Easysearch 中设置需要使用的用户名和密码信息。
  4. 原 Elasticsearch 集群进行快照备份。
  5. 在 Easysearch 集群中进行备份还原。
  6. 另部署一套应用连接 Easysearch 集群,进行数据、功能验证。
  7. 停止应用写入新的数据到 Elasticsearch。
  8. 原 Elasticsearch 集群进行快照备份。
  9. 在 Easysearch 集群中进行备份还原。
  10. 再次使用应用验证数据、功能正常。
  11. 切换,老应用下线使用新应用或者老应用修改地址连接 Easysearch 集群。

INFINI Console 迁移

  1. 部署新的 Easysearch 集群及其插件(如 IK)。
  2. 部署 INFINI Console、Gateway 程序。
  3. 将 Elasticsearch 和 Easysearch 注册到 INFINI Console 中。
  4. 在 Easysearch 中设置需要使用的用户名和密码信息。
  5. 建立数据迁移任务,对业务索引进行迁移,建议启用压缩功能。
  6. 另部署一套应用连接 Easysearch 集群,进行数据、功能验证。
  7. 停止应用写入新的数据到 Elasticsearch。
  8. 再次建立数据迁移任务,设置条件,只迁移增量数据。
  9. 再次使用应用验证数据、功能正常。
  10. 切换,老应用下线使用新应用或者老应用修改地址连接 Easysearch 集群。

客户端调整

如果要继续使用 Java High Level REST Client,建议将版本调整到 7.10.2 。当然更建议的是使用 Easysearch 的客户端,更轻更快,构建查询,跟搭积木一样简单。

开源事业

极限科技(INFINI Labs) 一直致力于为开发者和企业提供优质的开源工具,提升整个技术生态的活力。除了维护国内最流行的分词器 analysis-ik 和 analysis-pinyin ,也在不断推动更多高质量开源产品的诞生。

在极限科技成立三周年之际,公司宣布以下产品和工具已全面开源:

以上开源软件都可以在 Github 上面找到:https://github.com/infinilabs

欢迎大家一起参与到开源工具的维护、贡献当中来,别忘了 Star🌟 支持一下!!!

如果您对迁移过程有任何疑问,欢迎与我讨论。

收起阅读 »

Easysearch 可搜索快照功能,看这篇就够了

可搜索快照功能改变了我们对备份数据的查询方式。以往要查询备份数据时,要先找到备份数据所在的快照,然后在一个合适的环境中恢复快照,最后再发起请求查询数据。这个处理路径很长,而且很消耗时间。可搜索快照功能将大大简化该处理路径,节约时间。

角色设置

相信你对节点角色的概念已经有所熟悉。要启用可搜索快照功能,Easysearch 集群中必须至少有一个节点拥有 search 角色。参考设置如下。

node.roles: ["search"]
node.search.cache.size: 500mb
  • node.roles: 指定节点角色,只有 search 角色的节点才能去搜索快照中的数据。
  • node.search.cache.size: 执行快照搜索时,数据缓存大小。

混合角色设置,参考如下。

node.roles: ["master","data","search","ingest"]
node.search.cache.size: 500mb

创建快照

可搜索快照功能使用普通快照作为基础,创建快照命令不变。比如我创建且备份个 infini 索引。

# 创建 infini 索引
POST infini/_doc
{
  "test":"Searchable snapshots"
}

# 创建快照备份 infini 索引
PUT _snapshot/my-fs-repository/1
{
  "indices": "infini",
  "include_global_state": false
}

创建快照索引

可搜索快照功能的核心是搜索快照中的索引,这一步是通过快照索引实现的。为了和集群中的普通索引区别开来,我们将实际存储在快照中的索引称为快照索引。通过使用 Easysearch 的 _restore API 并指定 remote_snapshot 存储类型来创建快照索引。

创建快照索引时,注意名称不能与当前索引名称重复。通常我们备份完索引后,可删除索引释放节点磁盘空间,创建快照索引时默认使用原来的名称。

# 删除 infini 索引释放磁盘空间
DELETE infini

# 创建快照索引,使用原索引名称
POST /_snapshot/my-fs-repository/1/_restore
{
  "indices": "infini",
  "include_global_state": false,
  "include_aliases": false,
  "storage_type": "remote_snapshot"
}

创建快照索引的命令和还原快照的命令非常相似,关键在于 storage_type 参数指定 remote_snapshot 存储类型。

如果要将快照中的全部索引都创建快照索引,可省略 indices 参数。

如果想在创建快照索引时指定不同的名字,参考下面的命令。

POST /_snapshot/my-fs-repository/1/_restore
{
  "indices": "infini",
  "include_global_state": false,
  "include_aliases": false,
  "storage_type": "remote_snapshot",
  "rename_pattern": "(infini)",
  "rename_replacement": "snapshot-$1"
}
  • rename_pattern: 使用此选项指定索引匹配的正则表达式。使用捕获组重用索引名称的部分。
  • rename_replacement: 使用 $0 包括整个匹配索引名称,使用 $1 包括第一个捕获组的内容,等等。

上述命令创建出来的快照索引名称是 snapshot-infini 。

经过上面一系列的操作,我已经拥有了两个快照索引。

搜索快照索引

我们通过搜索快照索引达到搜索快照数据的目的,令人开心的是搜索快照索引和搜索普通索引的语法完全一样。😀

常见问题

如何区分普通索引和快照索引呢?

我们可以通过索引的 settings 信息区分,快照索引的 settings 信息中有 store.type: remote_snapshot 信息,普通索引没有此信息。

快照索引能写入数据吗?

快照索引无法写入,数据仍然保持在快照格式中存储在存储库中,因此可搜索快照索引本质上是只读的。 任何尝试写入可搜索快照索引的操作都会导致错误。

快照索引不想要了怎么办?

直接删除,需要时再执行创建快照索引流程。此外快照在创建快照索引后,无法直接删除快照,要先删除快照索引。

如果您对上述内容有任何疑问,欢迎与我讨论。

继续阅读 »

可搜索快照功能改变了我们对备份数据的查询方式。以往要查询备份数据时,要先找到备份数据所在的快照,然后在一个合适的环境中恢复快照,最后再发起请求查询数据。这个处理路径很长,而且很消耗时间。可搜索快照功能将大大简化该处理路径,节约时间。

角色设置

相信你对节点角色的概念已经有所熟悉。要启用可搜索快照功能,Easysearch 集群中必须至少有一个节点拥有 search 角色。参考设置如下。

node.roles: ["search"]
node.search.cache.size: 500mb
  • node.roles: 指定节点角色,只有 search 角色的节点才能去搜索快照中的数据。
  • node.search.cache.size: 执行快照搜索时,数据缓存大小。

混合角色设置,参考如下。

node.roles: ["master","data","search","ingest"]
node.search.cache.size: 500mb

创建快照

可搜索快照功能使用普通快照作为基础,创建快照命令不变。比如我创建且备份个 infini 索引。

# 创建 infini 索引
POST infini/_doc
{
  "test":"Searchable snapshots"
}

# 创建快照备份 infini 索引
PUT _snapshot/my-fs-repository/1
{
  "indices": "infini",
  "include_global_state": false
}

创建快照索引

可搜索快照功能的核心是搜索快照中的索引,这一步是通过快照索引实现的。为了和集群中的普通索引区别开来,我们将实际存储在快照中的索引称为快照索引。通过使用 Easysearch 的 _restore API 并指定 remote_snapshot 存储类型来创建快照索引。

创建快照索引时,注意名称不能与当前索引名称重复。通常我们备份完索引后,可删除索引释放节点磁盘空间,创建快照索引时默认使用原来的名称。

# 删除 infini 索引释放磁盘空间
DELETE infini

# 创建快照索引,使用原索引名称
POST /_snapshot/my-fs-repository/1/_restore
{
  "indices": "infini",
  "include_global_state": false,
  "include_aliases": false,
  "storage_type": "remote_snapshot"
}

创建快照索引的命令和还原快照的命令非常相似,关键在于 storage_type 参数指定 remote_snapshot 存储类型。

如果要将快照中的全部索引都创建快照索引,可省略 indices 参数。

如果想在创建快照索引时指定不同的名字,参考下面的命令。

POST /_snapshot/my-fs-repository/1/_restore
{
  "indices": "infini",
  "include_global_state": false,
  "include_aliases": false,
  "storage_type": "remote_snapshot",
  "rename_pattern": "(infini)",
  "rename_replacement": "snapshot-$1"
}
  • rename_pattern: 使用此选项指定索引匹配的正则表达式。使用捕获组重用索引名称的部分。
  • rename_replacement: 使用 $0 包括整个匹配索引名称,使用 $1 包括第一个捕获组的内容,等等。

上述命令创建出来的快照索引名称是 snapshot-infini 。

经过上面一系列的操作,我已经拥有了两个快照索引。

搜索快照索引

我们通过搜索快照索引达到搜索快照数据的目的,令人开心的是搜索快照索引和搜索普通索引的语法完全一样。😀

常见问题

如何区分普通索引和快照索引呢?

我们可以通过索引的 settings 信息区分,快照索引的 settings 信息中有 store.type: remote_snapshot 信息,普通索引没有此信息。

快照索引能写入数据吗?

快照索引无法写入,数据仍然保持在快照格式中存储在存储库中,因此可搜索快照索引本质上是只读的。 任何尝试写入可搜索快照索引的操作都会导致错误。

快照索引不想要了怎么办?

直接删除,需要时再执行创建快照索引流程。此外快照在创建快照索引后,无法直接删除快照,要先删除快照索引。

如果您对上述内容有任何疑问,欢迎与我讨论。

收起阅读 »

【搜索客社区日报】第1962期 (2024-12-30)

1、Easysearch 可搜索快照功能,看这篇就够了
https://infinilabs.cn/blog/202 ... icle/

2、Spring Boot 集成 Easysearch 完整指南
https://infinilabs.cn/blog/202 ... tion/

3、DeepSeek-V3 是怎么训练的|深度拆解
https://mp.weixin.qq.com/s/2M_f2ow3rfarr-vLeKbVCA

4、一文讲透 AI Agent 与 AI Workflow 的区别和深度解析:从自动化到智能化的演进
https://mp.weixin.qq.com/s/GCifbH9wGdPysu1z-n3KDA

5、从ES的JVM配置起步思考JVM常见参数优化
https://blog.csdn.net/xiaofeng ... 39510

编辑:Muse
更多资讯:http://news.searchkit.cn
继续阅读 »
1、Easysearch 可搜索快照功能,看这篇就够了
https://infinilabs.cn/blog/202 ... icle/

2、Spring Boot 集成 Easysearch 完整指南
https://infinilabs.cn/blog/202 ... tion/

3、DeepSeek-V3 是怎么训练的|深度拆解
https://mp.weixin.qq.com/s/2M_f2ow3rfarr-vLeKbVCA

4、一文讲透 AI Agent 与 AI Workflow 的区别和深度解析:从自动化到智能化的演进
https://mp.weixin.qq.com/s/GCifbH9wGdPysu1z-n3KDA

5、从ES的JVM配置起步思考JVM常见参数优化
https://blog.csdn.net/xiaofeng ... 39510

编辑:Muse
更多资讯:http://news.searchkit.cn 收起阅读 »

Spring Boot 集成 Easysearch 完整指南

Easysearch 的很多用户都有这样的需要,之前是用的 ES,现在要迁移到 Easysearch,但是业务方使用的是 Spring Boot 集成的客户端,问是否能平滑迁移。

Easysearch 是完全兼容 Spring Boot 的,完全不用修改,本指南将探讨如何将 Spring Boot 和 ES 的 high-level 客户端 与 Easysearch 进行集成,涵盖从基础设置到实现 CRUD 操作和测试的所有内容。

服务器设置

首先,需要修改 Easysearch 节点的 easysearch.yml 文件,打开并配置这 2 个配置项:

elasticsearch.api_compatibility: true

#根据客户端版本配置版本号,我这里配置成 7.17.18
elasticsearch.api_compatibility_version: "7.17.18"

项目设置

然后,让我们设置 Maven 依赖。以下是 pom.xml 中的基本配置:

<properties>
    <java.version>11</java.version>
    <spring-data-elasticsearch.version>4.4.18</spring-data-elasticsearch.version>
    <elasticsearch.version>7.17.18</elasticsearch.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-elasticsearch</artifactId>
        <version>${spring-data-elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

客户端连接配置

完全和连接 Elasticsearch 的方式一样,不用修改:

配置 src/main/resources/application.yml 文件

spring:
  elasticsearch:
    rest:
      uris: https://localhost:9202
      username: admin
      password: xxxxxxxxxxx
    ssl:
      verification-mode: none

连接配置类

@Configuration
public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {
    @Value("${spring.elasticsearch.rest.uris}")
    private String elasticsearchUrl;

    @Value("${spring.elasticsearch.rest.username}")
    private String username;

    @Value("${spring.elasticsearch.rest.password}")
    private String password;

    @Override
    @Bean
    public RestHighLevelClient elasticsearchClient() {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(username, password));

        SSLContext sslContext = SSLContexts.custom()
                .loadTrustMaterial(null, (x509Certificates, s) -> true)
                .build();

        RestClientBuilder builder = RestClient.builder(HttpHost.create(elasticsearchUrl))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setDefaultCredentialsProvider(credentialsProvider)
                        .setSSLContext(sslContext)
                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE));

        return new RestHighLevelClient(builder);
    }
}

领域模型

使用 Spring 的 Elasticsearch 注解定义领域模型:

@Data
@Document(indexName = "products")
public class Product {
    @Id
    private String id;

    @Field(type = FieldType.Text, name = "name")
    private String name;

    @Field(type = FieldType.Double, name = "price")
    private Double price;
}

仓库层

创建继承 ElasticsearchRepository 的仓库接口:

@Repository
@EnableElasticsearchRepositories
public interface ProductRepository extends ElasticsearchRepository<Product, String> {
}

服务层

实现服务层来处理业务逻辑:

@Service
public class ProductService {
    private final ProductRepository productRepository;

    @Autowired
    public ProductService(ProductRepository productRepository) {
        this.productRepository = productRepository;
    }

    public Product saveProduct(Product product) {
        return productRepository.save(product);
    }

    public Product findProductById(String id) {
        return productRepository.findById(id).orElse(null);
    }
}

测试

编写集成测试类:

@SpringBootTest
public class ProductServiceIntegrationTest {
    @Autowired
    private ElasticsearchOperations elasticsearchOperations;

    @Autowired
    private ProductService productService;

    private static final String INDEX_NAME = "products";

    @BeforeEach
    public void setUp() {
        IndexOperations indexOperations = elasticsearchOperations.indexOps(IndexCoordinates.of(INDEX_NAME));
        if (indexOperations.exists()) {
            indexOperations.delete();
        }

        // 定义 mapping
        Document mapping = Document.create()
                .append("properties", Document.create()
                        .append("name", Document.create()
                                .append("type", "text")
                                .append("analyzer", "standard"))
                        .append("price", Document.create()
                                .append("type", "double")));

        // 创建索引并应用 mapping
        indexOperations.create(Collections.EMPTY_MAP, mapping);
    }

    @Test
    public void testSaveAndFindProduct() {
         List<Product> products = Arrays.asList(
                new Product("Test Product 1", 99.99),
                new Product("Test Product 2", 199.99),
                new Product("Test Product 3", 299.99)
        );

        List<IndexQuery> queries = products.stream()
            .map(product -> new IndexQueryBuilder()
                .withObject(product)
                .withIndex(INDEX_NAME)
                .build())
            .collect(Collectors.toList());

        List<IndexedObjectInformation> indexedInfos = elasticsearchOperations.bulkIndex(
            queries,
            IndexCoordinates.of(INDEX_NAME)
        );

        // 验证结果
        List<String> ids = indexedInfos.stream()
            .map(IndexedObjectInformation::getId)
            .collect(Collectors.toList());

        assertFalse(ids.isEmpty());
        assertEquals(products.size(), ids.size());
    }
}

结论

本指南展示了 Easysearch 与 Elasticsearch 的高度兼容性:

  1. 配置方式相同,仅需启用 Easysearch 的 API 兼容模式。
  2. 可直接使用现有 Elasticsearch 客户端。
  3. Maven 依赖无需更改。
  4. API、注解和仓库接口完全兼容。
  5. 现有测试代码可直接应用。

这种兼容性使得从 Elasticsearch 迁移到 Easysearch 成为一个简单、低风险的过程。Spring Boot 项目可以几乎无缝地切换到 Easysearch,同时获得其性能和资源利用方面的优势。

关于 Easysearch

INFINI Easysearch 是一个分布式的搜索型数据库,实现非结构化数据检索、全文检索、向量检索、地理位置信息查询、组合索引查询、多语种支持、聚合分析等。Easysearch 可以完美替代 Elasticsearch,同时添加和完善多项企业级功能。Easysearch 助您拥有简洁、高效、易用的搜索体验。

官网文档:https://infinilabs.cn/docs/latest/easysearch

作者:张磊,极限科技(INFINI Labs)搜索引擎研发负责人,对 Elasticsearch 和 Lucene 源码比较熟悉,目前主要负责公司的 Easysearch 产品的研发以及客户服务工作。
原文:https://infinilabs.cn/blog/2024/use-spring-boot-for-easysearch-connection/

继续阅读 »

Easysearch 的很多用户都有这样的需要,之前是用的 ES,现在要迁移到 Easysearch,但是业务方使用的是 Spring Boot 集成的客户端,问是否能平滑迁移。

Easysearch 是完全兼容 Spring Boot 的,完全不用修改,本指南将探讨如何将 Spring Boot 和 ES 的 high-level 客户端 与 Easysearch 进行集成,涵盖从基础设置到实现 CRUD 操作和测试的所有内容。

服务器设置

首先,需要修改 Easysearch 节点的 easysearch.yml 文件,打开并配置这 2 个配置项:

elasticsearch.api_compatibility: true

#根据客户端版本配置版本号,我这里配置成 7.17.18
elasticsearch.api_compatibility_version: "7.17.18"

项目设置

然后,让我们设置 Maven 依赖。以下是 pom.xml 中的基本配置:

<properties>
    <java.version>11</java.version>
    <spring-data-elasticsearch.version>4.4.18</spring-data-elasticsearch.version>
    <elasticsearch.version>7.17.18</elasticsearch.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-elasticsearch</artifactId>
        <version>${spring-data-elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

客户端连接配置

完全和连接 Elasticsearch 的方式一样,不用修改:

配置 src/main/resources/application.yml 文件

spring:
  elasticsearch:
    rest:
      uris: https://localhost:9202
      username: admin
      password: xxxxxxxxxxx
    ssl:
      verification-mode: none

连接配置类

@Configuration
public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {
    @Value("${spring.elasticsearch.rest.uris}")
    private String elasticsearchUrl;

    @Value("${spring.elasticsearch.rest.username}")
    private String username;

    @Value("${spring.elasticsearch.rest.password}")
    private String password;

    @Override
    @Bean
    public RestHighLevelClient elasticsearchClient() {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(username, password));

        SSLContext sslContext = SSLContexts.custom()
                .loadTrustMaterial(null, (x509Certificates, s) -> true)
                .build();

        RestClientBuilder builder = RestClient.builder(HttpHost.create(elasticsearchUrl))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setDefaultCredentialsProvider(credentialsProvider)
                        .setSSLContext(sslContext)
                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE));

        return new RestHighLevelClient(builder);
    }
}

领域模型

使用 Spring 的 Elasticsearch 注解定义领域模型:

@Data
@Document(indexName = "products")
public class Product {
    @Id
    private String id;

    @Field(type = FieldType.Text, name = "name")
    private String name;

    @Field(type = FieldType.Double, name = "price")
    private Double price;
}

仓库层

创建继承 ElasticsearchRepository 的仓库接口:

@Repository
@EnableElasticsearchRepositories
public interface ProductRepository extends ElasticsearchRepository<Product, String> {
}

服务层

实现服务层来处理业务逻辑:

@Service
public class ProductService {
    private final ProductRepository productRepository;

    @Autowired
    public ProductService(ProductRepository productRepository) {
        this.productRepository = productRepository;
    }

    public Product saveProduct(Product product) {
        return productRepository.save(product);
    }

    public Product findProductById(String id) {
        return productRepository.findById(id).orElse(null);
    }
}

测试

编写集成测试类:

@SpringBootTest
public class ProductServiceIntegrationTest {
    @Autowired
    private ElasticsearchOperations elasticsearchOperations;

    @Autowired
    private ProductService productService;

    private static final String INDEX_NAME = "products";

    @BeforeEach
    public void setUp() {
        IndexOperations indexOperations = elasticsearchOperations.indexOps(IndexCoordinates.of(INDEX_NAME));
        if (indexOperations.exists()) {
            indexOperations.delete();
        }

        // 定义 mapping
        Document mapping = Document.create()
                .append("properties", Document.create()
                        .append("name", Document.create()
                                .append("type", "text")
                                .append("analyzer", "standard"))
                        .append("price", Document.create()
                                .append("type", "double")));

        // 创建索引并应用 mapping
        indexOperations.create(Collections.EMPTY_MAP, mapping);
    }

    @Test
    public void testSaveAndFindProduct() {
         List<Product> products = Arrays.asList(
                new Product("Test Product 1", 99.99),
                new Product("Test Product 2", 199.99),
                new Product("Test Product 3", 299.99)
        );

        List<IndexQuery> queries = products.stream()
            .map(product -> new IndexQueryBuilder()
                .withObject(product)
                .withIndex(INDEX_NAME)
                .build())
            .collect(Collectors.toList());

        List<IndexedObjectInformation> indexedInfos = elasticsearchOperations.bulkIndex(
            queries,
            IndexCoordinates.of(INDEX_NAME)
        );

        // 验证结果
        List<String> ids = indexedInfos.stream()
            .map(IndexedObjectInformation::getId)
            .collect(Collectors.toList());

        assertFalse(ids.isEmpty());
        assertEquals(products.size(), ids.size());
    }
}

结论

本指南展示了 Easysearch 与 Elasticsearch 的高度兼容性:

  1. 配置方式相同,仅需启用 Easysearch 的 API 兼容模式。
  2. 可直接使用现有 Elasticsearch 客户端。
  3. Maven 依赖无需更改。
  4. API、注解和仓库接口完全兼容。
  5. 现有测试代码可直接应用。

这种兼容性使得从 Elasticsearch 迁移到 Easysearch 成为一个简单、低风险的过程。Spring Boot 项目可以几乎无缝地切换到 Easysearch,同时获得其性能和资源利用方面的优势。

关于 Easysearch

INFINI Easysearch 是一个分布式的搜索型数据库,实现非结构化数据检索、全文检索、向量检索、地理位置信息查询、组合索引查询、多语种支持、聚合分析等。Easysearch 可以完美替代 Elasticsearch,同时添加和完善多项企业级功能。Easysearch 助您拥有简洁、高效、易用的搜索体验。

官网文档:https://infinilabs.cn/docs/latest/easysearch

作者:张磊,极限科技(INFINI Labs)搜索引擎研发负责人,对 Elasticsearch 和 Lucene 源码比较熟悉,目前主要负责公司的 Easysearch 产品的研发以及客户服务工作。
原文:https://infinilabs.cn/blog/2024/use-spring-boot-for-easysearch-connection/

收起阅读 »

INFINI Labs 产品更新 | Console/Gateway/Agent 等产品开源发布首个版本

release

INFINI Labs 产品又更新啦~,包括 Easysearch v1.9.0、Gateway、Console、Agent、Loadgen v1.27.0。本次各产品更新了很多亮点功能,如 Easysearch 新增 rollup 功能,优化了多版本兼容配置;Console/Gateway/Agent/Loadgen 及 Framework 开源后,发布首个重大更新版本,支持过期元数据删除,指标图表懒加载,指标采集协程优化等等,欢迎大家下载体验。

INFINI Easysearch v1.9.0

INFINI Easysearch 是一个分布式的搜索型数据库,实现非结构化数据检索、全文检索、向量检索、地理位置信息查询、组合索引查询、多语种支持、聚合分析等。Easysearch 可以完美替代 Elasticsearch,同时添加和完善多项企业级功能。Easysearch 助您拥有简洁、高效、易用的搜索体验。详情见:https://infinilabs.cn

Easysearch 本次更新如下:

Improvements

  • 发布 rollup 功能

    • 支持自动对 rollup 索引进行滚动,无需外部触发
    • 支持 avg sum max min value_count percentiles 指标类型的聚合
    • 支持 terms 聚合
    • 支持对指标聚合进行 Pipeline 聚合
    • 支持聚合前先对数据进行过滤
    • 进行聚合查询时支持直接搜索原始索引,不用更改搜索代码
  • 增加适配 logstash 8.x 的请求 header
  • _cat/templates 增加 lifecycle 和 rollover 列的展示

Bug fix

  • 修复 rest-api template 测试错误

INFINI Console v1.27.0

INFINI Console 是一款非常轻量级的多集群、跨版本的搜索基础设施统一管控平台。通过对流行的搜索引擎基础设施进行跨版本、多集群的集中纳管, 企业可以快速方便的统一管理企业内部的不同版本的多套搜索集群。开源地址:https://github.com/infinilabs/console

Console 在线体验: http://demo.infini.cloud (用户名/密码:readonly/readonly)。

Console 本次更新如下:

Improvements

  • 代码开源,统一采用 Github 仓库进行开发
  • 指标采集优化,由原来的单一协程采集调整为每个注册的集群有单独的协程进行采集
  • 指标监控页面图表展示采用懒加载、单个图表独立加载,增强用户体验
  • 通用时间控件增加超时时间设置
  • 集群选择控件增加注册、刷新功能
  • 提供指标采集状态
  • 表格控件排版优化

Bug fix

  • 修复集群元数据更新不及时问题
  • 修复帮助文档等链接不正确问题
  • 修复节点、索引数据因随机 id 出现重复记录问题
  • 修复 Runtime、Agent 实例编辑页面出错问题
  • 修复集群、节点、索引、分片元数据无 Loading 问题
  • 修复索引健康状态指标采集失败问题
  • 修复个别菜单列未国际化问题

INFINI Gateway v1.27.0

INFINI Gateway 是一个面向搜索场景的高性能数据网关,所有请求都经过网关处理后再转发到后端的搜索业务集群。基于 INFINI Gateway 可以实现索引级别的限速限流、常见查询的缓存加速、查询请求的审计、查询结果的动态修改等等。开源地址:https://github.com/infinilabs/gateway

Gateway 本次更新如下:

Improvements

  • 调整队列消费者 slice 默认配置为 1

Bug fix

  • 修复缓存数据丢失导致队列无法消费问题
  • 同步更新 Framework 修复的一些已知问题

INFINI Agent v1.27.0

INFINI Agent 是 INFINI Console 的一个可选探针组件,负责采集和上传集群指标和日志等信息,并可通过 Console 管理。Agent 支持主流操作系统和平台,安装包轻量且无任何外部依赖,可以快速方便地安装。开源地址:https://github.com/infinilabs/agent

探针 Agent 本次更新如下:

Improvements

  • 与 INFINI Console 统一版本号
  • 同步更新 Framework 修复的已知问题
  • 支持 K8S 环境指标采集

INFINI Loadgen v1.27.0

INFINI Loadgen 是一款轻量、无依赖的 Eaysearch/Elasticsearch/OpenSearch 性能压测工具,支持参数模板化配置,支持压测端均衡流量控制,可以模拟高并发请求。开源地址:https://github.com/infinilabs/loadgen

Loadgen 本次更新如下:

Improvements

  • 保持与 Console 相同版本
  • 同步更新 Framework 修复的已知问题

Bug fix

  • 修复 API 接口测试逻辑异常问题

INFINI Framework

INFINI Framework 是 INFINI Labs 各产品依赖的核心公共代码库。开源地址:https://github.com/infinilabs/framework

Improvements

期待反馈

欢迎下载体验使用,如果您在使用过程中遇到如何疑问或者问题,欢迎前往 INFINI Labs Github(https://github.com/infinilabs) 中的对应项目中提交 Feature Request 或提交 Bug。

下载地址: https://infinilabs.cn/download

邮件hello@infini.ltd

电话(+86) 400-139-9200

Discordhttps://discord.gg/4tKTMkkvVX

也欢迎大家微信扫码添加小助手(INFINI-Labs),加入用户群一起讨论交流。

关于极限科技(INFINI Labs)

INFINI Labs

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

继续阅读 »

release

INFINI Labs 产品又更新啦~,包括 Easysearch v1.9.0、Gateway、Console、Agent、Loadgen v1.27.0。本次各产品更新了很多亮点功能,如 Easysearch 新增 rollup 功能,优化了多版本兼容配置;Console/Gateway/Agent/Loadgen 及 Framework 开源后,发布首个重大更新版本,支持过期元数据删除,指标图表懒加载,指标采集协程优化等等,欢迎大家下载体验。

INFINI Easysearch v1.9.0

INFINI Easysearch 是一个分布式的搜索型数据库,实现非结构化数据检索、全文检索、向量检索、地理位置信息查询、组合索引查询、多语种支持、聚合分析等。Easysearch 可以完美替代 Elasticsearch,同时添加和完善多项企业级功能。Easysearch 助您拥有简洁、高效、易用的搜索体验。详情见:https://infinilabs.cn

Easysearch 本次更新如下:

Improvements

  • 发布 rollup 功能

    • 支持自动对 rollup 索引进行滚动,无需外部触发
    • 支持 avg sum max min value_count percentiles 指标类型的聚合
    • 支持 terms 聚合
    • 支持对指标聚合进行 Pipeline 聚合
    • 支持聚合前先对数据进行过滤
    • 进行聚合查询时支持直接搜索原始索引,不用更改搜索代码
  • 增加适配 logstash 8.x 的请求 header
  • _cat/templates 增加 lifecycle 和 rollover 列的展示

Bug fix

  • 修复 rest-api template 测试错误

INFINI Console v1.27.0

INFINI Console 是一款非常轻量级的多集群、跨版本的搜索基础设施统一管控平台。通过对流行的搜索引擎基础设施进行跨版本、多集群的集中纳管, 企业可以快速方便的统一管理企业内部的不同版本的多套搜索集群。开源地址:https://github.com/infinilabs/console

Console 在线体验: http://demo.infini.cloud (用户名/密码:readonly/readonly)。

Console 本次更新如下:

Improvements

  • 代码开源,统一采用 Github 仓库进行开发
  • 指标采集优化,由原来的单一协程采集调整为每个注册的集群有单独的协程进行采集
  • 指标监控页面图表展示采用懒加载、单个图表独立加载,增强用户体验
  • 通用时间控件增加超时时间设置
  • 集群选择控件增加注册、刷新功能
  • 提供指标采集状态
  • 表格控件排版优化

Bug fix

  • 修复集群元数据更新不及时问题
  • 修复帮助文档等链接不正确问题
  • 修复节点、索引数据因随机 id 出现重复记录问题
  • 修复 Runtime、Agent 实例编辑页面出错问题
  • 修复集群、节点、索引、分片元数据无 Loading 问题
  • 修复索引健康状态指标采集失败问题
  • 修复个别菜单列未国际化问题

INFINI Gateway v1.27.0

INFINI Gateway 是一个面向搜索场景的高性能数据网关,所有请求都经过网关处理后再转发到后端的搜索业务集群。基于 INFINI Gateway 可以实现索引级别的限速限流、常见查询的缓存加速、查询请求的审计、查询结果的动态修改等等。开源地址:https://github.com/infinilabs/gateway

Gateway 本次更新如下:

Improvements

  • 调整队列消费者 slice 默认配置为 1

Bug fix

  • 修复缓存数据丢失导致队列无法消费问题
  • 同步更新 Framework 修复的一些已知问题

INFINI Agent v1.27.0

INFINI Agent 是 INFINI Console 的一个可选探针组件,负责采集和上传集群指标和日志等信息,并可通过 Console 管理。Agent 支持主流操作系统和平台,安装包轻量且无任何外部依赖,可以快速方便地安装。开源地址:https://github.com/infinilabs/agent

探针 Agent 本次更新如下:

Improvements

  • 与 INFINI Console 统一版本号
  • 同步更新 Framework 修复的已知问题
  • 支持 K8S 环境指标采集

INFINI Loadgen v1.27.0

INFINI Loadgen 是一款轻量、无依赖的 Eaysearch/Elasticsearch/OpenSearch 性能压测工具,支持参数模板化配置,支持压测端均衡流量控制,可以模拟高并发请求。开源地址:https://github.com/infinilabs/loadgen

Loadgen 本次更新如下:

Improvements

  • 保持与 Console 相同版本
  • 同步更新 Framework 修复的已知问题

Bug fix

  • 修复 API 接口测试逻辑异常问题

INFINI Framework

INFINI Framework 是 INFINI Labs 各产品依赖的核心公共代码库。开源地址:https://github.com/infinilabs/framework

Improvements

期待反馈

欢迎下载体验使用,如果您在使用过程中遇到如何疑问或者问题,欢迎前往 INFINI Labs Github(https://github.com/infinilabs) 中的对应项目中提交 Feature Request 或提交 Bug。

下载地址: https://infinilabs.cn/download

邮件hello@infini.ltd

电话(+86) 400-139-9200

Discordhttps://discord.gg/4tKTMkkvVX

也欢迎大家微信扫码添加小助手(INFINI-Labs),加入用户群一起讨论交流。

关于极限科技(INFINI Labs)

INFINI Labs

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

收起阅读 »

【搜索客社区日报】 第1961期 (2024-12-27)

1、深入探究:在 OpenSearch 向量引擎中启用可插入存储
https://opensearch.org/blog/en ... ordb/

2、给 Postgres 写一个向量插件 - 向量类型
https://infinilabs.cn/blog/202 ... type/

3、带有 Elasticsearch 和 Langchain 的 Agentic RAG
https://mp.weixin.qq.com/s/kPcO5CTNAerClQ1yGAgfzA

4、GitLab 停止为中国区用户提供 GitLab.com 账号服务
https://news.cnblogs.com/n/782782/

5、活动回顾 - 第5期 搜索客 Meetup 线上直播活动圆满结束,附 PPT 下载和视频回放
https://elasticsearch.cn/article/15335

编辑:Fred
更多资讯:http://news.searchkit.cn
继续阅读 »
1、深入探究:在 OpenSearch 向量引擎中启用可插入存储
https://opensearch.org/blog/en ... ordb/

2、给 Postgres 写一个向量插件 - 向量类型
https://infinilabs.cn/blog/202 ... type/

3、带有 Elasticsearch 和 Langchain 的 Agentic RAG
https://mp.weixin.qq.com/s/kPcO5CTNAerClQ1yGAgfzA

4、GitLab 停止为中国区用户提供 GitLab.com 账号服务
https://news.cnblogs.com/n/782782/

5、活动回顾 - 第5期 搜索客 Meetup 线上直播活动圆满结束,附 PPT 下载和视频回放
https://elasticsearch.cn/article/15335

编辑:Fred
更多资讯:http://news.searchkit.cn 收起阅读 »

「玩转社区运营,实习先行」加入极限科技(INFINI Labs)开启成长之旅!

极限科技诚招社区运营实习生!

我们为你提供全面参与国内外社区运营建设的机会,负责从内容策划到数据分析的多维度实践,与团队一起探索技术与产品的无限可能!

如果你还不了解 极限科技(INFINI Labs) 是谁,在做什么,需要什么样的小伙伴,那么请看下文:

我们是谁

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

我们在做什么

极限科技(INFINI Labs)正在致力于以下几个核心方向:

1、开发近实时搜索引擎 INFINI Easysearch
INFINI Easysearch 是一个分布式的搜索型数据库,实现非结构化数据检索、全文检索、向量检索、地理位置信息查询、组合索引查询、多语种支持、聚合分析等。Easysearch 可以完美替代 Elasticsearch,同时添加和完善多项企业级功能。Easysearch 助您拥有简洁、高效、易用的搜索体验。详情参见:https://infinilabs.cn

2、打造下一代实时搜索引擎 INFINI Pizza
INFINI Pizza 是一个分布式混合搜索数据库系统。我们的使命是充分利用现代硬件和人工智能的潜力,为企业提供量身定制的实时智能搜索体验。我们致力于满足具有挑战性的环境中高并发和高吞吐量的需求,同时提供无缝高效的搜索功能。详情参见:https://pizza.rs

3、打造 Coco AI — 搜索、连接、协作
Coco AI 是一个人工智能驱动的统一的搜索平台,将您的所有企业应用程序和数据(如 Google Workspace、Dropbox、Confluent Wiki、GitHub 等)整合到一个强大而简洁的搜索界面中。本项目包含适用于桌面和移动端的 COCO 应用,用户可以通过该应用在不同平台上搜索并与企业数据互动。详情参见:https://coco.rs

4、积极参与全球开源生态建设
通过开源 Coco AI、Console、Gateway、Agent、Loadgen 等搜索领域产品和社区贡献,推动全球开源技术的发展,提升中国在全球开源领域的影响力。INFINI Labs Github 主页:https://github.com/infinilabs

5、提供专业服务
为客户提供包括搜索技术支持、迁移服务、定制解决方案和培训在内的全方位服务。

6、提供国产化搜索解决方案
针对中国市场的特殊需求,提供符合国产化标准的搜索产品和解决方案,帮助客户解决使用 Elasticsearch 时遇到的挑战。

极限科技(INFINI Labs)通过这些努力,旨在成为全球领先的实时搜索和数据分析解决方案提供商。

我们期待有才华、有激情的你加入我们,一起探索数据搜索的未来,共同创造无限可能!

在招岗位介绍

岗位名称

社区运营实习生

岗位职责

  1. 全面参与公司旗下国内外社区运营和建设;
  2. 负责社区内容的策划、文案、编辑,围绕团队成果产出技术解读文章,通过公众号、博客、社区等形式进行内容运营,提升公司的影响力;
  3. 数据化运营,包括分析官网访问、下载等数据指标,根据数据反馈及时调整策略,提升运营效果;
  4. 与团队合作,维护更新产品技术相关介绍文档,帮助用户深入理解技术理念和技术优势;
  5. 定期与社区用户、媒体沟通,保持通畅的聆听反馈机制;
  6. 参与策划、组织及执行团队主办或承办的各类社区活动;

岗位要求

  1. 本科及以上学历在读,新闻、营销、传媒、计算机等相关专业优先;
  2. 具备协作意识和沟通能力,能跨职能、跨部门协调跟进问题;
  3. 具备较强的创意和策划能力、应变能力、语言和文字表达能力以及敏锐的市场触角;
  4. 对搜索技术、互联网产品及工具类信息怀有浓厚兴趣,具备快速学习并熟练掌握相关知识的能力,能够紧跟行业动态;
  5. 具有用户增长、数据分析、数据挖掘、信息检索经验者优先;
  6. 具有开源社区、开发者社区、开源媒体运营经验者优先;
  7. 英语听说写流畅优先;
  8. 熟悉 Github、Git、Markdown 优先;
  9. 至少实习 6 个月以上,城市不限,可接受远程办公;

我们提供

  1. 广阔的职业发展空间和晋升机会;
  2. 与一群充满激情和创造力的团队一起工作的机会;
  3. 不断学习和成长的机会,包括内部培训和外部学习资源;

简历投递

  1. 邮件:hello@infini.ltd(邮件标题请备注姓名+求职岗位)
  2. 微信:INFINI-Labs (加微请备注求职岗位)

如果你:

  • 对互联网、搜索技术充满兴趣;
  • 擅长创意策划与内容输出;
  • 乐于跨部门协作,具备良好的沟通能力;

那就快来加入我们吧!

继续阅读 »

极限科技诚招社区运营实习生!

我们为你提供全面参与国内外社区运营建设的机会,负责从内容策划到数据分析的多维度实践,与团队一起探索技术与产品的无限可能!

如果你还不了解 极限科技(INFINI Labs) 是谁,在做什么,需要什么样的小伙伴,那么请看下文:

我们是谁

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

我们在做什么

极限科技(INFINI Labs)正在致力于以下几个核心方向:

1、开发近实时搜索引擎 INFINI Easysearch
INFINI Easysearch 是一个分布式的搜索型数据库,实现非结构化数据检索、全文检索、向量检索、地理位置信息查询、组合索引查询、多语种支持、聚合分析等。Easysearch 可以完美替代 Elasticsearch,同时添加和完善多项企业级功能。Easysearch 助您拥有简洁、高效、易用的搜索体验。详情参见:https://infinilabs.cn

2、打造下一代实时搜索引擎 INFINI Pizza
INFINI Pizza 是一个分布式混合搜索数据库系统。我们的使命是充分利用现代硬件和人工智能的潜力,为企业提供量身定制的实时智能搜索体验。我们致力于满足具有挑战性的环境中高并发和高吞吐量的需求,同时提供无缝高效的搜索功能。详情参见:https://pizza.rs

3、打造 Coco AI — 搜索、连接、协作
Coco AI 是一个人工智能驱动的统一的搜索平台,将您的所有企业应用程序和数据(如 Google Workspace、Dropbox、Confluent Wiki、GitHub 等)整合到一个强大而简洁的搜索界面中。本项目包含适用于桌面和移动端的 COCO 应用,用户可以通过该应用在不同平台上搜索并与企业数据互动。详情参见:https://coco.rs

4、积极参与全球开源生态建设
通过开源 Coco AI、Console、Gateway、Agent、Loadgen 等搜索领域产品和社区贡献,推动全球开源技术的发展,提升中国在全球开源领域的影响力。INFINI Labs Github 主页:https://github.com/infinilabs

5、提供专业服务
为客户提供包括搜索技术支持、迁移服务、定制解决方案和培训在内的全方位服务。

6、提供国产化搜索解决方案
针对中国市场的特殊需求,提供符合国产化标准的搜索产品和解决方案,帮助客户解决使用 Elasticsearch 时遇到的挑战。

极限科技(INFINI Labs)通过这些努力,旨在成为全球领先的实时搜索和数据分析解决方案提供商。

我们期待有才华、有激情的你加入我们,一起探索数据搜索的未来,共同创造无限可能!

在招岗位介绍

岗位名称

社区运营实习生

岗位职责

  1. 全面参与公司旗下国内外社区运营和建设;
  2. 负责社区内容的策划、文案、编辑,围绕团队成果产出技术解读文章,通过公众号、博客、社区等形式进行内容运营,提升公司的影响力;
  3. 数据化运营,包括分析官网访问、下载等数据指标,根据数据反馈及时调整策略,提升运营效果;
  4. 与团队合作,维护更新产品技术相关介绍文档,帮助用户深入理解技术理念和技术优势;
  5. 定期与社区用户、媒体沟通,保持通畅的聆听反馈机制;
  6. 参与策划、组织及执行团队主办或承办的各类社区活动;

岗位要求

  1. 本科及以上学历在读,新闻、营销、传媒、计算机等相关专业优先;
  2. 具备协作意识和沟通能力,能跨职能、跨部门协调跟进问题;
  3. 具备较强的创意和策划能力、应变能力、语言和文字表达能力以及敏锐的市场触角;
  4. 对搜索技术、互联网产品及工具类信息怀有浓厚兴趣,具备快速学习并熟练掌握相关知识的能力,能够紧跟行业动态;
  5. 具有用户增长、数据分析、数据挖掘、信息检索经验者优先;
  6. 具有开源社区、开发者社区、开源媒体运营经验者优先;
  7. 英语听说写流畅优先;
  8. 熟悉 Github、Git、Markdown 优先;
  9. 至少实习 6 个月以上,城市不限,可接受远程办公;

我们提供

  1. 广阔的职业发展空间和晋升机会;
  2. 与一群充满激情和创造力的团队一起工作的机会;
  3. 不断学习和成长的机会,包括内部培训和外部学习资源;

简历投递

  1. 邮件:hello@infini.ltd(邮件标题请备注姓名+求职岗位)
  2. 微信:INFINI-Labs (加微请备注求职岗位)

如果你:

  • 对互联网、搜索技术充满兴趣;
  • 擅长创意策划与内容输出;
  • 乐于跨部门协作,具备良好的沟通能力;

那就快来加入我们吧!

收起阅读 »

【搜索客社区日报】 第1960期 (2024-12-26)

1.我与vLLM的2024:清华大佬的vLLM开发之路
https://mp.weixin.qq.com/s/i8BEibOysvf7dgLE4a8rPg
2.聊聊 GPU 监控那些事:利用率 & 故障等
https://mp.weixin.qq.com/s/geSijcXTAA8zoEM60F968w
3.万字长文梳理 2024 年的 RAG
https://mp.weixin.qq.com/s/9H4ZgqaB_q_FX2DzaaHPmQ

编辑:Se7en
更多资讯:http://news.searchkit.cn
继续阅读 »
1.我与vLLM的2024:清华大佬的vLLM开发之路
https://mp.weixin.qq.com/s/i8BEibOysvf7dgLE4a8rPg
2.聊聊 GPU 监控那些事:利用率 & 故障等
https://mp.weixin.qq.com/s/geSijcXTAA8zoEM60F968w
3.万字长文梳理 2024 年的 RAG
https://mp.weixin.qq.com/s/9H4ZgqaB_q_FX2DzaaHPmQ

编辑:Se7en
更多资讯:http://news.searchkit.cn 收起阅读 »

活动回顾 - 第5期 搜索客 Meetup 线上直播活动圆满结束,附 PPT 下载和视频回放

2024 年 12 月 20 日,由搜索客社区和极限科技(INFINI Labs)联合举办的第 5 期线上 Meetup 技术交流直播活动圆满结束。本期 Meetup 直播活动吸引了超过 800+ 人次的技术爱好者观看参与,共同学习讨论了 Elasticsearch 多集群管理工具 INFINI Console 的使用和运维经验。

20241220-182853.png

本期 Meetup 活动主题

本期 Meetup 活动由极限科技(INFINI Labs)技术专家 罗厚付老师 为大家带来了主题为《最强开源 Elasticsearch 多集群管理工具 INFINI Console - 动手实战》精彩分享。

1735028959857.jpg

罗老师首先介绍了 INFINI Labs 最近开源的产品和基本情况。然后,详细讲解了 Agent 采集指标的配置方法,包括如何自动注册到平台、监控告警配置等。接着讲解了 INFINI Console 源码编译的步骤和要求,如创建编译路径、克隆代码等。最后回答了观众提的相关问题,并强调了开源建设的重要性。

以下为摘取 PPT 部分内容截图:

1735029060156.jpg

1735029140190.jpg

1735035341200.jpg

精选问答

分享过程中,直播间的小伙伴对分享内容非常感兴趣,纷纷在评论区留言提问,下面摘取部分问答:

问 1:Console 开源代码 web 模块没有跑起来 ‍‍‍
答:web 模块编译需要指定 node 版本 v16.20.2

问 2:开源后的 Console 所有功能都免费吗?哪些是收费功能
答:开源的代码所有功能都是免费的,收费功能后续将在企业版中提供。

问 3:Console 支持的 Elasticsearch 版本是多少?
答:支持多版本 Elasticsearch,包含 1.x、2.x、5.x、6.x、7.x、8.x;同时还支持 OpenSearch 和 Easysearch 多集群接入纳管

问 4:Console 自采与 Agent 采集监控指标有什么区别,对性能影响如何?
答:区别还是挺大的,Console 是直接从集群层面采集指标,而不是在每个节点上单独采集。这种方式可能会涉及到在整个集群中循环,收集所有节点的信息。Agent 采集方式即在每个节点上安装一个 Agent,每个 Agent 负责收集所在节点的指标,并将其推送到存储或 Gateway。这种方式可以减轻集群的压力,因为不需要在集群中循环收集数据,即使一个节点出现异常,其他节点的指标采集仍然可以正常进行。

问 5:Agent 可以采集日志吗?还是只采集指标呢?
答:都支持

问 6:生产环境是建议通过 Agent 方式采集吗?
答:是的,生产环境建议使用 Agent,稳定且内存占用低。‍‍‍‍‍‍‍‍‍‍‍ 我这边管理的一个集群的话大概是有 300 多个节点吧。大概是装 300 多个探针的样子,探针的资源消耗是有做一个限制的,内存是限制在这个 50M 以下。

问 7:如果是 k8s 的话, Agent 是怎么匹配的呢?安装在 pod 层面吗?
答:对,我们现在的打的镜像是包含了这个 Agent 的,然后可以环境变量去控制开关。

问 8:Console 后面会支持备份恢复吗?大概什么时候上线
答:‍‍‍‍‍‍‍‍ 这个目前还不支持,在计划中,没有那么快上线。如果说你只是运维的一个需求的话可以通过相关 API 命令去操作备份恢复。

问 9:Console 告警规则能多提供一些吗?
答:目前内置的告警规则主要是一些基础的通用的场景,欢迎大家提供一些实际的一些业务需求,然后提个 Issue,共同建设和维护。(https://github.com/infinilabs/console/issues

问 10:Easysearch 的审计日志采集了哪些信息?
答:主要包含用户的操作记录,然后用户的一些登录时间 IP 相关的一些信息是都有的。

罗老师从专业的角度热情耐心的一一进行了解答,获得了在线观众的点赞好评。(如大家对以上问题有见解或提出其他 Console 相关问题,也欢迎在本文评论区留言讨论,我们将从评论区中挑选精彩评论给予礼品赠送 😄)

同时,在整个直播过程中,主持人进行了多轮激动人心的抽奖活动,为参会小伙伴带来了额外的惊喜。

最后感谢大家的参与和支持,让我们共同期待下一次 搜索客 Meetup 活动带来更多的精彩内容!

本期 Meetup 的 PPT 下载

本期 PPT 下载的链接:https://searchkit.cn/slides/328

本期 Meetup 视频回放

扫码关注极限实验室视频号查看直播回放,或者扫码关注极限实验室 B 站 账号,可查看本期 Meetup 活动视频。我们也会在视频号、B 站持续更新最新技术视频,欢迎通过点赞、投币,收藏,三连来支持我们。

Meetup_视频回放.jpg

Meetup 活动讲师招募

讲师招募

搜索客社区 Meetup 的成功举办,离不开社区小伙伴的热情参与。目前社区讲师招募计划也在持续进行中,我们诚挚邀请各位技术大咖、行业精英踊跃提交演讲议题,与大家分享您的经验。

讲师报名链接:http://cfp.searchkit.cn
或扫描下方二维码,立刻报名成为讲师!

Meetup 活动聚焦 AI 与搜索领域的最新动态,以及数据实时搜索分析、向量检索、技术实践与案例分析、日志分析、安全等领域的深度探讨。

我们热切期待您的精彩分享!

关于 搜索客(SearchKit)社区

搜索客社区由 Elasticsearch 中文社区进行全新的品牌升级,以新的 Slogan:“搜索人自己的社区” 为宣言。汇集搜索领域最新动态、精选干货文章、精华讨论、文档资料、翻译与版本发布等,为广大搜索领域从业者提供更为丰富便捷的学习和交流平台。社区官网:https://searchkit.cn

关于极限科技(INFINI Labs)

INFINI Labs

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

继续阅读 »

2024 年 12 月 20 日,由搜索客社区和极限科技(INFINI Labs)联合举办的第 5 期线上 Meetup 技术交流直播活动圆满结束。本期 Meetup 直播活动吸引了超过 800+ 人次的技术爱好者观看参与,共同学习讨论了 Elasticsearch 多集群管理工具 INFINI Console 的使用和运维经验。

20241220-182853.png

本期 Meetup 活动主题

本期 Meetup 活动由极限科技(INFINI Labs)技术专家 罗厚付老师 为大家带来了主题为《最强开源 Elasticsearch 多集群管理工具 INFINI Console - 动手实战》精彩分享。

1735028959857.jpg

罗老师首先介绍了 INFINI Labs 最近开源的产品和基本情况。然后,详细讲解了 Agent 采集指标的配置方法,包括如何自动注册到平台、监控告警配置等。接着讲解了 INFINI Console 源码编译的步骤和要求,如创建编译路径、克隆代码等。最后回答了观众提的相关问题,并强调了开源建设的重要性。

以下为摘取 PPT 部分内容截图:

1735029060156.jpg

1735029140190.jpg

1735035341200.jpg

精选问答

分享过程中,直播间的小伙伴对分享内容非常感兴趣,纷纷在评论区留言提问,下面摘取部分问答:

问 1:Console 开源代码 web 模块没有跑起来 ‍‍‍
答:web 模块编译需要指定 node 版本 v16.20.2

问 2:开源后的 Console 所有功能都免费吗?哪些是收费功能
答:开源的代码所有功能都是免费的,收费功能后续将在企业版中提供。

问 3:Console 支持的 Elasticsearch 版本是多少?
答:支持多版本 Elasticsearch,包含 1.x、2.x、5.x、6.x、7.x、8.x;同时还支持 OpenSearch 和 Easysearch 多集群接入纳管

问 4:Console 自采与 Agent 采集监控指标有什么区别,对性能影响如何?
答:区别还是挺大的,Console 是直接从集群层面采集指标,而不是在每个节点上单独采集。这种方式可能会涉及到在整个集群中循环,收集所有节点的信息。Agent 采集方式即在每个节点上安装一个 Agent,每个 Agent 负责收集所在节点的指标,并将其推送到存储或 Gateway。这种方式可以减轻集群的压力,因为不需要在集群中循环收集数据,即使一个节点出现异常,其他节点的指标采集仍然可以正常进行。

问 5:Agent 可以采集日志吗?还是只采集指标呢?
答:都支持

问 6:生产环境是建议通过 Agent 方式采集吗?
答:是的,生产环境建议使用 Agent,稳定且内存占用低。‍‍‍‍‍‍‍‍‍‍‍ 我这边管理的一个集群的话大概是有 300 多个节点吧。大概是装 300 多个探针的样子,探针的资源消耗是有做一个限制的,内存是限制在这个 50M 以下。

问 7:如果是 k8s 的话, Agent 是怎么匹配的呢?安装在 pod 层面吗?
答:对,我们现在的打的镜像是包含了这个 Agent 的,然后可以环境变量去控制开关。

问 8:Console 后面会支持备份恢复吗?大概什么时候上线
答:‍‍‍‍‍‍‍‍ 这个目前还不支持,在计划中,没有那么快上线。如果说你只是运维的一个需求的话可以通过相关 API 命令去操作备份恢复。

问 9:Console 告警规则能多提供一些吗?
答:目前内置的告警规则主要是一些基础的通用的场景,欢迎大家提供一些实际的一些业务需求,然后提个 Issue,共同建设和维护。(https://github.com/infinilabs/console/issues

问 10:Easysearch 的审计日志采集了哪些信息?
答:主要包含用户的操作记录,然后用户的一些登录时间 IP 相关的一些信息是都有的。

罗老师从专业的角度热情耐心的一一进行了解答,获得了在线观众的点赞好评。(如大家对以上问题有见解或提出其他 Console 相关问题,也欢迎在本文评论区留言讨论,我们将从评论区中挑选精彩评论给予礼品赠送 😄)

同时,在整个直播过程中,主持人进行了多轮激动人心的抽奖活动,为参会小伙伴带来了额外的惊喜。

最后感谢大家的参与和支持,让我们共同期待下一次 搜索客 Meetup 活动带来更多的精彩内容!

本期 Meetup 的 PPT 下载

本期 PPT 下载的链接:https://searchkit.cn/slides/328

本期 Meetup 视频回放

扫码关注极限实验室视频号查看直播回放,或者扫码关注极限实验室 B 站 账号,可查看本期 Meetup 活动视频。我们也会在视频号、B 站持续更新最新技术视频,欢迎通过点赞、投币,收藏,三连来支持我们。

Meetup_视频回放.jpg

Meetup 活动讲师招募

讲师招募

搜索客社区 Meetup 的成功举办,离不开社区小伙伴的热情参与。目前社区讲师招募计划也在持续进行中,我们诚挚邀请各位技术大咖、行业精英踊跃提交演讲议题,与大家分享您的经验。

讲师报名链接:http://cfp.searchkit.cn
或扫描下方二维码,立刻报名成为讲师!

Meetup 活动聚焦 AI 与搜索领域的最新动态,以及数据实时搜索分析、向量检索、技术实践与案例分析、日志分析、安全等领域的深度探讨。

我们热切期待您的精彩分享!

关于 搜索客(SearchKit)社区

搜索客社区由 Elasticsearch 中文社区进行全新的品牌升级,以新的 Slogan:“搜索人自己的社区” 为宣言。汇集搜索领域最新动态、精选干货文章、精华讨论、文档资料、翻译与版本发布等,为广大搜索领域从业者提供更为丰富便捷的学习和交流平台。社区官网:https://searchkit.cn

关于极限科技(INFINI Labs)

INFINI Labs

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

收起阅读 »

【搜索客社区日报】 第1959期 (2024-12-25)

1.ElasticSearch性能调优最佳实践
https://mp.weixin.qq.com/s/crvAiHdbkDMFySS9x8tydQ
2.GraphRAG:用知识图增强RAG(搭梯)
https://medium.com/%40zilliz_l ... f99e1
3.如何在 Elasticsearch 中高效索引超过 1000 万条记录(搭梯)
https://medium.com/%40icreon/h ... ca87b
4.深入探讨高质量重排器及其性能优化:Elastic Rerank模型的实战评估
https://cloud.tencent.com/deve ... 77172
5.超越传统模型:从零开始构建高效的日志分析平台——基于Elasticsearch的实战指南
https://developer.aliyun.com/article/1625901



编辑:kin122 
更多资讯:http://news.searchkit.cn
继续阅读 »
1.ElasticSearch性能调优最佳实践
https://mp.weixin.qq.com/s/crvAiHdbkDMFySS9x8tydQ
2.GraphRAG:用知识图增强RAG(搭梯)
https://medium.com/%40zilliz_l ... f99e1
3.如何在 Elasticsearch 中高效索引超过 1000 万条记录(搭梯)
https://medium.com/%40icreon/h ... ca87b
4.深入探讨高质量重排器及其性能优化:Elastic Rerank模型的实战评估
https://cloud.tencent.com/deve ... 77172
5.超越传统模型:从零开始构建高效的日志分析平台——基于Elasticsearch的实战指南
https://developer.aliyun.com/article/1625901



编辑:kin122 
更多资讯:http://news.searchkit.cn 收起阅读 »

【搜索客社区日报】第1958期 (2024-12-24)


平安夜咯铁子们,hohoho
1. Logstash面试题(需要梯子)
https://medium.com/%40romantic ... a1432
2. 用ES来提升RAG的效果,顶不顶(需要梯子)
https://medium.com/%40raphy.26 ... 26c63
3. 别说我没教你啊,全干工程师们会用Nodejs连接Elasticsearch了(需要梯子)
https://medium.com/%40arnabgol ... 5270b
 
编辑:斯蒂文
更多资讯:http://news.searchkit.cn
 
继续阅读 »

平安夜咯铁子们,hohoho
1. Logstash面试题(需要梯子)
https://medium.com/%40romantic ... a1432
2. 用ES来提升RAG的效果,顶不顶(需要梯子)
https://medium.com/%40raphy.26 ... 26c63
3. 别说我没教你啊,全干工程师们会用Nodejs连接Elasticsearch了(需要梯子)
https://medium.com/%40arnabgol ... 5270b
 
编辑:斯蒂文
更多资讯:http://news.searchkit.cn
  收起阅读 »

【搜索客社区日报】第1957期 (2024-12-23)

1. INFINI Labs 产品更新 | Console/Gateway/Agent 等产品开源发布首个版本
https://infinilabs.cn/blog/2024/release-20241213/

2. Easysearch Chart Admin 密码自定义
https://infinilabs.cn/blog/202 ... stom/

3. Elasticsearch VS Easysearch 性能测试
https://infinilabs.cn/blog/202 ... ting/

4. Easysearch Java SDK 2.0.x 使用指南(三)
https://infinilabs.cn/blog/202 ... nt-3/

5. RAG效果不好怎么办?试试这八大解决方案(含代码)
https://mp.weixin.qq.com/s/XiHXa6Jwak_Ps2QTABdQ2w

编辑:Muse
更多资讯:http://news.searchkit.cn
继续阅读 »
1. INFINI Labs 产品更新 | Console/Gateway/Agent 等产品开源发布首个版本
https://infinilabs.cn/blog/2024/release-20241213/

2. Easysearch Chart Admin 密码自定义
https://infinilabs.cn/blog/202 ... stom/

3. Elasticsearch VS Easysearch 性能测试
https://infinilabs.cn/blog/202 ... ting/

4. Easysearch Java SDK 2.0.x 使用指南(三)
https://infinilabs.cn/blog/202 ... nt-3/

5. RAG效果不好怎么办?试试这八大解决方案(含代码)
https://mp.weixin.qq.com/s/XiHXa6Jwak_Ps2QTABdQ2w

编辑:Muse
更多资讯:http://news.searchkit.cn 收起阅读 »

Elasticseach Ingest 模块&&漏洞分析

本文基于Elasticsearch7.10.2分析

0.Ingest 节点 概述

在实际进行文档index之前,使用采集节点(默认情况下,每个es节点都是ingest)对文档进行预处理。采集节点会拦截bulk和index请求,进行转换,然后将文档传回index或bulk API。

每个索引都有index.default_pipeline 和 index.final_pipeline 两个配置。

他们都是用于指定 Elasticsearch index 或者bulk 文档时要执行的预处理逻辑。

  • index.default_pipeline 定义了默认管道,它会在索引文档时首先执行。但如果索引请求中指定了 pipeline 参数,则该参数指定的管道会覆盖默认管道。如果设置了 index.default_pipeline 但对应的管道不存在,索引请求会失败。特殊值 _none 表示不运行任何摄取管道。
  • index.final_pipeline 定义了最终管道,它总是在请求管道(如果指定)和默认管道(如果存在)之后执行。如果设置了 index.final_pipeline 但对应的管道不存在,索引请求会失败。特殊值 _none 表示不运行最终管道。

简而言之,default_pipeline 先执行,可被覆盖;final_pipeline 后执行,不可被覆盖。两者都可设为 _none 以禁用。

一个Pipeline可以有多个Processor组成,每个Processor有着各自的不同功能,官方支持的Processor可参考:https://www.elastic.co/guide/en/elasticsearch/reference/7.10/ingest-processors.html

一个简单的例子,利用Set Processor 对新增的文档中加入新的字段和值:

PUT _ingest/pipeline/set_os
{
  "description": "sets the value of host.os.name from the field os",
  "processors": [
    {
      "set": {
        "field": "host.os.name",  // 增加的属性
        "value": "{{os}}"   // 这里引用了文档原先的os属性, 这里可以直接填写其他值
      }
    }
  ]
}

POST _ingest/pipeline/set_os/_simulate
{
  "docs": [
    {
      "_source": {
        "os": "Ubuntu"
      }
    }
  ]
}

这样转换之后,文档内容就变成了:

{
  "host" : {
    "os" : {
      "name" : "Ubuntu"
    }
  },
  "os" : "Ubuntu"
}

写单个文档的流程概述

没有指定pipeline.png

当请求,或者索引本身配置有pipline的时候,协调节点就会转发到ingest节点

PSOT source_index/_doc?pipeline=set_os
{
  "os": "xxxx"
}

指定了pipeline.png

【注】 并不是一定会发生内部rpc的请求转发,如果本地节点能接受当前的请求则不会转发到其他节点。

1. 模块总体概述

本小节关注IngestService中重要的相关类,对这些类有一个整体的了解有助于理解该模块。

  • ClusterService
    • IngestService 实现了 ClusterStateApplier 接口, 这样就能监听和响应集群的状态变化,当集群状态更新时,IngestService可以调整其内部 pipelines完成CRUD。
    • 另外IngestService还有List<Consumer>用来对提供给对集群状态变更之后需要最新状态的插件。
  • ScriptService
    • 某些Processor需要其用于管理和执行脚本,比如Script Processor。
  • AnalysisRegistry
    • 某些需要对文档内容进行分词处理的Processor。
  • ThreadPool
    • Processor都是异步执行的,实际执行线程池取决于调用上下文(如 write 或 management)
      • bulk API 时发生的pipeline 处理使用的是write线程池。
      • pipeline/_simulate API 使用的是management线程池,模拟执行通常是短时间的、低频的任务,不需要高并发支持而且为了不影响实际的文档处理或其他重要任务。
    • 另外为了避免Grok Processor运行时间过长,使用了Generic线程做定时调度检查执行时间
  • IngestMetric
    • 通过实现ReportingService接口来做到展示ingest内部的执行情况。
    • GET _nodes/stats?filter_path=nodes.*.ingest 可以查看到ingest 中的每个Pipeline中的执行的次数、失败次数以及总耗时
  • IngestPlugin
    • Ingest支持加载自定义的Processor插件,系统内置的所有Processor以及自定义的都通过IngestService 中的Map<String, Processor.Factory> processorFactories来进行管理。
  • IngestDocument
    • 其包含文档的源数据,提供了修改和查询文档的字段的能力,为Processor灵活操作文档数据提供基础,在后续pipeline的执行中,也是由其的executePipeline方法驱动的。

2. Processor 实现机制

抽象工厂设计模式的应用

Processor接口设计

每个Processor都有核心方法execute,使得处理器能够以统一的方式操作 IngestDocument,并通过多态实现不同处理逻辑。

Processor.Factory的设计

Processor.Factory 是 Processor 的抽象工厂,负责动态创建处理器实例。其主要职责包括:

  • 动态实例化
    • Processor create() 方法接收处理器配置并创建具体的处理器。
    • 支持递归创建,例如ConditionalProcessor可以嵌套其他处理器。
  • 依赖注入
    • Processor.Parameters 提供了一组服务和工具(如 ScriptService),工厂可以利用这些依赖创建复杂的Processor。

Processor.Factory的集中管理

在 IngestService 中,通过 Map<String, Processor.Factory> processorFactories 集中管理所有处理器工厂。这种管理方式提供了以下优势:

动态扩展

  • 插件可以注册自定义Processor工厂。
  • 新处理器类型的注册仅需添加到 processorFactories。

组合以及装饰器设计模式的应用

Processor经典的几个类的关系如下:

processor关系类图.png

CompoundProcessor是经典的组合设计模式,Pipeline这个类可以像使用单个 Processor 一样调用 CompoundProcessor,无需关注其内部具体细节。

而ConditionalProcessor 以及TrackingResultProcessor则体现了装饰器模式,在不改变原有对象的情况下扩展功能:

  • ConditionalProcessor 在执行Processor前会调用evaluate方法判断是否需要执行。
  • TrackingResultProcessor中decorate是为 CompoundProcessor 及其内部的 Processor 添加跟踪功能。

如何自定义Processor 插件

自定义 Processor 插件的注册方式为实现 IngestPlugin (Elasticsearch 提供不同的插件接口用来扩展不同类型的功能)的 getProcessors 方法,该方法返回一个工厂列表,IngestService 会将这些工厂注入到 processorFactories 中。

分析完代码之后,我们回到实战中来,简单起见,我们实现类似Append的Processor,但是我们这个更简单,输入的是字符串,然后我们用,分割一下将其作为数组设为值。

import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import java.util.HashMap;
import java.util.Map;
public class AddArrayProcessorPlugin extends Plugin implements IngestPlugin {
    @Override
    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
        Map<String, Processor.Factory> processors = new HashMap<>();
        processors.put(AddArrayProcessor.TYPE, new AddArrayProcessor.Factory());
        return processors;
    }
}
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import java.util.*;

public class AddArrayProcessor extends AbstractProcessor {
    public static final String TYPE = "add_array";
    public String field;
    public String value;
    protected AddArrayProcessor(String tag, String description, String field, String value) {
        super(tag, description);
        this.field = field;
        this.value = value;
    }
    @Override
    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
        List valueList = new ArrayList<>(Arrays.asList(value.split(",")));
        ingestDocument.setFieldValue(field, valueList);
        return ingestDocument;
    }
    @Override
    public String getType() {
        return TYPE;
    }
    public static final class Factory implements Processor.Factory {
        @Override
        public Processor create(Map<String, Processor.Factory> processorFactories, String tag,
                                String description, Map<String, Object> config) throws Exception {
            String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
            String value = ConfigurationUtils.readStringProperty(TYPE, tag, config, "value");
            return new AddArrayProcessor(tag, description, field, value);
        }
    }
}

打包之后,我们去install我们的Processor插件:

bin/elasticsearch-plugin install file:///home/hcb/data/data/es/data_standalone/elasticsearch-7.10.2-SNAPSHOT/AddArrayProcessor-1.0-SNAPSHOT.zip 
warning: no-jdk distributions that do not bundle a JDK are deprecated and will be removed in a future release
-> Installing file:///home/hcb/data/data/es/data_standalone/elasticsearch-7.10.2-SNAPSHOT/AddArrayProcessor-1.0-SNAPSHOT.zip
-> Downloading file:///home/hcb/data/data/es/data_standalone/elasticsearch-7.10.2-SNAPSHOT/AddArrayProcessor-1.0-SNAPSHOT.zip
[=================================================] 100%   
-> Installed AddArrayProcess

测试一下:

POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
       {
        "add_array": {
          "field": "test_arr",
          "value": "a,b,c,d,e,f"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "test",
      "_id": "1",
      "_source": {
        "field": "value"
      }
    }
  ]
}
docs结果:
"_source" : {
          "field" : "value",
          "test_arr" : [
            "a",
            "b",
            "c",
            "d",
            "e",
            "f"
          ]
        }

3. Pipeline设计

如何管理Pipeline

在IngestServcie中有一个Map存储所有的Pipeline实例,private volatile Map<String, PipelineHolder> pipelines = Collections.emptyMap();

这里并没有将Pipeline实例存储在IngestMetadata,这样做的原因有2个:

  1. 在 Elasticsearch 的启动过程中,插件和节点服务的初始化发生在 ClusterState 加载之后, 只有等所有插件完成加载后,所有的Processor工厂才会被注册到系统中,这意味着在集群状态初始化之前,Processor工厂并不可用。
  2. ClusterState 中的元数据结构是静态注册的,即在类加载时已经确定,如果要将运行时示例存储进去,必须改变 ClusterState 的元数据结构存储格式,这个很不方便维护,而且这在集群状态同步的时候也会带来不必要的序列化以及反序列化。

所以Pipeline的管理逻辑是:

  • ClusterState 中的IngestMetadata 只存储 JSON 格式的管道定义,描述 Pipeline 的配置(例如,包含哪些 Processor,它们的参数等)。
  • IngestService 中维护的运行时实例,将 JSON 配置解析为完整的 Pipeline 对象,包括处理器链

在集群变更时, pipeline的CRUD的实现在innerUpdatePipelines方法中 。

责任链设计模式的应用

管道的执行通过IngestDocument的org.elasticsearch.ingest.IngestDocument#executePipeline 去驱动,每个文档的每个Pipeline都会进入到这个函数, 而每个Pipeline有组装好的CompoundProcessor,实际的链式调用是在CompoundProcessor中。

简图大致如下:

pipeline执行简图.png

这里拿其和Zookeeper(3.6.3)中RequestProcessor 做一些对比:

特点 ES中的CompoundProcessor Zookeeper中RequestProcessor
链的存储定义 由两条列表组成,分别保存正常流程以及失败流程的Processor列表。 固定的链式结构。
异步执行 支持异步回调,可以异步执行链中的处理器。 同步执行 。
失败处理 提供专门的 onFailureProcessors 作为失败处理链。如果不忽略异常并且onFailureProcessors 不为空则会执行失败处理链逻辑。 没有专门的失败处理链,异常直接交由上层捕获。
可配置性 可动态调整处理器链和配置(如 ignoreFailure)。 代码中写死,无法配置

这里并没有说Zookeeper的设计就差于Elasticsearch, 只是设计目标有所不同,RequestProcessor就是适合集中式的强一致、其中Processor并不需要灵活变化,而CompoundProcessor就是适合高并发而Procesor灵活变化场景。

4. Ingest实战建议

回到实战中来,这里结合目前所分析的内容给出相应的实战建议。

  • 建立监控
    • 对于关键的Piepline,我们需要通过GET _nodes/stats?filter_path=nodes.*.ingest 获取其运行状况,识别延迟或失败的 Processor。
    • 文档写入的时候使用的是write线程池,我们也需要监控GET _nodes/stats?filter_path=nodes.*.thread_pool.write 的queue和write_rejections 判断需要扩展线程池。
  • 优化Processor
    • 减少高开销操作,优先使用内置 Processor,比如script Processor 可适时替换set, append,避免过度复杂的正则表达式或嵌套逻辑。
    • 自定义的Processor尽量优化,比如如果涉及查询外部系统可考虑引入缓存。
  • 建立单独的Ingest节点
    • 如果有大量的Pipeline需要执行,则可以考虑增加专用 Ingest 节点,避免与数据节点争夺资源。

5. 漏洞&&修复分析

这里分析7.10.2版本Ingest模块存在的漏洞以及官方是如何修复的。

CVE-2021-22144

https://discuss.elastic.co/t/elasticsearch-7-13-3-and-6-8-17-security-update/278100

Elasticsearch Grok 解析器中发现了一个不受控制的递归漏洞,该漏洞可能导致拒绝服务攻击。能够向 Elasticsearch 提交任意查询的用户可能会创建恶意 Grok 查询,从而导致 Elasticsearch 节点崩溃。

漏洞复现

发起这个请求:

  • patterns: 处理字段时使用的 Grok 模式,这里设置为 %{INT}。
  • pattern_definitions: 定义自定义 Grok 模式,这里故意让 INT 模式递归引用自身,导致循环引用问题。
POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{INT}"
          ],
          "pattern_definitions": {
            "INT": "%{INT}"
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "test"
      }
    }
  ]
}

当执行之后会使得节点直接StackOverflow中止进程。

修复逻辑

这个问题的关键在于原先的逻辑中,只会对间接的递归引用(pattern1 => pattern2 => pattern3 => pattern1)做了检测,但是没有对直接的自引用(pattern1 => pattern1 )做检测。

    private void forbidCircularReferences() {
                // 这个是增加的逻辑,检测直接的自引用
        for (Map.Entry<String, String> entry : patternBank.entrySet()) {
            if (patternReferencesItself(entry.getValue(), entry.getKey())) {
                throw new IllegalArgumentException("circular reference in pattern [" + entry.getKey() + "][" + entry.getValue() + "]");
            }
        }

        // 间接递归引用检测(这个是原先的逻辑)
        for (Map.Entry<String, String> entry : patternBank.entrySet()) {
            String name = entry.getKey();
            String pattern = entry.getValue();
            innerForbidCircularReferences(name, new ArrayList<>(), pattern);
        }
    }

CVE-2023-46673

https://discuss.elastic.co/t/elasticsearch-7-17-14-8-10-3-security-update-esa-2023-24/347708

漏洞复现

尝试了很多已有的Processor都没有复现,我们这使用自定义的Processor来复现,将之前的自定义AddArrayProcessor加一行代码:

    @Override
    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
        List valueList = new ArrayList<>(Arrays.asList(value.split(",")));
        valueList.add(valueList);  // 增加的代码
        ingestDocument.setFieldValue(field, valueList);
        return ingestDocument;
    }

重新编译再安装插件之后,执行改Processor 将会StackOverflow。

修复逻辑

这个问题的关键在于在IngestDocument的deepCopyMap的方法之前没有判断这样的无限引用的情况:

循环引用.png

那么在此之前做一个检测就好了,这个方法在原本的ES代码中就存在:org.elasticsearch.common.util.CollectionUtils#ensureNoSelfReferences(java.lang.Object, java.lang.String) ,其利用 IdentityHashMap 记录已访问对象的引用,检测并防止对象间的循环引用。

CVE-2024-23450

https://discuss.elastic.co/t/elasticsearch-8-13-0-7-17-19-security-update-esa-2024-06/356314

漏洞复现

虽然我们的索引只有2个Pipeline的配置,但是由于Pipeline Processor的存在,所以实际上一个文档其实能被很多Pipeline处理,当需要执行足够多个的pipline个数时,则会发生StackOverflow。

修复逻辑

这个问题的关键在于对Pipeline的个数并没有限制,添加一个配置项,当超出该个数则直接抛出异常。

public static final int MAX_PIPELINES = Integer.parseInt(System.getProperty("es.ingest.max_pipelines", "100"));

IngestDocument的org.elasticsearch.ingest.IngestDocument#executePipeline 添加逻辑:

    public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
        if (executedPipelines.size() >= MAX_PIPELINES) {
            handler.accept(
                null,
                new IllegalStateException(PIPELINE_TOO_MANY_ERROR_MESSAGE + MAX_PIPELINES + " nested pipelines")
            );
        } 

思考: 这里判断pipeline是否超出100个限制是用已经执行的pipeline个数来计算的。 假设已经超出100个pipeline,那这100个pipeline是会白跑的, 如果能在真正执行之前分析需要执行的Pipeline个数会更好。

6. 总结

Ingest 模块作为 Elasticsearch 数据处理流程的重要组成部分,提供了灵活的管道化能力,使得用户能够在数据写入前进行丰富的预处理操作。然而,在实际场景中,Ingest 模块也面临性能瓶颈、资源竞争等挑战,需要结合业务需求和系统现状进行精细化调优,通过优化 Processor 执行效率、合理规划集群架构以及增强监控与诊断手段,我们可以充分释放 Ingest 模块的能力,提升 Elasticsearch 的整体数据处理能力。

继续阅读 »

本文基于Elasticsearch7.10.2分析

0.Ingest 节点 概述

在实际进行文档index之前,使用采集节点(默认情况下,每个es节点都是ingest)对文档进行预处理。采集节点会拦截bulk和index请求,进行转换,然后将文档传回index或bulk API。

每个索引都有index.default_pipeline 和 index.final_pipeline 两个配置。

他们都是用于指定 Elasticsearch index 或者bulk 文档时要执行的预处理逻辑。

  • index.default_pipeline 定义了默认管道,它会在索引文档时首先执行。但如果索引请求中指定了 pipeline 参数,则该参数指定的管道会覆盖默认管道。如果设置了 index.default_pipeline 但对应的管道不存在,索引请求会失败。特殊值 _none 表示不运行任何摄取管道。
  • index.final_pipeline 定义了最终管道,它总是在请求管道(如果指定)和默认管道(如果存在)之后执行。如果设置了 index.final_pipeline 但对应的管道不存在,索引请求会失败。特殊值 _none 表示不运行最终管道。

简而言之,default_pipeline 先执行,可被覆盖;final_pipeline 后执行,不可被覆盖。两者都可设为 _none 以禁用。

一个Pipeline可以有多个Processor组成,每个Processor有着各自的不同功能,官方支持的Processor可参考:https://www.elastic.co/guide/en/elasticsearch/reference/7.10/ingest-processors.html

一个简单的例子,利用Set Processor 对新增的文档中加入新的字段和值:

PUT _ingest/pipeline/set_os
{
  "description": "sets the value of host.os.name from the field os",
  "processors": [
    {
      "set": {
        "field": "host.os.name",  // 增加的属性
        "value": "{{os}}"   // 这里引用了文档原先的os属性, 这里可以直接填写其他值
      }
    }
  ]
}

POST _ingest/pipeline/set_os/_simulate
{
  "docs": [
    {
      "_source": {
        "os": "Ubuntu"
      }
    }
  ]
}

这样转换之后,文档内容就变成了:

{
  "host" : {
    "os" : {
      "name" : "Ubuntu"
    }
  },
  "os" : "Ubuntu"
}

写单个文档的流程概述

没有指定pipeline.png

当请求,或者索引本身配置有pipline的时候,协调节点就会转发到ingest节点

PSOT source_index/_doc?pipeline=set_os
{
  "os": "xxxx"
}

指定了pipeline.png

【注】 并不是一定会发生内部rpc的请求转发,如果本地节点能接受当前的请求则不会转发到其他节点。

1. 模块总体概述

本小节关注IngestService中重要的相关类,对这些类有一个整体的了解有助于理解该模块。

  • ClusterService
    • IngestService 实现了 ClusterStateApplier 接口, 这样就能监听和响应集群的状态变化,当集群状态更新时,IngestService可以调整其内部 pipelines完成CRUD。
    • 另外IngestService还有List<Consumer>用来对提供给对集群状态变更之后需要最新状态的插件。
  • ScriptService
    • 某些Processor需要其用于管理和执行脚本,比如Script Processor。
  • AnalysisRegistry
    • 某些需要对文档内容进行分词处理的Processor。
  • ThreadPool
    • Processor都是异步执行的,实际执行线程池取决于调用上下文(如 write 或 management)
      • bulk API 时发生的pipeline 处理使用的是write线程池。
      • pipeline/_simulate API 使用的是management线程池,模拟执行通常是短时间的、低频的任务,不需要高并发支持而且为了不影响实际的文档处理或其他重要任务。
    • 另外为了避免Grok Processor运行时间过长,使用了Generic线程做定时调度检查执行时间
  • IngestMetric
    • 通过实现ReportingService接口来做到展示ingest内部的执行情况。
    • GET _nodes/stats?filter_path=nodes.*.ingest 可以查看到ingest 中的每个Pipeline中的执行的次数、失败次数以及总耗时
  • IngestPlugin
    • Ingest支持加载自定义的Processor插件,系统内置的所有Processor以及自定义的都通过IngestService 中的Map<String, Processor.Factory> processorFactories来进行管理。
  • IngestDocument
    • 其包含文档的源数据,提供了修改和查询文档的字段的能力,为Processor灵活操作文档数据提供基础,在后续pipeline的执行中,也是由其的executePipeline方法驱动的。

2. Processor 实现机制

抽象工厂设计模式的应用

Processor接口设计

每个Processor都有核心方法execute,使得处理器能够以统一的方式操作 IngestDocument,并通过多态实现不同处理逻辑。

Processor.Factory的设计

Processor.Factory 是 Processor 的抽象工厂,负责动态创建处理器实例。其主要职责包括:

  • 动态实例化
    • Processor create() 方法接收处理器配置并创建具体的处理器。
    • 支持递归创建,例如ConditionalProcessor可以嵌套其他处理器。
  • 依赖注入
    • Processor.Parameters 提供了一组服务和工具(如 ScriptService),工厂可以利用这些依赖创建复杂的Processor。

Processor.Factory的集中管理

在 IngestService 中,通过 Map<String, Processor.Factory> processorFactories 集中管理所有处理器工厂。这种管理方式提供了以下优势:

动态扩展

  • 插件可以注册自定义Processor工厂。
  • 新处理器类型的注册仅需添加到 processorFactories。

组合以及装饰器设计模式的应用

Processor经典的几个类的关系如下:

processor关系类图.png

CompoundProcessor是经典的组合设计模式,Pipeline这个类可以像使用单个 Processor 一样调用 CompoundProcessor,无需关注其内部具体细节。

而ConditionalProcessor 以及TrackingResultProcessor则体现了装饰器模式,在不改变原有对象的情况下扩展功能:

  • ConditionalProcessor 在执行Processor前会调用evaluate方法判断是否需要执行。
  • TrackingResultProcessor中decorate是为 CompoundProcessor 及其内部的 Processor 添加跟踪功能。

如何自定义Processor 插件

自定义 Processor 插件的注册方式为实现 IngestPlugin (Elasticsearch 提供不同的插件接口用来扩展不同类型的功能)的 getProcessors 方法,该方法返回一个工厂列表,IngestService 会将这些工厂注入到 processorFactories 中。

分析完代码之后,我们回到实战中来,简单起见,我们实现类似Append的Processor,但是我们这个更简单,输入的是字符串,然后我们用,分割一下将其作为数组设为值。

import org.elasticsearch.ingest.Processor;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;
import java.util.HashMap;
import java.util.Map;
public class AddArrayProcessorPlugin extends Plugin implements IngestPlugin {
    @Override
    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
        Map<String, Processor.Factory> processors = new HashMap<>();
        processors.put(AddArrayProcessor.TYPE, new AddArrayProcessor.Factory());
        return processors;
    }
}
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.ingest.Processor;
import java.util.*;

public class AddArrayProcessor extends AbstractProcessor {
    public static final String TYPE = "add_array";
    public String field;
    public String value;
    protected AddArrayProcessor(String tag, String description, String field, String value) {
        super(tag, description);
        this.field = field;
        this.value = value;
    }
    @Override
    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
        List valueList = new ArrayList<>(Arrays.asList(value.split(",")));
        ingestDocument.setFieldValue(field, valueList);
        return ingestDocument;
    }
    @Override
    public String getType() {
        return TYPE;
    }
    public static final class Factory implements Processor.Factory {
        @Override
        public Processor create(Map<String, Processor.Factory> processorFactories, String tag,
                                String description, Map<String, Object> config) throws Exception {
            String field = ConfigurationUtils.readStringProperty(TYPE, tag, config, "field");
            String value = ConfigurationUtils.readStringProperty(TYPE, tag, config, "value");
            return new AddArrayProcessor(tag, description, field, value);
        }
    }
}

打包之后,我们去install我们的Processor插件:

bin/elasticsearch-plugin install file:///home/hcb/data/data/es/data_standalone/elasticsearch-7.10.2-SNAPSHOT/AddArrayProcessor-1.0-SNAPSHOT.zip 
warning: no-jdk distributions that do not bundle a JDK are deprecated and will be removed in a future release
-> Installing file:///home/hcb/data/data/es/data_standalone/elasticsearch-7.10.2-SNAPSHOT/AddArrayProcessor-1.0-SNAPSHOT.zip
-> Downloading file:///home/hcb/data/data/es/data_standalone/elasticsearch-7.10.2-SNAPSHOT/AddArrayProcessor-1.0-SNAPSHOT.zip
[=================================================] 100%   
-> Installed AddArrayProcess

测试一下:

POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
       {
        "add_array": {
          "field": "test_arr",
          "value": "a,b,c,d,e,f"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "test",
      "_id": "1",
      "_source": {
        "field": "value"
      }
    }
  ]
}
docs结果:
"_source" : {
          "field" : "value",
          "test_arr" : [
            "a",
            "b",
            "c",
            "d",
            "e",
            "f"
          ]
        }

3. Pipeline设计

如何管理Pipeline

在IngestServcie中有一个Map存储所有的Pipeline实例,private volatile Map<String, PipelineHolder> pipelines = Collections.emptyMap();

这里并没有将Pipeline实例存储在IngestMetadata,这样做的原因有2个:

  1. 在 Elasticsearch 的启动过程中,插件和节点服务的初始化发生在 ClusterState 加载之后, 只有等所有插件完成加载后,所有的Processor工厂才会被注册到系统中,这意味着在集群状态初始化之前,Processor工厂并不可用。
  2. ClusterState 中的元数据结构是静态注册的,即在类加载时已经确定,如果要将运行时示例存储进去,必须改变 ClusterState 的元数据结构存储格式,这个很不方便维护,而且这在集群状态同步的时候也会带来不必要的序列化以及反序列化。

所以Pipeline的管理逻辑是:

  • ClusterState 中的IngestMetadata 只存储 JSON 格式的管道定义,描述 Pipeline 的配置(例如,包含哪些 Processor,它们的参数等)。
  • IngestService 中维护的运行时实例,将 JSON 配置解析为完整的 Pipeline 对象,包括处理器链

在集群变更时, pipeline的CRUD的实现在innerUpdatePipelines方法中 。

责任链设计模式的应用

管道的执行通过IngestDocument的org.elasticsearch.ingest.IngestDocument#executePipeline 去驱动,每个文档的每个Pipeline都会进入到这个函数, 而每个Pipeline有组装好的CompoundProcessor,实际的链式调用是在CompoundProcessor中。

简图大致如下:

pipeline执行简图.png

这里拿其和Zookeeper(3.6.3)中RequestProcessor 做一些对比:

特点 ES中的CompoundProcessor Zookeeper中RequestProcessor
链的存储定义 由两条列表组成,分别保存正常流程以及失败流程的Processor列表。 固定的链式结构。
异步执行 支持异步回调,可以异步执行链中的处理器。 同步执行 。
失败处理 提供专门的 onFailureProcessors 作为失败处理链。如果不忽略异常并且onFailureProcessors 不为空则会执行失败处理链逻辑。 没有专门的失败处理链,异常直接交由上层捕获。
可配置性 可动态调整处理器链和配置(如 ignoreFailure)。 代码中写死,无法配置

这里并没有说Zookeeper的设计就差于Elasticsearch, 只是设计目标有所不同,RequestProcessor就是适合集中式的强一致、其中Processor并不需要灵活变化,而CompoundProcessor就是适合高并发而Procesor灵活变化场景。

4. Ingest实战建议

回到实战中来,这里结合目前所分析的内容给出相应的实战建议。

  • 建立监控
    • 对于关键的Piepline,我们需要通过GET _nodes/stats?filter_path=nodes.*.ingest 获取其运行状况,识别延迟或失败的 Processor。
    • 文档写入的时候使用的是write线程池,我们也需要监控GET _nodes/stats?filter_path=nodes.*.thread_pool.write 的queue和write_rejections 判断需要扩展线程池。
  • 优化Processor
    • 减少高开销操作,优先使用内置 Processor,比如script Processor 可适时替换set, append,避免过度复杂的正则表达式或嵌套逻辑。
    • 自定义的Processor尽量优化,比如如果涉及查询外部系统可考虑引入缓存。
  • 建立单独的Ingest节点
    • 如果有大量的Pipeline需要执行,则可以考虑增加专用 Ingest 节点,避免与数据节点争夺资源。

5. 漏洞&&修复分析

这里分析7.10.2版本Ingest模块存在的漏洞以及官方是如何修复的。

CVE-2021-22144

https://discuss.elastic.co/t/elasticsearch-7-13-3-and-6-8-17-security-update/278100

Elasticsearch Grok 解析器中发现了一个不受控制的递归漏洞,该漏洞可能导致拒绝服务攻击。能够向 Elasticsearch 提交任意查询的用户可能会创建恶意 Grok 查询,从而导致 Elasticsearch 节点崩溃。

漏洞复现

发起这个请求:

  • patterns: 处理字段时使用的 Grok 模式,这里设置为 %{INT}。
  • pattern_definitions: 定义自定义 Grok 模式,这里故意让 INT 模式递归引用自身,导致循环引用问题。
POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{INT}"
          ],
          "pattern_definitions": {
            "INT": "%{INT}"
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "test"
      }
    }
  ]
}

当执行之后会使得节点直接StackOverflow中止进程。

修复逻辑

这个问题的关键在于原先的逻辑中,只会对间接的递归引用(pattern1 => pattern2 => pattern3 => pattern1)做了检测,但是没有对直接的自引用(pattern1 => pattern1 )做检测。

    private void forbidCircularReferences() {
                // 这个是增加的逻辑,检测直接的自引用
        for (Map.Entry<String, String> entry : patternBank.entrySet()) {
            if (patternReferencesItself(entry.getValue(), entry.getKey())) {
                throw new IllegalArgumentException("circular reference in pattern [" + entry.getKey() + "][" + entry.getValue() + "]");
            }
        }

        // 间接递归引用检测(这个是原先的逻辑)
        for (Map.Entry<String, String> entry : patternBank.entrySet()) {
            String name = entry.getKey();
            String pattern = entry.getValue();
            innerForbidCircularReferences(name, new ArrayList<>(), pattern);
        }
    }

CVE-2023-46673

https://discuss.elastic.co/t/elasticsearch-7-17-14-8-10-3-security-update-esa-2023-24/347708

漏洞复现

尝试了很多已有的Processor都没有复现,我们这使用自定义的Processor来复现,将之前的自定义AddArrayProcessor加一行代码:

    @Override
    public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
        List valueList = new ArrayList<>(Arrays.asList(value.split(",")));
        valueList.add(valueList);  // 增加的代码
        ingestDocument.setFieldValue(field, valueList);
        return ingestDocument;
    }

重新编译再安装插件之后,执行改Processor 将会StackOverflow。

修复逻辑

这个问题的关键在于在IngestDocument的deepCopyMap的方法之前没有判断这样的无限引用的情况:

循环引用.png

那么在此之前做一个检测就好了,这个方法在原本的ES代码中就存在:org.elasticsearch.common.util.CollectionUtils#ensureNoSelfReferences(java.lang.Object, java.lang.String) ,其利用 IdentityHashMap 记录已访问对象的引用,检测并防止对象间的循环引用。

CVE-2024-23450

https://discuss.elastic.co/t/elasticsearch-8-13-0-7-17-19-security-update-esa-2024-06/356314

漏洞复现

虽然我们的索引只有2个Pipeline的配置,但是由于Pipeline Processor的存在,所以实际上一个文档其实能被很多Pipeline处理,当需要执行足够多个的pipline个数时,则会发生StackOverflow。

修复逻辑

这个问题的关键在于对Pipeline的个数并没有限制,添加一个配置项,当超出该个数则直接抛出异常。

public static final int MAX_PIPELINES = Integer.parseInt(System.getProperty("es.ingest.max_pipelines", "100"));

IngestDocument的org.elasticsearch.ingest.IngestDocument#executePipeline 添加逻辑:

    public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
        if (executedPipelines.size() >= MAX_PIPELINES) {
            handler.accept(
                null,
                new IllegalStateException(PIPELINE_TOO_MANY_ERROR_MESSAGE + MAX_PIPELINES + " nested pipelines")
            );
        } 

思考: 这里判断pipeline是否超出100个限制是用已经执行的pipeline个数来计算的。 假设已经超出100个pipeline,那这100个pipeline是会白跑的, 如果能在真正执行之前分析需要执行的Pipeline个数会更好。

6. 总结

Ingest 模块作为 Elasticsearch 数据处理流程的重要组成部分,提供了灵活的管道化能力,使得用户能够在数据写入前进行丰富的预处理操作。然而,在实际场景中,Ingest 模块也面临性能瓶颈、资源竞争等挑战,需要结合业务需求和系统现状进行精细化调优,通过优化 Processor 执行效率、合理规划集群架构以及增强监控与诊断手段,我们可以充分释放 Ingest 模块的能力,提升 Elasticsearch 的整体数据处理能力。

收起阅读 »

【第5期】搜索客 Meetup | 最强开源 Elasticsearch 多集群管理工具 INFINI Console - 动手实战

本次活动由 搜索客社区、极限科技(INFINI Labs)联合举办,最近 INFINI Labs 重磅宣布旗下的产品 Console/Gateway/Agent/Framework 等在 Github 上开源了,其中 INFINI Console 作为 一款非常轻量级的多集群、跨版本的搜索基础设施统一管控工具,受到广大用户喜爱。借此开源机会,我们邀请到 INFINI Labs 的技术专家罗厚付老师跟大家分享介绍 Console 并动手实战,手把手教你从源码编译 -> 安装部署 -> 上手体验全攻略,欢迎预约直播观看~

活动主题:最强开源 Elasticsearch 多集群管理工具 INFINI Console - 动手实战
活动时间:2024 年 12 月 20 日 19:00-20:00(周三)
活动形式:微信视频号(极限实验室)直播
报名方式:关注或扫码海报中的二维码进行预约

嘉宾介绍

罗厚付,极限科技技术专家,拥有多年安全风控及大数据系统架构经验。现任极限科技云上产品设计与研发负责人,主导过多个核心产品的设计与落地。日常负责运维超大规模 ES 集群(800+节点/1PB+数据)。

主题摘要

INFINI Labs Console/Gateway/Agent/Framework 开源后,如何在本地搭建开发环境,并运行起来,使用 INFINI Easysearch 进行指标存储,使用 INFINI Console/Agent 对 Ealsticsearch 进行指标采集。

参与有奖

本次直播活动将设有福袋抽奖环节,参与就有机会获得 INFINI Labs 周边纪念品,包括 T 恤、鸭舌帽、咖啡杯、指甲刀套件等等(图片仅供参考,款式、颜色与尺码随机)。

活动交流

本活动设有 Meetup 技术交流群,可添加小助手微信拉群,与更多小伙伴一起学习交流。

Meetup 讲师招募

搜索客社区 Meetup 的成功举办,离不开社区小伙伴的热情参与。目前社区讲师招募计划也在持续进行中,我们诚挚邀请各位技术大咖、行业精英踊跃提交演讲议题,与大家分享您的经验。

讲师报名链接:http://cfp.searchkit.cn
或扫描下方二维码,立刻报名成为讲师!

Meetup 活动聚焦 AI 与搜索领域的最新动态,以及数据实时搜索分析、向量检索、技术实践与案例分析、日志分析、安全等领域的深度探讨。

我们热切期待您的精彩分享!

往期回顾

  1. 【第 4 期】搜索客 Meetup | INFINI Pizza 网站 SVG 动画这么炫,我教你啊!
  2. 【第 3 期】搜索客 Meetup | Elasticsearch 的代码结构和写入查询流程的解读 - 下篇
  3. 【第 2 期】搜索客 Meetup | Elasticsearch 的代码结构和写入查询流程的解读 - 上篇
  4. 【第 1 期】搜索客 Meetup | Easysearch 结合大模型实现 RAG

关于极限科技(INFINI Labs)

INFINI Labs

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

继续阅读 »

本次活动由 搜索客社区、极限科技(INFINI Labs)联合举办,最近 INFINI Labs 重磅宣布旗下的产品 Console/Gateway/Agent/Framework 等在 Github 上开源了,其中 INFINI Console 作为 一款非常轻量级的多集群、跨版本的搜索基础设施统一管控工具,受到广大用户喜爱。借此开源机会,我们邀请到 INFINI Labs 的技术专家罗厚付老师跟大家分享介绍 Console 并动手实战,手把手教你从源码编译 -> 安装部署 -> 上手体验全攻略,欢迎预约直播观看~

活动主题:最强开源 Elasticsearch 多集群管理工具 INFINI Console - 动手实战
活动时间:2024 年 12 月 20 日 19:00-20:00(周三)
活动形式:微信视频号(极限实验室)直播
报名方式:关注或扫码海报中的二维码进行预约

嘉宾介绍

罗厚付,极限科技技术专家,拥有多年安全风控及大数据系统架构经验。现任极限科技云上产品设计与研发负责人,主导过多个核心产品的设计与落地。日常负责运维超大规模 ES 集群(800+节点/1PB+数据)。

主题摘要

INFINI Labs Console/Gateway/Agent/Framework 开源后,如何在本地搭建开发环境,并运行起来,使用 INFINI Easysearch 进行指标存储,使用 INFINI Console/Agent 对 Ealsticsearch 进行指标采集。

参与有奖

本次直播活动将设有福袋抽奖环节,参与就有机会获得 INFINI Labs 周边纪念品,包括 T 恤、鸭舌帽、咖啡杯、指甲刀套件等等(图片仅供参考,款式、颜色与尺码随机)。

活动交流

本活动设有 Meetup 技术交流群,可添加小助手微信拉群,与更多小伙伴一起学习交流。

Meetup 讲师招募

搜索客社区 Meetup 的成功举办,离不开社区小伙伴的热情参与。目前社区讲师招募计划也在持续进行中,我们诚挚邀请各位技术大咖、行业精英踊跃提交演讲议题,与大家分享您的经验。

讲师报名链接:http://cfp.searchkit.cn
或扫描下方二维码,立刻报名成为讲师!

Meetup 活动聚焦 AI 与搜索领域的最新动态,以及数据实时搜索分析、向量检索、技术实践与案例分析、日志分析、安全等领域的深度探讨。

我们热切期待您的精彩分享!

往期回顾

  1. 【第 4 期】搜索客 Meetup | INFINI Pizza 网站 SVG 动画这么炫,我教你啊!
  2. 【第 3 期】搜索客 Meetup | Elasticsearch 的代码结构和写入查询流程的解读 - 下篇
  3. 【第 2 期】搜索客 Meetup | Elasticsearch 的代码结构和写入查询流程的解读 - 上篇
  4. 【第 1 期】搜索客 Meetup | Easysearch 结合大模型实现 RAG

关于极限科技(INFINI Labs)

INFINI Labs

极限科技,全称极限数据(北京)科技有限公司,是一家专注于实时搜索与数据分析的软件公司。旗下品牌极限实验室(INFINI Labs)致力于打造极致易用的数据探索与分析体验。

极限科技是一支年轻的团队,采用天然分布式的方式来进行远程协作,员工分布在全球各地,希望通过努力成为中国乃至全球企业大数据实时搜索分析产品的首选,为中国技术品牌输出添砖加瓦。

官网:https://infinilabs.cn

收起阅读 »