diff --git a/.rat-excludes b/.rat-excludes
index 08fba6d351d6a8eb4c1566b60d4691a612a836da..7262c960ed6bbaa6c1b77218357d36483e8bbe0e 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -82,4 +82,5 @@ INDEX
 gen-java.*
 .*avpr
 org.apache.spark.sql.sources.DataSourceRegister
+org.apache.spark.scheduler.SparkHistoryListenerFactory
 .*parquet
diff --git a/core/src/main/java/org/apache/spark/JavaSparkListener.java b/core/src/main/java/org/apache/spark/JavaSparkListener.java
index fa9acf0a15b88566ee152b80b36b39c3f1668f2b..23bc9a2e817274802d98a653313e242b92a4f342 100644
--- a/core/src/main/java/org/apache/spark/JavaSparkListener.java
+++ b/core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -82,4 +82,7 @@ public class JavaSparkListener implements SparkListener {
   @Override
   public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { }
 
+  @Override
+  public void onOtherEvent(SparkListenerEvent event) { }
+
 }
diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 1214d05ba6063a82bada4ed33179a481bcb40286..e6b24afd88ad45cb04086ef637dfb79970d4df50 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -118,4 +118,8 @@ public class SparkFirehoseListener implements SparkListener {
         onEvent(blockUpdated);
     }
 
+    @Override
+    public void onOtherEvent(SparkListenerEvent event) {
+        onEvent(event);
+    }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 000a021a528cf6b98a0f025593da2d4e32205cf7..eaa07acc5132e0ad944819848c4336e19c5bf91a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -207,6 +207,10 @@ private[spark] class EventLoggingListener(
   // No-op because logging every update would be overkill
   override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
 
+  override def onOtherEvent(event: SparkListenerEvent): Unit = {
+    logEvent(event, flushLogger = true)
+  }
+
   /**
    * Stop logging events. The event log file will be renamed so that it loses the
    * ".inprogress" suffix.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 896f1743332f1a9a3aa6dce92085c69bbb3085be..075a7f13172de2d4177915b982f5ff52d287a633 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -22,15 +22,19 @@ import java.util.Properties
 import scala.collection.Map
 import scala.collection.mutable
 
-import org.apache.spark.{Logging, TaskEndReason}
+import com.fasterxml.jackson.annotation.JsonTypeInfo
+
+import org.apache.spark.{Logging, SparkConf, TaskEndReason}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
 import org.apache.spark.util.{Distribution, Utils}
+import org.apache.spark.ui.SparkUI
 
 @DeveloperApi
-sealed trait SparkListenerEvent
+@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
+trait SparkListenerEvent
 
 @DeveloperApi
 case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
@@ -130,6 +134,17 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
  */
 private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
 
+/**
+ * Interface for creating history listeners defined in other modules like SQL, which are used to
+ * rebuild the history UI.
+ */
+private[spark] trait SparkHistoryListenerFactory {
+  /**
+   * Create listeners used to rebuild the history UI.
+   */
+  def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener]
+}
+
 /**
  * :: DeveloperApi ::
  * Interface for listening to events from the Spark scheduler. Note that this is an internal
@@ -223,6 +238,11 @@ trait SparkListener {
    * Called when the driver receives a block update info.
    */
   def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated) { }
+
+  /**
+   * Called when other events like SQL-specific events are posted.
+   */
+  def onOtherEvent(event: SparkListenerEvent) { }
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 04afde33f5aad7170be6469b098f4d64d799932a..95722a07144ec32aca4c21e450ed2deeb8b7bf69 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -61,6 +61,7 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi
       case blockUpdated: SparkListenerBlockUpdated =>
         listener.onBlockUpdated(blockUpdated)
       case logStart: SparkListenerLogStart => // ignore event log metadata
+      case _ => listener.onOtherEvent(event)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 4608bce202ec8d3b3e9d405021f8084bafb5af42..8da6884a38535a504b5fca13563a30e406d4bbed 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.ui
 
-import java.util.Date
+import java.util.{Date, ServiceLoader}
+
+import scala.collection.JavaConverters._
 
 import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo,
   UIRoot}
+import org.apache.spark.util.Utils
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.StorageStatusListener
@@ -154,7 +157,16 @@ private[spark] object SparkUI {
       appName: String,
       basePath: String,
       startTime: Long): SparkUI = {
-    create(None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
+    val sparkUI = create(
+      None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
+
+    val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory],
+      Utils.getContextOrSparkClassLoader).asScala
+    listenerFactories.foreach { listenerFactory =>
+      val listeners = listenerFactory.createListeners(conf, sparkUI)
+      listeners.foreach(listenerBus.addListener)
+    }
+    sparkUI
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 2d2bd90eb339e6b51492be1464e5085fd93844e6..cb0f1bf79f3d560581017a85aa02e9bc2c1ce095 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -19,19 +19,21 @@ package org.apache.spark.util
 
 import java.util.{Properties, UUID}
 
-import org.apache.spark.scheduler.cluster.ExecutorInfo
-
 import scala.collection.JavaConverters._
 import scala.collection.Map
 
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
 import org.json4s.JsonAST._
+import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark._
 import org.apache.spark.executor._
 import org.apache.spark.rdd.RDDOperationScope
 import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage._
 
 /**
@@ -54,6 +56,8 @@ private[spark] object JsonProtocol {
 
   private implicit val format = DefaultFormats
 
+  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+
   /** ------------------------------------------------- *
    * JSON serialization methods for SparkListenerEvents |
    * -------------------------------------------------- */
@@ -96,6 +100,7 @@ private[spark] object JsonProtocol {
         executorMetricsUpdateToJson(metricsUpdate)
       case blockUpdated: SparkListenerBlockUpdated =>
         throw new MatchError(blockUpdated)  // TODO(ekl) implement this
+      case _ => parse(mapper.writeValueAsString(event))
     }
   }
 
@@ -506,6 +511,8 @@ private[spark] object JsonProtocol {
       case `executorRemoved` => executorRemovedFromJson(json)
       case `logStart` => logStartFromJson(json)
       case `metricsUpdate` => executorMetricsUpdateFromJson(json)
+      case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
+        .asInstanceOf[SparkListenerEvent]
     }
   }
 
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory b/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
new file mode 100644
index 0000000000000000000000000000000000000000..507100be90967209537b6649c8ea3c98059c7d1c
--- /dev/null
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory
@@ -0,0 +1 @@
+org.apache.spark.sql.execution.ui.SQLHistoryListenerFactory
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 4e26250868374383254bdddaa2af9aa8401b8c7b..db286ea8700b615cf6d96e76dea0f5fde39cb110 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -1245,6 +1245,7 @@ class SQLContext private[sql](
   sparkContext.addSparkListener(new SparkListener {
     override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
       SQLContext.clearInstantiatedContext()
+      SQLContext.clearSqlListener()
     }
   })
 
@@ -1272,6 +1273,8 @@ object SQLContext {
    */
   @transient private val instantiatedContext = new AtomicReference[SQLContext]()
 
+  @transient private val sqlListener = new AtomicReference[SQLListener]()
+
   /**
    * Get the singleton SQLContext if it exists or create a new one using the given SparkContext.
    *
@@ -1316,6 +1319,10 @@ object SQLContext {
     Option(instantiatedContext.get())
   }
 
+  private[sql] def clearSqlListener(): Unit = {
+    sqlListener.set(null)
+  }
+
   /**
    * Changes the SQLContext that will be returned in this thread and its children when
    * SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives
@@ -1364,9 +1371,13 @@ object SQLContext {
    * Create a SQLListener then add it into SparkContext, and create an SQLTab if there is SparkUI.
    */
   private[sql] def createListenerAndUI(sc: SparkContext): SQLListener = {
-    val listener = new SQLListener(sc.conf)
-    sc.addSparkListener(listener)
-    sc.ui.foreach(new SQLTab(listener, _))
-    listener
+    if (sqlListener.get() == null) {
+      val listener = new SQLListener(sc.conf)
+      if (sqlListener.compareAndSet(null, listener)) {
+        sc.addSparkListener(listener)
+        sc.ui.foreach(new SQLTab(listener, _))
+      }
+    }
+    sqlListener.get()
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 1422e15549c9456a72e01142f6855cbf60ea1888..34971986261c2e4aad5bec1cf6edfb729ed6c014 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -21,7 +21,8 @@ import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.ui.SparkPlanGraph
+import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionStart,
+  SparkListenerSQLExecutionEnd}
 import org.apache.spark.util.Utils
 
 private[sql] object SQLExecution {
@@ -45,25 +46,14 @@ private[sql] object SQLExecution {
       sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
       val r = try {
         val callSite = Utils.getCallSite()
-        sqlContext.listener.onExecutionStart(
-          executionId,
-          callSite.shortForm,
-          callSite.longForm,
-          queryExecution.toString,
-          SparkPlanGraph(queryExecution.executedPlan),
-          System.currentTimeMillis())
+        sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
+          executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
+          SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
         try {
           body
         } finally {
-          // Ideally, we need to make sure onExecutionEnd happens after onJobStart and onJobEnd.
-          // However, onJobStart and onJobEnd run in the listener thread. Because we cannot add new
-          // SQL event types to SparkListener since it's a public API, we cannot guarantee that.
-          //
-          // SQLListener should handle the case that onExecutionEnd happens before onJobEnd.
-          //
-          // The worst case is onExecutionEnd may happen before onJobStart when the listener thread
-          // is very busy. If so, we cannot track the jobs for the execution. It seems acceptable.
-          sqlContext.listener.onExecutionEnd(executionId, System.currentTimeMillis())
+          sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionEnd(
+            executionId, System.currentTimeMillis()))
         }
       } finally {
         sc.setLocalProperty(EXECUTION_ID_KEY, null)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
new file mode 100644
index 0000000000000000000000000000000000000000..486ce34064e431d1237b19203bdcd1e599a45278
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.execution.metric.SQLMetricInfo
+import org.apache.spark.util.Utils
+
+/**
+ * :: DeveloperApi ::
+ * Stores information about a SQL SparkPlan.
+ */
+@DeveloperApi
+class SparkPlanInfo(
+    val nodeName: String,
+    val simpleString: String,
+    val children: Seq[SparkPlanInfo],
+    val metrics: Seq[SQLMetricInfo])
+
+private[sql] object SparkPlanInfo {
+
+  def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
+    val metrics = plan.metrics.toSeq.map { case (key, metric) =>
+      new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
+        Utils.getFormattedClassName(metric.param))
+    }
+    val children = plan.children.map(fromSparkPlan)
+
+    new SparkPlanInfo(plan.nodeName, plan.simpleString, children, metrics)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
new file mode 100644
index 0000000000000000000000000000000000000000..2708219ad34857383597ec2e3a645b114e0f0f7d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetricInfo.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.metric
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Stores information about a SQL Metric.
+ */
+@DeveloperApi
+class SQLMetricInfo(
+    val name: String,
+    val accumulatorId: Long,
+    val metricParam: String)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 1c253e3942e952307e90d2d22658e0e40e8f0421..6c0f6f8a52dc53e69c2dca4572cb3d53f0350601 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -104,21 +104,39 @@ private class LongSQLMetricParam(val stringValue: Seq[Long] => String, initialVa
   override def zero: LongSQLMetricValue = new LongSQLMetricValue(initialValue)
 }
 
+private object LongSQLMetricParam extends LongSQLMetricParam(_.sum.toString, 0L)
+
+private object StaticsLongSQLMetricParam extends LongSQLMetricParam(
+  (values: Seq[Long]) => {
+    // This is a workaround for SPARK-11013.
+    // We use -1 as initial value of the accumulator, if the accumulator is valid, we will update
+    // it at the end of task and the value will be at least 0.
+    val validValues = values.filter(_ >= 0)
+    val Seq(sum, min, med, max) = {
+      val metric = if (validValues.length == 0) {
+        Seq.fill(4)(0L)
+      } else {
+        val sorted = validValues.sorted
+        Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
+      }
+      metric.map(Utils.bytesToString)
+    }
+    s"\n$sum ($min, $med, $max)"
+  }, -1L)
+
 private[sql] object SQLMetrics {
 
   private def createLongMetric(
       sc: SparkContext,
       name: String,
-      stringValue: Seq[Long] => String,
-      initialValue: Long): LongSQLMetric = {
-    val param = new LongSQLMetricParam(stringValue, initialValue)
+      param: LongSQLMetricParam): LongSQLMetric = {
     val acc = new LongSQLMetric(name, param)
     sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc))
     acc
   }
 
   def createLongMetric(sc: SparkContext, name: String): LongSQLMetric = {
-    createLongMetric(sc, name, _.sum.toString, 0L)
+    createLongMetric(sc, name, LongSQLMetricParam)
   }
 
   /**
@@ -126,31 +144,25 @@ private[sql] object SQLMetrics {
    * spill size, etc.
    */
   def createSizeMetric(sc: SparkContext, name: String): LongSQLMetric = {
-    val stringValue = (values: Seq[Long]) => {
-      // This is a workaround for SPARK-11013.
-      // We use -1 as initial value of the accumulator, if the accumulator is valid, we will update
-      // it at the end of task and the value will be at least 0.
-      val validValues = values.filter(_ >= 0)
-      val Seq(sum, min, med, max) = {
-        val metric = if (validValues.length == 0) {
-          Seq.fill(4)(0L)
-        } else {
-          val sorted = validValues.sorted
-          Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
-        }
-        metric.map(Utils.bytesToString)
-      }
-      s"\n$sum ($min, $med, $max)"
-    }
     // The final result of this metric in physical operator UI may looks like:
     // data size total (min, med, max):
     // 100GB (100MB, 1GB, 10GB)
-    createLongMetric(sc, s"$name total (min, med, max)", stringValue, -1L)
+    createLongMetric(sc, s"$name total (min, med, max)", StaticsLongSQLMetricParam)
+  }
+
+  def getMetricParam(metricParamName: String): SQLMetricParam[SQLMetricValue[Any], Any] = {
+    val longSQLMetricParam = Utils.getFormattedClassName(LongSQLMetricParam)
+    val staticsSQLMetricParam = Utils.getFormattedClassName(StaticsLongSQLMetricParam)
+    val metricParam = metricParamName match {
+      case `longSQLMetricParam` => LongSQLMetricParam
+      case `staticsSQLMetricParam` => StaticsLongSQLMetricParam
+    }
+    metricParam.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]]
   }
 
   /**
    * A metric that its value will be ignored. Use this one when we need a metric parameter but don't
    * care about the value.
    */
-  val nullLongMetric = new LongSQLMetric("null", new LongSQLMetricParam(_.sum.toString, 0L))
+  val nullLongMetric = new LongSQLMetric("null", LongSQLMetricParam)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index e74d6fb396e1c427c2b91788df3e59f269cdecb5..c74ad40406992a9b7967ced6b5f70ee2ccaaeb09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -19,9 +19,7 @@ package org.apache.spark.sql.execution.ui
 
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.{Node, Unparsed}
-
-import org.apache.commons.lang3.StringEscapeUtils
+import scala.xml.Node
 
 import org.apache.spark.Logging
 import org.apache.spark.ui.{UIUtils, WebUIPage}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 5a072de400b6a860f7822a37d2074ecd06a13bb5..e19a1e3e5851f6cad7a5d863c2f204e88244ec72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -19,11 +19,34 @@ package org.apache.spark.sql.execution.ui
 
 import scala.collection.mutable
 
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
+import org.apache.spark.sql.execution.SparkPlanInfo
+import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetricValue, SQLMetricParam}
 import org.apache.spark.{JobExecutionStatus, Logging, SparkConf}
+import org.apache.spark.ui.SparkUI
+
+@DeveloperApi
+case class SparkListenerSQLExecutionStart(
+    executionId: Long,
+    description: String,
+    details: String,
+    physicalPlanDescription: String,
+    sparkPlanInfo: SparkPlanInfo,
+    time: Long)
+  extends SparkListenerEvent
+
+@DeveloperApi
+case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
+  extends SparkListenerEvent
+
+private[sql] class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
+
+  override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
+    List(new SQLHistoryListener(conf, sparkUI))
+  }
+}
 
 private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Logging {
 
@@ -118,7 +141,8 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
   override def onExecutorMetricsUpdate(
       executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
     for ((taskId, stageId, stageAttemptID, metrics) <- executorMetricsUpdate.taskMetrics) {
-      updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, metrics, finishTask = false)
+      updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, metrics.accumulatorUpdates(),
+        finishTask = false)
     }
   }
 
@@ -140,7 +164,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
       taskEnd.taskInfo.taskId,
       taskEnd.stageId,
       taskEnd.stageAttemptId,
-      taskEnd.taskMetrics,
+      taskEnd.taskMetrics.accumulatorUpdates(),
       finishTask = true)
   }
 
@@ -148,15 +172,12 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
    * Update the accumulator values of a task with the latest metrics for this task. This is called
    * every time we receive an executor heartbeat or when a task finishes.
    */
-  private def updateTaskAccumulatorValues(
+  protected def updateTaskAccumulatorValues(
       taskId: Long,
       stageId: Int,
       stageAttemptID: Int,
-      metrics: TaskMetrics,
+      accumulatorUpdates: Map[Long, Any],
       finishTask: Boolean): Unit = {
-    if (metrics == null) {
-      return
-    }
 
     _stageIdToStageMetrics.get(stageId) match {
       case Some(stageMetrics) =>
@@ -174,9 +195,9 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
             case Some(taskMetrics) =>
               if (finishTask) {
                 taskMetrics.finished = true
-                taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates()
+                taskMetrics.accumulatorUpdates = accumulatorUpdates
               } else if (!taskMetrics.finished) {
-                taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates()
+                taskMetrics.accumulatorUpdates = accumulatorUpdates
               } else {
                 // If a task is finished, we should not override with accumulator updates from
                 // heartbeat reports
@@ -185,7 +206,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
               // TODO Now just set attemptId to 0. Should fix here when we can get the attempt
               // id from SparkListenerExecutorMetricsUpdate
               stageMetrics.taskIdToMetricUpdates(taskId) = new SQLTaskMetrics(
-                  attemptId = 0, finished = finishTask, metrics.accumulatorUpdates())
+                  attemptId = 0, finished = finishTask, accumulatorUpdates)
           }
         }
       case None =>
@@ -193,38 +214,40 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
     }
   }
 
-  def onExecutionStart(
-      executionId: Long,
-      description: String,
-      details: String,
-      physicalPlanDescription: String,
-      physicalPlanGraph: SparkPlanGraph,
-      time: Long): Unit = {
-    val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
-      node.metrics.map(metric => metric.accumulatorId -> metric)
-    }
-
-    val executionUIData = new SQLExecutionUIData(executionId, description, details,
-      physicalPlanDescription, physicalPlanGraph, sqlPlanMetrics.toMap, time)
-    synchronized {
-      activeExecutions(executionId) = executionUIData
-      _executionIdToData(executionId) = executionUIData
-    }
-  }
-
-  def onExecutionEnd(executionId: Long, time: Long): Unit = synchronized {
-    _executionIdToData.get(executionId).foreach { executionUIData =>
-      executionUIData.completionTime = Some(time)
-      if (!executionUIData.hasRunningJobs) {
-        // onExecutionEnd happens after all "onJobEnd"s
-        // So we should update the execution lists.
-        markExecutionFinished(executionId)
-      } else {
-        // There are some running jobs, onExecutionEnd happens before some "onJobEnd"s.
-        // Then we don't if the execution is successful, so let the last onJobEnd updates the
-        // execution lists.
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+    case SparkListenerSQLExecutionStart(executionId, description, details,
+      physicalPlanDescription, sparkPlanInfo, time) =>
+      val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo)
+      val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
+        node.metrics.map(metric => metric.accumulatorId -> metric)
+      }
+      val executionUIData = new SQLExecutionUIData(
+        executionId,
+        description,
+        details,
+        physicalPlanDescription,
+        physicalPlanGraph,
+        sqlPlanMetrics.toMap,
+        time)
+      synchronized {
+        activeExecutions(executionId) = executionUIData
+        _executionIdToData(executionId) = executionUIData
+      }
+    case SparkListenerSQLExecutionEnd(executionId, time) => synchronized {
+      _executionIdToData.get(executionId).foreach { executionUIData =>
+        executionUIData.completionTime = Some(time)
+        if (!executionUIData.hasRunningJobs) {
+          // onExecutionEnd happens after all "onJobEnd"s
+          // So we should update the execution lists.
+          markExecutionFinished(executionId)
+        } else {
+          // There are some running jobs, onExecutionEnd happens before some "onJobEnd"s.
+          // Then we don't if the execution is successful, so let the last onJobEnd updates the
+          // execution lists.
+        }
       }
     }
+    case _ => // Ignore
   }
 
   private def markExecutionFinished(executionId: Long): Unit = {
@@ -289,6 +312,38 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
 
 }
 
+private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
+  extends SQLListener(conf) {
+
+  private var sqlTabAttached = false
+
+  override def onExecutorMetricsUpdate(
+      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
+    // Do nothing
+  }
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
+    updateTaskAccumulatorValues(
+      taskEnd.taskInfo.taskId,
+      taskEnd.stageId,
+      taskEnd.stageAttemptId,
+      taskEnd.taskInfo.accumulables.map { acc =>
+        (acc.id, new LongSQLMetricValue(acc.update.getOrElse("0").toLong))
+      }.toMap,
+      finishTask = true)
+  }
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+    case _: SparkListenerSQLExecutionStart =>
+      if (!sqlTabAttached) {
+        new SQLTab(this, sparkUI)
+        sqlTabAttached = true
+      }
+      super.onOtherEvent(event)
+    case _ => super.onOtherEvent(event)
+  }
+}
+
 /**
  * Represent all necessary data for an execution that will be used in Web UI.
  */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
index 9c27944d42fc6a72fd7d1df7095ddaf156f86d17..4f50b2ecdc8f881da6b0f39668c2a9dc736392ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
@@ -17,13 +17,11 @@
 
 package org.apache.spark.sql.execution.ui
 
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.spark.Logging
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 
 private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
-  extends SparkUITab(sparkUI, SQLTab.nextTabName) with Logging {
+  extends SparkUITab(sparkUI, "SQL") with Logging {
 
   val parent = sparkUI
 
@@ -35,13 +33,5 @@ private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
 }
 
 private[sql] object SQLTab {
-
   private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
-
-  private val nextTabId = new AtomicInteger(0)
-
-  private def nextTabName: String = {
-    val nextId = nextTabId.getAndIncrement()
-    if (nextId == 0) "SQL" else s"SQL$nextId"
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index f1fce5478a3fe835fb90f3772652542ad30013fa..7af0ff09c5c6d35a54330eac41267ea046098703 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable
 
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
+import org.apache.spark.sql.execution.SparkPlanInfo
+import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
  * A graph used for storing information of an executionPlan of DataFrame.
@@ -48,27 +48,27 @@ private[sql] object SparkPlanGraph {
   /**
    * Build a SparkPlanGraph from the root of a SparkPlan tree.
    */
-  def apply(plan: SparkPlan): SparkPlanGraph = {
+  def apply(planInfo: SparkPlanInfo): SparkPlanGraph = {
     val nodeIdGenerator = new AtomicLong(0)
     val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
     val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
-    buildSparkPlanGraphNode(plan, nodeIdGenerator, nodes, edges)
+    buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges)
     new SparkPlanGraph(nodes, edges)
   }
 
   private def buildSparkPlanGraphNode(
-      plan: SparkPlan,
+      planInfo: SparkPlanInfo,
       nodeIdGenerator: AtomicLong,
       nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
       edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = {
-    val metrics = plan.metrics.toSeq.map { case (key, metric) =>
-      SQLPlanMetric(metric.name.getOrElse(key), metric.id,
-        metric.param.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]])
+    val metrics = planInfo.metrics.map { metric =>
+      SQLPlanMetric(metric.name, metric.accumulatorId,
+        SQLMetrics.getMetricParam(metric.metricParam))
     }
     val node = SparkPlanGraphNode(
-      nodeIdGenerator.getAndIncrement(), plan.nodeName, plan.simpleString, metrics)
+      nodeIdGenerator.getAndIncrement(), planInfo.nodeName, planInfo.simpleString, metrics)
     nodes += node
-    val childrenNodes = plan.children.map(
+    val childrenNodes = planInfo.children.map(
       child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
     for (child <- childrenNodes) {
       edges += SparkPlanGraphEdge(child.id, node.id)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 82867ab4967bb7b64321948b2d0aab9fd5e84552..4f2cad19bfb6bdf78c842f43f9a536fbb64e1ad8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -26,6 +26,7 @@ import org.apache.xbean.asm5.Opcodes._
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
+import org.apache.spark.sql.execution.SparkPlanInfo
 import org.apache.spark.sql.execution.ui.SparkPlanGraph
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
@@ -82,7 +83,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
     if (jobs.size == expectedNumOfJobs) {
       // If we can track all jobs, check the metric values
       val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
-      val actualMetrics = SparkPlanGraph(df.queryExecution.executedPlan).nodes.filter { node =>
+      val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
+        df.queryExecution.executedPlan)).nodes.filter { node =>
         expectedMetrics.contains(node.id)
       }.map { node =>
         val nodeMetrics = node.metrics.map { metric =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index c15aac775096c7b67785c0338c7d0ff3a4ed82a7..12a4e1356fed06c7b885da4a4860e26cbc1ed8b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -21,10 +21,10 @@ import java.util.Properties
 
 import org.apache.spark.{SparkException, SparkContext, SparkConf, SparkFunSuite}
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.sql.execution.metric.LongSQLMetricValue
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.{DataFrame, SQLContext}
-import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
+import org.apache.spark.sql.execution.metric.LongSQLMetricValue
 import org.apache.spark.sql.test.SharedSQLContext
 
 class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
@@ -82,7 +82,8 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     val executionId = 0
     val df = createTestDataFrame
     val accumulatorIds =
-      SparkPlanGraph(df.queryExecution.executedPlan).nodes.flatMap(_.metrics.map(_.accumulatorId))
+      SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan))
+        .nodes.flatMap(_.metrics.map(_.accumulatorId))
     // Assume all accumulators are long
     var accumulatorValue = 0L
     val accumulatorUpdates = accumulatorIds.map { id =>
@@ -90,13 +91,13 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
       (id, accumulatorValue)
     }.toMap
 
-    listener.onExecutionStart(
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
       executionId,
       "test",
       "test",
       df.queryExecution.toString,
-      SparkPlanGraph(df.queryExecution.executedPlan),
-      System.currentTimeMillis())
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
 
     val executionUIData = listener.executionIdToData(0)
 
@@ -206,7 +207,8 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
       time = System.currentTimeMillis(),
       JobSucceeded
     ))
-    listener.onExecutionEnd(executionId, System.currentTimeMillis())
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
 
     assert(executionUIData.runningJobs.isEmpty)
     assert(executionUIData.succeededJobs === Seq(0))
@@ -219,19 +221,20 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
-    listener.onExecutionStart(
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
       executionId,
       "test",
       "test",
       df.queryExecution.toString,
-      SparkPlanGraph(df.queryExecution.executedPlan),
-      System.currentTimeMillis())
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
     listener.onJobStart(SparkListenerJobStart(
       jobId = 0,
       time = System.currentTimeMillis(),
       stageInfos = Nil,
       createProperties(executionId)))
-    listener.onExecutionEnd(executionId, System.currentTimeMillis())
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
     listener.onJobEnd(SparkListenerJobEnd(
       jobId = 0,
       time = System.currentTimeMillis(),
@@ -248,13 +251,13 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
-    listener.onExecutionStart(
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
       executionId,
       "test",
       "test",
       df.queryExecution.toString,
-      SparkPlanGraph(df.queryExecution.executedPlan),
-      System.currentTimeMillis())
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
     listener.onJobStart(SparkListenerJobStart(
       jobId = 0,
       time = System.currentTimeMillis(),
@@ -271,7 +274,8 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
       time = System.currentTimeMillis(),
       stageInfos = Nil,
       createProperties(executionId)))
-    listener.onExecutionEnd(executionId, System.currentTimeMillis())
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
     listener.onJobEnd(SparkListenerJobEnd(
       jobId = 1,
       time = System.currentTimeMillis(),
@@ -288,19 +292,20 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
     val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
-    listener.onExecutionStart(
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
       executionId,
       "test",
       "test",
       df.queryExecution.toString,
-      SparkPlanGraph(df.queryExecution.executedPlan),
-      System.currentTimeMillis())
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
     listener.onJobStart(SparkListenerJobStart(
       jobId = 0,
       time = System.currentTimeMillis(),
       stageInfos = Seq.empty,
       createProperties(executionId)))
-    listener.onExecutionEnd(executionId, System.currentTimeMillis())
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
+      executionId, System.currentTimeMillis()))
     listener.onJobEnd(SparkListenerJobEnd(
       jobId = 0,
       time = System.currentTimeMillis(),
@@ -338,6 +343,7 @@ class SQLListenerMemoryLeakSuite extends SparkFunSuite {
       .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
     val sc = new SparkContext(conf)
     try {
+      SQLContext.clearSqlListener()
       val sqlContext = new SQLContext(sc)
       import sqlContext.implicits._
       // Run 100 successful executions and 100 failed executions.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
index 963d10eed62edbbce9247fff76ad82432b9c5a57..e7b376548787cfdbb817d02e213684aac6ed5f07 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala
@@ -42,6 +42,7 @@ trait SharedSQLContext extends SQLTestUtils {
    * Initialize the [[TestSQLContext]].
    */
   protected override def beforeAll(): Unit = {
+    SQLContext.clearSqlListener()
     if (_ctx == null) {
       _ctx = new TestSQLContext
     }