diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b8414b5d099c566cd916293e906fe2861cb25799..8f8392fa646deee7d0f340ecb6905d54a47ce3ff 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1757,25 +1757,30 @@ class SparkContext(config: SparkConf) extends Logging {
   def listJars(): Seq[String] = addedJars.keySet.toSeq
 
   /**
-   * Shut down the SparkContext.
+   * When stopping SparkContext inside Spark components, it's easy to cause dead-lock since Spark
+   * may wait for some internal threads to finish. It's better to use this method to stop
+   * SparkContext instead.
    */
-  def stop(): Unit = {
-    if (env.rpcEnv.isInRPCThread) {
-      // `stop` will block until all RPC threads exit, so we cannot call stop inside a RPC thread.
-      // We should launch a new thread to call `stop` to avoid dead-lock.
-      new Thread("stop-spark-context") {
-        setDaemon(true)
-
-        override def run(): Unit = {
-          _stop()
+  private[spark] def stopInNewThread(): Unit = {
+    new Thread("stop-spark-context") {
+      setDaemon(true)
+
+      override def run(): Unit = {
+        try {
+          SparkContext.this.stop()
+        } catch {
+          case e: Throwable =>
+            logError(e.getMessage, e)
+            throw e
         }
-      }.start()
-    } else {
-      _stop()
-    }
+      }
+    }.start()
   }
 
-  private def _stop() {
+  /**
+   * Shut down the SparkContext.
+   */
+  def stop(): Unit = {
     if (LiveListenerBus.withinListenerThread.value) {
       throw new SparkException(
         s"Cannot stop SparkContext within listener thread of ${LiveListenerBus.name}")
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index bbc416381490bc3d658b89fbf5c3570d96d3f098..530743c03640b3e4bd0c1d8f7dd949bef7e8fb91 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -146,11 +146,6 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
    * @param uri URI with location of the file.
    */
   def openChannel(uri: String): ReadableByteChannel
-
-  /**
-   * Return if the current thread is a RPC thread.
-   */
-  def isInRPCThread: Boolean
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index 67baabd2cbff26f41255d2c16d9058308bdd4bd7..a02cf30a5d8317c65123019a93344ad2ab29c0bc 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -201,7 +201,6 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
   /** Message loop used for dispatching messages. */
   private class MessageLoop extends Runnable {
     override def run(): Unit = {
-      NettyRpcEnv.rpcThreadFlag.value = true
       try {
         while (true) {
           try {
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 0b8cd144a2161a9273fa580faeae26bc1a08a5fc..e56943da1303a3ad17e88d6560b8263abbab8b52 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -407,14 +407,9 @@ private[netty] class NettyRpcEnv(
       }
     }
   }
-
-  override def isInRPCThread: Boolean = NettyRpcEnv.rpcThreadFlag.value
 }
 
 private[netty] object NettyRpcEnv extends Logging {
-
-  private[netty] val rpcThreadFlag = new DynamicVariable[Boolean](false)
-
   /**
    * When deserializing the [[NettyRpcEndpointRef]], it needs a reference to [[NettyRpcEnv]].
    * Use `currentEnv` to wrap the deserialization codes. E.g.,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f2517401cb76b38ae5e9ce76c7426481ddc3b56d..01a95c06fc69c1c39950b4f654a5a5c7cd901e17 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1660,7 +1660,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     } catch {
       case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
     }
-    dagScheduler.sc.stop()
+    dagScheduler.sc.stopInNewThread()
   }
 
   override def onStop(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 04d40e2907cffeaa4315bf963e03eac70b824545..4a9af80f4537b382d9ff2cca60edb65f52529f53 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -139,7 +139,7 @@ private[spark] class StandaloneSchedulerBackend(
         scheduler.error(reason)
       } finally {
         // Ensure the application terminates, as we can no longer run jobs.
-        sc.stop()
+        sc.stopInNewThread()
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index ded3416299e9af524964fe7d6c3c56aef681c2d0..071515134503fc15dbb1a02fb391bf622b7a0560 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1249,7 +1249,7 @@ private[spark] object Utils extends Logging {
         val currentThreadName = Thread.currentThread().getName
         if (sc != null) {
           logError(s"uncaught error in thread $currentThreadName, stopping SparkContext", t)
-          sc.stop()
+          sc.stopInNewThread()
         }
         if (!NonFatal(t)) {
           logError(s"throw uncaught fatal error in thread $currentThreadName", t)
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index aa0705987d837225653b2987f06dfc2605f05c82..acdf21df9a16157b3f1be95945fb3a94a6ffdead 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -870,19 +870,6 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     verify(endpoint, never()).onDisconnected(any())
     verify(endpoint, never()).onNetworkError(any(), any())
   }
-
-  test("isInRPCThread") {
-    val rpcEndpointRef = env.setupEndpoint("isInRPCThread", new RpcEndpoint {
-      override val rpcEnv = env
-
-      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-        case m => context.reply(rpcEnv.isInRPCThread)
-      }
-    })
-    assert(rpcEndpointRef.askWithRetry[Boolean]("hello") === true)
-    assert(env.isInRPCThread === false)
-    env.stop(rpcEndpointRef)
-  }
 }
 
 class UnserializableClass
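For reviewers, a minimal, self-contained sketch of the problem this patch addresses: calling a blocking `stop()` from a thread that `stop()` itself waits on self-joins and hangs, so the shutdown is handed off to a fresh daemon thread. This is not part of the patch; `Service`, its worker loop, and `StopInNewThreadSketch` are hypothetical stand-ins, not Spark code, and only mirror the hand-off pattern used by `SparkContext.stopInNewThread()` above.

```scala
// Illustrative sketch only (assumptions: Service is a stand-in for SparkContext;
// its stop() joins an internal worker thread, as SparkContext.stop() waits for
// internal threads such as RPC dispatchers and listener threads).
object StopInNewThreadSketch {

  class Service { self =>
    @volatile private var running = true

    private val worker = new Thread(new Runnable {
      override def run(): Unit = {
        while (running) Thread.sleep(10) // stand-in for an internal event loop
      }
    }, "service-worker")
    worker.start()

    /** Blocks until the worker exits; calling this from the worker itself self-joins and hangs. */
    def stop(): Unit = {
      running = false
      worker.join()
    }

    /** Safe from any thread, including the worker: the blocking stop() runs elsewhere. */
    def stopInNewThread(): Unit = {
      val t = new Thread("stop-service") {
        override def run(): Unit = self.stop()
      }
      t.setDaemon(true)
      t.start()
    }
  }

  def main(args: Array[String]): Unit = {
    new Service().stopInNewThread() // returns immediately; shutdown completes in the background
  }
}
```

The stopping thread is a daemon so it never keeps the JVM alive on its own; the patch additionally logs and rethrows any error raised by `stop()` so failures during shutdown remain visible.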