Commit 7bb6d31c authored by zsxwing, committed by Marcelo Vanzin

[SPARK-11232][CORE] Use 'offer' instead of 'put' to make sure calling send won't be interrupted

The current `NettyRpcEndpointRef.send` can be interrupted because it uses `LinkedBlockingQueue.put`, which is interruptible; this may hang the application.

Imagine the following execution order:

| step | thread 1: TaskRunner.kill | thread 2: TaskRunner.run |
| ------------- | ------------- | ------------- |
| 1 | killed = true | |
| 2 | | if (killed) { |
| 3 | | throw new TaskKilledException |
| 4 | | case _: TaskKilledException \| _: InterruptedException if task.killed => |
| 5 | task.kill(interruptThread): interruptThread is true | |
| 6 | | execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) |
| 7 | | localEndpoint.send(StatusUpdate(taskId, state, serializedData)): in LocalBackend |

At step 7, `localEndpoint.send(StatusUpdate(taskId, state, serializedData))` will throw `InterruptedException` because the thread's interrupt flag was set in step 5. The executor therefore never reports the KILLED status, and the application hangs.
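For illustration, here is a minimal, hypothetical Scala sketch (not part of this change; the object name and the queued string are placeholders) of the failure mode: `LinkedBlockingQueue.put` acquires its internal lock interruptibly, so it throws `InterruptedException` as soon as the calling thread's interrupt flag is set, even though the unbounded queue has room for the element.

```scala
import java.util.concurrent.LinkedBlockingQueue

object PutIsInterruptibleDemo {
  def main(args: Array[String]): Unit = {
    val receivers = new LinkedBlockingQueue[String]() // unbounded, never full

    // Simulate TaskRunner.kill(interruptThread = true) having already
    // interrupted the thread that is about to report the task status.
    Thread.currentThread().interrupt()

    try {
      // put() locks interruptibly, so it fails right away even though
      // enqueueing would not have blocked at all.
      receivers.put("StatusUpdate(taskId, KILLED, ...)")
    } catch {
      case _: InterruptedException =>
        println("put was interrupted; the status update was never enqueued")
    }
  }
}
```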

A failure caused by the above issue can be seen here: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44062/consoleFull

Since `receivers` is an unbounded `LinkedBlockingQueue`, `offer` never blocks and always succeeds, so we can simply use `LinkedBlockingQueue.offer` instead of `put` to resolve this issue.
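A small hedged sketch of why this is safe (again illustrative only; the names are made up): the non-timed `offer` acquires the queue's lock non-interruptibly and, on a `LinkedBlockingQueue` created without a capacity bound (capacity defaults to `Integer.MAX_VALUE`), it never blocks and only returns `false` when the queue is full, so in practice it always succeeds regardless of the caller's interrupt status.

```scala
import java.util.concurrent.LinkedBlockingQueue

object OfferIgnoresInterruptDemo {
  def main(args: Array[String]): Unit = {
    val receivers = new LinkedBlockingQueue[String]() // capacity Integer.MAX_VALUE

    // Same situation as above: the caller has already been interrupted.
    Thread.currentThread().interrupt()

    // offer() does not declare InterruptedException and does not block on an
    // unbounded queue, so the message is enqueued despite the interrupt.
    val enqueued = receivers.offer("StatusUpdate(taskId, KILLED, ...)")

    println(s"enqueued = $enqueued") // true
    // offer() also leaves the interrupt flag untouched for later handling.
    println(s"still interrupted = ${Thread.currentThread().isInterrupted}") // true
  }
}
```

Note that only the producing side switches to `offer`; the consuming side in the message loop still uses the interruptible `receivers.take()`, as visible in the last hunk below.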

Author: zsxwing <zsxwing@gmail.com>

Closes #9198 from zsxwing/dont-interrupt-send.
parent 42d225f4
@@ -66,7 +66,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
       }
       val data = endpoints.get(name)
       endpointRefs.put(data.endpoint, data.ref)
-      receivers.put(data)  // for the OnStart message
+      receivers.offer(data)  // for the OnStart message
     }
     endpointRef
   }
@@ -80,7 +80,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
     val data = endpoints.remove(name)
     if (data != null) {
       data.inbox.stop()
-      receivers.put(data)  // for the OnStop message
+      receivers.offer(data)  // for the OnStop message
     }
     // Don't clean `endpointRefs` here because it's possible that some messages are being processed
     // now and they can use `getRpcEndpointRef`. So `endpointRefs` will be cleaned in Inbox via
@@ -163,7 +163,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
         true
       } else {
         data.inbox.post(createMessageFn(data.ref))
-        receivers.put(data)
+        receivers.offer(data)
         false
       }
     }
@@ -183,7 +183,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
     // Stop all endpoints. This will queue all endpoints for processing by the message loops.
     endpoints.keySet().asScala.foreach(unregisterRpcEndpoint)
     // Enqueue a message that tells the message loops to stop.
-    receivers.put(PoisonPill)
+    receivers.offer(PoisonPill)
     threadpool.shutdown()
   }
@@ -218,7 +218,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
             val data = receivers.take()
             if (data == PoisonPill) {
               // Put PoisonPill back so that other MessageLoops can see it.
-              receivers.put(PoisonPill)
+              receivers.offer(PoisonPill)
               return
             }
             data.inbox.process(Dispatcher.this)