From b479278142728eb003b9ee466fab0e8d6ec4b13d Mon Sep 17 00:00:00 2001
From: Tejas Patil <tejasp@fb.com>
Date: Thu, 15 Sep 2016 10:23:41 -0700
Subject: [PATCH] [SPARK-17451][CORE] CoarseGrainedExecutorBackend should
 inform driver before self-kill

## What changes were proposed in this pull request?

Jira : https://issues.apache.org/jira/browse/SPARK-17451

In some failure cases, `CoarseGrainedExecutorBackend` exits the JVM. While this is not a problem in itself, the driver UI captures no specific reason for the exit. This PR adds functionality to `exitExecutor` to notify the driver that the executor is exiting, along with the reason.
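
At a high level, the new behavior is a notify-then-exit pattern. The following is a simplified sketch of the change in the diff below, not the verbatim code; `driver`, `executorId`, `RemoveExecutor`, and `ExecutorLossReason` come from the surrounding executor/scheduler code:

```scala
import org.apache.spark.scheduler.ExecutorLossReason
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.util.ThreadUtils

// Sketch: before the JVM exits, best-effort tell the driver why, so the
// failure reason shows up in the driver UI instead of the generic
// "Remote RPC client disassociated" message.
def exitExecutor(code: Int, reason: String, notifyDriver: Boolean = true): Unit = {
  logError("Executor self-exiting due to : " + reason)
  if (notifyDriver && driver.nonEmpty) {
    // Async ask; a failed notification must not block the shutdown,
    // so it is only logged.
    driver.get.ask[Boolean](RemoveExecutor(executorId, new ExecutorLossReason(reason)))
      .onFailure { case e =>
        logWarning("Unable to notify the driver due to " + e.getMessage, e)
      }(ThreadUtils.sameThread)
  }
  System.exit(code)
}
```

Call sites where the driver is unreachable or already aware of the failure (registration failure, driver disassociation) opt out via `notifyDriver = false`.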

## How was this patch tested?

Ran the change in a test environment and took down the shuffle service before the executor could register with it. In the driver logs, where the job failure reason is mentioned (i.e. `Job aborted due to stage ...`), the correct reason is now given:

Before:
`ExecutorLostFailure (executor ZZZZZZZZZ exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.`

After:
`ExecutorLostFailure (executor ZZZZZZZZZ exited caused by one of the running tasks) Reason: Unable to create executor due to java.util.concurrent.TimeoutException: Timeout waiting for task.`
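
The improved reason above originates from the `BlockManager` change in this patch: once the registration retries against the external shuffle server are exhausted, the error is rethrown as a `SparkException` instead of being swallowed, and `exitExecutor` then relays it to the driver. A simplified sketch of the retry loop follows, assuming the existing `MAX_ATTEMPTS` and `SLEEP_TIME_SECS` constants and a hypothetical `registerOnce()` helper standing in for the real shuffle-client call:

```scala
import scala.util.control.NonFatal
import org.apache.spark.SparkException

for (i <- 1 to MAX_ATTEMPTS) {
  try {
    registerOnce()  // hypothetical stand-in for registering with the shuffle server
    return
  } catch {
    case e: Exception if i < MAX_ATTEMPTS =>
      // Transient failure: log, wait, and retry.
      logError(s"Failed to connect to external shuffle server, will retry " +
        s"${MAX_ATTEMPTS - i} more times after waiting $SLEEP_TIME_SECS seconds...", e)
      Thread.sleep(SLEEP_TIME_SECS * 1000)
    case NonFatal(e) =>
      // Out of retries: surface the root cause so exitExecutor can report it.
      throw new SparkException(
        "Unable to register with external shuffle server due to : " + e.getMessage, e)
  }
}
```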

Author: Tejas Patil <tejasp@fb.com>

Closes #15013 from tejasapatil/SPARK-17451_inform_driver.
---
 .../CoarseGrainedExecutorBackend.scala        | 26 ++++++++++++++-----
 .../apache/spark/storage/BlockManager.scala   |  3 +++
 2 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 391b97d73e..7eec4ae64f 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
-import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -65,7 +65,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       case Success(msg) =>
         // Always receive `true`. Just ignore it
       case Failure(e) =>
-        exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
+        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
     }(ThreadUtils.sameThread)
   }
 
@@ -129,7 +129,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     if (stopping.get()) {
       logInfo(s"Driver from $remoteAddress disconnected during shutdown")
     } else if (driver.exists(_.address == remoteAddress)) {
-      exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.")
+      exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.", null,
+        notifyDriver = false)
     } else {
       logWarning(s"An unknown ($remoteAddress) driver disconnected.")
     }
@@ -148,12 +149,25 @@ private[spark] class CoarseGrainedExecutorBackend(
    * executor exits differently. For e.g. when an executor goes down,
    * back-end may not want to take the parent process down.
    */
-  protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null) = {
+  protected def exitExecutor(code: Int,
+                             reason: String,
+                             throwable: Throwable = null,
+                             notifyDriver: Boolean = true) = {
+    val message = "Executor self-exiting due to : " + reason
     if (throwable != null) {
-      logError(reason, throwable)
+      logError(message, throwable)
     } else {
-      logError(reason)
+      logError(message)
     }
+
+    if (notifyDriver && driver.nonEmpty) {
+      driver.get.ask[Boolean](
+        RemoveExecutor(executorId, new ExecutorLossReason(reason))
+      ).onFailure { case e =>
+        logWarning(s"Unable to notify the driver due to " + e.getMessage, e)
+      }(ThreadUtils.sameThread)
+    }
+
     System.exit(code)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a724fdf009..c172ac2cdc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -199,6 +199,9 @@ private[spark] class BlockManager(
           logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}"
             + s" more times after waiting $SLEEP_TIME_SECS seconds...", e)
           Thread.sleep(SLEEP_TIME_SECS * 1000)
+        case NonFatal(e) =>
+          throw new SparkException("Unable to register with external shuffle server due to : " +
+            e.getMessage, e)
       }
     }
   }
-- 
GitLab