diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 1e9644d06e1d0a0ddd75db9c5e627f44e36abdf4..39b0f4d0e2c0e21fce74107a9b0119dc25be17e8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -537,10 +537,16 @@ private[spark] class MesosClusterScheduler(
           .addAllResources(memResourcesToUse.asJava)
         offer.resources = finalResources.asJava
         submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
-          MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
-            image,
-            submission.schedulerProperties.get,
-            taskInfo.getContainerBuilder())
+          val container = taskInfo.getContainerBuilder()
+          val volumes = submission.schedulerProperties
+            .get("spark.mesos.executor.docker.volumes")
+            .map(MesosSchedulerBackendUtil.parseVolumesSpec)
+          val portmaps = submission.schedulerProperties
+            .get("spark.mesos.executor.docker.portmaps")
+            .map(MesosSchedulerBackendUtil.parsePortMappingsSpec)
+          MesosSchedulerBackendUtil.addDockerInfo(
+            container, image, volumes = volumes, portmaps = portmaps)
+          taskInfo.setContainer(container.build())
         }
         val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
         queuedTasks += taskInfo.build()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 52993caad1aa992143753079bb9ed8a2e76018ec..99e6d3958374791a0c6b9a60287e29a78b6482ad 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -408,11 +408,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
             .addAllResources(memResourcesToUse.asJava)
 
           sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-            MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
-              image,
-              sc.conf.getOption,
-              taskBuilder.getContainerBuilder
-            )
+            MesosSchedulerBackendUtil
+              .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder)
           }
 
           tasks(offer.getId) ::= taskBuilder.build()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 8d4fc9eed7af765fdbdb0c6a044efc2aeefaa4d5..e08dc3b5957bbf8109d665c98f079f682a7fe267 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -151,11 +151,8 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       .setData(ByteString.copyFrom(createExecArg()))
 
     sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-      MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
-        image,
-        sc.conf.getOption,
-        executorInfo.getContainerBuilder()
-      )
+      MesosSchedulerBackendUtil
+        .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
     }
 
     (executorInfo.build(), resourcesAfterMem.asJava)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index aa669f01bd60785e68050b1e82b02d5cc97024b4..05b2b089440980c12a8ea774edaf2d2b878700b1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos
 import org.apache.mesos.Protos.{ContainerInfo, Volume}
 import org.apache.mesos.Protos.ContainerInfo.DockerInfo
 
+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 
 /**
@@ -104,14 +105,11 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
   def addDockerInfo(
       container: ContainerInfo.Builder,
       image: String,
-      forcePullImage: Boolean = false,
       volumes: Option[List[Volume]] = None,
       network: Option[ContainerInfo.DockerInfo.Network] = None,
       portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
 
-    val docker = ContainerInfo.DockerInfo.newBuilder()
-      .setImage(image)
-      .setForcePullImage(forcePullImage)
+    val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
 
     network.foreach(docker.setNetwork)
     portmaps.foreach(_.foreach(docker.addPortMappings))
@@ -121,23 +119,21 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
   }
 
   /**
-   * Setup a docker containerizer from MesosDriverDescription scheduler properties
+   * Setup a docker containerizer
    */
   def setupContainerBuilderDockerInfo(
     imageName: String,
-    conf: String => Option[String],
+    conf: SparkConf,
     builder: ContainerInfo.Builder): Unit = {
-    val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage")
-      .exists(_.equals("true"))
-    val volumes = conf("spark.mesos.executor.docker.volumes")
+    val volumes = conf
+      .getOption("spark.mesos.executor.docker.volumes")
       .map(parseVolumesSpec)
-    val portmaps = conf("spark.mesos.executor.docker.portmaps")
+    val portmaps = conf
+      .getOption("spark.mesos.executor.docker.portmaps")
       .map(parsePortMappingsSpec)
-
     addDockerInfo(
       builder,
       imageName,
-      forcePullImage = forcePullImage,
       volumes = volumes,
       portmaps = portmaps)
     logDebug("setupContainerDockerInfo: using docker image: " + imageName)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index d3a85c654ee91c5dc11ed34ca46478fa4cbe98b3..c2779d7b3565a8689ca2ccddf7b91c80d012ba55 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -252,69 +252,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     backend.start()
   }
 
-  test("docker settings are reflected in created tasks") {
-    setBackend(Map(
-      "spark.mesos.executor.docker.image" -> "some_image",
-      "spark.mesos.executor.docker.forcePullImage" -> "true",
-      "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro",
-      "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp"
-    ))
-
-    val (mem, cpu) = (backend.executorMemory(sc), 4)
-
-    val offer1 = createOffer("o1", "s1", mem, cpu)
-    backend.resourceOffers(driver, List(offer1).asJava)
-
-    val launchedTasks = verifyTaskLaunched("o1").asScala
-    assert(launchedTasks.size == 1)
-
-    val containerInfo = launchedTasks.head.getContainer
-    assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
-
-    val volumes = containerInfo.getVolumesList.asScala
-    assert(volumes.size == 1)
-
-    val volume = volumes.head
-    assert(volume.getHostPath == "/host_vol")
-    assert(volume.getContainerPath == "/container_vol")
-    assert(volume.getMode == Volume.Mode.RO)
-
-    val dockerInfo = containerInfo.getDocker
-
-    assert(dockerInfo.getImage == "some_image")
-    assert(dockerInfo.getForcePullImage)
-
-    val portMappings = dockerInfo.getPortMappingsList.asScala
-    assert(portMappings.size == 1)
-
-    val portMapping = portMappings.head
-    assert(portMapping.getHostPort == 8080)
-    assert(portMapping.getContainerPort == 80)
-    assert(portMapping.getProtocol == "tcp")
-  }
-
-  test("force-pull-image option is disabled by default") {
-    setBackend(Map(
-      "spark.mesos.executor.docker.image" -> "some_image"
-    ))
-
-    val (mem, cpu) = (backend.executorMemory(sc), 4)
-
-    val offer1 = createOffer("o1", "s1", mem, cpu)
-    backend.resourceOffers(driver, List(offer1).asJava)
-
-    val launchedTasks = verifyTaskLaunched("o1").asScala
-    assert(launchedTasks.size == 1)
-
-    val containerInfo = launchedTasks.head.getContainer
-    assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
-
-    val dockerInfo = containerInfo.getDocker
-
-    assert(dockerInfo.getImage == "some_image")
-    assert(!dockerInfo.getForcePullImage)
-  }
-
   private def verifyDeclinedOffer(driver: SchedulerDriver,
       offerId: OfferID,
       filter: Boolean = false): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index fcf39f63915f7a520e53a65c3570e1a9283ee182..41693b1191a3c10be97e809cd071263e64a67be2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -150,7 +150,6 @@ class MesosFineGrainedSchedulerBackendSuite
 
     val conf = new SparkConf()
       .set("spark.mesos.executor.docker.image", "spark/mock")
-      .set("spark.mesos.executor.docker.forcePullImage", "true")
       .set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
       .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
 
@@ -170,7 +169,6 @@ class MesosFineGrainedSchedulerBackendSuite
     val (execInfo, _) = backend.createExecutorInfo(
       Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
     assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
-    assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true))
     val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
     assert(portmaps.get(0).getHostPort.equals(80))
     assert(portmaps.get(0).getContainerPort.equals(8080))
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index ff1587314030e96592999e15cfd5bbe0796c408b..5d536b7c245e38f2bdaf40bdb39698859717dfd2 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -116,7 +116,7 @@ libfb303-0.9.2.jar
 libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-0.21.1-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 2b5764f8686907e3e1f4692d9c6d94757319f996..d16f42a97d37052389cc2b31b6b7ee0c6162623a 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-0.21.1-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 3f53fdb09c64acbe194283548778012e663984e0..2e261cb9a543274d0a574bb41b0ffb02f638a810 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-0.21.1-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index d3a7ab8bb457dcbb99828873f00c7239ef67308c..67f38f4c220de1b5758abad17a71047faac92fa0 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -130,7 +130,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-0.21.1-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 05317a044d65e40fcbb55075e0cd8c8356ca11fb..07583963d913b76a12a5faf3c9d6a179d17710aa 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -131,7 +131,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-0.21.1-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/docs/_config.yml b/docs/_config.yml
index bbb576e0e7bb3b84b0c7cf1e4d2ae521b7494fdf..be3d8a2fe6022bde20c47cf21bfb5b7270131143 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -18,6 +18,6 @@ SPARK_VERSION: 2.1.0-SNAPSHOT
 SPARK_VERSION_SHORT: 2.1.0
 SCALA_BINARY_VERSION: "2.11"
 SCALA_VERSION: "2.11.7"
-MESOS_VERSION: 0.22.0
+MESOS_VERSION: 0.21.0
 SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
 SPARK_GITHUB_URL: https://github.com/apache/spark
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index ce888b544516e1549f83fd59e90c18e1082acfd5..10dc9ce890041c584c9cb70c65aac2160a5d1547 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -260,10 +260,6 @@ have Mesos download Spark via the usual methods.
 
 Requires Mesos version 0.20.1 or later.
 
-Note that by default Mesos agents will not pull the image if it already exists on the agent. If you use mutable image
-tags you can set `spark.mesos.executor.docker.forcePullImage` to `true` in order to force the agent to always pull the
-image before running the executor. Force pulling images is only available in Mesos version 0.22 and above.
-
 # Running Alongside Hadoop
 
 You can run Spark and Mesos alongside your existing Hadoop cluster by just launching them as a
@@ -338,14 +334,6 @@ See the [configuration page](configuration.html) for information on Spark config
     the installed path of the Mesos library can be specified with <code>spark.executorEnv.MESOS_NATIVE_JAVA_LIBRARY</code>.
   </td>
 </tr>
-<tr>
-  <td><code>spark.mesos.executor.docker.forcePullImage</code></td>
-  <td>false</td>
-  <td>
-    Force Mesos agents to pull the image specified in <code>spark.mesos.executor.docker.image</code>.
-    By default Mesos agents will not pull images they already have cached.
-  </td>
-</tr>
 <tr>
   <td><code>spark.mesos.executor.docker.volumes</code></td>
   <td>(none)</td>
diff --git a/pom.xml b/pom.xml
index b69292d1880905e1130ed593ce926aeb2fefc530..d064cb57dd6cf3e9f13cebf8d941af875dae0a03 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
     <java.version>1.7</java.version>
     <maven.version>3.3.9</maven.version>
     <sbt.project.name>spark</sbt.project.name>
-    <mesos.version>0.22.2</mesos.version>
+    <mesos.version>0.21.1</mesos.version>
     <mesos.classifier>shaded-protobuf</mesos.classifier>
     <slf4j.version>1.7.16</slf4j.version>
     <log4j.version>1.2.17</log4j.version>