diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index 83ccaadfe7447fe9e40cccaa4724ee494d688c4a..5bb62d37d6374ee9d20ece9645136305d3fb1b91 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -75,6 +75,8 @@ class LocalSparkCluster( // Stop the workers before the master so they don't get upset that it disconnected workerRpcEnvs.foreach(_.shutdown()) masterRpcEnvs.foreach(_.shutdown()) + workerRpcEnvs.foreach(_.awaitTermination()) + masterRpcEnvs.foreach(_.awaitTermination()) masterRpcEnvs.clear() workerRpcEnvs.clear() } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 04b20e0d6ab9cf0b728cc5033a1f090407a191cf..1355e1ad1b5233831af2040bc4e511dce4d8240b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -257,9 +257,8 @@ private[deploy] class Master( exec.state = state if (state == ExecutorState.RUNNING) { - if (oldState != ExecutorState.LAUNCHING) { - logWarning(s"Executor $execId state transfer from $oldState to RUNNING is unexpected") - } + assert(oldState == ExecutorState.LAUNCHING, + s"executor $execId state transfer from $oldState to RUNNING is illegal") appInfo.resetRetryCount() } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 25a17473e4b53d638e3edde3b106cf056903d0e2..9a42487bb37aa4b72f14bb98f702b2dabad74065 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -71,6 +71,11 @@ private[deploy] class ExecutorRunner( workerThread.start() // Shutdown hook that kills actors on shutdown. shutdownHook = ShutdownHookManager.addShutdownHook { () => + // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will + // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`. + if (state == ExecutorState.RUNNING) { + state = ExecutorState.FAILED + } killProcess(Some("Worker shutting down")) } }