Commit df3d559b authored by Marcelo Vanzin, committed by Sean Owen

[SPARK-5801] [core] Avoid creating nested directories.

Cache the value of the local root dirs to use for storing local data,
so that the same directories are reused.

Also, to avoid an extra level of nesting, use a different env variable
to propagate the local dirs from the Worker to the executors. And make
the executor directory use a different name.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #4747 from vanzin/SPARK-5801 and squashes the following commits:

e0114e1 [Marcelo Vanzin] Update unit test.
18ee0a7 [Marcelo Vanzin] [SPARK-5801] [core] Avoid creating nested directories.
Parent: 192e42a2
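The core of the change is a memoized lookup of the local root directories. As a self-contained illustration of the @volatile double-checked-locking idiom that the Utils hunk below introduces (names simplified; this is a sketch, not the actual Spark code):

object LocalDirsCache {
  // @volatile publishes the initialized array safely to all threads; the
  // second null check, taken under the lock, prevents two racing threads
  // from computing the value twice.
  @volatile private var cached: Array[String] = null

  def getOrCreate(compute: () => Array[String]): Array[String] = {
    if (cached == null) {
      this.synchronized {
        if (cached == null) {
          cached = compute()
        }
      }
    }
    cached
  }
}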
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -135,7 +135,7 @@ private[spark] class ExecutorRunner(
     logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
     builder.directory(executorDir)
-    builder.environment.put("SPARK_LOCAL_DIRS", appLocalDirs.mkString(","))
+    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
     // In case we are running this from within the Spark Shell, avoid creating a "scala"
     // parent process for the executor command
     builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
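The hunk above also switches the join character from a comma to File.pathSeparator (":" on Unix-like systems, ";" on Windows), the conventional separator for PATH-style environment variables, so the value round-trips cleanly. A quick sketch with hypothetical directory names:

import java.io.File

val dirs = Seq("/mnt/disk1/spark", "/mnt/disk2/spark")  // hypothetical local dirs
val joined = dirs.mkString(File.pathSeparator)          // "/mnt/disk1/spark:/mnt/disk2/spark" on Unix
assert(joined.split(File.pathSeparator).toSeq == dirs)  // splitting recovers the original list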
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -345,11 +345,11 @@ private[spark] class Worker(
       }
       // Create local dirs for the executor. These are passed to the executor via the
-      // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
+      // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
       // application finishes.
       val appLocalDirs = appDirectories.get(appId).getOrElse {
         Utils.getOrCreateLocalRootDirs(conf).map { dir =>
-          Utils.createDirectory(dir).getAbsolutePath()
+          Utils.createDirectory(dir, namePrefix = "executor").getAbsolutePath()
         }.toSeq
       }
       appDirectories(appId) = appLocalDirs
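Passing namePrefix = "executor" makes the per-application directory name start with "executor-" rather than the generic default, which is what the commit message means by making "the executor directory use a different name". A simplified stand-in for such a helper (an assumption about its shape, not Spark's actual Utils.createDirectory):

import java.io.{File, IOException}
import java.util.UUID

// Create a uniquely named subdirectory under `root`, retrying a bounded
// number of times if creation fails.
def createDirectory(root: String, namePrefix: String = "spark"): File = {
  val maxAttempts = 10
  var attempts = 0
  var dir: File = null
  while (dir == null) {
    attempts += 1
    if (attempts > maxAttempts) {
      throw new IOException(s"Failed to create a directory under $root after $maxAttempts attempts")
    }
    val candidate = new File(root, namePrefix + "-" + UUID.randomUUID.toString)
    if (candidate.mkdirs()) {
      dir = candidate
    }
  }
  dir
}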
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -63,6 +63,7 @@ private[spark] object Utils extends Logging {
   val random = new Random()

   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
+  @volatile private var localRootDirs: Array[String] = null

   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
@@ -683,14 +684,31 @@ private[spark] object Utils extends Logging {
    * and returns only the directories that exist / could be created.
    *
    * If no directories could be created, this will return an empty list.
+   *
+   * This method will cache the local directories for the application when it's first invoked.
+   * So calling it multiple times with a different configuration will always return the same
+   * set of directories.
    */
   private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
+    if (localRootDirs == null) {
+      this.synchronized {
+        if (localRootDirs == null) {
+          localRootDirs = getOrCreateLocalRootDirsImpl(conf)
+        }
+      }
+    }
+    localRootDirs
+  }
+
+  private def getOrCreateLocalRootDirsImpl(conf: SparkConf): Array[String] = {
     if (isRunningInYarnContainer(conf)) {
       // If we are in yarn mode, systems can have different disk layouts so we must set it
       // to what Yarn on this system said was available. Note this assumes that Yarn has
       // created the directories already, and that they are secured so that only the
       // user has access to them.
       getYarnLocalDirs(conf).split(",")
+    } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
+      conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
     } else {
       // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
       // configuration to point to a secure directory. So create a subdirectory with restricted
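Because the result is cached, the returned directories are fixed for the lifetime of the JVM, exactly as the new scaladoc states. An illustration (the spark.local.dir values are hypothetical, and since getOrCreateLocalRootDirs is private[spark], this only compiles inside the org.apache.spark package tree):

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

val dirs1 = Utils.getOrCreateLocalRootDirs(new SparkConf().set("spark.local.dir", "/tmp/a"))
val dirs2 = Utils.getOrCreateLocalRootDirs(new SparkConf().set("spark.local.dir", "/tmp/b"))
// The second configuration is ignored: the first call's result was cached.
assert(dirs1 sameElements dirs2)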
@@ -734,6 +752,11 @@ private[spark] object Utils extends Logging {
     localDirs
   }

+  /** Used by unit tests. Do not call from other places. */
+  private[spark] def clearLocalRootDirs(): Unit = {
+    localRootDirs = null
+  }
+
   /**
    * Shuffle the elements of a collection into a random order, returning the
    * result in a new collection. Unlike scala.util.Random.shuffle, this method
--- a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.storage
 import java.io.File

 import org.apache.spark.util.Utils
-import org.scalatest.FunSuite
+import org.scalatest.{BeforeAndAfter, FunSuite}

 import org.apache.spark.SparkConf

@@ -28,7 +28,11 @@ import org.apache.spark.SparkConf
 /**
  * Tests for the spark.local.dir and SPARK_LOCAL_DIRS configuration options.
  */
-class LocalDirsSuite extends FunSuite {
+class LocalDirsSuite extends FunSuite with BeforeAndAfter {
+
+  before {
+    Utils.clearLocalRootDirs()
+  }

   test("Utils.getLocalDir() returns a valid directory, even if some local dirs are missing") {
     // Regression test for SPARK-2974
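Without this reset, the first test to call getOrCreateLocalRootDirs would pin the cached directories for every later test in the same JVM, so tests that vary spark.local.dir or SPARK_LOCAL_DIRS would silently observe stale results; clearing the cache in a before block restores test isolation.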