Commit 76db394f authored by Marcelo Vanzin, committed by Tom Graves

[SPARK-18750][YARN] Avoid using "mapValues" when allocating containers.

That method is prone to stack overflows when the input map is really
large, because it returns a lazy view rather than a new map; instead, use
plain "map". Also includes a unit test that reproduces the stack overflow
when run without the fix.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #16667 from vanzin/SPARK-18750.
parent 3fdce814
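Added context (not part of the original commit message; the snippet below is an illustrative sketch, not Spark code): in Scala 2.11 and 2.12, which Spark supported at the time, "Map.mapValues" does not build a new map. It returns a lazy view that delegates every lookup to the underlying map and re-applies the function on each access. Reassigning the result of "mapValues" in a loop therefore wraps view around view, and the first lookup afterwards recurses once per layer. Plain "map" materializes a strict map on every call, so the stack depth of a lookup stays constant. A minimal reproduction, run on a deliberately small 32 KB stack like the regression test in this commit:

object MapValuesStackDepth {
  def main(args: Array[String]): Unit = {
    val runnable = new Runnable {
      override def run(): Unit = {
        var m: Map[String, Int] = Map("host" -> 0)
        // Each call wraps the previous map in one more lazy view;
        // nothing is evaluated here, so the loop itself is cheap.
        for (_ <- 0 until 100000) {
          m = m.mapValues(_ + 1)
        }
        // The first lookup walks the whole chain of views, one stack
        // frame per layer, and dies with a StackOverflowError.
        println(m("host"))
      }
    }
    // The stack size argument is a JVM hint; 32 KB is small enough to
    // make the overflow fast and reliable on common JVMs.
    val t = new Thread(null, runnable, "repro", 32 * 1024)
    t.start()
    t.join()
  }
}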
@@ -129,9 +129,9 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
     val largestRatio = updatedHostToContainerCount.values.max
     // Round the ratio of preferred locality to the number of locality required container
     // number, which is used for locality preferred host calculating.
-    var preferredLocalityRatio = updatedHostToContainerCount.mapValues { ratio =>
+    var preferredLocalityRatio = updatedHostToContainerCount.map { case(k, ratio) =>
       val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio
-      adjustedRatio.ceil.toInt
+      (k, adjustedRatio.ceil.toInt)
     }
 
     for (i <- 0 until requiredLocalityAwareContainerNum) {
@@ -145,7 +145,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
         // Minus 1 each time when the host is used. When the current ratio is 0,
         // which means all the required ratio is satisfied, this host will not be allocated again.
-        preferredLocalityRatio = preferredLocalityRatio.mapValues(_ - 1)
+        preferredLocalityRatio = preferredLocalityRatio.map { case (k, v) => (k, v - 1) }
       }
     }
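This second hunk was the hot spot: the "mapValues(_ - 1)" call sits inside the loop over requiredLocalityAwareContainerNum, so every iteration stacked one more lazy layer onto preferredLocalityRatio, and any later read of the map had to unwind all of them at once. The first hunk above feeds that same loop, which is why both were converted together. A side-by-side sketch of the two shapes (illustrative names, not the patch itself):

object RatioLoopShapes {
  def main(args: Array[String]): Unit = {
    var ratios: Map[String, Int] = Map("h1" -> 3, "h2" -> 1)

    // Pre-patch shape (Scala 2.11/2.12): each assignment stacks a lazy
    // view on the previous map, so after n loop iterations a single
    // ratios("h1") recurses n levels deep before returning.
    ratios = ratios.mapValues(_ - 1)

    // Post-patch shape: map builds a new strict Map immediately, so a
    // later lookup costs the same no matter how often the loop ran.
    ratios = ratios.map { case (k, v) => (k, v - 1) }

    println(ratios) // Map(h1 -> 1, h2 -> -1)
  }
}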
@@ -218,7 +218,8 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
     val possibleTotalContainerNum = pendingHostToContainerCount.values.sum
     val localityMatchedPendingNum = localityMatchedPendingAllocations.size.toDouble
-    pendingHostToContainerCount.mapValues(_ * localityMatchedPendingNum / possibleTotalContainerNum)
-      .toMap
+    pendingHostToContainerCount.map { case (k, v) =>
+      (k, v * localityMatchedPendingNum / possibleTotalContainerNum)
+    }.toMap
   }
 }
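One nuance in this last hunk (an observation, not from the commit message): the old expression already ended in ".toMap", which forces the "mapValues" view immediately, so this call site on its own was probably not a stack-depth risk. Rewriting it with a strict "map" keeps the file consistently free of "mapValues", so no call path can reintroduce the nesting. The unit test mentioned in the commit message follows as a new file, LocalityPlacementStrategySuite.scala: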
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.yarn

import scala.collection.mutable.{HashMap, HashSet, Set}

import org.apache.hadoop.fs.CommonConfigurationKeysPublic
import org.apache.hadoop.net.DNSToSwitchMapping
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.mockito.Mockito._

import org.apache.spark.{SparkConf, SparkFunSuite}
class LocalityPlacementStrategySuite extends SparkFunSuite {

  test("handle large number of containers and tasks (SPARK-18750)") {
    // Run the test in a thread with a small stack size, since the original issue
    // surfaced as a StackOverflowError.
    var error: Throwable = null

    val runnable = new Runnable() {
      override def run(): Unit = try {
        runTest()
      } catch {
        case e: Throwable => error = e
      }
    }

    val thread = new Thread(new ThreadGroup("test"), runnable, "test-thread", 32 * 1024)
    thread.start()
    thread.join()

    assert(error === null)
  }

  private def runTest(): Unit = {
    val yarnConf = new YarnConfiguration()
    yarnConf.setClass(
      CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
      classOf[MockResolver], classOf[DNSToSwitchMapping])

    // The numbers below have been chosen to balance being large enough to replicate the
    // original issue while not taking too long to run when the issue is fixed. The main
    // goal is to create enough requests for localized containers (so there should be many
    // tasks on several hosts that have no allocated containers).

    val resource = Resource.newInstance(8 * 1024, 4)
    val strategy = new LocalityPreferredContainerPlacementStrategy(new SparkConf(),
      yarnConf, resource)

    val totalTasks = 32 * 1024
    val totalContainers = totalTasks / 16
    val totalHosts = totalContainers / 16

    val mockId = mock(classOf[ContainerId])
    val hosts = (1 to totalHosts).map { i => (s"host_$i", totalTasks % i) }.toMap
    val containers = (1 to totalContainers).map { i => mockId }
    val count = containers.size / hosts.size / 2

    val hostToContainerMap = new HashMap[String, Set[ContainerId]]()
    hosts.keys.take(hosts.size / 2).zipWithIndex.foreach { case (host, i) =>
      val hostContainers = new HashSet[ContainerId]()
      containers.drop(count * i).take(i).foreach { c => hostContainers += c }
      hostToContainerMap(host) = hostContainers
    }

    strategy.localityOfRequestedContainers(containers.size * 2, totalTasks, hosts,
      hostToContainerMap, Nil)
  }

}
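To run the new suite locally, something like the following should work from the repository root (assuming Spark's sbt wrapper and the yarn profile; the module and profile names here are a best guess at the build setup, not stated in the commit). Without the fix, the test thread records a StackOverflowError and the assert fails:

build/sbt -Pyarn "yarn/testOnly *LocalityPlacementStrategySuite"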