diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 47a5cbff4930bb78cd14bc678b73e1836ca7a97b..7e1197d742802c5a20a46781611f9f406710c05b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -40,6 +40,8 @@ private[spark] object ExecutorExited {
   }
 }
 
+private[spark] object ExecutorKilled extends ExecutorLossReason("Executor killed by driver.")
+
 /**
  * A loss reason that means we don't yet know why the executor exited.
  *
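
The ExecutorKilled singleton added above is the loss reason that the rest of this diff
threads through the scheduler. A minimal, self-contained sketch of how it composes with
the existing hierarchy; the definitions below are illustrative stand-ins for the
private[spark] classes in this file (their constructor shapes are assumed from the
surrounding code), not part of the patch:

    object LossReasonSketch {
      // Simplified stand-ins mirroring ExecutorLossReason.scala.
      class ExecutorLossReason(val message: String) {
        override def toString: String = message
      }
      object ExecutorKilled extends ExecutorLossReason("Executor killed by driver.")
      case class ExecutorExited(exitCode: Int, exitCausedByApp: Boolean, reason: String)
        extends ExecutorLossReason(reason)

      // Mirrors the TaskSetManager change further down: a driver-initiated kill is
      // never blamed on the application, so its tasks are not treated as task failures.
      def exitCausedByApp(reason: ExecutorLossReason): Boolean = reason match {
        case ExecutorKilled    => false
        case e: ExecutorExited => e.exitCausedByApp
        case _                 => true
      }
    }
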
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index bf0419db1f75e44869f73e51485a07bd831eff20..bdf19f9f277d9b1355e4f914df5ee98435aefe3e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -470,25 +470,25 @@ private[spark] class TaskSchedulerImpl(
     synchronized {
       if (executorIdToTaskCount.contains(executorId)) {
         val hostPort = executorIdToHost(executorId)
-        logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
+        logExecutorLoss(executorId, hostPort, reason)
         removeExecutor(executorId, reason)
         failedExecutor = Some(executorId)
       } else {
-         executorIdToHost.get(executorId) match {
-           case Some(_) =>
-             // If the host mapping still exists, it means we don't know the loss reason for the
-             // executor. So call removeExecutor() to update tasks running on that executor when
-             // the real loss reason is finally known.
-             logError(s"Actual reason for lost executor $executorId: ${reason.message}")
-             removeExecutor(executorId, reason)
-
-           case None =>
-             // We may get multiple executorLost() calls with different loss reasons. For example,
-             // one may be triggered by a dropped connection from the slave while another may be a
-             // report of executor termination from Mesos. We produce log messages for both so we
-             // eventually report the termination reason.
-             logError("Lost an executor " + executorId + " (already removed): " + reason)
-         }
+        executorIdToHost.get(executorId) match {
+          case Some(hostPort) =>
+            // If the host mapping still exists, it means we don't know the loss reason for the
+            // executor. So call removeExecutor() to update tasks running on that executor when
+            // the real loss reason is finally known.
+            logExecutorLoss(executorId, hostPort, reason)
+            removeExecutor(executorId, reason)
+
+          case None =>
+            // We may get multiple executorLost() calls with different loss reasons. For example,
+            // one may be triggered by a dropped connection from the slave while another may be a
+            // report of executor termination from Mesos. We produce log messages for both so we
+            // eventually report the termination reason.
+            logError(s"Lost an executor $executorId (already removed): $reason")
+        }
       }
     }
     // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
@@ -498,6 +498,18 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
+  private def logExecutorLoss(
+      executorId: String,
+      hostPort: String,
+      reason: ExecutorLossReason): Unit = reason match {
+    case LossReasonPending =>
+      logDebug(s"Executor $executorId on $hostPort lost, but reason not yet known.")
+    case ExecutorKilled =>
+      logInfo(s"Executor $executorId on $hostPort killed by driver.")
+    case _ =>
+      logError(s"Lost executor $executorId on $hostPort: $reason")
+  }
+
   /**
    * Remove an executor from all our data structures and mark it as lost. If the executor's loss
    * reason is not yet known, do not yet remove its association with its host nor update the status
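
For the TaskSchedulerImpl hunks above: executorLost() can fire more than once for the
same executor, first with LossReasonPending when the connection drops and again once the
concrete reason is known, and the new logExecutorLoss chooses a log level accordingly. A
tiny sketch of that severity split, using hypothetical stand-in types rather than the
real scheduler classes:

    object LogLevelSketch {
      sealed trait Reason
      case object LossReasonPending extends Reason          // loss seen, cause not yet known
      case object ExecutorKilled extends Reason             // the driver asked for this kill
      final case class Exited(exitCode: Int) extends Reason

      // Same severity split as logExecutorLoss in the hunk above.
      def severity(reason: Reason): String = reason match {
        case LossReasonPending => "DEBUG" // a later call is expected to supply the real reason
        case ExecutorKilled    => "INFO"  // expected, driver-initiated
        case _                 => "ERROR" // genuinely unexpected loss
      }
    }
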
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 114468c48c44c842750c692be0693765e655c4af..a02f3017cb6e9482425c35eccd953d4c6473eefc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -800,6 +800,7 @@ private[spark] class TaskSetManager(
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
       val exitCausedByApp: Boolean = reason match {
         case exited: ExecutorExited => exited.exitCausedByApp
+        case ExecutorKilled => false
         case _ => true
       }
       handleFailedTask(tid, TaskState.FAILED, ExecutorLostFailure(info.executorId, exitCausedByApp,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 6f0c910c009a5e4edb5340fc2e79425fed37e768..505c161141c8801873571f7699ae8a9f3e39ddef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -64,8 +64,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   private val listenerBus = scheduler.sc.listenerBus
 
-  // Executors we have requested the cluster manager to kill that have not died yet
-  private val executorsPendingToRemove = new HashSet[String]
+  // Executors we have requested the cluster manager to kill that have not died yet; maps
+  // the executor ID to whether the kill was requested without a replacement (in which case
+  // the loss is reported as ExecutorKilled and should not count as an app-related failure).
+  private val executorsPendingToRemove = new HashMap[String, Boolean]
 
   // A map to store hostname with its possible task number running on it
   protected var hostToLocalTaskCount: Map[String, Int] = Map.empty
@@ -250,15 +252,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         case Some(executorInfo) =>
           // This must be synchronized because variables mutated
           // in this block are read when requesting executors
-          CoarseGrainedSchedulerBackend.this.synchronized {
+          val killed = CoarseGrainedSchedulerBackend.this.synchronized {
             addressToExecutorId -= executorInfo.executorAddress
             executorDataMap -= executorId
-            executorsPendingToRemove -= executorId
             executorsPendingLossReason -= executorId
+            executorsPendingToRemove.remove(executorId).getOrElse(false)
           }
           totalCoreCount.addAndGet(-executorInfo.totalCores)
           totalRegisteredExecutors.addAndGet(-1)
-          scheduler.executorLost(executorId, reason)
+          scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason)
           listenerBus.post(
             SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))
         case None => logInfo(s"Asked to remove non-existent executor $executorId")
@@ -459,6 +461,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Request that the cluster manager kill the specified executors.
    *
+   * When a killed executor is going to be replaced, its loss is considered a failure, and
+   * killed tasks that are running on the executor will count towards the failure limits. If no
+   * replacement is being requested, then the tasks will not count towards the limit.
+   *
    * @param executorIds identifiers of executors to kill
    * @param replace whether to replace the killed executors with new ones
    * @param force whether to force kill busy executors
@@ -479,7 +485,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     val executorsToKill = knownExecutors
       .filter { id => !executorsPendingToRemove.contains(id) }
       .filter { id => force || !scheduler.isExecutorBusy(id) }
-    executorsPendingToRemove ++= executorsToKill
+    executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
 
     // If we do not wish to replace the executors we kill, sync the target number of executors
     // with the cluster manager to avoid allocating new ones. When computing the new target,
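
For the CoarseGrainedSchedulerBackend hunks above: combined with the TaskSchedulerImpl
change, the map means a driver-requested kill with no replacement is surfaced as
ExecutorKilled, so its running tasks are not charged against spark.task.maxFailures. A
hedged usage sketch; it assumes the @DeveloperApi SparkContext.killExecutors entry point
(which reaches the killExecutors overload here with replace = false), and the master URL
and executor ID are purely illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    object KillExecutorSketch {
      def main(args: Array[String]): Unit = {
        // Any coarse-grained cluster manager works; "yarn-client" is only an example.
        val sc = new SparkContext(
          new SparkConf().setAppName("kill-executor-sketch").setMaster("yarn-client"))

        // Developer API: ask the cluster manager to kill executor "1" without a
        // replacement. With this patch the loss reaches the scheduler as
        // ExecutorKilled, is logged at INFO, and tasks running on that executor
        // are not counted towards spark.task.maxFailures.
        sc.killExecutors(Seq("1"))

        sc.stop()
      }
    }
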
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 7e39c3ea56af33c4e73954416517069b4fbce8d7..73cd9031f02504c201c905b9757802240c6dfd45 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -481,7 +481,7 @@ private[yarn] class YarnAllocator(
             (true, memLimitExceededLogMessage(
               completedContainer.getDiagnostics,
               PMEM_EXCEEDED_PATTERN))
-          case unknown =>
+          case _ =>
             numExecutorsFailed += 1
             (true, "Container marked as failed: " + containerId + onHostStr +
               ". Exit status: " + completedContainer.getExitStatus +
@@ -493,7 +493,7 @@ private[yarn] class YarnAllocator(
         } else {
           logInfo(containerExitReason)
         }
-        ExecutorExited(0, exitCausedByApp, containerExitReason)
+        ExecutorExited(exitStatus, exitCausedByApp, containerExitReason)
       } else {
         // If we have already released this container, then it must mean
         // that the driver has explicitly requested it to be killed
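
For the YarnAllocator hunk above: the old code hard-coded exit status 0 when reporting
these completed containers, which hid the real status; the change forwards the
container's actual exit status to the scheduler. A small illustration with a stand-in
case class (the exit code 137 below is only an example value, e.g. a SIGKILL-style
termination):

    object ExitStatusSketch {
      // Stand-in with the same field shape as the private[spark] ExecutorExited.
      case class ExecutorExited(exitCode: Int, exitCausedByApp: Boolean, reason: String)

      // Before this patch: the real status was dropped and 0 was always reported here.
      val before = ExecutorExited(0, exitCausedByApp = true, "Container marked as failed: ...")
      // After: the container's actual exit status reaches the scheduler and the logs.
      val after = ExecutorExited(137, exitCausedByApp = true, "Container marked as failed: ...")
    }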