From 55b8cfe6e6a6759d65bf219ff570fd6154197ec4 Mon Sep 17 00:00:00 2001
From: Mark Grover <mark@apache.org>
Date: Thu, 8 Jun 2017 09:55:43 -0700
Subject: [PATCH] [SPARK-19185][DSTREAM] Make Kafka consumer cache configurable

## What changes were proposed in this pull request?

Add a new property `spark.streaming.kafka.consumer.cache.enabled` that allows users to enable or disable the cache for Kafka consumers. This property can be especially handy in cases where issues like SPARK-19185 get hit, for which there isn't a solution committed yet. By default, the cache is still on, so this change doesn't change any out-of-box behavior.

## How was this patch tested?
Running unit tests

Author: Mark Grover <mark@apache.org>
Author: Mark Grover <grover.markgrover@gmail.com>

Closes #18234 from markgrover/spark-19185.
---
 docs/streaming-kafka-0-10-integration.md                  | 4 +++-
 .../streaming/kafka010/DirectKafkaInputDStream.scala      | 8 +++++---
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md
index 92c296a9e6..386066a857 100644
--- a/docs/streaming-kafka-0-10-integration.md
+++ b/docs/streaming-kafka-0-10-integration.md
@@ -91,7 +91,9 @@ The new Kafka consumer API will pre-fetch messages into buffers.  Therefore it i
 
 In most cases, you should use `LocationStrategies.PreferConsistent` as shown above.  This will distribute partitions evenly across available executors.  If your executors are on the same hosts as your Kafka brokers, use `PreferBrokers`, which will prefer to schedule partitions on the Kafka leader for that partition.  Finally, if you have a significant skew in load among partitions, use `PreferFixed`. This allows you to specify an explicit mapping of partitions to hosts (any unspecified partitions will use a consistent location).
 
-The cache for consumers has a default maximum size of 64.  If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`
+The cache for consumers has a default maximum size of 64.  If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`.
+
+If you would like to disable the caching for Kafka consumers, you can set `spark.streaming.kafka.consumer.cache.enabled` to `false`. Disabling the cache may be needed to workaround the problem described in SPARK-19185. This property may be removed in later versions of Spark, once SPARK-19185 is resolved.
 
 The cache is keyed by topicpartition and group.id, so use a **separate** `group.id` for each call to `createDirectStream`.
 
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index 6d6983c4bd..9a4a1cf32a 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -213,8 +213,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
       val fo = currentOffsets(tp)
       OffsetRange(tp.topic, tp.partition, fo, uo)
     }
-    val rdd = new KafkaRDD[K, V](
-      context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true)
+    val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
+      true)
+    val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
+      getPreferredHosts, useConsumerCache)
 
     // Report the record number and metadata of this batch interval to InputInfoTracker.
     val description = offsetRanges.filter { offsetRange =>
@@ -316,7 +318,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
            b.map(OffsetRange(_)),
            getPreferredHosts,
            // during restore, it's possible same partition will be consumed from multiple
-           // threads, so dont use cache
+           // threads, so do not use cache.
            false
          )
       }
-- 
GitLab