From 88875d9413ec7d64a88d40857ffcf97b5853a7f2 Mon Sep 17 00:00:00 2001
From: jerryshao <sshao@hortonworks.com>
Date: Wed, 25 Nov 2015 11:42:53 -0800
Subject: [PATCH] [SPARK-10558][CORE] Fix wrong executor state in Master

`ExecutorAdded` can only be sent to `AppClient` after the worker has reported the executor state as `LOADING`; otherwise, because of a concurrency issue, `AppClient` may receive `ExecutorAdded` first and only then `ExecutorStateUpdated` with the `LOADING` state.

Also, the Master changes the executor state from `LAUNCHING` to `RUNNING` (when `AppClient` reports the state as `RUNNING`) and then back to `LOADING` (when the worker reports the state as `LOADING`), whereas the correct order should be `LAUNCHING` -> `LOADING` -> `RUNNING`.

The state is also shown incorrectly in the Master UI: the executor state should be `RUNNING` rather than `LOADING`:

![screen shot 2015-09-11 at 2 30 28 pm](https://cloud.githubusercontent.com/assets/850797/9809254/3155d840-5899-11e5-8cdf-ad06fef75762.png)
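For reference, the corrected lifecycle can be modeled roughly as below. This is a minimal illustrative sketch, not part of the patch: names such as `ExecutorStateDemo` and `DemoExecutor` are hypothetical. With `LOADING` removed, the worker reports executors directly as `RUNNING`, and the Master only accepts the `LAUNCHING` -> `RUNNING` transition, mirroring the assertion added in `Master.scala`.

```scala
// Illustrative sketch only: a minimal model of the executor state transition
// that the Master enforces after this change.
object ExecutorStateDemo {
  object ExecutorState extends Enumeration {
    // LOADING no longer exists; executors go straight from LAUNCHING
    // (set by the Master) to RUNNING (reported by the Worker).
    val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
    type ExecutorState = Value
    def isFinished(state: ExecutorState): Boolean =
      Seq(KILLED, FAILED, LOST, EXITED).contains(state)
  }

  // Hypothetical stand-in for the Master's per-executor bookkeeping.
  class DemoExecutor(val id: Int) {
    var state: ExecutorState.Value = ExecutorState.LAUNCHING

    // Mirrors the Master's ExecutorStateChanged handling: RUNNING is only
    // legal if the previous state was LAUNCHING.
    def updateState(newState: ExecutorState.Value): Unit = {
      val oldState = state
      state = newState
      if (newState == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $id state transfer from $oldState to RUNNING is illegal")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val exec = new DemoExecutor(0)
    exec.updateState(ExecutorState.RUNNING) // LAUNCHING -> RUNNING: legal
    println(s"executor ${exec.id} is now ${exec.state}")
  }
}
```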

Author: jerryshao <sshao@hortonworks.com>

Closes #8714 from jerryshao/SPARK-10558.
---
 .../org/apache/spark/deploy/ExecutorState.scala    |  2 +-
 .../org/apache/spark/deploy/client/AppClient.scala |  3 ---
 .../org/apache/spark/deploy/master/Master.scala    | 14 +++++++++++---
 .../org/apache/spark/deploy/worker/Worker.scala    |  2 +-
 4 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
index efa88c62e1..69c98e2893 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy
 
 private[deploy] object ExecutorState extends Enumeration {
 
-  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
+  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
 
   type ExecutorState = Value
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index afab362e21..df6ba7d669 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -178,9 +178,6 @@ private[spark] class AppClient(
         val fullId = appId + "/" + id
         logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
           cores))
-        // FIXME if changing master and `ExecutorAdded` happen at the same time (the order is not
-        // guaranteed), `ExecutorStateChanged` may be sent to a dead master.
-        sendToMaster(ExecutorStateChanged(appId.get, id, ExecutorState.RUNNING, None, None))
         listener.executorAdded(fullId, workerId, hostPort, cores, memory)
 
       case ExecutorUpdated(id, state, message, exitStatus) =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index b25a487806..9952c97dbd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -253,9 +253,17 @@ private[deploy] class Master(
       execOption match {
         case Some(exec) => {
           val appInfo = idToApp(appId)
+          val oldState = exec.state
           exec.state = state
-          if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
+
+          if (state == ExecutorState.RUNNING) {
+            assert(oldState == ExecutorState.LAUNCHING,
+              s"executor $execId state transfer from $oldState to RUNNING is illegal")
+            appInfo.resetRetryCount()
+          }
+
           exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))
+
           if (ExecutorState.isFinished(state)) {
             // Remove this executor from the worker and app
             logInfo(s"Removing executor ${exec.fullId} because it is $state")
@@ -702,8 +710,8 @@ private[deploy] class Master(
     worker.addExecutor(exec)
     worker.endpoint.send(LaunchExecutor(masterUrl,
       exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
-    exec.application.driver.send(ExecutorAdded(
-      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
+    exec.application.driver.send(
+      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
   }
 
   private def registerWorker(worker: WorkerInfo): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index a45867e768..418faf8fc9 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -469,7 +469,7 @@ private[deploy] class Worker(
             executorDir,
             workerUri,
             conf,
-            appLocalDirs, ExecutorState.LOADING)
+            appLocalDirs, ExecutorState.RUNNING)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_
-- 
GitLab