diff --git a/core/src/main/resources/spark/ui/static/webui.css b/core/src/main/resources/spark/ui/static/webui.css
index f7537bb7661919e20f66298273471d1629feb0a3..8b9f4ee93841d15f4fcda92013f274d755a181f2 100644
--- a/core/src/main/resources/spark/ui/static/webui.css
+++ b/core/src/main/resources/spark/ui/static/webui.css
@@ -47,3 +47,7 @@
   padding-top: 7px;
   padding-left: 4px;
 }
+
+.table td {
+  vertical-align: middle !important;
+}
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index bd7fe34c646b74d22ae70d6c095927d0cdeb4f65..40b30e4d236cf766dc6a6a094e7a64fb26e1054e 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -267,12 +267,18 @@ class SparkContext(
     localProperties.value = new Properties()
   }
 
-  def addLocalProperties(key: String, value: String) {
+  def addLocalProperty(key: String, value: String) {
     if(localProperties.value == null) {
       localProperties.value = new Properties()
     }
     localProperties.value.setProperty(key,value)
   }
+
+  /** Set a human-readable description of the current job. */
+  def setDescription(value: String) {
+    addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
+  }
+
   // Post init
   taskScheduler.postStartHook()
 
@@ -843,6 +849,7 @@ class SparkContext(
  * various Spark features.
  */
 object SparkContext {
+  val SPARK_JOB_DESCRIPTION = "spark.job.description"
 
   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
@@ -960,7 +967,6 @@ object SparkContext {
   }
 }
 
-
 /**
  * A class encapsulating how to convert some type T to Writable. It stores both the Writable class
  * corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
@@ -972,3 +978,4 @@ private[spark] class WritableConverter[T](
     val writableClass: ClassManifest[T] => Class[_ <: Writable],
     val convert: Writable => T)
   extends Serializable
+
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index c7e8f8a9a1d067647e17decd3006e14c0fbd07fb..ad2efcec63706edc222e9f6dc357f6c9eccec2d0 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.LinkedBlockingQueue
 import scala.collection.mutable.{Map, HashMap, ListBuffer}
 import scala.io.Source
 import spark._
+import spark.SparkContext
 import spark.executor.TaskMetrics
 import spark.scheduler.cluster.TaskInfo
 
@@ -317,8 +318,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
 
   protected def recordJobProperties(jobID: Int, properties: Properties) {
     if(properties != null) {
-      val annotation = properties.getProperty("spark.job.annotation", "")
-      jobLogInfo(jobID, annotation, false)
+      val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
+      jobLogInfo(jobID, description, false)
     }
   }
 
diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
index 3ac35085eb92b63dea36492ed82cfabb38fe572c..97ea644021fff6c7c4490064850c3b5a3e73df4b 100644
--- a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
@@ -46,13 +46,11 @@ private[spark] object UIWorkloadGenerator {
     }
     val sc = new SparkContext(master, appName)
 
-    // NOTE: Right now there is no easy way for us to show spark.job.annotation for a given phase,
-    // but we pass it here anyways since it will be useful once we do.
     def setProperties(s: String) = {
       if(schedulingMode == SchedulingMode.FAIR) {
-        sc.addLocalProperties("spark.scheduler.cluster.fair.pool", s)
+        sc.addLocalProperty("spark.scheduler.cluster.fair.pool", s)
       }
-      sc.addLocalProperties("spark.job.annotation", s)
+      sc.addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
     }
 
     val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
index 200e13cf99c1fcb977b70a7e3bc75d5ef01a8065..f22c4e39e358dfeb58d71d6a8851d4017a061ce8 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
@@ -15,6 +15,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
   val DEFAULT_POOL_NAME = "default"
 
   val stageToPool = new HashMap[Stage, String]()
+  val stageToDescription = new HashMap[Stage, String]()
   val poolToActiveStages = new HashMap[String, HashSet[Stage]]()
 
   val activeStages = HashSet[Stage]()
@@ -57,6 +58,8 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
           stageToTasksActive.remove(s.id)
           stageToTasksComplete.remove(s.id)
           stageToTasksFailed.remove(s.id)
+          stageToPool.remove(s)
+          stageToDescription.remove(s)
         })
       stages.trimEnd(toRemove)
     }
@@ -66,12 +69,17 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
     val stage = stageSubmitted.stage
     activeStages += stage
-    var poolName = DEFAULT_POOL_NAME
-    if (stageSubmitted.properties != null) {
-      poolName = stageSubmitted.properties.getProperty("spark.scheduler.cluster.fair.pool",
-        DEFAULT_POOL_NAME)
-    }
+
+    val poolName = Option(stageSubmitted.properties).map {
+      p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
+    }.getOrElse(DEFAULT_POOL_NAME)
     stageToPool(stage) = poolName
+
+    val description = Option(stageSubmitted.properties).flatMap {
+      p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
+    }
+    description.foreach(d => stageToDescription(stage) = d)
+
     val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
     stages += stage
   }
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
index 3257f4e36050a1fad5662e5063a425927bd52a26..38fa3bcbcd17369bf69e64a74ad5a0709dae1af3 100644
--- a/core/src/main/scala/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala
@@ -34,7 +34,7 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
       <thead>
         <th>Stage Id</th>
         {if (isFairScheduler) {<th>Pool Name</th>} else {}}
-        <th>Origin</th>
+        <th>Description</th>
         <th>Submitted</th>
         <td>Duration</td>
         <td colspan="2">Tasks: Complete/Total</td>
@@ -87,13 +87,17 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
 
     val poolName = listener.stageToPool.get(s)
 
+    val nameLink = <a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a>
+    val description = listener.stageToDescription.get(s)
+      .map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
+
     <tr>
       <td>{s.id}</td>
       {if (isFairScheduler) {
         <td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>}
       }
-      <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a></td>
-      <td>{submissionTime}</td>
+      <td>{description}</td>
+      <td valign="middle">{submissionTime}</td>
       <td>{getElapsedTime(s.submissionTime,
         s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
       <td class="progress-cell">{makeProgressBar(startedTasks, completedTasks, totalTasks)}</td>
diff --git a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
index 14bb58731b7449a06a1d4a56c7099e016269090e..66fd59e8bbee62853c13dc77e2ba2c0546307eae 100644
--- a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
@@ -73,7 +73,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
     TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
     new Thread {
       if (poolName != null) {
-        sc.addLocalProperties("spark.scheduler.cluster.fair.pool",poolName)
+        sc.addLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
       }
       override def run() {
         val ans = nums.map(number => {
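
Taken together, these changes replace the ad-hoc "spark.job.annotation" property with a SPARK_JOB_DESCRIPTION constant on the SparkContext companion object, expose it through the new setDescription method, and render the description in the web UI's stage table (in italics, above the stage-name link). A minimal driver-side sketch of how the new API might be used; the master URL, app name, and job bodies are illustrative placeholders, not part of this patch:

    import spark.SparkContext

    object DescriptionExample {
      def main(args: Array[String]) {
        // Hypothetical local setup; any master URL behaves the same way.
        val sc = new SparkContext("local[2]", "DescriptionExample")

        // The description is stored in the thread-local job properties, so it
        // applies to jobs submitted by this thread until it is overwritten.
        sc.setDescription("Count the even numbers")
        val evens = sc.makeRDD(1 to 1000, 4).filter(_ % 2 == 0).count()

        // Equivalent lower-level form: set the property by its key directly.
        sc.addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, "Sum the range")
        val total = sc.makeRDD(1 to 1000, 4).reduce(_ + _)

        println("evens = " + evens + ", total = " + total)
        sc.stop()
      }
    }

Because setDescription only writes a local property, it is picked up by every job submitted afterwards on the same thread, which is why JobProgressListener reads it from stageSubmitted.properties rather than from the SparkContext itself.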