Skip to content
Snippets Groups Projects
Commit bcea0bfd authored by Shixiong Zhu's avatar Shixiong Zhu Committed by Tathagata Das
Browse files

[SPARK-11742][STREAMING] Add the failure info to the batch lists

<img width="1365" alt="screen shot 2015-11-13 at 9 57 43 pm" src="https://cloud.githubusercontent.com/assets/1000778/11162322/9b88e204-8a51-11e5-8c57-a44889cab713.png">

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #9711 from zsxwing/failure-info.
parent 3c025087
No related branches found
No related tags found
No related merge requests found
......@@ -33,6 +33,22 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
{SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")}</th>
}
/**
* Return the first failure reason if finding in the batches.
*/
protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption
}
protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption
firstFailureReason.map { failureReason =>
val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason)
UIUtils.failureReasonCell(
failureReasonForUI, rowspan = 1, includeFirstLineInExpandDetails = false)
}.getOrElse(<td>-</td>)
}
protected def baseRow(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval)
......@@ -97,9 +113,17 @@ private[ui] class ActiveBatchTable(
waitingBatches: Seq[BatchUIData],
batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) {
private val firstFailureReason = getFirstFailureReason(runningBatches)
override protected def columns: Seq[Node] = super.columns ++ {
<th>Output Ops: Succeeded/Total</th>
<th>Status</th>
<th>Status</th> ++ {
if (firstFailureReason.nonEmpty) {
<th>Error</th>
} else {
Nil
}
}
}
override protected def renderRows: Seq[Node] = {
......@@ -110,20 +134,41 @@ private[ui] class ActiveBatchTable(
}
private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td>
baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td> ++ {
if (firstFailureReason.nonEmpty) {
getFirstFailureTableCell(batch)
} else {
Nil
}
}
}
private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td>
baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td>++ {
if (firstFailureReason.nonEmpty) {
// Waiting batches have not run yet, so must have no failure reasons.
<td>-</td>
} else {
Nil
}
}
}
}
private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
extends BatchTableBase("completed-batches-table", batchInterval) {
private val firstFailureReason = getFirstFailureReason(batches)
override protected def columns: Seq[Node] = super.columns ++ {
<th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
<th>Output Ops: Succeeded/Total</th>
<th>Output Ops: Succeeded/Total</th> ++ {
if (firstFailureReason.nonEmpty) {
<th>Error</th>
} else {
Nil
}
}
}
override protected def renderRows: Seq[Node] = {
......@@ -138,6 +183,12 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval:
<td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
{formattedTotalDelay}
</td>
} ++ createOutputOperationProgressBar(batch)
} ++ createOutputOperationProgressBar(batch)++ {
if (firstFailureReason.nonEmpty) {
getFirstFailureTableCell(batch)
} else {
Nil
}
}
}
}
......@@ -149,7 +149,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
total = sparkJob.numTasks - sparkJob.numSkippedTasks)
}
</td>
{failureReasonCell(lastFailureReason, rowspan = 1)}
{UIUtils.failureReasonCell(lastFailureReason)}
</tr>
}
......@@ -245,48 +245,6 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
</div>
}
private def failureReasonCell(
failureReason: String,
rowspan: Int,
includeFirstLineInExpandDetails: Boolean = true): Seq[Node] = {
val isMultiline = failureReason.indexOf('\n') >= 0
// Display the first line by default
val failureReasonSummary = StringEscapeUtils.escapeHtml4(
if (isMultiline) {
failureReason.substring(0, failureReason.indexOf('\n'))
} else {
failureReason
})
val failureDetails =
if (isMultiline && !includeFirstLineInExpandDetails) {
// Skip the first line
failureReason.substring(failureReason.indexOf('\n') + 1)
} else {
failureReason
}
val details = if (isMultiline) {
// scalastyle:off
<span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
class="expand-details">
+details
</span> ++
<div class="stacktrace-details collapsed">
<pre>{failureDetails}</pre>
</div>
// scalastyle:on
} else {
""
}
if (rowspan == 1) {
<td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
} else {
<td valign="middle" style="max-width: 300px" rowspan={rowspan.toString}>
{failureReasonSummary}{details}
</td>
}
}
private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
sparkListener.activeJobs.get(sparkJobId).orElse {
sparkListener.completedJobs.find(_.jobId == sparkJobId).orElse {
......@@ -434,8 +392,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
private def outputOpStatusCell(outputOp: OutputOperationUIData, rowspan: Int): Seq[Node] = {
outputOp.failureReason match {
case Some(failureReason) =>
val failureReasonForUI = generateOutputOperationStatusForUI(failureReason)
failureReasonCell(failureReasonForUI, rowspan, includeFirstLineInExpandDetails = false)
val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason)
UIUtils.failureReasonCell(
failureReasonForUI, rowspan, includeFirstLineInExpandDetails = false)
case None =>
if (outputOp.endTime.isEmpty) {
<td rowspan={rowspan.toString}>-</td>
......
......@@ -17,6 +17,10 @@
package org.apache.spark.streaming.ui
import scala.xml.Node
import org.apache.commons.lang3.StringEscapeUtils
import java.text.SimpleDateFormat
import java.util.TimeZone
import java.util.concurrent.TimeUnit
......@@ -124,4 +128,60 @@ private[streaming] object UIUtils {
}
}
}
def createOutputOperationFailureForUI(failure: String): String = {
if (failure.startsWith("org.apache.spark.Spark")) {
// SparkException or SparkDriverExecutionException
"Failed due to Spark job error\n" + failure
} else {
var nextLineIndex = failure.indexOf("\n")
if (nextLineIndex < 0) {
nextLineIndex = failure.size
}
val firstLine = failure.substring(0, nextLineIndex)
s"Failed due to error: $firstLine\n$failure"
}
}
def failureReasonCell(
failureReason: String,
rowspan: Int = 1,
includeFirstLineInExpandDetails: Boolean = true): Seq[Node] = {
val isMultiline = failureReason.indexOf('\n') >= 0
// Display the first line by default
val failureReasonSummary = StringEscapeUtils.escapeHtml4(
if (isMultiline) {
failureReason.substring(0, failureReason.indexOf('\n'))
} else {
failureReason
})
val failureDetails =
if (isMultiline && !includeFirstLineInExpandDetails) {
// Skip the first line
failureReason.substring(failureReason.indexOf('\n') + 1)
} else {
failureReason
}
val details = if (isMultiline) {
// scalastyle:off
<span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
class="expand-details">
+details
</span> ++
<div class="stacktrace-details collapsed">
<pre>{failureDetails}</pre>
</div>
// scalastyle:on
} else {
""
}
if (rowspan == 1) {
<td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
} else {
<td valign="middle" style="max-width: 300px" rowspan={rowspan.toString}>
{failureReasonSummary}{details}
</td>
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment