Commit eb019af9 authored by Sebastien Rainville, committed by Andrew Or

[SPARK-13001][CORE][MESOS] Prevent getting offers when reached max cores

Similar to https://github.com/apache/spark/pull/8639

This change rejects offers for 120s once `spark.cores.max` has been reached in coarse-grained mode, to mitigate offer starvation. It prevents Mesos from sending us offers again and again while we cannot use them, starving other frameworks. This is especially problematic when running many small frameworks on the same Mesos cluster, e.g. many small Spark streaming jobs, and it causes the bigger Spark jobs to stop receiving offers. By rejecting the offers for a long period of time, they become available to those other frameworks.

Author: Sebastien Rainville <sebastien@hopper.com>

Closes #10924 from sebastienrainville/master.
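For readers unfamiliar with the mechanism: Mesos lets a framework decline an offer together with a Filters message whose refuse_seconds field tells the master not to re-offer those resources to this framework for that many seconds; the patch uses this to decline offers for 120 seconds once spark.cores.max is reached. A minimal sketch of the underlying call, separate from the patch itself (the helper name declineForAWhile and its parameters are hypothetical; only the Mesos API calls are real):

    import org.apache.mesos.Protos.{Filters, Offer}
    import org.apache.mesos.SchedulerDriver

    // Decline `offer` and ask the Mesos master not to re-offer its resources
    // to this framework for `seconds` seconds (hypothetical helper, for illustration).
    def declineForAWhile(driver: SchedulerDriver, offer: Offer, seconds: Long): Unit = {
      val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
      driver.declineOffer(offer.getId, filters)
    }

Without an explicit filter (a plain driver.declineOffer(offer.getId)), the master re-offers the same resources after only a short default refusal period, which is what lets a capped framework starve others.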
parent cdce4e62
@@ -109,10 +109,14 @@ private[spark] class CoarseMesosSchedulerBackend(
   private val slaveOfferConstraints =
     parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
 
-  // reject offers with mismatched constraints in seconds
+  // Reject offers with mismatched constraints in seconds
   private val rejectOfferDurationForUnmetConstraints =
     getRejectOfferDurationForUnmetConstraints(sc)
 
+  // Reject offers when we reached the maximum number of cores for this framework
+  private val rejectOfferDurationForReachedMaxCores =
+    getRejectOfferDurationForReachedMaxCores(sc)
+
   // A client for talking to the external shuffle service
   private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
     if (shuffleServiceEnabled) {
@@ -279,18 +283,32 @@ private[spark] class CoarseMesosSchedulerBackend(
   }
 
   private def declineUnmatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
-    for (offer <- offers) {
-      val id = offer.getId.getValue
-      val offerAttributes = toAttributeMap(offer.getAttributesList)
-      val mem = getResource(offer.getResourcesList, "mem")
-      val cpus = getResource(offer.getResourcesList, "cpus")
-      val filters = Filters.newBuilder()
-        .setRefuseSeconds(rejectOfferDurationForUnmetConstraints).build()
-
-      logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus"
-        + s" for $rejectOfferDurationForUnmetConstraints seconds")
-
-      d.declineOffer(offer.getId, filters)
+    offers.foreach { offer =>
+      declineOffer(d, offer, Some("unmet constraints"),
+        Some(rejectOfferDurationForUnmetConstraints))
+    }
+  }
+
+  private def declineOffer(
+      d: SchedulerDriver,
+      offer: Offer,
+      reason: Option[String] = None,
+      refuseSeconds: Option[Long] = None): Unit = {
+
+    val id = offer.getId.getValue
+    val offerAttributes = toAttributeMap(offer.getAttributesList)
+    val mem = getResource(offer.getResourcesList, "mem")
+    val cpus = getResource(offer.getResourcesList, "cpus")
+
+    logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem" +
+      s" cpu: $cpus for $refuseSeconds seconds" +
+      reason.map(r => s" (reason: $r)").getOrElse(""))
+
+    refuseSeconds match {
+      case Some(seconds) =>
+        val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
+        d.declineOffer(offer.getId, filters)
+      case _ => d.declineOffer(offer.getId)
     }
   }
@@ -326,11 +344,12 @@ private[spark] class CoarseMesosSchedulerBackend(
           d.launchTasks(
             Collections.singleton(offer.getId),
             offerTasks.asJava)
-        } else { // decline
-          logDebug(s"Declining offer: $id with attributes: $offerAttributes " +
-            s"mem: $offerMem cpu: $offerCpus")
-
-          d.declineOffer(offer.getId)
+        } else if (totalCoresAcquired >= maxCores) {
+          // Reject an offer for a configurable amount of time to avoid starving other frameworks
+          declineOffer(d, offer, Some("reached spark.cores.max"),
+            Some(rejectOfferDurationForReachedMaxCores))
+        } else {
+          declineOffer(d, offer)
         }
       }
     }
......
@@ -352,4 +352,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
   }
 
+  protected def getRejectOfferDurationForReachedMaxCores(sc: SparkContext): Long = {
+    sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
+  }
+
 }
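Both reject durations are read with getTimeAsSeconds and default to "120s", so they accept Spark's usual time-suffix strings. As a hedged usage sketch (the 300s value is arbitrary and not from the patch), a job sharing a busy cluster could lengthen the new setting in its SparkConf:

    import org.apache.spark.SparkConf

    // Arbitrary example value: keep offers declined for five minutes once
    // spark.cores.max has been reached.
    val conf = new SparkConf()
      .set("spark.mesos.rejectOfferDurationForReachedMaxCores", "300s")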
@@ -147,6 +147,19 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
     verifyDeclinedOffer(driver, createOfferId("o1"), true)
   }
 
+  test("mesos declines offers with a filter when reached spark.cores.max") {
+    val maxCores = 3
+    setBackend(Map("spark.cores.max" -> maxCores.toString))
+
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List(
+      (executorMemory, maxCores + 1),
+      (executorMemory, maxCores + 1)))
+
+    verifyTaskLaunched("o1")
+    verifyDeclinedOffer(driver, createOfferId("o2"), true)
+  }
+
   test("mesos assigns tasks round-robin on offers") {
     val executorCores = 4
     val maxCores = executorCores * 2
......