diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 64568409dbafdea3552051a994a54d1613d2abd1..3161f1ee9fa8af7814f6383329e2ff30135ec181 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -198,7 +198,9 @@ private[spark] class CoarseMesosSchedulerBackend( val slaveId = offer.getSlaveId.toString val mem = getResource(offer.getResourcesList, "mem") val cpus = getResource(offer.getResourcesList, "cpus").toInt - if (totalCoresAcquired < maxCores && mem >= sc.executorMemory && cpus >= 1 && + if (totalCoresAcquired < maxCores && + mem >= MemoryUtils.calculateTotalMemory(sc) && + cpus >= 1 && failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && !slaveIdsWithExecutors.contains(slaveId)) { // Launch an executor on the slave @@ -214,7 +216,8 @@ private[spark] class CoarseMesosSchedulerBackend( .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave)) .setName("Task " + taskId) .addResources(createResource("cpus", cpusToUse)) - .addResources(createResource("mem", sc.executorMemory)) + .addResources(createResource("mem", + MemoryUtils.calculateTotalMemory(sc))) .build() d.launchTasks( Collections.singleton(offer.getId), Collections.singletonList(task), filters) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala new file mode 100644 index 0000000000000000000000000000000000000000..5101ec8352e791beb23077cd3d926b7ffb229be9 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import org.apache.spark.SparkContext + +private[spark] object MemoryUtils { + // These defaults copied from YARN + val OVERHEAD_FRACTION = 1.07 + val OVERHEAD_MINIMUM = 384 + + def calculateTotalMemory(sc: SparkContext) = { + math.max( + sc.conf.getOption("spark.mesos.executor.memoryOverhead") + .getOrElse(OVERHEAD_MINIMUM.toString) + .toInt + sc.executorMemory, + OVERHEAD_FRACTION * sc.executorMemory + ) + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index a9ef126f5de0e24d885867ca341b30b8580b50a7..4c49aa074ebc00db3e6022dc6caf8a0c9e834cac 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -124,15 +124,24 @@ private[spark] class MesosSchedulerBackend( command.setValue("cd %s*; ./sbin/spark-executor".format(basename)) command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) } + val cpus = Resource.newBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setScalar(Value.Scalar.newBuilder() + .setValue(scheduler.CPUS_PER_TASK).build()) + .build() val memory = Resource.newBuilder() .setName("mem") .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(sc.executorMemory).build()) + .setScalar( + Value.Scalar.newBuilder() + .setValue(MemoryUtils.calculateTotalMemory(sc)).build()) .build() ExecutorInfo.newBuilder() .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) .setCommand(command) .setData(ByteString.copyFrom(createExecArg())) + .addResources(cpus) .addResources(memory) .build() } @@ -204,18 +213,31 @@ private[spark] class MesosSchedulerBackend( val offerableWorkers = new ArrayBuffer[WorkerOffer] val offerableIndices = new HashMap[String, Int] - def enoughMemory(o: Offer) = { + def sufficientOffer(o: Offer) = { val mem = getResource(o.getResourcesList, "mem") + val cpus = getResource(o.getResourcesList, "cpus") val slaveId = o.getSlaveId.getValue - mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId) + (mem >= MemoryUtils.calculateTotalMemory(sc) && + // need at least 1 for executor, 1 for task + cpus >= 2 * scheduler.CPUS_PER_TASK) || + (slaveIdsWithExecutors.contains(slaveId) && + cpus >= scheduler.CPUS_PER_TASK) } - for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) { - offerableIndices.put(offer.getSlaveId.getValue, index) + for ((offer, index) <- offers.zipWithIndex if sufficientOffer(offer)) { + val slaveId = offer.getSlaveId.getValue + offerableIndices.put(slaveId, index) + val cpus = if (slaveIdsWithExecutors.contains(slaveId)) { + getResource(offer.getResourcesList, "cpus").toInt + } else { + // If the executor doesn't exist yet, subtract CPU for executor + getResource(offer.getResourcesList, "cpus").toInt - + scheduler.CPUS_PER_TASK + } offerableWorkers += new WorkerOffer( offer.getSlaveId.getValue, offer.getHostname, - getResource(offer.getResourcesList, "cpus").toInt) + cpus) } // Call into the TaskSchedulerImpl diff --git a/docs/configuration.md b/docs/configuration.md index a782809a55ec01f4a96dd35a762e82105cadfac1..1c338553651702bb458fb7ec6a47ac35783e1895 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -253,6 +253,17 @@ Apart from these, the following properties are also available, and may be useful <code>spark.executor.uri</code>. </td> </tr> +<tr> + <td><code>spark.mesos.executor.memoryOverhead</code></td> + <td>executor memory * 0.07, with minimum of 384</td> + <td> + This value is an additive for <code>spark.executor.memory</code>, specified in MiB, + which is used to calculate the total Mesos task memory. A value of <code>384</code> + implies a 384MiB overhead. Additionally, there is a hard-coded 7% minimum + overhead. The final overhead will be the larger of either + `spark.mesos.executor.memoryOverhead` or 7% of `spark.executor.memory`. + </td> +</tr> </table> #### Shuffle Behavior