diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index 8a177077775c632a4b78572fbfc822ba880440ea..1000094e93cb35fead6e8e304cc64309fdedcb8d 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -95,8 +95,13 @@ class DirectKafkaInputDStream[
 
     val effectiveRateLimitPerPartition = estimatedRateLimit
       .filter(_ > 0)
-      .map(limit => Math.min(maxRateLimitPerPartition, (limit / numPartitions)))
-      .getOrElse(maxRateLimitPerPartition)
+      .map { limit =>
+        if (maxRateLimitPerPartition > 0) {
+          Math.min(maxRateLimitPerPartition, (limit / numPartitions))
+        } else {
+          limit / numPartitions
+        }
+      }.getOrElse(maxRateLimitPerPartition)
 
     if (effectiveRateLimitPerPartition > 0) {
       val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000