From f68d024096c90936f9aa4e325141b39f08c72476 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin <vanzin@cloudera.com>
Date: Mon, 17 Aug 2015 10:34:22 -0700
Subject: [PATCH] [SPARK-7736] [CORE] [YARN] Make pyspark fail YARN app on
 failure.

The YARN backend doesn't like it when user code calls `System.exit`,
since it cannot know the exit status and thus cannot set an
appropriate final status for the application.

So, for pyspark, avoid that call and instead throw an exception that
carries the exit code. SparkSubmit handles that exception and exits
with the given code, while YARN uses the exit code as the failure
code for the Spark app.
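
For illustration, a minimal standalone sketch of the same pattern
(`Launcher`, `ChildExitException`, and `runChild` are hypothetical
names invented for this sketch, not part of the patch):

    object Launcher {
      // Wrap a child's nonzero exit code in an exception instead of
      // calling System.exit() from library code.
      case class ChildExitException(exitCode: Int)
        extends Exception(s"Child exited with $exitCode")

      def runChild(cmd: String*): Unit = {
        val proc = new ProcessBuilder(cmd: _*).inheritIO().start()
        val code = proc.waitFor()
        if (code != 0) throw ChildExitException(code)
      }

      // Only the outermost entry point turns the exception back into
      // a process exit status; intermediate layers (the YARN AM in
      // this patch) can observe the failure and report it first.
      def main(args: Array[String]): Unit = {
        try runChild("python", "app.py") catch {
          case ChildExitException(code) => sys.exit(code)
        }
      }
    }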

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #7751 from vanzin/SPARK-9416.
---
 .../org/apache/spark/SparkException.scala     |  7 ++++++
 .../apache/spark/deploy/PythonRunner.scala    | 25 +++++++++++++++++++++----
 .../org/apache/spark/deploy/SparkSubmit.scala | 10 ++++++--
 .../spark/deploy/yarn/ApplicationMaster.scala |  8 +++++--
 4 files changed, 42 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala
index 2ebd7a7151..977a27bdfe 100644
--- a/core/src/main/scala/org/apache/spark/SparkException.scala
+++ b/core/src/main/scala/org/apache/spark/SparkException.scala
@@ -30,3 +30,10 @@ class SparkException(message: String, cause: Throwable)
  */
 private[spark] class SparkDriverExecutionException(cause: Throwable)
   extends SparkException("Execution error", cause)
+
+/**
+ * Exception thrown when the main user code is run as a child process (e.g. pyspark) and we want
+ * the parent SparkSubmit process to exit with the same exit code.
+ */
+private[spark] case class SparkUserAppException(exitCode: Int)
+  extends SparkException(s"User application exited with $exitCode")
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index c2ed43a539..4277ac2ad1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 import scala.util.Try
 
+import org.apache.spark.SparkUserAppException
 import org.apache.spark.api.python.PythonUtils
 import org.apache.spark.util.{RedirectThread, Utils}
 
@@ -46,7 +47,16 @@ object PythonRunner {
     // Launch a Py4J gateway server for the process to connect to; this will let it see our
     // Java system properties and such
     val gatewayServer = new py4j.GatewayServer(null, 0)
-    gatewayServer.start()
+    val thread = new Thread(new Runnable() {
+      override def run(): Unit = Utils.logUncaughtExceptions {
+        gatewayServer.start()
+      }
+    })
+    thread.setName("py4j-gateway")
+    thread.setDaemon(true)
+    thread.start()
+    // Wait until the gateway server has started, so that we know which port it's bound to.
+    thread.join()
 
     // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
     // python directories in SPARK_HOME (if set), and any files in the pyFiles argument
@@ -64,11 +74,18 @@ object PythonRunner {
     env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
     env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
     builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
-    val process = builder.start()
+    try {
+      val process = builder.start()
 
-    new RedirectThread(process.getInputStream, System.out, "redirect output").start()
+      new RedirectThread(process.getInputStream, System.out, "redirect output").start()
 
-    System.exit(process.waitFor())
+      val exitCode = process.waitFor()
+      if (exitCode != 0) {
+        throw new SparkUserAppException(exitCode)
+      }
+    } finally {
+      gatewayServer.shutdown()
+    }
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 02fa3088ed..86fcf942c2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -39,8 +39,8 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher
 import org.apache.ivy.plugins.repository.file.FileRepository
 import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
 
+import org.apache.spark.{SparkUserAppException, SPARK_VERSION}
 import org.apache.spark.api.r.RUtils
-import org.apache.spark.SPARK_VERSION
 import org.apache.spark.deploy.rest._
 import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
 
@@ -672,7 +672,13 @@ object SparkSubmit {
       mainMethod.invoke(null, childArgs.toArray)
     } catch {
       case t: Throwable =>
-        throw findCause(t)
+        findCause(t) match {
+          case SparkUserAppException(exitCode) =>
+            System.exit(exitCode)
+
+          case t: Throwable =>
+            throw t
+        }
     }
   }
 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 6a8ddb37b2..991b5cec00 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -30,8 +30,8 @@ import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.spark.rpc._
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
-import org.apache.spark.SparkException
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv,
+  SparkException, SparkUserAppException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
@@ -530,6 +530,10 @@ private[spark] class ApplicationMaster(
             e.getCause match {
               case _: InterruptedException =>
                 // Reporter thread can interrupt to stop user class
+              case SparkUserAppException(exitCode) =>
+                val msg = s"User application exited with status $exitCode"
+                logError(msg)
+                finish(FinalApplicationStatus.FAILED, exitCode, msg)
               case cause: Throwable =>
                 logError("User class threw exception: " + cause, cause)
                 finish(FinalApplicationStatus.FAILED,
-- 
GitLab