From 6cf335d79a2f69ecd9a139dd0a03acff60585be4 Mon Sep 17 00:00:00 2001
From: Kay Ousterhout <kayousterhout@gmail.com>
Date: Mon, 9 Jun 2014 13:13:53 -0700
Subject: [PATCH] Added a TaskSetManager unit test.

This test ensures that when no alive executors satisfy a
particular locality level, the TaskSetManager never uses that
level as the maximum allowed locality level (this optimization
ensures that a job doesn't wait extra time trying to satisfy
a locality level that is impossible to achieve).
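
For context, the optimization under test can be sketched roughly as
below. This is a simplified, hypothetical version of the idea behind
TaskSetManager's valid-locality-level computation, not the actual
implementation; the pending-task maps and alive-ness predicates stand
in for TaskSetManager/TaskSchedulerImpl internals:

    import scala.collection.mutable.ArrayBuffer

    // Simplified stand-in for Spark's TaskLocality enumeration.
    sealed trait Locality
    case object ProcessLocal extends Locality
    case object NodeLocal extends Locality
    case object AnyLevel extends Locality

    // A locality level is only "valid" if some alive executor could
    // actually satisfy it; otherwise it is skipped entirely, so the
    // delay-scheduling timer for that level never has to expire.
    def validLocalityLevels(
        pendingTasksForExecutor: Map[String, Seq[Int]],
        pendingTasksForHost: Map[String, Seq[Int]],
        isExecutorAlive: String => Boolean,
        hasExecutorsAliveOnHost: String => Boolean): Seq[Locality] = {
      val levels = new ArrayBuffer[Locality]
      // PROCESS_LOCAL counts only if a preferred executor is alive.
      if (pendingTasksForExecutor.keySet.exists(isExecutorAlive)) {
        levels += ProcessLocal
      }
      // NODE_LOCAL counts only if a preferred host has a live executor.
      if (pendingTasksForHost.keySet.exists(hasExecutorsAliveOnHost)) {
        levels += NodeLocal
      }
      levels += AnyLevel  // ANY is always allowed.
      levels.toSeq
    }

In the test below, the task prefers execB on host1, but only execA
(host1) and execC (host2) are alive, so PROCESS_LOCAL is unsatisfiable
and NODE_LOCAL should be the effective base level.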

@mateiz and @lirui-intel, this unit test illustrates an issue
with #892 (it fails with that patch).

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #1024 from kayousterhout/scheduler_unit_test and squashes the following commits:

de6a08f [Kay Ousterhout] Added a TaskSetManager unit test.
---
 .../spark/scheduler/TaskSetManagerSuite.scala    | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index c92b6dc96c..6f1fd25764 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -141,6 +141,22 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     assert(sched.finishedManagers.contains(manager))
   }
 
+  test("skip unsatisfiable locality levels") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
+    val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB")))
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+
+    // An executor that is not NODE_LOCAL should be rejected.
+    assert(manager.resourceOffer("execC", "host2", ANY) === None)
+
+    // Because there are no alive PROCESS_LOCAL executors, the base locality level should be
+    // NODE_LOCAL. So, we should schedule the task on this offered NODE_LOCAL executor before
+    // any of the locality wait timers expire.
+    assert(manager.resourceOffer("execA", "host1", ANY).get.index === 0)
+  }
+
   test("basic delay scheduling") {
     sc = new SparkContext("local", "test")
     val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
-- 
GitLab