Commit 27347b5f authored by Michael Gummelt, committed by Sean Owen

[SPARK-19373][MESOS] Base spark.scheduler.minRegisteredResourcesRatio on registered cores rather than accepted cores

See JIRA

Unit tests, Mesos/Spark integration tests

cc skonto susanxhuynh

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #17045 from mgummelt/SPARK-19373-registered-resources.

## What changes were proposed in this pull request?

Base spark.scheduler.minRegisteredResourcesRatio on cores registered with the driver rather than cores accepted from Mesos offers. See the JIRA (SPARK-19373) for details.

## How was this patch tested?

Unit tests and Mesos/Spark integration tests.

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #17129 from mgummelt/SPARK-19373-registered-resources-2.1.
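
For context, the core of this change is the `sufficientResourcesRegistered()` check in the diff below: the Mesos coarse-grained backend used to report itself ready once it had accepted enough cores from Mesos offers, and it now waits until executors carrying those cores have registered with the driver. The following is a minimal, self-contained sketch of that distinction, not the patch itself; the object, the method names, and the plain `AtomicInteger` fields are illustrative stand-ins for the backend's `totalCoresAcquired` and `totalCoreCount` state.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Illustrative sketch only: "accepted" cores come from Mesos offers the backend
// has taken, "registered" cores come from executors that have connected back to
// the driver. The names below are not from the Spark codebase.
object MinRegisteredRatioSketch {
  val totalCoresAcquired = new AtomicInteger(0) // cores accepted from offers
  val totalCoreCount = new AtomicInteger(0)     // cores of registered executors
  val maxCoresOption: Option[Int] = Some(4)     // spark.cores.max, if set
  val minRegisteredRatio = 1.0                  // spark.scheduler.minRegisteredResourcesRatio

  // Old behaviour: ready as soon as enough offer cores were accepted.
  def readyByAcceptedCores: Boolean =
    totalCoresAcquired.get >= maxCoresOption.getOrElse(Int.MaxValue) * minRegisteredRatio

  // New behaviour: ready only once enough executor cores have registered.
  def readyByRegisteredCores: Boolean =
    totalCoreCount.get >= maxCoresOption.getOrElse(0) * minRegisteredRatio

  def main(args: Array[String]): Unit = {
    totalCoresAcquired.set(4)       // offers accepted, executors still starting up
    println(readyByAcceptedCores)   // true: old check could report ready too early
    println(readyByRegisteredCores) // false: new check keeps waiting

    totalCoreCount.set(4)           // executors have now registered with the driver
    println(readyByRegisteredCores) // true
  }
}
```

Concretely, with `spark.cores.max=1` and `spark.scheduler.minRegisteredResourcesRatio=1.0` (the combination exercised by the new test added to the suite below), the backend stays not-ready after launching the task and only becomes ready once the executor sends its `RegisterExecutor` message.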
parent bbe0d8ca
@@ -54,14 +54,17 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   with org.apache.mesos.Scheduler
   with MesosSchedulerUtils {

-  val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
+  // Blacklist a slave after this many failures
+  private val MAX_SLAVE_FAILURES = 2

-  // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
-  val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
+  private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt)

-  val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
+  // Maximum number of cores to acquire
+  private val maxCores = maxCoresOption.getOrElse(Int.MaxValue)

-  val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
+  private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
+
+  private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)

   private[this] val shutdownTimeoutMS =
     conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
@@ -75,10 +78,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)

   // Cores we have acquired with each Mesos task ID
-  val coresByTaskId = new mutable.HashMap[String, Int]
-  val gpusByTaskId = new mutable.HashMap[String, Int]
-  var totalCoresAcquired = 0
-  var totalGpusAcquired = 0
+  private val coresByTaskId = new mutable.HashMap[String, Int]
+  private val gpusByTaskId = new mutable.HashMap[String, Int]
+  private var totalCoresAcquired = 0
+  private var totalGpusAcquired = 0

   // SlaveID -> Slave
   // This map accumulates entries for the duration of the job. Slaves are never deleted, because
@@ -108,7 +111,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   // may lead to deadlocks since the superclass might also try to lock
   private val stateLock = new ReentrantLock

-  val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)
+  private val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0)

   // Offer constraints
   private val slaveOfferConstraints =
@@ -140,7 +143,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       securityManager.isSaslEncryptionEnabled())
   }

-  var nextMesosTaskId = 0
+  private var nextMesosTaskId = 0

   @volatile var appId: String = _
@@ -257,7 +260,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }

   override def sufficientResourcesRegistered(): Boolean = {
-    totalCoresAcquired >= maxCores * minRegisteredRatio
+    totalCoreCount.get >= maxCoresOption.getOrElse(0) * minRegisteredRatio
   }

   override def disconnected(d: org.apache.mesos.SchedulerDriver) {}
@@ -20,9 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos
 import java.util.concurrent.TimeUnit

 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
-import scala.concurrent.Promise
 import scala.reflect.ClassTag

 import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
@@ -37,8 +35,8 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.internal.config._
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
-import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.mesos.Utils._
@@ -304,25 +302,29 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   }

   test("weburi is set in created scheduler driver") {
-    setBackend()
+    initializeSparkConf()
+    sc = new SparkContext(sparkConf)
+
     val taskScheduler = mock[TaskSchedulerImpl]
     when(taskScheduler.sc).thenReturn(sc)
+
     val driver = mock[SchedulerDriver]
     when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
+
     val securityManager = mock[SecurityManager]

     val backend = new MesosCoarseGrainedSchedulerBackend(
-      taskScheduler, sc, "master", securityManager) {
+        taskScheduler, sc, "master", securityManager) {
       override protected def createSchedulerDriver(
-        masterUrl: String,
-        scheduler: Scheduler,
-        sparkUser: String,
-        appName: String,
-        conf: SparkConf,
-        webuiUrl: Option[String] = None,
-        checkpoint: Option[Boolean] = None,
-        failoverTimeout: Option[Double] = None,
-        frameworkId: Option[String] = None): SchedulerDriver = {
+          masterUrl: String,
+          scheduler: Scheduler,
+          sparkUser: String,
+          appName: String,
+          conf: SparkConf,
+          webuiUrl: Option[String] = None,
+          checkpoint: Option[Boolean] = None,
+          failoverTimeout: Option[Double] = None,
+          frameworkId: Option[String] = None): SchedulerDriver = {
         markRegistered()
         assert(webuiUrl.isDefined)
         assert(webuiUrl.get.equals("http://webui"))
@@ -422,37 +424,11 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(!dockerInfo.getForcePullImage)
   }

-  test("Do not call removeExecutor() after backend is stopped") {
-    setBackend()
-
-    // launches a task on a valid offer
-    val offers = List(Resources(backend.executorMemory(sc), 1))
-    offerResources(offers)
-    verifyTaskLaunched(driver, "o1")
-
-    // launches a thread simulating status update
-    val statusUpdateThread = new Thread {
-      override def run(): Unit = {
-        while (!stopCalled) {
-          Thread.sleep(100)
-        }
-
-        val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
-        backend.statusUpdate(driver, status)
-      }
-    }.start
-
-    backend.stop()
-    // Any method of the backend involving sending messages to the driver endpoint should not
-    // be called after the backend is stopped.
-    verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
-  }
-
   test("mesos supports spark.executor.uri") {
     val url = "spark.spark.spark.com"
     setBackend(Map(
       "spark.executor.uri" -> url
-    ), false)
+    ), null)

     val (mem, cpu) = (backend.executorMemory(sc), 4)
@@ -468,7 +444,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     setBackend(Map(
       "spark.mesos.fetcherCache.enable" -> "true",
       "spark.executor.uri" -> url
-    ), false)
+    ), null)
     val offers = List(Resources(backend.executorMemory(sc), 1))
     offerResources(offers)
     val launchedTasks = verifyTaskLaunched(driver, "o1")
@@ -482,7 +458,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     setBackend(Map(
       "spark.mesos.fetcherCache.enable" -> "false",
       "spark.executor.uri" -> url
-    ), false)
+    ), null)
     val offers = List(Resources(backend.executorMemory(sc), 1))
     offerResources(offers)
     val launchedTasks = verifyTaskLaunched(driver, "o1")
@@ -491,8 +467,31 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(!uris.asScala.head.getCache)
   }

+  test("supports spark.scheduler.minRegisteredResourcesRatio") {
+    val expectedCores = 1
+    setBackend(Map(
+      "spark.cores.max" -> expectedCores.toString,
+      "spark.scheduler.minRegisteredResourcesRatio" -> "1.0"))
+
+    val offers = List(Resources(backend.executorMemory(sc), expectedCores))
+    offerResources(offers)
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    assert(!backend.isReady)
+
+    registerMockExecutor(launchedTasks(0).getTaskId.getValue, "s1", expectedCores)
+    assert(backend.isReady)
+  }
+
   private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)

+  private def registerMockExecutor(executorId: String, slaveId: String, cores: Integer) = {
+    val mockEndpointRef = mock[RpcEndpointRef]
+    val mockAddress = mock[RpcAddress]
+    val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, cores, Map.empty)
+
+    backend.driverEndpoint.askWithRetry[Boolean](message)
+  }
+
   private def verifyDeclinedOffer(driver: SchedulerDriver,
       offerId: OfferID,
       filter: Boolean = false): Unit = {
@@ -521,8 +520,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   private def createSchedulerBackend(
       taskScheduler: TaskSchedulerImpl,
       driver: SchedulerDriver,
-      shuffleClient: MesosExternalShuffleClient,
-      endpoint: RpcEndpointRef): MesosCoarseGrainedSchedulerBackend = {
+      shuffleClient: MesosExternalShuffleClient) = {
     val securityManager = mock[SecurityManager]

     val backend = new MesosCoarseGrainedSchedulerBackend(
@@ -540,9 +538,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
       override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient

-      override protected def createDriverEndpointRef(
-        properties: ArrayBuffer[(String, String)]): RpcEndpointRef = endpoint
-
       // override to avoid race condition with the driver thread on `mesosDriver`
       override def startScheduler(newDriver: SchedulerDriver): Unit = {
         mesosDriver = newDriver
@@ -558,31 +553,35 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     backend
   }

-  private def setBackend(sparkConfVars: Map[String, String] = null,
-      setHome: Boolean = true) {
+  private def initializeSparkConf(
+      sparkConfVars: Map[String, String] = null,
+      home: String = "/path"): Unit = {
     sparkConf = (new SparkConf)
       .setMaster("local[*]")
       .setAppName("test-mesos-dynamic-alloc")
       .set("spark.mesos.driver.webui.url", "http://webui")

-    if (setHome) {
-      sparkConf.setSparkHome("/path")
+    if (home != null) {
+      sparkConf.setSparkHome(home)
     }

     if (sparkConfVars != null) {
       sparkConf.setAll(sparkConfVars)
     }
+  }
+
+  private def setBackend(sparkConfVars: Map[String, String] = null, home: String = "/path") {
+    initializeSparkConf(sparkConfVars, home)
     sc = new SparkContext(sparkConf)

     driver = mock[SchedulerDriver]
     when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
     taskScheduler = mock[TaskSchedulerImpl]
     when(taskScheduler.sc).thenReturn(sc)
     externalShuffleClient = mock[MesosExternalShuffleClient]
-    driverEndpoint = mock[RpcEndpointRef]
-    when(driverEndpoint.ask(any())(any())).thenReturn(Promise().future)

-    backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient, driverEndpoint)
+    backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient)
   }
}