diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
index efa88c62e1f5d2b0cd75940b88b4c0298c90ca07..69c98e28931d74023961c5ed7e28b32a7726e894 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy
 
 private[deploy] object ExecutorState extends Enumeration {
 
-  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
+  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
 
   type ExecutorState = Value
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index afab362e213b5b9501ab6e91d6f03426edb09338..df6ba7d669ce9952450c497c49d7594cc24acfc3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -178,9 +178,6 @@ private[spark] class AppClient(
         val fullId = appId + "/" + id
         logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
           cores))
-        // FIXME if changing master and `ExecutorAdded` happen at the same time (the order is not
-        // guaranteed), `ExecutorStateChanged` may be sent to a dead master.
-        sendToMaster(ExecutorStateChanged(appId.get, id, ExecutorState.RUNNING, None, None))
         listener.executorAdded(fullId, workerId, hostPort, cores, memory)
 
       case ExecutorUpdated(id, state, message, exitStatus) =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index b25a487806c7fbbfd55c7d4e24938f3301f19694..9952c97dbdffcd63aa391b06beaaaadef8711a5d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -253,9 +253,17 @@ private[deploy] class Master(
       execOption match {
         case Some(exec) => {
           val appInfo = idToApp(appId)
+          val oldState = exec.state
           exec.state = state
-          if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
+
+          if (state == ExecutorState.RUNNING) {
+            assert(oldState == ExecutorState.LAUNCHING,
+              s"executor $execId state transfer from $oldState to RUNNING is illegal")
+            appInfo.resetRetryCount()
+          }
+
           exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))
+
           if (ExecutorState.isFinished(state)) {
             // Remove this executor from the worker and app
             logInfo(s"Removing executor ${exec.fullId} because it is $state")
@@ -702,8 +710,8 @@ private[deploy] class Master(
     worker.addExecutor(exec)
     worker.endpoint.send(LaunchExecutor(masterUrl,
       exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
-    exec.application.driver.send(ExecutorAdded(
-      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
+    exec.application.driver.send(
+      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
   }
 
   private def registerWorker(worker: WorkerInfo): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index a45867e7680ec7f0ccd61e805480d2160c3cb1f4..418faf8fc967fdc15663b8bfa52fa4e7cf78e9a9 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -469,7 +469,7 @@ private[deploy] class Worker(
             executorDir,
             workerUri,
             conf,
-            appLocalDirs, ExecutorState.LOADING)
+            appLocalDirs, ExecutorState.RUNNING)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_