[SPARK-18790][SS] Keep a general offset history of stream batches
## What changes were proposed in this pull request?

Instead of keeping only the minimum number of offsets, we should keep enough information to roll back n batches and re-execute the stream starting from a given point. In particular, we should create a config in SQLConf, spark.sql.streaming.retainedBatches, that defaults to 100, and ensure that we keep enough log files in the following places to roll back the specified number of batches:

- the offsets that are present in each batch
- versions of the state store
- the file lists stored for the FileStreamSource
- the metadata log stored by the FileStreamSink

marmbrus zsxwing

## How was this patch tested?

The following tests were added.

### StreamExecution offset metadata

Test added to StreamingQuerySuite that ensures offset metadata is garbage collected according to minBatchesToRetain.

### CompactibleFileStreamLog

Tests added in CompactibleFileStreamLogSuite to ensure that logs are purged starting before the first compaction file that precedes the current batch id - minBatchesToRetain.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie <tcondie@gmail.com>

Closes #16219 from tcondie/offset_hist.
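The purge rule described for CompactibleFileStreamLog can be illustrated with a small, self-contained sketch. This is not the code from the patch: the object `PurgeSketch` and the method `purgeBefore` are hypothetical names, and the sketch assumes that batch ids start at 0 and that a batch is a compaction batch when `(batchId + 1) % compactInterval == 0`. It only shows the arithmetic of finding the compaction batch that precedes `currentBatchId - minBatchesToRetain`, so that every older log file can be dropped while the retained batches stay replayable.

```scala
// Illustrative sketch only; names and the compaction rule are assumptions.
object PurgeSketch {

  // Assumed rule: with batch ids starting at 0, every compactInterval-th
  // batch (ids compactInterval - 1, 2 * compactInterval - 1, ...) is a
  // compaction batch.
  def isCompactionBatch(batchId: Long, compactInterval: Int): Boolean =
    (batchId + 1) % compactInterval == 0

  // Returns Some(id) when all log files with batch id < id may be purged,
  // or None when too few batches exist to purge anything yet.
  def purgeBefore(
      currentBatchId: Long,
      minBatchesToRetain: Int,
      compactInterval: Int): Option[Long] = {
    // Oldest batch id that must remain available for rollback.
    val minBatchId = currentBatchId + 1 - minBatchesToRetain
    if (minBatchId < compactInterval) {
      None
    } else {
      // Latest compaction batch that lies strictly before minBatchId.
      val compactionId = minBatchId - (minBatchId % compactInterval) - 1
      assert(isCompactionBatch(compactionId, compactInterval))
      Some(compactionId)
    }
  }

  def main(args: Array[String]): Unit = {
    // Compaction batches are 2, 5, 8, ... when compactInterval is 3.
    println(purgeBefore(currentBatchId = 10L, minBatchesToRetain = 3, compactInterval = 3))
    println(purgeBefore(currentBatchId = 1L, minBatchesToRetain = 3, compactInterval = 3))
  }
}
```

With a compact interval of 3, a current batch id of 10, and 3 retained batches, the oldest batch to keep is 8, so the sketch returns the compaction batch 5 as the purge boundary. Keeping that compaction file means the retained window can still be rebuilt from it, which is why purging starts before a compaction file and not at the retention boundary itself.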
Showing 9 changed files

- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala (44 additions, 25 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala (7 additions, 3 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala (0 additions, 1 deletion)
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala (3 additions, 1 deletion)
- sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (8 additions, 9 deletions)
- sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLogSuite.scala (14 additions, 2 deletions)
- sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala (44 additions, 4 deletions)
- sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala (4 additions, 1 deletion)
- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala (46 additions, 18 deletions)