Skip to content
Snippets Groups Projects
Commit bad0f7db authored by peng.zhang, committed by Marcelo Vanzin
Browse files

[SPARK-16095][YARN] Yarn cluster mode should report correct state to SparkLauncher

## What changes were proposed in this pull request?
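In YARN cluster mode, the client should report the application's actual final state to SparkLauncher: when YARN reports the application as FINISHED, its final application status is now consulted so that a failed or killed application is reported as FAILED or KILLED rather than FINISHED.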

## How was this patch tested?
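Unit test: a new YarnClusterSuite case, "run Spark in yarn-cluster mode failure after sc initialized", runs a driver that throws after creating its SparkContext and expects the launcher to report FAILED.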

Author: peng.zhang <peng.zhang@xiaomi.com>

Closes #13962 from renozhang/SPARK-16095-spark-launcher-wrong-state.
parent d17e5f2f
No related branches found
No related tags found
No related merge requests found
......@@ -1080,7 +1080,14 @@ private[spark] class Client(
case YarnApplicationState.RUNNING =>
reportLauncherState(SparkAppHandle.State.RUNNING)
case YarnApplicationState.FINISHED =>
reportLauncherState(SparkAppHandle.State.FINISHED)
report.getFinalApplicationStatus match {
case FinalApplicationStatus.FAILED =>
reportLauncherState(SparkAppHandle.State.FAILED)
case FinalApplicationStatus.KILLED =>
reportLauncherState(SparkAppHandle.State.KILLED)
case _ =>
reportLauncherState(SparkAppHandle.State.FINISHED)
}
case YarnApplicationState.FAILED =>
reportLauncherState(SparkAppHandle.State.FAILED)
case YarnApplicationState.KILLED =>
......
......@@ -120,6 +120,11 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
finalState should be (SparkAppHandle.State.FAILED)
}
test("run Spark in yarn-cluster mode failure after sc initialized") {
  // YarnClusterDriverWithFailure throws right after constructing its SparkContext,
  // so the state handed back by runSpark must be FAILED.
  val failingDriver = mainClassName(YarnClusterDriverWithFailure.getClass)
  runSpark(false, failingDriver) should be (SparkAppHandle.State.FAILED)
}
// Delegates to the shared PySpark test helper for the yarn-client scenario.
test("run Python application in yarn-client mode") {
// NOTE(review): `true` presumably selects client mode, matching the test name —
// confirm against the signature of testPySpark.
testPySpark(true)
}
......@@ -259,6 +264,16 @@ private[spark] class SaveExecutorInfo extends SparkListener {
}
}
/**
 * Test driver that always fails: it brings up a SparkContext and then throws.
 *
 * The context is deliberately not stopped before the exception is raised, so the
 * failure happens while an initialized SparkContext exists.
 */
private object YarnClusterDriverWithFailure extends Logging with Matchers {
  def main(args: Array[String]): Unit = {
    // Same configuration as before, just built as a separate value:
    // register the SaveExecutorInfo listener and name the application.
    val conf = new SparkConf()
      .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
      .setAppName("yarn test with failure")
    val sc = new SparkContext(conf)
    // Fail only once the context has been created.
    throw new Exception("exception after sc initialized")
  }
}
private object YarnClusterDriver extends Logging with Matchers {
val WAIT_TIMEOUT_MILLIS = 10000
......@@ -287,19 +302,19 @@ private object YarnClusterDriver extends Logging with Matchers {
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
data should be (Set(1, 2, 3, 4))
result = "success"
// Verify that the config archive is correctly placed in the classpath of all containers.
val confFile = "/" + Client.SPARK_CONF_FILE
assert(getClass().getResource(confFile) != null)
val configFromExecutors = sc.parallelize(1 to 4, 4)
.map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull }
.collect()
assert(configFromExecutors.find(_ == null) === None)
} finally {
Files.write(result, status, StandardCharsets.UTF_8)
sc.stop()
}
// Verify that the config archive is correctly placed in the classpath of all containers.
val confFile = "/" + Client.SPARK_CONF_FILE
assert(getClass().getResource(confFile) != null)
val configFromExecutors = sc.parallelize(1 to 4, 4)
.map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull }
.collect()
assert(configFromExecutors.find(_ == null) === None)
// verify log urls are present
val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
assert(listeners.size === 1)
......@@ -330,9 +345,6 @@ private object YarnClusterDriver extends Logging with Matchers {
}
private object YarnClasspathTest extends Logging {
var exitCode = 0
def error(m: String, ex: Throwable = null): Unit = {
logError(m, ex)
// scalastyle:off println
......@@ -361,7 +373,6 @@ private object YarnClasspathTest extends Logging {
} finally {
sc.stop()
}
System.exit(exitCode)
}
private def readResource(resultPath: String): Unit = {
......@@ -374,8 +385,6 @@ private object YarnClasspathTest extends Logging {
} catch {
case t: Throwable =>
error(s"loading test.resource to $resultPath", t)
// set the exit code if not yet set
exitCode = 2
} finally {
Files.write(result, new File(resultPath), StandardCharsets.UTF_8)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment