diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index cbade131494bcea412782c1ee381c06506fa96dd..b7fde0d9b32655fa44c39d351ee3b47365f3777e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{List => JList, Collections}
 import java.util.concurrent.locks.ReentrantLock
+import java.util.{Collections, List => JList}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet}
@@ -27,12 +27,11 @@ import scala.collection.mutable.{HashMap, HashSet}
 import com.google.common.collect.HashBiMap
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.{Scheduler => MScheduler, _}
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
-import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
 import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
+import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
 
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -69,7 +68,7 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   /**
    * The total number of executors we aim to have. Undefined when not using dynamic allocation
-   * and before the ExecutorAllocatorManager calls [[doRequesTotalExecutors]].
+   * and before the ExecutorAllocatorManager calls [[doRequestTotalExecutors]].
    */
   private var executorLimitOption: Option[Int] = None
 
@@ -103,8 +102,9 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   override def start() {
     super.start()
-    val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
-    startScheduler(master, CoarseMesosSchedulerBackend.this, fwInfo)
+    val driver = createSchedulerDriver(
+      master, CoarseMesosSchedulerBackend.this, sc.sparkUser, sc.appName, sc.conf)
+    startScheduler(driver)
   }
 
   def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = {
@@ -224,24 +224,29 @@ private[spark] class CoarseMesosSchedulerBackend(
           taskIdToSlaveId(taskId) = slaveId
           slaveIdsWithExecutors += slaveId
           coresByTaskId(taskId) = cpusToUse
-          val task = MesosTaskInfo.newBuilder()
+          // Gather cpu resources from the available resources and use them in the task.
+          val (remainingResources, cpuResourcesToUse) =
+            partitionResources(offer.getResourcesList, "cpus", cpusToUse)
+          val (_, memResourcesToUse) =
+            partitionResources(remainingResources, "mem", calculateTotalMemory(sc))
+          val taskBuilder = MesosTaskInfo.newBuilder()
             .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
             .setSlaveId(offer.getSlaveId)
             .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
             .setName("Task " + taskId)
-            .addResources(createResource("cpus", cpusToUse))
-            .addResources(createResource("mem", calculateTotalMemory(sc)))
+            .addAllResources(cpuResourcesToUse)
+            .addAllResources(memResourcesToUse)
 
           sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
             MesosSchedulerBackendUtil
-              .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder)
+              .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
           }
 
           // accept the offer and launch the task
           logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
           d.launchTasks(
             Collections.singleton(offer.getId),
-            Collections.singleton(task.build()), filters)
+            Collections.singleton(taskBuilder.build()), filters)
         } else {
           // Decline the offer
           logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
@@ -255,7 +260,7 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
     val taskId = status.getTaskId.getValue.toInt
     val state = status.getState
-    logInfo("Mesos task " + taskId + " is now " + state)
+    logInfo(s"Mesos task $taskId is now $state")
     stateLock.synchronized {
       if (TaskState.isFinished(TaskState.fromMesos(state))) {
         val slaveId = taskIdToSlaveId(taskId)
@@ -270,7 +275,7 @@ private[spark] class CoarseMesosSchedulerBackend(
         if (TaskState.isFailed(TaskState.fromMesos(state))) {
           failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
           if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
-            logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
+            logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " +
                 "is Spark installed on it?")
           }
         }
@@ -282,7 +287,7 @@ private[spark] class CoarseMesosSchedulerBackend(
   }
 
   override def error(d: SchedulerDriver, message: String) {
-    logError("Mesos error: " + message)
+    logError(s"Mesos error: $message")
     scheduler.error(message)
   }
 
@@ -323,7 +328,7 @@ private[spark] class CoarseMesosSchedulerBackend(
   }
 
   override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
-    logInfo("Mesos slave lost: " + slaveId.getValue)
+    logInfo(s"Mesos slave lost: ${slaveId.getValue}")
     executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index d3a20f822176ed3552fcc7eba6bdcbcfd92b960c..f078547e713523e768100d268ef664bdd4097136 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -295,20 +295,24 @@ private[spark] class MesosClusterScheduler(
   def start(): Unit = {
     // TODO: Implement leader election to make sure only one framework running in the cluster.
     val fwId = schedulerState.fetch[String]("frameworkId")
-    val builder = FrameworkInfo.newBuilder()
-      .setUser(Utils.getCurrentUserName())
-      .setName(appName)
-      .setWebuiUrl(frameworkUrl)
-      .setCheckpoint(true)
-      .setFailoverTimeout(Integer.MAX_VALUE) // Setting to max so tasks keep running on crash
     fwId.foreach { id =>
-      builder.setId(FrameworkID.newBuilder().setValue(id).build())
       frameworkId = id
     }
     recoverState()
     metricsSystem.registerSource(new MesosClusterSchedulerSource(this))
     metricsSystem.start()
-    startScheduler(master, MesosClusterScheduler.this, builder.build())
+    val driver = createSchedulerDriver(
+      master,
+      MesosClusterScheduler.this,
+      Utils.getCurrentUserName(),
+      appName,
+      conf,
+      Some(frameworkUrl),
+      Some(true),
+      Some(Integer.MAX_VALUE),
+      fwId)
+
+    startScheduler(driver)
     ready = true
   }
 
@@ -449,12 +453,8 @@ private[spark] class MesosClusterScheduler(
         offer.cpu -= driverCpu
         offer.mem -= driverMem
         val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
-        val cpuResource = Resource.newBuilder()
-          .setName("cpus").setType(Value.Type.SCALAR)
-          .setScalar(Value.Scalar.newBuilder().setValue(driverCpu)).build()
-        val memResource = Resource.newBuilder()
-          .setName("mem").setType(Value.Type.SCALAR)
-          .setScalar(Value.Scalar.newBuilder().setValue(driverMem)).build()
+        val cpuResource = createResource("cpus", driverCpu)
+        val memResource = createResource("mem", driverMem)
         val commandInfo = buildDriverCommand(submission)
         val appName = submission.schedulerProperties("spark.app.name")
         val taskInfo = TaskInfo.newBuilder()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index d72e2af456e153ad33d03e2bcb99c9fb71042f66..3f63ec1c5832f0a115af82fad59fa428fbda61dd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -32,6 +32,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.util.Utils
 
+
 /**
  * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
  * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
@@ -45,8 +46,8 @@ private[spark] class MesosSchedulerBackend(
   with MScheduler
   with MesosSchedulerUtils {
 
-  // Which slave IDs we have executors on
-  val slaveIdsWithExecutors = new HashSet[String]
+  // Stores the slave ids that has launched a Mesos executor.
+  val slaveIdToExecutorInfo = new HashMap[String, MesosExecutorInfo]
   val taskIdToSlaveId = new HashMap[Long, String]
 
   // An ExecutorInfo for our tasks
@@ -66,12 +67,21 @@ private[spark] class MesosSchedulerBackend(
   @volatile var appId: String = _
 
   override def start() {
-    val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
     classLoader = Thread.currentThread.getContextClassLoader
-    startScheduler(master, MesosSchedulerBackend.this, fwInfo)
+    val driver = createSchedulerDriver(
+      master, MesosSchedulerBackend.this, sc.sparkUser, sc.appName, sc.conf)
+    startScheduler(driver)
   }
 
-  def createExecutorInfo(execId: String): MesosExecutorInfo = {
+  /**
+   * Creates a MesosExecutorInfo that is used to launch a Mesos executor.
+   * @param availableResources Available resources that is offered by Mesos
+   * @param execId The executor id to assign to this new executor.
+   * @return A tuple of the new mesos executor info and the remaining available resources.
+   */
+  def createExecutorInfo(
+      availableResources: JList[Resource],
+      execId: String): (MesosExecutorInfo, JList[Resource]) = {
     val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
       .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
       .getOrElse {
@@ -115,32 +125,25 @@ private[spark] class MesosSchedulerBackend(
       command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName")
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
     }
-    val cpus = Resource.newBuilder()
-      .setName("cpus")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder()
-      .setValue(mesosExecutorCores).build())
-      .build()
-    val memory = Resource.newBuilder()
-      .setName("mem")
-      .setType(Value.Type.SCALAR)
-      .setScalar(
-        Value.Scalar.newBuilder()
-          .setValue(calculateTotalMemory(sc)).build())
-      .build()
-    val executorInfo = MesosExecutorInfo.newBuilder()
+    val builder = MesosExecutorInfo.newBuilder()
+    val (resourcesAfterCpu, usedCpuResources) =
+      partitionResources(availableResources, "cpus", scheduler.CPUS_PER_TASK)
+    val (resourcesAfterMem, usedMemResources) =
+      partitionResources(resourcesAfterCpu, "mem", calculateTotalMemory(sc))
+
+    builder.addAllResources(usedCpuResources)
+    builder.addAllResources(usedMemResources)
+    val executorInfo = builder
       .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
       .setCommand(command)
       .setData(ByteString.copyFrom(createExecArg()))
-      .addResources(cpus)
-      .addResources(memory)
 
     sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
       MesosSchedulerBackendUtil
         .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
     }
 
-    executorInfo.build()
+    (executorInfo.build(), resourcesAfterMem)
   }
 
   /**
@@ -183,6 +186,18 @@ private[spark] class MesosSchedulerBackend(
 
   override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
 
+  private def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = {
+    val builder = new StringBuilder
+    tasks.foreach { t =>
+      builder.append("Task id: ").append(t.getTaskId.getValue).append("\n")
+        .append("Slave id: ").append(t.getSlaveId.getValue).append("\n")
+        .append("Task resources: ").append(t.getResourcesList).append("\n")
+        .append("Executor resources: ").append(t.getExecutor.getResourcesList)
+        .append("---------------------------------------------\n")
+    }
+    builder.toString()
+  }
+
   /**
    * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
    * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
@@ -207,7 +222,7 @@ private[spark] class MesosSchedulerBackend(
 
         val meetsRequirements =
           (meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
-          (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
+          (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
 
         // add some debug messaging
         val debugstr = if (meetsRequirements) "Accepting" else "Declining"
@@ -221,7 +236,7 @@ private[spark] class MesosSchedulerBackend(
       unUsableOffers.foreach(o => d.declineOffer(o.getId))
 
       val workerOffers = usableOffers.map { o =>
-        val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
+        val cpus = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) {
           getResource(o.getResourcesList, "cpus").toInt
         } else {
           // If the Mesos executor has not been started on this slave yet, set aside a few
@@ -236,6 +251,10 @@ private[spark] class MesosSchedulerBackend(
 
       val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
       val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap
+      val slaveIdToResources = new HashMap[String, JList[Resource]]()
+      usableOffers.foreach { o =>
+        slaveIdToResources(o.getSlaveId.getValue) = o.getResourcesList
+      }
 
       val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]
 
@@ -245,15 +264,19 @@ private[spark] class MesosSchedulerBackend(
       val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
       acceptedOffers
         .foreach { offer =>
-        offer.foreach { taskDesc =>
-          val slaveId = taskDesc.executorId
-          slaveIdsWithExecutors += slaveId
-          slavesIdsOfAcceptedOffers += slaveId
-          taskIdToSlaveId(taskDesc.taskId) = slaveId
-          mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
-            .add(createMesosTask(taskDesc, slaveId))
+          offer.foreach { taskDesc =>
+            val slaveId = taskDesc.executorId
+            slavesIdsOfAcceptedOffers += slaveId
+            taskIdToSlaveId(taskDesc.taskId) = slaveId
+            val (mesosTask, remainingResources) = createMesosTask(
+              taskDesc,
+              slaveIdToResources(slaveId),
+              slaveId)
+            mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
+              .add(mesosTask)
+            slaveIdToResources(slaveId) = remainingResources
+          }
         }
-      }
 
       // Reply to the offers
       val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
@@ -264,6 +287,7 @@ private[spark] class MesosSchedulerBackend(
             // TODO: Add support for log urls for Mesos
             new ExecutorInfo(o.host, o.cores, Map.empty)))
         )
+        logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}")
         d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
       }
 
@@ -272,26 +296,32 @@ private[spark] class MesosSchedulerBackend(
       for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) {
         d.declineOffer(o.getId)
       }
-
     }
   }
 
-  /** Turn a Spark TaskDescription into a Mesos task */
-  def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
+  /** Turn a Spark TaskDescription into a Mesos task and also resources unused by the task */
+  def createMesosTask(
+      task: TaskDescription,
+      resources: JList[Resource],
+      slaveId: String): (MesosTaskInfo, JList[Resource]) = {
     val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
-    val cpuResource = Resource.newBuilder()
-      .setName("cpus")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build())
-      .build()
-    MesosTaskInfo.newBuilder()
+    val (executorInfo, remainingResources) = if (slaveIdToExecutorInfo.contains(slaveId)) {
+      (slaveIdToExecutorInfo(slaveId), resources)
+    } else {
+      createExecutorInfo(resources, slaveId)
+    }
+    slaveIdToExecutorInfo(slaveId) = executorInfo
+    val (finalResources, cpuResources) =
+      partitionResources(remainingResources, "cpus", scheduler.CPUS_PER_TASK)
+    val taskInfo = MesosTaskInfo.newBuilder()
       .setTaskId(taskId)
       .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
-      .setExecutor(createExecutorInfo(slaveId))
+      .setExecutor(executorInfo)
       .setName(task.name)
-      .addResources(cpuResource)
+      .addAllResources(cpuResources)
       .setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString)
       .build()
+    (taskInfo, finalResources)
   }
 
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
@@ -337,7 +367,7 @@ private[spark] class MesosSchedulerBackend(
   private def removeExecutor(slaveId: String, reason: String) = {
     synchronized {
       listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason))
-      slaveIdsWithExecutors -= slaveId
+      slaveIdToExecutorInfo -= slaveId
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 925702e63afd3f4088a7f65c7495113fa2d6d149..c04920e4f58731ea40fe3c4fb2d5026a24fec997 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -21,15 +21,17 @@ import java.util.{List => JList}
 import java.util.concurrent.CountDownLatch
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 
 import com.google.common.base.Splitter
 import org.apache.mesos.{MesosSchedulerDriver, SchedulerDriver, Scheduler, Protos}
 import org.apache.mesos.Protos._
-import org.apache.mesos.protobuf.GeneratedMessage
-import org.apache.spark.{Logging, SparkContext}
+import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
+import org.apache.spark.{SparkException, SparkConf, Logging, SparkContext}
 import org.apache.spark.util.Utils
 
+
 /**
  * Shared trait for implementing a Mesos Scheduler. This holds common state and helper
  * methods and Mesos scheduler will use.
@@ -42,13 +44,63 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
   protected var mesosDriver: SchedulerDriver = null
 
   /**
-   * Starts the MesosSchedulerDriver with the provided information. This method returns
-   * only after the scheduler has registered with Mesos.
-   * @param masterUrl Mesos master connection URL
-   * @param scheduler Scheduler object
-   * @param fwInfo FrameworkInfo to pass to the Mesos master
+   * Creates a new MesosSchedulerDriver that communicates to the Mesos master.
+   * @param masterUrl The url to connect to Mesos master
+   * @param scheduler the scheduler class to receive scheduler callbacks
+   * @param sparkUser User to impersonate with when running tasks
+   * @param appName The framework name to display on the Mesos UI
+   * @param conf Spark configuration
+   * @param webuiUrl The WebUI url to link from Mesos UI
+   * @param checkpoint Option to checkpoint tasks for failover
+   * @param failoverTimeout Duration Mesos master expect scheduler to reconnect on disconnect
+   * @param frameworkId The id of the new framework
    */
-  def startScheduler(masterUrl: String, scheduler: Scheduler, fwInfo: FrameworkInfo): Unit = {
+  protected def createSchedulerDriver(
+      masterUrl: String,
+      scheduler: Scheduler,
+      sparkUser: String,
+      appName: String,
+      conf: SparkConf,
+      webuiUrl: Option[String] = None,
+      checkpoint: Option[Boolean] = None,
+      failoverTimeout: Option[Double] = None,
+      frameworkId: Option[String] = None): SchedulerDriver = {
+    val fwInfoBuilder = FrameworkInfo.newBuilder().setUser(sparkUser).setName(appName)
+    val credBuilder = Credential.newBuilder()
+    webuiUrl.foreach { url => fwInfoBuilder.setWebuiUrl(url) }
+    checkpoint.foreach { checkpoint => fwInfoBuilder.setCheckpoint(checkpoint) }
+    failoverTimeout.foreach { timeout => fwInfoBuilder.setFailoverTimeout(timeout) }
+    frameworkId.foreach { id =>
+      fwInfoBuilder.setId(FrameworkID.newBuilder().setValue(id).build())
+    }
+    conf.getOption("spark.mesos.principal").foreach { principal =>
+      fwInfoBuilder.setPrincipal(principal)
+      credBuilder.setPrincipal(principal)
+    }
+    conf.getOption("spark.mesos.secret").foreach { secret =>
+      credBuilder.setSecret(ByteString.copyFromUtf8(secret))
+    }
+    if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) {
+      throw new SparkException(
+        "spark.mesos.principal must be configured when spark.mesos.secret is set")
+    }
+    conf.getOption("spark.mesos.role").foreach { role =>
+      fwInfoBuilder.setRole(role)
+    }
+    if (credBuilder.hasPrincipal) {
+      new MesosSchedulerDriver(
+        scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build())
+    } else {
+      new MesosSchedulerDriver(scheduler, fwInfoBuilder.build(), masterUrl)
+    }
+  }
+
+  /**
+   * Starts the MesosSchedulerDriver and stores the current running driver to this new instance.
+   * This driver is expected to not be running.
+   * This method returns only after the scheduler has registered with Mesos.
+   */
+  def startScheduler(newDriver: SchedulerDriver): Unit = {
     synchronized {
       if (mesosDriver != null) {
         registerLatch.await()
@@ -59,11 +111,11 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
         setDaemon(true)
 
         override def run() {
-          mesosDriver = new MesosSchedulerDriver(scheduler, fwInfo, masterUrl)
+          mesosDriver = newDriver
           try {
             val ret = mesosDriver.run()
             logInfo("driver.run() returned with code " + ret)
-            if (ret.equals(Status.DRIVER_ABORTED)) {
+            if (ret != null && ret.equals(Status.DRIVER_ABORTED)) {
               System.exit(1)
             }
           } catch {
@@ -82,18 +134,62 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
   /**
    * Signal that the scheduler has registered with Mesos.
    */
+  protected def getResource(res: JList[Resource], name: String): Double = {
+    // A resource can have multiple values in the offer since it can either be from
+    // a specific role or wildcard.
+    res.filter(_.getName == name).map(_.getScalar.getValue).sum
+  }
+
   protected def markRegistered(): Unit = {
     registerLatch.countDown()
   }
 
+  def createResource(name: String, amount: Double, role: Option[String] = None): Resource = {
+    val builder = Resource.newBuilder()
+      .setName(name)
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(amount).build())
+
+    role.foreach { r => builder.setRole(r) }
+
+    builder.build()
+  }
+
   /**
-   * Get the amount of resources for the specified type from the resource list
+   * Partition the existing set of resources into two groups, those remaining to be
+   * scheduled and those requested to be used for a new task.
+   * @param resources The full list of available resources
+   * @param resourceName The name of the resource to take from the available resources
+   * @param amountToUse The amount of resources to take from the available resources
+   * @return The remaining resources list and the used resources list.
    */
-  protected def getResource(res: JList[Resource], name: String): Double = {
-    for (r <- res if r.getName == name) {
-      return r.getScalar.getValue
+  def partitionResources(
+      resources: JList[Resource],
+      resourceName: String,
+      amountToUse: Double): (List[Resource], List[Resource]) = {
+    var remain = amountToUse
+    var requestedResources = new ArrayBuffer[Resource]
+    val remainingResources = resources.map {
+      case r => {
+        if (remain > 0 &&
+          r.getType == Value.Type.SCALAR &&
+          r.getScalar.getValue > 0.0 &&
+          r.getName == resourceName) {
+          val usage = Math.min(remain, r.getScalar.getValue)
+          requestedResources += createResource(resourceName, usage, Some(r.getRole))
+          remain -= usage
+          createResource(resourceName, r.getScalar.getValue - usage, Some(r.getRole))
+        } else {
+          r
+        }
+      }
     }
-    0.0
+
+    // Filter any resource that has depleted.
+    val filteredResources =
+      remainingResources.filter(r => r.getType != Value.Type.SCALAR || r.getScalar.getValue > 0.0)
+
+    (filteredResources.toList, requestedResources.toList)
   }
 
   /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
index 3f1692917a357254bbe4c8c0c897702c7927fe74..4b504df7b88511e97e2d08f6d836ea191b8071a2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala
@@ -22,7 +22,7 @@ import java.util.Collections
 
 import org.apache.mesos.Protos.Value.Scalar
 import org.apache.mesos.Protos._
-import org.apache.mesos.SchedulerDriver
+import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.mockito.Matchers
@@ -60,7 +60,16 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
       taskScheduler: TaskSchedulerImpl,
       driver: SchedulerDriver): CoarseMesosSchedulerBackend = {
     val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") {
-      mesosDriver = driver
+      override protected def createSchedulerDriver(
+        masterUrl: String,
+        scheduler: Scheduler,
+        sparkUser: String,
+        appName: String,
+        conf: SparkConf,
+        webuiUrl: Option[String] = None,
+        checkpoint: Option[Boolean] = None,
+        failoverTimeout: Option[Double] = None,
+        frameworkId: Option[String] = None): SchedulerDriver = driver
       markRegistered()
     }
     backend.start()
@@ -80,6 +89,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
 
   test("mesos supports killing and limiting executors") {
     val driver = mock[SchedulerDriver]
+    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
     val taskScheduler = mock[TaskSchedulerImpl]
     when(taskScheduler.sc).thenReturn(sc)
 
@@ -87,7 +97,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     sparkConf.set("spark.driver.port", "1234")
 
     val backend = createSchedulerBackend(taskScheduler, driver)
-    val minMem = backend.calculateTotalMemory(sc).toInt
+    val minMem = backend.calculateTotalMemory(sc)
     val minCpu = 4
 
     val mesosOffers = new java.util.ArrayList[Offer]
@@ -130,11 +140,12 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
 
   test("mesos supports killing and relaunching tasks with executors") {
     val driver = mock[SchedulerDriver]
+    when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
     val taskScheduler = mock[TaskSchedulerImpl]
     when(taskScheduler.sc).thenReturn(sc)
 
     val backend = createSchedulerBackend(taskScheduler, driver)
-    val minMem = backend.calculateTotalMemory(sc).toInt + 1024
+    val minMem = backend.calculateTotalMemory(sc) + 1024
     val minCpu = 4
 
     val mesosOffers = new java.util.ArrayList[Offer]
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index d01837fe78957651039d14e1bd9e8e1868c5ff8d..5ed30f64d705f26bd9dd28bb9a5c4385a578b9b3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
 import java.util
 import java.util.Collections
 
+import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -60,14 +61,17 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
 
     val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master")
 
+    val resources = List(
+      mesosSchedulerBackend.createResource("cpus", 4),
+      mesosSchedulerBackend.createResource("mem", 1024))
     // uri is null.
-    val executorInfo = mesosSchedulerBackend.createExecutorInfo("test-id")
+    val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
     assert(executorInfo.getCommand.getValue ===
       s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")
 
     // uri exists.
     conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
-    val executorInfo1 = mesosSchedulerBackend.createExecutorInfo("test-id")
+    val (executorInfo1, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
     assert(executorInfo1.getCommand.getValue ===
       s"cd test-app-1*;  ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
   }
@@ -93,7 +97,8 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
 
     val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
 
-    val execInfo = backend.createExecutorInfo("mockExecutor")
+    val (execInfo, _) = backend.createExecutorInfo(
+      List(backend.createResource("cpus", 4)), "mockExecutor")
     assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
     val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
     assert(portmaps.get(0).getHostPort.equals(80))
@@ -194,7 +199,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
     )
     verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId)
     verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId)
-    assert(capture.getValue.size() == 1)
+    assert(capture.getValue.size() === 1)
     val taskInfo = capture.getValue.iterator().next()
     assert(taskInfo.getName.equals("n1"))
     val cpus = taskInfo.getResourcesList.get(0)
@@ -214,4 +219,97 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
     backend.resourceOffers(driver, mesosOffers2)
     verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId)
   }
+
+  test("can handle multiple roles") {
+    val driver = mock[SchedulerDriver]
+    val taskScheduler = mock[TaskSchedulerImpl]
+
+    val listenerBus = mock[LiveListenerBus]
+    listenerBus.post(
+      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+    val sc = mock[SparkContext]
+    when(sc.executorMemory).thenReturn(100)
+    when(sc.getSparkHome()).thenReturn(Option("/path"))
+    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+    when(sc.conf).thenReturn(new SparkConf)
+    when(sc.listenerBus).thenReturn(listenerBus)
+
+    val id = 1
+    val builder = Offer.newBuilder()
+    builder.addResourcesBuilder()
+      .setName("mem")
+      .setType(Value.Type.SCALAR)
+      .setRole("prod")
+      .setScalar(Scalar.newBuilder().setValue(500))
+    builder.addResourcesBuilder()
+      .setName("cpus")
+      .setRole("prod")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(1))
+    builder.addResourcesBuilder()
+      .setName("mem")
+      .setRole("dev")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(600))
+    builder.addResourcesBuilder()
+      .setName("cpus")
+      .setRole("dev")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(2))
+    val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build())
+      .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
+      .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
+      .setHostname(s"host${id.toString}").build()
+
+
+    val mesosOffers = new java.util.ArrayList[Offer]
+    mesosOffers.add(offer)
+
+    val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+
+    val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1)
+    expectedWorkerOffers.append(new WorkerOffer(
+      mesosOffers.get(0).getSlaveId.getValue,
+      mesosOffers.get(0).getHostname,
+      2 // Deducting 1 for executor
+    ))
+
+    val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
+    when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
+
+    val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]])
+    when(
+      driver.launchTasks(
+        Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+        capture.capture(),
+        any(classOf[Filters])
+      )
+    ).thenReturn(Status.valueOf(1))
+
+    backend.resourceOffers(driver, mesosOffers)
+
+    verify(driver, times(1)).launchTasks(
+      Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+      capture.capture(),
+      any(classOf[Filters])
+    )
+
+    assert(capture.getValue.size() === 1)
+    val taskInfo = capture.getValue.iterator().next()
+    assert(taskInfo.getName.equals("n1"))
+    assert(taskInfo.getResourcesCount === 1)
+    val cpusDev = taskInfo.getResourcesList.get(0)
+    assert(cpusDev.getName.equals("cpus"))
+    assert(cpusDev.getScalar.getValue.equals(1.0))
+    assert(cpusDev.getRole.equals("dev"))
+    val executorResources = taskInfo.getExecutor.getResourcesList
+    assert(executorResources.exists { r =>
+      r.getName.equals("mem") && r.getScalar.getValue.equals(484.0) && r.getRole.equals("prod")
+    })
+    assert(executorResources.exists { r =>
+      r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("prod")
+    })
+  }
 }
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 1f915d8ea1d73257e4187c745db899e8e00cd855..debdd2adf22d6bdb5ac8d67d30166fce1588a243 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -306,6 +306,28 @@ See the [configuration page](configuration.html) for information on Spark config
     the final overhead will be this value.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.principal</code></td>
+  <td>Framework principal to authenticate to Mesos</td>
+  <td>
+    Set the principal with which Spark framework will use to authenticate with Mesos.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.secret</code></td>
+  <td>Framework secret to authenticate to Mesos</td>
+  <td>
+    Set the secret with which Spark framework will use to authenticate with Mesos.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.role</code></td>
+  <td>Role for the Spark framework</td>
+  <td>
+    Set the role of this Spark framework for Mesos. Roles are used in Mesos for reservations
+    and resource weight sharing.
+  </td>
+</tr>
 <tr>
   <td><code>spark.mesos.constraints</code></td>
   <td>Attribute based constraints to be matched against when accepting resource offers.</td>