diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala index 4224a7997c4101fc79c3b5a5dd5506a8fad25d83..c919cdb4cd65f19b547a3acd9d2e4d1a0435fba9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala @@ -42,6 +42,13 @@ case class EventTimeWatermark( .putLong(EventTimeWatermark.delayKey, delay.milliseconds) .build() a.withMetadata(updatedMetadata) + } else if (a.metadata.contains(EventTimeWatermark.delayKey)) { + // Remove existing watermark + val updatedMetadata = new MetadataBuilder() + .withMetadata(a.metadata) + .remove(EventTimeWatermark.delayKey) + .build() + a.withMetadata(updatedMetadata) } else { a } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index c34d119734cc070d086bea1060abdab136b73566..c768525bc685578d531d1452dcc9aa5fbb4aabf8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.functions.{count, window} import org.apache.spark.sql.streaming.OutputMode._ @@ -305,6 +306,19 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin ) } + test("the new watermark should override the old one") { + val df = MemoryStream[(Long, Long)].toDF() + .withColumn("first", $"_1".cast("timestamp")) + .withColumn("second", $"_2".cast("timestamp")) + .withWatermark("first", "1 minute") + .withWatermark("second", "2 minutes") + + val eventTimeColumns = df.logicalPlan.output + .filter(_.metadata.contains(EventTimeWatermark.delayKey)) + assert(eventTimeColumns.size === 1) + assert(eventTimeColumns(0).name === "second") + } + private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q => val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows)