diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 4caafcc1d34d036b75aa41a926a3400483dc2057..e6399a3547255e9cde652c7770978a01521a07d0 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -165,24 +165,24 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
       val availableCpus = offers.map(o => o.cores).toArray
       var launchedTask = false
-      val sortedLeafSchedulable = rootPool.getSortedLeafSchedulable()
-      for (schedulable <- sortedLeafSchedulable)
+      val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
+      for (manager <- sortedTaskSetQueue)
       {
-        logDebug("parentName:%s,name:%s,runningTasks:%s".format(schedulable.parent.name,schedulable.name,schedulable.runningTasks))
+        logInfo("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks))
       }
-      for (schedulable <- sortedLeafSchedulable) {
+      for (manager <- sortedTaskSetQueue) {
         do {
           launchedTask = false
           for (i <- 0 until offers.size) {
             var launchedTask = true
             val execId = offers(i).executorId
             val host = offers(i).hostname
-            schedulable.slaveOffer(execId,host,availableCpus(i)) match {
+            manager.slaveOffer(execId,host,availableCpus(i)) match {
               case Some(task) =>
                 tasks(i) += task
                 val tid = task.taskId
-                taskIdToTaskSetId(tid) = task.taskSetId
-                taskSetTaskIds(task.taskSetId) += tid
+                taskIdToTaskSetId(tid) = manager.taskSet.id
+                taskSetTaskIds(manager.taskSet.id) += tid
                 taskIdToExecutorId(tid) = execId
                 activeExecutorIds += execId
                 executorsByHost(host) += execId
diff --git a/core/src/main/scala/spark/scheduler/cluster/Pool.scala b/core/src/main/scala/spark/scheduler/cluster/Pool.scala
index ae603e7dd96d6cbd5d0a5da16dcfb6bdfb85d9a6..4dc15f413c6606cd469c4be0eb4c8786323005c6 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Pool.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Pool.scala
@@ -75,17 +75,13 @@ private[spark] class Pool(
     return shouldRevive
   }
 
-  override def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
-    return None
-  }
-
-  override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
-    var leafSchedulableQueue = new ArrayBuffer[Schedulable]
+  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
+    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
     val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
     for (schedulable <- sortedSchedulableQueue) {
-      leafSchedulableQueue ++= schedulable.getSortedLeafSchedulable()
+      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
     }
-    return leafSchedulableQueue
+    return sortedTaskSetQueue
   }
 
   override def increaseRunningTasks(taskNum: Int) {
diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
index c620588e14972e9d3f5e88d371f3ed532b57e831..6bb7525b49735f6a43ddcf4751964f12fc342b14 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
@@ -21,7 +21,6 @@ private[spark] trait Schedulable {
   def removeSchedulable(schedulable: Schedulable): Unit
   def getSchedulableByName(name: String): Schedulable
   def executorLost(executorId: String, host: String): Unit
-  def slaveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription]
   def checkSpeculatableTasks(): Boolean
-  def getSortedLeafSchedulable(): ArrayBuffer[Schedulable]
+  def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager]
 }
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
index cdd004c94bae054b7854c0df6db957ad7a178c82..b41e951be99f84c68b800a1cf7a46636d3ced3f8 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
@@ -5,7 +5,6 @@ import spark.util.SerializableBuffer
 
 private[spark] class TaskDescription(
     val taskId: Long,
-    val taskSetId: String,
     val executorId: String,
     val name: String,
     _serializedTask: ByteBuffer)
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 80edbe77a13ad0b49fff13fc73d4711a3f9beb65..b9d2dbf4874549403290bc5c892213e869ec60bc 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -198,7 +198,7 @@ private[spark] class TaskSetManager(
   }
 
   // Respond to an offer of a single slave from the scheduler by finding a task
-  override def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
+  def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
     if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
       val time = System.currentTimeMillis
       val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -234,7 +234,7 @@ private[spark] class TaskSetManager(
           logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
             taskSet.id, index, serializedTask.limit, timeTaken))
           val taskName = "task %s:%d".format(taskSet.id, index)
-          return Some(new TaskDescription(taskId, taskSet.id, execId, taskName, serializedTask))
+          return Some(new TaskDescription(taskId, execId, taskName, serializedTask))
         }
         case _ =>
       }
@@ -398,10 +398,10 @@ private[spark] class TaskSetManager(
     //nothing
   }
 
-  override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
-    var leafSchedulableQueue = new ArrayBuffer[Schedulable]
-    leafSchedulableQueue += this
-    return leafSchedulableQueue
+  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
+    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
+    sortedTaskSetQueue += this
+    return sortedTaskSetQueue
   }
 
   override def executorLost(execId: String, hostname: String) {
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
index 8426be7575a5157d29ba70ca6d2a31d3e80d4620..956cc7421c5aac83f100521d851772469a26f0a7 100644
--- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
@@ -13,18 +13,20 @@ import java.util.Properties
 class DummyTaskSetManager(
     initPriority: Int,
     initStageId: Int,
-    initNumTasks: Int) 
-  extends Schedulable {
-  
-  var parent: Schedulable = null
-  var weight = 1
-  var minShare = 2
-  var runningTasks = 0
-  var priority = initPriority
-  var stageId = initStageId
-  var name = "TaskSet_"+stageId
-  var numTasks = initNumTasks
-  var tasksFinished = 0
+    initNumTasks: Int,
+    clusterScheduler: ClusterScheduler,
+    taskSet: TaskSet)
+  extends TaskSetManager(clusterScheduler,taskSet) {
+
+  parent = null
+  weight = 1
+  minShare = 2
+  runningTasks = 0
+  priority = initPriority
+  stageId = initStageId
+  name = "TaskSet_"+stageId
+  override val numTasks = initNumTasks
+  tasksFinished = 0
 
   override def increaseRunningTasks(taskNum: Int) {
     runningTasks += taskNum
@@ -41,11 +43,11 @@ class DummyTaskSetManager(
   }
 
   override def addSchedulable(schedulable: Schedulable) {
-  } 
-  
+  }
+
   override def removeSchedulable(schedulable: Schedulable) {
   }
-  
+
   override def getSchedulableByName(name: String): Schedulable = {
     return null
   }
@@ -65,11 +67,11 @@ class DummyTaskSetManager(
     return true
   }
 
-  override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
-    var leafSchedulableQueue = new ArrayBuffer[Schedulable]
-    leafSchedulableQueue += this
-    return leafSchedulableQueue
-  }
+//  override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
+//    var leafSchedulableQueue = new ArrayBuffer[Schedulable]
+//    leafSchedulableQueue += this
+//    return leafSchedulableQueue
+//  }
 
   def taskFinished() {
     decreaseRunningTasks(1)
@@ -85,10 +87,28 @@ class DummyTaskSetManager(
   }
 }
 
+class DummyTask(stageId: Int) extends Task[Int](stageId)
+{
+  def run(attemptId: Long): Int = {
+    return 0
+  }
+}
+
 class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
-  
+
+  val sc = new SparkContext("local", "ClusterSchedulerSuite")
+  val clusterScheduler = new ClusterScheduler(sc)
+  var tasks = ArrayBuffer[Task[_]]()
+  val task = new DummyTask(0)
+  val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+  tasks += task
+
+  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int): DummyTaskSetManager = {
+    new DummyTaskSetManager(priority, stage, numTasks, clusterScheduler, taskSet)
+  }
+
   def resourceOffer(rootPool: Pool): Int = {
-    val taskSetQueue = rootPool.getSortedLeafSchedulable()
+    val taskSetQueue = rootPool.getSortedTaskSetQueue()
     for (taskSet <- taskSetQueue)
     {
       taskSet.slaveOffer("execId_1", "hostname_1", 1) match {
@@ -109,13 +129,13 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
     val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
     schedulableBuilder.buildPools()
 
-    val taskSetManager0 = new DummyTaskSetManager(0, 0, 2)
-    val taskSetManager1 = new DummyTaskSetManager(0, 1, 2)
-    val taskSetManager2 = new DummyTaskSetManager(0, 2, 2)
+    val taskSetManager0 = createDummyTaskSetManager(0, 0, 2)
+    val taskSetManager1 = createDummyTaskSetManager(0, 1, 2)
+    val taskSetManager2 = createDummyTaskSetManager(0, 2, 2)
     schedulableBuilder.addTaskSetManager(taskSetManager0, null)
     schedulableBuilder.addTaskSetManager(taskSetManager1, null)
     schedulableBuilder.addTaskSetManager(taskSetManager2, null)
-    
+
     checkTaskSetId(rootPool, 0)
     resourceOffer(rootPool)
     checkTaskSetId(rootPool, 1)
@@ -130,7 +150,7 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
     val schedulableBuilder = new FairSchedulableBuilder(rootPool)
     schedulableBuilder.buildPools()
-    
+
     assert(rootPool.getSchedulableByName("default") != null)
     assert(rootPool.getSchedulableByName("1") != null)
     assert(rootPool.getSchedulableByName("2") != null)
@@ -146,16 +166,16 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
     properties1.setProperty("spark.scheduler.cluster.fair.pool","1")
     val properties2 = new Properties()
     properties2.setProperty("spark.scheduler.cluster.fair.pool","2")
-    
-    val taskSetManager10 = new DummyTaskSetManager(1, 0, 1)
-    val taskSetManager11 = new DummyTaskSetManager(1, 1, 1)
-    val taskSetManager12 = new DummyTaskSetManager(1, 2, 2)
+
+    val taskSetManager10 = createDummyTaskSetManager(1, 0, 1)
+    val taskSetManager11 = createDummyTaskSetManager(1, 1, 1)
+    val taskSetManager12 = createDummyTaskSetManager(1, 2, 2)
     schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
     schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
     schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
-    
-    val taskSetManager23 = new DummyTaskSetManager(2, 3, 2)
-    val taskSetManager24 = new DummyTaskSetManager(2, 4, 2)
+
+    val taskSetManager23 = createDummyTaskSetManager(2, 3, 2)
+    val taskSetManager24 = createDummyTaskSetManager(2, 4, 2)
     schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
     schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
 
@@ -190,27 +210,27 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
     val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
     pool1.addSchedulable(pool10)
     pool1.addSchedulable(pool11)
-    
-    val taskSetManager000 = new DummyTaskSetManager(0, 0, 5)
-    val taskSetManager001 = new DummyTaskSetManager(0, 1, 5)
+
+    val taskSetManager000 = createDummyTaskSetManager(0, 0, 5)
+    val taskSetManager001 = createDummyTaskSetManager(0, 1, 5)
     pool00.addSchedulable(taskSetManager000)
     pool00.addSchedulable(taskSetManager001)
-    
-    val taskSetManager010 = new DummyTaskSetManager(1, 2, 5)
-    val taskSetManager011 = new DummyTaskSetManager(1, 3, 5)
+
+    val taskSetManager010 = createDummyTaskSetManager(1, 2, 5)
+    val taskSetManager011 = createDummyTaskSetManager(1, 3, 5)
     pool01.addSchedulable(taskSetManager010)
     pool01.addSchedulable(taskSetManager011)
-  
-    val taskSetManager100 = new DummyTaskSetManager(2, 4, 5)
-    val taskSetManager101 = new DummyTaskSetManager(2, 5, 5)
+
+    val taskSetManager100 = createDummyTaskSetManager(2, 4, 5)
+    val taskSetManager101 = createDummyTaskSetManager(2, 5, 5)
     pool10.addSchedulable(taskSetManager100)
     pool10.addSchedulable(taskSetManager101)
 
-    val taskSetManager110 = new DummyTaskSetManager(3, 6, 5)
-    val taskSetManager111 = new DummyTaskSetManager(3, 7, 5)
+    val taskSetManager110 = createDummyTaskSetManager(3, 6, 5)
+    val taskSetManager111 = createDummyTaskSetManager(3, 7, 5)
     pool11.addSchedulable(taskSetManager110)
     pool11.addSchedulable(taskSetManager111)
-    
+
     checkTaskSetId(rootPool, 0)
     checkTaskSetId(rootPool, 4)
     checkTaskSetId(rootPool, 6)