diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index a3e9566832d06f6acfbdaa64cfc036cffc78b027..74cd637d881557eadf58f1d10ae8db7773113ff2 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -200,6 +200,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     stageData.shuffleReadBytes += shuffleReadDelta
     execSummary.shuffleRead += shuffleReadDelta
 
+    val inputBytesDelta =
+      (taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
+      - oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L))
+    stageData.inputBytes += inputBytesDelta
+    execSummary.inputBytes += inputBytesDelta
+
     val diskSpillDelta =
       taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
     stageData.diskBytesSpilled += diskSpillDelta
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index f5ba31c309277556ec772770d7b52d65be79639d..147ec0bc52e39c905549500ee4231ddb620560ba 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -22,7 +22,7 @@ import org.scalatest.Matchers
 
 import org.apache.spark._
 import org.apache.spark.{LocalSparkContext, SparkConf, Success}
-import org.apache.spark.executor.{ShuffleWriteMetrics, ShuffleReadMetrics, TaskMetrics}
+import org.apache.spark.executor._
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
 
@@ -150,6 +150,9 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       taskMetrics.executorRunTime = base + 4
       taskMetrics.diskBytesSpilled = base + 5
       taskMetrics.memoryBytesSpilled = base + 6
+      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+      taskMetrics.inputMetrics = Some(inputMetrics)
+      inputMetrics.bytesRead = base + 7
       taskMetrics
     }
 
@@ -182,6 +185,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     assert(stage1Data.diskBytesSpilled == 205)
     assert(stage0Data.memoryBytesSpilled == 112)
     assert(stage1Data.memoryBytesSpilled == 206)
+    assert(stage0Data.inputBytes == 114)
+    assert(stage1Data.inputBytes == 207)
     assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
       .totalBlocksFetched == 2)
     assert(stage0Data.taskData.get(1235L).get.taskMetrics.get.shuffleReadMetrics.get
@@ -208,6 +213,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     assert(stage1Data.diskBytesSpilled == 610)
     assert(stage0Data.memoryBytesSpilled == 412)
     assert(stage1Data.memoryBytesSpilled == 612)
+    assert(stage0Data.inputBytes == 414)
+    assert(stage1Data.inputBytes == 614)
     assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
       .totalBlocksFetched == 302)
     assert(stage1Data.taskData.get(1237L).get.taskMetrics.get.shuffleReadMetrics.get