diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 2f4b498b3ca74998a7efeac457b638d5f88a4893..0b66d1cf08eac468c68a23f52fb5b7d2d7b3b01a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -496,7 +496,6 @@ private[yarn] class YarnAllocator(
 
       def updateInternalState(): Unit = synchronized {
         numExecutorsRunning += 1
-        assert(numExecutorsRunning <= targetNumExecutors)
         executorIdToContainer(executorId) = container
         containerIdToExecutorId(container.getId) = executorId
 
@@ -506,36 +505,41 @@ private[yarn] class YarnAllocator(
         allocatedContainerToHostMap.put(containerId, executorHostname)
       }
 
-      if (launchContainers) {
-        launcherPool.execute(new Runnable {
-          override def run(): Unit = {
-            try {
-              new ExecutorRunnable(
-                Some(container),
-                conf,
-                sparkConf,
-                driverUrl,
-                executorId,
-                executorHostname,
-                executorMemory,
-                executorCores,
-                appAttemptId.getApplicationId.toString,
-                securityMgr,
-                localResources
-              ).run()
-              updateInternalState()
-            } catch {
-              case NonFatal(e) =>
-                logError(s"Failed to launch executor $executorId on container $containerId", e)
-                // Assigned container should be released immediately to avoid unnecessary resource
-                // occupation.
-                amClient.releaseAssignedContainer(containerId)
+      if (numExecutorsRunning < targetNumExecutors) {
+        if (launchContainers) {
+          launcherPool.execute(new Runnable {
+            override def run(): Unit = {
+              try {
+                new ExecutorRunnable(
+                  Some(container),
+                  conf,
+                  sparkConf,
+                  driverUrl,
+                  executorId,
+                  executorHostname,
+                  executorMemory,
+                  executorCores,
+                  appAttemptId.getApplicationId.toString,
+                  securityMgr,
+                  localResources
+                ).run()
+                updateInternalState()
+              } catch {
+                case NonFatal(e) =>
+                  logError(s"Failed to launch executor $executorId on container $containerId", e)
+                  // Assigned container should be released immediately to avoid unnecessary resource
+                  // occupation.
+                  amClient.releaseAssignedContainer(containerId)
+              }
             }
-          }
-        })
+          })
+        } else {
+          // For test only
+          updateInternalState()
+        }
       } else {
-        // For test only
-        updateInternalState()
+        logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " +
+          "reached target Executors count: %d.").format(numExecutorsRunning, targetNumExecutors))
       }
     }
   }
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 696e552c35d12eb27c8e9f5b5f3c2ab74948b28a..994dc75d34c304c9e31c54f89f26343a9c2c2026 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -136,6 +136,25 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     size should be (0)
   }
 
+  test("container should not be created if requested number if met") {
+    // request a single container and receive it
+    val handler = createAllocator(1)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be (0)
+    handler.getPendingAllocate.size should be (1)
+
+    val container = createContainer("host1")
+    handler.handleAllocatedContainers(Array(container))
+
+    handler.getNumExecutorsRunning should be (1)
+    handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+    handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
+
+    val container2 = createContainer("host2")
+    handler.handleAllocatedContainers(Array(container2))
+    handler.getNumExecutorsRunning should be (1)
+  }
+
   test("some containers allocated") {
     // request a few containers and receive some of them
     val handler = createAllocator(4)