Skip to content
Snippets Groups Projects
Commit 56775571 authored by zsxwing's avatar zsxwing Committed by Reynold Xin
Browse files

[SPARK-5124][Core] Move StopCoordinator to the receive method since it does not require a reply

Hotfix for #4588

cc rxin

Author: zsxwing <zsxwing@gmail.com>

Closes #5283 from zsxwing/hotfix and squashes the following commits:

cf3e5a7 [zsxwing] Move StopCoordinator to the receive method since it does not require a reply
parent b8ff2bc6
No related branches found
No related tags found
No related merge requests found
...@@ -156,14 +156,16 @@ private[spark] object OutputCommitCoordinator { ...@@ -156,14 +156,16 @@ private[spark] object OutputCommitCoordinator {
override val rpcEnv: RpcEnv, outputCommitCoordinator: OutputCommitCoordinator) override val rpcEnv: RpcEnv, outputCommitCoordinator: OutputCommitCoordinator)
extends RpcEndpoint with Logging { extends RpcEndpoint with Logging {
override def receive: PartialFunction[Any, Unit] = {
case StopCoordinator =>
logInfo("OutputCommitCoordinator stopped!")
stop()
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case AskPermissionToCommitOutput(stage, partition, taskAttempt) => case AskPermissionToCommitOutput(stage, partition, taskAttempt) =>
context.reply( context.reply(
outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt)) outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt))
case StopCoordinator =>
logInfo("OutputCommitCoordinator stopped!")
context.reply(true)
stop()
} }
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment