From 88504b75ee610e14d7ceed8b038fa698a3d14f81 Mon Sep 17 00:00:00 2001
From: zsxwing <zsxwing@gmail.com>
Date: Fri, 3 Apr 2015 11:44:27 -0700
Subject: [PATCH] [SPARK-6640][Core] Fix the race condition of creating
 HeartbeatReceiver and retrieving HeartbeatReceiver

This PR moved the code of creating `HeartbeatReceiver` above the code of creating `schedulerBackend` to resolve the race condition.

Author: zsxwing <zsxwing@gmail.com>

Closes #5306 from zsxwing/SPARK-6640 and squashes the following commits:

840399d [zsxwing] Don't send TaskScheduler through Akka
a90616a [zsxwing] Fix docs
dd202c7 [zsxwing] Fix typo
d7c250d [zsxwing] Fix the race condition of creating HeartbeatReceiver and retrieving HeartbeatReceiver
---
 .../org/apache/spark/HeartbeatReceiver.scala  | 32 +++++++++++++++----
 .../scala/org/apache/spark/SparkContext.scala | 10 ++++--
 2 files changed, 33 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 8435e1ea26..9f8ad03b91 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -37,6 +37,12 @@ private[spark] case class Heartbeat(
     taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
     blockManagerId: BlockManagerId)
 
+/**
+ * An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
+ * created.
+ */
+private[spark] case object TaskSchedulerIsSet
+
 private[spark] case object ExpireDeadHosts 
     
 private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
@@ -44,9 +50,11 @@ private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
 /**
  * Lives in the driver to receive heartbeats from executors..
  */
-private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
+private[spark] class HeartbeatReceiver(sc: SparkContext)
   extends Actor with ActorLogReceive with Logging {
 
+  private var scheduler: TaskScheduler = null
+
   // executor ID -> timestamp of when the last heartbeat from this executor was received
   private val executorLastSeen = new mutable.HashMap[String, Long]
 
@@ -71,12 +79,22 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
   }
   
   override def receiveWithLogging: PartialFunction[Any, Unit] = {
-    case Heartbeat(executorId, taskMetrics, blockManagerId) =>
-      val unknownExecutor = !scheduler.executorHeartbeatReceived(
-        executorId, taskMetrics, blockManagerId)
-      val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
-      executorLastSeen(executorId) = System.currentTimeMillis()
-      sender ! response
+    case TaskSchedulerIsSet =>
+      scheduler = sc.taskScheduler
+    case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
+      if (scheduler != null) {
+        val unknownExecutor = !scheduler.executorHeartbeatReceived(
+          executorId, taskMetrics, blockManagerId)
+        val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+        executorLastSeen(executorId) = System.currentTimeMillis()
+        sender ! response
+      } else {
+        // Because Executor will sleep several seconds before sending the first "Heartbeat", this
+        // case rarely happens. However, if it really happens, log it and ask the executor to
+        // register itself again.
+        logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
+        sender ! HeartbeatResponse(reregisterBlockManager = true)
+      }
     case ExpireDeadHosts =>
       expireDeadHosts()
   }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index abf81e312d..fd1838976e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -356,11 +356,17 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   val sparkUser = Utils.getCurrentUserName()
   executorEnvs("SPARK_USER") = sparkUser
 
+  // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
+  // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
+  private val heartbeatReceiver = env.actorSystem.actorOf(
+    Props(new HeartbeatReceiver(this)), "HeartbeatReceiver")
+
   // Create and start the scheduler
   private[spark] var (schedulerBackend, taskScheduler) =
     SparkContext.createTaskScheduler(this, master)
-  private val heartbeatReceiver = env.actorSystem.actorOf(
-    Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
+
+  heartbeatReceiver ! TaskSchedulerIsSet
+
   @volatile private[spark] var dagScheduler: DAGScheduler = _
   try {
     dagScheduler = new DAGScheduler(this)
-- 
GitLab