From b4fb7b80a0d863500943d788ad3e34d502a6dafa Mon Sep 17 00:00:00 2001 From: Nishkam Ravi <nravi@cloudera.com> Date: Thu, 2 Oct 2014 13:48:35 -0500 Subject: [PATCH] Modify default YARN memory_overhead-- from an additive constant to a multiplier Redone against the recent master branch (https://github.com/apache/spark/pull/1391) Author: Nishkam Ravi <nravi@cloudera.com> Author: nravi <nravi@c1704.halxg.cloudera.com> Author: nishkamravi2 <nishkamravi@gmail.com> Closes #2485 from nishkamravi2/master_nravi and squashes the following commits: 636a9ff [nishkamravi2] Update YarnAllocator.scala 8f76c8b [Nishkam Ravi] Doc change for yarn memory overhead 35daa64 [Nishkam Ravi] Slight change in the doc for yarn memory overhead 5ac2ec1 [Nishkam Ravi] Remove out dac1047 [Nishkam Ravi] Additional documentation for yarn memory overhead issue 42c2c3d [Nishkam Ravi] Additional changes for yarn memory overhead issue 362da5e [Nishkam Ravi] Additional changes for yarn memory overhead c726bd9 [Nishkam Ravi] Merge branch 'master' of https://github.com/apache/spark into master_nravi f00fa31 [Nishkam Ravi] Improving logging for AM memoryOverhead 1cf2d1e [nishkamravi2] Update YarnAllocator.scala ebcde10 [Nishkam Ravi] Modify default YARN memory_overhead-- from an additive constant to a multiplier (redone to resolve merge conflicts) 2e69f11 [Nishkam Ravi] Merge branch 'master' of https://github.com/apache/spark into master_nravi efd688a [Nishkam Ravi] Merge branch 'master' of https://github.com/apache/spark 2b630f9 [nravi] Accept memory input as "30g", "512M" instead of an int value, to be consistent with rest of Spark 3bf8fad [nravi] Merge branch 'master' of https://github.com/apache/spark 5423a03 [nravi] Merge branch 'master' of https://github.com/apache/spark eb663ca [nravi] Merge branch 'master' of https://github.com/apache/spark df2aeb1 [nravi] Improved fix for ConcurrentModificationIssue (Spark-1097, Hadoop-10456) 6b840f0 [nravi] Undo the fix for SPARK-1758 (the problem is fixed) 5108700 [nravi] Fix in Spark for the Concurrent thread modification issue (SPARK-1097, HADOOP-10456) 681b36f [nravi] Fix for SPARK-1758: failing test org.apache.spark.JavaAPISuite.wholeTextFiles --- docs/running-on-yarn.md | 8 ++++---- .../spark/deploy/yarn/ClientArguments.scala | 16 +++++++++------- .../apache/spark/deploy/yarn/ClientBase.scala | 12 ++++++++---- .../apache/spark/deploy/yarn/YarnAllocator.scala | 16 ++++++++-------- .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 8 ++++++-- 5 files changed, 35 insertions(+), 25 deletions(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 4b3a49eca7..695813a2ba 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -79,16 +79,16 @@ Most of the configs are the same for Spark on YARN as for other deployment modes </tr> <tr> <td><code>spark.yarn.executor.memoryOverhead</code></td> - <td>384</td> + <td>executorMemory * 0.07, with minimum of 384 </td> <td> - The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. + The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%). </td> </tr> <tr> <td><code>spark.yarn.driver.memoryOverhead</code></td> - <td>384</td> + <td>driverMemory * 0.07, with minimum of 384 </td> <td> - The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. + The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%). </td> </tr> <tr> diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 26dbd6237c..a12f82d2fb 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkConf import org.apache.spark.util.{Utils, IntParam, MemoryParam} - +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware ! private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) { @@ -39,15 +39,17 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) var appName: String = "Spark" var priority = 0 + parseArgs(args.toList) + loadEnvironmentArgs() + // Additional memory to allocate to containers // For now, use driver's memory overhead as our AM container's memory overhead - val amMemoryOverhead = sparkConf.getInt( - "spark.yarn.driver.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD) - val executorMemoryOverhead = sparkConf.getInt( - "spark.yarn.executor.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD) + val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead", + math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN)) + + val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead", + math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)) - parseArgs(args.toList) - loadEnvironmentArgs() validateArgs() /** Load any default arguments provided through environment variables and Spark properties. */ diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 1cf19c1985..6ecac6eae6 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -64,14 +64,18 @@ private[spark] trait ClientBase extends Logging { s"memory capability of the cluster ($maxMem MB per container)") val executorMem = args.executorMemory + executorMemoryOverhead if (executorMem > maxMem) { - throw new IllegalArgumentException(s"Required executor memory ($executorMem MB) " + - s"is above the max threshold ($maxMem MB) of this cluster!") + throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" + + s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!") } val amMem = args.amMemory + amMemoryOverhead if (amMem > maxMem) { - throw new IllegalArgumentException(s"Required AM memory ($amMem MB) " + - s"is above the max threshold ($maxMem MB) of this cluster!") + throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" + + s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!") } + logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format( + amMem, + amMemoryOverhead)) + // We could add checks to make sure the entire cluster has enough resources but that involves // getting all the node reports and computing ourselves. } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 299e38a5eb..4f4f1d2aaa 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -33,6 +33,7 @@ import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import com.google.common.util.concurrent.ThreadFactoryBuilder +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ object AllocationType extends Enumeration { type AllocationType = Value @@ -78,10 +79,6 @@ private[yarn] abstract class YarnAllocator( // Containers to be released in next request to RM private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean] - // Additional memory overhead - in mb. - protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", - YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD) - // Number of container requests that have been sent to, but not yet allocated by the // ApplicationMaster. private val numPendingAllocate = new AtomicInteger() @@ -97,6 +94,10 @@ private[yarn] abstract class YarnAllocator( protected val (preferredHostToCount, preferredRackToCount) = generateNodeToWeight(conf, preferredNodes) + // Additional memory overhead - in mb. + protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead", + math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)) + private val launcherPool = new ThreadPoolExecutor( // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE, @@ -114,12 +115,11 @@ private[yarn] abstract class YarnAllocator( // this is needed by alpha, do it here since we add numPending right after this val executorsPending = numPendingAllocate.get() - if (missing > 0) { + val totalExecutorMemory = executorMemory + memoryOverhead numPendingAllocate.addAndGet(missing) - logInfo("Will Allocate %d executor containers, each with %d memory".format( - missing, - (executorMemory + memoryOverhead))) + logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " + + s"memory including $memoryOverhead MB overhead") } else { logDebug("Empty allocation request ...") } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 0b712c2019..e1e0144f46 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -84,8 +84,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { } object YarnSparkHadoopUtil { - // Additional memory overhead - in mb. - val DEFAULT_MEMORY_OVERHEAD = 384 + // Additional memory overhead + // 7% was arrived at experimentally. In the interest of minimizing memory waste while covering + // the common cases. Memory overhead tends to grow with container size. + + val MEMORY_OVERHEAD_FACTOR = 0.07 + val MEMORY_OVERHEAD_MIN = 384 val ANY_HOST = "*" -- GitLab