From fa3e4d8f52995bf632e7eda60dbb776c9f637546 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan <hshreedharan@apache.org>
Date: Thu, 8 Oct 2015 18:50:27 -0700
Subject: [PATCH] =?UTF-8?q?[SPARK-11019]=20[STREAMING]=20[FLUME]=20Gracefu?=
 =?UTF-8?q?lly=20shutdown=20Flume=20receiver=20th=E2=80=A6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

…reads.

Wait for a minute for the receiver threads to shutdown before interrupting them.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #9041 from harishreedharan/flume-graceful-shutdown.
---
 .../spark/streaming/flume/FlumePollingInputDStream.scala  | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 3b936d88ab..6737750c3d 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -18,7 +18,7 @@ package org.apache.spark.streaming.flume
 
 
 import java.net.InetSocketAddress
-import java.util.concurrent.{LinkedBlockingQueue, Executors}
+import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
@@ -93,7 +93,11 @@ private[streaming] class FlumePollingReceiver(
 
   override def onStop(): Unit = {
     logInfo("Shutting down Flume Polling Receiver")
-    receiverExecutor.shutdownNow()
+    receiverExecutor.shutdown()
+    // Wait upto a minute for the threads to die
+    if (!receiverExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+      receiverExecutor.shutdownNow()
+    }
     connections.asScala.foreach(_.transceiver.close())
     channelFactory.releaseExternalResources()
   }
-- 
GitLab