Commit 81f8f340 authored by lianhuiwang, committed by Andrew Or

[SPARK-4955] With dynamic executor scaling enabled, executors should be added or killed in yarn-cluster mode

With dynamic executor scaling enabled, the number of executors should be adjustable (executors added or killed) in yarn-cluster mode as well. To support this, in yarn-cluster mode the ApplicationMaster starts an AMActor that adds or kills executors; the YarnSchedulerActor in YarnSchedulerBackend then sends its messages to the AM's AMActor.
andrewor14 ChengXiangLi tdas

Author: lianhuiwang <lianhuiwang09@gmail.com>

Closes #3962 from lianhuiwang/SPARK-4955 and squashes the following commits:

48d9ebb [lianhuiwang] update with andrewor14's comments
12426af [lianhuiwang] refactor am's code
45da3b0 [lianhuiwang] remove unrelated code
9318fc1 [lianhuiwang] update with andrewor14's comments
08ba473 [lianhuiwang] address andrewor14's comments
265c36d [lianhuiwang] fix small change
f43bda8 [lianhuiwang] fix address andrewor14's comments
7a7767a [lianhuiwang] fix address andrewor14's comments
bbc4d5a [lianhuiwang] address andrewor14's comments
1b029a4 [lianhuiwang] fix bug
7d33791 [lianhuiwang] in AM create a new actorSystem
2164ea8 [lianhuiwang] fix a min bug
6dfeeec [lianhuiwang] in yarn-cluster mode,executor number can be added or killed.
parent 456c11f1
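The commit description implies a two-actor message path: the driver's YarnSchedulerActor forwards executor-scaling requests to the AMActor registered on the ApplicationMaster's actor system under the name "YarnAM". Below is a rough, self-contained sketch of that flow. It is not the Spark source: the message case classes mirror Spark's scheduler messages of this era, but the Demo* actor bodies are invented stand-ins, and the YARN allocator call is reduced to a log line.

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}

// Simplified stand-ins for Spark's scheduler messages (assumption: the real
// ones carry more fields and live alongside the other cluster messages).
case class RequestExecutors(requestedTotal: Int)
case class KillExecutors(executorIds: Seq[String])

// Driver side: the scheduler actor forwards scaling requests to the AM actor,
// which it looks up by actor selection.
class DemoYarnSchedulerActor(amActor: ActorSelection) extends Actor {
  def receive = {
    case r: RequestExecutors => amActor ! r
    case k: KillExecutors => amActor ! k
  }
}

// AM side: in yarn-cluster mode this actor runs in the same process as the
// driver; on receipt the real code would ask the YARN allocator for more
// containers or release existing ones.
class DemoAMActor extends Actor {
  def receive = {
    case RequestExecutors(total) =>
      println(s"AM: requesting a total of $total executors from YARN")
    case KillExecutors(ids) =>
      println(s"AM: releasing containers for executors: ${ids.mkString(", ")}")
  }
}

object MessagePathDemo extends App {
  val system = ActorSystem("sparkYarnAM")
  val am = system.actorOf(Props[DemoAMActor], name = "YarnAM")
  val scheduler =
    system.actorOf(Props(new DemoYarnSchedulerActor(system.actorSelection(am.path))))
  scheduler ! RequestExecutors(5)
  scheduler ! KillExecutors(Seq("3"))
  Thread.sleep(500)
  system.shutdown() // Akka 2.3-era API, matching Spark 1.x's Akka dependency
}

In the real code the AM side presumably hands these requests to the YARN allocator, which negotiates containers with the ResourceManager; that portion is not shown in the diff below.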
@@ -43,8 +43,11 @@ import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}

 /**
  * Common application master functionality for Spark on Yarn.
  */
-private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
-  client: YarnRMClient) extends Logging {
+private[spark] class ApplicationMaster(
+    args: ApplicationMasterArguments,
+    client: YarnRMClient)
+  extends Logging {
+
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
@@ -231,6 +234,24 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     reporterThread = launchReporterThread()
   }

+  /**
+   * Create an actor that communicates with the driver.
+   *
+   * In cluster mode, the AM and the driver belong to same process
+   * so the AM actor need not monitor lifecycle of the driver.
+   */
+  private def runAMActor(
+      host: String,
+      port: String,
+      isDriver: Boolean): Unit = {
+    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+      SparkEnv.driverActorSystemName,
+      host,
+      port,
+      YarnSchedulerBackend.ACTOR_NAME)
+    actor = actorSystem.actorOf(Props(new AMActor(driverUrl, isDriver)), name = "YarnAM")
+  }
+
   private def runDriver(securityMgr: SecurityManager): Unit = {
     addAmIpFilter()
     userClassThread = startUserClass()
@@ -245,6 +266,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
         ApplicationMaster.EXIT_SC_NOT_INITED,
         "Timed out waiting for SparkContext.")
     } else {
+      actorSystem = sc.env.actorSystem
+      runAMActor(
+        sc.getConf.get("spark.driver.host"),
+        sc.getConf.get("spark.driver.port"),
+        isDriver = true)
       registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
       userClassThread.join()
     }
@@ -253,7 +279,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
     actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
       conf = sparkConf, securityManager = securityMgr)._1
-    actor = waitForSparkDriver()
+    waitForSparkDriver()
     addAmIpFilter()
     registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
@@ -367,7 +393,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     }
   }

-  private def waitForSparkDriver(): ActorRef = {
+  private def waitForSparkDriver(): Unit = {
     logInfo("Waiting for Spark driver to be reachable.")
     var driverUp = false
     val hostport = args.userArgs(0)
@@ -399,12 +425,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     sparkConf.set("spark.driver.host", driverHost)
     sparkConf.set("spark.driver.port", driverPort.toString)
-    val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
-      SparkEnv.driverActorSystemName,
-      driverHost,
-      driverPort.toString,
-      YarnSchedulerBackend.ACTOR_NAME)
-    actorSystem.actorOf(Props(new AMActor(driverUrl)), name = "YarnAM")
+    runAMActor(driverHost, driverPort.toString, isDriver = false)
   }

   /** Add the Yarn IP filter that is required for properly securing the UI. */
@@ -462,9 +483,9 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   }

   /**
-   * Actor that communicates with the driver in client deploy mode.
+   * An actor that communicates with the driver's scheduler backend.
    */
-  private class AMActor(driverUrl: String) extends Actor {
+  private class AMActor(driverUrl: String, isDriver: Boolean) extends Actor {
     var driver: ActorSelection = _

     override def preStart() = {
@@ -474,13 +495,21 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       // we can monitor Lifecycle Events.
       driver ! "Hello"
       driver ! RegisterClusterManager
-      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+      // In cluster mode, the AM can directly monitor the driver status instead
+      // of trying to deduce it from the lifecycle of the driver's actor
+      if (!isDriver) {
+        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+      }
     }

     override def receive = {
       case x: DisassociatedEvent =>
         logInfo(s"Driver terminated or disconnected! Shutting down. $x")
-        finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+        // In cluster mode, do not rely on the disassociated event to exit
+        // This avoids potentially reporting incorrect exit codes if the driver fails
+        if (!isDriver) {
+          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+        }
       case x: AddWebUIFilter =>
         logInfo(s"Add WebUI Filter. $x")
...
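For context, the code path this commit fixes only matters when dynamic allocation is turned on. A minimal driver-side configuration sketch using the standard Spark 1.x property names (the application would be submitted with --master yarn-cluster; application name and executor bounds here are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Dynamic allocation lets Spark request and release executors at runtime;
// on YARN in cluster mode those requests now travel through the AMActor
// added in this commit.
val conf = new SparkConf()
  .setAppName("DynamicAllocationDemo")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "10")
  .set("spark.shuffle.service.enabled", "true") // external shuffle service is required
val sc = new SparkContext(conf)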