diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 443830f8d03b6878beb73c1eb04876c6f95206d8..842bfdbadc948b9fe94fc5155f337cdd189805f0 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -24,11 +24,23 @@ package org.apache.spark
 private[spark] trait ExecutorAllocationClient {
 
   /**
-   * Express a preference to the cluster manager for a given total number of executors.
-   * This can result in canceling pending requests or filing additional requests.
+   * Update the cluster manager on our scheduling needs. Three bits of information are included
+   * to help it make decisions.
+   * @param numExecutors The total number of executors we'd like to have. The cluster manager
+   *                     shouldn't kill any running executor to reach this number, but,
+   *                     if all existing executors were to die, this is the number of executors
+   *                     we'd want to be allocated.
+   * @param localityAwareTasks The number of tasks in all active stages that have a locality
+   *                           preferences. This includes running, pending, and completed tasks.
+   * @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages
+   *                             that would like to like to run on that host.
+   *                             This includes running, pending, and completed tasks.
    * @return whether the request is acknowledged by the cluster manager.
    */
-  private[spark] def requestTotalExecutors(numExecutors: Int): Boolean
+  private[spark] def requestTotalExecutors(
+      numExecutors: Int,
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int]): Boolean
 
   /**
    * Request an additional number of executors from the cluster manager.
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 648bcfe28cad2d585f5ec707dc3b3f2e272092ca..1877aaf2cac5597eb0d029ee9c3caad4628c300a 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -161,6 +161,12 @@ private[spark] class ExecutorAllocationManager(
   //   (2) an executor idle timeout has elapsed.
   @volatile private var initializing: Boolean = true
 
+  // Number of locality aware tasks, used for executor placement.
+  private var localityAwareTasks = 0
+
+  // Host to possible task running on it, used for executor placement.
+  private var hostToLocalTaskCount: Map[String, Int] = Map.empty
+
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
@@ -295,7 +301,7 @@ private[spark] class ExecutorAllocationManager(
 
       // If the new target has not changed, avoid sending a message to the cluster manager
       if (numExecutorsTarget < oldNumExecutorsTarget) {
-        client.requestTotalExecutors(numExecutorsTarget)
+        client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
         logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
           s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
       }
@@ -349,7 +355,8 @@ private[spark] class ExecutorAllocationManager(
       return 0
     }
 
-    val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
+    val addRequestAcknowledged = testing ||
+      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
     if (addRequestAcknowledged) {
       val executorsString = "executor" + { if (delta > 1) "s" else "" }
       logInfo(s"Requesting $delta new $executorsString because tasks are backlogged" +
@@ -519,6 +526,12 @@ private[spark] class ExecutorAllocationManager(
     // Number of tasks currently running on the cluster.  Should be 0 when no stages are active.
     private var numRunningTasks: Int = _
 
+    // stageId to tuple (the number of task with locality preferences, a map where each pair is a
+    // node and the number of tasks that would like to be scheduled on that node) map,
+    // maintain the executor placement hints for each stage Id used by resource framework to better
+    // place the executors.
+    private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])]
+
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
       initializing = false
       val stageId = stageSubmitted.stageInfo.stageId
@@ -526,6 +539,24 @@ private[spark] class ExecutorAllocationManager(
       allocationManager.synchronized {
         stageIdToNumTasks(stageId) = numTasks
         allocationManager.onSchedulerBacklogged()
+
+        // Compute the number of tasks requested by the stage on each host
+        var numTasksPending = 0
+        val hostToLocalTaskCountPerStage = new mutable.HashMap[String, Int]()
+        stageSubmitted.stageInfo.taskLocalityPreferences.foreach { locality =>
+          if (!locality.isEmpty) {
+            numTasksPending += 1
+            locality.foreach { location =>
+              val count = hostToLocalTaskCountPerStage.getOrElse(location.host, 0) + 1
+              hostToLocalTaskCountPerStage(location.host) = count
+            }
+          }
+        }
+        stageIdToExecutorPlacementHints.put(stageId,
+          (numTasksPending, hostToLocalTaskCountPerStage.toMap))
+
+        // Update the executor placement hints
+        updateExecutorPlacementHints()
       }
     }
 
@@ -534,6 +565,10 @@ private[spark] class ExecutorAllocationManager(
       allocationManager.synchronized {
         stageIdToNumTasks -= stageId
         stageIdToTaskIndices -= stageId
+        stageIdToExecutorPlacementHints -= stageId
+
+        // Update the executor placement hints
+        updateExecutorPlacementHints()
 
         // If this is the last stage with pending tasks, mark the scheduler queue as empty
         // This is needed in case the stage is aborted for any reason
@@ -637,6 +672,29 @@ private[spark] class ExecutorAllocationManager(
     def isExecutorIdle(executorId: String): Boolean = {
       !executorIdToTaskIds.contains(executorId)
     }
+
+    /**
+     * Update the Executor placement hints (the number of tasks with locality preferences,
+     * a map where each pair is a node and the number of tasks that would like to be scheduled
+     * on that node).
+     *
+     * These hints are updated when stages arrive and complete, so are not up-to-date at task
+     * granularity within stages.
+     */
+    def updateExecutorPlacementHints(): Unit = {
+      var localityAwareTasks = 0
+      val localityToCount = new mutable.HashMap[String, Int]()
+      stageIdToExecutorPlacementHints.values.foreach { case (numTasksPending, localities) =>
+        localityAwareTasks += numTasksPending
+        localities.foreach { case (hostname, count) =>
+          val updatedCount = localityToCount.getOrElse(hostname, 0) + count
+          localityToCount(hostname) = updatedCount
+        }
+      }
+
+      allocationManager.localityAwareTasks = localityAwareTasks
+      allocationManager.hostToLocalTaskCount = localityToCount.toMap
+    }
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6a6b94a271cfc70e6f927b38e8b436e0f27cee15..ac6ac6c2167673467114a52af36de38283d0c0fc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1382,16 +1382,29 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
 
   /**
-   * Express a preference to the cluster manager for a given total number of executors.
-   * This can result in canceling pending requests or filing additional requests.
-   * This is currently only supported in YARN mode. Return whether the request is received.
-   */
-  private[spark] override def requestTotalExecutors(numExecutors: Int): Boolean = {
+   * Update the cluster manager on our scheduling needs. Three bits of information are included
+   * to help it make decisions.
+   * @param numExecutors The total number of executors we'd like to have. The cluster manager
+   *                     shouldn't kill any running executor to reach this number, but,
+   *                     if all existing executors were to die, this is the number of executors
+   *                     we'd want to be allocated.
+   * @param localityAwareTasks The number of tasks in all active stages that have a locality
+   *                           preferences. This includes running, pending, and completed tasks.
+   * @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages
+   *                             that would like to like to run on that host.
+   *                             This includes running, pending, and completed tasks.
+   * @return whether the request is acknowledged by the cluster manager.
+   */
+  private[spark] override def requestTotalExecutors(
+      numExecutors: Int,
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
+    ): Boolean = {
     assert(supportDynamicAllocation,
       "Requesting executors is currently only supported in YARN and Mesos modes")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
-        b.requestTotalExecutors(numExecutors)
+        b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
       case _ =>
         logWarning("Requesting executors is only supported in coarse-grained mode")
         false
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b6a833bbb0833428dc7345fd96ea8ebb9dc80cf5..cdf60784211231d109c7d59c9b7c29799d01488e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -790,8 +790,28 @@ class DAGScheduler(
     // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
     // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
     // event.
-    stage.makeNewStageAttempt(partitionsToCompute.size)
     outputCommitCoordinator.stageStart(stage.id)
+    val taskIdToLocations = try {
+      stage match {
+        case s: ShuffleMapStage =>
+          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
+        case s: ResultStage =>
+          val job = s.resultOfJob.get
+          partitionsToCompute.map { id =>
+            val p = job.partitions(id)
+            (id, getPreferredLocs(stage.rdd, p))
+          }.toMap
+      }
+    } catch {
+      case NonFatal(e) =>
+        stage.makeNewStageAttempt(partitionsToCompute.size)
+        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
+        abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}")
+        runningStages -= stage
+        return
+    }
+
+    stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
     listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
 
     // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
@@ -830,7 +850,7 @@ class DAGScheduler(
       stage match {
         case stage: ShuffleMapStage =>
           partitionsToCompute.map { id =>
-            val locs = getPreferredLocs(stage.rdd, id)
+            val locs = taskIdToLocations(id)
             val part = stage.rdd.partitions(id)
             new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs)
           }
@@ -840,7 +860,7 @@ class DAGScheduler(
           partitionsToCompute.map { id =>
             val p: Int = job.partitions(id)
             val part = stage.rdd.partitions(p)
-            val locs = getPreferredLocs(stage.rdd, p)
+            val locs = taskIdToLocations(id)
             new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary, part, locs, id)
           }
       }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index b86724de2cb7315416bd2c2fd9d570d10e0bdb53..40a333a3e06b2e81165ef643b559a6604c973f38 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -77,8 +77,11 @@ private[spark] abstract class Stage(
   private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)
 
   /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
-  def makeNewStageAttempt(numPartitionsToCompute: Int): Unit = {
-    _latestInfo = StageInfo.fromStage(this, nextAttemptId, Some(numPartitionsToCompute))
+  def makeNewStageAttempt(
+      numPartitionsToCompute: Int,
+      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = {
+    _latestInfo = StageInfo.fromStage(
+      this, nextAttemptId, Some(numPartitionsToCompute), taskLocalityPreferences)
     nextAttemptId += 1
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 5d2abbc67e9d922e1b6313ce9dedec7a3bf2ae02..24796c14300b1314ec27fa993cfcf9838325c9cd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -34,7 +34,8 @@ class StageInfo(
     val numTasks: Int,
     val rddInfos: Seq[RDDInfo],
     val parentIds: Seq[Int],
-    val details: String) {
+    val details: String,
+    private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) {
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
   var submissionTime: Option[Long] = None
   /** Time when all tasks in the stage completed or when the stage was cancelled. */
@@ -70,7 +71,12 @@ private[spark] object StageInfo {
    * shuffle dependencies. Therefore, all ancestor RDDs related to this Stage's RDD through a
    * sequence of narrow dependencies should also be associated with this Stage.
    */
-  def fromStage(stage: Stage, attemptId: Int, numTasks: Option[Int] = None): StageInfo = {
+  def fromStage(
+      stage: Stage,
+      attemptId: Int,
+      numTasks: Option[Int] = None,
+      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
+    ): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
     val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
     new StageInfo(
@@ -80,6 +86,7 @@ private[spark] object StageInfo {
       numTasks.getOrElse(stage.numTasks),
       rddInfos,
       stage.parents.map(_.id),
-      stage.details)
+      stage.details,
+      taskLocalityPreferences)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 4be1eda2e9291054bc2690e441cc6fbeb26b3ef1..06f5438433b6e14c4c665d3884762692b738ae4a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -86,7 +86,11 @@ private[spark] object CoarseGrainedClusterMessages {
 
   // Request executors by specifying the new total number of executors desired
   // This includes executors already pending or running
-  case class RequestExecutors(requestedTotal: Int) extends CoarseGrainedClusterMessage
+  case class RequestExecutors(
+      requestedTotal: Int,
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int])
+    extends CoarseGrainedClusterMessage
 
   case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index c65b3e517773ed2382d859658230bb0479b4b4cc..660702f6e6fd00580eab55f3877795dc92194182 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -66,6 +66,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors we have requested the cluster manager to kill that have not died yet
   private val executorsPendingToRemove = new HashSet[String]
 
+  // A map to store hostname with its possible task number running on it
+  protected var hostToLocalTaskCount: Map[String, Int] = Map.empty
+
+  // The number of pending tasks which is locality required
+  protected var localityAwareTasks = 0
+
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
@@ -339,6 +345,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
     logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
     logDebug(s"Number of pending executors is now $numPendingExecutors")
+
     numPendingExecutors += numAdditionalExecutors
     // Account for executors pending to be added or removed
     val newTotal = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
@@ -346,16 +353,33 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   /**
-   * Express a preference to the cluster manager for a given total number of executors. This can
-   * result in canceling pending requests or filing additional requests.
-   * @return whether the request is acknowledged.
+   * Update the cluster manager on our scheduling needs. Three bits of information are included
+   * to help it make decisions.
+   * @param numExecutors The total number of executors we'd like to have. The cluster manager
+   *                     shouldn't kill any running executor to reach this number, but,
+   *                     if all existing executors were to die, this is the number of executors
+   *                     we'd want to be allocated.
+   * @param localityAwareTasks The number of tasks in all active stages that have a locality
+   *                           preferences. This includes running, pending, and completed tasks.
+   * @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages
+   *                             that would like to like to run on that host.
+   *                             This includes running, pending, and completed tasks.
+   * @return whether the request is acknowledged by the cluster manager.
    */
-  final override def requestTotalExecutors(numExecutors: Int): Boolean = synchronized {
+  final override def requestTotalExecutors(
+      numExecutors: Int,
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int]
+    ): Boolean = synchronized {
     if (numExecutors < 0) {
       throw new IllegalArgumentException(
         "Attempted to request a negative number of executor(s) " +
           s"$numExecutors from the cluster manager. Please specify a positive number!")
     }
+
+    this.localityAwareTasks = localityAwareTasks
+    this.hostToLocalTaskCount = hostToLocalTaskCount
+
     numPendingExecutors =
       math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)
     doRequestTotalExecutors(numExecutors)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index bc67abb5df4468341dbc803061b0c4a2e650f90a..074282d1be37ddba5705763f2d64824f3aca03e3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -53,7 +53,8 @@ private[spark] abstract class YarnSchedulerBackend(
    * This includes executors already pending or running.
    */
   override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
-    yarnSchedulerEndpoint.askWithRetry[Boolean](RequestExecutors(requestedTotal))
+    yarnSchedulerEndpoint.askWithRetry[Boolean](
+      RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 803e1831bb26930b381f9a858e4709634dedf4fb..34caca892891c5c0d95dafac549bb14a0f804d8a 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -751,6 +751,42 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 2)
   }
 
+  test("get pending task number and related locality preference") {
+    sc = createSparkContext(2, 5, 3)
+    val manager = sc.executorAllocationManager.get
+
+    val localityPreferences1 = Seq(
+      Seq(TaskLocation("host1"), TaskLocation("host2"), TaskLocation("host3")),
+      Seq(TaskLocation("host1"), TaskLocation("host2"), TaskLocation("host4")),
+      Seq(TaskLocation("host2"), TaskLocation("host3"), TaskLocation("host4")),
+      Seq.empty,
+      Seq.empty
+    )
+    val stageInfo1 = createStageInfo(1, 5, localityPreferences1)
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo1))
+
+    assert(localityAwareTasks(manager) === 3)
+    assert(hostToLocalTaskCount(manager) ===
+      Map("host1" -> 2, "host2" -> 3, "host3" -> 2, "host4" -> 2))
+
+    val localityPreferences2 = Seq(
+      Seq(TaskLocation("host2"), TaskLocation("host3"), TaskLocation("host5")),
+      Seq(TaskLocation("host3"), TaskLocation("host4"), TaskLocation("host5")),
+      Seq.empty
+    )
+    val stageInfo2 = createStageInfo(2, 3, localityPreferences2)
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo2))
+
+    assert(localityAwareTasks(manager) === 5)
+    assert(hostToLocalTaskCount(manager) ===
+      Map("host1" -> 2, "host2" -> 4, "host3" -> 4, "host4" -> 3, "host5" -> 2))
+
+    sc.listenerBus.postToAll(SparkListenerStageCompleted(stageInfo1))
+    assert(localityAwareTasks(manager) === 2)
+    assert(hostToLocalTaskCount(manager) ===
+      Map("host2" -> 1, "host3" -> 2, "host4" -> 1, "host5" -> 2))
+  }
+
   private def createSparkContext(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,
@@ -784,8 +820,13 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   private val sustainedSchedulerBacklogTimeout = 2L
   private val executorIdleTimeout = 3L
 
-  private def createStageInfo(stageId: Int, numTasks: Int): StageInfo = {
-    new StageInfo(stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details")
+  private def createStageInfo(
+      stageId: Int,
+      numTasks: Int,
+      taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty
+    ): StageInfo = {
+    new StageInfo(
+      stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details", taskLocalityPreferences)
   }
 
   private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {
@@ -815,6 +856,8 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   private val _onSchedulerQueueEmpty = PrivateMethod[Unit]('onSchedulerQueueEmpty)
   private val _onExecutorIdle = PrivateMethod[Unit]('onExecutorIdle)
   private val _onExecutorBusy = PrivateMethod[Unit]('onExecutorBusy)
+  private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks)
+  private val _hostToLocalTaskCount = PrivateMethod[Map[String, Int]]('hostToLocalTaskCount)
 
   private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = {
     manager invokePrivate _numExecutorsToAdd()
@@ -885,4 +928,12 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   private def onExecutorBusy(manager: ExecutorAllocationManager, id: String): Unit = {
     manager invokePrivate _onExecutorBusy(id)
   }
+
+  private def localityAwareTasks(manager: ExecutorAllocationManager): Int = {
+    manager invokePrivate _localityAwareTasks()
+  }
+
+  private def hostToLocalTaskCount(manager: ExecutorAllocationManager): Map[String, Int] = {
+    manager invokePrivate _hostToLocalTaskCount()
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 5a2670e4d1cf0c6591332f3805d9e3e105b8e141..139b8dc25f4b4b61fffad82e904ed99cc4a1023d 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -182,7 +182,7 @@ class HeartbeatReceiverSuite
 
     // Adjust the target number of executors on the cluster manager side
     assert(fakeClusterManager.getTargetNumExecutors === 0)
-    sc.requestTotalExecutors(2)
+    sc.requestTotalExecutors(2, 0, Map.empty)
     assert(fakeClusterManager.getTargetNumExecutors === 2)
     assert(fakeClusterManager.getExecutorIdsToKill.isEmpty)
 
@@ -241,7 +241,8 @@ private class FakeSchedulerBackend(
   extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
 
   protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
-    clusterManagerEndpoint.askWithRetry[Boolean](RequestExecutors(requestedTotal))
+    clusterManagerEndpoint.askWithRetry[Boolean](
+      RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
   }
 
   protected override def doKillExecutors(executorIds: Seq[String]): Boolean = {
@@ -260,7 +261,7 @@ private class FakeClusterManager(override val rpcEnv: RpcEnv) extends RpcEndpoin
   def getExecutorIdsToKill: Set[String] = executorIdsToKill.toSet
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case RequestExecutors(requestedTotal) =>
+    case RequestExecutors(requestedTotal, _, _) =>
       targetNumExecutors = requestedTotal
       context.reply(true)
     case KillExecutors(executorIds) =>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 83dafa4a125d299b818f4382cf2df99ce1ae798c..44acc7374d024f96a77fed28d1cb04cb9f3cbacc 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -555,11 +555,12 @@ private[spark] class ApplicationMaster(
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-      case RequestExecutors(requestedTotal) =>
+      case RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount) =>
         Option(allocator) match {
           case Some(a) =>
             allocatorLock.synchronized {
-              if (a.requestTotalExecutors(requestedTotal)) {
+              if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal,
+                localityAwareTasks, hostToLocalTaskCount)) {
                 allocatorLock.notifyAll()
               }
             }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
new file mode 100644
index 0000000000000000000000000000000000000000..081780204e42486cea85f694502c33ce6a1313dc
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, Set}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records.{ContainerId, Resource}
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String])
+
+/**
+ * This strategy is calculating the optimal locality preferences of YARN containers by considering
+ * the node ratio of pending tasks, number of required cores/containers and and locality of current
+ * existing containers. The target of this algorithm is to maximize the number of tasks that
+ * would run locally.
+ *
+ * Consider a situation in which we have 20 tasks that require (host1, host2, host3)
+ * and 10 tasks that require (host1, host2, host4), besides each container has 2 cores
+ * and cpus per task is 1, so the required container number is 15,
+ * and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 : 2 : 1)
+ *
+ * 3. If containers exist but none of them can match the requested localities,
+ * follow the method of 1 and 2.
+ *
+ * 4. If containers exist and some of them can match the requested localities.
+ * For example if we have 1 containers on each node (host1: 1, host2: 1: host3: 1, host4: 1),
+ * and the expected containers on each node would be (host1: 5, host2: 5, host3: 4, host4: 2),
+ * so the newly requested containers on each node would be updated to (host1: 4, host2: 4,
+ * host3: 3, host4: 1), 12 containers by total.
+ *
+ *   4.1 If requested container number (18) is more than newly required containers (12). Follow
+ *   method 1 with updated ratio 4 : 4 : 3 : 1.
+ *
+ *   4.2 If request container number (10) is more than newly required containers (12). Follow
+ *   method 2 with updated ratio 4 : 4 : 3 : 1.
+ *
+ * 5. If containers exist and existing localities can fully cover the requested localities.
+ * For example if we have 5 containers on each node (host1: 5, host2: 5, host3: 5, host4: 5),
+ * which could cover the current requested localities. This algorithm will allocate all the
+ * requested containers with no localities.
+ */
+private[yarn] class LocalityPreferredContainerPlacementStrategy(
+    val sparkConf: SparkConf,
+    val yarnConf: Configuration,
+    val resource: Resource) {
+
+  // Number of CPUs per task
+  private val CPUS_PER_TASK = sparkConf.getInt("spark.task.cpus", 1)
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwareTasks number of locality required tasks
+   * @param hostToLocalTaskCount a map to store the preferred hostname and possible task
+   *                             numbers running on it, used as hints for container allocation
+   * @return node localities and rack localities, each locality is an array of string,
+   *         the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+      numContainer: Int,
+      numLocalityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int],
+      allocatedHostToContainersMap: HashMap[String, Set[ContainerId]]
+    ): Array[ContainerLocalityPreferences] = {
+    val updatedHostToContainerCount = expectedHostToContainerCount(
+      numLocalityAwareTasks, hostToLocalTaskCount, allocatedHostToContainersMap)
+    val updatedLocalityAwareContainerNum = updatedHostToContainerCount.values.sum
+
+    // The number of containers to allocate, divided into two groups, one with preferred locality,
+    // and the other without locality preference.
+    val requiredLocalityFreeContainerNum =
+      math.max(0, numContainer - updatedLocalityAwareContainerNum)
+    val requiredLocalityAwareContainerNum = numContainer - requiredLocalityFreeContainerNum
+
+    val containerLocalityPreferences = ArrayBuffer[ContainerLocalityPreferences]()
+    if (requiredLocalityFreeContainerNum > 0) {
+      for (i <- 0 until requiredLocalityFreeContainerNum) {
+        containerLocalityPreferences += ContainerLocalityPreferences(
+          null.asInstanceOf[Array[String]], null.asInstanceOf[Array[String]])
+      }
+    }
+
+    if (requiredLocalityAwareContainerNum > 0) {
+      val largestRatio = updatedHostToContainerCount.values.max
+      // Round the ratio of preferred locality to the number of locality required container
+      // number, which is used for locality preferred host calculating.
+      var preferredLocalityRatio = updatedHostToContainerCount.mapValues { ratio =>
+        val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio
+        adjustedRatio.ceil.toInt
+      }
+
+      for (i <- 0 until requiredLocalityAwareContainerNum) {
+        // Only filter out the ratio which is larger than 0, which means the current host can
+        // still be allocated with new container request.
+        val hosts = preferredLocalityRatio.filter(_._2 > 0).keys.toArray
+        val racks = hosts.map { h =>
+          RackResolver.resolve(yarnConf, h).getNetworkLocation
+        }.toSet
+        containerLocalityPreferences += ContainerLocalityPreferences(hosts, racks.toArray)
+
+        // Minus 1 each time when the host is used. When the current ratio is 0,
+        // which means all the required ratio is satisfied, this host will not be allocated again.
+        preferredLocalityRatio = preferredLocalityRatio.mapValues(_ - 1)
+      }
+    }
+
+    containerLocalityPreferences.toArray
+  }
+
+  /**
+   * Calculate the number of executors need to satisfy the given number of pending tasks.
+   */
+  private def numExecutorsPending(numTasksPending: Int): Int = {
+    val coresPerExecutor = resource.getVirtualCores
+    (numTasksPending * CPUS_PER_TASK + coresPerExecutor - 1) / coresPerExecutor
+  }
+
+  /**
+   * Calculate the expected host to number of containers by considering with allocated containers.
+   * @param localityAwareTasks number of locality aware tasks
+   * @param hostToLocalTaskCount a map to store the preferred hostname and possible task
+   *                             numbers running on it, used as hints for container allocation
+   * @return a map with hostname as key and required number of containers on this host as value
+   */
+  private def expectedHostToContainerCount(
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int],
+      allocatedHostToContainersMap: HashMap[String, Set[ContainerId]]
+    ): Map[String, Int] = {
+    val totalLocalTaskNum = hostToLocalTaskCount.values.sum
+    hostToLocalTaskCount.map { case (host, count) =>
+      val expectedCount =
+        count.toDouble * numExecutorsPending(localityAwareTasks) / totalLocalTaskNum
+      val existedCount = allocatedHostToContainersMap.get(host)
+        .map(_.size)
+        .getOrElse(0)
+
+      // If existing container can not fully satisfy the expected number of container,
+      // the required container number is expected count minus existed count. Otherwise the
+      // required container number is 0.
+      (host, math.max(0, (expectedCount - existedCount).ceil.toInt))
+    }
+  }
+}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 940873fbd046c6624d0e4bf3234f48d942b2b566..6c103394af098b3a791c473152bcfe18b73cc936 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -96,7 +96,7 @@ private[yarn] class YarnAllocator(
   // Number of cores per executor.
   protected val executorCores = args.executorCores
   // Resource capability requested for each executors
-  private val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
+  private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
 
   private val launcherPool = new ThreadPoolExecutor(
     // max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
@@ -127,6 +127,16 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  // A map to store preferred hostname and possible task numbers running on it.
+  private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
+
+  // Number of tasks that have locality preferences in active stages
+  private var numLocalityAwareTasks: Int = 0
+
+  // A container placement strategy based on pending tasks' locality preference
+  private[yarn] val containerPlacementStrategy =
+    new LocalityPreferredContainerPlacementStrategy(sparkConf, conf, resource)
+
   def getNumExecutorsRunning: Int = numExecutorsRunning
 
   def getNumExecutorsFailed: Int = numExecutorsFailed
@@ -146,10 +156,19 @@ private[yarn] class YarnAllocator(
    * Request as many executors from the ResourceManager as needed to reach the desired total. If
    * the requested total is smaller than the current number of running executors, no executors will
    * be killed.
-   *
+   * @param requestedTotal total number of containers requested
+   * @param localityAwareTasks number of locality aware tasks to be used as container placement hint
+   * @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
+   *                             container placement hint.
    * @return Whether the new requested total is different than the old value.
    */
-  def requestTotalExecutors(requestedTotal: Int): Boolean = synchronized {
+  def requestTotalExecutorsWithPreferredLocalities(
+      requestedTotal: Int,
+      localityAwareTasks: Int,
+      hostToLocalTaskCount: Map[String, Int]): Boolean = synchronized {
+    this.numLocalityAwareTasks = localityAwareTasks
+    this.hostToLocalTaskCounts = hostToLocalTaskCount
+
     if (requestedTotal != targetNumExecutors) {
       logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
       targetNumExecutors = requestedTotal
@@ -221,12 +240,20 @@ private[yarn] class YarnAllocator(
     val numPendingAllocate = getNumPendingAllocate
     val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
 
+    // TODO. Consider locality preferences of pending container requests.
+    // Since the last time we made container requests, stages have completed and been submitted,
+    // and that the localities at which we requested our pending executors
+    // no longer apply to our current needs. We should consider to remove all outstanding
+    // container requests and add requests anew each time to avoid this.
     if (missing > 0) {
       logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
         s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
 
-      for (i <- 0 until missing) {
-        val request = createContainerRequest(resource)
+      val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers(
+        missing, numLocalityAwareTasks, hostToLocalTaskCounts, allocatedHostToContainersMap)
+
+      for (locality <- containerLocalityPreferences) {
+        val request = createContainerRequest(resource, locality.nodes, locality.racks)
         amClient.addContainerRequest(request)
         val nodes = request.getNodes
         val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
@@ -249,11 +276,14 @@ private[yarn] class YarnAllocator(
    * Creates a container request, handling the reflection required to use YARN features that were
    * added in recent versions.
    */
-  private def createContainerRequest(resource: Resource): ContainerRequest = {
+  protected def createContainerRequest(
+      resource: Resource,
+      nodes: Array[String],
+      racks: Array[String]): ContainerRequest = {
     nodeLabelConstructor.map { constructor =>
-      constructor.newInstance(resource, null, null, RM_REQUEST_PRIORITY, true: java.lang.Boolean,
+      constructor.newInstance(resource, nodes, racks, RM_REQUEST_PRIORITY, true: java.lang.Boolean,
         labelExpression.orNull)
-    }.getOrElse(new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY))
+    }.getOrElse(new ContainerRequest(resource, nodes, racks, RM_REQUEST_PRIORITY))
   }
 
   /**
@@ -437,7 +467,6 @@ private[yarn] class YarnAllocator(
     releasedContainers.add(container.getId())
     amClient.releaseAssignedContainer(container.getId())
   }
-
 }
 
 private object YarnAllocator {
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..b7fe4ccc67a38b851910ba7209c9b3e089f21a5d
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+
+import org.apache.spark.SparkFunSuite
+
+class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
+
+  private val yarnAllocatorSuite = new YarnAllocatorSuite
+  import yarnAllocatorSuite._
+
+  override def beforeEach() {
+    yarnAllocatorSuite.beforeEach()
+  }
+
+  override def afterEach() {
+    yarnAllocatorSuite.afterEach()
+  }
+
+  test("allocate locality preferred containers with enough resource and no matched existed " +
+    "containers") {
+    // 1. All the locations of current containers cannot satisfy the new requirements
+    // 2. Current requested container number can fully satisfy the pending tasks.
+
+    val handler = createAllocator(2)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2")))
+
+    val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
+      3, 15, Map("host3" -> 15, "host4" -> 15, "host5" -> 10), handler.allocatedHostToContainersMap)
+
+    assert(localities.map(_.nodes) === Array(
+      Array("host3", "host4", "host5"),
+      Array("host3", "host4", "host5"),
+      Array("host3", "host4")))
+  }
+
+  test("allocate locality preferred containers with enough resource and partially matched " +
+    "containers") {
+    // 1. Parts of current containers' locations can satisfy the new requirements
+    // 2. Current requested container number can fully satisfy the pending tasks.
+
+    val handler = createAllocator(3)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(
+      createContainer("host1"),
+      createContainer("host1"),
+      createContainer("host2")
+    ))
+
+    val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
+      3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap)
+
+    assert(localities.map(_.nodes) ===
+      Array(null, Array("host2", "host3"), Array("host2", "host3")))
+  }
+
+  test("allocate locality preferred containers with limited resource and partially matched " +
+    "containers") {
+    // 1. Parts of current containers' locations can satisfy the new requirements
+    // 2. Current requested container number cannot fully satisfy the pending tasks.
+
+    val handler = createAllocator(3)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(
+      createContainer("host1"),
+      createContainer("host1"),
+      createContainer("host2")
+    ))
+
+    val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
+      1, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap)
+
+    assert(localities.map(_.nodes) === Array(Array("host2", "host3")))
+  }
+
+  test("allocate locality preferred containers with fully matched containers") {
+    // Current containers' locations can fully satisfy the new requirements
+
+    val handler = createAllocator(5)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(
+      createContainer("host1"),
+      createContainer("host1"),
+      createContainer("host2"),
+      createContainer("host2"),
+      createContainer("host3")
+    ))
+
+    val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
+      3, 15, Map("host1" -> 15, "host2" -> 15, "host3" -> 10), handler.allocatedHostToContainersMap)
+
+    assert(localities.map(_.nodes) === Array(null, null, null))
+  }
+
+  test("allocate containers with no locality preference") {
+    // Request new container without locality preference
+
+    val handler = createAllocator(2)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(createContainer("host1"), createContainer("host2")))
+
+    val localities = handler.containerPlacementStrategy.localityOfRequestedContainers(
+      1, 0, Map.empty, handler.allocatedHostToContainersMap)
+
+    assert(localities.map(_.nodes) === Array(null))
+  }
+}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 7509000771d9412c309c26dc231d72bb525f9701..37a789fcd375ba63889014969fbe6fc81eeadca7 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.net.DNSToSwitchMapping
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.scalatest.{BeforeAndAfterEach, Matchers}
 
 import org.apache.spark.{SecurityManager, SparkFunSuite}
 import org.apache.spark.SparkConf
@@ -32,8 +33,6 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.YarnAllocator._
 import org.apache.spark.scheduler.SplitInfo
 
-import org.scalatest.{BeforeAndAfterEach, Matchers}
-
 class MockResolver extends DNSToSwitchMapping {
 
   override def resolve(names: JList[String]): JList[String] = {
@@ -171,7 +170,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.getNumExecutorsRunning should be (0)
     handler.getNumPendingAllocate should be (4)
 
-    handler.requestTotalExecutors(3)
+    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
     handler.updateResourceRequests()
     handler.getNumPendingAllocate should be (3)
 
@@ -182,7 +181,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
     handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
 
-    handler.requestTotalExecutors(2)
+    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty)
     handler.updateResourceRequests()
     handler.getNumPendingAllocate should be (1)
   }
@@ -193,7 +192,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.getNumExecutorsRunning should be (0)
     handler.getNumPendingAllocate should be (4)
 
-    handler.requestTotalExecutors(3)
+    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
     handler.updateResourceRequests()
     handler.getNumPendingAllocate should be (3)
 
@@ -203,7 +202,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 
     handler.getNumExecutorsRunning should be (2)
 
-    handler.requestTotalExecutors(1)
+    handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty)
     handler.updateResourceRequests()
     handler.getNumPendingAllocate should be (0)
     handler.getNumExecutorsRunning should be (2)
@@ -219,7 +218,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     val container2 = createContainer("host2")
     handler.handleAllocatedContainers(Array(container1, container2))
 
-    handler.requestTotalExecutors(1)
+    handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty)
     handler.executorIdToContainer.keys.foreach { id => handler.killExecutor(id ) }
 
     val statuses = Seq(container1, container2).map { c =>
@@ -241,5 +240,4 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
     assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
   }
-
 }