Commit 0dcad2ed authored by Kay Ousterhout

Added additional unit test for repeated task failures

parent dea4677c
@@ -40,6 +40,7 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /*
   val startedTasks = new ArrayBuffer[Long]
   val endedTasks = new mutable.HashMap[Long, TaskEndReason]
   val finishedManagers = new ArrayBuffer[TaskSetManager]
+  val taskSetsFailed = new ArrayBuffer[String]
 
   val executors = new mutable.HashMap[String, String] ++ liveExecutors
@@ -63,7 +64,9 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /*
     def executorLost(execId: String) {}
 
-    def taskSetFailed(taskSet: TaskSet, reason: String) {}
+    def taskSetFailed(taskSet: TaskSet, reason: String) {
+      taskSetsFailed += taskSet.id
+    }
   }
 
   def removeExecutor(execId: String): Unit = executors -= execId
@@ -270,6 +273,30 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
     assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
   }
 
test("repeated failures lead to task set abortion") {
sc = new SparkContext("local", "test")
val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
val taskSet = createTaskSet(1)
val clock = new FakeClock
val manager = new ClusterTaskSetManager(sched, taskSet, clock)
// Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
// after the last failure.
(0 until manager.MAX_TASK_FAILURES).foreach { index =>
val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY)
assert(offerResult != None,
"Expect resource offer on iteration %s to return a task".format(index))
assert(offerResult.get.index === 0)
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, Some(TaskResultLost))
if (index < manager.MAX_TASK_FAILURES) {
assert(!sched.taskSetsFailed.contains(taskSet.id))
} else {
assert(sched.taskSetsFailed.contains(taskSet.id))
}
}
}
   /**
    * Utility method to create a TaskSet, potentially setting a particular sequence of preferred
    * locations for each task (given as varargs) if this sequence is not empty.
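
The change adds only the test side; the abort behavior it exercises lives in ClusterTaskSetManager.handleFailedTask, which is not touched by this commit. The sketch below is a minimal, hypothetical model of the contract the test relies on (FailureTracker, recordFailure, and onAbort are illustrative names that do not exist in Spark): failures are counted per task, and once any single task has failed maxTaskFailures times, the whole task set is reported as failed to the scheduler, which is what the new test observes through taskSetsFailed.

import scala.collection.mutable

// Hypothetical, simplified model of the behavior the new test exercises.
// Names here are illustrative and do not match the real ClusterTaskSetManager.
class FailureTracker(maxTaskFailures: Int, onAbort: String => Unit) {
  // Number of failures recorded so far for each task index.
  private val failureCounts = mutable.HashMap[Int, Int]().withDefaultValue(0)

  // Record one failure of the task at `taskIndex`; report the task set as
  // failed once that task has reached maxTaskFailures failures.
  def recordFailure(taskSetId: String, taskIndex: Int): Unit = {
    failureCounts(taskIndex) += 1
    if (failureCounts(taskIndex) >= maxTaskFailures) {
      onAbort(taskSetId)
    }
  }
}

object FailureTrackerDemo extends App {
  val taskSetsFailed = new mutable.ArrayBuffer[String]
  val tracker = new FailureTracker(maxTaskFailures = 4, onAbort = id => taskSetsFailed += id)

  // Fail task 0 of task set "0.0" four times; the set is reported as failed
  // on the fourth (last allowed) failure, mirroring the assertion in the test.
  (1 to 4).foreach(_ => tracker.recordFailure("0.0", taskIndex = 0))
  assert(taskSetsFailed.contains("0.0"))
  println("task sets failed: " + taskSetsFailed)
}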