filebeat -> logstash 失败的问题
Beats • medcl 回复了问题 • 3 人关注 • 2 个回复 • 13707 次浏览 • 2018-12-10 15:52
bulk写入数据时,READ非常高
Elasticsearch • Judge 回复了问题 • 4 人关注 • 2 个回复 • 3168 次浏览 • 2019-03-11 16:12
社区日报 第472期 (2018-12-08)
社区日报 • bsll 发表了文章 • 0 个评论 • 1590 次浏览 • 2018-12-08 11:23
- 用于ES数据警报的实时守护进程。
[http://t.cn/EyQmiuE](http://t.cn/EyQmiuE)
- 最小的ibana Docker镜像。
[http://t.cn/EyQbwGk](http://t.cn/EyQbwGk)
- Lucene列式存储格式DocValues详解。
[http://t.cn/EyQ6pkK](http://t.cn/EyQ6pkK)
Day 8 - 如何使用Spark快速将数据写入Elasticsearch
Advent • Ricky Huo 发表了文章 • 0 个评论 • 11580 次浏览 • 2018-12-08 09:58
如何使用Spark快速将数据写入Elasticsearch
说到数据写入Elasticsearch,最先想到的肯定是Logstash。Logstash因为其简单上手、可扩展、可伸缩等优点被广大用户接受。但是尺有所短,寸有所长,Logstash肯定也有它无法适用的应用场景,比如:
- 海量数据ETL
- 海量数据聚合
- 多源数据处理
为了满足这些场景,很多同学都会选择Spark,借助Spark算子进行数据处理,最后将处理结果写入Elasticsearch。
我们部门之前利用Spark对Nginx日志进行分析,统计我们的Web服务访问情况,将Nginx日志每分钟聚合一次最后将结果写入Elasticsearch,然后利用Kibana配置实时监控Dashboard。Elasticsearch和Kibana都很方便、实用,但是随着类似需求越来越多,如何快速通过Spark将数据写入Elasticsearch成为了我们的一大问题。
今天给大家推荐一款能够实现数据快速写入的黑科技——[Waterdrop](https://github.com/InterestingLab/waterdrop),一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上,简单易用,灵活配置,无需开发。
Kafka to Elasticsearch
和Logstash一样,Waterdrop同样支持多种类型的数据输入,这里我们以最常见的Kakfa作为输入源为例,讲解如何使用Waterdrop将数据快速写入Elasticsearch
Log Sample
原始日志格式如下:
<br /> 127.0.0.1 elasticsearch.cn 114.250.140.241 0.001s "127.0.0.1:80" [26/Oct/2018:21:54:32 +0800] "GET /article HTTP/1.1" 200 123 "-" - "Dalvik/2.1.0 (Linux; U; Android 7.1.1; OPPO R11 Build/NMF26X)"<br />
Elasticsearch Document
我们想要统计,一分钟每个域名的访问情况,聚合完的数据有以下字段:
<br /> domain String<br /> hostname String<br /> status int<br /> datetime String<br /> count int<br />
Waterdrop with Elasticsearch
接下来会给大家详细介绍,我们如何通过Waterdrop读取Kafka中的数据,对数据进行解析以及聚合,最后将处理结果写入Elasticsearch中。
Waterdrop
[Waterdrop](https://github.com/InterestingLab/waterdrop)同样拥有着非常丰富的插件,支持从Kafka、HDFS、Hive中读取数据,进行各种各样的数据处理,并将结果写入Elasticsearch、Kudu或者Kafka中。
Prerequisites
首先我们需要安装Waterdrop,安装十分简单,无需配置系统环境变量- 准备Spark环境
- 安装Waterdrop
- 配置Waterdrop
以下是简易步骤,具体安装可以参照[Quick Start](https://interestinglab.github. ... -start)
```yaml
cd /usr/local
wget https://archive.apache.org/dis ... 7.tgz
tar -xvf https://archive.apache.org/dis ... 7.tgz
wget https://github.com/Interesting ... 1.zip
unzip waterdrop-1.1.1.zip
cd waterdrop-1.1.1
vim config/waterdrop-env.sh
指定Spark安装路径
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}
```
Waterdrop Pipeline
与Logstash一样,我们仅需要编写一个Waterdrop Pipeline的配置文件即可完成数据的导入,相信了解Logstash的朋友可以很快入手Waterdrop配置。
配置文件包括四个部分,分别是Spark、Input、filter和Output。
Spark
这一部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。
<br /> spark {<br /> spark.app.name = "Waterdrop"<br /> spark.executor.instances = 2<br /> spark.executor.cores = 1<br /> spark.executor.memory = "1g"<br /> }<br />
Input
这一部分定义数据源,如下是从Kafka中读取数据的配置案例,
<br /> kafkaStream {<br /> topics = "waterdrop-es"<br /> consumer.bootstrap.servers = "localhost:9092"<br /> consumer.group.id = "waterdrop_es_group"<br /> consumer.rebalance.max.retries = 100<br /> }<br />
Filter
在Filter部分,这里我们配置一系列的转化,包括正则解析将日志进行拆分、时间转换将HTTPDATE转化为Elasticsearch支持的日期格式、对Number类型的字段进行类型转换以及通过SQL进行数据聚合
```yaml
filter {使用正则解析原始日志
最开始数据都在raw_message字段中
grok {
source_field = "raw_message"
pattern = '%{NOTSPACE:hostname}\s%{NOTSPACE:domain}\s%{IP:remote_addr}\s%{NUMBER:request_time}s\s\"%{DATA:upstream_ip}\"\s\[%{HTTPDATE:timestamp}\]\s\"%{NOTSPACE:method}\s%{DATA:url}\s%{NOTSPACE:http_ver}\"\s%{NUMBER:status}\s%{NUMBER:body_bytes_send}\s%{DATA:referer}\s%{NOTSPACE:cookie_info}\s\"%{DATA:user_agent}'
}将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
Elasticsearch中支持的格式
date {
source_field = "timestamp"
target_field = "datetime"
source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
target_time_format = "yyyy-MM-dd'T'HH:mm:ss.SSS+08:00"
}利用SQL对数据进行聚合
sql {
table_name = "access_log"
sql = "select domain, hostname, int(status), datetime, count(*) from access_log group by domain, hostname, status, datetime"
}
}
```
Output
最后我们将处理好的结构化数据写入Elasticsearch。
yaml<br /> output {<br /> elasticsearch {<br /> hosts = ["localhost:9200"]<br /> index = "waterdrop-${now}"<br /> es.batch.size.entries = 100000<br /> index_time_format = "yyyy.MM.dd"<br /> }<br /> }<br />
Running Waterdrop
我们将上述四部分配置组合成为我们的配置文件config/batch.conf
。
vim config/batch.conf
```
spark {
spark.app.name = "Waterdrop"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}
input {
kafkaStream {
topics = "waterdrop-es"
consumer.bootstrap.servers = "localhost:9092"
consumer.group.id = "waterdrop_es_group"
consumer.rebalance.max.retries = 100
}
}
filter {使用正则解析原始日志
最开始数据都在raw_message字段中
grok {
source_field = "raw_message"
pattern = '%{IP:hostname}\s%{NOTSPACE:domain}\s%{IP:remote_addr}\s%{NUMBER:request_time}s\s\"%{DATA:upstream_ip}\"\s\[%{HTTPDATE:timestamp}\]\s\"%{NOTSPACE:method}\s%{DATA:url}\s%{NOTSPACE:http_ver}\"\s%{NUMBER:status}\s%{NUMBER:body_bytes_send}\s%{DATA:referer}\s%{NOTSPACE:cookie_info}\s\"%{DATA:user_agent}'
}将"dd/MMM/yyyy:HH:mm:ss Z"格式的数据转换为
Elasticsearch中支持的格式
date {
source_field = "timestamp"
target_field = "datetime"
source_time_format = "dd/MMM/yyyy:HH:mm:ss Z"
target_time_format = "yyyy-MM-dd'T'HH:mm:00.SSS+08:00"
}利用SQL对数据进行聚合
sql {
table_name = "access_log"
sql = "select domain, hostname, status, datetime, count(*) from access_log group by domain, localhost, status, datetime"
}
}
output {
elasticsearch {
hosts = ["localhost:9200"]
index = "waterdrop-${now}"
es.batch.size.entries = 100000
index_timeformat = "yyyy.MM.dd"
}
}
```
执行命令,指定配置文件,运行Waterdrop,即可将数据写入Elasticsearch。这里我们以本地模式为例。
./bin/start-waterdrop.sh --config config/batch.conf -e client -m 'local[2]'
最后,写入Elasticsearch中的数据如下,再配上Kibana就可以实现Web服务的实时监控了^^.
<br /> "_source": {<br /> "domain": "elasticsearch.cn",<br /> "hostname": "localhost",<br /> "status": "200",<br /> "datetime": "2018-11-26T21:54:00.000+08:00",<br /> "count": 26<br /> }<br />
Conclusion
在这篇文章中,我们介绍了如何通过Waterdrop将Kafka中的数据写入Elasticsearch中。仅仅通过一个配置文件便可快速运行一个Spark Application,完成数据的处理、写入,无需编写任何代码,十分简单。
当数据处理过程中有遇到Logstash无法支持的场景或者Logstah性能无法达到预期的情况下,都可以尝试使用Waterdrop解决问题。
希望了解Waterdrop与Elasticsearch、Kafka、Hadoop结合使用的更多功能和案例,可以直接进入项目主页[https://github.com/InterestingLab/waterdrop](https://github.com/InterestingLab/waterdrop)
我们近期会再发布一篇《如何用Spark和Elasticsearch做交互式数据分析》,敬请期待.
Contract us
欢迎联系我们交流Spark和Elasticsearch:
Garyelephant: 微信: garyelephant
RickyHuo: 微信: chodomatte1994
- 准备Spark环境
es每次get都把内存吃爆了。。。
Elasticsearch • huigy 回复了问题 • 7 人关注 • 7 个回复 • 4767 次浏览 • 2018-12-15 15:05
kibana 作图 advanced 里的json input怎么使用
Kibana • medcl 回复了问题 • 2 人关注 • 1 个回复 • 9654 次浏览 • 2018-12-10 15:47
Day 7 - Elasticsearch中数据是如何存储的
Advent • weizijun 发表了文章 • 7 个评论 • 75179 次浏览 • 2018-12-07 13:55
前言
很多使用Elasticsearch的同学会关心数据存储在ES中的存储容量,会有这样的疑问:xxTB的数据入到ES会使用多少存储空间。这个问题其实很难直接回答的,只有数据写入ES后,才能观察到实际的存储空间。比如同样是1TB的数据,写入ES的存储空间可能差距会非常大,可能小到只有300~400GB,也可能多到6-7TB,为什么会造成这么大的差距呢?究其原因,我们来探究下Elasticsearch中的数据是如何存储。文章中我以Elasticsearch 2.3版本为示例,对应的lucene版本是5.5,Elasticsearch现在已经来到了6.5版本,数字类型、列存等存储结构有些变化,但基本的概念变化不多,文章中的内容依然适用。
Elasticsearch索引结构
Elasticsearch对外提供的是index的概念,可以类比为DB,用户查询是在index上完成的,每个index由若干个shard组成,以此来达到分布式可扩展的能力。比如下图是一个由10个shard组成的index。
shard是Elasticsearch数据存储的最小单位,index的存储容量为所有shard的存储容量之和。Elasticsearch集群的存储容量则为所有index存储容量之和。
一个shard就对应了一个lucene的library。对于一个shard,Elasticsearch增加了translog的功能,类似于HBase WAL,是数据写入过程中的中间数据,其余的数据都在lucene库中管理的。
所以Elasticsearch索引使用的存储内容主要取决于lucene中的数据存储。
lucene数据存储
下面我们主要看下lucene的文件内容,在了解lucene文件内容前,大家先了解些lucene的基本概念。
lucene基本概念
- segment : lucene内部的数据是由一个个segment组成的,写入lucene的数据并不直接落盘,而是先写在内存中,经过了refresh间隔,lucene才将该时间段写入的全部数据refresh成一个segment,segment多了之后会进行merge成更大的segment。lucene查询时会遍历每个segment完成。由于lucene* 写入的数据是在内存中完成,所以写入效率非常高。但是也存在丢失数据的风险,所以Elasticsearch基于此现象实现了translog,只有在segment数据落盘后,Elasticsearch才会删除对应的translog。
- doc : doc表示lucene中的一条记录
- field :field表示记录中的字段概念,一个doc由若干个field组成。
- term :term是lucene中索引的最小单位,某个field对应的内容如果是全文检索类型,会将内容进行分词,分词的结果就是由term组成的。如果是不分词的字段,那么该字段的内容就是一个term。
- 倒排索引(inverted index): lucene索引的通用叫法,即实现了term到doc list的映射。
- 正排数据:搜索引擎的通用叫法,即原始数据,可以理解为一个doc list。
- docvalues :Elasticsearch中的列式存储的名称,Elasticsearch除了存储原始存储、倒排索引,还存储了一份docvalues,用作分析和排序。
lucene文件内容
lucene包的文件是由很多segment文件组成的,segments_xxx文件记录了lucene包下面的segment文件数量。每个segment会包含如下的文件。
| Name | Extension | Brief Description |
| ------ | ------ | ------ |
|Segment Info|.si|segment的元数据文件|
|Compound File|.cfs, .cfe|一个segment包含了如下表的各个文件,为减少打开文件的数量,在segment小的时候,segment的所有文件内容都保存在cfs文件中,cfe文件保存了lucene各文件在cfs文件的位置信息|
|Fields|.fnm|保存了fields的相关信息|
|Field Index|.fdx|正排存储文件的元数据信息|
|Field Data|.fdt|存储了正排存储数据,写入的原文存储在这|
|Term Dictionary|.tim|倒排索引的元数据信息|
|Term Index|.tip|倒排索引文件,存储了所有的倒排索引数据|
|Frequencies|.doc|保存了每个term的doc id列表和term在doc中的词频|
|Positions|.pos|Stores position information about where a term occurs in the index
全文索引的字段,会有该文件,保存了term在doc中的位置|
|Payloads|.pay|Stores additional per-position metadata information such as character offsets and user payloads
全文索引的字段,使用了一些像payloads的高级特性会有该文件,保存了term在doc中的一些高级特性|
|Norms|.nvd, .nvm|文件保存索引字段加权数据|
|Per-Document Values|.dvd, .dvm|lucene的docvalues文件,即数据的列式存储,用作聚合和排序|
|Term Vector Data|.tvx, .tvd, .tvf|Stores offset into the document data file
保存索引字段的矢量信息,用在对term进行高亮,计算文本相关性中使用|
|Live Documents|.liv|记录了segment中删除的doc|
测试数据示例
下面我们以真实的数据作为示例,看看lucene中各类型数据的容量占比。
写100w数据,有一个uuid字段,写入的是长度为36位的uuid,字符串总为3600w字节,约为35M。
数据使用一个shard,不带副本,使用默认的压缩算法,写入完成后merge成一个segment方便观察。
使用线上默认的配置,uuid存为不分词的字符串类型。创建如下索引:
<br /> PUT test_field<br /> {<br /> "settings": {<br /> "index": {<br /> "number_of_shards": "1",<br /> "number_of_replicas": "0",<br /> "refresh_interval": "30s"<br /> }<br /> },<br /> "mappings": {<br /> "type": {<br /> "_all": {<br /> "enabled": false<br /> }, <br /> "properties": {<br /> "uuid": {<br /> "type": "string",<br /> "index": "not_analyzed"<br /> }<br /> }<br /> }<br /> }<br /> }<br />
首先写入100w不同的uuid,使用磁盘容量细节如下:
<br /> <br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 122.7mb 122.7mb <br /> <br /> -rw-r--r-- 1 weizijun staff 41M Aug 19 21:23 _8.fdt<br /> -rw-r--r-- 1 weizijun staff 17K Aug 19 21:23 _8.fdx<br /> -rw-r--r-- 1 weizijun staff 688B Aug 19 21:23 _8.fnm<br /> -rw-r--r-- 1 weizijun staff 494B Aug 19 21:23 _8.si<br /> -rw-r--r-- 1 weizijun staff 265K Aug 19 21:23 _8_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 44M Aug 19 21:23 _8_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 340K Aug 19 21:23 _8_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 37M Aug 19 21:23 _8_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 254B Aug 19 21:23 _8_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 195B Aug 19 21:23 segments_2<br /> -rw-r--r-- 1 weizijun staff 0B Aug 19 21:20 write.lock<br />
可以看到正排数据、倒排索引数据,列存数据容量占比几乎相同,正排数据和倒排数据还会存储Elasticsearch的唯一id字段,所以容量会比列存多一些。
35M的uuid存入Elasticsearch后,数据膨胀了3倍,达到了122.7mb。Elasticsearch竟然这么消耗资源,不要着急下结论,接下来看另一个测试结果。
我们写入100w一样的uuid,然后看看Elasticsearch使用的容量。
<br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 13.2mb 13.2mb <br /> <br /> -rw-r--r-- 1 weizijun staff 5.5M Aug 19 21:29 _6.fdt<br /> -rw-r--r-- 1 weizijun staff 15K Aug 19 21:29 _6.fdx<br /> -rw-r--r-- 1 weizijun staff 688B Aug 19 21:29 _6.fnm<br /> -rw-r--r-- 1 weizijun staff 494B Aug 19 21:29 _6.si<br /> -rw-r--r-- 1 weizijun staff 309K Aug 19 21:29 _6_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 7.0M Aug 19 21:29 _6_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 195K Aug 19 21:29 _6_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 244K Aug 19 21:29 _6_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 252B Aug 19 21:29 _6_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 195B Aug 19 21:29 segments_2<br /> -rw-r--r-- 1 weizijun staff 0B Aug 19 21:26 write.lock<br />
这回35M的数据Elasticsearch容量只有13.2mb,其中还有主要的占比还是Elasticsearch的唯一id,100w的uuid几乎不占存储容积。
所以在Elasticsearch中建立索引的字段如果基数越大(count distinct),越占用磁盘空间。
我们再看看存100w个不一样的整型会是如何。
<br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 13.6mb 13.6mb <br /> <br /> -rw-r--r-- 1 weizijun staff 6.1M Aug 28 10:19 _42.fdt<br /> -rw-r--r-- 1 weizijun staff 22K Aug 28 10:19 _42.fdx<br /> -rw-r--r-- 1 weizijun staff 688B Aug 28 10:19 _42.fnm<br /> -rw-r--r-- 1 weizijun staff 503B Aug 28 10:19 _42.si<br /> -rw-r--r-- 1 weizijun staff 2.8M Aug 28 10:19 _42_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 2.2M Aug 28 10:19 _42_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 83K Aug 28 10:19 _42_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 2.5M Aug 28 10:19 _42_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 228B Aug 28 10:19 _42_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 196B Aug 28 10:19 segments_2<br /> -rw-r--r-- 1 weizijun staff 0B Aug 28 10:16 write.lock<br />
从结果可以看到,100w整型数据,Elasticsearch的存储开销为13.6mb。如果以int型计算100w数据的长度的话,为400w字节,大概是3.8mb数据。忽略Elasticsearch唯一id字段的影响,Elasticsearch实际存储容量跟整型数据长度差不多。
我们再看一下开启最佳压缩参数对存储空间的影响:
<br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 107.2mb 107.2mb <br /> <br /> -rw-r--r-- 1 weizijun staff 25M Aug 20 12:30 _5.fdt<br /> -rw-r--r-- 1 weizijun staff 6.0K Aug 20 12:30 _5.fdx<br /> -rw-r--r-- 1 weizijun staff 688B Aug 20 12:31 _5.fnm<br /> -rw-r--r-- 1 weizijun staff 500B Aug 20 12:31 _5.si<br /> -rw-r--r-- 1 weizijun staff 265K Aug 20 12:31 _5_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 44M Aug 20 12:31 _5_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 322K Aug 20 12:31 _5_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 37M Aug 20 12:31 _5_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 254B Aug 20 12:31 _5_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 224B Aug 20 12:31 segments_4<br /> -rw-r--r-- 1 weizijun staff 0B Aug 20 12:00 write.lock<br />
结果中可以发现,只有正排数据会启动压缩,压缩能力确实强劲,不考虑唯一id字段,存储容量大概压缩到接近50%。
我们还做了一些实验,Elasticsearch默认是开启_all参数的,_all可以让用户传入的整体json数据作为全文检索的字段,可以更方便的检索,但在现实场景中已经使用的不多,相反会增加很多存储容量的开销,可以看下开启_all的磁盘空间使用情况:
<br /> <br /> health status index pri rep docs.count docs.deleted store.size pri.store.size <br /> green open test_field 1 0 1000000 0 162.4mb 162.4mb <br /> <br /> -rw-r--r-- 1 weizijun staff 41M Aug 18 22:59 _20.fdt<br /> -rw-r--r-- 1 weizijun staff 18K Aug 18 22:59 _20.fdx<br /> -rw-r--r-- 1 weizijun staff 777B Aug 18 22:59 _20.fnm<br /> -rw-r--r-- 1 weizijun staff 59B Aug 18 22:59 _20.nvd<br /> -rw-r--r-- 1 weizijun staff 78B Aug 18 22:59 _20.nvm<br /> -rw-r--r-- 1 weizijun staff 539B Aug 18 22:59 _20.si<br /> -rw-r--r-- 1 weizijun staff 7.2M Aug 18 22:59 _20_Lucene50_0.doc<br /> -rw-r--r-- 1 weizijun staff 4.2M Aug 18 22:59 _20_Lucene50_0.pos<br /> -rw-r--r-- 1 weizijun staff 73M Aug 18 22:59 _20_Lucene50_0.tim<br /> -rw-r--r-- 1 weizijun staff 832K Aug 18 22:59 _20_Lucene50_0.tip<br /> -rw-r--r-- 1 weizijun staff 37M Aug 18 22:59 _20_Lucene54_0.dvd<br /> -rw-r--r-- 1 weizijun staff 254B Aug 18 22:59 _20_Lucene54_0.dvm<br /> -rw-r--r-- 1 weizijun staff 196B Aug 18 22:59 segments_2<br /> -rw-r--r-- 1 weizijun staff 0B Aug 18 22:53 write.lock<br /> <br />
开启_all比不开启多了40mb的存储空间,多的数据都在倒排索引上,大约会增加30%多的存储开销。所以线上都直接禁用。
然后我还做了其他几个尝试,为了验证存储容量是否和数据量成正比,写入1000w数据的uuid,发现存储容量基本为100w数据的10倍。我还验证了数据长度是否和数据量成正比,发现把uuid增长2倍、4倍,存储容量也响应的增加了2倍和4倍。在此就不一一列出数据了。
lucene各文件具体内容和实现
lucene数据元信息文件
文件名为:segments_xxx
该文件为lucene数据文件的元信息文件,记录所有segment的元数据信息。
该文件主要记录了目前有多少segment,每个segment有一些基本信息,更新这些信息定位到每个segment的元信息文件。
lucene元信息文件还支持记录userData,Elasticsearch可以在此记录translog的一些相关信息。文件示例
具体实现类
```
public final class SegmentInfos implements Cloneable, Iterable{
// generation是segment的版本的概念,从文件名中提取出来,实例中为:2t/101
private long generation; // generation of the "segments_N" for the next commit
private long lastGeneration; // generation of the "segments_N" file we last successfully read
// or wrote; this is normally the same as generation except if
// there was an IOException that had interrupted a commit
/ Id for this commit; only written starting with Lucene 5.0 */
private byte[] id;
/* Which Lucene version wrote this commit, or null if this commit is pre-5.3. /
private Version luceneVersion;
/ Counts how often the index has been changed. */
public long version;
/ Used to name new segments. */
// TODO: should this be a long ...?
public int counter;
/* Version of the oldest segment in the index, or null if there are no segments. /
private Version minSegmentLuceneVersion;
private Listsegments = new ArrayList<>(); Opaque Map<String, String> that user can specify during IndexWriter.commit */
/
public Map<String,String> userData = Collections.emptyMap();
}
/** Embeds a [read-only] SegmentInfo and adds per-commit- fields.
* - @lucene.experimental */
public class SegmentCommitInfo {
/* The {@link SegmentInfo} that we wrap. /
public final SegmentInfo info;
// How many deleted docs in the segment:
private int delCount;
// Generation number of the live docs file (-1 if there
// are no deletes yet):
private long delGen;
// Normally 1+delGen, unless an exception was hit on last
// attempt to write:
private long nextWriteDelGen;
// Generation number of the FieldInfos (-1 if there are no updates)
private long fieldInfosGen;
// Normally 1+fieldInfosGen, unless an exception was hit on last attempt to
// write
private long nextWriteFieldInfosGen; //fieldInfosGen == -1 ? 1 : fieldInfosGen + 1;
// Generation number of the DocValues (-1 if there are no updates)
private long docValuesGen;
// Normally 1+dvGen, unless an exception was hit on last attempt to
// write
private long nextWriteDocValuesGen; //docValuesGen == -1 ? 1 : docValuesGen + 1;
// TODO should we add .files() to FieldInfosFormat, like we have on
// LiveDocsFormat?
// track the fieldInfos update files
private final SetfieldInfosFiles = new HashSet<>();
// Track the per-field DocValues update files
private final Map<Integer,Set> dvUpdatesFiles = new HashMap<>();
// Track the per-generation updates files
@Deprecated
private final Map<Long,Set> genUpdatesFiles = new HashMap<>();
private volatile long sizeInBytes = -1;
}
```
segment的元信息文件
文件后缀:.si
每个segment都有一个.si文件,记录了该segment的元信息。
segment元信息文件中记录了segment的文档数量,segment对应的文件列表等信息。
文件示例
具体实现类
```
/** - Information about a segment such as its name, directory, and files related
- to the segment.
* - @lucene.experimental
*/
public final class SegmentInfo {
// _bl
public final String name;
/* Where this segment resides. /
public final Directory dir;
/* Id that uniquely identifies this segment. /
private final byte[] id;
private Codec codec;
// Tracks the Lucene version this segment was created with, since 3.1. Null
// indicates an older than 3.0 index, and it's used to detect a too old index.
// The format expected is "x.y" - "2.x" for pre-3.0 indexes (or null), and
// specific versions afterwards ("3.0.0", "3.1.0" etc.).
// see o.a.l.util.Version.
private Version version;
private int maxDoc; // number of docs in seg
private boolean isCompoundFile;
private Map<String,String> diagnostics;
private SetsetFiles;
private final Map<String,String> attributes;
}
```
fields信息文件
文件后缀:.fnm
该文件存储了fields的基本信息。
fields信息中包括field的数量,field的类型,以及IndexOpetions,包括是否存储、是否索引,是否分词,是否需要列存等等。
文件示例
具体实现类
```
/** - Access to the Field Info file that describes document fields and whether or
- not they are indexed. Each segment has a separate Field Info file. Objects
- of this class are thread-safe for multiple readers, but only one thread can
- be adding documents at a time, with no other reader or writer threads
- accessing this object.
/
public final class FieldInfo {
/ Field's name */
public final String name;
/* Internal field number /
//field在内部的编号
public final int number;
//field docvalues的类型
private DocValuesType docValuesType = DocValuesType.NONE;
// True if any document indexed term vectors
private boolean storeTermVector;
private boolean omitNorms; // omit norms associated with indexed fields
//index的配置项
private IndexOptions indexOptions = IndexOptions.NONE;
private boolean storePayloads; // whether this field stores payloads together with term positions
private final Map<String,String> attributes;
// docvalues的generation
private long dvGen;
}
```
数据存储文件
文件后缀:.fdx, .fdt
索引文件为.fdx,数据文件为.fdt,数据存储文件功能为根据自动的文档id,得到文档的内容,搜索引擎的术语习惯称之为正排数据,即doc_id -> content,es的_source数据就存在这
索引文件记录了快速定位文档数据的索引信息,数据文件记录了所有文档id的具体内容。
文件示例
具体实现类
```
/** - Random-access reader for {@link CompressingStoredFieldsIndexWriter}.
- @lucene.internal
*/
public final class CompressingStoredFieldsIndexReader implements Cloneable, Accountable {
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(CompressingStoredFieldsIndexReader.class);
final int maxDoc;
//docid索引,快速定位某个docid的数组坐标
final int[] docBases;
//快速定位某个docid所在的文件offset的startPointer
final long[] startPointers;
//平均一个chunk的文档数
final int[] avgChunkDocs;
//平均一个chunk的size
final long[] avgChunkSizes;
final PackedInts.Reader[] docBasesDeltas; // delta from the avg
final PackedInts.Reader[] startPointersDeltas; // delta from the avg
}
/** - {@link StoredFieldsReader} impl for {@link CompressingStoredFieldsFormat}.
- @lucene.experimental
*/
public final class CompressingStoredFieldsReader extends StoredFieldsReader {
//从fdt正排索引文件中获得
private final int version;
// field的基本信息
private final FieldInfos fieldInfos;
//fdt正排索引文件reader
private final CompressingStoredFieldsIndexReader indexReader;
//从fdt正排索引文件中获得,用于指向fdx数据文件的末端,指向numChunks地址4
private final long maxPointer;
//fdx正排数据文件句柄
private final IndexInput fieldsStream;
//块大小
private final int chunkSize;
private final int packedIntsVersion;
//压缩类型
private final CompressionMode compressionMode;
//解压缩处理对象
private final Decompressor decompressor;
//文档数量,从segment元数据中获得
private final int numDocs;
//是否正在merge,默认为false
private final boolean merging;
//初始化时new了一个BlockState,BlockState记录下当前正排文件读取的状态信息
private final BlockState state;
//chunk的数量
private final long numChunks; // number of compressed blocks written
//dirty chunk的数量
private final long numDirtyChunks; // number of incomplete compressed blocks written
//是否close,默认为false
private boolean closed;
}
```
倒排索引文件
索引后缀:.tip,.tim
倒排索引也包含索引文件和数据文件,.tip为索引文件,.tim为数据文件,索引文件包含了每个字段的索引元信息,数据文件有具体的索引内容。
5.5.0版本的倒排索引实现为FST tree,FST tree的最大优势就是内存空间占用非常低 ,具体可以参看下这篇文章:[http://www.cnblogs.com/bonelee/p/6226185.html ](http://www.cnblogs.com/bonelee/p/6226185.html )
[http://examples.mikemccandless ... %2Bit](http://examples.mikemccandless ... d%2Bit) 为FST图实例,可以根据输入的数据构造出FST图
<br /> 输入到 FST 中的数据为:<br /> String inputValues[] = {"mop","moth","pop","star","stop","top"};<br /> long outputValues[] = {0,1,2,3,4,5};<br />
生成的 FST 图为:
文件示例
具体实现类
```
public final class BlockTreeTermsReader extends FieldsProducer {
// Open input to the main terms dict file (_X.tib)
final IndexInput termsIn;
// Reads the terms dict entries, to gather state to
// produce DocsEnum on demand
final PostingsReaderBase postingsReader;
private final TreeMap<String,FieldReader> fields = new TreeMap<>();
/ File offset where the directory starts in the terms file. */
/索引数据文件tim的数据的尾部的元数据的地址
private long dirOffset;
/* File offset where the directory starts in the index file. /
//索引文件tip的数据的尾部的元数据的地址
private long indexDirOffset;
//semgent的名称
final String segment;
//版本号
final int version;
//5.3.x index, we record up front if we may have written any auto-prefix terms,示例中记录的是false
final boolean anyAutoPrefixTerms;
}
/ - BlockTree's implementation of {@link Terms}.
- @lucene.internal
*/
public final class FieldReader extends Terms implements Accountable {
//term的数量
final long numTerms;
//field信息
final FieldInfo fieldInfo;
final long sumTotalTermFreq;
//总的文档频率
final long sumDocFreq;
//文档数量
final int docCount;
//字段在索引文件tip中的起始位置
final long indexStartFP;
final long rootBlockFP;
final BytesRef rootCode;
final BytesRef minTerm;
final BytesRef maxTerm;
//longs:metadata buffer, holding monotonic values
final int longsSize;
final BlockTreeTermsReader parent;
final FSTindex;
}
```
倒排链文件
文件后缀:.doc, .pos, .pay
.doc保存了每个term的doc id列表和term在doc中的词频
全文索引的字段,会有.pos文件,保存了term在doc中的位置
全文索引的字段,使用了一些像payloads的高级特性才会有.pay文件,保存了term在doc中的一些高级特性
文件示例
具体实现类
```
/** - Concrete class that reads docId(maybe frq,pos,offset,payloads) list
- with postings format.
* - @lucene.experimental
*/
public final class Lucene50PostingsReader extends PostingsReaderBase {
private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(Lucene50PostingsReader.class);
private final IndexInput docIn;
private final IndexInput posIn;
private final IndexInput payIn;
final ForUtil forUtil;
private int version;
//不分词的字段使用的是该对象,基于skiplist实现了倒排链
final class BlockDocsEnum extends PostingsEnum {
}
//全文检索字段使用的是该对象
final class BlockPostingsEnum extends PostingsEnum {
}
//包含高级特性的字段使用的是该对象
final class EverythingEnum extends PostingsEnum {
}
}
```
列存文件(docvalues)
文件后缀:.dvm, .dvd
索引文件为.dvm,数据文件为.dvd。
lucene实现的docvalues有如下类型:
- fields.
- 1、NONE 不开启docvalue时的状态
- 2、NUMERIC 单个数值类型的docvalue主要包括(int,long,float,double)
- 3、BINARY 二进制类型值对应不同的codes最大值可能超过32766字节,
- 4、SORTED 有序增量字节存储,仅仅存储不同部分的值和偏移量指针,值必须小于等于32766字节
- 5、SORTED_NUMERIC 存储数值类型的有序数组列表
- 6、SORTED_SET 可以存储多值域的docvalue值,但返回时,仅仅只能返回多值域的第一个docvalue
- 7、对应not_anaylized的string字段,使用的是SORTED_SET类型,number的类型是SORTED_NUMERIC类型
其中SORTED_SET 的 SORTED_SINGLE_VALUED类型包括了两类数据 : binary + numeric, binary是按ord排序的term的列表,numeric是doc到ord的映射。
文件示例
具体实现类
<br /> /** reader for {@link Lucene54DocValuesFormat} */<br /> final class Lucene54DocValuesProducer extends DocValuesProducer implements Closeable {<br /> //number类型的field的列存列表<br /> private final Map<String,NumericEntry> numerics = new HashMap<>();<br /> <br /> //字符串类型的field的列存列表<br /> private final Map<String,BinaryEntry> binaries = new HashMap<>();<br /> <br /> //有序字符串类型的field的列存列表<br /> private final Map<String,SortedSetEntry> sortedSets = new HashMap<>();<br /> <br /> //有序number类型的field的列存列表<br /> private final Map<String,SortedSetEntry> sortedNumerics = new HashMap<>();<br /> <br /> //字符串类型的field的ords列表<br /> private final Map<String,NumericEntry> ords = new HashMap<>();<br /> <br /> //docId -> address -> ord 中field的ords列表<br /> private final Map<String,NumericEntry> ordIndexes = new HashMap<>();<br /> <br /> //field的数量<br /> private final int numFields;<br /> <br /> //内存使用量<br /> private final AtomicLong ramBytesUsed;<br /> <br /> //数据源的文件句柄<br /> private final IndexInput data;<br /> <br /> //文档数<br /> private final int maxDoc;<br /> // memory-resident structures<br /> private final Map<String,MonotonicBlockPackedReader> addressInstances = new HashMap<>();<br /> private final Map<String,ReverseTermsIndex> reverseIndexInstances = new HashMap<>();<br /> private final Map<String,DirectMonotonicReader.Meta> directAddressesMeta = new HashMap<>();<br /> <br /> //是否正在merge<br /> private final boolean merging;<br /> }<br /> <br /> /** metadata entry for a numeric docvalues field */<br /> static class NumericEntry {<br /> private NumericEntry() {}<br /> /** offset to the bitset representing docsWithField, or -1 if no documents have missing values */<br /> long missingOffset;<br /> <br /> /** offset to the actual numeric values */<br /> //field的在数据文件中的起始地址<br /> public long offset;<br /> <br /> /** end offset to the actual numeric values */<br /> //field的在数据文件中的结尾地址<br /> public long endOffset;<br /> <br /> /** bits per value used to pack the numeric values */<br /> public int bitsPerValue;<br /> <br /> //format类型<br /> int format;<br /> /** count of values written */<br /> public long count;<br /> /** monotonic meta */<br /> public DirectMonotonicReader.Meta monotonicMeta;<br /> <br /> //最小的value<br /> long minValue;<br /> <br /> //Compressed by computing the GCD<br /> long gcd;<br /> <br /> //Compressed by giving IDs to unique values.<br /> long table[];<br /> /** for sparse compression */<br /> long numDocsWithValue;<br /> NumericEntry nonMissingValues;<br /> NumberType numberType;<br /> }<br /> <br /> /** metadata entry for a binary docvalues field */<br /> static class BinaryEntry {<br /> private BinaryEntry() {}<br /> /** offset to the bitset representing docsWithField, or -1 if no documents have missing values */<br /> long missingOffset;<br /> /** offset to the actual binary values */<br /> //field的在数据文件中的起始地址<br /> long offset;<br /> int format;<br /> /** count of values written */<br /> public long count;<br /> <br /> //最短字符串的长度<br /> int minLength;<br /> <br /> //最长字符串的长度<br /> int maxLength;<br /> /** offset to the addressing data that maps a value to its slice of the byte[] */<br /> public long addressesOffset, addressesEndOffset;<br /> /** meta data for addresses */<br /> public DirectMonotonicReader.Meta addressesMeta;<br /> /** offset to the reverse index */<br /> public long reverseIndexOffset;<br /> /** packed ints version used to encode addressing information */<br /> public int packedIntsVersion;<br /> /** packed ints blocksize */<br /> public int blockSize;<br /> }<br />
参考资料
[lucene source code](https://github.com/apache/luce ... /5.5.0)
[lucene document](https://lucene.apache.org/core/5_5_0/)
[lucene字典实现原理——FST](http://www.cnblogs.com/bonelee/p/6226185.html )
请问es适合类似于百度搜索这种样子吗?
Elasticsearch • rochy 回复了问题 • 3 人关注 • 1 个回复 • 2349 次浏览 • 2018-12-08 00:27
社区日报 第471期 (2018-12-07)
社区日报 • laoyang360 发表了文章 • 0 个评论 • 1767 次浏览 • 2018-12-07 08:41
http://t.cn/EyS6wcL
2、Elasticsearch 6.x启动过程
http://t.cn/EyJgQ7U
3、Elasticsearch你知道多少?
http://t.cn/EyS6I4P
编辑:铭毅天下
归档: https://elasticsearch.cn/article/6177
订阅:https://tinyletter.com/elastic-daily
Day 6 - Logstash Pipeline-to-Pipeline 尝鲜
Advent • rockybean 发表了文章 • 3 个评论 • 10598 次浏览 • 2018-12-06 23:40
Logstash 在 6.0 推出了 multiple pipeline 的解决方案,即在一个 logstash 实例中可以同时进行多个独立数据流程的处理工作,如下图所示。

而在这之前用户只能通过在单机运行多个 logstash 实例或者在配置文件中增加大量 if-else 条件判断语句来解决。要使用 multiple pipeline 也很简单,只需要将不同的 pipeline 在 config/pipeline.yml
中定义好即可,如下所示:
```yaml
- pipeline.id: apache
pipeline.batch.size: 125
queue.type: persisted
path.config: "/path/to/config/apache.cfg" - pipeline.id: nginx
path.config: "/path/to/config/nginx.cfg"
``<br /> <br /> 其中
apache和
nginx作为独立的 pipeline 执行,而且配置也可以独立设置,互不干扰。
pipeline.yml的引入极大地简化了 logstash 的配置管理工作,使得新手也可以很快完成复杂的 ETL 配置。<br /> <br /> 在 6.3 版本中,Logstash 又增加了
Pipeline-to-Pipeline的管道机制(beta),即管道和管道之间可以连接在一起组成一个完成的数据处理流。熟悉 linux 的管道命令
|`的同学应该可以很快明白这种模式的好处。这无疑使得 Logstash 的配置会更加灵活,今天我们就来了解下这种灵活自由的配置方式。
1. 上手
废话少说,快速上手。修改config/pipeline.yml
文件如下:
```yaml- pipeline.id: upstream
config.string: input { stdin {} } output { pipeline { send_to => [test_output] } } - pipeline.id: downstream
config.string: input { pipeline { address => test_output } } output{ stdout{}}
``<br /> <br /> <br /> <br /> 然后运行 logstash,其中
-r` 表示配置文件有改动时自动重新加载,方便我们调试。
bin/logstash -r
在终端随意输入字符(比如aaa
)后回车,会看到屏幕输出了类似下面的内容,代表运行成功了。
json<br /> {<br /> "@timestamp" => 2018-12-06T14:43:50.310Z,<br /> "@version" => "1",<br /> "message" => "aaa",<br /> "host" => "rockybean-MacBook-Pro.local"<br /> }<br />
我们再回头看下这个配置,upstream
output 使用了名为pipeline
的 plugin,然后send_to
的输出对象test_output
是在downstream
的input pipeline plugin
中定义的。通过这个唯一的address
(虚拟地址)就能够把不同的pipeline
连接在一起组成一个更长的pipeline
来处理数据。类似下图所示:

当数据由upstream
传递给downstream
时会进行一个复制操作,这也意味着在这两个 pipeline 中的数据是完全独立的,互不影响。有一点要注意的是:数据的复制会增加额外的性能开销,比如会加大 JVM Heap 的使用。
2. 使用场景
使用方法是不是很简单,接下来我们来看下官方为我们开的几个脑洞。
2.1 Distributor Pattern 分发者模式
该模式执行效果类似下图所示:

在一个 pipeline 处理输入,然后根据不同的数据类型再分发到对应的 Pipeline 去处理。这种模式的好处在于统一输入端口,隔离不同类型的处理配置文件,减少由于配置文件混合在一起带来的维护成本。大家可以想一想如果不用这种Pipeline-to-Pipeline
的方式,我们如果轻松做到一个端口处理多个来源的数据呢?
这种模式的参考配置如下所示:
```yamlconfig/pipelines.yml
- pipeline.id: upstream
- pipeline.id: beats-server
config.string: |
input { beats { port => 5044 } }
output {
if [type] == apache {
pipeline { send_to => weblogs }
} else if [type] == system {
pipeline { send_to => syslog }
} else {
pipeline { send_to => fallback }
}
} - pipeline.id: weblog-processing
config.string: |
input { pipeline { address => weblogs } }
filter {
Weblog filter statements here...
}
output {
elasticsearch { hosts => [es_cluster_a_host] }
} - pipeline.id: syslog-processing
config.string: |
input { pipeline { address => syslog } }
filter {
Syslog filter statements here...
}
output {
elasticsearch { hosts => [es_cluster_b_host] }
} - pipeline.id: fallback-processing
config.string: |
input { pipeline { address => fallback } }
output { elasticsearch { hosts => [es_cluster_b_host] } }
```
2.2 Output Isolator Pattern 输出隔离模式
虽然 Logstash 的一个 pipeline 可以配置多个 output,但是这多个 output 会相依为命,一旦某一个 output 出问题,会导致另一个 output 也无法接收新数据。而通过这种模式可以完美解决这个问题。其运行方式如下图所示:

通过输出到两个独立的 pipeline,解除相互之间的影响,比如 http service 出问题的时候,es 依然可以正常接收数据,而且两个 pipeline 可以配置独立的队列来保障数据的完备性,其配置如下所示:
```yamlconfig/pipelines.yml
- pipeline.id: intake
queue.type: persisted
config.string: |
input { beats { port => 5044 } }
output { pipeline { send_to => [es, http] } } - pipeline.id: buffered-es
queue.type: persisted
config.string: |
input { pipeline { address => es } }
output { elasticsearch { } } - pipeline.id: buffered-http
queue.type: persisted
config.string: |
input { pipeline { address => http } }
output { http { } }
```
2.3 Forked Path Pattern 克隆路径模式
这个模式类似 Output Isolator Pattern,只是在不同的 output pipeline 中可以配置不同的 filter 来完成各自输出的数据处理需求,这里就不展开讲了,可以参考如下的配置,其中不同 output pipeline 的 filter 是不同的,比如 partner 这个 pipeline 去掉了一些敏感数据:
```config/pipelines.yml
- pipeline.id: intake
queue.type: persisted
config.string: |
input { beats { port => 5044 } }
output { pipeline { send_to => ["internal-es", "partner-s3"] } } - pipeline.id: buffered-es
queue.type: persisted
config.string: |
input { pipeline { address => "internal-es" } }
Index the full event
output { elasticsearch { } }
- pipeline.id: partner
queue.type: persisted
config.string: |
input { pipeline { address => "partner-s3" } }
filter {
Remove the sensitive data
mutate { remove_field => 'sensitive-data' }
}
output { s3 { } } # Output to partner's bucket
```
2.4 Collector Pattern 收集者模式
从名字可以看出,该模式是将所有 Pipeline 汇集于一处的处理模式,如下图所示:

其配置参考如下:
```config/pipelines.yml
- pipeline.id: beats
config.string: |
input { beats { port => 5044 } }
output { pipeline { send_to => [commonOut] } } - pipeline.id: kafka
config.string: |
input { kafka { ... } }
output { pipeline { send_to => [commonOut] } } - pipeline.id: partner
This common pipeline enforces the same logic whether data comes from Kafka or Beats
config.string: |
input { pipeline { address => commonOut } }
filter {Always remove sensitive data from all input sources
mutate { remove_field => 'sensitive-data' }
}
output { elasticsearch { } }
```
3. 总结
本文简单给大家讲解了Pipeline-to-Pipeline
的使用方法及官方推荐的几种模式,希望可以给大家有所帮助。另外这个机制目前还处于 Beta 阶段,尝鲜需谨慎!
build_scorer很长时间
Elasticsearch • medcl 回复了问题 • 2 人关注 • 1 个回复 • 3735 次浏览 • 2018-12-10 16:44
metricbeat windows版本无法采集 fd指标
Beats • rojay 回复了问题 • 2 人关注 • 1 个回复 • 3101 次浏览 • 2019-03-09 15:39
拼音加IK的搜索怎么固定顺序
Elasticsearch • Circle 回复了问题 • 3 人关注 • 2 个回复 • 2281 次浏览 • 2018-12-06 15:42
关于分词干预问题
Elasticsearch • rochy 回复了问题 • 3 人关注 • 1 个回复 • 2046 次浏览 • 2018-12-06 15:34
社区日报 第470期 (2018-12-06)
社区日报 • 白衬衣 发表了文章 • 0 个评论 • 1596 次浏览 • 2018-12-06 15:06
http://t.cn/Eybu8a3
2.ElasticSearch 恢复类型之对等恢复
http://t.cn/EyaCYFV
3.使用带注释的文本插件搜索事物
http://t.cn/EyaCntg
编辑:金桥
归档:https://elasticsearch.cn/article/6175
订阅:https://tinyletter.com/elastic-daily