diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 934d00dc708b9c1494597ae714e92c71416644b7..2ae878b3e60871d61c880cba8a7d6c04a85c21ac 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -48,6 +48,10 @@ case object Success extends TaskEndReason
 sealed trait TaskFailedReason extends TaskEndReason {
   /** Error message displayed in the web UI. */
   def toErrorString: String
+
+  /** Whether this failure should count towards the task-failure limit that eventually
+   *  fails the job; false when the failure is unrelated to the task (e.g. a normal exit). */
+  def shouldEventuallyFailJob: Boolean = true
 }
 
 /**
@@ -194,6 +198,12 @@ case object TaskKilled extends TaskFailedReason {
 case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extends TaskFailedReason {
   override def toErrorString: String = s"TaskCommitDenied (Driver denied task commit)" +
     s" for job: $jobID, partition: $partitionID, attempt: $attemptID"
+  /**
+   * If a task failed because its attempt to commit was denied, do not count this failure
+   * towards failing the stage. This is intended to prevent spurious stage failures when
+   * many speculative tasks are launched and all but one are denied permission to commit.
+   */
+  override def shouldEventuallyFailJob: Boolean = false
 }
 
 /**
@@ -202,8 +212,14 @@ case class TaskCommitDenied(jobID: Int, partitionID: Int, attemptID: Int) extend
  * the task crashed the JVM.
  */
 @DeveloperApi
-case class ExecutorLostFailure(execId: String) extends TaskFailedReason {
-  override def toErrorString: String = s"ExecutorLostFailure (executor ${execId} lost)"
+case class ExecutorLostFailure(execId: String, isNormalExit: Boolean = false)
+  extends TaskFailedReason {
+  override def toErrorString: String = {
+    val exitBehavior = if (isNormalExit) "normally" else "abnormally"
+    s"ExecutorLostFailure (executor ${execId} exited ${exitBehavior})"
+  }
+
+  override def shouldEventuallyFailJob: Boolean = !isNormalExit
 }
 
 /**
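Note (a sketch, not part of the patch): the override pattern introduced above lets individual failure reasons opt out of failure counting. The stand-in names below (`CommitDenied`, `LostExecutor`) are illustrative only; the real classes are in this file and the real consumer is TaskSetManager, further down in this patch.

```scala
// Sketch with stand-in types: overriding shouldEventuallyFailJob opts a failure
// out of the count that eventually aborts the stage/job.
trait TaskFailedReason {
  def toErrorString: String
  def shouldEventuallyFailJob: Boolean = true
}

case class CommitDenied(attemptID: Int) extends TaskFailedReason {
  override def toErrorString: String = s"commit denied for attempt $attemptID"
  // Denied commits are expected when speculative attempts race; don't count them.
  override def shouldEventuallyFailJob: Boolean = false
}

case class LostExecutor(execId: String, isNormalExit: Boolean) extends TaskFailedReason {
  override def toErrorString: String = s"executor $execId lost"
  // A normal exit (e.g. preemption) says nothing about the task itself.
  override def shouldEventuallyFailJob: Boolean = !isNormalExit
}
```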
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 2bc43a91864491ff8f06b8ef2a04ed51131734b8..0a98c69b89ea58b4d3acd6c59f2bb1f8b7ec84e3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -23,16 +23,20 @@ import org.apache.spark.executor.ExecutorExitCode
  * Represents an explanation for an executor or whole slave failing or exiting.
  */
 private[spark]
-class ExecutorLossReason(val message: String) {
+class ExecutorLossReason(val message: String) extends Serializable {
   override def toString: String = message
 }
 
 private[spark]
-case class ExecutorExited(val exitCode: Int)
-  extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) {
+case class ExecutorExited(exitCode: Int, isNormalExit: Boolean, reason: String)
+  extends ExecutorLossReason(reason)
+
+private[spark] object ExecutorExited {
+  def apply(exitCode: Int, isNormalExit: Boolean): ExecutorExited = {
+    ExecutorExited(exitCode, isNormalExit, ExecutorExitCode.explainExitCode(exitCode))
+  }
 }
 
 private[spark]
 case class SlaveLost(_message: String = "Slave lost")
-  extends ExecutorLossReason(_message) {
-}
+  extends ExecutorLossReason(_message)
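For clarity, the two construction paths: the three-argument case class carries an explicit reason string, while the companion `apply` keeps the old behavior of deriving one from the exit code. A worksheet-style usage sketch with local stand-ins (`ExitCodes.explain` stands in for `ExecutorExitCode.explainExitCode`):

```scala
// Local stand-ins, illustrative only.
object ExitCodes {
  def explain(code: Int): String =
    if (code == 0) "Executor exited cleanly" else s"Executor exited with code $code"
}

class LossReason(val message: String) extends Serializable {
  override def toString: String = message
}
case class Exited(exitCode: Int, isNormalExit: Boolean, reason: String)
  extends LossReason(reason)

object Exited {
  // Mirrors the companion apply above: derive the message when none is supplied.
  def apply(exitCode: Int, isNormalExit: Boolean): Exited =
    Exited(exitCode, isNormalExit, ExitCodes.explain(exitCode))
}

// Caller-supplied message (as YarnAllocator does below) vs. a derived one (as Mesos does):
val preempted = Exited(137, isNormalExit = true, "Container preempted by the resource manager")
val crashed = Exited(1, isNormalExit = false)
```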
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 5821afea9898256f5d307207b12682a24d0754c1..551e39a81b6954652ad2e897a8dd53545285028f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -83,8 +83,8 @@ private[spark] class Pool(
     null
   }
 
-  override def executorLost(executorId: String, host: String) {
-    schedulableQueue.asScala.foreach(_.executorLost(executorId, host))
+  override def executorLost(executorId: String, host: String, reason: ExecutorLossReason) {
+    schedulableQueue.asScala.foreach(_.executorLost(executorId, host, reason))
   }
 
   override def checkSpeculatableTasks(): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
index a87ef030e69c24a5dcf5ebead93b1c5a3d5490c4..ab00bc8f0bf4e5659ad2bb0e206373408a62037f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -42,7 +42,7 @@ private[spark] trait Schedulable {
   def addSchedulable(schedulable: Schedulable): Unit
   def removeSchedulable(schedulable: Schedulable): Unit
   def getSchedulableByName(name: String): Schedulable
-  def executorLost(executorId: String, host: String): Unit
+  def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
   def checkSpeculatableTasks(): Boolean
   def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 1705e7f962de2a554cf466332653740d88e9a41c..1c7bfe89c02ac4a2b9bfc96a47cc66a5b3ee1f17 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -332,7 +332,8 @@ private[spark] class TaskSchedulerImpl(
           // We lost this entire executor, so remember that it's gone
           val execId = taskIdToExecutorId(tid)
           if (activeExecutorIds.contains(execId)) {
-            removeExecutor(execId)
+            removeExecutor(execId,
+              SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
             failedExecutor = Some(execId)
           }
         }
@@ -464,7 +465,7 @@ private[spark] class TaskSchedulerImpl(
       if (activeExecutorIds.contains(executorId)) {
         val hostPort = executorIdToHost(executorId)
         logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
-        removeExecutor(executorId)
+        removeExecutor(executorId, reason)
         failedExecutor = Some(executorId)
       } else {
          // We may get multiple executorLost() calls with different loss reasons. For example, one
@@ -482,7 +483,7 @@ private[spark] class TaskSchedulerImpl(
   }
 
   /** Remove an executor from all our data structures and mark it as lost */
-  private def removeExecutor(executorId: String) {
+  private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
     activeExecutorIds -= executorId
     val host = executorIdToHost(executorId)
     val execs = executorsByHost.getOrElse(host, new HashSet)
@@ -497,7 +498,7 @@ private[spark] class TaskSchedulerImpl(
       }
     }
     executorIdToHost -= executorId
-    rootPool.executorLost(executorId, host)
+    rootPool.executorLost(executorId, host, reason)
   }
 
   def executorAdded(execId: String, host: String) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 818b95d67f6beb679a377483bd9f51871543da1e..62af9031b9f8b2d820bf7f0ec93cfccad637f31f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -709,6 +709,11 @@ private[spark] class TaskSetManager(
         }
         ef.exception
 
+      case e: ExecutorLostFailure if e.isNormalExit =>
+        logInfo(s"Task $tid failed because while it was being computed, its executor" +
+          s" exited normally. Not marking the task as failed.")
+        None
+
       case e: TaskFailedReason =>  // TaskResultLost, TaskKilled, and others
         logWarning(failureReason)
         None
@@ -722,10 +727,9 @@ private[spark] class TaskSetManager(
       put(info.executorId, clock.getTimeMillis())
     sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
     addPendingTask(index)
-    if (!isZombie && state != TaskState.KILLED && !reason.isInstanceOf[TaskCommitDenied]) {
-      // If a task failed because its attempt to commit was denied, do not count this failure
-      // towards failing the stage. This is intended to prevent spurious stage failures in cases
-      // where many speculative tasks are launched and denied to commit.
+    if (!isZombie && state != TaskState.KILLED
+        && reason.isInstanceOf[TaskFailedReason]
+        && reason.asInstanceOf[TaskFailedReason].shouldEventuallyFailJob) {
       assert (null != failureReason)
       numFailures(index) += 1
       if (numFailures(index) >= maxTaskFailures) {
@@ -778,7 +782,7 @@ private[spark] class TaskSetManager(
   }
 
   /** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */
-  override def executorLost(execId: String, host: String) {
+  override def executorLost(execId: String, host: String, reason: ExecutorLossReason) {
     logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
 
     // Re-enqueue pending tasks for this host based on the status of the cluster. Note
@@ -809,9 +813,12 @@ private[spark] class TaskSetManager(
         }
       }
     }
-    // Also re-enqueue any tasks that were running on the node
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
-      handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(execId))
+      val isNormalExit: Boolean = reason match {
+        case exited: ExecutorExited => exited.isNormalExit
+        case _ => false
+      }
+      handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, isNormalExit))
     }
     // recalculate valid locality levels and waits when executor is lost
     recomputeLocality()
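The isInstanceOf/asInstanceOf pair in the condition above works, but a pattern match expresses the same check more idiomatically; a standalone sketch with simplified stand-in types:

```scala
// Standalone sketch of the failure-counting condition in handleFailedTask.
object TaskState extends Enumeration { val RUNNING, KILLED, FAILED = Value }
trait TaskFailedReason { def shouldEventuallyFailJob: Boolean = true }

def shouldCountFailure(isZombie: Boolean, state: TaskState.Value, reason: Any): Boolean =
  !isZombie && state != TaskState.KILLED && (reason match {
    // Matches the isInstanceOf/asInstanceOf pair in the diff above.
    case failed: TaskFailedReason => failed.shouldEventuallyFailJob
    case _ => false
  })
```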
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 06f5438433b6e14c4c665d3884762692b738ae4a..d94743677783f6d14fa98b7e0e81c6ab6f6b1cae 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
 
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.ExecutorLossReason
 import org.apache.spark.util.{SerializableBuffer, Utils}
 
 private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
@@ -70,7 +71,8 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case object StopExecutors extends CoarseGrainedClusterMessage
 
-  case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
+  case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
+    extends CoarseGrainedClusterMessage
 
   case class SetupDriver(driver: RpcEndpointRef) extends CoarseGrainedClusterMessage
 
@@ -92,6 +94,10 @@ private[spark] object CoarseGrainedClusterMessages {
       hostToLocalTaskCount: Map[String, Int])
     extends CoarseGrainedClusterMessage
 
+  // Check if an executor was force-killed but for a normal reason.
+  // This could be the case if the executor is preempted, for instance.
+  case class GetExecutorLossReason(executorId: String) extends CoarseGrainedClusterMessage
+
   case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage
 
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 5730a87f960a0fab18dbf5d6656429998dc898d4..18771f79b44bb021fdfe7fd38a69a448dde6a58b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -26,6 +26,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.ENDPOINT_NAME
 import org.apache.spark.util.{ThreadUtils, SerializableBuffer, AkkaUtils, Utils}
 
 /**
@@ -82,7 +83,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     override protected def log = CoarseGrainedSchedulerBackend.this.log
 
-    private val addressToExecutorId = new HashMap[RpcAddress, String]
+    protected val addressToExecutorId = new HashMap[RpcAddress, String]
 
     private val reviveThread =
       ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
@@ -128,6 +129,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+
       case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
         Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
         if (executorDataMap.contains(executorId)) {
@@ -185,8 +187,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
-      addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_,
-        "remote Rpc client disassociated"))
+      addressToExecutorId
+        .get(remoteAddress)
+        .foreach(removeExecutor(_, SlaveLost("remote Rpc client disassociated")))
     }
 
     // Make fake resource offers on just one executor
@@ -227,7 +230,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     // Remove a disconnected slave from the cluster
-    def removeExecutor(executorId: String, reason: String): Unit = {
+    def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
       executorDataMap.get(executorId) match {
         case Some(executorInfo) =>
           // This must be synchronized because variables mutated
@@ -239,9 +242,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           }
           totalCoreCount.addAndGet(-executorInfo.totalCores)
           totalRegisteredExecutors.addAndGet(-1)
-          scheduler.executorLost(executorId, SlaveLost(reason))
+          scheduler.executorLost(executorId, reason)
           listenerBus.post(
-            SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason))
+            SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))
         case None => logInfo(s"Asked to remove non-existent executor $executorId")
       }
     }
@@ -263,8 +266,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     // TODO (prashant) send conf instead of properties
-    driverEndpoint = rpcEnv.setupEndpoint(
-      CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))
+    driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
+  }
+
+  protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
+    new DriverEndpoint(rpcEnv, properties)
   }
 
   def stopExecutors() {
@@ -304,7 +310,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   // Called by subclasses when notified of a lost worker
-  def removeExecutor(executorId: String, reason: String) {
+  def removeExecutor(executorId: String, reason: ExecutorLossReason) {
     try {
       driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
     } catch {
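The new `createDriverEndpoint` hook is a factory method: `start()` keeps its wiring while subclasses substitute a specialized endpoint. A minimal sketch of the pattern (hypothetical names, not the patch's classes):

```scala
// Factory-method sketch: the base class owns the lifecycle, the subclass picks the type.
class Endpoint(val name: String)

class BaseBackend {
  def start(): Endpoint = {
    // ... shared registration/wiring would happen here ...
    createEndpoint()
  }
  protected def createEndpoint(): Endpoint = new Endpoint("driver")
}

class YarnBackend extends BaseBackend {
  // Only the factory is overridden; everything else is inherited unchanged.
  override protected def createEndpoint(): Endpoint = new Endpoint("yarn-driver")
}
```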
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index bbe51b4a09a22feb6e5416d001b340db69d866ed..27491ecf8b97d269a4ee8c5f0e0e61837486e996 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{AppClient, AppClientListener}
-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
+import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
 
 private[spark] class SparkDeploySchedulerBackend(
@@ -135,11 +135,11 @@ private[spark] class SparkDeploySchedulerBackend(
 
   override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
     val reason: ExecutorLossReason = exitStatus match {
-      case Some(code) => ExecutorExited(code)
+      case Some(code) => ExecutorExited(code, isNormalExit = true, message)
       case None => SlaveLost(message)
     }
     logInfo("Executor %s removed: %s".format(fullId, message))
-    removeExecutor(fullId.split("/")(1), reason.toString)
+    removeExecutor(fullId.split("/")(1), reason)
   }
 
   override def sufficientResourcesRegistered(): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 044f6288fabdde0bde6d610cc83fca9dceebc7e5..6a4b536dee191de224b84594466e6c2965dc51ff 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.scheduler.cluster
 
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.{Future, ExecutionContext}
 
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler._
 import org.apache.spark.ui.JettyUtils
 import org.apache.spark.util.{ThreadUtils, RpcUtils}
 
@@ -43,8 +44,10 @@ private[spark] abstract class YarnSchedulerBackend(
 
   protected var totalExpectedExecutors = 0
 
-  private val yarnSchedulerEndpoint = rpcEnv.setupEndpoint(
-    YarnSchedulerBackend.ENDPOINT_NAME, new YarnSchedulerEndpoint(rpcEnv))
+  private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
+
+  private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint(
+    YarnSchedulerBackend.ENDPOINT_NAME, yarnSchedulerEndpoint)
 
   private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
 
@@ -53,7 +56,7 @@ private[spark] abstract class YarnSchedulerBackend(
    * This includes executors already pending or running.
    */
   override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
-    yarnSchedulerEndpoint.askWithRetry[Boolean](
+    yarnSchedulerEndpointRef.askWithRetry[Boolean](
       RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
   }
 
@@ -61,7 +64,7 @@ private[spark] abstract class YarnSchedulerBackend(
    * Request that the ApplicationMaster kill the specified executors.
    */
   override def doKillExecutors(executorIds: Seq[String]): Boolean = {
-    yarnSchedulerEndpoint.askWithRetry[Boolean](KillExecutors(executorIds))
+    yarnSchedulerEndpointRef.askWithRetry[Boolean](KillExecutors(executorIds))
   }
 
   override def sufficientResourcesRegistered(): Boolean = {
@@ -90,6 +93,41 @@ private[spark] abstract class YarnSchedulerBackend(
     }
   }
 
+  override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
+    new YarnDriverEndpoint(rpcEnv, properties)
+  }
+
+  /**
+   * Override the DriverEndpoint to add extra logic for the case when an executor is
+   * disconnected: rather than removing the executor right away, this endpoint first
+   * queries the AM (via the scheduler endpoint below) for the executor's exit status.
+   */
+  private class YarnDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
+      extends DriverEndpoint(rpcEnv, sparkProperties) {
+
+    /**
+     * When onDisconnected is received at the driver endpoint, the superclass DriverEndpoint
+     * assumes the executor was lost for a bad reason and removes it immediately.
+     *
+     * On YARN, however, it is crucial to talk to the ApplicationMaster and ask why the
+     * executor exited. In particular, the executor may have exited because it was
+     * preempted, in which case its loss says nothing about the tasks it was running. If
+     * the executor "exited normally" according to the ApplicationMaster, we pass that
+     * information down to the TaskSetManager so that tasks lost on that executor do not
+     * count towards a job failure.
+     *
+     * TODO: there is a race condition here: while we are querying the ApplicationMaster
+     * for the executor loss reason, tasks may still get scheduled on the executor that
+     * failed. We should fix this by having this onDisconnected event also "blacklist"
+     * executors so that no new tasks are assigned to them.
+     */
+    override def onDisconnected(rpcAddress: RpcAddress): Unit = {
+      addressToExecutorId.get(rpcAddress).foreach { executorId =>
+        yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+      }
+    }
+  }
+
   /**
    * An [[RpcEndpoint]] that communicates with the ApplicationMaster.
    */
@@ -101,6 +139,33 @@ private[spark] abstract class YarnSchedulerBackend(
       ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
     implicit val askAmExecutor = ExecutionContext.fromExecutor(askAmThreadPool)
 
+    private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver(
+        executorId: String,
+        executorRpcAddress: RpcAddress): Unit = {
+      amEndpoint match {
+        case Some(am) =>
+          val lossReasonRequest = GetExecutorLossReason(executorId)
+          val future = am.ask[ExecutorLossReason](lossReasonRequest, askTimeout)
+          future onSuccess {
+            case reason: ExecutorLossReason =>
+              driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
+          }
+          future onFailure {
+            // An exception rethrown from this callback would only be swallowed by the
+            // callback's thread pool, so instead of rethrowing, log the failure and fall
+            // back to the conservative SlaveLost reason.
+            case NonFatal(e) =>
+              logWarning(s"Attempted to get executor loss reason" +
+                s" for executor id ${executorId} at RPC address ${executorRpcAddress}," +
+                s" but got no response. Marking as slave lost.", e)
+              driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, SlaveLost()))
+          }
+        case None =>
+          logWarning("Attempted to check for an executor loss reason" +
+            " before the AM has registered!")
+      }
+    }
+
     override def receive: PartialFunction[Any, Unit] = {
       case RegisterClusterManager(am) =>
         logInfo(s"ApplicationMaster registered as $am")
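Putting this file's pieces together: a disconnect triggers an ask to the AM, and the executor is removed only once a reason (or a failure/timeout) comes back. A condensed sketch of that flow using plain Scala futures as stand-ins for the RPC layer:

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Stand-ins for the RPC message types; illustrative only.
sealed trait LossReason
final case class Exited(exitCode: Int, isNormalExit: Boolean) extends LossReason
case object SlaveLost extends LossReason

def onExecutorDisconnected(
    executorId: String,
    askAmForReason: String => Future[LossReason],
    removeExecutor: (String, LossReason) => Unit)(implicit ec: ExecutionContext): Unit = {
  askAmForReason(executorId).onComplete {
    case Success(reason) => removeExecutor(executorId, reason)
    case Failure(_) =>
      // No usable answer from the AM: fall back to the pessimistic default, under
      // which subsequent task failures count against the job.
      removeExecutor(executorId, SlaveLost)
  }
}
```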
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 452c32d5411cddcdf32b4385f8f1c73c1ce4a143..65df8874774ca5ac8e3226205c2009e27ad8d572 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -32,7 +32,7 @@ import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.RpcAddress
-import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
 
@@ -364,7 +364,7 @@ private[spark] class CoarseMesosSchedulerBackend(
         if (slaveIdToTaskId.containsKey(slaveId)) {
           val taskId: Int = slaveIdToTaskId.get(slaveId)
           taskIdToSlaveId.remove(taskId)
-          removeExecutor(sparkExecutorId(slaveId, taskId.toString), reason)
+          removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
         }
         // TODO: This assumes one Spark executor per Mesos slave,
         // which may no longer be true after SPARK-5095
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 2e424054be78514fead41e43e5013e89d4f9890b..18da6d249128057f6e92ee1ef6542841441a16da 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -390,7 +390,7 @@ private[spark] class MesosSchedulerBackend(
                             slaveId: SlaveID, status: Int) {
     logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
                                                                  slaveId.getValue))
-    recordSlaveLost(d, slaveId, ExecutorExited(status))
+    recordSlaveLost(d, slaveId, ExecutorExited(status, isNormalExit = false))
   }
 
   override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index cbc94fd6d54d9030a238ab3f3ff35ff61d169025..24f78744ad74cb364791c2ff90b20701d5681819 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -362,8 +362,9 @@ private[spark] object JsonProtocol {
         ("Stack Trace" -> stackTrace) ~
         ("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
         ("Metrics" -> metrics)
-      case ExecutorLostFailure(executorId) =>
-        ("Executor ID" -> executorId)
+      case ExecutorLostFailure(executorId, isNormalExit) =>
+        ("Executor ID" -> executorId) ~
+        ("Normal Exit" -> isNormalExit)
       case _ => Utils.emptyJson
     }
     ("Reason" -> reason) ~ json
@@ -794,8 +795,10 @@ private[spark] object JsonProtocol {
       case `taskResultLost` => TaskResultLost
       case `taskKilled` => TaskKilled
       case `executorLostFailure` =>
+        val isNormalExit = Utils.jsonOption(json \ "Normal Exit").
+          map(_.extract[Boolean])
         val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
-        ExecutorLostFailure(executorId.getOrElse("Unknown"))
+        ExecutorLostFailure(executorId.getOrElse("Unknown"), isNormalExit.getOrElse(false))
       case `unknownReason` => UnknownReason
     }
   }
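The compatibility contract made explicit: events written before this change carry no "Normal Exit" field, and the reader must default it to false. A sketch of that read path with json4s, using `extractOpt` as a stand-in for Spark's `Utils.jsonOption` helper:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object ReadBackCompat extends App {
  implicit val formats: Formats = DefaultFormats

  // Mirrors the defaulting above: a missing field must read as isNormalExit = false.
  def readExecutorLostFailure(json: JValue): (String, Boolean) = {
    val executorId = (json \ "Executor ID").extractOpt[String].getOrElse("Unknown")
    val isNormalExit = (json \ "Normal Exit").extractOpt[Boolean].getOrElse(false)
    (executorId, isNormalExit)
  }

  // An event logged by an older Spark, with no "Normal Exit" field:
  println(readExecutorLostFailure(parse("""{"Executor ID": "100"}"""))) // ("100",false)
  println(readExecutorLostFailure(parse("""{"Executor ID": "100", "Normal Exit": true}""")))
}
```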
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index edbdb485c5ea417e73192168dde68466a0079fcb..f0eadf240943e8764e4f8a538461b1378081c8f1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -334,7 +334,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
 
     // Now mark host2 as dead
     sched.removeExecutor("exec2")
-    manager.executorLost("exec2", "host2")
+    manager.executorLost("exec2", "host2", SlaveLost())
 
     // nothing should be chosen
     assert(manager.resourceOffer("exec1", "host1", ANY) === None)
@@ -504,13 +504,36 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       Array(PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY)))
     // test if the valid locality is recomputed when the executor is lost
     sched.removeExecutor("execC")
-    manager.executorLost("execC", "host2")
+    manager.executorLost("execC", "host2", SlaveLost())
     assert(manager.myLocalityLevels.sameElements(Array(NODE_LOCAL, NO_PREF, ANY)))
     sched.removeExecutor("execD")
-    manager.executorLost("execD", "host1")
+    manager.executorLost("execD", "host1", SlaveLost())
     assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
   }
 
+  test("Executors are added but exit normally while running tasks") {
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc)
+    val taskSet = FakeTask.createTaskSet(4,
+      Seq(TaskLocation("host1", "execA")),
+      Seq(TaskLocation("host1", "execB")),
+      Seq(TaskLocation("host2", "execC")),
+      Seq())
+    val manager = new TaskSetManager(sched, taskSet, 1, new ManualClock)
+    sched.addExecutor("execA", "host1")
+    manager.executorAdded()
+    sched.addExecutor("execC", "host2")
+    manager.executorAdded()
+    assert(manager.resourceOffer("exec1", "host1", ANY).isDefined)
+    sched.removeExecutor("execA")
+    manager.executorLost("execA", "host1", ExecutorExited(143, true, "Normal termination"))
+    assert(!sched.taskSetsFailed.contains(taskSet.id))
+    assert(manager.resourceOffer("execC", "host2", ANY).isDefined)
+    sched.removeExecutor("execC")
+    manager.executorLost("execC", "host2", ExecutorExited(1, false, "Abnormal termination"))
+    assert(sched.taskSetsFailed.contains(taskSet.id))
+  }
+
   test("test RACK_LOCAL tasks") {
     // Assign host1 to rack1
     FakeRackUtil.assignHostToRack("host1", "rack1")
@@ -721,8 +744,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(manager.resourceOffer("execB.2", "host2", ANY) !== None)
     sched.removeExecutor("execA")
     sched.removeExecutor("execB.2")
-    manager.executorLost("execA", "host1")
-    manager.executorLost("execB.2", "host2")
+    manager.executorLost("execA", "host1", SlaveLost())
+    manager.executorLost("execB.2", "host2", SlaveLost())
     clock.advance(LOCALITY_WAIT_MS * 4)
     sched.addExecutor("execC", "host3")
     manager.executorAdded()
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 343a4139b0ca884b29bb29d1f4c282b6d61fc393..47e548ef0d442e90935d9742974edb67fb1884f2 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -151,7 +151,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     testTaskEndReason(exceptionFailure)
     testTaskEndReason(TaskResultLost)
     testTaskEndReason(TaskKilled)
-    testTaskEndReason(ExecutorLostFailure("100"))
+    testTaskEndReason(ExecutorLostFailure("100", true))
     testTaskEndReason(UnknownReason)
 
     // BlockId
@@ -295,10 +295,10 @@ class JsonProtocolSuite extends SparkFunSuite {
 
   test("ExecutorLostFailure backward compatibility") {
     // ExecutorLostFailure in Spark 1.1.0 does not have an "Executor ID" property.
-    val executorLostFailure = ExecutorLostFailure("100")
+    val executorLostFailure = ExecutorLostFailure("100", true)
     val oldEvent = JsonProtocol.taskEndReasonToJson(executorLostFailure)
       .removeField({ _._1 == "Executor ID" })
-    val expectedExecutorLostFailure = ExecutorLostFailure("Unknown")
+    val expectedExecutorLostFailure = ExecutorLostFailure("Unknown", true)
     assert(expectedExecutorLostFailure === JsonProtocol.taskEndReasonFromJson(oldEvent))
   }
 
@@ -577,8 +577,10 @@ class JsonProtocolSuite extends SparkFunSuite {
         assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
       case (TaskResultLost, TaskResultLost) =>
       case (TaskKilled, TaskKilled) =>
-      case (ExecutorLostFailure(execId1), ExecutorLostFailure(execId2)) =>
+      case (ExecutorLostFailure(execId1, isNormalExit1),
+          ExecutorLostFailure(execId2, isNormalExit2)) =>
         assert(execId1 === execId2)
+        assert(isNormalExit1 === isNormalExit2)
       case (UnknownReason, UnknownReason) =>
       case _ => fail("Task end reasons don't match in types!")
     }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 991b5cec00bd8c3c6053a939499c6ccd72cf3437..93621b44c9183ca9f738e1e494696489869ea1f6 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -590,6 +590,13 @@ private[spark] class ApplicationMaster(
           case None => logWarning("Container allocator is not ready to kill executors yet.")
         }
         context.reply(true)
+
+      case GetExecutorLossReason(eid) =>
+        Option(allocator) match {
+          case Some(a) => a.enqueueGetLossReasonRequest(eid, context)
+          case None => logWarning(s"Container allocator is not ready to find" +
+            s" executor loss reasons yet.")
+        }
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 5f897cbcb4e9f74bdbfa511ac32176f6a8c7f521..fd88b8b7fe3b97a4107a3c374cea6041b798f2a6 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -21,8 +21,9 @@ import java.util.Collections
 import java.util.concurrent._
 import java.util.regex.Pattern
 
-import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConverters._
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 
@@ -36,8 +37,9 @@ import org.apache.log4j.{Level, Logger}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
+import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
 import org.apache.spark.util.Utils
 
 /**
@@ -93,6 +95,11 @@ private[yarn] class YarnAllocator(
       sparkConf.getInt("spark.executor.instances", YarnSparkHadoopUtil.DEFAULT_NUMBER_EXECUTORS)
     }
 
+  // Pending executor-loss-reason requests: maps from the executor ID being asked about to
+  // the list of requesters that should be responded to once we find out why the given
+  // executor was lost.
+  private val pendingLossReasonRequests = new HashMap[String, mutable.Buffer[RpcCallContext]]
+
   // Keep track of which container is running which executor to remove the executors later
   // Visible for testing.
   private[yarn] val executorIdToContainer = new HashMap[String, Container]
@@ -235,9 +242,7 @@ private[yarn] class YarnAllocator(
     val completedContainers = allocateResponse.getCompletedContainersStatuses()
     if (completedContainers.size > 0) {
       logDebug("Completed %d containers".format(completedContainers.size))
-
       processCompletedContainers(completedContainers.asScala)
-
       logDebug("Finished processing %d completed containers. Current running executor count: %d."
         .format(completedContainers.size, numExecutorsRunning))
     }
@@ -429,7 +434,7 @@ private[yarn] class YarnAllocator(
     for (completedContainer <- completedContainers) {
       val containerId = completedContainer.getContainerId
       val alreadyReleased = releasedContainers.remove(containerId)
-      if (!alreadyReleased) {
+      val exitReason = if (!alreadyReleased) {
         // Decrement the number of executors running. The next iteration of
         // the ApplicationMaster's reporting thread will take care of allocating.
         numExecutorsRunning -= 1
@@ -440,22 +445,42 @@ private[yarn] class YarnAllocator(
         // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
         // there are some exit status' we shouldn't necessarily count against us, but for
         // now I think its ok as none of the containers are expected to exit
-        if (completedContainer.getExitStatus == ContainerExitStatus.PREEMPTED) {
-          logInfo("Container preempted: " + containerId)
-        } else if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
-          logWarning(memLimitExceededLogMessage(
-            completedContainer.getDiagnostics,
-            VMEM_EXCEEDED_PATTERN))
-        } else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded
-          logWarning(memLimitExceededLogMessage(
-            completedContainer.getDiagnostics,
-            PMEM_EXCEEDED_PATTERN))
-        } else if (completedContainer.getExitStatus != 0) {
-          logInfo("Container marked as failed: " + containerId +
-            ". Exit status: " + completedContainer.getExitStatus +
-            ". Diagnostics: " + completedContainer.getDiagnostics)
-          numExecutorsFailed += 1
+        val exitStatus = completedContainer.getExitStatus
+        val (isNormalExit, containerExitReason) = exitStatus match {
+          case ContainerExitStatus.SUCCESS =>
+            (true, s"Executor for container $containerId exited normally.")
+          case ContainerExitStatus.PREEMPTED =>
+            // Preemption should count as a normal exit, since YARN preempts containers merely
+            // to do resource sharing, and tasks that fail due to preempted executors could
+            // just as easily finish on any other executor. See SPARK-8167.
+            (true, s"Container $containerId was preempted.")
+          // Should probably still count memory exceeded exit codes towards task failures
+          case VMEM_EXCEEDED_EXIT_CODE =>
+            (false, memLimitExceededLogMessage(
+              completedContainer.getDiagnostics,
+              VMEM_EXCEEDED_PATTERN))
+          case PMEM_EXCEEDED_EXIT_CODE =>
+            (false, memLimitExceededLogMessage(
+              completedContainer.getDiagnostics,
+              PMEM_EXCEEDED_PATTERN))
+          case _ =>
+            numExecutorsFailed += 1
+            (false, "Container marked as failed: " + containerId +
+              ". Exit status: " + exitStatus +
+              ". Diagnostics: " + completedContainer.getDiagnostics)
+
+        }
+        if (isNormalExit) {
+          logInfo(containerExitReason)
+        } else {
+          logWarning(containerExitReason)
         }
+        ExecutorExited(exitStatus, isNormalExit, containerExitReason)
+      } else {
+        // If we have already released this container, then it must mean
+        // that the driver has explicitly requested it to be killed
+        ExecutorExited(completedContainer.getExitStatus, isNormalExit = true,
+          s"Container $containerId exited from explicit termination request.")
       }
 
       if (allocatedContainerToHostMap.contains(containerId)) {
@@ -474,18 +499,35 @@ private[yarn] class YarnAllocator(
 
       containerIdToExecutorId.remove(containerId).foreach { eid =>
         executorIdToContainer.remove(eid)
-
+        pendingLossReasonRequests.remove(eid).foreach { pendingRequests =>
+          // Notify application of executor loss reasons so it can decide whether it should abort
+          pendingRequests.foreach(_.reply(exitReason))
+        }
         if (!alreadyReleased) {
           // The executor could have gone away (like no route to host, node failure, etc)
           // Notify backend about the failure of the executor
           numUnexpectedContainerRelease += 1
-          driverRef.send(RemoveExecutor(eid,
-            s"Yarn deallocated the executor $eid (container $containerId)"))
+          driverRef.send(RemoveExecutor(eid, exitReason))
         }
       }
     }
   }
 
+  /**
+   * Register that some RpcCallContext has asked the AM why the executor was lost. Note that
+   * we can only find the loss reason to send back in the next call to allocateResources().
+   */
+  private[yarn] def enqueueGetLossReasonRequest(
+      eid: String,
+      context: RpcCallContext): Unit = synchronized {
+    if (executorIdToContainer.contains(eid)) {
+      pendingLossReasonRequests
+        .getOrElseUpdate(eid, new ArrayBuffer[RpcCallContext]) += context
+    } else {
+      logWarning(s"Tried to get the loss reason for non-existent executor $eid")
+    }
+  }
+
   private def internalReleaseContainer(container: Container): Unit = {
     releasedContainers.add(container.getId())
     amClient.releaseAssignedContainer(container.getId())
@@ -501,6 +543,8 @@ private object YarnAllocator {
     Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
   val VMEM_EXCEEDED_PATTERN =
     Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
+  val VMEM_EXCEEDED_EXIT_CODE = -103
+  val PMEM_EXCEEDED_EXIT_CODE = -104
 
   def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
     val matcher = pattern.matcher(diagnostics)