diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ccba3ed9e643c7f05a8bd57b218a80a9a7169e41..a6857b4c7d8828026224dad3bf83da7db9cbcd4c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -269,7 +269,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       conf: SparkConf,
       isLocal: Boolean,
       listenerBus: LiveListenerBus): SparkEnv = {
-    SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
+    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
   }
 
   private[spark] def env: SparkEnv = _env
@@ -2560,6 +2560,21 @@ object SparkContext extends Logging {
     res
   }
 
+  /**
+   * The number of driver cores to use for task execution in local mode, or 0 otherwise.
+   */
+  private[spark] def numDriverCores(master: String): Int = {
+    def convertToInt(threads: String): Int = {
+      if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
+    }
+    master match {
+      case "local" => 1
+      case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
+      case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
+      case _ => 0 // driver is not used for execution
+    }
+  }
+
   /**
    * Create a task scheduler based on a given master URL.
    * Return a 2-tuple of the scheduler backend and the task scheduler.
@@ -2567,18 +2582,7 @@ object SparkContext extends Logging {
   private def createTaskScheduler(
       sc: SparkContext,
       master: String): (SchedulerBackend, TaskScheduler) = {
-    // Regular expression used for local[N] and local[*] master formats
-    val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
-    // Regular expression for local[N, maxRetries], used in tests with failing tasks
-    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
-    // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
-    val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
-    // Regular expression for connecting to Spark deploy clusters
-    val SPARK_REGEX = """spark://(.*)""".r
-    // Regular expression for connection to Mesos cluster by mesos:// or zk:// url
-    val MESOS_REGEX = """(mesos|zk)://.*""".r
-    // Regular expression for connection to Simr cluster
-    val SIMR_REGEX = """simr://(.*)""".r
+    import SparkMasterRegex._
 
     // When running locally, don't try to re-execute tasks on failure.
     val MAX_LOCAL_TASK_FAILURES = 1
@@ -2719,6 +2723,24 @@ object SparkContext extends Logging {
   }
 }
 
+/**
+ * A collection of regexes for extracting information from the master string.
+ */
+private object SparkMasterRegex {
+  // Regular expression used for local[N] and local[*] master formats
+  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
+  // Regular expression for local[N, maxRetries], used in tests with failing tasks
+  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
+  // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
+  val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
+  // Regular expression for connecting to Spark deploy clusters
+  val SPARK_REGEX = """spark://(.*)""".r
+  // Regular expression for connecting to a Mesos cluster via a mesos:// or zk:// URL
+  val MESOS_REGEX = """(mesos|zk)://.*""".r
+  // Regular expression for connecting to a Simr cluster
+  val SIMR_REGEX = """simr://(.*)""".r
+}
+
 /**
  * A class encapsulating how to convert some type T to Writable. It stores both the Writable class
  * corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
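
As a sanity check on the new helper, here is a minimal, self-contained Scala sketch of the `numDriverCores` logic. The regexes are inlined because `SparkMasterRegex` is `private`, and the expected values in the comments follow from the match arms above rather than from running Spark:

    object NumDriverCoresSketch {
      // Same patterns as SparkMasterRegex, inlined for a standalone demo.
      val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
      val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

      def numDriverCores(master: String): Int = {
        def convertToInt(threads: String): Int =
          if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
        master match {
          case "local" => 1
          case LOCAL_N_REGEX(threads) => convertToInt(threads)
          case LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
          case _ => 0 // cluster modes: the driver does not execute tasks
        }
      }

      def main(args: Array[String]): Unit = {
        println(numDriverCores("local"))              // 1
        println(numDriverCores("local[4]"))           // 4
        println(numDriverCores("local[*]"))           // availableProcessors()
        println(numDriverCores("local[2, 3]"))        // 2 (local[N, maxRetries] form)
        println(numDriverCores("spark://host:7077"))  // 0
      }
    }
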
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 704158bfc7643231bd52cdd2d84bffc6b8794fd1..b5c35c569e45fc7936749732f35188786b30e919 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -190,6 +190,7 @@ object SparkEnv extends Logging {
       conf: SparkConf,
       isLocal: Boolean,
       listenerBus: LiveListenerBus,
+      numCores: Int,
       mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
     assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
     assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
@@ -202,6 +203,7 @@ object SparkEnv extends Logging {
       port,
       isDriver = true,
       isLocal = isLocal,
+      numUsableCores = numCores,
       listenerBus = listenerBus,
       mockOutputCommitCoordinator = mockOutputCommitCoordinator
     )
@@ -241,8 +243,8 @@ object SparkEnv extends Logging {
       port: Int,
       isDriver: Boolean,
       isLocal: Boolean,
+      numUsableCores: Int,
       listenerBus: LiveListenerBus = null,
-      numUsableCores: Int = 0,
       mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
 
     // Listener bus is only used on the driver
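
With this reordering, `numUsableCores` now sits before the parameters that have defaults, so every caller of `create` and `createDriverEnv` must pass the core count explicitly; forgetting it becomes a compile error rather than a silent `0`. A hedged sketch of a driver-side call site after the change (`conf`, `isLocal`, `listenerBus`, and `master` stand in for values the caller already has; `createDriverEnv` is `private[spark]`, so this only compiles inside Spark):

    // The core count is now a required argument; only
    // mockOutputCommitCoordinator retains a default.
    val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus,
      SparkContext.numDriverCores(master))
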
diff --git a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 6d08d7c5b7d2a11ec462f42f85b785e8484eb5b6..48456a9cd6e7b41db998bb1c1919ffac29ad912a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -87,7 +87,8 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
         outputCommitCoordinator = spy(new OutputCommitCoordinator(conf, isDriver = true))
         // Use Mockito.spy() to maintain the default infrastructure everywhere else.
         // This mocking allows us to control the coordinator responses in test cases.
-        SparkEnv.createDriverEnv(conf, isLocal, listenerBus, Some(outputCommitCoordinator))
+        SparkEnv.createDriverEnv(conf, isLocal, listenerBus,
+          SparkContext.numDriverCores(master), Some(outputCommitCoordinator))
       }
     }
     // Use Mockito.spy() to maintain the default infrastructure everywhere else
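
The test follows the same pattern as SparkContext itself: since `createDriverEnv` no longer defaults the core count, the suite derives it from the same `master` string it used to build the context. For reference, the `SparkMasterRegex` patterns work directly as `match` cases because Scala's `Regex` defines `unapplySeq`; a standalone demo (not Spark code):

    // A pattern like SPARK_REGEX(url) requires the whole string to match
    // and binds each capture group to a variable.
    val SPARK_REGEX = """spark://(.*)""".r
    "spark://host:7077" match {
      case SPARK_REGEX(url) => println(s"standalone master at $url")  // prints: host:7077
      case _                => println("not a standalone master URL")
    }
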