使用 nohup 或 disown 如果你要让某个进程运行在后台。

可定制的 elasticsearch 数据导入工具 ——mysql_2_elasticsearch

parksben 发表了文章 • 7 个评论 • 7017 次浏览 • 2016-12-13 23:55 • 来自相关话题

A customizable importer from mysql to elasticsearch.
可定制的 elasticsearch 数据导入工具 ——基于 elasticsearch 的 JS API
 
 
【github 项目地址】
https://github.com/parksben/mysql_2_elasticsearch 


【主要功能】
1. 完全使用 JS 实现数据从 MySQL 到 elasticsearch 的迁移;
2. 可批量导入多张 MySQL 表;
3. 可自定义的数据迁移规则(数据表/字段关系、字段过滤、使用正则进行数据处理);
4. 可自定义的异步分片导入方式,数据导入效率更高。


【一键安装】
npm install mysql_2_elasticsearch


【快速开始(简单用例)】
var esMysqlRiver = require('mysql_2_elasticsearch');

var river_config = {
mysql: {
host: '127.0.0.1',
user: 'root',
password: 'root',
database: 'users',
port: 3306
},
elasticsearch: {
host_config: { // es客户端的配置参数
host: 'localhost:9200',
// log: 'trace'
},
index: 'myIndex'
},
riverMap: {
'users => users': {} // 将数据表 users 导入到 es 类型: /myIndex/users
}
};


/*
** 以下代码内容:
** 通过 esMysqlRiver 方法进行数据传输,方法的回调参数(一个JSON对象) obj 包含此次数据传输的结果
** 其中:
** 1. obj.total => 需要传输的数据表数量
** 2. obj.success => 传输成功的数据表数量
** 3. obj.failed => 传输失败的数据表数量
** 4. obj.result => 本次数据传输的结论
*/

esMysqlRiver(river_config, function(obj) {
/* 将传输结果打印到终端 */
console.log('\n---------------------------------');
console.log('总传送:' + obj.total + '项');
console.log('成功:' + obj.success + '项');
console.log('失败:' + obj.failed + '项');
if (obj.result == 'success') {
console.log('\n结论:全部数据传送完成!');
} else {
console.log('\n结论:传送未成功...');
}
console.log('---------------------------------');
console.log('\n(使用 Ctrl + C 退出进程)');
/* 将传输结果打印到终端 */
});


【最佳实现(完整用例)】
var esMysqlRiver = require('mysql_2_elasticsearch');

/*
** mysql_2_elasticsearch 的相关参数配置(详情见注释)
*/

var river_config = {

/* [必需] MySQL数据库的相关参数(根据实际情况进行修改) */
mysql: {
host: '127.0.0.1',
user: 'root',
password: 'root',
database: 'users',
port: 3306
},

/* [必需] es 相关参数(根据实际情况进行修改) */
elasticsearch: {
host_config: { // [必需] host_config 即 es客户端的配置参数,详细配置参考 es官方文档
host: 'localhost:9200',
log: 'trace',
// Other options...
},
index: 'myIndex', // [必需] es 索引名
chunkSize: 8000, // [非必需] 单分片最大数据量,默认为 5000 (条数据)
timeout: '2m' // [非必需] 单次分片请求的超时时间,默认为 1m
//(注意:此 timeout 并非es客户端请求的timeout,后者请在 host_config 中设置)
},

/* [必需] 数据传送的规则 */
riverMap: {
'users => users': { // [必需] 'a => b' 表示将 mysql数据库中名为 'a' 的 table 的所有数据 输送到 es中名为 'b' 的 type 中去
filter_out: [ // [非必需] 需要过滤的字段名,即 filter_out 中的设置的所有字段将不会被导入 elasticsearch 的数据中
'password',
'age'
],
exception_handler: { // [非必需] 异常处理器,使用JS正则表达式处理异常数据,避免 es 入库时由于类型不合法造成数据缺失
'birthday': [ // [示例] 对 users 表的 birthday 字段的异常数据进行处理
{
match: /NaN/gi, // [示例] 正则条件(此例匹配字段值为 "NaN" 的情况)
writeAs: null // [示例] 将 "NaN" 重写为 null
},
{
match: /(\d{4})年/gi, // [示例] 正则表达式(此例匹配字段值为形如 "2016年" 的情况)
writeAs: '$1.1' // [示例] 将 "2015年" 样式的数据重写为 "2016.1" 样式的数据
}
]
}
},
// Other fields' options...
}

};


/*
** 将传输结果打印到终端
*/

esMysqlRiver(river_config, function(obj) {
console.log('\n---------------------------------');
console.log('总传送:' + obj.total + '项');
console.log('成功:' + obj.success + '项');
console.log('失败:' + obj.failed + '项');
if (obj.result == 'success') {
console.log('\n结论:全部数据传送完成!');
} else {
console.log('\n结论:传送未成功...');
}
console.log('---------------------------------');
console.log('\n(使用 Ctrl + C 退出进程)');
});


【注意事项及参考】
1. elasticsearch数据导入前请先配置好数据的 mapping;
2. "host_config" 更多参数设置详见 [es官方API文档] https://www.elastic.co/guide/e ... .html
3. mysql 表的自增 id 将自动替换为 "表名+_id" 的格式,如:"users_id";
4. 如出现数据缺失情况,请注意查看 elasticsearch 终端进程或日志,找出未成功导入的数据,通过设置 exception_handler 参数处理它。

搜索"hello world”同义成“we can”这种要怎么做?

回复

zplzpl 发起了问题 • 1 人关注 • 0 个回复 • 3903 次浏览 • 2016-12-13 19:31 • 来自相关话题

Day6:《记一次es性能调优》

xiaorui 发表了文章 • 5 个评论 • 13325 次浏览 • 2016-12-13 11:49 • 来自相关话题


一.前言
应medcl写es文章的时候,其实这段时间es研究的不多,感觉没什么新东西可写。
考虑只有这次调优心得可与大家分享,文笔有限,见谅!
二.背景
先交代一下背景,调优的项目是某电商类搜索项目,流量来自于前端的app和h5。
搜索主要是根据用户的地理位置和关键字等条件搜索附近的商家和商品。
商品数据大概在5000w左右,商品更新很频繁,更新量大概是每天2000w条左右,(因商家经常会促销、或者调上下架状态、改价格等)查询也相当频繁。
集群有2个集群,一个主一个备,用于有问题的时候随时切换。主集群有8个节点,配置是32核,
32g内存的docker的机器。给es jvm分配20g内存,jdk 版本是1.7,gc 是使用parnew/cms gc。
这个项目我是后期加入的,来的时候项目已上线。由于参与进来的时候es跑的也还是比较稳定,所以也一直
没调过es的参数。程序,参数基本上也就保持上线的时候那个样子。
es上线的时候是用的1.5版本,后期没升过级。
三.问题
项目大概跑了一年多,时间来到大概16年的9月份。搜索请求响应时间开始出现几秒才完成的情况,
我就被拉过来调优了。通过我们自己内部的调用方法监控,tp99和avg这些值还好,维持在200ms以下。max 最大有5,6s的情况,而且次数有点多。
这里没怎么折腾,很快定位就是es gc导致的。翻了一下es gc日志,就是cms remark这个阶段时间特别长,
而且 这个阶段是stop the world的。
四.解决
为什么remark阶段这么长时间? 直接上结论,就是一次cms 周期内,并发标记后到remark这个期间jvm 堆内存对象
变化很大。说白了对应我们的场景就是一大波 es bulk操作。对应Bigdesk观察,几秒的卡顿基本都出现在一大波 es bulk操作之后。
这里解释一下,引用网上文章的说法:
remark如果耗时较长,通常原因是在cms gc已经结束了concurrent-mark步骤后,旧生代的引用关系仍然发生了很多的变化,旧生代的引用关系发生变化的原因主要是:
* 在这个间隔时间段内,新生代晋升到旧生代的对象比较多;
* 在这个间隔时间段内,新生代没怎么发生ygc,但创建出来的对象又比较多,这种通常就只能是新生代比较大的原因;
原文地址:
http://hellojava.info/?tag=cms-gc-remark

调整一:
加cms gc 的 线程
直接从根源入手,你remark 慢,我就让你跑快点。
因为我们是32 核的cpu ,cpu 利用率用bigdesk观察还是很低的,5%左右。这么低,那就加点线程呗。
-XX:ParallelGCThreads= N
-XX:ParallelCMSThreads= M
调整这2个参数都可以,它们的关系:ParallelCMSThreads = (ParallelGCThreads + 3)/4)
调整后情况缓解了一些,remark还是有3,4秒的情况。
调整二:
关于这点是我们自己的问题。一次bulk 操作就写1条数据。
是的,你没有看错,我们这边的工程师就是这么干的。
一年以前提过这里最好是能合并写,但现在还是这个样子。
这里有一些业务上的原因,合并写会导致一些字段值不准确。
合并写暂时没办法,只能调整es 了。(这里说明一下,其实合并写应该是本次优化比较有效果的办法,可惜这招不让我用。)
通过bigdesk观察,bulk线程池有reject的情况。
但就增加bulk线程池的消费线程,加快数据的消费速度,减少对象驻留在jvm 的时间。
调整后情况没有明显的好转,
但肯定有用,能优化一点是一点嘛。

调整三:
再次从gc入手, -XX:+CMSParallelRemarkEnabled -XX:+CMSScavengeBeforeRemark
这个是网上找的办法:
为了减少第二次暂停的时间,开启并行remark: -XX:+CMSParallelRemarkEnabled。 如果remark还是过长的话,可以开启-XX:+CMSScavengeBeforeRemark选项,强制 remark之前开始一次minor gc,减少remark的暂停时间,但是在remark之后也将立即开始又一次minor gc。调整后情况也没有特别的好转。
以上都是从减小单次cms gc的开销的方向去解决问题,然后我就换了个方向,降低cms gc发生的次数,让它少发生或者不发生。
调整四:
这里调整了一共5个参数,
Xmn10g ==> 8g
CMSInitiatingOccupancyFraction=70 ==>80

index.cache.filter.max_size 2g==>1g
index.cache.filter.expire 2m==>40s
index.refresh_interval 20s==>30s

前2个参数没什么好说的,提高cms gc 被触发的条件,降低cms gc 触发几率。
后3个参数是本文的重点,这里大概讲下es 对于filter cache的管理。
这部分是通过阅读源码分析出来的,涉及代码还挺多,有点复杂,还有很大一部分还是lucene的代码。
这里就不贴大段代码了。
es 对于 filter cache管理是内部维护了一个map的结构(实际是利用com.google.common.cache实现的),关键是这个map 的key 是个它自己定义的类
叫 FilterCacheKey,它override了equals方法
public FilterCacheKey(Object readerKey, Object filterKey) {
    this.readerKey = readerKey;
    this.filterKey = filterKey;
}
...
@Override
public boolean equals(Object o) {
    if (this == o) return true;
//            if (o == null || getClass() != o.getClass()) return false;
    FilterCacheKey that = (FilterCacheKey) o;
    return (readerKey().equals(that.readerKey()) && filterKey.equals(that.filterKey));
}
从这里可以看出,filter cache 能否被再次利用到就跟readerKey和filterKey 有关。
filterkey如果你build 查询语句的时候什么都没设置的话,就是filter对象本身。
举个例子,TermFilter,如果term一样,那前后2次查询filterKey是一致的。
关键是这个readerKey是否一致呢?这个readerKey其实就是lucene 的 indexReader,如果前后2次查询有数据更新并且
index.refresh_interval 这个参数设置的比较小,es 会去刷新indexReader,那么很有可能readerKey不一致。
对应我们的场景,数据更新频繁,而且index.refresh_interval 比较小,那么缓存利用率就不太高。
后一次查询的filter cache 会重新put 到上面提到的map里,然后这个index.cache.filter.max_size 2g 
就迅速占满(我们程序代码里很多地方使用到了filter),配多大都有可能占满。那这个filter cache什么时候被移除呢,index.cache.filter.expire 2m管移除这个事,当然应该还有size满了移除的策略。
这就造成了缓存没什么用,还占这么大地方,还占那么久。
然后这个filter cache就很可能跑到 old gen去了。
那么就让它占少点,不干活就快点走:
index.cache.filter.max_size 2g==>1g
index.cache.filter.expire 2m==>40s
index.refresh_interval 20s==>30s
这些调整完毕,重启es ,通过bigdesk ,gc a线图好看多了,cms gc 基本没了,monitor gc 也顺畅多了。
五.总结和备注
总结:
1.优化这个事,我认为是业务上的优化要比技术上优化有效的多
2.日志和监控是你最好的朋友,仔细的看,你总会看出什么
3.es 缓存要利用好,还是需要好好去设计,回头考虑单独写一篇。
备注:
因为这次优化后,我就离开了原来的公司,没有了原来的环境。所以本文
部分参数和数字可能不准确,我仅凭记忆完成。
 

elasticsearch 2.4 跟flume1.7结合老是报错

howardhuang 回复了问题 • 2 人关注 • 1 个回复 • 5322 次浏览 • 2016-12-13 21:11 • 来自相关话题

ES 5.0 java api shield 连接问题

novia 回复了问题 • 2 人关注 • 1 个回复 • 5237 次浏览 • 2016-12-13 10:45 • 来自相关话题

[Mapping definition for [_source] has unsupported parameters: [compress : true]];

lopince 回复了问题 • 2 人关注 • 3 个回复 • 8906 次浏览 • 2017-03-03 09:41 • 来自相关话题

Spark读取ES地理位置字段出错

coyotey 回复了问题 • 3 人关注 • 2 个回复 • 11460 次浏览 • 2016-12-15 16:41 • 来自相关话题

es5.0启动报错

wungking 回复了问题 • 2 人关注 • 2 个回复 • 8273 次浏览 • 2016-12-12 14:38 • 来自相关话题

关于elasticsearch的子查询搜索

回复

cggy2012 回复了问题 • 1 人关注 • 1 个回复 • 7273 次浏览 • 2016-12-12 10:28 • 来自相关话题

通过快照恢复数据,启动了以后,集群是红的,主节点没有分片数据,求救

回复

show 回复了问题 • 1 人关注 • 1 个回复 • 5129 次浏览 • 2017-01-04 17:55 • 来自相关话题

Elasticsearch排序和修改文档问题

回复

匿名用户 发起了问题 • 1 人关注 • 0 个回复 • 5076 次浏览 • 2016-12-10 16:32 • 来自相关话题

Elasticsearch5.0.2如何安装head插件

novia 回复了问题 • 2 人关注 • 1 个回复 • 7476 次浏览 • 2016-12-09 15:41 • 来自相关话题

routing问题(生产环境发现一个分片上的数据为空)

lbeny 回复了问题 • 4 人关注 • 2 个回复 • 4427 次浏览 • 2017-04-12 19:47 • 来自相关话题

用es作为数据源,sparksql做count查询速度问题

medcl 回复了问题 • 6 人关注 • 4 个回复 • 15737 次浏览 • 2017-02-23 16:16 • 来自相关话题

ElasticSearch多词搜索,权重问题

strglee 回复了问题 • 4 人关注 • 1 个回复 • 7350 次浏览 • 2016-12-08 08:52 • 来自相关话题