From 8ce60963cb0928058ef7b6e29ff94eb69d1143af Mon Sep 17 00:00:00 2001
From: cody koeninger <cody@koeninger.org>
Date: Wed, 12 Aug 2015 17:44:16 -0700
Subject: [PATCH] =?UTF-8?q?[SPARK-9780]=20[STREAMING]=20[KAFKA]=20prevent?=
 =?UTF-8?q?=20NPE=20if=20KafkaRDD=20instantiation=20=E2=80=A6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

…fails

Author: cody koeninger <cody@koeninger.org>

Closes #8133 from koeninger/SPARK-9780 and squashes the following commits:

406259d [cody koeninger] [SPARK-9780][Streaming][Kafka] prevent NPE if KafkaRDD instantiation fails
---
 .../scala/org/apache/spark/streaming/kafka/KafkaRDD.scala   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index 1a9d78c0d4..ea5f842c6c 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -197,7 +197,11 @@ class KafkaRDD[
         .dropWhile(_.offset < requestOffset)
     }
 
-    override def close(): Unit = consumer.close()
+    override def close(): Unit = {
+      if (consumer != null) {
+        consumer.close()
+      }
+    }
 
     override def getNext(): R = {
       if (iter == null || !iter.hasNext) {
-- 
GitLab