diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala index 7c37a166157a9c787fd2b81bf09b6be73824d2e2..31861f3ac2d985b5615eed7bb08adca40a6e02af 100644 --- a/core/src/main/scala/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/spark/deploy/DeployMessage.scala @@ -109,6 +109,7 @@ private[deploy] object DeployMessages { } // WorkerWebUI to Worker + case object RequestWorkerState // Worker to WorkerWebUI @@ -120,4 +121,9 @@ private[deploy] object DeployMessages { Utils.checkHost(host, "Required hostname") assert (port > 0) } + + // Actor System to Master + + case object CheckForWorkerTimeOut + } diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index bd7924c71dfcc5741b6ea7773a33860fc7ac8398..4a4d9908a06f769f562789078e4d72fdb88b8e04 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -80,7 +80,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act // Listen for remote client disconnection events, since they don't go through Akka's watch() context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) webUi.start() - context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers()) + context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) masterMetricsSystem.registerSource(masterSource) masterMetricsSystem.start() @@ -176,6 +176,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act case RequestMasterState => { sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray) } + + case CheckForWorkerTimeOut => { + timeOutDeadWorkers() + } } /**