Skip to content
Snippets Groups Projects
Commit 3c8861d9 authored by Tathagata Das, committed by Shixiong Zhu
Browse files

[SPARK-18894][SS] Fix event time watermark delay threshold specified in months or years


## What changes were proposed in this pull request?

Two changes
- Fix how delays specified in months and years are translated to milliseconds
- Following up on #16258, do not show the watermark when there is no watermarking in the query

## How was this patch tested?
Updated and new unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16304 from tdas/SPARK-18834-1.

(cherry picked from commit 607a1e63)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
parent bc54a14b
No related branches found
No related tags found
No related merge requests found
......@@ -84,6 +84,11 @@ case class EventTimeWatermarkExec(
child: SparkPlan) extends SparkPlan {
val eventTimeStats = new EventTimeStatsAccum()
// Watermark delay in milliseconds: the millisecond part of `delay` plus its month part,
// where every month is counted as 31 days.
val delayMs: Long = {
  val millisPerDay = CalendarInterval.MICROS_PER_DAY / 1000
  val millisPerMonth = millisPerDay * 31
  delay.months * millisPerMonth + delay.milliseconds
}
sparkContext.register(eventTimeStats)
override protected def doExecute(): RDD[InternalRow] = {
......@@ -101,7 +106,7 @@ case class EventTimeWatermarkExec(
if (a semanticEquals eventTime) {
val updatedMetadata = new MetadataBuilder()
.withMetadata(a.metadata)
.putLong(EventTimeWatermark.delayKey, delay.milliseconds)
.putLong(EventTimeWatermark.delayKey, delayMs)
.build()
a.withMetadata(updatedMetadata)
......
......@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent
......@@ -182,7 +182,10 @@ trait ProgressReporter extends Logging {
/** Extracts statistics from the most recent query execution. */
private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = {
val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty
val watermarkTimestamp =
if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs))
else Map.empty[String, String]
if (!hasNewData) {
return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp)
......
......@@ -387,7 +387,7 @@ class StreamExecution(
lastExecution.executedPlan.collect {
case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 =>
logDebug(s"Observed event time stats: ${e.eventTimeStats.value}")
e.eventTimeStats.value.max - e.delay.milliseconds
e.eventTimeStats.value.max - e.delayMs
}.headOption.foreach { newWatermarkMs =>
if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) {
logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
......
......@@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming
import java.{util => ju}
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import org.scalatest.BeforeAndAfter
......@@ -26,8 +27,9 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.InternalOutputModes.Complete
class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
import testImplicits._
......@@ -52,24 +54,35 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
assert(e.getMessage contains "int")
}
test("event time and watermark metrics") {
val inputData = MemoryStream[Int]
// No event time metrics when there is no watermarking
val inputData1 = MemoryStream[Int]
val aggWithoutWatermark = inputData1.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
val windowedAggregation = inputData.toDF()
testStream(aggWithoutWatermark, outputMode = Complete)(
AddData(inputData1, 15),
CheckAnswer((15, 1)),
assertEventStats { e => assert(e.isEmpty) },
AddData(inputData1, 10, 12, 14),
CheckAnswer((10, 3), (15, 1)),
assertEventStats { e => assert(e.isEmpty) }
)
// All event time metrics where watermarking is set
val inputData2 = MemoryStream[Int]
val aggWithWatermark = inputData2.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q =>
body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
true
}
testStream(windowedAggregation)(
AddData(inputData, 15),
testStream(aggWithWatermark)(
AddData(inputData2, 15),
CheckAnswer(),
assertEventStats { e =>
assert(e.get("max") === formatTimestamp(15))
......@@ -77,7 +90,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
assert(e.get("avg") === formatTimestamp(15))
assert(e.get("watermark") === formatTimestamp(0))
},
AddData(inputData, 10, 12, 14),
AddData(inputData2, 10, 12, 14),
CheckAnswer(),
assertEventStats { e =>
assert(e.get("max") === formatTimestamp(14))
......@@ -85,7 +98,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
assert(e.get("avg") === formatTimestamp(12))
assert(e.get("watermark") === formatTimestamp(5))
},
AddData(inputData, 25),
AddData(inputData2, 25),
CheckAnswer(),
assertEventStats { e =>
assert(e.get("max") === formatTimestamp(25))
......@@ -93,7 +106,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
assert(e.get("avg") === formatTimestamp(25))
assert(e.get("watermark") === formatTimestamp(5))
},
AddData(inputData, 25),
AddData(inputData2, 25),
CheckAnswer((10, 3)),
assertEventStats { e =>
assert(e.get("max") === formatTimestamp(25))
......@@ -124,6 +137,33 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
)
}
test("delay in months and years handled correctly") {
  // Feed the current wall-clock time as the event time, so the reported watermark is
  // "now minus the configured delay".
  val currentTimeMs = System.currentTimeMillis
  val currentTime = new Date(currentTimeMs)

  val input = MemoryStream[Long]
  val aggWithWatermark = input.toDF()
    .withColumn("eventTime", $"value".cast("timestamp"))
    .withWatermark("eventTime", "2 years 5 months")
    .groupBy(window($"eventTime", "5 seconds") as 'window)
    .agg(count("*") as 'count)
    .select($"window".getField("start").cast("long").as[Long], $"count".as[Long])

  // Month index (year * 12 + month) of `date`, read in UTC -- the zone `timestampFormat`
  // uses -- instead of the deprecated, default-time-zone `Date.getYear`/`Date.getMonth`.
  def monthsSinceEpoch(date: Date): Int = {
    val calendar = Calendar.getInstance(ju.TimeZone.getTimeZone("UTC"))
    calendar.setTime(date)
    calendar.get(Calendar.YEAR) * 12 + calendar.get(Calendar.MONTH)
  }

  testStream(aggWithWatermark)(
    AddData(input, currentTimeMs / 1000),
    CheckAnswer(),
    AddData(input, currentTimeMs / 1000),
    CheckAnswer(),
    assertEventStats { e =>
      // The event time is fed in whole seconds, so compare against the truncated value.
      assert(timestampFormat.parse(e.get("max")).getTime === (currentTimeMs / 1000) * 1000)
      val watermarkTime = timestampFormat.parse(e.get("watermark"))
      val monthDiff = monthsSinceEpoch(currentTime) - monthsSinceEpoch(watermarkTime)
      // EventTimeWatermarkExec counts each month of the delay as 31 days, so a 29-month delay
      // is a bit longer than 29 calendar months. Since monthsSinceEpoch truncates to whole
      // months, the difference is 29 or 30 depending on the current day of the month.
      assert(monthDiff === 29 || monthDiff === 30,
        s"currentTime: $currentTime, watermarkTime: $watermarkTime")
    }
  )
}
test("recovery") {
val inputData = MemoryStream[Int]
val df = inputData.toDF()
......@@ -231,6 +271,13 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
)
}
/**
 * Builds an [[AssertOnQuery]] that hands the `eventTime` map of the most recent progress
 * update with at least one input row to `body`.
 *
 * `body` is expected to do its own assertions; the check itself always returns true.
 * Throws if no recent progress update has any input rows.
 */
private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
  AssertOnQuery { query =>
    val progressWithInput = query.recentProgress.filter(progress => progress.numInputRows > 0)
    val latestEventTime = progressWithInput.lastOption.get.eventTime
    body(latestEventTime)
    true
  }
}
// Formatter pinned to UTC; the tests use it to parse timestamps from the event-time stats.
private val timestampFormat = {
  val isoFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
  isoFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC"))
  isoFormat
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment