Skip to content
Snippets Groups Projects
Commit 0d3a6319 authored by Tathagata Das's avatar Tathagata Das Committed by Shixiong Zhu
Browse files

[SPARK-20714][SS] Fix match error when watermark is set with timeout = no...

[SPARK-20714][SS] Fix match error when watermark is set with timeout = no timeout / processing timeout

## What changes were proposed in this pull request?

When watermark is set, and timeout conf is NoTimeout or ProcessingTimeTimeout (both do not need the watermark), the query fails at runtime with the following exception.
```
MatchException: Some(org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate1a9b798e) (of class scala.Some)
    org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1.apply(FlatMapGroupsWithStateExec.scala:120)
    org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1.apply(FlatMapGroupsWithStateExec.scala:116)
    org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:70)
    org.apache.spark.sql.execution.streaming.state.package$StateStoreOps$$anonfun$1.apply(package.scala:65)
    org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:64)
```

The match did not correctly handle cases where watermark was defined by the timeout was different from EventTimeTimeout.

## How was this patch tested?
New unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17954 from tdas/SPARK-20714.
parent 7d6ff391
No related branches found
No related tags found
No related merge requests found
......@@ -120,7 +120,7 @@ case class FlatMapGroupsWithStateExec(
val filteredIter = watermarkPredicateForData match {
case Some(predicate) if timeoutConf == EventTimeTimeout =>
iter.filter(row => !predicate.eval(row))
case None =>
case _ =>
iter
}
......
......@@ -589,7 +589,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
)
}
test("flatMapGroupsWithState - streaming with event time timeout") {
test("flatMapGroupsWithState - streaming with event time timeout + watermark") {
// Function to maintain the max event time
// Returns the max event time in the state, or -1 if the state was removed by timeout
val stateFunc = (
......@@ -761,6 +761,44 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
assert(e.getMessage === "The output mode of function should be append or update")
}
def testWithTimeout(timeoutConf: GroupStateTimeout): Unit = {
test("SPARK-20714: watermark does not fail query when timeout = " + timeoutConf) {
// Function to maintain running count up to 2, and then remove the count
// Returns the data and the count (-1 if count reached beyond 2 and state was just removed)
val stateFunc =
(key: String, values: Iterator[(String, Long)], state: GroupState[RunningCount]) => {
if (state.hasTimedOut) {
state.remove()
Iterator((key, "-1"))
} else {
val count = state.getOption.map(_.count).getOrElse(0L) + values.size
state.update(RunningCount(count))
state.setTimeoutDuration("10 seconds")
Iterator((key, count.toString))
}
}
val clock = new StreamManualClock
val inputData = MemoryStream[(String, Long)]
val result =
inputData.toDF().toDF("key", "time")
.selectExpr("key", "cast(time as timestamp) as timestamp")
.withWatermark("timestamp", "10 second")
.as[(String, Long)]
.groupByKey(x => x._1)
.flatMapGroupsWithState(Update, ProcessingTimeTimeout)(stateFunc)
testStream(result, Update)(
StartStream(ProcessingTime("1 second"), triggerClock = clock),
AddData(inputData, ("a", 1L)),
AdvanceManualClock(1 * 1000),
CheckLastBatch(("a", "1"))
)
}
}
testWithTimeout(NoTimeout)
testWithTimeout(ProcessingTimeTimeout)
def testStateUpdateWithData(
testName: String,
stateUpdates: GroupState[Int] => Unit,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment