Commit 29f186bf authored by Timothy Chen, committed by Sean Owen

[SPARK-14082][MESOS] Enable GPU support with Mesos

## What changes were proposed in this pull request?

Enable GPU resources to be used when running in coarse-grained mode with Mesos.

## How was this patch tested?

Manual testing with GPUs.

Author: Timothy Chen <tnachen@gmail.com>

Closes #14644 from tnachen/gpu_mesos.
parent 3f8a0222
@@ -498,6 +498,15 @@ See the [configuration page](configuration.html) for information on Spark config
     in the history server.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.gpus.max</code></td>
+  <td><code>0</code></td>
+  <td>
+    Set the maximum number of GPU resources to acquire for this job. Note that executors will still
+    launch when no GPU resources are found, since this configuration is just an upper limit and not
+    a guaranteed amount.
+  </td>
+</tr>
 </table>
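
For reference, a minimal sketch of how this setting could be supplied programmatically. The master URL and app name below are placeholders, not part of this change:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Request up to 2 GPUs for the whole job. Because spark.mesos.gpus.max is
// only an upper limit, executors still launch with 0 GPUs if no agent
// offers any.
object GpuMaxExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("mesos://mesos-master:5050") // placeholder URL
      .setAppName("gpu-max-example")          // placeholder name
      .set("spark.mesos.gpus.max", "2")
    val sc = new SparkContext(conf)
    try {
      // ... job logic ...
    } finally {
      sc.stop()
    }
  }
}
```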
@@ -59,6 +59,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
   val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt
 
+  val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
+
   private[this] val shutdownTimeoutMS =
     conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
       .ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")
@@ -72,7 +74,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   // Cores we have acquired with each Mesos task ID
   val coresByTaskId = new mutable.HashMap[String, Int]
+  val gpusByTaskId = new mutable.HashMap[String, Int]
   var totalCoresAcquired = 0
+  var totalGpusAcquired = 0
 
   // SlaveID -> Slave
   // This map accumulates entries for the duration of the job. Slaves are never deleted, because
@@ -396,6 +400,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           launchTasks = true
           val taskId = newMesosTaskId()
           val offerCPUs = getResource(resources, "cpus").toInt
+          val taskGPUs = Math.min(
+            Math.max(0, maxGpus - totalGpusAcquired), getResource(resources, "gpus").toInt)
 
           val taskCPUs = executorCores(offerCPUs)
           val taskMemory = executorMemory(sc)
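
The two added lines implement a simple budgeting rule. A standalone sketch of the same arithmetic, with illustrative names rather than the backend's actual fields:

```scala
// How many GPUs to take from one offer: whatever the offer provides,
// capped by what is left of the spark.mesos.gpus.max budget.
def gpusToAcquire(maxGpus: Int, totalGpusAcquired: Int, offeredGpus: Int): Int =
  math.min(math.max(0, maxGpus - totalGpusAcquired), offeredGpus)

assert(gpusToAcquire(5, 3, 4) == 2) // budget has 2 left, offer has 4
assert(gpusToAcquire(5, 5, 4) == 0) // budget exhausted; the task still launches
assert(gpusToAcquire(0, 0, 4) == 0) // default maxGpus = 0 acquires nothing
```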
@@ -403,7 +409,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)
 
           val (resourcesLeft, resourcesToUse) =
-            partitionTaskResources(resources, taskCPUs, taskMemory)
+            partitionTaskResources(resources, taskCPUs, taskMemory, taskGPUs)
 
           val taskBuilder = MesosTaskInfo.newBuilder()
             .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
@@ -425,6 +431,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
             remainingResources(offerId) = resourcesLeft.asJava
             totalCoresAcquired += taskCPUs
             coresByTaskId(taskId) = taskCPUs
+            if (taskGPUs > 0) {
+              totalGpusAcquired += taskGPUs
+              gpusByTaskId(taskId) = taskGPUs
+            }
           }
         }
       }
@@ -432,21 +442,28 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }
 
   /** Extracts task needed resources from a list of available resources. */
-  private def partitionTaskResources(resources: JList[Resource], taskCPUs: Int, taskMemory: Int)
+  private def partitionTaskResources(
+      resources: JList[Resource],
+      taskCPUs: Int,
+      taskMemory: Int,
+      taskGPUs: Int)
     : (List[Resource], List[Resource]) = {
 
     // partition cpus & mem
     val (afterCPUResources, cpuResourcesToUse) = partitionResources(resources, "cpus", taskCPUs)
     val (afterMemResources, memResourcesToUse) =
       partitionResources(afterCPUResources.asJava, "mem", taskMemory)
+    val (afterGPUResources, gpuResourcesToUse) =
+      partitionResources(afterMemResources.asJava, "gpus", taskGPUs)
 
     // If user specifies port numbers in SparkConfig then consecutive tasks will not be launched
     // on the same host. This essentially means one executor per host.
     // TODO: handle network isolator case
     val (nonPortResources, portResourcesToUse) =
-      partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterMemResources)
+      partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterGPUResources)
 
-    (nonPortResources, cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse)
+    (nonPortResources,
+      cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse ++ gpuResourcesToUse)
   }
 
   private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
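
Here `partitionResources` is threaded through cpus, then mem, then the new gpus step, then ports, each call consuming from the previous remainder. A simplified, self-contained model of that chaining over named scalar resources (the real code operates on Mesos `Resource` protobufs and also handles port ranges):

```scala
// Each call takes the requested amount of one named scalar resource out of
// the pool and returns (remaining, consumed).
case class ScalarResource(name: String, amount: Double)

def partitionScalar(
    pool: List[ScalarResource],
    name: String,
    need: Double): (List[ScalarResource], List[ScalarResource]) = {
  var remaining = need
  val (left, used) = pool.map { r =>
    val take = if (r.name == name) math.min(r.amount, remaining) else 0.0
    remaining -= take
    (r.copy(amount = r.amount - take), ScalarResource(r.name, take))
  }.unzip
  (left.filter(_.amount > 0), used.filter(_.amount > 0))
}

val offer = List(
  ScalarResource("cpus", 8), ScalarResource("mem", 4096), ScalarResource("gpus", 2))
val (afterCpu, cpuUse) = partitionScalar(offer, "cpus", 2)
val (afterMem, memUse) = partitionScalar(afterCpu, "mem", 1024)
val (afterGpu, gpuUse) = partitionScalar(afterMem, "gpus", 1) // the new gpus step follows the same pattern
```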
@@ -513,6 +530,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           totalCoresAcquired -= cores
           coresByTaskId -= taskId
         }
+        // Also remove the gpus we have remembered for this task, if it's in the hashmap
+        for (gpus <- gpusByTaskId.get(taskId)) {
+          totalGpusAcquired -= gpus
+          gpusByTaskId -= taskId
+        }
         // If it was a failure, mark the slave as failed for blacklisting purposes
         if (TaskState.isFailed(state)) {
           slave.taskFailures += 1
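
This release in `statusUpdate` mirrors the acquisition in `resourceOffers`. A compact sketch of that bookkeeping pattern, as an illustrative class rather than the backend's actual code:

```scala
import scala.collection.mutable

// GPUs counted at task launch are credited back on terminal status updates,
// keeping the running total consistent with the spark.mesos.gpus.max budget.
class GpuLedger {
  private val gpusByTaskId = new mutable.HashMap[String, Int]
  var totalGpusAcquired = 0

  def acquire(taskId: String, gpus: Int): Unit = if (gpus > 0) {
    totalGpusAcquired += gpus
    gpusByTaskId(taskId) = gpus
  }

  def release(taskId: String): Unit =
    for (gpus <- gpusByTaskId.remove(taskId)) totalGpusAcquired -= gpus
}
```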
@@ -27,6 +27,7 @@ import scala.util.control.NonFatal
 import com.google.common.base.Splitter
 import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
+import org.apache.mesos.Protos.FrameworkInfo.Capability
 import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException}
@@ -93,6 +94,10 @@ trait MesosSchedulerUtils extends Logging {
     conf.getOption("spark.mesos.role").foreach { role =>
       fwInfoBuilder.setRole(role)
     }
+    val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
+    if (maxGpus > 0) {
+      fwInfoBuilder.addCapabilities(Capability.newBuilder().setType(Capability.Type.GPU_RESOURCES))
+    }
     if (credBuilder.hasPrincipal) {
       new MesosSchedulerDriver(
         scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build())
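
Mesos only includes `gpus` resources in offers to frameworks that register with the `GPU_RESOURCES` capability, which is why the `FrameworkInfo` builder is extended here. A sketch of that registration step in isolation; the user and name values are placeholders:

```scala
import org.apache.mesos.Protos.FrameworkInfo
import org.apache.mesos.Protos.FrameworkInfo.Capability

object GpuFrameworkInfoSketch {
  // Advertise the GPU_RESOURCES capability so the master sends GPU offers.
  def build(): FrameworkInfo =
    FrameworkInfo.newBuilder()
      .setUser("spark")         // placeholder
      .setName("gpu-framework") // placeholder
      .addCapabilities(
        Capability.newBuilder().setType(Capability.Type.GPU_RESOURCES))
      .build()
}
```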
@@ -67,7 +67,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     val minMem = backend.executorMemory(sc)
     val minCpu = 4
-    val offers = List((minMem, minCpu))
+    val offers = List(Resources(minMem, minCpu))
 
     // launches a task on a valid offer
     offerResources(offers)
@@ -95,8 +95,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     // launches a task on a valid offer
     val minMem = backend.executorMemory(sc) + 1024
     val minCpu = 4
-    val offer1 = (minMem, minCpu)
-    val offer2 = (minMem, 1)
+    val offer1 = Resources(minMem, minCpu)
+    val offer2 = Resources(minMem, 1)
     offerResources(List(offer1, offer2))
 
     verifyTaskLaunched(driver, "o1")
@@ -115,7 +115,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     setBackend(Map("spark.executor.cores" -> executorCores.toString))
 
     val executorMemory = backend.executorMemory(sc)
-    val offers = List((executorMemory * 2, executorCores + 1))
+    val offers = List(Resources(executorMemory * 2, executorCores + 1))
     offerResources(offers)
 
     val taskInfos = verifyTaskLaunched(driver, "o1")
@@ -130,7 +130,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     val executorMemory = backend.executorMemory(sc)
     val offerCores = 10
-    offerResources(List((executorMemory * 2, offerCores)))
+    offerResources(List(Resources(executorMemory * 2, offerCores)))
 
     val taskInfos = verifyTaskLaunched(driver, "o1")
     assert(taskInfos.length == 1)
@@ -144,7 +144,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     setBackend(Map("spark.cores.max" -> maxCores.toString))
 
     val executorMemory = backend.executorMemory(sc)
-    offerResources(List((executorMemory, maxCores + 1)))
+    offerResources(List(Resources(executorMemory, maxCores + 1)))
 
     val taskInfos = verifyTaskLaunched(driver, "o1")
     assert(taskInfos.length == 1)
@@ -153,9 +153,38 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(cpus == maxCores)
   }
 
+  test("mesos does not acquire gpus if not specified") {
+    setBackend()
+
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List(Resources(executorMemory, 1, 1)))
+
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 1)
+
+    val gpus = backend.getResource(taskInfos.head.getResourcesList, "gpus")
+    assert(gpus == 0.0)
+  }
+
+  test("mesos does not acquire more than spark.mesos.gpus.max") {
+    val maxGpus = 5
+    setBackend(Map("spark.mesos.gpus.max" -> maxGpus.toString))
+
+    val executorMemory = backend.executorMemory(sc)
+    offerResources(List(Resources(executorMemory, 1, maxGpus + 1)))
+
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 1)
+
+    val gpus = backend.getResource(taskInfos.head.getResourcesList, "gpus")
+    assert(gpus == maxGpus)
+  }
+
   test("mesos declines offers that violate attribute constraints") {
     setBackend(Map("spark.mesos.constraints" -> "x:true"))
-    offerResources(List((backend.executorMemory(sc), 4)))
+    offerResources(List(Resources(backend.executorMemory(sc), 4)))
     verifyDeclinedOffer(driver, createOfferId("o1"), true)
   }
@@ -165,8 +194,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     val executorMemory = backend.executorMemory(sc)
     offerResources(List(
-      (executorMemory, maxCores + 1),
-      (executorMemory, maxCores + 1)))
+      Resources(executorMemory, maxCores + 1),
+      Resources(executorMemory, maxCores + 1)))
 
     verifyTaskLaunched(driver, "o1")
     verifyDeclinedOffer(driver, createOfferId("o2"), true)
@@ -180,8 +209,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     val executorMemory = backend.executorMemory(sc)
     offerResources(List(
-      (executorMemory * 2, executorCores * 2),
-      (executorMemory * 2, executorCores * 2)))
+      Resources(executorMemory * 2, executorCores * 2),
+      Resources(executorMemory * 2, executorCores * 2)))
 
     verifyTaskLaunched(driver, "o1")
     verifyTaskLaunched(driver, "o2")
@@ -193,7 +222,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     // offer with room for two executors
     val executorMemory = backend.executorMemory(sc)
-    offerResources(List((executorMemory * 2, executorCores * 2)))
+    offerResources(List(Resources(executorMemory * 2, executorCores * 2)))
 
     // verify two executors were started on a single offer
     val taskInfos = verifyTaskLaunched(driver, "o1")
@@ -397,7 +426,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     setBackend()
 
     // launches a task on a valid offer
-    val offers = List((backend.executorMemory(sc), 1))
+    val offers = List(Resources(backend.executorMemory(sc), 1))
     offerResources(offers)
     verifyTaskLaunched(driver, "o1")
@@ -434,6 +463,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(launchedTasks.head.getCommand.getUrisList.asScala(0).getValue == url)
   }
 
+  private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)
+
   private def verifyDeclinedOffer(driver: SchedulerDriver,
       offerId: OfferID,
       filter: Boolean = false): Unit = {
@@ -444,9 +475,9 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     }
   }
 
-  private def offerResources(offers: List[(Int, Int)], startId: Int = 1): Unit = {
+  private def offerResources(offers: List[Resources], startId: Int = 1): Unit = {
     val mesosOffers = offers.zipWithIndex.map {case (offer, i) =>
-      createOffer(s"o${i + startId}", s"s${i + startId}", offer._1, offer._2)}
+      createOffer(s"o${i + startId}", s"s${i + startId}", offer.mem, offer.cpus, None, offer.gpus)}
 
     backend.resourceOffers(driver, mesosOffers.asJava)
   }
@@ -32,8 +32,9 @@ object Utils {
       offerId: String,
       slaveId: String,
       mem: Int,
-      cpu: Int,
-      ports: Option[(Long, Long)] = None): Offer = {
+      cpus: Int,
+      ports: Option[(Long, Long)] = None,
+      gpus: Int = 0): Offer = {
     val builder = Offer.newBuilder()
     builder.addResourcesBuilder()
       .setName("mem")
@@ -42,7 +43,7 @@ object Utils {
     builder.addResourcesBuilder()
       .setName("cpus")
       .setType(Value.Type.SCALAR)
-      .setScalar(Scalar.newBuilder().setValue(cpu))
+      .setScalar(Scalar.newBuilder().setValue(cpus))
     ports.foreach { resourcePorts =>
       builder.addResourcesBuilder()
         .setName("ports")
@@ -50,6 +51,12 @@ object Utils {
         .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder()
           .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
     }
+    if (gpus > 0) {
+      builder.addResourcesBuilder()
+        .setName("gpus")
+        .setType(Value.Type.SCALAR)
+        .setScalar(Scalar.newBuilder().setValue(gpus))
+    }
     builder.setId(createOfferId(offerId))
       .setFrameworkId(FrameworkID.newBuilder()
         .setValue("f1"))
@@ -82,4 +89,3 @@ object Utils {
     TaskID.newBuilder().setValue(taskId).build()
   }
 }
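
With the extra `gpus` parameter in place, a test can construct a GPU-bearing offer like this (a usage sketch against the `Utils.createOffer` helper above):

```scala
// One offer: 2 GB of memory, 1 CPU, no port range, and a single GPU.
val offer = Utils.createOffer("o1", "s1", mem = 2048, cpus = 1, ports = None, gpus = 1)
```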