From f391ad2c82d01c84a1cb5e032a086cb120e7cee3 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin <vanzin@cloudera.com>
Date: Wed, 25 Jan 2017 08:18:41 -0600
Subject: [PATCH] [SPARK-18750][YARN] Avoid using "mapValues" when allocating
 containers.

That method is prone to stack overflows when the input map is really
large; instead, use plain "map". Also includes a unit test that was
verified to cause stack overflows without the fix.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #16667 from vanzin/SPARK-18750.

(cherry picked from commit 76db394f2baedc2c7b7a52c05314a64ec9068263)
Signed-off-by: Tom Graves <tgraves@yahoo-inc.com>
---
 .../yarn/LocalityPlacementStrategySuite.scala | 87 +++++++++++++++++++
 ...yPreferredContainerPlacementStrategy.scala | 11 +--
 2 files changed, 93 insertions(+), 5 deletions(-)
 create mode 100644 resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala

diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
new file mode 100644
index 0000000000..fb80ff9f31
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.{HashMap, HashSet, Set}
+
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic
+import org.apache.hadoop.net.DNSToSwitchMapping
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.mockito.Mockito._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+class LocalityPlacementStrategySuite extends SparkFunSuite {
+
+  test("handle large number of containers and tasks (SPARK-18750)") {
+    // Run the test in a thread with a small stack size, since the original issue
+    // surfaced as a StackOverflowError.
+    var error: Throwable = null
+
+    val runnable = new Runnable() {
+      override def run(): Unit = try {
+        runTest()
+      } catch {
+        case e: Throwable => error = e
+      }
+    }
+
+    val thread = new Thread(new ThreadGroup("test"), runnable, "test-thread", 32 * 1024)
+    thread.start()
+    thread.join()
+
+    assert(error === null)
+  }
+
+  private def runTest(): Unit = {
+    val yarnConf = new YarnConfiguration()
+    yarnConf.setClass(
+      CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+      classOf[MockResolver], classOf[DNSToSwitchMapping])
+
+    // The numbers below have been chosen to balance being large enough to replicate the
+    // original issue while not taking too long to run when the issue is fixed. The main
+    // goal is to create enough requests for localized containers (so there should be many
+    // tasks on several hosts that have no allocated containers).
+
+    val resource = Resource.newInstance(8 * 1024, 4)
+    val strategy = new LocalityPreferredContainerPlacementStrategy(new SparkConf(),
+      yarnConf, resource)
+
+    val totalTasks = 32 * 1024
+    val totalContainers = totalTasks / 16
+    val totalHosts = totalContainers / 16
+
+    val mockId = mock(classOf[ContainerId])
+    val hosts = (1 to totalHosts).map { i => (s"host_$i", totalTasks % i) }.toMap
+    val containers = (1 to totalContainers).map { i => mockId }
+    val count = containers.size / hosts.size / 2
+
+    val hostToContainerMap = new HashMap[String, Set[ContainerId]]()
+    hosts.keys.take(hosts.size / 2).zipWithIndex.foreach { case (host, i) =>
+      val hostContainers = new HashSet[ContainerId]()
+      containers.drop(count * i).take(i).foreach { c => hostContainers += c }
+      hostToContainerMap(host) = hostContainers
+    }
+
+    strategy.localityOfRequestedContainers(containers.size * 2, totalTasks, hosts,
+      hostToContainerMap, Nil)
+  }
+
+}

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
index 8772e26f43..db638d84c0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -129,9 +129,9 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
     val largestRatio = updatedHostToContainerCount.values.max
     // Round the ratio of preferred locality to the number of locality required container
     // number, which is used for locality preferred host calculating.
-    var preferredLocalityRatio = updatedHostToContainerCount.mapValues { ratio =>
+    var preferredLocalityRatio = updatedHostToContainerCount.map { case (k, ratio) =>
       val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio
-      adjustedRatio.ceil.toInt
+      (k, adjustedRatio.ceil.toInt)
     }
 
     for (i <- 0 until requiredLocalityAwareContainerNum) {
@@ -145,7 +145,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
 
       // Minus 1 each time when the host is used. When the current ratio is 0,
      // which means all the required ratio is satisfied, this host will not be allocated again.
-      preferredLocalityRatio = preferredLocalityRatio.mapValues(_ - 1)
+      preferredLocalityRatio = preferredLocalityRatio.map { case (k, v) => (k, v - 1) }
     }
   }
 
@@ -218,7 +218,8 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
     val possibleTotalContainerNum = pendingHostToContainerCount.values.sum
     val localityMatchedPendingNum = localityMatchedPendingAllocations.size.toDouble
 
-    pendingHostToContainerCount.mapValues(_ * localityMatchedPendingNum / possibleTotalContainerNum)
-      .toMap
+    pendingHostToContainerCount.map { case (k, v) =>
+      (k, v * localityMatchedPendingNum / possibleTotalContainerNum)
+    }.toMap
   }
 }
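Editor's note on why "mapValues" overflows the stack: in Scala 2.11/2.12, Map.mapValues does not materialize a new map; it returns a lazy view that re-applies the function on every lookup. Reassigning the result back to the same variable inside a loop, as the old code did with preferredLocalityRatio, therefore wraps each view around the previous one, and a later access must recurse through every layer. The standalone sketch below is not part of the patch; it is a hypothetical reproduction of the failure mode, with illustrative names, assuming Scala 2.11/2.12 collection semantics.

// Minimal sketch of the mapValues pitfall this patch fixes. Not part of
// the patch; assumes Scala 2.11/2.12, where Map.mapValues returns a lazy
// view rather than a materialized map.
object MapValuesOverflowDemo {
  def main(args: Array[String]): Unit = {
    val depth = 100 * 1000

    // Both maps start identical; only the update style differs.
    var lazyRatios = Map("host_1" -> depth, "host_2" -> depth)
    var strictRatios = Map("host_1" -> depth, "host_2" -> depth)

    for (_ <- 1 to depth) {
      // mapValues wraps the previous map in another view; nothing is
      // computed here, so this silently builds a chain `depth` layers deep.
      lazyRatios = lazyRatios.mapValues(_ - 1)

      // map materializes a fresh two-entry map on every iteration, so the
      // result never references earlier versions of itself.
      strictRatios = strictRatios.map { case (k, v) => (k, v - 1) }
    }

    println(strictRatios("host_1"))  // prints 0

    // Each stacked view adds stack frames to the lookup; with ~100k layers
    // this is expected to throw StackOverflowError on a default-sized stack.
    println(lazyRatios("host_1"))
  }
}

The replacement pays for an eager copy per iteration but keeps the evaluation depth constant no matter how many times the loop runs. That is also why the new unit test runs in a thread with a 32 KB stack: a small stack makes the regression fail quickly instead of depending on the JVM's default stack size.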