diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 2760f31b12fa70e107c9bf2608c6c33b91477f94..1bc6f71860c3f037b89c3008fe64e8574c6e1e09 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -152,6 +152,7 @@ private[spark] class MesosClusterScheduler(
   // is registered with Mesos master.
   @volatile protected var ready = false
   private var masterInfo: Option[MasterInfo] = None
+  private var schedulerDriver: SchedulerDriver = _
 
   def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse = {
     val c = new CreateSubmissionResponse
@@ -168,9 +169,8 @@ private[spark] class MesosClusterScheduler(
         return c
       }
       c.submissionId = desc.submissionId
-      queuedDriversState.persist(desc.submissionId, desc)
-      queuedDrivers += desc
       c.success = true
+      addDriverToQueue(desc)
     }
     c
   }
@@ -191,7 +191,7 @@ private[spark] class MesosClusterScheduler(
       // 4. Check if it has already completed.
       if (launchedDrivers.contains(submissionId)) {
         val task = launchedDrivers(submissionId)
-        mesosDriver.killTask(task.taskId)
+        schedulerDriver.killTask(task.taskId)
         k.success = true
         k.message = "Killing running driver"
       } else if (removeFromQueuedDrivers(submissionId)) {
@@ -324,7 +324,7 @@ private[spark] class MesosClusterScheduler(
     ready = false
     metricsSystem.report()
     metricsSystem.stop()
-    mesosDriver.stop(true)
+    schedulerDriver.stop(true)
   }
 
   override def registered(
@@ -340,6 +340,8 @@ private[spark] class MesosClusterScheduler(
 
     stateLock.synchronized {
       this.masterInfo = Some(masterInfo)
+      this.schedulerDriver = driver
+
       if (!pendingRecover.isEmpty) {
         // Start task reconciliation if we need to recover.
         val statuses = pendingRecover.collect {
@@ -506,11 +508,10 @@ private[spark] class MesosClusterScheduler(
   }
 
   private class ResourceOffer(
-      val offerId: OfferID,
-      val slaveId: SlaveID,
-      var resources: JList[Resource]) {
+      val offer: Offer,
+      var remainingResources: JList[Resource]) {
     override def toString(): String = {
-      s"Offer id: ${offerId}, resources: ${resources}"
+      s"Offer id: ${offer.getId}, resources: ${remainingResources}"
     }
   }
 
@@ -518,16 +519,16 @@ private[spark] class MesosClusterScheduler(
     val taskId = TaskID.newBuilder().setValue(desc.submissionId).build()
 
     val (remainingResources, cpuResourcesToUse) =
-      partitionResources(offer.resources, "cpus", desc.cores)
+      partitionResources(offer.remainingResources, "cpus", desc.cores)
     val (finalResources, memResourcesToUse) =
       partitionResources(remainingResources.asJava, "mem", desc.mem)
-    offer.resources = finalResources.asJava
+    offer.remainingResources = finalResources.asJava
 
     val appName = desc.conf.get("spark.app.name")
     val taskInfo = TaskInfo.newBuilder()
       .setTaskId(taskId)
       .setName(s"Driver for ${appName}")
-      .setSlaveId(offer.slaveId)
+      .setSlaveId(offer.offer.getSlaveId)
       .setCommand(buildDriverCommand(desc))
       .addAllResources(cpuResourcesToUse.asJava)
       .addAllResources(memResourcesToUse.asJava)
@@ -549,23 +550,29 @@ private[spark] class MesosClusterScheduler(
       val driverCpu = submission.cores
       val driverMem = submission.mem
       logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
-      val offerOption = currentOffers.find { o =>
-        getResource(o.resources, "cpus") >= driverCpu &&
-        getResource(o.resources, "mem") >= driverMem
+      val offerOption = currentOffers.find { offer =>
+        getResource(offer.remainingResources, "cpus") >= driverCpu &&
+        getResource(offer.remainingResources, "mem") >= driverMem
       }
       if (offerOption.isEmpty) {
         logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
           s"cpu: $driverCpu, mem: $driverMem")
       } else {
         val offer = offerOption.get
-        val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
+        val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new ArrayBuffer[TaskInfo])
         try {
           val task = createTaskInfo(submission, offer)
           queuedTasks += task
-          logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
+          logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
             submission.submissionId)
-          val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
-            None, new Date(), None, getDriverFrameworkID(submission))
+          val newState = new MesosClusterSubmissionState(
+            submission,
+            task.getTaskId,
+            offer.offer.getSlaveId,
+            None,
+            new Date(),
+            None,
+            getDriverFrameworkID(submission))
           launchedDrivers(submission.submissionId) = newState
           launchedDriversState.persist(submission.submissionId, newState)
           afterLaunchCallback(submission.submissionId)
@@ -588,7 +595,7 @@ private[spark] class MesosClusterScheduler(
     val currentTime = new Date()
 
     val currentOffers = offers.asScala.map {
-      o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList)
+      offer => new ResourceOffer(offer, offer.getResourcesList)
     }.toList
 
     stateLock.synchronized {
@@ -615,8 +622,8 @@ private[spark] class MesosClusterScheduler(
       driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava)
     }
 
-    for (o <- currentOffers if !tasks.contains(o.offerId)) {
-      driver.declineOffer(o.offerId)
+    for (offer <- currentOffers if !tasks.contains(offer.offer.getId)) {
+      declineOffer(driver, offer.offer, None, Some(getRejectOfferDuration(conf)))
     }
   }
 
@@ -662,6 +669,12 @@ private[spark] class MesosClusterScheduler(
 
   override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
     val taskId = status.getTaskId.getValue
+
+    logInfo(s"Received status update: taskId=${taskId}" +
+      s" state=${status.getState}" +
+      s" message=${status.getMessage}" +
+      s" reason=${status.getReason}");
+
     stateLock.synchronized {
       if (launchedDrivers.contains(taskId)) {
         if (status.getReason == Reason.REASON_RECONCILIATION &&
@@ -682,8 +695,7 @@ private[spark] class MesosClusterScheduler(
 
           val newDriverDescription = state.driverDescription.copy(
             retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
-          pendingRetryDrivers += newDriverDescription
-          pendingRetryDriversState.persist(taskId, newDriverDescription)
+          addDriverToPending(newDriverDescription, taskId);
         } else if (TaskState.isFinished(mesosToTaskState(status.getState))) {
           removeFromLaunchedDrivers(taskId)
           state.finishDate = Some(new Date())
@@ -746,4 +758,21 @@ private[spark] class MesosClusterScheduler(
   def getQueuedDriversSize: Int = queuedDrivers.size
   def getLaunchedDriversSize: Int = launchedDrivers.size
   def getPendingRetryDriversSize: Int = pendingRetryDrivers.size
+
+  private def addDriverToQueue(desc: MesosDriverDescription): Unit = {
+    queuedDriversState.persist(desc.submissionId, desc)
+    queuedDrivers += desc
+    revive()
+  }
+
+  private def addDriverToPending(desc: MesosDriverDescription, taskId: String) = {
+    pendingRetryDriversState.persist(taskId, desc)
+    pendingRetryDrivers += desc
+    revive()
+  }
+
+  private def revive(): Unit = {
+    logInfo("Reviving Offers.")
+    schedulerDriver.reviveOffers()
+  }
 }
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index f69c223ab9b6deef2e2acece4ee121a5ae964c0a..85c2e9c76f4b07119f41c6867f08788fa4a3120a 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable
 import scala.concurrent.Future
 
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.SchedulerDriver
 
 import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
 import org.apache.spark.network.netty.SparkTransportConf
@@ -119,11 +120,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   // Reject offers with mismatched constraints in seconds
   private val rejectOfferDurationForUnmetConstraints =
-    getRejectOfferDurationForUnmetConstraints(sc)
+    getRejectOfferDurationForUnmetConstraints(sc.conf)
 
   // Reject offers when we reached the maximum number of cores for this framework
   private val rejectOfferDurationForReachedMaxCores =
-    getRejectOfferDurationForReachedMaxCores(sc)
+    getRejectOfferDurationForReachedMaxCores(sc.conf)
 
   // A client for talking to the external shuffle service
   private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
@@ -146,6 +147,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   @volatile var appId: String = _
 
+  private var schedulerDriver: SchedulerDriver = _
+
   def newMesosTaskId(): String = {
     val id = nextMesosTaskId
     nextMesosTaskId += 1
@@ -252,9 +255,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {}
 
   override def registered(
-      d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
-    appId = frameworkId.getValue
-    mesosExternalShuffleClient.foreach(_.init(appId))
+      driver: org.apache.mesos.SchedulerDriver,
+      frameworkId: FrameworkID,
+      masterInfo: MasterInfo) {
+    this.appId = frameworkId.getValue
+    this.mesosExternalShuffleClient.foreach(_.init(appId))
+    this.schedulerDriver = driver
     markRegistered()
   }
 
@@ -293,46 +299,25 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }
 
   private def declineUnmatchedOffers(
-      d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
+      driver: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
     offers.foreach { offer =>
-      declineOffer(d, offer, Some("unmet constraints"),
+      declineOffer(
+        driver,
+        offer,
+        Some("unmet constraints"),
         Some(rejectOfferDurationForUnmetConstraints))
     }
   }
 
-  private def declineOffer(
-      d: org.apache.mesos.SchedulerDriver,
-      offer: Offer,
-      reason: Option[String] = None,
-      refuseSeconds: Option[Long] = None): Unit = {
-
-    val id = offer.getId.getValue
-    val offerAttributes = toAttributeMap(offer.getAttributesList)
-    val mem = getResource(offer.getResourcesList, "mem")
-    val cpus = getResource(offer.getResourcesList, "cpus")
-    val ports = getRangeResource(offer.getResourcesList, "ports")
-
-    logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem" +
-      s" cpu: $cpus port: $ports for $refuseSeconds seconds" +
-      reason.map(r => s" (reason: $r)").getOrElse(""))
-
-    refuseSeconds match {
-      case Some(seconds) =>
-        val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
-        d.declineOffer(offer.getId, filters)
-      case _ => d.declineOffer(offer.getId)
-    }
-  }
-
   /**
    * Launches executors on accepted offers, and declines unused offers. Executors are launched
    * round-robin on offers.
    *
-   * @param d SchedulerDriver
+   * @param driver SchedulerDriver
    * @param offers Mesos offers that match attribute constraints
    */
   private def handleMatchedOffers(
-      d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
+      driver: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
     val tasks = buildMesosTasks(offers)
     for (offer <- offers) {
       val offerAttributes = toAttributeMap(offer.getAttributesList)
@@ -358,15 +343,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
             s" ports: $ports")
         }
 
-        d.launchTasks(
+        driver.launchTasks(
           Collections.singleton(offer.getId),
           offerTasks.asJava)
       } else if (totalCoresAcquired >= maxCores) {
         // Reject an offer for a configurable amount of time to avoid starving other frameworks
-        declineOffer(d, offer, Some("reached spark.cores.max"),
+        declineOffer(driver,
+          offer,
+          Some("reached spark.cores.max"),
           Some(rejectOfferDurationForReachedMaxCores))
       } else {
-        declineOffer(d, offer)
+        declineOffer(
+          driver,
+          offer)
       }
     }
   }
@@ -582,8 +571,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     // Close the mesos external shuffle client if used
     mesosExternalShuffleClient.foreach(_.close())
 
-    if (mesosDriver != null) {
-      mesosDriver.stop()
+    if (schedulerDriver != null) {
+      schedulerDriver.stop()
     }
   }
 
@@ -634,13 +623,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }
 
   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future.successful {
-    if (mesosDriver == null) {
+    if (schedulerDriver == null) {
       logWarning("Asked to kill executors before the Mesos driver was started.")
       false
     } else {
       for (executorId <- executorIds) {
         val taskId = TaskID.newBuilder().setValue(executorId).build()
-        mesosDriver.killTask(taskId)
+        schedulerDriver.killTask(taskId)
       }
       // no need to adjust `executorLimitOption` since the AllocationManager already communicated
       // the desired limit through a call to `doRequestTotalExecutors`.
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 7e561916a71e2fdaba095d668209a22e33f3ff77..215271302ec518fb42c6fb605145562c535299b8 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap, HashSet}
 
 import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.SchedulerDriver
 import org.apache.mesos.protobuf.ByteString
 
 import org.apache.spark.{SparkContext, SparkException, TaskState}
@@ -65,7 +66,9 @@ private[spark] class MesosFineGrainedSchedulerBackend(
 
   // reject offers with mismatched constraints in seconds
   private val rejectOfferDurationForUnmetConstraints =
-    getRejectOfferDurationForUnmetConstraints(sc)
+    getRejectOfferDurationForUnmetConstraints(sc.conf)
+
+  private var schedulerDriver: SchedulerDriver = _
 
   @volatile var appId: String = _
 
@@ -89,6 +92,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
 
   /**
    * Creates a MesosExecutorInfo that is used to launch a Mesos executor.
+ *
    * @param availableResources Available resources that is offered by Mesos
    * @param execId The executor id to assign to this new executor.
    * @return A tuple of the new mesos executor info and the remaining available resources.
@@ -178,10 +182,13 @@ private[spark] class MesosFineGrainedSchedulerBackend(
   override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {}
 
   override def registered(
-      d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+      driver: org.apache.mesos.SchedulerDriver,
+      frameworkId: FrameworkID,
+      masterInfo: MasterInfo) {
     inClassLoader() {
       appId = frameworkId.getValue
       logInfo("Registered as framework ID " + appId)
+      this.schedulerDriver = driver
       markRegistered()
     }
   }
@@ -383,13 +390,13 @@ private[spark] class MesosFineGrainedSchedulerBackend(
   }
 
   override def stop() {
-    if (mesosDriver != null) {
-      mesosDriver.stop()
+    if (schedulerDriver != null) {
+      schedulerDriver.stop()
     }
   }
 
   override def reviveOffers() {
-    mesosDriver.reviveOffers()
+    schedulerDriver.reviveOffers()
   }
 
   override def frameworkMessage(
@@ -426,7 +433,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
   }
 
   override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
-    mesosDriver.killTask(
+    schedulerDriver.killTask(
       TaskID.newBuilder()
         .setValue(taskId.toString).build()
     )
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 1d742fefbbacf4858e807bf99647373797be11b4..3f25535cb5ec20c6deb07bb29d318b849ed45c6e 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -46,9 +46,6 @@ trait MesosSchedulerUtils extends Logging {
   // Lock used to wait for scheduler to be registered
   private final val registerLatch = new CountDownLatch(1)
 
-  // Driver for talking to Mesos
-  protected var mesosDriver: SchedulerDriver = null
-
   /**
    * Creates a new MesosSchedulerDriver that communicates to the Mesos master.
    *
@@ -115,10 +112,6 @@ trait MesosSchedulerUtils extends Logging {
    */
   def startScheduler(newDriver: SchedulerDriver): Unit = {
     synchronized {
-      if (mesosDriver != null) {
-        registerLatch.await()
-        return
-      }
       @volatile
       var error: Option[Exception] = None
 
@@ -128,8 +121,7 @@ trait MesosSchedulerUtils extends Logging {
         setDaemon(true)
         override def run() {
           try {
-            mesosDriver = newDriver
-            val ret = mesosDriver.run()
+            val ret = newDriver.run()
             logInfo("driver.run() returned with code " + ret)
             if (ret != null && ret.equals(Status.DRIVER_ABORTED)) {
               error = Some(new SparkException("Error starting driver, DRIVER_ABORTED"))
@@ -379,12 +371,24 @@ trait MesosSchedulerUtils extends Logging {
     }
   }
 
-  protected def getRejectOfferDurationForUnmetConstraints(sc: SparkContext): Long = {
-    sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
+  private def getRejectOfferDurationStr(conf: SparkConf): String = {
+    conf.get("spark.mesos.rejectOfferDuration", "120s")
+  }
+
+  protected def getRejectOfferDuration(conf: SparkConf): Long = {
+    Utils.timeStringAsSeconds(getRejectOfferDurationStr(conf))
+  }
+
+  protected def getRejectOfferDurationForUnmetConstraints(conf: SparkConf): Long = {
+    conf.getTimeAsSeconds(
+      "spark.mesos.rejectOfferDurationForUnmetConstraints",
+      getRejectOfferDurationStr(conf))
   }
 
-  protected def getRejectOfferDurationForReachedMaxCores(sc: SparkContext): Long = {
-    sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
+  protected def getRejectOfferDurationForReachedMaxCores(conf: SparkConf): Long = {
+    conf.getTimeAsSeconds(
+      "spark.mesos.rejectOfferDurationForReachedMaxCores",
+      getRejectOfferDurationStr(conf))
   }
 
   /**
@@ -438,6 +442,7 @@ trait MesosSchedulerUtils extends Logging {
 
   /**
    * The values of the non-zero ports to be used by the executor process.
+ *
    * @param conf the spark config to use
    * @return the ono-zero values of the ports
    */
@@ -521,4 +526,33 @@ trait MesosSchedulerUtils extends Logging {
     case TaskState.KILLED => MesosTaskState.TASK_KILLED
     case TaskState.LOST => MesosTaskState.TASK_LOST
   }
+
+  protected def declineOffer(
+    driver: org.apache.mesos.SchedulerDriver,
+    offer: Offer,
+    reason: Option[String] = None,
+    refuseSeconds: Option[Long] = None): Unit = {
+
+    val id = offer.getId.getValue
+    val offerAttributes = toAttributeMap(offer.getAttributesList)
+    val mem = getResource(offer.getResourcesList, "mem")
+    val cpus = getResource(offer.getResourcesList, "cpus")
+    val ports = getRangeResource(offer.getResourcesList, "ports")
+
+    logDebug(s"Declining offer: $id with " +
+      s"attributes: $offerAttributes " +
+      s"mem: $mem " +
+      s"cpu: $cpus " +
+      s"port: $ports " +
+      refuseSeconds.map(s => s"for ${s} seconds ").getOrElse("") +
+      reason.map(r => s" (reason: $r)").getOrElse(""))
+
+    refuseSeconds match {
+      case Some(seconds) =>
+        val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
+        driver.declineOffer(offer.getId, filters)
+      case _ =>
+        driver.declineOffer(offer.getId)
+    }
+  }
 }
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index b9d098486b6750291a70c30e34dc6cfba1097d29..32967b04cd3468c3997911508460b1b76df4d6a3 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -53,19 +53,32 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
       override def start(): Unit = { ready = true }
     }
     scheduler.start()
+    scheduler.registered(driver, Utils.TEST_FRAMEWORK_ID, Utils.TEST_MASTER_INFO)
+  }
+
+  private def testDriverDescription(submissionId: String): MesosDriverDescription = {
+    new MesosDriverDescription(
+      "d1",
+      "jar",
+      1000,
+      1,
+      true,
+      command,
+      Map[String, String](),
+      submissionId,
+      new Date())
   }
 
   test("can queue drivers") {
     setScheduler()
 
-    val response = scheduler.submitDriver(
-      new MesosDriverDescription("d1", "jar", 1000, 1, true,
-        command, Map[String, String](), "s1", new Date()))
+    val response = scheduler.submitDriver(testDriverDescription("s1"))
     assert(response.success)
-    val response2 =
-      scheduler.submitDriver(new MesosDriverDescription(
-        "d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new Date()))
+    verify(driver, times(1)).reviveOffers()
+
+    val response2 = scheduler.submitDriver(testDriverDescription("s2"))
     assert(response2.success)
+
     val state = scheduler.getSchedulerState()
     val queuedDrivers = state.queuedDrivers.toList
     assert(queuedDrivers(0).submissionId == response.submissionId)
@@ -75,9 +88,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
   test("can kill queued drivers") {
     setScheduler()
 
-    val response = scheduler.submitDriver(
-        new MesosDriverDescription("d1", "jar", 1000, 1, true,
-          command, Map[String, String](), "s1", new Date()))
+    val response = scheduler.submitDriver(testDriverDescription("s1"))
     assert(response.success)
     val killResponse = scheduler.killDriver(response.submissionId)
     assert(killResponse.success)
@@ -238,18 +249,10 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
   }
 
   test("can kill supervised drivers") {
-    val driver = mock[SchedulerDriver]
     val conf = new SparkConf()
     conf.setMaster("mesos://localhost:5050")
     conf.setAppName("spark mesos")
-    scheduler = new MesosClusterScheduler(
-      new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
-      override def start(): Unit = {
-        ready = true
-        mesosDriver = driver
-      }
-    }
-    scheduler.start()
+    setScheduler(conf.getAll.toMap)
 
     val response = scheduler.submitDriver(
       new MesosDriverDescription("d1", "jar", 100, 1, true, command,
@@ -291,4 +294,16 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     assert(state.launchedDrivers.isEmpty)
     assert(state.finishedDrivers.size == 1)
   }
+
+  test("Declines offer with refuse seconds = 120.") {
+    setScheduler()
+
+    val filter = Filters.newBuilder().setRefuseSeconds(120).build()
+    val offerId = OfferID.newBuilder().setValue("o1").build()
+    val offer = Utils.createOffer(offerId.getValue, "s1", 1000, 1)
+
+    scheduler.resourceOffers(driver, Collections.singletonList(offer))
+
+    verify(driver, times(1)).declineOffer(offerId, filter)
+  }
 }
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 78346e9744957ee65b1442d994fcbbe84681a50a..98033bec6dd68c9f97eccf60b6b4b603e6ca824c 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -552,17 +552,14 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
       override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient
 
       // override to avoid race condition with the driver thread on `mesosDriver`
-      override def startScheduler(newDriver: SchedulerDriver): Unit = {
-        mesosDriver = newDriver
-      }
+      override def startScheduler(newDriver: SchedulerDriver): Unit = {}
 
       override def stopExecutors(): Unit = {
         stopCalled = true
       }
-
-      markRegistered()
     }
     backend.start()
+    backend.registered(driver, Utils.TEST_FRAMEWORK_ID, Utils.TEST_MASTER_INFO)
     backend
   }
 
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
index 7ebb294aa90809b9ccc9eaab7305b94faf433c5f..2a67cbc913ffebe28f7d02e90f2968214c961bb8 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -28,6 +28,17 @@ import org.mockito.{ArgumentCaptor, Matchers}
 import org.mockito.Mockito._
 
 object Utils {
+
+  val TEST_FRAMEWORK_ID = FrameworkID.newBuilder()
+    .setValue("test-framework-id")
+    .build()
+
+  val TEST_MASTER_INFO = MasterInfo.newBuilder()
+    .setId("test-master")
+    .setIp(0)
+    .setPort(0)
+    .build()
+
   def createOffer(
       offerId: String,
       slaveId: String,