From ffe65b06511f3143cb2549073bfbe145663ad561 Mon Sep 17 00:00:00 2001
From: uncleGen <hustyugm@gmail.com>
Date: Thu, 9 Mar 2017 11:07:31 -0800
Subject: [PATCH] [SPARK-19861][SS] watermark should not be a negative time.

## What changes were proposed in this pull request?

A `watermark` delay should not be negative. Such a value is invalid, so validate it up front instead of failing during execution.

## How was this patch tested?

Added a new unit test covering the negative-delay validation.

Author: uncleGen <hustyugm@gmail.com>
Author: dylon <hustyugm@gmail.com>

Closes #17202 from uncleGen/SPARK-19861.

(cherry picked from commit 30b18e69361746b4d656474374d8b486bb48a19e)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
---
 .../scala/org/apache/spark/sql/Dataset.scala  |  4 +++-
 .../streaming/EventTimeWatermarkSuite.scala   | 23 +++++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 563bfa8a84..e2d0e512cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -559,7 +559,7 @@ class Dataset[T] private[sql](
    * @param eventTime the name of the column that contains the event time of the row.
    * @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest
    *                       record that has been processed in the form of an interval
-   *                       (e.g. "1 minute" or "5 hours").
+   *                       (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative.
    *
    * @group streaming
    * @since 2.1.0
@@ -572,6 +572,8 @@ class Dataset[T] private[sql](
     val parsedDelay =
       Option(CalendarInterval.fromString("interval " + delayThreshold))
         .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
+    require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
+      s"delay threshold ($delayThreshold) should not be negative.")
     EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index c768525bc6..7614ea5eb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -306,6 +306,29 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
     )
   }
 
+  test("delay threshold should not be negative.") {
+    val inputData = MemoryStream[Int].toDF()
+    var e = intercept[IllegalArgumentException] {
+      inputData.withWatermark("value", "-1 year")
+    }
+    assert(e.getMessage contains "should not be negative.")
+
+    e = intercept[IllegalArgumentException] {
+      inputData.withWatermark("value", "1 year -13 months")
+    }
+    assert(e.getMessage contains "should not be negative.")
+
+    e = intercept[IllegalArgumentException] {
+      inputData.withWatermark("value", "1 month -40 days")
+    }
+    assert(e.getMessage contains "should not be negative.")
+
+    e = intercept[IllegalArgumentException] {
+      inputData.withWatermark("value", "-10 seconds")
+    }
+    assert(e.getMessage contains "should not be negative.")
+  }
+
   test("the new watermark should override the old one") {
     val df = MemoryStream[(Long, Long)].toDF()
       .withColumn("first", $"_1".cast("timestamp"))
-- 
GitLab