Skip to content
Snippets Groups Projects
Commit 452690ba authored by Josh Rosen's avatar Josh Rosen
Browse files

[SPARK-12001] Allow partially-stopped StreamingContext to be completely stopped

If `StreamingContext.stop()` is interrupted midway through the call, the context will be marked as stopped but certain state will have not been cleaned up. Because `state = STOPPED` will be set, subsequent `stop()` calls will be unable to finish stopping the context, preventing any new StreamingContexts from being created.

This patch addresses this issue by only marking the context as `STOPPED` once the `stop()` has successfully completed which allows `stop()` to be called a second time in order to finish stopping the context in case the original `stop()` call was interrupted.

I discovered this issue by examining logs from a failed Jenkins run in which this race condition occurred in `FailureSuite`, leaking an unstoppable context and causing all subsequent tests to fail.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #9982 from JoshRosen/SPARK-12001.
parent a1542ce2
No related branches found
No related tags found
No related merge requests found
...@@ -699,28 +699,33 @@ class StreamingContext private[streaming] ( ...@@ -699,28 +699,33 @@ class StreamingContext private[streaming] (
" AsynchronousListenerBus") " AsynchronousListenerBus")
} }
synchronized { synchronized {
try { // The state should always be Stopped after calling `stop()`, even if we haven't started yet
state match { state match {
case INITIALIZED => case INITIALIZED =>
logWarning("StreamingContext has not been started yet") logWarning("StreamingContext has not been started yet")
case STOPPED => state = STOPPED
logWarning("StreamingContext has already been stopped") case STOPPED =>
case ACTIVE => logWarning("StreamingContext has already been stopped")
scheduler.stop(stopGracefully) state = STOPPED
// Removing the streamingSource to de-register the metrics on stop() case ACTIVE =>
env.metricsSystem.removeSource(streamingSource) // It's important that we don't set state = STOPPED until the very end of this case,
uiTab.foreach(_.detach()) // since we need to ensure that we're still able to call `stop()` to recover from
StreamingContext.setActiveContext(null) // a partially-stopped StreamingContext which resulted from this `stop()` call being
waiter.notifyStop() // interrupted. See SPARK-12001 for more details. Because the body of this case can be
if (shutdownHookRef != null) { // executed twice in the case of a partial stop, all methods called here need to be
shutdownHookRefToRemove = shutdownHookRef // idempotent.
shutdownHookRef = null scheduler.stop(stopGracefully)
} // Removing the streamingSource to de-register the metrics on stop()
logInfo("StreamingContext stopped successfully") env.metricsSystem.removeSource(streamingSource)
} uiTab.foreach(_.detach())
} finally { StreamingContext.setActiveContext(null)
// The state should always be Stopped after calling `stop()`, even if we haven't started yet waiter.notifyStop()
state = STOPPED if (shutdownHookRef != null) {
shutdownHookRefToRemove = shutdownHookRef
shutdownHookRef = null
}
logInfo("StreamingContext stopped successfully")
state = STOPPED
} }
} }
if (shutdownHookRefToRemove != null) { if (shutdownHookRefToRemove != null) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment