Skip to content
Snippets Groups Projects
Commit 55a92ef3 authored by unknown's avatar unknown Committed by Andrew Or
Browse files

[SPARK-4346][SPARK-3596][YARN] Commonize the monitor logic

1. YarnClientSchedulerBackend.asyncMonitorApplication now uses Client.monitorApplication, so that the monitoring logic is shared
2. Support changing the YARN client monitor interval; see #5292
3. For more details, see the discussion at https://github.com/apache/spark/pull/3143

Author: unknown <l00251599@HGHY1L002515991.china.huawei.com>
Author: Sephiroth-Lin <linwzhong@gmail.com>

Closes #5305 from Sephiroth-Lin/SPARK-4346_3596 and squashes the following commits:

47c0014 [unknown] Edit conflicts
52b29fe [unknown] Interrupt thread when we call stop()
d4298a1 [unknown] Unused, don't push
aaacb42 [Sephiroth-Lin] don't wrap the entire block in the try
ee2b2fd [Sephiroth-Lin] update
6483a2a [unknown] Catch exception
6b47ff7 [unknown] Update code
568f46f [unknown] YarnClientSchedulerBack.asyncMonitorApplication should be common with Client.monitorApplication
parent 86403f55
No related branches found
No related tags found
No related merge requests found
......@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
......@@ -561,7 +562,14 @@ private[spark] class Client(
var lastState: YarnApplicationState = null
while (true) {
Thread.sleep(interval)
val report = getApplicationReport(appId)
val report: ApplicationReport =
try {
getApplicationReport(appId)
} catch {
case e: ApplicationNotFoundException =>
logError(s"Application $appId not found.")
return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED)
}
val state = report.getYarnApplicationState
if (logApplicationReport) {
......
......@@ -34,7 +34,7 @@ private[spark] class YarnClientSchedulerBackend(
private var client: Client = null
private var appId: ApplicationId = null
@volatile private var stopping: Boolean = false
private var monitorThread: Thread = null
/**
* Create a Yarn client to submit an application to the ResourceManager.
......@@ -57,7 +57,8 @@ private[spark] class YarnClientSchedulerBackend(
client = new Client(args, conf)
appId = client.submitApplication()
waitForApplication()
asyncMonitorApplication()
monitorThread = asyncMonitorApplication()
monitorThread.start()
}
/**
......@@ -123,34 +124,19 @@ private[spark] class YarnClientSchedulerBackend(
* If the application has exited for any reason, stop the SparkContext.
* This assumes both `client` and `appId` have already been set.
*/
private def asyncMonitorApplication(): Unit = {
private def asyncMonitorApplication(): Thread = {
assert(client != null && appId != null, "Application has not been submitted yet!")
val t = new Thread {
override def run() {
while (!stopping) {
var state: YarnApplicationState = null
try {
val report = client.getApplicationReport(appId)
state = report.getYarnApplicationState()
} catch {
case e: ApplicationNotFoundException =>
state = YarnApplicationState.KILLED
}
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.KILLED ||
state == YarnApplicationState.FAILED) {
logError(s"Yarn application has already exited with state $state!")
sc.stop()
stopping = true
}
Thread.sleep(1000L)
}
val (state, _) = client.monitorApplication(appId, logApplicationReport = false)
logError(s"Yarn application has already exited with state $state!")
sc.stop()
Thread.currentThread().interrupt()
}
}
t.setName("Yarn application state monitor")
t.setDaemon(true)
t.start()
t
}
/**
......@@ -158,7 +144,7 @@ private[spark] class YarnClientSchedulerBackend(
*/
override def stop() {
assert(client != null, "Attempted to stop this scheduler before starting it!")
stopping = true
monitorThread.interrupt()
super.stop()
client.stop()
logInfo("Stopped")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment