Commit be36c2f1 authored by Shixiong Zhu

[SPARK-20131][CORE] Don't use `this` lock in StandaloneSchedulerBackend.stop


## What changes were proposed in this pull request?

`o.a.s.streaming.StreamingContextSuite.SPARK-18560 Receiver data should be deserialized properly` is flaky because there is a potential deadlock in StandaloneSchedulerBackend which causes `await` to time out. Here is the related stack trace:
```
"Thread-31" #211 daemon prio=5 os_prio=31 tid=0x00007fedd4808000 nid=0x16403 waiting on condition [0x00007000239b7000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x000000079b49ca10> (a scala.concurrent.impl.Promise$CompletionLatch)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1037)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:402)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.org$apache$spark$scheduler$cluster$StandaloneSchedulerBackend$$stop(StandaloneSchedulerBackend.scala:213)
	- locked <0x00000007066fca38> (a org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.stop(StandaloneSchedulerBackend.scala:116)
	- locked <0x00000007066fca38> (a org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
	at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:517)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1657)
	at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1921)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1302)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:1920)
	at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:708)
	at org.apache.spark.streaming.StreamingContextSuite$$anonfun$43$$anonfun$apply$mcV$sp$66$$anon$3.run(StreamingContextSuite.scala:827)

"dispatcher-event-loop-3" #18 daemon prio=5 os_prio=31 tid=0x00007fedd603a000 nid=0x6203 waiting for monitor entry [0x0000700003be4000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:253)
	- waiting to lock <0x00000007066fca38> (a org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:124)
	at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
	at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:213)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```
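
Reduced to its locking shape, the problem is that `stop` holds the backend's monitor while doing a synchronous ask that only the single-threaded RPC dispatcher can answer, while that dispatcher is itself blocked trying to enter the same monitor in `makeOffers`. The stand-alone sketch below (hypothetical names, not Spark's actual classes) reproduces that shape; `Await.result` times out just like the `awaitResult` in the trace above:

```scala
import java.util.concurrent.{Executors, TimeoutException}

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical reduction of the locking shape only; names do not match Spark's code.
object DeadlockSketch {
  // Single-threaded "dispatcher-event-loop", like the RPC dispatcher in the trace.
  private val dispatcher = Executors.newSingleThreadExecutor()
  private val stopReply = Promise[Unit]()

  // Stands in for DriverEndpoint.makeOffers: needs the backend's monitor.
  private def makeOffers(): Unit = synchronized { /* revive offers */ }

  // Stands in for StandaloneSchedulerBackend.stop: holds the monitor while
  // synchronously waiting for a reply that only the dispatcher thread can send.
  def stop(): Unit = synchronized {
    // A ReviveOffers-style message is already in flight: it blocks on our monitor.
    dispatcher.execute(new Runnable { def run(): Unit = makeOffers() })
    // Our StopDriver-style reply is queued behind it and can never run.
    dispatcher.execute(new Runnable { def run(): Unit = stopReply.success(()) })
    Await.result(stopReply.future, 5.seconds) // times out: classic deadlock
  }

  def main(args: Array[String]): Unit = {
    try stop()
    catch { case e: TimeoutException => println("await timed out: " + e) }
    finally dispatcher.shutdownNow()
  }
}
```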

This PR removes `synchronized` and changes `stopping` to an `AtomicBoolean` so that `stop` is idempotent, which fixes the deadlock.
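
As a stand-alone illustration of the pattern (a hypothetical class, not the patched file itself): `compareAndSet(false, true)` lets exactly one caller run the shutdown body, and every later or concurrent call becomes a no-op, so no monitor is needed around `stop`.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical service showing the idempotent-stop pattern the patch adopts.
class StoppableService {
  private val stopping = new AtomicBoolean(false)

  def stop(): Unit = {
    // Only the thread that flips false -> true performs the shutdown work.
    if (stopping.compareAndSet(false, true)) {
      try {
        println("releasing resources once")
      } finally {
        println("recording final state")
      }
    }
  }
}

object StoppableServiceDemo {
  def main(args: Array[String]): Unit = {
    val s = new StoppableService
    s.stop() // does the work
    s.stop() // no-op: already stopping
  }
}
```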

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17610 from zsxwing/SPARK-20131.

(cherry picked from commit c5f1cc37)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
parent 7e0ddda3
```
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler.cluster
 
 import java.util.concurrent.Semaphore
+import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.concurrent.Future
 
@@ -42,7 +43,7 @@ private[spark] class StandaloneSchedulerBackend(
   with Logging {
 
   private var client: StandaloneAppClient = null
-  private var stopping = false
+  private val stopping = new AtomicBoolean(false)
   private val launcherBackend = new LauncherBackend() {
     override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
   }
@@ -112,7 +113,7 @@ private[spark] class StandaloneSchedulerBackend(
     launcherBackend.setState(SparkAppHandle.State.RUNNING)
   }
 
-  override def stop(): Unit = synchronized {
+  override def stop(): Unit = {
     stop(SparkAppHandle.State.FINISHED)
   }
 
@@ -125,14 +126,14 @@ private[spark] class StandaloneSchedulerBackend(
 
   override def disconnected() {
     notifyContext()
-    if (!stopping) {
+    if (!stopping.get) {
      logWarning("Disconnected from Spark cluster! Waiting for reconnection...")
     }
   }
 
   override def dead(reason: String) {
     notifyContext()
-    if (!stopping) {
+    if (!stopping.get) {
       launcherBackend.setState(SparkAppHandle.State.KILLED)
       logError("Application has been killed. Reason: " + reason)
       try {
@@ -206,20 +207,20 @@ private[spark] class StandaloneSchedulerBackend(
     registrationBarrier.release()
   }
 
-  private def stop(finalState: SparkAppHandle.State): Unit = synchronized {
-    try {
-      stopping = true
-
-      super.stop()
-      client.stop()
-
-      val callback = shutdownCallback
-      if (callback != null) {
-        callback(this)
-      }
-    } finally {
-      launcherBackend.setState(finalState)
-      launcherBackend.close()
+  private def stop(finalState: SparkAppHandle.State): Unit = {
+    if (stopping.compareAndSet(false, true)) {
+      try {
+        super.stop()
+        client.stop()
+
+        val callback = shutdownCallback
+        if (callback != null) {
+          callback(this)
+        }
+      } finally {
+        launcherBackend.setState(finalState)
+        launcherBackend.close()
+      }
     }
   }
```