diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala index 4c8cb069d23a0e45bcc6521453e1f69d2ed7d66e..e8570d040dbe468724e53cfe79b1b3b0d14b5d65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.streaming -import scala.math.max - import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection} @@ -28,24 +26,48 @@ import org.apache.spark.sql.types.MetadataBuilder import org.apache.spark.unsafe.types.CalendarInterval import org.apache.spark.util.AccumulatorV2 -/** Tracks the maximum positive long seen. */ -class MaxLong(protected var currentValue: Long = 0) - extends AccumulatorV2[Long, Long] { +/** Class for collecting event time stats with an accumulator */ +case class EventTimeStats(var max: Long, var min: Long, var sum: Long, var count: Long) { + def add(eventTime: Long): Unit = { + this.max = math.max(this.max, eventTime) + this.min = math.min(this.min, eventTime) + this.sum += eventTime + this.count += 1 + } + + def merge(that: EventTimeStats): Unit = { + this.max = math.max(this.max, that.max) + this.min = math.min(this.min, that.min) + this.sum += that.sum + this.count += that.count + } + + def avg: Long = sum / count +} + +object EventTimeStats { + def zero: EventTimeStats = EventTimeStats( + max = Long.MinValue, min = Long.MaxValue, sum = 0L, count = 0L) +} + +/** Accumulator that collects stats on event time in a batch. */ +class EventTimeStatsAccum(protected var currentStats: EventTimeStats = EventTimeStats.zero) + extends AccumulatorV2[Long, EventTimeStats] { - override def isZero: Boolean = value == 0 - override def value: Long = currentValue - override def copy(): AccumulatorV2[Long, Long] = new MaxLong(currentValue) + override def isZero: Boolean = value == EventTimeStats.zero + override def value: EventTimeStats = currentStats + override def copy(): AccumulatorV2[Long, EventTimeStats] = new EventTimeStatsAccum(currentStats) override def reset(): Unit = { - currentValue = 0 + currentStats = EventTimeStats.zero } override def add(v: Long): Unit = { - currentValue = max(v, value) + currentStats.add(v) } - override def merge(other: AccumulatorV2[Long, Long]): Unit = { - currentValue = max(value, other.value) + override def merge(other: AccumulatorV2[Long, EventTimeStats]): Unit = { + currentStats.merge(other.value) } } @@ -54,22 +76,21 @@ class MaxLong(protected var currentValue: Long = 0) * adding appropriate metadata to this column, this operator also tracks the maximum observed event * time. Based on the maximum observed time and a user specified delay, we can calculate the * `watermark` after which we assume we will no longer see late records for a particular time - * period. + * period. Note that event time is measured in milliseconds. */ case class EventTimeWatermarkExec( eventTime: Attribute, delay: CalendarInterval, child: SparkPlan) extends SparkPlan { - // TODO: Use Spark SQL Metrics? 
- val maxEventTime = new MaxLong - sparkContext.register(maxEventTime) + val eventTimeStats = new EventTimeStatsAccum() + sparkContext.register(eventTimeStats) override protected def doExecute(): RDD[InternalRow] = { child.execute().mapPartitions { iter => val getEventTime = UnsafeProjection.create(eventTime :: Nil, child.output) iter.map { row => - maxEventTime.add(getEventTime(row).getLong(0)) + eventTimeStats.add(getEventTime(row).getLong(0) / 1000) row } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 40e3151337af62044d8e759cced04d97182c1f68..549b93694d9497e10f0eb91b96b5730155adbc34 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -41,7 +41,9 @@ import org.apache.spark.util.Clock trait ProgressReporter extends Logging { case class ExecutionStats( - inputRows: Map[Source, Long], stateOperators: Seq[StateOperatorProgress]) + inputRows: Map[Source, Long], + stateOperators: Seq[StateOperatorProgress], + eventTimeStats: Map[String, String]) // Internal state of the stream, required for computing metrics. protected def id: UUID @@ -127,12 +129,7 @@ trait ProgressReporter extends Logging { protected def finishTrigger(hasNewData: Boolean): Unit = { currentTriggerEndTimestamp = triggerClock.getTimeMillis() - val executionStats: ExecutionStats = if (!hasNewData) { - ExecutionStats(Map.empty, Seq.empty) - } else { - extractExecutionStats - } - + val executionStats = extractExecutionStats(hasNewData) val processingTimeSec = (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / 1000 @@ -160,10 +157,10 @@ trait ProgressReporter extends Logging { id = id, runId = runId, name = name, - timestamp = timestampFormat.format(new Date(currentTriggerStartTimestamp)), + timestamp = formatTimestamp(currentTriggerStartTimestamp), batchId = currentBatchId, durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava, - currentWatermark = offsetSeqMetadata.batchWatermarkMs, + eventTime = executionStats.eventTimeStats.asJava, stateOperators = executionStats.stateOperators.toArray, sources = sourceProgress.toArray, sink = sinkProgress) @@ -184,7 +181,13 @@ trait ProgressReporter extends Logging { } /** Extracts statistics from the most recent query execution. */ - private def extractExecutionStats: ExecutionStats = { + private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = { + val watermarkTimestamp = Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) + + if (!hasNewData) { + return ExecutionStats(Map.empty, Seq.empty, watermarkTimestamp) + } + // We want to associate execution plan leaves to sources that generate them, so that we match // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following. // Consider the translation from the streaming logical plan to the final executed plan. 
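
For reference while reviewing the rest of the patch, here is a standalone editorial sketch (not part of the change) that exercises the EventTimeStats case class introduced in EventTimeWatermarkExec.scala above: two per-partition accumulations are merged, and the resulting max/min/avg match the values the WatermarkSuite further down asserts on for events at 10s, 12s and 14s.

```scala
// Editorial sketch (not part of the patch): a copy of the EventTimeStats case class
// added above, shown standalone so the add/merge/avg semantics can be checked in isolation.
case class EventTimeStats(var max: Long, var min: Long, var sum: Long, var count: Long) {
  def add(eventTime: Long): Unit = {
    this.max = math.max(this.max, eventTime)
    this.min = math.min(this.min, eventTime)
    this.sum += eventTime
    this.count += 1
  }

  def merge(that: EventTimeStats): Unit = {
    this.max = math.max(this.max, that.max)
    this.min = math.min(this.min, that.min)
    this.sum += that.sum
    this.count += that.count
  }

  def avg: Long = sum / count
}

object EventTimeStatsExample extends App {
  def zero = EventTimeStats(max = Long.MinValue, min = Long.MaxValue, sum = 0L, count = 0L)

  // Two partitions of one batch; event times are in milliseconds, since
  // EventTimeWatermarkExec divides the microsecond timestamp column by 1000 before add().
  val p1 = zero; Seq(10000L, 12000L).foreach(p1.add)
  val p2 = zero; Seq(14000L).foreach(p2.add)

  p1.merge(p2)
  println(s"max=${p1.max} min=${p1.min} avg=${p1.avg}")  // max=14000 min=10000 avg=12000
}
```
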
@@ -241,7 +244,16 @@ trait ProgressReporter extends Logging { numRowsUpdated = node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L)) } - ExecutionStats(numInputRows, stateOperators) + val eventTimeStats = lastExecution.executedPlan.collect { + case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => + val stats = e.eventTimeStats.value + Map( + "max" -> stats.max, + "min" -> stats.min, + "avg" -> stats.avg).mapValues(formatTimestamp) + }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp + + ExecutionStats(numInputRows, stateOperators, eventTimeStats) } /** Records the duration of running `body` for the next query progress update. */ @@ -257,6 +269,10 @@ trait ProgressReporter extends Logging { result } + private def formatTimestamp(millis: Long): String = { + timestampFormat.format(new Date(millis)) + } + /** Updates the message returned in `status`. */ protected def updateStatusMessage(message: String): Unit = { currentStatus = currentStatus.copy(message = message) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 9fe6819837bbc95974736eb7b9702b9b1dc63e0f..8f97d9570eaa6bafef06f4af4e91f86fd59639f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -382,6 +382,24 @@ class StreamExecution( if (hasNewData) { // Current batch timestamp in milliseconds offsetSeqMetadata.batchTimestampMs = triggerClock.getTimeMillis() + // Update the eventTime watermark if we find one in the plan. + if (lastExecution != null) { + lastExecution.executedPlan.collect { + case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => + logDebug(s"Observed event time stats: ${e.eventTimeStats.value}") + e.eventTimeStats.value.max - e.delay.milliseconds + }.headOption.foreach { newWatermarkMs => + if (newWatermarkMs > offsetSeqMetadata.batchWatermarkMs) { + logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms") + offsetSeqMetadata.batchWatermarkMs = newWatermarkMs + } else { + logDebug( + s"Event time didn't move: $newWatermarkMs < " + + s"${offsetSeqMetadata.batchWatermarkMs}") + } + } + } + updateStatusMessage("Writing offsets to log") reportTimeTaken("walCommit") { assert(offsetLog.add( @@ -485,21 +503,6 @@ class StreamExecution( sink.addBatch(currentBatchId, nextBatch) } - // Update the eventTime watermark if we find one in the plan. - lastExecution.executedPlan.collect { - case e: EventTimeWatermarkExec => - logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}") - (e.maxEventTime.value / 1000) - e.delay.milliseconds() - }.headOption.foreach { newWatermark => - if (newWatermark > offsetSeqMetadata.batchWatermarkMs) { - logInfo(s"Updating eventTime watermark to: $newWatermark ms") - offsetSeqMetadata.batchWatermarkMs = newWatermark - } else { - logTrace(s"Event time didn't move: $newWatermark < " + - s"$offsetSeqMetadata.currentEventTimeWatermark") - } - } - awaitBatchLock.lock() try { // Wake up any threads that are waiting for the stream to progress. 
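
Two behavioural points in the StreamExecution change above are worth making explicit: the watermark is now advanced while constructing the next batch, before the offsets are written to the WAL, and it only ever moves forward (maximum observed event time minus the configured delay). Below is a small editorial sketch of that rule, assuming the 10-second delay implied by the WatermarkSuite assertions later in this patch (the .withWatermark call itself is outside this excerpt).

```scala
// Editorial sketch of the watermark update rule shown above (not part of the patch).
object WatermarkUpdateExample extends App {
  val delayMs = 10 * 1000L       // assumed: 10-second delay, matching the test's expected values
  var batchWatermarkMs = 0L      // offsetSeqMetadata.batchWatermarkMs starts at 0

  def updateWatermark(maxEventTimeMs: Long): Unit = {
    val newWatermarkMs = maxEventTimeMs - delayMs
    // Only move forward; a smaller candidate leaves the watermark untouched.
    if (newWatermarkMs > batchWatermarkMs) batchWatermarkMs = newWatermarkMs
  }

  updateWatermark(15 * 1000L)  // event at 15s -> watermark advances to 5s
  updateWatermark(14 * 1000L)  // max 14s -> candidate 4s would regress, stays at 5s
  updateWatermark(25 * 1000L)  // event at 25s -> watermark advances to 15s
  println(batchWatermarkMs)    // 15000
}
```

Because the update happens while constructing the next batch, the watermark reported for a given trigger is the one that was in effect when that trigger ran, which is why the test below only sees the effect of new data one trigger later.
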
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index d1568758b7a430c978ff609388699f00ea7cc5c4..e219cfde1265639f43e8ec7b07a5f6fd888ccbef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.streaming import java.{util => ju} +import java.lang.{Long => JLong} import java.util.UUID import scala.collection.JavaConverters._ @@ -29,7 +30,6 @@ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.catalyst.util.DateTimeUtils /** * :: Experimental :: @@ -61,13 +61,20 @@ class StateOperatorProgress private[sql]( * @param id An unique query id that persists across restarts. See `StreamingQuery.id()`. * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`. * @param name User-specified name of the query, null if not specified. - * @param timestamp Timestamp (ms) of the beginning of the trigger. + * @param timestamp Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps. * @param batchId A unique id for the current batch of data being processed. Note that in the * case of retries after a failure a given batchId my be executed more than once. * Similarly, when there is no data to be processed, the batchId will not be * incremented. * @param durationMs The amount of time taken to perform various operations in milliseconds. - * @param currentWatermark The current event time watermark in milliseconds + * @param eventTime Statistics of event time seen in this batch. It may contain the following keys: + * { + * "max" -> "2016-12-05T20:54:20.827Z" // maximum event time seen in this trigger + * "min" -> "2016-12-05T20:54:20.827Z" // minimum event time seen in this trigger + * "avg" -> "2016-12-05T20:54:20.827Z" // average event time seen in this trigger + * "watermark" -> "2016-12-05T20:54:20.827Z" // watermark used in this trigger + * } + * All timestamps are in ISO8601 format, i.e. UTC timestamps. * @param stateOperators Information about operators in the query that store state. * @param sources detailed statistics on data being read from each of the streaming sources. * @since 2.1.0 @@ -79,8 +86,8 @@ class StreamingQueryProgress private[sql]( val name: String, val timestamp: String, val batchId: Long, - val durationMs: ju.Map[String, java.lang.Long], - val currentWatermark: Long, + val durationMs: ju.Map[String, JLong], + val eventTime: ju.Map[String, String], val stateOperators: Array[StateOperatorProgress], val sources: Array[SourceProgress], val sink: SinkProgress) { @@ -107,6 +114,13 @@ class StreamingQueryProgress private[sql]( if (value.isNaN || value.isInfinity) JNothing else JDouble(value) } + /** Convert map to JValue while handling empty maps. Also, this sorts the keys. 
*/ + def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = { + if (map.isEmpty) return JNothing + val keys = map.asScala.keySet.toSeq.sorted + keys.map { k => k -> valueToJValue(map.get(k)) : JObject }.reduce(_ ~ _) + } + ("id" -> JString(id.toString)) ~ ("runId" -> JString(runId.toString)) ~ ("name" -> JString(name)) ~ @@ -114,11 +128,8 @@ class StreamingQueryProgress private[sql]( ("numInputRows" -> JInt(numInputRows)) ~ ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~ - ("durationMs" -> durationMs - .asScala - .map { case (k, v) => k -> JInt(v.toLong): JObject } - .reduce(_ ~ _)) ~ - ("currentWatermark" -> JInt(currentWatermark)) ~ + ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~ + ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~ ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~ ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~ ("sink" -> sink.jsonValue) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index f75f5b537e41b43d12cbe8212e3599831cc55027..7c6745ac8285a2c01674e2b1fd96b7494e0bee8a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -185,9 +185,12 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { test("QueryProgressEvent serialization") { def testSerialization(event: QueryProgressEvent): Unit = { + import scala.collection.JavaConverters._ val json = JsonProtocol.sparkEventToJson(event) val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryProgressEvent] assert(newEvent.progress.json === event.progress.json) // json as a proxy for equality + assert(newEvent.progress.durationMs.asScala === event.progress.durationMs.asScala) + assert(newEvent.progress.eventTime.asScala === event.progress.eventTime.asScala) } testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress1)) testSerialization(new QueryProgressEvent(StreamingQueryStatusAndProgressSuite.testProgress2)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index 193c943f83be8113325ffca9806eb28b1bfd65f5..c970743a31ad6c7ea7e3e9770f7018b308895442 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -44,7 +44,12 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { | "durationMs" : { | "total" : 0 | }, - | "currentWatermark" : 3, + | "eventTime" : { + | "avg" : "2016-12-05T20:54:20.827Z", + | "max" : "2016-12-05T20:54:20.827Z", + | "min" : "2016-12-05T20:54:20.827Z", + | "watermark" : "2016-12-05T20:54:20.827Z" + | }, | "stateOperators" : [ { | "numRowsTotal" : 0, | "numRowsUpdated" : 1 @@ -76,7 +81,6 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { | "durationMs" : { | "total" : 0 | }, - | "currentWatermark" : 3, | "stateOperators" : [ { | "numRowsTotal" : 0, | "numRowsUpdated" : 1 @@ -134,7 +138,11 @@ object 
StreamingQueryStatusAndProgressSuite { timestamp = "2016-12-05T20:54:20.827Z", batchId = 2L, durationMs = Map("total" -> 0L).mapValues(long2Long).asJava, - currentWatermark = 3L, + eventTime = Map( + "max" -> "2016-12-05T20:54:20.827Z", + "min" -> "2016-12-05T20:54:20.827Z", + "avg" -> "2016-12-05T20:54:20.827Z", + "watermark" -> "2016-12-05T20:54:20.827Z").asJava, stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)), sources = Array( new SourceProgress( @@ -156,7 +164,7 @@ object StreamingQueryStatusAndProgressSuite { timestamp = "2016-12-05T20:54:20.827Z", batchId = 2L, durationMs = Map("total" -> 0L).mapValues(long2Long).asJava, - currentWatermark = 3L, + eventTime = Map.empty[String, String].asJava, // empty maps should be handled correctly stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)), sources = Array( new SourceProgress( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index c66d6b1f8d8e614fbde69cd71f951194171f6128..afd788ce3ddfd3cd9537a9ab98c50b7601e76a5b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.streaming +import scala.collection.JavaConverters._ + import org.apache.commons.lang3.RandomStringUtils import org.scalactic.TolerantNumerics import org.scalatest.concurrent.Eventually._ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala index 12f3c3e5ff3d91c63f8585d1cb1c76e91ca9e17d..f1cc19c6e235d01852f63699c1f83cd1988f0c48 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.streaming +import java.{util => ju} +import java.text.SimpleDateFormat + import org.scalatest.BeforeAndAfter import org.apache.spark.internal.Logging @@ -50,8 +53,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { } - test("watermark metric") { - + test("event time and watermark metrics") { val inputData = MemoryStream[Int] val windowedAggregation = inputData.toDF() @@ -61,21 +63,43 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { .agg(count("*") as 'count) .select($"window".getField("start").cast("long").as[Long], $"count".as[Long]) + def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = AssertOnQuery { q => + body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime) + true + } + testStream(windowedAggregation)( AddData(inputData, 15), CheckAnswer(), - AssertOnQuery { query => - query.lastProgress.currentWatermark === 5000 + assertEventStats { e => + assert(e.get("max") === formatTimestamp(15)) + assert(e.get("min") === formatTimestamp(15)) + assert(e.get("avg") === formatTimestamp(15)) + assert(e.get("watermark") === formatTimestamp(0)) }, - AddData(inputData, 15), + AddData(inputData, 10, 12, 14), CheckAnswer(), - AssertOnQuery { query => - query.lastProgress.currentWatermark === 5000 + assertEventStats { e => + assert(e.get("max") === formatTimestamp(14)) + assert(e.get("min") === formatTimestamp(10)) + assert(e.get("avg") === formatTimestamp(12)) + 
assert(e.get("watermark") === formatTimestamp(5)) }, AddData(inputData, 25), CheckAnswer(), - AssertOnQuery { query => - query.lastProgress.currentWatermark === 15000 + assertEventStats { e => + assert(e.get("max") === formatTimestamp(25)) + assert(e.get("min") === formatTimestamp(25)) + assert(e.get("avg") === formatTimestamp(25)) + assert(e.get("watermark") === formatTimestamp(5)) + }, + AddData(inputData, 25), + CheckAnswer((10, 3)), + assertEventStats { e => + assert(e.get("max") === formatTimestamp(25)) + assert(e.get("min") === formatTimestamp(25)) + assert(e.get("avg") === formatTimestamp(25)) + assert(e.get("watermark") === formatTimestamp(15)) } ) } @@ -206,4 +230,11 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { CheckAnswer((10, 1)) ) } + + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + timestampFormat.setTimeZone(ju.TimeZone.getTimeZone("UTC")) + + private def formatTimestamp(sec: Long): String = { + timestampFormat.format(new ju.Date(sec * 1000)) + } }