Skip to content
Snippets Groups Projects
Commit d35a1268 authored by Burak Yavuz's avatar Burak Yavuz Committed by Tathagata Das
Browse files

[SPARK-19378][SS] Ensure continuity of stateOperator and eventTime metrics...

[SPARK-19378][SS] Ensure continuity of stateOperator and eventTime metrics even if there is no new data in trigger

In StructuredStreaming, if a new trigger was skipped because no new data arrived, we suddenly report nothing for the metrics `stateOperator`. We could however easily report the metrics from `lastExecution` to ensure continuity of metrics.

Regression test in `StreamingQueryStatusAndProgressSuite`

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #16716 from brkyvz/state-agg.

(cherry picked from commit 081b7add)
Signed-off-by: default avatarTathagata Das <tathagata.das1565@gmail.com>
parent e43f161b
No related branches found
No related tags found
No related merge requests found
......@@ -180,6 +180,26 @@ trait ProgressReporter extends Logging {
currentStatus = currentStatus.copy(isTriggerActive = false)
}
/** Extract statistics about stateful operators from the executed query plan. */
private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = {
if (lastExecution == null) return Nil
// lastExecution could belong to one of the previous triggers if `!hasNewData`.
// Walking the plan again should be inexpensive.
val stateNodes = lastExecution.executedPlan.collect {
case p if p.isInstanceOf[StateStoreSaveExec] => p
}
stateNodes.map { node =>
val numRowsUpdated = if (hasNewData) {
node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L)
} else {
0L
}
new StateOperatorProgress(
numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L),
numRowsUpdated = numRowsUpdated)
}
}
/** Extracts statistics from the most recent query execution. */
private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
......@@ -187,8 +207,11 @@ trait ProgressReporter extends Logging {
if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
else Map.empty[String, String]
// SPARK-19378: Still report metrics even though no data was processed while reporting progress.
val stateOperators = extractStateOperatorMetrics(hasNewData)
if (!hasNewData) {
return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)
return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp)
}
// We want to associate execution plan leaves to sources that generate them, so that we match
......@@ -237,16 +260,6 @@ trait ProgressReporter extends Logging {
Map.empty
}
// Extract statistics about stateful operators in the query plan.
val stateNodes = lastExecution.executedPlan.collect {
case p if p.isInstanceOf[StateStoreSaveExec] => p
}
val stateOperators = stateNodes.map { node =>
new StateOperatorProgress(
numRowsTotal = node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L),
numRowsUpdated = node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
}
val eventTimeStats = lastExecution.executedPlan.collect {
case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
val stats = e.eventTimeStats.value
......
......@@ -20,16 +20,25 @@ package org.apache.spark.sql.streaming
import java.util.UUID
import scala.collection.JavaConverters._
import scala.language.postfixOps
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._
class StreamingQueryStatusAndProgressSuite extends StreamTest {
class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
implicit class EqualsIgnoreCRLF(source: String) {
def equalsIgnoreCRLF(target: String): Boolean = {
source.replaceAll("\r\n|\r|\n", System.lineSeparator) ===
target.replaceAll("\r\n|\r|\n", System.lineSeparator)
}
}
test("StreamingQueryProgress - prettyJson") {
val json1 = testProgress1.prettyJson
......@@ -165,6 +174,41 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest {
query.stop()
}
}
test("SPARK-19378: Continue reporting stateOp metrics even if there is no active trigger") {
import testImplicits._
withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "10") {
val inputData = MemoryStream[Int]
val query = inputData.toDS().toDF("value")
.select('value)
.groupBy($"value")
.agg(count("*"))
.writeStream
.queryName("metric_continuity")
.format("memory")
.outputMode("complete")
.start()
try {
inputData.addData(1, 2)
query.processAllAvailable()
val progress = query.lastProgress
assert(progress.stateOperators.length > 0)
// Should emit new progresses every 10 ms, but we could be facing a slow Jenkins
eventually(timeout(1 minute)) {
val nextProgress = query.lastProgress
assert(nextProgress.timestamp !== progress.timestamp)
assert(nextProgress.numInputRows === 0)
assert(nextProgress.stateOperators.head.numRowsTotal === 2)
assert(nextProgress.stateOperators.head.numRowsUpdated === 0)
}
} finally {
query.stop()
}
}
}
}
object StreamingQueryStatusAndProgressSuite {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment