diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 0d1f9fa8d4f125ee6d03a9e8e0a821220ae9f5b3..97e1aaf49ec6ae506f5444c3ca872f1dc4e654fd 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -579,12 +579,19 @@ class SparkContext(
 
   /**
    * Return pools for fair scheduler
-   * TODO(xiajunluan):now, we have not taken nested pools into account
+   * TODO(xiajunluan): We should take nested pools into account
    */
-  def getPools: ArrayBuffer[Schedulable] = {
+  def getAllPools: ArrayBuffer[Schedulable] = {
     taskScheduler.rootPool.schedulableQueue
   }
 
+  /**
+   * Return the pool associated with the given name, if one exists
+   */
+  def getPoolForName(pool: String): Option[Schedulable] = {
+    taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)
+  }
+
   /**
    * Return current scheduling mode
    */
@@ -592,10 +599,6 @@ class SparkContext(
     taskScheduler.schedulingMode
   }
 
-  def getPoolNameToPool: HashMap[String, Schedulable] = {
-    taskScheduler.rootPool.schedulableNameToSchedulable
-  }
-
   /**
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
   * any new nodes.
diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
index 4fbb503e5ce7573a03644d9c88bbb43021ab398f..3ac35085eb92b63dea36492ed82cfabb38fe572c 100644
--- a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
@@ -30,9 +30,7 @@ import spark.scheduler.cluster.SchedulingMode.SchedulingMode
  */
 private[spark] object UIWorkloadGenerator {
   val NUM_PARTITIONS = 100
-  val INTER_JOB_WAIT_MS = 500
-
-
+  val INTER_JOB_WAIT_MS = 5000
 
   def main(args: Array[String]) {
     if (args.length < 2) {
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index 4ad787565df86917c4516c0f2ab3bb887c22ac85..b0d057afa16d773075a71b2660237b3936f96a2b 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -17,16 +17,11 @@
 
 package spark.ui.jobs
 
-import java.util.Date
-
 import javax.servlet.http.HttpServletRequest
 
-import scala.Some
 import scala.xml.{NodeSeq, Node}
 
-import spark.scheduler.cluster.TaskInfo
-import spark.scheduler.Stage
-import spark.storage.StorageLevel
+import spark.scheduler.cluster.SchedulingMode
 import spark.ui.Page._
 import spark.ui.UIUtils._
 import spark.Utils
@@ -50,7 +45,7 @@ private[spark] class IndexPage(parent: JobProgressUI) {
     val completedStagesTable = new StageTable(completedStages, parent)
     val failedStagesTable = new StageTable(failedStages, parent)
 
-    val poolTable = new PoolTable(parent.stagePagePoolSource, listener)
+    val poolTable = new PoolTable(listener.sc.getAllPools, listener)
     val summary: NodeSeq =
       <div>
         <ul class="unstyled">
@@ -79,13 +74,17 @@ private[spark] class IndexPage(parent: JobProgressUI) {
         </div>
 
     val content = summary ++
-      <h3>Pools </h3> ++ poolTable.toNodeSeq ++
-      <h3>Active Stages : {activeStages.size}</h3> ++
-      activeStagesTable.toNodeSeq++
-      <h3>Completed Stages : {completedStages.size}</h3> ++
-      completedStagesTable.toNodeSeq++
-      <h3>Failed Stages : {failedStages.size}</h3> ++
-      failedStagesTable.toNodeSeq
+      {if (listener.sc.getSchedulingMode == SchedulingMode.FAIR) {
+        <h3>Pools</h3> ++ poolTable.toNodeSeq
+      } else {
+        Seq()
+      }} ++
+      <h3>Active Stages : {activeStages.size}</h3> ++
+      activeStagesTable.toNodeSeq++
+      <h3>Completed Stages : {completedStages.size}</h3> ++
+      completedStagesTable.toNodeSeq++
+      <h3>Failed Stages : {failedStages.size}</h3> ++
+      failedStagesTable.toNodeSeq
 
     headerSparkPage(content, parent.sc, "Spark Stages", Jobs)
   }
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index 3832c5d33c631089fc3929e4a8d661ed9b5efd43..c83f102ff32ff024dc1c776741209176bdeaf1f3 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -41,25 +41,12 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
   def listener = _listener.get
   val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
 
-
   private val indexPage = new IndexPage(this)
   private val stagePage = new StagePage(this)
   private val poolPage = new PoolPage(this)
-  var stagePoolInfo: StagePoolInfo = null
-  var stagePagePoolSource: PoolSource = null
-
 
   def start() {
     _listener = Some(new JobProgressListener(sc))
-    sc.getSchedulingMode match {
-      case SchedulingMode.FIFO =>
-        stagePoolInfo = new FIFOStagePoolInfo()
-        stagePagePoolSource = new FIFOSource()
-      case SchedulingMode.FAIR =>
-        stagePoolInfo = new FairStagePoolInfo(listener)
-        stagePagePoolSource = new FairSource(sc)
-    }
-
     sc.addSparkListener(listener)
   }
 
diff --git a/core/src/main/scala/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
index 37d4f8fa6bea57e05e638bbf51df8ef937da62f1..ee5a6a6a48bec821fce2ec5e93b302f30aadffff 100644
--- a/core/src/main/scala/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
@@ -17,12 +17,11 @@ private[spark] class PoolPage(parent: JobProgressUI) {
     val poolName = request.getParameter("poolname")
     val poolToActiveStages = listener.poolToActiveStages
     val activeStages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]).toSeq
-
-    val poolDetailPoolSource = new PoolDetailSource(parent.sc, poolName)
-    val poolTable = new PoolTable(poolDetailPoolSource, listener)
-
     val activeStagesTable = new StageTable(activeStages, parent)
 
+    val pool = listener.sc.getPoolForName(poolName).get
+    val poolTable = new PoolTable(Seq(pool), listener)
+
     val content = <h3>Pool </h3> ++ poolTable.toNodeSeq() ++
                   <h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq()
 
diff --git a/core/src/main/scala/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
index 8788ed8bc151092d16eaaaf366a51870fefac17c..9cfe0d68f0eb6f921e5b70d2c92be07023ec6594 100644
--- a/core/src/main/scala/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
@@ -1,65 +1,19 @@
 package spark.ui.jobs
 
-import java.util.Date
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.Some
-import scala.xml.{NodeSeq, Node}
+import scala.xml.Node
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
 
-import spark.SparkContext
 import spark.scheduler.Stage
-import spark.ui.UIUtils._
-import spark.ui.Page._
-import spark.storage.StorageLevel
 import spark.scheduler.cluster.Schedulable
 
-/*
- * Interface for get pools seq showing on Index or pool detail page
- */
-
-private[spark] trait PoolSource {
-  def getPools: Seq[Schedulable]
-}
-
-/*
- * Pool source for FIFO scheduler algorithm on Index page
- */
-private[spark] class FIFOSource() extends PoolSource {
-  def getPools: Seq[Schedulable] = {
-    Seq[Schedulable]()
-  }
-}
-
-/*
- * Pool source for Fair scheduler algorithm on Index page
- */
-private[spark] class FairSource(sc: SparkContext) extends PoolSource {
-  def getPools: Seq[Schedulable] = {
-    sc.getPools.toSeq
-  }
-}
-
-/*
- * specific pool info for pool detail page
- */
-private[spark] class PoolDetailSource(sc: SparkContext, poolName: String) extends PoolSource {
-  def getPools: Seq[Schedulable] = {
-    val pools = HashSet[Schedulable]()
-    pools += sc.getPoolNameToPool(poolName)
-    pools.toSeq
-  }
-}
-
 /** Table showing list of pools */
-private[spark] class PoolTable(poolSource: PoolSource, listener: JobProgressListener) {
+private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressListener) {
   var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages
 
   def toNodeSeq(): Seq[Node] = {
-    poolTable(poolRow, poolSource.getPools)
+    poolTable(poolRow, pools)
   }
 
   // pool tables
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
index e18b70f0b93a2809e7e9f30ca0d371478a4eec7b..3257f4e36050a1fad5662e5063a425927bd52a26 100644
--- a/core/src/main/scala/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala
@@ -10,51 +10,20 @@ import scala.xml.{NodeSeq, Node}
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
 
-import spark.scheduler.cluster.TaskInfo
+import spark.scheduler.cluster.{SchedulingMode, TaskInfo}
 import spark.scheduler.Stage
 import spark.ui.UIUtils._
 import spark.ui.Page._
 import spark.Utils
 import spark.storage.StorageLevel
 
-/*
- * Interface to get stage's pool name
- */
-private[spark] trait StagePoolInfo {
-  def getStagePoolName(s: Stage): String
-
-  def hasHref: Boolean
-}
-
-/*
- * For FIFO scheduler algorithm, just show "N/A" and its link status is false
- */
-private[spark] class FIFOStagePoolInfo extends StagePoolInfo {
-  def getStagePoolName(s: Stage): String = "N/A"
-
-  def hasHref: Boolean = false
-}
-
-/*
- * For Fair scheduler algorithm, show its pool name and pool detail link status is true
- */
-private[spark] class FairStagePoolInfo(listener: JobProgressListener) extends StagePoolInfo {
-  def getStagePoolName(s: Stage): String = {
-    listener.stageToPool(s)
-  }
-
-  def hasHref: Boolean = true
-}
-
 /** Page showing list of all ongoing and recently finished stages */
-private[spark] class StageTable(
-  val stages: Seq[Stage],
-  val parent: JobProgressUI) {
+private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressUI) {
 
   val listener = parent.listener
   val dateFmt = parent.dateFmt
-  var stagePoolInfo = parent.stagePoolInfo
-
+  val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
+
   def toNodeSeq(): Seq[Node] = {
     stageTable(stageRow, stages)
   }
@@ -64,7 +33,7 @@ private[spark] class StageTable(
     <table class="table table-bordered table-striped table-condensed sortable">
       <thead>
         <th>Stage Id</th>
-        <th>Pool Name</th>
+        {if (isFairScheduler) {<th>Pool Name</th>} else {}}
        <th>Origin</th>
         <th>Submitted</th>
         <td>Duration</td>
@@ -116,15 +85,13 @@ private[spark] class StageTable(
     val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
     val totalTasks = s.numPartitions
 
-    val poolName = stagePoolInfo.getStagePoolName(s)
+    val poolName = listener.stageToPool.get(s)
 
     <tr>
       <td>{s.id}</td>
-      <td>{if (stagePoolInfo.hasHref) {
-        <a href={"/stages/pool?poolname=%s".format(poolName)}>{poolName}</a>
-        } else {
-        {poolName}
-      }}</td>
+      {if (isFairScheduler) {
+        <td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>}
+      }
       <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a></td>
       <td>{submissionTime}</td>
       <td>{getElapsedTime(s.submissionTime,