Skip to content
Snippets Groups Projects
Commit 6d7afd7c authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Merge pull request #768 from pwendell/pr-695

Minor clean-up of fair scheduler UI
parents e466a55a 87fd321a
No related branches found
No related tags found
No related merge requests found
......@@ -579,12 +579,19 @@ class SparkContext(
/**
* Return pools for fair scheduler
* TODO(xiajunluan):now, we have not taken nested pools into account
* TODO(xiajunluan): We should take nested pools into account
*/
def getPools: ArrayBuffer[Schedulable] = {
def getAllPools: ArrayBuffer[Schedulable] = {
taskScheduler.rootPool.schedulableQueue
}
/**
* Return the pool associated with the given name, if one exists
*/
def getPoolForName(pool: String): Option[Schedulable] = {
taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)
}
/**
* Return current scheduling mode
*/
......@@ -592,10 +599,6 @@ class SparkContext(
taskScheduler.schedulingMode
}
def getPoolNameToPool: HashMap[String, Schedulable] = {
taskScheduler.rootPool.schedulableNameToSchedulable
}
/**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
......
......@@ -30,9 +30,7 @@ import spark.scheduler.cluster.SchedulingMode.SchedulingMode
*/
private[spark] object UIWorkloadGenerator {
val NUM_PARTITIONS = 100
val INTER_JOB_WAIT_MS = 500
val INTER_JOB_WAIT_MS = 5000
def main(args: Array[String]) {
if (args.length < 2) {
......
......@@ -17,16 +17,11 @@
package spark.ui.jobs
import java.util.Date
import javax.servlet.http.HttpServletRequest
import scala.Some
import scala.xml.{NodeSeq, Node}
import spark.scheduler.cluster.TaskInfo
import spark.scheduler.Stage
import spark.storage.StorageLevel
import spark.scheduler.cluster.SchedulingMode
import spark.ui.Page._
import spark.ui.UIUtils._
import spark.Utils
......@@ -50,7 +45,7 @@ private[spark] class IndexPage(parent: JobProgressUI) {
val completedStagesTable = new StageTable(completedStages, parent)
val failedStagesTable = new StageTable(failedStages, parent)
val poolTable = new PoolTable(parent.stagePagePoolSource, listener)
val poolTable = new PoolTable(listener.sc.getAllPools, listener)
val summary: NodeSeq =
<div>
<ul class="unstyled">
......@@ -79,13 +74,17 @@ private[spark] class IndexPage(parent: JobProgressUI) {
</div>
val content = summary ++
<h3>Pools </h3> ++ poolTable.toNodeSeq ++
<h3>Active Stages : {activeStages.size}</h3> ++
activeStagesTable.toNodeSeq++
<h3>Completed Stages : {completedStages.size}</h3> ++
completedStagesTable.toNodeSeq++
<h3>Failed Stages : {failedStages.size}</h3> ++
failedStagesTable.toNodeSeq
{if (listener.sc.getSchedulingMode == SchedulingMode.FAIR) {
<h3>Pools</h3> ++ poolTable.toNodeSeq
} else {
Seq()
}} ++
<h3>Active Stages : {activeStages.size}</h3> ++
activeStagesTable.toNodeSeq++
<h3>Completed Stages : {completedStages.size}</h3> ++
completedStagesTable.toNodeSeq++
<h3>Failed Stages : {failedStages.size}</h3> ++
failedStagesTable.toNodeSeq
headerSparkPage(content, parent.sc, "Spark Stages", Jobs)
}
......
......@@ -41,25 +41,12 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
def listener = _listener.get
val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
private val indexPage = new IndexPage(this)
private val stagePage = new StagePage(this)
private val poolPage = new PoolPage(this)
var stagePoolInfo: StagePoolInfo = null
var stagePagePoolSource: PoolSource = null
def start() {
_listener = Some(new JobProgressListener(sc))
sc.getSchedulingMode match {
case SchedulingMode.FIFO =>
stagePoolInfo = new FIFOStagePoolInfo()
stagePagePoolSource = new FIFOSource()
case SchedulingMode.FAIR =>
stagePoolInfo = new FairStagePoolInfo(listener)
stagePagePoolSource = new FairSource(sc)
}
sc.addSparkListener(listener)
}
......
......@@ -17,12 +17,11 @@ private[spark] class PoolPage(parent: JobProgressUI) {
val poolName = request.getParameter("poolname")
val poolToActiveStages = listener.poolToActiveStages
val activeStages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]).toSeq
val poolDetailPoolSource = new PoolDetailSource(parent.sc, poolName)
val poolTable = new PoolTable(poolDetailPoolSource, listener)
val activeStagesTable = new StageTable(activeStages, parent)
val pool = listener.sc.getPoolForName(poolName).get
val poolTable = new PoolTable(Seq(pool), listener)
val content = <h3>Pool </h3> ++ poolTable.toNodeSeq() ++
<h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq()
......
package spark.ui.jobs
import java.util.Date
import javax.servlet.http.HttpServletRequest
import scala.Some
import scala.xml.{NodeSeq, Node}
import scala.xml.Node
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import spark.SparkContext
import spark.scheduler.Stage
import spark.ui.UIUtils._
import spark.ui.Page._
import spark.storage.StorageLevel
import spark.scheduler.cluster.Schedulable
/*
* Interface for get pools seq showing on Index or pool detail page
*/
private[spark] trait PoolSource {
def getPools: Seq[Schedulable]
}
/*
* Pool source for FIFO scheduler algorithm on Index page
*/
private[spark] class FIFOSource() extends PoolSource {
def getPools: Seq[Schedulable] = {
Seq[Schedulable]()
}
}
/*
* Pool source for Fair scheduler algorithm on Index page
*/
private[spark] class FairSource(sc: SparkContext) extends PoolSource {
def getPools: Seq[Schedulable] = {
sc.getPools.toSeq
}
}
/*
* specific pool info for pool detail page
*/
private[spark] class PoolDetailSource(sc: SparkContext, poolName: String) extends PoolSource {
def getPools: Seq[Schedulable] = {
val pools = HashSet[Schedulable]()
pools += sc.getPoolNameToPool(poolName)
pools.toSeq
}
}
/** Table showing list of pools */
private[spark] class PoolTable(poolSource: PoolSource, listener: JobProgressListener) {
private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressListener) {
var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages
def toNodeSeq(): Seq[Node] = {
poolTable(poolRow, poolSource.getPools)
poolTable(poolRow, pools)
}
// pool tables
......
......@@ -10,51 +10,20 @@ import scala.xml.{NodeSeq, Node}
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import spark.scheduler.cluster.TaskInfo
import spark.scheduler.cluster.{SchedulingMode, TaskInfo}
import spark.scheduler.Stage
import spark.ui.UIUtils._
import spark.ui.Page._
import spark.Utils
import spark.storage.StorageLevel
/*
* Interface to get stage's pool name
*/
private[spark] trait StagePoolInfo {
def getStagePoolName(s: Stage): String
def hasHref: Boolean
}
/*
* For FIFO scheduler algorithm, just show "N/A" and its link status is false
*/
private[spark] class FIFOStagePoolInfo extends StagePoolInfo {
def getStagePoolName(s: Stage): String = "N/A"
def hasHref: Boolean = false
}
/*
* For Fair scheduler algorithm, show its pool name and pool detail link status is true
*/
private[spark] class FairStagePoolInfo(listener: JobProgressListener) extends StagePoolInfo {
def getStagePoolName(s: Stage): String = {
listener.stageToPool(s)
}
def hasHref: Boolean = true
}
/** Page showing list of all ongoing and recently finished stages */
private[spark] class StageTable(
val stages: Seq[Stage],
val parent: JobProgressUI) {
private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressUI) {
val listener = parent.listener
val dateFmt = parent.dateFmt
var stagePoolInfo = parent.stagePoolInfo
val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
def toNodeSeq(): Seq[Node] = {
stageTable(stageRow, stages)
}
......@@ -64,7 +33,7 @@ private[spark] class StageTable(
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<th>Stage Id</th>
<th>Pool Name</th>
{if (isFairScheduler) {<th>Pool Name</th>} else {}}
<th>Origin</th>
<th>Submitted</th>
<td>Duration</td>
......@@ -116,15 +85,13 @@ private[spark] class StageTable(
val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
val totalTasks = s.numPartitions
val poolName = stagePoolInfo.getStagePoolName(s)
val poolName = listener.stageToPool.get(s)
<tr>
<td>{s.id}</td>
<td>{if (stagePoolInfo.hasHref) {
<a href={"/stages/pool?poolname=%s".format(poolName)}>{poolName}</a>
} else {
{poolName}
}}</td>
{if (isFairScheduler) {
<td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>}
}
<td><a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a></td>
<td>{submissionTime}</td>
<td>{getElapsedTime(s.submissionTime,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment