Skip to content
Snippets Groups Projects
Commit d68ea24d authored by Tathagata Das's avatar Tathagata Das
Browse files

[SPARK-7776] [STREAMING] Added shutdown hook to StreamingContext

Shutdown hook to stop SparkContext was added recently. This results in ugly errors when a streaming application is terminated by ctrl-C.

```
Exception in thread "Thread-27" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:736)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:735)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:735)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1468)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1403)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:1642)
	at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:559)
	at org.apache.spark.util.SparkShutdownHook.run(Utils.scala:2266)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Utils.scala:2236)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2236)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2236)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1764)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(Utils.scala:2236)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2236)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2236)
	at scala.util.Try$.apply(Try.scala:161)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(Utils.scala:2236)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$6.run(Utils.scala:2218)
	at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
```

This is because Spark's shutdown hook stops the context while the streaming jobs are still running, causing them to fail midway. The correct solution is to stop the streaming context before the Spark context. This PR adds a shutdown hook that does so, registered with a higher priority than the SparkContext's shutdown hook so that it runs first.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6307 from tdas/SPARK-7776 and squashes the following commits:

e3d5475 [Tathagata Das] Added conf to specify graceful shutdown
4c18652 [Tathagata Das] Added shutdown hook to StreamingContext.
parent 347b5010
No related branches found
No related tags found
No related merge requests found
...@@ -42,7 +42,7 @@ import org.apache.spark.streaming.dstream._ ...@@ -42,7 +42,7 @@ import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver} import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener} import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab} import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
import org.apache.spark.util.CallSite import org.apache.spark.util.{CallSite, Utils}
/** /**
* Main entry point for Spark Streaming functionality. It provides methods used to create * Main entry point for Spark Streaming functionality. It provides methods used to create
...@@ -201,6 +201,8 @@ class StreamingContext private[streaming] ( ...@@ -201,6 +201,8 @@ class StreamingContext private[streaming] (
private val startSite = new AtomicReference[CallSite](null) private val startSite = new AtomicReference[CallSite](null)
private var shutdownHookRef: AnyRef = _
/** /**
* Return the associated Spark context * Return the associated Spark context
*/ */
...@@ -584,6 +586,8 @@ class StreamingContext private[streaming] ( ...@@ -584,6 +586,8 @@ class StreamingContext private[streaming] (
state = StreamingContextState.ACTIVE state = StreamingContextState.ACTIVE
StreamingContext.setActiveContext(this) StreamingContext.setActiveContext(this)
} }
shutdownHookRef = Utils.addShutdownHook(
StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
logInfo("StreamingContext started") logInfo("StreamingContext started")
case ACTIVE => case ACTIVE =>
logWarning("StreamingContext has already been started") logWarning("StreamingContext has already been started")
...@@ -660,6 +664,9 @@ class StreamingContext private[streaming] ( ...@@ -660,6 +664,9 @@ class StreamingContext private[streaming] (
uiTab.foreach(_.detach()) uiTab.foreach(_.detach())
StreamingContext.setActiveContext(null) StreamingContext.setActiveContext(null)
waiter.notifyStop() waiter.notifyStop()
if (shutdownHookRef != null) {
Utils.removeShutdownHook(shutdownHookRef)
}
logInfo("StreamingContext stopped successfully") logInfo("StreamingContext stopped successfully")
} }
// Even if we have already stopped, we still need to attempt to stop the SparkContext because // Even if we have already stopped, we still need to attempt to stop the SparkContext because
...@@ -670,6 +677,13 @@ class StreamingContext private[streaming] ( ...@@ -670,6 +677,13 @@ class StreamingContext private[streaming] (
state = STOPPED state = STOPPED
} }
} }
/**
 * Shutdown-hook callback: stop this StreamingContext before the SparkContext's
 * own hook fires, so in-flight streaming jobs are not killed mid-batch.
 * Gracefulness is controlled by "spark.streaming.stopGracefullyOnShutdown"
 * (defaults to false, i.e. stop immediately).
 */
private def stopOnShutdown(): Unit = {
  val gracefulStop = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
  logInfo(s"Invoking stop(stopGracefully=$gracefulStop) from shutdown hook")
  // Leave the SparkContext running; its own (lower-priority) shutdown hook stops it.
  stop(stopSparkContext = false, stopGracefully = gracefulStop)
}
} }
/** /**
...@@ -685,6 +699,8 @@ object StreamingContext extends Logging { ...@@ -685,6 +699,8 @@ object StreamingContext extends Logging {
*/ */
private val ACTIVATION_LOCK = new Object() private val ACTIVATION_LOCK = new Object()
private val SHUTDOWN_HOOK_PRIORITY = Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1
private val activeContext = new AtomicReference[StreamingContext](null) private val activeContext = new AtomicReference[StreamingContext](null)
private def assertNoOtherContextIsActive(): Unit = { private def assertNoOtherContextIsActive(): Unit = {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment