diff --git a/bin/stop-slaves.sh b/bin/stop-slaves.sh
index abf1c7be6517d5ecbe597c529f5cee88ee306120..fcb8555d4e5706a6f4f2ec9670525437a3a2acae 100755
--- a/bin/stop-slaves.sh
+++ b/bin/stop-slaves.sh
@@ -17,8 +17,6 @@
 # limitations under the License.
 #

-# Starts workers on the machine this script is executed on.
-
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`

diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 979e65ac6c4f0a37d08a03753cb39f31dfd0460e..275331724afba010988baf082c6364fbcce9b5bf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -21,7 +21,7 @@ import scala.collection.immutable.List

 import org.apache.spark.deploy.ExecutorState.ExecutorState
 import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
-import org.apache.spark.deploy.master.MasterState.MasterState
+import org.apache.spark.deploy.master.RecoveryState.MasterState
 import org.apache.spark.deploy.worker.ExecutorRunner
 import org.apache.spark.util.Utils

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 8291e29ec3aef625f9de1fd96ca75a814c5dd819..5150b7c7dec6ab135ba65b1d7c992901e164f5e0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -39,14 +39,14 @@ private[spark] class ApplicationInfo(

   @transient private var nextExecutorId: Int = _

-  init
+  init()

   private def readObject(in: java.io.ObjectInputStream) : Unit = {
     in.defaultReadObject()
-    init
+    init()
   }

-  private def init = {
+  private def init() {
     state = ApplicationState.WAITING
     executors = new mutable.HashMap[Int, ExecutorInfo]
     coresGranted = 0
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
index fc8255fa6fa305eebb9bc0cb4b946be3b5429fe6..f25a1ad3bf92afbd22539ba552b76bd60382ec78 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
@@ -29,12 +29,12 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
  *  [[org.apache.spark.deploy.master.MasterMessages.ElectedLeader ElectedLeader]]
  *  [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
  */
-trait LeaderElectionAgent extends Actor {
+private[spark] trait LeaderElectionAgent extends Actor {
   val masterActor: ActorRef
 }

 /** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
-class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent {
+private[spark] class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent {
   override def preStart() {
     masterActor ! ElectedLeader
   }
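The ApplicationInfo hunk above (and the matching WorkerInfo hunk further down) turns `init` into an explicit `init()` that runs both at construction time and from the custom `readObject`, so `@transient` fields are rebuilt after deserialization. A minimal self-contained sketch of that pattern follows; the `Checkpoint` class and its field are invented for illustration and are not part of the patch.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

// The @transient field is skipped by Java serialization, so it must be re-initialized
// when an instance is read back; readObject re-runs the same init() the constructor used.
class Checkpoint(val id: String) extends Serializable {
  @transient var cache: mutable.HashMap[Int, String] = _

  init()

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    init()
  }

  private def init() {
    cache = new mutable.HashMap[Int, String]
  }
}

object Checkpoint {
  def main(args: Array[String]) {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(new Checkpoint("app-1"))
    out.flush()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val restored = in.readObject().asInstanceOf[Checkpoint]
    assert(restored.cache != null)  // init() ran inside readObject, so the transient map is usable
  }
}
```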
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 093ce09b1d585dd7df4c39be6a35fef63eac1f76..cd916672aceffbc09048e24d3e1babdad3e4044d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -79,7 +79,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   val masterUrl = "spark://" + host + ":" + port
   var masterWebUiUrl: String = _

-  var state = MasterState.STANDBY
+  var state = RecoveryState.STANDBY

   var persistenceEngine: PersistenceEngine = _

@@ -139,13 +139,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     case ElectedLeader => {
       val (storedApps, storedWorkers) = persistenceEngine.readPersistedData()
       state = if (storedApps.isEmpty && storedWorkers.isEmpty)
-        MasterState.ALIVE
+        RecoveryState.ALIVE
       else
-        MasterState.RECOVERING
+        RecoveryState.RECOVERING

       logInfo("I have been elected leader! New state: " + state)

-      if (state == MasterState.RECOVERING) {
+      if (state == RecoveryState.RECOVERING) {
         beginRecovery(storedApps, storedWorkers)
         context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
       }
@@ -159,7 +159,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => {
       logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
         host, workerPort, cores, Utils.megabytesToString(memory)))
-      if (state == MasterState.STANDBY) {
+      if (state == RecoveryState.STANDBY) {
         // ignore, don't send response
       } else if (idToWorker.contains(id)) {
         sender ! RegisterWorkerFailed("Duplicate worker ID")
@@ -174,7 +174,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }

     case RegisterApplication(description) => {
-      if (state == MasterState.STANDBY) {
+      if (state == RecoveryState.STANDBY) {
         // ignore, don't send response
       } else {
         logInfo("Registering app " + description.name)
@@ -262,21 +262,21 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       // those we have an entry for in the corresponding actor hashmap
       actorToWorker.get(actor).foreach(removeWorker)
       actorToApp.get(actor).foreach(finishApplication)
-      if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
+      if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
     }

     case RemoteClientDisconnected(transport, address) => {
       // The disconnected client could've been either a worker or an app; remove whichever it was
       addressToWorker.get(address).foreach(removeWorker)
       addressToApp.get(address).foreach(finishApplication)
-      if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
+      if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
     }

     case RemoteClientShutdown(transport, address) => {
       // The disconnected client could've been either a worker or an app; remove whichever it was
       addressToWorker.get(address).foreach(removeWorker)
       addressToApp.get(address).foreach(finishApplication)
-      if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
+      if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
     }

     case RequestMasterState => {
@@ -324,15 +324,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   def completeRecovery() {
     // Ensure "only-once" recovery semantics using a short synchronization period.
     synchronized {
-      if (state != MasterState.RECOVERING) { return }
-      state = MasterState.COMPLETING_RECOVERY
+      if (state != RecoveryState.RECOVERING) { return }
+      state = RecoveryState.COMPLETING_RECOVERY
     }

     // Kill off any workers and apps that didn't respond to us.
     workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
     apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

-    state = MasterState.ALIVE
+    state = RecoveryState.ALIVE
     schedule()
     logInfo("Recovery complete - resuming operations!")
   }
@@ -351,7 +351,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
    * every time a new app joins or resource availability changes.
    */
   def schedule() {
-    if (state != MasterState.ALIVE) { return }
+    if (state != RecoveryState.ALIVE) { return }
     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
     // in the queue, then the second app, etc.
     if (spreadOutApps) {
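`completeRecovery()` above can be triggered more than once, by the recovery timeout and again when the last unresponsive worker or app is accounted for, so it uses a short `synchronized` block to guarantee the cleanup runs only once. Below is a standalone sketch of that guard; the object and method names are invented for the example and this is not the Master code itself.

```scala
// The first caller atomically moves the state out of RECOVERING and does the cleanup;
// any later caller sees the changed state and returns without repeating the work.
object RecoveryGuardSketch {
  object State extends Enumeration {
    val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
  }

  @volatile private var state = State.RECOVERING

  def completeRecovery() {
    val firstCaller = synchronized {
      if (state != State.RECOVERING) {
        false
      } else {
        state = State.COMPLETING_RECOVERY
        true
      }
    }
    if (firstCaller) {
      // ... drop workers/apps still marked UNKNOWN, then resume scheduling ...
      state = State.ALIVE
    }
  }
}
```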
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
index 8c4878bd308fdf304fc1abf48244b56dc48e842b..94b986caf283518e2e809d5b6742541bc481ead7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
@@ -23,9 +23,10 @@ package org.apache.spark.deploy.master
  *   - addApplication and addWorker are called before completing registration of a new app/worker.
  *   - removeApplication and removeWorker are called at any time.
  * Given these two requirements, we will have all apps and workers persisted, but
- * we might not have yet deleted apps or workers that finished.
+ * we might not have yet deleted apps or workers that finished (so their liveness must be verified
+ * during recovery).
  */
-trait PersistenceEngine {
+private[spark] trait PersistenceEngine {
   def addApplication(app: ApplicationInfo)

   def removeApplication(app: ApplicationInfo)
@@ -43,7 +44,7 @@ trait PersistenceEngine {
   def close() {}
 }

-class BlackHolePersistenceEngine extends PersistenceEngine {
+private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {
   override def addApplication(app: ApplicationInfo) {}
   override def removeApplication(app: ApplicationInfo) {}
   override def addWorker(worker: WorkerInfo) {}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala
similarity index 96%
rename from core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala
rename to core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala
index eec3df3b7a6c6cb8621cd9c9c68aaecfc783a5e0..b91be821f016c834f3eff3aa00334a8154c6afe9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.deploy.master

-private[spark] object MasterState
+private[spark] object RecoveryState
   extends Enumeration("STANDBY", "ALIVE", "RECOVERING", "COMPLETING_RECOVERY") {

   type MasterState = Value
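The PersistenceEngine doc-comment change above spells out why recovery has to double-check what it reads back: removals can lag, so persisted entries may refer to apps or workers that already finished. The toy engine below illustrates that contract with a deliberately simplified interface (String ids and an invented `readAll` method rather than the real `PersistenceEngine` signatures).

```scala
import scala.collection.mutable

// Toy persistence contract: add before registration completes, remove on a best-effort basis.
trait ToyPersistence {
  def addApplication(appId: String)
  def removeApplication(appId: String)
  def readAll(): Seq[String]
}

class InMemoryToyPersistence extends ToyPersistence {
  private val apps = new mutable.LinkedHashSet[String]

  // Called before an app's registration completes, so a crash never loses a live app.
  override def addApplication(appId: String) {
    synchronized { apps += appId }
  }

  // May lag behind reality: a finished app can still be present at recovery time,
  // which is why a recovering master must verify liveness before trusting an entry.
  override def removeApplication(appId: String) {
    synchronized { apps -= appId }
  }

  override def readAll(): Seq[String] = synchronized { apps.toSeq }
}
```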
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
index f43f9f6ed7d349e1ffef555da973d444edb48bed..81e15c534fc31e1e8b8b4914ff9c24a1149014a2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
@@ -35,7 +35,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
  * Additionally, all commands sent to ZooKeeper will be retried until they either fail too many
  * times or a semantic exception is thrown (e.g.., "node already exists").
  */
-class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
+private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
   val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "")

   val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE
@@ -53,10 +53,13 @@ class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
   /** Connect to ZooKeeper to start the session. Must be called before anything else. */
   def connect() {
     connectToZooKeeper()
-    spawn(sessionMonitorThread)
+
+    new Thread() {
+      override def run() = sessionMonitorThread()
+    }.start()
   }

-  def sessionMonitorThread = {
+  def sessionMonitorThread(): Unit = {
     while (!closed) {
       Thread.sleep(ZK_CHECK_PERIOD_MILLIS)
       if (zk.getState != ZooKeeper.States.CONNECTED) {
@@ -170,7 +173,7 @@ class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
    *
    * @param fn Block to execute, possibly multiple times.
    */
-  def retry[T](fn: => T)(implicit n: Int = MAX_RECONNECT_ATTEMPTS): T = {
+  def retry[T](fn: => T, n: Int = MAX_RECONNECT_ATTEMPTS): T = {
     try {
       fn
     } catch {
@@ -179,7 +182,7 @@ class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
       case e if n > 0 =>
         logError("ZooKeeper exception, " + n + " more retries...", e)
         Thread.sleep(RETRY_WAIT_MILLIS)
-        retry(fn)(n-1)
+        retry(fn, n-1)
     }
   }
 }
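Two small mechanical changes in SparkZooKeeperSession are easy to miss: the session monitor is now started on an explicitly constructed `Thread` instead of via `spawn`, and `retry`'s attempt counter becomes an ordinary default parameter rather than an implicit one, so callers can no longer fill it accidentally with an unrelated implicit `Int` in scope. Below is a standalone sketch of the reworked retry shape, with invented constants and a fake flaky operation:

```scala
object RetrySketch {
  val MAX_ATTEMPTS = 3          // stand-in for MAX_RECONNECT_ATTEMPTS
  val RETRY_WAIT_MILLIS = 500L  // stand-in for the real backoff

  // The counter is a plain default parameter; the recursive call passes n - 1 explicitly.
  def retry[T](fn: => T, n: Int = MAX_ATTEMPTS): T = {
    try {
      fn
    } catch {
      case _: Exception if n > 0 =>
        Thread.sleep(RETRY_WAIT_MILLIS)
        retry(fn, n - 1)
    }
  }

  def main(args: Array[String]) {
    var calls = 0
    val result = retry {          // the default attempt count applies here
      calls += 1
      if (calls < 3) sys.error("flaky") else "ok"
    }
    println(result + " after " + calls + " calls")
  }
}
```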
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 26090c6a9c5b6f8d6f417b521552316f26a2f6f8..e05f587b58c6437ce869fd26d7021c8dc20338b0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -42,17 +42,17 @@ private[spark] class WorkerInfo(

   @transient var lastHeartbeat: Long = _

-  init
+  init()

   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed

   private def readObject(in: java.io.ObjectInputStream) : Unit = {
     in.defaultReadObject()
-    init
+    init()
   }

-  private def init = {
+  private def init() {
     executors = new mutable.HashMap
     state = WorkerState.ALIVE
     coresUsed = 0
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 065635af85ab296aa3860a961c720a56f42ffba9..7809013e8383b2b65532d7cb1c1df738ac68c9a8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -17,17 +17,14 @@

 package org.apache.spark.deploy.master

-import scala.collection.JavaConversions._
-
-import org.apache.spark.deploy.master.MasterMessages.{CheckLeader, ElectedLeader, RevokedLeadership}
-import org.apache.spark.Logging
+import akka.actor.ActorRef
 import org.apache.zookeeper._
 import org.apache.zookeeper.Watcher.Event.EventType
-import akka.actor.{Cancellable, ActorRef}
-import akka.util.duration._
+import org.apache.spark.deploy.master.MasterMessages._
+import org.apache.spark.Logging

-class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
+private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
   extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging {

   val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
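ZooKeeperLeaderElectionAgent itself only changes imports and visibility in this patch, so its election logic does not appear above. For orientation, here is a generic ephemeral-znode election sketch in the same spirit; it is not the agent's actual implementation, and the path, session timeout, and callback are invented for the example.

```scala
import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event.EventType

// Whoever creates the ephemeral znode first is the leader; everyone else watches it and
// re-runs the election when the leader's session (and therefore the znode) goes away.
class ElectionSketch(zkUrl: String, onElected: () => Unit) extends Watcher {
  private val leaderPath = "/election_sketch_leader"   // illustrative path
  private val zk = new ZooKeeper(zkUrl, 30000, this)   // 30s session timeout, this as watcher

  def runForLeader() {
    try {
      zk.create(leaderPath, Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE,
        CreateMode.EPHEMERAL)
      onElected()                                       // we created the node: we lead
    } catch {
      case _: KeeperException.NodeExistsException =>
        // Someone else leads; watch the znode so we notice when their session expires.
        if (zk.exists(leaderPath, true) == null) runForLeader()
    }
  }

  override def process(event: WatchedEvent) {
    if (event.getType == EventType.NodeDeleted) runForLeader()
  }
}
```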
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index c59e1f4de628bbc2631c13078f9fe071a6169e28..0b38e239f9b024f787c4346d4b9adb076633b190 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -25,7 +25,7 @@ import net.liftweb.json.JsonAST.JValue
 import org.scalatest.FunSuite

 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
-import org.apache.spark.deploy.master.{ApplicationInfo, MasterState, WorkerInfo}
+import org.apache.spark.deploy.master.{ApplicationInfo, RecoveryState, WorkerInfo}
 import org.apache.spark.deploy.worker.ExecutorRunner

 class JsonProtocolSuite extends FunSuite {
@@ -54,7 +54,7 @@ class JsonProtocolSuite extends FunSuite {
     val activeApps = Array[ApplicationInfo](createAppInfo())
     val completedApps = Array[ApplicationInfo]()
     val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps,
-      MasterState.ALIVE)
+      RecoveryState.ALIVE)
     val output = JsonProtocol.writeMasterState(stateResponse)
     assertValidJson(output)
   }