diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index 9f05cb4f353d66acc09400bd760d324c24861a94..1d88d4bc84926bdfe318539e18653ffcb1159d4f 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -30,6 +30,8 @@ case class ExecutorStateChanged(
     exitStatus: Option[Int])
   extends DeployMessage
 
+private[spark] case class Heartbeat(workerId: String) extends DeployMessage
+
 // Master to Worker
 
 private[spark] case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
@@ -45,7 +47,6 @@ private[spark] case class LaunchExecutor(
     sparkHome: String)
   extends DeployMessage
 
-
 // Client to Master
 
 private[spark] case class RegisterJob(jobDescription: JobDescription) extends DeployMessage
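
Note: the new Heartbeat message carries only the worker's ID; the master looks
up everything else itself. Both sides derive their timing from the same
spark.worker.timeout property (in seconds), so it only needs tuning in one
place. A hedged usage sketch (the property name is from this patch; setting it
programmatically like this is illustrative):

    // Must be set before the master/worker JVMs read the property, and on
    // both processes, so the master's expiry window and the workers' send
    // rate stay consistent.
    System.setProperty("spark.worker.timeout", "120")  // raise timeout to 120s
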
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 5986281d97e2d4949dbf99af30f83997832af2b8..d9852616003af138915c1f1c04d6de5d60151a88 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -3,6 +3,7 @@ package spark.deploy.master
 import akka.actor._
 import akka.actor.Terminated
 import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+import akka.util.duration._
 
 import java.text.SimpleDateFormat
 import java.util.Date
@@ -16,6 +17,7 @@ import spark.util.AkkaUtils
 
 private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For job IDs
+  val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
 
   var nextJobNumber = 0
   val workers = new HashSet[WorkerInfo]
@@ -46,6 +48,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     // Listen for remote client disconnection events, since they don't go through Akka's watch()
     context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
     startWebUi()
+    context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
   }
 
   def startWebUi() {
@@ -111,6 +114,15 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
       }
     }
 
+    case Heartbeat(workerId) => {
+      idToWorker.get(workerId) match {
+        case Some(workerInfo) =>
+          workerInfo.lastHeartbeat = System.currentTimeMillis()
+        case None =>
+          logWarning("Got heartbeat from unregistered worker " + workerId)
+      }
+    }
+
     case Terminated(actor) => {
       // The disconnected actor could've been either a worker or a job; remove whichever of
       // those we have an entry for in the corresponding actor hashmap
@@ -219,8 +231,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     actorToWorker -= worker.actor
     addressToWorker -= worker.actor.path.address
     for (exec <- worker.executors.values) {
-      exec.job.driver ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None, None)
-      exec.job.executors -= exec.id
+      logInfo("Telling job of lost executor: " + exec.id)
+      exec.job.driver ! ExecutorUpdated(exec.id, ExecutorState.LOST, Some("worker lost"), None)
+      exec.job.removeExecutor(exec)
     }
   }
 
@@ -259,6 +272,18 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     nextJobNumber += 1
     jobId
   }
+
+  /** Check for, and remove, any timed-out workers */
+  def timeOutDeadWorkers() {
+    // Copy the workers into an array so we don't modify the hashset while iterating through it
+    val expirationTime = System.currentTimeMillis() - WORKER_TIMEOUT
+    val toRemove = workers.filter(_.lastHeartbeat < expirationTime).toArray
+    for (worker <- toRemove) {
+      logWarning("Removing %s because we got no heartbeat in %d seconds".format(
+        worker.id, WORKER_TIMEOUT/1000))
+      removeWorker(worker)
+    }
+  }
 }
 
 private[spark] object Master {
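
Note: with the default spark.worker.timeout of 60, the derived constants are a
60,000 ms expiry window, a 60,000 ms sweep period, and a 15,000 ms worker send
period (timeout / 4). Because the sweep itself only runs once per timeout
window, a crashed worker is detected between one and two windows after its
last heartbeat. A quick sanity check of those derived values (constants copied
from this patch; the object name is illustrative):

    // Mirrors Master.WORKER_TIMEOUT and Worker.HEARTBEAT_MILLIS.
    object TimingSketch extends App {
      val timeoutSecs = System.getProperty("spark.worker.timeout", "60").toLong
      val workerTimeoutMs = timeoutSecs * 1000    // master-side expiry window
      val heartbeatMs = timeoutSecs * 1000 / 4    // worker-side send period
      println("heartbeats per window: " + workerTimeoutMs / heartbeatMs)  // 4
      println("worst-case detection (ms): " + 2 * workerTimeoutMs)        // 120000
    }
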
diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
index 5a7f5fef8a546812d20719af8d2a0f3dcab1af29..2e467007a0165f5b2a2f449d95c31881174d741d 100644
--- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
@@ -18,6 +18,8 @@ private[spark] class WorkerInfo(
   var coresUsed = 0
   var memoryUsed = 0
 
+  var lastHeartbeat = System.currentTimeMillis()
+
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
 
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 62f01776a9a64e592f26054dbd4946a74d76150a..924935a5fdb2c0f055aa9e5ddca8782a9e82840f 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -2,6 +2,7 @@ package spark.deploy.worker
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+import akka.util.duration._
 import spark.{Logging, Utils}
 import spark.util.AkkaUtils
 import spark.deploy._
@@ -26,6 +27,9 @@ private[spark] class Worker(
 
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For worker and executor IDs
 
+  // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
+  val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4
+
   var master: ActorRef = null
   var masterWebUiUrl : String = ""
   val workerId = generateWorkerId()
@@ -97,6 +101,9 @@ private[spark] class Worker(
     case RegisteredWorker(url) =>
       masterWebUiUrl = url
       logInfo("Successfully registered with master")
+      context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
+        master ! Heartbeat(workerId)
+      }
 
     case RegisterWorkerFailed(message) =>
       logError("Worker registration failed: " + message)
diff --git a/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala b/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala
index bba7de6a65c3d17aab47bdfa07c464ee7e801604..8bf838209f3d8183cf6251bd7def1b8f14dffa75 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ExecutorLossReason.scala
@@ -12,10 +12,10 @@ class ExecutorLossReason(val message: String) {
 
 private[spark]
 case class ExecutorExited(val exitCode: Int)
-    extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) {
+  extends ExecutorLossReason(ExecutorExitCode.explainExitCode(exitCode)) {
 }
 
 private[spark]
 case class SlaveLost(_message: String = "Slave lost")
-    extends ExecutorLossReason(_message) {
+  extends ExecutorLossReason(_message) {
 }
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 59ff8bcb90fc2620de2b538860cff59e9a2446c8..3c3e83b1385574feb006ae5d41289fd7ab03c7e3 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -67,6 +67,7 @@ private[spark] class SparkDeploySchedulerBackend(
       case None => SlaveLost(message)
     }
     logInfo("Executor %s removed: %s".format(executorId, message))
+    removeExecutor(executorId, reason.toString)
     scheduler.executorLost(executorId, reason)
   }
 }
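
Note: reason is an ExecutorLossReason case class, so reason.toString becomes
the human-readable removal reason passed on to the driver actor. A quick check
(case classes copied from ExecutorLossReason.scala above; the App wrapper is
illustrative):

    class ExecutorLossReason(val message: String)
    case class SlaveLost(_message: String = "Slave lost")
      extends ExecutorLossReason(_message)

    object ReasonSketch extends App {
      val reason: ExecutorLossReason = SlaveLost("worker lost")
      println(reason.toString)  // prints: SlaveLost(worker lost)
    }
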
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index da7dcf4b6b48e8b5eb851fbef8d48d79e20dc09e..d7660678248b2d9f028d8364bd3caa8cbd47660c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -37,3 +37,6 @@ object StatusUpdate {
 // Internal messages in driver
 private[spark] case object ReviveOffers extends StandaloneClusterMessage
 private[spark] case object StopDriver extends StandaloneClusterMessage
+
+private[spark] case class RemoveExecutor(executorId: String, reason: String)
+  extends StandaloneClusterMessage
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 082022be1c9da0a487e65879e57814b793ebe838..4213eb87190c6f2190ad75ce127212f984ef85e1 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -68,6 +68,10 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
         sender ! true
         context.stop(self)
 
+      case RemoveExecutor(executorId, reason) =>
+        removeExecutor(executorId, reason)
+        sender ! true
+
       case Terminated(actor) =>
         actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
 
@@ -100,7 +104,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
 
     // Remove a disconnected slave from the cluster
     def removeExecutor(executorId: String, reason: String) {
-      logInfo("Slave " + executorId + " disconnected, so removing it")
+      logInfo("Executor " + executorId + " disconnected, so removing it")
       val numCores = freeCores(executorId)
       actorToExecutorId -= executorActor(executorId)
       addressToExecutorId -= executorAddress(executorId)
@@ -139,7 +143,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
       }
     } catch {
       case e: Exception =>
-        throw new SparkException("Error stopping standalone scheduler's master actor", e)
+        throw new SparkException("Error stopping standalone scheduler's driver actor", e)
     }
   }
 
@@ -148,6 +152,18 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
   }
 
   override def defaultParallelism(): Int = math.max(totalCoreCount.get(), 2)
+
+  // Called by subclasses when notified of a lost worker
+  def removeExecutor(executorId: String, reason: String) {
+    try {
+      val timeout = 5.seconds
+      val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout)
+      Await.result(future, timeout)
+    } catch {
+      case e: Exception =>
+        throw new SparkException("Error notifying standalone scheduler's driver actor", e)
+    }
+  }
 }
 
 private[spark] object StandaloneSchedulerBackend {
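
Note: the new removeExecutor blocks on the driver actor with the same
ask/Await handshake the existing stop() path uses, so a backend knows the
executor bookkeeping is gone before telling the scheduler. A self-contained
sketch of that request/ack pattern (Akka 2.0-era API; actor names are
illustrative):

    import akka.actor._
    import akka.dispatch.Await
    import akka.pattern.ask
    import akka.util.duration._

    case class RemoveExecutor(executorId: String, reason: String)

    class DriverSketch extends Actor {
      def receive = {
        case RemoveExecutor(id, why) =>
          println("removing " + id + ": " + why)
          sender ! true  // the ack the caller awaits
      }
    }

    object AskSketch extends App {
      val system = ActorSystem("sketch")
      val driver = system.actorOf(Props[DriverSketch])
      val timeout = 5.seconds
      val future = driver.ask(RemoveExecutor("exec-1", "worker lost"))(timeout)
      Await.result(future, timeout)  // throws if no ack within the timeout
      system.shutdown()
    }
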