行动是治愈恐惧的良药,而犹豫、拖延将不断滋养恐惧。

通过elasticsearch-mapper attachment插件实现文件建立索引

1.安装elasticsearch-mapper attachment

bin/plugin install elasticsearch/elasticsearch-mapper-attachments/3.1.1
 2.按照插件官方文档来测试
3.插件需要手动把文档内容转化为base64编码然后建立索引,代码如下
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.tika.Tika;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.pdf.PDFParser;
import org.apache.tika.sax.BodyContentHandler;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.xml.sax.ContentHandler;

import com.spatial4j.core.io.ParseUtils;

import static org.elasticsearch.common.xcontent.XContentFactory.*;
public class sysfiles {
    public static void main(String[] args) throws Exception{
        sys();
}

    private static void sys() throws IOException {
        // TODO Auto-generated method stub
        String idxName = "test";
        String idxType = "attachments";
        Settings settings =ImmutableSettings.settingsBuilder().put("cluster.name","az_bsms_elasticsearch").build();
        Client client=new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress("127.0.0.1", 9300));
       String data64=org.elasticsearch.common.Base64.encodeFromFile(filepath);
        XContentBuilder source = jsonBuilder().startObject()
            .field("file", data64)
                .field("text", data64)
                .endObject();

        String id = "file"+11;
        IndexResponse idxResp = client.prepareIndex().setIndex(idxName).setType(idxType).setId(id)
                .setSource(source).setRefresh(true).execute().actionGet();
        System.out.println(idxResp);
        client.close();
    }
4.按官方文档正常的搜索就可以了
继续阅读 »
1.安装elasticsearch-mapper attachment

bin/plugin install elasticsearch/elasticsearch-mapper-attachments/3.1.1
 2.按照插件官方文档来测试
3.插件需要手动把文档内容转化为base64编码然后建立索引,代码如下
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.tika.Tika;
import org.apache.tika.config.TikaConfig;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.AutoDetectParser;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.parser.pdf.PDFParser;
import org.apache.tika.sax.BodyContentHandler;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.xml.sax.ContentHandler;

import com.spatial4j.core.io.ParseUtils;

import static org.elasticsearch.common.xcontent.XContentFactory.*;
public class sysfiles {
    public static void main(String[] args) throws Exception{
        sys();
}

    private static void sys() throws IOException {
        // TODO Auto-generated method stub
        String idxName = "test";
        String idxType = "attachments";
        Settings settings =ImmutableSettings.settingsBuilder().put("cluster.name","az_bsms_elasticsearch").build();
        Client client=new TransportClient(settings).addTransportAddress(new InetSocketTransportAddress("127.0.0.1", 9300));
       String data64=org.elasticsearch.common.Base64.encodeFromFile(filepath);
        XContentBuilder source = jsonBuilder().startObject()
            .field("file", data64)
                .field("text", data64)
                .endObject();

        String id = "file"+11;
        IndexResponse idxResp = client.prepareIndex().setIndex(idxName).setType(idxType).setId(id)
                .setSource(source).setRefresh(true).execute().actionGet();
        System.out.println(idxResp);
        client.close();
    }
4.按官方文档正常的搜索就可以了 收起阅读 »

Packetbeat协议扩展开发教程(1)

Packetbeat(https://www.elastic.co/products/beats/packetbeat
是一个开源的网络抓包与分析框架,内置了很多常见的协议解析,如HTPP、MySQL、Thrift等。但是网络协议有很多,如何扩展一个自己的协议呢,本文将为您介绍如何在Packetbeat基础上扩展实现您自己的协议。

开发环境:
1.Go语言
Packetbeat是由Go语言编写,具有高性能和易部署的特点,有关Go语言的更多信息请访问:https://golang.org/
2.Git
源码管理,相信大家都比较熟悉了。
3.Tcpdump
*nix下的抓包分析,可选,用于调试。
4.Mac本一台
Windows太伤,不建议。
5.IDE
推荐idea,其它只要你顺手都行。

这个教程给大家介绍的是编写一个SMTP协议的扩展,SMTP就是我们发邮件使用的协议,加密的比较麻烦,为了方便,本教程使用不加密的名文传输的SMTP协议,默认对应端口是25。

A.源码签出
登陆Github打开https://github.com/elastic/beats 

fork后得到你自己的仓库,比如我的:https://github.com/medcl/packetbeat 
#创建相应目录
mkdir -p $GOPATH/src/github.com/elastic/ 
cd $GOPATH/src/github.com/elastic

#签出源码
git clone https://github.com/elastic/beats.git
cd beats

#修改官方仓库为upstream源,设置自己的仓库为origin源
git remote rename origin upstream
git remote add origin git@github.com:medcl/packetbeat.git

#获取上游最新的代码,如果是刚fork的话可不用管
git pull upstream master

#签出一个名为smtpbeat的分支,用于开发这个功能
git checkout -b smtpbeat

#切换到packetbeat模块
cd packetbeat

#获取依赖信息
(mkdir -p $GOPATH/src/golang.org/x/&&cd $GOPATH/src/golang.org/x &&git clone https://github.com/golang/tools.git )
go get github.com/tools/godep

#编译
make

编译出来的文件:packetbeat就在根目录
现在我们测试一下
修改etc/packetbeat.yml,在output下面的elasticsearch下面添加enabled: true,默认是不启用的,另外如果你的Elasticsearch安装了Shield,比如我的Elasticsearch的用户名和密码都是tribe_user,哦,忘了说了,我们的Elasticsearch跑在本机。
packetbeat.yml的详细配置可参见:https://www.elastic.co/guide/e ... .html 
output:
elasticsearch:
enabled: true
hosts: ["localhost:9200"]
username: "tribe_user"
password: "tribe_user"

现在可以运行命令启动packetbeat了,默认会监听所有内置的协议,如HTTP、DNS等。
./packetbeat -e -c etc/packetbeat.yml  -d "publish"

介绍一下常用的参数:
-N dry run模式,不实际output存储日志
-e 控制台输出调试日志
-d 仅显示对应logger的日志

好的,我们打开几个网页,控制台会有相应的输出,如下:
2015/12/29 14:24:39.965037 preprocess.go:37: DBG  Start Preprocessing
2015/12/29 14:24:39.965366 publish.go:98: DBG Publish: {
"@timestamp": "2015-12-29T14:24:39.709Z",
"beat": {
"hostname": "medcls-MacBook.local",
"name": "medcls-MacBook.local"
},
"bytes_in": 31,
"bytes_out": 115,
"client_ip": "192.168.3.10",
"client_port": 53669,
"client_proc": "",
"client_server": "",
"count": 1,
"direction": "out",
"dns": {
"additionals_count": 0,
"answers": [
{
"class": "IN",
"data": "www.a.shifen.com",
"name": "sp2.baidu.com",
"ttl": 333,
"type": "CNAME"
}
],
"answers_count": 1,
"authorities": [
{
"class": "IN",
"data": "ns1.a.shifen.com",
"expire": 86400,
"minimum": 3600,
"name": "a.shifen.com",
"refresh": 5,
"retry": 5,
"rname": "baidu_dns_master.baidu.com",
"serial": 1512240003,
"ttl": 12,
"type": "SOA"
}
],
"authorities_count": 1,
"flags": {
"authoritative": false,
"recursion_allowed": true,
"recursion_desired": true,
"truncated_response": false
},
"id": 7435,
"op_code": "QUERY",
"question": {
"class": "IN",
"name": "sp2.baidu.com",
"type": "AAAA"
},
"response_code": "NOERROR"
},
"ip": "192.168.3.1",
"method": "QUERY",
"port": 53,
"proc": "",
"query": "class IN, type AAAA, sp2.baidu.com",
"resource": "sp2.baidu.com",
"responsetime": 18,
"server": "",
"status": "OK",
"transport": "udp",
"type": "dns"
}
2015/12/29 14:24:39.965774 preprocess.go:94: DBG Forward preprocessed events
2015/12/29 14:24:39.965796 async.go:42: DBG async forward to outputers (1)
2015/12/29 14:24:40.099973 output.go:103: DBG output worker: publish 2 events

然后Elasticsearch应该就会有数据进去了,我们看看:
curl http://localhost:9200/_cat/indices\?pretty\=true -u tribe_user:tribe_user
yellow open packetbeat-2015.12.29 5 1 135 0 561.2kb 561.2kb

至此,packetbeat源码的build成功,我们整个开发流程已经跑通了,下一节正式开始介绍SMTP协议的扩展。
继续阅读 »
Packetbeat(https://www.elastic.co/products/beats/packetbeat
是一个开源的网络抓包与分析框架,内置了很多常见的协议解析,如HTPP、MySQL、Thrift等。但是网络协议有很多,如何扩展一个自己的协议呢,本文将为您介绍如何在Packetbeat基础上扩展实现您自己的协议。

开发环境:
1.Go语言
Packetbeat是由Go语言编写,具有高性能和易部署的特点,有关Go语言的更多信息请访问:https://golang.org/
2.Git
源码管理,相信大家都比较熟悉了。
3.Tcpdump
*nix下的抓包分析,可选,用于调试。
4.Mac本一台
Windows太伤,不建议。
5.IDE
推荐idea,其它只要你顺手都行。

这个教程给大家介绍的是编写一个SMTP协议的扩展,SMTP就是我们发邮件使用的协议,加密的比较麻烦,为了方便,本教程使用不加密的名文传输的SMTP协议,默认对应端口是25。

A.源码签出
登陆Github打开https://github.com/elastic/beats 

fork后得到你自己的仓库,比如我的:https://github.com/medcl/packetbeat 
#创建相应目录
mkdir -p $GOPATH/src/github.com/elastic/ 
cd $GOPATH/src/github.com/elastic

#签出源码
git clone https://github.com/elastic/beats.git
cd beats

#修改官方仓库为upstream源,设置自己的仓库为origin源
git remote rename origin upstream
git remote add origin git@github.com:medcl/packetbeat.git

#获取上游最新的代码,如果是刚fork的话可不用管
git pull upstream master

#签出一个名为smtpbeat的分支,用于开发这个功能
git checkout -b smtpbeat

#切换到packetbeat模块
cd packetbeat

#获取依赖信息
(mkdir -p $GOPATH/src/golang.org/x/&&cd $GOPATH/src/golang.org/x &&git clone https://github.com/golang/tools.git )
go get github.com/tools/godep

#编译
make

编译出来的文件:packetbeat就在根目录
现在我们测试一下
修改etc/packetbeat.yml,在output下面的elasticsearch下面添加enabled: true,默认是不启用的,另外如果你的Elasticsearch安装了Shield,比如我的Elasticsearch的用户名和密码都是tribe_user,哦,忘了说了,我们的Elasticsearch跑在本机。
packetbeat.yml的详细配置可参见:https://www.elastic.co/guide/e ... .html 
output:
elasticsearch:
enabled: true
hosts: ["localhost:9200"]
username: "tribe_user"
password: "tribe_user"

现在可以运行命令启动packetbeat了,默认会监听所有内置的协议,如HTTP、DNS等。
./packetbeat -e -c etc/packetbeat.yml  -d "publish"

介绍一下常用的参数:
-N dry run模式,不实际output存储日志
-e 控制台输出调试日志
-d 仅显示对应logger的日志

好的,我们打开几个网页,控制台会有相应的输出,如下:
2015/12/29 14:24:39.965037 preprocess.go:37: DBG  Start Preprocessing
2015/12/29 14:24:39.965366 publish.go:98: DBG Publish: {
"@timestamp": "2015-12-29T14:24:39.709Z",
"beat": {
"hostname": "medcls-MacBook.local",
"name": "medcls-MacBook.local"
},
"bytes_in": 31,
"bytes_out": 115,
"client_ip": "192.168.3.10",
"client_port": 53669,
"client_proc": "",
"client_server": "",
"count": 1,
"direction": "out",
"dns": {
"additionals_count": 0,
"answers": [
{
"class": "IN",
"data": "www.a.shifen.com",
"name": "sp2.baidu.com",
"ttl": 333,
"type": "CNAME"
}
],
"answers_count": 1,
"authorities": [
{
"class": "IN",
"data": "ns1.a.shifen.com",
"expire": 86400,
"minimum": 3600,
"name": "a.shifen.com",
"refresh": 5,
"retry": 5,
"rname": "baidu_dns_master.baidu.com",
"serial": 1512240003,
"ttl": 12,
"type": "SOA"
}
],
"authorities_count": 1,
"flags": {
"authoritative": false,
"recursion_allowed": true,
"recursion_desired": true,
"truncated_response": false
},
"id": 7435,
"op_code": "QUERY",
"question": {
"class": "IN",
"name": "sp2.baidu.com",
"type": "AAAA"
},
"response_code": "NOERROR"
},
"ip": "192.168.3.1",
"method": "QUERY",
"port": 53,
"proc": "",
"query": "class IN, type AAAA, sp2.baidu.com",
"resource": "sp2.baidu.com",
"responsetime": 18,
"server": "",
"status": "OK",
"transport": "udp",
"type": "dns"
}
2015/12/29 14:24:39.965774 preprocess.go:94: DBG Forward preprocessed events
2015/12/29 14:24:39.965796 async.go:42: DBG async forward to outputers (1)
2015/12/29 14:24:40.099973 output.go:103: DBG output worker: publish 2 events

然后Elasticsearch应该就会有数据进去了,我们看看:
curl http://localhost:9200/_cat/indices\?pretty\=true -u tribe_user:tribe_user
yellow open packetbeat-2015.12.29 5 1 135 0 561.2kb 561.2kb

至此,packetbeat源码的build成功,我们整个开发流程已经跑通了,下一节正式开始介绍SMTP协议的扩展。 收起阅读 »

关于提示TooManyClauses[maxClauseCount is set to 1024]的问题。

今天头一次出现的,其实也不算什么问题。
通过数据库获取到了1126个条件数据,然后叠加进bool进行查询,直接抛出个异常:
TooManyClauses[maxClauseCount is set to 1024]
问了Medcl大神,得知是超过默认搜索条件大小的问题,可以通过参数修改
index.query.bool.max_clause_count: 4096
M大也说,太BT了。。。 这么多条件查询。。。
我也觉得挺BT的,自己想想都有点小激动,太佩服自己了。。。
继续阅读 »
今天头一次出现的,其实也不算什么问题。
通过数据库获取到了1126个条件数据,然后叠加进bool进行查询,直接抛出个异常:
TooManyClauses[maxClauseCount is set to 1024]
问了Medcl大神,得知是超过默认搜索条件大小的问题,可以通过参数修改
index.query.bool.max_clause_count: 4096
M大也说,太BT了。。。 这么多条件查询。。。
我也觉得挺BT的,自己想想都有点小激动,太佩服自己了。。。 收起阅读 »

Day24: Elasticsearch添加Shield后TransportClient如何连接?

Shield是Elasticsearch一个安全防护插件,提供了权限访问控制和日志审计功能,企业可以很方便的和LDAP或是ActiveDirectory进行集成,重用现有的安全认证体系.

shield-triad.png


Elasticsearch使用了Shield后,Elasticsearch就需要权限才能访问了,和默认的调用方式有些不同,下面简单介绍一下HTTP和TCP两种方式的连接.

关于Shield的安装和配置我这里不就具体介绍,创建了一个用户名和密码都是tribe_user的用户,权限是admin.

1.HTTP方式
现在直接访问es的http接口就会报错

curl http://localhost:9200

{"error":{"root_cause":[{"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}}],"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}},"status":401}

shield支持HttpBasic验证,所以正确的访问姿势是:

curl -u tribe_user:tribe_user http://localhost:9200 { "name" : "Melter", "cluster_name" : "elasticsearch", "version" : { "number" : "2.1.1", "build_hash" : "805c528f3167980046f224310f9147fa745e5371", "build_timestamp" : "2015-12-09T20:23:16Z", "build_snapshot" : false, "lucene_version" : "5.3.1" }, "tagline" : "You Know, for Search" }

如果是浏览器访问的话,第一次访问会弹出验证窗口,后续只要不关闭这个浏览器保持这个session就能一直访问.
注意http basic是不安全的认证方式,仅供开发调试使用,生产环境还需要结合HTTPS的加密通道使用.

2.TransportClient方式的访问Shield加防的Elasticsearch,稍微麻烦点,需要依赖Shield的包,步骤如下:
2.1 如果你是maven管理的项目,在pom.xml文件里添加Elasticsearch的maven仓库源,如下:

<repositories> 
<repository> 
<id>elasticsearch-releases</id> 
<url>https://maven.elasticsearch.or ... gt%3B 
<releases> <enabled>true</enabled> </releases> 
<snapshots> <enabled>false</enabled> </snapshots> 
</repository> 
</repositories>

2.2 添加依赖的配置

<dependency> 
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>shield</artifactId>
<version>2.1.1</version>
</dependency

2.3 构建TransportClient的地方增加访问用户的配置

import org.elasticsearch.shield.ShieldPlugin; import org.elasticsearch.shield.authc.support.SecuredString; import static org.elasticsearch.shield.authc.support.UsernamePasswordToken.basicAuthHeaderValue;

String clusterName="elasticsearch"; String ip= "127.0.0.1"; 
Settings settings = Settings.settingsBuilder()   
.put("cluster.name", clusterName)
 .put("shield.user", "tribe_user:tribe_user") 
.build(); 
try { client = TransportClient.builder() 
.addPlugin(ShieldPlugin.class) 
.settings(settings).build() 
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip),9300)); 
String token = basicAuthHeaderValue("tribe_user", new SecuredString("tribe_user".toCharArray()));   client.prepareSearch()
.putHeader("Authorization", token).get();   } 
catch (UnknownHostException e) 
{ logger.error("es",e); }
 
现在的编辑器贴代码有点恶心,可以看这里:
http://log.medcl.net/item/2015 ... -1252
继续阅读 »
Shield是Elasticsearch一个安全防护插件,提供了权限访问控制和日志审计功能,企业可以很方便的和LDAP或是ActiveDirectory进行集成,重用现有的安全认证体系.

shield-triad.png


Elasticsearch使用了Shield后,Elasticsearch就需要权限才能访问了,和默认的调用方式有些不同,下面简单介绍一下HTTP和TCP两种方式的连接.

关于Shield的安装和配置我这里不就具体介绍,创建了一个用户名和密码都是tribe_user的用户,权限是admin.

1.HTTP方式
现在直接访问es的http接口就会报错

curl http://localhost:9200

{"error":{"root_cause":[{"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}}],"type":"security_exception","reason":"missing authentication token for REST request [/]","header":{"WWW-Authenticate":"Basic realm=\"shield\""}},"status":401}

shield支持HttpBasic验证,所以正确的访问姿势是:

curl -u tribe_user:tribe_user http://localhost:9200 { "name" : "Melter", "cluster_name" : "elasticsearch", "version" : { "number" : "2.1.1", "build_hash" : "805c528f3167980046f224310f9147fa745e5371", "build_timestamp" : "2015-12-09T20:23:16Z", "build_snapshot" : false, "lucene_version" : "5.3.1" }, "tagline" : "You Know, for Search" }

如果是浏览器访问的话,第一次访问会弹出验证窗口,后续只要不关闭这个浏览器保持这个session就能一直访问.
注意http basic是不安全的认证方式,仅供开发调试使用,生产环境还需要结合HTTPS的加密通道使用.

2.TransportClient方式的访问Shield加防的Elasticsearch,稍微麻烦点,需要依赖Shield的包,步骤如下:
2.1 如果你是maven管理的项目,在pom.xml文件里添加Elasticsearch的maven仓库源,如下:

<repositories> 
<repository> 
<id>elasticsearch-releases</id> 
<url>https://maven.elasticsearch.or ... gt%3B 
<releases> <enabled>true</enabled> </releases> 
<snapshots> <enabled>false</enabled> </snapshots> 
</repository> 
</repositories>

2.2 添加依赖的配置

<dependency> 
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>shield</artifactId>
<version>2.1.1</version>
</dependency

2.3 构建TransportClient的地方增加访问用户的配置

import org.elasticsearch.shield.ShieldPlugin; import org.elasticsearch.shield.authc.support.SecuredString; import static org.elasticsearch.shield.authc.support.UsernamePasswordToken.basicAuthHeaderValue;

String clusterName="elasticsearch"; String ip= "127.0.0.1"; 
Settings settings = Settings.settingsBuilder()   
.put("cluster.name", clusterName)
 .put("shield.user", "tribe_user:tribe_user") 
.build(); 
try { client = TransportClient.builder() 
.addPlugin(ShieldPlugin.class) 
.settings(settings).build() 
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ip),9300)); 
String token = basicAuthHeaderValue("tribe_user", new SecuredString("tribe_user".toCharArray()));   client.prepareSearch()
.putHeader("Authorization", token).get();   } 
catch (UnknownHostException e) 
{ logger.error("es",e); }
 
现在的编辑器贴代码有点恶心,可以看这里:
http://log.medcl.net/item/2015 ... -1252 收起阅读 »

感谢elastic送来的圣诞礼物!

圣诞节收到了elastic圣诞老人medcl送来的圣诞礼物,在此特别感谢!
发个赞吧!
晒图@!

IMG_0245.JPG


IMG_0246.JPG


IMG_0247.JPG


IMG_0248.JPG


IMG_0249.JPG


IMG_0250.JPG


IMG_0251.JPG

居然有个瓶起子,看来不喝一台是不行了。。。
sugru据说是什么都能粘的‘硅胶’。。。
elastic的周边越来越强大了。
特别感谢medcl大神对中国地区elastic用户的关照!
继续阅读 »
圣诞节收到了elastic圣诞老人medcl送来的圣诞礼物,在此特别感谢!
发个赞吧!
晒图@!

IMG_0245.JPG


IMG_0246.JPG


IMG_0247.JPG


IMG_0248.JPG


IMG_0249.JPG


IMG_0250.JPG


IMG_0251.JPG

居然有个瓶起子,看来不喝一台是不行了。。。
sugru据说是什么都能粘的‘硅胶’。。。
elastic的周边越来越强大了。
特别感谢medcl大神对中国地区elastic用户的关照! 收起阅读 »

Day 23 谈谈ES 的Recovery

Note: 本文针对ES2.x
 Recovery是指将一个索引的未分配shard分配到一个结点的过程。 在快照恢复,更改索引复制片数量,结点故障或者结点启动时发生。由于master持有整个集群的状态信息,因此可以判断出哪些shard需要做再分配,以及分配到哪个结点。例如:
  • 如果某个shard主片在,副片所在结点挂了,那么选择另外一个可用结点,将副片分配(allocate)上去,然后进行主从片的复制。
  • 如果某个shard的主片所在结点挂了,副片还在,那么将副片升级为主片,然后做主副复制。
  • 如果某个shard的主副片所在结点都挂了,则暂时无法恢复,等待持有相关数据的结点重新加入集群后,从结点上恢复主分片,再选择某个结点分配复制片,并从主分片同步数据。


通过CAT health API,我们可以查看集群的状态,从而获知数据的完整性情况:

cat_health.png



可能的状态及含义:


Green: 所有的shard主副片都完好的
Yellow: 所有shard的主片都完好,部分副片没有了,数据完整性依然完好。
Red: 某些shard的主副片都没有了,对应的索引数据不完整



Recovery过程要消耗额外的资源,CPU、内存、结点之间的网络带宽等等。 这些额外的资源消耗,有可能会导致集群的服务能力降级,或者一部分功能暂时不可用。了解一些Recovery的过程和相关的配置参数,对于减小recovery带来的资源消耗,加快集群恢复过程都是很有帮助的。

减少集群Full Restart造成的数据来回拷贝
集群可能会有整体重启的需要,比如需要升级硬件、升级操作系统或者升级ES大版本。重启所有结点可能带来的一个问题: 某些结点可能先于其他结点加入集群。 先加入集群的结点可能已经可以选举好master,并立即启动了recovery的过程,由于这个时候整个集群数据还不完整,master会指示一些结点之间相互开始复制数据。 那些晚到的结点,一旦发现本地的数据已经被复制到其他结点,则直接删除掉本地“失效”的数据。 当整个集群恢复完毕后,数据分布不均衡显然是不均衡的,master会触发rebalance过程,将数据在结点之间挪动。整个过程无谓消耗了大量的网络流量。 合理设置recovery相关参数则可以防范这种问题的发生。


gateway.expected_nodes
gateway.expected_master_nodes
gateway.expected_data_nodes


以上三个参数是说集群里一旦有多少个结点就立即开始recovery过程。 不同之处在于,第一个参数指的是master或者data都算在内,而后面两个参数则分指master和data node。

在期待的节点数条件满足之前, recovery过程会等待gateway.recover_after_time (默认5分钟) 这么长时间,一旦等待超时,则会根据以下条件判断是否启动:


gateway.recover_after_nodes
gateway.recover_after_master_nodes
gateway.recover_after_data_nodes



举例来说,对于一个有10个data node的集群,如果有以下的设置:


gateway.expected_data_nodes: 10
gateway.recover_after_time: 5m
gateway.recover_after_data_nodes: 8



那么集群5分钟以内10个data node都加入了,或者5分钟以后8个以上的data node加入了,都会立即启动recovery过程。


减少主副本之间的数据复制
如果不是full restart,而是重启单个data node,仍然会造成数据在不同结点之间来回复制。为避免这个问题,可以在重启之前,先关闭集群的shard allocation:

cluster_settings.png



然后在结点重启完成加入集群后,再重新打开:

put_cluster_settings.png


这样在结点重启完成后,尽量多的从本地直接恢复数据。
但是在ES1.6版本之前,即使做了以上措施,仍然会发现有大量主副本之间的数据拷贝。从表面去看,这点很让人不能理解。 主副本数据完全一致,ES应该直接从副本本地恢复数据就好了,为什么要重新从主片再复制一遍呢? 原因在于Recovery是简单对比主副本的segment file来判断哪些数据一致可以本地恢复,哪些不一致需要远端拷贝的。而不同结点的segment merge是完全独立运行的,可能导致主副本merge的深度不完全一样,从而造成即使文档集完全一样,产生的segment file却不完全一样。
为了解决这个问题,ES1.6版本以后加入了synced flush的新特性。 对于5分钟没有更新过的shard,会自动synced flush一下,实质是为对应的shard加了一个synced flush ID。这样当重启结点的时候,先对比一下shard的synced flush ID,就可以知道两个shard是否完全相同,避免了不必要的segment file拷贝,极大加快了冷索引的恢复速度。
需要注意的是synced flush只对冷索引有效,对于热索引(5分钟内有更新的索引)没有作用。 如果重启的结点包含有热索引,那么还是免不了大量的文件拷贝。因此在重启一个结点之前,最好按照以下步骤执行,recovery几乎可以瞬间完成:
  1. 暂停数据写入程序
  2. 关闭集群shard allocation
  3. 手动执行POST /_flush/synced
  4. 重启结点
  5. 重新开启集群shard allocation 
  6. 等待recovery完成,集群health status变成green
  7. 重新开启数据写入程序


(特别大的)热索引为何恢复慢
对于冷索引,由于数据不再更新,利用synced flush特性,可以快速直接从本地恢复数据。 而对于热索引,特别是shard很大的热索引,除了synced flush派不上用场需要大量跨结点拷贝segment file以外,translog recovery是导致慢的更重要的原因。

从主片恢复数据到副片需要经历3个阶段:
  1. 对主片上的segment file做一个快照,然后拷贝到复制片分配到的结点。数据拷贝期间,不会阻塞索引请求,新增索引操作记录到translog里。
  2. 对translog做一个快照,此快照包含第一阶段新增的索引请求,然后重放快照里的索引操作。此阶段仍然不阻塞索引请求,新增索引操作记录到translog里。
  3. 为了能达到主副片完全同步,阻塞掉新索引请求,然后重放阶段二新增的translog操作。


可见,在recovery完成之前,translog是不能够被清除掉的(禁用掉正常运作期间后台的flush操作)。如果shard比较大,第一阶段耗时很长,会导致此阶段产生的translog很大。重放translog比起简单的文件拷贝耗时要长得多,因此第二阶段的translog耗时也会显著增加。等到第三阶段,需要重放的translog可能会比第二阶段还要多。 而第三阶段是会阻塞新索引写入的,在对写入实时性要求很高的场合,就会非常影响用户体验。 因此,要加快大的热索引恢复速度,最好的方式是遵从上一节提到的方法: 暂停新数据写入,手动sync flush,等待数据恢复完成后,重新开启数据写入,这样可以将数据延迟影响可以降到最低。

万一遇到Recovery慢,想知道进度怎么办呢? CAT Recovery API可以显示详细的recovery各个阶段的状态。 这个API怎么用就不在这里赘述了,参考: CAT Recovery

其他Recovery相关的专家级设置
还有其他一些专家级的设置(参见: recovery)可以影响recovery的速度,但提升速度的代价是更多的资源消耗,因此在生产集群上调整这些参数需要结合实际情况谨慎调整,一旦影响应用要立即调整回来。 对于搜索并发量要求高,延迟要求低的场合,默认设置一般就不要去动了。 对于日志实时分析类对于搜索延迟要求不高,但对于数据写入延迟期望比较低的场合,可以适当调大indices.recovery.max_bytes_per_sec,提升recovery速度,减少数据写入被阻塞的时长。
 
最后要说的一点是ES的版本迭代很快,对于Recovery的机制也在不断的优化中。 其中有一些版本甚至引入了一些bug,比如在ES1.4.x有严重的translog recovery bug,导致大的索引trans log recovery几乎无法完成 (issue #9226)  。因此实际使用中如果遇到问题,最好在Github的issue list里搜索一下,看是否使用的版本有其他人反映同样的问题。
继续阅读 »
Note: 本文针对ES2.x
 Recovery是指将一个索引的未分配shard分配到一个结点的过程。 在快照恢复,更改索引复制片数量,结点故障或者结点启动时发生。由于master持有整个集群的状态信息,因此可以判断出哪些shard需要做再分配,以及分配到哪个结点。例如:
  • 如果某个shard主片在,副片所在结点挂了,那么选择另外一个可用结点,将副片分配(allocate)上去,然后进行主从片的复制。
  • 如果某个shard的主片所在结点挂了,副片还在,那么将副片升级为主片,然后做主副复制。
  • 如果某个shard的主副片所在结点都挂了,则暂时无法恢复,等待持有相关数据的结点重新加入集群后,从结点上恢复主分片,再选择某个结点分配复制片,并从主分片同步数据。


通过CAT health API,我们可以查看集群的状态,从而获知数据的完整性情况:

cat_health.png



可能的状态及含义:


Green: 所有的shard主副片都完好的
Yellow: 所有shard的主片都完好,部分副片没有了,数据完整性依然完好。
Red: 某些shard的主副片都没有了,对应的索引数据不完整



Recovery过程要消耗额外的资源,CPU、内存、结点之间的网络带宽等等。 这些额外的资源消耗,有可能会导致集群的服务能力降级,或者一部分功能暂时不可用。了解一些Recovery的过程和相关的配置参数,对于减小recovery带来的资源消耗,加快集群恢复过程都是很有帮助的。

减少集群Full Restart造成的数据来回拷贝
集群可能会有整体重启的需要,比如需要升级硬件、升级操作系统或者升级ES大版本。重启所有结点可能带来的一个问题: 某些结点可能先于其他结点加入集群。 先加入集群的结点可能已经可以选举好master,并立即启动了recovery的过程,由于这个时候整个集群数据还不完整,master会指示一些结点之间相互开始复制数据。 那些晚到的结点,一旦发现本地的数据已经被复制到其他结点,则直接删除掉本地“失效”的数据。 当整个集群恢复完毕后,数据分布不均衡显然是不均衡的,master会触发rebalance过程,将数据在结点之间挪动。整个过程无谓消耗了大量的网络流量。 合理设置recovery相关参数则可以防范这种问题的发生。


gateway.expected_nodes
gateway.expected_master_nodes
gateway.expected_data_nodes


以上三个参数是说集群里一旦有多少个结点就立即开始recovery过程。 不同之处在于,第一个参数指的是master或者data都算在内,而后面两个参数则分指master和data node。

在期待的节点数条件满足之前, recovery过程会等待gateway.recover_after_time (默认5分钟) 这么长时间,一旦等待超时,则会根据以下条件判断是否启动:


gateway.recover_after_nodes
gateway.recover_after_master_nodes
gateway.recover_after_data_nodes



举例来说,对于一个有10个data node的集群,如果有以下的设置:


gateway.expected_data_nodes: 10
gateway.recover_after_time: 5m
gateway.recover_after_data_nodes: 8



那么集群5分钟以内10个data node都加入了,或者5分钟以后8个以上的data node加入了,都会立即启动recovery过程。


减少主副本之间的数据复制
如果不是full restart,而是重启单个data node,仍然会造成数据在不同结点之间来回复制。为避免这个问题,可以在重启之前,先关闭集群的shard allocation:

cluster_settings.png



然后在结点重启完成加入集群后,再重新打开:

put_cluster_settings.png


这样在结点重启完成后,尽量多的从本地直接恢复数据。
但是在ES1.6版本之前,即使做了以上措施,仍然会发现有大量主副本之间的数据拷贝。从表面去看,这点很让人不能理解。 主副本数据完全一致,ES应该直接从副本本地恢复数据就好了,为什么要重新从主片再复制一遍呢? 原因在于Recovery是简单对比主副本的segment file来判断哪些数据一致可以本地恢复,哪些不一致需要远端拷贝的。而不同结点的segment merge是完全独立运行的,可能导致主副本merge的深度不完全一样,从而造成即使文档集完全一样,产生的segment file却不完全一样。
为了解决这个问题,ES1.6版本以后加入了synced flush的新特性。 对于5分钟没有更新过的shard,会自动synced flush一下,实质是为对应的shard加了一个synced flush ID。这样当重启结点的时候,先对比一下shard的synced flush ID,就可以知道两个shard是否完全相同,避免了不必要的segment file拷贝,极大加快了冷索引的恢复速度。
需要注意的是synced flush只对冷索引有效,对于热索引(5分钟内有更新的索引)没有作用。 如果重启的结点包含有热索引,那么还是免不了大量的文件拷贝。因此在重启一个结点之前,最好按照以下步骤执行,recovery几乎可以瞬间完成:
  1. 暂停数据写入程序
  2. 关闭集群shard allocation
  3. 手动执行POST /_flush/synced
  4. 重启结点
  5. 重新开启集群shard allocation 
  6. 等待recovery完成,集群health status变成green
  7. 重新开启数据写入程序


(特别大的)热索引为何恢复慢
对于冷索引,由于数据不再更新,利用synced flush特性,可以快速直接从本地恢复数据。 而对于热索引,特别是shard很大的热索引,除了synced flush派不上用场需要大量跨结点拷贝segment file以外,translog recovery是导致慢的更重要的原因。

从主片恢复数据到副片需要经历3个阶段:
  1. 对主片上的segment file做一个快照,然后拷贝到复制片分配到的结点。数据拷贝期间,不会阻塞索引请求,新增索引操作记录到translog里。
  2. 对translog做一个快照,此快照包含第一阶段新增的索引请求,然后重放快照里的索引操作。此阶段仍然不阻塞索引请求,新增索引操作记录到translog里。
  3. 为了能达到主副片完全同步,阻塞掉新索引请求,然后重放阶段二新增的translog操作。


可见,在recovery完成之前,translog是不能够被清除掉的(禁用掉正常运作期间后台的flush操作)。如果shard比较大,第一阶段耗时很长,会导致此阶段产生的translog很大。重放translog比起简单的文件拷贝耗时要长得多,因此第二阶段的translog耗时也会显著增加。等到第三阶段,需要重放的translog可能会比第二阶段还要多。 而第三阶段是会阻塞新索引写入的,在对写入实时性要求很高的场合,就会非常影响用户体验。 因此,要加快大的热索引恢复速度,最好的方式是遵从上一节提到的方法: 暂停新数据写入,手动sync flush,等待数据恢复完成后,重新开启数据写入,这样可以将数据延迟影响可以降到最低。

万一遇到Recovery慢,想知道进度怎么办呢? CAT Recovery API可以显示详细的recovery各个阶段的状态。 这个API怎么用就不在这里赘述了,参考: CAT Recovery

其他Recovery相关的专家级设置
还有其他一些专家级的设置(参见: recovery)可以影响recovery的速度,但提升速度的代价是更多的资源消耗,因此在生产集群上调整这些参数需要结合实际情况谨慎调整,一旦影响应用要立即调整回来。 对于搜索并发量要求高,延迟要求低的场合,默认设置一般就不要去动了。 对于日志实时分析类对于搜索延迟要求不高,但对于数据写入延迟期望比较低的场合,可以适当调大indices.recovery.max_bytes_per_sec,提升recovery速度,减少数据写入被阻塞的时长。
 
最后要说的一点是ES的版本迭代很快,对于Recovery的机制也在不断的优化中。 其中有一些版本甚至引入了一些bug,比如在ES1.4.x有严重的translog recovery bug,导致大的索引trans log recovery几乎无法完成 (issue #9226)  。因此实际使用中如果遇到问题,最好在Github的issue list里搜索一下,看是否使用的版本有其他人反映同样的问题。 收起阅读 »

elasticsearch-rtf更新至2.1.1

地址:https://github.com/medcl/elasticsearch-rtf
 
使用git快速签出最新版:
git clone git://github.com/medcl/elasticsearch-rtf.git -b master --depth 1
 
包含插件:
elasticsearch-analysis-ik-1.6.2        elasticsearch-analysis-pinyin-1.5.2
elasticsearch-analysis-mmseg-1.6.2     elasticsearch-analysis-stconvert-1.6.1

使用:
cd elasticsearch/bin 
./elasticsearch
 
 
继续阅读 »
地址:https://github.com/medcl/elasticsearch-rtf
 
使用git快速签出最新版:
git clone git://github.com/medcl/elasticsearch-rtf.git -b master --depth 1
 
包含插件:
elasticsearch-analysis-ik-1.6.2        elasticsearch-analysis-pinyin-1.5.2
elasticsearch-analysis-mmseg-1.6.2     elasticsearch-analysis-stconvert-1.6.1

使用:
cd elasticsearch/bin 
./elasticsearch
 
  收起阅读 »

Day22:pipeline aggregation计算日留存率示例

网友们多次讨论如何利用 ES 计算用户留存率的问题。这是个比较尴尬的情况,如果多次请求再自己做一下运算,问题很简单。但如果想要一次请求得到最终结果,在没有完整 JOIN 支持的 ES 里又显得比较难以完成。

目前我想到的比较容易达成的做法,是我们在记录用户登录操作日志的时候,把该用户的注册时间也同期输出。也就是说,这个索引的 mapping 是下面这样:
curl -XPUT 'http://127.0.0.1:9200/login-2015.12.23/' -d '{
"settings" : {
"number_of_shards" : 1
},
"mappings" : {
"logs" : {
"properties" : {
"uid" : { "type" : "string", "index" : "not_analyzed" },
"register_time" : { "type" : "date", "index" : "not_analyzed" },
"login_time" : { "type" : "date", "index" : "not_analyzed" }
}
}
}
}'
那么实际记录的日志会类似这样:
{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"2","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.24","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-24T12:00:00Z"}
这段我虚拟的数据,表示 uid 为 1 的用户,23 号注册并登录,24 号再次登录;uid 为 2 的用户,23 号注册并登录,24 号无登录。

显然以这短短 3 行示例数据,我们口算都知道单日留存率是 50% 了。那么怎么通过一次 ES 请求也算出来呢?下面就要用到 ES 2.0 新增加的 pipeline aggregation 了。
curl -XPOST 'http://127.0.0.1:9200/login-2015.12.23,login-2015.12.24/_search' -d'
{
"size" : 0,
"aggs" : {
    "new_users" : {

      "filters" : {
        "filters" : [
          {
            "range" : {
              "register_time" : {
                "gte" : "2015-12-23",
                "lt" : "2015-12-24"
              }
            }
          }
        ]
      },
"aggs" : {
"register_count" : {
"cardinality" : {
"field" : "uid"
}
},
"today" : {
"filter" : {
"range" : {
"login_time" : {
"gte" : "2015-12-24",
"lt" : "2015-12-25"
}
}
},
"aggs" : {
"login_count" : {
"cardinality" : {
"field" : "uid"
}
}
}
},
"retention" : {
"bucket_script" : {
"buckets_path" : {
"today_count" : "today>login_count",
"yesterday_count" : "register_count"
},
"script" : {
"lang" : "expression",
"inline" : "today_count / yesterday_count"
}
}
}
}
}
}
}'
这个 pipeline aggregation 在使用上有几个要点:
  1. pipeline agg 的 parent agg 必须是返回数组的 buckets agg 类型。我这里曾经打算使用 filter agg 直接请求register_time:["now-2d" TO "now-1d"],结果报错说找不到 buckets_path 的 START_OBJECT。所以改用了 filters agg 的数组格式。
  2. bucket_script agg 同样受 scripting module 的影响。也就是说,官网示例里的"script":"today_count / yesterday_count" 这种写法,是采用了 groovy 引擎的 inline 模式。在 ES 2.0 的默认设置下,是被禁止运行的!所以,应该按照 scripting module 的统一要求,改写成 file 形式存放到 config/scripts下;或者改用 Lucene Expression 运行。考虑到 pipeline aggregation 只支持数值运算,这里使用 groovy 价值不大,所以直接指明 lang 参数即可。

最终这次请求的响应如下:
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"hits" : {
"total" : 3,
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"new_users" : {
"buckets" : [ {
"doc_count" : 3,
"today" : {
"doc_count" : 1,
"login_count" : {
"value" : 1
}
},
"register_count" : {
"value" : 2
},
"retention" : {
"value" : 0.5
}
} ]
}
}
}
这个 retention 数据,就是我们要求解的 0.5 了。
 
继续阅读 »
网友们多次讨论如何利用 ES 计算用户留存率的问题。这是个比较尴尬的情况,如果多次请求再自己做一下运算,问题很简单。但如果想要一次请求得到最终结果,在没有完整 JOIN 支持的 ES 里又显得比较难以完成。

目前我想到的比较容易达成的做法,是我们在记录用户登录操作日志的时候,把该用户的注册时间也同期输出。也就是说,这个索引的 mapping 是下面这样:
curl -XPUT 'http://127.0.0.1:9200/login-2015.12.23/' -d '{
"settings" : {
"number_of_shards" : 1
},
"mappings" : {
"logs" : {
"properties" : {
"uid" : { "type" : "string", "index" : "not_analyzed" },
"register_time" : { "type" : "date", "index" : "not_analyzed" },
"login_time" : { "type" : "date", "index" : "not_analyzed" }
}
}
}
}'
那么实际记录的日志会类似这样:
{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.23","_type":"logs"}}
{"uid":"2","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-23T12:00:00Z"}
{"index":{"_index":"login-2015.12.24","_type":"logs"}}
{"uid":"1","register_time":"2015-12-23T12:00:00Z","login_time":"2015-12-24T12:00:00Z"}
这段我虚拟的数据,表示 uid 为 1 的用户,23 号注册并登录,24 号再次登录;uid 为 2 的用户,23 号注册并登录,24 号无登录。

显然以这短短 3 行示例数据,我们口算都知道单日留存率是 50% 了。那么怎么通过一次 ES 请求也算出来呢?下面就要用到 ES 2.0 新增加的 pipeline aggregation 了。
curl -XPOST 'http://127.0.0.1:9200/login-2015.12.23,login-2015.12.24/_search' -d'
{
"size" : 0,
"aggs" : {
    "new_users" : {

      "filters" : {
        "filters" : [
          {
            "range" : {
              "register_time" : {
                "gte" : "2015-12-23",
                "lt" : "2015-12-24"
              }
            }
          }
        ]
      },
"aggs" : {
"register_count" : {
"cardinality" : {
"field" : "uid"
}
},
"today" : {
"filter" : {
"range" : {
"login_time" : {
"gte" : "2015-12-24",
"lt" : "2015-12-25"
}
}
},
"aggs" : {
"login_count" : {
"cardinality" : {
"field" : "uid"
}
}
}
},
"retention" : {
"bucket_script" : {
"buckets_path" : {
"today_count" : "today>login_count",
"yesterday_count" : "register_count"
},
"script" : {
"lang" : "expression",
"inline" : "today_count / yesterday_count"
}
}
}
}
}
}
}'
这个 pipeline aggregation 在使用上有几个要点:
  1. pipeline agg 的 parent agg 必须是返回数组的 buckets agg 类型。我这里曾经打算使用 filter agg 直接请求register_time:["now-2d" TO "now-1d"],结果报错说找不到 buckets_path 的 START_OBJECT。所以改用了 filters agg 的数组格式。
  2. bucket_script agg 同样受 scripting module 的影响。也就是说,官网示例里的"script":"today_count / yesterday_count" 这种写法,是采用了 groovy 引擎的 inline 模式。在 ES 2.0 的默认设置下,是被禁止运行的!所以,应该按照 scripting module 的统一要求,改写成 file 形式存放到 config/scripts下;或者改用 Lucene Expression 运行。考虑到 pipeline aggregation 只支持数值运算,这里使用 groovy 价值不大,所以直接指明 lang 参数即可。

最终这次请求的响应如下:
{
"took" : 3,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"hits" : {
"total" : 3,
"max_score" : 0.0,
"hits" : [ ]
},
"aggregations" : {
"new_users" : {
"buckets" : [ {
"doc_count" : 3,
"today" : {
"doc_count" : 1,
"login_count" : {
"value" : 1
}
},
"register_count" : {
"value" : 2
},
"retention" : {
"value" : 0.5
}
} ]
}
}
}
这个 retention 数据,就是我们要求解的 0.5 了。
  收起阅读 »

Day21: 如何快速把Kibana4 Discover页的Document Table导出成CSV

我们都知道Kibana4里,所有的aggregation生成的visualize都可以在请求细节查看里选择`Export`成raw或者formatted。其中formatted就是CSV文件。
但是Discover页上,除了顶部的date_histogram这个visualize,更重要的是下边的search document table的内容。当我们通过搜索发现异常信息,想要长期保存证据,或者分享给其他没有权限的外部人员的时候,单纯保存search到es,或者分享单条日志的link都不顶用,还是需要能导出成一个文件。
可惜Kibana4没有针对search document table的导出!
国外一家叫MineWhat的公司,最近公开了一个非常细小的创新方案,意图解决这个问题。他们的方式是:避免修改Kibana源码,而通过chrome浏览器插件完成……
点击这个地址安装chrome插件:https://chrome.google.com/webs ... lated
 
然后再访问Kibana的时候,你会发现自己的搜索框最右侧多了一个CSV按钮:
 

 
然后点击这个『CSV』按钮,会弹出一片提示:

可以点击选择,把search document table内容保存到本机的复制粘贴板,还是Google Drive网盘。
我们当然选择本机……
然后打开本地的文本文件,Ctrl+V,就看到编辑器里出现了整个CSV内容。
实测下来,发现有个小问题,粘贴出来的数据里丢掉了空格~不过聊胜于无吧,还是介绍给大家一试。
 
注意:这个功能只会导出目前页面上已经展示出来的table内容。并不代表其使用了scroll API去ES拉取全部结果集!
继续阅读 »
我们都知道Kibana4里,所有的aggregation生成的visualize都可以在请求细节查看里选择`Export`成raw或者formatted。其中formatted就是CSV文件。
但是Discover页上,除了顶部的date_histogram这个visualize,更重要的是下边的search document table的内容。当我们通过搜索发现异常信息,想要长期保存证据,或者分享给其他没有权限的外部人员的时候,单纯保存search到es,或者分享单条日志的link都不顶用,还是需要能导出成一个文件。
可惜Kibana4没有针对search document table的导出!
国外一家叫MineWhat的公司,最近公开了一个非常细小的创新方案,意图解决这个问题。他们的方式是:避免修改Kibana源码,而通过chrome浏览器插件完成……
点击这个地址安装chrome插件:https://chrome.google.com/webs ... lated
 
然后再访问Kibana的时候,你会发现自己的搜索框最右侧多了一个CSV按钮:
 

 
然后点击这个『CSV』按钮,会弹出一片提示:

可以点击选择,把search document table内容保存到本机的复制粘贴板,还是Google Drive网盘。
我们当然选择本机……
然后打开本地的文本文件,Ctrl+V,就看到编辑器里出现了整个CSV内容。
实测下来,发现有个小问题,粘贴出来的数据里丢掉了空格~不过聊胜于无吧,还是介绍给大家一试。
 
注意:这个功能只会导出目前页面上已经展示出来的table内容。并不代表其使用了scroll API去ES拉取全部结果集! 收起阅读 »

简繁体转换插件更新:elasticsearch-analysis-stconvert 升级支持2.0

版本1.5.0 支持es2.0.0
 
项目地址:https://github.com/medcl/elast ... nvert 
 
mvn 编译打包,拷贝release下面的zip并解压到你的es plugins目录即可,需要重启es

这个插件帮你处理简繁体,简繁体全部统一成简体或繁体,不管输入的简体还是繁体,都能得到搜索结果
 
比如:
不管输入的是『北京国际电视台』的还是『北京國際電視臺』都能命中。
 
详细配置和使用请参照上面的地址。
 
继续阅读 »
版本1.5.0 支持es2.0.0
 
项目地址:https://github.com/medcl/elast ... nvert 
 
mvn 编译打包,拷贝release下面的zip并解压到你的es plugins目录即可,需要重启es

这个插件帮你处理简繁体,简繁体全部统一成简体或繁体,不管输入的简体还是繁体,都能得到搜索结果
 
比如:
不管输入的是『北京国际电视台』的还是『北京國際電視臺』都能命中。
 
详细配置和使用请参照上面的地址。
  收起阅读 »

Day20 利用tcpdump和kafka协议定位不合法topic的来源

事情是这样滴,  我们在很多linux机器上部署了logstash采集日志, topic_id用的是 test-%{type}, 但非常不幸的是,  有些机器的某些日志, 没有带上type字段. 
 
因为在topic名字里面不能含有%字符, 所以kafka server的日志里面大量报错. Logstash每发一次数据, kafka就会生成下面一大段错误
 
[2015-12-23 23:20:47,749] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 48; ClientId: ; Topics: test-%{type} (kafka.server.KafkaApis)
kafka.common.InvalidTopicException: topic name test-%{type} is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'
at kafka.common.Topic$.validate(Topic.scala:42)
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
at scala.collection.SetLike$class.map(SetLike.scala:93)
at scala.collection.AbstractSet.map(Set.scala:47)
at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:744)

把可用的信息瞬间淹没.  
 
更不幸的是, 错误日志里面并没有客户来源的信息, 根本不知道是哪些机器还有问题.
 
我想做的, 就是把有问题的logstash机器找出来.
 
我就先事后诸葛亮一把, 用下面这个命令就可以把配置错误的机器找出来(也可以没有任何结果, 原因后面说)
tcpdump -nn 'dst port 9092 and tcp[37]==3 and tcp[57]==37'

dst port 9092就不说了, 这是kafka的默认端口, 后面的tcp[37]==3 and tcp[57]==37是啥意思呢, 我们慢慢说.
 
先要说一下: client要生产数据到kafka, 在发送消息之前, 首先得向kafka"询问"这个topic的metadata信息, 包括有几个partiton, 每个parttion在哪个服务器上面等信息, 拿到这些信息之后, 才能把消息发到正确的kafka服务器上.
 
重点来了!  向kafka"询问"topic的metadata, 其实就是发送一个tcp包过去, 我们需要知道的是这个tcp包的格式. 我已经帮你找到了, 就在这里 https://cwiki.apache.org/confl ... quest
 
看完文档之后(半小时或者更长时间过去了), 你就会知道, tcp body(除去tcp head)里面的第6个字节是03, 代表这是一个TopicMetadataRequest请求.  topicname里面的%字符出现在tcp body的第26个字节, %的ascii码是37
 
tcp头一般是20个字符, 所以加上这20个字节, 然后下标从0算起, 就是tcp[20+5]==3 and tcp[20+25]==37, 也就是tcp[25]==3 and tcp[45]==37.
 
咦, 为啥和开始写的那个过滤条件不一样呢, 因为tcp头"一般"是20字节, 但是如果其中还包含了tcp选项的话, 就可能比20多了. 反正我这里看到的的tcp头都是32个字节, 所以不能加20, 要加32, 也就是最开始写的 tcp[37]==3 and tcp[57]==37 
最后呢, 再提2点结束.
 
1. 终极大杀器, 不过tcp头的长度是多少, 20也好, 32也好, 或者其他也好, 下面这样都能搞定
tcpdump -nn 'dst port 9092 and tcp[(tcp[12]>>2)+5]==3 and tcp[(tcp[12]>>2)+25]==37'

2.  不要一上来就这么高端, 其实我最开始是这样先确定问题的
tcpdump -vv -nn -X -s 0 dst port 9092 | grep -C 5 "test-"
你问我为啥不把test-t{type}写完整? 不是为了省事, 其实是因为很不幸, test-%{t 到这里的时候, 正好换行了.
继续阅读 »
事情是这样滴,  我们在很多linux机器上部署了logstash采集日志, topic_id用的是 test-%{type}, 但非常不幸的是,  有些机器的某些日志, 没有带上type字段. 
 
因为在topic名字里面不能含有%字符, 所以kafka server的日志里面大量报错. Logstash每发一次数据, kafka就会生成下面一大段错误
 
[2015-12-23 23:20:47,749] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 48; ClientId: ; Topics: test-%{type} (kafka.server.KafkaApis)
kafka.common.InvalidTopicException: topic name test-%{type} is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'
at kafka.common.Topic$.validate(Topic.scala:42)
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:181)
at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:172)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:520)
at kafka.server.KafkaApis$$anonfun$19.apply(KafkaApis.scala:503)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
at scala.collection.SetLike$class.map(SetLike.scala:93)
at scala.collection.AbstractSet.map(Set.scala:47)
at kafka.server.KafkaApis.getTopicMetadata(KafkaApis.scala:503)
at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:542)
at kafka.server.KafkaApis.handle(KafkaApis.scala:62)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:744)

把可用的信息瞬间淹没.  
 
更不幸的是, 错误日志里面并没有客户来源的信息, 根本不知道是哪些机器还有问题.
 
我想做的, 就是把有问题的logstash机器找出来.
 
我就先事后诸葛亮一把, 用下面这个命令就可以把配置错误的机器找出来(也可以没有任何结果, 原因后面说)
tcpdump -nn 'dst port 9092 and tcp[37]==3 and tcp[57]==37'

dst port 9092就不说了, 这是kafka的默认端口, 后面的tcp[37]==3 and tcp[57]==37是啥意思呢, 我们慢慢说.
 
先要说一下: client要生产数据到kafka, 在发送消息之前, 首先得向kafka"询问"这个topic的metadata信息, 包括有几个partiton, 每个parttion在哪个服务器上面等信息, 拿到这些信息之后, 才能把消息发到正确的kafka服务器上.
 
重点来了!  向kafka"询问"topic的metadata, 其实就是发送一个tcp包过去, 我们需要知道的是这个tcp包的格式. 我已经帮你找到了, 就在这里 https://cwiki.apache.org/confl ... quest
 
看完文档之后(半小时或者更长时间过去了), 你就会知道, tcp body(除去tcp head)里面的第6个字节是03, 代表这是一个TopicMetadataRequest请求.  topicname里面的%字符出现在tcp body的第26个字节, %的ascii码是37
 
tcp头一般是20个字符, 所以加上这20个字节, 然后下标从0算起, 就是tcp[20+5]==3 and tcp[20+25]==37, 也就是tcp[25]==3 and tcp[45]==37.
 
咦, 为啥和开始写的那个过滤条件不一样呢, 因为tcp头"一般"是20字节, 但是如果其中还包含了tcp选项的话, 就可能比20多了. 反正我这里看到的的tcp头都是32个字节, 所以不能加20, 要加32, 也就是最开始写的 tcp[37]==3 and tcp[57]==37 
最后呢, 再提2点结束.
 
1. 终极大杀器, 不过tcp头的长度是多少, 20也好, 32也好, 或者其他也好, 下面这样都能搞定
tcpdump -nn 'dst port 9092 and tcp[(tcp[12]>>2)+5]==3 and tcp[(tcp[12]>>2)+25]==37'

2.  不要一上来就这么高端, 其实我最开始是这样先确定问题的
tcpdump -vv -nn -X -s 0 dst port 9092 | grep -C 5 "test-"
你问我为啥不把test-t{type}写完整? 不是为了省事, 其实是因为很不幸, test-%{t 到这里的时候, 正好换行了. 收起阅读 »

Day19 ES内存那点事

【携程旅行网  吴晓刚】
 
注: 本文主要针对ES 2.x。

 “该给ES分配多少内存?” 
“JVM参数如何优化?“
“为何我的Heap占用这么高?”
“为何经常有某个field的数据量超出内存限制的异常?“
“为何感觉上没多少数据,也会经常Out Of Memory?”

以上问题,显然没有一个统一的数学公式能够给出答案。 和数据库类似,ES对于内存的消耗,和很多因素相关,诸如数据总量、mapping设置、查询方式、查询频度等等。默认的设置虽开箱即用,但不能适用每一种使用场景。作为ES的开发、运维人员,如果不了解ES对内存使用的一些基本原理,就很难针对特有的应用场景,有效的测试、规划和管理集群,从而踩到各种坑,被各种问题挫败。

要理解ES如何使用内存,先要理解下面两个基本事实:
1.  ES是JAVA应用
2.  底层存储引擎是基于Lucene的

看似很普通是吗?但其实没多少人真正理解这意味着什么。 

首先,作为一个JAVA应用,就脱离不开JVM和GC。很多人上手ES的时候,对GC一点概念都没有就去网上抄各种JVM“优化”参数,却仍然被heap不够用,内存溢出这样的问题搞得焦头烂额。了解JVM GC的概念和基本工作机制是很有必要的,本文不在此做过多探讨,读者可以自行Google相关资料进行学习。如何知道ES heap是否真的有压力了? 推荐阅读这篇博客:Understanding Memory Pressure Indicator。 即使对于JVM GC机制不够熟悉,头脑里还是需要有这么一个基本概念: 应用层面生成大量长生命周期的对象,是给heap造成压力的主要原因,例如读取一大片数据在内存中进行排序,或者在heap内部建cache缓存大量数据。如果GC释放的空间有限,而应用层面持续大量申请新对象,GC频度就开始上升,同时会消耗掉很多CPU时间。严重时可能恶性循环,导致整个集群停工。因此在使用ES的过程中,要知道哪些设置和操作容易造成以上问题,有针对性的予以规避。

其次,Lucene的倒排索引(Inverted Index)是先在内存里生成,然后定期以段文件(segment file)的形式刷到磁盘的。每个段实际就是一个完整的倒排索引,并且一旦写到磁盘上就不会做修改。 API层面的文档更新和删除实际上是增量写入的一种特殊文档,会保存在新的段里。不变的段文件易于被操作系统cache,热数据几乎等效于内存访问。 

基于以上2个基本事实,我们不难理解,为何官方建议的heap size不要超过系统可用内存的一半。heap以外的内存并不会被浪费,操作系统会很开心的利用他们来cache被用读取过的段文件。

Heap分配多少合适?遵从官方建议就没错。 不要超过系统可用内存的一半,并且不要超过32GB。JVM参数呢?对于初级用户来说,并不需要做特别调整,仍然遵从官方的建议,将xms和xmx设置成和heap一样大小,避免动态分配heap size就好了。虽然有针对性的调整JVM参数可以带来些许GC效率的提升,当有一些“坏”用例的时候,这些调整并不会有什么魔法效果帮你减轻heap压力,甚至可能让问题更糟糕。

那么,ES的heap是如何被瓜分掉的? 说几个我知道的内存消耗大户并分别做解读:
1.  segment memory
2.  filter cache
3.  field data cache
4.  bulk queue
5.  indexing buffer
6.  state buffer
7.  超大搜索聚合结果集的fetch
8. 对高cardinality字段做terms aggregation


Segment Memory
Segment不是file吗?segment memory又是什么?前面提到过,一个segment是一个完备的lucene倒排索引,而倒排索引是通过词典 (Term Dictionary)到文档列表(Postings List)的映射关系,快速做查询的。 由于词典的size会很大,全部装载到heap里不现实,因此Lucene为词典做了一层前缀索引(Term Index),这个索引在Lucene4.0以后采用的数据结构是FST (Finite State Transducer)。 这种数据结构占用空间很小,Lucene打开索引的时候将其全量装载到内存中,加快磁盘上词典查询速度的同时减少随机磁盘访问次数。

下面是词典索引和词典主存储之间的一个对应关系图:

lucene_index.png


Lucene  file的完整数据结构参见Apache Lucene - Index File Formats

说了这么多,要传达的一个意思就是,ES的data node存储数据并非只是耗费磁盘空间的,为了加速数据的访问,每个segment都有会一些索引数据驻留在heap里。因此segment越多,瓜分掉的heap也越多,并且这部分heap是无法被GC掉的! 理解这点对于监控和管理集群容量很重要,当一个node的segment memory占用过多的时候,就需要考虑删除、归档数据,或者扩容了。

怎么知道segment memory占用情况呢?  CAT API可以给出答案。
1.  查看一个索引所有segment的memory占用情况:

seg_mem.png


2.  查看一个node上所有segment占用的memory总和:

seg_mem_node.png



那么有哪些途径减少data node上的segment memory占用呢? 总结起来有三种方法:
1.  删除不用的索引
2.  关闭索引 (文件仍然存在于磁盘,只是释放掉内存)。需要的时候可以重新打开。
3.  定期对不再更新的索引做optimize (ES2.0以后更改为force merge api)。这Optimze的实质是对segment file强制做合并,可以节省大量的segment memory。

Filter Cache (5.x里叫做Request cache)
Filter cache是用来缓存使用过的filter的结果集的,需要注意的是这个缓存也是常驻heap,在被evict掉之前,是无法被GC的。我的经验是默认的10% heap设置工作得够好了,如果实际使用中heap没什么压力的情况下,才考虑加大这个设置。


Field Data cache
在有大量排序、数据聚合的应用场景,可以说field data cache是性能和稳定性的杀手。 对搜索结果做排序或者聚合操作,需要将倒排索引里的数据进行解析,按列构造成docid->value的形式才能够做后续快速计算。 对于数据量很大的索引,这个构造过程会非常耗费时间,因此ES 2.0以前的版本会将构造好的数据缓存起来,提升性能。但是由于heap空间有限,当遇到用户对海量数据做计算的时候,就很容易导致heap吃紧,集群频繁GC,根本无法完成计算过程。 ES2.0以后,正式默认启用Doc Values特性(1.x需要手动更改mapping开启),将field data在indexing time构建在磁盘上,经过一系列优化,可以达到比之前采用field data cache机制更好的性能。因此需要限制对field data cache的使用,最好是完全不用,可以极大释放heap压力。 需要注意的是,很多同学已经升级到ES2.0,或者1.0里已经设置mapping启用了doc values,在kibana里仍然会遇到问题。 这里一个陷阱就在于kibana的table panel可以对所有字段排序。 设想如果有一个字段是analyzed过的,而用户去点击对应字段的排序表头是什么后果? 一来排序的结果并不是用户想要的,排序的对象实际是词典; 二来analyzed过的字段无法利用doc values,需要装载到field data cache,数据量很大的情况下可能集群就在忙着GC或者根本出不来结果。


Bulk Queue
一般来说,Bulk queue不会消耗很多的heap,但是见过一些用户为了提高bulk的速度,客户端设置了很大的并发量,并且将bulk Queue设置到不可思议的大,比如好几千。 Bulk Queue是做什么用的?当所有的bulk thread都在忙,无法响应新的bulk request的时候,将request在内存里排列起来,然后慢慢清掉。 这在应对短暂的请求爆发的时候有用,但是如果集群本身索引速度一直跟不上,设置的好几千的queue都满了会是什么状况呢? 取决于一个bulk的数据量大小,乘上queue的大小,heap很有可能就不够用,内存溢出了。一般来说官方默认的thread pool设置已经能很好的工作了,建议不要随意去“调优”相关的设置,很多时候都是适得其反的效果。


Indexing Buffer
Indexing Buffer是用来缓存新数据,当其满了或者refresh/flush interval到了,就会以segment file的形式写入到磁盘。 这个参数的默认值是10% heap size。根据经验,这个默认值也能够很好的工作,应对很大的索引吞吐量。 但有些用户认为这个buffer越大吞吐量越高,因此见过有用户将其设置为40%的。到了极端的情况,写入速度很高的时候,40%都被占用,导致OOM。


Cluster State Buffer
ES被设计成每个node都可以响应用户的api请求,因此每个node的内存里都包含有一份集群状态的拷贝。这个cluster state包含诸如集群有多少个node,多少个index,每个index的mapping是什么?有少shard,每个shard的分配情况等等 (ES有各类stats api获取这类数据)。 在一个规模很大的集群,这个状态信息可能会非常大的,耗用的内存空间就不可忽视了。并且在ES2.0之前的版本,state的更新是由master node做完以后全量散播到其他结点的。 频繁的状态更新就可以给heap带来很大的压力。 在超大规模集群的情况下,可以考虑分集群并通过tribe node连接做到对用户api的透明,这样可以保证每个集群里的state信息不会膨胀得过大。


超大搜索聚合结果集的fetch
ES是分布式搜索引擎,搜索和聚合计算除了在各个data node并行计算以外,还需要将结果返回给汇总节点进行汇总和排序后再返回。无论是搜索,还是聚合,如果返回结果的size设置过大,都会给heap造成很大的压力,特别是数据汇聚节点。超大的size多数情况下都是用户用例不对,比如本来是想计算cardinality,却用了terms aggregation + size:0这样的方式; 对大结果集做深度分页;一次性拉取全量数据等等。
 
对高cardinality字段做terms aggregation
所谓高cardinality,就是该字段的唯一值比较多。 比如client ip,可能存在上千万甚至上亿的不同值。 对这种类型的字段做terms aggregation时,需要在内存里生成海量的分桶,内存需求会非常高。如果内部再嵌套有其他聚合,情况会更糟糕。  在做日志聚合分析时,一个典型的可以引起性能问题的场景,就是对带有参数的url字段做terms aggregation。 对于访问量大的网站,带有参数的url字段cardinality可能会到数亿,做一次terms aggregation内存开销巨大,然而对带有参数的url字段做聚合通常没有什么意义。 对于这类问题,可以额外索引一个url_stem字段,这个字段索引剥离掉参数部分的url。可以极大降低内存消耗,提高聚合速度。


小结:
  1. 倒排词典的索引需要常驻内存,无法GC,需要监控data node上segment memory增长趋势。
  2. 各类缓存,field cache, filter cache, indexing cache, bulk queue等等,要设置合理的大小,并且要应该根据最坏的情况来看heap是否够用,也就是各类缓存全部占满的时候,还有heap空间可以分配给其他任务吗?避免采用clear cache等“自欺欺人”的方式来释放内存。
  3. 避免返回大量结果集的搜索与聚合。确实需要大量拉取数据的场景,可以采用scan & scroll api来实现。
  4. cluster stats驻留内存并无法水平扩展,超大规模集群可以考虑分拆成多个集群通过tribe node连接。
  5. 想知道heap够不够,必须结合实际应用场景,并对集群的heap使用情况做持续的监控。
  6. 根据监控数据理解内存需求,合理配置各类circuit breaker,将内存溢出风险降低到最低。

继续阅读 »
【携程旅行网  吴晓刚】
 
注: 本文主要针对ES 2.x。

 “该给ES分配多少内存?” 
“JVM参数如何优化?“
“为何我的Heap占用这么高?”
“为何经常有某个field的数据量超出内存限制的异常?“
“为何感觉上没多少数据,也会经常Out Of Memory?”

以上问题,显然没有一个统一的数学公式能够给出答案。 和数据库类似,ES对于内存的消耗,和很多因素相关,诸如数据总量、mapping设置、查询方式、查询频度等等。默认的设置虽开箱即用,但不能适用每一种使用场景。作为ES的开发、运维人员,如果不了解ES对内存使用的一些基本原理,就很难针对特有的应用场景,有效的测试、规划和管理集群,从而踩到各种坑,被各种问题挫败。

要理解ES如何使用内存,先要理解下面两个基本事实:
1.  ES是JAVA应用
2.  底层存储引擎是基于Lucene的

看似很普通是吗?但其实没多少人真正理解这意味着什么。 

首先,作为一个JAVA应用,就脱离不开JVM和GC。很多人上手ES的时候,对GC一点概念都没有就去网上抄各种JVM“优化”参数,却仍然被heap不够用,内存溢出这样的问题搞得焦头烂额。了解JVM GC的概念和基本工作机制是很有必要的,本文不在此做过多探讨,读者可以自行Google相关资料进行学习。如何知道ES heap是否真的有压力了? 推荐阅读这篇博客:Understanding Memory Pressure Indicator。 即使对于JVM GC机制不够熟悉,头脑里还是需要有这么一个基本概念: 应用层面生成大量长生命周期的对象,是给heap造成压力的主要原因,例如读取一大片数据在内存中进行排序,或者在heap内部建cache缓存大量数据。如果GC释放的空间有限,而应用层面持续大量申请新对象,GC频度就开始上升,同时会消耗掉很多CPU时间。严重时可能恶性循环,导致整个集群停工。因此在使用ES的过程中,要知道哪些设置和操作容易造成以上问题,有针对性的予以规避。

其次,Lucene的倒排索引(Inverted Index)是先在内存里生成,然后定期以段文件(segment file)的形式刷到磁盘的。每个段实际就是一个完整的倒排索引,并且一旦写到磁盘上就不会做修改。 API层面的文档更新和删除实际上是增量写入的一种特殊文档,会保存在新的段里。不变的段文件易于被操作系统cache,热数据几乎等效于内存访问。 

基于以上2个基本事实,我们不难理解,为何官方建议的heap size不要超过系统可用内存的一半。heap以外的内存并不会被浪费,操作系统会很开心的利用他们来cache被用读取过的段文件。

Heap分配多少合适?遵从官方建议就没错。 不要超过系统可用内存的一半,并且不要超过32GB。JVM参数呢?对于初级用户来说,并不需要做特别调整,仍然遵从官方的建议,将xms和xmx设置成和heap一样大小,避免动态分配heap size就好了。虽然有针对性的调整JVM参数可以带来些许GC效率的提升,当有一些“坏”用例的时候,这些调整并不会有什么魔法效果帮你减轻heap压力,甚至可能让问题更糟糕。

那么,ES的heap是如何被瓜分掉的? 说几个我知道的内存消耗大户并分别做解读:
1.  segment memory
2.  filter cache
3.  field data cache
4.  bulk queue
5.  indexing buffer
6.  state buffer
7.  超大搜索聚合结果集的fetch
8. 对高cardinality字段做terms aggregation


Segment Memory
Segment不是file吗?segment memory又是什么?前面提到过,一个segment是一个完备的lucene倒排索引,而倒排索引是通过词典 (Term Dictionary)到文档列表(Postings List)的映射关系,快速做查询的。 由于词典的size会很大,全部装载到heap里不现实,因此Lucene为词典做了一层前缀索引(Term Index),这个索引在Lucene4.0以后采用的数据结构是FST (Finite State Transducer)。 这种数据结构占用空间很小,Lucene打开索引的时候将其全量装载到内存中,加快磁盘上词典查询速度的同时减少随机磁盘访问次数。

下面是词典索引和词典主存储之间的一个对应关系图:

lucene_index.png


Lucene  file的完整数据结构参见Apache Lucene - Index File Formats

说了这么多,要传达的一个意思就是,ES的data node存储数据并非只是耗费磁盘空间的,为了加速数据的访问,每个segment都有会一些索引数据驻留在heap里。因此segment越多,瓜分掉的heap也越多,并且这部分heap是无法被GC掉的! 理解这点对于监控和管理集群容量很重要,当一个node的segment memory占用过多的时候,就需要考虑删除、归档数据,或者扩容了。

怎么知道segment memory占用情况呢?  CAT API可以给出答案。
1.  查看一个索引所有segment的memory占用情况:

seg_mem.png


2.  查看一个node上所有segment占用的memory总和:

seg_mem_node.png



那么有哪些途径减少data node上的segment memory占用呢? 总结起来有三种方法:
1.  删除不用的索引
2.  关闭索引 (文件仍然存在于磁盘,只是释放掉内存)。需要的时候可以重新打开。
3.  定期对不再更新的索引做optimize (ES2.0以后更改为force merge api)。这Optimze的实质是对segment file强制做合并,可以节省大量的segment memory。

Filter Cache (5.x里叫做Request cache)
Filter cache是用来缓存使用过的filter的结果集的,需要注意的是这个缓存也是常驻heap,在被evict掉之前,是无法被GC的。我的经验是默认的10% heap设置工作得够好了,如果实际使用中heap没什么压力的情况下,才考虑加大这个设置。


Field Data cache
在有大量排序、数据聚合的应用场景,可以说field data cache是性能和稳定性的杀手。 对搜索结果做排序或者聚合操作,需要将倒排索引里的数据进行解析,按列构造成docid->value的形式才能够做后续快速计算。 对于数据量很大的索引,这个构造过程会非常耗费时间,因此ES 2.0以前的版本会将构造好的数据缓存起来,提升性能。但是由于heap空间有限,当遇到用户对海量数据做计算的时候,就很容易导致heap吃紧,集群频繁GC,根本无法完成计算过程。 ES2.0以后,正式默认启用Doc Values特性(1.x需要手动更改mapping开启),将field data在indexing time构建在磁盘上,经过一系列优化,可以达到比之前采用field data cache机制更好的性能。因此需要限制对field data cache的使用,最好是完全不用,可以极大释放heap压力。 需要注意的是,很多同学已经升级到ES2.0,或者1.0里已经设置mapping启用了doc values,在kibana里仍然会遇到问题。 这里一个陷阱就在于kibana的table panel可以对所有字段排序。 设想如果有一个字段是analyzed过的,而用户去点击对应字段的排序表头是什么后果? 一来排序的结果并不是用户想要的,排序的对象实际是词典; 二来analyzed过的字段无法利用doc values,需要装载到field data cache,数据量很大的情况下可能集群就在忙着GC或者根本出不来结果。


Bulk Queue
一般来说,Bulk queue不会消耗很多的heap,但是见过一些用户为了提高bulk的速度,客户端设置了很大的并发量,并且将bulk Queue设置到不可思议的大,比如好几千。 Bulk Queue是做什么用的?当所有的bulk thread都在忙,无法响应新的bulk request的时候,将request在内存里排列起来,然后慢慢清掉。 这在应对短暂的请求爆发的时候有用,但是如果集群本身索引速度一直跟不上,设置的好几千的queue都满了会是什么状况呢? 取决于一个bulk的数据量大小,乘上queue的大小,heap很有可能就不够用,内存溢出了。一般来说官方默认的thread pool设置已经能很好的工作了,建议不要随意去“调优”相关的设置,很多时候都是适得其反的效果。


Indexing Buffer
Indexing Buffer是用来缓存新数据,当其满了或者refresh/flush interval到了,就会以segment file的形式写入到磁盘。 这个参数的默认值是10% heap size。根据经验,这个默认值也能够很好的工作,应对很大的索引吞吐量。 但有些用户认为这个buffer越大吞吐量越高,因此见过有用户将其设置为40%的。到了极端的情况,写入速度很高的时候,40%都被占用,导致OOM。


Cluster State Buffer
ES被设计成每个node都可以响应用户的api请求,因此每个node的内存里都包含有一份集群状态的拷贝。这个cluster state包含诸如集群有多少个node,多少个index,每个index的mapping是什么?有少shard,每个shard的分配情况等等 (ES有各类stats api获取这类数据)。 在一个规模很大的集群,这个状态信息可能会非常大的,耗用的内存空间就不可忽视了。并且在ES2.0之前的版本,state的更新是由master node做完以后全量散播到其他结点的。 频繁的状态更新就可以给heap带来很大的压力。 在超大规模集群的情况下,可以考虑分集群并通过tribe node连接做到对用户api的透明,这样可以保证每个集群里的state信息不会膨胀得过大。


超大搜索聚合结果集的fetch
ES是分布式搜索引擎,搜索和聚合计算除了在各个data node并行计算以外,还需要将结果返回给汇总节点进行汇总和排序后再返回。无论是搜索,还是聚合,如果返回结果的size设置过大,都会给heap造成很大的压力,特别是数据汇聚节点。超大的size多数情况下都是用户用例不对,比如本来是想计算cardinality,却用了terms aggregation + size:0这样的方式; 对大结果集做深度分页;一次性拉取全量数据等等。
 
对高cardinality字段做terms aggregation
所谓高cardinality,就是该字段的唯一值比较多。 比如client ip,可能存在上千万甚至上亿的不同值。 对这种类型的字段做terms aggregation时,需要在内存里生成海量的分桶,内存需求会非常高。如果内部再嵌套有其他聚合,情况会更糟糕。  在做日志聚合分析时,一个典型的可以引起性能问题的场景,就是对带有参数的url字段做terms aggregation。 对于访问量大的网站,带有参数的url字段cardinality可能会到数亿,做一次terms aggregation内存开销巨大,然而对带有参数的url字段做聚合通常没有什么意义。 对于这类问题,可以额外索引一个url_stem字段,这个字段索引剥离掉参数部分的url。可以极大降低内存消耗,提高聚合速度。


小结:
  1. 倒排词典的索引需要常驻内存,无法GC,需要监控data node上segment memory增长趋势。
  2. 各类缓存,field cache, filter cache, indexing cache, bulk queue等等,要设置合理的大小,并且要应该根据最坏的情况来看heap是否够用,也就是各类缓存全部占满的时候,还有heap空间可以分配给其他任务吗?避免采用clear cache等“自欺欺人”的方式来释放内存。
  3. 避免返回大量结果集的搜索与聚合。确实需要大量拉取数据的场景,可以采用scan & scroll api来实现。
  4. cluster stats驻留内存并无法水平扩展,超大规模集群可以考虑分拆成多个集群通过tribe node连接。
  5. 想知道heap够不够,必须结合实际应用场景,并对集群的heap使用情况做持续的监控。
  6. 根据监控数据理解内存需求,合理配置各类circuit breaker,将内存溢出风险降低到最低。

收起阅读 »

Day18: 程序内的消息流:ArrayBlockingQueue和zeromq对比

在logstash内部, input到filter, 以及filter到output, 消息都是通过一个队列来中转.

在我写hangout的第一个版本,也是这么做的,用ArrayBlockingQueue来中转消息, 上游几个线程把消息放在queue中, 下游再几个线程把queue中的消息消费走.

但是, 用下来之后, 发现在queue上面消耗的资源是相当的大,strace查看,非常大量的lock相关的系统调用, 现在的版本已经把queue去掉了. 想必Logstash也会有大量资源用在这一块.

zeromq中的Parallel Pipeline正好适合这个场景,而且文档中说是lock free的, 拿来和queue对比一下看.

在我自己的电脑上测试,2.6 GHz Intel Core i5.  一个主线程生成10,000,000个随机数, 分发给四个线程消费.

用Queue来实现, 需要约37秒, CPU使用率在150%. 用zeromq的ipc来传递消息, 只需要22秒, 期间CPU使用率在250%. 总的CPU使用时间都60秒左右.

不知道java中还有没有更合适的Queue可以用在这个场景中.至少zeromq和ArrayBlockingQueue相比, zeromq可以更快的处理消息, 但代价就是更高的CPU使用率.
继续阅读 »
在logstash内部, input到filter, 以及filter到output, 消息都是通过一个队列来中转.

在我写hangout的第一个版本,也是这么做的,用ArrayBlockingQueue来中转消息, 上游几个线程把消息放在queue中, 下游再几个线程把queue中的消息消费走.

但是, 用下来之后, 发现在queue上面消耗的资源是相当的大,strace查看,非常大量的lock相关的系统调用, 现在的版本已经把queue去掉了. 想必Logstash也会有大量资源用在这一块.

zeromq中的Parallel Pipeline正好适合这个场景,而且文档中说是lock free的, 拿来和queue对比一下看.

在我自己的电脑上测试,2.6 GHz Intel Core i5.  一个主线程生成10,000,000个随机数, 分发给四个线程消费.

用Queue来实现, 需要约37秒, CPU使用率在150%. 用zeromq的ipc来传递消息, 只需要22秒, 期间CPU使用率在250%. 总的CPU使用时间都60秒左右.

不知道java中还有没有更合适的Queue可以用在这个场景中.至少zeromq和ArrayBlockingQueue相比, zeromq可以更快的处理消息, 但代价就是更高的CPU使用率. 收起阅读 »

Day17: "奇怪"的搜索

代@childe 发文。

除了应用在日志系统外, 越来越多的业务数据也接入ES, 利用它天生强大的搜索性能和分布式可扩展, 可以为业务的精确快速灵活的搜索提供极大便利, 我觉得这是未来一个很好的方向.
但是, 对它ES各种各样的搜索方式, 你了解了吗?
我们来看几个”奇怪”的搜索.
## 奇怪的打分
### 奇怪的打分1
我们有个数据结构是
{
“first_name”:”string”,
“last_name”:”string”
}

插入了几条数据, 有诸葛亮 诸葛明 诸葛暗 诸葛黑, 还有个人名字很奇怪, 叫司马诸葛.
然后我们要搜索诸葛瑾, 虽然索引里面没有一个人叫这个名字, 但搜索出来诸葛亮也不错, 他们名字这么像, 说不定是亲兄弟, 可以顺藤摸瓜, 找到我们需要的信息呢.
{
"query": {
"multi_match": {
"query": "诸葛瑜",
"type": "most_fields",
"fields": [ “*_name” ]
}
}
}

但实际上呢, 司马诸葛这个人居然稳居搜索榜首位, 他是搞竞价排名了吧? 你知道其中的打分原理吗?
### 奇怪的打分2
我们有两条数据:
PUT /my_index/my_type/1
{
"title": "Quick brown rabbits",
"body": "Brown rabbits are commonly seen."
}
PUT /my_index/my_type/2
{
"title": "Keeping pets healthy",
"body": "My quick brown fox eats rabbits on a regular basis."
}
要搜索
{
"query": {
"bool": {
"should": [
{ "match": { "title": "Brown fox" }},
{ "match": { "body": "Brown fox" }}
]
}
}
}
第二条文档里面明确含有”brown fox”这个词组, 但是它的搜索得分比较低, 你知道为啥吗?
## and用在哪
{
"query": {
"multi_match": {
"query": "peter smith",
"type": "most_fields",
"operator": "and",
"fields": [ "first_name", "last_name" ]
}
}
}
你知道这个and代表什么吗?
是说
A: 姓和名里面都要含有"peter smith”,
还是说
B: 姓或者名里面要包含peter以及smith ?
还有, 怎么才能获得另外一个效果呢?
# 列表中的元素
我们有一条数据如下(按汉语分词)
{
“时代”:”三国”,
“姓名”: [“大司马”,“诸葛亮”]
}
我以词组的方式搜索:
{
"query": {
"match_phrase": {
"姓名": "司马诸葛"
}
}
}
能搜索到吗?
上面这些其实都是[elasticsearch Definitive Guide](https://www.elastic.co/guide)里面的几个小例子, 欢迎大家继续去那里寻找答案和其他各种小技巧.
 
继续阅读 »
代@childe 发文。

除了应用在日志系统外, 越来越多的业务数据也接入ES, 利用它天生强大的搜索性能和分布式可扩展, 可以为业务的精确快速灵活的搜索提供极大便利, 我觉得这是未来一个很好的方向.
但是, 对它ES各种各样的搜索方式, 你了解了吗?
我们来看几个”奇怪”的搜索.
## 奇怪的打分
### 奇怪的打分1
我们有个数据结构是
{
“first_name”:”string”,
“last_name”:”string”
}

插入了几条数据, 有诸葛亮 诸葛明 诸葛暗 诸葛黑, 还有个人名字很奇怪, 叫司马诸葛.
然后我们要搜索诸葛瑾, 虽然索引里面没有一个人叫这个名字, 但搜索出来诸葛亮也不错, 他们名字这么像, 说不定是亲兄弟, 可以顺藤摸瓜, 找到我们需要的信息呢.
{
"query": {
"multi_match": {
"query": "诸葛瑜",
"type": "most_fields",
"fields": [ “*_name” ]
}
}
}

但实际上呢, 司马诸葛这个人居然稳居搜索榜首位, 他是搞竞价排名了吧? 你知道其中的打分原理吗?
### 奇怪的打分2
我们有两条数据:
PUT /my_index/my_type/1
{
"title": "Quick brown rabbits",
"body": "Brown rabbits are commonly seen."
}
PUT /my_index/my_type/2
{
"title": "Keeping pets healthy",
"body": "My quick brown fox eats rabbits on a regular basis."
}
要搜索
{
"query": {
"bool": {
"should": [
{ "match": { "title": "Brown fox" }},
{ "match": { "body": "Brown fox" }}
]
}
}
}
第二条文档里面明确含有”brown fox”这个词组, 但是它的搜索得分比较低, 你知道为啥吗?
## and用在哪
{
"query": {
"multi_match": {
"query": "peter smith",
"type": "most_fields",
"operator": "and",
"fields": [ "first_name", "last_name" ]
}
}
}
你知道这个and代表什么吗?
是说
A: 姓和名里面都要含有"peter smith”,
还是说
B: 姓或者名里面要包含peter以及smith ?
还有, 怎么才能获得另外一个效果呢?
# 列表中的元素
我们有一条数据如下(按汉语分词)
{
“时代”:”三国”,
“姓名”: [“大司马”,“诸葛亮”]
}
我以词组的方式搜索:
{
"query": {
"match_phrase": {
"姓名": "司马诸葛"
}
}
}
能搜索到吗?
上面这些其实都是[elasticsearch Definitive Guide](https://www.elastic.co/guide)里面的几个小例子, 欢迎大家继续去那里寻找答案和其他各种小技巧.
  收起阅读 »

day16 logstash-forwader To Kakfa!

看到前一天, Medcl 介绍了Beat, 我想今天我就介绍一下算是同一个领域的, 我们的一个小产品吧, 同样也基于elastic旗下的logstash-forwarder. 我真的不是来打广告的, 就是第一次写, 没经验, 看着前一天的文章, 顺手就想到了.

在日志收集系统中, 从kafkf到ES这条路是没问题了, 但散布在各个服务器上采集日志的agent用logstash实在是太重了, 而且效率也低. 特别是我们有大量的windows服务器, 找一个合适的agent居然不是想象中的容易.

logstash-forwarder对于日志文件的探测和offset记录, deadtime等配置都非常适合我们, 但惟一不支持吐数据到kafak,对我们来说是一个遗憾. 我和oliver(https://github.com/oliveagle)做过一点改造之后, 让她支持了这个功能.

目前我们所有iis服务器已经部署了这个应用, 效率高, 占资源小, 可以数据压缩, 支持简单的格式切割, 实乃windows居家必备(我真不是来打广告的). golang客户端, 还能直接发送到kafka, 想想就很贴心~

贴上一段配置瞅瞅先, 启一个进程采集nginx和tomcat日志, 分别吐到kafka的2个topic中.
{
"files": [
{
"paths": [
"/var/log/nginx/*.log"
],
"Fields":{
"type":"nginx"
},
"DeadTime": "30m"
},
{
"paths": [
"/var/log/tomcat/*.log",
"/var/log/tomcat/*/*.log"
],
"Fields":{
"type":"tomcat"
},
"DeadTime": "30m"
}
],
"kafka": {
"broker_list": ["10.0.0.1:9092","10.0.0.2:9092"],
"topic_id": "topic_name_change_it_{{.type}}",
"compression_codec": "gzip"
}
}

再简单介绍一下参数吧,
  • DeadTime:30m 是说超过30分钟没有更新, 就不会再继续跟踪这个文件了(退出goroutine)
  • “Fields”:{ “type”:”tomcat” } , 会在每条日志中增加配置的字段
  • path目前就是用的golang官方库, 好像是还不支持递归多层目录查找, 反正我翻了一下文档, 没有找到.


grok还不支持, 但简单的分割是可以的
"files": [
{
"paths": [
"d:\\target.txt"
],
"FieldNames": ["datetime", "datetime", "s_ip", "cs_method", "cs_uri_stem", "cs_uri_query", "s_port", "time_taken"],
"Delimiter": "\\s+",
"QuoteChar": "\""
}
]

以上配置就是说按空白符把日志切割来, 塞到对应的字段中去. 第一个第二个合在一起, 放在datetime字段中.

其实还是有不少要完善的地方, 比如说没有带上机器的Hostname, 以及日志的路径. 在很多时候, 这些信息还是很有用的, 我们也会继续完善.

现在放在了https://github.com/childe/logs ... kafka, 有需要的同学,可以去看下.
继续阅读 »
看到前一天, Medcl 介绍了Beat, 我想今天我就介绍一下算是同一个领域的, 我们的一个小产品吧, 同样也基于elastic旗下的logstash-forwarder. 我真的不是来打广告的, 就是第一次写, 没经验, 看着前一天的文章, 顺手就想到了.

在日志收集系统中, 从kafkf到ES这条路是没问题了, 但散布在各个服务器上采集日志的agent用logstash实在是太重了, 而且效率也低. 特别是我们有大量的windows服务器, 找一个合适的agent居然不是想象中的容易.

logstash-forwarder对于日志文件的探测和offset记录, deadtime等配置都非常适合我们, 但惟一不支持吐数据到kafak,对我们来说是一个遗憾. 我和oliver(https://github.com/oliveagle)做过一点改造之后, 让她支持了这个功能.

目前我们所有iis服务器已经部署了这个应用, 效率高, 占资源小, 可以数据压缩, 支持简单的格式切割, 实乃windows居家必备(我真不是来打广告的). golang客户端, 还能直接发送到kafka, 想想就很贴心~

贴上一段配置瞅瞅先, 启一个进程采集nginx和tomcat日志, 分别吐到kafka的2个topic中.
{
"files": [
{
"paths": [
"/var/log/nginx/*.log"
],
"Fields":{
"type":"nginx"
},
"DeadTime": "30m"
},
{
"paths": [
"/var/log/tomcat/*.log",
"/var/log/tomcat/*/*.log"
],
"Fields":{
"type":"tomcat"
},
"DeadTime": "30m"
}
],
"kafka": {
"broker_list": ["10.0.0.1:9092","10.0.0.2:9092"],
"topic_id": "topic_name_change_it_{{.type}}",
"compression_codec": "gzip"
}
}

再简单介绍一下参数吧,
  • DeadTime:30m 是说超过30分钟没有更新, 就不会再继续跟踪这个文件了(退出goroutine)
  • “Fields”:{ “type”:”tomcat” } , 会在每条日志中增加配置的字段
  • path目前就是用的golang官方库, 好像是还不支持递归多层目录查找, 反正我翻了一下文档, 没有找到.


grok还不支持, 但简单的分割是可以的
"files": [
{
"paths": [
"d:\\target.txt"
],
"FieldNames": ["datetime", "datetime", "s_ip", "cs_method", "cs_uri_stem", "cs_uri_query", "s_port", "time_taken"],
"Delimiter": "\\s+",
"QuoteChar": "\""
}
]

以上配置就是说按空白符把日志切割来, 塞到对应的字段中去. 第一个第二个合在一起, 放在datetime字段中.

其实还是有不少要完善的地方, 比如说没有带上机器的Hostname, 以及日志的路径. 在很多时候, 这些信息还是很有用的, 我们也会继续完善.

现在放在了https://github.com/childe/logs ... kafka, 有需要的同学,可以去看下. 收起阅读 »