diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala index 3b936d88abd3ed2b6135cd10e374ad2e9204819b..6737750c3d63eb931aeee8fcb1e609cfe87b263e 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.flume import java.net.InetSocketAddress -import java.util.concurrent.{LinkedBlockingQueue, Executors} +import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit} import scala.collection.JavaConverters._ import scala.reflect.ClassTag @@ -93,7 +93,11 @@ private[streaming] class FlumePollingReceiver( override def onStop(): Unit = { logInfo("Shutting down Flume Polling Receiver") - receiverExecutor.shutdownNow() + receiverExecutor.shutdown() + // Wait upto a minute for the threads to die + if (!receiverExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + receiverExecutor.shutdownNow() + } connections.asScala.foreach(_.transceiver.close()) channelFactory.releaseExternalResources() }