From 614ee16cc4c63260f13d0c7494fbaafa8a061e95 Mon Sep 17 00:00:00 2001
From: Andrew xia <junluan.xia@intel.com>
Date: Tue, 30 Jul 2013 10:57:26 +0800
Subject: [PATCH] refactor job ui with pool information

---
 core/src/main/scala/spark/SparkContext.scala  | 16 +++--
 .../scala/spark/scheduler/SparkListener.scala |  2 +-
 .../scheduler/cluster/ClusterScheduler.scala  | 12 ++--
 .../cluster/ClusterTaskSetManager.scala       |  7 +--
 .../spark/scheduler/cluster/Schedulable.scala |  2 +-
 .../cluster/SchedulableBuilder.scala          |  6 +-
 .../scheduler/cluster/SchedulingMode.scala    |  6 +-
 .../scheduler/cluster/TaskSetManager.scala    |  1 +
 .../scheduler/local/LocalScheduler.scala      |  5 +-
 .../scheduler/local/LocalTaskSetManager.scala |  7 +--
 .../scala/spark/ui/UIWorkloadGenerator.scala  | 35 ++++++++---
 .../main/scala/spark/ui/jobs/IndexPage.scala  | 28 +++------
 .../spark/ui/jobs/JobProgressListener.scala   | 62 +++++--------------
 .../scala/spark/ui/jobs/JobProgressUI.scala   |  7 +--
 .../main/scala/spark/ui/jobs/PoolPage.scala   | 17 ++---
 .../main/scala/spark/ui/jobs/PoolTable.scala  | 23 +++----
 .../main/scala/spark/ui/jobs/StageTable.scala | 16 ++---
 17 files changed, 116 insertions(+), 136 deletions(-)

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index b5225d5681..375636071d 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -45,13 +45,13 @@ import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import spark.partial.{ApproximateEvaluator, PartialResult}
 import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
 import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
-import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler, Schedulable}
+import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler, Schedulable,
+SchedulingMode}
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
 import spark.util.{MetadataCleaner, TimeStampedHashMap}
 import ui.{SparkUI}
-import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -542,17 +542,25 @@ class SparkContext(
     env.blockManager.master.getStorageStatus
   }
 
-  def getPoolsInfo: ArrayBuffer[Schedulable] = {
+  /**
+   *  Return pools for fair scheduler
+   *  TODO:now, we have not taken nested pools into account
+   */
+  def getPools: ArrayBuffer[Schedulable] = {
     taskScheduler.rootPool.schedulableQueue
   }
 
-  def getSchedulingMode: SchedulingMode = {
+  /**
+   *  Return current scheduling mode
+   */
+  def getSchedulingMode: SchedulingMode.SchedulingMode = {
     taskScheduler.schedulingMode
   }
 
   def getPoolNameToPool: HashMap[String, Schedulable] = {
     taskScheduler.rootPool.schedulableNameToSchedulable
   }
+
   /**
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 94fdad9b98..07372ee786 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -8,7 +8,7 @@ import spark.executor.TaskMetrics
 
 sealed trait SparkListenerEvents
 
-case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties = null) extends SparkListenerEvents
+case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties) extends SparkListenerEvents
 
 case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
 
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 1b23fd6cef..74b3e43d2b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -98,8 +98,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   var schedulableBuilder: SchedulableBuilder = null
   var rootPool: Pool = null
-  //default scheduler is FIFO
-  val schedulingMode: SchedulingMode = SchedulingMode.withName(System.getProperty("spark.cluster.schedulingmode", "FIFO"))
+  // default scheduler is FIFO
+  val schedulingMode: SchedulingMode = SchedulingMode.withName(
+    System.getProperty("spark.cluster.schedulingmode", "FIFO"))
 
   override def setListener(listener: TaskSchedulerListener) {
     this.listener = listener
@@ -107,7 +108,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   def initialize(context: SchedulerBackend) {
     backend = context
-    //temporarily set rootPool name to empty
+    // temporarily set rootPool name to empty
     rootPool = new Pool("", schedulingMode, 0, 0)
     schedulableBuilder = {
       schedulingMode match {
@@ -254,10 +255,11 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       }
       var launchedTask = false
       val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
-      for (manager <- sortedTaskSetQueue)
-      {
+
+      for (manager <- sortedTaskSetQueue) {
         logInfo("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks))
       }
+
       for (manager <- sortedTaskSetQueue) {
 
         // Split offers based on node local, rack local and off-rack tasks.
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 7a6a6b7826..4d11b0959a 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -91,7 +91,6 @@ private[spark] class ClusterTaskSetManager(
   var stageId = taskSet.stageId
   var name = "TaskSet_"+taskSet.stageId.toString
   var parent: Schedulable = null
-  var schedulableQueue :ArrayBuffer[Schedulable] = null
   // Last time when we launched a preferred task (for delay scheduling)
   var lastPreferredLaunchTime = System.currentTimeMillis
 
@@ -645,17 +644,17 @@ private[spark] class ClusterTaskSetManager(
     }
   }
 
-  //TODO: for now we just find Pool not TaskSetManager, we can extend this function in future if needed
+  // TODO: for now we just find Pool not TaskSetManager, we can extend this function in future if needed
   override def getSchedulableByName(name: String): Schedulable = {
     return null
   }
 
   override def addSchedulable(schedulable:Schedulable) {
-    //nothing
+    // nothing
   }
 
   override def removeSchedulable(schedulable:Schedulable) {
-    //nothing
+    // nothing
   }
 
   override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
index 2e4f14c11f..c410af8af4 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
@@ -9,7 +9,7 @@ import scala.collection.mutable.ArrayBuffer
  */
 private[spark] trait Schedulable {
   var parent: Schedulable
-  //childrens
+  // child queues
   def schedulableQueue: ArrayBuffer[Schedulable]
   def schedulingMode: SchedulingMode
   def weight: Int
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
index 18cc15c2a5..a2fa80aa36 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -27,7 +27,7 @@ private[spark] trait SchedulableBuilder {
 private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {
 
   override def buildPools() {
-    //nothing
+    // nothing
   }
 
   override def addTaskSetManager(manager: Schedulable, properties: Properties) {
@@ -86,7 +86,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
       }
     }
 
-    //finally create "default" pool
+    // finally create "default" pool
     if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
       val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
       rootPool.addSchedulable(pool)
@@ -102,7 +102,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
       poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
       parentPool = rootPool.getSchedulableByName(poolName)
       if (parentPool == null) {
-        //we will create a new pool that user has configured in app instead of being defined in xml file
+        // we will create a new pool that user has configured in app instead of being defined in xml file
         parentPool = new Pool(poolName,DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
         rootPool.addSchedulable(parentPool)
         logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
index c5c7ee3b22..a7f0f6f393 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
@@ -1,6 +1,10 @@
 package spark.scheduler.cluster
 
-object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE"){
+/**
+ *  "FAIR" and "FIFO" determines which policy is used to order tasks amongst a Schedulable's sub-queues
+ *  "NONE" is used when the a Schedulable has no sub-queues.
+ */
+object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") {
 
   type SchedulingMode = Value
   val FAIR,FIFO,NONE = Value
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 472e01b227..4e6bc51278 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -6,6 +6,7 @@ import spark.TaskState.TaskState
 import java.nio.ByteBuffer
 
 private[spark] trait TaskSetManager extends Schedulable {
+  def schedulableQueue = null
   def schedulingMode = SchedulingMode.NONE
   def taskSet: TaskSet
   def slaveOffer(execId: String, hostPort: String, availableCpus: Double,
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index 19a48895e3..f4411582f1 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -64,7 +64,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
 
   var schedulableBuilder: SchedulableBuilder = null
   var rootPool: Pool = null
-  val schedulingMode: SchedulingMode = SchedulingMode.withName(System.getProperty("spark.cluster.schedulingmode", "FIFO"))
+  val schedulingMode: SchedulingMode = SchedulingMode.withName(
+    System.getProperty("spark.cluster.schedulingmode", "FIFO"))
   val activeTaskSets = new HashMap[String, TaskSetManager]
   val taskIdToTaskSetId = new HashMap[Long, String]
   val taskSetTaskIds = new HashMap[String, HashSet[Long]]
@@ -72,7 +73,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
   var localActor: ActorRef = null
 
   override def start() {
-    //temporarily set rootPool name to empty
+    // temporarily set rootPool name to empty
     rootPool = new Pool("", schedulingMode, 0, 0)
     schedulableBuilder = {
       schedulingMode match {
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index 8954f40ea9..cc27f1ecca 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -14,7 +14,6 @@ import spark.scheduler.cluster._
 
 private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet) extends TaskSetManager with Logging {
   var parent: Schedulable = null
-  var schedulableQueue :ArrayBuffer[Schedulable] = null
   var weight: Int = 1
   var minShare: Int = 0
   var runningTasks: Int = 0
@@ -48,11 +47,11 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
   }
 
   def addSchedulable(schedulable: Schedulable): Unit = {
-    //nothing
+    // nothing
   }
 
   def removeSchedulable(schedulable: Schedulable): Unit = {
-    //nothing
+    // nothing
   }
 
   def getSchedulableByName(name: String): Schedulable = {
@@ -60,7 +59,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
   }
 
   def executorLost(executorId: String, host: String): Unit = {
-    //nothing
+    // nothing
   }
 
   def checkSpeculatableTasks(): Boolean = {
diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
index 8bbc6ce88e..840ac9773e 100644
--- a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
@@ -4,7 +4,8 @@ import scala.util.Random
 
 import spark.SparkContext
 import spark.SparkContext._
-
+import spark.scheduler.cluster.SchedulingMode
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 /**
  * Continuously generates jobs that expose various features of the WebUI (internal testing tool).
  *
@@ -15,8 +16,17 @@ private[spark] object UIWorkloadGenerator {
   val INTER_JOB_WAIT_MS = 500
 
   def main(args: Array[String]) {
+    if (args.length < 2) {
+      println("usage: ./run spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
+      System.exit(1)
+    }
     val master = args(0)
+    val schedulingMode = SchedulingMode.withName(args(1))
     val appName = "Spark UI Tester"
+
+    if (schedulingMode == SchedulingMode.FAIR) {
+      System.setProperty("spark.cluster.schedulingmode", "FAIR")
+    }
     val sc = new SparkContext(master, appName)
 
     // NOTE: Right now there is no easy way for us to show spark.job.annotation for a given phase,
@@ -56,14 +66,21 @@ private[spark] object UIWorkloadGenerator {
 
     while (true) {
       for ((desc, job) <- jobs) {
-        try {
-          setName(desc)
-          job()
-          println("Job funished: " + desc)
-        } catch {
-          case e: Exception =>
-            println("Job Failed: " + desc)
-        }
+        new Thread {
+          override def run() {
+            if(schedulingMode == SchedulingMode.FAIR) {
+              sc.addLocalProperties("spark.scheduler.cluster.fair.pool",desc)
+            }
+            try {
+              setName(desc)
+              job()
+              println("Job funished: " + desc)
+            } catch {
+              case e: Exception =>
+                println("Job Failed: " + desc)
+            }
+          }
+        }.start
         Thread.sleep(INTER_JOB_WAIT_MS)
       }
     }
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index e765cecb01..abef683791 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -4,38 +4,30 @@ import java.util.Date
 
 import javax.servlet.http.HttpServletRequest
 
-import scala.Some
-import scala.xml.{NodeSeq, Node}
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
+import scala.Some
+import scala.xml.{NodeSeq, Node}
 
 import spark.scheduler.Stage
+import spark.storage.StorageLevel
 import spark.ui.UIUtils._
 import spark.ui.Page._
-import spark.storage.StorageLevel
-import spark.scheduler.cluster.Schedulable
-import spark.scheduler.cluster.SchedulingMode
-import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 
 /** Page showing list of all ongoing and recently finished stages and pools*/
 private[spark] class IndexPage(parent: JobProgressUI) {
   def listener = parent.listener
 
-  def stageTable: StageTable = parent.stageTable
-
-  def poolTable: PoolTable = parent.poolTable
-
   def render(request: HttpServletRequest): Seq[Node] = {
     val activeStages = listener.activeStages.toSeq
     val completedStages = listener.completedStages.reverse.toSeq
     val failedStages = listener.failedStages.reverse.toSeq
 
-    stageTable.setStagePoolInfo(parent.stagePoolInfo)
-    poolTable.setPoolSource(parent.stagePagePoolSource)
+    val activeStagesTable = new StageTable(activeStages, parent)
+    val completedStagesTable = new StageTable(completedStages, parent)
+    val failedStagesTable = new StageTable(failedStages, parent)
 
-    val activeStageNodeSeq = stageTable.toNodeSeq(activeStages)
-    val completedStageNodeSeq = stageTable.toNodeSeq(completedStages)
-    val failedStageNodeSeq = stageTable.toNodeSeq(failedStages)
+    val poolTable = new PoolTable(parent.stagePagePoolSource, listener)
 
     val content = <div class="row">
                     <div class="span12">
@@ -48,9 +40,9 @@ private[spark] class IndexPage(parent: JobProgressUI) {
                     </div>
                   </div> ++
                   <h3>Pools </h3> ++ poolTable.toNodeSeq ++
-                  <h3>Active Stages : {activeStages.size}</h3> ++ activeStageNodeSeq++
-                  <h3>Completed Stages : {completedStages.size}</h3> ++ completedStageNodeSeq++
-                  <h3>Failed Stages : {failedStages.size}</h3> ++ failedStageNodeSeq
+                  <h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq++
+                  <h3>Completed Stages : {completedStages.size}</h3> ++ completedStagesTable.toNodeSeq++
+                  <h3>Failed Stages : {failedStages.size}</h3> ++ failedStagesTable.toNodeSeq
 
     headerSparkPage(content, parent.sc, "Spark Stages/Pools", Jobs)
   }
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
index 1244f9538b..d4767bea22 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
@@ -9,53 +9,13 @@ import spark.scheduler.cluster.TaskInfo
 import spark.executor.TaskMetrics
 import collection.mutable
 
-private[spark] class FairJobProgressListener(val sparkContext: SparkContext)
-  extends JobProgressListener(sparkContext) {
-
-  val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
-  val DEFAULT_POOL_NAME = "default"
-
-  override val stageToPool = HashMap[Stage, String]()
-  override val poolToActiveStages = HashMap[String, HashSet[Stage]]()
-
-  override def onStageCompleted(stageCompleted: StageCompleted) = {
-    super.onStageCompleted(stageCompleted)
-    val stage = stageCompleted.stageInfo.stage
-    poolToActiveStages(stageToPool(stage)) -= stage
-  }
-
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
-    super.onStageSubmitted(stageSubmitted)
-    val stage = stageSubmitted.stage
-    var poolName = DEFAULT_POOL_NAME
-    if (stageSubmitted.properties != null) {
-      poolName = stageSubmitted.properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
-    }
-    stageToPool(stage) = poolName
-    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
-    stages += stage
-  }
-  
-  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
-    super.onJobEnd(jobEnd)
-    jobEnd match {
-      case end: SparkListenerJobEnd =>
-        end.jobResult match {
-          case JobFailed(ex, Some(stage)) =>
-            poolToActiveStages(stageToPool(stage)) -= stage
-          case _ =>
-        }
-      case _ =>
-    }
-  }
-}
-
 private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
   // How many stages to remember
   val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
+  val DEFAULT_POOL_NAME = "default"
 
-  def stageToPool: HashMap[Stage, String] = null
-  def poolToActiveStages: HashMap[String, HashSet[Stage]] =null
+  val stageToPool = new HashMap[Stage, String]()
+  val poolToActiveStages = new HashMap[String, HashSet[Stage]]()
 
   val activeStages = HashSet[Stage]()
   val completedStages = ListBuffer[Stage]()
@@ -70,6 +30,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
 
   override def onStageCompleted(stageCompleted: StageCompleted) = {
     val stage = stageCompleted.stageInfo.stage
+    poolToActiveStages(stageToPool(stage)) -= stage
     activeStages -= stage
     completedStages += stage
     trimIfNecessary(completedStages)
@@ -86,8 +47,18 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
     }
   }
 
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
-    activeStages += stageSubmitted.stage
+  /** For FIFO, all stages are contained by "default" pool but "default" pool here is meaningless */
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
+    val stage = stageSubmitted.stage
+    activeStages += stage
+    var poolName = DEFAULT_POOL_NAME
+    if (stageSubmitted.properties != null) {
+      poolName = stageSubmitted.properties.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
+    }
+    stageToPool(stage) = poolName
+    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
+    stages += stage
+  }
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
     val sid = taskEnd.task.stageId
@@ -112,6 +83,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
         end.jobResult match {
           case JobFailed(ex, Some(stage)) =>
             activeStages -= stage
+            poolToActiveStages(stageToPool(stage)) -= stage
             failedStages += stage
             trimIfNecessary(failedStages)
           case _ =>
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index e610252242..5703b146df 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -29,26 +29,21 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
   private val stagePage = new StagePage(this)
   private val poolPage = new PoolPage(this)
 
-  var stageTable: StageTable = null
   var stagePoolInfo: StagePoolInfo = null
-  var poolTable: PoolTable = null
   var stagePagePoolSource: PoolSource = null
 
   def start() {
+    _listener = Some(new JobProgressListener(sc))
     sc.getSchedulingMode match {
       case SchedulingMode.FIFO =>
-        _listener = Some(new JobProgressListener(sc))
         stagePoolInfo = new FIFOStagePoolInfo()
         stagePagePoolSource = new FIFOSource()
       case SchedulingMode.FAIR =>
-        _listener = Some(new FairJobProgressListener(sc))
         stagePoolInfo = new FairStagePoolInfo(listener)
         stagePagePoolSource = new FairSource(sc)
     }
 
     sc.addSparkListener(listener)
-    stageTable = new StageTable(dateFmt, formatDuration, listener)
-    poolTable = new PoolTable(listener)
   }
 
   def formatDuration(ms: Long) = Utils.msDurationToString(ms)
diff --git a/core/src/main/scala/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
index 00703887c3..37d4f8fa6b 100644
--- a/core/src/main/scala/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
@@ -9,30 +9,23 @@ import spark.scheduler.Stage
 import spark.ui.UIUtils._
 import spark.ui.Page._
 
-/** Page showing specific pool details*/
+/** Page showing specific pool details */
 private[spark] class PoolPage(parent: JobProgressUI) {
   def listener = parent.listener
 
-  def stageTable: StageTable = parent.stageTable
-
-  def poolTable: PoolTable = parent.poolTable
-
   def render(request: HttpServletRequest): Seq[Node] = {
     val poolName = request.getParameter("poolname")
     val poolToActiveStages = listener.poolToActiveStages
     val activeStages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]).toSeq
-    val stageToPool = listener.stageToPool
 
     val poolDetailPoolSource = new PoolDetailSource(parent.sc, poolName)
-    poolTable.setPoolSource(poolDetailPoolSource)
+    val poolTable = new PoolTable(poolDetailPoolSource, listener)
 
-    stageTable.setStagePoolInfo(parent.stagePoolInfo)
+    val activeStagesTable = new StageTable(activeStages, parent)
 
-    val activeStageNodeSeq = stageTable.toNodeSeq(activeStages)
+    val content = <h3>Pool </h3> ++ poolTable.toNodeSeq() ++
+                  <h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq()
 
-    val content = <h3>Pool </h3> ++ poolTable.toNodeSeq ++
-                  <h3>Active Stages : {activeStages.size}</h3> ++ activeStageNodeSeq
-    
     headerSparkPage(content, parent.sc, "Spark Pool Details", Jobs)
   }
 }
diff --git a/core/src/main/scala/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
index bb8be4b26e..8788ed8bc1 100644
--- a/core/src/main/scala/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
@@ -27,7 +27,7 @@ private[spark] trait PoolSource {
 /*
  * Pool source for FIFO scheduler algorithm on Index page
  */
-private[spark] class FIFOSource() extends PoolSource{
+private[spark] class FIFOSource() extends PoolSource {
   def getPools: Seq[Schedulable] = {
     Seq[Schedulable]()
   }
@@ -36,16 +36,16 @@ private[spark] class FIFOSource() extends PoolSource{
 /*
  * Pool source for Fair scheduler algorithm on Index page
  */
-private[spark] class FairSource(sc: SparkContext) extends PoolSource{
+private[spark] class FairSource(sc: SparkContext) extends PoolSource {
   def getPools: Seq[Schedulable] = {
-    sc.getPoolsInfo.toSeq
+    sc.getPools.toSeq
   }
 }
 
 /*
  * specific pool info for pool detail page
  */
-private[spark] class PoolDetailSource(sc: SparkContext, poolName: String) extends PoolSource{
+private[spark] class PoolDetailSource(sc: SparkContext, poolName: String) extends PoolSource {
   def getPools: Seq[Schedulable] = {
     val pools = HashSet[Schedulable]()
     pools += sc.getPoolNameToPool(poolName)
@@ -54,21 +54,18 @@ private[spark] class PoolDetailSource(sc: SparkContext, poolName: String) extend
 }
 
 /** Table showing list of pools */
-private[spark] class PoolTable(listener: JobProgressListener) {
+private[spark] class PoolTable(poolSource: PoolSource, listener: JobProgressListener) {
 
-  var poolSource: PoolSource = null
   var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages
 
-  def toNodeSeq: Seq[Node] = {
+  def toNodeSeq(): Seq[Node] = {
     poolTable(poolRow, poolSource.getPools)
   }
 
-  def setPoolSource(poolSource: PoolSource) {
-    this.poolSource = poolSource
-  }
-
-  //pool tables
-  def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node], rows: Seq[Schedulable]): Seq[Node] = {
+  // pool tables
+  def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node],
+    rows: Seq[Schedulable]
+    ): Seq[Node] = {
     <table class="table table-bordered table-striped table-condensed sortable">
       <thead>
         <th>Pool Name</th>
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
index 83e566c55b..82fb0bd5cc 100644
--- a/core/src/main/scala/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala
@@ -44,18 +44,18 @@ private[spark] class FairStagePoolInfo(listener: JobProgressListener) extends St
 }
 
 /** Page showing list of all ongoing and recently finished stages */
-private[spark] class StageTable(val dateFmt: SimpleDateFormat, val formatDuration: Long => String, val listener: JobProgressListener) {
+private[spark] class StageTable(
+  val stages: Seq[Stage],
+  val parent: JobProgressUI) {
 
-  var stagePoolInfo: StagePoolInfo = null
+  val listener = parent.listener
+  val dateFmt = parent.dateFmt
+  var stagePoolInfo = parent.stagePoolInfo
   
-  def toNodeSeq(stages: Seq[Stage]): Seq[Node] = {
+  def toNodeSeq(): Seq[Node] = {
     stageTable(stageRow, stages)
   }
 
-  def setStagePoolInfo(stagePoolInfo: StagePoolInfo) {
-    this.stagePoolInfo = stagePoolInfo
-  }
-
   /** Special table which merges two header cells. */
   def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
      <table class="table table-bordered table-striped table-condensed sortable">
@@ -77,7 +77,7 @@ private[spark] class StageTable(val dateFmt: SimpleDateFormat, val formatDuratio
 
   def getElapsedTime(submitted: Option[Long], completed: Long): String = {
     submitted match {
-      case Some(t) => formatDuration(completed - t)
+      case Some(t) => parent.formatDuration(completed - t)
       case _ => "Unknown"
     }
   }
-- 
GitLab