From df652ea02a3e42d987419308ef14874300347373 Mon Sep 17 00:00:00 2001
From: Sandy Ryza <sandy@cloudera.com>
Date: Sun, 17 Aug 2014 22:39:06 -0700
Subject: [PATCH] SPARK-2900. aggregate inputBytes per stage

Author: Sandy Ryza <sandy@cloudera.com>

Closes #1826 from sryza/sandy-spark-2900 and squashes the following commits:

43f9091 [Sandy Ryza] SPARK-2900
---
 .../org/apache/spark/ui/jobs/JobProgressListener.scala   | 6 ++++++
 .../apache/spark/ui/jobs/JobProgressListenerSuite.scala  | 9 ++++++++-
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index a3e9566832..74cd637d88 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -200,6 +200,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     stageData.shuffleReadBytes += shuffleReadDelta
     execSummary.shuffleRead += shuffleReadDelta
 
+    val inputBytesDelta =
+      (taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
+      - oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L))
+    stageData.inputBytes += inputBytesDelta
+    execSummary.inputBytes += inputBytesDelta
+
     val diskSpillDelta =
       taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
     stageData.diskBytesSpilled += diskSpillDelta
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index f5ba31c309..147ec0bc52 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -22,7 +22,7 @@ import org.scalatest.Matchers
 
 import org.apache.spark._
 import org.apache.spark.{LocalSparkContext, SparkConf, Success}
-import org.apache.spark.executor.{ShuffleWriteMetrics, ShuffleReadMetrics, TaskMetrics}
+import org.apache.spark.executor._
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
 
@@ -150,6 +150,9 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       taskMetrics.executorRunTime = base + 4
       taskMetrics.diskBytesSpilled = base + 5
       taskMetrics.memoryBytesSpilled = base + 6
+      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+      taskMetrics.inputMetrics = Some(inputMetrics)
+      inputMetrics.bytesRead = base + 7
       taskMetrics
     }
 
@@ -182,6 +185,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     assert(stage1Data.diskBytesSpilled == 205)
     assert(stage0Data.memoryBytesSpilled == 112)
     assert(stage1Data.memoryBytesSpilled == 206)
+    assert(stage0Data.inputBytes == 114)
+    assert(stage1Data.inputBytes == 207)
     assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
       .totalBlocksFetched == 2)
     assert(stage0Data.taskData.get(1235L).get.taskMetrics.get.shuffleReadMetrics.get
@@ -208,6 +213,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     assert(stage1Data.diskBytesSpilled == 610)
     assert(stage0Data.memoryBytesSpilled == 412)
     assert(stage1Data.memoryBytesSpilled == 612)
+    assert(stage0Data.inputBytes == 414)
+    assert(stage1Data.inputBytes == 614)
     assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
       .totalBlocksFetched == 302)
     assert(stage1Data.taskData.get(1237L).get.taskMetrics.get.shuffleReadMetrics.get
-- 
GitLab
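
The JobProgressListener hunk above accumulates input bytes as a delta: on each metrics update it subtracts the previously recorded bytesRead (treating a missing InputMetrics as 0) from the newly reported value and adds the difference to both the stage and executor summaries, so repeated updates for the same task do not double-count bytes. Below is a minimal, self-contained Scala sketch of that pattern; the case classes and object name are simplified stand-ins chosen for illustration, not Spark's actual InputMetrics / TaskMetrics / stageData structures.

object InputBytesDeltaSketch {
  // Simplified stand-ins for the Spark metrics and UI-data classes.
  case class InputMetrics(bytesRead: Long)
  case class TaskMetrics(inputMetrics: Option[InputMetrics])
  class StageUIData { var inputBytes: Long = 0L }

  // Add only the increment since the previous snapshot, so a task whose
  // metrics are reported more than once is not double-counted.
  def updateInputBytes(
      stageData: StageUIData,
      taskMetrics: TaskMetrics,
      oldMetrics: Option[TaskMetrics]): Unit = {
    val inputBytesDelta =
      taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L) -
        oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L)
    stageData.inputBytes += inputBytesDelta
  }

  def main(args: Array[String]): Unit = {
    val stage = new StageUIData
    // First report for a task: no previous snapshot, all 100 bytes count.
    updateInputBytes(stage, TaskMetrics(Some(InputMetrics(100L))), None)
    // Second report for the same task: only the 50 additional bytes count.
    updateInputBytes(
      stage,
      TaskMetrics(Some(InputMetrics(150L))),
      Some(TaskMetrics(Some(InputMetrics(100L)))))
    assert(stage.inputBytes == 150L)
  }
}

The test expectations in JobProgressListenerSuite follow the same arithmetic: makeTaskMetrics(base) reports bytesRead = base + 7, so stage 0's two tasks (bases 0 and 100) sum to 7 + 107 = 114 and stage 1's task (base 200) contributes 207, matching the added assertions.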