diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 640d7bbcbee7bbd37ede98edcee693ac52779b1c..4f49b078bdc66355e5d84ca8ed5ce8850b41792b 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -24,6 +24,7 @@ import java.util.concurrent._ import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.scheduler._ @@ -368,12 +369,17 @@ private[spark] class Executor( } val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId) - val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef, - retryAttempts, retryIntervalMs, timeout) - if (response.reregisterBlockManager) { - logWarning("Told to re-register on heartbeat") - env.blockManager.reregister() + try { + val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef, + retryAttempts, retryIntervalMs, timeout) + if (response.reregisterBlockManager) { + logWarning("Told to re-register on heartbeat") + env.blockManager.reregister() + } + } catch { + case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t) } + Thread.sleep(interval) } }