diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index f6af9ccc41b39114ec06ec3e0fdd303c692216ef..b6d244b1a0b630d0fe44ab51652d41e022336291 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -47,7 +47,7 @@ import org.apache.spark.util.Utils
  *
  * @param loadDefaults whether to also load values from Java system properties
  */
-class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
+class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
 
   import SparkConf._
 
@@ -370,6 +370,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     settings.entrySet().asScala.map(x => (x.getKey, x.getValue)).toArray
   }
 
+  /** Get all parameters that start with `prefix` */
+  def getAllWithPrefix(prefix: String): Array[(String, String)] = {
+    getAll.filter { case (k, v) => k.startsWith(prefix) }
+      .map { case (k, v) => (k.substring(prefix.length), v) }
+  }
+
   /** Get a parameter as an integer, falling back to a default if not set */
   def getInt(key: String, defaultValue: Int): Int = {
     getOption(key).map(_.toInt).getOrElse(defaultValue)
@@ -392,9 +398,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
   /** Get all executor environment variables set on this SparkConf */
   def getExecutorEnv: Seq[(String, String)] = {
-    val prefix = "spark.executorEnv."
-    getAll.filter{case (k, v) => k.startsWith(prefix)}
-          .map{case (k, v) => (k.substring(prefix.length), v)}
+    getAllWithPrefix("spark.executorEnv.")
   }
 
   /**
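The new `getAllWithPrefix` helper generalizes the prefix filtering that `getExecutorEnv` used to do inline. A minimal sketch of how it behaves, assuming the patched `SparkConf` above; the keys and values are illustrative only:

```scala
import org.apache.spark.SparkConf

// Illustrative entries; any keys under "spark.executorEnv." would do.
val conf = new SparkConf(loadDefaults = false)
  .set("spark.executorEnv.PATH", "/usr/bin")
  .set("spark.executorEnv.JAVA_HOME", "/opt/jdk")
  .set("spark.app.name", "demo")

// Matching entries come back with the prefix stripped, e.g.
// ("PATH", "/usr/bin") and ("JAVA_HOME", "/opt/jdk"), in no guaranteed order.
val executorEnv: Array[(String, String)] = conf.getAllWithPrefix("spark.executorEnv.")

// getExecutorEnv is now just getAllWithPrefix("spark.executorEnv.").
assert(conf.getExecutorEnv.toMap == executorEnv.toMap)
```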
diff --git a/core/src/main/scala/org/apache/spark/TaskState.scala b/core/src/main/scala/org/apache/spark/TaskState.scala
index fe19f07e32d1b701271285f406884a4c1e54e88c..d232fae6b15b33dd38353866e8c41f16fb939050 100644
--- a/core/src/main/scala/org/apache/spark/TaskState.scala
+++ b/core/src/main/scala/org/apache/spark/TaskState.scala
@@ -41,13 +41,11 @@ private[spark] object TaskState extends Enumeration {
   }
 
   def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match {
-    case MesosTaskState.TASK_STAGING => LAUNCHING
-    case MesosTaskState.TASK_STARTING => LAUNCHING
-    case MesosTaskState.TASK_RUNNING => RUNNING
+    case MesosTaskState.TASK_STAGING | MesosTaskState.TASK_STARTING => LAUNCHING
+    case MesosTaskState.TASK_RUNNING | MesosTaskState.TASK_KILLING => RUNNING
     case MesosTaskState.TASK_FINISHED => FINISHED
     case MesosTaskState.TASK_FAILED => FAILED
     case MesosTaskState.TASK_KILLED => KILLED
-    case MesosTaskState.TASK_LOST => LOST
-    case MesosTaskState.TASK_ERROR => LOST
+    case MesosTaskState.TASK_LOST | MesosTaskState.TASK_ERROR => LOST
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
index 1948226800afefee1a72091fe3531925a1f8cab2..d4c7022f006a106cf1cc9721c003600f5e5e1892 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/MesosDriverDescription.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.mesos
 
 import java.util.Date
 
+import org.apache.spark.SparkConf
 import org.apache.spark.deploy.Command
 import org.apache.spark.scheduler.cluster.mesos.MesosClusterRetryState
 
@@ -40,12 +41,15 @@ private[spark] class MesosDriverDescription(
     val cores: Double,
     val supervise: Boolean,
     val command: Command,
-    val schedulerProperties: Map[String, String],
+    schedulerProperties: Map[String, String],
     val submissionId: String,
     val submissionDate: Date,
     val retryState: Option[MesosClusterRetryState] = None)
   extends Serializable {
 
+  val conf = new SparkConf(false)
+  schedulerProperties.foreach { case (k, v) => conf.set(k, v) }
+
   def copy(
       name: String = name,
       jarUrl: String = jarUrl,
@@ -53,11 +57,12 @@ private[spark] class MesosDriverDescription(
       cores: Double = cores,
       supervise: Boolean = supervise,
       command: Command = command,
-      schedulerProperties: Map[String, String] = schedulerProperties,
+      schedulerProperties: SparkConf = conf,
       submissionId: String = submissionId,
       submissionDate: Date = submissionDate,
       retryState: Option[MesosClusterRetryState] = retryState): MesosDriverDescription = {
-    new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command, schedulerProperties,
+    new MesosDriverDescription(name, jarUrl, mem, cores, supervise, command,
+      schedulerProperties.getAll.toMap,
       submissionId, submissionDate, retryState)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
index 807835105ec3e4ed42cd00b28976b83a653bf237..cd98110ddcc027cba50e811b92cb1b1002f700f6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/mesos/ui/DriverPage.scala
@@ -50,7 +50,7 @@ private[ui] class DriverPage(parent: MesosClusterUI) extends WebUIPage("driver")
     val driverDescription = Iterable.apply(driverState.description)
     val submissionState = Iterable.apply(driverState.submissionState)
     val command = Iterable.apply(driverState.description.command)
-    val schedulerProperties = Iterable.apply(driverState.description.schedulerProperties)
+    val schedulerProperties = Iterable.apply(driverState.description.conf.getAll.toMap)
     val commandEnv = Iterable.apply(driverState.description.command.environment)
     val driverTable =
       UIUtils.listingTable(driverHeaders, driverRow, driverDescription)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 1e9644d06e1d0a0ddd75db9c5e627f44e36abdf4..ae531e1997814132226211103b7f5c2ae4bec670 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -353,19 +353,16 @@ private[spark] class MesosClusterScheduler(
     }
   }
 
-  private def getDriverExecutorURI(desc: MesosDriverDescription) = {
-    desc.schedulerProperties.get("spark.executor.uri")
-      .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
+  private def getDriverExecutorURI(desc: MesosDriverDescription): Option[String] = {
+    desc.conf.getOption("spark.executor.uri")
+      .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
   }
 
   private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
     val env = {
-      val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
+      val executorOpts = desc.conf.getAll.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
       val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
-
-      val prefix = "spark.mesos.driverEnv."
-      val driverEnv = desc.schedulerProperties.filterKeys(_.startsWith(prefix))
-        .map { case (k, v) => (k.substring(prefix.length), v) }
+      val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.")
 
       driverEnv ++ executorEnv ++ desc.command.environment
     }
@@ -379,8 +376,8 @@ private[spark] class MesosClusterScheduler(
 
   private def getDriverUris(desc: MesosDriverDescription): List[CommandInfo.URI] = {
     val confUris = List(conf.getOption("spark.mesos.uris"),
-      desc.schedulerProperties.get("spark.mesos.uris"),
-      desc.schedulerProperties.get("spark.submit.pyFiles")).flatMap(
+      desc.conf.getOption("spark.mesos.uris"),
+      desc.conf.getOption("spark.submit.pyFiles")).flatMap(
       _.map(_.split(",").map(_.trim))
     ).flatten
 
@@ -391,7 +388,7 @@ private[spark] class MesosClusterScheduler(
   }
 
   private def getDriverCommandValue(desc: MesosDriverDescription): String = {
-    val dockerDefined = desc.schedulerProperties.contains("spark.mesos.executor.docker.image")
+    val dockerDefined = desc.conf.contains("spark.mesos.executor.docker.image")
     val executorUri = getDriverExecutorURI(desc)
     // Gets the path to run spark-submit, and the path to the Mesos sandbox.
     val (executable, sandboxPath) = if (dockerDefined) {
@@ -411,7 +408,7 @@ private[spark] class MesosClusterScheduler(
       // Sandbox path points to the parent folder as we chdir into the folderBasename.
       (cmdExecutable, "..")
     } else {
-      val executorSparkHome = desc.schedulerProperties.get("spark.mesos.executor.home")
+      val executorSparkHome = desc.conf.getOption("spark.mesos.executor.home")
         .orElse(conf.getOption("spark.home"))
         .orElse(Option(System.getenv("SPARK_HOME")))
         .getOrElse {
@@ -438,7 +435,7 @@ private[spark] class MesosClusterScheduler(
 
   private def generateCmdOption(desc: MesosDriverDescription, sandboxPath: String): Seq[String] = {
     var options = Seq(
-      "--name", desc.schedulerProperties("spark.app.name"),
+      "--name", desc.conf.get("spark.app.name"),
       "--master", s"mesos://${conf.get("spark.master")}",
       "--driver-cores", desc.cores.toString,
       "--driver-memory", s"${desc.mem}M")
@@ -454,19 +451,19 @@ private[spark] class MesosClusterScheduler(
       options ++= Seq("--class", desc.command.mainClass)
     }
 
-    desc.schedulerProperties.get("spark.executor.memory").map { v =>
+    desc.conf.getOption("spark.executor.memory").foreach { v =>
       options ++= Seq("--executor-memory", v)
     }
-    desc.schedulerProperties.get("spark.cores.max").map { v =>
+    desc.conf.getOption("spark.cores.max").foreach { v =>
       options ++= Seq("--total-executor-cores", v)
     }
-    desc.schedulerProperties.get("spark.submit.pyFiles").map { pyFiles =>
+    desc.conf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
       val formattedFiles = pyFiles.split(",")
         .map { path => new File(sandboxPath, path.split("/").last).toString() }
         .mkString(",")
       options ++= Seq("--py-files", formattedFiles)
     }
-    desc.schedulerProperties
+    desc.conf.getAll
       .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
       .foreach { case (key, value) => options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
     options
@@ -476,6 +473,7 @@ private[spark] class MesosClusterScheduler(
    * Escape args for Unix-like shells, unless already quoted by the user.
    * Based on: http://www.gnu.org/software/bash/manual/html_node/Double-Quotes.html
    * and http://www.grymoire.com/Unix/Quote.html
+   *
    * @param value argument
    * @return escaped argument
    */
@@ -498,6 +496,33 @@ private[spark] class MesosClusterScheduler(
     }
   }
 
+  private def createTaskInfo(desc: MesosDriverDescription, offer: ResourceOffer): TaskInfo = {
+    val taskId = TaskID.newBuilder().setValue(desc.submissionId).build()
+
+    val (remainingResources, cpuResourcesToUse) =
+      partitionResources(offer.resources, "cpus", desc.cores)
+    val (finalResources, memResourcesToUse) =
+      partitionResources(remainingResources.asJava, "mem", desc.mem)
+    offer.resources = finalResources.asJava
+
+    val appName = desc.conf.get("spark.app.name")
+    val taskInfo = TaskInfo.newBuilder()
+      .setTaskId(taskId)
+      .setName(s"Driver for ${appName}")
+      .setSlaveId(offer.slaveId)
+      .setCommand(buildDriverCommand(desc))
+      .addAllResources(cpuResourcesToUse.asJava)
+      .addAllResources(memResourcesToUse.asJava)
+
+    desc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+      MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(image,
+        desc.conf,
+        taskInfo.getContainerBuilder)
+    }
+
+    taskInfo.build
+  }
+
   /**
    * This method takes all the possible candidates and attempt to schedule them with Mesos offers.
    * Every time a new task is scheduled, the afterLaunchCallback is called to perform post scheduled
@@ -521,32 +546,12 @@ private[spark] class MesosClusterScheduler(
           s"cpu: $driverCpu, mem: $driverMem")
       } else {
         val offer = offerOption.get
-        val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
-        val (remainingResources, cpuResourcesToUse) =
-          partitionResources(offer.resources, "cpus", driverCpu)
-        val (finalResources, memResourcesToUse) =
-          partitionResources(remainingResources.asJava, "mem", driverMem)
-        val commandInfo = buildDriverCommand(submission)
-        val appName = submission.schedulerProperties("spark.app.name")
-        val taskInfo = TaskInfo.newBuilder()
-          .setTaskId(taskId)
-          .setName(s"Driver for $appName")
-          .setSlaveId(offer.slaveId)
-          .setCommand(commandInfo)
-          .addAllResources(cpuResourcesToUse.asJava)
-          .addAllResources(memResourcesToUse.asJava)
-        offer.resources = finalResources.asJava
-        submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
-          MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
-            image,
-            submission.schedulerProperties.get,
-            taskInfo.getContainerBuilder())
-        }
         val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
-        queuedTasks += taskInfo.build()
+        val task = createTaskInfo(submission, offer)
+        queuedTasks += task
         logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
           submission.submissionId)
-        val newState = new MesosClusterSubmissionState(submission, taskId, offer.slaveId,
+        val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
           None, new Date(), None)
         launchedDrivers(submission.submissionId) = newState
         launchedDriversState.persist(submission.submissionId, newState)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 52993caad1aa992143753079bb9ed8a2e76018ec..959d6fd46dee9dd33a53807bda43b6569ea1a544 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -410,7 +410,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
             MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
               image,
-              sc.conf.getOption,
+              sc.conf,
               taskBuilder.getContainerBuilder
             )
           }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 8d4fc9eed7af765fdbdb0c6a044efc2aeefaa4d5..d8d661da311f7ef5c08622e495ff9eb4860a80d6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -153,7 +153,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
     sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
       MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
         image,
-        sc.conf.getOption,
+        sc.conf,
         executorInfo.getContainerBuilder()
       )
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index aa669f01bd60785e68050b1e82b02d5cc97024b4..3fe06743b8809c0429ab136fc0fd949bcf528054 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import org.apache.mesos.Protos.{ContainerInfo, Volume}
+import org.apache.mesos.Protos.{ContainerInfo, Image, Volume}
 import org.apache.mesos.Protos.ContainerInfo.DockerInfo
 
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 
 /**
@@ -104,19 +105,33 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
   def addDockerInfo(
       container: ContainerInfo.Builder,
       image: String,
+      containerizer: String,
       forcePullImage: Boolean = false,
       volumes: Option[List[Volume]] = None,
-      network: Option[ContainerInfo.DockerInfo.Network] = None,
       portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
 
-    val docker = ContainerInfo.DockerInfo.newBuilder()
-      .setImage(image)
-      .setForcePullImage(forcePullImage)
+    containerizer match {
+      case "docker" =>
+        container.setType(ContainerInfo.Type.DOCKER)
+        val docker = ContainerInfo.DockerInfo.newBuilder()
+          .setImage(image)
+          .setForcePullImage(forcePullImage)
+        // TODO (mgummelt): Remove this. Portmaps have no effect,
+        //                  as we don't support bridge networking.
+        portmaps.foreach(_.foreach(docker.addPortMappings))
+        container.setDocker(docker)
+      case "mesos" =>
+        container.setType(ContainerInfo.Type.MESOS)
+        val imageProto = Image.newBuilder()
+          .setType(Image.Type.DOCKER)
+          .setDocker(Image.Docker.newBuilder().setName(image))
+          .setCached(!forcePullImage)
+        container.setMesos(ContainerInfo.MesosInfo.newBuilder().setImage(imageProto))
+      case _ =>
+        throw new SparkException(
+          "spark.mesos.containerizer must be one of {\"docker\", \"mesos\"}")
+    }
 
-    network.foreach(docker.setNetwork)
-    portmaps.foreach(_.foreach(docker.addPortMappings))
-    container.setType(ContainerInfo.Type.DOCKER)
-    container.setDocker(docker.build())
     volumes.foreach(_.foreach(container.addVolumes))
   }
 
@@ -125,18 +140,23 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
    */
   def setupContainerBuilderDockerInfo(
     imageName: String,
-    conf: String => Option[String],
+    conf: SparkConf,
     builder: ContainerInfo.Builder): Unit = {
-    val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage")
+    val forcePullImage = conf
+      .getOption("spark.mesos.executor.docker.forcePullImage")
       .exists(_.equals("true"))
-    val volumes = conf("spark.mesos.executor.docker.volumes")
+    val volumes = conf
+      .getOption("spark.mesos.executor.docker.volumes")
       .map(parseVolumesSpec)
-    val portmaps = conf("spark.mesos.executor.docker.portmaps")
+    val portmaps = conf
+      .getOption("spark.mesos.executor.docker.portmaps")
       .map(parsePortMappingsSpec)
 
+    val containerizer = conf.get("spark.mesos.containerizer", "docker")
     addDockerInfo(
       builder,
       imageName,
+      containerizer,
       forcePullImage = forcePullImage,
       volumes = volumes,
       portmaps = portmaps)
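With `setupContainerBuilderDockerInfo` now taking the full `SparkConf` instead of a `String => Option[String]` lookup, it can read `spark.mesos.containerizer` and have `addDockerInfo` target either the Docker or the unified Mesos containerizer. A rough sketch of the call the schedulers above make, under the assumption that the caller sits in `org.apache.spark.scheduler.cluster.mesos` (the helper is `private[mesos]`); the object name and image are placeholders:

```scala
package org.apache.spark.scheduler.cluster.mesos

import org.apache.mesos.Protos.ContainerInfo

import org.apache.spark.SparkConf

object ContainerizerSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder image; the new key selects the containerizer ("docker" by default).
    val conf = new SparkConf(false)
      .set("spark.mesos.executor.docker.image", "example/spark:latest")
      .set("spark.mesos.containerizer", "mesos")

    val builder = ContainerInfo.newBuilder()
    MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
      "example/spark:latest", conf, builder)

    // With "mesos" the container type is MESOS and the docker image is attached
    // via MesosInfo/Image; with "docker" it would be ContainerInfo.Type.DOCKER.
    assert(builder.getType == ContainerInfo.Type.MESOS)
  }
}
```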
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 7355ba317d9a002ffcb550514f7170ee6269f353..cd4b45f8de3d3db54f96ecf18a56a4ec521a5ae7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -33,6 +33,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
 
+
 /**
  * Shared trait for implementing a Mesos Scheduler. This holds common state and helper
  * methods and Mesos scheduler will use.
@@ -79,7 +80,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
       credBuilder.setPrincipal(principal)
     }
     conf.getOption("spark.mesos.secret").foreach { secret =>
-      credBuilder.setSecret(ByteString.copyFromUtf8(secret))
+      credBuilder.setSecret(secret)
     }
     if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) {
       throw new SparkException(
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 51d262e75ef3adf82c6ca4edd23089dbf43ad59b..a74fdf79a13cba2824006731768232a2018360b0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -109,7 +109,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val taskInfos = verifyTaskLaunched(driver, "o1")
     assert(taskInfos.length == 1)
 
-    val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
+    val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
     assert(cpus == executorCores)
   }
 
@@ -123,7 +123,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val taskInfos = verifyTaskLaunched(driver, "o1")
     assert(taskInfos.length == 1)
 
-    val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
+    val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
     assert(cpus == offerCores)
   }
 
@@ -137,7 +137,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val taskInfos = verifyTaskLaunched(driver, "o1")
     assert(taskInfos.length == 1)
 
-    val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
+    val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
     assert(cpus == maxCores)
   }
 
@@ -252,6 +252,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     backend.start()
   }
 
+  test("honors unset spark.mesos.containerizer") {
+    setBackend(Map("spark.mesos.executor.docker.image" -> "test"))
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.head.getContainer.getType == ContainerInfo.Type.DOCKER)
+  }
+
+  test("honors spark.mesos.containerizer=\"mesos\"") {
+    setBackend(Map(
+      "spark.mesos.executor.docker.image" -> "test",
+      "spark.mesos.containerizer" -> "mesos"))
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.head.getContainer.getType == ContainerInfo.Type.MESOS)
+  }
+
   test("docker settings are reflected in created tasks") {
     setBackend(Map(
       "spark.mesos.executor.docker.image" -> "some_image",
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index 9350b9df50c0d331338769a5962c43a0b15deb9c..d0771e1ac85f5b45c8fdd1616371f28a9ae90c91 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -116,7 +116,7 @@ libfb303-0.9.2.jar
 libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-1.0.0-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index 2e1a6a3dc60cc05d9c061e96a0acc0ca864e8a31..ef97ffd9ab31e9243601c981659880bf9fdadd90 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-1.0.0-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 9baf87e5329ff89b62f27fe2fcb9a3da8311cda4..fba3c18b1449c22780eccbf205542310a63c301c 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-1.0.0-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 9112452b5cb508e9d8da21d3b35c06d25c74de00..9747acda817062bf32968545578e2d72e5a5a5ce 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -130,7 +130,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-1.0.0-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index b0e3e9304b198c42c83c4bcb1e5547d361b9dff0..7231bcaf6c30f1866ba3f57690e060c893abcb2a 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -131,7 +131,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.22.2-shaded-protobuf.jar
+mesos-1.0.0-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/docs/_config.yml b/docs/_config.yml
index bbb576e0e7bb3b84b0c7cf1e4d2ae521b7494fdf..e4fc093fe73342a89abee88f5dc29aec2c4e760a 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -18,6 +18,6 @@ SPARK_VERSION: 2.1.0-SNAPSHOT
 SPARK_VERSION_SHORT: 2.1.0
 SCALA_BINARY_VERSION: "2.11"
 SCALA_VERSION: "2.11.7"
-MESOS_VERSION: 0.22.0
+MESOS_VERSION: 1.0.0
 SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
 SPARK_GITHUB_URL: https://github.com/apache/spark
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index ce888b544516e1549f83fd59e90c18e1082acfd5..d037e7be0a9f86908a2d57e908a1e21bde26d8c2 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -432,6 +432,16 @@ See the [configuration page](configuration.html) for information on Spark config
     </ul>
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.containerizer</code></td>
+  <td><code>docker</code></td>
+  <td>
+    This only affects docker containers, and must be one of "docker"
+    or "mesos". Mesos supports two types of containerizers for docker:
+    the "docker" containerizer, and the preferred "mesos" containerizer.
+    Read more here: http://mesos.apache.org/documentation/latest/container-image/
+  </td>
+</tr>
 <tr>
   <td><code>spark.mesos.driver.webui.url</code></td>
   <td><code>(none)</code></td>
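On the application side, the entry above is an ordinary Spark property, so it can be passed as `--conf spark.mesos.containerizer=mesos` to `spark-submit` or set programmatically. An illustrative sketch of the programmatic form (app name and image are placeholders):

```scala
import org.apache.spark.SparkConf

// Placeholder app name and image; the two spark.mesos.* keys are the point.
val conf = new SparkConf()
  .setAppName("containerizer-demo")
  .set("spark.mesos.executor.docker.image", "example/spark:2.1.0")
  // "docker" is the default; "mesos" selects the unified Mesos containerizer.
  .set("spark.mesos.containerizer", "mesos")
```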
diff --git a/pom.xml b/pom.xml
index 9b7be371bb134a04ec7264968e55c067665303cd..0491e981d585c7a6ab6d660e49e17d53162ce692 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
     <java.version>1.7</java.version>
     <maven.version>3.3.9</maven.version>
     <sbt.project.name>spark</sbt.project.name>
-    <mesos.version>0.22.2</mesos.version>
+    <mesos.version>1.0.0</mesos.version>
     <mesos.classifier>shaded-protobuf</mesos.classifier>
     <slf4j.version>1.7.16</slf4j.version>
     <log4j.version>1.2.17</log4j.version>