话题 (es) 已与当前话题合并
elasticsearch

elasticsearch

嵌套查询问题

回复

Elasticsearchmcs41531 发起了问题 • 2 人关注 • 0 个回复 • 25 次浏览 • 10 小时前 • 来自相关话题

logstash和elasticsearch整合失败,Could not index event to Elasticsearch.

Logstashbugq 回复了问题 • 3 人关注 • 2 个回复 • 42 次浏览 • 15 小时前 • 来自相关话题

XContentType.SMILE与XContentType.JSON的区别?

Elasticsearchstrglee 回复了问题 • 2 人关注 • 1 个回复 • 64 次浏览 • 6 天前 • 来自相关话题

Elastic日报 第189期 (2018-02-14)

Elastic日报elk123 发表了文章 • 0 个评论 • 80 次浏览 • 2018-02-14 22:41 • 来自相关话题

1. 日志收集工具fluentd与logstash比较。  http://t.cn/RR9sgLX 2. 如何通过logstash将csv数据导入到elasticsearch。 http://t.cn/RCGeeJK 3. 搜索引擎选择:Elasticsearch与Solr http://t.cn/RUncwIu   编辑:wt 归档:https://elasticsearch.cn/article/503 订阅:https://tinyletter.com/elastic-daily
1. 日志收集工具fluentd与logstash比较。  http://t.cn/RR9sgLX 2. 如何通过logstash将csv数据导入到elasticsearch。 http://t.cn/RCGeeJK 3. 搜索引擎选择:Elasticsearch与Solr http://t.cn/RUncwIu   编辑:wt 归档:https://elasticsearch.cn/article/503 订阅:https://tinyletter.com/elastic-daily

es-hadoop的es.mapping.join属性格式是什么样的?

回复

Elasticsearchlinfq 发起了问题 • 1 人关注 • 0 个回复 • 74 次浏览 • 2018-02-13 14:13 • 来自相关话题

linux环境下启动es很慢

Elasticsearchlocatelli 回复了问题 • 5 人关注 • 4 个回复 • 138 次浏览 • 2018-02-12 04:38 • 来自相关话题

es 设计mapping时 如何创建 嵌套型实体:一个实体里面有一个list

回复

ElasticsearchPhoebM 回复了问题 • 1 人关注 • 1 个回复 • 72 次浏览 • 2018-02-11 20:02 • 来自相关话题

elasticsearch 可以根据某个时间字段不限日期查询在6~10点范围内的数据吗?

Elasticsearch王庆焕 回复了问题 • 5 人关注 • 4 个回复 • 1832 次浏览 • 2018-02-11 15:31 • 来自相关话题

org.elasticsearch.index.mapper.MapperParsingException: No type specified for field

回复

ElasticsearchDeepRedApple 发起了问题 • 1 人关注 • 0 个回复 • 46 次浏览 • 2018-02-11 11:04 • 来自相关话题

Elastic日报 第185期 (2018-02-10)

Elastic日报elk123 发表了文章 • 0 个评论 • 68 次浏览 • 2018-02-10 11:29 • 来自相关话题

  1. Elasticsearch与Hbase特性对比。 http://t.cn/RRyM1vm
  2. 将Elasticsearch作为Hive的存储? http://t.cn/RRyxDNZ
  3. 基于Elasticsearch实现搜索推荐 http://t.cn/RRyJiHx
  1. Elasticsearch与Hbase特性对比。 http://t.cn/RRyM1vm
  2. 将Elasticsearch作为Hive的存储? http://t.cn/RRyxDNZ
  3. 基于Elasticsearch实现搜索推荐 http://t.cn/RRyJiHx

Elasticsearch mapping 配置个人解读

Elasticsearch夏李俊 发表了文章 • 0 个评论 • 130 次浏览 • 2018-02-09 15:47 • 来自相关话题

配置详解 文件中"mapping":{}中的内容,即为创建索引的mappingsource 如:
"mappings": {
    "_default_" : {    //@1
        "_all" : {"enabled" : true},    //@2
        "properties" : {    //@3
            "tableType" : {"type" : "string", "index" : "no", "include_in_all" : false, "store": true},    //@4
            "caption" : {"type" : "string", "index" : "no", "include_in_all" : false, "store": true},
            "code" : {"type" : "string", "index" : "no", "include_in_all" : false, "store": true},
            "description" : {"type" : "string", "index" : "no", "include_in_all" : false, "store": true},
            "perm" : {"type" : "string", "index" : "not_analyzed", "include_in_all" : false}
		}
	},
	"ec02_goodsinfo" : {    //@5
	    "_all" : {"enabled" : true},    //@6
		"properties" : {    //@7
			"tableType" : {"type" : "string", "index" : "no", "include_in_all" : false, "store": true},
			"caption" : {"type" : "string", "index" : "no", "include_in_all" : false, "store": true},
			"code" : {"type" : "string", "index" : "no", "include_in_all" : false, "store": true},
			"description" : {"type" : "string", "index" : "no", "include_in_all" : false, "store": true},
			"perm" : {"type" : "string", "index" : "not_analyzed", "include_in_all" : false},
			"bill":{    //@8
				properties" : {
		                       "CreateYear" : {"type" : "string", "index" : "not_analyzed", "include_in_all" : true}    //@9
				}
			}
		}
	}
}
  • @1 _default_所有单据默认的创建索引的配置
  • @2 _all{} 每个单据下所有的字段配置,"enabled" : true 所有字段创建索引,false 所有字段禁止创建索引,[*注意]除非properties指定的字段,默认字段类型将自动匹配
  • @3 properties {},每个单据下字段或者properties的指定配置
  • @4 properties {}中指定了属性(properties):"tableType"的检索配置,type:string > 类型字符串,include_in_all:false > 改字段或者属性不包含在单据的所有字段中,"store": true > 储存在数据库中
  • @5 ec02_goodsinfo 表示对单据 "ec02_goodsinfo" 的特定检索配置
  • @6 _all{} 只对"ec02_goodsinfo"单据下所有的字段配置
  • @7 properties {},只对"ec02_goodsinfo"单据下字段或者properties的指定配置
  • [*注意]@8,@9 bill在单据中额字段都会包括一层bill,所以如果要对单据中某个字段指定需要套一层bill{}
----------------------------------------------------------------------------------------------------------------------------------------- 属性解说 版本5.X以前
  • index 可选值为analyzed(默认)和not_analyzed,如果是字段是字符串类型的,则可以是not_analyzed
  • store 可选值为yes或no,指定该字段的原始值是否被写入索引中,默认为no,即结果中不能返回该字段。
  • boost默认为1,定义了文档中该字段的重要性,越高越重要
  • null_value 如果一个字段为null值(空数组或者数组都是null值)的话不会被索引及搜索到,null_value参数可以显示替代null values为指定值,这样使得字段可以被搜索到。
  • include_in_all 指定该字段是否应该包括在_all字段里头,默认情况下都会包含。
  • type 可以指定String,long,int,doulbe,floot,boolean,等
版本5.X以后
  • 原本type string,其index 可选值为analyzed(默认)和not_analyzed,现在直接拆违type text( index analyzed),type keyword(index not_analyzed)
  • store 可选值为enable或false,指定该字段的原始值是否被写入索引中,默认为enable,即结果中不能返回该字段。
  • index 表示是否用于检索默认enable,可选false
------------------------------------------------------------------------------------------------------------------------------- 字段的数据类型
  • 简单类型string(指定分词器)
  • date(默认使用UTC保持,也可以使用format指定格式)
  • 数值类型(byte,short,integer,long,float,double)
  • boolean
  • binary(存储在索引中的二进制数据的base64表示,比如图像,只存储不索引)
  • ip(以数字形式简化IPV4地址的使用,可以被索引、排序并使用IP值做范围查询)注意string是5.x以前的,5.x之后被分裂为text,keyword
有层级结构的类型,比如object 或者 nested. 特殊类型
  • geo_point
  • geo_shape
  • completion
 

Elasticsearch、Logstash安装x-pack后,logstash发送数据给ES,ES问什么接收不到?

回复

Elasticsearchxinian 回复了问题 • 1 人关注 • 2 个回复 • 89 次浏览 • 2018-02-09 11:15 • 来自相关话题

Elasticsearch安装X-pack,使用Java API创建client怎么设置ES的用户名密码?

Elasticsearchxinian 回复了问题 • 2 人关注 • 2 个回复 • 69 次浏览 • 2018-02-09 10:36 • 来自相关话题

kibana 无法显示

Kibanajianfzhu 回复了问题 • 2 人关注 • 2 个回复 • 69 次浏览 • 2018-02-09 08:40 • 来自相关话题

聚合搜索 size:0 无效 elasticsearch 6.1.2

Elasticsearchlaoyang360 回复了问题 • 3 人关注 • 2 个回复 • 75 次浏览 • 2018-02-08 19:01 • 来自相关话题

嵌套查询问题

回复

Elasticsearchmcs41531 发起了问题 • 2 人关注 • 0 个回复 • 25 次浏览 • 10 小时前 • 来自相关话题

logstash和elasticsearch整合失败,Could not index event to Elasticsearch.

回复

Logstashbugq 回复了问题 • 3 人关注 • 2 个回复 • 42 次浏览 • 15 小时前 • 来自相关话题

XContentType.SMILE与XContentType.JSON的区别?

回复

Elasticsearchstrglee 回复了问题 • 2 人关注 • 1 个回复 • 64 次浏览 • 6 天前 • 来自相关话题

es-hadoop的es.mapping.join属性格式是什么样的?

回复

Elasticsearchlinfq 发起了问题 • 1 人关注 • 0 个回复 • 74 次浏览 • 2018-02-13 14:13 • 来自相关话题

linux环境下启动es很慢

回复

Elasticsearchlocatelli 回复了问题 • 5 人关注 • 4 个回复 • 138 次浏览 • 2018-02-12 04:38 • 来自相关话题

es 设计mapping时 如何创建 嵌套型实体:一个实体里面有一个list

回复

ElasticsearchPhoebM 回复了问题 • 1 人关注 • 1 个回复 • 72 次浏览 • 2018-02-11 20:02 • 来自相关话题

elasticsearch 可以根据某个时间字段不限日期查询在6~10点范围内的数据吗?

回复

Elasticsearch王庆焕 回复了问题 • 5 人关注 • 4 个回复 • 1832 次浏览 • 2018-02-11 15:31 • 来自相关话题

org.elasticsearch.index.mapper.MapperParsingException: No type specified for field

回复

ElasticsearchDeepRedApple 发起了问题 • 1 人关注 • 0 个回复 • 46 次浏览 • 2018-02-11 11:04 • 来自相关话题

Elasticsearch、Logstash安装x-pack后,logstash发送数据给ES,ES问什么接收不到?

回复

Elasticsearchxinian 回复了问题 • 1 人关注 • 2 个回复 • 89 次浏览 • 2018-02-09 11:15 • 来自相关话题

Elasticsearch安装X-pack,使用Java API创建client怎么设置ES的用户名密码?

回复

Elasticsearchxinian 回复了问题 • 2 人关注 • 2 个回复 • 69 次浏览 • 2018-02-09 10:36 • 来自相关话题

kibana 无法显示

回复

Kibanajianfzhu 回复了问题 • 2 人关注 • 2 个回复 • 69 次浏览 • 2018-02-09 08:40 • 来自相关话题

聚合搜索 size:0 无效 elasticsearch 6.1.2

回复

Elasticsearchlaoyang360 回复了问题 • 3 人关注 • 2 个回复 • 75 次浏览 • 2018-02-08 19:01 • 来自相关话题

java api批量导入数据,没报错,但是没数据啊

回复

Elasticsearch郭靖 回复了问题 • 3 人关注 • 2 个回复 • 139 次浏览 • 2018-02-08 16:47 • 来自相关话题

大家都用es-hadoop做什么场景

回复

Elasticsearchzjthree 回复了问题 • 2 人关注 • 1 个回复 • 104 次浏览 • 2018-02-08 15:32 • 来自相关话题

多条日志合并

回复

Elasticsearchyangjiajun111 回复了问题 • 2 人关注 • 2 个回复 • 66 次浏览 • 2018-02-08 11:03 • 来自相关话题

Elastic日报 第189期 (2018-02-14)

Elastic日报elk123 发表了文章 • 0 个评论 • 80 次浏览 • 2018-02-14 22:41 • 来自相关话题

1. 日志收集工具fluentd与logstash比较。  http://t.cn/RR9sgLX 2. 如何通过logstash将csv数据导入到elasticsearch。 http://t.cn/RCGeeJK 3. 搜索引擎选择:Elasticsearch与Solr http://t.cn/RUncwIu   编辑:wt 归档:https://elasticsearch.cn/article/503 订阅:https://tinyletter.com/elastic-daily
1. 日志收集工具fluentd与logstash比较。  http://t.cn/RR9sgLX 2. 如何通过logstash将csv数据导入到elasticsearch。 http://t.cn/RCGeeJK 3. 搜索引擎选择:Elasticsearch与Solr http://t.cn/RUncwIu   编辑:wt 归档:https://elasticsearch.cn/article/503 订阅:https://tinyletter.com/elastic-daily

Elastic日报 第185期 (2018-02-10)

Elastic日报elk123 发表了文章 • 0 个评论 • 68 次浏览 • 2018-02-10 11:29 • 来自相关话题

  1. Elasticsearch与Hbase特性对比。 http://t.cn/RRyM1vm
  2. 将Elasticsearch作为Hive的存储? http://t.cn/RRyxDNZ
  3. 基于Elasticsearch实现搜索推荐 http://t.cn/RRyJiHx
  1. Elasticsearch与Hbase特性对比。 http://t.cn/RRyM1vm
  2. 将Elasticsearch作为Hive的存储? http://t.cn/RRyxDNZ
  3. 基于Elasticsearch实现搜索推荐 http://t.cn/RRyJiHx

Elasticsearch mapping 配置个人解读

Elasticsearch夏李俊 发表了文章 • 0 个评论 • 130 次浏览 • 2018-02-09 15:47 • 来自相关话题

配置详解 文件中"mapping":{}中的内容,即为创建索引的mappingsource 如:
"mappings": {
    "_default_" : {    //@1
        "_all" : {"enabled" : true},    //@2
        "properties" : {    //@3
            "tableType" : {"type" : "string", "index" : "no", "include_in_all" : false, "store": true},    //@4
            "caption" : {"type" : "string", "index" : "no", "include_in_all" : false, "store": true},
            "code" : {"type" : "string", "index" : "no", "include_in_all" : false, "store": true},
            "description" : {"type" : "string", "index" : "no", "include_in_all" : false, "store": true},
            "perm" : {"type" : "string", "index" : "not_analyzed", "include_in_all" : false}
		}
	},
	"ec02_goodsinfo" : {    //@5
	    "_all" : {"enabled" : true},    //@6
		"properties" : {    //@7
			"tableType" : {"type" : "string", "index" : "no", "include_in_all" : false, "store": true},
			"caption" : {"type" : "string", "index" : "no", "include_in_all" : false, "store": true},
			"code" : {"type" : "string", "index" : "no", "include_in_all" : false, "store": true},
			"description" : {"type" : "string", "index" : "no", "include_in_all" : false, "store": true},
			"perm" : {"type" : "string", "index" : "not_analyzed", "include_in_all" : false},
			"bill":{    //@8
				properties" : {
		                       "CreateYear" : {"type" : "string", "index" : "not_analyzed", "include_in_all" : true}    //@9
				}
			}
		}
	}
}
  • @1 _default_所有单据默认的创建索引的配置
  • @2 _all{} 每个单据下所有的字段配置,"enabled" : true 所有字段创建索引,false 所有字段禁止创建索引,[*注意]除非properties指定的字段,默认字段类型将自动匹配
  • @3 properties {},每个单据下字段或者properties的指定配置
  • @4 properties {}中指定了属性(properties):"tableType"的检索配置,type:string > 类型字符串,include_in_all:false > 改字段或者属性不包含在单据的所有字段中,"store": true > 储存在数据库中
  • @5 ec02_goodsinfo 表示对单据 "ec02_goodsinfo" 的特定检索配置
  • @6 _all{} 只对"ec02_goodsinfo"单据下所有的字段配置
  • @7 properties {},只对"ec02_goodsinfo"单据下字段或者properties的指定配置
  • [*注意]@8,@9 bill在单据中额字段都会包括一层bill,所以如果要对单据中某个字段指定需要套一层bill{}
----------------------------------------------------------------------------------------------------------------------------------------- 属性解说 版本5.X以前
  • index 可选值为analyzed(默认)和not_analyzed,如果是字段是字符串类型的,则可以是not_analyzed
  • store 可选值为yes或no,指定该字段的原始值是否被写入索引中,默认为no,即结果中不能返回该字段。
  • boost默认为1,定义了文档中该字段的重要性,越高越重要
  • null_value 如果一个字段为null值(空数组或者数组都是null值)的话不会被索引及搜索到,null_value参数可以显示替代null values为指定值,这样使得字段可以被搜索到。
  • include_in_all 指定该字段是否应该包括在_all字段里头,默认情况下都会包含。
  • type 可以指定String,long,int,doulbe,floot,boolean,等
版本5.X以后
  • 原本type string,其index 可选值为analyzed(默认)和not_analyzed,现在直接拆违type text( index analyzed),type keyword(index not_analyzed)
  • store 可选值为enable或false,指定该字段的原始值是否被写入索引中,默认为enable,即结果中不能返回该字段。
  • index 表示是否用于检索默认enable,可选false
------------------------------------------------------------------------------------------------------------------------------- 字段的数据类型
  • 简单类型string(指定分词器)
  • date(默认使用UTC保持,也可以使用format指定格式)
  • 数值类型(byte,short,integer,long,float,double)
  • boolean
  • binary(存储在索引中的二进制数据的base64表示,比如图像,只存储不索引)
  • ip(以数字形式简化IPV4地址的使用,可以被索引、排序并使用IP值做范围查询)注意string是5.x以前的,5.x之后被分裂为text,keyword
有层级结构的类型,比如object 或者 nested. 特殊类型
  • geo_point
  • geo_shape
  • completion
 

【阿里云】专访阿里云 MVP & Elastic中文社区发起人 曾勇—— 做你想做的事情,培养解决问题的能力

Elasticsearchfeil 发表了文章 • 1 个评论 • 156 次浏览 • 2018-02-06 13:45 • 来自相关话题

本期邀请阿里云MVP & Elastic中文社区发起人曾勇进行专访! 详细专访请戳这里或扫描下图中二维码查看,谢谢! 曾勇用专业传递技术能量,分享自身技术发展之路,从技术人转型到技术管理者的历程,创新的技术能量希望对你有所启发!
封面-曾勇.jpg
 
本期邀请阿里云MVP & Elastic中文社区发起人曾勇进行专访! 详细专访请戳这里或扫描下图中二维码查看,谢谢! 曾勇用专业传递技术能量,分享自身技术发展之路,从技术人转型到技术管理者的历程,创新的技术能量希望对你有所启发!
封面-曾勇.jpg
 

java 客户端 获取 termvectors

ElasticsearchJiaShiwen 发表了文章 • 0 个评论 • 306 次浏览 • 2018-01-19 15:56 • 来自相关话题

elasticsearch的termvectors包括了term的位置、词频等信息。这些信息用于相应的数据统计或开发其他功能,本文介绍termvecters如何使用,如何通过java客户端获取termvectors相关信息。

要使用termvctor首先要配置mapping中field的"term_vector"属性,默认状态es不开启termvector,因为这样会增加索引的体积,毕竟多存了不少元数据。

PUT test
{
  "mappings": {
    "qa_test": {
      "dynamic": "strict",
      "_all": {
        "enabled": false
      },
      "properties": {
        "question": {
          "properties": {
            "cate": {
              "type": "keyword"
            },
            "desc": {
              "type": "text",
              "store": true,
              "term_vector": "with_positions_offsets_payloads",
              "analyzer": "ik_smart"
            },
            "time": {
              "type": "date",
              "store": true,
              "format": "strict_date_optional_time||epoch_millis||yyyy-MM-dd HH:mm:ss"
            },
            "title": {
              "type": "text",
              "store": true,
              "term_vector": "with_positions_offsets_payloads",
              "analyzer": "ik_smart"
            }
          }
        },
        "updatetime": {
          "type": "date",
          "store": true,
          "format": "strict_date_optional_time||epoch_millis||yyyy-MM-dd HH:mm:ss"
        }
      }
    }
  },
  "settings": {
    "index": {
      "number_of_shards": "1",
      "requests": {
        "cache": {
          "enable": "true"
        }
      },
      "number_of_replicas": "1"
    }
  }
}

注意示例中的"title"的"term_vector"属性。

接下来为索引创建一条数据

PUT qa_test_02/qa_test/1
{
  "question": {
    "cate": [
      "装修流程",
      "其它"
    ],
    "desc": "筒灯,大洋和索正这两个牌子,哪个好?希望内行的朋友告知一下,谢谢!",
    "time": "2016-07-02 19:59:00",
    "title": "筒灯大洋和索正这两个牌子哪个好"
  },
  "updatetime": 1467503940000
}

下面我们看看这条数据上question.title字段的termvector信息

GET qa_test_02/qa_test/1/_termvectors
{
  "fields": [
    "question.title"
  ],
  "offsets": true,
  "payloads": true,
  "positions": true,
  "term_statistics": true,
  "field_statistics": true
}

结果大概这个样子

{
  "_index": "qa_test_02",
  "_type": "qa_test",
  "_id": "1",
  "_version": 1,
  "found": true,
  "took": 0,
  "term_vectors": {
    "question.title": {
      "field_statistics": {
        "sum_doc_freq": 9,
        "doc_count": 1,
        "sum_ttf": 9
      },
      "terms": {
        "和": {
          "doc_freq": 1,
          "ttf": 1,
          "term_freq": 1,
          "tokens": [
            {
              "position": 2,
              "start_offset": 4,
              "end_offset": 5
            }
          ]
        },
        "哪个": {
          "doc_freq": 1,
          "ttf": 1,
          "term_freq": 1,
          "tokens": [
            {
              "position": 7,
              "start_offset": 12,
              "end_offset": 14
            }
          ]
        },
        "大洋": {
          "doc_freq": 1,
          "ttf": 1,
          "term_freq": 1,
          "tokens": [
            {
              "position": 1,
              "start_offset": 2,
              "end_offset": 4
            }
          ]
        },
        "好": {
          "doc_freq": 1,
          "ttf": 1,
          "term_freq": 1,
          "tokens": [
            {
              "position": 8,
              "start_offset": 14,
              "end_offset": 15
            }
          ]
        },
        "正": {
          "doc_freq": 1,
          "ttf": 1,
          "term_freq": 1,
          "tokens": [
            {
              "position": 4,
              "start_offset": 6,
              "end_offset": 7
            }
          ]
        },
        "牌子": {
          "doc_freq": 1,
          "ttf": 1,
          "term_freq": 1,
          "tokens": [
            {
              "position": 6,
              "start_offset": 10,
              "end_offset": 12
            }
          ]
        },
        "筒灯": {
          "doc_freq": 1,
          "ttf": 1,
          "term_freq": 1,
          "tokens": [
            {
              "position": 0,
              "start_offset": 0,
              "end_offset": 2
            }
          ]
        },
        "索": {
          "doc_freq": 1,
          "ttf": 1,
          "term_freq": 1,
          "tokens": [
            {
              "position": 3,
              "start_offset": 5,
              "end_offset": 6
            }
          ]
        },
        "这两个": {
          "doc_freq": 1,
          "ttf": 1,
          "term_freq": 1,
          "tokens": [
            {
              "position": 5,
              "start_offset": 7,
              "end_offset": 10
            }
          ]
        }
      }
    }
  }
}

下面我们说说如何通过java代码实现termvector的获取,不说废话直接上代码

            TermVectorsResponse     termVectorResponse = client.prepareTermVectors().setIndex(sourceindexname).setType(sourceindextype)
                        .setId(id).setSelectedFields(fieldname).setTermStatistics(true).execute()
                        .actionGet();
                XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
                termVectorResponse.toXContent(builder, null);
                System.out.println(builder.string());
                Fields fields = termVectorResponse.getFields();
                Iterator<String> iterator = fields.iterator();
                while (iterator.hasNext()) {
                    String field = iterator.next();
                    Terms terms = fields.terms(field);
                    TermsEnum termsEnum = terms.iterator();
                    while (termsEnum.next() != null) {
                        BytesRef term = termsEnum.term();
                        if (term != null) {
                            System.out.println(term.utf8ToString() + termsEnum.totalTermFreq());
                        }
                    }
                }

获取TermVectorsResponse的代码很好理解,主要是设置索引名称、索引type、索引id以及需要展示的若干属性。

接下来是如何获取某一term的termvector,有两种方案第一种是通过TermVectorsResponse的toXContent方法直接生成XContentBuilder,这种方法可以直接获取和上面通过DSL查询一样的json结果;第二种是通过Fields的iterator遍历fields,获取TermsEnum,熟悉lucene的同学应会更熟悉第二种方法。

elasticsearch java原生打分插件开发

ElasticsearchJiaShiwen 发表了文章 • 0 个评论 • 455 次浏览 • 2018-01-10 16:34 • 来自相关话题

能有影响elasticsearch score的方法有很多,官方推荐的是使用内置的painless脚本语言结合function_score来重新定义score。由于本人开发的项目其算法是由java语言开发的,于是决定尝试原生脚本开发。 elasticsearch脚本由plugin-descriptor.properties文件以及运行jar包组成,plugin-descriptor.properties主要用来定义版本信息、对应es的版本信息等属性。

官方的例子

public class ExpertScriptPlugin extends Plugin implements ScriptPlugin {
    @Override
    public ScriptEngineService getScriptEngineService(Settings settings) {
        return new MyExpertScriptEngine();
    }
    /** An example {@link ScriptEngineService} that uses Lucene segment details to implement pure document frequency scoring. */
    // tag::expert_engine
    private static class MyExpertScriptEngine implements ScriptEngineService {
        @Override
        public String getType() {
            return "expert_scripts";
        }
        @Override
        public Function<Map<String,Object>,SearchScript> compile(String scriptName, String scriptSource, Map<String, String> params) {
            // we use the script "source" as the script identifier
            if ("pure_df".equals(scriptSource)) {
                return p -> new SearchScript() {
                    final String field;
                    final String term;
                    {
                        if (p.containsKey("field") == false) {
                            throw new IllegalArgumentException("Missing parameter [field]");
                        }
                        if (p.containsKey("term") == false) {
                            throw new IllegalArgumentException("Missing parameter [term]");
                        }
                        field = p.get("field").toString();
                        term = p.get("term").toString();
                    }
                    @Override
                    public LeafSearchScript getLeafSearchScript(LeafReaderContext context) throws IOException {
                        PostingsEnum postings = context.reader().postings(new Term(field, term));
                        if (postings == null) {
                            // the field and/or term don't exist in this segment, so always return 0
                            return () -> 0.0d;
                        }
                        return new LeafSearchScript() {
                            int currentDocid = -1;
                            @Override
                            public void setDocument(int docid) {
                                // advance has undefined behavior calling with a docid <= its current docid
                                if (postings.docID() < docid) {
                                    try {
                                        postings.advance(docid);
                                    } catch (IOException e) {
                                        throw new UncheckedIOException(e);
                                    }
                                }
                                currentDocid = docid;
                            }
                            @Override
                            public double runAsDouble() {
                                if (postings.docID() != currentDocid) {
                                    // advance moved past the current doc, so this doc has no occurrences of the term
                                    return 0.0d;
                                }
                                try {
                                    return postings.freq();
                                } catch (IOException e) {
                                    throw new UncheckedIOException(e);
                                }
                            }
                        };
                    }
                    @Override
                    public boolean needsScores() {
                        return false;
                    }
                };
            }
            throw new IllegalArgumentException("Unknown script name " + scriptSource);
        }

        @Override
        @SuppressWarnings("unchecked")
        public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, @Nullable Map<String, Object> params) {
          Function<Map<String,Object>,SearchScript> scriptFactory = (Function<Map<String,Object>,SearchScript>) compiledScript.compiled();
          return scriptFactory.apply(params);
        }

        @Override
        public ExecutableScript executable(CompiledScript compiledScript, @Nullable Map<String, Object> params) {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean isInlineScriptEnabled() {
            return true;
        }

        @Override
        public void close() {}
    }
}

代码解读: 本例在elasticsearch源码中,https://github.com/elastic/elasticsearch/tree/master/plugins/examples/script-expert-scoring

MyExpertScriptEngine类是其中最重要的类,用于实现脚本参数定义,编译,以及打分机制的实现。其中compile方法返回我们定义好打分逻辑的java function。search方法用于我们在搜索过程中实施定义好的打分逻辑。 怎奈笔者对于函数式编程知道的不多(后续需要补课),其实评分逻辑也可以在search方法中实现,于是有了下面的一段代码。

public class fieldaddScriptPlugin extends Plugin implements ScriptPlugin {
    @Override
    public ScriptEngineService getScriptEngineService(Settings settings) {
        return new MyExpertScriptEngine();
    }
    private static class MyExpertScriptEngine implements ScriptEngineService {
        @Override
        public String getType() {
            return "expert_scripts";
        }

        @Override
        public Object compile(String scriptName, String scriptSource, Map<String, String> params) {
            if ("example_add".equals(scriptSource)) {
                return scriptSource;
            }
            throw new IllegalArgumentException("Unknown script name " + scriptSource);
        }

        @Override
        @SuppressWarnings("unchecked")
        public SearchScript search(CompiledScript compiledScript, SearchLookup lookup, @Nullable Map<String, Object> vars) {

            /**
             * 校验输入参数,DSL中params 参数列表
             */
            final long inc;
            final String fieldname;
            if (vars == null || vars.containsKey("inc") == false) {
                inc = 0;
            } else {
                inc = ((Number) vars.get("inc")).longValue();
            }

            if (vars == null || vars.containsKey("fieldname") == false) {
                throw new IllegalArgumentException("Missing parameter [fieldname]");
            } else {
                fieldname = (String) vars.get("fieldname");
            }

            return new SearchScript() {
                @Override
                public LeafSearchScript getLeafSearchScript(LeafReaderContext context) throws IOException {
                    final LeafSearchLookup leafLookup = lookup.getLeafSearchLookup(context);

                    return new LeafSearchScript() {
                        @Override
                        public void setDocument(int doc) {
                            if (leafLookup != null) {
                                leafLookup.setDocument(doc);
                            }
                        }

                        @Override
                        public double runAsDouble() {
                            long values = 0;
                            /**
                             * 获取document中字段内容
                             */
                            for (Object v : (List<?>) leafLookup.doc().get(fieldname)) {
                                values = ((Number) v).longValue() + values;
                            }
                            return values + inc;
                        }
                    };
                }

                @Override
                public boolean needsScores() {
                    return false;
                }
            };
        }
     这段代码的逻辑是把给定的字段(字段类型long)的每个元素相加后再加上给定的增量参数最后形成score分值。为了实现上述逻辑需要实现参数获取、根据给定的字段名获取内容列表量的关键件。下面结合代码说说这两个步骤如何实现的。

search方法中Map<String, Object> vars参数对应DSL中"params"参数,用于接受实际给定的运行时参数。SearchLookup lookup参数由系统传入,通过lookup.getLeafSearchLookup(context)获取LeafSearchLookup通过该对象可以获取给定字段的值。

对于elasticsearch 2.x以前的版本可以通过NativeScriptFactory实现原生脚本。

public class MyNativeScriptPlugin extends Plugin implements ScriptPlugin {
    private final static Logger LOGGER = LogManager.getLogger(MyFirstPlugin.class);

    public MyNativeScriptPlugin() {
        super();
        LOGGER.warn("This is MyNativeScriptPlugin");
    }

    @Override
    public List<NativeScriptFactory> getNativeScripts() {
        return Collections.singletonList(new MyNativeScriptFactory());
    }

    public static class MyNativeScriptFactory implements NativeScriptFactory {
        @Override
        public ExecutableScript newScript(@Nullable Map<String, Object> params) {

//            return new MyNativeScript();
            return new AbstractDoubleSearchScript(){

                @Override
                public double runAsDouble() {
                    int b=0;
                    if(params.get("add")!=null){
                        b= (int) params.get("add");
                    }

                    String s =  source().get("last").toString();
                    double a = s.length()+b;
                    return a;                }
            };
        }

        @Override
        public boolean needsScores() {
            return false;
        }

        @Override
        public String getName() {
            return "my_script";
        }
    }
}

工程组织 elasticsearch工程使用gradle进行依赖管理和生命周期管理,为此es项目自己也开发了esplugin的gradle插件,但不兼容gradle4.2以上的版本。参考github中的成熟插件,使用maven组织工程。

主要涉及两个文件 pom.xml plugin.xml 工程利用maven-assembly-plugin打包jar。

本例github地址:https://github.com/jiashiwen/elasticsearchpluginsample 欢迎点赞或拍砖

elasticsearch批量导入数据注意事项

Elasticsearchwj86611199 发表了文章 • 0 个评论 • 401 次浏览 • 2017-12-16 23:55 • 来自相关话题

刚刚初始化启动kiabna后是没有索引的,当然,如果elasticsearch中导入过数据那么kibana会自动匹配索引 现在按照官方例子开始批量给elasticsearch导入数据 链接如下https://www.elastic.co/guide/e ... .html 我们会依次导入如下 三块数据  1.The Shakespeare data 莎士比亚文集的数据结构 {     "line_id": INT,     "play_name": "String",     "speech_number": INT,     "line_number": "String",     "speaker": "String",     "text_entry": "String", } 2.The accounts data  账户数据结构 {     "account_number": INT,     "balance": INT,     "firstname": "String",     "lastname": "String",     "age": INT,     "gender": "M or F",     "address": "String",     "employer": "String",     "email": "String",     "city": "String",     "state": "String" } 3.The schema for the logs data 日志数据 {     "memory": INT,     "geo.coordinates": "geo_point"     "@timestamp": "date" } 然后向elasticsearch设置字段映射 Use the following command in a terminal (eg bash) to set up a mapping for the Shakespeare data set: 以下是莎士比亚的字段映射 可以用postman或者curl等发出请求~完整的url应该是localhost:9200/shakespear PUT /shakespeare {  "mappings": {   "doc": {    "properties": {     "speaker": {"type": "keyword"},     "play_name": {"type": "keyword"},     "line_id": {"type": "integer"},     "speech_number": {"type": "integer"}    }   }  } } Use the following commands to establish geo_point mapping for the logs: 这是 logs的字段映射 PUT /logstash-2015.05.18 {   "mappings": {     "log": {       "properties": {         "geo": {           "properties": {             "coordinates": {               "type": "geo_point"             }           }         }       }     }   } }   PUT /logstash-2015.05.19 {   "mappings": {     "log": {       "properties": {         "geo": {           "properties": {             "coordinates": {               "type": "geo_point"             }           }         }       }     }   } } COPY AS CURLVIEW IN CONSOLE  PUT /logstash-2015.05.20 {   "mappings": {     "log": {       "properties": {         "geo": {           "properties": {             "coordinates": {               "type": "geo_point"             }           }         }       }     }   } } 账户信息没有字段映射。。。 现在批量导入 curl -H 'Content-Type: application/x-ndjson' -XPOST 'localhost:9200/bank/account/_bulk?pretty' --data-binary @accounts.json curl -H 'Content-Type: application/x-ndjson' -XPOST 'localhost:9200/shakespeare/doc/_bulk?pretty' --data-binary @shakespeare_6.0.json curl -H 'Content-Type: application/x-ndjson' -XPOST 'localhost:9200/_bulk?pretty' --data-binary @logs.jsonl windows下的curl命令可以到https://curl.haxx.se/download.html#Win64下载,解压后设置环境变量即可 这里要注意的是 @accounts.json,@shakespeare_6.0.json,@logs.json这些文件的位置应该是你所在的当前目录, 如果你当前位置是D盘~那么这些文件位置就要放在D盘下,否则读不到 还有一点~~~windows下要把命令行中的单引号换成双引号,,。。。否则会报 curl: (6) Could not resolve host: application这样的错误  

Elasticsearch Java API 索引的增删改查(二)

Elasticsearchquanke 发表了文章 • 0 个评论 • 980 次浏览 • 2017-11-16 09:53 • 来自相关话题

本节介绍以下 CRUD API:

 单文档  APIs

多文档 APIs

Multi Get API Bulk API

注意:所有的单文档的CRUD API,index参数只能接受单一的索引库名称,或者是一个指向单一索引库的alias。

Index API

Index API 允许我们存储一个JSON格式的文档,使数据可以被搜索。文档通过index、type、id唯一确定。我们可以自己提供一个id,或者也使用Index API 为我们自动生成一个。

这里有几种不同的方式来产生JSON格式的文档(document):

  • 手动方式,使用原生的byte[]或者String
  • 使用Map方式,会自动转换成与之等价的JSON
  • 使用第三方库来序列化beans,如Jackson
  • 使用内置的帮助类 XContentFactory.jsonBuilder()

手动方式

数据格式

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";
实例
/**  
 * 手动生成JSON  
 */  
@Test  
public void CreateJSON(){  
      
    String json = "{" +  
            "\"user\":\"fendo\"," +  
            "\"postDate\":\"2013-01-30\"," +  
            "\"message\":\"Hell word\"" +  
        "}";  
      
    IndexResponse response = client.prepareIndex("fendo", "fendodate")  
            .setSource(json)  
            .get();  
    System.out.println(response.getResult());  
      
}  

  Map方式

Map是key:value数据类型,可以代表json结构.

Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
实例
 /**  
 * 使用集合  
 */  
@Test  
public void CreateList(){  
      
    Map<String, Object> json = new HashMap<String, Object>();  
    json.put("user","kimchy");  
    json.put("postDate","2013-01-30");  
    json.put("message","trying out Elasticsearch");  
      
    IndexResponse response = client.prepareIndex("fendo", "fendodate")  
            .setSource(json)  
            .get();  
    System.out.println(response.getResult());  
      
}  

  序列化方式

ElasticSearch已经使用了jackson,可以直接使用它把javabean转为json.

import com.fasterxml.jackson.databind.*;

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
实例
/**  
 * 使用JACKSON序列化  
 * @throws Exception  
 */  
@Test  
public void CreateJACKSON() throws Exception{  
      
    CsdnBlog csdn=new CsdnBlog();  
    csdn.setAuthor("fendo");  
    csdn.setContent("这是JAVA书籍");  
    csdn.setTag("C");  
    csdn.setView("100");  
    csdn.setTitile("编程");  
    csdn.setDate(new Date().toString());  
      
    // instance a json mapper  
    ObjectMapper mapper = new ObjectMapper(); // create once, reuse  

    // generate json  
    byte[] json = mapper.writeValueAsBytes(csdn);  
      
    IndexResponse response = client.prepareIndex("fendo", "fendodate")  
            .setSource(json)  
            .get();  
    System.out.println(response.getResult());  
}  

  XContentBuilder帮助类方式

ElasticSearch提供了一个内置的帮助类XContentBuilder来产生JSON文档

// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();
实例
/**  
 * 使用ElasticSearch 帮助类  
 * @throws IOException   
 */  
@Test  
public void CreateXContentBuilder() throws IOException{  
      
    XContentBuilder builder = XContentFactory.jsonBuilder()  
            .startObject()  
                .field("user", "ccse")  
                .field("postDate", new Date())  
                .field("message", "this is Elasticsearch")  
            .endObject();  
      
    IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();  
    System.out.println("创建成功!");  
      
      
}  

综合实例

 
import java.io.IOException;  
import java.net.InetAddress;  
import java.net.UnknownHostException;  
import java.util.Date;  
import java.util.HashMap;  
import java.util.Map;  
  
import org.elasticsearch.action.index.IndexResponse;  
import org.elasticsearch.client.transport.TransportClient;  
import org.elasticsearch.common.settings.Settings;  
import org.elasticsearch.common.transport.InetSocketTransportAddress;  
import org.elasticsearch.common.xcontent.XContentBuilder;  
import org.elasticsearch.common.xcontent.XContentFactory;  
import org.elasticsearch.transport.client.PreBuiltTransportClient;  
import org.junit.Before;  
import org.junit.Test;  
  
import com.fasterxml.jackson.core.JsonProcessingException;  
import com.fasterxml.jackson.databind.ObjectMapper;  
  
public class CreateIndex {  
  
    private TransportClient client;  
      
    @Before  
    public void getClient() throws Exception{  
        //设置集群名称  
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();// 集群名  
        //创建client  
        client  = new PreBuiltTransportClient(settings)  
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));  
    }  
      
    /**  
     * 手动生成JSON  
     */  
    @Test  
    public void CreateJSON(){  
          
        String json = "{" +  
                "\"user\":\"fendo\"," +  
                "\"postDate\":\"2013-01-30\"," +  
                "\"message\":\"Hell word\"" +  
            "}";  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                .setSource(json)  
                .get();  
        System.out.println(response.getResult());  
          
    }  
      
      
    /**  
     * 使用集合  
     */  
    @Test  
    public void CreateList(){  
          
        Map<String, Object> json = new HashMap<String, Object>();  
        json.put("user","kimchy");  
        json.put("postDate","2013-01-30");  
        json.put("message","trying out Elasticsearch");  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                .setSource(json)  
                .get();  
        System.out.println(response.getResult());  
          
    }  
      
    /**  
     * 使用JACKSON序列化  
     * @throws Exception  
     */  
    @Test  
    public void CreateJACKSON() throws Exception{  
          
        CsdnBlog csdn=new CsdnBlog();  
        csdn.setAuthor("fendo");  
        csdn.setContent("这是JAVA书籍");  
        csdn.setTag("C");  
        csdn.setView("100");  
        csdn.setTitile("编程");  
        csdn.setDate(new Date().toString());  
          
        // instance a json mapper  
        ObjectMapper mapper = new ObjectMapper(); // create once, reuse  
  
        // generate json  
        byte[] json = mapper.writeValueAsBytes(csdn);  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodate")  
                .setSource(json)  
                .get();  
        System.out.println(response.getResult());  
    }  
      
    /**  
     * 使用ElasticSearch 帮助类  
     * @throws IOException   
     */  
    @Test  
    public void CreateXContentBuilder() throws IOException{  
          
        XContentBuilder builder = XContentFactory.jsonBuilder()  
                .startObject()  
                    .field("user", "ccse")  
                    .field("postDate", new Date())  
                    .field("message", "this is Elasticsearch")  
                .endObject();  
          
        IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();  
        System.out.println("创建成功!");  
          
          
    }  
      
}  

你还可以通过startArray(string)和endArray()方法添加数组。.field()方法可以接受多种对象类型。你可以给它传递数字、日期、甚至其他XContentBuilder对象。

Get API

根据id查看文档:

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

更多请查看 rest get API 文档

配置线程

operationThreaded 设置为 true 是在不同的线程里执行此次操作

下面的例子是operationThreaded 设置为 false

GetResponse response = client.prepareGet("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();

Delete API

根据ID删除:

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();

更多请查看 delete API 文档

配置线程

operationThreaded 设置为 true 是在不同的线程里执行此次操作

下面的例子是operationThreaded 设置为 false

GetResponse response = client.prepareGet("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
        .setOperationThreaded(false)
        .get();

Delete By Query API

通过查询条件删除

BulkByScrollResponse response =
    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
        .filter(QueryBuilders.matchQuery("gender", "male")) //查询条件
        .source("persons") //index(索引名)
        .get();  //执行

long deleted = response.getDeleted(); //删除文档的数量

如果需要执行的时间比较长,可以使用异步的方式处理,结果在回调里面获取

DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male"))      //查询            
    .source("persons")                //index(索引名)                                    
    .execute(new ActionListener<BulkByScrollResponse>() {     //回调监听     
        @Override
        public void onResponse(BulkByScrollResponse response) {
            long deleted = response.getDeleted();   //删除文档的数量                 
        }
        @Override
        public void onFailure(Exception e) {
            // Handle the exception
        }
    });

Update API

有两种方式更新索引:

  • 创建 UpdateRequest,通过client发送;
  • 使用 prepareUpdate() 方法;

使用UpdateRequest

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

使用 prepareUpdate() 方法

这里官方的示例有问题,new Script()参数错误,所以一下代码是我自己写的(2017/11/10)

client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script("ctx._source.gender = \"male\""  ,ScriptService.ScriptType.INLINE, null, null))//脚本可以是本地文件存储的,如果使用文件存储的脚本,需要设置 ScriptService.ScriptType.FILE 
        .get();

client.prepareUpdate("ttl", "doc", "1")
        .setDoc(jsonBuilder()   //合并到现有文档
            .startObject()
                .field("gender", "male")
            .endObject())
        .get();

Update by script

使用脚本更新文档 

UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
        .script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();

Update by merging documents

合并文档

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
client.update(updateRequest).get();

Upsert

更新插入,如果存在文档就更新,如果不存在就插入

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())
        .upsert(indexRequest); //如果不存在此文档 ,就增加 `indexRequest`
client.update(updateRequest).get();

如果 index/type/1 存在,类似下面的文档:

{
    "name"  : "Joe Dalton",
    "gender": "male"        
}

如果不存在,会插入新的文档:

{
    "name" : "Joe Smith",
    "gender": "male"
}

Multi Get API

一次获取多个文档

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "tweet", "1") //一个id的方式
    .add("twitter", "tweet", "2", "3", "4") //多个id的方式
    .add("another", "type", "foo")  //可以从另外一个索引获取
    .get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) { //迭代返回值
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {      //判断是否存在                
        String json = response.getSourceAsString(); //_source 字段
    }
}

更多请浏览REST multi get 文档

Bulk API

Bulk API,批量插入:

import static org.elasticsearch.common.xcontent.XContentFactory.*;
BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
    //处理失败
}

使用 Bulk Processor

BulkProcessor 提供了一个简单的接口,在给定的大小数量上定时批量自动请求

创建BulkProcessor实例

首先创建BulkProcessor实例

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  //增加elasticsearch客户端
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } //调用bulk之前执行 ,例如你可以通过request.numberOfActions()方法知道numberOfActions

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } //调用bulk之后执行 ,例如你可以通过request.hasFailures()方法知道是否执行失败

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } //调用失败抛 Throwable
        })
        .setBulkActions(10000) //每次10000请求
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) //拆成5mb一块
        .setFlushInterval(TimeValue.timeValueSeconds(5)) //无论请求数量多少,每5秒钟请求一次。
        .setConcurrentRequests(1) //设置并发请求的数量。值为0意味着只允许执行一个请求。值为1意味着允许1并发请求。
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//设置自定义重复请求机制,最开始等待100毫秒,之后成倍更加,重试3次,当一次或多次重复请求失败后因为计算资源不够抛出 EsRejectedExecutionException 异常,可以通过BackoffPolicy.noBackoff()方法关闭重试机制
        .build();

BulkProcessor 默认设置

  • bulkActions  1000 
  • bulkSize 5mb
  • 不设置flushInterval
  • concurrentRequests 为 1 ,异步执行
  • backoffPolicy 重试 8次,等待50毫秒

增加requests

然后增加requestsBulkProcessor

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

关闭 Bulk Processor

当所有文档都处理完成,使用awaitCloseclose 方法关闭BulkProcessor:

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

bulkProcessor.close();

在测试中使用Bulk Processor

如果你在测试种使用Bulk Processor可以执行同步方法

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();

所有实例 已经上传到Git

更多请浏览 spring-boot-starter-es 开源项目

如何有任何问题请关注微信公众号给我留言

qrcode_for_gh_26893aa0a4ea_258.jpg

 

Elasticsearch Java API - 客户端连接(TransportClient,PreBuiltXPackTransportClient)(一)

Elasticsearchquanke 发表了文章 • 0 个评论 • 1520 次浏览 • 2017-11-16 09:52 • 来自相关话题

Elasticsearch Java API 客户端连接

一个是TransportClient,一个是NodeClient,还有一个XPackTransportClient

  • TransportClient:

作为一个外部访问者,请求ES的集群,对于集群而言,它是一个外部因素。

  • NodeClient

作为ES集群的一个节点,它是ES中的一环,其他的节点对它是感知的。

  • XPackTransportClient:

服务安装了 x-pack 插件

重要:客户端版本应该和服务端版本保持一致

TransportClient旨在被Java高级REST客户端取代,该客户端执行HTTP请求而不是序列化的Java请求。 在即将到来的Elasticsearch版本中将不赞成使用TransportClient,建议使用Java高级REST客户端。

上面的警告比较尴尬,但是在 5xx版本中使用还是没有问题的,可能使用rest 客户端兼容性更好做一些。

Elasticsearch Java Rest API 手册

Maven Repository

Elasticsearch Java API包已经上传到 Maven Central

pom.xml文件中增加:

transport 版本号最好就是与Elasticsearch版本号一致。

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>5.6.3</version>
</dependency>

Transport Client

不设置集群名称

// on startup

//此步骤添加IP,至少一个,如果设置了"client.transport.sniff"= true 一个就够了,因为添加了自动嗅探配置
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));

// on shutdown  关闭client

client.close();

设置集群名称

Settings settings = Settings.builder()
        .put("cluster.name", "myClusterName").build();  //设置ES实例的名称
TransportClient client = new PreBuiltTransportClient(settings);  //自动嗅探整个集群的状态,把集群中其他ES节点的ip添加到本地的客户端列表中
//Add transport addresses and do something with the client...

增加自动嗅探配置

Settings settings = Settings.builder()
        .put("client.transport.sniff", true).build();
TransportClient client = new PreBuiltTransportClient(settings);

其他配置

client.transport.ignore_cluster_name  //设置 true ,忽略连接节点集群名验证
client.transport.ping_timeout       //ping一个节点的响应时间 默认5秒
client.transport.nodes_sampler_interval //sample/ping 节点的时间间隔,默认是5s

对于ES Client,有两种形式,一个是TransportClient,一个是NodeClient。两个的区别为: TransportClient作为一个外部访问者,通过HTTP去请求ES的集群,对于集群而言,它是一个外部因素。 NodeClient顾名思义,是作为ES集群的一个节点,它是ES中的一环,其他的节点对它是感知的,不像TransportClient那样,ES集群对它一无所知。NodeClient通信的性能会更好,但是因为是ES的一环,所以它出问题,也会给ES集群带来问题。NodeClient可以设置不作为数据节点,在elasticsearch.yml中设置,这样就不会在此节点上分配数据。

如果用ES的节点,仁者见仁智者见智。

实例

package name.quanke.es.study;

import name.quanke.es.study.util.Utils;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.After;
import org.junit.Before;

import java.net.InetAddress;

/**
 * Elasticsearch 5.5.1 的client 和 ElasticsearchTemplate的初始化
 * 作为一个外部访问者,请求ES的集群,对于集群而言,它是一个外部因素。
 * Created by http://quanke.name on 2017/11/10.
 */
public class ElasticsearchClient {

    protected TransportClient client;

    @Before
    public void setUp() throws Exception {

        Settings esSettings = Settings.builder()
                .put("cluster.name", "utan-es") //设置ES实例的名称
                .put("client.transport.sniff", true) //自动嗅探整个集群的状态,把集群中其他ES节点的ip添加到本地的客户端列表中
                .build();

        /**
         * 这里的连接方式指的是没有安装x-pack插件,如果安装了x-pack则参考{@link ElasticsearchXPackClient}
         * 1. java客户端的方式是以tcp协议在9300端口上进行通信
         * 2. http客户端的方式是以http协议在9200端口上进行通信
         */
        client = new PreBuiltTransportClient(esSettings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.10"), 9300));

        System.out.println("ElasticsearchClient 连接成功");
    }

    @After
    public void tearDown() throws Exception {
        if (client != null) {
            client.close();
        }

    }

    protected void println(SearchResponse searchResponse) {
        Utils.println(searchResponse);
    }

}

本实例代码已经上传到 Git ElasticsearchClient.java

所有实例 已经上传到Git

XPackTransportClient

如果 ElasticSearch 服务安装了 x-pack 插件,需要PreBuiltXPackTransportClient实例才能访问

使用Maven管理项目,把下面代码增加到pom.xml;

一定要修改默认仓库地址为https://artifacts.elastic.co/maven ,因为这个库没有上传到Maven中央仓库,如果有自己的 maven ,请配置代理

<project ...>

   <repositories>
      <!-- add the elasticsearch repo -->
      <repository>
         <id>elasticsearch-releases</id>
         <url>https://artifacts.elastic.co/maven</url>
         <releases>
            <enabled>true</enabled>
         </releases>
         <snapshots>
            <enabled>false</enabled>
         </snapshots>
      </repository>
      ...
   </repositories>
   ...

   <dependencies>
      <!-- add the x-pack jar as a dependency -->
      <dependency>
         <groupId>org.elasticsearch.client</groupId>
         <artifactId>x-pack-transport</artifactId>
         <version>5.6.3</version>
      </dependency>
      ...
   </dependencies>
   ...

 </project>

实例


/**
 * Elasticsearch XPack Client
 * Created by http://quanke.name on 2017/11/10.
 */
public class ElasticsearchXPackClient {

    protected TransportClient client;

    @Before
    public void setUp() throws Exception {
        /**
         * 如果es集群安装了x-pack插件则以此种方式连接集群
         * 1. java客户端的方式是以tcp协议在9300端口上进行通信
         * 2. http客户端的方式是以http协议在9200端口上进行通信
         */
        Settings settings = Settings.builder()
                .put("xpack.security.user", "elastic:utan100")
                .put("cluster.name", "utan-es")
                .build();
        client = new PreBuiltXPackTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.1.10"), 9300));
//        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
//        credentialsProvider.setCredentials(AuthScope.ANY,
//                new UsernamePasswordCredentials("elastic", "utan100"));

        System.out.println("ElasticsearchXPackClient 启动成功");
    }

    @Test
    public void testClientConnection() throws Exception {

        System.out.println("--------------------------");
    }

    @After
    public void tearDown() throws Exception {
        if (client != null) {
            client.close();
        }

    }

    protected void println(SearchResponse searchResponse) {
        Utils.println(searchResponse);
    }
}

本实例代码已经上传到 Git ElasticsearchXPackClient.java

所有实例 已经上传到Git

更多请浏览 spring-boot-starter-es 开源项目

如何有任何问题请关注微信公众号给我留言

qrcode_for_gh_26893aa0a4ea_258.jpg

Elasticsearch 5.6 Java API 中文手册

Elasticsearchquanke 发表了文章 • 1 个评论 • 1497 次浏览 • 2017-11-08 22:30 • 来自相关话题

432952-5448f57c503678f5.jpg
 [Elasticsearch 5.6 Java API 中文手册] 本手册由 全科 翻译,并且整理成电子书,支持PDF,ePub,Mobi格式,方便大家下载阅读。 不只是官方文档的翻译,还包含使用实例,包含我们使用踩过的坑 阅读地址:https://es.quanke.name 下载地址:https://www.gitbook.com/book/q ... -java github地址:https://github.com/quanke/elasticsearch-java 编辑:http://quanke.name 编辑整理辛苦,还望大神们点一下star ,抚平我虚荣的心 [全科的公众号]
08183543_ysUa.jpg
 
432952-5448f57c503678f5.jpg
 [Elasticsearch 5.6 Java API 中文手册] 本手册由 全科 翻译,并且整理成电子书,支持PDF,ePub,Mobi格式,方便大家下载阅读。 不只是官方文档的翻译,还包含使用实例,包含我们使用踩过的坑 阅读地址:https://es.quanke.name 下载地址:https://www.gitbook.com/book/q ... -java github地址:https://github.com/quanke/elasticsearch-java 编辑:http://quanke.name 编辑整理辛苦,还望大神们点一下star ,抚平我虚荣的心 [全科的公众号]
08183543_ysUa.jpg
 

Bulk异常引发的Elasticsearch内存泄漏

Elasticsearchkennywu76 发表了文章 • 7 个评论 • 771 次浏览 • 2017-11-08 18:54 • 来自相关话题

原文链接: http://www.jianshu.com/p/d4f7a6d58008

前天公司度假部门一个线上ElasticSearch集群发出报警,有Data Node的Heap使用量持续超过80%警戒线。 收到报警邮件后,不敢怠慢,立即登陆监控系统查看集群状态。还好,所有的结点都在正常服务,只是有2个结点的Heap使用率非常高。此时,Old GC一直在持续的触发,却无法回收内存。

heap_used.jpg


初步排查

问题结点的Heap分配了30GB,80%的使用率约等于24GB。 但集群的数据总量并不大,5个结点所有索引文件加起来占用的磁盘空间还不到10GB。

GET /_cat/allocation?v&h=shards,disk.indices,disk.used,disk.avail

shards disk.indices disk.used disk.avail
     3        1.9gb    38.3gb     89.7gb
     4        2.2gb    13.4gb    114.6gb
     4        2.5gb    20.3gb    107.7gb
     4        2.3gb    33.9gb     94.1gb
     3        1.7gb    12.8gb    115.2gb

查看各结点的segment memory和cache占用量也都非常小,是MB级别的。

GET /_cat/nodes?v&h=id,port,v,m,fdp,mc,mcs,sc,sm,qcm,fm,im,siwm,svmm

id   port v     m fdp mc     mcs sc     sm     qcm      fm siwm svmm
e1LV 9300 5.3.2 -   1  0      0b 68   69mb   1.5mb   1.9mb   0b 499b
5VnU 9300 5.3.2 -   1  0      0b 75   79mb   1.5mb   1.9mb   0b 622b
_Iob 9300 5.3.2 -   1  0      0b 56 55.7mb   1.3mb 914.1kb   0b 499b
4Kyl 9300 5.3.2 *   1  1 330.1mb 81 84.4mb   1.2mb   1.9mb   0b 622b
XEP_ 9300 5.3.2 -   1  0      0b 45 50.4mb 748.5kb     1mb   0b 622b

集群的QPS只有30上下,CPU消耗10%都不到,各类thread pool的活动线程数量也都非常低。

bulk_search_thread_pool.png

非常费解是什么东西占着20多GB的内存不释放?

出现问题的集群ES版本是5.3.2,而这个版本的稳定性在公司内部已经经过长时间的考验,做为稳定版本在线上进行了大规模部署。 其他一些读写负载非常高的集群也未曾出现过类似的状况,看来是遇到新问题了。

查看问题结点ES的日志,除了看到一些Bulk异常以外,未见特别明显的其他和资源相关的错误:

[2017-11-06T16:33:15,668][DEBUG][o.e.a.b.TransportShardBulkAction] [] [suggest-3][0] failed to execute bulk item (update) BulkShardRequest [[suggest-3][0]] containing [44204
] requests
org.elasticsearch.index.engine.DocumentMissingException: [type][纳格尔果德_1198]: document missing
        at org.elasticsearch.action.update.UpdateHelper.prepare(UpdateHelper.java:92) ~[elasticsearch-5.3.2.jar:5.3.2]
        at org.elasticsearch.action.update.UpdateHelper.prepare(UpdateHelper.java:81) ~[elasticsearch-5.3.2.jar:5.3.2]

和用户确认这些异常的原因,是因为写入程序会从数据源拿到数据后,根据doc_id对ES里的数据做update。会有部分doc_id在ES里不存在的情况,但并不影响业务逻辑,因而ES记录的document missing异常应该可以忽略。

至此别无他法,只能对JVM做Dump分析了。


Heap Dump分析

用的工具是Eclipse MAT,从这里下载的Mac版:Downloads 。 使用这个工具需要经过以下2个步骤:

  • 获取二进制的head dump文件 jmap -dump:format=b,file=/tmp/es_heap.bin <pid> 其中pid是ES JAVA进程的进程号。
  • 将生成的dump文件下载到本地开发机器,启动MAT,从其GUI打开文件。

要注意,MAT本身也是JAVA应用,需要有JDK运行环境的支持。

MAT第一次打dump文件的时候,需要对其解析,生成多个索引。这个过程比较消耗CPU和内存,但一旦完成,之后再打开dump文件就很快,消耗很低。 对于这种20多GB的大文件,第一次解析的过程会非常缓慢,并且很可能因为开发机内存的较少而内存溢出。因此,我找了台大内存的服务器来做第一次的解析工作:

  • 将linux版的MAT拷贝上去,解压缩后,修改配置文件MemoryAnalyzer.ini,将内存设置为20GB左右:

    $ cat MemoryAnalyzer.ini 
    
      -startup
      plugins/org.eclipse.equinox.launcher_1.3.100.v20150511-1540.jar
      --launcher.library
      plugins/org.eclipse.equinox.launcher.gtk.linux.x86_64_1.1.300.v20150602-1417
      -vmargs
      -Xmx20240m

    这样能保证解析的过程中不会内存溢出。

  • 将dump文件拷贝上去,执行下面几个命令生成索引及3个分析报告:
    • mat/ParseHeapDump.sh es_heap.bin org.eclipse.mat.api:suspects
    • mat/ParseHeapDump.sh es_heap.bin org.eclipse.mat.api:overview
    • mat/ParseHeapDump.sh es_heap.bin org.eclipse.mat.api:top_components

分析成功以后,会生成如下一堆索引文件(.index)和分析报告(.zip)

-rw-r--r--@ 1 xgwu  staff    62M Nov  6 16:18 es_heap.a2s.index
-rw-r--r--@ 1 xgwu  staff    25G Nov  6 14:59 es_heap.bin
-rw-r--r--@ 1 xgwu  staff    90M Nov  6 16:21 es_heap.domIn.index
-rw-r--r--@ 1 xgwu  staff   271M Nov  6 16:21 es_heap.domOut.index
-rw-r--r--  1 xgwu  staff   144K Nov  7 18:38 es_heap.i2sv2.index
-rw-r--r--@ 1 xgwu  staff   220M Nov  6 16:18 es_heap.idx.index
-rw-r--r--@ 1 xgwu  staff   356M Nov  6 16:20 es_heap.inbound.index
-rw-r--r--@ 1 xgwu  staff   6.8M Nov  6 16:20 es_heap.index
-rw-r--r--@ 1 xgwu  staff    76M Nov  6 16:18 es_heap.o2c.index
-rw-r--r--@ 1 xgwu  staff   231M Nov  6 16:20 es_heap.o2hprof.index
-rw-r--r--@ 1 xgwu  staff   206M Nov  6 16:21 es_heap.o2ret.index
-rw-r--r--@ 1 xgwu  staff   353M Nov  6 16:20 es_heap.outbound.index
-rw-r--r--@ 1 xgwu  staff   399K Nov  6 16:16 es_heap.threads
-rw-r--r--@ 1 xgwu  staff    89K Nov  7 17:40 es_heap_Leak_Suspects.zip
-rw-r--r--@ 1 xgwu  staff    78K Nov  6 19:22 es_heap_System_Overview.zip
-rw-r--r--@ 1 xgwu  staff   205K Nov  6 19:22 es_heap_Top_Components.zip
drwxr-xr-x@ 3 xgwu  staff    96B Nov  6 16:15 workspace

将这些文件打包下载到本地机器上,用MAT GUI打开就可以分析了。

在MAT里打开dump文件的时候,可以选择打开已经生成好的报告,比如Leak suspects: 选择打开leak Suspects报告

通过Leak Suspects,一眼看到这20多GB内存主要是被一堆bulk线程实例占用了,每个实例则占用了接近1.5GB的内存。 Leak Suspects

进入"dominator_tree"面板,按照"Retained Heap"排序,可以看到多个bulk线程的内存占用都非常高。 Dominator Tree

将其中一个thread的引用链条展开,看看这些线程是如何Retain这么多内存的,特别注意红圈部分: 对象引用链

这个引用关系解读如下:

  1. 这个bulk线程的thread local map里保存了一个log4j的MultableLogEvent对象。
  2. MutablelogEvent对象引用了log4j的ParameterizedMessage对象。
  3. ParameterizedMessage引用了bulkShardRequest对象。
  4. bulkShardRequest引用了4万多个BulkitemRequest对象。

这样看下来,似乎是log4j的logevent对一个大的bulk请求对象有强引用而导致其无法被垃圾回收掉,产生内存泄漏。

联想到ES日志里,有记录一些document missing的bulk异常,猜测是否在记录这些异常的时候产生的泄漏。


问题复现

为了验证猜测,我在本地开发机上,启动了一个单结点的5.3.2测试集群,用bulk api做批量的update,并且有意为其中1个update请求设置不存在的doc_id。为了便于测试,我在ES的配置文件elasticsearch.yml里添加了配置项processors: 1。 这个配置项影响集群thread_pool的配置,bulk thread pool的大小将减少为1个,这样可以更快速和便捷的做各类验证。

启动集群,发送完bulk请求后,立即做一个dump,重复之前的分析过程,问题得到了复现。 这时候想,是否其他bulk异常也会引起同样的问题,比如写入的数据和mapping不匹配? 测试了一下,问题果然还是会产生。再用不同的bulk size进行测试,发现无法回收的这段内存大小,取决于最后一次抛过异常的bulk size大小。至此,基本可以确定内存泄漏与log4j记录异常消息的逻辑有关系。

为了搞清楚这个问题是否5.3.2独有,后续版本是否有修复,在最新的5.6.3上做了同样的测试,问题依旧,因此这应该是一个还未发现的深层Bug.


读源码查根源

大致搞清楚问题查找的方向了,但根源还未找到,也就不知道如何修复和避免,只有去扒源码了。 在TransportShardBulkAction 第209行,找到了ES日志里抛异常的代码片段。

 if (isConflictException(failure)) {
     logger.trace((Supplier<?>) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
             request.shardId(), docWriteRequest.opType().getLowercase(), request), failure);
 } else {
     logger.debug((Supplier<?>) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}",
             request.shardId(), docWriteRequest.opType().getLowercase(), request), failure);
 }

这里看到了ParameterizedMessage实例化过程中,request做为一个参数传入了。这里的request是一个BulkShardRequest对象,保存的是要写入到一个shard的一批bulk item request。 这样以来,一个批次写入的请求数量越多,这个对象retain的内存就越多。 可问题是,为什么logger.debug()调用完毕以后,这个引用不会被释放?

通过和之前MAT上的dominator tree仔细对比,可以看到ParameterizedMessage之所以无法释放,是因为被一个MutableLogEvent在引用,而这个MutableLogEvent被做为一个thread local存放起来了。 由于ES的Bulk thread pool是fix size的,也就是预先创建好,不会销毁和再创建。 那么这些MutableLogEvent对象由于是thread local的,只要线程没有销毁,就会对该线程实例一直全局存在,并且其还会一直引用最后一次处理过的ParameterizedMessage。 所以在ES记录bulk exception这种比较大的请求情况下, 整个request对象会被thread local变量一直强引用无法释放,产生大量的内存泄漏。

再继续挖一下log4j的源码,发现MutableLogEvent是在org.apache.logging.log4j.core.impl.ReusableLogEventFactory里做为thread local创建的。

public class ReusableLogEventFactory implements LogEventFactory {
    private static final ThreadNameCachingStrategy THREAD_NAME_CACHING_STRATEGY = ThreadNameCachingStrategy.create();
    private static final Clock CLOCK = ClockFactory.getClock();

    private static ThreadLocal<MutableLogEvent> mutableLogEventThreadLocal = new ThreadLocal<>();

org.apache.logging.log4j.core.config.LoggerConfig则根据一个常数ENABLE_THREADLOCALS的值来决定用哪个LogEventFactory。

        if (LOG_EVENT_FACTORY == null) {
            LOG_EVENT_FACTORY = Constants.ENABLE_THREADLOCALS
                    ? new ReusableLogEventFactory()
                    : new DefaultLogEventFactory();
        }

继续深挖,在org.apache.logging.log4j.util.Constants里看到,log4j会根据运行环境判断是否是WEB应用,如果不是,就从系统参数log4j2.enable.threadlocals读取这个常量,如果没有设置,则默认值是true

public static final boolean ENABLE_THREADLOCALS = !IS_WEB_APP && PropertiesUtil.getProperties().getBooleanProperty(
            "log4j2.enable.threadlocals", true);

由于ES不是一个web应用,导致log4j选择使用了ReusableLogEventFactory,因而使用了thread_local来创建MutableLogEvent对象,最终在ES记录bulk exception这个特殊场景下产生非常显著的内存泄漏。

再问一个问题,为何log4j要将logevent做为thread local创建? 跑到log4j的官网去扒了一下文档,在这里 Garbage-free Steady State Logging 找到了合理的解释。 原来为了减少记录日志过程中的反复创建的对象数量,减轻GC压力从而提高性能,log4j有很多地方使用了thread_local来重用变量。 但使用thread local字段装载非JDK类,可能会产生内存泄漏问题,特别是对于web应用。 因此才会在启动的时候判断运行环境,对于web应用会禁用thread local类型的变量。

ThreadLocal fields holding non-JDK classes can cause memory leaks in web applications when the application server's thread pool continues to reference these fields after the web application is undeployed. To avoid causing memory leaks, Log4j will not use these ThreadLocals when it detects that it is used in a web application (when the javax.servlet.Servlet class is in the classpath, or when system property log4j2.is.webapp is set to "true").

参考上面的文档后,也为ES找到了规避这个问题的措施: 在ES的JVM配置文件jvm.options里,添加一个log4j的系统变量-Dlog4j2.enable.threadlocals=false,禁用掉thread local即可。 经过测试,该选项可以有效避开这个内存泄漏问题。

这个问题Github上也提交了Issue,对应的链接是: Memory leak upon partial TransportShardBulkAction failure


写在最后

ES的确是非常复杂的一个系统,包含非常多的模块和第三方组件,可以支持很多想象不到的用例场景,但一些边缘场景可能会引发一些难以排查的问题。完备的监控体系和一个经验丰富的支撑团队对于提升业务开发人员使用ES开发的效率、提升业务的稳定性是非常重要的!  

【京东商城】ES高级工程师

求职招聘whh32 发表了文章 • 2 个评论 • 789 次浏览 • 2017-11-07 13:53 • 来自相关话题

工作地点:北京 薪资待遇:25k ~ 40k 工作内容: 1、开发、维护ES及相应管理后台 2、ElasticSearch集群的配置管理及优化 3、个性化功能及插件开发。 职位要求: 1、本科以上学历,4年以上工作经验。 2、精通Java,熟悉各种中间件技术及常用框架。 3、熟悉Elasticsearch,有相应开发维护经验者优先。 京东正大力推进Elasticsearch的使用场景,目前已有数千个实例,每日新增数据百T,日查询量千亿级别,技术氛围好,发展潜力大。欢迎您的加入~ 欢迎投递简历至:wanghanghang@jd.com  
工作地点:北京 薪资待遇:25k ~ 40k 工作内容: 1、开发、维护ES及相应管理后台 2、ElasticSearch集群的配置管理及优化 3、个性化功能及插件开发。 职位要求: 1、本科以上学历,4年以上工作经验。 2、精通Java,熟悉各种中间件技术及常用框架。 3、熟悉Elasticsearch,有相应开发维护经验者优先。 京东正大力推进Elasticsearch的使用场景,目前已有数千个实例,每日新增数据百T,日查询量千亿级别,技术氛围好,发展潜力大。欢迎您的加入~ 欢迎投递简历至:wanghanghang@jd.com  

ElasticSearch 集群监控

Elasticsearchzhisheng 发表了文章 • 2 个评论 • 557 次浏览 • 2017-11-07 00:41 • 来自相关话题

原文地址:http://www.54tianzhisheng.cn/2017/10/15/ElasticSearch-cluster-health-metrics/  

最近在做 ElasticSearch 的信息(集群和节点)监控,特此稍微整理下学到的东西。这篇文章主要介绍集群的监控。

要监控哪些 ElasticSearch metrics

Elasticsearch 提供了大量的 Metric,可以帮助您检测到问题的迹象,在遇到节点不可用、out-of-memory、long garbage collection times 的时候采取相应措施。但是指标太多了,有时我们并不需要这么多,这就需要我们进行筛选。

集群健康

一个 Elasticsearch 集群至少包括一个节点和一个索引。或者它 可能有一百个数据节点、三个单独的主节点,以及一小打客户端节点——这些共同操作一千个索引(以及上万个分片)。

不管集群扩展到多大规模,你都会想要一个快速获取集群状态的途径。Cluster Health API 充当的就是这个角色。你可以把它想象成是在一万英尺的高度鸟瞰集群。它可以告诉你安心吧一切都好,或者警告你集群某个地方有问题。

让我们执行一下 cluster-health API 然后看看响应体是什么样子的:

GET _cluster/health

和 Elasticsearch 里其他 API 一样,cluster-health 会返回一个 JSON 响应。这对自动化和告警系统来说,非常便于解析。响应中包含了和你集群有关的一些关键信息:

{
   "cluster_name": "elasticsearch_zach",
   "status": "green",
   "timed_out": false,
   "number_of_nodes": 1,
   "number_of_data_nodes": 1,
   "active_primary_shards": 10,
   "active_shards": 10,
   "relocating_shards": 0,
   "initializing_shards": 0,
   "unassigned_shards": 0
}

响应信息中最重要的一块就是 status 字段。状态可能是下列三个值之一 :

status 含义
green 所有的主分片和副本分片都已分配。你的集群是 100% 可用的。
yellow 所有的主分片已经分片了,但至少还有一个副本是缺失的。不会有数据丢失,所以搜索结果依然是完整的。不过,你的高可用性在某种程度上被弱化。如果 更多的 分片消失,你就会丢数据了。把 yellow 想象成一个需要及时调查的警告。
red 至少一个主分片(以及它的全部副本)都在缺失中。这意味着你在缺少数据:搜索只能返回部分数据,而分配到这个分片上的写入请求会返回一个异常。
  • number_of_nodesnumber_of_data_nodes 这个命名完全是自描述的。
  • active_primary_shards 指出你集群中的主分片数量。这是涵盖了所有索引的汇总值。
  • active_shards 是涵盖了所有索引的所有分片的汇总值,即包括副本分片。
  • relocating_shards 显示当前正在从一个节点迁往其他节点的分片的数量。通常来说应该是 0,不过在 Elasticsearch 发现集群不太均衡时,该值会上涨。比如说:添加了一个新节点,或者下线了一个节点。
  • initializing_shards 是刚刚创建的分片的个数。比如,当你刚创建第一个索引,分片都会短暂的处于 initializing 状态。这通常会是一个临时事件,分片不应该长期停留在 initializing状态。你还可能在节点刚重启的时候看到 initializing 分片:当分片从磁盘上加载后,它们会从initializing 状态开始。
  • unassigned_shards 是已经在集群状态中存在的分片,但是实际在集群里又找不着。通常未分配分片的来源是未分配的副本。比如,一个有 5 分片和 1 副本的索引,在单节点集群上,就会有 5 个未分配副本分片。如果你的集群是 red 状态,也会长期保有未分配分片(因为缺少主分片)。

集群统计

集群统计信息包含 集群的分片数,文档数,存储空间,缓存信息,内存作用率,插件内容,文件系统内容,JVM 作用状况,系统 CPU,OS 信息,段信息。

查看全部统计信息命令:

curl -XGET 'http://localhost:9200/_cluster/stats?human&pretty'

返回 JSON 结果:

{
   "timestamp": 1459427693515,
   "cluster_name": "elasticsearch",
   "status": "green",
   "indices": {
      "count": 2,
      "shards": {
         "total": 10,
         "primaries": 10,
         "replication": 0,
         "index": {
            "shards": {
               "min": 5,
               "max": 5,
               "avg": 5
            },
            "primaries": {
               "min": 5,
               "max": 5,
               "avg": 5
            },
            "replication": {
               "min": 0,
               "max": 0,
               "avg": 0
            }
         }
      },
      "docs": {
         "count": 10,
         "deleted": 0
      },
      "store": {
         "size": "16.2kb",
         "size_in_bytes": 16684,
         "throttle_time": "0s",
         "throttle_time_in_millis": 0
      },
      "fielddata": {
         "memory_size": "0b",
         "memory_size_in_bytes": 0,
         "evictions": 0
      },
      "query_cache": {
         "memory_size": "0b",
         "memory_size_in_bytes": 0,
         "total_count": 0,
         "hit_count": 0,
         "miss_count": 0,
         "cache_size": 0,
         "cache_count": 0,
         "evictions": 0
      },
      "completion": {
         "size": "0b",
         "size_in_bytes": 0
      },
      "segments": {
         "count": 4,
         "memory": "8.6kb",
         "memory_in_bytes": 8898,
         "terms_memory": "6.3kb",
         "terms_memory_in_bytes": 6522,
         "stored_fields_memory": "1.2kb",
         "stored_fields_memory_in_bytes": 1248,
         "term_vectors_memory": "0b",
         "term_vectors_memory_in_bytes": 0,
         "norms_memory": "384b",
         "norms_memory_in_bytes": 384,
         "doc_values_memory": "744b",
         "doc_values_memory_in_bytes": 744,
         "index_writer_memory": "0b",
         "index_writer_memory_in_bytes": 0,
         "version_map_memory": "0b",
         "version_map_memory_in_bytes": 0,
         "fixed_bit_set": "0b",
         "fixed_bit_set_memory_in_bytes": 0,
         "file_sizes": {}
      },
      "percolator": {
         "num_queries": 0
      }
   },
   "nodes": {
      "count": {
         "total": 1,
         "data": 1,
         "coordinating_only": 0,
         "master": 1,
         "ingest": 1
      },
      "versions": [
         "5.6.3"
      ],
      "os": {
         "available_processors": 8,
         "allocated_processors": 8,
         "names": [
            {
               "name": "Mac OS X",
               "count": 1
            }
         ],
         "mem" : {
            "total" : "16gb",
            "total_in_bytes" : 17179869184,
            "free" : "78.1mb",
            "free_in_bytes" : 81960960,
            "used" : "15.9gb",
            "used_in_bytes" : 17097908224,
            "free_percent" : 0,
            "used_percent" : 100
         }
      },
      "process": {
         "cpu": {
            "percent": 9
         },
         "open_file_descriptors": {
            "min": 268,
            "max": 268,
            "avg": 268
         }
      },
      "jvm": {
         "max_uptime": "13.7s",
         "max_uptime_in_millis": 13737,
         "versions": [
            {
               "version": "1.8.0_74",
               "vm_name": "Java HotSpot(TM) 64-Bit Server VM",
               "vm_version": "25.74-b02",
               "vm_vendor": "Oracle Corporation",
               "count": 1
            }
         ],
         "mem": {
            "heap_used": "57.5mb",
            "heap_used_in_bytes": 60312664,
            "heap_max": "989.8mb",
            "heap_max_in_bytes": 1037959168
         },
         "threads": 90
      },
      "fs": {
         "total": "200.6gb",
         "total_in_bytes": 215429193728,
         "free": "32.6gb",
         "free_in_bytes": 35064553472,
         "available": "32.4gb",
         "available_in_bytes": 34802409472
      },
      "plugins": [
        {
          "name": "analysis-icu",
          "version": "5.6.3",
          "description": "The ICU Analysis plugin integrates Lucene ICU module into elasticsearch, adding ICU relates analysis components.",
          "classname": "org.elasticsearch.plugin.analysis.icu.AnalysisICUPlugin",
          "has_native_controller": false
        },
        {
          "name": "ingest-geoip",
          "version": "5.6.3",
          "description": "Ingest processor that uses looksup geo data based on ip adresses using the Maxmind geo database",
          "classname": "org.elasticsearch.ingest.geoip.IngestGeoIpPlugin",
          "has_native_controller": false
        },
        {
          "name": "ingest-user-agent",
          "version": "5.6.3",
          "description": "Ingest processor that extracts information from a user agent",
          "classname": "org.elasticsearch.ingest.useragent.IngestUserAgentPlugin",
          "has_native_controller": false
        }
      ]
   }
}

内存使用和 GC 指标

在运行 Elasticsearch 时,内存是您要密切监控的关键资源之一。 Elasticsearch 和 Lucene 以两种方式利用节点上的所有可用 RAM:JVM heap 和文件系统缓存。 Elasticsearch 运行在Java虚拟机(JVM)中,这意味着JVM垃圾回收的持续时间和频率将成为其他重要的监控领域。

上面返回的 JSON监控的指标有我个人觉得有这些:

  • nodes.successful
  • nodes.failed
  • nodes.total
  • nodes.mem.used_percent
  • nodes.process.cpu.percent
  • nodes.jvm.mem.heap_used

可以看到 JSON 文件是很复杂的,如果从这复杂的 JSON 中获取到对应的指标(key)的值呢,这里请看文章 :JsonPath —— JSON 解析神器

最后

这里主要讲下 ES 集群的一些监控信息,有些监控指标是个人觉得需要监控的,但是具体情况还是得看需求了。下篇文章主要讲节点的监控信息。转载请注明地址:http://www.54tianzhisheng.cn/2017/10/15/ElasticSearch-cluster-health-metrics/

参考资料

1、How to monitor Elasticsearch performance

2、ElasticSearch 性能监控

3、cluster-health

4、cluster-stats

相关阅读

1、Elasticsearch 默认分词器和中分分词器之间的比较及使用方法

2、全文搜索引擎 Elasticsearch 集群搭建入门教程

ElasticSearch 单个节点监控

Elasticsearchzhisheng 发表了文章 • 1 个评论 • 375 次浏览 • 2017-11-07 00:39 • 来自相关话题

原文地址:http://www.54tianzhisheng.cn/2017/10/18/ElasticSearch-nodes-metrics/  

集群健康监控是对集群信息进行高度的概括,节点统计值 API 提供了集群中每个节点的统计值。节点统计值很多,在监控的时候仍需要我们清楚哪些指标是最值得关注的。

集群健康监控可以参考这篇文章:ElasticSearch 集群监控

节点信息   Node Info :

curl -XGET 'http://localhost:9200/_nodes'

执行上述命令可以获取所有 node 的信息

_nodes: {
  total: 2,
  successful: 2,
  failed: 0
},
cluster_name: "elasticsearch",
nodes: {
    MSQ_CZ7mTNyOSlYIfrvHag: {
    name: "node0",
    transport_address: "192.168.180.110:9300",
    host: "192.168.180.110",
    ip: "192.168.180.110",
    version: "5.5.0",
    build_hash: "260387d",
    total_indexing_buffer: 103887667,
    roles:{...},
    settings: {...},
    os: {
      refresh_interval_in_millis: 1000,
      name: "Linux",
      arch: "amd64",
      version: "3.10.0-229.el7.x86_64",
      available_processors: 4,
      allocated_processors: 4
    },
    process: {
      refresh_interval_in_millis: 1000,
      id: 3022,
      mlockall: false
    },
    jvm: {
      pid: 3022,
      version: "1.8.0_121",
      vm_name: "Java HotSpot(TM) 64-Bit Server VM",
      vm_version: "25.121-b13",
      vm_vendor: "Oracle Corporation",
      start_time_in_millis: 1507515225302,
      mem: {
      heap_init_in_bytes: 1073741824,
      heap_max_in_bytes: 1038876672,
      non_heap_init_in_bytes: 2555904,
      non_heap_max_in_bytes: 0,
      direct_max_in_bytes: 1038876672
      },
      gc_collectors: [],
      memory_pools: [],
      using_compressed_ordinary_object_pointers: "true",
      input_arguments:{}
    }
    thread_pool:{
      force_merge: {},
      fetch_shard_started: {},
      listener: {},
      index: {},
      refresh: {},
      generic: {},
      warmer: {},
      search: {},
      flush: {},
      fetch_shard_store: {},
      management: {},
      get: {},
      bulk: {},
      snapshot: {}
    }
    transport: {...},
    http: {...},
    plugins: [],
    modules: [],
    ingest: {...}
 }

上面是我已经简写了很多数据之后的返回值,但是指标还是很多,有些是一些常规的指标,对于监控来说,没必要拿取。从上面我们可以主要关注以下这些指标:

os, process, jvm, thread_pool, transport, http, ingest and indices

节点统计     nodes-statistics

节点统计值 API 可通过如下命令获取:

GET /_nodes/stats

得到:

_nodes: {
  total: 2,
  successful: 2,
  failed: 0
},
cluster_name: "elasticsearch",
nodes: {
  MSQ_CZ7mTNyOSlYI0yvHag: {
    timestamp: 1508312932354,
    name: "node0",
    transport_address: "192.168.180.110:9300",
    host: "192.168.180.110",
    ip: "192.168.180.110:9300",
    roles: [],
    indices: {
      docs: {
           count: 6163666,
           deleted: 0
        },
      store: {
           size_in_bytes: 2301398179,
           throttle_time_in_millis: 122850
        },
      indexing: {},
      get: {},
      search: {},
      merges: {},
      refresh: {},
      flush: {},
      warmer: {},
      query_cache: {},
      fielddata: {},
      completion: {},
      segments: {},
      translog: {},
      request_cache: {},
      recovery: {}
  },
  os: {
    timestamp: 1508312932369,
    cpu: {
      percent: 0,
      load_average: {
        1m: 0.09,
        5m: 0.12,
        15m: 0.08
      }
    },
    mem: {
      total_in_bytes: 8358301696,
      free_in_bytes: 1381613568,
      used_in_bytes: 6976688128,
      free_percent: 17,
      used_percent: 83
    },
    swap: {
      total_in_bytes: 8455712768,
      free_in_bytes: 8455299072,
      used_in_bytes: 413696
    },
    cgroup: {
      cpuacct: {},
      cpu: {
        control_group: "/user.slice",
        cfs_period_micros: 100000,
        cfs_quota_micros: -1,
        stat: {}
      }
  }
},
process: {
  timestamp: 1508312932369,
  open_file_descriptors: 228,
  max_file_descriptors: 65536,
  cpu: {
    percent: 0,
    total_in_millis: 2495040
  },
  mem: {
    total_virtual_in_bytes: 5002465280
  }
},
jvm: {
  timestamp: 1508312932369,
  uptime_in_millis: 797735804,
  mem: {
    heap_used_in_bytes: 318233768,
    heap_used_percent: 30,
    heap_committed_in_bytes: 1038876672,
    heap_max_in_bytes: 1038876672,
    non_heap_used_in_bytes: 102379784,
    non_heap_committed_in_bytes: 108773376,
  pools: {
    young: {
      used_in_bytes: 62375176,
      max_in_bytes: 279183360,
      peak_used_in_bytes: 279183360,
      peak_max_in_bytes: 279183360
    },
    survivor: {
      used_in_bytes: 175384,
      max_in_bytes: 34865152,
      peak_used_in_bytes: 34865152,
      peak_max_in_bytes: 34865152
    },
    old: {
      used_in_bytes: 255683208,
      max_in_bytes: 724828160,
      peak_used_in_bytes: 255683208,
      peak_max_in_bytes: 724828160
    }
  }
  },
  threads: {},
  gc: {},
  buffer_pools: {},
  classes: {}
},
  thread_pool: {
    bulk: {},
    fetch_shard_started: {},
    fetch_shard_store: {},
    flush: {},
    force_merge: {},
    generic: {},
    get: {},
    index: {
       threads: 1,
       queue: 0,
       active: 0,
       rejected: 0,
       largest: 1,
       completed: 1
    }
    listener: {},
    management: {},
    refresh: {},
    search: {},
    snapshot: {},
    warmer: {}
  },
  fs: {},
  transport: {
    server_open: 13,
    rx_count: 11696,
    rx_size_in_bytes: 1525774,
    tx_count: 10282,
    tx_size_in_bytes: 1440101928
  },
  http: {
    current_open: 4,
    total_opened: 23
  },
  breakers: {},
  script: {},
  discovery: {},
  ingest: {}
}

节点名是一个 UUID,上面列举了很多指标,下面讲解下:

索引部分 indices

这部分列出了这个节点上所有索引的聚合过的统计值 :

  • docs 展示节点内存有多少文档,包括还没有从段里清除的已删除文档数量。

  • store 部分显示节点耗用了多少物理存储。这个指标包括主分片和副本分片在内。如果限流时间很大,那可能表明你的磁盘限流设置得过低。

  • indexing 显示已经索引了多少文档。这个值是一个累加计数器。在文档被删除的时候,数值不会下降。还要注意的是,在发生内部 索引操作的时候,这个值也会增加,比如说文档更新。

  还列出了索引操作耗费的时间,正在索引的文档数量,以及删除操作的类似统计值。

  • get 显示通过 ID 获取文档的接口相关的统计值。包括对单个文档的 GETHEAD 请求。

  • search 描述在活跃中的搜索( open_contexts )数量、查询的总数量、以及自节点启动以来在查询上消耗的总时间。用 query_time_in_millis / query_total 计算的比值,可以用来粗略的评价你的查询有多高效。比值越大,每个查询花费的时间越多,你应该要考虑调优了。

  fetch 统计值展示了查询处理的后一半流程(query-then-fetch 里的 fetch )。如果 fetch 耗时比 query 还多,说明磁盘较慢,或者获取了太多文档,或者可能搜索请求设置了太大的分页(比如, size: 10000 )。

  • merges 包括了 Lucene 段合并相关的信息。它会告诉你目前在运行几个合并,合并涉及的文档数量,正在合并的段的总大小,以及在合并操作上消耗的总时间。

  • filter_cache 展示了已缓存的过滤器位集合所用的内存数量,以及过滤器被驱逐出内存的次数。过多的驱逐数 可能 说明你需要加大过滤器缓存的大小,或者你的过滤器不太适合缓存(比如它们因为高基数而在大量产生,就像是缓存一个 now 时间表达式)。

  不过,驱逐数是一个很难评定的指标。过滤器是在每个段的基础上缓存的,而从一个小的段里驱逐过滤器,代价比从一个大的段里要廉价的多。有可能你有很大的驱逐数,但是它们都发生在小段上,也就意味着这些对查询性能只有很小的影响。

  把驱逐数指标作为一个粗略的参考。如果你看到数字很大,检查一下你的过滤器,确保他们都是正常缓存的。不断驱逐着的过滤器,哪怕都发生在很小的段上,效果也比正确缓存住了的过滤器差很多。

  • field_data 显示 fielddata 使用的内存, 用以聚合、排序等等。这里也有一个驱逐计数。和 filter_cache 不同的是,这里的驱逐计数是很有用的:这个数应该或者至少是接近于 0。因为 fielddata 不是缓存,任何驱逐都消耗巨大,应该避免掉。如果你在这里看到驱逐数,你需要重新评估你的内存情况,fielddata 限制,请求语句,或者这三者。

  • segments 会展示这个节点目前正在服务中的 Lucene 段的数量。 这是一个重要的数字。大多数索引会有大概 50–150 个段,哪怕它们存有 TB 级别的数十亿条文档。段数量过大表明合并出现了问题(比如,合并速度跟不上段的创建)。注意这个统计值是节点上所有索引的汇聚总数。记住这点。

  memory 统计值展示了 Lucene 段自己用掉的内存大小。 这里包括底层数据结构,比如倒排表,字典,和布隆过滤器等。太大的段数量会增加这些数据结构带来的开销,这个内存使用量就是一个方便用来衡量开销的度量值。

操作系统和进程部分

OSProcess 部分基本是自描述的,不会在细节中展开讲解。它们列出来基础的资源统计值,比如 CPU 和负载。OS 部分描述了整个操作系统,而 Process 部分只显示 Elasticsearch 的 JVM 进程使用的资源情况。

这些都是非常有用的指标,不过通常在你的监控技术栈里已经都测量好了。统计值包括下面这些:

  • CPU
  • 负载
  • 内存使用率 (mem.used_percent)
  • Swap 使用率
  • 打开的文件描述符 (open_file_descriptors)

JVM 部分

jvm 部分包括了运行 Elasticsearch 的 JVM 进程一些很关键的信息。 最重要的,它包括了垃圾回收的细节,这对你的 Elasticsearch 集群的稳定性有着重大影响。

jvm: {
  timestamp: 1508312932369,
  uptime_in_millis: 797735804,
  mem: {
    heap_used_in_bytes: 318233768,
    heap_used_percent: 30,
    heap_committed_in_bytes: 1038876672,
    heap_max_in_bytes: 1038876672,
    non_heap_used_in_bytes: 102379784,
    non_heap_committed_in_bytes: 108773376,
  }
}

jvm 部分首先列出一些和 heap 内存使用有关的常见统计值。你可以看到有多少 heap 被使用了,多少被指派了(当前被分配给进程的),以及 heap 被允许分配的最大值。理想情况下,heap_committed_in_bytes 应该等于 heap_max_in_bytes 。如果指派的大小更小,JVM 最终会被迫调整 heap 大小——这是一个非常昂贵的操作。如果你的数字不相等,阅读 堆内存:大小和交换 学习如何正确的配置它。

heap_used_percent 指标是值得关注的一个数字。Elasticsearch 被配置为当 heap 达到 75% 的时候开始 GC。如果你的节点一直 >= 75%,你的节点正处于 内存压力 状态。这是个危险信号,不远的未来可能就有慢 GC 要出现了。

如果 heap 使用率一直 >=85%,你就麻烦了。Heap 在 90–95% 之间,则面临可怕的性能风险,此时最好的情况是长达 10–30s 的 GC,最差的情况就是内存溢出(OOM)异常。

线程池部分

Elasticsearch 在内部维护了线程池。 这些线程池相互协作完成任务,有必要的话相互间还会传递任务。通常来说,你不需要配置或者调优线程池,不过查看它们的统计值有时候还是有用的,可以洞察你的集群表现如何。

每个线程池会列出已配置的线程数量( threads ),当前在处理任务的线程数量( active ),以及在队列中等待处理的任务单元数量( queue )。

如果队列中任务单元数达到了极限,新的任务单元会开始被拒绝,你会在 rejected 统计值上看到它反映出来。这通常是你的集群在某些资源上碰到瓶颈的信号。因为队列满意味着你的节点或集群在用最高速度运行,但依然跟不上工作的蜂拥而入。

这里的一系列的线程池,大多数你可以忽略,但是有一小部分还是值得关注的:

  • indexing    普通的索引请求的线程池
  • bulk    批量请求,和单条的索引请求不同的线程池
  • get     Get-by-ID 操作
  • search    所有的搜索和查询请求
  • merging   专用于管理 Lucene 合并的线程池

网络部分

  • transport 显示和 传输地址 相关的一些基础统计值。包括节点间的通信(通常是 9300 端口)以及任意传输客户端或者节点客户端的连接。如果看到这里有很多连接数不要担心;Elasticsearch 在节点之间维护了大量的连接。
  • http 显示 HTTP 端口(通常是 9200)的统计值。如果你看到 total_opened 数很大而且还在一直上涨,这是一个明确信号,说明你的 HTTP 客户端里有没启用 keep-alive 长连接的。持续的 keep-alive 长连接对性能很重要,因为连接、断开套接字是很昂贵的(而且浪费文件描述符)。请确认你的客户端都配置正确。

参考资料

1、nodes-info

2、nodes-stats

3、ES监控指标

最后:

转载请注明地址:http://www.54tianzhisheng.cn/2017/10/18/ElasticSearch-nodes-metrics/

[分享]Kibana,Elasticsearch 指令速查

资料分享medcl 发表了文章 • 3 个评论 • 755 次浏览 • 2017-10-31 14:10 • 来自相关话题

Elasticsearch_CheatSheet.jpg
Kibana_CheatSheet.jpg
 
Elasticsearch_CheatSheet.jpg
Kibana_CheatSheet.jpg