看,灰机...

玩转Elasticsearch routing功能


Elasticsearch是一个搭建在Lucene搜索引擎库基础之上的搜索服务平台。它在单机的Lucene搜索引擎库基础之上增加了分布式设计,translog等特性,增强了搜索引擎的性能,高可用性,高可扩性等。
Elasticsearch分布式设计的基本思想是Elasticsearch集群由多个服务器节点组成,集群中的一个索引分为多个分片,每个分片可以分配在不同的节点上。其中每个分片都是一个单独的功能完成的Lucene实例,可以独立地进行写入和查询服务,ES中存储的数据分布在集群分片的一个或多个上,其结构简单描述为下图。

在上面的架构图中,集群由三个节点组成,每个节点上有两个分片,想要读写文档就必须知道文档被分配在哪个分片上,这也正是本文要讲的routing功能的作用。

1. 工作原理

1.1 routing参数

routing参数是一个可选参数,默认使用文档的_id值,可以用在INDEX, UPDATE,GET, SEARCH, DELETE等各种操作中。在写入(包括更新)时,用于计算文档所属分片,在查询(GET请求或指定了routing的查询)中用于限制查询范围,提高查询速度。

1.2 计算方法

ES中shardId的计算公式如下:

shardId = hash(_routing) % num_primary_shards

通过该公式可以保证使用相同routing的文档被分配到同一个shard上,当然在默认情况下使用_id作为routing起到将文档均匀分布到多个分片上防止数据倾斜的作用。

1.3 routing_partition_size参数

使用了routing参数可以让routing值相同的文档分配到同一个分片上,从而减少查询时需要查询的shard数,提高查询效率。但是使用该参数容易导致数据倾斜。为此,ES还提供了一个index.routing_partition_size参数(仅当使用routing参数时可用),用于将routing相同的文档映射到集群分片的一个子集上,这样一方面可以减少查询的分片数,另一方面又可以在一定程度上防止数据倾斜。引入该参数后计算公式如下

shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards

1.4 源码解读

如下为计算文档归属分片的源码,从源码中我们可以看到ES的哈希算法使用的是Murmur3,取模使用的是java的floorMod

version: 6.5
path: org\elasticsearch\cluster\routing\OperationRouting.java

public static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
    final String effectiveRouting;
    final int partitionOffset;

    if (routing == null) {
        assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
        effectiveRouting = id; //默认使用id
    } else {
        effectiveRouting = routing;
    }

    if (indexMetaData.isRoutingPartitionedIndex()) {//使用了routing_partition_size参数
        partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
    } else {
        // we would have still got 0 above but this check just saves us an unnecessary hash calculation
        partitionOffset = 0;
    }

    return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
}

private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
    final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

    // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
    // of original index to hash documents
    return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
}

2. 存在的问题及解决方案

2.1 数据倾斜

如前面所述,用户使用自定义routing可以控制文档的分配位置,从而达到将相似文档放在同一个或同一批分片的目的,减少查询时的分片个数,提高查询速度。然而,这也意味着数据无法像默认情况那么均匀的分配到各分片和各节点上,从而会导致各节点存储和读写压力分布不均,影响系统的性能和稳定性。对此可以从以下两个方面进行优化

  1. 使用routing_partition_size参数
    如前面所述,该参数可以使routing相同的文档分配到一批分片(集群分片的子集)而不是一个分片上,从而可以从一定程度上减轻数据倾斜的问题。该参数的效果与其值设置的大小有关,当该值等于number_of_shard时,routing将退化为与未指定一样。当然该方法只能减轻数据倾斜,并不能彻底解决。
  2. 合理划分数据和设置routing值
    从前面的分析,我们可以得到文档分片计算的公式,公式中的hash算法和取模算法也已经通过源码获取。因此用户在划分数据时,可以首先明确数据要划分为几类,每一类数据准备划分到哪部分分片上,再结合分片计算公式计算出合理的routing值,当然也可以在routing参数设置之前设置一个自定义hash函数来实现,从而实现数据的均衡分配。
  3. routing前使用自定义hash函数
    很多情况下,用户并不能提前确定数据的分类值,为此可以在分类值和routing值之间设置一个hash函数,保证分类值散列后的值更均匀,使用该值作为routing,从而防止数据倾斜。

2.2 异常行为

ES的id去重是在分片维度进行的,之所以这样做是ES因为默认情况下使用_id作为routing值,这样id相同的文档会被分配到相同的分片上,因此只需要在分片维度做id去重即可保证id的唯一性。
然而当使用了自定义routing后,id相同的文档如果指定了不同的routing是可能被分配到不同的分片上的,从而导致同一个索引中出现两个id一样的文档,这里之所以说“可能”是因为如果不同的routing经过计算后仍然被映射到同一个分片上,去重还是可以生效的。因此这里会出现一个不稳定的情况,即当对id相同routing不同的文档进行写入操作时,有的时候被更新,有的时候会生成两个id相同的文档,具体可以使用下面的操作复现

# 出现两个id一样的情况
POST _bulk
{"index":{"_index":"routing_test","_id":"123","routing":"abc"}}
{"name":"zhangsan","age":18}
{"index":{"_index":"routing_test","_id":"123","routing":"xyz"}}
{"name":"lisi","age":22}

GET routing_test/_search

# 相同id被更新的情况
POST _bulk
{"index":{"_index":"routing_test_2","_id":"123","routing":"123"}}
{"name":"zhangsan","age":18}
{"index":{"_index":"routing_test_2","_id":"123","routing":"123456"}}
{"name":"lisi","age":22}

GET routing_test_2/_search

以上测试场景在5.6.4, 6.4.3, 6.8.2集群上均验证会出现,在7.2.1集群上没有出现(可能是id去重逻辑发生了变化,这个后续研究一下后更新)。
对于这种场景,虽然在响应行为不一致,但是由于属于未按正常使用方式使用(id相同的文档应该使用相同的routing),也属于可以理解的情况,官方文档上也有对应描述, 参考地址

3. 常规用法

3.1 文档划分及routing指定

  • 明确文档划分
    使用routing是为了让查询时有可能出现在相同结果集的文档被分配到一个或一批分片上。因此首先要先明确哪些文档应该被分配在一起,对于这些文档使用相同的routing值,常规的一些自带分类信息的文档,如学生的班级属性,产品的分类等都可以作为文档划分的依据。
  • 确定各类别的目标分片
    当然这一步不是必须的,但是合理设置各类数据的目标分片,让他们尽量均匀分配,可以防止数据倾斜。因此建议在使用前就明确哪一类数据准备分配在哪一个或一批分片上,然后通过计算给出这类文档的合理routing值
  • routing分布均匀
    在很多场景下分类有哪些值不确定,因此无法明确划分各类数据的分片归属并计算出routing值,对于这种情况,建议可以在routing之前增加一个hash函数,让不同文档分类的值通过哈希尽量散列得更均匀一些,从而保证数据分布平衡。

3.2 routing的使用

  • 写入操作
    文档的PUT, POST, BULK操作均支持routing参数,在请求中带上routing=xxx即可。使用了routing值即可保证使用相同routing值的文档被分配到一个或一批分片上。
  • GET操作
    对于使用了routing写入的文档,在GET时必须指定routing,否则可能导致404,这与GET的实现机制有关,GET请求会先根据routing找到对应的分片再获取文档,如果对写入使用routing的文档GET时没有指定routing,那么会默认使用id进行routing从而大概率无法获得文档。
  • 查询操作
    查询操作可以在body中指定_routing参数(可以指定多个)来进行查询。当然不指定_routing也是可以查询出结果的,不过是遍历所有的分片,指定了_routing后,查询仅会对routing对应的一个或一批索引进行检索,从而提高查询效率,这也是很多用户使用routing的主要目的,查询操作示例如下:
    GET my_index/_search
    {
    "query": {
    "terms": {
      "_routing": [ "user1" ] 
    }
    }
    }
  • UPDATE或DELETE操作
    UPDATE或DELETE操作与GET操作类似,也是先根据routing确定分片,再进行更新或删除操作,因此对于写入使用了routing的文档,必须指定routing,否则会报404响应。

3.3 设置routing为必选参数

从3.2的分析可以看出对于使用routing写入的文档,在进行GET,UPDATE或DELETE操作时如果不指定routing参数会出现异常。为此ES提供了一个索引mapping级别的设置,_routing.required, 来强制用户在INDEX,GET, DELETE,UPDATA一个文档时必须使用routing参数。当然查询时不受该参数的限制的。该参数的设置方式如下:

PUT my_index
{
  "mappings": {
    "_doc": {
      "_routing": {
        "required": true 
      }
    }
  }
}

3.4 routing结合别名使用

别名功能支持设置routing, 如下:

POST /_aliases
{
    "actions" : [
        {
            "add" : {
                 "index" : "test",
                 "alias" : "alias1",
                 "routing" : "1"
            }
        }
    ]
}

还支持查询和写入使用不同的routing,[详情参考] 将routing和别名结合,可以对使用者屏蔽读写时使用routing的细节,降低误操作的风险,提高操作的效率。

routing是ES中相对高阶一些的用法,在用户了解业务数据分布和查询需求的基础之上,可以对查询性能进行优化,然而使用不当会导致数据倾斜,重复ID等问题。本文介绍了routing的原理,问题及使用技巧,希望对大家有帮助,欢迎评论讨论

欢迎关注公众号Elastic慕容,和我一起进入Elastic的奇妙世界吧

继续阅读 »


Elasticsearch是一个搭建在Lucene搜索引擎库基础之上的搜索服务平台。它在单机的Lucene搜索引擎库基础之上增加了分布式设计,translog等特性,增强了搜索引擎的性能,高可用性,高可扩性等。
Elasticsearch分布式设计的基本思想是Elasticsearch集群由多个服务器节点组成,集群中的一个索引分为多个分片,每个分片可以分配在不同的节点上。其中每个分片都是一个单独的功能完成的Lucene实例,可以独立地进行写入和查询服务,ES中存储的数据分布在集群分片的一个或多个上,其结构简单描述为下图。

在上面的架构图中,集群由三个节点组成,每个节点上有两个分片,想要读写文档就必须知道文档被分配在哪个分片上,这也正是本文要讲的routing功能的作用。

1. 工作原理

1.1 routing参数

routing参数是一个可选参数,默认使用文档的_id值,可以用在INDEX, UPDATE,GET, SEARCH, DELETE等各种操作中。在写入(包括更新)时,用于计算文档所属分片,在查询(GET请求或指定了routing的查询)中用于限制查询范围,提高查询速度。

1.2 计算方法

ES中shardId的计算公式如下:

shardId = hash(_routing) % num_primary_shards

通过该公式可以保证使用相同routing的文档被分配到同一个shard上,当然在默认情况下使用_id作为routing起到将文档均匀分布到多个分片上防止数据倾斜的作用。

1.3 routing_partition_size参数

使用了routing参数可以让routing值相同的文档分配到同一个分片上,从而减少查询时需要查询的shard数,提高查询效率。但是使用该参数容易导致数据倾斜。为此,ES还提供了一个index.routing_partition_size参数(仅当使用routing参数时可用),用于将routing相同的文档映射到集群分片的一个子集上,这样一方面可以减少查询的分片数,另一方面又可以在一定程度上防止数据倾斜。引入该参数后计算公式如下

shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards

1.4 源码解读

如下为计算文档归属分片的源码,从源码中我们可以看到ES的哈希算法使用的是Murmur3,取模使用的是java的floorMod

version: 6.5
path: org\elasticsearch\cluster\routing\OperationRouting.java

public static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
    final String effectiveRouting;
    final int partitionOffset;

    if (routing == null) {
        assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
        effectiveRouting = id; //默认使用id
    } else {
        effectiveRouting = routing;
    }

    if (indexMetaData.isRoutingPartitionedIndex()) {//使用了routing_partition_size参数
        partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
    } else {
        // we would have still got 0 above but this check just saves us an unnecessary hash calculation
        partitionOffset = 0;
    }

    return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
}

private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
    final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

    // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
    // of original index to hash documents
    return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
}

2. 存在的问题及解决方案

2.1 数据倾斜

如前面所述,用户使用自定义routing可以控制文档的分配位置,从而达到将相似文档放在同一个或同一批分片的目的,减少查询时的分片个数,提高查询速度。然而,这也意味着数据无法像默认情况那么均匀的分配到各分片和各节点上,从而会导致各节点存储和读写压力分布不均,影响系统的性能和稳定性。对此可以从以下两个方面进行优化

  1. 使用routing_partition_size参数
    如前面所述,该参数可以使routing相同的文档分配到一批分片(集群分片的子集)而不是一个分片上,从而可以从一定程度上减轻数据倾斜的问题。该参数的效果与其值设置的大小有关,当该值等于number_of_shard时,routing将退化为与未指定一样。当然该方法只能减轻数据倾斜,并不能彻底解决。
  2. 合理划分数据和设置routing值
    从前面的分析,我们可以得到文档分片计算的公式,公式中的hash算法和取模算法也已经通过源码获取。因此用户在划分数据时,可以首先明确数据要划分为几类,每一类数据准备划分到哪部分分片上,再结合分片计算公式计算出合理的routing值,当然也可以在routing参数设置之前设置一个自定义hash函数来实现,从而实现数据的均衡分配。
  3. routing前使用自定义hash函数
    很多情况下,用户并不能提前确定数据的分类值,为此可以在分类值和routing值之间设置一个hash函数,保证分类值散列后的值更均匀,使用该值作为routing,从而防止数据倾斜。

2.2 异常行为

ES的id去重是在分片维度进行的,之所以这样做是ES因为默认情况下使用_id作为routing值,这样id相同的文档会被分配到相同的分片上,因此只需要在分片维度做id去重即可保证id的唯一性。
然而当使用了自定义routing后,id相同的文档如果指定了不同的routing是可能被分配到不同的分片上的,从而导致同一个索引中出现两个id一样的文档,这里之所以说“可能”是因为如果不同的routing经过计算后仍然被映射到同一个分片上,去重还是可以生效的。因此这里会出现一个不稳定的情况,即当对id相同routing不同的文档进行写入操作时,有的时候被更新,有的时候会生成两个id相同的文档,具体可以使用下面的操作复现

# 出现两个id一样的情况
POST _bulk
{"index":{"_index":"routing_test","_id":"123","routing":"abc"}}
{"name":"zhangsan","age":18}
{"index":{"_index":"routing_test","_id":"123","routing":"xyz"}}
{"name":"lisi","age":22}

GET routing_test/_search

# 相同id被更新的情况
POST _bulk
{"index":{"_index":"routing_test_2","_id":"123","routing":"123"}}
{"name":"zhangsan","age":18}
{"index":{"_index":"routing_test_2","_id":"123","routing":"123456"}}
{"name":"lisi","age":22}

GET routing_test_2/_search

以上测试场景在5.6.4, 6.4.3, 6.8.2集群上均验证会出现,在7.2.1集群上没有出现(可能是id去重逻辑发生了变化,这个后续研究一下后更新)。
对于这种场景,虽然在响应行为不一致,但是由于属于未按正常使用方式使用(id相同的文档应该使用相同的routing),也属于可以理解的情况,官方文档上也有对应描述, 参考地址

3. 常规用法

3.1 文档划分及routing指定

  • 明确文档划分
    使用routing是为了让查询时有可能出现在相同结果集的文档被分配到一个或一批分片上。因此首先要先明确哪些文档应该被分配在一起,对于这些文档使用相同的routing值,常规的一些自带分类信息的文档,如学生的班级属性,产品的分类等都可以作为文档划分的依据。
  • 确定各类别的目标分片
    当然这一步不是必须的,但是合理设置各类数据的目标分片,让他们尽量均匀分配,可以防止数据倾斜。因此建议在使用前就明确哪一类数据准备分配在哪一个或一批分片上,然后通过计算给出这类文档的合理routing值
  • routing分布均匀
    在很多场景下分类有哪些值不确定,因此无法明确划分各类数据的分片归属并计算出routing值,对于这种情况,建议可以在routing之前增加一个hash函数,让不同文档分类的值通过哈希尽量散列得更均匀一些,从而保证数据分布平衡。

3.2 routing的使用

  • 写入操作
    文档的PUT, POST, BULK操作均支持routing参数,在请求中带上routing=xxx即可。使用了routing值即可保证使用相同routing值的文档被分配到一个或一批分片上。
  • GET操作
    对于使用了routing写入的文档,在GET时必须指定routing,否则可能导致404,这与GET的实现机制有关,GET请求会先根据routing找到对应的分片再获取文档,如果对写入使用routing的文档GET时没有指定routing,那么会默认使用id进行routing从而大概率无法获得文档。
  • 查询操作
    查询操作可以在body中指定_routing参数(可以指定多个)来进行查询。当然不指定_routing也是可以查询出结果的,不过是遍历所有的分片,指定了_routing后,查询仅会对routing对应的一个或一批索引进行检索,从而提高查询效率,这也是很多用户使用routing的主要目的,查询操作示例如下:
    GET my_index/_search
    {
    "query": {
    "terms": {
      "_routing": [ "user1" ] 
    }
    }
    }
  • UPDATE或DELETE操作
    UPDATE或DELETE操作与GET操作类似,也是先根据routing确定分片,再进行更新或删除操作,因此对于写入使用了routing的文档,必须指定routing,否则会报404响应。

3.3 设置routing为必选参数

从3.2的分析可以看出对于使用routing写入的文档,在进行GET,UPDATE或DELETE操作时如果不指定routing参数会出现异常。为此ES提供了一个索引mapping级别的设置,_routing.required, 来强制用户在INDEX,GET, DELETE,UPDATA一个文档时必须使用routing参数。当然查询时不受该参数的限制的。该参数的设置方式如下:

PUT my_index
{
  "mappings": {
    "_doc": {
      "_routing": {
        "required": true 
      }
    }
  }
}

3.4 routing结合别名使用

别名功能支持设置routing, 如下:

POST /_aliases
{
    "actions" : [
        {
            "add" : {
                 "index" : "test",
                 "alias" : "alias1",
                 "routing" : "1"
            }
        }
    ]
}

还支持查询和写入使用不同的routing,[详情参考] 将routing和别名结合,可以对使用者屏蔽读写时使用routing的细节,降低误操作的风险,提高操作的效率。

routing是ES中相对高阶一些的用法,在用户了解业务数据分布和查询需求的基础之上,可以对查询性能进行优化,然而使用不当会导致数据倾斜,重复ID等问题。本文介绍了routing的原理,问题及使用技巧,希望对大家有帮助,欢迎评论讨论

欢迎关注公众号Elastic慕容,和我一起进入Elastic的奇妙世界吧

收起阅读 »

Elasticsearch冷热分离原理和实践


性能与容量之间的矛盾由来已久,计算机的多级存储体系就是其中一个经典的例子,同样的问题在Elasticsearch中也存在。为了保证Elasticsearch的读写性能,官方建议磁盘使用SSD固态硬盘。然而Elasticsearch要解决的是海量数据的存储和检索问题,海量的数据就意味需要大量的存储空间,如果都使用SSD固态硬盘成本将成为一个很大的问题,这也是制约许多企业和个人使用Elasticsearch的因素之一。为了解决这个问题,Elasticsearch冷热分离架构应运而生。

1. 实现原理

1.1 节点异构

传统的Elasticsearch集群中所有节点均采用相同的配置,然而Elasticsearch并没有对节点的规格一致性做要求,换而言之就是每个节点可以是任意规格,当然这样做会导致集群各节点性能不一致,影响集群稳定性。但是如果有规则的将集群的节点分成不同类型,部分是高性能的节点用于存储热点数据,部分是性能相对差些的大容量节点用于存储冷数据,却可以一方面保证热数据的性能,另一方面保证冷数据的存储,降低存储成本,这也是Elasticsearch冷热分离架构的基本思想,如下图为一个3热节点,2冷节点的冷热分离Elasticsearch集群:

其中热节点为16核64GB 1TB SSD盘,用于满足对热数据对读写性能的要求,冷节点为8C32GB 5TB HDD在保证一定读写性能的基础之上提供了成本较低的大存储HDD盘来满足冷节点对数据存储的需求。

1.2 数据分布

集群节点异构后接着要考虑的是数据分布问题,即用户如何对冷热数据进行标识,并将冷数据移动到冷节点,热数据移动到热节点。

节点指定冷热属性

仅仅将不同的节点设置为不同的规格还不够,为了能明确区分出哪些节点是热节点,哪些节点是冷节点,需要为对应节点打标签
Elasticsearch支持给节点打标签,具体方式是在elasticsearch.yml文件中增加

node.attr.{attribute}: {value}

配置。其中attribute为用户自定义的任意标签名,value为该节点对应的该标签的值,例如对于冷热分离,可以使用如下设置

node.attr.temperature: hot //热节点
node.attr.temperature: warm //冷节点

ps:中文通常叫冷热,英文叫hot/warm

索引指定冷热属性

节点有了冷热属性后,接下来就是指定数据的冷热属性,来设置和调整数据分布。冷热分离方案中数据冷热分布的基本单位是索引,即指定某个索引为热索引,另一个索引为冷索引。通过索引的分布来实现控制数据分布的目的。 Elasticsearch提供了index shard filtering功能(2.x开始),该功能在索引配置中提供了如下几个配置

index.routing.allocation.include.{attribute}
Assign the index to a node whose {attribute} has at least one of the comma-separated values.

index.routing.allocation.require.{attribute}
Assign the index to a node whose {attribute} has all of the comma-separated values.

index.routing.allocation.exclude.{attribute}
Assign the index to a node whose {attribute} has none of the comma-separated values.

用户可以在创建索引,或后续的任意时刻设置这些配置来控制索引在不同标签节点上的分配动作。
index.routing.allocation.include.{attribute}表示索引可以分配在包含多个值中其中一个的节点上。
index.routing.allocation.require.{attribute}表示索引要分配在包含索引指定值的节点上(通常一般设置一个值)。
index.routing.allocation.exclude.{attribute}表示索引只能分配在不包含所有指定值的节点上。

数据分布控制

Elasticsearch的索引分片分配由ShardAllocator决定,ShardAllocator通过在索引分片创建或rebalance时对每个节点调用一系列AllocationDecider来决定是否将节点分配到指定节点上,其中一个AllocationDecider是FilterAllocationDecider,该decider用于应用集群,节点的一些基于attr的分配规则,涉及到节点级别配置的核心代码如下

private Decision shouldIndexFilter(IndexMetaData indexMd, RoutingNode node, RoutingAllocation allocation) {
    if (indexMd.requireFilters() != null) {
        if (indexMd.requireFilters().match(node.node()) == false) {
            return allocation.decision(Decision.NO, NAME, "node does not match index setting [%s] filters [%s]",
                IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_PREFIX, indexMd.requireFilters());
        }
    }
    if (indexMd.includeFilters() != null) {
        if (indexMd.includeFilters().match(node.node()) == false) {
            return allocation.decision(Decision.NO, NAME, "node does not match index setting [%s] filters [%s]",
                IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_PREFIX, indexMd.includeFilters());
        }
    }
    if (indexMd.excludeFilters() != null) {
        if (indexMd.excludeFilters().match(node.node())) {
            return allocation.decision(Decision.NO, NAME, "node matches index setting [%s] filters [%s]",
                IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey(), indexMd.excludeFilters());
        }
    }
    return null;
}

2 冷热集群搭建及使用实践

2.1 集群规格选型

根据业务数据量及读写性能要求选择合适的冷热节点规格

  • 存储量计算:根据冷热数据各自数据量及要求保留时间,计算出冷热数据源数据量,然后使用如下公式计算出冷热节点各自的磁盘需求量
    
    实际空间 = 源数据 * (1 + 副本数量) * (1 + 数据膨胀) / (1 - 内部任务开销) / (1 - 操作系统预留)
        ≈ 源数据 * (1 + 副本数量) * 1.45
    ES建议存储容量 = 源数据 * (1 + 副本数量) * 1.45 * (1 + 预留空间)
        ≈ 源数据 * (1 + 副本数量) * 2.2
  • 副本数量:副本有利于增加数据的可靠性,但同时会增加存储成本。默认和建议的副本数量为1,对于部分可以承受异常情况导致数据丢失的场景,可考虑设置副本数量为0。
  • 数据膨胀:除原始数据外,ES 需要存储索引、列存数据等,在应用编码压缩等技术后,一般膨胀10%。
  • 内部任务开销:ES 占用约20%的磁盘空间,用于 segment 合并、ES Translog、日志等。
  • 操作系统预留:Linux 操作系统默认为 root 用户预留5%的磁盘空间,用于关键流程处理、系统恢复、防止磁盘碎片化问题等。
  • 预留空间:为保证集群的正常运行建议预留50%的存储空间
  • 计算资源预估:
    ES 的计算资源主要消耗在写入和查询过程,而不同业务场景在写入和查询方面的复杂度不同、比重不同,导致计算资源相比存储资源较难评估
    • 日志场景:日志属于典型的写多读少类场景,计算资源主要消耗在写入过程中。我们在日志场景的经验是:2核8GB内存的资源最大可支持0.5w/s的写入能力,但注意不同业务场景可能有偏差。由于实例性能基本随计算资源总量呈线性扩容,您可以按实例资源总量估算写入能力。例如6核24GB内存的资源可支持1.5w/s的写入能力,40核160GB内存的资源可支持10w/s的写入能力。
    • Metric 及 APM 等结构化数据场景:这也是写多读少类场景,但相比日志场景计算资源消耗较小,2核8GB内存的资源一般可支持1w/s的写入能力,您可参照日志场景线性扩展的方式,评估不同规格实例的实际写入能力。
    • 站内搜索及应用搜索等搜索场景:此类为读多写少类场景,计算资源主要消耗在查询过程,由于查询复杂度在不同使用场景差别非常大,计算资源也最难评估,建议您结合存储资源初步选择计算资源,然后在测试过程中验证、调整。

2.2 搭建集群

  • 自建

按照选定冷热节点规格部署服务器,搭建集群,热节点使用SSD盘,冷节点使用HDD盘,对热节点elasticsearcy.yml增加如下配置

node.attr.temperature: hot 

对冷节点增加如下配置

node.attr.temperature: warm

启动集群,冷热分离的Elasticsearch集群即搭建完成

  • 购买云ES服务

腾讯云预计于12月中旬上线冷热分离集群,用户只需要在创建页面上根据需要即可分钟级拉起一个冷热分离架构的ES集群,方便快速,扩展性好,运维成本低

  • 验证 使用如下命令可以验证节点冷热属性
    GET _cat/nodeattrs?v&h=node,attr,value&s=attr:desc
    node        attr        value
    node1   temperature     hot
    node2   temperature     hot
    node3   temperature     warm
    node4   temperature     hot
    node5   temperature     warm
    ...

    可以看到该集群为三热二冷的冷热分离集群(当然要注意如果其中有专用主节点或专用协调节点这类无法分配shard的节点,即使设置了冷热属性也不会有分片可以分配到其上)

3. 为索引设置冷热属性

业务方可以根据实际情况决定索引的冷热属性

  • 对于热数据,索引设置如下
    PUT hot_data_index/_settings
    {
    "index.routing.allocation.require.temperature": "hot"
    }
  • 对于冷数据,索引设置
    PUT hot_data_index/_settings
    {
    "index.routing.allocation.require.temperature": "warm"
    }
  • 验证
    创建索引
    PUT hot_warm_test_index
    {
    "settings": {
    "number_of_replicas": 1,
    "number_of_shards": 3
    }
    }

    查看分片分配,可以看到分片均匀分配在五个节点上

    GET _cat/shards/hot_warm_test_index?v&h=index,shard,prirep,node&s=node
    index          shard prirep node
    hot_data_index 1     p      node1
    hot_data_index 0     r      node1
    hot_data_index 2     r      node2
    hot_data_index 2     p      node3
    hot_data_index 1     r      node4
    hot_data_index 0     p      node5

    设置索引为热索引

    PUT hot_warm_test_index/_settings
    {
    "index.routing.allocation.require.temperature": "hot"
    }

    查看分片分配,发现分片均分配在热节点上

    GET _cat/shards/hot_warm_test_index?v&h=index,shard,prirep,node&s=node
    index          shard prirep node
    hot_data_index 1     p      node1
    hot_data_index 0     r      node1
    hot_data_index 0     p      node2
    hot_data_index 2     r      node2
    hot_data_index 2     p      node4
    hot_data_index 1     r      node4

    设置索引为冷索引

    PUT hot_warm_test_index/_settings
    {
    "index.routing.allocation.require.temperature": "warm"
    }

    查看分片分配,发现分片均分配到冷节点上

    GET _cat/shards/hot_warm_test_index?v&h=index,shard,prirep,node&s=node
    index          shard prirep node
    hot_data_index 1     p      node3
    hot_data_index 0     r      node3
    hot_data_index 2     r      node3
    hot_data_index 0     p      node5
    hot_data_index 2     p      node5
    hot_data_index 1     r      node5

4. 索引生命周期管理

从ES6.6开始,Elasticsearch提供索引生命周期管理功能,索引生命周期管理可以通过API或者kibana界面配置,详情参考[index-lifecycle-management], 本文仅通过kibana界面演示如何使用索引生命周期管理结合冷热分离架构实现索引数据的动态管理。
kibana中的索引生命周期管理位置如下图(版本6.8.2):
点击创建create policy,进入配置界面,可以看到索引的生命周期被分为:Hot phrase,Warm phase, Cold phase,Delete phrase四个阶段

  • Hot phrase: 该阶段可以根据索引的文档数,大小,时长决定是否调用rollover API来滚动索引,详情可以参考[indices-rollover-index],因与本文关系不大不再详细赘述。
  • Warm phrase: 当一个索引在Hot phrase被roll over后便会进入Warm phrase,进入该阶段的索引会被设置为read-only, 用户可以为这个索引设置要使用的attribute, 如对于冷热分离策略,这里可以选择temperature: warm属性。另外还可以对索引进行forceMerge, shrink等操作,这两个操作具体可以参考官方文档。
  • Cold phrase: 可以设置当索引rollover一段时间后进入cold阶段,这个阶段也可以设置一个属性。从冷热分离架构可以看出冷热属性是具备扩展性的,不仅可以指定hot, warm, 也可以扩展增加hot, warm, cold, freeze等多个冷热属性。如果想使用三层的冷热分离的话这里可以指定为temperature: cold, 此处还支持对索引的freeze操作,详情参考官方文档。
  • Delete phrase: 可以设置索引rollover一段时间后进入delete阶段,进入该阶段的索引会自动被删除。

冷热分离架构是Elasticsearch的经典架构之一,使用该架构用户可以在保证热数据良好读写性能的同时,仍可以存储海量的数据,极大地丰富了ES的应用场景,解决了用户的成本问题。再结合ES在6.6推出的索引生命周期管理,使得ES集群在使用性和自动化方面表现出色,真正地解决了用户在性能,存储成本,自动化数据管理等方面的问题。

欢迎关注公众号Elastic慕容,和我一起进入Elastic的奇妙世界吧

继续阅读 »


性能与容量之间的矛盾由来已久,计算机的多级存储体系就是其中一个经典的例子,同样的问题在Elasticsearch中也存在。为了保证Elasticsearch的读写性能,官方建议磁盘使用SSD固态硬盘。然而Elasticsearch要解决的是海量数据的存储和检索问题,海量的数据就意味需要大量的存储空间,如果都使用SSD固态硬盘成本将成为一个很大的问题,这也是制约许多企业和个人使用Elasticsearch的因素之一。为了解决这个问题,Elasticsearch冷热分离架构应运而生。

1. 实现原理

1.1 节点异构

传统的Elasticsearch集群中所有节点均采用相同的配置,然而Elasticsearch并没有对节点的规格一致性做要求,换而言之就是每个节点可以是任意规格,当然这样做会导致集群各节点性能不一致,影响集群稳定性。但是如果有规则的将集群的节点分成不同类型,部分是高性能的节点用于存储热点数据,部分是性能相对差些的大容量节点用于存储冷数据,却可以一方面保证热数据的性能,另一方面保证冷数据的存储,降低存储成本,这也是Elasticsearch冷热分离架构的基本思想,如下图为一个3热节点,2冷节点的冷热分离Elasticsearch集群:

其中热节点为16核64GB 1TB SSD盘,用于满足对热数据对读写性能的要求,冷节点为8C32GB 5TB HDD在保证一定读写性能的基础之上提供了成本较低的大存储HDD盘来满足冷节点对数据存储的需求。

1.2 数据分布

集群节点异构后接着要考虑的是数据分布问题,即用户如何对冷热数据进行标识,并将冷数据移动到冷节点,热数据移动到热节点。

节点指定冷热属性

仅仅将不同的节点设置为不同的规格还不够,为了能明确区分出哪些节点是热节点,哪些节点是冷节点,需要为对应节点打标签
Elasticsearch支持给节点打标签,具体方式是在elasticsearch.yml文件中增加

node.attr.{attribute}: {value}

配置。其中attribute为用户自定义的任意标签名,value为该节点对应的该标签的值,例如对于冷热分离,可以使用如下设置

node.attr.temperature: hot //热节点
node.attr.temperature: warm //冷节点

ps:中文通常叫冷热,英文叫hot/warm

索引指定冷热属性

节点有了冷热属性后,接下来就是指定数据的冷热属性,来设置和调整数据分布。冷热分离方案中数据冷热分布的基本单位是索引,即指定某个索引为热索引,另一个索引为冷索引。通过索引的分布来实现控制数据分布的目的。 Elasticsearch提供了index shard filtering功能(2.x开始),该功能在索引配置中提供了如下几个配置

index.routing.allocation.include.{attribute}
Assign the index to a node whose {attribute} has at least one of the comma-separated values.

index.routing.allocation.require.{attribute}
Assign the index to a node whose {attribute} has all of the comma-separated values.

index.routing.allocation.exclude.{attribute}
Assign the index to a node whose {attribute} has none of the comma-separated values.

用户可以在创建索引,或后续的任意时刻设置这些配置来控制索引在不同标签节点上的分配动作。
index.routing.allocation.include.{attribute}表示索引可以分配在包含多个值中其中一个的节点上。
index.routing.allocation.require.{attribute}表示索引要分配在包含索引指定值的节点上(通常一般设置一个值)。
index.routing.allocation.exclude.{attribute}表示索引只能分配在不包含所有指定值的节点上。

数据分布控制

Elasticsearch的索引分片分配由ShardAllocator决定,ShardAllocator通过在索引分片创建或rebalance时对每个节点调用一系列AllocationDecider来决定是否将节点分配到指定节点上,其中一个AllocationDecider是FilterAllocationDecider,该decider用于应用集群,节点的一些基于attr的分配规则,涉及到节点级别配置的核心代码如下

private Decision shouldIndexFilter(IndexMetaData indexMd, RoutingNode node, RoutingAllocation allocation) {
    if (indexMd.requireFilters() != null) {
        if (indexMd.requireFilters().match(node.node()) == false) {
            return allocation.decision(Decision.NO, NAME, "node does not match index setting [%s] filters [%s]",
                IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_PREFIX, indexMd.requireFilters());
        }
    }
    if (indexMd.includeFilters() != null) {
        if (indexMd.includeFilters().match(node.node()) == false) {
            return allocation.decision(Decision.NO, NAME, "node does not match index setting [%s] filters [%s]",
                IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_PREFIX, indexMd.includeFilters());
        }
    }
    if (indexMd.excludeFilters() != null) {
        if (indexMd.excludeFilters().match(node.node())) {
            return allocation.decision(Decision.NO, NAME, "node matches index setting [%s] filters [%s]",
                IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey(), indexMd.excludeFilters());
        }
    }
    return null;
}

2 冷热集群搭建及使用实践

2.1 集群规格选型

根据业务数据量及读写性能要求选择合适的冷热节点规格

  • 存储量计算:根据冷热数据各自数据量及要求保留时间,计算出冷热数据源数据量,然后使用如下公式计算出冷热节点各自的磁盘需求量
    
    实际空间 = 源数据 * (1 + 副本数量) * (1 + 数据膨胀) / (1 - 内部任务开销) / (1 - 操作系统预留)
        ≈ 源数据 * (1 + 副本数量) * 1.45
    ES建议存储容量 = 源数据 * (1 + 副本数量) * 1.45 * (1 + 预留空间)
        ≈ 源数据 * (1 + 副本数量) * 2.2
  • 副本数量:副本有利于增加数据的可靠性,但同时会增加存储成本。默认和建议的副本数量为1,对于部分可以承受异常情况导致数据丢失的场景,可考虑设置副本数量为0。
  • 数据膨胀:除原始数据外,ES 需要存储索引、列存数据等,在应用编码压缩等技术后,一般膨胀10%。
  • 内部任务开销:ES 占用约20%的磁盘空间,用于 segment 合并、ES Translog、日志等。
  • 操作系统预留:Linux 操作系统默认为 root 用户预留5%的磁盘空间,用于关键流程处理、系统恢复、防止磁盘碎片化问题等。
  • 预留空间:为保证集群的正常运行建议预留50%的存储空间
  • 计算资源预估:
    ES 的计算资源主要消耗在写入和查询过程,而不同业务场景在写入和查询方面的复杂度不同、比重不同,导致计算资源相比存储资源较难评估
    • 日志场景:日志属于典型的写多读少类场景,计算资源主要消耗在写入过程中。我们在日志场景的经验是:2核8GB内存的资源最大可支持0.5w/s的写入能力,但注意不同业务场景可能有偏差。由于实例性能基本随计算资源总量呈线性扩容,您可以按实例资源总量估算写入能力。例如6核24GB内存的资源可支持1.5w/s的写入能力,40核160GB内存的资源可支持10w/s的写入能力。
    • Metric 及 APM 等结构化数据场景:这也是写多读少类场景,但相比日志场景计算资源消耗较小,2核8GB内存的资源一般可支持1w/s的写入能力,您可参照日志场景线性扩展的方式,评估不同规格实例的实际写入能力。
    • 站内搜索及应用搜索等搜索场景:此类为读多写少类场景,计算资源主要消耗在查询过程,由于查询复杂度在不同使用场景差别非常大,计算资源也最难评估,建议您结合存储资源初步选择计算资源,然后在测试过程中验证、调整。

2.2 搭建集群

  • 自建

按照选定冷热节点规格部署服务器,搭建集群,热节点使用SSD盘,冷节点使用HDD盘,对热节点elasticsearcy.yml增加如下配置

node.attr.temperature: hot 

对冷节点增加如下配置

node.attr.temperature: warm

启动集群,冷热分离的Elasticsearch集群即搭建完成

  • 购买云ES服务

腾讯云预计于12月中旬上线冷热分离集群,用户只需要在创建页面上根据需要即可分钟级拉起一个冷热分离架构的ES集群,方便快速,扩展性好,运维成本低

  • 验证 使用如下命令可以验证节点冷热属性
    GET _cat/nodeattrs?v&h=node,attr,value&s=attr:desc
    node        attr        value
    node1   temperature     hot
    node2   temperature     hot
    node3   temperature     warm
    node4   temperature     hot
    node5   temperature     warm
    ...

    可以看到该集群为三热二冷的冷热分离集群(当然要注意如果其中有专用主节点或专用协调节点这类无法分配shard的节点,即使设置了冷热属性也不会有分片可以分配到其上)

3. 为索引设置冷热属性

业务方可以根据实际情况决定索引的冷热属性

  • 对于热数据,索引设置如下
    PUT hot_data_index/_settings
    {
    "index.routing.allocation.require.temperature": "hot"
    }
  • 对于冷数据,索引设置
    PUT hot_data_index/_settings
    {
    "index.routing.allocation.require.temperature": "warm"
    }
  • 验证
    创建索引
    PUT hot_warm_test_index
    {
    "settings": {
    "number_of_replicas": 1,
    "number_of_shards": 3
    }
    }

    查看分片分配,可以看到分片均匀分配在五个节点上

    GET _cat/shards/hot_warm_test_index?v&h=index,shard,prirep,node&s=node
    index          shard prirep node
    hot_data_index 1     p      node1
    hot_data_index 0     r      node1
    hot_data_index 2     r      node2
    hot_data_index 2     p      node3
    hot_data_index 1     r      node4
    hot_data_index 0     p      node5

    设置索引为热索引

    PUT hot_warm_test_index/_settings
    {
    "index.routing.allocation.require.temperature": "hot"
    }

    查看分片分配,发现分片均分配在热节点上

    GET _cat/shards/hot_warm_test_index?v&h=index,shard,prirep,node&s=node
    index          shard prirep node
    hot_data_index 1     p      node1
    hot_data_index 0     r      node1
    hot_data_index 0     p      node2
    hot_data_index 2     r      node2
    hot_data_index 2     p      node4
    hot_data_index 1     r      node4

    设置索引为冷索引

    PUT hot_warm_test_index/_settings
    {
    "index.routing.allocation.require.temperature": "warm"
    }

    查看分片分配,发现分片均分配到冷节点上

    GET _cat/shards/hot_warm_test_index?v&h=index,shard,prirep,node&s=node
    index          shard prirep node
    hot_data_index 1     p      node3
    hot_data_index 0     r      node3
    hot_data_index 2     r      node3
    hot_data_index 0     p      node5
    hot_data_index 2     p      node5
    hot_data_index 1     r      node5

4. 索引生命周期管理

从ES6.6开始,Elasticsearch提供索引生命周期管理功能,索引生命周期管理可以通过API或者kibana界面配置,详情参考[index-lifecycle-management], 本文仅通过kibana界面演示如何使用索引生命周期管理结合冷热分离架构实现索引数据的动态管理。
kibana中的索引生命周期管理位置如下图(版本6.8.2):
点击创建create policy,进入配置界面,可以看到索引的生命周期被分为:Hot phrase,Warm phase, Cold phase,Delete phrase四个阶段

  • Hot phrase: 该阶段可以根据索引的文档数,大小,时长决定是否调用rollover API来滚动索引,详情可以参考[indices-rollover-index],因与本文关系不大不再详细赘述。
  • Warm phrase: 当一个索引在Hot phrase被roll over后便会进入Warm phrase,进入该阶段的索引会被设置为read-only, 用户可以为这个索引设置要使用的attribute, 如对于冷热分离策略,这里可以选择temperature: warm属性。另外还可以对索引进行forceMerge, shrink等操作,这两个操作具体可以参考官方文档。
  • Cold phrase: 可以设置当索引rollover一段时间后进入cold阶段,这个阶段也可以设置一个属性。从冷热分离架构可以看出冷热属性是具备扩展性的,不仅可以指定hot, warm, 也可以扩展增加hot, warm, cold, freeze等多个冷热属性。如果想使用三层的冷热分离的话这里可以指定为temperature: cold, 此处还支持对索引的freeze操作,详情参考官方文档。
  • Delete phrase: 可以设置索引rollover一段时间后进入delete阶段,进入该阶段的索引会自动被删除。

冷热分离架构是Elasticsearch的经典架构之一,使用该架构用户可以在保证热数据良好读写性能的同时,仍可以存储海量的数据,极大地丰富了ES的应用场景,解决了用户的成本问题。再结合ES在6.6推出的索引生命周期管理,使得ES集群在使用性和自动化方面表现出色,真正地解决了用户在性能,存储成本,自动化数据管理等方面的问题。

欢迎关注公众号Elastic慕容,和我一起进入Elastic的奇妙世界吧

收起阅读 »

2019年Elasticsearch用户调查

本次用户调查针对的是全国的ES用户,由Elastic官方联合阿里共同发起。调查结果报告会在2019年Elastic开发者大会-北京上正式发布。
poster.JPG

 
本次用户调查针对的是全国的ES用户,由Elastic官方联合阿里共同发起。调查结果报告会在2019年Elastic开发者大会-北京上正式发布。
poster.JPG

 

ES脚本性能优化一例

使用painless脚本为文档自定义打分是很常见的场景,对新人来说也是最容易造成性能问题的地方。本文中使用两个例子简单谈一下脚本性能优化。

目标

ES本身是基于倒排等数据结构实现的查询,因此在做类似Term、Match等可以利用底层数据结构的场景进行查询时,性能是很好的。

脚本和term等查询不一样,无法利用现有的各种数据结构,可以简单理解成循环:

docs = getDocs(xxx); // 获取满足条件的文档列表
for(Doc doc : docs) {
    score = getScoreByScript(doc);
}

因此脚本的性能取决于两个地方:脚本的复杂度和满足条件的文档数

例子1

我们有个场景是查询指定坐标指定范围内的POI列表,例如5公里内的景点列表。

由于我们的距离公式和ES默认的都不一致,如下:

/**
 * 计算距离,返回单位:米
 */
public static Double getDistance(Double lat1, Double lng1, Double lat2, Double lng2) {
    double diffLon = Math.abs(lng1 - lng2);
    if (diffLon > 180)
        diffLon -= 360;
    return Math.sqrt(Math.pow(diffLon, 2) + Math.pow(lat1 - lat2, 2)) * 110.0 * 1000;
}

所以该同学把这段Java代码转成了Painless,在sort里使用这个该方法计算出距离。上线以后发现ES有了很多慢查询,对应的服务也95线、99线也比较高。

原因是其他脚本没有有效地缩小数据量,导致有几百万的数据需要使用该脚本做距离计算,给ES的CPU造成很大压力,查询性能也比较差。

该例子优化起来很简单,即使用ES自带的distance做较大范围的限制,例如需要5公里的数据,可以用ES的plain距离做限制,再加上之前的自定义脚本逻辑。由于ES的plain距离计算性能好很多,因此经过该过滤以后,自定义脚本的文档量少了很多,因此整体性能有了很大提升。

例子2

有个场景是对文章进行搜索,如果文章关联的城市是指定的几个城市,则给额外的加分。例如:

{
    "query": {xxx},
    "sort": [
    {
      "_script": {
        "script": {
          "source": "def score = 0;def cityIds = doc['cityIds']; def paramCityIds = params.cityIds; for (int i=0; i<cityIds.size(); i++){if (paramCityIds.contains(cityIds[i])){score += 100;}} return score;",
          "lang": "painless",
          "params": {
            "cityIds": [2,1,3]
          }
        },
        "type": "number",
        "order": "desc"
      }
    }
    ]
}

问题和例子1一样,该功能的性能比较差。虽然脚本简单,但是满足的文档量比较大,带来的计算量也比较多,因此性能上不去。

这是一个比较常见的场景,问题的根源还是对ES的机制不够了解,优化起来也很简单,想办法利用到倒排就可以了。

ES里有个专门针对改场景的查询:constant_score,因此以上查询可以修改为:

{
    "query": {
        "should": [
            {
                    "constant_score": {
                        "filter": {
                            "term": {
                                    "cityIds": 2
                            }
                        },
                        "boost": 5
                     }
            },
            {
                    "constant_score": {
                        "filter": {
                            "term": {
                                    "cityIds": 1
                            }
                        },
                        "boost": 5
                     }
            },
            {
                    "constant_score": {
                        "filter": {
                            "term": {
                                    "cityIds": 3
                            }
                        },
                        "boost": 5
                     }
            }
        ]
    },
    "sort": [
    {
      "_score": "desc"
    ]
}

性能即可得到极大改善。

继续阅读 »

使用painless脚本为文档自定义打分是很常见的场景,对新人来说也是最容易造成性能问题的地方。本文中使用两个例子简单谈一下脚本性能优化。

目标

ES本身是基于倒排等数据结构实现的查询,因此在做类似Term、Match等可以利用底层数据结构的场景进行查询时,性能是很好的。

脚本和term等查询不一样,无法利用现有的各种数据结构,可以简单理解成循环:

docs = getDocs(xxx); // 获取满足条件的文档列表
for(Doc doc : docs) {
    score = getScoreByScript(doc);
}

因此脚本的性能取决于两个地方:脚本的复杂度和满足条件的文档数

例子1

我们有个场景是查询指定坐标指定范围内的POI列表,例如5公里内的景点列表。

由于我们的距离公式和ES默认的都不一致,如下:

/**
 * 计算距离,返回单位:米
 */
public static Double getDistance(Double lat1, Double lng1, Double lat2, Double lng2) {
    double diffLon = Math.abs(lng1 - lng2);
    if (diffLon > 180)
        diffLon -= 360;
    return Math.sqrt(Math.pow(diffLon, 2) + Math.pow(lat1 - lat2, 2)) * 110.0 * 1000;
}

所以该同学把这段Java代码转成了Painless,在sort里使用这个该方法计算出距离。上线以后发现ES有了很多慢查询,对应的服务也95线、99线也比较高。

原因是其他脚本没有有效地缩小数据量,导致有几百万的数据需要使用该脚本做距离计算,给ES的CPU造成很大压力,查询性能也比较差。

该例子优化起来很简单,即使用ES自带的distance做较大范围的限制,例如需要5公里的数据,可以用ES的plain距离做限制,再加上之前的自定义脚本逻辑。由于ES的plain距离计算性能好很多,因此经过该过滤以后,自定义脚本的文档量少了很多,因此整体性能有了很大提升。

例子2

有个场景是对文章进行搜索,如果文章关联的城市是指定的几个城市,则给额外的加分。例如:

{
    "query": {xxx},
    "sort": [
    {
      "_script": {
        "script": {
          "source": "def score = 0;def cityIds = doc['cityIds']; def paramCityIds = params.cityIds; for (int i=0; i<cityIds.size(); i++){if (paramCityIds.contains(cityIds[i])){score += 100;}} return score;",
          "lang": "painless",
          "params": {
            "cityIds": [2,1,3]
          }
        },
        "type": "number",
        "order": "desc"
      }
    }
    ]
}

问题和例子1一样,该功能的性能比较差。虽然脚本简单,但是满足的文档量比较大,带来的计算量也比较多,因此性能上不去。

这是一个比较常见的场景,问题的根源还是对ES的机制不够了解,优化起来也很简单,想办法利用到倒排就可以了。

ES里有个专门针对改场景的查询:constant_score,因此以上查询可以修改为:

{
    "query": {
        "should": [
            {
                    "constant_score": {
                        "filter": {
                            "term": {
                                    "cityIds": 2
                            }
                        },
                        "boost": 5
                     }
            },
            {
                    "constant_score": {
                        "filter": {
                            "term": {
                                    "cityIds": 1
                            }
                        },
                        "boost": 5
                     }
            },
            {
                    "constant_score": {
                        "filter": {
                            "term": {
                                    "cityIds": 3
                            }
                        },
                        "boost": 5
                     }
            }
        ]
    },
    "sort": [
    {
      "_score": "desc"
    ]
}

性能即可得到极大改善。

收起阅读 »

深入理解Elasticsearch写入过程

Elasticsearch 是当前主流的搜索引擎,其具有扩展性好,查询速度快,查询结果近实时等优点,本文将对Elasticsearch的写操作进行分析。

1. lucene的写操作及其问题

Elasticsearch底层使用Lucene来实现doc的读写操作,Lucene通过

public long addDocument(...);
public long deleteDocuments(...);
public long updateDocument(...);

三个方法来实现文档的写入,更新和删除操作。但是存在如下问题

  1. 没有并发设计
    lucene只是一个搜索引擎库,并没有涉及到分布式相关的设计,因此要想使用Lucene来处理海量数据,并利用分布式的能力,就必须在其之上进行分布式的相关设计。
  2. 非实时
    将文件写入lucence后并不能立即被检索,需要等待lucene生成一个完整的segment才能被检索
  3. 数据存储不可靠
    写入lucene的数据不会立即被持久化到磁盘,如果服务器宕机,那存储在内存中的数据将会丢失
  4. 不支持部分更新
    lucene中提供的updateDocuments仅支持对文档的全量更新,对部分更新不支持

2. Elasticsearch的写入方案

针对Lucene的问题,ES做了如下设计

2.1 分布式设计:

为了支持对海量数据的存储和查询,Elasticsearch引入分片的概念,一个索引被分成多个分片,每个分片可以有一个主分片和多个副本分片,每个分片副本都是一个具有完整功能的lucene实例。分片可以分配在不同的服务器上,同一个分片的不同副本不能分配在相同的服务器上。
在进行写操作时,ES会根据传入的_routing参数(或mapping中设置的_routing, 如果参数和设置中都没有则默认使用_id), 按照公式shard_num = hash(\routing) % num_primary_shards,计算出文档要分配到的分片,在从集群元数据中找出对应主分片的位置,将请求路由到该分片进行文档写操作。

2.2 近实时性-refresh操作

当一个文档写入Lucene后是不能被立即查询到的,Elasticsearch提供了一个refresh操作,会定时地调用lucene的reopen(新版本为openIfChanged)为内存中新写入的数据生成一个新的segment,此时被处理的文档均可以被检索到。refresh操作的时间间隔由refresh_interval参数控制,默认为1s, 当然还可以在写入请求中带上refresh表示写入后立即refresh,另外还可以调用refresh API显式refresh。

2.3 数据存储可靠性

  1. 引入translog
    当一个文档写入Lucence后是存储在内存中的,即使执行了refresh操作仍然是在文件系统缓存中,如果此时服务器宕机,那么这部分数据将会丢失。为此ES增加了translog, 当进行文档写操作时会先将文档写入Lucene,然后写入一份到translog,写入translog是落盘的(如果对可靠性要求不是很高,也可以设置异步落盘,可以提高性能,由配置index.translog.durabilityindex.translog.sync_interval控制),这样就可以防止服务器宕机后数据的丢失。由于translog是追加写入,因此性能比较好。与传统的分布式系统不同,这里是先写入Lucene再写入translog,原因是写入Lucene可能会失败,为了减少写入失败回滚的复杂度,因此先写入Lucene.
  2. flush操作
    另外每30分钟或当translog达到一定大小(由index.translog.flush_threshold_size控制,默认512mb), ES会触发一次flush操作,此时ES会先执行refresh操作将buffer中的数据生成segment,然后调用lucene的commit方法将所有内存中的segment fsync到磁盘。此时lucene中的数据就完成了持久化,会清空translog中的数据(6.x版本为了实现sequenceIDs,不删除translog)
  3. merge操作
    由于refresh默认间隔为1s中,因此会产生大量的小segment,为此ES会运行一个任务检测当前磁盘中的segment,对符合条件的segment进行合并操作,减少lucene中的segment个数,提高查询速度,降低负载。不仅如此,merge过程也是文档删除和更新操作后,旧的doc真正被删除的时候。用户还可以手动调用_forcemerge API来主动触发merge,以减少集群的segment个数和清理已删除或更新的文档。
  4. 多副本机制
    另外ES有多副本机制,一个分片的主副分片不能分片在同一个节点上,进一步保证数据的可靠性。

    2.4 部分更新

    lucene支持对文档的整体更新,ES为了支持局部更新,在Lucene的Store索引中存储了一个_source字段,该字段的key值是文档ID, 内容是文档的原文。当进行更新操作时先从_source中获取原文,与更新部分合并后,再调用lucene API进行全量更新, 对于写入了ES但是还没有refresh的文档,可以从translog中获取。另外为了防止读取文档过程后执行更新前有其他线程修改了文档,ES增加了版本机制,当执行更新操作时发现当前文档的版本与预期不符,则会重新获取文档再更新。

3. ES的写入流程

ES的任意节点都可以作为协调节点(coordinating node)接受请求,当协调节点接受到请求后进行一系列处理,然后通过_routing字段找到对应的primary shard,并将请求转发给primary shard, primary shard完成写入后,将写入并发发送给各replica, raplica执行写入操作后返回给primary shard, primary shard再将请求返回给协调节点。大致流程如下图:

3.1 coordinating节点

ES中接收并转发请求的节点称为coordinating节点,ES中所有节点都可以接受并转发请求。当一个节点接受到写请求或更新请求后,会执行如下操作:

  1. ingest pipeline
    查看该请求是否符合某个ingest pipeline的pattern, 如果符合则执行pipeline中的逻辑,一般是对文档进行各种预处理,如格式调整,增加字段等。如果当前节点没有ingest角色,则需要将请求转发给有ingest角色的节点执行。
  2. 自动创建索引
    判断索引是否存在,如果开启了自动创建则自动创建,否则报错
  3. 设置routing
    获取请求URL或mapping中的_routing,如果没有则使用_id, 如果没有指定_id则ES会自动生成一个全局唯一ID。该_routing字段用于决定文档分配在索引的哪个shard上。
  4. 构建BulkShardRequest
    由于Bulk Request中包含多种(Index/Update/Delete)请求,这些请求分别需要到不同的shard上执行,因此协调节点,会将请求按照shard分开,同一个shard上的请求聚合到一起,构建BulkShardRequest
  5. 将请求发送给primary shard
    因为当前执行的是写操作,因此只能在primary上完成,所以需要把请求路由到primary shard所在节点
  6. 等待primary shard返回

3.2 primary shard

Primary请求的入口是PrimaryOperationTransportHandler的MessageReceived, 当接收到请求时,执行的逻辑如下

  1. 判断操作类型
    遍历bulk请求中的各子请求,根据不同的操作类型跳转到不同的处理逻辑
  2. 将update操作转换为Index和Delete操作
    获取文档的当前内容,与update内容合并生成新文档,然后将update请求转换成index请求,此处文档设置一个version v1
  3. Parse Doc
    解析文档的各字段,并添加如_uid等ES相关的一些系统字段
  4. 更新mapping
    对于新增字段会根据dynamic mapping或dynamic template生成对应的mapping,如果mapping中有dynamic mapping相关设置则按设置处理,如忽略或抛出异常
  5. 获取sequence Id和Version
    从SequcenceNumberService获取一个sequenceID和Version。SequcenID用于初始化LocalCheckPoint, verion是根据当前Versoin+1用于防止并发写导致数据不一致。
  6. 写入lucene
    这一步开始会对文档uid加锁,然后判断uid对应的version v2和之前update转换时的versoin v1是否一致,不一致则返回第二步重新执行。 如果version一致,如果同id的doc已经存在,则调用lucene的updateDocument接口,如果是新文档则调用lucene的addDoucument. 这里有个问题,如何保证Delete-Then-Add的原子性,ES是通过在Delete之前会加上已refresh锁,禁止被refresh,只有等待Add完成后释放了Refresh Lock, 这样就保证了这个操作的原子性。
  7. 写入translog
    写入Lucene的Segment后,会以key value的形式写Translog, Key是Id, Value是Doc的内容。当查询的时候,如果请求的是GetDocById则可以直接根据_id从translog中获取。满足nosql场景的实时性。
  8. 重构bulk request
    因为primary shard已经将update操作转换为index操作或delete操作,因此要对之前的bulkrequest进行调整,只包含index或delete操作,不需要再进行update的处理操作。
  9. flush translog
    默认情况下,translog要在此处落盘完成,如果对可靠性要求不高,可以设置translog异步,那么translog的fsync将会异步执行,但是落盘前的数据有丢失风险。
  10. 发送请求给replicas
    将构造好的bulkrequest并发发送给各replicas,等待replica返回,这里需要等待所有的replicas返回,响应请求给协调节点。如果某个shard执行失败,则primary会给master发请求remove该shard。这里会同时把sequenceID, primaryTerm, GlobalCheckPoint等传递给replica。
  11. 等待replica响应
    当所有的replica返回请求时,更细primary shard的LocalCheckPoint。

3.3 replica shard

Replica 请求的入口是在ReplicaOperationTransportHandler的messageReceived,当replica shard接收到请求时执行如下流程:

  1. 判断操作类型
    replica收到的写如请求只会有add和delete,因update在primary shard上已经转换为add或delete了。根据不同的操作类型执行对应的操作
  2. Parse Doc
  3. 更新mapping
  4. 获取sequenceId和Version 直接使用primary shard发送过来的请求中的内容即可
  5. 写如lucene
  6. write Translog
  7. Flush translog

4 总结与分析

Elasticsearch建立在Lucene基础之上,底层采用lucene来实现文件的读写操作,实现了文档的存储和高效查询。然后lucene作为一个搜索库在应对海量数据的存储上仍有一些不足之处。
Elasticsearch通过引入分片概念,成功地将lucene部署到分布式系统中,增强了系统的可靠性和扩展性。
Elasticsearch通过定期refresh lucene in-momory-buffer中的数据,使得ES具有了近实时的写入和查询能力。
Elasticsearch通过引入translog,多副本,以及定期执行flush,merge等操作保证了数据可靠性和较高的存储性能。
Elasticsearch通过存储_source字段结合verison字段实现了文档的局部更新,使得ES的使用方式更加灵活多样。
Elasticsearch基于lucene,又不简单地只是lucene,它完美地将lucene与分布式系统结合,既利用了lucene的检索能力,又具有了分布式系统的众多优点。

本文参考

欢迎关注公众号Elastic慕容,和我一起进入Elastic的奇妙世界吧

继续阅读 »

Elasticsearch 是当前主流的搜索引擎,其具有扩展性好,查询速度快,查询结果近实时等优点,本文将对Elasticsearch的写操作进行分析。

1. lucene的写操作及其问题

Elasticsearch底层使用Lucene来实现doc的读写操作,Lucene通过

public long addDocument(...);
public long deleteDocuments(...);
public long updateDocument(...);

三个方法来实现文档的写入,更新和删除操作。但是存在如下问题

  1. 没有并发设计
    lucene只是一个搜索引擎库,并没有涉及到分布式相关的设计,因此要想使用Lucene来处理海量数据,并利用分布式的能力,就必须在其之上进行分布式的相关设计。
  2. 非实时
    将文件写入lucence后并不能立即被检索,需要等待lucene生成一个完整的segment才能被检索
  3. 数据存储不可靠
    写入lucene的数据不会立即被持久化到磁盘,如果服务器宕机,那存储在内存中的数据将会丢失
  4. 不支持部分更新
    lucene中提供的updateDocuments仅支持对文档的全量更新,对部分更新不支持

2. Elasticsearch的写入方案

针对Lucene的问题,ES做了如下设计

2.1 分布式设计:

为了支持对海量数据的存储和查询,Elasticsearch引入分片的概念,一个索引被分成多个分片,每个分片可以有一个主分片和多个副本分片,每个分片副本都是一个具有完整功能的lucene实例。分片可以分配在不同的服务器上,同一个分片的不同副本不能分配在相同的服务器上。
在进行写操作时,ES会根据传入的_routing参数(或mapping中设置的_routing, 如果参数和设置中都没有则默认使用_id), 按照公式shard_num = hash(\routing) % num_primary_shards,计算出文档要分配到的分片,在从集群元数据中找出对应主分片的位置,将请求路由到该分片进行文档写操作。

2.2 近实时性-refresh操作

当一个文档写入Lucene后是不能被立即查询到的,Elasticsearch提供了一个refresh操作,会定时地调用lucene的reopen(新版本为openIfChanged)为内存中新写入的数据生成一个新的segment,此时被处理的文档均可以被检索到。refresh操作的时间间隔由refresh_interval参数控制,默认为1s, 当然还可以在写入请求中带上refresh表示写入后立即refresh,另外还可以调用refresh API显式refresh。

2.3 数据存储可靠性

  1. 引入translog
    当一个文档写入Lucence后是存储在内存中的,即使执行了refresh操作仍然是在文件系统缓存中,如果此时服务器宕机,那么这部分数据将会丢失。为此ES增加了translog, 当进行文档写操作时会先将文档写入Lucene,然后写入一份到translog,写入translog是落盘的(如果对可靠性要求不是很高,也可以设置异步落盘,可以提高性能,由配置index.translog.durabilityindex.translog.sync_interval控制),这样就可以防止服务器宕机后数据的丢失。由于translog是追加写入,因此性能比较好。与传统的分布式系统不同,这里是先写入Lucene再写入translog,原因是写入Lucene可能会失败,为了减少写入失败回滚的复杂度,因此先写入Lucene.
  2. flush操作
    另外每30分钟或当translog达到一定大小(由index.translog.flush_threshold_size控制,默认512mb), ES会触发一次flush操作,此时ES会先执行refresh操作将buffer中的数据生成segment,然后调用lucene的commit方法将所有内存中的segment fsync到磁盘。此时lucene中的数据就完成了持久化,会清空translog中的数据(6.x版本为了实现sequenceIDs,不删除translog)
  3. merge操作
    由于refresh默认间隔为1s中,因此会产生大量的小segment,为此ES会运行一个任务检测当前磁盘中的segment,对符合条件的segment进行合并操作,减少lucene中的segment个数,提高查询速度,降低负载。不仅如此,merge过程也是文档删除和更新操作后,旧的doc真正被删除的时候。用户还可以手动调用_forcemerge API来主动触发merge,以减少集群的segment个数和清理已删除或更新的文档。
  4. 多副本机制
    另外ES有多副本机制,一个分片的主副分片不能分片在同一个节点上,进一步保证数据的可靠性。

    2.4 部分更新

    lucene支持对文档的整体更新,ES为了支持局部更新,在Lucene的Store索引中存储了一个_source字段,该字段的key值是文档ID, 内容是文档的原文。当进行更新操作时先从_source中获取原文,与更新部分合并后,再调用lucene API进行全量更新, 对于写入了ES但是还没有refresh的文档,可以从translog中获取。另外为了防止读取文档过程后执行更新前有其他线程修改了文档,ES增加了版本机制,当执行更新操作时发现当前文档的版本与预期不符,则会重新获取文档再更新。

3. ES的写入流程

ES的任意节点都可以作为协调节点(coordinating node)接受请求,当协调节点接受到请求后进行一系列处理,然后通过_routing字段找到对应的primary shard,并将请求转发给primary shard, primary shard完成写入后,将写入并发发送给各replica, raplica执行写入操作后返回给primary shard, primary shard再将请求返回给协调节点。大致流程如下图:

3.1 coordinating节点

ES中接收并转发请求的节点称为coordinating节点,ES中所有节点都可以接受并转发请求。当一个节点接受到写请求或更新请求后,会执行如下操作:

  1. ingest pipeline
    查看该请求是否符合某个ingest pipeline的pattern, 如果符合则执行pipeline中的逻辑,一般是对文档进行各种预处理,如格式调整,增加字段等。如果当前节点没有ingest角色,则需要将请求转发给有ingest角色的节点执行。
  2. 自动创建索引
    判断索引是否存在,如果开启了自动创建则自动创建,否则报错
  3. 设置routing
    获取请求URL或mapping中的_routing,如果没有则使用_id, 如果没有指定_id则ES会自动生成一个全局唯一ID。该_routing字段用于决定文档分配在索引的哪个shard上。
  4. 构建BulkShardRequest
    由于Bulk Request中包含多种(Index/Update/Delete)请求,这些请求分别需要到不同的shard上执行,因此协调节点,会将请求按照shard分开,同一个shard上的请求聚合到一起,构建BulkShardRequest
  5. 将请求发送给primary shard
    因为当前执行的是写操作,因此只能在primary上完成,所以需要把请求路由到primary shard所在节点
  6. 等待primary shard返回

3.2 primary shard

Primary请求的入口是PrimaryOperationTransportHandler的MessageReceived, 当接收到请求时,执行的逻辑如下

  1. 判断操作类型
    遍历bulk请求中的各子请求,根据不同的操作类型跳转到不同的处理逻辑
  2. 将update操作转换为Index和Delete操作
    获取文档的当前内容,与update内容合并生成新文档,然后将update请求转换成index请求,此处文档设置一个version v1
  3. Parse Doc
    解析文档的各字段,并添加如_uid等ES相关的一些系统字段
  4. 更新mapping
    对于新增字段会根据dynamic mapping或dynamic template生成对应的mapping,如果mapping中有dynamic mapping相关设置则按设置处理,如忽略或抛出异常
  5. 获取sequence Id和Version
    从SequcenceNumberService获取一个sequenceID和Version。SequcenID用于初始化LocalCheckPoint, verion是根据当前Versoin+1用于防止并发写导致数据不一致。
  6. 写入lucene
    这一步开始会对文档uid加锁,然后判断uid对应的version v2和之前update转换时的versoin v1是否一致,不一致则返回第二步重新执行。 如果version一致,如果同id的doc已经存在,则调用lucene的updateDocument接口,如果是新文档则调用lucene的addDoucument. 这里有个问题,如何保证Delete-Then-Add的原子性,ES是通过在Delete之前会加上已refresh锁,禁止被refresh,只有等待Add完成后释放了Refresh Lock, 这样就保证了这个操作的原子性。
  7. 写入translog
    写入Lucene的Segment后,会以key value的形式写Translog, Key是Id, Value是Doc的内容。当查询的时候,如果请求的是GetDocById则可以直接根据_id从translog中获取。满足nosql场景的实时性。
  8. 重构bulk request
    因为primary shard已经将update操作转换为index操作或delete操作,因此要对之前的bulkrequest进行调整,只包含index或delete操作,不需要再进行update的处理操作。
  9. flush translog
    默认情况下,translog要在此处落盘完成,如果对可靠性要求不高,可以设置translog异步,那么translog的fsync将会异步执行,但是落盘前的数据有丢失风险。
  10. 发送请求给replicas
    将构造好的bulkrequest并发发送给各replicas,等待replica返回,这里需要等待所有的replicas返回,响应请求给协调节点。如果某个shard执行失败,则primary会给master发请求remove该shard。这里会同时把sequenceID, primaryTerm, GlobalCheckPoint等传递给replica。
  11. 等待replica响应
    当所有的replica返回请求时,更细primary shard的LocalCheckPoint。

3.3 replica shard

Replica 请求的入口是在ReplicaOperationTransportHandler的messageReceived,当replica shard接收到请求时执行如下流程:

  1. 判断操作类型
    replica收到的写如请求只会有add和delete,因update在primary shard上已经转换为add或delete了。根据不同的操作类型执行对应的操作
  2. Parse Doc
  3. 更新mapping
  4. 获取sequenceId和Version 直接使用primary shard发送过来的请求中的内容即可
  5. 写如lucene
  6. write Translog
  7. Flush translog

4 总结与分析

Elasticsearch建立在Lucene基础之上,底层采用lucene来实现文件的读写操作,实现了文档的存储和高效查询。然后lucene作为一个搜索库在应对海量数据的存储上仍有一些不足之处。
Elasticsearch通过引入分片概念,成功地将lucene部署到分布式系统中,增强了系统的可靠性和扩展性。
Elasticsearch通过定期refresh lucene in-momory-buffer中的数据,使得ES具有了近实时的写入和查询能力。
Elasticsearch通过引入translog,多副本,以及定期执行flush,merge等操作保证了数据可靠性和较高的存储性能。
Elasticsearch通过存储_source字段结合verison字段实现了文档的局部更新,使得ES的使用方式更加灵活多样。
Elasticsearch基于lucene,又不简单地只是lucene,它完美地将lucene与分布式系统结合,既利用了lucene的检索能力,又具有了分布式系统的众多优点。

本文参考

欢迎关注公众号Elastic慕容,和我一起进入Elastic的奇妙世界吧

收起阅读 »

ES Aggs根据聚合的结果(数值)进行过滤

前言

我们在使用聚合时总是有各种各样的聚合需求,其中一个比较常用的就是根据聚合的结果过滤聚合的桶,例如:1、每个IP登录次数超过5次的IP;2、每个IP登录人数超过2的IP。 还有我之前的一个案例,访问量超过1000的人数,这些都是很常见的统计需求。

案例需求

我们在使用聚合计算的时候一般都有两类,一种是计算文档的数量,另一种是计算文档内字段的值的数量(去重计算)或者值的数学计算。两种聚合计算在过滤的时候采用不同的方法来计算。

我们使用以下案例来说明两种过滤的不同: 用户每次登录都会记录一个登录记录:

{"userID":"a","IP":"10.70.25.1","time":"2019-10-10 12:12:12.222"}

然后提出以下两个需求: 1、每个IP登录次数超过5次的IP; 2、每个IP登录人数超过2的IP。

实现

每个IP登录次数超过5次的IP

这个是对登录记录个数的桶聚合统计,然后过滤。使用IP做term聚合,就可以得出每个IP的登录次数,然后term聚合中有一个参数min_doc_count这个字段就可以对文档数量进行过滤,具体的语句如下: 查询语句

{
  "aggs": {
    "IP": {
      "terms": {
        "field": "IP",
        "size": 3000,
        "order": {
          "_count": "desc"
        },
        "min_doc_count": 5
      }
    }
  },
  "size": 0
}

结果

{
  "took" : 614,
  "timed_out" : false,
  "num_reduce_phases" : 3,
  "_shards" : {
    "total" : 1105,
    "successful" : 1105,
    "skipped" : 75,
    "failed" : 0
  },
  "hits" : {
    "total" : 2826,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "IP" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "10.25.90.139",
          "doc_count" : 61
        },
        {
          "key" : "10.25.78.146",
          "doc_count" : 45
        },
        {
          "key" : "10.25.94.22",
          "doc_count" : 21
        },
        {
          "key" : "10.25.75.52",
          "doc_count" : 18
        },
        {
          "key" : "10.25.89.32",
          "doc_count" : 13
        },
        {
          "key" : "10.25.93.243",
          "doc_count" : 10
        },
        {
          "key" : "10.25.78.189",
          "doc_count" : 9
        },
        {
          "key" : "10.25.90.82",
          "doc_count" : 8
        },
        {
          "key" : "10.25.91.240",
          "doc_count" : 8
        },
        {
          "key" : "10.25.90.57",
          "doc_count" : 7
        },
        {
          "key" : "10.25.91.251",
          "doc_count" : 7
        },
        {
          "key" : "10.25.95.166",
          "doc_count" : 6
        },
        {
          "key" : "10.25.89.33",
          "doc_count" : 5
        },
        {
          "key" : "10.25.90.88",
          "doc_count" : 5
        },
        {
          "key" : "10.25.92.53",
          "doc_count" : 5
        }
      ]
    }
  }
}

每个IP登录人数超过2的IP

这个是对登录记录用户ID的去重数聚合,然后过滤。对用户ID进行去重可以使用Cardinality Aggregation聚合,然后再使用Bucket Selector Aggregation聚合过滤器过滤数据。具体内容如下: 查询语句

{
  "aggs": {
    "IP": {
      "terms": {
        "field": "IP",
        "size": 3000,
        "order": {
          "distinct": "desc"
        },
        "min_doc_count": 5
      },
      "aggs": {
        "distinct": {
          "cardinality": {
            "field": "IP.keyword"
          }
        },
        "dd":{
          "bucket_selector": {
            "buckets_path": {"userCount":"distinct"},
            "script": "params.userCount > 2"
          }
        }
      }
    }
  },
  "size": 0
}

结果

{
  "took" : 317,
  "timed_out" : false,
  "num_reduce_phases" : 3,
  "_shards" : {
    "total" : 1105,
    "successful" : 1105,
    "skipped" : 75,
    "failed" : 0
  },
  "hits" : {
    "total" : 2826,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "IP" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "10.25.75.52",
          "doc_count" : 18,
          "distinct" : {
            "value" : 4
          }
        },
        {
          "key" : "10.25.78.146",
          "doc_count" : 45,
          "distinct" : {
            "value" : 3
          }
        },
        {
          "key" : "10.25.90.139",
          "doc_count" : 61,
          "distinct" : {
            "value" : 3
          }
        },
        {
          "key" : "10.25.91.240",
          "doc_count" : 8,
          "distinct" : {
            "value" : 3
          }
        },
        {
          "key" : "10.25.94.22",
          "doc_count" : 21,
          "distinct" : {
            "value" : 3
          }
        }
      ]
    }
  }
}

桶聚合选择器: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-aggregations-pipeline-bucket-selector-aggregation.html

继续阅读 »

前言

我们在使用聚合时总是有各种各样的聚合需求,其中一个比较常用的就是根据聚合的结果过滤聚合的桶,例如:1、每个IP登录次数超过5次的IP;2、每个IP登录人数超过2的IP。 还有我之前的一个案例,访问量超过1000的人数,这些都是很常见的统计需求。

案例需求

我们在使用聚合计算的时候一般都有两类,一种是计算文档的数量,另一种是计算文档内字段的值的数量(去重计算)或者值的数学计算。两种聚合计算在过滤的时候采用不同的方法来计算。

我们使用以下案例来说明两种过滤的不同: 用户每次登录都会记录一个登录记录:

{"userID":"a","IP":"10.70.25.1","time":"2019-10-10 12:12:12.222"}

然后提出以下两个需求: 1、每个IP登录次数超过5次的IP; 2、每个IP登录人数超过2的IP。

实现

每个IP登录次数超过5次的IP

这个是对登录记录个数的桶聚合统计,然后过滤。使用IP做term聚合,就可以得出每个IP的登录次数,然后term聚合中有一个参数min_doc_count这个字段就可以对文档数量进行过滤,具体的语句如下: 查询语句

{
  "aggs": {
    "IP": {
      "terms": {
        "field": "IP",
        "size": 3000,
        "order": {
          "_count": "desc"
        },
        "min_doc_count": 5
      }
    }
  },
  "size": 0
}

结果

{
  "took" : 614,
  "timed_out" : false,
  "num_reduce_phases" : 3,
  "_shards" : {
    "total" : 1105,
    "successful" : 1105,
    "skipped" : 75,
    "failed" : 0
  },
  "hits" : {
    "total" : 2826,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "IP" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "10.25.90.139",
          "doc_count" : 61
        },
        {
          "key" : "10.25.78.146",
          "doc_count" : 45
        },
        {
          "key" : "10.25.94.22",
          "doc_count" : 21
        },
        {
          "key" : "10.25.75.52",
          "doc_count" : 18
        },
        {
          "key" : "10.25.89.32",
          "doc_count" : 13
        },
        {
          "key" : "10.25.93.243",
          "doc_count" : 10
        },
        {
          "key" : "10.25.78.189",
          "doc_count" : 9
        },
        {
          "key" : "10.25.90.82",
          "doc_count" : 8
        },
        {
          "key" : "10.25.91.240",
          "doc_count" : 8
        },
        {
          "key" : "10.25.90.57",
          "doc_count" : 7
        },
        {
          "key" : "10.25.91.251",
          "doc_count" : 7
        },
        {
          "key" : "10.25.95.166",
          "doc_count" : 6
        },
        {
          "key" : "10.25.89.33",
          "doc_count" : 5
        },
        {
          "key" : "10.25.90.88",
          "doc_count" : 5
        },
        {
          "key" : "10.25.92.53",
          "doc_count" : 5
        }
      ]
    }
  }
}

每个IP登录人数超过2的IP

这个是对登录记录用户ID的去重数聚合,然后过滤。对用户ID进行去重可以使用Cardinality Aggregation聚合,然后再使用Bucket Selector Aggregation聚合过滤器过滤数据。具体内容如下: 查询语句

{
  "aggs": {
    "IP": {
      "terms": {
        "field": "IP",
        "size": 3000,
        "order": {
          "distinct": "desc"
        },
        "min_doc_count": 5
      },
      "aggs": {
        "distinct": {
          "cardinality": {
            "field": "IP.keyword"
          }
        },
        "dd":{
          "bucket_selector": {
            "buckets_path": {"userCount":"distinct"},
            "script": "params.userCount > 2"
          }
        }
      }
    }
  },
  "size": 0
}

结果

{
  "took" : 317,
  "timed_out" : false,
  "num_reduce_phases" : 3,
  "_shards" : {
    "total" : 1105,
    "successful" : 1105,
    "skipped" : 75,
    "failed" : 0
  },
  "hits" : {
    "total" : 2826,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "IP" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "10.25.75.52",
          "doc_count" : 18,
          "distinct" : {
            "value" : 4
          }
        },
        {
          "key" : "10.25.78.146",
          "doc_count" : 45,
          "distinct" : {
            "value" : 3
          }
        },
        {
          "key" : "10.25.90.139",
          "doc_count" : 61,
          "distinct" : {
            "value" : 3
          }
        },
        {
          "key" : "10.25.91.240",
          "doc_count" : 8,
          "distinct" : {
            "value" : 3
          }
        },
        {
          "key" : "10.25.94.22",
          "doc_count" : 21,
          "distinct" : {
            "value" : 3
          }
        }
      ]
    }
  }
}

桶聚合选择器: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-aggregations-pipeline-bucket-selector-aggregation.html

收起阅读 »

聚合元数据——对聚合结果进行打标签

背景

在我们的项目中需要对聚合后的结果进行二次的terms的聚合。实际需求就是有n个模型,需要统计每个模型在每段时间的调用次数,然后需要查询指定m个模型的调用总次数。我们要为每个模型建立一个索引,然后为每个模型查一次这段时间内的使用次数。

注:我们记录的值只能从结果中拿取。

实现过程

第一次设计

每个模型在第一次设计的时候是两个字段,【调用次数】、【错误次数】,使用以下语句:

{
  "aggs": {
    "error": {
      "filters": {
        "filters": {
          "error": {
            "query_string": {
              "query": "state:-1",
              "analyze_wildcard": true,
              "default_field": "*"
            }
          }
        }
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "bool": {
            "should": [
              {
                "match_phrase": {
                  "li": "D003" //说明这是一次模型调用
                }
              }
            ],
            "minimum_should_match": 1
          }
        },
        {
          "match_phrase": {
            "modelId..keyword": {
              "query": "modelId01"
            }
          }
        },
        {
          "range": {
            "x_st": {
              "gte": "now/h-1h-10s",
              "lte": "now/h-10s",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

结果为:

{
  "took" : 215,
  "timed_out" : false,
  "_shards" : {
    "total" : 1095,
    "successful" : 1095,
    "skipped" : 1053,
    "failed" : 0
  },
  "hits" : {
    "total" : 0,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "error" : {
      "buckets" : {
        "error" : {
          "doc_count" : 0
        }
      }
    }
  }
}

解析后保存:

{"@timestamp":"2019-09-23T12:23:23.333","count":0,"error":0}

这种情况可以统计每个模型在某段时间内的调用次数,使用索引名来区分每个model。但是在统计指定m个模型的时候就不行了,使用索引名来查询的时候由于是指定m个,前缀不能使用* 匹配,并且不能罗列所有m个索引来查询,就无法达到统计的效果。 第一次设计因为不能在结果中记录模型ID导致不能统计指定的m个模型的数量,以失败告终。

第二次设计

既然需要在统计结果中记录模型ID,那就使用terms聚合来进行操作,先使用模型ID过滤一下数据,然后使用聚合唯一的模型ID,这样就有了模型ID。查询语句如下:

{
  "aggs": {
    "modelId": {
      "terms": {
        "field": "modelId.keyword",
        "size": 5,
        "order": {
          "_count": "desc"
        }
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "match_phrase": {
            "modelId.keyword": {
              "query": "modelId01"
            }
          }
        },
        {
          "range": {
            "x_st": {
              "gte": "now/h-1h",
              "lte": "now/h",
              "format": "epoch_millis"
            }
          }
        },
        {
          "match_phrase": {
            "modelId.keyword": {
              "query": "modelId01"
            }
          }
        }
      ]
    }
  }
}

结果为:

{
  "took" : 9,
  "timed_out" : false,
  "_shards" : {
    "total" : 140,
    "successful" : 140,
    "skipped" : 135,
    "failed" : 0
  },
  "hits" : {
    "total" : 0,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "modelId" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [ ]
    }
  }
}

在有模型调用的时候这个方法还好用,但是在无模型调用的时候这个返回结果就如上面的一样,是不包含任何信息的,错误次数的0和模型ID都没有了。 第二次设计因为在无模型调用的时候导致模型ID不能记录,然后也是不能实现指定m个模型的查询次数统计,也以失败告终。

第三次设计

经过两次失败的实际案例,我发现现有的知识已经不能满足需求了,我需要新的方法,我需要一个能给查询结果添加字段的方法,所以我去查询官方文档,让我找到了这个方法聚合元数据 也就是对聚合结果进行打标签。 我使用第一次的设计方案,然后添加上对聚合结果打标签的方法,就可以记录一次统计值的模型ID了。 查询语句如下:

{
  "aggs": {
    "error": {
      "filters": {
        "filters": {
          "error": {
            "query_string": {
              "query": "state:-1",
              "analyze_wildcard": true,
              "default_field": "*"
            }
          }
        }
      },
      "meta": {
        "modelId": "modelId01"
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "bool": {
            "should": [
              {
                "match_phrase": {
                  "li": "D003" //说明这是一次模型调用
                }
              }
            ],
            "minimum_should_match": 1
          }
        },
        {
          "match_phrase": {
            "modelId..keyword": {
              "query": "modelId01"
            }
          }
        },
        {
          "range": {
            "x_st": {
              "gte": "now/h-1h-10s",
              "lte": "now/h-10s",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

查询结果为:

{
  "took" : 88,
  "timed_out" : false,
  "_shards" : {
    "total" : 1095,
    "successful" : 1095,
    "skipped" : 1056,
    "failed" : 0
  },
  "hits" : {
    "total" : 0,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "error" : {
      "meta" : {
        "modelId" : "modelId01"
      },
      "buckets" : {
        "error" : {
          "doc_count" : 0
        }
      }
    }
  }
}

至此完成了统计需求。

总结

使用聚合元数据方法,可以对聚合的结果进行打标签,可以使用在聚合结果保存后再次进行terms聚合的时候使用,或者通过标签进行各种其他查询。

继续阅读 »

背景

在我们的项目中需要对聚合后的结果进行二次的terms的聚合。实际需求就是有n个模型,需要统计每个模型在每段时间的调用次数,然后需要查询指定m个模型的调用总次数。我们要为每个模型建立一个索引,然后为每个模型查一次这段时间内的使用次数。

注:我们记录的值只能从结果中拿取。

实现过程

第一次设计

每个模型在第一次设计的时候是两个字段,【调用次数】、【错误次数】,使用以下语句:

{
  "aggs": {
    "error": {
      "filters": {
        "filters": {
          "error": {
            "query_string": {
              "query": "state:-1",
              "analyze_wildcard": true,
              "default_field": "*"
            }
          }
        }
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "bool": {
            "should": [
              {
                "match_phrase": {
                  "li": "D003" //说明这是一次模型调用
                }
              }
            ],
            "minimum_should_match": 1
          }
        },
        {
          "match_phrase": {
            "modelId..keyword": {
              "query": "modelId01"
            }
          }
        },
        {
          "range": {
            "x_st": {
              "gte": "now/h-1h-10s",
              "lte": "now/h-10s",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

结果为:

{
  "took" : 215,
  "timed_out" : false,
  "_shards" : {
    "total" : 1095,
    "successful" : 1095,
    "skipped" : 1053,
    "failed" : 0
  },
  "hits" : {
    "total" : 0,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "error" : {
      "buckets" : {
        "error" : {
          "doc_count" : 0
        }
      }
    }
  }
}

解析后保存:

{"@timestamp":"2019-09-23T12:23:23.333","count":0,"error":0}

这种情况可以统计每个模型在某段时间内的调用次数,使用索引名来区分每个model。但是在统计指定m个模型的时候就不行了,使用索引名来查询的时候由于是指定m个,前缀不能使用* 匹配,并且不能罗列所有m个索引来查询,就无法达到统计的效果。 第一次设计因为不能在结果中记录模型ID导致不能统计指定的m个模型的数量,以失败告终。

第二次设计

既然需要在统计结果中记录模型ID,那就使用terms聚合来进行操作,先使用模型ID过滤一下数据,然后使用聚合唯一的模型ID,这样就有了模型ID。查询语句如下:

{
  "aggs": {
    "modelId": {
      "terms": {
        "field": "modelId.keyword",
        "size": 5,
        "order": {
          "_count": "desc"
        }
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "match_phrase": {
            "modelId.keyword": {
              "query": "modelId01"
            }
          }
        },
        {
          "range": {
            "x_st": {
              "gte": "now/h-1h",
              "lte": "now/h",
              "format": "epoch_millis"
            }
          }
        },
        {
          "match_phrase": {
            "modelId.keyword": {
              "query": "modelId01"
            }
          }
        }
      ]
    }
  }
}

结果为:

{
  "took" : 9,
  "timed_out" : false,
  "_shards" : {
    "total" : 140,
    "successful" : 140,
    "skipped" : 135,
    "failed" : 0
  },
  "hits" : {
    "total" : 0,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "modelId" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [ ]
    }
  }
}

在有模型调用的时候这个方法还好用,但是在无模型调用的时候这个返回结果就如上面的一样,是不包含任何信息的,错误次数的0和模型ID都没有了。 第二次设计因为在无模型调用的时候导致模型ID不能记录,然后也是不能实现指定m个模型的查询次数统计,也以失败告终。

第三次设计

经过两次失败的实际案例,我发现现有的知识已经不能满足需求了,我需要新的方法,我需要一个能给查询结果添加字段的方法,所以我去查询官方文档,让我找到了这个方法聚合元数据 也就是对聚合结果进行打标签。 我使用第一次的设计方案,然后添加上对聚合结果打标签的方法,就可以记录一次统计值的模型ID了。 查询语句如下:

{
  "aggs": {
    "error": {
      "filters": {
        "filters": {
          "error": {
            "query_string": {
              "query": "state:-1",
              "analyze_wildcard": true,
              "default_field": "*"
            }
          }
        }
      },
      "meta": {
        "modelId": "modelId01"
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "bool": {
            "should": [
              {
                "match_phrase": {
                  "li": "D003" //说明这是一次模型调用
                }
              }
            ],
            "minimum_should_match": 1
          }
        },
        {
          "match_phrase": {
            "modelId..keyword": {
              "query": "modelId01"
            }
          }
        },
        {
          "range": {
            "x_st": {
              "gte": "now/h-1h-10s",
              "lte": "now/h-10s",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

查询结果为:

{
  "took" : 88,
  "timed_out" : false,
  "_shards" : {
    "total" : 1095,
    "successful" : 1095,
    "skipped" : 1056,
    "failed" : 0
  },
  "hits" : {
    "total" : 0,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "error" : {
      "meta" : {
        "modelId" : "modelId01"
      },
      "buckets" : {
        "error" : {
          "doc_count" : 0
        }
      }
    }
  }
}

至此完成了统计需求。

总结

使用聚合元数据方法,可以对聚合的结果进行打标签,可以使用在聚合结果保存后再次进行terms聚合的时候使用,或者通过标签进行各种其他查询。

收起阅读 »

Lucene doc_value的理解复盘

之前看过很多doc_Value相关的文章,也看过elasticsearch官方的描述,但是总是感觉对doc_value如何提升聚合排序性能这块说的很简单(也许是我个人理解力的问题),总是不能深刻理解没有doc_Value的话会牺牲多大性能,再加上每次研究一半就搁置了,所以一直没能很好理解这块知识点,今天来重点复盘下。  本文不是科普文,不一定是完全正确的,大家可以讨论。
 
首先从elasticsearch入手,我们建立一个索引结构如下:
{
"info": {
"mappings": {
"info": {
"_all": {
"enabled": false
},
"properties": {
"age": {
"type": "int"
},
"time": {
"type": "long"
}
}
}
}
}
}
我这里特地拿非字符串类型的值做举例,大家在用MySQL的时候大多是用值精准匹配等条件,没有涉及到分词和多值,所以一开始很多用分词举例的时候我就糊涂了,这里我用数字举例说明下。
 
假设我们的需求是查询time=10的各个age包含的info有多少。这是一个搜索兼聚合的需求,首先要搜出来time:[1 , 10]的结果,然后在这些doc结果里统计,他们的age各自分别占了多少个doc。语句如下:
{
"query": {
"term": {
"time": 10
}
},
"aggs": {
"age_terms": {
"terms": {
"field": "age"
}
}
}
}

假设我们的数据是这样的:
docId        time           age
1               10               20
2              8                 21
3              9                 24
4              2                 23
5              10                22
6              10                22
 
 
结果也很明显了,返回doc为1,5,6的内容,聚合结果, age=20的doc_count=1,  age=22的doc_coutn=2。
 
demo介绍完了,说下假设没有doc_value,只有倒排索引我们要怎么实现这个效果。
 
首先搜索不用说了,走time字段的倒排索引,下表:
2:4
8:2
9:3
10:1,5,6
一目了然,一下就能把post_list结果拿到。
 
那么聚合是在1,5,6这个结果之上做的,因为聚合字段和排序字段不一样,time倒排索引中不包含age的信息,所以要想对age做聚合,就需要做类似回表的操作想办法取回每个doc对应的age值,然后在内存中遍历下算出来每个age对应的doc有几个(当然这个例子最多只有3个doc_count, 返回值只有3个doc)。
 
取回的方式,可以走age的倒排索引,遍历age索引的term_dictionary(不了解的同学可以理解为这就是一个排序的term列表),遍历要做的事情是什么呢:取出每个term后面的postling_list,看下list中有没有对应我搜索结果的doc_id,有的话记录下有几个,给这个term做下标记记录term的doc_count。 这个过程就比较狗了,假设term非常多,遍历一次的代价就非常高,所以在没有doc_value的情况下,遍历倒排索引就显得很差劲,倒排索引的价值本来就是避免数据遍历提升搜索性能,到了这种聚合需求上反而要用我做这么高代价的事儿,所以才需要一个出路避免这个问题。
 
这时候有人会说了,一个info对应一个age,那么当我遍历的时候,记录下这个doc有没有遍历过,当所有doc_id都遍历过一遍后就不遍历了,能提升一些性能吧。  首先如果运气不好,碰上你的doc要聚合的term出现在term_dictionary最后一个的时候,时间开销还是最坏情况的,其次,我这里用的是单个数值,如果一个field是多值的呢,换句话说,如果是分词的字符串呢,多值和分词其实一样,这种情况下当你遍历到一个term包含结果的某个doc_id时你不能说这个doc不会再后续的其他term中出现了,你还是要继续遍历完,所以为了满足这个需求,遍历全部term_dictionary是没跑的。
 
排序其实也差不多的,如果你要根据非搜索结果进行排序,一样要取出结果doc,对应排序字段的值,然后内存排序,所以内存排序聚合我觉得不是开销大的点,开销大的地方在于遍历倒排索引
 
其实讲到这里我的疑惑基本就解决了。接下来再说下doc_value
 
doc_value通俗的讲就是正排索引(这个词儿基本上所有文章都会这么说,说的也对,就是看多了有点想吐。。),其实正排索引就是我上面列举出来的那个值列表,只不过是一个字段一个doc_value,跟倒排索引一样,不是放在一起的(这点区别mysql聚集索引的叶子节点,聚集索引叶子节点是保存了全部的值,用主键绑定的,这里也说明了Lucene是索引和数据分离的,而MySQL是索引即数据,我是这么理解)。
time的doc_value

docId        time           
1               10              
2              8                
3              9                 
4              2                 
5              10               
6              10                
 
age的doc_value

docId        age
1               20
2               21
3               24
4               23
5               22
6               22
 
现在我们有了这样的一个结构可就厉害了,回到刚才的场景,搜索完结果是1,5,6,如果想按age进行聚合或者排序,那么只需要我们来到doc_value中,根据我们的id列表和doc_value的列表做交集拿出来结果对应的age就可以了,然后内存排序一下,实际上doc_value完全可以按照value先排好序,然后我们交集的结果就是排序结果了,都不用内存再排一次(暂时还不知道doc_value是不是根据value排过序)。
 
 
最后再说下我觉得其实可以优化的地方,之前我在问答也提问过,就是当搜索和排序/聚合使用相同字段的时候,还要不要走doc_value的问题。
 
因为如果是同一个字段那么完全没必要回表了,倒排索引本来就是field有序的所以排序问题解决。
 
其次聚合,刚才的例子中是单值的,如果是多值的话也不要紧,因为我们都知道查询条件的值了,单值的话 doc_count就等于total,多值的话 doc_count就等于其posting_list的length,多简单,省的走一次doc_value的操作。
 
不过那个问答回答的人太少,回答我的人的答复也蛮精彩的但是我仍然觉得这个操作是可以分开的。
 
求各路大神们对本文的观点做出指点和评论,或者我理解不对的地方提出指正。 对于想了解doc_value的同学希望这篇文章能解决你们的疑惑。
 
 
 
 
 
 
 
 
 
 
 
 
继续阅读 »
之前看过很多doc_Value相关的文章,也看过elasticsearch官方的描述,但是总是感觉对doc_value如何提升聚合排序性能这块说的很简单(也许是我个人理解力的问题),总是不能深刻理解没有doc_Value的话会牺牲多大性能,再加上每次研究一半就搁置了,所以一直没能很好理解这块知识点,今天来重点复盘下。  本文不是科普文,不一定是完全正确的,大家可以讨论。
 
首先从elasticsearch入手,我们建立一个索引结构如下:
{
"info": {
"mappings": {
"info": {
"_all": {
"enabled": false
},
"properties": {
"age": {
"type": "int"
},
"time": {
"type": "long"
}
}
}
}
}
}
我这里特地拿非字符串类型的值做举例,大家在用MySQL的时候大多是用值精准匹配等条件,没有涉及到分词和多值,所以一开始很多用分词举例的时候我就糊涂了,这里我用数字举例说明下。
 
假设我们的需求是查询time=10的各个age包含的info有多少。这是一个搜索兼聚合的需求,首先要搜出来time:[1 , 10]的结果,然后在这些doc结果里统计,他们的age各自分别占了多少个doc。语句如下:
{
"query": {
"term": {
"time": 10
}
},
"aggs": {
"age_terms": {
"terms": {
"field": "age"
}
}
}
}

假设我们的数据是这样的:
docId        time           age
1               10               20
2              8                 21
3              9                 24
4              2                 23
5              10                22
6              10                22
 
 
结果也很明显了,返回doc为1,5,6的内容,聚合结果, age=20的doc_count=1,  age=22的doc_coutn=2。
 
demo介绍完了,说下假设没有doc_value,只有倒排索引我们要怎么实现这个效果。
 
首先搜索不用说了,走time字段的倒排索引,下表:
2:4
8:2
9:3
10:1,5,6
一目了然,一下就能把post_list结果拿到。
 
那么聚合是在1,5,6这个结果之上做的,因为聚合字段和排序字段不一样,time倒排索引中不包含age的信息,所以要想对age做聚合,就需要做类似回表的操作想办法取回每个doc对应的age值,然后在内存中遍历下算出来每个age对应的doc有几个(当然这个例子最多只有3个doc_count, 返回值只有3个doc)。
 
取回的方式,可以走age的倒排索引,遍历age索引的term_dictionary(不了解的同学可以理解为这就是一个排序的term列表),遍历要做的事情是什么呢:取出每个term后面的postling_list,看下list中有没有对应我搜索结果的doc_id,有的话记录下有几个,给这个term做下标记记录term的doc_count。 这个过程就比较狗了,假设term非常多,遍历一次的代价就非常高,所以在没有doc_value的情况下,遍历倒排索引就显得很差劲,倒排索引的价值本来就是避免数据遍历提升搜索性能,到了这种聚合需求上反而要用我做这么高代价的事儿,所以才需要一个出路避免这个问题。
 
这时候有人会说了,一个info对应一个age,那么当我遍历的时候,记录下这个doc有没有遍历过,当所有doc_id都遍历过一遍后就不遍历了,能提升一些性能吧。  首先如果运气不好,碰上你的doc要聚合的term出现在term_dictionary最后一个的时候,时间开销还是最坏情况的,其次,我这里用的是单个数值,如果一个field是多值的呢,换句话说,如果是分词的字符串呢,多值和分词其实一样,这种情况下当你遍历到一个term包含结果的某个doc_id时你不能说这个doc不会再后续的其他term中出现了,你还是要继续遍历完,所以为了满足这个需求,遍历全部term_dictionary是没跑的。
 
排序其实也差不多的,如果你要根据非搜索结果进行排序,一样要取出结果doc,对应排序字段的值,然后内存排序,所以内存排序聚合我觉得不是开销大的点,开销大的地方在于遍历倒排索引
 
其实讲到这里我的疑惑基本就解决了。接下来再说下doc_value
 
doc_value通俗的讲就是正排索引(这个词儿基本上所有文章都会这么说,说的也对,就是看多了有点想吐。。),其实正排索引就是我上面列举出来的那个值列表,只不过是一个字段一个doc_value,跟倒排索引一样,不是放在一起的(这点区别mysql聚集索引的叶子节点,聚集索引叶子节点是保存了全部的值,用主键绑定的,这里也说明了Lucene是索引和数据分离的,而MySQL是索引即数据,我是这么理解)。
time的doc_value

docId        time           
1               10              
2              8                
3              9                 
4              2                 
5              10               
6              10                
 
age的doc_value

docId        age
1               20
2               21
3               24
4               23
5               22
6               22
 
现在我们有了这样的一个结构可就厉害了,回到刚才的场景,搜索完结果是1,5,6,如果想按age进行聚合或者排序,那么只需要我们来到doc_value中,根据我们的id列表和doc_value的列表做交集拿出来结果对应的age就可以了,然后内存排序一下,实际上doc_value完全可以按照value先排好序,然后我们交集的结果就是排序结果了,都不用内存再排一次(暂时还不知道doc_value是不是根据value排过序)。
 
 
最后再说下我觉得其实可以优化的地方,之前我在问答也提问过,就是当搜索和排序/聚合使用相同字段的时候,还要不要走doc_value的问题。
 
因为如果是同一个字段那么完全没必要回表了,倒排索引本来就是field有序的所以排序问题解决。
 
其次聚合,刚才的例子中是单值的,如果是多值的话也不要紧,因为我们都知道查询条件的值了,单值的话 doc_count就等于total,多值的话 doc_count就等于其posting_list的length,多简单,省的走一次doc_value的操作。
 
不过那个问答回答的人太少,回答我的人的答复也蛮精彩的但是我仍然觉得这个操作是可以分开的。
 
求各路大神们对本文的观点做出指点和评论,或者我理解不对的地方提出指正。 对于想了解doc_value的同学希望这篇文章能解决你们的疑惑。
 
 
 
 
 
 
 
 
 
 
 
  收起阅读 »

Elastic日报 第735期 (2019-09-21)

1.es字符串包含查询的几种方案

http://t.cn/AinyGFZD

2.es6.2版本利用脚本查询为空数据的方法

http://t.cn/AinyqBJN

3.es中 “先filter后aggs” 和 “在aggs执行filter” 的区别

http://t.cn/AinyfBkq

继续阅读 »

1.es字符串包含查询的几种方案

http://t.cn/AinyGFZD

2.es6.2版本利用脚本查询为空数据的方法

http://t.cn/AinyqBJN

3.es中 “先filter后aggs” 和 “在aggs执行filter” 的区别

http://t.cn/AinyfBkq

收起阅读 »

关于sum bucket 与Filter Aggregation 结合的写法分享

我们可以用ES的桶来实现这样的需求,找出汽车厂商每个颜色的车辆数量
当如果需求是,找出汽车厂商里的车一共有多少种颜色呢?
 我们可以使用sum bucket ,默认情况下,可以参考官方文档
https://www.elastic.co/guide/e ... ml%23
 
但如果我们想找到售价大于100W的车,有多少种颜色呢?
这就需要Filter Aggregation   和 上文的sum bucket结合
 
这里有个需要注意的地方,sum_bucket的位置不要跟 Filter Aggregation 同级,而是应该在其下一级:
    "query": {
"query": {
"bool": {
"filter": {
"bool": {
"must": [
{
"term": {
"delflag": 0
}
},
{
"term": {
"is_child": 1
}
}
],
"must_not":
}
}
}
},
"aggs": {
"valid_sales_people": {
"filter": {
"bool": {
"must_not": {
"terms": {
"status": [
0,
1,
7
]
}
}
}
},
"aggs": {
"render_people": {
"terms": {
"field": "uid",
"size": 2147483647
},
"aggs": {
"unique_people": {
"cardinality": {
"field": "uid"
}
}
}
},
"member_count": {
"sum_bucket": {
"buckets_path": "render_people>unique_people"
}
}
}
}
}
},
如代码所示,我希望对status not in (0,1,7)的uid(用户)进行聚合,并想得到uid桶种类的求和,那么member_count就应该与render_people同级,而不是把member_count放到和与valid_sales_people同一级。
 
如果将member_count 与valid_sales_people放到同一级,会报一个错:sum_bucket的第一个聚合必须是多桶聚合。究其原因,应该是加上filter aggregation后,valid_sales_people已不具有多桶聚合属性(因为附带了filter过滤条件),而其下的render_people则具有了多桶聚合属性。所以member_count应该与render_people放在同级,对应的buckets_path也自然改为render_people>unique_people
 

 
继续阅读 »
我们可以用ES的桶来实现这样的需求,找出汽车厂商每个颜色的车辆数量
当如果需求是,找出汽车厂商里的车一共有多少种颜色呢?
 我们可以使用sum bucket ,默认情况下,可以参考官方文档
https://www.elastic.co/guide/e ... ml%23
 
但如果我们想找到售价大于100W的车,有多少种颜色呢?
这就需要Filter Aggregation   和 上文的sum bucket结合
 
这里有个需要注意的地方,sum_bucket的位置不要跟 Filter Aggregation 同级,而是应该在其下一级:
    "query": {
"query": {
"bool": {
"filter": {
"bool": {
"must": [
{
"term": {
"delflag": 0
}
},
{
"term": {
"is_child": 1
}
}
],
"must_not":
}
}
}
},
"aggs": {
"valid_sales_people": {
"filter": {
"bool": {
"must_not": {
"terms": {
"status": [
0,
1,
7
]
}
}
}
},
"aggs": {
"render_people": {
"terms": {
"field": "uid",
"size": 2147483647
},
"aggs": {
"unique_people": {
"cardinality": {
"field": "uid"
}
}
}
},
"member_count": {
"sum_bucket": {
"buckets_path": "render_people>unique_people"
}
}
}
}
}
},
如代码所示,我希望对status not in (0,1,7)的uid(用户)进行聚合,并想得到uid桶种类的求和,那么member_count就应该与render_people同级,而不是把member_count放到和与valid_sales_people同一级。
 
如果将member_count 与valid_sales_people放到同一级,会报一个错:sum_bucket的第一个聚合必须是多桶聚合。究其原因,应该是加上filter aggregation后,valid_sales_people已不具有多桶聚合属性(因为附带了filter过滤条件),而其下的render_people则具有了多桶聚合属性。所以member_count应该与render_people放在同级,对应的buckets_path也自然改为render_people>unique_people
 

  收起阅读 »

记一次“访问量超过1000的人数”统计,计算聚合桶的个数

前言

众所周知,在ES中有各种聚合方法能够是数据分析简单、高效。但是在繁杂的聚合方法中找到满足我们需求的那个,需要我们自己去实践。下面我就说明一下“访问量超过1000的人数”统计案例的实现。

需求

ES在使用过程中,我们公司有一个需求,就是需要统计活跃用户数,我们定义活跃用户数为:今日访问量超过1000的用户,所以我们统计活跃用户数的时候需要统计“访问量超过1000的人数”。

之前的做法

第一版统计活跃用户数的方法由于对复杂的聚合统计不熟悉的原因,就把统计分为了两步。 第一步:在ES中使用字段聚合每个用户的访问数量,数量大于1000;

查询语句

{
  "aggs": {
    "user": {
      "terms": {
        "field": "userId.keyword",
        "size": 10000,
        "order": {
          "_count": "desc"
        },
        "min_doc_count": "1000"
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "startTime": {
              "gte": "now-4h",
              "lte": "now",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

查询结果

{
  "took" : 203,
  "timed_out" : false,
  "_shards" : {
    "total" : 1565,
    "successful" : 1565,
    "skipped" : 1520,
    "failed" : 0
  },
  "hits" : {
    "total" : 67470,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "user" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "admin",
          "doc_count" : 46998
        },
        {
          "key" : "nameless",
          "doc_count" : 8416
        },
        {
          "key" : "li",
          "doc_count" : 2486
        },
        {
          "key" : "liu",
          "doc_count" : 2183
        },
        {
          "key" : "111111",
          "doc_count" : 1281
        }
      ]
    }
  }
}

第二步:从ES中获取第一步的统计结果,然后统计用户桶的个数,达到统计出个数的效果。

改进后的做法

改进后就是直接使用ES的查询,使用了sum_bucket聚合,是计算每个用户的用户ID独立数,也就是每个用户的用户ID独立数都是1,然后用桶聚合求和,得到所有的人数。 参考链接:[sum bucket聚合](https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-aggregations-pipeline-sum-bucket-aggregation.html)

查询语句

{
  "aggs": {
    "usercount": {
      "sum_bucket": {
        "buckets_path": "usercount-bucket>usercount-metric"
      }
    },
    "usercount-bucket": {
      "terms": {
        "field": "userId.keyword",
        "size": 10,
        "order": {
          "_key": "desc"
        },
        "min_doc_count": "1000"
      },
      "aggs": {
        "usercount-metric": {
          "cardinality": {
            "field": "userId.keyword"
          }
        }
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "x_st": {
              "gte": "now-4h",
              "lte": "now",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

查询结果

{
  "took" : 106,
  "timed_out" : false,
  "_shards" : {
    "total" : 1565,
    "successful" : 1565,
    "skipped" : 1520,
    "failed" : 0
  },
  "hits" : {
    "total" : 63956,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "usercount-bucket" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "nameless",
          "doc_count" : 8278,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "liu",
          "doc_count" : 2142,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "li",
          "doc_count" : 1928,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "admin",
          "doc_count" : 44395,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "111111",
          "doc_count" : 1281,
          "usercount-metric" : {
            "value" : 1
          }
        }
      ]
    },
    "usercount" : {
      "value" : 5.0
    }
  }
}
继续阅读 »

前言

众所周知,在ES中有各种聚合方法能够是数据分析简单、高效。但是在繁杂的聚合方法中找到满足我们需求的那个,需要我们自己去实践。下面我就说明一下“访问量超过1000的人数”统计案例的实现。

需求

ES在使用过程中,我们公司有一个需求,就是需要统计活跃用户数,我们定义活跃用户数为:今日访问量超过1000的用户,所以我们统计活跃用户数的时候需要统计“访问量超过1000的人数”。

之前的做法

第一版统计活跃用户数的方法由于对复杂的聚合统计不熟悉的原因,就把统计分为了两步。 第一步:在ES中使用字段聚合每个用户的访问数量,数量大于1000;

查询语句

{
  "aggs": {
    "user": {
      "terms": {
        "field": "userId.keyword",
        "size": 10000,
        "order": {
          "_count": "desc"
        },
        "min_doc_count": "1000"
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "startTime": {
              "gte": "now-4h",
              "lte": "now",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

查询结果

{
  "took" : 203,
  "timed_out" : false,
  "_shards" : {
    "total" : 1565,
    "successful" : 1565,
    "skipped" : 1520,
    "failed" : 0
  },
  "hits" : {
    "total" : 67470,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "user" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "admin",
          "doc_count" : 46998
        },
        {
          "key" : "nameless",
          "doc_count" : 8416
        },
        {
          "key" : "li",
          "doc_count" : 2486
        },
        {
          "key" : "liu",
          "doc_count" : 2183
        },
        {
          "key" : "111111",
          "doc_count" : 1281
        }
      ]
    }
  }
}

第二步:从ES中获取第一步的统计结果,然后统计用户桶的个数,达到统计出个数的效果。

改进后的做法

改进后就是直接使用ES的查询,使用了sum_bucket聚合,是计算每个用户的用户ID独立数,也就是每个用户的用户ID独立数都是1,然后用桶聚合求和,得到所有的人数。 参考链接:[sum bucket聚合](https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-aggregations-pipeline-sum-bucket-aggregation.html)

查询语句

{
  "aggs": {
    "usercount": {
      "sum_bucket": {
        "buckets_path": "usercount-bucket>usercount-metric"
      }
    },
    "usercount-bucket": {
      "terms": {
        "field": "userId.keyword",
        "size": 10,
        "order": {
          "_key": "desc"
        },
        "min_doc_count": "1000"
      },
      "aggs": {
        "usercount-metric": {
          "cardinality": {
            "field": "userId.keyword"
          }
        }
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "x_st": {
              "gte": "now-4h",
              "lte": "now",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

查询结果

{
  "took" : 106,
  "timed_out" : false,
  "_shards" : {
    "total" : 1565,
    "successful" : 1565,
    "skipped" : 1520,
    "failed" : 0
  },
  "hits" : {
    "total" : 63956,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "usercount-bucket" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "nameless",
          "doc_count" : 8278,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "liu",
          "doc_count" : 2142,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "li",
          "doc_count" : 1928,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "admin",
          "doc_count" : 44395,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "111111",
          "doc_count" : 1281,
          "usercount-metric" : {
            "value" : 1
          }
        }
      ]
    },
    "usercount" : {
      "value" : 5.0
    }
  }
}
收起阅读 »

ES6.8权限使用配置


概述
ES的权限控制一直ES使用中的一个问题,因为官方之前一直未免费安全性功能。公司要不选择使用其他插件来解决,要不就是只在内网使用。现在在ES6.8及以后版本ES将部分安全性功能免费开放了, 现在我们就6.8版本的【基于角色的访问控制】进行操作、验证。
1、下载安装ELK6.8版本(此处省略)
ES在6.8以后发布的版本才有(7.0是发布在6.8之前的)
2、修改ES配置文件 elasticsearch.yml
在配置文件中添加:
xpack.security.enabled: true
基础版本的安全性功能是默认关闭的。
然后启动ES
./elasticsearch -d
3、设置内置用户密码
参考:内置用户
./bin/elasticsearch-setup-passwords interactive
这里设置的密码要记住,后面会使用到。我们设置简单的密码:123456(密码不能少于6位)
如果是Windows请使用CMD命令行执行
按照提示设置内置用户密码
4、设置kibana用户名密码
在kibana的配置文件kibana.yml里面添加
elasticsearch.username: "kibana"
elasticsearch.password: "123456"
5、然后启动kibana
启动kibana就可以使用用户名与密码进行访问。

Image.png


6、设置logstash用户名和密码
打开配置文件conf,在output中的elasticsearch中添加user、password
例:
output {
elasticsearch {
hosts => ["10.68.24.136:9200","10.68.24.137:9200"]
index => "%{[indexName]}-%{+YYYY.MM.dd}"
user => "logstash_system"
password => "123456"
}
继续阅读 »

概述
ES的权限控制一直ES使用中的一个问题,因为官方之前一直未免费安全性功能。公司要不选择使用其他插件来解决,要不就是只在内网使用。现在在ES6.8及以后版本ES将部分安全性功能免费开放了, 现在我们就6.8版本的【基于角色的访问控制】进行操作、验证。
1、下载安装ELK6.8版本(此处省略)
ES在6.8以后发布的版本才有(7.0是发布在6.8之前的)
2、修改ES配置文件 elasticsearch.yml
在配置文件中添加:
xpack.security.enabled: true
基础版本的安全性功能是默认关闭的。
然后启动ES
./elasticsearch -d
3、设置内置用户密码
参考:内置用户
./bin/elasticsearch-setup-passwords interactive
这里设置的密码要记住,后面会使用到。我们设置简单的密码:123456(密码不能少于6位)
如果是Windows请使用CMD命令行执行
按照提示设置内置用户密码
4、设置kibana用户名密码
在kibana的配置文件kibana.yml里面添加
elasticsearch.username: "kibana"
elasticsearch.password: "123456"
5、然后启动kibana
启动kibana就可以使用用户名与密码进行访问。

Image.png


6、设置logstash用户名和密码
打开配置文件conf,在output中的elasticsearch中添加user、password
例:
output {
elasticsearch {
hosts => ["10.68.24.136:9200","10.68.24.137:9200"]
index => "%{[indexName]}-%{+YYYY.MM.dd}"
user => "logstash_system"
password => "123456"
}
收起阅读 »

从MySQL建立Elasticsearch索引-索引

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-索引

接上文从MySQL建立Elasticsearch索引-引言,本文介绍一下我们实现索引创建时的一些思路。

目标

框架以Jar包的形式提供,通过配置(XML)提供规则,支持插件开发,支持全量、增量等。

因为希望尽量减少开发量,所以不要求使用方提供平表,因此需要考虑如何将平表

概念

  • 全量:即使用全部数据创建一个新的索引
  • 增量:按照时间范围或者给定的规则,把变化的数据同步到ES
  • 插件:本框架内定义了一个插件接口,用于特定需求自行开发,并接入到现有的配置文件中
  • 分批参数:SQL中由框架填充的参数,类似${id}等

配置

以用户(users)为例,配置有三种文件:

  1. users.mapping,保存了索引的配置,每次全量的时候会读取本文件进行索引创建
  2. rule,规则文件,里面主要定义了对应SQL和插件的配置
    1. users-all.rule,全量规则,规则中以主表为主体,支持以Top、Limit的形式对数据进行分批获取,分批参数在SQL里可以定义为${id},框架会自动填充该值,以保证能够拉取到全量的数据
    2. users-inc.rule,按照时间范围进行增量,分批参数支持在SQL中使用${startTime}和${endTime}
    3. users-spec.rule,按照指定的主表的主键,获取数据及更新索引的操作
    4. users-partial.rule,框架接入了阿里的Canal,本规则定义了各种表变化时,如果取到主表Id,以使用spec进行数据更新
  3. users.plugin,以上的rule主要提供了主表数据的获取方法,plugin文件则提供了各种关联表的信息获取方式,可以使用SQL,或者自定义的插件

全量索引实现关键点

为了保证索引的性能、监控、正确性等,实现时进行了以下设计。

(一)索引的维护

ES使用分布式、Replica、Snapshot机制保证索引的有效及集群稳定性。我们综合考虑后决定放弃Snapshot机制,通过定时/不定时创建全新全量索引,索引名字以${indexName}-{yyyy-MM-dd}的格式定义。正进行全量的索引关联上${indexName}_F的别名,正在使用的索引关联上${indexName}的别名,这样代码里可以不用关心应该读取或者使用哪个索引,合适的场景使用合适的别名即可。

(二)索引的性能

此处专指创建索引的性能,ES的性能是老话题了。

首先,为了保证全量的性能,创建索引时会调整mapping参数,类似refresh_interval改成-1,number_of_replicas改成0,添加默认slowlog设置。

索引过程中,控制bulk请求体的总大小,保证合适的分批大小的数据一并提交到ES。如果失败简单重试三次,如果三次都失败则认为失败,退出并删除当前索引,以保证线上ES数据干净准确。

(三)索引的变更

索引完成后,检查当前索引文档数和正使用索引文档数差异,如果大于配置的阈值,则认为此次索引创建有问题,删除索引并退出;做强制合并,减少segments数,提高搜索性能;恢复refresh_interval、number_of_replicas等设置;等待新索引状态变成GREEN后,将${indexName}切换到新的索引上,以使新索引生效。

最后,检查以${indexName}-{yyyy-MM-dd}格式命名的索引有几份,将多于指定个数的较老的索引删除,将多于指定个数的索引状态改为Close。这样可以尽可能提供磁盘内存利用率,减少不必要的损耗,又能在一定程度上保证数据可用。

以上,即完成了全量索引的创建。

增量索引实现关键点

增量索引因为场景比较多,所以规则也分了几种:

  • inc,使用${startTime}和${endTime}在SQL中代替时间范围,框架会自动保存上次执行时间,以使整个增量整体不断滚动更新。实际使用时,因为有多个关联表导致SQL写起来比较复杂,以及部分增量数据过大,对DB有一定压力,因此不推荐使用此种方式。数据相对简单的场景可以使用。
  • spec,在知道主表数据变化范围的时候使用,也是目前我们推荐的一种方式。使用主键查找数据,所有的表都是主键查询,影响范围也很明确。
  • partial,针对canal做了单独封装的规则,用户获取canal的变化类型(更新、删除),变化的主表Id,用于使用spec的方式进行数据更新。

暂时没有使用部分更新的原因是,关联表与主表的映射关系比较复杂,有1:1,1:N,M:N;并且目前按照主键更新数据的方式,对DB等压力不大;加上spec的方式逻辑清晰,有利于维护和开发。

扩展性

SQL不是万能的,比方说我们部分数据需要从其他微服务取,或者SQL逻辑复杂建议代码实现,这时候就可以使用我们的插件机制了。默认框架提供了一个很强大的插件,支持Distinct、Merge、Mapping等功能,可以满足80%的关联数据的场景了。其他的场景以及需要微服务的场景,支持用户自己实现指定插件,通过配置文件,即可接入现有的框架体系中。以此能支持目前我们全部需求。

下一篇,我们将分享我们搜索模块的实现思路。

继续阅读 »

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-索引

接上文从MySQL建立Elasticsearch索引-引言,本文介绍一下我们实现索引创建时的一些思路。

目标

框架以Jar包的形式提供,通过配置(XML)提供规则,支持插件开发,支持全量、增量等。

因为希望尽量减少开发量,所以不要求使用方提供平表,因此需要考虑如何将平表

概念

  • 全量:即使用全部数据创建一个新的索引
  • 增量:按照时间范围或者给定的规则,把变化的数据同步到ES
  • 插件:本框架内定义了一个插件接口,用于特定需求自行开发,并接入到现有的配置文件中
  • 分批参数:SQL中由框架填充的参数,类似${id}等

配置

以用户(users)为例,配置有三种文件:

  1. users.mapping,保存了索引的配置,每次全量的时候会读取本文件进行索引创建
  2. rule,规则文件,里面主要定义了对应SQL和插件的配置
    1. users-all.rule,全量规则,规则中以主表为主体,支持以Top、Limit的形式对数据进行分批获取,分批参数在SQL里可以定义为${id},框架会自动填充该值,以保证能够拉取到全量的数据
    2. users-inc.rule,按照时间范围进行增量,分批参数支持在SQL中使用${startTime}和${endTime}
    3. users-spec.rule,按照指定的主表的主键,获取数据及更新索引的操作
    4. users-partial.rule,框架接入了阿里的Canal,本规则定义了各种表变化时,如果取到主表Id,以使用spec进行数据更新
  3. users.plugin,以上的rule主要提供了主表数据的获取方法,plugin文件则提供了各种关联表的信息获取方式,可以使用SQL,或者自定义的插件

全量索引实现关键点

为了保证索引的性能、监控、正确性等,实现时进行了以下设计。

(一)索引的维护

ES使用分布式、Replica、Snapshot机制保证索引的有效及集群稳定性。我们综合考虑后决定放弃Snapshot机制,通过定时/不定时创建全新全量索引,索引名字以${indexName}-{yyyy-MM-dd}的格式定义。正进行全量的索引关联上${indexName}_F的别名,正在使用的索引关联上${indexName}的别名,这样代码里可以不用关心应该读取或者使用哪个索引,合适的场景使用合适的别名即可。

(二)索引的性能

此处专指创建索引的性能,ES的性能是老话题了。

首先,为了保证全量的性能,创建索引时会调整mapping参数,类似refresh_interval改成-1,number_of_replicas改成0,添加默认slowlog设置。

索引过程中,控制bulk请求体的总大小,保证合适的分批大小的数据一并提交到ES。如果失败简单重试三次,如果三次都失败则认为失败,退出并删除当前索引,以保证线上ES数据干净准确。

(三)索引的变更

索引完成后,检查当前索引文档数和正使用索引文档数差异,如果大于配置的阈值,则认为此次索引创建有问题,删除索引并退出;做强制合并,减少segments数,提高搜索性能;恢复refresh_interval、number_of_replicas等设置;等待新索引状态变成GREEN后,将${indexName}切换到新的索引上,以使新索引生效。

最后,检查以${indexName}-{yyyy-MM-dd}格式命名的索引有几份,将多于指定个数的较老的索引删除,将多于指定个数的索引状态改为Close。这样可以尽可能提供磁盘内存利用率,减少不必要的损耗,又能在一定程度上保证数据可用。

以上,即完成了全量索引的创建。

增量索引实现关键点

增量索引因为场景比较多,所以规则也分了几种:

  • inc,使用${startTime}和${endTime}在SQL中代替时间范围,框架会自动保存上次执行时间,以使整个增量整体不断滚动更新。实际使用时,因为有多个关联表导致SQL写起来比较复杂,以及部分增量数据过大,对DB有一定压力,因此不推荐使用此种方式。数据相对简单的场景可以使用。
  • spec,在知道主表数据变化范围的时候使用,也是目前我们推荐的一种方式。使用主键查找数据,所有的表都是主键查询,影响范围也很明确。
  • partial,针对canal做了单独封装的规则,用户获取canal的变化类型(更新、删除),变化的主表Id,用于使用spec的方式进行数据更新。

暂时没有使用部分更新的原因是,关联表与主表的映射关系比较复杂,有1:1,1:N,M:N;并且目前按照主键更新数据的方式,对DB等压力不大;加上spec的方式逻辑清晰,有利于维护和开发。

扩展性

SQL不是万能的,比方说我们部分数据需要从其他微服务取,或者SQL逻辑复杂建议代码实现,这时候就可以使用我们的插件机制了。默认框架提供了一个很强大的插件,支持Distinct、Merge、Mapping等功能,可以满足80%的关联数据的场景了。其他的场景以及需要微服务的场景,支持用户自己实现指定插件,通过配置文件,即可接入现有的框架体系中。以此能支持目前我们全部需求。

下一篇,我们将分享我们搜索模块的实现思路。

收起阅读 »

从MySQL建立Elasticsearch索引-引言

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-引言

日常工作里,因业务需要大量使用了Elasticsearch。为了简化索引的开发工作,我们需要一个易用可扩展的MySQL到ES的同步框架,在比较了可以找到的各种开源框架&工具后,我们还是选择自行研发了一个,名字简单粗暴:es-common。

背景

16年我接手了并负责了部门所有业务的搜索系统,旧搜索系统是基于Lucene自研实现的一个搜索框架,包含了平表创建、全量索引、增量索引、搜索引擎四个部分基础功能的封装;另有一个管理系统,用于配置索引、字典等信息。框架的实现思路是通过建立平表,简化索引创建的过程。

接手该系统以后,各业务出现井喷式发展,各种需求铺天盖地,同时该系统的不稳定性也开始作妖,最早三个人的小团队每天都疲于奔命。解决现有问题的同时,也在查找更好的解决方案。于是Elasticsearch进入视野。 基于JSON的交互、分布式、数据与逻辑隔离、开箱即用、稳定等特性,使我们确定了向ES转型的计划。每个点都是现有系统没有解决的问题,具体的点就不吐槽了。

选择

我们的需求有:

  1. 支持全量、增量建立索引,以使数据变更较快体现
  2. 支持更新指定文档,以处理突发问题
  3. 索引有版本控制,以防止出现问题无法快速回滚
  4. 尽量减少代码开发,减少出错概率,更专注业务
  5. 简单的出错处理,失败检测等
  6. 支持插件化开发,提供额外数据
  7. 支持简单的规则,类似合并同主键数据
  8. 域名、AUTH不可见,以满足安全要求
  9. 支持非MySQL来源的数据,因为微服务的存在,不是所有数据都能从DB获取到

当时的调用结果已经找不到了,那么按照现在可以搜索到的框架和工具来重新做调研好了。简单搜索了一下MySQL到ES的同步框架或者工具,有以下几个:

这几个工具通过简单的配置,即可建立索引,但分析我们的需求,有以下需求无法满足:

  1. 需要提供DB的URL、User和Password等,我们公司由于安全的考虑,是无法拿到这些参数的;
  2. 有部分数据是通过微服务获取的,单纯的SQL是无法访问微服务的;
  3. 有的表数据量比较大,需要分片多JOB同时处理,JDBC在这点上操作比较麻烦;
  4. 增量是需要按照主键进行更新的,按照时间扫描对表压力比较大;
  5. 类似工具无法走常规发布,有一定的发布风险

基于以上原因,我们选择了自己研发了一个框架,用来满足我们的需求。下篇文章讲介绍es-common的实现思路。

继续阅读 »

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-引言

日常工作里,因业务需要大量使用了Elasticsearch。为了简化索引的开发工作,我们需要一个易用可扩展的MySQL到ES的同步框架,在比较了可以找到的各种开源框架&工具后,我们还是选择自行研发了一个,名字简单粗暴:es-common。

背景

16年我接手了并负责了部门所有业务的搜索系统,旧搜索系统是基于Lucene自研实现的一个搜索框架,包含了平表创建、全量索引、增量索引、搜索引擎四个部分基础功能的封装;另有一个管理系统,用于配置索引、字典等信息。框架的实现思路是通过建立平表,简化索引创建的过程。

接手该系统以后,各业务出现井喷式发展,各种需求铺天盖地,同时该系统的不稳定性也开始作妖,最早三个人的小团队每天都疲于奔命。解决现有问题的同时,也在查找更好的解决方案。于是Elasticsearch进入视野。 基于JSON的交互、分布式、数据与逻辑隔离、开箱即用、稳定等特性,使我们确定了向ES转型的计划。每个点都是现有系统没有解决的问题,具体的点就不吐槽了。

选择

我们的需求有:

  1. 支持全量、增量建立索引,以使数据变更较快体现
  2. 支持更新指定文档,以处理突发问题
  3. 索引有版本控制,以防止出现问题无法快速回滚
  4. 尽量减少代码开发,减少出错概率,更专注业务
  5. 简单的出错处理,失败检测等
  6. 支持插件化开发,提供额外数据
  7. 支持简单的规则,类似合并同主键数据
  8. 域名、AUTH不可见,以满足安全要求
  9. 支持非MySQL来源的数据,因为微服务的存在,不是所有数据都能从DB获取到

当时的调用结果已经找不到了,那么按照现在可以搜索到的框架和工具来重新做调研好了。简单搜索了一下MySQL到ES的同步框架或者工具,有以下几个:

这几个工具通过简单的配置,即可建立索引,但分析我们的需求,有以下需求无法满足:

  1. 需要提供DB的URL、User和Password等,我们公司由于安全的考虑,是无法拿到这些参数的;
  2. 有部分数据是通过微服务获取的,单纯的SQL是无法访问微服务的;
  3. 有的表数据量比较大,需要分片多JOB同时处理,JDBC在这点上操作比较麻烦;
  4. 增量是需要按照主键进行更新的,按照时间扫描对表压力比较大;
  5. 类似工具无法走常规发布,有一定的发布风险

基于以上原因,我们选择了自己研发了一个框架,用来满足我们的需求。下篇文章讲介绍es-common的实现思路。

收起阅读 »

Elastic日报 第661期 (2019-07-06)

1.利用logstash同步关系型数据库和es https://0x7.me/R4OFV

2.针对英语和俄语的词形分析插件 https://0x7.me/9S0J

3.一周热点:GitHub上的那些奇葩项目 https://0x7.me/8B5T

继续阅读 »

1.利用logstash同步关系型数据库和es https://0x7.me/R4OFV

2.针对英语和俄语的词形分析插件 https://0x7.me/9S0J

3.一周热点:GitHub上的那些奇葩项目 https://0x7.me/8B5T

收起阅读 »