Commit a35a67a8, authored by Shixiong Zhu, committed by Michael Armbrust

[SPARK-14579][SQL] Fix the race condition in StreamExecution.processAllAvailable again

## What changes were proposed in this pull request?

#12339 didn't fix the race condition. MemorySinkSuite is still flaky: https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-maven-hadoop-2.2/814/testReport/junit/org.apache.spark.sql.streaming/MemorySinkSuite/registering_as_a_table/

Here is an execution order to reproduce it.

| Time        |Thread 1           | MicroBatchThread  |
|:-------------:|:-------------:|:-----:|
| 1 | | `MemoryStream.getOffset` |
| 2 | | `availableOffsets ++= newData` (`availableOffsets` is not changed here) |
| 3 | `addData(newData)` | |
| 4 | Set `noNewData` to `false` in `processAllAvailable` | |
| 5 | | `dataAvailable` returns `false` |
| 6 | | `noNewData = true` |
| 7 | `noNewData` is `true`, so just return | |
| 8 | Assert results and fail | |
| 9 | | `dataAvailable` returns `true`, so process the new batch |

This PR expands the scope of `awaitBatchLock.synchronized` to eliminate the above race.
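
The effect of widening the lock can be sketched with a toy model. This is not Spark's `StreamExecution`: the class `ToyExecution`, the `Long` offsets, and the fields `latestOffset` and `committedOffset` are hypothetical stand-ins chosen for illustration. The point it tries to show is that once the offset fetch and the `noNewData = true` decision happen under the same `awaitBatchLock`, a caller that added data before entering `processAllAvailable` can no longer be woken by a check based on stale offsets.

```scala
import java.util.concurrent.atomic.AtomicLong

// Toy model (hypothetical, not Spark code): offsets are plain Longs.
final class ToyExecution {
  private val awaitBatchLock = new Object
  private val latestOffset = new AtomicLong(0L) // what the source currently holds
  private var availableOffset = 0L              // last offset fetched by the batch thread
  private var committedOffset = 0L              // last offset fully processed
  private var noNewData = false
  @volatile private var running = true

  private val batchThread = new Thread {
    override def run(): Unit = {
      while (running) {
        if (constructNextBatch()) {
          // "Process" the batch by committing the fetched offset.
          awaitBatchLock.synchronized { committedOffset = availableOffset }
        } else {
          Thread.sleep(1)
        }
      }
    }
  }
  batchThread.setDaemon(true)

  // The fetch and the "is there data?" check share one critical section,
  // so noNewData is never set to true based on offsets fetched before
  // processAllAvailable reset it to false.
  private def constructNextBatch(): Boolean = awaitBatchLock.synchronized {
    availableOffset = latestOffset.get()
    if (availableOffset > committedOffset) {
      true
    } else {
      noNewData = true
      awaitBatchLock.notifyAll()
      false
    }
  }

  def start(): Unit = batchThread.start()
  def stop(): Unit = { running = false; batchThread.join() }
  def addData(): Unit = latestOffset.incrementAndGet()
  def processed: Long = awaitBatchLock.synchronized(committedOffset)

  // Blocks until the batch thread reports that nothing is left to process.
  def processAllAvailable(): Unit = awaitBatchLock.synchronized {
    noNewData = false
    while (!noNewData) awaitBatchLock.wait(100)
  }
}

object ToyDemo {
  def main(args: Array[String]): Unit = {
    val exec = new ToyExecution
    exec.start()
    (1 to 100).foreach { i =>
      exec.addData()
      exec.processAllAvailable()
      assert(exec.processed == i.toLong)
    }
    exec.stop()
  }
}
```

If the line `availableOffset = latestOffset.get()` were moved out of the `synchronized` block in `constructNextBatch`, the interleaving in the table above would become possible again in this model, and the assertion in `ToyDemo` could fail intermittently.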

## How was this patch tested?

Added `test("stress test")`. It always failed before this patch and passes with it applied. The test is marked `ignore` in this PR because it takes several minutes to finish.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12582 from zsxwing/SPARK-14579-2.
parent 99274418
@@ -242,12 +242,12 @@ class StreamExecution(
     // method. See SPARK-14131.
     //
     // Check to see what new data is available.
-    val newData = microBatchThread.runUninterruptibly {
-      uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
-    }
-    availableOffsets ++= newData
     val hasNewData = awaitBatchLock.synchronized {
+      val newData = microBatchThread.runUninterruptibly {
+        uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
+      }
+      availableOffsets ++= newData
       if (dataAvailable) {
         true
       } else {
......
@@ -26,6 +26,15 @@ class MemorySinkSuite extends StreamTest with SharedSQLContext {
   import testImplicits._
   test("registering as a table") {
+    testRegisterAsTable()
+  }
+  ignore("stress test") {
+    // Ignore the stress test as it takes several minutes to run
+    (0 until 1000).foreach(_ => testRegisterAsTable())
+  }
+  private def testRegisterAsTable(): Unit = {
     val input = MemoryStream[Int]
     val query = input.toDF().write
       .format("memory")
......