diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 982e8915a8ded5bac7edef45854f412d32268bed..7953d77fd7ece33434fac0be1d7fca7a1cb59343 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -17,14 +17,12 @@
 
 package org.apache.spark.ui.exec
 
-import java.net.URLEncoder
 import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
 import org.apache.spark.status.api.v1.ExecutorSummary
-import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
-import org.apache.spark.util.Utils
+import org.apache.spark.ui.{UIUtils, WebUIPage}
 
 // This isn't even used anymore -- but we need to keep it b/c of a MiMa false positive
 private[ui] case class ExecutorSummaryInfo(
@@ -83,18 +81,7 @@ private[spark] object ExecutorsPage {
     val memUsed = status.memUsed
     val maxMem = status.maxMem
     val diskUsed = status.diskUsed
-    val totalCores = listener.executorToTotalCores.getOrElse(execId, 0)
-    val maxTasks = listener.executorToTasksMax.getOrElse(execId, 0)
-    val activeTasks = listener.executorToTasksActive.getOrElse(execId, 0)
-    val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
-    val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
-    val totalTasks = activeTasks + failedTasks + completedTasks
-    val totalDuration = listener.executorToDuration.getOrElse(execId, 0L)
-    val totalGCTime = listener.executorToJvmGCTime.getOrElse(execId, 0L)
-    val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
-    val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
-    val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
-    val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)
+    val taskSummary = listener.executorToTaskSummary.getOrElse(execId, ExecutorTaskSummary(execId))
 
     new ExecutorSummary(
       execId,
@@ -103,19 +90,19 @@ private[spark] object ExecutorsPage {
       rddBlocks,
       memUsed,
       diskUsed,
-      totalCores,
-      maxTasks,
-      activeTasks,
-      failedTasks,
-      completedTasks,
-      totalTasks,
-      totalDuration,
-      totalGCTime,
-      totalInputBytes,
-      totalShuffleRead,
-      totalShuffleWrite,
+      taskSummary.totalCores,
+      taskSummary.tasksMax,
+      taskSummary.tasksActive,
+      taskSummary.tasksFailed,
+      taskSummary.tasksComplete,
+      taskSummary.tasksActive + taskSummary.tasksFailed + taskSummary.tasksComplete,
+      taskSummary.duration,
+      taskSummary.jvmGCTime,
+      taskSummary.inputBytes,
+      taskSummary.shuffleRead,
+      taskSummary.shuffleWrite,
       maxMem,
-      executorLogs
+      taskSummary.executorLogs
     )
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 676f4457510c21fa077e4ce0b3fa3889a3af65bd..678571fd4f5acea89776b85336c09ad69c7a8176 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark.ui.exec
 
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{LinkedHashMap, ListBuffer}
 
 import org.apache.spark.{ExceptionFailure, Resubmitted, SparkConf, SparkContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.{StorageStatus, StorageStatusListener}
 import org.apache.spark.ui.{SparkUI, SparkUITab}
-import org.apache.spark.ui.jobs.UIData.ExecutorUIData
 
 private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
   val listener = parent.executorsListener
@@ -38,6 +37,25 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec
   }
 }
 
+private[ui] case class ExecutorTaskSummary(
+    var executorId: String,
+    var totalCores: Int = 0,
+    var tasksMax: Int = 0,
+    var tasksActive: Int = 0,
+    var tasksFailed: Int = 0,
+    var tasksComplete: Int = 0,
+    var duration: Long = 0L,
+    var jvmGCTime: Long = 0L,
+    var inputBytes: Long = 0L,
+    var inputRecords: Long = 0L,
+    var outputBytes: Long = 0L,
+    var outputRecords: Long = 0L,
+    var shuffleRead: Long = 0L,
+    var shuffleWrite: Long = 0L,
+    var executorLogs: Map[String, String] = Map.empty,
+    var isAlive: Boolean = true
+)
+
 /**
  * :: DeveloperApi ::
  * A SparkListener that prepares information to be displayed on the ExecutorsTab
@@ -45,21 +63,11 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "exec
 @DeveloperApi
 class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: SparkConf)
     extends SparkListener {
-  val executorToTotalCores = HashMap[String, Int]()
-  val executorToTasksMax = HashMap[String, Int]()
-  val executorToTasksActive = HashMap[String, Int]()
-  val executorToTasksComplete = HashMap[String, Int]()
-  val executorToTasksFailed = HashMap[String, Int]()
-  val executorToDuration = HashMap[String, Long]()
-  val executorToJvmGCTime = HashMap[String, Long]()
-  val executorToInputBytes = HashMap[String, Long]()
-  val executorToInputRecords = HashMap[String, Long]()
-  val executorToOutputBytes = HashMap[String, Long]()
-  val executorToOutputRecords = HashMap[String, Long]()
-  val executorToShuffleRead = HashMap[String, Long]()
-  val executorToShuffleWrite = HashMap[String, Long]()
-  val executorToLogUrls = HashMap[String, Map[String, String]]()
-  val executorIdToData = HashMap[String, ExecutorUIData]()
+  var executorToTaskSummary = LinkedHashMap[String, ExecutorTaskSummary]()
+  var executorEvents = new ListBuffer[SparkListenerEvent]()
+
+  private val maxTimelineExecutors = conf.getInt("spark.ui.timeline.executors.maximum", 1000)
+  private val retainedDeadExecutors = conf.getInt("spark.ui.retainedDeadExecutors", 100)
 
   def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
 
@@ -67,18 +75,29 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
 
   override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
     val eid = executorAdded.executorId
-    executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
-    executorToTotalCores(eid) = executorAdded.executorInfo.totalCores
-    executorToTasksMax(eid) = executorToTotalCores(eid) / conf.getInt("spark.task.cpus", 1)
-    executorIdToData(eid) = new ExecutorUIData(executorAdded.time)
+    val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
+    taskSummary.executorLogs = executorAdded.executorInfo.logUrlMap
+    taskSummary.totalCores = executorAdded.executorInfo.totalCores
+    taskSummary.tasksMax = taskSummary.totalCores / conf.getInt("spark.task.cpus", 1)
+    executorEvents += executorAdded
+    if (executorEvents.size > maxTimelineExecutors) {
+      executorEvents.remove(0)
+    }
+
+    val deadExecutors = executorToTaskSummary.filter(e => !e._2.isAlive)
+    if (deadExecutors.size > retainedDeadExecutors) {
+      val head = deadExecutors.head
+      executorToTaskSummary.remove(head._1)
+    }
   }
 
   override def onExecutorRemoved(
       executorRemoved: SparkListenerExecutorRemoved): Unit = synchronized {
-    val eid = executorRemoved.executorId
-    val uiData = executorIdToData(eid)
-    uiData.finishTime = Some(executorRemoved.time)
-    uiData.finishReason = Some(executorRemoved.reason)
+    executorEvents += executorRemoved
+    if (executorEvents.size > maxTimelineExecutors) {
+      executorEvents.remove(0)
+    }
+    executorToTaskSummary.get(executorRemoved.executorId).foreach(e => e.isAlive = false)
   }
 
   override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
@@ -87,19 +106,25 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
         s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
         s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER
       }
-      storageStatus.foreach { s => executorToLogUrls(s.blockManagerId.executorId) = logs.toMap }
+      storageStatus.foreach { s =>
+        val eid = s.blockManagerId.executorId
+        val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
+        taskSummary.executorLogs = logs.toMap
+      }
     }
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
     val eid = taskStart.taskInfo.executorId
-    executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
+    val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
+    taskSummary.tasksActive += 1
   }
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
     val info = taskEnd.taskInfo
     if (info != null) {
       val eid = info.executorId
+      val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid))
       taskEnd.reason match {
         case Resubmitted =>
           // Note: For resubmitted tasks, we continue to use the metrics that belong to the
@@ -108,31 +133,26 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
           // metrics added by each attempt, but this is much more complicated.
           return
         case e: ExceptionFailure =>
-          executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
+          taskSummary.tasksFailed += 1
         case _ =>
-          executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
+          taskSummary.tasksComplete += 1
       }
-
-      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
-      executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
+      if (taskSummary.tasksActive >= 1) {
+        taskSummary.tasksActive -= 1
+      }
+      taskSummary.duration += info.duration
 
       // Update shuffle read/write
       val metrics = taskEnd.taskMetrics
       if (metrics != null) {
-        executorToInputBytes(eid) =
-          executorToInputBytes.getOrElse(eid, 0L) + metrics.inputMetrics.bytesRead
-        executorToInputRecords(eid) =
-          executorToInputRecords.getOrElse(eid, 0L) + metrics.inputMetrics.recordsRead
-        executorToOutputBytes(eid) =
-          executorToOutputBytes.getOrElse(eid, 0L) + metrics.outputMetrics.bytesWritten
-        executorToOutputRecords(eid) =
-          executorToOutputRecords.getOrElse(eid, 0L) + metrics.outputMetrics.recordsWritten
-
-        executorToShuffleRead(eid) =
-          executorToShuffleRead.getOrElse(eid, 0L) + metrics.shuffleReadMetrics.remoteBytesRead
-        executorToShuffleWrite(eid) =
-          executorToShuffleWrite.getOrElse(eid, 0L) + metrics.shuffleWriteMetrics.bytesWritten
-        executorToJvmGCTime(eid) = executorToJvmGCTime.getOrElse(eid, 0L) + metrics.jvmGCTime
+        taskSummary.inputBytes += metrics.inputMetrics.bytesRead
+        taskSummary.inputRecords += metrics.inputMetrics.recordsRead
+        taskSummary.outputBytes += metrics.outputMetrics.bytesWritten
+        taskSummary.outputRecords += metrics.outputMetrics.recordsWritten
+
+        taskSummary.shuffleRead += metrics.shuffleReadMetrics.remoteBytesRead
+        taskSummary.shuffleWrite += metrics.shuffleWriteMetrics.bytesWritten
+        taskSummary.jvmGCTime += metrics.jvmGCTime
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index e5363ce8ca9dcf71d66a49ca4bf8fafd8d5d7a5e..c04964ec66479c252fa07677b6fe5e6e7300ca1c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -28,9 +28,9 @@ import scala.xml._
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.scheduler._
 import org.apache.spark.ui._
-import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData, StageUIData}
+import org.apache.spark.ui.jobs.UIData.{JobUIData, StageUIData}
 import org.apache.spark.util.Utils
 
 /** Page showing list of all ongoing and recently finished jobs */
@@ -123,55 +123,55 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
     }
   }
 
-  private def makeExecutorEvent(executorUIDatas: HashMap[String, ExecutorUIData]): Seq[String] = {
+  private def makeExecutorEvent(executorUIDatas: Seq[SparkListenerEvent]):
+      Seq[String] = {
     val events = ListBuffer[String]()
     executorUIDatas.foreach {
-      case (executorId, event) =>
+      case a: SparkListenerExecutorAdded =>
         val addedEvent =
           s"""
              |{
              |  'className': 'executor added',
              |  'group': 'executors',
-             |  'start': new Date(${event.startTime}),
+             |  'start': new Date(${a.time}),
              |  'content': '<div class="executor-event-content"' +
              |    'data-toggle="tooltip" data-placement="bottom"' +
-             |    'data-title="Executor ${executorId}<br>' +
-             |    'Added at ${UIUtils.formatDate(new Date(event.startTime))}"' +
-             |    'data-html="true">Executor ${executorId} added</div>'
+             |    'data-title="Executor ${a.executorId}<br>' +
+             |    'Added at ${UIUtils.formatDate(new Date(a.time))}"' +
+             |    'data-html="true">Executor ${a.executorId} added</div>'
              |}
            """.stripMargin
         events += addedEvent
+      case e: SparkListenerExecutorRemoved =>
+        val removedEvent =
+          s"""
+             |{
+             |  'className': 'executor removed',
+             |  'group': 'executors',
+             |  'start': new Date(${e.time}),
+             |  'content': '<div class="executor-event-content"' +
+             |    'data-toggle="tooltip" data-placement="bottom"' +
+             |    'data-title="Executor ${e.executorId}<br>' +
+             |    'Removed at ${UIUtils.formatDate(new Date(e.time))}' +
+             |    '${
+                      if (e.reason != null) {
+                        s"""<br>Reason: ${e.reason.replace("\n", " ")}"""
+                      } else {
+                        ""
+                      }
+                   }"' +
+             |    'data-html="true">Executor ${e.executorId} removed</div>'
+             |}
+           """.stripMargin
+        events += removedEvent
 
-        if (event.finishTime.isDefined) {
-          val removedEvent =
-            s"""
-               |{
-               |  'className': 'executor removed',
-               |  'group': 'executors',
-               |  'start': new Date(${event.finishTime.get}),
-               |  'content': '<div class="executor-event-content"' +
-               |    'data-toggle="tooltip" data-placement="bottom"' +
-               |    'data-title="Executor ${executorId}<br>' +
-               |    'Removed at ${UIUtils.formatDate(new Date(event.finishTime.get))}' +
-               |    '${
-                        if (event.finishReason.isDefined) {
-                          s"""<br>Reason: ${event.finishReason.get.replace("\n", " ")}"""
-                        } else {
-                          ""
-                        }
-                     }"' +
-               |    'data-html="true">Executor ${executorId} removed</div>'
-               |}
-             """.stripMargin
-          events += removedEvent
-        }
     }
     events.toSeq
   }
 
   private def makeTimeline(
       jobs: Seq[JobUIData],
-      executors: HashMap[String, ExecutorUIData],
+      executors: Seq[SparkListenerEvent],
       startTime: Long): Seq[Node] = {
 
     val jobEventJsonAsStrSeq = makeJobEvent(jobs)
@@ -353,7 +353,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
       var content = summary
       val executorListener = parent.executorListener
       content ++= makeTimeline(activeJobs ++ completedJobs ++ failedJobs,
-          executorListener.executorIdToData, startTime)
+          executorListener.executorEvents, startTime)
 
       if (shouldShowActiveJobs) {
         content ++= <h4 id="active">Active Jobs ({activeJobs.size})</h4> ++
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 133c3b1b9aca8338523dd788e3cb6ece6015ce8b..9fb3f35fd96852e5a504fb372630d571114868e3 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -118,7 +118,8 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage
               <div style="float: left">{k}</div>
               <div style="float: right">
               {
-                val logs = parent.executorsListener.executorToLogUrls.getOrElse(k, Map.empty)
+                val logs = parent.executorsListener.executorToTaskSummary.get(k)
+                  .map(_.executorLogs).getOrElse(Map.empty)
                 logs.map {
                   case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div>
                 }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 0ec42d68d3dccf761112870350b9e30c2678b70c..2f7f8976a8899cdff4d99b319b43fe8351af310b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -20,15 +20,14 @@ package org.apache.spark.ui.jobs
 import java.util.Date
 import javax.servlet.http.HttpServletRequest
 
-import scala.collection.mutable.{Buffer, HashMap, ListBuffer}
+import scala.collection.mutable.{Buffer, ListBuffer}
 import scala.xml.{Node, NodeSeq, Unparsed, Utility}
 
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.scheduler._
 import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
-import org.apache.spark.ui.jobs.UIData.ExecutorUIData
 
 /** Page showing statistics and stage list for a given job */
 private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
@@ -93,55 +92,55 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
     }
   }
 
-  def makeExecutorEvent(executorUIDatas: HashMap[String, ExecutorUIData]): Seq[String] = {
+  def makeExecutorEvent(executorUIDatas: Seq[SparkListenerEvent]): Seq[String] = {
     val events = ListBuffer[String]()
     executorUIDatas.foreach {
-      case (executorId, event) =>
+      case a: SparkListenerExecutorAdded =>
         val addedEvent =
           s"""
              |{
              |  'className': 'executor added',
              |  'group': 'executors',
-             |  'start': new Date(${event.startTime}),
+             |  'start': new Date(${a.time}),
              |  'content': '<div class="executor-event-content"' +
              |    'data-toggle="tooltip" data-placement="bottom"' +
-             |    'data-title="Executor ${executorId}<br>' +
-             |    'Added at ${UIUtils.formatDate(new Date(event.startTime))}"' +
-             |    'data-html="true">Executor ${executorId} added</div>'
+             |    'data-title="Executor ${a.executorId}<br>' +
+             |    'Added at ${UIUtils.formatDate(new Date(a.time))}"' +
+             |    'data-html="true">Executor ${a.executorId} added</div>'
              |}
            """.stripMargin
         events += addedEvent
 
-        if (event.finishTime.isDefined) {
-          val removedEvent =
-            s"""
-               |{
-               |  'className': 'executor removed',
-               |  'group': 'executors',
-               |  'start': new Date(${event.finishTime.get}),
-               |  'content': '<div class="executor-event-content"' +
-               |    'data-toggle="tooltip" data-placement="bottom"' +
-               |    'data-title="Executor ${executorId}<br>' +
-               |    'Removed at ${UIUtils.formatDate(new Date(event.finishTime.get))}' +
-               |    '${
-                        if (event.finishReason.isDefined) {
-                          s"""<br>Reason: ${event.finishReason.get.replace("\n", " ")}"""
-                        } else {
-                          ""
-                        }
-                     }"' +
-               |    'data-html="true">Executor ${executorId} removed</div>'
-               |}
-             """.stripMargin
-            events += removedEvent
-        }
+      case e: SparkListenerExecutorRemoved =>
+        val removedEvent =
+          s"""
+             |{
+             |  'className': 'executor removed',
+             |  'group': 'executors',
+             |  'start': new Date(${e.time}),
+             |  'content': '<div class="executor-event-content"' +
+             |    'data-toggle="tooltip" data-placement="bottom"' +
+             |    'data-title="Executor ${e.executorId}<br>' +
+             |    'Removed at ${UIUtils.formatDate(new Date(e.time))}' +
+             |    '${
+                      if (e.reason != null) {
+                        s"""<br>Reason: ${e.reason.replace("\n", " ")}"""
+                      } else {
+                        ""
+                      }
+                   }"' +
+             |    'data-html="true">Executor ${e.executorId} removed</div>'
+             |}
+           """.stripMargin
+          events += removedEvent
+
     }
     events.toSeq
   }
 
   private def makeTimeline(
       stages: Seq[StageInfo],
-      executors: HashMap[String, ExecutorUIData],
+      executors: Seq[SparkListenerEvent],
       appStartTime: Long): Seq[Node] = {
 
     val stageEventJsonAsStrSeq = makeStageEvent(stages)
@@ -319,7 +318,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
       val operationGraphListener = parent.operationGraphListener
 
       content ++= makeTimeline(activeStages ++ completedStages ++ failedStages,
-          executorListener.executorIdToData, appStartTime)
+          executorListener.executorEvents, appStartTime)
 
       content ++= UIUtils.showDagVizForJob(
         jobId, operationGraphListener.getOperationGraphForJob(jobId))
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index de787f257737dbd7d0101950e3567b57823f7641..c322ae0972ad7f4e7f8fe482a56c06a2a7414c0f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -1017,8 +1017,8 @@ private[ui] class TaskDataSource(
         None
       }
 
-    val logs = executorsListener.executorToLogUrls.getOrElse(info.executorId, Map.empty)
-
+    val logs = executorsListener.executorToTaskSummary.get(info.executorId)
+      .map(_.executorLogs).getOrElse(Map.empty)
     new TaskTableRowData(
       info.index,
       info.taskId,
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 74bca9931acf78890e9a914b43bf8ede14782b42..c729f03b3c383d82c2af39c62c19a19af7ecef14 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -177,11 +177,6 @@ private[spark] object UIData {
     }
   }
 
-  class ExecutorUIData(
-      val startTime: Long,
-      var finishTime: Option[Long] = None,
-      var finishReason: Option[String] = None)
-
   case class TaskMetricsUIData(
       executorDeserializeTime: Long,
       executorRunTime: Long,
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fbd78aeb20dd6dfe7b51f68dc8e322c81a3033e9..37fff2efa4eaea0bd2e02d534182d79302cd6148 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -426,6 +426,18 @@ object MimaExcludes {
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatusListener.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.streaming.scheduler.BatchInfo.streamIdToNumRecords"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.storageStatusList"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorIdToData"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksActive"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksComplete"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToInputRecords"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToShuffleRead"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToTasksFailed"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToShuffleWrite"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToDuration"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToInputBytes"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToLogUrls"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToOutputBytes"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.executorToOutputRecords"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.exec.ExecutorsListener.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ui.storage.StorageListener.storageStatusList"),
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ExceptionFailure.apply"),