From c4de90fc76d5aa5d2c8fee4ed692d4ab922cbab0 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu <shixiong@databricks.com>
Date: Wed, 14 Dec 2016 13:36:41 -0800
Subject: [PATCH] [SPARK-18852][SS] StreamingQuery.lastProgress should be null
 when recentProgress is empty

## What changes were proposed in this pull request?

Right now `StreamingQuery.lastProgress` throws a `NoSuchElementException` when no progress update has been reported yet, which is especially hard to deal with from Python, since a Python user will only see an opaque `Py4JError`.

This PR simply makes it return null (`None` in Python) instead.
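
With this change, a Python caller can test for `None` directly. A minimal sketch of the intended usage, assuming `df` is a streaming DataFrame and `out`/`chk` are output and checkpoint paths (placeholders, not part of this patch):

```python
# Illustrative sketch only; `df`, `out`, and `chk` are placeholders.
query = df.writeStream \
    .format("parquet") \
    .option("checkpointLocation", chk) \
    .start(out)

# Before this patch: reading lastProgress before the first progress update
# surfaced a Py4JError wrapping the JVM-side NoSuchElementException.
# After this patch: it simply returns None.
progress = query.lastProgress
if progress is None:
    print("no progress reported yet")
else:
    print(progress["timestamp"])

query.stop()
```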

## How was this patch tested?

`test("lastProgress should be null when recentProgress is empty")` in `StreamingQuerySuite`: it uses a `BlockingSource` whose `createSource` blocks on a latch, so no progress update can be reported and `lastProgress` must be `null`. The existing PySpark streaming test is also extended to read `lastProgress` before the first update; see the sketch below.
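
As a quick manual check in a PySpark shell (an illustrative sketch, not part of the patch; it assumes a running `spark` session and something listening on localhost:9999):

```python
# With the patch applied, lastProgress is typically None right after start(),
# before the first trigger has completed; without it, this access raised an error.
df = spark.readStream.format("socket") \
    .option("host", "localhost").option("port", 9999).load()
q = df.writeStream.format("memory").queryName("t").start()
print(q.lastProgress)  # usually None here; never an exception anymore
q.stop()
```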

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16273 from zsxwing/SPARK-18852.

(cherry picked from commit 1ac6567bdb03d7cc5c5f3473827a102280cb1030)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
---
 python/pyspark/sql/streaming.py               |  9 ++++++--
 python/pyspark/sql/tests.py                   | 18 +++++++++++++++-
 .../streaming/ProgressReporter.scala          |  4 ++--
 .../StreamingQueryManagerSuite.scala          |  9 +++-----
 .../sql/streaming/StreamingQuerySuite.scala   | 21 ++++++++++++++++++-
 ...faultSource.scala => BlockingSource.scala} | 10 +++++++--
 6 files changed, 57 insertions(+), 14 deletions(-)
 rename sql/core/src/test/scala/org/apache/spark/sql/streaming/util/{DefaultSource.scala => BlockingSource.scala} (92%)

diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 9cfb3fe25c..eabd5ef54c 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -125,10 +125,15 @@ class StreamingQuery(object):
     @since(2.1)
     def lastProgress(self):
         """
-        Returns the most recent :class:`StreamingQueryProgress` update of this streaming query.
+        Returns the most recent :class:`StreamingQueryProgress` update of this streaming query or
+        None if there were no progress updates.
         :return: a map
         """
-        return json.loads(self._jsq.lastProgress().json())
+        lastProgress = self._jsq.lastProgress()
+        if lastProgress:
+            return json.loads(lastProgress.json())
+        else:
+            return None
 
     @since(2.0)
     def processAllAvailable(self):
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 66320bd050..115b4a9bef 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1119,9 +1119,25 @@ class SQLTests(ReusedPySparkTestCase):
         self.assertTrue(df.isStreaming)
         out = os.path.join(tmpPath, 'out')
         chk = os.path.join(tmpPath, 'chk')
-        q = df.writeStream \
+
+        def func(x):
+            time.sleep(1)
+            return x
+
+        from pyspark.sql.functions import col, udf
+        sleep_udf = udf(func)
+
+        # Use "sleep_udf" to delay the progress update so that we can test `lastProgress` before
+        # any progress update has been reported.
+        q = df.select(sleep_udf(col("value")).alias('value')).writeStream \
             .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
         try:
+            # "lastProgress" will return None in most cases. However, it may be non-None when
+            # Jenkins is very slow, so we don't assert on the value. If the fix were broken,
+            # "lastProgress" would very likely throw an error here anyway, so this call can
+            # still detect broken code.
+            q.lastProgress
+
             q.processAllAvailable()
             lastProgress = q.lastProgress
             recentProgress = q.recentProgress
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 549b93694d..e40135fdd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -100,9 +100,9 @@ trait ProgressReporter extends Logging {
     progressBuffer.toArray
   }
 
-  /** Returns the most recent query progress update. */
+  /** Returns the most recent query progress update or null if there were no progress updates. */
   def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
-    progressBuffer.last
+    progressBuffer.lastOption.orNull
   }
 
   /** Begins recording statistics about query progress for a given trigger. */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index d188319fe3..1742a5474c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark.SparkException
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.streaming.util.BlockingSource
 import org.apache.spark.util.Utils
 
 class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
@@ -217,7 +218,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
 
   test("SPARK-18811: Source resolution should not block main thread") {
     failAfter(streamingTimeout) {
-      StreamingQueryManagerSuite.latch = new CountDownLatch(1)
+      BlockingSource.latch = new CountDownLatch(1)
       withTempDir { tempDir =>
         // if source resolution was happening on the main thread, it would block the start call,
         // now it should only be blocking the stream execution thread
@@ -231,7 +232,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
         eventually(Timeout(streamingTimeout)) {
           assert(sq.status.message.contains("Initializing sources"))
         }
-        StreamingQueryManagerSuite.latch.countDown()
+        BlockingSource.latch.countDown()
         sq.stop()
       }
     }
@@ -321,7 +322,3 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
     (inputData, mapped)
   }
 }
-
-object StreamingQueryManagerSuite {
-  var latch: CountDownLatch = null
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index afd788ce3d..b052bd9e6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.streaming
 
-import scala.collection.JavaConverters._
+import java.util.concurrent.CountDownLatch
 
 import org.apache.commons.lang3.RandomStringUtils
 import org.scalactic.TolerantNumerics
@@ -32,6 +32,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.util.BlockingSource
 import org.apache.spark.util.ManualClock
 
 
@@ -312,6 +313,24 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     )
   }
 
+  test("lastProgress should be null when recentProgress is empty") {
+    BlockingSource.latch = new CountDownLatch(1)
+    withTempDir { tempDir =>
+      val sq = spark.readStream
+        .format("org.apache.spark.sql.streaming.util.BlockingSource")
+        .load()
+        .writeStream
+        .format("org.apache.spark.sql.streaming.util.BlockingSource")
+        .option("checkpointLocation", tempDir.toString)
+        .start()
+      // Creating the source is blocked, so recentProgress is empty and lastProgress should be null
+      assert(sq.lastProgress === null)
+      // Release the latch and stop the query
+      BlockingSource.latch.countDown()
+      sq.stop()
+    }
+  }
+
   test("codahale metrics") {
     val inputData = MemoryStream[Int]
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
similarity index 92%
rename from sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
index b0adf76814..19ab2ff13e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/DefaultSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/util/BlockingSource.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.streaming.util
 
+import java.util.concurrent.CountDownLatch
+
 import org.apache.spark.sql.{SQLContext, _}
 import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Sink, Source}
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
-import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryManagerSuite}
+import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 
 /** Dummy provider: returns a SourceProvider with a blocking `createSource` call. */
@@ -42,7 +44,7 @@ class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
       schema: Option[StructType],
       providerName: String,
       parameters: Map[String, String]): Source = {
-    StreamingQueryManagerSuite.latch.await()
+    BlockingSource.latch.await()
     new Source {
       override def schema: StructType = fakeSchema
       override def getOffset: Option[Offset] = Some(new LongOffset(0))
@@ -64,3 +66,7 @@ class BlockingSource extends StreamSourceProvider with StreamSinkProvider {
     }
   }
 }
+
+object BlockingSource {
+  var latch: CountDownLatch = null
+}
-- 
GitLab