Commit 0f9d2ace authored by Patrick Wendell

Adding polling to driver submission client.

parent 62b08faa
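This change replaces the fire-and-forget DriverActor with a ClientActor that submits or kills a driver and then polls the master for the driver's status over Akka's ask pattern before exiting. Below is a minimal, self-contained sketch of that ask-then-await polling step; the RequestDriverStatus/DriverStatusResponse classes and the ToyMaster actor are simplified, illustrative stand-ins for the real DeployMessages and Master shown in the diff that follows, not the actual Spark code.

import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._

// Illustrative stand-ins for the real DeployMessages defined later in this diff.
case class RequestDriverStatus(driverId: String)
case class DriverStatusResponse(found: Boolean, state: Option[String])

// Toy master that always reports the driver as RUNNING (not the real Master actor).
class ToyMaster extends Actor {
  def receive = {
    case RequestDriverStatus(_) =>
      sender ! DriverStatusResponse(found = true, state = Some("RUNNING"))
  }
}

object PollSketch extends App {
  val system = ActorSystem("poll-sketch")
  val master = system.actorOf(Props[ToyMaster], "master")
  implicit val timeout = Timeout(5.seconds)

  // Same ask-then-await pattern that pollAndReportStatus uses against the standalone master.
  val statusFuture = (master ? RequestDriverStatus("driver-0")).mapTo[DriverStatusResponse]
  val status = Await.result(statusFuture, timeout.duration)
  println(s"found=${status.found}, state=${status.state.getOrElse("NOT FOUND")}")

  system.shutdown()
}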
@@ -22,60 +22,30 @@ import scala.collection.mutable.Map
import scala.concurrent._
import akka.actor._
import akka.pattern.ask
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.util.{AkkaUtils, Utils}
import akka.actor.Actor.emptyBehavior
import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
/**
* Actor that sends a single message to the standalone master and returns the response in the
* given promise.
* Proxy that relays messages to the driver.
*/
class DriverActor(master: String, response: Promise[(Boolean, String)]) extends Actor with Logging {
override def receive = {
case SubmitDriverResponse(success, message) => {
response.success((success, message))
}
case KillDriverResponse(success, message) => {
response.success((success, message))
}
class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
var masterActor: ActorSelection = _
val timeout = AkkaUtils.askTimeout(conf)
// Relay all other messages to the master.
case message => {
logInfo(s"Sending message to master $master...")
val masterActor = context.actorSelection(Master.toAkkaUrl(master))
masterActor ! message
}
}
}
override def preStart() = {
masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
/**
* Executable utility for starting and terminating drivers inside of a standalone cluster.
*/
object Client {
def main(args: Array[String]) {
val driverArgs = new ClientArguments(args)
val conf = new SparkConf()
if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
conf.set("spark.akka.logLifecycleEvents", "true")
}
conf.set("spark.akka.askTimeout", "5")
Logger.getRootLogger.setLevel(driverArgs.logLevel)
// TODO: See if we can initialize akka so return messages are sent back using the same TCP
// flow. Else, this (sadly) requires the DriverClient be routable from the Master.
val (actorSystem, _) = AkkaUtils.createActorSystem(
"driverClient", Utils.localHostName(), 0, false, conf)
val master = driverArgs.master
val response = promise[(Boolean, String)]
val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response)))
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}")
driverArgs.cmd match {
case "launch" =>
// TODO: We could add an env variable here and intercept it in `sc.addJar` that would
@@ -94,21 +64,88 @@ object Client {
driverArgs.cores,
driverArgs.supervise,
command)
driver ! RequestSubmitDriver(driverDescription)
masterActor ! RequestSubmitDriver(driverDescription)
case "kill" =>
val driverId = driverArgs.driverId
driver ! RequestKillDriver(driverId)
val killFuture = masterActor ! RequestKillDriver(driverId)
}
}
/* Find out driver status then exit the JVM */
def pollAndReportStatus(driverId: String) {
println(s"... waiting before polling master for driver state")
Thread.sleep(5000)
println("... polling master for driver state")
val statusFuture = (masterActor ? RequestDriverStatus(driverId))(timeout)
.mapTo[DriverStatusResponse]
val statusResponse = Await.result(statusFuture, timeout)
statusResponse.found match {
case false =>
println(s"ERROR: Cluster master did not recognize $driverId")
System.exit(-1)
case true =>
println(s"State of $driverId is ${statusResponse.state.get}")
// Worker node, if present
(statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
println(s"Driver running on $hostPort ($id)")
case _ =>
}
// Exception, if present
statusResponse.exception.map { e =>
println(s"Exception from cluster was: $e")
System.exit(-1)
}
System.exit(0)
}
}
override def receive = {
case SubmitDriverResponse(success, driverId, message) =>
println(message)
if (success) pollAndReportStatus(driverId.get) else System.exit(-1)
case KillDriverResponse(driverId, success, message) =>
println(message)
if (success) pollAndReportStatus(driverId) else System.exit(-1)
case DisassociatedEvent(_, remoteAddress, _) =>
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
System.exit(-1)
case AssociationErrorEvent(cause, _, remoteAddress, _) =>
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
println(s"Cause was: $cause")
System.exit(-1)
}
}
/**
* Executable utility for starting and terminating drivers inside of a standalone cluster.
*/
object Client {
def main(args: Array[String]) {
val conf = new SparkConf()
val driverArgs = new ClientArguments(args)
if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
conf.set("spark.akka.logLifecycleEvents", "true")
}
conf.set("spark.akka.askTimeout", "10")
conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
Logger.getRootLogger.setLevel(driverArgs.logLevel)
// TODO: See if we can initialize akka so return messages are sent back using the same TCP
// flow. Else, this (sadly) requires the DriverClient be routable from the Master.
val (actorSystem, _) = AkkaUtils.createActorSystem(
"driverClient", Utils.localHostName(), 0, false, conf)
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
val (success, message) =
try {
Await.result(response.future, AkkaUtils.askTimeout(conf))
} catch {
case e: TimeoutException => (false, s"Error: Timed out sending message to $master")
}
println(message)
actorSystem.shutdown()
actorSystem.awaitTermination()
}
}
@@ -112,11 +112,18 @@ private[deploy] object DeployMessages {
case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage
case class SubmitDriverResponse(success: Boolean, message: String) extends DeployMessage
case class SubmitDriverResponse(success: Boolean, driverId: Option[String], message: String)
extends DeployMessage
case class RequestKillDriver(driverId: String) extends DeployMessage
case class KillDriverResponse(success: Boolean, message: String) extends DeployMessage
case class KillDriverResponse(driverId: String, success: Boolean, message: String)
extends DeployMessage
case class RequestDriverStatus(driverId: String) extends DeployMessage
case class DriverStatusResponse(found: Boolean, state: Option[DriverState],
workerId: Option[String], workerHostPort: Option[String], exception: Option[Exception])
// Internal message in AppClient
@@ -27,6 +27,7 @@ private[spark] object DriverState extends Enumeration {
// RELAUNCHING: Exited non-zero or due to worker failure, but has not yet started running again
// UNKNOWN: The state of the driver is temporarily not known due to master failure recovery
// KILLED: A user manually killed this driver
// FAILED: Unable to run due to an unrecoverable error (e.g. missing jar file)
val SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED = Value
// FAILED: The driver exited non-zero and was not supervised
// ERROR: Unable to run or restart due to an unrecoverable error (e.g. missing jar file)
val SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED, ERROR = Value
}
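The comments above split the old catch-all FAILED into two states: FAILED now means the driver process exited non-zero, while ERROR marks an unrecoverable problem such as a missing jar. A condensed, hypothetical sketch of how the rest of this commit assigns these states (mirroring the DriverRunner change further down; the helper name and String values are illustrative, not actual Spark code):

object DriverStateSketch {
  // Hypothetical helper condensed from the DriverRunner change later in this commit.
  def finalDriverState(killed: Boolean,
                       exception: Option[Exception],
                       exitCode: Option[Int]): String = {
    if (killed) "KILLED"                      // the user asked for the driver to be killed
    else if (exception.isDefined) "ERROR"     // unrecoverable problem, e.g. a missing jar
    else if (exitCode == Some(0)) "FINISHED"  // process exited cleanly
    else "FAILED"                             // process exited non-zero (or produced no exit code)
  }
}
// Example: DriverStateSketch.finalDriverState(killed = false, None, Some(1)) returns "FAILED"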
@@ -186,7 +186,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case RequestSubmitDriver(description) => {
if (state != RecoveryState.ALIVE) {
val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
sender ! SubmitDriverResponse(false, msg)
sender ! SubmitDriverResponse(false, None, msg)
} else {
logInfo("Driver submitted " + description.command.mainClass)
val driver = createDriver(description)
@@ -198,14 +198,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// TODO: It might be good to instead have the submission client poll the master to determine
// the current status of the driver. For now it's simply "fire and forget".
sender ! SubmitDriverResponse(true, s"Driver successfully submitted as ${driver.id}")
sender ! SubmitDriverResponse(true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}")
}
}
case RequestKillDriver(driverId) => {
if (state != RecoveryState.ALIVE) {
val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
sender ! KillDriverResponse(false, msg)
sender ! KillDriverResponse(driverId, success = false, msg)
} else {
logInfo("Asked to kill driver " + driverId)
val driver = drivers.find(_.id == driverId)
@@ -226,15 +227,25 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// TODO: It would be nice for this to be a synchronous response
val msg = s"Kill request for $driverId submitted"
logInfo(msg)
sender ! KillDriverResponse(true, msg)
sender ! KillDriverResponse(driverId, success = true, msg)
case None =>
val msg = s"Could not find running driver $driverId"
val msg = s"Driver $driverId has already finished or does not exist"
logWarning(msg)
sender ! KillDriverResponse(false, msg)
sender ! KillDriverResponse(driverId, success = false, msg)
}
}
}
case RequestDriverStatus(driverId) => {
(drivers ++ completedDrivers).find(_.id == driverId) match {
case Some(driver) =>
sender ! DriverStatusResponse(found = true, Some(driver.state),
driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception)
case None =>
sender ! DriverStatusResponse(found = false, None, None, None, None)
}
}
case RegisterApplication(description) => {
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
@@ -279,7 +290,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case DriverStateChanged(driverId, state, exception) => {
state match {
case DriverState.FAILED | DriverState.FINISHED | DriverState.KILLED =>
case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
removeDriver(driverId, state, exception)
case _ =>
throw new Exception(s"Received unexpected state update for driver $driverId: $state")
@@ -410,7 +421,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
logWarning(s"Re-launching ${d.id}")
relaunchDriver(d)
} else {
removeDriver(d.id, DriverState.FAILED, None)
removeDriver(d.id, DriverState.ERROR, None)
logWarning(s"Did not re-launch ${d.id} because it was not supervised")
}
}
@@ -539,7 +550,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
relaunchDriver(driver)
} else {
logInfo(s"Not re-launching ${driver.id} because it was not supervised")
removeDriver(driver.id, DriverState.FAILED, None)
removeDriver(driver.id, DriverState.ERROR, None)
}
}
persistenceEngine.removeWorker(worker)
@@ -52,6 +52,7 @@ private[spark] class DriverRunner(
// Populated once finished
var finalState: Option[DriverState] = None
var finalException: Option[Exception] = None
var finalExitCode: Option[Int] = None
// Decoupled for testing
private[deploy] def setClock(_clock: Clock) = clock = _clock
@@ -87,8 +88,14 @@ private[spark] class DriverRunner(
val state =
if (killed) { DriverState.KILLED }
else if (finalException.isDefined) { DriverState.FAILED }
else { DriverState.FINISHED }
else if (finalException.isDefined) { DriverState.ERROR }
else {
finalExitCode match {
case Some(0) => DriverState.FINISHED
case _ => DriverState.FAILED
}
}
finalState = Some(state)
worker ! DriverStateChanged(driverId, state, finalException)
@@ -200,6 +207,7 @@ private[spark] class DriverRunner(
}
keepTrying = supervise && exitCode != 0 && !killed
finalExitCode = Some(exitCode)
}
}
}
@@ -272,7 +272,7 @@ private[spark] class Worker(
case DriverStateChanged(driverId, state, exception) => {
state match {
case DriverState.FAILED =>
case DriverState.ERROR =>
logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
case DriverState.FINISHED =>
logInfo(s"Driver $driverId exited successfully")