diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 3f667a4a0f9c53d5bafd547289eec99f7db826f6..8f867686a044330cb23f2f677911a76a9415ac21 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import scala.collection.mutable.{ArrayBuffer, HashSet}
 
+import org.apache.spark.executor.InputMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
 
@@ -41,9 +42,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     val key = RDDBlockId(rdd.id, partition.index)
     logDebug(s"Looking for partition $key")
     blockManager.get(key) match {
-      case Some(values) =>
+      case Some(blockResult) =>
         // Partition is already materialized, so just return its values
-        new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+        context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
+        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
 
       case None =>
         // Acquire a lock for loading this partition
@@ -110,7 +112,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
           loading.add(id)
         }
-        values.map(_.asInstanceOf[Iterator[T]])
+        values.map(_.data.asInstanceOf[Iterator[T]])
       }
     }
   }
@@ -132,7 +134,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
        * exceptions that can be avoided. */
       updatedBlocks ++= blockManager.put(key, values, storageLevel, tellMaster = true)
       blockManager.get(key) match {
-        case Some(v) => v.asInstanceOf[Iterator[T]]
+        case Some(v) => v.data.asInstanceOf[Iterator[T]]
         case None =>
           logInfo(s"Failure to store $key")
           throw new BlockException(key, s"Block manager failed to return cached value for $key!")
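
Illustrative sketch (not part of the patch): what a task observes after getOrCompute()
returns a cached partition, assuming `context` is the running task's TaskContext. The
read method reflects where the BlockManager found the block (Memory, Disk, or Network
for a block fetched from a remote block manager).

    // Sketch only; runs inside Spark internals where TaskMetrics is reachable.
    val read = context.taskMetrics.inputMetrics.get
    println(s"read ${read.bytesRead} bytes via ${read.readMethod}")
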
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 350fd74173f6520f7114763f4ea7dd382e4a152e..ac73288442a74e300369872fec02315234c2be27 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -66,6 +66,12 @@ class TaskMetrics extends Serializable {
    */
   var diskBytesSpilled: Long = _
 
+  /**
+   * If this task reads from a HadoopRDD or from persisted data, metrics on how much data was read
+   * are stored here.
+   */
+  var inputMetrics: Option[InputMetrics] = None
+
   /**
    * If this task reads from shuffle output, metrics on getting shuffle data will be collected here
    */
@@ -87,6 +93,29 @@ private[spark] object TaskMetrics {
   def empty: TaskMetrics = new TaskMetrics
 }
 
+/**
+ * :: DeveloperApi ::
+ * Method by which input data was read. Network means that the data was read over the network
+ * from a remote block manager (which may have stored the data on disk or in memory).
+ */
+@DeveloperApi
+object DataReadMethod extends Enumeration with Serializable {
+  type DataReadMethod = Value
+  val Memory, Disk, Hadoop, Network = Value
+}
+
+/**
+ * :: DeveloperApi ::
+ * Metrics about reading input data.
+ */
+@DeveloperApi
+case class InputMetrics(readMethod: DataReadMethod.Value) {
+  /**
+   * Total bytes read.
+   */
+  var bytesRead: Long = 0L
+}
+
 
 /**
  * :: DeveloperApi ::
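
Illustrative sketch (not part of the patch): constructing the metrics classes added
above and attaching them to a task's TaskMetrics; the byte count is a made-up example.

    import org.apache.spark.executor.{DataReadMethod, InputMetrics, TaskMetrics}

    val metrics = new TaskMetrics
    val input = new InputMetrics(DataReadMethod.Hadoop)
    input.bytesRead = 64L * 1024 * 1024   // e.g. one 64 MB HDFS block
    metrics.inputMetrics = Some(input)
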
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index c64da8804d1660fe3fbe3a46353f63d98112dc90..2673ec22509e9616f6596ed3b2d04fce2fa48971 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -46,7 +46,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
     val blockManager = SparkEnv.get.blockManager
     val blockId = split.asInstanceOf[BlockRDDPartition].blockId
     blockManager.get(blockId) match {
-      case Some(block) => block.asInstanceOf[Iterator[T]]
+      case Some(block) => block.data.asInstanceOf[Iterator[T]]
       case None =>
         throw new Exception("Could not compute split, block " + blockId + " not found")
     }
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2aa111d600e9b98913400904325f097137f7d8e8..98dcbf4e2dbfa0e15f6ac360c88123c4ada8bbab 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -38,6 +38,7 @@ import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
 import org.apache.spark.util.NextIterator
 
 /**
@@ -196,6 +197,20 @@ class HadoopRDD[K, V](
       context.addOnCompleteCallback{ () => closeIfNeeded() }
       val key: K = reader.createKey()
       val value: V = reader.createValue()
+
+      // Set the task input metrics.
+      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+      try {
+        /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
+         * always at record boundaries, so tasks may need to read into other splits to complete
+         * a record. */
+        inputMetrics.bytesRead = split.inputSplit.value.getLength()
+      } catch {
+        case e: java.io.IOException =>
+          logWarning("Unable to get input size to set InputMetrics for task", e)
+      }
+      context.taskMetrics.inputMetrics = Some(inputMetrics)
+
       override def getNext() = {
         try {
           finished = !reader.next(key, value)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index ac1ccc06f238ab5b5dfd5f7db6a94e4eb040ffad..f2b3a64bf13455800ba013c0c5517e893d82f0d2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -31,6 +31,7 @@ import org.apache.spark.Logging
 import org.apache.spark.Partition
 import org.apache.spark.SerializableWritable
 import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
 
 private[spark] class NewHadoopPartition(
     rddId: Int,
@@ -112,6 +113,18 @@ class NewHadoopRDD[K, V](
         split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
+      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+      try {
+        /* bytesRead may not exactly equal the bytes read by a task: split boundaries aren't
+         * always at record boundaries, so tasks may need to read into other splits to complete
+         * a record. */
+        inputMetrics.bytesRead = split.serializableHadoopSplit.value.getLength()
+      } catch {
+        case e: Exception =>
+          logWarning("Unable to get input split size in order to set task input bytes", e)
+      }
+      context.taskMetrics.inputMetrics = Some(inputMetrics)
+
       // Register an on-task-completion callback to close the input stream.
       context.addOnCompleteCallback(() => close())
       var havePair = false
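
Illustrative sketch (not part of the patch): one way an application could observe the
per-task input metrics that HadoopRDD and NewHadoopRDD now record, via a SparkListener
registered on an existing SparkContext (`sc` is assumed).

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    class InputBytesListener extends SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        // taskMetrics may be null for some failed tasks, hence the Option wrapper.
        Option(taskEnd.taskMetrics).flatMap(_.inputMetrics).foreach { in =>
          println(s"task ${taskEnd.taskInfo.taskId} read ${in.bytesRead} bytes (${in.readMethod})")
        }
      }
    }

    // sc.addSparkListener(new InputBytesListener)
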
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index a1e21cad48b9b9b2028755248c0741e44b1a7c6d..47dd112f68325e62feef7daa346b88ef6fece839 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -26,7 +26,9 @@ import scala.collection.mutable.HashMap
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
 
 /**
  * :: DeveloperApi ::
@@ -160,7 +162,13 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
                " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
                " EXECUTOR_ID=" + taskInfo.executorId +  " HOST=" + taskMetrics.hostname
     val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
-    val readMetrics = taskMetrics.shuffleReadMetrics match {
+    val inputMetrics = taskMetrics.inputMetrics match {
+      case Some(metrics) =>
+        " READ_METHOD=" + metrics.readMethod.toString +
+        " INPUT_BYTES=" + metrics.bytesRead
+      case None => ""
+    }
+    val shuffleReadMetrics = taskMetrics.shuffleReadMetrics match {
       case Some(metrics) =>
         " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
         " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
@@ -174,7 +182,8 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
       case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
       case None => ""
     }
-    stageLogInfo(stageId, status + info + executorRunTime + readMetrics + writeMetrics)
+    stageLogInfo(stageId, status + info + executorRunTime + inputMetrics + shuffleReadMetrics +
+      writeMetrics)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d2f7baf928b6268db6008d9f4c905691f10bcf9a..0db0a5bc7341be2269dfbd61037d396e2bf7f94b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,6 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props}
 import sun.nio.ch.DirectBuffer
 
 import org.apache.spark._
+import org.apache.spark.executor.{DataReadMethod, InputMetrics}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
 import org.apache.spark.serializer.Serializer
@@ -39,6 +40,15 @@ private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValu
 private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
 private[spark] case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends BlockValues
 
+/** A fetched block together with the input metrics describing how it was read. */
+private[spark] class BlockResult(
+    val data: Iterator[Any],
+    readMethod: DataReadMethod.Value,
+    bytes: Long) {
+  val inputMetrics = new InputMetrics(readMethod)
+  inputMetrics.bytesRead = bytes
+}
+
 private[spark] class BlockManager(
     executorId: String,
     actorSystem: ActorSystem,
@@ -334,9 +344,9 @@ private[spark] class BlockManager(
   /**
    * Get block from local block manager.
    */
-  def getLocal(blockId: BlockId): Option[Iterator[Any]] = {
+  def getLocal(blockId: BlockId): Option[BlockResult] = {
     logDebug(s"Getting local block $blockId")
-    doGetLocal(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
+    doGetLocal(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
   }
 
   /**
@@ -355,11 +365,11 @@ private[spark] class BlockManager(
             blockId, s"Block $blockId not found on disk, though it should be")
       }
     } else {
-      doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+      doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
     }
   }
 
-  private def doGetLocal(blockId: BlockId, asValues: Boolean): Option[Any] = {
+  private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
     val info = blockInfo.get(blockId).orNull
     if (info != null) {
       info.synchronized {
@@ -386,14 +396,14 @@ private[spark] class BlockManager(
         // Look for the block in memory
         if (level.useMemory) {
           logDebug(s"Getting block $blockId from memory")
-          val result = if (asValues) {
-            memoryStore.getValues(blockId)
+          val result = if (asBlockResult) {
+            memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
           } else {
             memoryStore.getBytes(blockId)
           }
           result match {
             case Some(values) =>
-              return Some(values)
+              return result
             case None =>
               logDebug(s"Block $blockId not found in memory")
           }
@@ -405,10 +415,11 @@ private[spark] class BlockManager(
           if (tachyonStore.contains(blockId)) {
             tachyonStore.getBytes(blockId) match {
               case Some(bytes) =>
-                if (!asValues) {
+                if (!asBlockResult) {
                   return Some(bytes)
                 } else {
-                  return Some(dataDeserialize(blockId, bytes))
+                  return Some(new BlockResult(
+                    dataDeserialize(blockId, bytes), DataReadMethod.Memory, info.size))
                 }
               case None =>
                 logDebug(s"Block $blockId not found in tachyon")
@@ -429,14 +440,15 @@ private[spark] class BlockManager(
 
           if (!level.useMemory) {
             // If the block shouldn't be stored in memory, we can just return it
-            if (asValues) {
-              return Some(dataDeserialize(blockId, bytes))
+            if (asBlockResult) {
+              return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
+                info.size))
             } else {
               return Some(bytes)
             }
           } else {
             // Otherwise, we also have to store something in the memory store
-            if (!level.deserialized || !asValues) {
+            if (!level.deserialized || !asBlockResult) {
               /* We'll store the bytes in memory if the block's storage level includes
                * "memory serialized", or if it should be cached as objects in memory
                * but we only requested its serialized bytes. */
@@ -445,7 +457,7 @@ private[spark] class BlockManager(
               memoryStore.putBytes(blockId, copyForMemory, level)
               bytes.rewind()
             }
-            if (!asValues) {
+            if (!asBlockResult) {
               return Some(bytes)
             } else {
               val values = dataDeserialize(blockId, bytes)
@@ -457,12 +469,12 @@ private[spark] class BlockManager(
                 memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
                   match {
                     case Left(values2) =>
-                      return Some(values2)
+                      return Some(new BlockResult(values2, DataReadMethod.Disk, info.size))
                     case _ =>
                       throw new SparkException("Memory store did not return an iterator")
                   }
               } else {
-                return Some(values)
+                return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
               }
             }
           }
@@ -477,9 +489,9 @@ private[spark] class BlockManager(
   /**
    * Get block from remote block managers.
    */
-  def getRemote(blockId: BlockId): Option[Iterator[Any]] = {
+  def getRemote(blockId: BlockId): Option[BlockResult] = {
     logDebug(s"Getting remote block $blockId")
-    doGetRemote(blockId, asValues = true).asInstanceOf[Option[Iterator[Any]]]
+    doGetRemote(blockId, asBlockResult = true).asInstanceOf[Option[BlockResult]]
   }
 
   /**
@@ -487,10 +499,10 @@ private[spark] class BlockManager(
    */
   def getRemoteBytes(blockId: BlockId): Option[ByteBuffer] = {
     logDebug(s"Getting remote block $blockId as bytes")
-    doGetRemote(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
+    doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
   }
 
-  private def doGetRemote(blockId: BlockId, asValues: Boolean): Option[Any] = {
+  private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
     require(blockId != null, "BlockId is null")
     val locations = Random.shuffle(master.getLocations(blockId))
     for (loc <- locations) {
@@ -498,8 +510,11 @@ private[spark] class BlockManager(
       val data = BlockManagerWorker.syncGetBlock(
         GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
       if (data != null) {
-        if (asValues) {
-          return Some(dataDeserialize(blockId, data))
+        if (asBlockResult) {
+          return Some(new BlockResult(
+            dataDeserialize(blockId, data),
+            DataReadMethod.Network,
+            data.limit()))
         } else {
           return Some(data)
         }
@@ -513,7 +528,7 @@ private[spark] class BlockManager(
   /**
    * Get a block from the block manager (either local or remote).
    */
-  def get(blockId: BlockId): Option[Iterator[Any]] = {
+  def get(blockId: BlockId): Option[BlockResult] = {
     val local = getLocal(blockId)
     if (local.isDefined) {
       logInfo(s"Found block $blockId locally")
@@ -792,7 +807,7 @@ private[spark] class BlockManager(
    * Read a block consisting of a single object.
    */
   def getSingle(blockId: BlockId): Option[Any] = {
-    get(blockId).map(_.next())
+    get(blockId).map(_.data.next())
   }
 
   /**
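
Illustrative sketch (not part of the patch): the revised get() contract. Callers now
receive a BlockResult bundling the value iterator with its InputMetrics instead of a
bare Iterator[Any]; `blockManager`, `blockId`, and `context` are assumed to be in scope,
as in the CacheManager change above.

    val values: Iterator[Any] = blockManager.get(blockId) match {
      case Some(blockResult) =>
        // Record how much was read and from where before consuming the values.
        context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
        blockResult.data
      case None =>
        throw new BlockException(blockId, s"Block $blockId not found")
    }
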
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index a107c5182b3be98ef3e816d2e5adf47cb3ee132b..328be158db680fc9fe2ccfca426724ee66f75d0c 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -78,7 +78,7 @@ private[spark] object ThreadingTest {
         val startTime = System.currentTimeMillis()
         manager.get(blockId) match {
           case Some(retrievedBlock) =>
-            assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList,
+            assert(retrievedBlock.data.toList.asInstanceOf[List[Int]] == block.toList,
               "Block " + blockId + " did not match")
             println("Got block " + blockId + " in " +
               (System.currentTimeMillis - startTime) + " ms")
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 6cfc46c7e78fe0d728c269845cc5cec7fe742abf..9625337ae21a52f0643b01e4bb7536215b84f594 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -72,6 +72,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
     "Complete Tasks",
     "Total Tasks",
     "Task Time",
+    "Input Bytes",
     "Shuffle Read",
     "Shuffle Write")
 
@@ -97,6 +98,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
       <td>{values("Complete Tasks")}</td>
       <td>{values("Total Tasks")}</td>
       <td sorttable_customkey={values("Task Time")}>{Utils.msDurationToString(values("Task Time").toLong)}</td>
+      <td sorttable_customkey={values("Input Bytes")}>{Utils.bytesToString(values("Input Bytes").toLong)}</td>
       <td sorttable_customkey={values("Shuffle Read")}>{Utils.bytesToString(values("Shuffle Read").toLong)}</td>
       <td sorttable_customkey={values("Shuffle Write")} >{Utils.bytesToString(values("Shuffle Write").toLong)}</td>
     </tr>
@@ -119,6 +121,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
     val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
     val totalTasks = activeTasks + failedTasks + completedTasks
     val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
+    val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0)
     val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
     val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
 
@@ -136,6 +139,7 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
       completedTasks,
       totalTasks,
       totalDuration,
+      totalInputBytes,
       totalShuffleRead,
       totalShuffleWrite,
       maxMem
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 91d37b835b19dd5d5ccbe0be399a654bf9567bf5..58eeb86bf9a3a5538685a574969fd0506d7a59dc 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -46,6 +46,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener)
   val executorToTasksComplete = HashMap[String, Int]()
   val executorToTasksFailed = HashMap[String, Int]()
   val executorToDuration = HashMap[String, Long]()
+  val executorToInputBytes = HashMap[String, Long]()
   val executorToShuffleRead = HashMap[String, Long]()
   val executorToShuffleWrite = HashMap[String, Long]()
 
@@ -72,6 +73,10 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener)
       // Update shuffle read/write
       val metrics = taskEnd.taskMetrics
       if (metrics != null) {
+        metrics.inputMetrics.foreach { inputMetrics =>
+          executorToInputBytes(eid) =
+            executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead
+        }
         metrics.shuffleReadMetrics.foreach { shuffleRead =>
           executorToShuffleRead(eid) =
             executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
index 2aaf6329b792de6a11ee7f756b32c67e8ab8c75e..c4a8996c0b9a9e803bc7734c24a12bcd4a7df665 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
@@ -28,6 +28,7 @@ class ExecutorSummary {
   var taskTime : Long = 0
   var failedTasks : Int = 0
   var succeededTasks : Int = 0
+  var inputBytes: Long = 0
   var shuffleRead : Long = 0
   var shuffleWrite : Long = 0
   var memoryBytesSpilled : Long = 0
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index add0e9878a546331cd62a3b3c3bfda861ad89c88..2a34a9af925d6f9f948e2c31a9f21cbc2f6e49e1 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -43,6 +43,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
         <th>Total Tasks</th>
         <th>Failed Tasks</th>
         <th>Succeeded Tasks</th>
+        <th>Input Bytes</th>
         <th>Shuffle Read</th>
         <th>Shuffle Write</th>
         <th>Shuffle Spill (Memory)</th>
@@ -75,6 +76,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
             <td>{v.failedTasks + v.succeededTasks}</td>
             <td>{v.failedTasks}</td>
             <td>{v.succeededTasks}</td>
+            <td sorttable_customkey={v.inputBytes.toString}>{Utils.bytesToString(v.inputBytes)}</td>
             <td sorttable_customekey={v.shuffleRead.toString}>{Utils.bytesToString(v.shuffleRead)}</td>
             <td sorttable_customekey={v.shuffleWrite.toString}>{Utils.bytesToString(v.shuffleWrite)}</td>
             <td sorttable_customekey={v.memoryBytesSpilled.toString} >{Utils.bytesToString(v.memoryBytesSpilled)}</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 381a5443df8b5773d595d0b03c265d9bbf56ccd3..2286a7f952f28b2fbb8573c8b9bffff122e3cb71 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -46,13 +46,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   val completedStages = ListBuffer[StageInfo]()
   val failedStages = ListBuffer[StageInfo]()
 
-  // Total metrics reflect metrics only for completed tasks
-  var totalTime = 0L
-  var totalShuffleRead = 0L
-  var totalShuffleWrite = 0L
-
   // TODO: Should probably consolidate all following into a single hash map.
   val stageIdToTime = HashMap[Int, Long]()
+  val stageIdToInputBytes = HashMap[Int, Long]()
   val stageIdToShuffleRead = HashMap[Int, Long]()
   val stageIdToShuffleWrite = HashMap[Int, Long]()
   val stageIdToMemoryBytesSpilled = HashMap[Int, Long]()
@@ -93,6 +89,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
       val toRemove = math.max(retainedStages / 10, 1)
       stages.take(toRemove).foreach { s =>
         stageIdToTime.remove(s.stageId)
+        stageIdToInputBytes.remove(s.stageId)
         stageIdToShuffleRead.remove(s.stageId)
         stageIdToShuffleWrite.remove(s.stageId)
         stageIdToMemoryBytesSpilled.remove(s.stageId)
@@ -171,6 +168,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
 
           val metrics = taskEnd.taskMetrics
           if (metrics != null) {
+            metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
             metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
             metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
             y.memoryBytesSpilled += metrics.memoryBytesSpilled
@@ -200,18 +198,19 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
       stageIdToTime.getOrElseUpdate(sid, 0L)
       val time = metrics.map(_.executorRunTime).getOrElse(0L)
       stageIdToTime(sid) += time
-      totalTime += time
+
+      stageIdToInputBytes.getOrElseUpdate(sid, 0L)
+      val inputBytes = metrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L)
+      stageIdToInputBytes(sid) += inputBytes
 
       stageIdToShuffleRead.getOrElseUpdate(sid, 0L)
       val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L)
       stageIdToShuffleRead(sid) += shuffleRead
-      totalShuffleRead += shuffleRead
 
       stageIdToShuffleWrite.getOrElseUpdate(sid, 0L)
       val shuffleWrite =
         metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L)
       stageIdToShuffleWrite(sid) += shuffleWrite
-      totalShuffleWrite += shuffleWrite
 
       stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
       val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8e3d5d1cd4c6be964f34e82fdc6f7799be39fd7a..afb8ed754ff8b13dd34c526e96f5c5dd67ab8435 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -48,6 +48,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime)
 
       val numCompleted = tasks.count(_.taskInfo.finished)
+      val inputBytes = listener.stageIdToInputBytes.getOrElse(stageId, 0L)
+      val hasInput = inputBytes > 0
       val shuffleReadBytes = listener.stageIdToShuffleRead.getOrElse(stageId, 0L)
       val hasShuffleRead = shuffleReadBytes > 0
       val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
@@ -69,6 +71,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
               <strong>Total task time across all tasks: </strong>
               {UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
             </li>
+            {if (hasInput)
+              <li>
+                <strong>Input: </strong>
+                {Utils.bytesToString(inputBytes)}
+              </li>
+            }
             {if (hasShuffleRead)
               <li>
                 <strong>Shuffle read: </strong>
@@ -98,13 +106,14 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         Seq(
           "Index", "ID", "Attempt", "Status", "Locality Level", "Executor",
           "Launch Time", "Duration", "GC Time") ++
+        {if (hasInput) Seq("Input") else Nil} ++
         {if (hasShuffleRead) Seq("Shuffle Read")  else Nil} ++
         {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
         {if (hasBytesSpilled) Seq("Shuffle Spill (Memory)", "Shuffle Spill (Disk)") else Nil} ++
         Seq("Errors")
 
       val taskTable = UIUtils.listingTable(
-        taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
+        taskHeaders, taskRow(hasInput, hasShuffleRead, hasShuffleWrite, hasBytesSpilled), tasks)
 
       // Excludes tasks which failed and have incomplete metrics
       val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined)
@@ -159,6 +168,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           def getQuantileCols(data: Seq[Double]) =
             Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong))
 
+          val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
+            metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
+          }
+          val inputQuantiles = "Input" +: getQuantileCols(inputSizes)
+
           val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
             metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
           }
@@ -186,6 +200,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
             serviceQuantiles,
             gettingResultQuantiles,
             schedulerDelayQuantiles,
+            if (hasInput) inputQuantiles else Nil,
             if (hasShuffleRead) shuffleReadQuantiles else Nil,
             if (hasShuffleWrite) shuffleWriteQuantiles else Nil,
             if (hasBytesSpilled) memoryBytesSpilledQuantiles else Nil,
@@ -209,8 +224,11 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
     }
   }
 
-  def taskRow(shuffleRead: Boolean, shuffleWrite: Boolean, bytesSpilled: Boolean)
-      (taskData: TaskUIData): Seq[Node] = {
+  def taskRow(
+      hasInput: Boolean,
+      hasShuffleRead: Boolean,
+      hasShuffleWrite: Boolean,
+      hasBytesSpilled: Boolean)(taskData: TaskUIData): Seq[Node] = {
     taskData match { case TaskUIData(info, metrics, errorMessage) =>
       val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
         else metrics.map(_.executorRunTime).getOrElse(1L)
@@ -219,6 +237,12 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
       val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
       val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
 
+      val maybeInput = metrics.flatMap(_.inputMetrics)
+      val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("")
+      val inputReadable = maybeInput
+        .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})")
+        .getOrElse("")
+
       val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead)
       val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
       val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("")
@@ -265,12 +289,17 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""}
         </td>
         -->
-        {if (shuffleRead) {
+        {if (hasInput) {
+          <td sorttable_customkey={inputSortable}>
+            {inputReadable}
+          </td>
+        }}
+        {if (hasShuffleRead) {
            <td sorttable_customkey={shuffleReadSortable}>
              {shuffleReadReadable}
            </td>
         }}
-        {if (shuffleWrite) {
+        {if (hasShuffleWrite) {
            <td sorttable_customkey={writeTimeSortable}>
              {writeTimeReadable}
            </td>
@@ -278,7 +307,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
              {shuffleWriteReadable}
            </td>
         }}
-        {if (bytesSpilled) {
+        {if (hasBytesSpilled) {
           <td sorttable_customkey={memoryBytesSpilledSortable}>
             {memoryBytesSpilledReadable}
           </td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index b2b6cc6a6ec2d1c40e0022784199e1a3ca9faa68..a9ac6d5bee9c992e42f3131f8703b76c092ed17e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -43,6 +43,7 @@ private[ui] class StageTableBase(
     <th>Submitted</th>
     <th>Duration</th>
     <th>Tasks: Succeeded/Total</th>
+    <th>Input</th>
     <th>Shuffle Read</th>
     <th>Shuffle Write</th>
   }
@@ -123,6 +124,11 @@ private[ui] class StageTableBase(
       case _ => ""
     }
     val totalTasks = s.numTasks
+    val inputSortable = listener.stageIdToInputBytes.getOrElse(s.stageId, 0L)
+    val inputRead = inputSortable match {
+      case 0 => ""
+      case b => Utils.bytesToString(b)
+    }
     val shuffleReadSortable = listener.stageIdToShuffleRead.getOrElse(s.stageId, 0L)
     val shuffleRead = shuffleReadSortable match {
       case 0 => ""
@@ -150,6 +156,7 @@ private[ui] class StageTableBase(
     <td class="progress-cell">
       {makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
     </td>
+    <td sorttable_customkey={inputSortable.toString}>{inputRead}</td>
     <td sorttable_customekey={shuffleReadSortable.toString}>{shuffleRead}</td>
     <td sorttable_customekey={shuffleWriteSortable.toString}>{shuffleWrite}</td>
   }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 6245b4b8023c2a6adb28b370a14399f101364877..26c9c9d6034ec977eb583fba3062b9047cbec8c0 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -26,7 +26,8 @@ import org.json4s.DefaultFormats
 import org.json4s.JsonDSL._
 import org.json4s.JsonAST._
 
-import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleReadMetrics,
+  ShuffleWriteMetrics, TaskMetrics}
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
 import org.apache.spark._
@@ -213,6 +214,8 @@ private[spark] object JsonProtocol {
       taskMetrics.shuffleReadMetrics.map(shuffleReadMetricsToJson).getOrElse(JNothing)
     val shuffleWriteMetrics =
       taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing)
+    val inputMetrics =
+      taskMetrics.inputMetrics.map(inputMetricsToJson).getOrElse(JNothing)
     val updatedBlocks =
       taskMetrics.updatedBlocks.map { blocks =>
         JArray(blocks.toList.map { case (id, status) =>
@@ -230,6 +233,7 @@ private[spark] object JsonProtocol {
     ("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~
     ("Shuffle Read Metrics" -> shuffleReadMetrics) ~
     ("Shuffle Write Metrics" -> shuffleWriteMetrics) ~
+    ("Input Metrics" -> inputMetrics) ~
     ("Updated Blocks" -> updatedBlocks)
   }
 
@@ -247,6 +251,11 @@ private[spark] object JsonProtocol {
     ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime)
   }
 
+  def inputMetricsToJson(inputMetrics: InputMetrics): JValue = {
+    ("Data Read Method" -> inputMetrics.readMethod.toString) ~
+    ("Bytes Read" -> inputMetrics.bytesRead)
+  }
+
   def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
     val reason = Utils.getFormattedClassName(taskEndReason)
     val json = taskEndReason match {
@@ -528,6 +537,8 @@ private[spark] object JsonProtocol {
       Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)
     metrics.shuffleWriteMetrics =
       Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson)
+    metrics.inputMetrics =
+      Utils.jsonOption(json \ "Input Metrics").map(inputMetricsFromJson)
     metrics.updatedBlocks =
       Utils.jsonOption(json \ "Updated Blocks").map { value =>
         value.extract[List[JValue]].map { block =>
@@ -557,6 +568,13 @@ private[spark] object JsonProtocol {
     metrics
   }
 
+  def inputMetricsFromJson(json: JValue): InputMetrics = {
+    val metrics = new InputMetrics(
+      DataReadMethod.withName((json \ "Data Read Method").extract[String]))
+    metrics.bytesRead = (json \ "Bytes Read").extract[Long]
+    metrics
+  }
+
   def taskEndReasonFromJson(json: JValue): TaskEndReason = {
     val success = Utils.getFormattedClassName(Success)
     val resubmitted = Utils.getFormattedClassName(Resubmitted)
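
Illustrative sketch (not part of the patch): round-tripping the new InputMetrics fields
through JsonProtocol (usable from Spark-internal code and tests, since JsonProtocol is
private[spark]).

    import org.apache.spark.executor.{DataReadMethod, InputMetrics}
    import org.apache.spark.util.JsonProtocol

    val in = new InputMetrics(DataReadMethod.Hadoop)
    in.bytesRead = 2100L
    val json = JsonProtocol.inputMetricsToJson(in)    // ("Data Read Method" -> "Hadoop") ~ ("Bytes Read" -> 2100)
    val back = JsonProtocol.inputMetricsFromJson(json)
    assert(back.readMethod == in.readMethod && back.bytesRead == in.bytesRead)
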
diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
index 4f178db40f638af35c7e1484616a87f6e204d8bc..7f5d0b061e8b0bbc7ab43cebf96b31c36db4d218 100644
--- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.scalatest.{BeforeAndAfter, FunSuite}
 import org.scalatest.mock.EasyMockSugar
 
+import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage._
 
@@ -66,7 +67,8 @@ class CacheManagerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar
 
   test("get cached rdd") {
     expecting {
-      blockManager.get(RDDBlockId(0, 0)).andReturn(Some(ArrayBuffer(5, 6, 7).iterator))
+      val result = new BlockResult(ArrayBuffer(5, 6, 7).iterator, DataReadMethod.Memory, 12)
+      blockManager.get(RDDBlockId(0, 0)).andReturn(Some(result))
     }
 
     whenExecuting(blockManager) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 6df0a080961b6b7358d4e642e86f5a6e2c06b4a1..71f48e295eccacd28eb607d51423d00bb521e636 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -251,6 +251,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
       taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
         taskMetrics.resultSize should be > (0l)
         if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) {
+          taskMetrics.inputMetrics should not be ('defined)
           taskMetrics.shuffleWriteMetrics should be ('defined)
           taskMetrics.shuffleWriteMetrics.get.shuffleBytesWritten should be > (0l)
         }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index d7dbe5164b7f693be1589adb66db795b4e44a153..23cb6905bfdeb6a7ef576b8ab27c92f72198a482 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -31,11 +31,13 @@ import org.scalatest.concurrent.Timeouts._
 import org.scalatest.Matchers
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf}
+import org.apache.spark.{MapOutputTrackerMaster, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
 
+import scala.collection.mutable.ArrayBuffer
 import scala.language.implicitConversions
 import scala.language.postfixOps
 
@@ -415,6 +417,39 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     }
   }
 
+  test("correct BlockResult returned from get() calls") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf, securityMgr,
+      mapOutputTracker)
+    val list1 = List(new Array[Byte](200), new Array[Byte](200))
+    val list1ForSizeEstimate = new ArrayBuffer[Any]
+    list1ForSizeEstimate ++= list1.iterator
+    val list1SizeEstimate = SizeEstimator.estimate(list1ForSizeEstimate)
+    val list2 = List(new Array[Byte](50), new Array[Byte](100), new Array[Byte](150))
+    val list2ForSizeEstimate = new ArrayBuffer[Any]
+    list2ForSizeEstimate ++= list2.iterator
+    val list2SizeEstimate = SizeEstimator.estimate(list2ForSizeEstimate)
+    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.put("list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.put("list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+    val list1Get = store.get("list1")
+    assert(list1Get.isDefined, "list1 expected to be in store")
+    assert(list1Get.get.data.size === 2)
+    assert(list1Get.get.inputMetrics.bytesRead === list1SizeEstimate)
+    assert(list1Get.get.inputMetrics.readMethod === DataReadMethod.Memory)
+    val list2MemoryGet = store.get("list2memory")
+    assert(list2MemoryGet.isDefined, "list2memory expected to be in store")
+    assert(list2MemoryGet.get.data.size === 3)
+    assert(list2MemoryGet.get.inputMetrics.bytesRead === list2SizeEstimate)
+    assert(list2MemoryGet.get.inputMetrics.readMethod === DataReadMethod.Memory)
+    val list2DiskGet = store.get("list2disk")
+    assert(list2DiskGet.isDefined, "list2memory expected to be in store")
+    assert(list2DiskGet.get.data.size === 3)
+    // We don't know the exact size of the data on disk, but it should certainly be > 0.
+    assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
+    assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)
+  }
+
   test("in-memory LRU storage") {
     store = new BlockManager("<driver>", actorSystem, master, serializer, 1200, conf,
       securityMgr, mapOutputTracker)
@@ -630,18 +665,18 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.get("list2").isDefined, "list2 was not in store")
-    assert(store.get("list2").get.size == 2)
+    assert(store.get("list2").get.data.size === 2)
     assert(store.get("list3").isDefined, "list3 was not in store")
-    assert(store.get("list3").get.size == 2)
+    assert(store.get("list3").get.data.size === 2)
     assert(store.get("list1") === None, "list1 was in store")
     assert(store.get("list2").isDefined, "list2 was not in store")
-    assert(store.get("list2").get.size == 2)
+    assert(store.get("list2").get.data.size === 2)
     // At this point list2 was gotten last, so LRU will getSingle rid of list3
     store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.get("list1").isDefined, "list1 was not in store")
-    assert(store.get("list1").get.size == 2)
+    assert(store.get("list1").get.data.size === 2)
     assert(store.get("list2").isDefined, "list2 was not in store")
-    assert(store.get("list2").get.size == 2)
+    assert(store.get("list2").get.data.size === 2)
     assert(store.get("list3") === None, "list1 was in store")
   }
 
@@ -656,28 +691,31 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
     store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
     store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+    val listForSizeEstimate = new ArrayBuffer[Any]
+    listForSizeEstimate ++= list1.iterator
+    val listSize = SizeEstimator.estimate(listForSizeEstimate)
     // At this point LRU should not kick in because list3 is only on disk
-    assert(store.get("list1").isDefined, "list2 was not in store")
-    assert(store.get("list1").get.size === 2)
-    assert(store.get("list2").isDefined, "list3 was not in store")
-    assert(store.get("list2").get.size === 2)
-    assert(store.get("list3").isDefined, "list1 was not in store")
-    assert(store.get("list3").get.size === 2)
-    assert(store.get("list1").isDefined, "list2 was not in store")
-    assert(store.get("list1").get.size === 2)
-    assert(store.get("list2").isDefined, "list3 was not in store")
-    assert(store.get("list2").get.size === 2)
-    assert(store.get("list3").isDefined, "list1 was not in store")
-    assert(store.get("list3").get.size === 2)
+    assert(store.get("list1").isDefined, "list1 was not in store")
+    assert(store.get("list1").get.data.size === 2)
+    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.get("list2").get.data.size === 2)
+    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.get("list3").get.data.size === 2)
+    assert(store.get("list1").isDefined, "list1 was not in store")
+    assert(store.get("list1").get.data.size === 2)
+    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.get("list2").get.data.size === 2)
+    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.get("list3").get.data.size === 2)
     // Now let's add in list4, which uses both disk and memory; list1 should drop out
     store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
     assert(store.get("list1") === None, "list1 was in store")
-    assert(store.get("list2").isDefined, "list3 was not in store")
-    assert(store.get("list2").get.size === 2)
-    assert(store.get("list3").isDefined, "list1 was not in store")
-    assert(store.get("list3").get.size === 2)
+    assert(store.get("list2").isDefined, "list2 was not in store")
+    assert(store.get("list2").get.data.size === 2)
+    assert(store.get("list3").isDefined, "list3 was not in store")
+    assert(store.get("list3").get.data.size === 2)
     assert(store.get("list4").isDefined, "list4 was not in store")
-    assert(store.get("list4").get.size === 2)
+    assert(store.get("list4").get.data.size === 2)
   }
 
   test("negative byte values in ByteBufferInputStream") {
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6c49870455873f0b7599f6e66ab0ed2a9a54d076..316e14100e4091d7c12b243b47c7b1a44b320672 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -39,7 +39,11 @@ class JsonProtocolSuite extends FunSuite {
     val taskGettingResult =
       SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
     val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
-      makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800))
+      makeTaskInfo(123L, 234, 67, 345L, false),
+      makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false))
+    val taskEndWithHadoopInput = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
+      makeTaskInfo(123L, 234, 67, 345L, false),
+      makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true))
     val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
     val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
     val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
@@ -61,6 +65,7 @@ class JsonProtocolSuite extends FunSuite {
     testEvent(taskStart, taskStartJsonString)
     testEvent(taskGettingResult, taskGettingResultJsonString)
     testEvent(taskEnd, taskEndJsonString)
+    testEvent(taskEndWithHadoopInput, taskEndWithHadoopInputJsonString)
     testEvent(jobStart, jobStartJsonString)
     testEvent(jobEnd, jobEndJsonString)
     testEvent(environmentUpdate, environmentUpdateJsonString)
@@ -75,7 +80,7 @@ class JsonProtocolSuite extends FunSuite {
     testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
     testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
     testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
-    testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8))
+    testTaskMetrics(makeTaskMetrics(33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false))
     testBlockManagerId(BlockManagerId("Hong", "Kong", 500, 1000))
 
     // StorageLevel
@@ -118,7 +123,7 @@ class JsonProtocolSuite extends FunSuite {
     testBlockId(StreamBlockId(1, 2L))
   }
 
-  test("Backward compatibility") {
+  test("StageInfo.details backward compatibility") {
     // StageInfo.details was added after 1.0.0.
     val info = makeStageInfo(1, 2, 3, 4L, 5L)
     assert(info.details.nonEmpty)
@@ -129,6 +134,16 @@ class JsonProtocolSuite extends FunSuite {
     assert("" === newInfo.details)
   }
 
+  test("InputMetrics backward compatibility") {
+    // InputMetrics were added after 1.0.1.
+    val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true)
+    assert(metrics.inputMetrics.nonEmpty)
+    val newJson = JsonProtocol.taskMetricsToJson(metrics)
+    val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" }
+    val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+    assert(newMetrics.inputMetrics.isEmpty)
+  }
+
 
   /** -------------------------- *
    | Helper test running methods |
@@ -294,6 +309,8 @@ class JsonProtocolSuite extends FunSuite {
       metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics, assertShuffleReadEquals)
     assertOptionEquals(
       metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics, assertShuffleWriteEquals)
+    assertOptionEquals(
+      metrics1.inputMetrics, metrics2.inputMetrics, assertInputMetricsEquals)
     assertOptionEquals(metrics1.updatedBlocks, metrics2.updatedBlocks, assertBlocksEquals)
   }
 
@@ -311,6 +328,11 @@ class JsonProtocolSuite extends FunSuite {
     assert(metrics1.shuffleWriteTime === metrics2.shuffleWriteTime)
   }
 
+  private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) {
+    assert(metrics1.readMethod === metrics2.readMethod)
+    assert(metrics1.bytesRead === metrics2.bytesRead)
+  }
+
   private def assertEquals(bm1: BlockManagerId, bm2: BlockManagerId) {
     assert(bm1.executorId === bm2.executorId)
     assert(bm1.host === bm2.host)
@@ -403,6 +425,10 @@ class JsonProtocolSuite extends FunSuite {
     assertEquals(w1, w2)
   }
 
+  private def assertInputMetricsEquals(i1: InputMetrics, i2: InputMetrics) {
+    assertEquals(i1, i2)
+  }
+
   private def assertTaskMetricsEquals(t1: TaskMetrics, t2: TaskMetrics) {
     assertEquals(t1, t2)
   }
@@ -460,9 +486,19 @@ class JsonProtocolSuite extends FunSuite {
     new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative)
   }
 
-  private def makeTaskMetrics(a: Long, b: Long, c: Long, d: Long, e: Int, f: Int) = {
+  /**
+   * Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is
+   * set to true) or read data from a shuffle otherwise.
+   */
+  private def makeTaskMetrics(
+      a: Long,
+      b: Long,
+      c: Long,
+      d: Long,
+      e: Int,
+      f: Int,
+      hasHadoopInput: Boolean) = {
     val t = new TaskMetrics
-    val sr = new ShuffleReadMetrics
     val sw = new ShuffleWriteMetrics
     t.hostname = "localhost"
     t.executorDeserializeTime = a
@@ -471,15 +507,23 @@ class JsonProtocolSuite extends FunSuite {
     t.jvmGCTime = d
     t.resultSerializationTime = a + b
     t.memoryBytesSpilled = a + c
-    sr.shuffleFinishTime = b + c
-    sr.totalBlocksFetched = e + f
-    sr.remoteBytesRead = b + d
-    sr.localBlocksFetched = e
-    sr.fetchWaitTime = a + d
-    sr.remoteBlocksFetched = f
+
+    if (hasHadoopInput) {
+      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+      inputMetrics.bytesRead = d + e + f
+      t.inputMetrics = Some(inputMetrics)
+    } else {
+      val sr = new ShuffleReadMetrics
+      sr.shuffleFinishTime = b + c
+      sr.totalBlocksFetched = e + f
+      sr.remoteBytesRead = b + d
+      sr.localBlocksFetched = e
+      sr.fetchWaitTime = a + d
+      sr.remoteBlocksFetched = f
+      t.shuffleReadMetrics = Some(sr)
+    }
     sw.shuffleBytesWritten = a + b + c
     sw.shuffleWriteTime = b + c + d
-    t.shuffleReadMetrics = Some(sr)
     t.shuffleWriteMetrics = Some(sw)
     // Make at most 6 blocks
     t.updatedBlocks = Some((1 to (e % 5 + 1)).map { i =>
@@ -552,8 +596,9 @@ class JsonProtocolSuite extends FunSuite {
       |  },
       |  "Shuffle Write Metrics":{
       |    "Shuffle Bytes Written":1200,
-      |    "Shuffle Write Time":1500},
-      |    "Updated Blocks":[
+      |    "Shuffle Write Time":1500
+      |  },
+      |  "Updated Blocks":[
       |    {"Block ID":"rdd_0_0",
       |      "Status":{
       |        "Storage Level":{
@@ -568,6 +613,35 @@ class JsonProtocolSuite extends FunSuite {
       |}
     """.stripMargin
 
+  private val taskEndWithHadoopInputJsonString =
+    """
+      |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
+      |"Task End Reason":{"Reason":"Success"},
+      |"Task Info":{
+      |  "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
+      |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
+      |  "Getting Result Time":0,"Finish Time":0,"Failed":false,"Serialized Size":0
+      |},
+      |"Task Metrics":{
+      |  "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
+      |  "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700,
+      |  "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
+      |  "Shuffle Write Metrics":{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},
+      |  "Input Metrics":{"Data Read Method":"Hadoop","Bytes Read":2100},
+      |  "Updated Blocks":[
+      |    {"Block ID":"rdd_0_0",
+      |      "Status":{
+      |        "Storage Level":{
+      |          "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
+      |          "Replication":2
+      |        },
+      |        "Memory Size":0,"Tachyon Size":0,"Disk Size":0
+      |      }
+      |    }
+      |  ]}
+      |}
+    """
+
   private val jobStartJsonString =
     """
       {"Event":"SparkListenerJobStart","Job ID":10,"Stage IDs":[1,2,3,4],"Properties":