Skip to content
Snippets Groups Projects
Commit e1eef248 authored by zsxwing's avatar zsxwing Committed by Tathagata Das
Browse files

[SPARK-11104] [STREAMING] Fix a deadlock in StreamingContex.stop

The following deadlock may happen if shutdownHook and StreamingContext.stop are running at the same time.
```
Java stack information for the threads listed above:
===================================================
"Thread-2":
	at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:699)
	- waiting to lock <0x00000005405a1680> (a org.apache.spark.streaming.StreamingContext)
	at org.apache.spark.streaming.StreamingContext.org$apache$spark$streaming$StreamingContext$$stopOnShutdown(StreamingContext.scala:729)
	at org.apache.spark.streaming.StreamingContext$$anonfun$start$1.apply$mcV$sp(StreamingContext.scala:625)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:266)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:236)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:236)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:236)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1697)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:236)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:236)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:236)
	at scala.util.Try$.apply(Try.scala:161)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:236)
	- locked <0x00000005405b6a00> (a org.apache.spark.util.SparkShutdownHookManager)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:216)
	at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
"main":
	at org.apache.spark.util.SparkShutdownHookManager.remove(ShutdownHookManager.scala:248)
	- waiting to lock <0x00000005405b6a00> (a org.apache.spark.util.SparkShutdownHookManager)
	at org.apache.spark.util.ShutdownHookManager$.removeShutdownHook(ShutdownHookManager.scala:199)
	at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:712)
	- locked <0x00000005405a1680> (a org.apache.spark.streaming.StreamingContext)
	at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:684)
	- locked <0x00000005405a1680> (a org.apache.spark.streaming.StreamingContext)
	at org.apache.spark.streaming.SessionByKeyBenchmark$.main(SessionByKeyBenchmark.scala:108)
	at org.apache.spark.streaming.SessionByKeyBenchmark.main(SessionByKeyBenchmark.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:680)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```

This PR just moved `ShutdownHookManager.removeShutdownHook` out of `synchronized` to avoid deadlock.

Author: zsxwing <zsxwing@gmail.com>

Closes #9116 from zsxwing/stop-deadlock.
parent 369d786f
No related branches found
No related tags found
No related merge requests found
......@@ -694,32 +694,39 @@ class StreamingContext private[streaming] (
* @param stopGracefully if true, stops gracefully by waiting for the processing of all
* received data to be completed
*/
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
try {
state match {
case INITIALIZED =>
logWarning("StreamingContext has not been started yet")
case STOPPED =>
logWarning("StreamingContext has already been stopped")
case ACTIVE =>
scheduler.stop(stopGracefully)
// Removing the streamingSource to de-register the metrics on stop()
env.metricsSystem.removeSource(streamingSource)
uiTab.foreach(_.detach())
StreamingContext.setActiveContext(null)
waiter.notifyStop()
if (shutdownHookRef != null) {
ShutdownHookManager.removeShutdownHook(shutdownHookRef)
}
logInfo("StreamingContext stopped successfully")
def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
var shutdownHookRefToRemove: AnyRef = null
synchronized {
try {
state match {
case INITIALIZED =>
logWarning("StreamingContext has not been started yet")
case STOPPED =>
logWarning("StreamingContext has already been stopped")
case ACTIVE =>
scheduler.stop(stopGracefully)
// Removing the streamingSource to de-register the metrics on stop()
env.metricsSystem.removeSource(streamingSource)
uiTab.foreach(_.detach())
StreamingContext.setActiveContext(null)
waiter.notifyStop()
if (shutdownHookRef != null) {
shutdownHookRefToRemove = shutdownHookRef
shutdownHookRef = null
}
logInfo("StreamingContext stopped successfully")
}
} finally {
// The state should always be Stopped after calling `stop()`, even if we haven't started yet
state = STOPPED
}
// Even if we have already stopped, we still need to attempt to stop the SparkContext because
// a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
if (stopSparkContext) sc.stop()
} finally {
// The state should always be Stopped after calling `stop()`, even if we haven't started yet
state = STOPPED
}
if (shutdownHookRefToRemove != null) {
ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove)
}
// Even if we have already stopped, we still need to attempt to stop the SparkContext because
// a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
if (stopSparkContext) sc.stop()
}
private def stopOnShutdown(): Unit = {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment