From 539bb3cf9573be5cd86e7e6502523ce89c0de170 Mon Sep 17 00:00:00 2001 From: Tathagata Das <tathagata.das1565@gmail.com> Date: Tue, 6 Dec 2016 17:04:26 -0800 Subject: [PATCH] [SPARK-18734][SS] Represent timestamp in StreamingQueryProgress as formatted string instead of millis ## What changes were proposed in this pull request? Easier to read while debugging as a formatted string (in ISO8601 format) than in millis ## How was this patch tested? Updated unit tests Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #16166 from tdas/SPARK-18734. --- .../spark/sql/execution/streaming/ProgressReporter.scala | 8 ++++++-- .../scala/org/apache/spark/sql/streaming/progress.scala | 6 +++--- .../streaming/StreamingQueryStatusAndProgressSuite.scala | 8 ++++---- .../apache/spark/sql/streaming/StreamingQuerySuite.scala | 2 +- 4 files changed, 14 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index d95f55267e..12d0c1e9b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -17,7 +17,8 @@ package org.apache.spark.sql.execution.streaming -import java.util.UUID +import java.text.SimpleDateFormat +import java.util.{Date, TimeZone, UUID} import scala.collection.mutable import scala.collection.JavaConverters._ @@ -78,6 +79,9 @@ trait ProgressReporter extends Logging { // The timestamp we report an event that has no input data private var lastNoDataProgressEventTime = Long.MinValue + private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC")) + @volatile protected var currentStatus: StreamingQueryStatus = { new StreamingQueryStatus( @@ -156,7 +160,7 @@ trait ProgressReporter extends Logging { id = id, runId = runId, name = name, - timestamp = currentTriggerStartTimestamp, + timestamp = timestampFormat.format(new Date(currentTriggerStartTimestamp)), batchId = currentBatchId, durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava, currentWatermark = offsetSeqMetadata.batchWatermarkMs, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index f768080f5d..d1568758b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -29,6 +29,7 @@ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.catalyst.util.DateTimeUtils /** * :: Experimental :: @@ -76,7 +77,7 @@ class StreamingQueryProgress private[sql]( val id: UUID, val runId: UUID, val name: String, - val timestamp: Long, + val timestamp: String, val batchId: Long, val durationMs: ju.Map[String, java.lang.Long], val currentWatermark: Long, @@ -109,7 +110,7 @@ class StreamingQueryProgress private[sql]( ("id" -> JString(id.toString)) ~ ("runId" -> JString(runId.toString)) ~ ("name" -> JString(name)) ~ - ("timestamp" -> JInt(timestamp)) ~ + ("timestamp" -> JString(timestamp)) ~ ("numInputRows" -> JInt(numInputRows)) ~ ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~ @@ -121,7 +122,6 @@ class StreamingQueryProgress private[sql]( ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~ ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~ ("sink" -> sink.jsonValue) - } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index 96f19db1a9..193c943f83 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -38,7 +38,7 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { | "id" : "${testProgress1.id.toString}", | "runId" : "${testProgress1.runId.toString}", | "name" : "myName", - | "timestamp" : 1, + | "timestamp" : "2016-12-05T20:54:20.827Z", | "numInputRows" : 678, | "inputRowsPerSecond" : 10.0, | "durationMs" : { @@ -71,7 +71,7 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite { | "id" : "${testProgress2.id.toString}", | "runId" : "${testProgress2.runId.toString}", | "name" : null, - | "timestamp" : 1, + | "timestamp" : "2016-12-05T20:54:20.827Z", | "numInputRows" : 678, | "durationMs" : { | "total" : 0 @@ -131,7 +131,7 @@ object StreamingQueryStatusAndProgressSuite { id = UUID.randomUUID, runId = UUID.randomUUID, name = "myName", - timestamp = 1L, + timestamp = "2016-12-05T20:54:20.827Z", batchId = 2L, durationMs = Map("total" -> 0L).mapValues(long2Long).asJava, currentWatermark = 3L, @@ -153,7 +153,7 @@ object StreamingQueryStatusAndProgressSuite { id = UUID.randomUUID, runId = UUID.randomUUID, name = null, // should not be present in the json - timestamp = 1L, + timestamp = "2016-12-05T20:54:20.827Z", batchId = 2L, durationMs = Map("total" -> 0L).mapValues(long2Long).asJava, currentWatermark = 3L, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 893cb762c6..55dd1a5d51 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -243,7 +243,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { assert(progress.id === query.id) assert(progress.name === query.name) assert(progress.batchId === 0) - assert(progress.timestamp === 100) + assert(progress.timestamp === "1970-01-01T00:00:00.100Z") // 100 ms in UTC assert(progress.numInputRows === 2) assert(progress.processedRowsPerSecond === 2.0) -- GitLab