使用 dmesg 来查看一些硬件或驱动程序的信息或问题。

filebeat -> logstash 失败的问题

Beatsmedcl 回复了问题 • 3 人关注 • 2 个回复 • 13707 次浏览 • 2018-12-10 15:52 • 来自相关话题

bulk写入数据时,READ非常高

ElasticsearchJudge 回复了问题 • 4 人关注 • 2 个回复 • 3168 次浏览 • 2019-03-11 16:12 • 来自相关话题

社区日报 第472期 (2018-12-08)

社区日报bsll 发表了文章 • 0 个评论 • 1590 次浏览 • 2018-12-08 11:23 • 来自相关话题

  1. 用于ES数据警报的实时守护进程。
    [http://t.cn/EyQmiuE](http://t.cn/EyQmiuE)

  2. 最小的ibana Docker镜像。
    [http://t.cn/EyQbwGk](http://t.cn/EyQbwGk)

  3. Lucene列式存储格式DocValues详解。
    [http://t.cn/EyQ6pkK](http://t.cn/EyQ6pkK)



Day 8 - 如何使用Spark快速将数据写入Elasticsearch

AdventRicky Huo 发表了文章 • 0 个评论 • 11580 次浏览 • 2018-12-08 09:58 • 来自相关话题

如何使用Spark快速将数据写入Elasticsearch



说到数据写入Elasticsearch,最先想到的肯定是Logstash。Logstash因为其简单上手、可扩展、可伸缩等优点被广大用户接受。但是尺有所短,寸有所长,Logstash肯定也有它无法适用的应用场景,比如:

  • 海量数据ETL
  • 海量数据聚合
  • 多源数据处理

    为了满足这些场景,很多同学都会选择Spark,借助Spark算子进行数据处理,最后将处理结果写入Elasticsearch。

    我们部门之前利用Spark对Nginx日志进行分析,统计我们的Web服务访问情况,将Nginx日志每分钟聚合一次最后将结果写入Elasticsearch,然后利用Kibana配置实时监控Dashboard。Elasticsearch和Kibana都很方便、实用,但是随着类似需求越来越多,如何快速通过Spark将数据写入Elasticsearch成为了我们的一大问题。

    今天给大家推荐一款能够实现数据快速写入的黑科技——[Waterdrop](https://github.com/InterestingLab/waterdrop),一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上,简单易用,灵活配置,无需开发。

    wd.png




    Kafka to Elasticsearch


    和Logstash一样,Waterdrop同样支持多种类型的数据输入,这里我们以最常见的Kakfa作为输入源为例,讲解如何使用Waterdrop将数据快速写入Elasticsearch

    Log Sample


    原始日志格式如下:
    <br /> 127.0.0.1 elasticsearch.cn 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:21:54:32 +0800] "GET /article HTTP/1.1" 200 123 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)"<br />

    Elasticsearch Document


    我们想要统计,一分钟每个域名的访问情况,聚合完的数据有以下字段:
    <br /> domain String<br /> hostname String<br /> status int<br /> datetime String<br /> count int<br />

    Waterdrop with Elasticsearch


    接下来会给大家详细介绍,我们如何通过Waterdrop读取Kafka中的数据,对数据进行解析以及聚合,最后将处理结果写入Elasticsearch中。

    Waterdrop


    [Waterdrop](https://github.com/InterestingLab/waterdrop)同样拥有着非常丰富的插件,支持从Kafka、HDFS、Hive中读取数据,进行各种各样的数据处理,并将结果写入Elasticsearch、Kudu或者Kafka中。

    Prerequisites


    首先我们需要安装Waterdrop,安装十分简单,无需配置系统环境变量

    1. 准备Spark环境
    2. 安装Waterdrop
    3. 配置Waterdrop

      以下是简易步骤,具体安装可以参照[Quick Start](https://interestinglab.github. ... -start)

      ```yaml
      cd /usr/local
      wget https://archive.apache.org/dis ... 7.tgz
      tar -xvf https://archive.apache.org/dis ... 7.tgz
      wget https://github.com/Interesting ... 1.zip
      unzip waterdrop-1.1.1.zip
      cd waterdrop-1.1.1

      vim config/waterdrop-env.sh

      指定Spark安装路径

      SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}
      ```

      Waterdrop Pipeline


      与Logstash一样,我们仅需要编写一个Waterdrop Pipeline的配置文件即可完成数据的导入,相信了解Logstash的朋友可以很快入手Waterdrop配置。

      配置文件包括四个部分,分别是Spark、Input、filter和Output。

      Spark



      这一部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。
      <br /> spark {<br /> spark.app.name = "Waterdrop"<br /> spark.executor.instances = 2<br /> spark.executor.cores = 1<br /> spark.executor.memory = "1g"<br /> }<br />

      Input


      这一部分定义数据源,如下是从Kafka中读取数据的配置案例,

      <br /> kafkaStream {<br /> topics = "waterdrop-es"<br /> consumer.bootstrap.servers = "localhost:9092"<br /> consumer.group.id = "waterdrop_es_group"<br /> consumer.rebalance.max.retries = 100<br /> }<br />

      Filter


      在Filter部分,这里我们配置一系列的转化,包括正则解析将日志进行拆分、时间转换将HTTPDATE转化为Elasticsearch支持的日期格式、对Number类型的字段进行类型转换以及通过SQL进行数据聚合
      ```yaml
      filter {

      使用正则解析原始日志

      最开始数据都在raw_message字段中

      grok {
      source_field = "raw_message"
      pattern = '%{NOTSPACE:hostname}\s%{NOTSPACE:domain}\s%{IP:remote_addr}\s%{NUMBER:request_time}s\s\"%{DATA:upstream_ip}\"\s\[%{HTTPDATE:timestamp}\]\s\"%{NOTSPACE:method}\s%{DATA:url}\s%{NOTSPACE:http_ver}\"\s%{NUMBER:status}\s%{NUMBER:body_bytes_send}\s%{DATA:referer}\s%{NOTSPACE:cookie_info}\s\"%{DATA:user_agent}'
      }

      将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为

      Elasticsearch中支持的格式

      date {
      source_field = "timestamp"
      target_field = "datetime"
      source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
      target_time_format = "yyyy-MM-dd'T'HH:mm:ss.SSS+08:00"
      }

      利用SQL对数据进行聚合

      sql {
      table_name = "access_log"
      sql = "select domain, hostname, int(status), datetime, count(*) from access_log group by domain, hostname, status, datetime"
      }
      }
      ```

      Output

      最后我们将处理好的结构化数据写入Elasticsearch。

      yaml<br /> output {<br /> elasticsearch {<br /> hosts = ["localhost:9200"]<br /> index = "waterdrop-${now}"<br /> es.batch.size.entries = 100000<br /> index_time_format = "yyyy.MM.dd"<br /> }<br /> }<br />

      Running Waterdrop


      我们将上述四部分配置组合成为我们的配置文件config/batch.conf

      vim config/batch.conf

      ```
      spark {
      spark.app.name = "Waterdrop"
      spark.executor.instances = 2
      spark.executor.cores = 1
      spark.executor.memory = "1g"
      }
      input {
      kafkaStream {
      topics = "waterdrop-es"
      consumer.bootstrap.servers = "localhost:9092"
      consumer.group.id = "waterdrop_es_group"
      consumer.rebalance.max.retries = 100
      }
      }
      filter {

      使用正则解析原始日志

      最开始数据都在raw_message字段中

      grok {
      source_field = "raw_message"
      pattern = '%{IP:hostname}\s%{NOTSPACE:domain}\s%{IP:remote_addr}\s%{NUMBER:request_time}s\s\"%{DATA:upstream_ip}\"\s\[%{HTTPDATE:timestamp}\]\s\"%{NOTSPACE:method}\s%{DATA:url}\s%{NOTSPACE:http_ver}\"\s%{NUMBER:status}\s%{NUMBER:body_bytes_send}\s%{DATA:referer}\s%{NOTSPACE:cookie_info}\s\"%{DATA:user_agent}'
      }

      将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为

      Elasticsearch中支持的格式

      date {
      source_field = "timestamp"
      target_field = "datetime"
      source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
      target_time_format = "yyyy-MM-dd'T'HH:mm:00.SSS+08:00"
      }

      利用SQL对数据进行聚合

      sql {
      table_name = "access_log"
      sql = "select domain, hostname, status, datetime, count(*) from access_log group by domain, localhost, status, datetime"
      }
      }
      output {
      elasticsearch {
      hosts = ["localhost:9200"]
      index = "waterdrop-${now}"
      es.batch.size.entries = 100000
      index_timeformat = "yyyy.MM.dd"
      }
      }
      ```

      执行命令,指定配置文件,运行Waterdrop,即可将数据写入Elasticsearch。这里我们以本地模式为例。

      ./bin/start-waterdrop.sh --config config/batch.conf -e client -m 'local[2]'

      最后,写入Elasticsearch中的数据如下,再配上Kibana就可以实现Web服务的实时监控了^
      ^.

      <br /> "_source": {<br /> "domain": "elasticsearch.cn",<br /> "hostname": "localhost",<br /> "status": "200",<br /> "datetime": "2018-11-26T21:54:00.000+08:00",<br /> "count": 26<br /> }<br />

      Conclusion


      在这篇文章中,我们介绍了如何通过Waterdrop将Kafka中的数据写入Elasticsearch中。仅仅通过一个配置文件便可快速运行一个Spark Application,完成数据的处理、写入,无需编写任何代码,十分简单。

      当数据处理过程中有遇到Logstash无法支持的场景或者Logstah性能无法达到预期的情况下,都可以尝试使用Waterdrop解决问题。

      希望了解Waterdrop与Elasticsearch、Kafka、Hadoop结合使用的更多功能和案例,可以直接进入项目主页[https://github.com/InterestingLab/waterdrop](https://github.com/InterestingLab/waterdrop)


      我们近期会再发布一篇《如何用Spark和Elasticsearch做交互式数据分析》,敬请期待.

      Contract us


      欢迎联系我们交流Spark和Elasticsearch:

      Garyelephant: 微信: garyelephant

      RickyHuo: 微信: chodomatte1994

es每次get都把内存吃爆了。。。

Elasticsearchhuigy 回复了问题 • 7 人关注 • 7 个回复 • 4767 次浏览 • 2018-12-15 15:05 • 来自相关话题

kibana 作图 advanced 里的json input怎么使用

Kibanamedcl 回复了问题 • 2 人关注 • 1 个回复 • 9654 次浏览 • 2018-12-10 15:47 • 来自相关话题

Day 7 - Elasticsearch中数据是如何存储的

Adventweizijun 发表了文章 • 7 个评论 • 75179 次浏览 • 2018-12-07 13:55 • 来自相关话题

前言

很多使用Elasticsearch的同学会关心数据存储在ES中的存储容量,会有这样的疑问:xxTB的数据入到ES会使用多少存储空间。这个问题其实很难直接回答的,只有数据写入ES后,才能观察到实际的存储空间。比如同样是1TB的数据,写入ES的存储空间可能差距会非常大,可能小到只有300~400GB,也可能多到6-7TB,为什么会造成这么大的差距呢?究其原因,我们来探究下Elasticsearch中的数据是如何存储。文章中我以Elasticsearch 2.3版本为示例,对应的lucene版本是5.5,Elasticsearch现在已经来到了6.5版本,数字类型、列存等存储结构有些变化,但基本的概念变化不多,文章中的内容依然适用。

Elasticsearch索引结构

Elasticsearch对外提供的是index的概念,可以类比为DB,用户查询是在index上完成的,每个index由若干个shard组成,以此来达到分布式可扩展的能力。比如下图是一个由10个shard组成的index。


elasticsearch_store_arc.png



shard是Elasticsearch数据存储的最小单位,index的存储容量为所有shard的存储容量之和。Elasticsearch集群的存储容量则为所有index存储容量之和。

一个shard就对应了一个lucene的library。对于一个shard,Elasticsearch增加了translog的功能,类似于HBase WAL,是数据写入过程中的中间数据,其余的数据都在lucene库中管理的。

所以Elasticsearch索引使用的存储内容主要取决于lucene中的数据存储。

lucene数据存储

下面我们主要看下lucene的文件内容,在了解lucene文件内容前,大家先了解些lucene的基本概念。

lucene基本概念

  • segment : lucene内部的数据是由一个个segment组成的,写入lucene的数据并不直接落盘,而是先写在内存中,经过了refresh间隔,lucene才将该时间段写入的全部数据refresh成一个segment,segment多了之后会进行merge成更大的segment。lucene查询时会遍历每个segment完成。由于lucene* 写入的数据是在内存中完成,所以写入效率非常高。但是也存在丢失数据的风险,所以Elasticsearch基于此现象实现了translog,只有在segment数据落盘后,Elasticsearch才会删除对应的translog。
  • doc : doc表示lucene中的一条记录
  • field :field表示记录中的字段概念,一个doc由若干个field组成。
  • term :term是lucene中索引的最小单位,某个field对应的内容如果是全文检索类型,会将内容进行分词,分词的结果就是由term组成的。如果是不分词的字段,那么该字段的内容就是一个term。
  • 倒排索引(inverted index): lucene索引的通用叫法,即实现了term到doc list的映射。
  • 正排数据:搜索引擎的通用叫法,即原始数据,可以理解为一个doc list。
  • docvalues :Elasticsearch中的列式存储的名称,Elasticsearch除了存储原始存储、倒排索引,还存储了一份docvalues,用作分析和排序。

    lucene文件内容

    lucene包的文件是由很多segment文件组成的,segments_xxx文件记录了lucene包下面的segment文件数量。每个segment会包含如下的文件。

    | Name | Extension | Brief Description |
    | ------ | ------ | ------ |
    |Segment Info|.si|segment的元数据文件|
    |Compound File|.cfs, .cfe|一个segment包含了如下表的各个文件,为减少打开文件的数量,在segment小的时候,segment的所有文件内容都保存在cfs文件中,cfe文件保存了lucene各文件在cfs文件的位置信息|
    |Fields|.fnm|保存了fields的相关信息|
    |Field Index|.fdx|正排存储文件的元数据信息|
    |Field Data|.fdt|存储了正排存储数据,写入的原文存储在这|
    |Term Dictionary|.tim|倒排索引的元数据信息|
    |Term Index|.tip|倒排索引文件,存储了所有的倒排索引数据|
    |Frequencies|.doc|保存了每个term的doc id列表和term在doc中的词频|
    |Positions|.pos|Stores position information about where a term occurs in the index
    全文索引的字段,会有该文件,保存了term在doc中的位置|
    |Payloads|.pay|Stores additional per-position metadata information such as character offsets and user payloads
    全文索引的字段,使用了一些像payloads的高级特性会有该文件,保存了term在doc中的一些高级特性|
    |Norms|.nvd, .nvm|文件保存索引字段加权数据|
    |Per-Document Values|.dvd, .dvm|lucene的docvalues文件,即数据的列式存储,用作聚合和排序|
    |Term Vector Data|.tvx, .tvd, .tvf|Stores offset into the document data file
    保存索引字段的矢量信息,用在对term进行高亮,计算文本相关性中使用|
    |Live Documents|.liv|记录了segment中删除的doc|

    测试数据示例

    下面我们以真实的数据作为示例,看看lucene中各类型数据的容量占比。

    写100w数据,有一个uuid字段,写入的是长度为36位的uuid,字符串总为3600w字节,约为35M。

    数据使用一个shard,不带副本,使用默认的压缩算法,写入完成后merge成一个segment方便观察。

    使用线上默认的配置,uuid存为不分词的字符串类型。创建如下索引:

    <br /> PUT test_field<br /> {<br /> "settings": {<br /> "index": {<br /> "number_of_shards": "1",<br /> "number_of_replicas": "0",<br /> "refresh_interval": "30s"<br /> }<br /> },<br /> "mappings": {<br /> "type": {<br /> "_all": {<br /> "enabled": false<br /> }, <br /> "properties": {<br /> "uuid": {<br /> "type": "string",<br /> "index": "not_analyzed"<br /> }<br /> }<br /> }<br /> }<br /> }<br />

    首先写入100w不同的uuid,使用磁盘容量细节如下:

    <br /> <br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 122.7mb 122.7mb <br /> <br /> -rw-r--r-- 1 weizijun staff 41M Aug 19 21:23 _8.fdt<br /> -rw-r--r-- 1 weizijun staff 17K Aug 19 21:23 _8.fdx<br /> -rw-r--r-- 1 weizijun staff 688B Aug 19 21:23 _8.fnm<br /> -rw-r--r-- 1 weizijun staff 494B Aug 19 21:23 _8.si<br /> -rw-r--r-- 1 weizijun staff 265K Aug 19 21:23 _8_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 44M Aug 19 21:23 _8_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 340K Aug 19 21:23 _8_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 37M Aug 19 21:23 _8_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 254B Aug 19 21:23 _8_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 195B Aug 19 21:23 segments_2<br /> -rw-r--r-- 1 weizijun staff 0B Aug 19 21:20 write.lock<br />
    可以看到正排数据、倒排索引数据,列存数据容量占比几乎相同,正排数据和倒排数据还会存储Elasticsearch的唯一id字段,所以容量会比列存多一些。

    35M的uuid存入Elasticsearch后,数据膨胀了3倍,达到了122.7mb。Elasticsearch竟然这么消耗资源,不要着急下结论,接下来看另一个测试结果。

    我们写入100w一样的uuid,然后看看Elasticsearch使用的容量。

    <br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 13.2mb 13.2mb <br /> <br /> -rw-r--r-- 1 weizijun staff 5.5M Aug 19 21:29 _6.fdt<br /> -rw-r--r-- 1 weizijun staff 15K Aug 19 21:29 _6.fdx<br /> -rw-r--r-- 1 weizijun staff 688B Aug 19 21:29 _6.fnm<br /> -rw-r--r-- 1 weizijun staff 494B Aug 19 21:29 _6.si<br /> -rw-r--r-- 1 weizijun staff 309K Aug 19 21:29 _6_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 7.0M Aug 19 21:29 _6_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 195K Aug 19 21:29 _6_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 244K Aug 19 21:29 _6_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 252B Aug 19 21:29 _6_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 195B Aug 19 21:29 segments_2<br /> -rw-r--r-- 1 weizijun staff 0B Aug 19 21:26 write.lock<br />

    这回35M的数据Elasticsearch容量只有13.2mb,其中还有主要的占比还是Elasticsearch的唯一id,100w的uuid几乎不占存储容积。

    所以在Elasticsearch中建立索引的字段如果基数越大(count distinct),越占用磁盘空间。

    我们再看看存100w个不一样的整型会是如何。

    <br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 13.6mb 13.6mb <br /> <br /> -rw-r--r-- 1 weizijun staff 6.1M Aug 28 10:19 _42.fdt<br /> -rw-r--r-- 1 weizijun staff 22K Aug 28 10:19 _42.fdx<br /> -rw-r--r-- 1 weizijun staff 688B Aug 28 10:19 _42.fnm<br /> -rw-r--r-- 1 weizijun staff 503B Aug 28 10:19 _42.si<br /> -rw-r--r-- 1 weizijun staff 2.8M Aug 28 10:19 _42_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 2.2M Aug 28 10:19 _42_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 83K Aug 28 10:19 _42_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 2.5M Aug 28 10:19 _42_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 228B Aug 28 10:19 _42_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 196B Aug 28 10:19 segments_2<br /> -rw-r--r-- 1 weizijun staff 0B Aug 28 10:16 write.lock<br />

    从结果可以看到,100w整型数据,Elasticsearch的存储开销为13.6mb。如果以int型计算100w数据的长度的话,为400w字节,大概是3.8mb数据。忽略Elasticsearch唯一id字段的影响,Elasticsearch实际存储容量跟整型数据长度差不多。

    我们再看一下开启最佳压缩参数对存储空间的影响:

    <br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 107.2mb 107.2mb <br /> <br /> -rw-r--r-- 1 weizijun staff 25M Aug 20 12:30 _5.fdt<br /> -rw-r--r-- 1 weizijun staff 6.0K Aug 20 12:30 _5.fdx<br /> -rw-r--r-- 1 weizijun staff 688B Aug 20 12:31 _5.fnm<br /> -rw-r--r-- 1 weizijun staff 500B Aug 20 12:31 _5.si<br /> -rw-r--r-- 1 weizijun staff 265K Aug 20 12:31 _5_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 44M Aug 20 12:31 _5_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 322K Aug 20 12:31 _5_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 37M Aug 20 12:31 _5_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 254B Aug 20 12:31 _5_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 224B Aug 20 12:31 segments_4<br /> -rw-r--r-- 1 weizijun staff 0B Aug 20 12:00 write.lock<br />

    结果中可以发现,只有正排数据会启动压缩,压缩能力确实强劲,不考虑唯一id字段,存储容量大概压缩到接近50%。

    我们还做了一些实验,Elasticsearch默认是开启_all参数的,_all可以让用户传入的整体json数据作为全文检索的字段,可以更方便的检索,但在现实场景中已经使用的不多,相反会增加很多存储容量的开销,可以看下开启_all的磁盘空间使用情况:

    <br /> <br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 162.4mb 162.4mb <br /> <br /> -rw-r--r-- 1 weizijun staff 41M Aug 18 22:59 _20.fdt<br /> -rw-r--r-- 1 weizijun staff 18K Aug 18 22:59 _20.fdx<br /> -rw-r--r-- 1 weizijun staff 777B Aug 18 22:59 _20.fnm<br /> -rw-r--r-- 1 weizijun staff 59B Aug 18 22:59 _20.nvd<br /> -rw-r--r-- 1 weizijun staff 78B Aug 18 22:59 _20.nvm<br /> -rw-r--r-- 1 weizijun staff 539B Aug 18 22:59 _20.si<br /> -rw-r--r-- 1 weizijun staff 7.2M Aug 18 22:59 _20_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 4.2M Aug 18 22:59 _20_Lucene50_0.pos<br /> -rw-r--r-- 1 weizijun staff 73M Aug 18 22:59 _20_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 832K Aug 18 22:59 _20_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 37M Aug 18 22:59 _20_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 254B Aug 18 22:59 _20_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 196B Aug 18 22:59 segments_2<br /> -rw-r--r-- 1 weizijun staff 0B Aug 18 22:53 write.lock<br /> <br />

    开启_all比不开启多了40mb的存储空间,多的数据都在倒排索引上,大约会增加30%多的存储开销。所以线上都直接禁用。

    然后我还做了其他几个尝试,为了验证存储容量是否和数据量成正比,写入1000w数据的uuid,发现存储容量基本为100w数据的10倍。我还验证了数据长度是否和数据量成正比,发现把uuid增长2倍、4倍,存储容量也响应的增加了2倍和4倍。在此就不一一列出数据了。


    lucene各文件具体内容和实现

    lucene数据元信息文件

    文件名为:segments_xxx

    该文件为lucene数据文件的元信息文件,记录所有segment的元数据信息。

    该文件主要记录了目前有多少segment,每个segment有一些基本信息,更新这些信息定位到每个segment的元信息文件。

    lucene元信息文件还支持记录userData,Elasticsearch可以在此记录translog的一些相关信息。

    文件示例


    elasticsearch_store_segments.png




    具体实现类


    ```
    public final class SegmentInfos implements Cloneable, Iterable {
    // generation是segment的版本的概念,从文件名中提取出来,实例中为:2t/101
    private long generation; // generation of the "segments_N" for the next commit

    private long lastGeneration; // generation of the "segments_N" file we last successfully read
    // or wrote; this is normally the same as generation except if
    // there was an IOException that had interrupted a commit

    / Id for this commit; only written starting with Lucene 5.0 */
    private byte[] id;


    /* Which Lucene version wrote this commit, or null if this commit is pre-5.3. /
    private Version luceneVersion;

    /
    Counts how often the index has been changed. */
    public long version;

    / Used to name new segments. */
    // TODO: should this be a long ...?
    public int counter;

    /* Version of the oldest segment in the index, or null if there are no segments. /
    private Version minSegmentLuceneVersion;

    private List segments = new ArrayList<>();

    /
    Opaque Map<String, String> that user can specify during IndexWriter.commit */
    public Map<String,String> userData = Collections.emptyMap();
    }

    /** Embeds a [read-only] SegmentInfo and adds per-commit

    • fields.
      *
    • @lucene.experimental */
      public class SegmentCommitInfo {

      /* The {@link SegmentInfo} that we wrap. /
      public final SegmentInfo info;

      // How many deleted docs in the segment:
      private int delCount;

      // Generation number of the live docs file (-1 if there
      // are no deletes yet):
      private long delGen;

      // Normally 1+delGen, unless an exception was hit on last
      // attempt to write:
      private long nextWriteDelGen;

      // Generation number of the FieldInfos (-1 if there are no updates)
      private long fieldInfosGen;

      // Normally 1+fieldInfosGen, unless an exception was hit on last attempt to
      // write
      private long nextWriteFieldInfosGen; //fieldInfosGen == -1 ? 1 : fieldInfosGen + 1;

      // Generation number of the DocValues (-1 if there are no updates)
      private long docValuesGen;

      // Normally 1+dvGen, unless an exception was hit on last attempt to
      // write
      private long nextWriteDocValuesGen; //docValuesGen == -1 ? 1 : docValuesGen + 1;

      // TODO should we add .files() to FieldInfosFormat, like we have on
      // LiveDocsFormat?
      // track the fieldInfos update files
      private final Set fieldInfosFiles = new HashSet<>();

      // Track the per-field DocValues update files
      private final Map<Integer,Set> dvUpdatesFiles = new HashMap<>();

      // Track the per-generation updates files
      @Deprecated
      private final Map<Long,Set> genUpdatesFiles = new HashMap<>();

      private volatile long sizeInBytes = -1;
      }

      ```

      segment的元信息文件

      文件后缀:.si

      每个segment都有一个.si文件,记录了该segment的元信息。

      segment元信息文件中记录了segment的文档数量,segment对应的文件列表等信息。

      文件示例


      elasticsearch_store_si.png




      具体实现类


      ```
      /**

    • Information about a segment such as its name, directory, and files related
    • to the segment.
      *
    • @lucene.experimental
      */
      public final class SegmentInfo {

      // _bl
      public final String name;

      /* Where this segment resides. /
      public final Directory dir;

      /* Id that uniquely identifies this segment. /
      private final byte[] id;

      private Codec codec;

      // Tracks the Lucene version this segment was created with, since 3.1. Null
      // indicates an older than 3.0 index, and it's used to detect a too old index.
      // The format expected is "x.y" - "2.x" for pre-3.0 indexes (or null), and
      // specific versions afterwards ("3.0.0", "3.1.0" etc.).
      // see o.a.l.util.Version.
      private Version version;

      private int maxDoc; // number of docs in seg

      private boolean isCompoundFile;


      private Map<String,String> diagnostics;

      private Set setFiles;

      private final Map<String,String> attributes;
      }
      ```

      fields信息文件

      文件后缀:.fnm

      该文件存储了fields的基本信息。

      fields信息中包括field的数量,field的类型,以及IndexOpetions,包括是否存储、是否索引,是否分词,是否需要列存等等。

      文件示例


      elasticsearch_store_fnm.png




      具体实现类


      ```
      /**

    • Access to the Field Info file that describes document fields and whether or
    • not they are indexed. Each segment has a separate Field Info file. Objects
    • of this class are thread-safe for multiple readers, but only one thread can
    • be adding documents at a time, with no other reader or writer threads
    • accessing this object.
      /
      public final class FieldInfo {
      /
      Field's name */
      public final String name;

      /* Internal field number /
      //field在内部的编号
      public final int number;

      //field docvalues的类型
      private DocValuesType docValuesType = DocValuesType.NONE;

      // True if any document indexed term vectors
      private boolean storeTermVector;

      private boolean omitNorms; // omit norms associated with indexed fields

      //index的配置项
      private IndexOptions indexOptions = IndexOptions.NONE;

      private boolean storePayloads; // whether this field stores payloads together with term positions

      private final Map<String,String> attributes;

      // docvalues的generation
      private long dvGen;
      }
      ```

      数据存储文件

      文件后缀:.fdx, .fdt

      索引文件为.fdx,数据文件为.fdt,数据存储文件功能为根据自动的文档id,得到文档的内容,搜索引擎的术语习惯称之为正排数据,即doc_id -> content,es的_source数据就存在这

      索引文件记录了快速定位文档数据的索引信息,数据文件记录了所有文档id的具体内容。

      文件示例


      elasticsearch_store_fdt.png




      具体实现类

      ```
      /**

    • Random-access reader for {@link CompressingStoredFieldsIndexWriter}.
    • @lucene.internal
      */
      public final class CompressingStoredFieldsIndexReader implements Cloneable, Accountable {
      private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(CompressingStoredFieldsIndexReader.class);

      final int maxDoc;

      //docid索引,快速定位某个docid的数组坐标
      final int[] docBases;

      //快速定位某个docid所在的文件offset的startPointer
      final long[] startPointers;

      //平均一个chunk的文档数
      final int[] avgChunkDocs;

      //平均一个chunk的size
      final long[] avgChunkSizes;

      final PackedInts.Reader[] docBasesDeltas; // delta from the avg

      final PackedInts.Reader[] startPointersDeltas; // delta from the avg
      }

      /**
    • {@link StoredFieldsReader} impl for {@link CompressingStoredFieldsFormat}.
    • @lucene.experimental
      */
      public final class CompressingStoredFieldsReader extends StoredFieldsReader {

      //从fdt正排索引文件中获得
      private final int version;

      // field的基本信息
      private final FieldInfos fieldInfos;

      //fdt正排索引文件reader
      private final CompressingStoredFieldsIndexReader indexReader;

      //从fdt正排索引文件中获得,用于指向fdx数据文件的末端,指向numChunks地址4
      private final long maxPointer;

      //fdx正排数据文件句柄
      private final IndexInput fieldsStream;

      //块大小
      private final int chunkSize;

      private final int packedIntsVersion;

      //压缩类型
      private final CompressionMode compressionMode;

      //解压缩处理对象
      private final Decompressor decompressor;

      //文档数量,从segment元数据中获得
      private final int numDocs;

      //是否正在merge,默认为false
      private final boolean merging;

      //初始化时new了一个BlockState,BlockState记录下当前正排文件读取的状态信息
      private final BlockState state;
      //chunk的数量
      private final long numChunks; // number of compressed blocks written

      //dirty chunk的数量
      private final long numDirtyChunks; // number of incomplete compressed blocks written

      //是否close,默认为false
      private boolean closed;
      }
      ```

      倒排索引文件

      索引后缀:.tip,.tim

      倒排索引也包含索引文件和数据文件,.tip为索引文件,.tim为数据文件,索引文件包含了每个字段的索引元信息,数据文件有具体的索引内容。

      5.5.0版本的倒排索引实现为FST tree,FST tree的最大优势就是内存空间占用非常低 ,具体可以参看下这篇文章:[http://www.cnblogs.com/bonelee/p/6226185.html ](http://www.cnblogs.com/bonelee/p/6226185.html )

      [http://examples.mikemccandless ... %2Bit](http://examples.mikemccandless ... d%2Bit) 为FST图实例,可以根据输入的数据构造出FST图

      <br /> 输入到 FST 中的数据为:<br /> String inputValues[] = {"mop","moth","pop","star","stop","top"};<br /> long outputValues[] = {0,1,2,3,4,5};<br />
      生成的 FST 图为:

      elasticsearch_store_tip1.png



      elasticsearch_store_tip2.png



      文件示例


      elasticsearch_store_tip3.png




      具体实现类

      ```
      public final class BlockTreeTermsReader extends FieldsProducer {
      // Open input to the main terms dict file (_X.tib)
      final IndexInput termsIn;
      // Reads the terms dict entries, to gather state to
      // produce DocsEnum on demand
      final PostingsReaderBase postingsReader;
      private final TreeMap<String,FieldReader> fields = new TreeMap<>();

      / File offset where the directory starts in the terms file. */
      /索引数据文件tim的数据的尾部的元数据的地址
      private long dirOffset;
      /* File offset where the directory starts in the index file. /

      //索引文件tip的数据的尾部的元数据的地址
      private long indexDirOffset;

      //semgent的名称
      final String segment;

      //版本号
      final int version;

      //5.3.x index, we record up front if we may have written any auto-prefix terms,示例中记录的是false
      final boolean anyAutoPrefixTerms;
      }

      /

    • BlockTree's implementation of {@link Terms}.
    • @lucene.internal
      */
      public final class FieldReader extends Terms implements Accountable {

      //term的数量
      final long numTerms;

      //field信息
      final FieldInfo fieldInfo;

      final long sumTotalTermFreq;

      //总的文档频率
      final long sumDocFreq;

      //文档数量
      final int docCount;

      //字段在索引文件tip中的起始位置
      final long indexStartFP;

      final long rootBlockFP;

      final BytesRef rootCode;

      final BytesRef minTerm;

      final BytesRef maxTerm;

      //longs:metadata buffer, holding monotonic values
      final int longsSize;

      final BlockTreeTermsReader parent;

      final FST index;
      }
      ```

      倒排链文件

      文件后缀:.doc, .pos, .pay

      .doc保存了每个term的doc id列表和term在doc中的词频

      全文索引的字段,会有.pos文件,保存了term在doc中的位置

      全文索引的字段,使用了一些像payloads的高级特性才会有.pay文件,保存了term在doc中的一些高级特性

      文件示例


      elasticsearch_store_doc.png




      具体实现类

      ```
      /**

    • Concrete class that reads docId(maybe frq,pos,offset,payloads) list
    • with postings format.
      *
    • @lucene.experimental
      */
      public final class Lucene50PostingsReader extends PostingsReaderBase {
      private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Lucene50PostingsReader.class);
      private final IndexInput docIn;
      private final IndexInput posIn;
      private final IndexInput payIn;
      final ForUtil forUtil;
      private int version;

      //不分词的字段使用的是该对象,基于skiplist实现了倒排链
      final class BlockDocsEnum extends PostingsEnum {
      }

      //全文检索字段使用的是该对象
      final class BlockPostingsEnum extends PostingsEnum {
      }

      //包含高级特性的字段使用的是该对象
      final class EverythingEnum extends PostingsEnum {
      }
      }
      ```

      列存文件(docvalues)

      文件后缀:.dvm, .dvd

      索引文件为.dvm,数据文件为.dvd。

      lucene实现的docvalues有如下类型:

  • 1、NONE 不开启docvalue时的状态
  • 2、NUMERIC 单个数值类型的docvalue主要包括(int,long,float,double)
  • 3、BINARY 二进制类型值对应不同的codes最大值可能超过32766字节,
  • 4、SORTED 有序增量字节存储,仅仅存储不同部分的值和偏移量指针,值必须小于等于32766字节
  • 5、SORTED_NUMERIC 存储数值类型的有序数组列表
  • 6、SORTED_SET 可以存储多值域的docvalue值,但返回时,仅仅只能返回多值域的第一个docvalue
  • 7、对应not_anaylized的string字段,使用的是SORTED_SET类型,number的类型是SORTED_NUMERIC类型

    其中SORTED_SET 的 SORTED_SINGLE_VALUED类型包括了两类数据 : binary + numeric, binary是按ord排序的term的列表,numeric是doc到ord的映射。

    文件示例


    elasticsearch_store_dvd.png




    具体实现类

    <br /> /** reader for {@link Lucene54DocValuesFormat} */<br /> final class Lucene54DocValuesProducer extends DocValuesProducer implements Closeable {<br /> //number类型的field的列存列表<br /> private final Map<String,NumericEntry> numerics = new HashMap<>();<br /> <br /> //字符串类型的field的列存列表<br /> private final Map<String,BinaryEntry> binaries = new HashMap<>();<br /> <br /> //有序字符串类型的field的列存列表<br /> private final Map<String,SortedSetEntry> sortedSets = new HashMap<>();<br /> <br /> //有序number类型的field的列存列表<br /> private final Map<String,SortedSetEntry> sortedNumerics = new HashMap<>();<br /> <br /> //字符串类型的field的ords列表<br /> private final Map<String,NumericEntry> ords = new HashMap<>();<br /> <br /> //docId -> address -> ord 中field的ords列表<br /> private final Map<String,NumericEntry> ordIndexes = new HashMap<>();<br /> <br /> //field的数量<br /> private final int numFields;<br /> <br /> //内存使用量<br /> private final AtomicLong ramBytesUsed;<br /> <br /> //数据源的文件句柄<br /> private final IndexInput data;<br /> <br /> //文档数<br /> private final int maxDoc;<br /> // memory-resident structures<br /> private final Map<String,MonotonicBlockPackedReader> addressInstances = new HashMap<>();<br /> private final Map<String,ReverseTermsIndex> reverseIndexInstances = new HashMap<>();<br /> private final Map<String,DirectMonotonicReader.Meta> directAddressesMeta = new HashMap<>();<br /> <br /> //是否正在merge<br /> private final boolean merging;<br /> }<br /> <br /> /** metadata entry for a numeric docvalues field */<br /> static class NumericEntry {<br /> private NumericEntry() {}<br /> /** offset to the bitset representing docsWithField, or -1 if no documents have missing values */<br /> long missingOffset;<br /> <br /> /** offset to the actual numeric values */<br /> //field的在数据文件中的起始地址<br /> public long offset;<br /> <br /> /** end offset to the actual numeric values */<br /> //field的在数据文件中的结尾地址<br /> public long endOffset;<br /> <br /> /** bits per value used to pack the numeric values */<br /> public int bitsPerValue;<br /> <br /> //format类型<br /> int format;<br /> /** count of values written */<br /> public long count;<br /> /** monotonic meta */<br /> public DirectMonotonicReader.Meta monotonicMeta;<br /> <br /> //最小的value<br /> long minValue;<br /> <br /> //Compressed by computing the GCD<br /> long gcd;<br /> <br /> //Compressed by giving IDs to unique values.<br /> long table[];<br /> /** for sparse compression */<br /> long numDocsWithValue;<br /> NumericEntry nonMissingValues;<br /> NumberType numberType;<br /> }<br /> <br /> /** metadata entry for a binary docvalues field */<br /> static class BinaryEntry {<br /> private BinaryEntry() {}<br /> /** offset to the bitset representing docsWithField, or -1 if no documents have missing values */<br /> long missingOffset;<br /> /** offset to the actual binary values */<br /> //field的在数据文件中的起始地址<br /> long offset;<br /> int format;<br /> /** count of values written */<br /> public long count;<br /> <br /> //最短字符串的长度<br /> int minLength;<br /> <br /> //最长字符串的长度<br /> int maxLength;<br /> /** offset to the addressing data that maps a value to its slice of the byte[] */<br /> public long addressesOffset, addressesEndOffset;<br /> /** meta data for addresses */<br /> public DirectMonotonicReader.Meta addressesMeta;<br /> /** offset to the reverse index */<br /> public long reverseIndexOffset;<br /> /** packed ints version used to encode addressing information */<br /> public int packedIntsVersion;<br /> /** packed ints blocksize */<br /> public int blockSize;<br /> }<br />

    参考资料


    [lucene source code](https://github.com/apache/luce ... /5.5.0)

    [lucene document](https://lucene.apache.org/core/5_5_0/)

    [lucene字典实现原理——FST](http://www.cnblogs.com/bonelee/p/6226185.html )

请问es适合类似于百度搜索这种样子吗?

Elasticsearchrochy 回复了问题 • 3 人关注 • 1 个回复 • 2349 次浏览 • 2018-12-08 00:27 • 来自相关话题

社区日报 第471期 (2018-12-07)

社区日报laoyang360 发表了文章 • 0 个评论 • 1767 次浏览 • 2018-12-07 08:41 • 来自相关话题

1、使用Elasticsearch作为主数据存储实践
http://t.cn/EyS6wcL
2、Elasticsearch 6.x启动过程
http://t.cn/EyJgQ7U
3、Elasticsearch你知道多少?
http://t.cn/EyS6I4P

编辑:铭毅天下
归档: https://elasticsearch.cn/article/6177
订阅:https://tinyletter.com/elastic-daily

Day 6 - Logstash Pipeline-to-Pipeline 尝鲜

Adventrockybean 发表了文章 • 3 个评论 • 10598 次浏览 • 2018-12-06 23:40 • 来自相关话题

Logstash 在 6.0 推出了 multiple pipeline 的解决方案,即在一个 logstash 实例中可以同时进行多个独立数据流程的处理工作,如下图所示。

![](https://ws1.sinaimg.cn/large/6 ... xr.jpg)

而在这之前用户只能通过在单机运行多个 logstash 实例或者在配置文件中增加大量 if-else 条件判断语句来解决。要使用 multiple pipeline 也很简单,只需要将不同的 pipeline 在 config/pipeline.yml中定义好即可,如下所示:

```yaml

  • pipeline.id: apache
    pipeline.batch.size: 125
    queue.type: persisted
    path.config: "/path/to/config/apache.cfg"
  • pipeline.id: nginx
    path.config: "/path/to/config/nginx.cfg"
    ``<br /> <br /> 其中apachenginx作为独立的 pipeline 执行,而且配置也可以独立设置,互不干扰。pipeline.yml的引入极大地简化了 logstash 的配置管理工作,使得新手也可以很快完成复杂的 ETL 配置。<br /> <br /> 在 6.3 版本中,Logstash 又增加了Pipeline-to-Pipeline的管道机制(beta),即管道和管道之间可以连接在一起组成一个完成的数据处理流。熟悉 linux 的管道命令|`的同学应该可以很快明白这种模式的好处。这无疑使得 Logstash 的配置会更加灵活,今天我们就来了解下这种灵活自由的配置方式。



    1. 上手


    废话少说,快速上手。修改 config/pipeline.yml文件如下:

    ```yaml

    • pipeline.id: upstream
      config.string: input { stdin {} } output { pipeline { send_to => [test_output] } }
    • pipeline.id: downstream
      config.string: input { pipeline { address => test_output } } output{ stdout{}}
      ``<br /> <br /> <br /> <br /> 然后运行 logstash,其中-r` 表示配置文件有改动时自动重新加载,方便我们调试。

      bin/logstash -r

      在终端随意输入字符(比如aaa)后回车,会看到屏幕输出了类似下面的内容,代表运行成功了。

      json<br /> {<br /> "@timestamp" => 2018-12-06T14:43:50.310Z,<br /> "@version" => "1",<br /> "message" => "aaa",<br /> "host" => "rockybean-MacBook-Pro.local"<br /> }<br />

      我们再回头看下这个配置,upstreamoutput 使用了名为 pipeline 的 plugin,然后 send_to的输出对象test_output是在 downstreaminput pipeline plugin 中定义的。通过这个唯一的address(虚拟地址)就能够把不同的 pipeline 连接在一起组成一个更长的pipeline来处理数据。类似下图所示:

      ![](https://ws1.sinaimg.cn/large/6 ... y9.jpg)



      当数据由 upstream传递给 downstream时会进行一个复制操作,这也意味着在这两个 pipeline 中的数据是完全独立的,互不影响。有一点要注意的是:数据的复制会增加额外的性能开销,比如会加大 JVM Heap 的使用。

      2. 使用场景


      使用方法是不是很简单,接下来我们来看下官方为我们开的几个脑洞。

      2.1 Distributor Pattern 分发者模式


      该模式执行效果类似下图所示:

      ![](https://ws1.sinaimg.cn/large/6 ... l1.jpg)

      在一个 pipeline 处理输入,然后根据不同的数据类型再分发到对应的 Pipeline 去处理。这种模式的好处在于统一输入端口,隔离不同类型的处理配置文件,减少由于配置文件混合在一起带来的维护成本。大家可以想一想如果不用这种Pipeline-to-Pipeline的方式,我们如果轻松做到一个端口处理多个来源的数据呢?

      这种模式的参考配置如下所示:

      ```yaml

      config/pipelines.yml

  • pipeline.id: beats-server
    config.string: |
    input { beats { port => 5044 } }
    output {
    if [type] == apache {
    pipeline { send_to => weblogs }
    } else if [type] == system {
    pipeline { send_to => syslog }
    } else {
    pipeline { send_to => fallback }
    }
    }
  • pipeline.id: weblog-processing
    config.string: |
    input { pipeline { address => weblogs } }
    filter {

    Weblog filter statements here...

    }
    output {
    elasticsearch { hosts => [es_cluster_a_host] }
    }

  • pipeline.id: syslog-processing
    config.string: |
    input { pipeline { address => syslog } }
    filter {

    Syslog filter statements here...

    }
    output {
    elasticsearch { hosts => [es_cluster_b_host] }
    }

  • pipeline.id: fallback-processing
    config.string: |
    input { pipeline { address => fallback } }
    output { elasticsearch { hosts => [es_cluster_b_host] } }
    ```



    2.2 Output Isolator Pattern 输出隔离模式


    虽然 Logstash 的一个 pipeline 可以配置多个 output,但是这多个 output 会相依为命,一旦某一个 output 出问题,会导致另一个 output 也无法接收新数据。而通过这种模式可以完美解决这个问题。其运行方式如下图所示:

    ![](https://ws1.sinaimg.cn/large/6 ... 8p.jpg)

    通过输出到两个独立的 pipeline,解除相互之间的影响,比如 http service 出问题的时候,es 依然可以正常接收数据,而且两个 pipeline 可以配置独立的队列来保障数据的完备性,其配置如下所示:

    ```yaml

    config/pipelines.yml

  • pipeline.id: intake
    queue.type: persisted
    config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [es, http] } }
  • pipeline.id: buffered-es
    queue.type: persisted
    config.string: |
    input { pipeline { address => es } }
    output { elasticsearch { } }
  • pipeline.id: buffered-http
    queue.type: persisted
    config.string: |
    input { pipeline { address => http } }
    output { http { } }
    ```

    2.3 Forked Path Pattern 克隆路径模式


    这个模式类似 Output Isolator Pattern,只是在不同的 output pipeline 中可以配置不同的 filter 来完成各自输出的数据处理需求,这里就不展开讲了,可以参考如下的配置,其中不同 output pipeline 的 filter 是不同的,比如 partner 这个 pipeline 去掉了一些敏感数据:

    ```

    config/pipelines.yml

  • pipeline.id: intake
    queue.type: persisted
    config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => ["internal-es", "partner-s3"] } }
  • pipeline.id: buffered-es
    queue.type: persisted
    config.string: |
    input { pipeline { address => "internal-es" } }

    Index the full event

    output { elasticsearch { } }

  • pipeline.id: partner
    queue.type: persisted
    config.string: |
    input { pipeline { address => "partner-s3" } }
    filter {

    Remove the sensitive data

    mutate { remove_field => 'sensitive-data' }
    }
    output { s3 { } } # Output to partner's bucket
    ```

    2.4 Collector Pattern 收集者模式


    从名字可以看出,该模式是将所有 Pipeline 汇集于一处的处理模式,如下图所示:

    ![](https://ws1.sinaimg.cn/large/6 ... 1y.jpg)

    其配置参考如下:

    ```

    config/pipelines.yml

  • pipeline.id: beats
    config.string: |
    input { beats { port => 5044 } }
    output { pipeline { send_to => [commonOut] } }
  • pipeline.id: kafka
    config.string: |
    input { kafka { ... } }
    output { pipeline { send_to => [commonOut] } }
  • pipeline.id: partner

    This common pipeline enforces the same logic whether data comes from Kafka or Beats

    config.string: |
    input { pipeline { address => commonOut } }
    filter {

    Always remove sensitive data from all input sources

    mutate { remove_field => 'sensitive-data' }
    }
    output { elasticsearch { } }
    ```



    3. 总结


    本文简单给大家讲解了 Pipeline-to-Pipeline的使用方法及官方推荐的几种模式,希望可以给大家有所帮助。另外这个机制目前还处于 Beta 阶段,尝鲜需谨慎!

build_scorer很长时间

Elasticsearchmedcl 回复了问题 • 2 人关注 • 1 个回复 • 3735 次浏览 • 2018-12-10 16:44 • 来自相关话题

metricbeat windows版本无法采集 fd指标

Beatsrojay 回复了问题 • 2 人关注 • 1 个回复 • 3101 次浏览 • 2019-03-09 15:39 • 来自相关话题

拼音加IK的搜索怎么固定顺序

ElasticsearchCircle 回复了问题 • 3 人关注 • 2 个回复 • 2281 次浏览 • 2018-12-06 15:42 • 来自相关话题

关于分词干预问题

Elasticsearchrochy 回复了问题 • 3 人关注 • 1 个回复 • 2046 次浏览 • 2018-12-06 15:34 • 来自相关话题

社区日报 第470期 (2018-12-06)

社区日报白衬衣 发表了文章 • 0 个评论 • 1596 次浏览 • 2018-12-06 15:06 • 来自相关话题

1.图解 Elasticsearch2.2.0 原理
http://t.cn/Eybu8a3
2.ElasticSearch 恢复类型之对等恢复
http://t.cn/EyaCYFV
3.使用带注释的文本插件搜索事物
http://t.cn/EyaCntg

编辑:金桥
归档:https://elasticsearch.cn/article/6175
订阅:https://tinyletter.com/elastic-daily