diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 37e98e01fddf79d823e89f62561f9ac98a785479..4cc320c5d59b5c1df357df2ea4d5c78ece5b2e15 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -54,7 +54,7 @@ private[spark] class ApplicationMaster( private val sparkConf = new SparkConf() private val yarnConf: YarnConfiguration = SparkHadoopUtil.get.newConfiguration(sparkConf) .asInstanceOf[YarnConfiguration] - private val isDriver = args.userClass != null + private val isClusterMode = args.userClass != null // Default to numExecutors * 2, with minimum of 3 private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures", @@ -81,7 +81,7 @@ private[spark] class ApplicationMaster( try { val appAttemptId = client.getAttemptId() - if (isDriver) { + if (isClusterMode) { // Set the web ui port to be ephemeral for yarn so we don't conflict with // other spark processes running on the same box System.setProperty("spark.ui.port", "0") @@ -139,7 +139,7 @@ private[spark] class ApplicationMaster( // doAs in order for the credentials to be passed on to the executor containers. val securityMgr = new SecurityManager(sparkConf) - if (isDriver) { + if (isClusterMode) { runDriver(securityMgr) } else { runExecutorLauncher(securityMgr) @@ -162,7 +162,7 @@ private[spark] class ApplicationMaster( * from the application code. */ final def getDefaultFinalStatus() = { - if (isDriver) { + if (isClusterMode) { FinalApplicationStatus.SUCCEEDED } else { FinalApplicationStatus.UNDEFINED @@ -243,7 +243,7 @@ private[spark] class ApplicationMaster( private def runAMActor( host: String, port: String, - isDriver: Boolean): Unit = { + isClusterMode: Boolean): Unit = { val driverUrl = AkkaUtils.address( AkkaUtils.protocol(actorSystem), @@ -251,7 +251,7 @@ private[spark] class ApplicationMaster( host, port, YarnSchedulerBackend.ACTOR_NAME) - actor = actorSystem.actorOf(Props(new AMActor(driverUrl, isDriver)), name = "YarnAM") + actor = actorSystem.actorOf(Props(new AMActor(driverUrl, isClusterMode)), name = "YarnAM") } private def runDriver(securityMgr: SecurityManager): Unit = { @@ -272,7 +272,7 @@ private[spark] class ApplicationMaster( runAMActor( sc.getConf.get("spark.driver.host"), sc.getConf.get("spark.driver.port"), - isDriver = true) + isClusterMode = true) registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr) userClassThread.join() } @@ -427,7 +427,7 @@ private[spark] class ApplicationMaster( sparkConf.set("spark.driver.host", driverHost) sparkConf.set("spark.driver.port", driverPort.toString) - runAMActor(driverHost, driverPort.toString, isDriver = false) + runAMActor(driverHost, driverPort.toString, isClusterMode = false) } /** Add the Yarn IP filter that is required for properly securing the UI. */ @@ -435,7 +435,7 @@ private[spark] class ApplicationMaster( val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV) val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter" val params = client.getAmIpFilterParams(yarnConf, proxyBase) - if (isDriver) { + if (isClusterMode) { System.setProperty("spark.ui.filters", amFilter) params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) } } else { @@ -491,7 +491,7 @@ private[spark] class ApplicationMaster( /** * An actor that communicates with the driver's scheduler backend. */ - private class AMActor(driverUrl: String, isDriver: Boolean) extends Actor { + private class AMActor(driverUrl: String, isClusterMode: Boolean) extends Actor { var driver: ActorSelection = _ override def preStart() = { @@ -503,7 +503,7 @@ private[spark] class ApplicationMaster( driver ! RegisterClusterManager // In cluster mode, the AM can directly monitor the driver status instead // of trying to deduce it from the lifecycle of the driver's actor - if (!isDriver) { + if (!isClusterMode) { context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } } @@ -513,7 +513,7 @@ private[spark] class ApplicationMaster( logInfo(s"Driver terminated or disconnected! Shutting down. $x") // In cluster mode, do not rely on the disassociated event to exit // This avoids potentially reporting incorrect exit codes if the driver fails - if (!isDriver) { + if (!isClusterMode) { finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) }