Commit 924c4247 authored by Burak Yavuz's avatar Burak Yavuz Committed by Tathagata Das

[SPARK-20301][FLAKY-TEST] Fix Hadoop Shell.runCommand flakiness in Structured Streaming tests

## What changes were proposed in this pull request?

Some Structured Streaming tests show flakiness such as:
```
[info] - prune results by current_date, complete mode - 696 *** FAILED *** (10 seconds, 937 milliseconds)
[info]   Timed out while stopping and waiting for microbatchthread to terminate.: The code passed to failAfter did not complete within 10 seconds.
```

This happens when the test waits for the stream to stop but it never does. The stream fails to stop because, although we interrupt the microBatchThread, Hadoop's `Shell.runCommand` swallows the `InterruptedException`, so the interrupt never propagates up that thread's call stack. The thread therefore keeps running, only to end up blocking on the `streamManualClock`.
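The failure mechanism can be reproduced in isolation. The sketch below (plain Java with hypothetical names, not Hadoop's or Spark's actual code) shows a worker that catches `InterruptedException` without restoring the interrupt status — the behavior attributed to `Shell.runCommand` above — so the caller's interrupt is silently lost:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class SwallowedInterruptDemo {

    // Starts a worker that swallows InterruptedException the way the PR
    // description says Shell.runCommand does, interrupts it from the
    // outside, and reports whether the worker ever observed the interrupt.
    public static boolean workerSawInterrupt() {
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        CountDownLatch entered = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            entered.countDown();
            try {
                Thread.sleep(60_000);      // stand-in for a blocking command
            } catch (InterruptedException e) {
                // Swallowed: the interrupt status was cleared by the throw
                // and is NOT restored, so nothing downstream can see it.
            }
            sawInterrupt.set(Thread.currentThread().isInterrupted());
        });
        try {
            worker.start();
            entered.await();
            worker.interrupt();            // analogous to stopping the stream
            worker.join(10_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        // The caller's interrupt is lost; a real microBatchThread would keep
        // running and later block elsewhere (e.g. on a manual clock).
        System.out.println("worker saw interrupt: " + workerSawInterrupt());
    }
}
```

Restoring the status with `Thread.currentThread().interrupt()` inside the catch block would let the stop request survive; the fix in this commit instead sidesteps the problem by not relying on the interrupt for loop termination at all.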

## How was this patch tested?

A thousand retries of the flaky tests, both locally and on [Jenkins](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75720/testReport).

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #17613 from brkyvz/flaky-stream-agg.
parent 99a94731
```
@@ -284,42 +284,38 @@ class StreamExecution(
     triggerExecutor.execute(() => {
       startTrigger()
 
-      val continueToRun =
-        if (isActive) {
-          reportTimeTaken("triggerExecution") {
-            if (currentBatchId < 0) {
-              // We'll do this initialization only once
-              populateStartOffsets(sparkSessionToRunBatches)
-              logDebug(s"Stream running from $committedOffsets to $availableOffsets")
-            } else {
-              constructNextBatch()
-            }
-            if (dataAvailable) {
-              currentStatus = currentStatus.copy(isDataAvailable = true)
-              updateStatusMessage("Processing new data")
-              runBatch(sparkSessionToRunBatches)
-            }
+      if (isActive) {
+        reportTimeTaken("triggerExecution") {
+          if (currentBatchId < 0) {
+            // We'll do this initialization only once
+            populateStartOffsets(sparkSessionToRunBatches)
+            logDebug(s"Stream running from $committedOffsets to $availableOffsets")
+          } else {
+            constructNextBatch()
+          }
+          if (dataAvailable) {
+            currentStatus = currentStatus.copy(isDataAvailable = true)
+            updateStatusMessage("Processing new data")
+            runBatch(sparkSessionToRunBatches)
           }
-          // Report trigger as finished and construct progress object.
-          finishTrigger(dataAvailable)
-          if (dataAvailable) {
-            // Update committed offsets.
-            batchCommitLog.add(currentBatchId)
-            committedOffsets ++= availableOffsets
-            logDebug(s"batch ${currentBatchId} committed")
-            // We'll increase currentBatchId after we complete processing current batch's data
-            currentBatchId += 1
-          } else {
-            currentStatus = currentStatus.copy(isDataAvailable = false)
-            updateStatusMessage("Waiting for data to arrive")
-            Thread.sleep(pollingDelayMs)
-          }
-          true
-        } else {
-          false
         }
+        // Report trigger as finished and construct progress object.
+        finishTrigger(dataAvailable)
+        if (dataAvailable) {
+          // Update committed offsets.
+          batchCommitLog.add(currentBatchId)
+          committedOffsets ++= availableOffsets
+          logDebug(s"batch ${currentBatchId} committed")
+          // We'll increase currentBatchId after we complete processing current batch's data
+          currentBatchId += 1
+        } else {
+          currentStatus = currentStatus.copy(isDataAvailable = false)
+          updateStatusMessage("Waiting for data to arrive")
+          Thread.sleep(pollingDelayMs)
+        }
+      }
 
       updateStatusMessage("Waiting for next trigger")
-      continueToRun
+      isActive
     })
     updateStatusMessage("Stopped")
   } else {
```
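The heart of the fix is the last expression of the trigger body: the loop-continuation value is now `isActive`, re-read after the batch runs, rather than the earlier `continueToRun` value. A minimal sketch of that pattern (plain Java, hypothetical names — not the actual `TriggerExecutor` API):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

public class TriggerLoop {
    public final AtomicBoolean active = new AtomicBoolean(true);
    public int batches = 0;

    // Stand-in for TriggerExecutor.execute: keep invoking the trigger body
    // while it returns true.
    public void execute(BooleanSupplier triggerBody) {
        while (triggerBody.getAsBoolean()) { /* next trigger */ }
    }

    public void run() {
        execute(() -> {
            if (active.get()) {
                batches++;                 // stand-in for one micro-batch
                if (batches == 3) {
                    active.set(false);     // stand-in for stop() flipping state
                }
            }
            // The fix in miniature: return the activity flag *re-read after
            // the batch*, not a value computed before it ran. Even if an
            // interrupt was swallowed inside the batch, the next check of
            // the flag still terminates the loop.
            return active.get();
        });
    }
}
```

Because termination now depends on a flag the stopping side controls, a swallowed interrupt inside the batch can no longer keep the loop alive.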
```
@@ -277,6 +277,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     def threadState =
       if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
 
+    def threadStackTrace = if (currentStream != null && currentStream.microBatchThread.isAlive) {
+      s"Thread stack trace: ${currentStream.microBatchThread.getStackTrace.mkString("\n")}"
+    } else {
+      ""
+    }
+
     def testState =
       s"""
@@ -287,6 +292,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
          |Output Mode: $outputMode
          |Stream state: $currentOffsets
          |Thread state: $threadState
+         |$threadStackTrace
          |${if (streamThreadDeathCause != null) stackTraceToString(streamThreadDeathCause) else ""}
          |
          |== Sink ==
```
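The `threadStackTrace` helper makes timeout failures debuggable by dumping where the stuck thread is blocked. The same idea in standalone form (plain Java, hypothetical names):

```java
public class ThreadDump {

    // Formats a live thread's current stack trace for a failure message,
    // returning "" for a dead or missing thread -- the same shape as the
    // threadStackTrace helper added to StreamTest.
    public static String stackTraceOf(Thread t) {
        if (t == null || !t.isAlive()) {
            return "";
        }
        StringBuilder sb = new StringBuilder("Thread stack trace:");
        for (StackTraceElement frame : t.getStackTrace()) {
            sb.append('\n').append("  ").append(frame);
        }
        return sb.toString();
    }
}
```

Guarding on `isAlive` matters: `getStackTrace` on a terminated thread returns an empty array, so the guard keeps the failure message free of an empty, misleading dump.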