diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 16edb35b1d43f48a66923f7f99ca50e588f253a8..0a4d3a93a07e8cbca6ffe210faef7482742f69e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -563,7 +563,7 @@ class Dataset[T] private[sql]( * @param eventTime the name of the column that contains the event time of the row. * @param delayThreshold the minimum delay to wait to data to arrive late, relative to the latest * record that has been processed in the form of an interval - * (e.g. "1 minute" or "5 hours"). + * (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative. * * @group streaming * @since 2.1.0 @@ -576,6 +576,8 @@ class Dataset[T] private[sql]( val parsedDelay = Option(CalendarInterval.fromString("interval " + delayThreshold)) .getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'")) + require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0, + s"delay threshold ($delayThreshold) should not be negative.") EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index c768525bc685578d531d1452dcc9aa5fbb4aabf8..7614ea5eb3c017899fbdbda842c1ebdd7ddad58a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -306,6 +306,29 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin ) } + test("delay threshold should not be negative.") { + val inputData = MemoryStream[Int].toDF() + var e = intercept[IllegalArgumentException] { + inputData.withWatermark("value", "-1 year") + } + assert(e.getMessage contains "should not be negative.") + + e = intercept[IllegalArgumentException] { + inputData.withWatermark("value", "1 year -13 months") + } + assert(e.getMessage contains "should not be negative.") + + e = intercept[IllegalArgumentException] { + inputData.withWatermark("value", "1 month -40 days") + } + assert(e.getMessage contains "should not be negative.") + + e = intercept[IllegalArgumentException] { + inputData.withWatermark("value", "-10 seconds") + } + assert(e.getMessage contains "should not be negative.") + } + test("the new watermark should override the old one") { val df = MemoryStream[(Long, Long)].toDF() .withColumn("first", $"_1".cast("timestamp"))