Skip to content
Snippets Groups Projects
Commit 35d8f5ee authored by Mark Hamstra's avatar Mark Hamstra
Browse files

Moved handling of timed out workers within the Master actor

parent 37ccf930
No related branches found
No related tags found
No related merge requests found
......@@ -109,6 +109,7 @@ private[deploy] object DeployMessages {
}
// WorkerWebUI to Worker
case object RequestWorkerState
// Worker to WorkerWebUI
......@@ -120,4 +121,9 @@ private[deploy] object DeployMessages {
Utils.checkHost(host, "Required hostname")
assert (port > 0)
}
// Actor System to Master
case object CheckForWorkerTimeOut
}
......@@ -80,7 +80,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
webUi.start()
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
......@@ -176,6 +176,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case RequestMasterState => {
sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
}
case CheckForWorkerTimeOut => {
timeOutDeadWorkers()
}
}
/**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment