From 2daca62cd342203694f22232ceb026dcaf56d3d5 Mon Sep 17 00:00:00 2001
From: cody koeninger <cody@koeninger.org>
Date: Thu, 3 Nov 2016 14:43:25 -0700
Subject: [PATCH] [SPARK-18212][SS][KAFKA] increase executor poll timeout

## What changes were proposed in this pull request?

Increase poll timeout to try and address flaky test

## How was this patch tested?

Ran existing unit tests

Author: cody koeninger <cody@koeninger.org>

Closes #15737 from koeninger/SPARK-18212.

(cherry picked from commit 67659c9afaeb2289e56fd87fafee953e8f050383)
Signed-off-by: Michael Armbrust <michael@databricks.com>
---
 .../scala/org/apache/spark/sql/kafka010/KafkaSource.scala    | 5 ++++-
 .../scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala | 3 ++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 61cba737d1..b21508cd7e 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -88,7 +88,10 @@ private[kafka010] case class KafkaSource(
 
   private val sc = sqlContext.sparkContext
 
-  private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
+  private val pollTimeoutMs = sourceOptions.getOrElse(
+    "kafkaConsumer.pollTimeoutMs",
+    sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
 
   private val maxOffsetFetchAttempts =
     sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
index 5b5a9ac48c..98394251bb 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
@@ -66,7 +66,8 @@ private[spark] class KafkaRDD[K, V](
       " must be set to false for executor kafka params, else offsets may commit before processing")
 
   // TODO is it necessary to have separate configs for initial poll time vs ongoing poll time?
-  private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms", 512)
+  private val pollTimeout = conf.getLong("spark.streaming.kafka.consumer.poll.ms",
+    conf.getTimeAsMs("spark.network.timeout", "120s"))
   private val cacheInitialCapacity =
     conf.getInt("spark.streaming.kafka.consumer.cache.initialCapacity", 16)
   private val cacheMaxCapacity =
-- 
GitLab