Commit 1b47fa27 authored by root, committed by Matei Zaharia

Detect hard crashes of workers using a heartbeat mechanism.

Also fixes some issues elsewhere in the code related to detecting lost workers this way.

Conflicts:
	core/src/main/scala/spark/deploy/master/Master.scala
	core/src/main/scala/spark/deploy/worker/Worker.scala
	core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
	core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
	core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
parent 05d2e948
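
For orientation, the mechanism introduced here has three pieces: each worker periodically sends a Heartbeat(workerId) message to the master, the master stamps that worker's lastHeartbeat timestamp on receipt, and a task scheduled on the master removes any worker whose last heartbeat is older than spark.worker.timeout. A minimal, self-contained sketch of that bookkeeping (simplified names, not the actual Spark classes) might look like this:

import scala.collection.mutable

object HeartbeatSketch {
  // Mirrors spark.worker.timeout: milliseconds of silence before a worker is considered dead.
  val WORKER_TIMEOUT_MS: Long = 60L * 1000

  // Master-side record of a worker: just an id and the time of its last heartbeat.
  final case class WorkerRecord(id: String, var lastHeartbeat: Long = System.currentTimeMillis())

  val workers = mutable.HashSet[WorkerRecord]()

  // Worker -> Master: on every Heartbeat(workerId), refresh the timestamp.
  def onHeartbeat(worker: WorkerRecord): Unit =
    worker.lastHeartbeat = System.currentTimeMillis()

  // Master-side sweep, scheduled every WORKER_TIMEOUT_MS: drop workers that have gone silent.
  def timeOutDeadWorkers(): Seq[WorkerRecord] = {
    val expirationTime = System.currentTimeMillis() - WORKER_TIMEOUT_MS
    val toRemove = workers.filter(_.lastHeartbeat < expirationTime).toSeq
    toRemove.foreach(workers -= _)
    toRemove
  }
}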
core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -30,6 +30,8 @@ case class ExecutorStateChanged(
     exitStatus: Option[Int])
   extends DeployMessage

+private[spark] case class Heartbeat(workerId: String) extends DeployMessage
+
 // Master to Worker

 private[spark] case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage

@@ -45,7 +47,6 @@ private[spark] case class LaunchExecutor(
     sparkHome: String)
   extends DeployMessage

 // Client to Master

 private[spark] case class RegisterJob(jobDescription: JobDescription) extends DeployMessage

core/src/main/scala/spark/deploy/master/Master.scala
@@ -3,6 +3,7 @@ package spark.deploy.master
 import akka.actor._
 import akka.actor.Terminated
 import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+import akka.util.duration._
 import java.text.SimpleDateFormat
 import java.util.Date

@@ -16,6 +17,7 @@ import spark.util.AkkaUtils
 private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For job IDs
+  val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000

   var nextJobNumber = 0
   val workers = new HashSet[WorkerInfo]

@@ -46,6 +48,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     // Listen for remote client disconnection events, since they don't go through Akka's watch()
     context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
     startWebUi()
+    context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
   }

   def startWebUi() {

@@ -111,6 +114,15 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
       }
     }

+    case Heartbeat(workerId) => {
+      idToWorker.get(workerId) match {
+        case Some(workerInfo) =>
+          workerInfo.lastHeartbeat = System.currentTimeMillis()
+        case None =>
+          logWarning("Got heartbeat from unregistered worker " + workerId)
+      }
+    }
+
     case Terminated(actor) => {
       // The disconnected actor could've been either a worker or a job; remove whichever of
       // those we have an entry for in the corresponding actor hashmap

@@ -219,8 +231,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     actorToWorker -= worker.actor
     addressToWorker -= worker.actor.path.address
     for (exec <- worker.executors.values) {
-      exec.job.driver ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None, None)
-      exec.job.executors -= exec.id
+      logInfo("Telling job of lost executor: " + exec.id)
+      exec.job.driver ! ExecutorUpdated(exec.id, ExecutorState.LOST, Some("worker lost"), None)
+      exec.job.removeExecutor(exec)
     }
   }
@@ -259,6 +272,18 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     nextJobNumber += 1
     jobId
   }

+  /** Check for, and remove, any timed-out workers */
+  def timeOutDeadWorkers() {
+    // Copy the workers into an array so we don't modify the hashset while iterating through it
+    val expirationTime = System.currentTimeMillis() - WORKER_TIMEOUT
+    val toRemove = workers.filter(_.lastHeartbeat < expirationTime).toArray
+    for (worker <- toRemove) {
logWarning("Removing %s because we got no heartbeat in %d seconds".format(
worker.id, WORKER_TIMEOUT))
+      removeWorker(worker)
+    }
+  }
 }

 private[spark] object Master {
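
Note on configuration: the Master reads spark.worker.timeout as a plain Java system property, interpreted in seconds (default 60) and converted to milliseconds. A hypothetical override, for illustration only:

// Hypothetical example (not part of this commit): set the property before the Master starts.
System.setProperty("spark.worker.timeout", "30")
val workerTimeoutMs = System.getProperty("spark.worker.timeout", "60").toLong * 1000  // 30000 ms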

core/src/main/scala/spark/deploy/master/WorkerInfo.scala
@@ -18,6 +18,8 @@ private[spark] class WorkerInfo(
   var coresUsed = 0
   var memoryUsed = 0

+  var lastHeartbeat = System.currentTimeMillis()
+
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed

core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -2,6 +2,7 @@ package spark.deploy.worker
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+import akka.util.duration._
 import spark.{Logging, Utils}
 import spark.util.AkkaUtils
 import spark.deploy._

@@ -26,6 +27,9 @@ private[spark] class Worker(
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For worker and executor IDs

+  // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
+  val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4
+
   var master: ActorRef = null
   var masterWebUiUrl : String = ""
   val workerId = generateWorkerId()

@@ -97,6 +101,9 @@ private[spark] class Worker(
     case RegisteredWorker(url) =>
       masterWebUiUrl = url
       logInfo("Successfully registered with master")
+      context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
+        master ! Heartbeat(workerId)
+      }

     case RegisterWorkerFailed(message) =>
       logError("Worker registration failed: " + message)
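
The worker's heartbeat interval is derived from the same property: it heartbeats at one quarter of the master's timeout, presumably so that several heartbeats fit into one timeout window and a single delayed or dropped message does not get the worker expired. With the defaults:

val workerTimeoutMs: Long = 60L * 1000           // master expires a silent worker after 60 s
val heartbeatMillis: Long = workerTimeoutMs / 4  // worker sends Heartbeat(workerId) every 15 s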

core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala
@@ -12,10 +12,10 @@ class ExecutorLossReason(val message: String) {
 private[spark]
 case class ExecutorExited(val exitCode: Int)
   extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) {
 }

 private[spark]
 case class SlaveLost(_message: String = "Slave lost")
   extends ExecutorLossReason(_message) {
 }

core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -67,6 +67,7 @@ private[spark] class SparkDeploySchedulerBackend(
       case None => SlaveLost(message)
     }
     logInfo("Executor %s removed: %s".format(executorId, message))
+    removeExecutor(executorId, reason.toString)
     scheduler.executorLost(executorId, reason)
   }
 }

core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -37,3 +37,6 @@ object StatusUpdate {
 // Internal messages in driver
 private[spark] case object ReviveOffers extends StandaloneClusterMessage
 private[spark] case object StopDriver extends StandaloneClusterMessage
+
+private[spark] case class RemoveExecutor(executorId: String, reason: String)
+  extends StandaloneClusterMessage

core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -68,6 +68,10 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
         sender ! true
         context.stop(self)

+      case RemoveExecutor(executorId, reason) =>
+        removeExecutor(executorId, reason)
+        sender ! true
+
       case Terminated(actor) =>
         actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))

@@ -100,7 +104,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
     // Remove a disconnected slave from the cluster
     def removeExecutor(executorId: String, reason: String) {
-      logInfo("Slave " + executorId + " disconnected, so removing it")
+      logInfo("Executor " + executorId + " disconnected, so removing it")
       val numCores = freeCores(executorId)
       actorToExecutorId -= executorActor(executorId)
       addressToExecutorId -= executorAddress(executorId)

@@ -139,7 +143,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
       }
     } catch {
       case e: Exception =>
-        throw new SparkException("Error stopping standalone scheduler's master actor", e)
+        throw new SparkException("Error stopping standalone scheduler's driver actor", e)
     }
   }

@@ -148,6 +152,18 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
   }

   override def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)

+  // Called by backends
+  def removeExecutor(executorId: String, reason: String) {
+    try {
+      val timeout = 5.seconds
+      val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
+      Await.result(future, timeout)
+    } catch {
+      case e: Exception =>
+        throw new SparkException("Error notifying standalone scheduler's driver actor", e)
+    }
+  }
 }

 private[spark] object StandaloneSchedulerBackend {
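
The RemoveExecutor path is a blocking notify-and-acknowledge round trip: the backend sends RemoveExecutor to the driver actor with ask and waits for the reply the actor sends back with sender ! true. Below is a minimal, self-contained sketch of that pattern, not Spark code, assuming the Akka 2.0-era APIs this code base uses (akka.pattern.ask, akka.dispatch.Await, akka.util.duration):

import akka.actor.{Actor, ActorSystem, Props}
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.duration._

case class RemoveExecutor(executorId: String, reason: String)

// Stand-in for the driver-side actor: handle the message, then acknowledge.
class DriverActorSketch extends Actor {
  def receive = {
    case RemoveExecutor(executorId, reason) =>
      println("Removing executor " + executorId + " because " + reason)
      sender ! true
  }
}

object RemoveExecutorSketch extends App {
  val system = ActorSystem("sketch")
  val driverActor = system.actorOf(Props[DriverActorSketch], name = "driver")

  // Backend side: send the message and block until the driver acknowledges.
  val timeout = 5.seconds
  val future = driverActor.ask(RemoveExecutor("executor-1", "worker lost"))(timeout)
  Await.result(future, timeout)
  system.shutdown()
}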