From 84f1b25f316a42ce4d3b69a3e136d0db41c9aec2 Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Tue, 18 Jul 2017 16:29:45 -0700
Subject: [PATCH] [SPARK-21462][SS] Added batchId to
 StreamingQueryProgress.json

## What changes were proposed in this pull request?

- Added batchId to StreamingQueryProgress.json as that was missing from the generated json.
- Also, removed recently added numPartitions from StatefulOperatorProgress as this value does not change through the query run, and there are other ways to find that.

## How was this patch tested?
Updated unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #18675 from tdas/SPARK-21462.
---
 .../sql/execution/streaming/statefulOperators.scala  |  3 +--
 .../org/apache/spark/sql/streaming/progress.scala    |  9 ++++-----
 .../StreamingQueryStatusAndProgressSuite.scala       | 12 ++++++------
 3 files changed, 11 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 3ca7f4b145..6addab69f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -87,8 +87,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
     new StateOperatorProgress(
       numRowsTotal = longMetric("numTotalStateRows").value,
       numRowsUpdated = longMetric("numUpdatedStateRows").value,
-      memoryUsedBytes = longMetric("stateMemory").value,
-      numPartitions = this.sqlContext.conf.numShufflePartitions)
+      memoryUsedBytes = longMetric("stateMemory").value)
   }
 
   /** Records the duration of running `body` for the next query progress update. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 81a2387b80..3000c4233c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -38,8 +38,7 @@ import org.apache.spark.annotation.InterfaceStability
 class StateOperatorProgress private[sql](
     val numRowsTotal: Long,
     val numRowsUpdated: Long,
-    val memoryUsedBytes: Long,
-    val numPartitions: Long
+    val memoryUsedBytes: Long
   ) extends Serializable {
 
   /** The compact JSON representation of this progress. */
@@ -49,13 +48,12 @@ class StateOperatorProgress private[sql](
   def prettyJson: String = pretty(render(jsonValue))
 
   private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress =
-    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, numPartitions)
+    new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes)
 
   private[sql] def jsonValue: JValue = {
     ("numRowsTotal" -> JInt(numRowsTotal)) ~
     ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
-    ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
-    ("numPartitions" -> JInt(numPartitions))
+    ("memoryUsedBytes" -> JInt(memoryUsedBytes))
   }
 }
 
@@ -131,6 +129,7 @@ class StreamingQueryProgress private[sql](
     ("runId" -> JString(runId.toString)) ~
     ("name" -> JString(name)) ~
     ("timestamp" -> JString(timestamp)) ~
+    ("batchId" -> JInt(batchId)) ~
     ("numInputRows" -> JInt(numInputRows)) ~
     ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
     ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index d3cafac4f1..79bb827e0d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -43,6 +43,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
         |  "runId" : "${testProgress1.runId.toString}",
         |  "name" : "myName",
         |  "timestamp" : "2016-12-05T20:54:20.827Z",
+        |  "batchId" : 2,
         |  "numInputRows" : 678,
         |  "inputRowsPerSecond" : 10.0,
         |  "durationMs" : {
@@ -57,8 +58,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
         |  "stateOperators" : [ {
         |    "numRowsTotal" : 0,
         |    "numRowsUpdated" : 1,
-        |    "memoryUsedBytes" : 2,
-        |    "numPartitions" : 4
+        |    "memoryUsedBytes" : 2
         |  } ],
         |  "sources" : [ {
         |    "description" : "source",
@@ -83,6 +83,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
          |  "runId" : "${testProgress2.runId.toString}",
          |  "name" : null,
          |  "timestamp" : "2016-12-05T20:54:20.827Z",
+         |  "batchId" : 2,
          |  "numInputRows" : 678,
          |  "durationMs" : {
          |    "total" : 0
@@ -90,8 +91,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
          |  "stateOperators" : [ {
          |    "numRowsTotal" : 0,
          |    "numRowsUpdated" : 1,
-         |    "memoryUsedBytes" : 2,
-         |    "numPartitions" : 4
+         |    "memoryUsedBytes" : 2
          |  } ],
          |  "sources" : [ {
          |    "description" : "source",
@@ -230,7 +230,7 @@ object StreamingQueryStatusAndProgressSuite {
       "avg" -> "2016-12-05T20:54:20.827Z",
       "watermark" -> "2016-12-05T20:54:20.827Z").asJava),
     stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numPartitions = 4)),
+      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)),
     sources = Array(
       new SourceProgress(
         description = "source",
@@ -254,7 +254,7 @@ object StreamingQueryStatusAndProgressSuite {
     // empty maps should be handled correctly
     eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
     stateOperators = Array(new StateOperatorProgress(
-      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numPartitions = 4)),
+      numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)),
     sources = Array(
       new SourceProgress(
         description = "source",
-- 
GitLab