From ef3fb801ae971656ed9cd1b0ab95bc5a1548adbd Mon Sep 17 00:00:00 2001 From: zsxwing <zsxwing@gmail.com> Date: Thu, 16 Apr 2015 13:45:55 -0500 Subject: [PATCH] [SPARK-6934][Core] Use 'spark.akka.askTimeout' for the ask timeout Fixed my mistake in #4588 Author: zsxwing <zsxwing@gmail.com> Closes #5529 from zsxwing/SPARK-6934 and squashes the following commits: 9890b2d [zsxwing] Use 'spark.akka.askTimeout' for the ask timeout --- core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala index e259867c14..f2c1c86af7 100644 --- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala @@ -284,7 +284,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf) private[this] val maxRetries = conf.getInt("spark.akka.num.retries", 3) private[this] val retryWaitMs = conf.getLong("spark.akka.retry.wait", 3000) - private[this] val defaultTimeout = conf.getLong("spark.akka.lookupTimeout", 30) seconds + private[this] val defaultAskTimeout = conf.getLong("spark.akka.askTimeout", 30) seconds /** * return the address for the [[RpcEndpointRef]] @@ -304,7 +304,8 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf) * * This method only sends the message once and never retries. */ - def sendWithReply[T: ClassTag](message: Any): Future[T] = sendWithReply(message, defaultTimeout) + def sendWithReply[T: ClassTag](message: Any): Future[T] = + sendWithReply(message, defaultAskTimeout) /** * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a `Future` to @@ -327,7 +328,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf) * @tparam T type of the reply message * @return the reply message from the corresponding [[RpcEndpoint]] */ - def askWithReply[T: ClassTag](message: Any): T = askWithReply(message, defaultTimeout) + def askWithReply[T: ClassTag](message: Any): T = askWithReply(message, defaultAskTimeout) /** * Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a -- GitLab