From bc09a2b8c3b03a207a6e20627f2c5ec23c1efe8c Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Tue, 29 Nov 2016 23:08:56 -0800
Subject: [PATCH] [SPARK-18516][STRUCTURED STREAMING] Follow-up PR to add
 StreamingQuery.status to Python

## What changes were proposed in this pull request?
- Add StreamingQueryStatus.json (see the usage sketch after this list)
- Make StreamingQueryStatus a regular class rather than a case class, to avoid unnecessarily exposing the auto-generated companion object (consistent with StreamingQueryProgress)
- Add StreamingQuery.status to Python
- Fix the status reported after query termination
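
A minimal usage sketch of the reworked status API (hypothetical REPL-style session; `query` is an assumed handle to an already-started StreamingQuery, and the example message is one the new tests assert on):

```scala
val status = query.status

// Instantaneous fields, same as before:
status.message          // e.g. "Waiting for next trigger"
status.isDataAvailable  // false
status.isTriggerActive  // false

// New in this PR, mirroring StreamingQueryProgress:
status.json        // {"message":"Waiting for next trigger","isDataAvailable":false,"isTriggerActive":false}
status.prettyJson  // same content, indented; also what toString now returns
```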

## How was this patch tested?
New unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16075 from tdas/SPARK-18516-1.
---
 python/pyspark/sql/streaming.py               |  8 +++
 python/pyspark/sql/tests.py                   |  5 ++
 .../streaming/ProgressReporter.scala          |  5 +-
 .../execution/streaming/StreamExecution.scala |  4 ++
 .../sql/streaming/StreamingQueryStatus.scala  | 38 ++++++++++++--
 .../apache/spark/sql/streaming/progress.scala |  9 ++--
 .../StreamingQueryListenerSuite.scala         | 29 +++--------
 ...treamingQueryStatusAndProgressSuite.scala} | 34 ++++++++++---
 .../sql/streaming/StreamingQuerySuite.scala   | 49 +++++++++++++------
 9 files changed, 127 insertions(+), 54 deletions(-)
 rename sql/core/src/test/scala/org/apache/spark/sql/streaming/{StreamingQueryProgressSuite.scala => StreamingQueryStatusAndProgressSuite.scala} (75%)

diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index c420b0d016..84f01d3d9a 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -88,6 +88,14 @@ class StreamingQuery(object):
         else:
             return self._jsq.awaitTermination()
 
+    @property
+    @since(2.1)
+    def status(self):
+        """
+        Returns the current status of the query.
+        """
+        return json.loads(self._jsq.status().json())
+
     @property
     @since(2.1)
     def recentProgresses(self):
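
The new Python property goes through the Py4J bridge: it calls the `StreamingQueryStatus.json` method added later in this patch and parses the result, so Python callers receive a plain dict. A sketch of the JVM-side string being parsed (illustrative only; the Scala constructor is `protected[sql]`, so this would compile only inside the org.apache.spark.sql package hierarchy):

```scala
// Illustrative: direct construction requires protected[sql] access.
val status = new StreamingQueryStatus(
  message = "Waiting for next trigger",
  isDataAvailable = false,
  isTriggerActive = false)

// This compact JSON string is exactly what Python's json.loads() receives:
status.json
// {"message":"Waiting for next trigger","isDataAvailable":false,"isTriggerActive":false}
```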
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 7151f95216..b7b2a5923c 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1097,9 +1097,14 @@ class SQLTests(ReusedPySparkTestCase):
             q.processAllAvailable()
             lastProgress = q.lastProgress
             recentProgresses = q.recentProgresses
+            status = q.status
             self.assertEqual(lastProgress['name'], q.name)
             self.assertEqual(lastProgress['id'], q.id)
             self.assertTrue(any(p == lastProgress for p in recentProgresses))
+            self.assertTrue(
+                "message" in status and
+                "isDataAvailable" in status and
+                "isTriggerActive" in status)
         finally:
             q.stop()
             shutil.rmtree(tmpPath)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index b7b6e1988e..ba77e7c7bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -70,11 +70,12 @@ trait ProgressReporter extends Logging {
   private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
 
   @volatile
-  protected var currentStatus: StreamingQueryStatus =
-    StreamingQueryStatus(
+  protected var currentStatus: StreamingQueryStatus = {
+    new StreamingQueryStatus(
       message = "Initializing StreamExecution",
       isDataAvailable = false,
       isTriggerActive = false)
+  }
 
   /** Returns the current status of the query. */
   def status: StreamingQueryStatus = currentStatus
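
Worth noting here is the concurrency pattern: `currentStatus` holds an immutable object behind a `@volatile` field, so the stream thread publishes a whole new status in a single reference write and user threads calling `status` always observe a fully constructed snapshot. A generic sketch of the same pattern, with illustrative names not taken from the patch:

```scala
// Generic sketch of the @volatile + immutable-snapshot pattern.
// Names here are illustrative, not Spark's.
final class Snapshot(val message: String, val busy: Boolean)

class Reporter {
  @volatile private var current = new Snapshot("Initializing", busy = false)

  // Writer thread: publish a fresh immutable snapshot in one reference write.
  def update(message: String, busy: Boolean): Unit = {
    current = new Snapshot(message, busy)
  }

  // Reader threads: the @volatile read guarantees visibility of the latest
  // fully-initialized snapshot, with no locking needed.
  def status: Snapshot = current
}
```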
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index e4f31af35f..6d0e269d34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -238,8 +238,10 @@ class StreamExecution(
         updateStatusMessage("Waiting for next trigger")
         isTerminated
       })
+      updateStatusMessage("Stopped")
     } catch {
       case _: InterruptedException if state == TERMINATED => // interrupted by stop()
+        updateStatusMessage("Stopped")
       case e: Throwable =>
         streamDeathCause = new StreamingQueryException(
           this,
@@ -247,6 +249,7 @@ class StreamExecution(
           e,
           Some(committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)))
         logError(s"Query $name terminated with error", e)
+        updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
         // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
         // handle them
         if (!NonFatal(e)) {
@@ -254,6 +257,7 @@ class StreamExecution(
         }
     } finally {
       state = TERMINATED
+      currentStatus = status.copy(isTriggerActive = false, isDataAvailable = false)
 
       // Update metrics and status
       sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics)
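
Taken together, these changes fix the post-termination status from the description: whether the query stops cleanly or dies with an exception, the `finally` block clears both boolean flags and the message reflects how the query ended. The observable behavior, as asserted by the new tests at the end of this patch (`query` is an assumed running StreamingQuery):

```scala
query.stop()
query.status.message          // "Stopped"
query.status.isDataAvailable  // false
query.status.isTriggerActive  // false

// After a failure, the flags are cleared the same way, but the message
// starts with "Terminated with exception: ..." instead.
```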
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
index 4c1a7ce6a0..44befa0d2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -17,6 +17,11 @@
 
 package org.apache.spark.sql.streaming
 
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
 /**
  * Reports information about the instantaneous status of a streaming query.
  *
@@ -27,7 +32,32 @@ package org.apache.spark.sql.streaming
  *
  * @since 2.1.0
  */
-case class StreamingQueryStatus protected[sql](
-    message: String,
-    isDataAvailable: Boolean,
-    isTriggerActive: Boolean)
+class StreamingQueryStatus protected[sql](
+    val message: String,
+    val isDataAvailable: Boolean,
+    val isTriggerActive: Boolean) {
+
+  /** The compact JSON representation of this status. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this status. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  override def toString: String = prettyJson
+
+  private[sql] def copy(
+      message: String = this.message,
+      isDataAvailable: Boolean = this.isDataAvailable,
+      isTriggerActive: Boolean = this.isTriggerActive): StreamingQueryStatus = {
+    new StreamingQueryStatus(
+      message = message,
+      isDataAvailable = isDataAvailable,
+      isTriggerActive = isTriggerActive)
+  }
+
+  private[sql] def jsonValue: JValue = {
+    ("message" -> JString(message.toString)) ~
+    ("isDataAvailable" -> JBool(isDataAvailable)) ~
+    ("isTriggerActive" -> JBool(isTriggerActive))
+  }
+}
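
The `json`/`prettyJson` pair follows the usual json4s recipe: assemble a `JValue` with the JsonDSL `~` combinator, then render it either compactly or indented. A self-contained sketch of that recipe, producing the same three-field object as `jsonValue` above (values taken from the new test suite):

```scala
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// Same shape as StreamingQueryStatus.jsonValue.
val statusJson =
  ("message" -> "active") ~
  ("isDataAvailable" -> true) ~
  ("isTriggerActive" -> false)

compact(render(statusJson))
// {"message":"active","isDataAvailable":true,"isTriggerActive":false}

pretty(render(statusJson))
// {
//   "message" : "active",
//   "isDataAvailable" : true,
//   "isTriggerActive" : false
// }
```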
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 7129fa4d15..4c8247458f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -23,7 +23,6 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
-import org.apache.jute.compiler.JLong
 import org.json4s._
 import org.json4s.JsonAST.JValue
 import org.json4s.JsonDSL._
@@ -85,10 +84,10 @@ class StreamingQueryProgress private[sql](
   /** The aggregate (across all sources) rate at which Spark is processing data. */
   def processedRowsPerSecond: Double = sources.map(_.processedRowsPerSecond).sum
 
-  /** The compact JSON representation of this status. */
+  /** The compact JSON representation of this progress. */
   def json: String = compact(render(jsonValue))
 
-  /** The pretty (i.e. indented) JSON representation of this status. */
+  /** The pretty (i.e. indented) JSON representation of this progress. */
   def prettyJson: String = pretty(render(jsonValue))
 
   override def toString: String = prettyJson
@@ -179,10 +178,10 @@ class SourceProgress protected[sql](
 class SinkProgress protected[sql](
     val description: String) {
 
-  /** The compact JSON representation of this status. */
+  /** The compact JSON representation of this progress. */
   def json: String = compact(render(jsonValue))
 
-  /** The pretty (i.e. indented) JSON representation of this status. */
+  /** The pretty (i.e. indented) JSON representation of this progress. */
   def prettyJson: String = pretty(render(jsonValue))
 
   override def toString: String = prettyJson
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index c68f953b10..08b93e7d0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -106,6 +106,11 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
           assert(listener.terminationEvent !== null)
           assert(listener.terminationEvent.id === query.id)
           assert(listener.terminationEvent.exception.nonEmpty)
+          // Make sure that the exception message reported through the listener
+          // contains the actual exception and the relevant stack trace
+          assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
+          assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
+          assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
           listener.checkAsyncErrors()
           true
         }
@@ -159,28 +164,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
-  testQuietly("exception should be reported in QueryTerminated") {
-    val listener = new EventCollector
-    withListenerAdded(listener) {
-      val input = MemoryStream[Int]
-      testStream(input.toDS.map(_ / 0))(
-        StartStream(),
-        AddData(input, 1),
-        ExpectFailure[SparkException](),
-        Assert {
-          spark.sparkContext.listenerBus.waitUntilEmpty(10000)
-          assert(listener.terminationEvent !== null)
-          assert(listener.terminationEvent.exception.nonEmpty)
-          // Make sure that the exception message reported through listener
-          // contains the actual exception and relevant stack trace
-          assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
-          assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
-          assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
-        }
-      )
-    }
-  }
-
   test("QueryStartedEvent serialization") {
     val queryStarted = new StreamingQueryListener.QueryStartedEvent(UUID.randomUUID(), "name")
     val json = JsonProtocol.sparkEventToJson(queryStarted)
@@ -190,7 +173,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
 
   test("QueryProgressEvent serialization") {
     val event = new StreamingQueryListener.QueryProgressEvent(
-      StreamingQueryProgressSuite.testProgress)
+      StreamingQueryStatusAndProgressSuite.testProgress)
     val json = JsonProtocol.sparkEventToJson(event)
     val newEvent = JsonProtocol.sparkEventFromJson(json)
       .asInstanceOf[StreamingQueryListener.QueryProgressEvent]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
similarity index 75%
rename from sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 45d29f6b35..4da712fa0f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -25,12 +25,12 @@ import org.json4s._
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.streaming.StreamingQueryProgressSuite._
+import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._
 
 
-class StreamingQueryProgressSuite extends SparkFunSuite {
+class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
 
-  test("prettyJson") {
+  test("StreamingQueryProgress - prettyJson") {
     val json = testProgress.prettyJson
     assert(json ===
       s"""
@@ -64,16 +64,36 @@ class StreamingQueryProgressSuite extends SparkFunSuite {
 
   }
 
-  test("json") {
+  test("StreamingQueryProgress - json") {
     assert(compact(parse(testProgress.json)) === testProgress.json)
   }
 
-  test("toString") {
+  test("StreamingQueryProgress - toString") {
     assert(testProgress.toString === testProgress.prettyJson)
   }
+
+  test("StreamingQueryStatus - prettyJson") {
+    val json = testStatus.prettyJson
+    assert(json ===
+      """
+        |{
+        |  "message" : "active",
+        |  "isDataAvailable" : true,
+        |  "isTriggerActive" : false
+        |}
+      """.stripMargin.trim)
+  }
+
+  test("StreamingQueryStatus - json") {
+    assert(compact(parse(testStatus.json)) === testStatus.json)
+  }
+
+  test("StreamingQueryStatus - toString") {
+    assert(testStatus.toString === testStatus.prettyJson)
+  }
 }
 
-object StreamingQueryProgressSuite {
+object StreamingQueryStatusAndProgressSuite {
   val testProgress = new StreamingQueryProgress(
     id = UUID.randomUUID(),
     name = "name",
@@ -94,5 +114,7 @@ object StreamingQueryProgressSuite {
     ),
     sink = new SinkProgress("sink")
   )
+
+  val testStatus = new StreamingQueryStatus("active", true, false)
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 4f3b4a2d75..56abe1201c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -77,7 +77,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     q2.stop()
   }
 
-  testQuietly("lifecycle states and awaitTermination") {
+  testQuietly("isActive, exception, and awaitTermination") {
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map { 6 / _}
 
@@ -110,7 +110,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     )
   }
 
-  testQuietly("query statuses and progresses") {
+  testQuietly("status, lastProgress, and recentProgresses") {
     import StreamingQuerySuite._
     clock = new StreamManualClock
 
@@ -133,10 +133,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     }
 
     // This is to make sure that the query waits for the manual clock to reach 1100 the first time there is data
-    val mapped = inputData.toDS().agg(count("*")).as[Long].coalesce(1).map { x =>
+    val mapped = inputData.toDS().as[Long].map { x =>
       clock.waitTillTime(1100)
-      x
-    }
+      10 / x
+    }.agg(count("*")).as[Long]
 
     case class AssertStreamExecThreadToWaitForClock()
       extends AssertOnQuery(q => {
@@ -151,25 +151,26 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
         true
       }, "")
 
+    var lastProgressBeforeStop: StreamingQueryProgress = null
+
     testStream(mapped, OutputMode.Complete)(
       StartStream(ProcessingTime(100), triggerClock = clock),
       AssertStreamExecThreadToWaitForClock(),
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === false),
-      // TODO: test status.message before trigger has started
-      // AssertOnQuery(_.lastProgress === null)  // there is an empty trigger as soon as started
+      AssertOnQuery(_.status.message === "Waiting for next trigger"),
       AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
 
-      // Test status while offset is being fetched
+      // Test status and progress while offset is being fetched
       AddData(inputData, 1, 2),
       AdvanceManualClock(100), // time = 100 to start new trigger, will block on getOffset
       AssertStreamExecThreadToWaitForClock(),
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === true),
-      AssertOnQuery(_.status.message.toLowerCase.contains("getting offsets from")),
+      AssertOnQuery(_.status.message.startsWith("Getting offsets from")),
       AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
 
-      // Test status while batch is being fetched
+      // Test status and progress while batch is being fetched
       AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch
       AssertStreamExecThreadToWaitForClock(),
       AssertOnQuery(_.status.isDataAvailable === true),
@@ -177,14 +178,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       AssertOnQuery(_.status.message === "Processing new data"),
       AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
 
-      // Test status while batch is being processed
+      // Test status and progress while batch is being processed
       AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job
       AssertOnQuery(_.status.isDataAvailable === true),
       AssertOnQuery(_.status.isTriggerActive === true),
       AssertOnQuery(_.status.message === "Processing new data"),
       AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
 
-      // Test status while batch processing has completed
+      // Test status and progress after batch processing has completed
       AdvanceManualClock(500), // time = 1100 to unblock job
       AssertOnQuery { _ => clock.getTimeMillis() === 1100 },
       CheckAnswer(2),
@@ -237,12 +238,32 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
         true
       },
 
-      // Test status after data is not available for a trigger
+      // Test status and progress when no data is available for a trigger
       AdvanceManualClock(100), // allow another trigger
       AssertStreamExecThreadToWaitForClock(),
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === false),
-      AssertOnQuery(_.status.message === "Waiting for next trigger")
+      AssertOnQuery(_.status.message === "Waiting for next trigger"),
+
+      // Test status and progress after the query is stopped
+      AssertOnQuery { query =>
+        lastProgressBeforeStop = query.lastProgress
+        true
+      },
+      StopStream,
+      AssertOnQuery(_.lastProgress.json === lastProgressBeforeStop.json),
+      AssertOnQuery(_.status.isDataAvailable === false),
+      AssertOnQuery(_.status.isTriggerActive === false),
+      AssertOnQuery(_.status.message === "Stopped"),
+
+      // Test status and progress after the query terminates with an error
+      StartStream(ProcessingTime(100), triggerClock = clock),
+      AddData(inputData, 0),
+      AdvanceManualClock(100),
+      ExpectFailure[SparkException](),
+      AssertOnQuery(_.status.isDataAvailable === false),
+      AssertOnQuery(_.status.isTriggerActive === false),
+      AssertOnQuery(_.status.message.startsWith("Terminated with exception"))
     )
   }
 
-- 
GitLab