diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 30bceb47b9e7dd48dea86f746c1d7d414711f5b5..a92922166f595bee9effbe747dda959eb186d514 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -62,6 +62,9 @@ private[spark] class TaskSchedulerImpl( // Threshold above which we warn user initial TaskSet may be starved val STARVATION_TIMEOUT = conf.getLong("spark.starvation.timeout", 15000) + // CPUs to request per task + val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1) + // TaskSetManagers are not thread safe, so any access to one should be synchronized // on this class. val activeTaskSets = new HashMap[String, TaskSetManager] @@ -228,16 +231,18 @@ private[spark] class TaskSchedulerImpl( for (i <- 0 until shuffledOffers.size) { val execId = shuffledOffers(i).executorId val host = shuffledOffers(i).host - for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) { - tasks(i) += task - val tid = task.taskId - taskIdToTaskSetId(tid) = taskSet.taskSet.id - taskIdToExecutorId(tid) = execId - activeExecutorIds += execId - executorsByHost(host) += execId - availableCpus(i) -= taskSet.CPUS_PER_TASK - assert (availableCpus(i) >= 0) - launchedTask = true + if (availableCpus(i) >= CPUS_PER_TASK) { + for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { + tasks(i) += task + val tid = task.taskId + taskIdToTaskSetId(tid) = taskSet.taskSet.id + taskIdToExecutorId(tid) = execId + activeExecutorIds += execId + executorsByHost(host) += execId + availableCpus(i) -= CPUS_PER_TASK + assert (availableCpus(i) >= 0) + launchedTask = true + } } } } while (launchedTask) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index a73343c1c08260bfa8e2dd35b83d96193fcbd4f7..86d2050a03f181d6cda7ec27c9c5830abc25fe0a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -56,9 +56,6 @@ private[spark] class TaskSetManager( { val conf = sched.sc.conf - // CPUs to request per task - val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1) - /* * Sometimes if an executor is dead or in an otherwise invalid state, the driver * does not realize right away leading to repeated task failures. If enabled, @@ -384,11 +381,10 @@ private[spark] class TaskSetManager( def resourceOffer( execId: String, host: String, - availableCpus: Int, maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription] = { - if (!isZombie && availableCpus >= CPUS_PER_TASK) { + if (!isZombie) { val curTime = clock.getTime() var allowedLocality = getAllowedLocalityLevel(curTime) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index fad03731572e7c6532670645b54608c24a61edc4..990e01a3e79592685c9c52b05481cba13bc79542 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -89,7 +89,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A scheduler.statusUpdate(taskId, state, data.value) if (TaskState.isFinished(state)) { if (executorActor.contains(executorId)) { - freeCores(executorId) += 1 + freeCores(executorId) += scheduler.CPUS_PER_TASK makeOffers(executorId) } else { // Ignoring the update since we don't know about the executor. @@ -140,7 +140,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A // Launch tasks returned by a set of resource offers def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { - freeCores(task.executorId) -= 1 + freeCores(task.executorId) -= scheduler.CPUS_PER_TASK executorActor(task.executorId) ! LaunchTask(task) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 4092dd04b112b9ed2456546c412a3b49e1d80293..dfdcafe19fb93e337702cc48253173589616e214 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -246,7 +246,7 @@ private[spark] class MesosSchedulerBackend( val cpuResource = Resource.newBuilder() .setName("cpus") .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(1).build()) + .setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build()) .build() MesosTaskInfo.newBuilder() .setTaskId(taskId) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 9274e01632d5864effa3bdc0aa0e4b3935b3bd96..356e28dd19bc5f2c19d7c3430a0aef01b6212432 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -80,7 +80,6 @@ class FakeTaskSetManager( override def resourceOffer( execId: String, host: String, - availableCpus: Int, maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription] = { @@ -125,7 +124,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks)) } for (taskSet <- taskSetQueue) { - taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match { + taskSet.resourceOffer("execId_1", "hostname_1", TaskLocality.ANY) match { case Some(task) => return taskSet.stageId case None => {} @@ -293,4 +292,43 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin assert(count > 0) assert(count < numTrials) } + + test("Scheduler correctly accounts for multiple CPUs per task") { + sc = new SparkContext("local", "TaskSchedulerImplSuite") + val taskCpus = 2 + + sc.conf.set("spark.task.cpus", taskCpus.toString) + val taskScheduler = new TaskSchedulerImpl(sc) + taskScheduler.initialize(new FakeSchedulerBackend) + // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks. + val dagScheduler = new DAGScheduler(sc, taskScheduler) { + override def taskStarted(task: Task[_], taskInfo: TaskInfo) {} + override def executorAdded(execId: String, host: String) {} + } + + // Give zero core offers. Should not generate any tasks + val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0), + new WorkerOffer("executor1", "host1", 0)) + val taskSet = FakeTask.createTaskSet(1) + taskScheduler.submitTasks(taskSet) + var taskDescriptions = taskScheduler.resourceOffers(zeroCoreWorkerOffers).flatten + assert(0 === taskDescriptions.length) + + // No tasks should run as we only have 1 core free. + val numFreeCores = 1 + val singleCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores), + new WorkerOffer("executor1", "host1", numFreeCores)) + taskScheduler.submitTasks(taskSet) + taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten + assert(0 === taskDescriptions.length) + + // Now change the offers to have 2 cores in one executor and verify if it + // is chosen. + val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus), + new WorkerOffer("executor1", "host1", numFreeCores)) + taskScheduler.submitTasks(taskSet) + taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten + assert(1 === taskDescriptions.length) + assert("executor0" === taskDescriptions(0).executorId) + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 9af5d3a303ae25d7e6d0e5ddc24f10432558b0d5..c92b6dc96c8eb111eabdb8e02367a484547cc131 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -93,19 +93,16 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { val taskSet = FakeTask.createTaskSet(1) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) - // Offer a host with no CPUs - assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None) - // Offer a host with process-local as the constraint; this should work because the TaskSet // above won't have any locality preferences - val taskOption = manager.resourceOffer("exec1", "host1", 2, TaskLocality.PROCESS_LOCAL) + val taskOption = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL) assert(taskOption.isDefined) val task = taskOption.get assert(task.executorId === "exec1") assert(sched.startedTasks.contains(0)) // Re-offer the host -- now we should get no more tasks - assert(manager.resourceOffer("exec1", "host1", 2, PROCESS_LOCAL) === None) + assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None) // Tell it the task has finished manager.handleSuccessfulTask(0, createTaskResult(0)) @@ -121,7 +118,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { // First three offers should all find tasks for (i <- 0 until 3) { - val taskOption = manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) + val taskOption = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) assert(taskOption.isDefined) val task = taskOption.get assert(task.executorId === "exec1") @@ -129,7 +126,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(sched.startedTasks.toSet === Set(0, 1, 2)) // Re-offer the host -- now we should get no more tasks - assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None) + assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None) // Finish the first two tasks manager.handleSuccessfulTask(0, createTaskResult(0)) @@ -157,35 +154,35 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) // First offer host1, exec1: first task should be chosen - assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0) + assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) // Offer host1, exec1 again: the last task, which has no prefs, should be chosen - assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 3) + assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 3) // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen - assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None) + assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None) clock.advance(LOCALITY_WAIT) // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen - assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None) + assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None) // Offer host1, exec1 again, at NODE_LOCAL level: we should choose task 2 - assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL).get.index == 2) + assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2) // Offer host1, exec1 again, at NODE_LOCAL level: nothing should get chosen - assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL) === None) + assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL) === None) // Offer host1, exec1 again, at ANY level: nothing should get chosen - assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None) + assert(manager.resourceOffer("exec1", "host1", ANY) === None) clock.advance(LOCALITY_WAIT) // Offer host1, exec1 again, at ANY level: task 1 should get chosen - assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1) + assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1) // Offer host1, exec1 again, at ANY level: nothing should be chosen as we've launched all tasks - assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None) + assert(manager.resourceOffer("exec1", "host1", ANY) === None) } test("delay scheduling with fallback") { @@ -203,29 +200,29 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) // First offer host1: first task should be chosen - assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0) + assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) // Offer host1 again: nothing should get chosen - assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None) + assert(manager.resourceOffer("exec1", "host1", ANY) === None) clock.advance(LOCALITY_WAIT) // Offer host1 again: second task (on host2) should get chosen - assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1) + assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1) // Offer host1 again: third task (on host2) should get chosen - assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2) + assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2) // Offer host2: fifth task (also on host2) should get chosen - assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 4) + assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 4) // Now that we've launched a local task, we should no longer launch the task for host3 - assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None) + assert(manager.resourceOffer("exec2", "host2", ANY) === None) clock.advance(LOCALITY_WAIT) // After another delay, we can go ahead and launch that task non-locally - assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 3) + assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 3) } test("delay scheduling with failed hosts") { @@ -240,24 +237,24 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) // First offer host1: first task should be chosen - assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0) + assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) // Offer host1 again: third task should be chosen immediately because host3 is not up - assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2) + assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2) // After this, nothing should get chosen - assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None) + assert(manager.resourceOffer("exec1", "host1", ANY) === None) // Now mark host2 as dead sched.removeExecutor("exec2") manager.executorLost("exec2", "host2") // Task 1 should immediately be launched on host1 because its original host is gone - assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1) + assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1) // Now that all tasks have launched, nothing new should be launched anywhere else - assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None) - assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None) + assert(manager.resourceOffer("exec1", "host1", ANY) === None) + assert(manager.resourceOffer("exec2", "host2", ANY) === None) } test("task result lost") { @@ -267,14 +264,14 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { val clock = new FakeClock val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock) - assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0) + assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) // Tell it the task has finished but the result was lost. manager.handleFailedTask(0, TaskState.FINISHED, TaskResultLost) assert(sched.endedTasks(0) === TaskResultLost) // Re-offer the host -- now we should get task 0 again. - assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0) + assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0) } test("repeated failures lead to task set abortion") { @@ -287,7 +284,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted // after the last failure. (1 to manager.maxTaskFailures).foreach { index => - val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY) + val offerResult = manager.resourceOffer("exec1", "host1", ANY) assert(offerResult.isDefined, "Expect resource offer on iteration %s to return a task".format(index)) assert(offerResult.get.index === 0) @@ -317,7 +314,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { val manager = new TaskSetManager(sched, taskSet, 4, clock) { - val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL) + val offerResult = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL) assert(offerResult.isDefined, "Expect resource offer to return a task") assert(offerResult.get.index === 0) @@ -328,15 +325,15 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(!sched.taskSetsFailed.contains(taskSet.id)) // Ensure scheduling on exec1 fails after failure 1 due to blacklist - assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty) - assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty) - assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.RACK_LOCAL).isEmpty) - assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.ANY).isEmpty) + assert(manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL).isEmpty) + assert(manager.resourceOffer("exec1", "host1", TaskLocality.NODE_LOCAL).isEmpty) + assert(manager.resourceOffer("exec1", "host1", TaskLocality.RACK_LOCAL).isEmpty) + assert(manager.resourceOffer("exec1", "host1", TaskLocality.ANY).isEmpty) } // Run the task on exec1.1 - should work, and then fail it on exec1.1 { - val offerResult = manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL) + val offerResult = manager.resourceOffer("exec1.1", "host1", TaskLocality.NODE_LOCAL) assert(offerResult.isDefined, "Expect resource offer to return a task for exec1.1, offerResult = " + offerResult) @@ -348,12 +345,12 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(!sched.taskSetsFailed.contains(taskSet.id)) // Ensure scheduling on exec1.1 fails after failure 2 due to blacklist - assert(manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty) + assert(manager.resourceOffer("exec1.1", "host1", TaskLocality.NODE_LOCAL).isEmpty) } // Run the task on exec2 - should work, and then fail it on exec2 { - val offerResult = manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY) + val offerResult = manager.resourceOffer("exec2", "host2", TaskLocality.ANY) assert(offerResult.isDefined, "Expect resource offer to return a task") assert(offerResult.get.index === 0) @@ -364,20 +361,20 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { assert(!sched.taskSetsFailed.contains(taskSet.id)) // Ensure scheduling on exec2 fails after failure 3 due to blacklist - assert(manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY).isEmpty) + assert(manager.resourceOffer("exec2", "host2", TaskLocality.ANY).isEmpty) } // After reschedule delay, scheduling on exec1 should be possible. clock.advance(rescheduleDelay) { - val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL) + val offerResult = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL) assert(offerResult.isDefined, "Expect resource offer to return a task") assert(offerResult.get.index === 0) assert(offerResult.get.executorId === "exec1") - assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty) + assert(manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL).isEmpty) // Cause exec1 to fail : failure 4 manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost) diff --git a/docs/configuration.md b/docs/configuration.md index 16ee5ec0f230f1fecf4f0f978485e909b0d46bde..1ff01505672558c50d37607caad4d57696b44785 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -579,6 +579,13 @@ Apart from these, the following properties are also available, and may be useful out and giving up. </td> </tr> +<tr> + <td>spark.task.cpus</td> + <td>1</td> + <td> + Number of cores to allocate for each task. + </td> +</tr> </table> ## Viewing Spark Properties