Q:非洲食人族的酋长吃什么?

ElasticSearch插件集

ElasticSearch的很多功能都是官方或第三方基于ElasticSearch的AbstractPlugin类实现的插件来提供的,所以,在里里记录下一些常用的及实用的插件地址,以备不时之需

分词插件

Combo Analysis Plugin (作者 Olivier Favre, Yakaz)

简介:组合分词器,可以把多个分词器的结果组合在一起。

Smart Chinese Analysis Plugin (作者 elasticsearch 团队)

简介:lucene默认的中文分词器

ICU Analysis plugin (作者 elasticsearch 团队)

简介:lucene自带的ICU分词,ICU是一套稳定、成熟、功能强大、轻便易用和跨平台支持Unicode 的开发包。

Stempel (Polish) Analysis plugin (作者 elasticsearch 团队)

简介:法文分词器

IK Analysis Plugin (作者 Medcl)

简介:大名鼎鼎的ik分词,都懂的!

Mmseg Analysis Plugin (作者 Medcl)

简介:mmseg中文分词

Hunspell Analysis Plugin (作者 Jörg Prante)

简介:lucene自带的Hunspell模块

Japanese (Kuromoji) Analysis plugin (作者 elasticsearch 团队).

简介:日文分词器

Japanese Analysis plugin (作者 suguru).

简介:日文分词器

Russian and English Morphological Analysis Plugin (作者 Igor Motov)

简介:俄文英文分词器

Pinyin Analysis Plugin (作者 Medcl)

简介:拼音分词器

String2Integer Analysis Plugin (作者 Medcl)

简介:字符串转整型工具。主要用在facet这个功能上,如果facet的field的值是字符串的话,计算起来比较耗资源。可以把字符串映射成整型,对整型进行facet操作要比对字符串的快很多。

同步插件

CouchDB River Plugin (作者 elasticsearch 团队)

简介:CouchDB和elasticsearch的同步插件

Wikipedia River Plugin (作者 elasticsearch 团队)

简介:wikipedia文件读取插件。wikipedia是维基百科的一个离线库,不定期发布最新数据,是以xml形式发布的。这个river读取这个文件来建索引。

Twitter River Plugin (作者 elasticsearch 团队)

简介:twitter的同步插件,可以同步你twitter上的微博。

RabbitMQ River Plugin (作者 elasticsearch 团队)

简介:rabbitmq同步插件,读取rabbitmq上的队列信息并索引。

RSS River Plugin (作者 David Pilato)

简介:定期索引指定一个或多个RSS源的数据。

MongoDB River Plugin (作者 Richard Louapre)

简介:mongodb同步插件,mongodb必须搭成副本集的模式,因为这个插件的原理是通过定期读取mongodb中的oplog来同步数据。

Open Archives Initiative (OAI) River Plugin (作者 Jörg Prante)

简介:可以索引oai数据提供者提供的数据。

Sofa River Plugin (作者 adamlofts)

简介:这个插件可以把多个CouchDB的数据库同步到同一个es索引中。

JDBC River Plugin (作者 Jörg Prante)

简介:关系型数据库的同步插件

FileSystem River Plugin (作者 David Pilato)

简介:本地文件系统文件同步插件,使用方法是指定一个本地目录路径,es会定期扫描索引该目录下的文件。

LDAP River Plugin (作者 Tanguy Leroux)

简介:索引LDAP目录下的文件数据。

Dropbox River Plugin (作者 David Pilato)

简介:索引dropbox网盘上的文件。通过oauth协议来调用dropbox上的api建索引。

ActiveMQ River Plugin (作者 Dominik Dorn)

简介:activemq队列的同步插件,和之前rabbitmq的类似

Solr River Plugin (作者 Luca Cavanna)

简介:solr同步插件,可以把solr里面的索引同步到es

CSV River Plugin (作者 Martin Bednar)

简介:通过指定目录地址来索引csv文件。

数据传输插件

Servlet transport (作者 elasticsearch 团队)

简介:Servlet rest插件,通过servlet来封装rest接口。

Memcached transport plugin (作者 elasticsearch 团队)

简介:本插件可以通过memcached协议进行rest接口的调用。注意:这里不是使用memcache作为es的缓存。

Thrift Transport (作者 elasticsearch 团队)

简介:使用thrift进行数据传输。

ZeroMQ transport layer plugin (作者 Tanguy Leroux)

简介:使用zeromq进rest接口的调用。

Jetty HTTP transport plugin (作者 Sonian Inc.)

简介:使用jetty来提供http rest接口。默认是使用netty。这个插件的好处是可以对http接口进行一些权限的设置。

脚本插件

Python language Plugin (作者 elasticsearch 团队)

简介:python脚本支持

JavaScript language Plugin (作者 elasticsearch 团队)

简介:javascript脚本支持

Groovy lang Plugin (作者 elasticsearch 团队)

简介:groovy脚本支持

Clojure Language Plugin (作者 Kevin Downey)

简介:clojure脚本支持

站点插件(以网页形式展现)

BigDesk Plugin (作者 Lukáš Vlček)

简介:监控es状态的插件,推荐!

Elasticsearch Head Plugin (作者 Ben Birch)

简介:很方便对es进行各种操作的客户端。

Paramedic Plugin (作者 Karel Minařík)

简介:es监控插件

SegmentSpy Plugin (作者 Zachary Tong)

简介:查看es索引segment状态的插件

Inquisitor Plugin (作者 Zachary Tong)

简介:这个插件主要用来调试你的查询。

其它插件

Mapper Attachments Type plugin (作者 elasticsearch 团队)

简介:附件类型插件,通过tika库把各种类型的文件格式解析成字符串。

Hadoop Plugin (作者 elasticsearch team)

简介:hadoop和elasticsearch的集成插件,可以通过hadoop的mapreduce算法来并行建立索引,同时支持cascading,hive和pig等框架。

AWS Cloud Plugin (作者 elasticsearch 团队)

简介:elasticsearch与amazon web services的集成。

ElasticSearch Mock Solr Plugin (作者 Matt Weber)

简介:elasticsearch的solr api接口。用了这个插件可以使用solr的api来调用es,直接用solrj就可以调用es。比较适用于从solr转es时暂时过度。

Suggester Plugin (作者 Alexander Reelsen)

简介:es 搜索提示功能插件,不过es0.9版本后自带了这个功能,

ElasticSearch PartialUpdate Plugin (作者 Medcl)

简介:elasticsearch的部分更新插件。

ZooKeeper Discovery Plugin (作者 Sonian Inc.)

简介:通过zookeeper管理集群的插件。通过这个插件,es的分布式架构和solrcloud相似。

ElasticSearch Changes Plugin (作者 Thomas Peuss)

简介:elasticsearch索引操作记录插件。通过这个插件可以查看用户对索引的增删改操作。

ElasticSearch View Plugin (作者 Tanguy Leroux)

简介:这个插件可以把es的文档以html,xml或text的方式显示出来,它也可以通过查询生成web页面。

ElasticSearch New Relic Plugin (作者 Vinicius Carvalho)

简介:elasticsearch和newrelic的集成插件。newrelica是一个性能监控工具。这个插件会把节点的状态数据传到newrelic的账号上。
社区的编辑器好像不支持复制富文本信息,所以插件都没有链接,插件太多懒得一个个打链接了,想点地址的可以移步寒舍http://www.kailing.pub/article/index/arcid/87.html
 
继续阅读 »
ElasticSearch的很多功能都是官方或第三方基于ElasticSearch的AbstractPlugin类实现的插件来提供的,所以,在里里记录下一些常用的及实用的插件地址,以备不时之需

分词插件

Combo Analysis Plugin (作者 Olivier Favre, Yakaz)

简介:组合分词器,可以把多个分词器的结果组合在一起。

Smart Chinese Analysis Plugin (作者 elasticsearch 团队)

简介:lucene默认的中文分词器

ICU Analysis plugin (作者 elasticsearch 团队)

简介:lucene自带的ICU分词,ICU是一套稳定、成熟、功能强大、轻便易用和跨平台支持Unicode 的开发包。

Stempel (Polish) Analysis plugin (作者 elasticsearch 团队)

简介:法文分词器

IK Analysis Plugin (作者 Medcl)

简介:大名鼎鼎的ik分词,都懂的!

Mmseg Analysis Plugin (作者 Medcl)

简介:mmseg中文分词

Hunspell Analysis Plugin (作者 Jörg Prante)

简介:lucene自带的Hunspell模块

Japanese (Kuromoji) Analysis plugin (作者 elasticsearch 团队).

简介:日文分词器

Japanese Analysis plugin (作者 suguru).

简介:日文分词器

Russian and English Morphological Analysis Plugin (作者 Igor Motov)

简介:俄文英文分词器

Pinyin Analysis Plugin (作者 Medcl)

简介:拼音分词器

String2Integer Analysis Plugin (作者 Medcl)

简介:字符串转整型工具。主要用在facet这个功能上,如果facet的field的值是字符串的话,计算起来比较耗资源。可以把字符串映射成整型,对整型进行facet操作要比对字符串的快很多。

同步插件

CouchDB River Plugin (作者 elasticsearch 团队)

简介:CouchDB和elasticsearch的同步插件

Wikipedia River Plugin (作者 elasticsearch 团队)

简介:wikipedia文件读取插件。wikipedia是维基百科的一个离线库,不定期发布最新数据,是以xml形式发布的。这个river读取这个文件来建索引。

Twitter River Plugin (作者 elasticsearch 团队)

简介:twitter的同步插件,可以同步你twitter上的微博。

RabbitMQ River Plugin (作者 elasticsearch 团队)

简介:rabbitmq同步插件,读取rabbitmq上的队列信息并索引。

RSS River Plugin (作者 David Pilato)

简介:定期索引指定一个或多个RSS源的数据。

MongoDB River Plugin (作者 Richard Louapre)

简介:mongodb同步插件,mongodb必须搭成副本集的模式,因为这个插件的原理是通过定期读取mongodb中的oplog来同步数据。

Open Archives Initiative (OAI) River Plugin (作者 Jörg Prante)

简介:可以索引oai数据提供者提供的数据。

Sofa River Plugin (作者 adamlofts)

简介:这个插件可以把多个CouchDB的数据库同步到同一个es索引中。

JDBC River Plugin (作者 Jörg Prante)

简介:关系型数据库的同步插件

FileSystem River Plugin (作者 David Pilato)

简介:本地文件系统文件同步插件,使用方法是指定一个本地目录路径,es会定期扫描索引该目录下的文件。

LDAP River Plugin (作者 Tanguy Leroux)

简介:索引LDAP目录下的文件数据。

Dropbox River Plugin (作者 David Pilato)

简介:索引dropbox网盘上的文件。通过oauth协议来调用dropbox上的api建索引。

ActiveMQ River Plugin (作者 Dominik Dorn)

简介:activemq队列的同步插件,和之前rabbitmq的类似

Solr River Plugin (作者 Luca Cavanna)

简介:solr同步插件,可以把solr里面的索引同步到es

CSV River Plugin (作者 Martin Bednar)

简介:通过指定目录地址来索引csv文件。

数据传输插件

Servlet transport (作者 elasticsearch 团队)

简介:Servlet rest插件,通过servlet来封装rest接口。

Memcached transport plugin (作者 elasticsearch 团队)

简介:本插件可以通过memcached协议进行rest接口的调用。注意:这里不是使用memcache作为es的缓存。

Thrift Transport (作者 elasticsearch 团队)

简介:使用thrift进行数据传输。

ZeroMQ transport layer plugin (作者 Tanguy Leroux)

简介:使用zeromq进rest接口的调用。

Jetty HTTP transport plugin (作者 Sonian Inc.)

简介:使用jetty来提供http rest接口。默认是使用netty。这个插件的好处是可以对http接口进行一些权限的设置。

脚本插件

Python language Plugin (作者 elasticsearch 团队)

简介:python脚本支持

JavaScript language Plugin (作者 elasticsearch 团队)

简介:javascript脚本支持

Groovy lang Plugin (作者 elasticsearch 团队)

简介:groovy脚本支持

Clojure Language Plugin (作者 Kevin Downey)

简介:clojure脚本支持

站点插件(以网页形式展现)

BigDesk Plugin (作者 Lukáš Vlček)

简介:监控es状态的插件,推荐!

Elasticsearch Head Plugin (作者 Ben Birch)

简介:很方便对es进行各种操作的客户端。

Paramedic Plugin (作者 Karel Minařík)

简介:es监控插件

SegmentSpy Plugin (作者 Zachary Tong)

简介:查看es索引segment状态的插件

Inquisitor Plugin (作者 Zachary Tong)

简介:这个插件主要用来调试你的查询。

其它插件

Mapper Attachments Type plugin (作者 elasticsearch 团队)

简介:附件类型插件,通过tika库把各种类型的文件格式解析成字符串。

Hadoop Plugin (作者 elasticsearch team)

简介:hadoop和elasticsearch的集成插件,可以通过hadoop的mapreduce算法来并行建立索引,同时支持cascading,hive和pig等框架。

AWS Cloud Plugin (作者 elasticsearch 团队)

简介:elasticsearch与amazon web services的集成。

ElasticSearch Mock Solr Plugin (作者 Matt Weber)

简介:elasticsearch的solr api接口。用了这个插件可以使用solr的api来调用es,直接用solrj就可以调用es。比较适用于从solr转es时暂时过度。

Suggester Plugin (作者 Alexander Reelsen)

简介:es 搜索提示功能插件,不过es0.9版本后自带了这个功能,

ElasticSearch PartialUpdate Plugin (作者 Medcl)

简介:elasticsearch的部分更新插件。

ZooKeeper Discovery Plugin (作者 Sonian Inc.)

简介:通过zookeeper管理集群的插件。通过这个插件,es的分布式架构和solrcloud相似。

ElasticSearch Changes Plugin (作者 Thomas Peuss)

简介:elasticsearch索引操作记录插件。通过这个插件可以查看用户对索引的增删改操作。

ElasticSearch View Plugin (作者 Tanguy Leroux)

简介:这个插件可以把es的文档以html,xml或text的方式显示出来,它也可以通过查询生成web页面。

ElasticSearch New Relic Plugin (作者 Vinicius Carvalho)

简介:elasticsearch和newrelic的集成插件。newrelica是一个性能监控工具。这个插件会把节点的状态数据传到newrelic的账号上。
社区的编辑器好像不支持复制富文本信息,所以插件都没有链接,插件太多懒得一个个打链接了,想点地址的可以移步寒舍http://www.kailing.pub/article/index/arcid/87.html
  收起阅读 »

Shanghai Elastic Meetup启动啦!

Shanghai Elastic Meetup

时间:2016年5月7日 13:30

地点:上海市徐汇区广元西路55号浩然科技大厦1808(交通大学内)

报名链接:

[Meetup](http://www.meetup.com/Shanghai ... 07915/)

[微信](https://jinshuju.net/f/Ed5I5o)

## 《ES用于时间序列存储 - Hickwall监控报警平台简介》

唐锐华  携程旅行网软件技术专家

简介:
  
  
  随着携程业务的扩张,新应用不断涌现,基础监控和应用监控的需求迅猛增长,zabbix已经不堪负重。在调研了很多开源的解决方案之后发觉或多或少都存在不太满意的地方。
  所以在借鉴多种方案的基础上重新设计开发了一套监控告警系统。其中对比过多种现有存储方案之后我们选择了ES。这里和大家分享一下这个系统和ES用在在我们场景中的优缺点。

提纲:

* 为什么会有这个项目
* 现有开源项目的调研
* 项目的整体设计与其特点
* ES在使用过程中碰到的问题

## 《ES在日志分析产品中的实践》

简介:

主要介绍如何在JAVA开发产品中使用ES,以及常用的ES JAVA接口,以及JAVA代码阅读的简单说明

2010年进入深圳天源迪科从事运营商业务系统相关的开发工作,期间做过软件开发,需求分析师,架构师等职务,后面2006年进入江苏保旺达从事安全产品相关的研发。从毕业到工作十几年的时间大部分都在做和技术相关的工作,本人非常热爱技术,热爱开发,现任赛克蓝德公司技术总监,从事数据分析领域相关产品的研发,现在主要研发日志分析产品(SeciLog)。

## 《Hangout: 一个logstash indexer的替代方案》
讲师简介:

刘佳  携程旅行网软件技术专家

简介:

logstash以其丰富的插件功能,成为ELK技术栈中不可或缺的一个组件。 但目前版本的logstash主要由jruby实现,在处理日志的吞吐量方面不尽如人意。 hangout是一个类logsatsh的java实现,提供了logstash里常用的filter功能。 这里分享一下hangout的特性,实际生产环境的吞吐量以及多实例的管理方式。

提纲:

* 为何开发hangout   
* 支持的filter
* 与logsatsh性能对比
* 影响吞吐量的主要参数及其含
* 用mesos+marathon管理hangout
    
继续阅读 »
Shanghai Elastic Meetup

时间:2016年5月7日 13:30

地点:上海市徐汇区广元西路55号浩然科技大厦1808(交通大学内)

报名链接:

[Meetup](http://www.meetup.com/Shanghai ... 07915/)

[微信](https://jinshuju.net/f/Ed5I5o)

## 《ES用于时间序列存储 - Hickwall监控报警平台简介》

唐锐华  携程旅行网软件技术专家

简介:
  
  
  随着携程业务的扩张,新应用不断涌现,基础监控和应用监控的需求迅猛增长,zabbix已经不堪负重。在调研了很多开源的解决方案之后发觉或多或少都存在不太满意的地方。
  所以在借鉴多种方案的基础上重新设计开发了一套监控告警系统。其中对比过多种现有存储方案之后我们选择了ES。这里和大家分享一下这个系统和ES用在在我们场景中的优缺点。

提纲:

* 为什么会有这个项目
* 现有开源项目的调研
* 项目的整体设计与其特点
* ES在使用过程中碰到的问题

## 《ES在日志分析产品中的实践》

简介:

主要介绍如何在JAVA开发产品中使用ES,以及常用的ES JAVA接口,以及JAVA代码阅读的简单说明

2010年进入深圳天源迪科从事运营商业务系统相关的开发工作,期间做过软件开发,需求分析师,架构师等职务,后面2006年进入江苏保旺达从事安全产品相关的研发。从毕业到工作十几年的时间大部分都在做和技术相关的工作,本人非常热爱技术,热爱开发,现任赛克蓝德公司技术总监,从事数据分析领域相关产品的研发,现在主要研发日志分析产品(SeciLog)。

## 《Hangout: 一个logstash indexer的替代方案》
讲师简介:

刘佳  携程旅行网软件技术专家

简介:

logstash以其丰富的插件功能,成为ELK技术栈中不可或缺的一个组件。 但目前版本的logstash主要由jruby实现,在处理日志的吞吐量方面不尽如人意。 hangout是一个类logsatsh的java实现,提供了logstash里常用的filter功能。 这里分享一下hangout的特性,实际生产环境的吞吐量以及多实例的管理方式。

提纲:

* 为何开发hangout   
* 支持的filter
* 与logsatsh性能对比
* 影响吞吐量的主要参数及其含
* 用mesos+marathon管理hangout
     收起阅读 »

java爬虫爬取Elastic中文社区用作es测试数据

前言
为了测试es的完美功能,笔者使用爬虫爬取了Elastic中文社区和CSDN的大量数据,作为测试之用,下面简单介绍一下折腾的过程
认识 WebCollector
WebCollector是一个无须配置、便于二次开发的JAVA爬虫框架(内核),它提供精简的的API,只需少量代码即可实现一个功能强大的爬虫。WebCollector-Hadoop是WebCollector的Hadoop版本,支持分布式爬取。
WebCollector致力于维护一个稳定、可扩的爬虫内核,便于开发者进行灵活的二次开发。内核具有很强的扩展性,用户可以在内核基础上开发自己想要的爬虫。源码中集成了Jsoup,可进行精准的网页解析。2.x版本中集成了selenium,可以处理javascript生成的数据。
官网地址:http://crawlscript.github.io/WebCollector/
使用步骤
导入jar依赖,笔者是maven项目,所有加入如下pom.xml依赖
ps:笔者这里是使用的最新版的,maven仓库目前最新版的是2.09,所以使用最新的就自己下载打包吧 
环境有了后,直接新建一个类继承BreadthCrawler类重新​visit方法,你的处理逻辑都在visit方法里面,下面楼主贴下我的代码
​爬取Elastic中文社区资源
/**
* Created by 小陈 on 2016/3/29.
*/
@Component
public class ElasticCrawler extends BreadthCrawler {
@Autowired
IpaDao ipaDao;
public ElasticCrawler() {
super("crawl", true);
/*start page*/
this.addSeed("xxx");
/*fetch url like http://news.hfut.edu.cn/show-xxxxxxhtml*/
this.addRegex("xxx");
/*do not fetch jpg|png|gif*/
this.addRegex("-.*\\.(jpg|png|gif).*");
/*do not fetch url contains #*/
// this.addRegex("-.*#.*");
}
@Override
public void visit(Page page, CrawlDatums next) {
String url = page.getUrl();
String content="";
try {
content = ContentExtractor.getContentByUrl(url);
}catch (Exception e){
e.printStackTrace();
}
/*抽取标题*/
String title=page.getDoc().title();
System.out.println("-------------------->"+title);
if(!title.isEmpty() && ! content.isEmpty()){
Pa pa=new Pa(title,content);
ipaDao.save(pa);//持久化到数据库
}
}
爬取CSDN资源
/**
* @author kl by 2016/3/29
* @boke www.kailing.pub
*/
@Component
public class CSDNCrawler extends BreadthCrawler {
@Autowired
IpaDao ipaDao;
public CSDNCrawler() {
super("crawl", true);
/*start page*/
this.addSeed("http://blog.csdn.net/.*");//添加种子地址
/*fetch url like http://news.hfut.edu.cn/show-xxxxxxhtml*/
this.addRegex("http://blog.csdn.net/.*/article/details/.*");
/*do not fetch jpg|png|gif*/
this.addRegex("-.*\\.(jpg|png|gif).*");
/*do not fetch url contains #*/
// this.addRegex("-.*#.*");
}
@Override
public void visit(Page page, CrawlDatums next) {
String url = page.getUrl();
String content="";
try {
content = ContentExtractor.getContentByUrl(url);
}catch (Exception e){
e.printStackTrace();
}
if (page.matchUrl("http://blog.csdn.net/.*/article/details/.*")) {
String title = page.select("div[class=article_title]").first().text();
String author = page.select("div[id=blog_userface]").first().text();//获取作者名
System.out.println("title:" + title + "\tauthor:" + author);
if(!title.isEmpty() && ! content.isEmpty()){
Pa pa=new Pa(title,content);
ipaDao.save(pa);
}
}
}
ps:Elastic中文社区的爬取规则和谐了,楼主是爱社区的,大家可以放心的爬CSDN吧,WebCollector功能很强大,爬虫的一个关键就是需要知道网站的url规则,有兴趣的可以研究​ 下,Elastic的数据不多,分吧钟就够了,CSDN爬了5,6分钟,没有做深度的爬,取了大概二三十万的数据样子,只取标题和正文 
 
去我博客查看原文 http://www.kailing.pub/article/index/arcid/86.html
下面是导入数据的截图

QQ图片20160329221750.png


QQ图片20160329221921.png

 
继续阅读 »
前言
为了测试es的完美功能,笔者使用爬虫爬取了Elastic中文社区和CSDN的大量数据,作为测试之用,下面简单介绍一下折腾的过程
认识 WebCollector
WebCollector是一个无须配置、便于二次开发的JAVA爬虫框架(内核),它提供精简的的API,只需少量代码即可实现一个功能强大的爬虫。WebCollector-Hadoop是WebCollector的Hadoop版本,支持分布式爬取。
WebCollector致力于维护一个稳定、可扩的爬虫内核,便于开发者进行灵活的二次开发。内核具有很强的扩展性,用户可以在内核基础上开发自己想要的爬虫。源码中集成了Jsoup,可进行精准的网页解析。2.x版本中集成了selenium,可以处理javascript生成的数据。
官网地址:http://crawlscript.github.io/WebCollector/
使用步骤
导入jar依赖,笔者是maven项目,所有加入如下pom.xml依赖
ps:笔者这里是使用的最新版的,maven仓库目前最新版的是2.09,所以使用最新的就自己下载打包吧 
环境有了后,直接新建一个类继承BreadthCrawler类重新​visit方法,你的处理逻辑都在visit方法里面,下面楼主贴下我的代码
​爬取Elastic中文社区资源
/**
* Created by 小陈 on 2016/3/29.
*/
@Component
public class ElasticCrawler extends BreadthCrawler {
@Autowired
IpaDao ipaDao;
public ElasticCrawler() {
super("crawl", true);
/*start page*/
this.addSeed("xxx");
/*fetch url like http://news.hfut.edu.cn/show-xxxxxxhtml*/
this.addRegex("xxx");
/*do not fetch jpg|png|gif*/
this.addRegex("-.*\\.(jpg|png|gif).*");
/*do not fetch url contains #*/
// this.addRegex("-.*#.*");
}
@Override
public void visit(Page page, CrawlDatums next) {
String url = page.getUrl();
String content="";
try {
content = ContentExtractor.getContentByUrl(url);
}catch (Exception e){
e.printStackTrace();
}
/*抽取标题*/
String title=page.getDoc().title();
System.out.println("-------------------->"+title);
if(!title.isEmpty() && ! content.isEmpty()){
Pa pa=new Pa(title,content);
ipaDao.save(pa);//持久化到数据库
}
}
爬取CSDN资源
/**
* @author kl by 2016/3/29
* @boke www.kailing.pub
*/
@Component
public class CSDNCrawler extends BreadthCrawler {
@Autowired
IpaDao ipaDao;
public CSDNCrawler() {
super("crawl", true);
/*start page*/
this.addSeed("http://blog.csdn.net/.*");//添加种子地址
/*fetch url like http://news.hfut.edu.cn/show-xxxxxxhtml*/
this.addRegex("http://blog.csdn.net/.*/article/details/.*");
/*do not fetch jpg|png|gif*/
this.addRegex("-.*\\.(jpg|png|gif).*");
/*do not fetch url contains #*/
// this.addRegex("-.*#.*");
}
@Override
public void visit(Page page, CrawlDatums next) {
String url = page.getUrl();
String content="";
try {
content = ContentExtractor.getContentByUrl(url);
}catch (Exception e){
e.printStackTrace();
}
if (page.matchUrl("http://blog.csdn.net/.*/article/details/.*")) {
String title = page.select("div[class=article_title]").first().text();
String author = page.select("div[id=blog_userface]").first().text();//获取作者名
System.out.println("title:" + title + "\tauthor:" + author);
if(!title.isEmpty() && ! content.isEmpty()){
Pa pa=new Pa(title,content);
ipaDao.save(pa);
}
}
}
ps:Elastic中文社区的爬取规则和谐了,楼主是爱社区的,大家可以放心的爬CSDN吧,WebCollector功能很强大,爬虫的一个关键就是需要知道网站的url规则,有兴趣的可以研究​ 下,Elastic的数据不多,分吧钟就够了,CSDN爬了5,6分钟,没有做深度的爬,取了大概二三十万的数据样子,只取标题和正文 
 
去我博客查看原文 http://www.kailing.pub/article/index/arcid/86.html
下面是导入数据的截图

QQ图片20160329221750.png


QQ图片20160329221921.png

  收起阅读 »

java使用HTTP Rest client 客户端Jest连接操作es,功能很强大

前言

在了解jest框架前,楼主一直尝试用官方的Elasticsearch java api连接es服务的,可是,不知何故,一直报如下的异常信息,谷歌了很久,都说是jvm版本不一致导致的问题,可我是本地测试的,jvm肯定是一致的,这个问题现在都木有解决,but,这怎么能阻止我探索es的脚步呢,so,让我发现了jest 这个框架   


org.elasticsearch.transport.RemoteTransportException: Failed to deserialize exception response from stream Caused by: org.elasticsearch.transport.TransportSerializationException: Failed to deserialize exception response from stream
我的测试代码是参考官方api实例的,官方api地址:Elasticsearch java api,代码如下:



Client client = new TransportClient().addTransportAddress(new InetSocketTransportAddress("127.0.0.1", 9300)); QueryBuilder queryBuilder = QueryBuilders.termQuery("content", "搜"); SearchResponse searchResponse = client.prepareSearch("indexdata").setTypes("fulltext") .setQuery(queryBuilder) .execute() .actionGet(); SearchHits hits = searchResponse.getHits(); System.out.println("查询到记录数:" + hits.getTotalHits()); SearchHit[] searchHists = hits.getHits(); for(SearchHit sh : searchHists){ System.out.println("content:"+sh.getSource().get("content")); } client.close();
如果有人知道怎么回事,告诉一下楼主吧,让楼主坑的明白,感激不尽了,我的es版本是2.2.0


进入正题

了解jest

jest是一个基于 HTTP Rest 的连接es服务的api工具集,功能强大,能够使用es java api的查询语句,项目是开源的,github地址:https://github.com/searchbox-io/Jest




我的测试用例

分词器:ik,分词器地址:https://github.com/medcl/elasticsearch-analysis-ik ,es的很多功能都是基于插件提供的,es版本升级都2.2.0后,安装插件的方式不一样了,如果你安装ik分词插件有问题,请点击右上角的qq联系博主

新建索引

curl -XPUT http://localhost:9200/indexdata


创建索引的mapping,指定分词器

curl -XPOST http://localhost:9200/indexdata/fulltext/_mapping

{
  "fulltext": {
    "_all": {
      "analyzer": "ik_max_word",
      "search_analyzer": "ik_max_word",
      "term_vector": "no",
      "store": "false"
    },
    "properties": {
      "content": {
        "type": "string",
        "store": "no",
        "term_vector": "with_positions_offsets",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_max_word",
        "include_in_all": "true",
        "boost": 8
      },
      "description": {
        "type": "string",
        "store": "no",
        "term_vector": "with_positions_offsets",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_max_word",
        "include_in_all": "true",
        "boost": 8
      },
      "title": {
        "type": "string",
        "store": "no",
        "term_vector": "with_positions_offsets",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_max_word",
        "include_in_all": "true",
        "boost": 8
      },
      "keyword": {
        "type": "string",
        "store": "no",
        "term_vector": "with_positions_offsets",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_max_word",
        "include_in_all": "true",
        "boost": 8
      }
    }
  }
}

mapping信息可以用head插件查看,如下


导入数据和查询,看代码吧


@RunWith(SpringJUnit4ClassRunner.class) @SpringApplicationConfiguration(classes = ElasticSearchTestApplication.class) public class JestTestApplicationTests { @Autowired private KlarticleDao klarticleDao; //得到JestClient实例 public JestClient getClient()throws Exception{ JestClientFactory factory = new JestClientFactory(); factory.setHttpClientConfig(new HttpClientConfig .Builder("http://127.0.0.1:9200&quot;) .multiThreaded(true) .build()); return factory.getObject(); } /** * 导入数据库数据到es * @throws Exception */ @Test public void contextLoads() throws Exception{ JestClient client=getClient(); Listlists=klarticleDao.findAll(); for(Klarticle k:lists){ Index index = new Index.Builder(k).index("indexdata").type("fulltext").id(k.getArcid()+"").build(); System.out.println("添加索引----》"+k.getTitle()); client.execute(index); } //批量新增的方式,效率更高 Bulk.Builder bulkBuilder = new Bulk.Builder(); for(Klarticle k:lists){ Index index = new Index.Builder(k).index("indexdata").type("fulltext").id(k.getArcid()+"").build(); bulkBuilder.addAction(index); } client.execute(bulkBuilder.build()); client.shutdownClient(); } //搜索测试 @Test public void JestSearchTest()throws Exception{ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchQuery("content", "搜索")); Search search = new Search.Builder(searchSourceBuilder.toString()) // multiple index or types can be added. .addIndex("indexdata") .build(); JestClient client =getClient(); SearchResult result= client.execute(search); // List> hits = result.getHits(Klarticle.class); Listarticles = result.getSourceAsObjectList(Klarticle.class); for(Klarticle k:articles){ System.out.println("------->:"+k.getTitle()); } } }下面是依赖的jar,maven项目<!--jest依赖--> <dependency> <groupId>io.searchbox</groupId> <artifactId>jest</artifactId> <version>2.0.0</version> </dependency> <!--jest 日志依赖--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>2.2.0</version> </dependency> </dependencies>
去我的博客查看原文:http://www.kailing.pub/article/index/arcid/84.html
继续阅读 »
前言

在了解jest框架前,楼主一直尝试用官方的Elasticsearch java api连接es服务的,可是,不知何故,一直报如下的异常信息,谷歌了很久,都说是jvm版本不一致导致的问题,可我是本地测试的,jvm肯定是一致的,这个问题现在都木有解决,but,这怎么能阻止我探索es的脚步呢,so,让我发现了jest 这个框架   


org.elasticsearch.transport.RemoteTransportException: Failed to deserialize exception response from stream Caused by: org.elasticsearch.transport.TransportSerializationException: Failed to deserialize exception response from stream
我的测试代码是参考官方api实例的,官方api地址:Elasticsearch java api,代码如下:



Client client = new TransportClient().addTransportAddress(new InetSocketTransportAddress("127.0.0.1", 9300)); QueryBuilder queryBuilder = QueryBuilders.termQuery("content", "搜"); SearchResponse searchResponse = client.prepareSearch("indexdata").setTypes("fulltext") .setQuery(queryBuilder) .execute() .actionGet(); SearchHits hits = searchResponse.getHits(); System.out.println("查询到记录数:" + hits.getTotalHits()); SearchHit[] searchHists = hits.getHits(); for(SearchHit sh : searchHists){ System.out.println("content:"+sh.getSource().get("content")); } client.close();
如果有人知道怎么回事,告诉一下楼主吧,让楼主坑的明白,感激不尽了,我的es版本是2.2.0


进入正题

了解jest

jest是一个基于 HTTP Rest 的连接es服务的api工具集,功能强大,能够使用es java api的查询语句,项目是开源的,github地址:https://github.com/searchbox-io/Jest




我的测试用例

分词器:ik,分词器地址:https://github.com/medcl/elasticsearch-analysis-ik ,es的很多功能都是基于插件提供的,es版本升级都2.2.0后,安装插件的方式不一样了,如果你安装ik分词插件有问题,请点击右上角的qq联系博主

新建索引

curl -XPUT http://localhost:9200/indexdata


创建索引的mapping,指定分词器

curl -XPOST http://localhost:9200/indexdata/fulltext/_mapping

{
  "fulltext": {
    "_all": {
      "analyzer": "ik_max_word",
      "search_analyzer": "ik_max_word",
      "term_vector": "no",
      "store": "false"
    },
    "properties": {
      "content": {
        "type": "string",
        "store": "no",
        "term_vector": "with_positions_offsets",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_max_word",
        "include_in_all": "true",
        "boost": 8
      },
      "description": {
        "type": "string",
        "store": "no",
        "term_vector": "with_positions_offsets",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_max_word",
        "include_in_all": "true",
        "boost": 8
      },
      "title": {
        "type": "string",
        "store": "no",
        "term_vector": "with_positions_offsets",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_max_word",
        "include_in_all": "true",
        "boost": 8
      },
      "keyword": {
        "type": "string",
        "store": "no",
        "term_vector": "with_positions_offsets",
        "analyzer": "ik_max_word",
        "search_analyzer": "ik_max_word",
        "include_in_all": "true",
        "boost": 8
      }
    }
  }
}

mapping信息可以用head插件查看,如下


导入数据和查询,看代码吧


@RunWith(SpringJUnit4ClassRunner.class) @SpringApplicationConfiguration(classes = ElasticSearchTestApplication.class) public class JestTestApplicationTests { @Autowired private KlarticleDao klarticleDao; //得到JestClient实例 public JestClient getClient()throws Exception{ JestClientFactory factory = new JestClientFactory(); factory.setHttpClientConfig(new HttpClientConfig .Builder("http://127.0.0.1:9200&quot;) .multiThreaded(true) .build()); return factory.getObject(); } /** * 导入数据库数据到es * @throws Exception */ @Test public void contextLoads() throws Exception{ JestClient client=getClient(); Listlists=klarticleDao.findAll(); for(Klarticle k:lists){ Index index = new Index.Builder(k).index("indexdata").type("fulltext").id(k.getArcid()+"").build(); System.out.println("添加索引----》"+k.getTitle()); client.execute(index); } //批量新增的方式,效率更高 Bulk.Builder bulkBuilder = new Bulk.Builder(); for(Klarticle k:lists){ Index index = new Index.Builder(k).index("indexdata").type("fulltext").id(k.getArcid()+"").build(); bulkBuilder.addAction(index); } client.execute(bulkBuilder.build()); client.shutdownClient(); } //搜索测试 @Test public void JestSearchTest()throws Exception{ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.matchQuery("content", "搜索")); Search search = new Search.Builder(searchSourceBuilder.toString()) // multiple index or types can be added. .addIndex("indexdata") .build(); JestClient client =getClient(); SearchResult result= client.execute(search); // List> hits = result.getHits(Klarticle.class); Listarticles = result.getSourceAsObjectList(Klarticle.class); for(Klarticle k:articles){ System.out.println("------->:"+k.getTitle()); } } }下面是依赖的jar,maven项目<!--jest依赖--> <dependency> <groupId>io.searchbox</groupId> <artifactId>jest</artifactId> <version>2.0.0</version> </dependency> <!--jest 日志依赖--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>2.2.0</version> </dependency> </dependencies>
去我的博客查看原文:http://www.kailing.pub/article/index/arcid/84.html 收起阅读 »

大家聊一聊使用的什么版本的Elasticsearch,看看Elasticsearch版本变化

我是最近从lucene过渡Elasticsearch的,直接用的最新的2.2.0版本的。发现离线安装插件的方式和以前不一样了,一些配置也有改变,最大的问题是java client api 连接报了如下的异常,我是参照官方api测试的,地址:https://www.elastic.co/guide/e ... .html
org.elasticsearch.transport.RemoteTransportException: Failed to deserialize exception response from stream
谷歌都说是服务和客户端的jvm不一致,我是本机环境测试的,所以,现在这个问题都还没解决,有遇到过的么,还是和版本有关系啊
继续阅读 »
我是最近从lucene过渡Elasticsearch的,直接用的最新的2.2.0版本的。发现离线安装插件的方式和以前不一样了,一些配置也有改变,最大的问题是java client api 连接报了如下的异常,我是参照官方api测试的,地址:https://www.elastic.co/guide/e ... .html
org.elasticsearch.transport.RemoteTransportException: Failed to deserialize exception response from stream
谷歌都说是服务和客户端的jvm不一致,我是本机环境测试的,所以,现在这个问题都还没解决,有遇到过的么,还是和版本有关系啊 收起阅读 »

Elastic线下交流活动走起来!

线上交流不过瘾?那就参加线下交流活动吧!
这里是搜罗的最新的线下交流活动预告:

 
大家分别找到组织报名参加吧,貌似有些还需要场地支持,大家一起出谋划策,把活动办起来吧。
继续阅读 »
线上交流不过瘾?那就参加线下交流活动吧!
这里是搜罗的最新的线下交流活动预告:

 
大家分别找到组织报名参加吧,貌似有些还需要场地支持,大家一起出谋划策,把活动办起来吧。 收起阅读 »

es索引模版配置不当导致的aggs聚合查询字段显示错误的问题

今天在es中对http日志的状态码status进行aggs搜索出现字段内容显示不正常的问题,记录过程:

http日志的情况:
1、http日志从logstash写入es时,状态码配置为status,其内容为 200 ,302 ,400 ,404等。
2、使用kibana对该日志的索引进行查询,在discover页面中显示的status内容跟logstash的内容一致,是正常的。

出现问题的场景:
(我这里使用的是kibana的sense插件进行的查询,如果直接使用curl python-ES也是一样的)
查询该索引:
POST http-2016.03.18/_search
{
  "fields": ["status"],
          "query":{
            "bool":{
              "must": [
                {
                  "range" : {
                    "@timestamp" : {"gte" : "now-5m"}
                  }
                }
              ]
            }
          },
          "_source": "false",
          "size": 0,
          "aggs": {
            "status_type": {
              "terms":{"field":"status"}
            }
          }
}

查询返回的结果中aggregations部分的内容:
"aggregations" : {
    "status_type" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [ {
        "key" : -56,
        "doc_count" : 376341
      }, {
        "key" : 46,
        "doc_count" : 51439
      }, {
        "key" : 45,
        "doc_count" : 5543
      }, {
        "key" : 48,
        "doc_count" : 1669
      }, {
        "key" : -108,
        "doc_count" : 1068
      }, {
        "key" : -50,
        "doc_count" : 11
      }, {
        "key" : -109,
        "doc_count" : 8
      }, {
        "key" : -112,
        "doc_count" : 4
      } 

寻找原因:
起先先去掉了查询的aggs部分,单独查询query的内容:
POST http-2016.03.18/_search
{
  "fields": ["status"],
          "query":{
            "bool":{
              "must": [
                {
                  "range" : {
                    "@timestamp" : {"gte" : "now-5m"}
                  }
                }
              ]
            }
          }
}

返回的结果中,hits显示的status字段内容是正常的:
"hits": {
    "total": 1242104,
    "max_score": 1,
    "hits": [
      {
        "_index": "http-2016.03.18",
        "_type": "log",
        "_id": "AVOI3EiwidwPAhB1e7gQ",
        "_score": 1,
        "fields": {
          "status": [
            "200"
          ]
        }
      }
    ......

然后查询了http索引的索引信息和模版配置:
GET /http-2016.03.18/
GET /_template/http
发现其中http的status的属性type类型的内容是byte :
        "properties": {
          "@timestamp": {
            "type": "date",
            "format": "strict_date_optional_time||epoch_millis"
          },
        ......
        ......
          "status": {
            "type": "byte"
          },
        ......
        ......

原因:
在aggs查询中发现了status字段显示错误的情况,status的type类型在es模版中定义成了byte类型,当status的值超过127后将出现溢出的情况,因此修改为short后,恢复了正常。
(对于http的状态码status,其type类型使用short已经足够了,如果使用integer,long或默认的string类型也是可以的,这里影响的是存储空间占用的大小。)
 
 
继续阅读 »
今天在es中对http日志的状态码status进行aggs搜索出现字段内容显示不正常的问题,记录过程:

http日志的情况:
1、http日志从logstash写入es时,状态码配置为status,其内容为 200 ,302 ,400 ,404等。
2、使用kibana对该日志的索引进行查询,在discover页面中显示的status内容跟logstash的内容一致,是正常的。

出现问题的场景:
(我这里使用的是kibana的sense插件进行的查询,如果直接使用curl python-ES也是一样的)
查询该索引:
POST http-2016.03.18/_search
{
  "fields": ["status"],
          "query":{
            "bool":{
              "must": [
                {
                  "range" : {
                    "@timestamp" : {"gte" : "now-5m"}
                  }
                }
              ]
            }
          },
          "_source": "false",
          "size": 0,
          "aggs": {
            "status_type": {
              "terms":{"field":"status"}
            }
          }
}

查询返回的结果中aggregations部分的内容:
"aggregations" : {
    "status_type" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [ {
        "key" : -56,
        "doc_count" : 376341
      }, {
        "key" : 46,
        "doc_count" : 51439
      }, {
        "key" : 45,
        "doc_count" : 5543
      }, {
        "key" : 48,
        "doc_count" : 1669
      }, {
        "key" : -108,
        "doc_count" : 1068
      }, {
        "key" : -50,
        "doc_count" : 11
      }, {
        "key" : -109,
        "doc_count" : 8
      }, {
        "key" : -112,
        "doc_count" : 4
      } 

寻找原因:
起先先去掉了查询的aggs部分,单独查询query的内容:
POST http-2016.03.18/_search
{
  "fields": ["status"],
          "query":{
            "bool":{
              "must": [
                {
                  "range" : {
                    "@timestamp" : {"gte" : "now-5m"}
                  }
                }
              ]
            }
          }
}

返回的结果中,hits显示的status字段内容是正常的:
"hits": {
    "total": 1242104,
    "max_score": 1,
    "hits": [
      {
        "_index": "http-2016.03.18",
        "_type": "log",
        "_id": "AVOI3EiwidwPAhB1e7gQ",
        "_score": 1,
        "fields": {
          "status": [
            "200"
          ]
        }
      }
    ......

然后查询了http索引的索引信息和模版配置:
GET /http-2016.03.18/
GET /_template/http
发现其中http的status的属性type类型的内容是byte :
        "properties": {
          "@timestamp": {
            "type": "date",
            "format": "strict_date_optional_time||epoch_millis"
          },
        ......
        ......
          "status": {
            "type": "byte"
          },
        ......
        ......

原因:
在aggs查询中发现了status字段显示错误的情况,status的type类型在es模版中定义成了byte类型,当status的值超过127后将出现溢出的情况,因此修改为short后,恢复了正常。
(对于http的状态码status,其type类型使用short已经足够了,如果使用integer,long或默认的string类型也是可以的,这里影响的是存储空间占用的大小。)
 
  收起阅读 »

使用 SQL 查询 Elasticsearch

我新写了一个用 SQL 查询 Elasticsearch 的工具 https://github.com/taowen/es-monitor,欢迎大家使用。详细的文档参见:https://segmentfault.com/a/1190000003502849
 
在此之前,有这么三个SQL查询Elasticsearch的工具:

 
Crate.io 的问题是它不是Elasticsearch,它的聚合是自己实现的版本,和Elasticsearch的Aggregation是两套东西。
http://sqltoelasticsearch.fr/ 语法支持很不晚上,同时 WHERE 和 GROUP BY 就翻译错了。
https://github.com/NLPchina/elasticsearch-sql 的问题在于其用Java来翻译SQL太笨拙了,如果要达到同样的SQL语法支持程度还要增加大量的Java代码。
如果只是支持SQL,很多Elasticsearch的功能是无法被充分释放的。比如Elasticsearch支持sub aggregation,每个sub aggregation就是OLAP里的下钻一次的概念。而且每下钻一次都可以有自己的指标计算。简单的SQL是无法表达这样的特性的。所以我扩充了一下SQL的语义,使得其更贴近Elasticsearch聚合的工作方式:
 
$ cat << EOF | ./es_query.py http://127.0.0.1:9200 
WITH SELECT MAX(market_cap) AS max_all_times FROM symbol AS all_symbols;
WITH SELECT MAX(market_cap) AS max_at_2000 FROM all_symbols WHERE ipo_year=2000 AS year_2000;
WITH SELECT MAX(market_cap) AS max_at_2001 FROM all_symbols WHERE ipo_year=2001 AS year_2001;
EOF
希望我的小工具可以帮到你
 
继续阅读 »
我新写了一个用 SQL 查询 Elasticsearch 的工具 https://github.com/taowen/es-monitor,欢迎大家使用。详细的文档参见:https://segmentfault.com/a/1190000003502849
 
在此之前,有这么三个SQL查询Elasticsearch的工具:

 
Crate.io 的问题是它不是Elasticsearch,它的聚合是自己实现的版本,和Elasticsearch的Aggregation是两套东西。
http://sqltoelasticsearch.fr/ 语法支持很不晚上,同时 WHERE 和 GROUP BY 就翻译错了。
https://github.com/NLPchina/elasticsearch-sql 的问题在于其用Java来翻译SQL太笨拙了,如果要达到同样的SQL语法支持程度还要增加大量的Java代码。
如果只是支持SQL,很多Elasticsearch的功能是无法被充分释放的。比如Elasticsearch支持sub aggregation,每个sub aggregation就是OLAP里的下钻一次的概念。而且每下钻一次都可以有自己的指标计算。简单的SQL是无法表达这样的特性的。所以我扩充了一下SQL的语义,使得其更贴近Elasticsearch聚合的工作方式:
 
$ cat << EOF | ./es_query.py http://127.0.0.1:9200 
WITH SELECT MAX(market_cap) AS max_all_times FROM symbol AS all_symbols;
WITH SELECT MAX(market_cap) AS max_at_2000 FROM all_symbols WHERE ipo_year=2000 AS year_2000;
WITH SELECT MAX(market_cap) AS max_at_2001 FROM all_symbols WHERE ipo_year=2001 AS year_2001;
EOF
希望我的小工具可以帮到你
  收起阅读 »

ElasticSearch2.1.1安装及简单配置说明


目前最新版ES超级详细的安装、配置流程。
根据自己真实的安装过程以及多篇博客文章的重要提示编写。
按照文档中的说明一步一步操作,分分钟就能开始ES2.1.1的非凡体验!

目前最新版ES超级详细的安装、配置流程。
根据自己真实的安装过程以及多篇博客文章的重要提示编写。
按照文档中的说明一步一步操作,分分钟就能开始ES2.1.1的非凡体验!

nest驱动IndexName问题

nest驱动访问ES,按照官网文档,使用如下程序可以正常索引:
static void Main()
        {
            var node = new Uri("http://localhost:9200&quot;);

            var settings = new ConnectionSettings(
                node,
                defaultIndex: "my-application"
            );
            var client = new ElasticClient(settings);
            var person = new Person
            {
                Id = "1",
                Firstname = "Martijn",
                Lastname = "Laarman"
            };
            var index = client.Index(person);
        }
调整为手动设置indexName时出错,示例代码如下:
static void Main()
        {
            var node = new Uri("http://localhost:9200&quot;);

            var settings = new ConnectionSettings(
                node,
                defaultIndex: "my-application"
            );
            settings.MapDefaultTypeIndices(d => d.Add(typeof(Person), "constIndex"));
            var client = new ElasticClient(settings);
            var person = new Person
            {
                Id = "1",
                Firstname = "Martijn",
                Lastname = "Laarman"
            };
            var index = client.Index(person);
        }
出错提示为:
{StatusCode: 400,
 Method: PUT,
 Url: http://localhost:9200/constIndex/automobile/1,
 Request: {
  "firstname": "Martijn",
  "lastname": "Laarman",
  "id": "1"
},
 Response: <Response stream not captured or already read to completion by serializer, set ExposeRawResponse() on connectionsettings to force it to be set on>}
 
 
using System;
using Nest;

namespace ConsoleApplication1
{
class Program
{
private static IIndexResponse Index<T>(T person, string indexName) where T : class
{
var node = new Uri("http://localhost:9200&quot;);

var settings = new ConnectionSettings(node,defaultIndex: indexName);
var client = new ElasticClient(settings);
return client.Index(person);
}

static void Main()
{
string indexNameError = typeof(Person).FullName
.Substring(typeof(Person).FullName.LastIndexOf(".", StringComparison.Ordinal) + 1) + "Indexs";
const string indexNameOk = "test";
var person = new Person
{
Id = "1",
Firstname = "Martijn",
Lastname = "Laarman"
};
var ok = Index(person, indexNameOk);
Console.WriteLine("Result:" + ok.IsValid);
var error = Index(person, indexNameError);
Console.WriteLine("Result:" + error.IsValid);
Console.ReadKey();
}

}

[Serializable]
public class Person
{
public string Firstname { get; set; }
public string Lastname { get; set; }
public string Id { get; set; }
}
}

 
继续阅读 »
nest驱动访问ES,按照官网文档,使用如下程序可以正常索引:
static void Main()
        {
            var node = new Uri("http://localhost:9200&quot;);

            var settings = new ConnectionSettings(
                node,
                defaultIndex: "my-application"
            );
            var client = new ElasticClient(settings);
            var person = new Person
            {
                Id = "1",
                Firstname = "Martijn",
                Lastname = "Laarman"
            };
            var index = client.Index(person);
        }
调整为手动设置indexName时出错,示例代码如下:
static void Main()
        {
            var node = new Uri("http://localhost:9200&quot;);

            var settings = new ConnectionSettings(
                node,
                defaultIndex: "my-application"
            );
            settings.MapDefaultTypeIndices(d => d.Add(typeof(Person), "constIndex"));
            var client = new ElasticClient(settings);
            var person = new Person
            {
                Id = "1",
                Firstname = "Martijn",
                Lastname = "Laarman"
            };
            var index = client.Index(person);
        }
出错提示为:
{StatusCode: 400,
 Method: PUT,
 Url: http://localhost:9200/constIndex/automobile/1,
 Request: {
  "firstname": "Martijn",
  "lastname": "Laarman",
  "id": "1"
},
 Response: <Response stream not captured or already read to completion by serializer, set ExposeRawResponse() on connectionsettings to force it to be set on>}
 
 
using System;
using Nest;

namespace ConsoleApplication1
{
class Program
{
private static IIndexResponse Index<T>(T person, string indexName) where T : class
{
var node = new Uri("http://localhost:9200&quot;);

var settings = new ConnectionSettings(node,defaultIndex: indexName);
var client = new ElasticClient(settings);
return client.Index(person);
}

static void Main()
{
string indexNameError = typeof(Person).FullName
.Substring(typeof(Person).FullName.LastIndexOf(".", StringComparison.Ordinal) + 1) + "Indexs";
const string indexNameOk = "test";
var person = new Person
{
Id = "1",
Firstname = "Martijn",
Lastname = "Laarman"
};
var ok = Index(person, indexNameOk);
Console.WriteLine("Result:" + ok.IsValid);
var error = Index(person, indexNameError);
Console.WriteLine("Result:" + error.IsValid);
Console.ReadKey();
}

}

[Serializable]
public class Person
{
public string Firstname { get; set; }
public string Lastname { get; set; }
public string Id { get; set; }
}
}

  收起阅读 »

Packetbeat协议扩展开发教程(3)

 书接上回:http://elasticsearch.cn/article/53
 
前面介绍了Packetbeat的项目结构,今天终于要开始写代码了,想想还是有点小激动呢。(你快点吧,拖半天了)
网络传输两大协议TCP和UDP,我们的所有协议都不离这两种,HTTP、MySQL走的是TCP传输协议,DNS走的是UDP协议,在Packetbeat里面,实现一个自己的协议非常简单,继承并实现这两者对应的接口就行了,我们看一下长什么样:
打开一个现有的UDP和HTTP协议接口定义:
/~/go/src/github.com/elastic/beats/packetbeat/protos/protos.go
// Functions to be exported by a protocol plugin
type ProtocolPlugin interface {
// Called to initialize the Plugin
Init(test_mode bool, results publisher.Client) error

// Called to return the configured ports
GetPorts() int
}

type TcpProtocolPlugin interface {
ProtocolPlugin

// Called when TCP payload data is available for parsing.
Parse(pkt *Packet, tcptuple *common.TcpTuple,
dir uint8, private ProtocolData) ProtocolData

// Called when the FIN flag is seen in the TCP stream.
ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private ProtocolData) ProtocolData

// Called when a packets are missing from the tcp
// stream.
GapInStream(tcptuple *common.TcpTuple, dir uint8, nbytes int,
private ProtocolData) (priv ProtocolData, drop bool)

// ConnectionTimeout returns the per stream connection timeout.
// Return <=0 to set default tcp module transaction timeout.
ConnectionTimeout() time.Duration
}

type UdpProtocolPlugin interface {
ProtocolPlugin

// ParseUdp is invoked when UDP payload data is available for parsing.
ParseUdp(pkt *Packet)
}
TcpProtocolPlugin:TCP协议插件的接口定义,依次是:Parse() 解析Packet,ReceivedFin()处理TCP断开连接,GapInStream()处理空包丢包,ConnectionTimeout()超时时间;
UdpProtocolPlugin: UDP协议的接口定义,UDP协议是不需要握手和保障数据可靠性的,扔出去就结束,速度快,不保证数据可靠送达,所以只有ParseUdp一个方法需要实现,比较简单;
ProtocolPlugin:TCP和UDP都需要实现ProtocolPlugin的基础接口,其实就定义了获取端口和初始化接口。

请问:
Packetbeat怎么工作的?

回答:
每一个协议都有一个固定的端口用于通信,你要做的事情就是定义协议端口,然后按协议是TCP还是UDP来实现对应的接口,Packetbeat将会截获指定端口的数据包(Packet),然后如果交给你定义的方法来进行解析,TCP是Parse,UDP是ParseUdp,都在上面的接口定义好的,然后将解析出来的结构化数据封装成Json,然后扔给Elasticsearch,后续的就的如何对这些数据做一些有趣的分析和应用了。

貌似很简单嘛!

进入每个端口的数据包,我们假设是一个自来水管,拧开80端口,哗啦啦出来的全是HTTP请求的数据包,Packetbeat里面Http协议监听的是80端口啊,所有这些包统统都交给Packetbeat里面的Http协议模块来进行解析,Http协议会一个个的检查这些数据包,也就是每个数据包都会调用一次Parse接口,到这里提到了传过来一个Packet,我们看看它的数据结构长什么样?
type Packet struct {
Ts time.Time
Tuple common.IpPortTuple
Payload byte
}
Packet结构简单,
Ts是收到数据包的时间戳;
Tuple是一个来源IP+来源端口和目的IP+目的端口的元组;
Payload就是这个包里面的传输的有用的数据,应用层的字节数据,不包括IP和TCP/UDP头信息,是不是处理起来简单许多。

首选我们确定SMTP协议的配置,每个协议在packetbeat.yml的protocol下面都应该有一个配置节点,如下:
protocols:
smtp:
# Configure the ports where to listen for Smtp traffic. You can disable
# the Smtp protocol by commenting out the list of ports.
ports: [25]
还需要在对应的config类文件:packetbeat/config/config.go,增加SMTP的结构体,目前只支持一个端口参数,继承基类ProtocolCommon就行,如下:
git diff config/config.go
@@ -42,6 +42,7 @@ type Protocols struct {
Pgsql Pgsql
Redis Redis
Thrift Thrift
+ Smtp Smtp
}

type Dns struct {
@@ -118,5 +119,9 @@ type Redis struct {
Send_response *bool
}

+type Smtp struct {
+ ProtocolCommon `yaml:",inline"`
+}
+
// Config Singleton
var ConfigSingleton Config
在protos文件夹下面,新增smtp目录,并新增空白文件smtp.go,路径:packetbeat/protos/smtp/smtp.go,
这里就是解析SMTP协议的地方,也是我们扩展协议的主要的工作。
...TODO...
修改protos/protos.go,增加SMTP协议枚举,这里记得保证顺序一致,并且protocol名称必须和配置的节点名称一致,如这里都是smtp。
git diff protos/protos.go
@@ -103,6 +103,7 @@ const (
MongodbProtocol
DnsProtocol
MemcacheProtocol
+ SmtpProtocol
)

// Protocol names
@@ -116,6 +117,7 @@ var ProtocolNames = string{
"mongodb",
"dns",
"memcache",
+ "smtp",
}

继续修改packetbeat.go主文件,允许SMTP协议并加载。
git diff packetbeat.go
@@ -27,6 +27,7 @@ import (
"github.com/elastic/packetbeat/protos/tcp"
"github.com/elastic/packetbeat/protos/thrift"
"github.com/elastic/packetbeat/protos/udp"
+ "github.com/elastic/packetbeat/protos/smtp"
"github.com/elastic/packetbeat/sniffer"
)

@@ -43,6 +44,7 @@ var EnabledProtocolPlugins map[protos.Protocol]protos.ProtocolPlugin = map[proto
protos.ThriftProtocol: new(thrift.Thrift),
protos.MongodbProtocol: new(mongodb.Mongodb),
protos.DnsProtocol: new(dns.Dns),
+ protos.SmtpProtocol: new(smtp.Smtp),
}

做完上面一系列修改之后,一个空白的SMTP协议的插件的架子就搭好了,并且插件也注册到了Packetbeat里面了,接下来我们再把packetbeat/protos/smtp/smtp.go按照TCPplugin接口的要求实现一下。

说实话TCP处理起来很难,开始之前,我们先明确几个概念,TCP协议是有状态的,并且是流式的,我们关注的是七层应用层的消息,如HTTP里面的一个HTTP请求和返回,但是TCP底层都是一系列数据包,并且不同的请求的数据包是混杂在一起的,也就是说一个数据包里面可能只是一个HTTP请求的一部分也可能包含多条HTTP请求的一部分,所以Parse()里面需要处理跨数据包的状态信息,我们要把这些数据包和具体的七层的应用层的消息关联起来。

现在我们仔细看看Parse()接口的各个参数定义是做什么用的
Parse(pkt *Packet, tcptuple *common.TcpTuple,
dir uint8, private ProtocolData) ProtocolData

pkt不用说了,是送进来的数据包,前面已经介绍了其数据结构,tcptuple是该数据包所属的TCP数据流所在的唯一标示(一个未关闭的TCP数据量包含若干数据包,直到TCP链接关闭),使用tcptuple.Hashable()获取唯一值;dir参数标示数据包在TCP数据流中的流向,和第一个TCP数据包方向一致是TcpDirectionOriginal,否则是TcpDirectionReverse;private参数可用来在TCP流中存储状态信息,可在运行时转换成具体的强类型,任意修改和传递给下一个Parse方法,简单来说就是进行中间数据的共享。

下面看段MySQL模块里面的例子
 priv := mysqlPrivateData{}
if private != nil {
var ok bool
priv, ok = private.(mysqlPrivateData)
if !ok {
priv = mysqlPrivateData{}
}
}

[ ... ]

return priv
上面的代码就是将private强制转换成mysqlPrivateData结构,然后再使用。
我们再继续看后续怎么处理这些包的一个逻辑例子
ok, complete := mysqlMessageParser(priv.Data[dir])
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.Data[dir] = nil
logp.Debug("mysql", "Ignore MySQL message. Drop tcp stream.")
return priv
}

if complete {
mysql.messageComplete(tcptuple, dir, stream)
} else {
// wait for more data
break
}
mysqlMessageParser是一个解析mysql消息的方法,细节我们忽略,我们只需要关心它的返回,ok标示成功或者失败,true则继续处理,false表示数据包不能用,那就直接忽略;第二个参数complete表示判断这一个MySQL消息是否已经完整了,如果完整了,我们就可以扔出去了,否则继续等待剩下的消息内容。

好的,我们看看SMTP协议怎么折腾吧,先看看一个邮件交互的流程图,来自RFC5321

由上图可见,发送端和邮件服务器通过一系列命令来执行邮件的发送,下面看看一个具体的命令操作流程(来源:简单邮件传输协议)[/url]
S: 220 www.example.com ESMTP Postfix
C: HELO mydomain.com
S: 250 Hello mydomain.com
C: MAIL FROM:
S: 250 Ok
C: RCPT TO:
S: 250 Ok
C: DATA
S: 354 End data with .
C: Subject: test message
C: From:""< sender@mydomain.com>
C: To:""< friend@example.com>
C:
C: Hello,
C: This is a test.
C: Goodbye.
C: .
S: 250 Ok: queued as 12345
C: quit
S: 221 Bye
上面的过程可以看到就几个命令就能将邮件发送出去,但是其实SMTP协议比较复杂,还包括身份认证、附件、多媒体编码等等,我们今天精简一下,我们目前只关心谁给谁发了邮件,发送内容先不管,这样相比完整的SMTP协议(RFC5321),我们只需要关注以下几个命令:
MAIL:开始一份邮件 mail from: xxx@xx.com
RCPT: 标识单个的邮件接收人;常在mail命令后面 可有多个rcpt to: xx@xx.com
QUIT:结束SMTP会话,不一定发送了邮件,注意
RESET:重置会话,当前传输被取消 

最终希望通过Packetbeat将这些数据解析并处理成我们想要的如下JSON数据,即大功告成:
{
"timestamp":"2016-1-15 12:00:00",
"from":"medcl@example.co",
"to":["lcdem@example.co"]
}
我们还需要一个测试数据,这里有一个下载各种协议测试数据包的地方,由wireshark站点提供:https://wiki.wireshark.org/SampleCaptures/
Ctrl+F找到SMTP的下载地址:smtp.pcap
用wireshark打开我们刚刚下载的smtp.pcap文件,然后再输入过滤条件:tcp.port == 25,只看25端口的数据,如下图:

上图可以看到25端口的跑的数据有很多,不过我们只关心我们需要的那几个命令就好了。

打开/~/go/src/github.com/elastic/beats/packetbeat/protos/smtp/smtp.go
定义smtpPrivateData,里面的Data是一个数组,分别是TCP两个方向的数据,SmtpMessage是解析出来的邮件信息
type smtpPrivateData struct{
Data [2]*SmtpStream
}

type SmtpStream struct {
tcptuple *common.TcpTuple

data byte

parseOffset int
isClient bool
message *SmtpMessage
}

type SmtpMessage struct {
Ts time.Time
From string
To string
}
然后参照MySQL协议,定义相应的方法,最终如下:
package smtp

import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"
"bytes"
"time"
"strings"
)

type smtpPrivateData struct{
Data [2]*SmtpStream
}

type SmtpStream struct {
tcptuple *common.TcpTuple

data byte

parseOffset int
isClient bool

message *SmtpMessage
}

type SmtpMessage struct {
start int
end int

Ts time.Time
From string
To string
IgnoreMessage bool
}

type Smtp struct {
SendRequest bool
SendResponse bool
transactionTimeout time.Duration
Ports int
results publisher.Client
}

func (smtp *Smtp) initDefaults() {
smtp.SendRequest = false
smtp.SendResponse = false
smtp.transactionTimeout = protos.DefaultTransactionExpiration
}

func (smtp *Smtp) setFromConfig(config config.Smtp) error {
smtp.Ports = config.Ports
if config.SendRequest != nil {
smtp.SendRequest = *config.SendRequest
}
if config.SendResponse != nil {
smtp.SendResponse = *config.SendResponse
}

if config.TransactionTimeout != nil && *config.TransactionTimeout > 0 {
smtp.transactionTimeout = time.Duration(*config.TransactionTimeout) * time.Second
}

return nil
}

func (smtp *Smtp) GetPorts() int {
return smtp.Ports
}

func (smtp *Smtp) Init(test_mode bool, results publisher.Client) error {
smtp.initDefaults()

if !test_mode {
err := smtp.setFromConfig(config.ConfigSingleton.Protocols.Smtp)
if err != nil {
return err
}
}
smtp.results = results

return nil
}

func readLine(data byte, offset int) (bool, string, int) {
q := bytes.Index(data[offset:], byte("\r\n"))
if q == -1 {
return false, "", 0
}
return true, string(data[offset : offset+q]), offset + q + 2
}

func (smtp *Smtp) Parse(pkt *protos.Packet, tcptuple *common.TcpTuple, dir uint8, private protos.ProtocolData, ) protos.ProtocolData {

defer logp.Recover("ParseSmtp exception")

priv := smtpPrivateData{}
if private != nil {
var ok bool
priv, ok = private.(smtpPrivateData)
if !ok {
priv = smtpPrivateData{}
}
}

if priv.Data[dir] == nil {
priv.Data[dir] = &SmtpStream{
tcptuple: tcptuple,
data: pkt.Payload,
message: &SmtpMessage{Ts: pkt.Ts},
}
} else {
// concatenate bytes
priv.Data[dir].data = append(priv.Data[dir].data, pkt.Payload...)
if len(priv.Data[dir].data) > tcp.TCP_MAX_DATA_IN_STREAM {
logp.Debug("smtp", "Stream data too large, dropping TCP stream")
priv.Data[dir] = nil
return priv
}
}

stream := priv.Data[dir]
for len(stream.data) > 0 {
if stream.message == nil {
stream.message = &SmtpMessage{Ts: pkt.Ts}
}

ok, complete := stmpMessageParser(priv.Data[dir])
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.Data[dir] = nil
logp.Debug("smtp", "Ignore SMTP message. Drop tcp stream. Try parsing with the next segment")
return priv
}

if complete {
smtp.messageComplete(tcptuple, dir, stream)
} else {
logp.Debug("smtp","still wait message...")
// wait for more data
break
}
}

return priv
}

func (smtp *Smtp) ConnectionTimeout() time.Duration {
return smtp.transactionTimeout
}

func stmpMessageParser(s *SmtpStream) (bool, bool) {

var value string=""

for s.parseOffset < len(s.data) {


logp.Debug("smtp", "Parse message: %s", string(s.data[s.parseOffset]))


if strings.HasPrefix(string(s.data[s.parseOffset]),"MAIL" ) {

logp.Debug("smtp", "Hit MAIL command: %s", string(s.data[s.parseOffset]))

found, line, off := readLine(s.data, s.parseOffset)
if !found {
return true, false
}

value = line[1:]
logp.Debug("smtp", "value %s", value)

s.parseOffset = off
} else {
logp.Debug("smtp", "Unexpected message starting with %s", s.data[s.parseOffset:])
return false, false
}
}

return true, false
}

func handleSmtp(stmp *Smtp, m *SmtpMessage, tcptuple *common.TcpTuple,
dir uint8, raw_msg byte) {
logp.Info("smtp","handle smtp message...")

//TODO

}

// Called when the parser has identified a full message.
func (smtp *Smtp) messageComplete(tcptuple *common.TcpTuple, dir uint8, stream *SmtpStream) {

logp.Info("smtp","message completed...")

// all ok, ship it
msg := stream.data[stream.message.start:stream.message.end]

if !stream.message.IgnoreMessage {
handleSmtp(smtp, stream.message, tcptuple, dir, msg)
}

// and reset message
stream.PrepareForNewMessage()
}

func (stream *SmtpStream) PrepareForNewMessage() {
logp.Info("smtp","prepare for new message...")

stream.data = stream.data[stream.parseOffset:]
stream.parseOffset = 0
stream.isClient = false
stream.message = nil
}



func (smtp *Smtp) GapInStream(tcptuple *common.TcpTuple, dir uint8,
nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) {

defer logp.Recover("GapInStream(smtp) exception")

if private == nil {
return private, false
}

return private, true
}

func (smtp *Smtp) ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private protos.ProtocolData) protos.ProtocolData {

logp.Info("smtp","stream closed...")

// TODO: check if we have data pending and either drop it to free
// memory or send it up the stack.
return private
}

现在切换到命令行,编译一下
cd ~/go/src/github.com/elastic/beats/packetbeat
make

编译成功,一个滚烫的packetbeat可执行文件就躺在当前目录下了,运行一下先,参数-I 指定pcap文件(还记得前面下载的那个测试文件吧)
./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap  -e -N

运行查看控制台输出结果:
➜  packetbeat git:(smtpbeat) ✗ ./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap  -e -N 
2016/01/15 10:12:19.058535 publish.go:191: INFO Dry run mode. All output types except the file based one are disabled.
2016/01/15 10:12:19.058570 geolite.go:24: INFO GeoIP disabled: No paths were set under output.geoip.paths
2016/01/15 10:12:19.058592 publish.go:262: INFO Publisher name: medcls-MacBook.local
2016/01/15 10:12:19.058724 beat.go:145: INFO Init Beat: packetbeat; Version: 1.0.0
2016/01/15 10:12:19.059758 beat.go:171: INFO packetbeat sucessfully setup. Start running.
2016/01/15 10:12:20.155335 smtp.go:163: DBG Parse message: 2
2016/01/15 10:12:20.155416 smtp.go:180: DBG Unexpected message starting with 250-xc90.websitewelcome.com Hello GP [122.162.143.157]
250-SIZE 52428800
250-PIPELINING
250-AUTH PLAIN LOGIN
250-STARTTLS
250 HELP
2016/01/15 10:12:22.310974 smtp.go:163: DBG Parse message: F
2016/01/15 10:12:22.311025 smtp.go:180: DBG Unexpected message starting with From: "Gurpartap Singh"
To:
Subject: SMTP
Date: Mon, 5 Oct 2009 11:36:07 +0530
Message-ID: <000301ca4581$ef9e57f0$cedb07d0$@in>
MIME-Version: 1.0
...

成功了,邮件内容都在控制台输出了,但这还不是我们要的最终结果,我需要里面的关键信息,我们继续修改smtp.go这个文件。
留待下回分解。
继续阅读 »
 书接上回:http://elasticsearch.cn/article/53
 
前面介绍了Packetbeat的项目结构,今天终于要开始写代码了,想想还是有点小激动呢。(你快点吧,拖半天了)
网络传输两大协议TCP和UDP,我们的所有协议都不离这两种,HTTP、MySQL走的是TCP传输协议,DNS走的是UDP协议,在Packetbeat里面,实现一个自己的协议非常简单,继承并实现这两者对应的接口就行了,我们看一下长什么样:
打开一个现有的UDP和HTTP协议接口定义:
/~/go/src/github.com/elastic/beats/packetbeat/protos/protos.go
// Functions to be exported by a protocol plugin
type ProtocolPlugin interface {
// Called to initialize the Plugin
Init(test_mode bool, results publisher.Client) error

// Called to return the configured ports
GetPorts() int
}

type TcpProtocolPlugin interface {
ProtocolPlugin

// Called when TCP payload data is available for parsing.
Parse(pkt *Packet, tcptuple *common.TcpTuple,
dir uint8, private ProtocolData) ProtocolData

// Called when the FIN flag is seen in the TCP stream.
ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private ProtocolData) ProtocolData

// Called when a packets are missing from the tcp
// stream.
GapInStream(tcptuple *common.TcpTuple, dir uint8, nbytes int,
private ProtocolData) (priv ProtocolData, drop bool)

// ConnectionTimeout returns the per stream connection timeout.
// Return <=0 to set default tcp module transaction timeout.
ConnectionTimeout() time.Duration
}

type UdpProtocolPlugin interface {
ProtocolPlugin

// ParseUdp is invoked when UDP payload data is available for parsing.
ParseUdp(pkt *Packet)
}
TcpProtocolPlugin:TCP协议插件的接口定义,依次是:Parse() 解析Packet,ReceivedFin()处理TCP断开连接,GapInStream()处理空包丢包,ConnectionTimeout()超时时间;
UdpProtocolPlugin: UDP协议的接口定义,UDP协议是不需要握手和保障数据可靠性的,扔出去就结束,速度快,不保证数据可靠送达,所以只有ParseUdp一个方法需要实现,比较简单;
ProtocolPlugin:TCP和UDP都需要实现ProtocolPlugin的基础接口,其实就定义了获取端口和初始化接口。

请问:
Packetbeat怎么工作的?

回答:
每一个协议都有一个固定的端口用于通信,你要做的事情就是定义协议端口,然后按协议是TCP还是UDP来实现对应的接口,Packetbeat将会截获指定端口的数据包(Packet),然后如果交给你定义的方法来进行解析,TCP是Parse,UDP是ParseUdp,都在上面的接口定义好的,然后将解析出来的结构化数据封装成Json,然后扔给Elasticsearch,后续的就的如何对这些数据做一些有趣的分析和应用了。

貌似很简单嘛!

进入每个端口的数据包,我们假设是一个自来水管,拧开80端口,哗啦啦出来的全是HTTP请求的数据包,Packetbeat里面Http协议监听的是80端口啊,所有这些包统统都交给Packetbeat里面的Http协议模块来进行解析,Http协议会一个个的检查这些数据包,也就是每个数据包都会调用一次Parse接口,到这里提到了传过来一个Packet,我们看看它的数据结构长什么样?
type Packet struct {
Ts time.Time
Tuple common.IpPortTuple
Payload byte
}
Packet结构简单,
Ts是收到数据包的时间戳;
Tuple是一个来源IP+来源端口和目的IP+目的端口的元组;
Payload就是这个包里面的传输的有用的数据,应用层的字节数据,不包括IP和TCP/UDP头信息,是不是处理起来简单许多。

首选我们确定SMTP协议的配置,每个协议在packetbeat.yml的protocol下面都应该有一个配置节点,如下:
protocols:
smtp:
# Configure the ports where to listen for Smtp traffic. You can disable
# the Smtp protocol by commenting out the list of ports.
ports: [25]
还需要在对应的config类文件:packetbeat/config/config.go,增加SMTP的结构体,目前只支持一个端口参数,继承基类ProtocolCommon就行,如下:
git diff config/config.go
@@ -42,6 +42,7 @@ type Protocols struct {
Pgsql Pgsql
Redis Redis
Thrift Thrift
+ Smtp Smtp
}

type Dns struct {
@@ -118,5 +119,9 @@ type Redis struct {
Send_response *bool
}

+type Smtp struct {
+ ProtocolCommon `yaml:",inline"`
+}
+
// Config Singleton
var ConfigSingleton Config
在protos文件夹下面,新增smtp目录,并新增空白文件smtp.go,路径:packetbeat/protos/smtp/smtp.go,
这里就是解析SMTP协议的地方,也是我们扩展协议的主要的工作。
...TODO...
修改protos/protos.go,增加SMTP协议枚举,这里记得保证顺序一致,并且protocol名称必须和配置的节点名称一致,如这里都是smtp。
git diff protos/protos.go
@@ -103,6 +103,7 @@ const (
MongodbProtocol
DnsProtocol
MemcacheProtocol
+ SmtpProtocol
)

// Protocol names
@@ -116,6 +117,7 @@ var ProtocolNames = string{
"mongodb",
"dns",
"memcache",
+ "smtp",
}

继续修改packetbeat.go主文件,允许SMTP协议并加载。
git diff packetbeat.go
@@ -27,6 +27,7 @@ import (
"github.com/elastic/packetbeat/protos/tcp"
"github.com/elastic/packetbeat/protos/thrift"
"github.com/elastic/packetbeat/protos/udp"
+ "github.com/elastic/packetbeat/protos/smtp"
"github.com/elastic/packetbeat/sniffer"
)

@@ -43,6 +44,7 @@ var EnabledProtocolPlugins map[protos.Protocol]protos.ProtocolPlugin = map[proto
protos.ThriftProtocol: new(thrift.Thrift),
protos.MongodbProtocol: new(mongodb.Mongodb),
protos.DnsProtocol: new(dns.Dns),
+ protos.SmtpProtocol: new(smtp.Smtp),
}

做完上面一系列修改之后,一个空白的SMTP协议的插件的架子就搭好了,并且插件也注册到了Packetbeat里面了,接下来我们再把packetbeat/protos/smtp/smtp.go按照TCPplugin接口的要求实现一下。

说实话TCP处理起来很难,开始之前,我们先明确几个概念,TCP协议是有状态的,并且是流式的,我们关注的是七层应用层的消息,如HTTP里面的一个HTTP请求和返回,但是TCP底层都是一系列数据包,并且不同的请求的数据包是混杂在一起的,也就是说一个数据包里面可能只是一个HTTP请求的一部分也可能包含多条HTTP请求的一部分,所以Parse()里面需要处理跨数据包的状态信息,我们要把这些数据包和具体的七层的应用层的消息关联起来。

现在我们仔细看看Parse()接口的各个参数定义是做什么用的
Parse(pkt *Packet, tcptuple *common.TcpTuple,
dir uint8, private ProtocolData) ProtocolData

pkt不用说了,是送进来的数据包,前面已经介绍了其数据结构,tcptuple是该数据包所属的TCP数据流所在的唯一标示(一个未关闭的TCP数据量包含若干数据包,直到TCP链接关闭),使用tcptuple.Hashable()获取唯一值;dir参数标示数据包在TCP数据流中的流向,和第一个TCP数据包方向一致是TcpDirectionOriginal,否则是TcpDirectionReverse;private参数可用来在TCP流中存储状态信息,可在运行时转换成具体的强类型,任意修改和传递给下一个Parse方法,简单来说就是进行中间数据的共享。

下面看段MySQL模块里面的例子
 priv := mysqlPrivateData{}
if private != nil {
var ok bool
priv, ok = private.(mysqlPrivateData)
if !ok {
priv = mysqlPrivateData{}
}
}

[ ... ]

return priv
上面的代码就是将private强制转换成mysqlPrivateData结构,然后再使用。
我们再继续看后续怎么处理这些包的一个逻辑例子
ok, complete := mysqlMessageParser(priv.Data[dir])
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.Data[dir] = nil
logp.Debug("mysql", "Ignore MySQL message. Drop tcp stream.")
return priv
}

if complete {
mysql.messageComplete(tcptuple, dir, stream)
} else {
// wait for more data
break
}
mysqlMessageParser是一个解析mysql消息的方法,细节我们忽略,我们只需要关心它的返回,ok标示成功或者失败,true则继续处理,false表示数据包不能用,那就直接忽略;第二个参数complete表示判断这一个MySQL消息是否已经完整了,如果完整了,我们就可以扔出去了,否则继续等待剩下的消息内容。

好的,我们看看SMTP协议怎么折腾吧,先看看一个邮件交互的流程图,来自RFC5321

由上图可见,发送端和邮件服务器通过一系列命令来执行邮件的发送,下面看看一个具体的命令操作流程(来源:简单邮件传输协议)[/url]
S: 220 www.example.com ESMTP Postfix
C: HELO mydomain.com
S: 250 Hello mydomain.com
C: MAIL FROM:
S: 250 Ok
C: RCPT TO:
S: 250 Ok
C: DATA
S: 354 End data with .
C: Subject: test message
C: From:""< sender@mydomain.com>
C: To:""< friend@example.com>
C:
C: Hello,
C: This is a test.
C: Goodbye.
C: .
S: 250 Ok: queued as 12345
C: quit
S: 221 Bye
上面的过程可以看到就几个命令就能将邮件发送出去,但是其实SMTP协议比较复杂,还包括身份认证、附件、多媒体编码等等,我们今天精简一下,我们目前只关心谁给谁发了邮件,发送内容先不管,这样相比完整的SMTP协议(RFC5321),我们只需要关注以下几个命令:
MAIL:开始一份邮件 mail from: xxx@xx.com
RCPT: 标识单个的邮件接收人;常在mail命令后面 可有多个rcpt to: xx@xx.com
QUIT:结束SMTP会话,不一定发送了邮件,注意
RESET:重置会话,当前传输被取消 

最终希望通过Packetbeat将这些数据解析并处理成我们想要的如下JSON数据,即大功告成:
{
"timestamp":"2016-1-15 12:00:00",
"from":"medcl@example.co",
"to":["lcdem@example.co"]
}
我们还需要一个测试数据,这里有一个下载各种协议测试数据包的地方,由wireshark站点提供:https://wiki.wireshark.org/SampleCaptures/
Ctrl+F找到SMTP的下载地址:smtp.pcap
用wireshark打开我们刚刚下载的smtp.pcap文件,然后再输入过滤条件:tcp.port == 25,只看25端口的数据,如下图:

上图可以看到25端口的跑的数据有很多,不过我们只关心我们需要的那几个命令就好了。

打开/~/go/src/github.com/elastic/beats/packetbeat/protos/smtp/smtp.go
定义smtpPrivateData,里面的Data是一个数组,分别是TCP两个方向的数据,SmtpMessage是解析出来的邮件信息
type smtpPrivateData struct{
Data [2]*SmtpStream
}

type SmtpStream struct {
tcptuple *common.TcpTuple

data byte

parseOffset int
isClient bool
message *SmtpMessage
}

type SmtpMessage struct {
Ts time.Time
From string
To string
}
然后参照MySQL协议,定义相应的方法,最终如下:
package smtp

import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/packetbeat/config"
"github.com/elastic/beats/packetbeat/protos"
"github.com/elastic/beats/packetbeat/protos/tcp"
"bytes"
"time"
"strings"
)

type smtpPrivateData struct{
Data [2]*SmtpStream
}

type SmtpStream struct {
tcptuple *common.TcpTuple

data byte

parseOffset int
isClient bool

message *SmtpMessage
}

type SmtpMessage struct {
start int
end int

Ts time.Time
From string
To string
IgnoreMessage bool
}

type Smtp struct {
SendRequest bool
SendResponse bool
transactionTimeout time.Duration
Ports int
results publisher.Client
}

func (smtp *Smtp) initDefaults() {
smtp.SendRequest = false
smtp.SendResponse = false
smtp.transactionTimeout = protos.DefaultTransactionExpiration
}

func (smtp *Smtp) setFromConfig(config config.Smtp) error {
smtp.Ports = config.Ports
if config.SendRequest != nil {
smtp.SendRequest = *config.SendRequest
}
if config.SendResponse != nil {
smtp.SendResponse = *config.SendResponse
}

if config.TransactionTimeout != nil && *config.TransactionTimeout > 0 {
smtp.transactionTimeout = time.Duration(*config.TransactionTimeout) * time.Second
}

return nil
}

func (smtp *Smtp) GetPorts() int {
return smtp.Ports
}

func (smtp *Smtp) Init(test_mode bool, results publisher.Client) error {
smtp.initDefaults()

if !test_mode {
err := smtp.setFromConfig(config.ConfigSingleton.Protocols.Smtp)
if err != nil {
return err
}
}
smtp.results = results

return nil
}

func readLine(data byte, offset int) (bool, string, int) {
q := bytes.Index(data[offset:], byte("\r\n"))
if q == -1 {
return false, "", 0
}
return true, string(data[offset : offset+q]), offset + q + 2
}

func (smtp *Smtp) Parse(pkt *protos.Packet, tcptuple *common.TcpTuple, dir uint8, private protos.ProtocolData, ) protos.ProtocolData {

defer logp.Recover("ParseSmtp exception")

priv := smtpPrivateData{}
if private != nil {
var ok bool
priv, ok = private.(smtpPrivateData)
if !ok {
priv = smtpPrivateData{}
}
}

if priv.Data[dir] == nil {
priv.Data[dir] = &SmtpStream{
tcptuple: tcptuple,
data: pkt.Payload,
message: &SmtpMessage{Ts: pkt.Ts},
}
} else {
// concatenate bytes
priv.Data[dir].data = append(priv.Data[dir].data, pkt.Payload...)
if len(priv.Data[dir].data) > tcp.TCP_MAX_DATA_IN_STREAM {
logp.Debug("smtp", "Stream data too large, dropping TCP stream")
priv.Data[dir] = nil
return priv
}
}

stream := priv.Data[dir]
for len(stream.data) > 0 {
if stream.message == nil {
stream.message = &SmtpMessage{Ts: pkt.Ts}
}

ok, complete := stmpMessageParser(priv.Data[dir])
if !ok {
// drop this tcp stream. Will retry parsing with the next
// segment in it
priv.Data[dir] = nil
logp.Debug("smtp", "Ignore SMTP message. Drop tcp stream. Try parsing with the next segment")
return priv
}

if complete {
smtp.messageComplete(tcptuple, dir, stream)
} else {
logp.Debug("smtp","still wait message...")
// wait for more data
break
}
}

return priv
}

func (smtp *Smtp) ConnectionTimeout() time.Duration {
return smtp.transactionTimeout
}

func stmpMessageParser(s *SmtpStream) (bool, bool) {

var value string=""

for s.parseOffset < len(s.data) {


logp.Debug("smtp", "Parse message: %s", string(s.data[s.parseOffset]))


if strings.HasPrefix(string(s.data[s.parseOffset]),"MAIL" ) {

logp.Debug("smtp", "Hit MAIL command: %s", string(s.data[s.parseOffset]))

found, line, off := readLine(s.data, s.parseOffset)
if !found {
return true, false
}

value = line[1:]
logp.Debug("smtp", "value %s", value)

s.parseOffset = off
} else {
logp.Debug("smtp", "Unexpected message starting with %s", s.data[s.parseOffset:])
return false, false
}
}

return true, false
}

func handleSmtp(stmp *Smtp, m *SmtpMessage, tcptuple *common.TcpTuple,
dir uint8, raw_msg byte) {
logp.Info("smtp","handle smtp message...")

//TODO

}

// Called when the parser has identified a full message.
func (smtp *Smtp) messageComplete(tcptuple *common.TcpTuple, dir uint8, stream *SmtpStream) {

logp.Info("smtp","message completed...")

// all ok, ship it
msg := stream.data[stream.message.start:stream.message.end]

if !stream.message.IgnoreMessage {
handleSmtp(smtp, stream.message, tcptuple, dir, msg)
}

// and reset message
stream.PrepareForNewMessage()
}

func (stream *SmtpStream) PrepareForNewMessage() {
logp.Info("smtp","prepare for new message...")

stream.data = stream.data[stream.parseOffset:]
stream.parseOffset = 0
stream.isClient = false
stream.message = nil
}



func (smtp *Smtp) GapInStream(tcptuple *common.TcpTuple, dir uint8,
nbytes int, private protos.ProtocolData) (priv protos.ProtocolData, drop bool) {

defer logp.Recover("GapInStream(smtp) exception")

if private == nil {
return private, false
}

return private, true
}

func (smtp *Smtp) ReceivedFin(tcptuple *common.TcpTuple, dir uint8,
private protos.ProtocolData) protos.ProtocolData {

logp.Info("smtp","stream closed...")

// TODO: check if we have data pending and either drop it to free
// memory or send it up the stack.
return private
}

现在切换到命令行,编译一下
cd ~/go/src/github.com/elastic/beats/packetbeat
make

编译成功,一个滚烫的packetbeat可执行文件就躺在当前目录下了,运行一下先,参数-I 指定pcap文件(还记得前面下载的那个测试文件吧)
./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap  -e -N

运行查看控制台输出结果:
➜  packetbeat git:(smtpbeat) ✗ ./packetbeat -d "smtp" -c etc/packetbeat.yml -I ~/Downloads/smtp.pcap  -e -N 
2016/01/15 10:12:19.058535 publish.go:191: INFO Dry run mode. All output types except the file based one are disabled.
2016/01/15 10:12:19.058570 geolite.go:24: INFO GeoIP disabled: No paths were set under output.geoip.paths
2016/01/15 10:12:19.058592 publish.go:262: INFO Publisher name: medcls-MacBook.local
2016/01/15 10:12:19.058724 beat.go:145: INFO Init Beat: packetbeat; Version: 1.0.0
2016/01/15 10:12:19.059758 beat.go:171: INFO packetbeat sucessfully setup. Start running.
2016/01/15 10:12:20.155335 smtp.go:163: DBG Parse message: 2
2016/01/15 10:12:20.155416 smtp.go:180: DBG Unexpected message starting with 250-xc90.websitewelcome.com Hello GP [122.162.143.157]
250-SIZE 52428800
250-PIPELINING
250-AUTH PLAIN LOGIN
250-STARTTLS
250 HELP
2016/01/15 10:12:22.310974 smtp.go:163: DBG Parse message: F
2016/01/15 10:12:22.311025 smtp.go:180: DBG Unexpected message starting with From: "Gurpartap Singh"
To:
Subject: SMTP
Date: Mon, 5 Oct 2009 11:36:07 +0530
Message-ID: <000301ca4581$ef9e57f0$cedb07d0$@in>
MIME-Version: 1.0
...

成功了,邮件内容都在控制台输出了,但这还不是我们要的最终结果,我需要里面的关键信息,我们继续修改smtp.go这个文件。
留待下回分解。 收起阅读 »

Packetbeat协议扩展开发教程(2)

书接上回:http://elasticsearch.cn/article/48

我们打开Packetbeat项目,看看里面长什么样:



现在beats项目都合并在一起了,第一级可以看到各个子项目:
/libbeat: 公共依赖;
/filebeat: 替代Logstash-forwarder,处理日志类型数据;
/packetbeat: 本文扩展重点,网络抓包;
/topbeat: 监控系统性能;
/winlogbeat: 监控windows下面的日志信息;
/vender: 依赖的第三方库;
/tests: 用于测试的pcamp抓包文件,非常有用;
/scripts: 一些用于开发和测试的Docker脚本文件;

现在重点看看/packetbeat下面目录都有些什么:
/packetbeat/main.go: 启动入口,里面没有什么逻辑;
/packetbeat/beat/: 里面就一个packetbeat.go文件,packetbeat主程序,处理配置和命令行参数,协议需要在这里进行注册;
/packetbeat/config/: 里面就一个config.go文件,定义了所有的配置相关的struct结构体,新协议需要在这里定义其配置的结构体;
/packetbeat/debian/: debian打包相关;
/packetbeat/decoder/: 解码类,网络传输层包的解码;
/packetbeat/docs/: 项目的相关文档;
/packetbeat/etc/: 示例配置文件;
/packetbeat/procs/: 获取系统内核运作状态与进程信息的工具类;
/packetbeat/protos/:自定义协议类,每个目录对应一个应用协议,我们需要在此新增我们的协议,如SMTP;
/packetbeat/sniffer/: 三种不同抓包方式的实现:pcap、af_packet、pf_ring,关于这三者的区别,请参照文档:Traffic Capturing Options;
/packetbeat/tests/: 测试相关的文件,里面有每一个协议的pcab抓包样板,还有一堆Python测试脚本;

知道项目的大概架构就知道从哪下手了,下节分解。
继续阅读 »
书接上回:http://elasticsearch.cn/article/48

我们打开Packetbeat项目,看看里面长什么样:



现在beats项目都合并在一起了,第一级可以看到各个子项目:
/libbeat: 公共依赖;
/filebeat: 替代Logstash-forwarder,处理日志类型数据;
/packetbeat: 本文扩展重点,网络抓包;
/topbeat: 监控系统性能;
/winlogbeat: 监控windows下面的日志信息;
/vender: 依赖的第三方库;
/tests: 用于测试的pcamp抓包文件,非常有用;
/scripts: 一些用于开发和测试的Docker脚本文件;

现在重点看看/packetbeat下面目录都有些什么:
/packetbeat/main.go: 启动入口,里面没有什么逻辑;
/packetbeat/beat/: 里面就一个packetbeat.go文件,packetbeat主程序,处理配置和命令行参数,协议需要在这里进行注册;
/packetbeat/config/: 里面就一个config.go文件,定义了所有的配置相关的struct结构体,新协议需要在这里定义其配置的结构体;
/packetbeat/debian/: debian打包相关;
/packetbeat/decoder/: 解码类,网络传输层包的解码;
/packetbeat/docs/: 项目的相关文档;
/packetbeat/etc/: 示例配置文件;
/packetbeat/procs/: 获取系统内核运作状态与进程信息的工具类;
/packetbeat/protos/:自定义协议类,每个目录对应一个应用协议,我们需要在此新增我们的协议,如SMTP;
/packetbeat/sniffer/: 三种不同抓包方式的实现:pcap、af_packet、pf_ring,关于这三者的区别,请参照文档:Traffic Capturing Options;
/packetbeat/tests/: 测试相关的文件,里面有每一个协议的pcab抓包样板,还有一堆Python测试脚本;

知道项目的大概架构就知道从哪下手了,下节分解。 收起阅读 »

一个把数据从MySQL同步到Elasticsearch的工具

https://github.com/zhongbiaode ... -sync
这个工具用python实现,主要使用了mysqldump输出xml进行初次同步,以及binlog进行增量同步,欢迎试用以及提出修改意见。
最近刚刚更新了中文文档。
继续阅读 »
https://github.com/zhongbiaode ... -sync
这个工具用python实现,主要使用了mysqldump输出xml进行初次同步,以及binlog进行增量同步,欢迎试用以及提出修改意见。
最近刚刚更新了中文文档。 收起阅读 »