Skip to content
Snippets Groups Projects
Commit cdd1af56 authored by Mark Hamstra's avatar Mark Hamstra
Browse files

Timeout zombie workers

parent e8bec836
No related branches found
No related tags found
No related merge requests found
...@@ -39,7 +39,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act ...@@ -39,7 +39,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000 val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
var nextAppNumber = 0 var nextAppNumber = 0
val workers = new HashSet[WorkerInfo] val workers = new HashSet[WorkerInfo]
val idToWorker = new HashMap[String, WorkerInfo] val idToWorker = new HashMap[String, WorkerInfo]
...@@ -337,12 +338,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act ...@@ -337,12 +338,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
/** Check for, and remove, any timed-out workers */ /** Check for, and remove, any timed-out workers */
def timeOutDeadWorkers() { def timeOutDeadWorkers() {
// Copy the workers into an array so we don't modify the hashset while iterating through it // Copy the workers into an array so we don't modify the hashset while iterating through it
val expirationTime = System.currentTimeMillis() - WORKER_TIMEOUT val currentTime = System.currentTimeMillis()
val toRemove = workers.filter(_.lastHeartbeat < expirationTime).toArray val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
for (worker <- toRemove) { for (worker <- toRemove) {
logWarning("Removing %s because we got no heartbeat in %d seconds".format( if (worker.state != WorkerState.DEAD) {
worker.id, WORKER_TIMEOUT)) logWarning("Removing %s because we got no heartbeat in %d seconds".format(
removeWorker(worker) worker.id, WORKER_TIMEOUT))
removeWorker(worker)
} else {
if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
}
} }
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment