-
- Downloads
[SPARK-20883][SPARK-20376][SS] Refactored StateStore APIs and added conf to choose implementation
## What changes were proposed in this pull request? A bunch of changes to the StateStore APIs and implementation. Current state store API has a bunch of problems that causes too many transient objects causing memory pressure. - `StateStore.get(): Option` forces creation of Some/None objects for every get. Changed this to return the row or null. - `StateStore.iterator(): (UnsafeRow, UnsafeRow)` forces creation of new tuple for each record returned. Changed this to return a UnsafeRowTuple which can be reused across records. - `StateStore.updates()` requires the implementation to keep track of updates, while this is used minimally (only by Append mode in streaming aggregations). Removed updates() and updated StateStoreSaveExec accordingly. - `StateStore.filter(condition)` and `StateStore.remove(condition)` has been merge into a single API `getRange(start, end)` which allows a state store to do optimized range queries (i.e. avoid full scans). Stateful operators have been updated accordingly. - Removed a lot of unnecessary row copies Each operator copied rows before calling StateStore.put() even if the implementation does not require it to be copied. It is left up to the implementation on whether to copy the row or not. Additionally, - Added a name to the StateStoreId so that each operator+partition can use multiple state stores (different names) - Added a configuration that allows the user to specify which implementation to use. - Added new metrics to understand the time taken to update keys, remove keys and commit all changes to the state store. These metrics will be visible on the plan diagram in the SQL tab of the UI. - Refactored unit tests such that they can be reused to test any implementation of StateStore. ## How was this patch tested? Old and new unit tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #18107 from tdas/SPARK-20376.
Showing
- sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 11 additions, 0 deletions...rc/main/scala/org/apache/spark/sql/internal/SQLConf.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala 23 additions, 16 deletions.../sql/execution/streaming/FlatMapGroupsWithStateExec.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala 73 additions, 145 deletions...cution/streaming/state/HDFSBackedStateStoreProvider.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala 119 additions, 44 deletions...ache/spark/sql/execution/streaming/state/StateStore.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala 23 additions, 5 deletions.../spark/sql/execution/streaming/state/StateStoreConf.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala 7 additions, 4 deletions...e/spark/sql/execution/streaming/state/StateStoreRDD.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala 10 additions, 1 deletion.../apache/spark/sql/execution/streaming/state/package.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala 105 additions, 37 deletions...che/spark/sql/execution/streaming/statefulOperators.scala
- sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala 23 additions, 18 deletions...rk/sql/execution/streaming/state/StateStoreRDDSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala 247 additions, 287 deletions...spark/sql/execution/streaming/state/StateStoreSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala 9 additions, 31 deletions...che/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 45 additions, 0 deletions...st/scala/org/apache/spark/sql/streaming/StreamSuite.scala
Loading
Please register or sign in to comment