Commit 55b8cfe6 authored by Mark Grover, committed by Marcelo Vanzin

[SPARK-19185][DSTREAM] Make Kafka consumer cache configurable

## What changes were proposed in this pull request?

Add a new property `spark.streaming.kafka.consumer.cache.enabled` that allows users to enable or disable the cache for Kafka consumers. This property is especially handy when users hit issues like SPARK-19185, for which no fix has been committed yet. By default, the cache remains enabled, so this change does not alter any out-of-the-box behavior.
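
For illustration, a minimal sketch of how a user might turn the cache off (the app name and local master are hypothetical; the property name and its `true` default are the ones added here):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// The DStream reads this flag from the SparkConf when building each batch's
// KafkaRDD, so it must be set before the StreamingContext is created.
val conf = new SparkConf()
  .setMaster("local[2]")              // local mode, just for the sketch
  .setAppName("Spark19185Workaround") // hypothetical app name
  .set("spark.streaming.kafka.consumer.cache.enabled", "false") // default: true
val ssc = new StreamingContext(conf, Seconds(5))
```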

## How was this patch tested?
Ran the existing unit tests.

Author: Mark Grover <mark@apache.org>
Author: Mark Grover <grover.markgrover@gmail.com>

Closes #18234 from markgrover/spark-19185.
Parent: b771fed7
@@ -91,7 +91,9 @@ The new Kafka consumer API will pre-fetch messages into buffers. Therefore it i
 In most cases, you should use `LocationStrategies.PreferConsistent` as shown above. This will distribute partitions evenly across available executors. If your executors are on the same hosts as your Kafka brokers, use `PreferBrokers`, which will prefer to schedule partitions on the Kafka leader for that partition. Finally, if you have a significant skew in load among partitions, use `PreferFixed`. This allows you to specify an explicit mapping of partitions to hosts (any unspecified partitions will use a consistent location).

-The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`
+The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`.
+
+If you would like to disable the caching for Kafka consumers, you can set `spark.streaming.kafka.consumer.cache.enabled` to `false`. Disabling the cache may be needed to work around the problem described in SPARK-19185. This property may be removed in later versions of Spark, once SPARK-19185 is resolved.

 The cache is keyed by topicpartition and group.id, so use a **separate** `group.id` for each call to `createDirectStream`.
......
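
To make the documented advice concrete, here is a hedged sketch of a stream that follows it (broker address, topic, and `group.id` values are hypothetical, and `ssc` is assumed to be an existing `StreamingContext`):

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",   // hypothetical broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "stream-a-group"           // separate group.id per createDirectStream call
)

// The consumer cache is keyed by (topicpartition, group.id); a second stream
// reusing "stream-a-group" would collide with this one's cached consumers.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq("topicA"), kafkaParams) // hypothetical topic
)
```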
@@ -213,8 +213,10 @@ private[spark] class DirectKafkaInputDStream[K, V](
       val fo = currentOffsets(tp)
       OffsetRange(tp.topic, tp.partition, fo, uo)
     }
-    val rdd = new KafkaRDD[K, V](
-      context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true)
+    val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
+      true)
+    val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
+      getPreferredHosts, useConsumerCache)

     // Report the record number and metadata of this batch interval to InputInfoTracker.
     val description = offsetRanges.filter { offsetRange =>
@@ -316,7 +318,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
         b.map(OffsetRange(_)),
         getPreferredHosts,
         // during restore, it's possible same partition will be consumed from multiple
-        // threads, so dont use cache
+        // threads, so do not use cache.
         false
       )
     }
......
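
As background for the restore-path comment above: Kafka's `KafkaConsumer` is not safe for multi-threaded access and throws `ConcurrentModificationException` when two threads use it concurrently, which is why the restore path bypasses the cache. A hedged sketch of that unsafe sharing (broker, group, and topic names are hypothetical):

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "broker1:9092") // hypothetical broker
props.put("group.id", "restore-demo")          // hypothetical group.id
props.put("key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer")

val shared = new KafkaConsumer[String, String](props)
shared.subscribe(Collections.singletonList("topicA")) // hypothetical topic

// Polling one consumer instance from two threads is the pattern the restore
// path avoids by passing useConsumerCache = false: one thread will likely
// fail with "KafkaConsumer is not safe for multi-threaded access".
val poller = new Runnable { def run(): Unit = shared.poll(100) }
new Thread(poller).start()
new Thread(poller).start()
```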