Commit 2f5187bd authored by xiaojian.fxj, committed by Shixiong Zhu

[SPARK-19831][CORE] Reuse the existing cleanupThreadExecutor to clean up the directories of finished applications to avoid the block

Cleaning up an application's directories can take a long time on the worker. Because Worker extends ThreadSafeRpcEndpoint, it handles one message at a time, so a slow cleanup blocks the worker from sending heartbeats to the master. If a worker's heartbeat is stuck behind the ApplicationFinished message, the master will consider the worker dead. If the worker is running a driver, the master will then schedule that driver again.
It is better to reuse the existing cleanupThreadExecutor to clean up the directories of finished applications, so that the worker's message handling is not blocked.
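
The blocking described above can be reproduced without Spark. The following is a minimal sketch, not the Worker's actual code: `messageLoop` is an assumed stand-in for the endpoint's one-message-at-a-time handling, `slowCleanup` is a hypothetical placeholder for the directory deletion, and the 500 ms sleep is an arbitrary value chosen for illustration.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}

object CleanupOffloadSketch {
  // Hypothetical stand-in for a slow recursive directory delete.
  def slowCleanup(): Unit = Thread.sleep(500)

  // Handles an "application finished" message followed by a "heartbeat" message
  // on a single-threaded loop, and returns how many milliseconds passed before
  // the heartbeat was handled.
  def heartbeatDelayMillis(offload: Boolean): Long = {
    val messageLoop = Executors.newSingleThreadExecutor()
    val cleanupPool = Executors.newSingleThreadExecutor()
    val cleanupContext = ExecutionContext.fromExecutorService(cleanupPool)
    try {
      val start = System.nanoTime()
      // Message 1: cleanup runs either inline or on the dedicated cleanup thread.
      messageLoop.submit(new Runnable {
        def run(): Unit = {
          if (offload) {
            Future(slowCleanup())(cleanupContext)
            ()
          } else {
            slowCleanup()
          }
        }
      })
      // Message 2: the heartbeat, queued behind message 1.
      val heartbeat = messageLoop.submit(new Callable[Long] {
        def call(): Long = (System.nanoTime() - start) / 1000000L
      })
      heartbeat.get()
    } finally {
      messageLoop.shutdown()
      cleanupPool.shutdown()
      cleanupPool.awaitTermination(5, TimeUnit.SECONDS)
    }
  }

  def main(args: Array[String]): Unit = {
    println(s"inline cleanup:    heartbeat handled after ${heartbeatDelayMillis(offload = false)} ms")
    println(s"offloaded cleanup: heartbeat handled after ${heartbeatDelayMillis(offload = true)} ms")
  }
}
```

With inline cleanup the heartbeat waits for the whole deletion. With offloading it only waits for the task to be handed to the cleanup thread.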

Author: xiaojian.fxj <xiaojian.fxj@alibaba-inc.com>

Closes #17189 from hustfxj/worker-hearbeat.
parent e29a74d5
@@ -62,8 +62,8 @@ private[deploy] class Worker(
   private val forwordMessageScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
-  // A separated thread to clean up the workDir. Used to provide the implicit parameter of `Future`
-  // methods.
+  // A separated thread to clean up the workDir and the directories of finished applications.
+  // Used to provide the implicit parameter of `Future` methods.
   private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))
@@ -578,10 +578,15 @@ private[deploy] class Worker(
       if (shouldCleanup) {
         finishedApps -= id
         appDirectories.remove(id).foreach { dirList =>
-          logInfo(s"Cleaning up local directories for application $id")
-          dirList.foreach { dir =>
-            Utils.deleteRecursively(new File(dir))
-          }
+          concurrent.Future {
+            logInfo(s"Cleaning up local directories for application $id")
+            dirList.foreach { dir =>
+              Utils.deleteRecursively(new File(dir))
+            }
+          }(cleanupThreadExecutor).onFailure {
+            case e: Throwable =>
+              logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
+          }(cleanupThreadExecutor)
         }
         shuffleService.applicationRemoved(id)
       }
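
A note on the failure callback in this hunk: an exception thrown inside the `Future` body is captured in the future and does not reach the code that created it, so it has to be handled explicitly, as the diff does with `onFailure`. `Future.onFailure` was deprecated in Scala 2.12 and removed in 2.13, so a reader reusing the pattern on a newer Scala version would need something else. The sketch below uses `recover`, which exists in both old and new versions. `runCleanup` and the message format are hypothetical names for illustration, not part of Spark.

```scala
import java.io.IOException
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object CleanupFailureSketch {
  // Runs `cleanup` on the given execution context. The resulting future holds
  // None on success, or Some(message) if the cleanup threw a non-fatal exception.
  def runCleanup(cleanup: () => Unit)(implicit ec: ExecutionContext): Future[Option[String]] =
    Future(cleanup()).map(_ => Option.empty[String]).recover {
      case NonFatal(e) => Some(s"Clean up failed: ${e.getMessage}")
    }

  def main(args: Array[String]): Unit = {
    // Assumed stand-in for the worker's single cleanup thread.
    val pool = Executors.newSingleThreadExecutor()
    implicit val cleanupContext: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val ok = runCleanup(() => ())
      val failed = runCleanup(() => throw new IOException("disk busy"))
      println(Await.result(ok, 5.seconds))
      println(Await.result(failed, 5.seconds))
    } finally {
      pool.shutdown()
    }
  }
}
```

Unlike the `case e: Throwable` in the diff, `NonFatal` lets fatal errors such as `OutOfMemoryError` propagate instead of being logged and swallowed. This is a design choice of the sketch, not something the commit does.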