How can probabilities be incorporated into search results?
Elasticsearch • medcl replied • 3 followers • 2 replies • 1513 views • 2018-12-11 13:03
A question about grok-parsing logs in Logstash and then the output stage
Logstash • rochy replied • 2 followers • 1 reply • 3863 views • 2018-12-11 11:16
Day 11 - Parent-Child Relationships in Practice, Part 1 - Maintaining and Searching Parent-Child Data in Elasticsearch 5.x
Elasticsearch • yinbp published an article • 0 comments • 4632 views • 2018-12-11 10:00
This series consists of two parts:
- Part 1: maintaining and searching parent-child relationships in Elasticsearch 5.x
- Part 2: maintaining and searching parent-child relationships in Elasticsearch 6.x
This article is the first part and covers:
- Designing a parent-child mapping in Elasticsearch 5.x
- Maintaining parent-child data in Elasticsearch 5.x
- Basic usage of the has_child and has_parent queries in Elasticsearch 5.x
- Returning parent and child data together in a single search in Elasticsearch 5.x
Case description
We use health-examination data to demonstrate the features covered in this article. The data consists of customer basic information (basic), medical records (medical), examination records (exam), and diagnosis records (diagnosis); basic is the parent of the other three types.
We use bboss-elastic, an Elasticsearch Java client, to implement everything in this article.
1. Preparation
Import and configure the bboss client as described in the document "Introduction to a high-performance Elasticsearch ORM development library".
2. Defining the mapping - parent-child mapping design in Elasticsearch 5.x
In Elasticsearch 5.x a single index supports multiple mapping types; a parent-child relationship is declared by naming the parent mapping type in the child type's _parent setting, for example:
Parent type:
"basic": {
....
}
Child type:
"medical": {
"_parent": { "type": "basic" },
.................
}
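In 5.x the parent id also acts as the routing value: every child document must be indexed with a parent parameter so that it lands on the same shard as its parent basic document. As a minimal illustration independent of bboss, the equivalent raw REST call could look roughly like the sketch below (it uses the official low-level Java REST client; the host, document id and document body are made up for this example):
```
import org.apache.http.HttpHost;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

import java.util.Collections;

public class ParentChildIndexSketch {
    public static void main(String[] args) throws Exception {
        // Low-level REST client pointed at a local 5.x node (adjust host/port as needed).
        RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build();

        // Index one child document of type "medical"; the "parent" URL parameter records the
        // parent id and routes the child onto the same shard as its parent "basic" document.
        String medicalDoc = "{\"party_id\":\"1\",\"hos_name_yb\":\"内蒙古医院\"}";
        Response response = client.performRequest(
                "PUT",
                "/client_info/medical/m1",
                Collections.singletonMap("parent", "1"),
                new NStringEntity(medicalDoc, ContentType.APPLICATION_JSON));
        System.out.println(response.getStatusLine());
        client.close();
    }
}
```
bboss hides this detail behind the @ESParentId annotation introduced later in this article.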
Create a DSL configuration file, esmapper/Client_Info.xml, and define the complete mapping in it as createClientIndice:
<properties>
<!--
Create the client information index client_info
-->
<property name="createClientIndice">
<![CDATA[{
"settings": {
"number_of_shards": 6,
"index.refresh_interval": "5s"
},
"mappings": {
"basic": { ##基本信息
"properties": {
"party_id": {
"type": "keyword"
},
"sex": {
"type": "keyword"
},
"mari_sts": {
"type": "keyword"
},
"ethnic": {
"type": "text"
},
"prof": {
"type": "text"
},
"province": {
"type": "text"
},
"city": {
"type": "text"
},
"client_type": {
"type": "keyword"
},
"client_name": {
"type": "text"
},
"age": {
"type": "integer"
},
"id_type": {
"type": "keyword"
},
"idno": {
"type": "keyword"
},
"education": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"birth_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"diagnosis": { ##结果分析
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"provider": {
"type": "text"
},
"subject": {
"type": "text"
},
"diagnosis_type": {
"type": "text"
},
"icd10_code": {
"type": "text",
"type": "keyword"
},
"sd_disease_name": {
"type": "text",
"type": "keyword"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"medical": { ##医疗情况
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"hos_name_yb": {
"type": "text"
},
"eivisions_name": {
"type": "text"
},
"medical_type": {
"type": "text"
},
"medical_common_name": {
"type": "text"
},
"medical_sale_name": {
"type": "text"
},
"medical_code": {
"type": "text"
},
"specification": {
"type": "text"
},
"usage_num": {
"type": "text"
},
"unit": {
"type": "text"
},
"usage_times": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
},
"exam": { ##检查结果
"_parent": {
"type": "basic"
},
"properties": {
"party_id": {
"type": "keyword"
},
"hospital": {
"type": "text"
},
"dept": {
"type": "text"
},
"is_ok": {
"type": "text"
},
"exam_result": {
"type": "text"
},
"fld1": {
"type": "text"
},
"fld2": {
"type": "text"
},
"fld3": {
"type": "text"
},
"fld4": {
"type": "text"
},
"fld5": {
"type": "text"
},
"fld901": {
"type": "text"
},
"fld6": {
"type": "text"
},
"fld902": {
"type": "text"
},
"fld14": {
"type": "text"
},
"fld20": {
"type": "text"
},
"fld21": {
"type": "text"
},
"fld23": {
"type": "text"
},
"fld24": {
"type": "text"
},
"fld65": {
"type": "text"
},
"fld66": {
"type": "text"
},
"fld67": {
"type": "text"
},
"fld68": {
"type": "text"
},
"created_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"last_modified_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
},
"etl_date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd'T'HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||epoch_millis"
}
}
}
}
}]]>
</property>
</properties>
This mapping defines four types: basic, exam, medical, and diagnosis, with basic being the parent type of the other three.
Create the index named client_info with the bboss client:
public void createClientIndice(){
//create a client instance and load the DSL configuration file defined above
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
try {
//returns true if the index client_info exists, false otherwise
boolean exist = clientUtil.existIndice("client_info");
//if the index client_info already exists, drop it first
if(exist) {//drop the existing index client_info
clientUtil.dropIndice("client_info");
}
} catch (ElasticSearchException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//create the index and mapping for client_info
clientUtil.createIndiceMapping("client_info","createClientIndice");
String client_info = clientUtil.getIndice("client_info");//fetch the structure of the newly created index client_info
System.out.println("after createClientIndice clientUtil.getIndice(\"client_info\") response:"+client_info);
}
3. Maintaining parent-child data in Elasticsearch 5.x
- Define the entity classes
First define four classes, one for each mapping type; for brevity only the key fields are shown:
- Basic
- Medical
- Exam
- Diagnosis
The @ESId annotation marks the field used as the _id of the basic document:
public class Basic extends ESBaseData {
/**
* document _id
*/
@ESId
private String party_id;
private String sex; // gender
......
}
The @ESParentId annotation marks the _id of the parent basic document that a Medical document belongs to; the Medical document's own _id is generated automatically by Elasticsearch:
public class Medical extends ESBaseData {
@ESParentId
private String party_id; //parent id
private String hos_name_yb; //hospital visited
...
}
The @ESParentId annotation marks the _id of the parent basic document that an Exam document belongs to; the Exam document's own _id is generated automatically by Elasticsearch:
public class Exam extends ESBaseData {
@ESParentId
private String party_id; //parent id
private String hospital; // hospital visited
....
}
The @ESParentId annotation marks the _id of the parent basic document that a Diagnosis document belongs to; the Diagnosis document's own _id is generated automatically by Elasticsearch:
public class Diagnosis extends ESBaseData {
@ESParentId
private String party_id; //parent id
private String provider; //diagnosing hospital
private String subject; //department
......
}
- Maintaining test data through the API
With the classes defined, use the bboss client to write the data into the client_info index created earlier.
/**
* Load the examination and medical data
*/
public void importClientInfoDataFromBeans() {
ClientInterface clientUtil = ElasticSearchHelper.getRestClientUtil();
//import basic information with an immediate refresh; refresh is only for testing, do not use it in production
List<Basic> basics = buildBasics();
clientUtil.addDocuments("client_info","basic",basics,"refresh");
//import medical records; again, refresh is only for testing
List<Medical> medicals = buildMedicals();
clientUtil.addDocuments("client_info","medical",medicals,"refresh");
//import examination results; refresh is only for testing
List<Exam> exams = buildExams();
clientUtil.addDocuments("client_info","exam",exams,"refresh");
//import diagnosis records; refresh is only for testing
List<Diagnosis> diagnosiss = buildDiagnosiss();
clientUtil.addDocuments("client_info","diagnosis",diagnosiss,"refresh");
}
//build the list of basic information
private List<Basic> buildBasics() {
List<Basic> basics = new ArrayList<Basic>();
Basic basic = new Basic();
basic.setParty_id("1");
basic.setAge(60);
basics.add(basic);
//add more records as needed
return basics;
}
//build the list of medical records
private List<Medical> buildMedicals() {
List<Medical> medicals = new ArrayList<Medical>();
Medical medical = new Medical();
medical.setParty_id("1");//set the parent document id (the _id of the basic document)
medical.setCreated_date(new Date());
medicals.add(medical);
//add more records as needed
return medicals;
}
//build the list of examination results
private List<Exam> buildExams() {
List<Exam> exams = new ArrayList<Exam>();
Exam exam = new Exam();
exam.setParty_id("1");//set the parent document id (the _id of the basic document)
exams.add(exam);
//add more records as needed
return exams;
}
//build the list of diagnosis records
private List<Diagnosis> buildDiagnosiss() {
List<Diagnosis> diagnosiss = new ArrayList<Diagnosis>();
Diagnosis diagnosis = new Diagnosis();
diagnosis.setParty_id("1");//set the parent document id (the _id of the basic document)
diagnosiss.add(diagnosis);
//add more records as needed
return diagnosiss;
}
- Bulk-importing test data from JSON payloads
Besides loading data with addDocuments, you can bulk-import data from JSON payloads.
Add the following to the configuration file esmapper/Client_Info.xml:
<!--
Import basic information:
-->
<property name="bulkImportBasicData" trim="false">
<![CDATA[
{ "index": { "_id": "1" }}
{ "party_id":"1", "sex":"男", "mari_sts":"不详", "ethnic":"蒙古族", "prof":"放牧","birth_date":"1966-2-14 00:00:00", "province":"内蒙古", "city":"赤峰市","client_type":"1", "client_name":"安", "age":52,"id_type":"1", "idno":"1", "education":"初中","created_date":"2013-04-24 00:00:00","last_modified_date":"2013-04-24 00:00:00", "etl_date":"2013-04-24 00:00:00"}
{ "index": { "_id": "2" }}
{ "party_id":"2", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"公务员","birth_date":"1986-07-06 00:00:00", "province":"广东", "city":"深圳","client_type":"1", "client_name":"彭", "age":32,"id_type":"1", "idno":"2", "education":"本科", "created_date":"2013-05-09 15:49:47","last_modified_date":"2013-05-09 15:49:47", "etl_date":"2013-05-09 15:49:47"}
{ "index": { "_id": "3" }}
{ "party_id":"3", "sex":"男", "mari_sts":"未婚", "ethnic":"汉族", "prof":"无业","birth_date":"2000-08-15 00:00:00", "province":"广东", "city":"佛山","client_type":"1", "client_name":"浩", "age":18,"id_type":"1", "idno":"3", "education":"高中", "created_date":"2014-09-01 09:49:27","last_modified_date":"2014-09-01 09:49:27", "etl_date":"2014-09-01 09:49:27" }
{ "index": { "_id": "4" }}
{ "party_id":"4", "sex":"女", "mari_sts":"未婚", "ethnic":"满族", "prof":"工人","birth_date":"1996-03-14 00:00:00", "province":"江苏", "city":"扬州","client_type":"1", "client_name":"慧", "age":22,"id_type":"1", "idno":"4", "education":"高中", "created_date":"2014-09-16 09:30:37","last_modified_date":"2014-09-16 09:30:37", "etl_date":"2014-09-16 09:30:37" }
{ "index": { "_id": "5" }}
{ "party_id":"5", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"教师","birth_date":"1983-08-14 00:00:00", "province":"宁夏", "city":"灵武","client_type":"1", "client_name":"英", "age":35,"id_type":"1", "idno":"5", "education":"本科", "created_date":"2015-09-16 09:30:37","last_modified_date":"2015-09-16 09:30:37", "etl_date":"2015-09-16 09:30:37" }
{ "index": { "_id": "6" }}
{ "party_id":"6", "sex":"女", "mari_sts":"已婚", "ethnic":"汉族", "prof":"工人","birth_date":"1959-07-04 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"岭", "age":59,"id_type":"1", "idno":"6", "education":"小学", "created_date":"2015-09-01 09:49:27","last_modified_date":"2015-09-01 09:49:27", "etl_date":"2015-09-01 09:49:27" }
{ "index": { "_id": "7" }}
{ "party_id":"7", "sex":"女", "mari_sts":"未婚", "ethnic":"汉族", "prof":"学生","birth_date":"1999-02-18 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"欣", "age":19,"id_type":"1", "idno":"7", "education":"高中", "created_date":"2016-12-01 09:49:27","last_modified_date":"2016-12-01 09:49:27", "etl_date":"2016-12-01 09:49:27" }
{ "index": { "_id": "8" }}
{ "party_id":"8", "sex":"女", "mari_sts":"未婚", "ethnic":"汉族", "prof":"学生","birth_date":"2007-11-18 00:00:00", "province":"山东", "city":"青岛","client_type":"1", "client_name":"梅", "age":10,"id_type":"1", "idno":"8", "education":"小学", "created_date":"2016-11-21 09:49:27","last_modified_date":"2016-11-21 09:49:27", "etl_date":"2016-11-21 09:49:27" }
{ "index": { "_id": "9" }}
{ "party_id":"9", "sex":"男", "mari_sts":"不详", "ethnic":"回族", "prof":"个体户","birth_date":"1978-03-29 00:00:00", "province":"北京", "city":"北京","client_type":"1", "client_name":"磊", "age":40,"id_type":"1", "idno":"9", "education":"高中", "created_date":"2017-09-01 09:49:27","last_modified_date":"2017-09-01 09:49:27", "etl_date":"2017-09-01 09:49:27" }
{ "index": { "_id": "10" }}
{ "party_id":"10", "sex":"男", "mari_sts":"已婚", "ethnic":"汉族", "prof":"农民","birth_date":"1970-11-14 00:00:00", "province":"浙江", "city":"台州","client_type":"1", "client_name":"强", "age":47,"id_type":"1", "idno":"10", "education":"初中", "created_date":"2018-09-01 09:49:27","last_modified_date":"2018-09-01 09:49:27", "etl_date":"2018-09-01 09:49:27" }
]]>
</property>
<!--
Import diagnosis records
-->
<property name="bulkImportDiagnosisData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"J31.0", "sd_disease_name":"鼻炎","created_date":"2013-07-23 20:56:44", "last_modified_date":"2013-07-23 20:56:44", "etl_date":"2013-07-23 20:56:44" }
{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"M47.8", "sd_disease_name":"颈椎病","created_date":"2013-09-23 20:56:44", "last_modified_date":"2013-09-23 20:56:44", "etl_date":"2013-09-23 20:56:44" }
{ "index": { "parent": "1" }}
{ "party_id":"1", "provider":"内蒙古医院", "subject":"","diagnosis_type":"","icd10_code":"E78.1", "sd_disease_name":"甘油三脂增高","created_date":"2018-09-20 09:27:44", "last_modified_date":"2018-09-20 09:27:44", "etl_date":"2018-09-20 09:27:44" }
{ "index": { "parent": "4" }}
{ "party_id":"4", "provider":"江苏医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2011-05-19 15:52:55", "last_modified_date":"2011-05-19 15:52:55", "etl_date":"2011-05-19 15:52:55" }
{ "index": { "parent": "6" }}
{ "party_id":"6", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"H44", "sd_disease_name":"眼疾","created_date":"2016-04-08 10:42:18", "last_modified_date":"2016-04-08 10:42:18", "etl_date":"2016-04-08 10:42:18" }
{ "index": { "parent": "6" }}
{ "party_id":"6", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"M47.8", "sd_disease_name":"颈椎病","created_date":"2016-04-08 10:42:18", "last_modified_date":"2016-04-08 10:42:18", "etl_date":"2016-04-08 10:42:18" }
{ "index": { "parent": "7" }}
{ "party_id":"7", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2017-04-08 10:42:18", "last_modified_date":"2017-04-08 10:42:18", "etl_date":"2017-04-08 10:42:18" }
{ "index": { "parent": "8" }}
{ "party_id":"8", "provider":"山东医院", "subject":"","diagnosis_type":"","icd10_code":"J00", "sd_disease_name":"感冒","created_date":"2018-04-08 10:42:18", "last_modified_date":"2018-04-08 10:42:18", "etl_date":"2018-04-08 10:42:18" }
{ "index": { "parent": "9" }}
{ "party_id":"9", "provider":"朝阳医院", "subject":"","diagnosis_type":"","icd10_code":"A03.901", "sd_disease_name":"急性细菌性痢疾","created_date":"2015-06-08 10:42:18", "last_modified_date":"2015-06-08 10:42:18", "etl_date":"2015-06-08 10:42:18" }
]]>
</property>
<!--
Import medical records
-->
<property name="bulkImportMedicalData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"氟化钠", "medical_sale_name":"", "medical_code":"A01AA01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"四环素", "medical_sale_name":"", "medical_code":"A01AB13", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2016-05-31 00:00:00", "last_modified_date":"2016-05-31 00:00:00", "etl_date":"2016-05-31 00:00:00" }
{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"", "medical_sale_name":"盐酸多西环素胶丸", "medical_code":"A01AB22", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2016-03-18 00:00:00", "last_modified_date":"2016-03-18 00:00:00", "etl_date":"2016-03-18 00:00:00" }
{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"盐酸多西环素分散片", "medical_sale_name":"", "medical_code":"A01AB22", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2013-07-23 20:56:44", "last_modified_date":"2013-07-23 20:56:44", "etl_date":"2013-07-23 20:56:44" }
{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"地塞米松", "medical_sale_name":"", "medical_code":"A01AC02", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2013-09-23 20:56:44", "last_modified_date":"2013-09-23 20:56:44", "etl_date":"2013-09-23 20:56:44" }
{ "index": { "parent": "1" }}
{ "party_id":"1", "hos_name_yb":"内蒙古医院", "eivisions_name":"", "medical_type":"","medical_common_name":"肾上腺素", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-09-20 09:27:44", "last_modified_date":"2018-09-20 09:27:44", "etl_date":"2018-09-20 09:27:44" }
{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"地塞米松", "medical_sale_name":"", "medical_code":"A01AC02", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2011-05-19 15:52:55", "last_modified_date":"2011-05-19 15:52:55", "etl_date":"2011-05-19 15:52:55" }
{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"四环素", "medical_sale_name":"", "medical_code":"A01AB13", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-04-08 10:42:18", "last_modified_date":"2018-04-08 10:42:18", "etl_date":"2018-04-08 10:42:18" }
{ "index": { "parent": "4" }}
{ "party_id":"4", "hos_name_yb":"江苏医院", "eivisions_name":"", "medical_type":"","medical_common_name":"诺氟沙星胶囊", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2015-06-08 10:42:18", "last_modified_date":"2015-06-08 10:42:18", "etl_date":"2015-06-08 10:42:18" }
{ "index": { "parent": "6" }}
{ "party_id":"6", "hos_name_yb":"山东医院", "eivisions_name":"", "medical_type":"","medical_common_name":"盐酸异丙肾上腺素片", "medical_sale_name":"", "medical_code":"A01AD01", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-01-23 20:56:44", "last_modified_date":"2014-01-23 20:56:44", "etl_date":"2014-01-23 20:56:44" }
{ "index": { "parent": "6" }}
{ "party_id":"6", "hos_name_yb":"山东医院", "eivisions_name":"", "medical_type":"","medical_common_name":"甲硝唑栓", "medical_sale_name":"", "medical_code":"A01AB17", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2018-06-08 10:42:18", "last_modified_date":"2018-06-08 10:42:18", "etl_date":"2018-06-08 10:42:18" }
{ "index": { "parent": "9" }}
{ "party_id":"9", "hos_name_yb":"朝阳医院", "eivisions_name":"", "medical_type":"","medical_common_name":"复方克霉唑乳膏", "medical_sale_name":"", "medical_code":"A01AB18", "specification":"","usage_num":"", "unit":"", "usage_times":"","created_date":"2014-01-23 20:56:44", "last_modified_date":"2014-01-23 20:56:44", "etl_date":"2014-01-23 20:56:44"}
]]>
</property>
<!--
Import examination records
-->
<property name="bulkImportExamData" trim="false">
<![CDATA[
{ "index": { "parent": "1" }}
{ "party_id":"1", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"高血压","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "2" }}
{ "party_id":"2", "hospital":"", "dept":"", "is_ok":"Y", "exam_result":"轻度脂肪肝","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "3" }}
{ "party_id":"3", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"急性细菌性痢疾","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "4" }}
{ "party_id":"4", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "5" }}
{ "party_id":"5", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "6" }}
{ "party_id":"6", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"感冒","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "7" }}
{ "party_id":"7", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "8" }}
{ "party_id":"1", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "9" }}
{ "party_id":"9", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
{ "index": { "parent": "10" }}
{ "party_id":"10", "hospital":"", "dept":"", "is_ok":"N", "exam_result":"颈椎病","fld1":"158", "fld2":"63", "fld3":"94", "fld4":"85", "fld5":"131", "fld901":"89", "fld6":"4.9","fld902":"4.8","fld14":"78", "fld21":"78", "fld23":"", "fld24":"5.5", "fld65":"5.5", "fld66":"1.025","fld67":"", "fld68":"","created_date":"2014-03-18 00:00:00", "last_modified_date":"2014-03-18 00:00:00", "etl_date":"2014-03-18 00:00:00" }
]]>
</property>
Import the data defined above through bboss's generic API:
/**
* Import the health examination and medical data from the DSL JSON defined in the configuration file
*/
public void importClientInfoFromJsonData(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
clientUtil.executeHttp("client_info/basic/_bulk?refresh","bulkImportBasicData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/diagnosis/_bulk?refresh","bulkImportDiagnosisData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/medical/_bulk?refresh","bulkImportMedicalData",ClientUtil.HTTP_POST);
clientUtil.executeHttp("client_info/exam/_bulk?refresh","bulkImportExamData",ClientUtil.HTTP_POST);
//count the imported documents
long basiccount = clientUtil.countAll("client_info/basic");
System.out.println(basiccount);
long medicalcount = clientUtil.countAll("client_info/medical");
System.out.println(medicalcount);
long examcount = clientUtil.countAll("client_info/exam");
System.out.println(examcount);
long diagnosiscount = clientUtil.countAll("client_info/diagnosis");
System.out.println(diagnosiscount);
}
4. Parent-child queries - basic usage of has_child and has_parent in Elasticsearch 5.x
- Query children by parent: find a customer's examination results by customer name
Add the DSL statement queryExamSearchByClientName to esmapper/Client_Info.xml:
<!--Query a customer's examination reports by customer name-->
<property name="queryExamSearchByClientName">
<![CDATA[
{
    ## return at most #[size] records
    "size": #[size],
    "query": {
        "has_parent": {
            "type": "basic",
            "query": {
                "match": {
                    "client_name": #[clientName] ## customer name, injected through the clientName variable
                }
            }
        }
    }
}
]]>
</property>
Execute the query: bboss's searchList method returns the matching examination reports together with the total hit count; with size set to 1000, at most 1000 records come back.
/**
* Query a customer's examination reports by customer name
*/
public void queryExamSearchByClientName(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("clientName","张三");
params.put("size",1000);
ESDatas<Exam> exams = clientUtil.searchList("client_info/exam/_search","queryExamSearchByClientName",params,Exam.class);
List<Exam> examList = exams.getDatas();//获取符合条件的体检数据
long totalSize = exams.getTotalSize();//符合条件的总记录数据
}
- Query parents by child: find customer basic information by medical code
Add the query DSL queryClientInfoByMedicalName to esmapper/Client_Info.xml:
<!--Find customer basic information by medical code-->
<property name="queryClientInfoByMedicalName">
<![CDATA[
{
    ## return at most #[size] records
    "size": #[size],
    "query": {
        "has_child": {
            "type": "medical",
            "score_mode": "max",
            "query": {
                "match": {
                    "medical_code": #[medicalCode] ## medical code, injected through the medicalCode variable
                }
            }
        }
    }
}
]]>
</property>
Execute the query: bboss's searchList method returns the matching customer basic information together with the total hit count.
/**
* Find customer basic information by medical code
*/
public void queryClientInfoByMedicalName(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("medicalCode","A01AA01"); //medical code, injected through the medicalCode variable
params.put("size",1000); //return at most size records
ESDatas<Basic> bascis = clientUtil.searchList("client_info/basic/_search","queryClientInfoByMedicalName",params,Basic.class);
List<Basic> bascisList = bascis.getDatas();//the matching customer records
long totalSize = bascis.getTotalSize();
}
5. Returning parent and child data together - this section shows how to return parent and child data in a single search in Elasticsearch 5.x, using inner_hits.
- Query all child documents by a parent condition and return the parent as well: fetch all diagnosis data by customer name and return the customer information at the same time.
Add the search DSL queryDiagnosisByClientName to esmapper/Client_Info.xml:
<!--Fetch a customer's diagnosis data by customer name and also return the customer information-->
<property name="queryDiagnosisByClientName">
<![CDATA[
{
    ## return at most #[size] records
    "size": #[size],
    "query": {
        "has_parent": {
            "type": "basic",
            "query": {
                "match": {
                    "client_name": #[clientName] ## customer name, injected through the clientName variable
                }
            },
            "inner_hits": {} ## inner_hits makes the matching parent (customer) document come back with each child hit
        }
    }
}
]]>
</property>
Execute the search and iterate over the results:
/**
* Fetch a customer's diagnosis data by customer name and also return the customer data
*/
public void queryDiagnosisByClientName(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
Map<String,Object> params = new HashMap<String,Object>();
params.put("clientName","张三");
params.put("size",1000);
try {
ESInnerHitSerialThreadLocal.setESInnerTypeReferences(Basic.class);//declare the Java class used to deserialize the inner_hits; only one parent type is involved, so the mapping type name basic does not need to be specified explicitly
ESDatas<Diagnosis> diagnosiss = clientUtil.searchList("client_info/diagnosis/_search",
"queryDiagnosisByClientName",params,Diagnosis.class);
List<Diagnosis> diagnosisList = diagnosiss.getDatas();//获取符合条件的体检报告数据
long totalSize = diagnosiss.getTotalSize();
//iterate over the diagnosis records and inspect the customer information attached to each one
for(int i = 0; diagnosisList != null && i < diagnosisList.size(); i ++) {
Diagnosis diagnosis = diagnosisList.get(i);
List<Basic> basics = ResultUtil.getInnerHits(diagnosis.getInnerHits(), "basic");
if(basics != null) {
System.out.println(basics.size());
}
}
}
finally{
ESInnerHitSerialThreadLocal.clean();//clear the thread-local inner_hits type mapping
}
}
- Query parent documents by a child condition and also return the matching child documents: query customer information and return each customer's examination reports, medical records, and diagnosis records at the same time.
Add the search DSL queryClientAndAllSons to esmapper/Client_Info.xml:
<!--Query customer information and also return each customer's examination reports, medical records, and diagnosis records-->
<property name="queryClientAndAllSons">
<![CDATA[
{
    "query": {
        "bool": {
            "must": [
                {
                    "has_child": {
                        "score_mode": "none",
                        "type": "diagnosis",
                        "query": {
                            "bool": {
                                "must": [
                                    {
                                        "term": {
                                            "icd10_code": {
                                                "value": "J00"
                                            }
                                        }
                                    }
                                ]
                            }
                        },
                        "inner_hits": {}
                    }
                }
            ],
            "should": [
                {
                    "match_all": {}
                },
                {
                    "has_child": {
                        "score_mode": "none",
                        "type": "medical",
                        "query": {
                            "match_all": {}
                        },
                        "inner_hits": {}
                    }
                },
                {
                    "has_child": {
                        "type": "exam",
                        "query": {
                            "match_all": {}
                        },
                        "inner_hits": {}
                    }
                }
            ]
        }
    }
}
]]>
</property>
Execute the query:
/**
* Query customer information and also return each customer's examination reports, medical records, and diagnosis records
*/
public void queryClientAndAllSons(){
ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/Client_Info.xml");
Map<String,Object> params = null;//no query parameters are needed, so just pass null
try {
//map each child mapping type to the Java class used to deserialize its inner_hits
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("exam",Exam.class);//inner_hits of type exam are deserialized into Exam
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("diagnosis",Diagnosis.class);//inner_hits of type diagnosis are deserialized into Diagnosis
ESInnerHitSerialThreadLocal.setESInnerTypeReferences("medical",Medical.class);//inner_hits of type medical are deserialized into Medical
ESDatas<Basic> escompanys = clientUtil.searchList("client_info/basic/_search",
"queryClientAndAllSons",params,Basic.class);
//String response = clientUtil.executeRequest("client_info/basic/_search","queryClientAndAllSons",params); //fetch the raw JSON response directly
// escompanys = clientUtil.searchAll("client_info",Basic.class);
long totalSize = escompanys.getTotalSize();
List<Basic> clientInfos = escompanys.getDatas();//the matching customer records
//inspect each customer's child records returned through inner_hits
for (int i = 0; clientInfos != null && i < clientInfos.size(); i++) {
Basic clientInfo = clientInfos.get(i);
List<Exam> exams = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "exam");
if(exams != null)
System.out.println(exams.size());
List<Diagnosis> diagnosiss = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "diagnosis");
if(diagnosiss != null)
System.out.println(diagnosiss.size());
List<Medical> medicals = ResultUtil.getInnerHits(clientInfo.getInnerHits(), "medical");
if(medicals != null)
System.out.println(medicals.size());
}
}
finally{
ESInnerHitSerialThreadLocal.clean();//clear the thread-local inner_hits type mappings
}
}
Finally, run all the methods in order to verify the functionality:
@Test
public void testMutil(){
this.createClientIndice();//create the index client_info
// this.importClientInfoDataFromBeans(); //add the test data through the API
this.importClientInfoFromJsonData();//import the test data from the JSON payloads
this.queryExamSearchByClientName(); //query examination reports by customer name
this.queryClientInfoByMedicalName();//find customer basic information by medical code
this.queryDiagnosisByClientName();//fetch diagnosis data by customer name, returning the customer data as well
this.queryClientAndAllSons();//query customer information together with all examination, medical and diagnosis records
}
You can download the complete demo project and run the test methods from this article; the address is listed under Related resources below. This concludes the hands-on guide to maintaining and searching parent-child relationships in Elasticsearch 5.x. Thanks for reading!
Related resources
Complete demo project: https://github.com/bbossgroups/eshelloword-booter
Corresponding class and configuration files:
org.bboss.elasticsearchtest.parentchild.ParentChildTest
esmapper/Client_Info.xml
Developer community
bboss QQ group: 166471282
bboss WeChat official account
Stay tuned for Part 2: parent-child relationship maintenance and search in Elasticsearch 6.x.
After a REST client request times out it keeps retrying and tries to connect to the stopped node; how can the number of retries be limited?
Elasticsearch • rochy replied • 2 followers • 1 reply • 3363 views • 2018-12-11 09:53
TF/IDF: when computing fieldNorm, different documents have different term counts (fieldNum) in the field, so why are the scores all the same? (version 2.3)
Elasticsearch • medcl replied • 4 followers • 5 replies • 2143 views • 2018-12-11 12:52
Developing a Logstash input plugin
Logstash • bellengao published an article • 0 comments • 4374 views • 2018-12-10 19:56
Logstash is data-pipeline middleware that can collect and transform many kinds of data and send it to many kinds of stores, for example consuming data from Kafka and writing it into Elasticsearch, syncing log files to S3 object storage, or syncing MySQL data into Elasticsearch.
Logstash consists of three main stages:
- input: fetch data from the source
- filter: filter and transform the data
- output: emit the data
Each type of data is ingested and emitted through the corresponding input plugin and output plugin. To consume data from Kafka and write it into Elasticsearch, for instance, you use the Kafka input plugin for ingestion and logstash-output-elasticsearch for output. If the incoming data needs filtering or transformation, such as dropping unwanted content by keyword or reformatting a time field, that is the job of a filter plugin.
Logstash already ships with dozens of input plugins covering most common or open-source data sources. But when an in-house database or storage service is not protocol-compatible with the open-source products, you have to write your own input plugin. For example, Tencent's home-grown message queue service CMQ is not based on any open-source message queue, so logstash-input-kafka or logstash-input-rabbitmq cannot be used to sync data out of CMQ; Tencent Cloud Object Storage (COS) differs from AWS S3 in its authentication scheme, so the logstash-input-s3 plugin cannot read from COS either. In such cases you need to develop a Logstash input plugin of your own.
This article uses the development of a cos input plugin as an example of how to write a Logstash input plugin.
Logstash provides a simple input plugin example for reference:
https://github.com/logstash-pl ... mple/
Environment setup
Logstash is developed in JRuby, so a JRuby environment has to be set up first:
- Install RVM:
RVM is a Ruby version manager; it installs and manages Ruby environments and lets you switch between Ruby versions from the command line.
```
gpg --keyserver hkp://keys.gnupg.net --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3 7D2BAF1CF37B13E2069D6956105BD0E739499BDB
\curl -sSL https://get.rvm.io | bash -s stable
source /etc/profile.d/rvm.sh
```
- Install JRuby:
```
rvm install jruby
rvm use jruby
```
- Install the package manager bundle and the test tool rspec:
```
gem install bundle
gem install rspec
```
Starting from the example
- Clone logstash-input-example:
```
git clone https://github.com/logstash-plugins/logstash-input-example.git
```
- Copy the cloned logstash-input-example source into a logstash-input-cos directory and delete the .git folder, so the example source serves as the starting point for development; then rename the files that need new names:
```
mv logstash-input-example.gemspec logstash-input-cos.gemspec
mv lib/logstash/inputs/example.rb lib/logstash/inputs/cos.rb
mv spec/inputs/example_spec.rb spec/inputs/cos_spec.rb
```
- The source tree is now in place (the directory-layout screenshot is omitted in this archive).
The important files and their roles:
- cos.rb: the main file; it reads the Logstash configuration and fetches the source data, and must inherit from the LogStash::Inputs::Base base class
- cos_spec.rb: the unit-test file; the code in cos.rb is tested with rspec
- logstash-input-cos.gemspec: similar to pom.xml in Maven; it configures the project's version, name, license, and package dependencies, which the bundle command downloads
Configuring and downloading dependencies
Tencent Cloud COS has no Ruby SDK, so the plugin has to be built on top of the COS Java SDK. First add the dependency on the COS Java SDK by putting the following in the Gem dependencies section of logstash-input-cos.gemspec:
```
  # Gem dependencies
  s.requirements << "jar 'com.qcloud:cos_api', '5.4.4'"
  s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
  s.add_runtime_dependency 'logstash-codec-plain'
  s.add_runtime_dependency 'stud', '>= 0.0.22'
  s.add_runtime_dependency 'jar-dependencies'
  s.add_development_dependency 'logstash-devutils', '1.3.6'
```
Compared with logstash-input-example.gemspec this adds dependencies on the com.qcloud:cos_api jar and on jar-dependencies; jar-dependencies manages jar packages inside the Ruby environment and tracks their load state.
Then add one more setting to logstash-input-cos.gemspec:
```
  s.platform = 'java'
```
With this setting the Java dependency can be downloaded successfully and Java code can be called directly from the Ruby code.
Finally, download the dependencies:
```
bundle install
```
Writing the code
The logic of logstash-input-cos is fairly simple: a scheduled task calls the listObjects method of the COS Java SDK to fetch the objects in the configured bucket; at the end of each run the plugin saves a marker locally, and the next run resumes from that marker, which gives incremental synchronization.
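For readers who want to see the same marker-driven incremental listing expressed directly against the COS Java SDK (the same classes the plugin imports below), here is a minimal stand-alone sketch; the credentials, region and bucket name are placeholders, and the marker is only kept in memory here instead of being persisted:
```
import com.qcloud.cos.COSClient;
import com.qcloud.cos.ClientConfig;
import com.qcloud.cos.auth.BasicCOSCredentials;
import com.qcloud.cos.auth.COSCredentials;
import com.qcloud.cos.model.COSObjectSummary;
import com.qcloud.cos.model.ListObjectsRequest;
import com.qcloud.cos.model.ObjectListing;
import com.qcloud.cos.region.Region;

public class CosIncrementalListingSketch {
    public static void main(String[] args) {
        // Placeholder credentials, region and bucket - replace with real values.
        COSCredentials cred = new BasicCOSCredentials("secretId", "secretKey");
        ClientConfig clientConfig = new ClientConfig(new Region("ap-guangzhou"));
        COSClient cosClient = new COSClient(cred, clientConfig);

        ListObjectsRequest request = new ListObjectsRequest();
        request.setBucketName("examplebucket-1250000000");
        request.setMaxKeys(1000);   // a single listObjects call returns at most 1000 keys
        request.setMarker("");      // start from the beginning, or from a previously saved marker

        // One scheduled run: list only the keys after the current marker.
        ObjectListing listing = cosClient.listObjects(request);
        for (COSObjectSummary summary : listing.getObjectSummaries()) {
            System.out.println("new object: " + summary.getKey());
        }
        // Remember where this run stopped so the next run continues from there.
        request.setMarker(listing.getNextMarker());

        cosClient.shutdown();
    }
}
```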
Referencing the jar
Since the plugin calls code from the COS Java SDK, reference the jar first:
```
require 'cos_api-5.4.4.jar'
java_import com.qcloud.cos.COSClient;
java_import com.qcloud.cos.ClientConfig;
java_import com.qcloud.cos.auth.BasicCOSCredentials;
java_import com.qcloud.cos.auth.COSCredentials;
java_import com.qcloud.cos.exception.CosClientException;
java_import com.qcloud.cos.exception.CosServiceException;
java_import com.qcloud.cos.model.COSObjectSummary;
java_import com.qcloud.cos.model.ListObjectsRequest;
java_import com.qcloud.cos.model.ObjectListing;
java_import com.qcloud.cos.region.Region;
```
Reading the configuration file
The configuration-reading code (shown as a screenshot in the original post) sets config_name to cos; the other options are read following the usual Ruby conventions, with type checks and default values, so the plugin can consume a configuration like the following:
```
input {
  cos {
    "endpoint" => "cos.ap-guangzhou.myqcloud.com"
    "access_key_id" => "*****"
    "access_key_secret" => "****"
    "bucket" => "******"
    "region" => "ap-guangzhou"
    "appId" => "**********"
    "interval" => 60
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
```
Implementing the register method
A Logstash input plugin must implement two methods: register and run.
register works like an initializer: the variables already populated from the configuration file can be used directly here to initialize the COS client, as follows:
```
# 1. initialize the user credentials (appid, secretId, secretKey)
cred = com.qcloud.cos.auth.BasicCOSCredentials.new(@access_key_id, @access_key_secret)
# 2. set the bucket region; see https://www.qcloud.com/document/product/436/6224 for the COS region short names
clientConfig = com.qcloud.cos.ClientConfig.new(com.qcloud.cos.region.Region.new(@region))
# 3. create the COS client
@cosclient = com.qcloud.cos.COSClient.new(cred, clientConfig)
# the bucket name must include the appid
bucketName = @bucket + "-" + @appId
@bucketName = bucketName
@listObjectsRequest = com.qcloud.cos.model.ListObjectsRequest.new()
# set the bucket name
@listObjectsRequest.setBucketName(bucketName)
# prefix restricts the listing to keys that start with the prefix
@listObjectsRequest.setPrefix(@prefix)
# maximum number of objects per listing; a single listObjects call supports at most 1000
@listObjectsRequest.setMaxKeys(1000)
@listObjectsRequest.setMarker(@markerConfig.getMarker)
```
The example keeps @cosclient and @listObjectsRequest as instance variables because both are used again in the run method.
Note how Java code is called from Ruby: there are no type declarations, and instead of new Object() you must write Object.new().
Implementing the run method
The run method fetches the data and turns it into events.
The simplest possible run method looks like this:
```
def run(queue)
  Stud.interval(@interval) do
    event = LogStash::Event.new("message" => @message, "host" => @host)
    decorate(event)
    queue << event
  end # loop
end # def run
```
Notes on the code:
- The Stud Ruby module runs the scheduled task; the interval is configurable and read from the configuration file
- Build the event; the example builds an event containing two fields
- Call decorate() to apply tags to the event, if any are configured
- queue << event pushes the event into the pipeline, where the filters pick it up
The run method of logstash-input-cos is implemented as follows:
```
def run(queue)
  @current_thread = Thread.current
  Stud.interval(@interval) do
    process(queue)
  end
end

def process(queue)
  @logger.info('Marker from: ' + @markerConfig.getMarker)

  objectListing = @cosclient.listObjects(@listObjectsRequest)
  nextMarker = objectListing.getNextMarker()
  cosObjectSummaries = objectListing.getObjectSummaries()
  cosObjectSummaries.each do |obj|
    # the object's key (its path in the bucket)
    key = obj.getKey()
    if stop?
      @logger.info("stop while attempting to read log file")
      break
    end
    # fetch the object's content by key
    getObject(key) { |log|
      # emit the events
      @codec.decode(log) do |event|
        decorate(event)
        queue << event
      end
    }
    # record the marker
    @markerConfig.setMarker(key)
    @logger.info('Marker end: ' + @markerConfig.getMarker)
  end
end

# open a download stream for the object
def getObject(key, &block)
  getObjectRequest = com.qcloud.cos.model.GetObjectRequest.new(@bucketName, key)
  cosObject = @cosclient.getObject(getObjectRequest)
  cosObjectInput = cosObject.getObjectContent()
  buffered = BufferedReader.new(InputStreamReader.new(cosObjectInput))
  while (line = buffered.readLine())
    block.call(line)
  end
end
```
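The code above relies on a @markerConfig object (getMarker/setMarker) that the article does not show; conceptually it just persists the last processed key between runs. Purely for illustration, and written in Java rather than Ruby, a minimal marker store might look like this (the class name, file location and error handling are all assumptions):
```
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Persists the last processed object key so that the next run resumes after it.
public class MarkerConfig {
    private final Path markerFile;

    public MarkerConfig(String dir) {
        this.markerFile = Paths.get(dir, "cos_input_marker");
    }

    // Returns the saved marker, or "" when no marker has been written yet.
    public String getMarker() {
        try {
            return Files.exists(markerFile)
                    ? new String(Files.readAllBytes(markerFile), StandardCharsets.UTF_8).trim()
                    : "";
        } catch (IOException e) {
            return "";
        }
    }

    // Overwrites the marker with the key that was just processed.
    public void setMarker(String key) {
        try {
            Files.write(markerFile, key.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new RuntimeException("failed to persist marker", e);
        }
    }
}
```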
Testing the code
Add the following test code to spec/inputs/cos_spec.rb:
```
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/cos"

describe LogStash::Inputs::Cos do
  it_behaves_like "an interruptible input plugin" do
    let(:config) { {
      "endpoint" => 'cos.ap-guangzhou.myqcloud.com',
      "access_key_id" => '',
      "access_key_secret" => '',
      "bucket" => '',
      "region" => 'ap-guangzhou',
      "appId" => '',
      "interval" => 60 } }
  end
end
```
rspec is a Ruby testing library; run it through bundle:
```
bundle exec rspec
```
If the code in cos.rb contains no syntax or runtime errors, output like the following indicates that the test passed:
```
Finished in 0.8022 seconds (files took 3.45 seconds to load)
1 example, 0 failures
```
Building and testing input-plugin-cos
Build
Build the plugin source with gem:
```
gem build logstash-input-cos.gemspec
```
The build produces a file named logstash-input-cos-0.0.1-java.gem.
Test
From the Logstash installation directory, install the logstash-input-cos plugin with the following command:
```
./bin/logstash-plugin install /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem
```
The result:
```
Validating /usr/local/githome/logstash-input-cos/logstash-input-cos-0.0.1-java.gem
Installing logstash-input-cos
Installation successful
```
You can also run ./bin/logstash-plugin list to see all input/output/filter/codec plugins already installed in Logstash.
Create a configuration file cos.logstash.conf with the following content:
```
input {
  cos {
    "endpoint" => "cos.ap-guangzhou.myqcloud.com"
    "access_key_id" => "*****"
    "access_key_secret" => "****"
    "bucket" => "******"
    "region" => "ap-guangzhou"
    "appId" => "**********"
    "interval" => 60
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
```
This configuration authenticates with the secret_id and secret_key of a Tencent Cloud account and pulls data from the specified bucket; for testing, output is set to stdout.
Run Logstash:
```
./bin/logstash -f cos.logstash.conf
```
The output:
```
Sending Logstash's logs to /root/logstash-5.6.4/logs which is now configured via log4j2.properties
[2018-07-30T19:26:17,039][WARN ][logstash.runner          ] --config.debug was specified, but log.level was not set to 'debug'! No config info will be logged.
[2018-07-30T19:26:17,048][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.4/modules/netflow/configuration"}
[2018-07-30T19:26:17,049][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.4/modules/fb_apache/configuration"}
[2018-07-30T19:26:17,252][INFO ][logstash.inputs.cos      ] Using version 0.1.x input plugin 'cos'. This plugin isn't well supported by the community and likely has no maintainer.
[2018-07-30T19:26:17,341][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2018-07-30T19:26:17,362][INFO ][logstash.inputs.cos      ] Registering cos input {:bucket=>"bellengao", :region=>"ap-guangzhou"}
[2018-07-30T19:26:17,528][INFO ][logstash.pipeline        ] Pipeline main started
[2018-07-30T19:26:17,530][INFO ][logstash.inputs.cos      ] Marker from:
log4j:WARN No appenders could be found for logger (org.apache.http.client.protocol.RequestAddCookies).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[2018-07-30T19:26:17,574][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-07-30T19:26:17,714][INFO ][logstash.inputs.cos      ] Marker end: access.log
{
       "message" => "77.179.66.156 - - [25/Oct/2016:14:49:33 +0200] \"GET / HTTP/1.1\" 200 612 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
      "@version" => "1",
    "@timestamp" => 2018-07-30T11:26:17.710Z
}
{
       "message" => "77.179.66.156 - - [25/Oct/2016:14:49:34 +0200] \"GET /favicon.ico HTTP/1.1\" 404 571 \"http://localhost:8080/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.59 Safari/537.36\"",
      "@version" => "1",
    "@timestamp" => 2018-07-30T11:26:17.711Z
}
```
An nginx log file named access.log had been uploaded to the COS bucket; each JSON structure printed at the end of the output is one event, and its message field is one line from access.log.
Elasticsearch filter cache question
Elasticsearch • rochy replied • 4 followers • 2 replies • 2683 views • 2018-12-11 09:54
After installing Search Guard, connecting to ES from client code; the ES version is 6.5.1.
Elasticsearch • zqc0512 replied • 3 followers • 4 replies • 3132 views • 2018-12-12 09:04
Snapshotting ES data to HDFS
Elasticsearch • medcl replied • 3 followers • 1 reply • 3423 views • 2018-12-10 16:45
Day 10 - Analysis of a bug triggered by an excessive shard recovery concurrency in Elasticsearch
Advent • howardhuang published an article • 4 comments • 12124 views • 2018-12-10 11:43
Hello everyone. Today I'd like to share a lesson learned the hard way with ES: during cluster recovery, setting the shard recovery concurrency too high caused the cluster to hang.
Scenario
Straight to the story. One day, an ES cluster on Tencent Cloud with 15 nodes, 2700+ indices, 15000+ shards, and tens of TB of data had a node restarted because of a machine failure. The cluster ended up with a large number of unassigned shards and went yellow. To speed up recovery we manually raised the shard recovery concurrency; the plan was to change node_concurrent_recoveries from its default of 2 to 10, but a slip of the finger added an extra zero and the following settings were applied:
```
curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'
{
  "persistent": {
    "cluster.routing.allocation.node_concurrent_recoveries": 100,
    "indices.recovery.max_bytes_per_sec": "40mb"
  }
}
'
```
After applying the settings, the number of unassigned shards dropped quickly at first. A few minutes later it froze at a fixed value and stayed there: the generic thread pool on every node was stuck. Reads and writes on existing indices still worked, but creating new indices and everything else that goes through the generic thread pool was blocked. We immediately lowered the recovery concurrency back to 10 and restarted all nodes through the management console; the cluster returned to normal about 15 minutes later. Below I first introduce some basic concepts, then reproduce the problem and analyze it in detail.
Basic concepts
ES thread pools
Each ES node has several thread pools, each with its own purpose. The important ones are:
- generic: the general-purpose pool; background operations such as node discovery and the shard recovery mentioned above all run on it. Its size defaults to 4 * the configured number of processors, with a minimum of 128 and a maximum of 512.
- index: used by index/delete operations, including automatic index creation. Defaults to the number of processors, with a default queue size of 200.
- search: handles search requests. Default size: int((# of available_processors * 3) / 2) + 1, default queue size 1000.
- get: handles get requests. Defaults to the number of processors, default queue size 1000.
- write: handles single-document index/delete/update and bulk requests. Defaults to the number of processors, default queue size 200; in write-heavy logging scenarios we usually enlarge the queue.
There are other pools as well, such as snapshot, analyze, and refresh, which I won't cover one by one; see the official documentation for details: https://www.elastic.co/guide/e ... .html. The default sizes listed above are sketched in code right after this paragraph.
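To make the sizing rules above concrete, the following stand-alone sketch (not Elasticsearch source code) computes the default pool sizes from a processor count:
```
// Stand-alone sketch of the default thread pool sizing rules described above
// (illustrative only; the real logic lives inside Elasticsearch).
public class ThreadPoolDefaults {

    static int clamp(int value, int min, int max) {
        return Math.max(min, Math.min(max, value));
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();

        int generic = clamp(processors * 4, 128, 512); // generic: 4x processors, bounded to [128, 512]
        int index   = processors;                      // index: one thread per processor, queue 200
        int search  = (processors * 3) / 2 + 1;        // search: int((processors * 3) / 2) + 1, queue 1000
        int get     = processors;                      // get: one thread per processor, queue 1000
        int write   = processors;                      // write: one thread per processor, queue 200

        System.out.printf("processors=%d generic=%d index=%d search=%d get=%d write=%d%n",
                processors, generic, index, search, get, write);
    }
}
```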
Cluster recovery: shard recovery
As we know, an ES cluster can be in one of three states: green, yellow, or red. Green means all primary and replica shards are assigned; yellow means all primaries are assigned but some replicas are not; red means some primaries are unassigned.
When a node drops out of the cluster because of a failure, or is restarted, and the indices have replicas, the cluster enters the shard recovery phase. The master node initiates a cluster-state update task; the allocation strategy is decided by the master (see this Tencent Cloud+ community article for the details: https://cloud.tencent.com/deve ... 34743). Each node receives the cluster-state update, checks its shard states, and triggers the recovery flow. Depending on where the shard data lives there are several recovery types, mainly:
- EXISTING_STORE: the data exists locally on the node and is recovered from the local store.
- PEER: the local data is unavailable or missing, so the shard is recovered from a remote node (the source shard, usually the primary).
- SNAPSHOT: the data is restored from a snapshot repository.
- LOCAL_SHARDS: the shard-shrink scenario, recovering from other shards on the same node.
For the PEER case the recovery concurrency is mainly controlled by the following settings:
- cluster.routing.allocation.node_concurrent_incoming_recoveries: the maximum number of incoming shard recoveries on a node, i.e. shards being recovered from other nodes onto this node.
- cluster.routing.allocation.node_concurrent_outgoing_recoveries: the maximum number of outgoing shard recoveries on a node, i.e. shards being recovered from this node to other nodes.
- cluster.routing.allocation.node_concurrent_recoveries: sets both the incoming and outgoing limits above to the same value.
See the official documentation for the full parameter reference: https://www.elastic.co/guide/e ... .html
The root cause of the hang was that the PEER recovery concurrency was too high, exhausting the generic thread pool. The recovery interaction between the target node and the source node is discussed in detail in the analysis below.
Reproducing and dissecting the problem
To make the description easier to follow, I rebuilt a three-node cluster on ES 6.4.3, each node with 1 core and 2 GB of memory. I created 300 indices, each with 5 primary shards and one replica, 3000 shards in total, and inserted roughly 100 documents into each index.
First set the shard recovery concurrency; to exaggerate the effect I set it straight to 200:
```
curl -X PUT "localhost:9200/_cluster/settings" -H 'Content-Type: application/json' -d'
{
  "persistent": {
    "cluster.routing.allocation.node_concurrent_recoveries": 200 // set the shard recovery concurrency
  }
}
'
```
Next, stop one node to simulate a machine failure. A few minutes later the number of recovering shards is stuck at a fixed value.
allocation explain shows that the shards stay unassigned because the maximum recovery concurrency has been reached.
Looking at the thread pools, the generic pool is saturated at its 128 threads.
At this point querying or writing existing indices still works, but operations that involve the generic thread pool, such as creating a new index, hang.
A stack dump shows that all 128 generic threads are stuck in the PEER recovery phase.
With the symptom reproduced, let's analyze why the remote shard recovery (PEER recovery) flow can freeze the cluster.
Whenever the state of a shard changes, the master node sends a cluster-state update request to every node. The other nodes receive it, notice the shard state change, and start the shard recovery flow. Some shards have to be recovered from other nodes; at the code level, the interaction between the allocation target node and the source node is as follows:
Versions 6.x and later introduced seqNo, so recovery now involves seqNo + translog, one of the major improvements that speeds up recovery in 6.x. We focus on the remote calls in steps 2, 4, 5, 7, 10, and 12 of the flow; their purposes are:
- Step 2: the target node of the allocation sends a start-recovery request to the source node (usually the primary shard), carrying the starting seqNo and the syncId.
- Step 4: send the data-file metadata, telling the target node which files to expect.
- Step 5: send the diff data files to the target node.
- Step 7: the source node sends a prepare-translog request to the target node and waits for the target to open a shard-level engine, ready to receive the translog.
- Step 10: the source node sends the requested range of the translog snapshot to the target node.
- Step 12: finish the recovery flow.
We can see that, apart from step 5 which sends the data files, every remote submitRequest interaction calls txGet, a synchronous call backed by a sync object built on AQS. If one side's generic thread pool is filled with these requests, each waiting for the other side to respond, while the other side's generic pool is saturated for the same reason so the corresponding requests can only sit pending in its queue, then both pools are full and each side is waiting for threads on the other side to return: a distributed deadlock. A toy illustration of the pattern follows.
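The pattern can be reproduced outside Elasticsearch with two small fixed-size executors that make blocking calls into each other; a toy sketch (the pool sizes and the fake "remote call" are invented purely for the illustration):
```
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Toy model of the distributed deadlock: two "nodes" with small thread pools that block
// on synchronous calls into each other until both pools are exhausted and nothing completes.
public class RecoveryDeadlockDemo {
    static final ExecutorService nodeA = Executors.newFixedThreadPool(2);
    static final ExecutorService nodeB = Executors.newFixedThreadPool(2);

    // A "remote call" from one node to the other that itself makes a blocking call back,
    // mimicking the synchronous txGet() in the recovery flow.
    static Future<?> submitRequest(ExecutorService remote, ExecutorService local, int depth) {
        return remote.submit(() -> {
            if (depth > 0) {
                try {
                    submitRequest(local, remote, depth - 1).get(); // block until the other side answers
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });
    }

    public static void main(String[] args) {
        // Launch more concurrent "recoveries" than either pool has threads; every worker thread
        // ends up blocked in get() waiting on the other pool, so no task ever finishes.
        for (int i = 0; i < 4; i++) {
            submitRequest(nodeB, nodeA, 2);
        }
        System.out.println("requests submitted - with pools this small they never complete");
    }
}
```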
Handling the problem
To avoid the uncertain side effects of a larger change, for Tencent Cloud ES clusters we currently reject, at the REST layer, any settings request whose recovery concurrency exceeds a threshold, and warn the user. We have also filed an issue with Elastic to track the problem: https://github.com/elastic/ela ... 36195
Summary
This article describes the cluster-hang scenario that can occur during recovery, in the hope of sparing other ES users the same pitfall; it does not analyze the whole shard recovery process in detail. For a full walk-through of shard recovery, see the Elasticsearch column on the Tencent Cloud+ community: https://cloud.tencent.com/developer/column/2428
That's all, thanks!
Regarding rollover mode: if we only care about the time spent on terms queries, does that mean we do not need to shrink the shards?
Elasticsearch • medcl replied • 3 followers • 1 reply • 2271 views • 2018-12-10 15:33
Community Daily, issue 474 (2018-12-10)
Community Daily • cyberdak published an article • 0 comments • 1939 views • 2018-12-10 10:03
http://t.cn/Ey1omYc
2. Another web management UI for ES, supporting import, view, and edit operations
http://t.cn/Ey1dKAj
3. Searching emoji with Elasticsearch
http://t.cn/Rf5r848
Editor: cyberdak
Archive: https://elasticsearch.cn/article/6183
Subscribe: https://tinyletter.com/elastic-daily
Day 9 - Hands-on: implementing a custom beat
Advent • Xinglong published an article • 0 comments • 4394 views • 2018-12-09 21:09
References
https://elasticsearch.cn/article/113
https://www.elastic.co/blog/build-your-own-beat
Introduction
Our company runs a unified log collection system with a customized filebeat doing the collection. To support real-time alerting and monitoring, the custom beat does not ship its output straight to the Elasticsearch backend; the data first passes through Storm or Druid for real-time analysis and then lands in ES or HDFS. Because collection is unified, logs are not yet split per application, so every log line enters the backend store as one unparsed line, which is why different business teams need their own customized beats.
This article is a first attempt at a custom beat that parses the HDFS audit log on the beat side; to keep the article short, only basic file parsing is implemented. The rest of the article covers environment setup, the code, and testing.
Environment setup
go version go1.9.4 linux/amd64
python version: 2.7.9
A quick rant about installing Python: it is full of pitfalls. The system default Python is 2.7.5, while cookiecutter recommends 2.7.9.
The following packages provide the native libraries Python itself depends on:
```
yum install openssl -y
yum install openssl-devel -y
yum install zlib-devel -y
yum install zlib -y
```
Install Python:
```
wget https://www.python.org/ftp/python/2.7.9/Python-2.7.9.tgz
tar -zxvf Python-2.7.9.tgz
cd ~/python/Python-2.7.9
./configure --prefix=/usr/local/python-2.7.9
make
make install

rm -f /bin/python
ln -s /usr/local/python-2.7.9/bin/python /bin/python
```
Install the tool packages distribute, setuptools, and pip:
```
cd ~/python/setuptools-19.6 && python setup.py install
cd ~/python/pip-1.5.4 && python setup.py install
cd ~/python/distribute-0.7.3 && python setup.py install
```
Install cookiecutter:
```
pip install --user cookiecutter
```
Install the tools cookiecutter depends on:
```
pip install backports.functools-lru-cache
pip install six
pip install virtualenv
```
*** After virtualenv is installed it lives inside the Python directory (/usr/local/python-2.7.9/bin/virtualenv); add it to PATH, because the beat's Makefile uses it later.
Implementation
The functionality is simple: open the hdfs-audit.log file, read it line by line, extract the required fields, assemble each line into an event, and send it out; when the target is ES, the beat should also support creating the correct mapping automatically.
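Before diving into the beat code, it helps to see what gets extracted from a single audit log line. The sketch below (written in Java purely for illustration; the sample line is made up but follows the usual key=value layout of the HDFS audit log) pulls out the same fields the beat will emit: the timestamp, ugi, cmd, src, and dst.
```
// Illustrative only: shows which fields are pulled out of one HDFS audit log line.
public class AuditLineParseSketch {
    public static void main(String[] args) {
        String line = "2018-12-09 03:00:00,000 INFO FSNamesystem.audit: allowed=true\t"
                + "ugi=appmon@APD.ABC.COM (auth:KERBEROS)\tip=/10.1.2.3\t"
                + "cmd=create\tsrc=/app-logs/app/file.tmp\tdst=null\tperm=null";

        // The timestamp is everything before the first comma.
        String time = line.substring(0, line.indexOf(','));
        // Each remaining field sits between its "key=" marker and the next delimiter.
        String ugi = slice(line, "ugi=", " (auth");
        String cmd = slice(line, "cmd=", "\t");
        String src = slice(line, "src=", "\t");
        String dst = slice(line, "dst=", "\t");

        System.out.printf("time=%s ugi=%s cmd=%s src=%s dst=%s%n", time, ugi, cmd, src, dst);
    }

    static String slice(String line, String startMarker, String endMarker) {
        int start = line.indexOf(startMarker) + startMarker.length();
        int end = line.indexOf(endMarker, start);
        return end == -1 ? line.substring(start) : line.substring(start, end);
    }
}
```
The catAudit function shown later does the same thing in Go with strings.Index.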
Creating your own beat from the official beat template
- Set the $GOPATH environment variable; in this example GOPATH=/root/go
```
$ go get github.com/elastic/beats
$ cd $GOPATH/src/github.com/elastic/beats
$ git checkout 5.1

[root@minikube-2830379 suxingfate]# cookiecutter /root/go/src/github.com/elastic/beats/generate/beat
project_name [Examplebeat]: hdfsauditbeat
github_name [your-github-name]: suxingfate
beat [hdfsauditbeat]:
beat_path [github.com/suxingfate]:
full_name [Firstname Lastname]: xinglong

make setup
```
At this point the skeleton has been generated; what remains is customizing the pieces we need:
- 1 _meta/beat.yml # the configuration template, defining which options our beat accepts
- 2 config/config.go # defines the whole config object as a Go struct containing all the options
- 3 beater/hdfsauditbeat.go # the core logic
- 4 _meta/fields.yml # the mapping definition given to ES when the beat talks to ES
1 _meta/beat.yml
A path option is added here, reserving a spot to configure the location of the hdfs-audit.log file later.
```
[root@minikube-2830379 hdfsauditbeat]# cat _meta/beat.yml
################### Hdfsauditbeat Configuration Example #########################
############################# Hdfsauditbeat ######################################
hdfsauditbeat:
  # Defines how often an event is sent to the output
  period: 1s
  path: "."
```
2 config/config.go
Here path is added to the struct so that the core code can later obtain it from the config object.
```
[root@minikube-2830379 hdfsauditbeat]# cat config/config.go
// Config is put into a different package to prevent cyclic imports in case
// it is needed in several locations

package config

import "time"

type Config struct {
    Period time.Duration `config:"period"`
    Path   string        `config:"path"`
}

var DefaultConfig = Config{
    Period: 1 * time.Second,
    Path:   ".",
}
```
3 beater/hdfsauditbeat.go
What needs to change here:
3.1 A catAudit function is defined to parse each line of the target file, build a custom event from it, and send the event out.
3.2 The Run function calls the custom catAudit function, which is how our logic is hooked in.
<br /> [root@minikube-2830379 hdfsauditbeat]# cat beater/hdfsauditbeat.go<br /> package beater<br /> <br /> import (<br /> "fmt"<br /> "time"<br /> "os"<br /> "io"<br /> "bufio"<br /> "strings"<br /> "github.com/elastic/beats/libbeat/beat"<br /> "github.com/elastic/beats/libbeat/common"<br /> "github.com/elastic/beats/libbeat/logp"<br /> "github.com/elastic/beats/libbeat/publisher"<br /> <br /> "github.com/suxingfate/hdfsauditbeat/config"<br /> )<br /> <br /> type Hdfsauditbeat struct {<br /> done chan struct{}<br /> config config.Config<br /> client publisher.Client<br /> }<br /> <br /> // Creates beater<br /> func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {<br /> config := config.DefaultConfig<br /> if err := cfg.Unpack(&config); err != nil {<br /> return nil, fmt.Errorf("Error reading config file: %v", err)<br /> }<br /> <br /> bt := &Hdfsauditbeat{<br /> done: make(chan struct{}),<br /> config: config,<br /> }<br /> return bt, nil<br /> }<br /> <br /> func (bt *Hdfsauditbeat) Run(b *beat.Beat) error {<br /> logp.Info("hdfsauditbeat is running! Hit CTRL-C to stop it.")<br /> <br /> bt.client = b.Publisher.Connect()<br /> ticker := time.NewTicker(bt.config.Period)<br /> counter := 1<br /> for {<br /> select {<br /> case <-bt.done:<br /> return nil<br /> case <-ticker.C:<br /> }<br /> <br /> bt.catAudit(bt.config.Path)<br /> <br /> logp.Info("Event sent")<br /> counter++<br /> }<br /> }<br /> <br /> func (bt *Hdfsauditbeat) Stop() {<br /> bt.client.Close()<br /> close(bt.done)<br /> }<br /> <br /> func (bt *Hdfsauditbeat) catAudit(auditFile string) {<br /> file, err := os.OpenFile(auditFile, os.O_RDWR, 0666)<br /> if err != nil {<br /> //fmt.Println("Open file error!", err)<br /> return<br /> }<br /> defer file.Close()<br /> <br /> buf := bufio.NewReader(file)<br /> for {<br /> line, err := buf.ReadString('\n')<br /> line = strings.TrimSpace(line)<br /> if line == "" {<br /> return<br /> }<br /> <br /> timeEnd := strings.Index(line, ",")<br /> timeString := line[0 :timeEnd]<br /> tm, _ := time.Parse("2006-01-02 03:04:05", timeString)<br /> <br /> ugiStart := strings.Index(line, "ugi=") + 4<br /> ugiEnd := strings.Index(line, " (auth")<br /> ugi := line[ugiStart :ugiEnd]<br /> <br /> cmdStart := strings.Index(line, "cmd=") + 4<br /> line = line[cmdStart:len(line)]<br /> cmdEnd := strings.Index(line, " ")<br /> cmd := line[0 : cmdEnd]<br /> <br /> srcStart := strings.Index(line, "src=") + 4<br /> line = line[srcStart:len(line)]<br /> srcEnd := strings.Index(line, " ")<br /> src := line[0:srcEnd]<br /> <br /> dstStart := strings.Index(line, "dst=") + 4<br /> line = line[dstStart:len(line)]<br /> dstEnd := strings.Index(line, " ")<br /> dst := line[0:dstEnd]<br /> <br /> event := common.MapStr{<br /> "@timestamp": common.Time(time.Unix(tm.Unix(), 0)),<br /> "ugi": ugi,<br /> "cmd": cmd,<br /> "src": src,<br /> "dst": dst,<br /> }<br /> bt.client.PublishEvent(event)<br /> <br /> if err != nil {<br /> if err == io.EOF {<br /> //fmt.Println("File read ok!")<br /> break<br /> } else {<br /> //fmt.Println("Read file error!", err)<br /> return<br /> }<br /> }<br /> }<br /> }<br /> <br />
4 _meta/fields.yml
```
[root@minikube-2830379 hdfsauditbeat]# less _meta/fields.yml
- key: hdfsauditbeat
  title: hdfsauditbeat
  description:
  fields:
    - name: counter
      type: long
      required: true
      description: >
        PLEASE UPDATE DOCUMENTATION
    # new fields added for hdfsaudit
    - name: entrytime
      type: date
    - name: ugi
      type: keyword
    - name: cmd
      type: keyword
    - name: src
      type: keyword
    - name: dst
      type: keyword
```
Testing
First build the project:
```
make update
make
```
This produces a file named hdfsauditbeat, the executable binary. Now test it; as a shortcut, instead of sending events to ES the beat prints them to the console so we can inspect them.
Adjust the configuration file to point at the audit log file to consume, and switch the output to the console:
```
[root@minikube-2830379 hdfsauditbeat]# cat hdfsauditbeat.yml
################### Hdfsauditbeat Configuration Example #########################

############################# Hdfsauditbeat ######################################

hdfsauditbeat:
  # Defines how often an event is sent to the output
  period: 1s
  path: "/root/go/hdfs-audit.log"

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
#name:

# The tags of the shipper are included in their own field with each
# transaction published.
#tags: ["service-X", "web-tier"]

# Optional fields that you can specify to add additional information to the
# output.
#fields:
#  env: staging

#================================ Outputs =====================================

# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.

#-------------------------- Elasticsearch output ------------------------------
#output.elasticsearch:
  # Array of hosts to connect to.
  #hosts: ["localhost:9200"]

  # Optional protocol and basic auth credentials.
  #protocol: "https"
  #username: "elastic"
  #password: "changeme"

#----------------------------- Logstash output --------------------------------
#output.logstash:
  # The Logstash hosts
  #hosts: ["localhost:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

output.console:
  pretty: true

#================================ Logging =====================================

# Sets log level. The default log level is info.
# Available log levels are: critical, error, warning, info, debug
#logging.level: debug

# At debug level, you can selectively enable logging only for some components.
# To enable all selectors use ["*"]. Examples of other selectors are "beat",
# "publish", "service".
#logging.selectors: ["*"]
```
Now run the beat:
```
[root@minikube-2830379 hdfsauditbeat]# ./hdfsauditbeat
{
"@timestamp": "2018-12-09T03:00:00.000Z",
"beat": {
"hostname": "minikube-2830379.lvs02.dev.abc.com",
"name": "minikube-2830379.lvs02.dev.abc.com",
"version": "5.1.3"
},
"cmd": "create",
"dst": "null",
"src": "/app-logs/app/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
"ugi": "appmon@APD.ABC.COM"
}
{
"@timestamp": "2018-12-09T03:00:00.000Z",
"beat": {
"hostname": "minikube-2830379.lvs02.dev.abc.com",
"name": "minikube-2830379.lvs02.dev.abc.com",
"version": "5.1.3"
},
"cmd": "create",
"dst": "null",
"src": "/app-logs/appmon/logs/application_1540949675029_717305/lvsdpehdc25dn0444.stratus.lvs.abc.com_8042.tmp",
"ugi": "appmon@APD.ABC.COM"
}
```
Wrapping up
A custom beat gives us a great deal of flexibility. An ingest pipeline or Logstash could achieve similar results, but the use cases differ quite a bit: when the data comes from running a special command, or in the scenario described in this article, a customized beat is the better fit.
Community Daily, issue 473 (2018-12-09)
Community Daily • 至尊宝 published an article • 0 comments • 1532 views • 2018-12-09 10:11
http://t.cn/EyEJZGO
2. (requires a proxy to access) Deploying a full-scale ELK stack to Kubernetes.
http://t.cn/EyEiOtk
3. (requires a proxy to access) Facebook is built on inequality.
http://t.cn/EyE6quM
Editor: 至尊宝
Archive: https://elasticsearch.cn/article/6181
Subscribe: https://tinyletter.com/elastic-daily