diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index d97fa2e2151bc86afcba47cf16cab79212c8ea24..d225061fcd1b4797376b857e8b0f4e7d5e6b95bc 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -33,7 +33,7 @@ private[spark] class YarnClientSchedulerBackend(
 
   private var client: Client = null
   private var appId: ApplicationId = null
-  private var monitorThread: Thread = null
+  private var monitorThread: MonitorThread = null
 
   /**
    * Create a Yarn client to submit an application to the ResourceManager.
@@ -131,24 +131,42 @@ private[spark] class YarnClientSchedulerBackend(
     }
   }
 
+  /**
+   * We create this class for SPARK-9519. Basically when we interrupt the monitor thread it's
+   * because the SparkContext is being shut down(sc.stop() called by user code), but if
+   * monitorApplication return, it means the Yarn application finished before sc.stop() was called,
+   * which means we should call sc.stop() here, and we don't allow the monitor to be interrupted
+   * before SparkContext stops successfully.
+   */
+  private class MonitorThread extends Thread {
+    private var allowInterrupt = true
+
+    override def run() {
+      try {
+        val (state, _) = client.monitorApplication(appId, logApplicationReport = false)
+        logError(s"Yarn application has already exited with state $state!")
+        allowInterrupt = false
+        sc.stop()
+      } catch {
+        case e: InterruptedException => logInfo("Interrupting monitor thread")
+      }
+    }
+
+    def stopMonitor(): Unit = {
+      if (allowInterrupt) {
+        this.interrupt()
+      }
+    }
+  }
+
   /**
    * Monitor the application state in a separate thread.
    * If the application has exited for any reason, stop the SparkContext.
    * This assumes both `client` and `appId` have already been set.
    */
-  private def asyncMonitorApplication(): Thread = {
+  private def asyncMonitorApplication(): MonitorThread = {
     assert(client != null && appId != null, "Application has not been submitted yet!")
-    val t = new Thread {
-      override def run() {
-        try {
-          val (state, _) = client.monitorApplication(appId, logApplicationReport = false)
-          logError(s"Yarn application has already exited with state $state!")
-          sc.stop()
-        } catch {
-          case e: InterruptedException => logInfo("Interrupting monitor thread")
-        }
-      }
-    }
+    val t = new MonitorThread
     t.setName("Yarn application state monitor")
     t.setDaemon(true)
     t
@@ -160,7 +178,7 @@ private[spark] class YarnClientSchedulerBackend(
   override def stop() {
     assert(client != null, "Attempted to stop this scheduler before starting it!")
     if (monitorThread != null) {
-      monitorThread.interrupt()
+      monitorThread.stopMonitor()
     }
     super.stop()
     client.stop()
@@ -174,5 +192,4 @@ private[spark] class YarnClientSchedulerBackend(
       super.applicationId
     }
   }
-
 }