From a41b68b954ba47284a1df312f0aaea29b0721b0a Mon Sep 17 00:00:00 2001
From: Nilanjan Raychaudhuri <nraychaudhuri@gmail.com>
Date: Mon, 1 Feb 2016 13:33:24 -0800
Subject: [PATCH] [SPARK-12265][MESOS] Spark calls System.exit inside driver
 instead of throwing exception

This takes over #10729 and makes sure that `spark-shell` fails with a proper error message. There is a slight behavioral change: before this change `spark-shell` would exit, while now the REPL is still there, but `sc` and `sqlContext` are not defined and the error is visible to the user.

Author: Nilanjan Raychaudhuri <nraychaudhuri@gmail.com>
Author: Iulian Dragos <jaguarul@gmail.com>

Closes #10921 from dragos/pr/10729.
---
 .../cluster/mesos/MesosClusterScheduler.scala |  1 +
 .../cluster/mesos/MesosSchedulerBackend.scala |  1 +
 .../cluster/mesos/MesosSchedulerUtils.scala   | 21 +++++++++++++++----
 3 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 05fda0fded..e77d77208c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -573,6 +573,7 @@ private[spark] class MesosClusterScheduler(
   override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit = {}
   override def error(driver: SchedulerDriver, error: String): Unit = {
     logError("Error received: " + error)
+    markErr()
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index eaf0cb06d6..a8bf79a78c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -375,6 +375,7 @@ private[spark] class MesosSchedulerBackend(
   override def error(d: SchedulerDriver, message: String) {
     inClassLoader() {
       logError("Mesos error: " + message)
+      markErr()
       scheduler.error(message)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 010caff3e3..f9f5da9bc8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -106,28 +106,37 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
         registerLatch.await()
         return
       }
+      @volatile
+      var error: Option[Exception] = None
 
+      // We create a new thread that will block inside `mesosDriver.run`
+      // until the scheduler exists
       new Thread(Utils.getFormattedClassName(this) + "-mesos-driver") {
         setDaemon(true)
-
         override def run() {
-          mesosDriver = newDriver
           try {
+            mesosDriver = newDriver
             val ret = mesosDriver.run()
             logInfo("driver.run() returned with code " + ret)
             if (ret != null && ret.equals(Status.DRIVER_ABORTED)) {
-              System.exit(1)
+              error = Some(new SparkException("Error starting driver, DRIVER_ABORTED"))
+              markErr()
             }
           } catch {
             case e: Exception => {
               logError("driver.run() failed", e)
-              System.exit(1)
+              error = Some(e)
+              markErr()
             }
           }
         }
       }.start()
 
       registerLatch.await()
+
+      // propagate any error to the calling thread. This ensures that SparkContext creation fails
+      // without leaving a broken context that won't be able to schedule any tasks
+      error.foreach(throw _)
     }
   }
 
@@ -144,6 +153,10 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     registerLatch.countDown()
   }
 
+  protected def markErr(): Unit = {
+    registerLatch.countDown()
+  }
+
   def createResource(name: String, amount: Double, role: Option[String] = None): Resource = {
     val builder = Resource.newBuilder()
       .setName(name)
-- 
GitLab