Commit 0d26b3aa authored by Shixiong Zhu

[SPARK-21546][SS] dropDuplicates should ignore watermark when it's not a key

## What changes were proposed in this pull request?

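Currently, `dropDuplicates` crashes when the watermark column is not one of the columns passed to it. This PR fixes the issue: the key-based watermark predicate is now created only when one of the key expressions carries the watermark metadata.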
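
The guard can be illustrated outside Spark with a minimal sketch. Everything in it is a simplified stand-in, not Spark's API: `KeyAttr`, `DelayKey`, the `Predicate` type alias, and the "at or before the watermark" comparison are assumptions made for illustration only.

```scala
// Hypothetical stand-in for a key attribute with its metadata.
final case class KeyAttr(name: String, metadata: Map[String, Long])

object WatermarkKeySketch {
  // Placeholder for EventTimeWatermark.delayKey; the real value is not shown in this commit.
  val DelayKey = "hypothetical.delayKey"

  // A row of key values mapped to "is this row older than the watermark?" (assumed shape).
  type Predicate = Map[String, Long] => Boolean

  // Build a key-based predicate only if a watermark exists AND some key carries
  // the watermark metadata; otherwise return None instead of failing.
  def predicateForKeys(watermarkMs: Option[Long], keys: Seq[KeyAttr]): Option[Predicate] =
    watermarkMs.flatMap { wm =>
      keys
        .find(_.metadata.contains(DelayKey))
        .map(k => (row: Map[String, Long]) => row(k.name) <= wm)
    }
}
```

With keys that contain only `id`, `predicateForKeys` returns `None`, which mirrors the case this PR handles: no key-based eviction is attempted when the event-time column is not part of the deduplication key.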

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #18822 from zsxwing/SPARK-21546.
parent 9456176d
@@ -156,8 +156,13 @@ trait WatermarkSupport extends UnaryExecNode {
   }
   /** Predicate based on keys that matches data older than the watermark */
-  lazy val watermarkPredicateForKeys: Option[Predicate] =
-    watermarkExpression.map(newPredicate(_, keyExpressions))
+  lazy val watermarkPredicateForKeys: Option[Predicate] = watermarkExpression.flatMap { e =>
+    if (keyExpressions.exists(_.metadata.contains(EventTimeWatermark.delayKey))) {
+      Some(newPredicate(e, keyExpressions))
+    } else {
+      None
+    }
+  }
   /** Predicate based on the child output that matches data older than the watermark. */
   lazy val watermarkPredicateForData: Option[Predicate] =
......
@@ -268,4 +268,17 @@ class DeduplicateSuite extends StateStoreMetricsTest with BeforeAndAfterAll {
       CheckLastBatch(7)
     )
   }
+  test("SPARK-21546: dropDuplicates should ignore watermark when it's not a key") {
+    val input = MemoryStream[(Int, Int)]
+    val df = input.toDS.toDF("id", "time")
+      .withColumn("time", $"time".cast("timestamp"))
+      .withWatermark("time", "1 second")
+      .dropDuplicates("id")
+      .select($"id", $"time".cast("long"))
+    testStream(df)(
+      AddData(input, 1 -> 1, 1 -> 2, 2 -> 2),
+      CheckLastBatch(1 -> 1, 2 -> 2)
+    )
+  }
 }
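
The result the new test expects, `(1, 1)` and `(2, 2)`, can be reproduced with a plain-collections model of deduplication by `id`. This is only a sketch under the assumption that the first row seen for each `id` is the one kept; `DedupByIdSketch` and `dedupById` are hypothetical names and do not use Spark.

```scala
import scala.collection.mutable

object DedupByIdSketch {
  // Keep the first (id, time) row seen for each id, preserving arrival order.
  def dedupById(rows: Seq[(Int, Int)]): Seq[(Int, Int)] = {
    val firstSeen = mutable.LinkedHashMap.empty[Int, (Int, Int)]
    rows.foreach { case row @ (id, _) =>
      // Later rows with an already-seen id are dropped, whatever their time value.
      if (!firstSeen.contains(id)) firstSeen(id) = row
    }
    firstSeen.values.toSeq
  }
}
```

Feeding it the test input `Seq(1 -> 1, 1 -> 2, 2 -> 2)` drops `1 -> 2` because `id` 1 was already seen. The `time` column plays no part in the key, which is why the watermark on `time` should not affect which rows are kept.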