diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index ef876b1d8c15ae8fa8d27f359b9c75e8cee7d2d3..9ae74d9d7b898c0598b7cacf530edfdf605fb238 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -211,33 +211,37 @@ private[netty] class NettyRpcEnv( } } - if (remoteAddr == address) { - val p = Promise[Any]() - p.future.onComplete { - case Success(response) => onSuccess(response) - case Failure(e) => onFailure(e) - }(ThreadUtils.sameThread) - dispatcher.postLocalMessage(message, p) - } else { - val rpcMessage = RpcOutboxMessage(serialize(message), - onFailure, - (client, response) => onSuccess(deserialize[Any](client, response))) - postToOutbox(message.receiver, rpcMessage) - promise.future.onFailure { - case _: TimeoutException => rpcMessage.onTimeout() - case _ => + try { + if (remoteAddr == address) { + val p = Promise[Any]() + p.future.onComplete { + case Success(response) => onSuccess(response) + case Failure(e) => onFailure(e) + }(ThreadUtils.sameThread) + dispatcher.postLocalMessage(message, p) + } else { + val rpcMessage = RpcOutboxMessage(serialize(message), + onFailure, + (client, response) => onSuccess(deserialize[Any](client, response))) + postToOutbox(message.receiver, rpcMessage) + promise.future.onFailure { + case _: TimeoutException => rpcMessage.onTimeout() + case _ => + }(ThreadUtils.sameThread) + } + + val timeoutCancelable = timeoutScheduler.schedule(new Runnable { + override def run(): Unit = { + onFailure(new TimeoutException(s"Cannot receive any reply in ${timeout.duration}")) + } + }, timeout.duration.toNanos, TimeUnit.NANOSECONDS) + promise.future.onComplete { v => + timeoutCancelable.cancel(true) }(ThreadUtils.sameThread) + } catch { + case NonFatal(e) => + onFailure(e) } - - val timeoutCancelable = timeoutScheduler.schedule(new Runnable { - override def run(): Unit = { - promise.tryFailure( - new TimeoutException(s"Cannot receive any reply in ${timeout.duration}")) - } - }, timeout.duration.toNanos, TimeUnit.NANOSECONDS) - promise.future.onComplete { v => - timeoutCancelable.cancel(true) - }(ThreadUtils.sameThread) promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread) }