From 266b92faffb66af24d8ed2725beb80770a2d91f8 Mon Sep 17 00:00:00 2001
From: Michael Gummelt <mgummelt@mesosphere.io>
Date: Fri, 29 Jul 2016 05:50:47 -0700
Subject: [PATCH] [SPARK-16637] Unified containerizer

## What changes were proposed in this pull request?

New config var: spark.mesos.docker.containerizer={"mesos","docker" (default)}

This adds support for running docker containers via the Mesos unified containerizer: http://mesos.apache.org/documentation/latest/container-image/

The benefit is losing the dependency on `dockerd`, and all the costs which it incurs.

I've also updated the supported Mesos version to 0.28.2 for support of the required protobufs.

This is blocked on: https://github.com/apache/spark/pull/14167

## How was this patch tested?

- manually testing jobs submitted with both "mesos" and "docker" settings for the new config var.
- spark/mesos integration test suite

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #14275 from mgummelt/unified-containerizer.
---
 .../scala/org/apache/spark/SparkConf.scala    | 13 ++-
 .../scala/org/apache/spark/TaskState.scala    |  8 +-
 .../deploy/mesos/MesosDriverDescription.scala | 11 ++-
 .../spark/deploy/mesos/ui/DriverPage.scala    |  2 +-
 .../cluster/mesos/MesosClusterScheduler.scala | 85 ++++++++++---------
 .../MesosCoarseGrainedSchedulerBackend.scala  |  2 +-
 .../MesosFineGrainedSchedulerBackend.scala    |  2 +-
 .../mesos/MesosSchedulerBackendUtil.scala     | 46 +++++++---
 .../cluster/mesos/MesosSchedulerUtils.scala   |  3 +-
 ...osCoarseGrainedSchedulerBackendSuite.scala | 32 ++++++-
 dev/deps/spark-deps-hadoop-2.2                |  2 +-
 dev/deps/spark-deps-hadoop-2.3                |  2 +-
 dev/deps/spark-deps-hadoop-2.4                |  2 +-
 dev/deps/spark-deps-hadoop-2.6                |  2 +-
 dev/deps/spark-deps-hadoop-2.7                |  2 +-
 docs/_config.yml                              |  2 +-
 docs/running-on-mesos.md                      | 10 +++
 pom.xml                                       |  2 +-
 18 files changed, 149 insertions(+), 79 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index f6af9ccc41..b6d244b1a0 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -47,7 +47,7 @@ import org.apache.spark.util.Utils
  *
  * @param loadDefaults whether to also load values from Java system properties
  */
-class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
+class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
 
   import SparkConf._
 
@@ -370,6 +370,13 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     settings.entrySet().asScala.map(x => (x.getKey, x.getValue)).toArray
   }
 
+  /** Get all parameters that start with `prefix` */
+  def getAllWithPrefix(prefix: String): Array[(String, String)] = {
+    getAll.filter { case (k, v) => k.startsWith(prefix) }
+      .map { case (k, v) => (k.substring(prefix.length), v) }
+  }
+
+
   /** Get a parameter as an integer, falling back to a default if not set */
   def getInt(key: String, defaultValue: Int): Int = {
     getOption(key).map(_.toInt).getOrElse(defaultValue)
@@ -392,9 +399,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Get all executor environment variables set on this SparkConf */
   def getExecutorEnv: Seq[(String, String)] = {
-    val prefix = "spark.executorEnv."
-    getAll.filter{case (k, v) => k.startsWith(prefix)}
-          .map{case (k, v) => (k.substring(prefix.length), v)}
+    getAllWithPrefix("spark.executorEnv.")
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala b/core/src/main/scala/org/apache/spark/TaskState.scala
index fe19f07e32..d232fae6b1 100644
--- a/core/src/main/scala/org/apache/spark/TaskState.scala
+++ b/core/src/main/scala/org/apache/spark/TaskState.scala
@@ -41,13 +41,11 @@ private[spark] object TaskState extends Enumeration {
   }
 
   def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match {
-    case MesosTaskState.TASK_STAGING => LAUNCHING
-    case MesosTaskState.TASK_STARTING => LAUNCHING
-    case MesosTaskState.TASK_RUNNING => RUNNING
+    case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => LAUNCHING
+    case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => RUNNING
     case MesosTaskState.TASK_FINISHED => FINISHED
     case MesosTaskState.TASK_FAILED => FAILED
     case MesosTaskState.TASK_KILLED => KILLED
-    case MesosTaskState.TASK_LOST => LOST
-    case MesosTaskState.TASK_ERROR => LOST
+    case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => LOST
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
index 1948226800..d4c7022f00 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.mesos
 
 import java.util.Date
 
+import org.apache.spark.SparkConf
 import org.apache.spark.deploy.Command
 import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState
 
@@ -40,12 +41,15 @@ private[spark] class MesosDriverDescription(
     val cores: Double,
     val supervise: Boolean,
     val command: Command,
-    val schedulerProperties: Map[String, String],
+    schedulerProperties: Map[String, String],
     val submissionId: String,
     val submissionDate: Date,
     val retryState: Option[MesosClusterRetryState] = None)
   extends Serializable {
 
+  val conf = new SparkConf(false)
+  schedulerProperties.foreach {case (k, v) => conf.set(k, v)}
+
   def copy(
       name: String = name,
       jarUrl: String = jarUrl,
@@ -53,11 +57,12 @@ private[spark] class MesosDriverDescription(
       cores: Double = cores,
       supervise: Boolean = supervise,
       command: Command = command,
-      schedulerProperties: Map[String, String] = schedulerProperties,
+      schedulerProperties: SparkConf = conf,
       submissionId: String = submissionId,
       submissionDate: Date = submissionDate,
       retryState: Option[MesosClusterRetryState] = retryState): MesosDriverDescription = {
-    new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, schedulerProperties,
+
+    new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, conf.getAll.toMap,
       submissionId, submissionDate, retryState)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
index 807835105e..cd98110ddc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
@@ -50,7 +50,7 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver")
     val driverDescription = Iterable.apply(driverState.description)
     val submissionState = Iterable.apply(driverState.submissionState)
     val command = Iterable.apply(driverState.description.command)
-    val schedulerProperties = Iterable.apply(driverState.description.schedulerProperties)
+    val schedulerProperties = Iterable.apply(driverState.description.conf.getAll.toMap)
     val commandEnv = Iterable.apply(driverState.description.command.environment)
     val driverTable =
       UIUtils.listingTable(driverHeaders, driverRow, driverDescription)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 1e9644d06e..ae531e1997 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -353,19 +353,16 @@ private[spark] class MesosClusterScheduler(
     }
   }
 
-  private def getDriverExecutorURI(desc: MesosDriverDescription) = {
-    desc.schedulerProperties.get("spark.executor.uri")
-      .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
+  private def getDriverExecutorURI(desc: MesosDriverDescription): Option[String] = {
+    desc.conf.getOption("spark.executor.uri")
+        .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
   }
 
   private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
     val env = {
-      val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
+      val executorOpts = desc.conf.getAll.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
       val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
-
-      val prefix = "spark.mesos.driverEnv."
-      val driverEnv = desc.schedulerProperties.filterKeys(_.startsWith(prefix))
-        .map { case (k, v) => (k.substring(prefix.length), v) }
+      val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.")
 
       driverEnv ++ executorEnv ++ desc.command.environment
     }
@@ -379,8 +376,8 @@ private[spark] class MesosClusterScheduler(
 
   private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
     val confUris = List(conf.getOption("spark.mesos.uris"),
-      desc.schedulerProperties.get("spark.mesos.uris"),
-      desc.schedulerProperties.get("spark.submit.pyFiles")).flatMap(
+      desc.conf.getOption("spark.mesos.uris"),
+      desc.conf.getOption("spark.submit.pyFiles")).flatMap(
       _.map(_.split(",").map(_.trim))
     ).flatten
 
@@ -391,7 +388,7 @@ private[spark] class MesosClusterScheduler(
   }
 
   private def getDriverCommandValue(desc: MesosDriverDescription): String = {
-    val dockerDefined = desc.schedulerProperties.contains("spark.mesos.executor.docker.image")
+    val dockerDefined = desc.conf.contains("spark.mesos.executor.docker.image")
     val executorUri = getDriverExecutorURI(desc)
     // Gets the path to run spark-submit, and the path to the Mesos sandbox.
     val (executable, sandboxPath) = if (dockerDefined) {
@@ -411,7 +408,7 @@ private[spark] class MesosClusterScheduler(
       // Sandbox path points to the parent folder as we chdir into the folderBasename.
       (cmdExecutable, "..")
     } else {
-      val executorSparkHome = desc.schedulerProperties.get("spark.mesos.executor.home")
+      val executorSparkHome = desc.conf.getOption("spark.mesos.executor.home")
         .orElse(conf.getOption("spark.home"))
         .orElse(Option(System.getenv("SPARK_HOME")))
         .getOrElse {
@@ -438,7 +435,7 @@ private[spark] class MesosClusterScheduler(
 
   private def generateCmdOption(desc: MesosDriverDescription, sandboxPath: String): Seq[String] = {
     var options = Seq(
-      "--name", desc.schedulerProperties("spark.app.name"),
+      "--name", desc.conf.get("spark.app.name"),
       "--master", s"mesos://${conf.get("spark.master")}",
       "--driver-cores", desc.cores.toString,
       "--driver-memory", s"${desc.mem}M")
@@ -454,19 +451,19 @@ private[spark] class MesosClusterScheduler(
       options ++= Seq("--class", desc.command.mainClass)
     }
 
-    desc.schedulerProperties.get("spark.executor.memory").map { v =>
+    desc.conf.getOption("spark.executor.memory").foreach { v =>
       options ++= Seq("--executor-memory", v)
     }
-    desc.schedulerProperties.get("spark.cores.max").map { v =>
+    desc.conf.getOption("spark.cores.max").foreach { v =>
       options ++= Seq("--total-executor-cores", v)
     }
-    desc.schedulerProperties.get("spark.submit.pyFiles").map { pyFiles =>
+    desc.conf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
       val formattedFiles = pyFiles.split(",")
         .map { path => new File(sandboxPath, path.split("/").last).toString() }
         .mkString(",")
       options ++= Seq("--py-files", formattedFiles)
     }
-    desc.schedulerProperties
+    desc.conf.getAll
       .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
       .foreach { case (key, value) => options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
     options
@@ -476,6 +473,7 @@ private[spark] class MesosClusterScheduler(
    * Escape args for Unix-like shells, unless already quoted by the user.
    * Based on: http://www.gnu.org/software/bash/manual/html_node/Double-Quotes.html
    * and http://www.grymoire.com/Unix/Quote.html
+ *
    * @param value argument
    * @return escaped argument
    */
@@ -498,6 +496,33 @@ private[spark] class MesosClusterScheduler(
     }
   }
 
+  private def createTaskInfo(desc: MesosDriverDescription, offer: ResourceOffer): TaskInfo = {
+    val taskId = TaskID.newBuilder().setValue(desc.submissionId).build()
+
+    val (remainingResources, cpuResourcesToUse) =
+      partitionResources(offer.resources, "cpus", desc.cores)
+    val (finalResources, memResourcesToUse) =
+      partitionResources(remainingResources.asJava, "mem", desc.mem)
+    offer.resources = finalResources.asJava
+
+    val appName = desc.conf.get("spark.app.name")
+    val taskInfo = TaskInfo.newBuilder()
+      .setTaskId(taskId)
+      .setName(s"Driver for ${appName}")
+      .setSlaveId(offer.slaveId)
+      .setCommand(buildDriverCommand(desc))
+      .addAllResources(cpuResourcesToUse.asJava)
+      .addAllResources(memResourcesToUse.asJava)
+
+    desc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+      MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(image,
+        desc.conf,
+        taskInfo.getContainerBuilder)
+    }
+
+    taskInfo.build
+  }
+
   /**
    * This method takes all the possible candidates and attempt to schedule them with Mesos offers.
    * Every time a new task is scheduled, the afterLaunchCallback is called to perform post scheduled
@@ -521,32 +546,12 @@ private[spark] class MesosClusterScheduler(
           s"cpu: $driverCpu, mem: $driverMem")
       } else {
         val offer = offerOption.get
-        val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
-        val (remainingResources, cpuResourcesToUse) =
-          partitionResources(offer.resources, "cpus", driverCpu)
-        val (finalResources, memResourcesToUse) =
-          partitionResources(remainingResources.asJava, "mem", driverMem)
-        val commandInfo = buildDriverCommand(submission)
-        val appName = submission.schedulerProperties("spark.app.name")
-        val taskInfo = TaskInfo.newBuilder()
-          .setTaskId(taskId)
-          .setName(s"Driver for $appName")
-          .setSlaveId(offer.slaveId)
-          .setCommand(commandInfo)
-          .addAllResources(cpuResourcesToUse.asJava)
-          .addAllResources(memResourcesToUse.asJava)
-        offer.resources = finalResources.asJava
-        submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
-          MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
-            image,
-            submission.schedulerProperties.get,
-            taskInfo.getContainerBuilder())
-        }
         val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
-        queuedTasks += taskInfo.build()
+        val task = createTaskInfo(submission, offer)
+        queuedTasks += task
         logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
           submission.submissionId)
-        val newState = new MesosClusterSubmissionState(submission, taskId, offer.slaveId,
+        val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
           None, new Date(), None)
         launchedDrivers(submission.submissionId) = newState
         launchedDriversState.persist(submission.submissionId, newState)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 52993caad1..959d6fd46d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -410,7 +410,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
             MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
               image,
-              sc.conf.getOption,
+              sc.conf,
               taskBuilder.getContainerBuilder
             )
           }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 8d4fc9eed7..d8d661da31 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -153,7 +153,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
     sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
       MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
         image,
-        sc.conf.getOption,
+        sc.conf,
         executorInfo.getContainerBuilder()
       )
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index aa669f01bd..3fe06743b8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import org.apache.mesos.Protos.{ContainerInfo, Volume}
+import org.apache.mesos.Protos.{ContainerInfo, Image, Volume}
 import org.apache.mesos.Protos.ContainerInfo.DockerInfo
 
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 
 /**
@@ -104,19 +105,33 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
   def addDockerInfo(
       container: ContainerInfo.Builder,
       image: String,
+      containerizer: String,
       forcePullImage: Boolean = false,
       volumes: Option[List[Volume]] = None,
-      network: Option[ContainerInfo.DockerInfo.Network] = None,
       portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
 
-    val docker = ContainerInfo.DockerInfo.newBuilder()
-      .setImage(image)
-      .setForcePullImage(forcePullImage)
+    containerizer match {
+      case "docker" =>
+        container.setType(ContainerInfo.Type.DOCKER)
+        val docker = ContainerInfo.DockerInfo.newBuilder()
+          .setImage(image)
+          .setForcePullImage(forcePullImage)
+        // TODO (mgummelt): Remove this. Portmaps have no effect,
+        //                  as we don't support bridge networking.
+        portmaps.foreach(_.foreach(docker.addPortMappings))
+        container.setDocker(docker)
+      case "mesos" =>
+        container.setType(ContainerInfo.Type.MESOS)
+        val imageProto = Image.newBuilder()
+          .setType(Image.Type.DOCKER)
+          .setDocker(Image.Docker.newBuilder().setName(image))
+          .setCached(!forcePullImage)
+        container.setMesos(ContainerInfo.MesosInfo.newBuilder().setImage(imageProto))
+      case _ =>
+        throw new SparkException(
+          "spark.mesos.containerizer must be one of {\"docker\", \"mesos\"}")
+    }
 
-    network.foreach(docker.setNetwork)
-    portmaps.foreach(_.foreach(docker.addPortMappings))
-    container.setType(ContainerInfo.Type.DOCKER)
-    container.setDocker(docker.build())
     volumes.foreach(_.foreach(container.addVolumes))
   }
 
@@ -125,18 +140,23 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
    */
   def setupContainerBuilderDockerInfo(
     imageName: String,
-    conf: String => Option[String],
+    conf: SparkConf,
     builder: ContainerInfo.Builder): Unit = {
-    val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage")
+    val forcePullImage = conf
+      .getOption("spark.mesos.executor.docker.forcePullImage")
       .exists(_.equals("true"))
-    val volumes = conf("spark.mesos.executor.docker.volumes")
+    val volumes = conf
+      .getOption("spark.mesos.executor.docker.volumes")
       .map(parseVolumesSpec)
-    val portmaps = conf("spark.mesos.executor.docker.portmaps")
+    val portmaps = conf
+      .getOption("spark.mesos.executor.docker.portmaps")
       .map(parsePortMappingsSpec)
 
+    val containerizer = conf.get("spark.mesos.containerizer", "docker")
     addDockerInfo(
       builder,
       imageName,
+      containerizer,
       forcePullImage = forcePullImage,
       volumes = volumes,
       portmaps = portmaps)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 7355ba317d..cd4b45f8de 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -33,6 +33,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
+
 /**
  * Shared trait for implementing a Mesos Scheduler. This holds common state and helper
  * methods and Mesos scheduler will use.
@@ -79,7 +80,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
       credBuilder.setPrincipal(principal)
     }
     conf.getOption("spark.mesos.secret").foreach { secret =>
-      credBuilder.setSecret(ByteString.copyFromUtf8(secret))
+      credBuilder.setSecret(secret)
     }
     if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) {
       throw new SparkException(
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 51d262e75e..a74fdf79a1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -109,7 +109,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val taskInfos = verifyTaskLaunched(driver, "o1")
     assert(taskInfos.length == 1)
 
-    val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
+    val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
     assert(cpus == executorCores)
   }
 
@@ -123,7 +123,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val taskInfos = verifyTaskLaunched(driver, "o1")
     assert(taskInfos.length == 1)
 
-    val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
+    val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
     assert(cpus == offerCores)
   }
 
@@ -137,7 +137,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val taskInfos = verifyTaskLaunched(driver, "o1")
     assert(taskInfos.length == 1)
 
-    val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
+    val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
     assert(cpus == maxCores)
   }
 
@@ -252,6 +252,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     backend.start()
   }
 
+  test("honors unset spark.mesos.containerizer") {
+    setBackend(Map("spark.mesos.executor.docker.image" -> "test"))
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.head.getContainer.getType == ContainerInfo.Type.DOCKER)
+  }
+
+  test("honors spark.mesos.containerizer=\"mesos\"") {
+    setBackend(Map(
+      "spark.mesos.executor.docker.image" -> "test",
+      "spark.mesos.containerizer" -> "mesos"))
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.head.getContainer.getType == ContainerInfo.Type.MESOS)
+  }
+
   test("docker settings are reflected in created tasks") {
     setBackend(Map(
       "spark.mesos.executor.docker.image" -> "some_image",
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index 9350b9df50..d0771e1ac8 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -116,7 +116,7 @@ libfb303-0.9.2.jar
 libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-1.0.0-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 2e1a6a3dc6..ef97ffd9ab 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-1.0.0-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 9baf87e532..fba3c18b14 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-1.0.0-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 9112452b5c..9747acda81 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -130,7 +130,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-1.0.0-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index b0e3e9304b..7231bcaf6c 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -131,7 +131,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-1.0.0-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/docs/_config.yml b/docs/_config.yml
index bbb576e0e7..e4fc093fe7 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -18,6 +18,6 @@ SPARK_VERSION: 2.1.0-SNAPSHOT
 SPARK_VERSION_SHORT: 2.1.0
 SCALA_BINARY_VERSION: "2.11"
 SCALA_VERSION: "2.11.7"
-MESOS_VERSION: 0.22.0
+MESOS_VERSION: 1.0.0
 SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
 SPARK_GITHUB_URL: https://github.com/apache/spark
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index ce888b5445..d037e7be0a 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -432,6 +432,16 @@ See the [configuration page](configuration.html) for information on Spark config
     </ul>
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.containerizer</code></td>
+  <td><code>docker</code></td>
+  <td>
+    This only affects docker containers, and must be one of "docker"
+    or "mesos".  Mesos supports two types of
+    containerizers for docker: the "docker" containerizer, and the preferred
+    "mesos" containerizer.  Read more here: http://mesos.apache.org/documentation/latest/container-image/
+  </td>
+</tr>
 <tr>
   <td><code>spark.mesos.driver.webui.url</code></td>
   <td><code>(none)</code></td>
diff --git a/pom.xml b/pom.xml
index 9b7be371bb..0491e981d5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
     <java.version>1.7</java.version>
     <maven.version>3.3.9</maven.version>
     <sbt.project.name>spark</sbt.project.name>
-    <mesos.version>0.22.2</mesos.version>
+    <mesos.version>1.0.0</mesos.version>
     <mesos.classifier>shaded-protobuf</mesos.classifier>
     <slf4j.version>1.7.16</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
-- 
GitLab