diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index d8084a57658ad08aafdba54a81753682a9865548..3feb7cea593e04e486a0c752b7ba8faeddfcdf5d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -69,9 +69,14 @@ private[deploy] object DeployMessages {
 
   // Master to Worker
 
+  sealed trait RegisterWorkerResponse
+
   case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
+    with RegisterWorkerResponse
+
+  case class RegisterWorkerFailed(message: String) extends DeployMessage with RegisterWorkerResponse
 
-  case class RegisterWorkerFailed(message: String) extends DeployMessage
+  case object MasterInStandby extends DeployMessage with RegisterWorkerResponse
 
   case class ReconnectWorker(masterUrl: String) extends DeployMessage
 
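Making `RegisterWorkerResponse` a `sealed` trait is what lets the worker handle replies exhaustively: the compiler can warn when a match misses one of the three responses. A minimal, self-contained sketch of that property (plain Scala, not Spark code; the object name, URL, and `describe` helper are invented for illustration):

```scala
// Standalone model of the sealed reply hierarchy above (illustrative only).
sealed trait RegisterWorkerResponse
case class RegisteredWorker(masterWebUiUrl: String) extends RegisterWorkerResponse
case class RegisterWorkerFailed(message: String) extends RegisterWorkerResponse
case object MasterInStandby extends RegisterWorkerResponse

object SealedResponseDemo extends App {
  // Because the trait is sealed, dropping any case below turns into a
  // non-exhaustive-match warning at compile time.
  def describe(resp: RegisterWorkerResponse): String = resp match {
    case RegisteredWorker(url)        => s"registered; master web UI at $url"
    case RegisterWorkerFailed(reason) => s"rejected: $reason"
    case MasterInStandby              => "master in standby; wait for failover"
  }

  println(describe(MasterInStandby))
}
```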
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index d518e92133aad3f6a2e16f5ab8a5aea9a85175fe..6715d6c70f49743fd6d690f557d2680a9b3cdd5d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -233,31 +233,6 @@ private[deploy] class Master(
       System.exit(0)
     }
 
-    case RegisterWorker(
-        id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
-      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
-        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
-      if (state == RecoveryState.STANDBY) {
-        // ignore, don't send response
-      } else if (idToWorker.contains(id)) {
-        workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
-      } else {
-        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
-          workerRef, workerUiPort, publicAddress)
-        if (registerWorker(worker)) {
-          persistenceEngine.addWorker(worker)
-          workerRef.send(RegisteredWorker(self, masterWebUiUrl))
-          schedule()
-        } else {
-          val workerAddress = worker.endpoint.address
-          logWarning("Worker registration failed. Attempted to re-register worker at same " +
-            "address: " + workerAddress)
-          workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
-            + workerAddress))
-        }
-      }
-    }
-
     case RegisterApplication(description, driver) => {
       // TODO Prevent repeated registrations from some driver
       if (state == RecoveryState.STANDBY) {
@@ -387,6 +362,31 @@ private[deploy] class Master(
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+    case RegisterWorker(
+        id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
+      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
+        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
+      if (state == RecoveryState.STANDBY) {
+        context.reply(MasterInStandby)
+      } else if (idToWorker.contains(id)) {
+        context.reply(RegisterWorkerFailed("Duplicate worker ID"))
+      } else {
+        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
+          workerRef, workerUiPort, publicAddress)
+        if (registerWorker(worker)) {
+          persistenceEngine.addWorker(worker)
+          context.reply(RegisteredWorker(self, masterWebUiUrl))
+          schedule()
+        } else {
+          val workerAddress = worker.endpoint.address
+          logWarning("Worker registration failed. Attempted to re-register worker at same " +
+            "address: " + workerAddress)
+          context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+            + workerAddress))
+        }
+      }
+    }
+
     case RequestSubmitDriver(description) => {
       if (state != RecoveryState.ALIVE) {
         val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
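Moving the `RegisterWorker` case from `receive` into `receiveAndReply` means every registration attempt now gets an answer, including the standby case that used to be dropped silently; once the worker uses `ask`, silence would surface only as a timeout. A rough model of the request/reply contract (plain Scala `Promise`, not Spark's `RpcCallContext`; all names here are invented for illustration):

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Toy model: an "ask" pairs the request with a Promise, and the server's
// context.reply(...) is what completes it. If the handler never replies
// (the old standby behavior), the caller's Future just sits until timeout.
object ContextReplyModel extends App {
  sealed trait RegisterWorkerResponse
  case class RegisteredWorker(masterWebUiUrl: String) extends RegisterWorkerResponse
  case object MasterInStandby extends RegisterWorkerResponse

  final class Context(p: Promise[RegisterWorkerResponse]) {
    def reply(r: RegisterWorkerResponse): Unit = p.trySuccess(r)
  }

  def handleRegisterWorker(standby: Boolean, context: Context): Unit =
    if (standby) context.reply(MasterInStandby) // was: "ignore, don't send response"
    else context.reply(RegisteredWorker("http://master-host:8080"))

  val p = Promise[RegisterWorkerResponse]()
  handleRegisterWorker(standby = true, new Context(p))
  println(Await.result(p.future, 1.second)) // prints MasterInStandby
}
```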
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 93a1b3f3104223cb6ddd2aba4bab2ac5c3e09db5..a45867e7680ec7f0ccd61e805480d2160c3cb1f4 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFut
 
 import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap}
 import scala.concurrent.ExecutionContext
-import scala.util.Random
+import scala.util.{Failure, Random, Success}
 import scala.util.control.NonFatal
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
@@ -213,8 +213,7 @@ private[deploy] class Worker(
             logInfo("Connecting to master " + masterAddress + "...")
             val masterEndpoint =
               rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
-            masterEndpoint.send(RegisterWorker(
-              workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+            registerWithMaster(masterEndpoint)
           } catch {
             case ie: InterruptedException => // Cancelled
             case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
@@ -271,8 +270,7 @@ private[deploy] class Worker(
             logInfo("Connecting to master " + masterAddress + "...")
             val masterEndpoint =
               rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
-            masterEndpoint.send(RegisterWorker(
-              workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+            registerWithMaster(masterEndpoint)
           } catch {
             case ie: InterruptedException => // Cancelled
             case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
@@ -341,25 +339,54 @@ private[deploy] class Worker(
     }
   }
 
-  override def receive: PartialFunction[Any, Unit] = {
-    case RegisteredWorker(masterRef, masterWebUiUrl) =>
-      logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
-      registered = true
-      changeMaster(masterRef, masterWebUiUrl)
-      forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
-        override def run(): Unit = Utils.tryLogNonFatalError {
-          self.send(SendHeartbeat)
-        }
-      }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
-      if (CLEANUP_ENABLED) {
-        logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
+  private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
+    masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
+      workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
+      .onComplete {
+        // This is a very fast action so we can use "ThreadUtils.sameThread"
+        case Success(msg) =>
+          Utils.tryLogNonFatalError {
+            handleRegisterResponse(msg)
+          }
+        case Failure(e) =>
+          logError(s"Cannot register with master: ${masterEndpoint.address}", e)
+          System.exit(1)
+      }(ThreadUtils.sameThread)
+  }
+
+  private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
+    msg match {
+      case RegisteredWorker(masterRef, masterWebUiUrl) =>
+        logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
+        registered = true
+        changeMaster(masterRef, masterWebUiUrl)
         forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
           override def run(): Unit = Utils.tryLogNonFatalError {
-            self.send(WorkDirCleanup)
+            self.send(SendHeartbeat)
           }
-        }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
-      }
+        }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
+        if (CLEANUP_ENABLED) {
+          logInfo(
+            s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
+          forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
+            override def run(): Unit = Utils.tryLogNonFatalError {
+              self.send(WorkDirCleanup)
+            }
+          }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
+        }
+
+      case RegisterWorkerFailed(message) =>
+        if (!registered) {
+          logError("Worker registration failed: " + message)
+          System.exit(1)
+        }
+
+      case MasterInStandby =>
+        // Ignore. Master not yet ready.
+    }
+  }
+
+  override def receive: PartialFunction[Any, Unit] = synchronized {
     case SendHeartbeat =>
       if (connected) { sendToMaster(Heartbeat(workerId, self)) }
 
@@ -399,12 +426,6 @@ private[deploy] class Worker(
           map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
         masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))
 
-    case RegisterWorkerFailed(message) =>
-      if (!registered) {
-        logError("Worker registration failed: " + message)
-        System.exit(1)
-      }
-
     case ReconnectWorker(masterUrl) =>
      logInfo(s"Master with url $masterUrl requested this worker to reconnect.")
       registerWithMaster()
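On the worker side, the two `send` call sites collapse into a single `registerWithMaster(masterEndpoint)` helper, and reply handling moves out of `receive` into an `onComplete` callback. Running that callback on `ThreadUtils.sameThread` is safe only because handling the reply is quick and non-blocking, which is what the inline comment asserts. A simplified, self-contained sketch of the shape of that helper (plain `scala.concurrent`, not Spark's RPC layer; the names and the `accept` flag are invented):

```scala
import java.util.concurrent.Executor
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object RegisterWithMasterSketch extends App {
  sealed trait RegisterWorkerResponse
  case class RegisteredWorker(masterWebUiUrl: String) extends RegisterWorkerResponse
  case class RegisterWorkerFailed(message: String) extends RegisterWorkerResponse

  // Stand-in for ThreadUtils.sameThread: run the callback on the thread that
  // completed the Future instead of hopping to a pool.
  val sameThread: ExecutionContext =
    ExecutionContext.fromExecutor(new Executor { def execute(r: Runnable): Unit = r.run() })

  // Stand-in for masterEndpoint.ask[RegisterWorkerResponse](...).
  def ask(accept: Boolean): Future[RegisterWorkerResponse] =
    Future.successful(
      if (accept) RegisteredWorker("http://master-host:8080")
      else RegisterWorkerFailed("Duplicate worker ID"))

  def registerWithMaster(): Unit =
    ask(accept = true).onComplete {
      case Success(msg) => println(s"handleRegisterResponse($msg)") // dispatch on the reply
      case Failure(e)   => println(s"Cannot register with master: $e") // the real Worker exits here
    }(sameThread)

  registerWithMaster()
}
```

Note that both `handleRegisterResponse` and the new `receive` are marked `synchronized` in the diff, presumably because the reply now arrives on an RPC client thread rather than through the endpoint's mailbox, so it can interleave with messages dispatched to `receive`.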
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 49059de50b42b770bc0eea1308742985c457c87e..a9c6a05ecd4345133d2fe293db096d0a7e28efd3 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -59,12 +59,12 @@ private[spark] class CoarseGrainedExecutorBackend(
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
-      ref.ask[RegisteredExecutor.type](
+      ref.ask[RegisterExecutorResponse](
         RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
     }(ThreadUtils.sameThread).onComplete {
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) => Utils.tryLogNonFatalError {
-        Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
+        Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
       }
       case Failure(e) => {
         logError(s"Cannot register with driver: $driverUrl", e)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index e0d25dc50c988d7396fcdc7247bd495a3f67ed54..4652df32efa74ba1cd0fa714a1be8e49c302e835 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -36,9 +36,13 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
     extends CoarseGrainedClusterMessage
 
+  sealed trait RegisterExecutorResponse
+
   case object RegisteredExecutor extends CoarseGrainedClusterMessage
+    with RegisterExecutorResponse
 
   case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
+    with RegisterExecutorResponse
 
   // Executors to driver
 
   case class RegisterExecutor(
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 18f2229fea39b6d0ac50faf6f8f4339f847be730..3cd80c0f7d171dde11603ec7df1c26a7470b71ec 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -173,9 +173,9 @@ class HeartbeatReceiverSuite
     val dummyExecutorEndpoint2 = new FakeExecutorEndpoint(rpcEnv)
     val dummyExecutorEndpointRef1 = rpcEnv.setupEndpoint("fake-executor-1", dummyExecutorEndpoint1)
     val dummyExecutorEndpointRef2 = rpcEnv.setupEndpoint("fake-executor-2", dummyExecutorEndpoint2)
-    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
+    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
       RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "dummy:4040", 0, Map.empty))
-    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisteredExecutor.type](
+    fakeSchedulerBackend.driverEndpoint.askWithRetry[RegisterExecutorResponse](
       RegisterExecutor(executorId2, dummyExecutorEndpointRef2, "dummy:4040", 0, Map.empty))
     heartbeatReceiverRef.askWithRetry[Boolean](TaskSchedulerIsSet)
     addExecutorAndVerify(executorId1)
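The executor-side and test changes follow the same pattern as the worker: `ask[RegisteredExecutor.type]` bakes in the assumption that the driver can only ever answer with `RegisteredExecutor`, while `ask[RegisterExecutorResponse]` accepts any legal reply and lets the caller pattern-match on the result. A small self-contained demo of asking for the supertype (plain Scala `Future` standing in for `askWithRetry`; `fakeAsk` and its `duplicate` flag are invented):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AskForSupertypeDemo extends App {
  sealed trait RegisterExecutorResponse
  case object RegisteredExecutor extends RegisterExecutorResponse
  case class RegisterExecutorFailed(message: String) extends RegisterExecutorResponse

  // Stand-in for driverEndpoint.askWithRetry[RegisterExecutorResponse](...).
  def fakeAsk(duplicate: Boolean): Future[RegisterExecutorResponse] = Future {
    if (duplicate) RegisterExecutorFailed("Duplicate executor ID") else RegisteredExecutor
  }

  // Either reply is a legal value of the asked-for type, so the caller can
  // inspect it instead of failing on an unexpected message class.
  Await.result(fakeAsk(duplicate = true), 1.second) match {
    case RegisteredExecutor          => println("executor registered")
    case RegisterExecutorFailed(msg) => println(s"registration failed: $msg")
  }
}
```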