Skip to content
Snippets Groups Projects
Commit 2c43ea38 authored by Ilya Ganelin's avatar Ilya Ganelin Committed by Sean Owen
Browse files

[SPARK-6492][CORE] SparkContext.stop() can deadlock when DAGSchedulerEventProcessLoop dies

I've added a timeout and retry loop around the SparkContext shutdown code that should fix this deadlock. If a SparkContext shutdown is in progress when another thread comes knocking, it will wait for 10 seconds for the lock, then fall through where the outer loop will re-submit the request.

Author: Ilya Ganelin <ilya.ganelin@capitalone.com>

Closes #5277 from ilganeli/SPARK-6492 and squashes the following commits:

8617a7e [Ilya Ganelin] Resolved merge conflict
2fbab66 [Ilya Ganelin] Added MIMA Exclude
a0e2c70 [Ilya Ganelin] Deleted stale imports
fa28ce7 [Ilya Ganelin] reverted to just having a single stopped
76fc825 [Ilya Ganelin] Updated to use atomic booleans instead of the synchronized vars
6e8a7f7 [Ilya Ganelin] Removing unnecessary null check for now since I'm not fixing stop ordering yet
cdf7073 [Ilya Ganelin] [SPARK-6492] Moved stopped=true back to the start of the shutdown sequence so this can be addressed in a separate PR
7fb795b [Ilya Ganelin] Spacing
b7a0c5c [Ilya Ganelin] Import ordering
df8224f [Ilya Ganelin] Added comment for added lock
343cb94 [Ilya Ganelin] [SPARK-6492] Added timeout/retry logic to fix a deadlock in SparkContext shutdown
parent c23ba81b
No related branches found
No related tags found
No related merge requests found
...@@ -23,7 +23,7 @@ import java.io._ ...@@ -23,7 +23,7 @@ import java.io._
import java.lang.reflect.Constructor import java.lang.reflect.Constructor
import java.net.URI import java.net.URI
import java.util.{Arrays, Properties, UUID} import java.util.{Arrays, Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.UUID.randomUUID import java.util.UUID.randomUUID
import scala.collection.{Map, Set} import scala.collection.{Map, Set}
...@@ -95,10 +95,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli ...@@ -95,10 +95,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
val startTime = System.currentTimeMillis() val startTime = System.currentTimeMillis()
@volatile private var stopped: Boolean = false private val stopped: AtomicBoolean = new AtomicBoolean(false)
private def assertNotStopped(): Unit = { private def assertNotStopped(): Unit = {
if (stopped) { if (stopped.get()) {
throw new IllegalStateException("Cannot call methods on a stopped SparkContext") throw new IllegalStateException("Cannot call methods on a stopped SparkContext")
} }
} }
...@@ -1390,33 +1390,34 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli ...@@ -1390,33 +1390,34 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
addedJars.clear() addedJars.clear()
} }
/** Shut down the SparkContext. */ // Shut down the SparkContext.
def stop() { def stop() {
SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { // Use the stopping variable to ensure no contention for the stop scenario.
if (!stopped) { // Still track the stopped variable for use elsewhere in the code.
stopped = true
postApplicationEnd() if (!stopped.compareAndSet(false, true)) {
ui.foreach(_.stop()) logInfo("SparkContext already stopped.")
env.metricsSystem.report() return
metadataCleaner.cancel()
cleaner.foreach(_.stop())
executorAllocationManager.foreach(_.stop())
dagScheduler.stop()
dagScheduler = null
listenerBus.stop()
eventLogger.foreach(_.stop())
env.actorSystem.stop(heartbeatReceiver)
progressBar.foreach(_.stop())
taskScheduler = null
// TODO: Cache.stop()?
env.stop()
SparkEnv.set(null)
logInfo("Successfully stopped SparkContext")
SparkContext.clearActiveContext()
} else {
logInfo("SparkContext already stopped")
}
} }
postApplicationEnd()
ui.foreach(_.stop())
env.metricsSystem.report()
metadataCleaner.cancel()
cleaner.foreach(_.stop())
executorAllocationManager.foreach(_.stop())
dagScheduler.stop()
dagScheduler = null
listenerBus.stop()
eventLogger.foreach(_.stop())
env.actorSystem.stop(heartbeatReceiver)
progressBar.foreach(_.stop())
taskScheduler = null
// TODO: Cache.stop()?
env.stop()
SparkEnv.set(null)
SparkContext.clearActiveContext()
logInfo("Successfully stopped SparkContext")
} }
...@@ -1478,7 +1479,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli ...@@ -1478,7 +1479,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
partitions: Seq[Int], partitions: Seq[Int],
allowLocal: Boolean, allowLocal: Boolean,
resultHandler: (Int, U) => Unit) { resultHandler: (Int, U) => Unit) {
if (stopped) { if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown") throw new IllegalStateException("SparkContext has been shutdown")
} }
val callSite = getCallSite val callSite = getCallSite
......
...@@ -60,6 +60,10 @@ object MimaExcludes { ...@@ -60,6 +60,10 @@ object MimaExcludes {
) ++ Seq( ) ++ Seq(
// SPARK-6510 Add a Graph#minus method acting as Set#difference // SPARK-6510 Add a Graph#minus method acting as Set#difference
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.minus") ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.minus")
) ++ Seq(
// SPARK-6492 Fix deadlock in SparkContext.stop()
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.org$" +
"apache$spark$SparkContext$$SPARK_CONTEXT_CONSTRUCTOR_LOCK")
) )
case v if v.startsWith("1.3") => case v if v.startsWith("1.3") =>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment