diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 82bb33a2ec028ac4a37434d742d19e008b54adf0..7bfc3779abb9197f9a71f8a0fd60bf600dd12207 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -83,8 +83,7 @@ private[deploy] object DeployMessages {
       sparkHome: String)
     extends DeployMessage
 
-  case class LaunchDriver(driverId: String, driverDesc: DriverDescription)
-    extends DeployMessage
+  case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage
 
   case class KillDriver(driverId: String) extends DeployMessage
 
@@ -134,8 +133,8 @@ private[deploy] object DeployMessages {
   // Master to MasterWebUI
 
   case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
-    activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
     activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
+    activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo],
     status: MasterState) {
 
     Utils.checkHost(host, "Required hostname")
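Since MasterStateResponse fields are positional, every construction and pattern match must follow the new order, with the application arrays ahead of the driver arrays (the call site in Master.scala is updated below to match). A minimal sketch of a consumer under that ordering; it assumes code compiled inside the org.apache.spark.deploy package (the message types are private[deploy]), and the describe helper itself is hypothetical:

import org.apache.spark.deploy.DeployMessages.MasterStateResponse

// Hypothetical consumer: the extractor's field order must match the reordered
// case class declaration (apps first, then drivers).
def describe(res: MasterStateResponse): String = res match {
  case MasterStateResponse(host, port, workers, activeApps, completedApps,
      activeDrivers, completedDrivers, status) =>
    s"master $host:$port [$status]: ${workers.length} workers, " +
      s"${activeApps.length} apps (${completedApps.length} completed), " +
      s"${activeDrivers.length} drivers (${completedDrivers.length} completed)"
}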
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
index 60e654918807a397671e64b8d2252c00174264af..28bc54962e520a14281326390814efebdc3681c6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClientArguments.scala
@@ -89,7 +89,7 @@ private[spark] class DriverClientArguments(args: Array[String]) {
    */
   def printUsageAndExit(exitCode: Int) {
     System.err.println(
-      "usage: DriverClient launch [options] <active-master> <jar-url> <main-class> " +
+      "usage: DriverClient [options] launch <active-master> <jar-url> <main-class> " +
       "[driver options]\n" +
       "usage: DriverClient kill <active-master> <driver-id>\n\n" +
       "Options:\n" +
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
index 052c474d2c71f0baeaf361fa567e0dce6afa714e..33377931d69931eb822af6a3183bc117e3ebdb3a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
@@ -33,5 +33,4 @@ private[spark] class DriverInfo(
   @transient var exception: Option[Exception] = None
   /* Most recent worker assigned to this driver */
   @transient var worker: Option[WorkerInfo] = None
-
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0528ef43a155c2f3b90644488db12d618a647a8f..7f9ad8a7efeffbc131561a7142103bd1bd911a48 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -19,11 +19,11 @@ package org.apache.spark.deploy.master
 
 import java.text.SimpleDateFormat
 import java.util.Date
-import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Await
 import scala.concurrent.duration._
+import scala.util.Random
 
 import akka.actor._
 import akka.pattern.ask
@@ -174,8 +174,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }
 
     case RequestSubmitDriver(description) => {
-      if (state == RecoveryState.STANDBY) {
-        sender ! SubmitDriverResponse(false, "Standby master cannot accept driver submission")
+      if (state != RecoveryState.ALIVE) {
+        val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
+        sender ! SubmitDriverResponse(false, msg)
       } else {
         logInfo("Driver submitted " + description.mainClass)
         val driver = createDriver(description)
@@ -192,14 +193,18 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }
 
     case RequestKillDriver(driverId) => {
-      if (state == RecoveryState.STANDBY) {
-        sender ! KillDriverResponse(false, "Standby master cannot kill drivers")
+      if (state != RecoveryState.ALIVE) {
+        val msg = s"Can only kill drivers in ALIVE state. Current state: $state."
+        sender ! KillDriverResponse(false, msg)
       } else {
         logInfo("Asked to kill driver " + driverId)
         val driver = drivers.find(_.id == driverId)
         driver match {
           case Some(d) =>
-            if (waitingDrivers.contains(d)) { waitingDrivers -= d }
+            if (waitingDrivers.contains(d)) {
+              waitingDrivers -= d
+              self ! DriverStateChanged(driverId, DriverState.KILLED, None)
+            }
             else {
               // We just notify the worker to kill the driver here. The final bookkeeping occurs
               // on the return path when the worker submits a state change back to the master
@@ -208,6 +213,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
                 w.actor ! KillDriver(driverId)
               }
             }
+            // TODO: It would be nice for this to be a synchronous response
             val msg = s"Kill request for $driverId submitted"
             logInfo(msg)
             sender ! KillDriverResponse(true, msg)
@@ -338,8 +344,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }
 
     case RequestMasterState => {
-      sender ! MasterStateResponse(host, port, workers.toArray, drivers.toArray,
-        completedDrivers.toArray ,apps.toArray, completedApps.toArray, state)
+      sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
+        drivers.toArray, completedDrivers.toArray, state)
     }
 
     case CheckForWorkerTimeOut => {
@@ -423,10 +429,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
    */
   def schedule() {
     if (state != RecoveryState.ALIVE) { return }
+
     // First schedule drivers, they take strict precedence over applications
-    for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
-      for (driver <- Seq(waitingDrivers: _*)) {
-        if (worker.memoryFree > driver.desc.mem && worker.coresFree > driver.desc.cores) {
+    val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
+    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
+      for (driver <- waitingDrivers) {
+        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
           launchDriver(worker, driver)
           waitingDrivers -= driver
         }
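The two handlers above pair with client-side ask calls. A minimal sketch of that round trip, assuming code living in the Spark core module where these private message types are visible; the masterActor reference, the 30-second timeout, and the "driver-0" id are illustrative placeholders, not part of this patch:

import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout

import org.apache.spark.deploy.DriverDescription
import org.apache.spark.deploy.DeployMessages._

// Hypothetical client helper. `masterActor` must resolve to the ALIVE master;
// any other recovery state now yields an explanatory rejection message.
def submitThenKill(masterActor: ActorRef, desc: DriverDescription): Unit = {
  implicit val timeout = Timeout(30.seconds) // illustrative timeout

  val submitFuture = (masterActor ? RequestSubmitDriver(desc)).mapTo[SubmitDriverResponse]
  val SubmitDriverResponse(submitted, submitMsg) = Await.result(submitFuture, timeout.duration)
  println(s"driver submission accepted=$submitted: $submitMsg")

  // Kill is asynchronous on the master side (see the TODO above): a `true`
  // response means the request was accepted, not that the driver is gone.
  val killFuture = (masterActor ? RequestKillDriver("driver-0")).mapTo[KillDriverResponse]
  val KillDriverResponse(killed, killMsg) = Await.result(killFuture, timeout.duration)
  println(s"kill request accepted=$killed: $killMsg")
}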
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 28cd46359c918ce4acee55a65d1f26e8aa138e97..c5fa9cf7d7c2d549aa4dbc421942af1f493020bb 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -37,8 +37,8 @@ private[spark] class WorkerInfo(
   Utils.checkHost(host, "Expected hostname")
   assert (port > 0)
 
-  @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // fullId => info
-  @transient var drivers: mutable.HashMap[String, DriverInfo] = _
+  @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // executorId => info
+  @transient var drivers: mutable.HashMap[String, DriverInfo] = _ // driverId => info
   @transient var state: WorkerState.Value = _
   @transient var coresUsed: Int = _
   @transient var memoryUsed: Int = _
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 3c6fca378042963f098d0ed867189a67b4148fac..951fc679ef381b88f59951b263a6697f426e6378 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -94,7 +94,6 @@ private[spark] class IndexPage(parent: MasterWebUI) {
         <div class="row-fluid">
           <div class="span12">
             <h4> Running Applications </h4>
-
             {activeAppsTable}
           </div>
         </div>
@@ -109,7 +108,6 @@ private[spark] class IndexPage(parent: MasterWebUI) {
         <div class="row-fluid">
          <div class="span12">
            <h4> Active Drivers </h4>
-
             {activeDriversTable}
           </div>
         </div>
@@ -167,7 +165,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
       <td>{driver.worker.map(w => w.id.toString).getOrElse("None")}</td>
       <td>{driver.state}</td>
       <td sorttable_customkey={driver.desc.cores.toString}>
-        {driver.desc.cores.toString}
+        {driver.desc.cores}
       </td>
       <td sorttable_customkey={driver.desc.mem.toString}>
         {Utils.megabytesToString(driver.desc.mem.toLong)}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 42c28cf22d04a33831cb2b5d204d83548c5688ed..21ec881f465a38747d5e94386899410cb8f263a1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -222,8 +222,8 @@ private[spark] class Worker(
         logInfo("Executor " + fullId + " finished with state " + state +
           message.map(" message " + _).getOrElse("") +
           exitStatus.map(" exitStatus " + _).getOrElse(""))
-        finishedExecutors(fullId) = executor
         executors -= fullId
+        finishedExecutors(fullId) = executor
         coresUsed -= executor.cores
         memoryUsed -= executor.memory
       }
@@ -248,8 +248,8 @@ private[spark] class Worker(
         drivers(driverId) = driver
         driver.start()
 
-        coresUsed += 1
-        memoryUsed += memory
+        coresUsed += driverDesc.cores
+        memoryUsed += driverDesc.mem
       }
 
     case KillDriver(driverId) => {
@@ -269,16 +269,16 @@ private[spark] class Worker(
           case DriverState.FINISHED =>
             logInfo(s"Driver $driverId exited successfully")
          case DriverState.KILLED =>
-            logInfo(s"Driver $driverId was killed")
+            logInfo(s"Driver $driverId was killed by user")
         }
         masterLock.synchronized {
           master ! DriverStateChanged(driverId, state, exception)
         }
         val driver = drivers(driverId)
-        memoryUsed -= driver.driverDesc.mem
-        coresUsed -= driver.driverDesc.cores
         drivers -= driverId
         finishedDrivers(driverId) = driver
+        memoryUsed -= driver.driverDesc.mem
+        coresUsed -= driver.driverDesc.cores
       }
 
     case x: DisassociatedEvent if x.remoteAddress == masterAddress =>
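The Worker changes above correct the resource bookkeeping: launching a driver now reserves driverDesc.cores and driverDesc.mem rather than a hardcoded single core, and teardown retires the runner into the finished map before releasing its resources. A standalone sketch of that invariant, using hypothetical stand-in types (DriverRecord, ResourceLedger) rather than the real DriverRunner plumbing:

import scala.collection.mutable

// Hypothetical stand-in for a driver's resource request.
case class DriverRecord(id: String, cores: Int, mem: Int)

// Simplified model of the worker's accounting, mirroring the patched ordering.
class ResourceLedger(val totalCores: Int, val totalMem: Int) {
  val drivers = new mutable.HashMap[String, DriverRecord]
  val finishedDrivers = new mutable.HashMap[String, DriverRecord]
  var coresUsed = 0
  var memoryUsed = 0

  def coresFree: Int = totalCores - coresUsed
  def memoryFree: Int = totalMem - memoryUsed

  // Mirrors LaunchDriver: reserve exactly what the description asks for.
  def launch(d: DriverRecord): Unit = {
    drivers(d.id) = d
    coresUsed += d.cores
    memoryUsed += d.mem
  }

  // Mirrors DriverStateChanged: retire the record first, then release resources,
  // so a driver is never both "finished" and still counted against the worker.
  def finish(id: String): Unit = {
    val d = drivers.remove(id).getOrElse(sys.error(s"unknown driver $id"))
    finishedDrivers(id) = d
    memoryUsed -= d.mem
    coresUsed -= d.cores
  }
}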