diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 79d55a09eb67126170d74fb766676af62f610da0..7219852c0a75237f0c25dc02d98c8fa03c7ee11a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
 import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
@@ -561,7 +562,14 @@ private[spark] class Client(
     var lastState: YarnApplicationState = null
     while (true) {
       Thread.sleep(interval)
-      val report = getApplicationReport(appId)
+      val report: ApplicationReport =
+        try {
+          getApplicationReport(appId)
+        } catch {
+          case e: ApplicationNotFoundException =>
+            logError(s"Application $appId not found.")
+            return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED)
+        }
       val state = report.getYarnApplicationState
 
       if (logApplicationReport) {
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 8abdc26b43806222fd9ccac0ae75f205afa49f49..407dc1ac4d37d1821a28f3d0ff2982f921da020d 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -34,7 +34,7 @@ private[spark] class YarnClientSchedulerBackend(
 
   private var client: Client = null
   private var appId: ApplicationId = null
-  @volatile private var stopping: Boolean = false
+  private var monitorThread: Thread = null
 
   /**
    * Create a Yarn client to submit an application to the ResourceManager.
@@ -57,7 +57,8 @@ private[spark] class YarnClientSchedulerBackend(
     client = new Client(args, conf)
     appId = client.submitApplication()
     waitForApplication()
-    asyncMonitorApplication()
+    monitorThread = asyncMonitorApplication()
+    monitorThread.start()
   }
 
   /**
@@ -123,34 +124,19 @@ private[spark] class YarnClientSchedulerBackend(
    * If the application has exited for any reason, stop the SparkContext.
    * This assumes both `client` and `appId` have already been set.
    */
-  private def asyncMonitorApplication(): Unit = {
+  private def asyncMonitorApplication(): Thread = {
     assert(client != null && appId != null, "Application has not been submitted yet!")
     val t = new Thread {
       override def run() {
-        while (!stopping) {
-          var state: YarnApplicationState = null
-          try {
-            val report = client.getApplicationReport(appId)
-            state = report.getYarnApplicationState()
-          } catch {
-            case e: ApplicationNotFoundException =>
-              state = YarnApplicationState.KILLED
-          }
-          if (state == YarnApplicationState.FINISHED ||
-            state == YarnApplicationState.KILLED ||
-            state == YarnApplicationState.FAILED) {
-            logError(s"Yarn application has already exited with state $state!")
-            sc.stop()
-            stopping = true
-          }
-          Thread.sleep(1000L)
-        }
+        val (state, _) = client.monitorApplication(appId, logApplicationReport = false)
+        logError(s"Yarn application has already exited with state $state!")
+        sc.stop()
         Thread.currentThread().interrupt()
       }
     }
     t.setName("Yarn application state monitor")
     t.setDaemon(true)
-    t.start()
+    t
   }
 
   /**
@@ -158,7 +144,7 @@ private[spark] class YarnClientSchedulerBackend(
    */
   override def stop() {
     assert(client != null, "Attempted to stop this scheduler before starting it!")
-    stopping = true
+    monitorThread.interrupt()
     super.stop()
     client.stop()
     logInfo("Stopped")