diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 10e6886c16a4f026fc8e08dfc48e9d5c0aa04129..d13795186c48ebf94023b29e8be04827299d3a52 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -208,12 +208,10 @@ private[spark] class MesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { inClassLoader() { - // Fail-fast on offers we know will be rejected - val (usableOffers, unUsableOffers) = offers.partition { o => + val (acceptedOffers, declinedOffers) = offers.partition { o => val mem = getResource(o.getResourcesList, "mem") val cpus = getResource(o.getResourcesList, "cpus") val slaveId = o.getSlaveId.getValue - // TODO(pwendell): Should below be 1 + scheduler.CPUS_PER_TASK? (mem >= MemoryUtils.calculateTotalMemory(sc) && // need at least 1 for executor, 1 for task cpus >= 2 * scheduler.CPUS_PER_TASK) || @@ -221,12 +219,11 @@ private[spark] class MesosSchedulerBackend( cpus >= scheduler.CPUS_PER_TASK) } - val workerOffers = usableOffers.map { o => + val offerableWorkers = acceptedOffers.map { o => val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) { getResource(o.getResourcesList, "cpus").toInt } else { // If the executor doesn't exist yet, subtract CPU for executor - // TODO(pwendell): Should below just subtract "1"? getResource(o.getResourcesList, "cpus").toInt - scheduler.CPUS_PER_TASK } @@ -236,20 +233,17 @@ private[spark] class MesosSchedulerBackend( cpus) } - val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap + val slaveIdToOffer = acceptedOffers.map(o => o.getSlaveId.getValue -> o).toMap val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]] - val slavesIdsOfAcceptedOffers = HashSet[String]() - // Call into the TaskSchedulerImpl - val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty) - acceptedOffers + scheduler.resourceOffers(offerableWorkers) + .filter(!_.isEmpty) .foreach { offer => offer.foreach { taskDesc => val slaveId = taskDesc.executorId slaveIdsWithExecutors += slaveId - slavesIdsOfAcceptedOffers += slaveId taskIdToSlaveId(taskDesc.taskId) = slaveId mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) .add(createMesosTask(taskDesc, slaveId)) @@ -263,14 +257,7 @@ private[spark] class MesosSchedulerBackend( d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) } - // Decline offers that weren't used - // NOTE: This logic assumes that we only get a single offer for each host in a given batch - for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) { - d.declineOffer(o.getId) - } - - // Decline offers we ruled out immediately - unUsableOffers.foreach(o => d.declineOffer(o.getId)) + declinedOffers.foreach(o => d.declineOffer(o.getId)) } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala index e60e70afd321853a467fd7a6fa29c6eb06209181..bef8d3a58ba63b1ffb8fa7ca948a8e729c153204 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala @@ -30,11 +30,9 @@ import java.nio.ByteBuffer import java.util.Collections import java.util import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar { - - test("mesos resource offers result in launching tasks") { + test("mesos resource offer is launching tasks") { def createOffer(id: Int, mem: Int, cpu: Int) = { val builder = Offer.newBuilder() builder.addResourcesBuilder() @@ -45,61 +43,46 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea .setName("cpus") .setType(Value.Type.SCALAR) .setScalar(Scalar.newBuilder().setValue(cpu)) - builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1")) - .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build() + builder.setId(OfferID.newBuilder().setValue(id.toString).build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1")) + .setSlaveId(SlaveID.newBuilder().setValue("s1")).setHostname("localhost").build() } val driver = EasyMock.createMock(classOf[SchedulerDriver]) val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) val sc = EasyMock.createMock(classOf[SparkContext]) + EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes() EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes() EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes() EasyMock.expect(sc.conf).andReturn(new SparkConf).anyTimes() EasyMock.replay(sc) - val minMem = MemoryUtils.calculateTotalMemory(sc).toInt val minCpu = 4 - - val mesosOffers = new java.util.ArrayList[Offer] - mesosOffers.add(createOffer(1, minMem, minCpu)) - mesosOffers.add(createOffer(2, minMem - 1, minCpu)) - mesosOffers.add(createOffer(3, minMem, minCpu)) - + val offers = new java.util.ArrayList[Offer] + offers.add(createOffer(1, minMem, minCpu)) + offers.add(createOffer(1, minMem - 1, minCpu)) val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") - - val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2) - expectedWorkerOffers.append(new WorkerOffer( - mesosOffers.get(0).getSlaveId.getValue, - mesosOffers.get(0).getHostname, - 2 - )) - expectedWorkerOffers.append(new WorkerOffer( - mesosOffers.get(2).getSlaveId.getValue, - mesosOffers.get(2).getHostname, + val workerOffers = Seq(offers.get(0)).map(o => new WorkerOffer( + o.getSlaveId.getValue, + o.getHostname, 2 )) val taskDesc = new TaskDescription(1L, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) - EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(expectedWorkerOffers))).andReturn(Seq(Seq(taskDesc))) + EasyMock.expect(taskScheduler.resourceOffers(EasyMock.eq(workerOffers))).andReturn(Seq(Seq(taskDesc))) EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes() EasyMock.replay(taskScheduler) - val capture = new Capture[util.Collection[TaskInfo]] EasyMock.expect( driver.launchTasks( - EasyMock.eq(Collections.singleton(mesosOffers.get(0).getId)), + EasyMock.eq(Collections.singleton(offers.get(0).getId)), EasyMock.capture(capture), EasyMock.anyObject(classOf[Filters]) ) - ).andReturn(Status.valueOf(1)).once - EasyMock.expect(driver.declineOffer(mesosOffers.get(1).getId)).andReturn(Status.valueOf(1)).times(1) - EasyMock.expect(driver.declineOffer(mesosOffers.get(2).getId)).andReturn(Status.valueOf(1)).times(1) + ).andReturn(Status.valueOf(1)) + EasyMock.expect(driver.declineOffer(offers.get(1).getId)).andReturn(Status.valueOf(1)) EasyMock.replay(driver) - - backend.resourceOffers(driver, mesosOffers) - - EasyMock.verify(driver) + backend.resourceOffers(driver, offers) assert(capture.getValue.size() == 1) val taskInfo = capture.getValue.iterator().next() assert(taskInfo.getName.equals("n1")) @@ -107,19 +90,5 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea assert(cpus.getName.equals("cpus")) assert(cpus.getScalar.getValue.equals(2.0)) assert(taskInfo.getSlaveId.getValue.equals("s1")) - - // Unwanted resources offered on an existing node. Make sure they are declined - val mesosOffers2 = new java.util.ArrayList[Offer] - mesosOffers2.add(createOffer(1, minMem, minCpu)) - EasyMock.reset(taskScheduler) - EasyMock.reset(driver) - EasyMock.expect(taskScheduler.resourceOffers(EasyMock.anyObject(classOf[Seq[WorkerOffer]])).andReturn(Seq(Seq()))) - EasyMock.expect(taskScheduler.CPUS_PER_TASK).andReturn(2).anyTimes() - EasyMock.replay(taskScheduler) - EasyMock.expect(driver.declineOffer(mesosOffers2.get(0).getId)).andReturn(Status.valueOf(1)).times(1) - EasyMock.replay(driver) - - backend.resourceOffers(driver, mesosOffers2) - EasyMock.verify(driver) } }