diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 5063c1fe988bcf81e4299170ca2029e00438a48a..22df2b1db284e82eaa9cf434cc40044b788b06cf 100644 --- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -54,14 +54,17 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( with org.apache.mesos.Scheduler with MesosSchedulerUtils { - val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures + // Blacklist a slave after this many failures + private val MAX_SLAVE_FAILURES = 2 - // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) - val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt + private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt) - val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false) + // Maximum number of cores to acquire + private val maxCores = maxCoresOption.getOrElse(Int.MaxValue) - val maxGpus = conf.getInt("spark.mesos.gpus.max", 0) + private val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false) + + private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0) private[this] val shutdownTimeoutMS = conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s") @@ -75,10 +78,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) // Cores we have acquired with each Mesos task ID - val coresByTaskId = new mutable.HashMap[String, Int] - val gpusByTaskId = new mutable.HashMap[String, Int] - var totalCoresAcquired = 0 - var totalGpusAcquired = 0 + private val coresByTaskId = new mutable.HashMap[String, Int] + private val gpusByTaskId = new mutable.HashMap[String, Int] + private var totalCoresAcquired = 0 + private var totalGpusAcquired = 0 // SlaveID -> Slave // This map accumulates entries for the duration of the job. Slaves are never deleted, because @@ -108,7 +111,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( // may lead to deadlocks since the superclass might also try to lock private val stateLock = new ReentrantLock - val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0) + private val extraCoresPerExecutor = conf.getInt("spark.mesos.extra.cores", 0) // Offer constraints private val slaveOfferConstraints = @@ -140,7 +143,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( securityManager.isSaslEncryptionEnabled()) } - var nextMesosTaskId = 0 + private var nextMesosTaskId = 0 @volatile var appId: String = _ @@ -257,7 +260,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( } override def sufficientResourcesRegistered(): Boolean = { - totalCoresAcquired >= maxCores * minRegisteredRatio + totalCoreCount.get >= maxCoresOption.getOrElse(0) * minRegisteredRatio } override def disconnected(d: org.apache.mesos.SchedulerDriver) {} diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index f73638fda62327a76a7659675883458fe2cdd6f5..f96d65338b79ccb606ca7c4b1582a0907ba41fe7 100644 --- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -20,9 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ -import scala.concurrent.Promise import scala.reflect.ClassTag import org.apache.mesos.{Protos, Scheduler, SchedulerDriver} @@ -37,8 +35,8 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.internal.config._ import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient -import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor +import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor} import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.mesos.Utils._ @@ -304,25 +302,29 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite } test("weburi is set in created scheduler driver") { - setBackend() + initializeSparkConf() + sc = new SparkContext(sparkConf) + val taskScheduler = mock[TaskSchedulerImpl] when(taskScheduler.sc).thenReturn(sc) + val driver = mock[SchedulerDriver] when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) + val securityManager = mock[SecurityManager] val backend = new MesosCoarseGrainedSchedulerBackend( - taskScheduler, sc, "master", securityManager) { + taskScheduler, sc, "master", securityManager) { override protected def createSchedulerDriver( - masterUrl: String, - scheduler: Scheduler, - sparkUser: String, - appName: String, - conf: SparkConf, - webuiUrl: Option[String] = None, - checkpoint: Option[Boolean] = None, - failoverTimeout: Option[Double] = None, - frameworkId: Option[String] = None): SchedulerDriver = { + masterUrl: String, + scheduler: Scheduler, + sparkUser: String, + appName: String, + conf: SparkConf, + webuiUrl: Option[String] = None, + checkpoint: Option[Boolean] = None, + failoverTimeout: Option[Double] = None, + frameworkId: Option[String] = None): SchedulerDriver = { markRegistered() assert(webuiUrl.isDefined) assert(webuiUrl.get.equals("http://webui")) @@ -422,37 +424,11 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite assert(!dockerInfo.getForcePullImage) } - test("Do not call removeExecutor() after backend is stopped") { - setBackend() - - // launches a task on a valid offer - val offers = List(Resources(backend.executorMemory(sc), 1)) - offerResources(offers) - verifyTaskLaunched(driver, "o1") - - // launches a thread simulating status update - val statusUpdateThread = new Thread { - override def run(): Unit = { - while (!stopCalled) { - Thread.sleep(100) - } - - val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED) - backend.statusUpdate(driver, status) - } - }.start - - backend.stop() - // Any method of the backend involving sending messages to the driver endpoint should not - // be called after the backend is stopped. - verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]]) - } - test("mesos supports spark.executor.uri") { val url = "spark.spark.spark.com" setBackend(Map( "spark.executor.uri" -> url - ), false) + ), null) val (mem, cpu) = (backend.executorMemory(sc), 4) @@ -468,7 +444,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite setBackend(Map( "spark.mesos.fetcherCache.enable" -> "true", "spark.executor.uri" -> url - ), false) + ), null) val offers = List(Resources(backend.executorMemory(sc), 1)) offerResources(offers) val launchedTasks = verifyTaskLaunched(driver, "o1") @@ -482,7 +458,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite setBackend(Map( "spark.mesos.fetcherCache.enable" -> "false", "spark.executor.uri" -> url - ), false) + ), null) val offers = List(Resources(backend.executorMemory(sc), 1)) offerResources(offers) val launchedTasks = verifyTaskLaunched(driver, "o1") @@ -491,8 +467,31 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite assert(!uris.asScala.head.getCache) } + test("supports spark.scheduler.minRegisteredResourcesRatio") { + val expectedCores = 1 + setBackend(Map( + "spark.cores.max" -> expectedCores.toString, + "spark.scheduler.minRegisteredResourcesRatio" -> "1.0")) + + val offers = List(Resources(backend.executorMemory(sc), expectedCores)) + offerResources(offers) + val launchedTasks = verifyTaskLaunched(driver, "o1") + assert(!backend.isReady) + + registerMockExecutor(launchedTasks(0).getTaskId.getValue, "s1", expectedCores) + assert(backend.isReady) + } + private case class Resources(mem: Int, cpus: Int, gpus: Int = 0) + private def registerMockExecutor(executorId: String, slaveId: String, cores: Integer) = { + val mockEndpointRef = mock[RpcEndpointRef] + val mockAddress = mock[RpcAddress] + val message = RegisterExecutor(executorId, mockEndpointRef, slaveId, cores, Map.empty) + + backend.driverEndpoint.askWithRetry[Boolean](message) + } + private def verifyDeclinedOffer(driver: SchedulerDriver, offerId: OfferID, filter: Boolean = false): Unit = { @@ -521,8 +520,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite private def createSchedulerBackend( taskScheduler: TaskSchedulerImpl, driver: SchedulerDriver, - shuffleClient: MesosExternalShuffleClient, - endpoint: RpcEndpointRef): MesosCoarseGrainedSchedulerBackend = { + shuffleClient: MesosExternalShuffleClient) = { val securityManager = mock[SecurityManager] val backend = new MesosCoarseGrainedSchedulerBackend( @@ -540,9 +538,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient - override protected def createDriverEndpointRef( - properties: ArrayBuffer[(String, String)]): RpcEndpointRef = endpoint - // override to avoid race condition with the driver thread on `mesosDriver` override def startScheduler(newDriver: SchedulerDriver): Unit = { mesosDriver = newDriver @@ -558,31 +553,35 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite backend } - private def setBackend(sparkConfVars: Map[String, String] = null, - setHome: Boolean = true) { + private def initializeSparkConf( + sparkConfVars: Map[String, String] = null, + home: String = "/path"): Unit = { sparkConf = (new SparkConf) .setMaster("local[*]") .setAppName("test-mesos-dynamic-alloc") .set("spark.mesos.driver.webui.url", "http://webui") - if (setHome) { - sparkConf.setSparkHome("/path") + if (home != null) { + sparkConf.setSparkHome(home) } if (sparkConfVars != null) { sparkConf.setAll(sparkConfVars) } + } + private def setBackend(sparkConfVars: Map[String, String] = null, home: String = "/path") { + initializeSparkConf(sparkConfVars, home) sc = new SparkContext(sparkConf) driver = mock[SchedulerDriver] when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) + taskScheduler = mock[TaskSchedulerImpl] when(taskScheduler.sc).thenReturn(sc) + externalShuffleClient = mock[MesosExternalShuffleClient] - driverEndpoint = mock[RpcEndpointRef] - when(driverEndpoint.ask(any())(any())).thenReturn(Promise().future) - backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient, driverEndpoint) + backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient) } }