From 2080e250060975a876a388eb785e7f2b3cf2c0cd Mon Sep 17 00:00:00 2001
From: Andrew xia <junluan.xia@intel.com>
Date: Fri, 12 Jul 2013 14:25:18 +0800
Subject: [PATCH] Enhance the job UI in the Spark UI system by adding pool
 information

---
 core/src/main/scala/spark/SparkContext.scala  |  17 ++-
 .../scala/spark/scheduler/DAGScheduler.scala  |   4 +-
 .../scala/spark/scheduler/JobLogger.scala     |   2 +-
 .../scala/spark/scheduler/SparkListener.scala |   2 +-
 .../scala/spark/scheduler/TaskScheduler.scala |   7 +
 .../scheduler/cluster/ClusterScheduler.scala  |  11 +-
 .../cluster/ClusterTaskSetManager.scala       |   4 +-
 .../spark/scheduler/cluster/Schedulable.scala |   6 +-
 .../scheduler/cluster/SchedulingMode.scala    |   4 +-
 .../scheduler/cluster/TaskSetManager.scala    |   1 +
 .../scheduler/local/LocalScheduler.scala      |  10 +-
 .../scheduler/local/LocalTaskSetManager.scala |   1 +
 .../main/scala/spark/ui/jobs/IndexPage.scala  | 121 ++++-----
 .../spark/ui/jobs/JobProgressListener.scala   | 140 +++++++++++++++++
 .../scala/spark/ui/jobs/JobProgressUI.scala   | 112 +++-----
 .../main/scala/spark/ui/jobs/PoolPage.scala   |  38 +++++
 .../main/scala/spark/ui/jobs/PoolTable.scala  |  98 ++++++++++++
 .../main/scala/spark/ui/jobs/StageTable.scala | 143 ++++++++++++++++++
 .../spark/scheduler/DAGSchedulerSuite.scala   |   6 +
 .../spark/scheduler/JobLoggerSuite.scala      |   2 +-
 20 files changed, 530 insertions(+), 199 deletions(-)
 create mode 100644 core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
 create mode 100644 core/src/main/scala/spark/ui/jobs/PoolPage.scala
 create mode 100644 core/src/main/scala/spark/ui/jobs/PoolTable.scala
 create mode 100644 core/src/main/scala/spark/ui/jobs/StageTable.scala

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 43e6af2351..b5225d5681 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -10,6 +10,7 @@ import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
+import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 import scala.util.DynamicVariable
 import scala.collection.mutable.{ConcurrentMap, HashMap}
@@ -43,13 +44,14 @@ import org.apache.mesos.MesosNativeLibrary
 
 import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import spark.partial.{ApproximateEvaluator, PartialResult}
 import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
-import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
-import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
+import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
+import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler, Schedulable}
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
 import spark.util.{MetadataCleaner, TimeStampedHashMap}
 import ui.{SparkUI}
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -540,6 +542,17 @@ class SparkContext(
     env.blockManager.master.getStorageStatus
   }
 
+  def getPoolsInfo: ArrayBuffer[Schedulable] = {
+    taskScheduler.rootPool.schedulableQueue
+  }
+
+  def getSchedulingMode: SchedulingMode = {
+    taskScheduler.schedulingMode
+  }
+
+  def getPoolNameToPool: HashMap[String, Schedulable] = {
+    taskScheduler.rootPool.schedulableNameToSchedulable
+  }
   /**
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 3d3b9ea011..c865743e37 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -472,11 +472,11 @@ class DAGScheduler(
       }
     }
     if (tasks.size > 0) {
-      sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size)))
+      val properties = idToActiveJob(stage.priority).properties
+      sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size, properties)))
       logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
       myPending ++= tasks
       logDebug("New pending tasks: " + myPending)
-      val properties = idToActiveJob(stage.priority).properties
       taskSched.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, properties))
       if (!stage.submissionTime.isDefined) {
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index 6a9d52f356..8e5540873f 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -45,7 +45,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
     event match {
       case SparkListenerJobStart(job, properties) =>
         processJobStartEvent(job, properties)
-      case SparkListenerStageSubmitted(stage, taskSize) =>
+      case SparkListenerStageSubmitted(stage, taskSize, properties) =>
         processStageSubmittedEvent(stage, taskSize)
       case StageCompleted(stageInfo) =>
         processStageCompletedEvent(stageInfo)
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 8de3aa91a4..94fdad9b98 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -8,7 +8,7 @@ import spark.executor.TaskMetrics
 
 sealed trait SparkListenerEvents
 
-case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends SparkListenerEvents
+case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties = null) extends SparkListenerEvents
 
 case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
diff --git a/core/src/main/scala/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/spark/scheduler/TaskScheduler.scala
index 7787b54762..5cdf846032 100644
--- a/core/src/main/scala/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/TaskScheduler.scala
@@ -1,5 +1,7 @@
 package spark.scheduler
 
+import spark.scheduler.cluster.Pool
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 /**
  * Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
  * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
@@ -8,6 +10,11 @@ package spark.scheduler
  * the TaskSchedulerListener interface.
  */
 private[spark] trait TaskScheduler {
+
+  def rootPool: Pool
+
+  def schedulingMode: SchedulingMode
+
   def start(): Unit
 
   // Invoked after system has successfully initialized (typically in spark context).
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 3a0c29b27f..1b23fd6cef 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -12,6 +12,7 @@ import spark.scheduler._
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong
 import java.util.{TimerTask, Timer}
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 
 /**
  * The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
@@ -97,6 +98,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   var schedulableBuilder: SchedulableBuilder = null
   var rootPool: Pool = null
+  // the default scheduling mode is FIFO
+  val schedulingMode: SchedulingMode = SchedulingMode.withName(System.getProperty("spark.cluster.schedulingmode", "FIFO"))
 
   override def setListener(listener: TaskSchedulerListener) {
     this.listener = listener
@@ -104,15 +107,13 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   def initialize(context: SchedulerBackend) {
     backend = context
-    //default scheduler is FIFO
-    val schedulingMode = System.getProperty("spark.cluster.schedulingmode", "FIFO")
     //temporarily set rootPool name to empty
-    rootPool = new Pool("", SchedulingMode.withName(schedulingMode), 0, 0)
+    rootPool = new Pool("", schedulingMode, 0, 0)
     schedulableBuilder = {
       schedulingMode match {
-        case "FIFO" =>
+        case SchedulingMode.FIFO =>
           new FIFOSchedulableBuilder(rootPool)
-        case "FAIR" =>
+        case SchedulingMode.FAIR =>
           new FairSchedulableBuilder(rootPool)
       }
     }
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index fe6420a522..7a6a6b7826 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -90,8 +90,8 @@ private[spark] class ClusterTaskSetManager(
   var priority = taskSet.priority
   var stageId = taskSet.stageId
   var name = "TaskSet_"+taskSet.stageId.toString
-  var parent:Schedulable = null
-
+  var parent: Schedulable = null
+  var schedulableQueue: ArrayBuffer[Schedulable] = null
   // Last time when we launched a preferred task (for delay scheduling)
   var lastPreferredLaunchTime = System.currentTimeMillis
diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
index 2dd9c0564f..2e4f14c11f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
@@ -1,13 +1,17 @@
 package spark.scheduler.cluster
 
-import scala.collection.mutable.ArrayBuffer
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 
+import scala.collection.mutable.ArrayBuffer
 /**
  * An interface for schedulable entities.
  * there are two type of Schedulable entities(Pools and TaskSetManagers)
 */
 private[spark] trait Schedulable {
   var parent: Schedulable
+  // children of this schedulable
+  def schedulableQueue: ArrayBuffer[Schedulable]
+  def schedulingMode: SchedulingMode
   def weight: Int
   def minShare: Int
   def runningTasks: Int
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
index 6e0c6793e0..c5c7ee3b22 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
@@ -1,7 +1,7 @@
 package spark.scheduler.cluster
 
-object SchedulingMode extends Enumeration("FAIR","FIFO"){
+object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") {
 
   type SchedulingMode = Value
-  val FAIR,FIFO = Value
+  val FAIR, FIFO, NONE = Value
 }
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index b4dd75d90f..472e01b227 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -6,6 +6,7 @@ import spark.TaskState.TaskState
 import java.nio.ByteBuffer
 
 private[spark] trait TaskSetManager extends Schedulable {
+  def schedulingMode = SchedulingMode.NONE
   def taskSet: TaskSet
   def slaveOffer(execId: String, hostPort: String, availableCpus: Double,
     overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription]
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index b000e328e6..19a48895e3 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -12,6 +12,7 @@ import spark.TaskState.TaskState
 import spark.executor.ExecutorURLClassLoader
 import spark.scheduler._
 import spark.scheduler.cluster._
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 import akka.actor._
 
 /**
@@ -63,6 +64,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
 
   var schedulableBuilder: SchedulableBuilder = null
   var rootPool: Pool = null
+  val schedulingMode: SchedulingMode = SchedulingMode.withName(System.getProperty("spark.cluster.schedulingmode", "FIFO"))
   val activeTaskSets = new HashMap[String, TaskSetManager]
   val taskIdToTaskSetId = new HashMap[Long, String]
   val taskSetTaskIds = new HashMap[String, HashSet[Long]]
@@ -70,15 +72,13 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
   var localActor: ActorRef = null
 
   override def start() {
-    //default scheduler is FIFO
-    val schedulingMode = System.getProperty("spark.cluster.schedulingmode", "FIFO")
     //temporarily set rootPool name to empty
-    rootPool = new Pool("", SchedulingMode.withName(schedulingMode), 0, 0)
+    rootPool = new Pool("", schedulingMode, 0, 0)
     schedulableBuilder = {
       schedulingMode match {
-        case "FIFO" =>
+        case SchedulingMode.FIFO =>
           new FIFOSchedulableBuilder(rootPool)
-        case "FAIR" =>
+        case SchedulingMode.FAIR =>
           new FairSchedulableBuilder(rootPool)
       }
     }
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index f12fec41d5..8954f40ea9 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -14,6 +14,7 @@ import spark.scheduler.cluster._
 
 private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet) extends TaskSetManager with Logging {
   var parent: Schedulable = null
+  var schedulableQueue: ArrayBuffer[Schedulable] = null
   var weight: Int = 1
   var minShare: Int = 0
   var runningTasks: Int = 0
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index 1e675ab2cb..e765cecb01 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -6,107 +6,52 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.Some
 import scala.xml.{NodeSeq, Node}
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
 
 import spark.scheduler.Stage
 import spark.ui.UIUtils._
 import spark.ui.Page._
 import spark.storage.StorageLevel
+import spark.scheduler.cluster.Schedulable
+import spark.scheduler.cluster.SchedulingMode
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 
-/** Page showing list of all ongoing and recently finished stages */
+/** Page showing list of all ongoing and recently finished stages and pools */
 private[spark] class IndexPage(parent: JobProgressUI) {
   def listener = parent.listener
-  val dateFmt = parent.dateFmt
+
+  def stageTable: StageTable = parent.stageTable
+
+  def poolTable: PoolTable = parent.poolTable
 
   def render(request: HttpServletRequest): Seq[Node] = {
     val activeStages = listener.activeStages.toSeq
     val completedStages = listener.completedStages.reverse.toSeq
     val failedStages = listener.failedStages.reverse.toSeq
 
-    /** Special table which merges two header cells. */
-    def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
-      <table class="table table-bordered table-striped table-condensed sortable">
-        <thead>
-          <th>Stage Id</th>
-          <th>Origin</th>
-          <th>Submitted</th>
-          <td>Duration</td>
-          <td colspan="2">Tasks: Complete/Total</td>
-          <td>Shuffle Activity</td>
-          <td>Stored RDD</td>
-        </thead>
-        <tbody>
-          {rows.map(r => makeRow(r))}
-        </tbody>
-      </table>
-    }
-
-    val activeStageTable: NodeSeq = stageTable(stageRow, activeStages)
-    val completedStageTable = stageTable(stageRow, completedStages)
-    val failedStageTable: NodeSeq = stageTable(stageRow, failedStages)
-
-    val content = <h2>Active Stages</h2> ++ activeStageTable ++
-                  <h2>Completed Stages</h2> ++ completedStageTable ++
-                  <h2>Failed Stages</h2> ++ failedStageTable
-
-    headerSparkPage(content, parent.sc, "Spark Stages", Jobs)
-  }
-
-  def getElapsedTime(submitted: Option[Long], completed: Long): String = {
-    submitted match {
-      case Some(t) => parent.formatDuration(completed - t)
-      case _ => "Unknown"
-    }
-  }
-
-  def makeProgressBar(completed: Int, total: Int): Seq[Node] = {
-    val width=130
-    val height=15
-    val completeWidth = (completed.toDouble / total) * width
-
-    <svg width={width.toString} height={height.toString}>
-      <rect width={width.toString} height={height.toString}
-            fill="white" stroke="rgb(51,51,51)" stroke-width="1" />
-      <rect width={completeWidth.toString} height={height.toString}
-            fill="rgb(0,136,204)" stroke="black" stroke-width="1" />
-    </svg>
-  }
-
-  def stageRow(s: Stage): Seq[Node] = {
-    val submissionTime = s.submissionTime match {
-      case Some(t) => dateFmt.format(new Date(t))
-      case None => "Unknown"
-    }
-    val (read, write) = (listener.hasShuffleRead(s.id), listener.hasShuffleWrite(s.id))
-    val shuffleInfo = (read, write) match {
-      case (true, true) => "Read/Write"
-      case (true, false) => "Read"
-      case (false, true) => "Write"
-      case _ => ""
-    }
-    val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
-    val totalTasks = s.numPartitions
-
-    <tr>
-      <td>{s.id}</td>
-      <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.origin}</a></td>
-      <td>{submissionTime}</td>
-      <td>{getElapsedTime(s.submissionTime,
-             s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
-      <td class="progress-cell">{makeProgressBar(completedTasks, totalTasks)}</td>
-      <td style="border-left: 0; text-align: center;">{completedTasks} / {totalTasks}
-        {listener.stageToTasksFailed.getOrElse(s.id, 0) match {
-          case f if f > 0 => "(%s failed)".format(f)
-          case _ =>
-        }}
-      </td>
-      <td>{shuffleInfo}</td>
-      <td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) {
-             <a href={"/storage/rdd?id=%s".format(s.rdd.id)}>
-               {Option(s.rdd.name).getOrElse(s.rdd.id)}
-             </a>
-          }}
-      </td>
-    </tr>
+    stageTable.setStagePoolInfo(parent.stagePoolInfo)
+    poolTable.setPoolSource(parent.stagePagePoolSource)
+
+    val activeStageNodeSeq = stageTable.toNodeSeq(activeStages)
+    val completedStageNodeSeq = stageTable.toNodeSeq(completedStages)
+    val failedStageNodeSeq = stageTable.toNodeSeq(failedStages)
+
+    val content = <div class="row">
+                    <div class="span12">
+                      <ul class="unstyled">
+                        <li><strong>Active Stages Number:</strong> {activeStages.size} </li>
+                        <li><strong>Completed Stages Number:</strong> {completedStages.size} </li>
+                        <li><strong>Failed Stages Number:</strong> {failedStages.size} </li>
+                        <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
+                      </ul>
+                    </div>
+                  </div> ++
+                  <h3>Pools</h3> ++ poolTable.toNodeSeq ++
+                  <h3>Active Stages: {activeStages.size}</h3> ++ activeStageNodeSeq ++
+                  <h3>Completed Stages: {completedStages.size}</h3> ++ completedStageNodeSeq ++
+                  <h3>Failed Stages: {failedStages.size}</h3> ++ failedStageNodeSeq
+
+    headerSparkPage(content, parent.sc, "Spark Stages/Pools", Jobs)
   }
 }
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
new file mode 100644
index 0000000000..1244f9538b
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
@@ -0,0 +1,140 @@
+package spark.ui.jobs
+
+import scala.Seq
+import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
+
+import spark.{ExceptionFailure, SparkContext, Success, Utils}
+import spark.scheduler._
+import spark.scheduler.cluster.TaskInfo
+import spark.executor.TaskMetrics
+import collection.mutable
+
+private[spark] class FairJobProgressListener(val sparkContext: SparkContext)
+  extends JobProgressListener(sparkContext) {
+
+  val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
+  val DEFAULT_POOL_NAME = "default"
+
+  override val stageToPool = HashMap[Stage, String]()
+  override val poolToActiveStages = HashMap[String, HashSet[Stage]]()
+
+  override def onStageCompleted(stageCompleted: StageCompleted) = {
+    super.onStageCompleted(stageCompleted)
+    val stage = stageCompleted.stageInfo.stage
+    poolToActiveStages(stageToPool(stage)) -= stage
+  }
+
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
+    super.onStageSubmitted(stageSubmitted)
+    val stage = stageSubmitted.stage
+    var poolName = DEFAULT_POOL_NAME
+    if (stageSubmitted.properties != null) {
+      poolName = stageSubmitted.properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
+    }
+    stageToPool(stage) = poolName
+    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
+    stages += stage
+  }
+
+  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+    super.onJobEnd(jobEnd)
+    jobEnd match {
+      case end: SparkListenerJobEnd =>
+        end.jobResult match {
+          case JobFailed(ex, Some(stage)) =>
+            poolToActiveStages(stageToPool(stage)) -= stage
+          case _ =>
+        }
+      case _ =>
+    }
+  }
+}
+
+private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
+  // How many stages to remember
+  val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
+
+  def stageToPool: HashMap[Stage, String] = null
+  def poolToActiveStages: HashMap[String, HashSet[Stage]] = null
+
+  val activeStages = HashSet[Stage]()
+  val completedStages = ListBuffer[Stage]()
+  val failedStages = ListBuffer[Stage]()
+
+  val stageToTasksComplete = HashMap[Int, Int]()
+  val stageToTasksFailed = HashMap[Int, Int]()
+  val stageToTaskInfos =
+    HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+
+  override def onJobStart(jobStart: SparkListenerJobStart) {}
+
+  override def onStageCompleted(stageCompleted: StageCompleted) = {
+    val stage = stageCompleted.stageInfo.stage
+    activeStages -= stage
+    completedStages += stage
+    trimIfNecessary(completedStages)
+  }
+
+  /** If stages is too large, remove and garbage collect old stages */
+  def trimIfNecessary(stages: ListBuffer[Stage]) {
+    if (stages.size > RETAINED_STAGES) {
+      val toRemove = RETAINED_STAGES / 10
+      stages.takeRight(toRemove).foreach( s => {
+        stageToTaskInfos.remove(s.id)
+      })
+      stages.trimEnd(toRemove)
+    }
+  }
+
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
+    activeStages += stageSubmitted.stage
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+    val sid = taskEnd.task.stageId
+    val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
+      taskEnd.reason match {
+        case e: ExceptionFailure =>
+          stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
+          (Some(e), e.metrics)
+        case _ =>
+          stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
+          (None, Some(taskEnd.taskMetrics))
+      }
+    val taskList = stageToTaskInfos.getOrElse(
+      sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+    taskList += ((taskEnd.taskInfo, metrics, failureInfo))
+    stageToTaskInfos(sid) = taskList
+  }
+
+  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+    jobEnd match {
+      case end: SparkListenerJobEnd =>
+        end.jobResult match {
+          case JobFailed(ex, Some(stage)) =>
+            activeStages -= stage
+            failedStages += stage
+            trimIfNecessary(failedStages)
+          case _ =>
+        }
+      case _ =>
+    }
+  }
+
+  /** Is this stage's input from a shuffle read. */
+  def hasShuffleRead(stageID: Int): Boolean = {
+    // This is written in a slightly complicated way to avoid having to scan all tasks
+    for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
+      if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
+    }
+    return false // No tasks have finished for this stage
+  }
+
+  /** Is this stage's output to a shuffle write. */
+  def hasShuffleWrite(stageID: Int): Boolean = {
+    // This is written in a slightly complicated way to avoid having to scan all tasks
+    for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
+      if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
+    }
+    return false // No tasks have finished for this stage
+  }
+}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index 84730cc091..e610252242 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -14,9 +14,9 @@ import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
 import spark.ui.JettyUtils._
 import spark.{ExceptionFailure, SparkContext, Success, Utils}
 import spark.scheduler._
-import spark.scheduler.cluster.TaskInfo
-import spark.executor.TaskMetrics
 import collection.mutable
+import spark.scheduler.cluster.SchedulingMode
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 
 /** Web UI showing progress status of all jobs in the given SparkContext. */
 private[spark] class JobProgressUI(val sc: SparkContext) {
@@ -24,104 +24,38 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
   def listener = _listener.get
   val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
 
+  private val indexPage = new IndexPage(this)
   private val stagePage = new StagePage(this)
+  private val poolPage = new PoolPage(this)
+
+  var stageTable: StageTable = null
+  var stagePoolInfo: StagePoolInfo = null
+  var poolTable: PoolTable = null
+  var stagePagePoolSource: PoolSource = null
 
   def start() {
-    _listener = Some(new JobProgressListener)
+    sc.getSchedulingMode match {
+      case SchedulingMode.FIFO =>
+        _listener = Some(new JobProgressListener(sc))
+        stagePoolInfo = new FIFOStagePoolInfo()
+        stagePagePoolSource = new FIFOSource()
+      case SchedulingMode.FAIR =>
+        _listener = Some(new FairJobProgressListener(sc))
+        stagePoolInfo = new FairStagePoolInfo(listener)
+        stagePagePoolSource = new FairSource(sc)
+    }
     sc.addSparkListener(listener)
+    stageTable = new StageTable(dateFmt, formatDuration, listener)
+    poolTable = new PoolTable(listener)
   }
 
   def formatDuration(ms: Long) = Utils.msDurationToString(ms)
 
   def getHandlers = Seq[(String, Handler)](
     ("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)),
+    ("/stages/pool", (request: HttpServletRequest) => poolPage.render(request)),
     ("/stages", (request: HttpServletRequest) => indexPage.render(request))
   )
 }
-
-private[spark] class JobProgressListener extends SparkListener {
-  // How many stages to remember
-  val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
-
-  val activeStages = HashSet[Stage]()
-  val completedStages = ListBuffer[Stage]()
-  val failedStages = ListBuffer[Stage]()
-
-  val stageToTasksComplete = HashMap[Int, Int]()
-  val stageToTasksFailed = HashMap[Int, Int]()
-  val stageToTaskInfos =
-    HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
-
-  override def onJobStart(jobStart: SparkListenerJobStart) {}
-
-  override def onStageCompleted(stageCompleted: StageCompleted) = {
-    val stage = stageCompleted.stageInfo.stage
-    activeStages -= stage
-    completedStages += stage
-    trimIfNecessary(completedStages)
-  }
-
-  /** If stages is too large, remove and garbage collect old stages */
-  def trimIfNecessary(stages: ListBuffer[Stage]) {
-    if (stages.size > RETAINED_STAGES) {
-      val toRemove = RETAINED_STAGES / 10
-      stages.takeRight(toRemove).foreach( s => {
-        stageToTaskInfos.remove(s.id)
-      })
-      stages.trimEnd(toRemove)
-    }
-  }
-
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
-    activeStages += stageSubmitted.stage
-
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
-    val sid = taskEnd.task.stageId
-    val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
-      taskEnd.reason match {
-        case e: ExceptionFailure =>
-          stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
-          (Some(e), e.metrics)
-        case _ =>
-          stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
-          (None, Some(taskEnd.taskMetrics))
-      }
-    val taskList = stageToTaskInfos.getOrElse(
-      sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-    taskList += ((taskEnd.taskInfo, metrics, failureInfo))
-    stageToTaskInfos(sid) = taskList
-  }
-
-  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
-    jobEnd match {
-      case end: SparkListenerJobEnd =>
-        end.jobResult match {
-          case JobFailed(ex, Some(stage)) =>
-            activeStages -= stage
-            failedStages += stage
-            trimIfNecessary(failedStages)
-          case _ =>
-        }
-      case _ =>
-    }
-  }
-
-  /** Is this stage's input from a shuffle read. */
-  def hasShuffleRead(stageID: Int): Boolean = {
-    // This is written in a slightly complicated way to avoid having to scan all tasks
-    for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
-      if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
-    }
-    return false // No tasks have finished for this stage
-  }
-
-  /** Is this stage's output to a shuffle write. */
-  def hasShuffleWrite(stageID: Int): Boolean = {
-    // This is written in a slightly complicated way to avoid having to scan all tasks
-    for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
-      if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
-    }
-    return false // No tasks have finished for this stage
-  }
-}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
new file mode 100644
index 0000000000..00703887c3
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
@@ -0,0 +1,38 @@
+package spark.ui.jobs
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{NodeSeq, Node}
+import scala.collection.mutable.HashSet
+
+import spark.scheduler.Stage
+import spark.ui.UIUtils._
+import spark.ui.Page._
+
+/** Page showing the details of one specific pool */
+private[spark] class PoolPage(parent: JobProgressUI) {
+  def listener = parent.listener
+
+  def stageTable: StageTable = parent.stageTable
+
+  def poolTable: PoolTable = parent.poolTable
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val poolName = request.getParameter("poolname")
+    val poolToActiveStages = listener.poolToActiveStages
+    val activeStages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]).toSeq
+    val stageToPool = listener.stageToPool
+
+    val poolDetailPoolSource = new PoolDetailSource(parent.sc, poolName)
+    poolTable.setPoolSource(poolDetailPoolSource)
+
+    stageTable.setStagePoolInfo(parent.stagePoolInfo)
+
+    val activeStageNodeSeq = stageTable.toNodeSeq(activeStages)
+
+    val content = <h3>Pool</h3> ++ poolTable.toNodeSeq ++
+                  <h3>Active Stages: {activeStages.size}</h3> ++ activeStageNodeSeq
+
+    headerSparkPage(content, parent.sc, "Spark Pool Details", Jobs)
+  }
+}
diff --git a/core/src/main/scala/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
new file mode 100644
index 0000000000..bb8be4b26e
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
@@ -0,0 +1,98 @@
+package spark.ui.jobs
+
+import java.util.Date
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.Some
+import scala.xml.{NodeSeq, Node}
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+
+import spark.SparkContext
+import spark.scheduler.Stage
+import spark.ui.UIUtils._
+import spark.ui.Page._
+import spark.storage.StorageLevel
+import spark.scheduler.cluster.Schedulable
+
+/*
+ * Interface for getting the sequence of pools to show on the index or pool detail page
+ */
+private[spark] trait PoolSource {
+  def getPools: Seq[Schedulable]
+}
+
+/*
+ * Pool source for the FIFO scheduling algorithm on the index page
+ */
+private[spark] class FIFOSource() extends PoolSource {
+  def getPools: Seq[Schedulable] = {
+    Seq[Schedulable]()
+  }
+}
+
+/*
+ * Pool source for the fair scheduling algorithm on the index page
+ */
+private[spark] class FairSource(sc: SparkContext) extends PoolSource {
+  def getPools: Seq[Schedulable] = {
+    sc.getPoolsInfo.toSeq
+  }
+}
+
+/*
+ * Pool source that yields one specific pool for the pool detail page
+ */
+private[spark] class PoolDetailSource(sc: SparkContext, poolName: String) extends PoolSource {
+  def getPools: Seq[Schedulable] = {
+    val pools = HashSet[Schedulable]()
+    pools += sc.getPoolNameToPool(poolName)
+    pools.toSeq
+  }
+}
+
+/** Table showing list of pools */
+private[spark] class PoolTable(listener: JobProgressListener) {
+
+  var poolSource: PoolSource = null
+  var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages
+
+  def toNodeSeq: Seq[Node] = {
+    poolTable(poolRow, poolSource.getPools)
+  }
+
+  def setPoolSource(poolSource: PoolSource) {
+    this.poolSource = poolSource
+  }
+
+  // pool table
+  def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node], rows: Seq[Schedulable]): Seq[Node] = {
+    <table class="table table-bordered table-striped table-condensed sortable">
+      <thead>
+        <th>Pool Name</th>
+        <th>Minimum Share</th>
+        <th>Pool Weight</th>
+        <td>Active Stages</td>
+        <td>Running Tasks</td>
+        <td>SchedulingMode</td>
+      </thead>
+      <tbody>
+        {rows.map(r => makeRow(r, poolToActiveStages))}
+      </tbody>
+    </table>
+  }
+
+  def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]]): Seq[Node] = {
+    <tr>
+      <td><a href={"/stages/pool?poolname=%s".format(p.name)}>{p.name}</a></td>
+      <td>{p.minShare}</td>
+      <td>{p.weight}</td>
+      <td>{poolToActiveStages.getOrElseUpdate(p.name, new HashSet[Stage]()).size}</td>
+      <td>{p.runningTasks}</td>
+      <td>{p.schedulingMode}</td>
+    </tr>
+  }
+}
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
new file mode 100644
index 0000000000..83e566c55b
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala
@@ -0,0 +1,143 @@
+package spark.ui.jobs
+
+import java.util.Date
+import java.text.SimpleDateFormat
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.Some
+import scala.xml.{NodeSeq, Node}
+import scala.collection.mutable.HashMap
+
+import spark.scheduler.Stage
+import spark.ui.UIUtils._
+import spark.ui.Page._
+import spark.storage.StorageLevel
+
+/*
+ * Interface for getting a stage's pool name
+ */
+private[spark] trait StagePoolInfo {
+  def getStagePoolName(s: Stage): String
+
+  def hasHref: Boolean
+}
+
+/*
+ * For the FIFO scheduling algorithm, just show "N/A" and do not link to a pool detail page
+ */
+private[spark] class FIFOStagePoolInfo extends StagePoolInfo {
+  def getStagePoolName(s: Stage): String = "N/A"
+
+  def hasHref: Boolean = false
+}
+
+/*
+ * For the fair scheduling algorithm, show the stage's pool name and link to its detail page
+ */
+private[spark] class FairStagePoolInfo(listener: JobProgressListener) extends StagePoolInfo {
+  def getStagePoolName(s: Stage): String = {
+    listener.stageToPool(s)
+  }
+
+  def hasHref: Boolean = true
+}
+
+/** Table showing list of all ongoing and recently finished stages */
+private[spark] class StageTable(val dateFmt: SimpleDateFormat, val formatDuration: Long => String, val listener: JobProgressListener) {
+
+  var stagePoolInfo: StagePoolInfo = null
+
+  def toNodeSeq(stages: Seq[Stage]): Seq[Node] = {
+    stageTable(stageRow, stages)
+  }
+
+  def setStagePoolInfo(stagePoolInfo: StagePoolInfo) {
+    this.stagePoolInfo = stagePoolInfo
+  }
+
+  /** Special table which merges two header cells. */
+  def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
+    <table class="table table-bordered table-striped table-condensed sortable">
+      <thead>
+        <th>Stage Id</th>
+        <th>Pool Name</th>
+        <th>Origin</th>
+        <th>Submitted</th>
+        <td>Duration</td>
+        <td colspan="2">Tasks: Complete/Total</td>
+        <td>Shuffle Activity</td>
+        <td>Stored RDD</td>
+      </thead>
+      <tbody>
+        {rows.map(r => makeRow(r))}
+      </tbody>
+    </table>
+  }
+
+  def getElapsedTime(submitted: Option[Long], completed: Long): String = {
+    submitted match {
+      case Some(t) => formatDuration(completed - t)
+      case _ => "Unknown"
+    }
+  }
+
+  def makeProgressBar(completed: Int, total: Int): Seq[Node] = {
+    val width = 130
+    val height = 15
+    val completeWidth = (completed.toDouble / total) * width
+
+    <svg width={width.toString} height={height.toString}>
+      <rect width={width.toString} height={height.toString}
+            fill="white" stroke="rgb(51,51,51)" stroke-width="1" />
+      <rect width={completeWidth.toString} height={height.toString}
+            fill="rgb(0,136,204)" stroke="black" stroke-width="1" />
+    </svg>
+  }
+
+  def stageRow(s: Stage): Seq[Node] = {
+    val submissionTime = s.submissionTime match {
+      case Some(t) => dateFmt.format(new Date(t))
+      case None => "Unknown"
+    }
+    val (read, write) = (listener.hasShuffleRead(s.id), listener.hasShuffleWrite(s.id))
+    val shuffleInfo = (read, write) match {
+      case (true, true) => "Read/Write"
+      case (true, false) => "Read"
+      case (false, true) => "Write"
+      case _ => ""
+    }
+    val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
+    val totalTasks = s.numPartitions
+
+    val poolName = stagePoolInfo.getStagePoolName(s)
+
+    <tr>
+      <td>{s.id}</td>
+      <td>{if (stagePoolInfo.hasHref) {
+        <a href={"/stages/pool?poolname=%s".format(poolName)}>{poolName}</a>
+      } else {
+        {poolName}
+      }}</td>
+      <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.origin}</a></td>
+      <td>{submissionTime}</td>
+      <td>{getElapsedTime(s.submissionTime,
+             s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
+      <td class="progress-cell">{makeProgressBar(completedTasks, totalTasks)}</td>
+      <td style="border-left: 0; text-align: center;">{completedTasks} / {totalTasks}
+        {listener.stageToTasksFailed.getOrElse(s.id, 0) match {
+          case f if f > 0 => "(%s failed)".format(f)
+          case _ =>
+        }}
+      </td>
+      <td>{shuffleInfo}</td>
+      <td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) {
+             <a href={"/storage/rdd?id=%s".format(s.rdd.id)}>
+               {Option(s.rdd.name).getOrElse(s.rdd.id)}
+             </a>
+          }}
+      </td>
+    </tr>
+  }
+}
diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
index 30e6fef950..da72bfbf89 100644
--- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
@@ -22,6 +22,10 @@ import spark.TaskEndReason
 
 import spark.{FetchFailed, Success}
 
+import spark.scheduler.cluster.Pool
+import spark.scheduler.cluster.SchedulingMode
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
+
 /**
  * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
  * rather than spawning an event loop thread as happens in the real code. They use EasyMock
@@ -39,6 +43,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
   /** Set of TaskSets the DAGScheduler has requested executed. */
   val taskSets = scala.collection.mutable.Buffer[TaskSet]()
   val taskScheduler = new TaskScheduler() {
+    override def rootPool: Pool = null
+    override def schedulingMode: SchedulingMode = SchedulingMode.NONE
     override def start() = {}
     override def stop() = {}
     override def submitTasks(taskSet: TaskSet) = {
diff --git a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
index 699901f1a1..328e7e7529 100644
--- a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
@@ -40,7 +40,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
     val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID)
     val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID)
 
-    joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4))
+    joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4, null))
     joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
    parentRdd.setName("MyRDD")
    joblogger.getRddNameTest(parentRdd) should be ("MyRDD")
-- 
GitLab
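
For reviewers who want to exercise the new pool bookkeeping without standing up a cluster, here is a minimal, self-contained Scala sketch. It mirrors the pool lookup in FairJobProgressListener.onStageSubmitted and the mode selection now shared by ClusterScheduler and LocalScheduler; PoolResolutionDemo and resolvePool are hypothetical names introduced only for this illustration and are not part of the patch.

    import java.util.Properties

    object PoolResolutionDemo {
      // The same keys the patch uses in FairJobProgressListener and both schedulers.
      val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
      val DEFAULT_POOL_NAME = "default"

      // Mirrors FairJobProgressListener.onStageSubmitted: a null Properties
      // object or a missing key both fall back to the "default" pool.
      def resolvePool(props: Properties): String =
        if (props == null) DEFAULT_POOL_NAME
        else props.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)

      def main(args: Array[String]) {
        val jobProps = new Properties()
        jobProps.setProperty(FAIR_SCHEDULER_PROPERTIES, "reporting")

        println(resolvePool(jobProps)) // prints "reporting"
        println(resolvePool(null))     // prints "default"

        // Mirrors how ClusterScheduler and LocalScheduler pick their mode;
        // when the property is unset, both default to FIFO.
        System.setProperty("spark.cluster.schedulingmode", "FAIR")
        println(System.getProperty("spark.cluster.schedulingmode", "FIFO")) // prints "FAIR"
      }
    }

Note that unless spark.cluster.schedulingmode is set to FAIR, the UI takes the FIFO path: the pool table on the index page stays empty (FIFOSource returns no pools) and every stage row shows "N/A" in its Pool Name column.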