Commit 1b63dea8 authored by Matei Zaharia

Merge pull request #769 from markhamstra/NegativeCores

SPARK-847 + SPARK-845: Zombie workers and negative cores
Parents: 828aff74 35d8f5ee
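
For context on the negative-cores half of this change: removeExecutor() could be invoked more than once for the same executor (for example when a lost executor is reported again by a worker that is effectively dead), so coresGranted was decremented repeatedly and coresLeft could go negative. The sketch below is a minimal, hypothetical Scala illustration of that failure mode and of the contains() guard this patch adds; the class and object names are simplified stand-ins, not the actual ApplicationInfo code.

import scala.collection.mutable.HashMap

// Minimal stand-in for the executor bookkeeping in ApplicationInfo; names are
// simplified for illustration and this is not the actual Spark class.
class CoreTracker(val maxCores: Int) {
  val executors = new HashMap[Int, Int]()   // executor id -> cores granted to it
  var coresGranted = 0

  def addExecutor(id: Int, cores: Int) {
    executors(id) = cores
    coresGranted += cores
  }

  // The guarded removal from this patch: removing the same executor twice is a
  // no-op, so coresGranted can no longer be decremented twice and pushed below zero.
  def removeExecutor(id: Int) {
    if (executors.contains(id)) {
      coresGranted -= executors(id)
      executors -= id
    }
  }

  def coresLeft: Int = maxCores - coresGranted
}

object NegativeCoresDemo extends App {
  val app = new CoreTracker(maxCores = 4)
  app.addExecutor(0, 4)
  app.removeExecutor(0)
  app.removeExecutor(0)      // duplicate removal, e.g. reported by a zombie worker
  println(app.coresLeft)     // 4; without the contains() check this would print 8
}
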
@@ -109,6 +109,7 @@ private[deploy] object DeployMessages {
   }
 
   // WorkerWebUI to Worker
+
   case object RequestWorkerState
 
   // Worker to WorkerWebUI
@@ -120,4 +121,9 @@ private[deploy] object DeployMessages {
     Utils.checkHost(host, "Required hostname")
     assert (port > 0)
   }
+
+  // Actor System to Master
+
+  case object CheckForWorkerTimeOut
+
 }
@@ -52,8 +52,10 @@ private[spark] class ApplicationInfo(
   }
 
   def removeExecutor(exec: ExecutorInfo) {
-    executors -= exec.id
-    coresGranted -= exec.cores
+    if (executors.contains(exec.id)) {
+      executors -= exec.id
+      coresGranted -= exec.cores
+    }
   }
 
   def coresLeft: Int = desc.maxCores - coresGranted
@@ -39,7 +39,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
   val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
   val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
+  val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
 
   var nextAppNumber = 0
   val workers = new HashSet[WorkerInfo]
   val idToWorker = new HashMap[String, WorkerInfo]
@@ -79,7 +80,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     // Listen for remote client disconnection events, since they don't go through Akka's watch()
     context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
     webUi.start()
-    context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
+    context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
 
     masterMetricsSystem.registerSource(masterSource)
     masterMetricsSystem.start()
@@ -175,6 +176,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     case RequestMasterState => {
       sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
     }
+
+    case CheckForWorkerTimeOut => {
+      timeOutDeadWorkers()
+    }
   }
 
   /**
@@ -337,12 +342,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   /** Check for, and remove, any timed-out workers */
   def timeOutDeadWorkers() {
     // Copy the workers into an array so we don't modify the hashset while iterating through it
-    val expirationTime = System.currentTimeMillis() - WORKER_TIMEOUT
-    val toRemove = workers.filter(_.lastHeartbeat < expirationTime).toArray
+    val currentTime = System.currentTimeMillis()
+    val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
     for (worker <- toRemove) {
-      logWarning("Removing %s because we got no heartbeat in %d seconds".format(
-        worker.id, WORKER_TIMEOUT))
-      removeWorker(worker)
+      if (worker.state != WorkerState.DEAD) {
+        logWarning("Removing %s because we got no heartbeat in %d seconds".format(
+          worker.id, WORKER_TIMEOUT/1000))
+        removeWorker(worker)
+      } else {
+        if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
+          workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
+      }
     }
   }
 }
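
Two behavioral notes on the Master changes above. Sending CheckForWorkerTimeOut to self (instead of invoking timeOutDeadWorkers() directly from the scheduler closure) routes the periodic check through the actor's mailbox, so the workers set is only mutated from the actor's own message handling. And a worker that has already been marked DEAD is no longer removed again on every pass; it stays visible for a grace period before being culled from the set. The sketch below is illustrative arithmetic only, assuming the default property values introduced in this patch; the object name is hypothetical.

object WorkerReaperDefaults extends App {
  // Defaults from this patch: spark.worker.timeout = 60 (seconds),
  // spark.dead.worker.persistence = 15 (iterations). Illustrative only.
  val workerTimeoutMs  = 60L * 1000
  val reaperIterations = 15

  val markDeadAfterMs = workerTimeoutMs                          // no heartbeat for 60s -> removeWorker()
  val cullAfterMs     = (reaperIterations + 1) * workerTimeoutMs // 16 * 60s = 960s, roughly 16 minutes

  println("mark DEAD after %ds, drop from workers set after %ds".format(
    markDeadAfterMs / 1000, cullAfterMs / 1000))
}
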