-
- Downloads
[SPARK-15022][SPARK-15023][SQL][STREAMING] Add support for testing against the...
[SPARK-15022][SPARK-15023][SQL][STREAMING] Add support for testing against the `ProcessingTime(intervalMS > 0)` trigger and `ManualClock` ## What changes were proposed in this pull request? Currently in `StreamTest`, we have a `StartStream` which will start a streaming query against trigger `ProcessTime(intervalMS = 0)` and `SystemClock`. We also need to test cases against `ProcessTime(intervalMS > 0)`, which often requires `ManualClock`. This patch: - fixes an issue of `ProcessingTimeExecutor`, where for a batch it should run `batchRunner` only once but might run multiple times under certain conditions; - adds support for testing against the `ProcessingTime(intervalMS > 0)` trigger and `AdvanceManualClock`, by specifying them as fields for `StartStream`, and by adding an `AdvanceClock` action; - adds a test, which takes advantage of the new `StartStream` and `AdvanceManualClock`, to test against [PR#[SPARK-14942] Reduce delay between batch construction and execution ](https://github.com/apache/spark/pull/12725). ## How was this patch tested? N/A Author: Liwei Lin <lwlin7@gmail.com> Closes #12797 from lw-lin/add-trigger-test-support.
Showing
- sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala 5 additions, 2 deletions...n/scala/org/apache/spark/sql/ContinuousQueryManager.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala 5 additions, 4 deletions...pache/spark/sql/execution/streaming/StreamExecution.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala 7 additions, 2 deletions...pache/spark/sql/execution/streaming/TriggerExecutor.scala
- sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala 24 additions, 9 deletions...core/src/test/scala/org/apache/spark/sql/StreamTest.scala
- sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala 17 additions, 2 deletions...sql/execution/streaming/ProcessingTimeExecutorSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala 1 addition, 1 deletion...org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala 4 additions, 4 deletions...rg/apache/spark/sql/streaming/FileStreamSourceSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 20 additions, 4 deletions...st/scala/org/apache/spark/sql/streaming/StreamSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala 3 additions, 3 deletions...pache/spark/sql/streaming/StreamingAggregationSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala 3 additions, 3 deletions.../apache/spark/sql/util/ContinuousQueryListenerSuite.scala
Loading
Please register or sign in to comment