Skip to content
Snippets Groups Projects
Commit 6f150978 authored by Brenden Matthews's avatar Brenden Matthews Committed by Andrew Or
Browse files

[SPARK-3535][Mesos] Fix resource handling.


Author: Brenden Matthews <brenden@diddyinc.com>

Closes #2401 from brndnmtthws/master and squashes the following commits:

4abaa5d [Brenden Matthews] [SPARK-3535][Mesos] Fix resource handling.

(cherry picked from commit a8c52d53)
Signed-off-by: default avatarAndrew Or <andrewor14@gmail.com>
parent d5af9e16
No related branches found
No related tags found
No related merge requests found
......@@ -198,7 +198,9 @@ private[spark] class CoarseMesosSchedulerBackend(
val slaveId = offer.getSlaveId.toString
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
if (totalCoresAcquired < maxCores && mem >= sc.executorMemory && cpus >= 1 &&
if (totalCoresAcquired < maxCores &&
mem >= MemoryUtils.calculateTotalMemory(sc) &&
cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)) {
// Launch an executor on the slave
......@@ -214,7 +216,8 @@ private[spark] class CoarseMesosSchedulerBackend(
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
.addResources(createResource("mem", sc.executorMemory))
.addResources(createResource("mem",
MemoryUtils.calculateTotalMemory(sc)))
.build()
d.launchTasks(
Collections.singleton(offer.getId), Collections.singletonList(task), filters)
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster.mesos
import org.apache.spark.SparkContext
private[spark] object MemoryUtils {
  // These defaults copied from YARN
  val OVERHEAD_FRACTION: Double = 1.07
  val OVERHEAD_MINIMUM: Int = 384

  /**
   * Total amount of memory (in MB) to request from Mesos for an executor.
   *
   * The result is the executor memory plus an overhead: the configured
   * `spark.mesos.executor.memoryOverhead` (default [[OVERHEAD_MINIMUM]] MB),
   * but never less than 7% of `spark.executor.memory` (the extra 0.07 in
   * [[OVERHEAD_FRACTION]]).
   *
   * @param sc context providing the Spark configuration and executor memory
   * @return total Mesos task memory in MB (fractional when the 7% path wins)
   */
  def calculateTotalMemory(sc: SparkContext): Double = {
    // NOTE(review): .toInt throws NumberFormatException on a malformed config
    // value — deliberate fail-fast, kept as-is.
    val overhead = sc.conf
      .getOption("spark.mesos.executor.memoryOverhead")
      .map(_.toInt)
      .getOrElse(OVERHEAD_MINIMUM)
    // max(mem + overhead, 1.07 * mem) == mem + max(overhead, 0.07 * mem)
    math.max(overhead + sc.executorMemory, OVERHEAD_FRACTION * sc.executorMemory)
  }
}
......@@ -124,15 +124,24 @@ private[spark] class MesosSchedulerBackend(
command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
val cpus = Resource.newBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder()
.setValue(scheduler.CPUS_PER_TASK).build())
.build()
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(sc.executorMemory).build())
.setScalar(
Value.Scalar.newBuilder()
.setValue(MemoryUtils.calculateTotalMemory(sc)).build())
.build()
ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
.addResources(cpus)
.addResources(memory)
.build()
}
......@@ -204,18 +213,31 @@ private[spark] class MesosSchedulerBackend(
val offerableWorkers = new ArrayBuffer[WorkerOffer]
val offerableIndices = new HashMap[String, Int]
def enoughMemory(o: Offer) = {
def sufficientOffer(o: Offer) = {
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
(mem >= MemoryUtils.calculateTotalMemory(sc) &&
// need at least 1 for executor, 1 for task
cpus >= 2 * scheduler.CPUS_PER_TASK) ||
(slaveIdsWithExecutors.contains(slaveId) &&
cpus >= scheduler.CPUS_PER_TASK)
}
for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
offerableIndices.put(offer.getSlaveId.getValue, index)
for ((offer, index) <- offers.zipWithIndex if sufficientOffer(offer)) {
val slaveId = offer.getSlaveId.getValue
offerableIndices.put(slaveId, index)
val cpus = if (slaveIdsWithExecutors.contains(slaveId)) {
getResource(offer.getResourcesList, "cpus").toInt
} else {
// If the executor doesn't exist yet, subtract CPU for executor
getResource(offer.getResourcesList, "cpus").toInt -
scheduler.CPUS_PER_TASK
}
offerableWorkers += new WorkerOffer(
offer.getSlaveId.getValue,
offer.getHostname,
getResource(offer.getResourcesList, "cpus").toInt)
cpus)
}
// Call into the TaskSchedulerImpl
......
......@@ -224,6 +224,17 @@ Apart from these, the following properties are also available, and may be useful
<code>spark.executor.uri</code>.
</td>
</tr>
<tr>
<td><code>spark.mesos.executor.memoryOverhead</code></td>
<td>executor memory * 0.07, with minimum of 384</td>
<td>
This value is added to <code>spark.executor.memory</code>, specified in MiB,
when calculating the total Mesos task memory. A value of <code>384</code>
implies a 384MiB overhead. Additionally, there is a hard-coded minimum
overhead of 7% of the executor memory. The final overhead will be the larger of either
<code>spark.mesos.executor.memoryOverhead</code> or 7% of <code>spark.executor.memory</code>.
</td>
</tr>
</table>
#### Shuffle Behavior
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment