diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a1fa266e183e18e489bc324235d54c4784982717..0e8b735b923bb240dbbea11497f02feaf1f89e6b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -244,7 +244,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] def eventLogDir: Option[URI] = _eventLogDir
   private[spark] def eventLogCodec: Option[String] = _eventLogCodec
 
-  def isLocal: Boolean = (master == "local" || master.startsWith("local["))
+  def isLocal: Boolean = Utils.isLocalMaster(_conf)
 
   /**
    * @return true if context is stopped or in the midst of stopping.
@@ -526,10 +526,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
     // Optionally scale number of executors dynamically based on workload. Exposed for testing.
     val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
-    if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
-      logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
-    }
-
     _executorAllocationManager =
       if (dynamicAllocationEnabled) {
         Some(new ExecutorAllocationManager(this, listenerBus, _conf))
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index e0c9bf02a1a206772e1eba585e04762afb6010ff..6103a10ccc50e503af4d80c3b8257714178f609b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2195,6 +2195,16 @@ private[spark] object Utils extends Logging {
     isInDirectory(parent, child.getParentFile)
   }
 
+
+  /**
+   *
+   * @return whether it is local mode
+   */
+  def isLocalMaster(conf: SparkConf): Boolean = {
+    val master = conf.get("spark.master", "")
+    master == "local" || master.startsWith("local[")
+  }
+
   /**
    * Return whether dynamic allocation is enabled in the given conf
    * Dynamic allocation and explicitly setting the number of executors are inherently
@@ -2202,8 +2212,13 @@ private[spark] object Utils extends Logging {
    * the latter should override the former (SPARK-9092).
    */
   def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
-    conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
-      conf.getInt("spark.executor.instances", 0) == 0
+    val numExecutor = conf.getInt("spark.executor.instances", 0)
+    val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
+    if (numExecutor != 0 && dynamicAllocationEnabled) {
+      logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
+    }
+    numExecutor == 0 && dynamicAllocationEnabled &&
+      (!isLocalMaster(conf) || conf.getBoolean("spark.dynamicAllocation.testing", false))
   }
 
   def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 7c6778b065467089631412bf8e26bd24b90541cf..412c0ac9d9be3c50f44909b126b22ed44b020afd 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -722,6 +722,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
 
   test("isDynamicAllocationEnabled") {
     val conf = new SparkConf()
+    conf.set("spark.master", "yarn-client")
     assert(Utils.isDynamicAllocationEnabled(conf) === false)
     assert(Utils.isDynamicAllocationEnabled(
       conf.set("spark.dynamicAllocation.enabled", "false")) === false)
@@ -731,6 +732,8 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       conf.set("spark.executor.instances", "1")) === false)
     assert(Utils.isDynamicAllocationEnabled(
       conf.set("spark.executor.instances", "0")) === true)
+    assert(Utils.isDynamicAllocationEnabled(conf.set("spark.master", "local")) === false)
+    assert(Utils.isDynamicAllocationEnabled(conf.set("spark.dynamicAllocation.testing", "true")))
   }
 
   test("encodeFileNameToURIRawPath") {