Skip to content
Snippets Groups Projects
Commit e0603d7e authored by Andrew xia's avatar Andrew xia
Browse files

refactor the Schedulable interface and add unit test for SchedulingAlgorithm

parent 2f883c51
No related branches found
No related tags found
No related merge requests found
Showing
with 525 additions and 316 deletions
...@@ -146,9 +146,7 @@ class SparkContext( ...@@ -146,9 +146,7 @@ class SparkContext(
case SPARK_REGEX(sparkUrl) => case SPARK_REGEX(sparkUrl) =>
val scheduler = new ClusterScheduler(this) val scheduler = new ClusterScheduler(this)
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName) val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")). scheduler.initialize(backend)
newInstance().asInstanceOf[TaskSetQueuesManager]
scheduler.initialize(backend, taskSetQueuesManager)
scheduler scheduler
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
...@@ -167,9 +165,7 @@ class SparkContext( ...@@ -167,9 +165,7 @@ class SparkContext(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt) numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
val sparkUrl = localCluster.start() val sparkUrl = localCluster.start()
val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName) val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")). scheduler.initialize(backend)
newInstance().asInstanceOf[TaskSetQueuesManager]
scheduler.initialize(backend, taskSetQueuesManager)
backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => { backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
localCluster.stop() localCluster.stop()
} }
...@@ -188,9 +184,7 @@ class SparkContext( ...@@ -188,9 +184,7 @@ class SparkContext(
} else { } else {
new MesosSchedulerBackend(scheduler, this, masterWithoutProtocol, appName) new MesosSchedulerBackend(scheduler, this, masterWithoutProtocol, appName)
} }
val taskSetQueuesManager = Class.forName(System.getProperty("spark.cluster.taskscheduler")). scheduler.initialize(backend)
newInstance().asInstanceOf[TaskSetQueuesManager]
scheduler.initialize(backend, taskSetQueuesManager)
scheduler scheduler
} }
} }
......
...@@ -61,17 +61,31 @@ private[spark] class ClusterScheduler(val sc: SparkContext) ...@@ -61,17 +61,31 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val mapOutputTracker = SparkEnv.get.mapOutputTracker val mapOutputTracker = SparkEnv.get.mapOutputTracker
var taskSetQueuesManager: TaskSetQueuesManager = null var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
override def setListener(listener: TaskSchedulerListener) { override def setListener(listener: TaskSchedulerListener) {
this.listener = listener this.listener = listener
} }
def initialize(context: SchedulerBackend, taskSetQueuesManager: TaskSetQueuesManager) { def initialize(context: SchedulerBackend) {
backend = context backend = context
this.taskSetQueuesManager = taskSetQueuesManager //default scheduler is FIFO
val schedulingMode = System.getProperty("spark.cluster.schedulingmode", "FIFO")
//temporarily set rootPool name to empty
rootPool = new Pool("", SchedulingMode.withName(schedulingMode), 0, 0)
schedulableBuilder = {
schedulingMode match {
case "FIFO" =>
new FIFOSchedulableBuilder(rootPool)
case "FAIR" =>
new FairSchedulableBuilder(rootPool)
}
}
schedulableBuilder.buildPools()
} }
def newTaskId(): Long = nextTaskId.getAndIncrement() def newTaskId(): Long = nextTaskId.getAndIncrement()
override def start() { override def start() {
...@@ -101,7 +115,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) ...@@ -101,7 +115,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
this.synchronized { this.synchronized {
val manager = new TaskSetManager(this, taskSet) val manager = new TaskSetManager(this, taskSet)
activeTaskSets(taskSet.id) = manager activeTaskSets(taskSet.id) = manager
taskSetQueuesManager.addTaskSetManager(manager) schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
taskSetTaskIds(taskSet.id) = new HashSet[Long]() taskSetTaskIds(taskSet.id) = new HashSet[Long]()
if (hasReceivedTask == false) { if (hasReceivedTask == false) {
...@@ -124,26 +138,21 @@ private[spark] class ClusterScheduler(val sc: SparkContext) ...@@ -124,26 +138,21 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def taskSetFinished(manager: TaskSetManager) { def taskSetFinished(manager: TaskSetManager) {
this.synchronized { this.synchronized {
activeTaskSets -= manager.taskSet.id activeTaskSets -= manager.taskSet.id
taskSetQueuesManager.removeTaskSetManager(manager) manager.parent.removeSchedulable(manager)
logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id, manager.parent.name))
taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id) taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
taskIdToExecutorId --= taskSetTaskIds(manager.taskSet.id) taskIdToExecutorId --= taskSetTaskIds(manager.taskSet.id)
taskSetTaskIds.remove(manager.taskSet.id) taskSetTaskIds.remove(manager.taskSet.id)
} }
} }
def taskFinished(manager: TaskSetManager) {
this.synchronized {
taskSetQueuesManager.taskFinished(manager)
}
}
/** /**
* Called by cluster manager to offer resources on slaves. We respond by asking our active task * Called by cluster manager to offer resources on slaves. We respond by asking our active task
* sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
* that tasks are balanced across the cluster. * that tasks are balanced across the cluster.
*/ */
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = { def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = {
synchronized { synchronized {
SparkEnv.set(sc.env) SparkEnv.set(sc.env)
// Mark each slave as alive and remember its hostname // Mark each slave as alive and remember its hostname
for (o <- offers) { for (o <- offers) {
...@@ -155,27 +164,27 @@ private[spark] class ClusterScheduler(val sc: SparkContext) ...@@ -155,27 +164,27 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// Build a list of tasks to assign to each slave // Build a list of tasks to assign to each slave
val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores)) val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = offers.map(o => o.cores).toArray val availableCpus = offers.map(o => o.cores).toArray
for (i <- 0 until offers.size){ for (i <- 0 until offers.size) {
var launchedTask = true var launchedTask = true
val execId = offers(i).executorId val execId = offers(i).executorId
val host = offers(i).hostname val host = offers(i).hostname
while (availableCpus(i) > 0 && launchedTask){ while (availableCpus(i) > 0 && launchedTask) {
launchedTask = false launchedTask = false
taskSetQueuesManager.receiveOffer(execId,host,availableCpus(i)) match { rootPool.receiveOffer(execId,host,availableCpus(i)) match {
case Some(task) => case Some(task) =>
tasks(i) += task tasks(i) += task
val tid = task.taskId val tid = task.taskId
taskIdToTaskSetId(tid) = task.taskSetId taskIdToTaskSetId(tid) = task.taskSetId
taskSetTaskIds(task.taskSetId) += tid taskSetTaskIds(task.taskSetId) += tid
taskIdToExecutorId(tid) = execId taskIdToExecutorId(tid) = execId
activeExecutorIds += execId activeExecutorIds += execId
executorsByHost(host) += execId executorsByHost(host) += execId
availableCpus(i) -= 1 availableCpus(i) -= 1
launchedTask = true launchedTask = true
case None => {} case None => {}
}
} }
}
} }
if (tasks.size > 0) { if (tasks.size > 0) {
hasLaunchedTask = true hasLaunchedTask = true
...@@ -271,7 +280,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) ...@@ -271,7 +280,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def checkSpeculatableTasks() { def checkSpeculatableTasks() {
var shouldRevive = false var shouldRevive = false
synchronized { synchronized {
shouldRevive = taskSetQueuesManager.checkSpeculatableTasks() shouldRevive = rootPool.checkSpeculatableTasks()
} }
if (shouldRevive) { if (shouldRevive) {
backend.reviveOffers() backend.reviveOffers()
...@@ -314,6 +323,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext) ...@@ -314,6 +323,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
executorsByHost -= host executorsByHost -= host
} }
executorIdToHost -= executorId executorIdToHost -= executorId
taskSetQueuesManager.removeExecutor(executorId, host) rootPool.executorLost(executorId, host)
} }
} }
package spark.scheduler.cluster
import scala.collection.mutable.ArrayBuffer
import spark.Logging
/**
* A FIFO Implementation of the TaskSetQueuesManager
*/
private[spark] class FIFOTaskSetQueuesManager extends TaskSetQueuesManager with Logging {
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
val tasksetSchedulingAlgorithm = new FIFOSchedulingAlgorithm()
override def addTaskSetManager(manager: TaskSetManager) {
activeTaskSetsQueue += manager
}
override def removeTaskSetManager(manager: TaskSetManager) {
activeTaskSetsQueue -= manager
}
override def taskFinished(manager: TaskSetManager) {
//do nothing
}
override def removeExecutor(executorId: String, host: String) {
activeTaskSetsQueue.foreach(_.executorLost(executorId, host))
}
override def receiveOffer(execId:String, host:String,avaiableCpus:Double):Option[TaskDescription] = {
for (manager <- activeTaskSetsQueue.sortWith(tasksetSchedulingAlgorithm.comparator)) {
val task = manager.slaveOffer(execId,host,avaiableCpus)
if (task != None) {
return task
}
}
return None
}
override def checkSpeculatableTasks(): Boolean = {
var shouldRevive = false
for (ts <- activeTaskSetsQueue) {
shouldRevive |= ts.checkSpeculatableTasks()
}
return shouldRevive
}
}
package spark.scheduler.cluster
import java.io.{File, FileInputStream, FileOutputStream}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.util.control.Breaks._
import scala.xml._
import spark.Logging
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* A Fair Implementation of the TaskSetQueuesManager
*
* Currently we support minShare,weight for fair scheduler between pools
* Within a pool, it supports FIFO or FS
* Also, currently we could allocate pools dynamically
*/
private[spark] class FairTaskSetQueuesManager extends TaskSetQueuesManager with Logging {
val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
val poolNameToPool= new HashMap[String, Pool]
var pools = new ArrayBuffer[Pool]
val poolScheduleAlgorithm = new FairSchedulingAlgorithm()
val POOL_FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
val POOL_DEFAULT_POOL_NAME = "default"
val POOL_MINIMUM_SHARES_PROPERTY = "minShares"
val POOL_SCHEDULING_MODE_PROPERTY = "schedulingMode"
val POOL_WEIGHT_PROPERTY = "weight"
val POOL_POOL_NAME_PROPERTY = "@name"
val POOL_POOLS_PROPERTY = "pool"
val POOL_DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO
val POOL_DEFAULT_MINIMUM_SHARES = 2
val POOL_DEFAULT_WEIGHT = 1
loadPoolProperties()
override def addTaskSetManager(manager: TaskSetManager) {
var poolName = POOL_DEFAULT_POOL_NAME
if (manager.taskSet.properties != null) {
poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME)
if (!poolNameToPool.contains(poolName)) {
//we will create a new pool that user has configured in app instead of being defined in xml file
val pool = new Pool(poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)
pools += pool
poolNameToPool(poolName) = pool
logInfo("Create pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(
poolName,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
}
}
poolNameToPool(poolName).addTaskSetManager(manager)
logInfo("Added task set " + manager.taskSet.id + " tasks to pool "+poolName)
}
override def removeTaskSetManager(manager: TaskSetManager) {
var poolName = POOL_DEFAULT_POOL_NAME
if (manager.taskSet.properties != null) {
poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME)
}
logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id,poolName))
val pool = poolNameToPool(poolName)
pool.removeTaskSetManager(manager)
pool.runningTasks -= manager.runningTasks
}
override def taskFinished(manager: TaskSetManager) {
var poolName = POOL_DEFAULT_POOL_NAME
if (manager.taskSet.properties != null) {
poolName = manager.taskSet.properties.getProperty(POOL_FAIR_SCHEDULER_PROPERTIES,POOL_DEFAULT_POOL_NAME)
}
val pool = poolNameToPool(poolName)
pool.runningTasks -= 1
manager.runningTasks -=1
}
override def removeExecutor(executorId: String, host: String) {
for (pool <- pools) {
pool.removeExecutor(executorId,host)
}
}
override def receiveOffer(execId: String,host:String,avaiableCpus:Double):Option[TaskDescription] = {
val sortedPools = pools.sortWith(poolScheduleAlgorithm.comparator)
for (pool <- sortedPools) {
logDebug("poolName:%s,tasksetNum:%d,minShares:%d,runningTasks:%d".format(
pool.poolName,pool.activeTaskSetsQueue.length,pool.minShare,pool.runningTasks))
}
for (pool <- sortedPools) {
val task = pool.receiveOffer(execId,host,avaiableCpus)
if(task != None) {
pool.runningTasks += 1
return task
}
}
return None
}
override def checkSpeculatableTasks(): Boolean = {
var shouldRevive = false
for (pool <- pools) {
shouldRevive |= pool.checkSpeculatableTasks()
}
return shouldRevive
}
def loadPoolProperties() {
//first check if the file exists
val file = new File(schedulerAllocFile)
if (file.exists()) {
val xml = XML.loadFile(file)
for (poolNode <- (xml \\ POOL_POOLS_PROPERTY)) {
val poolName = (poolNode \ POOL_POOL_NAME_PROPERTY).text
var schedulingMode = POOL_DEFAULT_SCHEDULING_MODE
var minShares = POOL_DEFAULT_MINIMUM_SHARES
var weight = POOL_DEFAULT_WEIGHT
val xmlSchedulingMode = (poolNode \ POOL_SCHEDULING_MODE_PROPERTY).text
if (xmlSchedulingMode != "") {
try{
schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
}
catch{
case e:Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
}
}
val xmlMinShares = (poolNode \ POOL_MINIMUM_SHARES_PROPERTY).text
if (xmlMinShares != "") {
minShares = xmlMinShares.toInt
}
val xmlWeight = (poolNode \ POOL_WEIGHT_PROPERTY).text
if (xmlWeight != "") {
weight = xmlWeight.toInt
}
val pool = new Pool(poolName,schedulingMode,minShares,weight)
pools += pool
poolNameToPool(poolName) = pool
logInfo("Create new pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(
poolName,schedulingMode,minShares,weight))
}
}
if (!poolNameToPool.contains(POOL_DEFAULT_POOL_NAME)) {
val pool = new Pool(POOL_DEFAULT_POOL_NAME, POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT)
pools += pool
poolNameToPool(POOL_DEFAULT_POOL_NAME) = pool
logInfo("Create default pool with name:%s,schedulingMode:%s,minShares:%d,weight:%d".format(
POOL_DEFAULT_POOL_NAME,POOL_DEFAULT_SCHEDULING_MODE,POOL_DEFAULT_MINIMUM_SHARES,POOL_DEFAULT_WEIGHT))
}
}
}
package spark.scheduler.cluster package spark.scheduler.cluster
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import spark.Logging import spark.Logging
import spark.scheduler.cluster.SchedulingMode.SchedulingMode import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/** /**
* An Schedulable entity that represent collection of TaskSetManager * An Schedulable entity that represent collection of Pools or TaskSetManagers
*/ */
private[spark] class Pool( private[spark] class Pool(
val poolName: String, val poolName: String,
val schedulingMode: SchedulingMode, val schedulingMode: SchedulingMode,
initMinShare:Int, initMinShare: Int,
initWeight:Int) initWeight: Int)
extends Schedulable extends Schedulable
with Logging { with Logging {
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager] var schedulableQueue = new ArrayBuffer[Schedulable]
var schedulableNameToSchedulable = new HashMap[String, Schedulable]
var weight = initWeight var weight = initWeight
var minShare = initMinShare var minShare = initMinShare
var runningTasks = 0 var runningTasks = 0
val priority = 0 var priority = 0
val stageId = 0 var stageId = 0
var name = poolName
var parent:Schedulable = null
var taskSetSchedulingAlgorithm: SchedulingAlgorithm = { var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
schedulingMode match { schedulingMode match {
case SchedulingMode.FAIR => case SchedulingMode.FAIR =>
val schedule = new FairSchedulingAlgorithm() new FairSchedulingAlgorithm()
schedule
case SchedulingMode.FIFO => case SchedulingMode.FIFO =>
val schedule = new FIFOSchedulingAlgorithm() new FIFOSchedulingAlgorithm()
schedule
} }
} }
def addTaskSetManager(manager:TaskSetManager) { override def addSchedulable(schedulable: Schedulable) {
activeTaskSetsQueue += manager schedulableQueue += schedulable
schedulableNameToSchedulable(schedulable.name) = schedulable
schedulable.parent= this
} }
def removeTaskSetManager(manager:TaskSetManager) { override def removeSchedulable(schedulable: Schedulable) {
activeTaskSetsQueue -= manager schedulableQueue -= schedulable
schedulableNameToSchedulable -= schedulable.name
} }
def removeExecutor(executorId: String, host: String) { override def getSchedulableByName(schedulableName: String): Schedulable = {
activeTaskSetsQueue.foreach(_.executorLost(executorId,host)) if (schedulableNameToSchedulable.contains(schedulableName)) {
return schedulableNameToSchedulable(schedulableName)
}
for (schedulable <- schedulableQueue) {
var sched = schedulable.getSchedulableByName(schedulableName)
if (sched != null) {
return sched
}
}
return null
} }
def checkSpeculatableTasks(): Boolean = { override def executorLost(executorId: String, host: String) {
schedulableQueue.foreach(_.executorLost(executorId, host))
}
override def checkSpeculatableTasks(): Boolean = {
var shouldRevive = false var shouldRevive = false
for (ts <- activeTaskSetsQueue) { for (schedulable <- schedulableQueue) {
shouldRevive |= ts.checkSpeculatableTasks() shouldRevive |= schedulable.checkSpeculatableTasks()
} }
return shouldRevive return shouldRevive
} }
def receiveOffer(execId:String,host:String,availableCpus:Double):Option[TaskDescription] = { override def receiveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
val sortedActiveTasksSetQueue = activeTaskSetsQueue.sortWith(taskSetSchedulingAlgorithm.comparator) val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
for (manager <- sortedActiveTasksSetQueue) { for (manager <- sortedSchedulableQueue) {
logDebug("poolname:%s,taskSetId:%s,taskNum:%d,minShares:%d,weight:%d,runningTasks:%d".format( logInfo("parentName:%s,schedulableName:%s,minShares:%d,weight:%d,runningTasks:%d".format(
poolName,manager.taskSet.id,manager.numTasks,manager.minShare,manager.weight,manager.runningTasks)) manager.parent.name, manager.name, manager.minShare, manager.weight, manager.runningTasks))
} }
for (manager <- sortedSchedulableQueue) {
for (manager <- sortedActiveTasksSetQueue) { val task = manager.receiveOffer(execId, host, availableCpus)
val task = manager.slaveOffer(execId,host,availableCpus) if (task != None) {
if (task != None) { return task
manager.runningTasks += 1 }
return task
}
} }
return None return None
} }
override def increaseRunningTasks(taskNum: Int) {
runningTasks += taskNum
if (parent != null) {
parent.increaseRunningTasks(taskNum)
}
}
override def decreaseRunningTasks(taskNum: Int) {
runningTasks -= taskNum
if (parent != null) {
parent.decreaseRunningTasks(taskNum)
}
}
} }
package spark.scheduler.cluster package spark.scheduler.cluster
import scala.collection.mutable.ArrayBuffer
/** /**
* An interface for schedulable entities. * An interface for schedulable entities.
* there are two type of Schedulable entities(Pools and TaskSetManagers) * there are two type of Schedulable entities(Pools and TaskSetManagers)
*/ */
private[spark] trait Schedulable { private[spark] trait Schedulable {
def weight:Int var parent: Schedulable
def minShare:Int def weight: Int
def runningTasks:Int def minShare: Int
def priority:Int def runningTasks: Int
def stageId:Int def priority: Int
def stageId: Int
def name: String
def increaseRunningTasks(taskNum: Int): Unit
def decreaseRunningTasks(taskNum: Int): Unit
def addSchedulable(schedulable: Schedulable): Unit
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable
def executorLost(executorId: String, host: String): Unit
def receiveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription]
def checkSpeculatableTasks(): Boolean
} }
package spark.scheduler.cluster
import java.io.{File, FileInputStream, FileOutputStream}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.util.control.Breaks._
import scala.xml._
import spark.Logging
import spark.scheduler.cluster.SchedulingMode.SchedulingMode
import java.util.Properties
/**
* An interface to build Schedulable tree
* buildPools: build the tree nodes(pools)
* addTaskSetManager: build the leaf nodes(TaskSetManagers)
*/
private[spark] trait SchedulableBuilder {
def buildPools()
def addTaskSetManager(manager: Schedulable, properties: Properties)
}
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {
override def buildPools() {
//nothing
}
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
rootPool.addSchedulable(manager)
}
}
private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {
val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
val DEFAULT_POOL_NAME = "default"
val MINIMUM_SHARES_PROPERTY = "minShare"
val SCHEDULING_MODE_PROPERTY = "schedulingMode"
val WEIGHT_PROPERTY = "weight"
val POOL_NAME_PROPERTY = "@name"
val POOLS_PROPERTY = "pool"
val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO
val DEFAULT_MINIMUM_SHARE = 2
val DEFAULT_WEIGHT = 1
override def buildPools() {
val file = new File(schedulerAllocFile)
if (file.exists()) {
val xml = XML.loadFile(file)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
var schedulingMode = DEFAULT_SCHEDULING_MODE
var minShare = DEFAULT_MINIMUM_SHARE
var weight = DEFAULT_WEIGHT
val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
if (xmlSchedulingMode != "") {
try {
schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
} catch {
case e: Exception => logInfo("Error xml schedulingMode, using default schedulingMode")
}
}
val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
if (xmlMinShare != "") {
minShare = xmlMinShare.toInt
}
val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
if (xmlWeight != "") {
weight = xmlWeight.toInt
}
val pool = new Pool(poolName, schedulingMode, minShare, weight)
rootPool.addSchedulable(pool)
logInfo("Create new pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
poolName, schedulingMode, minShare, weight))
}
}
//finally create "default" pool
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
var poolName = DEFAULT_POOL_NAME
var parentPool = rootPool.getSchedulableByName(poolName)
if (properties != null) {
poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
parentPool = rootPool.getSchedulableByName(poolName)
if (parentPool == null) {
//we will create a new pool that user has configured in app instead of being defined in xml file
parentPool = new Pool(poolName,DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}
parentPool.addSchedulable(manager)
logInfo("Added task set " + manager.name + " tasks to pool "+poolName)
}
}
...@@ -2,11 +2,11 @@ package spark.scheduler.cluster ...@@ -2,11 +2,11 @@ package spark.scheduler.cluster
/** /**
* An interface for sort algorithm * An interface for sort algorithm
* FIFO: FIFO algorithm for TaskSetManagers * FIFO: FIFO algorithm between TaskSetManagers
* FS: FS algorithm for Pools, and FIFO or FS for TaskSetManagers * FS: FS algorithm between Pools, and FIFO or FS within Pools
*/ */
private[spark] trait SchedulingAlgorithm { private[spark] trait SchedulingAlgorithm {
def comparator(s1: Schedulable,s2: Schedulable): Boolean def comparator(s1: Schedulable, s2: Schedulable): Boolean
} }
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm { private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
...@@ -15,40 +15,41 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm { ...@@ -15,40 +15,41 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
val priority2 = s2.priority val priority2 = s2.priority
var res = Math.signum(priority1 - priority2) var res = Math.signum(priority1 - priority2)
if (res == 0) { if (res == 0) {
val stageId1 = s1.stageId val stageId1 = s1.stageId
val stageId2 = s2.stageId val stageId2 = s2.stageId
res = Math.signum(stageId1 - stageId2) res = Math.signum(stageId1 - stageId2)
} }
if (res < 0) if (res < 0) {
return true return true
else } else {
return false return false
}
} }
} }
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
def comparator(s1: Schedulable, s2:Schedulable): Boolean = { override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val minShare1 = s1.minShare val minShare1 = s1.minShare
val minShare2 = s2.minShare val minShare2 = s2.minShare
val runningTasks1 = s1.runningTasks val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks val runningTasks2 = s2.runningTasks
val s1Needy = runningTasks1 < minShare1 val s1Needy = runningTasks1 < minShare1
val s2Needy = runningTasks2 < minShare2 val s2Needy = runningTasks2 < minShare2
val minShareRatio1 = runningTasks1.toDouble / Math.max(minShare1,1.0).toDouble val minShareRatio1 = runningTasks1.toDouble / Math.max(minShare1, 1.0).toDouble
val minShareRatio2 = runningTasks2.toDouble / Math.max(minShare2,1.0).toDouble val minShareRatio2 = runningTasks2.toDouble / Math.max(minShare2, 1.0).toDouble
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var res:Boolean = true var res:Boolean = true
if (s1Needy && !s2Needy) if (s1Needy && !s2Needy) {
res = true res = true
else if(!s1Needy && s2Needy) } else if (!s1Needy && s2Needy) {
res = false res = false
else if (s1Needy && s2Needy) } else if (s1Needy && s2Needy) {
res = minShareRatio1 <= minShareRatio2 res = minShareRatio1 <= minShareRatio2
else } else {
res = taskToWeightRatio1 <= taskToWeightRatio2 res = taskToWeightRatio1 <= taskToWeightRatio2
}
return res return res
} }
} }
......
...@@ -32,8 +32,6 @@ private[spark] class TaskSetManager( ...@@ -32,8 +32,6 @@ private[spark] class TaskSetManager(
// Maximum times a task is allowed to fail before failing the job // Maximum times a task is allowed to fail before failing the job
val MAX_TASK_FAILURES = 4 val MAX_TASK_FAILURES = 4
val TASKSET_MINIMUM_SHARES = 1
val TASKSET_WEIGHT = 1
// Quantile of tasks at which to start speculation // Quantile of tasks at which to start speculation
val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
...@@ -41,12 +39,6 @@ private[spark] class TaskSetManager( ...@@ -41,12 +39,6 @@ private[spark] class TaskSetManager(
// Serializer for closures and tasks. // Serializer for closures and tasks.
val ser = SparkEnv.get.closureSerializer.newInstance() val ser = SparkEnv.get.closureSerializer.newInstance()
var weight = TASKSET_WEIGHT
var minShare = TASKSET_MINIMUM_SHARES
var runningTasks = 0
val priority = taskSet.priority
val stageId = taskSet.stageId
val tasks = taskSet.tasks val tasks = taskSet.tasks
val numTasks = tasks.length val numTasks = tasks.length
val copiesRunning = new Array[Int](numTasks) val copiesRunning = new Array[Int](numTasks)
...@@ -55,6 +47,14 @@ private[spark] class TaskSetManager( ...@@ -55,6 +47,14 @@ private[spark] class TaskSetManager(
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil) val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
var tasksFinished = 0 var tasksFinished = 0
var weight = 1
var minShare = 0
var runningTasks = 0
var priority = taskSet.priority
var stageId = taskSet.stageId
var name = "TaskSet_"+taskSet.stageId.toString
var parent:Schedulable = null
// Last time when we launched a preferred task (for delay scheduling) // Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis var lastPreferredLaunchTime = System.currentTimeMillis
...@@ -198,7 +198,7 @@ private[spark] class TaskSetManager( ...@@ -198,7 +198,7 @@ private[spark] class TaskSetManager(
} }
// Respond to an offer of a single slave from the scheduler by finding a task // Respond to an offer of a single slave from the scheduler by finding a task
def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = { override def receiveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) { if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
val time = System.currentTimeMillis val time = System.currentTimeMillis
val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT) val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
...@@ -230,10 +230,11 @@ private[spark] class TaskSetManager( ...@@ -230,10 +230,11 @@ private[spark] class TaskSetManager(
val serializedTask = Task.serializeWithDependencies( val serializedTask = Task.serializeWithDependencies(
task, sched.sc.addedFiles, sched.sc.addedJars, ser) task, sched.sc.addedFiles, sched.sc.addedJars, ser)
val timeTaken = System.currentTimeMillis - startTime val timeTaken = System.currentTimeMillis - startTime
increaseRunningTasks(1)
logInfo("Serialized task %s:%d as %d bytes in %d ms".format( logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken)) taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index) val taskName = "task %s:%d".format(taskSet.id, index)
return Some(new TaskDescription(taskId,taskSet.id,execId, taskName, serializedTask)) return Some(new TaskDescription(taskId, taskSet.id, execId, taskName, serializedTask))
} }
case _ => case _ =>
} }
...@@ -264,7 +265,7 @@ private[spark] class TaskSetManager( ...@@ -264,7 +265,7 @@ private[spark] class TaskSetManager(
} }
val index = info.index val index = info.index
info.markSuccessful() info.markSuccessful()
sched.taskFinished(this) decreaseRunningTasks(1)
if (!finished(index)) { if (!finished(index)) {
tasksFinished += 1 tasksFinished += 1
logInfo("Finished TID %s in %d ms (progress: %d/%d)".format( logInfo("Finished TID %s in %d ms (progress: %d/%d)".format(
...@@ -293,7 +294,7 @@ private[spark] class TaskSetManager( ...@@ -293,7 +294,7 @@ private[spark] class TaskSetManager(
} }
val index = info.index val index = info.index
info.markFailed() info.markFailed()
sched.taskFinished(this) decreaseRunningTasks(1)
if (!finished(index)) { if (!finished(index)) {
logInfo("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index)) logInfo("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
copiesRunning(index) -= 1 copiesRunning(index) -= 1
...@@ -308,6 +309,7 @@ private[spark] class TaskSetManager( ...@@ -308,6 +309,7 @@ private[spark] class TaskSetManager(
finished(index) = true finished(index) = true
tasksFinished += 1 tasksFinished += 1
sched.taskSetFinished(this) sched.taskSetFinished(this)
decreaseRunningTasks(runningTasks)
return return
case ef: ExceptionFailure => case ef: ExceptionFailure =>
...@@ -365,10 +367,38 @@ private[spark] class TaskSetManager( ...@@ -365,10 +367,38 @@ private[spark] class TaskSetManager(
causeOfFailure = message causeOfFailure = message
// TODO: Kill running tasks if we were not terminated due to a Mesos error // TODO: Kill running tasks if we were not terminated due to a Mesos error
sched.listener.taskSetFailed(taskSet, message) sched.listener.taskSetFailed(taskSet, message)
decreaseRunningTasks(runningTasks)
sched.taskSetFinished(this) sched.taskSetFinished(this)
} }
def executorLost(execId: String, hostname: String) { override def increaseRunningTasks(taskNum: Int) {
runningTasks += taskNum
if (parent != null) {
parent.increaseRunningTasks(taskNum)
}
}
override def decreaseRunningTasks(taskNum: Int) {
runningTasks -= taskNum
if (parent != null) {
parent.decreaseRunningTasks(taskNum)
}
}
//TODO: for now we just find Pool not TaskSetManager, we can extend this function in future if needed
override def getSchedulableByName(name: String): Schedulable = {
return null
}
override def addSchedulable(schedulable:Schedulable) {
//nothing
}
override def removeSchedulable(schedulable:Schedulable) {
//nothing
}
override def executorLost(execId: String, hostname: String) {
logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id) logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
val newHostsAlive = sched.hostsAlive val newHostsAlive = sched.hostsAlive
// If some task has preferred locations only on hostname, and there are no more executors there, // If some task has preferred locations only on hostname, and there are no more executors there,
...@@ -409,7 +439,7 @@ private[spark] class TaskSetManager( ...@@ -409,7 +439,7 @@ private[spark] class TaskSetManager(
* TODO: To make this scale to large jobs, we need to maintain a list of running tasks, so that * TODO: To make this scale to large jobs, we need to maintain a list of running tasks, so that
* we don't scan the whole task set. It might also help to make this sorted by launch time. * we don't scan the whole task set. It might also help to make this sorted by launch time.
*/ */
def checkSpeculatableTasks(): Boolean = { override def checkSpeculatableTasks(): Boolean = {
// Can't speculate if we only have one task, or if all tasks have finished. // Can't speculate if we only have one task, or if all tasks have finished.
if (numTasks == 1 || tasksFinished == numTasks) { if (numTasks == 1 || tasksFinished == numTasks) {
return false return false
......
<allocations>
<pool name="1">
<minShare>2</minShare>
<weight>1</weight>
<schedulingMode>FIFO</schedulingMode>
</pool>
<pool name="2">
<minShare>3</minShare>
<weight>1</weight>
<schedulingMode>FIFO</schedulingMode>
</pool>
<pool name="3">
</pool>
</allocations>
package spark.scheduler
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import spark._
import spark.scheduler._
import spark.scheduler.cluster._
import java.util.Properties
class DummyTaskSetManager(
initPriority: Int,
initStageId: Int,
initNumTasks: Int)
extends Schedulable {
var parent: Schedulable = null
var weight = 1
var minShare = 2
var runningTasks = 0
var priority = initPriority
var stageId = initStageId
var name = "TaskSet_"+stageId
var numTasks = initNumTasks
var tasksFinished = 0
def increaseRunningTasks(taskNum: Int) {
runningTasks += taskNum
if (parent != null) {
parent.increaseRunningTasks(taskNum)
}
}
def decreaseRunningTasks(taskNum: Int) {
runningTasks -= taskNum
if (parent != null) {
parent.decreaseRunningTasks(taskNum)
}
}
def addSchedulable(schedulable: Schedulable) {
}
def removeSchedulable(schedulable: Schedulable) {
}
def getSchedulableByName(name: String): Schedulable = {
return null
}
def executorLost(executorId: String, host: String): Unit = {
}
def receiveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription] = {
if (tasksFinished + runningTasks < numTasks) {
increaseRunningTasks(1)
return Some(new TaskDescription(0, stageId.toString, execId, "task 0:0", null))
}
return None
}
def checkSpeculatableTasks(): Boolean = {
return true
}
def taskFinished() {
decreaseRunningTasks(1)
tasksFinished +=1
if (tasksFinished == numTasks) {
parent.removeSchedulable(this)
}
}
def abort() {
decreaseRunningTasks(runningTasks)
parent.removeSchedulable(this)
}
}
class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
def receiveOffer(rootPool: Pool) : Option[TaskDescription] = {
rootPool.receiveOffer("execId_1", "hostname_1", 1)
}
def checkTaskSetId(rootPool: Pool, expectedTaskSetId: Int) {
receiveOffer(rootPool) match {
case Some(task) =>
assert(task.taskSetId.toInt === expectedTaskSetId)
case _ =>
}
}
test("FIFO Scheduler Test") {
val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
schedulableBuilder.buildPools()
val taskSetManager0 = new DummyTaskSetManager(0, 0, 2)
val taskSetManager1 = new DummyTaskSetManager(0, 1, 2)
val taskSetManager2 = new DummyTaskSetManager(0, 2, 2)
schedulableBuilder.addTaskSetManager(taskSetManager0, null)
schedulableBuilder.addTaskSetManager(taskSetManager1, null)
schedulableBuilder.addTaskSetManager(taskSetManager2, null)
checkTaskSetId(rootPool, 0)
receiveOffer(rootPool)
checkTaskSetId(rootPool, 1)
receiveOffer(rootPool)
taskSetManager1.abort()
checkTaskSetId(rootPool, 2)
}
test("Fair Scheduler Test") {
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
System.setProperty("spark.fairscheduler.allocation.file", xmlPath)
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
val schedulableBuilder = new FairSchedulableBuilder(rootPool)
schedulableBuilder.buildPools()
assert(rootPool.getSchedulableByName("default") != null)
assert(rootPool.getSchedulableByName("1") != null)
assert(rootPool.getSchedulableByName("2") != null)
assert(rootPool.getSchedulableByName("3") != null)
assert(rootPool.getSchedulableByName("1").minShare === 2)
assert(rootPool.getSchedulableByName("1").weight === 1)
assert(rootPool.getSchedulableByName("2").minShare === 3)
assert(rootPool.getSchedulableByName("2").weight === 1)
assert(rootPool.getSchedulableByName("3").minShare === 2)
assert(rootPool.getSchedulableByName("3").weight === 1)
val properties1 = new Properties()
properties1.setProperty("spark.scheduler.cluster.fair.pool","1")
val properties2 = new Properties()
properties2.setProperty("spark.scheduler.cluster.fair.pool","2")
val taskSetManager10 = new DummyTaskSetManager(1, 0, 1)
val taskSetManager11 = new DummyTaskSetManager(1, 1, 1)
val taskSetManager12 = new DummyTaskSetManager(1, 2, 2)
schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
val taskSetManager23 = new DummyTaskSetManager(2, 3, 2)
val taskSetManager24 = new DummyTaskSetManager(2, 4, 2)
schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
checkTaskSetId(rootPool, 0)
checkTaskSetId(rootPool, 3)
checkTaskSetId(rootPool, 3)
checkTaskSetId(rootPool, 1)
checkTaskSetId(rootPool, 4)
checkTaskSetId(rootPool, 2)
checkTaskSetId(rootPool, 2)
checkTaskSetId(rootPool, 4)
taskSetManager12.taskFinished()
assert(rootPool.getSchedulableByName("1").runningTasks === 3)
taskSetManager24.abort()
assert(rootPool.getSchedulableByName("2").runningTasks === 2)
}
test("Nested Pool Test") {
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
rootPool.addSchedulable(pool0)
rootPool.addSchedulable(pool1)
val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
pool0.addSchedulable(pool00)
pool0.addSchedulable(pool01)
val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
pool1.addSchedulable(pool10)
pool1.addSchedulable(pool11)
val taskSetManager000 = new DummyTaskSetManager(0, 0, 5)
val taskSetManager001 = new DummyTaskSetManager(0, 1, 5)
pool00.addSchedulable(taskSetManager000)
pool00.addSchedulable(taskSetManager001)
val taskSetManager010 = new DummyTaskSetManager(1, 2, 5)
val taskSetManager011 = new DummyTaskSetManager(1, 3, 5)
pool01.addSchedulable(taskSetManager010)
pool01.addSchedulable(taskSetManager011)
val taskSetManager100 = new DummyTaskSetManager(2, 4, 5)
val taskSetManager101 = new DummyTaskSetManager(2, 5, 5)
pool10.addSchedulable(taskSetManager100)
pool10.addSchedulable(taskSetManager101)
val taskSetManager110 = new DummyTaskSetManager(3, 6, 5)
val taskSetManager111 = new DummyTaskSetManager(3, 7, 5)
pool11.addSchedulable(taskSetManager110)
pool11.addSchedulable(taskSetManager111)
checkTaskSetId(rootPool, 0)
checkTaskSetId(rootPool, 4)
checkTaskSetId(rootPool, 6)
checkTaskSetId(rootPool, 2)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment