From 452690ba1cc3c667bdd9f3022c43c9a10267880b Mon Sep 17 00:00:00 2001
From: Josh Rosen <joshrosen@databricks.com>
Date: Wed, 2 Dec 2015 13:44:01 -0800
Subject: [PATCH] [SPARK-12001] Allow partially-stopped StreamingContext to be
 completely stopped

If `StreamingContext.stop()` is interrupted midway through the call, the context will be marked as stopped but certain state will have not been cleaned up. Because `state = STOPPED` will be set, subsequent `stop()` calls will be unable to finish stopping the context, preventing any new StreamingContexts from being created.

This patch addresses this issue by only marking the context as `STOPPED` once the `stop()` has successfully completed which allows `stop()` to be called a second time in order to finish stopping the context in case the original `stop()` call was interrupted.

I discovered this issue by examining logs from a failed Jenkins run in which this race condition occurred in `FailureSuite`, leaking an unstoppable context and causing all subsequent tests to fail.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #9982 from JoshRosen/SPARK-12001.
---
 .../spark/streaming/StreamingContext.scala    | 49 ++++++++++---------
 1 file changed, 27 insertions(+), 22 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 6fb8ad38ab..cf843e3e8b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -699,28 +699,33 @@ class StreamingContext private[streaming] (
         " AsynchronousListenerBus")
     }
     synchronized {
-      try {
-        state match {
-          case INITIALIZED =>
-            logWarning("StreamingContext has not been started yet")
-          case STOPPED =>
-            logWarning("StreamingContext has already been stopped")
-          case ACTIVE =>
-            scheduler.stop(stopGracefully)
-            // Removing the streamingSource to de-register the metrics on stop()
-            env.metricsSystem.removeSource(streamingSource)
-            uiTab.foreach(_.detach())
-            StreamingContext.setActiveContext(null)
-            waiter.notifyStop()
-            if (shutdownHookRef != null) {
-              shutdownHookRefToRemove = shutdownHookRef
-              shutdownHookRef = null
-            }
-            logInfo("StreamingContext stopped successfully")
-        }
-      } finally {
-        // The state should always be Stopped after calling `stop()`, even if we haven't started yet
-        state = STOPPED
+      // The state should always be Stopped after calling `stop()`, even if we haven't started yet
+      state match {
+        case INITIALIZED =>
+          logWarning("StreamingContext has not been started yet")
+          state = STOPPED
+        case STOPPED =>
+          logWarning("StreamingContext has already been stopped")
+          state = STOPPED
+        case ACTIVE =>
+          // It's important that we don't set state = STOPPED until the very end of this case,
+          // since we need to ensure that we're still able to call `stop()` to recover from
+          // a partially-stopped StreamingContext which resulted from this `stop()` call being
+          // interrupted. See SPARK-12001 for more details. Because the body of this case can be
+          // executed twice in the case of a partial stop, all methods called here need to be
+          // idempotent.
+          scheduler.stop(stopGracefully)
+          // Removing the streamingSource to de-register the metrics on stop()
+          env.metricsSystem.removeSource(streamingSource)
+          uiTab.foreach(_.detach())
+          StreamingContext.setActiveContext(null)
+          waiter.notifyStop()
+          if (shutdownHookRef != null) {
+            shutdownHookRefToRemove = shutdownHookRef
+            shutdownHookRef = null
+          }
+          logInfo("StreamingContext stopped successfully")
+          state = STOPPED
       }
     }
     if (shutdownHookRefToRemove != null) {
-- 
GitLab