diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 5b3778ead6994bed9fe722b2a5b5257487df57ff..abf81e312d8e6fc6626015233a2040e5ef489740 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -23,7 +23,7 @@ import java.io._ import java.lang.reflect.Constructor import java.net.URI import java.util.{Arrays, Properties, UUID} -import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.UUID.randomUUID import scala.collection.{Map, Set} @@ -95,10 +95,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli val startTime = System.currentTimeMillis() - @volatile private var stopped: Boolean = false + private val stopped: AtomicBoolean = new AtomicBoolean(false) private def assertNotStopped(): Unit = { - if (stopped) { + if (stopped.get()) { throw new IllegalStateException("Cannot call methods on a stopped SparkContext") } } @@ -1390,33 +1390,34 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli addedJars.clear() } - /** Shut down the SparkContext. */ + // Shut down the SparkContext. def stop() { - SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { - if (!stopped) { - stopped = true - postApplicationEnd() - ui.foreach(_.stop()) - env.metricsSystem.report() - metadataCleaner.cancel() - cleaner.foreach(_.stop()) - executorAllocationManager.foreach(_.stop()) - dagScheduler.stop() - dagScheduler = null - listenerBus.stop() - eventLogger.foreach(_.stop()) - env.actorSystem.stop(heartbeatReceiver) - progressBar.foreach(_.stop()) - taskScheduler = null - // TODO: Cache.stop()? - env.stop() - SparkEnv.set(null) - logInfo("Successfully stopped SparkContext") - SparkContext.clearActiveContext() - } else { - logInfo("SparkContext already stopped") - } + // Use the stopping variable to ensure no contention for the stop scenario. + // Still track the stopped variable for use elsewhere in the code. + + if (!stopped.compareAndSet(false, true)) { + logInfo("SparkContext already stopped.") + return } + + postApplicationEnd() + ui.foreach(_.stop()) + env.metricsSystem.report() + metadataCleaner.cancel() + cleaner.foreach(_.stop()) + executorAllocationManager.foreach(_.stop()) + dagScheduler.stop() + dagScheduler = null + listenerBus.stop() + eventLogger.foreach(_.stop()) + env.actorSystem.stop(heartbeatReceiver) + progressBar.foreach(_.stop()) + taskScheduler = null + // TODO: Cache.stop()? + env.stop() + SparkEnv.set(null) + SparkContext.clearActiveContext() + logInfo("Successfully stopped SparkContext") } @@ -1478,7 +1479,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli partitions: Seq[Int], allowLocal: Boolean, resultHandler: (Int, U) => Unit) { - if (stopped) { + if (stopped.get()) { throw new IllegalStateException("SparkContext has been shutdown") } val callSite = getCallSite diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 54500f7c2701f2f16413e72df6ba23d6bf7fb4b4..c2d828f982fe09f24b3db3ee813ab0fa88130312 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -60,6 +60,10 @@ object MimaExcludes { ) ++ Seq( // SPARK-6510 Add a Graph#minus method acting as Set#difference ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.minus") + ) ++ Seq( + // SPARK-6492 Fix deadlock in SparkContext.stop() + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.org$" + + "apache$spark$SparkContext$$SPARK_CONTEXT_CONSTRUCTOR_LOCK") ) case v if v.startsWith("1.3") =>