Skip to content
Snippets Groups Projects
Commit 7a969a69 authored by linweizhong's avatar linweizhong Committed by Marcelo Vanzin
Browse files

[SPARK-9519] [YARN] Confirm stop sc successfully when application was killed

Currently, when we kill application on Yarn, then will call sc.stop() at Yarn application state monitor thread, then in YarnClientSchedulerBackend.stop() will call interrupt this will cause SparkContext not stop fully as we will wait executor to exit.

Author: linweizhong <linweizhong@huawei.com>

Closes #7846 from Sephiroth-Lin/SPARK-9519 and squashes the following commits:

1ae736d [linweizhong] Update comments
2e8e365 [linweizhong] Add comment explaining the code
ad0e23b [linweizhong] Update
243d2c7 [linweizhong] Confirm stop sc successfully when application was killed
parent 23d98220
No related branches found
No related tags found
No related merge requests found
......@@ -33,7 +33,7 @@ private[spark] class YarnClientSchedulerBackend(
private var client: Client = null
private var appId: ApplicationId = null
private var monitorThread: Thread = null
private var monitorThread: MonitorThread = null
/**
* Create a Yarn client to submit an application to the ResourceManager.
......@@ -131,24 +131,42 @@ private[spark] class YarnClientSchedulerBackend(
}
}
/**
* We create this class for SPARK-9519. Basically when we interrupt the monitor thread it's
* because the SparkContext is being shut down(sc.stop() called by user code), but if
* monitorApplication return, it means the Yarn application finished before sc.stop() was called,
* which means we should call sc.stop() here, and we don't allow the monitor to be interrupted
* before SparkContext stops successfully.
*/
private class MonitorThread extends Thread {
private var allowInterrupt = true
override def run() {
try {
val (state, _) = client.monitorApplication(appId, logApplicationReport = false)
logError(s"Yarn application has already exited with state $state!")
allowInterrupt = false
sc.stop()
} catch {
case e: InterruptedException => logInfo("Interrupting monitor thread")
}
}
def stopMonitor(): Unit = {
if (allowInterrupt) {
this.interrupt()
}
}
}
/**
* Monitor the application state in a separate thread.
* If the application has exited for any reason, stop the SparkContext.
* This assumes both `client` and `appId` have already been set.
*/
private def asyncMonitorApplication(): Thread = {
private def asyncMonitorApplication(): MonitorThread = {
assert(client != null && appId != null, "Application has not been submitted yet!")
val t = new Thread {
override def run() {
try {
val (state, _) = client.monitorApplication(appId, logApplicationReport = false)
logError(s"Yarn application has already exited with state $state!")
sc.stop()
} catch {
case e: InterruptedException => logInfo("Interrupting monitor thread")
}
}
}
val t = new MonitorThread
t.setName("Yarn application state monitor")
t.setDaemon(true)
t
......@@ -160,7 +178,7 @@ private[spark] class YarnClientSchedulerBackend(
override def stop() {
assert(client != null, "Attempted to stop this scheduler before starting it!")
if (monitorThread != null) {
monitorThread.interrupt()
monitorThread.stopMonitor()
}
super.stop()
client.stop()
......@@ -174,5 +192,4 @@ private[spark] class YarnClientSchedulerBackend(
super.applicationId
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment