Skip to content
Snippets Groups Projects
Commit f68d0240 authored by Marcelo Vanzin's avatar Marcelo Vanzin
Browse files

[SPARK-7736] [CORE] [YARN] Make pyspark fail YARN app on failure.

The YARN backend doesn't like when user code calls `System.exit`,
since it cannot know the exit status and thus cannot set an
appropriate final status for the application.

So, for pyspark, avoid that call and instead throw an exception with
the exit code. SparkSubmit handles that exception and exits with
the given exit code, while YARN uses the exit code as the failure
code for the Spark app.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #7751 from vanzin/SPARK-9416.
parent ed092a06
No related branches found
No related tags found
No related merge requests found
......@@ -30,3 +30,10 @@ class SparkException(message: String, cause: Throwable)
*/
private[spark] class SparkDriverExecutionException(cause: Throwable)
extends SparkException("Execution error", cause)
/**
* Exception thrown when the main user code is run as a child process (e.g. pyspark) and we want
* the parent SparkSubmit process to exit with the same exit code.
*
* PythonRunner throws this when the child process finishes with a non-zero exit code, instead of
* calling `System.exit` itself. SparkSubmit matches on it and exits with `exitCode`, while the
* YARN ApplicationMaster matches on it and finishes the application as FAILED with `exitCode`.
*
* @param exitCode exit code reported by the user application's process
*/
private[spark] case class SparkUserAppException(exitCode: Int)
extends SparkException(s"User application exited with $exitCode")
......@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.util.Try
import org.apache.spark.SparkUserAppException
import org.apache.spark.api.python.PythonUtils
import org.apache.spark.util.{RedirectThread, Utils}
......@@ -46,7 +47,14 @@ object PythonRunner {
// Launch a Py4J gateway server for the process to connect to; this will let it see our
// Java system properties and such
val gatewayServer = new py4j.GatewayServer(null, 0)
gatewayServer.start()
val thread = new Thread(new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions {
gatewayServer.start()
}
})
thread.setName("py4j-gateway")
thread.setDaemon(true)
thread.start()
// Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
// python directories in SPARK_HOME (if set), and any files in the pyFiles argument
......@@ -64,11 +72,18 @@ object PythonRunner {
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
val process = builder.start()
try {
val process = builder.start()
new RedirectThread(process.getInputStream, System.out, "redirect output").start()
new RedirectThread(process.getInputStream, System.out, "redirect output").start()
System.exit(process.waitFor())
val exitCode = process.waitFor()
if (exitCode != 0) {
throw new SparkUserAppException(exitCode)
}
} finally {
gatewayServer.shutdown()
}
}
/**
......
......@@ -39,8 +39,8 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{FileSystemResolver, ChainResolver, IBiblioResolver}
import org.apache.spark.{SparkUserAppException, SPARK_VERSION}
import org.apache.spark.api.r.RUtils
import org.apache.spark.SPARK_VERSION
import org.apache.spark.deploy.rest._
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
......@@ -672,7 +672,13 @@ object SparkSubmit {
mainMethod.invoke(null, childArgs.toArray)
} catch {
case t: Throwable =>
throw findCause(t)
findCause(t) match {
case SparkUserAppException(exitCode) =>
System.exit(exitCode)
case t: Throwable =>
throw t
}
}
}
......
......@@ -30,8 +30,8 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.rpc._
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.SparkException
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv,
SparkException, SparkUserAppException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend}
......@@ -530,6 +530,10 @@ private[spark] class ApplicationMaster(
e.getCause match {
case _: InterruptedException =>
// Reporter thread can interrupt to stop user class
case SparkUserAppException(exitCode) =>
val msg = s"User application exited with status $exitCode"
logError(msg)
finish(FinalApplicationStatus.FAILED, exitCode, msg)
case cause: Throwable =>
logError("User class threw exception: " + cause, cause)
finish(FinalApplicationStatus.FAILED,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment