diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 43dd4a170731dd729ec67be80f5d37f7c184d596..ee60d697d8799261cca27a943fce0f5db254d997 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -177,16 +177,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
           s"${now - lastSeenMs} ms exceeds timeout $executorTimeoutMs ms")
         scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
           s"timed out after ${now - lastSeenMs} ms"))
-        if (sc.supportDynamicAllocation) {
           // Asynchronously kill the executor to avoid blocking the current thread
-          killExecutorThread.submit(new Runnable {
-            override def run(): Unit = Utils.tryLogNonFatalError {
-              // Note: we want to get an executor back after expiring this one,
-              // so do not simply call `sc.killExecutor` here (SPARK-8119)
-              sc.killAndReplaceExecutor(executorId)
-            }
-          })
-        }
+        killExecutorThread.submit(new Runnable {
+          override def run(): Unit = Utils.tryLogNonFatalError {
+            // Note: we want to get an executor back after expiring this one,
+            // so do not simply call `sc.killExecutor` here (SPARK-8119)
+            sc.killAndReplaceExecutor(executorId)
+          }
+        })
         executorLastSeen.remove(executorId)
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2d8aa25d81daa577c78ba7139c0dd5062f72d3ca..a1c66ef4fc5ea783303f31b3955c68484225232d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -531,8 +531,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     val dynamicAllocationEnabled = _conf.getBoolean("spark.dynamicAllocation.enabled", false)
     _executorAllocationManager =
       if (dynamicAllocationEnabled) {
-        assert(supportDynamicAllocation,
-          "Dynamic allocation of executors is currently only supported in YARN and Mesos mode")
         Some(new ExecutorAllocationManager(this, listenerBus, _conf))
       } else {
         None
@@ -1361,17 +1359,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     postEnvironmentUpdate()
   }
 
-  /**
-   * Return whether dynamically adjusting the amount of resources allocated to
-   * this application is supported. This is currently only available for YARN
-   * and Mesos coarse-grained mode.
-   */
-  private[spark] def supportDynamicAllocation: Boolean = {
-    (master.contains("yarn")
-      || master.contains("mesos")
-      || _conf.getBoolean("spark.dynamicAllocation.testing", false))
-  }
-
   /**
    * :: DeveloperApi ::
    * Register a listener to receive up-calls from events that happen during execution.
@@ -1400,8 +1387,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       localityAwareTasks: Int,
       hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
     ): Boolean = {
-    assert(supportDynamicAllocation,
-      "Requesting executors is currently only supported in YARN and Mesos modes")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
@@ -1414,12 +1399,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /**
    * :: DeveloperApi ::
    * Request an additional number of executors from the cluster manager.
-   * This is currently only supported in YARN mode. Return whether the request is received.
+   * @return whether the request is received.
    */
   @DeveloperApi
   override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
-    assert(supportDynamicAllocation,
-      "Requesting executors is currently only supported in YARN and Mesos modes")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.requestExecutors(numAdditionalExecutors)
@@ -1438,12 +1421,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * through this method with new ones, it should follow up explicitly with a call to
    * {{SparkContext#requestExecutors}}.
    *
-   * This is currently only supported in YARN mode. Return whether the request is received.
+   * @return whether the request is received.
    */
   @DeveloperApi
   override def killExecutors(executorIds: Seq[String]): Boolean = {
-    assert(supportDynamicAllocation,
-      "Killing executors is currently only supported in YARN and Mesos modes")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.killExecutors(executorIds)
@@ -1462,7 +1443,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * through this method with a new one, it should follow up explicitly with a call to
    * {{SparkContext#requestExecutors}}.
    *
-   * This is currently only supported in YARN mode. Return whether the request is received.
+   * @return whether the request is received.
    */
   @DeveloperApi
   override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId)
@@ -1479,7 +1460,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * can steal the window of opportunity and acquire this application's resources in the
    * mean time.
    *
-   * This is currently only supported in YARN mode. Return whether the request is received.
+   * @return whether the request is received.
    */
   private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
     schedulerBackend match {
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 12727de9b4cf332917677e5856050dc7404ef435..d8084a57658ad08aafdba54a81753682a9865548 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -107,6 +107,10 @@ private[deploy] object DeployMessages {
 
   case class MasterChangeAcknowledged(appId: String)
 
+  case class RequestExecutors(appId: String, requestedTotal: Int)
+
+  case class KillExecutors(appId: String, executorIds: Seq[String])
+
   // Master to AppClient
 
   case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index 09973a0a2c9980008cb87d19e2109ab532fc865d..4089c3e771fa86e3f87c214c06f5e9797201d1f6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -70,6 +70,11 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
     server = transportContext.createServer(port, bootstraps)
   }
 
+  /** Clean up all shuffle files associated with an application that has exited. */
+  def applicationRemoved(appId: String): Unit = {
+    blockHandler.applicationRemoved(appId, true /* cleanupLocalDirs */)
+  }
+
   def stop() {
     if (server != null) {
       server.close()
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index a659abf70395d49d91a79b822787e351694d27b9..7576a2985ee7be23e9a38a5584be4e4791b260e1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -197,6 +197,22 @@ private[spark] class AppClient(
         sendToMaster(UnregisterApplication(appId))
         context.reply(true)
         stop()
+
+      case r: RequestExecutors =>
+        master match {
+          case Some(m) => context.reply(m.askWithRetry[Boolean](r))
+          case None =>
+            logWarning("Attempted to request executors before registering with Master.")
+            context.reply(false)
+        }
+
+      case k: KillExecutors =>
+        master match {
+          case Some(m) => context.reply(m.askWithRetry[Boolean](k))
+          case None =>
+            logWarning("Attempted to kill executors before registering with Master.")
+            context.reply(false)
+        }
     }
 
     override def onDisconnected(address: RpcAddress): Unit = {
@@ -257,4 +273,33 @@ private[spark] class AppClient(
       endpoint = null
     }
   }
+
+  /**
+   * Request executors from the Master by specifying the total number desired,
+   * including existing pending and running executors.
+   *
+   * @return whether the request is acknowledged.
+   */
+  def requestTotalExecutors(requestedTotal: Int): Boolean = {
+    if (endpoint != null && appId != null) {
+      endpoint.askWithRetry[Boolean](RequestExecutors(appId, requestedTotal))
+    } else {
+      logWarning("Attempted to request executors before driver fully initialized.")
+      false
+    }
+  }
+
+  /**
+   * Kill the given list of executors through the Master.
+   * @return whether the kill request is acknowledged.
+   */
+  def killExecutors(executorIds: Seq[String]): Boolean = {
+    if (endpoint != null && appId != null) {
+      endpoint.askWithRetry[Boolean](KillExecutors(appId, executorIds))
+    } else {
+      logWarning("Attempted to kill executors before driver fully initialized.")
+      false
+    }
+  }
+
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index aa54ed9360f367db7de30962e8df66fc2b937763..b40d20f9f78683dc01be970617b9d8c379b8aa1b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -22,7 +22,6 @@ import java.util.Date
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.deploy.ApplicationDescription
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.Utils
@@ -43,6 +42,11 @@ private[spark] class ApplicationInfo(
   @transient var endTime: Long = _
   @transient var appSource: ApplicationSource = _
 
+  // A cap on the number of executors this application can have at any given time.
+  // By default, this is infinite. Only after the first allocation request is issued by the
+  // application will this be set to a finite value. This is used for dynamic allocation.
+  @transient private[master] var executorLimit: Int = _
+
   @transient private var nextExecutorId: Int = _
 
   init()
@@ -60,6 +64,7 @@ private[spark] class ApplicationInfo(
     appSource = new ApplicationSource(this)
     nextExecutorId = 0
     removedExecutors = new ArrayBuffer[ExecutorDesc]
+    executorLimit = Integer.MAX_VALUE
   }
 
   private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -116,6 +121,12 @@ private[spark] class ApplicationInfo(
     state != ApplicationState.WAITING && state != ApplicationState.RUNNING
   }
 
+  /**
+   * Return the limit on the number of executors this application can have.
+   * For testing only.
+   */
+  private[deploy] def getExecutorLimit: Int = executorLimit
+
   def duration: Long = {
     if (endTime != -1) {
       endTime - startTime
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 51b3f0dead73e24c59a5fad04abe3b455f69deea..e38e437fe1c5a0638dcbed746e23468653085726 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -45,7 +45,7 @@ import org.apache.spark.serializer.{JavaSerializer, Serializer}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
 
-private[master] class Master(
+private[deploy] class Master(
     override val rpcEnv: RpcEnv,
     address: RpcAddress,
     webUiPort: Int,
@@ -468,6 +468,13 @@ private[master] class Master(
     case BoundPortsRequest => {
       context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))
     }
+
+    case RequestExecutors(appId, requestedTotal) =>
+      context.reply(handleRequestExecutors(appId, requestedTotal))
+
+    case KillExecutors(appId, executorIds) =>
+      val formattedExecutorIds = formatExecutorIds(executorIds)
+      context.reply(handleKillExecutors(appId, formattedExecutorIds))
   }
 
   override def onDisconnected(address: RpcAddress): Unit = {
@@ -563,32 +570,49 @@ private[master] class Master(
       app: ApplicationInfo,
       usableWorkers: Array[WorkerInfo],
       spreadOutApps: Boolean): Array[Int] = {
-    // If the number of cores per executor is not specified, then we can just schedule
-    // 1 core at a time since we expect a single executor to be launched on each worker
-    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)
+    val coresPerExecutor = app.desc.coresPerExecutor
+    val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
+    val oneExecutorPerWorker = coresPerExecutor.isEmpty
     val memoryPerExecutor = app.desc.memoryPerExecutorMB
     val numUsable = usableWorkers.length
     val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
-    val assignedMemory = new Array[Int](numUsable) // Amount of memory to give to each worker
+    val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
     var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
-    var freeWorkers = (0 until numUsable).toIndexedSeq
 
+    /** Return whether the specified worker can launch an executor for this app. */
     def canLaunchExecutor(pos: Int): Boolean = {
-      usableWorkers(pos).coresFree - assignedCores(pos) >= coresPerExecutor &&
-      usableWorkers(pos).memoryFree - assignedMemory(pos) >= memoryPerExecutor
+      // If we allow multiple executors per worker, then we can always launch new executors.
+      // Otherwise, we may have already started assigning cores to the executor on this worker.
+      val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
+      val underLimit =
+        if (launchingNewExecutor) {
+          assignedExecutors.sum + app.executors.size < app.executorLimit
+        } else {
+          true
+        }
+      val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
+      usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor &&
+      usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor &&
+      coresToAssign >= minCoresPerExecutor &&
+      underLimit
     }
 
-    while (coresToAssign >= coresPerExecutor && freeWorkers.nonEmpty) {
-      freeWorkers = freeWorkers.filter(canLaunchExecutor)
+    // Keep launching executors until no more workers can accommodate any
+    // more executors, or if we have reached this application's limits
+    var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
+    while (freeWorkers.nonEmpty) {
       freeWorkers.foreach { pos =>
         var keepScheduling = true
-        while (keepScheduling && canLaunchExecutor(pos) && coresToAssign >= coresPerExecutor) {
-          coresToAssign -= coresPerExecutor
-          assignedCores(pos) += coresPerExecutor
-          // If cores per executor is not set, we are assigning 1 core at a time
-          // without actually meaning to launch 1 executor for each core assigned
-          if (app.desc.coresPerExecutor.isDefined) {
-            assignedMemory(pos) += memoryPerExecutor
+        while (keepScheduling && canLaunchExecutor(pos)) {
+          coresToAssign -= minCoresPerExecutor
+          assignedCores(pos) += minCoresPerExecutor
+
+          // If we are launching one executor per worker, then every iteration assigns 1 core
+          // to the executor. Otherwise, every iteration assigns cores to a new executor.
+          if (oneExecutorPerWorker) {
+            assignedExecutors(pos) = 1
+          } else {
+            assignedExecutors(pos) += 1
           }
 
           // Spreading out an application means spreading out its executors across as
@@ -600,6 +624,7 @@ private[master] class Master(
           }
         }
       }
+      freeWorkers = freeWorkers.filter(canLaunchExecutor)
     }
     assignedCores
   }
@@ -785,9 +810,7 @@ private[master] class Master(
       rebuildSparkUI(app)
 
       for (exec <- app.executors.values) {
-        exec.worker.removeExecutor(exec)
-        exec.worker.endpoint.send(KillExecutor(masterUrl, exec.application.id, exec.id))
-        exec.state = ExecutorState.KILLED
+        killExecutor(exec)
       }
       app.markFinished(state)
       if (state != ApplicationState.FINISHED) {
@@ -803,6 +826,87 @@ private[master] class Master(
     }
   }
 
+  /**
+   * Handle a request to set the target number of executors for this application.
+   *
+   * If the executor limit is adjusted upwards, new executors will be launched provided
+   * that there are workers with sufficient resources. If it is adjusted downwards, however,
+   * we do not kill existing executors until we explicitly receive a kill request.
+   *
+   * @return whether the application has previously registered with this Master.
+   */
+  private def handleRequestExecutors(appId: String, requestedTotal: Int): Boolean = {
+    idToApp.get(appId) match {
+      case Some(appInfo) =>
+        logInfo(s"Application $appId requested to set total executors to $requestedTotal.")
+        appInfo.executorLimit = requestedTotal
+        schedule()
+        true
+      case None =>
+        logWarning(s"Unknown application $appId requested $requestedTotal total executors.")
+        false
+    }
+  }
+
+  /**
+   * Handle a kill request from the given application.
+   *
+   * This method assumes the executor limit has already been adjusted downwards through
+   * a separate [[RequestExecutors]] message, such that we do not launch new executors
+   * immediately after the old ones are removed.
+   *
+   * @return whether the application has previously registered with this Master.
+   */
+  private def handleKillExecutors(appId: String, executorIds: Seq[Int]): Boolean = {
+    idToApp.get(appId) match {
+      case Some(appInfo) =>
+        logInfo(s"Application $appId requests to kill executors: " + executorIds.mkString(", "))
+        val (known, unknown) = executorIds.partition(appInfo.executors.contains)
+        known.foreach { executorId =>
+          val desc = appInfo.executors(executorId)
+          appInfo.removeExecutor(desc)
+          killExecutor(desc)
+        }
+        if (unknown.nonEmpty) {
+          logWarning(s"Application $appId attempted to kill non-existent executors: "
+            + unknown.mkString(", "))
+        }
+        schedule()
+        true
+      case None =>
+        logWarning(s"Unregistered application $appId requested us to kill executors!")
+        false
+    }
+  }
+
+  /**
+   * Cast the given executor IDs to integers and filter out the ones that fail.
+   *
+   * All executors IDs should be integers since we launched these executors. However,
+   * the kill interface on the driver side accepts arbitrary strings, so we need to
+   * handle non-integer executor IDs just to be safe.
+   */
+  private def formatExecutorIds(executorIds: Seq[String]): Seq[Int] = {
+    executorIds.flatMap { executorId =>
+      try {
+        Some(executorId.toInt)
+      } catch {
+        case e: NumberFormatException =>
+          logError(s"Encountered executor with a non-integer ID: $executorId. Ignoring")
+          None
+      }
+    }
+  }
+
+  /**
+   * Ask the worker on which the specified executor is launched to kill the executor.
+   */
+  private def killExecutor(exec: ExecutorDesc): Unit = {
+    exec.worker.removeExecutor(exec)
+    exec.worker.endpoint.send(KillExecutor(masterUrl, exec.application.id, exec.id))
+    exec.state = ExecutorState.KILLED
+  }
+
   /**
    * Rebuild a new SparkUI from the given application's event logs.
    * Return the UI if successful, else None
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 0276c24f85368e1946fb832018ed104aa6f117b4..c82a7ccab54dca5625f4f1306bbd305d3286baf7 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -40,7 +40,7 @@ import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rpc._
 import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}
 
-private[worker] class Worker(
+private[deploy] class Worker(
     override val rpcEnv: RpcEnv,
     webUiPort: Int,
     cores: Int,
@@ -553,6 +553,7 @@ private[worker] class Worker(
           Utils.deleteRecursively(new File(dir))
         }
       }
+      shuffleService.applicationRemoved(id)
     }
   }
 
@@ -660,6 +661,9 @@ private[worker] class Worker(
 }
 
 private[deploy] object Worker extends Logging {
+  val SYSTEM_NAME = "sparkWorker"
+  val ENDPOINT_NAME = "Worker"
+
   def main(argStrings: Array[String]) {
     SignalLogger.register(log)
     val conf = new SparkConf
@@ -681,13 +685,12 @@ private[deploy] object Worker extends Logging {
       conf: SparkConf = new SparkConf): RpcEnv = {
 
     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
-    val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
-    val actorName = "Worker"
+    val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
     val securityMgr = new SecurityManager(conf)
     val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
     val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
-    rpcEnv.setupEndpoint(actorName, new Worker(rpcEnv, webUiPort, cores, memory, masterAddresses,
-      systemName, actorName, workDir, conf, securityMgr))
+    rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
+      masterAddresses, systemName, ENDPOINT_NAME, workDir, conf, securityMgr))
     rpcEnv
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index bd89160af4ffa0e5ffa1a0ba176a14afe05147ad..6acf8a9a5e9b48fb675eed487f61246efd1461ce 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -134,7 +134,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
         } else {
           logInfo("Registered executor: " + executorRef + " with ID " + executorId)
-          context.reply(RegisteredExecutor)
           addressToExecutorId(executorRef.address) = executorId
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
@@ -149,6 +148,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
               logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
             }
           }
+          // Note: some tests expect the reply to come after we put the executor in the map
+          context.reply(RegisteredExecutor)
           listenerBus.post(
             SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
           makeOffers()
@@ -435,7 +436,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   /**
    * Kill the given list of executors through the cluster manager.
-   * Return whether the kill request is acknowledged.
+   * @return whether the kill request is acknowledged.
    */
   protected def doKillExecutors(executorIds: Seq[String]): Boolean = false
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 687ae9620460f5dfe2c97c356432f604b083e7e9..bbe51b4a09a22feb6e5416d001b340db69d866ed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -152,6 +152,34 @@ private[spark] class SparkDeploySchedulerBackend(
       super.applicationId
     }
 
+  /**
+   * Request executors from the Master by specifying the total number desired,
+   * including existing pending and running executors.
+   *
+   * @return whether the request is acknowledged.
+   */
+  protected override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+    Option(client) match {
+      case Some(c) => c.requestTotalExecutors(requestedTotal)
+      case None =>
+        logWarning("Attempted to request executors before driver fully initialized.")
+        false
+    }
+  }
+
+  /**
+   * Kill the given list of executors through the Master.
+   * @return whether the kill request is acknowledged.
+   */
+  protected override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+    Option(client) match {
+      case Some(c) => c.killExecutors(executorIds)
+      case None =>
+        logWarning("Attempted to kill executors before driver fully initialized.")
+        false
+    }
+  }
+
   private def waitForRegistration() = {
     registrationBarrier.acquire()
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..08c41a897a86104d8c7724f181e8eec80b397842
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.mockito.Mockito.{mock, when}
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark._
+import org.apache.spark.deploy.master.Master
+import org.apache.spark.deploy.worker.Worker
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
+import org.apache.spark.scheduler.cluster._
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
+
+/**
+ * End-to-end tests for dynamic allocation in standalone mode.
+ */
+class StandaloneDynamicAllocationSuite
+  extends SparkFunSuite
+  with LocalSparkContext
+  with BeforeAndAfterAll {
+
+  private val numWorkers = 2
+  private val conf = new SparkConf()
+  private val securityManager = new SecurityManager(conf)
+
+  private var masterRpcEnv: RpcEnv = null
+  private var workerRpcEnvs: Seq[RpcEnv] = null
+  private var master: Master = null
+  private var workers: Seq[Worker] = null
+
+  /**
+   * Start the local cluster.
+   * Note: local-cluster mode is insufficient because we want a reference to the Master.
+   */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    masterRpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityManager)
+    workerRpcEnvs = (0 until numWorkers).map { i =>
+      RpcEnv.create(Worker.SYSTEM_NAME + i, "localhost", 0, conf, securityManager)
+    }
+    master = makeMaster()
+    workers = makeWorkers(10, 2048)
+  }
+
+  override def afterAll(): Unit = {
+    masterRpcEnv.shutdown()
+    workerRpcEnvs.foreach(_.shutdown())
+    master.stop()
+    workers.foreach(_.stop())
+    masterRpcEnv = null
+    workerRpcEnvs = null
+    master = null
+    workers = null
+    super.afterAll()
+  }
+
+  test("dynamic allocation default behavior") {
+    sc = new SparkContext(appConf)
+    val appId = sc.applicationId
+    assert(master.apps.size === 1)
+    assert(master.apps.head.id === appId)
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    // kill all executors
+    assert(killAllExecutors(sc))
+    assert(master.apps.head.executors.size === 0)
+    assert(master.apps.head.getExecutorLimit === 0)
+    // request 1
+    assert(sc.requestExecutors(1))
+    assert(master.apps.head.executors.size === 1)
+    assert(master.apps.head.getExecutorLimit === 1)
+    // request 1 more
+    assert(sc.requestExecutors(1))
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.getExecutorLimit === 2)
+    // request 1 more; this one won't go through
+    assert(sc.requestExecutors(1))
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.getExecutorLimit === 3)
+    // kill all existing executors; we should end up with 3 - 2 = 1 executor
+    assert(killAllExecutors(sc))
+    assert(master.apps.head.executors.size === 1)
+    assert(master.apps.head.getExecutorLimit === 1)
+    // kill all executors again; this time we'll have 1 - 1 = 0 executors left
+    assert(killAllExecutors(sc))
+    assert(master.apps.head.executors.size === 0)
+    assert(master.apps.head.getExecutorLimit === 0)
+    // request many more; this increases the limit well beyond the cluster capacity
+    assert(sc.requestExecutors(1000))
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.getExecutorLimit === 1000)
+  }
+
+  test("dynamic allocation with max cores <= cores per worker") {
+    sc = new SparkContext(appConf.set("spark.cores.max", "8"))
+    val appId = sc.applicationId
+    assert(master.apps.size === 1)
+    assert(master.apps.head.id === appId)
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    // kill all executors
+    assert(killAllExecutors(sc))
+    assert(master.apps.head.executors.size === 0)
+    assert(master.apps.head.getExecutorLimit === 0)
+    // request 1
+    assert(sc.requestExecutors(1))
+    assert(master.apps.head.executors.size === 1)
+    assert(master.apps.head.executors.values.head.cores === 8)
+    assert(master.apps.head.getExecutorLimit === 1)
+    // request 1 more; this one won't go through because we're already at max cores.
+    // This highlights a limitation of using dynamic allocation with max cores WITHOUT
+    // setting cores per executor: once an application scales down and then scales back
+    // up, its executors may not be spread out anymore!
+    assert(sc.requestExecutors(1))
+    assert(master.apps.head.executors.size === 1)
+    assert(master.apps.head.getExecutorLimit === 2)
+    // request 1 more; this one also won't go through for the same reason
+    assert(sc.requestExecutors(1))
+    assert(master.apps.head.executors.size === 1)
+    assert(master.apps.head.getExecutorLimit === 3)
+    // kill all existing executors; we should end up with 3 - 1 = 2 executor
+    // Note: we scheduled these executors together, so their cores should be evenly distributed
+    assert(killAllExecutors(sc))
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+    assert(master.apps.head.getExecutorLimit === 2)
+    // kill all executors again; this time we'll have 1 - 1 = 0 executors left
+    assert(killAllExecutors(sc))
+    assert(master.apps.head.executors.size === 0)
+    assert(master.apps.head.getExecutorLimit === 0)
+    // request many more; this increases the limit well beyond the cluster capacity
+    assert(sc.requestExecutors(1000))
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+    assert(master.apps.head.getExecutorLimit === 1000)
+  }
+
+  test("dynamic allocation with max cores > cores per worker") {
+    sc = new SparkContext(appConf.set("spark.cores.max", "16"))
+    val appId = sc.applicationId
+    assert(master.apps.size === 1)
+    assert(master.apps.head.id === appId)
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
+    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    // kill all executors
+    assert(killAllExecutors(sc))
+    assert(master.apps.head.executors.size === 0)
+    assert(master.apps.head.getExecutorLimit === 0)
+    // request 1
+    assert(sc.requestExecutors(1))
+    assert(master.apps.head.executors.size === 1)
+    assert(master.apps.head.executors.values.head.cores === 10)
+    assert(master.apps.head.getExecutorLimit === 1)
+    // request 1 more
+    // Note: the cores are not evenly distributed because we scheduled these executors 1 by 1
+    assert(sc.requestExecutors(1))
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.executors.values.map(_.cores).toSet === Set(10, 6))
+    assert(master.apps.head.getExecutorLimit === 2)
+    // request 1 more; this one won't go through
+    assert(sc.requestExecutors(1))
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.getExecutorLimit === 3)
+    // kill all existing executors; we should end up with 3 - 2 = 1 executor
+    assert(killAllExecutors(sc))
+    assert(master.apps.head.executors.size === 1)
+    assert(master.apps.head.executors.values.head.cores === 10)
+    assert(master.apps.head.getExecutorLimit === 1)
+    // kill all executors again; this time we'll have 1 - 1 = 0 executors left
+    assert(killAllExecutors(sc))
+    assert(master.apps.head.executors.size === 0)
+    assert(master.apps.head.getExecutorLimit === 0)
+    // request many more; this increases the limit well beyond the cluster capacity
+    assert(sc.requestExecutors(1000))
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
+    assert(master.apps.head.getExecutorLimit === 1000)
+  }
+
+  test("dynamic allocation with cores per executor") {
+    sc = new SparkContext(appConf.set("spark.executor.cores", "2"))
+    val appId = sc.applicationId
+    assert(master.apps.size === 1)
+    assert(master.apps.head.id === appId)
+    assert(master.apps.head.executors.size === 10) // 20 cores total
+    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    // kill all executors
+    assert(killAllExecutors(sc))
+    assert(master.apps.head.executors.size === 0)
+    assert(master.apps.head.getExecutorLimit === 0)
+    // request 1
+    assert(sc.requestExecutors(1))
+    assert(master.apps.head.executors.size === 1)
+    assert(master.apps.head.getExecutorLimit === 1)
+    // request 3 more
+    assert(sc.requestExecutors(3))
+    assert(master.apps.head.executors.size === 4)
+    assert(master.apps.head.getExecutorLimit === 4)
+    // request 10 more; only 6 will go through
+    assert(sc.requestExecutors(10))
+    assert(master.apps.head.executors.size === 10)
+    assert(master.apps.head.getExecutorLimit === 14)
+    // kill 2 executors; we should get 2 back immediately
+    assert(killNExecutors(sc, 2))
+    assert(master.apps.head.executors.size === 10)
+    assert(master.apps.head.getExecutorLimit === 12)
+    // kill 4 executors; we should end up with 12 - 4 = 8 executors
+    assert(killNExecutors(sc, 4))
+    assert(master.apps.head.executors.size === 8)
+    assert(master.apps.head.getExecutorLimit === 8)
+    // kill all executors; this time we'll have 8 - 8 = 0 executors left
+    assert(killAllExecutors(sc))
+    assert(master.apps.head.executors.size === 0)
+    assert(master.apps.head.getExecutorLimit === 0)
+    // request many more; this increases the limit well beyond the cluster capacity
+    assert(sc.requestExecutors(1000))
+    assert(master.apps.head.executors.size === 10)
+    assert(master.apps.head.getExecutorLimit === 1000)
+  }
+
+  test("dynamic allocation with cores per executor AND max cores") {
+    sc = new SparkContext(appConf
+      .set("spark.executor.cores", "2")
+      .set("spark.cores.max", "8"))
+    val appId = sc.applicationId
+    assert(master.apps.size === 1)
+    assert(master.apps.head.id === appId)
+    assert(master.apps.head.executors.size === 4) // 8 cores total
+    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    // kill all executors
+    assert(killAllExecutors(sc))
+    assert(master.apps.head.executors.size === 0)
+    assert(master.apps.head.getExecutorLimit === 0)
+    // request 1
+    assert(sc.requestExecutors(1))
+    assert(master.apps.head.executors.size === 1)
+    assert(master.apps.head.getExecutorLimit === 1)
+    // request 3 more
+    assert(sc.requestExecutors(3))
+    assert(master.apps.head.executors.size === 4)
+    assert(master.apps.head.getExecutorLimit === 4)
+    // request 10 more; none will go through
+    assert(sc.requestExecutors(10))
+    assert(master.apps.head.executors.size === 4)
+    assert(master.apps.head.getExecutorLimit === 14)
+    // kill all executors; 4 executors will be launched immediately
+    assert(killAllExecutors(sc))
+    assert(master.apps.head.executors.size === 4)
+    assert(master.apps.head.getExecutorLimit === 10)
+    // ... and again
+    assert(killAllExecutors(sc))
+    assert(master.apps.head.executors.size === 4)
+    assert(master.apps.head.getExecutorLimit === 6)
+    // ... and again; now we end up with 6 - 4 = 2 executors left
+    assert(killAllExecutors(sc))
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.getExecutorLimit === 2)
+    // ... and again; this time we have 2 - 2 = 0 executors left
+    assert(killAllExecutors(sc))
+    assert(master.apps.head.executors.size === 0)
+    assert(master.apps.head.getExecutorLimit === 0)
+    // request many more; this increases the limit well beyond the cluster capacity
+    assert(sc.requestExecutors(1000))
+    assert(master.apps.head.executors.size === 4)
+    assert(master.apps.head.getExecutorLimit === 1000)
+  }
+
+  // ===============================
+  // | Utility methods for testing |
+  // ===============================
+
+  /** Return a SparkConf for applications that want to talk to our Master. */
+  private def appConf: SparkConf = {
+    new SparkConf()
+      .setMaster(masterRpcEnv.address.toSparkURL)
+      .setAppName("test")
+      .set("spark.executor.memory", "256m")
+  }
+
+  /** Make a master to which our application will send executor requests. */
+  private def makeMaster(): Master = {
+    val master = new Master(masterRpcEnv, masterRpcEnv.address, 0, securityManager, conf)
+    masterRpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+    master
+  }
+
+  /** Make a few workers that talk to our master. */
+  private def makeWorkers(cores: Int, memory: Int): Seq[Worker] = {
+    (0 until numWorkers).map { i =>
+      val rpcEnv = workerRpcEnvs(i)
+      val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address),
+        Worker.SYSTEM_NAME + i, Worker.ENDPOINT_NAME, null, conf, securityManager)
+      rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker)
+      worker
+    }
+  }
+
+  /** Kill all executors belonging to this application. */
+  private def killAllExecutors(sc: SparkContext): Boolean = {
+    killNExecutors(sc, Int.MaxValue)
+  }
+
+  /** Kill N executors belonging to this application. */
+  private def killNExecutors(sc: SparkContext, n: Int): Boolean = {
+    syncExecutors(sc)
+    sc.killExecutors(getExecutorIds(sc).take(n))
+  }
+
+  /**
+   * Return a list of executor IDs belonging to this application.
+   *
+   * Note that we must use the executor IDs according to the Master, which has the most
+   * updated view. We cannot rely on the executor IDs according to the driver because we
+   * don't wait for executors to register. Otherwise the tests will take much longer to run.
+   */
+  private def getExecutorIds(sc: SparkContext): Seq[String] = {
+    assert(master.idToApp.contains(sc.applicationId))
+    master.idToApp(sc.applicationId).executors.keys.map(_.toString).toSeq
+  }
+
+  /**
+   * Sync executor IDs between the driver and the Master.
+   *
+   * This allows us to avoid waiting for new executors to register with the driver before
+   * we submit a request to kill them. This must be called before each kill request.
+   */
+  private def syncExecutors(sc: SparkContext): Unit = {
+    val driverExecutors = sc.getExecutorStorageStatus
+      .map(_.blockManagerId.executorId)
+      .filter { _ != SparkContext.DRIVER_IDENTIFIER}
+    val masterExecutors = getExecutorIds(sc)
+    val missingExecutors = masterExecutors.toSet.diff(driverExecutors.toSet).toSeq.sorted
+    missingExecutors.foreach { id =>
+      // Fake an executor registration so the driver knows about us
+      val port = System.currentTimeMillis % 65536
+      val endpointRef = mock(classOf[RpcEndpointRef])
+      val mockAddress = mock(classOf[RpcAddress])
+      when(endpointRef.address).thenReturn(mockAddress)
+      val message = RegisterExecutor(id, endpointRef, s"localhost:$port", 10, Map.empty)
+      val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
+      backend.driverEndpoint.askWithRetry[CoarseGrainedClusterMessage](message)
+    }
+  }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 4d7016d1e594bcaa8881512ab107cd0af4d8a24b..30780a0da7f8da51f768d3e19139e77cabb0cc4b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -120,7 +120,7 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
     CustomRecoveryModeFactory.instantiationAttempts should be > instantiationAttempts
   }
 
-  test("Master & worker web ui available") {
+  test("master/worker web ui available") {
     implicit val formats = org.json4s.DefaultFormats
     val conf = new SparkConf()
     val localCluster = new LocalSparkCluster(2, 2, 512, conf)
@@ -144,174 +144,202 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
   }
 
   test("basic scheduling - spread out") {
-    testBasicScheduling(spreadOut = true)
+    basicScheduling(spreadOut = true)
   }
 
   test("basic scheduling - no spread out") {
-    testBasicScheduling(spreadOut = false)
+    basicScheduling(spreadOut = false)
   }
 
   test("scheduling with max cores - spread out") {
-    testSchedulingWithMaxCores(spreadOut = true)
+    schedulingWithMaxCores(spreadOut = true)
   }
 
   test("scheduling with max cores - no spread out") {
-    testSchedulingWithMaxCores(spreadOut = false)
+    schedulingWithMaxCores(spreadOut = false)
   }
 
   test("scheduling with cores per executor - spread out") {
-    testSchedulingWithCoresPerExecutor(spreadOut = true)
+    schedulingWithCoresPerExecutor(spreadOut = true)
   }
 
   test("scheduling with cores per executor - no spread out") {
-    testSchedulingWithCoresPerExecutor(spreadOut = false)
+    schedulingWithCoresPerExecutor(spreadOut = false)
   }
 
   test("scheduling with cores per executor AND max cores - spread out") {
-    testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = true)
+    schedulingWithCoresPerExecutorAndMaxCores(spreadOut = true)
   }
 
   test("scheduling with cores per executor AND max cores - no spread out") {
-    testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut = false)
+    schedulingWithCoresPerExecutorAndMaxCores(spreadOut = false)
   }
 
-  private def testBasicScheduling(spreadOut: Boolean): Unit = {
+  test("scheduling with executor limit - spread out") {
+    schedulingWithExecutorLimit(spreadOut = true)
+  }
+
+  test("scheduling with executor limit - no spread out") {
+    schedulingWithExecutorLimit(spreadOut = false)
+  }
+
+  test("scheduling with executor limit AND max cores - spread out") {
+    schedulingWithExecutorLimitAndMaxCores(spreadOut = true)
+  }
+
+  test("scheduling with executor limit AND max cores - no spread out") {
+    schedulingWithExecutorLimitAndMaxCores(spreadOut = false)
+  }
+
+  test("scheduling with executor limit AND cores per executor - spread out") {
+    schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut = true)
+  }
+
+  test("scheduling with executor limit AND cores per executor - no spread out") {
+    schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut = false)
+  }
+
+  test("scheduling with executor limit AND cores per executor AND max cores - spread out") {
+    schedulingWithEverything(spreadOut = true)
+  }
+
+  test("scheduling with executor limit AND cores per executor AND max cores - no spread out") {
+    schedulingWithEverything(spreadOut = false)
+  }
+
+  private def basicScheduling(spreadOut: Boolean): Unit = {
     val master = makeMaster()
     val appInfo = makeAppInfo(1024)
-    val workerInfo = makeWorkerInfo(4096, 10)
-    val workerInfos = Array(workerInfo, workerInfo, workerInfo)
-    val scheduledCores = master.invokePrivate(
-      _scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut))
-    assert(scheduledCores.length === 3)
-    assert(scheduledCores(0) === 10)
-    assert(scheduledCores(1) === 10)
-    assert(scheduledCores(2) === 10)
+    val scheduledCores = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    assert(scheduledCores === Array(10, 10, 10))
   }
 
-  private def testSchedulingWithMaxCores(spreadOut: Boolean): Unit = {
+  private def schedulingWithMaxCores(spreadOut: Boolean): Unit = {
     val master = makeMaster()
     val appInfo1 = makeAppInfo(1024, maxCores = Some(8))
     val appInfo2 = makeAppInfo(1024, maxCores = Some(16))
-    val workerInfo = makeWorkerInfo(4096, 10)
-    val workerInfos = Array(workerInfo, workerInfo, workerInfo)
-    var scheduledCores = master.invokePrivate(
-      _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
-    assert(scheduledCores.length === 3)
-    // With spreading out, each worker should be assigned a few cores
-    if (spreadOut) {
-      assert(scheduledCores(0) === 3)
-      assert(scheduledCores(1) === 3)
-      assert(scheduledCores(2) === 2)
-    } else {
-      // Without spreading out, the cores should be concentrated on the first worker
-      assert(scheduledCores(0) === 8)
-      assert(scheduledCores(1) === 0)
-      assert(scheduledCores(2) === 0)
-    }
-    // Now test the same thing with max cores > cores per worker
-    scheduledCores = master.invokePrivate(
-      _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
-    assert(scheduledCores.length === 3)
+    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
+    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
     if (spreadOut) {
-      assert(scheduledCores(0) === 6)
-      assert(scheduledCores(1) === 5)
-      assert(scheduledCores(2) === 5)
+      assert(scheduledCores1 === Array(3, 3, 2))
+      assert(scheduledCores2 === Array(6, 5, 5))
     } else {
-      // Without spreading out, the first worker should be fully booked,
-      // and the leftover cores should spill over to the second worker only.
-      assert(scheduledCores(0) === 10)
-      assert(scheduledCores(1) === 6)
-      assert(scheduledCores(2) === 0)
+      assert(scheduledCores1 === Array(8, 0, 0))
+      assert(scheduledCores2 === Array(10, 6, 0))
     }
   }
 
-  private def testSchedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = {
+  private def schedulingWithCoresPerExecutor(spreadOut: Boolean): Unit = {
     val master = makeMaster()
     val appInfo1 = makeAppInfo(1024, coresPerExecutor = Some(2))
     val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2))
     val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3))
-    val workerInfo = makeWorkerInfo(4096, 10)
-    val workerInfos = Array(workerInfo, workerInfo, workerInfo)
-    // Each worker should end up with 4 executors with 2 cores each
-    // This should be 4 because of the memory restriction on each worker
-    var scheduledCores = master.invokePrivate(
-      _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
-    assert(scheduledCores.length === 3)
-    assert(scheduledCores(0) === 8)
-    assert(scheduledCores(1) === 8)
-    assert(scheduledCores(2) === 8)
-    // Now test the same thing without running into the worker memory limit
-    // Each worker should now end up with 5 executors with 2 cores each
-    scheduledCores = master.invokePrivate(
-      _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
-    assert(scheduledCores.length === 3)
-    assert(scheduledCores(0) === 10)
-    assert(scheduledCores(1) === 10)
-    assert(scheduledCores(2) === 10)
-    // Now test the same thing with a cores per executor that 10 is not divisible by
-    scheduledCores = master.invokePrivate(
-      _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
-    assert(scheduledCores.length === 3)
-    assert(scheduledCores(0) === 9)
-    assert(scheduledCores(1) === 9)
-    assert(scheduledCores(2) === 9)
+    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
+    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
+    val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut)
+    assert(scheduledCores1 === Array(8, 8, 8)) // 4 * 2 because of memory limits
+    assert(scheduledCores2 === Array(10, 10, 10)) // 5 * 2
+    assert(scheduledCores3 === Array(9, 9, 9)) // 3 * 3
   }
 
   // Sorry for the long method name!
-  private def testSchedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = {
+  private def schedulingWithCoresPerExecutorAndMaxCores(spreadOut: Boolean): Unit = {
     val master = makeMaster()
     val appInfo1 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(4))
     val appInfo2 = makeAppInfo(256, coresPerExecutor = Some(2), maxCores = Some(20))
     val appInfo3 = makeAppInfo(256, coresPerExecutor = Some(3), maxCores = Some(20))
-    val workerInfo = makeWorkerInfo(4096, 10)
-    val workerInfos = Array(workerInfo, workerInfo, workerInfo)
-    // We should only launch two executors, each with exactly 2 cores
-    var scheduledCores = master.invokePrivate(
-      _scheduleExecutorsOnWorkers(appInfo1, workerInfos, spreadOut))
-    assert(scheduledCores.length === 3)
+    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo1, workerInfos, spreadOut)
+    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo2, workerInfos, spreadOut)
+    val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo3, workerInfos, spreadOut)
+    if (spreadOut) {
+      assert(scheduledCores1 === Array(2, 2, 0))
+      assert(scheduledCores2 === Array(8, 6, 6))
+      assert(scheduledCores3 === Array(6, 6, 6))
+    } else {
+      assert(scheduledCores1 === Array(4, 0, 0))
+      assert(scheduledCores2 === Array(10, 10, 0))
+      assert(scheduledCores3 === Array(9, 9, 0))
+    }
+  }
+
+  private def schedulingWithExecutorLimit(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo = makeAppInfo(256)
+    appInfo.executorLimit = 0
+    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    appInfo.executorLimit = 2
+    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    appInfo.executorLimit = 5
+    val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    assert(scheduledCores1 === Array(0, 0, 0))
+    assert(scheduledCores2 === Array(10, 10, 0))
+    assert(scheduledCores3 === Array(10, 10, 10))
+  }
+
+  private def schedulingWithExecutorLimitAndMaxCores(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo = makeAppInfo(256, maxCores = Some(16))
+    appInfo.executorLimit = 0
+    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    appInfo.executorLimit = 2
+    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    appInfo.executorLimit = 5
+    val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    assert(scheduledCores1 === Array(0, 0, 0))
     if (spreadOut) {
-      assert(scheduledCores(0) === 2)
-      assert(scheduledCores(1) === 2)
-      assert(scheduledCores(2) === 0)
+      assert(scheduledCores2 === Array(8, 8, 0))
+      assert(scheduledCores3 === Array(6, 5, 5))
     } else {
-      assert(scheduledCores(0) === 4)
-      assert(scheduledCores(1) === 0)
-      assert(scheduledCores(2) === 0)
+      assert(scheduledCores2 === Array(10, 6, 0))
+      assert(scheduledCores3 === Array(10, 6, 0))
     }
-    // Test max cores > number of cores per worker
-    scheduledCores = master.invokePrivate(
-      _scheduleExecutorsOnWorkers(appInfo2, workerInfos, spreadOut))
-    assert(scheduledCores.length === 3)
+  }
+
+  private def schedulingWithExecutorLimitAndCoresPerExecutor(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo = makeAppInfo(256, coresPerExecutor = Some(4))
+    appInfo.executorLimit = 0
+    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    appInfo.executorLimit = 2
+    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    appInfo.executorLimit = 5
+    val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    assert(scheduledCores1 === Array(0, 0, 0))
     if (spreadOut) {
-      assert(scheduledCores(0) === 8)
-      assert(scheduledCores(1) === 6)
-      assert(scheduledCores(2) === 6)
+      assert(scheduledCores2 === Array(4, 4, 0))
     } else {
-      assert(scheduledCores(0) === 10)
-      assert(scheduledCores(1) === 10)
-      assert(scheduledCores(2) === 0)
+      assert(scheduledCores2 === Array(8, 0, 0))
     }
-    // Test max cores > number of cores per worker AND
-    // a cores per executor that is 10 is not divisible by
-    scheduledCores = master.invokePrivate(
-      _scheduleExecutorsOnWorkers(appInfo3, workerInfos, spreadOut))
-    assert(scheduledCores.length === 3)
+    assert(scheduledCores3 === Array(8, 8, 4))
+  }
+
+  // Everything being: executor limit + cores per executor + max cores
+  private def schedulingWithEverything(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo = makeAppInfo(256, coresPerExecutor = Some(4), maxCores = Some(18))
+    appInfo.executorLimit = 0
+    val scheduledCores1 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    appInfo.executorLimit = 2
+    val scheduledCores2 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    appInfo.executorLimit = 5
+    val scheduledCores3 = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    assert(scheduledCores1 === Array(0, 0, 0))
     if (spreadOut) {
-      assert(scheduledCores(0) === 6)
-      assert(scheduledCores(1) === 6)
-      assert(scheduledCores(2) === 6)
+      assert(scheduledCores2 === Array(4, 4, 0))
+      assert(scheduledCores3 === Array(8, 4, 4))
     } else {
-      assert(scheduledCores(0) === 9)
-      assert(scheduledCores(1) === 9)
-      assert(scheduledCores(2) === 0)
+      assert(scheduledCores2 === Array(8, 0, 0))
+      assert(scheduledCores3 === Array(8, 8, 0))
     }
   }
 
-  // ===============================
-  // | Utility methods for testing |
-  // ===============================
+  // ==========================================
+  // | Utility methods and fields for testing |
+  // ==========================================
 
   private val _scheduleExecutorsOnWorkers = PrivateMethod[Array[Int]]('scheduleExecutorsOnWorkers)
+  private val workerInfo = makeWorkerInfo(4096, 10)
+  private val workerInfos = Array(workerInfo, workerInfo, workerInfo)
 
   private def makeMaster(conf: SparkConf = new SparkConf): Master = {
     val securityMgr = new SecurityManager(conf)
@@ -335,4 +363,12 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
     new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, 101, "address")
   }
 
+  private def scheduleExecutorsOnWorkers(
+      master: Master,
+      appInfo: ApplicationInfo,
+      workerInfos: Array[WorkerInfo],
+      spreadOut: Boolean): Array[Int] = {
+    master.invokePrivate(_scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut))
+  }
+
 }
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fa36629c37a35e453243ce754e2e4f9fc96a5f65..f9384c4c3c9d64a0a73e697b0d326f5bba3bea6d 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -151,6 +151,10 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec$"),
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand"),
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException")
+          ) ++ Seq(
+            // SPARK-4751 Dynamic allocation for standalone mode
+            ProblemFilters.exclude[MissingMethodProblem](
+              "org.apache.spark.SparkContext.supportDynamicAllocation")
           )
 
         case v if v.startsWith("1.4") =>