diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a1c66ef4fc5ea783303f31b3955c68484225232d..6f336a7c299ab95fa8121458a042765a8afe9360 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2658,7 +2658,7 @@ object SparkContext extends Logging {
         val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
         val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
         val backend = if (coarseGrained) {
-          new CoarseMesosSchedulerBackend(scheduler, sc, url)
+          new CoarseMesosSchedulerBackend(scheduler, sc, url, sc.env.securityManager)
         } else {
           new MesosSchedulerBackend(scheduler, sc, url)
         }
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 4089c3e771fa86e3f87c214c06f5e9797201d1f6..20a9faa1784b716280b6174d93de59faa396de02 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -27,6 +27,7 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.sasl.SaslServerBootstrap
 import org.apache.spark.network.server.TransportServer
 import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.network.util.TransportConf
 import org.apache.spark.util.Utils
 
 /**
@@ -45,11 +46,16 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
 
   private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
-  private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
+  private val blockHandler = newShuffleBlockHandler(transportConf)
   private val transportContext: TransportContext = new TransportContext(transportConf, blockHandler)
 
   private var server: TransportServer = _
 
+  /** Create a new shuffle block handler. Factored out for subclasses to override. */
+  protected def newShuffleBlockHandler(conf: TransportConf): ExternalShuffleBlockHandler = {
+    new ExternalShuffleBlockHandler(conf)
+  }
+
   /** Starts the external shuffle service if the user has configured us to. */
   def startIfEnabled() {
     if (enabled) {
@@ -93,6 +99,13 @@ object ExternalShuffleService extends Logging {
   private val barrier = new CountDownLatch(1)
 
   def main(args: Array[String]): Unit = {
+    main(args, (conf: SparkConf, sm: SecurityManager) => new ExternalShuffleService(conf, sm))
+  }
+
+  /** A helper main method that lets the caller supply a custom shuffle service implementation. */
+  private[spark] def main(
+      args: Array[String],
+      newShuffleService: (SparkConf, SecurityManager) => ExternalShuffleService): Unit = {
     val sparkConf = new SparkConf
     Utils.loadDefaultSparkProperties(sparkConf)
     val securityManager = new SecurityManager(sparkConf)
@@ -100,7 +113,7 @@ object ExternalShuffleService extends Logging {
     // we override this value since this service is started from the command line
     // and we assume the user really wants it to be running
     sparkConf.set("spark.shuffle.service.enabled", "true")
-    server = new ExternalShuffleService(sparkConf, securityManager)
+    server = newShuffleService(sparkConf, securityManager)
     server.start()
 
     installShutdownHook()
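
Taken together, the protected newShuffleBlockHandler factory and the new two-argument main form the extension seam that the Mesos-specific service below plugs into: a subclass swaps in its own block handler while reusing all of the config loading, startup, and shutdown-hook logic. A minimal sketch of the contract, assuming a hypothetical MyShuffleService (it must live under the org.apache.spark package because the two-argument main is private[spark]):

    package org.apache.spark.deploy

    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
    import org.apache.spark.network.util.TransportConf

    // Hypothetical subclass: only the handler factory is overridden.
    class MyShuffleService(conf: SparkConf, sm: SecurityManager)
      extends ExternalShuffleService(conf, sm) {

      protected override def newShuffleBlockHandler(
          conf: TransportConf): ExternalShuffleBlockHandler = {
        new ExternalShuffleBlockHandler(conf) // or any custom subclass
      }
    }

    object MyShuffleService {
      def main(args: Array[String]): Unit = {
        // Reuses config loading, startup, and the shutdown hook.
        ExternalShuffleService.main(args,
          (conf: SparkConf, sm: SecurityManager) => new MyShuffleService(conf, sm))
      }
    }
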
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
new file mode 100644
index 0000000000000000000000000000000000000000..061857476a8a0a71bf6f94f6096e3dbc0282cca7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.mesos
+
+import java.net.SocketAddress
+
+import scala.collection.mutable
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.deploy.ExternalShuffleService
+import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
+import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver
+import org.apache.spark.network.util.TransportConf
+
+/**
+ * An RPC endpoint that receives registration requests from Spark drivers running on Mesos.
+ * It detects driver termination and invokes the cleanup callback on [[ExternalShuffleService]].
+ */
+private[mesos] class MesosExternalShuffleBlockHandler(transportConf: TransportConf)
+  extends ExternalShuffleBlockHandler(transportConf) with Logging {
+
+  // Stores a map of driver socket addresses to app ids
+  private val connectedApps = new mutable.HashMap[SocketAddress, String]
+
+  protected override def handleMessage(
+      message: BlockTransferMessage,
+      client: TransportClient,
+      callback: RpcResponseCallback): Unit = {
+    message match {
+      case RegisterDriverParam(appId) =>
+        val address = client.getSocketAddress
+        logDebug(s"Received registration request from app $appId (remote address $address).")
+        if (connectedApps.contains(address)) {
+          val existingAppId = connectedApps(address)
+          if (!existingAppId.equals(appId)) {
+            logError(s"A new app '$appId' has connected to existing address $address, " +
+              s"removing previously registered app '$existingAppId'.")
+            applicationRemoved(existingAppId, true /* cleanupLocalDirs */)
+          }
+        }
+        connectedApps(address) = appId
+        callback.onSuccess(new Array[Byte](0))
+      case _ => super.handleMessage(message, client, callback)
+    }
+  }
+
+  /**
+   * On connection termination, clean up shuffle files written by the associated application.
+   */
+  override def connectionTerminated(client: TransportClient): Unit = {
+    val address = client.getSocketAddress
+    if (connectedApps.contains(address)) {
+      val appId = connectedApps(address)
+      logInfo(s"Application $appId disconnected (address was $address).")
+      applicationRemoved(appId, true /* cleanupLocalDirs */)
+      connectedApps.remove(address)
+    } else {
+      logWarning(s"Unknown $address disconnected.")
+    }
+  }
+
+  /** An extractor object for matching [[RegisterDriver]] messages. */
+  private object RegisterDriverParam {
+    def unapply(r: RegisterDriver): Option[String] = Some(r.getAppId)
+  }
+}
+
+/**
+ * A wrapper of [[ExternalShuffleService]] that provides an additional endpoint for drivers
+ * to associate with. This allows the shuffle service to detect when a driver is terminated
+ * and clean up the associated shuffle files.
+ */
+private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManager: SecurityManager)
+  extends ExternalShuffleService(conf, securityManager) {
+
+  protected override def newShuffleBlockHandler(
+      conf: TransportConf): ExternalShuffleBlockHandler = {
+    new MesosExternalShuffleBlockHandler(conf)
+  }
+}
+
+private[spark] object MesosExternalShuffleService extends Logging {
+
+  def main(args: Array[String]): Unit = {
+    ExternalShuffleService.main(args,
+      (conf: SparkConf, sm: SecurityManager) => new MesosExternalShuffleService(conf, sm))
+  }
+}
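
The RegisterDriverParam extractor above uses a small Scala idiom worth noting: an extractor object gives the Java protocol class RegisterDriver an unapply, so it can participate in pattern matching without being modified. A self-contained sketch of the same technique, with a hypothetical Ping class standing in for RegisterDriver:

    // A plain class with no built-in pattern-matching support.
    class Ping(val senderId: String)

    // The extractor object supplies unapply, enabling `case PingParam(id)`.
    object PingParam {
      def unapply(p: Ping): Option[String] = Some(p.senderId)
    }

    def handle(msg: AnyRef): String = msg match {
      case PingParam(id) => s"ping from $id"
      case _ => "unknown message"
    }

    assert(handle(new Ping("app-1")) == "ping from app-1")
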
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
index d2b2baef1d8c4f00a35feefaafbc174305365b5e..dfcbc51cdf6168eb4240ab33acdb899ee01109d6 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEndpoint.scala
@@ -47,11 +47,11 @@ private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
  *
  * It is guaranteed that `onStart`, `receive` and `onStop` will be called in sequence.
  *
- * The lift-cycle will be:
+ * The life-cycle of an endpoint is:
  *
- * constructor onStart receive* onStop
+ * constructor -> onStart -> receive* -> onStop
  *
- * Note: `receive` can be called concurrently. If you want `receive` is thread-safe, please use
+ * Note: `receive` can be called concurrently. If you want `receive` to be thread-safe, please use
  * [[ThreadSafeRpcEndpoint]]
  *
  * If any error is thrown from one of [[RpcEndpoint]] methods except `onError`, `onError` will be
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index b7fde0d9b32655fa44c39d351ee3b47365f3777e..15a0915708c7c54565141f99e15719408431d44b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -26,12 +26,15 @@ import scala.collection.mutable.{HashMap, HashSet}
 
 import com.google.common.collect.HashBiMap
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.mesos.{Scheduler => MScheduler, _}
+import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}
+
+import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState}
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
-import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
 
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -46,7 +49,8 @@ import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
 private[spark] class CoarseMesosSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     sc: SparkContext,
-    master: String)
+    master: String,
+    securityManager: SecurityManager)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
   with MScheduler
   with MesosSchedulerUtils {
@@ -56,12 +60,19 @@ private[spark] class CoarseMesosSchedulerBackend(
   // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
   val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
 
+  // If the external shuffle service is enabled, the driver registers with each shuffle service
+  // so that shuffle files can be cleaned up reliably when the driver exits.
+  private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
+
   // Cores we have acquired with each Mesos task ID
   val coresByTaskId = new HashMap[Int, Int]
   var totalCoresAcquired = 0
 
   val slaveIdsWithExecutors = new HashSet[String]
 
+  // Mapping from slave IDs to hostnames
+  private val slaveIdToHost = new HashMap[String, String]
+
   val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
   // How many times tasks on each slave failed
   val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]
@@ -90,6 +101,19 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val slaveOfferConstraints =
     parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
 
+  // A client for talking to the external shuffle service, if it is enabled.
+  private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
+    if (shuffleServiceEnabled) {
+      Some(new MesosExternalShuffleClient(
+        SparkTransportConf.fromSparkConf(conf),
+        securityManager,
+        securityManager.isAuthenticationEnabled(),
+        securityManager.isSaslEncryptionEnabled()))
+    } else {
+      None
+    }
+  }
+
   var nextMesosTaskId = 0
 
   @volatile var appId: String = _
@@ -188,6 +212,7 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
     appId = frameworkId.getValue
+    mesosExternalShuffleClient.foreach(_.init(appId))
     logInfo("Registered as framework ID " + appId)
     markRegistered()
   }
@@ -244,6 +269,7 @@ private[spark] class CoarseMesosSchedulerBackend(
 
           // accept the offer and launch the task
           logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+          slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
           d.launchTasks(
             Collections.singleton(offer.getId),
             Collections.singleton(taskBuilder.build()), filters)
@@ -261,7 +287,27 @@ private[spark] class CoarseMesosSchedulerBackend(
     val taskId = status.getTaskId.getValue.toInt
     val state = status.getState
     logInfo(s"Mesos task $taskId is now $state")
+    val slaveId: String = status.getSlaveId.getValue
     stateLock.synchronized {
+      // If the shuffle service is enabled, have the driver register with each of the shuffle
+      // services. This allows the shuffle services to clean up state associated with this
+      // application when the driver exits. There is currently no reliable way to detect driver
+      // termination through Mesos, since the shuffle services are set up independently.
+      if (TaskState.fromMesos(state).equals(TaskState.RUNNING) &&
+          slaveIdToHost.contains(slaveId) &&
+          shuffleServiceEnabled) {
+        assume(mesosExternalShuffleClient.isDefined,
+          "External shuffle client was not instantiated even though shuffle service is enabled.")
+        // TODO: Remove this and allow the MesosExternalShuffleService to detect
+        // framework termination when new Mesos Framework HTTP API is available.
+        val externalShufflePort = conf.getInt("spark.shuffle.service.port", 7337)
+        val hostname = slaveIdToHost.remove(slaveId).get
+        logDebug(s"Connecting to shuffle service on slave $slaveId, " +
+            s"host $hostname, port $externalShufflePort for app ${conf.getAppId}")
+        mesosExternalShuffleClient.get
+          .registerDriverWithShuffleService(hostname, externalShufflePort)
+      }
+
       if (TaskState.isFinished(TaskState.fromMesos(state))) {
         val slaveId = taskIdToSlaveId(taskId)
         slaveIdsWithExecutors -= slaveId
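
For the registration path above to fire, three settings have to line up on the driver: coarse-grained mode, the shuffle service flag, and a port that matches the shuffle service running on each slave. An illustrative driver-side configuration using only settings referenced in this patch (the master URL is a placeholder; 7337 is the default port assumed above):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setMaster("mesos://host:5050")               // placeholder master URL
      .set("spark.mesos.coarse", "true")            // use CoarseMesosSchedulerBackend
      .set("spark.shuffle.service.enabled", "true") // driver registers with each service
      .set("spark.shuffle.service.port", "7337")    // must match each slave's service
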
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
index 4b504df7b88511e97e2d08f6d836ea191b8071a2..525ee0d3bdc5a607cefc9214ab74da29acc0d6d3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -30,7 +30,7 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
 
 class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     with LocalSparkContext
@@ -59,7 +59,8 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
   private def createSchedulerBackend(
       taskScheduler: TaskSchedulerImpl,
       driver: SchedulerDriver): CoarseMesosSchedulerBackend = {
-    val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") {
+    val securityManager = mock[SecurityManager]
+    val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master", securityManager) {
       override protected def createSchedulerDriver(
         masterUrl: String,
         scheduler: Scheduler,
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
index de85720febf23f2a5ce61879c559cd68633970a1..5f95e2c74f902ca51680cb8907f19b62959c9205 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -69,7 +69,8 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder {
     } else if (className.equals("org.apache.spark.executor.MesosExecutorBackend")) {
       javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
       memKey = "SPARK_EXECUTOR_MEMORY";
-    } else if (className.equals("org.apache.spark.deploy.ExternalShuffleService")) {
+    } else if (className.equals("org.apache.spark.deploy.ExternalShuffleService") ||
+        className.equals("org.apache.spark.deploy.mesos.MesosExternalShuffleService")) {
       javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
       javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
       memKey = "SPARK_DAEMON_MEMORY";
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 37f2e34ceb24d0f6163af315b6f7395769acd253..e8e7f06247d3e8793d27b2100ba83e6129c9ab96 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -19,6 +19,7 @@ package org.apache.spark.network.client;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.SocketAddress;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -79,6 +80,10 @@ public class TransportClient implements Closeable {
     return channel.isOpen() || channel.isActive();
   }
 
+  public SocketAddress getSocketAddress() {
+    return channel.remoteAddress();
+  }
+
   /**
    * Requests a single chunk from the remote side, from the pre-negotiated streamId.
    *
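
The new getSocketAddress accessor exists so that server-side handlers can key per-connection state by remote address, which is exactly how the connectedApps map in the Mesos handler above is maintained. A standalone sketch of that bookkeeping pattern (illustrative names, not Spark code):

    import java.net.SocketAddress
    import scala.collection.mutable

    // Track which app each remote connection belongs to.
    val connectedApps = new mutable.HashMap[SocketAddress, String]

    def onRegister(address: SocketAddress, appId: String): Unit = {
      connectedApps(address) = appId
    }

    // On disconnect, the removed value names the app whose state to clean up.
    def onDisconnect(address: SocketAddress): Option[String] = {
      connectedApps.remove(address)
    }
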
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
index e4faaf8854fc7455eeb08d8b754c654f94490539..db9dc4f17cee9b7fe7862a10725992b9b172ce5b 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java
@@ -65,7 +65,13 @@ public class ExternalShuffleBlockHandler extends RpcHandler {
   @Override
   public void receive(TransportClient client, byte[] message, RpcResponseCallback callback) {
     BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteArray(message);
+    handleMessage(msgObj, client, callback);
+  }
 
+  protected void handleMessage(
+      BlockTransferMessage msgObj,
+      TransportClient client,
+      RpcResponseCallback callback) {
     if (msgObj instanceof OpenBlocks) {
       OpenBlocks msg = (OpenBlocks) msgObj;
       List<ManagedBuffer> blocks = Lists.newArrayList();
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 612bce571a49316116f8cbd586c9c0c2f2349062..ea6d248d66be36ad3bb305a93737d78940b60782 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -50,8 +50,8 @@ public class ExternalShuffleClient extends ShuffleClient {
   private final boolean saslEncryptionEnabled;
   private final SecretKeyHolder secretKeyHolder;
 
-  private TransportClientFactory clientFactory;
-  private String appId;
+  protected TransportClientFactory clientFactory;
+  protected String appId;
 
   /**
    * Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled,
@@ -71,6 +71,10 @@ public class ExternalShuffleClient extends ShuffleClient {
     this.saslEncryptionEnabled = saslEncryptionEnabled;
   }
 
+  protected void checkInit() {
+    assert appId != null : "Called before init()";
+  }
+
   @Override
   public void init(String appId) {
     this.appId = appId;
@@ -89,7 +93,7 @@ public class ExternalShuffleClient extends ShuffleClient {
       final String execId,
       String[] blockIds,
       BlockFetchingListener listener) {
-    assert appId != null : "Called before init()";
+    checkInit();
     logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId);
     try {
       RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
@@ -132,7 +136,7 @@ public class ExternalShuffleClient extends ShuffleClient {
       int port,
       String execId,
       ExecutorShuffleInfo executorInfo) throws IOException {
-    assert appId != null : "Called before init()";
+    checkInit();
     TransportClient client = clientFactory.createClient(host, port);
     byte[] registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteArray();
     client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
new file mode 100644
index 0000000000000000000000000000000000000000..7543b6be4f2a1ffb7337c0ff06d11f61f15135f8
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.mesos;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.sasl.SecretKeyHolder;
+import org.apache.spark.network.shuffle.ExternalShuffleClient;
+import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * A client for talking to the external shuffle service in Mesos coarse-grained mode.
+ *
+ * This is used by the Spark driver to register with each external shuffle service on the cluster.
+ * The driver registers with each service so that shuffle files can be cleaned up reliably after
+ * the application exits. Mesos does not provide a reliable way to detect application
+ * termination, so Spark has to handle the cleanup itself.
+ */
+public class MesosExternalShuffleClient extends ExternalShuffleClient {
+  private final Logger logger = LoggerFactory.getLogger(MesosExternalShuffleClient.class);
+
+  /**
+   * Creates a Mesos external shuffle client that wraps the {@link ExternalShuffleClient}.
+   * Please refer to docs on {@link ExternalShuffleClient} for more information.
+   */
+  public MesosExternalShuffleClient(
+      TransportConf conf,
+      SecretKeyHolder secretKeyHolder,
+      boolean saslEnabled,
+      boolean saslEncryptionEnabled) {
+    super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled);
+  }
+
+  public void registerDriverWithShuffleService(String host, int port) throws IOException {
+    checkInit();
+    byte[] registerDriver = new RegisterDriver(appId).toByteArray();
+    TransportClient client = clientFactory.createClient(host, port);
+    client.sendRpc(registerDriver, new RpcResponseCallback() {
+      @Override
+      public void onSuccess(byte[] response) {
+        logger.info("Successfully registered app " + appId + " with external shuffle service.");
+      }
+
+      @Override
+      public void onFailure(Throwable e) {
+        logger.warn("Unable to register app " + appId + " with external shuffle service. " +
+          "Please manually remove shuffle data after driver exit. Error: " + e);
+      }
+    });
+  }
+}
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
index 6c1210b33268a65e881a1bd9d05ffe83e06608d4..fcb52363e632c6ab4a63ee83b5511eaa512d7697 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 
 import org.apache.spark.network.protocol.Encodable;
+import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
 
 /**
  * Messages handled by the {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler}, or
@@ -37,7 +38,7 @@ public abstract class BlockTransferMessage implements Encodable {
 
   /** Preceding every serialized message is its type, which allows us to deserialize it. */
   public static enum Type {
-    OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3);
+    OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4);
 
     private final byte id;
 
@@ -60,6 +61,7 @@ public abstract class BlockTransferMessage implements Encodable {
         case 1: return UploadBlock.decode(buf);
         case 2: return RegisterExecutor.decode(buf);
         case 3: return StreamHandle.decode(buf);
+        case 4: return RegisterDriver.decode(buf);
         default: throw new IllegalArgumentException("Unknown message type: " + type);
       }
     }
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
new file mode 100644
index 0000000000000000000000000000000000000000..1c28fc1dff24631945da3b748baed0cf6efadbad
--- /dev/null
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol.mesos;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+
+/**
+ * A message sent from the driver to register with the MesosExternalShuffleService.
+ */
+public class RegisterDriver extends BlockTransferMessage {
+  private final String appId;
+
+  public RegisterDriver(String appId) {
+    this.appId = appId;
+  }
+
+  public String getAppId() { return appId; }
+
+  @Override
+  protected Type type() { return Type.REGISTER_DRIVER; }
+
+  @Override
+  public int encodedLength() {
+    return Encoders.Strings.encodedLength(appId);
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    Encoders.Strings.encode(buf, appId);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(appId);
+  }
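+
+  // Complements hashCode above: two messages with the same appId compare equal.
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof RegisterDriver)) {
+      return false;
+    }
+    return Objects.equal(appId, ((RegisterDriver) other).appId);
+  }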
+
+  public static RegisterDriver decode(ByteBuf buf) {
+    String appId = Encoders.Strings.decode(buf);
+    return new RegisterDriver(appId);
+  }
+}
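
Because RegisterDriver travels through the same BlockTransferMessage framing as the existing shuffle messages, the new REGISTER_DRIVER type byte can be exercised with a simple serialization round trip. A test-style sketch (the app id is a placeholder):

    import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
    import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver

    val original = new RegisterDriver("app-placeholder")
    // toByteArray prepends the type byte; the decoder reads it back and
    // dispatches to RegisterDriver.decode (case 4 above).
    val decoded = BlockTransferMessage.Decoder.fromByteArray(original.toByteArray)
      .asInstanceOf[RegisterDriver]
    assert(decoded.getAppId == original.getAppId)
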
diff --git a/sbin/start-mesos-shuffle-service.sh b/sbin/start-mesos-shuffle-service.sh
new file mode 100755
index 0000000000000000000000000000000000000000..64580762c5dc471dd13db63a4e661273c27190a9
--- /dev/null
+++ b/sbin/start-mesos-shuffle-service.sh
@@ -0,0 +1,35 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Starts the Mesos external shuffle service on the machine this script is executed on.
+# The Mesos external shuffle service detects when an application exits and automatically
+# cleans up its shuffle files.
+#
+# Usage: start-mesos-shuffle-service.sh
+#
+# Use the SPARK_SHUFFLE_OPTS environment variable to set shuffle service configuration.
+#
+
+sbin="`dirname "$0"`"
+sbin="`cd "$sbin"; pwd`"
+
+. "$sbin/spark-config.sh"
+. "$SPARK_PREFIX/bin/load-spark-env.sh"
+
+exec "$sbin"/spark-daemon.sh start org.apache.spark.deploy.mesos.MesosExternalShuffleService 1
diff --git a/sbin/stop-mesos-shuffle-service.sh b/sbin/stop-mesos-shuffle-service.sh
new file mode 100755
index 0000000000000000000000000000000000000000..0e965d5ec5886b3ab10080144b78696572e657da
--- /dev/null
+++ b/sbin/stop-mesos-shuffle-service.sh
@@ -0,0 +1,25 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Stops the Mesos external shuffle service on the machine this script is executed on.
+
+sbin="`dirname "$0"`"
+sbin="`cd "$sbin"; pwd`"
+
+"$sbin"/spark-daemon.sh stop org.apache.spark.deploy.mesos.MesosExternalShuffleService 1