diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 9b2632c22954811bb510aa8ddf93bacb00b8e958..051f53de64cd50cb97c271a50585d6bc6b302f32 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -694,32 +694,39 @@ class StreamingContext private[streaming] ( * @param stopGracefully if true, stops gracefully by waiting for the processing of all * received data to be completed */ - def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized { - try { - state match { - case INITIALIZED => - logWarning("StreamingContext has not been started yet") - case STOPPED => - logWarning("StreamingContext has already been stopped") - case ACTIVE => - scheduler.stop(stopGracefully) - // Removing the streamingSource to de-register the metrics on stop() - env.metricsSystem.removeSource(streamingSource) - uiTab.foreach(_.detach()) - StreamingContext.setActiveContext(null) - waiter.notifyStop() - if (shutdownHookRef != null) { - ShutdownHookManager.removeShutdownHook(shutdownHookRef) - } - logInfo("StreamingContext stopped successfully") + def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = { + var shutdownHookRefToRemove: AnyRef = null + synchronized { + try { + state match { + case INITIALIZED => + logWarning("StreamingContext has not been started yet") + case STOPPED => + logWarning("StreamingContext has already been stopped") + case ACTIVE => + scheduler.stop(stopGracefully) + // Removing the streamingSource to de-register the metrics on stop() + env.metricsSystem.removeSource(streamingSource) + uiTab.foreach(_.detach()) + StreamingContext.setActiveContext(null) + waiter.notifyStop() + if (shutdownHookRef != null) { + shutdownHookRefToRemove = shutdownHookRef + shutdownHookRef = null + } + logInfo("StreamingContext stopped successfully") + } + } finally { + // The state should always be Stopped after calling `stop()`, even if we haven't started yet + state = STOPPED } - // Even if we have already stopped, we still need to attempt to stop the SparkContext because - // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true). - if (stopSparkContext) sc.stop() - } finally { - // The state should always be Stopped after calling `stop()`, even if we haven't started yet - state = STOPPED } + if (shutdownHookRefToRemove != null) { + ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove) + } + // Even if we have already stopped, we still need to attempt to stop the SparkContext because + // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true). + if (stopSparkContext) sc.stop() } private def stopOnShutdown(): Unit = {