diff --git a/core/src/main/resources/org/apache/spark/log4j-defaults.properties b/core/src/main/resources/org/apache/spark/log4j-defaults.properties
index 89eec7d4b7f615f98348b805093f945e208ce757..c99a61f63ea2b50a043d9d99dff728583f155c5f 100644
--- a/core/src/main/resources/org/apache/spark/log4j-defaults.properties
+++ b/core/src/main/resources/org/apache/spark/log4j-defaults.properties
@@ -10,3 +10,4 @@ log4j.logger.org.eclipse.jetty=WARN
 log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
 log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
 log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.hadoop.yarn.util.RackResolver=WARN
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index de65ef23ad1cee2f84d90e5045c1f5d443e258c1..4c35b60c57df3fbfd55e14d5062bf503f1db5fe1 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.util.Collections
 import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicInteger
 import java.util.regex.Pattern
 
 import scala.collection.JavaConversions._
@@ -28,33 +28,26 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.util.Records
+import org.apache.hadoop.yarn.util.RackResolver
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 
-object AllocationType extends Enumeration {
-  type AllocationType = Value
-  val HOST, RACK, ANY = Value
-}
-
-// TODO:
-// Too many params.
-// Needs to be mt-safe
-// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
-// make it more proactive and decoupled.
-
-// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
-// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
-// more info on how we are requesting for containers.
-
 /**
- * Acquires resources for executors from a ResourceManager and launches executors in new containers.
+ * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
+ * what to do with containers when YARN fulfills these requests.
+ *
+ * This class makes use of YARN's AMRMClient APIs. We interact with the AMRMClient in three ways:
+ * * Making our resource needs known, which updates local bookkeeping about containers requested.
+ * * Calling "allocate", which syncs our local container requests with the RM, and returns any
+ *   containers that YARN has granted to us.  This also functions as a heartbeat.
+ * * Processing the containers granted to us to possibly launch executors inside of them.
+ *
+ * The public methods of this class are thread-safe.  All methods that mutate state are
+ * synchronized.
  */
 private[yarn] class YarnAllocator(
     conf: Configuration,
@@ -62,50 +55,42 @@ private[yarn] class YarnAllocator(
     amClient: AMRMClient[ContainerRequest],
     appAttemptId: ApplicationAttemptId,
     args: ApplicationMasterArguments,
-    preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
     securityMgr: SecurityManager)
   extends Logging {
 
   import YarnAllocator._
 
-  // These three are locked on allocatedHostToContainersMap. Complementary data structures
-  // allocatedHostToContainersMap : containers which are running : host, Set<ContainerId>
-  // allocatedContainerToHostMap: container to host mapping.
-  private val allocatedHostToContainersMap =
-    new HashMap[String, collection.mutable.Set[ContainerId]]()
+  // These two complementary data structures are locked on allocatedHostToContainersMap.
+  // Visible for testing.
+  val allocatedHostToContainersMap =
+    new HashMap[String, collection.mutable.Set[ContainerId]]
+  val allocatedContainerToHostMap = new HashMap[ContainerId, String]
 
-  private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
+  // Containers that we no longer care about. We've either already told the RM to release them or
+  // will on the next heartbeat. Containers get removed from this map after the RM tells us they've
+  // completed.
+  private val releasedContainers = Collections.newSetFromMap[ContainerId](
+    new ConcurrentHashMap[ContainerId, java.lang.Boolean])
 
-  // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
-  // allocated node)
-  // As with the two data structures above, tightly coupled with them, and to be locked on
-  // allocatedHostToContainersMap
-  private val allocatedRackCount = new HashMap[String, Int]()
+  @volatile private var numExecutorsRunning = 0
+  // Used to generate a unique ID per executor
+  private var executorIdCounter = 0
+  @volatile private var numExecutorsFailed = 0
 
-  // Containers to be released in next request to RM
-  private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
-
-  // Number of container requests that have been sent to, but not yet allocated by the
-  // ApplicationMaster.
-  private val numPendingAllocate = new AtomicInteger()
-  private val numExecutorsRunning = new AtomicInteger()
-  // Used to generate a unique id per executor
-  private val executorIdCounter = new AtomicInteger()
-  private val numExecutorsFailed = new AtomicInteger()
-
-  private var maxExecutors = args.numExecutors
+  @volatile private var maxExecutors = args.numExecutors
 
   // Keep track of which container is running which executor to remove the executors later
   private val executorIdToContainer = new HashMap[String, Container]
 
+  // Executor memory in MB.
   protected val executorMemory = args.executorMemory
-  protected val executorCores = args.executorCores
-  protected val (preferredHostToCount, preferredRackToCount) =
-    generateNodeToWeight(conf, preferredNodes)
-
-  // Additional memory overhead - in mb.
+  // Additional memory overhead.
   protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
     math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
+  // Number of cores per executor.
+  protected val executorCores = args.executorCores
+  // Resource capability requested for each executors
+  private val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
 
   private val launcherPool = new ThreadPoolExecutor(
     // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
@@ -115,26 +100,34 @@ private[yarn] class YarnAllocator(
     new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
   launcherPool.allowCoreThreadTimeOut(true)
 
-  def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
+  private val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
+    sparkConf.get("spark.driver.host"),
+    sparkConf.get("spark.driver.port"),
+    CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+  // For testing
+  private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
 
-  def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
+  def getNumExecutorsRunning: Int = numExecutorsRunning
+
+  def getNumExecutorsFailed: Int = numExecutorsFailed
+
+  /**
+   * Number of container requests that have not yet been fulfilled.
+   */
+  def getNumPendingAllocate: Int = getNumPendingAtLocation(ANY_HOST)
+
+  /**
+   * Number of container requests at the given location that have not yet been fulfilled.
+   */
+  private def getNumPendingAtLocation(location: String): Int =
+    amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).map(_.size).sum
 
   /**
    * Request as many executors from the ResourceManager as needed to reach the desired total.
-   * This takes into account executors already running or pending.
    */
   def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
-    val currentTotal = numPendingAllocate.get + numExecutorsRunning.get
-    if (requestedTotal > currentTotal) {
-      maxExecutors += (requestedTotal - currentTotal)
-      // We need to call `allocateResources` here to avoid the following race condition:
-      // If we request executors twice before `allocateResources` is called, then we will end up
-      // double counting the number requested because `numPendingAllocate` is not updated yet.
-      allocateResources()
-    } else {
-      logInfo(s"Not allocating more executors because there are already $currentTotal " +
-        s"(application requested $requestedTotal total)")
-    }
+    maxExecutors = requestedTotal
   }
 
   /**
@@ -144,7 +137,7 @@ private[yarn] class YarnAllocator(
     if (executorIdToContainer.contains(executorId)) {
       val container = executorIdToContainer.remove(executorId).get
       internalReleaseContainer(container)
-      numExecutorsRunning.decrementAndGet()
+      numExecutorsRunning -= 1
       maxExecutors -= 1
       assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!")
     } else {
@@ -153,498 +146,236 @@ private[yarn] class YarnAllocator(
   }
 
   /**
-   * Allocate missing containers based on the number of executors currently pending and running.
+   * Request resources such that, if YARN gives us all we ask for, we'll have a number of containers
+   * equal to maxExecutors.
    *
-   * This method prioritizes the allocated container responses from the RM based on node and
-   * rack locality. Additionally, it releases any extra containers allocated for this application
-   * but are not needed. This must be synchronized because variables read in this block are
-   * mutated by other methods.
+   * Deal with any containers YARN has granted to us by possibly launching executors in them.
+   *
+   * This must be synchronized because variables read in this method are mutated by other methods.
    */
   def allocateResources(): Unit = synchronized {
-    val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
+    val numPendingAllocate = getNumPendingAllocate
+    val missing = maxExecutors - numPendingAllocate - numExecutorsRunning
 
     if (missing > 0) {
-      val totalExecutorMemory = executorMemory + memoryOverhead
-      numPendingAllocate.addAndGet(missing)
-      logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
-        s"memory including $memoryOverhead MB overhead")
-    } else {
-      logDebug("Empty allocation request ...")
+      logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
+        s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
     }
 
-    val allocateResponse = allocateContainers(missing)
+    addResourceRequests(missing)
+    val progressIndicator = 0.1f
+    // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
+    // requests.
+    val allocateResponse = amClient.allocate(progressIndicator)
+
     val allocatedContainers = allocateResponse.getAllocatedContainers()
 
     if (allocatedContainers.size > 0) {
-      var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
-
-      if (numPendingAllocateNow < 0) {
-        numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
-      }
-
-      logDebug("""
-        Allocated containers: %d
-        Current executor count: %d
-        Containers released: %s
-        Cluster resources: %s
-        """.format(
+      logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
+        .format(
           allocatedContainers.size,
-          numExecutorsRunning.get(),
-          releasedContainers,
+          numExecutorsRunning,
           allocateResponse.getAvailableResources))
 
-      val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
-
-      for (container <- allocatedContainers) {
-        if (isResourceConstraintSatisfied(container)) {
-          // Add the accepted `container` to the host's list of already accepted,
-          // allocated containers
-          val host = container.getNodeId.getHost
-          val containersForHost = hostToContainers.getOrElseUpdate(host,
-            new ArrayBuffer[Container]())
-          containersForHost += container
-        } else {
-          // Release container, since it doesn't satisfy resource constraints.
-          internalReleaseContainer(container)
-        }
-      }
-
-       // Find the appropriate containers to use.
-      // TODO: Cleanup this group-by...
-      val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
-      val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
-      val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
-
-      for (candidateHost <- hostToContainers.keySet) {
-        val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
-        val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
-
-        val remainingContainersOpt = hostToContainers.get(candidateHost)
-        assert(remainingContainersOpt.isDefined)
-        var remainingContainers = remainingContainersOpt.get
-
-        if (requiredHostCount >= remainingContainers.size) {
-          // Since we have <= required containers, add all remaining containers to
-          // `dataLocalContainers`.
-          dataLocalContainers.put(candidateHost, remainingContainers)
-          // There are no more free containers remaining.
-          remainingContainers = null
-        } else if (requiredHostCount > 0) {
-          // Container list has more containers than we need for data locality.
-          // Split the list into two: one based on the data local container count,
-          // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
-          // containers.
-          val (dataLocal, remaining) = remainingContainers.splitAt(
-            remainingContainers.size - requiredHostCount)
-          dataLocalContainers.put(candidateHost, dataLocal)
-
-          // Invariant: remainingContainers == remaining
-
-          // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
-          // Add each container in `remaining` to list of containers to release. If we have an
-          // insufficient number of containers, then the next allocation cycle will reallocate
-          // (but won't treat it as data local).
-          // TODO(harvey): Rephrase this comment some more.
-          for (container <- remaining) internalReleaseContainer(container)
-          remainingContainers = null
-        }
-
-        // For rack local containers
-        if (remainingContainers != null) {
-          val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
-          if (rack != null) {
-            val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
-            val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
-              rackLocalContainers.getOrElse(rack, List()).size
-
-            if (requiredRackCount >= remainingContainers.size) {
-              // Add all remaining containers to to `dataLocalContainers`.
-              dataLocalContainers.put(rack, remainingContainers)
-              remainingContainers = null
-            } else if (requiredRackCount > 0) {
-              // Container list has more containers that we need for data locality.
-              // Split the list into two: one based on the data local container count,
-              // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
-              // containers.
-              val (rackLocal, remaining) = remainingContainers.splitAt(
-                remainingContainers.size - requiredRackCount)
-              val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
-                new ArrayBuffer[Container]())
-
-              existingRackLocal ++= rackLocal
-
-              remainingContainers = remaining
-            }
-          }
-        }
-
-        if (remainingContainers != null) {
-          // Not all containers have been consumed - add them to the list of off-rack containers.
-          offRackContainers.put(candidateHost, remainingContainers)
-        }
-      }
-
-      // Now that we have split the containers into various groups, go through them in order:
-      // first host-local, then rack-local, and finally off-rack.
-      // Note that the list we create below tries to ensure that not all containers end up within
-      // a host if there is a sufficiently large number of hosts/containers.
-      val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
-      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
-      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
-      allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
-
-      // Run each of the allocated containers.
-      for (container <- allocatedContainersToProcess) {
-        val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
-        val executorHostname = container.getNodeId.getHost
-        val containerId = container.getId
-
-        val executorMemoryOverhead = (executorMemory + memoryOverhead)
-        assert(container.getResource.getMemory >= executorMemoryOverhead)
-
-        if (numExecutorsRunningNow > maxExecutors) {
-          logInfo("""Ignoring container %s at host %s, since we already have the required number of
-            containers for it.""".format(containerId, executorHostname))
-          internalReleaseContainer(container)
-          numExecutorsRunning.decrementAndGet()
-        } else {
-          val executorId = executorIdCounter.incrementAndGet().toString
-          val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
-            SparkEnv.driverActorSystemName,
-            sparkConf.get("spark.driver.host"),
-            sparkConf.get("spark.driver.port"),
-            CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
-          logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
-          executorIdToContainer(executorId) = container
-
-          // To be safe, remove the container from `releasedContainers`.
-          releasedContainers.remove(containerId)
-
-          val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
-          allocatedHostToContainersMap.synchronized {
-            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
-              new HashSet[ContainerId]())
-
-            containerSet += containerId
-            allocatedContainerToHostMap.put(containerId, executorHostname)
-
-            if (rack != null) {
-              allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
-            }
-          }
-          logInfo("Launching ExecutorRunnable. driverUrl: %s,  executorHostname: %s".format(
-            driverUrl, executorHostname))
-          val executorRunnable = new ExecutorRunnable(
-            container,
-            conf,
-            sparkConf,
-            driverUrl,
-            executorId,
-            executorHostname,
-            executorMemory,
-            executorCores,
-            appAttemptId.getApplicationId.toString,
-            securityMgr)
-          launcherPool.execute(executorRunnable)
-        }
-      }
-      logDebug("""
-        Finished allocating %s containers (from %s originally).
-        Current number of executors running: %d,
-        Released containers: %s
-        """.format(
-          allocatedContainersToProcess,
-          allocatedContainers,
-          numExecutorsRunning.get(),
-          releasedContainers))
+      handleAllocatedContainers(allocatedContainers)
     }
 
     val completedContainers = allocateResponse.getCompletedContainersStatuses()
     if (completedContainers.size > 0) {
       logDebug("Completed %d containers".format(completedContainers.size))
 
-      for (completedContainer <- completedContainers) {
-        val containerId = completedContainer.getContainerId
-
-        if (releasedContainers.containsKey(containerId)) {
-          // Already marked the container for release, so remove it from
-          // `releasedContainers`.
-          releasedContainers.remove(containerId)
-        } else {
-          // Decrement the number of executors running. The next iteration of
-          // the ApplicationMaster's reporting thread will take care of allocating.
-          numExecutorsRunning.decrementAndGet()
-          logInfo("Completed container %s (state: %s, exit status: %s)".format(
-            containerId,
-            completedContainer.getState,
-            completedContainer.getExitStatus))
-          // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
-          // there are some exit status' we shouldn't necessarily count against us, but for
-          // now I think its ok as none of the containers are expected to exit
-          if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
-            logWarning(memLimitExceededLogMessage(
-              completedContainer.getDiagnostics,
-              VMEM_EXCEEDED_PATTERN))
-          } else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded
-            logWarning(memLimitExceededLogMessage(
-              completedContainer.getDiagnostics,
-              PMEM_EXCEEDED_PATTERN))
-          } else if (completedContainer.getExitStatus != 0) {
-            logInfo("Container marked as failed: " + containerId +
-              ". Exit status: " + completedContainer.getExitStatus +
-              ". Diagnostics: " + completedContainer.getDiagnostics)
-            numExecutorsFailed.incrementAndGet()
-          }
-        }
+      processCompletedContainers(completedContainers)
 
-        allocatedHostToContainersMap.synchronized {
-          if (allocatedContainerToHostMap.containsKey(containerId)) {
-            val hostOpt = allocatedContainerToHostMap.get(containerId)
-            assert(hostOpt.isDefined)
-            val host = hostOpt.get
-
-            val containerSetOpt = allocatedHostToContainersMap.get(host)
-            assert(containerSetOpt.isDefined)
-            val containerSet = containerSetOpt.get
-
-            containerSet.remove(containerId)
-            if (containerSet.isEmpty) {
-              allocatedHostToContainersMap.remove(host)
-            } else {
-              allocatedHostToContainersMap.update(host, containerSet)
-            }
-
-            allocatedContainerToHostMap.remove(containerId)
-
-            // TODO: Move this part outside the synchronized block?
-            val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
-            if (rack != null) {
-              val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
-              if (rackCount > 0) {
-                allocatedRackCount.put(rack, rackCount)
-              } else {
-                allocatedRackCount.remove(rack)
-              }
-            }
-          }
-        }
-      }
-      logDebug("""
-        Finished processing %d completed containers.
-        Current number of executors running: %d,
-        Released containers: %s
-        """.format(
-          completedContainers.size,
-          numExecutorsRunning.get(),
-          releasedContainers))
+      logDebug("Finished processing %d completed containers. Current running executor count: %d."
+        .format(completedContainers.size, numExecutorsRunning))
     }
   }
 
-  private def allocatedContainersOnHost(host: String): Int = {
-    allocatedHostToContainersMap.synchronized {
-     allocatedHostToContainersMap.getOrElse(host, Set()).size
+  /**
+   * Request numExecutors additional containers from YARN. Visible for testing.
+   */
+  def addResourceRequests(numExecutors: Int): Unit = {
+    for (i <- 0 until numExecutors) {
+      val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)
+      amClient.addContainerRequest(request)
+      val nodes = request.getNodes
+      val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
+      logInfo("Container request (host: %s, capability: %s".format(hostStr, resource))
     }
   }
 
-  private def allocatedContainersOnRack(rack: String): Int = {
-    allocatedHostToContainersMap.synchronized {
-      allocatedRackCount.getOrElse(rack, 0)
+  /**
+   * Handle containers granted by the RM by launching executors on them.
+   *
+   * Due to the way the YARN allocation protocol works, certain healthy race conditions can result
+   * in YARN granting containers that we no longer need. In this case, we release them.
+   *
+   * Visible for testing.
+   */
+  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
+    val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
+
+    // Match incoming requests by host
+    val remainingAfterHostMatches = new ArrayBuffer[Container]
+    for (allocatedContainer <- allocatedContainers) {
+      matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
+        containersToUse, remainingAfterHostMatches)
     }
-  }
-
-  private def isResourceConstraintSatisfied(container: Container): Boolean = {
-    container.getResource.getMemory >= (executorMemory + memoryOverhead)
-  }
 
-  // A simple method to copy the split info map.
-  private def generateNodeToWeight(
-      conf: Configuration,
-      input: collection.Map[String, collection.Set[SplitInfo]])
-    : (Map[String, Int], Map[String, Int]) = {
-    if (input == null) {
-      return (Map[String, Int](), Map[String, Int]())
+    // Match remaining by rack
+    val remainingAfterRackMatches = new ArrayBuffer[Container]
+    for (allocatedContainer <- remainingAfterHostMatches) {
+      val rack = RackResolver.resolve(conf, allocatedContainer.getNodeId.getHost).getNetworkLocation
+      matchContainerToRequest(allocatedContainer, rack, containersToUse,
+        remainingAfterRackMatches)
     }
 
-    val hostToCount = new HashMap[String, Int]
-    val rackToCount = new HashMap[String, Int]
-
-    for ((host, splits) <- input) {
-      val hostCount = hostToCount.getOrElse(host, 0)
-      hostToCount.put(host, hostCount + splits.size)
+    // Assign remaining that are neither node-local nor rack-local
+    val remainingAfterOffRackMatches = new ArrayBuffer[Container]
+    for (allocatedContainer <- remainingAfterRackMatches) {
+      matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
+        remainingAfterOffRackMatches)
+    }
 
-      val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
-      if (rack != null) {
-        val rackCount = rackToCount.getOrElse(host, 0)
-        rackToCount.put(host, rackCount + splits.size)
+    if (!remainingAfterOffRackMatches.isEmpty) {
+      logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
+        s"allocated to us")
+      for (container <- remainingAfterOffRackMatches) {
+        internalReleaseContainer(container)
       }
     }
 
-    (hostToCount.toMap, rackToCount.toMap)
-  }
+    runAllocatedContainers(containersToUse)
 
-  private def internalReleaseContainer(container: Container): Unit = {
-    releasedContainers.put(container.getId(), true)
-    amClient.releaseAssignedContainer(container.getId())
+    logInfo("Received %d containers from YARN, launching executors on %d of them."
+      .format(allocatedContainers.size, containersToUse.size))
   }
 
   /**
-   * Called to allocate containers in the cluster.
+   * Looks for requests for the given location that match the given container allocation. If it
+   * finds one, removes the request so that it won't be submitted again. Places the container into
+   * containersToUse or remaining.
    *
-   * @param count Number of containers to allocate.
-   *              If zero, should still contact RM (as a heartbeat).
-   * @return Response to the allocation request.
+   * @param allocatedContainer container that was given to us by YARN
+   * @location resource name, either a node, rack, or *
+   * @param containersToUse list of containers that will be used
+   * @param remaining list of containers that will not be used
    */
-  private def allocateContainers(count: Int): AllocateResponse = {
-    addResourceRequests(count)
-
-    // We have already set the container request. Poll the ResourceManager for a response.
-    // This doubles as a heartbeat if there are no pending container requests.
-    val progressIndicator = 0.1f
-    amClient.allocate(progressIndicator)
+  private def matchContainerToRequest(
+      allocatedContainer: Container,
+      location: String,
+      containersToUse: ArrayBuffer[Container],
+      remaining: ArrayBuffer[Container]): Unit = {
+    val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location,
+      allocatedContainer.getResource)
+
+    // Match the allocation to a request
+    if (!matchingRequests.isEmpty) {
+      val containerRequest = matchingRequests.get(0).iterator.next
+      amClient.removeContainerRequest(containerRequest)
+      containersToUse += allocatedContainer
+    } else {
+      remaining += allocatedContainer
+    }
   }
 
-  private def createRackResourceRequests(hostContainers: ArrayBuffer[ContainerRequest])
-    : ArrayBuffer[ContainerRequest] = {
-    // Generate modified racks and new set of hosts under it before issuing requests.
-    val rackToCounts = new HashMap[String, Int]()
-
-    for (container <- hostContainers) {
-      val candidateHost = container.getNodes.last
-      assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
-
-      val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
-      if (rack != null) {
-        var count = rackToCounts.getOrElse(rack, 0)
-        count += 1
-        rackToCounts.put(rack, count)
+  /**
+   * Launches executors in the allocated containers.
+   */
+  private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
+    for (container <- containersToUse) {
+      numExecutorsRunning += 1
+      assert(numExecutorsRunning <= maxExecutors)
+      val executorHostname = container.getNodeId.getHost
+      val containerId = container.getId
+      executorIdCounter += 1
+      val executorId = executorIdCounter.toString
+
+      assert(container.getResource.getMemory >= resource.getMemory)
+
+      logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
+
+      val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
+        new HashSet[ContainerId])
+
+      containerSet += containerId
+      allocatedContainerToHostMap.put(containerId, executorHostname)
+
+      val executorRunnable = new ExecutorRunnable(
+        container,
+        conf,
+        sparkConf,
+        driverUrl,
+        executorId,
+        executorHostname,
+        executorMemory,
+        executorCores,
+        appAttemptId.getApplicationId.toString,
+        securityMgr)
+      if (launchContainers) {
+        logInfo("Launching ExecutorRunnable. driverUrl: %s,  executorHostname: %s".format(
+          driverUrl, executorHostname))
+        launcherPool.execute(executorRunnable)
       }
     }
-
-    val requestedContainers = new ArrayBuffer[ContainerRequest](rackToCounts.size)
-    for ((rack, count) <- rackToCounts) {
-      requestedContainers ++= createResourceRequests(
-        AllocationType.RACK,
-        rack,
-        count,
-        RM_REQUEST_PRIORITY)
-    }
-
-    requestedContainers
   }
 
-  private def addResourceRequests(numExecutors: Int): Unit = {
-    val containerRequests: List[ContainerRequest] =
-      if (numExecutors <= 0) {
-        logDebug("numExecutors: " + numExecutors)
-        List()
-      } else if (preferredHostToCount.isEmpty) {
-        logDebug("host preferences is empty")
-        createResourceRequests(
-          AllocationType.ANY,
-          resource = null,
-          numExecutors,
-          RM_REQUEST_PRIORITY).toList
+  private def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
+    for (completedContainer <- completedContainers) {
+      val containerId = completedContainer.getContainerId
+
+      if (releasedContainers.contains(containerId)) {
+        // Already marked the container for release, so remove it from
+        // `releasedContainers`.
+        releasedContainers.remove(containerId)
       } else {
-        // Request for all hosts in preferred nodes and for numExecutors -
-        // candidates.size, request by default allocation policy.
-        val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
-        for ((candidateHost, candidateCount) <- preferredHostToCount) {
-          val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
-
-          if (requiredCount > 0) {
-            hostContainerRequests ++= createResourceRequests(
-              AllocationType.HOST,
-              candidateHost,
-              requiredCount,
-              RM_REQUEST_PRIORITY)
-          }
+        // Decrement the number of executors running. The next iteration of
+        // the ApplicationMaster's reporting thread will take care of allocating.
+        numExecutorsRunning -= 1
+        logInfo("Completed container %s (state: %s, exit status: %s)".format(
+          containerId,
+          completedContainer.getState,
+          completedContainer.getExitStatus))
+        // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
+        // there are some exit status' we shouldn't necessarily count against us, but for
+        // now I think its ok as none of the containers are expected to exit
+        if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
+          logWarning(memLimitExceededLogMessage(
+            completedContainer.getDiagnostics,
+            VMEM_EXCEEDED_PATTERN))
+        } else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded
+          logWarning(memLimitExceededLogMessage(
+            completedContainer.getDiagnostics,
+            PMEM_EXCEEDED_PATTERN))
+        } else if (completedContainer.getExitStatus != 0) {
+          logInfo("Container marked as failed: " + containerId +
+            ". Exit status: " + completedContainer.getExitStatus +
+            ". Diagnostics: " + completedContainer.getDiagnostics)
+          numExecutorsFailed += 1
         }
-        val rackContainerRequests: List[ContainerRequest] = createRackResourceRequests(
-          hostContainerRequests).toList
-
-        val anyContainerRequests = createResourceRequests(
-          AllocationType.ANY,
-          resource = null,
-          numExecutors,
-          RM_REQUEST_PRIORITY)
-
-        val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
-          hostContainerRequests.size + rackContainerRequests.size + anyContainerRequests.size)
-
-        containerRequestBuffer ++= hostContainerRequests
-        containerRequestBuffer ++= rackContainerRequests
-        containerRequestBuffer ++= anyContainerRequests
-        containerRequestBuffer.toList
       }
 
-    for (request <- containerRequests) {
-      amClient.addContainerRequest(request)
-    }
+      allocatedHostToContainersMap.synchronized {
+        if (allocatedContainerToHostMap.containsKey(containerId)) {
+          val host = allocatedContainerToHostMap.get(containerId).get
+          val containerSet = allocatedHostToContainersMap.get(host).get
 
-    for (request <- containerRequests) {
-      val nodes = request.getNodes
-      val hostStr = if (nodes == null || nodes.isEmpty) {
-        "Any"
-      } else {
-        nodes.last
-      }
-      logInfo("Container request (host: %s, priority: %s, capability: %s".format(
-        hostStr,
-        request.getPriority().getPriority,
-        request.getCapability))
-    }
-  }
+          containerSet.remove(containerId)
+          if (containerSet.isEmpty) {
+            allocatedHostToContainersMap.remove(host)
+          } else {
+            allocatedHostToContainersMap.update(host, containerSet)
+          }
 
-  private def createResourceRequests(
-      requestType: AllocationType.AllocationType,
-      resource: String,
-      numExecutors: Int,
-      priority: Int): ArrayBuffer[ContainerRequest] = {
-    // If hostname is specified, then we need at least two requests - node local and rack local.
-    // There must be a third request, which is ANY. That will be specially handled.
-    requestType match {
-      case AllocationType.HOST => {
-        assert(YarnSparkHadoopUtil.ANY_HOST != resource)
-        val hostname = resource
-        val nodeLocal = constructContainerRequests(
-          Array(hostname),
-          racks = null,
-          numExecutors,
-          priority)
-
-        // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler.
-        YarnSparkHadoopUtil.populateRackInfo(conf, hostname)
-        nodeLocal
-      }
-      case AllocationType.RACK => {
-        val rack = resource
-        constructContainerRequests(hosts = null, Array(rack), numExecutors, priority)
+          allocatedContainerToHostMap.remove(containerId)
+        }
       }
-      case AllocationType.ANY => constructContainerRequests(
-        hosts = null, racks = null, numExecutors, priority)
-      case _ => throw new IllegalArgumentException(
-        "Unexpected/unsupported request type: " + requestType)
     }
   }
 
-  private def constructContainerRequests(
-      hosts: Array[String],
-      racks: Array[String],
-      numExecutors: Int,
-      priority: Int
-    ): ArrayBuffer[ContainerRequest] = {
-    val memoryRequest = executorMemory + memoryOverhead
-    val resource = Resource.newInstance(memoryRequest, executorCores)
-
-    val prioritySetting = Records.newRecord(classOf[Priority])
-    prioritySetting.setPriority(priority)
-
-    val requests = new ArrayBuffer[ContainerRequest]()
-    for (i <- 0 until numExecutors) {
-      requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
-    }
-    requests
+  private def internalReleaseContainer(container: Container): Unit = {
+    releasedContainers.add(container.getId())
+    amClient.releaseAssignedContainer(container.getId())
   }
 
 }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index b45e599588ad39fc6750956eb8fdc94f3c11aa23..b134751366522d677de4b2ab6707eda11503b890 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -72,8 +72,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
       amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
       registered = true
     }
-    new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args,
-      preferredNodeLocations, securityMgr)
+    new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args, securityMgr)
   }
 
   /**
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index d7cf904db1c9e03eae4416e04b9651de48b697ee..4bff8461236196800ee3e24ef9d262c740528632 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType
+import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
 import org.apache.hadoop.yarn.util.RackResolver
 import org.apache.hadoop.conf.Configuration
 
@@ -99,13 +99,7 @@ object YarnSparkHadoopUtil {
 
   // All RM requests are issued with same priority : we do not (yet) have any distinction between
   // request types (like map/reduce in hadoop for example)
-  val RM_REQUEST_PRIORITY = 1
-
-  // Host to rack map - saved from allocation requests. We are expecting this not to change.
-  // Note that it is possible for this to change : and ResourceManager will indicate that to us via
-  // update response to allocate. But we are punting on handling that for now.
-  private val hostToRack = new ConcurrentHashMap[String, String]()
-  private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+  val RM_REQUEST_PRIORITY = Priority.newInstance(1)
 
   /**
    * Add a path variable to the given environment map.
@@ -184,37 +178,6 @@ object YarnSparkHadoopUtil {
     }
   }
 
-  def lookupRack(conf: Configuration, host: String): String = {
-    if (!hostToRack.contains(host)) {
-      populateRackInfo(conf, host)
-    }
-    hostToRack.get(host)
-  }
-
-  def populateRackInfo(conf: Configuration, hostname: String) {
-    Utils.checkHost(hostname)
-
-    if (!hostToRack.containsKey(hostname)) {
-      // If there are repeated failures to resolve, all to an ignore list.
-      val rackInfo = RackResolver.resolve(conf, hostname)
-      if (rackInfo != null && rackInfo.getNetworkLocation != null) {
-        val rack = rackInfo.getNetworkLocation
-        hostToRack.put(hostname, rack)
-        if (! rackToHostSet.containsKey(rack)) {
-          rackToHostSet.putIfAbsent(rack,
-            Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
-        }
-        rackToHostSet.get(rack).add(hostname)
-
-        // TODO(harvey): Figure out what this comment means...
-        // Since RackResolver caches, we are disabling this for now ...
-      } /* else {
-        // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
-        hostToRack.put(hostname, null)
-      } */
-    }
-  }
-
   def getApplicationAclsForYarn(securityMgr: SecurityManager)
       : Map[ApplicationAccessType, String] = {
     Map[ApplicationAccessType, String] (
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 254774a6b839e6cc3b66309a10f49527b3cafde1..2fa24cc43325e6ef3ef08ef81d79262789b62275 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.scheduler.cluster
 
+import org.apache.hadoop.yarn.util.RackResolver
+
 import org.apache.spark._
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
 
@@ -30,6 +31,6 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSc
   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
     val host = Utils.parseHostPort(hostPort)._1
-    Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
+    Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
   }
 }
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 4157ff95c2794780312e5bcb1812539561d2e373..be55d26f1cf61a4cebe83005c561a50e2cd64d83 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.scheduler.cluster
 
+import org.apache.hadoop.yarn.util.RackResolver
+
 import org.apache.spark._
-import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.ApplicationMaster
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.util.Utils
 
@@ -39,7 +41,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedule
   // By default, rack is unknown
   override def getRackForHost(hostPort: String): Option[String] = {
     val host = Utils.parseHostPort(hostPort)._1
-    Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
+    Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
   }
 
   override def postStartHook() {
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 8d184a09d64cc6b9305dd6518a5037f714866992..024b25f9d3365571449cbe3a2254d1d8b0940b1f 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -17,18 +17,160 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.util.{Arrays, List => JList}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic
+import org.apache.hadoop.net.DNSToSwitchMapping
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.YarnAllocator._
-import org.scalatest.FunSuite
+import org.apache.spark.scheduler.SplitInfo
+
+import org.scalatest.{BeforeAndAfterEach, FunSuite, Matchers}
+
+class MockResolver extends DNSToSwitchMapping {
+
+  override def resolve(names: JList[String]): JList[String] = {
+    if (names.size > 0 && names.get(0) == "host3") Arrays.asList("/rack2")
+    else Arrays.asList("/rack1")
+  }
+
+  override def reloadCachedMappings() {}
+
+  def reloadCachedMappings(names: JList[String]) {}
+}
+
+class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach {
+  val conf = new Configuration()
+  conf.setClass(
+    CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+    classOf[MockResolver], classOf[DNSToSwitchMapping])
+
+  val sparkConf = new SparkConf()
+  sparkConf.set("spark.driver.host", "localhost")
+  sparkConf.set("spark.driver.port", "4040")
+  sparkConf.set("spark.yarn.jar", "notarealjar.jar")
+  sparkConf.set("spark.yarn.launchContainers", "false")
+
+  val appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0)
+
+  // Resource returned by YARN.  YARN can give larger containers than requested, so give 6 cores
+  // instead of the 5 requested and 3 GB instead of the 2 requested.
+  val containerResource = Resource.newInstance(3072, 6)
+
+  var rmClient: AMRMClient[ContainerRequest] = _
+
+  var containerNum = 0
+
+  override def beforeEach() {
+    rmClient = AMRMClient.createAMRMClient()
+    rmClient.init(conf)
+    rmClient.start()
+  }
+
+  override def afterEach() {
+    rmClient.stop()
+  }
+
+  class MockSplitInfo(host: String) extends SplitInfo(null, host, null, 1, null) {
+    override def equals(other: Any) = false
+  }
+
+  def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
+    val args = Array(
+      "--num-executors", s"$maxExecutors",
+      "--executor-cores", "5",
+      "--executor-memory", "2048",
+      "--jar", "somejar.jar",
+      "--class", "SomeClass")
+    new YarnAllocator(
+      conf,
+      sparkConf,
+      rmClient,
+      appAttemptId,
+      new ApplicationMasterArguments(args),
+      new SecurityManager(sparkConf))
+  }
+
+  def createContainer(host: String): Container = {
+    val containerId = ContainerId.newInstance(appAttemptId, containerNum)
+    containerNum += 1
+    val nodeId = NodeId.newInstance(host, 1000)
+    Container.newInstance(containerId, nodeId, "", containerResource, RM_REQUEST_PRIORITY, null)
+  }
+
+  test("single container allocated") {
+    // request a single container and receive it
+    val handler = createAllocator()
+    handler.addResourceRequests(1)
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (1)
+
+    val container = createContainer("host1")
+    handler.handleAllocatedContainers(Array(container))
+
+    handler.getNumExecutorsRunning should be (1)
+    handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+    handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
+    rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size should be (0)
+  }
+
+  test("some containers allocated") {
+    // request a few containers and receive some of them
+    val handler = createAllocator()
+    handler.addResourceRequests(4)
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (4)
+
+    val container1 = createContainer("host1")
+    val container2 = createContainer("host1")
+    val container3 = createContainer("host2")
+    handler.handleAllocatedContainers(Array(container1, container2, container3))
+
+    handler.getNumExecutorsRunning should be (3)
+    handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
+    handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host1")
+    handler.allocatedContainerToHostMap.get(container3.getId).get should be ("host2")
+    handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
+    handler.allocatedHostToContainersMap.get("host1").get should contain (container2.getId)
+    handler.allocatedHostToContainersMap.get("host2").get should contain (container3.getId)
+  }
+
+  test("receive more containers than requested") {
+    val handler = createAllocator(2)
+    handler.addResourceRequests(2)
+    handler.getNumExecutorsRunning should be (0)
+    handler.getNumPendingAllocate should be (2)
+
+    val container1 = createContainer("host1")
+    val container2 = createContainer("host2")
+    val container3 = createContainer("host4")
+    handler.handleAllocatedContainers(Array(container1, container2, container3))
+
+    handler.getNumExecutorsRunning should be (2)
+    handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
+    handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host2")
+    handler.allocatedContainerToHostMap.contains(container3.getId) should be (false)
+    handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
+    handler.allocatedHostToContainersMap.get("host2").get should contain (container2.getId)
+    handler.allocatedHostToContainersMap.contains("host4") should be (false)
+  }
 
-class YarnAllocatorSuite extends FunSuite {
   test("memory exceeded diagnostic regexes") {
     val diagnostics =
       "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
-      "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
-      "5.8 GB of 4.2 GB virtual memory used. Killing container."
+        "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
+        "5.8 GB of 4.2 GB virtual memory used. Killing container."
     val vmemMsg = memLimitExceededLogMessage(diagnostics, VMEM_EXCEEDED_PATTERN)
     val pmemMsg = memLimitExceededLogMessage(diagnostics, PMEM_EXCEEDED_PATTERN)
     assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
     assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
   }
+
 }