Skip to content
Snippets Groups Projects
Commit eacd4a92 authored by linweizhong's avatar linweizhong Committed by Sean Owen
Browse files

[SPARK-7705] [YARN] Cleanup of .sparkStaging directory fails if application is killed

As I have tested, if we cancel or kill the app then the final status may be undefined, killed or succeeded, so clean up staging directory when appMaster exit at any final application status.

Author: linweizhong <linweizhong@huawei.com>

Closes #6409 from Sephiroth-Lin/SPARK-7705 and squashes the following commits:

3a5a0a5 [linweizhong] Update
83dc274 [linweizhong] Update
923d44d [linweizhong] Update
0dd7c2d [linweizhong] Update
b76a102 [linweizhong] Update code style
7846b69 [linweizhong] Update
bd6cf0d [linweizhong] Refactor
aed9f18 [linweizhong] Clean up stagingDir when launch app on yarn
95595c3 [linweizhong] Cleanup of .sparkStaging directory when AppMaster exit at any final application status
parent 10fc2f6f
No related branches found
No related tags found
No related merge requests found
......@@ -121,24 +121,31 @@ private[spark] class Client(
} catch {
case e: Throwable =>
if (appId != null) {
val appStagingDir = getAppStagingDir(appId)
try {
val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false)
val stagingDirPath = new Path(appStagingDir)
val fs = FileSystem.get(hadoopConf)
if (!preserveFiles && fs.exists(stagingDirPath)) {
logInfo("Deleting staging directory " + stagingDirPath)
fs.delete(stagingDirPath, true)
}
} catch {
case ioe: IOException =>
logWarning("Failed to cleanup staging dir " + appStagingDir, ioe)
}
cleanupStagingDir(appId)
}
throw e
}
}
/**
* Cleanup application staging directory.
*/
private def cleanupStagingDir(appId: ApplicationId): Unit = {
val appStagingDir = getAppStagingDir(appId)
try {
val preserveFiles = sparkConf.getBoolean("spark.yarn.preserve.staging.files", false)
val stagingDirPath = new Path(appStagingDir)
val fs = FileSystem.get(hadoopConf)
if (!preserveFiles && fs.exists(stagingDirPath)) {
logInfo("Deleting staging directory " + stagingDirPath)
fs.delete(stagingDirPath, true)
}
} catch {
case ioe: IOException =>
logWarning("Failed to cleanup staging dir " + appStagingDir, ioe)
}
}
/**
* Set up the context for submitting our ApplicationMaster.
* This uses the YarnClientApplication not available in the Yarn alpha API.
......@@ -782,6 +789,7 @@ private[spark] class Client(
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
cleanupStagingDir(appId)
return (state, report.getFinalApplicationStatus)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment