diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b4f321ec99e78d9fc85fc1a8cf565db8e4996054..605df0e929faa94d4791e4c59c2860875bb04cc6 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -45,7 +45,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   /** Create a SparkConf that loads defaults from system properties and the classpath */
   def this() = this(true)
 
-  private val settings = new HashMap[String, String]()
+  private[spark] val settings = new HashMap[String, String]()
 
   if (loadDefaults) {
     // Load any spark.* system properties
@@ -210,6 +210,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     new SparkConf(false).setAll(settings)
   }
 
+  /**
+   * By using this instead of System.getenv(), environment variables can be mocked
+   * in unit tests.
+   */
+  private[spark] def getenv(name: String): String = System.getenv(name)
+
   /** Checks for illegal or deprecated config settings. Throws an exception for the former. Not
     * idempotent - may mutate this conf object to convert deprecated settings to supported ones. */
   private[spark] def validateSettings() {
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 10210a2927dcc99dc077aba83d04e7242b37c02d..747023812f7543539978b4c0ffa1a5aeeb580301 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -62,7 +62,7 @@ private[spark] class PythonRDD(
     val env = SparkEnv.get
     val localdir = env.blockManager.diskBlockManager.localDirs.map(
       f => f.getPath()).mkString(",")
-    envVars += ("SPARK_LOCAL_DIR" -> localdir) // it's also used in monitor thread
+    envVars += ("SPARK_LOCAL_DIRS" -> localdir) // it's also used in monitor thread
     val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
 
     // Start a thread to feed the process input from our parent's iterator
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index fb3f7bd54bbfa05c833ecf3a60a8e0406534701a..2f76e532aeb761ad886aa25e8fee91607a6fdf79 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -62,16 +62,6 @@ private[spark] class Executor(
   val conf = new SparkConf(true)
   conf.setAll(properties)
 
-  // If we are in yarn mode, systems can have different disk layouts so we must set it
-  // to what Yarn on this system said was available. This will be used later when SparkEnv
-  // created.
-  if (java.lang.Boolean.valueOf(
-      System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))) {
-    conf.set("spark.local.dir", getYarnLocalDirs())
-  } else if (sys.env.contains("SPARK_LOCAL_DIRS")) {
-    conf.set("spark.local.dir", sys.env("SPARK_LOCAL_DIRS"))
-  }
-
   if (!isLocal) {
     // Setup an uncaught exception handler for non-local mode.
     // Make any thread terminations due to uncaught exceptions kill the entire
@@ -134,21 +124,6 @@ private[spark] class Executor(
     threadPool.shutdown()
   }
 
-  /** Get the Yarn approved local directories. */
-  private def getYarnLocalDirs(): String = {
-    // Hadoop 0.23 and 2.x have different Environment variable names for the
-    // local dirs, so lets check both. We assume one of the 2 is set.
-    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
-    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
-      .getOrElse(Option(System.getenv("LOCAL_DIRS"))
-      .getOrElse(""))
-
-    if (localDirs.isEmpty) {
-      throw new Exception("Yarn Local dirs can't be empty")
-    }
-    localDirs
-  }
-
   class TaskRunner(
       execBackend: ExecutorBackend, val taskId: Long, taskName: String, serializedTask: ByteBuffer)
     extends Runnable {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index c0491fb55e3a4dc6e37bd4a7ac03a5c50427afbb..12a92d44f4c36ea9200d7c9e2754cb5a1896843c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -67,8 +67,7 @@ private[spark] class BlockManager(
 
   private val port = conf.getInt("spark.blockManager.port", 0)
   val shuffleBlockManager = new ShuffleBlockManager(this, shuffleManager)
-  val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
-    conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
+  val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
   val connectionManager =
     new ConnectionManager(port, conf, securityManager, "Connection manager for block manager")
 
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index f3da816389581e54e323f7982a9411a05fa8432d..ec022ce9c048a976dc9bdf14d7b8c3d8591bcbd7 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.text.SimpleDateFormat
 import java.util.{Date, Random, UUID}
 
-import org.apache.spark.{SparkEnv, Logging}
+import org.apache.spark.{SparkConf, SparkEnv, Logging}
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.network.netty.PathResolver
 import org.apache.spark.util.Utils
@@ -33,9 +33,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
  * However, it is also possible to have a block map to only a segment of a file, by calling
  * mapBlockToFileSegment().
  *
- * @param rootDirs The directories to use for storing block files. Data will be hashed among these.
+ * Block files are hashed among the directories listed in spark.local.dir (or in
+ * SPARK_LOCAL_DIRS, if it's set).
  */
-private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, rootDirs: String)
+private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager, conf: SparkConf)
   extends PathResolver with Logging {
 
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@@ -46,7 +47,7 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager,
   /* Create one local directory for each path mentioned in spark.local.dir; then, inside this
    * directory, create multiple subdirectories that we will hash files into, in order to avoid
    * having really large inodes at the top level. */
-  val localDirs: Array[File] = createLocalDirs()
+  val localDirs: Array[File] = createLocalDirs(conf)
   if (localDirs.isEmpty) {
     logError("Failed to create any local dir.")
     System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
@@ -130,10 +131,9 @@ private[spark] class DiskBlockManager(shuffleBlockManager: ShuffleBlockManager,
     (blockId, getFile(blockId))
   }
 
-  private def createLocalDirs(): Array[File] = {
-    logDebug(s"Creating local directories at root dirs '$rootDirs'")
+  private def createLocalDirs(conf: SparkConf): Array[File] = {
     val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
-    rootDirs.split(",").flatMap { rootDir =>
+    Utils.getOrCreateLocalRootDirs(conf).flatMap { rootDir =>
       var foundLocalDir = false
       var localDir: File = null
       var localDirId: String = null
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 019f68b1608942b9af7120f2e4cc450e9bd58be3..d6d74ce26921959d9310cd965e2fa2602ad43751 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -449,12 +449,71 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Get a temporary directory using Spark's spark.local.dir property, if set. This will always
-   * return a single directory, even though the spark.local.dir property might be a list of
-   * multiple paths.
+   * Get the path of a temporary directory.  Spark's local directories can be configured through
+   * multiple settings, which are used with the following precedence:
+   *
+   *   - If called from inside of a YARN container, this will return a directory chosen by YARN.
+   *   - If the SPARK_LOCAL_DIRS environment variable is set, this will return a directory from it.
+   *   - Otherwise, if the spark.local.dir is set, this will return a directory from it.
+   *   - Otherwise, this will return java.io.tmpdir.
+   *
+   * Some of these configuration options might be lists of multiple paths, but this method will
+   * always return a single directory.
    */
   def getLocalDir(conf: SparkConf): String = {
-    conf.get("spark.local.dir",  System.getProperty("java.io.tmpdir")).split(',')(0)
+    getOrCreateLocalRootDirs(conf)(0)
+  }
+
+  private[spark] def isRunningInYarnContainer(conf: SparkConf): Boolean = {
+    // These environment variables are set by YARN.
+    // For Hadoop 0.23.X, we check for YARN_LOCAL_DIRS (we use this below in getYarnLocalDirs())
+    // For Hadoop 2.X, we check for CONTAINER_ID.
+    conf.getenv("CONTAINER_ID") != null || conf.getenv("YARN_LOCAL_DIRS") != null
+  }
+
+  /**
+   * Gets or creates the directories listed in spark.local.dir or SPARK_LOCAL_DIRS,
+   * and returns only the directories that exist / could be created.
+   *
+   * If no directories could be created, this will return an empty list.
+   */
+  private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
+    val confValue = if (isRunningInYarnContainer(conf)) {
+      // If we are in yarn mode, systems can have different disk layouts so we must set it
+      // to what Yarn on this system said was available.
+      getYarnLocalDirs(conf)
+    } else {
+      Option(conf.getenv("SPARK_LOCAL_DIRS")).getOrElse(
+        conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
+    }
+    val rootDirs = confValue.split(',')
+    logDebug(s"Getting/creating local root dirs at '$confValue'")
+
+    rootDirs.flatMap { rootDir =>
+      val localDir: File = new File(rootDir)
+      val foundLocalDir = localDir.exists || localDir.mkdirs()
+      if (!foundLocalDir) {
+        logError(s"Failed to create local root dir in $rootDir.  Ignoring this directory.")
+        None
+      } else {
+        Some(rootDir)
+      }
+    }
+  }
+
+  /** Get the Yarn approved local directories. */
+  private def getYarnLocalDirs(conf: SparkConf): String = {
+    // Hadoop 0.23 and 2.x have different Environment variable names for the
+    // local dirs, so lets check both. We assume one of the 2 is set.
+    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
+    val localDirs = Option(conf.getenv("YARN_LOCAL_DIRS"))
+      .getOrElse(Option(conf.getenv("LOCAL_DIRS"))
+      .getOrElse(""))
+
+    if (localDirs.isEmpty) {
+      throw new Exception("Yarn Local dirs can't be empty")
+    }
+    localDirs
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 20bac66105a69eff351c31f5c334413f7f23732c..f32ce6f9fcc7f75f1b5838cf62c25e0e57c0f431 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -825,8 +825,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     val blockManager = mock(classOf[BlockManager])
     val shuffleBlockManager = mock(classOf[ShuffleBlockManager])
     when(shuffleBlockManager.conf).thenReturn(conf)
-    val diskBlockManager = new DiskBlockManager(shuffleBlockManager,
-      System.getProperty("java.io.tmpdir"))
+    val diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
 
     when(blockManager.conf).thenReturn(conf.clone.set(confKey, 0.toString))
     val diskStoreMapped = new DiskStore(blockManager, diskBlockManager)
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 777579bc570db017b4a658d6aaf64d86c005b2cb..aabaeadd7a071139cf063b535b4249286a479934 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -71,7 +71,9 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
   }
 
   override def beforeEach() {
-    diskBlockManager = new DiskBlockManager(shuffleBlockManager, rootDirs)
+    val conf = testConf.clone
+    conf.set("spark.local.dir", rootDirs)
+    diskBlockManager = new DiskBlockManager(shuffleBlockManager, conf)
     shuffleBlockManager.idToSegmentMap.clear()
   }
 
diff --git a/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..dae7bf0e336de9d9099f9b48664ee9ed086656fc
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/LocalDirsSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.File
+
+import org.apache.spark.util.Utils
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkConf
+
+
+/**
+ * Tests for the spark.local.dir and SPARK_LOCAL_DIRS configuration options.
+ */
+class LocalDirsSuite extends FunSuite {
+
+  test("Utils.getLocalDir() returns a valid directory, even if some local dirs are missing") {
+    // Regression test for SPARK-2974
+    assert(!new File("/NONEXISTENT_DIR").exists())
+    val conf = new SparkConf(false)
+      .set("spark.local.dir", s"/NONEXISTENT_PATH,${System.getProperty("java.io.tmpdir")}")
+    assert(new File(Utils.getLocalDir(conf)).exists())
+  }
+
+  test("SPARK_LOCAL_DIRS override also affects driver") {
+    // Regression test for SPARK-2975
+    assert(!new File("/NONEXISTENT_DIR").exists())
+    // SPARK_LOCAL_DIRS is a valid directory:
+    class MySparkConf extends SparkConf(false) {
+      override def getenv(name: String) = {
+        if (name == "SPARK_LOCAL_DIRS") System.getProperty("java.io.tmpdir")
+        else super.getenv(name)
+      }
+
+      override def clone: SparkConf = {
+        new MySparkConf().setAll(settings)
+      }
+    }
+    // spark.local.dir only contains invalid directories, but that's not a problem since
+    // SPARK_LOCAL_DIRS will override it on both the driver and workers:
+    val conf = new MySparkConf().set("spark.local.dir", "/NONEXISTENT_PATH")
+    assert(new File(Utils.getLocalDir(conf)).exists())
+  }
+
+}
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index 2c68cd4921deb893637101c0dc5839a18ef73dcb..1ebe7df418327feadbec6f901f35b2710b252771 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -214,7 +214,7 @@ class ExternalMerger(Merger):
 
     def _get_dirs(self):
         """ Get all the directories """
-        path = os.environ.get("SPARK_LOCAL_DIR", "/tmp")
+        path = os.environ.get("SPARK_LOCAL_DIRS", "/tmp")
         dirs = path.split(",")
         return [os.path.join(d, "python", str(os.getpid()), str(id(self)))
                 for d in dirs]
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 46a01f5a9a2cc0fe53918185d52c6f87dc16e483..4d4848b1bd8f8de3df10c6f978f93d87ecdf8acf 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -72,10 +72,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
   private var registered = false
 
   def run() {
-    // Setup the directories so things go to yarn approved directories rather
-    // then user specified and /tmp.
-    System.setProperty("spark.local.dir", getLocalDirs())
-
     // set the web ui port to be ephemeral for yarn so we don't conflict with
     // other spark processes running on the same box
     System.setProperty("spark.ui.port", "0")
@@ -138,20 +134,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       params)
   }
 
-  /** Get the Yarn approved local directories. */
-  private def getLocalDirs(): String = {
-    // Hadoop 0.23 and 2.x have different Environment variable names for the
-    // local dirs, so lets check both. We assume one of the 2 is set.
-    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
-    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
-      .orElse(Option(System.getenv("LOCAL_DIRS")))
-
-    localDirs match {
-      case None => throw new Exception("Yarn Local dirs can't be empty")
-      case Some(l) => l
-    }
-  }
-
   private def getApplicationAttemptId(): ApplicationAttemptId = {
     val envs = System.getenv()
     val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 72c7143edcd71c37c1342eb61ed3b0674cdcef9f..c3310fbc24a98e1127730bae482d0c07ba5e7ecc 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -95,11 +95,6 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   }
 
   def run() {
-
-    // Setup the directories so things go to yarn approved directories rather
-    // then user specified and /tmp.
-    System.setProperty("spark.local.dir", getLocalDirs())
-
     appAttemptId = getApplicationAttemptId()
     resourceManager = registerWithResourceManager()
 
@@ -152,20 +147,6 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     System.exit(0)
   }
 
-  /** Get the Yarn approved local directories. */
-  private def getLocalDirs(): String = {
-    // Hadoop 0.23 and 2.x have different Environment variable names for the
-    // local dirs, so lets check both. We assume one of the 2 is set.
-    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
-    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
-      .orElse(Option(System.getenv("LOCAL_DIRS")))
-
-    localDirs match {
-      case None => throw new Exception("Yarn Local dirs can't be empty")
-      case Some(l) => l
-    }
-  }
-
   private def getApplicationAttemptId(): ApplicationAttemptId = {
     val envs = System.getenv()
     val containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV)
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 9c2bcf17a8508d15325c2e113c25b5bf7949cbbd..1c4005fd8e78e712a557e5a7320e98e8b7cdbcf1 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -72,10 +72,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
   private var registered = false
 
   def run() {
-    // Setup the directories so things go to YARN approved directories rather
-    // than user specified and /tmp.
-    System.setProperty("spark.local.dir", getLocalDirs())
-
     // Set the web ui port to be ephemeral for yarn so we don't conflict with
     // other spark processes running on the same box
     System.setProperty("spark.ui.port", "0")
@@ -144,20 +140,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
   }
 
-  // Get the Yarn approved local directories.
-  private def getLocalDirs(): String = {
-    // Hadoop 0.23 and 2.x have different Environment variable names for the
-    // local dirs, so lets check both. We assume one of the 2 is set.
-    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
-    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
-      .orElse(Option(System.getenv("LOCAL_DIRS")))
- 
-    localDirs match {
-      case None => throw new Exception("Yarn local dirs can't be empty")
-      case Some(l) => l
-    }
-  }
-
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
     logInfo("Registering the ApplicationMaster")
     amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index a7585748b7f88f951bc5f390f846728f0f5561e4..45925f1fea0051f8f9d1e6de8346faa161ed5c58 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -94,11 +94,6 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
   }
 
   def run() {
-
-    // Setup the directories so things go to yarn approved directories rather
-    // then user specified and /tmp.
-    System.setProperty("spark.local.dir", getLocalDirs())
-
     amClient = AMRMClient.createAMRMClient()
     amClient.init(yarnConf)
     amClient.start()
@@ -141,20 +136,6 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
     System.exit(0)
   }
 
-  /** Get the Yarn approved local directories. */
-  private def getLocalDirs(): String = {
-    // Hadoop 0.23 and 2.x have different Environment variable names for the
-    // local dirs, so lets check both. We assume one of the 2 is set.
-    // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
-    val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
-      .orElse(Option(System.getenv("LOCAL_DIRS")))
-
-    localDirs match {
-      case None => throw new Exception("Yarn Local dirs can't be empty")
-      case Some(l) => l
-    }
-  }
-
   private def registerApplicationMaster(): RegisterApplicationMasterResponse = {
     val appUIAddress = sparkConf.get("spark.driver.appUIAddress", "")
     logInfo(s"Registering the ApplicationMaster with appUIAddress: $appUIAddress")