diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index e6196194acbb4f05c7e6baf9dce353f2ca169c61..b32e15738f28bae356a3e601b49d4615ffea75e6 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -64,6 +64,8 @@ private[yarn] abstract class YarnAllocator(
     securityMgr: SecurityManager)
   extends Logging {
 
+  import YarnAllocator._
+
   // These three are locked on allocatedHostToContainersMap. Complementary data structures
   // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
   // allocatedContainerToHostMap: container to host mapping.
@@ -439,19 +441,6 @@ private[yarn] abstract class YarnAllocator(
     }
   }
 
-  private val MEM_REGEX = "[0-9.]+ [KMG]B"
-  private val PMEM_EXCEEDED_PATTERN =
-    Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
-  private val VMEM_EXCEEDED_PATTERN =
-    Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
-
-  def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
-    val matcher = pattern.matcher(diagnostics)
-    val diag = if (matcher.find()) " " + matcher.group() + "." else ""
-    ("Container killed by YARN for exceeding memory limits." + diag
-      + " Consider boosting spark.yarn.executor.memoryOverhead.")
-  }
-
   protected def allocatedContainersOnHost(host: String): Int = {
     var retval = 0
     allocatedHostToContainersMap.synchronized {
@@ -532,3 +521,18 @@ private[yarn] abstract class YarnAllocator(
   }
 
 }
+
+private object YarnAllocator {
+  val MEM_REGEX = "[0-9.]+ [KMG]B"
+  val PMEM_EXCEEDED_PATTERN =
+    Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
+  val VMEM_EXCEEDED_PATTERN =
+    Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")
+
+  def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
+    val matcher = pattern.matcher(diagnostics)
+    val diag = if (matcher.find()) " " + matcher.group() + "." else ""
+    ("Container killed by YARN for exceeding memory limits." + diag
+      + " Consider boosting spark.yarn.executor.memoryOverhead.")
+  }
+}
diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 9fff63fb251563ce648f2d88fe297c5eb3dea5f2..8d184a09d64cc6b9305dd6518a5037f714866992 100644
--- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.yarn
 
-import org.apache.spark.deploy.yarn.MemLimitLogger._
+import org.apache.spark.deploy.yarn.YarnAllocator._
 import org.scalatest.FunSuite
 
 class YarnAllocatorSuite extends FunSuite {
@@ -31,4 +31,4 @@ class YarnAllocatorSuite extends FunSuite {
     assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
     assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
   }
-}
\ No newline at end of file
+}
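
For context, a minimal standalone sketch (not part of the patch) of how the relocated memLimitExceededLogMessage helper behaves against a YARN diagnostics string. The MemLimitMessageDemo object name and the sample diagnostics text are illustrative assumptions; the regex patterns and message body mirror the companion object added in the diff above, and the memory values match those asserted in YarnAllocatorSuite.

import java.util.regex.Pattern

// Illustrative demo object (hypothetical name, not part of Spark); the patterns and
// helper below mirror the private YarnAllocator companion object added in the patch.
object MemLimitMessageDemo {
  val MEM_REGEX = "[0-9.]+ [KMG]B"
  val PMEM_EXCEEDED_PATTERN =
    Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")

  def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
    // Pull the "<used> of <limit> ... memory used" fragment out of the diagnostics, if present.
    val matcher = pattern.matcher(diagnostics)
    val diag = if (matcher.find()) " " + matcher.group() + "." else ""
    ("Container killed by YARN for exceeding memory limits." + diag
      + " Consider boosting spark.yarn.executor.memoryOverhead.")
  }

  def main(args: Array[String]): Unit = {
    // Sample diagnostics string (assumed shape of a YARN container-kill message).
    val diagnostics = "Container [pid=12345] is running beyond physical memory limits. " +
      "Current usage: 2.1 MB of 2 GB physical memory used; " +
      "5.8 GB of 4.2 GB virtual memory used. Killing container."
    println(memLimitExceededLogMessage(diagnostics, PMEM_EXCEEDED_PATTERN))
    // Prints: Container killed by YARN for exceeding memory limits.
    //         2.1 MB of 2 GB physical memory used.
    //         Consider boosting spark.yarn.executor.memoryOverhead.
  }
}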