diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 6b8edca5aa48546fcf0f2a9da4fa4c04e0eb0aad..b68f8c7685eba612dfe6f0c3ceed8975c31bf599 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -18,18 +18,18 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{Collections, List => JList}
+import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet}
 
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.{Scheduler => MScheduler, _}
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
 import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
-import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
 
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -66,6 +66,10 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
 
+  // Offer constraints
+  private val slaveOfferConstraints =
+    parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
+
   var nextMesosTaskId = 0
 
   @volatile var appId: String = _
@@ -170,13 +174,16 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     synchronized {
       val filters = Filters.newBuilder().setRefuseSeconds(5).build()
-
       for (offer <- offers) {
+        val offerAttributes = toAttributeMap(offer.getAttributesList)
+        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
         val slaveId = offer.getSlaveId.toString
         val mem = getResource(offer.getResourcesList, "mem")
         val cpus = getResource(offer.getResourcesList, "cpus").toInt
-        if (totalCoresAcquired < maxCores &&
-            mem >= MemoryUtils.calculateTotalMemory(sc) &&
+        val id = offer.getId.getValue
+        if (meetsConstraints &&
+            totalCoresAcquired < maxCores &&
+            mem >= calculateTotalMemory(sc) &&
             cpus >= 1 &&
             failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
             !slaveIdsWithExecutors.contains(slaveId)) {
@@ -193,33 +200,25 @@ private[spark] class CoarseMesosSchedulerBackend(
             .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
             .setName("Task " + taskId)
             .addResources(createResource("cpus", cpusToUse))
-            .addResources(createResource("mem",
-              MemoryUtils.calculateTotalMemory(sc)))
+            .addResources(createResource("mem", calculateTotalMemory(sc)))
 
           sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
             MesosSchedulerBackendUtil
-              .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder())
+              .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder)
           }
 
-          d.launchTasks(
-            Collections.singleton(offer.getId), Collections.singletonList(task.build()), filters)
+          // accept the offer and launch the task
+          logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+          d.launchTasks(List(offer.getId), List(task.build()), filters)
         } else {
-          // Filter it out
-          d.launchTasks(
-            Collections.singleton(offer.getId), Collections.emptyList[MesosTaskInfo](), filters)
+          // Decline the offer
+          logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+          d.declineOffer(offer.getId)
         }
       }
     }
   }
 
-  /** Build a Mesos resource protobuf object */
-  private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
-    Resource.newBuilder()
-      .setName(resourceName)
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
-      .build()
-  }
 
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
     val taskId = status.getTaskId.getValue.toInt
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
deleted file mode 100644
index 8df4f3b554c411d65825b7aefd393469a5d0cf6d..0000000000000000000000000000000000000000
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import org.apache.spark.SparkContext
-
-private[spark] object MemoryUtils {
-  // These defaults copied from YARN
-  val OVERHEAD_FRACTION = 0.10
-  val OVERHEAD_MINIMUM = 384
-
-  def calculateTotalMemory(sc: SparkContext): Int = {
-    sc.conf.getInt("spark.mesos.executor.memoryOverhead",
-      math.max(OVERHEAD_FRACTION * sc.executorMemory, OVERHEAD_MINIMUM).toInt) + sc.executorMemory
-  }
-}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 1067a7f1caf4cd6ec2710310817b3c0b076834c9..d3a20f822176ed3552fcc7eba6bdcbcfd92b960c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -29,6 +29,7 @@ import org.apache.mesos.Protos.Environment.Variable
 import org.apache.mesos.Protos.TaskStatus.Reason
 import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
 import org.apache.mesos.{Scheduler, SchedulerDriver}
+
 import org.apache.spark.deploy.mesos.MesosDriverDescription
 import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
 import org.apache.spark.metrics.MetricsSystem
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 49de85ef48ada79c7c4b86a0ccc717d1eff85b97..d72e2af456e153ad33d03e2bcb99c9fb71042f66 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -23,14 +23,14 @@ import java.util.{ArrayList => JArrayList, Collections, List => JList}
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet}
 
+import org.apache.mesos.{Scheduler => MScheduler, _}
 import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.protobuf.ByteString
-import org.apache.mesos.{Scheduler => MScheduler, _}
+import org.apache.spark.{SparkContext, SparkException, TaskState}
 import org.apache.spark.executor.MesosExecutorBackend
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.util.Utils
-import org.apache.spark.{SparkContext, SparkException, TaskState}
 
 /**
  * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
@@ -59,6 +59,10 @@ private[spark] class MesosSchedulerBackend(
 
   private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)
 
+  // Offer constraints
+  private[this] val slaveOfferConstraints =
+    parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
+
   @volatile var appId: String = _
 
   override def start() {
@@ -71,8 +75,8 @@ private[spark] class MesosSchedulerBackend(
     val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
       .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
       .getOrElse {
-        throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
-      }
+      throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
+    }
     val environment = Environment.newBuilder()
     sc.conf.getOption("spark.executor.extraClassPath").foreach { cp =>
       environment.addVariables(
@@ -115,14 +119,14 @@ private[spark] class MesosSchedulerBackend(
       .setName("cpus")
       .setType(Value.Type.SCALAR)
       .setScalar(Value.Scalar.newBuilder()
-        .setValue(mesosExecutorCores).build())
+      .setValue(mesosExecutorCores).build())
       .build()
     val memory = Resource.newBuilder()
       .setName("mem")
       .setType(Value.Type.SCALAR)
       .setScalar(
         Value.Scalar.newBuilder()
-          .setValue(MemoryUtils.calculateTotalMemory(sc)).build())
+          .setValue(calculateTotalMemory(sc)).build())
       .build()
     val executorInfo = MesosExecutorInfo.newBuilder()
       .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
@@ -191,13 +195,31 @@ private[spark] class MesosSchedulerBackend(
         val mem = getResource(o.getResourcesList, "mem")
         val cpus = getResource(o.getResourcesList, "cpus")
         val slaveId = o.getSlaveId.getValue
-        (mem >= MemoryUtils.calculateTotalMemory(sc) &&
-          // need at least 1 for executor, 1 for task
-          cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)) ||
-          (slaveIdsWithExecutors.contains(slaveId) &&
-            cpus >= scheduler.CPUS_PER_TASK)
+        val offerAttributes = toAttributeMap(o.getAttributesList)
+
+        // check if all constraints are satisfield
+        //  1. Attribute constraints
+        //  2. Memory requirements
+        //  3. CPU requirements - need at least 1 for executor, 1 for task
+        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
+        val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
+
+        val meetsRequirements =
+          (meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
+          (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
+
+        // add some debug messaging
+        val debugstr = if (meetsRequirements) "Accepting" else "Declining"
+        val id = o.getId.getValue
+        logDebug(s"$debugstr offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+
+        meetsRequirements
       }
 
+      // Decline offers we ruled out immediately
+      unUsableOffers.foreach(o => d.declineOffer(o.getId))
+
       val workerOffers = usableOffers.map { o =>
         val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
           getResource(o.getResourcesList, "cpus").toInt
@@ -223,15 +245,15 @@ private[spark] class MesosSchedulerBackend(
       val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
       acceptedOffers
         .foreach { offer =>
-          offer.foreach { taskDesc =>
-            val slaveId = taskDesc.executorId
-            slaveIdsWithExecutors += slaveId
-            slavesIdsOfAcceptedOffers += slaveId
-            taskIdToSlaveId(taskDesc.taskId) = slaveId
-            mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
-              .add(createMesosTask(taskDesc, slaveId))
-          }
+        offer.foreach { taskDesc =>
+          val slaveId = taskDesc.executorId
+          slaveIdsWithExecutors += slaveId
+          slavesIdsOfAcceptedOffers += slaveId
+          taskIdToSlaveId(taskDesc.taskId) = slaveId
+          mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
+            .add(createMesosTask(taskDesc, slaveId))
         }
+      }
 
       // Reply to the offers
       val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
@@ -251,8 +273,6 @@ private[spark] class MesosSchedulerBackend(
         d.declineOffer(o.getId)
       }
 
-      // Decline offers we ruled out immediately
-      unUsableOffers.foreach(o => d.declineOffer(o.getId))
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index d11228f3d016a870322528b4c9652f80e20c29e3..d8a8c848bb4d188da5935fcd0ed28d6421f6c8c8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -17,14 +17,17 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import java.util.List
+import java.util.{List => JList}
 import java.util.concurrent.CountDownLatch
 
 import scala.collection.JavaConversions._
+import scala.util.control.NonFatal
 
-import org.apache.mesos.Protos.{FrameworkInfo, Resource, Status}
-import org.apache.mesos.{MesosSchedulerDriver, Scheduler}
-import org.apache.spark.Logging
+import com.google.common.base.Splitter
+import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler}
+import org.apache.mesos.Protos._
+import org.apache.mesos.protobuf.GeneratedMessage
+import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.util.Utils
 
 /**
@@ -86,10 +89,150 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
   /**
    * Get the amount of resources for the specified type from the resource list
    */
-  protected def getResource(res: List[Resource], name: String): Double = {
+  protected def getResource(res: JList[Resource], name: String): Double = {
     for (r <- res if r.getName == name) {
       return r.getScalar.getValue
     }
     0.0
   }
+
+  /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */
+  protected def getAttribute(attr: Attribute): (String, Set[String]) = {
+    (attr.getName, attr.getText.getValue.split(',').toSet)
+  }
+
+
+  /** Build a Mesos resource protobuf object */
+  protected def createResource(resourceName: String, quantity: Double): Protos.Resource = {
+    Resource.newBuilder()
+      .setName(resourceName)
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
+      .build()
+  }
+
+  /**
+   * Converts the attributes from the resource offer into a Map of name -> Attribute Value
+   * The attribute values are the mesos attribute types and they are
+   * @param offerAttributes
+   * @return
+   */
+  protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
+    offerAttributes.map(attr => {
+      val attrValue = attr.getType match {
+        case Value.Type.SCALAR => attr.getScalar
+        case Value.Type.RANGES => attr.getRanges
+        case Value.Type.SET => attr.getSet
+        case Value.Type.TEXT => attr.getText
+      }
+      (attr.getName, attrValue)
+    }).toMap
+  }
+
+
+  /**
+   * Match the requirements (if any) to the offer attributes.
+   * if attribute requirements are not specified - return true
+   * else if attribute is defined and no values are given, simple attribute presence is performed
+   * else if attribute name and value is specified, subset match is performed on slave attributes
+   */
+  def matchesAttributeRequirements(
+      slaveOfferConstraints: Map[String, Set[String]],
+      offerAttributes: Map[String, GeneratedMessage]): Boolean = {
+    slaveOfferConstraints.forall {
+      // offer has the required attribute and subsumes the required values for that attribute
+      case (name, requiredValues) =>
+        offerAttributes.get(name) match {
+          case None => false
+          case Some(_) if requiredValues.isEmpty => true // empty value matches presence
+          case Some(scalarValue: Value.Scalar) =>
+            // check if provided values is less than equal to the offered values
+            requiredValues.map(_.toDouble).exists(_ <= scalarValue.getValue)
+          case Some(rangeValue: Value.Range) =>
+            val offerRange = rangeValue.getBegin to rangeValue.getEnd
+            // Check if there is some required value that is between the ranges specified
+            // Note: We only support the ability to specify discrete values, in the future
+            // we may expand it to subsume ranges specified with a XX..YY value or something
+            // similar to that.
+            requiredValues.map(_.toLong).exists(offerRange.contains(_))
+          case Some(offeredValue: Value.Set) =>
+            // check if the specified required values is a subset of offered set
+            requiredValues.subsetOf(offeredValue.getItemList.toSet)
+          case Some(textValue: Value.Text) =>
+            // check if the specified value is equal, if multiple values are specified
+            // we succeed if any of them match.
+            requiredValues.contains(textValue.getValue)
+        }
+    }
+  }
+
+  /**
+   * Parses the attributes constraints provided to spark and build a matching data struct:
+   *  Map[<attribute-name>, Set[values-to-match]]
+   *  The constraints are specified as ';' separated key-value pairs where keys and values
+   *  are separated by ':'. The ':' implies equality (for singular values) and "is one of" for
+   *  multiple values (comma separated). For example:
+   *  {{{
+   *  parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b")
+   *  // would result in
+   *  <code>
+   *  Map(
+   *    "tachyon" -> Set("true"),
+   *    "zone":   -> Set("us-east-1a", "us-east-1b")
+   *  )
+   *  }}}
+   *
+   *  Mesos documentation: http://mesos.apache.org/documentation/attributes-resources/
+   *                       https://github.com/apache/mesos/blob/master/src/common/values.cpp
+   *                       https://github.com/apache/mesos/blob/master/src/common/attributes.cpp
+   *
+   * @param constraintsVal constaints string consisting of ';' separated key-value pairs (separated
+   *                       by ':')
+   * @return  Map of constraints to match resources offers.
+   */
+  def parseConstraintString(constraintsVal: String): Map[String, Set[String]] = {
+    /*
+      Based on mesos docs:
+      attributes : attribute ( ";" attribute )*
+      attribute : labelString ":" ( labelString | "," )+
+      labelString : [a-zA-Z0-9_/.-]
+    */
+    val splitter = Splitter.on(';').trimResults().withKeyValueSeparator(':')
+    // kv splitter
+    if (constraintsVal.isEmpty) {
+      Map()
+    } else {
+      try {
+        Map() ++ mapAsScalaMap(splitter.split(constraintsVal)).map {
+          case (k, v) =>
+            if (v == null || v.isEmpty) {
+              (k, Set[String]())
+            } else {
+              (k, v.split(',').toSet)
+            }
+        }
+      } catch {
+        case NonFatal(e) =>
+          throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e)
+      }
+    }
+  }
+
+  // These defaults copied from YARN
+  private val MEMORY_OVERHEAD_FRACTION = 0.10
+  private val MEMORY_OVERHEAD_MINIMUM = 384
+
+  /**
+   * Return the amount of memory to allocate to each executor, taking into account
+   * container overheads.
+   * @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value
+   * @return memory requirement as (0.1 * <memoryOverhead>) or MEMORY_OVERHEAD_MINIMUM
+   *         (whichever is larger)
+   */
+  def calculateTotalMemory(sc: SparkContext): Int = {
+    sc.conf.getInt("spark.mesos.executor.memoryOverhead",
+      math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
+      sc.executorMemory
+  }
+
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtilsSuite.scala
deleted file mode 100644
index e72285d03d3eee0c05370a3eb9e1126cde982f61..0000000000000000000000000000000000000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtilsSuite.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-
-class MemoryUtilsSuite extends SparkFunSuite with MockitoSugar {
-  test("MesosMemoryUtils should always override memoryOverhead when it's set") {
-    val sparkConf = new SparkConf
-
-    val sc = mock[SparkContext]
-    when(sc.conf).thenReturn(sparkConf)
-
-    // 384 > sc.executorMemory * 0.1 => 512 + 384 = 896
-    when(sc.executorMemory).thenReturn(512)
-    assert(MemoryUtils.calculateTotalMemory(sc) === 896)
-
-    // 384 < sc.executorMemory * 0.1 => 4096 + (4096 * 0.1) = 4505.6
-    when(sc.executorMemory).thenReturn(4096)
-    assert(MemoryUtils.calculateTotalMemory(sc) === 4505)
-
-    // set memoryOverhead
-    sparkConf.set("spark.mesos.executor.memoryOverhead", "100")
-    assert(MemoryUtils.calculateTotalMemory(sc) === 4196)
-    sparkConf.set("spark.mesos.executor.memoryOverhead", "400")
-    assert(MemoryUtils.calculateTotalMemory(sc) === 4496)
-  }
-}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index 68df46a41ddc8753854edc6278061b83fb606eb9..d01837fe78957651039d14e1bd9e8e1868c5ff8d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -149,7 +149,9 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
     when(sc.conf).thenReturn(new SparkConf)
     when(sc.listenerBus).thenReturn(listenerBus)
 
-    val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
+    val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+
+    val minMem = backend.calculateTotalMemory(sc)
     val minCpu = 4
 
     val mesosOffers = new java.util.ArrayList[Offer]
@@ -157,8 +159,6 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
     mesosOffers.add(createOffer(2, minMem - 1, minCpu))
     mesosOffers.add(createOffer(3, minMem, minCpu))
 
-    val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
-
     val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2)
     expectedWorkerOffers.append(new WorkerOffer(
       mesosOffers.get(0).getSlaveId.getValue,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..b354914b6ffd0139f7c136949952f293cfcc2820
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.mesos.Protos.Value
+import org.mockito.Mockito._
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+
+class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar {
+
+  // scalastyle:off structural.type
+  // this is the documented way of generating fixtures in scalatest
+  def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new {
+    val sparkConf = new SparkConf
+    val sc = mock[SparkContext]
+    when(sc.conf).thenReturn(sparkConf)
+  }
+  val utils = new MesosSchedulerUtils { }
+  // scalastyle:on structural.type
+
+  test("use at-least minimum overhead") {
+    val f = fixture
+    when(f.sc.executorMemory).thenReturn(512)
+    utils.calculateTotalMemory(f.sc) shouldBe 896
+  }
+
+  test("use overhead if it is greater than minimum value") {
+    val f = fixture
+    when(f.sc.executorMemory).thenReturn(4096)
+    utils.calculateTotalMemory(f.sc) shouldBe 4505
+  }
+
+  test("use spark.mesos.executor.memoryOverhead (if set)") {
+    val f = fixture
+    when(f.sc.executorMemory).thenReturn(1024)
+    f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512")
+    utils.calculateTotalMemory(f.sc) shouldBe 1536
+  }
+
+  test("parse a non-empty constraint string correctly") {
+    val expectedMap = Map(
+      "tachyon" -> Set("true"),
+      "zone" -> Set("us-east-1a", "us-east-1b")
+    )
+    utils.parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b") should be (expectedMap)
+  }
+
+  test("parse an empty constraint string correctly") {
+    utils.parseConstraintString("") shouldBe Map()
+  }
+
+  test("throw an exception when the input is malformed") {
+    an[IllegalArgumentException] should be thrownBy
+      utils.parseConstraintString("tachyon;zone:us-east")
+  }
+
+  test("empty values for attributes' constraints matches all values") {
+    val constraintsStr = "tachyon:"
+    val parsedConstraints = utils.parseConstraintString(constraintsStr)
+
+    parsedConstraints shouldBe Map("tachyon" -> Set())
+
+    val zoneSet = Value.Set.newBuilder().addItem("us-east-1a").addItem("us-east-1b").build()
+    val noTachyonOffer = Map("zone" -> zoneSet)
+    val tachyonTrueOffer = Map("tachyon" -> Value.Text.newBuilder().setValue("true").build())
+    val tachyonFalseOffer = Map("tachyon" -> Value.Text.newBuilder().setValue("false").build())
+
+    utils.matchesAttributeRequirements(parsedConstraints, noTachyonOffer) shouldBe false
+    utils.matchesAttributeRequirements(parsedConstraints, tachyonTrueOffer) shouldBe true
+    utils.matchesAttributeRequirements(parsedConstraints, tachyonFalseOffer) shouldBe true
+  }
+
+  test("subset match is performed for set attributes") {
+    val supersetConstraint = Map(
+      "tachyon" -> Value.Text.newBuilder().setValue("true").build(),
+      "zone" -> Value.Set.newBuilder()
+        .addItem("us-east-1a")
+        .addItem("us-east-1b")
+        .addItem("us-east-1c")
+        .build())
+
+    val zoneConstraintStr = "tachyon:;zone:us-east-1a,us-east-1c"
+    val parsedConstraints = utils.parseConstraintString(zoneConstraintStr)
+
+    utils.matchesAttributeRequirements(parsedConstraints, supersetConstraint) shouldBe true
+  }
+
+  test("less than equal match is performed on scalar attributes") {
+    val offerAttribs = Map("gpus" -> Value.Scalar.newBuilder().setValue(3).build())
+
+    val ltConstraint = utils.parseConstraintString("gpus:2")
+    val eqConstraint = utils.parseConstraintString("gpus:3")
+    val gtConstraint = utils.parseConstraintString("gpus:4")
+
+    utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe true
+    utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
+    utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
+  }
+
+  test("contains match is performed for range attributes") {
+    val offerAttribs = Map("ports" -> Value.Range.newBuilder().setBegin(7000).setEnd(8000).build())
+    val ltConstraint = utils.parseConstraintString("ports:6000")
+    val eqConstraint = utils.parseConstraintString("ports:7500")
+    val gtConstraint = utils.parseConstraintString("ports:8002")
+    val multiConstraint = utils.parseConstraintString("ports:5000,7500,8300")
+
+    utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe false
+    utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
+    utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
+    utils.matchesAttributeRequirements(multiConstraint, offerAttribs) shouldBe true
+  }
+
+  test("equality match is performed for text attributes") {
+    val offerAttribs = Map("tachyon" -> Value.Text.newBuilder().setValue("true").build())
+
+    val trueConstraint = utils.parseConstraintString("tachyon:true")
+    val falseConstraint = utils.parseConstraintString("tachyon:false")
+
+    utils.matchesAttributeRequirements(trueConstraint, offerAttribs) shouldBe true
+    utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false
+  }
+
+}
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 5f1d6daeb27f0f60fd1e2fb9f550e09960fcd41e..1f915d8ea1d73257e4187c745db899e8e00cd855 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -184,6 +184,14 @@ acquire. By default, it will acquire *all* cores in the cluster (that get offere
 only makes sense if you run just one application at a time. You can cap the maximum number of cores
 using `conf.set("spark.cores.max", "10")` (for example).
 
+You may also make use of `spark.mesos.constraints` to set attribute based constraints on mesos resource offers. By default, all resource offers will be accepted.
+
+{% highlight scala %}
+conf.set("spark.mesos.constraints", "tachyon=true;us-east-1=false")
+{% endhighlight %}
+
+For example, Let's say `spark.mesos.constraints` is set to `tachyon=true;us-east-1=false`, then the resource offers will be checked to see if they meet both these constraints and only then will be accepted to start new executors.
+
 # Mesos Docker Support
 
 Spark can make use of a Mesos Docker containerizer by setting the property `spark.mesos.executor.docker.image`
@@ -298,6 +306,20 @@ See the [configuration page](configuration.html) for information on Spark config
     the final overhead will be this value.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.constraints</code></td>
+  <td>Attribute based constraints to be matched against when accepting resource offers.</td>
+  <td>
+    Attribute based constraints on mesos resource offers. By default, all resource offers will be accepted. Refer to <a href="http://mesos.apache.org/documentation/attributes-resources/">Mesos Attributes & Resources</a> for more information on attributes.
+    <ul>
+      <li>Scalar constraints are matched with "less than equal" semantics i.e. value in the constraint must be less than or equal to the value in the resource offer.</li>
+      <li>Range constraints are matched with "contains" semantics i.e. value in the constraint must be within the resource offer's value.</li>
+      <li>Set constraints are matched with "subset of" semantics i.e. value in the constraint must be a subset of the resource offer's value.</li>
+      <li>Text constraints are metched with "equality" semantics i.e. value in the constraint must be exactly equal to the resource offer's value.</li>
+      <li>In case there is no value present as a part of the constraint any offer with the corresponding attribute will be accepted (without value check).</li>
+    </ul>
+  </td>
+</tr>
 </table>
 
 # Troubleshooting and Debugging