Commit d9043092 authored by Imran Rashid, committed by Kay Ousterhout

[SPARK-18967][SCHEDULER] compute locality levels even if delay = 0

## What changes were proposed in this pull request?

Before this change, with delay scheduling off, Spark would effectively
ignore locality preferences for bulk scheduling. With this change,
locality preferences are used when multiple offers are made
simultaneously.
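
For reference, delay scheduling is turned off by setting `spark.locality.wait` to 0. A minimal sketch of an application run with that setting (the app name and input path are placeholders, not part of this patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LocalityWithoutDelayExample {
  def main(args: Array[String]): Unit = {
    // Disable delay scheduling entirely: a task may be launched at any
    // locality level immediately. With this patch, locality preferences are
    // still consulted when several resource offers arrive in the same batch.
    val conf = new SparkConf()
      .setAppName("locality-without-delay-example")
      .set("spark.locality.wait", "0")
    val sc = new SparkContext(conf)
    try {
      // Any stage with locality preferences (e.g. HDFS blocks or cached
      // partitions) now favors local executors when the scheduler has a
      // choice within a single round of offers.
      println(sc.textFile("hdfs:///path/to/input").count())
    } finally {
      sc.stop()
    }
  }
}
```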

## How was this patch tested?

A test case was added which fails without this change. All unit tests were run via Jenkins.

Author: Imran Rashid <irashid@cloudera.com>

Closes #16376 from squito/locality_without_delay.
parent 7a0a630e
@@ -54,7 +54,7 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
private[spark] class TaskSchedulerImpl private[scheduler](
val sc: SparkContext,
val maxTaskFailures: Int,
-blacklistTrackerOpt: Option[BlacklistTracker],
+private[scheduler] val blacklistTrackerOpt: Option[BlacklistTracker],
isLocal: Boolean = false)
extends TaskScheduler with Logging
{
@@ -337,8 +337,7 @@ private[spark] class TaskSchedulerImpl private[scheduler](
}
}.getOrElse(offers)
-// Randomly shuffle offers to avoid always placing tasks on the same set of workers.
-val shuffledOffers = Random.shuffle(filteredOffers)
+val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
@@ -375,6 +374,14 @@ private[spark] class TaskSchedulerImpl private[scheduler](
return tasks
}
/**
* Shuffle offers around to avoid always placing tasks on the same workers. Exposed to allow
* overriding in tests, so it can be deterministic.
*/
protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
Random.shuffle(offers)
}
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
var reason: Option[ExecutorLossReason] = None
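
The scaladoc above notes that `shuffleOffers` exists mainly so tests can make offer order deterministic. The new test below simply returns the offers unshuffled; a hypothetical alternative is sketched here (class name invented; this only compiles inside the `org.apache.spark.scheduler` package, since `TaskSchedulerImpl` and `WorkerOffer` are package-private):

```scala
package org.apache.spark.scheduler

import org.apache.spark.SparkContext

// Sort offers by executor id instead of shuffling them, so a test sees the
// same offer order on every run.
class SortedOfferScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
  override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
    offers.sortBy(_.executorId)
  }
}
```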
@@ -163,7 +163,12 @@ private[spark] class TaskSetManager(
addPendingTask(i)
}
-// Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
+/**
+ * Track the set of locality levels which are valid given the tasks' locality preferences and
+ * the set of currently available executors. This is updated as executors are added and removed.
+ * This allows a performance optimization of skipping levels that aren't relevant (e.g., skip
+ * PROCESS_LOCAL if no tasks could be run PROCESS_LOCAL for the current set of executors).
+ */
var myLocalityLevels = computeValidLocalityLevels()
var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
@@ -961,18 +966,18 @@ private[spark] class TaskSetManager(
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
-if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
+if (!pendingTasksForExecutor.isEmpty &&
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
levels += PROCESS_LOCAL
}
-if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
+if (!pendingTasksForHost.isEmpty &&
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
levels += NODE_LOCAL
}
if (!pendingTasksWithNoPrefs.isEmpty) {
levels += NO_PREF
}
-if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
+if (!pendingTasksForRack.isEmpty &&
pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
levels += RACK_LOCAL
}
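
Dropping the `getLocalityWait(...) != 0` guards means a zero wait no longer removes a level from `myLocalityLevels`; the wait now only controls how long the TaskSetManager holds out for a better level before falling through. A sketch of the relevant settings (the values shown are illustrative, not defaults prescribed by this patch):

```scala
import org.apache.spark.SparkConf

// getLocalityWait consults a per-level wait, falling back to spark.locality.wait.
// Before this patch, a wait of 0 dropped the corresponding level from
// myLocalityLevels; now the level is kept and 0 simply means "don't wait
// before falling through to the next level".
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")         // base wait for all levels
  .set("spark.locality.wait.process", "0")  // PROCESS_LOCAL: no delay, still preferred
  .set("spark.locality.wait.node", "3s")    // NODE_LOCAL
  .set("spark.locality.wait.rack", "1s")    // RACK_LOCAL
```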
@@ -29,7 +29,7 @@ import org.scalatest.mock.MockitoSugar
import org.apache.spark._
import org.apache.spark.internal.config
import org.apache.spark.internal.Logging
-import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.ManualClock
class FakeSchedulerBackend extends SchedulerBackend {
def start() {}
@@ -819,4 +819,89 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(!taskScheduler.hasExecutorsAliveOnHost("host0"))
assert(taskScheduler.getExecutorsAliveOnHost("host0").isEmpty)
}
test("Locality should be used for bulk offers even with delay scheduling off") {
val conf = new SparkConf()
.set("spark.locality.wait", "0")
sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
// we create a manual clock just so we can be sure the clock doesn't advance at all in this test
val clock = new ManualClock()
// We customize the task scheduler just to let us control the way offers are shuffled, so we
// can be sure we try both permutations, and to control the clock on the TaskSetManager.
val taskScheduler = new TaskSchedulerImpl(sc) {
override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
// Don't shuffle the offers around for this test. Instead, we'll just pass in all
// the permutations we care about directly.
offers
}
override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = {
new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock)
}
}
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
new DAGScheduler(sc, taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
override def executorAdded(execId: String, host: String) {}
}
taskScheduler.initialize(new FakeSchedulerBackend)
// Make two different offers -- one in the preferred location, one that is not.
val offers = IndexedSeq(
WorkerOffer("exec1", "host1", 1),
WorkerOffer("exec2", "host2", 1)
)
Seq(false, true).foreach { swapOrder =>
// Submit a taskset with locality preferences.
val taskSet = FakeTask.createTaskSet(
1, stageId = 1, stageAttemptId = 0, Seq(TaskLocation("host1", "exec1")))
taskScheduler.submitTasks(taskSet)
val shuffledOffers = if (swapOrder) offers.reverse else offers
// Regardless of the order of the offers (after the task scheduler shuffles them), we should
// always take advantage of the local offer.
val taskDescs = taskScheduler.resourceOffers(shuffledOffers).flatten
withClue(s"swapOrder = $swapOrder") {
assert(taskDescs.size === 1)
assert(taskDescs.head.executorId === "exec1")
}
}
}
test("With delay scheduling off, tasks can be run at any locality level immediately") {
val conf = new SparkConf()
.set("spark.locality.wait", "0")
sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
// we create a manual clock just so we can be sure the clock doesn't advance at all in this test
val clock = new ManualClock()
val taskScheduler = new TaskSchedulerImpl(sc) {
override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = {
new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock)
}
}
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
new DAGScheduler(sc, taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
override def executorAdded(execId: String, host: String) {}
}
taskScheduler.initialize(new FakeSchedulerBackend)
// make an offer on the preferred host so the scheduler knows it's alive. This is necessary
// so that the taskset knows that it *could* take advantage of locality.
taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1)))
// Submit a taskset with locality preferences.
val taskSet = FakeTask.createTaskSet(
1, stageId = 1, stageAttemptId = 0, Seq(TaskLocation("host1", "exec1")))
taskScheduler.submitTasks(taskSet)
val tsm = taskScheduler.taskSetManagerForAttempt(1, 0).get
// make sure we've set up our test correctly, so that the taskset knows it *could* use local
// offers.
assert(tsm.myLocalityLevels.contains(TaskLocality.NODE_LOCAL))
// make an offer on a non-preferred location. Since the delay is 0, we should still schedule
// immediately.
val taskDescs =
taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec2", "host2", 1))).flatten
assert(taskDescs.size === 1)
assert(taskDescs.head.executorId === "exec2")
}
}