From ddf64f019fa98010e0a59e6e1559f4e3f8b25b5f Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@apache.org>
Date: Thu, 10 Oct 2013 13:20:27 -0700
Subject: [PATCH] Support job cancellation in multi-pool scheduler.

---
 .../org/apache/spark/scheduler/Pool.scala          |  2 +-
 .../spark/scheduler/SchedulableBuilder.scala       | 19 +++++++++++++++++--
 .../scheduler/cluster/ClusterScheduler.scala       |  2 +-
 3 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 9eb8d48501..8b33319d02 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -43,7 +43,7 @@ private[spark] class Pool(
   var runningTasks = 0
 
   var priority = 0
-  var stageId = 0
+  var stageId = -1
   var name = poolName
   var parent: Pool = null
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index c4f555bfe1..a4e86538f9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -36,8 +36,23 @@ private[spark] trait SchedulableBuilder {
 
   def addTaskSetManager(manager: Schedulable, properties: Properties)
 
-  def getTaskSetManagers(stageId: Int): Iterable[Schedulable] = {
-    rootPool.schedulableQueue.filter(_.stageId == stageId)
+  /**
+   * Find the TaskSetManager for the given stage. In the fair scheduler, this function examines
+   * all the pools to find the TaskSetManager.
+   */
+  def getTaskSetManagers(stageId: Int): Option[TaskSetManager] = {
+    def getTsm(pool: Pool): Option[TaskSetManager] = {
+      pool.schedulableQueue.foreach {
+        case tsm: TaskSetManager =>
+          if (tsm.stageId == stageId) {
+            return Some(tsm)
+          }
+        case childPool: Pool =>
+          getTsm(childPool).foreach(tsm => return Some(tsm))
+      }
+      return None
+    }
+    getTsm(rootPool)
   }
 }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 031d0b1ef7..250dec5126 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -172,7 +172,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   override def cancelTasks(stageId: Int): Unit = synchronized {
     logInfo("Cancelling stage " + stageId)
-    schedulableBuilder.getTaskSetManagers(stageId).foreach { case tsm: TaskSetManager =>
+    schedulableBuilder.getTaskSetManagers(stageId).foreach { tsm =>
       // There are two possible cases here:
       // 1. The task set manager has been created and some tasks have been scheduled.
       //    In this case, send a kill signal to the executors to kill the task.
--
GitLab
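
Note on the traversal introduced above: getTaskSetManagers walks the fair-scheduler pool tree depth-first until it finds the TaskSetManager whose stageId matches. The self-contained Scala sketch below illustrates the same idea with simplified stand-in types (TaskSet, PoolNode, and FindTaskSet are illustrative names, not Spark classes); the point it demonstrates is that a match found by the recursive call into a child pool has to be propagated back to the caller, otherwise a task set sitting in a nested pool is never reported.

// Illustrative sketch only; simplified stand-ins, not Spark's Pool/TaskSetManager.
import scala.collection.mutable.ArrayBuffer

sealed trait Node { def stageId: Int }

// Leaf node: a set of tasks belonging to one stage.
case class TaskSet(stageId: Int) extends Node

// Inner node: a pool that can hold task sets and other pools.
class PoolNode extends Node {
  val stageId = -1                      // pools carry no stage, mirroring stageId = -1 in the patch
  val queue = new ArrayBuffer[Node]
}

object FindTaskSet {
  // Depth-first search over nested pools. A match found in a child pool is
  // returned to the caller via the recursive call's result.
  def find(pool: PoolNode, wanted: Int): Option[TaskSet] = {
    pool.queue.foreach {
      case ts: TaskSet if ts.stageId == wanted => return Some(ts)
      case child: PoolNode =>
        find(child, wanted).foreach(ts => return Some(ts))
      case _ =>                         // leaf with a different stage id: keep scanning
    }
    None
  }

  def main(args: Array[String]): Unit = {
    val root  = new PoolNode
    val child = new PoolNode
    child.queue += TaskSet(stageId = 7) // placed inside a nested pool
    root.queue += child
    root.queue += TaskSet(stageId = 3)
    println(find(root, 7))              // Some(TaskSet(7)), found inside the nested pool
    println(find(root, 42))             // None
  }
}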