diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 7c96ae637b3eff8ec5e677347aace56ca7011ef1..5d9a0357ad480ab2e2c1457921ee706d95a3636f 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -146,9 +146,7 @@ class SparkContext( case SPARK_REGEX(sparkUrl) => val scheduler = new ClusterScheduler(this) val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName) - val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")). - newInstance().asInstanceOf[TaskSetQueuesManager] - scheduler.initialize(backend, taskSetQueuesManager) + scheduler.initialize(backend) scheduler case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => @@ -167,9 +165,7 @@ class SparkContext( numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt) val sparkUrl = localCluster.start() val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName) - val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")). - newInstance().asInstanceOf[TaskSetQueuesManager] - scheduler.initialize(backend, taskSetQueuesManager) + scheduler.initialize(backend) backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => { localCluster.stop() } @@ -188,9 +184,7 @@ class SparkContext( } else { new MesosSchedulerBackend(scheduler, this, masterWithoutProtocol, appName) } - val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")). - newInstance().asInstanceOf[TaskSetQueuesManager] - scheduler.initialize(backend, taskSetQueuesManager) + scheduler.initialize(backend) scheduler } } diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala index 2ddac0ff30f750e5933e91337961691f044ce54a..1a300c9e8cbe7b5c26d4c37c4f5a4284b60f45f6 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala @@ -61,17 +61,31 @@ private[spark] class ClusterScheduler(val sc: SparkContext) val mapOutputTracker = SparkEnv.get.mapOutputTracker - var taskSetQueuesManager: TaskSetQueuesManager = null + var schedulableBuilder: SchedulableBuilder = null + var rootPool: Pool = null override def setListener(listener: TaskSchedulerListener) { this.listener = listener } - def initialize(context: SchedulerBackend, taskSetQueuesManager: TaskSetQueuesManager) { + def initialize(context: SchedulerBackend) { backend = context - this.taskSetQueuesManager = taskSetQueuesManager + //default scheduler is FIFO + val schedulingMode = System.getProperty("spark.cluster.schedulingmode", "FIFO") + //temporarily set rootPool name to empty + rootPool = new Pool("", SchedulingMode.withName(schedulingMode), 0, 0) + schedulableBuilder = { + schedulingMode match { + case "FIFO" => + new FIFOSchedulableBuilder(rootPool) + case "FAIR" => + new FairSchedulableBuilder(rootPool) + } + } + schedulableBuilder.buildPools() } + def newTaskId(): Long = nextTaskId.getAndIncrement() override def start() { @@ -101,7 +115,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) this.synchronized { val manager = new TaskSetManager(this, taskSet) activeTaskSets(taskSet.id) = manager - taskSetQueuesManager.addTaskSetManager(manager) + schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) taskSetTaskIds(taskSet.id) = new HashSet[Long]() if 
(hasReceivedTask == false) { @@ -124,26 +138,21 @@ private[spark] class ClusterScheduler(val sc: SparkContext) def taskSetFinished(manager: TaskSetManager) { this.synchronized { activeTaskSets -= manager.taskSet.id - taskSetQueuesManager.removeTaskSetManager(manager) + manager.parent.removeSchedulable(manager) + logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id, manager.parent.name)) taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id) taskIdToExecutorId --= taskSetTaskIds(manager.taskSet.id) taskSetTaskIds.remove(manager.taskSet.id) } } - def taskFinished(manager: TaskSetManager) { - this.synchronized { - taskSetQueuesManager.taskFinished(manager) - } - } - /** * Called by cluster manager to offer resources on slaves. We respond by asking our active task * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so * that tasks are balanced across the cluster. */ def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = { - synchronized { + synchronized { SparkEnv.set(sc.env) // Mark each slave as alive and remember its hostname for (o <- offers) { @@ -155,27 +164,27 @@ private[spark] class ClusterScheduler(val sc: SparkContext) // Build a list of tasks to assign to each slave val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores)) val availableCpus = offers.map(o => o.cores).toArray - for (i <- 0 until offers.size){ - var launchedTask = true - val execId = offers(i).executorId - val host = offers(i).hostname - while (availableCpus(i) > 0 && launchedTask){ - launchedTask = false - taskSetQueuesManager.receiveOffer(execId,host,availableCpus(i)) match { - case Some(task) => - tasks(i) += task - val tid = task.taskId - taskIdToTaskSetId(tid) = task.taskSetId - taskSetTaskIds(task.taskSetId) += tid - taskIdToExecutorId(tid) = execId - activeExecutorIds += execId - executorsByHost(host) += execId - availableCpus(i) -= 1 - launchedTask = true - - case None => {} - } + for (i <- 0 until offers.size) { + var launchedTask = true + val execId = offers(i).executorId + val host = offers(i).hostname + while (availableCpus(i) > 0 && launchedTask) { + launchedTask = false + rootPool.receiveOffer(execId,host,availableCpus(i)) match { + case Some(task) => + tasks(i) += task + val tid = task.taskId + taskIdToTaskSetId(tid) = task.taskSetId + taskSetTaskIds(task.taskSetId) += tid + taskIdToExecutorId(tid) = execId + activeExecutorIds += execId + executorsByHost(host) += execId + availableCpus(i) -= 1 + launchedTask = true + + case None => {} } + } } if (tasks.size > 0) { hasLaunchedTask = true @@ -271,7 +280,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) def checkSpeculatableTasks() { var shouldRevive = false synchronized { - shouldRevive = taskSetQueuesManager.checkSpeculatableTasks() + shouldRevive = rootPool.checkSpeculatableTasks() } if (shouldRevive) { backend.reviveOffers() @@ -314,6 +323,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) executorsByHost -= host } executorIdToHost -= executorId - taskSetQueuesManager.removeExecutor(executorId, host) + rootPool.executorLost(executorId, host) } } diff --git a/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala deleted file mode 100644 index 62d31303414de7f718a3bb510cf52826d47f3389..0000000000000000000000000000000000000000 --- a/core/src/main/scala/spark/scheduler/cluster/FIFOTaskSetQueuesManager.scala +++ /dev/null @@ -1,49 +0,0 @@ -package 
spark.scheduler.cluster - -import scala.collection.mutable.ArrayBuffer - -import spark.Logging - -/** - * A FIFO Implementation of the TaskSetQueuesManager - */ -private[spark] class FIFOTaskSetQueuesManager extends TaskSetQueuesManager with Logging { - - var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager] - val tasksetSchedulingAlgorithm = new FIFOSchedulingAlgorithm() - - override def addTaskSetManager(manager: TaskSetManager) { - activeTaskSetsQueue += manager - } - - override def removeTaskSetManager(manager: TaskSetManager) { - activeTaskSetsQueue -= manager - } - - override def taskFinished(manager: TaskSetManager) { - //do nothing - } - - override def removeExecutor(executorId: String, host: String) { - activeTaskSetsQueue.foreach(_.executorLost(executorId, host)) - } - - override def receiveOffer(execId:String, host:String,avaiableCpus:Double):Option[TaskDescription] = { - - for (manager <- activeTaskSetsQueue.sortWith(tasksetSchedulingAlgorithm.comparator)) { - val task = manager.slaveOffer(execId,host,avaiableCpus) - if (task != None) { - return task - } - } - return None - } - - override def checkSpeculatableTasks(): Boolean = { - var shouldRevive = false - for (ts <- activeTaskSetsQueue) { - shouldRevive |= ts.checkSpeculatableTasks() - } - return shouldRevive - } -} diff --git a/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala b/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala deleted file mode 100644 index 89b74fbb4731dd7d0f1e0bede31d0413272f5053..0000000000000000000000000000000000000000 --- a/core/src/main/scala/spark/scheduler/cluster/FairTaskSetQueuesManager.scala +++ /dev/null @@ -1,157 +0,0 @@ -package spark.scheduler.cluster - -import java.io.{File, FileInputStream, FileOutputStream} - -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashMap -import scala.collection.mutable.HashSet -import scala.util.control.Breaks._ -import scala.xml._ - -import spark.Logging -import spark.scheduler.cluster.SchedulingMode.SchedulingMode - -/** - * A Fair Implementation of the TaskSetQueuesManager - * - * Currently we support minShare,weight for fair scheduler between pools - * Within a pool, it supports FIFO or FS - * Also, currently we could allocate pools dynamically - */ -private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with Logging { - - val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified") - val poolNameToPool= new HashMap[String, Pool] - var pools = new ArrayBuffer[Pool] - val poolScheduleAlgorithm = new FairSchedulingAlgorithm() - val POOL_FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool" - val POOL_DEFAULT_POOL_NAME = "default" - val POOL_MINIMUM_SHARES_PROPERTY = "minShares" - val POOL_SCHEDULING_MODE_PROPERTY = "schedulingMode" - val POOL_WEIGHT_PROPERTY = "weight" - val POOL_POOL_NAME_PROPERTY = "@name" - val POOL_POOLS_PROPERTY = "pool" - val POOL_DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO - val POOL_DEFAULT_MINIMUM_SHARES = 2 - val POOL_DEFAULT_WEIGHT = 1 - - loadPoolProperties() - - override def addTaskSetManager(manager: TaskSetManager) { - var poolName = POOL_DEFAULT_POOL_NAME - if (manager.taskSet.properties != null) { - poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME) - if (!poolNameToPool.contains(poolName)) { - //we will create a new pool that user has configured in app instead of being defined in 
xml file - val pool = new Pool(poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT) - pools += pool - poolNameToPool(poolName) = pool - logInfo("Create pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format( - poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)) - } - } - poolNameToPool(poolName).addTaskSetManager(manager) - logInfo("Added task set " + manager.taskSet.id + " tasks to pool "+poolName) - } - - override def removeTaskSetManager(manager: TaskSetManager) { - var poolName = POOL_DEFAULT_POOL_NAME - if (manager.taskSet.properties != null) { - poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME) - } - logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id,poolName)) - val pool = poolNameToPool(poolName) - pool.removeTaskSetManager(manager) - pool.runningTasks -= manager.runningTasks - } - - override def taskFinished(manager: TaskSetManager) { - var poolName = POOL_DEFAULT_POOL_NAME - if (manager.taskSet.properties != null) { - poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME) - } - val pool = poolNameToPool(poolName) - pool.runningTasks -= 1 - manager.runningTasks -=1 - } - - override def removeExecutor(executorId: String, host: String) { - for (pool <- pools) { - pool.removeExecutor(executorId,host) - } - } - - override def receiveOffer(execId: String,host:String,avaiableCpus:Double):Option[TaskDescription] = { - val sortedPools = pools.sortWith(poolScheduleAlgorithm.comparator) - for (pool <- sortedPools) { - logDebug("poolName:%s,tasksetNum:%d,minShares:%d,runningTasks:%d".format( - pool.poolName,pool.activeTaskSetsQueue.length,pool.minShare,pool.runningTasks)) - } - for (pool <- sortedPools) { - val task = pool.receiveOffer(execId,host,avaiableCpus) - if(task != None) { - pool.runningTasks += 1 - return task - } - } - return None - } - - override def checkSpeculatableTasks(): Boolean = { - var shouldRevive = false - for (pool <- pools) { - shouldRevive |= pool.checkSpeculatableTasks() - } - return shouldRevive - } - - def loadPoolProperties() { - //first check if the file exists - val file = new File(schedulerAllocFile) - if (file.exists()) { - val xml = XML.loadFile(file) - for (poolNode <- (xml \\ POOL_POOLS_PROPERTY)) { - - val poolName = (poolNode \ POOL_POOL_NAME_PROPERTY).text - var schedulingMode = POOL_DEFAULT_SCHEDULING_MODE - var minShares = POOL_DEFAULT_MINIMUM_SHARES - var weight = POOL_DEFAULT_WEIGHT - - val xmlSchedulingMode = (poolNode \ POOL_SCHEDULING_MODE_PROPERTY).text - if (xmlSchedulingMode != "") { - try{ - schedulingMode = SchedulingMode.withName(xmlSchedulingMode) - } - catch{ - case e:Exception => logInfo("Error xml schedulingMode, using default schedulingMode") - } - } - - val xmlMinShares = (poolNode \ POOL_MINIMUM_SHARES_PROPERTY).text - if (xmlMinShares != "") { - minShares = xmlMinShares.toInt - } - - val xmlWeight = (poolNode \ POOL_WEIGHT_PROPERTY).text - if (xmlWeight != "") { - weight = xmlWeight.toInt - } - - val pool = new Pool(poolName,schedulingMode,minShares,weight) - pools += pool - poolNameToPool(poolName) = pool - logInfo("Create new pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format( - poolName,schedulingMode,minShares,weight)) - } - } - - if (!poolNameToPool.contains(POOL_DEFAULT_POOL_NAME)) { - val pool = new Pool(POOL_DEFAULT_POOL_NAME, POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT) - 
pools += pool - poolNameToPool(POOL_DEFAULT_POOL_NAME) = pool - logInfo("Create default pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format( - POOL_DEFAULT_POOL_NAME,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)) - } - } - } diff --git a/core/src/main/scala/spark/scheduler/cluster/Pool.scala b/core/src/main/scala/spark/scheduler/cluster/Pool.scala index e0917ca1ca21b190461ddf5400042e73a22d5d27..d5482f71add04fa2acd405ccb642fcddc26a1e08 100644 --- a/core/src/main/scala/spark/scheduler/cluster/Pool.scala +++ b/core/src/main/scala/spark/scheduler/cluster/Pool.scala @@ -1,74 +1,106 @@ package spark.scheduler.cluster import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashMap import spark.Logging import spark.scheduler.cluster.SchedulingMode.SchedulingMode + /** - * An Schedulable entity that represent collection of TaskSetManager + * An Schedulable entity that represent collection of Pools or TaskSetManagers */ + private[spark] class Pool( val poolName: String, val schedulingMode: SchedulingMode, - initMinShare:Int, - initWeight:Int) + initMinShare: Int, + initWeight: Int) extends Schedulable with Logging { - var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager] + var schedulableQueue = new ArrayBuffer[Schedulable] + var schedulableNameToSchedulable = new HashMap[String, Schedulable] var weight = initWeight var minShare = initMinShare var runningTasks = 0 - val priority = 0 - val stageId = 0 + var priority = 0 + var stageId = 0 + var name = poolName + var parent:Schedulable = null var taskSetSchedulingAlgorithm: SchedulingAlgorithm = { - schedulingMode match { + schedulingMode match { case SchedulingMode.FAIR => - val schedule = new FairSchedulingAlgorithm() - schedule + new FairSchedulingAlgorithm() case SchedulingMode.FIFO => - val schedule = new FIFOSchedulingAlgorithm() - schedule + new FIFOSchedulingAlgorithm() } } - def addTaskSetManager(manager:TaskSetManager) { - activeTaskSetsQueue += manager + override def addSchedulable(schedulable: Schedulable) { + schedulableQueue += schedulable + schedulableNameToSchedulable(schedulable.name) = schedulable + schedulable.parent= this } - def removeTaskSetManager(manager:TaskSetManager) { - activeTaskSetsQueue -= manager + override def removeSchedulable(schedulable: Schedulable) { + schedulableQueue -= schedulable + schedulableNameToSchedulable -= schedulable.name } - def removeExecutor(executorId: String, host: String) { - activeTaskSetsQueue.foreach(_.executorLost(executorId,host)) + override def getSchedulableByName(schedulableName: String): Schedulable = { + if (schedulableNameToSchedulable.contains(schedulableName)) { + return schedulableNameToSchedulable(schedulableName) + } + for (schedulable <- schedulableQueue) { + var sched = schedulable.getSchedulableByName(schedulableName) + if (sched != null) { + return sched + } + } + return null } - def checkSpeculatableTasks(): Boolean = { + override def executorLost(executorId: String, host: String) { + schedulableQueue.foreach(_.executorLost(executorId, host)) + } + + override def checkSpeculatableTasks(): Boolean = { var shouldRevive = false - for (ts <- activeTaskSetsQueue) { - shouldRevive |= ts.checkSpeculatableTasks() + for (schedulable <- schedulableQueue) { + shouldRevive |= schedulable.checkSpeculatableTasks() } return shouldRevive } - def receiveOffer(execId:String,host:String,availableCpus:Double):Option[TaskDescription] = { - val sortedActiveTasksSetQueue = 
activeTaskSetsQueue.sortWith(taskSetSchedulingAlgorithm.comparator) - for (manager <- sortedActiveTasksSetQueue) { - logDebug("poolname:%s,taskSetId:%s,taskNum:%d,minShares:%d,weight:%d,runningTasks:%d".format( - poolName,manager.taskSet.id,manager.numTasks,manager.minShare,manager.weight,manager.runningTasks)) + override def receiveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = { + val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator) + for (manager <- sortedSchedulableQueue) { + logInfo("parentName:%s,schedulableName:%s,minShares:%d,weight:%d,runningTasks:%d".format( + manager.parent.name, manager.name, manager.minShare, manager.weight, manager.runningTasks)) } - - for (manager <- sortedActiveTasksSetQueue) { - val task = manager.slaveOffer(execId,host,availableCpus) - if (task != None) { - manager.runningTasks += 1 - return task - } + for (manager <- sortedSchedulableQueue) { + val task = manager.receiveOffer(execId, host, availableCpus) + if (task != None) { + return task + } } return None } + + override def increaseRunningTasks(taskNum: Int) { + runningTasks += taskNum + if (parent != null) { + parent.increaseRunningTasks(taskNum) + } + } + + override def decreaseRunningTasks(taskNum: Int) { + runningTasks -= taskNum + if (parent != null) { + parent.decreaseRunningTasks(taskNum) + } + } } diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala index 8dfc369c035bed29d6adfc898e085fe2a94ad91a..54e8ae95f9f37338c3e209ba2bd8502a3cb50e91 100644 --- a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala +++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala @@ -1,13 +1,26 @@ package spark.scheduler.cluster +import scala.collection.mutable.ArrayBuffer + /** * An interface for schedulable entities. 
* there are two type of Schedulable entities(Pools and TaskSetManagers) */ private[spark] trait Schedulable { - def weight:Int - def minShare:Int - def runningTasks:Int - def priority:Int - def stageId:Int + var parent: Schedulable + def weight: Int + def minShare: Int + def runningTasks: Int + def priority: Int + def stageId: Int + def name: String + + def increaseRunningTasks(taskNum: Int): Unit + def decreaseRunningTasks(taskNum: Int): Unit + def addSchedulable(schedulable: Schedulable): Unit + def removeSchedulable(schedulable: Schedulable): Unit + def getSchedulableByName(name: String): Schedulable + def executorLost(executorId: String, host: String): Unit + def receiveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription] + def checkSpeculatableTasks(): Boolean } diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala new file mode 100644 index 0000000000000000000000000000000000000000..47a426a45b8bc25bb3492b5ad8c0e2dbf36a0b16 --- /dev/null +++ b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala @@ -0,0 +1,115 @@ +package spark.scheduler.cluster + +import java.io.{File, FileInputStream, FileOutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashMap +import scala.collection.mutable.HashSet +import scala.util.control.Breaks._ +import scala.xml._ + +import spark.Logging +import spark.scheduler.cluster.SchedulingMode.SchedulingMode + +import java.util.Properties + +/** + * An interface to build Schedulable tree + * buildPools: build the tree nodes(pools) + * addTaskSetManager: build the leaf nodes(TaskSetManagers) + */ +private[spark] trait SchedulableBuilder { + def buildPools() + def addTaskSetManager(manager: Schedulable, properties: Properties) +} + +private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging { + + override def buildPools() { + //nothing + } + + override def addTaskSetManager(manager: Schedulable, properties: Properties) { + rootPool.addSchedulable(manager) + } +} + +private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging { + + val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified") + val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool" + val DEFAULT_POOL_NAME = "default" + val MINIMUM_SHARES_PROPERTY = "minShare" + val SCHEDULING_MODE_PROPERTY = "schedulingMode" + val WEIGHT_PROPERTY = "weight" + val POOL_NAME_PROPERTY = "@name" + val POOLS_PROPERTY = "pool" + val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO + val DEFAULT_MINIMUM_SHARE = 2 + val DEFAULT_WEIGHT = 1 + + override def buildPools() { + val file = new File(schedulerAllocFile) + if (file.exists()) { + val xml = XML.loadFile(file) + for (poolNode <- (xml \\ POOLS_PROPERTY)) { + + val poolName = (poolNode \ POOL_NAME_PROPERTY).text + var schedulingMode = DEFAULT_SCHEDULING_MODE + var minShare = DEFAULT_MINIMUM_SHARE + var weight = DEFAULT_WEIGHT + + val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text + if (xmlSchedulingMode != "") { + try { + schedulingMode = SchedulingMode.withName(xmlSchedulingMode) + } catch { + case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode") + } + } + + val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text + if (xmlMinShare != "") { + minShare = 
xmlMinShare.toInt + } + + val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text + if (xmlWeight != "") { + weight = xmlWeight.toInt + } + + val pool = new Pool(poolName, schedulingMode, minShare, weight) + rootPool.addSchedulable(pool) + logInfo("Create new pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format( + poolName, schedulingMode, minShare, weight)) + } + } + + //finally create "default" pool + if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) { + val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) + rootPool.addSchedulable(pool) + logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format( + DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) + } +} + + override def addTaskSetManager(manager: Schedulable, properties: Properties) { + var poolName = DEFAULT_POOL_NAME + var parentPool = rootPool.getSchedulableByName(poolName) + if (properties != null) { + poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME) + parentPool = rootPool.getSchedulableByName(poolName) + if (parentPool == null) { + //we will create a new pool that user has configured in app instead of being defined in xml file + parentPool = new Pool(poolName,DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) + rootPool.addSchedulable(parentPool) + logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format( + poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) + } + } + parentPool.addSchedulable(manager) + logInfo("Added task set " + manager.name + " tasks to pool "+poolName) + } +} diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala index ac2237a7ef1ff6bed32de859b8b195f9bbb538c6..a5d6285c993c407574007a92e9f03038195f85b1 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala @@ -2,11 +2,11 @@ package spark.scheduler.cluster /** * An interface for sort algorithm - * FIFO: FIFO algorithm for TaskSetManagers - * FS: FS algorithm for Pools, and FIFO or FS for TaskSetManagers + * FIFO: FIFO algorithm between TaskSetManagers + * FS: FS algorithm between Pools, and FIFO or FS within Pools */ private[spark] trait SchedulingAlgorithm { - def comparator(s1: Schedulable,s2: Schedulable): Boolean + def comparator(s1: Schedulable, s2: Schedulable): Boolean } private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm { @@ -15,40 +15,41 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm { val priority2 = s2.priority var res = Math.signum(priority1 - priority2) if (res == 0) { - val stageId1 = s1.stageId - val stageId2 = s2.stageId - res = Math.signum(stageId1 - stageId2) + val stageId1 = s1.stageId + val stageId2 = s2.stageId + res = Math.signum(stageId1 - stageId2) } - if (res < 0) + if (res < 0) { return true - else + } else { return false + } } } private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { - def comparator(s1: Schedulable, s2:Schedulable): Boolean = { + override def comparator(s1: Schedulable, s2: Schedulable): Boolean = { val minShare1 = s1.minShare val minShare2 = s2.minShare val runningTasks1 = s1.runningTasks val runningTasks2 = s2.runningTasks val s1Needy = runningTasks1 < minShare1 val s2Needy = runningTasks2 < minShare2 - val minShareRatio1 = 
runningTasks1.toDouble / Math.max(minShare1,1.0).toDouble - val minShareRatio2 = runningTasks2.toDouble / Math.max(minShare2,1.0).toDouble + val minShareRatio1 = runningTasks1.toDouble / Math.max(minShare1, 1.0).toDouble + val minShareRatio2 = runningTasks2.toDouble / Math.max(minShare2, 1.0).toDouble val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble var res:Boolean = true - if (s1Needy && !s2Needy) + if (s1Needy && !s2Needy) { res = true - else if(!s1Needy && s2Needy) + } else if (!s1Needy && s2Needy) { res = false - else if (s1Needy && s2Needy) + } else if (s1Needy && s2Needy) { res = minShareRatio1 <= minShareRatio2 - else + } else { res = taskToWeightRatio1 <= taskToWeightRatio2 - + } return res } } diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index 7ec2f69da559a3d960a331a5a0a990216fb4a814..baaaa41a37f6bedf4bd8ab5d61f05e6b07189148 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -32,8 +32,6 @@ private[spark] class TaskSetManager( // Maximum times a task is allowed to fail before failing the job val MAX_TASK_FAILURES = 4 - val TASKSET_MINIMUM_SHARES = 1 - val TASKSET_WEIGHT = 1 // Quantile of tasks at which to start speculation val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble @@ -41,12 +39,6 @@ private[spark] class TaskSetManager( // Serializer for closures and tasks. val ser = SparkEnv.get.closureSerializer.newInstance() - var weight = TASKSET_WEIGHT - var minShare = TASKSET_MINIMUM_SHARES - var runningTasks = 0 - val priority = taskSet.priority - val stageId = taskSet.stageId - val tasks = taskSet.tasks val numTasks = tasks.length val copiesRunning = new Array[Int](numTasks) @@ -55,6 +47,14 @@ private[spark] class TaskSetManager( val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil) var tasksFinished = 0 + var weight = 1 + var minShare = 0 + var runningTasks = 0 + var priority = taskSet.priority + var stageId = taskSet.stageId + var name = "TaskSet_"+taskSet.stageId.toString + var parent:Schedulable = null + // Last time when we launched a preferred task (for delay scheduling) var lastPreferredLaunchTime = System.currentTimeMillis @@ -198,7 +198,7 @@ private[spark] class TaskSetManager( } // Respond to an offer of a single slave from the scheduler by finding a task - def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = { + override def receiveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = { if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) { val time = System.currentTimeMillis val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT) @@ -230,10 +230,11 @@ private[spark] class TaskSetManager( val serializedTask = Task.serializeWithDependencies( task, sched.sc.addedFiles, sched.sc.addedJars, ser) val timeTaken = System.currentTimeMillis - startTime + increaseRunningTasks(1) logInfo("Serialized task %s:%d as %d bytes in %d ms".format( taskSet.id, index, serializedTask.limit, timeTaken)) val taskName = "task %s:%d".format(taskSet.id, index) - return Some(new TaskDescription(taskId,taskSet.id,execId, taskName, serializedTask)) + return Some(new 
TaskDescription(taskId, taskSet.id, execId, taskName, serializedTask)) } case _ => } @@ -264,7 +265,7 @@ private[spark] class TaskSetManager( } val index = info.index info.markSuccessful() - sched.taskFinished(this) + decreaseRunningTasks(1) if (!finished(index)) { tasksFinished += 1 logInfo("Finished TID %s in %d ms (progress: %d/%d)".format( @@ -293,7 +294,7 @@ private[spark] class TaskSetManager( } val index = info.index info.markFailed() - sched.taskFinished(this) + decreaseRunningTasks(1) if (!finished(index)) { logInfo("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index)) copiesRunning(index) -= 1 @@ -308,6 +309,7 @@ private[spark] class TaskSetManager( finished(index) = true tasksFinished += 1 sched.taskSetFinished(this) + decreaseRunningTasks(runningTasks) return case ef: ExceptionFailure => @@ -365,10 +367,38 @@ private[spark] class TaskSetManager( causeOfFailure = message // TODO: Kill running tasks if we were not terminated due to a Mesos error sched.listener.taskSetFailed(taskSet, message) + decreaseRunningTasks(runningTasks) sched.taskSetFinished(this) } - def executorLost(execId: String, hostname: String) { + override def increaseRunningTasks(taskNum: Int) { + runningTasks += taskNum + if (parent != null) { + parent.increaseRunningTasks(taskNum) + } + } + + override def decreaseRunningTasks(taskNum: Int) { + runningTasks -= taskNum + if (parent != null) { + parent.decreaseRunningTasks(taskNum) + } + } + + //TODO: for now we just find Pool not TaskSetManager, we can extend this function in future if needed + override def getSchedulableByName(name: String): Schedulable = { + return null + } + + override def addSchedulable(schedulable:Schedulable) { + //nothing + } + + override def removeSchedulable(schedulable:Schedulable) { + //nothing + } + + override def executorLost(execId: String, hostname: String) { logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id) val newHostsAlive = sched.hostsAlive // If some task has preferred locations only on hostname, and there are no more executors there, @@ -409,7 +439,7 @@ private[spark] class TaskSetManager( * TODO: To make this scale to large jobs, we need to maintain a list of running tasks, so that * we don't scan the whole task set. It might also help to make this sorted by launch time. */ - def checkSpeculatableTasks(): Boolean = { + override def checkSpeculatableTasks(): Boolean = { // Can't speculate if we only have one task, or if all tasks have finished. 
if (numTasks == 1 || tasksFinished == numTasks) { return false diff --git a/core/src/test/resources/fairscheduler.xml b/core/src/test/resources/fairscheduler.xml new file mode 100644 index 0000000000000000000000000000000000000000..5a688b0ebb973105fa603f924fe819eecda8ba46 --- /dev/null +++ b/core/src/test/resources/fairscheduler.xml @@ -0,0 +1,14 @@ +<allocations> +<pool name="1"> + <minShare>2</minShare> + <weight>1</weight> + <schedulingMode>FIFO</schedulingMode> +</pool> +<pool name="2"> + <minShare>3</minShare> + <weight>1</weight> + <schedulingMode>FIFO</schedulingMode> +</pool> +<pool name="3"> +</pool> +</allocations> diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..2eda48196be74470d37d3c057876e3f068f9460d --- /dev/null +++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala @@ -0,0 +1,207 @@ +package spark.scheduler + +import org.scalatest.FunSuite +import org.scalatest.BeforeAndAfter + +import spark._ +import spark.scheduler._ +import spark.scheduler.cluster._ + +import java.util.Properties + +class DummyTaskSetManager( + initPriority: Int, + initStageId: Int, + initNumTasks: Int) + extends Schedulable { + + var parent: Schedulable = null + var weight = 1 + var minShare = 2 + var runningTasks = 0 + var priority = initPriority + var stageId = initStageId + var name = "TaskSet_"+stageId + var numTasks = initNumTasks + var tasksFinished = 0 + + def increaseRunningTasks(taskNum: Int) { + runningTasks += taskNum + if (parent != null) { + parent.increaseRunningTasks(taskNum) + } + } + + def decreaseRunningTasks(taskNum: Int) { + runningTasks -= taskNum + if (parent != null) { + parent.decreaseRunningTasks(taskNum) + } + } + + def addSchedulable(schedulable: Schedulable) { + } + + def removeSchedulable(schedulable: Schedulable) { + } + + def getSchedulableByName(name: String): Schedulable = { + return null + } + + def executorLost(executorId: String, host: String): Unit = { + } + + def receiveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription] = { + if (tasksFinished + runningTasks < numTasks) { + increaseRunningTasks(1) + return Some(new TaskDescription(0, stageId.toString, execId, "task 0:0", null)) + } + return None + } + + def checkSpeculatableTasks(): Boolean = { + return true + } + + def taskFinished() { + decreaseRunningTasks(1) + tasksFinished +=1 + if (tasksFinished == numTasks) { + parent.removeSchedulable(this) + } + } + + def abort() { + decreaseRunningTasks(runningTasks) + parent.removeSchedulable(this) + } +} + +class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter { + + def receiveOffer(rootPool: Pool) : Option[TaskDescription] = { + rootPool.receiveOffer("execId_1", "hostname_1", 1) + } + + def checkTaskSetId(rootPool: Pool, expectedTaskSetId: Int) { + receiveOffer(rootPool) match { + case Some(task) => + assert(task.taskSetId.toInt === expectedTaskSetId) + case _ => + } + } + + test("FIFO Scheduler Test") { + val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0) + val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) + schedulableBuilder.buildPools() + + val taskSetManager0 = new DummyTaskSetManager(0, 0, 2) + val taskSetManager1 = new DummyTaskSetManager(0, 1, 2) + val taskSetManager2 = new DummyTaskSetManager(0, 2, 2) + schedulableBuilder.addTaskSetManager(taskSetManager0, null) + schedulableBuilder.addTaskSetManager(taskSetManager1, null) + 
schedulableBuilder.addTaskSetManager(taskSetManager2, null) + + checkTaskSetId(rootPool, 0) + receiveOffer(rootPool) + checkTaskSetId(rootPool, 1) + receiveOffer(rootPool) + taskSetManager1.abort() + checkTaskSetId(rootPool, 2) + } + + test("Fair Scheduler Test") { + val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() + System.setProperty("spark.fairscheduler.allocation.file", xmlPath) + val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) + val schedulableBuilder = new FairSchedulableBuilder(rootPool) + schedulableBuilder.buildPools() + + assert(rootPool.getSchedulableByName("default") != null) + assert(rootPool.getSchedulableByName("1") != null) + assert(rootPool.getSchedulableByName("2") != null) + assert(rootPool.getSchedulableByName("3") != null) + assert(rootPool.getSchedulableByName("1").minShare === 2) + assert(rootPool.getSchedulableByName("1").weight === 1) + assert(rootPool.getSchedulableByName("2").minShare === 3) + assert(rootPool.getSchedulableByName("2").weight === 1) + assert(rootPool.getSchedulableByName("3").minShare === 2) + assert(rootPool.getSchedulableByName("3").weight === 1) + + val properties1 = new Properties() + properties1.setProperty("spark.scheduler.cluster.fair.pool","1") + val properties2 = new Properties() + properties2.setProperty("spark.scheduler.cluster.fair.pool","2") + + val taskSetManager10 = new DummyTaskSetManager(1, 0, 1) + val taskSetManager11 = new DummyTaskSetManager(1, 1, 1) + val taskSetManager12 = new DummyTaskSetManager(1, 2, 2) + schedulableBuilder.addTaskSetManager(taskSetManager10, properties1) + schedulableBuilder.addTaskSetManager(taskSetManager11, properties1) + schedulableBuilder.addTaskSetManager(taskSetManager12, properties1) + + val taskSetManager23 = new DummyTaskSetManager(2, 3, 2) + val taskSetManager24 = new DummyTaskSetManager(2, 4, 2) + schedulableBuilder.addTaskSetManager(taskSetManager23, properties2) + schedulableBuilder.addTaskSetManager(taskSetManager24, properties2) + + checkTaskSetId(rootPool, 0) + checkTaskSetId(rootPool, 3) + checkTaskSetId(rootPool, 3) + checkTaskSetId(rootPool, 1) + checkTaskSetId(rootPool, 4) + checkTaskSetId(rootPool, 2) + checkTaskSetId(rootPool, 2) + checkTaskSetId(rootPool, 4) + + taskSetManager12.taskFinished() + assert(rootPool.getSchedulableByName("1").runningTasks === 3) + taskSetManager24.abort() + assert(rootPool.getSchedulableByName("2").runningTasks === 2) + } + + test("Nested Pool Test") { + val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) + val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1) + val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1) + rootPool.addSchedulable(pool0) + rootPool.addSchedulable(pool1) + + val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2) + val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1) + pool0.addSchedulable(pool00) + pool0.addSchedulable(pool01) + + val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2) + val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1) + pool1.addSchedulable(pool10) + pool1.addSchedulable(pool11) + + val taskSetManager000 = new DummyTaskSetManager(0, 0, 5) + val taskSetManager001 = new DummyTaskSetManager(0, 1, 5) + pool00.addSchedulable(taskSetManager000) + pool00.addSchedulable(taskSetManager001) + + val taskSetManager010 = new DummyTaskSetManager(1, 2, 5) + val taskSetManager011 = new DummyTaskSetManager(1, 3, 5) + pool01.addSchedulable(taskSetManager010) + pool01.addSchedulable(taskSetManager011) + + val taskSetManager100 = new DummyTaskSetManager(2, 4, 5) + val taskSetManager101 
= new DummyTaskSetManager(2, 5, 5) + pool10.addSchedulable(taskSetManager100) + pool10.addSchedulable(taskSetManager101) + + val taskSetManager110 = new DummyTaskSetManager(3, 6, 5) + val taskSetManager111 = new DummyTaskSetManager(3, 7, 5) + pool11.addSchedulable(taskSetManager110) + pool11.addSchedulable(taskSetManager111) + + checkTaskSetId(rootPool, 0) + checkTaskSetId(rootPool, 4) + checkTaskSetId(rootPool, 6) + checkTaskSetId(rootPool, 2) + } +}
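The new scheduling mode is chosen once, in ClusterScheduler.initialize, from the spark.cluster.schedulingmode system property (replacing the old spark.cluster.taskscheduler class name), so it has to be set before the SparkContext is constructed. A minimal sketch of enabling the fair scheduler with this patch applied; the property keys come from the diff, while the master URL, application name and file path are placeholders:

    // "FIFO" is the default when the property is unset.
    System.setProperty("spark.cluster.schedulingmode", "FAIR")
    // Read by FairSchedulableBuilder.buildPools; if the file does not exist,
    // only the built-in "default" pool is created.
    System.setProperty("spark.fairscheduler.allocation.file", "/path/to/fairscheduler.xml")

    val sc = new spark.SparkContext("spark://master:7077", "fair-scheduling-demo")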
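FairSchedulableBuilder.buildPools reads each <pool name="..."> element of the allocation file for minShare, weight and schedulingMode, falling back to FIFO, minShare 2 and weight 1 when a tag is missing (pool "3" in the test resource relies on exactly that), and always appends a "default" pool. A task set is routed to a pool through the spark.scheduler.cluster.fair.pool key of its Properties; a pool named there but absent from the XML is created on demand with the same defaults. A sketch of that flow, written as it would sit inside a test in the spark.scheduler package because these classes are private[spark]; the pool name "production" is purely illustrative:

    import java.util.Properties
    import spark.scheduler.cluster._

    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
    val builder = new FairSchedulableBuilder(rootPool)
    builder.buildPools()   // parses spark.fairscheduler.allocation.file, then adds "default"

    val props = new Properties()
    props.setProperty("spark.scheduler.cluster.fair.pool", "production")
    // builder.addTaskSetManager(manager, props) -> hangs the manager under pool "production",
    //                                              creating it with minShare=2, weight=1 if needed
    // builder.addTaskSetManager(manager, null)  -> falls back to the "default" pool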
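FIFOSchedulableBuilder keeps every TaskSetManager directly under the root pool, and FIFOSchedulingAlgorithm.comparator decides who runs first: a lower priority value (an earlier job) wins, with stageId as the tie-breaker. A standalone restatement of that ordering with hypothetical values, for illustration only:

    // Same ordering as FIFOSchedulingAlgorithm.comparator, without the Schedulable trait.
    def fifoLessThan(priority1: Int, stageId1: Int, priority2: Int, stageId2: Int): Boolean =
      if (priority1 != priority2) priority1 < priority2 else stageId1 < stageId2

    assert(fifoLessThan(0, 1, 0, 2))   // same priority: the earlier stage goes first
    assert(fifoLessThan(1, 9, 2, 0))   // an earlier job wins regardless of stage id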
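FairSchedulingAlgorithm first favors whichever schedulable is running fewer tasks than its minShare; when both are below their shares it compares runningTasks/minShare, and otherwise runningTasks/weight, so weights only matter once minimum shares are satisfied. With hypothetical numbers, again as it would appear in a test inside the spark.scheduler package:

    import spark.scheduler.cluster._

    val fair = new FairSchedulingAlgorithm()
    val a = new Pool("a", SchedulingMode.FIFO, /* minShare */ 2, /* weight */ 1)
    val b = new Pool("b", SchedulingMode.FIFO, /* minShare */ 3, /* weight */ 1)

    a.runningTasks = 1   // 1 < 2: still below its minimum share
    b.runningTasks = 4   // 4 > 3: already past its share
    assert(fair.comparator(a, b))   // the needy pool is offered resources first

    b.runningTasks = 2   // both needy now: compare runningTasks/minShare, 0.5 vs ~0.67
    assert(fair.comparator(a, b))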
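The old TaskSetQueuesManager.taskFinished callback is gone; instead TaskSetManager calls increaseRunningTasks/decreaseRunningTasks, and every Schedulable forwards those calls to its parent, so the counters the fair comparator reads stay consistent at every level of the tree. A small sketch (same private[spark] caveat as above):

    import spark.scheduler.cluster._

    val root  = new Pool("", SchedulingMode.FAIR, 0, 0)
    val child = new Pool("child", SchedulingMode.FIFO, 2, 1)
    root.addSchedulable(child)       // also wires child.parent = root

    child.increaseRunningTasks(1)    // e.g. a TaskSetManager under "child" launched a task
    assert(child.runningTasks == 1 && root.runningTasks == 1)

    child.decreaseRunningTasks(1)    // the task finished, or its task set was aborted
    assert(child.runningTasks == 0 && root.runningTasks == 0)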
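resourceOffers now hands each free core to rootPool.receiveOffer: a Pool sorts its children with its own SchedulingAlgorithm and recurses, while a TaskSetManager (whose receiveOffer is the former slaveOffer) either returns a TaskDescription or None so the next sibling can try. Continuing from the tree built in the previous sketch, with an illustrative executor id and hostname:

    root.receiveOffer("exec-1", "host-1", /* availableCpus */ 1.0) match {
      case Some(task) => println("would launch a task from task set " + task.taskSetId)
      case None       => println("no runnable task under this pool for the offered core")
    }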