Skip to content
Snippets Groups Projects
Commit fa3e4d8f authored by Hari Shreedharan's avatar Hari Shreedharan Committed by Tathagata Das
Browse files

[SPARK-11019] [STREAMING] [FLUME] Gracefully shutdown Flume receiver th…

…reads.

Wait for a minute for the receiver threads to shutdown before interrupting them.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #9041 from harishreedharan/flume-graceful-shutdown.
parent 8e67882b
No related branches found
No related tags found
No related merge requests found
...@@ -18,7 +18,7 @@ package org.apache.spark.streaming.flume ...@@ -18,7 +18,7 @@ package org.apache.spark.streaming.flume
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.concurrent.{LinkedBlockingQueue, Executors} import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.reflect.ClassTag import scala.reflect.ClassTag
...@@ -93,7 +93,11 @@ private[streaming] class FlumePollingReceiver( ...@@ -93,7 +93,11 @@ private[streaming] class FlumePollingReceiver(
override def onStop(): Unit = { override def onStop(): Unit = {
logInfo("Shutting down Flume Polling Receiver") logInfo("Shutting down Flume Polling Receiver")
receiverExecutor.shutdownNow() receiverExecutor.shutdown()
// Wait upto a minute for the threads to die
if (!receiverExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
receiverExecutor.shutdownNow()
}
connections.asScala.foreach(_.transceiver.close()) connections.asScala.foreach(_.transceiver.close())
channelFactory.releaseExternalResources() channelFactory.releaseExternalResources()
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment