Skip to content
Snippets Groups Projects
Commit db36e1e7 authored by Tao Lin's avatar Tao Lin Committed by Shixiong Zhu
Browse files

[SPARK-15590][WEBUI] Paginate Job Table in Jobs tab

## What changes were proposed in this pull request?

This patch adds pagination support for the Job Tables in the Jobs tab. Pagination is provided for all of the three Job Tables (active, completed, and failed). Interactions (jumping, sorting, and setting page size) for paged tables are also included.

The diff didn't keep track of some lines based on the original ones. The function `makeRow`of the original `AllJobsPage.scala` is reused. They are separated at the beginning of the function `jobRow` (L427-439) and the function `row`(L594-618) in the new `AllJobsPage.scala`.

## How was this patch tested?

Tested manually by using checking the Web UI after completing and failing hundreds of jobs.
Generate completed jobs by:
```scala
val d = sc.parallelize(Array(1,2,3,4,5))
for(i <- 1 to 255){ var b = d.collect() }
```
Generate failed jobs by calling the following code multiple times:
```scala
var b = d.map(_/0).collect()
```
Interactions like jumping, sorting, and setting page size are all tested.

This shows the pagination for completed jobs:
![paginate success jobs](https://cloud.githubusercontent.com/assets/5558370/15986498/efa12ef6-303b-11e6-8b1d-c3382aeb9ad0.png)

This shows the sorting works in job tables:
![sorting](https://cloud.githubusercontent.com/assets/5558370/15986539/98c8a81a-303c-11e6-86f2-8d2bc7924ee9.png)

This shows the pagination for failed jobs and the effect of jumping and setting page size:
![paginate failed jobs](https://cloud.githubusercontent.com/assets/5558370/15986556/d8c1323e-303c-11e6-8e4b-7bdb030ea42b.png)

Author: Tao Lin <nblintao@gmail.com>

Closes #13620 from nblintao/dev.
parent c979c8bb
No related branches found
No related tags found
No related merge requests found
......@@ -17,17 +17,21 @@
package org.apache.spark.ui.jobs
import java.net.URLEncoder
import java.util.Date
import javax.servlet.http.HttpServletRequest
import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer}
import scala.xml._
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.JobExecutionStatus
import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData}
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.ui._
import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData, StageUIData}
import org.apache.spark.util.Utils
/** Page showing list of all ongoing and recently finished jobs */
private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
......@@ -210,64 +214,69 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
</script>
}
private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = {
private def jobsTable(
request: HttpServletRequest,
jobTag: String,
jobs: Seq[JobUIData]): Seq[Node] = {
val allParameters = request.getParameterMap.asScala.toMap
val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag))
.map(para => para._1 + "=" + para._2(0))
val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)
val jobIdTitle = if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"
val columns: Seq[Node] = {
<th>{if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"}</th>
<th>Description</th>
<th>Submitted</th>
<th>Duration</th>
<th class="sorttable_nosort">Stages: Succeeded/Total</th>
<th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
}
val parameterJobPage = request.getParameter(jobTag + ".page")
val parameterJobSortColumn = request.getParameter(jobTag + ".sort")
val parameterJobSortDesc = request.getParameter(jobTag + ".desc")
val parameterJobPageSize = request.getParameter(jobTag + ".pageSize")
val parameterJobPrevPageSize = request.getParameter(jobTag + ".prevPageSize")
def makeRow(job: JobUIData): Seq[Node] = {
val (lastStageName, lastStageDescription) = getLastStageNameAndDescription(job)
val duration: Option[Long] = {
job.submissionTime.map { start =>
val end = job.completionTime.getOrElse(System.currentTimeMillis())
end - start
}
val jobPage = Option(parameterJobPage).map(_.toInt).getOrElse(1)
val jobSortColumn = Option(parameterJobSortColumn).map { sortColumn =>
UIUtils.decodeURLParameter(sortColumn)
}.getOrElse(jobIdTitle)
val jobSortDesc = Option(parameterJobSortDesc).map(_.toBoolean).getOrElse(
// New jobs should be shown above old jobs by default.
if (jobSortColumn == jobIdTitle) true else false
)
val jobPageSize = Option(parameterJobPageSize).map(_.toInt).getOrElse(100)
val jobPrevPageSize = Option(parameterJobPrevPageSize).map(_.toInt).getOrElse(jobPageSize)
val page: Int = {
// If the user has changed to a larger page size, then go to page 1 in order to avoid
// IndexOutOfBoundsException.
if (jobPageSize <= jobPrevPageSize) {
jobPage
} else {
1
}
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
val formattedSubmissionTime = job.submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
val basePathUri = UIUtils.prependBaseUri(parent.basePath)
val jobDescription =
UIUtils.makeDescription(lastStageDescription, basePathUri, plainText = false)
val detailUrl = "%s/jobs/job?id=%s".format(basePathUri, job.jobId)
<tr id={"job-" + job.jobId}>
<td sorttable_customkey={job.jobId.toString}>
{job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
</td>
<td>
{jobDescription}
<a href={detailUrl} class="name-link">{lastStageName}</a>
</td>
<td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}>
{formattedSubmissionTime}
</td>
<td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
<td class="stage-progress-cell">
{job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
{if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
{if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
</td>
<td class="progress-cell">
{UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
failed = job.numFailedTasks, skipped = job.numSkippedTasks, killed = job.numKilledTasks,
total = job.numTasks - job.numSkippedTasks)}
</td>
</tr>
}
val currentTime = System.currentTimeMillis()
<table class="table table-bordered table-striped table-condensed sortable">
<thead>{columns}</thead>
<tbody>
{jobs.map(makeRow)}
</tbody>
</table>
try {
new JobPagedTable(
jobs,
jobTag,
UIUtils.prependBaseUri(parent.basePath),
"jobs", // subPath
parameterOtherTable,
parent.jobProgresslistener.stageIdToInfo,
parent.jobProgresslistener.stageIdToData,
currentTime,
jobIdTitle,
pageSize = jobPageSize,
sortColumn = jobSortColumn,
desc = jobSortDesc
).table(page)
} catch {
case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException) =>
<div class="alert alert-error">
<p>Error while rendering job table:</p>
<pre>
{Utils.exceptionString(e)}
</pre>
</div>
}
}
def render(request: HttpServletRequest): Seq[Node] = {
......@@ -279,12 +288,9 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
val completedJobs = listener.completedJobs.reverse.toSeq
val failedJobs = listener.failedJobs.reverse.toSeq
val activeJobsTable =
jobsTable(activeJobs.sortBy(_.submissionTime.getOrElse(-1L)).reverse)
val completedJobsTable =
jobsTable(completedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
val failedJobsTable =
jobsTable(failedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
val activeJobsTable = jobsTable(request, "activeJob", activeJobs)
val completedJobsTable = jobsTable(request, "completedJob", completedJobs)
val failedJobsTable = jobsTable(request, "failedJob", failedJobs)
val shouldShowActiveJobs = activeJobs.nonEmpty
val shouldShowCompletedJobs = completedJobs.nonEmpty
......@@ -369,3 +375,246 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
}
}
}
private[ui] class JobTableRowData(
val jobData: JobUIData,
val lastStageName: String,
val lastStageDescription: String,
val duration: Long,
val formattedDuration: String,
val submissionTime: Long,
val formattedSubmissionTime: String,
val jobDescription: NodeSeq,
val detailUrl: String)
private[ui] class JobDataSource(
jobs: Seq[JobUIData],
stageIdToInfo: HashMap[Int, StageInfo],
stageIdToData: HashMap[(Int, Int), StageUIData],
basePath: String,
currentTime: Long,
pageSize: Int,
sortColumn: String,
desc: Boolean) extends PagedDataSource[JobTableRowData](pageSize) {
// Convert JobUIData to JobTableRowData which contains the final contents to show in the table
// so that we can avoid creating duplicate contents during sorting the data
private val data = jobs.map(jobRow).sorted(ordering(sortColumn, desc))
private var _slicedJobIds: Set[Int] = null
override def dataSize: Int = data.size
override def sliceData(from: Int, to: Int): Seq[JobTableRowData] = {
val r = data.slice(from, to)
_slicedJobIds = r.map(_.jobData.jobId).toSet
r
}
private def getLastStageNameAndDescription(job: JobUIData): (String, String) = {
val lastStageInfo = Option(job.stageIds)
.filter(_.nonEmpty)
.flatMap { ids => stageIdToInfo.get(ids.max)}
val lastStageData = lastStageInfo.flatMap { s =>
stageIdToData.get((s.stageId, s.attemptId))
}
val name = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
val description = lastStageData.flatMap(_.description).getOrElse("")
(name, description)
}
private def jobRow(jobData: JobUIData): JobTableRowData = {
val (lastStageName, lastStageDescription) = getLastStageNameAndDescription(jobData)
val duration: Option[Long] = {
jobData.submissionTime.map { start =>
val end = jobData.completionTime.getOrElse(System.currentTimeMillis())
end - start
}
}
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
val submissionTime = jobData.submissionTime
val formattedSubmissionTime = submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
val jobDescription = UIUtils.makeDescription(lastStageDescription, basePath, plainText = false)
val detailUrl = "%s/jobs/job?id=%s".format(basePath, jobData.jobId)
new JobTableRowData (
jobData,
lastStageName,
lastStageDescription,
duration.getOrElse(-1),
formattedDuration,
submissionTime.getOrElse(-1),
formattedSubmissionTime,
jobDescription,
detailUrl
)
}
/**
* Return Ordering according to sortColumn and desc
*/
private def ordering(sortColumn: String, desc: Boolean): Ordering[JobTableRowData] = {
val ordering = sortColumn match {
case "Job Id" | "Job Id (Job Group)" => new Ordering[JobTableRowData] {
override def compare(x: JobTableRowData, y: JobTableRowData): Int =
Ordering.Int.compare(x.jobData.jobId, y.jobData.jobId)
}
case "Description" => new Ordering[JobTableRowData] {
override def compare(x: JobTableRowData, y: JobTableRowData): Int =
Ordering.String.compare(x.lastStageDescription, y.lastStageDescription)
}
case "Submitted" => new Ordering[JobTableRowData] {
override def compare(x: JobTableRowData, y: JobTableRowData): Int =
Ordering.Long.compare(x.submissionTime, y.submissionTime)
}
case "Duration" => new Ordering[JobTableRowData] {
override def compare(x: JobTableRowData, y: JobTableRowData): Int =
Ordering.Long.compare(x.duration, y.duration)
}
case "Stages: Succeeded/Total" | "Tasks (for all stages): Succeeded/Total" =>
throw new IllegalArgumentException(s"Unsortable column: $sortColumn")
case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
}
if (desc) {
ordering.reverse
} else {
ordering
}
}
}
private[ui] class JobPagedTable(
data: Seq[JobUIData],
jobTag: String,
basePath: String,
subPath: String,
parameterOtherTable: Iterable[String],
stageIdToInfo: HashMap[Int, StageInfo],
stageIdToData: HashMap[(Int, Int), StageUIData],
currentTime: Long,
jobIdTitle: String,
pageSize: Int,
sortColumn: String,
desc: Boolean
) extends PagedTable[JobTableRowData] {
val parameterPath = UIUtils.prependBaseUri(basePath) + s"/$subPath/?" +
parameterOtherTable.mkString("&")
override def tableId: String = jobTag + "-table"
override def tableCssClass: String =
"table table-bordered table-condensed table-striped table-head-clickable"
override def pageSizeFormField: String = jobTag + ".pageSize"
override def prevPageSizeFormField: String = jobTag + ".prevPageSize"
override def pageNumberFormField: String = jobTag + ".page"
override val dataSource = new JobDataSource(
data,
stageIdToInfo,
stageIdToData,
basePath,
currentTime,
pageSize,
sortColumn,
desc)
override def pageLink(page: Int): String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
parameterPath +
s"&$pageNumberFormField=$page" +
s"&$jobTag.sort=$encodedSortColumn" +
s"&$jobTag.desc=$desc" +
s"&$pageSizeFormField=$pageSize"
}
override def goButtonFormPath: String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
s"$parameterPath&$jobTag.sort=$encodedSortColumn&$jobTag.desc=$desc"
}
override def headers: Seq[Node] = {
// Information for each header: title, cssClass, and sortable
val jobHeadersAndCssClasses: Seq[(String, String, Boolean)] =
Seq(
(jobIdTitle, "", true),
("Description", "", true), ("Submitted", "", true), ("Duration", "", true),
("Stages: Succeeded/Total", "", false),
("Tasks (for all stages): Succeeded/Total", "", false)
)
if (!jobHeadersAndCssClasses.filter(_._3).map(_._1).contains(sortColumn)) {
throw new IllegalArgumentException(s"Unknown column: $sortColumn")
}
val headerRow: Seq[Node] = {
jobHeadersAndCssClasses.map { case (header, cssClass, sortable) =>
if (header == sortColumn) {
val headerLink = Unparsed(
parameterPath +
s"&$jobTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
s"&$jobTag.desc=${!desc}" +
s"&$jobTag.pageSize=$pageSize")
val arrow = if (desc) "&#x25BE;" else "&#x25B4;" // UP or DOWN
<th class={cssClass}>
<a href={headerLink}>
{header}<span>
&nbsp;{Unparsed(arrow)}
</span>
</a>
</th>
} else {
if (sortable) {
val headerLink = Unparsed(
parameterPath +
s"&$jobTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
s"&$jobTag.pageSize=$pageSize")
<th class={cssClass}>
<a href={headerLink}>
{header}
</a>
</th>
} else {
<th class={cssClass}>
{header}
</th>
}
}
}
}
<thead>{headerRow}</thead>
}
override def row(jobTableRow: JobTableRowData): Seq[Node] = {
val job = jobTableRow.jobData
<tr id={"job-" + job.jobId}>
<td>
{job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
</td>
<td>
{jobTableRow.jobDescription}
<a href={jobTableRow.detailUrl} class="name-link">{jobTableRow.lastStageName}</a>
</td>
<td>
{jobTableRow.formattedSubmissionTime}
</td>
<td>{jobTableRow.formattedDuration}</td>
<td class="stage-progress-cell">
{job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
{if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
{if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
</td>
<td class="progress-cell">
{UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
failed = job.numFailedTasks, skipped = job.numSkippedTasks, killed = job.numKilledTasks,
total = job.numTasks - job.numSkippedTasks)}
</td>
</tr>
}
}
......@@ -218,7 +218,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
eventually(timeout(5 seconds), interval(50 milliseconds)) {
goToUi(sc, "/jobs")
val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
tableHeaders should not contain "Job Id (Job Group)"
tableHeaders(0) should not startWith "Job Id (Job Group)"
}
// Once at least one job has been run in a job group, then we should display the group name:
sc.setJobGroup("my-job-group", "my-job-group-description")
......@@ -226,7 +226,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
eventually(timeout(5 seconds), interval(50 milliseconds)) {
goToUi(sc, "/jobs")
val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
tableHeaders should contain ("Job Id (Job Group)")
// Can suffix up/down arrow in the header
tableHeaders(0) should startWith ("Job Id (Job Group)")
}
val jobJson = getJson(sc.ui.get, "jobs")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment