怎么又是你

ES的父子查询中如何使用scripts的来过滤数据

Elasticsearchrochy 回复了问题 • 3 人关注 • 1 个回复 • 2153 次浏览 • 2018-12-19 09:10 • 来自相关话题

请问marvel或x-pack里面的监控数据有API直接获取吗?

Elasticsearchrochy 回复了问题 • 2 人关注 • 2 个回复 • 1415 次浏览 • 2018-12-19 09:03 • 来自相关话题

Day 19 - 通过点击反馈优化es搜索结果排序

Adventlaigood 发表了文章 • 1 个评论 • 6068 次浏览 • 2018-12-19 00:15 • 来自相关话题

      相信不少人都把es当做一个主要的搜索引擎来使用,但是对于搜索结果之后的点击反馈,es没有很好的方案。比如说用户搜索了某些关键词,点击了某些结果,而这些结果并不是排在最前面的,但确实是用户最想要的。那有没有什么方法可以使它们排在前面呢?一种简单的做法就是就是离线统计文档的点击率,然后在排序时根据这个点击率进行加权,但这样笼统的算法不一定适合所有情况。现在就来简单介绍下learning to rank,翻译过来就是学习排序,可以根据点击日志里面的记录,来反向影响搜索结果的排序。刚好这个库也有es的插件,下面以这个插件的官方demo来解释下如何使用。
demo的下载地址如下,都是python脚本,环境需求:python3+,es
https://github.com/o19s/elasti ... /demo
1.准备数据
python prepare.py
下载RankLib.jar (用来训练模型) 和tmdb.json (测试数据集,tmdb的电影数据)
2.导测试数据入es
python index_ml_tmdb.py
3.训练模型
python train.py
训练脚本很简单,但是脚本里面有丰富的实现,下面介绍下主要方法。
load_features(FEATURE_SET_NAME)
这个是读取特征信息,demo定义了两个特征,分别在1.json
{
"query": {
"match": {
"title": "{{keywords}}"
}
}
}
和2.json
{
"query": {
"match": {
"overview": "{{keywords}}"
}
}
}
1就是查title,2就是查overview,生成训练数据时就是需要根据特征的查询语法,去es里面匹配相关得分作为特征分数。
movieJudgments = judgments_by_qid(judgments_from_file(filename=JUDGMENTS_FILE))
读取生成训练数据的原始数据,官方称其为决策列表(Judgment list),第一列是数值为0-4的权重,数值越大,相关性越高。回到我们最初的需求就是越多人点击的文档,那么这个权重就越大。第二列是queryid,同次查询结果中的queryid一样,第三列是文档id,这里就是电影id,第四列是文档标题,这里就是电影名。
4   qid:1 #    7555   Rambo
3  qid:1 #    1370   Rambo III
3  qid:1 #    1369   Rambo: First Blood Part II
3  qid:1 #    1368   First Blood
0  qid:1 #    136278 Blood
4  qid:2 #    1366   Rocky
3  qid:2 #    1246   Rocky Balboa
3  qid:2 #    60375  Rocky VI
3  qid:2 #    1371   Rocky III
3  qid:2 #    1375   Rocky V
log_features(es, judgments_dict=movieJudgments, search_index=INDEX_NAME)
build_features_judgments_file(movieJudgments, filename=JUDGMENTS_FILE_FEATURES)
之后就是生成特征集,就是把上面的每条训练数据根据特征查询语句扔进es里面进行查询,把得分放到1和2特征后面,如:下面数据第一条中的,1:12.318446就表示1特征的分数,2:10.573845表示2特征的分数,然后把特征集写到文件。
生成完的特征集如下:
4   qid:1  1:12.318446    2:10.573845 # 7555 rambo
3  qid:1  1:10.357836    2:11.950331 # 1370 rambo
3  qid:1  1:7.0104666    2:11.220029 # 1369 rambo
3  qid:1  1:0.0  2:11.220029 # 1368 rambo
0  qid:1  1:0.0  2:0.0 # 136278 rambo
4  qid:2  1:10.686367    2:8.814796 # 1366  rocky
3  qid:2  1:8.985519 2:9.984467 # 1246  rocky
3  qid:2  1:8.985519 2:8.067647 # 60375 rocky
3  qid:2  1:8.985519 2:5.6604943 # 1371 rocky
3  qid:2  1:8.985519 2:7.3007236 # 1375 rocky
特征集出来后就是训练了,demo提供10总不同的算法,训练好之后把结果传到es提供服务
for modelType in [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]:
# 0, MART
# 1, RankNet
# 2, RankBoost
# 3, AdaRank
# 4, coord Ascent
# 6, LambdaMART
# 7, ListNET
# 8, Random Forests
# 9, Linear Regression
Logger.logger.info("*** Training %s " % modelType)
train_model(judgments_with_features_file=JUDGMENTS_FILE_FEATURES, model_output='model.txt',
which_model=modelType)
save_model(script_name="test_%s" % modelType, feature_set=FEATURE_SET_NAME, model_fname='model.txt')
4.最后搜索数据
python search.py Rambo
搜索时主要用到了es里面的rescore特性,就是对前面topn条记录根据模型进行再排序,查询dsl如下:
{
"query": {
"multi_match": {
"query": "Rambo",
"fields": ["title", "overview"]
}
},
"rescore": {
"query": {
"rescore_query": {
"sltr": {
"params": {
"keywords": "Rambo"
},
"model": "test_1",
}
}
}
}
}

得到结果
Rambo
Rambo III
Rambo: First Blood Part II
First Blood
In the Line of Duty: The F.B.I. Murders
Son of Rambow
Spud
当然这个是最简单的一个例子,深入研究可以参考官方文档,很详细:https://elasticsearch-learning ... test/
 

单机硬盘160T如果写满对应32G内存如何?

ElasticsearchHelloClyde 回复了问题 • 5 人关注 • 3 个回复 • 1974 次浏览 • 2018-12-18 22:58 • 来自相关话题

老生常谈ES heap问题;

ElasticsearchHelloClyde 回复了问题 • 3 人关注 • 2 个回复 • 3291 次浏览 • 2018-12-18 17:14 • 来自相关话题

....................

Elasticsearchrochy 回复了问题 • 2 人关注 • 1 个回复 • 1413 次浏览 • 2018-12-18 16:24 • 来自相关话题

如何选择在ES之前处理写入的中间件?

ElasticsearchGod_lockin 回复了问题 • 4 人关注 • 2 个回复 • 3376 次浏览 • 2018-12-18 14:48 • 来自相关话题

大家有碰到过这个异常吗can not write type [class java.math.BigDecimal]

Elasticsearchrochy 回复了问题 • 2 人关注 • 1 个回复 • 4066 次浏览 • 2018-12-18 14:18 • 来自相关话题

社区日报 第482期 (2018-12-18)

社区日报kimichen123 发表了文章 • 0 个评论 • 1287 次浏览 • 2018-12-18 13:27 • 来自相关话题

1、Elasticsearch常用操作之集群管理篇。
http://t.cn/EUFd8Yd
​2、ElasticsearchSQL用法详解。
http://t.cn/EUFdu9z
​3、在滴滴云DC2云服务器上搭建ELK。
http://t.cn/EUFdrKb

编辑:叮咚光军
归档:https://elasticsearch.cn/article/6207
订阅:https://tinyletter.com/elastic-daily

Kibana查询结果根据字段进行去重

Kibanainsist_93 回复了问题 • 3 人关注 • 2 个回复 • 14500 次浏览 • 2018-12-18 09:21 • 来自相关话题

filebeat采集可以采集多少个文件

Beatscao 回复了问题 • 2 人关注 • 2 个回复 • 3687 次浏览 • 2018-12-17 17:45 • 来自相关话题

使用search-guard 后java代码连接不上

Elasticsearchly898197688 回复了问题 • 2 人关注 • 2 个回复 • 2331 次浏览 • 2018-12-17 16:09 • 来自相关话题

Day 18: 记filebeat内存泄漏问题分析及调优

Beats点火三周 发表了文章 • 1 个评论 • 11873 次浏览 • 2018-12-17 14:55 • 来自相关话题

ELK 从发布5.0之后加入了beats套件之后,就改名叫做elastic stack了。beats是一组轻量级的软件,给我们提供了简便,快捷的方式来实时收集、丰富更多的数据用以支撑我们的分析。但由于beats都需要安装在ELK集群之外,在宿主机之上,其对宿主机的性能的影响往往成为了考量其是否能被使用的关键,而不是它到底提供了什么样的功能。因为业务的稳定运行才是核心KPI,而其他因运维而生的数据永远是更低的优先级。影响宿主机性能的方面可能有很多,比如CPU占用率,网络吞吐占用率,磁盘IO,内存等,这里我们详细讨论一下内存泄漏的问题

@[toc]

filebeat是beats套件的核心组件之一(另一个核心是metricbeat),用于采集文件内容并发送到收集端(ES),它一般安装在宿主机上,即生成文件的机器。根据文档的描述,filebeat是不建议用来采集NFS(网络共享磁盘)上的数据的,因此,我们这里只讨论filebeat对本地文件进行采集时的性能情况。

当filebeat部署和运行之后,必定会对cpu,内存,网络等资源产生一定的消耗,当这种消耗能够限定在一个可接受的范围时,在企业内部的生产服务器上大规模部署filebeat是可行的。但如果出现一些非预期的情况,比如占用了大量的内存,那么运维团队肯定是优先保障核心业务的资源,把filebeat进程给杀了。很可惜的是,内存泄漏的问题,从filebeat的诞生到现在就一直没有完全解决过。(可以区社区讨论贴看看,直到现在V6.5.1都还有人在报告内存泄漏的问题)。在特定的场景和配置下,内存占用过多已经成为了抑止filebeat大规模部署的主要问题了。在这里,我主要描述一下我碰到的在filebeat 6.0上遇到的问题。

问题场景和配置


一开始我们在很多机器上部署了filebeat,并且使用了一套统一无差别的的简单配置。对于想要在企业内部大规模推广filebeat的同学来说,这是大忌!!! 合理的方式是具体问题具体分析,需对每台机器上产生文件的方式和rotate的方式进行充分的调研,针对不同的场景是做定制化的配置。以下是我们之前使用的配置:

  • multiline,多行的配置,当日志文件不符合规范,大量的匹配pattern的时候,会造成内存泄漏
  • max_procs,限制filebeat的进程数量,其实是内核数,建议手动设为1



    ```yaml
    filebeat.prospectors:
  • type: log
    enabled: true
    paths:
  • /qhapp//.log
    tail_files: true
    multiline.pattern: '^[[:space:]]+|^Caused by:|^.+Exception:|^\d+\serror'
    multiline.negate: false
    multiline.match: after
    fields:
    app_id: bi_lass
    service: "{{ hostvars[inventory_hostname]['service'] }}"
    ip_address: "{{ hostvars[inventory_hostname]['ansible_host'] }}"
    topic: qh_app_raw_log

    filebeat.config.modules:
    path: ${path.config}/modules.d/*.yml
    reload.enabled: false

    setup.template.settings:
    index.number_of_shards: 3

    index.codec: best_compression

    _source.enabled: false

    output.kafka:
    enabled: true
    hosts: [{{kafka_url}}]

    topic: '%{[fields][topic]}'

    max_procs: 1

    ```
    注意,以上的配置中,仅仅对cpu的内核数进行了限制,而没有对内存的使用率进行特殊的限制。从配置层面来说,影响filebeat内存使用情况的指标主要有两个:

  • queue.mem.events消息队列的大小,默认值是4096,这个参数在6.0以前的版本是spool-size,通过命令行,在启动时进行配置
  • max_message_bytes 单条消息的大小, 默认值是10M

    filebeat最大的可能占用的内存是max_message_bytes * queue.mem.events = 40G,考虑到这个queue是用于存储encode过的数据,raw数据也是要存储的,所以,在没有对内存进行限制的情况下,最大的内存占用情况是可以达到超过80G

    因此,建议是同时对filebeat的CPU和内存进行限制。

    下面,我们看看,使用以上的配置在什么情况下会观测到内存泄漏

    监控文件过多


    对于实时大量产生内容的文件,比如日志,常用的做法往往是将日志文件进行rotate,根据策略的不同,每隔一段时间或者达到固定大小之后,将日志rotate。
    这样,在文件目录下可能会产生大量的日志文件。
    如果我们使用通配符的方式,去监控该目录,则filebeat会启动大量的harvester实例去采集文件。但是,请记住,我这里不是说这样一定会产生内存泄漏,只是在这里观测到了内存泄漏而已,不是说这是造成内存泄漏的原因。


    20181126205516139.png




    当filebeat运行了几个月之后,占用了超过10个G的内存


    20181126205316830.png




    非常频繁的rotate日志


    另一个可能是,filebeat只配置监控了一个文件,比如test2.log,但由于test2.log不停的rotate出新的文件,虽然没有使用通配符采集该目录下的所有文件,但因为linux系统是使用inode number来唯一标示文件的,rotate出来的新文件并没有改变其inode number,因此,时间上filebeat还是同时开启了对多个文件的监控。

    20181126220013971.png



    另外,因为对文件进行rotate的时候,一般会限制rotate的个数,即到达一定数量时,新rotate一个文件,必然会删除一个旧的文件,文件删除之后,inode number是可以复用的,如果不巧,新rotate出来的文件被分配了一个之前已删掉文件的inode number,而此时filebeat还没有监测之前持有该inode number的文件已删除,则会抛出以下异常:

    <br /> 2018-11-21T18:06:55+08:00 ERR Harvester could not be started on truncated file: /qhapp/logs/bd-etl/logs/test2.log, Err: Error setting up harvester: Harvester setup failed. Unexpected file opening error: file info is not identical with opened file. Aborting harvesting and retrying file later again<br />

    而类似Harvester setup failed.的异常会导致内存泄漏

    https://github.com/elastic/beats/issues/6797



    因为multiline导致内存占用过多

    multiline.pattern: '^[[:space:]]+|^Caused by:|^.+Exception:|^\d+\serror,比如这个配置,认为空格或者制表符开头的line是上一行的附加内容,需要作为多行模式,存储到同一个event当中。当你监控的文件刚巧在文件的每一行带有一个空格时,会错误的匹配多行,造成filebeat解析过后,单条event的行数达到了上千行,大小达到了10M,并且在这过程中使用的是正则表达式,每一条event的处理都会极大的消耗内存。因为大多数的filebeat output是需应答的,buffer这些event必然会大量的消耗内存。

    模拟场景

    这里不多说,简单来一段python的代码:
    ini<br /> [loggers]<br /> keys=root<br /> <br /> [handlers]<br /> keys=NormalHandler<br /> <br /> [formatters]<br /> keys=formatter<br /> <br /> [logger_root]<br /> level=DEBUG<br /> handlers=NormalHandler<br /> <br /> [handler_NormalHandler]<br /> class=logging.handlers.TimedRotatingFileHandler<br /> formatter=formatter<br /> args=('./test2.log', 'S', 10, 200)<br /> <br /> [formatter_formatter]<br /> format=%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s<br />
    以上,每隔10秒('S', 'M' = 分钟,'D'= 天)rotate一个文件,一共可以rotate 200个文件。
    然后,随便找一段日志,不停的打,以下是330条/秒

    python<br /> import logging<br /> from logging.config import fileConfig<br /> import os<br /> import time<br /> CURRENT_FOLDER = os.path.dirname(os.path.realpath(__file__))<br /> <br /> fileConfig(CURRENT_FOLDER + '/logging.ini')<br /> logger = logging.getLogger()<br /> <br /> while True:<br /> logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")<br /> logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")<br /> logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")<br /> logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")<br /> logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")<br /> logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")<br /> logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")<br /> logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")<br /> logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!")<br /> logger.debug("DEBUG 2018-11-26 09:31:35 com.sunyard.insurance.server.GetImage 43 - 资源请求:date=20181126&file_name=/imagedata/imv2/pool1/images/GXTB/2017/11/14/57/06b6bcdd31763b70b20f56c689e51f5e_1/06b6bcdd31763b70b20f56c689e51f5e_2.syd&file_encrypt=0&token=HUtGGG20GH4GAqq209R9tc9UGtAURR0b DEBUG 2018-11-26 09:31:40 com.sunyard.insurance.scheduler.job.DbEroorHandleJob 26 - [数据库操作异常处理JOB]处理异常文件,本机不运行,退出任务!!@#!@#!@#!@#!@#!@#!@#!@#!@#!@#!@#!#@!!!@##########################################################################################################################################################")<br /> time.sleep(0.03)<br />


    如何观察filebeat的内存


    在6.3版本之前,我们是无法通过xpack的monitoring功能来观察beats套件的性能的。因此,这里讨论的是没有monitoring时,我们如何去检测filebeat的性能。当然,简单的方法是通过top,ps等操作系统的命令进行查看,但这些都是实时的,无法做趋势的观察,并且都是进程级别的,无法看到filebeat内部的真是情况。因此,这里介绍如何通过filebeat的日志和pprof这个工具来观察内存的使用情况

    通过filebeat的日志


    filebeat文件解读

    其实filebeat的日志,已经包含了很多参数用于实时观测filebeat的资源使用情况,以下是filebeat的一个日志片段(这里的日志片段是6.0版本的,6.3版本之后,整个日志格式变了,从kv格式变成了json对象格式,xpack可以直接通过日志进行filebeat的monitoring):

    shell<br /> 2018-11-02T17:40:01+08:00 INFO Non-zero metrics in the last 30s: beat.memstats.gc_next=623475680 beat.memstats.memory_alloc=391032232 beat.memstats.memory_total=155885103371024 filebeat.events.active=-402 filebeat.events.added=13279 filebeat.events.done=13681 filebeat.harvester.closed=1 filebeat.harvester.open_files=7 filebeat.harvester.running=7 filebeat.harvester.started=2 libbeat.config.module.running=0 libbeat.output.events.acked=13677 libbeat.output.events.batches=28 libbeat.output.events.total=13677 libbeat.outputs.kafka.bytes_read=12112 libbeat.outputs.kafka.bytes_write=1043381 libbeat.pipeline.clients=1 libbeat.pipeline.events.active=0 libbeat.pipeline.events.filtered=4 libbeat.pipeline.events.published=13275 libbeat.pipeline.events.total=13279 libbeat.pipeline.queue.acked=13677 registrar.states.cleanup=1 registrar.states.current=8 registrar.states.update=13681 registrar.writes=28<br /> <br />
    里面的参数主要分成三个部分:

  • beat.*,包含memstats.gc_next,memstats.memory_alloc,memstats.memory_total,这个是所有beat组件都有的指标,是filebeat继承来的,主要是内存相关的,我们这里特别关注memstats.memory_alloc,alloc的越多,占用内存越大
  • filebeat.*,这部分是filebeat特有的指标,通过event相关的指标,我们知道吞吐,通过harvester,我们知道正在监控多少个文件,未消费event堆积的越多,havester创建的越多,消耗内存越大
  • libbeat.*,也是beats组件通用的指标,包含outputs和pipeline等信息。这里要主要当outputs发生阻塞的时候,会直接影响queue里面event的消费,造成内存堆积
  • registrar,filebeat将监控文件的状态放在registry文件里面,当监控文件非常多的时候,比如10万个,而且没有合理的设置close_inactive参数,这个文件能达到100M,载入内存后,直接占用内存

    filebeat日志解析

    当然,我们不可能直接去读这个日志,既然我们使用ELK,肯定是用ELK进行解读。因为是kv格式,很方便,用logstash的kv plugin:
    ruby<br /> filter {<br /> kv {}<br /> }<br />
    kv无法指定properties的type,所以,我们需要稍微指定了一下索引的模版:
    <br /> PUT _template/template_1<br /> {<br /> "index_patterns": ["filebeat*"],<br /> "settings": {<br /> "number_of_shards": 1<br /> },<br /> "mappings": {<br /> "doc": {<br /> "_source": {<br /> "enabled": false<br /> },<br /> "dynamic_templates": [<br /> {<br /> "longs_as_strings": {<br /> "match_mapping_type": "string",<br /> "path_match": "*beat.*",<br /> "path_unmatch": "*.*name",<br /> "mapping": {<br /> "type": "long"<br /> }<br /> }<br /> }<br /> ]<br /> }<br /> }<br /> }<br />
    上面的模版,将kv解析出的properties都mapping到long类型,但注意"path_match": "*beat.*"无法match到registrar的指标,读者可以自己写一个更完善的mapping。
    这样,我们就可以通过kibana可视化组件,清楚的看到内存泄漏的过程

    20181127114253940.png



    以及资源的使用情况:

    20181127114342557.png



    将信息可视化之后,我们可以明显的发现,内存的突变和ERR是同时发生的

    20181127114536608.png



    即以下error:
    2018-11-27T09:05:44+08:00 ERR Harvester could not be started on new file: /qhapp/logs/bd-etl/logs/test2.log, Err: Error setting up harvester: Harvester setup failed. Unexpected file opening error: file info is not identical with opened file. Aborting harvesting and retrying file later again

    会导致filebeat突然申请了额外的内存。具体请查看[issue](https://github.com/elastic/beats/issues/6797)

    通过pprof

    众所周知,filebeat是用go语言实现的,而go语言本身的基础库里面就包含pprof这个功能极其强大的性能分析工具,只是这个工具是用于debug的,在正常模式下,filebeat是不会启动这个选贤的,并且很遗憾,在官方文档里面根本没有提及我们可以使用pprof来观测filebeat。我们接下来可以通过6.3上修复的一个内存泄漏的[issue](https://github.com/elastic/beats/issues/6797),来学习怎么使用pprof进行分析

    启动pprof监测


    首先,需要让filebeat在启动的时候运行pprof,具体的做法是在启动是加上参数-httpprof localhost:6060,即/usr/share/filebeat/bin/filebeat -c /etc/filebeat/filebeat.yml -path.home /usr/share/filebeat -path.config /etc/filebeat -path.data /var/lib/filebeat -path.logs /var/log/filebeat -httpprof localhost:6060。这里只绑定了localhost,无法通过远程访问,如果想远程访问,应该使用0.0.0.0
    这时,你就可以通过curl <a href="http://localhost:6060/debug/pprof/heap" rel="nofollow" target="_blank">http://localhost:6060/debug/pprof/heap</a> > profile.txt等命令,获取filebeat的实时堆栈信息了。

    远程连接


    当然,你也可以通过在你的本地电脑上安装go,然后通过go tool远程连接pprof。
    因为我们是需要研究内存的问题,所以以下连接访问的是/heap子路径
    go tool pprof <a href="http://10.60.x.x:6060/debug/pprof/heap" rel="nofollow" target="_blank">http://10.60.x.x:6060/debug/pprof/heap</a>

    top 命令


    连接之后,你可以通过top命令,查看消耗内存最多的几个实例:
    <br /> 33159.58kB of 33159.58kB total ( 100%)<br /> Dropped 308 nodes (cum <= 165.80kB)<br /> Showing top 10 nodes out of 51 (cum >= 512.04kB)<br /> flat flat% sum% cum cum%<br /> 19975.92kB 60.24% 60.24% 19975.92kB 60.24% runtime.malg<br /> 7680.66kB 23.16% 83.40% 7680.66kB 23.16% github.com/elastic/beats/filebeat/channel.SubOutlet<br /> 2048.19kB 6.18% 89.58% 2048.19kB 6.18% github.com/elastic/beats/filebeat/prospector/log.NewHarvester<br /> 1357.91kB 4.10% 93.68% 1357.91kB 4.10% runtime.allgadd<br /> 1024.08kB 3.09% 96.76% 1024.08kB 3.09% runtime.acquireSudog<br /> 544.67kB 1.64% 98.41% 544.67kB 1.64% github.com/elastic/beats/libbeat/publisher/queue/memqueue.NewBroker<br /> 528.17kB 1.59% 100% 528.17kB 1.59% regexp.(*bitState).reset<br /> 0 0% 100% 528.17kB 1.59% github.com/elastic/beats/filebeat/beater.(*Filebeat).Run<br /> 0 0% 100% 512.04kB 1.54% github.com/elastic/beats/filebeat/channel.CloseOnSignal.func1<br /> 0 0% 100% 512.04kB 1.54% github.com/elastic/beats/filebeat/channel.SubOutlet.func1<br />

    查看堆栈调用图

    输入web命令,会生产堆栈调用关系的svg图,在这个svg图中,你可以结合top命令一起查看,在top中,我们已经知道github.com/elastic/beats/filebeat/channel.SubOutlet占用了很多的内存,在图中,展现的是调用关系栈,你可以看到这个类是怎么被实例化的,并且在整个堆中,内存是怎么分布的。最直观的是,实例所处的长方形面积越大,代表占用的内存越多。:

    20181126222514954.png



    查看源码

    通过list命令,可以迅速查看可以实例的问题源码,比如在之前的top10命令中,我们已经看到github.com/elastic/beats/filebeat/channel.SubOutlet这个类的实例占用了大量的内存,我们可以通过list做进一步的分析,看看这个类内部在哪个语句开始出现内存的占用:
    <br /> (pprof) list SubOutlet<br /> Total: 32.38MB<br /> ROUTINE ======================== github.com/elastic/beats/filebeat/channel.SubOutlet in /home/jeremy/src/go/src/github.com/elastic/beats/filebeat/channel/util.go<br /> 7.50MB 7.50MB (flat, cum) 23.16% of Total<br /> . . 15:// SubOutlet create a sub-outlet, which can be closed individually, without closing the<br /> . . 16:// underlying outlet.<br /> . . 17:func SubOutlet(out Outleter) Outleter {<br /> . . 18: s := &subOutlet{<br /> . . 19: isOpen: atomic.MakeBool(true),<br /> 1MB 1MB 20: done: make(chan struct{}),<br /> 2MB 2MB 21: ch: make(chan *util.Data),<br /> 4.50MB 4.50MB 22: res: make(chan bool, 1),<br /> . . 23: }<br /> . . 24:<br /> . . 25: go func() {<br /> . . 26: for event := range s.ch {<br /> . . 27: s.res <- out.OnEvent(event) <br />

    如何调优

    其实调优的过程就是调整参数的过程,之前说过了,和内存相关的参数, max_message_bytes,queue.mem.events,queue.mem.flush.min_events,以及队列占用内存的公式:max_message_bytes * queue.mem.events
    ```
    output.kafka:
    enabled: true

    max_message_bytes: 1000000

    hosts: ["10.60.x.x:9092"]
    topic: '%{[fields][topic]}'
    max_procs: 1

    queue.mem.events: 256

    queue.mem.flush.min_events: 128

    ```
    但其实,不同的环境下,不同的原因都可能会造成filebeat占用的内存过大,此时,需要仔细的确认你的上下文环境:

  • 是否因为通配符的原因,造成同时监控数量巨大的文件,这种情况应该避免用通配符监控无用的文件。
  • 是否文件的单行内容巨大,确定是否需要改造文件内容,或者将其过滤
  • 是否过多的匹配了multiline的pattern,并且多行的event是否单条体积过大。这时,就需要暂时关闭multiline,修改文件内容或者multiline的pattern。
  • 是否output经常阻塞,event queue里面总是一直缓存event。这时要检查你的网络环境或者消息队列等中间件是否正常




目前beats工具只有官网列出来的8种吗?

Beatsmedcl 回复了问题 • 3 人关注 • 1 个回复 • 2595 次浏览 • 2018-12-17 12:05 • 来自相关话题

Day 17 - 关于日志型数据管理策略的思考

Elasticsearchkennywu76 发表了文章 • 7 个评论 • 5645 次浏览 • 2018-12-17 11:19 • 来自相关话题

近两年随着Elastic Stack的愈发火热,其近乎成为构建实时日志应用的工业标准。在小型数据应用场景,最新的6.5版本已经可以做到开箱即用,无需过多考虑架构上的设计工作。 然而一旦应用规模扩大到数百TB甚至PB的数据量级,整个系统的架构和后期维护工作则显得非常重要。借着2018 Elastic Advent写文的机会,结合过去几年架构和运维公司日志集群的实践经验,对于大规模日志型数据的管理策略,在此做一个总结性的思考。 文中抛出的观点,有些已经在我们的集群中有所应用并取得比较好的效果,有些则还待实践的检验。抛砖引玉,不尽成熟的地方,还请社区各位专家指正。

对于日志系统,最终用户通常有以下几个基本要求:

  1. 数据从产生到可检索的实时性要求高,可接受的延迟通常要求控制在数秒,至多不超过数十秒
  2. 新鲜数据(当天至过去几天)的查询和统计频率高,返回速度要快(毫秒级,至多几秒)
  3. 历史数据保留得越久越好。

    针对这些需求,加上对成本控制的必要性,大家通常想到的第一个架构设计就是冷热数据分离。

    冷热数据分离

    冷热分离的概念比较好理解,热结点做数据的写入,保存近期热数据,冷数据定期迁移到冷数据结点,就这么简单。不过实际操作起来可能还是碰到一些具体需要考虑的细节问题。

  4. 冷热结点集群配置的JVM heap配置要差异化。热结点无需存放太多数据,对于heap的要求通常不是太高,在够用的情况下尽量配置小一点。可以配置在26GB左右甚至更小,而不是大多数人知道的经验值31GB。原因在于这个size的heap,可以启用zero based Compressed Oops,JVM运行效率是最高的, 参考: [heap-size](https://www.elastic.co/guide/e ... e.html)。而冷结点存在的目的是尽量放更多的数据,性能不是首要的,因此heap可以配置在31GB。
  5. 数据迁移过程有一定资源消耗,为避免对数据写入产生显著影响,通常定时在业务低峰期,日志产出量比较低的时候进行,比如半夜。
  6. 索引是否应该启用压缩,如何启用?最初我们对于热结点上的索引是不启用压缩的,为了节省CPU消耗。只在冷结点配置里,增加了索引压缩选项。这样索引迁移到冷结点后,执行force merge操作的时候,ES会自动将结点上配置的索引压缩属性套用到merge过后新生成的segment,这样就实现了热结点不压缩,冷结点merge过后压缩的功能。极大节省了冷结点的磁盘空间。后来随着硬件的升级,我们发现服务器的cpu基本都是过剩的,磁盘IO通常先到瓶颈。 因此尝试在热结点上一开始创建索引的时候,就启用压缩选项。实际对比测试并没有发现显著的索引吞吐量下降,并且因为索引压缩后磁盘文件size的大幅减少,每天夜间的数据迁移工作可以节省大量的时间。至此我们的日志集群索引默认就是压缩的。
  7. 冷结点上留做系统缓存的内存一般不多,加上数据量非常巨大。索引默认的mmapfs读取方式,很容易因为系统缓存不够,导致数据在内存和磁盘之间频繁换入换出。严重的情况下,整个结点甚至会因为io持续在100%无法响应。 实践中我们发现对冷结点使用niofs效果会更好。

    实现了冷热结点分离以后,集群的资源利用率提升了不少,可管理性也要好很多了. 但是随着接入日志的类型越来越多(我们生产上有差不多400种类型的日志),各种日志的速率差异又很大,让ES自己管理shard的分布很容易产生写入热点问题。 针对这个问题,可以采用对集群结点进行分组管理的策略来解决。

    热结点分组管理

    所谓分组管理,就是通过在结点的配置文件中增加自定义的标签属性,将服务器区分到不同的组别中。然后通过设置索引的index.routing.allocation.include属性,控制改索引分布在哪个组别。同时配合设置index.routing.allocation.total_shards_per_node,可以做到某个索引的shard在某个group的结点之间绝对均匀分布。

    比如一个分组有10台机器,对一个5 primary ,1 replica的索引,让该索引分布在该分组的同时,设置total_shards_per_node为1,让每个节点上只能有一个分片,这样就避免了写入热点问题。 该方案的缺陷也显而易见,一旦有结点挂掉,不会自动recovery,某个shard将一直处于unassigned状态,集群状态变成yellow。 但我认为,热数据的恢复开销是非常高的,与其立即在其他结点开始复制,之后再重新rebalance,不如就让集群暂时处于yellow状态。 通过监控报警的手段,及时通知运维人员解决结点故障。 待故障解决之后,直接从恢复后的结点开始数据复制,开销要低得多。

    在我们的生产环境主要有两种类型的结点分组,分别是10台机器一个分组,和2台机器一个分组。10台机器的分组用于应对速率非常高,shard划分比较多的索引,2台机器的分组用于速率很低,一个shard(加一个复制片)就可以应对的索引。

    这种分组策略在我们的生产环境中经过验证,非常好的解决了写入热点问题。那么冷数据怎么管理? 冷数据不做写入,不存在写入热点问题,查询频率也比较低,业务需求方面对查询响应要求也不那么严苛,所以查询热点问题也不是那么突出。因此为了简化管理,冷结点我们是不做shard分布的精细控制,所有数据迁移到冷数据结点之后,由ES默认的shard分布则略去控制数据的分布。

    不过如果想进一步提高冷数据结点服务器资源的利用率,还是可以有进一步挖掘的的空间。我们知道ES默认的shard分布策略,只是保证一个索引的shard尽量分布在不同的结点,同时保证每个节点上shard数量差不多。但是如果采用默认按天创建索引的策略,由于索引速率差异很大,不同索引之间shard的大小差异可能是1-2个数量级的。如果每个shard的size差异不大就好了,那么默认的分布策略,基本上可以保证冷结点之间数据量分布的大致均匀。 能实现类似功能的是ES的rollover特性。

    索引的Rollover

    Rollover api可以让索引根据预先定义的时间跨度,或者索引大小来自动切分出新索引,从而将索引的大小控制在计划的范围内。合理的应用rollver api可以保证集群shard大小差别不会太大。 只是集群索引类别比较多的时候,rollover全部手动管理负担比较大,需要借助额外的管理工具和监控工具。我们出于管理简便的考虑,暂时没有应用到这个特性。

    索引的Rollup

    我们发现生产有些用户写入的“日志”,实际上是多维的metrcis数据,使用的时候不是为了查询日志的详情,仅仅是为了做各种维度组合的过滤和聚合。对于这种类型的数据,保留历史数据过多一来浪费存储空间,二来每次聚合都要在裸数据上跑,非常浪费资源。 ES从6.3开始,在x-pack里推出了rollup api,通过定期对裸数据做预先聚合,大大缩减了保存在磁盘上的数据量。对于不需要查询裸日志的应用场景,合理应用该特性,可以将历史数据的磁盘消耗降低几个数量级,同时rollup search也可以大大提升聚合速度。不过rollup也有其局限性,即他的实现是通过定期任务,对间隔期数据跑聚合完成的,有一定的计算开销。 如果数据写入速率非常高,集群压力很大,rollup可能无法跟上写入速率,而不具有实用性。 所以实际环境中,还是需要根据应用场景和资源使用情况,进行灵活的取舍。

    多集群的便利性

    数据量大到一定程度以后,单集群由于master node单点的限制,会遇到各种集群状态数据更新时得性能问题。 由此现在一些大规模的应用已经开始利用到多集群互联和cross cluster search的特性。 这种结构除了解决单集群数据容量限制问题以外,我们还发现在做容量均衡方面还有比较好的便利性。应用日志写入量通常随着业务变化也会剧烈变化,好不容易规划好的容量,不久就被业务的增长给打破,数倍或者数10倍的流量增长很可能就让一组结点过载出现写入延迟。 如果只有一个集群,在结点之间重新平衡shard比较费力,涉及到数据的迁移,可能非常缓慢,还会影响写入。 但如果有多集群互联,切换就可以做到非常的快速和简单。 原理上只需要在新集群中加入对应的索引配置模版,然后更新写入程序的配置,写入目标指向新集群,重启写入程序即可。并且,可以进一步将整个流程工具化,在GUI上完成一键切换。