diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 9335c5f4160bf54fa7c3a60fc967f83a73a9b41e..18278b292ff5a7960929e357d89fc8ee2ff39939 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -53,7 +53,13 @@ sealed trait TaskFailedReason extends TaskEndReason {
   /** Error message displayed in the web UI. */
   def toErrorString: String
 
-  def shouldEventuallyFailJob: Boolean = true
+  /**
+   * Whether this task failure should be counted towards the maximum number of times the task is
+   * allowed to fail before the stage is aborted.  Set to false in cases where the task's failure
+   * was unrelated to the task; for example, if the task failed because the executor it was running
+   * on was killed.
+   */
+  def countTowardsTaskFailures: Boolean = true
 }
 
 /**
@@ -208,7 +214,7 @@ case class TaskCommitDenied(
    * towards failing the stage. This is intended to prevent spurious stage failures in cases
    * where many speculative tasks are launched and denied to commit.
    */
-  override def shouldEventuallyFailJob: Boolean = false
+  override def countTowardsTaskFailures: Boolean = false
 }
 
 /**
@@ -217,14 +223,18 @@ case class TaskCommitDenied(
  * the task crashed the JVM.
  */
 @DeveloperApi
-case class ExecutorLostFailure(execId: String, isNormalExit: Boolean = false)
+case class ExecutorLostFailure(execId: String, exitCausedByApp: Boolean = true)
   extends TaskFailedReason {
   override def toErrorString: String = {
-    val exitBehavior = if (isNormalExit) "normally" else "abnormally"
-    s"ExecutorLostFailure (executor ${execId} exited ${exitBehavior})"
+    val exitBehavior = if (exitCausedByApp) {
+      "caused by one of the running tasks"
+    } else {
+      "unrelated to the running tasks"
+    }
+    s"ExecutorLostFailure (executor ${execId} exited due to an issue ${exitBehavior})"
   }
 
-  override def shouldEventuallyFailJob: Boolean = !isNormalExit
+  override def countTowardsTaskFailures: Boolean = exitCausedByApp
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 0a98c69b89ea58b4d3acd6c59f2bb1f8b7ec84e3..33edf25043850ab7f7fdedeb30245ff40d92d2b1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -28,12 +28,15 @@ class ExecutorLossReason(val message: String) extends Serializable {
 }
 
 private[spark]
-case class ExecutorExited(exitCode: Int, isNormalExit: Boolean, reason: String)
+case class ExecutorExited(exitCode: Int, exitCausedByApp: Boolean, reason: String)
   extends ExecutorLossReason(reason)
 
 private[spark] object ExecutorExited {
-  def apply(exitCode: Int, isNormalExit: Boolean): ExecutorExited = {
-    ExecutorExited(exitCode, isNormalExit, ExecutorExitCode.explainExitCode(exitCode))
+  def apply(exitCode: Int, exitCausedByApp: Boolean): ExecutorExited = {
+    ExecutorExited(
+      exitCode,
+      exitCausedByApp,
+      ExecutorExitCode.explainExitCode(exitCode))
   }
 }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 987800d3d1f1e53738a4af6278da0a754500b50f..9b3fad9012abca50dfa19a6c7a50168202f1d61f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -704,9 +704,10 @@ private[spark] class TaskSetManager(
         }
         ef.exception
 
-      case e: ExecutorLostFailure if e.isNormalExit =>
+      case e: ExecutorLostFailure if !e.exitCausedByApp =>
         logInfo(s"Task $tid failed because while it was being computed, its executor" +
-          s" exited normally. Not marking the task as failed.")
+          "exited for a reason unrelated to the task. Not counting this failure towards the " +
+          "maximum number of failures for the task.")
         None
 
       case e: TaskFailedReason =>  // TaskResultLost, TaskKilled, and others
@@ -724,7 +725,7 @@ private[spark] class TaskSetManager(
     addPendingTask(index)
     if (!isZombie && state != TaskState.KILLED
         && reason.isInstanceOf[TaskFailedReason]
-        && reason.asInstanceOf[TaskFailedReason].shouldEventuallyFailJob) {
+        && reason.asInstanceOf[TaskFailedReason].countTowardsTaskFailures) {
       assert (null != failureReason)
       numFailures(index) += 1
       if (numFailures(index) >= maxTaskFailures) {
@@ -797,11 +798,12 @@ private[spark] class TaskSetManager(
       }
     }
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
-      val isNormalExit: Boolean = reason match {
-        case exited: ExecutorExited => exited.isNormalExit
-        case _ => false
+      val exitCausedByApp: Boolean = reason match {
+        case exited: ExecutorExited => exited.exitCausedByApp
+        case _ => true
       }
-      handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, isNormalExit))
+      handleFailedTask(
+        tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp))
     }
     // recalculate valid locality levels and waits when executor is lost
     recomputeLocality()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 4652df32efa74ba1cd0fa714a1be8e49c302e835..8103efa7302e7f8270c3505bb9cf61331e9fc57b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -98,7 +98,7 @@ private[spark] object CoarseGrainedClusterMessages {
       hostToLocalTaskCount: Map[String, Int])
     extends CoarseGrainedClusterMessage
 
-  // Check if an executor was force-killed but for a normal reason.
+  // Check if an executor was force-killed but for a reason unrelated to the running tasks.
   // This could be the case if the executor is preempted, for instance.
   case class GetExecutorLossReason(executorId: String) extends CoarseGrainedClusterMessage
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index a4214c496166da368e400b9249b9fa118c71e9cf..05d9bc92f228b6f2fe5154ee3f330fdbf5fe2409 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -137,7 +137,7 @@ private[spark] class SparkDeploySchedulerBackend(
 
   override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
     val reason: ExecutorLossReason = exitStatus match {
-      case Some(code) => ExecutorExited(code, isNormalExit = false, message)
+      case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
       case None => SlaveLost(message)
     }
     logInfo("Executor %s removed: %s".format(fullId, message))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 38218b9c08fd863cbca8d40ce766d4bd590866a0..e483688edef5f68f75b4b57cf5eade2bc7c39e5f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -111,10 +111,10 @@ private[spark] abstract class YarnSchedulerBackend(
      * immediately.
      *
      * In YARN's case however it is crucial to talk to the application master and ask why the
-     * executor had exited. In particular, the executor may have exited due to the executor
-     * having been preempted. If the executor "exited normally" according to the application
-     * master then we pass that information down to the TaskSetManager to inform the
-     * TaskSetManager that tasks on that lost executor should not count towards a job failure.
+     * executor had exited. If the executor exited for some reason unrelated to the running tasks
+     * (e.g., preemption), according to the application master, then we pass that information down
+     * to the TaskSetManager to inform the TaskSetManager that tasks on that lost executor should
+     * not count towards a job failure.
      *
      * TODO there's a race condition where while we are querying the ApplicationMaster for
      * the executor loss reason, there is the potential that tasks will be scheduled on
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 6196176c7cc33a09682d99b9aa9be31c353ce0dc..aaffac604a88560146b2915f15b2db4e1c9669fd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -394,7 +394,7 @@ private[spark] class MesosSchedulerBackend(
                             slaveId: SlaveID, status: Int) {
     logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
                                                                  slaveId.getValue))
-    recordSlaveLost(d, slaveId, ExecutorExited(status, isNormalExit = false))
+    recordSlaveLost(d, slaveId, ExecutorExited(status, exitCausedByApp = true))
   }
 
   override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a06dc6f709d334823a254ab2b1e95893e43e1dd7..ad6615c1124d0f6431b9ae152ab80b018ed95bed 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -367,9 +367,9 @@ private[spark] object JsonProtocol {
         ("Job ID" -> taskCommitDenied.jobID) ~
         ("Partition ID" -> taskCommitDenied.partitionID) ~
         ("Attempt Number" -> taskCommitDenied.attemptNumber)
-      case ExecutorLostFailure(executorId, isNormalExit) =>
+      case ExecutorLostFailure(executorId, exitCausedByApp) =>
         ("Executor ID" -> executorId) ~
-        ("Normal Exit" -> isNormalExit)
+        ("Exit Caused By App" -> exitCausedByApp)
       case _ => Utils.emptyJson
     }
     ("Reason" -> reason) ~ json
@@ -810,10 +810,9 @@ private[spark] object JsonProtocol {
         val attemptNo = Utils.jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1)
         TaskCommitDenied(jobId, partitionId, attemptNo)
       case `executorLostFailure` =>
-        val isNormalExit = Utils.jsonOption(json \ "Normal Exit").
-          map(_.extract[Boolean])
+        val exitCausedByApp = Utils.jsonOption(json \ "Exit Caused By App").map(_.extract[Boolean])
         val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
-        ExecutorLostFailure(executorId.getOrElse("Unknown"), isNormalExit.getOrElse(false))
+        ExecutorLostFailure(executorId.getOrElse("Unknown"), exitCausedByApp.getOrElse(true))
       case `unknownReason` => UnknownReason
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index cd6bf723e70cbd8429c0138d9e92a59907c281ad..ecc18fc6e15b4da6251d9647e03a8e0147b95805 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -511,7 +511,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
   }
 
-  test("Executors are added but exit normally while running tasks") {
+  test("Executors exit for reason unrelated to currently running tasks") {
     sc = new SparkContext("local", "test")
     val sched = new FakeTaskScheduler(sc)
     val taskSet = FakeTask.createTaskSet(4,
@@ -526,11 +526,15 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     manager.executorAdded()
     assert(manager.resourceOffer("exec1", "host1", ANY).isDefined)
     sched.removeExecutor("execA")
-    manager.executorLost("execA", "host1", ExecutorExited(143, true, "Normal termination"))
+    manager.executorLost(
+      "execA",
+      "host1",
+      ExecutorExited(143, false, "Terminated for reason unrelated to running tasks"))
     assert(!sched.taskSetsFailed.contains(taskSet.id))
     assert(manager.resourceOffer("execC", "host2", ANY).isDefined)
     sched.removeExecutor("execC")
-    manager.executorLost("execC", "host2", ExecutorExited(1, false, "Abnormal termination"))
+    manager.executorLost(
+      "execC", "host2", ExecutorExited(1, true, "Terminated due to issue with running tasks"))
     assert(sched.taskSetsFailed.contains(taskSet.id))
   }
 
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index f9572921f43cb1f26ff09d7d0a586fe96f2bd948..86137f259c13d5fe90c4e1c3ba1121e9d59084d9 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -603,10 +603,10 @@ class JsonProtocolSuite extends SparkFunSuite {
         assert(jobId1 === jobId2)
         assert(partitionId1 === partitionId2)
         assert(attemptNumber1 === attemptNumber2)
-      case (ExecutorLostFailure(execId1, isNormalExit1),
-          ExecutorLostFailure(execId2, isNormalExit2)) =>
+      case (ExecutorLostFailure(execId1, exit1CausedByApp),
+          ExecutorLostFailure(execId2, exit2CausedByApp)) =>
         assert(execId1 === execId2)
-        assert(isNormalExit1 === isNormalExit2)
+        assert(exit1CausedByApp === exit2CausedByApp)
       case (UnknownReason, UnknownReason) =>
       case _ => fail("Task end reasons don't match in types!")
     }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 1deaa3743ddfa2de621a72bc219ce3bc21159e97..875bbd4e4e3d58c2defa4ca9695062191ea02dd2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -445,40 +445,41 @@ private[yarn] class YarnAllocator(
         // there are some exit status' we shouldn't necessarily count against us, but for
         // now I think its ok as none of the containers are expected to exit.
         val exitStatus = completedContainer.getExitStatus
-        val (isNormalExit, containerExitReason) = exitStatus match {
+        val (exitCausedByApp, containerExitReason) = exitStatus match {
           case ContainerExitStatus.SUCCESS =>
-            (true, s"Executor for container $containerId exited normally.")
+            (false, s"Executor for container $containerId exited because of a YARN event (e.g., " +
+              "pre-emption) and not because of an error in the running job.")
           case ContainerExitStatus.PREEMPTED =>
-            // Preemption should count as a normal exit, since YARN preempts containers merely
-            // to do resource sharing, and tasks that fail due to preempted executors could
+            // Preemption is not the fault of the running tasks, since YARN preempts containers
+            // merely to do resource sharing, and tasks that fail due to preempted executors could
             // just as easily finish on any other executor. See SPARK-8167.
-            (true, s"Container ${containerId}${onHostStr} was preempted.")
+            (false, s"Container ${containerId}${onHostStr} was preempted.")
           // Should probably still count memory exceeded exit codes towards task failures
           case VMEM_EXCEEDED_EXIT_CODE =>
-            (false, memLimitExceededLogMessage(
+            (true, memLimitExceededLogMessage(
               completedContainer.getDiagnostics,
               VMEM_EXCEEDED_PATTERN))
           case PMEM_EXCEEDED_EXIT_CODE =>
-            (false, memLimitExceededLogMessage(
+            (true, memLimitExceededLogMessage(
               completedContainer.getDiagnostics,
               PMEM_EXCEEDED_PATTERN))
           case unknown =>
             numExecutorsFailed += 1
-            (false, "Container marked as failed: " + containerId + onHostStr +
+            (true, "Container marked as failed: " + containerId + onHostStr +
               ". Exit status: " + completedContainer.getExitStatus +
               ". Diagnostics: " + completedContainer.getDiagnostics)
 
         }
-        if (isNormalExit) {
-          logInfo(containerExitReason)
-        } else {
+        if (exitCausedByApp) {
           logWarning(containerExitReason)
+        } else {
+          logInfo(containerExitReason)
         }
-        ExecutorExited(0, isNormalExit, containerExitReason)
+        ExecutorExited(0, exitCausedByApp, containerExitReason)
       } else {
         // If we have already released this container, then it must mean
         // that the driver has explicitly requested it to be killed
-        ExecutorExited(completedContainer.getExitStatus, isNormalExit = true,
+        ExecutorExited(completedContainer.getExitStatus, exitCausedByApp = false,
           s"Container $containerId exited from explicit termination request.")
       }