[SPARK-19413][SS] MapGroupsWithState for arbitrary stateful operations for branch-2.1
This is a follow-up PR for merging #16758 into the Spark 2.1 branch.

## What changes were proposed in this pull request?

`mapGroupsWithState` is a new API for arbitrary stateful operations in Structured Streaming, similar to `DStream.mapWithState`.

*Requirements*
- Users should be able to specify a function that can do the following:
  - Access the input row corresponding to a key
  - Access the previous state corresponding to a key
  - Optionally, update or remove the state
  - Output any number of new rows (or none at all)

*Proposed API*
```
// ------------ New methods on KeyValueGroupedDataset ------------
class KeyValueGroupedDataset[K, V] {
  // Scala friendly
  def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => U)
  def flatMapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => Iterator[U])
  // Java friendly
  def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
  def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
}

// ------------------- New Java-friendly function classes -------------------
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  R call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
}
public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  Iterator<R> call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
}

// ---------------------- Wrapper class for state data ----------------------
trait KeyedState[S] {
  def exists(): Boolean
  def get(): S                      // throws Exception if state does not exist
  def getOption(): Option[S]
  def update(newState: S): Unit
  def remove(): Unit                // exists() will be false after this
}
```

Key semantics of the state class
- The state can be null.
- If `state.remove()` is called, then `state.exists()` will return false, and `getOption` will return None.
- After `state.update(newState)` is called, `state.exists()` will return true, and `getOption` will return Some(...).
- None of the operations are thread-safe. This is to avoid memory barriers.

*Usage*
```
val stateFunc = (word: String, words: Iterator[String], runningCount: KeyedState[Long]) => {
  val newCount = words.size + runningCount.getOption.getOrElse(0L)
  runningCount.update(newCount)
  (word, newCount)
}

dataset                                                  // type is Dataset[String]
  .groupByKey[String](w => w)                            // generates KeyValueGroupedDataset[String, String]
  .mapGroupsWithState[Long, (String, Long)](stateFunc)   // returns Dataset[(String, Long)]
```

## How was this patch tested?

New unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16850 from tdas/mapWithState-branch-2.1.
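
The state semantics and the running-count usage described in the commit message can be tried without a Spark cluster. The following is a minimal sketch in plain Scala, not the code added by this patch. `InMemoryKeyedState`, `runBatch`, and the enclosing `KeyedStateSketch` object are hypothetical names introduced only for illustration. The sketch assumes that the function is invoked once per key per batch and that state is carried over between batches, which is a simplification of what the streaming engine does.

```scala
object KeyedStateSketch {

  // Hypothetical in-memory stand-in for the KeyedState[S] trait in the commit
  // message. It mirrors the documented exists/get/getOption/update/remove
  // semantics only; it is not Spark's KeyedStateImpl.
  final class InMemoryKeyedState[S](initial: Option[S]) {
    private var value: Option[S] = initial

    def exists(): Boolean = value.isDefined
    def get(): S =
      value.getOrElse(throw new NoSuchElementException("State does not exist"))
    def getOption(): Option[S] = value
    def update(newState: S): Unit = { value = Some(newState) }
    def remove(): Unit = { value = None } // exists() is false after this
  }

  // Same logic as stateFunc in the usage example: add the number of new
  // occurrences of the word to the previously stored count.
  def runningCount(
      word: String,
      words: Iterator[String],
      state: InMemoryKeyedState[Long]): (String, Long) = {
    val newCount = words.size + state.getOption().getOrElse(0L)
    state.update(newCount)
    (word, newCount)
  }

  // Hypothetical driver: groups one batch by key, applies runningCount once
  // per key, and returns the updated state store plus the output rows
  // (sorted by key so the result order is deterministic).
  def runBatch(
      store: Map[String, Long],
      batch: Seq[String]): (Map[String, Long], Seq[(String, Long)]) = {
    val results = batch.groupBy(identity).toSeq.sortBy(_._1).map {
      case (key, values) =>
        val state = new InMemoryKeyedState[Long](store.get(key))
        val out = runningCount(key, values.iterator, state)
        (key, state.getOption(), out)
    }
    val newStore = results.foldLeft(store) {
      case (acc, (key, Some(s), _)) => acc.updated(key, s) // state was set
      case (acc, (key, None, _))    => acc - key           // state was removed
    }
    (newStore, results.map(_._3))
  }

  def main(args: Array[String]): Unit = {
    val (afterFirst, firstOutput) = runBatch(Map.empty, Seq("a", "b", "a"))
    val (_, secondOutput) = runBatch(afterFirst, Seq("a"))
    println(firstOutput)
    println(secondOutput)
  }
}
```

Feeding the batches `Seq("a", "b", "a")` and then `Seq("a")` through `runBatch` shows the count for `"a"` growing across batches while `"b"` keeps its stored value. In the real API the engine owns the state store and the encoders, so only the body of `runningCount` corresponds to user code.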
Showing
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala (10 additions, 1 deletion)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala (49 additions, 0 deletions)
- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala (21 additions, 3 deletions)
- sql/core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsWithStateFunction.java (38 additions, 0 deletions)
- sql/core/src/main/java/org/apache/spark/api/java/function/MapGroupsWithStateFunction.java (38 additions, 0 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala (113 additions, 0 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/KeyedState.scala (142 additions, 0 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala (20 additions, 1 deletion)
- sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala (22 additions, 0 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala (14 additions, 5 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/KeyedStateImpl.scala (80 additions, 0 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala (1 addition, 1 deletion)
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala (19 additions, 0 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala (5 additions, 0 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala (10 additions, 1 deletion)
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala (110 additions, 24 deletions)
- sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java (32 additions, 0 deletions)
- sql/core/src/test/scala/org/apache/spark/sql/streaming/MapGroupsWithStateSuite.scala (335 additions, 0 deletions)