-
- Downloads
[SPARK-20792][SS] Support same timeout operations in mapGroupsWithState...
[SPARK-20792][SS] Support same timeout operations in mapGroupsWithState function in batch queries as in streaming queries ## What changes were proposed in this pull request? Currently, in the batch queries, timeout is disabled (i.e. GroupStateTimeout.NoTimeout) which means any GroupState.setTimeout*** operation would throw UnsupportedOperationException. This makes it weird when converting a streaming query into a batch query by changing the input DF from streaming to a batch DF. If the timeout was enabled and used, then the batch query will start throwing UnsupportedOperationException. This PR creates the dummy state in batch queries with the provided timeoutConf so that it behaves in the same way. The code has been refactored to make it obvious when the state is being created for a batch query or a streaming query. ## How was this patch tested? Additional tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #18024 from tdas/SPARK-20792.
Showing
- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 3 additions, 2 deletions...cala/org/apache/spark/sql/execution/SparkStrategies.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala 5 additions, 1 deletion...c/main/scala/org/apache/spark/sql/execution/objects.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala 1 addition, 1 deletion.../sql/execution/streaming/FlatMapGroupsWithStateExec.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/GroupStateImpl.scala 22 additions, 20 deletions...apache/spark/sql/execution/streaming/GroupStateImpl.scala
- sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala 85 additions, 28 deletions...che/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
Loading
Please register or sign in to comment