橡皮、老虎皮、狮子皮哪一个最不好?

ES脚本性能优化一例

使用painless脚本为文档自定义打分是很常见的场景,对新人来说也是最容易造成性能问题的地方。本文中使用两个例子简单谈一下脚本性能优化。

目标

ES本身是基于倒排等数据结构实现的查询,因此在做类似Term、Match等可以利用底层数据结构的场景进行查询时,性能是很好的。

脚本和term等查询不一样,无法利用现有的各种数据结构,可以简单理解成循环:

docs = getDocs(xxx); // 获取满足条件的文档列表
for(Doc doc : docs) {
    score = getScoreByScript(doc);
}

因此脚本的性能取决于两个地方:脚本的复杂度和满足条件的文档数

例子1

我们有个场景是查询指定坐标指定范围内的POI列表,例如5公里内的景点列表。

由于我们的距离公式和ES默认的都不一致,如下:

/**
 * 计算距离,返回单位:米
 */
public static Double getDistance(Double lat1, Double lng1, Double lat2, Double lng2) {
    double diffLon = Math.abs(lng1 - lng2);
    if (diffLon > 180)
        diffLon -= 360;
    return Math.sqrt(Math.pow(diffLon, 2) + Math.pow(lat1 - lat2, 2)) * 110.0 * 1000;
}

所以该同学把这段Java代码转成了Painless,在sort里使用这个该方法计算出距离。上线以后发现ES有了很多慢查询,对应的服务也95线、99线也比较高。

原因是其他脚本没有有效地缩小数据量,导致有几百万的数据需要使用该脚本做距离计算,给ES的CPU造成很大压力,查询性能也比较差。

该例子优化起来很简单,即使用ES自带的distance做较大范围的限制,例如需要5公里的数据,可以用ES的plain距离做限制,再加上之前的自定义脚本逻辑。由于ES的plain距离计算性能好很多,因此经过该过滤以后,自定义脚本的文档量少了很多,因此整体性能有了很大提升。

例子2

有个场景是对文章进行搜索,如果文章关联的城市是指定的几个城市,则给额外的加分。例如:

{
    "query": {xxx},
    "sort": [
    {
      "_script": {
        "script": {
          "source": "def score = 0;def cityIds = doc['cityIds']; def paramCityIds = params.cityIds; for (int i=0; i<cityIds.size(); i++){if (paramCityIds.contains(cityIds[i])){score += 100;}} return score;",
          "lang": "painless",
          "params": {
            "cityIds": [2,1,3]
          }
        },
        "type": "number",
        "order": "desc"
      }
    }
    ]
}

问题和例子1一样,该功能的性能比较差。虽然脚本简单,但是满足的文档量比较大,带来的计算量也比较多,因此性能上不去。

这是一个比较常见的场景,问题的根源还是对ES的机制不够了解,优化起来也很简单,想办法利用到倒排就可以了。

ES里有个专门针对改场景的查询:constant_score,因此以上查询可以修改为:

{
    "query": {
        "should": [
            {
                    "constant_score": {
                        "filter": {
                            "term": {
                                    "cityIds": 2
                            }
                        },
                        "boost": 5
                     }
            },
            {
                    "constant_score": {
                        "filter": {
                            "term": {
                                    "cityIds": 1
                            }
                        },
                        "boost": 5
                     }
            },
            {
                    "constant_score": {
                        "filter": {
                            "term": {
                                    "cityIds": 3
                            }
                        },
                        "boost": 5
                     }
            }
        ]
    },
    "sort": [
    {
      "_score": "desc"
    ]
}

性能即可得到极大改善。

继续阅读 »

使用painless脚本为文档自定义打分是很常见的场景,对新人来说也是最容易造成性能问题的地方。本文中使用两个例子简单谈一下脚本性能优化。

目标

ES本身是基于倒排等数据结构实现的查询,因此在做类似Term、Match等可以利用底层数据结构的场景进行查询时,性能是很好的。

脚本和term等查询不一样,无法利用现有的各种数据结构,可以简单理解成循环:

docs = getDocs(xxx); // 获取满足条件的文档列表
for(Doc doc : docs) {
    score = getScoreByScript(doc);
}

因此脚本的性能取决于两个地方:脚本的复杂度和满足条件的文档数

例子1

我们有个场景是查询指定坐标指定范围内的POI列表,例如5公里内的景点列表。

由于我们的距离公式和ES默认的都不一致,如下:

/**
 * 计算距离,返回单位:米
 */
public static Double getDistance(Double lat1, Double lng1, Double lat2, Double lng2) {
    double diffLon = Math.abs(lng1 - lng2);
    if (diffLon > 180)
        diffLon -= 360;
    return Math.sqrt(Math.pow(diffLon, 2) + Math.pow(lat1 - lat2, 2)) * 110.0 * 1000;
}

所以该同学把这段Java代码转成了Painless,在sort里使用这个该方法计算出距离。上线以后发现ES有了很多慢查询,对应的服务也95线、99线也比较高。

原因是其他脚本没有有效地缩小数据量,导致有几百万的数据需要使用该脚本做距离计算,给ES的CPU造成很大压力,查询性能也比较差。

该例子优化起来很简单,即使用ES自带的distance做较大范围的限制,例如需要5公里的数据,可以用ES的plain距离做限制,再加上之前的自定义脚本逻辑。由于ES的plain距离计算性能好很多,因此经过该过滤以后,自定义脚本的文档量少了很多,因此整体性能有了很大提升。

例子2

有个场景是对文章进行搜索,如果文章关联的城市是指定的几个城市,则给额外的加分。例如:

{
    "query": {xxx},
    "sort": [
    {
      "_script": {
        "script": {
          "source": "def score = 0;def cityIds = doc['cityIds']; def paramCityIds = params.cityIds; for (int i=0; i<cityIds.size(); i++){if (paramCityIds.contains(cityIds[i])){score += 100;}} return score;",
          "lang": "painless",
          "params": {
            "cityIds": [2,1,3]
          }
        },
        "type": "number",
        "order": "desc"
      }
    }
    ]
}

问题和例子1一样,该功能的性能比较差。虽然脚本简单,但是满足的文档量比较大,带来的计算量也比较多,因此性能上不去。

这是一个比较常见的场景,问题的根源还是对ES的机制不够了解,优化起来也很简单,想办法利用到倒排就可以了。

ES里有个专门针对改场景的查询:constant_score,因此以上查询可以修改为:

{
    "query": {
        "should": [
            {
                    "constant_score": {
                        "filter": {
                            "term": {
                                    "cityIds": 2
                            }
                        },
                        "boost": 5
                     }
            },
            {
                    "constant_score": {
                        "filter": {
                            "term": {
                                    "cityIds": 1
                            }
                        },
                        "boost": 5
                     }
            },
            {
                    "constant_score": {
                        "filter": {
                            "term": {
                                    "cityIds": 3
                            }
                        },
                        "boost": 5
                     }
            }
        ]
    },
    "sort": [
    {
      "_score": "desc"
    ]
}

性能即可得到极大改善。

收起阅读 »

深入理解Elasticsearch写入过程

Elasticsearch 是当前主流的搜索引擎,其具有扩展性好,查询速度快,查询结果近实时等优点,本文将对Elasticsearch的写操作进行分析。

1. lucene的写操作及其问题

Elasticsearch底层使用Lucene来实现doc的读写操作,Lucene通过

public long addDocument(...);
public long deleteDocuments(...);
public long updateDocument(...);

三个方法来实现文档的写入,更新和删除操作。但是存在如下问题

  1. 没有并发设计
    lucene只是一个搜索引擎库,并没有涉及到分布式相关的设计,因此要想使用Lucene来处理海量数据,并利用分布式的能力,就必须在其之上进行分布式的相关设计。
  2. 非实时
    将文件写入lucence后并不能立即被检索,需要等待lucene生成一个完整的segment才能被检索
  3. 数据存储不可靠
    写入lucene的数据不会立即被持久化到磁盘,如果服务器宕机,那存储在内存中的数据将会丢失
  4. 不支持部分更新
    lucene中提供的updateDocuments仅支持对文档的全量更新,对部分更新不支持

2. Elasticsearch的写入方案

针对Lucene的问题,ES做了如下设计

2.1 分布式设计:

为了支持对海量数据的存储和查询,Elasticsearch引入分片的概念,一个索引被分成多个分片,每个分片可以有一个主分片和多个副本分片,每个分片副本都是一个具有完整功能的lucene实例。分片可以分配在不同的服务器上,同一个分片的不同副本不能分配在相同的服务器上。
在进行写操作时,ES会根据传入的_routing参数(或mapping中设置的_routing, 如果参数和设置中都没有则默认使用_id), 按照公式shard_num = hash(\routing) % num_primary_shards,计算出文档要分配到的分片,在从集群元数据中找出对应主分片的位置,将请求路由到该分片进行文档写操作。

2.2 近实时性-refresh操作

当一个文档写入Lucene后是不能被立即查询到的,Elasticsearch提供了一个refresh操作,会定时地调用lucene的reopen(新版本为openIfChanged)为内存中新写入的数据生成一个新的segment,此时被处理的文档均可以被检索到。refresh操作的时间间隔由refresh_interval参数控制,默认为1s, 当然还可以在写入请求中带上refresh表示写入后立即refresh,另外还可以调用refresh API显式refresh。

2.3 数据存储可靠性

  1. 引入translog
    当一个文档写入Lucence后是存储在内存中的,即使执行了refresh操作仍然是在文件系统缓存中,如果此时服务器宕机,那么这部分数据将会丢失。为此ES增加了translog, 当进行文档写操作时会先将文档写入Lucene,然后写入一份到translog,写入translog是落盘的(如果对可靠性要求不是很高,也可以设置异步落盘,可以提高性能,由配置index.translog.durabilityindex.translog.sync_interval控制),这样就可以防止服务器宕机后数据的丢失。由于translog是追加写入,因此性能比较好。与传统的分布式系统不同,这里是先写入Lucene再写入translog,原因是写入Lucene可能会失败,为了减少写入失败回滚的复杂度,因此先写入Lucene.
  2. flush操作
    另外每30分钟或当translog达到一定大小(由index.translog.flush_threshold_size控制,默认512mb), ES会触发一次flush操作,此时ES会先执行refresh操作将buffer中的数据生成segment,然后调用lucene的commit方法将所有内存中的segment fsync到磁盘。此时lucene中的数据就完成了持久化,会清空translog中的数据(6.x版本为了实现sequenceIDs,不删除translog)
  3. merge操作
    由于refresh默认间隔为1s中,因此会产生大量的小segment,为此ES会运行一个任务检测当前磁盘中的segment,对符合条件的segment进行合并操作,减少lucene中的segment个数,提高查询速度,降低负载。不仅如此,merge过程也是文档删除和更新操作后,旧的doc真正被删除的时候。用户还可以手动调用_forcemerge API来主动触发merge,以减少集群的segment个数和清理已删除或更新的文档。
  4. 多副本机制
    另外ES有多副本机制,一个分片的主副分片不能分片在同一个节点上,进一步保证数据的可靠性。

    2.4 部分更新

    lucene支持对文档的整体更新,ES为了支持局部更新,在Lucene的Store索引中存储了一个_source字段,该字段的key值是文档ID, 内容是文档的原文。当进行更新操作时先从_source中获取原文,与更新部分合并后,再调用lucene API进行全量更新, 对于写入了ES但是还没有refresh的文档,可以从translog中获取。另外为了防止读取文档过程后执行更新前有其他线程修改了文档,ES增加了版本机制,当执行更新操作时发现当前文档的版本与预期不符,则会重新获取文档再更新。

3. ES的写入流程

ES的任意节点都可以作为协调节点(coordinating node)接受请求,当协调节点接受到请求后进行一系列处理,然后通过_routing字段找到对应的primary shard,并将请求转发给primary shard, primary shard完成写入后,将写入并发发送给各replica, raplica执行写入操作后返回给primary shard, primary shard再将请求返回给协调节点。大致流程如下图:

3.1 coordinating节点

ES中接收并转发请求的节点称为coordinating节点,ES中所有节点都可以接受并转发请求。当一个节点接受到写请求或更新请求后,会执行如下操作:

  1. ingest pipeline
    查看该请求是否符合某个ingest pipeline的pattern, 如果符合则执行pipeline中的逻辑,一般是对文档进行各种预处理,如格式调整,增加字段等。如果当前节点没有ingest角色,则需要将请求转发给有ingest角色的节点执行。
  2. 自动创建索引
    判断索引是否存在,如果开启了自动创建则自动创建,否则报错
  3. 设置routing
    获取请求URL或mapping中的_routing,如果没有则使用_id, 如果没有指定_id则ES会自动生成一个全局唯一ID。该_routing字段用于决定文档分配在索引的哪个shard上。
  4. 构建BulkShardRequest
    由于Bulk Request中包含多种(Index/Update/Delete)请求,这些请求分别需要到不同的shard上执行,因此协调节点,会将请求按照shard分开,同一个shard上的请求聚合到一起,构建BulkShardRequest
  5. 将请求发送给primary shard
    因为当前执行的是写操作,因此只能在primary上完成,所以需要把请求路由到primary shard所在节点
  6. 等待primary shard返回

3.2 primary shard

Primary请求的入口是PrimaryOperationTransportHandler的MessageReceived, 当接收到请求时,执行的逻辑如下

  1. 判断操作类型
    遍历bulk请求中的各子请求,根据不同的操作类型跳转到不同的处理逻辑
  2. 将update操作转换为Index和Delete操作
    获取文档的当前内容,与update内容合并生成新文档,然后将update请求转换成index请求,此处文档设置一个version v1
  3. Parse Doc
    解析文档的各字段,并添加如_uid等ES相关的一些系统字段
  4. 更新mapping
    对于新增字段会根据dynamic mapping或dynamic template生成对应的mapping,如果mapping中有dynamic mapping相关设置则按设置处理,如忽略或抛出异常
  5. 获取sequence Id和Version
    从SequcenceNumberService获取一个sequenceID和Version。SequcenID用于初始化LocalCheckPoint, verion是根据当前Versoin+1用于防止并发写导致数据不一致。
  6. 写入lucene
    这一步开始会对文档uid加锁,然后判断uid对应的version v2和之前update转换时的versoin v1是否一致,不一致则返回第二步重新执行。 如果version一致,如果同id的doc已经存在,则调用lucene的updateDocument接口,如果是新文档则调用lucene的addDoucument. 这里有个问题,如何保证Delete-Then-Add的原子性,ES是通过在Delete之前会加上已refresh锁,禁止被refresh,只有等待Add完成后释放了Refresh Lock, 这样就保证了这个操作的原子性。
  7. 写入translog
    写入Lucene的Segment后,会以key value的形式写Translog, Key是Id, Value是Doc的内容。当查询的时候,如果请求的是GetDocById则可以直接根据_id从translog中获取。满足nosql场景的实时性。
  8. 重构bulk request
    因为primary shard已经将update操作转换为index操作或delete操作,因此要对之前的bulkrequest进行调整,只包含index或delete操作,不需要再进行update的处理操作。
  9. flush translog
    默认情况下,translog要在此处落盘完成,如果对可靠性要求不高,可以设置translog异步,那么translog的fsync将会异步执行,但是落盘前的数据有丢失风险。
  10. 发送请求给replicas
    将构造好的bulkrequest并发发送给各replicas,等待replica返回,这里需要等待所有的replicas返回,响应请求给协调节点。如果某个shard执行失败,则primary会给master发请求remove该shard。这里会同时把sequenceID, primaryTerm, GlobalCheckPoint等传递给replica。
  11. 等待replica响应
    当所有的replica返回请求时,更细primary shard的LocalCheckPoint。

3.3 replica shard

Replica 请求的入口是在ReplicaOperationTransportHandler的messageReceived,当replica shard接收到请求时执行如下流程:

  1. 判断操作类型
    replica收到的写如请求只会有add和delete,因update在primary shard上已经转换为add或delete了。根据不同的操作类型执行对应的操作
  2. Parse Doc
  3. 更新mapping
  4. 获取sequenceId和Version 直接使用primary shard发送过来的请求中的内容即可
  5. 写如lucene
  6. write Translog
  7. Flush translog

4 总结与分析

Elasticsearch建立在Lucene基础之上,底层采用lucene来实现文件的读写操作,实现了文档的存储和高效查询。然后lucene作为一个搜索库在应对海量数据的存储上仍有一些不足之处。
Elasticsearch通过引入分片概念,成功地将lucene部署到分布式系统中,增强了系统的可靠性和扩展性。
Elasticsearch通过定期refresh lucene in-momory-buffer中的数据,使得ES具有了近实时的写入和查询能力。
Elasticsearch通过引入translog,多副本,以及定期执行flush,merge等操作保证了数据可靠性和较高的存储性能。
Elasticsearch通过存储_source字段结合verison字段实现了文档的局部更新,使得ES的使用方式更加灵活多样。
Elasticsearch基于lucene,又不简单地只是lucene,它完美地将lucene与分布式系统结合,既利用了lucene的检索能力,又具有了分布式系统的众多优点。

本文参考

继续阅读 »

Elasticsearch 是当前主流的搜索引擎,其具有扩展性好,查询速度快,查询结果近实时等优点,本文将对Elasticsearch的写操作进行分析。

1. lucene的写操作及其问题

Elasticsearch底层使用Lucene来实现doc的读写操作,Lucene通过

public long addDocument(...);
public long deleteDocuments(...);
public long updateDocument(...);

三个方法来实现文档的写入,更新和删除操作。但是存在如下问题

  1. 没有并发设计
    lucene只是一个搜索引擎库,并没有涉及到分布式相关的设计,因此要想使用Lucene来处理海量数据,并利用分布式的能力,就必须在其之上进行分布式的相关设计。
  2. 非实时
    将文件写入lucence后并不能立即被检索,需要等待lucene生成一个完整的segment才能被检索
  3. 数据存储不可靠
    写入lucene的数据不会立即被持久化到磁盘,如果服务器宕机,那存储在内存中的数据将会丢失
  4. 不支持部分更新
    lucene中提供的updateDocuments仅支持对文档的全量更新,对部分更新不支持

2. Elasticsearch的写入方案

针对Lucene的问题,ES做了如下设计

2.1 分布式设计:

为了支持对海量数据的存储和查询,Elasticsearch引入分片的概念,一个索引被分成多个分片,每个分片可以有一个主分片和多个副本分片,每个分片副本都是一个具有完整功能的lucene实例。分片可以分配在不同的服务器上,同一个分片的不同副本不能分配在相同的服务器上。
在进行写操作时,ES会根据传入的_routing参数(或mapping中设置的_routing, 如果参数和设置中都没有则默认使用_id), 按照公式shard_num = hash(\routing) % num_primary_shards,计算出文档要分配到的分片,在从集群元数据中找出对应主分片的位置,将请求路由到该分片进行文档写操作。

2.2 近实时性-refresh操作

当一个文档写入Lucene后是不能被立即查询到的,Elasticsearch提供了一个refresh操作,会定时地调用lucene的reopen(新版本为openIfChanged)为内存中新写入的数据生成一个新的segment,此时被处理的文档均可以被检索到。refresh操作的时间间隔由refresh_interval参数控制,默认为1s, 当然还可以在写入请求中带上refresh表示写入后立即refresh,另外还可以调用refresh API显式refresh。

2.3 数据存储可靠性

  1. 引入translog
    当一个文档写入Lucence后是存储在内存中的,即使执行了refresh操作仍然是在文件系统缓存中,如果此时服务器宕机,那么这部分数据将会丢失。为此ES增加了translog, 当进行文档写操作时会先将文档写入Lucene,然后写入一份到translog,写入translog是落盘的(如果对可靠性要求不是很高,也可以设置异步落盘,可以提高性能,由配置index.translog.durabilityindex.translog.sync_interval控制),这样就可以防止服务器宕机后数据的丢失。由于translog是追加写入,因此性能比较好。与传统的分布式系统不同,这里是先写入Lucene再写入translog,原因是写入Lucene可能会失败,为了减少写入失败回滚的复杂度,因此先写入Lucene.
  2. flush操作
    另外每30分钟或当translog达到一定大小(由index.translog.flush_threshold_size控制,默认512mb), ES会触发一次flush操作,此时ES会先执行refresh操作将buffer中的数据生成segment,然后调用lucene的commit方法将所有内存中的segment fsync到磁盘。此时lucene中的数据就完成了持久化,会清空translog中的数据(6.x版本为了实现sequenceIDs,不删除translog)
  3. merge操作
    由于refresh默认间隔为1s中,因此会产生大量的小segment,为此ES会运行一个任务检测当前磁盘中的segment,对符合条件的segment进行合并操作,减少lucene中的segment个数,提高查询速度,降低负载。不仅如此,merge过程也是文档删除和更新操作后,旧的doc真正被删除的时候。用户还可以手动调用_forcemerge API来主动触发merge,以减少集群的segment个数和清理已删除或更新的文档。
  4. 多副本机制
    另外ES有多副本机制,一个分片的主副分片不能分片在同一个节点上,进一步保证数据的可靠性。

    2.4 部分更新

    lucene支持对文档的整体更新,ES为了支持局部更新,在Lucene的Store索引中存储了一个_source字段,该字段的key值是文档ID, 内容是文档的原文。当进行更新操作时先从_source中获取原文,与更新部分合并后,再调用lucene API进行全量更新, 对于写入了ES但是还没有refresh的文档,可以从translog中获取。另外为了防止读取文档过程后执行更新前有其他线程修改了文档,ES增加了版本机制,当执行更新操作时发现当前文档的版本与预期不符,则会重新获取文档再更新。

3. ES的写入流程

ES的任意节点都可以作为协调节点(coordinating node)接受请求,当协调节点接受到请求后进行一系列处理,然后通过_routing字段找到对应的primary shard,并将请求转发给primary shard, primary shard完成写入后,将写入并发发送给各replica, raplica执行写入操作后返回给primary shard, primary shard再将请求返回给协调节点。大致流程如下图:

3.1 coordinating节点

ES中接收并转发请求的节点称为coordinating节点,ES中所有节点都可以接受并转发请求。当一个节点接受到写请求或更新请求后,会执行如下操作:

  1. ingest pipeline
    查看该请求是否符合某个ingest pipeline的pattern, 如果符合则执行pipeline中的逻辑,一般是对文档进行各种预处理,如格式调整,增加字段等。如果当前节点没有ingest角色,则需要将请求转发给有ingest角色的节点执行。
  2. 自动创建索引
    判断索引是否存在,如果开启了自动创建则自动创建,否则报错
  3. 设置routing
    获取请求URL或mapping中的_routing,如果没有则使用_id, 如果没有指定_id则ES会自动生成一个全局唯一ID。该_routing字段用于决定文档分配在索引的哪个shard上。
  4. 构建BulkShardRequest
    由于Bulk Request中包含多种(Index/Update/Delete)请求,这些请求分别需要到不同的shard上执行,因此协调节点,会将请求按照shard分开,同一个shard上的请求聚合到一起,构建BulkShardRequest
  5. 将请求发送给primary shard
    因为当前执行的是写操作,因此只能在primary上完成,所以需要把请求路由到primary shard所在节点
  6. 等待primary shard返回

3.2 primary shard

Primary请求的入口是PrimaryOperationTransportHandler的MessageReceived, 当接收到请求时,执行的逻辑如下

  1. 判断操作类型
    遍历bulk请求中的各子请求,根据不同的操作类型跳转到不同的处理逻辑
  2. 将update操作转换为Index和Delete操作
    获取文档的当前内容,与update内容合并生成新文档,然后将update请求转换成index请求,此处文档设置一个version v1
  3. Parse Doc
    解析文档的各字段,并添加如_uid等ES相关的一些系统字段
  4. 更新mapping
    对于新增字段会根据dynamic mapping或dynamic template生成对应的mapping,如果mapping中有dynamic mapping相关设置则按设置处理,如忽略或抛出异常
  5. 获取sequence Id和Version
    从SequcenceNumberService获取一个sequenceID和Version。SequcenID用于初始化LocalCheckPoint, verion是根据当前Versoin+1用于防止并发写导致数据不一致。
  6. 写入lucene
    这一步开始会对文档uid加锁,然后判断uid对应的version v2和之前update转换时的versoin v1是否一致,不一致则返回第二步重新执行。 如果version一致,如果同id的doc已经存在,则调用lucene的updateDocument接口,如果是新文档则调用lucene的addDoucument. 这里有个问题,如何保证Delete-Then-Add的原子性,ES是通过在Delete之前会加上已refresh锁,禁止被refresh,只有等待Add完成后释放了Refresh Lock, 这样就保证了这个操作的原子性。
  7. 写入translog
    写入Lucene的Segment后,会以key value的形式写Translog, Key是Id, Value是Doc的内容。当查询的时候,如果请求的是GetDocById则可以直接根据_id从translog中获取。满足nosql场景的实时性。
  8. 重构bulk request
    因为primary shard已经将update操作转换为index操作或delete操作,因此要对之前的bulkrequest进行调整,只包含index或delete操作,不需要再进行update的处理操作。
  9. flush translog
    默认情况下,translog要在此处落盘完成,如果对可靠性要求不高,可以设置translog异步,那么translog的fsync将会异步执行,但是落盘前的数据有丢失风险。
  10. 发送请求给replicas
    将构造好的bulkrequest并发发送给各replicas,等待replica返回,这里需要等待所有的replicas返回,响应请求给协调节点。如果某个shard执行失败,则primary会给master发请求remove该shard。这里会同时把sequenceID, primaryTerm, GlobalCheckPoint等传递给replica。
  11. 等待replica响应
    当所有的replica返回请求时,更细primary shard的LocalCheckPoint。

3.3 replica shard

Replica 请求的入口是在ReplicaOperationTransportHandler的messageReceived,当replica shard接收到请求时执行如下流程:

  1. 判断操作类型
    replica收到的写如请求只会有add和delete,因update在primary shard上已经转换为add或delete了。根据不同的操作类型执行对应的操作
  2. Parse Doc
  3. 更新mapping
  4. 获取sequenceId和Version 直接使用primary shard发送过来的请求中的内容即可
  5. 写如lucene
  6. write Translog
  7. Flush translog

4 总结与分析

Elasticsearch建立在Lucene基础之上,底层采用lucene来实现文件的读写操作,实现了文档的存储和高效查询。然后lucene作为一个搜索库在应对海量数据的存储上仍有一些不足之处。
Elasticsearch通过引入分片概念,成功地将lucene部署到分布式系统中,增强了系统的可靠性和扩展性。
Elasticsearch通过定期refresh lucene in-momory-buffer中的数据,使得ES具有了近实时的写入和查询能力。
Elasticsearch通过引入translog,多副本,以及定期执行flush,merge等操作保证了数据可靠性和较高的存储性能。
Elasticsearch通过存储_source字段结合verison字段实现了文档的局部更新,使得ES的使用方式更加灵活多样。
Elasticsearch基于lucene,又不简单地只是lucene,它完美地将lucene与分布式系统结合,既利用了lucene的检索能力,又具有了分布式系统的众多优点。

本文参考

收起阅读 »

ES Aggs根据聚合的结果(数值)进行过滤

前言

我们在使用聚合时总是有各种各样的聚合需求,其中一个比较常用的就是根据聚合的结果过滤聚合的桶,例如:1、每个IP登录次数超过5次的IP;2、每个IP登录人数超过2的IP。 还有我之前的一个案例,访问量超过1000的人数,这些都是很常见的统计需求。

案例需求

我们在使用聚合计算的时候一般都有两类,一种是计算文档的数量,另一种是计算文档内字段的值的数量(去重计算)或者值的数学计算。两种聚合计算在过滤的时候采用不同的方法来计算。

我们使用以下案例来说明两种过滤的不同: 用户每次登录都会记录一个登录记录:

{"userID":"a","IP":"10.70.25.1","time":"2019-10-10 12:12:12.222"}

然后提出以下两个需求: 1、每个IP登录次数超过5次的IP; 2、每个IP登录人数超过2的IP。

实现

每个IP登录次数超过5次的IP

这个是对登录记录个数的桶聚合统计,然后过滤。使用IP做term聚合,就可以得出每个IP的登录次数,然后term聚合中有一个参数min_doc_count这个字段就可以对文档数量进行过滤,具体的语句如下: 查询语句

{
  "aggs": {
    "IP": {
      "terms": {
        "field": "IP",
        "size": 3000,
        "order": {
          "_count": "desc"
        },
        "min_doc_count": 5
      }
    }
  },
  "size": 0
}

结果

{
  "took" : 614,
  "timed_out" : false,
  "num_reduce_phases" : 3,
  "_shards" : {
    "total" : 1105,
    "successful" : 1105,
    "skipped" : 75,
    "failed" : 0
  },
  "hits" : {
    "total" : 2826,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "IP" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "10.25.90.139",
          "doc_count" : 61
        },
        {
          "key" : "10.25.78.146",
          "doc_count" : 45
        },
        {
          "key" : "10.25.94.22",
          "doc_count" : 21
        },
        {
          "key" : "10.25.75.52",
          "doc_count" : 18
        },
        {
          "key" : "10.25.89.32",
          "doc_count" : 13
        },
        {
          "key" : "10.25.93.243",
          "doc_count" : 10
        },
        {
          "key" : "10.25.78.189",
          "doc_count" : 9
        },
        {
          "key" : "10.25.90.82",
          "doc_count" : 8
        },
        {
          "key" : "10.25.91.240",
          "doc_count" : 8
        },
        {
          "key" : "10.25.90.57",
          "doc_count" : 7
        },
        {
          "key" : "10.25.91.251",
          "doc_count" : 7
        },
        {
          "key" : "10.25.95.166",
          "doc_count" : 6
        },
        {
          "key" : "10.25.89.33",
          "doc_count" : 5
        },
        {
          "key" : "10.25.90.88",
          "doc_count" : 5
        },
        {
          "key" : "10.25.92.53",
          "doc_count" : 5
        }
      ]
    }
  }
}

每个IP登录人数超过2的IP

这个是对登录记录用户ID的去重数聚合,然后过滤。对用户ID进行去重可以使用Cardinality Aggregation聚合,然后再使用Bucket Selector Aggregation聚合过滤器过滤数据。具体内容如下: 查询语句

{
  "aggs": {
    "IP": {
      "terms": {
        "field": "IP",
        "size": 3000,
        "order": {
          "distinct": "desc"
        },
        "min_doc_count": 5
      },
      "aggs": {
        "distinct": {
          "cardinality": {
            "field": "IP.keyword"
          }
        },
        "dd":{
          "bucket_selector": {
            "buckets_path": {"userCount":"distinct"},
            "script": "params.userCount > 2"
          }
        }
      }
    }
  },
  "size": 0
}

结果

{
  "took" : 317,
  "timed_out" : false,
  "num_reduce_phases" : 3,
  "_shards" : {
    "total" : 1105,
    "successful" : 1105,
    "skipped" : 75,
    "failed" : 0
  },
  "hits" : {
    "total" : 2826,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "IP" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "10.25.75.52",
          "doc_count" : 18,
          "distinct" : {
            "value" : 4
          }
        },
        {
          "key" : "10.25.78.146",
          "doc_count" : 45,
          "distinct" : {
            "value" : 3
          }
        },
        {
          "key" : "10.25.90.139",
          "doc_count" : 61,
          "distinct" : {
            "value" : 3
          }
        },
        {
          "key" : "10.25.91.240",
          "doc_count" : 8,
          "distinct" : {
            "value" : 3
          }
        },
        {
          "key" : "10.25.94.22",
          "doc_count" : 21,
          "distinct" : {
            "value" : 3
          }
        }
      ]
    }
  }
}

桶聚合选择器: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-aggregations-pipeline-bucket-selector-aggregation.html

继续阅读 »

前言

我们在使用聚合时总是有各种各样的聚合需求,其中一个比较常用的就是根据聚合的结果过滤聚合的桶,例如:1、每个IP登录次数超过5次的IP;2、每个IP登录人数超过2的IP。 还有我之前的一个案例,访问量超过1000的人数,这些都是很常见的统计需求。

案例需求

我们在使用聚合计算的时候一般都有两类,一种是计算文档的数量,另一种是计算文档内字段的值的数量(去重计算)或者值的数学计算。两种聚合计算在过滤的时候采用不同的方法来计算。

我们使用以下案例来说明两种过滤的不同: 用户每次登录都会记录一个登录记录:

{"userID":"a","IP":"10.70.25.1","time":"2019-10-10 12:12:12.222"}

然后提出以下两个需求: 1、每个IP登录次数超过5次的IP; 2、每个IP登录人数超过2的IP。

实现

每个IP登录次数超过5次的IP

这个是对登录记录个数的桶聚合统计,然后过滤。使用IP做term聚合,就可以得出每个IP的登录次数,然后term聚合中有一个参数min_doc_count这个字段就可以对文档数量进行过滤,具体的语句如下: 查询语句

{
  "aggs": {
    "IP": {
      "terms": {
        "field": "IP",
        "size": 3000,
        "order": {
          "_count": "desc"
        },
        "min_doc_count": 5
      }
    }
  },
  "size": 0
}

结果

{
  "took" : 614,
  "timed_out" : false,
  "num_reduce_phases" : 3,
  "_shards" : {
    "total" : 1105,
    "successful" : 1105,
    "skipped" : 75,
    "failed" : 0
  },
  "hits" : {
    "total" : 2826,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "IP" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "10.25.90.139",
          "doc_count" : 61
        },
        {
          "key" : "10.25.78.146",
          "doc_count" : 45
        },
        {
          "key" : "10.25.94.22",
          "doc_count" : 21
        },
        {
          "key" : "10.25.75.52",
          "doc_count" : 18
        },
        {
          "key" : "10.25.89.32",
          "doc_count" : 13
        },
        {
          "key" : "10.25.93.243",
          "doc_count" : 10
        },
        {
          "key" : "10.25.78.189",
          "doc_count" : 9
        },
        {
          "key" : "10.25.90.82",
          "doc_count" : 8
        },
        {
          "key" : "10.25.91.240",
          "doc_count" : 8
        },
        {
          "key" : "10.25.90.57",
          "doc_count" : 7
        },
        {
          "key" : "10.25.91.251",
          "doc_count" : 7
        },
        {
          "key" : "10.25.95.166",
          "doc_count" : 6
        },
        {
          "key" : "10.25.89.33",
          "doc_count" : 5
        },
        {
          "key" : "10.25.90.88",
          "doc_count" : 5
        },
        {
          "key" : "10.25.92.53",
          "doc_count" : 5
        }
      ]
    }
  }
}

每个IP登录人数超过2的IP

这个是对登录记录用户ID的去重数聚合,然后过滤。对用户ID进行去重可以使用Cardinality Aggregation聚合,然后再使用Bucket Selector Aggregation聚合过滤器过滤数据。具体内容如下: 查询语句

{
  "aggs": {
    "IP": {
      "terms": {
        "field": "IP",
        "size": 3000,
        "order": {
          "distinct": "desc"
        },
        "min_doc_count": 5
      },
      "aggs": {
        "distinct": {
          "cardinality": {
            "field": "IP.keyword"
          }
        },
        "dd":{
          "bucket_selector": {
            "buckets_path": {"userCount":"distinct"},
            "script": "params.userCount > 2"
          }
        }
      }
    }
  },
  "size": 0
}

结果

{
  "took" : 317,
  "timed_out" : false,
  "num_reduce_phases" : 3,
  "_shards" : {
    "total" : 1105,
    "successful" : 1105,
    "skipped" : 75,
    "failed" : 0
  },
  "hits" : {
    "total" : 2826,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "IP" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "10.25.75.52",
          "doc_count" : 18,
          "distinct" : {
            "value" : 4
          }
        },
        {
          "key" : "10.25.78.146",
          "doc_count" : 45,
          "distinct" : {
            "value" : 3
          }
        },
        {
          "key" : "10.25.90.139",
          "doc_count" : 61,
          "distinct" : {
            "value" : 3
          }
        },
        {
          "key" : "10.25.91.240",
          "doc_count" : 8,
          "distinct" : {
            "value" : 3
          }
        },
        {
          "key" : "10.25.94.22",
          "doc_count" : 21,
          "distinct" : {
            "value" : 3
          }
        }
      ]
    }
  }
}

桶聚合选择器: https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-aggregations-pipeline-bucket-selector-aggregation.html

收起阅读 »

聚合元数据——对聚合结果进行打标签

背景

在我们的项目中需要对聚合后的结果进行二次的terms的聚合。实际需求就是有n个模型,需要统计每个模型在每段时间的调用次数,然后需要查询指定m个模型的调用总次数。我们要为每个模型建立一个索引,然后为每个模型查一次这段时间内的使用次数。

注:我们记录的值只能从结果中拿取。

实现过程

第一次设计

每个模型在第一次设计的时候是两个字段,【调用次数】、【错误次数】,使用以下语句:

{
  "aggs": {
    "error": {
      "filters": {
        "filters": {
          "error": {
            "query_string": {
              "query": "state:-1",
              "analyze_wildcard": true,
              "default_field": "*"
            }
          }
        }
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "bool": {
            "should": [
              {
                "match_phrase": {
                  "li": "D003" //说明这是一次模型调用
                }
              }
            ],
            "minimum_should_match": 1
          }
        },
        {
          "match_phrase": {
            "modelId..keyword": {
              "query": "modelId01"
            }
          }
        },
        {
          "range": {
            "x_st": {
              "gte": "now/h-1h-10s",
              "lte": "now/h-10s",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

结果为:

{
  "took" : 215,
  "timed_out" : false,
  "_shards" : {
    "total" : 1095,
    "successful" : 1095,
    "skipped" : 1053,
    "failed" : 0
  },
  "hits" : {
    "total" : 0,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "error" : {
      "buckets" : {
        "error" : {
          "doc_count" : 0
        }
      }
    }
  }
}

解析后保存:

{"@timestamp":"2019-09-23T12:23:23.333","count":0,"error":0}

这种情况可以统计每个模型在某段时间内的调用次数,使用索引名来区分每个model。但是在统计指定m个模型的时候就不行了,使用索引名来查询的时候由于是指定m个,前缀不能使用* 匹配,并且不能罗列所有m个索引来查询,就无法达到统计的效果。 第一次设计因为不能在结果中记录模型ID导致不能统计指定的m个模型的数量,以失败告终。

第二次设计

既然需要在统计结果中记录模型ID,那就使用terms聚合来进行操作,先使用模型ID过滤一下数据,然后使用聚合唯一的模型ID,这样就有了模型ID。查询语句如下:

{
  "aggs": {
    "modelId": {
      "terms": {
        "field": "modelId.keyword",
        "size": 5,
        "order": {
          "_count": "desc"
        }
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "match_phrase": {
            "modelId.keyword": {
              "query": "modelId01"
            }
          }
        },
        {
          "range": {
            "x_st": {
              "gte": "now/h-1h",
              "lte": "now/h",
              "format": "epoch_millis"
            }
          }
        },
        {
          "match_phrase": {
            "modelId.keyword": {
              "query": "modelId01"
            }
          }
        }
      ]
    }
  }
}

结果为:

{
  "took" : 9,
  "timed_out" : false,
  "_shards" : {
    "total" : 140,
    "successful" : 140,
    "skipped" : 135,
    "failed" : 0
  },
  "hits" : {
    "total" : 0,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "modelId" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [ ]
    }
  }
}

在有模型调用的时候这个方法还好用,但是在无模型调用的时候这个返回结果就如上面的一样,是不包含任何信息的,错误次数的0和模型ID都没有了。 第二次设计因为在无模型调用的时候导致模型ID不能记录,然后也是不能实现指定m个模型的查询次数统计,也以失败告终。

第三次设计

经过两次失败的实际案例,我发现现有的知识已经不能满足需求了,我需要新的方法,我需要一个能给查询结果添加字段的方法,所以我去查询官方文档,让我找到了这个方法聚合元数据 也就是对聚合结果进行打标签。 我使用第一次的设计方案,然后添加上对聚合结果打标签的方法,就可以记录一次统计值的模型ID了。 查询语句如下:

{
  "aggs": {
    "error": {
      "filters": {
        "filters": {
          "error": {
            "query_string": {
              "query": "state:-1",
              "analyze_wildcard": true,
              "default_field": "*"
            }
          }
        }
      },
      "meta": {
        "modelId": "modelId01"
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "bool": {
            "should": [
              {
                "match_phrase": {
                  "li": "D003" //说明这是一次模型调用
                }
              }
            ],
            "minimum_should_match": 1
          }
        },
        {
          "match_phrase": {
            "modelId..keyword": {
              "query": "modelId01"
            }
          }
        },
        {
          "range": {
            "x_st": {
              "gte": "now/h-1h-10s",
              "lte": "now/h-10s",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

查询结果为:

{
  "took" : 88,
  "timed_out" : false,
  "_shards" : {
    "total" : 1095,
    "successful" : 1095,
    "skipped" : 1056,
    "failed" : 0
  },
  "hits" : {
    "total" : 0,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "error" : {
      "meta" : {
        "modelId" : "modelId01"
      },
      "buckets" : {
        "error" : {
          "doc_count" : 0
        }
      }
    }
  }
}

至此完成了统计需求。

总结

使用聚合元数据方法,可以对聚合的结果进行打标签,可以使用在聚合结果保存后再次进行terms聚合的时候使用,或者通过标签进行各种其他查询。

继续阅读 »

背景

在我们的项目中需要对聚合后的结果进行二次的terms的聚合。实际需求就是有n个模型,需要统计每个模型在每段时间的调用次数,然后需要查询指定m个模型的调用总次数。我们要为每个模型建立一个索引,然后为每个模型查一次这段时间内的使用次数。

注:我们记录的值只能从结果中拿取。

实现过程

第一次设计

每个模型在第一次设计的时候是两个字段,【调用次数】、【错误次数】,使用以下语句:

{
  "aggs": {
    "error": {
      "filters": {
        "filters": {
          "error": {
            "query_string": {
              "query": "state:-1",
              "analyze_wildcard": true,
              "default_field": "*"
            }
          }
        }
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "bool": {
            "should": [
              {
                "match_phrase": {
                  "li": "D003" //说明这是一次模型调用
                }
              }
            ],
            "minimum_should_match": 1
          }
        },
        {
          "match_phrase": {
            "modelId..keyword": {
              "query": "modelId01"
            }
          }
        },
        {
          "range": {
            "x_st": {
              "gte": "now/h-1h-10s",
              "lte": "now/h-10s",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

结果为:

{
  "took" : 215,
  "timed_out" : false,
  "_shards" : {
    "total" : 1095,
    "successful" : 1095,
    "skipped" : 1053,
    "failed" : 0
  },
  "hits" : {
    "total" : 0,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "error" : {
      "buckets" : {
        "error" : {
          "doc_count" : 0
        }
      }
    }
  }
}

解析后保存:

{"@timestamp":"2019-09-23T12:23:23.333","count":0,"error":0}

这种情况可以统计每个模型在某段时间内的调用次数,使用索引名来区分每个model。但是在统计指定m个模型的时候就不行了,使用索引名来查询的时候由于是指定m个,前缀不能使用* 匹配,并且不能罗列所有m个索引来查询,就无法达到统计的效果。 第一次设计因为不能在结果中记录模型ID导致不能统计指定的m个模型的数量,以失败告终。

第二次设计

既然需要在统计结果中记录模型ID,那就使用terms聚合来进行操作,先使用模型ID过滤一下数据,然后使用聚合唯一的模型ID,这样就有了模型ID。查询语句如下:

{
  "aggs": {
    "modelId": {
      "terms": {
        "field": "modelId.keyword",
        "size": 5,
        "order": {
          "_count": "desc"
        }
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "match_phrase": {
            "modelId.keyword": {
              "query": "modelId01"
            }
          }
        },
        {
          "range": {
            "x_st": {
              "gte": "now/h-1h",
              "lte": "now/h",
              "format": "epoch_millis"
            }
          }
        },
        {
          "match_phrase": {
            "modelId.keyword": {
              "query": "modelId01"
            }
          }
        }
      ]
    }
  }
}

结果为:

{
  "took" : 9,
  "timed_out" : false,
  "_shards" : {
    "total" : 140,
    "successful" : 140,
    "skipped" : 135,
    "failed" : 0
  },
  "hits" : {
    "total" : 0,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "modelId" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [ ]
    }
  }
}

在有模型调用的时候这个方法还好用,但是在无模型调用的时候这个返回结果就如上面的一样,是不包含任何信息的,错误次数的0和模型ID都没有了。 第二次设计因为在无模型调用的时候导致模型ID不能记录,然后也是不能实现指定m个模型的查询次数统计,也以失败告终。

第三次设计

经过两次失败的实际案例,我发现现有的知识已经不能满足需求了,我需要新的方法,我需要一个能给查询结果添加字段的方法,所以我去查询官方文档,让我找到了这个方法聚合元数据 也就是对聚合结果进行打标签。 我使用第一次的设计方案,然后添加上对聚合结果打标签的方法,就可以记录一次统计值的模型ID了。 查询语句如下:

{
  "aggs": {
    "error": {
      "filters": {
        "filters": {
          "error": {
            "query_string": {
              "query": "state:-1",
              "analyze_wildcard": true,
              "default_field": "*"
            }
          }
        }
      },
      "meta": {
        "modelId": "modelId01"
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "bool": {
            "should": [
              {
                "match_phrase": {
                  "li": "D003" //说明这是一次模型调用
                }
              }
            ],
            "minimum_should_match": 1
          }
        },
        {
          "match_phrase": {
            "modelId..keyword": {
              "query": "modelId01"
            }
          }
        },
        {
          "range": {
            "x_st": {
              "gte": "now/h-1h-10s",
              "lte": "now/h-10s",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

查询结果为:

{
  "took" : 88,
  "timed_out" : false,
  "_shards" : {
    "total" : 1095,
    "successful" : 1095,
    "skipped" : 1056,
    "failed" : 0
  },
  "hits" : {
    "total" : 0,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "error" : {
      "meta" : {
        "modelId" : "modelId01"
      },
      "buckets" : {
        "error" : {
          "doc_count" : 0
        }
      }
    }
  }
}

至此完成了统计需求。

总结

使用聚合元数据方法,可以对聚合的结果进行打标签,可以使用在聚合结果保存后再次进行terms聚合的时候使用,或者通过标签进行各种其他查询。

收起阅读 »

Lucene doc_value的理解复盘

之前看过很多doc_Value相关的文章,也看过elasticsearch官方的描述,但是总是感觉对doc_value如何提升聚合排序性能这块说的很简单(也许是我个人理解力的问题),总是不能深刻理解没有doc_Value的话会牺牲多大性能,再加上每次研究一半就搁置了,所以一直没能很好理解这块知识点,今天来重点复盘下。  本文不是科普文,不一定是完全正确的,大家可以讨论。
 
首先从elasticsearch入手,我们建立一个索引结构如下:
{
"info": {
"mappings": {
"info": {
"_all": {
"enabled": false
},
"properties": {
"age": {
"type": "int"
},
"time": {
"type": "long"
}
}
}
}
}
}
我这里特地拿非字符串类型的值做举例,大家在用MySQL的时候大多是用值精准匹配等条件,没有涉及到分词和多值,所以一开始很多用分词举例的时候我就糊涂了,这里我用数字举例说明下。
 
假设我们的需求是查询time=10的各个age包含的info有多少。这是一个搜索兼聚合的需求,首先要搜出来time:[1 , 10]的结果,然后在这些doc结果里统计,他们的age各自分别占了多少个doc。语句如下:
{
"query": {
"term": {
"time": 10
}
},
"aggs": {
"age_terms": {
"terms": {
"field": "age"
}
}
}
}

假设我们的数据是这样的:
docId        time           age
1               10               20
2              8                 21
3              9                 24
4              2                 23
5              10                22
6              10                22
 
 
结果也很明显了,返回doc为1,5,6的内容,聚合结果, age=20的doc_count=1,  age=22的doc_coutn=2。
 
demo介绍完了,说下假设没有doc_value,只有倒排索引我们要怎么实现这个效果。
 
首先搜索不用说了,走time字段的倒排索引,下表:
2:4
8:2
9:3
10:1,5,6
一目了然,一下就能把post_list结果拿到。
 
那么聚合是在1,5,6这个结果之上做的,因为聚合字段和排序字段不一样,time倒排索引中不包含age的信息,所以要想对age做聚合,就需要做类似回表的操作想办法取回每个doc对应的age值,然后在内存中遍历下算出来每个age对应的doc有几个(当然这个例子最多只有3个doc_count, 返回值只有3个doc)。
 
取回的方式,可以走age的倒排索引,遍历age索引的term_dictionary(不了解的同学可以理解为这就是一个排序的term列表),遍历要做的事情是什么呢:取出每个term后面的postling_list,看下list中有没有对应我搜索结果的doc_id,有的话记录下有几个,给这个term做下标记记录term的doc_count。 这个过程就比较狗了,假设term非常多,遍历一次的代价就非常高,所以在没有doc_value的情况下,遍历倒排索引就显得很差劲,倒排索引的价值本来就是避免数据遍历提升搜索性能,到了这种聚合需求上反而要用我做这么高代价的事儿,所以才需要一个出路避免这个问题。
 
这时候有人会说了,一个info对应一个age,那么当我遍历的时候,记录下这个doc有没有遍历过,当所有doc_id都遍历过一遍后就不遍历了,能提升一些性能吧。  首先如果运气不好,碰上你的doc要聚合的term出现在term_dictionary最后一个的时候,时间开销还是最坏情况的,其次,我这里用的是单个数值,如果一个field是多值的呢,换句话说,如果是分词的字符串呢,多值和分词其实一样,这种情况下当你遍历到一个term包含结果的某个doc_id时你不能说这个doc不会再后续的其他term中出现了,你还是要继续遍历完,所以为了满足这个需求,遍历全部term_dictionary是没跑的。
 
排序其实也差不多的,如果你要根据非搜索结果进行排序,一样要取出结果doc,对应排序字段的值,然后内存排序,所以内存排序聚合我觉得不是开销大的点,开销大的地方在于遍历倒排索引
 
其实讲到这里我的疑惑基本就解决了。接下来再说下doc_value
 
doc_value通俗的讲就是正排索引(这个词儿基本上所有文章都会这么说,说的也对,就是看多了有点想吐。。),其实正排索引就是我上面列举出来的那个值列表,只不过是一个字段一个doc_value,跟倒排索引一样,不是放在一起的(这点区别mysql聚集索引的叶子节点,聚集索引叶子节点是保存了全部的值,用主键绑定的,这里也说明了Lucene是索引和数据分离的,而MySQL是索引即数据,我是这么理解)。
time的doc_value

docId        time           
1               10              
2              8                
3              9                 
4              2                 
5              10               
6              10                
 
age的doc_value

docId        age
1               20
2               21
3               24
4               23
5               22
6               22
 
现在我们有了这样的一个结构可就厉害了,回到刚才的场景,搜索完结果是1,5,6,如果想按age进行聚合或者排序,那么只需要我们来到doc_value中,根据我们的id列表和doc_value的列表做交集拿出来结果对应的age就可以了,然后内存排序一下,实际上doc_value完全可以按照value先排好序,然后我们交集的结果就是排序结果了,都不用内存再排一次(暂时还不知道doc_value是不是根据value排过序)。
 
 
最后再说下我觉得其实可以优化的地方,之前我在问答也提问过,就是当搜索和排序/聚合使用相同字段的时候,还要不要走doc_value的问题。
 
因为如果是同一个字段那么完全没必要回表了,倒排索引本来就是field有序的所以排序问题解决。
 
其次聚合,刚才的例子中是单值的,如果是多值的话也不要紧,因为我们都知道查询条件的值了,单值的话 doc_count就等于total,多值的话 doc_count就等于其posting_list的length,多简单,省的走一次doc_value的操作。
 
不过那个问答回答的人太少,回答我的人的答复也蛮精彩的但是我仍然觉得这个操作是可以分开的。
 
求各路大神们对本文的观点做出指点和评论,或者我理解不对的地方提出指正。 对于想了解doc_value的同学希望这篇文章能解决你们的疑惑。
 
 
 
 
 
 
 
 
 
 
 
 
继续阅读 »
之前看过很多doc_Value相关的文章,也看过elasticsearch官方的描述,但是总是感觉对doc_value如何提升聚合排序性能这块说的很简单(也许是我个人理解力的问题),总是不能深刻理解没有doc_Value的话会牺牲多大性能,再加上每次研究一半就搁置了,所以一直没能很好理解这块知识点,今天来重点复盘下。  本文不是科普文,不一定是完全正确的,大家可以讨论。
 
首先从elasticsearch入手,我们建立一个索引结构如下:
{
"info": {
"mappings": {
"info": {
"_all": {
"enabled": false
},
"properties": {
"age": {
"type": "int"
},
"time": {
"type": "long"
}
}
}
}
}
}
我这里特地拿非字符串类型的值做举例,大家在用MySQL的时候大多是用值精准匹配等条件,没有涉及到分词和多值,所以一开始很多用分词举例的时候我就糊涂了,这里我用数字举例说明下。
 
假设我们的需求是查询time=10的各个age包含的info有多少。这是一个搜索兼聚合的需求,首先要搜出来time:[1 , 10]的结果,然后在这些doc结果里统计,他们的age各自分别占了多少个doc。语句如下:
{
"query": {
"term": {
"time": 10
}
},
"aggs": {
"age_terms": {
"terms": {
"field": "age"
}
}
}
}

假设我们的数据是这样的:
docId        time           age
1               10               20
2              8                 21
3              9                 24
4              2                 23
5              10                22
6              10                22
 
 
结果也很明显了,返回doc为1,5,6的内容,聚合结果, age=20的doc_count=1,  age=22的doc_coutn=2。
 
demo介绍完了,说下假设没有doc_value,只有倒排索引我们要怎么实现这个效果。
 
首先搜索不用说了,走time字段的倒排索引,下表:
2:4
8:2
9:3
10:1,5,6
一目了然,一下就能把post_list结果拿到。
 
那么聚合是在1,5,6这个结果之上做的,因为聚合字段和排序字段不一样,time倒排索引中不包含age的信息,所以要想对age做聚合,就需要做类似回表的操作想办法取回每个doc对应的age值,然后在内存中遍历下算出来每个age对应的doc有几个(当然这个例子最多只有3个doc_count, 返回值只有3个doc)。
 
取回的方式,可以走age的倒排索引,遍历age索引的term_dictionary(不了解的同学可以理解为这就是一个排序的term列表),遍历要做的事情是什么呢:取出每个term后面的postling_list,看下list中有没有对应我搜索结果的doc_id,有的话记录下有几个,给这个term做下标记记录term的doc_count。 这个过程就比较狗了,假设term非常多,遍历一次的代价就非常高,所以在没有doc_value的情况下,遍历倒排索引就显得很差劲,倒排索引的价值本来就是避免数据遍历提升搜索性能,到了这种聚合需求上反而要用我做这么高代价的事儿,所以才需要一个出路避免这个问题。
 
这时候有人会说了,一个info对应一个age,那么当我遍历的时候,记录下这个doc有没有遍历过,当所有doc_id都遍历过一遍后就不遍历了,能提升一些性能吧。  首先如果运气不好,碰上你的doc要聚合的term出现在term_dictionary最后一个的时候,时间开销还是最坏情况的,其次,我这里用的是单个数值,如果一个field是多值的呢,换句话说,如果是分词的字符串呢,多值和分词其实一样,这种情况下当你遍历到一个term包含结果的某个doc_id时你不能说这个doc不会再后续的其他term中出现了,你还是要继续遍历完,所以为了满足这个需求,遍历全部term_dictionary是没跑的。
 
排序其实也差不多的,如果你要根据非搜索结果进行排序,一样要取出结果doc,对应排序字段的值,然后内存排序,所以内存排序聚合我觉得不是开销大的点,开销大的地方在于遍历倒排索引
 
其实讲到这里我的疑惑基本就解决了。接下来再说下doc_value
 
doc_value通俗的讲就是正排索引(这个词儿基本上所有文章都会这么说,说的也对,就是看多了有点想吐。。),其实正排索引就是我上面列举出来的那个值列表,只不过是一个字段一个doc_value,跟倒排索引一样,不是放在一起的(这点区别mysql聚集索引的叶子节点,聚集索引叶子节点是保存了全部的值,用主键绑定的,这里也说明了Lucene是索引和数据分离的,而MySQL是索引即数据,我是这么理解)。
time的doc_value

docId        time           
1               10              
2              8                
3              9                 
4              2                 
5              10               
6              10                
 
age的doc_value

docId        age
1               20
2               21
3               24
4               23
5               22
6               22
 
现在我们有了这样的一个结构可就厉害了,回到刚才的场景,搜索完结果是1,5,6,如果想按age进行聚合或者排序,那么只需要我们来到doc_value中,根据我们的id列表和doc_value的列表做交集拿出来结果对应的age就可以了,然后内存排序一下,实际上doc_value完全可以按照value先排好序,然后我们交集的结果就是排序结果了,都不用内存再排一次(暂时还不知道doc_value是不是根据value排过序)。
 
 
最后再说下我觉得其实可以优化的地方,之前我在问答也提问过,就是当搜索和排序/聚合使用相同字段的时候,还要不要走doc_value的问题。
 
因为如果是同一个字段那么完全没必要回表了,倒排索引本来就是field有序的所以排序问题解决。
 
其次聚合,刚才的例子中是单值的,如果是多值的话也不要紧,因为我们都知道查询条件的值了,单值的话 doc_count就等于total,多值的话 doc_count就等于其posting_list的length,多简单,省的走一次doc_value的操作。
 
不过那个问答回答的人太少,回答我的人的答复也蛮精彩的但是我仍然觉得这个操作是可以分开的。
 
求各路大神们对本文的观点做出指点和评论,或者我理解不对的地方提出指正。 对于想了解doc_value的同学希望这篇文章能解决你们的疑惑。
 
 
 
 
 
 
 
 
 
 
 
  收起阅读 »

Elastic日报 第735期 (2019-09-21)

1.es字符串包含查询的几种方案

http://t.cn/AinyGFZD

2.es6.2版本利用脚本查询为空数据的方法

http://t.cn/AinyqBJN

3.es中 “先filter后aggs” 和 “在aggs执行filter” 的区别

http://t.cn/AinyfBkq

继续阅读 »

1.es字符串包含查询的几种方案

http://t.cn/AinyGFZD

2.es6.2版本利用脚本查询为空数据的方法

http://t.cn/AinyqBJN

3.es中 “先filter后aggs” 和 “在aggs执行filter” 的区别

http://t.cn/AinyfBkq

收起阅读 »

关于sum bucket 与Filter Aggregation 结合的写法分享

我们可以用ES的桶来实现这样的需求,找出汽车厂商每个颜色的车辆数量
当如果需求是,找出汽车厂商里的车一共有多少种颜色呢?
 我们可以使用sum bucket ,默认情况下,可以参考官方文档
https://www.elastic.co/guide/e ... ml%23
 
但如果我们想找到售价大于100W的车,有多少种颜色呢?
这就需要Filter Aggregation   和 上文的sum bucket结合
 
这里有个需要注意的地方,sum_bucket的位置不要跟 Filter Aggregation 同级,而是应该在其下一级:
    "query": {
"query": {
"bool": {
"filter": {
"bool": {
"must": [
{
"term": {
"delflag": 0
}
},
{
"term": {
"is_child": 1
}
}
],
"must_not":
}
}
}
},
"aggs": {
"valid_sales_people": {
"filter": {
"bool": {
"must_not": {
"terms": {
"status": [
0,
1,
7
]
}
}
}
},
"aggs": {
"render_people": {
"terms": {
"field": "uid",
"size": 2147483647
},
"aggs": {
"unique_people": {
"cardinality": {
"field": "uid"
}
}
}
},
"member_count": {
"sum_bucket": {
"buckets_path": "render_people>unique_people"
}
}
}
}
}
},
如代码所示,我希望对status not in (0,1,7)的uid(用户)进行聚合,并想得到uid桶种类的求和,那么member_count就应该与render_people同级,而不是把member_count放到和与valid_sales_people同一级。
 
如果将member_count 与valid_sales_people放到同一级,会报一个错:sum_bucket的第一个聚合必须是多桶聚合。究其原因,应该是加上filter aggregation后,valid_sales_people已不具有多桶聚合属性(因为附带了filter过滤条件),而其下的render_people则具有了多桶聚合属性。所以member_count应该与render_people放在同级,对应的buckets_path也自然改为render_people>unique_people
 

 
继续阅读 »
我们可以用ES的桶来实现这样的需求,找出汽车厂商每个颜色的车辆数量
当如果需求是,找出汽车厂商里的车一共有多少种颜色呢?
 我们可以使用sum bucket ,默认情况下,可以参考官方文档
https://www.elastic.co/guide/e ... ml%23
 
但如果我们想找到售价大于100W的车,有多少种颜色呢?
这就需要Filter Aggregation   和 上文的sum bucket结合
 
这里有个需要注意的地方,sum_bucket的位置不要跟 Filter Aggregation 同级,而是应该在其下一级:
    "query": {
"query": {
"bool": {
"filter": {
"bool": {
"must": [
{
"term": {
"delflag": 0
}
},
{
"term": {
"is_child": 1
}
}
],
"must_not":
}
}
}
},
"aggs": {
"valid_sales_people": {
"filter": {
"bool": {
"must_not": {
"terms": {
"status": [
0,
1,
7
]
}
}
}
},
"aggs": {
"render_people": {
"terms": {
"field": "uid",
"size": 2147483647
},
"aggs": {
"unique_people": {
"cardinality": {
"field": "uid"
}
}
}
},
"member_count": {
"sum_bucket": {
"buckets_path": "render_people>unique_people"
}
}
}
}
}
},
如代码所示,我希望对status not in (0,1,7)的uid(用户)进行聚合,并想得到uid桶种类的求和,那么member_count就应该与render_people同级,而不是把member_count放到和与valid_sales_people同一级。
 
如果将member_count 与valid_sales_people放到同一级,会报一个错:sum_bucket的第一个聚合必须是多桶聚合。究其原因,应该是加上filter aggregation后,valid_sales_people已不具有多桶聚合属性(因为附带了filter过滤条件),而其下的render_people则具有了多桶聚合属性。所以member_count应该与render_people放在同级,对应的buckets_path也自然改为render_people>unique_people
 

  收起阅读 »

记一次“访问量超过1000的人数”统计,计算聚合桶的个数

前言

众所周知,在ES中有各种聚合方法能够是数据分析简单、高效。但是在繁杂的聚合方法中找到满足我们需求的那个,需要我们自己去实践。下面我就说明一下“访问量超过1000的人数”统计案例的实现。

需求

ES在使用过程中,我们公司有一个需求,就是需要统计活跃用户数,我们定义活跃用户数为:今日访问量超过1000的用户,所以我们统计活跃用户数的时候需要统计“访问量超过1000的人数”。

之前的做法

第一版统计活跃用户数的方法由于对复杂的聚合统计不熟悉的原因,就把统计分为了两步。 第一步:在ES中使用字段聚合每个用户的访问数量,数量大于1000;

查询语句

{
  "aggs": {
    "user": {
      "terms": {
        "field": "userId.keyword",
        "size": 10000,
        "order": {
          "_count": "desc"
        },
        "min_doc_count": "1000"
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "startTime": {
              "gte": "now-4h",
              "lte": "now",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

查询结果

{
  "took" : 203,
  "timed_out" : false,
  "_shards" : {
    "total" : 1565,
    "successful" : 1565,
    "skipped" : 1520,
    "failed" : 0
  },
  "hits" : {
    "total" : 67470,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "user" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "admin",
          "doc_count" : 46998
        },
        {
          "key" : "nameless",
          "doc_count" : 8416
        },
        {
          "key" : "li",
          "doc_count" : 2486
        },
        {
          "key" : "liu",
          "doc_count" : 2183
        },
        {
          "key" : "111111",
          "doc_count" : 1281
        }
      ]
    }
  }
}

第二步:从ES中获取第一步的统计结果,然后统计用户桶的个数,达到统计出个数的效果。

改进后的做法

改进后就是直接使用ES的查询,使用了sum_bucket聚合,是计算每个用户的用户ID独立数,也就是每个用户的用户ID独立数都是1,然后用桶聚合求和,得到所有的人数。 参考链接:[sum bucket聚合](https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-aggregations-pipeline-sum-bucket-aggregation.html)

查询语句

{
  "aggs": {
    "usercount": {
      "sum_bucket": {
        "buckets_path": "usercount-bucket>usercount-metric"
      }
    },
    "usercount-bucket": {
      "terms": {
        "field": "userId.keyword",
        "size": 10,
        "order": {
          "_key": "desc"
        },
        "min_doc_count": "1000"
      },
      "aggs": {
        "usercount-metric": {
          "cardinality": {
            "field": "userId.keyword"
          }
        }
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "x_st": {
              "gte": "now-4h",
              "lte": "now",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

查询结果

{
  "took" : 106,
  "timed_out" : false,
  "_shards" : {
    "total" : 1565,
    "successful" : 1565,
    "skipped" : 1520,
    "failed" : 0
  },
  "hits" : {
    "total" : 63956,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "usercount-bucket" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "nameless",
          "doc_count" : 8278,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "liu",
          "doc_count" : 2142,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "li",
          "doc_count" : 1928,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "admin",
          "doc_count" : 44395,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "111111",
          "doc_count" : 1281,
          "usercount-metric" : {
            "value" : 1
          }
        }
      ]
    },
    "usercount" : {
      "value" : 5.0
    }
  }
}
继续阅读 »

前言

众所周知,在ES中有各种聚合方法能够是数据分析简单、高效。但是在繁杂的聚合方法中找到满足我们需求的那个,需要我们自己去实践。下面我就说明一下“访问量超过1000的人数”统计案例的实现。

需求

ES在使用过程中,我们公司有一个需求,就是需要统计活跃用户数,我们定义活跃用户数为:今日访问量超过1000的用户,所以我们统计活跃用户数的时候需要统计“访问量超过1000的人数”。

之前的做法

第一版统计活跃用户数的方法由于对复杂的聚合统计不熟悉的原因,就把统计分为了两步。 第一步:在ES中使用字段聚合每个用户的访问数量,数量大于1000;

查询语句

{
  "aggs": {
    "user": {
      "terms": {
        "field": "userId.keyword",
        "size": 10000,
        "order": {
          "_count": "desc"
        },
        "min_doc_count": "1000"
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "startTime": {
              "gte": "now-4h",
              "lte": "now",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

查询结果

{
  "took" : 203,
  "timed_out" : false,
  "_shards" : {
    "total" : 1565,
    "successful" : 1565,
    "skipped" : 1520,
    "failed" : 0
  },
  "hits" : {
    "total" : 67470,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "user" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "admin",
          "doc_count" : 46998
        },
        {
          "key" : "nameless",
          "doc_count" : 8416
        },
        {
          "key" : "li",
          "doc_count" : 2486
        },
        {
          "key" : "liu",
          "doc_count" : 2183
        },
        {
          "key" : "111111",
          "doc_count" : 1281
        }
      ]
    }
  }
}

第二步:从ES中获取第一步的统计结果,然后统计用户桶的个数,达到统计出个数的效果。

改进后的做法

改进后就是直接使用ES的查询,使用了sum_bucket聚合,是计算每个用户的用户ID独立数,也就是每个用户的用户ID独立数都是1,然后用桶聚合求和,得到所有的人数。 参考链接:[sum bucket聚合](https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-aggregations-pipeline-sum-bucket-aggregation.html)

查询语句

{
  "aggs": {
    "usercount": {
      "sum_bucket": {
        "buckets_path": "usercount-bucket>usercount-metric"
      }
    },
    "usercount-bucket": {
      "terms": {
        "field": "userId.keyword",
        "size": 10,
        "order": {
          "_key": "desc"
        },
        "min_doc_count": "1000"
      },
      "aggs": {
        "usercount-metric": {
          "cardinality": {
            "field": "userId.keyword"
          }
        }
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "x_st": {
              "gte": "now-4h",
              "lte": "now",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

查询结果

{
  "took" : 106,
  "timed_out" : false,
  "_shards" : {
    "total" : 1565,
    "successful" : 1565,
    "skipped" : 1520,
    "failed" : 0
  },
  "hits" : {
    "total" : 63956,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "usercount-bucket" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "nameless",
          "doc_count" : 8278,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "liu",
          "doc_count" : 2142,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "li",
          "doc_count" : 1928,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "admin",
          "doc_count" : 44395,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "111111",
          "doc_count" : 1281,
          "usercount-metric" : {
            "value" : 1
          }
        }
      ]
    },
    "usercount" : {
      "value" : 5.0
    }
  }
}
收起阅读 »

ES6.8权限使用配置


概述
ES的权限控制一直ES使用中的一个问题,因为官方之前一直未免费安全性功能。公司要不选择使用其他插件来解决,要不就是只在内网使用。现在在ES6.8及以后版本ES将部分安全性功能免费开放了, 现在我们就6.8版本的【基于角色的访问控制】进行操作、验证。
1、下载安装ELK6.8版本(此处省略)
ES在6.8以后发布的版本才有(7.0是发布在6.8之前的)
2、修改ES配置文件 elasticsearch.yml
在配置文件中添加:
xpack.security.enabled: true
基础版本的安全性功能是默认关闭的。
然后启动ES
./elasticsearch -d
3、设置内置用户密码
参考:内置用户
./bin/elasticsearch-setup-passwords interactive
这里设置的密码要记住,后面会使用到。我们设置简单的密码:123456(密码不能少于6位)
如果是Windows请使用CMD命令行执行
按照提示设置内置用户密码
4、设置kibana用户名密码
在kibana的配置文件kibana.yml里面添加
elasticsearch.username: "kibana"
elasticsearch.password: "123456"
5、然后启动kibana
启动kibana就可以使用用户名与密码进行访问。

Image.png


6、设置logstash用户名和密码
打开配置文件conf,在output中的elasticsearch中添加user、password
例:
output {
elasticsearch {
hosts => ["10.68.24.136:9200","10.68.24.137:9200"]
index => "%{[indexName]}-%{+YYYY.MM.dd}"
user => "logstash_system"
password => "123456"
}
继续阅读 »

概述
ES的权限控制一直ES使用中的一个问题,因为官方之前一直未免费安全性功能。公司要不选择使用其他插件来解决,要不就是只在内网使用。现在在ES6.8及以后版本ES将部分安全性功能免费开放了, 现在我们就6.8版本的【基于角色的访问控制】进行操作、验证。
1、下载安装ELK6.8版本(此处省略)
ES在6.8以后发布的版本才有(7.0是发布在6.8之前的)
2、修改ES配置文件 elasticsearch.yml
在配置文件中添加:
xpack.security.enabled: true
基础版本的安全性功能是默认关闭的。
然后启动ES
./elasticsearch -d
3、设置内置用户密码
参考:内置用户
./bin/elasticsearch-setup-passwords interactive
这里设置的密码要记住,后面会使用到。我们设置简单的密码:123456(密码不能少于6位)
如果是Windows请使用CMD命令行执行
按照提示设置内置用户密码
4、设置kibana用户名密码
在kibana的配置文件kibana.yml里面添加
elasticsearch.username: "kibana"
elasticsearch.password: "123456"
5、然后启动kibana
启动kibana就可以使用用户名与密码进行访问。

Image.png


6、设置logstash用户名和密码
打开配置文件conf,在output中的elasticsearch中添加user、password
例:
output {
elasticsearch {
hosts => ["10.68.24.136:9200","10.68.24.137:9200"]
index => "%{[indexName]}-%{+YYYY.MM.dd}"
user => "logstash_system"
password => "123456"
}
收起阅读 »

从MySQL建立Elasticsearch索引-索引

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-索引

接上文从MySQL建立Elasticsearch索引-引言,本文介绍一下我们实现索引创建时的一些思路。

目标

框架以Jar包的形式提供,通过配置(XML)提供规则,支持插件开发,支持全量、增量等。

因为希望尽量减少开发量,所以不要求使用方提供平表,因此需要考虑如何将平表

概念

  • 全量:即使用全部数据创建一个新的索引
  • 增量:按照时间范围或者给定的规则,把变化的数据同步到ES
  • 插件:本框架内定义了一个插件接口,用于特定需求自行开发,并接入到现有的配置文件中
  • 分批参数:SQL中由框架填充的参数,类似${id}等

配置

以用户(users)为例,配置有三种文件:

  1. users.mapping,保存了索引的配置,每次全量的时候会读取本文件进行索引创建
  2. rule,规则文件,里面主要定义了对应SQL和插件的配置
    1. users-all.rule,全量规则,规则中以主表为主体,支持以Top、Limit的形式对数据进行分批获取,分批参数在SQL里可以定义为${id},框架会自动填充该值,以保证能够拉取到全量的数据
    2. users-inc.rule,按照时间范围进行增量,分批参数支持在SQL中使用${startTime}和${endTime}
    3. users-spec.rule,按照指定的主表的主键,获取数据及更新索引的操作
    4. users-partial.rule,框架接入了阿里的Canal,本规则定义了各种表变化时,如果取到主表Id,以使用spec进行数据更新
  3. users.plugin,以上的rule主要提供了主表数据的获取方法,plugin文件则提供了各种关联表的信息获取方式,可以使用SQL,或者自定义的插件

全量索引实现关键点

为了保证索引的性能、监控、正确性等,实现时进行了以下设计。

(一)索引的维护

ES使用分布式、Replica、Snapshot机制保证索引的有效及集群稳定性。我们综合考虑后决定放弃Snapshot机制,通过定时/不定时创建全新全量索引,索引名字以${indexName}-{yyyy-MM-dd}的格式定义。正进行全量的索引关联上${indexName}_F的别名,正在使用的索引关联上${indexName}的别名,这样代码里可以不用关心应该读取或者使用哪个索引,合适的场景使用合适的别名即可。

(二)索引的性能

此处专指创建索引的性能,ES的性能是老话题了。

首先,为了保证全量的性能,创建索引时会调整mapping参数,类似refresh_interval改成-1,number_of_replicas改成0,添加默认slowlog设置。

索引过程中,控制bulk请求体的总大小,保证合适的分批大小的数据一并提交到ES。如果失败简单重试三次,如果三次都失败则认为失败,退出并删除当前索引,以保证线上ES数据干净准确。

(三)索引的变更

索引完成后,检查当前索引文档数和正使用索引文档数差异,如果大于配置的阈值,则认为此次索引创建有问题,删除索引并退出;做强制合并,减少segments数,提高搜索性能;恢复refresh_interval、number_of_replicas等设置;等待新索引状态变成GREEN后,将${indexName}切换到新的索引上,以使新索引生效。

最后,检查以${indexName}-{yyyy-MM-dd}格式命名的索引有几份,将多于指定个数的较老的索引删除,将多于指定个数的索引状态改为Close。这样可以尽可能提供磁盘内存利用率,减少不必要的损耗,又能在一定程度上保证数据可用。

以上,即完成了全量索引的创建。

增量索引实现关键点

增量索引因为场景比较多,所以规则也分了几种:

  • inc,使用${startTime}和${endTime}在SQL中代替时间范围,框架会自动保存上次执行时间,以使整个增量整体不断滚动更新。实际使用时,因为有多个关联表导致SQL写起来比较复杂,以及部分增量数据过大,对DB有一定压力,因此不推荐使用此种方式。数据相对简单的场景可以使用。
  • spec,在知道主表数据变化范围的时候使用,也是目前我们推荐的一种方式。使用主键查找数据,所有的表都是主键查询,影响范围也很明确。
  • partial,针对canal做了单独封装的规则,用户获取canal的变化类型(更新、删除),变化的主表Id,用于使用spec的方式进行数据更新。

暂时没有使用部分更新的原因是,关联表与主表的映射关系比较复杂,有1:1,1:N,M:N;并且目前按照主键更新数据的方式,对DB等压力不大;加上spec的方式逻辑清晰,有利于维护和开发。

扩展性

SQL不是万能的,比方说我们部分数据需要从其他微服务取,或者SQL逻辑复杂建议代码实现,这时候就可以使用我们的插件机制了。默认框架提供了一个很强大的插件,支持Distinct、Merge、Mapping等功能,可以满足80%的关联数据的场景了。其他的场景以及需要微服务的场景,支持用户自己实现指定插件,通过配置文件,即可接入现有的框架体系中。以此能支持目前我们全部需求。

下一篇,我们将分享我们搜索模块的实现思路。

继续阅读 »

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-索引

接上文从MySQL建立Elasticsearch索引-引言,本文介绍一下我们实现索引创建时的一些思路。

目标

框架以Jar包的形式提供,通过配置(XML)提供规则,支持插件开发,支持全量、增量等。

因为希望尽量减少开发量,所以不要求使用方提供平表,因此需要考虑如何将平表

概念

  • 全量:即使用全部数据创建一个新的索引
  • 增量:按照时间范围或者给定的规则,把变化的数据同步到ES
  • 插件:本框架内定义了一个插件接口,用于特定需求自行开发,并接入到现有的配置文件中
  • 分批参数:SQL中由框架填充的参数,类似${id}等

配置

以用户(users)为例,配置有三种文件:

  1. users.mapping,保存了索引的配置,每次全量的时候会读取本文件进行索引创建
  2. rule,规则文件,里面主要定义了对应SQL和插件的配置
    1. users-all.rule,全量规则,规则中以主表为主体,支持以Top、Limit的形式对数据进行分批获取,分批参数在SQL里可以定义为${id},框架会自动填充该值,以保证能够拉取到全量的数据
    2. users-inc.rule,按照时间范围进行增量,分批参数支持在SQL中使用${startTime}和${endTime}
    3. users-spec.rule,按照指定的主表的主键,获取数据及更新索引的操作
    4. users-partial.rule,框架接入了阿里的Canal,本规则定义了各种表变化时,如果取到主表Id,以使用spec进行数据更新
  3. users.plugin,以上的rule主要提供了主表数据的获取方法,plugin文件则提供了各种关联表的信息获取方式,可以使用SQL,或者自定义的插件

全量索引实现关键点

为了保证索引的性能、监控、正确性等,实现时进行了以下设计。

(一)索引的维护

ES使用分布式、Replica、Snapshot机制保证索引的有效及集群稳定性。我们综合考虑后决定放弃Snapshot机制,通过定时/不定时创建全新全量索引,索引名字以${indexName}-{yyyy-MM-dd}的格式定义。正进行全量的索引关联上${indexName}_F的别名,正在使用的索引关联上${indexName}的别名,这样代码里可以不用关心应该读取或者使用哪个索引,合适的场景使用合适的别名即可。

(二)索引的性能

此处专指创建索引的性能,ES的性能是老话题了。

首先,为了保证全量的性能,创建索引时会调整mapping参数,类似refresh_interval改成-1,number_of_replicas改成0,添加默认slowlog设置。

索引过程中,控制bulk请求体的总大小,保证合适的分批大小的数据一并提交到ES。如果失败简单重试三次,如果三次都失败则认为失败,退出并删除当前索引,以保证线上ES数据干净准确。

(三)索引的变更

索引完成后,检查当前索引文档数和正使用索引文档数差异,如果大于配置的阈值,则认为此次索引创建有问题,删除索引并退出;做强制合并,减少segments数,提高搜索性能;恢复refresh_interval、number_of_replicas等设置;等待新索引状态变成GREEN后,将${indexName}切换到新的索引上,以使新索引生效。

最后,检查以${indexName}-{yyyy-MM-dd}格式命名的索引有几份,将多于指定个数的较老的索引删除,将多于指定个数的索引状态改为Close。这样可以尽可能提供磁盘内存利用率,减少不必要的损耗,又能在一定程度上保证数据可用。

以上,即完成了全量索引的创建。

增量索引实现关键点

增量索引因为场景比较多,所以规则也分了几种:

  • inc,使用${startTime}和${endTime}在SQL中代替时间范围,框架会自动保存上次执行时间,以使整个增量整体不断滚动更新。实际使用时,因为有多个关联表导致SQL写起来比较复杂,以及部分增量数据过大,对DB有一定压力,因此不推荐使用此种方式。数据相对简单的场景可以使用。
  • spec,在知道主表数据变化范围的时候使用,也是目前我们推荐的一种方式。使用主键查找数据,所有的表都是主键查询,影响范围也很明确。
  • partial,针对canal做了单独封装的规则,用户获取canal的变化类型(更新、删除),变化的主表Id,用于使用spec的方式进行数据更新。

暂时没有使用部分更新的原因是,关联表与主表的映射关系比较复杂,有1:1,1:N,M:N;并且目前按照主键更新数据的方式,对DB等压力不大;加上spec的方式逻辑清晰,有利于维护和开发。

扩展性

SQL不是万能的,比方说我们部分数据需要从其他微服务取,或者SQL逻辑复杂建议代码实现,这时候就可以使用我们的插件机制了。默认框架提供了一个很强大的插件,支持Distinct、Merge、Mapping等功能,可以满足80%的关联数据的场景了。其他的场景以及需要微服务的场景,支持用户自己实现指定插件,通过配置文件,即可接入现有的框架体系中。以此能支持目前我们全部需求。

下一篇,我们将分享我们搜索模块的实现思路。

收起阅读 »

从MySQL建立Elasticsearch索引-引言

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-引言

日常工作里,因业务需要大量使用了Elasticsearch。为了简化索引的开发工作,我们需要一个易用可扩展的MySQL到ES的同步框架,在比较了可以找到的各种开源框架&工具后,我们还是选择自行研发了一个,名字简单粗暴:es-common。

背景

16年我接手了并负责了部门所有业务的搜索系统,旧搜索系统是基于Lucene自研实现的一个搜索框架,包含了平表创建、全量索引、增量索引、搜索引擎四个部分基础功能的封装;另有一个管理系统,用于配置索引、字典等信息。框架的实现思路是通过建立平表,简化索引创建的过程。

接手该系统以后,各业务出现井喷式发展,各种需求铺天盖地,同时该系统的不稳定性也开始作妖,最早三个人的小团队每天都疲于奔命。解决现有问题的同时,也在查找更好的解决方案。于是Elasticsearch进入视野。 基于JSON的交互、分布式、数据与逻辑隔离、开箱即用、稳定等特性,使我们确定了向ES转型的计划。每个点都是现有系统没有解决的问题,具体的点就不吐槽了。

选择

我们的需求有:

  1. 支持全量、增量建立索引,以使数据变更较快体现
  2. 支持更新指定文档,以处理突发问题
  3. 索引有版本控制,以防止出现问题无法快速回滚
  4. 尽量减少代码开发,减少出错概率,更专注业务
  5. 简单的出错处理,失败检测等
  6. 支持插件化开发,提供额外数据
  7. 支持简单的规则,类似合并同主键数据
  8. 域名、AUTH不可见,以满足安全要求
  9. 支持非MySQL来源的数据,因为微服务的存在,不是所有数据都能从DB获取到

当时的调用结果已经找不到了,那么按照现在可以搜索到的框架和工具来重新做调研好了。简单搜索了一下MySQL到ES的同步框架或者工具,有以下几个:

这几个工具通过简单的配置,即可建立索引,但分析我们的需求,有以下需求无法满足:

  1. 需要提供DB的URL、User和Password等,我们公司由于安全的考虑,是无法拿到这些参数的;
  2. 有部分数据是通过微服务获取的,单纯的SQL是无法访问微服务的;
  3. 有的表数据量比较大,需要分片多JOB同时处理,JDBC在这点上操作比较麻烦;
  4. 增量是需要按照主键进行更新的,按照时间扫描对表压力比较大;
  5. 类似工具无法走常规发布,有一定的发布风险

基于以上原因,我们选择了自己研发了一个框架,用来满足我们的需求。下篇文章讲介绍es-common的实现思路。

继续阅读 »

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-引言

日常工作里,因业务需要大量使用了Elasticsearch。为了简化索引的开发工作,我们需要一个易用可扩展的MySQL到ES的同步框架,在比较了可以找到的各种开源框架&工具后,我们还是选择自行研发了一个,名字简单粗暴:es-common。

背景

16年我接手了并负责了部门所有业务的搜索系统,旧搜索系统是基于Lucene自研实现的一个搜索框架,包含了平表创建、全量索引、增量索引、搜索引擎四个部分基础功能的封装;另有一个管理系统,用于配置索引、字典等信息。框架的实现思路是通过建立平表,简化索引创建的过程。

接手该系统以后,各业务出现井喷式发展,各种需求铺天盖地,同时该系统的不稳定性也开始作妖,最早三个人的小团队每天都疲于奔命。解决现有问题的同时,也在查找更好的解决方案。于是Elasticsearch进入视野。 基于JSON的交互、分布式、数据与逻辑隔离、开箱即用、稳定等特性,使我们确定了向ES转型的计划。每个点都是现有系统没有解决的问题,具体的点就不吐槽了。

选择

我们的需求有:

  1. 支持全量、增量建立索引,以使数据变更较快体现
  2. 支持更新指定文档,以处理突发问题
  3. 索引有版本控制,以防止出现问题无法快速回滚
  4. 尽量减少代码开发,减少出错概率,更专注业务
  5. 简单的出错处理,失败检测等
  6. 支持插件化开发,提供额外数据
  7. 支持简单的规则,类似合并同主键数据
  8. 域名、AUTH不可见,以满足安全要求
  9. 支持非MySQL来源的数据,因为微服务的存在,不是所有数据都能从DB获取到

当时的调用结果已经找不到了,那么按照现在可以搜索到的框架和工具来重新做调研好了。简单搜索了一下MySQL到ES的同步框架或者工具,有以下几个:

这几个工具通过简单的配置,即可建立索引,但分析我们的需求,有以下需求无法满足:

  1. 需要提供DB的URL、User和Password等,我们公司由于安全的考虑,是无法拿到这些参数的;
  2. 有部分数据是通过微服务获取的,单纯的SQL是无法访问微服务的;
  3. 有的表数据量比较大,需要分片多JOB同时处理,JDBC在这点上操作比较麻烦;
  4. 增量是需要按照主键进行更新的,按照时间扫描对表压力比较大;
  5. 类似工具无法走常规发布,有一定的发布风险

基于以上原因,我们选择了自己研发了一个框架,用来满足我们的需求。下篇文章讲介绍es-common的实现思路。

收起阅读 »

Elastic日报 第661期 (2019-07-06)

1.利用logstash同步关系型数据库和es https://0x7.me/R4OFV

2.针对英语和俄语的词形分析插件 https://0x7.me/9S0J

3.一周热点:GitHub上的那些奇葩项目 https://0x7.me/8B5T

继续阅读 »

1.利用logstash同步关系型数据库和es https://0x7.me/R4OFV

2.针对英语和俄语的词形分析插件 https://0x7.me/9S0J

3.一周热点:GitHub上的那些奇葩项目 https://0x7.me/8B5T

收起阅读 »

邀请参加 Elasticsearch 中文分词器需求调查问卷

Jietu20190624-202208.gif

 亲爱的 Elasticsearch 中文用户:
 
为了能够充分了解您对 Elasticsearch 中文分词需求的基本情况,以便我们更有针对性地改进中文分词下的使用体验,并优化相关分词器,请您抽出几分钟的时间填写以下问卷。问卷题目中的选项没有对错之分,请您根据实际情况或想法进行填写。您的信息将被严格保密,请放心填写!

调研结束后,我们将随机抽出 5 名同学,每人送 Elastic 纪念 T 恤一件。谢谢您对本次调查的参与和支持~
 
问卷填写地址:http://elasticsearch.mikecrm.com/nz0FxmK
 
谢谢!
 
以下是 5 位幸运的参与者:
 
  • 韦先生 *4270
  • 汪科  *9396
  • 乔驰  *3096
  • 彭先生 *4350
  • 张丽斌 *5329

 
再次感谢大家的参与。
继续阅读 »
Jietu20190624-202208.gif

 亲爱的 Elasticsearch 中文用户:
 
为了能够充分了解您对 Elasticsearch 中文分词需求的基本情况,以便我们更有针对性地改进中文分词下的使用体验,并优化相关分词器,请您抽出几分钟的时间填写以下问卷。问卷题目中的选项没有对错之分,请您根据实际情况或想法进行填写。您的信息将被严格保密,请放心填写!

调研结束后,我们将随机抽出 5 名同学,每人送 Elastic 纪念 T 恤一件。谢谢您对本次调查的参与和支持~
 
问卷填写地址:http://elasticsearch.mikecrm.com/nz0FxmK
 
谢谢!
 
以下是 5 位幸运的参与者:
 
  • 韦先生 *4270
  • 汪科  *9396
  • 乔驰  *3096
  • 彭先生 *4350
  • 张丽斌 *5329

 
再次感谢大家的参与。 收起阅读 »

Elastic日报 第632期 (2019-06-04)

1、23条好用的Elasticsearch查询策略。
http://t.cn/R9Sjxf5
2、如何使用Nginx为Elasticsearch设置代理。
http://t.cn/Ai9Oc9Pd
3、Telegraf Elasticsearch插件使用教程。
http://t.cn/Ai9OcpDY

编辑:叮咚光军

归档:https://ela.st/cn-daily-all
订阅:https://ela.st/cn-daily-sub
沙龙:https://ela.st/cn-meetup
继续阅读 »
1、23条好用的Elasticsearch查询策略。
http://t.cn/R9Sjxf5
2、如何使用Nginx为Elasticsearch设置代理。
http://t.cn/Ai9Oc9Pd
3、Telegraf Elasticsearch插件使用教程。
http://t.cn/Ai9OcpDY

编辑:叮咚光军

归档:https://ela.st/cn-daily-all
订阅:https://ela.st/cn-daily-sub
沙龙:https://ela.st/cn-meetup 收起阅读 »

ES6.* join类型的可行性

ES6将_parent移除缓存join了
{
"产品id":"",
"产品名字":"",
"产品SKU":[
"skuid":""
"sku库存":"",
"sku销量":"",
"SKU经销权":[
{"经销商编号":"1","购买量":"","发货工厂":"","销售组织"},
{"经销商编号":"2","购买量":"","发货工厂":"","销售组织"}
]
],
"产品总销量":"",
"产品总库存":"",
]
}
想通过ES6.* 建立这种一对多 多对多的数据模型  有用过到生产环境的吗  现在测试已经满足子查父  同时根据子的排序  孙节点还没测过
继续阅读 »
ES6将_parent移除缓存join了
{
"产品id":"",
"产品名字":"",
"产品SKU":[
"skuid":""
"sku库存":"",
"sku销量":"",
"SKU经销权":[
{"经销商编号":"1","购买量":"","发货工厂":"","销售组织"},
{"经销商编号":"2","购买量":"","发货工厂":"","销售组织"}
]
],
"产品总销量":"",
"产品总库存":"",
]
}
想通过ES6.* 建立这种一对多 多对多的数据模型  有用过到生产环境的吗  现在测试已经满足子查父  同时根据子的排序  孙节点还没测过 收起阅读 »