From 55a92ef34c0b57b6e379523d5d79baa05392de37 Mon Sep 17 00:00:00 2001
From: unknown <l00251599@HGHY1L002515991.china.huawei.com>
Date: Wed, 8 Apr 2015 13:56:42 -0700
Subject: [PATCH] [SPARK-4346][SPARK-3596][YARN] Commonize the monitor logic

1. YarnClientSchedulerBack.asyncMonitorApplication use Client.monitorApplication so that commonize the monitor logic
2. Support changing the yarn client monitor interval, see #5292
3. More details see discussion on https://github.com/apache/spark/pull/3143

Author: unknown <l00251599@HGHY1L002515991.china.huawei.com>
Author: Sephiroth-Lin <linwzhong@gmail.com>

Closes #5305 from Sephiroth-Lin/SPARK-4346_3596 and squashes the following commits:

47c0014 [unknown] Edit conflicts
52b29fe [unknown] Interrupt thread when we call stop()
d4298a1 [unknown] Unused, don't push
aaacb42 [Sephiroth-Lin] don't wrap the entire block in the try
ee2b2fd [Sephiroth-Lin] update
6483a2a [unknown] Catch exception
6b47ff7 [unknown] Update code
568f46f [unknown] YarnClientSchedulerBack.asyncMonitorApplication should be common with Client.monitorApplication
---
 .../org/apache/spark/deploy/yarn/Client.scala | 10 +++++-
 .../cluster/YarnClientSchedulerBackend.scala  | 32 ++++++-------------
 2 files changed, 18 insertions(+), 24 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 79d55a09eb..7219852c0a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
 import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
@@ -561,7 +562,14 @@ private[spark] class Client(
     var lastState: YarnApplicationState = null
     while (true) {
       Thread.sleep(interval)
-      val report = getApplicationReport(appId)
+      val report: ApplicationReport =
+        try {
+          getApplicationReport(appId)
+        } catch {
+          case e: ApplicationNotFoundException =>
+            logError(s"Application $appId not found.")
+            return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED)
+        }
       val state = report.getYarnApplicationState
 
       if (logApplicationReport) {
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 8abdc26b43..407dc1ac4d 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -34,7 +34,7 @@ private[spark] class YarnClientSchedulerBackend(
 
   private var client: Client = null
   private var appId: ApplicationId = null
-  @volatile private var stopping: Boolean = false
+  private var monitorThread: Thread = null
 
   /**
    * Create a Yarn client to submit an application to the ResourceManager.
@@ -57,7 +57,8 @@ private[spark] class YarnClientSchedulerBackend(
     client = new Client(args, conf)
     appId = client.submitApplication()
     waitForApplication()
-    asyncMonitorApplication()
+    monitorThread = asyncMonitorApplication()
+    monitorThread.start()
   }
 
   /**
@@ -123,34 +124,19 @@ private[spark] class YarnClientSchedulerBackend(
    * If the application has exited for any reason, stop the SparkContext.
    * This assumes both `client` and `appId` have already been set.
    */
-  private def asyncMonitorApplication(): Unit = {
+  private def asyncMonitorApplication(): Thread = {
     assert(client != null && appId != null, "Application has not been submitted yet!")
     val t = new Thread {
       override def run() {
-        while (!stopping) {
-          var state: YarnApplicationState = null
-          try {
-            val report = client.getApplicationReport(appId)
-            state = report.getYarnApplicationState()
-          } catch {
-            case e: ApplicationNotFoundException =>
-              state = YarnApplicationState.KILLED
-          }
-          if (state == YarnApplicationState.FINISHED ||
-            state == YarnApplicationState.KILLED ||
-            state == YarnApplicationState.FAILED) {
-            logError(s"Yarn application has already exited with state $state!")
-            sc.stop()
-            stopping = true
-          }
-          Thread.sleep(1000L)
-        }
+        val (state, _) = client.monitorApplication(appId, logApplicationReport = false)
+        logError(s"Yarn application has already exited with state $state!")
+        sc.stop()
         Thread.currentThread().interrupt()
       }
     }
     t.setName("Yarn application state monitor")
     t.setDaemon(true)
-    t.start()
+    t
   }
 
   /**
@@ -158,7 +144,7 @@ private[spark] class YarnClientSchedulerBackend(
    */
   override def stop() {
     assert(client != null, "Attempted to stop this scheduler before starting it!")
-    stopping = true
+    monitorThread.interrupt()
     super.stop()
     client.stop()
     logInfo("Stopped")
-- 
GitLab