Commit 70e824f7 authored by Thomas Graves

[SPARK-3627] - [yarn] - fix exit code and final status reporting to RM

See the description and what's handled in the JIRA comment: https://issues.apache.org/jira/browse/SPARK-3627?focusedCommentId=14150013&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14150013

This does not handle yarn client mode reporting of the driver to the AM. I think that should be handled when we make it an unmanaged AM.
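At its core, the patch separates recording the outcome from reporting it: `finish` stores an exit code and final status exactly once, while `unregister` tells the RM the attempt is done and is deliberately skipped on a failed, non-final attempt so that YARN will retry it. A minimal, self-contained sketch of that shape (placeholder names and printlns, no YARN involved):

```scala
// Sketch of the finish/unregister split introduced by this patch; not the
// actual Spark code. "finish" records the outcome exactly once, "unregister"
// reports it, and a failed non-final attempt skips unregistering so the
// (hypothetical) ResourceManager retries the attempt.
object AMStatusSketch {
  @volatile private var finished = false
  @volatile private var unregistered = false
  @volatile private var exitCode = 0

  def finish(code: Int, msg: String): Unit = synchronized {
    if (!finished) {
      finished = true
      exitCode = code
      println(s"Final app status recorded, exitCode: $code, reason: $msg")
    }
  }

  def unregister(): Unit = synchronized {
    if (!unregistered) {
      unregistered = true
      println("Unregistered from RM") // stand-in for the real RM RPC
    }
  }

  def main(args: Array[String]): Unit = {
    finish(0, "user class exited cleanly")
    val isLastAttempt = true // stand-in for attemptId >= maxAppAttempts
    if (exitCode == 0 || isLastAttempt) unregister()
    sys.exit(exitCode) // the AM's process exit code now matches what was reported
  }
}
```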

Author: Thomas Graves <tgraves@apache.org>

Closes #2577 from tgravescs/SPARK-3627 and squashes the following commits:

9c2efbf [Thomas Graves] review comments
e8cc261 [Thomas Graves] fix accidental typo during fixing comment
24c98e3 [Thomas Graves] rework
85f1901 [Thomas Graves] Merge remote-tracking branch 'upstream/master' into SPARK-3627
fab166d [Thomas Graves] update based on review comments
32f4dfa [Thomas Graves] switch back
f0b6519 [Thomas Graves] change order of cleanup staging dir
d3cc800 [Thomas Graves] SPARK-3627 - yarn - fix exit code and final status reporting to RM
parent 69c3f441
@@ -40,6 +40,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient
   private var rpc: YarnRPC = null
   private var resourceManager: AMRMProtocol = _
   private var uiHistoryAddress: String = _
+  private var registered: Boolean = false

   override def register(
       conf: YarnConfiguration,
@@ -51,8 +52,11 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient
     this.rpc = YarnRPC.create(conf)
     this.uiHistoryAddress = uiHistoryAddress

-    resourceManager = registerWithResourceManager(conf)
-    registerApplicationMaster(uiAddress)
+    synchronized {
+      resourceManager = registerWithResourceManager(conf)
+      registerApplicationMaster(uiAddress)
+      registered = true
+    }

     new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
       preferredNodeLocations, securityMgr)
@@ -66,14 +70,16 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient
     appAttemptId
   }

-  override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") = {
-    val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
-      .asInstanceOf[FinishApplicationMasterRequest]
-    finishReq.setAppAttemptId(getAttemptId())
-    finishReq.setFinishApplicationStatus(status)
-    finishReq.setDiagnostics(diagnostics)
-    finishReq.setTrackingUrl(uiHistoryAddress)
-    resourceManager.finishApplicationMaster(finishReq)
+  override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
+    if (registered) {
+      val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
+        .asInstanceOf[FinishApplicationMasterRequest]
+      finishReq.setAppAttemptId(getAttemptId())
+      finishReq.setFinishApplicationStatus(status)
+      finishReq.setDiagnostics(diagnostics)
+      finishReq.setTrackingUrl(uiHistoryAddress)
+      resourceManager.finishApplicationMaster(finishReq)
+    }
   }

   override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {
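The `registered` flag added above matters because `unregister` can now be reached from a shutdown hook before `register` ever ran; unregistering an AM that never registered would fail. A stripped-down sketch of the same guard (the println stands in for the `FinishApplicationMasterRequest` RPC):

```scala
// Simplified register/unregister guard, mirroring the pattern in the diff
// above; the real implementation talks to the YARN RM instead of printing.
class RMClientSketch {
  private var registered = false

  def register(): Unit = synchronized {
    // ... RPC registration with the RM would happen here ...
    registered = true
  }

  // Safe to call from a shutdown hook even if register() never ran.
  def unregister(): Unit = synchronized {
    if (registered) {
      println("sending FinishApplicationMasterRequest") // stand-in for the RPC
    }
  }
}
```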
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration

 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -56,8 +57,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
     sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))

+  @volatile private var exitCode = 0
+  @volatile private var unregistered = false
   @volatile private var finished = false
   @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
+  @volatile private var finalMsg: String = ""
   @volatile private var userClassThread: Thread = _

   private var reporterThread: Thread = _
@@ -71,80 +75,107 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   private val sparkContextRef = new AtomicReference[SparkContext](null)

   final def run(): Int = {
-    val appAttemptId = client.getAttemptId()
+    try {
+      val appAttemptId = client.getAttemptId()

-    if (isDriver) {
-      // Set the web ui port to be ephemeral for yarn so we don't conflict with
-      // other spark processes running on the same box
-      System.setProperty("spark.ui.port", "0")
-
-      // Set the master property to match the requested mode.
-      System.setProperty("spark.master", "yarn-cluster")
-
-      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
-      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
-    }
+      if (isDriver) {
+        // Set the web ui port to be ephemeral for yarn so we don't conflict with
+        // other spark processes running on the same box
+        System.setProperty("spark.ui.port", "0")
+
+        // Set the master property to match the requested mode.
+        System.setProperty("spark.master", "yarn-cluster")
+
+        // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
+        System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
+      }

-    logInfo("ApplicationAttemptId: " + appAttemptId)
+      logInfo("ApplicationAttemptId: " + appAttemptId)

-    val cleanupHook = new Runnable {
-      override def run() {
-        // If the SparkContext is still registered, shut it down as a best case effort in case
-        // users do not call sc.stop or do System.exit().
-        val sc = sparkContextRef.get()
-        if (sc != null) {
-          logInfo("Invoking sc stop from shutdown hook")
-          sc.stop()
-          finish(FinalApplicationStatus.SUCCEEDED)
-        }
-
-        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
-        // running the AM.
-        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
-        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
-        if (finished || isLastAttempt) {
-          cleanupStagingDir()
-        }
-      }
-    }
+      val cleanupHook = new Runnable {
+        override def run() {
+          // If the SparkContext is still registered, shut it down as a best case effort in case
+          // users do not call sc.stop or do System.exit().
+          val sc = sparkContextRef.get()
+          if (sc != null) {
+            logInfo("Invoking sc stop from shutdown hook")
+            sc.stop()
+          }
+          val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
+          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
+
+          if (!finished) {
+            // this shouldn't ever happen, but if it does assume weird failure
+            finish(FinalApplicationStatus.FAILED,
+              ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
+              "shutdown hook called without cleanly finishing")
+          }
+
+          if (!unregistered) {
+            // we only want to unregister if we don't want the RM to retry
+            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
+              unregister(finalStatus, finalMsg)
+              cleanupStagingDir()
+            }
+          }
+        }
+      }

-    // Use higher priority than FileSystem.
-    assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
-    ShutdownHookManager
-      .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
+      // Use higher priority than FileSystem.
+      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
+      ShutdownHookManager
+        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)

-    // Call this to force generation of secret so it gets populated into the
-    // Hadoop UGI. This has to happen before the startUserClass which does a
-    // doAs in order for the credentials to be passed on to the executor containers.
-    val securityMgr = new SecurityManager(sparkConf)
+      // Call this to force generation of secret so it gets populated into the
+      // Hadoop UGI. This has to happen before the startUserClass which does a
+      // doAs in order for the credentials to be passed on to the executor containers.
+      val securityMgr = new SecurityManager(sparkConf)

-    if (isDriver) {
-      runDriver(securityMgr)
-    } else {
-      runExecutorLauncher(securityMgr)
-    }
-
-    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
-      finish(finalStatus)
-      0
-    } else {
-      1
+      if (isDriver) {
+        runDriver(securityMgr)
+      } else {
+        runExecutorLauncher(securityMgr)
+      }
+    } catch {
+      case e: Exception =>
+        // catch everything else if not specifically handled
+        logError("Uncaught exception: ", e)
+        finish(FinalApplicationStatus.FAILED,
+          ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
+          "Uncaught exception: " + e.getMessage())
     }
+    exitCode
   }

+  /**
+   * unregister is used to completely unregister the application from the ResourceManager.
+   * This means the ResourceManager will not retry the application attempt on your behalf if
+   * a failure occurred.
+   */
+  final def unregister(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
+    if (!unregistered) {
+      logInfo(s"Unregistering ApplicationMaster with $status" +
+        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
+      unregistered = true
+      client.unregister(status, Option(diagnostics).getOrElse(""))
+    }
+  }

-  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
+  final def finish(status: FinalApplicationStatus, code: Int, msg: String = null) = synchronized {
     if (!finished) {
-      logInfo(s"Finishing ApplicationMaster with $status" +
-        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
-      finished = true
+      logInfo(s"Final app status: ${status}, exitCode: ${code}" +
+        Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
+      exitCode = code
       finalStatus = status
-      try {
-        if (Thread.currentThread() != reporterThread) {
-          reporterThread.interrupt()
-          reporterThread.join()
-        }
-      } finally {
-        client.shutdown(status, Option(diagnostics).getOrElse(""))
+      finalMsg = msg
+      finished = true
+      if (Thread.currentThread() != reporterThread && reporterThread != null) {
+        logDebug("shutting down reporter thread")
+        reporterThread.interrupt()
+      }
+      if (Thread.currentThread() != userClassThread && userClassThread != null) {
+        logDebug("shutting down user thread")
+        userClassThread.interrupt()
       }
     }
   }
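The shutdown hook is the safety net in this design: if the JVM exits without `finish` having run, the hook records a failure, and it only unregisters (and cleans the staging dir) when the status is SUCCEEDED or this was the last allowed attempt. A plain-JVM sketch of that ordering, using `Runtime` hooks instead of Hadoop's `ShutdownHookManager` (assumption: the priority mechanism is dropped for brevity):

```scala
// Sketch of the shutdown-hook safety net; not the actual Spark code.
object ShutdownSketch {
  @volatile private var finished = false

  def main(args: Array[String]): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread {
      override def run(): Unit = {
        if (!finished) {
          // Mirrors the "shutdown hook called without cleanly finishing" path:
          // reaching here without finish() means something went wrong.
          System.err.println("hook fired before finish(); would record FAILED")
        }
      }
    })
    // Normally set by finish() when the user class completes; comment this
    // line out to see the failure branch fire at JVM exit.
    finished = true
  }
}
```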
@@ -182,7 +213,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   private def runDriver(securityMgr: SecurityManager): Unit = {
     addAmIpFilter()
-    val userThread = startUserClass()
+    setupSystemSecurityManager()
+    userClassThread = startUserClass()

     // This is a bit hacky, but we need to wait until the spark.driver.port property has
     // been set by the Thread executing the user class.
@@ -190,15 +222,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     // If there is no SparkContext at this point, just fail the app.
     if (sc == null) {
-      finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
+      finish(FinalApplicationStatus.FAILED,
+        ApplicationMaster.EXIT_SC_NOT_INITED,
+        "Timed out waiting for SparkContext.")
     } else {
       registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
-      try {
-        userThread.join()
-      } finally {
-        // In cluster mode, ask the reporter thread to stop since the user app is finished.
-        reporterThread.interrupt()
-      }
+      userClassThread.join()
     }
   }
@@ -211,7 +240,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     // In client mode the actor will stop the reporter thread.
     reporterThread.join()
-    finalStatus = FinalApplicationStatus.SUCCEEDED
   }

   private def launchReporterThread(): Thread = {
@@ -231,33 +259,26 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     val t = new Thread {
       override def run() {
         var failureCount = 0
         while (!finished) {
           try {
-            checkNumExecutorsFailed()
-            if (!finished) {
+            if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+              finish(FinalApplicationStatus.FAILED,
+                ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
+                "Max number of executor failures reached")
+            } else {
               logDebug("Sending progress")
               allocator.allocateResources()
             }
             failureCount = 0
           } catch {
+            case i: InterruptedException =>
             case e: Throwable => {
               failureCount += 1
               if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
-                logError("Exception was thrown from Reporter thread.", e)
-                finish(FinalApplicationStatus.FAILED, "Exception was thrown" +
-                  s"${failureCount} time(s) from Reporter thread.")
-
-                /**
-                 * If exception is thrown from ReporterThread,
-                 * interrupt user class to stop.
-                 * Without this interrupting, if exception is
-                 * thrown before allocating enough executors,
-                 * YarnClusterScheduler waits until timeout even though
-                 * we cannot allocate executors.
-                 */
-                logInfo("Interrupting user class to stop.")
-                userClassThread.interrupt
+                finish(FinalApplicationStatus.FAILED,
+                  ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
+                  s"${failureCount} time(s) from Reporter thread.")
               } else {
                 logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e)
               }
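The reporter loop above now folds the max-executor-failures check into each heartbeat and treats `InterruptedException` as a normal stop signal rather than a failure. A simplified, runnable version of the loop (the sleep stands in for `allocateResources()`, and setting `finished` stands in for `finish(FAILED, EXIT_REPORTER_FAILURE, ...)`):

```scala
import scala.util.control.NonFatal

// Simplified reporter loop: heartbeat until finished, tolerate a few
// transient errors, give up after too many consecutive failures.
object ReporterSketch {
  @volatile private var finished = false
  private val maxFailures = 5

  def main(args: Array[String]): Unit = {
    val reporter = new Thread {
      override def run(): Unit = {
        var failureCount = 0
        while (!finished) {
          try {
            Thread.sleep(1000) // heartbeat / allocateResources() would go here
            failureCount = 0   // a good round resets the counter
          } catch {
            case _: InterruptedException => // finish() interrupts us to stop
            case e: Throwable =>
              failureCount += 1
              if (!NonFatal(e) || failureCount >= maxFailures) {
                finished = true // stand-in for finish(FAILED, ...)
              }
          }
        }
      }
    }
    reporter.setDaemon(true)
    reporter.start()
    finished = true // simulate the app finishing
    reporter.interrupt()
    reporter.join()
  }
}
```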
@@ -308,7 +329,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     sparkContextRef.synchronized {
       var count = 0
       val waitTime = 10000L
-      val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
+      val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 10)
       while (sparkContextRef.get() == null && count < numTries && !finished) {
         logInfo("Waiting for spark context initialization ... " + count)
         count = count + 1
@@ -328,10 +349,19 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   private def waitForSparkDriver(): ActorRef = {
     logInfo("Waiting for Spark driver to be reachable.")
     var driverUp = false
+    var count = 0
     val hostport = args.userArgs(0)
     val (driverHost, driverPort) = Utils.parseHostPort(hostport)
-    while (!driverUp) {
+
+    // The Spark driver should already be up since it launched us, but we don't want
+    // to wait forever, so wait 100 seconds max to match the cluster mode setting.
+    // Leave this config unpublished for now. SPARK-3779 tracks changing this
+    // config to be time based.
+    val numTries = sparkConf.getInt("spark.yarn.applicationMaster.waitTries", 1000)
+
+    while (!driverUp && !finished && count < numTries) {
       try {
+        count = count + 1
         val socket = new Socket(driverHost, driverPort)
         socket.close()
         logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
@@ -343,6 +373,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         Thread.sleep(100)
       }
     }
+
+    if (!driverUp) {
+      throw new SparkException("Failed to connect to driver!")
+    }
+
     sparkConf.set("spark.driver.host", driverHost)
     sparkConf.set("spark.driver.port", driverPort.toString)
@@ -354,18 +389,6 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
   }

-  private def checkNumExecutorsFailed() = {
-    if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-      finish(FinalApplicationStatus.FAILED, "Max number of executor failures reached.")
-
-      val sc = sparkContextRef.get()
-      if (sc != null) {
-        logInfo("Invoking sc stop from checkNumExecutorsFailed")
-        sc.stop()
-      }
-    }
-  }
-
   /** Add the Yarn IP filter that is required for properly securing the UI. */
   private def addAmIpFilter() = {
     val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
@@ -379,40 +402,81 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     }
   }

+  /**
+   * This system security manager applies to the entire process.
+   * Its main purpose is to handle the case where the user code does a System.exit.
+   * This allows us to catch that and properly set the YARN application status and
+   * clean up if needed.
+   */
+  private def setupSystemSecurityManager(): Unit = {
+    try {
+      var stopped = false
+      System.setSecurityManager(new java.lang.SecurityManager() {
+        override def checkExit(paramInt: Int) {
+          if (!stopped) {
+            logInfo("In securityManager checkExit, exit code: " + paramInt)
+            if (paramInt == 0) {
+              finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+            } else {
+              finish(FinalApplicationStatus.FAILED,
+                paramInt,
+                "User class exited with non-zero exit code")
+            }
+            stopped = true
+          }
+        }
+
+        // required for the checkExit to work properly
+        override def checkPermission(perm: java.security.Permission): Unit = {}
+      })
+    }
+    catch {
+      case e: SecurityException =>
+        finish(FinalApplicationStatus.FAILED,
+          ApplicationMaster.EXIT_SECURITY,
+          "Error in setSecurityManager")
+        logError("Error in setSecurityManager:", e)
+    }
+  }
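`setupSystemSecurityManager` is the trick that makes user calls to `System.exit` observable: `checkExit` fires before the JVM actually exits, so the AM can record the exit code and final status first. A standalone demo of the mechanism (this works on the JDK 6/7-era runtimes the patch targets; `SecurityManager` is deprecated and disabled by default on recent JDKs):

```scala
// Demo of intercepting System.exit via SecurityManager.checkExit.
object ExitInterceptSketch {
  def main(args: Array[String]): Unit = {
    System.setSecurityManager(new java.lang.SecurityManager() {
      override def checkExit(status: Int): Unit = {
        // Runs before the JVM exits; record the status here, as the AM does.
        println(s"intercepted System.exit($status)")
        // Not throwing lets the exit proceed once the status is recorded.
      }
      // Permit everything else so only exits are observed.
      override def checkPermission(perm: java.security.Permission): Unit = {}
    })
    System.exit(42) // prints the message above, then exits with code 42
  }
}
```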
+  /**
+   * Start the user class, which contains the spark driver, in a separate Thread.
+   * If the main routine exits cleanly or exits with System.exit(0) we
+   * assume it was successful, for all other cases we assume failure.
+   *
+   * Returns the user thread that was started.
+   */
   private def startUserClass(): Thread = {
     logInfo("Starting the user JAR in a separate Thread")
     System.setProperty("spark.executor.instances", args.numExecutors.toString)
     val mainMethod = Class.forName(args.userClass, false,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])

-    userClassThread = new Thread {
+    val userThread = new Thread {
       override def run() {
-        var status = FinalApplicationStatus.FAILED
         try {
-          // Copy
           val mainArgs = new Array[String](args.userArgs.size)
           args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
           mainMethod.invoke(null, mainArgs)
-          // Some apps have "System.exit(0)" at the end. The user thread will stop here unless
-          // it has an uncaught exception thrown out. It needs a shutdown hook to set SUCCEEDED.
-          status = FinalApplicationStatus.SUCCEEDED
+          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+          logDebug("Done running users class")
         } catch {
           case e: InvocationTargetException =>
             e.getCause match {
               case _: InterruptedException =>
                 // Reporter thread can interrupt to stop user class
-              case e => throw e
+              case e: Exception =>
+                finish(FinalApplicationStatus.FAILED,
+                  ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
+                  "User class threw exception: " + e.getMessage)
+                // re-throw to get it logged
+                throw e
             }
-        } finally {
-          logDebug("Finishing main")
-          finalStatus = status
         }
       }
     }
-    userClassThread.setName("Driver")
-    userClassThread.start()
-    userClassThread
+    userThread.setName("Driver")
+    userThread.start()
+    userThread
   }

   // Actor used to monitor the driver when running in client deploy mode.
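`startUserClass` runs the user's `main` via reflection, so any failure surfaces as an `InvocationTargetException` whose cause must be unwrapped before deciding between "interrupted to stop" and "user class threw". A minimal sketch of that unwrapping (`HelloTarget` is a made-up stand-in for the user class):

```scala
import java.lang.reflect.InvocationTargetException

// A stand-in "user class" for the demo below.
object HelloTarget {
  def main(args: Array[String]): Unit = println("hello from user class")
}

// Reflectively invoke a main method and unwrap the reflection wrapper,
// as startUserClass does (simplified; no threads or YARN involved).
object ReflectiveMainSketch {
  def main(args: Array[String]): Unit = {
    val mainMethod = Class.forName("HelloTarget", false,
      Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
    try {
      mainMethod.invoke(null, Array.empty[String])
    } catch {
      case e: InvocationTargetException =>
        e.getCause match {
          case _: InterruptedException => // interrupted means "asked to stop"
          case cause: Throwable => throw cause // surface the user's exception
        }
    }
  }
}
```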
@@ -432,7 +496,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     override def receive = {
       case x: DisassociatedEvent =>
         logInfo(s"Driver terminated or disconnected! Shutting down. $x")
-        finish(FinalApplicationStatus.SUCCEEDED)
+        finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
       case x: AddWebUIFilter =>
         logInfo(s"Add WebUI Filter. $x")
         driver ! x
@@ -446,6 +510,15 @@ object ApplicationMaster extends Logging {

   val SHUTDOWN_HOOK_PRIORITY: Int = 30

+  // exit codes for different causes, no reason behind the values
+  private val EXIT_SUCCESS = 0
+  private val EXIT_UNCAUGHT_EXCEPTION = 10
+  private val EXIT_MAX_EXECUTOR_FAILURES = 11
+  private val EXIT_REPORTER_FAILURE = 12
+  private val EXIT_SC_NOT_INITED = 13
+  private val EXIT_SECURITY = 14
+  private val EXIT_EXCEPTION_USER_CLASS = 15
+
   private var master: ApplicationMaster = _

   def main(args: Array[String]) = {
@@ -49,12 +49,12 @@ trait YarnRMClient {
       securityMgr: SecurityManager): YarnAllocator

   /**
-   * Shuts down the AM. Guaranteed to only be called once.
+   * Unregister the AM. Guaranteed to only be called once.
    *
    * @param status The final status of the AM.
    * @param diagnostics Diagnostics message to include in the final status.
    */
-  def shutdown(status: FinalApplicationStatus, diagnostics: String = ""): Unit
+  def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit

   /** Returns the attempt ID. */
   def getAttemptId(): ApplicationAttemptId
@@ -45,6 +45,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient
   private var amClient: AMRMClient[ContainerRequest] = _
   private var uiHistoryAddress: String = _
+  private var registered: Boolean = false

   override def register(
       conf: YarnConfiguration,
@@ -59,13 +60,19 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient
     this.uiHistoryAddress = uiHistoryAddress

     logInfo("Registering the ApplicationMaster")
-    amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
+    synchronized {
+      amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
+      registered = true
+    }
     new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
       preferredNodeLocations, securityMgr)
   }

-  override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") =
-    amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
+  override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
+    if (registered) {
+      amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
+    }
+  }

   override def getAttemptId() = {
     val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())