diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index b49e5790711cdfbe80efbba2e913ef8bc3b8b327..1d2bf35a6d458abb7c206785ffb6c9af16b40b70 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -36,11 +36,11 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
 
   private val batchDurationMillis = 1000L
   private var allocationClient: ExecutorAllocationClient = null
-  private var clock: ManualClock = null
+  private var clock: StreamManualClock = null
 
   before {
     allocationClient = mock[ExecutorAllocationClient]
-    clock = new ManualClock()
+    clock = new StreamManualClock()
   }
 
   test("basic functionality") {
@@ -57,10 +57,14 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
         reset(allocationClient)
         when(allocationClient.getExecutorIds()).thenReturn(Seq("1", "2"))
         addBatchProcTime(allocationManager, batchProcTimeMs.toLong)
-        clock.advance(SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1)
+        val advancedTime = SCALING_INTERVAL_DEFAULT_SECS * 1000 + 1
+        val expectedWaitTime = clock.getTimeMillis() + advancedTime
+        clock.advance(advancedTime)
+        // Make sure ExecutorAllocationManager.manageAllocation is called
         eventually(timeout(10 seconds)) {
-          body
+          assert(clock.isStreamWaitingAt(expectedWaitTime))
         }
+        body
       }
 
       /** Verify that the expected number of total executor were requested */
@@ -394,3 +398,27 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
     }
   }
 }
+
+/**
+ * A special manual clock that provide `isStreamWaitingAt` to allow the user to check if the clock
+ * is blocking.
+ */
+class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable {
+  private var waitStartTime: Option[Long] = None
+
+  override def waitTillTime(targetTime: Long): Long = synchronized {
+    try {
+      waitStartTime = Some(getTimeMillis())
+      super.waitTillTime(targetTime)
+    } finally {
+      waitStartTime = None
+    }
+  }
+
+  /**
+   * Returns if the clock is blocking and the time it started to block is the parameter `time`.
+   */
+  def isStreamWaitingAt(time: Long): Boolean = synchronized {
+    waitStartTime == Some(time)
+  }
+}