elasticsearch

elasticsearch

请教elasticsearch里面如何查询 字段A=xxx and (字段B<>xxx or 字段C=xxx ) 这种语法

Elasticsearchwuzx 回复了问题 • 3 人关注 • 3 个回复 • 34 次浏览 • 10 小时前 • 来自相关话题

elasticsearch 文档数量不一致

Elasticsearchlaoyang360 回复了问题 • 3 人关注 • 2 个回复 • 46 次浏览 • 12 小时前 • 来自相关话题

ElasticSearch 创建索引

Elasticsearchlunatictwo 回复了问题 • 4 人关注 • 2 个回复 • 72 次浏览 • 15 小时前 • 来自相关话题

产品筛选项互相影响问题

回复

Elasticsearchiwtbafp 发起了问题 • 1 人关注 • 0 个回复 • 29 次浏览 • 20 小时前 • 来自相关话题

ElasticSearch 5.x版本中删除过期文档

Elasticsearchshiyuan 回复了问题 • 2 人关注 • 1 个回复 • 53 次浏览 • 1 天前 • 来自相关话题

elasticsearchsearch2.2.x版本,termQuery、PrefixQuery查询没有数据返回

Elasticsearchmedcl 回复了问题 • 2 人关注 • 1 个回复 • 255 次浏览 • 2 天前 • 来自相关话题

关于elasticsearch的merge操作

Elasticsearchhelloes 回复了问题 • 3 人关注 • 2 个回复 • 59 次浏览 • 3 天前 • 来自相关话题

es 2.4.4 时间字段range会出现不在该范围内的数据

回复

Elasticsearchysslover 发起了问题 • 1 人关注 • 0 个回复 • 42 次浏览 • 4 天前 • 来自相关话题

es 2.4.4 时间字段range会出现不在该范围内的数据

回复

Elasticsearchysslover 发起了问题 • 1 人关注 • 0 个回复 • 28 次浏览 • 4 天前 • 来自相关话题

es集群所有机器需要安装相同的插件

Elasticsearchlaoyang360 回复了问题 • 2 人关注 • 1 个回复 • 72 次浏览 • 4 天前 • 来自相关话题

请教elasticsearch里面如何查询 字段A=xxx and (字段B<>xxx or 字段C=xxx ) 这种语法

回复

Elasticsearchwuzx 回复了问题 • 3 人关注 • 3 个回复 • 34 次浏览 • 10 小时前 • 来自相关话题

elasticsearch 文档数量不一致

回复

Elasticsearchlaoyang360 回复了问题 • 3 人关注 • 2 个回复 • 46 次浏览 • 12 小时前 • 来自相关话题

ElasticSearch 创建索引

回复

Elasticsearchlunatictwo 回复了问题 • 4 人关注 • 2 个回复 • 72 次浏览 • 15 小时前 • 来自相关话题

产品筛选项互相影响问题

回复

Elasticsearchiwtbafp 发起了问题 • 1 人关注 • 0 个回复 • 29 次浏览 • 20 小时前 • 来自相关话题

ElasticSearch 5.x版本中删除过期文档

回复

Elasticsearchshiyuan 回复了问题 • 2 人关注 • 1 个回复 • 53 次浏览 • 1 天前 • 来自相关话题

elasticsearchsearch2.2.x版本,termQuery、PrefixQuery查询没有数据返回

回复

Elasticsearchmedcl 回复了问题 • 2 人关注 • 1 个回复 • 255 次浏览 • 2 天前 • 来自相关话题

关于elasticsearch的merge操作

回复

Elasticsearchhelloes 回复了问题 • 3 人关注 • 2 个回复 • 59 次浏览 • 3 天前 • 来自相关话题

es 2.4.4 时间字段range会出现不在该范围内的数据

回复

Elasticsearchysslover 发起了问题 • 1 人关注 • 0 个回复 • 42 次浏览 • 4 天前 • 来自相关话题

es 2.4.4 时间字段range会出现不在该范围内的数据

回复

Elasticsearchysslover 发起了问题 • 1 人关注 • 0 个回复 • 28 次浏览 • 4 天前 • 来自相关话题

es集群所有机器需要安装相同的插件

回复

Elasticsearchlaoyang360 回复了问题 • 2 人关注 • 1 个回复 • 72 次浏览 • 4 天前 • 来自相关话题

Elasticsearch Java API 索引的增删改查(二)

Elasticsearchquanke 发表了文章 • 0 个评论 • 126 次浏览 • 5 天前 • 来自相关话题

本节介绍以下 CRUD API:

 单文档  APIs

多文档 APIs

Multi Get API Bulk API

注意:所有的单文档的CRUD API,index参数只能接受单一的索引库名称,或者是一个指向单一索引库的alias。

Index API

Index API 允许我们存储一个JSON格式的文档,使数据可以被搜索。文档通过index、type、id唯一确定。我们可以自己提供一个id,或者也使用Index API 为我们自动生成一个。

这里有几种不同的方式来产生JSON格式的文档(document):

  • 手动方式,使用原生的byte[]或者String
  • 使用Map方式,会自动转换成与之等价的JSON
  • 使用第三方库来序列化beans,如Jackson
  • 使用内置的帮助类 XContentFactory.jsonBuilder()

手动方式

数据格式

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";
实例
/**  
 * 手动生成JSON  
 */  
@Test  
public void CreateJSON(){  
      
    String json = "{" +  
            "\"user\":\"fendo\"," +  
            "\"postDate\":\"2013-01-30\"," +  
            "\"message\":\"Hell word\"" +  
        "}";  
      
    IndexResponse response = client.prepareIndex("fendo", "fendodate")  
            .setSource(json)  
            .get();  
    System.out.println(response.getResult());  
      
}  

  Map方式

Map是key:value数据类型,可以代表json结构.

Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
实例
 /**  
 * 使用集合  
 */  
@Test  
public void CreateList(){  
      
    Map<String, Object> json = new HashMap<String, Object>();  
    json.put("user","kimchy");  
    json.put("postDate","2013-01-30");  
    json.put("message","trying out Elasticsearch");  
      
    IndexResponse response = client.prepareIndex("fendo", "fendodate")  
            .setSource(json)  
            .get();  
    System.out.println(response.getResult());  
      
}  

  序列化方式

ElasticSearch已经使用了jackson,可以直接使用它把javabean转为json.

import com.fasterxml.jackson.databind.*;

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
实例
/**  
 * 使用JACKSON序列化  
 * @throws Exception  
 */  
@Test  
public void CreateJACKSON() throws Exception{  
      
    CsdnBlog csdn=new CsdnBlog();  
    csdn.setAuthor("fendo");  
    csdn.setContent("这是JAVA书籍");  
    csdn.setTag("C");  
    csdn.setView("100");  
    csdn.setTitile("编程");  
    csdn.setDate(new Date().toString());  
      
    // instance a json mapper  
    ObjectMapper mapper = new ObjectMapper(); // create once, reuse  

    // generate json  
    byte[] json = mapper.writeValueAsBytes(csdn);  
      
    IndexResponse response = client.prepareIndex("fendo", "fendodate")  
            .setSource(json)  
            .get();  
    System.out.println(response.getResult());  
}  

  XContentBuilder帮助类方式

ElasticSearch提供了一个内置的帮助类XContentBuilder来产生JSON文档

// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();
实例
/**  
 * 使用ElasticSearch 帮助类  
 * @throws IOException   
 */  
@Test  
public void CreateXContentBuilder() throws IOException{  
      
    XContentBuilder builder = XContentFactory.jsonBuilder()  
            .startObject()  
                .field("user", "ccse")  
                .field("postDate", new Date())  
                .field("message", "this is Elasticsearch")  
            .endObject();  
      
    IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();  
    System.out.println("创建成功!");  
      
      
}  

综合实例

 
import java.io.IOException;  
import java.net.InetAddress;  
import java.net.UnknownHostException;  
import java.util.Date;  
import java.util.HashMap;  
import java.util.Map;  
  
import org.elasticsearch.action.index.IndexResponse;  
import org.elasticsearch.client.transport.TransportClient;  
import org.elasticsearch.common.settings.Settings;  
import org.elasticsearch.common.transport.InetSocketTransportAddress;  
import org.elasticsearch.common.xcontent.XContentBuilder;  
import org.elasticsearch.common.xcontent.XContentFactory;  
import org.elasticsearch.transport.client.PreBuiltTransportClient;  
import org.junit.Before;  
import org.junit.Test;  
  
import com.fasterxml.jackson.core.JsonProcessingException;  
import com.fasterxml.jackson.databind.ObjectMapper;  
  
public class CreateIndex {  
  
    private TransportClient client;  
      
    @Before  
    public void getClient() throws Exception{  
        //设置集群名称  
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();// 集群名  
        //创建client  
        client  = new PreBuiltTransportClient(settings)  
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));  
    }  
      
    /**  
     * 手动生成JSON  
     */  
    @Test  
    public void CreateJSON(){  
          
        String json = "{" +  
                "\"user\":\"fendo\"," +  
                "\"postDate\":\"2013-01-30\"," +  
                "\"message\":\"Hell word\"" +  
            "}";  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                .setSource(json)  
                .get();  
        System.out.println(response.getResult());  
          
    }  
      
      
    /**  
     * 使用集合  
     */  
    @Test  
    public void CreateList(){  
          
        Map<String, Object> json = new HashMap<String, Object>();  
        json.put("user","kimchy");  
        json.put("postDate","2013-01-30");  
        json.put("message","trying out Elasticsearch");  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                .setSource(json)  
                .get();  
        System.out.println(response.getResult());  
          
    }  
      
    /**  
     * 使用JACKSON序列化  
     * @throws Exception  
     */  
    @Test  
    public void CreateJACKSON() throws Exception{  
          
        CsdnBlog csdn=new CsdnBlog();  
        csdn.setAuthor("fendo");  
        csdn.setContent("这是JAVA书籍");  
        csdn.setTag("C");  
        csdn.setView("100");  
        csdn.setTitile("编程");  
        csdn.setDate(new Date().toString());  
          
        // instance a json mapper  
        ObjectMapper mapper = new ObjectMapper(); // create once, reuse  
  
        // generate json  
        byte[] json = mapper.writeValueAsBytes(csdn);  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                .setSource(json)  
                .get();  
        System.out.println(response.getResult());  
    }  
      
    /**  
     * 使用ElasticSearch 帮助类  
     * @throws IOException   
     */  
    @Test  
    public void CreateXContentBuilder() throws IOException{  
          
        XContentBuilder builder = XContentFactory.jsonBuilder()  
                .startObject()  
                    .field("user", "ccse")  
                    .field("postDate", new Date())  
                    .field("message", "this is Elasticsearch")  
                .endObject();  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();  
        System.out.println("创建成功!");  
          
          
    }  
      
}  

你还可以通过startArray(string)和endArray()方法添加数组。.field()方法可以接受多种对象类型。你可以给它传递数字、日期、甚至其他XContentBuilder对象。

Get API

根据id查看文档:

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

更多请查看 rest get API 文档

配置线程

operationThreaded 设置为 true 是在不同的线程里执行此次操作

下面的例子是operationThreaded 设置为 false

GetResponse response = client.prepareGet("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();

Delete API

根据ID删除:

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();

更多请查看 delete API 文档

配置线程

operationThreaded 设置为 true 是在不同的线程里执行此次操作

下面的例子是operationThreaded 设置为 false

GetResponse response = client.prepareGet("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();

Delete By Query API

通过查询条件删除

BulkByScrollResponse response =
    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
        .filter(QueryBuilders.matchQuery("gender", "male")) //查询条件
        .source("persons") //index(索引名)
        .get();  //执行

long deleted = response.getDeleted(); //删除文档的数量

如果需要执行的时间比较长,可以使用异步的方式处理,结果在回调里面获取

DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male"))      //查询            
    .source("persons")                //index(索引名)                                    
    .execute(new ActionListener<BulkByScrollResponse>() {     //回调监听     
        @Override
        public void onResponse(BulkByScrollResponse response) {
            long deleted = response.getDeleted();   //删除文档的数量                 
        }
        @Override
        public void onFailure(Exception e) {
            // Handle the exception
        }
    });

Update API

有两种方式更新索引:

  • 创建 UpdateRequest,通过client发送;
  • 使用 prepareUpdate() 方法;

使用UpdateRequest

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

使用 prepareUpdate() 方法

这里官方的示例有问题,new Script()参数错误,所以一下代码是我自己写的(2017/11/10)

client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script("ctx._source.gender = \"male\""  ,ScriptService.ScriptType.INLINE, null, null))//脚本可以是本地文件存储的,如果使用文件存储的脚本,需要设置 ScriptService.ScriptType.FILE 
        .get();

client.prepareUpdate("ttl", "doc", "1")
        .setDoc(jsonBuilder()   //合并到现有文档
            .startObject()
                .field("gender", "male")
            .endObject())
        .get();

Update by script

使用脚本更新文档 

UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
        .script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();

Update by merging documents

合并文档

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
client.update(updateRequest).get();

Upsert

更新插入,如果存在文档就更新,如果不存在就插入

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())
        .upsert(indexRequest); //如果不存在此文档 ,就增加 `indexRequest`
client.update(updateRequest).get();

如果 index/type/1 存在,类似下面的文档:

{
    "name"  : "Joe Dalton",
    "gender": "male"        
}

如果不存在,会插入新的文档:

{
    "name" : "Joe Smith",
    "gender": "male"
}

Multi Get API

一次获取多个文档

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "tweet", "1") //一个id的方式
    .add("twitter", "tweet", "2", "3", "4") //多个id的方式
    .add("another", "type", "foo")  //可以从另外一个索引获取
    .get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) { //迭代返回值
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {      //判断是否存在                
        String json = response.getSourceAsString(); //_source 字段
    }
}

更多请浏览REST multi get 文档

Bulk API

Bulk API,批量插入:

import static org.elasticsearch.common.xcontent.XContentFactory.*;
BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
    //处理失败
}

使用 Bulk Processor

BulkProcessor 提供了一个简单的接口,在给定的大小数量上定时批量自动请求

创建BulkProcessor实例

首先创建BulkProcessor实例

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  //增加elasticsearch客户端
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } //调用bulk之前执行 ,例如你可以通过request.numberOfActions()方法知道numberOfActions

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } //调用bulk之后执行 ,例如你可以通过request.hasFailures()方法知道是否执行失败

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } //调用失败抛 Throwable
        })
        .setBulkActions(10000) //每次10000请求
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) //拆成5mb一块
        .setFlushInterval(TimeValue.timeValueSeconds(5)) //无论请求数量多少,每5秒钟请求一次。
        .setConcurrentRequests(1) //设置并发请求的数量。值为0意味着只允许执行一个请求。值为1意味着允许1并发请求。
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//设置自定义重复请求机制,最开始等待100毫秒,之后成倍更加,重试3次,当一次或多次重复请求失败后因为计算资源不够抛出 EsRejectedExecutionException 异常,可以通过BackoffPolicy.noBackoff()方法关闭重试机制
        .build();

BulkProcessor 默认设置

  • bulkActions  1000 
  • bulkSize 5mb
  • 不设置flushInterval
  • concurrentRequests 为 1 ,异步执行
  • backoffPolicy 重试 8次,等待50毫秒

增加requests

然后增加requestsBulkProcessor

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

关闭 Bulk Processor

当所有文档都处理完成,使用awaitCloseclose 方法关闭BulkProcessor:

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

bulkProcessor.close();

在测试中使用Bulk Processor

如果你在测试种使用Bulk Processor可以执行同步方法

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();

所有实例 已经上传到Git

更多请浏览 spring-boot-starter-es 开源项目

如何有任何问题请关注微信公众号给我留言

qrcode_for_gh_26893aa0a4ea_258.jpg

 

Elasticsearch Java API - 客户端连接(TransportClient,PreBuiltXPackTransportClient)(一)

Elasticsearchquanke 发表了文章 • 0 个评论 • 92 次浏览 • 5 天前 • 来自相关话题

Elasticsearch Java API 客户端连接

一个是TransportClient,一个是NodeClient,还有一个XPackTransportClient

  • TransportClient:

作为一个外部访问者,请求ES的集群,对于集群而言,它是一个外部因素。

  • NodeClient

作为ES集群的一个节点,它是ES中的一环,其他的节点对它是感知的。

  • XPackTransportClient:

服务安装了 x-pack 插件

重要:客户端版本应该和服务端版本保持一致

TransportClient旨在被Java高级REST客户端取代,该客户端执行HTTP请求而不是序列化的Java请求。 在即将到来的Elasticsearch版本中将不赞成使用TransportClient,建议使用Java高级REST客户端。

上面的警告比较尴尬,但是在 5xx版本中使用还是没有问题的,可能使用rest 客户端兼容性更好做一些。

Elasticsearch Java Rest API 手册

Maven Repository

Elasticsearch Java API包已经上传到 Maven Central

pom.xml文件中增加:

transport 版本号最好就是与Elasticsearch版本号一致。

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>5.6.3</version>
</dependency>

Transport Client

不设置集群名称

// on startup

//此步骤添加IP,至少一个,如果设置了"client.transport.sniff"= true 一个就够了,因为添加了自动嗅探配置
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));

// on shutdown  关闭client

client.close();

设置集群名称

Settings settings = Settings.builder()
        .put("cluster.name", "myClusterName").build();  //设置ES实例的名称
TransportClient client = new PreBuiltTransportClient(settings);  //自动嗅探整个集群的状态,把集群中其他ES节点的ip添加到本地的客户端列表中
//Add transport addresses and do something with the client...

增加自动嗅探配置

Settings settings = Settings.builder()
        .put("client.transport.sniff", true).build();
TransportClient client = new PreBuiltTransportClient(settings);

其他配置

client.transport.ignore_cluster_name  //设置 true ,忽略连接节点集群名验证
client.transport.ping_timeout       //ping一个节点的响应时间 默认5秒
client.transport.nodes_sampler_interval //sample/ping 节点的时间间隔,默认是5s

对于ES Client,有两种形式,一个是TransportClient,一个是NodeClient。两个的区别为: TransportClient作为一个外部访问者,通过HTTP去请求ES的集群,对于集群而言,它是一个外部因素。 NodeClient顾名思义,是作为ES集群的一个节点,它是ES中的一环,其他的节点对它是感知的,不像TransportClient那样,ES集群对它一无所知。NodeClient通信的性能会更好,但是因为是ES的一环,所以它出问题,也会给ES集群带来问题。NodeClient可以设置不作为数据节点,在elasticsearch.yml中设置,这样就不会在此节点上分配数据。

如果用ES的节点,仁者见仁智者见智。

实例

package name.quanke.es.study;

import name.quanke.es.study.util.Utils;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.After;
import org.junit.Before;

import java.net.InetAddress;

/**
 * Elasticsearch 5.5.1 的client 和 ElasticsearchTemplate的初始化
 * 作为一个外部访问者,请求ES的集群,对于集群而言,它是一个外部因素。
 * Created by http://quanke.name on 2017/11/10.
 */
public class ElasticsearchClient {

    protected TransportClient client;

    @Before
    public void setUp() throws Exception {

        Settings esSettings = Settings.builder()
                .put("cluster.name", "utan-es") //设置ES实例的名称
                .put("client.transport.sniff", true) //自动嗅探整个集群的状态,把集群中其他ES节点的ip添加到本地的客户端列表中
                .build();

        /**
         * 这里的连接方式指的是没有安装x-pack插件,如果安装了x-pack则参考{@link ElasticsearchXPackClient}
         * 1. java客户端的方式是以tcp协议在9300端口上进行通信
         * 2. http客户端的方式是以http协议在9200端口上进行通信
         */
        client = new PreBuiltTransportClient(esSettings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.10"), 9300));

        System.out.println("ElasticsearchClient 连接成功");
    }

    @After
    public void tearDown() throws Exception {
        if (client != null) {
            client.close();
        }

    }

    protected void println(SearchResponse searchResponse) {
        Utils.println(searchResponse);
    }

}

本实例代码已经上传到 Git ElasticsearchClient.java

所有实例 已经上传到Git

XPackTransportClient

如果 ElasticSearch 服务安装了 x-pack 插件,需要PreBuiltXPackTransportClient实例才能访问

使用Maven管理项目,把下面代码增加到pom.xml;

一定要修改默认仓库地址为https://artifacts.elastic.co/maven ,因为这个库没有上传到Maven中央仓库,如果有自己的 maven ,请配置代理

<project ...>

   <repositories>
      <!-- add the elasticsearch repo -->
      <repository>
         <id>elasticsearch-releases</id>
         <url>https://artifacts.elastic.co/maven</url>
         <releases>
            <enabled>true</enabled>
         </releases>
         <snapshots>
            <enabled>false</enabled>
         </snapshots>
      </repository>
      ...
   </repositories>
   ...

   <dependencies>
      <!-- add the x-pack jar as a dependency -->
      <dependency>
         <groupId>org.elasticsearch.client</groupId>
         <artifactId>x-pack-transport</artifactId>
         <version>5.6.3</version>
      </dependency>
      ...
   </dependencies>
   ...

 </project>

实例


/**
 * Elasticsearch XPack Client
 * Created by http://quanke.name on 2017/11/10.
 */
public class ElasticsearchXPackClient {

    protected TransportClient client;

    @Before
    public void setUp() throws Exception {
        /**
         * 如果es集群安装了x-pack插件则以此种方式连接集群
         * 1. java客户端的方式是以tcp协议在9300端口上进行通信
         * 2. http客户端的方式是以http协议在9200端口上进行通信
         */
        Settings settings = Settings.builder()
                .put("xpack.security.user", "elastic:utan100")
                .put("cluster.name", "utan-es")
                .build();
        client = new PreBuiltXPackTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.10"), 9300));
//        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
//        credentialsProvider.setCredentials(AuthScope.ANY,
//                new UsernamePasswordCredentials("elastic", "utan100"));

        System.out.println("ElasticsearchXPackClient 启动成功");
    }

    @Test
    public void testClientConnection() throws Exception {

        System.out.println("--------------------------");
    }

    @After
    public void tearDown() throws Exception {
        if (client != null) {
            client.close();
        }

    }

    protected void println(SearchResponse searchResponse) {
        Utils.println(searchResponse);
    }
}

本实例代码已经上传到 Git ElasticsearchXPackClient.java

所有实例 已经上传到Git

更多请浏览 spring-boot-starter-es 开源项目

如何有任何问题请关注微信公众号给我留言

qrcode_for_gh_26893aa0a4ea_258.jpg

Elasticsearch 5.6 Java API 中文手册

Elasticsearchquanke 发表了文章 • 1 个评论 • 759 次浏览 • 2017-11-08 22:30 • 来自相关话题

432952-5448f57c503678f5.jpg
 [Elasticsearch 5.6 Java API 中文手册] 本手册由 全科 翻译,并且整理成电子书,支持PDF,ePub,Mobi格式,方便大家下载阅读。 不只是官方文档的翻译,还包含使用实例,包含我们使用踩过的坑 阅读地址:https://es.quanke.name 下载地址:https://www.gitbook.com/book/q ... -java github地址:https://github.com/quanke/elasticsearch-java 编辑:http://quanke.name 编辑整理辛苦,还望大神们点一下star ,抚平我虚荣的心 [全科的公众号]
08183543_ysUa.jpg
 
432952-5448f57c503678f5.jpg
 [Elasticsearch 5.6 Java API 中文手册] 本手册由 全科 翻译,并且整理成电子书,支持PDF,ePub,Mobi格式,方便大家下载阅读。 不只是官方文档的翻译,还包含使用实例,包含我们使用踩过的坑 阅读地址:https://es.quanke.name 下载地址:https://www.gitbook.com/book/q ... -java github地址:https://github.com/quanke/elasticsearch-java 编辑:http://quanke.name 编辑整理辛苦,还望大神们点一下star ,抚平我虚荣的心 [全科的公众号]
08183543_ysUa.jpg
 

Bulk异常引发的Elasticsearch内存泄漏

Elasticsearchkennywu76 发表了文章 • 5 个评论 • 222 次浏览 • 2017-11-08 18:54 • 来自相关话题

原文链接: http://www.jianshu.com/p/d4f7a6d58008

前天公司度假部门一个线上ElasticSearch集群发出报警,有Data Node的Heap使用量持续超过80%警戒线。 收到报警邮件后,不敢怠慢,立即登陆监控系统查看集群状态。还好,所有的结点都在正常服务,只是有2个结点的Heap使用率非常高。此时,Old GC一直在持续的触发,却无法回收内存。

Heap Used %


初步排查

问题结点的Heap分配了30GB,80%的使用率约等于24GB。 但集群的数据总量并不大,5个结点所有索引文件加起来占用的磁盘空间还不到10GB。

GET /_cat/allocation?v&h=shards,disk.indices,disk.used,disk.avail

shards disk.indices disk.used disk.avail
     3        1.9gb    38.3gb     89.7gb
     4        2.2gb    13.4gb    114.6gb
     4        2.5gb    20.3gb    107.7gb
     4        2.3gb    33.9gb     94.1gb
     3        1.7gb    12.8gb    115.2gb

查看各结点的segment memory和cache占用量也都非常小,是MB级别的。

GET /_cat/nodes?v&h=id,port,v,m,fdp,mc,mcs,sc,sm,qcm,fm,im,siwm,svmm

id   port v     m fdp mc     mcs sc     sm     qcm      fm siwm svmm
e1LV 9300 5.3.2 -   1  0      0b 68   69mb   1.5mb   1.9mb   0b 499b
5VnU 9300 5.3.2 -   1  0      0b 75   79mb   1.5mb   1.9mb   0b 622b
_Iob 9300 5.3.2 -   1  0      0b 56 55.7mb   1.3mb 914.1kb   0b 499b
4Kyl 9300 5.3.2 *   1  1 330.1mb 81 84.4mb   1.2mb   1.9mb   0b 622b
XEP_ 9300 5.3.2 -   1  0      0b 45 50.4mb 748.5kb     1mb   0b 622b

集群的QPS只有30上下,CPU消耗10%都不到,各类thread pool的活动线程数量也都非常低。 Bulk & Search Thread Pool

非常费解是什么东西占着20多GB的内存不释放?

出现问题的集群ES版本是5.3.2,而这个版本的稳定性在公司内部已经经过长时间的考验,做为稳定版本在线上进行了大规模部署。 其他一些读写负载非常高的集群也未曾出现过类似的状况,看来是遇到新问题了。

查看问题结点ES的日志,除了看到一些Bulk异常以外,未见特别明显的其他和资源相关的错误:

[2017-11-06T16:33:15,668][DEBUG][o.e.a.b.TransportShardBulkAction] [] [suggest-3][0] failed to execute bulk item (update) BulkShardRequest [[suggest-3][0]] containing [44204
] requests
org.elasticsearch.index.engine.DocumentMissingException: [type][纳格尔果德_1198]: document missing
        at org.elasticsearch.action.update.UpdateHelper.prepare(UpdateHelper.java:92) ~[elasticsearch-5.3.2.jar:5.3.2]
        at org.elasticsearch.action.update.UpdateHelper.prepare(UpdateHelper.java:81) ~[elasticsearch-5.3.2.jar:5.3.2]

和用户确认这些异常的原因,是因为写入程序会从数据源拿到数据后,根据doc_id对ES里的数据做update。会有部分doc_id在ES里不存在的情况,但并不影响业务逻辑,因而ES记录的document missing异常应该可以忽略。

至此别无他法,只能对JVM做Dump分析了。


Heap Dump分析

用的工具是Eclipse MAT,从这里下载的Mac版:Downloads 。 使用这个工具需要经过以下2个步骤:

  • 获取二进制的head dump文件 jmap -dump:format=b,file=/tmp/es_heap.bin <pid> 其中pid是ES JAVA进程的进程号。
  • 将生成的dump文件下载到本地开发机器,启动MAT,从其GUI打开文件。

要注意,MAT本身也是JAVA应用,需要有JDK运行环境的支持。

MAT第一次打dump文件的时候,需要对其解析,生成多个索引。这个过程比较消耗CPU和内存,但一旦完成,之后再打开dump文件就很快,消耗很低。 对于这种20多GB的大文件,第一次解析的过程会非常缓慢,并且很可能因为开发机内存的较少而内存溢出。因此,我找了台大内存的服务器来做第一次的解析工作:

  • 将linux版的MAT拷贝上去,解压缩后,修改配置文件MemoryAnalyzer.ini,将内存设置为20GB左右:

    $ cat MemoryAnalyzer.ini 
    
      -startup
      plugins/org.eclipse.equinox.launcher_1.3.100.v20150511-1540.jar
      --launcher.library
      plugins/org.eclipse.equinox.launcher.gtk.linux.x86_64_1.1.300.v20150602-1417
      -vmargs
      -Xmx20240m

    这样能保证解析的过程中不会内存溢出。

  • 将dump文件拷贝上去,执行下面几个命令生成索引及3个分析报告:
    • mat/ParseHeapDump.sh es_heap.bin org.eclipse.mat.api:suspects
    • mat/ParseHeapDump.sh es_heap.bin org.eclipse.mat.api:overview
    • mat/ParseHeapDump.sh es_heap.bin org.eclipse.mat.api:top_components

分析成功以后,会生成如下一堆索引文件(.index)和分析报告(.zip)

-rw-r--r--@ 1 xgwu  staff    62M Nov  6 16:18 es_heap.a2s.index
-rw-r--r--@ 1 xgwu  staff    25G Nov  6 14:59 es_heap.bin
-rw-r--r--@ 1 xgwu  staff    90M Nov  6 16:21 es_heap.domIn.index
-rw-r--r--@ 1 xgwu  staff   271M Nov  6 16:21 es_heap.domOut.index
-rw-r--r--  1 xgwu  staff   144K Nov  7 18:38 es_heap.i2sv2.index
-rw-r--r--@ 1 xgwu  staff   220M Nov  6 16:18 es_heap.idx.index
-rw-r--r--@ 1 xgwu  staff   356M Nov  6 16:20 es_heap.inbound.index
-rw-r--r--@ 1 xgwu  staff   6.8M Nov  6 16:20 es_heap.index
-rw-r--r--@ 1 xgwu  staff    76M Nov  6 16:18 es_heap.o2c.index
-rw-r--r--@ 1 xgwu  staff   231M Nov  6 16:20 es_heap.o2hprof.index
-rw-r--r--@ 1 xgwu  staff   206M Nov  6 16:21 es_heap.o2ret.index
-rw-r--r--@ 1 xgwu  staff   353M Nov  6 16:20 es_heap.outbound.index
-rw-r--r--@ 1 xgwu  staff   399K Nov  6 16:16 es_heap.threads
-rw-r--r--@ 1 xgwu  staff    89K Nov  7 17:40 es_heap_Leak_Suspects.zip
-rw-r--r--@ 1 xgwu  staff    78K Nov  6 19:22 es_heap_System_Overview.zip
-rw-r--r--@ 1 xgwu  staff   205K Nov  6 19:22 es_heap_Top_Components.zip
drwxr-xr-x@ 3 xgwu  staff    96B Nov  6 16:15 workspace

将这些文件打包下载到本地机器上,用MAT GUI打开就可以分析了。

在MAT里打开dump文件的时候,可以选择打开已经生成好的报告,比如Leak suspects: 选择打开leak Suspects报告

通过Leak Suspects,一眼看到这20多GB内存主要是被一堆bulk线程实例占用了,每个实例则占用了接近1.5GB的内存。 Leak Suspects

进入"dominator_tree"面板,按照"Retained Heap"排序,可以看到多个bulk线程的内存占用都非常高。 Dominator Tree

将其中一个thread的引用链条展开,看看这些线程是如何Retain这么多内存的,特别注意红圈部分: 对象引用链

这个引用关系解读如下:

  1. 这个bulk线程的thread local map里保存了一个log4j的MultableLogEvent对象。
  2. MutablelogEvent对象引用了log4j的ParameterizedMessage对象。
  3. ParameterizedMessage引用了bulkShardRequest对象。
  4. bulkShardRequest引用了4万多个BulkitemRequest对象。

这样看下来,似乎是log4j的logevent对一个大的bulk请求对象有强引用而导致其无法被垃圾回收掉,产生内存泄漏。

联想到ES日志里,有记录一些document missing的bulk异常,猜测是否在记录这些异常的时候产生的泄漏。


问题复现

为了验证猜测,我在本地开发机上,启动了一个单结点的5.3.2测试集群,用bulk api做批量的update,并且有意为其中1个update请求设置不存在的doc_id。为了便于测试,我在ES的配置文件elasticsearch.yml里添加了配置项processors: 1。 这个配置项影响集群thread_pool的配置,bulk thread pool的大小将减少为1个,这样可以更快速和便捷的做各类验证。

启动集群,发送完bulk请求后,立即做一个dump,重复之前的分析过程,问题得到了复现。 这时候想,是否其他bulk异常也会引起同样的问题,比如写入的数据和mapping不匹配? 测试了一下,问题果然还是会产生。再用不同的bulk size进行测试,发现无法回收的这段内存大小,取决于最后一次抛过异常的bulk size大小。至此,基本可以确定内存泄漏与log4j记录异常消息的逻辑有关系。

为了搞清楚这个问题是否5.3.2独有,后续版本是否有修复,在最新的5.6.3上做了同样的测试,问题依旧,因此这应该是一个还未发现的深层Bug.


读源码查根源

大致搞清楚问题查找的方向了,但根源还未找到,也就不知道如何修复和避免,只有去扒源码了。 在TransportShardBulkAction 第209行,找到了ES日志里抛异常的代码片段。

 if (isConflictException(failure)) {
     logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
             request.shardId(), docWriteRequest.opType().getLowercase(), request), failure);
 } else {
     logger.debug((Supplier<?>) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
             request.shardId(), docWriteRequest.opType().getLowercase(), request), failure);
 }

这里看到了ParameterizedMessage实例化过程中,request做为一个参数传入了。这里的request是一个BulkShardRequest对象,保存的是要写入到一个shard的一批bulk item request。 这样以来,一个批次写入的请求数量越多,这个对象retain的内存就越多。 可问题是,为什么logger.debug()调用完毕以后,这个引用不会被释放?

通过和之前MAT上的dominator tree仔细对比,可以看到ParameterizedMessage之所以无法释放,是因为被一个MutableLogEvent在引用,而这个MutableLogEvent被做为一个thread local存放起来了。 由于ES的Bulk thread pool是fix size的,也就是预先创建好,不会销毁和再创建。 那么这些MutableLogEvent对象由于是thread local的,只要线程没有销毁,就会对该线程实例一直全局存在,并且其还会一直引用最后一次处理过的ParameterizedMessage。 所以在ES记录bulk exception这种比较大的请求情况下, 整个request对象会被thread local变量一直强引用无法释放,产生大量的内存泄漏。

再继续挖一下log4j的源码,发现MutableLogEvent是在org.apache.logging.log4j.core.impl.ReusableLogEventFactory里做为thread local创建的。

public class ReusableLogEventFactory implements LogEventFactory {
    private static final ThreadNameCachingStrategy THREAD_NAME_CACHING_STRATEGY = ThreadNameCachingStrategy.create();
    private static final Clock CLOCK = ClockFactory.getClock();

    private static ThreadLocal<MutableLogEvent> mutableLogEventThreadLocal = new ThreadLocal<>();

org.apache.logging.log4j.core.config.LoggerConfig则根据一个常数ENABLE_THREADLOCALS的值来决定用哪个LogEventFactory。

        if (LOG_EVENT_FACTORY == null) {
            LOG_EVENT_FACTORY = Constants.ENABLE_THREADLOCALS
                    ? new ReusableLogEventFactory()
                    : new DefaultLogEventFactory();
        }

继续深挖,在org.apache.logging.log4j.util.Constants里看到,log4j会根据运行环境判断是否是WEB应用,如果不是,就从系统参数log4j2.enable.threadlocals读取这个常量,如果没有设置,则默认值是true

public static final boolean ENABLE_THREADLOCALS = !IS_WEB_APP && PropertiesUtil.getProperties().getBooleanProperty(
            "log4j2.enable.threadlocals", true);

由于ES不是一个web应用,导致log4j选择使用了ReusableLogEventFactory,因而使用了thread_local来创建MutableLogEvent对象,最终在ES记录bulk exception这个特殊场景下产生非常显著的内存泄漏。

再问一个问题,为何log4j要将logevent做为thread local创建? 跑到log4j的官网去扒了一下文档,在这里 Garbage-free Steady State Logging 找到了合理的解释。 原来为了减少记录日志过程中的反复创建的对象数量,减轻GC压力从而提高性能,log4j有很多地方使用了thread_local来重用变量。 但使用thread local字段装载非JDK类,可能会产生内存泄漏问题,特别是对于web应用。 因此才会在启动的时候判断运行环境,对于web应用会禁用thread local类型的变量。

ThreadLocal fields holding non-JDK classes can cause memory leaks in web applications when the application server's thread pool continues to reference these fields after the web application is undeployed. To avoid causing memory leaks, Log4j will not use these ThreadLocals when it detects that it is used in a web application (when the javax.servlet.Servlet class is in the classpath, or when system property log4j2.is.webapp is set to "true").

参考上面的文档后,也为ES找到了规避这个问题的措施: 在ES的JVM配置文件jvm.options里,添加一个log4j的系统变量-Dlog4j2.enable.threadlocals=false,禁用掉thread local即可。 经过测试,该选项可以有效避开这个内存泄漏问题。

这个问题Github上也提交了Issue,对应的链接是: Memory leak upon partial TransportShardBulkAction failure


写在最后

ES的确是非常复杂的一个系统,包含非常多的模块和第三方组件,可以支持很多想象不到的用例场景,但一些边缘场景可能会引发一些难以排查的问题。完备的监控体系和一个经验丰富的支撑团队对于提升业务开发人员使用ES开发的效率、提升业务的稳定性是非常重要的!  

【京东商城】ES高级工程师

求职招聘whh32 发表了文章 • 1 个评论 • 367 次浏览 • 2017-11-07 13:53 • 来自相关话题

工作地点:北京 薪资待遇:25k ~ 40k 工作内容: 1、开发、维护ES及相应管理后台 2、ElasticSearch集群的配置管理及优化 3、个性化功能及插件开发。 职位要求: 1、本科以上学历,4年以上工作经验。 2、精通Java,熟悉各种中间件技术及常用框架。 3、熟悉Elasticsearch,有相应开发维护经验者优先。 京东正大力推进Elasticsearch的使用场景,目前已有数千个实例,每日新增数据百T,日查询量千亿级别,技术氛围好,发展潜力大。欢迎您的加入~ 欢迎投递简历至:wanghanghang@jd.com  
工作地点:北京 薪资待遇:25k ~ 40k 工作内容: 1、开发、维护ES及相应管理后台 2、ElasticSearch集群的配置管理及优化 3、个性化功能及插件开发。 职位要求: 1、本科以上学历,4年以上工作经验。 2、精通Java,熟悉各种中间件技术及常用框架。 3、熟悉Elasticsearch,有相应开发维护经验者优先。 京东正大力推进Elasticsearch的使用场景,目前已有数千个实例,每日新增数据百T,日查询量千亿级别,技术氛围好,发展潜力大。欢迎您的加入~ 欢迎投递简历至:wanghanghang@jd.com  

ElasticSearch 集群监控

Elasticsearchzhisheng 发表了文章 • 2 个评论 • 144 次浏览 • 2017-11-07 00:41 • 来自相关话题

原文地址:http://www.54tianzhisheng.cn/2017/10/15/ElasticSearch-cluster-health-metrics/  

最近在做 ElasticSearch 的信息(集群和节点)监控,特此稍微整理下学到的东西。这篇文章主要介绍集群的监控。

要监控哪些 ElasticSearch metrics

Elasticsearch 提供了大量的 Metric,可以帮助您检测到问题的迹象,在遇到节点不可用、out-of-memory、long garbage collection times 的时候采取相应措施。但是指标太多了,有时我们并不需要这么多,这就需要我们进行筛选。

集群健康

一个 Elasticsearch 集群至少包括一个节点和一个索引。或者它 可能有一百个数据节点、三个单独的主节点,以及一小打客户端节点——这些共同操作一千个索引(以及上万个分片)。

不管集群扩展到多大规模,你都会想要一个快速获取集群状态的途径。Cluster Health API 充当的就是这个角色。你可以把它想象成是在一万英尺的高度鸟瞰集群。它可以告诉你安心吧一切都好,或者警告你集群某个地方有问题。

让我们执行一下 cluster-health API 然后看看响应体是什么样子的:

GET _cluster/health

和 Elasticsearch 里其他 API 一样,cluster-health 会返回一个 JSON 响应。这对自动化和告警系统来说,非常便于解析。响应中包含了和你集群有关的一些关键信息:

{
   "cluster_name": "elasticsearch_zach",
   "status": "green",
   "timed_out": false,
   "number_of_nodes": 1,
   "number_of_data_nodes": 1,
   "active_primary_shards": 10,
   "active_shards": 10,
   "relocating_shards": 0,
   "initializing_shards": 0,
   "unassigned_shards": 0
}

响应信息中最重要的一块就是 status 字段。状态可能是下列三个值之一 :

status 含义
green 所有的主分片和副本分片都已分配。你的集群是 100% 可用的。
yellow 所有的主分片已经分片了,但至少还有一个副本是缺失的。不会有数据丢失,所以搜索结果依然是完整的。不过,你的高可用性在某种程度上被弱化。如果 更多的 分片消失,你就会丢数据了。把 yellow 想象成一个需要及时调查的警告。
red 至少一个主分片(以及它的全部副本)都在缺失中。这意味着你在缺少数据:搜索只能返回部分数据,而分配到这个分片上的写入请求会返回一个异常。
  • number_of_nodesnumber_of_data_nodes 这个命名完全是自描述的。
  • active_primary_shards 指出你集群中的主分片数量。这是涵盖了所有索引的汇总值。
  • active_shards 是涵盖了所有索引的所有分片的汇总值,即包括副本分片。
  • relocating_shards 显示当前正在从一个节点迁往其他节点的分片的数量。通常来说应该是 0,不过在 Elasticsearch 发现集群不太均衡时,该值会上涨。比如说:添加了一个新节点,或者下线了一个节点。
  • initializing_shards 是刚刚创建的分片的个数。比如,当你刚创建第一个索引,分片都会短暂的处于 initializing 状态。这通常会是一个临时事件,分片不应该长期停留在 initializing状态。你还可能在节点刚重启的时候看到 initializing 分片:当分片从磁盘上加载后,它们会从initializing 状态开始。
  • unassigned_shards 是已经在集群状态中存在的分片,但是实际在集群里又找不着。通常未分配分片的来源是未分配的副本。比如,一个有 5 分片和 1 副本的索引,在单节点集群上,就会有 5 个未分配副本分片。如果你的集群是 red 状态,也会长期保有未分配分片(因为缺少主分片)。

集群统计

集群统计信息包含 集群的分片数,文档数,存储空间,缓存信息,内存作用率,插件内容,文件系统内容,JVM 作用状况,系统 CPU,OS 信息,段信息。

查看全部统计信息命令:

curl -XGET 'http://localhost:9200/_cluster/stats?human&pretty'

返回 JSON 结果:

{
   "timestamp": 1459427693515,
   "cluster_name": "elasticsearch",
   "status": "green",
   "indices": {
      "count": 2,
      "shards": {
         "total": 10,
         "primaries": 10,
         "replication": 0,
         "index": {
            "shards": {
               "min": 5,
               "max": 5,
               "avg": 5
            },
            "primaries": {
               "min": 5,
               "max": 5,
               "avg": 5
            },
            "replication": {
               "min": 0,
               "max": 0,
               "avg": 0
            }
         }
      },
      "docs": {
         "count": 10,
         "deleted": 0
      },
      "store": {
         "size": "16.2kb",
         "size_in_bytes": 16684,
         "throttle_time": "0s",
         "throttle_time_in_millis": 0
      },
      "fielddata": {
         "memory_size": "0b",
         "memory_size_in_bytes": 0,
         "evictions": 0
      },
      "query_cache": {
         "memory_size": "0b",
         "memory_size_in_bytes": 0,
         "total_count": 0,
         "hit_count": 0,
         "miss_count": 0,
         "cache_size": 0,
         "cache_count": 0,
         "evictions": 0
      },
      "completion": {
         "size": "0b",
         "size_in_bytes": 0
      },
      "segments": {
         "count": 4,
         "memory": "8.6kb",
         "memory_in_bytes": 8898,
         "terms_memory": "6.3kb",
         "terms_memory_in_bytes": 6522,
         "stored_fields_memory": "1.2kb",
         "stored_fields_memory_in_bytes": 1248,
         "term_vectors_memory": "0b",
         "term_vectors_memory_in_bytes": 0,
         "norms_memory": "384b",
         "norms_memory_in_bytes": 384,
         "doc_values_memory": "744b",
         "doc_values_memory_in_bytes": 744,
         "index_writer_memory": "0b",
         "index_writer_memory_in_bytes": 0,
         "version_map_memory": "0b",
         "version_map_memory_in_bytes": 0,
         "fixed_bit_set": "0b",
         "fixed_bit_set_memory_in_bytes": 0,
         "file_sizes": {}
      },
      "percolator": {
         "num_queries": 0
      }
   },
   "nodes": {
      "count": {
         "total": 1,
         "data": 1,
         "coordinating_only": 0,
         "master": 1,
         "ingest": 1
      },
      "versions": [
         "5.6.3"
      ],
      "os": {
         "available_processors": 8,
         "allocated_processors": 8,
         "names": [
            {
               "name": "Mac OS X",
               "count": 1
            }
         ],
         "mem" : {
            "total" : "16gb",
            "total_in_bytes" : 17179869184,
            "free" : "78.1mb",
            "free_in_bytes" : 81960960,
            "used" : "15.9gb",
            "used_in_bytes" : 17097908224,
            "free_percent" : 0,
            "used_percent" : 100
         }
      },
      "process": {
         "cpu": {
            "percent": 9
         },
         "open_file_descriptors": {
            "min": 268,
            "max": 268,
            "avg": 268
         }
      },
      "jvm": {
         "max_uptime": "13.7s",
         "max_uptime_in_millis": 13737,
         "versions": [
            {
               "version": "1.8.0_74",
               "vm_name": "Java HotSpot(TM) 64-Bit Server VM",
               "vm_version": "25.74-b02",
               "vm_vendor": "Oracle Corporation",
               "count": 1
            }
         ],
         "mem": {
            "heap_used": "57.5mb",
            "heap_used_in_bytes": 60312664,
            "heap_max": "989.8mb",
            "heap_max_in_bytes": 1037959168
         },
         "threads": 90
      },
      "fs": {
         "total": "200.6gb",
         "total_in_bytes": 215429193728,
         "free": "32.6gb",
         "free_in_bytes": 35064553472,
         "available": "32.4gb",
         "available_in_bytes": 34802409472
      },
      "plugins": [
        {
          "name": "analysis-icu",
          "version": "5.6.3",
          "description": "The ICU Analysis plugin integrates Lucene ICU module into elasticsearch, adding ICU relates analysis components.",
          "classname": "org.elasticsearch.plugin.analysis.icu.AnalysisICUPlugin",
          "has_native_controller": false
        },
        {
          "name": "ingest-geoip",
          "version": "5.6.3",
          "description": "Ingest processor that uses looksup geo data based on ip adresses using the Maxmind geo database",
          "classname": "org.elasticsearch.ingest.geoip.IngestGeoIpPlugin",
          "has_native_controller": false
        },
        {
          "name": "ingest-user-agent",
          "version": "5.6.3",
          "description": "Ingest processor that extracts information from a user agent",
          "classname": "org.elasticsearch.ingest.useragent.IngestUserAgentPlugin",
          "has_native_controller": false
        }
      ]
   }
}

内存使用和 GC 指标

在运行 Elasticsearch 时,内存是您要密切监控的关键资源之一。 Elasticsearch 和 Lucene 以两种方式利用节点上的所有可用 RAM:JVM heap 和文件系统缓存。 Elasticsearch 运行在Java虚拟机(JVM)中,这意味着JVM垃圾回收的持续时间和频率将成为其他重要的监控领域。

上面返回的 JSON监控的指标有我个人觉得有这些:

  • nodes.successful
  • nodes.failed
  • nodes.total
  • nodes.mem.used_percent
  • nodes.process.cpu.percent
  • nodes.jvm.mem.heap_used

可以看到 JSON 文件是很复杂的,如果从这复杂的 JSON 中获取到对应的指标(key)的值呢,这里请看文章 :JsonPath —— JSON 解析神器

最后

这里主要讲下 ES 集群的一些监控信息,有些监控指标是个人觉得需要监控的,但是具体情况还是得看需求了。下篇文章主要讲节点的监控信息。转载请注明地址:http://www.54tianzhisheng.cn/2017/10/15/ElasticSearch-cluster-health-metrics/

参考资料

1、How to monitor Elasticsearch performance

2、ElasticSearch 性能监控

3、cluster-health

4、cluster-stats

相关阅读

1、Elasticsearch 默认分词器和中分分词器之间的比较及使用方法

2、全文搜索引擎 Elasticsearch 集群搭建入门教程

ElasticSearch 单个节点监控

Elasticsearchzhisheng 发表了文章 • 1 个评论 • 91 次浏览 • 2017-11-07 00:39 • 来自相关话题

原文地址:http://www.54tianzhisheng.cn/2017/10/18/ElasticSearch-nodes-metrics/  

集群健康监控是对集群信息进行高度的概括,节点统计值 API 提供了集群中每个节点的统计值。节点统计值很多,在监控的时候仍需要我们清楚哪些指标是最值得关注的。

集群健康监控可以参考这篇文章:ElasticSearch 集群监控

节点信息   Node Info :

curl -XGET 'http://localhost:9200/_nodes'

执行上述命令可以获取所有 node 的信息

_nodes: {
  total: 2,
  successful: 2,
  failed: 0
},
cluster_name: "elasticsearch",
nodes: {
    MSQ_CZ7mTNyOSlYIfrvHag: {
    name: "node0",
    transport_address: "192.168.180.110:9300",
    host: "192.168.180.110",
    ip: "192.168.180.110",
    version: "5.5.0",
    build_hash: "260387d",
    total_indexing_buffer: 103887667,
    roles:{...},
    settings: {...},
    os: {
      refresh_interval_in_millis: 1000,
      name: "Linux",
      arch: "amd64",
      version: "3.10.0-229.el7.x86_64",
      available_processors: 4,
      allocated_processors: 4
    },
    process: {
      refresh_interval_in_millis: 1000,
      id: 3022,
      mlockall: false
    },
    jvm: {
      pid: 3022,
      version: "1.8.0_121",
      vm_name: "Java HotSpot(TM) 64-Bit Server VM",
      vm_version: "25.121-b13",
      vm_vendor: "Oracle Corporation",
      start_time_in_millis: 1507515225302,
      mem: {
      heap_init_in_bytes: 1073741824,
      heap_max_in_bytes: 1038876672,
      non_heap_init_in_bytes: 2555904,
      non_heap_max_in_bytes: 0,
      direct_max_in_bytes: 1038876672
      },
      gc_collectors: [],
      memory_pools: [],
      using_compressed_ordinary_object_pointers: "true",
      input_arguments:{}
    }
    thread_pool:{
      force_merge: {},
      fetch_shard_started: {},
      listener: {},
      index: {},
      refresh: {},
      generic: {},
      warmer: {},
      search: {},
      flush: {},
      fetch_shard_store: {},
      management: {},
      get: {},
      bulk: {},
      snapshot: {}
    }
    transport: {...},
    http: {...},
    plugins: [],
    modules: [],
    ingest: {...}
 }

上面是我已经简写了很多数据之后的返回值,但是指标还是很多,有些是一些常规的指标,对于监控来说,没必要拿取。从上面我们可以主要关注以下这些指标:

os, process, jvm, thread_pool, transport, http, ingest and indices

节点统计     nodes-statistics

节点统计值 API 可通过如下命令获取:

GET /_nodes/stats

得到:

_nodes: {
  total: 2,
  successful: 2,
  failed: 0
},
cluster_name: "elasticsearch",
nodes: {
  MSQ_CZ7mTNyOSlYI0yvHag: {
    timestamp: 1508312932354,
    name: "node0",
    transport_address: "192.168.180.110:9300",
    host: "192.168.180.110",
    ip: "192.168.180.110:9300",
    roles: [],
    indices: {
      docs: {
           count: 6163666,
           deleted: 0
        },
      store: {
           size_in_bytes: 2301398179,
           throttle_time_in_millis: 122850
        },
      indexing: {},
      get: {},
      search: {},
      merges: {},
      refresh: {},
      flush: {},
      warmer: {},
      query_cache: {},
      fielddata: {},
      completion: {},
      segments: {},
      translog: {},
      request_cache: {},
      recovery: {}
  },
  os: {
    timestamp: 1508312932369,
    cpu: {
      percent: 0,
      load_average: {
        1m: 0.09,
        5m: 0.12,
        15m: 0.08
      }
    },
    mem: {
      total_in_bytes: 8358301696,
      free_in_bytes: 1381613568,
      used_in_bytes: 6976688128,
      free_percent: 17,
      used_percent: 83
    },
    swap: {
      total_in_bytes: 8455712768,
      free_in_bytes: 8455299072,
      used_in_bytes: 413696
    },
    cgroup: {
      cpuacct: {},
      cpu: {
        control_group: "/user.slice",
        cfs_period_micros: 100000,
        cfs_quota_micros: -1,
        stat: {}
      }
  }
},
process: {
  timestamp: 1508312932369,
  open_file_descriptors: 228,
  max_file_descriptors: 65536,
  cpu: {
    percent: 0,
    total_in_millis: 2495040
  },
  mem: {
    total_virtual_in_bytes: 5002465280
  }
},
jvm: {
  timestamp: 1508312932369,
  uptime_in_millis: 797735804,
  mem: {
    heap_used_in_bytes: 318233768,
    heap_used_percent: 30,
    heap_committed_in_bytes: 1038876672,
    heap_max_in_bytes: 1038876672,
    non_heap_used_in_bytes: 102379784,
    non_heap_committed_in_bytes: 108773376,
  pools: {
    young: {
      used_in_bytes: 62375176,
      max_in_bytes: 279183360,
      peak_used_in_bytes: 279183360,
      peak_max_in_bytes: 279183360
    },
    survivor: {
      used_in_bytes: 175384,
      max_in_bytes: 34865152,
      peak_used_in_bytes: 34865152,
      peak_max_in_bytes: 34865152
    },
    old: {
      used_in_bytes: 255683208,
      max_in_bytes: 724828160,
      peak_used_in_bytes: 255683208,
      peak_max_in_bytes: 724828160
    }
  }
  },
  threads: {},
  gc: {},
  buffer_pools: {},
  classes: {}
},
  thread_pool: {
    bulk: {},
    fetch_shard_started: {},
    fetch_shard_store: {},
    flush: {},
    force_merge: {},
    generic: {},
    get: {},
    index: {
       threads: 1,
       queue: 0,
       active: 0,
       rejected: 0,
       largest: 1,
       completed: 1
    }
    listener: {},
    management: {},
    refresh: {},
    search: {},
    snapshot: {},
    warmer: {}
  },
  fs: {},
  transport: {
    server_open: 13,
    rx_count: 11696,
    rx_size_in_bytes: 1525774,
    tx_count: 10282,
    tx_size_in_bytes: 1440101928
  },
  http: {
    current_open: 4,
    total_opened: 23
  },
  breakers: {},
  script: {},
  discovery: {},
  ingest: {}
}

节点名是一个 UUID,上面列举了很多指标,下面讲解下:

索引部分 indices

这部分列出了这个节点上所有索引的聚合过的统计值 :

  • docs 展示节点内存有多少文档,包括还没有从段里清除的已删除文档数量。

  • store 部分显示节点耗用了多少物理存储。这个指标包括主分片和副本分片在内。如果限流时间很大,那可能表明你的磁盘限流设置得过低。

  • indexing 显示已经索引了多少文档。这个值是一个累加计数器。在文档被删除的时候,数值不会下降。还要注意的是,在发生内部 索引操作的时候,这个值也会增加,比如说文档更新。

  还列出了索引操作耗费的时间,正在索引的文档数量,以及删除操作的类似统计值。

  • get 显示通过 ID 获取文档的接口相关的统计值。包括对单个文档的 GETHEAD 请求。

  • search 描述在活跃中的搜索( open_contexts )数量、查询的总数量、以及自节点启动以来在查询上消耗的总时间。用 query_time_in_millis / query_total 计算的比值,可以用来粗略的评价你的查询有多高效。比值越大,每个查询花费的时间越多,你应该要考虑调优了。

  fetch 统计值展示了查询处理的后一半流程(query-then-fetch 里的 fetch )。如果 fetch 耗时比 query 还多,说明磁盘较慢,或者获取了太多文档,或者可能搜索请求设置了太大的分页(比如, size: 10000 )。

  • merges 包括了 Lucene 段合并相关的信息。它会告诉你目前在运行几个合并,合并涉及的文档数量,正在合并的段的总大小,以及在合并操作上消耗的总时间。

  • filter_cache 展示了已缓存的过滤器位集合所用的内存数量,以及过滤器被驱逐出内存的次数。过多的驱逐数 可能 说明你需要加大过滤器缓存的大小,或者你的过滤器不太适合缓存(比如它们因为高基数而在大量产生,就像是缓存一个 now 时间表达式)。

  不过,驱逐数是一个很难评定的指标。过滤器是在每个段的基础上缓存的,而从一个小的段里驱逐过滤器,代价比从一个大的段里要廉价的多。有可能你有很大的驱逐数,但是它们都发生在小段上,也就意味着这些对查询性能只有很小的影响。

  把驱逐数指标作为一个粗略的参考。如果你看到数字很大,检查一下你的过滤器,确保他们都是正常缓存的。不断驱逐着的过滤器,哪怕都发生在很小的段上,效果也比正确缓存住了的过滤器差很多。

  • field_data 显示 fielddata 使用的内存, 用以聚合、排序等等。这里也有一个驱逐计数。和 filter_cache 不同的是,这里的驱逐计数是很有用的:这个数应该或者至少是接近于 0。因为 fielddata 不是缓存,任何驱逐都消耗巨大,应该避免掉。如果你在这里看到驱逐数,你需要重新评估你的内存情况,fielddata 限制,请求语句,或者这三者。

  • segments 会展示这个节点目前正在服务中的 Lucene 段的数量。 这是一个重要的数字。大多数索引会有大概 50–150 个段,哪怕它们存有 TB 级别的数十亿条文档。段数量过大表明合并出现了问题(比如,合并速度跟不上段的创建)。注意这个统计值是节点上所有索引的汇聚总数。记住这点。

  memory 统计值展示了 Lucene 段自己用掉的内存大小。 这里包括底层数据结构,比如倒排表,字典,和布隆过滤器等。太大的段数量会增加这些数据结构带来的开销,这个内存使用量就是一个方便用来衡量开销的度量值。

操作系统和进程部分

OSProcess 部分基本是自描述的,不会在细节中展开讲解。它们列出来基础的资源统计值,比如 CPU 和负载。OS 部分描述了整个操作系统,而 Process 部分只显示 Elasticsearch 的 JVM 进程使用的资源情况。

这些都是非常有用的指标,不过通常在你的监控技术栈里已经都测量好了。统计值包括下面这些:

  • CPU
  • 负载
  • 内存使用率 (mem.used_percent)
  • Swap 使用率
  • 打开的文件描述符 (open_file_descriptors)

JVM 部分

jvm 部分包括了运行 Elasticsearch 的 JVM 进程一些很关键的信息。 最重要的,它包括了垃圾回收的细节,这对你的 Elasticsearch 集群的稳定性有着重大影响。

jvm: {
  timestamp: 1508312932369,
  uptime_in_millis: 797735804,
  mem: {
    heap_used_in_bytes: 318233768,
    heap_used_percent: 30,
    heap_committed_in_bytes: 1038876672,
    heap_max_in_bytes: 1038876672,
    non_heap_used_in_bytes: 102379784,
    non_heap_committed_in_bytes: 108773376,
  }
}

jvm 部分首先列出一些和 heap 内存使用有关的常见统计值。你可以看到有多少 heap 被使用了,多少被指派了(当前被分配给进程的),以及 heap 被允许分配的最大值。理想情况下,heap_committed_in_bytes 应该等于 heap_max_in_bytes 。如果指派的大小更小,JVM 最终会被迫调整 heap 大小——这是一个非常昂贵的操作。如果你的数字不相等,阅读 堆内存:大小和交换 学习如何正确的配置它。

heap_used_percent 指标是值得关注的一个数字。Elasticsearch 被配置为当 heap 达到 75% 的时候开始 GC。如果你的节点一直 >= 75%,你的节点正处于 内存压力 状态。这是个危险信号,不远的未来可能就有慢 GC 要出现了。

如果 heap 使用率一直 >=85%,你就麻烦了。Heap 在 90–95% 之间,则面临可怕的性能风险,此时最好的情况是长达 10–30s 的 GC,最差的情况就是内存溢出(OOM)异常。

线程池部分

Elasticsearch 在内部维护了线程池。 这些线程池相互协作完成任务,有必要的话相互间还会传递任务。通常来说,你不需要配置或者调优线程池,不过查看它们的统计值有时候还是有用的,可以洞察你的集群表现如何。

每个线程池会列出已配置的线程数量( threads ),当前在处理任务的线程数量( active ),以及在队列中等待处理的任务单元数量( queue )。

如果队列中任务单元数达到了极限,新的任务单元会开始被拒绝,你会在 rejected 统计值上看到它反映出来。这通常是你的集群在某些资源上碰到瓶颈的信号。因为队列满意味着你的节点或集群在用最高速度运行,但依然跟不上工作的蜂拥而入。

这里的一系列的线程池,大多数你可以忽略,但是有一小部分还是值得关注的:

  • indexing    普通的索引请求的线程池
  • bulk    批量请求,和单条的索引请求不同的线程池
  • get     Get-by-ID 操作
  • search    所有的搜索和查询请求
  • merging   专用于管理 Lucene 合并的线程池

网络部分

  • transport 显示和 传输地址 相关的一些基础统计值。包括节点间的通信(通常是 9300 端口)以及任意传输客户端或者节点客户端的连接。如果看到这里有很多连接数不要担心;Elasticsearch 在节点之间维护了大量的连接。
  • http 显示 HTTP 端口(通常是 9200)的统计值。如果你看到 total_opened 数很大而且还在一直上涨,这是一个明确信号,说明你的 HTTP 客户端里有没启用 keep-alive 长连接的。持续的 keep-alive 长连接对性能很重要,因为连接、断开套接字是很昂贵的(而且浪费文件描述符)。请确认你的客户端都配置正确。

参考资料

1、nodes-info

2、nodes-stats

3、ES监控指标

最后:

转载请注明地址:http://www.54tianzhisheng.cn/2017/10/18/ElasticSearch-nodes-metrics/

[分享]Kibana,Elasticsearch 指令速查

资料分享medcl 发表了文章 • 3 个评论 • 307 次浏览 • 2017-10-31 14:10 • 来自相关话题

Elasticsearch_CheatSheet.jpg
Kibana_CheatSheet.jpg
 
Elasticsearch_CheatSheet.jpg
Kibana_CheatSheet.jpg
 

一个仿Linux 控制台的ES的_cat的插件

开源项目psfu 发表了文章 • 3 个评论 • 166 次浏览 • 2017-10-16 10:31 • 来自相关话题

00-console.png
  • 简化_cat使用,可以直接输入 cat 命令 ,可以滚动查看历史结果
  • 支持字体放大缩小
  • 支持命令历史记录(通过上下方向键来切换 )
  • 支持鼠标划取的复制粘贴(暂不复制到剪贴板)
  • 安装后在 http://127.0.0.1:9200/_console  使用,也可本地使用:直接访问html文件
  GIT 地址   欢迎加 576037940 这个群讨论哈  
00-console.png
  • 简化_cat使用,可以直接输入 cat 命令 ,可以滚动查看历史结果
  • 支持字体放大缩小
  • 支持命令历史记录(通过上下方向键来切换 )
  • 支持鼠标划取的复制粘贴(暂不复制到剪贴板)
  • 安装后在 http://127.0.0.1:9200/_console  使用,也可本地使用:直接访问html文件
  GIT 地址   欢迎加 576037940 这个群讨论哈