From acd208ee50b29bde4e097bf88761867b1d57a665 Mon Sep 17 00:00:00 2001
From: 10129659 <chen.yanshan@zte.com.cn>
Date: Fri, 23 Jun 2017 20:53:26 +0800
Subject: [PATCH] [SPARK-21115][CORE] If the cores left is less than the
 coresPerExecutor,the cores left will not be allocated, so it should not to
 check in every schedule

## What changes were proposed in this pull request?
If we start an app with the param --total-executor-cores=4 and spark.executor.cores=3, the cores left is always 1, so it will try to allocate executors in the function org.apache.spark.deploy.master.startExecutorsOnWorkers in every schedule.
Another question is, is it will be better to allocate another executor with 1 core for the cores left.

## How was this patch tested?
unit test

Author: 10129659 <chen.yanshan@zte.com.cn>

Closes #18322 from eatoncys/leftcores.
---
 .../scala/org/apache/spark/SparkConf.scala    | 11 +++++++
 .../apache/spark/deploy/master/Master.scala   | 29 ++++++++++---------
 2 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index ba7a65f79c..de2f475c68 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -543,6 +543,17 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
       }
     }
 
+    if (contains("spark.cores.max") && contains("spark.executor.cores")) {
+      val totalCores = getInt("spark.cores.max", 1)
+      val executorCores = getInt("spark.executor.cores", 1)
+      val leftCores = totalCores % executorCores
+      if (leftCores != 0) {
+        logWarning(s"Total executor cores: ${totalCores} is not " +
+          s"divisible by cores per executor: ${executorCores}, " +
+          s"the left cores: ${leftCores} will not be allocated")
+      }
+    }
+
     val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED)
     require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
       s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c192a0cc82..0dee25fb2e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -659,19 +659,22 @@ private[deploy] class Master(
   private def startExecutorsOnWorkers(): Unit = {
     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
     // in the queue, then the second app, etc.
-    for (app <- waitingApps if app.coresLeft > 0) {
-      val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
-      // Filter out workers that don't have enough resources to launch an executor
-      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
-        .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
-          worker.coresFree >= coresPerExecutor.getOrElse(1))
-        .sortBy(_.coresFree).reverse
-      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
-
-      // Now that we've decided how many cores to allocate on each worker, let's allocate them
-      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
-        allocateWorkerResourceToExecutors(
-          app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
+    for (app <- waitingApps) {
+      val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
+      // If the cores left is less than the coresPerExecutor,the cores left will not be allocated
+      if (app.coresLeft >= coresPerExecutor) {
+        // Filter out workers that don't have enough resources to launch an executor
+        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
+          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
+            worker.coresFree >= coresPerExecutor)
+          .sortBy(_.coresFree).reverse
+        val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
+
+        // Now that we've decided how many cores to allocate on each worker, let's allocate them
+        for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
+          allocateWorkerResourceToExecutors(
+            app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))
+        }
       }
     }
   }
-- 
GitLab