Skip to content
Snippets Groups Projects
Commit 1165b17d authored by Ankur Chauhan's avatar Ankur Chauhan Committed by Andrew Or
Browse files

[SPARK-6707] [CORE] [MESOS] Mesos Scheduler should allow the user to specify...

[SPARK-6707] [CORE] [MESOS] Mesos Scheduler should allow the user to specify constraints based on slave attributes

Currently, the mesos scheduler only looks at the 'cpu' and 'mem' resources when trying to determine the usablility of a resource offer from a mesos slave node. It may be preferable for the user to be able to ensure that the spark jobs are only started on a certain set of nodes (based on attributes).

For example, If the user sets a property, let's say `spark.mesos.constraints` is set to `tachyon=true;us-east-1=false`, then the resource offers will be checked to see if they meet both these constraints and only then will be accepted to start new executors.

Author: Ankur Chauhan <achauhan@brightcove.com>

Closes #5563 from ankurcha/mesos_attribs and squashes the following commits:

902535b [Ankur Chauhan] Fix line length
d83801c [Ankur Chauhan] Update code as per code review comments
8b73f2d [Ankur Chauhan] Fix imports
c3523e7 [Ankur Chauhan] Added docs
1a24d0b [Ankur Chauhan] Expand scope of attributes matching to include all data types
482fd71 [Ankur Chauhan] Update access modifier to private[this] for offer constraints
5ccc32d [Ankur Chauhan] Fix nit pick whitespace
1bce782 [Ankur Chauhan] Fix nit pick whitespace
c0cbc75 [Ankur Chauhan] Use offer id value for debug message
7fee0ea [Ankur Chauhan] Add debug statements
fc7eb5b [Ankur Chauhan] Fix import codestyle
00be252 [Ankur Chauhan] Style changes as per code review comments
662535f [Ankur Chauhan] Incorporate code review comments + use SparkFunSuite
fdc0937 [Ankur Chauhan] Decline offers that did not meet criteria
67b58a0 [Ankur Chauhan] Add documentation for spark.mesos.constraints
63f53f4 [Ankur Chauhan] Update codestyle - uniform style for config values
02031e4 [Ankur Chauhan] Fix scalastyle warnings in tests
c09ed84 [Ankur Chauhan] Fixed the access modifier on offerConstraints val to private[mesos]
0c64df6 [Ankur Chauhan] Rename overhead fractions to memory_*, fix spacing
8cc1e8f [Ankur Chauhan] Make exception message more explicit about the source of the error
addedba [Ankur Chauhan] Added test case for malformed constraint string
ec9d9a6 [Ankur Chauhan] Add tests for parse constraint string
72fe88a [Ankur Chauhan] Fix up tests + remove redundant method override, combine utility class into new mesos scheduler util trait
92b47fd [Ankur Chauhan] Add attributes based constraints support to MesosScheduler
parent 9ff20334
No related branches found
No related tags found
No related merge requests found
Showing
with 376 additions and 128 deletions
......@@ -18,18 +18,18 @@
package org.apache.spark.scheduler.cluster.mesos
import java.io.File
import java.util.{Collections, List => JList}
import java.util.{List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
import org.apache.spark.rpc.RpcAddress
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
......@@ -66,6 +66,10 @@ private[spark] class CoarseMesosSchedulerBackend(
val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)
// Offer constraints
private val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
var nextMesosTaskId = 0
@volatile var appId: String = _
......@@ -170,13 +174,16 @@ private[spark] class CoarseMesosSchedulerBackend(
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
synchronized {
val filters = Filters.newBuilder().setRefuseSeconds(5).build()
for (offer <- offers) {
val offerAttributes = toAttributeMap(offer.getAttributesList)
val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
val slaveId = offer.getSlaveId.toString
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
if (totalCoresAcquired < maxCores &&
mem >= MemoryUtils.calculateTotalMemory(sc) &&
val id = offer.getId.getValue
if (meetsConstraints &&
totalCoresAcquired < maxCores &&
mem >= calculateTotalMemory(sc) &&
cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)) {
......@@ -193,33 +200,25 @@ private[spark] class CoarseMesosSchedulerBackend(
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
.addResources(createResource("mem",
MemoryUtils.calculateTotalMemory(sc)))
.addResources(createResource("mem", calculateTotalMemory(sc)))
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder())
.setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder)
}
d.launchTasks(
Collections.singleton(offer.getId), Collections.singletonList(task.build()), filters)
// accept the offer and launch the task
logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
d.launchTasks(List(offer.getId), List(task.build()), filters)
} else {
// Filter it out
d.launchTasks(
Collections.singleton(offer.getId), Collections.emptyList[MesosTaskInfo](), filters)
// Decline the offer
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
d.declineOffer(offer.getId)
}
}
}
}
/** Build a Mesos resource protobuf object */
private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
Resource.newBuilder()
.setName(resourceName)
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
.build()
}
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
val taskId = status.getTaskId.getValue.toInt
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.mesos
import org.apache.spark.SparkContext
private[spark] object MemoryUtils {
// These defaults copied from YARN
val OVERHEAD_FRACTION = 0.10
val OVERHEAD_MINIMUM = 384
def calculateTotalMemory(sc: SparkContext): Int = {
sc.conf.getInt("spark.mesos.executor.memoryOverhead",
math.max(OVERHEAD_FRACTION * sc.executorMemory, OVERHEAD_MINIMUM).toInt) + sc.executorMemory
}
}
......@@ -29,6 +29,7 @@ import org.apache.mesos.Protos.Environment.Variable
import org.apache.mesos.Protos.TaskStatus.Reason
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.{Scheduler, SchedulerDriver}
import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
import org.apache.spark.metrics.MetricsSystem
......
......@@ -23,14 +23,14 @@ import java.util.{ArrayList => JArrayList, Collections, List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.spark.{SparkContext, SparkException, TaskState}
import org.apache.spark.executor.MesosExecutorBackend
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.Utils
import org.apache.spark.{SparkContext, SparkException, TaskState}
/**
* A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
......@@ -59,6 +59,10 @@ private[spark] class MesosSchedulerBackend(
private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)
// Offer constraints
private[this] val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
@volatile var appId: String = _
override def start() {
......@@ -71,8 +75,8 @@ private[spark] class MesosSchedulerBackend(
val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
.getOrElse {
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
}
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
}
val environment = Environment.newBuilder()
sc.conf.getOption("spark.executor.extraClassPath").foreach { cp =>
environment.addVariables(
......@@ -115,14 +119,14 @@ private[spark] class MesosSchedulerBackend(
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder()
.setValue(mesosExecutorCores).build())
.setValue(mesosExecutorCores).build())
.build()
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(
Value.Scalar.newBuilder()
.setValue(MemoryUtils.calculateTotalMemory(sc)).build())
.setValue(calculateTotalMemory(sc)).build())
.build()
val executorInfo = MesosExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
......@@ -191,13 +195,31 @@ private[spark] class MesosSchedulerBackend(
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
(mem >= MemoryUtils.calculateTotalMemory(sc) &&
// need at least 1 for executor, 1 for task
cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)) ||
(slaveIdsWithExecutors.contains(slaveId) &&
cpus >= scheduler.CPUS_PER_TASK)
val offerAttributes = toAttributeMap(o.getAttributesList)
// check if all constraints are satisfield
// 1. Attribute constraints
// 2. Memory requirements
// 3. CPU requirements - need at least 1 for executor, 1 for task
val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
val meetsRequirements =
(meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
(slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
// add some debug messaging
val debugstr = if (meetsRequirements) "Accepting" else "Declining"
val id = o.getId.getValue
logDebug(s"$debugstr offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
meetsRequirements
}
// Decline offers we ruled out immediately
unUsableOffers.foreach(o => d.declineOffer(o.getId))
val workerOffers = usableOffers.map { o =>
val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
getResource(o.getResourcesList, "cpus").toInt
......@@ -223,15 +245,15 @@ private[spark] class MesosSchedulerBackend(
val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
acceptedOffers
.foreach { offer =>
offer.foreach { taskDesc =>
val slaveId = taskDesc.executorId
slaveIdsWithExecutors += slaveId
slavesIdsOfAcceptedOffers += slaveId
taskIdToSlaveId(taskDesc.taskId) = slaveId
mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
.add(createMesosTask(taskDesc, slaveId))
}
offer.foreach { taskDesc =>
val slaveId = taskDesc.executorId
slaveIdsWithExecutors += slaveId
slavesIdsOfAcceptedOffers += slaveId
taskIdToSlaveId(taskDesc.taskId) = slaveId
mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
.add(createMesosTask(taskDesc, slaveId))
}
}
// Reply to the offers
val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
......@@ -251,8 +273,6 @@ private[spark] class MesosSchedulerBackend(
d.declineOffer(o.getId)
}
// Decline offers we ruled out immediately
unUsableOffers.foreach(o => d.declineOffer(o.getId))
}
}
......
......@@ -17,14 +17,17 @@
package org.apache.spark.scheduler.cluster.mesos
import java.util.List
import java.util.{List => JList}
import java.util.concurrent.CountDownLatch
import scala.collection.JavaConversions._
import scala.util.control.NonFatal
import org.apache.mesos.Protos.{FrameworkInfo, Resource, Status}
import org.apache.mesos.{MesosSchedulerDriver, Scheduler}
import org.apache.spark.Logging
import com.google.common.base.Splitter
import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler}
import org.apache.mesos.Protos._
import org.apache.mesos.protobuf.GeneratedMessage
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.util.Utils
/**
......@@ -86,10 +89,150 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
/**
* Get the amount of resources for the specified type from the resource list
*/
protected def getResource(res: List[Resource], name: String): Double = {
protected def getResource(res: JList[Resource], name: String): Double = {
for (r <- res if r.getName == name) {
return r.getScalar.getValue
}
0.0
}
/** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */
protected def getAttribute(attr: Attribute): (String, Set[String]) = {
(attr.getName, attr.getText.getValue.split(',').toSet)
}
/** Build a Mesos resource protobuf object */
protected def createResource(resourceName: String, quantity: Double): Protos.Resource = {
Resource.newBuilder()
.setName(resourceName)
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
.build()
}
/**
* Converts the attributes from the resource offer into a Map of name -> Attribute Value
* The attribute values are the mesos attribute types and they are
* @param offerAttributes
* @return
*/
protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
offerAttributes.map(attr => {
val attrValue = attr.getType match {
case Value.Type.SCALAR => attr.getScalar
case Value.Type.RANGES => attr.getRanges
case Value.Type.SET => attr.getSet
case Value.Type.TEXT => attr.getText
}
(attr.getName, attrValue)
}).toMap
}
/**
* Match the requirements (if any) to the offer attributes.
* if attribute requirements are not specified - return true
* else if attribute is defined and no values are given, simple attribute presence is performed
* else if attribute name and value is specified, subset match is performed on slave attributes
*/
def matchesAttributeRequirements(
slaveOfferConstraints: Map[String, Set[String]],
offerAttributes: Map[String, GeneratedMessage]): Boolean = {
slaveOfferConstraints.forall {
// offer has the required attribute and subsumes the required values for that attribute
case (name, requiredValues) =>
offerAttributes.get(name) match {
case None => false
case Some(_) if requiredValues.isEmpty => true // empty value matches presence
case Some(scalarValue: Value.Scalar) =>
// check if provided values is less than equal to the offered values
requiredValues.map(_.toDouble).exists(_ <= scalarValue.getValue)
case Some(rangeValue: Value.Range) =>
val offerRange = rangeValue.getBegin to rangeValue.getEnd
// Check if there is some required value that is between the ranges specified
// Note: We only support the ability to specify discrete values, in the future
// we may expand it to subsume ranges specified with a XX..YY value or something
// similar to that.
requiredValues.map(_.toLong).exists(offerRange.contains(_))
case Some(offeredValue: Value.Set) =>
// check if the specified required values is a subset of offered set
requiredValues.subsetOf(offeredValue.getItemList.toSet)
case Some(textValue: Value.Text) =>
// check if the specified value is equal, if multiple values are specified
// we succeed if any of them match.
requiredValues.contains(textValue.getValue)
}
}
}
/**
* Parses the attributes constraints provided to spark and build a matching data struct:
* Map[<attribute-name>, Set[values-to-match]]
* The constraints are specified as ';' separated key-value pairs where keys and values
* are separated by ':'. The ':' implies equality (for singular values) and "is one of" for
* multiple values (comma separated). For example:
* {{{
* parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b")
* // would result in
* <code>
* Map(
* "tachyon" -> Set("true"),
* "zone": -> Set("us-east-1a", "us-east-1b")
* )
* }}}
*
* Mesos documentation: http://mesos.apache.org/documentation/attributes-resources/
* https://github.com/apache/mesos/blob/master/src/common/values.cpp
* https://github.com/apache/mesos/blob/master/src/common/attributes.cpp
*
* @param constraintsVal constaints string consisting of ';' separated key-value pairs (separated
* by ':')
* @return Map of constraints to match resources offers.
*/
def parseConstraintString(constraintsVal: String): Map[String, Set[String]] = {
/*
Based on mesos docs:
attributes : attribute ( ";" attribute )*
attribute : labelString ":" ( labelString | "," )+
labelString : [a-zA-Z0-9_/.-]
*/
val splitter = Splitter.on(';').trimResults().withKeyValueSeparator(':')
// kv splitter
if (constraintsVal.isEmpty) {
Map()
} else {
try {
Map() ++ mapAsScalaMap(splitter.split(constraintsVal)).map {
case (k, v) =>
if (v == null || v.isEmpty) {
(k, Set[String]())
} else {
(k, v.split(',').toSet)
}
}
} catch {
case NonFatal(e) =>
throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e)
}
}
}
// These defaults copied from YARN
private val MEMORY_OVERHEAD_FRACTION = 0.10
private val MEMORY_OVERHEAD_MINIMUM = 384
/**
* Return the amount of memory to allocate to each executor, taking into account
* container overheads.
* @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value
* @return memory requirement as (0.1 * <memoryOverhead>) or MEMORY_OVERHEAD_MINIMUM
* (whichever is larger)
*/
def calculateTotalMemory(sc: SparkContext): Int = {
sc.conf.getInt("spark.mesos.executor.memoryOverhead",
math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
sc.executorMemory
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.mesos
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
class MemoryUtilsSuite extends SparkFunSuite with MockitoSugar {
test("MesosMemoryUtils should always override memoryOverhead when it's set") {
val sparkConf = new SparkConf
val sc = mock[SparkContext]
when(sc.conf).thenReturn(sparkConf)
// 384 > sc.executorMemory * 0.1 => 512 + 384 = 896
when(sc.executorMemory).thenReturn(512)
assert(MemoryUtils.calculateTotalMemory(sc) === 896)
// 384 < sc.executorMemory * 0.1 => 4096 + (4096 * 0.1) = 4505.6
when(sc.executorMemory).thenReturn(4096)
assert(MemoryUtils.calculateTotalMemory(sc) === 4505)
// set memoryOverhead
sparkConf.set("spark.mesos.executor.memoryOverhead", "100")
assert(MemoryUtils.calculateTotalMemory(sc) === 4196)
sparkConf.set("spark.mesos.executor.memoryOverhead", "400")
assert(MemoryUtils.calculateTotalMemory(sc) === 4496)
}
}
......@@ -149,7 +149,9 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
when(sc.conf).thenReturn(new SparkConf)
when(sc.listenerBus).thenReturn(listenerBus)
val minMem = MemoryUtils.calculateTotalMemory(sc).toInt
val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
val minMem = backend.calculateTotalMemory(sc)
val minCpu = 4
val mesosOffers = new java.util.ArrayList[Offer]
......@@ -157,8 +159,6 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
mesosOffers.add(createOffer(2, minMem - 1, minCpu))
mesosOffers.add(createOffer(3, minMem, minCpu))
val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2)
expectedWorkerOffers.append(new WorkerOffer(
mesosOffers.get(0).getSlaveId.getValue,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.mesos
import org.apache.mesos.Protos.Value
import org.mockito.Mockito._
import org.scalatest._
import org.scalatest.mock.MockitoSugar
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar {
// scalastyle:off structural.type
// this is the documented way of generating fixtures in scalatest
def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new {
val sparkConf = new SparkConf
val sc = mock[SparkContext]
when(sc.conf).thenReturn(sparkConf)
}
val utils = new MesosSchedulerUtils { }
// scalastyle:on structural.type
test("use at-least minimum overhead") {
val f = fixture
when(f.sc.executorMemory).thenReturn(512)
utils.calculateTotalMemory(f.sc) shouldBe 896
}
test("use overhead if it is greater than minimum value") {
val f = fixture
when(f.sc.executorMemory).thenReturn(4096)
utils.calculateTotalMemory(f.sc) shouldBe 4505
}
test("use spark.mesos.executor.memoryOverhead (if set)") {
val f = fixture
when(f.sc.executorMemory).thenReturn(1024)
f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512")
utils.calculateTotalMemory(f.sc) shouldBe 1536
}
test("parse a non-empty constraint string correctly") {
val expectedMap = Map(
"tachyon" -> Set("true"),
"zone" -> Set("us-east-1a", "us-east-1b")
)
utils.parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b") should be (expectedMap)
}
test("parse an empty constraint string correctly") {
utils.parseConstraintString("") shouldBe Map()
}
test("throw an exception when the input is malformed") {
an[IllegalArgumentException] should be thrownBy
utils.parseConstraintString("tachyon;zone:us-east")
}
test("empty values for attributes' constraints matches all values") {
val constraintsStr = "tachyon:"
val parsedConstraints = utils.parseConstraintString(constraintsStr)
parsedConstraints shouldBe Map("tachyon" -> Set())
val zoneSet = Value.Set.newBuilder().addItem("us-east-1a").addItem("us-east-1b").build()
val noTachyonOffer = Map("zone" -> zoneSet)
val tachyonTrueOffer = Map("tachyon" -> Value.Text.newBuilder().setValue("true").build())
val tachyonFalseOffer = Map("tachyon" -> Value.Text.newBuilder().setValue("false").build())
utils.matchesAttributeRequirements(parsedConstraints, noTachyonOffer) shouldBe false
utils.matchesAttributeRequirements(parsedConstraints, tachyonTrueOffer) shouldBe true
utils.matchesAttributeRequirements(parsedConstraints, tachyonFalseOffer) shouldBe true
}
test("subset match is performed for set attributes") {
val supersetConstraint = Map(
"tachyon" -> Value.Text.newBuilder().setValue("true").build(),
"zone" -> Value.Set.newBuilder()
.addItem("us-east-1a")
.addItem("us-east-1b")
.addItem("us-east-1c")
.build())
val zoneConstraintStr = "tachyon:;zone:us-east-1a,us-east-1c"
val parsedConstraints = utils.parseConstraintString(zoneConstraintStr)
utils.matchesAttributeRequirements(parsedConstraints, supersetConstraint) shouldBe true
}
test("less than equal match is performed on scalar attributes") {
val offerAttribs = Map("gpus" -> Value.Scalar.newBuilder().setValue(3).build())
val ltConstraint = utils.parseConstraintString("gpus:2")
val eqConstraint = utils.parseConstraintString("gpus:3")
val gtConstraint = utils.parseConstraintString("gpus:4")
utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe true
utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
}
test("contains match is performed for range attributes") {
val offerAttribs = Map("ports" -> Value.Range.newBuilder().setBegin(7000).setEnd(8000).build())
val ltConstraint = utils.parseConstraintString("ports:6000")
val eqConstraint = utils.parseConstraintString("ports:7500")
val gtConstraint = utils.parseConstraintString("ports:8002")
val multiConstraint = utils.parseConstraintString("ports:5000,7500,8300")
utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe false
utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
utils.matchesAttributeRequirements(multiConstraint, offerAttribs) shouldBe true
}
test("equality match is performed for text attributes") {
val offerAttribs = Map("tachyon" -> Value.Text.newBuilder().setValue("true").build())
val trueConstraint = utils.parseConstraintString("tachyon:true")
val falseConstraint = utils.parseConstraintString("tachyon:false")
utils.matchesAttributeRequirements(trueConstraint, offerAttribs) shouldBe true
utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false
}
}
......@@ -184,6 +184,14 @@ acquire. By default, it will acquire *all* cores in the cluster (that get offere
only makes sense if you run just one application at a time. You can cap the maximum number of cores
using `conf.set("spark.cores.max", "10")` (for example).
You may also make use of `spark.mesos.constraints` to set attribute based constraints on mesos resource offers. By default, all resource offers will be accepted.
{% highlight scala %}
conf.set("spark.mesos.constraints", "tachyon=true;us-east-1=false")
{% endhighlight %}
For example, Let's say `spark.mesos.constraints` is set to `tachyon=true;us-east-1=false`, then the resource offers will be checked to see if they meet both these constraints and only then will be accepted to start new executors.
# Mesos Docker Support
Spark can make use of a Mesos Docker containerizer by setting the property `spark.mesos.executor.docker.image`
......@@ -298,6 +306,20 @@ See the [configuration page](configuration.html) for information on Spark config
the final overhead will be this value.
</td>
</tr>
<tr>
<td><code>spark.mesos.constraints</code></td>
<td>Attribute based constraints to be matched against when accepting resource offers.</td>
<td>
Attribute based constraints on mesos resource offers. By default, all resource offers will be accepted. Refer to <a href="http://mesos.apache.org/documentation/attributes-resources/">Mesos Attributes & Resources</a> for more information on attributes.
<ul>
<li>Scalar constraints are matched with "less than equal" semantics i.e. value in the constraint must be less than or equal to the value in the resource offer.</li>
<li>Range constraints are matched with "contains" semantics i.e. value in the constraint must be within the resource offer's value.</li>
<li>Set constraints are matched with "subset of" semantics i.e. value in the constraint must be a subset of the resource offer's value.</li>
<li>Text constraints are metched with "equality" semantics i.e. value in the constraint must be exactly equal to the resource offer's value.</li>
<li>In case there is no value present as a part of the constraint any offer with the corresponding attribute will be accepted (without value check).</li>
</ul>
</td>
</tr>
</table>
# Troubleshooting and Debugging
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment