diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index db1c902955151d38f002fee1ac802f1d43da6936..b70153fd30dcaa17d4de78a18c663241794a64db 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -113,7 +113,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
   }
 
   private[spark] class ExecutorsListener extends SparkListener with Logging {
-    val executorToTasksActive = HashMap[String, HashSet[Long]]()
+    val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
     val executorToTasksComplete = HashMap[String, Int]()
     val executorToTasksFailed = HashMap[String, Int]()
     val executorToTaskInfos =
@@ -121,9 +121,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
 
     override def onTaskStart(taskStart: SparkListenerTaskStart) {
       val eid = taskStart.taskInfo.executorId
-      if (!executorToTasksActive.contains(eid))
-        executorToTasksActive(eid) = HashSet[Long]()
-      executorToTasksActive(eid) += taskStart.taskInfo.taskId
+      val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
+      activeTasks += taskStart.taskInfo
       val taskList = executorToTaskInfos.getOrElse(
         eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
       taskList += ((taskStart.taskInfo, None, None))
@@ -132,9 +131,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
 
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
       val eid = taskEnd.taskInfo.executorId
-      if (!executorToTasksActive.contains(eid))
-        executorToTasksActive(eid) = HashSet[Long]()
-      executorToTasksActive(eid) -= taskEnd.taskInfo.taskId
+      val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
+      activeTasks -= taskEnd.taskInfo
       val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
         taskEnd.reason match {
           case e: ExceptionFailure =>
@@ -142,7 +140,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
             (Some(e), e.metrics)
           case _ =>
             executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
-            (None, Some(taskEnd.taskMetrics))
+            (None, Option(taskEnd.taskMetrics))
         }
       val taskList = executorToTaskInfos.getOrElse(
         eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index f7f5c2fb6e195d243e4aae8793f7f168ec7046a6..e0a41682904952d5016e608a7cc1cb172815627e 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -25,9 +25,10 @@ import scala.Some
 import scala.xml.{NodeSeq, Node}
 
 import spark.scheduler.Stage
-import spark.ui.UIUtils._
-import spark.ui.Page._
 import spark.storage.StorageLevel
+import spark.ui.Page._
+import spark.ui.UIUtils._
+import spark.Utils
 
 /** Page showing list of all ongoing and recently finished stages */
 private[spark] class IndexPage(parent: JobProgressUI) {
@@ -38,6 +39,12 @@ private[spark] class IndexPage(parent: JobProgressUI) {
     val activeStages = listener.activeStages.toSeq
     val completedStages = listener.completedStages.reverse.toSeq
     val failedStages = listener.failedStages.reverse.toSeq
+    val now = System.currentTimeMillis()
+
+    var activeTime = 0L
+    for (tasks <- listener.stageToTasksActive.values; t <- tasks) {
+      activeTime += t.timeRunning(now)
+    }
 
     /** Special table which merges two header cells. */
     def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
@@ -48,7 +55,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
           <th>Submitted</th>
           <th>Duration</th>
           <th colspan="2">Tasks: Complete/Total</th>
-          <th>Shuffle Activity</th>
+          <th>Shuffle Read</th>
+          <th>Shuffle Write</th>
           <th>Stored RDD</th>
         </thead>
         <tbody>
@@ -57,11 +65,33 @@ private[spark] class IndexPage(parent: JobProgressUI) {
       </table>
     }
 
+    val summary: NodeSeq =
+     <div>
+       <ul class="unstyled">
+          <li>
+            <strong>CPU time: </strong>
+            {parent.formatDuration(listener.totalTime + activeTime)}
+          </li>
+         {if (listener.totalShuffleRead > 0)
+           <li>
+              <strong>Shuffle read: </strong>
+              {Utils.memoryBytesToString(listener.totalShuffleRead)}
+            </li>
+         }
+         {if (listener.totalShuffleWrite > 0)
+           <li>
+              <strong>Shuffle write: </strong>
+              {Utils.memoryBytesToString(listener.totalShuffleWrite)}
+            </li>
+         }
+       </ul>
+     </div>
     val activeStageTable: NodeSeq = stageTable(stageRow, activeStages)
     val completedStageTable = stageTable(stageRow, completedStages)
     val failedStageTable: NodeSeq = stageTable(stageRow, failedStages)
 
-    val content = <h2>Active Stages</h2> ++ activeStageTable ++
+    val content = summary ++
+                  <h2>Active Stages</h2> ++ activeStageTable ++
                   <h2>Completed Stages</h2>  ++ completedStageTable ++
                   <h2>Failed Stages</h2>  ++ failedStageTable
 
@@ -91,14 +121,16 @@ private[spark] class IndexPage(parent: JobProgressUI) {
       case Some(t) => dateFmt.format(new Date(t))
       case None => "Unknown"
     }
-    val (read, write) = (listener.hasShuffleRead(s.id), listener.hasShuffleWrite(s.id))
-    val shuffleInfo = (read, write) match {
-      case (true, true) => "Read/Write"
-      case (true, false) => "Read"
-      case (false, true) => "Write"
-      case _ => ""
+
+    val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
+      case 0 => ""
+      case b => Utils.memoryBytesToString(b)
+    }
+    val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
+      case 0 => ""
+      case b => Utils.memoryBytesToString(b)
     }
-    val startedTasks = listener.stageToTasksActive.getOrElse(s.id, Seq[Long]()).size
+
     val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
     val totalTasks = s.numPartitions
 
@@ -115,7 +147,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
         case _ =>
       }}
       </td>
-      <td>{shuffleInfo}</td>
+      <td>{shuffleRead}</td>
+      <td>{shuffleWrite}</td>
       <td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) {
              <a href={"/storage/rdd?id=%s".format(s.rdd.id)}>
                {Option(s.rdd.name).getOrElse(s.rdd.id)}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index 6e332415dbcf85eaaf50e2a335c3ee1c68751e6b..09d24b6302047746d4bdf5a1339a91033d2d5718 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -65,7 +65,15 @@ private[spark] class JobProgressListener extends SparkListener {
   val completedStages = ListBuffer[Stage]()
   val failedStages = ListBuffer[Stage]()
 
-  val stageToTasksActive = HashMap[Int, HashSet[Long]]()
+  // Total metrics reflect metrics only for completed tasks
+  var totalTime = 0L
+  var totalShuffleRead = 0L
+  var totalShuffleWrite = 0L
+
+  val stageToTime = HashMap[Int, Long]()
+  val stageToShuffleRead = HashMap[Int, Long]()
+  val stageToShuffleWrite = HashMap[Int, Long]()
+  val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
   val stageToTasksComplete = HashMap[Int, Int]()
   val stageToTasksFailed = HashMap[Int, Int]()
   val stageToTaskInfos =
@@ -86,6 +94,12 @@ private[spark] class JobProgressListener extends SparkListener {
       val toRemove = RETAINED_STAGES / 10
       stages.takeRight(toRemove).foreach( s => {
         stageToTaskInfos.remove(s.id)
+        stageToTime.remove(s.id)
+        stageToShuffleRead.remove(s.id)
+        stageToShuffleWrite.remove(s.id)
+        stageToTasksActive.remove(s.id)
+        stageToTasksComplete.remove(s.id)
+        stageToTasksFailed.remove(s.id)
       })
       stages.trimEnd(toRemove)
     }
@@ -96,9 +110,8 @@ private[spark] class JobProgressListener extends SparkListener {
 
   override def onTaskStart(taskStart: SparkListenerTaskStart) {
     val sid = taskStart.task.stageId
-    if (!stageToTasksActive.contains(sid))
-      stageToTasksActive(sid) = HashSet[Long]()
-    stageToTasksActive(sid) += taskStart.taskInfo.taskId
+    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+    tasksActive += taskStart.taskInfo
     val taskList = stageToTaskInfos.getOrElse(
       sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
     taskList += ((taskStart.taskInfo, None, None))
@@ -107,9 +120,8 @@ private[spark] class JobProgressListener extends SparkListener {
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
     val sid = taskEnd.task.stageId
-    if (!stageToTasksActive.contains(sid))
-      stageToTasksActive(sid) = HashSet[Long]()
-    stageToTasksActive(sid) -= taskEnd.taskInfo.taskId
+    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+    tasksActive -= taskEnd.taskInfo
     val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
       taskEnd.reason match {
         case e: ExceptionFailure =>
@@ -117,8 +129,26 @@ private[spark] class JobProgressListener extends SparkListener {
           (Some(e), e.metrics)
         case _ =>
           stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
-          (None, Some(taskEnd.taskMetrics))
+          (None, Option(taskEnd.taskMetrics))
       }
+
+    stageToTime.getOrElseUpdate(sid, 0L)
+    val time = metrics.map(m => m.executorRunTime).getOrElse(0)
+    stageToTime(sid) += time
+    totalTime += time
+
+    stageToShuffleRead.getOrElseUpdate(sid, 0L)
+    val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
+      s.remoteBytesRead).getOrElse(0L)
+    stageToShuffleRead(sid) += shuffleRead
+    totalShuffleRead += shuffleRead
+
+    stageToShuffleWrite.getOrElseUpdate(sid, 0L)
+    val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
+      s.shuffleBytesWritten).getOrElse(0L)
+    stageToShuffleWrite(sid) += shuffleWrite
+    totalShuffleWrite += shuffleWrite
+
     val taskList = stageToTaskInfos.getOrElse(
       sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
     taskList -= ((taskEnd.taskInfo, None, None))
@@ -139,22 +169,4 @@ private[spark] class JobProgressListener extends SparkListener {
       case _ =>
     }
   }
-
-  /** Is this stage's input from a shuffle read. */
-  def hasShuffleRead(stageID: Int): Boolean = {
-    // This is written in a slightly complicated way to avoid having to scan all tasks
-    for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
-      if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
-    }
-    return false // No tasks have finished for this stage
-  }
-
-  /** Is this stage's output to a shuffle write. */
-  def hasShuffleWrite(stageID: Int): Boolean = {
-    // This is written in a slightly complicated way to avoid having to scan all tasks
-    for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
-      if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
-    }
-    return false // No tasks have finished for this stage
-  }
 }
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index 654f3477235405cbc995cb3a41de817f9efb9c1d..e327cb3947889c3d67105f9743f380f0f7318f4c 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -37,6 +37,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
 
   def render(request: HttpServletRequest): Seq[Node] = {
     val stageId = request.getParameter("id").toInt
+    val now = System.currentTimeMillis()
 
     if (!listener.stageToTaskInfos.contains(stageId)) {
       val content =
@@ -49,8 +50,35 @@ private[spark] class StagePage(parent: JobProgressUI) {
 
     val tasks = listener.stageToTaskInfos(stageId)
 
-    val shuffleRead = listener.hasShuffleRead(stageId)
-    val shuffleWrite = listener.hasShuffleWrite(stageId)
+    val shuffleRead = listener.stageToShuffleRead.getOrElse(stageId, 0L) > 0
+    val shuffleWrite = listener.stageToShuffleWrite.getOrElse(stageId, 0L) > 0
+
+    var activeTime = 0L
+    listener.stageToTasksActive.get(stageId).foreach(tasks => tasks.foreach { t =>
+      activeTime += t.timeRunning(now)
+    })
+
+    val summary =
+      <div>
+        <ul class="unstyled">
+          <li>
+            <strong>CPU time: </strong>
+            {parent.formatDuration(listener.stageToTime.getOrElse(stageId, 0L) + activeTime)}
+          </li>
+          {if (shuffleRead)
+            <li>
+              <strong>Shuffle read: </strong>
+              {Utils.memoryBytesToString(listener.stageToShuffleRead(stageId))}
+            </li>
+          }
+          {if (shuffleWrite)
+            <li>
+              <strong>Shuffle write: </strong>
+              {Utils.memoryBytesToString(listener.stageToShuffleWrite(stageId))}
+            </li>
+          }
+        </ul>
+      </div>
 
     val taskHeaders: Seq[String] =
       Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
@@ -98,7 +126,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
       }
 
     val content =
-      <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++ <h2>Tasks</h2> ++ taskTable;
+      summary ++ <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++
+        <h2>Tasks</h2> ++ taskTable;
 
     headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
   }