diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 00a43673e5cd3e545d5d2c4bb9375bea28490418..71650cd773bcfb5904120f0475f003652bc7607f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -42,7 +42,7 @@ private[spark] class ExecutorRunner(
     val workerId: String,
     val host: String,
     val sparkHome: File,
-    val workDir: File,
+    val executorDir: File,
     val workerUrl: String,
     val conf: SparkConf,
     var state: ExecutorState.Value)
@@ -130,12 +130,6 @@ private[spark] class ExecutorRunner(
    */
   def fetchAndRunExecutor() {
     try {
-      // Create the executor's working directory
-      val executorDir = new File(workDir, appId + "/" + execId)
-      if (!executorDir.mkdirs()) {
-        throw new IOException("Failed to create directory " + executorDir)
-      }
-
       // Launch the process
       val command = getCommandSeq
       logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 0c454e4138c968c2d30d09e3f1c30d8c6d5ab01f..3b13f43a1868c1cc2207d09eec7abefd0f1af9ad 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -18,15 +18,18 @@
 package org.apache.spark.deploy.worker
 
 import java.io.File
+import java.io.IOException
 import java.text.SimpleDateFormat
 import java.util.Date
 
 import scala.collection.mutable.HashMap
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import akka.actor._
 import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
@@ -191,6 +194,7 @@ private[spark] class Worker(
       changeMaster(masterUrl, masterWebUiUrl)
       context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
       if (CLEANUP_ENABLED) {
+        logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
         context.system.scheduler.schedule(CLEANUP_INTERVAL_MILLIS millis,
           CLEANUP_INTERVAL_MILLIS millis, self, WorkDirCleanup)
       }
@@ -201,10 +205,23 @@ private[spark] class Worker(
     case WorkDirCleanup =>
       // Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
       val cleanupFuture = concurrent.future {
-        logInfo("Cleaning up oldest application directories in " + workDir + " ...")
-        Utils.findOldFiles(workDir, APP_DATA_RETENTION_SECS)
-          .foreach(Utils.deleteRecursively)
+        val appDirs = workDir.listFiles()
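+        // File.listFiles() returns null if workDir is not a directory or an I/O error occurs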
+        if (appDirs == null) {
+          throw new IOException("ERROR: Failed to list files in " + appDirs)
+        }
+        appDirs.filter { dir =>
+          // The directory belongs to an application; only clean it up if that application is no
+          // longer running and none of its files are newer than the retention period
+          val appIdFromDir = dir.getName
+          val isAppStillRunning = executors.values.exists(_.appId == appIdFromDir)
+          dir.isDirectory && !isAppStillRunning &&
+            !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECS)
+        }.foreach { dir =>
+          logInfo(s"Removing directory: ${dir.getPath}")
+          Utils.deleteRecursively(dir)
+        }
       }
+
       cleanupFuture onFailure {
         case e: Throwable =>
           logError("App dir cleanup failed: " + e.getMessage, e)
@@ -233,8 +250,15 @@ private[spark] class Worker(
       } else {
         try {
           logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+
+          // Create the executor's working directory
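+          // Creating it here, rather than in ExecutorRunner, means a failure is caught by the
+          // catch block below and reported to the master as a failed executor launch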
+          val executorDir = new File(workDir, appId + "/" + execId)
+          if (!executorDir.mkdirs()) {
+            throw new IOException("Failed to create directory " + executorDir)
+          }
+
           val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-            self, workerId, host, sparkHome, workDir, akkaUrl, conf, ExecutorState.LOADING)
+            self, workerId, host, sparkHome, executorDir, akkaUrl, conf, ExecutorState.LOADING)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_
@@ -242,12 +266,13 @@ private[spark] class Worker(
           master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
         } catch {
           case e: Exception => {
-            logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
             if (executors.contains(appId + "/" + execId)) {
               executors(appId + "/" + execId).kill()
               executors -= appId + "/" + execId
             }
-            master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
+            master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
+              Some(e.toString), None)
           }
         }
       }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 9399ddab7633126be2ace318ea685c22b9ead15a..a67124140f9da8f28641ba62fbd46f94ef3104e0 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -35,6 +35,8 @@ import scala.util.control.{ControlThrowable, NonFatal}
 
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.commons.io.FileUtils
+import org.apache.commons.io.filefilter.TrueFileFilter
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.log4j.PropertyConfigurator
@@ -705,17 +707,20 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Finds all the files in a directory whose last modified time is older than cutoff seconds.
-   * @param dir  must be the path to a directory, or IllegalArgumentException is thrown
-   * @param cutoff measured in seconds. Files older than this are returned.
+   * Determines whether a directory contains any files newer than cutoff seconds. The search is
+   * recursive: files in nested subdirectories are considered as well.
+   *
+   * @param dir must be the path to a directory, or IllegalArgumentException is thrown
+   * @param cutoff measured in seconds. Returns true if there are any files in dir newer than this.
    */
-  def findOldFiles(dir: File, cutoff: Long): Seq[File] = {
+  def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long): Boolean = {
     val currentTimeMillis = System.currentTimeMillis
-    if (dir.isDirectory) {
-      val files = listFilesSafely(dir)
-      files.filter { file => file.lastModified < (currentTimeMillis - cutoff * 1000) }
+    if (!dir.isDirectory) {
+      throw new IllegalArgumentException(dir + " is not a directory!")
     } else {
-      throw new IllegalArgumentException(dir + " is not a directory!")
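+      // listFilesAndDirs walks the directory tree recursively, so files and directories nested
+      // at any depth under dir are part of the check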
+      val files = FileUtils.listFilesAndDirs(dir, TrueFileFilter.TRUE, TrueFileFilter.TRUE)
+      val cutoffTimeInMillis = currentTimeMillis - (cutoff * 1000)
+      files.exists(_.lastModified > cutoffTimeInMillis)
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 70d423ba8a04d7930e382d04f731d0630b2175dd..e63d9d085e385495c8936d817de786e59565d3f7 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -189,17 +189,28 @@ class UtilsSuite extends FunSuite {
     assert(Utils.getIteratorSize(iterator) === 5L)
   }
 
-  test("findOldFiles") {
+  test("doesDirectoryContainFilesNewerThan") {
     // create some temporary directories and files
     val parent: File = Utils.createTempDir()
     val child1: File = Utils.createTempDir(parent.getCanonicalPath) // The parent directory has two child directories
     val child2: File = Utils.createTempDir(parent.getCanonicalPath)
-    // set the last modified time of child1 to 10 secs old
-    child1.setLastModified(System.currentTimeMillis() - (1000 * 10))
+    val child3: File = Utils.createTempDir(child1.getCanonicalPath)
+    // set the last modified time of child1 to 30 secs old
+    child1.setLastModified(System.currentTimeMillis() - (1000 * 30))
 
-    val result = Utils.findOldFiles(parent, 5) // find files older than 5 secs
-    assert(result.size.equals(1))
-    assert(result(0).getCanonicalPath.equals(child1.getCanonicalPath))
+    // although child1 is old, child2 is still new, so this returns true
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+    child2.setLastModified(System.currentTimeMillis - (1000 * 30))
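+    // child3, nested inside child1, is untouched and therefore still new, so this is still true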
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+    parent.setLastModified(System.currentTimeMillis - (1000 * 30))
+    // although parent and its immediate children are now old, child3 (nested inside child1) is
+    // still new; a full recursive search for new files should find it
+    assert(Utils.doesDirectoryContainAnyNewFiles(parent, 5))
+
+    child3.setLastModified(System.currentTimeMillis - (1000 * 30))
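+    // every file and directory under parent is now older than the cutoff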
+    assert(!Utils.doesDirectoryContainAnyNewFiles(parent, 5))
   }
 
   test("resolveURI") {