diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e8fdfff04390d90157e351389ce6012b8c58c15a..40ea369f9ef9399530080b213b1747a11d9185a5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -294,7 +294,8 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
   executorEnvs("SPARK_USER") = sparkUser
   // Create and start the scheduler
-  private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
+  private[spark] var (schedulerBackend, taskScheduler) =
+    SparkContext.createTaskScheduler(this, master)
   private val heartbeatReceiver = env.actorSystem.actorOf(
     Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
   @volatile private[spark] var dagScheduler: DAGScheduler = _
@@ -856,6 +857,40 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
+  /**
+   * :: DeveloperApi ::
+   * Request an additional number of executors from the cluster manager.
+   * This is currently only supported in Yarn mode.
+   */
+  @DeveloperApi
+  def requestExecutors(numAdditionalExecutors: Int): Unit = {
+    schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors)
+      case _ => logWarning("Requesting executors is only supported in coarse-grained mode")
+    }
+  }
+  /**
+   * :: DeveloperApi ::
+   * Request that the cluster manager kill the specified executors.
+   * This is currently only supported in Yarn mode.
+   */
+  @DeveloperApi
+  def killExecutors(executorIds: Seq[String]): Unit = {
+    schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend => b.killExecutors(executorIds)
+      case _ => logWarning("Killing executors is only supported in coarse-grained mode")
+    }
+  }
+  /**
+   * :: DeveloperApi ::
+   * Request that cluster manager the kill the specified executor.
+   * This is currently only supported in Yarn mode.
+   */
+  @DeveloperApi
+  def killExecutor(executorId: String): Unit = killExecutors(Seq(executorId))
   /** The version of Spark on which this application is running. */
   def version = SPARK_VERSION
@@ -1438,8 +1473,13 @@ object SparkContext extends Logging {
-  /** Creates a task scheduler based on a given master URL. Extracted for testing. */
-  private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = {
+  /**
+   * Create a task scheduler based on a given master URL.
+   * Return a 2-tuple of the scheduler backend and the task scheduler.
+   */
+  private def createTaskScheduler(
+      sc: SparkContext,
+      master: String): (SchedulerBackend, TaskScheduler) = {
     // Regular expression used for local[N] and local[*] master formats
     val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
     // Regular expression for local[N, maxRetries], used in tests with failing tasks
@@ -1461,7 +1501,7 @@ object SparkContext extends Logging {
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
         val backend = new LocalBackend(scheduler, 1)
-        scheduler
+        (backend, scheduler)
       case LOCAL_N_REGEX(threads) =>
         def localCpuCount = Runtime.getRuntime.availableProcessors()
@@ -1470,7 +1510,7 @@ object SparkContext extends Logging {
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
         val backend = new LocalBackend(scheduler, threadCount)
-        scheduler
+        (backend, scheduler)
       case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
         def localCpuCount = Runtime.getRuntime.availableProcessors()
@@ -1480,14 +1520,14 @@ object SparkContext extends Logging {
         val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
         val backend = new LocalBackend(scheduler, threadCount)
-        scheduler
+        (backend, scheduler)
       case SPARK_REGEX(sparkUrl) =>
         val scheduler = new TaskSchedulerImpl(sc)
         val masterUrls = sparkUrl.split(",").map("spark://" + _)
         val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
-        scheduler
+        (backend, scheduler)
       case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
         // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
@@ -1507,7 +1547,7 @@ object SparkContext extends Logging {
         backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
-        scheduler
+        (backend, scheduler)
       case "yarn-standalone" | "yarn-cluster" =>
         if (master == "yarn-standalone") {
@@ -1536,7 +1576,7 @@ object SparkContext extends Logging {
-        scheduler
+        (backend, scheduler)
       case "yarn-client" =>
         val scheduler = try {
@@ -1563,7 +1603,7 @@ object SparkContext extends Logging {
-        scheduler
+        (backend, scheduler)
       case mesosUrl @ MESOS_REGEX(_) =>
@@ -1576,13 +1616,13 @@ object SparkContext extends Logging {
           new MesosSchedulerBackend(scheduler, sc, url)
-        scheduler
+        (backend, scheduler)
       case SIMR_REGEX(simrUrl) =>
         val scheduler = new TaskSchedulerImpl(sc)
         val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
-        scheduler
+        (backend, scheduler)
       case _ =>
         throw new SparkException("Could not parse Master URL: '" + master + "'")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 2b39c7fc872dab327d5bab21953a4b54e1396d8e..cd3c015321e85275b7c7e83df41ac22654b76ab5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -34,7 +34,6 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.util.Utils
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
-import akka.actor.Props
  * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index fb8160abc59db4fdf7fb5d5f1d828a0448a86a0f..1da6fe976da5b2b46132c416657abcf20c4262b5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -66,7 +66,19 @@ private[spark] object CoarseGrainedClusterMessages {
   case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
-  case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase :String)
+  // Exchanged between the driver and the AM in Yarn client mode
+  case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase: String)
     extends CoarseGrainedClusterMessage
+  // Messages exchanged between the driver and the cluster manager for executor allocation
+  // In Yarn mode, these are exchanged between the driver and the AM
+  case object RegisterClusterManager extends CoarseGrainedClusterMessage
+  // Request executors by specifying the new total number of executors desired
+  // This includes executors already pending or running
+  case class RequestExecutors(requestedTotal: Int) extends CoarseGrainedClusterMessage
+  case class KillExecutors(executorIds: Seq[String]) extends CoarseGrainedClusterMessage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 59aed6b72fe428733234f8bf74e7b6ffe4b92e4c..7a6ee56f816899f11b4e03dd654e93459ab5c943 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -31,7 +31,6 @@ import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState}
 import org.apache.spark.scheduler.{SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{ActorLogReceive, SerializableBuffer, AkkaUtils, Utils}
-import org.apache.spark.ui.JettyUtils
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -42,7 +41,7 @@ import org.apache.spark.ui.JettyUtils
  * (spark.deploy.*).
-class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: ActorSystem)
+class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSystem: ActorSystem)
   extends SchedulerBackend with Logging
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
@@ -61,10 +60,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
   val createTime = System.currentTimeMillis()
+  private val executorDataMap = new HashMap[String, ExecutorData]
+  // Number of executors requested from the cluster manager that have not registered yet
+  private var numPendingExecutors = 0
+  // Executors we have requested the cluster manager to kill that have not died yet
+  private val executorsPendingToRemove = new HashSet[String]
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {
     override protected def log = CoarseGrainedSchedulerBackend.this.log
     private val addressToExecutorId = new HashMap[Address, String]
-    private val executorDataMap = new HashMap[String, ExecutorData]
     override def preStart() {
       // Listen for remote client disconnection events, since they don't go through Akka's watch()
@@ -84,12 +90,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         } else {
           logInfo("Registered executor: " + sender + " with ID " + executorId)
           sender ! RegisteredExecutor
-          executorDataMap.put(executorId, new ExecutorData(sender, sender.path.address,
-            Utils.parseHostPort(hostPort)._1, cores, cores))
           addressToExecutorId(sender.path.address) = executorId
+          val (host, _) = Utils.parseHostPort(hostPort)
+          val data = new ExecutorData(sender, sender.path.address, host, cores, cores)
+          // This must be synchronized because variables mutated
+          // in this block are read when requesting executors
+          CoarseGrainedSchedulerBackend.this.synchronized {
+            executorDataMap.put(executorId, data)
+            if (numPendingExecutors > 0) {
+              numPendingExecutors -= 1
+              logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
+            }
+          }
@@ -128,10 +143,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         removeExecutor(executorId, reason)
         sender ! true
-      case AddWebUIFilter(filterName, filterParams, proxyBase) =>
-        addWebUIFilter(filterName, filterParams, proxyBase)
-        sender ! true
       case DisassociatedEvent(_, address, _) =>
           "remote Akka client disassociated"))
@@ -183,13 +194,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     // Remove a disconnected slave from the cluster
-    def removeExecutor(executorId: String, reason: String) {
+    def removeExecutor(executorId: String, reason: String): Unit = {
       executorDataMap.get(executorId) match {
         case Some(executorInfo) =>
-          executorDataMap -= executorId
+          // This must be synchronized because variables mutated
+          // in this block are read when requesting executors
+          CoarseGrainedSchedulerBackend.this.synchronized {
+            executorDataMap -= executorId
+            executorsPendingToRemove -= executorId
+          }
           scheduler.executorLost(executorId, SlaveLost(reason))
-        case None => logError(s"Asked to remove non existant executor $executorId")
+        case None => logError(s"Asked to remove non-existent executor $executorId")
@@ -274,21 +290,62 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
-  // Add filters to the SparkUI
-  def addWebUIFilter(filterName: String, filterParams: Map[String, String], proxyBase: String) {
-    if (proxyBase != null && proxyBase.nonEmpty) {
-      System.setProperty("spark.ui.proxyBase", proxyBase)
-    }
+  /**
+   * Return the number of executors currently registered with this backend.
+   */
+  def numExistingExecutors: Int = executorDataMap.size
+  /**
+   * Request an additional number of executors from the cluster manager.
+   * Return whether the request is acknowledged.
+   */
+  final def requestExecutors(numAdditionalExecutors: Int): Boolean = synchronized {
+    logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
+    logDebug(s"Number of pending executors is now $numPendingExecutors")
+    numPendingExecutors += numAdditionalExecutors
+    // Account for executors pending to be added or removed
+    val newTotal = numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size
+    doRequestTotalExecutors(newTotal)
+  }
-    val hasFilter = (filterName != null && filterName.nonEmpty &&
-      filterParams != null && filterParams.nonEmpty)
-    if (hasFilter) {
-      logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
-      conf.set("spark.ui.filters", filterName)
-      filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }
-      scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
+  /**
+   * Request executors from the cluster manager by specifying the total number desired,
+   * including existing pending and running executors.
+   *
+   * The semantics here guarantee that we do not over-allocate executors for this application,
+   * since a later request overrides the value of any prior request. The alternative interface
+   * of requesting a delta of executors risks double counting new executors when there are
+   * insufficient resources to satisfy the first request. We make the assumption here that the
+   * cluster manager will eventually fulfill all requests when resources free up.
+   *
+   * Return whether the request is acknowledged.
+   */
+  protected def doRequestTotalExecutors(requestedTotal: Int): Boolean = false
+  /**
+   * Request that the cluster manager kill the specified executors.
+   * Return whether the kill request is acknowledged.
+   */
+  final def killExecutors(executorIds: Seq[String]): Boolean = {
+    logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
+    val filteredExecutorIds = new ArrayBuffer[String]
+    executorIds.foreach { id =>
+      if (executorDataMap.contains(id)) {
+        filteredExecutorIds += id
+      } else {
+        logWarning(s"Executor to kill $id does not exist!")
+      }
+    executorsPendingToRemove ++= filteredExecutorIds
+    doKillExecutors(filteredExecutorIds)
+  /**
+   * Kill the given list of executors through the cluster manager.
+   * Return whether the kill request is acknowledged.
+   */
+  protected def doKillExecutors(executorIds: Seq[String]): Boolean = false
 private[spark] object CoarseGrainedSchedulerBackend {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
new file mode 100644
index 0000000000000000000000000000000000000000..50721b9d6cd6cd8496e3f3e88368fbddbc091ea1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -0,0 +1,142 @@
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster
+import akka.actor.{Actor, ActorRef, Props}
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.ui.JettyUtils
+import org.apache.spark.util.AkkaUtils
+ * Abstract Yarn scheduler backend that contains common logic
+ * between the client and cluster Yarn scheduler backends.
+ */
+private[spark] abstract class YarnSchedulerBackend(
+    scheduler: TaskSchedulerImpl,
+    sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
+  if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+    minRegisteredRatio = 0.8
+  }
+  protected var totalExpectedExecutors = 0
+  private val yarnSchedulerActor: ActorRef =
+    actorSystem.actorOf(
+      Props(new YarnSchedulerActor),
+      name = YarnSchedulerBackend.ACTOR_NAME)
+  private implicit val askTimeout = AkkaUtils.askTimeout(sc.conf)
+  /**
+   * Request executors from the ApplicationMaster by specifying the total number desired.
+   * This includes executors already pending or running.
+   */
+  override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+    AkkaUtils.askWithReply[Boolean](
+      RequestExecutors(requestedTotal), yarnSchedulerActor, askTimeout)
+  }
+  /**
+   * Request that the ApplicationMaster kill the specified executors.
+   */
+  override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+    AkkaUtils.askWithReply[Boolean](
+      KillExecutors(executorIds), yarnSchedulerActor, askTimeout)
+  }
+  override def sufficientResourcesRegistered(): Boolean = {
+    totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
+  }
+  /**
+   * Add filters to the SparkUI.
+   */
+  private def addWebUIFilter(
+      filterName: String,
+      filterParams: Map[String, String],
+      proxyBase: String): Unit = {
+    if (proxyBase != null && proxyBase.nonEmpty) {
+      System.setProperty("spark.ui.proxyBase", proxyBase)
+    }
+    val hasFilter =
+      filterName != null && filterName.nonEmpty &&
+      filterParams != null && filterParams.nonEmpty
+    if (hasFilter) {
+      logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
+      conf.set("spark.ui.filters", filterName)
+      filterParams.foreach { case (k, v) => conf.set(s"spark.$filterName.param.$k", v) }
+      scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
+    }
+  }
+  /**
+   * An actor that communicates with the ApplicationMaster.
+   */
+  private class YarnSchedulerActor extends Actor {
+    private var amActor: Option[ActorRef] = None
+    override def preStart(): Unit = {
+      // Listen for disassociation events
+      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
+    }
+    override def receive = {
+      case RegisterClusterManager =>
+        logInfo(s"ApplicationMaster registered as $sender")
+        amActor = Some(sender)
+      case r: RequestExecutors =>
+        amActor match {
+          case Some(actor) =>
+            sender ! AkkaUtils.askWithReply[Boolean](r, actor, askTimeout)
+          case None =>
+            logWarning("Attempted to request executors before the AM has registered!")
+            sender ! false
+        }
+      case k: KillExecutors =>
+        amActor match {
+          case Some(actor) =>
+            sender ! AkkaUtils.askWithReply[Boolean](k, actor, askTimeout)
+          case None =>
+            logWarning("Attempted to kill executors before the AM has registered!")
+            sender ! false
+        }
+      case AddWebUIFilter(filterName, filterParams, proxyBase) =>
+        addWebUIFilter(filterName, filterParams, proxyBase)
+        sender ! true
+      case d: DisassociatedEvent =>
+        if (amActor.isDefined && sender == amActor.get) {
+          logWarning(s"ApplicationMaster has disassociated: $d")
+        }
+    }
+  }
+private[spark] object YarnSchedulerBackend {
+  val ACTOR_NAME = "YarnScheduler"
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index f41c8d0315cb36b32a8a9aa37749f53d3ef28fa7..79e398eb8c1041dadf1d7cb7dd236699e0aad429 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -159,17 +159,28 @@ private[spark] object AkkaUtils extends Logging {
   def askWithReply[T](
       message: Any,
       actor: ActorRef,
-      retryAttempts: Int,
+      timeout: FiniteDuration): T = {
+    askWithReply[T](message, actor, maxAttempts = 1, retryInterval = Int.MaxValue, timeout)
+  }
+  /**
+   * Send a message to the given actor and get its result within a default timeout, or
+   * throw a SparkException if this fails even after the specified number of retries.
+   */
+  def askWithReply[T](
+      message: Any,
+      actor: ActorRef,
+      maxAttempts: Int,
       retryInterval: Int,
       timeout: FiniteDuration): T = {
     // TODO: Consider removing multiple attempts
     if (actor == null) {
-      throw new SparkException("Error sending message as driverActor is null " +
+      throw new SparkException("Error sending message as actor is null " +
         "[message = " + message + "]")
     var attempts = 0
     var lastException: Exception = null
-    while (attempts < retryAttempts) {
+    while (attempts < maxAttempts) {
       attempts += 1
       try {
         val future = actor.ask(message)(timeout)
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 495a0d48633a465625821763edf3209c686c6ea7..df237ba796b38d1706339f3a2be3e2fedbaf1adb 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 import org.scalatest.{BeforeAndAfterEach, FunSuite, PrivateMethodTester}
-import org.apache.spark.scheduler.{TaskScheduler, TaskSchedulerImpl}
+import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalBackend
@@ -31,8 +31,9 @@ class SparkContextSchedulerCreationSuite
     // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
     // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
     val sc = new SparkContext("local", "test")
-    val createTaskSchedulerMethod = PrivateMethod[TaskScheduler]('createTaskScheduler)
-    val sched = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
+    val createTaskSchedulerMethod =
+      PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
+    val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index adbdc5d1da3c16dd6f105840b44f89b4190b27b1..6a0495f8fd54020f76c13f637828a5e80510433a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -73,6 +73,10 @@ object MimaExcludes {
+          ) ++ Seq(
+            // SPARK-3822
+            ProblemFilters.exclude[IncompatibleResultTypeProblem](
+              "org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
         case v if v.startsWith("1.1") =>
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e6fe0265d8811bdd5d2d5d0d410525b4cef3b02b..68073798886dd382085b9d41564bce8f1cac35af 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -36,8 +36,8 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, Spar
 import org.apache.spark.SparkException
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.HistoryServer
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
+import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
@@ -385,8 +385,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
-      CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
+      YarnSchedulerBackend.ACTOR_NAME)
+    actorSystem.actorOf(Props(new AMActor(driverUrl)), name = "YarnAM")
   /** Add the Yarn IP filter that is required for properly securing the UI. */
@@ -479,9 +479,10 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
-  // Actor used to monitor the driver when running in client deploy mode.
-  private class MonitorActor(driverUrl: String) extends Actor {
+  /**
+   * Actor that communicates with the driver in client deploy mode.
+   */
+  private class AMActor(driverUrl: String) extends Actor {
     var driver: ActorSelection = _
     override def preStart() = {
@@ -490,6 +491,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       // Send a hello message to establish the connection, after which
       // we can monitor Lifecycle Events.
       driver ! "Hello"
+      driver ! RegisterClusterManager
       context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
@@ -497,11 +499,27 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       case x: DisassociatedEvent =>
         logInfo(s"Driver terminated or disconnected! Shutting down. $x")
         finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
       case x: AddWebUIFilter =>
         logInfo(s"Add WebUI Filter. $x")
         driver ! x
-    }
+      case RequestExecutors(requestedTotal) =>
+        logInfo(s"Driver requested a total number of executors of $requestedTotal.")
+        Option(allocator) match {
+          case Some(a) => a.requestTotalExecutors(requestedTotal)
+          case None => logWarning("Container allocator is not ready to request executors yet.")
+        }
+        sender ! true
+      case KillExecutors(executorIds) =>
+        logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.")
+        Option(allocator) match {
+          case Some(a) => executorIds.foreach(a.killExecutor)
+          case None => logWarning("Container allocator is not ready to kill executors yet.")
+        }
+        sender ! true
+    }
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index e1af8d5a74cb1d81ff90e56b1d657bd6a4a847ba..7ae8ef237ff8955f9e5ededb6e8396d6f43064f9 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -88,7 +88,10 @@ private[yarn] abstract class YarnAllocator(
   private val executorIdCounter = new AtomicInteger()
   private val numExecutorsFailed = new AtomicInteger()
-  private val maxExecutors = args.numExecutors
+  private var maxExecutors = args.numExecutors
+  // Keep track of which container is running which executor to remove the executors later
+  private val executorIdToContainer = new HashMap[String, Container]
   protected val executorMemory = args.executorMemory
   protected val executorCores = args.executorCores
@@ -111,7 +114,48 @@ private[yarn] abstract class YarnAllocator(
   def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
-  def allocateResources() = {
+  /**
+   * Request as many executors from the ResourceManager as needed to reach the desired total.
+   * This takes into account executors already running or pending.
+   */
+  def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
+    val currentTotal = numPendingAllocate.get + numExecutorsRunning.get
+    if (requestedTotal > currentTotal) {
+      maxExecutors += (requestedTotal - currentTotal)
+      // We need to call `allocateResources` here to avoid the following race condition:
+      // If we request executors twice before `allocateResources` is called, then we will end up
+      // double counting the number requested because `numPendingAllocate` is not updated yet.
+      allocateResources()
+    } else {
+      logInfo(s"Not allocating more executors because there are already $currentTotal " +
+        s"(application requested $requestedTotal total)")
+    }
+  }
+  /**
+   * Request that the ResourceManager release the container running the specified executor.
+   */
+  def killExecutor(executorId: String): Unit = synchronized {
+    if (executorIdToContainer.contains(executorId)) {
+      val container = executorIdToContainer.remove(executorId).get
+      internalReleaseContainer(container)
+      numExecutorsRunning.decrementAndGet()
+      maxExecutors -= 1
+      assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!")
+    } else {
+      logWarning(s"Attempted to kill unknown executor $executorId!")
+    }
+  }
+  /**
+   * Allocate missing containers based on the number of executors currently pending and running.
+   *
+   * This method prioritizes the allocated container responses from the RM based on node and
+   * rack locality. Additionally, it releases any extra containers allocated for this application
+   * but are not needed. This must be synchronized because variables read in this block are
+   * mutated by other methods.
+   */
+  def allocateResources(): Unit = synchronized {
     val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
     // this is needed by alpha, do it here since we add numPending right after this
@@ -119,7 +163,7 @@ private[yarn] abstract class YarnAllocator(
     if (missing > 0) {
       val totalExecutorMemory = executorMemory + memoryOverhead
-      logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " + 
+      logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
         s"memory including $memoryOverhead MB overhead")
     } else {
       logDebug("Empty allocation request ...")
@@ -269,6 +313,7 @@ private[yarn] abstract class YarnAllocator(
           logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
+          executorIdToContainer(executorId) = container
           // To be safe, remove the container from `releasedContainers`.
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 59b2b47aed2fe07680ea0ce4497b7b222a2afcaf..f6f6dc52433e5a681d2b1b5b81ee0ac445bd03d3 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -17,27 +17,23 @@
 package org.apache.spark.scheduler.cluster
+import scala.collection.mutable.ArrayBuffer
 import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
 import org.apache.spark.{SparkException, Logging, SparkContext}
 import org.apache.spark.deploy.yarn.{Client, ClientArguments}
 import org.apache.spark.scheduler.TaskSchedulerImpl
-import scala.collection.mutable.ArrayBuffer
 private[spark] class YarnClientSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     sc: SparkContext)
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+  extends YarnSchedulerBackend(scheduler, sc)
   with Logging {
-  if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
-    minRegisteredRatio = 0.8
-  }
   private var client: Client = null
   private var appId: ApplicationId = null
   private var stopping: Boolean = false
-  private var totalExpectedExecutors = 0
    * Create a Yarn client to submit an application to the ResourceManager.
@@ -151,14 +147,11 @@ private[spark] class YarnClientSchedulerBackend(
-  override def sufficientResourcesRegistered(): Boolean = {
-    totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
-  }
-  override def applicationId(): String =
+  override def applicationId(): String = {
     Option(appId).map(_.toString).getOrElse {
       logWarning("Application ID is not initialized yet.")
+  }
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 3a186cfeb4eeb572245cbfa3cc2f2025966d4215..a96a54f66824c015814f2f16174bcbfeee2165e2 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -25,13 +25,7 @@ import org.apache.spark.util.IntParam
 private[spark] class YarnClusterSchedulerBackend(
     scheduler: TaskSchedulerImpl,
     sc: SparkContext)
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
-  var totalExpectedExecutors = 0
-  if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
-    minRegisteredRatio = 0.8
-  }
+  extends YarnSchedulerBackend(scheduler, sc) {
   override def start() {
@@ -44,10 +38,6 @@ private[spark] class YarnClusterSchedulerBackend(
     totalExpectedExecutors = sc.getConf.getInt("spark.executor.instances", totalExpectedExecutors)
-  override def sufficientResourcesRegistered(): Boolean = {
-    totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
-  }
   override def applicationId(): String =
     // In YARN Cluster mode, spark.yarn.app.id is expect to be set
     // before user application is launched.