Skip to content
Snippets Groups Projects
Commit 88875d94 authored by jerryshao's avatar jerryshao Committed by Andrew Or
Browse files

[SPARK-10558][CORE] Fix wrong executor state in Master

`ExecutorAdded` can only be sent to `AppClient` when worker report back the executor state as `LOADING`, otherwise because of concurrency issue, `AppClient` will possibly receive `ExectuorAdded` at first, then `ExecutorStateUpdated` with `LOADING` state.

Also Master will change the executor state from `LAUNCHING` to `RUNNING` (`AppClient` report back the state as `RUNNING`), then to `LOADING` (worker report back to state as `LOADING`), it should be `LAUNCHING` -> `LOADING` -> `RUNNING`.

Also it is wrongly shown in master UI, the state of executor should be `RUNNING` rather than `LOADING`:

![screen shot 2015-09-11 at 2 30 28 pm](https://cloud.githubusercontent.com/assets/850797/9809254/3155d840-5899-11e5-8cdf-ad06fef75762.png)

Author: jerryshao <sshao@hortonworks.com>

Closes #8714 from jerryshao/SPARK-10558.
parent 9f3e59a1
No related branches found
No related tags found
No related merge requests found
......@@ -19,7 +19,7 @@ package org.apache.spark.deploy
private[deploy] object ExecutorState extends Enumeration {
val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
type ExecutorState = Value
......
......@@ -178,9 +178,6 @@ private[spark] class AppClient(
val fullId = appId + "/" + id
logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
cores))
// FIXME if changing master and `ExecutorAdded` happen at the same time (the order is not
// guaranteed), `ExecutorStateChanged` may be sent to a dead master.
sendToMaster(ExecutorStateChanged(appId.get, id, ExecutorState.RUNNING, None, None))
listener.executorAdded(fullId, workerId, hostPort, cores, memory)
case ExecutorUpdated(id, state, message, exitStatus) =>
......
......@@ -253,9 +253,17 @@ private[deploy] class Master(
execOption match {
case Some(exec) => {
val appInfo = idToApp(appId)
val oldState = exec.state
exec.state = state
if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
if (state == ExecutorState.RUNNING) {
assert(oldState == ExecutorState.LAUNCHING,
s"executor $execId state transfer from $oldState to RUNNING is illegal")
appInfo.resetRetryCount()
}
exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))
if (ExecutorState.isFinished(state)) {
// Remove this executor from the worker and app
logInfo(s"Removing executor ${exec.fullId} because it is $state")
......@@ -702,8 +710,8 @@ private[deploy] class Master(
worker.addExecutor(exec)
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
exec.application.driver.send(ExecutorAdded(
exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
private def registerWorker(worker: WorkerInfo): Boolean = {
......
......@@ -469,7 +469,7 @@ private[deploy] class Worker(
executorDir,
workerUri,
conf,
appLocalDirs, ExecutorState.LOADING)
appLocalDirs, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment