ES5.6.12查询search队列大小是如何突破queue_size的限制的
看文档thread_pool.search.queue_size: 1000默认配置是1000,压测es的过程中发现当我直接往某个节点发送查询请求时,search队列的这个值是有可能突破这个1000的大小限制,感到挺惊讶的,莫非es5的代码有bug?
测试用例是每秒1000个查询,按照自己想象应该是超过1000es直接拒绝的,但是查看线程池取发现查询能够进入队列排队。
http://10.30.111.128:9222/_cat ... %3Dip,id,name,active,queue,rejected,completed,queue_size
ip id name active queue rejected completed queue_size
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ bulk 0 0 0 0 200
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ fetch_shard_started 0 0 0 0
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ fetch_shard_store 0 0 0 1684
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ flush 0 0 0 0
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ force_merge 0 0 0 0
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ generic 0 0 0 7285
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ get 0 0 0 0 1000
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ index 0 0 0 0 200
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ listener 0 0 0 0
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ management 1 0 0 2317
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ refresh 0 0 0 103813
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ search 49 999 546055 420137 1000
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ snapshot 0 0 0 0
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ warmer 0 0 0 840
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg bulk 0 0 0 0 200
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg fetch_shard_started 0 0 0 0
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg fetch_shard_store 0 0 0 842
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg flush 0 0 0 0
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg force_merge 0 0 0 0
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg generic 0 0 0 9228
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg get 0 0 0 0 1000
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg index 0 0 0 0 200
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg listener 0 0 0 0
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg management 1 0 0 2006
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg refresh 0 0 0 101278
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg search 49 1912 495861 708533 1000
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg snapshot 0 0 0 0
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg warmer 0 0 0 840
用例是直接往127发的,127的队列突破了限制,但是128的队列并没有。难道是往哪里发哪里会突破限制吗?于是将查询发往128,果然128的突破了队列大小限制,127并不会。
然后又去看了es5.6.12的源码,search队列是Fix类型的
builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000));
该队列实际是SizeBlockingQueue,内部包了
BlockingQueue<E> queue;
int capacity; // 容量
AtomicInteger size = new AtomicInteger(); // 队列实际大小
入队函数会检验capacity,超过容量是无法入队的
@Override
public boolean offer(E e) {
while (true) {
final int current = size.get();
if (current >= capacity()) {
return false;
}
if (size.compareAndSet(current, 1 + current)) {
break;
}
}
boolean offered = queue.offer(e);
if (!offered) {
size.decrementAndGet();
}
return offered;
}
另外一个强制入队函数倒是有可能突破capacity的大小限制
/**
* Forces adding an element to the queue, without doing size checks.
*/
public void forcePut(E e) throws InterruptedException {
size.incrementAndGet();
try {
queue.put(e);
} catch (InterruptedException ie) {
size.decrementAndGet();
throw ie;
}
}
但是forcePut调用的地方只有一处EsAbortPolicy,只有被拒绝并且是强制执行的任务,才会不校验大小强制入队。但是一般的普通查询失强制执行的任务吗?
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r instanceof AbstractRunnable) {
if (((AbstractRunnable) r).isForceExecution()) {
BlockingQueue<Runnable> queue = executor.getQueue();
if (!(queue instanceof SizeBlockingQueue)) {
throw new IllegalStateException("forced execution, but expected a size queue");
}
try {
((SizeBlockingQueue) queue).forcePut(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("forced execution, but got interrupted", e);
}
return;
}
}
rejected.inc();
throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
}
刚接触es源码,有没有人有什么想法的?希望可以一起探讨一下
看文档thread_pool.search.queue_size: 1000默认配置是1000,压测es的过程中发现当我直接往某个节点发送查询请求时,search队列的这个值是有可能突破这个1000的大小限制,感到挺惊讶的,莫非es5的代码有bug?
测试用例是每秒1000个查询,按照自己想象应该是超过1000es直接拒绝的,但是查看线程池取发现查询能够进入队列排队。
http://10.30.111.128:9222/_cat ... %3Dip,id,name,active,queue,rejected,completed,queue_size
ip id name active queue rejected completed queue_size
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ bulk 0 0 0 0 200
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ fetch_shard_started 0 0 0 0
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ fetch_shard_store 0 0 0 1684
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ flush 0 0 0 0
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ force_merge 0 0 0 0
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ generic 0 0 0 7285
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ get 0 0 0 0 1000
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ index 0 0 0 0 200
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ listener 0 0 0 0
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ management 1 0 0 2317
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ refresh 0 0 0 103813
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ search 49 999 546055 420137 1000
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ snapshot 0 0 0 0
10.30.111.128 c8-YRxgPTxajlTQ3tmIgqQ warmer 0 0 0 840
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg bulk 0 0 0 0 200
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg fetch_shard_started 0 0 0 0
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg fetch_shard_store 0 0 0 842
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg flush 0 0 0 0
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg force_merge 0 0 0 0
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg generic 0 0 0 9228
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg get 0 0 0 0 1000
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg index 0 0 0 0 200
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg listener 0 0 0 0
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg management 1 0 0 2006
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg refresh 0 0 0 101278
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg search 49 1912 495861 708533 1000
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg snapshot 0 0 0 0
10.30.111.127 cCKKZrRdSlWfU1lMT4XjXg warmer 0 0 0 840
用例是直接往127发的,127的队列突破了限制,但是128的队列并没有。难道是往哪里发哪里会突破限制吗?于是将查询发往128,果然128的突破了队列大小限制,127并不会。
然后又去看了es5.6.12的源码,search队列是Fix类型的
builders.put(Names.SEARCH, new FixedExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000));
该队列实际是SizeBlockingQueue,内部包了
BlockingQueue<E> queue;
int capacity; // 容量
AtomicInteger size = new AtomicInteger(); // 队列实际大小
入队函数会检验capacity,超过容量是无法入队的
@Override
public boolean offer(E e) {
while (true) {
final int current = size.get();
if (current >= capacity()) {
return false;
}
if (size.compareAndSet(current, 1 + current)) {
break;
}
}
boolean offered = queue.offer(e);
if (!offered) {
size.decrementAndGet();
}
return offered;
}
另外一个强制入队函数倒是有可能突破capacity的大小限制
/**
* Forces adding an element to the queue, without doing size checks.
*/
public void forcePut(E e) throws InterruptedException {
size.incrementAndGet();
try {
queue.put(e);
} catch (InterruptedException ie) {
size.decrementAndGet();
throw ie;
}
}
但是forcePut调用的地方只有一处EsAbortPolicy,只有被拒绝并且是强制执行的任务,才会不校验大小强制入队。但是一般的普通查询失强制执行的任务吗?
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r instanceof AbstractRunnable) {
if (((AbstractRunnable) r).isForceExecution()) {
BlockingQueue<Runnable> queue = executor.getQueue();
if (!(queue instanceof SizeBlockingQueue)) {
throw new IllegalStateException("forced execution, but expected a size queue");
}
try {
((SizeBlockingQueue) queue).forcePut(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("forced execution, but got interrupted", e);
}
return;
}
}
rejected.inc();
throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
}
刚接触es源码,有没有人有什么想法的?希望可以一起探讨一下
6 个回复
yangjianxuan
赞同来自: caizhongao
Search分为查询和取回两个阶段。当提高查询并发数时,查询阶段InitialSearchPhase,在每个shard上执行查询时performPhaseOnShard,由于es连接异常导致查询失败,会调用EsAbortPolicy策略。InitialSearchPhase属于强制执行的阶段,强制入队列。从而突破了队列的限制。
yangjianxuan
赞同来自:
匿名用户
赞同来自:
你既然看源码了,看看计数有没有问题。我感觉不会出现一直加入队列的情况,那样的话,内存炸了,
你可以测试10000并发,10W并发,百万并发,如果内存没啥起伏 ,计数变化很大,那可能真的是计数部分代码的BUG。
如果内存变化,甚至爆了,那可能真的是无限制的加入队列了。
匿名用户
赞同来自:
这样的话,就知道问题所在了。
匿名用户
赞同来自:
1000并发是太大了,估计2台机器200并发,已经可以了。
拒绝严格来说是不能出现的,或者很少出现,或者刚刚出现,此时到了瓶颈。
压测就应该结束了,而不是一上来就用1000并发。当然了,这都不是问题。
问题是,压测过程中出现了疑惑,那就需要解决。还是非常敬佩楼主的。
80%的用户是不做压测的。
只有20%的用户会压测。
Charele - Cisco4321
赞同来自:
都这么老的版本了。
现在search池是QueueResizing类型的了。