From cdd1af562ef3fb480f2e98300e3d463657c09681 Mon Sep 17 00:00:00 2001
From: Mark Hamstra <markhamstra@gmail.com>
Date: Thu, 1 Aug 2013 15:14:39 -0700
Subject: [PATCH] Timeout zombie workers

---
 .../scala/spark/deploy/master/Master.scala     | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 0aed4b9802..b50613f866 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -39,7 +39,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
   val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
   val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
-
+  val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
+ 
   var nextAppNumber = 0
   val workers = new HashSet[WorkerInfo]
   val idToWorker = new HashMap[String, WorkerInfo]
@@ -337,12 +338,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   /** Check for, and remove, any timed-out workers */
   def timeOutDeadWorkers() {
     // Copy the workers into an array so we don't modify the hashset while iterating through it
-    val expirationTime = System.currentTimeMillis() - WORKER_TIMEOUT
-    val toRemove = workers.filter(_.lastHeartbeat < expirationTime).toArray
+    val currentTime = System.currentTimeMillis()
+    val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
     for (worker <- toRemove) {
-      logWarning("Removing %s because we got no heartbeat in %d seconds".format(
-        worker.id, WORKER_TIMEOUT))
-      removeWorker(worker)
+      if (worker.state != WorkerState.DEAD) {
+        logWarning("Removing %s because we got no heartbeat in %d seconds".format(
+          worker.id, WORKER_TIMEOUT))
+        removeWorker(worker)
+      } else {
+        if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
+          workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it 
+      }
     }
   }
 }
-- 
GitLab