diff --git a/core/src/main/scala/spark/ui/Page.scala b/core/src/main/scala/spark/ui/Page.scala
index a31e750d06b1b7563e496c964e6bcec2de36869b..03034a4520799b2d9adee5ad89a0aa1c05132c12 100644
--- a/core/src/main/scala/spark/ui/Page.scala
+++ b/core/src/main/scala/spark/ui/Page.scala
@@ -17,4 +17,4 @@
 
 package spark.ui
 
-private[spark] object Page extends Enumeration { val Storage, Jobs, Environment = Value }
+private[spark] object Page extends Enumeration { val Storage, Jobs, Environment, Executors = Value }
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
index 9396f220634e94cc7ce7774e108c092e65475f05..7599f82a94414558f182c775a5a191a67c177ca7 100644
--- a/core/src/main/scala/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -23,6 +23,7 @@ import org.eclipse.jetty.server.{Handler, Server}
 
 import spark.{Logging, SparkContext, Utils}
 import spark.ui.env.EnvironmentUI
+import spark.ui.exec.ExecutorsUI
 import spark.ui.storage.BlockManagerUI
 import spark.ui.jobs.JobProgressUI
 import spark.ui.JettyUtils._
@@ -41,7 +42,9 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
   val storage = new BlockManagerUI(sc)
   val jobs = new JobProgressUI(sc)
   val env = new EnvironmentUI(sc)
-  val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++ handlers
+  val exec = new ExecutorsUI(sc)
+  val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++
+    exec.getHandlers ++ handlers
 
   /** Bind the HTTP server which backs this web interface */
   def bind() {
@@ -64,6 +67,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
     //  This server must register all handlers, including JobProgressUI, before binding
     //  JobProgressUI registers a listener with SparkContext, which requires sc to initialize
     jobs.start()
+    exec.start()
   }
 
   def stop() {
diff --git a/core/src/main/scala/spark/ui/UIUtils.scala b/core/src/main/scala/spark/ui/UIUtils.scala
index b1d11954dd535ea20e0f8b2504cc1f6884877f44..e33c80282a5fbbfae534c6123e8ab0f10840d82a 100644
--- a/core/src/main/scala/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/spark/ui/UIUtils.scala
@@ -40,6 +40,10 @@ private[spark] object UIUtils {
       case Environment => <li class="active"><a href="/environment">Environment</a></li>
       case _ => <li><a href="/environment">Environment</a></li>
     }
+    val executors = page match {
+      case Executors => <li class="active"><a href="/executors">Executors</a></li>
+      case _ => <li><a href="/executors">Executors</a></li>
+    }
 
     <html>
       <head>
@@ -66,6 +70,7 @@ private[spark] object UIUtils {
                       {storage}
                       {jobs}
                       {environment}
+                      {executors}
                     </ul>
                     <ul id="infolist">
                       <li>Application: <strong>{sc.appName}</strong></li>
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
new file mode 100644
index 0000000000000000000000000000000000000000..20ea54d6a621cd0ab7940f42af4ba545c18a02af
--- /dev/null
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -0,0 +1,136 @@
+package spark.ui.exec
+
+
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.Handler
+
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.util.Properties
+
+import spark.{ExceptionFailure, Logging, SparkContext, Success, Utils}
+import spark.executor.TaskMetrics
+import spark.scheduler.cluster.TaskInfo
+import spark.scheduler._
+import spark.SparkContext
+import spark.storage.{StorageStatus, StorageUtils}
+import spark.ui.JettyUtils._
+import spark.ui.Page.Executors
+import spark.ui.UIUtils.headerSparkPage
+import spark.ui.UIUtils
+import spark.Utils
+
+import scala.xml.{Node, XML}
+
+private[spark] class ExecutorsUI(val sc: SparkContext) {
+
+  private var _listener: Option[ExecutorsListener] = None
+  def listener = _listener.get
+
+  def start() {
+    _listener = Some(new ExecutorsListener)
+    sc.addSparkListener(listener)
+  }
+
+  def getHandlers = Seq[(String, Handler)](
+    ("/executors", (request: HttpServletRequest) => render(request))
+  )
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val storageStatusList = sc.getExecutorStorageStatus
+
+    val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_+_)
+    val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_+_)
+    val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+      .reduceOption(_+_).getOrElse(0L)
+
+    val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
+      "Failed tasks", "Complete tasks", "Total tasks")
+    def execRow(kv: Seq[String]) =
+      <tr>
+        <td>{kv(0)}</td>
+        <td>{kv(1)}</td>
+        <td>{kv(2)}</td>
+        <td sorttable_customkey={kv(3)}>
+          {Utils.memoryBytesToString(kv(3).toLong)} / {Utils.memoryBytesToString(kv(4).toLong)}
+        </td>
+        <td sorttable_customkey={kv(5)}>
+          {Utils.memoryBytesToString(kv(5).toLong)}
+        </td>
+        <td>{kv(6)}</td>
+        <td>{kv(7)}</td>
+        <td>{kv(8)}</td>
+      </tr>
+    val execInfo =
+      for (b <- 0 until storageStatusList.size)
+        yield getExecInfo(b)
+    val execTable = UIUtils.listingTable(execHead, execRow, execInfo)
+
+    val content =
+      <div class="row">
+        <div class="span12">
+          <ul class="unstyled">
+            <li><strong>Memory:</strong>
+              {Utils.memoryBytesToString(memUsed)} Used
+              ({Utils.memoryBytesToString(maxMem)} Total) </li>
+            <li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li>
+          </ul>
+        </div>
+      </div>
+      <div class="row">
+        <div class="span12">
+          {execTable}
+        </div>
+      </div>;
+
+    headerSparkPage(content, sc, "Executors", Executors)
+  }
+
+  def getExecInfo(a: Int): Seq[String] = {
+    val execId = sc.getExecutorStorageStatus(a).blockManagerId.executorId
+    val hostPort = sc.getExecutorStorageStatus(a).blockManagerId.hostPort
+    val rddBlocks = sc.getExecutorStorageStatus(a).blocks.size.toString
+    val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
+    val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
+    val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
+    val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0).toString
+    val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0).toString
+    val totalTasks = listener.executorToTaskInfos.get(execId).map(_.size).getOrElse(0).toString
+
+    Seq(
+      execId,
+      hostPort,
+      rddBlocks,
+      memUsed,
+      maxMem,
+      diskUsed,
+      failedTasks,
+      completedTasks,
+      totalTasks
+    )
+  }
+
+  private[spark] class ExecutorsListener extends SparkListener with Logging {
+    val executorToTasksComplete = HashMap[String, Int]()
+    val executorToTasksFailed = HashMap[String, Int]()
+    val executorToTaskInfos =
+      HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+
+    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+      val eid = taskEnd.taskInfo.executorId
+      val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
+        taskEnd.reason match {
+          case e: ExceptionFailure =>
+            executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
+            (Some(e), e.metrics)
+          case _ =>
+            executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
+            (None, Some(taskEnd.taskMetrics))
+        }
+      val taskList = executorToTaskInfos.getOrElse(
+        eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+      taskList += ((taskEnd.taskInfo, metrics, failureInfo))
+      executorToTaskInfos(eid) = taskList
+    }
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/ui/storage/IndexPage.scala b/core/src/main/scala/spark/ui/storage/IndexPage.scala
index 4e0360d19aec4e6b32aaa2f2ae783c053928e972..f76192eba84e777b2773016f21fc0ef6de9f71bd 100644
--- a/core/src/main/scala/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/storage/IndexPage.scala
@@ -33,10 +33,6 @@ private[spark] class IndexPage(parent: BlockManagerUI) {
   def render(request: HttpServletRequest): Seq[Node] = {
     val storageStatusList = sc.getExecutorStorageStatus
     // Calculate macro-level statistics
-    val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
-    val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
-    val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
-      .reduceOption(_+_).getOrElse(0L)
 
     val rddHeaders = Seq(
       "RDD Name",
@@ -46,19 +42,7 @@ private[spark] class IndexPage(parent: BlockManagerUI) {
       "Size in Memory",
       "Size on Disk")
     val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
-    val rddTable = listingTable(rddHeaders, rddRow, rdds)
-
-    val content =
-      <div class="row">
-        <div class="span12">
-          <ul class="unstyled">
-            <li><strong>Memory:</strong>
-              {Utils.memoryBytesToString(maxMem - remainingMem)} Used
-              ({Utils.memoryBytesToString(remainingMem)} Available) </li>
-            <li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li>
-          </ul>
-        </div>
-      </div> ++ {rddTable};
+    val content = listingTable(rddHeaders, rddRow, rdds)
 
     headerSparkPage(content, parent.sc, "Spark Storage ", Storage)
   }