diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java index 140c52fd12f9466b7678dc65157f691354e942a5..3583856d88998ea470603d256c566440107ee99e 100644 --- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java +++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java @@ -139,6 +139,11 @@ public class SparkFirehoseListener implements SparkListenerInterface { onEvent(blockUpdated); } + @Override + public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted speculativeTask) { + onEvent(speculativeTask); + } + @Override public void onOtherEvent(SparkListenerEvent event) { onEvent(event); diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 337631a6f9a343404eb8cb2b798025178067b060..33503260bbe02df850556fa13ff43339288a8a42 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -373,8 +373,14 @@ private[spark] class ExecutorAllocationManager( // If our target has not changed, do not send a message // to the cluster manager and reset our exponential growth if (delta == 0) { - numExecutorsToAdd = 1 - return 0 + // Check if there is any speculative jobs pending + if (listener.pendingTasks == 0 && listener.pendingSpeculativeTasks > 0) { + numExecutorsTarget = + math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors) + } else { + numExecutorsToAdd = 1 + return 0 + } } val addRequestAcknowledged = try { @@ -588,17 +594,22 @@ private[spark] class ExecutorAllocationManager( * A listener that notifies the given allocation manager of when to add and remove executors. * * This class is intentionally conservative in its assumptions about the relative ordering - * and consistency of events returned by the listener. For simplicity, it does not account - * for speculated tasks. + * and consistency of events returned by the listener. */ private class ExecutorAllocationListener extends SparkListener { private val stageIdToNumTasks = new mutable.HashMap[Int, Int] private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]] private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]] - // Number of tasks currently running on the cluster. Should be 0 when no stages are active. + // Number of tasks currently running on the cluster including speculative tasks. + // Should be 0 when no stages are active. private var numRunningTasks: Int = _ + // Number of speculative tasks to be scheduled in each stage + private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int] + // The speculative tasks started in each stage + private val stageIdToSpeculativeTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]] + // stageId to tuple (the number of task with locality preferences, a map where each pair is a // node and the number of tasks that would like to be scheduled on that node) map, // maintain the executor placement hints for each stage Id used by resource framework to better @@ -637,7 +648,9 @@ private[spark] class ExecutorAllocationManager( val stageId = stageCompleted.stageInfo.stageId allocationManager.synchronized { stageIdToNumTasks -= stageId + stageIdToNumSpeculativeTasks -= stageId stageIdToTaskIndices -= stageId + stageIdToSpeculativeTaskIndices -= stageId stageIdToExecutorPlacementHints -= stageId // Update the executor placement hints @@ -645,7 +658,7 @@ private[spark] class ExecutorAllocationManager( // If this is the last stage with pending tasks, mark the scheduler queue as empty // This is needed in case the stage is aborted for any reason - if (stageIdToNumTasks.isEmpty) { + if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) { allocationManager.onSchedulerQueueEmpty() if (numRunningTasks != 0) { logWarning("No stages are running, but numRunningTasks != 0") @@ -671,7 +684,12 @@ private[spark] class ExecutorAllocationManager( } // If this is the last pending task, mark the scheduler queue as empty - stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex + if (taskStart.taskInfo.speculative) { + stageIdToSpeculativeTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += + taskIndex + } else { + stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex + } if (totalPendingTasks() == 0) { allocationManager.onSchedulerQueueEmpty() } @@ -705,7 +723,11 @@ private[spark] class ExecutorAllocationManager( if (totalPendingTasks() == 0) { allocationManager.onSchedulerBacklogged() } - stageIdToTaskIndices.get(stageId).foreach { _.remove(taskIndex) } + if (taskEnd.taskInfo.speculative) { + stageIdToSpeculativeTaskIndices.get(stageId).foreach {_.remove(taskIndex)} + } else { + stageIdToTaskIndices.get(stageId).foreach {_.remove(taskIndex)} + } } } } @@ -726,18 +748,39 @@ private[spark] class ExecutorAllocationManager( allocationManager.onExecutorRemoved(executorRemoved.executorId) } + override def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted) + : Unit = { + val stageId = speculativeTask.stageId + + allocationManager.synchronized { + stageIdToNumSpeculativeTasks(stageId) = + stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1 + allocationManager.onSchedulerBacklogged() + } + } + /** * An estimate of the total number of pending tasks remaining for currently running stages. Does * not account for tasks which may have failed and been resubmitted. * * Note: This is not thread-safe without the caller owning the `allocationManager` lock. */ - def totalPendingTasks(): Int = { + def pendingTasks(): Int = { stageIdToNumTasks.map { case (stageId, numTasks) => numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0) }.sum } + def pendingSpeculativeTasks(): Int = { + stageIdToNumSpeculativeTasks.map { case (stageId, numTasks) => + numTasks - stageIdToSpeculativeTaskIndices.get(stageId).map(_.size).getOrElse(0) + }.sum + } + + def totalPendingTasks(): Int = { + pendingTasks + pendingSpeculativeTasks + } + /** * The number of tasks currently running across all stages. */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 21bf9d013ebefc4e68a97273c2b752c624601f07..562dd1da4fe142ff0825be8b24909a2c7cfebb10 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -281,6 +281,13 @@ class DAGScheduler( eventProcessLoop.post(TaskSetFailed(taskSet, reason, exception)) } + /** + * Called by the TaskSetManager when it decides a speculative task is needed. + */ + def speculativeTaskSubmitted(task: Task[_]): Unit = { + eventProcessLoop.post(SpeculativeTaskSubmitted(task)) + } + private[scheduler] def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized { // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times @@ -812,6 +819,10 @@ class DAGScheduler( listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo)) } + private[scheduler] def handleSpeculativeTaskSubmitted(task: Task[_]): Unit = { + listenerBus.post(SparkListenerSpeculativeTaskSubmitted(task.stageId)) + } + private[scheduler] def handleTaskSetFailed( taskSet: TaskSet, reason: String, @@ -1778,6 +1789,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) + case SpeculativeTaskSubmitted(task) => + dagScheduler.handleSpeculativeTaskSubmitted(task) + case GettingResultEvent(taskInfo) => dagScheduler.handleGetTaskResult(taskInfo) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala index 3f8d5639a2b908591c59cc3e4956285c1a20b614..54ab8f8b3e1d828d2b9e8ecb74ca439a33e2417c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala @@ -94,3 +94,7 @@ case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Thr extends DAGSchedulerEvent private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent + +private[scheduler] +case class SpeculativeTaskSubmitted(task: Task[_]) extends DAGSchedulerEvent + diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 59f89a82a1da85d563d81625b045bd85f8b9af73..b76e560669d596ab8aa62bfbffdb3968818e92b8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -52,6 +52,9 @@ case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: T @DeveloperApi case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent +@DeveloperApi +case class SparkListenerSpeculativeTaskSubmitted(stageId: Int) extends SparkListenerEvent + @DeveloperApi case class SparkListenerTaskEnd( stageId: Int, @@ -290,6 +293,11 @@ private[spark] trait SparkListenerInterface { */ def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit + /** + * Called when a speculative task is submitted + */ + def onSpeculativeTaskSubmitted(speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit + /** * Called when other events like SQL-specific events are posted. */ @@ -354,5 +362,8 @@ abstract class SparkListener extends SparkListenerInterface { override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = { } + override def onSpeculativeTaskSubmitted( + speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = { } + override def onOtherEvent(event: SparkListenerEvent): Unit = { } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala index 3b0d3b1b150feaf779e3eabe1d48b740d3602771..056c0cbded4357d2de72ee0706aeaf936d3e744d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala @@ -71,6 +71,8 @@ private[spark] trait SparkListenerBus listener.onNodeUnblacklisted(nodeUnblacklisted) case blockUpdated: SparkListenerBlockUpdated => listener.onBlockUpdated(blockUpdated) + case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted => + listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted) case _ => listener.onOtherEvent(event) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index c2f817858473c4833ff1885a5d38f5b2982b404b..3804ea863b4f9c33605500bc33d521e6f2798710 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -966,6 +966,7 @@ private[spark] class TaskSetManager( "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms" .format(index, taskSet.id, info.host, threshold)) speculatableTasks += index + sched.dagScheduler.speculativeTaskSubmitted(tasks(index)) foundTasks = true } } diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index b9ce71a0c52546c61dc89e15b93d71f26952a10e..7da4bae0ab7ebc3ee9d12aeb726ac558d6491fa6 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -188,6 +188,40 @@ class ExecutorAllocationManagerSuite assert(numExecutorsTarget(manager) === 10) } + test("add executors when speculative tasks added") { + sc = createSparkContext(0, 10, 0) + val manager = sc.executorAllocationManager.get + + // Verify that we're capped at number of tasks including the speculative ones in the stage + sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1)) + assert(numExecutorsTarget(manager) === 0) + assert(numExecutorsToAdd(manager) === 1) + assert(addExecutors(manager) === 1) + sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1)) + sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1)) + sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2))) + assert(numExecutorsTarget(manager) === 1) + assert(numExecutorsToAdd(manager) === 2) + assert(addExecutors(manager) === 2) + assert(numExecutorsTarget(manager) === 3) + assert(numExecutorsToAdd(manager) === 4) + assert(addExecutors(manager) === 2) + assert(numExecutorsTarget(manager) === 5) + assert(numExecutorsToAdd(manager) === 1) + + // Verify that running a task doesn't affect the target + sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) + assert(numExecutorsTarget(manager) === 5) + assert(addExecutors(manager) === 0) + assert(numExecutorsToAdd(manager) === 1) + + // Verify that running a speculative task doesn't affect the target + sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true))) + assert(numExecutorsTarget(manager) === 5) + assert(addExecutors(manager) === 0) + assert(numExecutorsToAdd(manager) === 1) + } + test("cancel pending executors when no longer needed") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get @@ -1031,10 +1065,15 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { taskLocalityPreferences = taskLocalityPreferences) } - private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = { - new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative = false) + private def createTaskInfo( + taskId: Int, + taskIndex: Int, + executorId: String, + speculative: Boolean = false): TaskInfo = { + new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, speculative) } + /* ------------------------------------------------------- * | Helper methods for accessing private methods and fields | * ------------------------------------------------------- */ @@ -1061,6 +1100,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { private val _onExecutorBusy = PrivateMethod[Unit]('onExecutorBusy) private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks) private val _hostToLocalTaskCount = PrivateMethod[Map[String, Int]]('hostToLocalTaskCount) + private val _onSpeculativeTaskSubmitted = PrivateMethod[Unit]('onSpeculativeTaskSubmitted) private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = { manager invokePrivate _numExecutorsToAdd() @@ -1136,6 +1176,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { manager invokePrivate _onExecutorBusy(id) } + private def onSpeculativeTaskSubmitted(manager: ExecutorAllocationManager, id: String) : Unit = { + manager invokePrivate _onSpeculativeTaskSubmitted(id) + } + private def localityAwareTasks(manager: ExecutorAllocationManager): Int = { manager invokePrivate _localityAwareTasks() } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 6f1663b2109693653915424f945bc46f0a0d5eb5..ae43f4cadc0370bf79b01823424c061e844bb372 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -60,6 +60,10 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler) exception: Option[Throwable]): Unit = { taskScheduler.taskSetsFailed += taskSet.id } + + override def speculativeTaskSubmitted(task: Task[_]): Unit = { + taskScheduler.speculativeTasks += task.partitionId + } } // Get the rack for a given host @@ -92,6 +96,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex val endedTasks = new mutable.HashMap[Long, TaskEndReason] val finishedManagers = new ArrayBuffer[TaskSetManager] val taskSetsFailed = new ArrayBuffer[String] + val speculativeTasks = new ArrayBuffer[Int] val executors = new mutable.HashMap[String, String] for ((execId, host) <- liveExecutors) { @@ -139,6 +144,7 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex } } + override def getRackForHost(value: String): Option[String] = FakeRackUtil.getRackForHost(value) } @@ -929,6 +935,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // > 0ms, so advance the clock by 1ms here. clock.advance(1) assert(manager.checkSpeculatableTasks(0)) + assert(sched.speculativeTasks.toSet === Set(3)) + // Offer resource to start the speculative attempt for the running task val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF) assert(taskOption5.isDefined) @@ -1016,6 +1024,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg // > 0ms, so advance the clock by 1ms here. clock.advance(1) assert(manager.checkSpeculatableTasks(0)) + assert(sched.speculativeTasks.toSet === Set(3, 4)) // Offer resource to start the speculative attempt for the running task val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF) assert(taskOption5.isDefined)