From c6a4e3d96997bf166360524a95510b3490b68b49 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu <shixiong@databricks.com>
Date: Mon, 5 Dec 2016 14:59:42 -0800
Subject: [PATCH] [SPARK-18694][SS] Add StreamingQuery.explain and exception to
 Python and fix StreamingQueryException (branch 2.1)

## What changes were proposed in this pull request?

Backport #16125 to branch 2.1.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16153 from zsxwing/SPARK-18694-2.1.
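For reviewers, a minimal usage sketch of the Python surface this backport adds. It assumes an active `SparkSession` named `spark`; the source path and query name are illustrative only, not part of the patch:

```python
# Minimal sketch of the new Python API (SPARK-18694); assumes a running
# SparkSession `spark`. The source path and query name are illustrative.
sdf = spark.readStream.format('text').load('python/test_support/sql/streaming')
sq = sdf.writeStream.format('memory').queryName('demo').start()
sq.processAllAvailable()       # let a batch run so runtime plans exist

sq.explain()                   # physical plan only, printed from Python
sq.explain(extended=True)      # parsed/analyzed/optimized logical plans too

assert sq.exception() is None  # a healthy query reports no exception
sq.stop()
```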
---
 project/MimaExcludes.scala                         |  9 +++-
 python/pyspark/sql/streaming.py                    | 40 ++++++++++++++++++
 python/pyspark/sql/tests.py                        | 29 +++++++++++++
 .../execution/streaming/StreamExecution.scala      |  5 ++-
 .../streaming/StreamingQueryException.scala        | 42 ++++++++++++-------
 .../apache/spark/sql/streaming/progress.scala      |  7 ++++
 .../spark/sql/streaming/StreamTest.scala           |  2 -
 .../sql/streaming/StreamingQuerySuite.scala        | 10 +++--
 8 files changed, 119 insertions(+), 25 deletions(-)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 9739164332..9e6325432c 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -97,7 +97,14 @@ object MimaExcludes {
       // [SPARK-18034] Upgrade to MiMa 0.1.11 to fix flakiness.
       ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.aggregationDepth"),
       ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.getAggregationDepth"),
-      ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_=")
+      ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="),
+
+      // [SPARK-18694] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryException$"),
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.endOffset"),
+      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.this"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query")
     )
   }

diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 84f01d3d9a..4a7d17ba51 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -30,6 +30,7 @@ from pyspark import since, keyword_only
 from pyspark.rdd import ignore_unicode_prefix
 from pyspark.sql.readwriter import OptionUtils, to_str
 from pyspark.sql.types import *
+from pyspark.sql.utils import StreamingQueryException
 
 __all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"]
 
@@ -132,6 +133,45 @@ class StreamingQuery(object):
         """
         self._jsq.stop()
 
+    @since(2.1)
+    def explain(self, extended=False):
+        """Prints the (logical and physical) plans to the console for debugging purposes.
+
+        :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
+
+        >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
+        >>> sq.processAllAvailable() # Wait a bit to generate the runtime plans.
+        >>> sq.explain()
+        == Physical Plan ==
+        ...
+        >>> sq.explain(True)
+        == Parsed Logical Plan ==
+        ...
+        == Analyzed Logical Plan ==
+        ...
+        == Optimized Logical Plan ==
+        ...
+        == Physical Plan ==
+        ...
+        >>> sq.stop()
+        """
+        # Cannot call `_jsq.explain(...)` because it will print in the JVM process.
+        # We should print it in the Python process.
+        print(self._jsq.explainInternal(extended))
+
+    @since(2.1)
+    def exception(self):
+        """
+        :return: the StreamingQueryException if the query was terminated by an exception, or None.
+        """
+        if self._jsq.exception().isDefined():
+            je = self._jsq.exception().get()
+            msg = je.toString().split(': ', 1)[1]  # Drop the Java StreamingQueryException type info
+            stackTrace = '\n\t at '.join(map(lambda x: x.toString(), je.getStackTrace()))
+            return StreamingQueryException(msg, stackTrace)
+        else:
+            return None
+
 
 class StreamingQueryManager(object):
     """A class to manage all the :class:`StreamingQuery` StreamingQueries active.

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 0aff9cebe9..9f34414f64 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1137,6 +1137,35 @@ class SQLTests(ReusedPySparkTestCase):
         q.stop()
         shutil.rmtree(tmpPath)
 
+    def test_stream_exception(self):
+        sdf = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
+        sq = sdf.writeStream.format('memory').queryName('query_explain').start()
+        try:
+            sq.processAllAvailable()
+            self.assertEqual(sq.exception(), None)
+        finally:
+            sq.stop()
+
+        from pyspark.sql.functions import col, udf
+        from pyspark.sql.utils import StreamingQueryException
+        bad_udf = udf(lambda x: 1 / 0)
+        sq = sdf.select(bad_udf(col("value")))\
+            .writeStream\
+            .format('memory')\
+            .queryName('this_query')\
+            .start()
+        try:
+            # Process some data to fail the query
+            sq.processAllAvailable()
+            self.fail("bad udf should fail the query")
+        except StreamingQueryException as e:
+            # This is expected
+            self.assertTrue("ZeroDivisionError" in e.desc)
+        finally:
+            sq.stop()
+        self.assertTrue(type(sq.exception()) is StreamingQueryException)
+        self.assertTrue("ZeroDivisionError" in sq.exception().desc)
+
     def test_query_manager_await_termination(self):
         df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
         for q in self.spark._wrapped.streams.active:
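A note on the `split(': ', 1)` in `exception()` above: it relies on the Java `Throwable.toString()` convention of `<class name>: <message>`. A hedged illustration of that parsing step, using an invented stand-in string rather than captured engine output:

```python
# The Java toString() form is '<exception class>: <message>'. This string
# is an invented stand-in, not real output from a failed query.
java_str = ('org.apache.spark.sql.streaming.StreamingQueryException: '
            'Query this_query terminated with exception: ZeroDivisionError')
desc = java_str.split(': ', 1)[1]  # drop only the leading Java class name
print(desc)  # Query this_query terminated with exception: ZeroDivisionError
```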
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8804c647a7..6b1c01ab2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -93,7 +93,7 @@ class StreamExecution(
    * once, since the field's value may change at any time.
    */
   @volatile
-  protected var availableOffsets = new StreamProgress
+  var availableOffsets = new StreamProgress
 
   /** The current batchId or -1 if execution has not yet been initialized. */
   protected var currentBatchId: Long = -1
@@ -263,7 +263,8 @@ class StreamExecution(
           this,
           s"Query $name terminated with exception: ${e.getMessage}",
           e,
-          Some(committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)))
+          committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString,
+          availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString)
         logError(s"Query $name terminated with error", e)
         updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
         // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index 13f11ba1c9..a96150aa89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -24,32 +24,42 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecution}
  * :: Experimental ::
  * Exception that stopped a [[StreamingQuery]]. Use `cause` to get the actual exception
  * that caused the failure.
- * @param query Query that caused the exception
  * @param message Message of this exception
  * @param cause Internal cause of this exception
- * @param startOffset Starting offset (if known) of the range of data in which exception occurred
- * @param endOffset Ending offset (if known) of the range of data in exception occurred
+ * @param startOffset Starting offset in JSON of the range of data in which the exception occurred
+ * @param endOffset Ending offset in JSON of the range of data in which the exception occurred
 * @since 2.0.0
 */
 @Experimental
-class StreamingQueryException private[sql](
-    @transient val query: StreamingQuery,
+class StreamingQueryException private(
+    causeString: String,
     val message: String,
     val cause: Throwable,
-    val startOffset: Option[OffsetSeq] = None,
-    val endOffset: Option[OffsetSeq] = None)
+    val startOffset: String,
+    val endOffset: String)
   extends Exception(message, cause) {
 
+  private[sql] def this(
+      query: StreamingQuery,
+      message: String,
+      cause: Throwable,
+      startOffset: String,
+      endOffset: String) {
+    this(
+      // scalastyle:off
+      s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}
+         |
+         |${query.asInstanceOf[StreamExecution].toDebugString}
+         """.stripMargin,
+      // scalastyle:on
+      message,
+      cause,
+      startOffset,
+      endOffset)
+  }
+
   /** Time when the exception occurred */
   val time: Long = System.currentTimeMillis
 
-  override def toString(): String = {
-    val causeStr =
-      s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}"
-    s"""
-       |$causeStr
-       |
-       |${query.asInstanceOf[StreamExecution].toDebugString}
-      """.stripMargin
-  }
+  override def toString(): String = causeString
 }

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 4c8247458f..fb5bad0123 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -38,6 +38,13 @@ import org.apache.spark.annotation.Experimental
 class StateOperatorProgress private[sql](
     val numRowsTotal: Long,
     val numRowsUpdated: Long) {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
   private[sql] def jsonValue: JValue = {
     ("numRowsTotal" -> JInt(numRowsTotal)) ~
     ("numRowsUpdated" -> JInt(numRowsUpdated))
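The new `json`/`prettyJson` accessors expose the same fields that `jsonValue` serializes, which is also what Python users receive through the progress API. A hedged sketch of reading those fields (assumes an active stateful query `sq`; `stateOperators` is empty for stateless queries):

```python
# Hedged sketch: the dict returned by lastProgress mirrors the JSON built by
# jsonValue above. Assumes an active aggregating query `sq`.
progress = sq.lastProgress           # dict, or None before the first progress
if progress:
    for op in progress.get('stateOperators', []):
        print(op['numRowsTotal'], op['numRowsUpdated'])
```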
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index a2629f7f68..4332265129 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -412,8 +412,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
             eventually("microbatch thread not stopped after termination with failure") {
               assert(!currentStream.microBatchThread.isAlive)
             }
-            verify(thrownException.query.eq(currentStream),
-              s"incorrect query reference in exception")
             verify(currentStream.exception === Some(thrownException),
               s"incorrect exception returned by query.exception()")

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 56abe1201c..f7fc19494d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -103,10 +103,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       TestAwaitTermination(ExpectException[SparkException]),
       TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
       TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
-      AssertOnQuery(
-        q => q.exception.get.startOffset.get.offsets ===
-          q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").offsets,
-        "incorrect start offset on exception")
+      AssertOnQuery(q => {
+        q.exception.get.startOffset ===
+          q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").toString &&
+          q.exception.get.endOffset ===
+            q.availableOffsets.toOffsetSeq(Seq(inputData), "{}").toString
+      }, "incorrect start offset or end offset on exception")
     )
   }
-- 
GitLab
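Finally, for completeness: the Scala assertions above check `startOffset`/`endOffset` on the JVM-side exception; from Python the same failure surfaces through `awaitTermination`. A hedged sketch, assuming a query `sq` built over a failing UDF as in `test_stream_exception`:

```python
# Hedged Python-side companion to the Scala test: a failed query re-raises
# its error from awaitTermination as pyspark's StreamingQueryException.
from pyspark.sql.utils import StreamingQueryException

try:
    sq.awaitTermination()
except StreamingQueryException as e:
    print(e.desc)        # message portion of the Java exception
    print(e.stackTrace)  # JVM stack trace captured on the Python side
```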