From 0f9d2ace6baefeacb1abf9d51a457644b67f2f8d Mon Sep 17 00:00:00 2001 From: Patrick Wendell <pwendell@gmail.com> Date: Wed, 8 Jan 2014 16:53:04 -0800 Subject: [PATCH] Adding polling to driver submission client. --- .../org/apache/spark/deploy/Client.scala | 141 +++++++++++------- .../apache/spark/deploy/DeployMessage.scala | 11 +- .../spark/deploy/master/DriverState.scala | 5 +- .../apache/spark/deploy/master/Master.scala | 29 ++-- .../spark/deploy/worker/DriverRunner.scala | 12 +- .../apache/spark/deploy/worker/Worker.scala | 2 +- 6 files changed, 132 insertions(+), 68 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 43b9b1cff9..e133893f6c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -22,60 +22,30 @@ import scala.collection.mutable.Map import scala.concurrent._ import akka.actor._ +import akka.pattern.ask import org.apache.log4j.{Level, Logger} import org.apache.spark.{Logging, SparkConf} import org.apache.spark.deploy.DeployMessages._ -import org.apache.spark.deploy.master.Master +import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.util.{AkkaUtils, Utils} +import akka.actor.Actor.emptyBehavior +import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent} /** - * Actor that sends a single message to the standalone master and returns the response in the - * given promise. + * Proxy that relays driver requests to the master and reports the resulting driver state. 
*/ -class DriverActor(master: String, response: Promise[(Boolean, String)]) extends Actor with Logging { - override def receive = { - case SubmitDriverResponse(success, message) => { - response.success((success, message)) - } - - case KillDriverResponse(success, message) => { - response.success((success, message)) - } +class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging { + var masterActor: ActorSelection = _ + val timeout = AkkaUtils.askTimeout(conf) - // Relay all other messages to the master. - case message => { - logInfo(s"Sending message to master $master...") - val masterActor = context.actorSelection(Master.toAkkaUrl(master)) - masterActor ! message - } - } -} + override def preStart() = { + masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master)) -/** - * Executable utility for starting and terminating drivers inside of a standalone cluster. - */ -object Client { - - def main(args: Array[String]) { - val driverArgs = new ClientArguments(args) - val conf = new SparkConf() - - if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) { - conf.set("spark.akka.logLifecycleEvents", "true") - } - conf.set("spark.akka.askTimeout", "5") - Logger.getRootLogger.setLevel(driverArgs.logLevel) - - // TODO: See if we can initialize akka so return messages are sent back using the same TCP - // flow. Else, this (sadly) requires the DriverClient be routable from the Master. 
- val (actorSystem, _) = AkkaUtils.createActorSystem( - "driverClient", Utils.localHostName(), 0, false, conf) - val master = driverArgs.master - val response = promise[(Boolean, String)] - val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response))) + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}") + driverArgs.cmd match { case "launch" => // TODO: We could add an env variable here and intercept it in `sc.addJar` that would @@ -94,21 +64,88 @@ object Client { driverArgs.cores, driverArgs.supervise, command) - driver ! RequestSubmitDriver(driverDescription) + + masterActor ! RequestSubmitDriver(driverDescription) case "kill" => val driverId = driverArgs.driverId - driver ! RequestKillDriver(driverId) + val killFuture = masterActor ! RequestKillDriver(driverId) + } + } + + /* Find out driver status then exit the JVM */ + def pollAndReportStatus(driverId: String) { + println(s"... waiting before polling master for driver state") + Thread.sleep(5000) + println("... polling master for driver state") + val statusFuture = (masterActor ? 
RequestDriverStatus(driverId))(timeout) + .mapTo[DriverStatusResponse] + val statusResponse = Await.result(statusFuture, timeout) + + statusResponse.found match { + case false => + println(s"ERROR: Cluster master did not recognize $driverId") + System.exit(-1) + case true => + println(s"State of $driverId is ${statusResponse.state.get}") + // Worker node, if present + (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match { + case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => + println(s"Driver running on $hostPort ($id)") + case _ => + } + // Exception, if present + statusResponse.exception.map { e => + println(s"Exception from cluster was: $e") + System.exit(-1) + } + System.exit(0) } + } + + override def receive = { + + case SubmitDriverResponse(success, driverId, message) => + println(message) + if (success) pollAndReportStatus(driverId.get) else System.exit(-1) + + case KillDriverResponse(driverId, success, message) => + println(message) + if (success) pollAndReportStatus(driverId) else System.exit(-1) + + case DisassociatedEvent(_, remoteAddress, _) => + println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.") + System.exit(-1) + + case AssociationErrorEvent(cause, _, remoteAddress, _) => + println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.") + println(s"Cause was: $cause") + System.exit(-1) + } +} + +/** + * Executable utility for starting and terminating drivers inside of a standalone cluster. 
+ */ +object Client { + def main(args: Array[String]) { + val conf = new SparkConf() + val driverArgs = new ClientArguments(args) + + if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) { + conf.set("spark.akka.logLifecycleEvents", "true") + } + conf.set("spark.akka.askTimeout", "10") + conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING")) + Logger.getRootLogger.setLevel(driverArgs.logLevel) + + // TODO: See if we can initialize akka so return messages are sent back using the same TCP + // flow. Else, this (sadly) requires the DriverClient be routable from the Master. + val (actorSystem, _) = AkkaUtils.createActorSystem( + "driverClient", Utils.localHostName(), 0, false, conf) + + actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf)) - val (success, message) = - try { - Await.result(response.future, AkkaUtils.askTimeout(conf)) - } catch { - case e: TimeoutException => (false, s"Error: Timed out sending message to $master") - } - println(message) - actorSystem.shutdown() actorSystem.awaitTermination() } } diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 34460d359d..5e824e1a67 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -112,11 +112,18 @@ private[deploy] object DeployMessages { case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage - case class SubmitDriverResponse(success: Boolean, message: String) extends DeployMessage + case class SubmitDriverResponse(success: Boolean, driverId: Option[String], message: String) + extends DeployMessage case class RequestKillDriver(driverId: String) extends DeployMessage - case class KillDriverResponse(success: Boolean, message: String) extends DeployMessage + case class KillDriverResponse(driverId: String, success: Boolean, message: String) + extends 
DeployMessage + + case class RequestDriverStatus(driverId: String) extends DeployMessage + + case class DriverStatusResponse(found: Boolean, state: Option[DriverState], + workerId: Option[String], workerHostPort: Option[String], exception: Option[Exception]) // Internal message in AppClient diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala index 93b260740e..26a68bade3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala @@ -27,6 +27,7 @@ private[spark] object DriverState extends Enumeration { // RELAUNCHING: Exited non-zero or due to worker failure, but has not yet started running again // UNKNOWN: The state of the driver is temporarily not known due to master failure recovery // KILLED: A user manually killed this driver - // FAILED: Unable to run due to an unrecoverable error (e.g. missing jar file) - val SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED = Value + // FAILED: The driver exited non-zero and was not supervised + // ERROR: Unable to run or restart due to an unrecoverable error (e.g. missing jar file) + val SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED, ERROR = Value } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index f62601fa6c..cd3f3ebefc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -186,7 +186,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act case RequestSubmitDriver(description) => { if (state != RecoveryState.ALIVE) { val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state." - sender ! SubmitDriverResponse(false, msg) + sender ! 
SubmitDriverResponse(false, None, msg) } else { logInfo("Driver submitted " + description.command.mainClass) val driver = createDriver(description) @@ -198,14 +198,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act // TODO: It might be good to instead have the submission client poll the master to determine // the current status of the driver. For now it's simply "fire and forget". - sender ! SubmitDriverResponse(true, s"Driver successfully submitted as ${driver.id}") + sender ! SubmitDriverResponse(true, Some(driver.id), + s"Driver successfully submitted as ${driver.id}") } } case RequestKillDriver(driverId) => { if (state != RecoveryState.ALIVE) { val msg = s"Can only kill drivers in ALIVE state. Current state: $state." - sender ! KillDriverResponse(false, msg) + sender ! KillDriverResponse(driverId, success = false, msg) } else { logInfo("Asked to kill driver " + driverId) val driver = drivers.find(_.id == driverId) @@ -226,15 +227,25 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act // TODO: It would be nice for this to be a synchronous response val msg = s"Kill request for $driverId submitted" logInfo(msg) - sender ! KillDriverResponse(true, msg) + sender ! KillDriverResponse(driverId, success = true, msg) case None => - val msg = s"Could not find running driver $driverId" + val msg = s"Driver $driverId has already finished or does not exist" logWarning(msg) - sender ! KillDriverResponse(false, msg) + sender ! KillDriverResponse(driverId, success = false, msg) } } } + case RequestDriverStatus(driverId) => { + (drivers ++ completedDrivers).find(_.id == driverId) match { + case Some(driver) => + sender ! DriverStatusResponse(found = true, Some(driver.state), + driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception) + case None => + sender ! 
DriverStatusResponse(found = false, None, None, None, None) + } + } + case RegisterApplication(description) => { if (state == RecoveryState.STANDBY) { // ignore, don't send response @@ -279,7 +290,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act case DriverStateChanged(driverId, state, exception) => { state match { - case DriverState.FAILED | DriverState.FINISHED | DriverState.KILLED => + case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED => removeDriver(driverId, state, exception) case _ => throw new Exception(s"Received unexpected state update for driver $driverId: $state") @@ -410,7 +421,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act logWarning(s"Re-launching ${d.id}") relaunchDriver(d) } else { - removeDriver(d.id, DriverState.FAILED, None) + removeDriver(d.id, DriverState.ERROR, None) logWarning(s"Did not re-launch ${d.id} because it was not supervised") } } @@ -539,7 +550,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act relaunchDriver(driver) } else { logInfo(s"Not re-launching ${driver.id} because it was not supervised") - removeDriver(driver.id, DriverState.FAILED, None) + removeDriver(driver.id, DriverState.ERROR, None) } } persistenceEngine.removeWorker(worker) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index ad70345a7f..b4df1a0dd4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -52,6 +52,7 @@ private[spark] class DriverRunner( // Populated once finished var finalState: Option[DriverState] = None var finalException: Option[Exception] = None + var finalExitCode: Option[Int] = None // Decoupled for testing private[deploy] def setClock(_clock: Clock) = clock = _clock @@ -87,8 +88,14 @@ 
private[spark] class DriverRunner( val state = if (killed) { DriverState.KILLED } - else if (finalException.isDefined) { DriverState.FAILED } - else { DriverState.FINISHED } + else if (finalException.isDefined) { DriverState.ERROR } + else { + finalExitCode match { + case Some(0) => DriverState.FINISHED + case _ => DriverState.FAILED + } + } + finalState = Some(state) worker ! DriverStateChanged(driverId, state, finalException) @@ -200,6 +207,7 @@ private[spark] class DriverRunner( } keepTrying = supervise && exitCode != 0 && !killed + finalExitCode = Some(exitCode) } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 2a2b7a3881..273bacded6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -272,7 +272,7 @@ private[spark] class Worker( case DriverStateChanged(driverId, state, exception) => { state match { - case DriverState.FAILED => + case DriverState.ERROR => logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}") case DriverState.FINISHED => logInfo(s"Driver $driverId exited successfully") -- GitLab