Commit 768b3d62 authored by Shixiong Zhu

[SPARK-14579][SQL] Fix a race condition in StreamExecution.processAllAvailable

## What changes were proposed in this pull request?

There is a race condition in `StreamExecution.processAllAvailable`. Here is an execution order that reproduces it (the pre-fix code involved is sketched right after the table).

| Time | Thread 1 | MicroBatchThread |
|:----:|:--------:|:----------------:|
| 1 | | `dataAvailable` in `constructNextBatch` returns `false` |
| 2 | `addData(newData)` | |
| 3 | `noNewData = false` in `processAllAvailable` | |
| 4 | | `noNewData = true` |
| 5 | `noNewData` is `true`, so `processAllAvailable` returns immediately, even though the new data has not been processed | |
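
For reference, here is a minimal sketch of the pre-fix shape of the two code paths (the class name `RacyFlag` and the `dataAvailable` stub are illustrative; the real code is in the removed lines of the diff at the end of this page). The write at step 4 happens without holding the lock, so it can land after the reset at step 3 even though the check it is based on happened before the data was added:

```scala
// Minimal sketch of the pre-fix behaviour. RacyFlag and the dataAvailable stub
// are illustrative stand-ins for the real members of StreamExecution.
class RacyFlag {
  private val awaitBatchLock = new Object
  @volatile private var noNewData = false
  @volatile var dataAvailable: Boolean = false // stub for StreamExecution.dataAvailable

  /** Micro-batch thread: noNewData is checked and written outside the lock (steps 1 and 4). */
  def constructNextBatch(): Unit = {
    if (dataAvailable) {
      // ... construct and commit the next batch ...
    } else {
      noNewData = true
      awaitBatchLock.synchronized { awaitBatchLock.notifyAll() }
    }
  }

  /** Caller thread: the reset (step 3) is also unsynchronized, so a stale
    * `noNewData = true` from step 4 can make the loop exit before the new
    * data has been processed (step 5).
    */
  def processAllAvailable(): Unit = {
    noNewData = false
    while (!noNewData) {
      awaitBatchLock.synchronized { awaitBatchLock.wait(10000) }
    }
  }
}
```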

The root cause is that checking `dataAvailable` and setting `noNewData` to `true` is not atomic. This PR puts the check and the update into the same `synchronized` block to make them atomic.
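
A minimal, compilable sketch of the resulting locking pattern (the class name `RaceFreeFlag` and the `dataAvailable` stub are illustrative; the real members live in `StreamExecution`, see the diff at the end of this page):

```scala
// Minimal sketch of the locking pattern introduced by this PR. RaceFreeFlag and
// the dataAvailable stub are illustrative; the real logic is in
// StreamExecution.constructNextBatch and StreamExecution.processAllAvailable.
class RaceFreeFlag {
  private val awaitBatchLock = new Object
  @volatile private var noNewData = false
  @volatile var dataAvailable: Boolean = false // stub for StreamExecution.dataAvailable

  /** Micro-batch thread: the check and the write are one critical section. */
  def constructNextBatch(): Unit = {
    val hasNewData = awaitBatchLock.synchronized {
      if (dataAvailable) {
        true
      } else {
        noNewData = true // cannot interleave with the reset in processAllAvailable
        false
      }
    }
    if (hasNewData) {
      // ... construct and commit the next batch ...
    } else {
      awaitBatchLock.synchronized {
        awaitBatchLock.notifyAll() // wake up any waiting callers
      }
    }
  }

  /** Caller thread: reset, wait and re-check all happen under the same lock. */
  def processAllAvailable(): Unit = awaitBatchLock.synchronized {
    noNewData = false
    while (true) {
      awaitBatchLock.wait(10000)
      if (noNewData) {
        return
      }
    }
  }
}
```

With both the check-and-set and the reset-and-wait guarded by `awaitBatchLock`, the reset at step 3 can no longer slip in between the check at step 1 and the write at step 4.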

In addition, this PR also makes the following changes:

- Make `committedOffsets` and `availableOffsets` `@volatile` so that writes made by one thread are visible to the others.
- Copy the `availableOffsets` reference to a local variable so that `sourceStatuses` works against a single snapshot of `availableOffsets` (see the sketch just below this list).
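
Both changes use the same idiom: a `@volatile` field guarantees that the latest reference is visible across threads, and reading that field exactly once into a local gives all subsequent accesses a consistent snapshot. A minimal, self-contained sketch of the idiom (the `Snapshot` and `Reader` names are illustrative, not part of `StreamExecution`):

```scala
// Sketch of the "read the volatile reference once" idiom used by
// sourceStatuses and awaitOffset in this PR. Names here are illustrative.
final case class Snapshot(offsets: Map[String, Long])

class Reader {
  // @volatile makes a reference written by one thread visible to others;
  // it does not make several separate reads of the field mutually consistent.
  @volatile private var current = Snapshot(Map.empty)

  def update(s: Snapshot): Unit = { current = s }

  def describe(sources: Seq[String]): Seq[String] = {
    // Read the field exactly once; every entry below comes from the same
    // Snapshot even if update() swaps in a new one concurrently.
    val local = current
    sources.map(s => s"$s -> ${local.offsets.getOrElse(s, -1L)}")
  }
}
```

In `StreamExecution` the snapshot type is `StreamProgress`, and the readers are `sourceStatuses` and the `notDone` check in `awaitOffset`, as shown in the diff below.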

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12339 from zsxwing/race-condition.
parent 372baf04
```diff
@@ -59,12 +59,14 @@ class StreamExecution(
    * Tracks how much data we have processed and committed to the sink or state store from each
    * input source.
    */
+  @volatile
   private[sql] var committedOffsets = new StreamProgress
 
   /**
    * Tracks the offsets that are available to be processed, but have not yet be committed to the
    * sink.
    */
+  @volatile
   private var availableOffsets = new StreamProgress
 
   /** The current batchId or -1 if execution has not yet been initialized. */
@@ -111,7 +113,8 @@ class StreamExecution(
 
   /** Returns current status of all the sources. */
   override def sourceStatuses: Array[SourceStatus] = {
-    sources.map(s => new SourceStatus(s.toString, availableOffsets.get(s))).toArray
+    val localAvailableOffsets = availableOffsets
+    sources.map(s => new SourceStatus(s.toString, localAvailableOffsets.get(s))).toArray
   }
 
   /** Returns current status of the sink. */
@@ -228,7 +231,7 @@ class StreamExecution(
    * Queries all of the sources to see if any new data is available. When there is new data the
    * batchId counter is incremented and a new log entry is written with the newest offsets.
    */
-  private def constructNextBatch(): Boolean = {
+  private def constructNextBatch(): Unit = {
     // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622).
     // If we interrupt some thread running Shell.runCommand, we may hit this issue.
     // As "FileStreamSource.getOffset" will create a file using HDFS API and call "Shell.runCommand"
@@ -241,7 +244,15 @@ class StreamExecution(
     }
     availableOffsets ++= newData
 
-    if (dataAvailable) {
+    val hasNewData = awaitBatchLock.synchronized {
+      if (dataAvailable) {
+        true
+      } else {
+        noNewData = true
+        false
+      }
+    }
+    if (hasNewData) {
       // There is a potential dead-lock in Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622).
       // If we interrupt some thread running Shell.runCommand, we may hit this issue.
       // As "offsetLog.add" will create a file using HDFS API and call "Shell.runCommand" to set
@@ -254,15 +265,11 @@ class StreamExecution(
       }
       currentBatchId += 1
       logInfo(s"Committed offsets for batch $currentBatchId.")
-      true
     } else {
-      noNewData = true
-
       awaitBatchLock.synchronized {
         // Wake up any threads that are waiting for the stream to progress.
         awaitBatchLock.notifyAll()
       }
-      false
     }
   }
 
@@ -353,7 +360,10 @@ class StreamExecution(
    * least the given `Offset`. This method is indented for use primarily when writing tests.
    */
   def awaitOffset(source: Source, newOffset: Offset): Unit = {
-    def notDone = !committedOffsets.contains(source) || committedOffsets(source) < newOffset
+    def notDone = {
+      val localCommittedOffsets = committedOffsets
+      !localCommittedOffsets.contains(source) || localCommittedOffsets(source) < newOffset
+    }
 
     while (notDone) {
       logInfo(s"Waiting until $newOffset at $source")
@@ -365,13 +375,17 @@ class StreamExecution(
   /** A flag to indicate that a batch has completed with no new data available. */
   @volatile private var noNewData = false
 
-  override def processAllAvailable(): Unit = {
+  override def processAllAvailable(): Unit = awaitBatchLock.synchronized {
     noNewData = false
-    while (!noNewData) {
-      awaitBatchLock.synchronized { awaitBatchLock.wait(10000) }
-      if (streamDeathCause != null) { throw streamDeathCause }
+    while (true) {
+      awaitBatchLock.wait(10000)
+      if (streamDeathCause != null) {
+        throw streamDeathCause
+      }
+      if (noNewData) {
+        return
+      }
     }
-    if (streamDeathCause != null) { throw streamDeathCause }
   }
 
   override def awaitTermination(): Unit = {
```