-
- Downloads
[SPARK-21145][SS] Added StateStoreProviderId with queryRunId to reload...
[SPARK-21145][SS] Added StateStoreProviderId with queryRunId to reload StateStoreProviders when query is restarted ## What changes were proposed in this pull request? StateStoreProvider instances are loaded on-demand in a executor when a query is started. When a query is restarted, the loaded provider instance will get reused. Now, there is a non-trivial chance, that the task of the previous query run is still running, while the tasks of the restarted run has started. So for a stateful partition, there may be two concurrent tasks related to the same stateful partition, and there for using the same provider instance. This can lead to inconsistent results and possibly random failures, as state store implementations are not designed to be thread-safe. To fix this, I have introduced a `StateStoreProviderId`, that unique identifies a provider loaded in an executor. It has the query run id in it, thus making sure that restarted queries will force the executor to load a new provider instance, thus avoiding two concurrent tasks (from two different runs) from reusing the same provider instance. Additional minor bug fixes - All state stores related to query run is marked as deactivated in the `StateStoreCoordinator` so that the executors can unload them and clear resources. - Moved the code that determined the checkpoint directory of a state store from implementation-specific code (`HDFSBackedStateStoreProvider`) to non-specific code (StateStoreId), so that implementation do not accidentally get it wrong. - Also added store name to the path, to support multiple stores per sql operator partition. *Note:* This change does not address the scenario where two tasks of the same run (e.g. speculative tasks) are concurrently running in the same executor. The chance of this very small, because ideally speculative tasks should never run in the same executor. ## How was this patch tested? Existing unit tests + new unit test. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #18355 from tdas/SPARK-21145.
Showing
- sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala 1 addition, 1 deletion...a/org/apache/spark/sql/execution/aggregate/AggUtils.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 4 additions, 1 deletion...ala/org/apache/spark/sql/execution/command/commands.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala 2 additions, 5 deletions.../sql/execution/streaming/FlatMapGroupsWithStateExec.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala 14 additions, 13 deletions.../spark/sql/execution/streaming/IncrementalExecution.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala 1 addition, 0 deletions...pache/spark/sql/execution/streaming/StreamExecution.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala 8 additions, 8 deletions...cution/streaming/state/HDFSBackedStateStoreProvider.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala 67 additions, 24 deletions...ache/spark/sql/execution/streaming/state/StateStore.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala 24 additions, 17 deletions...sql/execution/streaming/state/StateStoreCoordinator.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala 16 additions, 5 deletions...e/spark/sql/execution/streaming/state/StateStoreRDD.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala 9 additions, 16 deletions.../apache/spark/sql/execution/streaming/state/package.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala 16 additions, 22 deletions...che/spark/sql/execution/streaming/statefulOperators.scala
- sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala 1 addition, 0 deletions...rg/apache/spark/sql/streaming/StreamingQueryManager.scala
- sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala 54 additions, 7 deletions...xecution/streaming/state/StateStoreCoordinatorSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala 29 additions, 22 deletions...rk/sql/execution/streaming/state/StateStoreRDDSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala 73 additions, 20 deletions...spark/sql/execution/streaming/state/StateStoreSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 1 addition, 1 deletion...st/scala/org/apache/spark/sql/streaming/StreamSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala 9 additions, 4 deletions...est/scala/org/apache/spark/sql/streaming/StreamTest.scala
Loading
Please register or sign in to comment