Commit 2daca62c authored by cody koeninger, committed by Michael Armbrust

[SPARK-18212][SS][KAFKA] increase executor poll timeout


## What changes were proposed in this pull request?

Increase the executor Kafka consumer poll timeout, defaulting to spark.network.timeout (120s) instead of 512ms, to try to address a flaky test.
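
For illustration (not part of this commit), a Structured Streaming reader can still override the new default explicitly through the source option this patch touches; the topic and bootstrap servers below are hypothetical:

```scala
// Hypothetical override of the executor poll timeout for the Kafka source.
// After this patch the default becomes spark.network.timeout (120s) rather
// than 512ms; setting the option pins it regardless of the default.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "events")
  .option("kafkaConsumer.pollTimeoutMs", "10000")
  .load()
```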

## How was this patch tested?

Ran existing unit tests

Author: cody koeninger <cody@koeninger.org>

Closes #15737 from koeninger/SPARK-18212.

(cherry picked from commit 67659c9a)
Signed-off-by: Michael Armbrust <michael@databricks.com>
parent 569f77a1
@@ -88,7 +88,10 @@ private[kafka010] case class KafkaSource(
   private val sc = sqlContext.sparkContext
-  private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
+  private val pollTimeoutMs = sourceOptions.getOrElse(
+    "kafkaConsumer.pollTimeoutMs",
+    sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
   private val maxOffsetFetchAttempts =
     sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
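
For context, this hunk ties the source's default poll timeout to the network timeout. A minimal self-contained sketch of the fallback logic, assuming an in-scope SparkConf and an options map (both names here are illustrative stand-ins for what KafkaSource receives):

```scala
import org.apache.spark.SparkConf

// Sketch of the fallback introduced above: use an explicit source option
// if present, otherwise fall back to spark.network.timeout (default 120s).
val conf = new SparkConf()
val sourceOptions: Map[String, String] = Map.empty // normally the DataSource options
val pollTimeoutMs: Long = sourceOptions.getOrElse(
  "kafkaConsumer.pollTimeoutMs",
  conf.getTimeAsMs("spark.network.timeout", "120s").toString
).toLong
```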
@@ -66,7 +66,8 @@ private[spark] class KafkaRDD[K, V](
       " must be set to false for executor kafka params, else offsets may commit before processing")
   // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
-  private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
+  private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms",
+    conf.getTimeAsMs("spark.network.timeout", "120s"))
   private val cacheInitialCapacity =
     conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
   private val cacheMaxCapacity =
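
On the DStream side, the same fallback applies when KafkaRDD polls the consumer. A hedged usage sketch for pinning the timeout instead of inheriting spark.network.timeout (the 10000 ms value is an arbitrary example):

```scala
import org.apache.spark.SparkConf

// Illustrative only: set the consumer poll timeout explicitly so it does
// not fall back to spark.network.timeout after this patch.
val conf = new SparkConf()
  .setAppName("kafka-dstream-example")
  .set("spark.streaming.kafka.consumer.poll.ms", "10000")
```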