Commit d4588165 authored by Shixiong Zhu, committed by Tathagata Das

[SPARK-18722][SS] Move no data rate limit from StreamExecution to ProgressReporter


## What changes were proposed in this pull request?

Move the no-data rate limit from `StreamExecution` to `ProgressReporter` so that `recentProgresses` and listener events are consistent.
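For context on what "consistent" means here: previously `ProgressReporter` appended every trigger's progress to the buffer behind `recentProgresses`, while `StreamExecution` separately throttled the listener events for no-data triggers, so the two views could disagree. After this change both go through a single `updateProgress` call gated by the no-data interval. Below is a minimal standalone sketch of that behaviour; the `Reporter` class and its parameters are illustrative, not Spark API.

```scala
import scala.collection.mutable

// Illustrative sketch (not Spark API) of the behaviour after this change:
// the no-data throttle sits in front of a single updateProgress path, so the
// recent-progress buffer and the listener callback can never drift apart.
class Reporter(noDataIntervalMs: Long, clock: () => Long, onEvent: String => Unit) {
  private val recent = mutable.Queue[String]()
  private var lastNoDataEventTime = Long.MinValue

  private def updateProgress(progress: String): Unit = {
    recent.synchronized { recent += progress }
    onEvent(progress) // the listener event is gated exactly like the buffer update
  }

  def finishTrigger(hasNewData: Boolean, progress: String): Unit = {
    if (hasNewData) {
      lastNoDataEventTime = Long.MinValue // reset the throttle after real data
      updateProgress(progress)
    } else {
      val now = clock()
      if (now - noDataIntervalMs >= lastNoDataEventTime) {
        lastNoDataEventTime = now
        updateProgress(progress)
      }
    }
  }

  def recentProgresses: Seq[String] = recent.synchronized { recent.toList }
}
```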

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16155 from zsxwing/SPARK-18722.

(cherry picked from commit 4af142f5)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
parent 1946854a
@@ -27,6 +27,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
import org.apache.spark.util.Clock
/**
@@ -56,6 +57,7 @@ trait ProgressReporter extends Logging {
protected def offsetSeqMetadata: OffsetSeqMetadata
protected def currentBatchId: Long
protected def sparkSession: SparkSession
protected def postEvent(event: StreamingQueryListener.Event): Unit
// Local timestamps and counters.
private var currentTriggerStartTimestamp = -1L
@@ -70,6 +72,12 @@ trait ProgressReporter extends Logging {
/** Holds the most recent query progress updates. Accesses must lock on the queue itself. */
private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
private val noDataProgressEventInterval =
sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
// The timestamp we report an event that has no input data
private var lastNoDataProgressEventTime = Long.MinValue
@volatile
protected var currentStatus: StreamingQueryStatus = {
new StreamingQueryStatus(
@@ -100,6 +108,17 @@ trait ProgressReporter extends Logging {
currentDurationsMs.clear()
}
private def updateProgress(newProgress: StreamingQueryProgress): Unit = {
progressBuffer.synchronized {
progressBuffer += newProgress
while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
progressBuffer.dequeue()
}
}
postEvent(new QueryProgressEvent(newProgress))
logInfo(s"Streaming query made progress: $newProgress")
}
/** Finalizes the query progress and adds it to list of recent status updates. */
protected def finishTrigger(hasNewData: Boolean): Unit = {
currentTriggerEndTimestamp = triggerClock.getTimeMillis()
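The `updateProgress` helper in the hunk above also caps the in-memory buffer backing `recentProgresses`. The following standalone sketch mirrors that retention pattern; the concrete `retention` value is illustrative, the real one comes from `streamingProgressRetention`.

```scala
import scala.collection.mutable

// Append an entry, then drop the oldest entries until the queue is below the
// retention threshold (mirrors the while-loop in updateProgress).
def addWithRetention[T](buffer: mutable.Queue[T], entry: T, retention: Int): Unit =
  buffer.synchronized {
    buffer += entry
    while (buffer.length >= retention) {
      buffer.dequeue()
    }
  }

// Example: with retention = 3, only the two most recent entries remain.
val q = mutable.Queue[Int]()
(1 to 5).foreach(n => addWithRetention(q, n, retention = 3))
// q is now Queue(4, 5)
```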
@@ -145,14 +164,18 @@
sources = sourceProgress.toArray,
sink = sinkProgress)
progressBuffer.synchronized {
progressBuffer += newProgress
while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) {
progressBuffer.dequeue()
if (hasNewData) {
// Reset noDataEventTimestamp if we processed any data
lastNoDataProgressEventTime = Long.MinValue
updateProgress(newProgress)
} else {
val now = triggerClock.getTimeMillis()
if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
lastNoDataProgressEventTime = now
updateProgress(newProgress)
}
}
logInfo(s"Streaming query made progress: $newProgress")
currentStatus = currentStatus.copy(isTriggerActive = false)
}
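One detail worth noting in the branch above: the check is written as `now - noDataProgressEventInterval >= lastNoDataProgressEventTime` rather than `now - lastNoDataProgressEventTime >= interval`, presumably so the `Long.MinValue` sentinel cannot cause an overflow. An illustrative trace with a 10-second interval (the timestamps are made up):

```scala
val intervalMs = 10000L
var last = Long.MinValue // sentinel: the first no-data trigger always reports

// Returns true when a no-data progress event would be emitted at time `now`.
def emits(now: Long): Boolean =
  if (now - intervalMs >= last) { last = now; true } else false

assert(emits(0L))       // first no-data trigger: reported
assert(!emits(5000L))   // half an interval later: suppressed
assert(emits(10000L))   // a full interval after the last report: reported again
```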
@@ -58,9 +58,6 @@ class StreamExecution(
private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
private val noDataProgressEventInterval =
sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
/**
* A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
*/
@@ -217,9 +214,6 @@
// While active, repeatedly attempt to run batches.
SparkSession.setActiveSession(sparkSession)
// The timestamp we report an event that has no input data
var lastNoDataProgressEventTime = Long.MinValue
triggerExecutor.execute(() => {
startTrigger()
@@ -242,18 +236,6 @@
// Report trigger as finished and construct progress object.
finishTrigger(dataAvailable)
if (dataAvailable) {
// Reset noDataEventTimestamp if we processed any data
lastNoDataProgressEventTime = Long.MinValue
postEvent(new QueryProgressEvent(lastProgress))
} else {
val now = triggerClock.getTimeMillis()
if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
lastNoDataProgressEventTime = now
postEvent(new QueryProgressEvent(lastProgress))
}
}
if (dataAvailable) {
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
@@ -504,7 +486,7 @@
}
}
private def postEvent(event: StreamingQueryListener.Event) {
override protected def postEvent(event: StreamingQueryListener.Event): Unit = {
sparkSession.streams.postListenerEvent(event)
}
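Since the overridden `postEvent` simply forwards to the session's listener bus, an application-level `StreamingQueryListener` now receives exactly the rate-limited updates that back `recentProgresses`. Below is a hedged sketch of such a listener; registering it via `spark.streams.addListener` is assumed and is not part of this diff.

```scala
import scala.collection.mutable
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Collects the JSON of every progress event posted on the listener bus. After
// this change, these are the same rate-limited updates held by recentProgresses.
class ProgressCollector extends StreamingQueryListener {
  val progresses = mutable.ArrayBuffer.empty[String]

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = synchronized {
    progresses += event.progress.json
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}

// Registration (assuming an active SparkSession named `spark`):
//   spark.streams.addListener(new ProgressCollector)
```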
@@ -237,6 +237,10 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
true
}
// `recentProgresses` should not receive too many no data events
actions += AssertOnQuery { q =>
q.recentProgresses.size > 1 && q.recentProgresses.size <= 11
}
testStream(input.toDS)(actions: _*)
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
// 11 is the max value of the possible numbers of events.