Q:非洲食人族的酋长吃什么?

filebeat -> logstash 失败的问题

Beatsmedcl 回复了问题 • 3 人关注 • 2 个回复 • 12909 次浏览 • 2018-12-10 15:52 • 来自相关话题

kibana 作图 advanced 里的json input怎么使用

Kibanamedcl 回复了问题 • 2 人关注 • 1 个回复 • 8931 次浏览 • 2018-12-10 15:47 • 来自相关话题

关于滚动模式,关注terms耗时的化是不是就不需要缩小分片了

Elasticsearchmedcl 回复了问题 • 3 人关注 • 1 个回复 • 1569 次浏览 • 2018-12-10 15:33 • 来自相关话题

elasticsearch的索引消费慢问题。

Elasticsearchgodma 回复了问题 • 5 人关注 • 7 个回复 • 1801 次浏览 • 2018-12-10 15:27 • 来自相关话题

Day 10 - Elasticsearch 分片恢复并发数过大引发的bug分析

Adventhowardhuang 发表了文章 • 4 个评论 • 10466 次浏览 • 2018-12-10 11:43 • 来自相关话题

       大家好,今天为大家分享一次 ES 的填坑经验。主要是关于集群恢复过程中,分片恢复并发数调整过大导致集群 hang 死的问题。

场景描述

       废话不多说,先来描述场景。某日,腾讯云线上某 ES 集群,15个节点,2700+ 索引,15000+ 分片,数十 TB 数据。由于机器故障,某个节点被重启,此时集群有大量的 unassigned 分片,集群处于 yellow 状态。为了加快集群恢复的速度,手动调整分片恢复并发数,原本想将默认值为2的 node_concurrent_recoveries 调整为10,结果手一抖多加了一个0,设定了如下参数:

<br /> curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'<br /> {<br /> "persistent": {<br /> "cluster.routing.allocation.node_concurrent_recoveries": 100,<br /> "indices.recovery.max_bytes_per_sec": "40mb"<br /> }<br /> }<br /> '<br />
       设定之后,观察集群 unassigned 分片,一开始下降的速度很快。大约几分钟后,数量维持在一个固定值不变了,然后,然后就没有然后了,集群所有节点 generic 线程池卡死,虽然已存在的索引读写没问题,但是新建索引以及所有涉及 generic 线程池的操作全部卡住。立马修改分片恢复并发数到10,通过管控平台一把重启了全部节点,约15分钟后集群恢复正常。接下来会先介绍一些基本的概念,然后再重现这个问题并做详细分析。

基本概念

ES 线程池(thread pool)

ES 中每个节点有多种线程池,各有用途。重要的有:

  • generic :通用线程池,后台的 node discovery,上述的分片恢复(node recovery)等等一些通用后台的操作都会用到该线程池。该线程池线程数量默认为配置的处理器数量(processors)* 4,最小128,最大512。
  • index :index/delete 等索引操作会用到该线程池,包括自动创建索引等。默认线程数量为配置的处理器数量,默认队列大小:200.
  • search :查询请求处理线程池。默认线程数量:int((# of available_processors * 3) / 2) + 1,默认队列大小:1000.
  • get :get 请求处理线程池。默认线程数量为配置的处理器数量,默认队列大小:1000.
  • write :单个文档的 index/delete/update 以及 bulk 请求处理线程。默认线程数量为配置的处理器数量,默认队列大小:200,在写多的日志场景我们一般会将队列调大。
    还有其它线程池,例如备份回档(snapshot)、analyze、refresh 等,这里就不一一介绍了。详细可参考官方文档:https://www.elastic.co/guide/e ... .html

    集群恢复之分片恢复

           我们知道 ES 集群状态分为三种,green、yellow、red。green 状态表示所有分片包括主副本均正常被分配;yellow 状态表示所有主分片已分配,但是有部分副本分片未分配;red 表示有部分主分片未分配。
    一般当集群中某个节点因故障失联或者重启之后,如果集群索引有副本的场景,集群将进入分片恢复阶段(recovery)。此时一般是 master 节点发起更新集群元数据任务,分片的分配策略由 master 决定,具体分配策略可以参考腾讯云+社区的这篇文章了解细节:https://cloud.tencent.com/deve ... 34743 。各节点收到集群元数据更新请求,检查分片状态并触发分片恢复流程,根据分片数据所在的位置,有多种恢复的方式,主要有以下几种:

  • EXISTING_STORE : 数据在节点本地存在,从本地节点恢复。
  • PEER :本地数据不可用或不存在,从远端节点(源分片,一般是主分片)恢复。
  • SNAPSHOT : 数据从备份仓库恢复。
  • LOCAL_SHARDS : 分片合并(shrink)场景,从本地别的分片恢复。

    PEER 场景分片恢复并发数主要由如下参数控制:

  • cluster.routing.allocation.node_concurrent_incoming_recoveries:节点上最大接受的分片恢复并发数。一般指分片从其它节点恢复至本节点。
  • cluster.routing.allocation.node_concurrent_outgoing_recoveries :节点上最大发送的分片恢复并发数。一般指分片从本节点恢复至其它节点。
  • cluster.routing.allocation.node_concurrent_recoveries :该参数同时设置上述接受发送分片恢复并发数为相同的值。
    详细参数可参考官方文档:https://www.elastic.co/guide/e ... .html

           集群卡住的主要原因就是从远端节点恢复(PEER Recovery)的并发数过多,导致 generic 线程池被用完。涉及目标节点(target)和源节点(source)的恢复交互流程,后面分析问题时我们再来详细讨论。

    问题复现与剖析

           为了便于描述,我用 ES 6.4.3版本重新搭建了一个三节点的集群。单节点 1 core,2GB memory。新建了300个 index, 单个 index 5个分片一个副本,共 3000 个 shard。每个 index 插入大约100条数据。
    先设定分片恢复并发数,为了夸张一点,我直接调整到200,如下所示:
    <br /> curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'<br /> {<br /> "persistent": {<br /> "cluster.routing.allocation.node_concurrent_recoveries": 200 // 设定分片恢复并发数<br /> }<br /> }<br /> '<br />
     
    接下来停掉某节点,模拟机器挂掉场景。几分钟后,观察集群分片恢复数量,卡在固定数值不再变化:

    分片恢复统计信息.png



     
    通过 allocation explain 查看分片分配状态,未分配的原因是受到最大恢复并发数的限制:

    分片恢复限制.png


     
    观察线程池的数量,generic 线程池打满128.

    线程池统计.png



    此时查询或写入已有索引不受影响,但是新建索引这种涉及到 generic 线程池的操作都会卡住。
    通过堆栈分析,128 个 generic 线程全部卡在 PEER recovery 阶段。

    堆栈分析.png


     
           现象有了,我们来分析一下这种场景,远程分片恢复(PEER Recovery)流程为什么会导致集群卡住。

           当集群中有分片的状态发生变更时,master 节点会发起集群元数据更新(cluster state update)请求给所有节点。其它节点收到该请求后,感知到分片状态的变更,启动分片恢复流程。部分分片需要从其它节点恢复,代码层面,涉及分片分配的目标节点(target)和源节点(source)的交互流程如下:

    远端恢复时序分析.png


     

           6.x 版本之后引入了 seqNo,恢复会涉及到 seqNo+translog,这也是6.x提升恢复速度的一大改进。我们重点关注流程中第 2、4、5、7、10、12 步骤中的远程调用,他们的作用分别是:

  • 第2步:分片分配的目标节点向源节点(一般是主分片)发起分片恢复请求,携带起始 seqNo 和 syncId。
  • 第4步:发送数据文件信息,告知目标节点待接收的文件清单。
  • 第5步:发送 diff 数据文件给目标节点。
  • 第7步:源节点发送 prepare translog 请求给目标节点,等目标节点打开 shard level 引擎,准备接受 translog。
  • 第10步:源节点发送指定范围的 translog 快照给目标节点。
  • 第12步:结束恢复流程。

           我们可以看到除第5步发送数据文件外,多次远程交互 submitRequest 都会调用 txGet,这个调用底层用的是基于 AQS 改造过的 sync 对象,是一个同步调用。 如果一端 generic 线程池被这些请求打满,发出的请求等待对端返回,而发出的这些请求由于对端 generic 线程池同样的原因被打满,只能 pending 在队列中,这样两边的线程池都满了而且相互等待对端队列中的线程返回,就出现了分布式死锁现象。

    问题处理

           为了避免改动太大带来不确定的 side effect,针对腾讯云 ES 集群我们目前先在 rest 层拒掉了并发数超过一定值的参数设定请求并提醒用户。与此同时,我们向官方提交了 issue:https://github.com/elastic/ela ... 36195 进行跟踪。

    总结

           本文旨在描述集群恢复过程出现的集群卡死场景,避免更多的 ES 用户踩坑,没有对整体分片恢复做详细的分析,大家想了解详细的分片恢复流程可以参考腾讯云+社区 Elasticsearch 专栏相关的文章:https://cloud.tencent.com/developer/column/2428

    完结,谢谢!

社区日报 第474期 (2018-12-10)

社区日报cyberdak 发表了文章 • 0 个评论 • 1517 次浏览 • 2018-12-10 10:03 • 来自相关话题

1. 如何设置kibana的堆大小避免oom
http://t.cn/Ey1omYc

2. Es 另外一款web管理UI,包含导入,查看,编辑等功能
http://t.cn/Ey1dKAj

3. 使用elasticseach 搜索emoji表情
http://t.cn/Rf5r848

编辑:cyberdak
归档:https://elasticsearch.cn/article/6183
订阅:https://tinyletter.com/elastic-daily

Day 9 - 动手实现一个自定义beat

AdventXinglong 发表了文章 • 0 个评论 • 3380 次浏览 • 2018-12-09 21:09 • 来自相关话题

参考

https://elasticsearch.cn/article/113

https://www.elastic.co/blog/build-your-own-beat

介绍

公司内部有统一的log收集系统,并且实现了定制的filebeat进行log收集。为了实现实时报警和监控,自定义的beat并没有直接把输出发给elasticsearch后端,而是中间会经过storm或者durid进行实时分析,然后落入es或者hdfs。同时由于是统一log收集,所以目前还没有针对具体的不同应用进行log的内容的切分,导致所有的log都是以一行为单位落入后端存储。于是需要针对不同的业务部门定制不同的beat。
本文初步尝试定制一个可以在beat端解析hdfs audit log的beat,限于篇幅,只实现了基本的文件解析功能。下面会从环境配置,代码实现,运行测试三个方面进行讲解。

环境配置

go version go1.9.4 linux/amd64

python version: 2.7.9

不得不吐槽下python的安装,各种坑。因为系统默认的python版本是2.7.5,而cookiecutter建议使用2.7.9


下面的工具会提供python本身需要依赖的一些native包

<br /> yum install openssl -y<br /> yum install openssl-devel -y<br /> yum install zlib-devel -y<br /> yum install zlib -y<br />

安装python

<br /> wget <a href="https://www.python.org/ftp/python/2.7.9/Python-2.7.9.tgz" rel="nofollow" target="_blank">https://www.python.org/ftp/pyt ... 9.tgz</a><br /> tar -zxvf Python-2.7.9.tgz<br /> cd ~/python/Python-2.7.9<br /> ./configure --prefix=/usr/local/python-2.7.9<br /> make<br /> make install<br /> <br /> rm -f /bin/python<br /> ln -s /usr/local/python-2.7.9/bin/python /bin/python<br />

安装工具包 distribute, setuptools, pip

<br /> cd ~/python/setuptools-19.6 && python setup.py install<br /> cd ~/python/pip-1.5.4 && python setup.py install<br /> cd ~/python/distribute-0.7.3 && python setup.py install<br />

安装cookiecutter

<br /> pip install --user cookiecutter<br />

安装cookiecutter所依赖的工具

<br /> pip install backports.functools-lru-cache<br /> pip install six<br /> pip install virtualenv<br />

*** virtualenv 安装好了之后,所在目录是在python的目录里面 (/usr/local/python-2.7.9/bin/virtualenv),需要配置好PATH,这个工具稍后会被beat的Makefile用到


代码实现


需要实现的功能比较简单,目标是打开hdfs-audit.log文件,然后逐行读取,同时解析出必要的信息,组装成event,然后发送出去,如果对接的es的话,需要同时支持自动在es端创建正确的mapping

使用官方提供的beat模板创建自己的beat


  • 需要设置好环境变量$GOPATH,本例子中GOPATH=/root/go

    <br /> $ go get github.com/elastic/beats<br /> $ cd $GOPATH/src/github.com/elastic/beats<br /> $ git checkout 5.1<br /> <br /> [root@minikube-2830379 suxingfate]# cookiecutter /root/go/src/github.com/elastic/beats/generate/beat<br /> project_name [Examplebeat]: hdfsauditbeat<br /> github_name [your-github-name]: suxingfate<br /> beat [hdfsauditbeat]:<br /> beat_path [github.com/suxingfate]:<br /> full_name [Firstname Lastname]: xinglong<br /> <br /> make setup<br />

    到这里,模板就生成了,然后就是定制需要的东西。

  • 1 _meta/beat.yml # 配置模板文件,定义我们的beat会接受哪些配置项
  • 2 config/config.go #使用go的struct定义整个config对象,包含所有的配置项
  • 3 beater/hdfsauditbeat.go # 核心逻辑代码
  • 4 _meta/fields.yml #这里是跟es对接的时候给es定义的mapping



    1 _meta/beat.yml

    这里增加了path,为后面配置hdfs-audit.log文件的位置留好坑
    ```
    [root@minikube-2830379 hdfsauditbeat]# cat _meta/beat.yml
    ################### Hdfsauditbeat Configuration Example #########################

    ############################# Hdfsauditbeat ######################################

    hdfsauditbeat:

    Defines how often an event is sent to the output

    period: 1s
    path: "."
    ```

    2 config/config.go

    这里把path定义到struct里面,后面核心代码就可以从config对象获得path了

    <br /> [root@minikube-2830379 hdfsauditbeat]# cat config/config.go<br /> // Config is put into a different package to prevent cyclic imports in case<br /> // it is needed in several locations<br /> <br /> package config<br /> <br /> import "time"<br /> <br /> type Config struct {<br /> Period time.Duration `config:"period"`<br /> Path string `config:"path"`<br /> }<br /> <br /> var DefaultConfig = Config{<br /> Period: 1 * time.Second,<br /> Path: ".",<br /> }<br />

    3 beater/hdfsauditbeat.go

    这里需要改动的地方是:
    3.1 定义了一个catAudit函数来解析目标文件的每一行,同时生成自定义的event,然后发送出去
    3.2 Run函数调用自定义的catAudit函数,从而把我们的功能嵌入

    <br /> [root@minikube-2830379 hdfsauditbeat]# cat beater/hdfsauditbeat.go<br /> package beater<br /> <br /> import (<br /> "fmt"<br /> "time"<br /> "os"<br /> "io"<br /> "bufio"<br /> "strings"<br /> "github.com/elastic/beats/libbeat/beat"<br /> "github.com/elastic/beats/libbeat/common"<br /> "github.com/elastic/beats/libbeat/logp"<br /> "github.com/elastic/beats/libbeat/publisher"<br /> <br /> "github.com/suxingfate/hdfsauditbeat/config"<br /> )<br /> <br /> type Hdfsauditbeat struct {<br /> done chan struct{}<br /> config config.Config<br /> client publisher.Client<br /> }<br /> <br /> // Creates beater<br /> func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {<br /> config := config.DefaultConfig<br /> if err := cfg.Unpack(&config); err != nil {<br /> return nil, fmt.Errorf("Error reading config file: %v", err)<br /> }<br /> <br /> bt := &Hdfsauditbeat{<br /> done: make(chan struct{}),<br /> config: config,<br /> }<br /> return bt, nil<br /> }<br /> <br /> func (bt *Hdfsauditbeat) Run(b *beat.Beat) error {<br /> logp.Info("hdfsauditbeat is running! Hit CTRL-C to stop it.")<br /> <br /> bt.client = b.Publisher.Connect()<br /> ticker := time.NewTicker(bt.config.Period)<br /> counter := 1<br /> for {<br /> select {<br /> case <-bt.done:<br /> return nil<br /> case <-ticker.C:<br /> }<br /> <br /> bt.catAudit(bt.config.Path)<br /> <br /> logp.Info("Event sent")<br /> counter++<br /> }<br /> }<br /> <br /> func (bt *Hdfsauditbeat) Stop() {<br /> bt.client.Close()<br /> close(bt.done)<br /> }<br /> <br /> func (bt *Hdfsauditbeat) catAudit(auditFile string) {<br /> file, err := os.OpenFile(auditFile, os.O_RDWR, 0666)<br /> if err != nil {<br /> //fmt.Println("Open file error!", err)<br /> return<br /> }<br /> defer file.Close()<br /> <br /> buf := bufio.NewReader(file)<br /> for {<br /> line, err := buf.ReadString('\n')<br /> line = strings.TrimSpace(line)<br /> if line == "" {<br /> return<br /> }<br /> <br /> timeEnd := strings.Index(line, ",")<br /> timeString := line[0 :timeEnd]<br /> tm, _ := time.Parse("2006-01-02 03:04:05", timeString)<br /> <br /> ugiStart := strings.Index(line, "ugi=") + 4<br /> ugiEnd := strings.Index(line, " (auth")<br /> ugi := line[ugiStart :ugiEnd]<br /> <br /> cmdStart := strings.Index(line, "cmd=") + 4<br /> line = line[cmdStart:len(line)]<br /> cmdEnd := strings.Index(line, " ")<br /> cmd := line[0 : cmdEnd]<br /> <br /> srcStart := strings.Index(line, "src=") + 4<br /> line = line[srcStart:len(line)]<br /> srcEnd := strings.Index(line, " ")<br /> src := line[0:srcEnd]<br /> <br /> dstStart := strings.Index(line, "dst=") + 4<br /> line = line[dstStart:len(line)]<br /> dstEnd := strings.Index(line, " ")<br /> dst := line[0:dstEnd]<br /> <br /> event := common.MapStr{<br /> "@timestamp": common.Time(time.Unix(tm.Unix(), 0)),<br /> "ugi": ugi,<br /> "cmd": cmd,<br /> "src": src,<br /> "dst": dst,<br /> }<br /> bt.client.PublishEvent(event)<br /> <br /> if err != nil {<br /> if err == io.EOF {<br /> //fmt.Println("File read ok!")<br /> break<br /> } else {<br /> //fmt.Println("Read file error!", err)<br /> return<br /> }<br /> }<br /> }<br /> }<br /> <br />

    4 _meat/fields.yml

    ```
    [root@minikube-2830379 hdfsauditbeat]# less _meta/fields.yml

  • key: hdfsauditbeat
    title: hdfsauditbeat
    description:
    fields:
    • name: counter
      type: long
      required: true
      description: >
      PLEASE UPDATE DOCUMENTATION

      new fiels added hdfsaudit

    • name: entrytime
      type: date
    • name: ugi
      type: keyword
    • name: cmd
      type: keyword
    • name: src
      type: keyword
    • name: dst
      type: keyword
      ```

      测试


      首先编译好项目
      <br /> make update<br /> make<br />
      然后会发现生成了一个hdfsauditbeat文件,这个就是二进制的可执行文件。下面进行测试,这里偷了个懒,没有发给es,而是吐到console进行观察。
      修改了一下配置文件,需要指定正确的需要消费的audit log文件的路径,另外就是修改了output为console

      ```
      [root@minikube-2830379 hdfsauditbeat]# cat hdfsauditbeat.yml
      ################### Hdfsauditbeat Configuration Example #########################

      ############################# Hdfsauditbeat ######################################

      hdfsauditbeat:

      Defines how often an event is sent to the output

      period: 1s
      path: "/root/go/hdfs-audit.log"

      ================================ General =====================================


      The name of the shipper that publishes the network data. It can be used to group

      all the transactions sent by a single shipper in the web interface.

      name:


      The tags of the shipper are included in their own field with each

      transaction published.

      tags: ["service-X", "web-tier"]


      Optional fields that you can specify to add additional information to the

      output.

      fields:

      env: staging


      ================================ Outputs =====================================


      Configure what outputs to use when sending the data collected by the beat.

      Multiple outputs may be used.

      -------------------------- Elasticsearch output ------------------------------

      output.elasticsearch:

      Array of hosts to connect to.

      hosts: ["localhost:9200"]


      Optional protocol and basic auth credentials.

      protocol: "https"

      username: "elastic"

      password: "changeme"


      ----------------------------- Logstash output --------------------------------

      output.logstash:

      The Logstash hosts

      hosts: ["localhost:5044"]


      Optional SSL. By default is off.

      List of root certificates for HTTPS server verifications

      ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]


      Certificate for SSL client authentication

      ssl.certificate: "/etc/pki/client/cert.pem"


      Client Certificate Key

      ssl.key: "/etc/pki/client/cert.key"


      output.console:
      pretty: true

      ================================ Logging =====================================


      Sets log level. The default log level is info.

      Available log levels are: critical, error, warning, info, debug

      logging.level: debug


      At debug level, you can selectively enable logging only for some components.

      To enable all selectors use ["*"]. Examples of other selectors are "beat",

      "publish", "service".

      logging.selectors: ["*"]

      <br /> <br /> 开始执行<br />
      [root@minikube-2830379 hdfsauditbeat]# ./hdfsauditbeat
      {
      "@timestamp": "2018-12-09T03:00:00.000Z",
      "beat": {
      "hostname": "minikube-2830379.lvs02.dev.abc.com",
      "name": "minikube-2830379.lvs02.dev.abc.com",
      "version": "5.1.3"
      },
      "cmd": "create",
      "dst": "null",
      "src": "/app-logs/app/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
      "ugi": "appmon@APD.ABC.COM"
      }
      {
      "@timestamp": "2018-12-09T03:00:00.000Z",
      "beat": {
      "hostname": "minikube-2830379.lvs02.dev.abc.com",
      "name": "minikube-2830379.lvs02.dev.abc.com",
      "version": "5.1.3"
      },
      "cmd": "create",
      "dst": "null",
      "src": "/app-logs/appmon/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
      "ugi": "appmon@APD.ABC.COM"
      }
      ```

      结束


      使用自定义beat给我们提供了很大的灵活性,虽然pipline或者logstash也可以做到,但是使用场景还是有很大差别的。如果是调用特殊的命令获得输出,或者是本文的场景都更适合定制化beat。

社区日报 第473期 (2018-12-09)

社区日报至尊宝 发表了文章 • 0 个评论 • 1099 次浏览 • 2018-12-09 10:11 • 来自相关话题

1.ElasticSearch连接:Has_Child,Has_parent查询。
http://t.cn/EyEJZGO
2.(自备梯子)将full-scale ELK栈部署到Kubernetes。
http://t.cn/EyEiOtk
3.(自备梯子)Facebook建立在不平等的基础之上。
http://t.cn/EyE6quM

编辑:至尊宝
归档:https://elasticsearch.cn/article/6181
订阅:https://tinyletter.com/elastic-daily

logstash 输入插件syslog中为啥不能配置host

Logstashmedcl 回复了问题 • 3 人关注 • 2 个回复 • 2636 次浏览 • 2018-12-08 21:40 • 来自相关话题

社区日报 第472期 (2018-12-08)

社区日报bsll 发表了文章 • 0 个评论 • 1086 次浏览 • 2018-12-08 11:23 • 来自相关话题

  1. 用于ES数据警报的实时守护进程。
    [http://t.cn/EyQmiuE](http://t.cn/EyQmiuE)

  2. 最小的ibana Docker镜像。
    [http://t.cn/EyQbwGk](http://t.cn/EyQbwGk)

  3. Lucene列式存储格式DocValues详解。
    [http://t.cn/EyQ6pkK](http://t.cn/EyQ6pkK)



Day 8 - 如何使用Spark快速将数据写入Elasticsearch

AdventRicky Huo 发表了文章 • 0 个评论 • 8517 次浏览 • 2018-12-08 09:58 • 来自相关话题

如何使用Spark快速将数据写入Elasticsearch



说到数据写入Elasticsearch,最先想到的肯定是Logstash。Logstash因为其简单上手、可扩展、可伸缩等优点被广大用户接受。但是尺有所短,寸有所长,Logstash肯定也有它无法适用的应用场景,比如:

  • 海量数据ETL
  • 海量数据聚合
  • 多源数据处理

    为了满足这些场景,很多同学都会选择Spark,借助Spark算子进行数据处理,最后将处理结果写入Elasticsearch。

    我们部门之前利用Spark对Nginx日志进行分析,统计我们的Web服务访问情况,将Nginx日志每分钟聚合一次最后将结果写入Elasticsearch,然后利用Kibana配置实时监控Dashboard。Elasticsearch和Kibana都很方便、实用,但是随着类似需求越来越多,如何快速通过Spark将数据写入Elasticsearch成为了我们的一大问题。

    今天给大家推荐一款能够实现数据快速写入的黑科技——[Waterdrop](https://github.com/InterestingLab/waterdrop),一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上,简单易用,灵活配置,无需开发。

    wd.png




    Kafka to Elasticsearch


    和Logstash一样,Waterdrop同样支持多种类型的数据输入,这里我们以最常见的Kakfa作为输入源为例,讲解如何使用Waterdrop将数据快速写入Elasticsearch

    Log Sample


    原始日志格式如下:
    <br /> 127.0.0.1 elasticsearch.cn 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:21:54:32 +0800] "GET /article HTTP/1.1" 200 123 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)"<br />

    Elasticsearch Document


    我们想要统计,一分钟每个域名的访问情况,聚合完的数据有以下字段:
    <br /> domain String<br /> hostname String<br /> status int<br /> datetime String<br /> count int<br />

    Waterdrop with Elasticsearch


    接下来会给大家详细介绍,我们如何通过Waterdrop读取Kafka中的数据,对数据进行解析以及聚合,最后将处理结果写入Elasticsearch中。

    Waterdrop


    [Waterdrop](https://github.com/InterestingLab/waterdrop)同样拥有着非常丰富的插件,支持从Kafka、HDFS、Hive中读取数据,进行各种各样的数据处理,并将结果写入Elasticsearch、Kudu或者Kafka中。

    Prerequisites


    首先我们需要安装Waterdrop,安装十分简单,无需配置系统环境变量

    1. 准备Spark环境
    2. 安装Waterdrop
    3. 配置Waterdrop

      以下是简易步骤,具体安装可以参照[Quick Start](https://interestinglab.github. ... -start)

      ```yaml
      cd /usr/local
      wget https://archive.apache.org/dis ... 7.tgz
      tar -xvf https://archive.apache.org/dis ... 7.tgz
      wget https://github.com/Interesting ... 1.zip
      unzip waterdrop-1.1.1.zip
      cd waterdrop-1.1.1

      vim config/waterdrop-env.sh

      指定Spark安装路径

      SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}
      ```

      Waterdrop Pipeline


      与Logstash一样,我们仅需要编写一个Waterdrop Pipeline的配置文件即可完成数据的导入,相信了解Logstash的朋友可以很快入手Waterdrop配置。

      配置文件包括四个部分,分别是Spark、Input、filter和Output。

      Spark



      这一部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。
      <br /> spark {<br /> spark.app.name = "Waterdrop"<br /> spark.executor.instances = 2<br /> spark.executor.cores = 1<br /> spark.executor.memory = "1g"<br /> }<br />

      Input


      这一部分定义数据源,如下是从Kafka中读取数据的配置案例,

      <br /> kafkaStream {<br /> topics = "waterdrop-es"<br /> consumer.bootstrap.servers = "localhost:9092"<br /> consumer.group.id = "waterdrop_es_group"<br /> consumer.rebalance.max.retries = 100<br /> }<br />

      Filter


      在Filter部分,这里我们配置一系列的转化,包括正则解析将日志进行拆分、时间转换将HTTPDATE转化为Elasticsearch支持的日期格式、对Number类型的字段进行类型转换以及通过SQL进行数据聚合
      ```yaml
      filter {

      使用正则解析原始日志

      最开始数据都在raw_message字段中

      grok {
      source_field = "raw_message"
      pattern = '%{NOTSPACE:hostname}\s%{NOTSPACE:domain}\s%{IP:remote_addr}\s%{NUMBER:request_time}s\s\"%{DATA:upstream_ip}\"\s\[%{HTTPDATE:timestamp}\]\s\"%{NOTSPACE:method}\s%{DATA:url}\s%{NOTSPACE:http_ver}\"\s%{NUMBER:status}\s%{NUMBER:body_bytes_send}\s%{DATA:referer}\s%{NOTSPACE:cookie_info}\s\"%{DATA:user_agent}'
      }

      将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为

      Elasticsearch中支持的格式

      date {
      source_field = "timestamp"
      target_field = "datetime"
      source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
      target_time_format = "yyyy-MM-dd'T'HH:mm:ss.SSS+08:00"
      }

      利用SQL对数据进行聚合

      sql {
      table_name = "access_log"
      sql = "select domain, hostname, int(status), datetime, count(*) from access_log group by domain, hostname, status, datetime"
      }
      }
      ```

      Output

      最后我们将处理好的结构化数据写入Elasticsearch。

      yaml<br /> output {<br /> elasticsearch {<br /> hosts = ["localhost:9200"]<br /> index = "waterdrop-${now}"<br /> es.batch.size.entries = 100000<br /> index_time_format = "yyyy.MM.dd"<br /> }<br /> }<br />

      Running Waterdrop


      我们将上述四部分配置组合成为我们的配置文件config/batch.conf

      vim config/batch.conf

      ```
      spark {
      spark.app.name = "Waterdrop"
      spark.executor.instances = 2
      spark.executor.cores = 1
      spark.executor.memory = "1g"
      }
      input {
      kafkaStream {
      topics = "waterdrop-es"
      consumer.bootstrap.servers = "localhost:9092"
      consumer.group.id = "waterdrop_es_group"
      consumer.rebalance.max.retries = 100
      }
      }
      filter {

      使用正则解析原始日志

      最开始数据都在raw_message字段中

      grok {
      source_field = "raw_message"
      pattern = '%{IP:hostname}\s%{NOTSPACE:domain}\s%{IP:remote_addr}\s%{NUMBER:request_time}s\s\"%{DATA:upstream_ip}\"\s\[%{HTTPDATE:timestamp}\]\s\"%{NOTSPACE:method}\s%{DATA:url}\s%{NOTSPACE:http_ver}\"\s%{NUMBER:status}\s%{NUMBER:body_bytes_send}\s%{DATA:referer}\s%{NOTSPACE:cookie_info}\s\"%{DATA:user_agent}'
      }

      将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为

      Elasticsearch中支持的格式

      date {
      source_field = "timestamp"
      target_field = "datetime"
      source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
      target_time_format = "yyyy-MM-dd'T'HH:mm:00.SSS+08:00"
      }

      利用SQL对数据进行聚合

      sql {
      table_name = "access_log"
      sql = "select domain, hostname, status, datetime, count(*) from access_log group by domain, localhost, status, datetime"
      }
      }
      output {
      elasticsearch {
      hosts = ["localhost:9200"]
      index = "waterdrop-${now}"
      es.batch.size.entries = 100000
      index_timeformat = "yyyy.MM.dd"
      }
      }
      ```

      执行命令,指定配置文件,运行Waterdrop,即可将数据写入Elasticsearch。这里我们以本地模式为例。

      ./bin/start-waterdrop.sh --config config/batch.conf -e client -m 'local[2]'

      最后,写入Elasticsearch中的数据如下,再配上Kibana就可以实现Web服务的实时监控了^
      ^.

      <br /> "_source": {<br /> "domain": "elasticsearch.cn",<br /> "hostname": "localhost",<br /> "status": "200",<br /> "datetime": "2018-11-26T21:54:00.000+08:00",<br /> "count": 26<br /> }<br />

      Conclusion


      在这篇文章中,我们介绍了如何通过Waterdrop将Kafka中的数据写入Elasticsearch中。仅仅通过一个配置文件便可快速运行一个Spark Application,完成数据的处理、写入,无需编写任何代码,十分简单。

      当数据处理过程中有遇到Logstash无法支持的场景或者Logstah性能无法达到预期的情况下,都可以尝试使用Waterdrop解决问题。

      希望了解Waterdrop与Elasticsearch、Kafka、Hadoop结合使用的更多功能和案例,可以直接进入项目主页[https://github.com/InterestingLab/waterdrop](https://github.com/InterestingLab/waterdrop)


      我们近期会再发布一篇《如何用Spark和Elasticsearch做交互式数据分析》,敬请期待.

      Contract us


      欢迎联系我们交流Spark和Elasticsearch:

      Garyelephant: 微信: garyelephant

      RickyHuo: 微信: chodomatte1994

请问es适合类似于百度搜索这种样子吗?

Elasticsearchrochy 回复了问题 • 3 人关注 • 1 个回复 • 2080 次浏览 • 2018-12-08 00:27 • 来自相关话题

Day 7 - Elasticsearch中数据是如何存储的

Adventweizijun 发表了文章 • 7 个评论 • 68790 次浏览 • 2018-12-07 13:55 • 来自相关话题

前言

很多使用Elasticsearch的同学会关心数据存储在ES中的存储容量,会有这样的疑问:xxTB的数据入到ES会使用多少存储空间。这个问题其实很难直接回答的,只有数据写入ES后,才能观察到实际的存储空间。比如同样是1TB的数据,写入ES的存储空间可能差距会非常大,可能小到只有300~400GB,也可能多到6-7TB,为什么会造成这么大的差距呢?究其原因,我们来探究下Elasticsearch中的数据是如何存储。文章中我以Elasticsearch 2.3版本为示例,对应的lucene版本是5.5,Elasticsearch现在已经来到了6.5版本,数字类型、列存等存储结构有些变化,但基本的概念变化不多,文章中的内容依然适用。

Elasticsearch索引结构

Elasticsearch对外提供的是index的概念,可以类比为DB,用户查询是在index上完成的,每个index由若干个shard组成,以此来达到分布式可扩展的能力。比如下图是一个由10个shard组成的index。


elasticsearch_store_arc.png



shard是Elasticsearch数据存储的最小单位,index的存储容量为所有shard的存储容量之和。Elasticsearch集群的存储容量则为所有index存储容量之和。

一个shard就对应了一个lucene的library。对于一个shard,Elasticsearch增加了translog的功能,类似于HBase WAL,是数据写入过程中的中间数据,其余的数据都在lucene库中管理的。

所以Elasticsearch索引使用的存储内容主要取决于lucene中的数据存储。

lucene数据存储

下面我们主要看下lucene的文件内容,在了解lucene文件内容前,大家先了解些lucene的基本概念。

lucene基本概念

  • segment : lucene内部的数据是由一个个segment组成的,写入lucene的数据并不直接落盘,而是先写在内存中,经过了refresh间隔,lucene才将该时间段写入的全部数据refresh成一个segment,segment多了之后会进行merge成更大的segment。lucene查询时会遍历每个segment完成。由于lucene* 写入的数据是在内存中完成,所以写入效率非常高。但是也存在丢失数据的风险,所以Elasticsearch基于此现象实现了translog,只有在segment数据落盘后,Elasticsearch才会删除对应的translog。
  • doc : doc表示lucene中的一条记录
  • field :field表示记录中的字段概念,一个doc由若干个field组成。
  • term :term是lucene中索引的最小单位,某个field对应的内容如果是全文检索类型,会将内容进行分词,分词的结果就是由term组成的。如果是不分词的字段,那么该字段的内容就是一个term。
  • 倒排索引(inverted index): lucene索引的通用叫法,即实现了term到doc list的映射。
  • 正排数据:搜索引擎的通用叫法,即原始数据,可以理解为一个doc list。
  • docvalues :Elasticsearch中的列式存储的名称,Elasticsearch除了存储原始存储、倒排索引,还存储了一份docvalues,用作分析和排序。

    lucene文件内容

    lucene包的文件是由很多segment文件组成的,segments_xxx文件记录了lucene包下面的segment文件数量。每个segment会包含如下的文件。

    | Name | Extension | Brief Description |
    | ------ | ------ | ------ |
    |Segment Info|.si|segment的元数据文件|
    |Compound File|.cfs, .cfe|一个segment包含了如下表的各个文件,为减少打开文件的数量,在segment小的时候,segment的所有文件内容都保存在cfs文件中,cfe文件保存了lucene各文件在cfs文件的位置信息|
    |Fields|.fnm|保存了fields的相关信息|
    |Field Index|.fdx|正排存储文件的元数据信息|
    |Field Data|.fdt|存储了正排存储数据,写入的原文存储在这|
    |Term Dictionary|.tim|倒排索引的元数据信息|
    |Term Index|.tip|倒排索引文件,存储了所有的倒排索引数据|
    |Frequencies|.doc|保存了每个term的doc id列表和term在doc中的词频|
    |Positions|.pos|Stores position information about where a term occurs in the index
    全文索引的字段,会有该文件,保存了term在doc中的位置|
    |Payloads|.pay|Stores additional per-position metadata information such as character offsets and user payloads
    全文索引的字段,使用了一些像payloads的高级特性会有该文件,保存了term在doc中的一些高级特性|
    |Norms|.nvd, .nvm|文件保存索引字段加权数据|
    |Per-Document Values|.dvd, .dvm|lucene的docvalues文件,即数据的列式存储,用作聚合和排序|
    |Term Vector Data|.tvx, .tvd, .tvf|Stores offset into the document data file
    保存索引字段的矢量信息,用在对term进行高亮,计算文本相关性中使用|
    |Live Documents|.liv|记录了segment中删除的doc|

    测试数据示例

    下面我们以真实的数据作为示例,看看lucene中各类型数据的容量占比。

    写100w数据,有一个uuid字段,写入的是长度为36位的uuid,字符串总为3600w字节,约为35M。

    数据使用一个shard,不带副本,使用默认的压缩算法,写入完成后merge成一个segment方便观察。

    使用线上默认的配置,uuid存为不分词的字符串类型。创建如下索引:

    <br /> PUT test_field<br /> {<br /> "settings": {<br /> "index": {<br /> "number_of_shards": "1",<br /> "number_of_replicas": "0",<br /> "refresh_interval": "30s"<br /> }<br /> },<br /> "mappings": {<br /> "type": {<br /> "_all": {<br /> "enabled": false<br /> }, <br /> "properties": {<br /> "uuid": {<br /> "type": "string",<br /> "index": "not_analyzed"<br /> }<br /> }<br /> }<br /> }<br /> }<br />

    首先写入100w不同的uuid,使用磁盘容量细节如下:

    <br /> <br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 122.7mb 122.7mb <br /> <br /> -rw-r--r-- 1 weizijun staff 41M Aug 19 21:23 _8.fdt<br /> -rw-r--r-- 1 weizijun staff 17K Aug 19 21:23 _8.fdx<br /> -rw-r--r-- 1 weizijun staff 688B Aug 19 21:23 _8.fnm<br /> -rw-r--r-- 1 weizijun staff 494B Aug 19 21:23 _8.si<br /> -rw-r--r-- 1 weizijun staff 265K Aug 19 21:23 _8_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 44M Aug 19 21:23 _8_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 340K Aug 19 21:23 _8_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 37M Aug 19 21:23 _8_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 254B Aug 19 21:23 _8_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 195B Aug 19 21:23 segments_2<br /> -rw-r--r-- 1 weizijun staff 0B Aug 19 21:20 write.lock<br />
    可以看到正排数据、倒排索引数据,列存数据容量占比几乎相同,正排数据和倒排数据还会存储Elasticsearch的唯一id字段,所以容量会比列存多一些。

    35M的uuid存入Elasticsearch后,数据膨胀了3倍,达到了122.7mb。Elasticsearch竟然这么消耗资源,不要着急下结论,接下来看另一个测试结果。

    我们写入100w一样的uuid,然后看看Elasticsearch使用的容量。

    <br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 13.2mb 13.2mb <br /> <br /> -rw-r--r-- 1 weizijun staff 5.5M Aug 19 21:29 _6.fdt<br /> -rw-r--r-- 1 weizijun staff 15K Aug 19 21:29 _6.fdx<br /> -rw-r--r-- 1 weizijun staff 688B Aug 19 21:29 _6.fnm<br /> -rw-r--r-- 1 weizijun staff 494B Aug 19 21:29 _6.si<br /> -rw-r--r-- 1 weizijun staff 309K Aug 19 21:29 _6_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 7.0M Aug 19 21:29 _6_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 195K Aug 19 21:29 _6_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 244K Aug 19 21:29 _6_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 252B Aug 19 21:29 _6_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 195B Aug 19 21:29 segments_2<br /> -rw-r--r-- 1 weizijun staff 0B Aug 19 21:26 write.lock<br />

    这回35M的数据Elasticsearch容量只有13.2mb,其中还有主要的占比还是Elasticsearch的唯一id,100w的uuid几乎不占存储容积。

    所以在Elasticsearch中建立索引的字段如果基数越大(count distinct),越占用磁盘空间。

    我们再看看存100w个不一样的整型会是如何。

    <br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 13.6mb 13.6mb <br /> <br /> -rw-r--r-- 1 weizijun staff 6.1M Aug 28 10:19 _42.fdt<br /> -rw-r--r-- 1 weizijun staff 22K Aug 28 10:19 _42.fdx<br /> -rw-r--r-- 1 weizijun staff 688B Aug 28 10:19 _42.fnm<br /> -rw-r--r-- 1 weizijun staff 503B Aug 28 10:19 _42.si<br /> -rw-r--r-- 1 weizijun staff 2.8M Aug 28 10:19 _42_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 2.2M Aug 28 10:19 _42_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 83K Aug 28 10:19 _42_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 2.5M Aug 28 10:19 _42_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 228B Aug 28 10:19 _42_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 196B Aug 28 10:19 segments_2<br /> -rw-r--r-- 1 weizijun staff 0B Aug 28 10:16 write.lock<br />

    从结果可以看到,100w整型数据,Elasticsearch的存储开销为13.6mb。如果以int型计算100w数据的长度的话,为400w字节,大概是3.8mb数据。忽略Elasticsearch唯一id字段的影响,Elasticsearch实际存储容量跟整型数据长度差不多。

    我们再看一下开启最佳压缩参数对存储空间的影响:

    <br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 107.2mb 107.2mb <br /> <br /> -rw-r--r-- 1 weizijun staff 25M Aug 20 12:30 _5.fdt<br /> -rw-r--r-- 1 weizijun staff 6.0K Aug 20 12:30 _5.fdx<br /> -rw-r--r-- 1 weizijun staff 688B Aug 20 12:31 _5.fnm<br /> -rw-r--r-- 1 weizijun staff 500B Aug 20 12:31 _5.si<br /> -rw-r--r-- 1 weizijun staff 265K Aug 20 12:31 _5_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 44M Aug 20 12:31 _5_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 322K Aug 20 12:31 _5_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 37M Aug 20 12:31 _5_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 254B Aug 20 12:31 _5_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 224B Aug 20 12:31 segments_4<br /> -rw-r--r-- 1 weizijun staff 0B Aug 20 12:00 write.lock<br />

    结果中可以发现,只有正排数据会启动压缩,压缩能力确实强劲,不考虑唯一id字段,存储容量大概压缩到接近50%。

    我们还做了一些实验,Elasticsearch默认是开启_all参数的,_all可以让用户传入的整体json数据作为全文检索的字段,可以更方便的检索,但在现实场景中已经使用的不多,相反会增加很多存储容量的开销,可以看下开启_all的磁盘空间使用情况:

    <br /> <br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 162.4mb 162.4mb <br /> <br /> -rw-r--r-- 1 weizijun staff 41M Aug 18 22:59 _20.fdt<br /> -rw-r--r-- 1 weizijun staff 18K Aug 18 22:59 _20.fdx<br /> -rw-r--r-- 1 weizijun staff 777B Aug 18 22:59 _20.fnm<br /> -rw-r--r-- 1 weizijun staff 59B Aug 18 22:59 _20.nvd<br /> -rw-r--r-- 1 weizijun staff 78B Aug 18 22:59 _20.nvm<br /> -rw-r--r-- 1 weizijun staff 539B Aug 18 22:59 _20.si<br /> -rw-r--r-- 1 weizijun staff 7.2M Aug 18 22:59 _20_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 4.2M Aug 18 22:59 _20_Lucene50_0.pos<br /> -rw-r--r-- 1 weizijun staff 73M Aug 18 22:59 _20_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 832K Aug 18 22:59 _20_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 37M Aug 18 22:59 _20_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 254B Aug 18 22:59 _20_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 196B Aug 18 22:59 segments_2<br /> -rw-r--r-- 1 weizijun staff 0B Aug 18 22:53 write.lock<br /> <br />

    开启_all比不开启多了40mb的存储空间,多的数据都在倒排索引上,大约会增加30%多的存储开销。所以线上都直接禁用。

    然后我还做了其他几个尝试,为了验证存储容量是否和数据量成正比,写入1000w数据的uuid,发现存储容量基本为100w数据的10倍。我还验证了数据长度是否和数据量成正比,发现把uuid增长2倍、4倍,存储容量也响应的增加了2倍和4倍。在此就不一一列出数据了。


    lucene各文件具体内容和实现

    lucene数据元信息文件

    文件名为:segments_xxx

    该文件为lucene数据文件的元信息文件,记录所有segment的元数据信息。

    该文件主要记录了目前有多少segment,每个segment有一些基本信息,更新这些信息定位到每个segment的元信息文件。

    lucene元信息文件还支持记录userData,Elasticsearch可以在此记录translog的一些相关信息。

    文件示例


    elasticsearch_store_segments.png




    具体实现类


    ```
    public final class SegmentInfos implements Cloneable, Iterable {
    // generation是segment的版本的概念,从文件名中提取出来,实例中为:2t/101
    private long generation; // generation of the "segments_N" for the next commit

    private long lastGeneration; // generation of the "segments_N" file we last successfully read
    // or wrote; this is normally the same as generation except if
    // there was an IOException that had interrupted a commit

    / Id for this commit; only written starting with Lucene 5.0 */
    private byte[] id;


    /* Which Lucene version wrote this commit, or null if this commit is pre-5.3. /
    private Version luceneVersion;

    /
    Counts how often the index has been changed. */
    public long version;

    / Used to name new segments. */
    // TODO: should this be a long ...?
    public int counter;

    /* Version of the oldest segment in the index, or null if there are no segments. /
    private Version minSegmentLuceneVersion;

    private List segments = new ArrayList<>();

    /
    Opaque Map<String, String> that user can specify during IndexWriter.commit */
    public Map<String,String> userData = Collections.emptyMap();
    }

    /** Embeds a [read-only] SegmentInfo and adds per-commit

    • fields.
      *
    • @lucene.experimental */
      public class SegmentCommitInfo {

      /* The {@link SegmentInfo} that we wrap. /
      public final SegmentInfo info;

      // How many deleted docs in the segment:
      private int delCount;

      // Generation number of the live docs file (-1 if there
      // are no deletes yet):
      private long delGen;

      // Normally 1+delGen, unless an exception was hit on last
      // attempt to write:
      private long nextWriteDelGen;

      // Generation number of the FieldInfos (-1 if there are no updates)
      private long fieldInfosGen;

      // Normally 1+fieldInfosGen, unless an exception was hit on last attempt to
      // write
      private long nextWriteFieldInfosGen; //fieldInfosGen == -1 ? 1 : fieldInfosGen + 1;

      // Generation number of the DocValues (-1 if there are no updates)
      private long docValuesGen;

      // Normally 1+dvGen, unless an exception was hit on last attempt to
      // write
      private long nextWriteDocValuesGen; //docValuesGen == -1 ? 1 : docValuesGen + 1;

      // TODO should we add .files() to FieldInfosFormat, like we have on
      // LiveDocsFormat?
      // track the fieldInfos update files
      private final Set fieldInfosFiles = new HashSet<>();

      // Track the per-field DocValues update files
      private final Map<Integer,Set> dvUpdatesFiles = new HashMap<>();

      // Track the per-generation updates files
      @Deprecated
      private final Map<Long,Set> genUpdatesFiles = new HashMap<>();

      private volatile long sizeInBytes = -1;
      }

      ```

      segment的元信息文件

      文件后缀:.si

      每个segment都有一个.si文件,记录了该segment的元信息。

      segment元信息文件中记录了segment的文档数量,segment对应的文件列表等信息。

      文件示例


      elasticsearch_store_si.png




      具体实现类


      ```
      /**

    • Information about a segment such as its name, directory, and files related
    • to the segment.
      *
    • @lucene.experimental
      */
      public final class SegmentInfo {

      // _bl
      public final String name;

      /* Where this segment resides. /
      public final Directory dir;

      /* Id that uniquely identifies this segment. /
      private final byte[] id;

      private Codec codec;

      // Tracks the Lucene version this segment was created with, since 3.1. Null
      // indicates an older than 3.0 index, and it's used to detect a too old index.
      // The format expected is "x.y" - "2.x" for pre-3.0 indexes (or null), and
      // specific versions afterwards ("3.0.0", "3.1.0" etc.).
      // see o.a.l.util.Version.
      private Version version;

      private int maxDoc; // number of docs in seg

      private boolean isCompoundFile;


      private Map<String,String> diagnostics;

      private Set setFiles;

      private final Map<String,String> attributes;
      }
      ```

      fields信息文件

      文件后缀:.fnm

      该文件存储了fields的基本信息。

      fields信息中包括field的数量,field的类型,以及IndexOpetions,包括是否存储、是否索引,是否分词,是否需要列存等等。

      文件示例


      elasticsearch_store_fnm.png




      具体实现类


      ```
      /**

    • Access to the Field Info file that describes document fields and whether or
    • not they are indexed. Each segment has a separate Field Info file. Objects
    • of this class are thread-safe for multiple readers, but only one thread can
    • be adding documents at a time, with no other reader or writer threads
    • accessing this object.
      /
      public final class FieldInfo {
      /
      Field's name */
      public final String name;

      /* Internal field number /
      //field在内部的编号
      public final int number;

      //field docvalues的类型
      private DocValuesType docValuesType = DocValuesType.NONE;

      // True if any document indexed term vectors
      private boolean storeTermVector;

      private boolean omitNorms; // omit norms associated with indexed fields

      //index的配置项
      private IndexOptions indexOptions = IndexOptions.NONE;

      private boolean storePayloads; // whether this field stores payloads together with term positions

      private final Map<String,String> attributes;

      // docvalues的generation
      private long dvGen;
      }
      ```

      数据存储文件

      文件后缀:.fdx, .fdt

      索引文件为.fdx,数据文件为.fdt,数据存储文件功能为根据自动的文档id,得到文档的内容,搜索引擎的术语习惯称之为正排数据,即doc_id -> content,es的_source数据就存在这

      索引文件记录了快速定位文档数据的索引信息,数据文件记录了所有文档id的具体内容。

      文件示例


      elasticsearch_store_fdt.png




      具体实现类

      ```
      /**

    • Random-access reader for {@link CompressingStoredFieldsIndexWriter}.
    • @lucene.internal
      */
      public final class CompressingStoredFieldsIndexReader implements Cloneable, Accountable {
      private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(CompressingStoredFieldsIndexReader.class);

      final int maxDoc;

      //docid索引,快速定位某个docid的数组坐标
      final int[] docBases;

      //快速定位某个docid所在的文件offset的startPointer
      final long[] startPointers;

      //平均一个chunk的文档数
      final int[] avgChunkDocs;

      //平均一个chunk的size
      final long[] avgChunkSizes;

      final PackedInts.Reader[] docBasesDeltas; // delta from the avg

      final PackedInts.Reader[] startPointersDeltas; // delta from the avg
      }

      /**
    • {@link StoredFieldsReader} impl for {@link CompressingStoredFieldsFormat}.
    • @lucene.experimental
      */
      public final class CompressingStoredFieldsReader extends StoredFieldsReader {

      //从fdt正排索引文件中获得
      private final int version;

      // field的基本信息
      private final FieldInfos fieldInfos;

      //fdt正排索引文件reader
      private final CompressingStoredFieldsIndexReader indexReader;

      //从fdt正排索引文件中获得,用于指向fdx数据文件的末端,指向numChunks地址4
      private final long maxPointer;

      //fdx正排数据文件句柄
      private final IndexInput fieldsStream;

      //块大小
      private final int chunkSize;

      private final int packedIntsVersion;

      //压缩类型
      private final CompressionMode compressionMode;

      //解压缩处理对象
      private final Decompressor decompressor;

      //文档数量,从segment元数据中获得
      private final int numDocs;

      //是否正在merge,默认为false
      private final boolean merging;

      //初始化时new了一个BlockState,BlockState记录下当前正排文件读取的状态信息
      private final BlockState state;
      //chunk的数量
      private final long numChunks; // number of compressed blocks written

      //dirty chunk的数量
      private final long numDirtyChunks; // number of incomplete compressed blocks written

      //是否close,默认为false
      private boolean closed;
      }
      ```

      倒排索引文件

      索引后缀:.tip,.tim

      倒排索引也包含索引文件和数据文件,.tip为索引文件,.tim为数据文件,索引文件包含了每个字段的索引元信息,数据文件有具体的索引内容。

      5.5.0版本的倒排索引实现为FST tree,FST tree的最大优势就是内存空间占用非常低 ,具体可以参看下这篇文章:[http://www.cnblogs.com/bonelee/p/6226185.html ](http://www.cnblogs.com/bonelee/p/6226185.html )

      [http://examples.mikemccandless ... %2Bit](http://examples.mikemccandless ... d%2Bit) 为FST图实例,可以根据输入的数据构造出FST图

      <br /> 输入到 FST 中的数据为:<br /> String inputValues[] = {"mop","moth","pop","star","stop","top"};<br /> long outputValues[] = {0,1,2,3,4,5};<br />
      生成的 FST 图为:

      elasticsearch_store_tip1.png



      elasticsearch_store_tip2.png



      文件示例


      elasticsearch_store_tip3.png




      具体实现类

      ```
      public final class BlockTreeTermsReader extends FieldsProducer {
      // Open input to the main terms dict file (_X.tib)
      final IndexInput termsIn;
      // Reads the terms dict entries, to gather state to
      // produce DocsEnum on demand
      final PostingsReaderBase postingsReader;
      private final TreeMap<String,FieldReader> fields = new TreeMap<>();

      / File offset where the directory starts in the terms file. */
      /索引数据文件tim的数据的尾部的元数据的地址
      private long dirOffset;
      /* File offset where the directory starts in the index file. /

      //索引文件tip的数据的尾部的元数据的地址
      private long indexDirOffset;

      //semgent的名称
      final String segment;

      //版本号
      final int version;

      //5.3.x index, we record up front if we may have written any auto-prefix terms,示例中记录的是false
      final boolean anyAutoPrefixTerms;
      }

      /

    • BlockTree's implementation of {@link Terms}.
    • @lucene.internal
      */
      public final class FieldReader extends Terms implements Accountable {

      //term的数量
      final long numTerms;

      //field信息
      final FieldInfo fieldInfo;

      final long sumTotalTermFreq;

      //总的文档频率
      final long sumDocFreq;

      //文档数量
      final int docCount;

      //字段在索引文件tip中的起始位置
      final long indexStartFP;

      final long rootBlockFP;

      final BytesRef rootCode;

      final BytesRef minTerm;

      final BytesRef maxTerm;

      //longs:metadata buffer, holding monotonic values
      final int longsSize;

      final BlockTreeTermsReader parent;

      final FST index;
      }
      ```

      倒排链文件

      文件后缀:.doc, .pos, .pay

      .doc保存了每个term的doc id列表和term在doc中的词频

      全文索引的字段,会有.pos文件,保存了term在doc中的位置

      全文索引的字段,使用了一些像payloads的高级特性才会有.pay文件,保存了term在doc中的一些高级特性

      文件示例


      elasticsearch_store_doc.png




      具体实现类

      ```
      /**

    • Concrete class that reads docId(maybe frq,pos,offset,payloads) list
    • with postings format.
      *
    • @lucene.experimental
      */
      public final class Lucene50PostingsReader extends PostingsReaderBase {
      private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Lucene50PostingsReader.class);
      private final IndexInput docIn;
      private final IndexInput posIn;
      private final IndexInput payIn;
      final ForUtil forUtil;
      private int version;

      //不分词的字段使用的是该对象,基于skiplist实现了倒排链
      final class BlockDocsEnum extends PostingsEnum {
      }

      //全文检索字段使用的是该对象
      final class BlockPostingsEnum extends PostingsEnum {
      }

      //包含高级特性的字段使用的是该对象
      final class EverythingEnum extends PostingsEnum {
      }
      }
      ```

      列存文件(docvalues)

      文件后缀:.dvm, .dvd

      索引文件为.dvm,数据文件为.dvd。

      lucene实现的docvalues有如下类型:

  • 1、NONE 不开启docvalue时的状态
  • 2、NUMERIC 单个数值类型的docvalue主要包括(int,long,float,double)
  • 3、BINARY 二进制类型值对应不同的codes最大值可能超过32766字节,
  • 4、SORTED 有序增量字节存储,仅仅存储不同部分的值和偏移量指针,值必须小于等于32766字节
  • 5、SORTED_NUMERIC 存储数值类型的有序数组列表
  • 6、SORTED_SET 可以存储多值域的docvalue值,但返回时,仅仅只能返回多值域的第一个docvalue
  • 7、对应not_anaylized的string字段,使用的是SORTED_SET类型,number的类型是SORTED_NUMERIC类型

    其中SORTED_SET 的 SORTED_SINGLE_VALUED类型包括了两类数据 : binary + numeric, binary是按ord排序的term的列表,numeric是doc到ord的映射。

    文件示例


    elasticsearch_store_dvd.png




    具体实现类

    <br /> /** reader for {@link Lucene54DocValuesFormat} */<br /> final class Lucene54DocValuesProducer extends DocValuesProducer implements Closeable {<br /> //number类型的field的列存列表<br /> private final Map<String,NumericEntry> numerics = new HashMap<>();<br /> <br /> //字符串类型的field的列存列表<br /> private final Map<String,BinaryEntry> binaries = new HashMap<>();<br /> <br /> //有序字符串类型的field的列存列表<br /> private final Map<String,SortedSetEntry> sortedSets = new HashMap<>();<br /> <br /> //有序number类型的field的列存列表<br /> private final Map<String,SortedSetEntry> sortedNumerics = new HashMap<>();<br /> <br /> //字符串类型的field的ords列表<br /> private final Map<String,NumericEntry> ords = new HashMap<>();<br /> <br /> //docId -> address -> ord 中field的ords列表<br /> private final Map<String,NumericEntry> ordIndexes = new HashMap<>();<br /> <br /> //field的数量<br /> private final int numFields;<br /> <br /> //内存使用量<br /> private final AtomicLong ramBytesUsed;<br /> <br /> //数据源的文件句柄<br /> private final IndexInput data;<br /> <br /> //文档数<br /> private final int maxDoc;<br /> // memory-resident structures<br /> private final Map<String,MonotonicBlockPackedReader> addressInstances = new HashMap<>();<br /> private final Map<String,ReverseTermsIndex> reverseIndexInstances = new HashMap<>();<br /> private final Map<String,DirectMonotonicReader.Meta> directAddressesMeta = new HashMap<>();<br /> <br /> //是否正在merge<br /> private final boolean merging;<br /> }<br /> <br /> /** metadata entry for a numeric docvalues field */<br /> static class NumericEntry {<br /> private NumericEntry() {}<br /> /** offset to the bitset representing docsWithField, or -1 if no documents have missing values */<br /> long missingOffset;<br /> <br /> /** offset to the actual numeric values */<br /> //field的在数据文件中的起始地址<br /> public long offset;<br /> <br /> /** end offset to the actual numeric values */<br /> //field的在数据文件中的结尾地址<br /> public long endOffset;<br /> <br /> /** bits per value used to pack the numeric values */<br /> public int bitsPerValue;<br /> <br /> //format类型<br /> int format;<br /> /** count of values written */<br /> public long count;<br /> /** monotonic meta */<br /> public DirectMonotonicReader.Meta monotonicMeta;<br /> <br /> //最小的value<br /> long minValue;<br /> <br /> //Compressed by computing the GCD<br /> long gcd;<br /> <br /> //Compressed by giving IDs to unique values.<br /> long table[];<br /> /** for sparse compression */<br /> long numDocsWithValue;<br /> NumericEntry nonMissingValues;<br /> NumberType numberType;<br /> }<br /> <br /> /** metadata entry for a binary docvalues field */<br /> static class BinaryEntry {<br /> private BinaryEntry() {}<br /> /** offset to the bitset representing docsWithField, or -1 if no documents have missing values */<br /> long missingOffset;<br /> /** offset to the actual binary values */<br /> //field的在数据文件中的起始地址<br /> long offset;<br /> int format;<br /> /** count of values written */<br /> public long count;<br /> <br /> //最短字符串的长度<br /> int minLength;<br /> <br /> //最长字符串的长度<br /> int maxLength;<br /> /** offset to the addressing data that maps a value to its slice of the byte[] */<br /> public long addressesOffset, addressesEndOffset;<br /> /** meta data for addresses */<br /> public DirectMonotonicReader.Meta addressesMeta;<br /> /** offset to the reverse index */<br /> public long reverseIndexOffset;<br /> /** packed ints version used to encode addressing information */<br /> public int packedIntsVersion;<br /> /** packed ints blocksize */<br /> public int blockSize;<br /> }<br />

    参考资料


    [lucene source code](https://github.com/apache/luce ... /5.5.0)

    [lucene document](https://lucene.apache.org/core/5_5_0/)

    [lucene字典实现原理——FST](http://www.cnblogs.com/bonelee/p/6226185.html )

如何控制日志导入ElasticSearch之后的体积?

Elasticsearchwajika 回复了问题 • 7 人关注 • 5 个回复 • 2492 次浏览 • 2018-12-07 10:27 • 来自相关话题

社区日报 第471期 (2018-12-07)

社区日报laoyang360 发表了文章 • 0 个评论 • 1349 次浏览 • 2018-12-07 08:41 • 来自相关话题

1、使用Elasticsearch作为主数据存储实践
http://t.cn/EyS6wcL
2、Elasticsearch 6.x启动过程
http://t.cn/EyJgQ7U
3、Elasticsearch你知道多少?
http://t.cn/EyS6I4P

编辑:铭毅天下
归档: https://elasticsearch.cn/article/6177
订阅:https://tinyletter.com/elastic-daily