Skip to content
Snippets Groups Projects
Commit d8830c50 authored by Shixiong Zhu's avatar Shixiong Zhu
Browse files

[SPARK-19859][SS] The new watermark should override the old one

## What changes were proposed in this pull request?

The new watermark should override the old one. Otherwise, we just pick up the first column that has a watermark, which may be unexpected.

## How was this patch tested?

The new test.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17199 from zsxwing/SPARK-19859.
parent ca849ac4
No related branches found
No related tags found
No related merge requests found
......@@ -42,6 +42,13 @@ case class EventTimeWatermark(
.putLong(EventTimeWatermark.delayKey, delay.milliseconds)
.build()
a.withMetadata(updatedMetadata)
} else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
// Remove existing watermark
val updatedMetadata = new MetadataBuilder()
.withMetadata(a.metadata)
.remove(EventTimeWatermark.delayKey)
.build()
a.withMetadata(updatedMetadata)
} else {
a
}
......
......@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.streaming.OutputMode._
......@@ -305,6 +306,19 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
)
}
test("the new watermark should override the old one") {
  // Build a streaming DataFrame with two timestamp columns derived from the tuple fields.
  val withTimestamps = MemoryStream[(Long, Long)].toDF()
    .withColumn("first", $"_1".cast("timestamp"))
    .withColumn("second", $"_2".cast("timestamp"))

  // Declare a watermark on "first", then a later one on "second".
  val df = withTimestamps
    .withWatermark("first", "1 minute")
    .withWatermark("second", "2 minutes")

  // Collect every output attribute whose metadata still carries the watermark delay key.
  val watermarkedAttrs = df.logicalPlan.output.filter { attr =>
    attr.metadata.contains(EventTimeWatermark.delayKey)
  }

  // Exactly one column may remain marked, and it must be the most recently declared one.
  assert(watermarkedAttrs.size === 1)
  assert(watermarkedAttrs.head.name === "second")
}
private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q =>
val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get
assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment