From 56775571cb938c819e5f7c3d49c5dd416ed034cb Mon Sep 17 00:00:00 2001
From: zsxwing <zsxwing@gmail.com>
Date: Mon, 30 Mar 2015 22:10:49 -0700
Subject: [PATCH] [SPARK-5124][Core] Move StopCoordinator to the receive method
 since it does not require a reply

Hotfix for #4588

cc rxin

Author: zsxwing <zsxwing@gmail.com>

Closes #5283 from zsxwing/hotfix and squashes the following commits:

cf3e5a7 [zsxwing] Move StopCoordinator to the receive method since it does not require a reply
---
 .../spark/scheduler/OutputCommitCoordinator.scala      | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index f748f394d1..17055e2f22 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -156,14 +156,16 @@ private[spark] object OutputCommitCoordinator {
       override val rpcEnv: RpcEnv, outputCommitCoordinator: OutputCommitCoordinator)
     extends RpcEndpoint with Logging {
 
+    override def receive: PartialFunction[Any, Unit] = {
+      case StopCoordinator =>
+        logInfo("OutputCommitCoordinator stopped!")
+        stop()
+    }
+
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
       case AskPermissionToCommitOutput(stage, partition, taskAttempt) =>
         context.reply(
           outputCommitCoordinator.handleAskPermissionToCommit(stage, partition, taskAttempt))
-      case StopCoordinator =>
-        logInfo("OutputCommitCoordinator stopped!")
-        context.reply(true)
-        stop()
     }
   }
 }
-- 
GitLab