diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index d10a77f8e5c78ba9d9d6f4e27e8b816cbd0b040a..2de9b6a6516925fd3fe6ec2b4d8ed7a0bcef64c6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -101,6 +101,10 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val slaveOfferConstraints =
     parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
 
+  // Duration in seconds for which to reject offers with unmet constraints
+  private val rejectOfferDurationForUnmetConstraints =
+    getRejectOfferDurationForUnmetConstraints(sc)
+
   // A client for talking to the external shuffle service, if it is enabled
   private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
     if (shuffleServiceEnabled) {
@@ -249,48 +253,56 @@ private[spark] class CoarseMesosSchedulerBackend(
         val mem = getResource(offer.getResourcesList, "mem")
         val cpus = getResource(offer.getResourcesList, "cpus").toInt
         val id = offer.getId.getValue
-        if (taskIdToSlaveId.size < executorLimit &&
-            totalCoresAcquired < maxCores &&
-            meetsConstraints &&
-            mem >= calculateTotalMemory(sc) &&
-            cpus >= 1 &&
-            failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
-            !slaveIdsWithExecutors.contains(slaveId)) {
-          // Launch an executor on the slave
-          val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-          totalCoresAcquired += cpusToUse
-          val taskId = newMesosTaskId()
-          taskIdToSlaveId.put(taskId, slaveId)
-          slaveIdsWithExecutors += slaveId
-          coresByTaskId(taskId) = cpusToUse
-          // Gather cpu resources from the available resources and use them in the task.
-          val (remainingResources, cpuResourcesToUse) =
-            partitionResources(offer.getResourcesList, "cpus", cpusToUse)
-          val (_, memResourcesToUse) =
-            partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
-          val taskBuilder = MesosTaskInfo.newBuilder()
-            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
-            .setSlaveId(offer.getSlaveId)
-            .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
-            .setName("Task " + taskId)
-            .addAllResources(cpuResourcesToUse.asJava)
-            .addAllResources(memResourcesToUse.asJava)
-
-          sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-            MesosSchedulerBackendUtil
-              .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
+        if (meetsConstraints) {
+          if (taskIdToSlaveId.size < executorLimit &&
+              totalCoresAcquired < maxCores &&
+              mem >= calculateTotalMemory(sc) &&
+              cpus >= 1 &&
+              failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
+              !slaveIdsWithExecutors.contains(slaveId)) {
+            // Launch an executor on the slave
+            val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+            totalCoresAcquired += cpusToUse
+            val taskId = newMesosTaskId()
+            taskIdToSlaveId.put(taskId, slaveId)
+            slaveIdsWithExecutors += slaveId
+            coresByTaskId(taskId) = cpusToUse
+            // Gather cpu resources from the available resources and use them in the task.
+            val (remainingResources, cpuResourcesToUse) =
+              partitionResources(offer.getResourcesList, "cpus", cpusToUse)
+            val (_, memResourcesToUse) =
+              partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
+            val taskBuilder = MesosTaskInfo.newBuilder()
+              .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+              .setSlaveId(offer.getSlaveId)
+              .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
+              .setName("Task " + taskId)
+              .addAllResources(cpuResourcesToUse.asJava)
+              .addAllResources(memResourcesToUse.asJava)
+
+            sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+              MesosSchedulerBackendUtil
+                .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
+            }
+
+            // Accept the offer and launch the task
+            logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+            slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
+            d.launchTasks(
+              Collections.singleton(offer.getId),
+              Collections.singleton(taskBuilder.build()), filters)
+          } else {
+            // Decline the offer
+            logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+            d.declineOffer(offer.getId)
           }
-
-          // accept the offer and launch the task
-          logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
-          slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
-          d.launchTasks(
-            Collections.singleton(offer.getId),
-            Collections.singleton(taskBuilder.build()), filters)
         } else {
-          // Decline the offer
-          logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
-          d.declineOffer(offer.getId)
+          // This offer does not meet our constraints, so there is no point in
+          // seeing it again soon; decline it for an extended period.
+          logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+              + s" for $rejectOfferDurationForUnmetConstraints seconds")
+          d.declineOffer(offer.getId, Filters.newBuilder()
+            .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
         }
       }
     }
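
The mechanism this hunk relies on is the two-argument form of `declineOffer`: attaching a `Filters` message with `refuse_seconds` set asks the Mesos master to withhold that slave's resources for the given duration, instead of re-offering them on the next allocation cycle. A minimal sketch of the pattern, assuming the standard Mesos Java bindings (`driver` and `offerId` stand in for values the scheduler callbacks provide):

```scala
import org.apache.mesos.Protos.{Filters, OfferID}
import org.apache.mesos.SchedulerDriver

// Decline an offer so the master withholds its resources for `seconds`.
// The one-argument declineOffer uses the default filter, under which the
// same resources tend to come back almost immediately.
def declineFor(driver: SchedulerDriver, offerId: OfferID, seconds: Double): Unit = {
  val filters = Filters.newBuilder()
    .setRefuseSeconds(seconds) // e.g. 120.0, the default this patch introduces
    .build()
  driver.declineOffer(offerId, filters)
}
```
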
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index aaffac604a88560146b2915f15b2db4e1c9669fd..281965a5981bb0389ba7265fb63db928f6107d56 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -63,6 +63,10 @@ private[spark] class MesosSchedulerBackend(
   private[this] val slaveOfferConstraints =
     parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
 
+  // Duration in seconds for which to reject offers with unmet constraints
+  private val rejectOfferDurationForUnmetConstraints =
+    getRejectOfferDurationForUnmetConstraints(sc)
+
   @volatile var appId: String = _
 
   override def start() {
@@ -212,29 +216,47 @@ private[spark] class MesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     inClassLoader() {
-      // Fail-fast on offers we know will be rejected
-      val (usableOffers, unUsableOffers) = offers.asScala.partition { o =>
+      // Fail fast on offers with unmet constraints
+      val (offersMatchingConstraints, offersNotMatchingConstraints) =
+        offers.asScala.partition { o =>
+          val offerAttributes = toAttributeMap(o.getAttributesList)
+          val meetsConstraints =
+            matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+
+          // Log each offer that fails the constraint check
+          if (!meetsConstraints) {
+            val id = o.getId.getValue
+            logDebug(s"Declining offer: $id with attributes: $offerAttributes")
+          }
+
+          meetsConstraints
+        }
+
+      // These offers do not meet our constraints, so there is no point in
+      // seeing them again soon; decline them for an extended period.
+      offersNotMatchingConstraints.foreach { o =>
+        d.declineOffer(o.getId, Filters.newBuilder()
+          .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
+      }
+
+      // Of the offers that match the constraints, check which have enough memory and cores
+      val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o =>
         val mem = getResource(o.getResourcesList, "mem")
         val cpus = getResource(o.getResourcesList, "cpus")
         val slaveId = o.getSlaveId.getValue
         val offerAttributes = toAttributeMap(o.getAttributesList)
 
-        // check if all constraints are satisfield
-        //  1. Attribute constraints
-        //  2. Memory requirements
-        //  3. CPU requirements - need at least 1 for executor, 1 for task
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        // Check the offer for:
+        //  1. Memory requirements
+        //  2. CPU requirements - need enough for the executor plus at least one task
         val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
         val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
-
         val meetsRequirements =
-          (meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
+          (meetsMemoryRequirements && meetsCPURequirements) ||
           (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
-
-        // add some debug messaging
         val debugstr = if (meetsRequirements) "Accepting" else "Declining"
-        val id = o.getId.getValue
-        logDebug(s"$debugstr offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+        logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: "
+          + s"$offerAttributes mem: $mem cpu: $cpus")
 
         meetsRequirements
       }
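
Here the single requirements check is split into two passes: attribute constraints are evaluated first, and failures get the long refusal window, while offers that match the constraints but are short on memory or CPU keep the default (short) refusal, since resource availability changes as tasks finish. A self-contained sketch of the shape of this logic, using a simplified stand-in `Offer` type rather than the Mesos protobufs:

```scala
// Simplified stand-in for org.apache.mesos.Protos.Offer.
case class Offer(id: String, attributes: Map[String, String], mem: Double, cpus: Double)

def triage(offers: Seq[Offer],
           constraints: Map[String, String],
           requiredMem: Double,
           requiredCpus: Double): (Seq[Offer], Seq[Offer], Seq[Offer]) = {
  // Pass 1: attribute constraints. Slave attributes rarely change, so a
  // mismatch here warrants the long refuse_seconds filter.
  val (matching, unmet) = offers.partition { o =>
    constraints.forall { case (k, v) => o.attributes.get(k).contains(v) }
  }
  // Pass 2: resource fit. A shortfall here is transient, so these offers
  // are declined with the default, short filter.
  val (usable, tooSmall) = matching.partition { o =>
    o.mem >= requiredMem && o.cpus >= requiredCpus
  }
  (usable, tooSmall, unmet)
}
```

The real code also admits offers from slaves that already host an executor when they have enough CPUs for one more task; that branch is omitted here for brevity.
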
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 860c8e097b3b958632b65a226e08ba8296c0c72d..721861fbbc51768a0add872609450eadc6c8c3f4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -336,4 +336,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     }
   }
 
+  protected def getRejectOfferDurationForUnmetConstraints(sc: SparkContext): Long = {
+    sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
+  }
+
 }
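
The new helper reads its value through `SparkConf.getTimeAsSeconds`, so the property accepts the usual Spark time suffixes and falls back to two minutes. A quick sketch of how a user would tune the window (the property name is the one added above; the `5m` value is just an example):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Widen the refusal window from the 120s default to five minutes.
  .set("spark.mesos.rejectOfferDurationForUnmetConstraints", "5m")

// getTimeAsSeconds understands suffixes such as "s", "m", and "h",
// so "5m" comes back as 300 seconds.
val refuseSeconds = conf.getTimeAsSeconds(
  "spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
assert(refuseSeconds == 300L)
```

The same setting can also be passed on the command line, e.g. `spark-submit --conf spark.mesos.rejectOfferDurationForUnmetConstraints=5m ...`.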