From bc6c56e940fe93591a1e5ba45751f1b243b57e28 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu <shixiong@databricks.com>
Date: Tue, 10 Jan 2017 17:58:11 -0800
Subject: [PATCH] [SPARK-19140][SS] Allow update mode for non-aggregation
 streaming queries

## What changes were proposed in this pull request?

This PR allows update mode for non-aggregation streaming queries. It will be the same as the append mode if a query has no aggregations. (A short usage sketch illustrating the new behavior follows the patch.)

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16520 from zsxwing/update-without-agg.
---
 .../structured-streaming-programming-guide.md |  4 +--
 python/pyspark/sql/streaming.py               | 27 +++++++++-----
 .../spark/sql/streaming/OutputMode.java       |  3 +-
 .../UnsupportedOperationChecker.scala         |  2 +-
 .../streaming/InternalOutputModes.scala       |  4 +--
 .../analysis/UnsupportedOperationsSuite.scala | 31 ++++++++--------
 .../sql/streaming/DataStreamWriter.scala      | 18 ++++------
 .../execution/streaming/MemorySinkSuite.scala | 35 ++++++++++++-------
 8 files changed, 72 insertions(+), 52 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 52dbbc830a..b816072cb8 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -374,7 +374,7 @@ The "Output" is defined as what gets written out to the external storage. The ou
 
   - *Append Mode* - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.
 
-  - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger.
+  - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn't contain aggregations, it will be equivalent to Append mode.
 
 Note that each mode is applicable on certain types of queries. This is discussed in detail [later](#output-modes).
 
@@ -977,7 +977,7 @@ Here is the compatibility matrix.
   </tr>
   <tr>
     <td colspan="2" style="vertical-align: middle;">Queries without aggregation</td>
-    <td style="vertical-align: middle;">Append</td>
+    <td style="vertical-align: middle;">Append, Update</td>
     <td style="vertical-align: middle;">
         Complete mode not supported as it is infeasible to keep all data in the Result Table.
     </td>
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 5014299ad2..a10b185cd4 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -665,6 +665,9 @@ class DataStreamWriter(object):
            the sink
         * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink
            every time these is some updates
+        * `update`:only the rows that were updated in the streaming DataFrame/Dataset will be
+           written to the sink every time there are some updates. If the query doesn't contain
+           aggregations, it will be equivalent to `append` mode.
 
         .. note:: Experimental.
 
@@ -768,7 +771,8 @@ class DataStreamWriter(object):
 
     @ignore_unicode_prefix
     @since(2.0)
-    def start(self, path=None, format=None, partitionBy=None, queryName=None, **options):
+    def start(self, path=None, format=None, outputMode=None, partitionBy=None, queryName=None,
+              **options):
         """Streams the contents of the :class:`DataFrame` to a data source.
 
         The data source is specified by the ``format`` and a set of ``options``.
@@ -779,15 +783,20 @@ class DataStreamWriter(object):
 
         :param path: the path in a Hadoop supported file system
         :param format: the format used to save
-
-            * ``append``: Append contents of this :class:`DataFrame` to existing data.
-            * ``overwrite``: Overwrite existing data.
-            * ``ignore``: Silently ignore this operation if data already exists.
-            * ``error`` (default case): Throw an exception if data already exists.
+        :param outputMode: specifies how data of a streaming DataFrame/Dataset is written to a
+                           streaming sink.
+
+            * `append`:Only the new rows in the streaming DataFrame/Dataset will be written to the
+              sink
+            * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink
+              every time these is some updates
+            * `update`:only the rows that were updated in the streaming DataFrame/Dataset will be
+              written to the sink every time there are some updates. If the query doesn't contain
+              aggregations, it will be equivalent to `append` mode.
         :param partitionBy: names of partitioning columns
         :param queryName: unique name for the query
         :param options: All other string options. You may want to provide a `checkpointLocation`
-            for most streams, however it is not required for a `memory` stream.
+                        for most streams, however it is not required for a `memory` stream.
 
         >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
         >>> sq.isActive
@@ -798,7 +807,7 @@ class DataStreamWriter(object):
         >>> sq.isActive
         False
         >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start(
-        ...     queryName='that_query', format='memory')
+        ...     queryName='that_query', outputMode="append", format='memory')
         >>> sq.name
         u'that_query'
         >>> sq.isActive
@@ -806,6 +815,8 @@ class DataStreamWriter(object):
         >>> sq.stop()
         """
         self.options(**options)
+        if outputMode is not None:
+            self.outputMode(outputMode)
         if partitionBy is not None:
             self.partitionBy(partitionBy)
         if format is not None:
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
index cf0579fd36..3f7cdb293e 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
@@ -57,7 +57,8 @@ public class OutputMode {
 
   /**
    * OutputMode in which only the rows that were updated in the streaming DataFrame/Dataset will
-   * be written to the sink every time there are some updates.
+   * be written to the sink every time there are some updates. If the query doesn't contain
+   * aggregations, it will be equivalent to `Append` mode.
    *
    * @since 2.1.1
    */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 053c8eb617..c2666b2ab9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -73,7 +73,7 @@ object UnsupportedOperationChecker {
               s"streaming DataFrames/DataSets")(plan)
         }
 
-      case InternalOutputModes.Complete | InternalOutputModes.Update if aggregates.isEmpty =>
+      case InternalOutputModes.Complete if aggregates.isEmpty =>
         throwError(
           s"$outputMode output mode not supported when there are no streaming aggregations on " +
             s"streaming DataFrames/Datasets")(plan)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
index 915f4a9e25..351bd6fff4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalOutputModes.scala
@@ -40,8 +40,8 @@ private[sql] object InternalOutputModes {
 
   /**
    * OutputMode in which only the rows in the streaming DataFrame/Dataset that were updated will be
-   * written to the sink every time these is some updates. This output mode can only be used in
-   * queries that contain aggregations.
+   * written to the sink every time these is some updates. If the query doesn't contain
+   * aggregations, it will be equivalent to `Append` mode.
   */
   case object Update extends OutputMode
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index d2c0f8cc9f..58e69f9ebe 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -219,9 +219,9 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     "window", Window(Nil, Nil, Nil, _), expectedMsg = "non-time-based windows")
 
   // Output modes with aggregation and non-aggregation plans
-  testOutputMode(Append, shouldSupportAggregation = false)
-  testOutputMode(Update, shouldSupportAggregation = true)
-  testOutputMode(Complete, shouldSupportAggregation = true)
+  testOutputMode(Append, shouldSupportAggregation = false, shouldSupportNonAggregation = true)
+  testOutputMode(Update, shouldSupportAggregation = true, shouldSupportNonAggregation = true)
+  testOutputMode(Complete, shouldSupportAggregation = true, shouldSupportNonAggregation = false)
 
   /*
     =======================================================================================
@@ -323,30 +323,33 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
   /** Test output mode with and without aggregation in the streaming plan */
   def testOutputMode(
       outputMode: OutputMode,
-      shouldSupportAggregation: Boolean): Unit = {
+      shouldSupportAggregation: Boolean,
+      shouldSupportNonAggregation: Boolean): Unit = {
 
     // aggregation
     if (shouldSupportAggregation) {
-      assertNotSupportedInStreamingPlan(
-        s"$outputMode output mode - no aggregation",
-        streamRelation.where($"a" > 1),
-        outputMode = outputMode,
Seq("aggregation", s"$outputMode output mode")) - assertSupportedInStreamingPlan( s"$outputMode output mode - aggregation", streamRelation.groupBy("a")("count(*)"), outputMode = outputMode) - } else { + assertNotSupportedInStreamingPlan( + s"$outputMode output mode - aggregation", + streamRelation.groupBy("a")("count(*)"), + outputMode = outputMode, + Seq("aggregation", s"$outputMode output mode")) + } + + // non aggregation + if (shouldSupportNonAggregation) { assertSupportedInStreamingPlan( s"$outputMode output mode - no aggregation", streamRelation.where($"a" > 1), outputMode = outputMode) - + } else { assertNotSupportedInStreamingPlan( - s"$outputMode output mode - aggregation", - streamRelation.groupBy("a")("count(*)"), + s"$outputMode output mode - no aggregation", + streamRelation.where($"a" > 1), outputMode = outputMode, Seq("aggregation", s"$outputMode output mode")) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index abb00ce02e..7e7a1ba223 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -44,6 +44,10 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * written to the sink * - `OutputMode.Complete()`: all the rows in the streaming DataFrame/Dataset will be written * to the sink every time these is some updates + * - `OutputMode.Update()`: only the rows that were updated in the streaming DataFrame/Dataset + * will be written to the sink every time there are some updates. If + * the query doesn't contain aggregations, it will be equivalent to + * `OutputMode.Append()` mode. * * @since 2.0.0 */ @@ -58,7 +62,9 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { * the sink * - `complete`: all the rows in the streaming DataFrame/Dataset will be written to the sink * every time these is some updates - * + * - `update`: only the rows that were updated in the streaming DataFrame/Dataset will + * be written to the sink every time there are some updates. If the query doesn't + * contain aggregations, it will be equivalent to `append` mode. * @since 2.0.0 */ def outputMode(outputMode: String): DataStreamWriter[T] = { @@ -220,16 +226,6 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { if (extraOptions.get("queryName").isEmpty) { throw new AnalysisException("queryName must be specified for memory sink") } - val supportedModes = "Output modes supported by the memory sink are 'append' and 'complete'." - outputMode match { - case Append | Complete => // allowed - case Update => - throw new AnalysisException( - s"Update output mode is not supported for memory sink. $supportedModes") - case _ => - throw new AnalysisException( - s"$outputMode is not supported for memory sink. 
$supportedModes") - } val sink = new MemorySink(df.schema, outputMode) val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink)) val chkpointLoc = extraOptions.get("checkpointLocation") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala index ca724fc5cc..8f23f98f76 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkSuite.scala @@ -137,7 +137,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { } - test("registering as a table in Append output mode - supported") { + test("registering as a table in Append output mode") { val input = MemoryStream[Int] val query = input.toDF().writeStream .format("memory") @@ -160,7 +160,7 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { query.stop() } - test("registering as a table in Complete output mode - supported") { + test("registering as a table in Complete output mode") { val input = MemoryStream[Int] val query = input.toDF() .groupBy("value") @@ -186,18 +186,27 @@ class MemorySinkSuite extends StreamTest with BeforeAndAfter { query.stop() } - test("registering as a table in Update output mode - not supported") { + test("registering as a table in Update output mode") { val input = MemoryStream[Int] - val df = input.toDF() - .groupBy("value") - .count() - intercept[AnalysisException] { - df.writeStream - .format("memory") - .outputMode("update") - .queryName("memStream") - .start() - } + val query = input.toDF().writeStream + .format("memory") + .outputMode("update") + .queryName("memStream") + .start() + input.addData(1, 2, 3) + query.processAllAvailable() + + checkDataset( + spark.table("memStream").as[Int], + 1, 2, 3) + + input.addData(4, 5, 6) + query.processAllAvailable() + checkDataset( + spark.table("memStream").as[Int], + 1, 2, 3, 4, 5, 6) + + query.stop() } test("MemoryPlan statistics") { -- GitLab