diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
index 37dfa7fec08317f0353941b433c54b7e0e158b0d..9f34d01e6db48ff451e171f4646e3f379ae31c7c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -19,9 +19,9 @@ package org.apache.spark.deploy
 
 private[spark] object ExecutorState extends Enumeration {
 
-  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
+  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
 
   type ExecutorState = Value
 
-  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
+  def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c6dec305bffcb6acdd1f7086e8732a8ea5d1fc71..33ffcbd216954d583e4e3c15c3042f67d0fab955 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -303,10 +303,11 @@ private[spark] class Master(
             appInfo.removeExecutor(exec)
             exec.worker.removeExecutor(exec)
 
+            val normalExit = exitStatus.exists(_ == 0)
             // Only retry certain number of times so we don't go into an infinite loop.
-            if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
+            if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
               schedule()
-            } else {
+            } else if (!normalExit) {
               logError("Application %s with ID %s failed %d times, removing it".format(
                 appInfo.desc.name, appInfo.id, appInfo.retryCount))
               removeApplication(appInfo, ApplicationState.FAILED)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index d09136de49807d51081dd455f0da5e7b0bace641..6433aac1c23e0a11e7a3005f6449b5c598365b80 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -154,11 +154,10 @@ private[spark] class ExecutorRunner(
       Files.write(header, stderr, Charsets.UTF_8)
       stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
 
-      // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
-      // long-lived processes only. However, in the future, we might restart the executor a few
-      // times on the same machine.
+      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
+      // or with nonzero exit code
       val exitCode = process.waitFor()
-      state = ExecutorState.FAILED
+      state = ExecutorState.EXITED
       val message = "Command exited with code " + exitCode
       worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
     } catch {