From 28b57c8a124fe55501c4ca4b91320851ace5d735 Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Tue, 29 Nov 2016 17:24:17 -0800
Subject: [PATCH] [SPARK-18516][SQL] Split state and progress in streaming

This PR splits the status of a `StreamingQuery` into two separate APIs:
 - `status` - describes the current status of a `StreamingQuery`, including which phase of processing is currently happening and whether data is available.
 - `recentProgresses` - an array of statistics about the most recent micro-batches that have executed.

Each progress update contains information like the following (a brief usage sketch in Scala follows the example):
```
{
  "id" : "2be8670a-fce1-4859-a530-748f29553bb6",
  "name" : "query-29",
  "timestamp" : 1479705392724,
  "inputRowsPerSecond" : 230.76923076923077,
  "processedRowsPerSecond" : 10.869565217391303,
  "durationMs" : {
    "triggerExecution" : 276,
    "queryPlanning" : 3,
    "getBatch" : 5,
    "getOffset" : 3,
    "addBatch" : 234,
    "walCommit" : 30
  },
  "currentWatermark" : 0,
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-14]]",
    "startOffset" : {
      "topic-14" : {
        "2" : 0,
        "4" : 1,
        "1" : 0,
        "3" : 0,
        "0" : 0
      }
    },
    "endOffset" : {
      "topic-14" : {
        "2" : 1,
        "4" : 2,
        "1" : 0,
        "3" : 0,
        "0" : 1
      }
    },
    "numRecords" : 3,
    "inputRowsPerSecond" : 230.76923076923077,
    "processedRowsPerSecond" : 10.869565217391303
  } ]
}
```
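
For illustration, here is a minimal sketch (not part of this patch) of how the two APIs can be consumed from Scala. The socket source, its host and port, and the console sink are assumptions made only for the example:
```
val query = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()
  .writeStream
  .format("console")
  .start()

// Point-in-time state of the stream: message, isDataAvailable, isTriggerActive.
println(query.status)

// History of recent triggers: one StreamingQueryProgress per micro-batch, also available as JSON.
query.recentProgresses.foreach(p => println(p.json))
println(query.lastProgress)
```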

Additionally, in order to make it possible to correlate progress updates across restarts, we change the `id` field from an integer that is unique within the JVM to a `UUID` that is globally unique.
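
As a rough sketch of the resulting API change (reusing the hypothetical `query` handle from the example above):
```
// Previously, `query.id` was a Long that was unique only within the JVM.
// It is now a java.util.UUID, which makes progress updates globally correlatable.
val queryId: java.util.UUID = query.id
println(queryId)  // e.g. 2be8670a-fce1-4859-a530-748f29553bb6
```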

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #15954 from marmbrus/queryProgress.

(cherry picked from commit c3d08e2f29baeebe09bf4c059ace4336af9116b5)
Signed-off-by: Michael Armbrust <michael@databricks.com>
---
 .../spark/sql/kafka010/KafkaSourceSuite.scala |   7 +-
 project/MimaExcludes.scala                    |  11 +
 python/pyspark/sql/streaming.py               | 326 ++----------------
 python/pyspark/sql/tests.py                   |  22 ++
 .../execution/streaming/MetricsReporter.scala |  53 +++
 .../streaming/ProgressReporter.scala          | 234 +++++++++++++
 .../execution/streaming/StreamExecution.scala | 282 ++++-----------
 .../execution/streaming/StreamMetrics.scala   | 243 -------------
 .../apache/spark/sql/internal/SQLConf.scala   |   8 +
 .../spark/sql/streaming/SinkStatus.scala      |  66 ----
 .../spark/sql/streaming/SourceStatus.scala    |  95 -----
 .../spark/sql/streaming/StreamingQuery.scala  |  33 +-
 .../streaming/StreamingQueryException.scala   |   2 +-
 .../streaming/StreamingQueryListener.scala    |  24 +-
 .../sql/streaming/StreamingQueryManager.scala |  27 +-
 .../sql/streaming/StreamingQueryStatus.scala  | 151 +-------
 .../apache/spark/sql/streaming/progress.scala | 193 +++++++++++
 .../streaming/StreamMetricsSuite.scala        | 213 ------------
 .../sql/streaming/FileStreamSourceSuite.scala |  10 +-
 .../spark/sql/streaming/StreamTest.scala      |  73 +---
 .../StreamingQueryListenerSuite.scala         | 267 +++++++-------
 .../StreamingQueryManagerSuite.scala          |   2 +-
 .../StreamingQueryProgressSuite.scala         |  98 ++++++
 .../streaming/StreamingQueryStatusSuite.scala | 123 -------
 .../sql/streaming/StreamingQuerySuite.scala   | 260 ++++++++------
 .../spark/sql/streaming/WatermarkSuite.scala  |  16 +-
 26 files changed, 1087 insertions(+), 1752 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
 delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
 delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index e1af14f95d..2d6ccb22dd 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -442,12 +442,13 @@ class KafkaSourceSuite extends KafkaSourceTest {
 
     val mapped = kafka.map(kv => kv._2.toInt + 1)
     testStream(mapped)(
+      StartStream(trigger = ProcessingTime(1)),
       makeSureGetOffsetCalled,
       AddKafkaData(Set(topic), 1, 2, 3),
       CheckAnswer(2, 3, 4),
-      AssertOnLastQueryStatus { status =>
-        assert(status.triggerDetails.get("numRows.input.total").toInt > 0)
-        assert(status.sourceStatuses(0).processingRate > 0.0)
+      AssertOnQuery { query =>
+        val recordsRead = query.recentProgresses.map(_.numInputRows).sum
+        recordsRead == 3
       }
     )
   }
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 03c9fcc012..9739164332 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -78,6 +78,17 @@ object MimaExcludes {
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"),
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"),
 
+      // [SPARK-18516][SQL] Split state and progress in streaming
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.SourceStatus"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.SinkStatus"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sinkStatus"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sourceStatuses"),
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.lastProgress"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgresses"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
+      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.get"),
+
       // [SPARK-17338][SQL] add global temp view
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView"),
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.catalog.Catalog.dropTempView"),
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 9c3a237699..c420b0d016 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -16,6 +16,8 @@
 #
 
 import sys
+import json
+
 if sys.version >= '3':
     intlike = int
     basestring = unicode = str
@@ -48,10 +50,9 @@ class StreamingQuery(object):
     @property
     @since(2.0)
     def id(self):
-        """The id of the streaming query. This id is unique across all queries that have been
-        started in the current process.
+        """The id of the streaming query.
         """
-        return self._jsq.id()
+        return self._jsq.id().toString()
 
     @property
     @since(2.0)
@@ -87,6 +88,24 @@ class StreamingQuery(object):
         else:
             return self._jsq.awaitTermination()
 
+    @property
+    @since(2.1)
+    def recentProgresses(self):
+        """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
+        The number of progress updates retained for each stream is configured by Spark session
+        configuration `spark.sql.streaming.numRecentProgresses`.
+        """
+        return [json.loads(p.json()) for p in self._jsq.recentProgresses()]
+
+    @property
+    @since(2.1)
+    def lastProgress(self):
+        """
+        Returns the most recent :class:`StreamingQueryProgress` update of this streaming query.
+        :return: a map
+        """
+        return json.loads(self._jsq.lastProgress().json())
+
     @since(2.0)
     def processAllAvailable(self):
         """Blocks until all available data in the source has been processed and committed to the
@@ -149,8 +168,6 @@ class StreamingQueryManager(object):
         True
         >>> sq.stop()
         """
-        if not isinstance(id, intlike):
-            raise ValueError("The id for the query must be an integer. Got: %s" % id)
         return StreamingQuery(self._jsqm.get(id))
 
     @since(2.0)
@@ -191,303 +208,6 @@ class StreamingQueryManager(object):
         self._jsqm.resetTerminated()
 
 
-class StreamingQueryStatus(object):
-    """A class used to report information about the progress of a StreamingQuery.
-
-    .. note:: Experimental
-
-    .. versionadded:: 2.1
-    """
-
-    def __init__(self, jsqs):
-        self._jsqs = jsqs
-
-    def __str__(self):
-        """
-        Pretty string of this query status.
-
-        >>> print(sqs)
-        Status of query 'query'
-            Query id: 1
-            Status timestamp: 123
-            Input rate: 15.5 rows/sec
-            Processing rate 23.5 rows/sec
-            Latency: 345.0 ms
-            Trigger details:
-                batchId: 5
-                isDataPresentInTrigger: true
-                isTriggerActive: true
-                latency.getBatch.total: 20
-                latency.getOffset.total: 10
-                numRows.input.total: 100
-            Source statuses [1 source]:
-                Source 1 - MySource1
-                    Available offset: 0
-                    Input rate: 15.5 rows/sec
-                    Processing rate: 23.5 rows/sec
-                    Trigger details:
-                        numRows.input.source: 100
-                        latency.getOffset.source: 10
-                        latency.getBatch.source: 20
-            Sink status - MySink
-                Committed offsets: [1, -]
-        """
-        return self._jsqs.toString()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def name(self):
-        """
-        Name of the query. This name is unique across all active queries.
-
-        >>> sqs.name
-        u'query'
-        """
-        return self._jsqs.name()
-
-    @property
-    @since(2.1)
-    def id(self):
-        """
-        Id of the query. This id is unique across all queries that have been started in
-        the current process.
-
-        >>> int(sqs.id)
-        1
-        """
-        return self._jsqs.id()
-
-    @property
-    @since(2.1)
-    def timestamp(self):
-        """
-        Timestamp (ms) of when this query was generated.
-
-        >>> int(sqs.timestamp)
-        123
-        """
-        return self._jsqs.timestamp()
-
-    @property
-    @since(2.1)
-    def inputRate(self):
-        """
-        Current total rate (rows/sec) at which data is being generated by all the sources.
-
-        >>> sqs.inputRate
-        15.5
-        """
-        return self._jsqs.inputRate()
-
-    @property
-    @since(2.1)
-    def processingRate(self):
-        """
-        Current rate (rows/sec) at which the query is processing data from all the sources.
-
-        >>> sqs.processingRate
-        23.5
-        """
-        return self._jsqs.processingRate()
-
-    @property
-    @since(2.1)
-    def latency(self):
-        """
-        Current average latency between the data being available in source and the sink
-        writing the corresponding output.
-
-        >>> sqs.latency
-        345.0
-        """
-        if (self._jsqs.latency().nonEmpty()):
-            return self._jsqs.latency().get()
-        else:
-            return None
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def sourceStatuses(self):
-        """
-        Current statuses of the sources as a list.
-
-        >>> len(sqs.sourceStatuses)
-        1
-        >>> sqs.sourceStatuses[0].description
-        u'MySource1'
-        """
-        return [SourceStatus(ss) for ss in self._jsqs.sourceStatuses()]
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def sinkStatus(self):
-        """
-        Current status of the sink.
-
-        >>> sqs.sinkStatus.description
-        u'MySink'
-        """
-        return SinkStatus(self._jsqs.sinkStatus())
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def triggerDetails(self):
-        """
-        Low-level details of the currently active trigger (e.g. number of rows processed
-        in trigger, latency of intermediate steps, etc.).
-
-        If no trigger is currently active, then it will have details of the last completed trigger.
-
-        >>> sqs.triggerDetails
-        {u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
-        u'isTriggerActive': u'true', u'batchId': u'5', u'latency.getOffset.total': u'10',
-        u'isDataPresentInTrigger': u'true'}
-        """
-        return self._jsqs.triggerDetails()
-
-
-class SourceStatus(object):
-    """
-    Status and metrics of a streaming Source.
-
-    .. note:: Experimental
-
-    .. versionadded:: 2.1
-    """
-
-    def __init__(self, jss):
-        self._jss = jss
-
-    def __str__(self):
-        """
-        Pretty string of this source status.
-
-        >>> print(sqs.sourceStatuses[0])
-        Status of source MySource1
-            Available offset: 0
-            Input rate: 15.5 rows/sec
-            Processing rate: 23.5 rows/sec
-            Trigger details:
-                numRows.input.source: 100
-                latency.getOffset.source: 10
-                latency.getBatch.source: 20
-        """
-        return self._jss.toString()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def description(self):
-        """
-        Description of the source corresponding to this status.
-
-        >>> sqs.sourceStatuses[0].description
-        u'MySource1'
-        """
-        return self._jss.description()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def offsetDesc(self):
-        """
-        Description of the current offset if known.
-
-        >>> sqs.sourceStatuses[0].offsetDesc
-        u'0'
-        """
-        return self._jss.offsetDesc()
-
-    @property
-    @since(2.1)
-    def inputRate(self):
-        """
-        Current rate (rows/sec) at which data is being generated by the source.
-
-        >>> sqs.sourceStatuses[0].inputRate
-        15.5
-        """
-        return self._jss.inputRate()
-
-    @property
-    @since(2.1)
-    def processingRate(self):
-        """
-        Current rate (rows/sec) at which the query is processing data from the source.
-
-        >>> sqs.sourceStatuses[0].processingRate
-        23.5
-        """
-        return self._jss.processingRate()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def triggerDetails(self):
-        """
-        Low-level details of the currently active trigger (e.g. number of rows processed
-        in trigger, latency of intermediate steps, etc.).
-
-        If no trigger is currently active, then it will have details of the last completed trigger.
-
-        >>> sqs.sourceStatuses[0].triggerDetails
-        {u'numRows.input.source': u'100', u'latency.getOffset.source': u'10',
-        u'latency.getBatch.source': u'20'}
-       """
-        return self._jss.triggerDetails()
-
-
-class SinkStatus(object):
-    """
-    Status and metrics of a streaming Sink.
-
-    .. note:: Experimental
-
-    .. versionadded:: 2.1
-    """
-
-    def __init__(self, jss):
-        self._jss = jss
-
-    def __str__(self):
-        """
-        Pretty string of this source status.
-
-        >>> print(sqs.sinkStatus)
-        Status of sink MySink
-            Committed offsets: [1, -]
-        """
-        return self._jss.toString()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def description(self):
-        """
-        Description of the source corresponding to this status.
-
-        >>> sqs.sinkStatus.description
-        u'MySink'
-        """
-        return self._jss.description()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def offsetDesc(self):
-        """
-        Description of the current offsets up to which data has been written by the sink.
-
-        >>> sqs.sinkStatus.offsetDesc
-        u'[1, -]'
-        """
-        return self._jss.offsetDesc()
-
-
 class Trigger(object):
     """Used to indicate how often results should be produced by a :class:`StreamingQuery`.
 
@@ -1053,8 +773,6 @@ def _test():
     globs['sdf_schema'] = StructType([StructField("data", StringType(), False)])
     globs['df'] = \
         globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
-    globs['sqs'] = StreamingQueryStatus(
-        spark.sparkContext._jvm.org.apache.spark.sql.streaming.StreamingQueryStatus.testStatus())
 
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.streaming, globs=globs,
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 3d46b852c5..7151f95216 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1082,6 +1082,28 @@ class SQLTests(ReusedPySparkTestCase):
             q.stop()
             shutil.rmtree(tmpPath)
 
+    def test_stream_status_and_progress(self):
+        df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
+        for q in self.spark._wrapped.streams.active:
+            q.stop()
+        tmpPath = tempfile.mkdtemp()
+        shutil.rmtree(tmpPath)
+        self.assertTrue(df.isStreaming)
+        out = os.path.join(tmpPath, 'out')
+        chk = os.path.join(tmpPath, 'chk')
+        q = df.writeStream \
+            .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
+        try:
+            q.processAllAvailable()
+            lastProgress = q.lastProgress
+            recentProgresses = q.recentProgresses
+            self.assertEqual(lastProgress['name'], q.name)
+            self.assertEqual(lastProgress['id'], q.id)
+            self.assertTrue(any(p == lastProgress for p in recentProgresses))
+        finally:
+            q.stop()
+            shutil.rmtree(tmpPath)
+
     def test_stream_await_termination(self):
         df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
         for q in self.spark._wrapped.streams.active:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
new file mode 100644
index 0000000000..5551d12fa8
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.{util => ju}
+
+import scala.collection.mutable
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.{Source => CodahaleSource}
+import org.apache.spark.util.Clock
+
+/**
+ * Serves metrics from a [[org.apache.spark.sql.streaming.StreamingQuery]] to
+ * Codahale/DropWizard metrics
+ */
+class MetricsReporter(
+    stream: StreamExecution,
+    override val sourceName: String) extends CodahaleSource with Logging {
+
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  // Metric names should not have . in them, so that all the metrics of a query are identified
+  // together in Ganglia as a single metric group
+  registerGauge("inputRate-total", () => stream.lastProgress.inputRowsPerSecond)
+  registerGauge("processingRate-total", () => stream.lastProgress.inputRowsPerSecond)
+  registerGauge("latency", () => stream.lastProgress.durationMs.get("triggerExecution").longValue())
+
+  private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = {
+    synchronized {
+      metricRegistry.register(name, new Gauge[T] {
+        override def getValue: T = f()
+      })
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
new file mode 100644
index 0000000000..b7b6e1988e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.UUID
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.streaming._
+import org.apache.spark.util.Clock
+
+/**
+ * Responsible for continually reporting statistics about the amount of data processed as well
+ * as latency for a streaming query.  This trait is designed to be mixed into the
+ * [[StreamExecution]], which is responsible for calling `startTrigger` and `finishTrigger`
+ * at the appropriate times. Additionally, the status can be updated with `updateStatusMessage` to
+ * allow reporting on the stream's current state (e.g. "Fetching more data").
+ */
+trait ProgressReporter extends Logging {
+
+  case class ExecutionStats(
+    inputRows: Map[Source, Long], stateOperators: Seq[StateOperatorProgress])
+
+  // Internal state of the stream, required for computing metrics.
+  protected def id: UUID
+  protected def name: String
+  protected def triggerClock: Clock
+  protected def logicalPlan: LogicalPlan
+  protected def lastExecution: QueryExecution
+  protected def newData: Map[Source, DataFrame]
+  protected def availableOffsets: StreamProgress
+  protected def committedOffsets: StreamProgress
+  protected def sources: Seq[Source]
+  protected def sink: Sink
+  protected def streamExecutionMetadata: StreamExecutionMetadata
+  protected def currentBatchId: Long
+  protected def sparkSession: SparkSession
+
+  // Local timestamps and counters.
+  private var currentTriggerStartTimestamp = -1L
+  private var currentTriggerEndTimestamp = -1L
+  // TODO: Restore this from the checkpoint when possible.
+  private var lastTriggerStartTimestamp = -1L
+  private val currentDurationsMs = new mutable.HashMap[String, Long]()
+
+  /** Flag that signals whether any error with input metrics has already been logged */
+  private var metricWarningLogged: Boolean = false
+
+  /** Holds the most recent query progress updates.  Accesses must lock on the queue itself. */
+  private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
+
+  @volatile
+  protected var currentStatus: StreamingQueryStatus =
+    StreamingQueryStatus(
+      message = "Initializing StreamExecution",
+      isDataAvailable = false,
+      isTriggerActive = false)
+
+  /** Returns the current status of the query. */
+  def status: StreamingQueryStatus = currentStatus
+
+  /** Returns an array containing the most recent query progress updates. */
+  def recentProgresses: Array[StreamingQueryProgress] = progressBuffer.synchronized {
+    progressBuffer.toArray
+  }
+
+  /** Returns the most recent query progress update. */
+  def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
+    progressBuffer.last
+  }
+
+  /** Begins recording statistics about query progress for a given trigger. */
+  protected def startTrigger(): Unit = {
+    logDebug("Starting Trigger Calculation")
+    lastTriggerStartTimestamp = currentTriggerStartTimestamp
+    currentTriggerStartTimestamp = triggerClock.getTimeMillis()
+    currentStatus = currentStatus.copy(isTriggerActive = true)
+    currentDurationsMs.clear()
+  }
+
+  /** Finalizes the query progress and adds it to the list of recent progress updates. */
+  protected def finishTrigger(hasNewData: Boolean): Unit = {
+    currentTriggerEndTimestamp = triggerClock.getTimeMillis()
+
+    val executionStats: ExecutionStats = if (!hasNewData) {
+      ExecutionStats(Map.empty, Seq.empty)
+    } else {
+      extractExecutionStats
+    }
+
+    val processingTimeSec =
+      (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / 1000
+
+    val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
+      (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / 1000
+    } else {
+      Double.NaN
+    }
+    logDebug(s"Execution stats: $executionStats")
+
+    val sourceProgress = sources.map { source =>
+      val numRecords = executionStats.inputRows.getOrElse(source, 0L)
+      new SourceProgress(
+        description = source.toString,
+        startOffset = committedOffsets.get(source).map(_.json).orNull,
+        endOffset = availableOffsets.get(source).map(_.json).orNull,
+        numInputRows = numRecords,
+        inputRowsPerSecond = numRecords / inputTimeSec,
+        processedRowsPerSecond = numRecords / processingTimeSec
+      )
+    }
+    val sinkProgress = new SinkProgress(sink.toString)
+
+    val newProgress = new StreamingQueryProgress(
+      id = id,
+      name = name,
+      timestamp = currentTriggerStartTimestamp,
+      batchId = currentBatchId,
+      durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava,
+      currentWatermark = streamExecutionMetadata.batchWatermarkMs,
+      stateOperators = executionStats.stateOperators.toArray,
+      sources = sourceProgress.toArray,
+      sink = sinkProgress)
+
+    progressBuffer.synchronized {
+      progressBuffer += newProgress
+      while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
+        progressBuffer.dequeue()
+      }
+    }
+
+    logInfo(s"Streaming query made progress: $newProgress")
+    currentStatus = currentStatus.copy(isTriggerActive = false)
+  }
+
+  /** Extracts statistics from the most recent query execution. */
+  private def extractExecutionStats: ExecutionStats = {
+    // We want to associate execution plan leaves to sources that generate them, so that we match
+    // their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
+    // Consider the translation from the streaming logical plan to the final executed plan.
+    //
+    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
+    //
+    // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
+    //    - Each logical plan leaf will be associated with a single streaming source.
+    //    - There can be multiple logical plan leaves associated with a streaming source.
+    //    - There can be leaves not associated with any streaming source, because they were
+    //      generated from a batch source (e.g. stream-batch joins)
+    //
+    // 2. Assuming that the executed plan has the same number of leaves in the same order as
+    //    that of the trigger logical plan, we associate executed plan leaves with corresponding
+    //    streaming sources.
+    //
+    // 3. For each source, we sum the metrics of the associated execution plan leaves.
+    //
+    val logicalPlanLeafToSource = newData.flatMap { case (source, df) =>
+      df.logicalPlan.collectLeaves().map { leaf => leaf -> source }
+    }
+    val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming
+    val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
+    val numInputRows: Map[Source, Long] =
+      if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
+        val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
+          case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep -> source }
+        }
+        val sourceToNumInputRows = execLeafToSource.map { case (execLeaf, source) =>
+          val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
+          source -> numRows
+        }
+        sourceToNumInputRows.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
+      } else {
+        if (!metricWarningLogged) {
+          def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), ${seq.mkString(", ")}"
+          logWarning(
+            "Could not report metrics as number leaves in trigger logical plan did not match that" +
+                s" of the execution plan:\n" +
+                s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" +
+                s"execution plan leaves: ${toString(allExecPlanLeaves)}\n")
+          metricWarningLogged = true
+        }
+        Map.empty
+      }
+
+    // Extract statistics about stateful operators in the query plan.
+    val stateNodes = lastExecution.executedPlan.collect {
+      case p if p.isInstanceOf[StateStoreSaveExec] => p
+    }
+    val stateOperators = stateNodes.map { node =>
+      new StateOperatorProgress(
+        numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L),
+        numRowsUpdated = node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
+    }
+
+    ExecutionStats(numInputRows, stateOperators)
+  }
+
+  /** Records the duration of running `body` for the next query progress update. */
+  protected def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
+    val startTime = triggerClock.getTimeMillis()
+    val result = body
+    val endTime = triggerClock.getTimeMillis()
+    val timeTaken = math.max(endTime - startTime, 0)
+
+    val previousTime = currentDurationsMs.getOrElse(triggerDetailKey, 0L)
+    currentDurationsMs.put(triggerDetailKey, previousTime + timeTaken)
+    logDebug(s"$triggerDetailKey took $timeTaken ms")
+    result
+  }
+
+  /** Updates the message returned in `status`. */
+  protected def updateStatusMessage(message: String): Unit = {
+    currentStatus = currentStatus.copy(message = message)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 21664d7fd0..e4f31af35f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.util.UUID
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.concurrent.atomic.AtomicLong
 import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.mutable.ArrayBuffer
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
+import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
@@ -47,7 +47,6 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
  */
 class StreamExecution(
     override val sparkSession: SparkSession,
-    override val id: Long,
     override val name: String,
     checkpointRoot: String,
     val logicalPlan: LogicalPlan,
@@ -55,10 +54,12 @@ class StreamExecution(
     val trigger: Trigger,
     val triggerClock: Clock,
     val outputMode: OutputMode)
-  extends StreamingQuery with Logging {
+  extends StreamingQuery with ProgressReporter with Logging {
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
-  import StreamMetrics._
+
+  // TODO: restore this from the checkpoint directory.
+  override val id: UUID = UUID.randomUUID()
 
   private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
 
@@ -89,16 +90,16 @@ class StreamExecution(
    * once, since the field's value may change at any time.
    */
   @volatile
-  private var availableOffsets = new StreamProgress
+  protected var availableOffsets = new StreamProgress
 
   /** The current batchId or -1 if execution has not yet been initialized. */
-  private var currentBatchId: Long = -1
+  protected var currentBatchId: Long = -1
 
   /** Stream execution metadata */
-  private var streamExecutionMetadata = StreamExecutionMetadata()
+  protected var streamExecutionMetadata = StreamExecutionMetadata()
 
   /** All stream sources present in the query plan. */
-  private val sources =
+  protected val sources =
     logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
 
   /** A list of unique sources in the query plan. */
@@ -113,7 +114,10 @@ class StreamExecution(
   private var state: State = INITIALIZED
 
   @volatile
-  var lastExecution: QueryExecution = null
+  var lastExecution: QueryExecution = _
+
+  /** Holds the most recent input data for each source. */
+  protected var newData: Map[Source, DataFrame] = _
 
   @volatile
   private var streamDeathCause: StreamingQueryException = null
@@ -121,16 +125,8 @@ class StreamExecution(
   /* Get the call site in the caller thread; will pass this into the micro batch thread */
   private val callSite = Utils.getCallSite()
 
-  /** Metrics for this query */
-  private val streamMetrics =
-    new StreamMetrics(uniqueSources.toSet, triggerClock, s"StructuredStreaming.$name")
-
-  @volatile
-  private var currentStatus: StreamingQueryStatus = null
-
-  /** Flag that signals whether any error with input metrics have already been logged */
-  @volatile
-  private var metricWarningLogged: Boolean = false
+  /** Used to report metrics to Codahale/DropWizard. */
+  lazy val streamMetrics = new MetricsReporter(this, s"spark.streaming.$name")
 
   /**
    * The thread that runs the micro-batches of this stream. Note that this thread must be
@@ -158,15 +154,6 @@ class StreamExecution(
   /** Whether the query is currently active or not */
   override def isActive: Boolean = state == ACTIVE
 
-  /** Returns the current status of the query. */
-  override def status: StreamingQueryStatus = currentStatus
-
-  /** Returns current status of all the sources. */
-  override def sourceStatuses: Array[SourceStatus] = currentStatus.sourceStatuses.toArray
-
-  /** Returns current status of the sink. */
-  override def sinkStatus: SinkStatus = currentStatus.sinkStatus
-
   /** Returns the [[StreamingQueryException]] if the query was terminated by an exception. */
   override def exception: Option[StreamingQueryException] = Option(streamDeathCause)
 
@@ -200,8 +187,8 @@ class StreamExecution(
       if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
         sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
       }
-      updateStatus()
-      postEvent(new QueryStartedEvent(currentStatus)) // Assumption: Does not throw exception.
+
+      postEvent(new QueryStartedEvent(id, name)) // Assumption: Does not throw exception.
 
       // Unblock starting thread
       startLatch.countDown()
@@ -210,40 +197,45 @@ class StreamExecution(
       SparkSession.setActiveSession(sparkSession)
 
       triggerExecutor.execute(() => {
-        streamMetrics.reportTriggerStarted(currentBatchId)
-        streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "Finding new data from sources")
-        updateStatus()
-        val isTerminated = reportTimeTaken(TRIGGER_LATENCY) {
+        startTrigger()
+
+        val isTerminated =
           if (isActive) {
-            if (currentBatchId < 0) {
-              // We'll do this initialization only once
-              populateStartOffsets()
-              logDebug(s"Stream running from $committedOffsets to $availableOffsets")
-            } else {
-              constructNextBatch()
+            reportTimeTaken("triggerExecution") {
+              if (currentBatchId < 0) {
+                // We'll do this initialization only once
+                populateStartOffsets()
+                logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+              } else {
+                constructNextBatch()
+              }
+              if (dataAvailable) {
+                currentStatus = currentStatus.copy(isDataAvailable = true)
+                updateStatusMessage("Processing new data")
+                runBatch()
+              }
             }
+
+            // Report trigger as finished and construct progress object.
+            finishTrigger(dataAvailable)
+            postEvent(new QueryProgressEvent(lastProgress))
+
             if (dataAvailable) {
-              streamMetrics.reportTriggerDetail(IS_DATA_PRESENT_IN_TRIGGER, true)
-              streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "Processing new data")
-              updateStatus()
-              runBatch()
               // We'll increase currentBatchId after we complete processing current batch's data
               currentBatchId += 1
             } else {
-              streamMetrics.reportTriggerDetail(IS_DATA_PRESENT_IN_TRIGGER, false)
-              streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "No new data")
-              updateStatus()
+              currentStatus = currentStatus.copy(isDataAvailable = false)
+              updateStatusMessage("Waiting for data to arrive")
               Thread.sleep(pollingDelayMs)
             }
             true
           } else {
             false
           }
-        }
-        // Update metrics and notify others
-        streamMetrics.reportTriggerFinished()
-        updateStatus()
-        postEvent(new QueryProgressEvent(currentStatus))
+
+        // Update committed offsets.
+        committedOffsets ++= availableOffsets
+        updateStatusMessage("Waiting for next trigger")
         isTerminated
       })
     } catch {
@@ -264,14 +256,12 @@ class StreamExecution(
       state = TERMINATED
 
       // Update metrics and status
-      streamMetrics.stop()
       sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics)
-      updateStatus()
 
       // Notify others
       sparkSession.streams.notifyQueryTermination(StreamExecution.this)
       postEvent(
-        new QueryTerminatedEvent(currentStatus, exception.map(_.cause).map(Utils.exceptionString)))
+        new QueryTerminatedEvent(id, exception.map(_.cause).map(Utils.exceptionString)))
       terminationLatch.countDown()
     }
   }
@@ -328,14 +318,13 @@ class StreamExecution(
     val hasNewData = {
       awaitBatchLock.lock()
       try {
-        reportTimeTaken(GET_OFFSET_LATENCY) {
-          val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { s =>
-            reportTimeTaken(s, SOURCE_GET_OFFSET_LATENCY) {
-              (s, s.getOffset)
-            }
-          }.toMap
-          availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty }.mapValues(_.get)
-        }
+        val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { s =>
+          updateStatusMessage(s"Getting offsets from $s")
+          reportTimeTaken("getOffset") {
+            (s, s.getOffset)
+          }
+        }.toMap
+        availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty }.mapValues(_.get)
 
         if (dataAvailable) {
           true
@@ -350,8 +339,10 @@ class StreamExecution(
     if (hasNewData) {
       // Current batch timestamp in milliseconds
       streamExecutionMetadata.batchTimestampMs = triggerClock.getTimeMillis()
-      reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
-        assert(offsetLog.add(currentBatchId,
+      updateStatusMessage("Writing offsets to log")
+      reportTimeTaken("walCommit") {
+        assert(offsetLog.add(
+          currentBatchId,
           availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)),
           s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
         logInfo(s"Committed offsets for batch $currentBatchId. " +
@@ -384,30 +375,24 @@ class StreamExecution(
         awaitBatchLock.unlock()
       }
     }
-    reportTimestamp(GET_OFFSET_TIMESTAMP)
   }
 
   /**
    * Processes any data available between `availableOffsets` and `committedOffsets`.
    */
   private def runBatch(): Unit = {
-    // TODO: Move this to IncrementalExecution.
-
     // Request unprocessed data from all sources.
-    val newData = reportTimeTaken(GET_BATCH_LATENCY) {
+    newData = reportTimeTaken("getBatch") {
       availableOffsets.flatMap {
         case (source, available)
           if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
           val current = committedOffsets.get(source)
-          val batch = reportTimeTaken(source, SOURCE_GET_BATCH_LATENCY) {
-            source.getBatch(current, available)
-          }
+          val batch = source.getBatch(current, available)
           logDebug(s"Retrieving data from $source: $current -> $available")
           Some(source -> batch)
         case _ => None
       }
     }
-    reportTimestamp(GET_BATCH_TIMESTAMP)
 
     // A list of attributes that will need to be updated.
     var replacements = new ArrayBuffer[(Attribute, Attribute)]
@@ -438,7 +423,7 @@ class StreamExecution(
           cd.dataType)
     }
 
-    val executedPlan = reportTimeTaken(OPTIMIZER_LATENCY) {
+    val executedPlan = reportTimeTaken("queryPlanning") {
       lastExecution = new IncrementalExecution(
         sparkSession,
         triggerLogicalPlan,
@@ -451,11 +436,12 @@ class StreamExecution(
 
     val nextBatch =
       new Dataset(sparkSession, lastExecution, RowEncoder(lastExecution.analyzed.schema))
-    sink.addBatch(currentBatchId, nextBatch)
-    reportNumRows(executedPlan, triggerLogicalPlan, newData)
+
+    reportTimeTaken("addBatch") {
+      sink.addBatch(currentBatchId, nextBatch)
+    }
 
     // Update the eventTime watermark if we find one in the plan.
-    // TODO: Does this need to be an AttributeMap?
     lastExecution.executedPlan.collect {
       case e: EventTimeWatermarkExec =>
         logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
@@ -468,10 +454,6 @@ class StreamExecution(
         logTrace(s"Event time didn't move: $newWatermark < " +
           s"$streamExecutionMetadata.currentEventTimeWatermark")
       }
-
-      if (newWatermark != 0) {
-        streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK, newWatermark)
-      }
     }
 
     awaitBatchLock.lock()
@@ -481,9 +463,6 @@ class StreamExecution(
     } finally {
       awaitBatchLock.unlock()
     }
-
-    // Update committed offsets.
-    committedOffsets ++= availableOffsets
   }
 
   private def postEvent(event: StreamingQueryListener.Event) {
@@ -616,145 +595,12 @@ class StreamExecution(
      """.stripMargin
   }
 
-  /**
-   * Report row metrics of the executed trigger
-   * @param triggerExecutionPlan Execution plan of the trigger
-   * @param triggerLogicalPlan Logical plan of the trigger, generated from the query logical plan
-   * @param sourceToDF Source to DataFrame returned by the source.getBatch in this trigger
-   */
-  private def reportNumRows(
-      triggerExecutionPlan: SparkPlan,
-      triggerLogicalPlan: LogicalPlan,
-      sourceToDF: Map[Source, DataFrame]): Unit = {
-    // We want to associate execution plan leaves to sources that generate them, so that we match
-    // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following.
-    // Consider the translation from the streaming logical plan to the final executed plan.
-    //
-    //  streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan
-    //
-    // 1. We keep track of streaming sources associated with each leaf in the trigger's logical plan
-    //    - Each logical plan leaf will be associated with a single streaming source.
-    //    - There can be multiple logical plan leaves associated with a streaming source.
-    //    - There can be leaves not associated with any streaming source, because they were
-    //      generated from a batch source (e.g. stream-batch joins)
-    //
-    // 2. Assuming that the executed plan has same number of leaves in the same order as that of
-    //    the trigger logical plan, we associate executed plan leaves with corresponding
-    //    streaming sources.
-    //
-    // 3. For each source, we sum the metrics of the associated execution plan leaves.
-    //
-    val logicalPlanLeafToSource = sourceToDF.flatMap { case (source, df) =>
-      df.logicalPlan.collectLeaves().map { leaf => leaf -> source }
-    }
-    val allLogicalPlanLeaves = triggerLogicalPlan.collectLeaves() // includes non-streaming sources
-    val allExecPlanLeaves = triggerExecutionPlan.collectLeaves()
-    val sourceToNumInputRows: Map[Source, Long] =
-      if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
-        val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
-          case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep -> source }
-        }
-        val sourceToNumInputRows = execLeafToSource.map { case (execLeaf, source) =>
-          val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
-          source -> numRows
-        }
-        sourceToNumInputRows.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source
-      } else {
-        if (!metricWarningLogged) {
-          def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), ${seq.mkString(", ")}"
-          logWarning(
-            "Could not report metrics as number leaves in trigger logical plan did not match that" +
-              s" of the execution plan:\n" +
-              s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" +
-              s"execution plan leaves: ${toString(allExecPlanLeaves)}\n")
-          metricWarningLogged = true
-        }
-        Map.empty
-      }
-    val numOutputRows = triggerExecutionPlan.metrics.get("numOutputRows").map(_.value)
-    val stateNodes = triggerExecutionPlan.collect {
-      case p if p.isInstanceOf[StateStoreSaveExec] => p
-    }
-
-    streamMetrics.reportNumInputRows(sourceToNumInputRows)
-    stateNodes.zipWithIndex.foreach { case (s, i) =>
-      streamMetrics.reportTriggerDetail(
-        NUM_TOTAL_STATE_ROWS(i + 1),
-        s.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L))
-      streamMetrics.reportTriggerDetail(
-        NUM_UPDATED_STATE_ROWS(i + 1),
-        s.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
-    }
-    updateStatus()
-  }
-
-  private def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
-    val startTime = triggerClock.getTimeMillis()
-    val result = body
-    val endTime = triggerClock.getTimeMillis()
-    val timeTaken = math.max(endTime - startTime, 0)
-    streamMetrics.reportTriggerDetail(triggerDetailKey, timeTaken)
-    updateStatus()
-    if (triggerDetailKey == TRIGGER_LATENCY) {
-      logInfo(s"Completed up to $availableOffsets in $timeTaken ms")
-    }
-    result
-  }
-
-  private def reportTimeTaken[T](source: Source, triggerDetailKey: String)(body: => T): T = {
-    val startTime = triggerClock.getTimeMillis()
-    val result = body
-    val endTime = triggerClock.getTimeMillis()
-    streamMetrics.reportSourceTriggerDetail(
-      source, triggerDetailKey, math.max(endTime - startTime, 0))
-    updateStatus()
-    result
-  }
-
-  private def reportTimestamp(triggerDetailKey: String): Unit = {
-    streamMetrics.reportTriggerDetail(triggerDetailKey, triggerClock.getTimeMillis)
-    updateStatus()
-  }
-
-  private def updateStatus(): Unit = {
-    val localAvailableOffsets = availableOffsets
-    val sourceStatuses = sources.map { s =>
-      SourceStatus(
-        s.toString,
-        localAvailableOffsets.get(s).map(_.json).getOrElse("-"),
-        streamMetrics.currentSourceInputRate(s),
-        streamMetrics.currentSourceProcessingRate(s),
-        streamMetrics.currentSourceTriggerDetails(s))
-    }.toArray
-    val sinkStatus = SinkStatus(
-      sink.toString,
-      committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString)
-
-    currentStatus =
-      StreamingQueryStatus(
-        name = name,
-        id = id,
-        timestamp = triggerClock.getTimeMillis(),
-        inputRate = streamMetrics.currentInputRate(),
-        processingRate = streamMetrics.currentProcessingRate(),
-        latency = streamMetrics.currentLatency(),
-        sourceStatuses = sourceStatuses,
-        sinkStatus = sinkStatus,
-        triggerDetails = streamMetrics.currentTriggerDetails())
-  }
-
   trait State
   case object INITIALIZED extends State
   case object ACTIVE extends State
   case object TERMINATED extends State
 }
 
-object StreamExecution {
-  private val _nextId = new AtomicLong(0)
-
-  def nextId: Long = _nextId.getAndIncrement()
-}
-
 /**
  * Contains metadata associated with a stream execution. This information is
  * persisted to the offset log via the OffsetSeq metadata field. Current
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
deleted file mode 100644
index 942e6ed894..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import java.{util => ju}
-
-import scala.collection.mutable
-
-import com.codahale.metrics.{Gauge, MetricRegistry}
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.metrics.source.{Source => CodahaleSource}
-import org.apache.spark.util.Clock
-
-/**
- * Class that manages all the metrics related to a StreamingQuery. It does the following.
- * - Calculates metrics (rates, latencies, etc.) based on information reported by StreamExecution.
- * - Allows the current metric values to be queried
- * - Serves some of the metrics through Codahale/DropWizard metrics
- *
- * @param sources Unique set of sources in a query
- * @param triggerClock Clock used for triggering in StreamExecution
- * @param codahaleSourceName Root name for all the Codahale metrics
- */
-class StreamMetrics(sources: Set[Source], triggerClock: Clock, codahaleSourceName: String)
-  extends CodahaleSource with Logging {
-
-  import StreamMetrics._
-
-  // Trigger infos
-  private val triggerDetails = new mutable.HashMap[String, String]
-  private val sourceTriggerDetails = new mutable.HashMap[Source, mutable.HashMap[String, String]]
-
-  // Rate estimators for sources and sinks
-  private val inputRates = new mutable.HashMap[Source, RateCalculator]
-  private val processingRates = new mutable.HashMap[Source, RateCalculator]
-
-  // Number of input rows in the current trigger
-  private val numInputRows = new mutable.HashMap[Source, Long]
-  private var currentTriggerStartTimestamp: Long = -1
-  private var previousTriggerStartTimestamp: Long = -1
-  private var latency: Option[Double] = None
-
-  override val sourceName: String = codahaleSourceName
-  override val metricRegistry: MetricRegistry = new MetricRegistry
-
-  // =========== Initialization ===========
-
-  // Metric names should not have . in them, so that all the metrics of a query are identified
-  // together in Ganglia as a single metric group
-  registerGauge("inputRate-total", currentInputRate)
-  registerGauge("processingRate-total", () => currentProcessingRate)
-  registerGauge("latency", () => currentLatency().getOrElse(-1.0))
-
-  sources.foreach { s =>
-    inputRates.put(s, new RateCalculator)
-    processingRates.put(s, new RateCalculator)
-    sourceTriggerDetails.put(s, new mutable.HashMap[String, String])
-
-    registerGauge(s"inputRate-${s.toString}", () => currentSourceInputRate(s))
-    registerGauge(s"processingRate-${s.toString}", () => currentSourceProcessingRate(s))
-  }
-
-  // =========== Setter methods ===========
-
-  def reportTriggerStarted(batchId: Long): Unit = synchronized {
-    numInputRows.clear()
-    triggerDetails.clear()
-    sourceTriggerDetails.values.foreach(_.clear())
-
-    reportTriggerDetail(BATCH_ID, batchId)
-    sources.foreach(s => reportSourceTriggerDetail(s, BATCH_ID, batchId))
-    reportTriggerDetail(IS_TRIGGER_ACTIVE, true)
-    currentTriggerStartTimestamp = triggerClock.getTimeMillis()
-    reportTriggerDetail(START_TIMESTAMP, currentTriggerStartTimestamp)
-  }
-
-  def reportTriggerDetail[T](key: String, value: T): Unit = synchronized {
-    triggerDetails.put(key, value.toString)
-  }
-
-  def reportSourceTriggerDetail[T](source: Source, key: String, value: T): Unit = synchronized {
-    sourceTriggerDetails(source).put(key, value.toString)
-  }
-
-  def reportNumInputRows(inputRows: Map[Source, Long]): Unit = synchronized {
-    numInputRows ++= inputRows
-  }
-
-  def reportTriggerFinished(): Unit = synchronized {
-    require(currentTriggerStartTimestamp >= 0)
-    val currentTriggerFinishTimestamp = triggerClock.getTimeMillis()
-    reportTriggerDetail(FINISH_TIMESTAMP, currentTriggerFinishTimestamp)
-    triggerDetails.remove(STATUS_MESSAGE)
-    reportTriggerDetail(IS_TRIGGER_ACTIVE, false)
-
-    // Report number of rows
-    val totalNumInputRows = numInputRows.values.sum
-    reportTriggerDetail(NUM_INPUT_ROWS, totalNumInputRows)
-    numInputRows.foreach { case (s, r) =>
-      reportSourceTriggerDetail(s, NUM_SOURCE_INPUT_ROWS, r)
-    }
-
-    val currentTriggerDuration = currentTriggerFinishTimestamp - currentTriggerStartTimestamp
-    val previousInputIntervalOption = if (previousTriggerStartTimestamp >= 0) {
-      Some(currentTriggerStartTimestamp - previousTriggerStartTimestamp)
-    } else None
-
-    // Update input rate = num rows received by each source during the previous trigger interval
-    // Interval is measures as interval between start times of previous and current trigger.
-    //
-    // TODO: Instead of trigger start, we should use time when getOffset was called on each source
-    // as this may be different for each source if there are many sources in the query plan
-    // and getOffset is called serially on them.
-    if (previousInputIntervalOption.nonEmpty) {
-      sources.foreach { s =>
-        inputRates(s).update(numInputRows.getOrElse(s, 0), previousInputIntervalOption.get)
-      }
-    }
-
-    // Update processing rate = num rows processed for each source in current trigger duration
-    sources.foreach { s =>
-      processingRates(s).update(numInputRows.getOrElse(s, 0), currentTriggerDuration)
-    }
-
-    // Update latency = if data present, 0.5 * previous trigger interval + current trigger duration
-    if (previousInputIntervalOption.nonEmpty && totalNumInputRows > 0) {
-      latency = Some((previousInputIntervalOption.get.toDouble / 2) + currentTriggerDuration)
-    } else {
-      latency = None
-    }
-
-    previousTriggerStartTimestamp = currentTriggerStartTimestamp
-    currentTriggerStartTimestamp = -1
-  }
-
-  // =========== Getter methods ===========
-
-  def currentInputRate(): Double = synchronized {
-    // Since we are calculating source input rates using the same time interval for all sources
-    // it is fine to calculate total input rate as the sum of per source input rate.
-    inputRates.map(_._2.currentRate).sum
-  }
-
-  def currentSourceInputRate(source: Source): Double = synchronized {
-    inputRates(source).currentRate
-  }
-
-  def currentProcessingRate(): Double = synchronized {
-    // Since we are calculating source processing rates using the same time interval for all sources
-    // it is fine to calculate total processing rate as the sum of per source processing rate.
-    processingRates.map(_._2.currentRate).sum
-  }
-
-  def currentSourceProcessingRate(source: Source): Double = synchronized {
-    processingRates(source).currentRate
-  }
-
-  def currentLatency(): Option[Double] = synchronized { latency }
-
-  def currentTriggerDetails(): Map[String, String] = synchronized { triggerDetails.toMap }
-
-  def currentSourceTriggerDetails(source: Source): Map[String, String] = synchronized {
-    sourceTriggerDetails(source).toMap
-  }
-
-  // =========== Other methods ===========
-
-  private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = {
-    synchronized {
-      metricRegistry.register(name, new Gauge[T] {
-        override def getValue: T = f()
-      })
-    }
-  }
-
-  def stop(): Unit = synchronized {
-    triggerDetails.clear()
-    inputRates.valuesIterator.foreach { _.stop() }
-    processingRates.valuesIterator.foreach { _.stop() }
-    latency = None
-  }
-}
-
-object StreamMetrics extends Logging {
-  /** Simple utility class to calculate rate while avoiding DivideByZero */
-  class RateCalculator {
-    @volatile private var rate: Option[Double] = None
-
-    def update(numRows: Long, timeGapMs: Long): Unit = {
-      if (timeGapMs > 0) {
-        rate = Some(numRows.toDouble * 1000 / timeGapMs)
-      } else {
-        rate = None
-        logDebug(s"Rate updates cannot with zero or negative time gap $timeGapMs")
-      }
-    }
-
-    def currentRate: Double = rate.getOrElse(0.0)
-
-    def stop(): Unit = { rate = None }
-  }
-
-
-  val BATCH_ID = "batchId"
-  val IS_TRIGGER_ACTIVE = "isTriggerActive"
-  val IS_DATA_PRESENT_IN_TRIGGER = "isDataPresentInTrigger"
-  val STATUS_MESSAGE = "statusMessage"
-  val EVENT_TIME_WATERMARK = "eventTimeWatermark"
-
-  val START_TIMESTAMP = "timestamp.triggerStart"
-  val GET_OFFSET_TIMESTAMP = "timestamp.afterGetOffset"
-  val GET_BATCH_TIMESTAMP = "timestamp.afterGetBatch"
-  val FINISH_TIMESTAMP = "timestamp.triggerFinish"
-
-  val GET_OFFSET_LATENCY = "latency.getOffset.total"
-  val GET_BATCH_LATENCY = "latency.getBatch.total"
-  val OFFSET_WAL_WRITE_LATENCY = "latency.offsetLogWrite"
-  val OPTIMIZER_LATENCY = "latency.optimizer"
-  val TRIGGER_LATENCY = "latency.fullTrigger"
-  val SOURCE_GET_OFFSET_LATENCY = "latency.getOffset.source"
-  val SOURCE_GET_BATCH_LATENCY = "latency.getBatch.source"
-
-  val NUM_INPUT_ROWS = "numRows.input.total"
-  val NUM_SOURCE_INPUT_ROWS = "numRows.input.source"
-  def NUM_TOTAL_STATE_ROWS(aggId: Int): String = s"numRows.state.aggregation$aggId.total"
-  def NUM_UPDATED_STATE_ROWS(aggId: Int): String = s"numRows.state.aggregation$aggId.updated"
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5589805212..21b26b8146 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -583,6 +583,12 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val STREAMING_PROGRESS_RETENTION =
+    SQLConfigBuilder("spark.sql.streaming.numRecentProgresses")
+      .doc("The number of progress updates to retain for a streaming query")
+      .intConf
+      .createWithDefault(100)
+
   val NDV_MAX_ERROR =
     SQLConfigBuilder("spark.sql.statistics.ndv.maxError")
       .internal()
@@ -654,6 +660,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)
 
+  def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION)
+
   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
 
   def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
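The retention setting above bounds how many `StreamingQueryProgress` updates each query keeps in memory. A minimal sketch of overriding it at runtime; the session setup is an assumption, only the config key comes from this patch:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical local session, for illustration only.
val spark = SparkSession.builder().master("local[2]").appName("retention-demo").getOrCreate()

// Keep the last 200 progress updates per query instead of the default 100.
spark.conf.set("spark.sql.streaming.numRecentProgresses", "200")
```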
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
deleted file mode 100644
index ab19602207..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.json4s._
-import org.json4s.JsonAST.JValue
-import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods._
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.streaming.StreamingQueryStatus.indent
-
-/**
- * :: Experimental ::
- * Status and metrics of a streaming sink.
- *
- * @param description Description of the source corresponding to this status.
- * @param offsetDesc Description of the current offsets up to which data has been written
- *                   by the sink.
- * @since 2.0.0
- */
-@Experimental
-class SinkStatus private(
-    val description: String,
-    val offsetDesc: String) {
-
-  /** The compact JSON representation of this status. */
-  def json: String = compact(render(jsonValue))
-
-  /** The pretty (i.e. indented) JSON representation of this status. */
-  def prettyJson: String = pretty(render(jsonValue))
-
-  override def toString: String =
-    "Status of sink " + indent(prettyString).trim
-
-  private[sql] def jsonValue: JValue = {
-    ("description" -> JString(description)) ~
-    ("offsetDesc" -> JString(offsetDesc))
-  }
-
-  private[sql] def prettyString: String = {
-    s"""$description
-       |Committed offsets: $offsetDesc
-       |""".stripMargin
-  }
-}
-
-/** Companion object, primarily for creating SinkStatus instances internally */
-private[sql] object SinkStatus {
-  def apply(desc: String, offsetDesc: String): SinkStatus = new SinkStatus(desc, offsetDesc)
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
deleted file mode 100644
index cfdf11370e..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import java.{util => ju}
-
-import scala.collection.JavaConverters._
-
-import org.json4s._
-import org.json4s.JsonAST.JValue
-import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods._
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.streaming.StreamingQueryStatus.indent
-import org.apache.spark.util.JsonProtocol
-
-/**
- * :: Experimental ::
- * Status and metrics of a streaming Source.
- *
- * @param description Description of the source corresponding to this status.
- * @param offsetDesc Description of the current offset if known.
- * @param inputRate Current rate (rows/sec) at which data is being generated by the source.
- * @param processingRate Current rate (rows/sec) at which the query is processing data from
- *                       the source.
- * @param triggerDetails Low-level details of the currently active trigger (e.g. number of
- *                      rows processed in trigger, latency of intermediate steps, etc.).
- *                      If no trigger is active, then it will have details of the last completed
- *                      trigger.
- * @since 2.0.0
- */
-@Experimental
-class SourceStatus private(
-    val description: String,
-    val offsetDesc: String,
-    val inputRate: Double,
-    val processingRate: Double,
-    val triggerDetails: ju.Map[String, String]) {
-
-  /** The compact JSON representation of this status. */
-  def json: String = compact(render(jsonValue))
-
-  /** The pretty (i.e. indented) JSON representation of this status. */
-  def prettyJson: String = pretty(render(jsonValue))
-
-  override def toString: String =
-    "Status of source " + indent(prettyString).trim
-
-  private[sql] def jsonValue: JValue = {
-    ("description" -> JString(description)) ~
-    ("offsetDesc" -> JString(offsetDesc)) ~
-    ("inputRate" -> JDouble(inputRate)) ~
-    ("processingRate" -> JDouble(processingRate)) ~
-    ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala))
-  }
-
-  private[sql] def prettyString: String = {
-    val triggerDetailsLines =
-      triggerDetails.asScala.map { case (k, v) => s"$k: $v" }
-    s"""$description
-       |Available offset: $offsetDesc
-       |Input rate: $inputRate rows/sec
-       |Processing rate: $processingRate rows/sec
-       |Trigger details:
-       |""".stripMargin + indent(triggerDetailsLines)
-  }
-}
-
-/** Companion object, primarily for creating SourceStatus instances internally */
-private[sql] object SourceStatus {
-  def apply(
-      desc: String,
-      offsetDesc: String,
-      inputRate: Double,
-      processingRate: Double,
-      triggerDetails: Map[String, String]): SourceStatus = {
-    new SourceStatus(desc, offsetDesc, inputRate, processingRate, triggerDetails.asJava)
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index 374313f2ca..8fc4e43b6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.streaming
 
+import java.util.UUID
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.SparkSession
 
@@ -33,25 +35,27 @@ trait StreamingQuery {
    * Returns the name of the query. This name is unique across all active queries. This can be
    * set in the `org.apache.spark.sql.streaming.DataStreamWriter` as
    * `dataframe.writeStream.queryName("query").start()`.
+   *
    * @since 2.0.0
    */
   def name: String
 
   /**
-   * Returns the unique id of this query. This id is automatically generated and is unique across
-   * all queries that have been started in the current process.
-   * @since 2.0.0
+   * Returns the unique id of this query.
+   * @since 2.1.0
    */
-  def id: Long
+  def id: UUID
 
   /**
    * Returns the `SparkSession` associated with `this`.
+   *
    * @since 2.0.0
    */
   def sparkSession: SparkSession
 
   /**
-   * Whether the query is currently active or not
+   * Returns `true` if this query is actively running.
+   *
    * @since 2.0.0
    */
   def isActive: Boolean
@@ -64,23 +68,26 @@ trait StreamingQuery {
 
   /**
    * Returns the current status of the query.
+   *
    * @since 2.0.2
    */
   def status: StreamingQueryStatus
 
   /**
-   * Returns current status of all the sources.
-   * @since 2.0.0
+   * Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
+   * The number of progress updates retained for each stream is configured by Spark session
+   * configuration `spark.sql.streaming.numRecentProgresses`.
+   *
+   * @since 2.1.0
    */
-  @deprecated("use status.sourceStatuses", "2.0.2")
-  def sourceStatuses: Array[SourceStatus]
+  def recentProgresses: Array[StreamingQueryProgress]
 
   /**
-   * Returns current status of the sink.
-   * @since 2.0.0
+   * Returns the most recent [[StreamingQueryProgress]] update of this streaming query.
+   *
+   * @since 2.1.0
    */
-  @deprecated("use status.sinkStatus", "2.0.2")
-  def sinkStatus: SinkStatus
+  def lastProgress: StreamingQueryProgress
 
   /**
    * Waits for the termination of `this` query, either by `query.stop()` or by an exception.
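For orientation, a sketch of how the reworked `StreamingQuery` surface might be consumed; `query` is assumed to be any query started via `DataStreamWriter.start()` that has completed at least one trigger, so `lastProgress` is populated:

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Sketch only: `query` is assumed to have been started elsewhere.
def summarize(query: StreamingQuery): Unit = {
  println(s"query ${query.name} (id=${query.id}) active=${query.isActive}")

  // Up to spark.sql.streaming.numRecentProgresses retained micro-batch updates.
  query.recentProgresses.foreach { p =>
    println(s"batch=${p.batchId} rows=${p.numInputRows} rate=${p.processedRowsPerSecond} rows/s")
  }

  // The most recent update, pretty-printed as JSON.
  println(query.lastProgress.prettyJson)
}
```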
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index 0a58142e06..13f11ba1c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecut
  * :: Experimental ::
  * Exception that stopped a [[StreamingQuery]]. Use `cause` get the actual exception
  * that caused the failure.
- * @param query      Query that caused the exception
+ * @param query       Query that caused the exception
  * @param message     Message of this exception
  * @param cause       Internal cause of this exception
  * @param startOffset Starting offset (if known) of the range of data in which exception occurred
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index 9e311fae84..d9ee75c064 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.streaming
 
+import java.util.UUID
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.scheduler.SparkListenerEvent
 
@@ -81,30 +83,28 @@ object StreamingQueryListener {
   /**
    * :: Experimental ::
    * Event representing the start of a query
-   * @since 2.0.0
+   * @since 2.1.0
    */
   @Experimental
-  class QueryStartedEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event
+  class QueryStartedEvent private[sql](val id: UUID, val name: String) extends Event
 
   /**
    * :: Experimental ::
-   * Event representing any progress updates in a query
-   * @since 2.0.0
+   * Event representing any progress updates in a query.
+   * @since 2.1.0
    */
   @Experimental
-  class QueryProgressEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event
+  class QueryProgressEvent private[sql](val progress: StreamingQueryProgress) extends Event
 
   /**
    * :: Experimental ::
-   * Event representing that termination of a query
+   * Event representing the termination of a query.
    *
-   * @param queryStatus Information about the status of the query.
-   * @param exception The exception message of the [[StreamingQuery]] if the query was terminated
+   * @param id The query id.
+   * @param exception The exception message of the query if the query was terminated
    *                  with an exception. Otherwise, it will be `None`.
-   * @since 2.0.0
+   * @since 2.1.0
    */
   @Experimental
-  class QueryTerminatedEvent private[sql](
-      val queryStatus: StreamingQueryStatus,
-      val exception: Option[String]) extends Event
+  class QueryTerminatedEvent private[sql](val id: UUID, val exception: Option[String]) extends Event
 }
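A sketch of a listener written against the new event payloads; the class name and log format are illustrative, not part of this patch:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

class ProgressLogger extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"started '${event.name}' (${event.id})")

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(event.progress.json)  // one compact JSON line per trigger

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"terminated ${event.id}: ${event.exception.getOrElse("no error")}")
}

// Registered as before: spark.streams.addListener(new ProgressLogger)
```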
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 53968a82d8..c448468bea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.streaming
 
+import java.util.UUID
+import java.util.concurrent.atomic.AtomicLong
+
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
@@ -41,7 +44,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
   private[sql] val stateStoreCoordinator =
     StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
   private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus)
-  private val activeQueries = new mutable.HashMap[Long, StreamingQuery]
+  private val activeQueries = new mutable.HashMap[UUID, StreamingQuery]
   private val activeQueriesLock = new Object
   private val awaitTerminationLock = new Object
 
@@ -59,12 +62,19 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
   /**
    * Returns the query if there is an active query with the given id, or null.
    *
-   * @since 2.0.0
+   * @since 2.1.0
    */
-  def get(id: Long): StreamingQuery = activeQueriesLock.synchronized {
+  def get(id: UUID): StreamingQuery = activeQueriesLock.synchronized {
     activeQueries.get(id).orNull
   }
 
+  /**
+   * Returns the query if there is an active query with the given id, or null.
+   *
+   * @since 2.1.0
+   */
+  def get(id: String): StreamingQuery = get(UUID.fromString(id))
+
   /**
    * Wait until any of the queries on the associated SQLContext has terminated since the
    * creation of the context, or since `resetTerminated()` was called. If any query was terminated
@@ -197,8 +207,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
       trigger: Trigger = ProcessingTime(0),
       triggerClock: Clock = new SystemClock()): StreamingQuery = {
     activeQueriesLock.synchronized {
-      val id = StreamExecution.nextId
-      val name = userSpecifiedName.getOrElse(s"query-$id")
+      val name = userSpecifiedName.getOrElse(s"query-${StreamingQueryManager.nextId}")
       if (activeQueries.values.exists(_.name == name)) {
         throw new IllegalArgumentException(
           s"Cannot start query with name $name as a query with that name is already active")
@@ -252,7 +261,6 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
       }
       val query = new StreamExecution(
         sparkSession,
-        id,
         name,
         checkpointLocation,
         logicalPlan,
@@ -261,7 +269,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
         triggerClock,
         outputMode)
       query.start()
-      activeQueries.put(id, query)
+      activeQueries.put(query.id, query)
       query
     }
   }
@@ -279,3 +287,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
     }
   }
 }
+
+private object StreamingQueryManager {
+  private val _nextId = new AtomicLong(0)
+  private def nextId: Long = _nextId.getAndIncrement()
+}
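With ids now UUIDs, lookups go through either overload of `get`. A sketch, assuming `spark` is an active session and `id` was captured from `query.id` or a listener event:

```scala
import java.util.UUID
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

def lookup(spark: SparkSession, id: UUID): Option[StreamingQuery] = {
  // Both overloads return null when no active query has this id.
  val byUuid = spark.streams.get(id)
  val byString = spark.streams.get(id.toString)  // the String form is parsed into a UUID
  assert(byUuid eq byString)
  Option(byUuid)
}
```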
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
index ba732ff7fc..4c1a7ce6a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -17,146 +17,17 @@
 
 package org.apache.spark.sql.streaming
 
-import java.{util => ju}
-
-import scala.collection.JavaConverters._
-
-import org.json4s._
-import org.json4s.JsonAST.JValue
-import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods._
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{LongOffset, OffsetSeq}
-import org.apache.spark.util.JsonProtocol
-
 /**
- * :: Experimental ::
- * A class used to report information about the progress of a [[StreamingQuery]].
+ * Reports information about the instantaneous status of a streaming query.
  *
- * @param name Name of the query. This name is unique across all active queries.
- * @param id Id of the query. This id is unique across
- *          all queries that have been started in the current process.
- * @param timestamp Timestamp (ms) of when this query was generated.
- * @param inputRate Current rate (rows/sec) at which data is being generated by all the sources.
- * @param processingRate Current rate (rows/sec) at which the query is processing data from
- *                       all the sources.
- * @param latency  Current average latency between the data being available in source and the sink
- *                   writing the corresponding output.
- * @param sourceStatuses Current statuses of the sources.
- * @param sinkStatus Current status of the sink.
- * @param triggerDetails Low-level details of the currently active trigger (e.g. number of
- *                      rows processed in trigger, latency of intermediate steps, etc.).
- *                      If no trigger is active, then it will have details of the last completed
- *                      trigger.
- * @since 2.0.0
+ * @param message A human-readable description of what the stream is currently doing.
+ * @param isDataAvailable True when there is new data to be processed.
+ * @param isTriggerActive True when the trigger is actively firing, false when waiting for the
+ *                        next trigger time.
+ *
+ * @since 2.1.0
  */
-@Experimental
-class StreamingQueryStatus private(
-  val name: String,
-  val id: Long,
-  val timestamp: Long,
-  val inputRate: Double,
-  val processingRate: Double,
-  val latency: Option[Double],
-  val sourceStatuses: Array[SourceStatus],
-  val sinkStatus: SinkStatus,
-  val triggerDetails: ju.Map[String, String]) {
-
-  import StreamingQueryStatus._
-
-  /** The compact JSON representation of this status. */
-  def json: String = compact(render(jsonValue))
-
-  /** The pretty (i.e. indented) JSON representation of this status. */
-  def prettyJson: String = pretty(render(jsonValue))
-
-  override def toString: String = {
-    val sourceStatusLines = sourceStatuses.zipWithIndex.map { case (s, i) =>
-      s"Source ${i + 1} - " + indent(s.prettyString).trim
-    }
-    val sinkStatusLines = sinkStatus.prettyString.trim
-    val triggerDetailsLines = triggerDetails.asScala.map { case (k, v) => s"$k: $v" }.toSeq.sorted
-    val numSources = sourceStatuses.length
-    val numSourcesString = s"$numSources source" + { if (numSources > 1) "s" else "" }
-
-    val allLines =
-      s"""|Query id: $id
-          |Status timestamp: $timestamp
-          |Input rate: $inputRate rows/sec
-          |Processing rate $processingRate rows/sec
-          |Latency: ${latency.getOrElse("-")} ms
-          |Trigger details:
-          |${indent(triggerDetailsLines)}
-          |Source statuses [$numSourcesString]:
-          |${indent(sourceStatusLines)}
-          |Sink status - ${indent(sinkStatusLines).trim}""".stripMargin
-
-    s"Status of query '$name'\n${indent(allLines)}"
-  }
-
-  private[sql] def jsonValue: JValue = {
-    ("name" -> JString(name)) ~
-    ("id" -> JInt(id)) ~
-    ("timestamp" -> JInt(timestamp)) ~
-    ("inputRate" -> JDouble(inputRate)) ~
-    ("processingRate" -> JDouble(processingRate)) ~
-    ("latency" -> latency.map(JDouble).getOrElse(JNothing)) ~
-    ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala)) ~
-    ("sourceStatuses" -> JArray(sourceStatuses.map(_.jsonValue).toList)) ~
-    ("sinkStatus" -> sinkStatus.jsonValue)
-  }
-}
-
-/** Companion object, primarily for creating StreamingQueryInfo instances internally */
-private[sql] object StreamingQueryStatus {
-  def apply(
-      name: String,
-      id: Long,
-      timestamp: Long,
-      inputRate: Double,
-      processingRate: Double,
-      latency: Option[Double],
-      sourceStatuses: Array[SourceStatus],
-      sinkStatus: SinkStatus,
-      triggerDetails: Map[String, String]): StreamingQueryStatus = {
-    new StreamingQueryStatus(name, id, timestamp, inputRate, processingRate,
-      latency, sourceStatuses, sinkStatus, triggerDetails.asJava)
-  }
-
-  def indent(strings: Iterable[String]): String = strings.map(indent).mkString("\n")
-  def indent(string: String): String = string.split("\n").map("    " + _).mkString("\n")
-
-  /** Create an instance of status for python testing */
-  def testStatus(): StreamingQueryStatus = {
-    import org.apache.spark.sql.execution.streaming.StreamMetrics._
-    StreamingQueryStatus(
-      name = "query",
-      id = 1,
-      timestamp = 123,
-      inputRate = 15.5,
-      processingRate = 23.5,
-      latency = Some(345),
-      sourceStatuses = Array(
-        SourceStatus(
-          desc = "MySource1",
-          offsetDesc = LongOffset(0).json,
-          inputRate = 15.5,
-          processingRate = 23.5,
-          triggerDetails = Map(
-            NUM_SOURCE_INPUT_ROWS -> "100",
-            SOURCE_GET_OFFSET_LATENCY -> "10",
-            SOURCE_GET_BATCH_LATENCY -> "20"))),
-      sinkStatus = SinkStatus(
-        desc = "MySink",
-        offsetDesc = OffsetSeq(Some(LongOffset(1)) :: None :: Nil).toString),
-      triggerDetails = Map(
-        BATCH_ID -> "5",
-        IS_TRIGGER_ACTIVE -> "true",
-        IS_DATA_PRESENT_IN_TRIGGER -> "true",
-        GET_OFFSET_LATENCY -> "10",
-        GET_BATCH_LATENCY -> "20",
-        NUM_INPUT_ROWS -> "100"
-      ))
-  }
-}
+case class StreamingQueryStatus protected[sql](
+    message: String,
+    isDataAvailable: Boolean,
+    isTriggerActive: Boolean)
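The slimmed-down status is intended for cheap, human-readable polling. A sketch of surfacing it; the wording of the returned messages is an assumption:

```scala
import org.apache.spark.sql.streaming.StreamingQuery

def describe(query: StreamingQuery): String = {
  val s = query.status
  if (s.isTriggerActive) s"trigger running: ${s.message}"
  else if (s.isDataAvailable) s"data waiting for the next trigger: ${s.message}"
  else s"idle: ${s.message}"
}
```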
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
new file mode 100644
index 0000000000..7129fa4d15
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.{util => ju}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.jute.compiler.JLong
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
+ */
+@Experimental
+class StateOperatorProgress private[sql](
+    val numRowsTotal: Long,
+    val numRowsUpdated: Long) {
+  private[sql] def jsonValue: JValue = {
+    ("numRowsTotal" -> JInt(numRowsTotal)) ~
+    ("numRowsUpdated" -> JInt(numRowsUpdated))
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Information about progress made in the execution of a [[StreamingQuery]] during
+ * a trigger. Each event relates to processing done for a single trigger of the streaming
+ * query. Events are emitted even when no new data is available to be processed.
+ *
+ * @param id A unique id of the query.
+ * @param name Name of the query. This name is unique across all active queries.
+ * @param timestamp Timestamp (ms) of the beginning of the trigger.
+ * @param batchId A unique id for the current batch of data being processed.  Note that in the
+ *                case of retries after a failure, a given batchId may be executed more than once.
+ *                Similarly, when there is no data to be processed, the batchId will not be
+ *                incremented.
+ * @param durationMs The amount of time taken to perform various operations in milliseconds.
+ * @param currentWatermark The current event time watermark in milliseconds.
+ * @param stateOperators Information about operators in the query that store state.
+ * @param sources Detailed statistics on data being read from each of the streaming sources.
+ * @since 2.1.0
+ */
+@Experimental
+class StreamingQueryProgress private[sql](
+    val id: UUID,
+    val name: String,
+    val timestamp: Long,
+    val batchId: Long,
+    val durationMs: ju.Map[String, java.lang.Long],
+    val currentWatermark: Long,
+    val stateOperators: Array[StateOperatorProgress],
+    val sources: Array[SourceProgress],
+    val sink: SinkProgress) {
+
+  /** The aggregate (across all sources) number of records processed in a trigger. */
+  def numInputRows: Long = sources.map(_.numInputRows).sum
+
+  /** The aggregate (across all sources) rate of data arriving. */
+  def inputRowsPerSecond: Double = sources.map(_.inputRowsPerSecond).sum
+
+  /** The aggregate (across all sources) rate at which Spark is processing data. */
+  def processedRowsPerSecond: Double = sources.map(_.processedRowsPerSecond).sum
+
+  /** The compact JSON representation of this status. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this status. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  override def toString: String = prettyJson
+
+  private[sql] def jsonValue: JValue = {
+    def safeDoubleToJValue(value: Double): JValue = {
+      if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
+    }
+
+    ("id" -> JString(id.toString)) ~
+    ("name" -> JString(name)) ~
+    ("timestamp" -> JInt(timestamp)) ~
+    ("numInputRows" -> JInt(numInputRows)) ~
+    ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
+    ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
+    ("durationMs" -> durationMs
+        .asScala
+        .map { case (k, v) => k -> JInt(v.toLong): JObject }
+        .reduce(_ ~ _)) ~
+    ("currentWatermark" -> JInt(currentWatermark)) ~
+    ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
+    ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
+    ("sink" -> sink.jsonValue)
+
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Information about progress made for a source in the execution of a [[StreamingQuery]]
+ * during a trigger. See [[StreamingQueryProgress]] for more information.
+ *
+ * @param description            Description of the source.
+ * @param startOffset            The starting offset for data being read.
+ * @param endOffset              The ending offset for data being read.
+ * @param numInputRows           The number of records read from this source.
+ * @param inputRowsPerSecond     The rate at which data is arriving from this source.
+ * @param processedRowsPerSecond The rate at which data from this source is being processed by
+ *                               Spark.
+ * @since 2.1.0
+ */
+@Experimental
+class SourceProgress protected[sql](
+    val description: String,
+    val startOffset: String,
+    val endOffset: String,
+    val numInputRows: Long,
+    val inputRowsPerSecond: Double,
+    val processedRowsPerSecond: Double) {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  override def toString: String = prettyJson
+
+  private[sql] def jsonValue: JValue = {
+    def safeDoubleToJValue(value: Double): JValue = {
+      if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
+    }
+
+    ("description" -> JString(description)) ~
+      ("startOffset" -> tryParse(startOffset)) ~
+      ("endOffset" -> tryParse(endOffset)) ~
+      ("numInputRows" -> JInt(numInputRows)) ~
+      ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
+      ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond))
+  }
+
+  private def tryParse(json: String) = try {
+    parse(json)
+  } catch {
+    case NonFatal(e) => JString(json)
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Information about progress made for a sink in the execution of a [[StreamingQuery]]
+ * during a trigger. See [[StreamingQueryProgress]] for more information.
+ *
+ * @param description Description of the sink corresponding to this progress.
+ * @since 2.1.0
+ */
+@Experimental
+class SinkProgress protected[sql](
+    val description: String) {
+
+  /** The compact JSON representation of this status. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this status. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  override def toString: String = prettyJson
+
+  private[sql] def jsonValue: JValue = {
+    ("description" -> JString(description))
+  }
+}
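To show how the pieces of a progress update fit together, a sketch that walks one `StreamingQueryProgress`; `p` is assumed to come from `lastProgress` or a `QueryProgressEvent`:

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.streaming.StreamingQueryProgress

def report(p: StreamingQueryProgress): Unit = {
  println(s"query '${p.name}' (${p.id}), batch ${p.batchId}")
  println(s"  ${p.numInputRows} rows, arriving at ${p.inputRowsPerSecond} rows/s, " +
    s"processed at ${p.processedRowsPerSecond} rows/s")

  // Per-phase timings in milliseconds.
  p.durationMs.asScala.foreach { case (phase, ms) => println(s"  $phase: $ms ms") }

  // Per-source offset ranges and row counts.
  p.sources.foreach { s =>
    println(s"  ${s.description}: ${s.startOffset} -> ${s.endOffset} (${s.numInputRows} rows)")
  }

  println(p.prettyJson)  // full JSON, same fields as the compact json method
}
```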
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
deleted file mode 100644
index 38c4ece439..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import org.scalactic.TolerantNumerics
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.util.ManualClock
-
-class StreamMetricsSuite extends SparkFunSuite {
-  import StreamMetrics._
-
-  // To make === between double tolerate inexact values
-  implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01)
-
-  test("rates, latencies, trigger details - basic life cycle") {
-    val sm = newStreamMetrics(source)
-    assert(sm.currentInputRate() === 0.0)
-    assert(sm.currentProcessingRate() === 0.0)
-    assert(sm.currentSourceInputRate(source) === 0.0)
-    assert(sm.currentSourceProcessingRate(source) === 0.0)
-    assert(sm.currentLatency() === None)
-    assert(sm.currentTriggerDetails().isEmpty)
-
-    // When trigger started, the rates should not change, but should return
-    // reported trigger details
-    sm.reportTriggerStarted(1)
-    sm.reportTriggerDetail("key", "value")
-    sm.reportSourceTriggerDetail(source, "key2", "value2")
-    assert(sm.currentInputRate() === 0.0)
-    assert(sm.currentProcessingRate() === 0.0)
-    assert(sm.currentSourceInputRate(source) === 0.0)
-    assert(sm.currentSourceProcessingRate(source) === 0.0)
-    assert(sm.currentLatency() === None)
-    assert(sm.currentTriggerDetails() ===
-      Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "true",
-        START_TIMESTAMP -> "0", "key" -> "value"))
-    assert(sm.currentSourceTriggerDetails(source) ===
-      Map(BATCH_ID -> "1", "key2" -> "value2"))
-
-    // Finishing the trigger should calculate the rates, except input rate which needs
-    // to have another trigger interval
-    sm.reportNumInputRows(Map(source -> 100L)) // 100 input rows, 10 output rows
-    clock.advance(1000)
-    sm.reportTriggerFinished()
-    assert(sm.currentInputRate() === 0.0)
-    assert(sm.currentProcessingRate() === 100.0)  // 100 input rows processed in 1 sec
-    assert(sm.currentSourceInputRate(source) === 0.0)
-    assert(sm.currentSourceProcessingRate(source) === 100.0)
-    assert(sm.currentLatency() === None)
-    assert(sm.currentTriggerDetails() ===
-      Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "false",
-        START_TIMESTAMP -> "0", FINISH_TIMESTAMP -> "1000",
-        NUM_INPUT_ROWS -> "100", "key" -> "value"))
-    assert(sm.currentSourceTriggerDetails(source) ===
-      Map(BATCH_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2"))
-
-    // After another trigger starts, the rates and latencies should not change until
-    // new rows are reported
-    clock.advance(1000)
-    sm.reportTriggerStarted(2)
-    assert(sm.currentInputRate() === 0.0)
-    assert(sm.currentProcessingRate() === 100.0)
-    assert(sm.currentSourceInputRate(source) === 0.0)
-    assert(sm.currentSourceProcessingRate(source) === 100.0)
-    assert(sm.currentLatency() === None)
-
-    // Reporting new rows should update the rates and latencies
-    sm.reportNumInputRows(Map(source -> 200L))     // 200 input rows
-    clock.advance(500)
-    sm.reportTriggerFinished()
-    assert(sm.currentInputRate() === 100.0)      // 200 input rows generated in 2 seconds b/w starts
-    assert(sm.currentProcessingRate() === 400.0) // 200 output rows processed in 0.5 sec
-    assert(sm.currentSourceInputRate(source) === 100.0)
-    assert(sm.currentSourceProcessingRate(source) === 400.0)
-    assert(sm.currentLatency().get === 1500.0)       // 2000 ms / 2 + 500 ms
-
-    // Rates should be set to 0 after stop
-    sm.stop()
-    assert(sm.currentInputRate() === 0.0)
-    assert(sm.currentProcessingRate() === 0.0)
-    assert(sm.currentSourceInputRate(source) === 0.0)
-    assert(sm.currentSourceProcessingRate(source) === 0.0)
-    assert(sm.currentLatency() === None)
-    assert(sm.currentTriggerDetails().isEmpty)
-  }
-
-  test("rates and latencies - after trigger with no data") {
-    val sm = newStreamMetrics(source)
-    // Trigger 1 with data
-    sm.reportTriggerStarted(1)
-    sm.reportNumInputRows(Map(source -> 100L)) // 100 input rows
-    clock.advance(1000)
-    sm.reportTriggerFinished()
-
-    // Trigger 2 with data
-    clock.advance(1000)
-    sm.reportTriggerStarted(2)
-    sm.reportNumInputRows(Map(source -> 200L)) // 200 input rows
-    clock.advance(500)
-    sm.reportTriggerFinished()
-
-    // Make sure that all rates are set
-    require(sm.currentInputRate() === 100.0) // 200 input rows generated in 2 seconds b/w starts
-    require(sm.currentProcessingRate() === 400.0) // 200 output rows processed in 0.5 sec
-    require(sm.currentSourceInputRate(source) === 100.0)
-    require(sm.currentSourceProcessingRate(source) === 400.0)
-    require(sm.currentLatency().get === 1500.0) // 2000 ms / 2 + 500 ms
-
-    // Trigger 3 with data
-    clock.advance(500)
-    sm.reportTriggerStarted(3)
-    clock.advance(500)
-    sm.reportTriggerFinished()
-
-    // Rates are set to zero and latency is set to None
-    assert(sm.currentInputRate() === 0.0)
-    assert(sm.currentProcessingRate() === 0.0)
-    assert(sm.currentSourceInputRate(source) === 0.0)
-    assert(sm.currentSourceProcessingRate(source) === 0.0)
-    assert(sm.currentLatency() === None)
-    sm.stop()
-  }
-
-  test("rates - after trigger with multiple sources, and one source having no info") {
-    val source1 = TestSource(1)
-    val source2 = TestSource(2)
-    val sm = newStreamMetrics(source1, source2)
-    // Trigger 1 with data
-    sm.reportTriggerStarted(1)
-    sm.reportNumInputRows(Map(source1 -> 100L, source2 -> 100L))
-    clock.advance(1000)
-    sm.reportTriggerFinished()
-
-    // Trigger 2 with data
-    clock.advance(1000)
-    sm.reportTriggerStarted(2)
-    sm.reportNumInputRows(Map(source1 -> 200L, source2 -> 200L))
-    clock.advance(500)
-    sm.reportTriggerFinished()
-
-    // Make sure that all rates are set
-    assert(sm.currentInputRate() === 200.0) // 200*2 input rows generated in 2 seconds b/w starts
-    assert(sm.currentProcessingRate() === 800.0) // 200*2 output rows processed in 0.5 sec
-    assert(sm.currentSourceInputRate(source1) === 100.0)
-    assert(sm.currentSourceInputRate(source2) === 100.0)
-    assert(sm.currentSourceProcessingRate(source1) === 400.0)
-    assert(sm.currentSourceProcessingRate(source2) === 400.0)
-
-    // Trigger 3 with data
-    clock.advance(500)
-    sm.reportTriggerStarted(3)
-    clock.advance(500)
-    sm.reportNumInputRows(Map(source1 -> 200L))
-    sm.reportTriggerFinished()
-
-    // Rates are set to zero and latency is set to None
-    assert(sm.currentInputRate() === 200.0)
-    assert(sm.currentProcessingRate() === 400.0)
-    assert(sm.currentSourceInputRate(source1) === 200.0)
-    assert(sm.currentSourceInputRate(source2) === 0.0)
-    assert(sm.currentSourceProcessingRate(source1) === 400.0)
-    assert(sm.currentSourceProcessingRate(source2) === 0.0)
-    sm.stop()
-  }
-
-  test("registered Codahale metrics") {
-    import scala.collection.JavaConverters._
-    val sm = newStreamMetrics(source)
-    val gaugeNames = sm.metricRegistry.getGauges().keySet().asScala
-
-    // so that all metrics are considered as a single metric group in Ganglia
-    assert(!gaugeNames.exists(_.contains(".")))
-    assert(gaugeNames === Set(
-      "inputRate-total",
-      "inputRate-source0",
-      "processingRate-total",
-      "processingRate-source0",
-      "latency"))
-  }
-
-  private def newStreamMetrics(sources: Source*): StreamMetrics = {
-    new StreamMetrics(sources.toSet, clock, "test")
-  }
-
-  private val clock = new ManualClock()
-  private val source = TestSource(0)
-
-  case class TestSource(id: Int) extends Source {
-    override def schema: StructType = StructType(Array.empty[StructField])
-    override def getOffset: Option[Offset] = Some(new LongOffset(0))
-    override def getBatch(start: Option[Offset], end: Offset): DataFrame = { null }
-    override def stop() {}
-    override def toString(): String = s"source$id"
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index bad6642ea4..8256c63d87 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1006,9 +1006,13 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       testStream(input)(
         AddTextFileData("100", src, tmp),
         CheckAnswer("100"),
-        AssertOnLastQueryStatus { status =>
-          assert(status.triggerDetails.get("numRows.input.total") === "1")
-          assert(status.sourceStatuses(0).processingRate > 0.0)
+        AssertOnQuery { query =>
+          val actualProgress = query.recentProgresses
+              .find(_.numInputRows > 0)
+              .getOrElse(sys.error("Could not find records with data."))
+          assert(actualProgress.numInputRows === 1)
+          assert(actualProgress.sources(0).processedRowsPerSecond > 0.0)
+          true
         }
       )
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index a6b2d4b9ab..a2629f7f68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -28,7 +28,6 @@ import scala.util.control.NonFatal
 
 import org.scalatest.Assertions
 import org.scalatest.concurrent.{Eventually, Timeouts}
-import org.scalatest.concurrent.AsyncAssertions.Waiter
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.exceptions.TestFailedDueToTimeoutException
@@ -202,10 +201,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     }
   }
 
-  case class AssertOnLastQueryStatus(condition: StreamingQueryStatus => Unit)
-    extends StreamAction
-
-  class StreamManualClock(time: Long = 0L) extends ManualClock(time) {
+  class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable {
     private var waitStartTime: Option[Long] = None
 
     override def waitTillTime(targetTime: Long): Long = synchronized {
@@ -325,10 +321,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
 
     val testThread = Thread.currentThread()
     val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
-    val statusCollector = new QueryStatusCollector
     var manualClockExpectedTime = -1L
     try {
-      spark.streams.addListener(statusCollector)
       startedTest.foreach { action =>
         logInfo(s"Processing test stream action: $action")
         action match {
@@ -375,10 +369,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
                    s"can not advance clock of type ${currentStream.triggerClock.getClass}")
             val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
             assert(manualClockExpectedTime >= 0)
+
             // Make sure we don't advance ManualClock too early. See SPARK-16002.
             eventually("StreamManualClock has not yet entered the waiting state") {
               assert(clock.isStreamWaitingAt(manualClockExpectedTime))
             }
+
             clock.advance(timeToAdd)
             manualClockExpectedTime += timeToAdd
             verify(clock.getTimeMillis() === manualClockExpectedTime,
@@ -447,13 +443,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
             val streamToAssert = Option(currentStream).getOrElse(lastStream)
             verify({ a.run(); true }, s"Assert failed: ${a.message}")
 
-          case a: AssertOnLastQueryStatus =>
-            Eventually.eventually(timeout(streamingTimeout)) {
-              require(statusCollector.lastTriggerStatus.nonEmpty)
-            }
-            val status = statusCollector.lastTriggerStatus.get
-            verify({ a.condition(status); true }, "Assert on last query status failed")
-
           case a: AddData =>
             try {
               // Add data and get the source where it was added, and the expected offset of the
@@ -528,7 +517,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
       if (currentStream != null && currentStream.microBatchThread.isAlive) {
         currentStream.stop()
       }
-      spark.streams.removeListener(statusCollector)
 
       // Rollback prev configuration values
       resetConfValues.foreach {
@@ -614,7 +602,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     testStream(ds)(actions: _*)
   }
 
-
   object AwaitTerminationTester {
 
     trait ExpectedBehavior
@@ -668,58 +655,4 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
       }
     }
   }
-
-
-  class QueryStatusCollector extends StreamingQueryListener {
-    // to catch errors in the async listener events
-    @volatile private var asyncTestWaiter = new Waiter
-
-    @volatile var startStatus: StreamingQueryStatus = null
-    @volatile var terminationStatus: StreamingQueryStatus = null
-    @volatile var terminationException: Option[String] = null
-
-    private val progressStatuses = new mutable.ArrayBuffer[StreamingQueryStatus]
-
-    /** Get the info of the last trigger that processed data */
-    def lastTriggerStatus: Option[StreamingQueryStatus] = synchronized {
-      progressStatuses.filter { i =>
-        i.triggerDetails.get("isTriggerActive").toBoolean == false &&
-          i.triggerDetails.get("isDataPresentInTrigger").toBoolean == true
-      }.lastOption
-    }
-
-    def reset(): Unit = {
-      startStatus = null
-      terminationStatus = null
-      progressStatuses.clear()
-      asyncTestWaiter = new Waiter
-    }
-
-    def checkAsyncErrors(): Unit = {
-      asyncTestWaiter.await(timeout(10 seconds))
-    }
-
-
-    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
-      asyncTestWaiter {
-        startStatus = queryStarted.queryStatus
-      }
-    }
-
-    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
-      asyncTestWaiter {
-        assert(startStatus != null, "onQueryProgress called before onQueryStarted")
-        synchronized { progressStatuses += queryProgress.queryStatus }
-      }
-    }
-
-    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
-      asyncTestWaiter {
-        assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
-        terminationStatus = queryTerminated.queryStatus
-        terminationException = queryTerminated.exception
-      }
-      asyncTestWaiter.dismiss()
-    }
-  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 98f3bec708..c68f953b10 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -17,24 +17,26 @@
 
 package org.apache.spark.sql.streaming
 
+import java.util.UUID
+
 import scala.collection.mutable
 
 import org.scalactic.TolerantNumerics
+import org.scalatest.concurrent.AsyncAssertions.Waiter
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.BeforeAndAfter
 import org.scalatest.PrivateMethodTester._
 
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler._
-import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.functions._
-import org.apache.spark.util.{JsonProtocol, ManualClock}
-
+import org.apache.spark.sql.streaming.StreamingQueryListener._
+import org.apache.spark.util.JsonProtocol
 
 class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
 
   import testImplicits._
-  import StreamingQueryListenerSuite._
 
   // To make === between double tolerate inexact values
   implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01)
@@ -46,86 +48,86 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     // Make sure we don't leak any events to the next test
   }
 
-  test("single listener, check trigger statuses") {
-    import StreamingQueryListenerSuite._
-    clock = new StreamManualClock
-
-    /** Custom MemoryStream that waits for manual clock to reach a time */
-    val inputData = new MemoryStream[Int](0, sqlContext) {
-      // Wait for manual clock to be 100 first time there is data
-      override def getOffset: Option[Offset] = {
-        val offset = super.getOffset
-        if (offset.nonEmpty) {
-          clock.waitTillTime(100)
+  testQuietly("single listener, check trigger events are generated correctly") {
+    val clock = new StreamManualClock
+    val inputData = new MemoryStream[Int](0, sqlContext)
+    val df = inputData.toDS().as[Long].map { 10 / _ }
+    val listener = new EventCollector
+    try {
+      // No events until started
+      spark.streams.addListener(listener)
+      assert(listener.startEvent === null)
+      assert(listener.progressEvents.isEmpty)
+      assert(listener.terminationEvent === null)
+
+      testStream(df, OutputMode.Append)(
+
+        // Start event generated when query started
+        StartStream(ProcessingTime(100), triggerClock = clock),
+        AssertOnQuery { query =>
+          assert(listener.startEvent !== null)
+          assert(listener.startEvent.id === query.id)
+          assert(listener.startEvent.name === query.name)
+          assert(listener.progressEvents.isEmpty)
+          assert(listener.terminationEvent === null)
+          true
+        },
+
+        // Progress event generated when data processed
+        AddData(inputData, 1, 2),
+        AdvanceManualClock(100),
+        CheckAnswer(10, 5),
+        AssertOnQuery { query =>
+          assert(listener.progressEvents.nonEmpty)
+          assert(listener.progressEvents.last.json === query.lastProgress.json)
+          assert(listener.terminationEvent === null)
+          true
+        },
+
+        // Termination event generated when stopped cleanly
+        StopStream,
+        AssertOnQuery { query =>
+          eventually(Timeout(streamingTimeout)) {
+            assert(listener.terminationEvent !== null)
+            assert(listener.terminationEvent.id === query.id)
+            assert(listener.terminationEvent.exception === None)
+          }
+          listener.checkAsyncErrors()
+          listener.reset()
+          true
+        },
+
+        // Termination event generated with exception message when stopped with error
+        StartStream(ProcessingTime(100), triggerClock = clock),
+        AddData(inputData, 0),
+        AdvanceManualClock(100),
+        ExpectFailure[SparkException],
+        AssertOnQuery { query =>
+          assert(listener.terminationEvent !== null)
+          assert(listener.terminationEvent.id === query.id)
+          assert(listener.terminationEvent.exception.nonEmpty)
+          listener.checkAsyncErrors()
+          true
         }
-        offset
-      }
-
-      // Wait for manual clock to be 300 first time there is data
-      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
-        clock.waitTillTime(300)
-        super.getBatch(start, end)
-      }
-    }
-
-    // This is to make sure thatquery waits for manual clock to be 600 first time there is data
-    val mapped = inputData.toDS().agg(count("*")).as[Long].coalesce(1).map { x =>
-      clock.waitTillTime(600)
-      x
+      )
+    } finally {
+      spark.streams.removeListener(listener)
     }
-
-    testStream(mapped, OutputMode.Complete)(
-      StartStream(triggerClock = clock),
-      AddData(inputData, 1, 2),
-      AdvanceManualClock(100),  // unblock getOffset, will block on getBatch
-      AdvanceManualClock(200),  // unblock getBatch, will block on computation
-      AdvanceManualClock(300),  // unblock computation
-      AssertOnQuery { _ => clock.getTimeMillis() === 600 },
-      AssertOnLastQueryStatus { status: StreamingQueryStatus =>
-        // Check the correctness of the trigger info of the last completed batch reported by
-        // onQueryProgress
-        assert(status.triggerDetails.containsKey("batchId"))
-        assert(status.triggerDetails.get("isTriggerActive") === "false")
-        assert(status.triggerDetails.get("isDataPresentInTrigger") === "true")
-
-        assert(status.triggerDetails.get("timestamp.triggerStart") === "0")
-        assert(status.triggerDetails.get("timestamp.afterGetOffset") === "100")
-        assert(status.triggerDetails.get("timestamp.afterGetBatch") === "300")
-        assert(status.triggerDetails.get("timestamp.triggerFinish") === "600")
-
-        assert(status.triggerDetails.get("latency.getOffset.total") === "100")
-        assert(status.triggerDetails.get("latency.getBatch.total") === "200")
-        assert(status.triggerDetails.get("latency.optimizer") === "0")
-        assert(status.triggerDetails.get("latency.offsetLogWrite") === "0")
-        assert(status.triggerDetails.get("latency.fullTrigger") === "600")
-
-        assert(status.triggerDetails.get("numRows.input.total") === "2")
-        assert(status.triggerDetails.get("numRows.state.aggregation1.total") === "1")
-        assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1")
-
-        assert(status.sourceStatuses.length === 1)
-        assert(status.sourceStatuses(0).triggerDetails.containsKey("batchId"))
-        assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100")
-        assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200")
-        assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")
-      },
-      CheckAnswer(2)
-    )
   }
 
   test("adding and removing listener") {
-    def isListenerActive(listener: QueryStatusCollector): Boolean = {
+    def isListenerActive(listener: EventCollector): Boolean = {
       listener.reset()
       testStream(MemoryStream[Int].toDS)(
         StartStream(),
         StopStream
       )
-      listener.startStatus != null
+      listener.startEvent != null
     }
 
     try {
-      val listener1 = new QueryStatusCollector
-      val listener2 = new QueryStatusCollector
+      val listener1 = new EventCollector
+      val listener2 = new EventCollector
 
       spark.streams.addListener(listener1)
       assert(isListenerActive(listener1) === true)
@@ -142,14 +144,14 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   }
 
   test("event ordering") {
-    val listener = new QueryStatusCollector
+    val listener = new EventCollector
     withListenerAdded(listener) {
       for (i <- 1 to 100) {
         listener.reset()
-        require(listener.startStatus === null)
+        require(listener.startEvent === null)
         testStream(MemoryStream[Int].toDS)(
           StartStream(),
-          Assert(listener.startStatus !== null, "onQueryStarted not called before query returned"),
+          Assert(listener.startEvent !== null, "onQueryStarted not called before query returned"),
           StopStream,
           Assert { listener.checkAsyncErrors() }
         )
@@ -158,7 +160,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   }
 
   testQuietly("exception should be reported in QueryTerminated") {
-    val listener = new QueryStatusCollector
+    val listener = new EventCollector
     withListenerAdded(listener) {
       val input = MemoryStream[Int]
       testStream(input.toDS.map(_ / 0))(
@@ -167,49 +169,46 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         ExpectFailure[SparkException](),
         Assert {
           spark.sparkContext.listenerBus.waitUntilEmpty(10000)
-          assert(listener.terminationStatus !== null)
-          assert(listener.terminationException.isDefined)
+          assert(listener.terminationEvent !== null)
+          assert(listener.terminationEvent.exception.nonEmpty)
           // Make sure that the exception message reported through listener
           // contains the actual exception and relevant stack trace
-          assert(!listener.terminationException.get.contains("StreamingQueryException"))
-          assert(listener.terminationException.get.contains("java.lang.ArithmeticException"))
-          assert(listener.terminationException.get.contains("StreamingQueryListenerSuite"))
+          assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
+          assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
+          assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
         }
       )
     }
   }
 
-  test("QueryStarted serialization") {
-    val queryStarted = new StreamingQueryListener.QueryStartedEvent(StreamingQueryStatus.testStatus)
+  test("QueryStartedEvent serialization") {
+    val queryStarted = new StreamingQueryListener.QueryStartedEvent(UUID.randomUUID(), "name")
     val json = JsonProtocol.sparkEventToJson(queryStarted)
     val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
       .asInstanceOf[StreamingQueryListener.QueryStartedEvent]
-    assertStreamingQueryInfoEquals(queryStarted.queryStatus, newQueryStarted.queryStatus)
   }
 
-  test("QueryProgress serialization") {
-    val queryProcess = new StreamingQueryListener.QueryProgressEvent(
-      StreamingQueryStatus.testStatus)
-    val json = JsonProtocol.sparkEventToJson(queryProcess)
-    val newQueryProcess = JsonProtocol.sparkEventFromJson(json)
+  test("QueryProgressEvent serialization") {
+    val event = new StreamingQueryListener.QueryProgressEvent(
+      StreamingQueryProgressSuite.testProgress)
+    val json = JsonProtocol.sparkEventToJson(event)
+    val newEvent = JsonProtocol.sparkEventFromJson(json)
       .asInstanceOf[StreamingQueryListener.QueryProgressEvent]
-    assertStreamingQueryInfoEquals(queryProcess.queryStatus, newQueryProcess.queryStatus)
+    assert(event.progress.json === newEvent.progress.json)
   }
 
-  test("QueryTerminated serialization") {
+  test("QueryTerminatedEvent serialization") {
     val exception = new RuntimeException("exception")
     val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent(
-      StreamingQueryStatus.testStatus,
-      Some(exception.getMessage))
-    val json =
-      JsonProtocol.sparkEventToJson(queryQueryTerminated)
+      UUID.randomUUID, Some(exception.getMessage))
+    val json = JsonProtocol.sparkEventToJson(queryQueryTerminated)
     val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
       .asInstanceOf[StreamingQueryListener.QueryTerminatedEvent]
-    assertStreamingQueryInfoEquals(queryQueryTerminated.queryStatus, newQueryTerminated.queryStatus)
+    assert(queryQueryTerminated.id === newQueryTerminated.id)
     assert(queryQueryTerminated.exception === newQueryTerminated.exception)
   }
 
-  test("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
+  testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
     // query-event-logs-version-2.0.0.txt has all types of events generated by
     // Structured Streaming in Spark 2.0.0.
     // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
@@ -217,7 +216,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt")
   }
 
-  test("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") {
+  testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") {
     // query-event-logs-version-2.0.1.txt has all types of events generated by
     // Structured Streaming in Spark 2.0.1.
     // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
@@ -248,28 +247,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
-  private def assertStreamingQueryInfoEquals(
-      expected: StreamingQueryStatus,
-      actual: StreamingQueryStatus): Unit = {
-    assert(expected.name === actual.name)
-    assert(expected.sourceStatuses.size === actual.sourceStatuses.size)
-    expected.sourceStatuses.zip(actual.sourceStatuses).foreach {
-      case (expectedSource, actualSource) =>
-        assertSourceStatus(expectedSource, actualSource)
-    }
-    assertSinkStatus(expected.sinkStatus, actual.sinkStatus)
-  }
-
-  private def assertSourceStatus(expected: SourceStatus, actual: SourceStatus): Unit = {
-    assert(expected.description === actual.description)
-    assert(expected.offsetDesc === actual.offsetDesc)
-  }
-
-  private def assertSinkStatus(expected: SinkStatus, actual: SinkStatus): Unit = {
-    assert(expected.description === actual.description)
-    assert(expected.offsetDesc === actual.offsetDesc)
-  }
-
   private def withListenerAdded(listener: StreamingQueryListener)(body: => Unit): Unit = {
     try {
       failAfter(streamingTimeout) {
@@ -287,9 +264,51 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     val listenerBus = spark.streams invokePrivate listenerBusMethod()
     listenerBus.listeners.toArray.map(_.asInstanceOf[StreamingQueryListener])
   }
-}
 
-object StreamingQueryListenerSuite {
-  // Singleton reference to clock that does not get serialized in task closures
-  @volatile var clock: ManualClock = null
+  /** Collects events from the StreamingQueryListener for testing */
+  class EventCollector extends StreamingQueryListener {
+    // to catch errors in the async listener events
+    @volatile private var asyncTestWaiter = new Waiter
+
+    @volatile var startEvent: QueryStartedEvent = null
+    @volatile var terminationEvent: QueryTerminatedEvent = null
+
+    private val _progressEvents = new mutable.Queue[StreamingQueryProgress]
+
+    def progressEvents: Seq[StreamingQueryProgress] = _progressEvents.synchronized {
+      _progressEvents.filter(_.numInputRows > 0)
+    }
+
+    def reset(): Unit = {
+      startEvent = null
+      terminationEvent = null
+      _progressEvents.clear()
+      asyncTestWaiter = new Waiter
+    }
+
+    def checkAsyncErrors(): Unit = {
+      asyncTestWaiter.await(timeout(streamingTimeout))
+    }
+
+    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
+      asyncTestWaiter {
+        startEvent = queryStarted
+      }
+    }
+
+    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
+      asyncTestWaiter {
+        assert(startEvent != null, "onQueryProgress called before onQueryStarted")
+        _progressEvents.synchronized { _progressEvents += queryProgress.progress }
+      }
+    }
+
+    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
+      asyncTestWaiter {
+        assert(startEvent != null, "onQueryTerminated called before onQueryStarted")
+        terminationEvent = queryTerminated
+      }
+      asyncTestWaiter.dismiss()
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 41ffd56cf1..268b8ff7b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -62,7 +62,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
       assert(spark.streams.get(q1.id).eq(q1))
       assert(spark.streams.get(q2.id).eq(q2))
       assert(spark.streams.get(q3.id).eq(q3))
-      assert(spark.streams.get(-1) === null) // non-existent id
+      assert(spark.streams.get(java.util.UUID.randomUUID()) === null) // non-existent id
       q1.stop()
 
       assert(spark.streams.active.toSet === Set(q2, q3))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
new file mode 100644
index 0000000000..45d29f6b35
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.streaming.StreamingQueryProgressSuite._
+
+
+class StreamingQueryProgressSuite extends SparkFunSuite {
+
+  test("prettyJson") {
+    val json = testProgress.prettyJson
+    assert(json ===
+      s"""
+        |{
+        |  "id" : "${testProgress.id.toString}",
+        |  "name" : "name",
+        |  "timestamp" : 1,
+        |  "numInputRows" : 678,
+        |  "inputRowsPerSecond" : 10.0,
+        |  "durationMs" : {
+        |    "total" : 0
+        |  },
+        |  "currentWatermark" : 3,
+        |  "stateOperators" : [ {
+        |    "numRowsTotal" : 0,
+        |    "numRowsUpdated" : 1
+        |  } ],
+        |  "sources" : [ {
+        |    "description" : "source",
+        |    "startOffset" : 123,
+        |    "endOffset" : 456,
+        |    "numInputRows" : 678,
+        |    "inputRowsPerSecond" : 10.0
+        |  } ],
+        |  "sink" : {
+        |    "description" : "sink"
+        |  }
+        |}
+      """.stripMargin.trim)
+    assert(compact(parse(json)) === testProgress.json)
+
+  }
+
+  test("json") {
+    assert(compact(parse(testProgress.json)) === testProgress.json)
+  }
+
+  test("toString") {
+    assert(testProgress.toString === testProgress.prettyJson)
+  }
+}
+
+object StreamingQueryProgressSuite {
+  val testProgress = new StreamingQueryProgress(
+    id = UUID.randomUUID(),
+    name = "name",
+    timestamp = 1L,
+    batchId = 2L,
+    durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
+    currentWatermark = 3L,
+    stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
+    sources = Array(
+      new SourceProgress(
+        description = "source",
+        startOffset = "123",
+        endOffset = "456",
+        numInputRows = 678,
+        inputRowsPerSecond = 10.0,
+        processedRowsPerSecond = Double.PositiveInfinity  // should not be present in the json
+      )
+    ),
+    sink = new SinkProgress("sink")
+  )
+}
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
deleted file mode 100644
index 50a7d92ede..0000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.apache.spark.SparkFunSuite
-
-class StreamingQueryStatusSuite extends SparkFunSuite {
-  test("toString") {
-    assert(StreamingQueryStatus.testStatus.sourceStatuses(0).toString ===
-      """
-        |Status of source MySource1
-        |    Available offset: 0
-        |    Input rate: 15.5 rows/sec
-        |    Processing rate: 23.5 rows/sec
-        |    Trigger details:
-        |        numRows.input.source: 100
-        |        latency.getOffset.source: 10
-        |        latency.getBatch.source: 20
-      """.stripMargin.trim, "SourceStatus.toString does not match")
-
-    assert(StreamingQueryStatus.testStatus.sinkStatus.toString ===
-      """
-        |Status of sink MySink
-        |    Committed offsets: [1, -]
-      """.stripMargin.trim, "SinkStatus.toString does not match")
-
-    assert(StreamingQueryStatus.testStatus.toString ===
-      """
-        |Status of query 'query'
-        |    Query id: 1
-        |    Status timestamp: 123
-        |    Input rate: 15.5 rows/sec
-        |    Processing rate 23.5 rows/sec
-        |    Latency: 345.0 ms
-        |    Trigger details:
-        |        batchId: 5
-        |        isDataPresentInTrigger: true
-        |        isTriggerActive: true
-        |        latency.getBatch.total: 20
-        |        latency.getOffset.total: 10
-        |        numRows.input.total: 100
-        |    Source statuses [1 source]:
-        |        Source 1 - MySource1
-        |            Available offset: 0
-        |            Input rate: 15.5 rows/sec
-        |            Processing rate: 23.5 rows/sec
-        |            Trigger details:
-        |                numRows.input.source: 100
-        |                latency.getOffset.source: 10
-        |                latency.getBatch.source: 20
-        |    Sink status - MySink
-        |        Committed offsets: [1, -]
-      """.stripMargin.trim, "StreamingQueryStatus.toString does not match")
-
-  }
-
-  test("json") {
-    assert(StreamingQueryStatus.testStatus.json ===
-      """
-        |{"name":"query","id":1,"timestamp":123,"inputRate":15.5,"processingRate":23.5,
-        |"latency":345.0,"triggerDetails":{"latency.getBatch.total":"20",
-        |"numRows.input.total":"100","isTriggerActive":"true","batchId":"5",
-        |"latency.getOffset.total":"10","isDataPresentInTrigger":"true"},
-        |"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5,
-        |"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100",
-        |"latency.getOffset.source":"10","latency.getBatch.source":"20"}}],
-        |"sinkStatus":{"description":"MySink","offsetDesc":"[1, -]"}}
-      """.stripMargin.replace("\n", "").trim)
-  }
-
-  test("prettyJson") {
-    assert(
-      StreamingQueryStatus.testStatus.prettyJson ===
-        """
-          |{
-          |  "name" : "query",
-          |  "id" : 1,
-          |  "timestamp" : 123,
-          |  "inputRate" : 15.5,
-          |  "processingRate" : 23.5,
-          |  "latency" : 345.0,
-          |  "triggerDetails" : {
-          |    "latency.getBatch.total" : "20",
-          |    "numRows.input.total" : "100",
-          |    "isTriggerActive" : "true",
-          |    "batchId" : "5",
-          |    "latency.getOffset.total" : "10",
-          |    "isDataPresentInTrigger" : "true"
-          |  },
-          |  "sourceStatuses" : [ {
-          |    "description" : "MySource1",
-          |    "offsetDesc" : "0",
-          |    "inputRate" : 15.5,
-          |    "processingRate" : 23.5,
-          |    "triggerDetails" : {
-          |      "numRows.input.source" : "100",
-          |      "latency.getOffset.source" : "10",
-          |      "latency.getBatch.source" : "20"
-          |    }
-          |  } ],
-          |  "sinkStatus" : {
-          |    "description" : "MySink",
-          |    "offsetDesc" : "[1, -]"
-          |  }
-          |}
-        """.stripMargin.trim)
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 8ecb33cf9d..4f3b4a2d75 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -20,14 +20,15 @@ package org.apache.spark.sql.streaming
 import org.scalactic.TolerantNumerics
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{ManualClock, Utils}
 
 
 class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
@@ -109,85 +110,139 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     )
   }
 
-  testQuietly("query statuses") {
-    val inputData = MemoryStream[Int]
-    val mapped = inputData.toDS().map(6 / _)
-    testStream(mapped)(
-      AssertOnQuery(q => q.status.name === q.name),
-      AssertOnQuery(q => q.status.id === q.id),
-      AssertOnQuery(_.status.timestamp <= System.currentTimeMillis),
-      AssertOnQuery(_.status.inputRate === 0.0),
-      AssertOnQuery(_.status.processingRate === 0.0),
-      AssertOnQuery(_.status.sourceStatuses.length === 1),
-      AssertOnQuery(_.status.sourceStatuses(0).description.contains("Memory")),
-      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === "-"),
-      AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
-      AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
-      AssertOnQuery(_.status.sinkStatus.description.contains("Memory")),
-      AssertOnQuery(_.status.sinkStatus.offsetDesc === OffsetSeq(None :: Nil).toString),
-      AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === "-"),
-      AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
-      AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
-      AssertOnQuery(_.sinkStatus.description.contains("Memory")),
-      AssertOnQuery(_.sinkStatus.offsetDesc === new OffsetSeq(None :: Nil).toString),
+  testQuietly("query statuses and progresses") {
+    import StreamingQuerySuite._
+    clock = new StreamManualClock
+
+    /** Custom MemoryStream that waits for manual clock to reach a time */
+    val inputData = new MemoryStream[Int](0, sqlContext) {
+      // Wait for manual clock to be 300 first time there is data
+      override def getOffset: Option[Offset] = {
+        val offset = super.getOffset
+        if (offset.nonEmpty) {
+          clock.waitTillTime(300)
+        }
+        offset
+      }
 
-      AddData(inputData, 1, 2),
-      CheckAnswer(6, 3),
-      AssertOnQuery(_.status.timestamp <= System.currentTimeMillis),
-      AssertOnQuery(_.status.inputRate >= 0.0),
-      AssertOnQuery(_.status.processingRate >= 0.0),
-      AssertOnQuery(_.status.sourceStatuses.length === 1),
-      AssertOnQuery(_.status.sourceStatuses(0).description.contains("Memory")),
-      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(0).json),
-      AssertOnQuery(_.status.sourceStatuses(0).inputRate >= 0.0),
-      AssertOnQuery(_.status.sourceStatuses(0).processingRate >= 0.0),
-      AssertOnQuery(_.status.sinkStatus.description.contains("Memory")),
-      AssertOnQuery(_.status.sinkStatus.offsetDesc ===
-        OffsetSeq.fill(LongOffset(0)).toString),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(0).json),
-      AssertOnQuery(_.sourceStatuses(0).inputRate >= 0.0),
-      AssertOnQuery(_.sourceStatuses(0).processingRate >= 0.0),
-      AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(0)).toString),
+      // Wait for manual clock to be 600 first time there is data
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+        clock.waitTillTime(600)
+        super.getBatch(start, end)
+      }
+    }
 
-      AddData(inputData, 1, 2),
-      CheckAnswer(6, 3, 6, 3),
-      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).json),
-      AssertOnQuery(_.status.sinkStatus.offsetDesc ===
-        OffsetSeq.fill(LongOffset(1)).toString),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).json),
-      AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString),
+    // This is to make sure that the query waits for the manual clock to reach 1100 the first time there is data
+    val mapped = inputData.toDS().agg(count("*")).as[Long].coalesce(1).map { x =>
+      clock.waitTillTime(1100)
+      x
+    }
 
-      StopStream,
-      AssertOnQuery(_.status.inputRate === 0.0),
-      AssertOnQuery(_.status.processingRate === 0.0),
-      AssertOnQuery(_.status.sourceStatuses.length === 1),
-      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).json),
-      AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
-      AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
-      AssertOnQuery(_.status.sinkStatus.offsetDesc ===
-        OffsetSeq.fill(LongOffset(1)).toString),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).json),
-      AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
-      AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
-      AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString),
-      AssertOnQuery(_.status.triggerDetails.isEmpty),
+    case class AssertStreamExecThreadToWaitForClock()
+      extends AssertOnQuery(q => {
+        eventually(Timeout(streamingTimeout)) {
+          if (q.exception.isEmpty) {
+            assert(clock.asInstanceOf[StreamManualClock].isStreamWaitingAt(clock.getTimeMillis))
+          }
+        }
+        if (q.exception.isDefined) {
+          throw q.exception.get
+        }
+        true
+      }, "")
+
+    testStream(mapped, OutputMode.Complete)(
+      StartStream(ProcessingTime(100), triggerClock = clock),
+      AssertStreamExecThreadToWaitForClock(),
+      AssertOnQuery(_.status.isDataAvailable === false),
+      AssertOnQuery(_.status.isTriggerActive === false),
+      // TODO: test status.message before trigger has started
+      // AssertOnQuery(_.lastProgress === null)  // there is an empty trigger as soon as started
+      AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+
+      // Test status while offset is being fetched
+      AddData(inputData, 1, 2),
+      AdvanceManualClock(100), // time = 100 to start new trigger, will block on getOffset
+      AssertStreamExecThreadToWaitForClock(),
+      AssertOnQuery(_.status.isDataAvailable === false),
+      AssertOnQuery(_.status.isTriggerActive === true),
+      AssertOnQuery(_.status.message.toLowerCase.contains("getting offsets from")),
+      AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+
+      // Test status while batch is being fetched
+      AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch
+      AssertStreamExecThreadToWaitForClock(),
+      AssertOnQuery(_.status.isDataAvailable === true),
+      AssertOnQuery(_.status.isTriggerActive === true),
+      AssertOnQuery(_.status.message === "Processing new data"),
+      AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+
+      // Test status while batch is being processed
+      AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job
+      AssertOnQuery(_.status.isDataAvailable === true),
+      AssertOnQuery(_.status.isTriggerActive === true),
+      AssertOnQuery(_.status.message === "Processing new data"),
+      AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+
+      // Test status while batch processing has completed
+      AdvanceManualClock(500), // time = 1100 to unblock job
+      AssertOnQuery { _ => clock.getTimeMillis() === 1100 },
+      CheckAnswer(2),
+      AssertOnQuery(_.status.isDataAvailable === true),
+      AssertOnQuery(_.status.isTriggerActive === false),
+      AssertOnQuery(_.status.message === "Waiting for next trigger"),
+      AssertOnQuery { query =>
+        assert(query.lastProgress != null)
+        assert(query.recentProgresses.exists(_.numInputRows > 0))
+        assert(query.recentProgresses.last.eq(query.lastProgress))
+
+        val progress = query.lastProgress
+        assert(progress.id === query.id)
+        assert(progress.name === query.name)
+        assert(progress.batchId === 0)
+        assert(progress.timestamp === 100)
+        assert(progress.numInputRows === 2)
+        assert(progress.processedRowsPerSecond === 2.0)
+
+        assert(progress.durationMs.get("getOffset") === 200)
+        assert(progress.durationMs.get("getBatch") === 300)
+        assert(progress.durationMs.get("queryPlanning") === 0)
+        assert(progress.durationMs.get("walCommit") === 0)
+        assert(progress.durationMs.get("triggerExecution") === 1000)
+
+        assert(progress.sources.length === 1)
+        assert(progress.sources(0).description contains "MemoryStream")
+        assert(progress.sources(0).startOffset === null)
+        assert(progress.sources(0).endOffset !== null)
+        assert(progress.sources(0).processedRowsPerSecond === 2.0)
+
+        assert(progress.stateOperators.length === 1)
+        assert(progress.stateOperators(0).numRowsUpdated === 1)
+        assert(progress.stateOperators(0).numRowsTotal === 1)
+
+        assert(progress.sink.description contains "MemorySink")
+        true
+      },
 
-      StartStream(),
-      AddData(inputData, 0),
-      ExpectFailure[SparkException],
-      AssertOnQuery(_.status.inputRate === 0.0),
-      AssertOnQuery(_.status.processingRate === 0.0),
-      AssertOnQuery(_.status.sourceStatuses.length === 1),
-      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(2).json),
-      AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
-      AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
-      AssertOnQuery(_.status.sinkStatus.offsetDesc ===
-        OffsetSeq.fill(LongOffset(1)).toString),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(2).json),
-      AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
-      AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
-      AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString)
+      AddData(inputData, 1, 2),
+      AdvanceManualClock(100), // allow another trigger
+      CheckAnswer(4),
+      AssertOnQuery(_.status.isDataAvailable === true),
+      AssertOnQuery(_.status.isTriggerActive === false),
+      AssertOnQuery(_.status.message === "Waiting for next trigger"),
+      AssertOnQuery { query =>
+        assert(query.recentProgresses.last.eq(query.lastProgress))
+        assert(query.lastProgress.batchId === 1)
+        assert(query.lastProgress.sources(0).inputRowsPerSecond === 1.818)
+        true
+      },
+
+      // Test status after data is not available for a trigger
+      AdvanceManualClock(100), // allow another trigger
+      AssertStreamExecThreadToWaitForClock(),
+      AssertOnQuery(_.status.isDataAvailable === false),
+      AssertOnQuery(_.status.isTriggerActive === false),
+      AssertOnQuery(_.status.message === "Waiting for next trigger")
     )
   }
 
@@ -196,7 +251,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
 
     /** Whether metrics of a query is registered for reporting */
     def isMetricsRegistered(query: StreamingQuery): Boolean = {
-      val sourceName = s"StructuredStreaming.${query.name}"
+      val sourceName = s"spark.streaming.${query.name}"
       val sources = spark.sparkContext.env.metricsSystem.getSourcesByName(sourceName)
       require(sources.size <= 1)
       sources.nonEmpty
@@ -229,23 +284,23 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
 
     // Trigger input has 10 rows, static input has 2 rows,
     // therefore after the first trigger, the calculated input rows should be 10
-    val status = getFirstTriggerStatus(streamingInputDF.join(staticInputDF, "value"))
-    assert(status.triggerDetails.get("numRows.input.total") === "10")
-    assert(status.sourceStatuses.size === 1)
-    assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "10")
+    val progress = getFirstProgress(streamingInputDF.join(staticInputDF, "value"))
+    assert(progress.numInputRows === 10)
+    assert(progress.sources.size === 1)
+    assert(progress.sources(0).numInputRows === 10)
   }
 
-  test("input row calculation with trigger DF having multiple leaves") {
+  test("input row calculation with trigger input DF having multiple leaves") {
     val streamingTriggerDF =
       spark.createDataset(1 to 5).toDF.union(spark.createDataset(6 to 10).toDF)
     require(streamingTriggerDF.logicalPlan.collectLeaves().size > 1)
     val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF)
 
     // After the first trigger, the calculated input rows should be 10
-    val status = getFirstTriggerStatus(streamingInputDF)
-    assert(status.triggerDetails.get("numRows.input.total") === "10")
-    assert(status.sourceStatuses.size === 1)
-    assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "10")
+    val progress = getFirstProgress(streamingInputDF)
+    assert(progress.numInputRows === 10)
+    assert(progress.sources.size === 1)
+    assert(progress.sources(0).numInputRows === 10)
   }
 
   testQuietly("StreamExecution metadata garbage collection") {
@@ -285,34 +340,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     StreamingExecutionRelation(source)
   }
 
-  /** Returns the query status at the end of the first trigger of streaming DF */
-  private def getFirstTriggerStatus(streamingDF: DataFrame): StreamingQueryStatus = {
-    // A StreamingQueryListener that gets the query status after the first completed trigger
-    val listener = new StreamingQueryListener {
-      @volatile var firstStatus: StreamingQueryStatus = null
-      @volatile var queryStartedEvent = 0
-      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
-        queryStartedEvent += 1
-      }
-      override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
-       if (firstStatus == null) firstStatus = queryProgress.queryStatus
-      }
-      override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { }
-    }
-
+  /** Returns the query progress at the end of the first trigger of the streaming DF */
+  private def getFirstProgress(streamingDF: DataFrame): StreamingQueryProgress = {
     try {
-      spark.streams.addListener(listener)
       val q = streamingDF.writeStream.format("memory").queryName("test").start()
       q.processAllAvailable()
-      eventually(timeout(streamingTimeout)) {
-        assert(listener.firstStatus != null)
-        // test if QueryStartedEvent callback is called for only once
-        assert(listener.queryStartedEvent === 1)
-      }
-      listener.firstStatus
+      q.recentProgresses.head
     } finally {
       spark.streams.active.map(_.stop())
-      spark.streams.removeListener(listener)
     }
   }
 
@@ -369,3 +404,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     }
   }
 }
+
+object StreamingQuerySuite {
+  // Singleton reference to clock that does not get serialized in task closures
+  var clock: ManualClock = null
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
index 3e9488c7dc..12f3c3e5ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
@@ -51,6 +51,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
 
 
   test("watermark metric") {
+
     val inputData = MemoryStream[Int]
 
     val windowedAggregation = inputData.toDF()
@@ -62,16 +63,19 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
 
     testStream(windowedAggregation)(
       AddData(inputData, 15),
-      AssertOnLastQueryStatus { status =>
-        status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "5000"
+      CheckAnswer(),
+      AssertOnQuery { query =>
+        query.lastProgress.currentWatermark === 5000
       },
       AddData(inputData, 15),
-      AssertOnLastQueryStatus { status =>
-        status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "5000"
+      CheckAnswer(),
+      AssertOnQuery { query =>
+        query.lastProgress.currentWatermark === 5000
       },
       AddData(inputData, 25),
-      AssertOnLastQueryStatus { status =>
-        status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "15000"
+      CheckAnswer(),
+      AssertOnQuery { query =>
+        query.lastProgress.currentWatermark === 15000
       }
     )
   }
-- 
GitLab