From b2f24f94591082d3ff82bd3db1760b14603b38aa Mon Sep 17 00:00:00 2001
From: Tejas Patil <tejasp@fb.com>
Date: Fri, 15 Jul 2016 14:27:16 -0700
Subject: [PATCH] [SPARK-16230][CORE] CoarseGrainedExecutorBackend to self kill
 if there is an exception while creating an Executor

## What changes were proposed in this pull request?

With the fix from SPARK-13112, I see that `LaunchTask` is always processed after `RegisteredExecutor` is done, so executor startup gets a chance to do all of its retries. There is still a problem: if the `Executor` creation itself fails with an exception, the failure goes unnoticed, and the executor is killed only later, when it tries to process a `LaunchTask` while `executor` is null: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L88 So from the logs alone, one cannot tell that there was a problem during `Executor` creation and that this is why the executor was killed.
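
To make the failure mode concrete, here is a minimal standalone sketch (all names such as `NullExecutorDemo` are hypothetical stand-ins, not the actual Spark code path):

```scala
// Standalone sketch of the failure mode: an exception while constructing the
// executor leaves the field null, and the real cause surfaces only as a
// confusing null check on a later message.
object NullExecutorDemo {
  private var executor: AnyRef = null

  // Imagine this is the Executor constructor failing at startup.
  private def createExecutor(): AnyRef =
    throw new RuntimeException("executor creation failed")

  private def onRegisteredExecutor(): Unit = {
    executor = createExecutor() // throws; executor stays null
  }

  private def onLaunchTask(): Unit = {
    if (executor == null) {
      // Pre-fix, this was the only symptom visible in the logs; the
      // creation failure itself was nowhere to be seen.
      System.err.println("Received LaunchTask command but executor was null")
      sys.exit(1)
    }
  }

  def main(args: Array[String]): Unit = {
    // The catch stands in for whatever swallowed the exception upstream.
    try onRegisteredExecutor() catch { case _: Throwable => () }
    onLaunchTask()
  }
}
```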

This PR explicitly catches exceptions during `Executor` creation, logs a proper message, and then exits the JVM. I have also changed the `exitExecutor` method to accept a `reason`, so that backends can use it, for example to log exits to a DB and aggregate such failures at the cluster level.
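
As a sketch of what a backend can now do with the `reason` (a minimal standalone example; `ExecutorBackendBase`, `ReportingExecutorBackend`, and `recordExit` are hypothetical stand-ins, not Spark classes):

```scala
// Minimal standalone sketch of the exitExecutor(code, reason, throwable) hook.
class ExecutorBackendBase {
  protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null): Unit = {
    // Log the reason (and stack trace, if any) before taking the JVM down.
    System.err.println(s"ERROR: $reason")
    if (throwable != null) {
      throwable.printStackTrace()
    }
    System.exit(code)
  }
}

class ReportingExecutorBackend extends ExecutorBackendBase {
  // A concrete backend can intercept the reason, e.g. to persist it for
  // cluster-level aggregation, before the default behavior exits the JVM.
  override protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null): Unit = {
    recordExit(code, reason) // hypothetical persistence hook (e.g. a DB write)
    super.exitExecutor(code, reason, throwable)
  }

  private def recordExit(code: Int, reason: String): Unit =
    println(s"recording executor exit: code=$code reason=$reason")
}
```

Routing every exit path through this one method also guarantees that each exit produces a `logError` with its cause, instead of each call site logging (or forgetting to log) separately.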

## How was this patch tested?

I am relying on existing tests.

Author: Tejas Patil <tejasp@fb.com>

Closes #14202 from tejasapatil/exit_executor_failure.
---
 .../CoarseGrainedExecutorBackend.scala        | 32 ++++++++++++-------
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index ccc6c36e9c..e30839c49c 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.collection.mutable
 import scala.util.{Failure, Success}
+import scala.util.control.NonFatal
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
@@ -64,8 +65,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       case Success(msg) =>
         // Always receive `true`. Just ignore it
       case Failure(e) =>
-        logError(s"Cannot register with driver: $driverUrl", e)
-        exitExecutor(1)
+        exitExecutor(1, s"Cannot register with driver: $driverUrl", e)
     }(ThreadUtils.sameThread)
   }
 
@@ -78,16 +78,19 @@ private[spark] class CoarseGrainedExecutorBackend(
   override def receive: PartialFunction[Any, Unit] = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
-      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
+      try {
+        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
+      } catch {
+        case NonFatal(e) =>
+          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
+      }
 
     case RegisterExecutorFailed(message) =>
-      logError("Slave registration failed: " + message)
-      exitExecutor(1)
+      exitExecutor(1, "Slave registration failed: " + message)
 
     case LaunchTask(data) =>
       if (executor == null) {
-        logError("Received LaunchTask command but executor was null")
-        exitExecutor(1)
+        exitExecutor(1, "Received LaunchTask command but executor was null")
       } else {
         val taskDesc = ser.deserialize[TaskDescription](data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
@@ -97,8 +100,7 @@ private[spark] class CoarseGrainedExecutorBackend(
 
     case KillTask(taskId, _, interruptThread) =>
       if (executor == null) {
-        logError("Received KillTask command but executor was null")
-        exitExecutor(1)
+        exitExecutor(1, "Received KillTask command but executor was null")
       } else {
         executor.killTask(taskId, interruptThread)
       }
@@ -127,8 +129,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     if (stopping.get()) {
       logInfo(s"Driver from $remoteAddress disconnected during shutdown")
     } else if (driver.exists(_.address == remoteAddress)) {
-      logError(s"Driver $remoteAddress disassociated! Shutting down.")
-      exitExecutor(1)
+      exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.")
     } else {
       logWarning(s"An unknown ($remoteAddress) driver disconnected.")
     }
@@ -147,7 +148,14 @@ private[spark] class CoarseGrainedExecutorBackend(
    * executor exits differently. For e.g. when an executor goes down,
    * back-end may not want to take the parent process down.
    */
-  protected def exitExecutor(code: Int): Unit = System.exit(code)
+  protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null): Unit = {
+    if (throwable != null) {
+      logError(reason, throwable)
+    } else {
+      logError(reason)
+    }
+    System.exit(code)
+  }
 }
 
 private[spark] object CoarseGrainedExecutorBackend extends Logging {
-- 
GitLab