diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index b7ae9c1fc0a235248b7eb2d1cee3ff15c6dd21be..ae99432f5ce869e5059e545a239a609f3a9cd10a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -22,12 +22,13 @@ import java.net.URI
 private[spark] class ApplicationDescription(
     val name: String,
     val maxCores: Option[Int],
-    val memoryPerSlave: Int,
+    val memoryPerExecutorMB: Int,
     val command: Command,
     var appUiUrl: String,
     val eventLogDir: Option[URI] = None,
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
-    val eventLogCodec: Option[String] = None)
+    val eventLogCodec: Option[String] = None,
+    val coresPerExecutor: Option[Int] = None)
   extends Serializable {
 
   val user = System.getProperty("user.name", "<unknown>")
@@ -35,13 +36,15 @@ private[spark] class ApplicationDescription(
   def copy(
       name: String = name,
       maxCores: Option[Int] = maxCores,
-      memoryPerSlave: Int = memoryPerSlave,
+      memoryPerExecutorMB: Int = memoryPerExecutorMB,
       command: Command = command,
       appUiUrl: String = appUiUrl,
       eventLogDir: Option[URI] = eventLogDir,
-      eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
+      eventLogCodec: Option[String] = eventLogCodec,
+      coresPerExecutor: Option[Int] = coresPerExecutor): ApplicationDescription =
     new ApplicationDescription(
-      name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)
+      name, maxCores, memoryPerExecutorMB, command, appUiUrl, eventLogDir, eventLogCodec,
+      coresPerExecutor)
 
   override def toString: String = "ApplicationDescription(" + name + ")"
 }
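
With the new optional `coresPerExecutor` field, a cluster backend can describe both how much memory
and how many cores each executor should receive; `None` keeps the old behavior of a single executor
per worker grabbing all of that worker's free cores. A minimal sketch of the extended constructor
(both classes are `private[spark]`, so this only compiles inside Spark's own packages; the `Command`
arguments and the URL are placeholders, not the real executor command line):

```scala
import org.apache.spark.deploy.{ApplicationDescription, Command}

// Placeholder command; the standalone backend fills in the real arguments,
// environment, classpath entries and java options.
val command = Command(
  "org.apache.spark.executor.CoarseGrainedExecutorBackend",
  Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq.empty)

val desc = new ApplicationDescription(
  name = "example-app",
  maxCores = Some(8),                    // spark.cores.max
  memoryPerExecutorMB = 2048,            // spark.executor.memory, in MB
  command = command,
  appUiUrl = "http://driver-host:4040",
  coresPerExecutor = Some(2))            // spark.executor.cores; None = old behavior
```
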
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index dfc5b97e6a6c8d98c8c1a04b6508d509d7e9ac36..2954f932b4f41e86a7d19806844200c6caf932a7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -46,7 +46,7 @@ private[deploy] object JsonProtocol {
     ("name" -> obj.desc.name) ~
     ("cores" -> obj.desc.maxCores) ~
     ("user" ->  obj.desc.user) ~
-    ("memoryperslave" -> obj.desc.memoryPerSlave) ~
+    ("memoryperslave" -> obj.desc.memoryPerExecutorMB) ~
     ("submitdate" -> obj.submitDate.toString) ~
     ("state" -> obj.state.toString) ~
     ("duration" -> obj.duration)
@@ -55,7 +55,7 @@ private[deploy] object JsonProtocol {
   def writeApplicationDescription(obj: ApplicationDescription): JObject = {
     ("name" -> obj.name) ~
     ("cores" -> obj.maxCores) ~
-    ("memoryperslave" -> obj.memoryPerSlave) ~
+    ("memoryperslave" -> obj.memoryPerExecutorMB) ~
     ("user" -> obj.user) ~
     ("command" -> obj.command.toString)
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 60bc243ebf40a15c4e158024827e55b57d256052..296a0764b8baf27bfb95d06105a3cdf982430a0e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -406,6 +406,8 @@ object SparkSubmit {
       OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
 
       // Other options
+      OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
+        sysProp = "spark.executor.cores"),
       OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.memory"),
       OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
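
`--executor-cores` now reaches the standalone master as `spark.executor.cores` in both client and
cluster deploy modes. Together with `--total-executor-cores` it bounds how many executors an
application gets; a back-of-the-envelope sketch (the helper name is made up, and the master can
still grant fewer executors if workers lack free cores or memory):

```scala
// Rough upper bound only: each worker's free cores and memory can reduce this.
def maxExecutors(totalExecutorCores: Int, executorCores: Int): Int =
  totalExecutorCores / executorCores

maxExecutors(totalExecutorCores = 8, executorCores = 2)  // => 4 two-core executors
```
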
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 03ecf3fd99ec59092c3606acd3ef445dc22106b7..faa8780288ea35ffeb49c81bd537cfeb14b99cf3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -482,10 +482,13 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         | Spark standalone and Mesos only:
         |  --total-executor-cores NUM  Total cores for all executors.
         |
+        | Spark standalone and YARN only:
+        |  --executor-cores NUM        Number of cores per executor. (Default: 1 in YARN mode,
+        |                              or all available cores on the worker in standalone mode)
+        |
         | YARN-only:
         |  --driver-cores NUM          Number of cores used by the driver, only in cluster mode
         |                              (Default: 1).
-        |  --executor-cores NUM        Number of cores per executor (Default: 1).
         |  --queue QUEUE_NAME          The YARN queue to submit to (Default: "default").
         |  --num-executors NUM         Number of executors to launch (Default: 2).
         |  --archives ARCHIVES         Comma separated list of archives to be extracted into the
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index bc5b293379f2b4bbab89443563c9eec1e983adfa..f59d550d4f3b3dacc78baa879d8fe2ae2ea552d3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -75,9 +75,11 @@ private[deploy] class ApplicationInfo(
     }
   }
 
-  private[master] def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): 
-  ExecutorDesc = {
-    val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
+  private[master] def addExecutor(
+      worker: WorkerInfo,
+      cores: Int,
+      useID: Option[Int] = None): ExecutorDesc = {
+    val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutorMB)
     executors(exec.id) = exec
     coresGranted += cores
     exec
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 9a5d5877da86df0f8632a32706eb4c8f0acf1b5a..c5a6b1beac9becfac0efaecf5955e4a278027495 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -524,52 +524,28 @@ private[master] class Master(
   }
 
   /**
-   * Can an app use the given worker? True if the worker has enough memory and we haven't already
-   * launched an executor for the app on it (right now the standalone backend doesn't like having
-   * two executors on the same worker).
-   */
-  private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
-    worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
-  }
-
-  /**
-   * Schedule the currently available resources among waiting apps. This method will be called
-   * every time a new app joins or resource availability changes.
+   * Schedule executors to be launched on the workers.
+   *
+   * There are two modes of launching executors. The first attempts to spread out an application's
+   * executors on as many workers as possible, while the second does the opposite (i.e. launch them
+   * on as few workers as possible). The former is usually better for data locality purposes and is
+   * the default.
+   *
+   * The number of cores assigned to each executor is configurable. When this is explicitly set,
+   * multiple executors from the same application may be launched on the same worker if the worker
+   * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
+   * worker by default, in which case only one executor may be launched on each worker.
    */
-  private def schedule() {
-    if (state != RecoveryState.ALIVE) { return }
-
-    // First schedule drivers, they take strict precedence over applications
-    // Randomization helps balance drivers
-    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
-    val numWorkersAlive = shuffledAliveWorkers.size
-    var curPos = 0
-
-    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
-      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
-      // start from the last worker that was assigned a driver, and continue onwards until we have
-      // explored all alive workers.
-      var launched = false
-      var numWorkersVisited = 0
-      while (numWorkersVisited < numWorkersAlive && !launched) {
-        val worker = shuffledAliveWorkers(curPos)
-        numWorkersVisited += 1
-        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
-          launchDriver(worker, driver)
-          waitingDrivers -= driver
-          launched = true
-        }
-        curPos = (curPos + 1) % numWorkersAlive
-      }
-    }
-
+  private def startExecutorsOnWorkers(): Unit = {
     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
     // in the queue, then the second app, etc.
     if (spreadOutApps) {
-      // Try to spread out each app among all the nodes, until it has all its cores
+      // Try to spread out each app among all the workers, until it has all its cores
       for (app <- waitingApps if app.coresLeft > 0) {
         val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
-          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
+            worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
+          .sortBy(_.coresFree).reverse
         val numUsable = usableWorkers.length
         val assigned = new Array[Int](numUsable) // Number of cores to give on each node
         var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
@@ -582,32 +558,61 @@ private[master] class Master(
           pos = (pos + 1) % numUsable
         }
         // Now that we've decided how many cores to give on each node, let's actually give them
-        for (pos <- 0 until numUsable) {
-          if (assigned(pos) > 0) {
-            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
-            launchExecutor(usableWorkers(pos), exec)
-            app.state = ApplicationState.RUNNING
-          }
+        for (pos <- 0 until numUsable if assigned(pos) > 0) {
+          allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))
         }
       }
     } else {
-      // Pack each app into as few nodes as possible until we've assigned all its cores
+      // Pack each app into as few workers as possible until we've assigned all its cores
       for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
         for (app <- waitingApps if app.coresLeft > 0) {
-          if (canUse(app, worker)) {
-            val coresToUse = math.min(worker.coresFree, app.coresLeft)
-            if (coresToUse > 0) {
-              val exec = app.addExecutor(worker, coresToUse)
-              launchExecutor(worker, exec)
-              app.state = ApplicationState.RUNNING
-            }
-          }
+          allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
+        }
+      }
+    }
+  }
+
+  /**
+   * Allocate a worker's resources to one or more executors.
+   * @param app the info of the application to which the executors belong
+   * @param coresToAllocate cores on this worker to be allocated to this application
+   * @param worker the worker info
+   */
+  private def allocateWorkerResourceToExecutors(
+      app: ApplicationInfo,
+      coresToAllocate: Int,
+      worker: WorkerInfo): Unit = {
+    val memoryPerExecutor = app.desc.memoryPerExecutorMB
+    val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
+    var coresLeft = coresToAllocate
+    while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
+      val exec = app.addExecutor(worker, coresPerExecutor)
+      coresLeft -= coresPerExecutor
+      launchExecutor(worker, exec)
+      app.state = ApplicationState.RUNNING
+    }
+  }
+
+  /**
+   * Schedule the currently available resources among waiting apps. This method will be called
+   * every time a new app joins or resource availability changes.
+   */
+  private def schedule(): Unit = {
+    if (state != RecoveryState.ALIVE) { return }
+    // Drivers take strict precedence over executors
+    val shuffledWorkers = Random.shuffle(workers.toSeq) // Randomization helps balance drivers
+    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
+      for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
+        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
+          launchDriver(worker, driver)
+          waitingDrivers -= driver
         }
       }
     }
+    startExecutorsOnWorkers()
   }
 
-  private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
+  private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
     logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
     worker.addExecutor(exec)
     worker.actor ! LaunchExecutor(masterUrl,
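
The per-worker carving of cores into executors is the heart of this change. A hypothetical,
self-contained model of `allocateWorkerResourceToExecutors` (the names `Worker` and
`carveExecutors` are invented; the real method mutates `ApplicationInfo`/`WorkerInfo` and sends a
`LaunchExecutor` message per executor instead of returning core counts):

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-in for WorkerInfo.
case class Worker(coresFree: Int, memoryFreeMB: Int)

def carveExecutors(
    coresToAllocate: Int,
    memoryPerExecutorMB: Int,
    coresPerExecutor: Option[Int],
    worker: Worker): Seq[Int] = {
  val coresPerExec = coresPerExecutor.getOrElse(coresToAllocate)
  var coresLeft = coresToAllocate
  var memoryFree = worker.memoryFreeMB
  val launched = ArrayBuffer.empty[Int]
  while (coresLeft >= coresPerExec && memoryFree >= memoryPerExecutorMB) {
    launched += coresPerExec          // one more executor with this many cores
    coresLeft -= coresPerExec
    memoryFree -= memoryPerExecutorMB
  }
  launched
}

val worker = Worker(coresFree = 8, memoryFreeMB = 16384)

// spark.executor.cores unset: one executor takes the whole assignment.
carveExecutors(8, 2048, None, worker)     // => ArrayBuffer(8)

// spark.executor.cores=2: the same assignment becomes four 2-core executors.
carveExecutors(8, 2048, Some(2), worker)  // => ArrayBuffer(2, 2, 2, 2)
```
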
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 761aa8f7b1ef62c8633325a0522a1a3b7e5ea46d..273f077bd8f57fb2407fb16343f5a94fe67ed234 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -94,7 +94,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
             </li>
             <li>
               <strong>Executor Memory:</strong>
-              {Utils.megabytesToString(app.desc.memoryPerSlave)}
+              {Utils.megabytesToString(app.desc.memoryPerExecutorMB)}
             </li>
             <li><strong>Submit Date:</strong> {app.submitDate}</li>
             <li><strong>State:</strong> {app.state}</li>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 45412a35e9a7d6478deee34ac885d1ae2ac2d45f..399f07399a0aae607c3f5c30d8f0fbe7da153391 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -208,8 +208,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
       <td>
         {app.coresGranted}
       </td>
-      <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
-        {Utils.megabytesToString(app.desc.memoryPerSlave)}
+      <td sorttable_customkey={app.desc.memoryPerExecutorMB.toString}>
+        {Utils.megabytesToString(app.desc.memoryPerExecutorMB)}
       </td>
       <td>{UIUtils.formatDate(app.submitDate)}</td>
       <td>{app.desc.user}</td>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 7eb3fdc19b5b8f0ba6f51e3e2a134669e95c9ff6..ed5b7c1088196ae696ba9378bf7484a854e97646 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -82,12 +82,11 @@ private[spark] class SparkDeploySchedulerBackend(
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
     val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
-    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      appUIAddress, sc.eventLogDir, sc.eventLogCodec)
-
+    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
+    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
+      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
-
     waitForRegistration()
   }
 
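
The backend only forwards the setting; the master decides whether several executors of the same
application share a worker. An illustrative way to configure a standalone application so that each
executor gets two cores (master URL and sizes are made up):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// With spark.executor.cores set, a 16-core worker can host several 2-core
// executors for this app (memory permitting); without it, one executor would
// claim all of the worker's free cores.
val conf = new SparkConf()
  .setMaster("spark://master-host:7077")
  .setAppName("example-app")
  .set("spark.executor.cores", "2")
  .set("spark.executor.memory", "2g")
  .set("spark.cores.max", "8")

val sc = new SparkContext(conf)
```
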
diff --git a/docs/configuration.md b/docs/configuration.md
index 7169ec295ef7f40e7d695b721d5afaa98f496747..d9e9e67026cbbde1f6cefbe2c3b29f3b6053ff83 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -723,6 +723,17 @@ Apart from these, the following properties are also available, and may be useful
     this duration will be cleared as well.
   </td>
 </tr>
+<tr>
+  <td><code>spark.executor.cores</code></td>
+  <td>1 in YARN mode, all available cores on the worker in standalone mode.</td>
+  <td>
+    The number of cores to use on each executor. Applies to YARN and standalone mode only.
+
+    In standalone mode, setting this parameter allows an application to run multiple executors on
+    the same worker, provided that there are enough cores on that worker. Otherwise, only one
+    executor per application will run on each worker.
+  </td>
+</tr>
 <tr>
   <td><code>spark.default.parallelism</code></td>
   <td>
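
A rough worked example of the standalone behavior described in the new table row, assuming a worker
that offers 16 cores and 64 GB to Spark (all numbers invented for illustration):

```scala
// How many executors of a single application can one standalone worker host?
// Both the core and the memory constraint must hold.
val workerCores = 16
val workerMemoryMB = 64 * 1024
val executorCores = 4             // spark.executor.cores
val executorMemoryMB = 8 * 1024   // spark.executor.memory

val maxByCores = workerCores / executorCores                 // 4
val maxByMemory = workerMemoryMB / executorMemoryMB          // 8
val executorsPerWorker = math.min(maxByCores, maxByMemory)   // => 4

// With spark.executor.cores unset, the same worker would run at most one
// executor for this application, grabbing all 16 free cores.
```
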