Skip to content
Snippets Groups Projects
Commit 9d112a95 authored by Iulian Dragos's avatar Iulian Dragos Committed by Sean Owen
Browse files

[SPARK-6286][minor] Handle missing Mesos case TASK_ERROR.

Author: Iulian Dragos <jaguarul@gmail.com>

Closes #5000 from dragos/issue/task-error-case and squashes the following commits:

e063627 [Iulian Dragos] Handle TASK_ERROR in Mesos scheduler backends.
ac17cf0 [Iulian Dragos] Handle missing Mesos case TASK_ERROR.
parent e09c852d
No related branches found
No related tags found
No related merge requests found
......@@ -46,5 +46,6 @@ private[spark] object TaskState extends Enumeration {
case MesosTaskState.TASK_FAILED => FAILED
case MesosTaskState.TASK_KILLED => KILLED
case MesosTaskState.TASK_LOST => LOST
case MesosTaskState.TASK_ERROR => LOST
}
}
......@@ -28,7 +28,7 @@ import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException, TaskState}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.{Utils, AkkaUtils}
......@@ -262,20 +262,12 @@ private[spark] class CoarseMesosSchedulerBackend(
.build()
}
/** Check whether a Mesos task state represents a finished task */
private def isFinished(state: MesosTaskState) = {
state == MesosTaskState.TASK_FINISHED ||
state == MesosTaskState.TASK_FAILED ||
state == MesosTaskState.TASK_KILLED ||
state == MesosTaskState.TASK_LOST
}
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
val taskId = status.getTaskId.getValue.toInt
val state = status.getState
logInfo("Mesos task " + taskId + " is now " + state)
synchronized {
if (isFinished(state)) {
if (TaskState.isFinished(TaskState.fromMesos(state))) {
val slaveId = taskIdToSlaveId(taskId)
slaveIdsWithExecutors -= slaveId
taskIdToSlaveId -= taskId
......
......@@ -313,14 +313,6 @@ private[spark] class MesosSchedulerBackend(
.build()
}
/** Check whether a Mesos task state represents a finished task */
def isFinished(state: MesosTaskState) = {
state == MesosTaskState.TASK_FINISHED ||
state == MesosTaskState.TASK_FAILED ||
state == MesosTaskState.TASK_KILLED ||
state == MesosTaskState.TASK_LOST
}
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
inClassLoader() {
val tid = status.getTaskId.getValue.toLong
......@@ -330,7 +322,7 @@ private[spark] class MesosSchedulerBackend(
// We lost the executor on this slave, so remember that it's gone
removeExecutor(taskIdToSlaveId(tid), "Lost executor")
}
if (isFinished(status.getState)) {
if (TaskState.isFinished(state)) {
taskIdToSlaveId.remove(tid)
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment