From eeb1d6db878641d9eac62d0869a90fe80c1f4461 Mon Sep 17 00:00:00 2001
From: uncleGen <hustyugm@gmail.com>
Date: Wed, 8 Mar 2017 23:23:10 -0800
Subject: [PATCH] [SPARK-19859][SS][FOLLOW-UP] The new watermark should
 override the old one.

## What changes were proposed in this pull request?

A follow-up to SPARK-19859:

- extract the calculation of `delayMs` and reuse it.
- update EventTimeWatermarkExec
- use the correct `delayMs` in EventTimeWatermark

## How was this patch tested?

Jenkins.

Author: uncleGen <hustyugm@gmail.com>

Closes #17221 from uncleGen/SPARK-19859.
---
 .../plans/logical/EventTimeWatermark.scala    |  9 ++++++++-
 .../streaming/EventTimeWatermarkExec.scala    | 19 +++++++++++--------
 2 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
index 62f68a6d7b..06196b5afb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
@@ -24,6 +24,12 @@ import org.apache.spark.unsafe.types.CalendarInterval
 object EventTimeWatermark {
   /** The [[org.apache.spark.sql.types.Metadata]] key used to hold the eventTime watermark delay. */
   val delayKey = "spark.watermarkDelayMs"
+
+  def getDelayMs(delay: CalendarInterval): Long = {
+    // We define month as `31 days` to simplify calculation.
+    val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+    delay.milliseconds + delay.months * millisPerMonth
+  }
 }
 
 /**
@@ -37,9 +43,10 @@ case class EventTimeWatermark(
   // Update the metadata on the eventTime column to include the desired delay.
   override val output: Seq[Attribute] = child.output.map { a =>
     if (a semanticEquals eventTime) {
+      val delayMs = EventTimeWatermark.getDelayMs(delay)
       val updatedMetadata = new MetadataBuilder()
         .withMetadata(a.metadata)
-        .putLong(EventTimeWatermark.delayKey, delay.milliseconds)
+        .putLong(EventTimeWatermark.delayKey, delayMs)
         .build()
       a.withMetadata(updatedMetadata)
     } else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index 5a9a99e111..25cf609fc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -84,10 +84,7 @@ case class EventTimeWatermarkExec(
     child: SparkPlan) extends SparkPlan {
 
   val eventTimeStats = new EventTimeStatsAccum()
-  val delayMs = {
-    val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
-    delay.milliseconds + delay.months * millisPerMonth
-  }
+  val delayMs = EventTimeWatermark.getDelayMs(delay)
 
   sparkContext.register(eventTimeStats)
 
@@ -105,10 +102,16 @@ case class EventTimeWatermarkExec(
   override val output: Seq[Attribute] = child.output.map { a =>
     if (a semanticEquals eventTime) {
       val updatedMetadata = new MetadataBuilder()
-          .withMetadata(a.metadata)
-          .putLong(EventTimeWatermark.delayKey, delayMs)
-          .build()
-
+        .withMetadata(a.metadata)
+        .putLong(EventTimeWatermark.delayKey, delayMs)
+        .build()
+      a.withMetadata(updatedMetadata)
+    } else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
+      // Remove existing watermark
+      val updatedMetadata = new MetadataBuilder()
+        .withMetadata(a.metadata)
+        .remove(EventTimeWatermark.delayKey)
+        .build()
       a.withMetadata(updatedMetadata)
     } else {
       a
-- 
GitLab