Commit 5039a49b authored by Felix Bechstein, committed by Andrew Or

[SPARK-10471][CORE][MESOS] prevent getting offers for unmet constraints

This change declines offers from slaves with unmet constraints for 120s, to mitigate offer starvation.
This stops Mesos from sending us the same unusable offers again and again.
In return, we get more offers for slaves that might meet our constraints.
It also lets Mesos send the declined offers to other frameworks.
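
The mechanism is the two-argument declineOffer overload of the Mesos scheduler driver, which attaches a refusal filter to the decline. A minimal standalone sketch of the pattern (the helper name declineForLong is illustrative, not part of this commit; without a filter, Mesos suppresses re-offers only briefly):

import org.apache.mesos.Protos.{Filters, OfferID}
import org.apache.mesos.SchedulerDriver

// Decline an offer and ask Mesos not to re-offer these resources to this
// framework for refuseSeconds (e.g. 120.0 for offers with unmet constraints).
def declineForLong(d: SchedulerDriver, offerId: OfferID, refuseSeconds: Double): Unit = {
  d.declineOffer(offerId, Filters.newBuilder().setRefuseSeconds(refuseSeconds).build())
}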

Author: Felix Bechstein <felix.bechstein@otto.de>

Closes #8639 from felixb/decline_offers_constraint_mismatch.
parent 88a3fdcc
@@ -101,6 +101,10 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val slaveOfferConstraints =
     parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
 
+  // reject offers with mismatched constraints in seconds
+  private val rejectOfferDurationForUnmetConstraints =
+    getRejectOfferDurationForUnmetConstraints(sc)
+
   // A client for talking to the external shuffle service, if it is enabled
   private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
     if (shuffleServiceEnabled) {
@@ -249,48 +253,56 @@ private[spark] class CoarseMesosSchedulerBackend(
         val mem = getResource(offer.getResourcesList, "mem")
         val cpus = getResource(offer.getResourcesList, "cpus").toInt
         val id = offer.getId.getValue
-        if (taskIdToSlaveId.size < executorLimit &&
-            totalCoresAcquired < maxCores &&
-            meetsConstraints &&
-            mem >= calculateTotalMemory(sc) &&
-            cpus >= 1 &&
-            failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
-            !slaveIdsWithExecutors.contains(slaveId)) {
-          // Launch an executor on the slave
-          val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-          totalCoresAcquired += cpusToUse
-          val taskId = newMesosTaskId()
-          taskIdToSlaveId.put(taskId, slaveId)
-          slaveIdsWithExecutors += slaveId
-          coresByTaskId(taskId) = cpusToUse
-          // Gather cpu resources from the available resources and use them in the task.
-          val (remainingResources, cpuResourcesToUse) =
-            partitionResources(offer.getResourcesList, "cpus", cpusToUse)
-          val (_, memResourcesToUse) =
-            partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
-          val taskBuilder = MesosTaskInfo.newBuilder()
-            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
-            .setSlaveId(offer.getSlaveId)
-            .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
-            .setName("Task " + taskId)
-            .addAllResources(cpuResourcesToUse.asJava)
-            .addAllResources(memResourcesToUse.asJava)
-          sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-            MesosSchedulerBackendUtil
-              .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
-          }
-
-          // accept the offer and launch the task
-          logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
-          slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
-          d.launchTasks(
-            Collections.singleton(offer.getId),
-            Collections.singleton(taskBuilder.build()), filters)
+        if (meetsConstraints) {
+          if (taskIdToSlaveId.size < executorLimit &&
+              totalCoresAcquired < maxCores &&
+              mem >= calculateTotalMemory(sc) &&
+              cpus >= 1 &&
+              failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
+              !slaveIdsWithExecutors.contains(slaveId)) {
+            // Launch an executor on the slave
+            val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+            totalCoresAcquired += cpusToUse
+            val taskId = newMesosTaskId()
+            taskIdToSlaveId.put(taskId, slaveId)
+            slaveIdsWithExecutors += slaveId
+            coresByTaskId(taskId) = cpusToUse
+            // Gather cpu resources from the available resources and use them in the task.
+            val (remainingResources, cpuResourcesToUse) =
+              partitionResources(offer.getResourcesList, "cpus", cpusToUse)
+            val (_, memResourcesToUse) =
+              partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
+            val taskBuilder = MesosTaskInfo.newBuilder()
+              .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+              .setSlaveId(offer.getSlaveId)
+              .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
+              .setName("Task " + taskId)
+              .addAllResources(cpuResourcesToUse.asJava)
+              .addAllResources(memResourcesToUse.asJava)
+            sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
+              MesosSchedulerBackendUtil
+                .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
+            }
+
+            // Accept the offer and launch the task
+            logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+            slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
+            d.launchTasks(
+              Collections.singleton(offer.getId),
+              Collections.singleton(taskBuilder.build()), filters)
+          } else {
+            // Decline the offer
+            logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+            d.declineOffer(offer.getId)
+          }
         } else {
-          // Decline the offer
-          logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
-          d.declineOffer(offer.getId)
+          // This offer does not meet constraints. We don't need to see it again.
+          // Decline the offer for a long period of time.
+          logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
+            + s" for $rejectOfferDurationForUnmetConstraints seconds")
+          d.declineOffer(offer.getId, Filters.newBuilder()
+            .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
         }
       }
     }
@@ -63,6 +63,10 @@ private[spark] class MesosSchedulerBackend(
   private[this] val slaveOfferConstraints =
     parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
 
+  // reject offers with mismatched constraints in seconds
+  private val rejectOfferDurationForUnmetConstraints =
+    getRejectOfferDurationForUnmetConstraints(sc)
+
   @volatile var appId: String = _
 
   override def start() {
@@ -212,29 +216,47 @@ private[spark] class MesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     inClassLoader() {
-      // Fail-fast on offers we know will be rejected
-      val (usableOffers, unUsableOffers) = offers.asScala.partition { o =>
+      // Fail first on offers with unmet constraints
+      val (offersMatchingConstraints, offersNotMatchingConstraints) =
+        offers.asScala.partition { o =>
+          val offerAttributes = toAttributeMap(o.getAttributesList)
+          val meetsConstraints =
+            matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+          // add some debug messaging
+          if (!meetsConstraints) {
+            val id = o.getId.getValue
+            logDebug(s"Declining offer: $id with attributes: $offerAttributes")
+          }
+          meetsConstraints
+        }
+
+      // These offers do not meet constraints. We don't need to see them again.
+      // Decline the offer for a long period of time.
+      offersNotMatchingConstraints.foreach { o =>
+        d.declineOffer(o.getId, Filters.newBuilder()
+          .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build())
+      }
+
+      // Of the matching constraints, see which ones give us enough memory and cores
+      val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o =>
         val mem = getResource(o.getResourcesList, "mem")
         val cpus = getResource(o.getResourcesList, "cpus")
         val slaveId = o.getSlaveId.getValue
         val offerAttributes = toAttributeMap(o.getAttributesList)
 
-        // check if all constraints are satisfied
-        //  1. Attribute constraints
-        //  2. Memory requirements
-        //  3. CPU requirements - need at least 1 for executor, 1 for task
-        val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
+        // check offers for
+        //  1. Memory requirements
+        //  2. CPU requirements - need at least 1 for executor, 1 for task
         val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
         val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
-
         val meetsRequirements =
-          (meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
+          (meetsMemoryRequirements && meetsCPURequirements) ||
           (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
 
         // add some debug messaging
         val debugstr = if (meetsRequirements) "Accepting" else "Declining"
-        val id = o.getId.getValue
-        logDebug(s"$debugstr offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
+        logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: "
+          + s"$offerAttributes mem: $mem cpu: $cpus")
 
         meetsRequirements
       }
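In MesosSchedulerBackend, the change splits offer filtering into two successive partitions so the two decline paths can carry different refusal durations. A toy, self-contained sketch of that two-stage shape (the Offer case class and the exact-match constraint rule here are simplified stand-ins, not Spark's or Mesos's types):

// Simplified stand-in for a Mesos offer (hypothetical type, for illustration only).
case class Offer(id: String, attrs: Map[String, String], memMB: Double, cpus: Double)

def filterOffers(
    offers: Seq[Offer],
    constraints: Map[String, String],
    minMemMB: Double,
    minCpus: Double): (Seq[Offer], Seq[Offer], Seq[Offer]) = {
  // Stage 1: constraint matching; mismatches get the long-duration decline.
  val (matching, mismatching) = offers.partition { o =>
    constraints.forall { case (k, v) => o.attrs.get(k).contains(v) }
  }
  // Stage 2: of the matching offers, keep those with enough resources; the
  // rest are declined with the default short filter so they can be re-offered.
  val (usable, unusable) = matching.partition(o => o.memMB >= minMemMB && o.cpus >= minCpus)
  (usable, unusable, mismatching)
}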
@@ -336,4 +336,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     }
   }
 
+  protected def getRejectOfferDurationForUnmetConstraints(sc: SparkContext): Long = {
+    sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
+  }
+
 }
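
Since the helper reads the duration with getTimeAsSeconds, the new spark.mesos.rejectOfferDurationForUnmetConstraints setting should accept Spark's usual time suffixes. A hedged sketch of overriding the 120s default alongside a constraint (the master URL and the rack attribute are placeholders, not values from this commit):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("mesos://zk://host:2181/mesos")  // placeholder master URL
  .setAppName("ConstraintsExample")
  // only accept offers from slaves advertising this attribute (placeholder)
  .set("spark.mesos.constraints", "rack:us-east-1")
  // decline mismatched offers for 5 minutes instead of the default 120s
  .set("spark.mesos.rejectOfferDurationForUnmetConstraints", "300s")
val sc = new SparkContext(conf)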