diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 33edf25043850ab7f7fdedeb30245ff40d92d2b1..47a5cbff4930bb78cd14bc678b73e1836ca7a97b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -40,6 +40,15 @@ private[spark] object ExecutorExited {
   }
 }
 
+/**
+ * A loss reason that means we don't yet know why the executor exited.
+ *
+ * This is used by the task scheduler to remove state associated with the executor, but
+ * not to fail any tasks that were running on the executor, until the real loss reason
+ * is known.
+ */
+private[spark] object LossReasonPending extends ExecutorLossReason("Pending loss reason.")
+
 private[spark]
 case class SlaveLost(_message: String = "Slave lost")
   extends ExecutorLossReason(_message)
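
For context on how the new object is consumed, here is a minimal sketch, not part of the patch, of distinguishing the placeholder from a terminal reason; `isFinalLoss` is a hypothetical helper, and the comparison mirrors the `reason != LossReasonPending` check added to TaskSchedulerImpl below.

```scala
// Hypothetical helper (illustration only): LossReasonPending is a singleton, so it can be
// pattern matched or compared by reference to decide whether the loss is final.
def isFinalLoss(reason: ExecutorLossReason): Boolean = reason match {
  case LossReasonPending => false // keep the executor's tasks alive for now
  case _                 => true  // ExecutorExited, SlaveLost, etc. are terminal
}
```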
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 1c7bfe89c02ac4a2b9bfc96a47cc66a5b3ee1f17..43d7d80b7aae14b4b7c973948c0f0836c163c70f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -468,11 +468,20 @@ private[spark] class TaskSchedulerImpl(
         removeExecutor(executorId, reason)
         failedExecutor = Some(executorId)
       } else {
-         // We may get multiple executorLost() calls with different loss reasons. For example, one
-         // may be triggered by a dropped connection from the slave while another may be a report
-         // of executor termination from Mesos. We produce log messages for both so we eventually
-         // report the termination reason.
-         logError("Lost an executor " + executorId + " (already removed): " + reason)
+         executorIdToHost.get(executorId) match {
+           case Some(_) =>
+             // If the host mapping still exists, the executor was marked lost with a pending
+             // reason, so call removeExecutor() again: once the real loss reason is known this
+             // will update the tasks that were running on that executor.
+             removeExecutor(executorId, reason)
+
+           case None =>
+             // We may get multiple executorLost() calls with different loss reasons. For example,
+             // one may be triggered by a dropped connection from the slave while another may be a
+             // report of executor termination from Mesos. We produce log messages for both so we
+             // eventually report the termination reason.
+             logError("Lost an executor " + executorId + " (already removed): " + reason)
+         }
       }
     }
     // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
@@ -482,7 +491,11 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
-  /** Remove an executor from all our data structures and mark it as lost */
+  /**
+   * Remove an executor from all our data structures and mark it as lost. If the executor's loss
+   * reason is not yet known, do not yet remove its association with its host or update the status
+   * of any running tasks, since the loss reason determines whether we'll fail those tasks.
+   */
   private def removeExecutor(executorId: String, reason: ExecutorLossReason) {
     activeExecutorIds -= executorId
     val host = executorIdToHost(executorId)
@@ -497,8 +510,11 @@ private[spark] class TaskSchedulerImpl(
         }
       }
     }
-    executorIdToHost -= executorId
-    rootPool.executorLost(executorId, host, reason)
+
+    if (reason != LossReasonPending) {
+      executorIdToHost -= executorId
+      rootPool.executorLost(executorId, host, reason)
+    }
   }
 
   def executorAdded(execId: String, host: String) {
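
A minimal sketch of the two-phase flow this enables, assuming a `taskScheduler` set up as in the new test added below; the executor id and message are illustrative.

```scala
// Phase 1: the executor dropped off but the real cause is unknown. The scheduler drops it
// from its active set without failing the tasks that were running on it.
taskScheduler.executorLost("executor0", LossReasonPending)

// Phase 2: the real cause arrives (e.g. reported by the YARN ApplicationMaster). Only now
// are the executor's tasks failed and its host association removed.
taskScheduler.executorLost("executor0", SlaveLost("container preempted"))
```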
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index ebce5021b19dc554f34dc5b7d59d444820c1eb47..f71d98feac0509248fcbd6c18461c78bc986ccf1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -73,6 +73,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // The number of pending tasks which is locality required
   protected var localityAwareTasks = 0
 
+  // Executors that have been lost, but for which we don't yet know the real exit reason.
+  protected val executorsPendingLossReason = new HashSet[String]
+
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -184,7 +187,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Make fake resource offers on all executors
     private def makeOffers() {
       // Filter out executors under killing
-      val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
+      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
       val workOffers = activeExecutors.map { case (id, executorData) =>
         new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
       }.toSeq
@@ -202,7 +205,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Make fake resource offers on just one executor
     private def makeOffers(executorId: String) {
       // Filter out executors under killing
-      if (!executorsPendingToRemove.contains(executorId)) {
+      if (executorIsAlive(executorId)) {
         val executorData = executorDataMap(executorId)
         val workOffers = Seq(
           new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
@@ -210,6 +213,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       }
     }
 
+    private def executorIsAlive(executorId: String): Boolean = synchronized {
+      !executorsPendingToRemove.contains(executorId) &&
+        !executorsPendingLossReason.contains(executorId)
+    }
+
     // Launch tasks returned by a set of resource offers
     private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
@@ -246,6 +254,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             addressToExecutorId -= executorInfo.executorAddress
             executorDataMap -= executorId
             executorsPendingToRemove -= executorId
+            executorsPendingLossReason -= executorId
           }
           totalCoreCount.addAndGet(-executorInfo.totalCores)
           totalRegisteredExecutors.addAndGet(-1)
@@ -256,6 +265,30 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       }
     }
 
+    /**
+     * Stop making resource offers for the given executor. The executor is marked as lost with
+     * the loss reason still pending.
+     *
+     * @return Whether the executor was alive.
+     */
+    protected def disableExecutor(executorId: String): Boolean = {
+      val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
+        if (executorIsAlive(executorId)) {
+          executorsPendingLossReason += executorId
+          true
+        } else {
+          false
+        }
+      }
+
+      if (shouldDisable) {
+        logInfo(s"Disabling executor $executorId.")
+        scheduler.executorLost(executorId, LossReasonPending)
+      }
+
+      shouldDisable
+    }
+
     override def onStop() {
       reviveThread.shutdownNow()
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index d75d6f673e84ef5e05702201694e4b7212547981..80da37b09b590409ac88d3a93f1119c4c83a9311 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -115,15 +115,12 @@ private[spark] abstract class YarnSchedulerBackend(
      * (e.g., preemption), according to the application master, then we pass that information down
      * to the TaskSetManager to inform the TaskSetManager that tasks on that lost executor should
      * not count towards a job failure.
-     *
-     * TODO there's a race condition where while we are querying the ApplicationMaster for
-     * the executor loss reason, there is the potential that tasks will be scheduled on
-     * the executor that failed. We should fix this by having this onDisconnected event
-     * also "blacklist" executors so that tasks are not assigned to them.
      */
     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
       addressToExecutorId.get(rpcAddress).foreach { executorId =>
-        yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+        if (disableExecutor(executorId)) {
+          yarnSchedulerEndpoint.handleExecutorDisconnectedFromDriver(executorId, rpcAddress)
+        }
       }
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index c2edd4c317d6e6200ea213bdbfb755fb68ef4b50..2afb595e6f10d2cec583f731cbf676ecd50dca70 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -237,4 +237,40 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
     }
   }
 
+  test("tasks are not re-scheduled while executor loss reason is pending") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskScheduler = new TaskSchedulerImpl(sc)
+    taskScheduler.initialize(new FakeSchedulerBackend)
+    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+    new DAGScheduler(sc, taskScheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+      override def executorAdded(execId: String, host: String) {}
+    }
+
+    val e0Offers = Seq(new WorkerOffer("executor0", "host0", 1))
+    val e1Offers = Seq(new WorkerOffer("executor1", "host0", 1))
+    val attempt1 = FakeTask.createTaskSet(1)
+
+    // submit attempt 1, offer resources, task gets scheduled
+    taskScheduler.submitTasks(attempt1)
+    val taskDescriptions = taskScheduler.resourceOffers(e0Offers).flatten
+    assert(1 === taskDescriptions.length)
+
+    // mark executor0 as dead, but with the loss reason still pending
+    taskScheduler.executorLost("executor0", LossReasonPending)
+
+    // offer some more resources on a different executor, nothing should change
+    val taskDescriptions2 = taskScheduler.resourceOffers(e1Offers).flatten
+    assert(0 === taskDescriptions2.length)
+
+    // provide the actual loss reason for executor0
+    taskScheduler.executorLost("executor0", SlaveLost("oops"))
+
+    // executor0's tasks should have failed now that the loss reason is known, so offering more
+    // resources should cause them to be re-scheduled on the new executor.
+    val taskDescriptions3 = taskScheduler.resourceOffers(e1Offers).flatten
+    assert(1 === taskDescriptions3.length)
+    assert("executor1" === taskDescriptions3(0).executorId)
+  }
+
 }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 12ae350e4cef6ab2cb90030720f6da950858f717..50ae7ffeec4c552c1474af204af5cb7547be8fb8 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -87,8 +87,27 @@ private[spark] class ApplicationMaster(
 
   @volatile private var reporterThread: Thread = _
   @volatile private var allocator: YarnAllocator = _
+
+  // Lock for controlling the allocator (heartbeat) thread.
   private val allocatorLock = new Object()
 
+  // Steady-state heartbeat interval. We want to be reasonably responsive without causing too
+  // many requests to the RM.
+  private val heartbeatInterval = {
+    // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
+    val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
+    math.max(0, math.min(expiryInterval / 2,
+      sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s")))
+  }
+
+  // Initial wait interval before allocator poll, to allow for quicker ramp up when executors are
+  // being requested.
+  private val initialAllocationInterval = math.min(heartbeatInterval,
+    sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms"))
+
+  // Next wait interval before allocator poll.
+  private var nextAllocationInterval = initialAllocationInterval
+
   // Fields used in client mode.
   private var rpcEnv: RpcEnv = null
   private var amEndpoint: RpcEndpointRef = _
@@ -332,19 +351,6 @@ private[spark] class ApplicationMaster(
   }
 
   private def launchReporterThread(): Thread = {
-    // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
-    val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
-
-    // we want to be reasonably responsive without causing too many requests to RM.
-    val heartbeatInterval = math.max(0, math.min(expiryInterval / 2,
-      sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s")))
-
-    // we want to check more frequently for pending containers
-    val initialAllocationInterval = math.min(heartbeatInterval,
-      sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms"))
-
-    var nextAllocationInterval = initialAllocationInterval
-
     // The number of failures in a row until Reporter thread give up
     val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)
 
@@ -377,19 +383,19 @@ private[spark] class ApplicationMaster(
           }
           try {
             val numPendingAllocate = allocator.getPendingAllocate.size
-            val sleepInterval =
-              if (numPendingAllocate > 0) {
-                val currentAllocationInterval =
-                  math.min(heartbeatInterval, nextAllocationInterval)
-                nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
-                currentAllocationInterval
-              } else {
-                nextAllocationInterval = initialAllocationInterval
-                heartbeatInterval
-              }
-            logDebug(s"Number of pending allocations is $numPendingAllocate. " +
-                     s"Sleeping for $sleepInterval.")
             allocatorLock.synchronized {
+              val sleepInterval =
+                if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
+                  val currentAllocationInterval =
+                    math.min(heartbeatInterval, nextAllocationInterval)
+                  nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
+                  currentAllocationInterval
+                } else {
+                  nextAllocationInterval = initialAllocationInterval
+                  heartbeatInterval
+                }
+              logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+                       s"Sleeping for $sleepInterval.")
               allocatorLock.wait(sleepInterval)
             }
           } catch {
@@ -560,6 +566,11 @@ private[spark] class ApplicationMaster(
     userThread
   }
 
+  private def resetAllocatorInterval(): Unit = allocatorLock.synchronized {
+    nextAllocationInterval = initialAllocationInterval
+    allocatorLock.notifyAll()
+  }
+
   /**
    * An [[RpcEndpoint]] that communicates with the driver's scheduler backend.
    */
@@ -581,11 +592,9 @@ private[spark] class ApplicationMaster(
       case RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount) =>
         Option(allocator) match {
           case Some(a) =>
-            allocatorLock.synchronized {
-              if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal,
-                localityAwareTasks, hostToLocalTaskCount)) {
-                allocatorLock.notifyAll()
-              }
+            if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal,
+              localityAwareTasks, hostToLocalTaskCount)) {
+              resetAllocatorInterval()
             }
 
           case None =>
@@ -603,17 +612,19 @@ private[spark] class ApplicationMaster(
 
       case GetExecutorLossReason(eid) =>
         Option(allocator) match {
-          case Some(a) => a.enqueueGetLossReasonRequest(eid, context)
-          case None => logWarning(s"Container allocator is not ready to find" +
-            s" executor loss reasons yet.")
+          case Some(a) =>
+            a.enqueueGetLossReasonRequest(eid, context)
+            resetAllocatorInterval()
+          case None =>
+            logWarning("Container allocator is not ready to find executor loss reasons yet.")
         }
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
-      logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
       // In cluster mode, do not rely on the disassociated event to exit
       // This avoids potentially reporting incorrect exit codes if the driver fails
       if (!isClusterMode) {
+        logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
         finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
       }
     }
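
A worked example of the interval arithmetic above, assuming the YARN default AM expiry of 120000 ms and the default values of the two Spark settings; all numbers are illustrative.

```scala
// Illustrative defaults, hard-coded here rather than read from yarnConf/sparkConf:
val expiryInterval = 120000L // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS default
val heartbeatMs = 3000L      // spark.yarn.scheduler.heartbeat.interval-ms default ("3s")
val initialMs = 200L         // spark.yarn.scheduler.initial-allocation.interval default ("200ms")

val heartbeatInterval = math.max(0L, math.min(expiryInterval / 2, heartbeatMs)) // 3000 ms
val initialAllocationInterval = math.min(heartbeatInterval, initialMs)          // 200 ms

// While container allocations or loss-reason requests are pending, the reporter thread
// doubles its sleep each pass but never exceeds heartbeatInterval:
//   200 -> 400 -> 800 -> 1600 -> 3000 -> 3000 -> ...
// Once nothing is pending it sleeps the full heartbeatInterval and the backoff restarts at
// 200 ms; resetAllocatorInterval() also restarts the backoff and wakes the thread early.
```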
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index a0cf1b4aa469b903b18388fad84bc17be3f20ff6..4d9e777cb413426c945a093888dc0865a810d20f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -550,6 +550,10 @@ private[yarn] class YarnAllocator(
 
   private[yarn] def getNumUnexpectedContainerRelease = numUnexpectedContainerRelease
 
+  private[yarn] def getNumPendingLossReasonRequests: Int = synchronized {
+    pendingLossReasonRequests.size
+  }
+
   /**
    * Split the pending container requests into 3 groups based on current localities of pending
    * tasks.
@@ -582,6 +586,7 @@ private[yarn] class YarnAllocator(
 
     (localityMatched.toSeq, localityUnMatched.toSeq, localityFree.toSeq)
   }
+
 }
 
 private object YarnAllocator {