diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 7c6745ac8285a2c01674e2b1fd96b7494e0bee8a..a057d1d36c5a839c99b24706e8f86e3824f3ca1b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -84,7 +84,11 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { CheckAnswer(10, 5), AssertOnQuery { query => assert(listener.progressEvents.nonEmpty) - assert(listener.progressEvents.last.json === query.lastProgress.json) + // SPARK-18868: We can't use query.lastProgress, because in progressEvents, we filter + // out non-zero input rows, but the lastProgress may be a zero input row trigger + val lastNonZeroProgress = query.recentProgress.filter(_.numInputRows > 0).lastOption + .getOrElse(fail("No progress updates received in StreamingQuery!")) + assert(listener.progressEvents.last.json === lastNonZeroProgress.json) assert(listener.terminationEvent === null) true }, @@ -109,14 +113,17 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { AdvanceManualClock(100), ExpectFailure[SparkException], AssertOnQuery { query => - assert(listener.terminationEvent !== null) - assert(listener.terminationEvent.id === query.id) - assert(listener.terminationEvent.exception.nonEmpty) - // Make sure that the exception message reported through listener - // contains the actual exception and relevant stack trace - assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException")) - assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException")) - assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite")) + eventually(Timeout(streamingTimeout)) { + assert(listener.terminationEvent !== null) + assert(listener.terminationEvent.id === query.id) + assert(listener.terminationEvent.exception.nonEmpty) + // Make sure that the exception message reported through listener + // contains the actual exception and relevant stack trace + assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException")) + assert( + listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException")) + assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite")) + } listener.checkAsyncErrors() true }