From d9c25dec87e6da7d66a47ff94e7eefa008081b9d Mon Sep 17 00:00:00 2001
From: cody koeninger <cody@koeninger.org>
Date: Mon, 24 Aug 2015 23:26:14 -0700
Subject: [PATCH] =?UTF-8?q?[SPARK-9786]=20[STREAMING]=20[KAFKA]=20fix=20ba?=
 =?UTF-8?q?ckpressure=20so=20it=20works=20with=20defa=E2=80=A6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

…ult maxRatePerPartition setting of 0

Author: cody koeninger <cody@koeninger.org>

Closes #8413 from koeninger/backpressure-testing-master.
---
 .../spark/streaming/kafka/DirectKafkaInputDStream.scala  | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 8a17707777..1000094e93 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -95,8 +95,13 @@ class DirectKafkaInputDStream[
 
     val effectiveRateLimitPerPartition = estimatedRateLimit
       .filter(_ > 0)
-      .map(limit => Math.min(maxRateLimitPerPartition, (limit / numPartitions)))
-      .getOrElse(maxRateLimitPerPartition)
+      .map { limit =>
+        if (maxRateLimitPerPartition > 0) {
+          Math.min(maxRateLimitPerPartition, (limit / numPartitions))
+        } else {
+          limit / numPartitions
+        }
+      }.getOrElse(maxRateLimitPerPartition)
 
     if (effectiveRateLimitPerPartition > 0) {
       val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
-- 
GitLab