[SPARK-18440][STRUCTURED STREAMING] Pass correct query execution to FileFormatWriter
## What changes were proposed in this pull request?

SPARK-18012 refactored the file write path in FileStreamSink to use FileFormatWriter, which always uses the default non-streaming QueryExecution to perform the writes. This is wrong for FileStreamSink, because the streaming QueryExecution (i.e. IncrementalExecution) should be used to correctly incrementalize aggregations. With the addition of watermarks in SPARK-18124, the file stream sink should logically support aggregation + watermark + append mode, but it actually fails with:

```
16:23:07.389 ERROR org.apache.spark.sql.execution.streaming.StreamExecution: Query query-0 terminated with error
java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark timestamp#7: timestamp, interval 10 seconds
+- LocalRelation [timestamp#7]
	at scala.Predef$.assert(Predef.scala:170)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
```

This PR fixes it by passing the correct query execution.

## How was this patch tested?

New unit test.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15885 from tdas/SPARK-18440.

(cherry picked from commit 1ae4652b)
Signed-off-by: Michael Armbrust <michael@databricks.com>
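For context, here is a minimal sketch of the query shape that previously hit the assertion above: a streaming aggregation with a watermark written to a file sink in append mode. The paths, schema, and window size are hypothetical placeholders, not taken from this PR.

```scala
// Sketch only: streaming aggregation + watermark + append mode to a file sink.
// Before this fix, FileFormatWriter planned this with the non-streaming
// QueryExecution, so EventTimeWatermark had no physical plan and the query failed.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.types.{StructType, TimestampType}

val spark = SparkSession.builder().appName("file-sink-watermark").getOrCreate()
import spark.implicits._

// Hypothetical input schema and directory.
val schema = new StructType().add("timestamp", TimestampType)
val events = spark.readStream
  .schema(schema)
  .json("/tmp/input")

// Windowed count with a 10-second watermark on the event-time column.
val counts = events
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window($"timestamp", "5 seconds"))
  .count()

// Append-mode write to a parquet file sink (hypothetical output/checkpoint paths).
val query = counts.writeStream
  .format("parquet")
  .option("path", "/tmp/output")
  .option("checkpointLocation", "/tmp/checkpoint")
  .outputMode("append")
  .start()
```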
Showing 4 changed files:
- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala (4 additions, 5 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala (1 addition, 1 deletion)
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala (1 addition, 1 deletion)
- sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala (73 additions, 5 deletions)