Skip to content
Snippets Groups Projects
Commit 938434dc authored by Dongjoon Hyun's avatar Dongjoon Hyun Committed by Marcelo Vanzin
Browse files

[SPARK-15913][CORE] Dispatcher.stopped should be enclosed by synchronized block.

## What changes were proposed in this pull request?

`Dispatcher.stopped` is guarded by `this`, but it is used without synchronization in `postMessage` function. This PR fixes this and also the exception message became more accurate.

## How was this patch tested?

Pass the existing Jenkins tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13634 from dongjoon-hyun/SPARK-15913.
parent cd47e233
No related branches found
No related tags found
No related merge requests found
......@@ -144,25 +144,20 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
endpointName: String,
message: InboxMessage,
callbackIfStopped: (Exception) => Unit): Unit = {
val shouldCallOnStop = synchronized {
val error = synchronized {
val data = endpoints.get(endpointName)
if (stopped || data == null) {
true
if (stopped) {
Some(new RpcEnvStoppedException())
} else if (data == null) {
Some(new SparkException(s"Could not find $endpointName."))
} else {
data.inbox.post(message)
receivers.offer(data)
false
None
}
}
if (shouldCallOnStop) {
// We don't need to call `onStop` in the `synchronized` block
val error = if (stopped) {
new RpcEnvStoppedException()
} else {
new SparkException(s"Could not find $endpointName or it has been stopped.")
}
callbackIfStopped(error)
}
// We don't need to call `onStop` in the `synchronized` block
error.foreach(callbackIfStopped)
}
def stop(): Unit = {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment