看,灰机...

Day22:pipeline aggregation计算日留存率示例

网友们多次讨论如何利用 ES 计算用户留存率的问题。这是个比较尴尬的情况,如果多次请求再自己做一下运算,问题很简单。但如果想要一次请求得到最终结果,在没有完整 JOIN 支持的 ES 里又显得比较难以完成。

目前我想到的比较容易达成的做法,是我们在记录用户登录操作日志的时候,把该用户的注册时间也同期输出。也就是说,这个索引的 mapping 是下面这样:
curl -XPUT 'http://127.0.0.1:9200/login-2015.12.23/' -d '{
"settings" : {
"number_of_shards" : 1
},
"mappings" : {
"logs" : {
"properties" : {
"uid" : { "type" : "string", "index" : "not_analyzed" },
"register_time" : { "type" : "date", "index" : "not_analyzed" },
"login_time" : { "type" : "date", "index" : "not_analyzed" }
}
}
}
}'
那么实际记录的日志会类似这样:
{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"2","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.24","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-24T12:00:00Z"}
这段我虚拟的数据,表示 uid 为 1 的用户,23 号注册并登录,24 号再次登录;uid 为 2 的用户,23 号注册并登录,24 号无登录。

显然以这短短 3 行示例数据,我们口算都知道单日留存率是 50% 了。那么怎么通过一次 ES 请求也算出来呢?下面就要用到 ES 2.0 新增加的 pipeline aggregation 了。
curl -XPOST 'http://127.0.0.1:9200/login-2015.12.23,login-2015.12.24/_search' -d'
{
"size" : 0,
"aggs" : {
    "new_users" : {

      "filters" : {
        "filters" : [
          {
            "range" : {
              "register_time" : {
                "gte" : "2015-12-23",
                "lt" : "2015-12-24"
              }
            }
          }
        ]
      },
"aggs" : {
"register_count" : {
"cardinality" : {
"field" : "uid"
}
},
"today" : {
"filter" : {
"range" : {
"login_time" : {
"gte" : "2015-12-24",
"lt" : "2015-12-25"
}
}
},
"aggs" : {
"login_count" : {
"cardinality" : {
"field" : "uid"
}
}
}
},
"retention" : {
"bucket_script" : {
"buckets_path" : {
"today_count" : "today>login_count",
"yesterday_count" : "register_count"
},
"script" : {
"lang" : "expression",
"inline" : "today_count / yesterday_count"
}
}
}
}
}
}
}'
这个 pipeline aggregation 在使用上有几个要点:
  1. pipeline agg 的 parent agg 必须是返回数组的 buckets agg 类型。我这里曾经打算使用 filter agg 直接请求register_time:["now-2d" TO "now-1d"],结果报错说找不到 buckets_path 的 START_OBJECT。所以改用了 filters agg 的数组格式。
  2. bucket_script agg 同样受 scripting module 的影响。也就是说,官网示例里的"script":"today_count / yesterday_count" 这种写法,是采用了 groovy 引擎的 inline 模式。在 ES 2.0 的默认设置下,是被禁止运行的!所以,应该按照 scripting module 的统一要求,改写成 file 形式存放到 config/scripts下;或者改用 Lucene Expression 运行。考虑到 pipeline aggregation 只支持数值运算,这里使用 groovy 价值不大,所以直接指明 lang 参数即可。

最终这次请求的响应如下:
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"hits" : {
"total" : 3,
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"new_users" : {
"buckets" : [ {
"doc_count" : 3,
"today" : {
"doc_count" : 1,
"login_count" : {
"value" : 1
}
},
"register_count" : {
"value" : 2
},
"retention" : {
"value" : 0.5
}
} ]
}
}
}
这个 retention 数据,就是我们要求解的 0.5 了。
 
继续阅读 »
网友们多次讨论如何利用 ES 计算用户留存率的问题。这是个比较尴尬的情况,如果多次请求再自己做一下运算,问题很简单。但如果想要一次请求得到最终结果,在没有完整 JOIN 支持的 ES 里又显得比较难以完成。

目前我想到的比较容易达成的做法,是我们在记录用户登录操作日志的时候,把该用户的注册时间也同期输出。也就是说,这个索引的 mapping 是下面这样:
curl -XPUT 'http://127.0.0.1:9200/login-2015.12.23/' -d '{
"settings" : {
"number_of_shards" : 1
},
"mappings" : {
"logs" : {
"properties" : {
"uid" : { "type" : "string", "index" : "not_analyzed" },
"register_time" : { "type" : "date", "index" : "not_analyzed" },
"login_time" : { "type" : "date", "index" : "not_analyzed" }
}
}
}
}'
那么实际记录的日志会类似这样:
{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"2","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.24","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-24T12:00:00Z"}
这段我虚拟的数据,表示 uid 为 1 的用户,23 号注册并登录,24 号再次登录;uid 为 2 的用户,23 号注册并登录,24 号无登录。

显然以这短短 3 行示例数据,我们口算都知道单日留存率是 50% 了。那么怎么通过一次 ES 请求也算出来呢?下面就要用到 ES 2.0 新增加的 pipeline aggregation 了。
curl -XPOST 'http://127.0.0.1:9200/login-2015.12.23,login-2015.12.24/_search' -d'
{
"size" : 0,
"aggs" : {
    "new_users" : {

      "filters" : {
        "filters" : [
          {
            "range" : {
              "register_time" : {
                "gte" : "2015-12-23",
                "lt" : "2015-12-24"
              }
            }
          }
        ]
      },
"aggs" : {
"register_count" : {
"cardinality" : {
"field" : "uid"
}
},
"today" : {
"filter" : {
"range" : {
"login_time" : {
"gte" : "2015-12-24",
"lt" : "2015-12-25"
}
}
},
"aggs" : {
"login_count" : {
"cardinality" : {
"field" : "uid"
}
}
}
},
"retention" : {
"bucket_script" : {
"buckets_path" : {
"today_count" : "today>login_count",
"yesterday_count" : "register_count"
},
"script" : {
"lang" : "expression",
"inline" : "today_count / yesterday_count"
}
}
}
}
}
}
}'
这个 pipeline aggregation 在使用上有几个要点:
  1. pipeline agg 的 parent agg 必须是返回数组的 buckets agg 类型。我这里曾经打算使用 filter agg 直接请求register_time:["now-2d" TO "now-1d"],结果报错说找不到 buckets_path 的 START_OBJECT。所以改用了 filters agg 的数组格式。
  2. bucket_script agg 同样受 scripting module 的影响。也就是说,官网示例里的"script":"today_count / yesterday_count" 这种写法,是采用了 groovy 引擎的 inline 模式。在 ES 2.0 的默认设置下,是被禁止运行的!所以,应该按照 scripting module 的统一要求,改写成 file 形式存放到 config/scripts下;或者改用 Lucene Expression 运行。考虑到 pipeline aggregation 只支持数值运算,这里使用 groovy 价值不大,所以直接指明 lang 参数即可。

最终这次请求的响应如下:
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"hits" : {
"total" : 3,
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"new_users" : {
"buckets" : [ {
"doc_count" : 3,
"today" : {
"doc_count" : 1,
"login_count" : {
"value" : 1
}
},
"register_count" : {
"value" : 2
},
"retention" : {
"value" : 0.5
}
} ]
}
}
}
这个 retention 数据,就是我们要求解的 0.5 了。
  收起阅读 »

Day21: 如何快速把Kibana4 Discover页的Document Table导出成CSV

我们都知道Kibana4里,所有的aggregation生成的visualize都可以在请求细节查看里选择`Export`成raw或者formatted。其中formatted就是CSV文件。
但是Discover页上,除了顶部的date_histogram这个visualize,更重要的是下边的search document table的内容。当我们通过搜索发现异常信息,想要长期保存证据,或者分享给其他没有权限的外部人员的时候,单纯保存search到es,或者分享单条日志的link都不顶用,还是需要能导出成一个文件。
可惜Kibana4没有针对search document table的导出!
国外一家叫MineWhat的公司,最近公开了一个非常细小的创新方案,意图解决这个问题。他们的方式是:避免修改Kibana源码,而通过chrome浏览器插件完成……
点击这个地址安装chrome插件:https://chrome.google.com/webs ... lated
 
然后再访问Kibana的时候,你会发现自己的搜索框最右侧多了一个CSV按钮:
 

 
然后点击这个『CSV』按钮,会弹出一片提示:

可以点击选择,把search document table内容保存到本机的复制粘贴板,还是Google Drive网盘。
我们当然选择本机……
然后打开本地的文本文件,Ctrl+V,就看到编辑器里出现了整个CSV内容。
实测下来,发现有个小问题,粘贴出来的数据里丢掉了空格~不过聊胜于无吧,还是介绍给大家一试。
 
注意:这个功能只会导出目前页面上已经展示出来的table内容。并不代表其使用了scroll API去ES拉取全部结果集!
继续阅读 »
我们都知道Kibana4里,所有的aggregation生成的visualize都可以在请求细节查看里选择`Export`成raw或者formatted。其中formatted就是CSV文件。
但是Discover页上,除了顶部的date_histogram这个visualize,更重要的是下边的search document table的内容。当我们通过搜索发现异常信息,想要长期保存证据,或者分享给其他没有权限的外部人员的时候,单纯保存search到es,或者分享单条日志的link都不顶用,还是需要能导出成一个文件。
可惜Kibana4没有针对search document table的导出!
国外一家叫MineWhat的公司,最近公开了一个非常细小的创新方案,意图解决这个问题。他们的方式是:避免修改Kibana源码,而通过chrome浏览器插件完成……
点击这个地址安装chrome插件:https://chrome.google.com/webs ... lated
 
然后再访问Kibana的时候,你会发现自己的搜索框最右侧多了一个CSV按钮:
 

 
然后点击这个『CSV』按钮,会弹出一片提示:

可以点击选择,把search document table内容保存到本机的复制粘贴板,还是Google Drive网盘。
我们当然选择本机……
然后打开本地的文本文件,Ctrl+V,就看到编辑器里出现了整个CSV内容。
实测下来,发现有个小问题,粘贴出来的数据里丢掉了空格~不过聊胜于无吧,还是介绍给大家一试。
 
注意:这个功能只会导出目前页面上已经展示出来的table内容。并不代表其使用了scroll API去ES拉取全部结果集! 收起阅读 »

简繁体转换插件更新:elasticsearch-analysis-stconvert 升级支持2.0

版本1.5.0 支持es2.0.0
 
项目地址:https://github.com/medcl/elast ... nvert 
 
mvn 编译打包,拷贝release下面的zip并解压到你的es plugins目录即可,需要重启es

这个插件帮你处理简繁体,简繁体全部统一成简体或繁体,不管输入的简体还是繁体,都能得到搜索结果
 
比如:
不管输入的是『北京国际电视台』的还是『北京國際電視臺』都能命中。
 
详细配置和使用请参照上面的地址。
 
继续阅读 »
版本1.5.0 支持es2.0.0
 
项目地址:https://github.com/medcl/elast ... nvert 
 
mvn 编译打包,拷贝release下面的zip并解压到你的es plugins目录即可,需要重启es

这个插件帮你处理简繁体,简繁体全部统一成简体或繁体,不管输入的简体还是繁体,都能得到搜索结果
 
比如:
不管输入的是『北京国际电视台』的还是『北京國際電視臺』都能命中。
 
详细配置和使用请参照上面的地址。
  收起阅读 »

Day20 利用tcpdump和kafka协议定位不合法topic的来源

事情是这样滴,  我们在很多linux机器上部署了logstash采集日志, topic_id用的是 test-%{type}, 但非常不幸的是,  有些机器的某些日志, 没有带上type字段. 
 
因为在topic名字里面不能含有%字符, 所以kafka server的日志里面大量报错. Logstash每发一次数据, kafka就会生成下面一大段错误
 
[2015-12-23 23:20:47,749] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 48; ClientId: ; Topics: test-%{type} (kafka.server.KafkaApis)
kafka.common.InvalidTopicException: topic name test-%{type} is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'
at kafka.common.Topic$.validate(Topic.scala:42)
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
at scala.collection.SetLike$class.map(SetLike.scala:93)
at scala.collection.AbstractSet.map(Set.scala:47)
at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:744)

把可用的信息瞬间淹没.  
 
更不幸的是, 错误日志里面并没有客户来源的信息, 根本不知道是哪些机器还有问题.
 
我想做的, 就是把有问题的logstash机器找出来.
 
我就先事后诸葛亮一把, 用下面这个命令就可以把配置错误的机器找出来(也可以没有任何结果, 原因后面说)
tcpdump -nn 'dst port 9092 and tcp[37]==3 and tcp[57]==37'

dst port 9092就不说了, 这是kafka的默认端口, 后面的tcp[37]==3 and tcp[57]==37是啥意思呢, 我们慢慢说.
 
先要说一下: client要生产数据到kafka, 在发送消息之前, 首先得向kafka"询问"这个topic的metadata信息, 包括有几个partiton, 每个parttion在哪个服务器上面等信息, 拿到这些信息之后, 才能把消息发到正确的kafka服务器上.
 
重点来了!  向kafka"询问"topic的metadata, 其实就是发送一个tcp包过去, 我们需要知道的是这个tcp包的格式. 我已经帮你找到了, 就在这里 https://cwiki.apache.org/confl ... quest
 
看完文档之后(半小时或者更长时间过去了), 你就会知道, tcp body(除去tcp head)里面的第6个字节是03, 代表这是一个TopicMetadataRequest请求.  topicname里面的%字符出现在tcp body的第26个字节, %的ascii码是37
 
tcp头一般是20个字符, 所以加上这20个字节, 然后下标从0算起, 就是tcp[20+5]==3 and tcp[20+25]==37, 也就是tcp[25]==3 and tcp[45]==37.
 
咦, 为啥和开始写的那个过滤条件不一样呢, 因为tcp头"一般"是20字节, 但是如果其中还包含了tcp选项的话, 就可能比20多了. 反正我这里看到的的tcp头都是32个字节, 所以不能加20, 要加32, 也就是最开始写的 tcp[37]==3 and tcp[57]==37 
最后呢, 再提2点结束.
 
1. 终极大杀器, 不过tcp头的长度是多少, 20也好, 32也好, 或者其他也好, 下面这样都能搞定
tcpdump -nn 'dst port 9092 and tcp[(tcp[12]>>2)+5]==3 and tcp[(tcp[12]>>2)+25]==37'

2.  不要一上来就这么高端, 其实我最开始是这样先确定问题的
tcpdump -vv -nn -X -s 0 dst port 9092 | grep -C 5 "test-"
你问我为啥不把test-t{type}写完整? 不是为了省事, 其实是因为很不幸, test-%{t 到这里的时候, 正好换行了.
继续阅读 »
事情是这样滴,  我们在很多linux机器上部署了logstash采集日志, topic_id用的是 test-%{type}, 但非常不幸的是,  有些机器的某些日志, 没有带上type字段. 
 
因为在topic名字里面不能含有%字符, 所以kafka server的日志里面大量报错. Logstash每发一次数据, kafka就会生成下面一大段错误
 
[2015-12-23 23:20:47,749] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 48; ClientId: ; Topics: test-%{type} (kafka.server.KafkaApis)
kafka.common.InvalidTopicException: topic name test-%{type} is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'
at kafka.common.Topic$.validate(Topic.scala:42)
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
at scala.collection.SetLike$class.map(SetLike.scala:93)
at scala.collection.AbstractSet.map(Set.scala:47)
at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:744)

把可用的信息瞬间淹没.  
 
更不幸的是, 错误日志里面并没有客户来源的信息, 根本不知道是哪些机器还有问题.
 
我想做的, 就是把有问题的logstash机器找出来.
 
我就先事后诸葛亮一把, 用下面这个命令就可以把配置错误的机器找出来(也可以没有任何结果, 原因后面说)
tcpdump -nn 'dst port 9092 and tcp[37]==3 and tcp[57]==37'

dst port 9092就不说了, 这是kafka的默认端口, 后面的tcp[37]==3 and tcp[57]==37是啥意思呢, 我们慢慢说.
 
先要说一下: client要生产数据到kafka, 在发送消息之前, 首先得向kafka"询问"这个topic的metadata信息, 包括有几个partiton, 每个parttion在哪个服务器上面等信息, 拿到这些信息之后, 才能把消息发到正确的kafka服务器上.
 
重点来了!  向kafka"询问"topic的metadata, 其实就是发送一个tcp包过去, 我们需要知道的是这个tcp包的格式. 我已经帮你找到了, 就在这里 https://cwiki.apache.org/confl ... quest
 
看完文档之后(半小时或者更长时间过去了), 你就会知道, tcp body(除去tcp head)里面的第6个字节是03, 代表这是一个TopicMetadataRequest请求.  topicname里面的%字符出现在tcp body的第26个字节, %的ascii码是37
 
tcp头一般是20个字符, 所以加上这20个字节, 然后下标从0算起, 就是tcp[20+5]==3 and tcp[20+25]==37, 也就是tcp[25]==3 and tcp[45]==37.
 
咦, 为啥和开始写的那个过滤条件不一样呢, 因为tcp头"一般"是20字节, 但是如果其中还包含了tcp选项的话, 就可能比20多了. 反正我这里看到的的tcp头都是32个字节, 所以不能加20, 要加32, 也就是最开始写的 tcp[37]==3 and tcp[57]==37 
最后呢, 再提2点结束.
 
1. 终极大杀器, 不过tcp头的长度是多少, 20也好, 32也好, 或者其他也好, 下面这样都能搞定
tcpdump -nn 'dst port 9092 and tcp[(tcp[12]>>2)+5]==3 and tcp[(tcp[12]>>2)+25]==37'

2.  不要一上来就这么高端, 其实我最开始是这样先确定问题的
tcpdump -vv -nn -X -s 0 dst port 9092 | grep -C 5 "test-"
你问我为啥不把test-t{type}写完整? 不是为了省事, 其实是因为很不幸, test-%{t 到这里的时候, 正好换行了. 收起阅读 »

Day19 ES内存那点事

【携程旅行网  吴晓刚】
 
注: 本文主要针对ES 2.x。

 “该给ES分配多少内存?” 
“JVM参数如何优化?“
“为何我的Heap占用这么高?”
“为何经常有某个field的数据量超出内存限制的异常?“
“为何感觉上没多少数据,也会经常Out Of Memory?”

以上问题,显然没有一个统一的数学公式能够给出答案。 和数据库类似,ES对于内存的消耗,和很多因素相关,诸如数据总量、mapping设置、查询方式、查询频度等等。默认的设置虽开箱即用,但不能适用每一种使用场景。作为ES的开发、运维人员,如果不了解ES对内存使用的一些基本原理,就很难针对特有的应用场景,有效的测试、规划和管理集群,从而踩到各种坑,被各种问题挫败。

要理解ES如何使用内存,先要理解下面两个基本事实:
1.  ES是JAVA应用
2.  底层存储引擎是基于Lucene的

看似很普通是吗?但其实没多少人真正理解这意味着什么。 

首先,作为一个JAVA应用,就脱离不开JVM和GC。很多人上手ES的时候,对GC一点概念都没有就去网上抄各种JVM“优化”参数,却仍然被heap不够用,内存溢出这样的问题搞得焦头烂额。了解JVM GC的概念和基本工作机制是很有必要的,本文不在此做过多探讨,读者可以自行Google相关资料进行学习。如何知道ES heap是否真的有压力了? 推荐阅读这篇博客:Understanding Memory Pressure Indicator。 即使对于JVM GC机制不够熟悉,头脑里还是需要有这么一个基本概念: 应用层面生成大量长生命周期的对象,是给heap造成压力的主要原因,例如读取一大片数据在内存中进行排序,或者在heap内部建cache缓存大量数据。如果GC释放的空间有限,而应用层面持续大量申请新对象,GC频度就开始上升,同时会消耗掉很多CPU时间。严重时可能恶性循环,导致整个集群停工。因此在使用ES的过程中,要知道哪些设置和操作容易造成以上问题,有针对性的予以规避。

其次,Lucene的倒排索引(Inverted Index)是先在内存里生成,然后定期以段文件(segment file)的形式刷到磁盘的。每个段实际就是一个完整的倒排索引,并且一旦写到磁盘上就不会做修改。 API层面的文档更新和删除实际上是增量写入的一种特殊文档,会保存在新的段里。不变的段文件易于被操作系统cache,热数据几乎等效于内存访问。 

基于以上2个基本事实,我们不难理解,为何官方建议的heap size不要超过系统可用内存的一半。heap以外的内存并不会被浪费,操作系统会很开心的利用他们来cache被用读取过的段文件。

Heap分配多少合适?遵从官方建议就没错。 不要超过系统可用内存的一半,并且不要超过32GB。JVM参数呢?对于初级用户来说,并不需要做特别调整,仍然遵从官方的建议,将xms和xmx设置成和heap一样大小,避免动态分配heap size就好了。虽然有针对性的调整JVM参数可以带来些许GC效率的提升,当有一些“坏”用例的时候,这些调整并不会有什么魔法效果帮你减轻heap压力,甚至可能让问题更糟糕。

那么,ES的heap是如何被瓜分掉的? 说几个我知道的内存消耗大户并分别做解读:
1.  segment memory
2.  filter cache
3.  field data cache
4.  bulk queue
5.  indexing buffer
6.  state buffer
7.  超大搜索聚合结果集的fetch
8. 对高cardinality字段做terms aggregation


Segment Memory
Segment不是file吗?segment memory又是什么?前面提到过,一个segment是一个完备的lucene倒排索引,而倒排索引是通过词典 (Term Dictionary)到文档列表(Postings List)的映射关系,快速做查询的。 由于词典的size会很大,全部装载到heap里不现实,因此Lucene为词典做了一层前缀索引(Term Index),这个索引在Lucene4.0以后采用的数据结构是FST (Finite State Transducer)。 这种数据结构占用空间很小,Lucene打开索引的时候将其全量装载到内存中,加快磁盘上词典查询速度的同时减少随机磁盘访问次数。

下面是词典索引和词典主存储之间的一个对应关系图:

lucene_index.png


Lucene  file的完整数据结构参见Apache Lucene - Index File Formats

说了这么多,要传达的一个意思就是,ES的data node存储数据并非只是耗费磁盘空间的,为了加速数据的访问,每个segment都有会一些索引数据驻留在heap里。因此segment越多,瓜分掉的heap也越多,并且这部分heap是无法被GC掉的! 理解这点对于监控和管理集群容量很重要,当一个node的segment memory占用过多的时候,就需要考虑删除、归档数据,或者扩容了。

怎么知道segment memory占用情况呢?  CAT API可以给出答案。
1.  查看一个索引所有segment的memory占用情况:

seg_mem.png


2.  查看一个node上所有segment占用的memory总和:

seg_mem_node.png



那么有哪些途径减少data node上的segment memory占用呢? 总结起来有三种方法:
1.  删除不用的索引
2.  关闭索引 (文件仍然存在于磁盘,只是释放掉内存)。需要的时候可以重新打开。
3.  定期对不再更新的索引做optimize (ES2.0以后更改为force merge api)。这Optimze的实质是对segment file强制做合并,可以节省大量的segment memory。

Filter Cache (5.x里叫做Request cache)
Filter cache是用来缓存使用过的filter的结果集的,需要注意的是这个缓存也是常驻heap,在被evict掉之前,是无法被GC的。我的经验是默认的10% heap设置工作得够好了,如果实际使用中heap没什么压力的情况下,才考虑加大这个设置。


Field Data cache
在有大量排序、数据聚合的应用场景,可以说field data cache是性能和稳定性的杀手。 对搜索结果做排序或者聚合操作,需要将倒排索引里的数据进行解析,按列构造成docid->value的形式才能够做后续快速计算。 对于数据量很大的索引,这个构造过程会非常耗费时间,因此ES 2.0以前的版本会将构造好的数据缓存起来,提升性能。但是由于heap空间有限,当遇到用户对海量数据做计算的时候,就很容易导致heap吃紧,集群频繁GC,根本无法完成计算过程。 ES2.0以后,正式默认启用Doc Values特性(1.x需要手动更改mapping开启),将field data在indexing time构建在磁盘上,经过一系列优化,可以达到比之前采用field data cache机制更好的性能。因此需要限制对field data cache的使用,最好是完全不用,可以极大释放heap压力。 需要注意的是,很多同学已经升级到ES2.0,或者1.0里已经设置mapping启用了doc values,在kibana里仍然会遇到问题。 这里一个陷阱就在于kibana的table panel可以对所有字段排序。 设想如果有一个字段是analyzed过的,而用户去点击对应字段的排序表头是什么后果? 一来排序的结果并不是用户想要的,排序的对象实际是词典; 二来analyzed过的字段无法利用doc values,需要装载到field data cache,数据量很大的情况下可能集群就在忙着GC或者根本出不来结果。


Bulk Queue
一般来说,Bulk queue不会消耗很多的heap,但是见过一些用户为了提高bulk的速度,客户端设置了很大的并发量,并且将bulk Queue设置到不可思议的大,比如好几千。 Bulk Queue是做什么用的?当所有的bulk thread都在忙,无法响应新的bulk request的时候,将request在内存里排列起来,然后慢慢清掉。 这在应对短暂的请求爆发的时候有用,但是如果集群本身索引速度一直跟不上,设置的好几千的queue都满了会是什么状况呢? 取决于一个bulk的数据量大小,乘上queue的大小,heap很有可能就不够用,内存溢出了。一般来说官方默认的thread pool设置已经能很好的工作了,建议不要随意去“调优”相关的设置,很多时候都是适得其反的效果。


Indexing Buffer
Indexing Buffer是用来缓存新数据,当其满了或者refresh/flush interval到了,就会以segment file的形式写入到磁盘。 这个参数的默认值是10% heap size。根据经验,这个默认值也能够很好的工作,应对很大的索引吞吐量。 但有些用户认为这个buffer越大吞吐量越高,因此见过有用户将其设置为40%的。到了极端的情况,写入速度很高的时候,40%都被占用,导致OOM。


Cluster State Buffer
ES被设计成每个node都可以响应用户的api请求,因此每个node的内存里都包含有一份集群状态的拷贝。这个cluster state包含诸如集群有多少个node,多少个index,每个index的mapping是什么?有少shard,每个shard的分配情况等等 (ES有各类stats api获取这类数据)。 在一个规模很大的集群,这个状态信息可能会非常大的,耗用的内存空间就不可忽视了。并且在ES2.0之前的版本,state的更新是由master node做完以后全量散播到其他结点的。 频繁的状态更新就可以给heap带来很大的压力。 在超大规模集群的情况下,可以考虑分集群并通过tribe node连接做到对用户api的透明,这样可以保证每个集群里的state信息不会膨胀得过大。


超大搜索聚合结果集的fetch
ES是分布式搜索引擎,搜索和聚合计算除了在各个data node并行计算以外,还需要将结果返回给汇总节点进行汇总和排序后再返回。无论是搜索,还是聚合,如果返回结果的size设置过大,都会给heap造成很大的压力,特别是数据汇聚节点。超大的size多数情况下都是用户用例不对,比如本来是想计算cardinality,却用了terms aggregation + size:0这样的方式; 对大结果集做深度分页;一次性拉取全量数据等等。
 
对高cardinality字段做terms aggregation
所谓高cardinality,就是该字段的唯一值比较多。 比如client ip,可能存在上千万甚至上亿的不同值。 对这种类型的字段做terms aggregation时,需要在内存里生成海量的分桶,内存需求会非常高。如果内部再嵌套有其他聚合,情况会更糟糕。  在做日志聚合分析时,一个典型的可以引起性能问题的场景,就是对带有参数的url字段做terms aggregation。 对于访问量大的网站,带有参数的url字段cardinality可能会到数亿,做一次terms aggregation内存开销巨大,然而对带有参数的url字段做聚合通常没有什么意义。 对于这类问题,可以额外索引一个url_stem字段,这个字段索引剥离掉参数部分的url。可以极大降低内存消耗,提高聚合速度。


小结:
  1. 倒排词典的索引需要常驻内存,无法GC,需要监控data node上segment memory增长趋势。
  2. 各类缓存,field cache, filter cache, indexing cache, bulk queue等等,要设置合理的大小,并且要应该根据最坏的情况来看heap是否够用,也就是各类缓存全部占满的时候,还有heap空间可以分配给其他任务吗?避免采用clear cache等“自欺欺人”的方式来释放内存。
  3. 避免返回大量结果集的搜索与聚合。确实需要大量拉取数据的场景,可以采用scan & scroll api来实现。
  4. cluster stats驻留内存并无法水平扩展,超大规模集群可以考虑分拆成多个集群通过tribe node连接。
  5. 想知道heap够不够,必须结合实际应用场景,并对集群的heap使用情况做持续的监控。
  6. 根据监控数据理解内存需求,合理配置各类circuit breaker,将内存溢出风险降低到最低。

继续阅读 »
【携程旅行网  吴晓刚】
 
注: 本文主要针对ES 2.x。

 “该给ES分配多少内存?” 
“JVM参数如何优化?“
“为何我的Heap占用这么高?”
“为何经常有某个field的数据量超出内存限制的异常?“
“为何感觉上没多少数据,也会经常Out Of Memory?”

以上问题,显然没有一个统一的数学公式能够给出答案。 和数据库类似,ES对于内存的消耗,和很多因素相关,诸如数据总量、mapping设置、查询方式、查询频度等等。默认的设置虽开箱即用,但不能适用每一种使用场景。作为ES的开发、运维人员,如果不了解ES对内存使用的一些基本原理,就很难针对特有的应用场景,有效的测试、规划和管理集群,从而踩到各种坑,被各种问题挫败。

要理解ES如何使用内存,先要理解下面两个基本事实:
1.  ES是JAVA应用
2.  底层存储引擎是基于Lucene的

看似很普通是吗?但其实没多少人真正理解这意味着什么。 

首先,作为一个JAVA应用,就脱离不开JVM和GC。很多人上手ES的时候,对GC一点概念都没有就去网上抄各种JVM“优化”参数,却仍然被heap不够用,内存溢出这样的问题搞得焦头烂额。了解JVM GC的概念和基本工作机制是很有必要的,本文不在此做过多探讨,读者可以自行Google相关资料进行学习。如何知道ES heap是否真的有压力了? 推荐阅读这篇博客:Understanding Memory Pressure Indicator。 即使对于JVM GC机制不够熟悉,头脑里还是需要有这么一个基本概念: 应用层面生成大量长生命周期的对象,是给heap造成压力的主要原因,例如读取一大片数据在内存中进行排序,或者在heap内部建cache缓存大量数据。如果GC释放的空间有限,而应用层面持续大量申请新对象,GC频度就开始上升,同时会消耗掉很多CPU时间。严重时可能恶性循环,导致整个集群停工。因此在使用ES的过程中,要知道哪些设置和操作容易造成以上问题,有针对性的予以规避。

其次,Lucene的倒排索引(Inverted Index)是先在内存里生成,然后定期以段文件(segment file)的形式刷到磁盘的。每个段实际就是一个完整的倒排索引,并且一旦写到磁盘上就不会做修改。 API层面的文档更新和删除实际上是增量写入的一种特殊文档,会保存在新的段里。不变的段文件易于被操作系统cache,热数据几乎等效于内存访问。 

基于以上2个基本事实,我们不难理解,为何官方建议的heap size不要超过系统可用内存的一半。heap以外的内存并不会被浪费,操作系统会很开心的利用他们来cache被用读取过的段文件。

Heap分配多少合适?遵从官方建议就没错。 不要超过系统可用内存的一半,并且不要超过32GB。JVM参数呢?对于初级用户来说,并不需要做特别调整,仍然遵从官方的建议,将xms和xmx设置成和heap一样大小,避免动态分配heap size就好了。虽然有针对性的调整JVM参数可以带来些许GC效率的提升,当有一些“坏”用例的时候,这些调整并不会有什么魔法效果帮你减轻heap压力,甚至可能让问题更糟糕。

那么,ES的heap是如何被瓜分掉的? 说几个我知道的内存消耗大户并分别做解读:
1.  segment memory
2.  filter cache
3.  field data cache
4.  bulk queue
5.  indexing buffer
6.  state buffer
7.  超大搜索聚合结果集的fetch
8. 对高cardinality字段做terms aggregation


Segment Memory
Segment不是file吗?segment memory又是什么?前面提到过,一个segment是一个完备的lucene倒排索引,而倒排索引是通过词典 (Term Dictionary)到文档列表(Postings List)的映射关系,快速做查询的。 由于词典的size会很大,全部装载到heap里不现实,因此Lucene为词典做了一层前缀索引(Term Index),这个索引在Lucene4.0以后采用的数据结构是FST (Finite State Transducer)。 这种数据结构占用空间很小,Lucene打开索引的时候将其全量装载到内存中,加快磁盘上词典查询速度的同时减少随机磁盘访问次数。

下面是词典索引和词典主存储之间的一个对应关系图:

lucene_index.png


Lucene  file的完整数据结构参见Apache Lucene - Index File Formats

说了这么多,要传达的一个意思就是,ES的data node存储数据并非只是耗费磁盘空间的,为了加速数据的访问,每个segment都有会一些索引数据驻留在heap里。因此segment越多,瓜分掉的heap也越多,并且这部分heap是无法被GC掉的! 理解这点对于监控和管理集群容量很重要,当一个node的segment memory占用过多的时候,就需要考虑删除、归档数据,或者扩容了。

怎么知道segment memory占用情况呢?  CAT API可以给出答案。
1.  查看一个索引所有segment的memory占用情况:

seg_mem.png


2.  查看一个node上所有segment占用的memory总和:

seg_mem_node.png



那么有哪些途径减少data node上的segment memory占用呢? 总结起来有三种方法:
1.  删除不用的索引
2.  关闭索引 (文件仍然存在于磁盘,只是释放掉内存)。需要的时候可以重新打开。
3.  定期对不再更新的索引做optimize (ES2.0以后更改为force merge api)。这Optimze的实质是对segment file强制做合并,可以节省大量的segment memory。

Filter Cache (5.x里叫做Request cache)
Filter cache是用来缓存使用过的filter的结果集的,需要注意的是这个缓存也是常驻heap,在被evict掉之前,是无法被GC的。我的经验是默认的10% heap设置工作得够好了,如果实际使用中heap没什么压力的情况下,才考虑加大这个设置。


Field Data cache
在有大量排序、数据聚合的应用场景,可以说field data cache是性能和稳定性的杀手。 对搜索结果做排序或者聚合操作,需要将倒排索引里的数据进行解析,按列构造成docid->value的形式才能够做后续快速计算。 对于数据量很大的索引,这个构造过程会非常耗费时间,因此ES 2.0以前的版本会将构造好的数据缓存起来,提升性能。但是由于heap空间有限,当遇到用户对海量数据做计算的时候,就很容易导致heap吃紧,集群频繁GC,根本无法完成计算过程。 ES2.0以后,正式默认启用Doc Values特性(1.x需要手动更改mapping开启),将field data在indexing time构建在磁盘上,经过一系列优化,可以达到比之前采用field data cache机制更好的性能。因此需要限制对field data cache的使用,最好是完全不用,可以极大释放heap压力。 需要注意的是,很多同学已经升级到ES2.0,或者1.0里已经设置mapping启用了doc values,在kibana里仍然会遇到问题。 这里一个陷阱就在于kibana的table panel可以对所有字段排序。 设想如果有一个字段是analyzed过的,而用户去点击对应字段的排序表头是什么后果? 一来排序的结果并不是用户想要的,排序的对象实际是词典; 二来analyzed过的字段无法利用doc values,需要装载到field data cache,数据量很大的情况下可能集群就在忙着GC或者根本出不来结果。


Bulk Queue
一般来说,Bulk queue不会消耗很多的heap,但是见过一些用户为了提高bulk的速度,客户端设置了很大的并发量,并且将bulk Queue设置到不可思议的大,比如好几千。 Bulk Queue是做什么用的?当所有的bulk thread都在忙,无法响应新的bulk request的时候,将request在内存里排列起来,然后慢慢清掉。 这在应对短暂的请求爆发的时候有用,但是如果集群本身索引速度一直跟不上,设置的好几千的queue都满了会是什么状况呢? 取决于一个bulk的数据量大小,乘上queue的大小,heap很有可能就不够用,内存溢出了。一般来说官方默认的thread pool设置已经能很好的工作了,建议不要随意去“调优”相关的设置,很多时候都是适得其反的效果。


Indexing Buffer
Indexing Buffer是用来缓存新数据,当其满了或者refresh/flush interval到了,就会以segment file的形式写入到磁盘。 这个参数的默认值是10% heap size。根据经验,这个默认值也能够很好的工作,应对很大的索引吞吐量。 但有些用户认为这个buffer越大吞吐量越高,因此见过有用户将其设置为40%的。到了极端的情况,写入速度很高的时候,40%都被占用,导致OOM。


Cluster State Buffer
ES被设计成每个node都可以响应用户的api请求,因此每个node的内存里都包含有一份集群状态的拷贝。这个cluster state包含诸如集群有多少个node,多少个index,每个index的mapping是什么?有少shard,每个shard的分配情况等等 (ES有各类stats api获取这类数据)。 在一个规模很大的集群,这个状态信息可能会非常大的,耗用的内存空间就不可忽视了。并且在ES2.0之前的版本,state的更新是由master node做完以后全量散播到其他结点的。 频繁的状态更新就可以给heap带来很大的压力。 在超大规模集群的情况下,可以考虑分集群并通过tribe node连接做到对用户api的透明,这样可以保证每个集群里的state信息不会膨胀得过大。


超大搜索聚合结果集的fetch
ES是分布式搜索引擎,搜索和聚合计算除了在各个data node并行计算以外,还需要将结果返回给汇总节点进行汇总和排序后再返回。无论是搜索,还是聚合,如果返回结果的size设置过大,都会给heap造成很大的压力,特别是数据汇聚节点。超大的size多数情况下都是用户用例不对,比如本来是想计算cardinality,却用了terms aggregation + size:0这样的方式; 对大结果集做深度分页;一次性拉取全量数据等等。
 
对高cardinality字段做terms aggregation
所谓高cardinality,就是该字段的唯一值比较多。 比如client ip,可能存在上千万甚至上亿的不同值。 对这种类型的字段做terms aggregation时,需要在内存里生成海量的分桶,内存需求会非常高。如果内部再嵌套有其他聚合,情况会更糟糕。  在做日志聚合分析时,一个典型的可以引起性能问题的场景,就是对带有参数的url字段做terms aggregation。 对于访问量大的网站,带有参数的url字段cardinality可能会到数亿,做一次terms aggregation内存开销巨大,然而对带有参数的url字段做聚合通常没有什么意义。 对于这类问题,可以额外索引一个url_stem字段,这个字段索引剥离掉参数部分的url。可以极大降低内存消耗,提高聚合速度。


小结:
  1. 倒排词典的索引需要常驻内存,无法GC,需要监控data node上segment memory增长趋势。
  2. 各类缓存,field cache, filter cache, indexing cache, bulk queue等等,要设置合理的大小,并且要应该根据最坏的情况来看heap是否够用,也就是各类缓存全部占满的时候,还有heap空间可以分配给其他任务吗?避免采用clear cache等“自欺欺人”的方式来释放内存。
  3. 避免返回大量结果集的搜索与聚合。确实需要大量拉取数据的场景,可以采用scan & scroll api来实现。
  4. cluster stats驻留内存并无法水平扩展,超大规模集群可以考虑分拆成多个集群通过tribe node连接。
  5. 想知道heap够不够,必须结合实际应用场景,并对集群的heap使用情况做持续的监控。
  6. 根据监控数据理解内存需求,合理配置各类circuit breaker,将内存溢出风险降低到最低。

收起阅读 »

Day18: 程序内的消息流:ArrayBlockingQueue和zeromq对比

在logstash内部, input到filter, 以及filter到output, 消息都是通过一个队列来中转.

在我写hangout的第一个版本,也是这么做的,用ArrayBlockingQueue来中转消息, 上游几个线程把消息放在queue中, 下游再几个线程把queue中的消息消费走.

但是, 用下来之后, 发现在queue上面消耗的资源是相当的大,strace查看,非常大量的lock相关的系统调用, 现在的版本已经把queue去掉了. 想必Logstash也会有大量资源用在这一块.

zeromq中的Parallel Pipeline正好适合这个场景,而且文档中说是lock free的, 拿来和queue对比一下看.

在我自己的电脑上测试,2.6 GHz Intel Core i5.  一个主线程生成10,000,000个随机数, 分发给四个线程消费.

用Queue来实现, 需要约37秒, CPU使用率在150%. 用zeromq的ipc来传递消息, 只需要22秒, 期间CPU使用率在250%. 总的CPU使用时间都60秒左右.

不知道java中还有没有更合适的Queue可以用在这个场景中.至少zeromq和ArrayBlockingQueue相比, zeromq可以更快的处理消息, 但代价就是更高的CPU使用率.
继续阅读 »
在logstash内部, input到filter, 以及filter到output, 消息都是通过一个队列来中转.

在我写hangout的第一个版本,也是这么做的,用ArrayBlockingQueue来中转消息, 上游几个线程把消息放在queue中, 下游再几个线程把queue中的消息消费走.

但是, 用下来之后, 发现在queue上面消耗的资源是相当的大,strace查看,非常大量的lock相关的系统调用, 现在的版本已经把queue去掉了. 想必Logstash也会有大量资源用在这一块.

zeromq中的Parallel Pipeline正好适合这个场景,而且文档中说是lock free的, 拿来和queue对比一下看.

在我自己的电脑上测试,2.6 GHz Intel Core i5.  一个主线程生成10,000,000个随机数, 分发给四个线程消费.

用Queue来实现, 需要约37秒, CPU使用率在150%. 用zeromq的ipc来传递消息, 只需要22秒, 期间CPU使用率在250%. 总的CPU使用时间都60秒左右.

不知道java中还有没有更合适的Queue可以用在这个场景中.至少zeromq和ArrayBlockingQueue相比, zeromq可以更快的处理消息, 但代价就是更高的CPU使用率. 收起阅读 »

Day17: "奇怪"的搜索

代@childe 发文。

除了应用在日志系统外, 越来越多的业务数据也接入ES, 利用它天生强大的搜索性能和分布式可扩展, 可以为业务的精确快速灵活的搜索提供极大便利, 我觉得这是未来一个很好的方向.
但是, 对它ES各种各样的搜索方式, 你了解了吗?
我们来看几个”奇怪”的搜索.
## 奇怪的打分
### 奇怪的打分1
我们有个数据结构是
{
“first_name”:”string”,
“last_name”:”string”
}

插入了几条数据, 有诸葛亮 诸葛明 诸葛暗 诸葛黑, 还有个人名字很奇怪, 叫司马诸葛.
然后我们要搜索诸葛瑾, 虽然索引里面没有一个人叫这个名字, 但搜索出来诸葛亮也不错, 他们名字这么像, 说不定是亲兄弟, 可以顺藤摸瓜, 找到我们需要的信息呢.
{
"query": {
"multi_match": {
"query": "诸葛瑜",
"type": "most_fields",
"fields": [ “*_name” ]
}
}
}

但实际上呢, 司马诸葛这个人居然稳居搜索榜首位, 他是搞竞价排名了吧? 你知道其中的打分原理吗?
### 奇怪的打分2
我们有两条数据:
PUT /my_index/my_type/1
{
"title": "Quick brown rabbits",
"body": "Brown rabbits are commonly seen."
}
PUT /my_index/my_type/2
{
"title": "Keeping pets healthy",
"body": "My quick brown fox eats rabbits on a regular basis."
}
要搜索
{
"query": {
"bool": {
"should": [
{ "match": { "title": "Brown fox" }},
{ "match": { "body": "Brown fox" }}
]
}
}
}
第二条文档里面明确含有”brown fox”这个词组, 但是它的搜索得分比较低, 你知道为啥吗?
## and用在哪
{
"query": {
"multi_match": {
"query": "peter smith",
"type": "most_fields",
"operator": "and",
"fields": [ "first_name", "last_name" ]
}
}
}
你知道这个and代表什么吗?
是说
A: 姓和名里面都要含有"peter smith”,
还是说
B: 姓或者名里面要包含peter以及smith ?
还有, 怎么才能获得另外一个效果呢?
# 列表中的元素
我们有一条数据如下(按汉语分词)
{
“时代”:”三国”,
“姓名”: [“大司马”,“诸葛亮”]
}
我以词组的方式搜索:
{
"query": {
"match_phrase": {
"姓名": "司马诸葛"
}
}
}
能搜索到吗?
上面这些其实都是[elasticsearch Definitive Guide](https://www.elastic.co/guide)里面的几个小例子, 欢迎大家继续去那里寻找答案和其他各种小技巧.
 
继续阅读 »
代@childe 发文。

除了应用在日志系统外, 越来越多的业务数据也接入ES, 利用它天生强大的搜索性能和分布式可扩展, 可以为业务的精确快速灵活的搜索提供极大便利, 我觉得这是未来一个很好的方向.
但是, 对它ES各种各样的搜索方式, 你了解了吗?
我们来看几个”奇怪”的搜索.
## 奇怪的打分
### 奇怪的打分1
我们有个数据结构是
{
“first_name”:”string”,
“last_name”:”string”
}

插入了几条数据, 有诸葛亮 诸葛明 诸葛暗 诸葛黑, 还有个人名字很奇怪, 叫司马诸葛.
然后我们要搜索诸葛瑾, 虽然索引里面没有一个人叫这个名字, 但搜索出来诸葛亮也不错, 他们名字这么像, 说不定是亲兄弟, 可以顺藤摸瓜, 找到我们需要的信息呢.
{
"query": {
"multi_match": {
"query": "诸葛瑜",
"type": "most_fields",
"fields": [ “*_name” ]
}
}
}

但实际上呢, 司马诸葛这个人居然稳居搜索榜首位, 他是搞竞价排名了吧? 你知道其中的打分原理吗?
### 奇怪的打分2
我们有两条数据:
PUT /my_index/my_type/1
{
"title": "Quick brown rabbits",
"body": "Brown rabbits are commonly seen."
}
PUT /my_index/my_type/2
{
"title": "Keeping pets healthy",
"body": "My quick brown fox eats rabbits on a regular basis."
}
要搜索
{
"query": {
"bool": {
"should": [
{ "match": { "title": "Brown fox" }},
{ "match": { "body": "Brown fox" }}
]
}
}
}
第二条文档里面明确含有”brown fox”这个词组, 但是它的搜索得分比较低, 你知道为啥吗?
## and用在哪
{
"query": {
"multi_match": {
"query": "peter smith",
"type": "most_fields",
"operator": "and",
"fields": [ "first_name", "last_name" ]
}
}
}
你知道这个and代表什么吗?
是说
A: 姓和名里面都要含有"peter smith”,
还是说
B: 姓或者名里面要包含peter以及smith ?
还有, 怎么才能获得另外一个效果呢?
# 列表中的元素
我们有一条数据如下(按汉语分词)
{
“时代”:”三国”,
“姓名”: [“大司马”,“诸葛亮”]
}
我以词组的方式搜索:
{
"query": {
"match_phrase": {
"姓名": "司马诸葛"
}
}
}
能搜索到吗?
上面这些其实都是[elasticsearch Definitive Guide](https://www.elastic.co/guide)里面的几个小例子, 欢迎大家继续去那里寻找答案和其他各种小技巧.
  收起阅读 »

day16 logstash-forwader To Kakfa!

看到前一天, Medcl 介绍了Beat, 我想今天我就介绍一下算是同一个领域的, 我们的一个小产品吧, 同样也基于elastic旗下的logstash-forwarder. 我真的不是来打广告的, 就是第一次写, 没经验, 看着前一天的文章, 顺手就想到了.

在日志收集系统中, 从kafkf到ES这条路是没问题了, 但散布在各个服务器上采集日志的agent用logstash实在是太重了, 而且效率也低. 特别是我们有大量的windows服务器, 找一个合适的agent居然不是想象中的容易.

logstash-forwarder对于日志文件的探测和offset记录, deadtime等配置都非常适合我们, 但惟一不支持吐数据到kafak,对我们来说是一个遗憾. 我和oliver(https://github.com/oliveagle)做过一点改造之后, 让她支持了这个功能.

目前我们所有iis服务器已经部署了这个应用, 效率高, 占资源小, 可以数据压缩, 支持简单的格式切割, 实乃windows居家必备(我真不是来打广告的). golang客户端, 还能直接发送到kafka, 想想就很贴心~

贴上一段配置瞅瞅先, 启一个进程采集nginx和tomcat日志, 分别吐到kafka的2个topic中.
{
"files": [
{
"paths": [
"/var/log/nginx/*.log"
],
"Fields":{
"type":"nginx"
},
"DeadTime": "30m"
},
{
"paths": [
"/var/log/tomcat/*.log",
"/var/log/tomcat/*/*.log"
],
"Fields":{
"type":"tomcat"
},
"DeadTime": "30m"
}
],
"kafka": {
"broker_list": ["10.0.0.1:9092","10.0.0.2:9092"],
"topic_id": "topic_name_change_it_{{.type}}",
"compression_codec": "gzip"
}
}

再简单介绍一下参数吧,
  • DeadTime:30m 是说超过30分钟没有更新, 就不会再继续跟踪这个文件了(退出goroutine)
  • “Fields”:{ “type”:”tomcat” } , 会在每条日志中增加配置的字段
  • path目前就是用的golang官方库, 好像是还不支持递归多层目录查找, 反正我翻了一下文档, 没有找到.


grok还不支持, 但简单的分割是可以的
"files": [
{
"paths": [
"d:\\target.txt"
],
"FieldNames": ["datetime", "datetime", "s_ip", "cs_method", "cs_uri_stem", "cs_uri_query", "s_port", "time_taken"],
"Delimiter": "\\s+",
"QuoteChar": "\""
}
]

以上配置就是说按空白符把日志切割来, 塞到对应的字段中去. 第一个第二个合在一起, 放在datetime字段中.

其实还是有不少要完善的地方, 比如说没有带上机器的Hostname, 以及日志的路径. 在很多时候, 这些信息还是很有用的, 我们也会继续完善.

现在放在了https://github.com/childe/logs ... kafka, 有需要的同学,可以去看下.
继续阅读 »
看到前一天, Medcl 介绍了Beat, 我想今天我就介绍一下算是同一个领域的, 我们的一个小产品吧, 同样也基于elastic旗下的logstash-forwarder. 我真的不是来打广告的, 就是第一次写, 没经验, 看着前一天的文章, 顺手就想到了.

在日志收集系统中, 从kafkf到ES这条路是没问题了, 但散布在各个服务器上采集日志的agent用logstash实在是太重了, 而且效率也低. 特别是我们有大量的windows服务器, 找一个合适的agent居然不是想象中的容易.

logstash-forwarder对于日志文件的探测和offset记录, deadtime等配置都非常适合我们, 但惟一不支持吐数据到kafak,对我们来说是一个遗憾. 我和oliver(https://github.com/oliveagle)做过一点改造之后, 让她支持了这个功能.

目前我们所有iis服务器已经部署了这个应用, 效率高, 占资源小, 可以数据压缩, 支持简单的格式切割, 实乃windows居家必备(我真不是来打广告的). golang客户端, 还能直接发送到kafka, 想想就很贴心~

贴上一段配置瞅瞅先, 启一个进程采集nginx和tomcat日志, 分别吐到kafka的2个topic中.
{
"files": [
{
"paths": [
"/var/log/nginx/*.log"
],
"Fields":{
"type":"nginx"
},
"DeadTime": "30m"
},
{
"paths": [
"/var/log/tomcat/*.log",
"/var/log/tomcat/*/*.log"
],
"Fields":{
"type":"tomcat"
},
"DeadTime": "30m"
}
],
"kafka": {
"broker_list": ["10.0.0.1:9092","10.0.0.2:9092"],
"topic_id": "topic_name_change_it_{{.type}}",
"compression_codec": "gzip"
}
}

再简单介绍一下参数吧,
  • DeadTime:30m 是说超过30分钟没有更新, 就不会再继续跟踪这个文件了(退出goroutine)
  • “Fields”:{ “type”:”tomcat” } , 会在每条日志中增加配置的字段
  • path目前就是用的golang官方库, 好像是还不支持递归多层目录查找, 反正我翻了一下文档, 没有找到.


grok还不支持, 但简单的分割是可以的
"files": [
{
"paths": [
"d:\\target.txt"
],
"FieldNames": ["datetime", "datetime", "s_ip", "cs_method", "cs_uri_stem", "cs_uri_query", "s_port", "time_taken"],
"Delimiter": "\\s+",
"QuoteChar": "\""
}
]

以上配置就是说按空白符把日志切割来, 塞到对应的字段中去. 第一个第二个合在一起, 放在datetime字段中.

其实还是有不少要完善的地方, 比如说没有带上机器的Hostname, 以及日志的路径. 在很多时候, 这些信息还是很有用的, 我们也会继续完善.

现在放在了https://github.com/childe/logs ... kafka, 有需要的同学,可以去看下. 收起阅读 »

Day15:Beats是什么东西?

Advent接力传到我这里了,今天我给大家介绍一下Beats,刚好前几天也有好多人问我它是干嘛的,之前的上海我有分享过Beats的内容,PPT在这里:

https://pan.baidu.com/s/1eS157 ... -6-18 


事实上Beats是一系列产品的统称,属于ElasticStack里面收集数据的这一层:Data Shipper Layer,包括以下若干Beats:
  1. PacketBeat,用来嗅探和分析网络流量,如HTTP、MySQL、Redis等
  2. TopBeat,用来收集系统的监控信息,功能如其名,类似*nix下的top命令,只不过所有的信息都会发送给后端的集中存储:Elasticsearch,这样你就可以很方便的监控所有的服务器的运行情况了
  3. FileBeat,用来收集数据源是文件的数据,比如常见的系统日志、应用日志、网站日志等等,FIleBeat思路来自Logstash-forwarder,Beats团队加入之后重构改写而成,解决的就是Logstash作为Agent采集时占用太多被收集系统资源的问题,Beats家族都是Golang编写,效率高,占用内存和CPU比较少,非常适合作为agent跑着服务器上
  4. 。。。

所以Beats其实是一套框架,另外的一个子项目Libbeat,就是所有beats都共用的模块,封装了所有的公共的组件,如配置管理、公共基础类、协议的解析处理、与Elasticsearch的操作等等,你可以很方便基于它实现你自己的beats,这也是Beats的目标,希望将来会出现更多的Beats,做各种各样的事情。
 
另外PacketBeat比较特殊,它又是网络协议抓包和处理的一个框架,目前支持了常见的一些协议,要扩展未知的协议其实非常简单,PacketBeat作为一个框架,数据抓包和后续的存储已经帮你处理好了,你只需要实现你的协议的解码操作就行了,当然这块也是最难和最业务相关的。
 
关于PacketBeat我回头再单独写一篇文章来介绍怎样编写一个PacketBeat的协议扩展吧,PacketBeat扩展的其它协议最终还是需要和PacketBeat集成在一起,也就是最终你的代码是要和PacketBeat的代码在一个工程里面的,而其它的Beats使用Libbeat完全是单独的Beat,如Filebeat和TopBeat,完全是独立打包和独立运行,这个也是两大Beats的主要区别。
 
随便提一下,现在所有的这些Beats已经合并到一个项目里面来方便管理了,golang,you know:https://github.com/elastic/beats
 
现在社区已经提交了的Beats:
https://www.elastic.co/guide/e ... .html
 
明后天在Beijing的ArchSummit2015,我将在Elastic展台,欢迎过来骚扰,领取Elastic的各种贴纸,还有限量的印有Elastic的T恤,数量有限哦
 
今天的Advent就这些吧。
Advent接力活动,规则:http://elasticsearch.cn/article/20
 
继续阅读 »
Advent接力传到我这里了,今天我给大家介绍一下Beats,刚好前几天也有好多人问我它是干嘛的,之前的上海我有分享过Beats的内容,PPT在这里:

https://pan.baidu.com/s/1eS157 ... -6-18 


事实上Beats是一系列产品的统称,属于ElasticStack里面收集数据的这一层:Data Shipper Layer,包括以下若干Beats:
  1. PacketBeat,用来嗅探和分析网络流量,如HTTP、MySQL、Redis等
  2. TopBeat,用来收集系统的监控信息,功能如其名,类似*nix下的top命令,只不过所有的信息都会发送给后端的集中存储:Elasticsearch,这样你就可以很方便的监控所有的服务器的运行情况了
  3. FileBeat,用来收集数据源是文件的数据,比如常见的系统日志、应用日志、网站日志等等,FIleBeat思路来自Logstash-forwarder,Beats团队加入之后重构改写而成,解决的就是Logstash作为Agent采集时占用太多被收集系统资源的问题,Beats家族都是Golang编写,效率高,占用内存和CPU比较少,非常适合作为agent跑着服务器上
  4. 。。。

所以Beats其实是一套框架,另外的一个子项目Libbeat,就是所有beats都共用的模块,封装了所有的公共的组件,如配置管理、公共基础类、协议的解析处理、与Elasticsearch的操作等等,你可以很方便基于它实现你自己的beats,这也是Beats的目标,希望将来会出现更多的Beats,做各种各样的事情。
 
另外PacketBeat比较特殊,它又是网络协议抓包和处理的一个框架,目前支持了常见的一些协议,要扩展未知的协议其实非常简单,PacketBeat作为一个框架,数据抓包和后续的存储已经帮你处理好了,你只需要实现你的协议的解码操作就行了,当然这块也是最难和最业务相关的。
 
关于PacketBeat我回头再单独写一篇文章来介绍怎样编写一个PacketBeat的协议扩展吧,PacketBeat扩展的其它协议最终还是需要和PacketBeat集成在一起,也就是最终你的代码是要和PacketBeat的代码在一个工程里面的,而其它的Beats使用Libbeat完全是单独的Beat,如Filebeat和TopBeat,完全是独立打包和独立运行,这个也是两大Beats的主要区别。
 
随便提一下,现在所有的这些Beats已经合并到一个项目里面来方便管理了,golang,you know:https://github.com/elastic/beats
 
现在社区已经提交了的Beats:
https://www.elastic.co/guide/e ... .html
 
明后天在Beijing的ArchSummit2015,我将在Elastic展台,欢迎过来骚扰,领取Elastic的各种贴纸,还有限量的印有Elastic的T恤,数量有限哦
 
今天的Advent就这些吧。
Advent接力活动,规则:http://elasticsearch.cn/article/20
  收起阅读 »

Day14: percolator接口在logstash中的运用

我们都知道 Elasticsearch 除了普通的 search 接口以外,还有另一个 Percolator 接口,天生用来做实时过滤告警的。但是由于接口比较复杂,在目前的 ELK 体系中不是很容易运用。

而单纯从 Logstash 来做实时过滤报警,规则又不是很灵活。toplog.io 公司开发了一个 logstash-output-percolator插件,在有一定既定条件的情况下,成功运用上了 Percolator 方案。

这个插件的设计逻辑是:
  1. 通过 logstash-filter-checksum 自主生成 ES 文档的 _id;
  2. 使用上一步生成的 _id 同时发送 logstash-output-elasticsearch 和 logstash-output-percolator
  3. Percolator 接口一旦过滤成功,将 _id 发送给 Redis 服务器
  4. 其他系统从 Redis 服务器中获取 _id 即可从 ES 里拿到实际数据

Percolator 接口的用法简单说是这样:

创建接口:
curl -XPUT 'localhost:9200/patterns/.percolator/my-pattern-id' -d '{"query" : {"match" : {"message" : "ERROR"} } }'
过滤测试:
curl -XGET 'localhost:9200/my-index/my-type/_percolate' -d '{"doc" : {"message" : "ERROR: Service Apache failed to connect to MySQL"} }'
要点就是把文档放在 doc 属性里发送到 _percolate 里。

对应的 Logstash 配置如下:
filter {
checksum {
algorithm => "md5"
keys => ["message"]
}
}
output {
elasticsearch {
host => "localhost"
cluster => "my-cluster"
document_id => "%{logstash_checksum}"
index => "my-index"
}
percolator {
host => "es-balancer"
redis_host => ["localhost"]
document_id => "%{logstash_checksum}"
pattern_index => "patterns"
}
}
连接上对应的 Redis,就可以看到报警信息了:
$ redis-cli
127.0.0.1:6379> lrange percolator 0 1
1) "{\"matches\":[\"2\"],\"document_id\":\"a5d5c5f69b26ac0597370c9b1e7a8111\"}"
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
继续阅读 »
我们都知道 Elasticsearch 除了普通的 search 接口以外,还有另一个 Percolator 接口,天生用来做实时过滤告警的。但是由于接口比较复杂,在目前的 ELK 体系中不是很容易运用。

而单纯从 Logstash 来做实时过滤报警,规则又不是很灵活。toplog.io 公司开发了一个 logstash-output-percolator插件,在有一定既定条件的情况下,成功运用上了 Percolator 方案。

这个插件的设计逻辑是:
  1. 通过 logstash-filter-checksum 自主生成 ES 文档的 _id;
  2. 使用上一步生成的 _id 同时发送 logstash-output-elasticsearch 和 logstash-output-percolator
  3. Percolator 接口一旦过滤成功,将 _id 发送给 Redis 服务器
  4. 其他系统从 Redis 服务器中获取 _id 即可从 ES 里拿到实际数据

Percolator 接口的用法简单说是这样:

创建接口:
curl -XPUT 'localhost:9200/patterns/.percolator/my-pattern-id' -d '{"query" : {"match" : {"message" : "ERROR"} } }'
过滤测试:
curl -XGET 'localhost:9200/my-index/my-type/_percolate' -d '{"doc" : {"message" : "ERROR: Service Apache failed to connect to MySQL"} }'
要点就是把文档放在 doc 属性里发送到 _percolate 里。

对应的 Logstash 配置如下:
filter {
checksum {
algorithm => "md5"
keys => ["message"]
}
}
output {
elasticsearch {
host => "localhost"
cluster => "my-cluster"
document_id => "%{logstash_checksum}"
index => "my-index"
}
percolator {
host => "es-balancer"
redis_host => ["localhost"]
document_id => "%{logstash_checksum}"
pattern_index => "patterns"
}
}
连接上对应的 Redis,就可以看到报警信息了:
$ redis-cli
127.0.0.1:6379> lrange percolator 0 1
1) "{\"matches\":[\"2\"],\"document_id\":\"a5d5c5f69b26ac0597370c9b1e7a8111\"}"
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »

Day13: ipip.net介绍

Geo 定位在 ELK 应用中是非常重要和有用的一个环节。不幸的是:GeoIP 本身在国内的准确度实在堪忧。高春辉近年成立了一个项目,专注收集细化 IP 地址在国内的数据:http://www.ipip.net。数据分为免费版和收费版两种。项目提供了不少客户端,有趣的是,有社区贡献了一个 Logstash 插件:https://github.com/bittopaz/logstash-filter-ipip

用法很简单:
filter {
ipip {
source => "clientip"
target => "ipip"
}
}
生成的 JSON 数据结构类似下面这样:
{
"clientip" : "",
"ipip" : {
"country" : "",
"city" : "",
"carrier" : "",
"province" : ""
}
}
不过这个插件只实现了收费版的数据库基础格式。免费版的支持,收费版高级的经纬度、基站位置等,都没有随着更新。事实上,我们可以通过 ipip 官方的 Java 库,实现一个更灵活的 logstash-filter-ipip_java 插件出来,下期见。

想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
继续阅读 »
Geo 定位在 ELK 应用中是非常重要和有用的一个环节。不幸的是:GeoIP 本身在国内的准确度实在堪忧。高春辉近年成立了一个项目,专注收集细化 IP 地址在国内的数据:http://www.ipip.net。数据分为免费版和收费版两种。项目提供了不少客户端,有趣的是,有社区贡献了一个 Logstash 插件:https://github.com/bittopaz/logstash-filter-ipip

用法很简单:
filter {
ipip {
source => "clientip"
target => "ipip"
}
}
生成的 JSON 数据结构类似下面这样:
{
"clientip" : "",
"ipip" : {
"country" : "",
"city" : "",
"carrier" : "",
"province" : ""
}
}
不过这个插件只实现了收费版的数据库基础格式。免费版的支持,收费版高级的经纬度、基站位置等,都没有随着更新。事实上,我们可以通过 ipip 官方的 Java 库,实现一个更灵活的 logstash-filter-ipip_java 插件出来,下期见。

想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »

Day12: siren-join简介

很多从 MySQL 转过来的 Elasticsearch 用户总是很习惯的问一个问题:『怎么在 ES 里实现 join 操作?』过去,我们的回答一般都是:通过类似宽表的思路,将数据平铺在一个索引里。不过,最近另一家 Lucene 开发商给出了另一个方案,他们开发了一个 Elasticsearch 插件,实现了 filter 层面的 join,GitHub 项目地址见:https://github.com/sirensolutions/siren-join

不过需要提醒一下的是:filter 层面的意思,就是只相当于是 SQL 里的 exists 操作。所以目前对这个插件也不要抱有太大期望。今天我们来稍微演示一下。

安装和其他 ES 插件一样:
# bin/plugin -i solutions.siren/siren-join/1.0
注意 siren-join v1.0 只支持 ES 1.7 版本,2.0 版本支持据说正在开发中。

我们 bulk 上传这么一段数据:
{"index":{"_index":"index1","_type":"type","_id":"1"}}
{"id":1, "foreign_key":"13"}
{"index":{"_index":"index1","_type":"type","_id":"2"}}
{"id":2}
{"index":{"_index":"index1","_type":"type","_id":"3"}}
{"id":3, "foreign_key": "2"}
{"index":{"_index":"index1","_type":"type","_id":"4"}}
{"id":4, "foreign_key": "14"}
{"index":{"_index":"index1","_type":"type","_id":"5"}}
{"id":5, "foreign_key": "2"}
{"index":{"_index":"index2","_type":"type","_id":"1"}}
{"id":"1", "tag": "aaa"}
{"index":{"_index":"index2","_type":"type","_id":"2"}}
{"id":"2", "tag": "aaa"}
{"index":{"_index":"index2","_type":"type","_id":"3"}}
{"id":"3", "tag": "bbb"}
{"index":{"_index":"index2","_type":"type","_id":"4"}}
{"id":"4", "tag": "ccc"}
注意,siren-join 要求用来 join 的字段必须数据类型一致。所以,当我们要用 index2 的 id 和 index1 的foreign_key 做 join 的时候,这两个字段就要保持一致,这里为了演示,特意都改成字符串。那么我们发起一个请求如下:
# curl -s -XPOST 'http://localhost:9200/index1/_coordinate_search?pretty' -d '
{
"query":{
"filtered":{
"query":{
"match_all":{}
},
"filter":{
"filterjoin":{
"foreign_key":{
"index":"index2",
"type":"type",
"path":"id",
"query":{
"terms":{
"tag":["aaa"]
}
}
}
}
}
}
},
"aggs":{
"avg":{
"avg":{
"field":"id"
}
}
}
}'
意即:从 index2 中搜索 q=tag:aaa 的数据的 id,查找 index1 中对应 foreign_key 的文档的 id 数据平均值。响应结果如下:
{
"coordinate_search" : {
"actions" : [ {
"relations" : {
"from" : {
"indices" : [ ],
"types" : [ ],
"field" : "id"
},
"to" : {
"indices" : null,
"types" : null,
"field" : "foreign_key"
}
},
"size" : 2,
"size_in_bytes" : 20,
"is_pruned" : false,
"cache_hit" : true,
"took" : 0
} ]
},
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [ {
"_index" : "index1",
"_type" : "type",
"_id" : "5",
"_score" : 1.0,
"_source":{"id":5, "foreign_key": "2"}
}, {
"_index" : "index1",
"_type" : "type",
"_id" : "3",
"_score" : 1.0,
"_source":{"id":3, "foreign_key": "2"}
} ]
},
"aggregations" : {
"avg" : {
"value" : 4.0
}
}
}
响应告诉我们:从 index2 中搜索到 2 条参与 join 的文档,在 index1 中命中 2 条数据,最后求平均值为 4.0。

想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
继续阅读 »
很多从 MySQL 转过来的 Elasticsearch 用户总是很习惯的问一个问题:『怎么在 ES 里实现 join 操作?』过去,我们的回答一般都是:通过类似宽表的思路,将数据平铺在一个索引里。不过,最近另一家 Lucene 开发商给出了另一个方案,他们开发了一个 Elasticsearch 插件,实现了 filter 层面的 join,GitHub 项目地址见:https://github.com/sirensolutions/siren-join

不过需要提醒一下的是:filter 层面的意思,就是只相当于是 SQL 里的 exists 操作。所以目前对这个插件也不要抱有太大期望。今天我们来稍微演示一下。

安装和其他 ES 插件一样:
# bin/plugin -i solutions.siren/siren-join/1.0
注意 siren-join v1.0 只支持 ES 1.7 版本,2.0 版本支持据说正在开发中。

我们 bulk 上传这么一段数据:
{"index":{"_index":"index1","_type":"type","_id":"1"}}
{"id":1, "foreign_key":"13"}
{"index":{"_index":"index1","_type":"type","_id":"2"}}
{"id":2}
{"index":{"_index":"index1","_type":"type","_id":"3"}}
{"id":3, "foreign_key": "2"}
{"index":{"_index":"index1","_type":"type","_id":"4"}}
{"id":4, "foreign_key": "14"}
{"index":{"_index":"index1","_type":"type","_id":"5"}}
{"id":5, "foreign_key": "2"}
{"index":{"_index":"index2","_type":"type","_id":"1"}}
{"id":"1", "tag": "aaa"}
{"index":{"_index":"index2","_type":"type","_id":"2"}}
{"id":"2", "tag": "aaa"}
{"index":{"_index":"index2","_type":"type","_id":"3"}}
{"id":"3", "tag": "bbb"}
{"index":{"_index":"index2","_type":"type","_id":"4"}}
{"id":"4", "tag": "ccc"}
注意,siren-join 要求用来 join 的字段必须数据类型一致。所以,当我们要用 index2 的 id 和 index1 的foreign_key 做 join 的时候,这两个字段就要保持一致,这里为了演示,特意都改成字符串。那么我们发起一个请求如下:
# curl -s -XPOST 'http://localhost:9200/index1/_coordinate_search?pretty' -d '
{
"query":{
"filtered":{
"query":{
"match_all":{}
},
"filter":{
"filterjoin":{
"foreign_key":{
"index":"index2",
"type":"type",
"path":"id",
"query":{
"terms":{
"tag":["aaa"]
}
}
}
}
}
}
},
"aggs":{
"avg":{
"avg":{
"field":"id"
}
}
}
}'
意即:从 index2 中搜索 q=tag:aaa 的数据的 id,查找 index1 中对应 foreign_key 的文档的 id 数据平均值。响应结果如下:
{
"coordinate_search" : {
"actions" : [ {
"relations" : {
"from" : {
"indices" : [ ],
"types" : [ ],
"field" : "id"
},
"to" : {
"indices" : null,
"types" : null,
"field" : "foreign_key"
}
},
"size" : 2,
"size_in_bytes" : 20,
"is_pruned" : false,
"cache_hit" : true,
"took" : 0
} ]
},
"took" : 2,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
},
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [ {
"_index" : "index1",
"_type" : "type",
"_id" : "5",
"_score" : 1.0,
"_source":{"id":5, "foreign_key": "2"}
}, {
"_index" : "index1",
"_type" : "type",
"_id" : "3",
"_score" : 1.0,
"_source":{"id":3, "foreign_key": "2"}
} ]
},
"aggregations" : {
"avg" : {
"value" : 4.0
}
}
}
响应告诉我们:从 index2 中搜索到 2 条参与 join 的文档,在 index1 中命中 2 条数据,最后求平均值为 4.0。

想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »

Day11: timelion请求语法

ES2.0 开始提供了一个崭新的 pipeline aggregation 特性,但是 Kibana 似乎并没有立刻跟进这方面的意思,相反,Elastic 公司推出了另一个实验室产品:Timelion。
timelion 的用法在官博里已经有介绍。尤其是最近两篇如何用 timelion 实现异常告警的文章,更是从 ES 的 pipeline aggregation 细节和场景一路讲到 timelion 具体操作,我这里几乎没有再重新讲一遍 timelion 操作入门的必要了。不过,官方却一直没有列出来 timelion 支持的请求语法的文档,而是在页面上通过点击图标的方式下拉帮助。

timelion 页面设计上,更接近 Kibana3 而不是 Kibana4。比如 panel 分布是通过设置几行几列的数目来固化的;query 框是唯一的,要修改哪个 panel 的 query,鼠标点选一下 panel,query 就自动切换成这个 panel 的了。

为了方便大家在上手之前了解 timelion 能做到什么,今天特意把 timelion 的请求语法所支持的函数分为几类,罗列如下:

可视化效果类:
    .bars($width): 用柱状图展示数组
.lines($width, $fill, $show, $steps): 用折线图展示数组
.points(): 用散点图展示数组
.color("#c6c6c6"): 改变颜色
.hide(): 隐藏该数组
.label("change from %s"): 标签
.legend($position, $column): 图例位置
.yaxis($yaxis_number, $min, $max, $position): 设置 Y 轴属性,.yaxis(2) 表示第二根 Y 轴

数据运算类:
    .abs(): 对整个数组元素求绝对值
.precision($number): 浮点数精度
.testcast($count, $alpha, $beta, $gamma): holt-winters 预测
.cusum($base): 数组元素之和,再加上 $base
.derivative(): 对数组求导数
.divide($divisor): 数组元素除法
.multiply($multiplier): 数组元素乘法
.subtract($term): 数组元素减法
.sum($term): 数组元素加法
.add(): 同 .sum()
.plus(): 同 .sum()
.first(): 返回第一个元素
.movingaverage($window): 用指定的窗口大小计算移动平均值
.mvavg(): .movingaverage() 的简写
.movingstd($window): 用指定的窗口大小计算移动标准差
.mvstd(): .movingstd() 的简写
数据源设定类:
    .elasticsearch(): 从 ES 读取数据
.es(q="querystring", metric="cardinality:uid", index="logstash-*", offset="-1d"): .elasticsearch() 的简写
.graphite(metric="path.to.*.data", offset="-1d"): 从 graphite 读取数据
.quandl(): 从 quandl.com 读取 quandl 码
.worldbank_indicators(): 从 worldbank.org 读取国家数据
.wbi(): .worldbank_indicators() 的简写
.worldbank(): 从 worldbank.org 读取数据
.wb(): .worldbanck() 的简写
以上所有函数,都在 series_functions 目录下实现,每个 js 文件实现一个 TimelionFunction 功能。

想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
继续阅读 »
ES2.0 开始提供了一个崭新的 pipeline aggregation 特性,但是 Kibana 似乎并没有立刻跟进这方面的意思,相反,Elastic 公司推出了另一个实验室产品:Timelion。
timelion 的用法在官博里已经有介绍。尤其是最近两篇如何用 timelion 实现异常告警的文章,更是从 ES 的 pipeline aggregation 细节和场景一路讲到 timelion 具体操作,我这里几乎没有再重新讲一遍 timelion 操作入门的必要了。不过,官方却一直没有列出来 timelion 支持的请求语法的文档,而是在页面上通过点击图标的方式下拉帮助。

timelion 页面设计上,更接近 Kibana3 而不是 Kibana4。比如 panel 分布是通过设置几行几列的数目来固化的;query 框是唯一的,要修改哪个 panel 的 query,鼠标点选一下 panel,query 就自动切换成这个 panel 的了。

为了方便大家在上手之前了解 timelion 能做到什么,今天特意把 timelion 的请求语法所支持的函数分为几类,罗列如下:

可视化效果类:
    .bars($width): 用柱状图展示数组
.lines($width, $fill, $show, $steps): 用折线图展示数组
.points(): 用散点图展示数组
.color("#c6c6c6"): 改变颜色
.hide(): 隐藏该数组
.label("change from %s"): 标签
.legend($position, $column): 图例位置
.yaxis($yaxis_number, $min, $max, $position): 设置 Y 轴属性,.yaxis(2) 表示第二根 Y 轴

数据运算类:
    .abs(): 对整个数组元素求绝对值
.precision($number): 浮点数精度
.testcast($count, $alpha, $beta, $gamma): holt-winters 预测
.cusum($base): 数组元素之和,再加上 $base
.derivative(): 对数组求导数
.divide($divisor): 数组元素除法
.multiply($multiplier): 数组元素乘法
.subtract($term): 数组元素减法
.sum($term): 数组元素加法
.add(): 同 .sum()
.plus(): 同 .sum()
.first(): 返回第一个元素
.movingaverage($window): 用指定的窗口大小计算移动平均值
.mvavg(): .movingaverage() 的简写
.movingstd($window): 用指定的窗口大小计算移动标准差
.mvstd(): .movingstd() 的简写
数据源设定类:
    .elasticsearch(): 从 ES 读取数据
.es(q="querystring", metric="cardinality:uid", index="logstash-*", offset="-1d"): .elasticsearch() 的简写
.graphite(metric="path.to.*.data", offset="-1d"): 从 graphite 读取数据
.quandl(): 从 quandl.com 读取 quandl 码
.worldbank_indicators(): 从 worldbank.org 读取国家数据
.wbi(): .worldbank_indicators() 的简写
.worldbank(): 从 worldbank.org 读取数据
.wb(): .worldbanck() 的简写
以上所有函数,都在 series_functions 目录下实现,每个 js 文件实现一个 TimelionFunction 功能。

想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »

Day10: 如何处理数组形式的JSON日志

ELK 收集业务日志的来源,除了应用服务器以外,还有很大一部分来自客户端。考虑到客户端网络流量的因素,一般实现上都不会要求实时上报数据,而是攒一批,等到手机连上 WIFI 网络了,再统一发送出来。所以,这类客户端日志一般都有几个特点:
  1. 预先已经记录成 JSON 了;
  2. 日志主体内容是一个巨大无比的数组,数据元素才是实际的单次日志记录;
  3. 一次 POST 会有几 MB 到几十 MB 大小。


在处理这类数据的时候,第一关是别让数据超长直接给丢弃了(说的就是你啊,Rsyslog);第二关就是拆分 JSON 数组,把几十 MB 数据扔 ES 字段里,显然是不利于搜索和统计需求的。今天我们就来说说怎么拆分 JSON 数组。

假设收到的是这么一段日志:
{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
首先我们知道可以在读取的时候把 JSON 数据解析成 LogStash::Event 对象:
input {
tcp {
codec => json
}
}
但是怎么把解析出来的 logs 字段拆分成多个 event 呢?这里我们可以用一个已有插件:logstash-filter-split。
filter {
split {
field => "logs"
}
date {
match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
remove_fields => ["logs", "timestamp"]
}
}
这样,就可以得到两个 event 了:
{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"crash","@timestamp":"2015-12-10T09:55:00Z","reason":"****"}
{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"network_error","@timestamp":"2015-12-10T09:56:12Z","tracert":"****"}
看起来可能跟这个插件的文档描述不太一样。文档上写的是通过 terminator 字符,切割 field 字符串成多个 event。但实际上,field 设置是会自动判断的,如果 field 内容是字符串,就切割字符串成为数组再循环;如果内容已经是数组了,直接循环:
    original_value = event[@field]

if original_value.is_a?(Array)
splits = original_value
elsif original_value.is_a?(String)
splits = original_value.split(@terminator, -1)
else
raise LogStash::ConfigurationError, "Only String and Array types are splittable. field:#{@field} is of type = #{original_value.class}"
end

return if splits.length == 1

splits.each do |value|
next if value.empty?

event_split = event.clone
@logger.debug("Split event", :value => value, :field => @field)
event_split[(@target || @field)] = value
filter_matched(event_split)

yield event_split
end
event.cancel
顺带提一句:这里 yield 在 Logstash 1.5.0 之前,实现有问题,生成的新事件,不会继续执行后续 filter,直接进入到 output 阶段。也就是说,如果你用 Logstash 1.4.2 来执行上面那段配置,生成的两个事件会是这样的:
{"@timestamp":"2015-12-10T09:38:13Z","uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
{"@timestamp":"2015-12-10T09:38:13Z","uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"network_error","@timestamp":"2015-12-10 17:56:12","tracert":"****","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。
继续阅读 »
ELK 收集业务日志的来源,除了应用服务器以外,还有很大一部分来自客户端。考虑到客户端网络流量的因素,一般实现上都不会要求实时上报数据,而是攒一批,等到手机连上 WIFI 网络了,再统一发送出来。所以,这类客户端日志一般都有几个特点:
  1. 预先已经记录成 JSON 了;
  2. 日志主体内容是一个巨大无比的数组,数据元素才是实际的单次日志记录;
  3. 一次 POST 会有几 MB 到几十 MB 大小。


在处理这类数据的时候,第一关是别让数据超长直接给丢弃了(说的就是你啊,Rsyslog);第二关就是拆分 JSON 数组,把几十 MB 数据扔 ES 字段里,显然是不利于搜索和统计需求的。今天我们就来说说怎么拆分 JSON 数组。

假设收到的是这么一段日志:
{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
首先我们知道可以在读取的时候把 JSON 数据解析成 LogStash::Event 对象:
input {
tcp {
codec => json
}
}
但是怎么把解析出来的 logs 字段拆分成多个 event 呢?这里我们可以用一个已有插件:logstash-filter-split。
filter {
split {
field => "logs"
}
date {
match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
remove_fields => ["logs", "timestamp"]
}
}
这样,就可以得到两个 event 了:
{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"crash","@timestamp":"2015-12-10T09:55:00Z","reason":"****"}
{"uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"network_error","@timestamp":"2015-12-10T09:56:12Z","tracert":"****"}
看起来可能跟这个插件的文档描述不太一样。文档上写的是通过 terminator 字符,切割 field 字符串成多个 event。但实际上,field 设置是会自动判断的,如果 field 内容是字符串,就切割字符串成为数组再循环;如果内容已经是数组了,直接循环:
    original_value = event[@field]

if original_value.is_a?(Array)
splits = original_value
elsif original_value.is_a?(String)
splits = original_value.split(@terminator, -1)
else
raise LogStash::ConfigurationError, "Only String and Array types are splittable. field:#{@field} is of type = #{original_value.class}"
end

return if splits.length == 1

splits.each do |value|
next if value.empty?

event_split = event.clone
@logger.debug("Split event", :value => value, :field => @field)
event_split[(@target || @field)] = value
filter_matched(event_split)

yield event_split
end
event.cancel
顺带提一句:这里 yield 在 Logstash 1.5.0 之前,实现有问题,生成的新事件,不会继续执行后续 filter,直接进入到 output 阶段。也就是说,如果你用 Logstash 1.4.2 来执行上面那段配置,生成的两个事件会是这样的:
{"@timestamp":"2015-12-10T09:38:13Z","uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
{"@timestamp":"2015-12-10T09:38:13Z","uid":123456,"upload_datetime":"2015-12-10 11:38:11","type":"network_error","@timestamp":"2015-12-10 17:56:12","tracert":"****","logs":[{"type":"crash","timestamp":"2015-12-10 17:55:00","reason":"****"},{"type":"network_error","timestamp":"2015-12-10 17:56:12","tracert":"****"}]}
想了解更全面的 ELK Stack 知识和细节,欢迎购买我的《ELK Stack权威指南》,也欢迎加 QQ 群:315428175 哟。 收起阅读 »

elasticsearch源码调试环境小结

前端时间折腾了一下源码调试的问题,简单总结以下。
---------------------
调试环境是window(linux理论上通用)
用到的工具类:
1:mvn:https://maven.apache.org/
elasticsearch的源码是用mvn工具管理的,根据pom.xml来下载一些依赖包非常方便。
(当然也可以用gradle,由于不太熟悉,就没研究)
安装mvn,注意配置后环境变量即可。官方文档写的很明白。
最好自己修改一下mvn的setting.xml文件中的本地repo
<!-- localRepository
   | The path to the local repository maven will use to store artifacts.
   |
   | Default: ${user.home}/.m2/repository
  <localRepository>/path/to/local/repo</localRepository>
-->
我设置成了:
<localRepository>E:/m2/repository</localRepository>
mvn -v 测试以下
2:eclipse:编辑器,应用应该还比较广泛的。我用的最新版的mars。
(intellij idea据说这是一个很牛逼的编辑器,也是因为暂时不熟悉,还没研究)
----------------------
步骤:
1: 去github上选择一个tag版本,我用的是2.1.0.
https://github.com/elastic/ela ... 2.1.0 
直接DownloadZip文件即可
(也可以用git clone下来)
解压缩。
假设目录为E:/elasticsearch-2.1.0
2: 编译源代码
cmd 打开命令行
进入源文件目录 E:/elasticsearch-2.1.0
执行 mvn package命令
这个时间段耗时比较长,当然也得根据网速情况。
会出现失败,大多是因为拉取不到依赖包。可以根据提示信息,手动去下载失败的jar,然后拷贝到本地repo对应的文件夹下边即可。
等出现build success信息的时候代表成功了。
可以到core/target目录下看到elasticsearch-2.1.0-SNAPSHOT.jar。
3:转为eclipse工程
可能习惯了eclipse工程,所以这里就直接用mvn转成了eclipse的工程,生成.classpath和.project文件。
进入core目录执行以下指令
mvn eclipse:eclipse
这一步也会消耗一些时间,通常的错误也是jar包下载不成功,根据终端打印的错误信息,把对应jar包直接下载下来,放到本地的repo对应目录下边即可,然后重新运行命令。直到成功。
之后,就会发现出现了.classpath和.project文件了。
然后打开eclipse 直接带入core中的工程即可。
4: 设置运行参数
打开刚刚导入成功的工程:
Run As----Run Configution---Args
设置ProgramArgument 为 start
设置VMArgument为 -Des.path.home=E:\elasticsearch-2.1.0\core\
完毕
-------
现在就就可以运行+调试了。

继续阅读 »
前端时间折腾了一下源码调试的问题,简单总结以下。
---------------------
调试环境是window(linux理论上通用)
用到的工具类:
1:mvn:https://maven.apache.org/
elasticsearch的源码是用mvn工具管理的,根据pom.xml来下载一些依赖包非常方便。
(当然也可以用gradle,由于不太熟悉,就没研究)
安装mvn,注意配置后环境变量即可。官方文档写的很明白。
最好自己修改一下mvn的setting.xml文件中的本地repo
<!-- localRepository
   | The path to the local repository maven will use to store artifacts.
   |
   | Default: ${user.home}/.m2/repository
  <localRepository>/path/to/local/repo</localRepository>
-->
我设置成了:
<localRepository>E:/m2/repository</localRepository>
mvn -v 测试以下
2:eclipse:编辑器,应用应该还比较广泛的。我用的最新版的mars。
(intellij idea据说这是一个很牛逼的编辑器,也是因为暂时不熟悉,还没研究)
----------------------
步骤:
1: 去github上选择一个tag版本,我用的是2.1.0.
https://github.com/elastic/ela ... 2.1.0 
直接DownloadZip文件即可
(也可以用git clone下来)
解压缩。
假设目录为E:/elasticsearch-2.1.0
2: 编译源代码
cmd 打开命令行
进入源文件目录 E:/elasticsearch-2.1.0
执行 mvn package命令
这个时间段耗时比较长,当然也得根据网速情况。
会出现失败,大多是因为拉取不到依赖包。可以根据提示信息,手动去下载失败的jar,然后拷贝到本地repo对应的文件夹下边即可。
等出现build success信息的时候代表成功了。
可以到core/target目录下看到elasticsearch-2.1.0-SNAPSHOT.jar。
3:转为eclipse工程
可能习惯了eclipse工程,所以这里就直接用mvn转成了eclipse的工程,生成.classpath和.project文件。
进入core目录执行以下指令
mvn eclipse:eclipse
这一步也会消耗一些时间,通常的错误也是jar包下载不成功,根据终端打印的错误信息,把对应jar包直接下载下来,放到本地的repo对应目录下边即可,然后重新运行命令。直到成功。
之后,就会发现出现了.classpath和.project文件了。
然后打开eclipse 直接带入core中的工程即可。
4: 设置运行参数
打开刚刚导入成功的工程:
Run As----Run Configution---Args
设置ProgramArgument 为 start
设置VMArgument为 -Des.path.home=E:\elasticsearch-2.1.0\core\
完毕
-------
现在就就可以运行+调试了。

收起阅读 »