记一次“访问量超过1000的人数”统计,计算聚合桶的个数

前言

众所周知,在ES中有各种聚合方法能够是数据分析简单、高效。但是在繁杂的聚合方法中找到满足我们需求的那个,需要我们自己去实践。下面我就说明一下“访问量超过1000的人数”统计案例的实现。

需求

ES在使用过程中,我们公司有一个需求,就是需要统计活跃用户数,我们定义活跃用户数为:今日访问量超过1000的用户,所以我们统计活跃用户数的时候需要统计“访问量超过1000的人数”。

之前的做法

第一版统计活跃用户数的方法由于对复杂的聚合统计不熟悉的原因,就把统计分为了两步。 第一步:在ES中使用字段聚合每个用户的访问数量,数量大于1000;

查询语句

{
  "aggs": {
    "user": {
      "terms": {
        "field": "userId.keyword",
        "size": 10000,
        "order": {
          "_count": "desc"
        },
        "min_doc_count": "1000"
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "startTime": {
              "gte": "now-4h",
              "lte": "now",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

查询结果

{
  "took" : 203,
  "timed_out" : false,
  "_shards" : {
    "total" : 1565,
    "successful" : 1565,
    "skipped" : 1520,
    "failed" : 0
  },
  "hits" : {
    "total" : 67470,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "user" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "admin",
          "doc_count" : 46998
        },
        {
          "key" : "nameless",
          "doc_count" : 8416
        },
        {
          "key" : "li",
          "doc_count" : 2486
        },
        {
          "key" : "liu",
          "doc_count" : 2183
        },
        {
          "key" : "111111",
          "doc_count" : 1281
        }
      ]
    }
  }
}

第二步:从ES中获取第一步的统计结果,然后统计用户桶的个数,达到统计出个数的效果。

改进后的做法

改进后就是直接使用ES的查询,使用了sum_bucket聚合,是计算每个用户的用户ID独立数,也就是每个用户的用户ID独立数都是1,然后用桶聚合求和,得到所有的人数。 参考链接:[sum bucket聚合](https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-aggregations-pipeline-sum-bucket-aggregation.html)

查询语句

{
  "aggs": {
    "usercount": {
      "sum_bucket": {
        "buckets_path": "usercount-bucket>usercount-metric"
      }
    },
    "usercount-bucket": {
      "terms": {
        "field": "userId.keyword",
        "size": 10,
        "order": {
          "_key": "desc"
        },
        "min_doc_count": "1000"
      },
      "aggs": {
        "usercount-metric": {
          "cardinality": {
            "field": "userId.keyword"
          }
        }
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "x_st": {
              "gte": "now-4h",
              "lte": "now",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

查询结果

{
  "took" : 106,
  "timed_out" : false,
  "_shards" : {
    "total" : 1565,
    "successful" : 1565,
    "skipped" : 1520,
    "failed" : 0
  },
  "hits" : {
    "total" : 63956,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "usercount-bucket" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "nameless",
          "doc_count" : 8278,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "liu",
          "doc_count" : 2142,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "li",
          "doc_count" : 1928,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "admin",
          "doc_count" : 44395,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "111111",
          "doc_count" : 1281,
          "usercount-metric" : {
            "value" : 1
          }
        }
      ]
    },
    "usercount" : {
      "value" : 5.0
    }
  }
}
继续阅读 »

前言

众所周知,在ES中有各种聚合方法能够是数据分析简单、高效。但是在繁杂的聚合方法中找到满足我们需求的那个,需要我们自己去实践。下面我就说明一下“访问量超过1000的人数”统计案例的实现。

需求

ES在使用过程中,我们公司有一个需求,就是需要统计活跃用户数,我们定义活跃用户数为:今日访问量超过1000的用户,所以我们统计活跃用户数的时候需要统计“访问量超过1000的人数”。

之前的做法

第一版统计活跃用户数的方法由于对复杂的聚合统计不熟悉的原因,就把统计分为了两步。 第一步:在ES中使用字段聚合每个用户的访问数量,数量大于1000;

查询语句

{
  "aggs": {
    "user": {
      "terms": {
        "field": "userId.keyword",
        "size": 10000,
        "order": {
          "_count": "desc"
        },
        "min_doc_count": "1000"
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "startTime": {
              "gte": "now-4h",
              "lte": "now",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

查询结果

{
  "took" : 203,
  "timed_out" : false,
  "_shards" : {
    "total" : 1565,
    "successful" : 1565,
    "skipped" : 1520,
    "failed" : 0
  },
  "hits" : {
    "total" : 67470,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "user" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "admin",
          "doc_count" : 46998
        },
        {
          "key" : "nameless",
          "doc_count" : 8416
        },
        {
          "key" : "li",
          "doc_count" : 2486
        },
        {
          "key" : "liu",
          "doc_count" : 2183
        },
        {
          "key" : "111111",
          "doc_count" : 1281
        }
      ]
    }
  }
}

第二步:从ES中获取第一步的统计结果,然后统计用户桶的个数,达到统计出个数的效果。

改进后的做法

改进后就是直接使用ES的查询,使用了sum_bucket聚合,是计算每个用户的用户ID独立数,也就是每个用户的用户ID独立数都是1,然后用桶聚合求和,得到所有的人数。 参考链接:[sum bucket聚合](https://www.elastic.co/guide/en/elasticsearch/reference/6.8/search-aggregations-pipeline-sum-bucket-aggregation.html)

查询语句

{
  "aggs": {
    "usercount": {
      "sum_bucket": {
        "buckets_path": "usercount-bucket>usercount-metric"
      }
    },
    "usercount-bucket": {
      "terms": {
        "field": "userId.keyword",
        "size": 10,
        "order": {
          "_key": "desc"
        },
        "min_doc_count": "1000"
      },
      "aggs": {
        "usercount-metric": {
          "cardinality": {
            "field": "userId.keyword"
          }
        }
      }
    }
  },
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "range": {
            "x_st": {
              "gte": "now-4h",
              "lte": "now",
              "format": "epoch_millis"
            }
          }
        }
      ]
    }
  }
}

查询结果

{
  "took" : 106,
  "timed_out" : false,
  "_shards" : {
    "total" : 1565,
    "successful" : 1565,
    "skipped" : 1520,
    "failed" : 0
  },
  "hits" : {
    "total" : 63956,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "usercount-bucket" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "nameless",
          "doc_count" : 8278,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "liu",
          "doc_count" : 2142,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "li",
          "doc_count" : 1928,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "admin",
          "doc_count" : 44395,
          "usercount-metric" : {
            "value" : 1
          }
        },
        {
          "key" : "111111",
          "doc_count" : 1281,
          "usercount-metric" : {
            "value" : 1
          }
        }
      ]
    },
    "usercount" : {
      "value" : 5.0
    }
  }
}
收起阅读 »

ES6.8权限使用配置


概述
ES的权限控制一直ES使用中的一个问题,因为官方之前一直未免费安全性功能。公司要不选择使用其他插件来解决,要不就是只在内网使用。现在在ES6.8及以后版本ES将部分安全性功能免费开放了, 现在我们就6.8版本的【基于角色的访问控制】进行操作、验证。
1、下载安装ELK6.8版本(此处省略)
ES在6.8以后发布的版本才有(7.0是发布在6.8之前的)
2、修改ES配置文件 elasticsearch.yml
在配置文件中添加:
xpack.security.enabled: true
基础版本的安全性功能是默认关闭的。
然后启动ES
./elasticsearch -d
3、设置内置用户密码
参考:内置用户
./bin/elasticsearch-setup-passwords interactive
这里设置的密码要记住,后面会使用到。我们设置简单的密码:123456(密码不能少于6位)
如果是Windows请使用CMD命令行执行
按照提示设置内置用户密码
4、设置kibana用户名密码
在kibana的配置文件kibana.yml里面添加
elasticsearch.username: "kibana"
elasticsearch.password: "123456"
5、然后启动kibana
启动kibana就可以使用用户名与密码进行访问。

Image.png


6、设置logstash用户名和密码
打开配置文件conf,在output中的elasticsearch中添加user、password
例:
output {
elasticsearch {
hosts => ["10.68.24.136:9200","10.68.24.137:9200"]
index => "%{[indexName]}-%{+YYYY.MM.dd}"
user => "logstash_system"
password => "123456"
}
继续阅读 »

概述
ES的权限控制一直ES使用中的一个问题,因为官方之前一直未免费安全性功能。公司要不选择使用其他插件来解决,要不就是只在内网使用。现在在ES6.8及以后版本ES将部分安全性功能免费开放了, 现在我们就6.8版本的【基于角色的访问控制】进行操作、验证。
1、下载安装ELK6.8版本(此处省略)
ES在6.8以后发布的版本才有(7.0是发布在6.8之前的)
2、修改ES配置文件 elasticsearch.yml
在配置文件中添加:
xpack.security.enabled: true
基础版本的安全性功能是默认关闭的。
然后启动ES
./elasticsearch -d
3、设置内置用户密码
参考:内置用户
./bin/elasticsearch-setup-passwords interactive
这里设置的密码要记住,后面会使用到。我们设置简单的密码:123456(密码不能少于6位)
如果是Windows请使用CMD命令行执行
按照提示设置内置用户密码
4、设置kibana用户名密码
在kibana的配置文件kibana.yml里面添加
elasticsearch.username: "kibana"
elasticsearch.password: "123456"
5、然后启动kibana
启动kibana就可以使用用户名与密码进行访问。

Image.png


6、设置logstash用户名和密码
打开配置文件conf,在output中的elasticsearch中添加user、password
例:
output {
elasticsearch {
hosts => ["10.68.24.136:9200","10.68.24.137:9200"]
index => "%{[indexName]}-%{+YYYY.MM.dd}"
user => "logstash_system"
password => "123456"
}
收起阅读 »

从MySQL建立Elasticsearch索引-索引

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-索引

接上文从MySQL建立Elasticsearch索引-引言,本文介绍一下我们实现索引创建时的一些思路。

目标

框架以Jar包的形式提供,通过配置(XML)提供规则,支持插件开发,支持全量、增量等。

因为希望尽量减少开发量,所以不要求使用方提供平表,因此需要考虑如何将平表

概念

  • 全量:即使用全部数据创建一个新的索引
  • 增量:按照时间范围或者给定的规则,把变化的数据同步到ES
  • 插件:本框架内定义了一个插件接口,用于特定需求自行开发,并接入到现有的配置文件中
  • 分批参数:SQL中由框架填充的参数,类似${id}等

配置

以用户(users)为例,配置有三种文件:

  1. users.mapping,保存了索引的配置,每次全量的时候会读取本文件进行索引创建
  2. rule,规则文件,里面主要定义了对应SQL和插件的配置
    1. users-all.rule,全量规则,规则中以主表为主体,支持以Top、Limit的形式对数据进行分批获取,分批参数在SQL里可以定义为${id},框架会自动填充该值,以保证能够拉取到全量的数据
    2. users-inc.rule,按照时间范围进行增量,分批参数支持在SQL中使用${startTime}和${endTime}
    3. users-spec.rule,按照指定的主表的主键,获取数据及更新索引的操作
    4. users-partial.rule,框架接入了阿里的Canal,本规则定义了各种表变化时,如果取到主表Id,以使用spec进行数据更新
  3. users.plugin,以上的rule主要提供了主表数据的获取方法,plugin文件则提供了各种关联表的信息获取方式,可以使用SQL,或者自定义的插件

全量索引实现关键点

为了保证索引的性能、监控、正确性等,实现时进行了以下设计。

(一)索引的维护

ES使用分布式、Replica、Snapshot机制保证索引的有效及集群稳定性。我们综合考虑后决定放弃Snapshot机制,通过定时/不定时创建全新全量索引,索引名字以${indexName}-{yyyy-MM-dd}的格式定义。正进行全量的索引关联上${indexName}_F的别名,正在使用的索引关联上${indexName}的别名,这样代码里可以不用关心应该读取或者使用哪个索引,合适的场景使用合适的别名即可。

(二)索引的性能

此处专指创建索引的性能,ES的性能是老话题了。

首先,为了保证全量的性能,创建索引时会调整mapping参数,类似refresh_interval改成-1,number_of_replicas改成0,添加默认slowlog设置。

索引过程中,控制bulk请求体的总大小,保证合适的分批大小的数据一并提交到ES。如果失败简单重试三次,如果三次都失败则认为失败,退出并删除当前索引,以保证线上ES数据干净准确。

(三)索引的变更

索引完成后,检查当前索引文档数和正使用索引文档数差异,如果大于配置的阈值,则认为此次索引创建有问题,删除索引并退出;做强制合并,减少segments数,提高搜索性能;恢复refresh_interval、number_of_replicas等设置;等待新索引状态变成GREEN后,将${indexName}切换到新的索引上,以使新索引生效。

最后,检查以${indexName}-{yyyy-MM-dd}格式命名的索引有几份,将多于指定个数的较老的索引删除,将多于指定个数的索引状态改为Close。这样可以尽可能提供磁盘内存利用率,减少不必要的损耗,又能在一定程度上保证数据可用。

以上,即完成了全量索引的创建。

增量索引实现关键点

增量索引因为场景比较多,所以规则也分了几种:

  • inc,使用${startTime}和${endTime}在SQL中代替时间范围,框架会自动保存上次执行时间,以使整个增量整体不断滚动更新。实际使用时,因为有多个关联表导致SQL写起来比较复杂,以及部分增量数据过大,对DB有一定压力,因此不推荐使用此种方式。数据相对简单的场景可以使用。
  • spec,在知道主表数据变化范围的时候使用,也是目前我们推荐的一种方式。使用主键查找数据,所有的表都是主键查询,影响范围也很明确。
  • partial,针对canal做了单独封装的规则,用户获取canal的变化类型(更新、删除),变化的主表Id,用于使用spec的方式进行数据更新。

暂时没有使用部分更新的原因是,关联表与主表的映射关系比较复杂,有1:1,1:N,M:N;并且目前按照主键更新数据的方式,对DB等压力不大;加上spec的方式逻辑清晰,有利于维护和开发。

扩展性

SQL不是万能的,比方说我们部分数据需要从其他微服务取,或者SQL逻辑复杂建议代码实现,这时候就可以使用我们的插件机制了。默认框架提供了一个很强大的插件,支持Distinct、Merge、Mapping等功能,可以满足80%的关联数据的场景了。其他的场景以及需要微服务的场景,支持用户自己实现指定插件,通过配置文件,即可接入现有的框架体系中。以此能支持目前我们全部需求。

下一篇,我们将分享我们搜索模块的实现思路。

继续阅读 »

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-索引

接上文从MySQL建立Elasticsearch索引-引言,本文介绍一下我们实现索引创建时的一些思路。

目标

框架以Jar包的形式提供,通过配置(XML)提供规则,支持插件开发,支持全量、增量等。

因为希望尽量减少开发量,所以不要求使用方提供平表,因此需要考虑如何将平表

概念

  • 全量:即使用全部数据创建一个新的索引
  • 增量:按照时间范围或者给定的规则,把变化的数据同步到ES
  • 插件:本框架内定义了一个插件接口,用于特定需求自行开发,并接入到现有的配置文件中
  • 分批参数:SQL中由框架填充的参数,类似${id}等

配置

以用户(users)为例,配置有三种文件:

  1. users.mapping,保存了索引的配置,每次全量的时候会读取本文件进行索引创建
  2. rule,规则文件,里面主要定义了对应SQL和插件的配置
    1. users-all.rule,全量规则,规则中以主表为主体,支持以Top、Limit的形式对数据进行分批获取,分批参数在SQL里可以定义为${id},框架会自动填充该值,以保证能够拉取到全量的数据
    2. users-inc.rule,按照时间范围进行增量,分批参数支持在SQL中使用${startTime}和${endTime}
    3. users-spec.rule,按照指定的主表的主键,获取数据及更新索引的操作
    4. users-partial.rule,框架接入了阿里的Canal,本规则定义了各种表变化时,如果取到主表Id,以使用spec进行数据更新
  3. users.plugin,以上的rule主要提供了主表数据的获取方法,plugin文件则提供了各种关联表的信息获取方式,可以使用SQL,或者自定义的插件

全量索引实现关键点

为了保证索引的性能、监控、正确性等,实现时进行了以下设计。

(一)索引的维护

ES使用分布式、Replica、Snapshot机制保证索引的有效及集群稳定性。我们综合考虑后决定放弃Snapshot机制,通过定时/不定时创建全新全量索引,索引名字以${indexName}-{yyyy-MM-dd}的格式定义。正进行全量的索引关联上${indexName}_F的别名,正在使用的索引关联上${indexName}的别名,这样代码里可以不用关心应该读取或者使用哪个索引,合适的场景使用合适的别名即可。

(二)索引的性能

此处专指创建索引的性能,ES的性能是老话题了。

首先,为了保证全量的性能,创建索引时会调整mapping参数,类似refresh_interval改成-1,number_of_replicas改成0,添加默认slowlog设置。

索引过程中,控制bulk请求体的总大小,保证合适的分批大小的数据一并提交到ES。如果失败简单重试三次,如果三次都失败则认为失败,退出并删除当前索引,以保证线上ES数据干净准确。

(三)索引的变更

索引完成后,检查当前索引文档数和正使用索引文档数差异,如果大于配置的阈值,则认为此次索引创建有问题,删除索引并退出;做强制合并,减少segments数,提高搜索性能;恢复refresh_interval、number_of_replicas等设置;等待新索引状态变成GREEN后,将${indexName}切换到新的索引上,以使新索引生效。

最后,检查以${indexName}-{yyyy-MM-dd}格式命名的索引有几份,将多于指定个数的较老的索引删除,将多于指定个数的索引状态改为Close。这样可以尽可能提供磁盘内存利用率,减少不必要的损耗,又能在一定程度上保证数据可用。

以上,即完成了全量索引的创建。

增量索引实现关键点

增量索引因为场景比较多,所以规则也分了几种:

  • inc,使用${startTime}和${endTime}在SQL中代替时间范围,框架会自动保存上次执行时间,以使整个增量整体不断滚动更新。实际使用时,因为有多个关联表导致SQL写起来比较复杂,以及部分增量数据过大,对DB有一定压力,因此不推荐使用此种方式。数据相对简单的场景可以使用。
  • spec,在知道主表数据变化范围的时候使用,也是目前我们推荐的一种方式。使用主键查找数据,所有的表都是主键查询,影响范围也很明确。
  • partial,针对canal做了单独封装的规则,用户获取canal的变化类型(更新、删除),变化的主表Id,用于使用spec的方式进行数据更新。

暂时没有使用部分更新的原因是,关联表与主表的映射关系比较复杂,有1:1,1:N,M:N;并且目前按照主键更新数据的方式,对DB等压力不大;加上spec的方式逻辑清晰,有利于维护和开发。

扩展性

SQL不是万能的,比方说我们部分数据需要从其他微服务取,或者SQL逻辑复杂建议代码实现,这时候就可以使用我们的插件机制了。默认框架提供了一个很强大的插件,支持Distinct、Merge、Mapping等功能,可以满足80%的关联数据的场景了。其他的场景以及需要微服务的场景,支持用户自己实现指定插件,通过配置文件,即可接入现有的框架体系中。以此能支持目前我们全部需求。

下一篇,我们将分享我们搜索模块的实现思路。

收起阅读 »

从MySQL建立Elasticsearch索引-引言

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-引言

日常工作里,因业务需要大量使用了Elasticsearch。为了简化索引的开发工作,我们需要一个易用可扩展的MySQL到ES的同步框架,在比较了可以找到的各种开源框架&工具后,我们还是选择自行研发了一个,名字简单粗暴:es-common。

背景

16年我接手了并负责了部门所有业务的搜索系统,旧搜索系统是基于Lucene自研实现的一个搜索框架,包含了平表创建、全量索引、增量索引、搜索引擎四个部分基础功能的封装;另有一个管理系统,用于配置索引、字典等信息。框架的实现思路是通过建立平表,简化索引创建的过程。

接手该系统以后,各业务出现井喷式发展,各种需求铺天盖地,同时该系统的不稳定性也开始作妖,最早三个人的小团队每天都疲于奔命。解决现有问题的同时,也在查找更好的解决方案。于是Elasticsearch进入视野。 基于JSON的交互、分布式、数据与逻辑隔离、开箱即用、稳定等特性,使我们确定了向ES转型的计划。每个点都是现有系统没有解决的问题,具体的点就不吐槽了。

选择

我们的需求有:

  1. 支持全量、增量建立索引,以使数据变更较快体现
  2. 支持更新指定文档,以处理突发问题
  3. 索引有版本控制,以防止出现问题无法快速回滚
  4. 尽量减少代码开发,减少出错概率,更专注业务
  5. 简单的出错处理,失败检测等
  6. 支持插件化开发,提供额外数据
  7. 支持简单的规则,类似合并同主键数据
  8. 域名、AUTH不可见,以满足安全要求
  9. 支持非MySQL来源的数据,因为微服务的存在,不是所有数据都能从DB获取到

当时的调用结果已经找不到了,那么按照现在可以搜索到的框架和工具来重新做调研好了。简单搜索了一下MySQL到ES的同步框架或者工具,有以下几个:

这几个工具通过简单的配置,即可建立索引,但分析我们的需求,有以下需求无法满足:

  1. 需要提供DB的URL、User和Password等,我们公司由于安全的考虑,是无法拿到这些参数的;
  2. 有部分数据是通过微服务获取的,单纯的SQL是无法访问微服务的;
  3. 有的表数据量比较大,需要分片多JOB同时处理,JDBC在这点上操作比较麻烦;
  4. 增量是需要按照主键进行更新的,按照时间扫描对表压力比较大;
  5. 类似工具无法走常规发布,有一定的发布风险

基于以上原因,我们选择了自己研发了一个框架,用来满足我们的需求。下篇文章讲介绍es-common的实现思路。

继续阅读 »

本文始发于知乎Elasticsearch专栏:从MySQL建立Elasticsearch索引-引言

日常工作里,因业务需要大量使用了Elasticsearch。为了简化索引的开发工作,我们需要一个易用可扩展的MySQL到ES的同步框架,在比较了可以找到的各种开源框架&工具后,我们还是选择自行研发了一个,名字简单粗暴:es-common。

背景

16年我接手了并负责了部门所有业务的搜索系统,旧搜索系统是基于Lucene自研实现的一个搜索框架,包含了平表创建、全量索引、增量索引、搜索引擎四个部分基础功能的封装;另有一个管理系统,用于配置索引、字典等信息。框架的实现思路是通过建立平表,简化索引创建的过程。

接手该系统以后,各业务出现井喷式发展,各种需求铺天盖地,同时该系统的不稳定性也开始作妖,最早三个人的小团队每天都疲于奔命。解决现有问题的同时,也在查找更好的解决方案。于是Elasticsearch进入视野。 基于JSON的交互、分布式、数据与逻辑隔离、开箱即用、稳定等特性,使我们确定了向ES转型的计划。每个点都是现有系统没有解决的问题,具体的点就不吐槽了。

选择

我们的需求有:

  1. 支持全量、增量建立索引,以使数据变更较快体现
  2. 支持更新指定文档,以处理突发问题
  3. 索引有版本控制,以防止出现问题无法快速回滚
  4. 尽量减少代码开发,减少出错概率,更专注业务
  5. 简单的出错处理,失败检测等
  6. 支持插件化开发,提供额外数据
  7. 支持简单的规则,类似合并同主键数据
  8. 域名、AUTH不可见,以满足安全要求
  9. 支持非MySQL来源的数据,因为微服务的存在,不是所有数据都能从DB获取到

当时的调用结果已经找不到了,那么按照现在可以搜索到的框架和工具来重新做调研好了。简单搜索了一下MySQL到ES的同步框架或者工具,有以下几个:

这几个工具通过简单的配置,即可建立索引,但分析我们的需求,有以下需求无法满足:

  1. 需要提供DB的URL、User和Password等,我们公司由于安全的考虑,是无法拿到这些参数的;
  2. 有部分数据是通过微服务获取的,单纯的SQL是无法访问微服务的;
  3. 有的表数据量比较大,需要分片多JOB同时处理,JDBC在这点上操作比较麻烦;
  4. 增量是需要按照主键进行更新的,按照时间扫描对表压力比较大;
  5. 类似工具无法走常规发布,有一定的发布风险

基于以上原因,我们选择了自己研发了一个框架,用来满足我们的需求。下篇文章讲介绍es-common的实现思路。

收起阅读 »

Elastic日报 第661期 (2019-07-06)

1.利用logstash同步关系型数据库和es https://0x7.me/R4OFV

2.针对英语和俄语的词形分析插件 https://0x7.me/9S0J

3.一周热点:GitHub上的那些奇葩项目 https://0x7.me/8B5T

继续阅读 »

1.利用logstash同步关系型数据库和es https://0x7.me/R4OFV

2.针对英语和俄语的词形分析插件 https://0x7.me/9S0J

3.一周热点:GitHub上的那些奇葩项目 https://0x7.me/8B5T

收起阅读 »

邀请参加 Elasticsearch 中文分词器需求调查问卷

Jietu20190624-202208.gif

 亲爱的 Elasticsearch 中文用户:
 
为了能够充分了解您对 Elasticsearch 中文分词需求的基本情况,以便我们更有针对性地改进中文分词下的使用体验,并优化相关分词器,请您抽出几分钟的时间填写以下问卷。问卷题目中的选项没有对错之分,请您根据实际情况或想法进行填写。您的信息将被严格保密,请放心填写!

调研结束后,我们将随机抽出 5 名同学,每人送 Elastic 纪念 T 恤一件。谢谢您对本次调查的参与和支持~
 
问卷填写地址:http://elasticsearch.mikecrm.com/nz0FxmK
 
谢谢!
 
以下是 5 位幸运的参与者:
 
  • 韦先生 *4270
  • 汪科  *9396
  • 乔驰  *3096
  • 彭先生 *4350
  • 张丽斌 *5329

 
再次感谢大家的参与。
继续阅读 »
Jietu20190624-202208.gif

 亲爱的 Elasticsearch 中文用户:
 
为了能够充分了解您对 Elasticsearch 中文分词需求的基本情况,以便我们更有针对性地改进中文分词下的使用体验,并优化相关分词器,请您抽出几分钟的时间填写以下问卷。问卷题目中的选项没有对错之分,请您根据实际情况或想法进行填写。您的信息将被严格保密,请放心填写!

调研结束后,我们将随机抽出 5 名同学,每人送 Elastic 纪念 T 恤一件。谢谢您对本次调查的参与和支持~
 
问卷填写地址:http://elasticsearch.mikecrm.com/nz0FxmK
 
谢谢!
 
以下是 5 位幸运的参与者:
 
  • 韦先生 *4270
  • 汪科  *9396
  • 乔驰  *3096
  • 彭先生 *4350
  • 张丽斌 *5329

 
再次感谢大家的参与。 收起阅读 »

Elastic日报 第632期 (2019-06-04)

1、23条好用的Elasticsearch查询策略。
http://t.cn/R9Sjxf5
2、如何使用Nginx为Elasticsearch设置代理。
http://t.cn/Ai9Oc9Pd
3、Telegraf Elasticsearch插件使用教程。
http://t.cn/Ai9OcpDY

编辑:叮咚光军

归档:https://ela.st/cn-daily-all
订阅:https://ela.st/cn-daily-sub
沙龙:https://ela.st/cn-meetup
继续阅读 »
1、23条好用的Elasticsearch查询策略。
http://t.cn/R9Sjxf5
2、如何使用Nginx为Elasticsearch设置代理。
http://t.cn/Ai9Oc9Pd
3、Telegraf Elasticsearch插件使用教程。
http://t.cn/Ai9OcpDY

编辑:叮咚光军

归档:https://ela.st/cn-daily-all
订阅:https://ela.st/cn-daily-sub
沙龙:https://ela.st/cn-meetup 收起阅读 »

ES6.* join类型的可行性

ES6将_parent移除缓存join了
{
"产品id":"",
"产品名字":"",
"产品SKU":[
"skuid":""
"sku库存":"",
"sku销量":"",
"SKU经销权":[
{"经销商编号":"1","购买量":"","发货工厂":"","销售组织"},
{"经销商编号":"2","购买量":"","发货工厂":"","销售组织"}
]
],
"产品总销量":"",
"产品总库存":"",
]
}
想通过ES6.* 建立这种一对多 多对多的数据模型  有用过到生产环境的吗  现在测试已经满足子查父  同时根据子的排序  孙节点还没测过
继续阅读 »
ES6将_parent移除缓存join了
{
"产品id":"",
"产品名字":"",
"产品SKU":[
"skuid":""
"sku库存":"",
"sku销量":"",
"SKU经销权":[
{"经销商编号":"1","购买量":"","发货工厂":"","销售组织"},
{"经销商编号":"2","购买量":"","发货工厂":"","销售组织"}
]
],
"产品总销量":"",
"产品总库存":"",
]
}
想通过ES6.* 建立这种一对多 多对多的数据模型  有用过到生产环境的吗  现在测试已经满足子查父  同时根据子的排序  孙节点还没测过 收起阅读 »

百度智能云招聘ElasticSearch研发工程师

工作职责:
  • 负责ElasticSearch相关产品的设计、研发、维护
  • 与相关大数据产品制定大数据解决方案
  •  

职责要求:
  • 211或985大学本科毕业,计算机相关专业
  • 熟悉Java/Go语言开发,熟悉常见的数据结构与算法
  • 掌握网络编程和多线程开发
  • 对ElasticSearch、Lucene、Slor的原理有较深入了解。
  • 有对开源组件有过二次开发经验者优先
  • 由社区贡献经验者优先

联系方式:shouchunbai@baidu.com
 
继续阅读 »
工作职责:
  • 负责ElasticSearch相关产品的设计、研发、维护
  • 与相关大数据产品制定大数据解决方案
  •  

职责要求:
  • 211或985大学本科毕业,计算机相关专业
  • 熟悉Java/Go语言开发,熟悉常见的数据结构与算法
  • 掌握网络编程和多线程开发
  • 对ElasticSearch、Lucene、Slor的原理有较深入了解。
  • 有对开源组件有过二次开发经验者优先
  • 由社区贡献经验者优先

联系方式:shouchunbai@baidu.com
  收起阅读 »

ELK 使用小技巧(第 5 期)

ELK Tips 主要介绍一些 ELK 使用过程中的小技巧,内容主要来源为 Elastic 中文社区。

一、Logstash

1、Logstash 性能调优主要参数

  • pipeline.workers:设置启动多少个线程执行 fliter 和 output;当 input 的内容出现堆积而 CPU 使用率还比较充足时,可以考虑增加该参数的大小;
  • pipeline.batch.size:设置单个工作线程在执行过滤器和输出之前收集的最大事件数,较大的批量大小通常更高效,但会增加内存开销。输出插件会将每个批处理作为一个输出单元。;例如,ES 输出会为收到的每个批次发出批量请求;调整 pipeline.batch.size 可调整发送到 ES 的批量请求(Bulk)的大小;
  • pipeline.batch.delay:设置 Logstash 管道的延迟时间, 管道批处理延迟是 Logstash 在当前管道工作线程中接收事件后等待新消息的最长时间(以毫秒为单位);简单来说,当 pipeline.batch.size 不满足时,会等待 pipeline.batch.delay 设置的时间,超时后便开始执行 filter 和 output 操作。

2、'reader' unacceptable character ' ' (0x0)

logstash 执行使用 Jdbc input plugin 后报错:

[main]-pipeline-manager] ERROR logstash.agent - Pipeline aborted due to error {
:exception=>#<Psych::SyntaxError: (): 'reader' unacceptable character ' ' (0x0) special characters are not allowed in "'reader'", 
position 0 at line 0 column 0>, :backtrace=>["org/jruby/ext/psych/PsychParser.java:232:in parse'"

解决方案:删除 $USER_HOME/.logstash_jdbc_last_run 文件即可。

二、Elasticsearch

1、TermsQuery 与多个 TermQuery 的区别

当 terms 的个数较少的时候,TermsQuery 等效为 ConstantScoreQuery 内部包含多个 TermQuery:

Query q1 = new TermInSetQuery(new Term("field", "foo"), new Term("field", "bar"));
// 等效为下面的语句
BooleanQuery bq = new BooleanQuery();
bq.add(new TermQuery(new Term("field", "foo")), Occur.SHOULD);
bq.add(new TermQuery(new Term("field", "bar")), Occur.SHOULD);
Query q2 = new ConstantScoreQuery(bq);

当 terms 较多的时候,它将使用匹配的文档组合成一个位集,并在该位集上进行评分;此时查询效率比普通的 Bool 合并要更加高效。

当 terms 的个数较多时,TermsQuery 比多个 TermQuery 组合的查询效率更高。

2、ES 借助 nginx 配置域名

upstream /data/ {
    server 192.168.187.xxx:9200;
    keepalive 300 ;
}

server {
    listen 80;
    server_name testelk.xx.com;
    keepalive_timeout 120s 120s;
    location /data {
        proxy_pass http://data/;
        proxy_http_version 1.1;
        proxy_set_header Connection "Keep-Alive";
        proxy_set_header Proxy-Connection "Keep-Alive";
        proxy_set_header X-Real-IP $remote_addr;
        proxy_pass_header remote_user 
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_set_header Host $http_host;
        proxy_set_header X-Nginx-Proxy true;
    }
}

3、ES Reindex 时如何不停止写入服务

方案一:kennywu76

ES 的 reindex 在索引有实时的 update/delete 的情况下,即使借助 alias,也没有办法实现真正的 zero down time。

增加新文档比较好办,通过 alias 切换写入到新索引,同时 reindex 做旧->新索引的数据传输即可;但是 update/delete 操作针对的文档如果还未从旧索引传输过来,直接对新索引操作会导致两个索引数据不一致。

我能够想到的(一个未经实际验证)的方案,前提是数据库里的文档有一个类似 last_update_time 字段记录文档最后更新的时间,用作写入 ES 文档的版本号,然后数据写入新索引的时候,url 里带上下面这样的参数:version_type=external_gt&version=xxxxxx

其中 version_type=external_gt 表示写入文档的版本号大于已有的文档版本号,或者文档不存在,写入才会成功,否则会抛版本冲突的异常。另外 delete 操作都要转换成 index 操作,index 的内容可以是一个空文档

这样实时数据写入新索引和 reindex 可以同时进行,实时写入的数据应该具有更高的版本,总是能够成功,reindex 如果遇到版本冲突,说明该文档被实时部分更新过了,已经过时,可以直接放弃跳过。

该方案的缺陷:

  • 要求数据源里的数据具有版本信息,可能因为各种局限,不太容易更改;
  • delete 操作必须转化为写入一个空文档,delete 实际上是一个标记文档,并且本身也有版本信息。但是如果后端发生了 segment merge,delete 可能会被合并以后物理清除。这样 delete 和对应的版本信息丢失,之后 reindex 如果写入了旧版本的文档,仍然会有一致性问题;但是空文档会增加索引文件的大小,有额外的消耗,一个可能的缓解办法是在 reindex 全部做完以后,再做一次空文档的删除。

改进方案:the_best

重建索引步骤如下:

  1. 保证 delete 操作都要转换成 index 操作,index 的内容可以是一个空文档;
  2. 对老索引 old_index(业务上的别名还是挂在老索引上)进行重索引操作(version_type=external);
    curl -X POST 'http://<hostname>:9200/_reindex'
    {
    "conflicts": "proceed",
    "source": {
        "index": "old_index",
        "size": 1000
    },
    "dest": {
        "index": "new_index",
        "version_type": "external"
    }
    }
  3. 将别名切到 newIndex;
  4. 将重索引时间段内 old_index 产生的热数据,再捞一次到 new_index 中(conflicts=proceed&version_type=external);
    curl -X POST /_reindex
    {
    "conflicts": "proceed",
    "source": {
        "index": "old_index"
        "query": {
            "constant_score" : {
                "filter" : {
                    "range" : {
                        "data_update_time" : {
                            "gte" : <reindex开始时刻前的毫秒时间戳>
                        }
                    }
                }
            }
        }
    },
    "dest": {
        "index": "new_index",
        "version_type": "external"
    }
    }
  5. 手动做一次空文档的删除。

这种方式取决于重索引期间产生的数据量大小(会影响步骤4的用时),不过我们可以视具体业务情况灵活操作。比如说数据量比较大重索引我们用了10个小时(这10个小时内新产生了200多万的数据),在切别名前,我们可以按步骤(4)的调用方式,把近10个小时的数据再捞一遍到新索引中,如此迭代个几次,直到别名切完后,我们能保证最后一次的步骤(4)可以在较短时间内完成。

4、ES 节点通讯配置

http.port: 9200
http.bind_host: 127.0.0.1
transport.tcp.port: 9300
transport.bind_host: 127.0.0.1

5、把 Lucene 的原生 query 传给 ES

SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(typeName);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.from(0);
sourceBuilder.size(10);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

//q为Lucene检索表达式, 直接输入关键词匹配_all或者*字段, 字段匹配user:kimchy, 
//多字段匹配user:kimchy AND message:Elasticsearch
QueryStringQueryBuilder queryStringQueryBuilder = QueryBuilders.queryStringQuery(q); 
sourceBuilder.query(queryStringQueryBuilder);
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest);
SearchHits searchHits = searchResponse.getHits();

6、ES 文档字段个数限制

ES 文档默认不允许文档字段超过 1000,超过 1000 会报如下错误:

failed to put mappings on indices [[[nfvoemspm/srjL3cMMRUqa7DgOrYqX-A]]], type [log]
java.lang.IllegalArgumentException: Limit of total fields [1000] in index [xxx] has been exceeded

可以通过修改索引配置来修改字段个数限制,不过还是推荐从业务上进行优化:

修改settings
{
  "index.mapping.total_fields.limit": 2000
}

7、将 DSL 字符串转换为 QueryBuilder

## wrapper 案例
GET /_search
{
    "query" : {
        "wrapper": {
            "query" : "eyJ0ZXJtIiA6IHsgInVzZXIiIDogIktpbWNoeSIgfX0=" 
        }
    }
}

## RestClient
QueryBuilders.wrapperQuery("{\"term\": {\"field\":\"value\"}}")

8、ES 集群重启后 Slice Scroll 速度变慢

重启机器之后,pagecache 都没有了,所有数据都要重新从磁盘加载。

9、ES 开启索引新建删除日志

PUT _cluster/settings
{
  "persistent": {
    "logger.cluster.service": "DEBUG"
  }
}

10、慢日志全局级别设定

  1. 对已经存在的索引可以通过 PUT _settings 做存量设置
  2. 对之后新增的索引,可以使用类似于下面的template
    PUT _template/global-slowlog_template
    {
    "order": -1,
    "version": 0,
    "template": "*",
    "settings": {
        "index.indexing.slowlog.threshold.index.debug" : "10ms",
        "index.indexing.slowlog.threshold.index.info" : "50ms",
        "index.indexing.slowlog.threshold.index.warn" : "100ms",
        "index.search.slowlog.threshold.fetch.debug" : "100ms",
        "index.search.slowlog.threshold.fetch.info" : "200ms",
        "index.search.slowlog.threshold.fetch.warn" : "500ms",
        "index.search.slowlog.threshold.query.debug" : "100ms",
        "index.search.slowlog.threshold.query.info" : "200ms",
        "index.search.slowlog.threshold.query.warn" : "1s"
    }
    }

11、TCP 设置多个端口的用途

transport.tcp.port 这个参数不写,默认为 9300-9399,开放那么多 端口有用么?

  • 如果设置一个端口,假设这个端口占用了程序就无法正常启动;
  • 如果设置多个端口,一个端口占用会寻找下一个端口,直至找到可用端口。

12、ES 临时重启,设置分片延迟分配策略

PUT _all/_settings
{
  "settings": {
    "index.unassigned.node_left.delayed_timeout": "5m"
  }
}

三、Kibana

1、kibana 图表自定义标注

可以用 TSVB,支持标注。

Kibana TSVB 注解的使用:https://elasticsearch.cn/article/701

2、Kibana discover 导出 csv 文件

请参考文章:如何快速把 Kibana Discover 页的 Document Table 导出成 CSV

3、修改 kibana 的默认主页

https://elasticsearch.cn/article/6335

四、社区文章精选


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

继续阅读 »

ELK Tips 主要介绍一些 ELK 使用过程中的小技巧,内容主要来源为 Elastic 中文社区。

一、Logstash

1、Logstash 性能调优主要参数

  • pipeline.workers:设置启动多少个线程执行 fliter 和 output;当 input 的内容出现堆积而 CPU 使用率还比较充足时,可以考虑增加该参数的大小;
  • pipeline.batch.size:设置单个工作线程在执行过滤器和输出之前收集的最大事件数,较大的批量大小通常更高效,但会增加内存开销。输出插件会将每个批处理作为一个输出单元。;例如,ES 输出会为收到的每个批次发出批量请求;调整 pipeline.batch.size 可调整发送到 ES 的批量请求(Bulk)的大小;
  • pipeline.batch.delay:设置 Logstash 管道的延迟时间, 管道批处理延迟是 Logstash 在当前管道工作线程中接收事件后等待新消息的最长时间(以毫秒为单位);简单来说,当 pipeline.batch.size 不满足时,会等待 pipeline.batch.delay 设置的时间,超时后便开始执行 filter 和 output 操作。

2、'reader' unacceptable character ' ' (0x0)

logstash 执行使用 Jdbc input plugin 后报错:

[main]-pipeline-manager] ERROR logstash.agent - Pipeline aborted due to error {
:exception=>#<Psych::SyntaxError: (): 'reader' unacceptable character ' ' (0x0) special characters are not allowed in "'reader'", 
position 0 at line 0 column 0>, :backtrace=>["org/jruby/ext/psych/PsychParser.java:232:in parse'"

解决方案:删除 $USER_HOME/.logstash_jdbc_last_run 文件即可。

二、Elasticsearch

1、TermsQuery 与多个 TermQuery 的区别

当 terms 的个数较少的时候,TermsQuery 等效为 ConstantScoreQuery 内部包含多个 TermQuery:

Query q1 = new TermInSetQuery(new Term("field", "foo"), new Term("field", "bar"));
// 等效为下面的语句
BooleanQuery bq = new BooleanQuery();
bq.add(new TermQuery(new Term("field", "foo")), Occur.SHOULD);
bq.add(new TermQuery(new Term("field", "bar")), Occur.SHOULD);
Query q2 = new ConstantScoreQuery(bq);

当 terms 较多的时候,它将使用匹配的文档组合成一个位集,并在该位集上进行评分;此时查询效率比普通的 Bool 合并要更加高效。

当 terms 的个数较多时,TermsQuery 比多个 TermQuery 组合的查询效率更高。

2、ES 借助 nginx 配置域名

upstream /data/ {
    server 192.168.187.xxx:9200;
    keepalive 300 ;
}

server {
    listen 80;
    server_name testelk.xx.com;
    keepalive_timeout 120s 120s;
    location /data {
        proxy_pass http://data/;
        proxy_http_version 1.1;
        proxy_set_header Connection "Keep-Alive";
        proxy_set_header Proxy-Connection "Keep-Alive";
        proxy_set_header X-Real-IP $remote_addr;
        proxy_pass_header remote_user 
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_set_header Host $http_host;
        proxy_set_header X-Nginx-Proxy true;
    }
}

3、ES Reindex 时如何不停止写入服务

方案一:kennywu76

ES 的 reindex 在索引有实时的 update/delete 的情况下,即使借助 alias,也没有办法实现真正的 zero down time。

增加新文档比较好办,通过 alias 切换写入到新索引,同时 reindex 做旧->新索引的数据传输即可;但是 update/delete 操作针对的文档如果还未从旧索引传输过来,直接对新索引操作会导致两个索引数据不一致。

我能够想到的(一个未经实际验证)的方案,前提是数据库里的文档有一个类似 last_update_time 字段记录文档最后更新的时间,用作写入 ES 文档的版本号,然后数据写入新索引的时候,url 里带上下面这样的参数:version_type=external_gt&version=xxxxxx

其中 version_type=external_gt 表示写入文档的版本号大于已有的文档版本号,或者文档不存在,写入才会成功,否则会抛版本冲突的异常。另外 delete 操作都要转换成 index 操作,index 的内容可以是一个空文档

这样实时数据写入新索引和 reindex 可以同时进行,实时写入的数据应该具有更高的版本,总是能够成功,reindex 如果遇到版本冲突,说明该文档被实时部分更新过了,已经过时,可以直接放弃跳过。

该方案的缺陷:

  • 要求数据源里的数据具有版本信息,可能因为各种局限,不太容易更改;
  • delete 操作必须转化为写入一个空文档,delete 实际上是一个标记文档,并且本身也有版本信息。但是如果后端发生了 segment merge,delete 可能会被合并以后物理清除。这样 delete 和对应的版本信息丢失,之后 reindex 如果写入了旧版本的文档,仍然会有一致性问题;但是空文档会增加索引文件的大小,有额外的消耗,一个可能的缓解办法是在 reindex 全部做完以后,再做一次空文档的删除。

改进方案:the_best

重建索引步骤如下:

  1. 保证 delete 操作都要转换成 index 操作,index 的内容可以是一个空文档;
  2. 对老索引 old_index(业务上的别名还是挂在老索引上)进行重索引操作(version_type=external);
    curl -X POST 'http://<hostname>:9200/_reindex'
    {
    "conflicts": "proceed",
    "source": {
        "index": "old_index",
        "size": 1000
    },
    "dest": {
        "index": "new_index",
        "version_type": "external"
    }
    }
  3. 将别名切到 newIndex;
  4. 将重索引时间段内 old_index 产生的热数据,再捞一次到 new_index 中(conflicts=proceed&version_type=external);
    curl -X POST /_reindex
    {
    "conflicts": "proceed",
    "source": {
        "index": "old_index"
        "query": {
            "constant_score" : {
                "filter" : {
                    "range" : {
                        "data_update_time" : {
                            "gte" : <reindex开始时刻前的毫秒时间戳>
                        }
                    }
                }
            }
        }
    },
    "dest": {
        "index": "new_index",
        "version_type": "external"
    }
    }
  5. 手动做一次空文档的删除。

这种方式取决于重索引期间产生的数据量大小(会影响步骤4的用时),不过我们可以视具体业务情况灵活操作。比如说数据量比较大重索引我们用了10个小时(这10个小时内新产生了200多万的数据),在切别名前,我们可以按步骤(4)的调用方式,把近10个小时的数据再捞一遍到新索引中,如此迭代个几次,直到别名切完后,我们能保证最后一次的步骤(4)可以在较短时间内完成。

4、ES 节点通讯配置

http.port: 9200
http.bind_host: 127.0.0.1
transport.tcp.port: 9300
transport.bind_host: 127.0.0.1

5、把 Lucene 的原生 query 传给 ES

SearchRequest searchRequest = new SearchRequest(indexName);
searchRequest.types(typeName);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.from(0);
sourceBuilder.size(10);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

//q为Lucene检索表达式, 直接输入关键词匹配_all或者*字段, 字段匹配user:kimchy, 
//多字段匹配user:kimchy AND message:Elasticsearch
QueryStringQueryBuilder queryStringQueryBuilder = QueryBuilders.queryStringQuery(q); 
sourceBuilder.query(queryStringQueryBuilder);
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest);
SearchHits searchHits = searchResponse.getHits();

6、ES 文档字段个数限制

ES 文档默认不允许文档字段超过 1000,超过 1000 会报如下错误:

failed to put mappings on indices [[[nfvoemspm/srjL3cMMRUqa7DgOrYqX-A]]], type [log]
java.lang.IllegalArgumentException: Limit of total fields [1000] in index [xxx] has been exceeded

可以通过修改索引配置来修改字段个数限制,不过还是推荐从业务上进行优化:

修改settings
{
  "index.mapping.total_fields.limit": 2000
}

7、将 DSL 字符串转换为 QueryBuilder

## wrapper 案例
GET /_search
{
    "query" : {
        "wrapper": {
            "query" : "eyJ0ZXJtIiA6IHsgInVzZXIiIDogIktpbWNoeSIgfX0=" 
        }
    }
}

## RestClient
QueryBuilders.wrapperQuery("{\"term\": {\"field\":\"value\"}}")

8、ES 集群重启后 Slice Scroll 速度变慢

重启机器之后,pagecache 都没有了,所有数据都要重新从磁盘加载。

9、ES 开启索引新建删除日志

PUT _cluster/settings
{
  "persistent": {
    "logger.cluster.service": "DEBUG"
  }
}

10、慢日志全局级别设定

  1. 对已经存在的索引可以通过 PUT _settings 做存量设置
  2. 对之后新增的索引,可以使用类似于下面的template
    PUT _template/global-slowlog_template
    {
    "order": -1,
    "version": 0,
    "template": "*",
    "settings": {
        "index.indexing.slowlog.threshold.index.debug" : "10ms",
        "index.indexing.slowlog.threshold.index.info" : "50ms",
        "index.indexing.slowlog.threshold.index.warn" : "100ms",
        "index.search.slowlog.threshold.fetch.debug" : "100ms",
        "index.search.slowlog.threshold.fetch.info" : "200ms",
        "index.search.slowlog.threshold.fetch.warn" : "500ms",
        "index.search.slowlog.threshold.query.debug" : "100ms",
        "index.search.slowlog.threshold.query.info" : "200ms",
        "index.search.slowlog.threshold.query.warn" : "1s"
    }
    }

11、TCP 设置多个端口的用途

transport.tcp.port 这个参数不写,默认为 9300-9399,开放那么多 端口有用么?

  • 如果设置一个端口,假设这个端口占用了程序就无法正常启动;
  • 如果设置多个端口,一个端口占用会寻找下一个端口,直至找到可用端口。

12、ES 临时重启,设置分片延迟分配策略

PUT _all/_settings
{
  "settings": {
    "index.unassigned.node_left.delayed_timeout": "5m"
  }
}

三、Kibana

1、kibana 图表自定义标注

可以用 TSVB,支持标注。

Kibana TSVB 注解的使用:https://elasticsearch.cn/article/701

2、Kibana discover 导出 csv 文件

请参考文章:如何快速把 Kibana Discover 页的 Document Table 导出成 CSV

3、修改 kibana 的默认主页

https://elasticsearch.cn/article/6335

四、社区文章精选


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

收起阅读 »

ClusterBlockException SERVICE_UNAVAILABLE/1/state

截图.PNG
图中为_cluster/health 和_cat/pending_tasks的执行结果,现在ES节点全部故障了,请教下如何恢复?   gateway.expected_nodes配置的为节点个数的80% 

 Caused by: org.elasticsearch.discovery.MasterNotDiscoveredException: ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction$4.onTimeout(TransportMasterNodeAction.java:213) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ContextPreservingListener.onTimeout(ClusterStateObserver.java:317) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ObserverClusterStateListener.onTimeout(ClusterStateObserver.java:244) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.service.ClusterApplierService$NotifyTimeout.run(ClusterApplierService.java:578) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:568) [elasticsearch-6.1.3.jar:6.1.3]
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
                at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
        Caused by: org.elasticsearch.cluster.block.ClusterBlockException: blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];
                at org.elasticsearch.cluster.block.ClusterBlocks.indexBlockedException(ClusterBlocks.java:182) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:64) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:39) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.doStart(TransportMasterNodeAction.java:135) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.start(TransportMasterNodeAction.java:127) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:105) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:55) ~[elasticsearch-6.1.3.jar:6.1.3]
继续阅读 »

截图.PNG
图中为_cluster/health 和_cat/pending_tasks的执行结果,现在ES节点全部故障了,请教下如何恢复?   gateway.expected_nodes配置的为节点个数的80% 

 Caused by: org.elasticsearch.discovery.MasterNotDiscoveredException: ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction$4.onTimeout(TransportMasterNodeAction.java:213) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ContextPreservingListener.onTimeout(ClusterStateObserver.java:317) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.ClusterStateObserver$ObserverClusterStateListener.onTimeout(ClusterStateObserver.java:244) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.cluster.service.ClusterApplierService$NotifyTimeout.run(ClusterApplierService.java:578) [elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:568) [elasticsearch-6.1.3.jar:6.1.3]
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
                at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
        Caused by: org.elasticsearch.cluster.block.ClusterBlockException: blocked by: [SERVICE_UNAVAILABLE/1/state not recovered / initialized];
                at org.elasticsearch.cluster.block.ClusterBlocks.indexBlockedException(ClusterBlocks.java:182) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:64) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction.checkBlock(TransportCreateIndexAction.java:39) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.doStart(TransportMasterNodeAction.java:135) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.start(TransportMasterNodeAction.java:127) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:105) ~[elasticsearch-6.1.3.jar:6.1.3]
                at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:55) ~[elasticsearch-6.1.3.jar:6.1.3]
收起阅读 »

Hive 与 ElasticSearch 的数据交互

本文将详细介绍利用 ES 与 Hive 直接的数据交互;通过 Hive 外部表的方式,可以快速将 ES 索引数据映射到 Hive 中,使用易于上手的 Hive SQL 实现对数据的进一步加工。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • ES 版本:6.5.1
  • Hive 版本:2.1.1
  • ES-Hadoop 版本:6.5.1

2、Hive 简介

Hive 在 Hadoop 生态系统中扮演着数据仓库的角色,借助 Hive 可以方便地进行数据汇总、即席查询以及分析存储在 Hadoop 文件系统中的大型数据集。

Hive 通过类 SQL 语言(HSQL)对 Hadoop 上的数据进行抽象,这样用户可以通过 SQL 语句对数据进行定义、组织、操作和分析;在 Hive 中,数据集是通过表(定义了数据类型相关信息)进行定义的,用户可以通过内置运算符或用户自定义函数(UDF)对数据进行加载、查询和转换。

3、Hive 安装 ES-Hadoop

官方推荐的安装方式:

使用 add jar

add jar /path/elasticsearch-hadoop.jar

使用 hive.aux.jars.path

$ bin/hive --auxpath=/path/elasticsearch-hadoop.jar

修改配置(hive-site.xml)

<property>
  <name>hive.aux.jars.path</name>
  <value>/path/elasticsearch-hadoop.jar</value>
  <description>A comma separated list (with no spaces) of the jar files</description>
</property>

CDH6.X 推荐的安装方法

elasticsearch-hadoop.jar 复制到 Hive 的 auxlib 目录中,然后重启 Hive 即可。

cp elasticsearch-hadoop.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/

二、Hive 与 ElasticSearch 的数据交互

1、数据类型对照表

请务必注意,ES 中的类型是 index/_mapping 中对应的数据类型,非 _source 里面数据的类型。

Hive type Elasticsearch type
void null
boolean boolean
tinyint byte
smallint short
int int
bigint long
double double
float float
string string
binary binary
timestamp date
struct map
map map
array array
union not supported (yet)
decimal string
date date
varchar string
char string

2、建立 Hive 外部表

CREATE EXTERNAL TABLE default.surface(
    water_type STRING,
    water_level STRING,
    monitor_time TIMESTAMP,
    sitecode STRING,
    p492 DOUBLE,
    p311 DOUBLE,
    status STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.resource'='ods_data_day_surface*/doc',
    'es.query'='?q=status:001'
    'es.nodes'='sky-01','es.port'='9200',
    'es.net.http.auth.user'='sky',
    'es.net.http.auth.pass'='jointsky',
    'es.date.format'='yyyy-MM-dd HH:mm:ss',
    'es.ser.reader.value.class'='com.jointsky.bigdata.hive.EsValueReader'
    'es.mapping.names'='waterType:water_type,monitortime:monitor_time'
);

3、配置项说明

es.resource

es.resource 用于设置 ES 资源的位置,默认该配置项同时设置了读和写的索引,当然也可以分别设置读写索引名称:

  • es.resource.read:设置读取位置;
  • es.resource.write:设置写入位置。

es.query

es.query 设置查询过滤条件,目前支持 uri queryquery dslexternal resource 三种设置方式。

# uri (or parameter) query
es.query = ?q=costinl

# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }

# external resource
es.query = org/mypackage/myquery.json

es.mapping.names

es.mapping.names 用于设置 Hive 与 ES 的字段映射关系,如果不设置,则默认字段名不发生变化(即为 data type 区域定义的字段名);此外该部分还用于定义 Hive 到 ES 的数据映射类型。

'es.mapping.names' = 'date:@timestamp , url:url_123 ')

其他通用字段的说明请参考文章:使用 ES-Hadoop 将 Spark Streaming 流数据写入 ES

4、自定义日期类型解析

目前将 ES 的 date 类型映射到 Hive 的 TIMESTAMP 类型时,ES-Hadoop 组件只能识别时间戳格式或者标准的 XSD 格式的日期字符串:

@Override
protected Object parseDate(Long value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(value)) : processLong(value));
}

@Override
protected Object parseDate(String value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(DatatypeConverter.parseDateTime(value).getTimeInMillis())) : parseString(value));
}

关于 XSD(XML Schema Date/Time Datatypes)可用参考文章:https://www.w3schools.com/xml/schema_dtypes_date.asp

为了兼容自定义的日期格式,需要编写自定义的日期读取类:


import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.hive.HiveValueReader;

import java.sql.Timestamp;
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EsValueReader extends HiveValueReader {
    private String dateFormat;
    private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String DEFAULT_DATE_FORMAT_MIN = "yyyy-MM-dd HH:mm";
    private static final String DEFAULT_DATE_FORMAT_HOUR = "yyyy-MM-dd HH";
    private static final String DEFAULT_DATE_FORMAT_DAY = "yyyy-MM-dd";

    @Override
    public void setSettings(Settings settings) {
        super.setSettings(settings);
        dateFormat = settings.getProperty("es.date.format");
    }

    @Override
    protected Object parseDate(String value, boolean richDate) {
        if (value != null && value.trim().length() > 0 && DEFAULT_DATE_FORMAT.equalsIgnoreCase(dateFormat)) {
            if (richDate){
                if (value.length() == 16){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_MIN).getTime()));
                }
                if (value.length() == 13){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_HOUR).getTime()));
                }
                if (value.length() == 10){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_DAY).getTime()));
                }
                return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT).getTime()));
            }
            return parseString(value);
        }
        return super.parseDate(value, richDate);
    }

    /**
     * 解析日期,根據指定的格式進行解析.<br>
     * 如果解析錯誤,則返回null
     * @param stringDate 日期字串
     * @param format 日期格式
     * @return 日期型別
     */
    private static Date parseDate(String stringDate, String format) {
        if (stringDate == null) {
            return null;
        }
        try {
            return parseDate(stringDate, new String[] { format });
        } catch (ParseException e) {
            return null;
        }
    }

    public static Date parseDate(String str, String... parsePatterns) throws ParseException {
        return parseDateWithLeniency(str, parsePatterns, true);
    }

    private static Date parseDateWithLeniency(
            String str, String[] parsePatterns, boolean lenient) throws ParseException {
        if (str == null || parsePatterns == null) {
            throw new IllegalArgumentException("Date and Patterns must not be null");
        }

        SimpleDateFormat parser = new SimpleDateFormat();
        parser.setLenient(lenient);
        ParsePosition pos = new ParsePosition(0);
        for (String parsePattern : parsePatterns) {
            String pattern = parsePattern;
            if (parsePattern.endsWith("ZZ")) {
                pattern = pattern.substring(0, pattern.length() - 1);
            }
            parser.applyPattern(pattern);
            pos.setIndex(0);
            String str2 = str;
            if (parsePattern.endsWith("ZZ")) {
                str2 = str.replaceAll("([-+][0-9][0-9]):([0-9][0-9])$", "$1$2");
            }
            Date date = parser.parse(str2, pos);
            if (date != null && pos.getIndex() == str2.length()) {
                return date;
            }
        }
        throw new ParseException("Unable to parse the date: " + str, -1);
    }
}

上述代码的 Maven 依赖

<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>6.5.4</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

自定义日期解析包的部署

代码编写完成后,将代码进行打包,然后将打包好的 jar 包放置到 Hive 的 auxlib 目录中,然后重启 Hive 即可;该步骤与 ES-Hadoop 的安装步骤一样。

在编写 Spark 程序从 Hive 中读取数据的时候,需要添加对该包的依赖以及对 ES-Hadoop 的依赖。

三、总结

经过上述的步骤,Hive 与 ES 的映射已经不成问题,如果想从 ES 中导出数据,可用借助 HSQL insert into table XXX select * from XXXXX; 的方式从 ES 中读取数据写入到 HDFS;当然通过更为复杂的 HSQL 可以将数据进行处理,并将数据重新写入到 ES 或者存储到 HDFS。

充分利用 ES 的查询、过滤和聚合,可以很好的去服务数据标准化、数据清洗、数据分布情况等 ETL 流程。


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

继续阅读 »

本文将详细介绍利用 ES 与 Hive 直接的数据交互;通过 Hive 外部表的方式,可以快速将 ES 索引数据映射到 Hive 中,使用易于上手的 Hive SQL 实现对数据的进一步加工。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • ES 版本:6.5.1
  • Hive 版本:2.1.1
  • ES-Hadoop 版本:6.5.1

2、Hive 简介

Hive 在 Hadoop 生态系统中扮演着数据仓库的角色,借助 Hive 可以方便地进行数据汇总、即席查询以及分析存储在 Hadoop 文件系统中的大型数据集。

Hive 通过类 SQL 语言(HSQL)对 Hadoop 上的数据进行抽象,这样用户可以通过 SQL 语句对数据进行定义、组织、操作和分析;在 Hive 中,数据集是通过表(定义了数据类型相关信息)进行定义的,用户可以通过内置运算符或用户自定义函数(UDF)对数据进行加载、查询和转换。

3、Hive 安装 ES-Hadoop

官方推荐的安装方式:

使用 add jar

add jar /path/elasticsearch-hadoop.jar

使用 hive.aux.jars.path

$ bin/hive --auxpath=/path/elasticsearch-hadoop.jar

修改配置(hive-site.xml)

<property>
  <name>hive.aux.jars.path</name>
  <value>/path/elasticsearch-hadoop.jar</value>
  <description>A comma separated list (with no spaces) of the jar files</description>
</property>

CDH6.X 推荐的安装方法

elasticsearch-hadoop.jar 复制到 Hive 的 auxlib 目录中,然后重启 Hive 即可。

cp elasticsearch-hadoop.jar /opt/cloudera/parcels/CDH/lib/hive/auxlib/

二、Hive 与 ElasticSearch 的数据交互

1、数据类型对照表

请务必注意,ES 中的类型是 index/_mapping 中对应的数据类型,非 _source 里面数据的类型。

Hive type Elasticsearch type
void null
boolean boolean
tinyint byte
smallint short
int int
bigint long
double double
float float
string string
binary binary
timestamp date
struct map
map map
array array
union not supported (yet)
decimal string
date date
varchar string
char string

2、建立 Hive 外部表

CREATE EXTERNAL TABLE default.surface(
    water_type STRING,
    water_level STRING,
    monitor_time TIMESTAMP,
    sitecode STRING,
    p492 DOUBLE,
    p311 DOUBLE,
    status STRING
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
    'es.resource'='ods_data_day_surface*/doc',
    'es.query'='?q=status:001'
    'es.nodes'='sky-01','es.port'='9200',
    'es.net.http.auth.user'='sky',
    'es.net.http.auth.pass'='jointsky',
    'es.date.format'='yyyy-MM-dd HH:mm:ss',
    'es.ser.reader.value.class'='com.jointsky.bigdata.hive.EsValueReader'
    'es.mapping.names'='waterType:water_type,monitortime:monitor_time'
);

3、配置项说明

es.resource

es.resource 用于设置 ES 资源的位置,默认该配置项同时设置了读和写的索引,当然也可以分别设置读写索引名称:

  • es.resource.read:设置读取位置;
  • es.resource.write:设置写入位置。

es.query

es.query 设置查询过滤条件,目前支持 uri queryquery dslexternal resource 三种设置方式。

# uri (or parameter) query
es.query = ?q=costinl

# query dsl
es.query = { "query" : { "term" : { "user" : "costinl" } } }

# external resource
es.query = org/mypackage/myquery.json

es.mapping.names

es.mapping.names 用于设置 Hive 与 ES 的字段映射关系,如果不设置,则默认字段名不发生变化(即为 data type 区域定义的字段名);此外该部分还用于定义 Hive 到 ES 的数据映射类型。

'es.mapping.names' = 'date:@timestamp , url:url_123 ')

其他通用字段的说明请参考文章:使用 ES-Hadoop 将 Spark Streaming 流数据写入 ES

4、自定义日期类型解析

目前将 ES 的 date 类型映射到 Hive 的 TIMESTAMP 类型时,ES-Hadoop 组件只能识别时间戳格式或者标准的 XSD 格式的日期字符串:

@Override
protected Object parseDate(Long value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(value)) : processLong(value));
}

@Override
protected Object parseDate(String value, boolean richDate) {
    return (richDate ? new TimestampWritable(new Timestamp(DatatypeConverter.parseDateTime(value).getTimeInMillis())) : parseString(value));
}

关于 XSD(XML Schema Date/Time Datatypes)可用参考文章:https://www.w3schools.com/xml/schema_dtypes_date.asp

为了兼容自定义的日期格式,需要编写自定义的日期读取类:


import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.hive.HiveValueReader;

import java.sql.Timestamp;
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.util.Date;

public class EsValueReader extends HiveValueReader {
    private String dateFormat;
    private static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String DEFAULT_DATE_FORMAT_MIN = "yyyy-MM-dd HH:mm";
    private static final String DEFAULT_DATE_FORMAT_HOUR = "yyyy-MM-dd HH";
    private static final String DEFAULT_DATE_FORMAT_DAY = "yyyy-MM-dd";

    @Override
    public void setSettings(Settings settings) {
        super.setSettings(settings);
        dateFormat = settings.getProperty("es.date.format");
    }

    @Override
    protected Object parseDate(String value, boolean richDate) {
        if (value != null && value.trim().length() > 0 && DEFAULT_DATE_FORMAT.equalsIgnoreCase(dateFormat)) {
            if (richDate){
                if (value.length() == 16){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_MIN).getTime()));
                }
                if (value.length() == 13){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_HOUR).getTime()));
                }
                if (value.length() == 10){
                    return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT_DAY).getTime()));
                }
                return new TimestampWritable(new Timestamp(parseDate(value, DEFAULT_DATE_FORMAT).getTime()));
            }
            return parseString(value);
        }
        return super.parseDate(value, richDate);
    }

    /**
     * 解析日期,根據指定的格式進行解析.<br>
     * 如果解析錯誤,則返回null
     * @param stringDate 日期字串
     * @param format 日期格式
     * @return 日期型別
     */
    private static Date parseDate(String stringDate, String format) {
        if (stringDate == null) {
            return null;
        }
        try {
            return parseDate(stringDate, new String[] { format });
        } catch (ParseException e) {
            return null;
        }
    }

    public static Date parseDate(String str, String... parsePatterns) throws ParseException {
        return parseDateWithLeniency(str, parsePatterns, true);
    }

    private static Date parseDateWithLeniency(
            String str, String[] parsePatterns, boolean lenient) throws ParseException {
        if (str == null || parsePatterns == null) {
            throw new IllegalArgumentException("Date and Patterns must not be null");
        }

        SimpleDateFormat parser = new SimpleDateFormat();
        parser.setLenient(lenient);
        ParsePosition pos = new ParsePosition(0);
        for (String parsePattern : parsePatterns) {
            String pattern = parsePattern;
            if (parsePattern.endsWith("ZZ")) {
                pattern = pattern.substring(0, pattern.length() - 1);
            }
            parser.applyPattern(pattern);
            pos.setIndex(0);
            String str2 = str;
            if (parsePattern.endsWith("ZZ")) {
                str2 = str.replaceAll("([-+][0-9][0-9]):([0-9][0-9])$", "$1$2");
            }
            Date date = parser.parse(str2, pos);
            if (date != null && pos.getIndex() == str2.length()) {
                return date;
            }
        }
        throw new ParseException("Unable to parse the date: " + str, -1);
    }
}

上述代码的 Maven 依赖

<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>6.5.4</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

自定义日期解析包的部署

代码编写完成后,将代码进行打包,然后将打包好的 jar 包放置到 Hive 的 auxlib 目录中,然后重启 Hive 即可;该步骤与 ES-Hadoop 的安装步骤一样。

在编写 Spark 程序从 Hive 中读取数据的时候,需要添加对该包的依赖以及对 ES-Hadoop 的依赖。

三、总结

经过上述的步骤,Hive 与 ES 的映射已经不成问题,如果想从 ES 中导出数据,可用借助 HSQL insert into table XXX select * from XXXXX; 的方式从 ES 中读取数据写入到 HDFS;当然通过更为复杂的 HSQL 可以将数据进行处理,并将数据重新写入到 ES 或者存储到 HDFS。

充分利用 ES 的查询、过滤和聚合,可以很好的去服务数据标准化、数据清洗、数据分布情况等 ETL 流程。


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

收起阅读 »

自研基于StanfordNLP的ES分词插件

为ES构建Stanford NLP分词插件

Stanford NLP?

Stanford分词器是斯坦福大学NLP团队维护的一个开源分词器,支持了包括中文、英文…的语言,而且除了分词之外,它还支持了包括词性分析、情感分析…的各种功能。\ 这俩是这个project的项目主页

Why Stanford core NLP?

  市面上确实会有很多很有名的开源分词器,比如IK、Jieba,还有一些其他团队和公司提供的开源/商用的分词器,他们各有优劣。但是在各种分词器上比较了一大堆的分词case之后,我们发现Stanford NLP似乎是最适合我们当前需求的一个,因为我们不仅仅需要分词,还需要一些包括情感分析之类在内的更多的一些功能。

我们公司是做金融数据的搜索推荐的,在对比了各家分词器之后我们老板觉得Stanford NLP的效果最好,但是作为算法出身的人,他实现了一套非常重的分词、排序、搜索的服务。

在对比如研报、财报之类的信息进行搜索的时候确实会比较有效,但是在对经济类的新闻进行搜索的时候就会显得十分的笨重。

基于这个背景,我开始试图在ES里面引入老板推崇的Stanford 分词器来适应他的搜索、分词的需要,同时也能够不通过他那个笨重的分词排序服务来对我们系统中大量的经济、金融类的新闻进行分词、索引,并提供和他自己分词效果类似的分词和检索服务。

Why this project

我在包括百度、某谷姓404网站、GitHub以及国内的中文社区(Elastic中文社区)在内的各种地方搜过也问过了,但是似乎没有一个直接开箱可用的分词插件。所以,我只剩一条路了,就是搭建一个自己的插件来引用这个分词器。

How

对ES来说,插件主要分为两个部分:

  1. 让ES可以看到的部分(class extends Plugin)
  2. 自己行使职能的部分(functional part)

plugin

  1. 为了让ES可以加载我们的plugin,我们需要先继承Plugin类,然后我们这个是个分词器插件,所以还要实现AnalysisPlugin类
  2. 看过ES源码或者其他分词器源码的同学应该会知道,分词器插件需要实现两个方法,一个用来提供tokenizer,一个是analyzer分别对应分词器中的这俩。
    • 重写Map<String, AnalysisModule.AnalysisProvider<TokenizerFactory>>是为了可以提供搜索用分词器
    • 重写Map<String, AnalysisModule.AnalysisProvider<AnalyzerProvider<? extends Analyzer>>>是为了可以提供索引用分词器
  3. 在这个分词器里面我们主要是依靠Tokenizer来实现分词的

functional class

分词器,特别是Tokenizer主要是靠重写三个方法来实现分词的

  1. incrementToken:用来确定每一个词元,输出每一个单词(字)以及它的位置、长度等
  2. reset:用来重制分词结果
  3. end:用来告诉ES,这段文本的分词已经结束了

所以我们主要需要重写的就是这仨方法,当然了,为了能让分词器正确的使用,我们还需要添加一些分词器的配置和初始化的内容,具体代码不写了可以参考我的git,主要讲两个坑:

  1. ES是通过配置文件里的路径来寻找对应的插件类
  2. 然后通过配置文件里的key和刚才提到的代码里的key来寻找对应的分词器,所以这俩地方不要写错了 #plugin-descriptor.properties: classname=org.elasticsearch.plugin.analysis.AnalysisSDPlugin #plugin-descriptor.properties: name=stanford-core-nlp
  3. 在开发过程中由于有java-security的存在,所以需要通过AccessController来调用和加载我们需要的外部jar包

odds and ends

  1. Stanford分词器里面包含了很多功能,目前我使用了分词的部分
  2. 分词器自带词典文件,不过如果要做词典的修改可能需要解包,修改,再重新打包
  3. 我现在hardcode了一大堆的标点符号在里面,后面可能会去优化一下部分逻辑
  4. 待完成的功能还有其他功能包括情感分析之类的

also see

GitHub 地址

继续阅读 »

为ES构建Stanford NLP分词插件

Stanford NLP?

Stanford分词器是斯坦福大学NLP团队维护的一个开源分词器,支持了包括中文、英文…的语言,而且除了分词之外,它还支持了包括词性分析、情感分析…的各种功能。\ 这俩是这个project的项目主页

Why Stanford core NLP?

  市面上确实会有很多很有名的开源分词器,比如IK、Jieba,还有一些其他团队和公司提供的开源/商用的分词器,他们各有优劣。但是在各种分词器上比较了一大堆的分词case之后,我们发现Stanford NLP似乎是最适合我们当前需求的一个,因为我们不仅仅需要分词,还需要一些包括情感分析之类在内的更多的一些功能。

我们公司是做金融数据的搜索推荐的,在对比了各家分词器之后我们老板觉得Stanford NLP的效果最好,但是作为算法出身的人,他实现了一套非常重的分词、排序、搜索的服务。

在对比如研报、财报之类的信息进行搜索的时候确实会比较有效,但是在对经济类的新闻进行搜索的时候就会显得十分的笨重。

基于这个背景,我开始试图在ES里面引入老板推崇的Stanford 分词器来适应他的搜索、分词的需要,同时也能够不通过他那个笨重的分词排序服务来对我们系统中大量的经济、金融类的新闻进行分词、索引,并提供和他自己分词效果类似的分词和检索服务。

Why this project

我在包括百度、某谷姓404网站、GitHub以及国内的中文社区(Elastic中文社区)在内的各种地方搜过也问过了,但是似乎没有一个直接开箱可用的分词插件。所以,我只剩一条路了,就是搭建一个自己的插件来引用这个分词器。

How

对ES来说,插件主要分为两个部分:

  1. 让ES可以看到的部分(class extends Plugin)
  2. 自己行使职能的部分(functional part)

plugin

  1. 为了让ES可以加载我们的plugin,我们需要先继承Plugin类,然后我们这个是个分词器插件,所以还要实现AnalysisPlugin类
  2. 看过ES源码或者其他分词器源码的同学应该会知道,分词器插件需要实现两个方法,一个用来提供tokenizer,一个是analyzer分别对应分词器中的这俩。
    • 重写Map<String, AnalysisModule.AnalysisProvider<TokenizerFactory>>是为了可以提供搜索用分词器
    • 重写Map<String, AnalysisModule.AnalysisProvider<AnalyzerProvider<? extends Analyzer>>>是为了可以提供索引用分词器
  3. 在这个分词器里面我们主要是依靠Tokenizer来实现分词的

functional class

分词器,特别是Tokenizer主要是靠重写三个方法来实现分词的

  1. incrementToken:用来确定每一个词元,输出每一个单词(字)以及它的位置、长度等
  2. reset:用来重制分词结果
  3. end:用来告诉ES,这段文本的分词已经结束了

所以我们主要需要重写的就是这仨方法,当然了,为了能让分词器正确的使用,我们还需要添加一些分词器的配置和初始化的内容,具体代码不写了可以参考我的git,主要讲两个坑:

  1. ES是通过配置文件里的路径来寻找对应的插件类
  2. 然后通过配置文件里的key和刚才提到的代码里的key来寻找对应的分词器,所以这俩地方不要写错了 #plugin-descriptor.properties: classname=org.elasticsearch.plugin.analysis.AnalysisSDPlugin #plugin-descriptor.properties: name=stanford-core-nlp
  3. 在开发过程中由于有java-security的存在,所以需要通过AccessController来调用和加载我们需要的外部jar包

odds and ends

  1. Stanford分词器里面包含了很多功能,目前我使用了分词的部分
  2. 分词器自带词典文件,不过如果要做词典的修改可能需要解包,修改,再重新打包
  3. 我现在hardcode了一大堆的标点符号在里面,后面可能会去优化一下部分逻辑
  4. 待完成的功能还有其他功能包括情感分析之类的

also see

GitHub 地址

收起阅读 »

ELK 使用小技巧(第 4 期)

ELK Tips 主要介绍一些 ELK 使用过程中的小技巧,内容主要来源为 Elastic 中文社区。

一、Logstash

1、Logstash 性能调优主要参数

  • pipeline.workers:设置启动多少个线程执行 fliter 和 output;当 input 的内容出现堆积而 CPU 使用率还比较充足时,可以考虑增加该参数的大小;
  • pipeline.batch.size:设置单个工作线程在执行过滤器和输出之前收集的最大事件数,较大的批量大小通常更高效,但会增加内存开销。输出插件会将每个批处理作为一个输出单元。;例如,ES 输出会为收到的每个批次发出批量请求;调整 pipeline.batch.size 可调整发送到 ES 的批量请求(Bulk)的大小;
  • pipeline.batch.delay:设置 Logstash 管道的延迟时间, 管道批处理延迟是 Logstash 在当前管道工作线程中接收事件后等待新消息的最长时间(以毫秒为单位);简单来说,当 pipeline.batch.size 不满足时,会等待 pipeline.batch.delay 设置的时间,超时后便开始执行 filter 和 output 操作。

2、使用 Ruby Filter 根据现有字段计算一个新字段

filter {
    ruby {
           code => "event.set('kpi', ((event.get('a') + event.get('b'))/(event.get('c')+event.get('d'))).round(2))"
     }
}

3、logstash filter 如何判断字段是够为空或者 null

if ![updateTime]

4、Date Filter 设置多种日期格式

date {
  match => ["logtime", "yyyy-MM-dd HH:mm:ss.SSS","yyyy-MM-dd HH:mm:ss,SSS"]
  target => "logtime_utc"
}

二、Elasticsearch

1、高效翻页 Search After

通常情况下我们会使用 from 和 size 的方式实现查询结果的翻页,但是当达到深度分页时,成本变得过高(堆内存占用和时间耗费与 from+size 的大小成正比),因此 ES 设置了限制(index.max_result_window),默认值为 10000,防止用户进行过于深入的翻页。

推荐使用 Scroll api 进行高效深度滚动,但滚动上下文代价很高,因此不要将 Scroll 用于实时用户请求。search_after 参数通过提供实时游标来解决深度滚动的问题,其主要思路是使用上一页的结果来帮助检索下一页。

GET twitter/_search
{
    "size": 10,
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    },
    "search_after": [1463538857, "654323"],
    "sort": [
        {"date": "asc"},
        {"tie_breaker_id": "asc"}
    ]
}

2、ES 文档相似度 BM25 参数设置

ES2.X 默认是以 TF/IDF 算法计算文档相似度,从 ES5.X 开始,BM25 作为默认的相似度计算算法。

PUT /index
{
    "settings" : {
        "index" : {
            "similarity" : {
              "my_similarity" : {
                "type" : "DFR",
                "basic_model" : "g",
                "after_effect" : "l",
                "normalization" : "h2",
                "normalization.h2.c" : "3.0"
              }
            }
        }
    }
}

PUT /index/_mapping/_doc
{
  "properties" : {
    "title" : { "type" : "text", "similarity" : "my_similarity" }
  }
}

3、ES2.X 得分计算

得分计算脚本:

double tf = Math.sqrt(doc.freq); 
double idf = Math.log((field.docCount+1.0)/(term.docFreq+1.0)) + 1.0; 
double norm = 1/Math.sqrt(doc.length); 
return query.boost * tf * idf * norm;
  • 忽略词频统计及词频位置:将字段的 index_options 设置为 docs;
  • 忽略字段长度:设置字段的 "norms": { "enabled": false }

4、CircuitBreakingException: [parent] Data too large

报错信息:

[WARN ][r.suppressed             ] path: /, params: {}
org.elasticsearch.common.breaker.CircuitBreakingException: [parent] Data too large, data for [<http_request>] would be [1454565650/1.3gb], which is larger than the limit of [1454427340/1.3gb], usages [request=0/0b, fielddata=568/568b, in_flight_requests=0/0b, accounting=1454565082/1.3gb]

jvm 堆内存不够当前查询加载数据所以会报 data too large, 请求被熔断,indices.breaker.request.limit默认为 jvm heap 的 60%,因此可以通过调整 ES 的 Heap Size 来解决该问题。

5、ES 免费的自动化运维工具推荐

6、elasticsearch-hanlp 分词插件包

核心功能:

  • 内置多种分词模式,适合不同场景;
  • 内置词典,无需额外配置即可使用;
  • 支持外置词典,用户可自定义分词算法,基于词典或是模型;
  • 支持分词器级别的自定义词典,便于用于多租户场景;
  • 支持远程词典热更新(待开发);
  • 拼音过滤器、繁简体过滤器(待开发);
  • 基于词语或单字的 ngram 切分分词(待开发)。

https://github.com/AnyListen/elasticsearch-analysis-hanlp

7、节点重启时延迟索引分片重分配

当某个节点短时间离开集群时,一般是不会影响整体系统运行的,可以通过下面的请求延迟索引分片的再分配。

PUT _all/_settings
{
  "settings": {
    "index.unassigned.node_left.delayed_timeout": "5m"
  }
}

8、ES 数据修改后,查询还是未修改前的数据

默认是 1 秒可见,如果你的需求一定要写完就可见,那在写的时候增加 refresh 参数,强制刷新即可,但强烈建议不这么干,因为这样会把整个集群拖垮。

9、Terms Query 从另一个索引获取 terms

当 Terms Query 需要指定很多 terms 的时候,如果手动设置还是相当麻烦的,可以通过 terms-lookup 的方式从另外一个索引加载需要匹配的 terms。

PUT /users/_doc/2
{
    "followers" : ["1", "3"]
}

PUT /tweets/_doc/1
{
    "user" : "1"
}

GET /tweets/_search
{
    "query" : {
        "terms" : {
            "user" : {
                "index" : "users",
                "type" : "_doc",
                "id" : "2",
                "path" : "followers"
            }
        }
    }
}

-----------等效下面的语句--------------

PUT /users/_doc/2
{
 "followers" : [
   {
     "id" : "1"
   },
   {
     "id" : "2"
   }
 ]
}

10、ES 备份路径设置

报错信息:

doesn't match any of the locations specified by path.repo because this setting is empty

结局方案,修改 ES 的配置文件:

# 在 elasticsearch.yml 中添加下面配置来设置备份仓库路径 
path.repo: ["/home/test/backup/zty_logstash"]

11、Query cache 和 Filter cache 的区别

Filter cache 被重命名为 Node Query Cache,也就是说 Query cache 等同于 Filter cache;Query Cache 采用了 LRU 的缓存方式(当缓存满的时候,淘汰旧的不用的缓存数据),Query Cache 只缓存被用于 filter 上下文的内容。

12、Shard 大小需要考虑的因素有哪些?

Lucene 底层没有这个大小的限制,20-40GB 的这个区间范围本身就比较大,经验值有时候就是拍脑袋,不一定都好使。

  • Elasticsearch 对数据的隔离和迁移是以分片为单位进行的,分片太大,会加大迁移成本;
  • 一个分片就是一个 Lucene 的库,一个 Lucene 目录里面包含很多 Segment,每个 Segment 有文档数的上限,Segment 内部的文档 ID 目前使用的是 Java 的整型,也就是 2 的 31 次方,所以能够表示的总的文档数为 Integer.MAX_VALUE - 128 = 2^31 - 128 = 2147483647 - 1 = 2,147,483,519,也就是21.4亿条;
  • 同样,如果你不 force merge 成一个 Segment,单个 shard 的文档数能超过这个数;
  • 单个 Lucene 越大,索引会越大,查询的操作成本自然要越高,IO 压力越大,自然会影响查询体验;
  • 具体一个分片多少数据合适,还是需要结合实际的业务数据和实际的查询来进行测试以进行评估。

13、ES 索引更新时通过 mapping 限制指定字段更新

Elasticsearch 默认是 Dynamic Mapping,新字段会自动猜测数据类型,并自动 merge 到之前的 Mapping,你可以在 Mapping 里面可以配置字段是否支持动态加入,设置参数dynamic即可:true,默认,表示支持动态加入新字段;false,表示忽略该字段的后续索引等操作,但是索引还是成功的;strict支持不支持未知字段,直接抛错。

14、ES 数据快照到 HDFS

ES 做快照和使用 ES-Hadoop 导数据是完全的两种不同的方式,使用 ES-Hadoopp 后期导入的成本可能也不小。

  • 如果要恢复快,当然是做快照和还原的方式最快,速度完全取决于网络和磁盘的速度;
  • 如果为了节省磁盘,快照的时候,可以选 6.5 最新支持的 source_only 模式,导出的快照要小很多,不过恢复的时候要进行重建,速度慢。

15、segment.memory 简介

segment 的大小,和 indexing buffer 有关,有三种方式会生成 segment:

  • 一种是 indexing buffer 写满了会生成 segment 文件,默认是堆内存的10%,是节点共享的;
  • 一种是 index buffer 有文档,但是还没满,但是 refresh 时间到了,这个时候就会把 buffer 里面的生成 segment 文件;
  • 还有最后一种就是 es 自动的会将小的 segment 文件定期合并产生新的 segment 文件。

三、社区文章精选


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

继续阅读 »

ELK Tips 主要介绍一些 ELK 使用过程中的小技巧,内容主要来源为 Elastic 中文社区。

一、Logstash

1、Logstash 性能调优主要参数

  • pipeline.workers:设置启动多少个线程执行 fliter 和 output;当 input 的内容出现堆积而 CPU 使用率还比较充足时,可以考虑增加该参数的大小;
  • pipeline.batch.size:设置单个工作线程在执行过滤器和输出之前收集的最大事件数,较大的批量大小通常更高效,但会增加内存开销。输出插件会将每个批处理作为一个输出单元。;例如,ES 输出会为收到的每个批次发出批量请求;调整 pipeline.batch.size 可调整发送到 ES 的批量请求(Bulk)的大小;
  • pipeline.batch.delay:设置 Logstash 管道的延迟时间, 管道批处理延迟是 Logstash 在当前管道工作线程中接收事件后等待新消息的最长时间(以毫秒为单位);简单来说,当 pipeline.batch.size 不满足时,会等待 pipeline.batch.delay 设置的时间,超时后便开始执行 filter 和 output 操作。

2、使用 Ruby Filter 根据现有字段计算一个新字段

filter {
    ruby {
           code => "event.set('kpi', ((event.get('a') + event.get('b'))/(event.get('c')+event.get('d'))).round(2))"
     }
}

3、logstash filter 如何判断字段是够为空或者 null

if ![updateTime]

4、Date Filter 设置多种日期格式

date {
  match => ["logtime", "yyyy-MM-dd HH:mm:ss.SSS","yyyy-MM-dd HH:mm:ss,SSS"]
  target => "logtime_utc"
}

二、Elasticsearch

1、高效翻页 Search After

通常情况下我们会使用 from 和 size 的方式实现查询结果的翻页,但是当达到深度分页时,成本变得过高(堆内存占用和时间耗费与 from+size 的大小成正比),因此 ES 设置了限制(index.max_result_window),默认值为 10000,防止用户进行过于深入的翻页。

推荐使用 Scroll api 进行高效深度滚动,但滚动上下文代价很高,因此不要将 Scroll 用于实时用户请求。search_after 参数通过提供实时游标来解决深度滚动的问题,其主要思路是使用上一页的结果来帮助检索下一页。

GET twitter/_search
{
    "size": 10,
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    },
    "search_after": [1463538857, "654323"],
    "sort": [
        {"date": "asc"},
        {"tie_breaker_id": "asc"}
    ]
}

2、ES 文档相似度 BM25 参数设置

ES2.X 默认是以 TF/IDF 算法计算文档相似度,从 ES5.X 开始,BM25 作为默认的相似度计算算法。

PUT /index
{
    "settings" : {
        "index" : {
            "similarity" : {
              "my_similarity" : {
                "type" : "DFR",
                "basic_model" : "g",
                "after_effect" : "l",
                "normalization" : "h2",
                "normalization.h2.c" : "3.0"
              }
            }
        }
    }
}

PUT /index/_mapping/_doc
{
  "properties" : {
    "title" : { "type" : "text", "similarity" : "my_similarity" }
  }
}

3、ES2.X 得分计算

得分计算脚本:

double tf = Math.sqrt(doc.freq); 
double idf = Math.log((field.docCount+1.0)/(term.docFreq+1.0)) + 1.0; 
double norm = 1/Math.sqrt(doc.length); 
return query.boost * tf * idf * norm;
  • 忽略词频统计及词频位置:将字段的 index_options 设置为 docs;
  • 忽略字段长度:设置字段的 "norms": { "enabled": false }

4、CircuitBreakingException: [parent] Data too large

报错信息:

[WARN ][r.suppressed             ] path: /, params: {}
org.elasticsearch.common.breaker.CircuitBreakingException: [parent] Data too large, data for [<http_request>] would be [1454565650/1.3gb], which is larger than the limit of [1454427340/1.3gb], usages [request=0/0b, fielddata=568/568b, in_flight_requests=0/0b, accounting=1454565082/1.3gb]

jvm 堆内存不够当前查询加载数据所以会报 data too large, 请求被熔断,indices.breaker.request.limit默认为 jvm heap 的 60%,因此可以通过调整 ES 的 Heap Size 来解决该问题。

5、ES 免费的自动化运维工具推荐

6、elasticsearch-hanlp 分词插件包

核心功能:

  • 内置多种分词模式,适合不同场景;
  • 内置词典,无需额外配置即可使用;
  • 支持外置词典,用户可自定义分词算法,基于词典或是模型;
  • 支持分词器级别的自定义词典,便于用于多租户场景;
  • 支持远程词典热更新(待开发);
  • 拼音过滤器、繁简体过滤器(待开发);
  • 基于词语或单字的 ngram 切分分词(待开发)。

https://github.com/AnyListen/elasticsearch-analysis-hanlp

7、节点重启时延迟索引分片重分配

当某个节点短时间离开集群时,一般是不会影响整体系统运行的,可以通过下面的请求延迟索引分片的再分配。

PUT _all/_settings
{
  "settings": {
    "index.unassigned.node_left.delayed_timeout": "5m"
  }
}

8、ES 数据修改后,查询还是未修改前的数据

默认是 1 秒可见,如果你的需求一定要写完就可见,那在写的时候增加 refresh 参数,强制刷新即可,但强烈建议不这么干,因为这样会把整个集群拖垮。

9、Terms Query 从另一个索引获取 terms

当 Terms Query 需要指定很多 terms 的时候,如果手动设置还是相当麻烦的,可以通过 terms-lookup 的方式从另外一个索引加载需要匹配的 terms。

PUT /users/_doc/2
{
    "followers" : ["1", "3"]
}

PUT /tweets/_doc/1
{
    "user" : "1"
}

GET /tweets/_search
{
    "query" : {
        "terms" : {
            "user" : {
                "index" : "users",
                "type" : "_doc",
                "id" : "2",
                "path" : "followers"
            }
        }
    }
}

-----------等效下面的语句--------------

PUT /users/_doc/2
{
 "followers" : [
   {
     "id" : "1"
   },
   {
     "id" : "2"
   }
 ]
}

10、ES 备份路径设置

报错信息:

doesn't match any of the locations specified by path.repo because this setting is empty

结局方案,修改 ES 的配置文件:

# 在 elasticsearch.yml 中添加下面配置来设置备份仓库路径 
path.repo: ["/home/test/backup/zty_logstash"]

11、Query cache 和 Filter cache 的区别

Filter cache 被重命名为 Node Query Cache,也就是说 Query cache 等同于 Filter cache;Query Cache 采用了 LRU 的缓存方式(当缓存满的时候,淘汰旧的不用的缓存数据),Query Cache 只缓存被用于 filter 上下文的内容。

12、Shard 大小需要考虑的因素有哪些?

Lucene 底层没有这个大小的限制,20-40GB 的这个区间范围本身就比较大,经验值有时候就是拍脑袋,不一定都好使。

  • Elasticsearch 对数据的隔离和迁移是以分片为单位进行的,分片太大,会加大迁移成本;
  • 一个分片就是一个 Lucene 的库,一个 Lucene 目录里面包含很多 Segment,每个 Segment 有文档数的上限,Segment 内部的文档 ID 目前使用的是 Java 的整型,也就是 2 的 31 次方,所以能够表示的总的文档数为 Integer.MAX_VALUE - 128 = 2^31 - 128 = 2147483647 - 1 = 2,147,483,519,也就是21.4亿条;
  • 同样,如果你不 force merge 成一个 Segment,单个 shard 的文档数能超过这个数;
  • 单个 Lucene 越大,索引会越大,查询的操作成本自然要越高,IO 压力越大,自然会影响查询体验;
  • 具体一个分片多少数据合适,还是需要结合实际的业务数据和实际的查询来进行测试以进行评估。

13、ES 索引更新时通过 mapping 限制指定字段更新

Elasticsearch 默认是 Dynamic Mapping,新字段会自动猜测数据类型,并自动 merge 到之前的 Mapping,你可以在 Mapping 里面可以配置字段是否支持动态加入,设置参数dynamic即可:true,默认,表示支持动态加入新字段;false,表示忽略该字段的后续索引等操作,但是索引还是成功的;strict支持不支持未知字段,直接抛错。

14、ES 数据快照到 HDFS

ES 做快照和使用 ES-Hadoop 导数据是完全的两种不同的方式,使用 ES-Hadoopp 后期导入的成本可能也不小。

  • 如果要恢复快,当然是做快照和还原的方式最快,速度完全取决于网络和磁盘的速度;
  • 如果为了节省磁盘,快照的时候,可以选 6.5 最新支持的 source_only 模式,导出的快照要小很多,不过恢复的时候要进行重建,速度慢。

15、segment.memory 简介

segment 的大小,和 indexing buffer 有关,有三种方式会生成 segment:

  • 一种是 indexing buffer 写满了会生成 segment 文件,默认是堆内存的10%,是节点共享的;
  • 一种是 index buffer 有文档,但是还没满,但是 refresh 时间到了,这个时候就会把 buffer 里面的生成 segment 文件;
  • 还有最后一种就是 es 自动的会将小的 segment 文件定期合并产生新的 segment 文件。

三、社区文章精选


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

收起阅读 »

使用 ES-Hadoop 将 Spark Streaming 流数据写入 ES

本文将详细介绍利用 ES-Hadoop 将 Spark 处理的数据写入到 ES 中。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • Spark 版本:2.2.0
  • Kafka 版本:1.0.1
  • ES 版本:6.5.1

2、Maven 依赖

<!-- scala -->
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.11.8</version>
</dependency>

<!-- spark 基础依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming 相关依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming-kafka 相关依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- zookeeper 相关依赖 -->
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.5-cdh6.0.1</version>
</dependency>

<!-- Spark-ES 相关依赖 -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.5.4</version>
</dependency>

<!-- Spark-ES 依赖的 HTTP 传输组件 -->
<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
</dependency>

3、注意事项

如果使用 CDH 版本的 Spark,则在调试及实际部署运行的时候会出现下面的错误:

java.lang.ClassNotFoundException: org.apache.commons.httpclient.protocol.Protocol

很显然是缺少 httpclient 相关依赖造成的,对比开源版本与 CDH 版本的 Spark,发现开源版本多出了 commons-httpclient-3.1.jar,因此上述 Maven 的 pom 文件添加上对其依赖即可。

二、ES-Hadoop

1、简介

ES-Hadoop 实现了 Hadoop 生态(Hive、Spark、Pig、Storm 等)与 ElasticSearch 之间的数据交互,借助该组件可以将 Hadoop 生态的数据写入到 ES 中,然后借助 ES 对数据快速进行搜索、过滤、聚合等分析,进一步可以通过 Kibana 来实现数据的可视化。

同时,也可以借助 ES 作为数据存储层(类似数仓的 Stage 层或者 ODS 层),然后借助 Hadoop 生态的数据处理工具(Hive、MR、Spark 等)将处理后的数据写入到 HDFS 中。

使用 ES 做为原始数据的存储层,可以很好的进行数据去重、数据质量分析,还可以提供一些即时的数据服务,例如趋势展示、汇总分析等。

对 Hadoop 数据进行交互分析

2、组成

ES-Hadoop 是一个整合性质的组件,它封装了 Hadoop 生态的多种组件与 ES 交互的 API,如果你只需要部分功能,可以使用细分的组件:

  • elasticsearch-hadoop-mr
  • elasticsearch-hadoop-hive
  • elasticsearch-hadoop-pig
  • elasticsearch-spark-20_2.10
  • elasticsearch-hadoop-cascading
  • elasticsearch-storm

三、elasticsearch-spark

1、配置

es-hadoop 核心是通过 es 提供的 restful 接口来进行数据交互,下面是几个重要配置项,更多配置信息请参阅官方说明

  • es.nodes:需要连接的 es 节点(不需要配置全部节点,默认会自动发现其他可用节点);
  • es.port:节点 http 通讯端口;
  • es.nodes.discovery:默认为 true,表示自动发现集群可用节点;
  • es.nodes.wan.only:默认为 false,设置为 true 之后,会关闭节点的自动 discovery,只使用 es.nodes 声明的节点进行数据读写操作;如果你需要通过域名进行数据访问,则设置该选项为 true,否则请务必设置为 false;
  • es.index.auto.create:是否自动创建不存在的索引,默认为 true;
  • es.net.http.auth.user:Basic 认证的用户名;
  • es.net.http.auth.pass:Basic 认证的密码。
val conf = new SparkConf().setIfMissing("spark.app.name","rt-data-loader").setIfMissing("spark.master", "local[5]")
conf.set(ConfigurationOptions.ES_NODES, esNodes)
conf.set(ConfigurationOptions.ES_PORT, esPort)
conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
conf.set("es.write.rest.error.handlers", "ignoreConflict")
conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")

特别需要注意的配置项为 es.nodes.wan.only,由于在云服务器环境中,配置文件使用的一般为内网地址,而本地调试的时候一般使用外网地址,这样将 es.nodes 配置为外网地址后,最后会出现节点找不到的问题(由于会使用节点配置的内网地址去进行连接):

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available; 
node discovery is disabled and none of nodes specified fit the criterion [xxx.xx.x.xx:9200]

此时将 es.nodes.wan.only 设置为 true 即可。推荐开发测试时使用域名,集群部署的时候将该选项置为 false

2、屏蔽写入冲突

如果数据存在重复,写入 ES 时往往会出现数据写入冲突的错误,此时有两种解决方法。

方法一:设置 es.write.operation 为 upsert,这样达到的效果为如果存在则更新,不存在则进行插入,该配置项默认值为 index。

方法二:自定义冲突处理类,类似上述配置中设置了自定义的 error.handlers,通过自定义类来处理相关错误,例如忽略冲突等:

public class IgnoreConflictsHandler extends BulkWriteErrorHandler {
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) throws Exception {
        if (entry.getResponseCode() == 409) {
            StaticLog.warn("Encountered conflict response. Ignoring old data.");
            return HandlerResult.HANDLED;
        }
        return collector.pass("Not a conflict response code.");
    }
}

方法二可以屏蔽写入版本比预期的小之类的版本冲突问题。

3、RDD 写入 ES

EsSpark 提供了两种主要方法来实现数据写入:

  • saveToEs :RDD 内容为 Seq[Map],即一个 Map 对象集合,每个 Map 对应一个文档;
  • saveJsonToEs:RDD 内容为 Seq[String],即一个 String 集合,每个 String 是一个 JSON 字符串,代表一条记录(对应 ES 的 _source)。

数据写入可以指定很多配置信息,例如:

  • es.resource:设置写入的索引和类型,索引和类型名均支持动态变量
  • es.mapping.id:设置文档 _id 对应的字段名;
  • es.mapping.exclude:设置写入时忽略的字段,支持通配符。
val itemRdd = rdd.flatMap(line => {
    val topic = line.topic()
    println("正在处理:" + topic + " - " + line.partition() + " : " + line.offset())
    val jsonArray = JSON.parseArray(line.value()).toJavaList(classOf[JSONObject]).asScala
    val resultMap = jsonArray.map(jsonObj =>{
      var tmpId = "xxx"
      var tmpIndex = "xxxxxx"
      jsonObj.put("myTmpId", tmpId)
      jsonObj.put("myTmpIndex", tmpIndex)
      jsonObj.getInnerMap
    })
    resultMap
})
val mapConf = Map(
    ("es.resource" , "{myTmpIndex}/doc"),
    ("es.write.operation" , "upsert"),
    ("es.mapping.id" , "myTmpId"),
    ("es.mapping.exclude" , "myTmp*")
)
EsSpark.saveToEs(itemRdd, mapConf)

es.mapping.exclude 只支持 RDD 为 Map 集合(saveToEs),当为 Json 字符串集合时(saveJsonToEs)会提示不支持的错误信息;这个配置项非常有用,例如 myTmpId 作为文档 id,因此没有必要重复存储到 _source 里面了,可以配置到这个配置项,将其从 _source 中排除。


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

继续阅读 »

本文将详细介绍利用 ES-Hadoop 将 Spark 处理的数据写入到 ES 中。

一、开发环境

1、组件版本

  • CDH 集群版本:6.0.1
  • Spark 版本:2.2.0
  • Kafka 版本:1.0.1
  • ES 版本:6.5.1

2、Maven 依赖

<!-- scala -->
<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.11.8</version>
</dependency>

<!-- spark 基础依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming 相关依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- spark-streaming-kafka 相关依赖 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.2.0</version>
</dependency>

<!-- zookeeper 相关依赖 -->
<dependency>
  <groupId>org.apache.zookeeper</groupId>
  <artifactId>zookeeper</artifactId>
  <version>3.4.5-cdh6.0.1</version>
</dependency>

<!-- Spark-ES 相关依赖 -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>6.5.4</version>
</dependency>

<!-- Spark-ES 依赖的 HTTP 传输组件 -->
<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
</dependency>

3、注意事项

如果使用 CDH 版本的 Spark,则在调试及实际部署运行的时候会出现下面的错误:

java.lang.ClassNotFoundException: org.apache.commons.httpclient.protocol.Protocol

很显然是缺少 httpclient 相关依赖造成的,对比开源版本与 CDH 版本的 Spark,发现开源版本多出了 commons-httpclient-3.1.jar,因此上述 Maven 的 pom 文件添加上对其依赖即可。

二、ES-Hadoop

1、简介

ES-Hadoop 实现了 Hadoop 生态(Hive、Spark、Pig、Storm 等)与 ElasticSearch 之间的数据交互,借助该组件可以将 Hadoop 生态的数据写入到 ES 中,然后借助 ES 对数据快速进行搜索、过滤、聚合等分析,进一步可以通过 Kibana 来实现数据的可视化。

同时,也可以借助 ES 作为数据存储层(类似数仓的 Stage 层或者 ODS 层),然后借助 Hadoop 生态的数据处理工具(Hive、MR、Spark 等)将处理后的数据写入到 HDFS 中。

使用 ES 做为原始数据的存储层,可以很好的进行数据去重、数据质量分析,还可以提供一些即时的数据服务,例如趋势展示、汇总分析等。

对 Hadoop 数据进行交互分析

2、组成

ES-Hadoop 是一个整合性质的组件,它封装了 Hadoop 生态的多种组件与 ES 交互的 API,如果你只需要部分功能,可以使用细分的组件:

  • elasticsearch-hadoop-mr
  • elasticsearch-hadoop-hive
  • elasticsearch-hadoop-pig
  • elasticsearch-spark-20_2.10
  • elasticsearch-hadoop-cascading
  • elasticsearch-storm

三、elasticsearch-spark

1、配置

es-hadoop 核心是通过 es 提供的 restful 接口来进行数据交互,下面是几个重要配置项,更多配置信息请参阅官方说明

  • es.nodes:需要连接的 es 节点(不需要配置全部节点,默认会自动发现其他可用节点);
  • es.port:节点 http 通讯端口;
  • es.nodes.discovery:默认为 true,表示自动发现集群可用节点;
  • es.nodes.wan.only:默认为 false,设置为 true 之后,会关闭节点的自动 discovery,只使用 es.nodes 声明的节点进行数据读写操作;如果你需要通过域名进行数据访问,则设置该选项为 true,否则请务必设置为 false;
  • es.index.auto.create:是否自动创建不存在的索引,默认为 true;
  • es.net.http.auth.user:Basic 认证的用户名;
  • es.net.http.auth.pass:Basic 认证的密码。
val conf = new SparkConf().setIfMissing("spark.app.name","rt-data-loader").setIfMissing("spark.master", "local[5]")
conf.set(ConfigurationOptions.ES_NODES, esNodes)
conf.set(ConfigurationOptions.ES_PORT, esPort)
conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
conf.set("es.write.rest.error.handlers", "ignoreConflict")
conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")

特别需要注意的配置项为 es.nodes.wan.only,由于在云服务器环境中,配置文件使用的一般为内网地址,而本地调试的时候一般使用外网地址,这样将 es.nodes 配置为外网地址后,最后会出现节点找不到的问题(由于会使用节点配置的内网地址去进行连接):

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available; 
node discovery is disabled and none of nodes specified fit the criterion [xxx.xx.x.xx:9200]

此时将 es.nodes.wan.only 设置为 true 即可。推荐开发测试时使用域名,集群部署的时候将该选项置为 false

2、屏蔽写入冲突

如果数据存在重复,写入 ES 时往往会出现数据写入冲突的错误,此时有两种解决方法。

方法一:设置 es.write.operation 为 upsert,这样达到的效果为如果存在则更新,不存在则进行插入,该配置项默认值为 index。

方法二:自定义冲突处理类,类似上述配置中设置了自定义的 error.handlers,通过自定义类来处理相关错误,例如忽略冲突等:

public class IgnoreConflictsHandler extends BulkWriteErrorHandler {
    public HandlerResult onError(BulkWriteFailure entry, DelayableErrorCollector<byte[]> collector) throws Exception {
        if (entry.getResponseCode() == 409) {
            StaticLog.warn("Encountered conflict response. Ignoring old data.");
            return HandlerResult.HANDLED;
        }
        return collector.pass("Not a conflict response code.");
    }
}

方法二可以屏蔽写入版本比预期的小之类的版本冲突问题。

3、RDD 写入 ES

EsSpark 提供了两种主要方法来实现数据写入:

  • saveToEs :RDD 内容为 Seq[Map],即一个 Map 对象集合,每个 Map 对应一个文档;
  • saveJsonToEs:RDD 内容为 Seq[String],即一个 String 集合,每个 String 是一个 JSON 字符串,代表一条记录(对应 ES 的 _source)。

数据写入可以指定很多配置信息,例如:

  • es.resource:设置写入的索引和类型,索引和类型名均支持动态变量
  • es.mapping.id:设置文档 _id 对应的字段名;
  • es.mapping.exclude:设置写入时忽略的字段,支持通配符。
val itemRdd = rdd.flatMap(line => {
    val topic = line.topic()
    println("正在处理:" + topic + " - " + line.partition() + " : " + line.offset())
    val jsonArray = JSON.parseArray(line.value()).toJavaList(classOf[JSONObject]).asScala
    val resultMap = jsonArray.map(jsonObj =>{
      var tmpId = "xxx"
      var tmpIndex = "xxxxxx"
      jsonObj.put("myTmpId", tmpId)
      jsonObj.put("myTmpIndex", tmpIndex)
      jsonObj.getInnerMap
    })
    resultMap
})
val mapConf = Map(
    ("es.resource" , "{myTmpIndex}/doc"),
    ("es.write.operation" , "upsert"),
    ("es.mapping.id" , "myTmpId"),
    ("es.mapping.exclude" , "myTmp*")
)
EsSpark.saveToEs(itemRdd, mapConf)

es.mapping.exclude 只支持 RDD 为 Map 集合(saveToEs),当为 Json 字符串集合时(saveJsonToEs)会提示不支持的错误信息;这个配置项非常有用,例如 myTmpId 作为文档 id,因此没有必要重复存储到 _source 里面了,可以配置到这个配置项,将其从 _source 中排除。


Any Code,Code Any!

扫码关注『AnyCode』,编程路上,一起前行。

收起阅读 »