diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 3f1cab69068dcf06a8b5a030c93baa2524bfc823..831f60e870f74feef2a84adc2a284f46cb680921 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.ui
 
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.{NodeSeq, Node}
+import scala.xml.{NodeSeq, Node, Text}
 
 import org.apache.commons.lang3.StringEscapeUtils
 
@@ -28,6 +28,7 @@ import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
 import org.apache.spark.streaming.ui.StreamingJobProgressListener.{SparkJobId, OutputOpId}
 import org.apache.spark.ui.jobs.UIData.JobUIData
 
+private case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData])
 
 private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
   private val streamingListener = parent.listener
@@ -44,25 +45,33 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       <th>Error</th>
   }
 
+  private def generateJobRow(
+      outputOpId: OutputOpId,
+      outputOpDescription: Seq[Node],
+      formattedOutputOpDuration: String,
+      numSparkJobRowsInOutputOp: Int,
+      isFirstRow: Boolean,
+      sparkJob: SparkJobIdWithUIData): Seq[Node] = {
+    if (sparkJob.jobUIData.isDefined) {
+      generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
+        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
+    } else {
+      generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
+        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
+    }
+  }
+
   /**
    * Generate a row for a Spark Job. Because duplicated output op infos needs to be collapsed into
    * one cell, we use "rowspan" for the first row of a output op.
    */
-  def generateJobRow(
+  private def generateNormalJobRow(
       outputOpId: OutputOpId,
+      outputOpDescription: Seq[Node],
       formattedOutputOpDuration: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
       sparkJob: JobUIData): Seq[Node] = {
-    val lastStageInfo = Option(sparkJob.stageIds)
-      .filter(_.nonEmpty)
-      .flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
-    val lastStageData = lastStageInfo.flatMap { s =>
-      sparkListener.stageIdToData.get((s.stageId, s.attemptId))
-    }
-
-    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
-    val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
     val duration: Option[Long] = {
       sparkJob.submissionTime.map { start =>
         val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
@@ -83,9 +92,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       if (isFirstRow) {
         <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
         <td rowspan={numSparkJobRowsInOutputOp.toString}>
-          <span class="description-input" title={lastStageDescription}>
-            {lastStageDescription}
-          </span>{lastStageName}
+          {outputOpDescription}
         </td>
         <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
       } else {
@@ -122,27 +129,96 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     </tr>
   }
 
-  private def generateOutputOpIdRow(
-      outputOpId: OutputOpId, sparkJobs: Seq[JobUIData]): Seq[Node] = {
-    val sparkjobDurations = sparkJobs.map(sparkJob => {
-      sparkJob.submissionTime.map { start =>
-        val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
-        end - start
+  /**
+   * If a job is dropped by sparkListener due to exceeding the limitation, we only show the job id
+   * with "-" cells.
+   */
+  private def generateDroppedJobRow(
+      outputOpId: OutputOpId,
+      outputOpDescription: Seq[Node],
+      formattedOutputOpDuration: String,
+      numSparkJobRowsInOutputOp: Int,
+      isFirstRow: Boolean,
+      jobId: Int): Seq[Node] = {
+    // In the first row, output op id and its information needs to be shown. In other rows, these
+    // cells will be taken up due to "rowspan".
+    // scalastyle:off
+    val prefixCells =
+      if (isFirstRow) {
+        <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
+          <td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
+          <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
+      } else {
+        Nil
       }
-    })
+    // scalastyle:on
+
+    <tr>
+      {prefixCells}
+      <td sorttable_customkey={jobId.toString}>
+        {jobId.toString}
+      </td>
+      <!-- Duration -->
+      <td>-</td>
+      <!-- Stages: Succeeded/Total -->
+      <td>-</td>
+      <!-- Tasks (for all stages): Succeeded/Total -->
+      <td>-</td>
+      <!-- Error -->
+      <td>-</td>
+    </tr>
+  }
+
+  private def generateOutputOpIdRow(
+      outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
+    // We don't count the durations of dropped jobs
+    val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
+      map(sparkJob => {
+        sparkJob.submissionTime.map { start =>
+          val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
+          end - start
+        }
+      })
     val formattedOutputOpDuration =
-      if (sparkjobDurations.exists(_ == None)) {
-        // If any job does not finish, set "formattedOutputOpDuration" to "-"
+      if (sparkJobDurations.isEmpty || sparkJobDurations.exists(_ == None)) {
+        // If no job or any job does not finish, set "formattedOutputOpDuration" to "-"
         "-"
       } else {
-        SparkUIUtils.formatDuration(sparkjobDurations.flatMap(x => x).sum)
+        SparkUIUtils.formatDuration(sparkJobDurations.flatMap(x => x).sum)
       }
-    generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
+
+    val description = generateOutputOpDescription(sparkJobs)
+
+    generateJobRow(
+      outputOpId, description, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
       sparkJobs.tail.map { sparkJob =>
-        generateJobRow(outputOpId, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
+        generateJobRow(
+          outputOpId, description, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
       }.flatMap(x => x)
   }
 
+  private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
+    val lastStageInfo =
+      sparkJobs.flatMap(_.jobUIData).headOption. // Get the first JobUIData
+        flatMap { sparkJob => // For the first job, get the latest Stage info
+          if (sparkJob.stageIds.isEmpty) {
+            None
+          } else {
+            sparkListener.stageIdToInfo.get(sparkJob.stageIds.max)
+          }
+        }
+    val lastStageData = lastStageInfo.flatMap { s =>
+      sparkListener.stageIdToData.get((s.stageId, s.attemptId))
+    }
+
+    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+    val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
+
+    <span class="description-input" title={lastStageDescription}>
+      {lastStageDescription}
+    </span> ++ Text(lastStageName)
+  }
+
   private def failureReasonCell(failureReason: String): Seq[Node] = {
     val isMultiline = failureReason.indexOf('\n') >= 0
     // Display the first line by default
@@ -187,10 +263,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
       }
     sparkListener.synchronized {
-      val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] =
+      val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] =
         outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
-          // Filter out spark Job ids that don't exist in sparkListener
-          (outputOpId, sparkJobIds.flatMap(getJobData))
+          (outputOpId,
+            sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
         }
 
       <table id="batch-job-table" class="table table-bordered table-striped table-condensed">
@@ -200,7 +276,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         <tbody>
           {
             outputOpIdWithJobs.map {
-              case (outputOpId, jobs) => generateOutputOpIdRow(outputOpId, jobs)
+              case (outputOpId, sparkJobIds) => generateOutputOpIdRow(outputOpId, sparkJobIds)
             }
           }
         </tbody>