diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7caf6bcf94ef31f6938bc671f7d6f84acc81787e..2cbd38d72caa1e2dff5bd98a636ff81b0123ebf6 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -755,6 +755,7 @@ private[spark] object Utils extends Logging {
   /**
    * Delete a file or directory and its contents recursively.
    * Don't follow directories if they are symlinks.
+   * Throws an exception if deletion is unsuccessful.
    */
   def deleteRecursively(file: File) {
     if (file != null) {
diff --git a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
index 792b9cd8b6ff2f7ee851b45c0afae2c9a1dd6bed..6608ed1e57b383b1de0f017d8a9510728688fe8e 100644
--- a/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
@@ -63,8 +63,9 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
     rdd.count()
     rdd.count()
 
-    // Invalidate the registered executors, disallowing access to their shuffle blocks.
-    rpcHandler.clearRegisteredExecutors()
+    // Invalidate the registered executors, disallowing access to their shuffle blocks (without
+    // deleting the actual shuffle files, so they could still be read without the shuffle service).
+    rpcHandler.applicationRemoved(sc.conf.getAppId, false /* cleanupLocalDirs */)
 
     // Now Spark will receive FetchFailed, and not retry the stage due to "spark.test.noStageRetry"
     // being set.
diff --git a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index 2856d1c8c93376fed7e0d429fe8483a8e3b303d7..75c4a3981a2407c968f9a05fb01b2411ab1d8d63 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -22,16 +22,22 @@ import java.nio.ByteBuffer;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
+import com.google.common.base.Preconditions;
 import com.google.common.io.Closeables;
 import com.google.common.base.Charsets;
 import io.netty.buffer.Unpooled;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * General utilities available in the network package. Many of these are sourced from Spark's
+ * own Utils and are simply made accessible within this package.
+ */
 public class JavaUtils {
   private static final Logger logger = LoggerFactory.getLogger(JavaUtils.class);
 
@@ -93,4 +99,57 @@ public class JavaUtils {
   public static String bytesToString(ByteBuffer b) {
     return Unpooled.wrappedBuffer(b).toString(Charsets.UTF_8);
   }
+
+  /**
+   * Delete a file or directory and its contents recursively.
+   * Don't follow directories if they are symlinks.
+   * Throws an exception if deletion is unsuccessful.
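+   *
+   * A minimal illustrative use (the directory path here is hypothetical):
+   * <pre>
+   *   JavaUtils.deleteRecursively(new File("/tmp/spark-local-dir"));
+   * </pre>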
+   */
+  public static void deleteRecursively(File file) throws IOException {
+    if (file == null) { return; }
+
+    if (file.isDirectory() && !isSymlink(file)) {
+      IOException savedIOException = null;
+      for (File child : listFilesSafely(file)) {
+        try {
+          deleteRecursively(child);
+        } catch (IOException e) {
+          // If multiple exceptions occur, only the last one will be thrown.
+          savedIOException = e;
+        }
+      }
+      if (savedIOException != null) {
+        throw savedIOException;
+      }
+    }
+
+    boolean deleted = file.delete();
+    // Deletion can also fail if the file simply does not exist.
+    if (!deleted && file.exists()) {
+      throw new IOException("Failed to delete: " + file.getAbsolutePath());
+    }
+  }
+
+  private static File[] listFilesSafely(File file) throws IOException {
+    if (file.exists()) {
+      File[] files = file.listFiles();
+      if (files == null) {
+        throw new IOException("Failed to list files for dir: " + file);
+      }
+      return files;
+    } else {
+      return new File[0];
+    }
+  }
+
+  private static boolean isSymlink(File file) throws IOException {
+    Preconditions.checkNotNull(file);
+    File fileInCanonicalDir = null;
+    if (file.getParent() == null) {
+      fileInCanonicalDir = file;
+    } else {
+      fileInCanonicalDir = new File(file.getParentFile().getCanonicalFile(), file.getName());
+    }
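+    // For a symlink, the canonical file (the resolved target) differs from the absolute file
+    // (the link itself), since the parent directory has already been canonicalized above.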
+    return !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile());
+  }
 }
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index cd3fea85b19a4220bb55768468e2b95b8ff0c9c9..75ebf8c7b0604c7533bb3ceb5b118e3f91dfd8d4 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -94,9 +94,11 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
     return streamManager;
   }
 
-  /** For testing, clears all executors registered with "RegisterExecutor". */
-  @VisibleForTesting
-  public void clearRegisteredExecutors() {
-    blockManager.clearRegisteredExecutors();
+  /**
+   * Removes an application (once it has been terminated), and optionally cleans up any
+   * local directories associated with that application's executors in a separate thread.
+   */
+  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+    blockManager.applicationRemoved(appId, cleanupLocalDirs);
   }
 }
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
index 6589889fe1be7fc04591b89161d314f179f53d60..98fcfb82aa5d1292dc96d8a04531e5247d37e314 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockManager.java
@@ -21,9 +21,15 @@ import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,13 +49,22 @@ import org.apache.spark.network.util.JavaUtils;
 public class ExternalShuffleBlockManager {
   private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockManager.class);
 
-  // Map from "appId-execId" to the executor's configuration.
-  private final ConcurrentHashMap<String, ExecutorShuffleInfo> executors =
-    new ConcurrentHashMap<String, ExecutorShuffleInfo>();
+  // Map containing all registered executors' metadata.
+  private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
 
-  // Returns an id suitable for a single executor within a single application.
-  private String getAppExecId(String appId, String execId) {
-    return appId + "-" + execId;
+  // Single-threaded Java executor used to perform expensive recursive directory deletion.
+  private final Executor directoryCleaner;
+
+  public ExternalShuffleBlockManager() {
+    // TODO: Give this thread a name.
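+    // (A possible sketch, assuming Guava's ThreadFactoryBuilder is acceptable here:
+    //  Executors.newSingleThreadExecutor(
+    //    new ThreadFactoryBuilder().setNameFormat("shuffle-dir-cleaner-%d").build()).)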
+    this(Executors.newSingleThreadExecutor());
+  }
+
+  // Allows tests to have more control over when directories are cleaned up.
+  @VisibleForTesting
+  ExternalShuffleBlockManager(Executor directoryCleaner) {
+    this.executors = Maps.newConcurrentMap();
+    this.directoryCleaner = directoryCleaner;
   }
 
   /** Registers a new Executor with all the configuration we need to find its shuffle files. */
@@ -57,7 +72,7 @@ public class ExternalShuffleBlockManager {
       String appId,
       String execId,
       ExecutorShuffleInfo executorInfo) {
-    String fullId = getAppExecId(appId, execId);
+    AppExecId fullId = new AppExecId(appId, execId);
     logger.info("Registered executor {} with {}", fullId, executorInfo);
     executors.put(fullId, executorInfo);
   }
@@ -78,7 +93,7 @@ public class ExternalShuffleBlockManager {
     int mapId = Integer.parseInt(blockIdParts[2]);
     int reduceId = Integer.parseInt(blockIdParts[3]);
 
-    ExecutorShuffleInfo executor = executors.get(getAppExecId(appId, execId));
+    ExecutorShuffleInfo executor = executors.get(new AppExecId(appId, execId));
     if (executor == null) {
       throw new RuntimeException(
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
@@ -94,6 +109,56 @@ public class ExternalShuffleBlockManager {
     }
   }
 
+  /**
+   * Removes our metadata for all executors registered under the given application, and
+   * optionally also deletes the local directories associated with those executors in a
+   * separate thread.
+   *
+   * It is not valid to call registerExecutor() for an executor with this appId after invoking
+   * this method.
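+   *
+   * A typical call once the shuffle service learns that an application has exited
+   * (illustrative; the appId value is made up):
+   * <pre>
+   *   blockManager.applicationRemoved("app-20141106000000-0001", true);  // also delete dirs
+   * </pre>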
+   */
+  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+    logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
+    Iterator<Map.Entry<AppExecId, ExecutorShuffleInfo>> it = executors.entrySet().iterator();
+    while (it.hasNext()) {
+      Map.Entry<AppExecId, ExecutorShuffleInfo> entry = it.next();
+      AppExecId fullId = entry.getKey();
+      final ExecutorShuffleInfo executor = entry.getValue();
+
+      // Only touch executors associated with the appId that was removed.
+      if (appId.equals(fullId.appId)) {
+        it.remove();
+
+        if (cleanupLocalDirs) {
+          logger.info("Cleaning up executor {}'s {} local dirs", fullId, executor.localDirs.length);
+
+          // Execute the actual deletion in a different thread, as it may take some time.
+          directoryCleaner.execute(new Runnable() {
+            @Override
+            public void run() {
+              deleteExecutorDirs(executor.localDirs);
+            }
+          });
+        }
+      }
+    }
+  }
+
+  /**
+   * Synchronously deletes each directory one at a time.
+   * Should be executed in its own thread, as this may take a long time.
+   */
+  private void deleteExecutorDirs(String[] dirs) {
+    for (String localDir : dirs) {
+      try {
+        JavaUtils.deleteRecursively(new File(localDir));
+        logger.debug("Successfully cleaned up directory: " + localDir);
+      } catch (Exception e) {
+        logger.error("Failed to delete directory: " + localDir, e);
+      }
+    }
+  }
+
   /**
    * Hash-based shuffle data is simply stored as one file per block.
    * This logic is from FileShuffleBlockManager.
@@ -146,9 +211,36 @@ public class ExternalShuffleBlockManager {
     return new File(new File(localDir, String.format("%02x", subDirId)), filename);
   }
 
-  /** For testing, clears all registered executors. */
-  @VisibleForTesting
-  void clearRegisteredExecutors() {
-    executors.clear();
+  /** Simply encodes an executor's full ID, which is appId + execId. */
+  private static class AppExecId {
+    final String appId;
+    final String execId;
+
+    private AppExecId(String appId, String execId) {
+      this.appId = appId;
+      this.execId = execId;
+    }
+
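+    // equals() and hashCode() must be consistent for AppExecId to work as a ConcurrentMap key.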
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      AppExecId appExecId = (AppExecId) o;
+      return Objects.equal(appId, appExecId.appId) && Objects.equal(execId, appExecId.execId);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(appId, execId);
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+        .add("appId", appId)
+        .add("execId", execId)
+        .toString();
+    }
   }
 }
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
new file mode 100644
index 0000000000000000000000000000000000000000..c8ece3bc53ac385ccac044537634dba73623258c
--- /dev/null
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ExternalShuffleCleanupSuite {
+
+  // Same-thread Executor used to ensure cleanup happens synchronously on the test thread.
+  Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
+
+  @Test
+  public void noCleanupAndCleanup() throws IOException {
+    TestShuffleDataContext dataContext = createSomeData();
+
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+    manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
+    manager.applicationRemoved("app", false /* cleanup */);
+
+    assertStillThere(dataContext);
+
+    manager.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr"));
+    manager.applicationRemoved("app", true /* cleanup */);
+
+    assertCleanedUp(dataContext);
+  }
+
+  @Test
+  public void cleanupUsesExecutor() throws IOException {
+    TestShuffleDataContext dataContext = createSomeData();
+
+    final AtomicBoolean cleanupCalled = new AtomicBoolean(false);
+
+    // Executor that does nothing, so we can verify the manager dispatches cleanup through it.
+    Executor noThreadExecutor = new Executor() {
+      @Override public void execute(Runnable runnable) { cleanupCalled.set(true); }
+    };
+
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(noThreadExecutor);
+
+    manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
+    manager.applicationRemoved("app", true);
+
+    assertTrue(cleanupCalled.get());
+    assertStillThere(dataContext);
+
+    dataContext.cleanup();
+    assertCleanedUp(dataContext);
+  }
+
+  @Test
+  public void cleanupMultipleExecutors() throws IOException {
+    TestShuffleDataContext dataContext0 = createSomeData();
+    TestShuffleDataContext dataContext1 = createSomeData();
+
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+
+    manager.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
+    manager.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
+    manager.applicationRemoved("app", true);
+
+    assertCleanedUp(dataContext0);
+    assertCleanedUp(dataContext1);
+  }
+
+  @Test
+  public void cleanupOnlyRemovedApp() throws IOException {
+    TestShuffleDataContext dataContext0 = createSomeData();
+    TestShuffleDataContext dataContext1 = createSomeData();
+
+    ExternalShuffleBlockManager manager = new ExternalShuffleBlockManager(sameThreadExecutor);
+
+    manager.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
+    manager.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
+
+    manager.applicationRemoved("app-nonexistent", true);
+    assertStillThere(dataContext0);
+    assertStillThere(dataContext1);
+
+    manager.applicationRemoved("app-0", true);
+    assertCleanedUp(dataContext0);
+    assertStillThere(dataContext1);
+
+    manager.applicationRemoved("app-1", true);
+    assertCleanedUp(dataContext0);
+    assertCleanedUp(dataContext1);
+
+    // Make sure it's not an error to clean up multiple times.
+    manager.applicationRemoved("app-1", true);
+    assertCleanedUp(dataContext0);
+    assertCleanedUp(dataContext1);
+  }
+
+  private void assertStillThere(TestShuffleDataContext dataContext) {
+    for (String localDir : dataContext.localDirs) {
+      assertTrue(localDir + " was cleaned up prematurely", new File(localDir).exists());
+    }
+  }
+
+  private void assertCleanedUp(TestShuffleDataContext dataContext) {
+    for (String localDir : dataContext.localDirs) {
+      assertFalse(localDir + " wasn't cleaned up", new File(localDir).exists());
+    }
+  }
+
+  private TestShuffleDataContext createSomeData() throws IOException {
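+    // A fixed seed keeps the generated shuffle and map IDs deterministic across runs.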
+    Random rand = new Random(123);
+    TestShuffleDataContext dataContext = new TestShuffleDataContext(10, 5);
+
+    dataContext.create();
+    dataContext.insertSortShuffleData(rand.nextInt(1000), rand.nextInt(1000),
+      new byte[][] { "ABC".getBytes(), "DEF".getBytes() } );
+    dataContext.insertHashShuffleData(rand.nextInt(1000), rand.nextInt(1000) + 1000,
+      new byte[][] { "GHI".getBytes(), "JKLMNOPQRSTUVWXYZ".getBytes() } );
+    return dataContext;
+  }
+}
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 06294fef1962178c22205ecebcf8c730ea4f2eb4..3bea5b0f253c6ae9f197fe16dcce9268db3d4cd3 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -105,7 +105,7 @@ public class ExternalShuffleIntegrationSuite {
 
   @After
   public void afterEach() {
-    handler.clearRegisteredExecutors();
+    handler.applicationRemoved(APP_ID, false /* cleanupLocalDirs */);
   }
 
   class FetchResult {
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
index 442b7564674421fed4dd3bb3adc1e01cc40c25bc..337b5c7bdb5dace2ae3dafee7bd3088db9367851 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/TestShuffleDataContext.java
@@ -30,8 +30,8 @@ import com.google.common.io.Files;
  * and cleanup of directories that can be read by the {@link ExternalShuffleBlockManager}.
  */
 public class TestShuffleDataContext {
-  private final String[] localDirs;
-  private final int subDirsPerLocalDir;
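+  // Exposed so cleanup tests can assert directly on the on-disk state of these directories.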
+  public final String[] localDirs;
+  public final int subDirsPerLocalDir;
 
   public TestShuffleDataContext(int numLocalDirs, int subDirsPerLocalDir) {
     this.localDirs = new String[numLocalDirs];