居然是你

discovery.zen.ping_timeout 参数作用的疑惑和探究

Elasticsearch | 作者 code4j | 发布于2018年05月11日 | 阅读数:14740

这个参数网上的解释都是集群ping过程的超时等待时间。

但是我发现这个参数配置越大,选主的过程越长,我配置了一分钟,结果每次主节点重启的时候整个集群都会有一段时间不可用,而且选主过程非常慢;但是当我设置了10s后,选主过程快了很多,虽然也会抛异常但是很快就能选举出主节点。 

然后看了下代码,发现ping的回调函数确实需要等待discovery.zen.ping_timeout 这个配置对应的时间才会返回。代码如下:

ZenDiscovery类的findMaster开头有这么一句,就是选主的方法调用。

ZenPing.PingResponse[] fullPingResponses = pingService.pingAndWait(pingTimeout);

这个方法的执行时间也就是选主所需要的时间,然后接着看
public PingResponse[] pingAndWait(TimeValue timeout) {
final AtomicReference<PingResponse[]> response = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
ping(new PingListener() {
@Override
public void onPing(PingResponse[] pings) {
response.set(pings);
latch.countDown();
}
}, timeout);
try {
latch.await();
return response.get();
} catch (InterruptedException e) {
logger.trace("pingAndWait interrupted");
return null;
}
}


pingAndWait方法里面执行了ping,并等待回调通知后再继续执行,所以timeout究竟做了什么呢?
 @Override
public void ping(PingListener listener, TimeValue timeout) {
List<? extends ZenPing> zenPings = this.zenPings;
CompoundPingListener compoundPingListener = new CompoundPingListener(listener, zenPings);
for (ZenPing zenPing : zenPings) {
try {
zenPing.ping(compoundPingListener, timeout);
} catch (EsRejectedExecutionException ex) {
logger.debug("Ping execution rejected", ex);
compoundPingListener.onPing(null);
}
}
}


这个是ping的实现,其实就是把所有节点ping了一遍,具体看try-catch的那个ping调用:
@Override
public void ping(final PingListener listener, final TimeValue timeout) {
final SendPingsHandler sendPingsHandler = new SendPingsHandler(pingHandlerIdGenerator.incrementAndGet());
try {
receivedResponses.put(sendPingsHandler.id(), sendPingsHandler);
try {
sendPings(timeout, null, sendPingsHandler);
} catch (RejectedExecutionException e) {
logger.debug("Ping execution rejected", e);
// The RejectedExecutionException can come from the fact unicastConnectExecutor is at its max down in sendPings
// But don't bail here, we can retry later on after the send ping has been scheduled.
}
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() {
sendPings(timeout, null, sendPingsHandler);
threadPool.schedule(TimeValue.timeValueMillis(timeout.millis() / 2), ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
sendPings(timeout, TimeValue.timeValueMillis(timeout.millis() / 2), sendPingsHandler);
sendPingsHandler.close();
listener.onPing(sendPingsHandler.pingCollection().toArray());
for (DiscoveryNode node : sendPingsHandler.nodeToDisconnect) {
logger.trace("[{}] disconnecting from {}", sendPingsHandler.id(), node);
transportService.disconnectFromNode(node);
}
}

@Override
public void onFailure(Throwable t) {
logger.debug("Ping execution failed", t);
sendPingsHandler.close();
}
});
}

@Override
public void onFailure(Throwable t) {
logger.debug("Ping execution failed", t);
sendPingsHandler.close();
}
});
} catch (EsRejectedExecutionException ex) { // TODO: remove this once ScheduledExecutor has support for AbstractRunnable
sendPingsHandler.close();
// we are shutting down
} catch (Exception e) {
sendPingsHandler.close();
throw new ElasticsearchException("Ping execution failed", e);
}
}

前面注意到,我们的findmaster中的选主时间是由pingAndWait 这个方法决定的,而这个方法一直在等待onPing回调的执行,所以onPing执行完才会结束。所以我们只要关注PingListener的onPing什么时候触发,就知道什么时候选主完成了。
很显然,是在scheduler中执行的,但是看下threadPool.schedule,这个本身就是ScheduledThreadPoolExecutor的包装,其第一个参数对应的就是ScheduledThreadPoolExecutor的delay,也就算是延迟多久执行,很显然他传递的是(timeout.millis() / 2),一半的discovery.zen.ping_timeout对应的时间。

在就是sendPings,这个方法也设置了等待时间,点进去看的话会发现等待时间也是一半的ping_time.
void sendPings(final TimeValue timeout, @Nullable TimeValue waitTime, final SendPingsHandler sendPingsHandler) {
final UnicastPingRequest pingRequest = new UnicastPingRequest();
pingRequest.id = sendPingsHandler.id();
pingRequest.timeout = timeout;
DiscoveryNodes discoNodes = contextProvider.nodes();

pingRequest.pingResponse = createPingResponse(discoNodes);

HashSet<DiscoveryNode> nodesToPingSet = new HashSet<>();
for (PingResponse temporalResponse : temporalResponses) {
// Only send pings to nodes that have the same cluster name.
if (clusterName.equals(temporalResponse.clusterName())) {
nodesToPingSet.add(temporalResponse.node());
}
}

for (UnicastHostsProvider provider : hostsProviders) {
nodesToPingSet.addAll(provider.buildDynamicNodes());
}

// add all possible master nodes that were active in the last known cluster configuration
for (ObjectCursor<DiscoveryNode> masterNode : discoNodes.getMasterNodes().values()) {
nodesToPingSet.add(masterNode.value);
}

// sort the nodes by likelihood of being an active master
List<DiscoveryNode> sortedNodesToPing = electMasterService.sortByMasterLikelihood(nodesToPingSet);

// new add the the unicast targets first
List<DiscoveryNode> nodesToPing = CollectionUtils.arrayAsArrayList(configuredTargetNodes);
nodesToPing.addAll(sortedNodesToPing);

final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
for (final DiscoveryNode node : nodesToPing) {
// make sure we are connected
final boolean nodeFoundByAddress;
DiscoveryNode nodeToSend = discoNodes.findByAddress(node.address());
if (nodeToSend != null) {
nodeFoundByAddress = true;
} else {
nodeToSend = node;
nodeFoundByAddress = false;
}

if (!transportService.nodeConnected(nodeToSend)) {
if (sendPingsHandler.isClosed()) {
return;
}
// if we find on the disco nodes a matching node by address, we are going to restore the connection
// anyhow down the line if its not connected...
// if we can't resolve the node, we don't know and we have to clean up after pinging. We do have
// to make sure we don't disconnect a true node which was temporarily removed from the DiscoveryNodes
// but will be added again during the pinging. We therefore create a new temporary node
if (!nodeFoundByAddress) {
if (!nodeToSend.id().startsWith(UNICAST_NODE_PREFIX)) {
DiscoveryNode tempNode = new DiscoveryNode("",
UNICAST_NODE_PREFIX + unicastNodeIdGenerator.incrementAndGet() + "_" + nodeToSend.id() + "#",
nodeToSend.getHostName(), nodeToSend.getHostAddress(), nodeToSend.address(), nodeToSend.attributes(), nodeToSend.version()
);
logger.trace("replacing {} with temp node {}", nodeToSend, tempNode);
nodeToSend = tempNode;
}
sendPingsHandler.nodeToDisconnect.add(nodeToSend);
}
// fork the connection to another thread
final DiscoveryNode finalNodeToSend = nodeToSend;
unicastConnectExecutor.execute(new Runnable() {
@Override
public void run() {
if (sendPingsHandler.isClosed()) {
return;
}
boolean success = false;
try {
// connect to the node, see if we manage to do it, if not, bail
if (!nodeFoundByAddress) {
logger.trace("[{}] connecting (light) to {}", sendPingsHandler.id(), finalNodeToSend);
transportService.connectToNodeLight(finalNodeToSend);
} else {
logger.trace("[{}] connecting to {}", sendPingsHandler.id(), finalNodeToSend);
transportService.connectToNode(finalNodeToSend);
}
logger.trace("[{}] connected to {}", sendPingsHandler.id(), node);
if (receivedResponses.containsKey(sendPingsHandler.id())) {
// we are connected and still in progress, send the ping request
sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, finalNodeToSend);
} else {
// connect took too long, just log it and bail
latch.countDown();
logger.trace("[{}] connect to {} was too long outside of ping window, bailing", sendPingsHandler.id(), node);
}
success = true;
} catch (ConnectTransportException e) {
// can't connect to the node - this is a more common path!
logger.trace("[{}] failed to connect to {}", e, sendPingsHandler.id(), finalNodeToSend);
} catch (RemoteTransportException e) {
// something went wrong on the other side
logger.debug("[{}] received a remote error as a response to ping {}", e, sendPingsHandler.id(), finalNodeToSend);
} catch (Throwable e) {
logger.warn("[{}] failed send ping to {}", e, sendPingsHandler.id(), finalNodeToSend);
} finally {
if (!success) {
latch.countDown();
}
}
}
});
} else {
sendPingRequestToNode(sendPingsHandler.id(), timeout, pingRequest, latch, node, nodeToSend);
}
}
if (waitTime != null) {
try {
latch.await(waitTime.millis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignore
}
}
}


注意最后这个if,如果waitTime!=null 则latch.await。外面传递的waitTime是一半的ping_time哦。

所以初步得出结论  ping_time 代表的是ping请求调用超时时间,但同时也是选主的delay time。

社区的同学们你们怎么理解的呢?
 
已邀请:

kennywu76 - Wood

赞同来自: code4j famoss kwan PerryPcd

@yayg2008 补充得是对的。 我最初的分析里面有一些不正确的地方。  discovery.zen.ping_timeout主要是控制master选举过程中,发现其他node存活的超时设置,主要影响选举的耗时,  判断结点是否脱离是discovery.zen.fd.ping_timeout这个参数。
 
我们生产环境discovery.zen.ping_timeout用的默认的3s, 而discovery.zen.fd相关的几个参数设置如下:


ping_interval: 10s  (默认1s)
ping_timeout: 60s   (默认30)
ping_retries: 3   (等于默认值)


这些参数在我们的环境长期运行后验证基本是比较理想的。 只有负载最重的日志集群,在夜间做force merge的时候,因为某些shard过大(300 - 400GB), 大量的IO操作因为机器load过高,偶尔出现结点被误判脱离,然后马上又加回的现象。 虽然继续增大上面的几个参数可以减少误判的机会,但是如果真的有结点故障,将其剔除掉的周期又太长。 所以我们还是通过增加shard数量,限制shard的size来缓解forcemerge带来的压力,降低高负载结点被误判脱离的几率。 
 

kennywu76 - Wood

赞同来自: code4j wayne

discovery.zen.ping_timeout 设置比较长,的确会延缓选主的时间,这点官方文档有说明。 根据你贴的代码看,选举master的时候,会连续发送3次ping测试,顺序是这样的:


  1. 发送第一轮ping
  2.  shedule第二轮ping,间隔为1/2 timeout时间
  3.  schedule第三轮 ping,间隔为 1/2 timeout时间。 
  4.  第三轮sendpings传递了waitTime参数,其值也是1/2 timeout时间,用于设置countdown latch await时长。如果对每个node的ping测试很快顺利完成,latch countdown应该也是瞬间的,这里几乎没有什么耗时。
  5. 通知listener结果,结束选主过程。


 
假设discovery.zen.ping_timeout是默认的3s, 并且所有结点都正常工作,立即响应ping请求。那么上述步骤耗时大致应该为:


  1.  ~ 0
  2. 1.5s
  3. 1.5s 
  4. ~ 0
  5. ~ 0


即大约3s完成,也就是选主过程基本和timeout时长一致。 
 
再假设只有第一轮检测timeout,后面两轮顺利,则这个过程耗时应该大致为:


  1. ~ 3s  (timed out)
  2. 1.5s
  3. 1.5s
  4. ~ 0 
  5. ~ 0


总共是6s。 
 
所以discovery.zen.ping_timeout 这个参数设置比较大,可以减少master因为负载过重掉出集群的风险。 但同时如果master真出问题了,重新选举过程会很长。
 
PS. 我在我本地机器上,用默认的3s超时设置,测试将master停止,选出新master的耗时差不多就是3s多一点点。
[2018-05-11T16:45:09,484][INFO ][o.e.d.z.ZenDiscovery     ] [node-2] master_left [{node-1}{3nqaDcu_SCO8JC9MJrBY8g}{E3SdgEkTSEii_G1W8XN-zQ}{10.32.10.11}{10.32.10.11:9300}{ml.machine_memory=17179869184, ml.max_open_jobs=20, ml.enabled=true}], reason [shut_down]
[2018-05-11T16:45:09,485][WARN ][o.e.d.z.ZenDiscovery ] [node-2] master left (reason = shut_down), current nodes: nodes:
{node-2}{HiyRXBBfR1i-odeE5iLcdw}{i_isvs7ySIK2KXISxISVHw}{10.32.10.11}{10.32.10.11:9301}{ml.machine_memory=17179869184, ml.max_open_jobs=20, ml.enabled=true}, local
{node-1}{3nqaDcu_SCO8JC9MJrBY8g}{E3SdgEkTSEii_G1W8XN-zQ}{10.32.10.11}{10.32.10.11:9300}{ml.machine_memory=17179869184, ml.max_open_jobs=20, ml.enabled=true}, master
{node-3}{wko23-UqTUux31z28IYsnw}{k2jPMlnLQFefLNaYd_9siw}{10.32.10.11}{10.32.10.11:9302}{ml.machine_memory=17179869184, ml.max_open_jobs=20, ml.enabled=true}

[2018-05-11T16:45:09,488][INFO ][o.e.x.w.WatcherService ] [node-2] stopping watch service, reason [no master node]
[2018-05-11T16:45:12,507][INFO ][o.e.c.s.MasterService ] [node-2] zen-disco-elected-as-master ([1] nodes joined)[{node-3}{wko23-UqTUux31z28IYsnw}{k2jPMlnLQFefLNaYd_9siw}{10.32.10.11}{10.32.10.11:9302}{ml.machine_memory=17179869184, ml.max_open_jobs=20, ml.enabled=true}], reason: new_master {node-2}{HiyRXBBfR1i-odeE5iLcdw}{i_isvs7ySIK2KXISxISVHw}{10.32.10.11}{10.32.10.11:9301}{ml.machine_memory=17179869184, ml.max_open_jobs=20, ml.enabled=true}

 
 
 
 

yayg2008

赞同来自:

@kennywu76分析的非常详细。我之前也遇到过这个困惑,当时想着为了防止节点误脱离,将discovery.zen.ping_timeout 这个参数设置了120秒。导致的后果就是节点启动非常慢,慢的原因就是因为要做3轮ping。每一轮之间都会有一个增量delay,加起来差不多要等待180秒,虽然每一次ping 都几乎是实时响应。
所以我的建议是这个参数保持默认3秒就好。至于节点脱离问题,其实是由另一个参数 discovery.zen.fd.ping_timeout 控制的。

liuliuliu

赞同来自:

这个问题现在有解决方法了吗?最近也遇到了这种情况,集群压力不是很大,网络状态也很好,可是集群间隔一两天就有节点脱离集群,然后12分钟左右,集群又恢复正常。我们用的是es6.8版本。

要回复问题请先登录注册