2018 年 Elastic Advent Calendar 分享活动开启了🎤

medcl 发布了置顶文章 • 35 个评论 • 1693 次浏览 • 2018-11-20 22:33 • 来自相关话题

Day 14: Elasticsearch 5 入坑指南

kennywu76 发布了置顶文章 • 33 个评论 • 19093 次浏览 • 2016-12-15 13:16 • 来自相关话题

Day1: 大规模Elasticsearch集群管理心得

kennywu76 发布了置顶文章 • 74 个评论 • 30861 次浏览 • 2016-12-02 10:07 • 来自相关话题

Day 10 - Elasticsearch 分片恢复并发数过大引发的bug分析

howardhuang 发表了文章 • 1 个评论 • 410 次浏览 • 21 小时前 • 来自相关话题

       大家好,今天为大家分享一次 ES 的填坑经验。主要是关于集群恢复过程中,分片恢复并发数调整过大导致集群 hang 死的问题。

场景描述

       废话不多说,先来描述场景。某日,腾讯云线上某 ES 集群,15个节点,2700+ 索引,15000+ 分片,数十 TB 数据。由于机器故障,某个节点被重启,此时集群有大量的 unassigned 分片,集群处于 yellow 状态。为了加快集群恢复的速度,手动调整分片恢复并发数,原本想将默认值为2的 node_concurrent_recoveries 调整为10,结果手一抖多加了一个0,设定了如下参数:

<br /> curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'<br /> {<br /> "persistent": {<br /> "cluster.routing.allocation.node_concurrent_recoveries": 100,<br /> "indices.recovery.max_bytes_per_sec": "40mb"<br /> }<br /> }<br /> '<br />
       设定之后,观察集群 unassigned 分片,一开始下降的速度很快。大约几分钟后,数量维持在一个固定值不变了,然后,然后就没有然后了,集群所有节点 generic 线程池卡死,虽然已存在的索引读写没问题,但是新建索引以及所有涉及 generic 线程池的操作全部卡住。立马修改分片恢复并发数到10,通过管控平台一把重启了全部节点,约15分钟后集群恢复正常。接下来会先介绍一些基本的概念,然后再重现这个问题并做详细分析。

基本概念

ES 线程池(thread pool)

ES 中每个节点有多种线程池,各有用途。重要的有:

  • generic :通用线程池,后台的 node discovery,上述的分片恢复(node recovery)等等一些通用后台的操作都会用到该线程池。该线程池线程数量默认为配置的处理器数量(processors)* 4,最小128,最大512。
  • index :index/delete 等索引操作会用到该线程池,包括自动创建索引等。默认线程数量为配置的处理器数量,默认队列大小:200.
  • search :查询请求处理线程池。默认线程数量:int((# of available_processors * 3) / 2) + 1,默认队列大小:1000.
  • get :get 请求处理线程池。默认线程数量为配置的处理器数量,默认队列大小:1000.
  • write :单个文档的 index/delete/update 以及 bulk 请求处理线程。默认线程数量为配置的处理器数量,默认队列大小:200,在写多的日志场景我们一般会将队列调大。
    还有其它线程池,例如备份回档(snapshot)、analyze、refresh 等,这里就不一一介绍了。详细可参考官方文档:https://www.elastic.co/guide/e ... .html

    集群恢复之分片恢复

           我们知道 ES 集群状态分为三种,green、yellow、red。green 状态表示所有分片包括主副本均正常被分配;yellow 状态表示所有主分片已分配,但是有部分副本分片未分配;red 表示有部分主分片未分配。
    一般当集群中某个节点因故障失联或者重启之后,如果集群索引有副本的场景,集群将进入分片恢复阶段(recovery)。此时一般是 master 节点发起更新集群元数据任务,分片的分配策略由 master 决定,具体分配策略可以参考腾讯云+社区的这篇文章了解细节:https://cloud.tencent.com/deve ... 34743 。各节点收到集群元数据更新请求,检查分片状态并触发分片恢复流程,根据分片数据所在的位置,有多种恢复的方式,主要有以下几种:

  • EXISTING_STORE : 数据在节点本地存在,从本地节点恢复。
  • PEER :本地数据不可用或不存在,从远端节点(源分片,一般是主分片)恢复。
  • SNAPSHOT : 数据从备份仓库恢复。
  • LOCAL_SHARDS : 分片合并(shrink)场景,从本地别的分片恢复。

    PEER 场景分片恢复并发数主要由如下参数控制:

  • cluster.routing.allocation.node_concurrent_incoming_recoveries:节点上最大接受的分片恢复并发数。一般指分片从其它节点恢复至本节点。
  • cluster.routing.allocation.node_concurrent_outgoing_recoveries :节点上最大发送的分片恢复并发数。一般指分片从本节点恢复至其它节点。
  • cluster.routing.allocation.node_concurrent_recoveries :该参数同时设置上述接受发送分片恢复并发数为相同的值。
    详细参数可参考官方文档:https://www.elastic.co/guide/e ... .html

           集群卡住的主要原因就是从远端节点恢复(PEER Recovery)的并发数过多,导致 generic 线程池被用完。涉及目标节点(target)和源节点(source)的恢复交互流程,后面分析问题时我们再来详细讨论。

    问题复现与剖析

           为了便于描述,我用 ES 6.4.3版本重新搭建了一个三节点的集群。单节点 1 core,2GB memory。新建了300个 index, 单个 index 5个分片一个副本,共 3000 个 shard。每个 index 插入大约100条数据。
    先设定分片恢复并发数,为了夸张一点,我直接调整到200,如下所示:
    <br /> curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'<br /> {<br /> "persistent": {<br /> "cluster.routing.allocation.node_concurrent_recoveries": 200 // 设定分片恢复并发数<br /> }<br /> }<br /> '<br />
     
    接下来停掉某节点,模拟机器挂掉场景。几分钟后,观察集群分片恢复数量,卡在固定数值不再变化:

    分片恢复统计信息.png



     
    通过 allocation explain 查看分片分配状态,未分配的原因是受到最大恢复并发数的限制:

    分片恢复限制.png


     
    观察线程池的数量,generic 线程池打满128.

    线程池统计.png



    此时查询或写入已有索引不受影响,但是新建索引这种涉及到 generic 线程池的操作都会卡住。
    通过堆栈分析,128 个 generic 线程全部卡在 PEER recovery 阶段。

    堆栈分析.png


     
           现象有了,我们来分析一下这种场景,远程分片恢复(PEER Recovery)流程为什么会导致集群卡住。

           当集群中有分片的状态发生变更时,master 节点会发起集群元数据更新(cluster state update)请求给所有节点。其它节点收到该请求后,感知到分片状态的变更,启动分片恢复流程。部分分片需要从其它节点恢复,代码层面,涉及分片分配的目标节点(target)和源节点(source)的交互流程如下:

    远端恢复时序分析.png


     

           6.x 版本之后引入了 seqNo,恢复会涉及到 seqNo+translog,这也是6.x提升恢复速度的一大改进。我们重点关注流程中第 2、4、5、7、10、12 步骤中的远程调用,他们的作用分别是:

  • 第2步:分片分配的目标节点向源节点(一般是主分片)发起分片恢复请求,携带起始 seqNo 和 syncId。
  • 第4步:发送数据文件信息,告知目标节点待接收的文件清单。
  • 第5步:发送 diff 数据文件给目标节点。
  • 第7步:源节点发送 prepare translog 请求给目标节点,等目标节点打开 shard level 引擎,准备接受 translog。
  • 第10步:源节点发送指定范围的 translog 快照给目标节点。
  • 第12步:结束恢复流程。

           我们可以看到除第5步发送数据文件外,多次远程交互 submitRequest 都会调用 txGet,这个调用底层用的是基于 AQS 改造过的 sync 对象,是一个同步调用。 如果一端 generic 线程池被这些请求打满,发出的请求等待对端返回,而发出的这些请求由于对端 generic 线程池同样的原因被打满,只能 pending 在队列中,这样两边的线程池都满了而且相互等待对端队列中的线程返回,就出现了分布式死锁现象。

    问题处理

           为了避免改动太大带来不确定的 side effect,针对腾讯云 ES 集群我们目前先在 rest 层拒掉了并发数超过一定值的参数设定请求并提醒用户。与此同时,我们向官方提交了 issue:https://github.com/elastic/ela ... 36195 进行跟踪。

    总结

           本文旨在描述集群恢复过程出现的集群卡死场景,避免更多的 ES 用户踩坑,没有对整体分片恢复做详细的分析,大家想了解详细的分片恢复流程可以参考腾讯云+社区 Elasticsearch 专栏相关的文章:https://cloud.tencent.com/developer/column/2428

    完结,谢谢!

Day 9 - 动手实现一个自定义beat

Xinglong 发表了文章 • 0 个评论 • 149 次浏览 • 1 天前 • 来自相关话题

参考

https://elasticsearch.cn/article/113

https://www.elastic.co/blog/build-your-own-beat

介绍

公司内部有统一的log收集系统,并且实现了定制的filebeat进行log收集。为了实现实时报警和监控,自定义的beat并没有直接把输出发给elasticsearch后端,而是中间会经过storm或者durid进行实时分析,然后落入es或者hdfs。同时由于是统一log收集,所以目前还没有针对具体的不同应用进行log的内容的切分,导致所有的log都是以一行为单位落入后端存储。于是需要针对不同的业务部门定制不同的beat。
本文初步尝试定制一个可以在beat端解析hdfs audit log的beat,限于篇幅,只实现了基本的文件解析功能。下面会从环境配置,代码实现,运行测试三个方面进行讲解。

环境配置

go version go1.9.4 linux/amd64

python version: 2.7.9

不得不吐槽下python的安装,各种坑。因为系统默认的python版本是2.7.5,而cookiecutter建议使用2.7.9


下面的工具会提供python本身需要依赖的一些native包

<br /> yum install openssl -y<br /> yum install openssl-devel -y<br /> yum install zlib-devel -y<br /> yum install zlib -y<br />

安装python

<br /> wget <a href="https://www.python.org/ftp/python/2.7.9/Python-2.7.9.tgz" rel="nofollow" target="_blank">https://www.python.org/ftp/pyt ... 9.tgz</a><br /> tar -zxvf Python-2.7.9.tgz<br /> cd ~/python/Python-2.7.9<br /> ./configure --prefix=/usr/local/python-2.7.9<br /> make<br /> make install<br /> <br /> rm -f /bin/python<br /> ln -s /usr/local/python-2.7.9/bin/python /bin/python<br />

安装工具包 distribute, setuptools, pip

<br /> cd ~/python/setuptools-19.6 && python setup.py install<br /> cd ~/python/pip-1.5.4 && python setup.py install<br /> cd ~/python/distribute-0.7.3 && python setup.py install<br />

安装cookiecutter

<br /> pip install --user cookiecutter<br />

安装cookiecutter所依赖的工具

<br /> pip install backports.functools-lru-cache<br /> pip install six<br /> pip install virtualenv<br />

*** virtualenv 安装好了之后,所在目录是在python的目录里面 (/usr/local/python-2.7.9/bin/virtualenv),需要配置好PATH,这个工具稍后会被beat的Makefile用到


代码实现


需要实现的功能比较简单,目标是打开hdfs-audit.log文件,然后逐行读取,同时解析出必要的信息,组装成event,然后发送出去,如果对接的es的话,需要同时支持自动在es端创建正确的mapping

使用官方提供的beat模板创建自己的beat


  • 需要设置好环境变量$GOPATH,本例子中GOPATH=/root/go

    <br /> $ go get github.com/elastic/beats<br /> $ cd $GOPATH/src/github.com/elastic/beats<br /> $ git checkout 5.1<br /> <br /> [root@minikube-2830379 suxingfate]# cookiecutter /root/go/src/github.com/elastic/beats/generate/beat<br /> project_name [Examplebeat]: hdfsauditbeat<br /> github_name [your-github-name]: suxingfate<br /> beat [hdfsauditbeat]:<br /> beat_path [github.com/suxingfate]:<br /> full_name [Firstname Lastname]: xinglong<br /> <br /> make setup<br />

    到这里,模板就生成了,然后就是定制需要的东西。

  • 1 _meta/beat.yml # 配置模板文件,定义我们的beat会接受哪些配置项
  • 2 config/config.go #使用go的struct定义整个config对象,包含所有的配置项
  • 3 beater/hdfsauditbeat.go # 核心逻辑代码
  • 4 _meta/fields.yml #这里是跟es对接的时候给es定义的mapping



    1 _meta/beat.yml

    这里增加了path,为后面配置hdfs-audit.log文件的位置留好坑
    ```
    [root@minikube-2830379 hdfsauditbeat]# cat _meta/beat.yml
    ################### Hdfsauditbeat Configuration Example #########################

    ############################# Hdfsauditbeat ######################################

    hdfsauditbeat:

    Defines how often an event is sent to the output

    period: 1s
    path: "."
    ```

    2 config/config.go

    这里把path定义到struct里面,后面核心代码就可以从config对象获得path了

    <br /> [root@minikube-2830379 hdfsauditbeat]# cat config/config.go<br /> // Config is put into a different package to prevent cyclic imports in case<br /> // it is needed in several locations<br /> <br /> package config<br /> <br /> import "time"<br /> <br /> type Config struct {<br /> Period time.Duration `config:"period"`<br /> Path string `config:"path"`<br /> }<br /> <br /> var DefaultConfig = Config{<br /> Period: 1 * time.Second,<br /> Path: ".",<br /> }<br />

    3 beater/hdfsauditbeat.go

    这里需要改动的地方是:
    3.1 定义了一个catAudit函数来解析目标文件的每一行,同时生成自定义的event,然后发送出去
    3.2 Run函数调用自定义的catAudit函数,从而把我们的功能嵌入

    <br /> [root@minikube-2830379 hdfsauditbeat]# cat beater/hdfsauditbeat.go<br /> package beater<br /> <br /> import (<br /> "fmt"<br /> "time"<br /> "os"<br /> "io"<br /> "bufio"<br /> "strings"<br /> "github.com/elastic/beats/libbeat/beat"<br /> "github.com/elastic/beats/libbeat/common"<br /> "github.com/elastic/beats/libbeat/logp"<br /> "github.com/elastic/beats/libbeat/publisher"<br /> <br /> "github.com/suxingfate/hdfsauditbeat/config"<br /> )<br /> <br /> type Hdfsauditbeat struct {<br /> done chan struct{}<br /> config config.Config<br /> client publisher.Client<br /> }<br /> <br /> // Creates beater<br /> func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {<br /> config := config.DefaultConfig<br /> if err := cfg.Unpack(&config); err != nil {<br /> return nil, fmt.Errorf("Error reading config file: %v", err)<br /> }<br /> <br /> bt := &Hdfsauditbeat{<br /> done: make(chan struct{}),<br /> config: config,<br /> }<br /> return bt, nil<br /> }<br /> <br /> func (bt *Hdfsauditbeat) Run(b *beat.Beat) error {<br /> logp.Info("hdfsauditbeat is running! Hit CTRL-C to stop it.")<br /> <br /> bt.client = b.Publisher.Connect()<br /> ticker := time.NewTicker(bt.config.Period)<br /> counter := 1<br /> for {<br /> select {<br /> case <-bt.done:<br /> return nil<br /> case <-ticker.C:<br /> }<br /> <br /> bt.catAudit(bt.config.Path)<br /> <br /> logp.Info("Event sent")<br /> counter++<br /> }<br /> }<br /> <br /> func (bt *Hdfsauditbeat) Stop() {<br /> bt.client.Close()<br /> close(bt.done)<br /> }<br /> <br /> func (bt *Hdfsauditbeat) catAudit(auditFile string) {<br /> file, err := os.OpenFile(auditFile, os.O_RDWR, 0666)<br /> if err != nil {<br /> //fmt.Println("Open file error!", err)<br /> return<br /> }<br /> defer file.Close()<br /> <br /> buf := bufio.NewReader(file)<br /> for {<br /> line, err := buf.ReadString('\n')<br /> line = strings.TrimSpace(line)<br /> if line == "" {<br /> return<br /> }<br /> <br /> timeEnd := strings.Index(line, ",")<br /> timeString := line[0 :timeEnd]<br /> tm, _ := time.Parse("2006-01-02 03:04:05", timeString)<br /> <br /> ugiStart := strings.Index(line, "ugi=") + 4<br /> ugiEnd := strings.Index(line, " (auth")<br /> ugi := line[ugiStart :ugiEnd]<br /> <br /> cmdStart := strings.Index(line, "cmd=") + 4<br /> line = line[cmdStart:len(line)]<br /> cmdEnd := strings.Index(line, " ")<br /> cmd := line[0 : cmdEnd]<br /> <br /> srcStart := strings.Index(line, "src=") + 4<br /> line = line[srcStart:len(line)]<br /> srcEnd := strings.Index(line, " ")<br /> src := line[0:srcEnd]<br /> <br /> dstStart := strings.Index(line, "dst=") + 4<br /> line = line[dstStart:len(line)]<br /> dstEnd := strings.Index(line, " ")<br /> dst := line[0:dstEnd]<br /> <br /> event := common.MapStr{<br /> "@timestamp": common.Time(time.Unix(tm.Unix(), 0)),<br /> "ugi": ugi,<br /> "cmd": cmd,<br /> "src": src,<br /> "dst": dst,<br /> }<br /> bt.client.PublishEvent(event)<br /> <br /> if err != nil {<br /> if err == io.EOF {<br /> //fmt.Println("File read ok!")<br /> break<br /> } else {<br /> //fmt.Println("Read file error!", err)<br /> return<br /> }<br /> }<br /> }<br /> }<br /> <br />

    4 _meat/fields.yml

    ```
    [root@minikube-2830379 hdfsauditbeat]# less _meta/fields.yml

  • key: hdfsauditbeat
    title: hdfsauditbeat
    description:
    fields:
    • name: counter
      type: long
      required: true
      description: >
      PLEASE UPDATE DOCUMENTATION

      new fiels added hdfsaudit

    • name: entrytime
      type: date
    • name: ugi
      type: keyword
    • name: cmd
      type: keyword
    • name: src
      type: keyword
    • name: dst
      type: keyword
      ```

      测试


      首先编译好项目
      <br /> make update<br /> make<br />
      然后会发现生成了一个hdfsauditbeat文件,这个就是二进制的可执行文件。下面进行测试,这里偷了个懒,没有发给es,而是吐到console进行观察。
      修改了一下配置文件,需要指定正确的需要消费的audit log文件的路径,另外就是修改了output为console

      ```
      [root@minikube-2830379 hdfsauditbeat]# cat hdfsauditbeat.yml
      ################### Hdfsauditbeat Configuration Example #########################

      ############################# Hdfsauditbeat ######################################

      hdfsauditbeat:

      Defines how often an event is sent to the output

      period: 1s
      path: "/root/go/hdfs-audit.log"

      ================================ General =====================================


      The name of the shipper that publishes the network data. It can be used to group

      all the transactions sent by a single shipper in the web interface.

      name:


      The tags of the shipper are included in their own field with each

      transaction published.

      tags: ["service-X", "web-tier"]


      Optional fields that you can specify to add additional information to the

      output.

      fields:

      env: staging


      ================================ Outputs =====================================


      Configure what outputs to use when sending the data collected by the beat.

      Multiple outputs may be used.

      -------------------------- Elasticsearch output ------------------------------

      output.elasticsearch:

      Array of hosts to connect to.

      hosts: ["localhost:9200"]


      Optional protocol and basic auth credentials.

      protocol: "https"

      username: "elastic"

      password: "changeme"


      ----------------------------- Logstash output --------------------------------

      output.logstash:

      The Logstash hosts

      hosts: ["localhost:5044"]


      Optional SSL. By default is off.

      List of root certificates for HTTPS server verifications

      ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]


      Certificate for SSL client authentication

      ssl.certificate: "/etc/pki/client/cert.pem"


      Client Certificate Key

      ssl.key: "/etc/pki/client/cert.key"


      output.console:
      pretty: true

      ================================ Logging =====================================


      Sets log level. The default log level is info.

      Available log levels are: critical, error, warning, info, debug

      logging.level: debug


      At debug level, you can selectively enable logging only for some components.

      To enable all selectors use ["*"]. Examples of other selectors are "beat",

      "publish", "service".

      logging.selectors: ["*"]

      <br /> <br /> 开始执行<br />
      [root@minikube-2830379 hdfsauditbeat]# ./hdfsauditbeat
      {
      "@timestamp": "2018-12-09T03:00:00.000Z",
      "beat": {
      "hostname": "minikube-2830379.lvs02.dev.abc.com",
      "name": "minikube-2830379.lvs02.dev.abc.com",
      "version": "5.1.3"
      },
      "cmd": "create",
      "dst": "null",
      "src": "/app-logs/app/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
      "ugi": "appmon@APD.ABC.COM"
      }
      {
      "@timestamp": "2018-12-09T03:00:00.000Z",
      "beat": {
      "hostname": "minikube-2830379.lvs02.dev.abc.com",
      "name": "minikube-2830379.lvs02.dev.abc.com",
      "version": "5.1.3"
      },
      "cmd": "create",
      "dst": "null",
      "src": "/app-logs/appmon/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
      "ugi": "appmon@APD.ABC.COM"
      }
      ```

      结束


      使用自定义beat给我们提供了很大的灵活性,虽然pipline或者logstash也可以做到,但是使用场景还是有很大差别的。如果是调用特殊的命令获得输出,或者是本文的场景都更适合定制化beat。

Day 8 - 如何使用Spark快速将数据写入Elasticsearch

Ricky Huo 发表了文章 • 0 个评论 • 1020 次浏览 • 2 天前 • 来自相关话题

如何使用Spark快速将数据写入Elasticsearch



说到数据写入Elasticsearch,最先想到的肯定是Logstash。Logstash因为其简单上手、可扩展、可伸缩等优点被广大用户接受。但是尺有所短,寸有所长,Logstash肯定也有它无法适用的应用场景,比如:

  • 海量数据ETL
  • 海量数据聚合
  • 多源数据处理

    为了满足这些场景,很多同学都会选择Spark,借助Spark算子进行数据处理,最后将处理结果写入Elasticsearch。

    我们部门之前利用Spark对Nginx日志进行分析,统计我们的Web服务访问情况,将Nginx日志每分钟聚合一次最后将结果写入Elasticsearch,然后利用Kibana配置实时监控Dashboard。Elasticsearch和Kibana都很方便、实用,但是随着类似需求越来越多,如何快速通过Spark将数据写入Elasticsearch成为了我们的一大问题。

    今天给大家推荐一款能够实现数据快速写入的黑科技——[Waterdrop](https://github.com/InterestingLab/waterdrop),一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上,简单易用,灵活配置,无需开发。

    wd.png




    Kafka to Elasticsearch


    和Logstash一样,Waterdrop同样支持多种类型的数据输入,这里我们以最常见的Kakfa作为输入源为例,讲解如何使用Waterdrop将数据快速写入Elasticsearch

    Log Sample


    原始日志格式如下:
    <br /> 127.0.0.1 elasticsearch.cn 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:21:54:32 +0800] "GET /article HTTP/1.1" 200 123 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)"<br />

    Elasticsearch Document


    我们想要统计,一分钟每个域名的访问情况,聚合完的数据有以下字段:
    <br /> domain String<br /> hostname String<br /> status int<br /> datetime String<br /> count int<br />

    Waterdrop with Elasticsearch


    接下来会给大家详细介绍,我们如何通过Waterdrop读取Kafka中的数据,对数据进行解析以及聚合,最后将处理结果写入Elasticsearch中。

    Waterdrop


    [Waterdrop](https://github.com/InterestingLab/waterdrop)同样拥有着非常丰富的插件,支持从Kafka、HDFS、Hive中读取数据,进行各种各样的数据处理,并将结果写入Elasticsearch、Kudu或者Kafka中。

    Prerequisites


    首先我们需要安装Waterdrop,安装十分简单,无需配置系统环境变量

    1. 准备Spark环境
    2. 安装Waterdrop
    3. 配置Waterdrop

      以下是简易步骤,具体安装可以参照[Quick Start](https://interestinglab.github. ... -start)

      ```yaml
      cd /usr/local
      wget https://archive.apache.org/dis ... 7.tgz
      tar -xvf https://archive.apache.org/dis ... 7.tgz
      wget https://github.com/Interesting ... 1.zip
      unzip waterdrop-1.1.1.zip
      cd waterdrop-1.1.1

      vim config/waterdrop-env.sh

      指定Spark安装路径

      SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}
      ```

      Waterdrop Pipeline


      与Logstash一样,我们仅需要编写一个Waterdrop Pipeline的配置文件即可完成数据的导入,相信了解Logstash的朋友可以很快入手Waterdrop配置。

      配置文件包括四个部分,分别是Spark、Input、filter和Output。

      Spark



      这一部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。
      <br /> spark {<br /> spark.app.name = "Waterdrop"<br /> spark.executor.instances = 2<br /> spark.executor.cores = 1<br /> spark.executor.memory = "1g"<br /> }<br />

      Input


      这一部分定义数据源,如下是从Kafka中读取数据的配置案例,

      <br /> kafkaStream {<br /> topics = "waterdrop-es"<br /> consumer.bootstrap.servers = "localhost:9092"<br /> consumer.group.id = "waterdrop_es_group"<br /> consumer.rebalance.max.retries = 100<br /> }<br />

      Filter


      在Filter部分,这里我们配置一系列的转化,包括正则解析将日志进行拆分、时间转换将HTTPDATE转化为Elasticsearch支持的日期格式、对Number类型的字段进行类型转换以及通过SQL进行数据聚合
      ```yaml
      filter {

      使用正则解析原始日志

      最开始数据都在raw_message字段中

      grok {
      source_field = "raw_message"
      pattern = '%{NOTSPACE:hostname}\s%{NOTSPACE:domain}\s%{IP:remote_addr}\s%{NUMBER:request_time}s\s\"%{DATA:upstream_ip}\"\s\[%{HTTPDATE:timestamp}\]\s\"%{NOTSPACE:method}\s%{DATA:url}\s%{NOTSPACE:http_ver}\"\s%{NUMBER:status}\s%{NUMBER:body_bytes_send}\s%{DATA:referer}\s%{NOTSPACE:cookie_info}\s\"%{DATA:user_agent}'
      }

      将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为

      Elasticsearch中支持的格式

      date {
      source_field = "timestamp"
      target_field = "datetime"
      source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
      target_time_format = "yyyy-MM-dd'T'HH:mm:ss.SSS+08:00"
      }

      利用SQL对数据进行聚合

      sql {
      table_name = "access_log"
      sql = "select domain, hostname, int(status), datetime, count(*) from access_log group by domain, hostname, status, datetime"
      }
      }
      ```

      Output

      最后我们将处理好的结构化数据写入Elasticsearch。

      yaml<br /> output {<br /> elasticsearch {<br /> hosts = ["localhost:9200"]<br /> index = "waterdrop-${now}"<br /> es.batch.size.entries = 100000<br /> index_time_format = "yyyy.MM.dd"<br /> }<br /> }<br />

      Running Waterdrop


      我们将上述四部分配置组合成为我们的配置文件config/batch.conf

      vim config/batch.conf

      ```
      spark {
      spark.app.name = "Waterdrop"
      spark.executor.instances = 2
      spark.executor.cores = 1
      spark.executor.memory = "1g"
      }
      input {
      kafkaStream {
      topics = "waterdrop-es"
      consumer.bootstrap.servers = "localhost:9092"
      consumer.group.id = "waterdrop_es_group"
      consumer.rebalance.max.retries = 100
      }
      }
      filter {

      使用正则解析原始日志

      最开始数据都在raw_message字段中

      grok {
      source_field = "raw_message"
      pattern = '%{IP:hostname}\s%{NOTSPACE:domain}\s%{IP:remote_addr}\s%{NUMBER:request_time}s\s\"%{DATA:upstream_ip}\"\s\[%{HTTPDATE:timestamp}\]\s\"%{NOTSPACE:method}\s%{DATA:url}\s%{NOTSPACE:http_ver}\"\s%{NUMBER:status}\s%{NUMBER:body_bytes_send}\s%{DATA:referer}\s%{NOTSPACE:cookie_info}\s\"%{DATA:user_agent}'
      }

      将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为

      Elasticsearch中支持的格式

      date {
      source_field = "timestamp"
      target_field = "datetime"
      source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
      target_time_format = "yyyy-MM-dd'T'HH:mm:00.SSS+08:00"
      }

      利用SQL对数据进行聚合

      sql {
      table_name = "access_log"
      sql = "select domain, hostname, status, datetime, count(*) from access_log group by domain, localhost, status, datetime"
      }
      }
      output {
      elasticsearch {
      hosts = ["localhost:9200"]
      index = "waterdrop-${now}"
      es.batch.size.entries = 100000
      index_timeformat = "yyyy.MM.dd"
      }
      }
      ```

      执行命令,指定配置文件,运行Waterdrop,即可将数据写入Elasticsearch。这里我们以本地模式为例。

      ./bin/start-waterdrop.sh --config config/batch.conf -e client -m 'local[2]'

      最后,写入Elasticsearch中的数据如下,再配上Kibana就可以实现Web服务的实时监控了^
      ^.

      <br /> "_source": {<br /> "domain": "elasticsearch.cn",<br /> "hostname": "localhost",<br /> "status": "200",<br /> "datetime": "2018-11-26T21:54:00.000+08:00",<br /> "count": 26<br /> }<br />

      Conclusion


      在这篇文章中,我们介绍了如何通过Waterdrop将Kafka中的数据写入Elasticsearch中。仅仅通过一个配置文件便可快速运行一个Spark Application,完成数据的处理、写入,无需编写任何代码,十分简单。

      当数据处理过程中有遇到Logstash无法支持的场景或者Logstah性能无法达到预期的情况下,都可以尝试使用Waterdrop解决问题。

      希望了解Waterdrop与Elasticsearch、Kafka、Hadoop结合使用的更多功能和案例,可以直接进入项目主页[https://github.com/InterestingLab/waterdrop](https://github.com/InterestingLab/waterdrop)


      我们近期会再发布一篇《如何用Spark和Elasticsearch做交互式数据分析》,敬请期待.

      Contract us


      欢迎联系我们交流Spark和Elasticsearch:

      Garyelephant: 微信: garyelephant

      RickyHuo: 微信: chodomatte1994

Day 7 - Elasticsearch中数据是如何存储的

weizijun 发表了文章 • 2 个评论 • 474 次浏览 • 3 天前 • 来自相关话题

前言

很多使用Elasticsearch的同学会关心数据存储在ES中的存储容量,会有这样的疑问:xxTB的数据入到ES会使用多少存储空间。这个问题其实很难直接回答的,只有数据写入ES后,才能观察到实际的存储空间。比如同样是1TB的数据,写入ES的存储空间可能差距会非常大,可能小到只有300~400GB,也可能多到6-7TB,为什么会造成这么大的差距呢?究其原因,我们来探究下Elasticsearch中的数据是如何存储。文章中我以Elasticsearch 2.3版本为示例,对应的lucene版本是5.5,Elasticsearch现在已经来到了6.5版本,数字类型、列存等存储结构有些变化,但基本的概念变化不多,文章中的内容依然适用。

Elasticsearch索引结构

Elasticsearch对外提供的是index的概念,可以类比为DB,用户查询是在index上完成的,每个index由若干个shard组成,以此来达到分布式可扩展的能力。比如下图是一个由10个shard组成的index。


elasticsearch_store_arc.png



shard是Elasticsearch数据存储的最小单位,index的存储容量为所有shard的存储容量之和。Elasticsearch集群的存储容量则为所有index存储容量之和。

一个shard就对应了一个lucene的library。对于一个shard,Elasticsearch增加了translog的功能,类似于HBase WAL,是数据写入过程中的中间数据,其余的数据都在lucene库中管理的。

所以Elasticsearch索引使用的存储内容主要取决于lucene中的数据存储。

lucene数据存储

下面我们主要看下lucene的文件内容,在了解lucene文件内容前,大家先了解些lucene的基本概念。

lucene基本概念

  • segment : lucene内部的数据是由一个个segment组成的,写入lucene的数据并不直接落盘,而是先写在内存中,经过了refresh间隔,lucene才将该时间段写入的全部数据refresh成一个segment,segment多了之后会进行merge成更大的segment。lucene查询时会遍历每个segment完成。由于lucene* 写入的数据是在内存中完成,所以写入效率非常高。但是也存在丢失数据的风险,所以Elasticsearch基于此现象实现了translog,只有在segment数据落盘后,Elasticsearch才会删除对应的translog。
  • doc : doc表示lucene中的一条记录
  • field :field表示记录中的字段概念,一个doc由若干个field组成。
  • term :term是lucene中索引的最小单位,某个field对应的内容如果是全文检索类型,会将内容进行分词,分词的结果就是由term组成的。如果是不分词的字段,那么该字段的内容就是一个term。
  • 倒排索引(inverted index): lucene索引的通用叫法,即实现了term到doc list的映射。
  • 正排数据:搜索引擎的通用叫法,即原始数据,可以理解为一个doc list。
  • docvalues :Elasticsearch中的列式存储的名称,Elasticsearch除了存储原始存储、倒排索引,还存储了一份docvalues,用作分析和排序。

    lucene文件内容

    lucene包的文件是由很多segment文件组成的,segments_xxx文件记录了lucene包下面的segment文件数量。每个segment会包含如下的文件。

    | Name | Extension | Brief Description |
    | ------ | ------ | ------ |
    |Segment Info|.si|segment的元数据文件|
    |Compound File|.cfs, .cfe|一个segment包含了如下表的各个文件,为减少打开文件的数量,在segment小的时候,segment的所有文件内容都保存在cfs文件中,cfe文件保存了lucene各文件在cfs文件的位置信息|
    |Fields|.fnm|保存了fields的相关信息|
    |Field Index|.fdx|正排存储文件的元数据信息|
    |Field Data|.fdt|存储了正排存储数据,写入的原文存储在这|
    |Term Dictionary|.tim|倒排索引的元数据信息|
    |Term Index|.tip|倒排索引文件,存储了所有的倒排索引数据|
    |Frequencies|.doc|保存了每个term的doc id列表和term在doc中的词频|
    |Positions|.pos|Stores position information about where a term occurs in the index
    全文索引的字段,会有该文件,保存了term在doc中的位置|
    |Payloads|.pay|Stores additional per-position metadata information such as character offsets and user payloads
    全文索引的字段,使用了一些像payloads的高级特性会有该文件,保存了term在doc中的一些高级特性|
    |Norms|.nvd, .nvm|文件保存索引字段加权数据|
    |Per-Document Values|.dvd, .dvm|lucene的docvalues文件,即数据的列式存储,用作聚合和排序|
    |Term Vector Data|.tvx, .tvd, .tvf|Stores offset into the document data file
    保存索引字段的矢量信息,用在对term进行高亮,计算文本相关性中使用|
    |Live Documents|.liv|记录了segment中删除的doc|

    测试数据示例

    下面我们以真实的数据作为示例,看看lucene中各类型数据的容量占比。

    写100w数据,有一个uuid字段,写入的是长度为36位的uuid,字符串总为3600w字节,约为35M。

    数据使用一个shard,不带副本,使用默认的压缩算法,写入完成后merge成一个segment方便观察。

    使用线上默认的配置,uuid存为不分词的字符串类型。创建如下索引:

    <br /> PUT test_field<br /> {<br /> "settings": {<br /> "index": {<br /> "number_of_shards": "1",<br /> "number_of_replicas": "0",<br /> "refresh_interval": "30s"<br /> }<br /> },<br /> "mappings": {<br /> "type": {<br /> "_all": {<br /> "enabled": false<br /> }, <br /> "properties": {<br /> "uuid": {<br /> "type": "string",<br /> "index": "not_analyzed"<br /> }<br /> }<br /> }<br /> }<br /> }<br />

    首先写入100w不同的uuid,使用磁盘容量细节如下:

    <br /> <br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 122.7mb 122.7mb <br /> <br /> -rw-r--r-- 1 weizijun staff 41M Aug 19 21:23 _8.fdt<br /> -rw-r--r-- 1 weizijun staff 17K Aug 19 21:23 _8.fdx<br /> -rw-r--r-- 1 weizijun staff 688B Aug 19 21:23 _8.fnm<br /> -rw-r--r-- 1 weizijun staff 494B Aug 19 21:23 _8.si<br /> -rw-r--r-- 1 weizijun staff 265K Aug 19 21:23 _8_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 44M Aug 19 21:23 _8_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 340K Aug 19 21:23 _8_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 37M Aug 19 21:23 _8_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 254B Aug 19 21:23 _8_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 195B Aug 19 21:23 segments_2<br /> -rw-r--r-- 1 weizijun staff 0B Aug 19 21:20 write.lock<br />
    可以看到正排数据、倒排索引数据,列存数据容量占比几乎相同,正排数据和倒排数据还会存储Elasticsearch的唯一id字段,所以容量会比列存多一些。

    35M的uuid存入Elasticsearch后,数据膨胀了3倍,达到了122.7mb。Elasticsearch竟然这么消耗资源,不要着急下结论,接下来看另一个测试结果。

    我们写入100w一样的uuid,然后看看Elasticsearch使用的容量。

    <br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 13.2mb 13.2mb <br /> <br /> -rw-r--r-- 1 weizijun staff 5.5M Aug 19 21:29 _6.fdt<br /> -rw-r--r-- 1 weizijun staff 15K Aug 19 21:29 _6.fdx<br /> -rw-r--r-- 1 weizijun staff 688B Aug 19 21:29 _6.fnm<br /> -rw-r--r-- 1 weizijun staff 494B Aug 19 21:29 _6.si<br /> -rw-r--r-- 1 weizijun staff 309K Aug 19 21:29 _6_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 7.0M Aug 19 21:29 _6_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 195K Aug 19 21:29 _6_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 244K Aug 19 21:29 _6_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 252B Aug 19 21:29 _6_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 195B Aug 19 21:29 segments_2<br /> -rw-r--r-- 1 weizijun staff 0B Aug 19 21:26 write.lock<br />

    这回35M的数据Elasticsearch容量只有13.2mb,其中还有主要的占比还是Elasticsearch的唯一id,100w的uuid几乎不占存储容积。

    所以在Elasticsearch中建立索引的字段如果基数越大(count distinct),越占用磁盘空间。

    我们再看看存100w个不一样的整型会是如何。

    <br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 13.6mb 13.6mb <br /> <br /> -rw-r--r-- 1 weizijun staff 6.1M Aug 28 10:19 _42.fdt<br /> -rw-r--r-- 1 weizijun staff 22K Aug 28 10:19 _42.fdx<br /> -rw-r--r-- 1 weizijun staff 688B Aug 28 10:19 _42.fnm<br /> -rw-r--r-- 1 weizijun staff 503B Aug 28 10:19 _42.si<br /> -rw-r--r-- 1 weizijun staff 2.8M Aug 28 10:19 _42_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 2.2M Aug 28 10:19 _42_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 83K Aug 28 10:19 _42_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 2.5M Aug 28 10:19 _42_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 228B Aug 28 10:19 _42_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 196B Aug 28 10:19 segments_2<br /> -rw-r--r-- 1 weizijun staff 0B Aug 28 10:16 write.lock<br />

    从结果可以看到,100w整型数据,Elasticsearch的存储开销为13.6mb。如果以int型计算100w数据的长度的话,为400w字节,大概是3.8mb数据。忽略Elasticsearch唯一id字段的影响,Elasticsearch实际存储容量跟整型数据长度差不多。

    我们再看一下开启最佳压缩参数对存储空间的影响:

    <br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 107.2mb 107.2mb <br /> <br /> -rw-r--r-- 1 weizijun staff 25M Aug 20 12:30 _5.fdt<br /> -rw-r--r-- 1 weizijun staff 6.0K Aug 20 12:30 _5.fdx<br /> -rw-r--r-- 1 weizijun staff 688B Aug 20 12:31 _5.fnm<br /> -rw-r--r-- 1 weizijun staff 500B Aug 20 12:31 _5.si<br /> -rw-r--r-- 1 weizijun staff 265K Aug 20 12:31 _5_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 44M Aug 20 12:31 _5_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 322K Aug 20 12:31 _5_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 37M Aug 20 12:31 _5_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 254B Aug 20 12:31 _5_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 224B Aug 20 12:31 segments_4<br /> -rw-r--r-- 1 weizijun staff 0B Aug 20 12:00 write.lock<br />

    结果中可以发现,只有正排数据会启动压缩,压缩能力确实强劲,不考虑唯一id字段,存储容量大概压缩到接近50%。

    我们还做了一些实验,Elasticsearch默认是开启_all参数的,_all可以让用户传入的整体json数据作为全文检索的字段,可以更方便的检索,但在现实场景中已经使用的不多,相反会增加很多存储容量的开销,可以看下开启_all的磁盘空间使用情况:

    <br /> <br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 162.4mb 162.4mb <br /> <br /> -rw-r--r-- 1 weizijun staff 41M Aug 18 22:59 _20.fdt<br /> -rw-r--r-- 1 weizijun staff 18K Aug 18 22:59 _20.fdx<br /> -rw-r--r-- 1 weizijun staff 777B Aug 18 22:59 _20.fnm<br /> -rw-r--r-- 1 weizijun staff 59B Aug 18 22:59 _20.nvd<br /> -rw-r--r-- 1 weizijun staff 78B Aug 18 22:59 _20.nvm<br /> -rw-r--r-- 1 weizijun staff 539B Aug 18 22:59 _20.si<br /> -rw-r--r-- 1 weizijun staff 7.2M Aug 18 22:59 _20_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 4.2M Aug 18 22:59 _20_Lucene50_0.pos<br /> -rw-r--r-- 1 weizijun staff 73M Aug 18 22:59 _20_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 832K Aug 18 22:59 _20_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 37M Aug 18 22:59 _20_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 254B Aug 18 22:59 _20_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 196B Aug 18 22:59 segments_2<br /> -rw-r--r-- 1 weizijun staff 0B Aug 18 22:53 write.lock<br /> <br />

    开启_all比不开启多了40mb的存储空间,多的数据都在倒排索引上,大约会增加30%多的存储开销。所以线上都直接禁用。

    然后我还做了其他几个尝试,为了验证存储容量是否和数据量成正比,写入1000w数据的uuid,发现存储容量基本为100w数据的10倍。我还验证了数据长度是否和数据量成正比,发现把uuid增长2倍、4倍,存储容量也响应的增加了2倍和4倍。在此就不一一列出数据了。


    lucene各文件具体内容和实现

    lucene数据元信息文件

    文件名为:segments_xxx

    该文件为lucene数据文件的元信息文件,记录所有segment的元数据信息。

    该文件主要记录了目前有多少segment,每个segment有一些基本信息,更新这些信息定位到每个segment的元信息文件。

    lucene元信息文件还支持记录userData,Elasticsearch可以在此记录translog的一些相关信息。

    文件示例


    elasticsearch_store_segments.png




    具体实现类


    ```
    public final class SegmentInfos implements Cloneable, Iterable {
    // generation是segment的版本的概念,从文件名中提取出来,实例中为:2t/101
    private long generation; // generation of the "segments_N" for the next commit

    private long lastGeneration; // generation of the "segments_N" file we last successfully read
    // or wrote; this is normally the same as generation except if
    // there was an IOException that had interrupted a commit

    / Id for this commit; only written starting with Lucene 5.0 */
    private byte[] id;


    /* Which Lucene version wrote this commit, or null if this commit is pre-5.3. /
    private Version luceneVersion;

    /
    Counts how often the index has been changed. */
    public long version;

    / Used to name new segments. */
    // TODO: should this be a long ...?
    public int counter;

    /* Version of the oldest segment in the index, or null if there are no segments. /
    private Version minSegmentLuceneVersion;

    private List segments = new ArrayList<>();

    /
    Opaque Map<String, String> that user can specify during IndexWriter.commit */
    public Map<String,String> userData = Collections.emptyMap();
    }

    /** Embeds a [read-only] SegmentInfo and adds per-commit

    • fields.
      *
    • @lucene.experimental */
      public class SegmentCommitInfo {

      /* The {@link SegmentInfo} that we wrap. /
      public final SegmentInfo info;

      // How many deleted docs in the segment:
      private int delCount;

      // Generation number of the live docs file (-1 if there
      // are no deletes yet):
      private long delGen;

      // Normally 1+delGen, unless an exception was hit on last
      // attempt to write:
      private long nextWriteDelGen;

      // Generation number of the FieldInfos (-1 if there are no updates)
      private long fieldInfosGen;

      // Normally 1+fieldInfosGen, unless an exception was hit on last attempt to
      // write
      private long nextWriteFieldInfosGen; //fieldInfosGen == -1 ? 1 : fieldInfosGen + 1;

      // Generation number of the DocValues (-1 if there are no updates)
      private long docValuesGen;

      // Normally 1+dvGen, unless an exception was hit on last attempt to
      // write
      private long nextWriteDocValuesGen; //docValuesGen == -1 ? 1 : docValuesGen + 1;

      // TODO should we add .files() to FieldInfosFormat, like we have on
      // LiveDocsFormat?
      // track the fieldInfos update files
      private final Set fieldInfosFiles = new HashSet<>();

      // Track the per-field DocValues update files
      private final Map<Integer,Set> dvUpdatesFiles = new HashMap<>();

      // Track the per-generation updates files
      @Deprecated
      private final Map<Long,Set> genUpdatesFiles = new HashMap<>();

      private volatile long sizeInBytes = -1;
      }

      ```

      segment的元信息文件

      文件后缀:.si

      每个segment都有一个.si文件,记录了该segment的元信息。

      segment元信息文件中记录了segment的文档数量,segment对应的文件列表等信息。

      文件示例


      elasticsearch_store_si.png




      具体实现类


      ```
      /**

    • Information about a segment such as its name, directory, and files related
    • to the segment.
      *
    • @lucene.experimental
      */
      public final class SegmentInfo {

      // _bl
      public final String name;

      /* Where this segment resides. /
      public final Directory dir;

      /* Id that uniquely identifies this segment. /
      private final byte[] id;

      private Codec codec;

      // Tracks the Lucene version this segment was created with, since 3.1. Null
      // indicates an older than 3.0 index, and it's used to detect a too old index.
      // The format expected is "x.y" - "2.x" for pre-3.0 indexes (or null), and
      // specific versions afterwards ("3.0.0", "3.1.0" etc.).
      // see o.a.l.util.Version.
      private Version version;

      private int maxDoc; // number of docs in seg

      private boolean isCompoundFile;


      private Map<String,String> diagnostics;

      private Set setFiles;

      private final Map<String,String> attributes;
      }
      ```

      fields信息文件

      文件后缀:.fnm

      该文件存储了fields的基本信息。

      fields信息中包括field的数量,field的类型,以及IndexOpetions,包括是否存储、是否索引,是否分词,是否需要列存等等。

      文件示例


      elasticsearch_store_fnm.png




      具体实现类


      ```
      /**

    • Access to the Field Info file that describes document fields and whether or
    • not they are indexed. Each segment has a separate Field Info file. Objects
    • of this class are thread-safe for multiple readers, but only one thread can
    • be adding documents at a time, with no other reader or writer threads
    • accessing this object.
      /
      public final class FieldInfo {
      /
      Field's name */
      public final String name;

      /* Internal field number /
      //field在内部的编号
      public final int number;

      //field docvalues的类型
      private DocValuesType docValuesType = DocValuesType.NONE;

      // True if any document indexed term vectors
      private boolean storeTermVector;

      private boolean omitNorms; // omit norms associated with indexed fields

      //index的配置项
      private IndexOptions indexOptions = IndexOptions.NONE;

      private boolean storePayloads; // whether this field stores payloads together with term positions

      private final Map<String,String> attributes;

      // docvalues的generation
      private long dvGen;
      }
      ```

      数据存储文件

      文件后缀:.fdx, .fdt

      索引文件为.fdx,数据文件为.fdt,数据存储文件功能为根据自动的文档id,得到文档的内容,搜索引擎的术语习惯称之为正排数据,即doc_id -> content,es的_source数据就存在这

      索引文件记录了快速定位文档数据的索引信息,数据文件记录了所有文档id的具体内容。

      文件示例


      elasticsearch_store_fdt.png




      具体实现类

      ```
      /**

    • Random-access reader for {@link CompressingStoredFieldsIndexWriter}.
    • @lucene.internal
      */
      public final class CompressingStoredFieldsIndexReader implements Cloneable, Accountable {
      private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(CompressingStoredFieldsIndexReader.class);

      final int maxDoc;

      //docid索引,快速定位某个docid的数组坐标
      final int[] docBases;

      //快速定位某个docid所在的文件offset的startPointer
      final long[] startPointers;

      //平均一个chunk的文档数
      final int[] avgChunkDocs;

      //平均一个chunk的size
      final long[] avgChunkSizes;

      final PackedInts.Reader[] docBasesDeltas; // delta from the avg

      final PackedInts.Reader[] startPointersDeltas; // delta from the avg
      }

      /**
    • {@link StoredFieldsReader} impl for {@link CompressingStoredFieldsFormat}.
    • @lucene.experimental
      */
      public final class CompressingStoredFieldsReader extends StoredFieldsReader {

      //从fdt正排索引文件中获得
      private final int version;

      // field的基本信息
      private final FieldInfos fieldInfos;

      //fdt正排索引文件reader
      private final CompressingStoredFieldsIndexReader indexReader;

      //从fdt正排索引文件中获得,用于指向fdx数据文件的末端,指向numChunks地址4
      private final long maxPointer;

      //fdx正排数据文件句柄
      private final IndexInput fieldsStream;

      //块大小
      private final int chunkSize;

      private final int packedIntsVersion;

      //压缩类型
      private final CompressionMode compressionMode;

      //解压缩处理对象
      private final Decompressor decompressor;

      //文档数量,从segment元数据中获得
      private final int numDocs;

      //是否正在merge,默认为false
      private final boolean merging;

      //初始化时new了一个BlockState,BlockState记录下当前正排文件读取的状态信息
      private final BlockState state;
      //chunk的数量
      private final long numChunks; // number of compressed blocks written

      //dirty chunk的数量
      private final long numDirtyChunks; // number of incomplete compressed blocks written

      //是否close,默认为false
      private boolean closed;
      }
      ```

      倒排索引文件

      索引后缀:.tip,.tim

      倒排索引也包含索引文件和数据文件,.tip为索引文件,.tim为数据文件,索引文件包含了每个字段的索引元信息,数据文件有具体的索引内容。

      5.5.0版本的倒排索引实现为FST tree,FST tree的最大优势就是内存空间占用非常低 ,具体可以参看下这篇文章:[http://www.cnblogs.com/bonelee/p/6226185.html ](http://www.cnblogs.com/bonelee/p/6226185.html )

      [http://examples.mikemccandless ... %2Bit](http://examples.mikemccandless ... d%2Bit) 为FST图实例,可以根据输入的数据构造出FST图

      <br /> 输入到 FST 中的数据为:<br /> String inputValues[] = {"mop","moth","pop","star","stop","top"};<br /> long outputValues[] = {0,1,2,3,4,5};<br />
      生成的 FST 图为:

      elasticsearch_store_tip1.png



      elasticsearch_store_tip2.png



      文件示例


      elasticsearch_store_tip3.png




      具体实现类

      ```
      public final class BlockTreeTermsReader extends FieldsProducer {
      // Open input to the main terms dict file (_X.tib)
      final IndexInput termsIn;
      // Reads the terms dict entries, to gather state to
      // produce DocsEnum on demand
      final PostingsReaderBase postingsReader;
      private final TreeMap<String,FieldReader> fields = new TreeMap<>();

      / File offset where the directory starts in the terms file. */
      /索引数据文件tim的数据的尾部的元数据的地址
      private long dirOffset;
      /* File offset where the directory starts in the index file. /

      //索引文件tip的数据的尾部的元数据的地址
      private long indexDirOffset;

      //semgent的名称
      final String segment;

      //版本号
      final int version;

      //5.3.x index, we record up front if we may have written any auto-prefix terms,示例中记录的是false
      final boolean anyAutoPrefixTerms;
      }

      /

    • BlockTree's implementation of {@link Terms}.
    • @lucene.internal
      */
      public final class FieldReader extends Terms implements Accountable {

      //term的数量
      final long numTerms;

      //field信息
      final FieldInfo fieldInfo;

      final long sumTotalTermFreq;

      //总的文档频率
      final long sumDocFreq;

      //文档数量
      final int docCount;

      //字段在索引文件tip中的起始位置
      final long indexStartFP;

      final long rootBlockFP;

      final BytesRef rootCode;

      final BytesRef minTerm;

      final BytesRef maxTerm;

      //longs:metadata buffer, holding monotonic values
      final int longsSize;

      final BlockTreeTermsReader parent;

      final FST index;
      }
      ```

      倒排链文件

      文件后缀:.doc, .pos, .pay

      .doc保存了每个term的doc id列表和term在doc中的词频

      全文索引的字段,会有.pos文件,保存了term在doc中的位置

      全文索引的字段,使用了一些像payloads的高级特性才会有.pay文件,保存了term在doc中的一些高级特性

      文件示例


      elasticsearch_store_doc.png




      具体实现类

      ```
      /**

    • Concrete class that reads docId(maybe frq,pos,offset,payloads) list
    • with postings format.
      *
    • @lucene.experimental
      */
      public final class Lucene50PostingsReader extends PostingsReaderBase {
      private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Lucene50PostingsReader.class);
      private final IndexInput docIn;
      private final IndexInput posIn;
      private final IndexInput payIn;
      final ForUtil forUtil;
      private int version;

      //不分词的字段使用的是该对象,基于skiplist实现了倒排链
      final class BlockDocsEnum extends PostingsEnum {
      }

      //全文检索字段使用的是该对象
      final class BlockPostingsEnum extends PostingsEnum {
      }

      //包含高级特性的字段使用的是该对象
      final class EverythingEnum extends PostingsEnum {
      }
      }
      ```

      列存文件(docvalues)

      文件后缀:.dvm, .dvd

      索引文件为.dvm,数据文件为.dvd。

      lucene实现的docvalues有如下类型:

  • 1、NONE 不开启docvalue时的状态
  • 2、NUMERIC 单个数值类型的docvalue主要包括(int,long,float,double)
  • 3、BINARY 二进制类型值对应不同的codes最大值可能超过32766字节,
  • 4、SORTED 有序增量字节存储,仅仅存储不同部分的值和偏移量指针,值必须小于等于32766字节
  • 5、SORTED_NUMERIC 存储数值类型的有序数组列表
  • 6、SORTED_SET 可以存储多值域的docvalue值,但返回时,仅仅只能返回多值域的第一个docvalue
  • 7、对应not_anaylized的string字段,使用的是SORTED_SET类型,number的类型是SORTED_NUMERIC类型

    其中SORTED_SET 的 SORTED_SINGLE_VALUED类型包括了两类数据 : binary + numeric, binary是按ord排序的term的列表,numeric是doc到ord的映射。

    文件示例


    elasticsearch_store_dvd.png




    具体实现类

    <br /> /** reader for {@link Lucene54DocValuesFormat} */<br /> final class Lucene54DocValuesProducer extends DocValuesProducer implements Closeable {<br /> //number类型的field的列存列表<br /> private final Map<String,NumericEntry> numerics = new HashMap<>();<br /> <br /> //字符串类型的field的列存列表<br /> private final Map<String,BinaryEntry> binaries = new HashMap<>();<br /> <br /> //有序字符串类型的field的列存列表<br /> private final Map<String,SortedSetEntry> sortedSets = new HashMap<>();<br /> <br /> //有序number类型的field的列存列表<br /> private final Map<String,SortedSetEntry> sortedNumerics = new HashMap<>();<br /> <br /> //字符串类型的field的ords列表<br /> private final Map<String,NumericEntry> ords = new HashMap<>();<br /> <br /> //docId -> address -> ord 中field的ords列表<br /> private final Map<String,NumericEntry> ordIndexes = new HashMap<>();<br /> <br /> //field的数量<br /> private final int numFields;<br /> <br /> //内存使用量<br /> private final AtomicLong ramBytesUsed;<br /> <br /> //数据源的文件句柄<br /> private final IndexInput data;<br /> <br /> //文档数<br /> private final int maxDoc;<br /> // memory-resident structures<br /> private final Map<String,MonotonicBlockPackedReader> addressInstances = new HashMap<>();<br /> private final Map<String,ReverseTermsIndex> reverseIndexInstances = new HashMap<>();<br /> private final Map<String,DirectMonotonicReader.Meta> directAddressesMeta = new HashMap<>();<br /> <br /> //是否正在merge<br /> private final boolean merging;<br /> }<br /> <br /> /** metadata entry for a numeric docvalues field */<br /> static class NumericEntry {<br /> private NumericEntry() {}<br /> /** offset to the bitset representing docsWithField, or -1 if no documents have missing values */<br /> long missingOffset;<br /> <br /> /** offset to the actual numeric values */<br /> //field的在数据文件中的起始地址<br /> public long offset;<br /> <br /> /** end offset to the actual numeric values */<br /> //field的在数据文件中的结尾地址<br /> public long endOffset;<br /> <br /> /** bits per value used to pack the numeric values */<br /> public int bitsPerValue;<br /> <br /> //format类型<br /> int format;<br /> /** count of values written */<br /> public long count;<br /> /** monotonic meta */<br /> public DirectMonotonicReader.Meta monotonicMeta;<br /> <br /> //最小的value<br /> long minValue;<br /> <br /> //Compressed by computing the GCD<br /> long gcd;<br /> <br /> //Compressed by giving IDs to unique values.<br /> long table[];<br /> /** for sparse compression */<br /> long numDocsWithValue;<br /> NumericEntry nonMissingValues;<br /> NumberType numberType;<br /> }<br /> <br /> /** metadata entry for a binary docvalues field */<br /> static class BinaryEntry {<br /> private BinaryEntry() {}<br /> /** offset to the bitset representing docsWithField, or -1 if no documents have missing values */<br /> long missingOffset;<br /> /** offset to the actual binary values */<br /> //field的在数据文件中的起始地址<br /> long offset;<br /> int format;<br /> /** count of values written */<br /> public long count;<br /> <br /> //最短字符串的长度<br /> int minLength;<br /> <br /> //最长字符串的长度<br /> int maxLength;<br /> /** offset to the addressing data that maps a value to its slice of the byte[] */<br /> public long addressesOffset, addressesEndOffset;<br /> /** meta data for addresses */<br /> public DirectMonotonicReader.Meta addressesMeta;<br /> /** offset to the reverse index */<br /> public long reverseIndexOffset;<br /> /** packed ints version used to encode addressing information */<br /> public int packedIntsVersion;<br /> /** packed ints blocksize */<br /> public int blockSize;<br /> }<br />

    参考资料


    [lucene source code](https://github.com/apache/luce ... /5.5.0)

    [lucene document](https://lucene.apache.org/core/5_5_0/)

    [lucene字典实现原理——FST](http://www.cnblogs.com/bonelee/p/6226185.html )

Day 6 - Logstash Pipeline-to-Pipeline 尝鲜

rockybean 发表了文章 • 2 个评论 • 213 次浏览 • 4 天前 • 来自相关话题

Logstash 在 6.0 推出了 multiple pipeline 的解决方案,即在一个 logstash 实例中可以同时进行多个独立数据流程的处理工作,如下图所示。

![](https://ws1.sinaimg.cn/large/6 ... xr.jpg)

而在这之前用户只能通过在单机运行多个 logstash 实例或者在配置文件中增加大量 if-else 条件判断语句来解决。要使用 multiple pipeline 也很简单,只需要将不同的 pipeline 在 config/pipeline.yml中定义好即可,如下所示:

```yaml

  • pipeline.id: apache
    pipeline.batch.size: 125
    queue.type: persisted
    path.config: "/path/to/config/apache.cfg"
  • pipeline.id: nginx
    path.config: "/path/to/config/nginx.cfg"
    ``<br /> <br /> 其中apachenginx作为独立的 pipeline 执行,而且配置也可以独立设置,互不干扰。pipeline.yml的引入极大地简化了 logstash 的配置管理工作,使得新手也可以很快完成复杂的 ETL 配置。<br /> <br /> 在 6.3 版本中,Logstash 又增加了Pipeline-to-Pipeline的管道机制(beta),即管道和管道之间可以连接在一起组成一个完成的数据处理流。熟悉 linux 的管道命令|`的同学应该可以很快明白这种模式的好处。这无疑使得 Logstash 的配置会更加灵活,今天我们就来了解下这种灵活自由的配置方式。



    1. 上手


    废话少说,快速上手。修改 config/pipeline.yml文件如下:

    ```yaml

    • pipeline.id: upstream
      config.string: input { stdin {} } output { pipeline { send_to => [test_output] } }
    • pipeline.id: downstream
      config.string: input { pipeline { address => test_output } } output{ stdout{}}
      ``<br /> <br /> <br /> <br /> 然后运行 logstash,其中-r` 表示配置文件有改动时自动重新加载,方便我们调试。

      bin/logstash -r

      在终端随意输入字符(比如aaa)后回车,会看到屏幕输出了类似下面的内容,代表运行成功了。

      json<br /> {<br /> "@timestamp" => 2018-12-06T14:43:50.310Z,<br /> "@version" => "1",<br /> "message" => "aaa",<br /> "host" => "rockybean-MacBook-Pro.local"<br /> }<br />

      我们再回头看下这个配置,upstreamoutput 使用了名为 pipeline 的 plugin,然后 send_to的输出对象test_output是在 downstreaminput pipeline plugin 中定义的。通过这个唯一的address(虚拟地址)就能够把不同的 pipeline 连接在一起组成一个更长的pipeline来处理数据。类似下图所示:

      ![](https://ws1.sinaimg.cn/large/6 ... y9.jpg)



      当数据由 upstream传递给 downstream时会进行一个复制操作,这也意味着在这两个 pipeline 中的数据是完全独立的,互不影响。有一点要注意的是:数据的复制会增加额外的性能开销,比如会加大 JVM Heap 的使用。

      2. 使用场景


      使用方法是不是很简单,接下来我们来看下官方为我们开的几个脑洞。

      2.1 Distributor Pattern 分发者模式


      该模式执行效果类似下图所示:

      ![](https://ws1.sinaimg.cn/large/6 ... l1.jpg)

      在一个 pipeline 处理输入,然后根据不同的数据类型再分发到对应的 Pipeline 去处理。这种模式的好处在于统一输入端口,隔离不同类型的处理配置文件,减少由于配置文件混合在一起带来的维护成本。大家可以想一想如果不用这种Pipeline-to-Pipeline的方式,我们如果轻松做到一个端口处理多个来源的数据呢?

      这种模式的参考配置如下所示:

      ```yaml

      config/pipelines.yml

  • pipeline.id: beats-server
    config.string: |
    input { beats { port => 5044 } }
    output {
    if [type] == apache {
    pipeline { send_to => weblogs }
    } else if [type] == system {
    pipeline { send_to => syslog }
    } else {
    pipeline { send_to => fallback }
    }
    }
  • pipeline.id: weblog-processing
    config.string: |
    input { pipeline { address => weblogs } }
    filter {

    Weblog filter statements here...

    }
    output {
    elasticsearch { hosts => [es_cluster_a_host] }
    }

  • pipeline.id: syslog-processing
    config.string: |
    input { pipeline { address => syslog } }
    filter {

    Syslog filter statements here...

    }
    output {
    elasticsearch { hosts => [es_cluster_b_host] }
    }

  • pipeline.id: fallback-processing
    config.string: |
    input { pipeline { address => fallback } }
    output { elasticsearch { hosts => [es_cluster_b_host] } }
    ```



    2.2 Output Isolator Pattern 输出隔离模式


    虽然 Logstash 的一个 pipeline 可以配置多个 output,但是这多个 output 会相依为命,一旦某一个 output 出问题,会导致另一个 output 也无法接收新数据。而通过这种模式可以完美解决这个问题。其运行方式如下图所示:

    ![](https://ws1.sinaimg.cn/large/6 ... 8p.jpg)

    通过输出到两个独立的 pipeline,解除相互之间的影响,比如 http service 出问题的时候,es 依然可以正常接收数据,而且两个 pipeline 可以配置独立的队列来保障数据的完备性,其配置如下所示:

    ```yaml

    config/pipelines.yml

  • pipeline.id: intake
    queue.type: persisted
    config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [es, http] } }
  • pipeline.id: buffered-es
    queue.type: persisted
    config.string: |
    input { pipeline { address => es } }
    output { elasticsearch { } }
  • pipeline.id: buffered-http
    queue.type: persisted
    config.string: |
    input { pipeline { address => http } }
    output { http { } }
    ```

    2.3 Forked Path Pattern 克隆路径模式


    这个模式类似 Output Isolator Pattern,只是在不同的 output pipeline 中可以配置不同的 filter 来完成各自输出的数据处理需求,这里就不展开讲了,可以参考如下的配置,其中不同 output pipeline 的 filter 是不同的,比如 partner 这个 pipeline 去掉了一些敏感数据:

    ```

    config/pipelines.yml

  • pipeline.id: intake
    queue.type: persisted
    config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => ["internal-es", "partner-s3"] } }
  • pipeline.id: buffered-es
    queue.type: persisted
    config.string: |
    input { pipeline { address => "internal-es" } }

    Index the full event

    output { elasticsearch { } }

  • pipeline.id: partner
    queue.type: persisted
    config.string: |
    input { pipeline { address => "partner-s3" } }
    filter {

    Remove the sensitive data

    mutate { remove_field => 'sensitive-data' }
    }
    output { s3 { } } # Output to partner's bucket
    ```

    2.4 Collector Pattern 收集者模式


    从名字可以看出,该模式是将所有 Pipeline 汇集于一处的处理模式,如下图所示:

    ![](https://ws1.sinaimg.cn/large/6 ... 1y.jpg)

    其配置参考如下:

    ```

    config/pipelines.yml

  • pipeline.id: beats
    config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [commonOut] } }
  • pipeline.id: kafka
    config.string: |
    input { kafka { ... } }
    output { pipeline { send_to => [commonOut] } }
  • pipeline.id: partner

    This common pipeline enforces the same logic whether data comes from Kafka or Beats

    config.string: |
    input { pipeline { address => commonOut } }
    filter {

    Always remove sensitive data from all input sources

    mutate { remove_field => 'sensitive-data' }
    }
    output { elasticsearch { } }
    ```



    3. 总结


    本文简单给大家讲解了 Pipeline-to-Pipeline的使用方法及官方推荐的几种模式,希望可以给大家有所帮助。另外这个机制目前还处于 Beta 阶段,尝鲜需谨慎!

Day 5 - Elasticsearch 存储设备全解析

cyberdak 发表了文章 • 0 个评论 • 264 次浏览 • 5 天前 • 来自相关话题

day5 - es存储设备全解析

Elastic Search 作为一个分布式系统,它的最小单元(shard)实现基于 lucene , lucene是一个io密集cpu密集的系统。cpu密集可以通过使用更多核,更快的cpu以及优化算法来解决。而io密集部分需要搭配高性能的存储设备以及存储策略来解决。

传统的服务器硬盘分为SATA,SAS硬盘以及现在最高性能的SSD硬盘,其中SSD硬盘又分为 SATA SSD,PCI-E SSD ,M.2 SSD(性能依次提升)。

两者的区别在于 SATA 最高可以提供 7200转的。著名的HADOOP集群中,一半都会选择企业级SATA盘来降低存储成本。而SATA盘容易损坏以及恢复速度的问题,则交给10g高速网卡以及三副本策略来解决。

如果是了解数据库领域的同学就会知道,MySQL 之类的数据库严重推荐使用SSD来做存储。TiDB这种新时代的分布式数据库甚至在安装过程中会见存储是否是高性能设备,当时低速设备时,安装将失败。

如何查看io压力


iostat -x 1 100

可以根据 iowait , ioutil 等值来综合判断. 当iowait长期接近100%基本代表io系统出现瓶颈了。这时候可以用iotop命令来诊断出具体是什么进程在消耗io资源。


如何测试硬盘性能


通过 fio 测试 顺序读/写,随机读/写性能。

顺序读
fio -name iops -rw=read -bs=4k -runtime=60 -iodepth 32 -filename /dev/sda -ioengine libaio -direct=1
随机读
fio -name iops -rw=randread -bs=4k -runtime=60 -iodepth 32 -filename /dev/sda -ioengine libaio -direct=1
顺序写
fio -name iops -rw=write -bs=4k -runtime=60 -iodepth 32 -filename /dev/sda -ioengine libaio -direct=1
随机写
fio -name iops -rw=randwrite -bs=4k -runtime=60 -iodepth 32 -filename /dev/sda -ioengine libaio -direct=1

更具体的测试可以参考[磁盘性能指标--IOPS、吞吐量及测试](http://blog.51cto.com/wushank/1708168)

RAID


RAID 0


将数据分布在N块盘中,速度最快,可以享受磁盘的并行读取和写入;安全性最低,一块盘损坏,将导致所有数据丢失。

![raid0.png](https://i.loli.net/2018/12/04/5c0673f515db4.png)

RAID 1


将数据同时保存在N块盘中,写入速度最慢(需要同时写多块盘)。安全性最高。

![raid1.png](https://i.loli.net/2018/12/04/5c0673f411bdd.png)


RAID 10 💗


将RAID 1 和 RAID 0 结合起来,获得高安全性和高性能。最常用的RAID策略。同时也是TiDB,MySQL等数据库推荐的RAID策略。

![raid10.png](https://i.loli.net/2018/12/04/5c0673f475d50.png)

RAID 5


RAID 5 最低三块盘,存储数据的异或编码,在一块盘损坏时,可以提供编码恢复出数据。

![raid5.png](https://i.loli.net/2018/12/04/5c0673f454d91.png)

ElasticSearch 使用低速设备的 Tips


修改index.merge.scheduler.max_thread_count参数为1;该参数影响lucene后台的合并线程数量,默认设置只适合SDD。多个合并线程可能导致io压力过大,触发 (linux 120s timeout)[https://cyberdak.github.io/es/ ... -down].


存储策略


  1. 避免单机存储过多数据,如果单机故障,将导致集群需要大量数据,影响集群的吞吐量,特别是发生在高峰时候更会影响业务。千兆网卡每小时可以同步的数据为463gb,可以参考这个速度结合资深集群网卡以及存储来调节每个节点存储的数据量。
  2. 存储有条件使用RAID10,增加单节点性能以及避免单节点存储故障

    RAID卡策略


    根据服务器RAID卡的等级不同,高级的RAID卡可以使用 write-back 写策略,数据写入会直接写入到缓存中,随后刷新到硬盘上。当主机掉电时,由RAID卡带的电池来保证数据成功写入到硬盘中。write back的设置需要电池有电才能支持,而某些场景可以设置为force write-back(即使电池没电了,也要写缓存),从而提高写入性能。

Day 4 - PB级规模数据的Elasticsearch分库分表实践

ouyangchucai 发表了文章 • 0 个评论 • 591 次浏览 • 6 天前 • 来自相关话题

从2018年7月在开始在某阿里云数据中心部署Elasticsearch软件,到2018年12月共创建了15个集群,服务于客户的文档检索、交通视频检索、地理信息检索、日志安全审计等业务。其中数据规模最大的一个业务,共有800张表,7万亿条数据,每天新增500亿条记录,数据要求存储半年,单条记录大小1KB左右,存储规模约10PB,需要支持1000并发查询。



一、数据存储空间规划。

数据中心能用于搭建Elasticsearch集群的SSD盘共700TB,SATA盘共50PB。根据业务类型、时间范围划分热数据和冷数据,一部分重要数据存储在SSD盘的热数据集群,其它数据存储在SATA盘的冷数据集群。热数据集群主要存储各类实体信息,包括人员、物品、事件、地址、组织数据,以及最新轨迹数据。冷数据集群主要存储历史轨迹信息。热数据和冷数据按照业务拆分多个小集群,每个集群规模保持在50个节点左右,单个集群最大不超过200个节点。利用阿里云平台弹性伸缩的能力,每个Elasticsearch集群可以先从小规模创建,根据资源使用情况来弹性扩展节点规模。

Elasticsearch集群节点配置

pb001.jpg





二、索引设计。

1.索引别名(alias)。每类数据根据数据源表名建立索引(index),索引中只包含一个类型(type)。配置索引别名(alias),业务上根据别名写入、查询数据,索引重建等数据维护操作可以通过别名切换对业务透明。

2.按时间分表。轨迹类数据按时间(日/月)拆分,每个索引存储数据量保持在1TB(10亿)左右,索引名带上日期/月份后缀,拆分后的索引配置别名区分冷热数据。配置索引模板,指定索引分片数和副本数、字段类型、分词器。配置Linux crontab定时任务,通过shell脚本创建索引。

3.分片(shard)设置。索引按照单个分片10-40GB数据大小设计分片数,数据量少于10GB(1000万)的索引设置1个分片即可,数据量大于1TB(10亿)的索引设置分片数为集群节点数整数倍(例如50个节点的集群配置50个分片)。

4.副本(replica)设置。数据首次批量导入时索引副本数设置为0,快速写入数据。生产环境索引副本数设置为1,避免集群节点故障数据丢失。



三、索引mapping设计。

1.精心设计索引字段类型。在开发环境配置Elasticsearch允许自动创建索引,从数据源每张表取1000条记录批量写入Elasticsearch,自动创建索引mapping,然后再根据业务需要修改mapping配置合适的字段类型,指定字段索引分词器、是否存储、是否索引、是否合并至全文检索字段。 对于数据量大的表尤其要精心设计字段类型,尽量减少索引存储空间占用。在生产环境中建议配置不允许自动创建索引。

2.配置全文检索字段。如果业务需要全文检索,可以配置开启全文字段,同时需要占用更多存储空间;如果业务上只是按字段查询,可以配置禁用全文字段,减少存储空间。Elasticsearch5.X及之前的版本默认启用_all字段,合并所有字段的值。Elasticsearch6.X及之后的版本默认禁用_all字段,可以通过copy_to将多个字段值合并成一个全文字段。对于数据查全率要求高的业务场景,建议对全文字段配置cjk分词器(Elasticsearch和Lucene中自带,对中日韩文进行二元分词的分词器)。

3.通用字段统一命名。各个索引中的姓名、证件号码、时间(开始时间、结束时间)、地点(始发地、目的地)等常用字段统一命名。用户指定证件号、时间范围等精确字段查询条件时,可以使用统一的查询条件并行查询多个索引。



四、分词设置。

1.选择合适的分词器。Elasticsearch中内置了很多分词器:standard、cjk、nGram等,也可以安装ik、pinyin等开源分词器, 可以根据业务场景选择合适的分词器。
常用分词器:
standard:Elasticsearch默认分词,英文按空格切分,中文按单个汉字切分。
cjk:根据二元索引(两个相邻的字作为一个词条)对中日韩文分词,可以保证查全率。
NGram:可以将英文按照字母切分,结合Elasticsearch的短语搜索(match_phrase)使用。
ik:比较热门的中文分词,能按照中文语义切分,可以自定义词典。
pinyin:可以让用户输入拼音,就能查找到相关的关键词。
对于查全率要求较高的场景,建议使用cjk分词,同时能支持比较快的响应速度。对于查准率要求较高的场景,建议使用ik分词。

CJK分词和IK分词对比(测试环境:Elasticsearch5.5.3,8335万条人员档案信息,10节点集群,单节点16核CPU、64G内存、2T SSD盘,1个线程批量写入,1个并发查询)

pb002.jpg



测试分词效果:
curl -XPOST "<a href="http://localhost:9200/_analyze"" rel="nofollow" target="_blank">http://localhost:9200/_analyze" -H 'Content-Type: application/json' -d'
{
"analyzer": "ik_max_word",
"text": "南京市长江大桥"
}'

2.NGram分词。对于像车牌号之类数字和字母连在一起的字符,默认会被切成一个完整词条,但是业务上又需要支持前缀、后缀模糊匹配,可以根据业务需求进行分词。车牌号建议增加一个分词字段,配置NGram分词器,切分1元至7元的组合。身份证号码建议增加分词字段,根据业务需要切分18位完整词条、前2位(省)、前4位(省市)、前6位(省市区县)、后4位、出生年月日、出生年份、出生年月、出生月日等组合。

3.单字分词。对于像姓名类字段,业务上需要支持完整匹配,又需要支持单字查询。可以配置1个keyword字段(不分词);1个text字段(分词),分词器选择Elasticsearch默认分词器standard,按单个汉字切分。




五、数据写入策略。

1.批量离线数据导入。各类业务数据源主要在数据仓库MaxCompute(原ODPS),为了把表数据从MaxCompute表导入到ElasticSearch集群中, 我们基于MaxCompute MapReduce开发了MaxCompute到ElasticSearch的数据导出作业,通过简单的配置就可以把数据导入到ElasticSearch中。
数据源在关系数据库RDS或者NoSQL的数据,可以通过配置DataWorks(dataX企业版)导入Elasticsearch集群。

2.实时数据导入。实时数据源主要是流式数据服务DataHub,
配置DataHub任务即可同步至Elasticsearch集群。也可以自己开发程序调用DataHub的SDK获取实时数据,经过业务处理后,调用ES Rest Client SDK批量写入Elasticsearch。

3.冷热数据自动迁移。轨迹类实时数据默认先写入热数据集群(SSD盘Elasticsearch集群),对于热数据集群过期的索引(例如1个月前的索引)需要迁移到冷数据集群(SATA盘Elasticsearch)。为了实现数据跨集群迁移,我们开发了snapshot插件将索引备份到对象存储服务OSS或分布式文件系统盘古。配置定时任务,将热数据集群索引备份后,从冷数据集群恢复,然后再删除热集群中的过期索引,保持热数据集群只存储较小规模数据。冷数据集群的索引如果超过半年,则关闭索引,减少JVM堆内存占用。

4.配置索引主键字段。为了保证Elasticsearch集群和数据源记录的一致性,建议所有索引配置主键字段,而不是让Elasticsearch自动生成主键。配置数据业务主键字段作为Elasticsearch主键字段。如果没有主键字段,则将原始数据能确定记录惟一性的几个字段合并为主键,或者将所有字段值合并起来计算MD5值作为主键。

5.配置写入路由。如果业务上需要经常根据某个字段查询,例如用户ID、车牌号等的字段,写入时可以指定路由字段。

6.写入参数调优。调整数据写入任务参数,避免写入操作占用过多磁盘IO和CPU。使用批量请求,配置合理的写入线程数,调大索引刷新时间间隔refresh interval,调整事务日志translog同步策略。




六、数据查询策略。

1.冷热库异步查询。用户输入关键词查询时,优先从热数据集群查询,有结果立即返回,并估算命中记录条数。热数据集群命中结果集不足时,再查询冷数据集群。

2.跨集群搜索。业务上需要多个Elasticsearch集群一起参与检索时,可以通过Cross Cluster Search同时对多个集群发起检索请求合并检索结果。单独创建一个5节点的Cross Cluster,设置远程集群节点信息,用于跨集群搜索,不存储业务数据。

3.快速返回和超时设置。查询请求中设置参数teminate_after指定每个分片(shard)最多匹配N条记录后返回(例如10000),设置查询超时时间timeout(例如10s),避免查询一些宽泛的条件时耗费过多系统资源。

4.查询语法解析。解析用户查询条件,识别用户的查询类型,例如用户输入车牌号、证件号、年龄段等条件时,查询条件改写为字段精确匹配,无法识别的查询条件默认从全文字段匹配。

5.查询条件调优。查询结果不需要相关度排序时使用过滤器(filter),尽量使用路由(routing),设置较少的查询读取记录条数和字段,避免前缀模糊匹配,设置search_after规避深度翻页性能问题。



七、数据写入、查询性能测试。

SSD盘集群写入性能测试(测试环境:Elasticsearch6.3.2集群,单节点16核CPU、64G内存、2T SSD盘,写入10亿条记录,单条记录1KB,副本数为0,1台写入服务器):

pb003.jpg



SSD盘集群查询性能测试

pb004.jpg



SATA盘集群写入性能测试(测试环境:Elasticsearch5.5.3集群,单节点56核CPU、128G内存、12块 6T SATA盘,分别写入1亿、3亿、5亿、30亿、300亿条记录,单条记录1KB,0副本,50台写入服务器):

pb005.jpg



SATA盘集群查询性能测试

pb006.jpg




参考文档:

  1. 阿里云Elasticsearch帮助文档 https://help.aliyun.com/product/57736.html
  2. Elasticsearch参考
    https://www.elastic.co/guide/e ... .html
  3. 《Elasticsearch: 权威指南》
    https://www.elastic.co/guide/c ... .html
  4. 《深入理解Elasticsearch》https://detail.tmall.com/item.htm?id=551001166567
  5. 《死磕Elasticsearch方法论》https://blog.csdn.net/laoyang3 ... 93493
  6. Elasticsearch索引别名和零停机
    https://www.elastic.co/guide/c ... .html
  7. Elasticsearch自动按天创建索引脚本
    https://blog.csdn.net/reblue52 ... 53317
  8. Elasticsearch NGram分词器
    https://www.elastic.co/guide/e ... .html
  9. Elasticsearch开源权限管理认证插件Search Guard
    https://github.com/floragunncom/search-guard
  10. Elasticsearch开源可视化管理插件cerebro
    https://github.com/lmenezes/cerebro
  11. Elasticsearch开源SQL插件 https://github.com/NLPchina/elasticsearch-sql
  12. Elasticsearch快照及恢复 https://help.aliyun.com/document_detail/65675.html

    Elasticsearch技术交流钉钉群
    dingdingpng.png






Day 3 - kibana二次开发tips

vvv 发表了文章 • 1 个评论 • 240 次浏览 • 2018-12-03 09:11 • 来自相关话题

介绍


大家好,我是vvv,这个名字和王者荣耀AG超玩会中的vv没有一毛钱关系,随意取的一个的名字,后来发现貌似不能改了。做过一些数据产品,正是这段时间里开始接触elasticstack,对kibana做过一些二次开发。今天主要想写一些开发过程中的一些tips,希望可以给大家带来一些帮助。


技术栈分析


既然我们的主题是kibana,我们先来看下kibana的主要技术栈。很早开始kibana就开始基于nodejs (hapi框架) + angular(1.x)来进行开发,后来引入了react。kibana本身的代码也经过了多次重构剥离。现在的kibana的代码结构更加清晰


前提

elasticstack发展迅速,现在已经是6.5版本了。我们今天要介绍的是6.x系列的版本,6.x各个版本会有一些细微差异,但大致一样


tips


官方提供kibana下载版本主要是编译后的release版本。如果要基于kibana做二次开发,我们需要去https://github.com/elastic/kibana 上面下载对应的分支。官方有相应的文档去说明如何安装开发环境。我这里有一些tips:

设置国内yarn源

<br /> yarn config --global set 'registry <a href="https://registry.npm.taobao.or" rel="nofollow" target="_blank">https://registry.npm.taobao.or</a>g'<br />

一些耗时需要编译的包可以全局安装

<br /> yarn global add node-sass<br />

多环境nodejs版本

<br /> 不同kibana版本对nodejs版本要求也不一样,为了减少坑我们通常和官方要求的保持一致,如果你的电脑上需要运行多套不同版本的nodejs,那么你可能需要zsh + nvs, 会根据根目录的.node-version版本自动切换当前使用的node版本<br />

IDE推荐

<br /> 推荐使用vscode,轻量免费,支持很多插件。可以安装个prettier插件,帮助对代码做更好的格式化<br />

debug

如果你用的不是上面我推荐的vscode的话,请忽略这一条。对于使用vscode的同学,首先在vsocde的设置里面开启:

<br /> "autoAttach": "on"<br />

然后在vsocode里面打开一个终端,输入:

<br /> node --inspect-brk scripts/kibana.js --dev --oss --no-base-path<br />
这个时候vscode就会在启动kibana dev模式的同时attach一个进程进去用于断点调试,对于调试node层非常方便。也能帮助你更好的阅读kibana源码

本地es

我们知道kibana是长在es之上,想要运行kibana怎么少得了es。kibana又一些命令命令可以快速的启动一个es环境:

下载并启动一个当前kibana需要的es版本
<br /> yarn es snapshot<br />

灌入一些测试数据(如果需要定制灌入的数据可以看下这个脚本的帮助内容,加-h参数即可)
<br /> node scripts/makelogs<br />

编译

kibana代码在release之前是要进行编译的。kibana提供了方便的命令行进行编译,甚至跨平台的交叉编译(生成不同平台的kibana release版本)。但是呢,有几个问题:

  1. kibana在编译的时候需要去aws上下载一些安装包,会导致正常情况下国内访问十分缓慢。(编译命令提供了几个参数可以关掉下载一些如nodejs等,但是还是很慢)
  2. build十分消耗cpu/gpu (mac的iterm2启动会做gpu优化)

    解决办法:

  3. 如果你能解决网络问题,而且有性能不错的编译机器。这都不是问题
  4. 如果你对kibana的代码更改都是无侵入的(比如只是写了一些插件扩展),那么你可以去官方下载他们的snapshot版本
  5. 当然,如果你用的kibana版本就是release版本并且你的扩展都是插件,那么你直接用官方的release版本就好了

    库的选型

  6. server端:
    nodejs具有十分丰富的生态,你可以找到很多nodejs相关的库。kibana本身的后端web框架是基于node的hapi的。hapi是一个沃尔玛团队维护的企业级框架,其本身也有很多扩展。当你需要对web框架做一些扩展的时候,可以优先想到去hapi官方看下

  7. ui端:
    kibana有一套漂亮的ui,其本身也是单独剥离成了一个库,方便引入使用。当然你也可以引入一些其他的前端库来满足你的具体业务需求。推荐尽量使用原生的eui和kibana源码里面的一些封装。这样让你的引入更少,更容易维护。


    国际化

    国际化是kibana很早就开始立的一个项。这块的进度也是越来越快。新版的kibana里面用@kbn/i18n这个package来统一javascript,html, nodejs做国际化的事情(具体大家可以看下这个package的readme)。国际化这块有一些建议:
  8. 扩展插件的时候养成国际化的习惯
  9. 默认的语系不建议再次设置成一个json文件。因为最新的@kbn/i18n会提供一个默认的文本,用于默认情况下展示。所以我们是没必要重复去维护一个默认的语言翻译json
  10. 假设你的默认语言是英文(和kibana一致),只有当你想要替换kibana默认翻译的时候,才去覆写en.json
  11. 当你对原生kibana有国际化这种需求的时候,建议独立出一个i18n翻译的插件去维护各个语言翻译相关的东西
  12. 目前kibana的国际化还未100%,如果你想知道目前哪些文本内容是支持国际化的。可以尝试如下脚本:

    <br /> node scripts/extract_default_translations \<br /> --path src/core_plugins/kibana \<br /> --output /tmp<br />
  13. 各个插件的之间的翻译文件独立,即使是相同的翻译内容。插件文本内容养成预留国际化的习惯



    总结


    上面列举了一些我平时的一些经验。时间篇幅有限,未能一一列举。希望可以帮到大家吧。也希望可以和大家多多交流

Day 2 - ES 6.x拼音分词高亮爬坑记

abia 发表了文章 • 8 个评论 • 278 次浏览 • 2018-12-02 16:29 • 来自相关话题

大家好,我是来自尚德机构ES平台的负责人,白凡,今天为大家分享一些在6.x版本中拼音分词高亮问题爬坑的心路历程~,其实问题不复杂,主要介绍下思路。

首先简单讲下背景~可能在很多公司的很多部门,都有使用到ES。包括在尚德,由于很多部门的业务都涉及到ES,于是我们为了统一管理及维护,专门成立了ES平台部门,主要扮演的是类似于op dba角色,帮助业务部门部署维护ES集群,并根据业务需求提供解决方案。当然,除此之外,我们也会在公司内部推荐业务方去尝试除了日志和搜索以外的应用场景,比如分布式计算存储、监控、安全等方面。毕竟ES相比于其他组建,搭建部署更加方便,更轻量级,查询方式更丰富。所以,现如今在尚德机构,ES平台不仅用于了传统的日志和搜索领域,也在分布式数据存储和计算方面有很多应用。当然,这里只是为大家提供一些ES应用场景及其团队构建的思路。主要还是ES这个工具确实好用。

广告先做到这,回到正文。所以,前段日子,我们接收了一个新的业务部门需求,大致是:他们之前使用的自己搭建ES 2.x集群,现在接入到我们6.x的平台上来。我们帮忙设计了mapping,数据写入及同步方案之后,数据就慢慢接入进来。但问题也随即出现,原来在2.x上使用正常的拼音高亮mapping,在6.x上只能检索但无法高亮了?

2.x field如下:
"index" : {
"analysis" : {
"analyzer" : {
"pinyin_analyzer" : {
"tokenizer" : "my_pinyin"
}
},
"tokenizer" : {
"my_pinyin" : {
"type" : "pinyin",
"keep_full_pinyin" : false,
"limit_first_letter_length" : 16,
"lowercase" : true,
"remove_duplicated_term":true,
"keep_first_letter":true,
"keep_separate_first_letter" :true
}
}
}
}
POST /medcl/doc/_mapping
{
"properties": {
"name":{
"analyzer": "pinyin_analyzer",
"type": "string"
}
}
}

可以从上面例子看出,这个analyzer并没有问题,但是在搜索时,能得到结果,却无法高亮,即分词结果中start_offset及end_offset为0,这个如何解决呢?

回到medcl的拼音分词项目:
https://github.com/medcl/elast ... inyin
其中,有个配置项引起了我们的注意:

图片1.png


没跑了,应该是要将这个参数设置为false。
并且查看了源码,在PinyinTokenizer这个类下面,看到了这一行:

图片2.png


确定了我们的思路,于是乎,在tokenizer中将此参数设为false,如下:
"tokenizer" : {
"my_pinyin" : {
"type" : "pinyin",
"keep_full_pinyin" : true,
"keep_original" : false,
"limit_first_letter_length" : 16,
"lowercase" : true,
"remove_duplicated_term":true,
"ignore_pinyin_offset": false,
"keep_first_letter":true,
"keep_separate_first_letter" :true
}
}

写入一条数据,高亮没问题了,问题“看似”解决了。
当然,没有那么简单。因为在批量写入一部分数据后,总会报如下异常:
startOffset must be non-negative, and endOffset must be >= startOffset
这个异常,导致数据无法写入集群。
这里又是为什么呢?
这个问题,我也搞了一段时间,始终没找到很好的解决方案,此处只能先@medcl。
只是猜测在end()或者reset()方法内,需要lastOffset置0或者offsetAtt清空。但尝试了几次,依然报错。。。

这就比较头疼了,不过好在条条道路通罗马。在某次蹲坑过程中,灵感如尿崩。

如果Tokenizer解决不了,为何不仅用filter就行了呢?可以先用其他分词器,按我们业务的需求进行分词,再用filter,将分词过滤为拼音呢?

大致思路如下:
目前我们这个业务,需要如对于“尚德机构”这个词,搜索“shang”,“shangde”,“deji”时,能返回结果并高亮。
所以我们先用ngram分词,将“尚德机构”这个词分为“尚”,“尚德”,“徳机”,“德机构”等等。。
再用pinyin filter将各分词过滤为拼音,即“shang”,“shangde”,“deji”等。
并在搜索时,采用standard分词。
Mapping如下:
{
"settings": {
"analysis": {
"analyzer": {
"pinyin_analyzer": {
"tokenizer": "my_ngram",
"filter": [
"pinyin_filter"
]
}
},
"tokenizer": {
"my_ngram": {
"type": "ngram",
"min_gram": 1,
"max_gram": 50,
"token_chars": [
"letter",
"digit",
"punctuation",
"symbol"
]
}
},
"filter": {
"pinyin_filter": {
"type": "pinyin",
"keep_full_pinyin": false,
"keep_joined_full_pinyin": true,
"keep_none_chinese_in_joined_full_pinyin": true,
"none_chinese_pinyin_tokenize": false,
"remove_duplicated_term": true
}
}
}
},
"mappings": {
"abia321": {
"properties": {
"name": {
"type": "text",
"analyzer": "pinyin_analyzer",
"search_analyzer": "standard",
"term_vector": "with_positions_offsets"
}
}
}
}
}
最后,高亮问题解决,数据写入问题同样解决。
当然有朋友肯定还会需要搜索拼音首字母进行搜索,如搜“s”,“sd”,“dj”,也返回结果。
其实,只需要再专门设置个field,并调整pinyin filter参数,
搜索时用bool查询,逻辑should查询,同时对完整拼音field和拼音首字母field进行搜索即可。
在此就不做过多赘述。

当然,这里仅仅只是提供一种ES在选择analyzer,tokenizer,filter解决需求的思路。拼音分词这个问题,还是需要等待后续修复

最后,这里有较为完整的issue:
https://github.com/medcl/elast ... s/169

Day 1 - ELK 使用小技巧(第 3 期)

rochy 发表了文章 • 1 个评论 • 495 次浏览 • 2018-12-01 02:28 • 来自相关话题

ELK Tips 主要介绍一些 ELK 使用过程中的小技巧,内容主要来源为 Elastic 中文社区。

一、Logstash

1、Filebeat 设置多个 output

在 6.0 之前,Filebeat 可以设置多个输出(必须是不同类型的输出);从 6.0 开始已经禁止多输出了,只能拥有一个输出,如果想实现多输出,可以借助 logstash 等中间组件进行输出分发。


二、Elasticsearch

1、ES 用户占用的内存大于为 ES 设置的 heapsize

ES 是 Java 应用,底层存储引擎是基于 Lucene 的,heapsize 设置的是 Java 应用的内存;而 Lucene 建立倒排索引(Inverted Index)是先在内存里生成,然后定期以段文件(segment file)的形式刷到磁盘的,因此 Lucene 也会占用一部分内存。

https://elasticsearch.cn/article/32

2、ES 使用别名插入数据

ES 可以通过索引的方式向索引插入数据,但是同时只能有一个索引可以被写入,而且需要手动设置,未设置的情况下会报错:no write index is defined for alias [xxxx], The write index may be explicitly disabled using is_write_index=false or the alias points to multiple indices without one being designated as a write index。
<br /> POST /_aliases<br /> {<br /> "actions" : [<br /> {<br /> "add" : {<br /> "index" : "test",<br /> "alias" : "alias1",<br /> "is_write_index" : true<br /> }<br /> }<br /> ]<br /> }<br />

3、ES 设置 G1 垃圾回收

修改 jvm.options文件,将下面几行:
<br /> -XX:+UseConcMarkSweepGC<br /> -XX:CMSInitiatingOccupancyFraction=75<br /> -XX:+UseCMSInitiatingOccupancyOnly<br />
改为
<br /> -XX:+UseG1GC<br /> -XX:MaxGCPauseMillis=50<br />
即可。

其中 -XX:MaxGCPauseMillis 是控制预期的最高 GC 时长,默认值为 200ms,如果线上业务特性对于 GC 停顿非常敏感,可以适当设置低一些。但是这个值如果设置过小,可能会带来比较高的 cpu 消耗。

4、ES 和 Zipkin 集成时设置验证信息

<br /> java -DKAFKA_ZOOKEEPER=10.14.123.117:2181 <br /> -DSTORAGE_TYPE=elasticsearch <br /> -DES_HOSTS=http://10.14.125.5:9200 <br /> ES_USERNAME=xxx ES_PASSWORD=xxx <br /> -jar zipkin.jar<br />

5、ES 集群部署报错

问题 1 报错信息如下:
<br /> Received message from unsupported version:[2.0.0] minimal compatible version is:[5.6.0]<br />
经排查是集群中存在低版本的 ES 实例,将低版本实例移除即可。

问题 2 报错信息如下:
<br /> with the same id but is a different node instance<br />
删除对应节点 elsticsearch 文件夹下的 data 文件夹下的节点数据即可。

6、海量中文分词插件

海量分词是天津海量信息技术股份有限公司自主研发的中文分词核心,经测试分词效果还是不错的,值得一试。

https://github.com/HylandaOpen ... hlseg

7、查询一个索引下的所有 type 名

通过下面的 API,即可获取全部的 type,下面的例子中 doc 就是 indexName 索引下的一个 type:
<br /> GET <a href="http://es127.0.0.1:9200/indexName/_mappings" rel="nofollow" target="_blank">http://es127.0.0.1:9200/indexName/_mappings</a><br /> -----------------------------------------------<br /> {<br /> indexName: - {<br /> mappings: - {<br /> doc: - {<br /> _all: + {... },<br /> dynamic_date_formats: + [... ],<br /> dynamic_templates: + [... ],<br /> properties: + {... }<br /> }<br /> }<br /> }<br /> }<br />

8、索引模板中根据字段值设置别名

设置索引模板的时候,别名可以使用 Query 条件来进行匹配。
<br /> PUT _template/template_1<br /> {<br /> "index_patterns" : ["te*"],<br /> "settings" : {<br /> "number_of_shards" : 1<br /> },<br /> "aliases" : {<br /> "alias2" : {<br /> "filter" : {<br /> "term" : {"user" : "kimchy" }<br /> },<br /> "routing" : "kimchy"<br /> },<br /> "{index}-alias" : {} <br /> }<br /> }<br />

9、索引模板设置默认时间匹配格式

ES 默认是不会将 yyyy-MM-dd HH:mm:ss 识别为时间的,可以通过在索引模板进行如下设置实现多种时间格式的识别:
<br /> "mappings": {<br /> "doc": {<br /> "dynamic_date_formats": ["yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis"],<br />

10、ES 中 Merge 相关设置

Merge 是非常耗费 CPU 的操作;而且如果不是 SSD 的话,推荐将 index.merge.scheduler.max_thread_count 设置为 1;否则 ES 会启动 Math.min(3, Runtime.getRuntime().availableProcessors() / 2) 个线程进行 Merge 操作;这样大部分机械硬盘的磁盘 IO 都很难承受,就可能出现阻塞。
<br /> "index": {<br /> "refresh_interval": "5s",<br /> "number_of_shards": "3",<br /> "max_result_window": 10000,<br /> "translog": {<br /> "flush_threshold_size": "500mb",<br /> "sync_interval": "30s",<br /> "durability": "async"<br /> },<br /> "merge": {<br /> "scheduler": {<br /> "max_merge_count": "100",<br /> "max_thread_count": "1"<br /> }<br /> },<br />

11、mapping 中 enabled store index 参数

  • enabled:默认是true,只用于 mapping 中的 object 字段类型;当设置为 false 时,其作用是使 es 不去解析该字段,并且该字段不能被查询和 store,只有在 source 中才能看到,设置 enabled 为 false,可以不设置字段类型,默认类型为 object;
  • store:默认 false,store 参数的功能和 source 有一些相似,我们的数据默认都会在 source 中存在,但我们也可以将数据 store 起来;当我们使用 copy_to 参数时,copy_to 的目标字段并不会在 source 中存储,此时 store 就派上用场了;
  • index:默认是 true,当设置为 false,表明该字段不能被查询,如果查询会报错。

    12、ES 图片搜索

  • 可以借助局部敏感 LSH 或者 pHash 来实现:https://stackoverflow.com/questions/32785803
  • Github 也有一个开源项目使用了多种 Hash 算法借助 ES 来实现图片搜索:https://github.com/usc-isi-i2/ ... tures

    13、Term 聚合根据子聚合结果排序

    <br /> GET /_search<br /> {<br /> "aggs" : {<br /> "genres" : {<br /> "terms" : {<br /> "field" : "genre",<br /> "order" : { "playback_stats.max" : "desc" }<br /> },<br /> "aggs" : {<br /> "playback_stats" : { "stats" : { "field" : "play_count" } }<br /> }<br /> }<br /> }<br /> }<br />

    三、社区文章精选

  • [ET007 ElasticStack 6.5 介绍](https://elasticsearch.cn/article/6144)
  • [CentOS 7.4 下安装 ES 6.5.1 搜索集群](https://elasticsearch.cn/article/6152)
  • [Elastic Stack v6.5 新特性解读](https://elasticsearch.cn/article/6156)
  • [Elasticsearch 史上最全最常用工具清单](https://mp.weixin.qq.com/s/s2ema4tIXKcqTNUUhjGt1w)

2018 年 Elastic Advent Calendar 分享活动开启了🎤

medcl 发表了文章 • 35 个评论 • 1693 次浏览 • 2018-11-20 22:33 • 来自相关话题

活动规则很简单:
 
活动创意来自于圣诞节倒计时,从12月1号开始到12月24日结束。
 
每天固定一篇文章分享,内容长短都可。
 
报名现在开始,留言报名即可,留言格式: Day[日期] -  你的分享标题。

一共24篇,报满即止。
 
虽然是西方的节日,不过目的是为了大家一起分享,重在参与嘛。
 
往期活动可参考:https://elasticsearch.cn/topic/advent
 
活动参与名单:

 
 
 如何发布?
自己选择发布文章,按你的标题在12月你的这一天发布出来就好了。

1543474748734.jpg


 

Day 14: Elasticsearch 5 入坑指南

kennywu76 发表了文章 • 33 个评论 • 19093 次浏览 • 2016-12-15 13:16 • 来自相关话题

尝鲜

10月26日,Elasticsearch5.0.0 GA终于放出,携程ES Ops团队也在第一时间在DEV和UAT环境分别进行了2.4.0 至5.0.0的升级和测试。升级完成后,除了部分Query不向前兼容(主要是Filtered Query),需要在应用端做一些修改以外,未发现其他问题。通过监控系统看对比升级前后的主要系统指标,在同等索引量的情况下,CPU使用率有明显下降 ( 30% - 50%左右) ,相信性能方面5.0应该是有较大提升的。 

在测试环境稳定运行了2周以后,我们决定选定一个生产集群进行升级,考验新版本在更为复杂的用户环境下的表现。 出于对业务影响最小化的考虑,用于日志分析的集群被圈定为升级目标。该集群也是携程十几个集群中规模最大的一个,共有120个数据结点运行于70台物理机上,总数据量接近1PB。

升级前需要做一些准备工作,下载官方的Migration Helper插件,检查集群设置和索引的兼容性。对于不兼容的配置项,MH会详尽列出,其中标注为红色部分为为升级前必须修改项。1.x版本创建的索引,是无法直接升级到5的,需要先在2.x集群里做一次reindex 。 MH提供了不兼容索引扫描功能,对于找到的不兼容索引,可以直接在UI上发起reindex操作,等待结束即可。 如果是用于业务搜索集群,数据可能比较重要,建议升级前做一个Snapshot,万一升级过程出现意外,可以回退版本从备份里快速恢复数据。我们的日志集群数据量极大,也没有对数据100%不丢的要求,因此升级前没有做Snapshot。 做完所有的准备工作后,预先通知所有用户集群升级的时间以及可能产生的影响,选定了周五深夜用户低峰期,开始正式升级工作。 

首先通过Ansible将新版本批量部署到所有结点并统一配置,紧接着对原有集群做了Full Stop,校验所有的ES已经停下后,开始Full Start。整个过程比较顺利,所有结点正常启动,数据恢复完成后,集群重新回到正常服务状态。

周末两天运行,未发现有任何的异样,CPU利用率也降了不少,看起来很靠谱……直到周一


踏坑

周一早上,随着用户访问量高峰来临,马上浮现出一个诡异的现象: 索引速率遇到了瓶颈,数据开始在前置的消息队列(Kafka)里堆积。 从监控数据看,尽管所有的数据结点CPU消耗都比上周同期低,磁盘IO也很低,但索引速率却低了很多。反复对比查看升级前后各类监控指标后,终于发现一个可疑点,所有结点的网络流量比升级前高了好几倍!  在集群架构上,我们是单独架设了几台client node做为数据写入和分发的入口,现在这几个node的网络流量已经饱和,成为数据写入的瓶颈。一开始,怀疑是否2.4启用了tcp压缩,而5.0取消了,但翻查官方文档后发现transport.tcp.compress在2.4和5.0里默认都是关闭的! 这时候只有两个解决办法了,要么启用tcp压缩,要么扩容client node。 先考虑了一下tcp压缩的方案,快速扒了一下ES源码,在transport.TcpTransport这个类里,sendRequest和sendResponse两个方法会根据transport.tcp.compress设置来决定发送的消息是否要经过压缩,而在messageReceived方法则会读取消息头部的状态信息,探测消息是否经过压缩以及压缩的方法,而后决定是否需要解压,以及采用的解压方式。 这样看起来,ES是允许tcp压缩和不压缩的结点之间通讯的,那么只对client node启用压缩应该就可以了。测试环境测试过后,验证了想法的可行性。于是对生产的client node开启tcp压缩,同时在数据发送端(hangout的ES output)也启用tcp压缩,重启client node后入口网络流量降到和之前2.4差不多的程度,问题得到规避。 针对这个问题在Github上提交了issue https://github.com/elastic/ela ... 21612, 但未得到官方合理的解释。

解决好这个问题,另外一个问题来了,很多执行大量历史数据搜索的用户反映出不了结果。 从监控数据看,这类查询的搜索耗时非常久,直到网关300秒超时(查询api前置的nginx代理)。我们之前对集群设置过Global Search timeout为60s,用来保护集群资源过多被超高代价的查询消耗,在2.4版本是有效果的,现在看来不起作用了。手动测试了一下,这个参数果然失效! 于是向官方报告了第2个问题:https://github.com/elastic/ela ... 21595 。 这个问题很快被官方确认为Bug,修复也很快加入到了5.0.2。 为了规避这个问题,我们只好临时修改了一下Kibana以及第三方API访问要经过的nginx proxy,默认为所有的search request加入一个超时选项。此后,问题有一些缓解,但仍然发现用户查询大范围历史数据时,部分用于存储历史数据的结点响应很慢。

我们的集群是做了冷热分离的结构的,热节点主要承担写入和存放过去24小时数据,冷结点没有写入,查询频率也低,所以为了最大化利用硬件资源,一台物理机上跑了3个实例,这样一台128GB内存的机器可以存放下近30TB的索引。查看冷结点的监控数据,看到用户查询期间磁盘的read IO非常高,直接将磁盘IO Util%撑到100%,并且可持续数小时,同时search thread pool有大量的active thread处于无法完成状态,search queue不断攀升直至饱和、开始reject。 表象上看search thread似乎一直在尝试从磁盘大量读取数据,一次search甚至可以持续几十分钟至一个小时,耗尽了所有的搜索线程,导致拒绝后续的搜索服务。 于是Github上报了第3个issue: https://github.com/elastic/ela ... 21611  这个问题找到解决办法之前,我们只能通过反复重启有问题的冷结点来缓解。 和官方讨论过程中,得知5.0在Lucene文件访问方式上有一个比较大的改动,2.4使用mmapfs读取索引文件的部分,而5.0以后改为用mmapfs读取索引文件的全部。怀疑问题和这个变动有关,尝试将所有索引文件的设置改为NIOFS后,问题迎刃而解。 搜索性能一下回到了2.4时代,再也没出现搜索线程超长时间执行的问题。之后找时间复现了这个问题,并抓取了线程栈,看到长时间执行的搜索线程一直在做Global Ordinal的构造工作。 至于为何会这样,还不清楚。 从官方给出的信息看,底层索引文件的访问模式是没有变化的,仅仅是将文件读取方式全部改成了mmapfs,理论上应该性能更好,但是看起来在我们这种一台机器跑多个ES实例,所有分配的heap为系统缓存3倍的极端用例下,大范围的数据搜索可能造成过高的磁盘读IO,集群性能指数级下降。

以上问题前后耗了4天才完全规避掉,支持团队连续熬夜后集群总算回复到平稳状态。然而好景不长,运行一段时间以后,数据结点出现疑似内存泄漏现象。结点总数据没怎么增加、甚至还有减少的情况下,heap使用率一只呈攀升趋势,Old GC无法回收内存。这个问题对用户影响较小,通过监控我们可以及时发现内存即将用尽的结点,做一次重启很快就恢复了。 为排查根源,我们对一个有问题的结点做了dump,通过MAT工具分析,看到meta data相关的一个alias对象被实例化了有6600万次之多! 在Github上提交了第四个issue: https://github.com/elastic/ela ... 22013,不多久被确认为已知问题https://github.com/elastic/ela ... 21284 ,在5.0.1已经修复。

最后还存在一个master node内存泄漏的问题,这个问题在2.4.0时代就存在了,升级到5.0.0以后依然没有修复。由于我们的master node和data node是分离的,所以这个问题比较容易通过监控发现,解决方式也很简单和迅速,重启master node即可,对用户完全无影响。之后不久,5.0.2版本正式发布,release notes里提到了对这个问题的修复 https://github.com/elastic/ela ... 21578

上周周末我们将集群rolling upgrade到了5.0.2,global search timeout失效和两个内存泄漏的问题从根源上解决掉了。 网络流量增大的问题依然存在,仍然需要通过启用client结点的transport.tcp.compress规避。 冷结点搜索性能的问题没看到有提及,估计没解决,安全起见,还是保持索引的文件系统为NIOFS。升级完成运行一段时间后,可以肯定,5.0.2已经比较稳定。


心得

升到5.0.2后,对于其中一组数据结点这两天特意加了点索引负载,通过监控数据将v5.0.2与2.4.0做实际运行环境的索引吞吐量对比。

2.4_.png


5.0_.png

 
在近似的CPU使用率和load情况下,5.0.2能够支撑更大的吞吐量。另外5.0带来的Instant aggregation功能,对于跨多个索引的时序类型数据的聚合也可以有效Cache了,在使用Kibana的时候提速感觉非常明显。

升级过程虽然遇到很多波折,但由于集群架构上做了角色分离(client,master,data)和冷热分离,因而Bug引起的故障比较容易被限定在一个较小的范围而不至于影响所有的功能和所有的用户。 故障点定位更加容易,规避措施也更容易实施。 部分规避措施实施过程中甚至对用户是完全无影响的,比如: 重启内存泄漏的master node)。详尽的监控为问题的发现和诊断提供了有力的支持。

Elasticsearch是非常复杂的系统,官方的测试无法覆盖所有的用例场景和数据规模,一些极端的应用场景可能触发某个深藏的Bug或者缺陷而陷入困境。 因此对于稳定性要求极高的应用,最好还是采用经过长时间考验的版本,比如v2.4.2。

Day5: 《PacketBeat奇妙的OOM小记》

kira8565 发表了文章 • 0 个评论 • 2754 次浏览 • 2016-12-05 23:00 • 来自相关话题

Beats这个项目的确很好用,几行命令下来,一个成型的Agent就出来了。使用者只需要关注采集什么数据就好,后续的事情libbeat基本都处理完了。不过值得吐槽的是,Beat太散了,管理起来东一个西一个的,产品化的时候对客户说,我们要在机器上放n个Agent不知道客户会是什么样的表情。


d7d0a529244b57acb6ce3796da132df8.jpg



不过轻量级、已部署的特点还是极大的吸引了我,于是就有了后面的事情了。

PacketBeat不明原因的OOM

某天我把PacketBeat放到了我的服务器上面,这台服务器上面有个MongoDB,MongoDB主要是拿来存放ES的元数据的。ES2.x的时候并没有很好的元数据管理,为了能让ES的索引分配的比较均匀,并且有元数据辅助查询,设计好一个元数据管理的仓库是必要的。然后我打开了对MongoDB的抓包功能,恩,一切都很好,接着我打开了日志管理页面,看到了一条一条的MongoDB的包被抓回来,解码,然后塞到了ES。可是第二天一看,咦??Packet跪了?不是吧,ElasticSearch做的产品这么不稳定么。我不信。


06170826_dLgU.png



然后我又启动了第二次,紧接着熟练的top了一下,观察了PacketBeat半个多小时,在被观察的这段时间里面,PacketBeat的表现非常的正常,看不出有什么异样。好吧,那上一次的OOM可能只是个意外,Windows也经常蓝屏嘛,OOM一次也正常。结果第二天我再次打开终端,发现这货居然又OOM了!!


06170909_irst.png



好吧好吧,我感觉我已经踩到Bug了,拿了开源社区这么多东西,总得贡献一下的,好吧,提个Issue去 https://github.com/elastic/beats/issues/2867

真相只有一个

微信群里面聊起这个奇妙的OOM,Medcl大神问是不是因为采集了ES的日志,(我的这台服务器和日志服务器有关系)然后导致滚雪球把PacketBeat给滚死了。咦?说不定真的是这个原因耶!但是看了看PacketBeat,我并没有抓ES的包,而且假如我采集了ES的包,应该一下就OOM掉了,不应该等那么久。不过这么一说,却仿佛打开了新世界的大门


06171040_FVEG.png



我把这台服务器在日志服务器中的角色重新梳理了下,终于发现了这次OOM的原因了。。

由于2.X的ES没有比较好的元数据信息,所以当日志送到LogServer的时候,我做了些额外的操作,让LogServer持久化到ES一定量的时候就会往Mongo写一下元数据信息(当然也有其他服务会往里面做CRUD啦),开始的时候访问Mongo的次数其实是很少的,假设按1W来算。那么问题来了,由于我们的PacketBeat抓了Mongo的包,那么LogServer往ES的CRUD操作都会被PacketBeat给抓走,然后再送回给LogServer


06171248_tcdl.png



那么一个隐藏的滚雪球事件就产生了,刚开始的那段时间,Mongo被抓包的次数只有1W,然后就往LogServer多送了1W条日志,不。。应该多很多,毕竟网络包嘛,然后就导致LogServer因为要管理元数据的频率开始逐渐地提高,逐渐提高CRUD的频率后抓包的内容也越来越多,紧接着到这发生到LogServer的频率也越来越高。。。。。每次PacketBeat崩掉的时候,都送了80W左右的日志量出去,然后它就OOM掉了(因为我那台机器就只剩下2G的空闲内存给它用,被系统给干掉了)。。我居然发现了这样的场景


06171336_PbWI.png



结论

使用PacketBeat的时候,记得要留意一下有没这种反馈型滚雪球的情况,多发生在自己的日志服务器上面。当然那种直接抓ES的就没什么好说了,估计启动了之后没多久就崩溃掉了

Day4: 《将sql转换为es的DSL》

Xargin 发表了文章 • 6 个评论 • 11328 次浏览 • 2016-12-04 23:23 • 来自相关话题

es现在几乎已经是开源搜索引擎的事实标准了,搭建简易,使用方便。不过在很多公司里(包括我司的部分部门),并不是把它当搜索引擎来用,而是当db来用。因为本身查询/搜索原理的区别,使es在千万或者亿级的数据中进行逻辑筛选相对高效。例如一些wms、工单查询系统,单表几十个甚至上百个字段,如果在数据库里为每种类型的查询都建立合适的索引,成本比较高,更不用说索引建多了还会影响到插入速度,后期的索引优化也是比较麻烦的问题。

不过如果把es当db来使的话,始终会有一个绕不过去的坎。就是es的DSL。让所有业务开发去学习dsl的话也不是不可以,但DSL真的有点反人类(不要打我)。简单的a and b或者a or b还比较容易写,如果我要的是a and (b and (c or d) and e)的查询逻辑,那我觉得谁写都会晕。即使是用官方或者第三方提供的client,如果需求多种多样的话,想要灵活地实现`需求=>DSL`的过程还是比较痛苦。

对于业务开发来说,当然是sql更平易近人(毕竟写了这么多年CRUD)。所以还有一种歪门邪道的流派,直接把sql转成DSL。要做sql和DSL转换的工作,需要进行sql的解析,先不要怵,这个年代找一个靠谱的sql parser还是比较容易的。比如阿里开源的druid连接池里的sql模块:
 
https://github.com/alibaba/dru ... d/sql

因为笔者的实现是用的下面这个golang版的parser:

https://github.com/xwb1989/sqlparser

所以用这个来举例吧~

这个是其作者从youtube/vitness里提取并进行改进的一个parser,我们能用到的是一部分子集功能,只需要解析select类的sql。

先举个简单的sql的例子:
select * from x_order where userId = 1 order by id desc limit 10,1;

解析之后会变成golang的一个struct,来看看具体的定义:

&sqlparser.Select{
Comments:sqlparser.Comments(nil),
Distinct:"",
SelectExprs:sqlparser.SelectExprs{(*sqlparser.StarExpr)(0xc42000aee0)},
From:sqlparser.TableExprs{(*sqlparser.AliasedTableExpr)(0xc420016930)},
Where:(*sqlparser.Where)(0xc42000afa0),
GroupBy:sqlparser.GroupBy(nil),
Having:(*sqlparser.Where)(nil),
OrderBy:sqlparser.OrderBy{(*sqlparser.Order)(0xc42000af20)},
Limit:(*sqlparser.Limit)(0xc42000af80),
Lock:""
}


sql的select语句在被解析之后生成一个Select的结构体,如果我们不关心使用者需要的字段的话,可以先把SelectExprs/Distinct/Comments/Lock里的内容忽略掉。如果不是分组统计类的需求,也可以先把GroupBy/Having忽略掉。这里我们关心的就剩下From、Where、OrderBy和Limit。

From对应的TableExprs实际上可以认为是简单的字符串,这里的值其实就是`x_order`。

OrderBy实际上是一个元素为
type Order struct {
Expr ValExpr
Direction string
}\

的数组。

Limit也很简单,
type Limit struct {
Offset, Rowcount ValExpr
}

其实就是俩数字。

那么剩下的就是这个Where结构了。where会被解析为AST(`https://en.wikipedia.org/wiki/Abstract_syntax_tree`),中文是抽象语法树。在不说子查询之类的情况下,这个AST也不会太复杂,毕竟where后面的情况比起编译原理里的程序语言来说情况还是要少得多的。以上述的sql为例,这里解析出来的Where结构是这样的:
&sqlparser.Where{
Type:"where",
Expr:(*sqlparser.ComparisonExpr)(0xc420016a50)
}


只有一个节点,一个ComparisonExpr表达式,这个ComparisonExpr,中文比较表达式,指代的就是我们sql里的`user_id = 1`。实际上我们可以认为这个"比较表达式"即是所有复杂AST的叶子节点。叶子结点在AST遍历的时候一般也就是递归的终点。因为这里的查询比较简单,整棵AST只有一个节点,即根节点和叶子节点都是这个ComparisonExpr。

再来一个复杂点的例子。
select * from users where user_id = 1 and product_id =2

=>

&sqlparser.Where{
Type:"where",
Expr:(*sqlparser.AndExpr)(0xc42000b020)
}

AndExpr有Left和Right两个成员,分别是:

Left:
&sqlparser.ComparisonExpr{
Operator:"=",
Left:(*sqlparser.ColName)(0xc4200709c0),
Right:sqlparser.NumVal{0x31}
}

Right:
&sqlparser.ComparisonExpr{
Operator:"=",
Left:(*sqlparser.ColName)(0xc420070a50),
Right:sqlparser.NumVal{0x32}
}


稍微有一些二叉树的样子了吧。把这棵简单的树画出来:


Untitled1.png



回到文章开头的那个复杂的例子:
a and (b and (c or d) and e)

=>

select * from user_history where user_id = 1 and (product_id = 2 and (star_num = 4 or star_num = 5) and banned = 1)


看着真够麻烦的,我们把这棵树画出来:


Untitled.png



这样看着就直观多了。我们有了AST的结构,那要怎么对应到es的查询DSL呢?少安毋躁。

我们知道es的bool query是可以进行嵌套的,所以实际上我们可以同样可以构造出树形结构的bool query。这里把bool嵌套must和bool嵌套should简化一下,写成boolmust和boolshould:

例如a and (b and c)
query {
boolmust {
a,
boolmust {
b,
c
}
}
}


我们把query内部的第一个boolmust当作根节点,内部嵌套的a和另一个boolmust当作它的两个子节点,然后b和c又是这个boolmust的子节点。可以看出来,实际上这棵树和AST的节点可以一一对应。

再回到文章开头的例子,a and (b and (c or d) and e):
query {
boolmust {
a,
boolmust {
b,
boolshould {
c,
d
},
e
}
}
}

和前文中ast来做个简单的结构对比~


dsl和ast对比.png




和前文中sql的where解析后的AST树也是完全匹配的。思路来了,只要对sql解析生成的AST进行递归,即可得到这棵树。当然了,这里还可以进行一些优化,如果子节点的类型和父
节点的类型一致,例如都是and表达式或者都是or表达式,我们可以在生成dsl的时候将其作为并列的节点进行合并,这里不再赘述。


在递归中有这么几种情况:
AndExpr => bool must [{left}, {right}]
OrExpr => bool should [{left}, {right}]
ComparisonExpr => 一般是叶子节点
ParenBoolExpr => 指代括号表达式,其实内部是上述三种节点的某一种,所以直接取出内部节点按上述方法来处理


这样问题就变成了如何处理AST的叶子节点。前面提到了叶子节点实际上就是Comparison Expression。只要简单进行一些对应即可,下面是我们的项目里的一些对应关系,仅供参考:


convert.png

最后再附上demo
 
https://github.com/cch123/elasticsql

Day3: 《创建一个你自己的 Beat》

medcl 发表了文章 • 0 个评论 • 4184 次浏览 • 2016-12-03 22:19 • 来自相关话题

Elastic Advent 第三篇, 手头上事情实在太多,这两天正在进行权威指南翻译的冲刺阶段,临时填下坑,翻译官网的一篇文章吧(原文:https://www.elastic.co/blog/build-your-own-beat),Advent 规则很自由的,没说不能翻译文章啊,嘿嘿嘿,另外号召大家踊跃报名,大家一起玩才有意思。
 
活动地址:http://elasticsearch.cn/article/107
 
言归正传!
 Beat 是一个开源的用来构建轻量级数据汇集的平台,可用于将各种类型的数据发送至Elasticsearch 与 Logstash。我们有 Packetbeat 用于监控局域网内服务器之间的网络流量信息,有 Filebeat 收集服务器上的日志信息,还有新推出的 Metricbeat 可以定期获取外部系统的监控指标信息,除此以外,你还可以非常方便的基于 libbeat 框架来构建你属于自己的专属 Beat,目前 beas 社区已经有超过25个 Community Beats 了。

Elastic 还提供一个 Beat generator(Beat 生成器)来帮你快速构建属于你自己的 Beat。通过这篇博客你将会看到如何通过 Beat 生成器来快速创建一个你自己的 Beat。今天我们创建的是一个叫做 lsbeat 的 Beat,lsbeat 非常类似 Unix 系统下的命令行 ls,我们用 lsbeat 来索引目录和文件信息。本篇文章环境基于 Unix 系统,如果你是 Windows 或是其它系统,相关操作可能需要根据实际情况进行调整。

第一步 – 配置 Golang 环境

Beats 是用 Golang写的,显然,要创建和开发一个 beat,Golang 环境必不可少,关于这方面的文章很多,建议查看这篇 Golang 的安装向导: install Golang。当前 Beats 需要的最低版本是 Golang 1.6。另外请确保正确设置了你的 $GOPATH 环境变量。

另外 Golang Glide 被用来进行包的依赖管理,所以也需要确保正确安装,最低版本是 Glide 0.10.0,安装说明点这里。

让我们先来看看 lsbeat 将会用到的一段代码吧,这是一个简单的 golang 程序,通过命令行接收一个目录参数,然后列出该目录下的文件和子目录信息。
package main

import (
"fmt"
"io/ioutil"
"os"
)

func main() {
//apply run path "." without argument.
if len(os.Args) == 1 {
listDir(".")
} else {
listDir(os.Args[1])
}
}

func listDir(dirFile string) {
files, _ := ioutil.ReadDir(dirFile)
for _, f := range files {
t := f.ModTime()
fmt.Println(f.Name(), dirFile+"/"+f.Name(), f.IsDir(), t, f.Size())

if f.IsDir() {
listDir(dirFile + "/" + f.Name())
}
}
}

后面我们将使用到这段代码和 listDir 函数。

第二步 – 生成项目

要生成一个你自己的 Beat,就要用到 beat-generator 了,首先你必须安装 cookiecutter。安装的详细说明看这里。安装好 cookiecutter 之后,我们要给自己的 Beat 起一个好听的名字,最好是小写的英文字母,我们今天这个例子就叫 lsbeat 吧。

生成项目模板之前,我们需要下载 Beats generator 包文件,就在 beats 仓库。安装好 GoLang 之后,你就可以很方便的使用 go get 命令来下载 Beats generator 包文件了。 当你执行下面的这个命令后,所有的源码文件都会下载到 $GOPATH/src 目录。
$ go get github.com/elastic/beats

在 GOPATH 下创建一个以你自己github账号名称命名的目录,并切换过去,然后执行 cookiecutter 命令并指向 Beat Generator 源码路径。
$ cd $GOPATH/src/github.com/{user}
$ cookiecutter $GOPATH/src/github.com/elastic/beats/generate/beat

Cookiecutter 接下来会问你几个问题,比如项目名称,我们输入:lsbeat;你的 github 用户名,输入你自己的 github 账户;还有两个关于beat和beat_path应该会自动识别,默认回车就好;最后的问题,你可以输入你的姓和名。
project_name [Examplebeat]: lsbeat
github_name [your-github-name]: {username}
beat [lsbeat]:
beat_path [github.com/{github id}]:
full_name [Firstname Lastname]: {Full Name}


现在应该已经创建好了一个名为 lsbeat 的目录,并且里面应该会生成一些文件,让我们一起来看一下吧,结构如下:
$ cd lsbeat
$ tree
.
├── CONTRIBUTING.md
├── LICENSE
├── Makefile
├── README.md
├── beater
│ └── lsbeat.go
├── config
│ ├── config.go
│ └── config_test.go
├── dev-tools
│ └── packer
│ ├── Makefile
│ ├── beats
│ │ └── lsbeat.yml
│ └── version.yml
├── docs
│ └── index.asciidoc
├── etc
│ ├── beat.yml
│ └── fields.yml
├── glide.yaml
├── lsbeat.template.json
├── main.go
├── main_test.go
└── tests
└── system
├── config
│ └── lsbeat.yml.j2
├── lsbeat.py
├── requirements.txt
└── test_base.py


我们刚刚已经生成好了一个原始的 Beat 模板了,但是你还需要获取相关的依赖和设置好 git 仓库。

首先,你需要拉取依赖的相关包信息,我们的这个例子是 lsbeat,我们先做一些的基本的配置,回头再看看详细看看其它的模板和配置文件,只需要执行 make setup 就可以自动获取依赖。
$ make setup


当你创建好了自己的 Beat 之后,记得上传到 github 仓库,并和社区进行分享哦,如下:


beats.png



要 push lsbeat 到你的 git 仓库,只需要执行如下命令:
$ git remote add origin git@github.com:{username}/lsbeat.git
$ git push -u origin master


恭喜你,现在你已经完成了一个 Beat ,并且发布了第一个版本到了 Github,不过里面还没有什么具体内容,现在让我们进一步看看里面的代码吧。

第四步 – 配置

执行过上面一系列命令之后,项目里将会自动创建名为 lsbeat.yml 和 lsbeat.template.json 的配置文件。所有的基本配置项都已经生成在了里面。
lsbeat.yml
lsbeat:
# Defines how often an event is sent to the output
period: 1s


Period 参数包含在每一个生成的 Beats 里面,它表示 lsbeat 将会每隔 1 秒钟轮询一次,这里我们修改 period 时间间隔为 10 秒。还可以在修改 etc/ 目录下面的 beat.yml 文件,这里新增一个 path 参数表示我们具体要监听哪个目录。
lsbeat:
# Defines how often an event is sent to the output
period: 10s
path: "."


参数添加好了之后,我们只需要运行 make update 命令就能让这些修改应用到配置文件lsbeat.yml。
$ make update
$ cat lsbeat.yml

################### Lsbeat Configuration Example #########################

############################# Lsbeat ######################################

lsbeat:
# Defines how often an event is sent to the output
period: 10s
path: "."
###############################################################################


修改完 yml 文件,记得修改 config/config.go文件,添加一个path 参数。
package config

import "time"

type Config struct {
Period time.Duration `config:"period"`
Path string `config:"path"`
}

var DefaultConfig = Config{
Period: 10 * time.Second,
Path: ".",
}


同时我们修改 period 默认时间间隔为 10 秒,默认监听的是当前目录 (.) 。.

第五步 – 添加代码

每一个 Beat 需要实现 Beater 接口,里面定义了 Run() 和 Stop() 函数。. 

我们可以定义一个名为 Lsbeat 的结构体,然后用这个对象实现 Beater 接口。然后添加字段 lastIndexTime 来保存最后运行的时间戳信息。
type Lsbeat struct {
done chan struct{}
config config.Config
client publisher.Client

lastIndexTime time.Time
...
}


另外,每个 Beat 还需要实现 New() 方法来接收 Beat 配置信息和返回 Lsbeat 的具体实例。
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
config := config.DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("Error reading config file: %v", err)
}

ls := &Lsbeat{
done: make(chan struct{}),
config: config,
}
return ls, nil
}


在我们的 lsbeat 例子中,我们要做的就是扩展默认的 Run() 函数来导出指定目录的文件和子目录信息。

在修改 Run() 函数之前,我们先在 lsbeat.go 增加 listDir() 函数,就是我们前面最开始测试的那段代码,用于收集目录和文件信息的简单例子稍微修改一下。另外我们还要生成以下字段信息:
"@timestamp": common.Time(time.Now()),
"type": beatname,
"modtime": common.Time(t),
"filename": f.Name(),
"path": dirFile + "/" + f.Name(),
"directory": f.IsDir(),
"filesize": f.Size(),


第一次运行的时候我们将索引所有的文件和目录信息,然后我们再定期检查是否有新文件被创建或者修改,再索引这些新创建的文件和目录。每次定期检查的时间戳都会保存在 lasIndexTime 变量,完整代码如下:
func (bt *Lsbeat) listDir(dirFile string, beatname string, init bool) {
files, _ := ioutil.ReadDir(dirFile)
for _, f := range files {
t := f.ModTime()

event := common.MapStr{
"@timestamp": common.Time(time.Now()),
"type": beatname,
"modtime": common.Time(t),
"filename": f.Name(),
"path": dirFile + "/" + f.Name(),
"directory": f.IsDir(),
"filesize": f.Size(),
}
if init {
// index all files and directories on init
bt.client.PublishEvent(event)
} else {
// Index only changed files since last run.
if t.After(bt.lastIndexTime) {
bt.client.PublishEvent(event)
}
}

if f.IsDir() {
bt.listDir(dirFile+"/"+f.Name(), beatname, init)
}
}
}


记住在最开始需要导入 “io/ioutil” 包。
import (
"fmt"
"io/ioutil"
"time"
)


现在,让我们看看如何在 Run() 函数里面调用 listDir() 函数,并且保存时间戳到 lasIndexTime 变量。
func (bt *Lsbeat) Run(b *beat.Beat) error {
logp.Info("lsbeat is running! Hit CTRL-C to stop it.")

bt.client = b.Publisher.Connect()
ticker := time.NewTicker(bt.config.Period)
counter := 1
for {

select {
case <-bt.done:
return nil
case <-ticker.C:
}

bt.listDir(bt.config.Path, b.Name, true) // call lsDir
bt.lastIndexTime = time.Now() // mark Timestamp

logp.Info("Event sent")
counter++
}

}


函数 Stop() 用来中断 run 的循环执行,保持默认生成的就行。
func (bt *Lsbeat) Stop() {
bt.client.Close()
close(bt.done)
}


到这里,编码部分基本就完成了。我们接下来添加新字段到 mapping 中,修改文件 etc/fields.yml。.
- key: lsbeat
title: LS Beat
description:
fields:
- name: counter
type: integer
required: true
description: >
PLEASE UPDATE DOCUMENTATION
#new fiels added lsbeat
- name: modtime
type: date
- name: filename
type: text
- name: path
- name: directory
type: boolean
- name: filesize
type: long


重新应用新的配置。

$ make update

字段 file_name 将使用 nGram 分词,我们还需要在文件 lsbeat.template.json 的 “settings” 节点添加一个自定义的 analyzer。
{
"mappings": {
...
},
"order": 0,
"settings": {
"index.refresh_interval": "5s",
"analysis": {
"analyzer": {
"ls_ngram_analyzer": {
"tokenizer": "ls_ngram_tokenizer"
}
},
"tokenizer": {
"ls_ngram_tokenizer": {
"type": "ngram",
"min_gram": "2",
"token_chars": [
"letter",
"digit"
]
}
}
}
},
"template": "lsbeat-*"
}


第六步 – 编译和运行

现在我们可以编译和运行了,只需要执行 make 命令就可以编译出可执行文件 lsbeat (lsbeat.exe on windows) 。

$ make

修改 lsbeat.yml 文件,设置需要监听的目录,如: “/Users/ec2-user/go”,记住是全路径。
lsbeat:
# Defines how often an event is sent to the output
period: 10s
path: "/Users/ec2-user/go"

同时确保你的 elasticsearch 和 kibana 正常运行。现在运行一下 lsbeat 命令看看会发生什么事情吧。

$ ./lsbeat

打开Kibana,通过调用 _cat 接口我们看看的索引是不是创建了。

beats-1.png


可以看到创建了一个名为 lsbeat-2016.06.03 的索引,并且看到已经有了一些文档了。现在对 filename 字段查询一下,由于使用的是 nGram 分词,支持模糊匹配,我们使用 lsbe 关键字搜一下。

beats-2.png


大功告成! 恭喜你,你已经完成了第一个属于你自己的 beat。