diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 7d0d086746c798f442ca7479e934ae6e520bbe60..d95f55267e142ebbfa976534ff36a33728d4ccbe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.streaming._ +import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent import org.apache.spark.util.Clock /** @@ -56,6 +57,7 @@ trait ProgressReporter extends Logging { protected def offsetSeqMetadata: OffsetSeqMetadata protected def currentBatchId: Long protected def sparkSession: SparkSession + protected def postEvent(event: StreamingQueryListener.Event): Unit // Local timestamps and counters. private var currentTriggerStartTimestamp = -1L @@ -70,6 +72,12 @@ trait ProgressReporter extends Logging { /** Holds the most recent query progress updates. Accesses must lock on the queue itself. */ private val progressBuffer = new mutable.Queue[StreamingQueryProgress]() + private val noDataProgressEventInterval = + sparkSession.sessionState.conf.streamingNoDataProgressEventInterval + + // The timestamp we report an event that has no input data + private var lastNoDataProgressEventTime = Long.MinValue + @volatile protected var currentStatus: StreamingQueryStatus = { new StreamingQueryStatus( @@ -100,6 +108,17 @@ trait ProgressReporter extends Logging { currentDurationsMs.clear() } + private def updateProgress(newProgress: StreamingQueryProgress): Unit = { + progressBuffer.synchronized { + progressBuffer += newProgress + while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) { + progressBuffer.dequeue() + } + } + postEvent(new QueryProgressEvent(newProgress)) + logInfo(s"Streaming query made progress: $newProgress") + } + /** Finalizes the query progress and adds it to list of recent status updates. */ protected def finishTrigger(hasNewData: Boolean): Unit = { currentTriggerEndTimestamp = triggerClock.getTimeMillis() @@ -145,14 +164,18 @@ trait ProgressReporter extends Logging { sources = sourceProgress.toArray, sink = sinkProgress) - progressBuffer.synchronized { - progressBuffer += newProgress - while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) { - progressBuffer.dequeue() + if (hasNewData) { + // Reset noDataEventTimestamp if we processed any data + lastNoDataProgressEventTime = Long.MinValue + updateProgress(newProgress) + } else { + val now = triggerClock.getTimeMillis() + if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) { + lastNoDataProgressEventTime = now + updateProgress(newProgress) } } - logInfo(s"Streaming query made progress: $newProgress") currentStatus = currentStatus.copy(isTriggerActive = false) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 083cce8eb52a689050d9166a03048327af42f1a9..39be222d05d0fe4a7df5c1e5499251b14d2ff0e2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -58,9 +58,6 @@ class StreamExecution( private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay - private val noDataProgressEventInterval = - sparkSession.sessionState.conf.streamingNoDataProgressEventInterval - /** * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation. */ @@ -217,9 +214,6 @@ class StreamExecution( // While active, repeatedly attempt to run batches. SparkSession.setActiveSession(sparkSession) - // The timestamp we report an event that has no input data - var lastNoDataProgressEventTime = Long.MinValue - triggerExecutor.execute(() => { startTrigger() @@ -242,18 +236,6 @@ class StreamExecution( // Report trigger as finished and construct progress object. finishTrigger(dataAvailable) - if (dataAvailable) { - // Reset noDataEventTimestamp if we processed any data - lastNoDataProgressEventTime = Long.MinValue - postEvent(new QueryProgressEvent(lastProgress)) - } else { - val now = triggerClock.getTimeMillis() - if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) { - lastNoDataProgressEventTime = now - postEvent(new QueryProgressEvent(lastProgress)) - } - } - if (dataAvailable) { // We'll increase currentBatchId after we complete processing current batch's data currentBatchId += 1 @@ -504,7 +486,7 @@ class StreamExecution( } } - private def postEvent(event: StreamingQueryListener.Event) { + override protected def postEvent(event: StreamingQueryListener.Event): Unit = { sparkSession.streams.postListenerEvent(event) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index a38c05eed5e3327e999adae04e180d2e689f04f9..1cd503c6de6963d697dd32d164825b6b18d3ee4e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -237,6 +237,10 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } true } + // `recentProgresses` should not receive too many no data events + actions += AssertOnQuery { q => + q.recentProgresses.size > 1 && q.recentProgresses.size <= 11 + } testStream(input.toDS)(actions: _*) spark.sparkContext.listenerBus.waitUntilEmpty(10000) // 11 is the max value of the possible numbers of events.