diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index 2ebd7a7151a59438615acd969b19e3a80e8be34e..977a27bdfe1bfbed0113e440ce4655533dd38057 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -30,3 +30,10 @@ class SparkException(message: String, cause: Throwable) */ private[spark] class SparkDriverExecutionException(cause: Throwable) extends SparkException("Execution error", cause) + +/** + * Exception thrown when the main user code is run as a child process (e.g. pyspark) and we want + * the parent SparkSubmit process to exit with the same exit code. + */ +private[spark] case class SparkUserAppException(exitCode: Int) + extends SparkException(s"User application exited with $exitCode") diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index c2ed43a5397d6627ced47ee14f640aef3c964fc8..4277ac2ad13eabb24dbb458ea66412a65d48cb3c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ import scala.util.Try +import org.apache.spark.SparkUserAppException import org.apache.spark.api.python.PythonUtils import org.apache.spark.util.{RedirectThread, Utils} @@ -46,7 +47,14 @@ object PythonRunner { // Launch a Py4J gateway server for the process to connect to; this will let it see our // Java system properties and such val gatewayServer = new py4j.GatewayServer(null, 0) - gatewayServer.start() + val thread = new Thread(new Runnable() { + override def run(): Unit = Utils.logUncaughtExceptions { + gatewayServer.start() + } + }) + thread.setName("py4j-gateway") + thread.setDaemon(true) + thread.start() // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the // python directories in SPARK_HOME (if set), and any files in the pyFiles argument @@ -64,11 +72,18 @@ object PythonRunner { env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort) builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize - val process = builder.start() + try { + val process = builder.start() - new RedirectThread(process.getInputStream, System.out, "redirect output").start() + new RedirectThread(process.getInputStream, System.out, "redirect output").start() - System.exit(process.waitFor()) + val exitCode = process.waitFor() + if (exitCode != 0) { + throw new SparkUserAppException(exitCode) + } + } finally { + gatewayServer.shutdown() + } } /** diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 02fa3088eded079df0c46e6efc3d3bef91f440c7..86fcf942c2c4e4d07b0b2f25208cb91ab9dec807 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -39,8 +39,8 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher import org.apache.ivy.plugins.repository.file.FileRepository import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver} +import org.apache.spark.{SparkUserAppException, SPARK_VERSION} import org.apache.spark.api.r.RUtils -import org.apache.spark.SPARK_VERSION import org.apache.spark.deploy.rest._ import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils} @@ -672,7 +672,13 @@ object SparkSubmit { mainMethod.invoke(null, childArgs.toArray) } catch { case t: Throwable => - throw findCause(t) + findCause(t) match { + case SparkUserAppException(exitCode) => + System.exit(exitCode) + + case t: Throwable => + throw t + } } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 6a8ddb37b29e887460c10639b21bd7a20d7d2bb9..991b5cec00bd8c3c6053a939499c6ccd72cf3437 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -30,8 +30,8 @@ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.spark.rpc._ -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv} -import org.apache.spark.SparkException +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv, + SparkException, SparkUserAppException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend} @@ -530,6 +530,10 @@ private[spark] class ApplicationMaster( e.getCause match { case _: InterruptedException => // Reporter thread can interrupt to stop user class + case SparkUserAppException(exitCode) => + val msg = s"User application exited with status $exitCode" + logError(msg) + finish(FinalApplicationStatus.FAILED, exitCode, msg) case cause: Throwable => logError("User class threw exception: " + cause, cause) finish(FinalApplicationStatus.FAILED,