diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index f963a4606068bf0b5191f10c22ae9ef4d28b7f6a..e48817ebbafddb2d96d1d2799408e226f359d5bb 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -445,12 +445,25 @@ private[deploy] class Worker(
           // Create local dirs for the executor. These are passed to the executor via the
           // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
           // application finishes.
-          val appLocalDirs = appDirectories.getOrElse(appId,
-            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
-              val appDir = Utils.createDirectory(dir, namePrefix = "executor")
-              Utils.chmod700(appDir)
-              appDir.getAbsolutePath()
-            }.toSeq)
+          val appLocalDirs = appDirectories.getOrElse(appId, {
+            val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
+            val dirs = localRootDirs.flatMap { dir =>
+              try {
+                val appDir = Utils.createDirectory(dir, namePrefix = "executor")
+                Utils.chmod700(appDir)
+                Some(appDir.getAbsolutePath())
+              } catch {
+                case e: IOException =>
+                  logWarning(s"${e.getMessage}. Ignoring this directory.")
+                  None
+              }
+            }.toSeq
+            if (dirs.isEmpty) {
+              throw new IOException("No subfolder can be created in " +
+                s"${localRootDirs.mkString(",")}.")
+            }
+            dirs
+          })
           appDirectories(appId) = appLocalDirs
           val manager = new ExecutorRunner(
             appId,