diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index e8570d040dbe468724e53cfe79b1b3b0d14b5d65..5a9a99e11188e1cf051f9077b6e8aa4f681c3a67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -84,6 +84,12 @@ case class EventTimeWatermarkExec(
     child: SparkPlan) extends SparkPlan {
 
   val eventTimeStats = new EventTimeStatsAccum()
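+  // Treat a month as 31 days so the computed delay errs on the larger, more conservative side.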
+  val delayMs = {
+    val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+    delay.milliseconds + delay.months * millisPerMonth
+  }
+
   sparkContext.register(eventTimeStats)
 
   override protected def doExecute(): RDD[InternalRow] = {
@@ -101,7 +106,7 @@ case class EventTimeWatermarkExec(
     if (a semanticEquals eventTime) {
       val updatedMetadata = new MetadataBuilder()
           .withMetadata(a.metadata)
-          .putLong(EventTimeWatermark.delayKey, delay.milliseconds)
+          .putLong(EventTimeWatermark.delayKey, delayMs)
           .build()
 
       a.withMetadata(updatedMetadata)
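For context, the conversion introduced above in isolation. This is a minimal,
runnable sketch assuming the Spark 2.x CalendarInterval API (a months field plus
microseconds, with a milliseconds() accessor); it is not part of the patch.

  import org.apache.spark.unsafe.types.CalendarInterval

  object DelayToMillisSketch {
    // Same arithmetic as EventTimeWatermarkExec.delayMs: calendar months have no
    // fixed length, so each month is approximated as 31 days.
    def delayMs(delay: CalendarInterval): Long = {
      val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
      delay.milliseconds + delay.months * millisPerMonth
    }

    def main(args: Array[String]): Unit = {
      // "2 years 5 months" == 29 months -> 29 * 31 days = 899 days.
      println(delayMs(new CalendarInterval(29, 0))) // 77673600000
    }
  }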
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 2386f33f8ad414cc592228f90a590503add3b4eb..c5e9eae607b395b686424f078bf199d7e2a25819 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
@@ -182,7 +182,10 @@ trait ProgressReporter extends Logging {
 
   /** Extracts statistics from the most recent query execution. */
   private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
-    val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+    val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
+    val watermarkTimestamp =
+      if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
+      else Map.empty[String, String]
 
     if (!hasNewData) {
       return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)
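Behavioral note: previously every query reported a "watermark" entry, so a query
that never called withWatermark would still show the epoch (1970-01-01) as its
watermark. The guard above suppresses the entry instead. A condensed sketch of
the pattern, with the helper name and parameters invented for illustration:

  import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}

  // Hypothetical helper: emit a watermark entry only when the logical plan
  // actually contains an EventTimeWatermark operator.
  def watermarkEntry(
      plan: LogicalPlan,
      batchWatermarkMs: Long,
      format: Long => String): Map[String, String] = {
    if (plan.collect { case e: EventTimeWatermark => e }.nonEmpty) {
      Map("watermark" -> format(batchWatermarkMs))
    } else {
      Map.empty
    }
  }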
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8f97d9570eaa6bafef06f4af4e91f86fd59639f0..e05200df50849ae4a9af90d37cbd93e196364b76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -387,7 +387,7 @@ class StreamExecution(
         lastExecution.executedPlan.collect {
           case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
             logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
-            e.eventTimeStats.value.max - e.delay.milliseconds
+            e.eventTimeStats.value.max - e.delayMs
         }.headOption.foreach { newWatermarkMs =>
           if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
             logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
similarity index 77%
rename from sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index f1cc19c6e235d01852f63699c1f83cd1988f0c48..bdfba9590b0a0186c688d6c49087ef343ba66b10 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming
 
 import java.{util => ju}
 import java.text.SimpleDateFormat
+import java.util.Date
 
 import org.scalatest.BeforeAndAfter
 
@@ -26,8 +27,9 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.InternalOutputModes.Complete
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 
-class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
+class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
 
   import testImplicits._
 
@@ -52,24 +54,35 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
     assert(e.getMessage contains "int")
   }
 
-
   test("event time and watermark metrics") {
-    val inputData = MemoryStream[Int]
+    // No event time metrics when there is no watermarking
+    val inputData1 = MemoryStream[Int]
+    val aggWithoutWatermark = inputData1.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
 
-    val windowedAggregation = inputData.toDF()
+    testStream(aggWithoutWatermark, outputMode = Complete)(
+      AddData(inputData1, 15),
+      CheckAnswer((15, 1)),
+      assertEventStats { e => assert(e.isEmpty) },
+      AddData(inputData1, 10, 12, 14),
+      CheckAnswer((10, 3), (15, 1)),
+      assertEventStats { e => assert(e.isEmpty) }
+    )
+
+    // All event time metrics when watermarking is set
+    val inputData2 = MemoryStream[Int]
+    val aggWithWatermark = inputData2.toDF()
         .withColumn("eventTime", $"value".cast("timestamp"))
         .withWatermark("eventTime", "10 seconds")
         .groupBy(window($"eventTime", "5 seconds") as 'window)
         .agg(count("*") as 'count)
         .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
 
-    def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q =>
-      body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
-      true
-    }
-
-    testStream(windowedAggregation)(
-      AddData(inputData, 15),
+    testStream(aggWithWatermark)(
+      AddData(inputData2, 15),
       CheckAnswer(),
       assertEventStats { e =>
         assert(e.get("max") === formatTimestamp(15))
@@ -77,7 +90,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
         assert(e.get("avg") === formatTimestamp(15))
         assert(e.get("watermark") === formatTimestamp(0))
       },
-      AddData(inputData, 10, 12, 14),
+      AddData(inputData2, 10, 12, 14),
       CheckAnswer(),
       assertEventStats { e =>
         assert(e.get("max") === formatTimestamp(14))
@@ -85,7 +98,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
         assert(e.get("avg") === formatTimestamp(12))
         assert(e.get("watermark") === formatTimestamp(5))
       },
-      AddData(inputData, 25),
+      AddData(inputData2, 25),
       CheckAnswer(),
       assertEventStats { e =>
         assert(e.get("max") === formatTimestamp(25))
@@ -93,7 +106,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
         assert(e.get("avg") === formatTimestamp(25))
         assert(e.get("watermark") === formatTimestamp(5))
       },
-      AddData(inputData, 25),
+      AddData(inputData2, 25),
       CheckAnswer((10, 3)),
       assertEventStats { e =>
         assert(e.get("max") === formatTimestamp(25))
@@ -124,6 +137,34 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
     )
   }
 
+  test("delay in months and years handled correctly") {
+    val currentTimeMs = System.currentTimeMillis
+    val currentTime = new Date(currentTimeMs)
+
+    val input = MemoryStream[Long]
+    val aggWithWatermark = input.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "2 years 5 months")
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
+
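+    // Date.getYear/getMonth are 1900/0-based; the offsets cancel in the difference below.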
+    def monthsSinceEpoch(date: Date): Int = { date.getYear * 12 + date.getMonth }
+
+    testStream(aggWithWatermark)(
+      AddData(input, currentTimeMs / 1000),
+      CheckAnswer(),
+      AddData(input, currentTimeMs / 1000),
+      CheckAnswer(),
+      assertEventStats { e =>
+        assert(timestampFormat.parse(e.get("max")).getTime === (currentTimeMs / 1000) * 1000)
+        val watermarkTime = timestampFormat.parse(e.get("watermark"))
+        assert(monthsSinceEpoch(currentTime) - monthsSinceEpoch(watermarkTime) === 29)
+      }
+    )
+  }
+
   test("recovery") {
     val inputData = MemoryStream[Int]
     val df = inputData.toDF()
@@ -231,6 +271,13 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
     )
   }
 
+  private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
+    AssertOnQuery { q =>
+      body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
+      true
+    }
+  }
+
   private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
   timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC"))
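As a sanity check on the 29-month assertion in the months/years test, the same
arithmetic as a standalone JDK-only sketch (object name invented; not part of
the suite):

  import java.util.Date

  object MonthsDelaySketch {
    // Mirrors the suite's helper; getYear/getMonth are 1900/0-based, which
    // cancels out when taking a difference of month indices.
    def monthsSinceEpoch(date: Date): Int = date.getYear * 12 + date.getMonth

    def main(args: Array[String]): Unit = {
      val nowMs = System.currentTimeMillis
      val delayMs = 29L * 31 * 24 * 60 * 60 * 1000 // "2 years 5 months" as 29 * 31 days
      val diff = monthsSinceEpoch(new Date(nowMs)) -
        monthsSinceEpoch(new Date(nowMs - delayMs))
      println(diff) // 29 for most dates, matching the test's assertion
    }
  }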