Commit f0562e8c authored by Iulian Dragos, committed by Andrew Or

[SPARK-6350] [MESOS] Fine-grained mode scheduler respects mesosExecutor.cores

This is a regression introduced in #4960; this commit fixes it and adds a test.

tnachen andrewor14 please review, this should be an easy one.

Author: Iulian Dragos <jaguarul@gmail.com>

Closes #8653 from dragos/issue/mesos/fine-grained-maxExecutorCores.
parent af3bc59d
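For context, spark.mesos.mesosExecutor.cores controls how many CPU cores fine-grained mode sets aside for each Mesos executor process, on top of the cores claimed per launched task. A minimal sketch of an application opting into a smaller reservation (the master URL and app name are placeholders, not taken from this commit):

    import org.apache.spark.{SparkConf, SparkContext}

    // Fine-grained Mesos mode with 0.5 cores reserved per executor process.
    // Placeholder master URL and app name, for illustration only.
    val conf = new SparkConf()
      .setMaster("mesos://zk://zk-host:2181/mesos")
      .setAppName("fine-grained-example")
      .set("spark.mesos.coarse", "false")
      .set("spark.mesos.mesosExecutor.cores", "0.5")
    val sc = new SparkContext(conf)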
MesosSchedulerBackend.scala

@@ -32,7 +32,6 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.util.Utils

 /**
  * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
  * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
@@ -127,7 +126,7 @@ private[spark] class MesosSchedulerBackend(
     }
     val builder = MesosExecutorInfo.newBuilder()
     val (resourcesAfterCpu, usedCpuResources) =
-      partitionResources(availableResources, "cpus", scheduler.CPUS_PER_TASK)
+      partitionResources(availableResources, "cpus", mesosExecutorCores)
     val (resourcesAfterMem, usedMemResources) =
       partitionResources(resourcesAfterCpu.asJava, "mem", calculateTotalMemory(sc))
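The regression from #4960 was that createExecutorInfo carved scheduler.CPUS_PER_TASK cores out of the offer for the executor itself, ignoring the configured executor share. The mesosExecutorCores value now used above is read from the Spark configuration; roughly (a hedged reconstruction for illustration, not a verbatim excerpt from the file):

    // Cores reserved for the executor process itself, separate from the
    // CPUS_PER_TASK taken from the offer for every launched task.
    val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)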
MesosSchedulerBackendSuite.scala

@@ -42,6 +42,38 @@ import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSui
 class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {

+  test("Use configured mesosExecutor.cores for ExecutorInfo") {
+    val mesosExecutorCores = 3
+    val conf = new SparkConf
+    conf.set("spark.mesos.mesosExecutor.cores", mesosExecutorCores.toString)
+
+    val listenerBus = mock[LiveListenerBus]
+    listenerBus.post(
+      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+    val sc = mock[SparkContext]
+    when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
+
+    when(sc.conf).thenReturn(conf)
+    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+    when(sc.executorMemory).thenReturn(100)
+    when(sc.listenerBus).thenReturn(listenerBus)
+    val taskScheduler = mock[TaskSchedulerImpl]
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+
+    val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+
+    val resources = Arrays.asList(
+      mesosSchedulerBackend.createResource("cpus", 4),
+      mesosSchedulerBackend.createResource("mem", 1024))
+    // uri is null.
+    val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
+
+    val executorResources = executorInfo.getResourcesList
+    val cpus = executorResources.asScala.find(_.getName.equals("cpus")).get.getScalar.getValue
+    assert(cpus === mesosExecutorCores)
+  }
+
   test("check spark-class location correctly") {
     val conf = new SparkConf
     conf.set("spark.mesos.executor.home" , "/mesos-home")
@@ -263,7 +295,6 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
       .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
       .setHostname(s"host${id.toString}").build()
     val mesosOffers = new java.util.ArrayList[Offer]
     mesosOffers.add(offer)
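To run just this suite locally, something like the following from the Spark source tree should work (a sketch, assuming the suite lives in the core module and the build/sbt launcher is available, as it was around this release):

    build/sbt "core/testOnly *MesosSchedulerBackendSuite"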