From 35d8f5ee521dc1873548a978d27b10644076a0c0 Mon Sep 17 00:00:00 2001 From: Mark Hamstra <markhamstra@gmail.com> Date: Sun, 4 Aug 2013 09:13:41 -0700 Subject: [PATCH] Moved handling of timed out workers within the Master actor --- core/src/main/scala/spark/deploy/DeployMessage.scala | 6 ++++++ core/src/main/scala/spark/deploy/master/Master.scala | 6 +++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala index 7c37a16615..31861f3ac2 100644 --- a/core/src/main/scala/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/spark/deploy/DeployMessage.scala @@ -109,6 +109,7 @@ private[deploy] object DeployMessages { } // WorkerWebUI to Worker + case object RequestWorkerState // Worker to WorkerWebUI @@ -120,4 +121,9 @@ private[deploy] object DeployMessages { Utils.checkHost(host, "Required hostname") assert (port > 0) } + + // Actor System to Master + + case object CheckForWorkerTimeOut + } diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index bd7924c71d..4a4d9908a0 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -80,7 +80,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act // Listen for remote client disconnection events, since they don't go through Akka's watch() context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) webUi.start() - context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers()) + context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) masterMetricsSystem.registerSource(masterSource) masterMetricsSystem.start() @@ -176,6 +176,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act case RequestMasterState => { sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray) } + + case CheckForWorkerTimeOut => { + timeOutDeadWorkers() + } } /** -- GitLab