diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index c1a91c27eef2d96bd6d8f27242518d3f65c9821b..49a319abb3238c2bc55eff4cdb5d82cf9f729505 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -158,6 +158,8 @@ private[deploy] object DeployMessages {
 
   case class ApplicationRemoved(message: String)
 
+  case class WorkerRemoved(id: String, host: String, message: String)
+
   // DriverClient <-> Master
 
   case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index 93f58ce63799fcb422413351cf9d19ca136a756b..757c930b84eb27bb849f8d030a001178d8e80397 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -182,6 +182,10 @@ private[spark] class StandaloneAppClient(
           listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
         }
 
+      case WorkerRemoved(id, host, message) =>
+        logInfo("Master removed worker %s: %s".format(id, message))
+        listener.workerRemoved(id, host, message)
+
       case MasterChanged(masterRef, masterWebUiUrl) =>
         logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
         master = Some(masterRef)
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
index 64255ec92b72a762727d256bc8f79907b376dbe1..d8bc1a883def13f389f184eaba461ef3148aaba4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
@@ -18,9 +18,9 @@
 package org.apache.spark.deploy.client
 
 /**
- * Callbacks invoked by deploy client when various events happen. There are currently four events:
- * connecting to the cluster, disconnecting, being given an executor, and having an executor
- * removed (either due to failure or due to revocation).
+ * Callbacks invoked by the deploy client when various events happen. There are currently five
+ * events: connecting to the cluster, disconnecting, being given an executor, having an executor
+ * (either due to failure or due to revocation), and having a worker removed.
  *
  * Users of this API should *not* block inside the callback methods.
  */
@@ -38,4 +38,6 @@ private[spark] trait StandaloneAppClientListener {
 
   def executorRemoved(
       fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
+
+  def workerRemoved(workerId: String, host: String, message: String): Unit
 }
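
For illustration, a minimal implementor of the extended trait might look like the sketch below. `LoggingAppClientListener` is hypothetical and not part of this patch; per the trait's contract, none of the callbacks may block.

```scala
package org.apache.spark.deploy.client

import org.apache.spark.internal.Logging

// Hypothetical implementor, for illustration only: logs each event and returns
// immediately, since listener callbacks must never block.
private[spark] class LoggingAppClientListener extends StandaloneAppClientListener with Logging {
  override def connected(appId: String): Unit = logInfo(s"Connected as application $appId")
  override def disconnected(): Unit = logInfo("Disconnected; reconnection may follow")
  override def dead(reason: String): Unit = logError(s"Application died: $reason")
  override def executorAdded(
      fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit = {
    logInfo(s"Executor $fullId added on $hostPort ($cores cores)")
  }
  override def executorRemoved(
      fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = {
    logInfo(s"Executor $fullId removed: $message")
  }
  override def workerRemoved(workerId: String, host: String, message: String): Unit = {
    // New in this patch: fired when the master declares a whole worker dead.
    logInfo(s"Worker $workerId on $host removed: $message")
  }
}
```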
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f10a41286c52fa4047ad922b9abc6b47c51c1cbc..c192a0cc82ef6b2d7a8d1882181157b69cda2376 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -498,7 +498,7 @@ private[deploy] class Master(
   override def onDisconnected(address: RpcAddress): Unit = {
     // The disconnected client could've been either a worker or an app; remove whichever it was
     logInfo(s"$address got disassociated, removing it.")
-    addressToWorker.get(address).foreach(removeWorker)
+    addressToWorker.get(address).foreach(removeWorker(_, s"$address got disassociated"))
     addressToApp.get(address).foreach(finishApplication)
     if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
   }
@@ -544,7 +544,8 @@ private[deploy] class Master(
     state = RecoveryState.COMPLETING_RECOVERY
 
     // Kill off any workers and apps that didn't respond to us.
-    workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
+    workers.filter(_.state == WorkerState.UNKNOWN).foreach(
+      removeWorker(_, "Not responding for recovery"))
     apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
 
     // Update the state of recovered apps to RUNNING
@@ -755,7 +756,7 @@ private[deploy] class Master(
       if (oldWorker.state == WorkerState.UNKNOWN) {
         // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
         // The old worker must thus be dead, so we will remove it and accept the new worker.
-        removeWorker(oldWorker)
+        removeWorker(oldWorker, "Worker replaced by a new worker with same address")
       } else {
         logInfo("Attempted to re-register worker at same address: " + workerAddress)
         return false
@@ -771,7 +772,7 @@ private[deploy] class Master(
     true
   }
 
-  private def removeWorker(worker: WorkerInfo) {
+  private def removeWorker(worker: WorkerInfo, msg: String) {
     logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
     worker.setState(WorkerState.DEAD)
     idToWorker -= worker.id
@@ -795,6 +796,10 @@ private[deploy] class Master(
         removeDriver(driver.id, DriverState.ERROR, None)
       }
     }
+    logInfo(s"Telling app of lost worker: " + worker.id)
+    apps.filterNot(completedApps.contains(_)).foreach { app =>
+      app.driver.send(WorkerRemoved(worker.id, worker.host, msg))
+    }
     persistenceEngine.removeWorker(worker)
   }
 
@@ -979,7 +984,7 @@ private[deploy] class Master(
       if (worker.state != WorkerState.DEAD) {
         logWarning("Removing %s because we got no heartbeat in %d seconds".format(
           worker.id, WORKER_TIMEOUT_MS / 1000))
-        removeWorker(worker)
+        removeWorker(worker, s"Not receiving heartbeat for ${WORKER_TIMEOUT_MS / 1000} seconds")
       } else {
         if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
           workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
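
Note that the master notifies drivers with a one-way `send` rather than an `ask`, so worker cleanup never blocks on a driver's reply; completed apps are skipped because their drivers no longer schedule tasks. A condensed sketch of the fan-out (`NotifySketch` is illustrative, not patch code):

```scala
package org.apache.spark.deploy

import org.apache.spark.deploy.DeployMessages.WorkerRemoved
import org.apache.spark.rpc.RpcEndpointRef

object NotifySketch {
  // Fire-and-forget fan-out: `send` expects no reply, so a hung or dead driver
  // endpoint cannot stall the master's removal of the worker.
  def notifyAppsOfLostWorker(
      liveDrivers: Seq[RpcEndpointRef],
      workerId: String,
      host: String,
      msg: String): Unit = {
    liveDrivers.foreach(_.send(WorkerRemoved(workerId, host, msg)))
  }
}
```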
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index fafe9cafdc18f1a7faa9b150fe2dcac2e15284b5..3422a5f204b12fd393a54f9997602ae5010d6ee2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -259,6 +259,13 @@ class DAGScheduler(
     eventProcessLoop.post(ExecutorLost(execId, reason))
   }
 
+  /**
+   * Called by TaskScheduler implementation when a worker is removed.
+   */
+  def workerRemoved(workerId: String, host: String, message: String): Unit = {
+    eventProcessLoop.post(WorkerRemoved(workerId, host, message))
+  }
+
   /**
    * Called by TaskScheduler implementation when a host is added.
    */
@@ -1432,6 +1439,26 @@ class DAGScheduler(
     }
   }
 
+  /**
+   * Responds to a worker being removed. This is called inside the event loop, so it assumes it can
+   * modify the scheduler's internal state. Use workerRemoved() to post a loss event from outside.
+   *
+   * We will assume that we've lost all shuffle blocks associated with the host if a worker is
+   * removed, so we will remove them all from the MapOutputTracker.
+   *
+   * @param workerId identifier of the worker that is removed.
+   * @param host host of the worker that is removed.
+   * @param message the reason why the worker is removed.
+   */
+  private[scheduler] def handleWorkerRemoved(
+      workerId: String,
+      host: String,
+      message: String): Unit = {
+    logInfo("Shuffle files lost for worker %s on host %s".format(workerId, host))
+    mapOutputTracker.removeOutputsOnHost(host)
+    clearCacheLocs()
+  }
+
   private[scheduler] def handleExecutorAdded(execId: String, host: String) {
     // remove from failedEpoch(execId) ?
     if (failedEpoch.contains(execId)) {
@@ -1727,6 +1754,9 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
       }
       dagScheduler.handleExecutorLost(execId, workerLost)
 
+    case WorkerRemoved(workerId, host, message) =>
+      dagScheduler.handleWorkerRemoved(workerId, host, message)
+
     case BeginEvent(task, taskInfo) =>
       dagScheduler.handleBeginEvent(task, taskInfo)
 
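
`handleWorkerRemoved` delegates to `MapOutputTracker.removeOutputsOnHost`, which this diff does not show. As a rough conceptual model only (the tracker's real data structures differ), the effect is to blank out every map status located on the lost host, so reducers see missing outputs and the scheduler recomputes those map tasks elsewhere:

```scala
import org.apache.spark.storage.BlockManagerId

object RemoveOutputsSketch {
  // Simplified model: for each shuffle, an array of map statuses indexed by map
  // task, each recording the block manager that holds that task's output.
  def removeOutputsOnHost(
      statuses: Map[Int, Array[BlockManagerId]],
      host: String): Map[Int, Array[BlockManagerId]] = {
    statuses.map { case (shuffleId, locs) =>
      // Nulled entries look like lost outputs, triggering recomputation of the
      // corresponding map tasks on the surviving workers.
      shuffleId -> locs.map(loc => if (loc != null && loc.host == host) null else loc)
    }
  }
}
```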
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index cda0585f154a92135abf23a7964d235860a84c0f..3f8d5639a2b908591c59cc3e4956285c1a20b614 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -86,6 +86,9 @@ private[scheduler] case class ExecutorAdded(execId: String, host: String) extend
 private[scheduler] case class ExecutorLost(execId: String, reason: ExecutorLossReason)
   extends DAGSchedulerEvent
 
+private[scheduler] case class WorkerRemoved(workerId: String, host: String, message: String)
+  extends DAGSchedulerEvent
+
 private[scheduler]
 case class TaskSetFailed(taskSet: TaskSet, reason: String, exception: Option[Throwable])
   extends DAGSchedulerEvent
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 3de7d1f7de22b37552c0f0e4be29ec90b7ebfb8c..90644fea23ab14276d08c978dbfd494d3f57a918 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -89,6 +89,11 @@ private[spark] trait TaskScheduler {
    */
   def executorLost(executorId: String, reason: ExecutorLossReason): Unit
 
+  /**
   * Process a removed worker.
+   */
+  def workerRemoved(workerId: String, host: String, message: String): Unit
+
   /**
    * Get an application's attempt ID associated with the job.
    *
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 629cfc7c7a8ceea949429e8071e52f351112dd44..bba0b294f1afb7b22b4a079743a9e72c6f5d1cf4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -569,6 +569,11 @@ private[spark] class TaskSchedulerImpl private[scheduler](
     }
   }
 
+  override def workerRemoved(workerId: String, host: String, message: String): Unit = {
+    logInfo(s"Handle removed worker $workerId: $message")
+    dagScheduler.workerRemoved(workerId, host, message)
+  }
+
   private def logExecutorLoss(
       executorId: String,
       hostPort: String,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 6b49bd699a13a318321c273d69b6a5d7147f9079..89a9ad6811e18206e021f66447e9421b955c4ac4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -85,6 +85,9 @@ private[spark] object CoarseGrainedClusterMessages {
   case class RemoveExecutor(executorId: String, reason: ExecutorLossReason)
     extends CoarseGrainedClusterMessage
 
+  case class RemoveWorker(workerId: String, host: String, message: String)
+    extends CoarseGrainedClusterMessage
+
   case class SetupDriver(driver: RpcEndpointRef) extends CoarseGrainedClusterMessage
 
   // Exchanged between the driver and the AM in Yarn client mode
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index dc82bb7704727ca0229c5861a66c8407629a25a3..0b396b794ddcef93606919eff12994742ea3c6ff 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -219,6 +219,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         removeExecutor(executorId, reason)
         context.reply(true)
 
+      case RemoveWorker(workerId, host, message) =>
+        removeWorker(workerId, host, message)
+        context.reply(true)
+
       case RetrieveSparkAppConfig =>
         val reply = SparkAppConfig(sparkProperties,
           SparkEnv.get.securityManager.getIOEncryptionKey())
@@ -231,8 +235,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
         // Filter out executors under killing
         val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
-        val workOffers = activeExecutors.map { case (id, executorData) =>
-          new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
+        val workOffers = activeExecutors.map {
+          case (id, executorData) =>
+            new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
         }.toIndexedSeq
         scheduler.resourceOffers(workOffers)
       }
@@ -331,6 +336,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       }
     }
 
+    // Remove a lost worker from the cluster
+    private def removeWorker(workerId: String, host: String, message: String): Unit = {
+      logDebug(s"Asked to remove worker $workerId with reason $message")
+      scheduler.workerRemoved(workerId, host, message)
+    }
+
     /**
      * Stop making resource offers for the given executor. The executor is marked as lost with
      * the loss reason still pending.
@@ -449,8 +460,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    */
   protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
     // Only log the failure since we don't care about the result.
-    driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).onFailure { case t =>
-      logError(t.getMessage, t)
+    driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).onFailure {
+      case t => logError(t.getMessage, t)
+    }(ThreadUtils.sameThread)
+  }
+
+  protected def removeWorker(workerId: String, host: String, message: String): Unit = {
+    driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).onFailure {
+      case t => logError(t.getMessage, t)
     }(ThreadUtils.sameThread)
   }
 
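
The new `removeWorker` reuses the log-and-forget idiom already used by `removeExecutor`: ask for the `Boolean` reply but attach only a failure handler, executed on `ThreadUtils.sameThread` since it merely logs. The idiom in isolation (a sketch, not a helper this patch introduces):

```scala
package org.apache.spark.scheduler.cluster

import scala.concurrent.Future

import org.apache.spark.internal.Logging
import org.apache.spark.util.ThreadUtils

object LogAndForget extends Logging {
  // The successful reply is ignored; we only surface failures. No thread pool
  // is needed, so the callback runs inline on the completing thread.
  def logFailuresOnly(reply: Future[Boolean]): Unit = {
    reply.onFailure {
      case t => logError(t.getMessage, t)
    }(ThreadUtils.sameThread)
  }
}
```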
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 0529fe9eed4dae5499aea1c68f5de0c0714bce47..fd8e64454bf70b85f642ca8375e9724ad4cb3ac0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -161,6 +161,11 @@ private[spark] class StandaloneSchedulerBackend(
     removeExecutor(fullId.split("/")(1), reason)
   }
 
+  override def workerRemoved(workerId: String, host: String, message: String): Unit = {
+    logInfo("Worker %s removed: %s".format(workerId, message))
+    removeWorker(workerId, host, message)
+  }
+
   override def sufficientResourcesRegistered(): Boolean = {
     totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index 936639b845789b12b57318370d8de20cd43462c8..a1707e6540b39141cbb0f029f8c9f0dd3cdbec0a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -214,6 +214,8 @@ class AppClientSuite
         id: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = {
       execRemovedList.add(id)
     }
+
+    def workerRemoved(workerId: String, host: String, message: String): Unit = {}
   }
 
   /** Create AppClient and supporting objects */
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index ddd32811067459c3701f66e1f99e695e5288c772..453be26ed8d0c3fa9cf151c9ec99abc026565252 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -131,6 +131,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
     override def defaultParallelism() = 2
     override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+    override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
     override def applicationAttemptId(): Option[String] = None
   }
 
@@ -632,6 +633,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
           accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
           blockManagerId: BlockManagerId): Boolean = true
       override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+      override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
       override def applicationAttemptId(): Option[String] = None
     }
     val noKillScheduler = new DAGScheduler(
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index ba56af8215cd7a82bffc43aee6f3e8ed4734f408..a4e4ea7cd2894b0e8ff839434a86ce05c04a8d8b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -84,6 +84,7 @@ private class DummyTaskScheduler extends TaskScheduler {
   override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
   override def defaultParallelism(): Int = 2
   override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+  override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
   override def applicationAttemptId(): Option[String] = None
   def executorHeartbeatReceived(
       execId: String,
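
Taken together, the hunks above thread a single worker-loss notification through six components. The full path, reconstructed from this diff:

```scala
// Master.removeWorker(worker, msg)
//   -> app.driver.send(WorkerRemoved(id, host, msg))              // master -> driver RPC
// StandaloneAppClient.receive { case WorkerRemoved(...) }
//   -> listener.workerRemoved(id, host, msg)
// StandaloneSchedulerBackend.workerRemoved(...)
//   -> driverEndpoint.ask[Boolean](RemoveWorker(id, host, msg))   // local RPC, log-and-forget
// CoarseGrainedSchedulerBackend.receiveAndReply { case RemoveWorker(...) }
//   -> scheduler.workerRemoved(id, host, msg)                     // TaskSchedulerImpl
// TaskSchedulerImpl.workerRemoved
//   -> dagScheduler.workerRemoved
//   -> eventProcessLoop.post(WorkerRemoved(id, host, msg))        // DAGScheduler event
// DAGScheduler.handleWorkerRemoved
//   -> mapOutputTracker.removeOutputsOnHost(host); clearCacheLocs()
```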