Commit 52983d7f authored by Sephiroth-Lin, committed by Sean Owen

[SPARK-5644] [Core] Delete tmp dir when sc is stopped

When we run the driver as a service and only call sc.stop() after each job, the tmp dirs created by HttpFileServer and SparkEnv are not deleted; they remain on disk until the service process itself exits. To avoid accumulating these directories, we need to delete them directly when sc is stopped.
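For context, here is a minimal sketch of the driver-as-a-service scenario described above. The JobService object, runJob method, and the trivial job are illustrative assumptions only; they are not part of this patch.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical long-lived driver process that handles many job requests in one JVM.
// Before this patch, every SparkContext left the tmp dirs created by HttpFileServer
// and SparkEnv on disk until this whole process exited.
object JobService {

  def runJob(jobId: Int): Unit = {
    val conf = new SparkConf().setAppName(s"service-job-$jobId").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      sc.parallelize(1 to 100).map(_ * 2).count()
    } finally {
      // With this change, stop() also removes the driver-side tmp dirs,
      // so repeated jobs no longer accumulate directories on disk.
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    (1 to 1000).foreach(runJob)  // many jobs, one driver process
  }
}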

Author: Sephiroth-Lin <linwzhong@gmail.com>

Closes #4412 from Sephiroth-Lin/bug-fix-master-01 and squashes the following commits:

fbbc785 [Sephiroth-Lin] using an interpolated string
b968e14 [Sephiroth-Lin] using an interpolated string
4edf394 [Sephiroth-Lin] rename the variable and update comment
1339c96 [Sephiroth-Lin] add a member to store the reference of tmp dir
b2018a5 [Sephiroth-Lin] check sparkFilesDir before delete
f48a3c6 [Sephiroth-Lin] don't check sparkFilesDir, check executorId
dd9686e [Sephiroth-Lin] format code
b38e0f0 [Sephiroth-Lin] add dir check before delete
d7ccc64 [Sephiroth-Lin] Change log level
1d70926 [Sephiroth-Lin] update comment
e2a2b1b [Sephiroth-Lin] update comment
aeac518 [Sephiroth-Lin] Delete tmp dir when sc is stop
c0d5b28 [Sephiroth-Lin] Delete tmp dir when sc is stop
parent 58209612
No related branches found
No related tags found
No related merge requests found
@@ -50,6 +50,15 @@ private[spark] class HttpFileServer(
  def stop() {
    httpServer.stop()

    // If we only stop sc but the driver process still runs as a service, we need to delete
    // the tmp dir; otherwise too many tmp dirs accumulate.
    try {
      Utils.deleteRecursively(baseDir)
    } catch {
      case e: Exception =>
        logWarning(s"Exception while deleting Spark temp dir: ${baseDir.getAbsolutePath}", e)
    }
  }

  def addFile(file: File) : String = {
......
@@ -76,6 +76,8 @@ class SparkEnv (
  // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
  private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

  private var driverTmpDirToDelete: Option[String] = None

  private[spark] def stop() {
    isStopped = true
    pythonWorkers.foreach { case (key, worker) => worker.stop() }
@@ -93,6 +95,22 @@
    // actorSystem.awaitTermination()

    // Note that blockTransferService is stopped by BlockManager since it is started by it.

    // If we only stop sc but the driver process still runs as a service, we need to delete
    // the tmp dir; otherwise too many tmp dirs accumulate.
    // We only need to delete the tmp dir created by the driver, because sparkFilesDir points to
    // the current working dir on executors, which we do not need to delete.
    driverTmpDirToDelete match {
      case Some(path) => {
        try {
          Utils.deleteRecursively(new File(path))
        } catch {
          case e: Exception =>
            logWarning(s"Exception while deleting Spark temp dir: $path", e)
        }
      }
      case None => // We only delete the tmp dir created by the driver, so do nothing on executors
    }
  }

  private[spark]
@@ -350,7 +368,7 @@ object SparkEnv extends Logging {
"levels using the RDD.persist() method instead.")
}
new SparkEnv(
val envInstance = new SparkEnv(
executorId,
actorSystem,
serializer,
@@ -367,6 +385,15 @@ object SparkEnv extends Logging {
      metricsSystem,
      shuffleMemoryManager,
      conf)

    // Add a reference to the tmp dir created by the driver; we will delete this tmp dir when
    // stop() is called, and we only need to do it for the driver. The driver may run as a
    // service, and if we don't delete this tmp dir when sc is stopped, too many tmp dirs
    // will accumulate.
    if (isDriver) {
      envInstance.driverTmpDirToDelete = Some(sparkFilesDir)
    }

    envInstance
  }
/**
......
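Taken together, the change follows a simple pattern: remember the driver's tmp dir when the environment is created, then delete it recursively in stop(). The standalone sketch below illustrates that pattern; TmpDirOwner is a made-up class, and its deleteRecursively is a plain java.io stand-in for illustration, whereas the patch itself calls Spark's Utils.deleteRecursively.

import java.io.{File, IOException}
import java.nio.file.Files

// Standalone illustration of the cleanup pattern used by this patch: remember the
// temp dir created on the driver and remove it when stop() is called. On executors
// the Option stays None, so stop() does nothing.
class TmpDirOwner(isDriver: Boolean) {
  private var driverTmpDirToDelete: Option[String] = None

  def start(): Unit = {
    if (isDriver) {
      val dir = Files.createTempDirectory("spark-files-").toFile.getAbsolutePath
      driverTmpDirToDelete = Some(dir)  // remembered only on the driver
    }
  }

  def stop(): Unit = {
    driverTmpDirToDelete.foreach { path =>
      try {
        deleteRecursively(new File(path))
      } catch {
        case e: Exception => println(s"Failed to delete temp dir $path: $e")
      }
    }
  }

  // Delete children first, then the directory itself.
  private def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) {
      Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    }
    if (!f.delete() && f.exists()) {
      throw new IOException(s"Failed to delete: ${f.getAbsolutePath}")
    }
  }
}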