-
- Downloads
[SPARK-9556] [SPARK-9619] [SPARK-9624] [STREAMING] Make BlockGenerator more...
[SPARK-9556] [SPARK-9619] [SPARK-9624] [STREAMING] Make BlockGenerator more robust and make all BlockGenerators subscribe to rate limit updates In some receivers, instead of using the default `BlockGenerator` in `ReceiverSupervisorImpl`, custom generator with their custom listeners are used for reliability (see [`ReliableKafkaReceiver`](https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala#L99) and [updated `KinesisReceiver`](https://github.com/apache/spark/pull/7825/files)). These custom generators do not receive rate updates. This PR modifies the code to allow custom `BlockGenerator`s to be created through the `ReceiverSupervisorImpl` so that they can be kept track and rate updates can be applied. In the process, I did some simplification, and de-flaki-fication of some rate controller related tests. In particular. - Renamed `Receiver.executor` to `Receiver.supervisor` (to match `ReceiverSupervisor`) - Made `RateControllerSuite` faster (by increasing batch interval) and less flaky - Changed a few internal API to return the current rate of block generators as Long instead of Option\[Long\] (was inconsistent at places). - Updated existing `ReceiverTrackerSuite` to test that custom block generators get rate updates as well. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #7913 from tdas/SPARK-9556 and squashes the following commits: 41d4461 [Tathagata Das] fix scala style eb9fd59 [Tathagata Das] Updated kinesis receiver d24994d [Tathagata Das] Updated BlockGeneratorSuite to use manual clock in BlockGenerator d70608b [Tathagata Das] Updated BlockGenerator with states and proper synchronization f6bd47e [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-9556 31da173 [Tathagata Das] Fix bug 12116df [Tathagata Das] Add BlockGeneratorSuite 74bd069 [Tathagata Das] Fix style 989bb5c [Tathagata Das] Made BlockGenerator fail is used after stop, and added better unit tests for it 3ff618c [Tathagata Das] Fix test b40eff8 [Tathagata Das] slight refactoring f0df0f1 [Tathagata Das] Scala style fixes 51759cb [Tathagata Das] Refactored rate controller tests and added the ability to update rate of any custom block generator
Showing
- core/src/main/scala/org/apache/spark/util/ManualClock.scala 1 addition, 1 deletioncore/src/main/scala/org/apache/spark/util/ManualClock.scala
- external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala 1 addition, 1 deletion.../apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
- extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala 1 addition, 1 deletion.../org/apache/spark/streaming/kinesis/KinesisReceiver.scala
- streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala 4 additions, 4 deletions...a/org/apache/spark/streaming/receiver/ActorReceiver.scala
- streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala 97 additions, 34 deletions.../org/apache/spark/streaming/receiver/BlockGenerator.scala
- streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala 1 addition, 2 deletions...ala/org/apache/spark/streaming/receiver/RateLimiter.scala
- streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala 27 additions, 25 deletions.../scala/org/apache/spark/streaming/receiver/Receiver.scala
- streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala 22 additions, 5 deletions.../apache/spark/streaming/receiver/ReceiverSupervisor.scala
- streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala 25 additions, 8 deletions...che/spark/streaming/receiver/ReceiverSupervisorImpl.scala
- streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala 7 additions, 9 deletions...st/scala/org/apache/spark/streaming/CheckpointSuite.scala
- streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala 5 additions, 26 deletions...test/scala/org/apache/spark/streaming/ReceiverSuite.scala
- streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala 253 additions, 0 deletions...apache/spark/streaming/receiver/BlockGeneratorSuite.scala
- streaming/src/test/scala/org/apache/spark/streaming/scheduler/RateControllerSuite.scala 26 additions, 38 deletions...pache/spark/streaming/scheduler/RateControllerSuite.scala
- streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala 64 additions, 65 deletions...ache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
Loading
Please register or sign in to comment