diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index fc18e5f065a04bf3a2187cfd32d64ef47b4deac0..ce68c0968fb60b2a507125b07b02a6fbde06940e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -242,12 +242,12 @@ class StreamExecution(
     // method. See SPARK-14131.
     //
     // Check to see what new data is available.
-    val newData = microBatchThread.runUninterruptibly {
-      uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
-    }
-    availableOffsets ++= newData
-
     val hasNewData = awaitBatchLock.synchronized {
+      val newData = microBatchThread.runUninterruptibly {
+        uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
+      }
+      availableOffsets ++= newData
+
       if (dataAvailable) {
         true
       } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
index 1f2834054519bdfd86e39209e5a482c996c49bec..74ca3977d6635498941cae3e100c5b8728390990 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/MemorySinkSuite.scala
@@ -26,6 +26,15 @@ class MemorySinkSuite extends StreamTest with SharedSQLContext {
   import testImplicits._
 
   test("registering as a table") {
+    testRegisterAsTable()
+  }
+
+  ignore("stress test") {
+    // Ignore the stress test as it takes several minutes to run
+    (0 until 1000).foreach(_ => testRegisterAsTable())
+  }
+
+  private def testRegisterAsTable(): Unit = {
     val input = MemoryStream[Int]
     val query = input.toDF().write
       .format("memory")