悟空,拿我的打狗棒来

总结最近半年对Elasticsearch开源项目的贡献

自从2019年对Elasticsearch项目提交过一次代码之后,开始逐渐关注社区里的新动态,并且尝试去解决一些issue,通过这个过程去理解源码从而可以深入理解Elasticsearch的实现机制。现在把最近半年(2020年1月-2020年6月)对Elasticsearch项目所做的工作进行一次总结,记录遇到的问题和解决办法。

ingest set processor增加ignore_empty_value参数

issue: #54783

PR: #57030

使用ingest set processor时, 如果对于value字段为空字符串或者null的情况不需要进行处理,当前只能通过脚本判断value是否为空字符串或者null。本次提交对set processor增加了ignore_empty_value参数,设置该参数为true后,set processor会自动规避value字段为空字符串或者null的情况, 不对文档进行任何修改,优雅的退出处理逻辑。

修复reindex api bug

issue: #52786

PR: #54901

调用reindex api,当max_docs参数<slices时,会报错max_docs为0,实际上是因为没有提前校验max_docs是否<slices,导致max_docs被设置为0。本次提交修复了这个bug,并且给出比较清晰的错误提示。

当使用date_nanos字段作为过滤条件并且使用now时,无法创建filtered alias

issue: #54315

PR: #54785

PUT date_source/_alias/date_nanos_alias
{
  "filter": {
   "range": {
      "date_nanos": {
        "gt": "now-7d/d"
      }
    }
  }
}

如上述操作,创建filtered alias时,以date_nanos字段为过滤条件,并且使用了now,会导致创建别名失败;该提交主要是修改了queryShardContext中的nowInMillis值,设置为当前时间戳。

禁止修改nested字段的include_in_root、include_in_parent参数

issue: #53792

PR: #54386

nested字段的include_in_root、include_in_parent参数,是无法进行修改的,但是当前调用PUT {index}/_mapping API进行修改时却没有报错,本次提交的改动是在修改两个参数时抛出400参数错误。

对所有处理字符串类型数据的ingest processor,支持字段值为数组

issue: #51087

PR: #53343

对Lowercase Processors、Uppercase Processors、Trim Processors等处理字符串类型数据的ingest processor, 都支持要处理的字段类型为数组类型。

修复_search/template API返回结果总量不准的bug

issue: #52801

PR: #53155

调用GET _search/template API时,如果设置了rest_total_hits_as_int为true,处理逻辑应该和GET _search API一致,trackTotalHitsUpTo变量会被设置为Integer.MAX_VALUE,因此都能够获取到准确的total hits count。但是在_search/template API的处理逻辑中,虽然rest_total_hits_as_int设置为了true, trackTotalHitsUpTo值却没有被设置,因此只能获取到最多为10000的total hits。

修复ingest pipeline simulate API异常处理bug

issue: #52833

PR: #52937

调用POST _ingest/pipeline/_simulate API时,如果传入的docs参数是空列表,则什么结果都不会返回。 Bug产生的原因是,在异步请求的ActionListener中没有对docs参数进行判空,导致始终没有响应给客户端。

修复删除enrich policy时的bug

issue: #5122.

PR: #52179

enrich policy关联的索引名称的格式为[policy_name]-*,在调用删除enrich policy的API:DELETE /_enrich/policy/时,需要删除所有的以[policy_name]开头的索引,因为代码直接通过通配符进行删除,如果设置了action.destructive_requires_name参数为true,则删除enrich policy会报错‘Wildcard expressions or all indices are not allowed’. 本次提交的改动是不直接通过通配符删除索引,获取到所有的索引名称后进行批量删除。

当因磁盘写满而导致ES自动对索引设置read_only_allow_delete block时,对http请求返回429状态码而不是403

issue: #49393

PR: #50166

这个提交有意思了,耗时也非常久,中间经过数次代码调整与优化。这个改动的初衷是因为在磁盘写满的情况下,ES会自动地把对应节点上的索引设置为只读(index.read_only_allow_delete=true), 后续有新的写入请求进来后,会直接返回403状态码拒绝进行写入。实际上,ES对所有类型的block,对应的http状态码都设置为403, 这就会导致一个问题,在部分客户端比如rest client碰到403的状态码,是不会对写入请求进行重试的,直接丢弃掉请求,导致数据丢失。所以该提交就需要针对因为index.read_only_allow_delete为true的情况,返回429状态码(429意思是TOO_MANY_REQUESTS, 请求太多,需要限流)。在提交代码之后,和社区的maintainer针对单元测试代码经过数次讨论,最终才被合并进master分支。讨论的焦点在于,6.8版本之后,如果磁盘空间释放出来,索引的只读的状态会被自动的release,有单独的线程轮询检查磁盘来确定要不要释放只读状态,所以需要对auto release机制是否开启进行随机选择。一方面,auto release开启,因为客户端接收到429状态码,写入请求经过重试后能够成功执行;另一方面,关闭auto release, 写入请求经过数次重试后仍然执行失败而报错。

elasticsearch-croneval工具异常捕获

issue: #49642

PR: #49744

elasticsearch-croneval工具是一个社区提供的用于校验cron表达式是否正确的一个工具,放置在elasticsearch安装目录的bin目录下。该工具的执行实际上调用了项目中的CronEvalTool类的main方法,实际上在执行的过程中,因为没有正确地捕获异常,导致在对非法的cron表达式进行校验时,工具直接把整个stacktrace信息都打印出来了。针对这个issue所做的提交捕获了这个异常,并给出了较为简明的错误信息。第一次提交之后,项目的maintainer表示要对这个改动进行team-discuss, 最终讨论下来的结果是:对该工具增加一个默认关闭的命令行参数,如果用户有需要查看完整的异常信息,添加该参数即可,默认情况下只显示简短的错误信息。

自定义normalizer无法使用bug修复

issue: #48650

PR: #48866

该bug是在7.x版本引入的,因为对自定义analyzer的代码进行了重构,导致所有custom normalizer都无法正常使用。可能因为normalizer的使用者并不是很多,一直到7.5发布后才被发现,该提交在7.6版本已经发布。关于这个bug的修复,有单独一篇文章进行介绍记一次向Elasticsearch开源社区贡献代码的经历.

继续阅读 »

自从2019年对Elasticsearch项目提交过一次代码之后,开始逐渐关注社区里的新动态,并且尝试去解决一些issue,通过这个过程去理解源码从而可以深入理解Elasticsearch的实现机制。现在把最近半年(2020年1月-2020年6月)对Elasticsearch项目所做的工作进行一次总结,记录遇到的问题和解决办法。

ingest set processor增加ignore_empty_value参数

issue: #54783

PR: #57030

使用ingest set processor时, 如果对于value字段为空字符串或者null的情况不需要进行处理,当前只能通过脚本判断value是否为空字符串或者null。本次提交对set processor增加了ignore_empty_value参数,设置该参数为true后,set processor会自动规避value字段为空字符串或者null的情况, 不对文档进行任何修改,优雅的退出处理逻辑。

修复reindex api bug

issue: #52786

PR: #54901

调用reindex api,当max_docs参数<slices时,会报错max_docs为0,实际上是因为没有提前校验max_docs是否<slices,导致max_docs被设置为0。本次提交修复了这个bug,并且给出比较清晰的错误提示。

当使用date_nanos字段作为过滤条件并且使用now时,无法创建filtered alias

issue: #54315

PR: #54785

PUT date_source/_alias/date_nanos_alias
{
  "filter": {
   "range": {
      "date_nanos": {
        "gt": "now-7d/d"
      }
    }
  }
}

如上述操作,创建filtered alias时,以date_nanos字段为过滤条件,并且使用了now,会导致创建别名失败;该提交主要是修改了queryShardContext中的nowInMillis值,设置为当前时间戳。

禁止修改nested字段的include_in_root、include_in_parent参数

issue: #53792

PR: #54386

nested字段的include_in_root、include_in_parent参数,是无法进行修改的,但是当前调用PUT {index}/_mapping API进行修改时却没有报错,本次提交的改动是在修改两个参数时抛出400参数错误。

对所有处理字符串类型数据的ingest processor,支持字段值为数组

issue: #51087

PR: #53343

对Lowercase Processors、Uppercase Processors、Trim Processors等处理字符串类型数据的ingest processor, 都支持要处理的字段类型为数组类型。

修复_search/template API返回结果总量不准的bug

issue: #52801

PR: #53155

调用GET _search/template API时,如果设置了rest_total_hits_as_int为true,处理逻辑应该和GET _search API一致,trackTotalHitsUpTo变量会被设置为Integer.MAX_VALUE,因此都能够获取到准确的total hits count。但是在_search/template API的处理逻辑中,虽然rest_total_hits_as_int设置为了true, trackTotalHitsUpTo值却没有被设置,因此只能获取到最多为10000的total hits。

修复ingest pipeline simulate API异常处理bug

issue: #52833

PR: #52937

调用POST _ingest/pipeline/_simulate API时,如果传入的docs参数是空列表,则什么结果都不会返回。 Bug产生的原因是,在异步请求的ActionListener中没有对docs参数进行判空,导致始终没有响应给客户端。

修复删除enrich policy时的bug

issue: #5122.

PR: #52179

enrich policy关联的索引名称的格式为[policy_name]-*,在调用删除enrich policy的API:DELETE /_enrich/policy/时,需要删除所有的以[policy_name]开头的索引,因为代码直接通过通配符进行删除,如果设置了action.destructive_requires_name参数为true,则删除enrich policy会报错‘Wildcard expressions or all indices are not allowed’. 本次提交的改动是不直接通过通配符删除索引,获取到所有的索引名称后进行批量删除。

当因磁盘写满而导致ES自动对索引设置read_only_allow_delete block时,对http请求返回429状态码而不是403

issue: #49393

PR: #50166

这个提交有意思了,耗时也非常久,中间经过数次代码调整与优化。这个改动的初衷是因为在磁盘写满的情况下,ES会自动地把对应节点上的索引设置为只读(index.read_only_allow_delete=true), 后续有新的写入请求进来后,会直接返回403状态码拒绝进行写入。实际上,ES对所有类型的block,对应的http状态码都设置为403, 这就会导致一个问题,在部分客户端比如rest client碰到403的状态码,是不会对写入请求进行重试的,直接丢弃掉请求,导致数据丢失。所以该提交就需要针对因为index.read_only_allow_delete为true的情况,返回429状态码(429意思是TOO_MANY_REQUESTS, 请求太多,需要限流)。在提交代码之后,和社区的maintainer针对单元测试代码经过数次讨论,最终才被合并进master分支。讨论的焦点在于,6.8版本之后,如果磁盘空间释放出来,索引的只读的状态会被自动的release,有单独的线程轮询检查磁盘来确定要不要释放只读状态,所以需要对auto release机制是否开启进行随机选择。一方面,auto release开启,因为客户端接收到429状态码,写入请求经过重试后能够成功执行;另一方面,关闭auto release, 写入请求经过数次重试后仍然执行失败而报错。

elasticsearch-croneval工具异常捕获

issue: #49642

PR: #49744

elasticsearch-croneval工具是一个社区提供的用于校验cron表达式是否正确的一个工具,放置在elasticsearch安装目录的bin目录下。该工具的执行实际上调用了项目中的CronEvalTool类的main方法,实际上在执行的过程中,因为没有正确地捕获异常,导致在对非法的cron表达式进行校验时,工具直接把整个stacktrace信息都打印出来了。针对这个issue所做的提交捕获了这个异常,并给出了较为简明的错误信息。第一次提交之后,项目的maintainer表示要对这个改动进行team-discuss, 最终讨论下来的结果是:对该工具增加一个默认关闭的命令行参数,如果用户有需要查看完整的异常信息,添加该参数即可,默认情况下只显示简短的错误信息。

自定义normalizer无法使用bug修复

issue: #48650

PR: #48866

该bug是在7.x版本引入的,因为对自定义analyzer的代码进行了重构,导致所有custom normalizer都无法正常使用。可能因为normalizer的使用者并不是很多,一直到7.5发布后才被发现,该提交在7.6版本已经发布。关于这个bug的修复,有单独一篇文章进行介绍记一次向Elasticsearch开源社区贡献代码的经历.

收起阅读 »

Elastic日报 第970期 (2020-06-16)

1、 使用Elasticsearch和Spark设计一个推荐系统。
https://t.cn/A6LtN6Ya
2、(自带梯子)Elasticsearch和Nodejs实现分页功能。
https://t.cn/A6LtNaaT
3、Elasticsearch入门到实战合计,带思维导图。
https://t.cn/A6LtNoXG

编辑:叮咚光军
归档:https://ela.st/cn-daily-all
订阅:https://ela.st/cn-daily-sub
沙龙:https://ela.st/cn-meetup
继续阅读 »
1、 使用Elasticsearch和Spark设计一个推荐系统。
https://t.cn/A6LtN6Ya
2、(自带梯子)Elasticsearch和Nodejs实现分页功能。
https://t.cn/A6LtNaaT
3、Elasticsearch入门到实战合计,带思维导图。
https://t.cn/A6LtNoXG

编辑:叮咚光军
归档:https://ela.st/cn-daily-all
订阅:https://ela.st/cn-daily-sub
沙龙:https://ela.st/cn-meetup 收起阅读 »

Enterprise:创建 meta 引擎来扩展你的 App search 体验

Elastic 从 7.6 发布引入了适用于 Elastic Cloud 及自我管理版本上的 Elastic App Search的 元引擎 (meta engine)。元引擎提供了跨多个现有或新引擎进行搜索的能力。 考虑在页面上添加一个新的搜索框,然后关闭该页面并在你选择的子引擎中搜索文档。 这使大型公司(或实际上任何规模的公司)可以统一并扩展其搜索功能,同时仍然允许搜索/网站管理员完全控制每个子引擎的行为。详细阅读请参阅 https://elasticstack.blog.csdn ... 06079
继续阅读 »
Elastic 从 7.6 发布引入了适用于 Elastic Cloud 及自我管理版本上的 Elastic App Search的 元引擎 (meta engine)。元引擎提供了跨多个现有或新引擎进行搜索的能力。 考虑在页面上添加一个新的搜索框,然后关闭该页面并在你选择的子引擎中搜索文档。 这使大型公司(或实际上任何规模的公司)可以统一并扩展其搜索功能,同时仍然允许搜索/网站管理员完全控制每个子引擎的行为。详细阅读请参阅 https://elasticstack.blog.csdn ... 06079 收起阅读 »

解决Elasticsearch HTTP方式查询报SocketTimeoutException的问题(待验证)

注意: 此解决方案,短时间内没有复现,还需要长时间验证是否有效。

现象

在使用HTTP方式,Elasticsearch 长时间不查询后,再次查询会出现抛出SocketTimeoutException的问题。

原因

基本逻辑

Elasticsearch 客户端会根据服务器返回的HTTP报文内容,来决定客户端保持HTTP连接Keep-Alive状态的策略。
如果结果如下,那么保持HTTP连接 Keep-Alive状态为120s

Connection: Keep-Alive
Keep-Alive: max=5, timeout = 120

如果不包含上述内容,那么客户端将保持Keep-Alive状态的时间为永久。
事实上,Elasticsearch服务器返回的报文,并没有上述HTTP头内容,所以客户端所有的HTTP连接都为永久保持Keep-Alive。
如果客户端长时间没有发送请求,服务器或者防火墙已经close了HTTP底层的TCP链接,但是此时客户端并不知道,由于Keep Alive是无限期,那么并不会重新建立连接,而是直接发送请求,此时就会得到SocketTimeout异常。

阅读源码

我使用的Elasticsearch的客户端下面的版本

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>rest</artifactId>
    <version>5.4.1</version>
</dependency>

其HTTP的发送依赖Maven包httpasyncclient.

这个包中的接口ConnectionKeepAliveStrategy,抽象了处理 HTTP Keepalive 的策略,其默认实现为:

@Contract(threading = ThreadingBehavior.IMMUTABLE)
public class DefaultConnectionKeepAliveStrategy implements ConnectionKeepAliveStrategy {

    public static final DefaultConnectionKeepAliveStrategy INSTANCE = new DefaultConnectionKeepAliveStrategy();

    @Override
    public long getKeepAliveDuration(final HttpResponse response, final HttpContext context) {
        Args.notNull(response, "HTTP response");
        final HeaderElementIterator it = new BasicHeaderElementIterator(
                response.headerIterator(HTTP.CONN_KEEP_ALIVE));
        while (it.hasNext()) {
            final HeaderElement he = it.nextElement();
            final String param = he.getName();
            final String value = he.getValue();
            if (value != null && param.equalsIgnoreCase("timeout")) {
                try {
                    return Long.parseLong(value) * 1000;
                } catch(final NumberFormatException ignore) {
                }
            }
        }
        return -1;
    }
}

-1代表多长时间,接口说明不是很清楚。
PoolingNHttpClientConnectionManager 类中的代码,实现了上述对待KeepAlive的逻辑,可以看到-1表示为:无限期

@Override
    public void releaseConnection(
            final NHttpClientConnection managedConn,
            final Object state,
            final long keepalive,
            final TimeUnit tunit) {
        Args.notNull(managedConn, "Managed connection");
        synchronized (managedConn) {
            final CPoolEntry entry = CPoolProxy.detach(managedConn);
            if (entry == null) {
                return;
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Releasing connection: " + format(entry) + formatStats(entry.getRoute()));
            }
            final NHttpClientConnection conn = entry.getConnection();
            try {
                if (conn.isOpen()) {
                    entry.setState(state);
                    entry.updateExpiry(keepalive, tunit != null ? tunit : TimeUnit.MILLISECONDS);
                    if (this.log.isDebugEnabled()) {
                        final String s;
                       // keepalive 就是上面接口 ConnectionKeepAliveStrategy.getKeepAliveDuration()的返回值
                        if (keepalive > 0) {
                            s = "for " + (double) keepalive / 1000 + " seconds";
                        } else {
                       // 如果小于0 ,那么策略为indefinitely:无限期。
                            s = "indefinitely";
                        }
                        this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
                    }
                }
            } finally {
                this.pool.release(entry, conn.isOpen() && entry.isRouteComplete());
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
                }
            }
        }
    }

解决方式

自定义类实现ConnectionKeepAliveStrategy接口:

public class CustomConnectionKeepAliveStrategy extends DefaultConnectionKeepAliveStrategy {

    public static final CustomConnectionKeepAliveStrategy INSTANCE = new CustomConnectionKeepAliveStrategy();

    private CustomConnectionKeepAliveStrategy() {
        super();
    }

    /**
     * 最大keep alive的时间(分钟)
     * 这里默认为10分钟,可以根据实际情况设置。可以观察客户端机器状态为TIME_WAIT的TCP连接数,如果太多,可以增大此值。
     */
    private final long MAX_KEEP_ALIVE_MINUTES = 10;

    @Override
    public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
        long keepAliveDuration = super.getKeepAliveDuration(response, context);
        // <0 为无限期keepalive
        // 将无限期替换成一个默认的时间
        if(keepAliveDuration < 0){
            return TimeUnit.MINUTES.toMillis(MAX_KEEP_ALIVE_MINUTES);
        }
        return keepAliveDuration;
    }
}

在创建Elasticserach Client时,配置

RestClientBuilder builder = RestClient.builder(hosts);
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
    @Override
    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
        httpClientBuilder.setKeepAliveStrategy(CustomConnectionKeepAliveStrategy.INSTANCE);
        return httpClientBuilder;
    }
});
继续阅读 »

注意: 此解决方案,短时间内没有复现,还需要长时间验证是否有效。

现象

在使用HTTP方式,Elasticsearch 长时间不查询后,再次查询会出现抛出SocketTimeoutException的问题。

原因

基本逻辑

Elasticsearch 客户端会根据服务器返回的HTTP报文内容,来决定客户端保持HTTP连接Keep-Alive状态的策略。
如果结果如下,那么保持HTTP连接 Keep-Alive状态为120s

Connection: Keep-Alive
Keep-Alive: max=5, timeout = 120

如果不包含上述内容,那么客户端将保持Keep-Alive状态的时间为永久。
事实上,Elasticsearch服务器返回的报文,并没有上述HTTP头内容,所以客户端所有的HTTP连接都为永久保持Keep-Alive。
如果客户端长时间没有发送请求,服务器或者防火墙已经close了HTTP底层的TCP链接,但是此时客户端并不知道,由于Keep Alive是无限期,那么并不会重新建立连接,而是直接发送请求,此时就会得到SocketTimeout异常。

阅读源码

我使用的Elasticsearch的客户端下面的版本

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>rest</artifactId>
    <version>5.4.1</version>
</dependency>

其HTTP的发送依赖Maven包httpasyncclient.

这个包中的接口ConnectionKeepAliveStrategy,抽象了处理 HTTP Keepalive 的策略,其默认实现为:

@Contract(threading = ThreadingBehavior.IMMUTABLE)
public class DefaultConnectionKeepAliveStrategy implements ConnectionKeepAliveStrategy {

    public static final DefaultConnectionKeepAliveStrategy INSTANCE = new DefaultConnectionKeepAliveStrategy();

    @Override
    public long getKeepAliveDuration(final HttpResponse response, final HttpContext context) {
        Args.notNull(response, "HTTP response");
        final HeaderElementIterator it = new BasicHeaderElementIterator(
                response.headerIterator(HTTP.CONN_KEEP_ALIVE));
        while (it.hasNext()) {
            final HeaderElement he = it.nextElement();
            final String param = he.getName();
            final String value = he.getValue();
            if (value != null && param.equalsIgnoreCase("timeout")) {
                try {
                    return Long.parseLong(value) * 1000;
                } catch(final NumberFormatException ignore) {
                }
            }
        }
        return -1;
    }
}

-1代表多长时间,接口说明不是很清楚。
PoolingNHttpClientConnectionManager 类中的代码,实现了上述对待KeepAlive的逻辑,可以看到-1表示为:无限期

@Override
    public void releaseConnection(
            final NHttpClientConnection managedConn,
            final Object state,
            final long keepalive,
            final TimeUnit tunit) {
        Args.notNull(managedConn, "Managed connection");
        synchronized (managedConn) {
            final CPoolEntry entry = CPoolProxy.detach(managedConn);
            if (entry == null) {
                return;
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Releasing connection: " + format(entry) + formatStats(entry.getRoute()));
            }
            final NHttpClientConnection conn = entry.getConnection();
            try {
                if (conn.isOpen()) {
                    entry.setState(state);
                    entry.updateExpiry(keepalive, tunit != null ? tunit : TimeUnit.MILLISECONDS);
                    if (this.log.isDebugEnabled()) {
                        final String s;
                       // keepalive 就是上面接口 ConnectionKeepAliveStrategy.getKeepAliveDuration()的返回值
                        if (keepalive > 0) {
                            s = "for " + (double) keepalive / 1000 + " seconds";
                        } else {
                       // 如果小于0 ,那么策略为indefinitely:无限期。
                            s = "indefinitely";
                        }
                        this.log.debug("Connection " + format(entry) + " can be kept alive " + s);
                    }
                }
            } finally {
                this.pool.release(entry, conn.isOpen() && entry.isRouteComplete());
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Connection released: " + format(entry) + formatStats(entry.getRoute()));
                }
            }
        }
    }

解决方式

自定义类实现ConnectionKeepAliveStrategy接口:

public class CustomConnectionKeepAliveStrategy extends DefaultConnectionKeepAliveStrategy {

    public static final CustomConnectionKeepAliveStrategy INSTANCE = new CustomConnectionKeepAliveStrategy();

    private CustomConnectionKeepAliveStrategy() {
        super();
    }

    /**
     * 最大keep alive的时间(分钟)
     * 这里默认为10分钟,可以根据实际情况设置。可以观察客户端机器状态为TIME_WAIT的TCP连接数,如果太多,可以增大此值。
     */
    private final long MAX_KEEP_ALIVE_MINUTES = 10;

    @Override
    public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
        long keepAliveDuration = super.getKeepAliveDuration(response, context);
        // <0 为无限期keepalive
        // 将无限期替换成一个默认的时间
        if(keepAliveDuration < 0){
            return TimeUnit.MINUTES.toMillis(MAX_KEEP_ALIVE_MINUTES);
        }
        return keepAliveDuration;
    }
}

在创建Elasticserach Client时,配置

RestClientBuilder builder = RestClient.builder(hosts);
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
    @Override
    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
        httpClientBuilder.setKeepAliveStrategy(CustomConnectionKeepAliveStrategy.INSTANCE);
        return httpClientBuilder;
    }
});
收起阅读 »

《腾讯Elasticsearch海量规模背后的内核优化剖析》答疑

今天下午的《腾讯Elasticsearch海量规模背后的内核优化剖析》分享 大家反映强烈,由于时间关系,大家的问题没能及时答复,这里集中解答,大家如果还有其它疑问也可以持续提问。感谢大家的关注! 另外腾讯云上有内核增强版的ES服务,包含了我们所有的内核优化项,欢迎大家体验! 团队也在持续招聘,欢迎简历来砸:danielhuang@tencent.com; johngqjiang@tencent.com

继续阅读 »

今天下午的《腾讯Elasticsearch海量规模背后的内核优化剖析》分享 大家反映强烈,由于时间关系,大家的问题没能及时答复,这里集中解答,大家如果还有其它疑问也可以持续提问。感谢大家的关注! 另外腾讯云上有内核增强版的ES服务,包含了我们所有的内核优化项,欢迎大家体验! 团队也在持续招聘,欢迎简历来砸:danielhuang@tencent.com; johngqjiang@tencent.com

收起阅读 »

类比mysql查询,适合新手学习Elasticsearch的DSL查询语句

Mysql查询与Es的DSL查询语句对照

一、Mysql数据库与Elasticsearch的类比

关系型数据库(比如Mysql) 非关系型数据库(Elasticsearch)
数据库 Database 索引 Index
表 Table 类型 Type
数据行 Row 文档 Document
数据列 Column 字段 Field
约束 Schema 映射 Mapping

二、Mysql查询语句与DSL查询类比

Mysql查询语句与Elasticsearch的DSL查询类比,主要通过mysql库中的search_lexicon表和es中的search_lexicon_v1索引进行比较。

2.1 search_lexicon 表结构

CREATE TABLE `search_lexicon` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `keyword` varchar(50) NOT NULL DEFAULT '' COMMENT '关键词',
  `keyword_crc32` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '关键词校验',
  `search_type` tinyint(1) NOT NULL DEFAULT '0' COMMENT '类型',
  `consumer_id` varchar(50) NOT NULL DEFAULT '' COMMENT '消费者ID',
  `num` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '文档数',
  `views` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '搜索次数',
  `state` tinyint(1) unsigned NOT NULL DEFAULT '1' COMMENT '状态 0 关闭 1 开启',
  `is_del` tinyint(1) unsigned NOT NULL DEFAULT '0' COMMENT '是否删除 0 正常 1 删除',
  `createtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间',
  `updatetime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据最后更新时间',
  PRIMARY KEY (`id`),
  KEY `idx_search_lexicon_views` (`views`),
  KEY `idx_search_lexicon_updatetime` (`updatetime`) USING BTREE,
  KEY `idx_search_lexicon_keyword_type` (`keyword_crc32`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='三方平台搜索词库';

2.2 search_lexicon_v1 索引结构

{
  "search_lexicon_v1" : {
    "mappings" : {
      "_doc" : {
        "properties" : {
          "@timestamp" : {
            "type" : "date"
          },
          "@version" : {
            "type" : "long"
          },
          "consumer_id" : {
            "type" : "keyword"
          },
          "createtime" : {
            "type" : "date",
            "format" : "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"
          },
          "id" : {
            "type" : "integer"
          },
          "is_del" : {
            "type" : "integer"
          },
          "keyword" : {
            "type" : "text",
            "fields" : {
              "standard" : {
                "type" : "text",
                "analyzer" : "by_standard_no_synonym"
              }
            },
            "analyzer" : "by_max_word_pinyin_no_synonym"
          },
          "num" : {
            "type" : "long"
          },
          "search_type" : {
            "type" : "integer"
          },
          "state" : {
            "type" : "integer"
          },
          "updatetime" : {
            "type" : "date",
            "format" : "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"
          },
          "views" : {
            "type" : "long"
          }
        }
      }
    }
  }
}

2.3 查询语句对照

注意:dsl查询,每次默认展示10(size默认为10)条

以下的查询条件,是为了写查询而构造的,无任何实质性的意义,仅供mysql查询与dsl查询对比用

布尔查询支持的子查询类型共有四种,分别是:must,should,must_not和filter:

查询字句 说明 类型
must 文档必须符合must中所有的条件,会影响相关性得分 数组
should 文档应该匹配should子句查询的一个或多个 数组
must_not 文档必须不符合must_not 中的所有条件 数组
filter 过滤器,文档必须匹配该过滤条件,跟must子句的唯一区别是,filter不影响查询的score ,会缓存 字典

A、查询所有数据

mysql

SELECT * FROM search_lexicon

dsl

GET search_lexicon/_search
{

}
或
GET search_lexicon/_search
{
  "query": {
    "match_all": {}
  }
}

B、 查询一个条件且条件只有一个值(consumer_id=demo)的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id='demo'

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": {
        "term": {
          "consumer_id": "demo"
        }
      }
    }
  }
}
或
GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": "demo"
          }
        }
      ]
    }
  }
}

两者的区别在于前一个filter是一个对象,filter中只能放一个条件,后者filter是一个数组,里面可以放多个对象(多个查询条件),后续都将按照第二种方式查询

C、 查询一个条件且条件有多个值(consumer_id的值为demo,demo2)的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id in('demo','demo2')

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "terms": {
            "consumer_id": [
              "demo",
              "demo2"
            ]
          }
        }
      ]
    }
  }
}

D、 查询consumer_id=demo 且 state=1的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id ='demo' and state=1

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": "demo"
          }
        },
         {
          "term": {
            "state": 1
          }
        }
      ]
    }
  }
}

E、 查询consumer_id=demo , state=1 且 is_del<>1的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id ='demo' and state=1 and is_del <>1

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": "demo"
          }
        },
         {
          "term": {
            "state": 1
          }
        }
      ],
      "must_not": [
        {
          "term": {
            "is_del": {
              "value": 1
            }
          }
        }
      ]
    }
  }
}

F、查询Sconsumer_id ='demo' or (state=1 and is_del =0)的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id ='demo' or (state=1 and is_del =0)

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        },
        {
          "bool": {
            "filter": [
              {
                "term": {
                  "state": 1
                }
              },
              {
                "term": {
                  "is_del": 0
                }
              }
            ]
          }
        }
      ]
    }
  }
}

G、在F的基础上,查询指定字段

mysql

SELECT id,keyword,consumer_id,num,views,state,is_del FROM search_lexicon WHERE consumer_id ='demo' or (state=1 and is_del =0)

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        },
        {
          "bool": {
            "filter": [
              {
                "term": {
                  "state": 1
                }
              },
              {
                "term": {
                  "is_del": 0
                }
              }
            ]
          }
        }
      ]
    }
  },
  "_source": {
    "includes": [
      "id",
      "keyword",
      "num",
      "is_del",
      "state",
      "consumer_id",
      "views"
    ]
  }
}

H、在G的基础上,增加排序

mysql

SELECT id,keyword,consumer_id,num,views,state,is_del FROM search_lexicon WHERE consumer_id ='demo' or (state=1 and is_del =0) ORDER BY state DESC,id DESC

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        },
        {
          "bool": {
            "filter": [
              {
                "term": {
                  "state": 1
                }
              },
              {
                "term": {
                  "is_del": 0
                }
              }
            ]
          }
        }
      ]
    }
  },
  "_source": {
    "includes": [
      "id",
      "keyword",
      "num",
      "is_del",
      "state",
      "consumer_id",
      "views"
    ]
  },
  "sort": [
    {
      "state": {
        "order": "desc"
      }
    },
    {
      "id": {
        "order": "desc"
      }
    }
  ]
}

I、在H的基础上,添加分页

mysql

SELECT id,keyword,consumer_id,num,views,state,is_del FROM search_lexicon WHERE consumer_id ='demo' or (state=1 and is_del =0) ORDER BY state DESC,id DESC LIMIT 0,20

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        },
        {
          "bool": {
            "filter": [
              {
                "term": {
                  "state": 1
                }
              },
              {
                "term": {
                  "is_del": 0
                }
              }
            ]
          }
        }
      ]
    }
  },
  "_source": {
    "includes": [
      "id",
      "keyword",
      "num",
      "is_del",
      "state",
      "consumer_id",
      "views"
    ]
  },
  "sort": [
    {
      "state": {
        "order": "desc"
      }
    },
    {
      "id": {
        "order": "desc"
      }
    }
  ],
  "from": 0,
  "size": 20
}

# from 是一个偏移量,size为每页显示条数

J、去重查询

mysql

SELECT DISTINCT state FROM search_lexicon WHERE consumer_id = 'demo'

dsl

# 通过折叠去重查询
GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        }
      ]
    }
  },
  "collapse": {
    "field": "state"
  }
}

K、分组查询

mysql

SELECT  * FROM search_lexicon WHERE consumer_id = 'demo' GROUP BY state

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        }
      ]
    }
  },
  "size": 0, 
  "aggs": {
    "aaa": {
      "terms": {
        "field": "state",
        "size": 10
      }
    }
  }
}

L、模糊匹配

mysql

SELECT * FROM search_lexicon WHERE consumer_id="demo" and keyword LIKE '%渴望%'

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        }
      ],
      "must": [
        {
          "match": {
            "keyword": "渴望"
          }
        }
      ]
    }
  }
}
继续阅读 »

Mysql查询与Es的DSL查询语句对照

一、Mysql数据库与Elasticsearch的类比

关系型数据库(比如Mysql) 非关系型数据库(Elasticsearch)
数据库 Database 索引 Index
表 Table 类型 Type
数据行 Row 文档 Document
数据列 Column 字段 Field
约束 Schema 映射 Mapping

二、Mysql查询语句与DSL查询类比

Mysql查询语句与Elasticsearch的DSL查询类比,主要通过mysql库中的search_lexicon表和es中的search_lexicon_v1索引进行比较。

2.1 search_lexicon 表结构

CREATE TABLE `search_lexicon` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `keyword` varchar(50) NOT NULL DEFAULT '' COMMENT '关键词',
  `keyword_crc32` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '关键词校验',
  `search_type` tinyint(1) NOT NULL DEFAULT '0' COMMENT '类型',
  `consumer_id` varchar(50) NOT NULL DEFAULT '' COMMENT '消费者ID',
  `num` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '文档数',
  `views` int(10) unsigned NOT NULL DEFAULT '0' COMMENT '搜索次数',
  `state` tinyint(1) unsigned NOT NULL DEFAULT '1' COMMENT '状态 0 关闭 1 开启',
  `is_del` tinyint(1) unsigned NOT NULL DEFAULT '0' COMMENT '是否删除 0 正常 1 删除',
  `createtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '数据创建时间',
  `updatetime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '数据最后更新时间',
  PRIMARY KEY (`id`),
  KEY `idx_search_lexicon_views` (`views`),
  KEY `idx_search_lexicon_updatetime` (`updatetime`) USING BTREE,
  KEY `idx_search_lexicon_keyword_type` (`keyword_crc32`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='三方平台搜索词库';

2.2 search_lexicon_v1 索引结构

{
  "search_lexicon_v1" : {
    "mappings" : {
      "_doc" : {
        "properties" : {
          "@timestamp" : {
            "type" : "date"
          },
          "@version" : {
            "type" : "long"
          },
          "consumer_id" : {
            "type" : "keyword"
          },
          "createtime" : {
            "type" : "date",
            "format" : "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"
          },
          "id" : {
            "type" : "integer"
          },
          "is_del" : {
            "type" : "integer"
          },
          "keyword" : {
            "type" : "text",
            "fields" : {
              "standard" : {
                "type" : "text",
                "analyzer" : "by_standard_no_synonym"
              }
            },
            "analyzer" : "by_max_word_pinyin_no_synonym"
          },
          "num" : {
            "type" : "long"
          },
          "search_type" : {
            "type" : "integer"
          },
          "state" : {
            "type" : "integer"
          },
          "updatetime" : {
            "type" : "date",
            "format" : "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"
          },
          "views" : {
            "type" : "long"
          }
        }
      }
    }
  }
}

2.3 查询语句对照

注意:dsl查询,每次默认展示10(size默认为10)条

以下的查询条件,是为了写查询而构造的,无任何实质性的意义,仅供mysql查询与dsl查询对比用

布尔查询支持的子查询类型共有四种,分别是:must,should,must_not和filter:

查询字句 说明 类型
must 文档必须符合must中所有的条件,会影响相关性得分 数组
should 文档应该匹配should子句查询的一个或多个 数组
must_not 文档必须不符合must_not 中的所有条件 数组
filter 过滤器,文档必须匹配该过滤条件,跟must子句的唯一区别是,filter不影响查询的score ,会缓存 字典

A、查询所有数据

mysql

SELECT * FROM search_lexicon

dsl

GET search_lexicon/_search
{

}
或
GET search_lexicon/_search
{
  "query": {
    "match_all": {}
  }
}

B、 查询一个条件且条件只有一个值(consumer_id=demo)的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id='demo'

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": {
        "term": {
          "consumer_id": "demo"
        }
      }
    }
  }
}
或
GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": "demo"
          }
        }
      ]
    }
  }
}

两者的区别在于前一个filter是一个对象,filter中只能放一个条件,后者filter是一个数组,里面可以放多个对象(多个查询条件),后续都将按照第二种方式查询

C、 查询一个条件且条件有多个值(consumer_id的值为demo,demo2)的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id in('demo','demo2')

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "terms": {
            "consumer_id": [
              "demo",
              "demo2"
            ]
          }
        }
      ]
    }
  }
}

D、 查询consumer_id=demo 且 state=1的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id ='demo' and state=1

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": "demo"
          }
        },
         {
          "term": {
            "state": 1
          }
        }
      ]
    }
  }
}

E、 查询consumer_id=demo , state=1 且 is_del<>1的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id ='demo' and state=1 and is_del <>1

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": "demo"
          }
        },
         {
          "term": {
            "state": 1
          }
        }
      ],
      "must_not": [
        {
          "term": {
            "is_del": {
              "value": 1
            }
          }
        }
      ]
    }
  }
}

F、查询Sconsumer_id ='demo' or (state=1 and is_del =0)的数据

mysql

SELECT * FROM search_lexicon WHERE consumer_id ='demo' or (state=1 and is_del =0)

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        },
        {
          "bool": {
            "filter": [
              {
                "term": {
                  "state": 1
                }
              },
              {
                "term": {
                  "is_del": 0
                }
              }
            ]
          }
        }
      ]
    }
  }
}

G、在F的基础上,查询指定字段

mysql

SELECT id,keyword,consumer_id,num,views,state,is_del FROM search_lexicon WHERE consumer_id ='demo' or (state=1 and is_del =0)

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        },
        {
          "bool": {
            "filter": [
              {
                "term": {
                  "state": 1
                }
              },
              {
                "term": {
                  "is_del": 0
                }
              }
            ]
          }
        }
      ]
    }
  },
  "_source": {
    "includes": [
      "id",
      "keyword",
      "num",
      "is_del",
      "state",
      "consumer_id",
      "views"
    ]
  }
}

H、在G的基础上,增加排序

mysql

SELECT id,keyword,consumer_id,num,views,state,is_del FROM search_lexicon WHERE consumer_id ='demo' or (state=1 and is_del =0) ORDER BY state DESC,id DESC

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        },
        {
          "bool": {
            "filter": [
              {
                "term": {
                  "state": 1
                }
              },
              {
                "term": {
                  "is_del": 0
                }
              }
            ]
          }
        }
      ]
    }
  },
  "_source": {
    "includes": [
      "id",
      "keyword",
      "num",
      "is_del",
      "state",
      "consumer_id",
      "views"
    ]
  },
  "sort": [
    {
      "state": {
        "order": "desc"
      }
    },
    {
      "id": {
        "order": "desc"
      }
    }
  ]
}

I、在H的基础上,添加分页

mysql

SELECT id,keyword,consumer_id,num,views,state,is_del FROM search_lexicon WHERE consumer_id ='demo' or (state=1 and is_del =0) ORDER BY state DESC,id DESC LIMIT 0,20

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "should": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        },
        {
          "bool": {
            "filter": [
              {
                "term": {
                  "state": 1
                }
              },
              {
                "term": {
                  "is_del": 0
                }
              }
            ]
          }
        }
      ]
    }
  },
  "_source": {
    "includes": [
      "id",
      "keyword",
      "num",
      "is_del",
      "state",
      "consumer_id",
      "views"
    ]
  },
  "sort": [
    {
      "state": {
        "order": "desc"
      }
    },
    {
      "id": {
        "order": "desc"
      }
    }
  ],
  "from": 0,
  "size": 20
}

# from 是一个偏移量,size为每页显示条数

J、去重查询

mysql

SELECT DISTINCT state FROM search_lexicon WHERE consumer_id = 'demo'

dsl

# 通过折叠去重查询
GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        }
      ]
    }
  },
  "collapse": {
    "field": "state"
  }
}

K、分组查询

mysql

SELECT  * FROM search_lexicon WHERE consumer_id = 'demo' GROUP BY state

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        }
      ]
    }
  },
  "size": 0, 
  "aggs": {
    "aaa": {
      "terms": {
        "field": "state",
        "size": 10
      }
    }
  }
}

L、模糊匹配

mysql

SELECT * FROM search_lexicon WHERE consumer_id="demo" and keyword LIKE '%渴望%'

dsl

GET search_lexicon/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "consumer_id": {
              "value": "demo"
            }
          }
        }
      ],
      "must": [
        {
          "match": {
            "keyword": "渴望"
          }
        }
      ]
    }
  }
}
收起阅读 »

基于ES的aliyun-knn插件,开发的以图搜图搜索引擎

基于ES的aliyun-knn插件,开发的以图搜图搜索引擎

本例是基于Elasticsearch6.7 版本, 安装了aliyun-knn插件;设计的图片向量特征为512维度.
如果自建ES,是无法使用aliyun-knn插件的,自建建议使用ES7.x版本,并按照fast-elasticsearch-vector-scoring插件(https://github.com/lior-k/fast-elasticsearch-vector-scoring/)

由于我的python水平有限,文中设计到的图片特征提取,使用了yongyuan.name的VGGNet库,再此表示感谢!

一、 ES设计

1.1 索引结构

# 创建一个图片索引
PUT images_v2
{
  "aliases": {
    "images": {}
  }, 
  "settings": {
    "index.codec": "proxima",
    "index.vector.algorithm": "hnsw",
    "index.number_of_replicas":1,
    "index.number_of_shards":3
  },
  "mappings": {
    "_doc": {
      "properties": {
        "feature": {
          "type": "proxima_vector",
          "dim": 512
        },
        "relation_id": {
          "type": "keyword"
        },
        "image_path": {
          "type": "keyword"
        }
      }
    }
  }
}

1.2 DSL语句

GET images/_search
{
  "query": {
    "hnsw": {
      "feature": {
        "vector": [255,....255],
        "size": 3,
        "ef": 1
      }
    }
  },
  "from": 0,
  "size": 20, 
  "sort": [
    {
      "_score": {
        "order": "desc"
      }
    }
  ], 
  "collapse": {
    "field": "relation_id"
  },
  "_source": {
    "includes": [
      "relation_id",
      "image_path"
    ]
  }
}

二、图片特征

extract_cnn_vgg16_keras.py

# -*- coding: utf-8 -*-
# Author: yongyuan.name
import numpy as np
from numpy import linalg as LA
from keras.applications.vgg16 import VGG16
from keras.preprocessing import image
from keras.applications.vgg16 import preprocess_input
from PIL import Image, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
class VGGNet:
    def __init__(self):
        # weights: 'imagenet'
        # pooling: 'max' or 'avg'
        # input_shape: (width, height, 3), width and height should >= 48
        self.input_shape = (224, 224, 3)
        self.weight = 'imagenet'
        self.pooling = 'max'
        self.model = VGG16(weights = self.weight, input_shape = (self.input_shape[0], self.input_shape[1], self.input_shape[2]), pooling = self.pooling, include_top = False)
        self.model.predict(np.zeros((1, 224, 224 , 3)))
    '''
    Use vgg16 model to extract features
    Output normalized feature vector
    '''
    def extract_feat(self, img_path):
        img = image.load_img(img_path, target_size=(self.input_shape[0], self.input_shape[1]))
        img = image.img_to_array(img)
        img = np.expand_dims(img, axis=0)
        img = preprocess_input(img)
        feat = self.model.predict(img)
        norm_feat = feat[0]/LA.norm(feat[0])
        return norm_feat
# 获取图片特征
from extract_cnn_vgg16_keras import VGGNet
model = VGGNet()
file_path = "./demo.jpg"
queryVec = model.extract_feat(file_path)
feature = queryVec.tolist()

三、将图片特征写入ES

helper.py

import re
import urllib.request
def strip(path):
    """
    需要清洗的文件夹名字
    清洗掉Windows系统非法文件夹名字的字符串
    :param path:
    :return:
    """
    path = re.sub(r'[?\\*|“<>:/]', '', str(path))
    return path

def getfilename(url):
    """
    通过url获取最后的文件名
    :param url:
    :return:
    """
    filename = url.split('/')[-1]
    filename = strip(filename)
    return filename

def urllib_download(url, filename):
    """
    下载
    :param url:
    :param filename:
    :return:
    """
    return urllib.request.urlretrieve(url, filename)

train.py

# coding=utf-8
import mysql.connector
import os
from helper import urllib_download, getfilename
from elasticsearch5 import Elasticsearch, helpers
from extract_cnn_vgg16_keras import VGGNet
model = VGGNet()
http_auth = ("elastic", "123455")
es = Elasticsearch("http://127.0.0.1:9200", http_auth=http_auth)
mydb = mysql.connector.connect(
    host="127.0.0.1",  # 数据库主机地址
    user="root",  # 数据库用户名
    passwd="123456",  # 数据库密码
    database="images"
)
mycursor = mydb.cursor()
imgae_path = "./images/"
def get_data(page=1):
    page_size = 20
    offset = (page - 1) * page_size
    sql = """
    SELECT id, relation_id, photo FROM  images  LIMIT {0},{1}
    """
    mycursor.execute(sql.format(offset, page_size))
    myresult = mycursor.fetchall()
    return myresult

def train_image_feature(myresult):
    indexName = "images"
    photo_path = "http://域名/{0}"
    actions = []
    for x in myresult:
            id = str(x[0])
    relation_id = x[1]
    # photo = x[2].decode(encoding="utf-8")
    photo = x[2]
    full_photo = photo_path.format(photo)
    filename = imgae_path + getfilename(full_photo)
    if not os.path.exists(filename):
        try:
            urllib_download(full_photo, filename)
        except BaseException as e:
            print("gid:{0}的图片{1}未能下载成功".format(gid, full_photo))
            continue
    if not os.path.exists(filename):
         continue
    try:
        feature = model.extract_feat(filename).tolist()
        action = {
        "_op_type": "index",
        "_index": indexName,
        "_type": "_doc",
        "_id": id,
        "_source": {
                            "relation_id": relation_id,
                            "feature": feature,
                            "image_path": photo
        }
        }
        actions.append(action)
    except BaseException as e:
        print("id:{0}的图片{1}未能获取到特征".format(id, full_photo))
        continue
    # print(actions)
    succeed_num = 0
    for ok, response in helpers.streaming_bulk(es, actions):
        if not ok:
            print(ok)
            print(response)
        else:
            succeed_num += 1
            print("本次更新了{0}条数据".format(succeed_num))
            es.indices.refresh(indexName)

page = 1
while True:
    print("当前第{0}页".format(page))
    myresult = get_data(page=page)
    if not myresult:
        print("没有获取到数据了,退出")
        break
    train_image_feature(myresult)
    page += 1

四、搜索图片

import requests
import json
import os
import time
from elasticsearch5 import Elasticsearch
from extract_cnn_vgg16_keras import VGGNet
model = VGGNet()
http_auth = ("elastic", "123455")
es = Elasticsearch("http://127.0.0.1:9200", http_auth=http_auth)
#上传图片保存
upload_image_path = "./runtime/"
upload_image = request.files.get("image")
upload_image_type = upload_image.content_type.split('/')[-1]
file_name = str(time.time())[:10] + '.' + upload_image_type
file_path = upload_image_path + file_name
upload_image.save(file_path)
# 计算图片特征向量
queryVec = model.extract_feat(file_path)
feature = queryVec.tolist()
# 删除图片
os.remove(file_path)
# 根据特征向量去ES中搜索
body = {
    "query": {
        "hnsw": {
            "feature": {
                "vector": feature,
                "size": 5,
                "ef": 10
            }
        }
    },
    # "collapse": {
    # "field": "relation_id"
    # },
    "_source": {"includes": ["relation_id", "image_path"]},
    "from": 0,
    "size": 40
}
indexName = "images"
res = es.search(indexName, body=body)
# 返回的结果,最好根据自身情况,将得分低的过滤掉...经过测试, 得分在0.65及其以上的,比较符合要求

五、依赖的包

mysql_connector_repackaged
elasticsearch
Pillow
tensorflow
requests
pandas
Keras
numpy
继续阅读 »

基于ES的aliyun-knn插件,开发的以图搜图搜索引擎

本例是基于Elasticsearch6.7 版本, 安装了aliyun-knn插件;设计的图片向量特征为512维度.
如果自建ES,是无法使用aliyun-knn插件的,自建建议使用ES7.x版本,并按照fast-elasticsearch-vector-scoring插件(https://github.com/lior-k/fast-elasticsearch-vector-scoring/)

由于我的python水平有限,文中设计到的图片特征提取,使用了yongyuan.name的VGGNet库,再此表示感谢!

一、 ES设计

1.1 索引结构

# 创建一个图片索引
PUT images_v2
{
  "aliases": {
    "images": {}
  }, 
  "settings": {
    "index.codec": "proxima",
    "index.vector.algorithm": "hnsw",
    "index.number_of_replicas":1,
    "index.number_of_shards":3
  },
  "mappings": {
    "_doc": {
      "properties": {
        "feature": {
          "type": "proxima_vector",
          "dim": 512
        },
        "relation_id": {
          "type": "keyword"
        },
        "image_path": {
          "type": "keyword"
        }
      }
    }
  }
}

1.2 DSL语句

GET images/_search
{
  "query": {
    "hnsw": {
      "feature": {
        "vector": [255,....255],
        "size": 3,
        "ef": 1
      }
    }
  },
  "from": 0,
  "size": 20, 
  "sort": [
    {
      "_score": {
        "order": "desc"
      }
    }
  ], 
  "collapse": {
    "field": "relation_id"
  },
  "_source": {
    "includes": [
      "relation_id",
      "image_path"
    ]
  }
}

二、图片特征

extract_cnn_vgg16_keras.py

# -*- coding: utf-8 -*-
# Author: yongyuan.name
import numpy as np
from numpy import linalg as LA
from keras.applications.vgg16 import VGG16
from keras.preprocessing import image
from keras.applications.vgg16 import preprocess_input
from PIL import Image, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
class VGGNet:
    def __init__(self):
        # weights: 'imagenet'
        # pooling: 'max' or 'avg'
        # input_shape: (width, height, 3), width and height should >= 48
        self.input_shape = (224, 224, 3)
        self.weight = 'imagenet'
        self.pooling = 'max'
        self.model = VGG16(weights = self.weight, input_shape = (self.input_shape[0], self.input_shape[1], self.input_shape[2]), pooling = self.pooling, include_top = False)
        self.model.predict(np.zeros((1, 224, 224 , 3)))
    '''
    Use vgg16 model to extract features
    Output normalized feature vector
    '''
    def extract_feat(self, img_path):
        img = image.load_img(img_path, target_size=(self.input_shape[0], self.input_shape[1]))
        img = image.img_to_array(img)
        img = np.expand_dims(img, axis=0)
        img = preprocess_input(img)
        feat = self.model.predict(img)
        norm_feat = feat[0]/LA.norm(feat[0])
        return norm_feat
# 获取图片特征
from extract_cnn_vgg16_keras import VGGNet
model = VGGNet()
file_path = "./demo.jpg"
queryVec = model.extract_feat(file_path)
feature = queryVec.tolist()

三、将图片特征写入ES

helper.py

import re
import urllib.request
def strip(path):
    """
    需要清洗的文件夹名字
    清洗掉Windows系统非法文件夹名字的字符串
    :param path:
    :return:
    """
    path = re.sub(r'[?\\*|“<>:/]', '', str(path))
    return path

def getfilename(url):
    """
    通过url获取最后的文件名
    :param url:
    :return:
    """
    filename = url.split('/')[-1]
    filename = strip(filename)
    return filename

def urllib_download(url, filename):
    """
    下载
    :param url:
    :param filename:
    :return:
    """
    return urllib.request.urlretrieve(url, filename)

train.py

# coding=utf-8
import mysql.connector
import os
from helper import urllib_download, getfilename
from elasticsearch5 import Elasticsearch, helpers
from extract_cnn_vgg16_keras import VGGNet
model = VGGNet()
http_auth = ("elastic", "123455")
es = Elasticsearch("http://127.0.0.1:9200", http_auth=http_auth)
mydb = mysql.connector.connect(
    host="127.0.0.1",  # 数据库主机地址
    user="root",  # 数据库用户名
    passwd="123456",  # 数据库密码
    database="images"
)
mycursor = mydb.cursor()
imgae_path = "./images/"
def get_data(page=1):
    page_size = 20
    offset = (page - 1) * page_size
    sql = """
    SELECT id, relation_id, photo FROM  images  LIMIT {0},{1}
    """
    mycursor.execute(sql.format(offset, page_size))
    myresult = mycursor.fetchall()
    return myresult

def train_image_feature(myresult):
    indexName = "images"
    photo_path = "http://域名/{0}"
    actions = []
    for x in myresult:
            id = str(x[0])
    relation_id = x[1]
    # photo = x[2].decode(encoding="utf-8")
    photo = x[2]
    full_photo = photo_path.format(photo)
    filename = imgae_path + getfilename(full_photo)
    if not os.path.exists(filename):
        try:
            urllib_download(full_photo, filename)
        except BaseException as e:
            print("gid:{0}的图片{1}未能下载成功".format(gid, full_photo))
            continue
    if not os.path.exists(filename):
         continue
    try:
        feature = model.extract_feat(filename).tolist()
        action = {
        "_op_type": "index",
        "_index": indexName,
        "_type": "_doc",
        "_id": id,
        "_source": {
                            "relation_id": relation_id,
                            "feature": feature,
                            "image_path": photo
        }
        }
        actions.append(action)
    except BaseException as e:
        print("id:{0}的图片{1}未能获取到特征".format(id, full_photo))
        continue
    # print(actions)
    succeed_num = 0
    for ok, response in helpers.streaming_bulk(es, actions):
        if not ok:
            print(ok)
            print(response)
        else:
            succeed_num += 1
            print("本次更新了{0}条数据".format(succeed_num))
            es.indices.refresh(indexName)

page = 1
while True:
    print("当前第{0}页".format(page))
    myresult = get_data(page=page)
    if not myresult:
        print("没有获取到数据了,退出")
        break
    train_image_feature(myresult)
    page += 1

四、搜索图片

import requests
import json
import os
import time
from elasticsearch5 import Elasticsearch
from extract_cnn_vgg16_keras import VGGNet
model = VGGNet()
http_auth = ("elastic", "123455")
es = Elasticsearch("http://127.0.0.1:9200", http_auth=http_auth)
#上传图片保存
upload_image_path = "./runtime/"
upload_image = request.files.get("image")
upload_image_type = upload_image.content_type.split('/')[-1]
file_name = str(time.time())[:10] + '.' + upload_image_type
file_path = upload_image_path + file_name
upload_image.save(file_path)
# 计算图片特征向量
queryVec = model.extract_feat(file_path)
feature = queryVec.tolist()
# 删除图片
os.remove(file_path)
# 根据特征向量去ES中搜索
body = {
    "query": {
        "hnsw": {
            "feature": {
                "vector": feature,
                "size": 5,
                "ef": 10
            }
        }
    },
    # "collapse": {
    # "field": "relation_id"
    # },
    "_source": {"includes": ["relation_id", "image_path"]},
    "from": 0,
    "size": 40
}
indexName = "images"
res = es.search(indexName, body=body)
# 返回的结果,最好根据自身情况,将得分低的过滤掉...经过测试, 得分在0.65及其以上的,比较符合要求

五、依赖的包

mysql_connector_repackaged
elasticsearch
Pillow
tensorflow
requests
pandas
Keras
numpy
收起阅读 »

一种处理Elasticsearch对象数组类型的方式

目前情况

Elasticsearch中处理对象数组有两种格式array和nested,但这两种都有一定的不足。
以下面的文档为例:

{
  "user": [
    {
      "first": "John",
      "last": "Smith"
    },
    {
      "first": "Alice",
      "last": "White"
    }
  ]
}

如果在mapping中以array存储,那么实际存储为:

user.first:["John","Alice"]
user.last:["Smith","White"]

如果以must的方式查询user.first:Johnuser.last:White,那么这篇文档也会命中,这不是我们期望的。

如果在mapping中以array存储,Elasticsearch将每个对象视为一个doc,这例子会存储3个doc,会严重影响ES写入和查询的效率。

Flatten格式

我想到的存储方式很简单,就是将对象数组打平保存为一个keyword类型的字符串数组,故起名Flatten格式。 以上面文档为例,数组对象需要转换为下面的格式

"user.flatten": [
    "first:John",
    "last:Smith",
    "first:John&last:Smith",
    "first:Alice",
    "last:White",
    "first:Alice&last:White"
  ]

这样以must的方式查询user.first:Johnuser.last:White,可以转换为term查询first:John&last:White,并不会命中文档。
同时,这种方式还是保存1个doc,避免了nested的缺点。

对于flatten格式有几点说明

user.flatten数组的大小

如果user对象个数为M,user属性个数为N,那么其数组大小为(2^N-1)*M

对象为空怎么处理

建议以null方式保存,例如:

    {
              "first": "John",
             "last": null
    }

转换后的格式

    [
        "first:John",
        "last:null",
        "first:John&last:null",
    ]

保存和查询对于对象属性的处理顺序要保持一致

上述例子都是按first&last顺序存储的,那么以must的方式查询user.first:Johnuser.last:White也要以first:John&last:White方式查询,不能用last:White&first:John

不足

  • 需要自己编码将JSON对象转换为字符串数组
  • 需要自己编码转换查询语句
  • 只支持term查询
继续阅读 »

目前情况

Elasticsearch中处理对象数组有两种格式array和nested,但这两种都有一定的不足。
以下面的文档为例:

{
  "user": [
    {
      "first": "John",
      "last": "Smith"
    },
    {
      "first": "Alice",
      "last": "White"
    }
  ]
}

如果在mapping中以array存储,那么实际存储为:

user.first:["John","Alice"]
user.last:["Smith","White"]

如果以must的方式查询user.first:Johnuser.last:White,那么这篇文档也会命中,这不是我们期望的。

如果在mapping中以array存储,Elasticsearch将每个对象视为一个doc,这例子会存储3个doc,会严重影响ES写入和查询的效率。

Flatten格式

我想到的存储方式很简单,就是将对象数组打平保存为一个keyword类型的字符串数组,故起名Flatten格式。 以上面文档为例,数组对象需要转换为下面的格式

"user.flatten": [
    "first:John",
    "last:Smith",
    "first:John&last:Smith",
    "first:Alice",
    "last:White",
    "first:Alice&last:White"
  ]

这样以must的方式查询user.first:Johnuser.last:White,可以转换为term查询first:John&last:White,并不会命中文档。
同时,这种方式还是保存1个doc,避免了nested的缺点。

对于flatten格式有几点说明

user.flatten数组的大小

如果user对象个数为M,user属性个数为N,那么其数组大小为(2^N-1)*M

对象为空怎么处理

建议以null方式保存,例如:

    {
              "first": "John",
             "last": null
    }

转换后的格式

    [
        "first:John",
        "last:null",
        "first:John&last:null",
    ]

保存和查询对于对象属性的处理顺序要保持一致

上述例子都是按first&last顺序存储的,那么以must的方式查询user.first:Johnuser.last:White也要以first:John&last:White方式查询,不能用last:White&first:John

不足

  • 需要自己编码将JSON对象转换为字符串数组
  • 需要自己编码转换查询语句
  • 只支持term查询
收起阅读 »

Elasticsearch 7.6 利用机器学习来检测文本语言

Elasticsearch 新近发布的 7.6 版本里面包含了很多激动人心的功能,而最让我感兴趣的是利用机器学习来自动检测语言的功能。

功能初探

检测文本语言本身不是什么稀奇事,之前做爬虫的时候,就做过对网页正文进行语言的检测,有很多成熟的方案,而最好的就属 Google Chrome 团队开源的 CLD 系列(全名:Compact Language Detector)了,能够检测多达 80 种各种语言,我用过CLD2,是基于 C++ 贝叶斯分类器实现的,而 CLD3 则是基于神经网络实现的,无疑更加准确,这次 Elasticsearch 将这个非常小的功能也直接集成到了默认的发行包里面,对于使用者来说可以说是带来很大的方便。

多语言的痛点

相信很多朋友,在实际的业务场景中,对碰到过一个字段同时存在多个语种的文本内容的情况,尤其是出海的产品,比如类似大众点评的 APP 吧,一个餐馆下面,来自七洲五湖四海的朋友都来品尝过了,自然要留下点评语是不,德国的朋友使用的是德语,法国的朋友使用的是法语,广州的朋友用的是粤语,那对于开发这个 APP 的后台工程师可就犯难了,如果这些评论都存在一个字段里面,就不好设置一个统一的分词器,因为不同的语言他们的切分规则肯定是不一样的,最简单的例子,比如中文和英文,设置的分词不对,查询结果就会不精准。

相信也有很多人用过这样的解决方案,既然一个字段搞不定,那就把这个字段复制几份,英文字段存一份,中文字段存一份,用 MultiField 来做,这样虽然可以解决一部分问题,但是同样会带来容量和资源的浪费,和查询时候具体该选择哪个字段来参与查询的挑战。

而利用 7.6 的这个新功能,可以在创建索引的时候,可以自动的根据内容进行推理,从而影响索引文档的构成,进而做到特定的文本进特定的字段,从而提升查询体验和性能,关于这个功能,Elastic 官网这里也有一篇博客2,提供了详细的例子。

实战上手

看上去不错,但是鲁迅说过,网上得来终觉浅,觉知此事要躬行,来, 今天一起跑一遍看看具体怎么个用法。

功能剖析

首先,这个功能叫 Language identification,是机器学习的一个 Feature,但是不能单独使用,要结合 Ingest Node 的一个 inference ingest processor 来使用,Ingest processor 是在 Elasticsearch 里面运行的数据预处理器,部分功能类似于 Logstash 的数据解析,对于简单数据操作场景,完全可以替代掉 Logstash,简化部署架构。

Elasticsearch 在 7.6 的包里面,默认打包了提前训练好的机器学习模型,就是 Language identification 需要调用的语言检测模型,名称是固定的 lang_ident_model_1,这也是 Elasticsearch 自带的第一个模型包,大家了解一下就好。

那这个模型包在什么位置呢,我们来解刨一下:

$unzip /usr/share/elasticsearch/modules/x-pack-ml/x-pack-ml-7.6.0.jar 
$/org/elasticsearch/xpack/ml/inference$ tree
.
|-- ingest
|   |-- InferenceProcessor$Factory.class
|   `-- InferenceProcessor.class
|-- loadingservice
|   |-- LocalModel$1.class
|   |-- LocalModel.class
|   |-- Model.class
|   `-- ModelLoadingService.class
`-- persistence
    |-- InferenceInternalIndex.class
    |-- TrainedModelDefinitionDoc$1.class
    |-- TrainedModelDefinitionDoc$Builder.class
    |-- TrainedModelDefinitionDoc.class
    |-- TrainedModelProvider.class
    `-- lang_ident_model_1.json

3 directories, 12 files

可以看到,在 persistence 目录就有这个模型包,是 json 格式的,里面有个压缩的二进制编码后的字段。

Jietu20200306-234807.png

查看模型信息

我们还可以通过新的 API 来获取这个模型信息,以后模型多了之后会比较有用:

GET _ml/inference/lang_ident_model_1
{
  "count" : 1,
  "trained_model_configs" : [
    {
      "model_id" : "lang_ident_model_1",
      "created_by" : "_xpack",
      "version" : "7.6.0",
      "description" : "Model used for identifying language from arbitrary input text.",
      "create_time" : 1575548914594,
      "tags" : [
        "lang_ident",
        "prepackaged"
      ],
      "input" : {
        "field_names" : [
          "text"
        ]
      },
      "estimated_heap_memory_usage_bytes" : 1053992,
      "estimated_operations" : 39629,
      "license_level" : "basic"
    }
  ]
}

Ingest Pipeline 模拟测试

好了,基本的了解就到这里了,我们开始动手吧,既然要和 Ingest 结合使用,自然免不了要定义 Ingest Pipeline,也就是说定一个解析规则,索引的时候会调用这个规则来处理输入的索引文档。Ingest Pipeline 的调试是个问题,好在Ingest 提供了模拟调用的方法,我们测试一下:


POST _ingest/pipeline/_simulate
{
   "pipeline":{
      "processors":[
         {
            "inference":{
               "model_id":"lang_ident_model_1", 
               "inference_config":{
                  "classification":{
                     "num_top_classes":5 
                  }
               },
               "field_mappings":{

               }
            }
         }
      ]
   },
   "docs":[
      {
         "_source":{ 
            "text":"新冠病毒让你在家好好带着,你服不服"
         }
      }
   ]
}

上面是借助 Ingest 的推理 Process 来模拟调用这个机器学习模型进行文本判断的方法,第一部分是设置 processor 的定义,设置了一个 inference processor,也就是要进行语言模型的检测,第二部分 docs 则是输入了一个 json 文档,作为测试的输入源,运行结果如下:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "text" : "新冠病毒让你在家好好带着,你服不服",
          "ml" : {
            "inference" : {
              "top_classes" : [
                {
                  "class_name" : "zh",
                  "class_probability" : 0.9999872511022145,
                  "class_score" : 0.9999872511022145
                },
                {
                  "class_name" : "ja",
                  "class_probability" : 1.061491174235718E-5,
                  "class_score" : 1.061491174235718E-5
                },
                {
                  "class_name" : "hy",
                  "class_probability" : 6.304673023324264E-7,
                  "class_score" : 6.304673023324264E-7
                },
                {
                  "class_name" : "ta",
                  "class_probability" : 4.1374037676410867E-7,
                  "class_score" : 4.1374037676410867E-7
                },
                {
                  "class_name" : "te",
                  "class_probability" : 2.0709260170937159E-7,
                  "class_score" : 2.0709260170937159E-7
                }
              ],
              "predicted_value" : "zh",
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T15:58:44.783736Z"
        }
      }
    }
  ]
}

可以看到,第一条返回结果,zh 表示中文语言类型,可能性为 0.9999872511022145,基本上无限接近肯定了,这个是中文文本,而第二位和剩下的就明显得分比较低了,如果你看到是他们的得分开头是 1.x 和 6.x 等,是不是觉得,不对啊,后面的得分怎么反而大一些,哈哈,你仔细看会发现它后面其实还有 -E 啥的尾巴呢,这个是科学计数法,其实数值远远小于 0。

创建一个 Pipeline

简单模拟倒是证明这个功能 work 了,那具体怎么使用,一起看看吧。

首先创建一个 Pipeline:


PUT _ingest/pipeline/lang_detect_add_tag
{
  "description": "检测文本,添加语种标签",
  "processors": [
    {
      "inference": {
        "model_id": "lang_ident_model_1",
        "inference_config": {
          "classification": {
            "num_top_classes": 2
          }
        },
        "field_mappings": {
            "contents": "text"
        }
      }
    },
    {
      "set": {
        "field": "tag",
        "value": "{{ml.inference.predicted_value}}"
      }
    }
  ]
}

可以看到,我们定义了一个 ID 为 lang_detect_add_tag 的 Ingest Pipeline,并且我们设置了这个推理模型的参数,只返回 2 个分类结果,和设置了 content 字段作为检测对象。同时,我们还定义了一个新的 set processor,这个的意思是设置一个名为 tag 的字段,它的值是来自于一个其它的字段的变量引用,也就是把检测到的文本对应的语种存成一个标签字段。

测试这个 Pipeline

这个 Pipeline 创建完之后,我们同样可以对这个 Pipeline 进行模拟测试,模拟的好处是不会实际创建索引,方便调试。

POST /_ingest/pipeline/lang_detect_add_tag/_simulate
{
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "contents": "巴林境内新型冠状病毒肺炎确诊病例累计达56例"
      }
    },
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "contents": "Watch live: WHO gives a coronavirus update as global cases top 100,000"
      }
    }
  ]
}

返回结果:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "index",
        "_type" : "_doc",
        "_id" : "id",
        "_source" : {
          "tag" : "zh",
          "contents" : "巴林境内新型冠状病毒肺炎确诊病例累计达56例",
          "ml" : {
            "inference" : {
              "top_classes" : [
                {
                  "class_name" : "zh",
                  "class_probability" : 0.999812378112116,
                  "class_score" : 0.999812378112116
                },
                {
                  "class_name" : "ja",
                  "class_probability" : 1.8175264877915687E-4,
                  "class_score" : 1.8175264877915687E-4
                }
              ],
              "predicted_value" : "zh",
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:21:26.981249Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "index",
        "_type" : "_doc",
        "_id" : "id",
        "_source" : {
          "tag" : "en",
          "contents" : "Watch live: WHO gives a coronavirus update as global cases top 100,000",
          "ml" : {
            "inference" : {
              "top_classes" : [
                {
                  "class_name" : "en",
                  "class_probability" : 0.9896669173070857,
                  "class_score" : 0.9896669173070857
                },
                {
                  "class_name" : "tg",
                  "class_probability" : 0.0033122788575614993,
                  "class_score" : 0.0033122788575614993
                }
              ],
              "predicted_value" : "en",
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:21:26.981261Z"
        }
      }
    }
  ]
}

继续完善 Pipeline

可以看到,两个文档分别都正确识别了语种,并且创建了对应的 tag 字段,不过这个时候,文档里面的 ml 对象字段,就显得有点多余了,可以使用 remove processor 来删除这个字段。

PUT _ingest/pipeline/lang_detect_add_tag
{
  "description": "检测文本,添加语种标签",
  "processors": [
    {
      "inference": {
        "model_id": "lang_ident_model_1",
        "inference_config": {
          "classification": {
            "num_top_classes": 2
          }
        },
        "field_mappings": {
          "contents": "text"
        }
      }
    },
    {
      "set": {
        "field": "tag",
        "value": "{{ml.inference.predicted_value}}"
      }
    },
    {
      "remove": {
        "field": "ml"
      }
    }
  ]
}

索引文档并调用 Pipeline

那索引的时候,怎么使用这个 Pipeline 呢,看下面的例子:


POST news/_doc/1?pipeline=lang_detect_add_tag
{
  "contents":"""
  On Friday, he added: "In a globalised world, the only option is to stand together. All countries should really make sure that we stand together." Meanwhile, Italy—the country worst affected in Europe—reported 41 new COVID-19 deaths in just 24 hours. The country's civil protection agency said on Thursday evening that 3,858 people had been infected and 148 had died.
  """
}

GET news/_doc/1

上面的这个例子就不贴返回值了,大家自己试试。

另外一个例子

那回到最开始的场景,如果要根据检测结果来分别存储文本到不同的字段,怎么做呢,这里贴一下官网博客的例子:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "inference": {
          "model_id": "lang_ident_model_1",
          "inference_config": {
            "classification": {
              "num_top_classes": 1
            }
          },
          "field_mappings": {
            "contents": "text"
          },
          "target_field": "_ml.lang_ident"
        }
      },
      {
        "rename": {
          "field": "contents",
          "target_field": "contents.default"
        }
      },
      {
        "rename": {
          "field": "_ml.lang_ident.predicted_value",
          "target_field": "contents.language"
        }
      },
      {
        "script": {
          "lang": "painless",
          "source": "ctx.contents.supported = (['de', 'en', 'ja', 'ko', 'zh'].contains(ctx.contents.language))"
        }
      },
      {
        "set": {
          "if": "ctx.contents.supported",
          "field": "contents.{{contents.language}}",
          "value": "{{contents.default}}",
          "override": false
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "contents": "Das leben ist kein Ponyhof"
      }
    },
    {
      "_source": {
        "contents": "The rain in Spain stays mainly in the plains"
      }
    },
    {
      "_source": {
        "contents": "オリンピック大会"
      }
    },
    {
      "_source": {
        "contents": "로마는 하루아침에 이루어진 것이 아니다"
      }
    },
    {
      "_source": {
        "contents": "授人以鱼不如授人以渔"
      }
    },
    {
      "_source": {
        "contents": "Qui court deux lievres a la fois, n’en prend aucun"
      }
    },
    {
      "_source": {
        "contents": "Lupus non timet canem latrantem"
      }
    },
    {
      "_source": {
        "contents": "This is mostly English but has a touch of Latin since we often just say, Carpe diem"
      }
    }
  ]
}

返回结果:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "contents" : {
            "de" : "Das leben ist kein Ponyhof",
            "default" : "Das leben ist kein Ponyhof",
            "language" : "de",
            "supported" : true
          },
          "_ml" : {
            "lang_ident" : {
              "top_classes" : [
                {
                  "class_name" : "de",
                  "class_probability" : 0.9996006023972855,
                  "class_score" : 0.9996006023972855
                }
              ],
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:31:36.211596Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "contents" : {
            "en" : "The rain in Spain stays mainly in the plains",
            "default" : "The rain in Spain stays mainly in the plains",
            "language" : "en",
            "supported" : true
          },
          "_ml" : {
            "lang_ident" : {
              "top_classes" : [
                {
                  "class_name" : "en",
                  "class_probability" : 0.9988809847231199,
                  "class_score" : 0.9988809847231199
                }
              ],
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:31:36.211611Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "contents" : {
            "default" : "オリンピック大会",
            "language" : "ja",
            "ja" : "オリンピック大会",
            "supported" : true
          },
          "_ml" : {
            "lang_ident" : {
              "top_classes" : [
                {
                  "class_name" : "ja",
                  "class_probability" : 0.9993823252841599,
                  "class_score" : 0.9993823252841599
                }
              ],
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:31:36.211618Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "contents" : {
            "default" : "로마는 하루아침에 이루어진 것이 아니다",
            "language" : "ko",
            "ko" : "로마는 하루아침에 이루어진 것이 아니다",
            "supported" : true
          },
          "_ml" : {
            "lang_ident" : {
              "top_classes" : [
                {
                  "class_name" : "ko",
                  "class_probability" : 0.9999939196272863,
                  "class_score" : 0.9999939196272863
                }
              ],
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:31:36.211624Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "contents" : {
            "default" : "授人以鱼不如授人以渔",
            "language" : "zh",
            "zh" : "授人以鱼不如授人以渔",
            "supported" : true
          },
          "_ml" : {
            "lang_ident" : {
              "top_classes" : [
                {
                  "class_name" : "zh",
                  "class_probability" : 0.9999810103320087,
                  "class_score" : 0.9999810103320087
                }
              ],
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:31:36.211629Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "contents" : {
            "default" : "Qui court deux lievres a la fois, n’en prend aucun",
            "language" : "fr",
            "supported" : false
          },
          "_ml" : {
            "lang_ident" : {
              "top_classes" : [
                {
                  "class_name" : "fr",
                  "class_probability" : 0.9999669852240882,
                  "class_score" : 0.9999669852240882
                }
              ],
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:31:36.211635Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "contents" : {
            "default" : "Lupus non timet canem latrantem",
            "language" : "la",
            "supported" : false
          },
          "_ml" : {
            "lang_ident" : {
              "top_classes" : [
                {
                  "class_name" : "la",
                  "class_probability" : 0.614050940088811,
                  "class_score" : 0.614050940088811
                }
              ],
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:31:36.21164Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "contents" : {
            "en" : "This is mostly English but has a touch of Latin since we often just say, Carpe diem",
            "default" : "This is mostly English but has a touch of Latin since we often just say, Carpe diem",
            "language" : "en",
            "supported" : true
          },
          "_ml" : {
            "lang_ident" : {
              "top_classes" : [
                {
                  "class_name" : "en",
                  "class_probability" : 0.9997901768317939,
                  "class_score" : 0.9997901768317939
                }
              ],
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:31:36.211646Z"
        }
      }
    }
  ]
}

可以看到 Ingest Processor 非常灵活,且功能强大,所有的相关操作都可以在 Ingest processor 里面进行处理,再结合脚本做一下规则判断,对原始的字段重命名即可满足我们的文档处理需求。

小结

今天我们聊了聊 Language Identity 这个功能,也聊了聊 Ingest Pipeline 的使用,怎么样,这个功能是不是很赞呀,如果有类似使用场景的朋友,可以自己试试看。另外值得注意的是,如果文本长度太小可能会识别不准,CLD3 设计的文本长度要超过 200 个字符。

相关链接

继续阅读 »

Elasticsearch 新近发布的 7.6 版本里面包含了很多激动人心的功能,而最让我感兴趣的是利用机器学习来自动检测语言的功能。

功能初探

检测文本语言本身不是什么稀奇事,之前做爬虫的时候,就做过对网页正文进行语言的检测,有很多成熟的方案,而最好的就属 Google Chrome 团队开源的 CLD 系列(全名:Compact Language Detector)了,能够检测多达 80 种各种语言,我用过CLD2,是基于 C++ 贝叶斯分类器实现的,而 CLD3 则是基于神经网络实现的,无疑更加准确,这次 Elasticsearch 将这个非常小的功能也直接集成到了默认的发行包里面,对于使用者来说可以说是带来很大的方便。

多语言的痛点

相信很多朋友,在实际的业务场景中,对碰到过一个字段同时存在多个语种的文本内容的情况,尤其是出海的产品,比如类似大众点评的 APP 吧,一个餐馆下面,来自七洲五湖四海的朋友都来品尝过了,自然要留下点评语是不,德国的朋友使用的是德语,法国的朋友使用的是法语,广州的朋友用的是粤语,那对于开发这个 APP 的后台工程师可就犯难了,如果这些评论都存在一个字段里面,就不好设置一个统一的分词器,因为不同的语言他们的切分规则肯定是不一样的,最简单的例子,比如中文和英文,设置的分词不对,查询结果就会不精准。

相信也有很多人用过这样的解决方案,既然一个字段搞不定,那就把这个字段复制几份,英文字段存一份,中文字段存一份,用 MultiField 来做,这样虽然可以解决一部分问题,但是同样会带来容量和资源的浪费,和查询时候具体该选择哪个字段来参与查询的挑战。

而利用 7.6 的这个新功能,可以在创建索引的时候,可以自动的根据内容进行推理,从而影响索引文档的构成,进而做到特定的文本进特定的字段,从而提升查询体验和性能,关于这个功能,Elastic 官网这里也有一篇博客2,提供了详细的例子。

实战上手

看上去不错,但是鲁迅说过,网上得来终觉浅,觉知此事要躬行,来, 今天一起跑一遍看看具体怎么个用法。

功能剖析

首先,这个功能叫 Language identification,是机器学习的一个 Feature,但是不能单独使用,要结合 Ingest Node 的一个 inference ingest processor 来使用,Ingest processor 是在 Elasticsearch 里面运行的数据预处理器,部分功能类似于 Logstash 的数据解析,对于简单数据操作场景,完全可以替代掉 Logstash,简化部署架构。

Elasticsearch 在 7.6 的包里面,默认打包了提前训练好的机器学习模型,就是 Language identification 需要调用的语言检测模型,名称是固定的 lang_ident_model_1,这也是 Elasticsearch 自带的第一个模型包,大家了解一下就好。

那这个模型包在什么位置呢,我们来解刨一下:

$unzip /usr/share/elasticsearch/modules/x-pack-ml/x-pack-ml-7.6.0.jar 
$/org/elasticsearch/xpack/ml/inference$ tree
.
|-- ingest
|   |-- InferenceProcessor$Factory.class
|   `-- InferenceProcessor.class
|-- loadingservice
|   |-- LocalModel$1.class
|   |-- LocalModel.class
|   |-- Model.class
|   `-- ModelLoadingService.class
`-- persistence
    |-- InferenceInternalIndex.class
    |-- TrainedModelDefinitionDoc$1.class
    |-- TrainedModelDefinitionDoc$Builder.class
    |-- TrainedModelDefinitionDoc.class
    |-- TrainedModelProvider.class
    `-- lang_ident_model_1.json

3 directories, 12 files

可以看到,在 persistence 目录就有这个模型包,是 json 格式的,里面有个压缩的二进制编码后的字段。

Jietu20200306-234807.png

查看模型信息

我们还可以通过新的 API 来获取这个模型信息,以后模型多了之后会比较有用:

GET _ml/inference/lang_ident_model_1
{
  "count" : 1,
  "trained_model_configs" : [
    {
      "model_id" : "lang_ident_model_1",
      "created_by" : "_xpack",
      "version" : "7.6.0",
      "description" : "Model used for identifying language from arbitrary input text.",
      "create_time" : 1575548914594,
      "tags" : [
        "lang_ident",
        "prepackaged"
      ],
      "input" : {
        "field_names" : [
          "text"
        ]
      },
      "estimated_heap_memory_usage_bytes" : 1053992,
      "estimated_operations" : 39629,
      "license_level" : "basic"
    }
  ]
}

Ingest Pipeline 模拟测试

好了,基本的了解就到这里了,我们开始动手吧,既然要和 Ingest 结合使用,自然免不了要定义 Ingest Pipeline,也就是说定一个解析规则,索引的时候会调用这个规则来处理输入的索引文档。Ingest Pipeline 的调试是个问题,好在Ingest 提供了模拟调用的方法,我们测试一下:


POST _ingest/pipeline/_simulate
{
   "pipeline":{
      "processors":[
         {
            "inference":{
               "model_id":"lang_ident_model_1", 
               "inference_config":{
                  "classification":{
                     "num_top_classes":5 
                  }
               },
               "field_mappings":{

               }
            }
         }
      ]
   },
   "docs":[
      {
         "_source":{ 
            "text":"新冠病毒让你在家好好带着,你服不服"
         }
      }
   ]
}

上面是借助 Ingest 的推理 Process 来模拟调用这个机器学习模型进行文本判断的方法,第一部分是设置 processor 的定义,设置了一个 inference processor,也就是要进行语言模型的检测,第二部分 docs 则是输入了一个 json 文档,作为测试的输入源,运行结果如下:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "text" : "新冠病毒让你在家好好带着,你服不服",
          "ml" : {
            "inference" : {
              "top_classes" : [
                {
                  "class_name" : "zh",
                  "class_probability" : 0.9999872511022145,
                  "class_score" : 0.9999872511022145
                },
                {
                  "class_name" : "ja",
                  "class_probability" : 1.061491174235718E-5,
                  "class_score" : 1.061491174235718E-5
                },
                {
                  "class_name" : "hy",
                  "class_probability" : 6.304673023324264E-7,
                  "class_score" : 6.304673023324264E-7
                },
                {
                  "class_name" : "ta",
                  "class_probability" : 4.1374037676410867E-7,
                  "class_score" : 4.1374037676410867E-7
                },
                {
                  "class_name" : "te",
                  "class_probability" : 2.0709260170937159E-7,
                  "class_score" : 2.0709260170937159E-7
                }
              ],
              "predicted_value" : "zh",
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T15:58:44.783736Z"
        }
      }
    }
  ]
}

可以看到,第一条返回结果,zh 表示中文语言类型,可能性为 0.9999872511022145,基本上无限接近肯定了,这个是中文文本,而第二位和剩下的就明显得分比较低了,如果你看到是他们的得分开头是 1.x 和 6.x 等,是不是觉得,不对啊,后面的得分怎么反而大一些,哈哈,你仔细看会发现它后面其实还有 -E 啥的尾巴呢,这个是科学计数法,其实数值远远小于 0。

创建一个 Pipeline

简单模拟倒是证明这个功能 work 了,那具体怎么使用,一起看看吧。

首先创建一个 Pipeline:


PUT _ingest/pipeline/lang_detect_add_tag
{
  "description": "检测文本,添加语种标签",
  "processors": [
    {
      "inference": {
        "model_id": "lang_ident_model_1",
        "inference_config": {
          "classification": {
            "num_top_classes": 2
          }
        },
        "field_mappings": {
            "contents": "text"
        }
      }
    },
    {
      "set": {
        "field": "tag",
        "value": "{{ml.inference.predicted_value}}"
      }
    }
  ]
}

可以看到,我们定义了一个 ID 为 lang_detect_add_tag 的 Ingest Pipeline,并且我们设置了这个推理模型的参数,只返回 2 个分类结果,和设置了 content 字段作为检测对象。同时,我们还定义了一个新的 set processor,这个的意思是设置一个名为 tag 的字段,它的值是来自于一个其它的字段的变量引用,也就是把检测到的文本对应的语种存成一个标签字段。

测试这个 Pipeline

这个 Pipeline 创建完之后,我们同样可以对这个 Pipeline 进行模拟测试,模拟的好处是不会实际创建索引,方便调试。

POST /_ingest/pipeline/lang_detect_add_tag/_simulate
{
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "contents": "巴林境内新型冠状病毒肺炎确诊病例累计达56例"
      }
    },
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "contents": "Watch live: WHO gives a coronavirus update as global cases top 100,000"
      }
    }
  ]
}

返回结果:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "index",
        "_type" : "_doc",
        "_id" : "id",
        "_source" : {
          "tag" : "zh",
          "contents" : "巴林境内新型冠状病毒肺炎确诊病例累计达56例",
          "ml" : {
            "inference" : {
              "top_classes" : [
                {
                  "class_name" : "zh",
                  "class_probability" : 0.999812378112116,
                  "class_score" : 0.999812378112116
                },
                {
                  "class_name" : "ja",
                  "class_probability" : 1.8175264877915687E-4,
                  "class_score" : 1.8175264877915687E-4
                }
              ],
              "predicted_value" : "zh",
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:21:26.981249Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "index",
        "_type" : "_doc",
        "_id" : "id",
        "_source" : {
          "tag" : "en",
          "contents" : "Watch live: WHO gives a coronavirus update as global cases top 100,000",
          "ml" : {
            "inference" : {
              "top_classes" : [
                {
                  "class_name" : "en",
                  "class_probability" : 0.9896669173070857,
                  "class_score" : 0.9896669173070857
                },
                {
                  "class_name" : "tg",
                  "class_probability" : 0.0033122788575614993,
                  "class_score" : 0.0033122788575614993
                }
              ],
              "predicted_value" : "en",
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:21:26.981261Z"
        }
      }
    }
  ]
}

继续完善 Pipeline

可以看到,两个文档分别都正确识别了语种,并且创建了对应的 tag 字段,不过这个时候,文档里面的 ml 对象字段,就显得有点多余了,可以使用 remove processor 来删除这个字段。

PUT _ingest/pipeline/lang_detect_add_tag
{
  "description": "检测文本,添加语种标签",
  "processors": [
    {
      "inference": {
        "model_id": "lang_ident_model_1",
        "inference_config": {
          "classification": {
            "num_top_classes": 2
          }
        },
        "field_mappings": {
          "contents": "text"
        }
      }
    },
    {
      "set": {
        "field": "tag",
        "value": "{{ml.inference.predicted_value}}"
      }
    },
    {
      "remove": {
        "field": "ml"
      }
    }
  ]
}

索引文档并调用 Pipeline

那索引的时候,怎么使用这个 Pipeline 呢,看下面的例子:


POST news/_doc/1?pipeline=lang_detect_add_tag
{
  "contents":"""
  On Friday, he added: "In a globalised world, the only option is to stand together. All countries should really make sure that we stand together." Meanwhile, Italy—the country worst affected in Europe—reported 41 new COVID-19 deaths in just 24 hours. The country's civil protection agency said on Thursday evening that 3,858 people had been infected and 148 had died.
  """
}

GET news/_doc/1

上面的这个例子就不贴返回值了,大家自己试试。

另外一个例子

那回到最开始的场景,如果要根据检测结果来分别存储文本到不同的字段,怎么做呢,这里贴一下官网博客的例子:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "inference": {
          "model_id": "lang_ident_model_1",
          "inference_config": {
            "classification": {
              "num_top_classes": 1
            }
          },
          "field_mappings": {
            "contents": "text"
          },
          "target_field": "_ml.lang_ident"
        }
      },
      {
        "rename": {
          "field": "contents",
          "target_field": "contents.default"
        }
      },
      {
        "rename": {
          "field": "_ml.lang_ident.predicted_value",
          "target_field": "contents.language"
        }
      },
      {
        "script": {
          "lang": "painless",
          "source": "ctx.contents.supported = (['de', 'en', 'ja', 'ko', 'zh'].contains(ctx.contents.language))"
        }
      },
      {
        "set": {
          "if": "ctx.contents.supported",
          "field": "contents.{{contents.language}}",
          "value": "{{contents.default}}",
          "override": false
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "contents": "Das leben ist kein Ponyhof"
      }
    },
    {
      "_source": {
        "contents": "The rain in Spain stays mainly in the plains"
      }
    },
    {
      "_source": {
        "contents": "オリンピック大会"
      }
    },
    {
      "_source": {
        "contents": "로마는 하루아침에 이루어진 것이 아니다"
      }
    },
    {
      "_source": {
        "contents": "授人以鱼不如授人以渔"
      }
    },
    {
      "_source": {
        "contents": "Qui court deux lievres a la fois, n’en prend aucun"
      }
    },
    {
      "_source": {
        "contents": "Lupus non timet canem latrantem"
      }
    },
    {
      "_source": {
        "contents": "This is mostly English but has a touch of Latin since we often just say, Carpe diem"
      }
    }
  ]
}

返回结果:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "contents" : {
            "de" : "Das leben ist kein Ponyhof",
            "default" : "Das leben ist kein Ponyhof",
            "language" : "de",
            "supported" : true
          },
          "_ml" : {
            "lang_ident" : {
              "top_classes" : [
                {
                  "class_name" : "de",
                  "class_probability" : 0.9996006023972855,
                  "class_score" : 0.9996006023972855
                }
              ],
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:31:36.211596Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "contents" : {
            "en" : "The rain in Spain stays mainly in the plains",
            "default" : "The rain in Spain stays mainly in the plains",
            "language" : "en",
            "supported" : true
          },
          "_ml" : {
            "lang_ident" : {
              "top_classes" : [
                {
                  "class_name" : "en",
                  "class_probability" : 0.9988809847231199,
                  "class_score" : 0.9988809847231199
                }
              ],
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:31:36.211611Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "contents" : {
            "default" : "オリンピック大会",
            "language" : "ja",
            "ja" : "オリンピック大会",
            "supported" : true
          },
          "_ml" : {
            "lang_ident" : {
              "top_classes" : [
                {
                  "class_name" : "ja",
                  "class_probability" : 0.9993823252841599,
                  "class_score" : 0.9993823252841599
                }
              ],
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:31:36.211618Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "contents" : {
            "default" : "로마는 하루아침에 이루어진 것이 아니다",
            "language" : "ko",
            "ko" : "로마는 하루아침에 이루어진 것이 아니다",
            "supported" : true
          },
          "_ml" : {
            "lang_ident" : {
              "top_classes" : [
                {
                  "class_name" : "ko",
                  "class_probability" : 0.9999939196272863,
                  "class_score" : 0.9999939196272863
                }
              ],
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:31:36.211624Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "contents" : {
            "default" : "授人以鱼不如授人以渔",
            "language" : "zh",
            "zh" : "授人以鱼不如授人以渔",
            "supported" : true
          },
          "_ml" : {
            "lang_ident" : {
              "top_classes" : [
                {
                  "class_name" : "zh",
                  "class_probability" : 0.9999810103320087,
                  "class_score" : 0.9999810103320087
                }
              ],
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:31:36.211629Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "contents" : {
            "default" : "Qui court deux lievres a la fois, n’en prend aucun",
            "language" : "fr",
            "supported" : false
          },
          "_ml" : {
            "lang_ident" : {
              "top_classes" : [
                {
                  "class_name" : "fr",
                  "class_probability" : 0.9999669852240882,
                  "class_score" : 0.9999669852240882
                }
              ],
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:31:36.211635Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "contents" : {
            "default" : "Lupus non timet canem latrantem",
            "language" : "la",
            "supported" : false
          },
          "_ml" : {
            "lang_ident" : {
              "top_classes" : [
                {
                  "class_name" : "la",
                  "class_probability" : 0.614050940088811,
                  "class_score" : 0.614050940088811
                }
              ],
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:31:36.21164Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "contents" : {
            "en" : "This is mostly English but has a touch of Latin since we often just say, Carpe diem",
            "default" : "This is mostly English but has a touch of Latin since we often just say, Carpe diem",
            "language" : "en",
            "supported" : true
          },
          "_ml" : {
            "lang_ident" : {
              "top_classes" : [
                {
                  "class_name" : "en",
                  "class_probability" : 0.9997901768317939,
                  "class_score" : 0.9997901768317939
                }
              ],
              "model_id" : "lang_ident_model_1"
            }
          }
        },
        "_ingest" : {
          "timestamp" : "2020-03-06T16:31:36.211646Z"
        }
      }
    }
  ]
}

可以看到 Ingest Processor 非常灵活,且功能强大,所有的相关操作都可以在 Ingest processor 里面进行处理,再结合脚本做一下规则判断,对原始的字段重命名即可满足我们的文档处理需求。

小结

今天我们聊了聊 Language Identity 这个功能,也聊了聊 Ingest Pipeline 的使用,怎么样,这个功能是不是很赞呀,如果有类似使用场景的朋友,可以自己试试看。另外值得注意的是,如果文本长度太小可能会识别不准,CLD3 设计的文本长度要超过 200 个字符。

相关链接

收起阅读 »

使用Elasticsearch实现同段和同句搜索

同句搜索要求搜索多个关键词时,返回的文章不只要包含关键词,而且这些关键词必须在同一句中。
同段搜素类似,只是范围为同一段落。

SpanQuery

同段、同句搜索,使用常用的term、match查询,没有找到办法可以实现。
Elasticsearch提供了SpanQuery,官方文档中如下的介绍:

Span queries are low-level positional queries which provide expert control over the order and proximity of the specified terms. These are typically used to implement very specific queries on legal documents or patents.

上面提到,SpanQuery常常应用在法律或专利的特定搜索。这些领域,常常提供同段 /同句搜索 。
下面我们看一下三种类型的SpanQuery,能否实现我们的需求:

准备数据

PUT article

POST article/_mapping
{
  "properties": {
    "maincontent": {
      "type": "text"
    }
  }
}

POST article/_doc/1
{
   "maincontent":"the quick red fox jumps over the sleepy cat"
}

POST article/_doc/2
{
   "maincontent":"the quick brown fox jumps over the lazy dog"
}

SpanTermQuery

SpanTermQuery 和 Term Query类似, 下面的查询会返回_id为1的doc。 the quick red fox jumps over the sleepy cat

POST article/_search
{
  "profile": "true",
  "query": {
    "span_term": {
      "maincontent": {
        "value": "red"
      }
    }
  }
}

SpanNearQuery

SpanNearQuery 表示邻近搜索,查找多个term是否邻近,slop可以设置邻近距离,如果设置为0,那么代表两个term是挨着的,相当于matchphase in_order参数,代表文档中的term和查询设置的term保持相同的顺序。

POST article/_search
{
  "query": {
    "span_near": {
      "clauses": [
        {
          "span_term": {
            "maincontent": {
              "value": "quick"
            }
          }
        },
        {
          "span_term": {
            "maincontent": {
              "value": "brown"
            }
          }
        }
      ],
      "slop": 0,
      "in_order": true
    }
  }
}

上面的查询会返回_id为2的doc。

the quick brown fox jumps over the lazy dog

SpanNotQuery

SpanNotQuery非常重要,它要求两个SpanQuery的跨度,不能够重合。
看下面的例子:

  • include: 匹配的SpanQuery,例子为需要一个包含quick和fox两个词的邻近搜索。
  • exclude:设置一个SpanQuery,要求include中的SpanQuery不能包含这个SpanQuery
    POST article/_search
    {
    "query": {
    "span_not": {
      "include": {
        "span_near": {
          "clauses": [
            {
              "span_term": {
                "maincontent": {
                  "value": "quick"
                }
              }
            },
            {
              "span_term": {
                "maincontent": {
                  "value": "fox"
                }
              }
            }
          ],
          "slop": 1,
          "in_order": true
        }
      },
      "exclude": {
        "span_term": {
          "maincontent": {
            "value": "red"
          }
        }
      }
    }
    }
    }

    上面的查询会返回_id为2的doc。
    因为_id为1的文档,虽然quick red fox符合include中的SpanQuery,但是red也符合exclude中的SpanQuery。因此,这篇文章需要排除掉。 the quick red fox jumps over the sleepy cat

同句/同段搜索原理

同句搜索,反向来说,就是搜索词不能够跨句。再进一步,就是搜索词之间不能够有等其他标点符号。
其对应的查询类似如下:

POST article/_search
{
  "query": {
    "span_not": {
      "include": {
        "span_near": {
          "clauses": [
            {
              "span_term": {
                "maincontent": {
                  "value": "word1"
                }
              }
            },
            {
              "span_term": {
                "maincontent": {
                  "value": "word2"
                }
              }
            }
          ],
          "slop": 1,
          "in_order": true
        }
      },
      "exclude": {
        "span_term": {
          "maincontent": {
            "value": "。/?/!"
          }
        }
      }
    }
  }
}

同段搜素类似,对应分隔符变为\n,或者<p>,</p>

同段/同句搜索实现

文本为HTML格式

创建索引

PUT sample1
{
  "settings": {
    "number_of_replicas": 0,
    "number_of_shards": 1,
    "analysis": {
      "analyzer": {
        "maincontent_analyzer": {
          "type": "custom",
          "char_filter": [
            "sentence_paragrah_mapping",
            "html_strip"
          ],
          "tokenizer": "ik_max_word"
        }
      },
      "char_filter": {
        "sentence_paragrah_mapping": {
          "type": "mapping",
          "mappings": [
            """<h1> => \u0020paragraph\u0020""",
            """</h1> => \u0020sentence\u0020paragraph\u0020 """,
            """<h2> => \u0020paragraph\u0020""",
            """</h2> => \u0020sentence\u0020paragraph\u0020 """,
            """<p> => \u0020paragraph\u0020""",
            """</p> => \u0020sentence\u0020paragraph\u0020 """,
            """! => \u0020sentence\u0020 """,
            """? => \u0020sentence\u0020 """,
            """。 => \u0020sentence\u0020 """,
            """? => \u0020sentence\u0020 """,
            """! => \u0020sentence\u0020"""
          ]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "mainContent": {
        "type": "text",
        "analyzer": "maincontent_analyzer",
        "search_analyzer": "ik_smart"
      }
    }
  }
}

我们创建了一个名称为sentence_paragrah_mapping的char filter,它的目的有两个:

  • 替换p,h1,h2标签为统一的分段符:paragraph
  • 替换中英文 ,, 标点符号为统一的分页符:sentence

有几个细节,需要说明:

  • paragraphsentence前后都需要添加空格,并且需要使用Unicode \u0020表示空格。

    # 期望
    hello world! => hello world sentence
    # 不合理的配置,可能会出现下面的情况
    hello world! => hello worldsentence
  • </p>,</h1>,</h2>的结尾标签需要添加paragraphsentence两个分隔符,避免结尾没有标点符号的情况
# 期望
<h1>hello world</h1> <p>hello china</p> => paragraph hello world sentence paragraph hello china sentence

# </p>,</h1>,</h2>只使用paragraph替换的结果
# 此时 hello world hello china 为同句
<h1>hello world</h1> <p>hello china</p> => paragraph hello world  paragraph hello china sentence

# 上面配置结果有些冗余:有两个连续的paragraph 
# 如果能保证HTML文本都符合标准,可以只替换</p>,</h1>,</h2>,不替换<p>,<h1>,<h2>
<h1>hello world</h1> <p>hello china</p> => paragraph hello world sentence paragraph paragraph hello china sentence
  • 注意sentence_paragrah_mapping和html_strip的配置顺序

插入测试数据

POST sample1/_doc/1
{
  "mainContent":"<p>java python javascript</p><p>oracle mysql sqlserver</p>"
} 

# 测试分词
POST sample1/_analyze
{
  "text": ["<p>java python javascript</p><p>oracle mysql sqlserver</p>"],
  "analyzer": "maincontent_analyzer"
}

# 返回结果
{
  "tokens" : [
    {
      "token" : "paragraph",
      "start_offset" : 1,
      "end_offset" : 2,
      "type" : "ENGLISH",
      "position" : 0
    },
    {
      "token" : "java",
      "start_offset" : 3,
      "end_offset" : 7,
      "type" : "ENGLISH",
      "position" : 1
    },
    {
      "token" : "python",
      "start_offset" : 8,
      "end_offset" : 14,
      "type" : "ENGLISH",
      "position" : 2
    },
    {
      "token" : "javascript",
      "start_offset" : 15,
      "end_offset" : 25,
      "type" : "ENGLISH",
      "position" : 3
    },
    {
      "token" : "sentence",
      "start_offset" : 26,
      "end_offset" : 28,
      "type" : "ENGLISH",
      "position" : 4
    },
    {
      "token" : "paragraph",
      "start_offset" : 28,
      "end_offset" : 28,
      "type" : "ENGLISH",
      "position" : 5
    },
    {
      "token" : "paragraph",
      "start_offset" : 30,
      "end_offset" : 31,
      "type" : "ENGLISH",
      "position" : 6
    },
    {
      "token" : "oracle",
      "start_offset" : 32,
      "end_offset" : 38,
      "type" : "ENGLISH",
      "position" : 7
    },
    {
      "token" : "mysql",
      "start_offset" : 39,
      "end_offset" : 44,
      "type" : "ENGLISH",
      "position" : 8
    },
    {
      "token" : "sqlserver",
      "start_offset" : 45,
      "end_offset" : 54,
      "type" : "ENGLISH",
      "position" : 9
    },
    {
      "token" : "sentence",
      "start_offset" : 55,
      "end_offset" : 57,
      "type" : "ENGLISH",
      "position" : 10
    },
    {
      "token" : "paragraph",
      "start_offset" : 57,
      "end_offset" : 57,
      "type" : "ENGLISH",
      "position" : 11
    }
  ]
}

测试查询

  • 同段查询:java python
GET sample1/_search
{
  "query": {
    "span_not": {
      "include": {
        "span_near": {
          "clauses": [
            {
              "span_term": {
                "mainContent": {
                  "value": "java"
                }
              }
            },
            {
              "span_term": {
                "mainContent": {
                  "value": "python"
                }
              }
            }
          ],
          "slop": 12,
          "in_order": false
        }
      },
      "exclude": {
        "span_term": {
          "mainContent": {
            "value": "paragraph"
          }
        }
      }
    }
  }
}

//结果
{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 0.1655603,
    "hits" : [
      {
        "_index" : "sample1",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 0.1655603,
        "_source" : {
          "mainContent" : "<p>java python javascript</p><p>oracle mysql sqlserver</p>"
        }
      }
    ]
  }
}
  • 同段查询:java oracle
GET sample1/_search
{
  "query": {
    "span_not": {
      "include": {
        "span_near": {
          "clauses": [
            {
              "span_term": {
                "mainContent": {
                  "value": "java"
                }
              }
            },
            {
              "span_term": {
                "mainContent": {
                  "value": "oracle"
                }
              }
            }
          ],
          "slop": 12,
          "in_order": false
        }
      },
      "exclude": {
        "span_term": {
          "mainContent": {
            "value": "paragraph"
          }
        }
      }
    }
  }
}

#结果:没有文档返回
{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 0,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  }
}

纯文本格式

纯文本和HTML的区别是段落分割符不同,使用\n.

创建索引

PUT sample2
{
  "settings": {
    "number_of_replicas": 0,
    "number_of_shards": 1,
    "analysis": {
      "analyzer": {
        "maincontent_analyzer": {
          "type": "custom",
          "char_filter": [
            "sentence_paragrah_mapping"
          ],
          "tokenizer": "ik_max_word"
        }
      },
      "char_filter": {
        "sentence_paragrah_mapping": {
          "type": "mapping",
          "mappings": [
            """\n => \u0020sentence\u0020paragraph\u0020 """,
            """! => \u0020sentence\u0020 """,
            """? => \u0020sentence\u0020 """,
            """。 => \u0020sentence\u0020 """,
            """? => \u0020sentence\u0020 """,
            """! => \u0020sentence\u0020"""
          ]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "mainContent": {
        "type": "text",
        "analyzer": "maincontent_analyzer",
        "search_analyzer": "ik_smart"
      }
    }
  }
}

测试分词

POST sample2/_analyze
{
  "text": ["java python javascript\noracle mysql sqlserver"],
  "analyzer": "maincontent_analyzer"
}

# 结果
{
  "tokens" : [
    {
      "token" : "java",
      "start_offset" : 0,
      "end_offset" : 4,
      "type" : "ENGLISH",
      "position" : 0
    },
    {
      "token" : "python",
      "start_offset" : 5,
      "end_offset" : 11,
      "type" : "ENGLISH",
      "position" : 1
    },
    {
      "token" : "javascript",
      "start_offset" : 12,
      "end_offset" : 22,
      "type" : "ENGLISH",
      "position" : 2
    },
    {
      "token" : "sentence",
      "start_offset" : 22,
      "end_offset" : 22,
      "type" : "ENGLISH",
      "position" : 3
    },
    {
      "token" : "paragraph",
      "start_offset" : 22,
      "end_offset" : 22,
      "type" : "ENGLISH",
      "position" : 4
    },
    {
      "token" : "oracle",
      "start_offset" : 23,
      "end_offset" : 29,
      "type" : "ENGLISH",
      "position" : 5
    },
    {
      "token" : "mysql",
      "start_offset" : 30,
      "end_offset" : 35,
      "type" : "ENGLISH",
      "position" : 6
    },
    {
      "token" : "sqlserver",
      "start_offset" : 36,
      "end_offset" : 45,
      "type" : "ENGLISH",
      "position" : 7
    }
  ]
}
继续阅读 »

同句搜索要求搜索多个关键词时,返回的文章不只要包含关键词,而且这些关键词必须在同一句中。
同段搜素类似,只是范围为同一段落。

SpanQuery

同段、同句搜索,使用常用的term、match查询,没有找到办法可以实现。
Elasticsearch提供了SpanQuery,官方文档中如下的介绍:

Span queries are low-level positional queries which provide expert control over the order and proximity of the specified terms. These are typically used to implement very specific queries on legal documents or patents.

上面提到,SpanQuery常常应用在法律或专利的特定搜索。这些领域,常常提供同段 /同句搜索 。
下面我们看一下三种类型的SpanQuery,能否实现我们的需求:

准备数据

PUT article

POST article/_mapping
{
  "properties": {
    "maincontent": {
      "type": "text"
    }
  }
}

POST article/_doc/1
{
   "maincontent":"the quick red fox jumps over the sleepy cat"
}

POST article/_doc/2
{
   "maincontent":"the quick brown fox jumps over the lazy dog"
}

SpanTermQuery

SpanTermQuery 和 Term Query类似, 下面的查询会返回_id为1的doc。 the quick red fox jumps over the sleepy cat

POST article/_search
{
  "profile": "true",
  "query": {
    "span_term": {
      "maincontent": {
        "value": "red"
      }
    }
  }
}

SpanNearQuery

SpanNearQuery 表示邻近搜索,查找多个term是否邻近,slop可以设置邻近距离,如果设置为0,那么代表两个term是挨着的,相当于matchphase in_order参数,代表文档中的term和查询设置的term保持相同的顺序。

POST article/_search
{
  "query": {
    "span_near": {
      "clauses": [
        {
          "span_term": {
            "maincontent": {
              "value": "quick"
            }
          }
        },
        {
          "span_term": {
            "maincontent": {
              "value": "brown"
            }
          }
        }
      ],
      "slop": 0,
      "in_order": true
    }
  }
}

上面的查询会返回_id为2的doc。

the quick brown fox jumps over the lazy dog

SpanNotQuery

SpanNotQuery非常重要,它要求两个SpanQuery的跨度,不能够重合。
看下面的例子:

  • include: 匹配的SpanQuery,例子为需要一个包含quick和fox两个词的邻近搜索。
  • exclude:设置一个SpanQuery,要求include中的SpanQuery不能包含这个SpanQuery
    POST article/_search
    {
    "query": {
    "span_not": {
      "include": {
        "span_near": {
          "clauses": [
            {
              "span_term": {
                "maincontent": {
                  "value": "quick"
                }
              }
            },
            {
              "span_term": {
                "maincontent": {
                  "value": "fox"
                }
              }
            }
          ],
          "slop": 1,
          "in_order": true
        }
      },
      "exclude": {
        "span_term": {
          "maincontent": {
            "value": "red"
          }
        }
      }
    }
    }
    }

    上面的查询会返回_id为2的doc。
    因为_id为1的文档,虽然quick red fox符合include中的SpanQuery,但是red也符合exclude中的SpanQuery。因此,这篇文章需要排除掉。 the quick red fox jumps over the sleepy cat

同句/同段搜索原理

同句搜索,反向来说,就是搜索词不能够跨句。再进一步,就是搜索词之间不能够有等其他标点符号。
其对应的查询类似如下:

POST article/_search
{
  "query": {
    "span_not": {
      "include": {
        "span_near": {
          "clauses": [
            {
              "span_term": {
                "maincontent": {
                  "value": "word1"
                }
              }
            },
            {
              "span_term": {
                "maincontent": {
                  "value": "word2"
                }
              }
            }
          ],
          "slop": 1,
          "in_order": true
        }
      },
      "exclude": {
        "span_term": {
          "maincontent": {
            "value": "。/?/!"
          }
        }
      }
    }
  }
}

同段搜素类似,对应分隔符变为\n,或者<p>,</p>

同段/同句搜索实现

文本为HTML格式

创建索引

PUT sample1
{
  "settings": {
    "number_of_replicas": 0,
    "number_of_shards": 1,
    "analysis": {
      "analyzer": {
        "maincontent_analyzer": {
          "type": "custom",
          "char_filter": [
            "sentence_paragrah_mapping",
            "html_strip"
          ],
          "tokenizer": "ik_max_word"
        }
      },
      "char_filter": {
        "sentence_paragrah_mapping": {
          "type": "mapping",
          "mappings": [
            """<h1> => \u0020paragraph\u0020""",
            """</h1> => \u0020sentence\u0020paragraph\u0020 """,
            """<h2> => \u0020paragraph\u0020""",
            """</h2> => \u0020sentence\u0020paragraph\u0020 """,
            """<p> => \u0020paragraph\u0020""",
            """</p> => \u0020sentence\u0020paragraph\u0020 """,
            """! => \u0020sentence\u0020 """,
            """? => \u0020sentence\u0020 """,
            """。 => \u0020sentence\u0020 """,
            """? => \u0020sentence\u0020 """,
            """! => \u0020sentence\u0020"""
          ]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "mainContent": {
        "type": "text",
        "analyzer": "maincontent_analyzer",
        "search_analyzer": "ik_smart"
      }
    }
  }
}

我们创建了一个名称为sentence_paragrah_mapping的char filter,它的目的有两个:

  • 替换p,h1,h2标签为统一的分段符:paragraph
  • 替换中英文 ,, 标点符号为统一的分页符:sentence

有几个细节,需要说明:

  • paragraphsentence前后都需要添加空格,并且需要使用Unicode \u0020表示空格。

    # 期望
    hello world! => hello world sentence
    # 不合理的配置,可能会出现下面的情况
    hello world! => hello worldsentence
  • </p>,</h1>,</h2>的结尾标签需要添加paragraphsentence两个分隔符,避免结尾没有标点符号的情况
# 期望
<h1>hello world</h1> <p>hello china</p> => paragraph hello world sentence paragraph hello china sentence

# </p>,</h1>,</h2>只使用paragraph替换的结果
# 此时 hello world hello china 为同句
<h1>hello world</h1> <p>hello china</p> => paragraph hello world  paragraph hello china sentence

# 上面配置结果有些冗余:有两个连续的paragraph 
# 如果能保证HTML文本都符合标准,可以只替换</p>,</h1>,</h2>,不替换<p>,<h1>,<h2>
<h1>hello world</h1> <p>hello china</p> => paragraph hello world sentence paragraph paragraph hello china sentence
  • 注意sentence_paragrah_mapping和html_strip的配置顺序

插入测试数据

POST sample1/_doc/1
{
  "mainContent":"<p>java python javascript</p><p>oracle mysql sqlserver</p>"
} 

# 测试分词
POST sample1/_analyze
{
  "text": ["<p>java python javascript</p><p>oracle mysql sqlserver</p>"],
  "analyzer": "maincontent_analyzer"
}

# 返回结果
{
  "tokens" : [
    {
      "token" : "paragraph",
      "start_offset" : 1,
      "end_offset" : 2,
      "type" : "ENGLISH",
      "position" : 0
    },
    {
      "token" : "java",
      "start_offset" : 3,
      "end_offset" : 7,
      "type" : "ENGLISH",
      "position" : 1
    },
    {
      "token" : "python",
      "start_offset" : 8,
      "end_offset" : 14,
      "type" : "ENGLISH",
      "position" : 2
    },
    {
      "token" : "javascript",
      "start_offset" : 15,
      "end_offset" : 25,
      "type" : "ENGLISH",
      "position" : 3
    },
    {
      "token" : "sentence",
      "start_offset" : 26,
      "end_offset" : 28,
      "type" : "ENGLISH",
      "position" : 4
    },
    {
      "token" : "paragraph",
      "start_offset" : 28,
      "end_offset" : 28,
      "type" : "ENGLISH",
      "position" : 5
    },
    {
      "token" : "paragraph",
      "start_offset" : 30,
      "end_offset" : 31,
      "type" : "ENGLISH",
      "position" : 6
    },
    {
      "token" : "oracle",
      "start_offset" : 32,
      "end_offset" : 38,
      "type" : "ENGLISH",
      "position" : 7
    },
    {
      "token" : "mysql",
      "start_offset" : 39,
      "end_offset" : 44,
      "type" : "ENGLISH",
      "position" : 8
    },
    {
      "token" : "sqlserver",
      "start_offset" : 45,
      "end_offset" : 54,
      "type" : "ENGLISH",
      "position" : 9
    },
    {
      "token" : "sentence",
      "start_offset" : 55,
      "end_offset" : 57,
      "type" : "ENGLISH",
      "position" : 10
    },
    {
      "token" : "paragraph",
      "start_offset" : 57,
      "end_offset" : 57,
      "type" : "ENGLISH",
      "position" : 11
    }
  ]
}

测试查询

  • 同段查询:java python
GET sample1/_search
{
  "query": {
    "span_not": {
      "include": {
        "span_near": {
          "clauses": [
            {
              "span_term": {
                "mainContent": {
                  "value": "java"
                }
              }
            },
            {
              "span_term": {
                "mainContent": {
                  "value": "python"
                }
              }
            }
          ],
          "slop": 12,
          "in_order": false
        }
      },
      "exclude": {
        "span_term": {
          "mainContent": {
            "value": "paragraph"
          }
        }
      }
    }
  }
}

//结果
{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 0.1655603,
    "hits" : [
      {
        "_index" : "sample1",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 0.1655603,
        "_source" : {
          "mainContent" : "<p>java python javascript</p><p>oracle mysql sqlserver</p>"
        }
      }
    ]
  }
}
  • 同段查询:java oracle
GET sample1/_search
{
  "query": {
    "span_not": {
      "include": {
        "span_near": {
          "clauses": [
            {
              "span_term": {
                "mainContent": {
                  "value": "java"
                }
              }
            },
            {
              "span_term": {
                "mainContent": {
                  "value": "oracle"
                }
              }
            }
          ],
          "slop": 12,
          "in_order": false
        }
      },
      "exclude": {
        "span_term": {
          "mainContent": {
            "value": "paragraph"
          }
        }
      }
    }
  }
}

#结果:没有文档返回
{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 0,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  }
}

纯文本格式

纯文本和HTML的区别是段落分割符不同,使用\n.

创建索引

PUT sample2
{
  "settings": {
    "number_of_replicas": 0,
    "number_of_shards": 1,
    "analysis": {
      "analyzer": {
        "maincontent_analyzer": {
          "type": "custom",
          "char_filter": [
            "sentence_paragrah_mapping"
          ],
          "tokenizer": "ik_max_word"
        }
      },
      "char_filter": {
        "sentence_paragrah_mapping": {
          "type": "mapping",
          "mappings": [
            """\n => \u0020sentence\u0020paragraph\u0020 """,
            """! => \u0020sentence\u0020 """,
            """? => \u0020sentence\u0020 """,
            """。 => \u0020sentence\u0020 """,
            """? => \u0020sentence\u0020 """,
            """! => \u0020sentence\u0020"""
          ]
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "mainContent": {
        "type": "text",
        "analyzer": "maincontent_analyzer",
        "search_analyzer": "ik_smart"
      }
    }
  }
}

测试分词

POST sample2/_analyze
{
  "text": ["java python javascript\noracle mysql sqlserver"],
  "analyzer": "maincontent_analyzer"
}

# 结果
{
  "tokens" : [
    {
      "token" : "java",
      "start_offset" : 0,
      "end_offset" : 4,
      "type" : "ENGLISH",
      "position" : 0
    },
    {
      "token" : "python",
      "start_offset" : 5,
      "end_offset" : 11,
      "type" : "ENGLISH",
      "position" : 1
    },
    {
      "token" : "javascript",
      "start_offset" : 12,
      "end_offset" : 22,
      "type" : "ENGLISH",
      "position" : 2
    },
    {
      "token" : "sentence",
      "start_offset" : 22,
      "end_offset" : 22,
      "type" : "ENGLISH",
      "position" : 3
    },
    {
      "token" : "paragraph",
      "start_offset" : 22,
      "end_offset" : 22,
      "type" : "ENGLISH",
      "position" : 4
    },
    {
      "token" : "oracle",
      "start_offset" : 23,
      "end_offset" : 29,
      "type" : "ENGLISH",
      "position" : 5
    },
    {
      "token" : "mysql",
      "start_offset" : 30,
      "end_offset" : 35,
      "type" : "ENGLISH",
      "position" : 6
    },
    {
      "token" : "sqlserver",
      "start_offset" : 36,
      "end_offset" : 45,
      "type" : "ENGLISH",
      "position" : 7
    }
  ]
}
收起阅读 »

想实现存储与计算分离吗 -----京东ES+ChubaoFS是这样实现的

以下文章来源于微信公众号InfoQ Pro ,作者王行行 张丽颖

Elasticsearch 是一个开源的分布式 RElasticsearchTful 搜索引擎,作为一个分布式、可扩展、实时的搜索与数据分析引擎,它可以快速存储、搜索和分析大量数据。同时,Elasticsearch 也支持具有负责搜索功能和要求的应用程序的基础引擎, 因此可以应用在很多不同的场景中。

1 Elasticsearch 在京东的使用场景

由于较高的性能和较低的使用门槛,京东内部有很多的场景都在使用 Elasticsearch。 2015 年 6 月,京东着手开发了 Elasticsearch 的托管平台——杰思 (JElasticsearch)。杰思平台主要负责 Elasticsearch 集群的部署、运行监控、数据迁移、权限管理、插件开发、集群升级等日常维护工作。

目前杰思平台管理的集群覆盖了京东多条业务线,同时也覆盖了很多应用场景:

1.1 补充关系型数据库的结构化数据查询

主要应用的业务是商品、促销、优惠券、订单、收银台、物流、对账、评论等大数据量查询。此场景的核心诉求是高性能、稳定性和高可用性,部分场景会有检索要求,通常用于加速关系型数据库,业务系统通过 binlog 同步或业务双写完成数据同步。

1.2 全文检索功能

主要的应用场景是应用、安全、风控、交易等操作日志,以及京东部分品类商品搜索。此类日志化场景对写要求很高,查询性能及高可用等要求相对较低,大的业务写会达到数千万 / 秒,存储以 PB 为单位来计算。

这些场景对磁盘、内存有比较高的要求,因此,京东也做了相应优化,用于减少内存消耗,提升磁盘整体使用率,使用更廉价的磁盘来降低成本等等。

1.3 时数据分析引擎,形成统计报表

主要应用的业务是物流单的各种分析、订单数据分析、用户画像等。因为业务数据分析纬度较多,flink、storm 等流式分析对于某些报表场景不太适用,批处理实时性又成为问题,所以近实时分析的 Elasticsearch 就成为了这些业务的选择。

640.png

Image1:Elasticsearch +ChubaoFS 支持京东商城应用场景

在应用 Elasticsearch 的 5 年时间中,京东从最初的几个场景应用变成了覆盖各条业务线,从最初的几台机器变成了现在的上千机器和几千集群的量级,运维压力也随之而来了。目前,京东在日常运维 ELasticsearch 集群时,主要面临以下几个问题:

  • IO 读写不均匀,部分节点 IO 压力非常大;

  • 冷数据节点存储量受限制于单机的最大存储;

  • close 后的索引节点故障无法进行 recovery,导致数据丢失的风险。

为了解决这些问题,京东应用了 ChubaoFS。ChubaoFS 是京东自研的、为云原生应用提供高性能、高可用、可扩展、 稳定性的分布式文件系统,设计初衷是为了京东容器集群提供持久化存储方案,同时也可作为通用云存储供业务方使用,帮助有状态应用实现计算与存储分离。

ChubaoFS 支持多种读写模型,支持多租户,兼容 POSIX 语义和 S3 协议。ChubaoFS 设计的每个 pod 可以共享一个存储卷,或者每个 pod 一个存储卷,当容器所在的物理机宕机后,容器的数据可以随着容器被同时调度到其他宿主机上, 保障数据可靠存储。

6402.png

Image2: Elasticsearch+ChubaoFS=Decouping Compute from Storage

2 Elasticsearch 实例管理演进之路

京东的 Elasticsearch 实例管理也是一个不断摸索、不断爬坑的过程。

2.1 初始阶段

最初,京东 Elasticsearch 集群部署是完全没有架构可言的,集群配置也都采用默认配置,一台物理机启动多个 Elasticsearch 进程,进程间完全共享服务器资源,不同业务之间使用集群进行隔离,这种形式使用服务器 CPU 和内存得到了充分利用。

6401_(1).png

Image3:物理机部署

当系统运行了一段时间之后,这种部署方式的弊端开始显现出了。

  • 实例容易受到其他节点的影响,重要业务抖动问题没有有效方式避免。

  • 物理机内存被 cache 占用,新创建实例启动时耗时特别长。

  • 实例存储受单机磁盘容量限制,数据迁移时有发生。

2.2 容器隔离阶段

由于物理机直接部署 Elasticsearch,无法管理 CPU、内存,各个节点相互影响明显,甚至会影响到稳定性。所以,针对上述情况,京东做出了改善方案——调研资源隔离方式。

当时比较主流的资源隔离方式有两种,Docker 容器化和虚拟机。

2016 年时 Docker 容器化技术已成型,但虚拟技术比较成熟有大量工具、生态系统完善。相对于虚拟机的资源隔离,Docker 不需要实现硬件虚拟化,只是利用 cgroup 对资源进行限制,实际使用的仍然是物理机的资源,所以在资源使用率方面效率更高,我们经过测试使用 Docker 化后性能损失相对较小几乎可以忽略。

Docker 是资源限制,启动时不需要加载操作系统内核,可以毫秒级启动。启动对资源的消耗要低很多,可以做到快速的启停。另外由于是资源限制类,只限制最大使用量而不隔离最小,这样又可以做到虚拟化进行资源超买,提升资源使用率。

而在虚拟机的优势方面,例如安全性,京东采用了内部资源共享平台,通过流程管理或内部其它设施来弥补。这样一来,原本的完全资源隔离优势,反而成为了内部系统资源最大化利用的劣势。

因此,京东选择了当时相对不太成熟的容器化部署方式,并进行了服务器上 Elasticsearcht 资源隔离:

640_(2).png

Image4 Docker 部署图

  1. 内存完全隔离:

    • 数据 / 主数节点:默认按 jvm50%,预留一半给 Lucene 做 cache 使用。

    • 网关 / 主节点:内存容量 -2 做为 jvm 内存,预留 2G 给监控及其它服务。
  2. CPU 隔离:

    • 重要业务直接绑定 CPU,完全避免资源抢占。

    • 一般业务通过调整 cpu-sharElasticsearch、cpu-period、cpu-quota 进行 CPU 比例分配。
  3. IO 隔离:

    • 由于生产环境机器的多样性,磁盘 IO 本身差别很大,另外对 IO 的限制会造成 Elasticsearch 读写性能严重下降,出于只针对内部业务的考虑,京东并未对 IO 进行隔离。

    • 通过简单的容器隔离,CPU 抢占现象明显改善。内存完全隔离之后,生产环境中节点之间相互影响很少发生 (IO 差的机器会有 IO 争用),部署方式改造产生了应用的收益。

2.3 无状态实例阶段

随着业务的不断增长,集群数量及消耗的服务器资源成比例上升,京东 Elasticsearch 实例上升为上万个,维护的集群快速增长为上千个,集群规模从几个到几十个不等。

但是整体资源的利用率却相对较低,磁盘使用率仅为 28% 左右,日常平均读写 IO 在 10~20M/ 秒(日志分区 IO 在 60-100M / 秒)。造成资源浪费的原因是集群规模普遍较小,为保证突发情况下,读写请求对 IO 的要求,我们一般会为集群分配较为富余的资源,物理机分配的容器也会控制在一定量级。

我们做个假设,如果大量的服务器 IO 都可以共享,那么某个集群突发请求对 IO 的影响其实可以忽略的。基于这种假设以及对提高磁盘使用率的迫切需要,我们考虑引入了公司内部部署的 ChubaoFS 作为存储,将 Elasticsearch 作为无状态的实例进行存储计算分离。

得益于 ChubaoFS 是为大规模容器集群挂载而设计的通用文件系统,我们几乎是零成本接入的,只需在物理机上安装相应的客户端,就可以将 ChubaoFS 当成本地文件系统来用。集成之后我们对 ChubaoFS 的性能进行了一系列对比。

我们使用 elasticsearch benchmark 测试工具 Elasticsearchrally 分别对 Elasticsearch 使用本地磁盘和 ChubaoFS 进行 benchmark 测试,测试使用了 7 个 elasticsearch 节点,50 个 shard

Elasticsearchrally 测试参数如下:

Elasticsearchrally --pipeline=benchmark-only \--track=pmc \
--track-
params="number_of_replicas:${REPLICA_COUNT},number_of_shards:${SHARD_COUNT}" \--target-hosts=${TARGET_HOSTS} \--report-file=${report_file}

其中 REPLICA_COUNT 0、1、2 分别 代表不同的副本数;SHARD_COUNT 为 50。

从测试结果可以看出,Elasticsearch 集成 ChubaoFS 之后,在不同副本数情况下, index benchmark 性能和本地磁盘差距在 110%~120% 左右,仅有略微的下降;merge benchmark 性能在 replica > 0 时,Elasticsearch 使用 ChubaoFS 优于本地磁盘。refrElasticsearchh 和 flush benchmark 性能 ChubaoFS 不及本地磁盘。

640_(1).png

640_(3).png

640_(4).png

640_(5).png

3 目前使用效果

集成 ChubaoFS 之后,我们先是灰度运行了一段时间,效果表现良好之后,我们将京东日志所有的 Elasticsearch 集群底层全部切换为 ChubaoFS。切换之后,我们在这些方面获得了更好的效果:

3.1 节约资源

在采用 ChubaoFS 之前,我们使用了 500 台物理机器,并且每个机器平时大概有 80% 的磁盘 IO 能力处于闲置状态。采用 ChubaoFS 之后,ChubaoFS 的集群规模约为 50 台,Elasticsearch 托管到公司的容器平台,实现弹性可扩展。

3.2 管理和运维更加简单便捷

采用 ChubaoFS 之后,我们不用再担心某个机器的硬盘故障,或者某个机器的读写负载不均衡的问题。

3.3 GC 频率明显降低

由于 ChubaoFS 底层对文件作了副本支持,业务层 Elasticsearch 将副本置为 0,原先 segment 挤占堆内存导致 FullGC 现象明显,接入 ChubaoFS 后,GC 频率明显降低。

4 参考资料:

5 ChubaoFS 社区交流:

  • Twitter: @ChubaoFSMailing

  • list: chubaofs-maintainers@groups.io

  • Slack: chubaofs.slack.com

6 作者简介:

  1. 王行行: 京东零售计算存储平台架构部架构师,杰思平台 (京东 Elasticsearch) 团队负责人,2015 年加入京东,目前主要负责京东商城智能监控平台底层、杰思平台等基础设施建设。

  2. 张丽颖: CNCF Ambassador,京东零售计算存储部产品经理, 开源项目 ChubaoFS 的 contributor。
继续阅读 »

以下文章来源于微信公众号InfoQ Pro ,作者王行行 张丽颖

Elasticsearch 是一个开源的分布式 RElasticsearchTful 搜索引擎,作为一个分布式、可扩展、实时的搜索与数据分析引擎,它可以快速存储、搜索和分析大量数据。同时,Elasticsearch 也支持具有负责搜索功能和要求的应用程序的基础引擎, 因此可以应用在很多不同的场景中。

1 Elasticsearch 在京东的使用场景

由于较高的性能和较低的使用门槛,京东内部有很多的场景都在使用 Elasticsearch。 2015 年 6 月,京东着手开发了 Elasticsearch 的托管平台——杰思 (JElasticsearch)。杰思平台主要负责 Elasticsearch 集群的部署、运行监控、数据迁移、权限管理、插件开发、集群升级等日常维护工作。

目前杰思平台管理的集群覆盖了京东多条业务线,同时也覆盖了很多应用场景:

1.1 补充关系型数据库的结构化数据查询

主要应用的业务是商品、促销、优惠券、订单、收银台、物流、对账、评论等大数据量查询。此场景的核心诉求是高性能、稳定性和高可用性,部分场景会有检索要求,通常用于加速关系型数据库,业务系统通过 binlog 同步或业务双写完成数据同步。

1.2 全文检索功能

主要的应用场景是应用、安全、风控、交易等操作日志,以及京东部分品类商品搜索。此类日志化场景对写要求很高,查询性能及高可用等要求相对较低,大的业务写会达到数千万 / 秒,存储以 PB 为单位来计算。

这些场景对磁盘、内存有比较高的要求,因此,京东也做了相应优化,用于减少内存消耗,提升磁盘整体使用率,使用更廉价的磁盘来降低成本等等。

1.3 时数据分析引擎,形成统计报表

主要应用的业务是物流单的各种分析、订单数据分析、用户画像等。因为业务数据分析纬度较多,flink、storm 等流式分析对于某些报表场景不太适用,批处理实时性又成为问题,所以近实时分析的 Elasticsearch 就成为了这些业务的选择。

640.png

Image1:Elasticsearch +ChubaoFS 支持京东商城应用场景

在应用 Elasticsearch 的 5 年时间中,京东从最初的几个场景应用变成了覆盖各条业务线,从最初的几台机器变成了现在的上千机器和几千集群的量级,运维压力也随之而来了。目前,京东在日常运维 ELasticsearch 集群时,主要面临以下几个问题:

  • IO 读写不均匀,部分节点 IO 压力非常大;

  • 冷数据节点存储量受限制于单机的最大存储;

  • close 后的索引节点故障无法进行 recovery,导致数据丢失的风险。

为了解决这些问题,京东应用了 ChubaoFS。ChubaoFS 是京东自研的、为云原生应用提供高性能、高可用、可扩展、 稳定性的分布式文件系统,设计初衷是为了京东容器集群提供持久化存储方案,同时也可作为通用云存储供业务方使用,帮助有状态应用实现计算与存储分离。

ChubaoFS 支持多种读写模型,支持多租户,兼容 POSIX 语义和 S3 协议。ChubaoFS 设计的每个 pod 可以共享一个存储卷,或者每个 pod 一个存储卷,当容器所在的物理机宕机后,容器的数据可以随着容器被同时调度到其他宿主机上, 保障数据可靠存储。

6402.png

Image2: Elasticsearch+ChubaoFS=Decouping Compute from Storage

2 Elasticsearch 实例管理演进之路

京东的 Elasticsearch 实例管理也是一个不断摸索、不断爬坑的过程。

2.1 初始阶段

最初,京东 Elasticsearch 集群部署是完全没有架构可言的,集群配置也都采用默认配置,一台物理机启动多个 Elasticsearch 进程,进程间完全共享服务器资源,不同业务之间使用集群进行隔离,这种形式使用服务器 CPU 和内存得到了充分利用。

6401_(1).png

Image3:物理机部署

当系统运行了一段时间之后,这种部署方式的弊端开始显现出了。

  • 实例容易受到其他节点的影响,重要业务抖动问题没有有效方式避免。

  • 物理机内存被 cache 占用,新创建实例启动时耗时特别长。

  • 实例存储受单机磁盘容量限制,数据迁移时有发生。

2.2 容器隔离阶段

由于物理机直接部署 Elasticsearch,无法管理 CPU、内存,各个节点相互影响明显,甚至会影响到稳定性。所以,针对上述情况,京东做出了改善方案——调研资源隔离方式。

当时比较主流的资源隔离方式有两种,Docker 容器化和虚拟机。

2016 年时 Docker 容器化技术已成型,但虚拟技术比较成熟有大量工具、生态系统完善。相对于虚拟机的资源隔离,Docker 不需要实现硬件虚拟化,只是利用 cgroup 对资源进行限制,实际使用的仍然是物理机的资源,所以在资源使用率方面效率更高,我们经过测试使用 Docker 化后性能损失相对较小几乎可以忽略。

Docker 是资源限制,启动时不需要加载操作系统内核,可以毫秒级启动。启动对资源的消耗要低很多,可以做到快速的启停。另外由于是资源限制类,只限制最大使用量而不隔离最小,这样又可以做到虚拟化进行资源超买,提升资源使用率。

而在虚拟机的优势方面,例如安全性,京东采用了内部资源共享平台,通过流程管理或内部其它设施来弥补。这样一来,原本的完全资源隔离优势,反而成为了内部系统资源最大化利用的劣势。

因此,京东选择了当时相对不太成熟的容器化部署方式,并进行了服务器上 Elasticsearcht 资源隔离:

640_(2).png

Image4 Docker 部署图

  1. 内存完全隔离:

    • 数据 / 主数节点:默认按 jvm50%,预留一半给 Lucene 做 cache 使用。

    • 网关 / 主节点:内存容量 -2 做为 jvm 内存,预留 2G 给监控及其它服务。
  2. CPU 隔离:

    • 重要业务直接绑定 CPU,完全避免资源抢占。

    • 一般业务通过调整 cpu-sharElasticsearch、cpu-period、cpu-quota 进行 CPU 比例分配。
  3. IO 隔离:

    • 由于生产环境机器的多样性,磁盘 IO 本身差别很大,另外对 IO 的限制会造成 Elasticsearch 读写性能严重下降,出于只针对内部业务的考虑,京东并未对 IO 进行隔离。

    • 通过简单的容器隔离,CPU 抢占现象明显改善。内存完全隔离之后,生产环境中节点之间相互影响很少发生 (IO 差的机器会有 IO 争用),部署方式改造产生了应用的收益。

2.3 无状态实例阶段

随着业务的不断增长,集群数量及消耗的服务器资源成比例上升,京东 Elasticsearch 实例上升为上万个,维护的集群快速增长为上千个,集群规模从几个到几十个不等。

但是整体资源的利用率却相对较低,磁盘使用率仅为 28% 左右,日常平均读写 IO 在 10~20M/ 秒(日志分区 IO 在 60-100M / 秒)。造成资源浪费的原因是集群规模普遍较小,为保证突发情况下,读写请求对 IO 的要求,我们一般会为集群分配较为富余的资源,物理机分配的容器也会控制在一定量级。

我们做个假设,如果大量的服务器 IO 都可以共享,那么某个集群突发请求对 IO 的影响其实可以忽略的。基于这种假设以及对提高磁盘使用率的迫切需要,我们考虑引入了公司内部部署的 ChubaoFS 作为存储,将 Elasticsearch 作为无状态的实例进行存储计算分离。

得益于 ChubaoFS 是为大规模容器集群挂载而设计的通用文件系统,我们几乎是零成本接入的,只需在物理机上安装相应的客户端,就可以将 ChubaoFS 当成本地文件系统来用。集成之后我们对 ChubaoFS 的性能进行了一系列对比。

我们使用 elasticsearch benchmark 测试工具 Elasticsearchrally 分别对 Elasticsearch 使用本地磁盘和 ChubaoFS 进行 benchmark 测试,测试使用了 7 个 elasticsearch 节点,50 个 shard

Elasticsearchrally 测试参数如下:

Elasticsearchrally --pipeline=benchmark-only \--track=pmc \
--track-
params="number_of_replicas:${REPLICA_COUNT},number_of_shards:${SHARD_COUNT}" \--target-hosts=${TARGET_HOSTS} \--report-file=${report_file}

其中 REPLICA_COUNT 0、1、2 分别 代表不同的副本数;SHARD_COUNT 为 50。

从测试结果可以看出,Elasticsearch 集成 ChubaoFS 之后,在不同副本数情况下, index benchmark 性能和本地磁盘差距在 110%~120% 左右,仅有略微的下降;merge benchmark 性能在 replica > 0 时,Elasticsearch 使用 ChubaoFS 优于本地磁盘。refrElasticsearchh 和 flush benchmark 性能 ChubaoFS 不及本地磁盘。

640_(1).png

640_(3).png

640_(4).png

640_(5).png

3 目前使用效果

集成 ChubaoFS 之后,我们先是灰度运行了一段时间,效果表现良好之后,我们将京东日志所有的 Elasticsearch 集群底层全部切换为 ChubaoFS。切换之后,我们在这些方面获得了更好的效果:

3.1 节约资源

在采用 ChubaoFS 之前,我们使用了 500 台物理机器,并且每个机器平时大概有 80% 的磁盘 IO 能力处于闲置状态。采用 ChubaoFS 之后,ChubaoFS 的集群规模约为 50 台,Elasticsearch 托管到公司的容器平台,实现弹性可扩展。

3.2 管理和运维更加简单便捷

采用 ChubaoFS 之后,我们不用再担心某个机器的硬盘故障,或者某个机器的读写负载不均衡的问题。

3.3 GC 频率明显降低

由于 ChubaoFS 底层对文件作了副本支持,业务层 Elasticsearch 将副本置为 0,原先 segment 挤占堆内存导致 FullGC 现象明显,接入 ChubaoFS 后,GC 频率明显降低。

4 参考资料:

5 ChubaoFS 社区交流:

  • Twitter: @ChubaoFSMailing

  • list: chubaofs-maintainers@groups.io

  • Slack: chubaofs.slack.com

6 作者简介:

  1. 王行行: 京东零售计算存储平台架构部架构师,杰思平台 (京东 Elasticsearch) 团队负责人,2015 年加入京东,目前主要负责京东商城智能监控平台底层、杰思平台等基础设施建设。

  2. 张丽颖: CNCF Ambassador,京东零售计算存储部产品经理, 开源项目 ChubaoFS 的 contributor。
收起阅读 »

玩转Elasticsearch routing功能


Elasticsearch是一个搭建在Lucene搜索引擎库基础之上的搜索服务平台。它在单机的Lucene搜索引擎库基础之上增加了分布式设计,translog等特性,增强了搜索引擎的性能,高可用性,高可扩性等。
Elasticsearch分布式设计的基本思想是Elasticsearch集群由多个服务器节点组成,集群中的一个索引分为多个分片,每个分片可以分配在不同的节点上。其中每个分片都是一个单独的功能完成的Lucene实例,可以独立地进行写入和查询服务,ES中存储的数据分布在集群分片的一个或多个上,其结构简单描述为下图。

在上面的架构图中,集群由三个节点组成,每个节点上有两个分片,想要读写文档就必须知道文档被分配在哪个分片上,这也正是本文要讲的routing功能的作用。

1. 工作原理

1.1 routing参数

routing参数是一个可选参数,默认使用文档的_id值,可以用在INDEX, UPDATE,GET, SEARCH, DELETE等各种操作中。在写入(包括更新)时,用于计算文档所属分片,在查询(GET请求或指定了routing的查询)中用于限制查询范围,提高查询速度。

1.2 计算方法

ES中shardId的计算公式如下:

shardId = hash(_routing) % num_primary_shards

通过该公式可以保证使用相同routing的文档被分配到同一个shard上,当然在默认情况下使用_id作为routing起到将文档均匀分布到多个分片上防止数据倾斜的作用。

1.3 routing_partition_size参数

使用了routing参数可以让routing值相同的文档分配到同一个分片上,从而减少查询时需要查询的shard数,提高查询效率。但是使用该参数容易导致数据倾斜。为此,ES还提供了一个index.routing_partition_size参数(仅当使用routing参数时可用),用于将routing相同的文档映射到集群分片的一个子集上,这样一方面可以减少查询的分片数,另一方面又可以在一定程度上防止数据倾斜。引入该参数后计算公式如下

shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards

1.4 源码解读

如下为计算文档归属分片的源码,从源码中我们可以看到ES的哈希算法使用的是Murmur3,取模使用的是java的floorMod

version: 6.5
path: org\elasticsearch\cluster\routing\OperationRouting.java

public static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
    final String effectiveRouting;
    final int partitionOffset;

    if (routing == null) {
        assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
        effectiveRouting = id; //默认使用id
    } else {
        effectiveRouting = routing;
    }

    if (indexMetaData.isRoutingPartitionedIndex()) {//使用了routing_partition_size参数
        partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
    } else {
        // we would have still got 0 above but this check just saves us an unnecessary hash calculation
        partitionOffset = 0;
    }

    return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
}

private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
    final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

    // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
    // of original index to hash documents
    return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
}

2. 存在的问题及解决方案

2.1 数据倾斜

如前面所述,用户使用自定义routing可以控制文档的分配位置,从而达到将相似文档放在同一个或同一批分片的目的,减少查询时的分片个数,提高查询速度。然而,这也意味着数据无法像默认情况那么均匀的分配到各分片和各节点上,从而会导致各节点存储和读写压力分布不均,影响系统的性能和稳定性。对此可以从以下两个方面进行优化

  1. 使用routing_partition_size参数
    如前面所述,该参数可以使routing相同的文档分配到一批分片(集群分片的子集)而不是一个分片上,从而可以从一定程度上减轻数据倾斜的问题。该参数的效果与其值设置的大小有关,当该值等于number_of_shard时,routing将退化为与未指定一样。当然该方法只能减轻数据倾斜,并不能彻底解决。
  2. 合理划分数据和设置routing值
    从前面的分析,我们可以得到文档分片计算的公式,公式中的hash算法和取模算法也已经通过源码获取。因此用户在划分数据时,可以首先明确数据要划分为几类,每一类数据准备划分到哪部分分片上,再结合分片计算公式计算出合理的routing值,当然也可以在routing参数设置之前设置一个自定义hash函数来实现,从而实现数据的均衡分配。
  3. routing前使用自定义hash函数
    很多情况下,用户并不能提前确定数据的分类值,为此可以在分类值和routing值之间设置一个hash函数,保证分类值散列后的值更均匀,使用该值作为routing,从而防止数据倾斜。

2.2 异常行为

ES的id去重是在分片维度进行的,之所以这样做是ES因为默认情况下使用_id作为routing值,这样id相同的文档会被分配到相同的分片上,因此只需要在分片维度做id去重即可保证id的唯一性。
然而当使用了自定义routing后,id相同的文档如果指定了不同的routing是可能被分配到不同的分片上的,从而导致同一个索引中出现两个id一样的文档,这里之所以说“可能”是因为如果不同的routing经过计算后仍然被映射到同一个分片上,去重还是可以生效的。因此这里会出现一个不稳定的情况,即当对id相同routing不同的文档进行写入操作时,有的时候被更新,有的时候会生成两个id相同的文档,具体可以使用下面的操作复现

# 出现两个id一样的情况
POST _bulk
{"index":{"_index":"routing_test","_id":"123","routing":"abc"}}
{"name":"zhangsan","age":18}
{"index":{"_index":"routing_test","_id":"123","routing":"xyz"}}
{"name":"lisi","age":22}

GET routing_test/_search

# 相同id被更新的情况
POST _bulk
{"index":{"_index":"routing_test_2","_id":"123","routing":"123"}}
{"name":"zhangsan","age":18}
{"index":{"_index":"routing_test_2","_id":"123","routing":"123456"}}
{"name":"lisi","age":22}

GET routing_test_2/_search

以上测试场景在5.6.4, 6.4.3, 6.8.2集群上均验证会出现,在7.2.1集群上没有出现(可能是id去重逻辑发生了变化,这个后续研究一下后更新)。
对于这种场景,虽然在响应行为不一致,但是由于属于未按正常使用方式使用(id相同的文档应该使用相同的routing),也属于可以理解的情况,官方文档上也有对应描述, 参考地址

3. 常规用法

3.1 文档划分及routing指定

  • 明确文档划分
    使用routing是为了让查询时有可能出现在相同结果集的文档被分配到一个或一批分片上。因此首先要先明确哪些文档应该被分配在一起,对于这些文档使用相同的routing值,常规的一些自带分类信息的文档,如学生的班级属性,产品的分类等都可以作为文档划分的依据。
  • 确定各类别的目标分片
    当然这一步不是必须的,但是合理设置各类数据的目标分片,让他们尽量均匀分配,可以防止数据倾斜。因此建议在使用前就明确哪一类数据准备分配在哪一个或一批分片上,然后通过计算给出这类文档的合理routing值
  • routing分布均匀
    在很多场景下分类有哪些值不确定,因此无法明确划分各类数据的分片归属并计算出routing值,对于这种情况,建议可以在routing之前增加一个hash函数,让不同文档分类的值通过哈希尽量散列得更均匀一些,从而保证数据分布平衡。

3.2 routing的使用

  • 写入操作
    文档的PUT, POST, BULK操作均支持routing参数,在请求中带上routing=xxx即可。使用了routing值即可保证使用相同routing值的文档被分配到一个或一批分片上。
  • GET操作
    对于使用了routing写入的文档,在GET时必须指定routing,否则可能导致404,这与GET的实现机制有关,GET请求会先根据routing找到对应的分片再获取文档,如果对写入使用routing的文档GET时没有指定routing,那么会默认使用id进行routing从而大概率无法获得文档。
  • 查询操作
    查询操作可以在body中指定_routing参数(可以指定多个)来进行查询。当然不指定_routing也是可以查询出结果的,不过是遍历所有的分片,指定了_routing后,查询仅会对routing对应的一个或一批索引进行检索,从而提高查询效率,这也是很多用户使用routing的主要目的,查询操作示例如下:
    GET my_index/_search
    {
    "query": {
    "terms": {
      "_routing": [ "user1" ] 
    }
    }
    }
  • UPDATE或DELETE操作
    UPDATE或DELETE操作与GET操作类似,也是先根据routing确定分片,再进行更新或删除操作,因此对于写入使用了routing的文档,必须指定routing,否则会报404响应。

3.3 设置routing为必选参数

从3.2的分析可以看出对于使用routing写入的文档,在进行GET,UPDATE或DELETE操作时如果不指定routing参数会出现异常。为此ES提供了一个索引mapping级别的设置,_routing.required, 来强制用户在INDEX,GET, DELETE,UPDATA一个文档时必须使用routing参数。当然查询时不受该参数的限制的。该参数的设置方式如下:

PUT my_index
{
  "mappings": {
    "_doc": {
      "_routing": {
        "required": true 
      }
    }
  }
}

3.4 routing结合别名使用

别名功能支持设置routing, 如下:

POST /_aliases
{
    "actions" : [
        {
            "add" : {
                 "index" : "test",
                 "alias" : "alias1",
                 "routing" : "1"
            }
        }
    ]
}

还支持查询和写入使用不同的routing,[详情参考] 将routing和别名结合,可以对使用者屏蔽读写时使用routing的细节,降低误操作的风险,提高操作的效率。

routing是ES中相对高阶一些的用法,在用户了解业务数据分布和查询需求的基础之上,可以对查询性能进行优化,然而使用不当会导致数据倾斜,重复ID等问题。本文介绍了routing的原理,问题及使用技巧,希望对大家有帮助,欢迎评论讨论

欢迎关注公众号Elastic慕容,和我一起进入Elastic的奇妙世界吧

继续阅读 »


Elasticsearch是一个搭建在Lucene搜索引擎库基础之上的搜索服务平台。它在单机的Lucene搜索引擎库基础之上增加了分布式设计,translog等特性,增强了搜索引擎的性能,高可用性,高可扩性等。
Elasticsearch分布式设计的基本思想是Elasticsearch集群由多个服务器节点组成,集群中的一个索引分为多个分片,每个分片可以分配在不同的节点上。其中每个分片都是一个单独的功能完成的Lucene实例,可以独立地进行写入和查询服务,ES中存储的数据分布在集群分片的一个或多个上,其结构简单描述为下图。

在上面的架构图中,集群由三个节点组成,每个节点上有两个分片,想要读写文档就必须知道文档被分配在哪个分片上,这也正是本文要讲的routing功能的作用。

1. 工作原理

1.1 routing参数

routing参数是一个可选参数,默认使用文档的_id值,可以用在INDEX, UPDATE,GET, SEARCH, DELETE等各种操作中。在写入(包括更新)时,用于计算文档所属分片,在查询(GET请求或指定了routing的查询)中用于限制查询范围,提高查询速度。

1.2 计算方法

ES中shardId的计算公式如下:

shardId = hash(_routing) % num_primary_shards

通过该公式可以保证使用相同routing的文档被分配到同一个shard上,当然在默认情况下使用_id作为routing起到将文档均匀分布到多个分片上防止数据倾斜的作用。

1.3 routing_partition_size参数

使用了routing参数可以让routing值相同的文档分配到同一个分片上,从而减少查询时需要查询的shard数,提高查询效率。但是使用该参数容易导致数据倾斜。为此,ES还提供了一个index.routing_partition_size参数(仅当使用routing参数时可用),用于将routing相同的文档映射到集群分片的一个子集上,这样一方面可以减少查询的分片数,另一方面又可以在一定程度上防止数据倾斜。引入该参数后计算公式如下

shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards

1.4 源码解读

如下为计算文档归属分片的源码,从源码中我们可以看到ES的哈希算法使用的是Murmur3,取模使用的是java的floorMod

version: 6.5
path: org\elasticsearch\cluster\routing\OperationRouting.java

public static int generateShardId(IndexMetaData indexMetaData, @Nullable String id, @Nullable String routing) {
    final String effectiveRouting;
    final int partitionOffset;

    if (routing == null) {
        assert(indexMetaData.isRoutingPartitionedIndex() == false) : "A routing value is required for gets from a partitioned index";
        effectiveRouting = id; //默认使用id
    } else {
        effectiveRouting = routing;
    }

    if (indexMetaData.isRoutingPartitionedIndex()) {//使用了routing_partition_size参数
        partitionOffset = Math.floorMod(Murmur3HashFunction.hash(id), indexMetaData.getRoutingPartitionSize());
    } else {
        // we would have still got 0 above but this check just saves us an unnecessary hash calculation
        partitionOffset = 0;
    }

    return calculateScaledShardId(indexMetaData, effectiveRouting, partitionOffset);
}

private static int calculateScaledShardId(IndexMetaData indexMetaData, String effectiveRouting, int partitionOffset) {
    final int hash = Murmur3HashFunction.hash(effectiveRouting) + partitionOffset;

    // we don't use IMD#getNumberOfShards since the index might have been shrunk such that we need to use the size
    // of original index to hash documents
    return Math.floorMod(hash, indexMetaData.getRoutingNumShards()) / indexMetaData.getRoutingFactor();
}

2. 存在的问题及解决方案

2.1 数据倾斜

如前面所述,用户使用自定义routing可以控制文档的分配位置,从而达到将相似文档放在同一个或同一批分片的目的,减少查询时的分片个数,提高查询速度。然而,这也意味着数据无法像默认情况那么均匀的分配到各分片和各节点上,从而会导致各节点存储和读写压力分布不均,影响系统的性能和稳定性。对此可以从以下两个方面进行优化

  1. 使用routing_partition_size参数
    如前面所述,该参数可以使routing相同的文档分配到一批分片(集群分片的子集)而不是一个分片上,从而可以从一定程度上减轻数据倾斜的问题。该参数的效果与其值设置的大小有关,当该值等于number_of_shard时,routing将退化为与未指定一样。当然该方法只能减轻数据倾斜,并不能彻底解决。
  2. 合理划分数据和设置routing值
    从前面的分析,我们可以得到文档分片计算的公式,公式中的hash算法和取模算法也已经通过源码获取。因此用户在划分数据时,可以首先明确数据要划分为几类,每一类数据准备划分到哪部分分片上,再结合分片计算公式计算出合理的routing值,当然也可以在routing参数设置之前设置一个自定义hash函数来实现,从而实现数据的均衡分配。
  3. routing前使用自定义hash函数
    很多情况下,用户并不能提前确定数据的分类值,为此可以在分类值和routing值之间设置一个hash函数,保证分类值散列后的值更均匀,使用该值作为routing,从而防止数据倾斜。

2.2 异常行为

ES的id去重是在分片维度进行的,之所以这样做是ES因为默认情况下使用_id作为routing值,这样id相同的文档会被分配到相同的分片上,因此只需要在分片维度做id去重即可保证id的唯一性。
然而当使用了自定义routing后,id相同的文档如果指定了不同的routing是可能被分配到不同的分片上的,从而导致同一个索引中出现两个id一样的文档,这里之所以说“可能”是因为如果不同的routing经过计算后仍然被映射到同一个分片上,去重还是可以生效的。因此这里会出现一个不稳定的情况,即当对id相同routing不同的文档进行写入操作时,有的时候被更新,有的时候会生成两个id相同的文档,具体可以使用下面的操作复现

# 出现两个id一样的情况
POST _bulk
{"index":{"_index":"routing_test","_id":"123","routing":"abc"}}
{"name":"zhangsan","age":18}
{"index":{"_index":"routing_test","_id":"123","routing":"xyz"}}
{"name":"lisi","age":22}

GET routing_test/_search

# 相同id被更新的情况
POST _bulk
{"index":{"_index":"routing_test_2","_id":"123","routing":"123"}}
{"name":"zhangsan","age":18}
{"index":{"_index":"routing_test_2","_id":"123","routing":"123456"}}
{"name":"lisi","age":22}

GET routing_test_2/_search

以上测试场景在5.6.4, 6.4.3, 6.8.2集群上均验证会出现,在7.2.1集群上没有出现(可能是id去重逻辑发生了变化,这个后续研究一下后更新)。
对于这种场景,虽然在响应行为不一致,但是由于属于未按正常使用方式使用(id相同的文档应该使用相同的routing),也属于可以理解的情况,官方文档上也有对应描述, 参考地址

3. 常规用法

3.1 文档划分及routing指定

  • 明确文档划分
    使用routing是为了让查询时有可能出现在相同结果集的文档被分配到一个或一批分片上。因此首先要先明确哪些文档应该被分配在一起,对于这些文档使用相同的routing值,常规的一些自带分类信息的文档,如学生的班级属性,产品的分类等都可以作为文档划分的依据。
  • 确定各类别的目标分片
    当然这一步不是必须的,但是合理设置各类数据的目标分片,让他们尽量均匀分配,可以防止数据倾斜。因此建议在使用前就明确哪一类数据准备分配在哪一个或一批分片上,然后通过计算给出这类文档的合理routing值
  • routing分布均匀
    在很多场景下分类有哪些值不确定,因此无法明确划分各类数据的分片归属并计算出routing值,对于这种情况,建议可以在routing之前增加一个hash函数,让不同文档分类的值通过哈希尽量散列得更均匀一些,从而保证数据分布平衡。

3.2 routing的使用

  • 写入操作
    文档的PUT, POST, BULK操作均支持routing参数,在请求中带上routing=xxx即可。使用了routing值即可保证使用相同routing值的文档被分配到一个或一批分片上。
  • GET操作
    对于使用了routing写入的文档,在GET时必须指定routing,否则可能导致404,这与GET的实现机制有关,GET请求会先根据routing找到对应的分片再获取文档,如果对写入使用routing的文档GET时没有指定routing,那么会默认使用id进行routing从而大概率无法获得文档。
  • 查询操作
    查询操作可以在body中指定_routing参数(可以指定多个)来进行查询。当然不指定_routing也是可以查询出结果的,不过是遍历所有的分片,指定了_routing后,查询仅会对routing对应的一个或一批索引进行检索,从而提高查询效率,这也是很多用户使用routing的主要目的,查询操作示例如下:
    GET my_index/_search
    {
    "query": {
    "terms": {
      "_routing": [ "user1" ] 
    }
    }
    }
  • UPDATE或DELETE操作
    UPDATE或DELETE操作与GET操作类似,也是先根据routing确定分片,再进行更新或删除操作,因此对于写入使用了routing的文档,必须指定routing,否则会报404响应。

3.3 设置routing为必选参数

从3.2的分析可以看出对于使用routing写入的文档,在进行GET,UPDATE或DELETE操作时如果不指定routing参数会出现异常。为此ES提供了一个索引mapping级别的设置,_routing.required, 来强制用户在INDEX,GET, DELETE,UPDATA一个文档时必须使用routing参数。当然查询时不受该参数的限制的。该参数的设置方式如下:

PUT my_index
{
  "mappings": {
    "_doc": {
      "_routing": {
        "required": true 
      }
    }
  }
}

3.4 routing结合别名使用

别名功能支持设置routing, 如下:

POST /_aliases
{
    "actions" : [
        {
            "add" : {
                 "index" : "test",
                 "alias" : "alias1",
                 "routing" : "1"
            }
        }
    ]
}

还支持查询和写入使用不同的routing,[详情参考] 将routing和别名结合,可以对使用者屏蔽读写时使用routing的细节,降低误操作的风险,提高操作的效率。

routing是ES中相对高阶一些的用法,在用户了解业务数据分布和查询需求的基础之上,可以对查询性能进行优化,然而使用不当会导致数据倾斜,重复ID等问题。本文介绍了routing的原理,问题及使用技巧,希望对大家有帮助,欢迎评论讨论

欢迎关注公众号Elastic慕容,和我一起进入Elastic的奇妙世界吧

收起阅读 »

Elasticsearch冷热分离原理和实践


性能与容量之间的矛盾由来已久,计算机的多级存储体系就是其中一个经典的例子,同样的问题在Elasticsearch中也存在。为了保证Elasticsearch的读写性能,官方建议磁盘使用SSD固态硬盘。然而Elasticsearch要解决的是海量数据的存储和检索问题,海量的数据就意味需要大量的存储空间,如果都使用SSD固态硬盘成本将成为一个很大的问题,这也是制约许多企业和个人使用Elasticsearch的因素之一。为了解决这个问题,Elasticsearch冷热分离架构应运而生。

1. 实现原理

1.1 节点异构

传统的Elasticsearch集群中所有节点均采用相同的配置,然而Elasticsearch并没有对节点的规格一致性做要求,换而言之就是每个节点可以是任意规格,当然这样做会导致集群各节点性能不一致,影响集群稳定性。但是如果有规则的将集群的节点分成不同类型,部分是高性能的节点用于存储热点数据,部分是性能相对差些的大容量节点用于存储冷数据,却可以一方面保证热数据的性能,另一方面保证冷数据的存储,降低存储成本,这也是Elasticsearch冷热分离架构的基本思想,如下图为一个3热节点,2冷节点的冷热分离Elasticsearch集群:

其中热节点为16核64GB 1TB SSD盘,用于满足对热数据对读写性能的要求,冷节点为8C32GB 5TB HDD在保证一定读写性能的基础之上提供了成本较低的大存储HDD盘来满足冷节点对数据存储的需求。

1.2 数据分布

集群节点异构后接着要考虑的是数据分布问题,即用户如何对冷热数据进行标识,并将冷数据移动到冷节点,热数据移动到热节点。

节点指定冷热属性

仅仅将不同的节点设置为不同的规格还不够,为了能明确区分出哪些节点是热节点,哪些节点是冷节点,需要为对应节点打标签
Elasticsearch支持给节点打标签,具体方式是在elasticsearch.yml文件中增加

node.attr.{attribute}: {value}

配置。其中attribute为用户自定义的任意标签名,value为该节点对应的该标签的值,例如对于冷热分离,可以使用如下设置

node.attr.temperature: hot //热节点
node.attr.temperature: warm //冷节点

ps:中文通常叫冷热,英文叫hot/warm

索引指定冷热属性

节点有了冷热属性后,接下来就是指定数据的冷热属性,来设置和调整数据分布。冷热分离方案中数据冷热分布的基本单位是索引,即指定某个索引为热索引,另一个索引为冷索引。通过索引的分布来实现控制数据分布的目的。 Elasticsearch提供了index shard filtering功能(2.x开始),该功能在索引配置中提供了如下几个配置

index.routing.allocation.include.{attribute}
Assign the index to a node whose {attribute} has at least one of the comma-separated values.

index.routing.allocation.require.{attribute}
Assign the index to a node whose {attribute} has all of the comma-separated values.

index.routing.allocation.exclude.{attribute}
Assign the index to a node whose {attribute} has none of the comma-separated values.

用户可以在创建索引,或后续的任意时刻设置这些配置来控制索引在不同标签节点上的分配动作。
index.routing.allocation.include.{attribute}表示索引可以分配在包含多个值中其中一个的节点上。
index.routing.allocation.require.{attribute}表示索引要分配在包含索引指定值的节点上(通常一般设置一个值)。
index.routing.allocation.exclude.{attribute}表示索引只能分配在不包含所有指定值的节点上。

数据分布控制

Elasticsearch的索引分片分配由ShardAllocator决定,ShardAllocator通过在索引分片创建或rebalance时对每个节点调用一系列AllocationDecider来决定是否将节点分配到指定节点上,其中一个AllocationDecider是FilterAllocationDecider,该decider用于应用集群,节点的一些基于attr的分配规则,涉及到节点级别配置的核心代码如下

private Decision shouldIndexFilter(IndexMetaData indexMd, RoutingNode node, RoutingAllocation allocation) {
    if (indexMd.requireFilters() != null) {
        if (indexMd.requireFilters().match(node.node()) == false) {
            return allocation.decision(Decision.NO, NAME, "node does not match index setting [%s] filters [%s]",
                IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_PREFIX, indexMd.requireFilters());
        }
    }
    if (indexMd.includeFilters() != null) {
        if (indexMd.includeFilters().match(node.node()) == false) {
            return allocation.decision(Decision.NO, NAME, "node does not match index setting [%s] filters [%s]",
                IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_PREFIX, indexMd.includeFilters());
        }
    }
    if (indexMd.excludeFilters() != null) {
        if (indexMd.excludeFilters().match(node.node())) {
            return allocation.decision(Decision.NO, NAME, "node matches index setting [%s] filters [%s]",
                IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey(), indexMd.excludeFilters());
        }
    }
    return null;
}

2 冷热集群搭建及使用实践

2.1 集群规格选型

根据业务数据量及读写性能要求选择合适的冷热节点规格

  • 存储量计算:根据冷热数据各自数据量及要求保留时间,计算出冷热数据源数据量,然后使用如下公式计算出冷热节点各自的磁盘需求量
    
    实际空间 = 源数据 * (1 + 副本数量) * (1 + 数据膨胀) / (1 - 内部任务开销) / (1 - 操作系统预留)
        ≈ 源数据 * (1 + 副本数量) * 1.45
    ES建议存储容量 = 源数据 * (1 + 副本数量) * 1.45 * (1 + 预留空间)
        ≈ 源数据 * (1 + 副本数量) * 2.2
  • 副本数量:副本有利于增加数据的可靠性,但同时会增加存储成本。默认和建议的副本数量为1,对于部分可以承受异常情况导致数据丢失的场景,可考虑设置副本数量为0。
  • 数据膨胀:除原始数据外,ES 需要存储索引、列存数据等,在应用编码压缩等技术后,一般膨胀10%。
  • 内部任务开销:ES 占用约20%的磁盘空间,用于 segment 合并、ES Translog、日志等。
  • 操作系统预留:Linux 操作系统默认为 root 用户预留5%的磁盘空间,用于关键流程处理、系统恢复、防止磁盘碎片化问题等。
  • 预留空间:为保证集群的正常运行建议预留50%的存储空间
  • 计算资源预估:
    ES 的计算资源主要消耗在写入和查询过程,而不同业务场景在写入和查询方面的复杂度不同、比重不同,导致计算资源相比存储资源较难评估
    • 日志场景:日志属于典型的写多读少类场景,计算资源主要消耗在写入过程中。我们在日志场景的经验是:2核8GB内存的资源最大可支持0.5w/s的写入能力,但注意不同业务场景可能有偏差。由于实例性能基本随计算资源总量呈线性扩容,您可以按实例资源总量估算写入能力。例如6核24GB内存的资源可支持1.5w/s的写入能力,40核160GB内存的资源可支持10w/s的写入能力。
    • Metric 及 APM 等结构化数据场景:这也是写多读少类场景,但相比日志场景计算资源消耗较小,2核8GB内存的资源一般可支持1w/s的写入能力,您可参照日志场景线性扩展的方式,评估不同规格实例的实际写入能力。
    • 站内搜索及应用搜索等搜索场景:此类为读多写少类场景,计算资源主要消耗在查询过程,由于查询复杂度在不同使用场景差别非常大,计算资源也最难评估,建议您结合存储资源初步选择计算资源,然后在测试过程中验证、调整。

2.2 搭建集群

  • 自建

按照选定冷热节点规格部署服务器,搭建集群,热节点使用SSD盘,冷节点使用HDD盘,对热节点elasticsearcy.yml增加如下配置

node.attr.temperature: hot 

对冷节点增加如下配置

node.attr.temperature: warm

启动集群,冷热分离的Elasticsearch集群即搭建完成

  • 购买云ES服务

腾讯云预计于12月中旬上线冷热分离集群,用户只需要在创建页面上根据需要即可分钟级拉起一个冷热分离架构的ES集群,方便快速,扩展性好,运维成本低

  • 验证 使用如下命令可以验证节点冷热属性
    GET _cat/nodeattrs?v&h=node,attr,value&s=attr:desc
    node        attr        value
    node1   temperature     hot
    node2   temperature     hot
    node3   temperature     warm
    node4   temperature     hot
    node5   temperature     warm
    ...

    可以看到该集群为三热二冷的冷热分离集群(当然要注意如果其中有专用主节点或专用协调节点这类无法分配shard的节点,即使设置了冷热属性也不会有分片可以分配到其上)

3. 为索引设置冷热属性

业务方可以根据实际情况决定索引的冷热属性

  • 对于热数据,索引设置如下
    PUT hot_data_index/_settings
    {
    "index.routing.allocation.require.temperature": "hot"
    }
  • 对于冷数据,索引设置
    PUT hot_data_index/_settings
    {
    "index.routing.allocation.require.temperature": "warm"
    }
  • 验证
    创建索引
    PUT hot_warm_test_index
    {
    "settings": {
    "number_of_replicas": 1,
    "number_of_shards": 3
    }
    }

    查看分片分配,可以看到分片均匀分配在五个节点上

    GET _cat/shards/hot_warm_test_index?v&h=index,shard,prirep,node&s=node
    index          shard prirep node
    hot_data_index 1     p      node1
    hot_data_index 0     r      node1
    hot_data_index 2     r      node2
    hot_data_index 2     p      node3
    hot_data_index 1     r      node4
    hot_data_index 0     p      node5

    设置索引为热索引

    PUT hot_warm_test_index/_settings
    {
    "index.routing.allocation.require.temperature": "hot"
    }

    查看分片分配,发现分片均分配在热节点上

    GET _cat/shards/hot_warm_test_index?v&h=index,shard,prirep,node&s=node
    index          shard prirep node
    hot_data_index 1     p      node1
    hot_data_index 0     r      node1
    hot_data_index 0     p      node2
    hot_data_index 2     r      node2
    hot_data_index 2     p      node4
    hot_data_index 1     r      node4

    设置索引为冷索引

    PUT hot_warm_test_index/_settings
    {
    "index.routing.allocation.require.temperature": "warm"
    }

    查看分片分配,发现分片均分配到冷节点上

    GET _cat/shards/hot_warm_test_index?v&h=index,shard,prirep,node&s=node
    index          shard prirep node
    hot_data_index 1     p      node3
    hot_data_index 0     r      node3
    hot_data_index 2     r      node3
    hot_data_index 0     p      node5
    hot_data_index 2     p      node5
    hot_data_index 1     r      node5

4. 索引生命周期管理

从ES6.6开始,Elasticsearch提供索引生命周期管理功能,索引生命周期管理可以通过API或者kibana界面配置,详情参考[index-lifecycle-management], 本文仅通过kibana界面演示如何使用索引生命周期管理结合冷热分离架构实现索引数据的动态管理。
kibana中的索引生命周期管理位置如下图(版本6.8.2):
点击创建create policy,进入配置界面,可以看到索引的生命周期被分为:Hot phrase,Warm phase, Cold phase,Delete phrase四个阶段

  • Hot phrase: 该阶段可以根据索引的文档数,大小,时长决定是否调用rollover API来滚动索引,详情可以参考[indices-rollover-index],因与本文关系不大不再详细赘述。
  • Warm phrase: 当一个索引在Hot phrase被roll over后便会进入Warm phrase,进入该阶段的索引会被设置为read-only, 用户可以为这个索引设置要使用的attribute, 如对于冷热分离策略,这里可以选择temperature: warm属性。另外还可以对索引进行forceMerge, shrink等操作,这两个操作具体可以参考官方文档。
  • Cold phrase: 可以设置当索引rollover一段时间后进入cold阶段,这个阶段也可以设置一个属性。从冷热分离架构可以看出冷热属性是具备扩展性的,不仅可以指定hot, warm, 也可以扩展增加hot, warm, cold, freeze等多个冷热属性。如果想使用三层的冷热分离的话这里可以指定为temperature: cold, 此处还支持对索引的freeze操作,详情参考官方文档。
  • Delete phrase: 可以设置索引rollover一段时间后进入delete阶段,进入该阶段的索引会自动被删除。

冷热分离架构是Elasticsearch的经典架构之一,使用该架构用户可以在保证热数据良好读写性能的同时,仍可以存储海量的数据,极大地丰富了ES的应用场景,解决了用户的成本问题。再结合ES在6.6推出的索引生命周期管理,使得ES集群在使用性和自动化方面表现出色,真正地解决了用户在性能,存储成本,自动化数据管理等方面的问题。

欢迎关注公众号Elastic慕容,和我一起进入Elastic的奇妙世界吧

继续阅读 »


性能与容量之间的矛盾由来已久,计算机的多级存储体系就是其中一个经典的例子,同样的问题在Elasticsearch中也存在。为了保证Elasticsearch的读写性能,官方建议磁盘使用SSD固态硬盘。然而Elasticsearch要解决的是海量数据的存储和检索问题,海量的数据就意味需要大量的存储空间,如果都使用SSD固态硬盘成本将成为一个很大的问题,这也是制约许多企业和个人使用Elasticsearch的因素之一。为了解决这个问题,Elasticsearch冷热分离架构应运而生。

1. 实现原理

1.1 节点异构

传统的Elasticsearch集群中所有节点均采用相同的配置,然而Elasticsearch并没有对节点的规格一致性做要求,换而言之就是每个节点可以是任意规格,当然这样做会导致集群各节点性能不一致,影响集群稳定性。但是如果有规则的将集群的节点分成不同类型,部分是高性能的节点用于存储热点数据,部分是性能相对差些的大容量节点用于存储冷数据,却可以一方面保证热数据的性能,另一方面保证冷数据的存储,降低存储成本,这也是Elasticsearch冷热分离架构的基本思想,如下图为一个3热节点,2冷节点的冷热分离Elasticsearch集群:

其中热节点为16核64GB 1TB SSD盘,用于满足对热数据对读写性能的要求,冷节点为8C32GB 5TB HDD在保证一定读写性能的基础之上提供了成本较低的大存储HDD盘来满足冷节点对数据存储的需求。

1.2 数据分布

集群节点异构后接着要考虑的是数据分布问题,即用户如何对冷热数据进行标识,并将冷数据移动到冷节点,热数据移动到热节点。

节点指定冷热属性

仅仅将不同的节点设置为不同的规格还不够,为了能明确区分出哪些节点是热节点,哪些节点是冷节点,需要为对应节点打标签
Elasticsearch支持给节点打标签,具体方式是在elasticsearch.yml文件中增加

node.attr.{attribute}: {value}

配置。其中attribute为用户自定义的任意标签名,value为该节点对应的该标签的值,例如对于冷热分离,可以使用如下设置

node.attr.temperature: hot //热节点
node.attr.temperature: warm //冷节点

ps:中文通常叫冷热,英文叫hot/warm

索引指定冷热属性

节点有了冷热属性后,接下来就是指定数据的冷热属性,来设置和调整数据分布。冷热分离方案中数据冷热分布的基本单位是索引,即指定某个索引为热索引,另一个索引为冷索引。通过索引的分布来实现控制数据分布的目的。 Elasticsearch提供了index shard filtering功能(2.x开始),该功能在索引配置中提供了如下几个配置

index.routing.allocation.include.{attribute}
Assign the index to a node whose {attribute} has at least one of the comma-separated values.

index.routing.allocation.require.{attribute}
Assign the index to a node whose {attribute} has all of the comma-separated values.

index.routing.allocation.exclude.{attribute}
Assign the index to a node whose {attribute} has none of the comma-separated values.

用户可以在创建索引,或后续的任意时刻设置这些配置来控制索引在不同标签节点上的分配动作。
index.routing.allocation.include.{attribute}表示索引可以分配在包含多个值中其中一个的节点上。
index.routing.allocation.require.{attribute}表示索引要分配在包含索引指定值的节点上(通常一般设置一个值)。
index.routing.allocation.exclude.{attribute}表示索引只能分配在不包含所有指定值的节点上。

数据分布控制

Elasticsearch的索引分片分配由ShardAllocator决定,ShardAllocator通过在索引分片创建或rebalance时对每个节点调用一系列AllocationDecider来决定是否将节点分配到指定节点上,其中一个AllocationDecider是FilterAllocationDecider,该decider用于应用集群,节点的一些基于attr的分配规则,涉及到节点级别配置的核心代码如下

private Decision shouldIndexFilter(IndexMetaData indexMd, RoutingNode node, RoutingAllocation allocation) {
    if (indexMd.requireFilters() != null) {
        if (indexMd.requireFilters().match(node.node()) == false) {
            return allocation.decision(Decision.NO, NAME, "node does not match index setting [%s] filters [%s]",
                IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_PREFIX, indexMd.requireFilters());
        }
    }
    if (indexMd.includeFilters() != null) {
        if (indexMd.includeFilters().match(node.node()) == false) {
            return allocation.decision(Decision.NO, NAME, "node does not match index setting [%s] filters [%s]",
                IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_PREFIX, indexMd.includeFilters());
        }
    }
    if (indexMd.excludeFilters() != null) {
        if (indexMd.excludeFilters().match(node.node())) {
            return allocation.decision(Decision.NO, NAME, "node matches index setting [%s] filters [%s]",
                IndexMetaData.INDEX_ROUTING_EXCLUDE_GROUP_SETTING.getKey(), indexMd.excludeFilters());
        }
    }
    return null;
}

2 冷热集群搭建及使用实践

2.1 集群规格选型

根据业务数据量及读写性能要求选择合适的冷热节点规格

  • 存储量计算:根据冷热数据各自数据量及要求保留时间,计算出冷热数据源数据量,然后使用如下公式计算出冷热节点各自的磁盘需求量
    
    实际空间 = 源数据 * (1 + 副本数量) * (1 + 数据膨胀) / (1 - 内部任务开销) / (1 - 操作系统预留)
        ≈ 源数据 * (1 + 副本数量) * 1.45
    ES建议存储容量 = 源数据 * (1 + 副本数量) * 1.45 * (1 + 预留空间)
        ≈ 源数据 * (1 + 副本数量) * 2.2
  • 副本数量:副本有利于增加数据的可靠性,但同时会增加存储成本。默认和建议的副本数量为1,对于部分可以承受异常情况导致数据丢失的场景,可考虑设置副本数量为0。
  • 数据膨胀:除原始数据外,ES 需要存储索引、列存数据等,在应用编码压缩等技术后,一般膨胀10%。
  • 内部任务开销:ES 占用约20%的磁盘空间,用于 segment 合并、ES Translog、日志等。
  • 操作系统预留:Linux 操作系统默认为 root 用户预留5%的磁盘空间,用于关键流程处理、系统恢复、防止磁盘碎片化问题等。
  • 预留空间:为保证集群的正常运行建议预留50%的存储空间
  • 计算资源预估:
    ES 的计算资源主要消耗在写入和查询过程,而不同业务场景在写入和查询方面的复杂度不同、比重不同,导致计算资源相比存储资源较难评估
    • 日志场景:日志属于典型的写多读少类场景,计算资源主要消耗在写入过程中。我们在日志场景的经验是:2核8GB内存的资源最大可支持0.5w/s的写入能力,但注意不同业务场景可能有偏差。由于实例性能基本随计算资源总量呈线性扩容,您可以按实例资源总量估算写入能力。例如6核24GB内存的资源可支持1.5w/s的写入能力,40核160GB内存的资源可支持10w/s的写入能力。
    • Metric 及 APM 等结构化数据场景:这也是写多读少类场景,但相比日志场景计算资源消耗较小,2核8GB内存的资源一般可支持1w/s的写入能力,您可参照日志场景线性扩展的方式,评估不同规格实例的实际写入能力。
    • 站内搜索及应用搜索等搜索场景:此类为读多写少类场景,计算资源主要消耗在查询过程,由于查询复杂度在不同使用场景差别非常大,计算资源也最难评估,建议您结合存储资源初步选择计算资源,然后在测试过程中验证、调整。

2.2 搭建集群

  • 自建

按照选定冷热节点规格部署服务器,搭建集群,热节点使用SSD盘,冷节点使用HDD盘,对热节点elasticsearcy.yml增加如下配置

node.attr.temperature: hot 

对冷节点增加如下配置

node.attr.temperature: warm

启动集群,冷热分离的Elasticsearch集群即搭建完成

  • 购买云ES服务

腾讯云预计于12月中旬上线冷热分离集群,用户只需要在创建页面上根据需要即可分钟级拉起一个冷热分离架构的ES集群,方便快速,扩展性好,运维成本低

  • 验证 使用如下命令可以验证节点冷热属性
    GET _cat/nodeattrs?v&h=node,attr,value&s=attr:desc
    node        attr        value
    node1   temperature     hot
    node2   temperature     hot
    node3   temperature     warm
    node4   temperature     hot
    node5   temperature     warm
    ...

    可以看到该集群为三热二冷的冷热分离集群(当然要注意如果其中有专用主节点或专用协调节点这类无法分配shard的节点,即使设置了冷热属性也不会有分片可以分配到其上)

3. 为索引设置冷热属性

业务方可以根据实际情况决定索引的冷热属性

  • 对于热数据,索引设置如下
    PUT hot_data_index/_settings
    {
    "index.routing.allocation.require.temperature": "hot"
    }
  • 对于冷数据,索引设置
    PUT hot_data_index/_settings
    {
    "index.routing.allocation.require.temperature": "warm"
    }
  • 验证
    创建索引
    PUT hot_warm_test_index
    {
    "settings": {
    "number_of_replicas": 1,
    "number_of_shards": 3
    }
    }

    查看分片分配,可以看到分片均匀分配在五个节点上

    GET _cat/shards/hot_warm_test_index?v&h=index,shard,prirep,node&s=node
    index          shard prirep node
    hot_data_index 1     p      node1
    hot_data_index 0     r      node1
    hot_data_index 2     r      node2
    hot_data_index 2     p      node3
    hot_data_index 1     r      node4
    hot_data_index 0     p      node5

    设置索引为热索引

    PUT hot_warm_test_index/_settings
    {
    "index.routing.allocation.require.temperature": "hot"
    }

    查看分片分配,发现分片均分配在热节点上

    GET _cat/shards/hot_warm_test_index?v&h=index,shard,prirep,node&s=node
    index          shard prirep node
    hot_data_index 1     p      node1
    hot_data_index 0     r      node1
    hot_data_index 0     p      node2
    hot_data_index 2     r      node2
    hot_data_index 2     p      node4
    hot_data_index 1     r      node4

    设置索引为冷索引

    PUT hot_warm_test_index/_settings
    {
    "index.routing.allocation.require.temperature": "warm"
    }

    查看分片分配,发现分片均分配到冷节点上

    GET _cat/shards/hot_warm_test_index?v&h=index,shard,prirep,node&s=node
    index          shard prirep node
    hot_data_index 1     p      node3
    hot_data_index 0     r      node3
    hot_data_index 2     r      node3
    hot_data_index 0     p      node5
    hot_data_index 2     p      node5
    hot_data_index 1     r      node5

4. 索引生命周期管理

从ES6.6开始,Elasticsearch提供索引生命周期管理功能,索引生命周期管理可以通过API或者kibana界面配置,详情参考[index-lifecycle-management], 本文仅通过kibana界面演示如何使用索引生命周期管理结合冷热分离架构实现索引数据的动态管理。
kibana中的索引生命周期管理位置如下图(版本6.8.2):
点击创建create policy,进入配置界面,可以看到索引的生命周期被分为:Hot phrase,Warm phase, Cold phase,Delete phrase四个阶段

  • Hot phrase: 该阶段可以根据索引的文档数,大小,时长决定是否调用rollover API来滚动索引,详情可以参考[indices-rollover-index],因与本文关系不大不再详细赘述。
  • Warm phrase: 当一个索引在Hot phrase被roll over后便会进入Warm phrase,进入该阶段的索引会被设置为read-only, 用户可以为这个索引设置要使用的attribute, 如对于冷热分离策略,这里可以选择temperature: warm属性。另外还可以对索引进行forceMerge, shrink等操作,这两个操作具体可以参考官方文档。
  • Cold phrase: 可以设置当索引rollover一段时间后进入cold阶段,这个阶段也可以设置一个属性。从冷热分离架构可以看出冷热属性是具备扩展性的,不仅可以指定hot, warm, 也可以扩展增加hot, warm, cold, freeze等多个冷热属性。如果想使用三层的冷热分离的话这里可以指定为temperature: cold, 此处还支持对索引的freeze操作,详情参考官方文档。
  • Delete phrase: 可以设置索引rollover一段时间后进入delete阶段,进入该阶段的索引会自动被删除。

冷热分离架构是Elasticsearch的经典架构之一,使用该架构用户可以在保证热数据良好读写性能的同时,仍可以存储海量的数据,极大地丰富了ES的应用场景,解决了用户的成本问题。再结合ES在6.6推出的索引生命周期管理,使得ES集群在使用性和自动化方面表现出色,真正地解决了用户在性能,存储成本,自动化数据管理等方面的问题。

欢迎关注公众号Elastic慕容,和我一起进入Elastic的奇妙世界吧

收起阅读 »

2019年Elasticsearch用户调查

本次用户调查针对的是全国的ES用户,由Elastic官方联合阿里共同发起。调查结果报告会在2019年Elastic开发者大会-北京上正式发布。
poster.JPG

 
本次用户调查针对的是全国的ES用户,由Elastic官方联合阿里共同发起。调查结果报告会在2019年Elastic开发者大会-北京上正式发布。
poster.JPG

 

ES脚本性能优化一例

使用painless脚本为文档自定义打分是很常见的场景,对新人来说也是最容易造成性能问题的地方。本文中使用两个例子简单谈一下脚本性能优化。

目标

ES本身是基于倒排等数据结构实现的查询,因此在做类似Term、Match等可以利用底层数据结构的场景进行查询时,性能是很好的。

脚本和term等查询不一样,无法利用现有的各种数据结构,可以简单理解成循环:

docs = getDocs(xxx); // 获取满足条件的文档列表
for(Doc doc : docs) {
    score = getScoreByScript(doc);
}

因此脚本的性能取决于两个地方:脚本的复杂度和满足条件的文档数

例子1

我们有个场景是查询指定坐标指定范围内的POI列表,例如5公里内的景点列表。

由于我们的距离公式和ES默认的都不一致,如下:

/**
 * 计算距离,返回单位:米
 */
public static Double getDistance(Double lat1, Double lng1, Double lat2, Double lng2) {
    double diffLon = Math.abs(lng1 - lng2);
    if (diffLon > 180)
        diffLon -= 360;
    return Math.sqrt(Math.pow(diffLon, 2) + Math.pow(lat1 - lat2, 2)) * 110.0 * 1000;
}

所以该同学把这段Java代码转成了Painless,在sort里使用这个该方法计算出距离。上线以后发现ES有了很多慢查询,对应的服务也95线、99线也比较高。

原因是其他脚本没有有效地缩小数据量,导致有几百万的数据需要使用该脚本做距离计算,给ES的CPU造成很大压力,查询性能也比较差。

该例子优化起来很简单,即使用ES自带的distance做较大范围的限制,例如需要5公里的数据,可以用ES的plain距离做限制,再加上之前的自定义脚本逻辑。由于ES的plain距离计算性能好很多,因此经过该过滤以后,自定义脚本的文档量少了很多,因此整体性能有了很大提升。

例子2

有个场景是对文章进行搜索,如果文章关联的城市是指定的几个城市,则给额外的加分。例如:

{
    "query": {xxx},
    "sort": [
    {
      "_script": {
        "script": {
          "source": "def score = 0;def cityIds = doc['cityIds']; def paramCityIds = params.cityIds; for (int i=0; i<cityIds.size(); i++){if (paramCityIds.contains(cityIds[i])){score += 100;}} return score;",
          "lang": "painless",
          "params": {
            "cityIds": [2,1,3]
          }
        },
        "type": "number",
        "order": "desc"
      }
    }
    ]
}

问题和例子1一样,该功能的性能比较差。虽然脚本简单,但是满足的文档量比较大,带来的计算量也比较多,因此性能上不去。

这是一个比较常见的场景,问题的根源还是对ES的机制不够了解,优化起来也很简单,想办法利用到倒排就可以了。

ES里有个专门针对改场景的查询:constant_score,因此以上查询可以修改为:

{
    "query": {
        "should": [
            {
                    "constant_score": {
                        "filter": {
                            "term": {
                                    "cityIds": 2
                            }
                        },
                        "boost": 5
                     }
            },
            {
                    "constant_score": {
                        "filter": {
                            "term": {
                                    "cityIds": 1
                            }
                        },
                        "boost": 5
                     }
            },
            {
                    "constant_score": {
                        "filter": {
                            "term": {
                                    "cityIds": 3
                            }
                        },
                        "boost": 5
                     }
            }
        ]
    },
    "sort": [
    {
      "_score": "desc"
    ]
}

性能即可得到极大改善。

继续阅读 »

使用painless脚本为文档自定义打分是很常见的场景,对新人来说也是最容易造成性能问题的地方。本文中使用两个例子简单谈一下脚本性能优化。

目标

ES本身是基于倒排等数据结构实现的查询,因此在做类似Term、Match等可以利用底层数据结构的场景进行查询时,性能是很好的。

脚本和term等查询不一样,无法利用现有的各种数据结构,可以简单理解成循环:

docs = getDocs(xxx); // 获取满足条件的文档列表
for(Doc doc : docs) {
    score = getScoreByScript(doc);
}

因此脚本的性能取决于两个地方:脚本的复杂度和满足条件的文档数

例子1

我们有个场景是查询指定坐标指定范围内的POI列表,例如5公里内的景点列表。

由于我们的距离公式和ES默认的都不一致,如下:

/**
 * 计算距离,返回单位:米
 */
public static Double getDistance(Double lat1, Double lng1, Double lat2, Double lng2) {
    double diffLon = Math.abs(lng1 - lng2);
    if (diffLon > 180)
        diffLon -= 360;
    return Math.sqrt(Math.pow(diffLon, 2) + Math.pow(lat1 - lat2, 2)) * 110.0 * 1000;
}

所以该同学把这段Java代码转成了Painless,在sort里使用这个该方法计算出距离。上线以后发现ES有了很多慢查询,对应的服务也95线、99线也比较高。

原因是其他脚本没有有效地缩小数据量,导致有几百万的数据需要使用该脚本做距离计算,给ES的CPU造成很大压力,查询性能也比较差。

该例子优化起来很简单,即使用ES自带的distance做较大范围的限制,例如需要5公里的数据,可以用ES的plain距离做限制,再加上之前的自定义脚本逻辑。由于ES的plain距离计算性能好很多,因此经过该过滤以后,自定义脚本的文档量少了很多,因此整体性能有了很大提升。

例子2

有个场景是对文章进行搜索,如果文章关联的城市是指定的几个城市,则给额外的加分。例如:

{
    "query": {xxx},
    "sort": [
    {
      "_script": {
        "script": {
          "source": "def score = 0;def cityIds = doc['cityIds']; def paramCityIds = params.cityIds; for (int i=0; i<cityIds.size(); i++){if (paramCityIds.contains(cityIds[i])){score += 100;}} return score;",
          "lang": "painless",
          "params": {
            "cityIds": [2,1,3]
          }
        },
        "type": "number",
        "order": "desc"
      }
    }
    ]
}

问题和例子1一样,该功能的性能比较差。虽然脚本简单,但是满足的文档量比较大,带来的计算量也比较多,因此性能上不去。

这是一个比较常见的场景,问题的根源还是对ES的机制不够了解,优化起来也很简单,想办法利用到倒排就可以了。

ES里有个专门针对改场景的查询:constant_score,因此以上查询可以修改为:

{
    "query": {
        "should": [
            {
                    "constant_score": {
                        "filter": {
                            "term": {
                                    "cityIds": 2
                            }
                        },
                        "boost": 5
                     }
            },
            {
                    "constant_score": {
                        "filter": {
                            "term": {
                                    "cityIds": 1
                            }
                        },
                        "boost": 5
                     }
            },
            {
                    "constant_score": {
                        "filter": {
                            "term": {
                                    "cityIds": 3
                            }
                        },
                        "boost": 5
                     }
            }
        ]
    },
    "sort": [
    {
      "_score": "desc"
    ]
}

性能即可得到极大改善。

收起阅读 »