advent

advent

Day 14: Elasticsearch 5 入坑指南

Adventkennywu76 发表了文章 • 33 个评论 • 17918 次浏览 • 2016-12-15 13:16 • 来自相关话题

尝鲜 10月26日,Elasticsearch5.0.0 GA终于放出,携程ES Ops团队也在第一时间在DEV和UAT环境分别进行了2.4.0 至5.0.0的升级和测试。升级完成后,除了部分Query不向前兼容(主要是Filtered Query),需要在应用端做一些修改以外,未发现其他问题。通过监控系统看对比升级前后的主要系统指标,在同等索引量的情况下,CPU使用率有明显下降 ( 30% - 50%左右) ,相信性能方面5.0应该是有较大提升的。  在测试环境稳定运行了2周以后,我们决定选定一个生产集群进行升级,考验新版本在更为复杂的用户环境下的表现。 出于对业务影响最小化的考虑,用于日志分析的集群被圈定为升级目标。该集群也是携程十几个集群中规模最大的一个,共有120个数据结点运行于70台物理机上,总数据量接近1PB。 升级前需要做一些准备工作,下载官方的Migration Helper插件,检查集群设置和索引的兼容性。对于不兼容的配置项,MH会详尽列出,其中标注为红色部分为为升级前必须修改项。1.x版本创建的索引,是无法直接升级到5的,需要先在2.x集群里做一次reindex 。 MH提供了不兼容索引扫描功能,对于找到的不兼容索引,可以直接在UI上发起reindex操作,等待结束即可。 如果是用于业务搜索集群,数据可能比较重要,建议升级前做一个Snapshot,万一升级过程出现意外,可以回退版本从备份里快速恢复数据。我们的日志集群数据量极大,也没有对数据100%不丢的要求,因此升级前没有做Snapshot。 做完所有的准备工作后,预先通知所有用户集群升级的时间以及可能产生的影响,选定了周五深夜用户低峰期,开始正式升级工作。  首先通过Ansible将新版本批量部署到所有结点并统一配置,紧接着对原有集群做了Full Stop,校验所有的ES已经停下后,开始Full Start。整个过程比较顺利,所有结点正常启动,数据恢复完成后,集群重新回到正常服务状态。 周末两天运行,未发现有任何的异样,CPU利用率也降了不少,看起来很靠谱……直到周一 踏坑 周一早上,随着用户访问量高峰来临,马上浮现出一个诡异的现象: 索引速率遇到了瓶颈,数据开始在前置的消息队列(Kafka)里堆积。 从监控数据看,尽管所有的数据结点CPU消耗都比上周同期低,磁盘IO也很低,但索引速率却低了很多。反复对比查看升级前后各类监控指标后,终于发现一个可疑点,所有结点的网络流量比升级前高了好几倍!  在集群架构上,我们是单独架设了几台client node做为数据写入和分发的入口,现在这几个node的网络流量已经饱和,成为数据写入的瓶颈。一开始,怀疑是否2.4启用了tcp压缩,而5.0取消了,但翻查官方文档后发现transport.tcp.compress在2.4和5.0里默认都是关闭的! 这时候只有两个解决办法了,要么启用tcp压缩,要么扩容client node。 先考虑了一下tcp压缩的方案,快速扒了一下ES源码,在transport.TcpTransport这个类里,sendRequest和sendResponse两个方法会根据transport.tcp.compress设置来决定发送的消息是否要经过压缩,而在messageReceived方法则会读取消息头部的状态信息,探测消息是否经过压缩以及压缩的方法,而后决定是否需要解压,以及采用的解压方式。 这样看起来,ES是允许tcp压缩和不压缩的结点之间通讯的,那么只对client node启用压缩应该就可以了。测试环境测试过后,验证了想法的可行性。于是对生产的client node开启tcp压缩,同时在数据发送端(hangout的ES output)也启用tcp压缩,重启client node后入口网络流量降到和之前2.4差不多的程度,问题得到规避。 针对这个问题在Github上提交了issue https://github.com/elastic/ela ... 21612, 但未得到官方合理的解释。 解决好这个问题,另外一个问题来了,很多执行大量历史数据搜索的用户反映出不了结果。 从监控数据看,这类查询的搜索耗时非常久,直到网关300秒超时(查询api前置的nginx代理)。我们之前对集群设置过Global Search timeout为60s,用来保护集群资源过多被超高代价的查询消耗,在2.4版本是有效果的,现在看来不起作用了。手动测试了一下,这个参数果然失效! 于是向官方报告了第2个问题:https://github.com/elastic/ela ... 21595 。 这个问题很快被官方确认为Bug,修复也很快加入到了5.0.2。 为了规避这个问题,我们只好临时修改了一下Kibana以及第三方API访问要经过的nginx proxy,默认为所有的search request加入一个超时选项。此后,问题有一些缓解,但仍然发现用户查询大范围历史数据时,部分用于存储历史数据的结点响应很慢。 我们的集群是做了冷热分离的结构的,热节点主要承担写入和存放过去24小时数据,冷结点没有写入,查询频率也低,所以为了最大化利用硬件资源,一台物理机上跑了3个实例,这样一台128GB内存的机器可以存放下近30TB的索引。查看冷结点的监控数据,看到用户查询期间磁盘的read IO非常高,直接将磁盘IO Util%撑到100%,并且可持续数小时,同时search thread pool有大量的active thread处于无法完成状态,search queue不断攀升直至饱和、开始reject。 表象上看search thread似乎一直在尝试从磁盘大量读取数据,一次search甚至可以持续几十分钟至一个小时,耗尽了所有的搜索线程,导致拒绝后续的搜索服务。 于是Github上报了第3个issue: https://github.com/elastic/ela ... 21611  这个问题找到解决办法之前,我们只能通过反复重启有问题的冷结点来缓解。 和官方讨论过程中,得知5.0在Lucene文件访问方式上有一个比较大的改动,2.4使用mmapfs读取索引文件的部分,而5.0以后改为用mmapfs读取索引文件的全部。怀疑问题和这个变动有关,尝试将所有索引文件的设置改为NIOFS后,问题迎刃而解。 搜索性能一下回到了2.4时代,再也没出现搜索线程超长时间执行的问题。之后找时间复现了这个问题,并抓取了线程栈,看到长时间执行的搜索线程一直在做Global Ordinal的构造工作。 至于为何会这样,还不清楚。 从官方给出的信息看,底层索引文件的访问模式是没有变化的,仅仅是将文件读取方式全部改成了mmapfs,理论上应该性能更好,但是看起来在我们这种一台机器跑多个ES实例,所有分配的heap为系统缓存3倍的极端用例下,大范围的数据搜索可能造成过高的磁盘读IO,集群性能指数级下降。 以上问题前后耗了4天才完全规避掉,支持团队连续熬夜后集群总算回复到平稳状态。然而好景不长,运行一段时间以后,数据结点出现疑似内存泄漏现象。结点总数据没怎么增加、甚至还有减少的情况下,heap使用率一只呈攀升趋势,Old GC无法回收内存。这个问题对用户影响较小,通过监控我们可以及时发现内存即将用尽的结点,做一次重启很快就恢复了。 为排查根源,我们对一个有问题的结点做了dump,通过MAT工具分析,看到meta data相关的一个alias对象被实例化了有6600万次之多! 在Github上提交了第四个issue: https://github.com/elastic/ela ... 22013,不多久被确认为已知问题https://github.com/elastic/ela ... 21284 ,在5.0.1已经修复。 最后还存在一个master node内存泄漏的问题,这个问题在2.4.0时代就存在了,升级到5.0.0以后依然没有修复。由于我们的master node和data node是分离的,所以这个问题比较容易通过监控发现,解决方式也很简单和迅速,重启master node即可,对用户完全无影响。之后不久,5.0.2版本正式发布,release notes里提到了对这个问题的修复 https://github.com/elastic/ela ... 21578 。 上周周末我们将集群rolling upgrade到了5.0.2,global search timeout失效和两个内存泄漏的问题从根源上解决掉了。 网络流量增大的问题依然存在,仍然需要通过启用client结点的transport.tcp.compress规避。 冷结点搜索性能的问题没看到有提及,估计没解决,安全起见,还是保持索引的文件系统为NIOFS。升级完成运行一段时间后,可以肯定,5.0.2已经比较稳定。 心得 升到5.0.2后,对于其中一组数据结点这两天特意加了点索引负载,通过监控数据将v5.0.2与2.4.0做实际运行环境的索引吞吐量对比。
2.4_.png
5.0_.png
  在近似的CPU使用率和load情况下,5.0.2能够支撑更大的吞吐量。另外5.0带来的Instant aggregation功能,对于跨多个索引的时序类型数据的聚合也可以有效Cache了,在使用Kibana的时候提速感觉非常明显。 升级过程虽然遇到很多波折,但由于集群架构上做了角色分离(client,master,data)和冷热分离,因而Bug引起的故障比较容易被限定在一个较小的范围而不至于影响所有的功能和所有的用户。 故障点定位更加容易,规避措施也更容易实施。 部分规避措施实施过程中甚至对用户是完全无影响的,比如: 重启内存泄漏的master node)。详尽的监控为问题的发现和诊断提供了有力的支持。 Elasticsearch是非常复杂的系统,官方的测试无法覆盖所有的用例场景和数据规模,一些极端的应用场景可能触发某个深藏的Bug或者缺陷而陷入困境。 因此对于稳定性要求极高的应用,最好还是采用经过长时间考验的版本,比如v2.4.2。

Day6:《记一次es性能调优》

Elasticsearchxiaorui 发表了文章 • 4 个评论 • 5048 次浏览 • 2016-12-13 11:49 • 来自相关话题

一.前言 应medcl写es文章的时候,其实这段时间es研究的不多,感觉没什么新东西可写。 考虑只有这次调优心得可与大家分享,文笔有限,见谅! 二.背景 先交代一下背景,调优的项目是某电商类搜索项目,流量来自于前端的app和h5。 搜索主要是根据用户的地理位置和关键字等条件搜索附近的商家和商品。 商品数据大概在5000w左右,商品更新很频繁,更新量大概是每天2000w条左右,(因商家经常会促销、或者调上下架状态、改价格等)查询也相当频繁。 集群有2个集群,一个主一个备,用于有问题的时候随时切换。主集群有8个节点,配置是32核, 32g内存的docker的机器。给es jvm分配20g内存,jdk 版本是1.7,gc 是使用parnew/cms gc。 这个项目我是后期加入的,来的时候项目已上线。由于参与进来的时候es跑的也还是比较稳定,所以也一直 没调过es的参数。程序,参数基本上也就保持上线的时候那个样子。 es上线的时候是用的1.5版本,后期没升过级。 三.问题 项目大概跑了一年多,时间来到大概16年的9月份。搜索请求响应时间开始出现几秒才完成的情况, 我就被拉过来调优了。通过我们自己内部的调用方法监控,tp99和avg这些值还好,维持在200ms以下。max 最大有5,6s的情况,而且次数有点多。 这里没怎么折腾,很快定位就是es gc导致的。翻了一下es gc日志,就是cms remark这个阶段时间特别长, 而且 这个阶段是stop the world的。 四.解决 为什么remark阶段这么长时间? 直接上结论,就是一次cms 周期内,并发标记后到remark这个期间jvm 堆内存对象 变化很大。说白了对应我们的场景就是一大波 es bulk操作。对应Bigdesk观察,几秒的卡顿基本都出现在一大波 es bulk操作之后。 这里解释一下,引用网上文章的说法: remark如果耗时较长,通常原因是在cms gc已经结束了concurrent-mark步骤后,旧生代的引用关系仍然发生了很多的变化,旧生代的引用关系发生变化的原因主要是: * 在这个间隔时间段内,新生代晋升到旧生代的对象比较多; * 在这个间隔时间段内,新生代没怎么发生ygc,但创建出来的对象又比较多,这种通常就只能是新生代比较大的原因; 原文地址: http://hellojava.info/?tag=cms-gc-remark 调整一: 加cms gc 的 线程 直接从根源入手,你remark 慢,我就让你跑快点。 因为我们是32 核的cpu ,cpu 利用率用bigdesk观察还是很低的,5%左右。这么低,那就加点线程呗。 -XX:ParallelGCThreads= N -XX:ParallelCMSThreads= M 调整这2个参数都可以,它们的关系:ParallelCMSThreads = (ParallelGCThreads + 3)/4) 调整后情况缓解了一些,remark还是有3,4秒的情况。 调整二: 关于这点是我们自己的问题。一次bulk 操作就写1条数据。 是的,你没有看错,我们这边的工程师就是这么干的。 一年以前提过这里最好是能合并写,但现在还是这个样子。 这里有一些业务上的原因,合并写会导致一些字段值不准确。 合并写暂时没办法,只能调整es 了。(这里说明一下,其实合并写应该是本次优化比较有效果的办法,可惜这招不让我用。) 通过bigdesk观察,bulk线程池有reject的情况。 但就增加bulk线程池的消费线程,加快数据的消费速度,减少对象驻留在jvm 的时间。 调整后情况没有明显的好转, 但肯定有用,能优化一点是一点嘛。 调整三: 再次从gc入手, -XX:+CMSParallelRemarkEnabled -XX:+CMSScavengeBeforeRemark 这个是网上找的办法: 为了减少第二次暂停的时间,开启并行remark: -XX:+CMSParallelRemarkEnabled。 如果remark还是过长的话,可以开启-XX:+CMSScavengeBeforeRemark选项,强制 remark之前开始一次minor gc,减少remark的暂停时间,但是在remark之后也将立即开始又一次minor gc。调整后情况也没有特别的好转。 以上都是从减小单次cms gc的开销的方向去解决问题,然后我就换了个方向,降低cms gc发生的次数,让它少发生或者不发生。 调整四: 这里调整了一共5个参数, Xmn10g ==> 8g CMSInitiatingOccupancyFraction=70 ==>80 index.cache.filter.max_size 2g==>1g index.cache.filter.expire 2m==>40s index.refresh_interval 20s==>30s 前2个参数没什么好说的,提高cms gc 被触发的条件,降低cms gc 触发几率。 后3个参数是本文的重点,这里大概讲下es 对于filter cache的管理。 这部分是通过阅读源码分析出来的,涉及代码还挺多,有点复杂,还有很大一部分还是lucene的代码。 这里就不贴大段代码了。 es 对于 filter cache管理是内部维护了一个map的结构(实际是利用com.google.common.cache实现的),关键是这个map 的key 是个它自己定义的类 叫 FilterCacheKey,它override了equals方法 public FilterCacheKey(Object readerKey, Object filterKey) {     this.readerKey = readerKey;     this.filterKey = filterKey; } ... @Override public boolean equals(Object o) {     if (this == o) return true; //            if (o == null || getClass() != o.getClass()) return false;     FilterCacheKey that = (FilterCacheKey) o;     return (readerKey().equals(that.readerKey()) && filterKey.equals(that.filterKey)); } 从这里可以看出,filter cache 能否被再次利用到就跟readerKey和filterKey 有关。 filterkey如果你build 查询语句的时候什么都没设置的话,就是filter对象本身。 举个例子,TermFilter,如果term一样,那前后2次查询filterKey是一致的。 关键是这个readerKey是否一致呢?这个readerKey其实就是lucene 的 indexReader,如果前后2次查询有数据更新并且 index.refresh_interval 这个参数设置的比较小,es 会去刷新indexReader,那么很有可能readerKey不一致。 对应我们的场景,数据更新频繁,而且index.refresh_interval 比较小,那么缓存利用率就不太高。 后一次查询的filter cache 会重新put 到上面提到的map里,然后这个index.cache.filter.max_size 2g  就迅速占满(我们程序代码里很多地方使用到了filter),配多大都有可能占满。那这个filter cache什么时候被移除呢,index.cache.filter.expire 2m管移除这个事,当然应该还有size满了移除的策略。 这就造成了缓存没什么用,还占这么大地方,还占那么久。 然后这个filter cache就很可能跑到 old gen去了。 那么就让它占少点,不干活就快点走: index.cache.filter.max_size 2g==>1g index.cache.filter.expire 2m==>40s index.refresh_interval 20s==>30s 这些调整完毕,重启es ,通过bigdesk ,gc a线图好看多了,cms gc 基本没了,monitor gc 也顺畅多了。 五.总结和备注 总结: 1.优化这个事,我认为是业务上的优化要比技术上优化有效的多 2.日志和监控是你最好的朋友,仔细的看,你总会看出什么 3.es 缓存要利用好,还是需要好好去设计,回头考虑单独写一篇。 备注: 因为这次优化后,我就离开了原来的公司,没有了原来的环境。所以本文 部分参数和数字可能不准确,我仅凭记忆完成。  

Day5: 《PacketBeat奇妙的OOM小记》

Adventkira8565 发表了文章 • 0 个评论 • 2621 次浏览 • 2016-12-05 23:00 • 来自相关话题

Beats这个项目的确很好用,几行命令下来,一个成型的Agent就出来了。使用者只需要关注采集什么数据就好,后续的事情libbeat基本都处理完了。不过值得吐槽的是,Beat太散了,管理起来东一个西一个的,产品化的时候对客户说,我们要在机器上放n个Agent不知道客户会是什么样的表情。
d7d0a529244b57acb6ce3796da132df8.jpg
不过轻量级、已部署的特点还是极大的吸引了我,于是就有了后面的事情了。 PacketBeat不明原因的OOM 某天我把PacketBeat放到了我的服务器上面,这台服务器上面有个MongoDB,MongoDB主要是拿来存放ES的元数据的。ES2.x的时候并没有很好的元数据管理,为了能让ES的索引分配的比较均匀,并且有元数据辅助查询,设计好一个元数据管理的仓库是必要的。然后我打开了对MongoDB的抓包功能,恩,一切都很好,接着我打开了日志管理页面,看到了一条一条的MongoDB的包被抓回来,解码,然后塞到了ES。可是第二天一看,咦??Packet跪了?不是吧,ElasticSearch做的产品这么不稳定么。我不信。
06170826_dLgU.png
然后我又启动了第二次,紧接着熟练的top了一下,观察了PacketBeat半个多小时,在被观察的这段时间里面,PacketBeat的表现非常的正常,看不出有什么异样。好吧,那上一次的OOM可能只是个意外,Windows也经常蓝屏嘛,OOM一次也正常。结果第二天我再次打开终端,发现这货居然又OOM了!!
06170909_irst.png
好吧好吧,我感觉我已经踩到Bug了,拿了开源社区这么多东西,总得贡献一下的,好吧,提个Issue去 https://github.com/elastic/beats/issues/2867 真相只有一个 微信群里面聊起这个奇妙的OOM,Medcl大神问是不是因为采集了ES的日志,(我的这台服务器和日志服务器有关系)然后导致滚雪球把PacketBeat给滚死了。咦?说不定真的是这个原因耶!但是看了看PacketBeat,我并没有抓ES的包,而且假如我采集了ES的包,应该一下就OOM掉了,不应该等那么久。不过这么一说,却仿佛打开了新世界的大门
06171040_FVEG.png
我把这台服务器在日志服务器中的角色重新梳理了下,终于发现了这次OOM的原因了。。 由于2.X的ES没有比较好的元数据信息,所以当日志送到LogServer的时候,我做了些额外的操作,让LogServer持久化到ES一定量的时候就会往Mongo写一下元数据信息(当然也有其他服务会往里面做CRUD啦),开始的时候访问Mongo的次数其实是很少的,假设按1W来算。那么问题来了,由于我们的PacketBeat抓了Mongo的包,那么LogServer往ES的CRUD操作都会被PacketBeat给抓走,然后再送回给LogServer
06171248_tcdl.png
那么一个隐藏的滚雪球事件就产生了,刚开始的那段时间,Mongo被抓包的次数只有1W,然后就往LogServer多送了1W条日志,不。。应该多很多,毕竟网络包嘛,然后就导致LogServer因为要管理元数据的频率开始逐渐地提高,逐渐提高CRUD的频率后抓包的内容也越来越多,紧接着到这发生到LogServer的频率也越来越高。。。。。每次PacketBeat崩掉的时候,都送了80W左右的日志量出去,然后它就OOM掉了(因为我那台机器就只剩下2G的空闲内存给它用,被系统给干掉了)。。我居然发现了这样的场景
06171336_PbWI.png
结论 使用PacketBeat的时候,记得要留意一下有没这种反馈型滚雪球的情况,多发生在自己的日志服务器上面。当然那种直接抓ES的就没什么好说了,估计启动了之后没多久就崩溃掉了

Day4: 《将sql转换为es的DSL》

AdventXargin 发表了文章 • 6 个评论 • 9472 次浏览 • 2016-12-04 23:23 • 来自相关话题

es现在几乎已经是开源搜索引擎的事实标准了,搭建简易,使用方便。不过在很多公司里(包括我司的部分部门),并不是把它当搜索引擎来用,而是当db来用。因为本身查询/搜索原理的区别,使es在千万或者亿级的数据中进行逻辑筛选相对高效。例如一些wms、工单查询系统,单表几十个甚至上百个字段,如果在数据库里为每种类型的查询都建立合适的索引,成本比较高,更不用说索引建多了还会影响到插入速度,后期的索引优化也是比较麻烦的问题。 不过如果把es当db来使的话,始终会有一个绕不过去的坎。就是es的DSL。让所有业务开发去学习dsl的话也不是不可以,但DSL真的有点反人类(不要打我)。简单的a and b或者a or b还比较容易写,如果我要的是a and (b and (c or d) and e)的查询逻辑,那我觉得谁写都会晕。即使是用官方或者第三方提供的client,如果需求多种多样的话,想要灵活地实现`需求=>DSL`的过程还是比较痛苦。 对于业务开发来说,当然是sql更平易近人(毕竟写了这么多年CRUD)。所以还有一种歪门邪道的流派,直接把sql转成DSL。要做sql和DSL转换的工作,需要进行sql的解析,先不要怵,这个年代找一个靠谱的sql parser还是比较容易的。比如阿里开源的druid连接池里的sql模块:   https://github.com/alibaba/dru ... d/sql 因为笔者的实现是用的下面这个golang版的parser: https://github.com/xwb1989/sqlparser 所以用这个来举例吧~ 这个是其作者从youtube/vitness里提取并进行改进的一个parser,我们能用到的是一部分子集功能,只需要解析select类的sql。 先举个简单的sql的例子:
select * from x_order where userId = 1 order by id desc limit 10,1;

解析之后会变成golang的一个struct,来看看具体的定义:

&sqlparser.Select{
    Comments:sqlparser.Comments(nil),
    Distinct:"",
    SelectExprs:sqlparser.SelectExprs{(*sqlparser.StarExpr)(0xc42000aee0)},
    From:sqlparser.TableExprs{(*sqlparser.AliasedTableExpr)(0xc420016930)},
    Where:(*sqlparser.Where)(0xc42000afa0),
    GroupBy:sqlparser.GroupBy(nil),
    Having:(*sqlparser.Where)(nil),
    OrderBy:sqlparser.OrderBy{(*sqlparser.Order)(0xc42000af20)},
    Limit:(*sqlparser.Limit)(0xc42000af80),
    Lock:""
}
sql的select语句在被解析之后生成一个Select的结构体,如果我们不关心使用者需要的字段的话,可以先把SelectExprs/Distinct/Comments/Lock里的内容忽略掉。如果不是分组统计类的需求,也可以先把GroupBy/Having忽略掉。这里我们关心的就剩下From、Where、OrderBy和Limit。 From对应的TableExprs实际上可以认为是简单的字符串,这里的值其实就是`x_order`。 OrderBy实际上是一个元素为
type Order struct {
    Expr      ValExpr
    Direction string
}\
的数组。 Limit也很简单,
type Limit struct {
    Offset, Rowcount ValExpr
}
其实就是俩数字。 那么剩下的就是这个Where结构了。where会被解析为AST(`https://en.wikipedia.org/wiki/Abstract_syntax_tree`),中文是抽象语法树。在不说子查询之类的情况下,这个AST也不会太复杂,毕竟where后面的情况比起编译原理里的程序语言来说情况还是要少得多的。以上述的sql为例,这里解析出来的Where结构是这样的:
&sqlparser.Where{
    Type:"where",
    Expr:(*sqlparser.ComparisonExpr)(0xc420016a50)
}
只有一个节点,一个ComparisonExpr表达式,这个ComparisonExpr,中文比较表达式,指代的就是我们sql里的`user_id = 1`。实际上我们可以认为这个"比较表达式"即是所有复杂AST的叶子节点。叶子结点在AST遍历的时候一般也就是递归的终点。因为这里的查询比较简单,整棵AST只有一个节点,即根节点和叶子节点都是这个ComparisonExpr。 再来一个复杂点的例子。
select * from users where user_id = 1 and product_id =2

=>

&sqlparser.Where{
    Type:"where",
    Expr:(*sqlparser.AndExpr)(0xc42000b020)
}

AndExpr有Left和Right两个成员,分别是:

Left:
&sqlparser.ComparisonExpr{
    Operator:"=",
    Left:(*sqlparser.ColName)(0xc4200709c0),
    Right:sqlparser.NumVal{0x31}
}

Right:
&sqlparser.ComparisonExpr{
    Operator:"=",
    Left:(*sqlparser.ColName)(0xc420070a50),
    Right:sqlparser.NumVal{0x32}
}
稍微有一些二叉树的样子了吧。把这棵简单的树画出来:
Untitled1.png
回到文章开头的那个复杂的例子:
a and (b and (c or d) and e)

=>

select * from user_history where user_id = 1 and (product_id = 2 and (star_num = 4 or star_num = 5) and banned = 1)
看着真够麻烦的,我们把这棵树画出来:
Untitled.png
这样看着就直观多了。我们有了AST的结构,那要怎么对应到es的查询DSL呢?少安毋躁。 我们知道es的bool query是可以进行嵌套的,所以实际上我们可以同样可以构造出树形结构的bool query。这里把bool嵌套must和bool嵌套should简化一下,写成boolmust和boolshould: 例如a and (b and c)
query {
    boolmust {
        a,
        boolmust {
            b,
            c
        }
    }
}
我们把query内部的第一个boolmust当作根节点,内部嵌套的a和另一个boolmust当作它的两个子节点,然后b和c又是这个boolmust的子节点。可以看出来,实际上这棵树和AST的节点可以一一对应。 再回到文章开头的例子,a and (b and (c or d) and e):
query {
    boolmust {
        a,
        boolmust {
            b,
            boolshould {
                c,
                d
            },
            e
        }
    }
}
和前文中ast来做个简单的结构对比~
dsl和ast对比.png
和前文中sql的where解析后的AST树也是完全匹配的。思路来了,只要对sql解析生成的AST进行递归,即可得到这棵树。当然了,这里还可以进行一些优化,如果子节点的类型和父 节点的类型一致,例如都是and表达式或者都是or表达式,我们可以在生成dsl的时候将其作为并列的节点进行合并,这里不再赘述。 在递归中有这么几种情况:
AndExpr => bool must [{left}, {right}]
OrExpr => bool should [{left}, {right}]
ComparisonExpr => 一般是叶子节点
ParenBoolExpr => 指代括号表达式,其实内部是上述三种节点的某一种,所以直接取出内部节点按上述方法来处理
这样问题就变成了如何处理AST的叶子节点。前面提到了叶子节点实际上就是Comparison Expression。只要简单进行一些对应即可,下面是我们的项目里的一些对应关系,仅供参考:
convert.png
最后再附上demo   https://github.com/cch123/elasticsql

Day3: 《创建一个你自己的 Beat》

Adventmedcl 发表了文章 • 0 个评论 • 3880 次浏览 • 2016-12-03 22:19 • 来自相关话题

Elastic Advent 第三篇, 手头上事情实在太多,这两天正在进行权威指南翻译的冲刺阶段,临时填下坑,翻译官网的一篇文章吧(原文:https://www.elastic.co/blog/build-your-own-beat),Advent 规则很自由的,没说不能翻译文章啊,嘿嘿嘿,另外号召大家踊跃报名,大家一起玩才有意思。   活动地址:http://elasticsearch.cn/article/107   言归正传!  Beat 是一个开源的用来构建轻量级数据汇集的平台,可用于将各种类型的数据发送至Elasticsearch 与 Logstash。我们有 Packetbeat 用于监控局域网内服务器之间的网络流量信息,有 Filebeat 收集服务器上的日志信息,还有新推出的 Metricbeat 可以定期获取外部系统的监控指标信息,除此以外,你还可以非常方便的基于 libbeat 框架来构建你属于自己的专属 Beat,目前 beas 社区已经有超过25个 Community Beats 了。 Elastic 还提供一个 Beat generator(Beat 生成器)来帮你快速构建属于你自己的 Beat。通过这篇博客你将会看到如何通过 Beat 生成器来快速创建一个你自己的 Beat。今天我们创建的是一个叫做 lsbeat 的 Beat,lsbeat 非常类似 Unix 系统下的命令行 ls,我们用 lsbeat 来索引目录和文件信息。本篇文章环境基于 Unix 系统,如果你是 Windows 或是其它系统,相关操作可能需要根据实际情况进行调整。 第一步 – 配置 Golang 环境 Beats 是用 Golang写的,显然,要创建和开发一个 beat,Golang 环境必不可少,关于这方面的文章很多,建议查看这篇 Golang 的安装向导: install Golang。当前 Beats 需要的最低版本是 Golang 1.6。另外请确保正确设置了你的 $GOPATH 环境变量。 另外 Golang Glide 被用来进行包的依赖管理,所以也需要确保正确安装,最低版本是 Glide 0.10.0,安装说明点这里。 让我们先来看看 lsbeat 将会用到的一段代码吧,这是一个简单的 golang 程序,通过命令行接收一个目录参数,然后列出该目录下的文件和子目录信息。
package main
 
import (
    "fmt"
    "io/ioutil"
    "os"
)
 
func main() {
    //apply run path "." without argument.
    if len(os.Args) == 1 {
        listDir(".")
    } else {
        listDir(os.Args[1])
    }
}
 
func listDir(dirFile string) {
    files, _ := ioutil.ReadDir(dirFile)
    for _, f := range files {
        t := f.ModTime()
        fmt.Println(f.Name(), dirFile+"/"+f.Name(), f.IsDir(), t, f.Size())
 
        if f.IsDir() {
            listDir(dirFile + "/" + f.Name())
        }
    }
}
后面我们将使用到这段代码和 listDir 函数。 第二步 – 生成项目 要生成一个你自己的 Beat,就要用到 beat-generator 了,首先你必须安装 cookiecutter。安装的详细说明看这里。安装好 cookiecutter 之后,我们要给自己的 Beat 起一个好听的名字,最好是小写的英文字母,我们今天这个例子就叫 lsbeat 吧。 生成项目模板之前,我们需要下载 Beats generator 包文件,就在 beats 仓库。安装好 GoLang 之后,你就可以很方便的使用 go get 命令来下载 Beats generator 包文件了。 当你执行下面的这个命令后,所有的源码文件都会下载到 $GOPATH/src 目录。
$ go get github.com/elastic/beats
在 GOPATH 下创建一个以你自己github账号名称命名的目录,并切换过去,然后执行 cookiecutter 命令并指向 Beat Generator 源码路径。
$ cd $GOPATH/src/github.com/{user}
$ cookiecutter $GOPATH/src/github.com/elastic/beats/generate/beat
Cookiecutter 接下来会问你几个问题,比如项目名称,我们输入:lsbeat;你的 github 用户名,输入你自己的 github 账户;还有两个关于beat和beat_path应该会自动识别,默认回车就好;最后的问题,你可以输入你的姓和名。
project_name [Examplebeat]: lsbeat
github_name [your-github-name]: {username}
beat [lsbeat]:
beat_path [github.com/{github id}]:
full_name [Firstname Lastname]: {Full Name}
现在应该已经创建好了一个名为 lsbeat 的目录,并且里面应该会生成一些文件,让我们一起来看一下吧,结构如下:
$ cd lsbeat
$ tree
.
├── CONTRIBUTING.md
├── LICENSE
├── Makefile
├── README.md
├── beater
│   └── lsbeat.go
├── config
│   ├── config.go
│   └── config_test.go
├── dev-tools
│   └── packer
│       ├── Makefile
│       ├── beats
│       │   └── lsbeat.yml
│       └── version.yml
├── docs
│   └── index.asciidoc
├── etc
│   ├── beat.yml
│   └── fields.yml
├── glide.yaml
├── lsbeat.template.json
├── main.go
├── main_test.go
└── tests
    └── system
        ├── config
        │   └── lsbeat.yml.j2
        ├── lsbeat.py
        ├── requirements.txt
        └── test_base.py
我们刚刚已经生成好了一个原始的 Beat 模板了,但是你还需要获取相关的依赖和设置好 git 仓库。 首先,你需要拉取依赖的相关包信息,我们的这个例子是 lsbeat,我们先做一些的基本的配置,回头再看看详细看看其它的模板和配置文件,只需要执行 make setup 就可以自动获取依赖。
$ make setup
当你创建好了自己的 Beat 之后,记得上传到 github 仓库,并和社区进行分享哦,如下:
beats.png
要 push lsbeat 到你的 git 仓库,只需要执行如下命令:
$ git remote add origin git@github.com:{username}/lsbeat.git
$ git push -u origin master
恭喜你,现在你已经完成了一个 Beat ,并且发布了第一个版本到了 Github,不过里面还没有什么具体内容,现在让我们进一步看看里面的代码吧。 第四步 – 配置 执行过上面一系列命令之后,项目里将会自动创建名为 lsbeat.yml 和 lsbeat.template.json 的配置文件。所有的基本配置项都已经生成在了里面。
lsbeat.yml
lsbeat:
# Defines how often an event is sent to the output
period: 1s
Period 参数包含在每一个生成的 Beats 里面,它表示 lsbeat 将会每隔 1 秒钟轮询一次,这里我们修改 period 时间间隔为 10 秒。还可以在修改 etc/ 目录下面的 beat.yml 文件,这里新增一个 path 参数表示我们具体要监听哪个目录。
lsbeat:
# Defines how often an event is sent to the output
period: 10s
path: "."
参数添加好了之后,我们只需要运行 make update 命令就能让这些修改应用到配置文件lsbeat.yml。
$ make update
$ cat lsbeat.yml
 
################### Lsbeat Configuration Example #########################
 
############################# Lsbeat ######################################
 
lsbeat:
  # Defines how often an event is sent to the output
  period: 10s
  path: "."
###############################################################################
修改完 yml 文件,记得修改 config/config.go文件,添加一个path 参数。
package config
 
import "time"
 
type Config struct {
Period time.Duration `config:"period"`
Path   string        `config:"path"`
}
 
var DefaultConfig = Config{
Period: 10 * time.Second,
Path:   ".",
}
同时我们修改 period 默认时间间隔为 10 秒,默认监听的是当前目录 (.) 。. 第五步 – 添加代码 每一个 Beat 需要实现 Beater 接口,里面定义了 Run() 和 Stop() 函数。.  我们可以定义一个名为 Lsbeat 的结构体,然后用这个对象实现 Beater 接口。然后添加字段 lastIndexTime 来保存最后运行的时间戳信息。
type Lsbeat struct {
done   chan struct{}
config config.Config
client publisher.Client
 
lastIndexTime time.Time
...
}
另外,每个 Beat 还需要实现 New() 方法来接收 Beat 配置信息和返回 Lsbeat 的具体实例。
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
config := config.DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("Error reading config file: %v", err)
}
 
ls := &Lsbeat{
done:   make(chan struct{}),
config: config,
}
return ls, nil
}
在我们的 lsbeat 例子中,我们要做的就是扩展默认的 Run() 函数来导出指定目录的文件和子目录信息。 在修改 Run() 函数之前,我们先在 lsbeat.go 增加 listDir() 函数,就是我们前面最开始测试的那段代码,用于收集目录和文件信息的简单例子稍微修改一下。另外我们还要生成以下字段信息:
"@timestamp": common.Time(time.Now()),
"type": beatname,
"modtime": common.Time(t),
"filename": f.Name(),
"path": dirFile + "/" + f.Name(),
"directory": f.IsDir(),
"filesize": f.Size(),
第一次运行的时候我们将索引所有的文件和目录信息,然后我们再定期检查是否有新文件被创建或者修改,再索引这些新创建的文件和目录。每次定期检查的时间戳都会保存在 lasIndexTime 变量,完整代码如下:
func (bt *Lsbeat) listDir(dirFile string, beatname string, init bool) {
files, _ := ioutil.ReadDir(dirFile)
for _, f := range files {
t := f.ModTime()
 
event := common.MapStr{
"@timestamp": common.Time(time.Now()),
"type":       beatname,
"modtime":    common.Time(t),
"filename":   f.Name(),
"path":       dirFile + "/" + f.Name(),
"directory":  f.IsDir(),
"filesize":   f.Size(),
}
if init {
// index all files and directories on init
bt.client.PublishEvent(event)
} else {
// Index only changed files since last run.
if t.After(bt.lastIndexTime) {
bt.client.PublishEvent(event)
}
}
 
if f.IsDir() {
bt.listDir(dirFile+"/"+f.Name(), beatname, init)
}
}
}
记住在最开始需要导入 “io/ioutil” 包。
import (
"fmt"
"io/ioutil"
"time"
)
现在,让我们看看如何在 Run() 函数里面调用 listDir() 函数,并且保存时间戳到 lasIndexTime 变量。
func (bt *Lsbeat) Run(b *beat.Beat) error {
logp.Info("lsbeat is running! Hit CTRL-C to stop it.")
 
bt.client = b.Publisher.Connect()
ticker := time.NewTicker(bt.config.Period)
counter := 1
for {
 
select {
case <-bt.done:
return nil
case <-ticker.C:
}
 
bt.listDir(bt.config.Path, b.Name, true)   // call lsDir
bt.lastIndexTime = time.Now()               // mark Timestamp
 
logp.Info("Event sent")
counter++
}
 
}
函数 Stop() 用来中断 run 的循环执行,保持默认生成的就行。
func (bt *Lsbeat) Stop() {
bt.client.Close()
close(bt.done)
}
到这里,编码部分基本就完成了。我们接下来添加新字段到 mapping 中,修改文件 etc/fields.yml。.
- key: lsbeat
  title: LS Beat
  description:
  fields:
    - name: counter
      type: integer
      required: true
      description: >
        PLEASE UPDATE DOCUMENTATION
    #new fiels added lsbeat
    - name: modtime
      type: date
    - name: filename
      type: text
    - name: path
    - name: directory
      type: boolean
    - name: filesize
      type: long

重新应用新的配置。 $ make update 字段 file_name 将使用 nGram 分词,我们还需要在文件 lsbeat.template.json 的 “settings” 节点添加一个自定义的 analyzer。
{
  "mappings": {
        ...
  },
  "order": 0,
  "settings": {
    "index.refresh_interval": "5s",
    "analysis": {
      "analyzer": {
        "ls_ngram_analyzer": {
          "tokenizer": "ls_ngram_tokenizer"
        }
      },
      "tokenizer": {
        "ls_ngram_tokenizer": {
          "type": "ngram",
          "min_gram": "2",
          "token_chars": [
            "letter",
            "digit"
          ]
        }
      }
    }
  },
  "template": "lsbeat-*"
}

第六步 – 编译和运行 现在我们可以编译和运行了,只需要执行 make 命令就可以编译出可执行文件 lsbeat (lsbeat.exe on windows) 。 $ make 修改 lsbeat.yml 文件,设置需要监听的目录,如: “/Users/ec2-user/go”,记住是全路径。
lsbeat:
  # Defines how often an event is sent to the output
  period: 10s
  path: "/Users/ec2-user/go"
同时确保你的 elasticsearch 和 kibana 正常运行。现在运行一下 lsbeat 命令看看会发生什么事情吧。 $ ./lsbeat 打开Kibana,通过调用 _cat 接口我们看看的索引是不是创建了。
beats-1.png
可以看到创建了一个名为 lsbeat-2016.06.03 的索引,并且看到已经有了一些文档了。现在对 filename 字段查询一下,由于使用的是 nGram 分词,支持模糊匹配,我们使用 lsbe 关键字搜一下。
beats-2.png
大功告成! 恭喜你,你已经完成了第一个属于你自己的 beat。

Day2:《Kibana 系漫游指南》

Advent三斗室 发表了文章 • 6 个评论 • 6677 次浏览 • 2016-12-02 22:48 • 来自相关话题

大家好,欢迎你们来到 ELK 三体星系的第二天。昨天,wood 送给大家一本脚踏实地的生存指南,今天让我们仰望星空,由我给大家介绍一下围绕在 Kibana 身边的诸多行星们~ Kibana Plugin 类型简介 我们最熟悉的 Kibana Plugin,其实就是 Kibana 本身~ Kibana 提供了一整套框架,我们可以在此基础上,开发诸多不同类型的插件,包括:
  • app
  • visTypes
  • spyModes
  • fieldFormatter
列这么几个源码里的名词出来可能大家觉得比较晦涩。其实呢,app 就是同时具有前后端实现的应用,在 Kibana 5 里,默认分发的 app 有四个:实现日志查询和可视化的 kibana app、实现时序指标统计和可视化的 timelion app、实现和 ES 接口交互命令的 console app、在有异常的时候才看得到的状态页面 status_page app。 而 visTypes 则是在 kibana 中具体可用的可视化效果。默认分发的有:kbn_vislib_vis_types、metric、table、markdown。我们常用的那些由 D3.js 完成的饼图线图地图,都是在 kbn_vislib_vis_types 中完成的。 fieldFormatter 则用来定义在 ES 中相同类型的数据,根据其实际含义,可以有不同的展示方式。比如说:URL 肯定是一个字符串,但是可以用 fieldFormatter 把它在页面展示的时候,加上 `<a href></a>` 的样式,让人一键点击;同理,还可以过滤判断一下图片类 URL,加上 `<img src></img>` 的样式,直接在 Kibana 界面上就看图片内容~~ 官方的我们会看手册啦~ 好啦好啦,我也不会真的去抄一把官方手册假冒《Kibana 系漫游指南》来骗你们流量的。下面给大家介绍一些社区开源的,让你绝对眼前一亮的各种新奇扩展: 1. logtrail     这是一个 app 插件,创意来自 papertrail 公司的产品。完全的满足了 Geeker 们喜欢黑底白字终端的癖好~不过其实实现非常简单:每隔 10 秒请求一次最近 500 条日志就是啦! 2. vectormap     这是一个 visType 插件,也就是我们在 Kibana3 里曾经用过的 map panel 效果。这个插件不被官方直接采用的一个原因是版权许可问题。不做商用的情况下,这个插件还是可以极大方便我们做行政区域的访问情况统计和展示的。      3. kbn_network     这也是一个 visType 插件,酷毙了的网状图效果!通过不同的 aggs 数据展示 node 和 relational。     注意这个跟 Elastic 的 graph 并不是完全一致的东西。该插件要求你本身的数据已经有直接的关联可用。      4. sentinl     这是一个同时带有 spyMode 和 app 双插件的项目。其基础思路是参照 Elastic 的 Watcher 接口,但是将监控告警的进程从 ES 挪到 Kibana 里。同时还可以通过 phantomjs 做到截图报表。          这个项目最大的特点,是通过 spyMode 插件,大大降低了配置告警规则的复杂度。这个扩展让你可以在 Kibana 上配置任意聚合效果之后,就地点击定义当前聚合语句为告警规则!      5. kibana-keynote     这是另一个剑走偏锋的 app 插件,出自 Kibana 作者本人之手。它的作用是:播放 keynote 演讲稿!事实上项目里放的演讲稿就是作者本人在 ELastic{ON} 2016 上用的。让我们猜一猜下周的大会上,他会不会就用这个插件给我们分享呢? 今天就先讲这几颗最闪亮的星了~有兴趣了解更多 Kibana 行星的游客,欢迎阅读全本《Kibana系漫游指南》。 也欢迎观看 Kibana 行星的《探索·发现》节目哟~

Day1: 大规模Elasticsearch集群管理心得

Adventkennywu76 发表了文章 • 63 个评论 • 27649 次浏览 • 2016-12-02 10:07 • 来自相关话题

【携程旅行网 吴晓刚】  ElasticSearch目前在互联网公司主要用于两种应用场景,其一是用于构建业务的搜索功能模块且多是垂直领域的搜索,数据量级一般在千万至数十亿这个级别;其二用于大规模数据的实时OLAP,经典的如ELKStack,数据规模可能达到千亿或更多。 这两种场景的数据索引和应用访问模式上差异较大,在硬件选型和集群优化方面侧重点也会有所不同。一般来说后一种场景属于大数据范畴,数据量级和集群规模更大,在管理方面也更有挑战。 应Medcl大大的邀请,为ES中文社区做今年的Advent开篇,分享一下我在管理自家公司用于日志分析的ES集群方面的一点心得,蜻蜓点水,泛泛而谈,希望大方向上能对大家提供一些帮助。 这里的自家,即是携程旅行网。从2013年开始接触ES,我们团队先后实践过0.9.x -> 5.0.0中间各个版本,从最初只用于运维内部IIS日志的分析,到如今支持IT、呼叫中心、安全、测试、业务研发等多个部门超过200种日志型数据的实时检索与分析。 一路走来,愉悦了大家,也死磕了自己。 目前我们最大的日志单集群有120个data node,运行于70台物理服务器上。数据规模如下:
  • 单日索引数据条数600亿,新增索引文件25TB (含一个复制片则为50TB)
  • 业务高峰期峰值索引速率维持在百万条/秒
  • 历史数据保留时长根据业务需求制定,从10天 - 90天不等
  • 集群共3441个索引、17000个分片、数据总量约9300亿, 磁盘总消耗1PB
  • Kibana用户600多人, 每日来自Kibana和第三方的API调用共63万次
  • 查询响应时间百分位 75%:0.160s  90%:1.640s 95%:6.691s 99%:14.0039s
运维这样大规模的ES集群,有哪些值得注意的地方? 一. 必不可少的工具 工欲善其事必先利其器,从一开始,哪怕就只有几个node,就应该使用分布式配置管理工具来做集群的部署。随着应用的成熟,集群规模的逐步扩大,效率的提升会凸显。 官方提供了ES Puppet Module和Chef Cookbook,熟悉这两个工具的同学可以直接拿过来用。 我们自己则是采用的Ansible,编写了一套Playbook来达到类似的效果。 用熟这类工具,对于集群的初始部署,配置批量更改,集群版本升级,重启故障结点都会快捷和安全许多。 第二个必备利器就是sense插件。通过这个插件直接调用集群的restful API,在做集群和索引的状态查看,索引配置更改的时候非常方便。语法提示和自动补全功能更是实用,减少了翻看文档的频率。在Kibana5里面,sense已经成为一个内置的控制台,无需额外安装。 二. 硬件配置 我们采用的是32vcoreCPU + 128GB RAM的服务器,磁盘配置大部分服务器是12块4TB SATA机械磁盘做的Raid0,少部分机器是刚上了不久的6块800GB SSD raid0,主要目的是想做冷热数据分离,后面谈到集群架构的时候,再进一步解释一下如何利用硬件资源。 三. 集群的管理
  1. 首先很有必要对ES的结点做角色划分和隔离。大家知道ES的data node除了放数据以外,也可以兼任master和client的角色,多数同学会将这些角色混入到data node。然而对于一个规模较大,用户较多的集群,master和client在一些极端使用情况下可能会有性能瓶颈甚至内存溢出,从而使得共存的data node故障。data node的故障恢复涉及到数据的迁移,对集群资源有一定消耗,容易造成数据写入延迟或者查询减慢。如果将master和client独立出来,一旦出现问题,重启后几乎是瞬间就恢复的,对用户几乎没有任何影响。另外将这些角色独立出来的以后,也将对应的计算资源消耗从data node剥离出来,更容易掌握data node资源消耗与写入量和查询量之间的联系,便于做容量管理和规划。
  2. 避免过高的并发,包括控制shard数量和threadpool的数量。在写入量和查询性能能够满足的前提下,为索引分配尽量少的分片。分片过多会带来诸多负面影响,例如:每次查询后需要汇总排序的数据更多;过多的并发带来的线程切换造成过多的CPU损耗;索引的删除和配置更新更慢Issue#18776; 过多的shard也带来更多小的segment,而过多的小segment会带来非常显著的heap内存消耗,特别是如果查询线程配置得很多的情况下。 配置过大的threadpool更是会产生很多诡异的性能问题Issue#18161里所描述的问题就是我们所经历过的。 默认的Theadpool大小一般来说工作得很不错了。
  3. 冷热数据最好做分离。对于日志型应用来说,一般是每天建立一个新索引,当天的热索引在写入的同时也会有较多的查询。如果上面还存有比较长时间之前的冷数据,那么当用户做大跨度的历史数据查询的时候,过多的磁盘IO和CPU消耗很容易拖慢写入,造成数据的延迟。所以我们用了一部分机器来做冷数据的存储,利用ES可以给结点配置自定义属性的功能,为冷结点加上"boxtype":"weak"的标识,每晚通过维护脚本更新冷数据的索引路由设置index.routing.allocation.{require|include|exclude},让数据自动向冷结点迁移。 冷数据的特性是不再写入,用户查的频率较低,但量级可能很大。比如我们有个索引每天2TB,并且用户要求保持过去90天数据随时可查。保持这么大量的索引为open状态,并非只消耗磁盘空间。ES为了快速访问磁盘上的索引文件,需要在内存里驻留一些数据(索引文件的索引),也就是所谓的segment memory。稍微熟悉ES的同学知道,JVM heap分配不能超过32GB,对于我们128GB RAM, 48TB磁盘空间的机器而言,如果只跑一个ES实例,只能利用到32GB不到的heap,当heap快用饱和的时候,磁盘上保存的索引文件还不到10TB,这样显然是不经济的。 因此我们决定在冷结点上跑3个ES实例,每个分配31GB heap空间,从而可以在一台物理服务器上存储30多TB的索引数据并保持open状态,供用户随时搜索。 实际使用下来,由于冷数据搜索频率不高,也没有写入,即时只剩余35GB内存给os做文件系统缓存,查询性能还是可以满足需求的。
  4. 不同数据量级的shard最好隔离到不同组别的结点。 大家知道ES会自己平衡shard在集群的分布,这个自动平衡的逻辑主要考量三个因素。其一同一索引下的shard尽量分散到不同的结点;其二每个结点上的shard数量尽量接近;其三结点的磁盘有足够的剩余空间。这个策略只能保证shard数量分布均匀,而并不能保证数据大小分布均匀。 实际应用中,我们有200多种索引,数据量级差别很大,大的一天几个TB,小的一个月才几个GB,并且每种类型的数据保留时长又千差万别。抛出的问题,就是如何能比较平衡并充分的利用所有节点的资源。 针对这个问题,我们还是通过对结点添加属性标签来做分组,结合index routing控制的方式来做一些精细化的控制。尽量让不同量级的数据使用不同组别的结点,使得每个组内结点上的数据量比较容易自动平衡。
  5. 定期做索引的force merge,并且最好是每个shard merge成一个segment。前面提到过,heap消耗与segment数量也有关系,force merge可以显著降低这种消耗。 如果merge成一个segment还有一个好处,就是对于terms aggregation,搜索时无需构造Global Ordinals,可以提升聚合速度。
四. 版本选择 我们在2.4版本上稳定跑了很长时间,比较保守的同学可以上2.4,激进有精力折腾的可以考虑最新的5.0。 我们集群两周前从v2.4.0升级到了v5.0.0这个版本,除了升级第一周遇到一个不稳定的问题以外,感觉新版本带来的以下特性还是非常值得去升级的:
  • 结点启动的Bootstrap过程加入了很多关键系统参数设置的核验,比如Max File Descriptors, Memory Lock, Virtual Memory设置等等,如果设置不正确会拒绝启动并抛出异常。 与其带着错误的系统参数启动,并在日后造成性能问题,不如启动失败告知用户问题,是个很好的设计!
  • 索引性能提升。升级后在同样索引速率下,我们看到cpu消耗下降非常明显,除了对索引速率提升有帮助,也会一定程度提升搜索速率。
  • 新的数值型数据结构,存储空间更小,Range和地理位置计算更快速
  • Instant Aggregation对于类似now-7d to now这样的范围查询聚合能够做cache了,实际使用下来,效果明显,用户在Kibana上跑个过去一周数据的聚合,头2次刷新慢点,之后有cache了几乎就瞬间刷出!
  • 更多的保护措施保证集群的稳定,比如对一次搜索hit的shard数量做了限制,增强了circuit breaker的特性,更好的防护集群资源被坏查询耗尽。
升级第一周,我们的冷数据结点出现间歇性不响应问题,从而刨出3个issue提交给官方: Issue#21595 Issue#21612 Issue#21611 第一个问题确认为Bug,将在5.0.2修复,其他两个目前还不清楚根源,看起来也只在我们的应用场景里遇到了。所幸问题都找到了了规避措施,实施这些措施以后,最近一周我们的集群重新回到以前2.4版本时期的稳定状态。 五. 监控 不差钱没空折腾的建议还是买官方的xpack省心,有精力折腾的,利用ES各种丰富的stats api,用自己熟悉的监控工具采集数据,可视化出来就好了。 那么多监控指标,最最关键的还是以下几类:
  1. 各类Thread pool的使用情况,active/queue/reject可视化出来。 判断集群是否有性能瓶颈了,看看业务高峰期各类queue是不是很高,reject是不是经常发生,基本可以做到心里有数。
  2. JVM的heap used%以及old GC的频率,如果old GC频率很高,并且多次GC过后heap used%几乎下不来,说明heap压力太大,要考虑扩容了。(也有可能是有问题的查询或者聚合造成的,需要结合用户访问记录来判断)。
  3. Segment memory大小和Segment的数量。节点上存放的索引较多的时候,这两个指标就值得关注,要知道segment memory是常驻heap不会被GC回收的,因此当heap压力太大的时候,可以结合这个指标判断是否是因为节点上存放的数据过多,需要扩容。Segement的数量也是比较关键的,如果小的segment非常多,比如有几千,即使segment memory本身不多,但是在搜索线程很多的情况下,依然会吃掉相当多的heap,原因是lucene为每个segment会在thread local里记录状态信息,这块的heap内存开销和(segment数量* thread数量)相关。
  4. 很有必要记录用户的访问记录。我们只开放了http api给用户,前置了一个nginx做http代理,将用户第三方api的访问记录通过access log全部记录下来。通过分析访问记录,可以在集群出现性能问题时,快速找到问题根源,对于问题排查和性能优化都很有帮助。
最后就是多上手实践,遇到问题多查官方资料,多Google看是否有其他人遇到同类问题,精力充足有编程背景的同学也可以多刨刨源码。

Elastic Advent Calendar 活动启动咯!

Adventmedcl 发表了文章 • 11 个评论 • 3295 次浏览 • 2016-11-04 13:46 • 来自相关话题

时间一转又到了年末,去年的 Advent 在三斗的发起下,进行的很不错,今年的 Advent 活动继续办下去吧,借鉴日本(http://qiita.com/advent-calendar/2016/elastic)的做法,我们今年可以先报名占坑,预定一个日子和你打算写的文章的标题,尽量错开时间。 今年的Advent文章也会同步发布到社区公众号。 去年 Advent 活动回顾 http://elasticsearch.cn/topic/advent   由于本站没有日历的功能,大家留言评论报名预定就好了。   格式(仅12月):日期,标题 如:12月x日 , xxx 小技巧一则   已发布: 《大规模Elasticsearch集群管理心得》 《Kibana 系漫游指南》  《创建一个你自己的 Beat》 《将sql转换为es的DSL》 《Elasticsearch 2.x mapping tips》 《无外网环境10分钟快速集成 elasticsearch-head》 《Elasticsearch 5 入坑指南》 《可定制的 elasticsearch 数据导入工具 ——mysql_2_elasticsearch》 《记一次es性能调优》 《PacketBeat奇妙的OOM小记》 《ES5.0.0 安装记录》

Day24: Elasticsearch添加Shield后TransportClient如何连接?

Adventmedcl 发表了文章 • 6 个评论 • 4253 次浏览 • 2015-12-28 12:13 • 来自相关话题

Shield是Elasticsearch一个安全防护插件,提供了权限访问控制和日志审计功能,企业可以很方便的和LDAP或是ActiveDirectory进行集成,重用现有的安全认证体系.
shield-triad.png
Elasticsearch使用了Shield后,Elasticsearch就需要权限才能访问了,和默认的调用方式有些不同,下面简单介绍一下HTTP和TCP两种方式的连接. 关于Shield的安装和配置我这里不就具体介绍,创建了一个用户名和密码都是tribe_user的用户,权限是admin. 1.HTTP方式 现在直接访问es的http接口就会报错 curl http://localhost:9200 {"error":{"root_cause":[{"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}}],"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}},"status":401} shield支持HttpBasic验证,所以正确的访问姿势是: curl -u tribe_user:tribe_user http://localhost:9200 { "name" : "Melter", "cluster_name" : "elasticsearch", "version" : { "number" : "2.1.1", "build_hash" : "805c528f3167980046f224310f9147fa745e5371", "build_timestamp" : "2015-12-09T20:23:16Z", "build_snapshot" : false, "lucene_version" : "5.3.1" }, "tagline" : "You Know, for Search" } 如果是浏览器访问的话,第一次访问会弹出验证窗口,后续只要不关闭这个浏览器保持这个session就能一直访问. 注意http basic是不安全的认证方式,仅供开发调试使用,生产环境还需要结合HTTPS的加密通道使用. 2.TransportClient方式的访问Shield加防的Elasticsearch,稍微麻烦点,需要依赖Shield的包,步骤如下: 2.1 如果你是maven管理的项目,在pom.xml文件里添加Elasticsearch的maven仓库源,如下: <repositories>  <repository>  <id>elasticsearch-releases</id>  <url>https://maven.elasticsearch.or ... gt%3B  <releases> <enabled>true</enabled> </releases>  <snapshots> <enabled>false</enabled> </snapshots>  </repository>  </repositories> 2.2 添加依赖的配置 <dependency>  <groupId>org.elasticsearch.plugin</groupId> <artifactId>shield</artifactId> <version>2.1.1</version> </dependency 2.3 构建TransportClient的地方增加访问用户的配置 import org.elasticsearch.shield.ShieldPlugin; import org.elasticsearch.shield.authc.support.SecuredString; import static org.elasticsearch.shield.authc.support.UsernamePasswordToken.basicAuthHeaderValue; String clusterName="elasticsearch"; String ip= "127.0.0.1";  Settings settings = Settings.settingsBuilder()    .put("cluster.name", clusterName)  .put("shield.user", "tribe_user:tribe_user")  .build();  try { client = TransportClient.builder()  .addPlugin(ShieldPlugin.class)  .settings(settings).build()  .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip),9300));  String token = basicAuthHeaderValue("tribe_user", new SecuredString("tribe_user".toCharArray()));   client.prepareSearch() .putHeader("Authorization", token).get();   }  catch (UnknownHostException e)  { logger.error("es",e); }   现在的编辑器贴代码有点恶心,可以看这里: http://log.medcl.net/item/2015 ... -1252

Day 23 谈谈ES 的Recovery

Adventkennywu76 发表了文章 • 9 个评论 • 7220 次浏览 • 2015-12-25 16:45 • 来自相关话题

Note: 本文针对ES2.x  Recovery是指将一个索引的未分配shard分配到一个结点的过程。 在快照恢复,更改索引复制片数量,结点故障或者结点启动时发生。由于master持有整个集群的状态信息,因此可以判断出哪些shard需要做再分配,以及分配到哪个结点。例如:
  • 如果某个shard主片在,副片所在结点挂了,那么选择另外一个可用结点,将副片分配(allocate)上去,然后进行主从片的复制。
  • 如果某个shard的主片所在结点挂了,副片还在,那么将副片升级为主片,然后做主副复制。
  • 如果某个shard的主副片所在结点都挂了,则暂时无法恢复,等待持有相关数据的结点重新加入集群后,从结点上恢复主分片,再选择某个结点分配复制片,并从主分片同步数据。
通过CAT health API,我们可以查看集群的状态,从而获知数据的完整性情况:
cat_health.png
可能的状态及含义:

Green: 所有的shard主副片都完好的 Yellow: 所有shard的主片都完好,部分副片没有了,数据完整性依然完好。 Red: 某些shard的主副片都没有了,对应的索引数据不完整

Recovery过程要消耗额外的资源,CPU、内存、结点之间的网络带宽等等。 这些额外的资源消耗,有可能会导致集群的服务能力降级,或者一部分功能暂时不可用。了解一些Recovery的过程和相关的配置参数,对于减小recovery带来的资源消耗,加快集群恢复过程都是很有帮助的。 减少集群Full Restart造成的数据来回拷贝 集群可能会有整体重启的需要,比如需要升级硬件、升级操作系统或者升级ES大版本。重启所有结点可能带来的一个问题: 某些结点可能先于其他结点加入集群。 先加入集群的结点可能已经可以选举好master,并立即启动了recovery的过程,由于这个时候整个集群数据还不完整,master会指示一些结点之间相互开始复制数据。 那些晚到的结点,一旦发现本地的数据已经被复制到其他结点,则直接删除掉本地“失效”的数据。 当整个集群恢复完毕后,数据分布不均衡显然是不均衡的,master会触发rebalance过程,将数据在结点之间挪动。整个过程无谓消耗了大量的网络流量。 合理设置recovery相关参数则可以防范这种问题的发生。

gateway.expected_nodes gateway.expected_master_nodes gateway.expected_data_nodes

以上三个参数是说集群里一旦有多少个结点就立即开始recovery过程。 不同之处在于,第一个参数指的是master或者data都算在内,而后面两个参数则分指master和data node。 在期待的节点数条件满足之前, recovery过程会等待gateway.recover_after_time (默认5分钟) 这么长时间,一旦等待超时,则会根据以下条件判断是否启动:

gateway.recover_after_nodes gateway.recover_after_master_nodes gateway.recover_after_data_nodes

举例来说,对于一个有10个data node的集群,如果有以下的设置:

gateway.expected_data_nodes: 10 gateway.recover_after_time: 5m gateway.recover_after_data_nodes: 8

那么集群5分钟以内10个data node都加入了,或者5分钟以后8个以上的data node加入了,都会立即启动recovery过程。 减少主副本之间的数据复制 如果不是full restart,而是重启单个data node,仍然会造成数据在不同结点之间来回复制。为避免这个问题,可以在重启之前,先关闭集群的shard allocation:
cluster_settings.png
然后在结点重启完成加入集群后,再重新打开:
put_cluster_settings.png
这样在结点重启完成后,尽量多的从本地直接恢复数据。 但是在ES1.6版本之前,即使做了以上措施,仍然会发现有大量主副本之间的数据拷贝。从表面去看,这点很让人不能理解。 主副本数据完全一致,ES应该直接从副本本地恢复数据就好了,为什么要重新从主片再复制一遍呢? 原因在于Recovery是简单对比主副本的segment file来判断哪些数据一致可以本地恢复,哪些不一致需要远端拷贝的。而不同结点的segment merge是完全独立运行的,可能导致主副本merge的深度不完全一样,从而造成即使文档集完全一样,产生的segment file却不完全一样。 为了解决这个问题,ES1.6版本以后加入了synced flush的新特性。 对于5分钟没有更新过的shard,会自动synced flush一下,实质是为对应的shard加了一个synced flush ID。这样当重启结点的时候,先对比一下shard的synced flush ID,就可以知道两个shard是否完全相同,避免了不必要的segment file拷贝,极大加快了冷索引的恢复速度。 需要注意的是synced flush只对冷索引有效,对于热索引(5分钟内有更新的索引)没有作用。 如果重启的结点包含有热索引,那么还是免不了大量的文件拷贝。因此在重启一个结点之前,最好按照以下步骤执行,recovery几乎可以瞬间完成:
  1. 暂停数据写入程序
  2. 关闭集群shard allocation
  3. 手动执行POST /_flush/synced
  4. 重启结点
  5. 重新开启集群shard allocation 
  6. 等待recovery完成,集群health status变成green
  7. 重新开启数据写入程序
(特别大的)热索引为何恢复慢 对于冷索引,由于数据不再更新,利用synced flush特性,可以快速直接从本地恢复数据。 而对于热索引,特别是shard很大的热索引,除了synced flush派不上用场需要大量跨结点拷贝segment file以外,translog recovery是导致慢的更重要的原因。 从主片恢复数据到副片需要经历3个阶段:
  1. 对主片上的segment file做一个快照,然后拷贝到复制片分配到的结点。数据拷贝期间,不会阻塞索引请求,新增索引操作记录到translog里。
  2. 对translog做一个快照,此快照包含第一阶段新增的索引请求,然后重放快照里的索引操作。此阶段仍然不阻塞索引请求,新增索引操作记录到translog里。
  3. 为了能达到主副片完全同步,阻塞掉新索引请求,然后重放阶段二新增的translog操作。
可见,在recovery完成之前,translog是不能够被清除掉的(禁用掉正常运作期间后台的flush操作)。如果shard比较大,第一阶段耗时很长,会导致此阶段产生的translog很大。重放translog比起简单的文件拷贝耗时要长得多,因此第二阶段的translog耗时也会显著增加。等到第三阶段,需要重放的translog可能会比第二阶段还要多。 而第三阶段是会阻塞新索引写入的,在对写入实时性要求很高的场合,就会非常影响用户体验。 因此,要加快大的热索引恢复速度,最好的方式是遵从上一节提到的方法: 暂停新数据写入,手动sync flush,等待数据恢复完成后,重新开启数据写入,这样可以将数据延迟影响可以降到最低。 万一遇到Recovery慢,想知道进度怎么办呢? CAT Recovery API可以显示详细的recovery各个阶段的状态。 这个API怎么用就不在这里赘述了,参考: CAT Recovery 其他Recovery相关的专家级设置 还有其他一些专家级的设置(参见: recovery)可以影响recovery的速度,但提升速度的代价是更多的资源消耗,因此在生产集群上调整这些参数需要结合实际情况谨慎调整,一旦影响应用要立即调整回来。 对于搜索并发量要求高,延迟要求低的场合,默认设置一般就不要去动了。 对于日志实时分析类对于搜索延迟要求不高,但对于数据写入延迟期望比较低的场合,可以适当调大indices.recovery.max_bytes_per_sec,提升recovery速度,减少数据写入被阻塞的时长。   最后要说的一点是ES的版本迭代很快,对于Recovery的机制也在不断的优化中。 其中有一些版本甚至引入了一些bug,比如在ES1.4.x有严重的translog recovery bug,导致大的索引trans log recovery几乎无法完成 (issue #9226)  。因此实际使用中如果遇到问题,最好在Github的issue list里搜索一下,看是否使用的版本有其他人反映同样的问题。

Day22:pipeline aggregation计算日留存率示例

Advent三斗室 发表了文章 • 1 个评论 • 6693 次浏览 • 2015-12-25 11:06 • 来自相关话题

网友们多次讨论如何利用 ES 计算用户留存率的问题。这是个比较尴尬的情况,如果多次请求再自己做一下运算,问题很简单。但如果想要一次请求得到最终结果,在没有完整 JOIN 支持的 ES 里又显得比较难以完成。 目前我想到的比较容易达成的做法,是我们在记录用户登录操作日志的时候,把该用户的注册时间也同期输出。也就是说,这个索引的 mapping 是下面这样:
curl -XPUT 'http://127.0.0.1:9200/login-2015.12.23/' -d '{
  "settings" : {
    "number_of_shards" : 1
  },
  "mappings" : {
    "logs" : {
      "properties" : {
        "uid" : { "type" : "string", "index" : "not_analyzed" },
        "register_time" : { "type" : "date", "index" : "not_analyzed" },
        "login_time" : { "type" : "date", "index" : "not_analyzed" }
      }
    }
  }
}'
那么实际记录的日志会类似这样:
{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"2","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.24","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-24T12:00:00Z"}
这段我虚拟的数据,表示 uid 为 1 的用户,23 号注册并登录,24 号再次登录;uid 为 2 的用户,23 号注册并登录,24 号无登录。 显然以这短短 3 行示例数据,我们口算都知道单日留存率是 50% 了。那么怎么通过一次 ES 请求也算出来呢?下面就要用到 ES 2.0 新增加的 pipeline aggregation 了。
curl -XPOST 'http://127.0.0.1:9200/login-2015.12.23,login-2015.12.24/_search' -d'
{
  "size" : 0,
  "aggs" : {
    "new_users" : {

      "filters" : {
        "filters" : [
          {
            "range" : {
              "register_time" : {
                "gte" : "2015-12-23",
                "lt" : "2015-12-24"
              }
            }
          }
        ]
      },
      "aggs" : {
        "register_count" : {
          "cardinality" : {
            "field" : "uid"
          }
        },
        "today" : {
          "filter" : {
            "range" : {
              "login_time" : {
                "gte" : "2015-12-24",
                "lt" : "2015-12-25"
              }
            }
          },
          "aggs" : {
            "login_count" : {
              "cardinality" : {
                "field" : "uid"
              }
            }
          }
        },
        "retention" : {
          "bucket_script" : {
            "buckets_path" : {
              "today_count" : "today>login_count",
              "yesterday_count" : "register_count"
            },
            "script" : {
              "lang" : "expression",
              "inline" : "today_count / yesterday_count"
            }
          }
        }
      }
    }
  }
}'
这个 pipeline aggregation 在使用上有几个要点:
  1. pipeline agg 的 parent agg 必须是返回数组的 buckets agg 类型。我这里曾经打算使用 filter agg 直接请求register_time:["now-2d" TO "now-1d"],结果报错说找不到 buckets_path 的 START_OBJECT。所以改用了 filters agg 的数组格式。
  2. bucket_script agg 同样受 scripting module 的影响。也就是说,官网示例里的"script":"today_count / yesterday_count" 这种写法,是采用了 groovy 引擎的 inline 模式。在 ES 2.0 的默认设置下,是被禁止运行的!所以,应该按照 scripting module 的统一要求,改写成 file 形式存放到 config/scripts下;或者改用 Lucene Expression 运行。考虑到 pipeline aggregation 只支持数值运算,这里使用 groovy 价值不大,所以直接指明 lang 参数即可。
最终这次请求的响应如下:
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "hits" : {
    "total" : 3,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "new_users" : {
      "buckets" : [ {
        "doc_count" : 3,
        "today" : {
          "doc_count" : 1,
          "login_count" : {
            "value" : 1
          }
        },
        "register_count" : {
          "value" : 2
        },
        "retention" : {
          "value" : 0.5
        }
      } ]
    }
  }
}
这个 retention 数据,就是我们要求解的 0.5 了。  

Day21: 如何快速把Kibana4 Discover页的Document Table导出成CSV

Advent三斗室 发表了文章 • 6 个评论 • 8164 次浏览 • 2015-12-24 16:33 • 来自相关话题

我们都知道Kibana4里,所有的aggregation生成的visualize都可以在请求细节查看里选择`Export`成raw或者formatted。其中formatted就是CSV文件。 但是Discover页上,除了顶部的date_histogram这个visualize,更重要的是下边的search document table的内容。当我们通过搜索发现异常信息,想要长期保存证据,或者分享给其他没有权限的外部人员的时候,单纯保存search到es,或者分享单条日志的link都不顶用,还是需要能导出成一个文件。 可惜Kibana4没有针对search document table的导出! 国外一家叫MineWhat的公司,最近公开了一个非常细小的创新方案,意图解决这个问题。他们的方式是:避免修改Kibana源码,而通过chrome浏览器插件完成…… 点击这个地址安装chrome插件:https://chrome.google.com/webs ... lated   然后再访问Kibana的时候,你会发现自己的搜索框最右侧多了一个CSV按钮:     然后点击这个『CSV』按钮,会弹出一片提示: 可以点击选择,把search document table内容保存到本机的复制粘贴板,还是Google Drive网盘。 我们当然选择本机…… 然后打开本地的文本文件,Ctrl+V,就看到编辑器里出现了整个CSV内容。 实测下来,发现有个小问题,粘贴出来的数据里丢掉了空格~不过聊胜于无吧,还是介绍给大家一试。   注意:这个功能只会导出目前页面上已经展示出来的table内容。并不代表其使用了scroll API去ES拉取全部结果集!

Day20 利用tcpdump和kafka协议定位不合法topic的来源

Adventchilde 发表了文章 • 0 个评论 • 3467 次浏览 • 2015-12-23 23:26 • 来自相关话题

事情是这样滴,  我们在很多linux机器上部署了logstash采集日志, topic_id用的是 test-%{type}, 但非常不幸的是,  有些机器的某些日志, 没有带上type字段.    因为在topic名字里面不能含有%字符, 所以kafka server的日志里面大量报错. Logstash每发一次数据, kafka就会生成下面一大段错误  
[2015-12-23 23:20:47,749] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 48; ClientId: ; Topics: test-%{type} (kafka.server.KafkaApis)
kafka.common.InvalidTopicException: topic name test-%{type} is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'
        at kafka.common.Topic$.validate(Topic.scala:42)
        at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
        at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
        at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
        at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
        at scala.collection.SetLike$class.map(SetLike.scala:93)
        at scala.collection.AbstractSet.map(Set.scala:47)
        at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
        at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
        at java.lang.Thread.run(Thread.java:744)
把可用的信息瞬间淹没.     更不幸的是, 错误日志里面并没有客户来源的信息, 根本不知道是哪些机器还有问题.   我想做的, 就是把有问题的logstash机器找出来.   我就先事后诸葛亮一把, 用下面这个命令就可以把配置错误的机器找出来(也可以没有任何结果, 原因后面说)
tcpdump -nn 'dst port 9092 and tcp[37]==3 and tcp[57]==37'
dst port 9092就不说了, 这是kafka的默认端口, 后面的tcp[37]==3 and tcp[57]==37是啥意思呢, 我们慢慢说.   先要说一下: client要生产数据到kafka, 在发送消息之前, 首先得向kafka"询问"这个topic的metadata信息, 包括有几个partiton, 每个parttion在哪个服务器上面等信息, 拿到这些信息之后, 才能把消息发到正确的kafka服务器上.   重点来了!  向kafka"询问"topic的metadata, 其实就是发送一个tcp包过去, 我们需要知道的是这个tcp包的格式. 我已经帮你找到了, 就在这里 https://cwiki.apache.org/confl ... quest   看完文档之后(半小时或者更长时间过去了), 你就会知道, tcp body(除去tcp head)里面的第6个字节是03, 代表这是一个TopicMetadataRequest请求.  topicname里面的%字符出现在tcp body的第26个字节, %的ascii码是37   tcp头一般是20个字符, 所以加上这20个字节, 然后下标从0算起, 就是tcp[20+5]==3 and tcp[20+25]==37, 也就是tcp[25]==3 and tcp[45]==37.   咦, 为啥和开始写的那个过滤条件不一样呢, 因为tcp头"一般"是20字节, 但是如果其中还包含了tcp选项的话, 就可能比20多了. 反正我这里看到的的tcp头都是32个字节, 所以不能加20, 要加32, 也就是最开始写的 tcp[37]==3 and tcp[57]==37  最后呢, 再提2点结束.   1. 终极大杀器, 不过tcp头的长度是多少, 20也好, 32也好, 或者其他也好, 下面这样都能搞定
tcpdump -nn 'dst port 9092 and tcp[(tcp[12]>>2)+5]==3 and tcp[(tcp[12]>>2)+25]==37'
2.  不要一上来就这么高端, 其实我最开始是这样先确定问题的
tcpdump -vv -nn -X -s 0 dst port 9092 | grep -C 5 "test-"
你问我为啥不把test-t{type}写完整? 不是为了省事, 其实是因为很不幸, test-%{t 到这里的时候, 正好换行了.

Day19 ES内存那点事

Adventkennywu76 发表了文章 • 51 个评论 • 22529 次浏览 • 2015-12-22 18:51 • 来自相关话题

【携程旅行网  吴晓刚】   注: 本文主要针对ES 2.x。  “该给ES分配多少内存?”  “JVM参数如何优化?“ “为何我的Heap占用这么高?” “为何经常有某个field的数据量超出内存限制的异常?“ “为何感觉上没多少数据,也会经常Out Of Memory?” 以上问题,显然没有一个统一的数学公式能够给出答案。 和数据库类似,ES对于内存的消耗,和很多因素相关,诸如数据总量、mapping设置、查询方式、查询频度等等。默认的设置虽开箱即用,但不能适用每一种使用场景。作为ES的开发、运维人员,如果不了解ES对内存使用的一些基本原理,就很难针对特有的应用场景,有效的测试、规划和管理集群,从而踩到各种坑,被各种问题挫败。 要理解ES如何使用内存,先要理解下面两个基本事实: 1.  ES是JAVA应用 2.  底层存储引擎是基于Lucene的 看似很普通是吗?但其实没多少人真正理解这意味着什么。  首先,作为一个JAVA应用,就脱离不开JVM和GC。很多人上手ES的时候,对GC一点概念都没有就去网上抄各种JVM“优化”参数,却仍然被heap不够用,内存溢出这样的问题搞得焦头烂额。了解JVM GC的概念和基本工作机制是很有必要的,本文不在此做过多探讨,读者可以自行Google相关资料进行学习。如何知道ES heap是否真的有压力了? 推荐阅读这篇博客:Understanding Memory Pressure Indicator。 即使对于JVM GC机制不够熟悉,头脑里还是需要有这么一个基本概念: 应用层面生成大量长生命周期的对象,是给heap造成压力的主要原因,例如读取一大片数据在内存中进行排序,或者在heap内部建cache缓存大量数据。如果GC释放的空间有限,而应用层面持续大量申请新对象,GC频度就开始上升,同时会消耗掉很多CPU时间。严重时可能恶性循环,导致整个集群停工。因此在使用ES的过程中,要知道哪些设置和操作容易造成以上问题,有针对性的予以规避。 其次,Lucene的倒排索引(Inverted Index)是先在内存里生成,然后定期以段文件(segment file)的形式刷到磁盘的。每个段实际就是一个完整的倒排索引,并且一旦写到磁盘上就不会做修改。 API层面的文档更新和删除实际上是增量写入的一种特殊文档,会保存在新的段里。不变的段文件易于被操作系统cache,热数据几乎等效于内存访问。  基于以上2个基本事实,我们不难理解,为何官方建议的heap size不要超过系统可用内存的一半。heap以外的内存并不会被浪费,操作系统会很开心的利用他们来cache被用读取过的段文件。 Heap分配多少合适?遵从官方建议就没错。 不要超过系统可用内存的一半,并且不要超过32GB。JVM参数呢?对于初级用户来说,并不需要做特别调整,仍然遵从官方的建议,将xms和xmx设置成和heap一样大小,避免动态分配heap size就好了。虽然有针对性的调整JVM参数可以带来些许GC效率的提升,当有一些“坏”用例的时候,这些调整并不会有什么魔法效果帮你减轻heap压力,甚至可能让问题更糟糕。 那么,ES的heap是如何被瓜分掉的? 说几个我知道的内存消耗大户并分别做解读: 1.  segment memory 2.  filter cache 3.  field data cache 4.  bulk queue 5.  indexing buffer 6.  state buffer 7.  超大搜索聚合结果集的fetch 8. 对高cardinality字段做terms aggregation Segment Memory Segment不是file吗?segment memory又是什么?前面提到过,一个segment是一个完备的lucene倒排索引,而倒排索引是通过词典 (Term Dictionary)到文档列表(Postings List)的映射关系,快速做查询的。 由于词典的size会很大,全部装载到heap里不现实,因此Lucene为词典做了一层前缀索引(Term Index),这个索引在Lucene4.0以后采用的数据结构是FST (Finite State Transducer)。 这种数据结构占用空间很小,Lucene打开索引的时候将其全量装载到内存中,加快磁盘上词典查询速度的同时减少随机磁盘访问次数。 下面是词典索引和词典主存储之间的一个对应关系图:
lucene_index.png
Lucene  file的完整数据结构参见Apache Lucene - Index File Formats 说了这么多,要传达的一个意思就是,ES的data node存储数据并非只是耗费磁盘空间的,为了加速数据的访问,每个segment都有会一些索引数据驻留在heap里。因此segment越多,瓜分掉的heap也越多,并且这部分heap是无法被GC掉的! 理解这点对于监控和管理集群容量很重要,当一个node的segment memory占用过多的时候,就需要考虑删除、归档数据,或者扩容了。 怎么知道segment memory占用情况呢?  CAT API可以给出答案。 1.  查看一个索引所有segment的memory占用情况:
seg_mem.png
2.  查看一个node上所有segment占用的memory总和:
seg_mem_node.png
那么有哪些途径减少data node上的segment memory占用呢? 总结起来有三种方法: 1.  删除不用的索引 2.  关闭索引 (文件仍然存在于磁盘,只是释放掉内存)。需要的时候可以重新打开。 3.  定期对不再更新的索引做optimize (ES2.0以后更改为force merge api)。这Optimze的实质是对segment file强制做合并,可以节省大量的segment memory。 Filter Cache (5.x里叫做Request cache) Filter cache是用来缓存使用过的filter的结果集的,需要注意的是这个缓存也是常驻heap,在被evict掉之前,是无法被GC的。我的经验是默认的10% heap设置工作得够好了,如果实际使用中heap没什么压力的情况下,才考虑加大这个设置。 Field Data cache 在有大量排序、数据聚合的应用场景,可以说field data cache是性能和稳定性的杀手。 对搜索结果做排序或者聚合操作,需要将倒排索引里的数据进行解析,按列构造成docid->value的形式才能够做后续快速计算。 对于数据量很大的索引,这个构造过程会非常耗费时间,因此ES 2.0以前的版本会将构造好的数据缓存起来,提升性能。但是由于heap空间有限,当遇到用户对海量数据做计算的时候,就很容易导致heap吃紧,集群频繁GC,根本无法完成计算过程。 ES2.0以后,正式默认启用Doc Values特性(1.x需要手动更改mapping开启),将field data在indexing time构建在磁盘上,经过一系列优化,可以达到比之前采用field data cache机制更好的性能。因此需要限制对field data cache的使用,最好是完全不用,可以极大释放heap压力。 需要注意的是,很多同学已经升级到ES2.0,或者1.0里已经设置mapping启用了doc values,在kibana里仍然会遇到问题。 这里一个陷阱就在于kibana的table panel可以对所有字段排序。 设想如果有一个字段是analyzed过的,而用户去点击对应字段的排序表头是什么后果? 一来排序的结果并不是用户想要的,排序的对象实际是词典; 二来analyzed过的字段无法利用doc values,需要装载到field data cache,数据量很大的情况下可能集群就在忙着GC或者根本出不来结果。 Bulk Queue 一般来说,Bulk queue不会消耗很多的heap,但是见过一些用户为了提高bulk的速度,客户端设置了很大的并发量,并且将bulk Queue设置到不可思议的大,比如好几千。 Bulk Queue是做什么用的?当所有的bulk thread都在忙,无法响应新的bulk request的时候,将request在内存里排列起来,然后慢慢清掉。 这在应对短暂的请求爆发的时候有用,但是如果集群本身索引速度一直跟不上,设置的好几千的queue都满了会是什么状况呢? 取决于一个bulk的数据量大小,乘上queue的大小,heap很有可能就不够用,内存溢出了。一般来说官方默认的thread pool设置已经能很好的工作了,建议不要随意去“调优”相关的设置,很多时候都是适得其反的效果。 Indexing Buffer Indexing Buffer是用来缓存新数据,当其满了或者refresh/flush interval到了,就会以segment file的形式写入到磁盘。 这个参数的默认值是10% heap size。根据经验,这个默认值也能够很好的工作,应对很大的索引吞吐量。 但有些用户认为这个buffer越大吞吐量越高,因此见过有用户将其设置为40%的。到了极端的情况,写入速度很高的时候,40%都被占用,导致OOM。 Cluster State Buffer ES被设计成每个node都可以响应用户的api请求,因此每个node的内存里都包含有一份集群状态的拷贝。这个cluster state包含诸如集群有多少个node,多少个index,每个index的mapping是什么?有少shard,每个shard的分配情况等等 (ES有各类stats api获取这类数据)。 在一个规模很大的集群,这个状态信息可能会非常大的,耗用的内存空间就不可忽视了。并且在ES2.0之前的版本,state的更新是由master node做完以后全量散播到其他结点的。 频繁的状态更新就可以给heap带来很大的压力。 在超大规模集群的情况下,可以考虑分集群并通过tribe node连接做到对用户api的透明,这样可以保证每个集群里的state信息不会膨胀得过大。 超大搜索聚合结果集的fetch ES是分布式搜索引擎,搜索和聚合计算除了在各个data node并行计算以外,还需要将结果返回给汇总节点进行汇总和排序后再返回。无论是搜索,还是聚合,如果返回结果的size设置过大,都会给heap造成很大的压力,特别是数据汇聚节点。超大的size多数情况下都是用户用例不对,比如本来是想计算cardinality,却用了terms aggregation + size:0这样的方式; 对大结果集做深度分页;一次性拉取全量数据等等。   对高cardinality字段做terms aggregation 所谓高cardinality,就是该字段的唯一值比较多。 比如client ip,可能存在上千万甚至上亿的不同值。 对这种类型的字段做terms aggregation时,需要在内存里生成海量的分桶,内存需求会非常高。如果内部再嵌套有其他聚合,情况会更糟糕。  在做日志聚合分析时,一个典型的可以引起性能问题的场景,就是对带有参数的url字段做terms aggregation。 对于访问量大的网站,带有参数的url字段cardinality可能会到数亿,做一次terms aggregation内存开销巨大,然而对带有参数的url字段做聚合通常没有什么意义。 对于这类问题,可以额外索引一个url_stem字段,这个字段索引剥离掉参数部分的url。可以极大降低内存消耗,提高聚合速度。 小结:
  1. 倒排词典的索引需要常驻内存,无法GC,需要监控data node上segment memory增长趋势。
  2. 各类缓存,field cache, filter cache, indexing cache, bulk queue等等,要设置合理的大小,并且要应该根据最坏的情况来看heap是否够用,也就是各类缓存全部占满的时候,还有heap空间可以分配给其他任务吗?避免采用clear cache等“自欺欺人”的方式来释放内存。
  3. 避免返回大量结果集的搜索与聚合。确实需要大量拉取数据的场景,可以采用scan & scroll api来实现。
  4. cluster stats驻留内存并无法水平扩展,超大规模集群可以考虑分拆成多个集群通过tribe node连接。
  5. 想知道heap够不够,必须结合实际应用场景,并对集群的heap使用情况做持续的监控。
  6. 根据监控数据理解内存需求,合理配置各类circuit breaker,将内存溢出风险降低到最低。

Day18: 程序内的消息流:ArrayBlockingQueue和zeromq对比

Adventchilde 发表了文章 • 1 个评论 • 2202 次浏览 • 2015-12-20 22:51 • 来自相关话题

在logstash内部, input到filter, 以及filter到output, 消息都是通过一个队列来中转. 在我写hangout的第一个版本,也是这么做的,用ArrayBlockingQueue来中转消息, 上游几个线程把消息放在queue中, 下游再几个线程把queue中的消息消费走. 但是, 用下来之后, 发现在queue上面消耗的资源是相当的大,strace查看,非常大量的lock相关的系统调用, 现在的版本已经把queue去掉了. 想必Logstash也会有大量资源用在这一块. zeromq中的Parallel Pipeline正好适合这个场景,而且文档中说是lock free的, 拿来和queue对比一下看. 在我自己的电脑上测试,2.6 GHz Intel Core i5.  一个主线程生成10,000,000个随机数, 分发给四个线程消费. 用Queue来实现, 需要约37秒, CPU使用率在150%. 用zeromq的ipc来传递消息, 只需要22秒, 期间CPU使用率在250%. 总的CPU使用时间都60秒左右. 不知道java中还有没有更合适的Queue可以用在这个场景中.至少zeromq和ArrayBlockingQueue相比, zeromq可以更快的处理消息, 但代价就是更高的CPU使用率.

Day 14: Elasticsearch 5 入坑指南

Adventkennywu76 发表了文章 • 33 个评论 • 17918 次浏览 • 2016-12-15 13:16 • 来自相关话题

尝鲜 10月26日,Elasticsearch5.0.0 GA终于放出,携程ES Ops团队也在第一时间在DEV和UAT环境分别进行了2.4.0 至5.0.0的升级和测试。升级完成后,除了部分Query不向前兼容(主要是Filtered Query),需要在应用端做一些修改以外,未发现其他问题。通过监控系统看对比升级前后的主要系统指标,在同等索引量的情况下,CPU使用率有明显下降 ( 30% - 50%左右) ,相信性能方面5.0应该是有较大提升的。  在测试环境稳定运行了2周以后,我们决定选定一个生产集群进行升级,考验新版本在更为复杂的用户环境下的表现。 出于对业务影响最小化的考虑,用于日志分析的集群被圈定为升级目标。该集群也是携程十几个集群中规模最大的一个,共有120个数据结点运行于70台物理机上,总数据量接近1PB。 升级前需要做一些准备工作,下载官方的Migration Helper插件,检查集群设置和索引的兼容性。对于不兼容的配置项,MH会详尽列出,其中标注为红色部分为为升级前必须修改项。1.x版本创建的索引,是无法直接升级到5的,需要先在2.x集群里做一次reindex 。 MH提供了不兼容索引扫描功能,对于找到的不兼容索引,可以直接在UI上发起reindex操作,等待结束即可。 如果是用于业务搜索集群,数据可能比较重要,建议升级前做一个Snapshot,万一升级过程出现意外,可以回退版本从备份里快速恢复数据。我们的日志集群数据量极大,也没有对数据100%不丢的要求,因此升级前没有做Snapshot。 做完所有的准备工作后,预先通知所有用户集群升级的时间以及可能产生的影响,选定了周五深夜用户低峰期,开始正式升级工作。  首先通过Ansible将新版本批量部署到所有结点并统一配置,紧接着对原有集群做了Full Stop,校验所有的ES已经停下后,开始Full Start。整个过程比较顺利,所有结点正常启动,数据恢复完成后,集群重新回到正常服务状态。 周末两天运行,未发现有任何的异样,CPU利用率也降了不少,看起来很靠谱……直到周一 踏坑 周一早上,随着用户访问量高峰来临,马上浮现出一个诡异的现象: 索引速率遇到了瓶颈,数据开始在前置的消息队列(Kafka)里堆积。 从监控数据看,尽管所有的数据结点CPU消耗都比上周同期低,磁盘IO也很低,但索引速率却低了很多。反复对比查看升级前后各类监控指标后,终于发现一个可疑点,所有结点的网络流量比升级前高了好几倍!  在集群架构上,我们是单独架设了几台client node做为数据写入和分发的入口,现在这几个node的网络流量已经饱和,成为数据写入的瓶颈。一开始,怀疑是否2.4启用了tcp压缩,而5.0取消了,但翻查官方文档后发现transport.tcp.compress在2.4和5.0里默认都是关闭的! 这时候只有两个解决办法了,要么启用tcp压缩,要么扩容client node。 先考虑了一下tcp压缩的方案,快速扒了一下ES源码,在transport.TcpTransport这个类里,sendRequest和sendResponse两个方法会根据transport.tcp.compress设置来决定发送的消息是否要经过压缩,而在messageReceived方法则会读取消息头部的状态信息,探测消息是否经过压缩以及压缩的方法,而后决定是否需要解压,以及采用的解压方式。 这样看起来,ES是允许tcp压缩和不压缩的结点之间通讯的,那么只对client node启用压缩应该就可以了。测试环境测试过后,验证了想法的可行性。于是对生产的client node开启tcp压缩,同时在数据发送端(hangout的ES output)也启用tcp压缩,重启client node后入口网络流量降到和之前2.4差不多的程度,问题得到规避。 针对这个问题在Github上提交了issue https://github.com/elastic/ela ... 21612, 但未得到官方合理的解释。 解决好这个问题,另外一个问题来了,很多执行大量历史数据搜索的用户反映出不了结果。 从监控数据看,这类查询的搜索耗时非常久,直到网关300秒超时(查询api前置的nginx代理)。我们之前对集群设置过Global Search timeout为60s,用来保护集群资源过多被超高代价的查询消耗,在2.4版本是有效果的,现在看来不起作用了。手动测试了一下,这个参数果然失效! 于是向官方报告了第2个问题:https://github.com/elastic/ela ... 21595 。 这个问题很快被官方确认为Bug,修复也很快加入到了5.0.2。 为了规避这个问题,我们只好临时修改了一下Kibana以及第三方API访问要经过的nginx proxy,默认为所有的search request加入一个超时选项。此后,问题有一些缓解,但仍然发现用户查询大范围历史数据时,部分用于存储历史数据的结点响应很慢。 我们的集群是做了冷热分离的结构的,热节点主要承担写入和存放过去24小时数据,冷结点没有写入,查询频率也低,所以为了最大化利用硬件资源,一台物理机上跑了3个实例,这样一台128GB内存的机器可以存放下近30TB的索引。查看冷结点的监控数据,看到用户查询期间磁盘的read IO非常高,直接将磁盘IO Util%撑到100%,并且可持续数小时,同时search thread pool有大量的active thread处于无法完成状态,search queue不断攀升直至饱和、开始reject。 表象上看search thread似乎一直在尝试从磁盘大量读取数据,一次search甚至可以持续几十分钟至一个小时,耗尽了所有的搜索线程,导致拒绝后续的搜索服务。 于是Github上报了第3个issue: https://github.com/elastic/ela ... 21611  这个问题找到解决办法之前,我们只能通过反复重启有问题的冷结点来缓解。 和官方讨论过程中,得知5.0在Lucene文件访问方式上有一个比较大的改动,2.4使用mmapfs读取索引文件的部分,而5.0以后改为用mmapfs读取索引文件的全部。怀疑问题和这个变动有关,尝试将所有索引文件的设置改为NIOFS后,问题迎刃而解。 搜索性能一下回到了2.4时代,再也没出现搜索线程超长时间执行的问题。之后找时间复现了这个问题,并抓取了线程栈,看到长时间执行的搜索线程一直在做Global Ordinal的构造工作。 至于为何会这样,还不清楚。 从官方给出的信息看,底层索引文件的访问模式是没有变化的,仅仅是将文件读取方式全部改成了mmapfs,理论上应该性能更好,但是看起来在我们这种一台机器跑多个ES实例,所有分配的heap为系统缓存3倍的极端用例下,大范围的数据搜索可能造成过高的磁盘读IO,集群性能指数级下降。 以上问题前后耗了4天才完全规避掉,支持团队连续熬夜后集群总算回复到平稳状态。然而好景不长,运行一段时间以后,数据结点出现疑似内存泄漏现象。结点总数据没怎么增加、甚至还有减少的情况下,heap使用率一只呈攀升趋势,Old GC无法回收内存。这个问题对用户影响较小,通过监控我们可以及时发现内存即将用尽的结点,做一次重启很快就恢复了。 为排查根源,我们对一个有问题的结点做了dump,通过MAT工具分析,看到meta data相关的一个alias对象被实例化了有6600万次之多! 在Github上提交了第四个issue: https://github.com/elastic/ela ... 22013,不多久被确认为已知问题https://github.com/elastic/ela ... 21284 ,在5.0.1已经修复。 最后还存在一个master node内存泄漏的问题,这个问题在2.4.0时代就存在了,升级到5.0.0以后依然没有修复。由于我们的master node和data node是分离的,所以这个问题比较容易通过监控发现,解决方式也很简单和迅速,重启master node即可,对用户完全无影响。之后不久,5.0.2版本正式发布,release notes里提到了对这个问题的修复 https://github.com/elastic/ela ... 21578 。 上周周末我们将集群rolling upgrade到了5.0.2,global search timeout失效和两个内存泄漏的问题从根源上解决掉了。 网络流量增大的问题依然存在,仍然需要通过启用client结点的transport.tcp.compress规避。 冷结点搜索性能的问题没看到有提及,估计没解决,安全起见,还是保持索引的文件系统为NIOFS。升级完成运行一段时间后,可以肯定,5.0.2已经比较稳定。 心得 升到5.0.2后,对于其中一组数据结点这两天特意加了点索引负载,通过监控数据将v5.0.2与2.4.0做实际运行环境的索引吞吐量对比。
2.4_.png
5.0_.png
  在近似的CPU使用率和load情况下,5.0.2能够支撑更大的吞吐量。另外5.0带来的Instant aggregation功能,对于跨多个索引的时序类型数据的聚合也可以有效Cache了,在使用Kibana的时候提速感觉非常明显。 升级过程虽然遇到很多波折,但由于集群架构上做了角色分离(client,master,data)和冷热分离,因而Bug引起的故障比较容易被限定在一个较小的范围而不至于影响所有的功能和所有的用户。 故障点定位更加容易,规避措施也更容易实施。 部分规避措施实施过程中甚至对用户是完全无影响的,比如: 重启内存泄漏的master node)。详尽的监控为问题的发现和诊断提供了有力的支持。 Elasticsearch是非常复杂的系统,官方的测试无法覆盖所有的用例场景和数据规模,一些极端的应用场景可能触发某个深藏的Bug或者缺陷而陷入困境。 因此对于稳定性要求极高的应用,最好还是采用经过长时间考验的版本,比如v2.4.2。

Day1: 大规模Elasticsearch集群管理心得

Adventkennywu76 发表了文章 • 63 个评论 • 27649 次浏览 • 2016-12-02 10:07 • 来自相关话题

【携程旅行网 吴晓刚】  ElasticSearch目前在互联网公司主要用于两种应用场景,其一是用于构建业务的搜索功能模块且多是垂直领域的搜索,数据量级一般在千万至数十亿这个级别;其二用于大规模数据的实时OLAP,经典的如ELKStack,数据规模可能达到千亿或更多。 这两种场景的数据索引和应用访问模式上差异较大,在硬件选型和集群优化方面侧重点也会有所不同。一般来说后一种场景属于大数据范畴,数据量级和集群规模更大,在管理方面也更有挑战。 应Medcl大大的邀请,为ES中文社区做今年的Advent开篇,分享一下我在管理自家公司用于日志分析的ES集群方面的一点心得,蜻蜓点水,泛泛而谈,希望大方向上能对大家提供一些帮助。 这里的自家,即是携程旅行网。从2013年开始接触ES,我们团队先后实践过0.9.x -> 5.0.0中间各个版本,从最初只用于运维内部IIS日志的分析,到如今支持IT、呼叫中心、安全、测试、业务研发等多个部门超过200种日志型数据的实时检索与分析。 一路走来,愉悦了大家,也死磕了自己。 目前我们最大的日志单集群有120个data node,运行于70台物理服务器上。数据规模如下:
  • 单日索引数据条数600亿,新增索引文件25TB (含一个复制片则为50TB)
  • 业务高峰期峰值索引速率维持在百万条/秒
  • 历史数据保留时长根据业务需求制定,从10天 - 90天不等
  • 集群共3441个索引、17000个分片、数据总量约9300亿, 磁盘总消耗1PB
  • Kibana用户600多人, 每日来自Kibana和第三方的API调用共63万次
  • 查询响应时间百分位 75%:0.160s  90%:1.640s 95%:6.691s 99%:14.0039s
运维这样大规模的ES集群,有哪些值得注意的地方? 一. 必不可少的工具 工欲善其事必先利其器,从一开始,哪怕就只有几个node,就应该使用分布式配置管理工具来做集群的部署。随着应用的成熟,集群规模的逐步扩大,效率的提升会凸显。 官方提供了ES Puppet Module和Chef Cookbook,熟悉这两个工具的同学可以直接拿过来用。 我们自己则是采用的Ansible,编写了一套Playbook来达到类似的效果。 用熟这类工具,对于集群的初始部署,配置批量更改,集群版本升级,重启故障结点都会快捷和安全许多。 第二个必备利器就是sense插件。通过这个插件直接调用集群的restful API,在做集群和索引的状态查看,索引配置更改的时候非常方便。语法提示和自动补全功能更是实用,减少了翻看文档的频率。在Kibana5里面,sense已经成为一个内置的控制台,无需额外安装。 二. 硬件配置 我们采用的是32vcoreCPU + 128GB RAM的服务器,磁盘配置大部分服务器是12块4TB SATA机械磁盘做的Raid0,少部分机器是刚上了不久的6块800GB SSD raid0,主要目的是想做冷热数据分离,后面谈到集群架构的时候,再进一步解释一下如何利用硬件资源。 三. 集群的管理
  1. 首先很有必要对ES的结点做角色划分和隔离。大家知道ES的data node除了放数据以外,也可以兼任master和client的角色,多数同学会将这些角色混入到data node。然而对于一个规模较大,用户较多的集群,master和client在一些极端使用情况下可能会有性能瓶颈甚至内存溢出,从而使得共存的data node故障。data node的故障恢复涉及到数据的迁移,对集群资源有一定消耗,容易造成数据写入延迟或者查询减慢。如果将master和client独立出来,一旦出现问题,重启后几乎是瞬间就恢复的,对用户几乎没有任何影响。另外将这些角色独立出来的以后,也将对应的计算资源消耗从data node剥离出来,更容易掌握data node资源消耗与写入量和查询量之间的联系,便于做容量管理和规划。
  2. 避免过高的并发,包括控制shard数量和threadpool的数量。在写入量和查询性能能够满足的前提下,为索引分配尽量少的分片。分片过多会带来诸多负面影响,例如:每次查询后需要汇总排序的数据更多;过多的并发带来的线程切换造成过多的CPU损耗;索引的删除和配置更新更慢Issue#18776; 过多的shard也带来更多小的segment,而过多的小segment会带来非常显著的heap内存消耗,特别是如果查询线程配置得很多的情况下。 配置过大的threadpool更是会产生很多诡异的性能问题Issue#18161里所描述的问题就是我们所经历过的。 默认的Theadpool大小一般来说工作得很不错了。
  3. 冷热数据最好做分离。对于日志型应用来说,一般是每天建立一个新索引,当天的热索引在写入的同时也会有较多的查询。如果上面还存有比较长时间之前的冷数据,那么当用户做大跨度的历史数据查询的时候,过多的磁盘IO和CPU消耗很容易拖慢写入,造成数据的延迟。所以我们用了一部分机器来做冷数据的存储,利用ES可以给结点配置自定义属性的功能,为冷结点加上"boxtype":"weak"的标识,每晚通过维护脚本更新冷数据的索引路由设置index.routing.allocation.{require|include|exclude},让数据自动向冷结点迁移。 冷数据的特性是不再写入,用户查的频率较低,但量级可能很大。比如我们有个索引每天2TB,并且用户要求保持过去90天数据随时可查。保持这么大量的索引为open状态,并非只消耗磁盘空间。ES为了快速访问磁盘上的索引文件,需要在内存里驻留一些数据(索引文件的索引),也就是所谓的segment memory。稍微熟悉ES的同学知道,JVM heap分配不能超过32GB,对于我们128GB RAM, 48TB磁盘空间的机器而言,如果只跑一个ES实例,只能利用到32GB不到的heap,当heap快用饱和的时候,磁盘上保存的索引文件还不到10TB,这样显然是不经济的。 因此我们决定在冷结点上跑3个ES实例,每个分配31GB heap空间,从而可以在一台物理服务器上存储30多TB的索引数据并保持open状态,供用户随时搜索。 实际使用下来,由于冷数据搜索频率不高,也没有写入,即时只剩余35GB内存给os做文件系统缓存,查询性能还是可以满足需求的。
  4. 不同数据量级的shard最好隔离到不同组别的结点。 大家知道ES会自己平衡shard在集群的分布,这个自动平衡的逻辑主要考量三个因素。其一同一索引下的shard尽量分散到不同的结点;其二每个结点上的shard数量尽量接近;其三结点的磁盘有足够的剩余空间。这个策略只能保证shard数量分布均匀,而并不能保证数据大小分布均匀。 实际应用中,我们有200多种索引,数据量级差别很大,大的一天几个TB,小的一个月才几个GB,并且每种类型的数据保留时长又千差万别。抛出的问题,就是如何能比较平衡并充分的利用所有节点的资源。 针对这个问题,我们还是通过对结点添加属性标签来做分组,结合index routing控制的方式来做一些精细化的控制。尽量让不同量级的数据使用不同组别的结点,使得每个组内结点上的数据量比较容易自动平衡。
  5. 定期做索引的force merge,并且最好是每个shard merge成一个segment。前面提到过,heap消耗与segment数量也有关系,force merge可以显著降低这种消耗。 如果merge成一个segment还有一个好处,就是对于terms aggregation,搜索时无需构造Global Ordinals,可以提升聚合速度。
四. 版本选择 我们在2.4版本上稳定跑了很长时间,比较保守的同学可以上2.4,激进有精力折腾的可以考虑最新的5.0。 我们集群两周前从v2.4.0升级到了v5.0.0这个版本,除了升级第一周遇到一个不稳定的问题以外,感觉新版本带来的以下特性还是非常值得去升级的:
  • 结点启动的Bootstrap过程加入了很多关键系统参数设置的核验,比如Max File Descriptors, Memory Lock, Virtual Memory设置等等,如果设置不正确会拒绝启动并抛出异常。 与其带着错误的系统参数启动,并在日后造成性能问题,不如启动失败告知用户问题,是个很好的设计!
  • 索引性能提升。升级后在同样索引速率下,我们看到cpu消耗下降非常明显,除了对索引速率提升有帮助,也会一定程度提升搜索速率。
  • 新的数值型数据结构,存储空间更小,Range和地理位置计算更快速
  • Instant Aggregation对于类似now-7d to now这样的范围查询聚合能够做cache了,实际使用下来,效果明显,用户在Kibana上跑个过去一周数据的聚合,头2次刷新慢点,之后有cache了几乎就瞬间刷出!
  • 更多的保护措施保证集群的稳定,比如对一次搜索hit的shard数量做了限制,增强了circuit breaker的特性,更好的防护集群资源被坏查询耗尽。
升级第一周,我们的冷数据结点出现间歇性不响应问题,从而刨出3个issue提交给官方: Issue#21595 Issue#21612 Issue#21611 第一个问题确认为Bug,将在5.0.2修复,其他两个目前还不清楚根源,看起来也只在我们的应用场景里遇到了。所幸问题都找到了了规避措施,实施这些措施以后,最近一周我们的集群重新回到以前2.4版本时期的稳定状态。 五. 监控 不差钱没空折腾的建议还是买官方的xpack省心,有精力折腾的,利用ES各种丰富的stats api,用自己熟悉的监控工具采集数据,可视化出来就好了。 那么多监控指标,最最关键的还是以下几类:
  1. 各类Thread pool的使用情况,active/queue/reject可视化出来。 判断集群是否有性能瓶颈了,看看业务高峰期各类queue是不是很高,reject是不是经常发生,基本可以做到心里有数。
  2. JVM的heap used%以及old GC的频率,如果old GC频率很高,并且多次GC过后heap used%几乎下不来,说明heap压力太大,要考虑扩容了。(也有可能是有问题的查询或者聚合造成的,需要结合用户访问记录来判断)。
  3. Segment memory大小和Segment的数量。节点上存放的索引较多的时候,这两个指标就值得关注,要知道segment memory是常驻heap不会被GC回收的,因此当heap压力太大的时候,可以结合这个指标判断是否是因为节点上存放的数据过多,需要扩容。Segement的数量也是比较关键的,如果小的segment非常多,比如有几千,即使segment memory本身不多,但是在搜索线程很多的情况下,依然会吃掉相当多的heap,原因是lucene为每个segment会在thread local里记录状态信息,这块的heap内存开销和(segment数量* thread数量)相关。
  4. 很有必要记录用户的访问记录。我们只开放了http api给用户,前置了一个nginx做http代理,将用户第三方api的访问记录通过access log全部记录下来。通过分析访问记录,可以在集群出现性能问题时,快速找到问题根源,对于问题排查和性能优化都很有帮助。
最后就是多上手实践,遇到问题多查官方资料,多Google看是否有其他人遇到同类问题,精力充足有编程背景的同学也可以多刨刨源码。

Day 14: Elasticsearch 5 入坑指南

Adventkennywu76 发表了文章 • 33 个评论 • 17918 次浏览 • 2016-12-15 13:16 • 来自相关话题

尝鲜 10月26日,Elasticsearch5.0.0 GA终于放出,携程ES Ops团队也在第一时间在DEV和UAT环境分别进行了2.4.0 至5.0.0的升级和测试。升级完成后,除了部分Query不向前兼容(主要是Filtered Query),需要在应用端做一些修改以外,未发现其他问题。通过监控系统看对比升级前后的主要系统指标,在同等索引量的情况下,CPU使用率有明显下降 ( 30% - 50%左右) ,相信性能方面5.0应该是有较大提升的。  在测试环境稳定运行了2周以后,我们决定选定一个生产集群进行升级,考验新版本在更为复杂的用户环境下的表现。 出于对业务影响最小化的考虑,用于日志分析的集群被圈定为升级目标。该集群也是携程十几个集群中规模最大的一个,共有120个数据结点运行于70台物理机上,总数据量接近1PB。 升级前需要做一些准备工作,下载官方的Migration Helper插件,检查集群设置和索引的兼容性。对于不兼容的配置项,MH会详尽列出,其中标注为红色部分为为升级前必须修改项。1.x版本创建的索引,是无法直接升级到5的,需要先在2.x集群里做一次reindex 。 MH提供了不兼容索引扫描功能,对于找到的不兼容索引,可以直接在UI上发起reindex操作,等待结束即可。 如果是用于业务搜索集群,数据可能比较重要,建议升级前做一个Snapshot,万一升级过程出现意外,可以回退版本从备份里快速恢复数据。我们的日志集群数据量极大,也没有对数据100%不丢的要求,因此升级前没有做Snapshot。 做完所有的准备工作后,预先通知所有用户集群升级的时间以及可能产生的影响,选定了周五深夜用户低峰期,开始正式升级工作。  首先通过Ansible将新版本批量部署到所有结点并统一配置,紧接着对原有集群做了Full Stop,校验所有的ES已经停下后,开始Full Start。整个过程比较顺利,所有结点正常启动,数据恢复完成后,集群重新回到正常服务状态。 周末两天运行,未发现有任何的异样,CPU利用率也降了不少,看起来很靠谱……直到周一 踏坑 周一早上,随着用户访问量高峰来临,马上浮现出一个诡异的现象: 索引速率遇到了瓶颈,数据开始在前置的消息队列(Kafka)里堆积。 从监控数据看,尽管所有的数据结点CPU消耗都比上周同期低,磁盘IO也很低,但索引速率却低了很多。反复对比查看升级前后各类监控指标后,终于发现一个可疑点,所有结点的网络流量比升级前高了好几倍!  在集群架构上,我们是单独架设了几台client node做为数据写入和分发的入口,现在这几个node的网络流量已经饱和,成为数据写入的瓶颈。一开始,怀疑是否2.4启用了tcp压缩,而5.0取消了,但翻查官方文档后发现transport.tcp.compress在2.4和5.0里默认都是关闭的! 这时候只有两个解决办法了,要么启用tcp压缩,要么扩容client node。 先考虑了一下tcp压缩的方案,快速扒了一下ES源码,在transport.TcpTransport这个类里,sendRequest和sendResponse两个方法会根据transport.tcp.compress设置来决定发送的消息是否要经过压缩,而在messageReceived方法则会读取消息头部的状态信息,探测消息是否经过压缩以及压缩的方法,而后决定是否需要解压,以及采用的解压方式。 这样看起来,ES是允许tcp压缩和不压缩的结点之间通讯的,那么只对client node启用压缩应该就可以了。测试环境测试过后,验证了想法的可行性。于是对生产的client node开启tcp压缩,同时在数据发送端(hangout的ES output)也启用tcp压缩,重启client node后入口网络流量降到和之前2.4差不多的程度,问题得到规避。 针对这个问题在Github上提交了issue https://github.com/elastic/ela ... 21612, 但未得到官方合理的解释。 解决好这个问题,另外一个问题来了,很多执行大量历史数据搜索的用户反映出不了结果。 从监控数据看,这类查询的搜索耗时非常久,直到网关300秒超时(查询api前置的nginx代理)。我们之前对集群设置过Global Search timeout为60s,用来保护集群资源过多被超高代价的查询消耗,在2.4版本是有效果的,现在看来不起作用了。手动测试了一下,这个参数果然失效! 于是向官方报告了第2个问题:https://github.com/elastic/ela ... 21595 。 这个问题很快被官方确认为Bug,修复也很快加入到了5.0.2。 为了规避这个问题,我们只好临时修改了一下Kibana以及第三方API访问要经过的nginx proxy,默认为所有的search request加入一个超时选项。此后,问题有一些缓解,但仍然发现用户查询大范围历史数据时,部分用于存储历史数据的结点响应很慢。 我们的集群是做了冷热分离的结构的,热节点主要承担写入和存放过去24小时数据,冷结点没有写入,查询频率也低,所以为了最大化利用硬件资源,一台物理机上跑了3个实例,这样一台128GB内存的机器可以存放下近30TB的索引。查看冷结点的监控数据,看到用户查询期间磁盘的read IO非常高,直接将磁盘IO Util%撑到100%,并且可持续数小时,同时search thread pool有大量的active thread处于无法完成状态,search queue不断攀升直至饱和、开始reject。 表象上看search thread似乎一直在尝试从磁盘大量读取数据,一次search甚至可以持续几十分钟至一个小时,耗尽了所有的搜索线程,导致拒绝后续的搜索服务。 于是Github上报了第3个issue: https://github.com/elastic/ela ... 21611  这个问题找到解决办法之前,我们只能通过反复重启有问题的冷结点来缓解。 和官方讨论过程中,得知5.0在Lucene文件访问方式上有一个比较大的改动,2.4使用mmapfs读取索引文件的部分,而5.0以后改为用mmapfs读取索引文件的全部。怀疑问题和这个变动有关,尝试将所有索引文件的设置改为NIOFS后,问题迎刃而解。 搜索性能一下回到了2.4时代,再也没出现搜索线程超长时间执行的问题。之后找时间复现了这个问题,并抓取了线程栈,看到长时间执行的搜索线程一直在做Global Ordinal的构造工作。 至于为何会这样,还不清楚。 从官方给出的信息看,底层索引文件的访问模式是没有变化的,仅仅是将文件读取方式全部改成了mmapfs,理论上应该性能更好,但是看起来在我们这种一台机器跑多个ES实例,所有分配的heap为系统缓存3倍的极端用例下,大范围的数据搜索可能造成过高的磁盘读IO,集群性能指数级下降。 以上问题前后耗了4天才完全规避掉,支持团队连续熬夜后集群总算回复到平稳状态。然而好景不长,运行一段时间以后,数据结点出现疑似内存泄漏现象。结点总数据没怎么增加、甚至还有减少的情况下,heap使用率一只呈攀升趋势,Old GC无法回收内存。这个问题对用户影响较小,通过监控我们可以及时发现内存即将用尽的结点,做一次重启很快就恢复了。 为排查根源,我们对一个有问题的结点做了dump,通过MAT工具分析,看到meta data相关的一个alias对象被实例化了有6600万次之多! 在Github上提交了第四个issue: https://github.com/elastic/ela ... 22013,不多久被确认为已知问题https://github.com/elastic/ela ... 21284 ,在5.0.1已经修复。 最后还存在一个master node内存泄漏的问题,这个问题在2.4.0时代就存在了,升级到5.0.0以后依然没有修复。由于我们的master node和data node是分离的,所以这个问题比较容易通过监控发现,解决方式也很简单和迅速,重启master node即可,对用户完全无影响。之后不久,5.0.2版本正式发布,release notes里提到了对这个问题的修复 https://github.com/elastic/ela ... 21578 。 上周周末我们将集群rolling upgrade到了5.0.2,global search timeout失效和两个内存泄漏的问题从根源上解决掉了。 网络流量增大的问题依然存在,仍然需要通过启用client结点的transport.tcp.compress规避。 冷结点搜索性能的问题没看到有提及,估计没解决,安全起见,还是保持索引的文件系统为NIOFS。升级完成运行一段时间后,可以肯定,5.0.2已经比较稳定。 心得 升到5.0.2后,对于其中一组数据结点这两天特意加了点索引负载,通过监控数据将v5.0.2与2.4.0做实际运行环境的索引吞吐量对比。
2.4_.png
5.0_.png
  在近似的CPU使用率和load情况下,5.0.2能够支撑更大的吞吐量。另外5.0带来的Instant aggregation功能,对于跨多个索引的时序类型数据的聚合也可以有效Cache了,在使用Kibana的时候提速感觉非常明显。 升级过程虽然遇到很多波折,但由于集群架构上做了角色分离(client,master,data)和冷热分离,因而Bug引起的故障比较容易被限定在一个较小的范围而不至于影响所有的功能和所有的用户。 故障点定位更加容易,规避措施也更容易实施。 部分规避措施实施过程中甚至对用户是完全无影响的,比如: 重启内存泄漏的master node)。详尽的监控为问题的发现和诊断提供了有力的支持。 Elasticsearch是非常复杂的系统,官方的测试无法覆盖所有的用例场景和数据规模,一些极端的应用场景可能触发某个深藏的Bug或者缺陷而陷入困境。 因此对于稳定性要求极高的应用,最好还是采用经过长时间考验的版本,比如v2.4.2。

Day6:《记一次es性能调优》

Elasticsearchxiaorui 发表了文章 • 4 个评论 • 5048 次浏览 • 2016-12-13 11:49 • 来自相关话题

一.前言 应medcl写es文章的时候,其实这段时间es研究的不多,感觉没什么新东西可写。 考虑只有这次调优心得可与大家分享,文笔有限,见谅! 二.背景 先交代一下背景,调优的项目是某电商类搜索项目,流量来自于前端的app和h5。 搜索主要是根据用户的地理位置和关键字等条件搜索附近的商家和商品。 商品数据大概在5000w左右,商品更新很频繁,更新量大概是每天2000w条左右,(因商家经常会促销、或者调上下架状态、改价格等)查询也相当频繁。 集群有2个集群,一个主一个备,用于有问题的时候随时切换。主集群有8个节点,配置是32核, 32g内存的docker的机器。给es jvm分配20g内存,jdk 版本是1.7,gc 是使用parnew/cms gc。 这个项目我是后期加入的,来的时候项目已上线。由于参与进来的时候es跑的也还是比较稳定,所以也一直 没调过es的参数。程序,参数基本上也就保持上线的时候那个样子。 es上线的时候是用的1.5版本,后期没升过级。 三.问题 项目大概跑了一年多,时间来到大概16年的9月份。搜索请求响应时间开始出现几秒才完成的情况, 我就被拉过来调优了。通过我们自己内部的调用方法监控,tp99和avg这些值还好,维持在200ms以下。max 最大有5,6s的情况,而且次数有点多。 这里没怎么折腾,很快定位就是es gc导致的。翻了一下es gc日志,就是cms remark这个阶段时间特别长, 而且 这个阶段是stop the world的。 四.解决 为什么remark阶段这么长时间? 直接上结论,就是一次cms 周期内,并发标记后到remark这个期间jvm 堆内存对象 变化很大。说白了对应我们的场景就是一大波 es bulk操作。对应Bigdesk观察,几秒的卡顿基本都出现在一大波 es bulk操作之后。 这里解释一下,引用网上文章的说法: remark如果耗时较长,通常原因是在cms gc已经结束了concurrent-mark步骤后,旧生代的引用关系仍然发生了很多的变化,旧生代的引用关系发生变化的原因主要是: * 在这个间隔时间段内,新生代晋升到旧生代的对象比较多; * 在这个间隔时间段内,新生代没怎么发生ygc,但创建出来的对象又比较多,这种通常就只能是新生代比较大的原因; 原文地址: http://hellojava.info/?tag=cms-gc-remark 调整一: 加cms gc 的 线程 直接从根源入手,你remark 慢,我就让你跑快点。 因为我们是32 核的cpu ,cpu 利用率用bigdesk观察还是很低的,5%左右。这么低,那就加点线程呗。 -XX:ParallelGCThreads= N -XX:ParallelCMSThreads= M 调整这2个参数都可以,它们的关系:ParallelCMSThreads = (ParallelGCThreads + 3)/4) 调整后情况缓解了一些,remark还是有3,4秒的情况。 调整二: 关于这点是我们自己的问题。一次bulk 操作就写1条数据。 是的,你没有看错,我们这边的工程师就是这么干的。 一年以前提过这里最好是能合并写,但现在还是这个样子。 这里有一些业务上的原因,合并写会导致一些字段值不准确。 合并写暂时没办法,只能调整es 了。(这里说明一下,其实合并写应该是本次优化比较有效果的办法,可惜这招不让我用。) 通过bigdesk观察,bulk线程池有reject的情况。 但就增加bulk线程池的消费线程,加快数据的消费速度,减少对象驻留在jvm 的时间。 调整后情况没有明显的好转, 但肯定有用,能优化一点是一点嘛。 调整三: 再次从gc入手, -XX:+CMSParallelRemarkEnabled -XX:+CMSScavengeBeforeRemark 这个是网上找的办法: 为了减少第二次暂停的时间,开启并行remark: -XX:+CMSParallelRemarkEnabled。 如果remark还是过长的话,可以开启-XX:+CMSScavengeBeforeRemark选项,强制 remark之前开始一次minor gc,减少remark的暂停时间,但是在remark之后也将立即开始又一次minor gc。调整后情况也没有特别的好转。 以上都是从减小单次cms gc的开销的方向去解决问题,然后我就换了个方向,降低cms gc发生的次数,让它少发生或者不发生。 调整四: 这里调整了一共5个参数, Xmn10g ==> 8g CMSInitiatingOccupancyFraction=70 ==>80 index.cache.filter.max_size 2g==>1g index.cache.filter.expire 2m==>40s index.refresh_interval 20s==>30s 前2个参数没什么好说的,提高cms gc 被触发的条件,降低cms gc 触发几率。 后3个参数是本文的重点,这里大概讲下es 对于filter cache的管理。 这部分是通过阅读源码分析出来的,涉及代码还挺多,有点复杂,还有很大一部分还是lucene的代码。 这里就不贴大段代码了。 es 对于 filter cache管理是内部维护了一个map的结构(实际是利用com.google.common.cache实现的),关键是这个map 的key 是个它自己定义的类 叫 FilterCacheKey,它override了equals方法 public FilterCacheKey(Object readerKey, Object filterKey) {     this.readerKey = readerKey;     this.filterKey = filterKey; } ... @Override public boolean equals(Object o) {     if (this == o) return true; //            if (o == null || getClass() != o.getClass()) return false;     FilterCacheKey that = (FilterCacheKey) o;     return (readerKey().equals(that.readerKey()) && filterKey.equals(that.filterKey)); } 从这里可以看出,filter cache 能否被再次利用到就跟readerKey和filterKey 有关。 filterkey如果你build 查询语句的时候什么都没设置的话,就是filter对象本身。 举个例子,TermFilter,如果term一样,那前后2次查询filterKey是一致的。 关键是这个readerKey是否一致呢?这个readerKey其实就是lucene 的 indexReader,如果前后2次查询有数据更新并且 index.refresh_interval 这个参数设置的比较小,es 会去刷新indexReader,那么很有可能readerKey不一致。 对应我们的场景,数据更新频繁,而且index.refresh_interval 比较小,那么缓存利用率就不太高。 后一次查询的filter cache 会重新put 到上面提到的map里,然后这个index.cache.filter.max_size 2g  就迅速占满(我们程序代码里很多地方使用到了filter),配多大都有可能占满。那这个filter cache什么时候被移除呢,index.cache.filter.expire 2m管移除这个事,当然应该还有size满了移除的策略。 这就造成了缓存没什么用,还占这么大地方,还占那么久。 然后这个filter cache就很可能跑到 old gen去了。 那么就让它占少点,不干活就快点走: index.cache.filter.max_size 2g==>1g index.cache.filter.expire 2m==>40s index.refresh_interval 20s==>30s 这些调整完毕,重启es ,通过bigdesk ,gc a线图好看多了,cms gc 基本没了,monitor gc 也顺畅多了。 五.总结和备注 总结: 1.优化这个事,我认为是业务上的优化要比技术上优化有效的多 2.日志和监控是你最好的朋友,仔细的看,你总会看出什么 3.es 缓存要利用好,还是需要好好去设计,回头考虑单独写一篇。 备注: 因为这次优化后,我就离开了原来的公司,没有了原来的环境。所以本文 部分参数和数字可能不准确,我仅凭记忆完成。  

Day5: 《PacketBeat奇妙的OOM小记》

Adventkira8565 发表了文章 • 0 个评论 • 2621 次浏览 • 2016-12-05 23:00 • 来自相关话题

Beats这个项目的确很好用,几行命令下来,一个成型的Agent就出来了。使用者只需要关注采集什么数据就好,后续的事情libbeat基本都处理完了。不过值得吐槽的是,Beat太散了,管理起来东一个西一个的,产品化的时候对客户说,我们要在机器上放n个Agent不知道客户会是什么样的表情。
d7d0a529244b57acb6ce3796da132df8.jpg
不过轻量级、已部署的特点还是极大的吸引了我,于是就有了后面的事情了。 PacketBeat不明原因的OOM 某天我把PacketBeat放到了我的服务器上面,这台服务器上面有个MongoDB,MongoDB主要是拿来存放ES的元数据的。ES2.x的时候并没有很好的元数据管理,为了能让ES的索引分配的比较均匀,并且有元数据辅助查询,设计好一个元数据管理的仓库是必要的。然后我打开了对MongoDB的抓包功能,恩,一切都很好,接着我打开了日志管理页面,看到了一条一条的MongoDB的包被抓回来,解码,然后塞到了ES。可是第二天一看,咦??Packet跪了?不是吧,ElasticSearch做的产品这么不稳定么。我不信。
06170826_dLgU.png
然后我又启动了第二次,紧接着熟练的top了一下,观察了PacketBeat半个多小时,在被观察的这段时间里面,PacketBeat的表现非常的正常,看不出有什么异样。好吧,那上一次的OOM可能只是个意外,Windows也经常蓝屏嘛,OOM一次也正常。结果第二天我再次打开终端,发现这货居然又OOM了!!
06170909_irst.png
好吧好吧,我感觉我已经踩到Bug了,拿了开源社区这么多东西,总得贡献一下的,好吧,提个Issue去 https://github.com/elastic/beats/issues/2867 真相只有一个 微信群里面聊起这个奇妙的OOM,Medcl大神问是不是因为采集了ES的日志,(我的这台服务器和日志服务器有关系)然后导致滚雪球把PacketBeat给滚死了。咦?说不定真的是这个原因耶!但是看了看PacketBeat,我并没有抓ES的包,而且假如我采集了ES的包,应该一下就OOM掉了,不应该等那么久。不过这么一说,却仿佛打开了新世界的大门
06171040_FVEG.png
我把这台服务器在日志服务器中的角色重新梳理了下,终于发现了这次OOM的原因了。。 由于2.X的ES没有比较好的元数据信息,所以当日志送到LogServer的时候,我做了些额外的操作,让LogServer持久化到ES一定量的时候就会往Mongo写一下元数据信息(当然也有其他服务会往里面做CRUD啦),开始的时候访问Mongo的次数其实是很少的,假设按1W来算。那么问题来了,由于我们的PacketBeat抓了Mongo的包,那么LogServer往ES的CRUD操作都会被PacketBeat给抓走,然后再送回给LogServer
06171248_tcdl.png
那么一个隐藏的滚雪球事件就产生了,刚开始的那段时间,Mongo被抓包的次数只有1W,然后就往LogServer多送了1W条日志,不。。应该多很多,毕竟网络包嘛,然后就导致LogServer因为要管理元数据的频率开始逐渐地提高,逐渐提高CRUD的频率后抓包的内容也越来越多,紧接着到这发生到LogServer的频率也越来越高。。。。。每次PacketBeat崩掉的时候,都送了80W左右的日志量出去,然后它就OOM掉了(因为我那台机器就只剩下2G的空闲内存给它用,被系统给干掉了)。。我居然发现了这样的场景
06171336_PbWI.png
结论 使用PacketBeat的时候,记得要留意一下有没这种反馈型滚雪球的情况,多发生在自己的日志服务器上面。当然那种直接抓ES的就没什么好说了,估计启动了之后没多久就崩溃掉了

Day4: 《将sql转换为es的DSL》

AdventXargin 发表了文章 • 6 个评论 • 9472 次浏览 • 2016-12-04 23:23 • 来自相关话题

es现在几乎已经是开源搜索引擎的事实标准了,搭建简易,使用方便。不过在很多公司里(包括我司的部分部门),并不是把它当搜索引擎来用,而是当db来用。因为本身查询/搜索原理的区别,使es在千万或者亿级的数据中进行逻辑筛选相对高效。例如一些wms、工单查询系统,单表几十个甚至上百个字段,如果在数据库里为每种类型的查询都建立合适的索引,成本比较高,更不用说索引建多了还会影响到插入速度,后期的索引优化也是比较麻烦的问题。 不过如果把es当db来使的话,始终会有一个绕不过去的坎。就是es的DSL。让所有业务开发去学习dsl的话也不是不可以,但DSL真的有点反人类(不要打我)。简单的a and b或者a or b还比较容易写,如果我要的是a and (b and (c or d) and e)的查询逻辑,那我觉得谁写都会晕。即使是用官方或者第三方提供的client,如果需求多种多样的话,想要灵活地实现`需求=>DSL`的过程还是比较痛苦。 对于业务开发来说,当然是sql更平易近人(毕竟写了这么多年CRUD)。所以还有一种歪门邪道的流派,直接把sql转成DSL。要做sql和DSL转换的工作,需要进行sql的解析,先不要怵,这个年代找一个靠谱的sql parser还是比较容易的。比如阿里开源的druid连接池里的sql模块:   https://github.com/alibaba/dru ... d/sql 因为笔者的实现是用的下面这个golang版的parser: https://github.com/xwb1989/sqlparser 所以用这个来举例吧~ 这个是其作者从youtube/vitness里提取并进行改进的一个parser,我们能用到的是一部分子集功能,只需要解析select类的sql。 先举个简单的sql的例子:
select * from x_order where userId = 1 order by id desc limit 10,1;

解析之后会变成golang的一个struct,来看看具体的定义:

&sqlparser.Select{
    Comments:sqlparser.Comments(nil),
    Distinct:"",
    SelectExprs:sqlparser.SelectExprs{(*sqlparser.StarExpr)(0xc42000aee0)},
    From:sqlparser.TableExprs{(*sqlparser.AliasedTableExpr)(0xc420016930)},
    Where:(*sqlparser.Where)(0xc42000afa0),
    GroupBy:sqlparser.GroupBy(nil),
    Having:(*sqlparser.Where)(nil),
    OrderBy:sqlparser.OrderBy{(*sqlparser.Order)(0xc42000af20)},
    Limit:(*sqlparser.Limit)(0xc42000af80),
    Lock:""
}
sql的select语句在被解析之后生成一个Select的结构体,如果我们不关心使用者需要的字段的话,可以先把SelectExprs/Distinct/Comments/Lock里的内容忽略掉。如果不是分组统计类的需求,也可以先把GroupBy/Having忽略掉。这里我们关心的就剩下From、Where、OrderBy和Limit。 From对应的TableExprs实际上可以认为是简单的字符串,这里的值其实就是`x_order`。 OrderBy实际上是一个元素为
type Order struct {
    Expr      ValExpr
    Direction string
}\
的数组。 Limit也很简单,
type Limit struct {
    Offset, Rowcount ValExpr
}
其实就是俩数字。 那么剩下的就是这个Where结构了。where会被解析为AST(`https://en.wikipedia.org/wiki/Abstract_syntax_tree`),中文是抽象语法树。在不说子查询之类的情况下,这个AST也不会太复杂,毕竟where后面的情况比起编译原理里的程序语言来说情况还是要少得多的。以上述的sql为例,这里解析出来的Where结构是这样的:
&sqlparser.Where{
    Type:"where",
    Expr:(*sqlparser.ComparisonExpr)(0xc420016a50)
}
只有一个节点,一个ComparisonExpr表达式,这个ComparisonExpr,中文比较表达式,指代的就是我们sql里的`user_id = 1`。实际上我们可以认为这个"比较表达式"即是所有复杂AST的叶子节点。叶子结点在AST遍历的时候一般也就是递归的终点。因为这里的查询比较简单,整棵AST只有一个节点,即根节点和叶子节点都是这个ComparisonExpr。 再来一个复杂点的例子。
select * from users where user_id = 1 and product_id =2

=>

&sqlparser.Where{
    Type:"where",
    Expr:(*sqlparser.AndExpr)(0xc42000b020)
}

AndExpr有Left和Right两个成员,分别是:

Left:
&sqlparser.ComparisonExpr{
    Operator:"=",
    Left:(*sqlparser.ColName)(0xc4200709c0),
    Right:sqlparser.NumVal{0x31}
}

Right:
&sqlparser.ComparisonExpr{
    Operator:"=",
    Left:(*sqlparser.ColName)(0xc420070a50),
    Right:sqlparser.NumVal{0x32}
}
稍微有一些二叉树的样子了吧。把这棵简单的树画出来:
Untitled1.png
回到文章开头的那个复杂的例子:
a and (b and (c or d) and e)

=>

select * from user_history where user_id = 1 and (product_id = 2 and (star_num = 4 or star_num = 5) and banned = 1)
看着真够麻烦的,我们把这棵树画出来:
Untitled.png
这样看着就直观多了。我们有了AST的结构,那要怎么对应到es的查询DSL呢?少安毋躁。 我们知道es的bool query是可以进行嵌套的,所以实际上我们可以同样可以构造出树形结构的bool query。这里把bool嵌套must和bool嵌套should简化一下,写成boolmust和boolshould: 例如a and (b and c)
query {
    boolmust {
        a,
        boolmust {
            b,
            c
        }
    }
}
我们把query内部的第一个boolmust当作根节点,内部嵌套的a和另一个boolmust当作它的两个子节点,然后b和c又是这个boolmust的子节点。可以看出来,实际上这棵树和AST的节点可以一一对应。 再回到文章开头的例子,a and (b and (c or d) and e):
query {
    boolmust {
        a,
        boolmust {
            b,
            boolshould {
                c,
                d
            },
            e
        }
    }
}
和前文中ast来做个简单的结构对比~
dsl和ast对比.png
和前文中sql的where解析后的AST树也是完全匹配的。思路来了,只要对sql解析生成的AST进行递归,即可得到这棵树。当然了,这里还可以进行一些优化,如果子节点的类型和父 节点的类型一致,例如都是and表达式或者都是or表达式,我们可以在生成dsl的时候将其作为并列的节点进行合并,这里不再赘述。 在递归中有这么几种情况:
AndExpr => bool must [{left}, {right}]
OrExpr => bool should [{left}, {right}]
ComparisonExpr => 一般是叶子节点
ParenBoolExpr => 指代括号表达式,其实内部是上述三种节点的某一种,所以直接取出内部节点按上述方法来处理
这样问题就变成了如何处理AST的叶子节点。前面提到了叶子节点实际上就是Comparison Expression。只要简单进行一些对应即可,下面是我们的项目里的一些对应关系,仅供参考:
convert.png
最后再附上demo   https://github.com/cch123/elasticsql

Day3: 《创建一个你自己的 Beat》

Adventmedcl 发表了文章 • 0 个评论 • 3880 次浏览 • 2016-12-03 22:19 • 来自相关话题

Elastic Advent 第三篇, 手头上事情实在太多,这两天正在进行权威指南翻译的冲刺阶段,临时填下坑,翻译官网的一篇文章吧(原文:https://www.elastic.co/blog/build-your-own-beat),Advent 规则很自由的,没说不能翻译文章啊,嘿嘿嘿,另外号召大家踊跃报名,大家一起玩才有意思。   活动地址:http://elasticsearch.cn/article/107   言归正传!  Beat 是一个开源的用来构建轻量级数据汇集的平台,可用于将各种类型的数据发送至Elasticsearch 与 Logstash。我们有 Packetbeat 用于监控局域网内服务器之间的网络流量信息,有 Filebeat 收集服务器上的日志信息,还有新推出的 Metricbeat 可以定期获取外部系统的监控指标信息,除此以外,你还可以非常方便的基于 libbeat 框架来构建你属于自己的专属 Beat,目前 beas 社区已经有超过25个 Community Beats 了。 Elastic 还提供一个 Beat generator(Beat 生成器)来帮你快速构建属于你自己的 Beat。通过这篇博客你将会看到如何通过 Beat 生成器来快速创建一个你自己的 Beat。今天我们创建的是一个叫做 lsbeat 的 Beat,lsbeat 非常类似 Unix 系统下的命令行 ls,我们用 lsbeat 来索引目录和文件信息。本篇文章环境基于 Unix 系统,如果你是 Windows 或是其它系统,相关操作可能需要根据实际情况进行调整。 第一步 – 配置 Golang 环境 Beats 是用 Golang写的,显然,要创建和开发一个 beat,Golang 环境必不可少,关于这方面的文章很多,建议查看这篇 Golang 的安装向导: install Golang。当前 Beats 需要的最低版本是 Golang 1.6。另外请确保正确设置了你的 $GOPATH 环境变量。 另外 Golang Glide 被用来进行包的依赖管理,所以也需要确保正确安装,最低版本是 Glide 0.10.0,安装说明点这里。 让我们先来看看 lsbeat 将会用到的一段代码吧,这是一个简单的 golang 程序,通过命令行接收一个目录参数,然后列出该目录下的文件和子目录信息。
package main
 
import (
    "fmt"
    "io/ioutil"
    "os"
)
 
func main() {
    //apply run path "." without argument.
    if len(os.Args) == 1 {
        listDir(".")
    } else {
        listDir(os.Args[1])
    }
}
 
func listDir(dirFile string) {
    files, _ := ioutil.ReadDir(dirFile)
    for _, f := range files {
        t := f.ModTime()
        fmt.Println(f.Name(), dirFile+"/"+f.Name(), f.IsDir(), t, f.Size())
 
        if f.IsDir() {
            listDir(dirFile + "/" + f.Name())
        }
    }
}
后面我们将使用到这段代码和 listDir 函数。 第二步 – 生成项目 要生成一个你自己的 Beat,就要用到 beat-generator 了,首先你必须安装 cookiecutter。安装的详细说明看这里。安装好 cookiecutter 之后,我们要给自己的 Beat 起一个好听的名字,最好是小写的英文字母,我们今天这个例子就叫 lsbeat 吧。 生成项目模板之前,我们需要下载 Beats generator 包文件,就在 beats 仓库。安装好 GoLang 之后,你就可以很方便的使用 go get 命令来下载 Beats generator 包文件了。 当你执行下面的这个命令后,所有的源码文件都会下载到 $GOPATH/src 目录。
$ go get github.com/elastic/beats
在 GOPATH 下创建一个以你自己github账号名称命名的目录,并切换过去,然后执行 cookiecutter 命令并指向 Beat Generator 源码路径。
$ cd $GOPATH/src/github.com/{user}
$ cookiecutter $GOPATH/src/github.com/elastic/beats/generate/beat
Cookiecutter 接下来会问你几个问题,比如项目名称,我们输入:lsbeat;你的 github 用户名,输入你自己的 github 账户;还有两个关于beat和beat_path应该会自动识别,默认回车就好;最后的问题,你可以输入你的姓和名。
project_name [Examplebeat]: lsbeat
github_name [your-github-name]: {username}
beat [lsbeat]:
beat_path [github.com/{github id}]:
full_name [Firstname Lastname]: {Full Name}
现在应该已经创建好了一个名为 lsbeat 的目录,并且里面应该会生成一些文件,让我们一起来看一下吧,结构如下:
$ cd lsbeat
$ tree
.
├── CONTRIBUTING.md
├── LICENSE
├── Makefile
├── README.md
├── beater
│   └── lsbeat.go
├── config
│   ├── config.go
│   └── config_test.go
├── dev-tools
│   └── packer
│       ├── Makefile
│       ├── beats
│       │   └── lsbeat.yml
│       └── version.yml
├── docs
│   └── index.asciidoc
├── etc
│   ├── beat.yml
│   └── fields.yml
├── glide.yaml
├── lsbeat.template.json
├── main.go
├── main_test.go
└── tests
    └── system
        ├── config
        │   └── lsbeat.yml.j2
        ├── lsbeat.py
        ├── requirements.txt
        └── test_base.py
我们刚刚已经生成好了一个原始的 Beat 模板了,但是你还需要获取相关的依赖和设置好 git 仓库。 首先,你需要拉取依赖的相关包信息,我们的这个例子是 lsbeat,我们先做一些的基本的配置,回头再看看详细看看其它的模板和配置文件,只需要执行 make setup 就可以自动获取依赖。
$ make setup
当你创建好了自己的 Beat 之后,记得上传到 github 仓库,并和社区进行分享哦,如下:
beats.png
要 push lsbeat 到你的 git 仓库,只需要执行如下命令:
$ git remote add origin git@github.com:{username}/lsbeat.git
$ git push -u origin master
恭喜你,现在你已经完成了一个 Beat ,并且发布了第一个版本到了 Github,不过里面还没有什么具体内容,现在让我们进一步看看里面的代码吧。 第四步 – 配置 执行过上面一系列命令之后,项目里将会自动创建名为 lsbeat.yml 和 lsbeat.template.json 的配置文件。所有的基本配置项都已经生成在了里面。
lsbeat.yml
lsbeat:
# Defines how often an event is sent to the output
period: 1s
Period 参数包含在每一个生成的 Beats 里面,它表示 lsbeat 将会每隔 1 秒钟轮询一次,这里我们修改 period 时间间隔为 10 秒。还可以在修改 etc/ 目录下面的 beat.yml 文件,这里新增一个 path 参数表示我们具体要监听哪个目录。
lsbeat:
# Defines how often an event is sent to the output
period: 10s
path: "."
参数添加好了之后,我们只需要运行 make update 命令就能让这些修改应用到配置文件lsbeat.yml。
$ make update
$ cat lsbeat.yml
 
################### Lsbeat Configuration Example #########################
 
############################# Lsbeat ######################################
 
lsbeat:
  # Defines how often an event is sent to the output
  period: 10s
  path: "."
###############################################################################
修改完 yml 文件,记得修改 config/config.go文件,添加一个path 参数。
package config
 
import "time"
 
type Config struct {
Period time.Duration `config:"period"`
Path   string        `config:"path"`
}
 
var DefaultConfig = Config{
Period: 10 * time.Second,
Path:   ".",
}
同时我们修改 period 默认时间间隔为 10 秒,默认监听的是当前目录 (.) 。. 第五步 – 添加代码 每一个 Beat 需要实现 Beater 接口,里面定义了 Run() 和 Stop() 函数。.  我们可以定义一个名为 Lsbeat 的结构体,然后用这个对象实现 Beater 接口。然后添加字段 lastIndexTime 来保存最后运行的时间戳信息。
type Lsbeat struct {
done   chan struct{}
config config.Config
client publisher.Client
 
lastIndexTime time.Time
...
}
另外,每个 Beat 还需要实现 New() 方法来接收 Beat 配置信息和返回 Lsbeat 的具体实例。
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
config := config.DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("Error reading config file: %v", err)
}
 
ls := &Lsbeat{
done:   make(chan struct{}),
config: config,
}
return ls, nil
}
在我们的 lsbeat 例子中,我们要做的就是扩展默认的 Run() 函数来导出指定目录的文件和子目录信息。 在修改 Run() 函数之前,我们先在 lsbeat.go 增加 listDir() 函数,就是我们前面最开始测试的那段代码,用于收集目录和文件信息的简单例子稍微修改一下。另外我们还要生成以下字段信息:
"@timestamp": common.Time(time.Now()),
"type": beatname,
"modtime": common.Time(t),
"filename": f.Name(),
"path": dirFile + "/" + f.Name(),
"directory": f.IsDir(),
"filesize": f.Size(),
第一次运行的时候我们将索引所有的文件和目录信息,然后我们再定期检查是否有新文件被创建或者修改,再索引这些新创建的文件和目录。每次定期检查的时间戳都会保存在 lasIndexTime 变量,完整代码如下:
func (bt *Lsbeat) listDir(dirFile string, beatname string, init bool) {
files, _ := ioutil.ReadDir(dirFile)
for _, f := range files {
t := f.ModTime()
 
event := common.MapStr{
"@timestamp": common.Time(time.Now()),
"type":       beatname,
"modtime":    common.Time(t),
"filename":   f.Name(),
"path":       dirFile + "/" + f.Name(),
"directory":  f.IsDir(),
"filesize":   f.Size(),
}
if init {
// index all files and directories on init
bt.client.PublishEvent(event)
} else {
// Index only changed files since last run.
if t.After(bt.lastIndexTime) {
bt.client.PublishEvent(event)
}
}
 
if f.IsDir() {
bt.listDir(dirFile+"/"+f.Name(), beatname, init)
}
}
}
记住在最开始需要导入 “io/ioutil” 包。
import (
"fmt"
"io/ioutil"
"time"
)
现在,让我们看看如何在 Run() 函数里面调用 listDir() 函数,并且保存时间戳到 lasIndexTime 变量。
func (bt *Lsbeat) Run(b *beat.Beat) error {
logp.Info("lsbeat is running! Hit CTRL-C to stop it.")
 
bt.client = b.Publisher.Connect()
ticker := time.NewTicker(bt.config.Period)
counter := 1
for {
 
select {
case <-bt.done:
return nil
case <-ticker.C:
}
 
bt.listDir(bt.config.Path, b.Name, true)   // call lsDir
bt.lastIndexTime = time.Now()               // mark Timestamp
 
logp.Info("Event sent")
counter++
}
 
}
函数 Stop() 用来中断 run 的循环执行,保持默认生成的就行。
func (bt *Lsbeat) Stop() {
bt.client.Close()
close(bt.done)
}
到这里,编码部分基本就完成了。我们接下来添加新字段到 mapping 中,修改文件 etc/fields.yml。.
- key: lsbeat
  title: LS Beat
  description:
  fields:
    - name: counter
      type: integer
      required: true
      description: >
        PLEASE UPDATE DOCUMENTATION
    #new fiels added lsbeat
    - name: modtime
      type: date
    - name: filename
      type: text
    - name: path
    - name: directory
      type: boolean
    - name: filesize
      type: long

重新应用新的配置。 $ make update 字段 file_name 将使用 nGram 分词,我们还需要在文件 lsbeat.template.json 的 “settings” 节点添加一个自定义的 analyzer。
{
  "mappings": {
        ...
  },
  "order": 0,
  "settings": {
    "index.refresh_interval": "5s",
    "analysis": {
      "analyzer": {
        "ls_ngram_analyzer": {
          "tokenizer": "ls_ngram_tokenizer"
        }
      },
      "tokenizer": {
        "ls_ngram_tokenizer": {
          "type": "ngram",
          "min_gram": "2",
          "token_chars": [
            "letter",
            "digit"
          ]
        }
      }
    }
  },
  "template": "lsbeat-*"
}

第六步 – 编译和运行 现在我们可以编译和运行了,只需要执行 make 命令就可以编译出可执行文件 lsbeat (lsbeat.exe on windows) 。 $ make 修改 lsbeat.yml 文件,设置需要监听的目录,如: “/Users/ec2-user/go”,记住是全路径。
lsbeat:
  # Defines how often an event is sent to the output
  period: 10s
  path: "/Users/ec2-user/go"
同时确保你的 elasticsearch 和 kibana 正常运行。现在运行一下 lsbeat 命令看看会发生什么事情吧。 $ ./lsbeat 打开Kibana,通过调用 _cat 接口我们看看的索引是不是创建了。
beats-1.png
可以看到创建了一个名为 lsbeat-2016.06.03 的索引,并且看到已经有了一些文档了。现在对 filename 字段查询一下,由于使用的是 nGram 分词,支持模糊匹配,我们使用 lsbe 关键字搜一下。
beats-2.png
大功告成! 恭喜你,你已经完成了第一个属于你自己的 beat。

Day2:《Kibana 系漫游指南》

Advent三斗室 发表了文章 • 6 个评论 • 6677 次浏览 • 2016-12-02 22:48 • 来自相关话题

大家好,欢迎你们来到 ELK 三体星系的第二天。昨天,wood 送给大家一本脚踏实地的生存指南,今天让我们仰望星空,由我给大家介绍一下围绕在 Kibana 身边的诸多行星们~ Kibana Plugin 类型简介 我们最熟悉的 Kibana Plugin,其实就是 Kibana 本身~ Kibana 提供了一整套框架,我们可以在此基础上,开发诸多不同类型的插件,包括:
  • app
  • visTypes
  • spyModes
  • fieldFormatter
列这么几个源码里的名词出来可能大家觉得比较晦涩。其实呢,app 就是同时具有前后端实现的应用,在 Kibana 5 里,默认分发的 app 有四个:实现日志查询和可视化的 kibana app、实现时序指标统计和可视化的 timelion app、实现和 ES 接口交互命令的 console app、在有异常的时候才看得到的状态页面 status_page app。 而 visTypes 则是在 kibana 中具体可用的可视化效果。默认分发的有:kbn_vislib_vis_types、metric、table、markdown。我们常用的那些由 D3.js 完成的饼图线图地图,都是在 kbn_vislib_vis_types 中完成的。 fieldFormatter 则用来定义在 ES 中相同类型的数据,根据其实际含义,可以有不同的展示方式。比如说:URL 肯定是一个字符串,但是可以用 fieldFormatter 把它在页面展示的时候,加上 `<a href></a>` 的样式,让人一键点击;同理,还可以过滤判断一下图片类 URL,加上 `<img src></img>` 的样式,直接在 Kibana 界面上就看图片内容~~ 官方的我们会看手册啦~ 好啦好啦,我也不会真的去抄一把官方手册假冒《Kibana 系漫游指南》来骗你们流量的。下面给大家介绍一些社区开源的,让你绝对眼前一亮的各种新奇扩展: 1. logtrail     这是一个 app 插件,创意来自 papertrail 公司的产品。完全的满足了 Geeker 们喜欢黑底白字终端的癖好~不过其实实现非常简单:每隔 10 秒请求一次最近 500 条日志就是啦! 2. vectormap     这是一个 visType 插件,也就是我们在 Kibana3 里曾经用过的 map panel 效果。这个插件不被官方直接采用的一个原因是版权许可问题。不做商用的情况下,这个插件还是可以极大方便我们做行政区域的访问情况统计和展示的。      3. kbn_network     这也是一个 visType 插件,酷毙了的网状图效果!通过不同的 aggs 数据展示 node 和 relational。     注意这个跟 Elastic 的 graph 并不是完全一致的东西。该插件要求你本身的数据已经有直接的关联可用。      4. sentinl     这是一个同时带有 spyMode 和 app 双插件的项目。其基础思路是参照 Elastic 的 Watcher 接口,但是将监控告警的进程从 ES 挪到 Kibana 里。同时还可以通过 phantomjs 做到截图报表。          这个项目最大的特点,是通过 spyMode 插件,大大降低了配置告警规则的复杂度。这个扩展让你可以在 Kibana 上配置任意聚合效果之后,就地点击定义当前聚合语句为告警规则!      5. kibana-keynote     这是另一个剑走偏锋的 app 插件,出自 Kibana 作者本人之手。它的作用是:播放 keynote 演讲稿!事实上项目里放的演讲稿就是作者本人在 ELastic{ON} 2016 上用的。让我们猜一猜下周的大会上,他会不会就用这个插件给我们分享呢? 今天就先讲这几颗最闪亮的星了~有兴趣了解更多 Kibana 行星的游客,欢迎阅读全本《Kibana系漫游指南》。 也欢迎观看 Kibana 行星的《探索·发现》节目哟~

Day1: 大规模Elasticsearch集群管理心得

Adventkennywu76 发表了文章 • 63 个评论 • 27649 次浏览 • 2016-12-02 10:07 • 来自相关话题

【携程旅行网 吴晓刚】  ElasticSearch目前在互联网公司主要用于两种应用场景,其一是用于构建业务的搜索功能模块且多是垂直领域的搜索,数据量级一般在千万至数十亿这个级别;其二用于大规模数据的实时OLAP,经典的如ELKStack,数据规模可能达到千亿或更多。 这两种场景的数据索引和应用访问模式上差异较大,在硬件选型和集群优化方面侧重点也会有所不同。一般来说后一种场景属于大数据范畴,数据量级和集群规模更大,在管理方面也更有挑战。 应Medcl大大的邀请,为ES中文社区做今年的Advent开篇,分享一下我在管理自家公司用于日志分析的ES集群方面的一点心得,蜻蜓点水,泛泛而谈,希望大方向上能对大家提供一些帮助。 这里的自家,即是携程旅行网。从2013年开始接触ES,我们团队先后实践过0.9.x -> 5.0.0中间各个版本,从最初只用于运维内部IIS日志的分析,到如今支持IT、呼叫中心、安全、测试、业务研发等多个部门超过200种日志型数据的实时检索与分析。 一路走来,愉悦了大家,也死磕了自己。 目前我们最大的日志单集群有120个data node,运行于70台物理服务器上。数据规模如下:
  • 单日索引数据条数600亿,新增索引文件25TB (含一个复制片则为50TB)
  • 业务高峰期峰值索引速率维持在百万条/秒
  • 历史数据保留时长根据业务需求制定,从10天 - 90天不等
  • 集群共3441个索引、17000个分片、数据总量约9300亿, 磁盘总消耗1PB
  • Kibana用户600多人, 每日来自Kibana和第三方的API调用共63万次
  • 查询响应时间百分位 75%:0.160s  90%:1.640s 95%:6.691s 99%:14.0039s
运维这样大规模的ES集群,有哪些值得注意的地方? 一. 必不可少的工具 工欲善其事必先利其器,从一开始,哪怕就只有几个node,就应该使用分布式配置管理工具来做集群的部署。随着应用的成熟,集群规模的逐步扩大,效率的提升会凸显。 官方提供了ES Puppet Module和Chef Cookbook,熟悉这两个工具的同学可以直接拿过来用。 我们自己则是采用的Ansible,编写了一套Playbook来达到类似的效果。 用熟这类工具,对于集群的初始部署,配置批量更改,集群版本升级,重启故障结点都会快捷和安全许多。 第二个必备利器就是sense插件。通过这个插件直接调用集群的restful API,在做集群和索引的状态查看,索引配置更改的时候非常方便。语法提示和自动补全功能更是实用,减少了翻看文档的频率。在Kibana5里面,sense已经成为一个内置的控制台,无需额外安装。 二. 硬件配置 我们采用的是32vcoreCPU + 128GB RAM的服务器,磁盘配置大部分服务器是12块4TB SATA机械磁盘做的Raid0,少部分机器是刚上了不久的6块800GB SSD raid0,主要目的是想做冷热数据分离,后面谈到集群架构的时候,再进一步解释一下如何利用硬件资源。 三. 集群的管理
  1. 首先很有必要对ES的结点做角色划分和隔离。大家知道ES的data node除了放数据以外,也可以兼任master和client的角色,多数同学会将这些角色混入到data node。然而对于一个规模较大,用户较多的集群,master和client在一些极端使用情况下可能会有性能瓶颈甚至内存溢出,从而使得共存的data node故障。data node的故障恢复涉及到数据的迁移,对集群资源有一定消耗,容易造成数据写入延迟或者查询减慢。如果将master和client独立出来,一旦出现问题,重启后几乎是瞬间就恢复的,对用户几乎没有任何影响。另外将这些角色独立出来的以后,也将对应的计算资源消耗从data node剥离出来,更容易掌握data node资源消耗与写入量和查询量之间的联系,便于做容量管理和规划。
  2. 避免过高的并发,包括控制shard数量和threadpool的数量。在写入量和查询性能能够满足的前提下,为索引分配尽量少的分片。分片过多会带来诸多负面影响,例如:每次查询后需要汇总排序的数据更多;过多的并发带来的线程切换造成过多的CPU损耗;索引的删除和配置更新更慢Issue#18776; 过多的shard也带来更多小的segment,而过多的小segment会带来非常显著的heap内存消耗,特别是如果查询线程配置得很多的情况下。 配置过大的threadpool更是会产生很多诡异的性能问题Issue#18161里所描述的问题就是我们所经历过的。 默认的Theadpool大小一般来说工作得很不错了。
  3. 冷热数据最好做分离。对于日志型应用来说,一般是每天建立一个新索引,当天的热索引在写入的同时也会有较多的查询。如果上面还存有比较长时间之前的冷数据,那么当用户做大跨度的历史数据查询的时候,过多的磁盘IO和CPU消耗很容易拖慢写入,造成数据的延迟。所以我们用了一部分机器来做冷数据的存储,利用ES可以给结点配置自定义属性的功能,为冷结点加上"boxtype":"weak"的标识,每晚通过维护脚本更新冷数据的索引路由设置index.routing.allocation.{require|include|exclude},让数据自动向冷结点迁移。 冷数据的特性是不再写入,用户查的频率较低,但量级可能很大。比如我们有个索引每天2TB,并且用户要求保持过去90天数据随时可查。保持这么大量的索引为open状态,并非只消耗磁盘空间。ES为了快速访问磁盘上的索引文件,需要在内存里驻留一些数据(索引文件的索引),也就是所谓的segment memory。稍微熟悉ES的同学知道,JVM heap分配不能超过32GB,对于我们128GB RAM, 48TB磁盘空间的机器而言,如果只跑一个ES实例,只能利用到32GB不到的heap,当heap快用饱和的时候,磁盘上保存的索引文件还不到10TB,这样显然是不经济的。 因此我们决定在冷结点上跑3个ES实例,每个分配31GB heap空间,从而可以在一台物理服务器上存储30多TB的索引数据并保持open状态,供用户随时搜索。 实际使用下来,由于冷数据搜索频率不高,也没有写入,即时只剩余35GB内存给os做文件系统缓存,查询性能还是可以满足需求的。
  4. 不同数据量级的shard最好隔离到不同组别的结点。 大家知道ES会自己平衡shard在集群的分布,这个自动平衡的逻辑主要考量三个因素。其一同一索引下的shard尽量分散到不同的结点;其二每个结点上的shard数量尽量接近;其三结点的磁盘有足够的剩余空间。这个策略只能保证shard数量分布均匀,而并不能保证数据大小分布均匀。 实际应用中,我们有200多种索引,数据量级差别很大,大的一天几个TB,小的一个月才几个GB,并且每种类型的数据保留时长又千差万别。抛出的问题,就是如何能比较平衡并充分的利用所有节点的资源。 针对这个问题,我们还是通过对结点添加属性标签来做分组,结合index routing控制的方式来做一些精细化的控制。尽量让不同量级的数据使用不同组别的结点,使得每个组内结点上的数据量比较容易自动平衡。
  5. 定期做索引的force merge,并且最好是每个shard merge成一个segment。前面提到过,heap消耗与segment数量也有关系,force merge可以显著降低这种消耗。 如果merge成一个segment还有一个好处,就是对于terms aggregation,搜索时无需构造Global Ordinals,可以提升聚合速度。
四. 版本选择 我们在2.4版本上稳定跑了很长时间,比较保守的同学可以上2.4,激进有精力折腾的可以考虑最新的5.0。 我们集群两周前从v2.4.0升级到了v5.0.0这个版本,除了升级第一周遇到一个不稳定的问题以外,感觉新版本带来的以下特性还是非常值得去升级的:
  • 结点启动的Bootstrap过程加入了很多关键系统参数设置的核验,比如Max File Descriptors, Memory Lock, Virtual Memory设置等等,如果设置不正确会拒绝启动并抛出异常。 与其带着错误的系统参数启动,并在日后造成性能问题,不如启动失败告知用户问题,是个很好的设计!
  • 索引性能提升。升级后在同样索引速率下,我们看到cpu消耗下降非常明显,除了对索引速率提升有帮助,也会一定程度提升搜索速率。
  • 新的数值型数据结构,存储空间更小,Range和地理位置计算更快速
  • Instant Aggregation对于类似now-7d to now这样的范围查询聚合能够做cache了,实际使用下来,效果明显,用户在Kibana上跑个过去一周数据的聚合,头2次刷新慢点,之后有cache了几乎就瞬间刷出!
  • 更多的保护措施保证集群的稳定,比如对一次搜索hit的shard数量做了限制,增强了circuit breaker的特性,更好的防护集群资源被坏查询耗尽。
升级第一周,我们的冷数据结点出现间歇性不响应问题,从而刨出3个issue提交给官方: Issue#21595 Issue#21612 Issue#21611 第一个问题确认为Bug,将在5.0.2修复,其他两个目前还不清楚根源,看起来也只在我们的应用场景里遇到了。所幸问题都找到了了规避措施,实施这些措施以后,最近一周我们的集群重新回到以前2.4版本时期的稳定状态。 五. 监控 不差钱没空折腾的建议还是买官方的xpack省心,有精力折腾的,利用ES各种丰富的stats api,用自己熟悉的监控工具采集数据,可视化出来就好了。 那么多监控指标,最最关键的还是以下几类:
  1. 各类Thread pool的使用情况,active/queue/reject可视化出来。 判断集群是否有性能瓶颈了,看看业务高峰期各类queue是不是很高,reject是不是经常发生,基本可以做到心里有数。
  2. JVM的heap used%以及old GC的频率,如果old GC频率很高,并且多次GC过后heap used%几乎下不来,说明heap压力太大,要考虑扩容了。(也有可能是有问题的查询或者聚合造成的,需要结合用户访问记录来判断)。
  3. Segment memory大小和Segment的数量。节点上存放的索引较多的时候,这两个指标就值得关注,要知道segment memory是常驻heap不会被GC回收的,因此当heap压力太大的时候,可以结合这个指标判断是否是因为节点上存放的数据过多,需要扩容。Segement的数量也是比较关键的,如果小的segment非常多,比如有几千,即使segment memory本身不多,但是在搜索线程很多的情况下,依然会吃掉相当多的heap,原因是lucene为每个segment会在thread local里记录状态信息,这块的heap内存开销和(segment数量* thread数量)相关。
  4. 很有必要记录用户的访问记录。我们只开放了http api给用户,前置了一个nginx做http代理,将用户第三方api的访问记录通过access log全部记录下来。通过分析访问记录,可以在集群出现性能问题时,快速找到问题根源,对于问题排查和性能优化都很有帮助。
最后就是多上手实践,遇到问题多查官方资料,多Google看是否有其他人遇到同类问题,精力充足有编程背景的同学也可以多刨刨源码。

Elastic Advent Calendar 活动启动咯!

Adventmedcl 发表了文章 • 11 个评论 • 3295 次浏览 • 2016-11-04 13:46 • 来自相关话题

时间一转又到了年末,去年的 Advent 在三斗的发起下,进行的很不错,今年的 Advent 活动继续办下去吧,借鉴日本(http://qiita.com/advent-calendar/2016/elastic)的做法,我们今年可以先报名占坑,预定一个日子和你打算写的文章的标题,尽量错开时间。 今年的Advent文章也会同步发布到社区公众号。 去年 Advent 活动回顾 http://elasticsearch.cn/topic/advent   由于本站没有日历的功能,大家留言评论报名预定就好了。   格式(仅12月):日期,标题 如:12月x日 , xxx 小技巧一则   已发布: 《大规模Elasticsearch集群管理心得》 《Kibana 系漫游指南》  《创建一个你自己的 Beat》 《将sql转换为es的DSL》 《Elasticsearch 2.x mapping tips》 《无外网环境10分钟快速集成 elasticsearch-head》 《Elasticsearch 5 入坑指南》 《可定制的 elasticsearch 数据导入工具 ——mysql_2_elasticsearch》 《记一次es性能调优》 《PacketBeat奇妙的OOM小记》 《ES5.0.0 安装记录》

Day24: Elasticsearch添加Shield后TransportClient如何连接?

Adventmedcl 发表了文章 • 6 个评论 • 4253 次浏览 • 2015-12-28 12:13 • 来自相关话题

Shield是Elasticsearch一个安全防护插件,提供了权限访问控制和日志审计功能,企业可以很方便的和LDAP或是ActiveDirectory进行集成,重用现有的安全认证体系.
shield-triad.png
Elasticsearch使用了Shield后,Elasticsearch就需要权限才能访问了,和默认的调用方式有些不同,下面简单介绍一下HTTP和TCP两种方式的连接. 关于Shield的安装和配置我这里不就具体介绍,创建了一个用户名和密码都是tribe_user的用户,权限是admin. 1.HTTP方式 现在直接访问es的http接口就会报错 curl http://localhost:9200 {"error":{"root_cause":[{"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}}],"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}},"status":401} shield支持HttpBasic验证,所以正确的访问姿势是: curl -u tribe_user:tribe_user http://localhost:9200 { "name" : "Melter", "cluster_name" : "elasticsearch", "version" : { "number" : "2.1.1", "build_hash" : "805c528f3167980046f224310f9147fa745e5371", "build_timestamp" : "2015-12-09T20:23:16Z", "build_snapshot" : false, "lucene_version" : "5.3.1" }, "tagline" : "You Know, for Search" } 如果是浏览器访问的话,第一次访问会弹出验证窗口,后续只要不关闭这个浏览器保持这个session就能一直访问. 注意http basic是不安全的认证方式,仅供开发调试使用,生产环境还需要结合HTTPS的加密通道使用. 2.TransportClient方式的访问Shield加防的Elasticsearch,稍微麻烦点,需要依赖Shield的包,步骤如下: 2.1 如果你是maven管理的项目,在pom.xml文件里添加Elasticsearch的maven仓库源,如下: <repositories>  <repository>  <id>elasticsearch-releases</id>  <url>https://maven.elasticsearch.or ... gt%3B  <releases> <enabled>true</enabled> </releases>  <snapshots> <enabled>false</enabled> </snapshots>  </repository>  </repositories> 2.2 添加依赖的配置 <dependency>  <groupId>org.elasticsearch.plugin</groupId> <artifactId>shield</artifactId> <version>2.1.1</version> </dependency 2.3 构建TransportClient的地方增加访问用户的配置 import org.elasticsearch.shield.ShieldPlugin; import org.elasticsearch.shield.authc.support.SecuredString; import static org.elasticsearch.shield.authc.support.UsernamePasswordToken.basicAuthHeaderValue; String clusterName="elasticsearch"; String ip= "127.0.0.1";  Settings settings = Settings.settingsBuilder()    .put("cluster.name", clusterName)  .put("shield.user", "tribe_user:tribe_user")  .build();  try { client = TransportClient.builder()  .addPlugin(ShieldPlugin.class)  .settings(settings).build()  .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip),9300));  String token = basicAuthHeaderValue("tribe_user", new SecuredString("tribe_user".toCharArray()));   client.prepareSearch() .putHeader("Authorization", token).get();   }  catch (UnknownHostException e)  { logger.error("es",e); }   现在的编辑器贴代码有点恶心,可以看这里: http://log.medcl.net/item/2015 ... -1252

Day 23 谈谈ES 的Recovery

Adventkennywu76 发表了文章 • 9 个评论 • 7220 次浏览 • 2015-12-25 16:45 • 来自相关话题

Note: 本文针对ES2.x  Recovery是指将一个索引的未分配shard分配到一个结点的过程。 在快照恢复,更改索引复制片数量,结点故障或者结点启动时发生。由于master持有整个集群的状态信息,因此可以判断出哪些shard需要做再分配,以及分配到哪个结点。例如:
  • 如果某个shard主片在,副片所在结点挂了,那么选择另外一个可用结点,将副片分配(allocate)上去,然后进行主从片的复制。
  • 如果某个shard的主片所在结点挂了,副片还在,那么将副片升级为主片,然后做主副复制。
  • 如果某个shard的主副片所在结点都挂了,则暂时无法恢复,等待持有相关数据的结点重新加入集群后,从结点上恢复主分片,再选择某个结点分配复制片,并从主分片同步数据。
通过CAT health API,我们可以查看集群的状态,从而获知数据的完整性情况:
cat_health.png
可能的状态及含义:

Green: 所有的shard主副片都完好的 Yellow: 所有shard的主片都完好,部分副片没有了,数据完整性依然完好。 Red: 某些shard的主副片都没有了,对应的索引数据不完整

Recovery过程要消耗额外的资源,CPU、内存、结点之间的网络带宽等等。 这些额外的资源消耗,有可能会导致集群的服务能力降级,或者一部分功能暂时不可用。了解一些Recovery的过程和相关的配置参数,对于减小recovery带来的资源消耗,加快集群恢复过程都是很有帮助的。 减少集群Full Restart造成的数据来回拷贝 集群可能会有整体重启的需要,比如需要升级硬件、升级操作系统或者升级ES大版本。重启所有结点可能带来的一个问题: 某些结点可能先于其他结点加入集群。 先加入集群的结点可能已经可以选举好master,并立即启动了recovery的过程,由于这个时候整个集群数据还不完整,master会指示一些结点之间相互开始复制数据。 那些晚到的结点,一旦发现本地的数据已经被复制到其他结点,则直接删除掉本地“失效”的数据。 当整个集群恢复完毕后,数据分布不均衡显然是不均衡的,master会触发rebalance过程,将数据在结点之间挪动。整个过程无谓消耗了大量的网络流量。 合理设置recovery相关参数则可以防范这种问题的发生。

gateway.expected_nodes gateway.expected_master_nodes gateway.expected_data_nodes

以上三个参数是说集群里一旦有多少个结点就立即开始recovery过程。 不同之处在于,第一个参数指的是master或者data都算在内,而后面两个参数则分指master和data node。 在期待的节点数条件满足之前, recovery过程会等待gateway.recover_after_time (默认5分钟) 这么长时间,一旦等待超时,则会根据以下条件判断是否启动:

gateway.recover_after_nodes gateway.recover_after_master_nodes gateway.recover_after_data_nodes

举例来说,对于一个有10个data node的集群,如果有以下的设置:

gateway.expected_data_nodes: 10 gateway.recover_after_time: 5m gateway.recover_after_data_nodes: 8

那么集群5分钟以内10个data node都加入了,或者5分钟以后8个以上的data node加入了,都会立即启动recovery过程。 减少主副本之间的数据复制 如果不是full restart,而是重启单个data node,仍然会造成数据在不同结点之间来回复制。为避免这个问题,可以在重启之前,先关闭集群的shard allocation:
cluster_settings.png
然后在结点重启完成加入集群后,再重新打开:
put_cluster_settings.png
这样在结点重启完成后,尽量多的从本地直接恢复数据。 但是在ES1.6版本之前,即使做了以上措施,仍然会发现有大量主副本之间的数据拷贝。从表面去看,这点很让人不能理解。 主副本数据完全一致,ES应该直接从副本本地恢复数据就好了,为什么要重新从主片再复制一遍呢? 原因在于Recovery是简单对比主副本的segment file来判断哪些数据一致可以本地恢复,哪些不一致需要远端拷贝的。而不同结点的segment merge是完全独立运行的,可能导致主副本merge的深度不完全一样,从而造成即使文档集完全一样,产生的segment file却不完全一样。 为了解决这个问题,ES1.6版本以后加入了synced flush的新特性。 对于5分钟没有更新过的shard,会自动synced flush一下,实质是为对应的shard加了一个synced flush ID。这样当重启结点的时候,先对比一下shard的synced flush ID,就可以知道两个shard是否完全相同,避免了不必要的segment file拷贝,极大加快了冷索引的恢复速度。 需要注意的是synced flush只对冷索引有效,对于热索引(5分钟内有更新的索引)没有作用。 如果重启的结点包含有热索引,那么还是免不了大量的文件拷贝。因此在重启一个结点之前,最好按照以下步骤执行,recovery几乎可以瞬间完成:
  1. 暂停数据写入程序
  2. 关闭集群shard allocation
  3. 手动执行POST /_flush/synced
  4. 重启结点
  5. 重新开启集群shard allocation 
  6. 等待recovery完成,集群health status变成green
  7. 重新开启数据写入程序
(特别大的)热索引为何恢复慢 对于冷索引,由于数据不再更新,利用synced flush特性,可以快速直接从本地恢复数据。 而对于热索引,特别是shard很大的热索引,除了synced flush派不上用场需要大量跨结点拷贝segment file以外,translog recovery是导致慢的更重要的原因。 从主片恢复数据到副片需要经历3个阶段:
  1. 对主片上的segment file做一个快照,然后拷贝到复制片分配到的结点。数据拷贝期间,不会阻塞索引请求,新增索引操作记录到translog里。
  2. 对translog做一个快照,此快照包含第一阶段新增的索引请求,然后重放快照里的索引操作。此阶段仍然不阻塞索引请求,新增索引操作记录到translog里。
  3. 为了能达到主副片完全同步,阻塞掉新索引请求,然后重放阶段二新增的translog操作。
可见,在recovery完成之前,translog是不能够被清除掉的(禁用掉正常运作期间后台的flush操作)。如果shard比较大,第一阶段耗时很长,会导致此阶段产生的translog很大。重放translog比起简单的文件拷贝耗时要长得多,因此第二阶段的translog耗时也会显著增加。等到第三阶段,需要重放的translog可能会比第二阶段还要多。 而第三阶段是会阻塞新索引写入的,在对写入实时性要求很高的场合,就会非常影响用户体验。 因此,要加快大的热索引恢复速度,最好的方式是遵从上一节提到的方法: 暂停新数据写入,手动sync flush,等待数据恢复完成后,重新开启数据写入,这样可以将数据延迟影响可以降到最低。 万一遇到Recovery慢,想知道进度怎么办呢? CAT Recovery API可以显示详细的recovery各个阶段的状态。 这个API怎么用就不在这里赘述了,参考: CAT Recovery 其他Recovery相关的专家级设置 还有其他一些专家级的设置(参见: recovery)可以影响recovery的速度,但提升速度的代价是更多的资源消耗,因此在生产集群上调整这些参数需要结合实际情况谨慎调整,一旦影响应用要立即调整回来。 对于搜索并发量要求高,延迟要求低的场合,默认设置一般就不要去动了。 对于日志实时分析类对于搜索延迟要求不高,但对于数据写入延迟期望比较低的场合,可以适当调大indices.recovery.max_bytes_per_sec,提升recovery速度,减少数据写入被阻塞的时长。   最后要说的一点是ES的版本迭代很快,对于Recovery的机制也在不断的优化中。 其中有一些版本甚至引入了一些bug,比如在ES1.4.x有严重的translog recovery bug,导致大的索引trans log recovery几乎无法完成 (issue #9226)  。因此实际使用中如果遇到问题,最好在Github的issue list里搜索一下,看是否使用的版本有其他人反映同样的问题。

Day22:pipeline aggregation计算日留存率示例

Advent三斗室 发表了文章 • 1 个评论 • 6693 次浏览 • 2015-12-25 11:06 • 来自相关话题

网友们多次讨论如何利用 ES 计算用户留存率的问题。这是个比较尴尬的情况,如果多次请求再自己做一下运算,问题很简单。但如果想要一次请求得到最终结果,在没有完整 JOIN 支持的 ES 里又显得比较难以完成。 目前我想到的比较容易达成的做法,是我们在记录用户登录操作日志的时候,把该用户的注册时间也同期输出。也就是说,这个索引的 mapping 是下面这样:
curl -XPUT 'http://127.0.0.1:9200/login-2015.12.23/' -d '{
  "settings" : {
    "number_of_shards" : 1
  },
  "mappings" : {
    "logs" : {
      "properties" : {
        "uid" : { "type" : "string", "index" : "not_analyzed" },
        "register_time" : { "type" : "date", "index" : "not_analyzed" },
        "login_time" : { "type" : "date", "index" : "not_analyzed" }
      }
    }
  }
}'
那么实际记录的日志会类似这样:
{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"2","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.24","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-24T12:00:00Z"}
这段我虚拟的数据,表示 uid 为 1 的用户,23 号注册并登录,24 号再次登录;uid 为 2 的用户,23 号注册并登录,24 号无登录。 显然以这短短 3 行示例数据,我们口算都知道单日留存率是 50% 了。那么怎么通过一次 ES 请求也算出来呢?下面就要用到 ES 2.0 新增加的 pipeline aggregation 了。
curl -XPOST 'http://127.0.0.1:9200/login-2015.12.23,login-2015.12.24/_search' -d'
{
  "size" : 0,
  "aggs" : {
    "new_users" : {

      "filters" : {
        "filters" : [
          {
            "range" : {
              "register_time" : {
                "gte" : "2015-12-23",
                "lt" : "2015-12-24"
              }
            }
          }
        ]
      },
      "aggs" : {
        "register_count" : {
          "cardinality" : {
            "field" : "uid"
          }
        },
        "today" : {
          "filter" : {
            "range" : {
              "login_time" : {
                "gte" : "2015-12-24",
                "lt" : "2015-12-25"
              }
            }
          },
          "aggs" : {
            "login_count" : {
              "cardinality" : {
                "field" : "uid"
              }
            }
          }
        },
        "retention" : {
          "bucket_script" : {
            "buckets_path" : {
              "today_count" : "today>login_count",
              "yesterday_count" : "register_count"
            },
            "script" : {
              "lang" : "expression",
              "inline" : "today_count / yesterday_count"
            }
          }
        }
      }
    }
  }
}'
这个 pipeline aggregation 在使用上有几个要点:
  1. pipeline agg 的 parent agg 必须是返回数组的 buckets agg 类型。我这里曾经打算使用 filter agg 直接请求register_time:["now-2d" TO "now-1d"],结果报错说找不到 buckets_path 的 START_OBJECT。所以改用了 filters agg 的数组格式。
  2. bucket_script agg 同样受 scripting module 的影响。也就是说,官网示例里的"script":"today_count / yesterday_count" 这种写法,是采用了 groovy 引擎的 inline 模式。在 ES 2.0 的默认设置下,是被禁止运行的!所以,应该按照 scripting module 的统一要求,改写成 file 形式存放到 config/scripts下;或者改用 Lucene Expression 运行。考虑到 pipeline aggregation 只支持数值运算,这里使用 groovy 价值不大,所以直接指明 lang 参数即可。
最终这次请求的响应如下:
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "failed" : 0
  },
  "hits" : {
    "total" : 3,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "new_users" : {
      "buckets" : [ {
        "doc_count" : 3,
        "today" : {
          "doc_count" : 1,
          "login_count" : {
            "value" : 1
          }
        },
        "register_count" : {
          "value" : 2
        },
        "retention" : {
          "value" : 0.5
        }
      } ]
    }
  }
}
这个 retention 数据,就是我们要求解的 0.5 了。  

Day21: 如何快速把Kibana4 Discover页的Document Table导出成CSV

Advent三斗室 发表了文章 • 6 个评论 • 8164 次浏览 • 2015-12-24 16:33 • 来自相关话题

我们都知道Kibana4里,所有的aggregation生成的visualize都可以在请求细节查看里选择`Export`成raw或者formatted。其中formatted就是CSV文件。 但是Discover页上,除了顶部的date_histogram这个visualize,更重要的是下边的search document table的内容。当我们通过搜索发现异常信息,想要长期保存证据,或者分享给其他没有权限的外部人员的时候,单纯保存search到es,或者分享单条日志的link都不顶用,还是需要能导出成一个文件。 可惜Kibana4没有针对search document table的导出! 国外一家叫MineWhat的公司,最近公开了一个非常细小的创新方案,意图解决这个问题。他们的方式是:避免修改Kibana源码,而通过chrome浏览器插件完成…… 点击这个地址安装chrome插件:https://chrome.google.com/webs ... lated   然后再访问Kibana的时候,你会发现自己的搜索框最右侧多了一个CSV按钮:     然后点击这个『CSV』按钮,会弹出一片提示: 可以点击选择,把search document table内容保存到本机的复制粘贴板,还是Google Drive网盘。 我们当然选择本机…… 然后打开本地的文本文件,Ctrl+V,就看到编辑器里出现了整个CSV内容。 实测下来,发现有个小问题,粘贴出来的数据里丢掉了空格~不过聊胜于无吧,还是介绍给大家一试。   注意:这个功能只会导出目前页面上已经展示出来的table内容。并不代表其使用了scroll API去ES拉取全部结果集!

Day20 利用tcpdump和kafka协议定位不合法topic的来源

Adventchilde 发表了文章 • 0 个评论 • 3467 次浏览 • 2015-12-23 23:26 • 来自相关话题

事情是这样滴,  我们在很多linux机器上部署了logstash采集日志, topic_id用的是 test-%{type}, 但非常不幸的是,  有些机器的某些日志, 没有带上type字段.    因为在topic名字里面不能含有%字符, 所以kafka server的日志里面大量报错. Logstash每发一次数据, kafka就会生成下面一大段错误  
[2015-12-23 23:20:47,749] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 48; ClientId: ; Topics: test-%{type} (kafka.server.KafkaApis)
kafka.common.InvalidTopicException: topic name test-%{type} is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'
        at kafka.common.Topic$.validate(Topic.scala:42)
        at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
        at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
        at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
        at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
        at scala.collection.SetLike$class.map(SetLike.scala:93)
        at scala.collection.AbstractSet.map(Set.scala:47)
        at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
        at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
        at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
        at java.lang.Thread.run(Thread.java:744)
把可用的信息瞬间淹没.     更不幸的是, 错误日志里面并没有客户来源的信息, 根本不知道是哪些机器还有问题.   我想做的, 就是把有问题的logstash机器找出来.   我就先事后诸葛亮一把, 用下面这个命令就可以把配置错误的机器找出来(也可以没有任何结果, 原因后面说)
tcpdump -nn 'dst port 9092 and tcp[37]==3 and tcp[57]==37'
dst port 9092就不说了, 这是kafka的默认端口, 后面的tcp[37]==3 and tcp[57]==37是啥意思呢, 我们慢慢说.   先要说一下: client要生产数据到kafka, 在发送消息之前, 首先得向kafka"询问"这个topic的metadata信息, 包括有几个partiton, 每个parttion在哪个服务器上面等信息, 拿到这些信息之后, 才能把消息发到正确的kafka服务器上.   重点来了!  向kafka"询问"topic的metadata, 其实就是发送一个tcp包过去, 我们需要知道的是这个tcp包的格式. 我已经帮你找到了, 就在这里 https://cwiki.apache.org/confl ... quest   看完文档之后(半小时或者更长时间过去了), 你就会知道, tcp body(除去tcp head)里面的第6个字节是03, 代表这是一个TopicMetadataRequest请求.  topicname里面的%字符出现在tcp body的第26个字节, %的ascii码是37   tcp头一般是20个字符, 所以加上这20个字节, 然后下标从0算起, 就是tcp[20+5]==3 and tcp[20+25]==37, 也就是tcp[25]==3 and tcp[45]==37.   咦, 为啥和开始写的那个过滤条件不一样呢, 因为tcp头"一般"是20字节, 但是如果其中还包含了tcp选项的话, 就可能比20多了. 反正我这里看到的的tcp头都是32个字节, 所以不能加20, 要加32, 也就是最开始写的 tcp[37]==3 and tcp[57]==37  最后呢, 再提2点结束.   1. 终极大杀器, 不过tcp头的长度是多少, 20也好, 32也好, 或者其他也好, 下面这样都能搞定
tcpdump -nn 'dst port 9092 and tcp[(tcp[12]>>2)+5]==3 and tcp[(tcp[12]>>2)+25]==37'
2.  不要一上来就这么高端, 其实我最开始是这样先确定问题的
tcpdump -vv -nn -X -s 0 dst port 9092 | grep -C 5 "test-"
你问我为啥不把test-t{type}写完整? 不是为了省事, 其实是因为很不幸, test-%{t 到这里的时候, 正好换行了.

Day19 ES内存那点事

Adventkennywu76 发表了文章 • 51 个评论 • 22529 次浏览 • 2015-12-22 18:51 • 来自相关话题

【携程旅行网  吴晓刚】   注: 本文主要针对ES 2.x。  “该给ES分配多少内存?”  “JVM参数如何优化?“ “为何我的Heap占用这么高?” “为何经常有某个field的数据量超出内存限制的异常?“ “为何感觉上没多少数据,也会经常Out Of Memory?” 以上问题,显然没有一个统一的数学公式能够给出答案。 和数据库类似,ES对于内存的消耗,和很多因素相关,诸如数据总量、mapping设置、查询方式、查询频度等等。默认的设置虽开箱即用,但不能适用每一种使用场景。作为ES的开发、运维人员,如果不了解ES对内存使用的一些基本原理,就很难针对特有的应用场景,有效的测试、规划和管理集群,从而踩到各种坑,被各种问题挫败。 要理解ES如何使用内存,先要理解下面两个基本事实: 1.  ES是JAVA应用 2.  底层存储引擎是基于Lucene的 看似很普通是吗?但其实没多少人真正理解这意味着什么。  首先,作为一个JAVA应用,就脱离不开JVM和GC。很多人上手ES的时候,对GC一点概念都没有就去网上抄各种JVM“优化”参数,却仍然被heap不够用,内存溢出这样的问题搞得焦头烂额。了解JVM GC的概念和基本工作机制是很有必要的,本文不在此做过多探讨,读者可以自行Google相关资料进行学习。如何知道ES heap是否真的有压力了? 推荐阅读这篇博客:Understanding Memory Pressure Indicator。 即使对于JVM GC机制不够熟悉,头脑里还是需要有这么一个基本概念: 应用层面生成大量长生命周期的对象,是给heap造成压力的主要原因,例如读取一大片数据在内存中进行排序,或者在heap内部建cache缓存大量数据。如果GC释放的空间有限,而应用层面持续大量申请新对象,GC频度就开始上升,同时会消耗掉很多CPU时间。严重时可能恶性循环,导致整个集群停工。因此在使用ES的过程中,要知道哪些设置和操作容易造成以上问题,有针对性的予以规避。 其次,Lucene的倒排索引(Inverted Index)是先在内存里生成,然后定期以段文件(segment file)的形式刷到磁盘的。每个段实际就是一个完整的倒排索引,并且一旦写到磁盘上就不会做修改。 API层面的文档更新和删除实际上是增量写入的一种特殊文档,会保存在新的段里。不变的段文件易于被操作系统cache,热数据几乎等效于内存访问。  基于以上2个基本事实,我们不难理解,为何官方建议的heap size不要超过系统可用内存的一半。heap以外的内存并不会被浪费,操作系统会很开心的利用他们来cache被用读取过的段文件。 Heap分配多少合适?遵从官方建议就没错。 不要超过系统可用内存的一半,并且不要超过32GB。JVM参数呢?对于初级用户来说,并不需要做特别调整,仍然遵从官方的建议,将xms和xmx设置成和heap一样大小,避免动态分配heap size就好了。虽然有针对性的调整JVM参数可以带来些许GC效率的提升,当有一些“坏”用例的时候,这些调整并不会有什么魔法效果帮你减轻heap压力,甚至可能让问题更糟糕。 那么,ES的heap是如何被瓜分掉的? 说几个我知道的内存消耗大户并分别做解读: 1.  segment memory 2.  filter cache 3.  field data cache 4.  bulk queue 5.  indexing buffer 6.  state buffer 7.  超大搜索聚合结果集的fetch 8. 对高cardinality字段做terms aggregation Segment Memory Segment不是file吗?segment memory又是什么?前面提到过,一个segment是一个完备的lucene倒排索引,而倒排索引是通过词典 (Term Dictionary)到文档列表(Postings List)的映射关系,快速做查询的。 由于词典的size会很大,全部装载到heap里不现实,因此Lucene为词典做了一层前缀索引(Term Index),这个索引在Lucene4.0以后采用的数据结构是FST (Finite State Transducer)。 这种数据结构占用空间很小,Lucene打开索引的时候将其全量装载到内存中,加快磁盘上词典查询速度的同时减少随机磁盘访问次数。 下面是词典索引和词典主存储之间的一个对应关系图:
lucene_index.png
Lucene  file的完整数据结构参见Apache Lucene - Index File Formats 说了这么多,要传达的一个意思就是,ES的data node存储数据并非只是耗费磁盘空间的,为了加速数据的访问,每个segment都有会一些索引数据驻留在heap里。因此segment越多,瓜分掉的heap也越多,并且这部分heap是无法被GC掉的! 理解这点对于监控和管理集群容量很重要,当一个node的segment memory占用过多的时候,就需要考虑删除、归档数据,或者扩容了。 怎么知道segment memory占用情况呢?  CAT API可以给出答案。 1.  查看一个索引所有segment的memory占用情况:
seg_mem.png
2.  查看一个node上所有segment占用的memory总和:
seg_mem_node.png
那么有哪些途径减少data node上的segment memory占用呢? 总结起来有三种方法: 1.  删除不用的索引 2.  关闭索引 (文件仍然存在于磁盘,只是释放掉内存)。需要的时候可以重新打开。 3.  定期对不再更新的索引做optimize (ES2.0以后更改为force merge api)。这Optimze的实质是对segment file强制做合并,可以节省大量的segment memory。 Filter Cache (5.x里叫做Request cache) Filter cache是用来缓存使用过的filter的结果集的,需要注意的是这个缓存也是常驻heap,在被evict掉之前,是无法被GC的。我的经验是默认的10% heap设置工作得够好了,如果实际使用中heap没什么压力的情况下,才考虑加大这个设置。 Field Data cache 在有大量排序、数据聚合的应用场景,可以说field data cache是性能和稳定性的杀手。 对搜索结果做排序或者聚合操作,需要将倒排索引里的数据进行解析,按列构造成docid->value的形式才能够做后续快速计算。 对于数据量很大的索引,这个构造过程会非常耗费时间,因此ES 2.0以前的版本会将构造好的数据缓存起来,提升性能。但是由于heap空间有限,当遇到用户对海量数据做计算的时候,就很容易导致heap吃紧,集群频繁GC,根本无法完成计算过程。 ES2.0以后,正式默认启用Doc Values特性(1.x需要手动更改mapping开启),将field data在indexing time构建在磁盘上,经过一系列优化,可以达到比之前采用field data cache机制更好的性能。因此需要限制对field data cache的使用,最好是完全不用,可以极大释放heap压力。 需要注意的是,很多同学已经升级到ES2.0,或者1.0里已经设置mapping启用了doc values,在kibana里仍然会遇到问题。 这里一个陷阱就在于kibana的table panel可以对所有字段排序。 设想如果有一个字段是analyzed过的,而用户去点击对应字段的排序表头是什么后果? 一来排序的结果并不是用户想要的,排序的对象实际是词典; 二来analyzed过的字段无法利用doc values,需要装载到field data cache,数据量很大的情况下可能集群就在忙着GC或者根本出不来结果。 Bulk Queue 一般来说,Bulk queue不会消耗很多的heap,但是见过一些用户为了提高bulk的速度,客户端设置了很大的并发量,并且将bulk Queue设置到不可思议的大,比如好几千。 Bulk Queue是做什么用的?当所有的bulk thread都在忙,无法响应新的bulk request的时候,将request在内存里排列起来,然后慢慢清掉。 这在应对短暂的请求爆发的时候有用,但是如果集群本身索引速度一直跟不上,设置的好几千的queue都满了会是什么状况呢? 取决于一个bulk的数据量大小,乘上queue的大小,heap很有可能就不够用,内存溢出了。一般来说官方默认的thread pool设置已经能很好的工作了,建议不要随意去“调优”相关的设置,很多时候都是适得其反的效果。 Indexing Buffer Indexing Buffer是用来缓存新数据,当其满了或者refresh/flush interval到了,就会以segment file的形式写入到磁盘。 这个参数的默认值是10% heap size。根据经验,这个默认值也能够很好的工作,应对很大的索引吞吐量。 但有些用户认为这个buffer越大吞吐量越高,因此见过有用户将其设置为40%的。到了极端的情况,写入速度很高的时候,40%都被占用,导致OOM。 Cluster State Buffer ES被设计成每个node都可以响应用户的api请求,因此每个node的内存里都包含有一份集群状态的拷贝。这个cluster state包含诸如集群有多少个node,多少个index,每个index的mapping是什么?有少shard,每个shard的分配情况等等 (ES有各类stats api获取这类数据)。 在一个规模很大的集群,这个状态信息可能会非常大的,耗用的内存空间就不可忽视了。并且在ES2.0之前的版本,state的更新是由master node做完以后全量散播到其他结点的。 频繁的状态更新就可以给heap带来很大的压力。 在超大规模集群的情况下,可以考虑分集群并通过tribe node连接做到对用户api的透明,这样可以保证每个集群里的state信息不会膨胀得过大。 超大搜索聚合结果集的fetch ES是分布式搜索引擎,搜索和聚合计算除了在各个data node并行计算以外,还需要将结果返回给汇总节点进行汇总和排序后再返回。无论是搜索,还是聚合,如果返回结果的size设置过大,都会给heap造成很大的压力,特别是数据汇聚节点。超大的size多数情况下都是用户用例不对,比如本来是想计算cardinality,却用了terms aggregation + size:0这样的方式; 对大结果集做深度分页;一次性拉取全量数据等等。   对高cardinality字段做terms aggregation 所谓高cardinality,就是该字段的唯一值比较多。 比如client ip,可能存在上千万甚至上亿的不同值。 对这种类型的字段做terms aggregation时,需要在内存里生成海量的分桶,内存需求会非常高。如果内部再嵌套有其他聚合,情况会更糟糕。  在做日志聚合分析时,一个典型的可以引起性能问题的场景,就是对带有参数的url字段做terms aggregation。 对于访问量大的网站,带有参数的url字段cardinality可能会到数亿,做一次terms aggregation内存开销巨大,然而对带有参数的url字段做聚合通常没有什么意义。 对于这类问题,可以额外索引一个url_stem字段,这个字段索引剥离掉参数部分的url。可以极大降低内存消耗,提高聚合速度。 小结:
  1. 倒排词典的索引需要常驻内存,无法GC,需要监控data node上segment memory增长趋势。
  2. 各类缓存,field cache, filter cache, indexing cache, bulk queue等等,要设置合理的大小,并且要应该根据最坏的情况来看heap是否够用,也就是各类缓存全部占满的时候,还有heap空间可以分配给其他任务吗?避免采用clear cache等“自欺欺人”的方式来释放内存。
  3. 避免返回大量结果集的搜索与聚合。确实需要大量拉取数据的场景,可以采用scan & scroll api来实现。
  4. cluster stats驻留内存并无法水平扩展,超大规模集群可以考虑分拆成多个集群通过tribe node连接。
  5. 想知道heap够不够,必须结合实际应用场景,并对集群的heap使用情况做持续的监控。
  6. 根据监控数据理解内存需求,合理配置各类circuit breaker,将内存溢出风险降低到最低。

Day18: 程序内的消息流:ArrayBlockingQueue和zeromq对比

Adventchilde 发表了文章 • 1 个评论 • 2202 次浏览 • 2015-12-20 22:51 • 来自相关话题

在logstash内部, input到filter, 以及filter到output, 消息都是通过一个队列来中转. 在我写hangout的第一个版本,也是这么做的,用ArrayBlockingQueue来中转消息, 上游几个线程把消息放在queue中, 下游再几个线程把queue中的消息消费走. 但是, 用下来之后, 发现在queue上面消耗的资源是相当的大,strace查看,非常大量的lock相关的系统调用, 现在的版本已经把queue去掉了. 想必Logstash也会有大量资源用在这一块. zeromq中的Parallel Pipeline正好适合这个场景,而且文档中说是lock free的, 拿来和queue对比一下看. 在我自己的电脑上测试,2.6 GHz Intel Core i5.  一个主线程生成10,000,000个随机数, 分发给四个线程消费. 用Queue来实现, 需要约37秒, CPU使用率在150%. 用zeromq的ipc来传递消息, 只需要22秒, 期间CPU使用率在250%. 总的CPU使用时间都60秒左右. 不知道java中还有没有更合适的Queue可以用在这个场景中.至少zeromq和ArrayBlockingQueue相比, zeromq可以更快的处理消息, 但代价就是更高的CPU使用率.
每年的12月, 每天一篇围绕 Elastic 相关主题即可,从小的经验分享到实际案例.
https://en.wikipedia.org/wiki/Advent_calendar