From b424dc947be8ea7230bfdf7f66976fbf63c85f85 Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Tue, 15 Nov 2016 15:12:30 -0800
Subject: [PATCH] [SPARK-18440][STRUCTURED STREAMING] Pass correct query
 execution to FileFormatWriter

## What changes were proposed in this pull request?

SPARK-18012 refactored the file write path in FileStreamSink to use FileFormatWriter, which always uses the default non-streaming QueryExecution to perform the writes. This is wrong for FileStreamSink, because the streaming QueryExecution (i.e. IncrementalExecution) should be used to correctly incrementalize aggregations. With the addition of watermarks in SPARK-18124, the file stream sink should logically support aggregation + watermark + append mode. But it actually fails with

```
16:23:07.389 ERROR org.apache.spark.sql.execution.streaming.StreamExecution: Query query-0 terminated with error
java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark timestamp#7: timestamp, interval 10 seconds
+- LocalRelation [timestamp#7]
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
```

This PR fixes it by passing the correct query execution.

## How was this patch tested?

New unit test

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15885 from tdas/SPARK-18440.
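For context, a minimal sketch of the user-facing scenario this fix enables: an event-time windowed aggregation with a watermark written to a parquet file sink in append mode. This is not part of the patch; the socket source, host/port, and paths are illustrative placeholders, and any streaming source would do.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object WatermarkToFileSinkExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("watermark-to-file-sink").getOrCreate()
    import spark.implicits._

    // Any streaming source works; the socket source is used here only for illustration.
    val events = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()
      // Interpret each input line as seconds since the epoch, like the new test does.
      .selectExpr("CAST(CAST(value AS LONG) AS TIMESTAMP) AS timestamp")

    val counts = events
      .withWatermark("timestamp", "10 seconds")   // bounds state so append mode can emit finalized windows
      .groupBy(window($"timestamp", "5 seconds"))
      .count()
      .select("window.start", "window.end", "count")

    val query = counts.writeStream
      .format("parquet")                                        // goes through FileStreamSink / FileFormatWriter
      .option("path", "/tmp/stream-output")                     // placeholder output path
      .option("checkpointLocation", "/tmp/stream-checkpoint")   // placeholder checkpoint path
      .outputMode("append")                                     // the file sink only supports append mode
      .start()

    query.awaitTermination()
  }
}
```

Before this change, starting such a query failed with the "No plan for EventTimeWatermark" assertion shown above, because FileFormatWriter re-derived a non-streaming QueryExecution from the logical plan instead of using the streaming IncrementalExecution.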
(cherry picked from commit 1ae4652b7e1f77a984b8459c778cb06c814192c5)
Signed-off-by: Michael Armbrust <michael@databricks.com>
---
 .../datasources/FileFormatWriter.scala        |  9 +--
 .../InsertIntoHadoopFsRelationCommand.scala   |  2 +-
 .../execution/streaming/FileStreamSink.scala  |  2 +-
 .../sql/streaming/FileStreamSinkSuite.scala   | 78 +++++++++++++++++--
 4 files changed, 79 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index edcce103d0..a9f79da635 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{SQLExecution, UnsafeKVExternalSorter}
+import org.apache.spark.sql.execution.{QueryExecution, SQLExecution, UnsafeKVExternalSorter}
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
@@ -85,7 +85,7 @@ object FileFormatWriter extends Logging {
    */
   def write(
       sparkSession: SparkSession,
-      plan: LogicalPlan,
+      queryExecution: QueryExecution,
       fileFormat: FileFormat,
       committer: FileCommitProtocol,
       outputSpec: OutputSpec,
@@ -101,8 +101,7 @@ object FileFormatWriter extends Logging {
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
     val partitionSet = AttributeSet(partitionColumns)
-    val dataColumns = plan.output.filterNot(partitionSet.contains)
-    val queryExecution = Dataset.ofRows(sparkSession, plan).queryExecution
+    val dataColumns = queryExecution.logical.output.filterNot(partitionSet.contains)
 
     // Note: prepareWrite has side effect. It sets "job".
     val outputWriterFactory =
@@ -112,7 +111,7 @@ object FileFormatWriter extends Logging {
       uuid = UUID.randomUUID().toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
-      allColumns = plan.output,
+      allColumns = queryExecution.logical.output,
       partitionColumns = partitionColumns,
       nonPartitionColumns = dataColumns,
       bucketSpec = bucketSpec,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 28975e1546..a9bde903b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -100,7 +100,7 @@ case class InsertIntoHadoopFsRelationCommand(
 
         FileFormatWriter.write(
           sparkSession = sparkSession,
-          plan = query,
+          queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
           fileFormat = fileFormat,
           committer = committer,
           outputSpec = FileFormatWriter.OutputSpec(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index f1c5f9ab50..0dbe2a71ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -77,7 +77,7 @@ class FileStreamSink(
 
       FileFormatWriter.write(
         sparkSession = sparkSession,
-        plan = data.logicalPlan,
+        queryExecution = data.queryExecution,
         fileFormat = fileFormat,
         committer = committer,
         outputSpec = FileFormatWriter.OutputSpec(path, Map.empty),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index fa97d9292e..09613ef9e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -21,13 +21,14 @@ import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.streaming.{MemoryStream, MetadataLogFileIndex}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 
 class FileStreamSinkSuite extends StreamTest {
   import testImplicits._
 
-  test("FileStreamSink - unpartitioned writing and batch reading") {
+  test("unpartitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val df = inputData.toDF()
 
@@ -59,7 +60,7 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 
-  test("FileStreamSink - partitioned writing and batch reading") {
+  test("partitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val ds = inputData.toDS()
 
@@ -142,16 +143,83 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 
-  test("FileStreamSink - parquet") {
+  // This tests whether FileStreamSink works with aggregations. Specifically, it tests
+  // whether the correct streaming QueryExecution (i.e. IncrementalExecution) is used
+  // to execute the trigger for writing data to the file sink. See SPARK-18440 for more details.
+ test("writing with aggregation") { + + // Since FileStreamSink currently only supports append mode, we will test FileStreamSink + // with aggregations using event time windows and watermark, which allows + // aggregation + append mode. + val inputData = MemoryStream[Long] + val inputDF = inputData.toDF.toDF("time") + val outputDf = inputDF + .selectExpr("CAST(time AS timestamp) AS timestamp") + .withWatermark("timestamp", "10 seconds") + .groupBy(window($"timestamp", "5 seconds")) + .count() + .select("window.start", "window.end", "count") + + val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath + val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath + + var query: StreamingQuery = null + + try { + query = + outputDf.writeStream + .option("checkpointLocation", checkpointDir) + .format("parquet") + .start(outputDir) + + + def addTimestamp(timestampInSecs: Int*): Unit = { + inputData.addData(timestampInSecs.map(_ * 1L): _*) + failAfter(streamingTimeout) { + query.processAllAvailable() + } + } + + def check(expectedResult: ((Long, Long), Long)*): Unit = { + val outputDf = spark.read.parquet(outputDir) + .selectExpr( + "CAST(start as BIGINT) AS start", + "CAST(end as BIGINT) AS end", + "count") + checkDataset( + outputDf.as[(Long, Long, Long)], + expectedResult.map(x => (x._1._1, x._1._2, x._2)): _*) + } + + addTimestamp(100) // watermark = None before this, watermark = 100 - 10 = 90 after this + check() // nothing emitted yet + + addTimestamp(104, 123) // watermark = 90 before this, watermark = 123 - 10 = 113 after this + check() // nothing emitted yet + + addTimestamp(140) // wm = 113 before this, emit results on 100-105, wm = 130 after this + check((100L, 105L) -> 2L) + + addTimestamp(150) // wm = 130s before this, emit results on 120-125, wm = 150 after this + check((100L, 105L) -> 2L, (120L, 125L) -> 1L) + + } finally { + if (query != null) { + query.stop() + } + } + } + + test("parquet") { testFormat(None) // should not throw error as default format parquet when not specified testFormat(Some("parquet")) } - test("FileStreamSink - text") { + test("text") { testFormat(Some("text")) } - test("FileStreamSink - json") { + test("json") { testFormat(Some("json")) } -- GitLab