From 29bd251dd5914fc3b6146eb4fe0b45f1c84dba62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=A8=E6=B2=BB=E5=9B=BD10192065?= <yang.zhiguo@zte.com.cn> Date: Thu, 29 Jun 2017 20:53:48 +0800 Subject: [PATCH] [SPARK-21225][CORE] Considering CPUS_PER_TASK when allocating task slots for each WorkerOffer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit JIRA Issue:https://issues.apache.org/jira/browse/SPARK-21225 In the function "resourceOffers", It declare a variable "tasks" for storage the tasks which have allocated a executor. It declared like this: `val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))` But, I think this code only conside a situation for that one task per core. If the user set "spark.task.cpus" as 2 or 3, It really don't need so much Mem. I think It can motify as follow: val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK)) to instead. Motify like this the other earning is that it's more easy to understand the way how the tasks allocate offers. Author: æ¨æ²»å›½10192065 <yang.zhiguo@zte.com.cn> Closes #18435 from JackYangzg/motifyTaskCoreDisp. --- .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 91ec172ffe..737b383631 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -345,7 +345,7 @@ private[spark] class TaskSchedulerImpl( val shuffledOffers = shuffleOffers(filteredOffers) // Build a list of tasks to assign to each worker. - val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) + val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK)) val availableCpus = shuffledOffers.map(o => o.cores).toArray val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { -- GitLab