From bad0f7dbba2eda149ee4fc5810674d971d17874a Mon Sep 17 00:00:00 2001
From: "peng.zhang" <peng.zhang@xiaomi.com>
Date: Fri, 1 Jul 2016 15:51:21 -0700
Subject: [PATCH] [SPARK-16095][YARN] Yarn cluster mode should report correct
 state to SparkLauncher

## What changes were proposed in this pull request?
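In YARN cluster mode, the client should report the correct final state to SparkLauncher.
When YARN reports the application state as FINISHED, the client now checks the final
application status and reports FAILED or KILLED accordingly, instead of always reporting
FINISHED.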

## How was this patch tested?
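Added a unit test to YarnClusterSuite that runs an application in yarn-cluster mode which
throws an exception after the SparkContext is initialized, and verifies that the final state
reported to the launcher is FAILED.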

Author: peng.zhang <peng.zhang@xiaomi.com>

Closes #13962 from renozhang/SPARK-16095-spark-launcher-wrong-state.
---
 .../org/apache/spark/deploy/yarn/Client.scala |  9 ++++-
 .../spark/deploy/yarn/YarnClusterSuite.scala  | 37 ++++++++++++-------
 2 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index d63579ff82..244d1a4e33 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1080,7 +1080,14 @@ private[spark] class Client(
           case YarnApplicationState.RUNNING =>
             reportLauncherState(SparkAppHandle.State.RUNNING)
           case YarnApplicationState.FINISHED =>
-            reportLauncherState(SparkAppHandle.State.FINISHED)
+            report.getFinalApplicationStatus match {
+              case FinalApplicationStatus.FAILED =>
+                reportLauncherState(SparkAppHandle.State.FAILED)
+              case FinalApplicationStatus.KILLED =>
+                reportLauncherState(SparkAppHandle.State.KILLED)
+              case _ =>
+                reportLauncherState(SparkAppHandle.State.FINISHED)
+            }
           case YarnApplicationState.FAILED =>
             reportLauncherState(SparkAppHandle.State.FAILED)
           case YarnApplicationState.KILLED =>
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 6b20dea590..9085fca1d3 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -120,6 +120,11 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     finalState should be (SparkAppHandle.State.FAILED)
   }
 
+  test("run Spark in yarn-cluster mode failure after sc initialized") {
+    val finalState = runSpark(false, mainClassName(YarnClusterDriverWithFailure.getClass))
+    finalState should be (SparkAppHandle.State.FAILED)
+  }
+
   test("run Python application in yarn-client mode") {
     testPySpark(true)
   }
@@ -259,6 +264,16 @@ private[spark] class SaveExecutorInfo extends SparkListener {
   }
 }
 
+private object YarnClusterDriverWithFailure extends Logging with Matchers {
+  def main(args: Array[String]): Unit = {
+    val sc = new SparkContext(new SparkConf()
+      .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
+      .setAppName("yarn test with failure"))
+
+    throw new Exception("exception after sc initialized")
+  }
+}
+
 private object YarnClusterDriver extends Logging with Matchers {
 
   val WAIT_TIMEOUT_MILLIS = 10000
@@ -287,19 +302,19 @@ private object YarnClusterDriver extends Logging with Matchers {
       sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
       data should be (Set(1, 2, 3, 4))
       result = "success"
+
+      // Verify that the config archive is correctly placed in the classpath of all containers.
+      val confFile = "/" + Client.SPARK_CONF_FILE
+      assert(getClass().getResource(confFile) != null)
+      val configFromExecutors = sc.parallelize(1 to 4, 4)
+        .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull }
+        .collect()
+      assert(configFromExecutors.find(_ == null) === None)
     } finally {
       Files.write(result, status, StandardCharsets.UTF_8)
       sc.stop()
     }
 
-    // Verify that the config archive is correctly placed in the classpath of all containers.
-    val confFile = "/" + Client.SPARK_CONF_FILE
-    assert(getClass().getResource(confFile) != null)
-    val configFromExecutors = sc.parallelize(1 to 4, 4)
-      .map { _ => Option(getClass().getResource(confFile)).map(_.toString).orNull }
-      .collect()
-    assert(configFromExecutors.find(_ == null) === None)
-
     // verify log urls are present
     val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
     assert(listeners.size === 1)
@@ -330,9 +345,6 @@ private object YarnClusterDriver extends Logging with Matchers {
 }
 
 private object YarnClasspathTest extends Logging {
-
-  var exitCode = 0
-
   def error(m: String, ex: Throwable = null): Unit = {
     logError(m, ex)
     // scalastyle:off println
@@ -361,7 +373,6 @@ private object YarnClasspathTest extends Logging {
     } finally {
       sc.stop()
     }
-    System.exit(exitCode)
   }
 
   private def readResource(resultPath: String): Unit = {
@@ -374,8 +385,6 @@ private object YarnClasspathTest extends Logging {
     } catch {
       case t: Throwable =>
         error(s"loading test.resource to $resultPath", t)
-        // set the exit code if not yet set
-        exitCode = 2
     } finally {
       Files.write(result, new File(resultPath), StandardCharsets.UTF_8)
     }
-- 
GitLab