Commit 1b7106b8 authored by zsxwing, committed by Tathagata Das

[SPARK-6862] [STREAMING] [WEBUI] Add BatchPage to display details of a batch

This is an initial commit for SPARK-6862. Once SPARK-6796 is merged, I will add the links to StreamingPage so that the user can jump to BatchPage.

Screenshots:
![success](https://cloud.githubusercontent.com/assets/1000778/7102439/bbe75406-e0b3-11e4-84fe-3e6de629a49a.png)
![failure](https://cloud.githubusercontent.com/assets/1000778/7102440/bc124454-e0b3-11e4-921a-c8b39d6b61bc.png)

Author: zsxwing <zsxwing@gmail.com>

Closes #5473 from zsxwing/SPARK-6862 and squashes the following commits:

0727d35 [zsxwing] Change BatchUIData to a case class
b380cfb [zsxwing] Add createJobStart to eliminate duplicate codes
9a3083d [zsxwing] Rename XxxDatas -> XxxData
087ba98 [zsxwing] Refactor BatchInfo to store only necessary fields
cb62e4f [zsxwing] Use Seq[(OutputOpId, SparkJobId)] to store the id relations
72f8e7e [zsxwing] Add unit tests for BatchPage
1282b10 [zsxwing] Handle some corner cases and add tests for StreamingJobProgressListener
77a69ae [zsxwing] Refactor codes as per TD's comments
35ffd80 [zsxwing] Merge branch 'master' into SPARK-6862
15bdf9b [zsxwing] Add batch links and unit tests
4bf66b6 [zsxwing] Merge branch 'master' into SPARK-6862
7168807 [zsxwing] Limit the max width of the error message and fix nits in the UI
0b226f9 [zsxwing] Change 'Last Error' to 'Error'
fc98a43 [zsxwing] Put clearing local properties to finally and remove redundant private[streaming]
0c7b2eb [zsxwing] Add BatchPage to display details of a batch
parent 114bad60
Showing 710 additions and 81 deletions
......@@ -24,7 +24,7 @@ import org.apache.spark.util.collection.OpenHashSet
import scala.collection.mutable.HashMap
private[jobs] object UIData {
private[spark] object UIData {
class ExecutorSummary {
var taskTime : Long = 0
......
......@@ -626,7 +626,7 @@ abstract class DStream[T: ClassTag] (
println("Time: " + time)
println("-------------------------------------------")
firstNum.take(num).foreach(println)
if (firstNum.size > num) println("...")
if (firstNum.length > num) println("...")
println()
}
}
......
......@@ -25,15 +25,49 @@ import scala.util.Try
*/
private[streaming]
class Job(val time: Time, func: () => _) {
var id: String = _
var result: Try[_] = null
private var _id: String = _
private var _outputOpId: Int = _
private var isSet = false
private var _result: Try[_] = null
def run() {
result = Try(func())
_result = Try(func())
}
def setId(number: Int) {
id = "streaming job " + time + "." + number
def result: Try[_] = {
if (_result == null) {
throw new IllegalStateException("Cannot access result before job finishes")
}
_result
}
/**
* @return the globally unique id of this Job.
*/
def id: String = {
if (!isSet) {
throw new IllegalStateException("Cannot access id before calling setId")
}
_id
}
/**
* @return the output op id of this Job. Each Job has a unique output op id within its JobSet.
*/
def outputOpId: Int = {
if (!isSet) {
throw new IllegalStateException("Cannot access number before calling setId")
}
_outputOpId
}
def setOutputOpId(outputOpId: Int) {
if (isSet) {
throw new IllegalStateException("Cannot call setOutputOpId more than once")
}
isSet = true
_id = s"streaming job $time.$outputOpId"
_outputOpId = outputOpId
}
override def toString: String = id
......
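For context, the guarded accessors above enforce a set-once contract: id and outputOpId are readable only after setOutputOpId has been called, and it can be called only once. A minimal usage sketch (hypothetical values; Time renders its milliseconds as e.g. "1000 ms"):
val job = new Job(Time(1000), () => ())
// Reading job.id here would throw IllegalStateException: the id is not set yet.
job.setOutputOpId(0)
job.id // "streaming job 1000 ms.0"
job.outputOpId // 0
// A second job.setOutputOpId(1) would throw IllegalStateException: already set.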
......@@ -172,16 +172,28 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
ssc.waiter.notifyError(e)
}
private class JobHandler(job: Job) extends Runnable {
private class JobHandler(job: Job) extends Runnable with Logging {
def run() {
eventLoop.post(JobStarted(job))
// Disable checks for existing output directories in jobs launched by the streaming scheduler,
// since we may need to write output to an existing directory during checkpoint recovery;
// see SPARK-4835 for more details.
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
job.run()
ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
try {
eventLoop.post(JobStarted(job))
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details.
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
job.run()
}
eventLoop.post(JobCompleted(job))
} finally {
ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
}
eventLoop.post(JobCompleted(job))
}
}
}
private[streaming] object JobScheduler {
val BATCH_TIME_PROPERTY_KEY = "spark.streaming.internal.batchTime"
val OUTPUT_OP_ID_PROPERTY_KEY = "spark.streaming.internal.outputOpId"
}
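These two keys carry each job's batch time and output op id from the scheduler thread to SparkListener callbacks: JobHandler sets them as SparkContext local properties before running the job, and they come back on SparkListenerJobStart.properties. A minimal sketch of the round trip (made-up values):
import java.util.Properties
val props = new Properties()
props.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, "1000")
props.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, "0")
// Listener side, as getBatchTimeAndOutputOpId does in StreamingJobProgressListener:
val batchTime = Option(props.getProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY)).map(t => Time(t.toLong))
val outputOpId = Option(props.getProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY)).map(_.toInt)
// batchTime == Some(Time(1000)), outputOpId == Some(0)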
......@@ -35,7 +35,7 @@ case class JobSet(
private var processingStartTime = -1L // when the first job of this jobset started processing
private var processingEndTime = -1L // when the last job of this jobset finished processing
jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) }
jobs.zipWithIndex.foreach { case (job, i) => job.setOutputOpId(i) }
incompleteJobs ++= jobs
def handleJobStart(job: Job) {
......
......@@ -19,7 +19,6 @@ package org.apache.spark.streaming.ui
import scala.xml.Node
import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.ui.UIUtils
private[ui] abstract class BatchTableBase(tableId: String) {
......@@ -31,18 +30,20 @@ private[ui] abstract class BatchTableBase(tableId: String) {
<th>Processing Time</th>
}
protected def baseRow(batch: BatchInfo): Seq[Node] = {
protected def baseRow(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
val formattedBatchTime = UIUtils.formatDate(batch.batchTime.milliseconds)
val eventCount = batch.receivedBlockInfo.values.map {
receivers => receivers.map(_.numRecords).sum
}.sum
val eventCount = batch.numRecords
val schedulingDelay = batch.schedulingDelay
val formattedSchedulingDelay = schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
val processingTime = batch.processingDelay
val formattedProcessingTime = processingTime.map(UIUtils.formatDuration).getOrElse("-")
<td sorttable_customkey={batchTime.toString}>{formattedBatchTime}</td>
<td sorttable_customkey={batchTime.toString}>
<a href={s"batch?id=$batchTime"}>
{formattedBatchTime}
</a>
</td>
<td sorttable_customkey={eventCount.toString}>{eventCount.toString} events</td>
<td sorttable_customkey={schedulingDelay.getOrElse(Long.MaxValue).toString}>
{formattedSchedulingDelay}
......@@ -73,8 +74,9 @@ private[ui] abstract class BatchTableBase(tableId: String) {
protected def renderRows: Seq[Node]
}
private[ui] class ActiveBatchTable(runningBatches: Seq[BatchInfo], waitingBatches: Seq[BatchInfo])
extends BatchTableBase("active-batches-table") {
private[ui] class ActiveBatchTable(
runningBatches: Seq[BatchUIData],
waitingBatches: Seq[BatchUIData]) extends BatchTableBase("active-batches-table") {
override protected def columns: Seq[Node] = super.columns ++ <th>Status</th>
......@@ -85,16 +87,16 @@ private[ui] class ActiveBatchTable(runningBatches: Seq[BatchInfo], waitingBatche
runningBatches.flatMap(batch => <tr>{runningBatchRow(batch)}</tr>)
}
private def runningBatchRow(batch: BatchInfo): Seq[Node] = {
private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
baseRow(batch) ++ <td>processing</td>
}
private def waitingBatchRow(batch: BatchInfo): Seq[Node] = {
private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
baseRow(batch) ++ <td>queued</td>
}
}
private[ui] class CompletedBatchTable(batches: Seq[BatchInfo])
private[ui] class CompletedBatchTable(batches: Seq[BatchUIData])
extends BatchTableBase("completed-batches-table") {
override protected def columns: Seq[Node] = super.columns ++ <th>Total Delay</th>
......@@ -103,7 +105,7 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchInfo])
batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
}
private def completedBatchRow(batch: BatchInfo): Seq[Node] = {
private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
val totalDelay = batch.totalDelay
val formattedTotalDelay = totalDelay.map(UIUtils.formatDuration).getOrElse("-")
baseRow(batch) ++
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.ui
import javax.servlet.http.HttpServletRequest
import scala.xml.{NodeSeq, Node}
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.streaming.Time
import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
import org.apache.spark.ui.jobs.UIData.JobUIData
private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
private val streamingListener = parent.listener
private val sparkListener = parent.ssc.sc.jobProgressListener
private def columns: Seq[Node] = {
<th>Output Op Id</th>
<th>Description</th>
<th>Duration</th>
<th>Job Id</th>
<th>Duration</th>
<th class="sorttable_nosort">Stages: Succeeded/Total</th>
<th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
<th>Error</th>
}
/**
* Generate a row for a Spark Job. Because duplicated output op info needs to be collapsed into
* one cell, we use "rowspan" for the first row of an output op.
*/
def generateJobRow(
outputOpId: OutputOpId,
formattedOutputOpDuration: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: JobUIData): Seq[Node] = {
val lastStageInfo = Option(sparkJob.stageIds)
.filter(_.nonEmpty)
.flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
val lastStageData = lastStageInfo.flatMap { s =>
sparkListener.stageIdToData.get((s.stageId, s.attemptId))
}
val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
val duration: Option[Long] = {
sparkJob.submissionTime.map { start =>
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
end - start
}
}
val lastFailureReason =
sparkJob.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get).
dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
flatMap(info => info.failureReason).headOption.getOrElse("")
val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-")
val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${sparkJob.jobId}"
// In the first row, the output op id and its information need to be shown. In other rows,
// these cells are covered by the first row's "rowspan" and must be omitted.
// scalastyle:off
val prefixCells =
if (isFirstRow) {
<td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>
<span class="description-input" title={lastStageDescription}>
{lastStageDescription}
</span>{lastStageName}
</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
} else {
Nil
}
// scalastyle:on
<tr>
{prefixCells}
<td sorttable_customkey={sparkJob.jobId.toString}>
<a href={detailUrl}>
{sparkJob.jobId}{sparkJob.jobGroup.map(id => s"($id)").getOrElse("")}
</a>
</td>
<td sorttable_customkey={duration.getOrElse(Long.MaxValue).toString}>
{formattedDuration}
</td>
<td class="stage-progress-cell">
{sparkJob.completedStageIndices.size}/{sparkJob.stageIds.size - sparkJob.numSkippedStages}
{if (sparkJob.numFailedStages > 0) s"(${sparkJob.numFailedStages} failed)"}
{if (sparkJob.numSkippedStages > 0) s"(${sparkJob.numSkippedStages} skipped)"}
</td>
<td class="progress-cell">
{
UIUtils.makeProgressBar(
started = sparkJob.numActiveTasks,
completed = sparkJob.numCompletedTasks,
failed = sparkJob.numFailedTasks,
skipped = sparkJob.numSkippedTasks,
total = sparkJob.numTasks - sparkJob.numSkippedTasks)
}
</td>
{failureReasonCell(lastFailureReason)}
</tr>
}
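To illustrate the rowspan collapsing: for output op 0 with two Spark jobs, only the first generated row carries the output op cells; the second row's prefixCells is Nil, so it starts at the Job Id column. A simplified sketch of the resulting markup (made-up description and durations, trailing columns omitted):
val sketch =
<tr><td rowspan="2">0</td><td rowspan="2">print at Example.scala:10</td><td rowspan="2">2.0 s</td><td>0</td></tr>
<tr><td>1</td></tr>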
private def generateOutputOpIdRow(
outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = {
val sparkJobDurations = sparkJobs.map(sparkJob => {
sparkJob.submissionTime.map { start =>
val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
end - start
}
})
val formattedOutputOpDuration =
if (sparkJobDurations.exists(_.isEmpty)) {
// If any job has not finished, set "formattedOutputOpDuration" to "-"
"-"
} else {
UIUtils.formatDuration(sparkJobDurations.flatMap(x => x).sum)
}
generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
sparkJobs.tail.map { sparkJob =>
generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
}.flatMap(x => x)
}
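The duration logic above sums the durations of all Spark jobs in the output op and shows "-" while any of them is still running. A worked sketch (made-up durations, reusing UIUtils.formatDuration):
val jobDurations: Seq[Option[Long]] = Seq(Some(1000L), Some(500L))
val formatted =
if (jobDurations.exists(_.isEmpty)) "-"
else UIUtils.formatDuration(jobDurations.flatten.sum) // 1500 ms formats as "1.5 s"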
private def failureReasonCell(failureReason: String): Seq[Node] = {
val isMultiline = failureReason.indexOf('\n') >= 0
// Display the first line by default
val failureReasonSummary = StringEscapeUtils.escapeHtml4(
if (isMultiline) {
failureReason.substring(0, failureReason.indexOf('\n'))
} else {
failureReason
})
val details = if (isMultiline) {
// scalastyle:off
<span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
class="expand-details">
+details
</span> ++
<div class="stacktrace-details collapsed">
<pre>{failureReason}</pre>
</div>
// scalastyle:on
} else {
""
}
<td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
}
private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
sparkListener.activeJobs.get(sparkJobId).orElse {
sparkListener.completedJobs.find(_.jobId == sparkJobId).orElse {
sparkListener.failedJobs.find(_.jobId == sparkJobId)
}
}
}
/**
* Generate the job table for the batch.
*/
private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = {
val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).toSeq.
sortBy(_._1). // sorted by OutputOpId
map { case (outputOpId, outputOpIdAndSparkJobIds) =>
// sort SparkJobIds for each OutputOpId
(outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
}
sparkListener.synchronized {
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] =
outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
// Filter out Spark job ids that don't exist in sparkListener
(outputOpId, sparkJobIds.flatMap(getJobData))
}
<table id="batch-job-table" class="table table-bordered table-striped table-condensed">
<thead>
{columns}
</thead>
<tbody>
{
outputOpIdWithJobs.map {
case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, jobs)
}
}
</tbody>
</table>
}
}
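The grouping at the top of generateJobTable turns the flat (outputOpId, sparkJobId) pairs into one sorted entry per output op. For example (made-up ids):
val pairs = Seq(OutputOpIdAndSparkJobId(1, 3), OutputOpIdAndSparkJobId(0, 1),
OutputOpIdAndSparkJobId(0, 0), OutputOpIdAndSparkJobId(1, 2))
val grouped = pairs.groupBy(_.outputOpId).toSeq.sortBy(_._1).
map { case (outputOpId, ps) => (outputOpId, ps.map(_.sparkJobId).sorted) }
// grouped == Seq((0, Seq(0, 1)), (1, Seq(2, 3)))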
def render(request: HttpServletRequest): Seq[Node] = {
val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse {
throw new IllegalArgumentException(s"Missing id parameter")
}
val formattedBatchTime = UIUtils.formatDate(batchTime.milliseconds)
val batchUIData = streamingListener.getBatchUIData(batchTime).getOrElse {
throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
}
val formattedSchedulingDelay =
batchUIData.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
val formattedProcessingTime =
batchUIData.processingDelay.map(UIUtils.formatDuration).getOrElse("-")
val formattedTotalDelay = batchUIData.totalDelay.map(UIUtils.formatDuration).getOrElse("-")
val summary: NodeSeq =
<div>
<ul class="unstyled">
<li>
<strong>Batch Duration: </strong>
{UIUtils.formatDuration(streamingListener.batchDuration)}
</li>
<li>
<strong>Input data size: </strong>
{batchUIData.numRecords} records
</li>
<li>
<strong>Scheduling delay: </strong>
{formattedSchedulingDelay}
</li>
<li>
<strong>Processing time: </strong>
{formattedProcessingTime}
</li>
<li>
<strong>Total delay: </strong>
{formattedTotalDelay}
</li>
</ul>
</div>
val jobTable =
if (batchUIData.outputOpIdSparkJobIdPairs.isEmpty) {
<div>Cannot find any job for Batch {formattedBatchTime}.</div>
} else {
generateJobTable(batchUIData)
}
val content = summary ++ jobTable
UIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.ui
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.streaming.ui.StreamingJobProgressListener._
private[ui] case class OutputOpIdAndSparkJobId(outputOpId: OutputOpId, sparkJobId: SparkJobId)
private[ui] case class BatchUIData(
batchTime: Time,
receiverNumRecords: Map[Int, Long],
submissionTime: Long,
processingStartTime: Option[Long],
processingEndTime: Option[Long],
var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) {
/**
* Time taken for the first job of this batch to start processing from the time this batch
* was submitted to the streaming scheduler. Essentially, it is
* `processingStartTime` - `submissionTime`.
*/
def schedulingDelay: Option[Long] = processingStartTime.map(_ - submissionTime)
/**
* Time taken for all the jobs of this batch to finish processing from the time they started
* processing. Essentially, it is `processingEndTime` - `processingStartTime`.
*/
def processingDelay: Option[Long] = {
for (start <- processingStartTime;
end <- processingEndTime)
yield end - start
}
/**
* Time taken for all the jobs of this batch to finish processing from the time they
* were submitted. Essentially, it is `processingDelay` + `schedulingDelay`.
*/
def totalDelay: Option[Long] = processingEndTime.map(_ - submissionTime)
/**
* The number of records received by the receivers in this batch.
*/
def numRecords: Long = receiverNumRecords.map(_._2).sum
}
private[ui] object BatchUIData {
def apply(batchInfo: BatchInfo): BatchUIData = {
new BatchUIData(
batchInfo.batchTime,
batchInfo.receivedBlockInfo.mapValues(_.map(_.numRecords).sum),
batchInfo.submissionTime,
batchInfo.processingStartTime,
batchInfo.processingEndTime
)
}
}
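The three delays above are plain differences of the batch timestamps. A worked example (all times in milliseconds): a batch submitted at 1000 that starts processing at 1200 and finishes at 1700:
val submissionTime = 1000L
val processingStartTime = Some(1200L)
val processingEndTime = Some(1700L)
val schedulingDelay = processingStartTime.map(_ - submissionTime) // Some(200)
val processingDelay =
for (start <- processingStartTime; end <- processingEndTime) yield end - start // Some(500)
val totalDelay = processingEndTime.map(_ - submissionTime) // Some(700) = 200 + 500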
......@@ -17,29 +17,58 @@
package org.apache.spark.streaming.ui
import scala.collection.mutable.{Queue, HashMap}
import java.util.LinkedHashMap
import java.util.{Map => JMap}
import java.util.Properties
import scala.collection.mutable.{ArrayBuffer, Queue, HashMap, SynchronizedBuffer}
import org.apache.spark.scheduler._
import org.apache.spark.streaming.{Time, StreamingContext}
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
import org.apache.spark.streaming.scheduler.BatchInfo
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
import org.apache.spark.util.Distribution
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
extends StreamingListener {
extends StreamingListener with SparkListener {
private val waitingBatchInfos = new HashMap[Time, BatchInfo]
private val runningBatchInfos = new HashMap[Time, BatchInfo]
private val completedBatchInfos = new Queue[BatchInfo]
private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
private val waitingBatchUIData = new HashMap[Time, BatchUIData]
private val runningBatchUIData = new HashMap[Time, BatchUIData]
private val completedBatchUIData = new Queue[BatchUIData]
private val batchUIDataLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
private var totalCompletedBatches = 0L
private var totalReceivedRecords = 0L
private var totalProcessedRecords = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]
// Because onJobStart and onBatchXXX messages are processed in different threads,
// we may not be able to get the corresponding BatchUIData when receiving onJobStart. So here we
// cannot use a map of (Time, BatchUIData).
private[ui] val batchTimeToOutputOpIdSparkJobIdPair =
new LinkedHashMap[Time, SynchronizedBuffer[OutputOpIdAndSparkJobId]] {
override def removeEldestEntry(
p1: JMap.Entry[Time, SynchronizedBuffer[OutputOpIdAndSparkJobId]]): Boolean = {
// If a lot of "onBatchCompleted"s happen before "onJobStart" (image if
// SparkContext.listenerBus is very slow), "batchTimeToOutputOpIdToSparkJobIds"
// may add some information for a removed batch when processing "onJobStart". It will be a
// memory leak.
//
// To avoid the memory leak, we control the size of "batchTimeToOutputOpIdToSparkJobIds" and
// evict the eldest one.
//
// Note: if "onJobStart" happens before "onBatchSubmitted", the size of
// "batchTimeToOutputOpIdToSparkJobIds" may be greater than the number of the retained
// batches temporarily, so here we use "10" to handle such case. This is not a perfect
// solution, but at least it can handle most of cases.
size() >
waitingBatchUIData.size + runningBatchUIData.size + completedBatchUIData.size + 10
}
}
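The map above uses the standard size-capped LinkedHashMap idiom: entries are kept in insertion order, and removeEldestEntry evicts the oldest one once the predicate holds. A standalone sketch (hypothetical keys, cap of 3):
import java.util.{LinkedHashMap, Map => JMap}
val capped = new LinkedHashMap[String, Int] {
override def removeEldestEntry(eldest: JMap.Entry[String, Int]): Boolean = size() > 3
}
Seq("a", "b", "c", "d").zipWithIndex.foreach { case (k, v) => capped.put(k, v) }
// capped now holds b, c, d; "a" (the eldest) was evicted on the fourth put.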
val batchDuration = ssc.graph.batchDuration.milliseconds
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
......@@ -62,37 +91,62 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
synchronized {
waitingBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
waitingBatchUIData(batchSubmitted.batchInfo.batchTime) =
BatchUIData(batchSubmitted.batchInfo)
}
}
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted): Unit = synchronized {
runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
val batchUIData = BatchUIData(batchStarted.batchInfo)
runningBatchUIData(batchStarted.batchInfo.batchTime) = batchUIData
waitingBatchUIData.remove(batchStarted.batchInfo.batchTime)
batchStarted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
totalReceivedRecords += infos.map(_.numRecords).sum
}
totalReceivedRecords += batchUIData.numRecords
}
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
synchronized {
waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
completedBatchInfos.enqueue(batchCompleted.batchInfo)
if (completedBatchInfos.size > batchInfoLimit) completedBatchInfos.dequeue()
waitingBatchUIData.remove(batchCompleted.batchInfo.batchTime)
runningBatchUIData.remove(batchCompleted.batchInfo.batchTime)
val batchUIData = BatchUIData(batchCompleted.batchInfo)
completedBatchUIData.enqueue(batchUIData)
if (completedBatchUIData.size > batchUIDataLimit) {
val removedBatch = completedBatchUIData.dequeue()
batchTimeToOutputOpIdSparkJobIdPair.remove(removedBatch.batchTime)
}
totalCompletedBatches += 1L
batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
totalProcessedRecords += infos.map(_.numRecords).sum
totalProcessedRecords += batchUIData.numRecords
}
}
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
getBatchTimeAndOutputOpId(jobStart.properties).foreach { case (batchTime, outputOpId) =>
var outputOpIdToSparkJobIds = batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)
if (outputOpIdToSparkJobIds == null) {
outputOpIdToSparkJobIds =
new ArrayBuffer[OutputOpIdAndSparkJobId]()
with SynchronizedBuffer[OutputOpIdAndSparkJobId]
batchTimeToOutputOpIdSparkJobIdPair.put(batchTime, outputOpIdToSparkJobIds)
}
outputOpIdToSparkJobIds += OutputOpIdAndSparkJobId(outputOpId, jobStart.jobId)
}
}
def numReceivers: Int = synchronized {
ssc.graph.getReceiverInputStreams().size
private def getBatchTimeAndOutputOpId(properties: Properties): Option[(Time, Int)] = {
val batchTime = properties.getProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY)
if (batchTime == null) {
// Not submitted from JobScheduler
None
} else {
val outputOpId = properties.getProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY)
assert(outputOpId != null)
Some(Time(batchTime.toLong) -> outputOpId.toInt)
}
}
def numReceivers: Int = ssc.graph.getReceiverInputStreams().size
def numTotalCompletedBatches: Long = synchronized {
totalCompletedBatches
}
......@@ -106,19 +160,19 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
def numUnprocessedBatches: Long = synchronized {
waitingBatchInfos.size + runningBatchInfos.size
waitingBatchUIData.size + runningBatchUIData.size
}
def waitingBatches: Seq[BatchInfo] = synchronized {
waitingBatchInfos.values.toSeq
def waitingBatches: Seq[BatchUIData] = synchronized {
waitingBatchUIData.values.toSeq
}
def runningBatches: Seq[BatchInfo] = synchronized {
runningBatchInfos.values.toSeq
def runningBatches: Seq[BatchUIData] = synchronized {
runningBatchUIData.values.toSeq
}
def retainedCompletedBatches: Seq[BatchInfo] = synchronized {
completedBatchInfos.toSeq
def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
completedBatchUIData.toSeq
}
def processingDelayDistribution: Option[Distribution] = synchronized {
......@@ -134,15 +188,11 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit)
val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
val latestBatches = retainedBatches.reverse.take(batchUIDataLimit)
(0 until numReceivers).map { receiverId =>
val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
batchInfo.get(receiverId).getOrElse(Array.empty)
}
val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo =>
// calculate records per second for each batch
blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration
val recordsOfParticularReceiver = latestBatches.map { batch =>
// calculate records per second for each batch
batch.receiverNumRecords.get(receiverId).sum.toDouble * 1000 / batchDuration
}
val distributionOption = Distribution(recordsOfParticularReceiver)
(receiverId, distributionOption)
......@@ -150,10 +200,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
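As a quick check of the rate arithmetic above (made-up numbers): 300 records received in a 100 ms batch is 300 * 1000 / 100 = 3000 records per second:
val numRecords = 300L
val batchDurationMs = 100L
val recordsPerSecond = numRecords.toDouble * 1000 / batchDurationMs // 3000.0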
def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receiverNumRecords)
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
(0 until numReceivers).map { receiverId =>
(receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum)
(receiverId, lastReceivedBlockInfo.getOrElse(receiverId, 0L))
}.toMap
}.getOrElse {
(0 until numReceivers).map(receiverId => (receiverId, 0L)).toMap
......@@ -164,20 +214,39 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
receiverInfos.get(receiverId)
}
def lastCompletedBatch: Option[BatchInfo] = synchronized {
completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
def lastCompletedBatch: Option[BatchUIData] = synchronized {
completedBatchUIData.sortBy(_.batchTime)(Time.ordering).lastOption
}
def lastReceivedBatch: Option[BatchInfo] = synchronized {
def lastReceivedBatch: Option[BatchUIData] = synchronized {
retainedBatches.lastOption
}
private def retainedBatches: Seq[BatchInfo] = {
(waitingBatchInfos.values.toSeq ++
runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
private def retainedBatches: Seq[BatchUIData] = {
(waitingBatchUIData.values.toSeq ++
runningBatchUIData.values.toSeq ++ completedBatchUIData).sortBy(_.batchTime)(Time.ordering)
}
private def extractDistribution(getMetric: BatchUIData => Option[Long]): Option[Distribution] = {
Distribution(completedBatchUIData.flatMap(getMetric(_)).map(_.toDouble))
}
private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
def getBatchUIData(batchTime: Time): Option[BatchUIData] = synchronized {
val batchUIData = waitingBatchUIData.get(batchTime).orElse {
runningBatchUIData.get(batchTime).orElse {
completedBatchUIData.find(batch => batch.batchTime == batchTime)
}
}
batchUIData.foreach { _batchUIData =>
val outputOpIdToSparkJobIds =
Option(batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)).getOrElse(Seq.empty)
_batchUIData.outputOpIdSparkJobIdPairs = outputOpIdToSparkJobIds
}
batchUIData
}
}
private[streaming] object StreamingJobProgressListener {
type SparkJobId = Int
type OutputOpId = Int
}
......@@ -27,14 +27,16 @@ import StreamingTab._
* Spark Web UI tab that shows statistics of a streaming job.
* This assumes the given SparkContext has enabled its SparkUI.
*/
private[spark] class StreamingTab(ssc: StreamingContext)
private[spark] class StreamingTab(val ssc: StreamingContext)
extends SparkUITab(getSparkUI(ssc), "streaming") with Logging {
val parent = getSparkUI(ssc)
val listener = ssc.progressListener
ssc.addStreamingListener(listener)
ssc.sc.addSparkListener(listener)
attachPage(new StreamingPage(this))
attachPage(new BatchPage(this))
parent.attachTab(this)
def detach() {
......
......@@ -17,6 +17,8 @@
package org.apache.spark.streaming
import scala.collection.mutable.Queue
import org.openqa.selenium.WebDriver
import org.openqa.selenium.htmlunit.HtmlUnitDriver
import org.scalatest._
......@@ -60,8 +62,28 @@ class UISeleniumSuite
ssc
}
private def setupStreams(ssc: StreamingContext): Unit = {
val rdds = Queue(ssc.sc.parallelize(1 to 4, 4))
val inputStream = ssc.queueStream(rdds)
inputStream.foreachRDD { rdd =>
rdd.foreach(_ => {})
rdd.foreach(_ => {})
}
inputStream.foreachRDD { rdd =>
rdd.foreach(_ => {})
try {
rdd.foreach(_ => throw new RuntimeException("Oops"))
} catch {
case e: SparkException if e.getMessage.contains("Oops") =>
}
}
}
test("attaching and detaching a Streaming tab") {
withStreamingContext(newSparkStreamingContext()) { ssc =>
setupStreams(ssc)
ssc.start()
val sparkUI = ssc.sparkContext.ui.get
eventually(timeout(10 seconds), interval(50 milliseconds)) {
......@@ -77,8 +99,8 @@ class UISeleniumSuite
statisticText should contain("Batch interval:")
val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq
h4Text should contain("Active Batches (0)")
h4Text should contain("Completed Batches (last 0 out of 0)")
h4Text.exists(_.matches("Active Batches \\(\\d+\\)")) should be (true)
h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true)
findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be {
List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Status")
......@@ -86,6 +108,63 @@ class UISeleniumSuite
findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
List("Batch Time", "Input Size", "Scheduling Delay", "Processing Time", "Total Delay")
}
val batchLinks =
findAll(cssSelector("""#completed-batches-table a""")).flatMap(_.attribute("href")).toSeq
batchLinks.size should be >= 1
// Check a normal batch page
go to (batchLinks.last) // Last should be the first batch, so it will have some jobs
val summaryText = findAll(cssSelector("li strong")).map(_.text).toSeq
summaryText should contain ("Batch Duration:")
summaryText should contain ("Input data size:")
summaryText should contain ("Scheduling delay:")
summaryText should contain ("Processing time:")
summaryText should contain ("Total delay:")
findAll(cssSelector("""#batch-job-table th""")).map(_.text).toSeq should be {
List("Output Op Id", "Description", "Duration", "Job Id", "Duration",
"Stages: Succeeded/Total", "Tasks (for all stages): Succeeded/Total", "Error")
}
// Check we have 2 output op ids
val outputOpIds = findAll(cssSelector(".output-op-id-cell")).toSeq
outputOpIds.map(_.attribute("rowspan")) should be (List(Some("2"), Some("2")))
outputOpIds.map(_.text) should be (List("0", "1"))
// Check job ids
val jobIdCells = findAll(cssSelector( """#batch-job-table a""")).toSeq
jobIdCells.map(_.text) should be (List("0", "1", "2", "3"))
val jobLinks = jobIdCells.flatMap(_.attribute("href"))
jobLinks.size should be (4)
// Check stage progress
findAll(cssSelector(""".stage-progress-cell""")).map(_.text).toSeq should be
(List("1/1", "1/1", "1/1", "0/1 (1 failed)"))
// Check job progress
findAll(cssSelector(""".progress-cell""")).map(_.text).toSeq should be
(List("1/1", "1/1", "1/1", "0/1 (1 failed)"))
// Check stacktrace
val errorCells = findAll(cssSelector(""".stacktrace-details""")).map(_.text).toSeq
errorCells should have size 1
errorCells(0) should include("java.lang.RuntimeException: Oops")
// Check that the job link in the batch page is correct
go to (jobLinks(0))
val jobDetails = findAll(cssSelector("li strong")).map(_.text).toSeq
jobDetails should contain("Status:")
jobDetails should contain("Completed Stages:")
// Check a batch page without id
go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming/batch/")
webDriver.getPageSource should include ("Missing id parameter")
// Check a nonexistent batch
go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming/batch/?id=12345")
webDriver.getPageSource should include ("does not exist")
}
ssc.stop(false)
......
......@@ -17,8 +17,11 @@
package org.apache.spark.streaming.ui
import java.util.Properties
import org.scalatest.Matchers
import org.apache.spark.scheduler.SparkListenerJobStart
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.{Duration, Time, Milliseconds, TestSuiteBase}
......@@ -28,6 +31,17 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
val input = (1 to 4).map(Seq(_)).toSeq
val operation = (d: DStream[Int]) => d.map(x => x)
private def createJobStart(
batchTime: Time, outputOpId: Int, jobId: Int): SparkListenerJobStart = {
val properties = new Properties()
properties.setProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, batchTime.milliseconds.toString)
properties.setProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, outputOpId.toString)
SparkListenerJobStart(jobId = jobId,
0L, // unused
Nil, // unused
properties)
}
override def batchDuration: Duration = Milliseconds(100)
test("onBatchSubmitted, onBatchStarted, onBatchCompleted, " +
......@@ -43,7 +57,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
// onBatchSubmitted
val batchInfoSubmitted = BatchInfo(Time(1000), receivedBlockInfo, 1000, None, None)
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
listener.waitingBatches should be (List(batchInfoSubmitted))
listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted)))
listener.runningBatches should be (Nil)
listener.retainedCompletedBatches should be (Nil)
listener.lastCompletedBatch should be (None)
......@@ -56,7 +70,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
listener.waitingBatches should be (Nil)
listener.runningBatches should be (List(batchInfoStarted))
listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))
listener.retainedCompletedBatches should be (Nil)
listener.lastCompletedBatch should be (None)
listener.numUnprocessedBatches should be (1)
......@@ -64,13 +78,40 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.numTotalProcessedRecords should be (0)
listener.numTotalReceivedRecords should be (600)
// onJobStart
val jobStart1 = createJobStart(Time(1000), outputOpId = 0, jobId = 0)
listener.onJobStart(jobStart1)
val jobStart2 = createJobStart(Time(1000), outputOpId = 0, jobId = 1)
listener.onJobStart(jobStart2)
val jobStart3 = createJobStart(Time(1000), outputOpId = 1, jobId = 0)
listener.onJobStart(jobStart3)
val jobStart4 = createJobStart(Time(1000), outputOpId = 1, jobId = 1)
listener.onJobStart(jobStart4)
val batchUIData = listener.getBatchUIData(Time(1000))
batchUIData should not be None
batchUIData.get.batchTime should be (batchInfoStarted.batchTime)
batchUIData.get.schedulingDelay should be (batchInfoStarted.schedulingDelay)
batchUIData.get.processingDelay should be (batchInfoStarted.processingDelay)
batchUIData.get.totalDelay should be (batchInfoStarted.totalDelay)
batchUIData.get.receiverNumRecords should be (Map(0 -> 300L, 1 -> 300L))
batchUIData.get.numRecords should be(600)
batchUIData.get.outputOpIdSparkJobIdPairs should be (
Seq(OutputOpIdAndSparkJobId(0, 0),
OutputOpIdAndSparkJobId(0, 1),
OutputOpIdAndSparkJobId(1, 0),
OutputOpIdAndSparkJobId(1, 1)))
// onBatchCompleted
val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
listener.waitingBatches should be (Nil)
listener.runningBatches should be (Nil)
listener.retainedCompletedBatches should be (List(batchInfoCompleted))
listener.lastCompletedBatch should be (Some(batchInfoCompleted))
listener.retainedCompletedBatches should be (List(BatchUIData(batchInfoCompleted)))
listener.lastCompletedBatch should be (Some(BatchUIData(batchInfoCompleted)))
listener.numUnprocessedBatches should be (0)
listener.numTotalCompletedBatches should be (1)
listener.numTotalProcessedRecords should be (600)
......@@ -116,4 +157,55 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.retainedCompletedBatches.size should be (limit)
listener.numTotalCompletedBatches should be(limit + 10)
}
test("out-of-order onJobStart and onBatchXXX") {
val ssc = setupStreams(input, operation)
val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
val listener = new StreamingJobProgressListener(ssc)
// fill completedBatchUIData up to the retention limit
for(i <- 0 until limit) {
val batchInfoCompleted =
BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
val jobStart = createJobStart(Time(1000 + i * 100), outputOpId = 0, jobId = 1)
listener.onJobStart(jobStart)
}
// onJobStart happens before onBatchSubmitted
val jobStart = createJobStart(Time(1000 + limit * 100), outputOpId = 0, jobId = 0)
listener.onJobStart(jobStart)
val batchInfoSubmitted =
BatchInfo(Time(1000 + limit * 100), Map.empty, (1000 + limit * 100), None, None)
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
// We can still see the info retrieved from onJobStart
val batchUIData = listener.getBatchUIData(Time(1000 + limit * 100))
batchUIData should not be None
batchUIData.get.batchTime should be (batchInfoSubmitted.batchTime)
batchUIData.get.schedulingDelay should be (batchInfoSubmitted.schedulingDelay)
batchUIData.get.processingDelay should be (batchInfoSubmitted.processingDelay)
batchUIData.get.totalDelay should be (batchInfoSubmitted.totalDelay)
batchUIData.get.receiverNumRecords should be (Map.empty)
batchUIData.get.numRecords should be (0)
batchUIData.get.outputOpIdSparkJobIdPairs should be (Seq(OutputOpIdAndSparkJobId(0, 0)))
// A lot of "onBatchCompleted"s happen before "onJobStart"
for(i <- limit + 1 to limit * 2) {
val batchInfoCompleted =
BatchInfo(Time(1000 + i * 100), Map.empty, 1000 + i * 100, Some(2000 + i * 100), None)
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
}
for(i <- limit + 1 to limit * 2) {
val jobStart = createJobStart(Time(1000 + i * 100), outputOpId = 0, jobId = 1)
listener.onJobStart(jobStart)
}
// We should not leak memory
listener.batchTimeToOutputOpIdSparkJobIdPair.size() should be <=
(listener.waitingBatches.size + listener.runningBatches.size +
listener.retainedCompletedBatches.size + 10)
}
}