diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala index f748f394d1347921e72280b4e76c6ce5d6a6fb7a..17055e2f22d0d8bd1815d2c1d00736b2be830e28 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala @@ -156,14 +156,16 @@ private[spark] object OutputCommitCoordinator { override val rpcEnv: RpcEnv, outputCommitCoordinator: OutputCommitCoordinator) extends RpcEndpoint with Logging { + override def receive: PartialFunction[Any, Unit] = { + case StopCoordinator => + logInfo("OutputCommitCoordinator stopped!") + stop() + } + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case AskPermissionToCommitOutput(stage, partition, taskAttempt) => context.reply( outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt)) - case StopCoordinator => - logInfo("OutputCommitCoordinator stopped!") - context.reply(true) - stop() } } }