Commit d19753b9 authored by Andrew xia

expose TaskSetManager type to resourceOffer function in ClusterScheduler

parent c6e2770b
@@ -165,24 +165,24 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
     val availableCpus = offers.map(o => o.cores).toArray
     var launchedTask = false
-    val sortedLeafSchedulable = rootPool.getSortedLeafSchedulable()
-    for (schedulable <- sortedLeafSchedulable)
+    val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
+    for (manager <- sortedTaskSetQueue)
     {
-      logDebug("parentName:%s,name:%s,runningTasks:%s".format(schedulable.parent.name,schedulable.name,schedulable.runningTasks))
+      logInfo("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks))
     }
-    for (schedulable <- sortedLeafSchedulable) {
+    for (manager <- sortedTaskSetQueue) {
       do {
         launchedTask = false
         for (i <- 0 until offers.size) {
           var launchedTask = true
           val execId = offers(i).executorId
           val host = offers(i).hostname
-          schedulable.slaveOffer(execId,host,availableCpus(i)) match {
+          manager.slaveOffer(execId,host,availableCpus(i)) match {
             case Some(task) =>
               tasks(i) += task
               val tid = task.taskId
-              taskIdToTaskSetId(tid) = task.taskSetId
-              taskSetTaskIds(task.taskSetId) += tid
+              taskIdToTaskSetId(tid) = manager.taskSet.id
+              taskSetTaskIds(manager.taskSet.id) += tid
               taskIdToExecutorId(tid) = execId
               activeExecutorIds += execId
               executorsByHost(host) += execId
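
Besides the type change, the hunk above keeps the surrounding offer loop, whose do/launchedTask flags sweep every offer for the current manager and repeat until a full pass launches nothing. A minimal, self-contained sketch of that loop shape (toy data and names, not Spark's classes):

import scala.collection.mutable.ArrayBuffer

// Each "manager" keeps claiming cores round-robin across offers until a whole
// sweep launches nothing, mirroring the do/while structure in resourceOffers.
object OfferLoopDemo extends App {
  val availableCpus = Array(2, 1)                 // free cores on two executors
  val pendingPerManager = ArrayBuffer(2, 3)       // unlaunched tasks per task set
  val launched = ArrayBuffer[(Int, Int)]()        // (managerIndex, executorIndex)

  for (m <- pendingPerManager.indices) {
    var launchedTask = false
    do {
      launchedTask = false
      for (i <- availableCpus.indices) {
        if (pendingPerManager(m) > 0 && availableCpus(i) >= 1) {
          pendingPerManager(m) -= 1
          availableCpus(i) -= 1
          launched += ((m, i))
          launchedTask = true
        }
      }
    } while (launchedTask)
  }

  println(launched)   // ArrayBuffer((0,0), (0,1), (1,0)) for the counts above
}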
@@ -75,17 +75,13 @@ private[spark] class Pool(
     return shouldRevive
   }
-  override def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
-    return None
-  }
-  override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
-    var leafSchedulableQueue = new ArrayBuffer[Schedulable]
+  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
+    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
     val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
     for (schedulable <- sortedSchedulableQueue) {
-      leafSchedulableQueue ++= schedulable.getSortedLeafSchedulable()
+      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
     }
-    return leafSchedulableQueue
+    return sortedTaskSetQueue
   }
   override def increaseRunningTasks(taskNum: Int) {
@@ -21,7 +21,6 @@ private[spark] trait Schedulable {
   def removeSchedulable(schedulable: Schedulable): Unit
   def getSchedulableByName(name: String): Schedulable
   def executorLost(executorId: String, host: String): Unit
-  def slaveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription]
   def checkSpeculatableTasks(): Boolean
-  def getSortedLeafSchedulable(): ArrayBuffer[Schedulable]
+  def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager]
 }
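
The trait change above is the core of the commit: getSortedTaskSetQueue returns concrete TaskSetManagers rather than generic Schedulables, so ClusterScheduler can use members such as manager.taskSet.id directly, and slaveOffer no longer needs to live on the trait at all. A small, self-contained model of the pattern (toy names, not the Spark classes), where pools flatten their sorted children depth-first and leaves return themselves:

import scala.collection.mutable.ArrayBuffer

// Interior nodes flatten their sorted children; leaves return themselves.
// Because the queue is typed as the leaf class, callers get leaf-only members.
trait Node {
  def name: String
  def getSortedLeaves(): ArrayBuffer[Leaf]
}

class Leaf(val name: String) extends Node {
  def getSortedLeaves(): ArrayBuffer[Leaf] = ArrayBuffer(this)
  def offer(): String = s"task from $name"        // leaf-only member
}

class PoolNode(val name: String, children: Node*) extends Node {
  def getSortedLeaves(): ArrayBuffer[Leaf] = {
    val out = new ArrayBuffer[Leaf]
    for (child <- children.sortBy(_.name))        // stand-in for the scheduling comparator
      out ++= child.getSortedLeaves()
    out
  }
}

object FlattenDemo extends App {
  val root = new PoolNode("root",
    new PoolNode("p1", new Leaf("ts2"), new Leaf("ts1")),
    new Leaf("ts0"))
  root.getSortedLeaves().foreach(leaf => println(leaf.offer()))
  // prints: task from ts1, task from ts2, task from ts0
}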
@@ -5,7 +5,6 @@ import spark.util.SerializableBuffer
 private[spark] class TaskDescription(
     val taskId: Long,
-    val taskSetId: String,
     val executorId: String,
     val name: String,
     _serializedTask: ByteBuffer)
@@ -198,7 +198,7 @@ private[spark] class TaskSetManager(
   }
   // Respond to an offer of a single slave from the scheduler by finding a task
-  override def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
+  def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
     if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
       val time = System.currentTimeMillis
       val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -234,7 +234,7 @@ private[spark] class TaskSetManager(
         logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
           taskSet.id, index, serializedTask.limit, timeTaken))
         val taskName = "task %s:%d".format(taskSet.id, index)
-        return Some(new TaskDescription(taskId, taskSet.id, execId, taskName, serializedTask))
+        return Some(new TaskDescription(taskId, execId, taskName, serializedTask))
       }
       case _ =>
     }
@@ -398,10 +398,10 @@ private[spark] class TaskSetManager(
     //nothing
   }
-  override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
-    var leafSchedulableQueue = new ArrayBuffer[Schedulable]
-    leafSchedulableQueue += this
-    return leafSchedulableQueue
+  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
+    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
+    sortedTaskSetQueue += this
+    return sortedTaskSetQueue
   }
   override def executorLost(execId: String, hostname: String) {
@@ -13,18 +13,20 @@ import java.util.Properties
 class DummyTaskSetManager(
     initPriority: Int,
     initStageId: Int,
-    initNumTasks: Int)
-  extends Schedulable {
-  var parent: Schedulable = null
-  var weight = 1
-  var minShare = 2
-  var runningTasks = 0
-  var priority = initPriority
-  var stageId = initStageId
-  var name = "TaskSet_"+stageId
-  var numTasks = initNumTasks
-  var tasksFinished = 0
+    initNumTasks: Int,
+    clusterScheduler: ClusterScheduler,
+    taskSet: TaskSet)
+  extends TaskSetManager(clusterScheduler,taskSet) {
+  parent = null
+  weight = 1
+  minShare = 2
+  runningTasks = 0
+  priority = initPriority
+  stageId = initStageId
+  name = "TaskSet_"+stageId
+  override val numTasks = initNumTasks
+  tasksFinished = 0
   override def increaseRunningTasks(taskNum: Int) {
     runningTasks += taskNum
@@ -41,11 +43,11 @@ class DummyTaskSetManager(
   }
   override def addSchedulable(schedulable: Schedulable) {
-  }
+  }
   override def removeSchedulable(schedulable: Schedulable) {
   }
   override def getSchedulableByName(name: String): Schedulable = {
     return null
   }
@@ -65,11 +67,11 @@ class DummyTaskSetManager(
     return true
   }
-  override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
-    var leafSchedulableQueue = new ArrayBuffer[Schedulable]
-    leafSchedulableQueue += this
-    return leafSchedulableQueue
-  }
+//  override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
+//    var leafSchedulableQueue = new ArrayBuffer[Schedulable]
+//    leafSchedulableQueue += this
+//    return leafSchedulableQueue
+//  }
   def taskFinished() {
     decreaseRunningTasks(1)
@@ -85,10 +87,28 @@ class DummyTaskSetManager(
   }
 }
+class DummyTask(stageId: Int) extends Task[Int](stageId)
+{
+  def run(attemptId: Long): Int = {
+    return 0
+  }
+}
 class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
+  val sc = new SparkContext("local", "ClusterSchedulerSuite")
+  val clusterScheduler = new ClusterScheduler(sc)
+  var tasks = ArrayBuffer[Task[_]]()
+  val task = new DummyTask(0)
+  val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+  tasks += task
+  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int): DummyTaskSetManager = {
+    new DummyTaskSetManager(priority, stage, numTasks, clusterScheduler, taskSet)
+  }
   def resourceOffer(rootPool: Pool): Int = {
-    val taskSetQueue = rootPool.getSortedLeafSchedulable()
+    val taskSetQueue = rootPool.getSortedTaskSetQueue()
     for (taskSet <- taskSetQueue)
     {
       taskSet.slaveOffer("execId_1", "hostname_1", 1) match {
@@ -109,13 +129,13 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
     val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
     schedulableBuilder.buildPools()
-    val taskSetManager0 = new DummyTaskSetManager(0, 0, 2)
-    val taskSetManager1 = new DummyTaskSetManager(0, 1, 2)
-    val taskSetManager2 = new DummyTaskSetManager(0, 2, 2)
+    val taskSetManager0 = createDummyTaskSetManager(0, 0, 2)
+    val taskSetManager1 = createDummyTaskSetManager(0, 1, 2)
+    val taskSetManager2 = createDummyTaskSetManager(0, 2, 2)
     schedulableBuilder.addTaskSetManager(taskSetManager0, null)
     schedulableBuilder.addTaskSetManager(taskSetManager1, null)
     schedulableBuilder.addTaskSetManager(taskSetManager2, null)
     checkTaskSetId(rootPool, 0)
     resourceOffer(rootPool)
     checkTaskSetId(rootPool, 1)
@@ -130,7 +150,7 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
     val schedulableBuilder = new FairSchedulableBuilder(rootPool)
     schedulableBuilder.buildPools()
     assert(rootPool.getSchedulableByName("default") != null)
     assert(rootPool.getSchedulableByName("1") != null)
     assert(rootPool.getSchedulableByName("2") != null)
@@ -146,16 +166,16 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
     properties1.setProperty("spark.scheduler.cluster.fair.pool","1")
     val properties2 = new Properties()
     properties2.setProperty("spark.scheduler.cluster.fair.pool","2")
-    val taskSetManager10 = new DummyTaskSetManager(1, 0, 1)
-    val taskSetManager11 = new DummyTaskSetManager(1, 1, 1)
-    val taskSetManager12 = new DummyTaskSetManager(1, 2, 2)
+    val taskSetManager10 = createDummyTaskSetManager(1, 0, 1)
+    val taskSetManager11 = createDummyTaskSetManager(1, 1, 1)
+    val taskSetManager12 = createDummyTaskSetManager(1, 2, 2)
     schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
     schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
     schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
-    val taskSetManager23 = new DummyTaskSetManager(2, 3, 2)
-    val taskSetManager24 = new DummyTaskSetManager(2, 4, 2)
+    val taskSetManager23 = createDummyTaskSetManager(2, 3, 2)
+    val taskSetManager24 = createDummyTaskSetManager(2, 4, 2)
     schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
     schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
@@ -190,27 +210,27 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
     val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
     pool1.addSchedulable(pool10)
     pool1.addSchedulable(pool11)
-    val taskSetManager000 = new DummyTaskSetManager(0, 0, 5)
-    val taskSetManager001 = new DummyTaskSetManager(0, 1, 5)
+    val taskSetManager000 = createDummyTaskSetManager(0, 0, 5)
+    val taskSetManager001 = createDummyTaskSetManager(0, 1, 5)
     pool00.addSchedulable(taskSetManager000)
     pool00.addSchedulable(taskSetManager001)
-    val taskSetManager010 = new DummyTaskSetManager(1, 2, 5)
-    val taskSetManager011 = new DummyTaskSetManager(1, 3, 5)
+    val taskSetManager010 = createDummyTaskSetManager(1, 2, 5)
+    val taskSetManager011 = createDummyTaskSetManager(1, 3, 5)
     pool01.addSchedulable(taskSetManager010)
     pool01.addSchedulable(taskSetManager011)
-    val taskSetManager100 = new DummyTaskSetManager(2, 4, 5)
-    val taskSetManager101 = new DummyTaskSetManager(2, 5, 5)
+    val taskSetManager100 = createDummyTaskSetManager(2, 4, 5)
+    val taskSetManager101 = createDummyTaskSetManager(2, 5, 5)
     pool10.addSchedulable(taskSetManager100)
     pool10.addSchedulable(taskSetManager101)
-    val taskSetManager110 = new DummyTaskSetManager(3, 6, 5)
-    val taskSetManager111 = new DummyTaskSetManager(3, 7, 5)
+    val taskSetManager110 = createDummyTaskSetManager(3, 6, 5)
+    val taskSetManager111 = createDummyTaskSetManager(3, 7, 5)
     pool11.addSchedulable(taskSetManager110)
     pool11.addSchedulable(taskSetManager111)
     checkTaskSetId(rootPool, 0)
     checkTaskSetId(rootPool, 4)
     checkTaskSetId(rootPool, 6)
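
The test changes mirror the API change: DummyTaskSetManager now extends the concrete TaskSetManager(clusterScheduler, taskSet) and overrides only what the suite needs, with the createDummyTaskSetManager helper supplying the extra constructor arguments. A tiny, self-contained sketch of that test-double style (toy classes, not the Spark ones):

// A stub subclasses the real class and pins just the members the test cares
// about, instead of re-implementing a whole trait from scratch.
class Worker(val id: Int) {
  def capacity: Int = 8                                 // "real" behaviour
  def describe: String = s"worker $id, capacity $capacity"
}

class StubWorker(id: Int, fixedCapacity: Int) extends Worker(id) {
  override def capacity: Int = fixedCapacity            // pinned for deterministic assertions
}

object StubDemo extends App {
  val w = new StubWorker(1, 2)
  assert(w.describe == "worker 1, capacity 2")          // inherited code sees the override
  println(w.describe)
}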