Commit 28215891 authored by Imran Rashid

[SPARK-16136][CORE] Fix flaky TaskManagerSuite

## What changes were proposed in this pull request?

TaskManagerSuite "Kill other task attempts when one attempt belonging to the same task succeeds" was flaky.  When checking whether a task is speculatable, at least one millisecond must pass since the task was submitted.  Use a manual clock to avoid the problem.
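
For illustration, a minimal sketch of the clock-control pattern the fix relies on (the `sched` and `taskSet` setup is elided; the names follow the test in the diff below):

```scala
import org.apache.spark.util.ManualClock

// Inject a test-controlled clock instead of the default system clock.
val clock = new ManualClock()
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)

// ... run the task set until exactly one task is still running ...

// With spark.speculation.multiplier = 0 the speculation threshold is 0 ms,
// but a task must still have been running for strictly more than 0 ms.
// Advancing the manual clock by 1 ms guarantees that, deterministically.
clock.advance(1)
assert(manager.checkSpeculatableTasks(0))
```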

I noticed these tests were also leaving lots of threads lying around, which prevented me from running the test repeatedly, so I fixed that too: the scheduler is now a suite-level field that gets stopped in `afterEach()`, as sketched below.
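
The cleanup uses the standard ScalaTest `BeforeAndAfterEach` hook; this sketch mirrors the change in the diff:

```scala
// Suite-level handle so the teardown can stop whatever each test created.
var sched: FakeTaskScheduler = null

override def afterEach(): Unit = {
  super.afterEach()
  if (sched != null) {
    // Stop the DAGScheduler and then the scheduler itself so their
    // background threads do not leak into subsequent test runs.
    sched.dagScheduler.stop()
    sched.stop()
    sched = null
  }
}
```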

## How was this patch tested?

Ran the test 1,000 times on my laptop; it passed every time (before this change it failed about 20% of the time).

Author: Imran Rashid <irashid@cloudera.com>

Closes #13848 from squito/fix_flaky_taskmanagersuite.
parent 1aa191e5
```diff
@@ -157,14 +157,27 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   val LOCALITY_WAIT_MS = conf.getTimeAsMs("spark.locality.wait", "3s")
   val MAX_TASK_FAILURES = 4
 
-  override def beforeEach() {
+  var sched: FakeTaskScheduler = null
+
+  override def beforeEach(): Unit = {
     super.beforeEach()
     FakeRackUtil.cleanUp()
+    sched = null
   }
 
+  override def afterEach(): Unit = {
+    super.afterEach()
+    if (sched != null) {
+      sched.dagScheduler.stop()
+      sched.stop()
+      sched = null
+    }
+  }
+
   test("TaskSet with no preferences") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
@@ -183,7 +196,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("multiple offers with no preferences") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(3)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
     val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
@@ -217,7 +230,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("skip unsatisfiable locality levels") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
+    sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
     val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB")))
     val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
@@ -233,7 +246,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("basic delay scheduling") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     val taskSet = FakeTask.createTaskSet(4,
       Seq(TaskLocation("host1", "exec1")),
       Seq(TaskLocation("host2", "exec2")),
@@ -263,7 +276,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("we do not need to delay scheduling when we only have noPref tasks in the queue") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec3", "host2"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec3", "host2"))
     val taskSet = FakeTask.createTaskSet(3,
       Seq(TaskLocation("host1", "exec1")),
       Seq(TaskLocation("host2", "exec3")),
@@ -280,7 +293,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("delay scheduling with fallback") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc,
+    sched = new FakeTaskScheduler(sc,
       ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
     val taskSet = FakeTask.createTaskSet(5,
       Seq(TaskLocation("host1")),
@@ -320,7 +333,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("delay scheduling with failed hosts") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
       ("exec3", "host3"))
     val taskSet = FakeTask.createTaskSet(3,
       Seq(TaskLocation("host1")),
@@ -357,7 +370,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("task result lost") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
@@ -374,7 +387,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("repeated failures lead to task set abortion") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
@@ -404,7 +417,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     sc = new SparkContext("local", "test", conf)
     // two executors on same host, one on different.
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
       ("exec1.1", "host1"), ("exec2", "host2"))
     // affinity to exec1 on host1 - which we will fail.
     val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
@@ -486,7 +499,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     // Assign host2 to rack2
     FakeRackUtil.assignHostToRack("host2", "rack2")
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc)
+    sched = new FakeTaskScheduler(sc)
     val taskSet = FakeTask.createTaskSet(4,
       Seq(TaskLocation("host1", "execA")),
       Seq(TaskLocation("host1", "execB")),
@@ -518,7 +531,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("Executors exit for reason unrelated to currently running tasks") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc)
+    sched = new FakeTaskScheduler(sc)
     val taskSet = FakeTask.createTaskSet(4,
       Seq(TaskLocation("host1", "execA")),
       Seq(TaskLocation("host1", "execB")),
@@ -551,7 +564,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     // Assign host3 to rack2
     FakeRackUtil.assignHostToRack("host3", "rack2")
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc,
+    sched = new FakeTaskScheduler(sc,
       ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
     val taskSet = FakeTask.createTaskSet(2,
       Seq(TaskLocation("host1", "execA")),
@@ -574,7 +587,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("do not emit warning when serialized task is small") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
@@ -587,7 +600,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("emit warning when serialized task is large") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
@@ -601,7 +614,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("Not serializable exception thrown if the task cannot be serialized") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = new TaskSet(
       Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
@@ -640,7 +653,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("speculative and noPref task should be scheduled after node-local") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(
+    sched = new FakeTaskScheduler(
       sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
     val taskSet = FakeTask.createTaskSet(4,
       Seq(TaskLocation("host1", "execA")),
@@ -668,7 +681,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("node-local tasks should be scheduled right away " +
     "when there are only node-local and no-preference tasks") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(
+    sched = new FakeTaskScheduler(
       sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
     val taskSet = FakeTask.createTaskSet(4,
       Seq(TaskLocation("host1")),
@@ -691,7 +704,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished")
   {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
+    sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
     val taskSet = FakeTask.createTaskSet(4,
       Seq(TaskLocation("host1")),
       Seq(TaskLocation("host2")),
@@ -712,7 +725,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("SPARK-4939: no-pref tasks should be scheduled after process-local tasks finished") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
+    sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
     val taskSet = FakeTask.createTaskSet(3,
       Seq(),
       Seq(ExecutorCacheTaskLocation("host1", "execA")),
@@ -733,7 +746,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("Ensure TaskSetManager is usable after addition of levels") {
     // Regression test for SPARK-2931
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc)
+    sched = new FakeTaskScheduler(sc)
     val taskSet = FakeTask.createTaskSet(2,
       Seq(TaskLocation("host1", "execA")),
       Seq(TaskLocation("host2", "execB.1")))
@@ -765,7 +778,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("Test that locations with HDFSCacheTaskLocation are treated as PROCESS_LOCAL.") {
     // Regression test for SPARK-2931
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc,
+    sched = new FakeTaskScheduler(sc,
       ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
     val taskSet = FakeTask.createTaskSet(3,
       Seq(TaskLocation("host1")),
@@ -793,11 +806,12 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("Kill other task attempts when one attempt belonging to the same task succeeds") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     val taskSet = FakeTask.createTaskSet(4)
     // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
     sc.conf.set("spark.speculation.multiplier", "0.0")
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
     val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
       task.metrics.internalAccums
     }
@@ -819,6 +833,10 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       assert(sched.endedTasks(id) === Success)
     }
 
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
+    // > 0ms, so advance the clock by 1ms here.
+    clock.advance(1)
     assert(manager.checkSpeculatableTasks(0))
     // Offer resource to start the speculative attempt for the running task
     val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
```