Skip to content
Snippets Groups Projects
Commit acd208ee authored by 10129659's avatar 10129659 Committed by Wenchen Fan
Browse files

[SPARK-21115][CORE] If the cores left is less than the coresPerExecutor,the...

[SPARK-21115][CORE] If the cores left is less than the coresPerExecutor,the cores left will not be allocated, so it should not to check in every schedule

## What changes were proposed in this pull request?
If we start an app with the param --total-executor-cores=4 and spark.executor.cores=3, the cores left is always 1, so it will try to allocate executors in the function org.apache.spark.deploy.master.startExecutorsOnWorkers in every schedule.
Another question is, is it will be better to allocate another executor with 1 core for the cores left.

## How was this patch tested?
unit test

Author: 10129659 <chen.yanshan@zte.com.cn>

Closes #18322 from eatoncys/leftcores.
parent 153dd49b
No related branches found
No related tags found
No related merge requests found
......@@ -543,6 +543,17 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
}
}
if (contains("spark.cores.max") && contains("spark.executor.cores")) {
val totalCores = getInt("spark.cores.max", 1)
val executorCores = getInt("spark.executor.cores", 1)
val leftCores = totalCores % executorCores
if (leftCores != 0) {
logWarning(s"Total executor cores: ${totalCores} is not " +
s"divisible by cores per executor: ${executorCores}, " +
s"the left cores: ${leftCores} will not be allocated")
}
}
val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED)
require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
......
......@@ -659,19 +659,22 @@ private[deploy] class Master(
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
for (app <- waitingApps if app.coresLeft > 0) {
val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
// Filter out workers that don't have enough resources to launch an executor
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
// Now that we've decided how many cores to allocate on each worker, let's allocate them
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
for (app <- waitingApps) {
val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
// If the cores left is less than the coresPerExecutor,the cores left will not be allocated
if (app.coresLeft >= coresPerExecutor) {
// Filter out workers that don't have enough resources to launch an executor
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor)
.sortBy(_.coresFree).reverse
val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
// Now that we've decided how many cores to allocate on each worker, let's allocate them
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
}
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment