Skip to content
Snippets Groups Projects
Commit d9c25dec authored by cody koeninger's avatar cody koeninger Committed by Tathagata Das
Browse files

[SPARK-9786] [STREAMING] [KAFKA] fix backpressure so it works with defa…

…ult maxRatePerPartition setting of 0

Author: cody koeninger <cody@koeninger.org>

Closes #8413 from koeninger/backpressure-testing-master.
parent 5175ca0c
No related branches found
No related tags found
No related merge requests found
......@@ -95,8 +95,13 @@ class DirectKafkaInputDStream[
val effectiveRateLimitPerPartition = estimatedRateLimit
.filter(_ > 0)
.map(limit => Math.min(maxRateLimitPerPartition, (limit / numPartitions)))
.getOrElse(maxRateLimitPerPartition)
.map { limit =>
if (maxRateLimitPerPartition > 0) {
Math.min(maxRateLimitPerPartition, (limit / numPartitions))
} else {
limit / numPartitions
}
}.getOrElse(maxRateLimitPerPartition)
if (effectiveRateLimitPerPartition > 0) {
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment