Skip to content
Snippets Groups Projects
Commit df652ea0 authored by Sandy Ryza's avatar Sandy Ryza Committed by Patrick Wendell
Browse files

SPARK-2900. aggregate inputBytes per stage

Author: Sandy Ryza <sandy@cloudera.com>

Closes #1826 from sryza/sandy-spark-2900 and squashes the following commits:

43f9091 [Sandy Ryza] SPARK-2900
parent 5173f3c4
No related branches found
No related tags found
No related merge requests found
......@@ -200,6 +200,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
stageData.shuffleReadBytes += shuffleReadDelta
execSummary.shuffleRead += shuffleReadDelta
val inputBytesDelta =
(taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
- oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L))
stageData.inputBytes += inputBytesDelta
execSummary.inputBytes += inputBytesDelta
val diskSpillDelta =
taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
stageData.diskBytesSpilled += diskSpillDelta
......
......@@ -22,7 +22,7 @@ import org.scalatest.Matchers
import org.apache.spark._
import org.apache.spark.{LocalSparkContext, SparkConf, Success}
import org.apache.spark.executor.{ShuffleWriteMetrics, ShuffleReadMetrics, TaskMetrics}
import org.apache.spark.executor._
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
......@@ -150,6 +150,9 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskMetrics.executorRunTime = base + 4
taskMetrics.diskBytesSpilled = base + 5
taskMetrics.memoryBytesSpilled = base + 6
val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
taskMetrics.inputMetrics = Some(inputMetrics)
inputMetrics.bytesRead = base + 7
taskMetrics
}
......@@ -182,6 +185,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
assert(stage1Data.diskBytesSpilled == 205)
assert(stage0Data.memoryBytesSpilled == 112)
assert(stage1Data.memoryBytesSpilled == 206)
assert(stage0Data.inputBytes == 114)
assert(stage1Data.inputBytes == 207)
assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 2)
assert(stage0Data.taskData.get(1235L).get.taskMetrics.get.shuffleReadMetrics.get
......@@ -208,6 +213,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
assert(stage1Data.diskBytesSpilled == 610)
assert(stage0Data.memoryBytesSpilled == 412)
assert(stage1Data.memoryBytesSpilled == 612)
assert(stage0Data.inputBytes == 414)
assert(stage1Data.inputBytes == 614)
assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 302)
assert(stage1Data.taskData.get(1237L).get.taskMetrics.get.shuffleReadMetrics.get
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment