Commit 0a89b156 authored by Davies Liu, committed by Kay Ousterhout

[SPARK-4939] move to next locality when no pending tasks

Currently, if a task set contains tasks with different locality levels, the NODE_LOCAL tasks only get scheduled after all the PROCESS_LOCAL tasks have been scheduled and the `spark.locality.wait.process` timeout (3 seconds by default) has expired. In local mode, the LocalScheduler never calls resourceOffer() again once it fails to get a task at the same locality level, so the NODE_LOCAL tasks are never scheduled.
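
For reference, the waits involved are the standard `spark.locality.wait*` settings. A minimal sketch of setting them explicitly (the values shown are the 3000 ms defaults and purely illustrative; this does not by itself fix the local-mode hang described above):

    import org.apache.spark.{SparkConf, SparkContext}

    // Illustrative only: these waits gate how long delay scheduling stays at
    // PROCESS_LOCAL / NODE_LOCAL before falling back to a less local level.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("locality-wait-demo")
      .set("spark.locality.wait", "3000")          // base wait, 3s by default
      .set("spark.locality.wait.process", "3000")  // wait before leaving PROCESS_LOCAL
      .set("spark.locality.wait.node", "3000")     // wait before leaving NODE_LOCAL

    val sc = new SparkContext(conf)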

This bug can be reproduced by running the example python/streaming/stateful_network_wordcount.py: it hangs after finishing a batch that contains some data.

This patch checks whether there are any pending tasks for the current locality level; if there are none, it moves to the next locality level immediately instead of waiting out `spark.locality.wait.process`. The same check is applied at every locality level.
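
In simplified form, the new loop behaves as sketched below. This is a standalone illustration with hypothetical pendingTasksAt and localityWait helpers, not the actual TaskSetManager code (which follows in the diff):

    // Sketch of the per-offer decision, assuming:
    //   levels            - locality levels in order, e.g. PROCESS_LOCAL, NODE_LOCAL, ANY
    //   pendingTasksAt(l) - number of still-schedulable tasks at level l (hypothetical helper)
    //   localityWait(l)   - configured wait for level l in ms (hypothetical helper)
    def allowedLevel(levels: IndexedSeq[String],
                     pendingTasksAt: String => Int,
                     localityWait: String => Long,
                     now: Long,
                     lastLaunch: Long): (String, Long) = {
      var idx = 0
      var last = lastLaunch
      while (idx < levels.length - 1) {
        if (pendingTasksAt(levels(idx)) == 0) {
          // Nothing left at this level: skip ahead immediately instead of waiting out the timeout.
          last = now
          idx += 1
        } else if (now - last >= localityWait(levels(idx))) {
          // Waited long enough at this level: fall back to the next, less local level.
          last += localityWait(levels(idx))
          idx += 1
        } else {
          return (levels(idx), last)
        }
      }
      (levels(idx), last)
    }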

Because the lists of pending tasks are updated lazily, the check can be a false positive: the scheduler may conclude that tasks are still pending at the current level even when no valid pending tasks remain, in which case it does not skip ahead and still waits for the locality timeout.

Author: Davies Liu <davies@databricks.com>

Closes #3779 from davies/local_streaming and squashes the following commits:

2d25fb3 [Davies Liu] Update TaskSetManager.scala
1550668 [Davies Liu] add comment
1c37aac [Davies Liu] address comments
6b13824 [Davies Liu] address comments
906f456 [Davies Liu] Merge branch 'master' of github.com:apache/spark into local_streaming
414e79e [Davies Liu] fix bug, add logging
ff8eabb [Davies Liu] Merge branch 'master' into local_streaming
28d1b3c [Davies Liu] check tasks
9d0ceab [Davies Liu] Merge branch 'master' of github.com:apache/spark into local_streaming
37a2804 [Davies Liu] fix tests
49bda82 [Davies Liu] address comment
d8fb95a [Davies Liu] move to next locality level if no more tasks
2d6ae73 [Davies Liu] add comments
32d363f [Davies Liu] add regression test
7d8c5a5 [Davies Liu] jump to next locality if no pending tasks for executors
parent f0500f9f
@@ -506,13 +506,64 @@ private[spark] class TaskSetManager(
    * Get the level we can launch tasks according to delay scheduling, based on current wait time.
    */
   private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
-    while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&
-        currentLocalityIndex < myLocalityLevels.length - 1)
-    {
-      // Jump to the next locality level, and remove our waiting time for the current one since
-      // we don't want to count it again on the next one
-      lastLaunchTime += localityWaits(currentLocalityIndex)
-      currentLocalityIndex += 1
+    // Remove the scheduled or finished tasks lazily
+    def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
+      var indexOffset = pendingTaskIds.size
+      while (indexOffset > 0) {
+        indexOffset -= 1
+        val index = pendingTaskIds(indexOffset)
+        if (copiesRunning(index) == 0 && !successful(index)) {
+          return true
+        } else {
+          pendingTaskIds.remove(indexOffset)
+        }
+      }
+      false
+    }
+    // Walk through the list of tasks that can be scheduled at each location and returns true
+    // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
+    // already been scheduled.
+    def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
+      val emptyKeys = new ArrayBuffer[String]
+      val hasTasks = pendingTasks.exists {
+        case (id: String, tasks: ArrayBuffer[Int]) =>
+          if (tasksNeedToBeScheduledFrom(tasks)) {
+            true
+          } else {
+            emptyKeys += id
+            false
+          }
+      }
+      // The key could be executorId, host or rackId
+      emptyKeys.foreach(id => pendingTasks.remove(id))
+      hasTasks
+    }
+
+    while (currentLocalityIndex < myLocalityLevels.length - 1) {
+      val moreTasks = myLocalityLevels(currentLocalityIndex) match {
+        case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
+        case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
+        case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
+        case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
+      }
+      if (!moreTasks) {
+        // This is a performance optimization: if there are no more tasks that can
+        // be scheduled at a particular locality level, there is no point in waiting
+        // for the locality wait timeout (SPARK-4939).
+        lastLaunchTime = curTime
+        logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
+          s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
+        currentLocalityIndex += 1
+      } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
+        // Jump to the next locality level, and reset lastLaunchTime so that the next locality
+        // wait timer doesn't immediately expire
+        lastLaunchTime += localityWaits(currentLocalityIndex)
+        currentLocalityIndex += 1
+        logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " +
+          s"${localityWaits(currentLocalityIndex)}ms")
+      } else {
+        return myLocalityLevels(currentLocalityIndex)
+      }
     }
     myLocalityLevels(currentLocalityIndex)
   }
@@ -314,7 +314,8 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("delay scheduling with failed hosts") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"),
+      ("exec3", "host3"))
     val taskSet = FakeTask.createTaskSet(3,
       Seq(TaskLocation("host1")),
       Seq(TaskLocation("host2")),
@@ -649,6 +650,47 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     assert(manager.resourceOffer("execA", "host3", NO_PREF).get.index === 2)
   }
 
+  test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
+    val taskSet = FakeTask.createTaskSet(4,
+      Seq(TaskLocation("host1")),
+      Seq(TaskLocation("host2")),
+      Seq(ExecutorCacheTaskLocation("host1", "execA")),
+      Seq(ExecutorCacheTaskLocation("host2", "execB")))
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+
+    // process-local tasks are scheduled first
+    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 2)
+    assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 3)
+    // node-local tasks are scheduled without delay
+    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0)
+    assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 1)
+    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
+    assert(manager.resourceOffer("execB", "host2", NODE_LOCAL) == None)
+  }
+
+  test("SPARK-4939: no-pref tasks should be scheduled after process-local tasks finished") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
+    val taskSet = FakeTask.createTaskSet(3,
+      Seq(),
+      Seq(ExecutorCacheTaskLocation("host1", "execA")),
+      Seq(ExecutorCacheTaskLocation("host2", "execB")))
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+
+    // process-local tasks are scheduled first
+    assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 1)
+    assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL).get.index === 2)
+    // no-pref tasks are scheduled without delay
+    assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL) == None)
+    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
+    assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 0)
+    assert(manager.resourceOffer("execA", "host1", ANY) == None)
+  }
+
   test("Ensure TaskSetManager is usable after addition of levels") {
     // Regression test for SPARK-2931
     sc = new SparkContext("local", "test")