[SPARK-11932][STREAMING] Partition previous TrackStateRDD if partitioner not present
TrackStateRDDs generated by trackStateByKey expect the previous batch's TrackStateRDDs to have a partitioner. However, when recovering from DStream checkpoints, the RDDs recovered from RDD checkpoints do not have a partitioner attached to them, because RDD checkpoints do not preserve the partitioner (SPARK-12004). While #9983 solves SPARK-12004 by preserving the partitioner through RDD checkpoints, saving or recovering the partitioner may still fail. To be resilient, this PR repartitions the previous state RDD if the partitioner is not detected.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #9988 from tdas/SPARK-11932.
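The fallback described in the commit message can be illustrated with a minimal sketch on a plain key-value RDD. This is not the code from this PR: `StatePartitioning` and `ensurePartitioned` are hypothetical names introduced here, and the sketch assumes the state can be treated as an `RDD[(K, V)]`. It relies only on the public `RDD.partitioner` and `partitionBy` calls.

```scala
import scala.reflect.ClassTag

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Hypothetical helper, not part of Spark or of this PR.
object StatePartitioning {

  // Returns the previous state RDD unchanged when it already carries the
  // expected partitioner; otherwise shuffles it with that partitioner
  // (for example, after a checkpoint recovery that lost the partitioner).
  def ensurePartitioned[K: ClassTag, V: ClassTag](
      prevState: RDD[(K, V)],
      expected: Partitioner): RDD[(K, V)] = {
    if (prevState.partitioner == Some(expected)) {
      prevState
    } else {
      prevState.partitionBy(expected)
    }
  }
}
```

The explicit check makes the intent visible. Repartitioning costs a shuffle, so in this sketch it happens only when the partitioner is missing or different.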
Changed files:
- streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala: 1 addition, 1 deletion
- streaming/src/main/scala/org/apache/spark/streaming/dstream/TrackStateDStream.scala: 27 additions, 12 deletions
- streaming/src/main/scala/org/apache/spark/streaming/rdd/TrackStateRDD.scala: 25 additions, 4 deletions
- streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala: 138 additions, 51 deletions
- streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala: 6 additions, 0 deletions
- streaming/src/test/scala/org/apache/spark/streaming/TrackStateByKeySuite.scala: 61 additions, 16 deletions