From ffd97be32a53d033ed5ca7545b6d84f0794774cf Mon Sep 17 00:00:00 2001
From: Sandy Ryza <sandy@cloudera.com>
Date: Tue, 23 Sep 2014 13:44:18 -0700
Subject: [PATCH] SPARK-3612. Executor shouldn't quit if heartbeat message
 fails to reach ...

...the driver

Author: Sandy Ryza <sandy@cloudera.com>

Closes #2487 from sryza/sandy-spark-3612 and squashes the following commits:

2b7353d [Sandy Ryza] SPARK-3612. Executor shouldn't quit if heartbeat message fails to reach the driver
(cherry picked from commit d79238d03a2ffe0cf5fc6166543d67768693ddbe)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>
---
 .../org/apache/spark/executor/Executor.scala     | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 640d7bbcbe..4f49b078bd 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -24,6 +24,7 @@ import java.util.concurrent._
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.util.control.NonFatal
 
 import org.apache.spark._
 import org.apache.spark.scheduler._
@@ -368,12 +369,17 @@ private[spark] class Executor(
           }
 
           val message = Heartbeat(executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
-          val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
-            retryAttempts, retryIntervalMs, timeout)
-          if (response.reregisterBlockManager) {
-            logWarning("Told to re-register on heartbeat")
-            env.blockManager.reregister()
+          try {
+            val response = AkkaUtils.askWithReply[HeartbeatResponse](message, heartbeatReceiverRef,
+              retryAttempts, retryIntervalMs, timeout)
+            if (response.reregisterBlockManager) {
+              logWarning("Told to re-register on heartbeat")
+              env.blockManager.reregister()
+            }
+          } catch {
+            case NonFatal(t) => logWarning("Issue communicating with driver in heartbeater", t)
           }
+
           Thread.sleep(interval)
         }
       }
-- 
GitLab