java api

java api

求java api这种查询

Elasticsearchrochy 回复了问题 • 2 人关注 • 1 个回复 • 109 次浏览 • 2018-11-23 17:10 • 来自相关话题

javaAPI组合查询时如何实现“OR”的逻辑,用should不行,should是可满足可不满足,我需要实现OR,求指导,谢谢!

Elasticsearchstrglee 回复了问题 • 2 人关注 • 1 个回复 • 178 次浏览 • 2018-10-10 19:32 • 来自相关话题

java.lang.ClassNotFoundException: org.elasticsearch.script.ScriptEngine

Elasticsearchmedcl 回复了问题 • 2 人关注 • 2 个回复 • 1409 次浏览 • 2018-08-03 18:18 • 来自相关话题

java连接es 创建客户端

Elasticsearchzqc0512 回复了问题 • 2 人关注 • 1 个回复 • 323 次浏览 • 2018-07-10 12:30 • 来自相关话题

查询返回结果和size不匹配的问题

回复

Elasticsearchcode4j 回复了问题 • 2 人关注 • 1 个回复 • 287 次浏览 • 2018-07-06 13:08 • 来自相关话题

javaapi中嵌套性实体存储到es报错:IllegalArgumentException

Elasticsearch陈将军 回复了问题 • 2 人关注 • 1 个回复 • 1315 次浏览 • 2018-05-24 17:02 • 来自相关话题

求帮助将dsl翻译为java的API

Elasticsearchlc1993929 回复了问题 • 2 人关注 • 2 个回复 • 489 次浏览 • 2018-03-02 09:51 • 来自相关话题

Java API [6.1] 查询 setSource顺序不同得到结果不同

Elasticsearchmedcl 回复了问题 • 2 人关注 • 1 个回复 • 709 次浏览 • 2018-02-10 17:11 • 来自相关话题

ES5模板查询Java API如何控制返回字段

Elasticsearchmedcl 回复了问题 • 2 人关注 • 1 个回复 • 1222 次浏览 • 2018-01-19 13:24 • 来自相关话题

has_child java API找不到

Elasticsearchshiyuan 回复了问题 • 4 人关注 • 3 个回复 • 830 次浏览 • 2018-01-08 10:28 • 来自相关话题

Elasticsearch+java demo

Elasticsearchkingfs 回复了问题 • 3 人关注 • 4 个回复 • 796 次浏览 • 2017-12-21 10:57 • 来自相关话题

使用JavaAPI插入1000w数据,控制台报了OOM异常

回复

Elasticsearchshabusi 发起了问题 • 1 人关注 • 0 个回复 • 501 次浏览 • 2017-12-15 18:07 • 来自相关话题

想问下,这么多客户端到底啥区别,用哪个呢?

Elasticsearchziyou 回复了问题 • 4 人关注 • 2 个回复 • 747 次浏览 • 2017-11-29 17:31 • 来自相关话题

Elasticsearch Java API 索引的增删改查(二)

Elasticsearchquanke 发表了文章 • 1 个评论 • 3538 次浏览 • 2017-11-16 09:53 • 来自相关话题

本节介绍以下 CRUD API:

 单文档  APIs

多文档 APIs

Multi Get API Bulk API

注意:所有的单文档的CRUD API,index参数只能接受单一的索引库名称,或者是一个指向单一索引库的alias。

Index API

Index API 允许我们存储一个JSON格式的文档,使数据可以被搜索。文档通过index、type、id唯一确定。我们可以自己提供一个id,或者也使用Index API 为我们自动生成一个。

这里有几种不同的方式来产生JSON格式的文档(document):

  • 手动方式,使用原生的byte[]或者String
  • 使用Map方式,会自动转换成与之等价的JSON
  • 使用第三方库来序列化beans,如Jackson
  • 使用内置的帮助类 XContentFactory.jsonBuilder()

手动方式

数据格式

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";
实例
/**  
 * 手动生成JSON  
 */  
@Test  
public void CreateJSON(){  
      
    String json = "{" +  
            "\"user\":\"fendo\"," +  
            "\"postDate\":\"2013-01-30\"," +  
            "\"message\":\"Hell word\"" +  
        "}";  
      
    IndexResponse response = client.prepareIndex("fendo", "fendodate")  
            .setSource(json)  
            .get();  
    System.out.println(response.getResult());  
      
}  

  Map方式

Map是key:value数据类型,可以代表json结构.

Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
实例
 /**  
 * 使用集合  
 */  
@Test  
public void CreateList(){  
      
    Map<String, Object> json = new HashMap<String, Object>();  
    json.put("user","kimchy");  
    json.put("postDate","2013-01-30");  
    json.put("message","trying out Elasticsearch");  
      
    IndexResponse response = client.prepareIndex("fendo", "fendodate")  
            .setSource(json)  
            .get();  
    System.out.println(response.getResult());  
      
}  

  序列化方式

ElasticSearch已经使用了jackson,可以直接使用它把javabean转为json.

import com.fasterxml.jackson.databind.*;

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
实例
/**  
 * 使用JACKSON序列化  
 * @throws Exception  
 */  
@Test  
public void CreateJACKSON() throws Exception{  
      
    CsdnBlog csdn=new CsdnBlog();  
    csdn.setAuthor("fendo");  
    csdn.setContent("这是JAVA书籍");  
    csdn.setTag("C");  
    csdn.setView("100");  
    csdn.setTitile("编程");  
    csdn.setDate(new Date().toString());  
      
    // instance a json mapper  
    ObjectMapper mapper = new ObjectMapper(); // create once, reuse  

    // generate json  
    byte[] json = mapper.writeValueAsBytes(csdn);  
      
    IndexResponse response = client.prepareIndex("fendo", "fendodate")  
            .setSource(json)  
            .get();  
    System.out.println(response.getResult());  
}  

  XContentBuilder帮助类方式

ElasticSearch提供了一个内置的帮助类XContentBuilder来产生JSON文档

// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();
实例
/**  
 * 使用ElasticSearch 帮助类  
 * @throws IOException   
 */  
@Test  
public void CreateXContentBuilder() throws IOException{  
      
    XContentBuilder builder = XContentFactory.jsonBuilder()  
            .startObject()  
                .field("user", "ccse")  
                .field("postDate", new Date())  
                .field("message", "this is Elasticsearch")  
            .endObject();  
      
    IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();  
    System.out.println("创建成功!");  
      
      
}  

综合实例

 
import java.io.IOException;  
import java.net.InetAddress;  
import java.net.UnknownHostException;  
import java.util.Date;  
import java.util.HashMap;  
import java.util.Map;  
  
import org.elasticsearch.action.index.IndexResponse;  
import org.elasticsearch.client.transport.TransportClient;  
import org.elasticsearch.common.settings.Settings;  
import org.elasticsearch.common.transport.InetSocketTransportAddress;  
import org.elasticsearch.common.xcontent.XContentBuilder;  
import org.elasticsearch.common.xcontent.XContentFactory;  
import org.elasticsearch.transport.client.PreBuiltTransportClient;  
import org.junit.Before;  
import org.junit.Test;  
  
import com.fasterxml.jackson.core.JsonProcessingException;  
import com.fasterxml.jackson.databind.ObjectMapper;  
  
public class CreateIndex {  
  
    private TransportClient client;  
      
    @Before  
    public void getClient() throws Exception{  
        //设置集群名称  
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();// 集群名  
        //创建client  
        client  = new PreBuiltTransportClient(settings)  
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));  
    }  
      
    /**  
     * 手动生成JSON  
     */  
    @Test  
    public void CreateJSON(){  
          
        String json = "{" +  
                "\"user\":\"fendo\"," +  
                "\"postDate\":\"2013-01-30\"," +  
                "\"message\":\"Hell word\"" +  
            "}";  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                .setSource(json)  
                .get();  
        System.out.println(response.getResult());  
          
    }  
      
      
    /**  
     * 使用集合  
     */  
    @Test  
    public void CreateList(){  
          
        Map<String, Object> json = new HashMap<String, Object>();  
        json.put("user","kimchy");  
        json.put("postDate","2013-01-30");  
        json.put("message","trying out Elasticsearch");  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                .setSource(json)  
                .get();  
        System.out.println(response.getResult());  
          
    }  
      
    /**  
     * 使用JACKSON序列化  
     * @throws Exception  
     */  
    @Test  
    public void CreateJACKSON() throws Exception{  
          
        CsdnBlog csdn=new CsdnBlog();  
        csdn.setAuthor("fendo");  
        csdn.setContent("这是JAVA书籍");  
        csdn.setTag("C");  
        csdn.setView("100");  
        csdn.setTitile("编程");  
        csdn.setDate(new Date().toString());  
          
        // instance a json mapper  
        ObjectMapper mapper = new ObjectMapper(); // create once, reuse  
  
        // generate json  
        byte[] json = mapper.writeValueAsBytes(csdn);  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                .setSource(json)  
                .get();  
        System.out.println(response.getResult());  
    }  
      
    /**  
     * 使用ElasticSearch 帮助类  
     * @throws IOException   
     */  
    @Test  
    public void CreateXContentBuilder() throws IOException{  
          
        XContentBuilder builder = XContentFactory.jsonBuilder()  
                .startObject()  
                    .field("user", "ccse")  
                    .field("postDate", new Date())  
                    .field("message", "this is Elasticsearch")  
                .endObject();  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();  
        System.out.println("创建成功!");  
          
          
    }  
      
}  

你还可以通过startArray(string)和endArray()方法添加数组。.field()方法可以接受多种对象类型。你可以给它传递数字、日期、甚至其他XContentBuilder对象。

Get API

根据id查看文档:

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

更多请查看 rest get API 文档

配置线程

operationThreaded 设置为 true 是在不同的线程里执行此次操作

下面的例子是operationThreaded 设置为 false

GetResponse response = client.prepareGet("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();

Delete API

根据ID删除:

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();

更多请查看 delete API 文档

配置线程

operationThreaded 设置为 true 是在不同的线程里执行此次操作

下面的例子是operationThreaded 设置为 false

GetResponse response = client.prepareGet("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();

Delete By Query API

通过查询条件删除

BulkByScrollResponse response =
    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
        .filter(QueryBuilders.matchQuery("gender", "male")) //查询条件
        .source("persons") //index(索引名)
        .get();  //执行

long deleted = response.getDeleted(); //删除文档的数量

如果需要执行的时间比较长,可以使用异步的方式处理,结果在回调里面获取

DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male"))      //查询            
    .source("persons")                //index(索引名)                                    
    .execute(new ActionListener<BulkByScrollResponse>() {     //回调监听     
        @Override
        public void onResponse(BulkByScrollResponse response) {
            long deleted = response.getDeleted();   //删除文档的数量                 
        }
        @Override
        public void onFailure(Exception e) {
            // Handle the exception
        }
    });

Update API

有两种方式更新索引:

  • 创建 UpdateRequest,通过client发送;
  • 使用 prepareUpdate() 方法;

使用UpdateRequest

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

使用 prepareUpdate() 方法

这里官方的示例有问题,new Script()参数错误,所以一下代码是我自己写的(2017/11/10)

client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script("ctx._source.gender = \"male\""  ,ScriptService.ScriptType.INLINE, null, null))//脚本可以是本地文件存储的,如果使用文件存储的脚本,需要设置 ScriptService.ScriptType.FILE 
        .get();

client.prepareUpdate("ttl", "doc", "1")
        .setDoc(jsonBuilder()   //合并到现有文档
            .startObject()
                .field("gender", "male")
            .endObject())
        .get();

Update by script

使用脚本更新文档 

UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
        .script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();

Update by merging documents

合并文档

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
client.update(updateRequest).get();

Upsert

更新插入,如果存在文档就更新,如果不存在就插入

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())
        .upsert(indexRequest); //如果不存在此文档 ,就增加 `indexRequest`
client.update(updateRequest).get();

如果 index/type/1 存在,类似下面的文档:

{
    "name"  : "Joe Dalton",
    "gender": "male"        
}

如果不存在,会插入新的文档:

{
    "name" : "Joe Smith",
    "gender": "male"
}

Multi Get API

一次获取多个文档

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "tweet", "1") //一个id的方式
    .add("twitter", "tweet", "2", "3", "4") //多个id的方式
    .add("another", "type", "foo")  //可以从另外一个索引获取
    .get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) { //迭代返回值
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {      //判断是否存在                
        String json = response.getSourceAsString(); //_source 字段
    }
}

更多请浏览REST multi get 文档

Bulk API

Bulk API,批量插入:

import static org.elasticsearch.common.xcontent.XContentFactory.*;
BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
    //处理失败
}

使用 Bulk Processor

BulkProcessor 提供了一个简单的接口,在给定的大小数量上定时批量自动请求

创建BulkProcessor实例

首先创建BulkProcessor实例

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  //增加elasticsearch客户端
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } //调用bulk之前执行 ,例如你可以通过request.numberOfActions()方法知道numberOfActions

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } //调用bulk之后执行 ,例如你可以通过request.hasFailures()方法知道是否执行失败

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } //调用失败抛 Throwable
        })
        .setBulkActions(10000) //每次10000请求
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) //拆成5mb一块
        .setFlushInterval(TimeValue.timeValueSeconds(5)) //无论请求数量多少,每5秒钟请求一次。
        .setConcurrentRequests(1) //设置并发请求的数量。值为0意味着只允许执行一个请求。值为1意味着允许1并发请求。
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//设置自定义重复请求机制,最开始等待100毫秒,之后成倍更加,重试3次,当一次或多次重复请求失败后因为计算资源不够抛出 EsRejectedExecutionException 异常,可以通过BackoffPolicy.noBackoff()方法关闭重试机制
        .build();

BulkProcessor 默认设置

  • bulkActions  1000 
  • bulkSize 5mb
  • 不设置flushInterval
  • concurrentRequests 为 1 ,异步执行
  • backoffPolicy 重试 8次,等待50毫秒

增加requests

然后增加requestsBulkProcessor

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

关闭 Bulk Processor

当所有文档都处理完成,使用awaitCloseclose 方法关闭BulkProcessor:

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

bulkProcessor.close();

在测试中使用Bulk Processor

如果你在测试种使用Bulk Processor可以执行同步方法

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();

所有实例 已经上传到Git

更多请浏览 spring-boot-starter-es 开源项目

如何有任何问题请关注微信公众号给我留言

qrcode_for_gh_26893aa0a4ea_258.jpg

 

Elasticsearch Java API - 客户端连接(TransportClient,PreBuiltXPackTransportClient)(一)

Elasticsearchquanke 发表了文章 • 0 个评论 • 12021 次浏览 • 2017-11-16 09:52 • 来自相关话题

Elasticsearch Java API 客户端连接

一个是TransportClient,一个是NodeClient,还有一个XPackTransportClient

  • TransportClient:

作为一个外部访问者,请求ES的集群,对于集群而言,它是一个外部因素。

  • NodeClient

作为ES集群的一个节点,它是ES中的一环,其他的节点对它是感知的。

  • XPackTransportClient:

服务安装了 x-pack 插件

重要:客户端版本应该和服务端版本保持一致

TransportClient旨在被Java高级REST客户端取代,该客户端执行HTTP请求而不是序列化的Java请求。 在即将到来的Elasticsearch版本中将不赞成使用TransportClient,建议使用Java高级REST客户端。

上面的警告比较尴尬,但是在 5xx版本中使用还是没有问题的,可能使用rest 客户端兼容性更好做一些。

Elasticsearch Java Rest API 手册

Maven Repository

Elasticsearch Java API包已经上传到 Maven Central

pom.xml文件中增加:

transport 版本号最好就是与Elasticsearch版本号一致。

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>5.6.3</version>
</dependency>

Transport Client

不设置集群名称

// on startup

//此步骤添加IP,至少一个,如果设置了"client.transport.sniff"= true 一个就够了,因为添加了自动嗅探配置
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));

// on shutdown  关闭client

client.close();

设置集群名称

Settings settings = Settings.builder()
        .put("cluster.name", "myClusterName").build();  //设置ES实例的名称
TransportClient client = new PreBuiltTransportClient(settings);  //自动嗅探整个集群的状态,把集群中其他ES节点的ip添加到本地的客户端列表中
//Add transport addresses and do something with the client...

增加自动嗅探配置

Settings settings = Settings.builder()
        .put("client.transport.sniff", true).build();
TransportClient client = new PreBuiltTransportClient(settings);

其他配置

client.transport.ignore_cluster_name  //设置 true ,忽略连接节点集群名验证
client.transport.ping_timeout       //ping一个节点的响应时间 默认5秒
client.transport.nodes_sampler_interval //sample/ping 节点的时间间隔,默认是5s

对于ES Client,有两种形式,一个是TransportClient,一个是NodeClient。两个的区别为: TransportClient作为一个外部访问者,通过HTTP去请求ES的集群,对于集群而言,它是一个外部因素。 NodeClient顾名思义,是作为ES集群的一个节点,它是ES中的一环,其他的节点对它是感知的,不像TransportClient那样,ES集群对它一无所知。NodeClient通信的性能会更好,但是因为是ES的一环,所以它出问题,也会给ES集群带来问题。NodeClient可以设置不作为数据节点,在elasticsearch.yml中设置,这样就不会在此节点上分配数据。

如果用ES的节点,仁者见仁智者见智。

实例

package name.quanke.es.study;

import name.quanke.es.study.util.Utils;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.After;
import org.junit.Before;

import java.net.InetAddress;

/**
 * Elasticsearch 5.5.1 的client 和 ElasticsearchTemplate的初始化
 * 作为一个外部访问者,请求ES的集群,对于集群而言,它是一个外部因素。
 * Created by http://quanke.name on 2017/11/10.
 */
public class ElasticsearchClient {

    protected TransportClient client;

    @Before
    public void setUp() throws Exception {

        Settings esSettings = Settings.builder()
                .put("cluster.name", "utan-es") //设置ES实例的名称
                .put("client.transport.sniff", true) //自动嗅探整个集群的状态,把集群中其他ES节点的ip添加到本地的客户端列表中
                .build();

        /**
         * 这里的连接方式指的是没有安装x-pack插件,如果安装了x-pack则参考{@link ElasticsearchXPackClient}
         * 1. java客户端的方式是以tcp协议在9300端口上进行通信
         * 2. http客户端的方式是以http协议在9200端口上进行通信
         */
        client = new PreBuiltTransportClient(esSettings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.10"), 9300));

        System.out.println("ElasticsearchClient 连接成功");
    }

    @After
    public void tearDown() throws Exception {
        if (client != null) {
            client.close();
        }

    }

    protected void println(SearchResponse searchResponse) {
        Utils.println(searchResponse);
    }

}

本实例代码已经上传到 Git ElasticsearchClient.java

所有实例 已经上传到Git

XPackTransportClient

如果 ElasticSearch 服务安装了 x-pack 插件,需要PreBuiltXPackTransportClient实例才能访问

使用Maven管理项目,把下面代码增加到pom.xml;

一定要修改默认仓库地址为https://artifacts.elastic.co/maven ,因为这个库没有上传到Maven中央仓库,如果有自己的 maven ,请配置代理

<project ...>

   <repositories>
      <!-- add the elasticsearch repo -->
      <repository>
         <id>elasticsearch-releases</id>
         <url>https://artifacts.elastic.co/maven</url>
         <releases>
            <enabled>true</enabled>
         </releases>
         <snapshots>
            <enabled>false</enabled>
         </snapshots>
      </repository>
      ...
   </repositories>
   ...

   <dependencies>
      <!-- add the x-pack jar as a dependency -->
      <dependency>
         <groupId>org.elasticsearch.client</groupId>
         <artifactId>x-pack-transport</artifactId>
         <version>5.6.3</version>
      </dependency>
      ...
   </dependencies>
   ...

 </project>

实例


/**
 * Elasticsearch XPack Client
 * Created by http://quanke.name on 2017/11/10.
 */
public class ElasticsearchXPackClient {

    protected TransportClient client;

    @Before
    public void setUp() throws Exception {
        /**
         * 如果es集群安装了x-pack插件则以此种方式连接集群
         * 1. java客户端的方式是以tcp协议在9300端口上进行通信
         * 2. http客户端的方式是以http协议在9200端口上进行通信
         */
        Settings settings = Settings.builder()
                .put("xpack.security.user", "elastic:utan100")
                .put("cluster.name", "utan-es")
                .build();
        client = new PreBuiltXPackTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.10"), 9300));
//        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
//        credentialsProvider.setCredentials(AuthScope.ANY,
//                new UsernamePasswordCredentials("elastic", "utan100"));

        System.out.println("ElasticsearchXPackClient 启动成功");
    }

    @Test
    public void testClientConnection() throws Exception {

        System.out.println("--------------------------");
    }

    @After
    public void tearDown() throws Exception {
        if (client != null) {
            client.close();
        }

    }

    protected void println(SearchResponse searchResponse) {
        Utils.println(searchResponse);
    }
}

本实例代码已经上传到 Git ElasticsearchXPackClient.java

所有实例 已经上传到Git

更多请浏览 spring-boot-starter-es 开源项目

如何有任何问题请关注微信公众号给我留言

qrcode_for_gh_26893aa0a4ea_258.jpg

求java api这种查询

回复

Elasticsearchrochy 回复了问题 • 2 人关注 • 1 个回复 • 109 次浏览 • 2018-11-23 17:10 • 来自相关话题

javaAPI组合查询时如何实现“OR”的逻辑,用should不行,should是可满足可不满足,我需要实现OR,求指导,谢谢!

回复

Elasticsearchstrglee 回复了问题 • 2 人关注 • 1 个回复 • 178 次浏览 • 2018-10-10 19:32 • 来自相关话题

java.lang.ClassNotFoundException: org.elasticsearch.script.ScriptEngine

回复

Elasticsearchmedcl 回复了问题 • 2 人关注 • 2 个回复 • 1409 次浏览 • 2018-08-03 18:18 • 来自相关话题

java连接es 创建客户端

回复

Elasticsearchzqc0512 回复了问题 • 2 人关注 • 1 个回复 • 323 次浏览 • 2018-07-10 12:30 • 来自相关话题

查询返回结果和size不匹配的问题

回复

Elasticsearchcode4j 回复了问题 • 2 人关注 • 1 个回复 • 287 次浏览 • 2018-07-06 13:08 • 来自相关话题

javaapi中嵌套性实体存储到es报错:IllegalArgumentException

回复

Elasticsearch陈将军 回复了问题 • 2 人关注 • 1 个回复 • 1315 次浏览 • 2018-05-24 17:02 • 来自相关话题

求帮助将dsl翻译为java的API

回复

Elasticsearchlc1993929 回复了问题 • 2 人关注 • 2 个回复 • 489 次浏览 • 2018-03-02 09:51 • 来自相关话题

Java API [6.1] 查询 setSource顺序不同得到结果不同

回复

Elasticsearchmedcl 回复了问题 • 2 人关注 • 1 个回复 • 709 次浏览 • 2018-02-10 17:11 • 来自相关话题

ES5模板查询Java API如何控制返回字段

回复

Elasticsearchmedcl 回复了问题 • 2 人关注 • 1 个回复 • 1222 次浏览 • 2018-01-19 13:24 • 来自相关话题

has_child java API找不到

回复

Elasticsearchshiyuan 回复了问题 • 4 人关注 • 3 个回复 • 830 次浏览 • 2018-01-08 10:28 • 来自相关话题

Elasticsearch+java demo

回复

Elasticsearchkingfs 回复了问题 • 3 人关注 • 4 个回复 • 796 次浏览 • 2017-12-21 10:57 • 来自相关话题

使用JavaAPI插入1000w数据,控制台报了OOM异常

回复

Elasticsearchshabusi 发起了问题 • 1 人关注 • 0 个回复 • 501 次浏览 • 2017-12-15 18:07 • 来自相关话题

想问下,这么多客户端到底啥区别,用哪个呢?

回复

Elasticsearchziyou 回复了问题 • 4 人关注 • 2 个回复 • 747 次浏览 • 2017-11-29 17:31 • 来自相关话题

elasticsearch的java api操作

回复

Elasticsearchstrglee 回复了问题 • 2 人关注 • 1 个回复 • 803 次浏览 • 2017-10-31 19:06 • 来自相关话题

java api 查询某字段 总共有多少个值

回复

Elasticsearchnovia 回复了问题 • 2 人关注 • 1 个回复 • 3786 次浏览 • 2017-07-05 10:09 • 来自相关话题

Elasticsearch Java API 索引的增删改查(二)

Elasticsearchquanke 发表了文章 • 1 个评论 • 3538 次浏览 • 2017-11-16 09:53 • 来自相关话题

本节介绍以下 CRUD API:

 单文档  APIs

多文档 APIs

Multi Get API Bulk API

注意:所有的单文档的CRUD API,index参数只能接受单一的索引库名称,或者是一个指向单一索引库的alias。

Index API

Index API 允许我们存储一个JSON格式的文档,使数据可以被搜索。文档通过index、type、id唯一确定。我们可以自己提供一个id,或者也使用Index API 为我们自动生成一个。

这里有几种不同的方式来产生JSON格式的文档(document):

  • 手动方式,使用原生的byte[]或者String
  • 使用Map方式,会自动转换成与之等价的JSON
  • 使用第三方库来序列化beans,如Jackson
  • 使用内置的帮助类 XContentFactory.jsonBuilder()

手动方式

数据格式

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";
实例
/**  
 * 手动生成JSON  
 */  
@Test  
public void CreateJSON(){  
      
    String json = "{" +  
            "\"user\":\"fendo\"," +  
            "\"postDate\":\"2013-01-30\"," +  
            "\"message\":\"Hell word\"" +  
        "}";  
      
    IndexResponse response = client.prepareIndex("fendo", "fendodate")  
            .setSource(json)  
            .get();  
    System.out.println(response.getResult());  
      
}  

  Map方式

Map是key:value数据类型,可以代表json结构.

Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
实例
 /**  
 * 使用集合  
 */  
@Test  
public void CreateList(){  
      
    Map<String, Object> json = new HashMap<String, Object>();  
    json.put("user","kimchy");  
    json.put("postDate","2013-01-30");  
    json.put("message","trying out Elasticsearch");  
      
    IndexResponse response = client.prepareIndex("fendo", "fendodate")  
            .setSource(json)  
            .get();  
    System.out.println(response.getResult());  
      
}  

  序列化方式

ElasticSearch已经使用了jackson,可以直接使用它把javabean转为json.

import com.fasterxml.jackson.databind.*;

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
实例
/**  
 * 使用JACKSON序列化  
 * @throws Exception  
 */  
@Test  
public void CreateJACKSON() throws Exception{  
      
    CsdnBlog csdn=new CsdnBlog();  
    csdn.setAuthor("fendo");  
    csdn.setContent("这是JAVA书籍");  
    csdn.setTag("C");  
    csdn.setView("100");  
    csdn.setTitile("编程");  
    csdn.setDate(new Date().toString());  
      
    // instance a json mapper  
    ObjectMapper mapper = new ObjectMapper(); // create once, reuse  

    // generate json  
    byte[] json = mapper.writeValueAsBytes(csdn);  
      
    IndexResponse response = client.prepareIndex("fendo", "fendodate")  
            .setSource(json)  
            .get();  
    System.out.println(response.getResult());  
}  

  XContentBuilder帮助类方式

ElasticSearch提供了一个内置的帮助类XContentBuilder来产生JSON文档

// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();
实例
/**  
 * 使用ElasticSearch 帮助类  
 * @throws IOException   
 */  
@Test  
public void CreateXContentBuilder() throws IOException{  
      
    XContentBuilder builder = XContentFactory.jsonBuilder()  
            .startObject()  
                .field("user", "ccse")  
                .field("postDate", new Date())  
                .field("message", "this is Elasticsearch")  
            .endObject();  
      
    IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();  
    System.out.println("创建成功!");  
      
      
}  

综合实例

 
import java.io.IOException;  
import java.net.InetAddress;  
import java.net.UnknownHostException;  
import java.util.Date;  
import java.util.HashMap;  
import java.util.Map;  
  
import org.elasticsearch.action.index.IndexResponse;  
import org.elasticsearch.client.transport.TransportClient;  
import org.elasticsearch.common.settings.Settings;  
import org.elasticsearch.common.transport.InetSocketTransportAddress;  
import org.elasticsearch.common.xcontent.XContentBuilder;  
import org.elasticsearch.common.xcontent.XContentFactory;  
import org.elasticsearch.transport.client.PreBuiltTransportClient;  
import org.junit.Before;  
import org.junit.Test;  
  
import com.fasterxml.jackson.core.JsonProcessingException;  
import com.fasterxml.jackson.databind.ObjectMapper;  
  
public class CreateIndex {  
  
    private TransportClient client;  
      
    @Before  
    public void getClient() throws Exception{  
        //设置集群名称  
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();// 集群名  
        //创建client  
        client  = new PreBuiltTransportClient(settings)  
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));  
    }  
      
    /**  
     * 手动生成JSON  
     */  
    @Test  
    public void CreateJSON(){  
          
        String json = "{" +  
                "\"user\":\"fendo\"," +  
                "\"postDate\":\"2013-01-30\"," +  
                "\"message\":\"Hell word\"" +  
            "}";  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                .setSource(json)  
                .get();  
        System.out.println(response.getResult());  
          
    }  
      
      
    /**  
     * 使用集合  
     */  
    @Test  
    public void CreateList(){  
          
        Map<String, Object> json = new HashMap<String, Object>();  
        json.put("user","kimchy");  
        json.put("postDate","2013-01-30");  
        json.put("message","trying out Elasticsearch");  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                .setSource(json)  
                .get();  
        System.out.println(response.getResult());  
          
    }  
      
    /**  
     * 使用JACKSON序列化  
     * @throws Exception  
     */  
    @Test  
    public void CreateJACKSON() throws Exception{  
          
        CsdnBlog csdn=new CsdnBlog();  
        csdn.setAuthor("fendo");  
        csdn.setContent("这是JAVA书籍");  
        csdn.setTag("C");  
        csdn.setView("100");  
        csdn.setTitile("编程");  
        csdn.setDate(new Date().toString());  
          
        // instance a json mapper  
        ObjectMapper mapper = new ObjectMapper(); // create once, reuse  
  
        // generate json  
        byte[] json = mapper.writeValueAsBytes(csdn);  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                .setSource(json)  
                .get();  
        System.out.println(response.getResult());  
    }  
      
    /**  
     * 使用ElasticSearch 帮助类  
     * @throws IOException   
     */  
    @Test  
    public void CreateXContentBuilder() throws IOException{  
          
        XContentBuilder builder = XContentFactory.jsonBuilder()  
                .startObject()  
                    .field("user", "ccse")  
                    .field("postDate", new Date())  
                    .field("message", "this is Elasticsearch")  
                .endObject();  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();  
        System.out.println("创建成功!");  
          
          
    }  
      
}  

你还可以通过startArray(string)和endArray()方法添加数组。.field()方法可以接受多种对象类型。你可以给它传递数字、日期、甚至其他XContentBuilder对象。

Get API

根据id查看文档:

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

更多请查看 rest get API 文档

配置线程

operationThreaded 设置为 true 是在不同的线程里执行此次操作

下面的例子是operationThreaded 设置为 false

GetResponse response = client.prepareGet("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();

Delete API

根据ID删除:

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();

更多请查看 delete API 文档

配置线程

operationThreaded 设置为 true 是在不同的线程里执行此次操作

下面的例子是operationThreaded 设置为 false

GetResponse response = client.prepareGet("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();

Delete By Query API

通过查询条件删除

BulkByScrollResponse response =
    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
        .filter(QueryBuilders.matchQuery("gender", "male")) //查询条件
        .source("persons") //index(索引名)
        .get();  //执行

long deleted = response.getDeleted(); //删除文档的数量

如果需要执行的时间比较长,可以使用异步的方式处理,结果在回调里面获取

DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male"))      //查询            
    .source("persons")                //index(索引名)                                    
    .execute(new ActionListener<BulkByScrollResponse>() {     //回调监听     
        @Override
        public void onResponse(BulkByScrollResponse response) {
            long deleted = response.getDeleted();   //删除文档的数量                 
        }
        @Override
        public void onFailure(Exception e) {
            // Handle the exception
        }
    });

Update API

有两种方式更新索引:

  • 创建 UpdateRequest,通过client发送;
  • 使用 prepareUpdate() 方法;

使用UpdateRequest

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

使用 prepareUpdate() 方法

这里官方的示例有问题,new Script()参数错误,所以一下代码是我自己写的(2017/11/10)

client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script("ctx._source.gender = \"male\""  ,ScriptService.ScriptType.INLINE, null, null))//脚本可以是本地文件存储的,如果使用文件存储的脚本,需要设置 ScriptService.ScriptType.FILE 
        .get();

client.prepareUpdate("ttl", "doc", "1")
        .setDoc(jsonBuilder()   //合并到现有文档
            .startObject()
                .field("gender", "male")
            .endObject())
        .get();

Update by script

使用脚本更新文档 

UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
        .script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();

Update by merging documents

合并文档

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
client.update(updateRequest).get();

Upsert

更新插入,如果存在文档就更新,如果不存在就插入

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())
        .upsert(indexRequest); //如果不存在此文档 ,就增加 `indexRequest`
client.update(updateRequest).get();

如果 index/type/1 存在,类似下面的文档:

{
    "name"  : "Joe Dalton",
    "gender": "male"        
}

如果不存在,会插入新的文档:

{
    "name" : "Joe Smith",
    "gender": "male"
}

Multi Get API

一次获取多个文档

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "tweet", "1") //一个id的方式
    .add("twitter", "tweet", "2", "3", "4") //多个id的方式
    .add("another", "type", "foo")  //可以从另外一个索引获取
    .get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) { //迭代返回值
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {      //判断是否存在                
        String json = response.getSourceAsString(); //_source 字段
    }
}

更多请浏览REST multi get 文档

Bulk API

Bulk API,批量插入:

import static org.elasticsearch.common.xcontent.XContentFactory.*;
BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
    //处理失败
}

使用 Bulk Processor

BulkProcessor 提供了一个简单的接口,在给定的大小数量上定时批量自动请求

创建BulkProcessor实例

首先创建BulkProcessor实例

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  //增加elasticsearch客户端
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } //调用bulk之前执行 ,例如你可以通过request.numberOfActions()方法知道numberOfActions

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } //调用bulk之后执行 ,例如你可以通过request.hasFailures()方法知道是否执行失败

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } //调用失败抛 Throwable
        })
        .setBulkActions(10000) //每次10000请求
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) //拆成5mb一块
        .setFlushInterval(TimeValue.timeValueSeconds(5)) //无论请求数量多少,每5秒钟请求一次。
        .setConcurrentRequests(1) //设置并发请求的数量。值为0意味着只允许执行一个请求。值为1意味着允许1并发请求。
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//设置自定义重复请求机制,最开始等待100毫秒,之后成倍更加,重试3次,当一次或多次重复请求失败后因为计算资源不够抛出 EsRejectedExecutionException 异常,可以通过BackoffPolicy.noBackoff()方法关闭重试机制
        .build();

BulkProcessor 默认设置

  • bulkActions  1000 
  • bulkSize 5mb
  • 不设置flushInterval
  • concurrentRequests 为 1 ,异步执行
  • backoffPolicy 重试 8次,等待50毫秒

增加requests

然后增加requestsBulkProcessor

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

关闭 Bulk Processor

当所有文档都处理完成,使用awaitCloseclose 方法关闭BulkProcessor:

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

bulkProcessor.close();

在测试中使用Bulk Processor

如果你在测试种使用Bulk Processor可以执行同步方法

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();

所有实例 已经上传到Git

更多请浏览 spring-boot-starter-es 开源项目

如何有任何问题请关注微信公众号给我留言

qrcode_for_gh_26893aa0a4ea_258.jpg

 

Elasticsearch Java API - 客户端连接(TransportClient,PreBuiltXPackTransportClient)(一)

Elasticsearchquanke 发表了文章 • 0 个评论 • 12021 次浏览 • 2017-11-16 09:52 • 来自相关话题

Elasticsearch Java API 客户端连接

一个是TransportClient,一个是NodeClient,还有一个XPackTransportClient

  • TransportClient:

作为一个外部访问者,请求ES的集群,对于集群而言,它是一个外部因素。

  • NodeClient

作为ES集群的一个节点,它是ES中的一环,其他的节点对它是感知的。

  • XPackTransportClient:

服务安装了 x-pack 插件

重要:客户端版本应该和服务端版本保持一致

TransportClient旨在被Java高级REST客户端取代,该客户端执行HTTP请求而不是序列化的Java请求。 在即将到来的Elasticsearch版本中将不赞成使用TransportClient,建议使用Java高级REST客户端。

上面的警告比较尴尬,但是在 5xx版本中使用还是没有问题的,可能使用rest 客户端兼容性更好做一些。

Elasticsearch Java Rest API 手册

Maven Repository

Elasticsearch Java API包已经上传到 Maven Central

pom.xml文件中增加:

transport 版本号最好就是与Elasticsearch版本号一致。

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>5.6.3</version>
</dependency>

Transport Client

不设置集群名称

// on startup

//此步骤添加IP,至少一个,如果设置了"client.transport.sniff"= true 一个就够了,因为添加了自动嗅探配置
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));

// on shutdown  关闭client

client.close();

设置集群名称

Settings settings = Settings.builder()
        .put("cluster.name", "myClusterName").build();  //设置ES实例的名称
TransportClient client = new PreBuiltTransportClient(settings);  //自动嗅探整个集群的状态,把集群中其他ES节点的ip添加到本地的客户端列表中
//Add transport addresses and do something with the client...

增加自动嗅探配置

Settings settings = Settings.builder()
        .put("client.transport.sniff", true).build();
TransportClient client = new PreBuiltTransportClient(settings);

其他配置

client.transport.ignore_cluster_name  //设置 true ,忽略连接节点集群名验证
client.transport.ping_timeout       //ping一个节点的响应时间 默认5秒
client.transport.nodes_sampler_interval //sample/ping 节点的时间间隔,默认是5s

对于ES Client,有两种形式,一个是TransportClient,一个是NodeClient。两个的区别为: TransportClient作为一个外部访问者,通过HTTP去请求ES的集群,对于集群而言,它是一个外部因素。 NodeClient顾名思义,是作为ES集群的一个节点,它是ES中的一环,其他的节点对它是感知的,不像TransportClient那样,ES集群对它一无所知。NodeClient通信的性能会更好,但是因为是ES的一环,所以它出问题,也会给ES集群带来问题。NodeClient可以设置不作为数据节点,在elasticsearch.yml中设置,这样就不会在此节点上分配数据。

如果用ES的节点,仁者见仁智者见智。

实例

package name.quanke.es.study;

import name.quanke.es.study.util.Utils;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.After;
import org.junit.Before;

import java.net.InetAddress;

/**
 * Elasticsearch 5.5.1 的client 和 ElasticsearchTemplate的初始化
 * 作为一个外部访问者,请求ES的集群,对于集群而言,它是一个外部因素。
 * Created by http://quanke.name on 2017/11/10.
 */
public class ElasticsearchClient {

    protected TransportClient client;

    @Before
    public void setUp() throws Exception {

        Settings esSettings = Settings.builder()
                .put("cluster.name", "utan-es") //设置ES实例的名称
                .put("client.transport.sniff", true) //自动嗅探整个集群的状态,把集群中其他ES节点的ip添加到本地的客户端列表中
                .build();

        /**
         * 这里的连接方式指的是没有安装x-pack插件,如果安装了x-pack则参考{@link ElasticsearchXPackClient}
         * 1. java客户端的方式是以tcp协议在9300端口上进行通信
         * 2. http客户端的方式是以http协议在9200端口上进行通信
         */
        client = new PreBuiltTransportClient(esSettings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.10"), 9300));

        System.out.println("ElasticsearchClient 连接成功");
    }

    @After
    public void tearDown() throws Exception {
        if (client != null) {
            client.close();
        }

    }

    protected void println(SearchResponse searchResponse) {
        Utils.println(searchResponse);
    }

}

本实例代码已经上传到 Git ElasticsearchClient.java

所有实例 已经上传到Git

XPackTransportClient

如果 ElasticSearch 服务安装了 x-pack 插件,需要PreBuiltXPackTransportClient实例才能访问

使用Maven管理项目,把下面代码增加到pom.xml;

一定要修改默认仓库地址为https://artifacts.elastic.co/maven ,因为这个库没有上传到Maven中央仓库,如果有自己的 maven ,请配置代理

<project ...>

   <repositories>
      <!-- add the elasticsearch repo -->
      <repository>
         <id>elasticsearch-releases</id>
         <url>https://artifacts.elastic.co/maven</url>
         <releases>
            <enabled>true</enabled>
         </releases>
         <snapshots>
            <enabled>false</enabled>
         </snapshots>
      </repository>
      ...
   </repositories>
   ...

   <dependencies>
      <!-- add the x-pack jar as a dependency -->
      <dependency>
         <groupId>org.elasticsearch.client</groupId>
         <artifactId>x-pack-transport</artifactId>
         <version>5.6.3</version>
      </dependency>
      ...
   </dependencies>
   ...

 </project>

实例


/**
 * Elasticsearch XPack Client
 * Created by http://quanke.name on 2017/11/10.
 */
public class ElasticsearchXPackClient {

    protected TransportClient client;

    @Before
    public void setUp() throws Exception {
        /**
         * 如果es集群安装了x-pack插件则以此种方式连接集群
         * 1. java客户端的方式是以tcp协议在9300端口上进行通信
         * 2. http客户端的方式是以http协议在9200端口上进行通信
         */
        Settings settings = Settings.builder()
                .put("xpack.security.user", "elastic:utan100")
                .put("cluster.name", "utan-es")
                .build();
        client = new PreBuiltXPackTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.10"), 9300));
//        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
//        credentialsProvider.setCredentials(AuthScope.ANY,
//                new UsernamePasswordCredentials("elastic", "utan100"));

        System.out.println("ElasticsearchXPackClient 启动成功");
    }

    @Test
    public void testClientConnection() throws Exception {

        System.out.println("--------------------------");
    }

    @After
    public void tearDown() throws Exception {
        if (client != null) {
            client.close();
        }

    }

    protected void println(SearchResponse searchResponse) {
        Utils.println(searchResponse);
    }
}

本实例代码已经上传到 Git ElasticsearchXPackClient.java

所有实例 已经上传到Git

更多请浏览 spring-boot-starter-es 开源项目

如何有任何问题请关注微信公众号给我留言

qrcode_for_gh_26893aa0a4ea_258.jpg