Skip to content
Snippets Groups Projects
Commit 00461bb9 authored by Timothy Chen's avatar Timothy Chen Committed by Andrew Or
Browse files

[SPARK-10749][MESOS] Support multiple roles with mesos cluster mode.

Currently the Mesos cluster dispatcher is not using offers from multiple roles correctly, as it simply aggregates all the offers' resource values into one, but doesn't apply them correctly before launching the driver: Mesos needs each resource taken from an offer to specify which role it originally belongs to. Multiple roles are already supported in the fine/coarse-grained schedulers, so this ports that logic to the cluster scheduler.

https://issues.apache.org/jira/browse/SPARK-10749

Author: Timothy Chen <tnachen@gmail.com>

Closes #8872 from tnachen/cluster_multi_roles.
parent 40e6d40f
No related branches found
No related tags found
No related merge requests found
...@@ -357,9 +357,10 @@ private[spark] class MesosClusterScheduler( ...@@ -357,9 +357,10 @@ private[spark] class MesosClusterScheduler(
val appJar = CommandInfo.URI.newBuilder() val appJar = CommandInfo.URI.newBuilder()
.setValue(desc.jarUrl.stripPrefix("file:").stripPrefix("local:")).build() .setValue(desc.jarUrl.stripPrefix("file:").stripPrefix("local:")).build()
val builder = CommandInfo.newBuilder().addUris(appJar) val builder = CommandInfo.newBuilder().addUris(appJar)
val entries = val entries = conf.getOption("spark.executor.extraLibraryPath")
(conf.getOption("spark.executor.extraLibraryPath").toList ++ .map(path => Seq(path) ++ desc.command.libraryPathEntries)
desc.command.libraryPathEntries) .getOrElse(desc.command.libraryPathEntries)
val prefixEnv = if (!entries.isEmpty) { val prefixEnv = if (!entries.isEmpty) {
Utils.libraryPathEnvPrefix(entries) Utils.libraryPathEnvPrefix(entries)
} else { } else {
...@@ -442,9 +443,12 @@ private[spark] class MesosClusterScheduler( ...@@ -442,9 +443,12 @@ private[spark] class MesosClusterScheduler(
options options
} }
private class ResourceOffer(val offer: Offer, var cpu: Double, var mem: Double) { private class ResourceOffer(
val offerId: OfferID,
val slaveId: SlaveID,
var resources: JList[Resource]) {
override def toString(): String = { override def toString(): String = {
s"Offer id: ${offer.getId.getValue}, cpu: $cpu, mem: $mem" s"Offer id: ${offerId}, resources: ${resources}"
} }
} }
...@@ -463,27 +467,29 @@ private[spark] class MesosClusterScheduler( ...@@ -463,27 +467,29 @@ private[spark] class MesosClusterScheduler(
val driverMem = submission.mem val driverMem = submission.mem
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem") logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
val offerOption = currentOffers.find { o => val offerOption = currentOffers.find { o =>
o.cpu >= driverCpu && o.mem >= driverMem getResource(o.resources, "cpus") >= driverCpu &&
getResource(o.resources, "mem") >= driverMem
} }
if (offerOption.isEmpty) { if (offerOption.isEmpty) {
logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " + logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
s"cpu: $driverCpu, mem: $driverMem") s"cpu: $driverCpu, mem: $driverMem")
} else { } else {
val offer = offerOption.get val offer = offerOption.get
offer.cpu -= driverCpu
offer.mem -= driverMem
val taskId = TaskID.newBuilder().setValue(submission.submissionId).build() val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
val cpuResource = createResource("cpus", driverCpu) val (remainingResources, cpuResourcesToUse) =
val memResource = createResource("mem", driverMem) partitionResources(offer.resources, "cpus", driverCpu)
val (finalResources, memResourcesToUse) =
partitionResources(remainingResources.asJava, "mem", driverMem)
val commandInfo = buildDriverCommand(submission) val commandInfo = buildDriverCommand(submission)
val appName = submission.schedulerProperties("spark.app.name") val appName = submission.schedulerProperties("spark.app.name")
val taskInfo = TaskInfo.newBuilder() val taskInfo = TaskInfo.newBuilder()
.setTaskId(taskId) .setTaskId(taskId)
.setName(s"Driver for $appName") .setName(s"Driver for $appName")
.setSlaveId(offer.offer.getSlaveId) .setSlaveId(offer.slaveId)
.setCommand(commandInfo) .setCommand(commandInfo)
.addResources(cpuResource) .addAllResources(cpuResourcesToUse.asJava)
.addResources(memResource) .addAllResources(memResourcesToUse.asJava)
offer.resources = finalResources.asJava
submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image => submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
val container = taskInfo.getContainerBuilder() val container = taskInfo.getContainerBuilder()
val volumes = submission.schedulerProperties val volumes = submission.schedulerProperties
...@@ -496,11 +502,11 @@ private[spark] class MesosClusterScheduler( ...@@ -496,11 +502,11 @@ private[spark] class MesosClusterScheduler(
container, image, volumes = volumes, portmaps = portmaps) container, image, volumes = volumes, portmaps = portmaps)
taskInfo.setContainer(container.build()) taskInfo.setContainer(container.build())
} }
val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new ArrayBuffer[TaskInfo]) val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
queuedTasks += taskInfo.build() queuedTasks += taskInfo.build()
logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " + logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
submission.submissionId) submission.submissionId)
val newState = new MesosClusterSubmissionState(submission, taskId, offer.offer.getSlaveId, val newState = new MesosClusterSubmissionState(submission, taskId, offer.slaveId,
None, new Date(), None) None, new Date(), None)
launchedDrivers(submission.submissionId) = newState launchedDrivers(submission.submissionId) = newState
launchedDriversState.persist(submission.submissionId, newState) launchedDriversState.persist(submission.submissionId, newState)
...@@ -510,14 +516,14 @@ private[spark] class MesosClusterScheduler( ...@@ -510,14 +516,14 @@ private[spark] class MesosClusterScheduler(
} }
override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = { override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
val currentOffers = offers.asScala.map(o => logTrace(s"Received offers from Mesos: \n${offers.asScala.mkString("\n")}")
new ResourceOffer(
o, getResource(o.getResourcesList, "cpus"), getResource(o.getResourcesList, "mem"))
).toList
logTrace(s"Received offers from Mesos: \n${currentOffers.mkString("\n")}")
val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]() val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]()
val currentTime = new Date() val currentTime = new Date()
val currentOffers = offers.asScala.map {
o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList)
}.toList
stateLock.synchronized { stateLock.synchronized {
// We first schedule all the supervised drivers that are ready to retry. // We first schedule all the supervised drivers that are ready to retry.
// This list will be empty if none of the drivers are marked as supervise. // This list will be empty if none of the drivers are marked as supervise.
...@@ -541,9 +547,10 @@ private[spark] class MesosClusterScheduler( ...@@ -541,9 +547,10 @@ private[spark] class MesosClusterScheduler(
tasks.foreach { case (offerId, taskInfos) => tasks.foreach { case (offerId, taskInfos) =>
driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava) driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava)
} }
offers.asScala
.filter(o => !tasks.keySet.contains(o.getId)) for (o <- currentOffers if !tasks.contains(o.offerId)) {
.foreach(o => driver.declineOffer(o.getId)) driver.declineOffer(o.offerId)
}
} }
private def copyBuffer( private def copyBuffer(
......
...@@ -15,30 +15,41 @@ ...@@ -15,30 +15,41 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.spark.scheduler.mesos package org.apache.spark.scheduler.cluster.mesos
import java.util.Date import java.util.{Collection, Collections, Date}
import scala.collection.JavaConverters._
import org.apache.mesos.Protos._
import org.apache.mesos.Protos.Value.{Scalar, Type}
import org.apache.mesos.SchedulerDriver
import org.mockito.{ArgumentCaptor, Matchers}
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar import org.scalatest.mock.MockitoSugar
import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.Command import org.apache.spark.deploy.Command
import org.apache.spark.deploy.mesos.MesosDriverDescription import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.scheduler.cluster.mesos._
class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar { class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
private val command = new Command("mainClass", Seq("arg"), null, null, null, null) private val command = new Command("mainClass", Seq("arg"), Map(), Seq(), Seq(), Seq())
private var scheduler: MesosClusterScheduler = _
test("can queue drivers") { override def beforeEach(): Unit = {
val conf = new SparkConf() val conf = new SparkConf()
conf.setMaster("mesos://localhost:5050") conf.setMaster("mesos://localhost:5050")
conf.setAppName("spark mesos") conf.setAppName("spark mesos")
val scheduler = new MesosClusterScheduler( scheduler = new MesosClusterScheduler(
new BlackHoleMesosClusterPersistenceEngineFactory, conf) { new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
override def start(): Unit = { ready = true } override def start(): Unit = { ready = true }
} }
scheduler.start() scheduler.start()
}
test("can queue drivers") {
val response = scheduler.submitDriver( val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 1000, 1, true, new MesosDriverDescription("d1", "jar", 1000, 1, true,
command, Map[String, String](), "s1", new Date())) command, Map[String, String](), "s1", new Date()))
...@@ -54,14 +65,6 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi ...@@ -54,14 +65,6 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
} }
test("can kill queued drivers") { test("can kill queued drivers") {
val conf = new SparkConf()
conf.setMaster("mesos://localhost:5050")
conf.setAppName("spark mesos")
val scheduler = new MesosClusterScheduler(
new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
override def start(): Unit = { ready = true }
}
scheduler.start()
val response = scheduler.submitDriver( val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 1000, 1, true, new MesosDriverDescription("d1", "jar", 1000, 1, true,
command, Map[String, String](), "s1", new Date())) command, Map[String, String](), "s1", new Date()))
...@@ -71,4 +74,66 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi ...@@ -71,4 +74,66 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
val state = scheduler.getSchedulerState() val state = scheduler.getSchedulerState()
assert(state.queuedDrivers.isEmpty) assert(state.queuedDrivers.isEmpty)
} }
test("can handle multiple roles") {
val driver = mock[SchedulerDriver]
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 1200, 1.5, true,
command,
Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")),
"s1",
new Date()))
assert(response.success)
val offer = Offer.newBuilder()
.addResources(
Resource.newBuilder().setRole("*")
.setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
.addResources(
Resource.newBuilder().setRole("*")
.setScalar(Scalar.newBuilder().setValue(1000).build())
.setName("mem")
.setType(Type.SCALAR))
.addResources(
Resource.newBuilder().setRole("role2")
.setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
.addResources(
Resource.newBuilder().setRole("role2")
.setScalar(Scalar.newBuilder().setValue(500).build()).setName("mem").setType(Type.SCALAR))
.setId(OfferID.newBuilder().setValue("o1").build())
.setFrameworkId(FrameworkID.newBuilder().setValue("f1").build())
.setSlaveId(SlaveID.newBuilder().setValue("s1").build())
.setHostname("host1")
.build()
val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
when(
driver.launchTasks(
Matchers.eq(Collections.singleton(offer.getId)),
capture.capture())
).thenReturn(Status.valueOf(1))
scheduler.resourceOffers(driver, Collections.singletonList(offer))
val taskInfos = capture.getValue
assert(taskInfos.size() == 1)
val taskInfo = taskInfos.iterator().next()
val resources = taskInfo.getResourcesList
assert(scheduler.getResource(resources, "cpus") == 1.5)
assert(scheduler.getResource(resources, "mem") == 1200)
val resourcesSeq: Seq[Resource] = resources.asScala
val cpus = resourcesSeq.filter(_.getName.equals("cpus")).toList
assert(cpus.size == 2)
assert(cpus.exists(_.getRole().equals("role2")))
assert(cpus.exists(_.getRole().equals("*")))
val mem = resourcesSeq.filter(_.getName.equals("mem")).toList
assert(mem.size == 2)
assert(mem.exists(_.getRole().equals("role2")))
assert(mem.exists(_.getRole().equals("*")))
verify(driver, times(1)).launchTasks(
Matchers.eq(Collections.singleton(offer.getId)),
capture.capture()
)
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment