Commit 2eeada37 authored by Sandy Ryza, committed by Thomas Graves

SPARK-1714. Take advantage of AMRMClient APIs to simplify logic in YarnAllocator

The goal of this PR is to simplify YarnAllocator as much as possible and get it up to the level of code quality we see in the rest of Spark.

In service of this, it does a few things:
* Uses AMRMClient APIs for matching containers to requests (a rough sketch of this pattern follows the list below).
* Adds calls to AMRMClient.removeContainerRequest so that, when we use a container, we don't end up requesting it again.
* Removes YarnAllocator's host->rack cache. YARN's RackResolver already does this caching, so this is redundant.
* Adds tests for basic YarnAllocator functionality.
* Breaks up the allocateResources method, which was previously nearly 300 lines.
* A little bit of stylistic cleanup.
* Fixes a bug that caused three times as many container requests as needed to be filed when preferred host locations were given.
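
To make the first two points concrete, here is a minimal sketch of the request/match/remove cycle, assuming nothing beyond the public AMRMClient API; the class and method names (AllocatorSketch, requestOne, handleAllocated) are made up for illustration and are not the actual YarnAllocator code:

import org.apache.hadoop.yarn.api.records.{Container, Priority, Resource, ResourceRequest}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

// Minimal sketch, not the actual YarnAllocator implementation.
class AllocatorSketch(amClient: AMRMClient[ContainerRequest], resource: Resource) {
  private val priority = Priority.newInstance(1)

  // File one request with no locality preference (nodes = null, racks = null).
  def requestOne(): Unit = {
    amClient.addContainerRequest(new ContainerRequest(resource, null, null, priority))
  }

  // When the RM hands us a container, look up an outstanding request it satisfies and
  // remove that request so we don't end up asking for the same capacity again.
  def handleAllocated(container: Container): Unit = {
    val matching = amClient.getMatchingRequests(
      container.getPriority, ResourceRequest.ANY, container.getResource)
    if (!matching.isEmpty && !matching.get(0).isEmpty) {
      val request = matching.get(0).iterator().next()
      amClient.removeContainerRequest(request)
      // ... launch an executor in `container` here ...
    } else {
      // Nothing pending that this container satisfies: give it back to the RM.
      amClient.releaseAssignedContainer(container.getId)
    }
  }
}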

The patch is lossy. In particular, it loses the logic for trying to avoid containers bunching up on nodes. As I understand it, the logic that's gone is:

* If, in a single response from the RM, we receive a set of containers on a node, and prefer some number of containers on that node greater than 0 but less than the number we received, give back the delta between what we preferred and what we received.

This seems like a weird way to avoid bunching. For example, it does nothing to avoid bunching when we don't request containers on particular nodes.
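
For reference, a rough sketch of the removed behavior described above, assuming a caller that knows how many containers it preferred on a given host; the helper name and signature are made up and are not the deleted code:

import org.apache.hadoop.yarn.api.records.Container
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

object BunchingSketch {
  // If a single RM response gives us more containers on a host than we preferred there
  // (and we preferred at least one), keep the preferred number and release the surplus.
  def trimSurplus(
      amClient: AMRMClient[ContainerRequest],
      receivedOnHost: Seq[Container],
      preferredOnHost: Int): Seq[Container] = {
    if (preferredOnHost > 0 && receivedOnHost.size > preferredOnHost) {
      val (keep, surplus) = receivedOnHost.splitAt(preferredOnHost)
      surplus.foreach(c => amClient.releaseAssignedContainer(c.getId))
      keep
    } else {
      receivedOnHost
    }
  }
}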

Author: Sandy Ryza <sandy@cloudera.com>

Closes #3765 from sryza/sandy-spark-1714 and squashes the following commits:

32a5942 [Sandy Ryza] Muffle RackResolver logs
74f56dd [Sandy Ryza] Fix a couple comments and simplify requestTotalExecutors
60ea4bd [Sandy Ryza] Fix scalastyle
ca35b53 [Sandy Ryza] Simplify further
e9cf8a6 [Sandy Ryza] Fix YarnClusterSuite
257acf3 [Sandy Ryza] Remove locality stuff and more cleanup
59a3c5e [Sandy Ryza] Take out rack stuff
5f72fd5 [Sandy Ryza] Further documentation and cleanup
89edd68 [Sandy Ryza] SPARK-1714. Take advantage of AMRMClient APIs to simplify logic in YarnAllocator
parent 8c06a5fa
@@ -10,3 +10,4 @@ log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+ log4j.logger.org.apache.hadoop.yarn.util.RackResolver=WARN
@@ -72,8 +72,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
registered = true
}
- new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args,
-   preferredNodeLocations, securityMgr)
+ new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args, securityMgr)
}
/**
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
- import org.apache.hadoop.yarn.api.records.ApplicationAccessType
+ import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
import org.apache.hadoop.yarn.util.RackResolver
import org.apache.hadoop.conf.Configuration
@@ -99,13 +99,7 @@ object YarnSparkHadoopUtil {
// All RM requests are issued with same priority : we do not (yet) have any distinction between
// request types (like map/reduce in hadoop for example)
- val RM_REQUEST_PRIORITY = 1
- // Host to rack map - saved from allocation requests. We are expecting this not to change.
- // Note that it is possible for this to change : and ResourceManager will indicate that to us via
- // update response to allocate. But we are punting on handling that for now.
- private val hostToRack = new ConcurrentHashMap[String, String]()
- private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+ val RM_REQUEST_PRIORITY = Priority.newInstance(1)
/**
* Add a path variable to the given environment map.
@@ -184,37 +178,6 @@ object YarnSparkHadoopUtil {
}
}
- def lookupRack(conf: Configuration, host: String): String = {
-   if (!hostToRack.contains(host)) {
-     populateRackInfo(conf, host)
-   }
-   hostToRack.get(host)
- }
- def populateRackInfo(conf: Configuration, hostname: String) {
-   Utils.checkHost(hostname)
-   if (!hostToRack.containsKey(hostname)) {
-     // If there are repeated failures to resolve, all to an ignore list.
-     val rackInfo = RackResolver.resolve(conf, hostname)
-     if (rackInfo != null && rackInfo.getNetworkLocation != null) {
-       val rack = rackInfo.getNetworkLocation
-       hostToRack.put(hostname, rack)
-       if (! rackToHostSet.containsKey(rack)) {
-         rackToHostSet.putIfAbsent(rack,
-           Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
-       }
-       rackToHostSet.get(rack).add(hostname)
-       // TODO(harvey): Figure out what this comment means...
-       // Since RackResolver caches, we are disabling this for now ...
-     } /* else {
-       // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
-       hostToRack.put(hostname, null)
-     } */
-   }
- }
def getApplicationAclsForYarn(securityMgr: SecurityManager)
: Map[ApplicationAccessType, String] = {
Map[ApplicationAccessType, String] (
@@ -17,8 +17,9 @@
package org.apache.spark.scheduler.cluster
+ import org.apache.hadoop.yarn.util.RackResolver
import org.apache.spark._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
@@ -30,6 +31,6 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSc
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
- Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
+ Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
}
}
@@ -17,8 +17,10 @@
package org.apache.spark.scheduler.cluster
+ import org.apache.hadoop.yarn.util.RackResolver
import org.apache.spark._
- import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
+ import org.apache.spark.deploy.yarn.ApplicationMaster
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
@@ -39,7 +41,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedule
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
- Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
+ Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
}
override def postStartHook() {
@@ -17,18 +17,160 @@
package org.apache.spark.deploy.yarn
import java.util.{Arrays, List => JList}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.CommonConfigurationKeysPublic
import org.apache.hadoop.net.DNSToSwitchMapping
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.spark.SecurityManager
import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.YarnAllocator._
import org.scalatest.FunSuite
import org.apache.spark.scheduler.SplitInfo
import org.scalatest.{BeforeAndAfterEach, FunSuite, Matchers}
class MockResolver extends DNSToSwitchMapping {
override def resolve(names: JList[String]): JList[String] = {
if (names.size > 0 && names.get(0) == "host3") Arrays.asList("/rack2")
else Arrays.asList("/rack1")
}
override def reloadCachedMappings() {}
def reloadCachedMappings(names: JList[String]) {}
}
class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach {
val conf = new Configuration()
conf.setClass(
CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
classOf[MockResolver], classOf[DNSToSwitchMapping])
val sparkConf = new SparkConf()
sparkConf.set("spark.driver.host", "localhost")
sparkConf.set("spark.driver.port", "4040")
sparkConf.set("spark.yarn.jar", "notarealjar.jar")
sparkConf.set("spark.yarn.launchContainers", "false")
val appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0)
// Resource returned by YARN. YARN can give larger containers than requested, so give 6 cores
// instead of the 5 requested and 3 GB instead of the 2 requested.
val containerResource = Resource.newInstance(3072, 6)
var rmClient: AMRMClient[ContainerRequest] = _
var containerNum = 0
override def beforeEach() {
rmClient = AMRMClient.createAMRMClient()
rmClient.init(conf)
rmClient.start()
}
override def afterEach() {
rmClient.stop()
}
class MockSplitInfo(host: String) extends SplitInfo(null, host, null, 1, null) {
override def equals(other: Any) = false
}
def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
val args = Array(
"--num-executors", s"$maxExecutors",
"--executor-cores", "5",
"--executor-memory", "2048",
"--jar", "somejar.jar",
"--class", "SomeClass")
new YarnAllocator(
conf,
sparkConf,
rmClient,
appAttemptId,
new ApplicationMasterArguments(args),
new SecurityManager(sparkConf))
}
def createContainer(host: String): Container = {
val containerId = ContainerId.newInstance(appAttemptId, containerNum)
containerNum += 1
val nodeId = NodeId.newInstance(host, 1000)
Container.newInstance(containerId, nodeId, "", containerResource, RM_REQUEST_PRIORITY, null)
}
test("single container allocated") {
// request a single container and receive it
val handler = createAllocator()
handler.addResourceRequests(1)
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (1)
val container = createContainer("host1")
handler.handleAllocatedContainers(Array(container))
handler.getNumExecutorsRunning should be (1)
handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size should be (0)
}
test("some containers allocated") {
// request a few containers and receive some of them
val handler = createAllocator()
handler.addResourceRequests(4)
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (4)
val container1 = createContainer("host1")
val container2 = createContainer("host1")
val container3 = createContainer("host2")
handler.handleAllocatedContainers(Array(container1, container2, container3))
handler.getNumExecutorsRunning should be (3)
handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host1")
handler.allocatedContainerToHostMap.get(container3.getId).get should be ("host2")
handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
handler.allocatedHostToContainersMap.get("host1").get should contain (container2.getId)
handler.allocatedHostToContainersMap.get("host2").get should contain (container3.getId)
}
test("receive more containers than requested") {
val handler = createAllocator(2)
handler.addResourceRequests(2)
handler.getNumExecutorsRunning should be (0)
handler.getNumPendingAllocate should be (2)
val container1 = createContainer("host1")
val container2 = createContainer("host2")
val container3 = createContainer("host4")
handler.handleAllocatedContainers(Array(container1, container2, container3))
handler.getNumExecutorsRunning should be (2)
handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host2")
handler.allocatedContainerToHostMap.contains(container3.getId) should be (false)
handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
handler.allocatedHostToContainersMap.get("host2").get should contain (container2.getId)
handler.allocatedHostToContainersMap.contains("host4") should be (false)
}
- class YarnAllocatorSuite extends FunSuite {
test("memory exceeded diagnostic regexes") {
val diagnostics =
"Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
"beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
"5.8 GB of 4.2 GB virtual memory used. Killing container."
"beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
"5.8 GB of 4.2 GB virtual memory used. Killing container."
val vmemMsg = memLimitExceededLogMessage(diagnostics, VMEM_EXCEEDED_PATTERN)
val pmemMsg = memLimitExceededLogMessage(diagnostics, PMEM_EXCEEDED_PATTERN)
assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
}
}