Skip to content
Snippets Groups Projects
Commit a02eed68 authored by Reynold Xin's avatar Reynold Xin
Browse files

Ignore a task update status if the executor doesn't exist anymore.

parent 9f7b9bb1
No related branches found
No related tags found
No related merge requests found
...@@ -87,8 +87,14 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac ...@@ -87,8 +87,14 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
case StatusUpdate(executorId, taskId, state, data) => case StatusUpdate(executorId, taskId, state, data) =>
scheduler.statusUpdate(taskId, state, data.value) scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) { if (TaskState.isFinished(state)) {
freeCores(executorId) += 1 if (executorActor.contains(executorId)) {
makeOffers(executorId) freeCores(executorId) += 1
makeOffers(executorId)
} else {
// Ignoring the update since we don't know about the executor.
val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
logWarning(msg.format(taskId, state, sender, executorId))
}
} }
case ReviveOffers => case ReviveOffers =>
...@@ -175,7 +181,9 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac ...@@ -175,7 +181,9 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME) Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
} }
private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") private val timeout = {
Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
}
def stopExecutors() { def stopExecutors() {
try { try {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment