Skip to content
Snippets Groups Projects
Commit 63bdb1f4 authored by CodingCat's avatar CodingCat Committed by Matei Zaharia
Browse files

SPARK-2294: fix locality inversion bug in TaskManager

copied from original JIRA (https://issues.apache.org/jira/browse/SPARK-2294):

If an executor E is free, a task may be speculatively assigned to E when there are other tasks in the job that have not been launched (at all) yet. Similarly, a task without any locality preferences may be assigned to E when there was another NODE_LOCAL task that could have been scheduled.
This happens because TaskSchedulerImpl calls TaskSetManager.resourceOffer (which in turn calls TaskSetManager.findTask) with increasing locality levels, beginning with PROCESS_LOCAL, followed by NODE_LOCAL, and so on until the highest currently allowed level. Now, supposed NODE_LOCAL is the highest currently allowed locality level. The first time findTask is called, it will be called with max level PROCESS_LOCAL; if it cannot find any PROCESS_LOCAL tasks, it will try to schedule tasks with no locality preferences or speculative tasks. As a result, speculative tasks or tasks with no preferences may be scheduled instead of NODE_LOCAL tasks.

----

I added an additional parameter in resourceOffer and findTask, maxLocality, indicating when we should consider the tasks without locality preference

Author: CodingCat <zhunansjtu@gmail.com>

Closes #1313 from CodingCat/SPARK-2294 and squashes the following commits:

bf3f13b [CodingCat] rollback some forgotten changes
89f9bc0 [CodingCat] address matei's comments
18cae02 [CodingCat] add test case for node-local tasks
2ba6195 [CodingCat] fix failed test cases
87dd09e [CodingCat] fix style
9b9432f [CodingCat] remove hasNodeLocalOnlyTasks
fdd1573 [CodingCat] fix failed test cases
941a4fd [CodingCat] see my shocked face..........
f600085 [CodingCat] remove hasNodeLocalOnlyTasks checking
0b8a46b [CodingCat] test whether hasNodeLocalOnlyTasks affect the results
73ceda8 [CodingCat] style fix
b3a430b [CodingCat] remove fine granularity tracking for node-local only tasks
f9a2ad8 [CodingCat] simplify the logic in TaskSchedulerImpl
c8c1de4 [CodingCat] simplify the patch
be652ed [CodingCat] avoid unnecessary delay when we only have nopref tasks
dee9e22 [CodingCat] fix locality inversion bug in TaskManager by moving nopref branch
parent 5a826c00
No related branches found
No related tags found
No related merge requests found
......@@ -22,7 +22,7 @@ import org.apache.spark.annotation.DeveloperApi
@DeveloperApi
object TaskLocality extends Enumeration {
// Process local is expected to be used ONLY within TaskSetManager for now.
val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
type TaskLocality = Value
......
......@@ -89,11 +89,11 @@ private[spark] class TaskSchedulerImpl(
// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
private val executorsByHost = new HashMap[String, HashSet[String]]
protected val executorsByHost = new HashMap[String, HashSet[String]]
protected val hostsByRack = new HashMap[String, HashSet[String]]
private val executorIdToHost = new HashMap[String, String]
protected val executorIdToHost = new HashMap[String, String]
// Listener object to pass upcalls into
var dagScheduler: DAGScheduler = null
......@@ -249,6 +249,7 @@ private[spark] class TaskSchedulerImpl(
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
do {
......@@ -265,7 +266,7 @@ private[spark] class TaskSchedulerImpl(
activeExecutorIds += execId
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert (availableCpus(i) >= 0)
assert(availableCpus(i) >= 0)
launchedTask = true
}
}
......
......@@ -79,6 +79,7 @@ private[spark] class TaskSetManager(
private val numFailures = new Array[Int](numTasks)
// key is taskId, value is a Map of executor id to when it failed
private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
var tasksSuccessful = 0
......@@ -179,26 +180,17 @@ private[spark] class TaskSetManager(
}
}
var hadAliveLocations = false
for (loc <- tasks(index).preferredLocations) {
for (execId <- loc.executorId) {
addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
}
if (sched.hasExecutorsAliveOnHost(loc.host)) {
hadAliveLocations = true
}
addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
for (rack <- sched.getRackForHost(loc.host)) {
addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
if(sched.hasHostAliveOnRack(rack)){
hadAliveLocations = true
}
}
}
if (!hadAliveLocations) {
// Even though the task might've had preferred locations, all of those hosts or executors
// are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
if (tasks(index).preferredLocations == Nil) {
addTo(pendingTasksWithNoPrefs)
}
......@@ -239,7 +231,6 @@ private[spark] class TaskSetManager(
*/
private def findTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
var indexOffset = list.size
while (indexOffset > 0) {
indexOffset -= 1
val index = list(indexOffset)
......@@ -288,12 +279,12 @@ private[spark] class TaskSetManager(
!hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index)
if (!speculatableTasks.isEmpty) {
// Check for process-local or preference-less tasks; note that tasks can be process-local
// Check for process-local tasks; note that tasks can be process-local
// on multiple nodes when we replicate cached blocks, as in Spark Streaming
for (index <- speculatableTasks if canRunOnHost(index)) {
val prefs = tasks(index).preferredLocations
val executors = prefs.flatMap(_.executorId)
if (prefs.size == 0 || executors.contains(execId)) {
if (executors.contains(execId)) {
speculatableTasks -= index
return Some((index, TaskLocality.PROCESS_LOCAL))
}
......@@ -310,6 +301,17 @@ private[spark] class TaskSetManager(
}
}
// Check for no-preference tasks
if (TaskLocality.isAllowed(locality, TaskLocality.NO_PREF)) {
for (index <- speculatableTasks if canRunOnHost(index)) {
val locations = tasks(index).preferredLocations
if (locations.size == 0) {
speculatableTasks -= index
return Some((index, TaskLocality.PROCESS_LOCAL))
}
}
}
// Check for rack-local tasks
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
for (rack <- sched.getRackForHost(host)) {
......@@ -341,20 +343,27 @@ private[spark] class TaskSetManager(
*
* @return An option containing (task index within the task set, locality, is speculative?)
*/
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
private def findTask(execId: String, host: String, maxLocality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value, Boolean)] =
{
for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}
if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL, false))
}
}
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
// Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}
}
if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- findTaskFromList(execId, getPendingTasksForRack(rack))
......@@ -363,25 +372,27 @@ private[spark] class TaskSetManager(
}
}
// Look for no-pref tasks after rack-local tasks since they can run anywhere.
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL, false))
}
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
for (index <- findTaskFromList(execId, allPendingTasks)) {
return Some((index, TaskLocality.ANY, false))
}
}
// Finally, if all else has failed, find a speculative task
findSpeculativeTask(execId, host, locality).map { case (taskIndex, allowedLocality) =>
(taskIndex, allowedLocality, true)
}
// find a speculative task if all others tasks have been scheduled
findSpeculativeTask(execId, host, maxLocality).map {
case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}
}
/**
* Respond to an offer of a single executor from the scheduler by finding a task
*
* NOTE: this function is either called with a maxLocality which
* would be adjusted by delay scheduling algorithm or it will be with a special
* NO_PREF locality which will be not modified
*
* @param execId the executor Id of the offered resource
* @param host the host Id of the offered resource
* @param maxLocality the maximum locality we want to schedule the tasks at
*/
def resourceOffer(
execId: String,
......@@ -392,9 +403,14 @@ private[spark] class TaskSetManager(
if (!isZombie) {
val curTime = clock.getTime()
var allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality > maxLocality) {
allowedLocality = maxLocality // We're not allowed to search for farther-away tasks
var allowedLocality = maxLocality
if (maxLocality != TaskLocality.NO_PREF) {
allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality > maxLocality) {
// We're not allowed to search for farther-away tasks
allowedLocality = maxLocality
}
}
findTask(execId, host, allowedLocality) match {
......@@ -410,8 +426,11 @@ private[spark] class TaskSetManager(
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
// Update our locality level for delay scheduling
currentLocalityIndex = getLocalityIndex(taskLocality)
lastLaunchTime = curTime
// NO_PREF will not affect the variables related to delay scheduling
if (maxLocality != TaskLocality.NO_PREF) {
currentLocalityIndex = getLocalityIndex(taskLocality)
lastLaunchTime = curTime
}
// Serialize and return the task
val startTime = clock.getTime()
// We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
......@@ -639,8 +658,7 @@ private[spark] class TaskSetManager(
override def executorLost(execId: String, host: String) {
logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
// Re-enqueue pending tasks for this host based on the status of the cluster -- for example, a
// task that used to have locations on only this host might now go to the no-prefs list. Note
// Re-enqueue pending tasks for this host based on the status of the cluster. Note
// that it's okay if we add a task to the same queue twice (if it had multiple preferred
// locations), because findTaskFromList will skip already-running tasks.
for (index <- getPendingTasksForExecutor(execId)) {
......@@ -671,6 +689,9 @@ private[spark] class TaskSetManager(
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure)
}
// recalculate valid locality levels and waits when executor is lost
myLocalityLevels = computeValidLocalityLevels()
localityWaits = myLocalityLevels.map(getLocalityWait)
}
/**
......@@ -722,17 +743,17 @@ private[spark] class TaskSetManager(
conf.get("spark.locality.wait.node", defaultWait).toLong
case TaskLocality.RACK_LOCAL =>
conf.get("spark.locality.wait.rack", defaultWait).toLong
case TaskLocality.ANY =>
0L
case _ => 0L
}
}
/**
* Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
* added to queues using addPendingTask.
*
*/
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
......@@ -742,6 +763,9 @@ private[spark] class TaskSetManager(
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
levels += NODE_LOCAL
}
if (!pendingTasksWithNoPrefs.isEmpty) {
levels += NO_PREF
}
if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
levels += RACK_LOCAL
......@@ -751,20 +775,7 @@ private[spark] class TaskSetManager(
levels.toArray
}
// Re-compute pendingTasksWithNoPrefs since new preferred locations may become available
def executorAdded() {
def newLocAvail(index: Int): Boolean = {
for (loc <- tasks(index).preferredLocations) {
if (sched.hasExecutorsAliveOnHost(loc.host) ||
(sched.getRackForHost(loc.host).isDefined &&
sched.hasHostAliveOnRack(sched.getRackForHost(loc.host).get))) {
return true
}
}
false
}
logInfo("Re-computing pending task lists.")
pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.filter(!newLocAvail(_))
myLocalityLevels = computeValidLocalityLevels()
localityWaits = myLocalityLevels.map(getLocalityWait)
}
......
......@@ -85,14 +85,31 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
val finishedManagers = new ArrayBuffer[TaskSetManager]
val taskSetsFailed = new ArrayBuffer[String]
val executors = new mutable.HashMap[String, String] ++ liveExecutors
val executors = new mutable.HashMap[String, String]
for ((execId, host) <- liveExecutors) {
addExecutor(execId, host)
}
for ((execId, host) <- liveExecutors; rack <- getRackForHost(host)) {
hostsByRack.getOrElseUpdate(rack, new mutable.HashSet[String]()) += host
}
dagScheduler = new FakeDAGScheduler(sc, this)
def removeExecutor(execId: String): Unit = executors -= execId
def removeExecutor(execId: String) {
executors -= execId
val host = executorIdToHost.get(execId)
assert(host != None)
val hostId = host.get
val executorsOnHost = executorsByHost(hostId)
executorsOnHost -= execId
for (rack <- getRackForHost(hostId); hosts <- hostsByRack.get(rack)) {
hosts -= hostId
if (hosts.isEmpty) {
hostsByRack -= rack
}
}
}
override def taskSetFinished(manager: TaskSetManager): Unit = finishedManagers += manager
......@@ -100,8 +117,15 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)
override def hasHostAliveOnRack(rack: String): Boolean = {
hostsByRack.get(rack) != None
}
def addExecutor(execId: String, host: String) {
executors.put(execId, host)
val executorsOnHost = executorsByHost.getOrElseUpdate(host, new mutable.HashSet[String])
executorsOnHost += execId
executorIdToHost += execId -> host
for (rack <- getRackForHost(host)) {
hostsByRack.getOrElseUpdate(rack, new mutable.HashSet[String]()) += host
}
......@@ -123,7 +147,7 @@ class LargeTask(stageId: Int) extends Task[Array[Byte]](stageId, 0) {
}
class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
import TaskLocality.{ANY, PROCESS_LOCAL, NO_PREF, NODE_LOCAL, RACK_LOCAL}
private val conf = new SparkConf
......@@ -134,18 +158,13 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
val taskSet = FakeTask.createTaskSet(1)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// Offer a host with process-local as the constraint; this should work because the TaskSet
// above won't have any locality preferences
val taskOption = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL)
// Offer a host with NO_PREF as the constraint,
// we should get a nopref task immediately since that's what we only have
var taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption.isDefined)
val task = taskOption.get
assert(task.executorId === "exec1")
assert(sched.startedTasks.contains(0))
// Re-offer the host -- now we should get no more tasks
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
// Tell it the task has finished
manager.handleSuccessfulTask(0, createTaskResult(0))
......@@ -161,7 +180,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// First three offers should all find tasks
for (i <- 0 until 3) {
val taskOption = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
var taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
assert(taskOption.isDefined)
val task = taskOption.get
assert(task.executorId === "exec1")
......@@ -169,7 +188,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(sched.startedTasks.toSet === Set(0, 1, 2))
// Re-offer the host -- now we should get no more tasks
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
assert(manager.resourceOffer("exec1", "host1", NO_PREF) === None)
// Finish the first two tasks
manager.handleSuccessfulTask(0, createTaskResult(0))
......@@ -211,37 +230,40 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
)
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1, exec1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
// Offer host1, exec1 again: the last task, which has no prefs, should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 3)
// Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None)
clock.advance(LOCALITY_WAIT)
// Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
// Offer host1, exec1 again, at NODE_LOCAL level: we should choose task 2
// Offer host1, exec1 again, at NODE_LOCAL level: the node local (task 2) should
// get chosen before the noPref task
assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2)
// Offer host1, exec1 again, at NODE_LOCAL level: nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL) === None)
// Offer host1, exec1 again, at ANY level: nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
// Offer host2, exec3 again, at NODE_LOCAL level: we should choose task 2
assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).get.index == 1)
// Offer host2, exec3 again, at NODE_LOCAL level: we should get noPref task
// after failing to find a node_Local task
assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL) == None)
clock.advance(LOCALITY_WAIT)
assert(manager.resourceOffer("exec2", "host2", NO_PREF).get.index == 3)
}
// Offer host1, exec1 again, at ANY level: task 1 should get chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
// Offer host1, exec1 again, at ANY level: nothing should be chosen as we've launched all tasks
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
test("we do not need to delay scheduling when we only have noPref tasks in the queue") {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec3", "host2"))
val taskSet = FakeTask.createTaskSet(3,
Seq(TaskLocation("host1", "exec1")),
Seq(TaskLocation("host2", "exec3")),
Seq() // Last task has no locality prefs
)
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1, exec1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).get.index === 0)
assert(manager.resourceOffer("exec3", "host2", PROCESS_LOCAL).get.index === 1)
assert(manager.resourceOffer("exec3", "host2", NODE_LOCAL) == None)
assert(manager.resourceOffer("exec3", "host2", NO_PREF).get.index === 2)
}
test("delay scheduling with fallback") {
......@@ -298,20 +320,24 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// First offer host1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
// Offer host1 again: third task should be chosen immediately because host3 is not up
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2)
// After this, nothing should get chosen
// After this, nothing should get chosen, because we have separated tasks with unavailable preference
// from the noPrefPendingTasks
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
// Now mark host2 as dead
sched.removeExecutor("exec2")
manager.executorLost("exec2", "host2")
// Task 1 should immediately be launched on host1 because its original host is gone
// nothing should be chosen
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
clock.advance(LOCALITY_WAIT * 2)
// task 1 and 2 would be scheduled as nonLocal task
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2)
// Now that all tasks have launched, nothing new should be launched anywhere else
// all finished
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
assert(manager.resourceOffer("exec2", "host2", ANY) === None)
}
......@@ -373,7 +399,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val manager = new TaskSetManager(sched, taskSet, 4, clock)
{
val offerResult = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL)
val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
assert(offerResult.isDefined, "Expect resource offer to return a task")
assert(offerResult.get.index === 0)
......@@ -384,15 +410,15 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(!sched.taskSetsFailed.contains(taskSet.id))
// Ensure scheduling on exec1 fails after failure 1 due to blacklist
assert(manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", TaskLocality.NODE_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", TaskLocality.RACK_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", TaskLocality.ANY).isEmpty)
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", RACK_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
}
// Run the task on exec1.1 - should work, and then fail it on exec1.1
{
val offerResult = manager.resourceOffer("exec1.1", "host1", TaskLocality.NODE_LOCAL)
val offerResult = manager.resourceOffer("exec1.1", "host1", NODE_LOCAL)
assert(offerResult.isDefined,
"Expect resource offer to return a task for exec1.1, offerResult = " + offerResult)
......@@ -404,12 +430,12 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(!sched.taskSetsFailed.contains(taskSet.id))
// Ensure scheduling on exec1.1 fails after failure 2 due to blacklist
assert(manager.resourceOffer("exec1.1", "host1", TaskLocality.NODE_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1.1", "host1", NODE_LOCAL).isEmpty)
}
// Run the task on exec2 - should work, and then fail it on exec2
{
val offerResult = manager.resourceOffer("exec2", "host2", TaskLocality.ANY)
val offerResult = manager.resourceOffer("exec2", "host2", ANY)
assert(offerResult.isDefined, "Expect resource offer to return a task")
assert(offerResult.get.index === 0)
......@@ -420,20 +446,20 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(!sched.taskSetsFailed.contains(taskSet.id))
// Ensure scheduling on exec2 fails after failure 3 due to blacklist
assert(manager.resourceOffer("exec2", "host2", TaskLocality.ANY).isEmpty)
assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty)
}
// After reschedule delay, scheduling on exec1 should be possible.
clock.advance(rescheduleDelay)
{
val offerResult = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL)
val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
assert(offerResult.isDefined, "Expect resource offer to return a task")
assert(offerResult.get.index === 0)
assert(offerResult.get.executorId === "exec1")
assert(manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).isEmpty)
// Cause exec1 to fail : failure 4
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
......@@ -443,7 +469,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(sched.taskSetsFailed.contains(taskSet.id))
}
test("new executors get added") {
test("new executors get added and lost") {
// Assign host2 to rack2
FakeRackUtil.cleanUp()
FakeRackUtil.assignHostToRack("host2", "rack2")
......@@ -456,26 +482,25 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
Seq())
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// All tasks added to no-pref list since no preferred location is available
assert(manager.pendingTasksWithNoPrefs.size === 4)
// Only ANY is valid
assert(manager.myLocalityLevels.sameElements(Array(ANY)))
assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
// Add a new executor
sched.addExecutor("execD", "host1")
manager.executorAdded()
// Task 0 and 1 should be removed from no-pref list
assert(manager.pendingTasksWithNoPrefs.size === 2)
// Valid locality should contain NODE_LOCAL and ANY
assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, ANY)))
assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, ANY)))
// Add another executor
sched.addExecutor("execC", "host2")
manager.executorAdded()
// No-pref list now only contains task 3
assert(manager.pendingTasksWithNoPrefs.size === 1)
// Valid locality should contain PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL and ANY
assert(manager.myLocalityLevels.sameElements(
Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
FakeRackUtil.cleanUp()
assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY)))
// test if the valid locality is recomputed when the executor is lost
sched.removeExecutor("execC")
manager.executorLost("execC", "host2")
assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, ANY)))
sched.removeExecutor("execD")
manager.executorLost("execD", "host1")
assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
}
test("test RACK_LOCAL tasks") {
......@@ -506,7 +531,6 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Offer host2
// Task 1 can be scheduled with RACK_LOCAL
assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1)
FakeRackUtil.cleanUp()
}
test("do not emit warning when serialized task is small") {
......@@ -536,6 +560,53 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(manager.emittedTaskSizeWarning)
}
test("speculative and noPref task should be scheduled after node-local") {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
val taskSet = FakeTask.createTaskSet(4,
Seq(TaskLocation("host1", "execA")),
Seq(TaskLocation("host2"), TaskLocation("host1")),
Seq(),
Seq(TaskLocation("host3", "execC")))
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0)
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1)
manager.speculatableTasks += 1
clock.advance(LOCALITY_WAIT)
// schedule the nonPref task
assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2)
// schedule the speculative task
assert(manager.resourceOffer("execB", "host2", NO_PREF).get.index === 1)
clock.advance(LOCALITY_WAIT * 3)
// schedule non-local tasks
assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
}
test("node-local tasks should be scheduled right away when there are only node-local and no-preference tasks") {
sc = new SparkContext("local", "test")
val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
val taskSet = FakeTask.createTaskSet(4,
Seq(TaskLocation("host1")),
Seq(TaskLocation("host2")),
Seq(),
Seq(TaskLocation("host3")))
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// node-local tasks are scheduled without delay
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0)
assert(manager.resourceOffer("execA", "host2", NODE_LOCAL).get.index === 1)
assert(manager.resourceOffer("execA", "host3", NODE_LOCAL).get.index === 3)
assert(manager.resourceOffer("execA", "host3", NODE_LOCAL) === None)
// schedule no-preference after node local ones
assert(manager.resourceOffer("execA", "host3", NO_PREF).get.index === 2)
}
def createTaskResult(id: Int): DirectTaskResult[Int] = {
val valueSer = SparkEnv.get.serializer.newInstance()
new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment