diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index 0486ca4c79213edb35f8f56a3123deecd289e029..63a5a2093ebaa746a7aecf4c63b03c74ff6d7faa 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -1,48 +1,45 @@
-# syntax: [instance].[sink|source].[name].[options]
-
-#  "instance" specify "who" (the role) use metrics system. In spark there are
-#  several roles like master, worker, executor, driver, these roles will
-#  create metrics system for monitoring. So instance represents these roles.
-#  Currently in Spark, several instances have already implemented: master,
-#  worker, executor, driver.
-#
-#  [instance] field can be "master", "worker", "executor", "driver", which means
-#  only the specified instance has this property.
-#  a wild card "*" can be used to represent instance name, which means all the
-#  instances will have this property.
+#  syntax: [instance].sink|source.[name].[options]=[value]
+
+#  This file configures Spark's internal metrics system. The metrics system is
+#  divided into instances which correspond to internal components.
+#  Each instance can be configured to report its metrics to one or more sinks.
+#  Accepted values for [instance] are "master", "worker", "executor", "driver", 
+#  and "applications". A wild card "*" can be used as an instance name, in 
+#  which case all instances will inherit the supplied property.
 #
-#  "source" specify "where" (source) to collect metrics data. In metrics system,
-#  there exists two kinds of source:
-#    1. Spark internal source, like MasterSource, WorkerSource, etc, which will
-#    collect Spark component's internal state, these sources are related to
-#    instance and will be added after specific metrics system is created.
-#    2. Common source, like JvmSource, which will collect low level state, is
-#    configured by configuration and loaded through reflection.
+#  Within an instance, a "source" specifies a particular set of grouped metrics.
+#  there are two kinds of sources:
+#    1. Spark internal sources, like MasterSource, WorkerSource, etc, which will
+#    collect a Spark component's internal state. Each instance is paired with a
+#    Spark source that is added automatically.
+#    2. Common sources, like JvmSource, which will collect low level state.
+#    These can be added through configuration options and are then loaded
+#    using reflection.
 #
-#  "sink" specify "where" (destination) to output metrics data to. Several sinks
-#  can be coexisted and flush metrics to all these sinks.
+#  A "sink" specifies where metrics are delivered to. Each instance can be
+#  assigned one or more sinks.
 #
-#  [sink|source] field specify this property is source related or sink, this
-#  field can only be source or sink.
+#  The sink|source field specifies whether the property relates to a sink or 
+#  source.
 #
-#  [name] field specify the name of source or sink, this is custom defined.
+#  The [name] field specifies the name of source or sink.
 #
-#  [options] field is the specific property of this source or sink, this source
-#  or sink is responsible for parsing this property.
+#  The [options] field is the specific property of this source or sink. The
+#  source or sink is responsible for parsing this property.
 #
 #  Notes:
-#    1. Sinks should be added through configuration, like console sink, class
-#    full name should be specified by class property.
-#    2. Some sinks can specify polling period, like console sink, which is 10 seconds,
-#    it should be attention minimal polling period is 1 seconds, any period
-#    below than 1s is illegal.
-#    3. Wild card property can be overlapped by specific instance property, for
-#    example, *.sink.console.period can be overlapped by master.sink.console.period.
+#    1. To add a new sink, set the "class" option to a fully qualified class 
+#    name (see examples below).
+#    2. Some sinks involve a polling period. The minimum allowed polling period
+#    is  1 second.
+#    3. Wild card properties can be overridden by more specific properties. 
+#    For example, master.sink.console.period takes precedence over 
+#    *.sink.console.period.
 #    4. A metrics specific configuration
 #    "spark.metrics.conf=${SPARK_HOME}/conf/metrics.properties" should be
-#    added to Java property using -Dspark.metrics.conf=xxx if you want to
-#    customize metrics system, or you can put it in ${SPARK_HOME}/conf,
-#    metrics system will search and load it automatically.
+#    added to Java properties using -Dspark.metrics.conf=xxx if you want to
+#    customize metrics system. You can also put the file in ${SPARK_HOME}/conf
+#    and it will be loaded automatically.
 
 # Enable JmxSink for all instances by class name
 #*.sink.jmx.class=spark.metrics.sink.JmxSink
diff --git a/core/src/main/resources/spark/ui/static/webui.css b/core/src/main/resources/spark/ui/static/webui.css
index f7537bb7661919e20f66298273471d1629feb0a3..fd2cbad004a8e96663941662dd223257ed9c4e8f 100644
--- a/core/src/main/resources/spark/ui/static/webui.css
+++ b/core/src/main/resources/spark/ui/static/webui.css
@@ -47,3 +47,31 @@
   padding-top: 7px;
   padding-left: 4px;
 }
+
+.table td {
+  vertical-align: middle !important;
+}
+
+.progress-completed .bar,
+.progress .bar-completed {
+  background-color: #b3def9;
+  background-image: -moz-linear-gradient(top, #addfff, #badcf2);
+  background-image: -webkit-gradient(linear, 0 0, 0 100%, from(#addfff), to(#badcf2));
+  background-image: -webkit-linear-gradient(top, #addfff, #badcf2);
+  background-image: -o-linear-gradient(top, #addfff, #badcf2);
+  background-image: linear-gradient(to bottom, #addfff, #badcf2);
+  background-repeat: repeat-x;
+  filter: progid:dximagetransform.microsoft.gradient(startColorstr='#ffaddfff', endColorstr='#ffbadcf2', GradientType=0);
+}
+
+.progress-running .bar,
+.progress .bar-running {
+  background-color: #c2ebfa;
+  background-image: -moz-linear-gradient(top, #bdedff, #c7e8f5);
+  background-image: -webkit-gradient(linear, 0 0, 0 100%, from(#bdedff), to(#c7e8f5));
+  background-image: -webkit-linear-gradient(top, #bdedff, #c7e8f5);
+  background-image: -o-linear-gradient(top, #bdedff, #c7e8f5);
+  background-image: linear-gradient(to bottom, #bdedff, #c7e8f5);
+  background-repeat: repeat-x;
+  filter: progid:dximagetransform.microsoft.gradient(startColorstr='#ffbdedff', endColorstr='#ffc7e8f5', GradientType=0);
+}
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 77cb0ee0cd6b06d1cc286023030e059381bb7594..40b30e4d236cf766dc6a6a094e7a64fb26e1054e 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -27,6 +27,7 @@ import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
+import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 import scala.util.DynamicVariable
 import scala.collection.mutable.{ConcurrentMap, HashMap}
@@ -60,8 +61,10 @@ import org.apache.mesos.MesosNativeLibrary
 import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import spark.partial.{ApproximateEvaluator, PartialResult}
 import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
-import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
-import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
+import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
+  SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
+import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
+  ClusterScheduler, Schedulable, SchedulingMode}
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
@@ -125,6 +128,8 @@ class SparkContext(
   private[spark] val ui = new SparkUI(this)
   ui.bind()
 
+  val startTime = System.currentTimeMillis()
+
   // Add each JAR given through the constructor
   if (jars != null) {
     jars.foreach { addJar(_) }
@@ -262,12 +267,18 @@ class SparkContext(
       localProperties.value = new Properties()
   }
 
-  def addLocalProperties(key: String, value: String) {
+  def addLocalProperty(key: String, value: String) {
     if(localProperties.value == null) {
       localProperties.value = new Properties()
     }
     localProperties.value.setProperty(key,value)
   }
+
+  /** Set a human readable description of the current job. */
+  def setDescription(value: String) {
+    addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
+  }
+
   // Post init
   taskScheduler.postStartHook()
 
@@ -574,6 +585,28 @@ class SparkContext(
     env.blockManager.master.getStorageStatus
   }
 
+  /**
+   *  Return pools for fair scheduler
+   *  TODO(xiajunluan): We should take nested pools into account
+   */
+  def getAllPools: ArrayBuffer[Schedulable] = {
+    taskScheduler.rootPool.schedulableQueue
+  }
+
+  /**
+   * Return the pool associated with the given name, if one exists
+   */
+  def getPoolForName(pool: String): Option[Schedulable] = {
+    taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)
+  }
+
+  /**
+   *  Return current scheduling mode
+   */
+  def getSchedulingMode: SchedulingMode.SchedulingMode = {
+    taskScheduler.schedulingMode
+  }
+
   /**
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
@@ -816,6 +849,7 @@ class SparkContext(
  * various Spark features.
  */
 object SparkContext {
+  val SPARK_JOB_DESCRIPTION = "spark.job.description"
 
   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
@@ -933,7 +967,6 @@ object SparkContext {
   }
 }
 
-
 /**
  * A class encapsulating how to convert some type T to Writable. It stores both the Writable class
  * corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
@@ -945,3 +978,4 @@ private[spark] class WritableConverter[T](
     val writableClass: ClassManifest[T] => Class[_ <: Writable],
     val convert: Writable => T)
   extends Serializable
+
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 4a1d341f5d26b5f60f67c0b015d102a77923e908..0adbf1d96e9d38f2ba502e85287297902d3b4da8 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -97,13 +97,26 @@ class SparkEnv (
 
 object SparkEnv extends Logging {
   private val env = new ThreadLocal[SparkEnv]
+  @volatile private var lastSetSparkEnv : SparkEnv = _
 
   def set(e: SparkEnv) {
+	  lastSetSparkEnv = e
     env.set(e)
   }
 
+  /**
+   * Returns the ThreadLocal SparkEnv, if non-null. Else returns the SparkEnv
+   * previously set in any thread.
+   */
   def get: SparkEnv = {
-    env.get()
+    Option(env.get()).getOrElse(lastSetSparkEnv)
+  }
+
+  /**
+   * Returns the ThreadLocal SparkEnv.
+   */
+  def getThreadLocal : SparkEnv = {
+	  env.get()
   }
 
   def createFromSystemProperties(
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index ef598ae41b1f69867042e0302c24e27fb7036c15..673f9a810dd7348eca15501287d1807892db6d45 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -33,8 +33,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
 
 import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
 
-import spark.serializer.SerializerInstance
+import spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 import spark.deploy.SparkHadoopUtil
+import java.nio.ByteBuffer
 
 
 /**
@@ -68,6 +69,47 @@ private object Utils extends Logging {
     return ois.readObject.asInstanceOf[T]
   }
 
+  /** Serialize via nested stream using specific serializer */
+  def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(f: SerializationStream => Unit) = {
+    val osWrapper = ser.serializeStream(new OutputStream {
+      def write(b: Int) = os.write(b)
+
+      override def write(b: Array[Byte], off: Int, len: Int) = os.write(b, off, len)
+    })
+    try {
+      f(osWrapper)
+    } finally {
+      osWrapper.close()
+    }
+  }
+
+  /** Deserialize via nested stream using specific serializer */
+  def deserializeViaNestedStream(is: InputStream, ser: SerializerInstance)(f: DeserializationStream => Unit) = {
+    val isWrapper = ser.deserializeStream(new InputStream {
+      def read(): Int = is.read()
+
+      override def read(b: Array[Byte], off: Int, len: Int): Int = is.read(b, off, len)
+    })
+    try {
+      f(isWrapper)
+    } finally {
+      isWrapper.close()
+    }
+  }
+
+  /**
+   * Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}.
+   */
+  def writeByteBuffer(bb: ByteBuffer, out: ObjectOutput) = {
+    if (bb.hasArray) {
+      out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
+    } else {
+      val bbval = new Array[Byte](bb.remaining())
+      bb.get(bbval)
+      out.write(bbval)
+    }
+  }
+
   def isAlpha(c: Char): Boolean = {
     (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
   }
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index 7c37a166157a9c787fd2b81bf09b6be73824d2e2..31861f3ac2d985b5615eed7bb08adca40a6e02af 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -109,6 +109,7 @@ private[deploy] object DeployMessages {
   }
 
   //  WorkerWebUI to Worker
+
   case object RequestWorkerState
 
   // Worker to WorkerWebUI
@@ -120,4 +121,9 @@ private[deploy] object DeployMessages {
     Utils.checkHost(host, "Required hostname")
     assert (port > 0)
   }
+
+  // Actor System to Master
+
+  case object CheckForWorkerTimeOut
+
 }
diff --git a/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
index 15ff9197382c8c06f5ae2173dbab0f0c37c6665d..6dd2f06126527dd5c3dae462b07f9978fc23f502 100644
--- a/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
@@ -34,6 +34,7 @@ private[spark] class ApplicationInfo(
   var executors = new mutable.HashMap[Int, ExecutorInfo]
   var coresGranted = 0
   var endTime = -1L
+  val appSource = new ApplicationSource(this)
 
   private var nextExecutorId = 0
 
@@ -51,8 +52,10 @@ private[spark] class ApplicationInfo(
   }
 
   def removeExecutor(exec: ExecutorInfo) {
-    executors -= exec.id
-    coresGranted -= exec.cores
+    if (executors.contains(exec.id)) {
+      executors -= exec.id
+      coresGranted -= exec.cores
+    }
   }
 
   def coresLeft: Int = desc.maxCores - coresGranted
diff --git a/core/src/main/scala/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/spark/deploy/master/ApplicationSource.scala
new file mode 100644
index 0000000000000000000000000000000000000000..4df2b6bfddd7503ef4b5b59b8e9b3c660ddfd3ec
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/ApplicationSource.scala
@@ -0,0 +1,24 @@
+package spark.deploy.master
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+class ApplicationSource(val application: ApplicationInfo) extends Source {
+  val metricRegistry = new MetricRegistry()
+  val sourceName = "%s.%s.%s".format("application", application.desc.name,
+    System.currentTimeMillis())
+
+  metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] {
+    override def getValue: String = application.state.toString
+  })
+
+  metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] {
+    override def getValue: Long = application.duration
+  })
+
+  metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
+    override def getValue: Int = application.coresGranted
+  })
+
+}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 202d5bcdb7f013cef97b78ced528401d371f40e5..4a4d9908a06f769f562789078e4d72fdb88b8e04 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -38,7 +38,9 @@ import spark.util.AkkaUtils
 private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
   val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
-
+  val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
+  val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
+ 
   var nextAppNumber = 0
   val workers = new HashSet[WorkerInfo]
   val idToWorker = new HashMap[String, WorkerInfo]
@@ -59,7 +61,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
   Utils.checkHost(host, "Expected hostname")
 
-  val metricsSystem = MetricsSystem.createMetricsSystem("master")
+  val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
+  val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
   val masterSource = new MasterSource(this)
 
   val masterPublicAddress = {
@@ -77,15 +80,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     // Listen for remote client disconnection events, since they don't go through Akka's watch()
     context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
     webUi.start()
-    context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
+    context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
 
-    metricsSystem.registerSource(masterSource)
-    metricsSystem.start()
+    masterMetricsSystem.registerSource(masterSource)
+    masterMetricsSystem.start()
+    applicationMetricsSystem.start()
   }
 
   override def postStop() {
     webUi.stop()
-    metricsSystem.stop()
+    masterMetricsSystem.stop()
+    applicationMetricsSystem.stop()
   }
 
   override def receive = {
@@ -171,6 +176,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     case RequestMasterState => {
       sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
     }
+
+    case CheckForWorkerTimeOut => {
+      timeOutDeadWorkers()
+    }
   }
 
   /**
@@ -275,6 +284,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     val now = System.currentTimeMillis()
     val date = new Date(now)
     val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+    applicationMetricsSystem.registerSource(app.appSource)
     apps += app
     idToApp(app.id) = app
     actorToApp(driver) = app
@@ -300,7 +310,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       idToApp -= app.id
       actorToApp -= app.driver
       addressToApp -= app.driver.path.address
-      completedApps += app   // Remember it in our history
+      if (completedApps.size >= RETAINED_APPLICATIONS) {
+        val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
+        completedApps.take(toRemove).foreach( a => {
+          applicationMetricsSystem.removeSource(a.appSource)
+        })
+        completedApps.trimStart(toRemove)
+      }
+      completedApps += app // Remember it in our history
       waitingApps -= app
       for (exec <- app.executors.values) {
         exec.worker.removeExecutor(exec)
@@ -325,12 +342,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   /** Check for, and remove, any timed-out workers */
   def timeOutDeadWorkers() {
     // Copy the workers into an array so we don't modify the hashset while iterating through it
-    val expirationTime = System.currentTimeMillis() - WORKER_TIMEOUT
-    val toRemove = workers.filter(_.lastHeartbeat < expirationTime).toArray
+    val currentTime = System.currentTimeMillis()
+    val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
     for (worker <- toRemove) {
-      logWarning("Removing %s because we got no heartbeat in %d seconds".format(
-        worker.id, WORKER_TIMEOUT))
-      removeWorker(worker)
+      if (worker.state != WorkerState.DEAD) {
+        logWarning("Removing %s because we got no heartbeat in %d seconds".format(
+          worker.id, WORKER_TIMEOUT/1000))
+        removeWorker(worker)
+      } else {
+        if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
+          workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it 
+      }
     }
   }
 }
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
index fabddfb9476a54ed04f0caa22321e05ee630ffc1..1dacafa13517c66db4ccf20b28177672d9b2dc9c 100644
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -17,7 +17,7 @@
 
 package spark.metrics
 
-import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
+import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
 
 import java.util.Properties
 import java.util.concurrent.TimeUnit
@@ -93,6 +93,13 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
     }
   }
 
+  def removeSource(source: Source) {
+    sources -= source
+    registry.removeMatching(new MetricFilter {
+      def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName)
+    })
+  }
+
   def registerSources() {
     val instConfig = metricsConfig.getInstance(instance)
     val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index d0fdeb741e4e5041bd5eca49e60ff777609d327e..fd00d59c77160b6058bee7f795ae2bf46a4042e6 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -88,6 +88,7 @@ class HadoopRDD[K, V](
 
   override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
     val split = theSplit.asInstanceOf[HadoopPartition]
+    logInfo("Input split: " + split.inputSplit)
     var reader: RecordReader[K, V] = null
 
     val conf = confBroadcast.value.value
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index 17fe805fd435857efdbe0c7a329ab104ab3d150c..0b7160816956b9a473b2712c3174bd6f4388d790 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -73,6 +73,7 @@ class NewHadoopRDD[K, V](
 
   override def compute(theSplit: Partition, context: TaskContext) = new Iterator[(K, V)] {
     val split = theSplit.asInstanceOf[NewHadoopPartition]
+    logInfo("Input split: " + split.serializableHadoopSplit)
     val conf = confBroadcast.value.value
     val attemptId = newTaskAttemptID(jobtrackerId, id, true, split.index, 0)
     val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
diff --git a/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
index 16ba0c26f8ee10414d5843b7ecbec520765ec0c0..33079cd53937d99062c2fd6ddc0e04a68476a007 100644
--- a/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
@@ -20,13 +20,15 @@ package spark.rdd
 import scala.collection.immutable.NumericRange
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.Map
-import spark.{RDD, TaskContext, SparkContext, Partition}
+import spark._
+import java.io._
+import scala.Serializable
 
 private[spark] class ParallelCollectionPartition[T: ClassManifest](
-    val rddId: Long,
-    val slice: Int,
-    values: Seq[T])
-  extends Partition with Serializable {
+    var rddId: Long,
+    var slice: Int,
+    var values: Seq[T])
+    extends Partition with Serializable {
 
   def iterator: Iterator[T] = values.iterator
 
@@ -37,15 +39,49 @@ private[spark] class ParallelCollectionPartition[T: ClassManifest](
     case _ => false
   }
 
-  override val index: Int = slice
+  override def index: Int = slice
+
+  @throws(classOf[IOException])
+  private def writeObject(out: ObjectOutputStream): Unit = {
+
+    val sfactory = SparkEnv.get.serializer
+
+    // Treat java serializer with default action rather than going thru serialization, to avoid a
+    // separate serialization header.
+
+    sfactory match {
+      case js: JavaSerializer => out.defaultWriteObject()
+      case _ =>
+        out.writeLong(rddId)
+        out.writeInt(slice)
+
+        val ser = sfactory.newInstance()
+        Utils.serializeViaNestedStream(out, ser)(_.writeObject(values))
+    }
+  }
+
+  @throws(classOf[IOException])
+  private def readObject(in: ObjectInputStream): Unit = {
+
+    val sfactory = SparkEnv.get.serializer
+    sfactory match {
+      case js: JavaSerializer => in.defaultReadObject()
+      case _ =>
+        rddId = in.readLong()
+        slice = in.readInt()
+
+        val ser = sfactory.newInstance()
+        Utils.deserializeViaNestedStream(in, ser)(ds => values = ds.readObject())
+    }
+  }
 }
 
 private[spark] class ParallelCollectionRDD[T: ClassManifest](
     @transient sc: SparkContext,
     @transient data: Seq[T],
     numSlices: Int,
-    locationPrefs: Map[Int,Seq[String]])
-  extends RDD[T](sc, Nil) {
+    locationPrefs: Map[Int, Seq[String]])
+    extends RDD[T](sc, Nil) {
   // TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
   // cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
   // instead.
@@ -82,16 +118,17 @@ private object ParallelCollectionRDD {
           1
         }
         slice(new Range(
-            r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
+          r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
       }
       case r: Range => {
         (0 until numSlices).map(i => {
           val start = ((i * r.length.toLong) / numSlices).toInt
-          val end = (((i+1) * r.length.toLong) / numSlices).toInt
+          val end = (((i + 1) * r.length.toLong) / numSlices).toInt
           new Range(r.start + start * r.step, r.start + end * r.step, r.step)
         }).asInstanceOf[Seq[Seq[T]]]
       }
-      case nr: NumericRange[_] => { // For ranges of Long, Double, BigInteger, etc
+      case nr: NumericRange[_] => {
+        // For ranges of Long, Double, BigInteger, etc
         val slices = new ArrayBuffer[Seq[T]](numSlices)
         val sliceSize = (nr.size + numSlices - 1) / numSlices // Round up to catch everything
         var r = nr
@@ -102,10 +139,10 @@ private object ParallelCollectionRDD {
         slices
       }
       case _ => {
-        val array = seq.toArray  // To prevent O(n^2) operations for List etc
+        val array = seq.toArray // To prevent O(n^2) operations for List etc
         (0 until numSlices).map(i => {
           val start = ((i * array.length.toLong) / numSlices).toInt
-          val end = (((i+1) * array.length.toLong) / numSlices).toInt
+          val end = (((i + 1) * array.length.toLong) / numSlices).toInt
           array.slice(start, end).toSeq
         })
       }
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 9b45fc29388eaba971624bd031b891abbeac2de5..89c51a44c98790c479dbf0260b563189770295bf 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -510,6 +510,12 @@ class DAGScheduler(
         tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
       }
     }
+    // must be run listener before possible NotSerializableException
+    // should be "StageSubmitted" first and then "JobEnded"
+    val properties = idToActiveJob(stage.priority).properties
+    sparkListeners.foreach(_.onStageSubmitted(
+      SparkListenerStageSubmitted(stage, tasks.size, properties)))
+    
     if (tasks.size > 0) {
       // Preemptively serialize a task to make sure it can be serialized. We are catching this
       // exception here because it would be fairly hard to catch the non-serializable exception
@@ -524,11 +530,9 @@ class DAGScheduler(
           return
       }
 
-      sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size)))
       logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
       myPending ++= tasks
       logDebug("New pending tasks: " + myPending)
-      val properties = idToActiveJob(stage.priority).properties
       taskSched.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, properties))
       if (!stage.submissionTime.isDefined) {
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index f7565b8c57e14c503da6fcca0af23afa868c88bb..ad2efcec63706edc222e9f6dc357f6c9eccec2d0 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.LinkedBlockingQueue
 import scala.collection.mutable.{Map, HashMap, ListBuffer}
 import scala.io.Source
 import spark._
+import spark.SparkContext
 import spark.executor.TaskMetrics
 import spark.scheduler.cluster.TaskInfo
 
@@ -62,7 +63,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
         event match {
           case SparkListenerJobStart(job, properties) =>
             processJobStartEvent(job, properties)
-          case SparkListenerStageSubmitted(stage, taskSize) =>
+          case SparkListenerStageSubmitted(stage, taskSize, properties) =>
             processStageSubmittedEvent(stage, taskSize)
           case StageCompleted(stageInfo) =>
             processStageCompletedEvent(stageInfo)
@@ -317,8 +318,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
 
   protected def recordJobProperties(jobID: Int, properties: Properties) {
     if(properties != null) {
-      val annotation = properties.getProperty("spark.job.annotation", "")
-      jobLogInfo(jobID, annotation, false)
+      val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
+      jobLogInfo(jobID, description, false)
     }
   }
 
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 4eb7e4e6a5f8329b2e5df1466a93451a8dd66ee5..2a09a956ad05788b8fbfbdbd67c8b080151cc379 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -25,7 +25,8 @@ import spark.executor.TaskMetrics
 
 sealed trait SparkListenerEvents
 
-case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends SparkListenerEvents
+case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties)
+     extends SparkListenerEvents
 
 case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
 
@@ -34,10 +35,10 @@ case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends Spa
 case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
      taskMetrics: TaskMetrics) extends SparkListenerEvents
 
-case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null) 
+case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
      extends SparkListenerEvents
 
-case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult) 
+case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
      extends SparkListenerEvents
 
 trait SparkListener {
@@ -45,7 +46,7 @@ trait SparkListener {
    * Called when a stage is completed, with information on the completed stage
    */
   def onStageCompleted(stageCompleted: StageCompleted) { }
-  
+
   /**
    * Called when a stage is submitted
    */
@@ -65,12 +66,12 @@ trait SparkListener {
    * Called when a job starts
    */
   def onJobStart(jobStart: SparkListenerJobStart) { }
-  
+
   /**
    * Called when a job ends
    */
   def onJobEnd(jobEnd: SparkListenerJobEnd) { }
-  
+
 }
 
 /**
diff --git a/core/src/main/scala/spark/scheduler/TaskResult.scala b/core/src/main/scala/spark/scheduler/TaskResult.scala
index dc0621ea7ba5fb20ec6be60627d20d602790a1de..89793e0e8287839f62300488c0fe7ccdbcc25885 100644
--- a/core/src/main/scala/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/spark/scheduler/TaskResult.scala
@@ -21,6 +21,8 @@ import java.io._
 
 import scala.collection.mutable.Map
 import spark.executor.TaskMetrics
+import spark.{Utils, SparkEnv}
+import java.nio.ByteBuffer
 
 // Task result. Also contains updates to accumulator variables.
 // TODO: Use of distributed cache to return result is a hack to get around
@@ -30,7 +32,13 @@ class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics:
   def this() = this(null.asInstanceOf[T], null, null)
 
   override def writeExternal(out: ObjectOutput) {
-    out.writeObject(value)
+
+    val objectSer = SparkEnv.get.serializer.newInstance()
+    val bb = objectSer.serialize(value)
+
+    out.writeInt(bb.remaining())
+    Utils.writeByteBuffer(bb, out)
+
     out.writeInt(accumUpdates.size)
     for ((key, value) <- accumUpdates) {
       out.writeLong(key)
@@ -40,7 +48,14 @@ class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics:
   }
 
   override def readExternal(in: ObjectInput) {
-    value = in.readObject().asInstanceOf[T]
+
+    val objectSer = SparkEnv.get.serializer.newInstance()
+
+    val blen = in.readInt()
+    val byteVal = new Array[Byte](blen)
+    in.readFully(byteVal)
+    value = objectSer.deserialize(ByteBuffer.wrap(byteVal))
+
     val numUpdates = in.readInt
     if (numUpdates == 0) {
       accumUpdates = null
diff --git a/core/src/main/scala/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/spark/scheduler/TaskScheduler.scala
index 51883080069f34eb7cf54c5a7feab1b973ef8356..4943d58e254034f43c2f4ce6b305455b5cf40e1a 100644
--- a/core/src/main/scala/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/TaskScheduler.scala
@@ -17,6 +17,8 @@
 
 package spark.scheduler
 
+import spark.scheduler.cluster.Pool
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 /**
  * Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
  * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
@@ -25,6 +27,11 @@ package spark.scheduler
  * the TaskSchedulerListener interface.
  */
 private[spark] trait TaskScheduler {
+
+  def rootPool: Pool
+
+  def schedulingMode: SchedulingMode
+
   def start(): Unit
 
   // Invoked after system has successfully initialized (typically in spark context).
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 7c10074dc761fad68116bda60c287dcb7bed0358..96568e0d276cdb57311654b098d5b54b5af639b7 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.HashSet
 import spark._
 import spark.TaskState.TaskState
 import spark.scheduler._
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong
 import java.util.{TimerTask, Timer}
@@ -114,6 +115,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   var schedulableBuilder: SchedulableBuilder = null
   var rootPool: Pool = null
+  // default scheduler is FIFO
+  val schedulingMode: SchedulingMode = SchedulingMode.withName(
+    System.getProperty("spark.cluster.schedulingmode", "FIFO"))
 
   override def setListener(listener: TaskSchedulerListener) {
     this.listener = listener
@@ -121,15 +125,13 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   def initialize(context: SchedulerBackend) {
     backend = context
-    //default scheduler is FIFO
-    val schedulingMode = System.getProperty("spark.cluster.schedulingmode", "FIFO")
-    //temporarily set rootPool name to empty
-    rootPool = new Pool("", SchedulingMode.withName(schedulingMode), 0, 0)
+    // temporarily set rootPool name to empty
+    rootPool = new Pool("", schedulingMode, 0, 0)
     schedulableBuilder = {
       schedulingMode match {
-        case "FIFO" =>
+        case SchedulingMode.FIFO =>
           new FIFOSchedulableBuilder(rootPool)
-        case "FAIR" =>
+        case SchedulingMode.FAIR =>
           new FairSchedulableBuilder(rootPool)
       }
     }
@@ -204,7 +206,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
           override def run() {
             if (!hasLaunchedTask) {
               logWarning("Initial job has not accepted any resources; " +
-                "check your cluster UI to ensure that workers are registered")
+                "check your cluster UI to ensure that workers are registered " +
+                "and have sufficient memory")
             } else {
               this.cancel()
             }
@@ -270,10 +273,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       }
       var launchedTask = false
       val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
-      for (manager <- sortedTaskSetQueue)
-      {
-        logInfo("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks))
+
+      for (manager <- sortedTaskSetQueue) {
+        logDebug("parentName:%s, name:%s, runningTasks:%s".format(
+          manager.parent.name, manager.name, manager.runningTasks))
       }
+
       for (manager <- sortedTaskSetQueue) {
 
         // Split offers based on node local, rack local and off-rack tasks.
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index ffb5890ec2aac1369ce70c8d93f1183de4951330..7f855cd345b7f6ab4ea600d0c9aed39f86574f87 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -92,7 +92,8 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
   val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
 
   // Serializer for closures and tasks.
-  val ser = SparkEnv.get.closureSerializer.newInstance()
+  val env = SparkEnv.get
+  val ser = env.closureSerializer.newInstance()
 
   val tasks = taskSet.tasks
   val numTasks = tasks.length
@@ -107,9 +108,8 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
   var runningTasks = 0
   var priority = taskSet.priority
   var stageId = taskSet.stageId
-  var name = "TaskSet_" + taskSet.stageId.toString
+  var name = "TaskSet_"+taskSet.stageId.toString
   var parent: Schedulable = null
-
   // Last time when we launched a preferred task (for delay scheduling)
   var lastPreferredLaunchTime = System.currentTimeMillis
 
@@ -535,6 +535,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
   }
 
   override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+    SparkEnv.set(env)
     state match {
       case TaskState.FINISHED =>
         taskFinished(tid, state, serializedData)
@@ -697,18 +698,18 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
     }
   }
 
-  // TODO: for now we just find Pool not TaskSetManager,
+  // TODO(xiajunluan): for now we just find Pool not TaskSetManager
   // we can extend this function in future if needed
   override def getSchedulableByName(name: String): Schedulable = {
     return null
   }
 
   override def addSchedulable(schedulable:Schedulable) {
-    //nothing
+    // nothing
   }
 
   override def removeSchedulable(schedulable:Schedulable) {
-    //nothing
+    // nothing
   }
 
   override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
index f557b142c4eb7861b64626bc3e89b194d245e587..e77e8e4162e1a435d33a6a84117920b71d60ba74 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
@@ -17,14 +17,18 @@
 
 package spark.scheduler.cluster
 
-import scala.collection.mutable.ArrayBuffer
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 
+import scala.collection.mutable.ArrayBuffer
 /**
  * An interface for schedulable entities.
  * there are two type of Schedulable entities(Pools and TaskSetManagers)
  */
 private[spark] trait Schedulable {
   var parent: Schedulable
+  // child queues
+  def schedulableQueue: ArrayBuffer[Schedulable]
+  def schedulingMode: SchedulingMode
   def weight: Int
   def minShare: Int
   def runningTasks: Int
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
index 95554023c02c0efcbb6b7fe063a7bc13075372df..b2d089f31d9d01c0399006cb8b9e54ceb536003d 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -41,10 +41,11 @@ private[spark] trait SchedulableBuilder {
   def addTaskSetManager(manager: Schedulable, properties: Properties)
 }
 
-private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {
+private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
+  extends SchedulableBuilder with Logging {
 
   override def buildPools() {
-    //nothing
+    // nothing
   }
 
   override def addTaskSetManager(manager: Schedulable, properties: Properties) {
@@ -52,7 +53,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends Schedula
   }
 }
 
-private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {
+private[spark] class FairSchedulableBuilder(val rootPool: Pool)
+  extends SchedulableBuilder with Logging {
 
   val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
   val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
@@ -103,9 +105,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
       }
     }
 
-    //finally create "default" pool
+    // finally create "default" pool
     if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
-      val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
+      val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
+        DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
       rootPool.addSchedulable(pool)
       logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
         DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
@@ -119,8 +122,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
       poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
       parentPool = rootPool.getSchedulableByName(poolName)
       if (parentPool == null) {
-        //we will create a new pool that user has configured in app instead of being defined in xml file
-        parentPool = new Pool(poolName,DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
+        // we will create a new pool that user has configured in app
+        // instead of being defined in xml file
+        parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
+          DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
         rootPool.addSchedulable(parentPool)
         logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
           poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
index 4b3e3e50e13babcccd854c5aadcd0783b75db34d..55cdf4791f985725d7bf05f431da100ba281bb60 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
@@ -17,8 +17,13 @@
 
 package spark.scheduler.cluster
 
-object SchedulingMode extends Enumeration("FAIR","FIFO"){
+/**
+ *  "FAIR" and "FIFO" determines which policy is used
+ *    to order tasks amongst a Schedulable's sub-queues
+ *  "NONE" is used when the a Schedulable has no sub-queues.
+ */
+object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") {
 
   type SchedulingMode = Value
-  val FAIR,FIFO = Value
+  val FAIR,FIFO,NONE = Value
 }
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 7978a5df7464d316f195b3423983130af1c192a7..1a92a5ed6fa06bb1a2dcf212db1c7de99f7e402d 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -23,7 +23,10 @@ import spark.TaskState.TaskState
 import spark.scheduler.TaskSet
 
 private[spark] trait TaskSetManager extends Schedulable {
-
+  def schedulableQueue = null
+  
+  def schedulingMode = SchedulingMode.NONE
+  
   def taskSet: TaskSet
 
   def slaveOffer(
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index edd83d4cb49639c236d0df50ee33fe66fc6b4d4d..f274b1a767984524df7b0fba6d92f9045c6ce312 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -29,6 +29,7 @@ import spark.TaskState.TaskState
 import spark.executor.ExecutorURLClassLoader
 import spark.scheduler._
 import spark.scheduler.cluster._
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 import akka.actor._
 
 /**
@@ -85,6 +86,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
 
   var schedulableBuilder: SchedulableBuilder = null
   var rootPool: Pool = null
+  val schedulingMode: SchedulingMode = SchedulingMode.withName(
+    System.getProperty("spark.cluster.schedulingmode", "FIFO"))
   val activeTaskSets = new HashMap[String, TaskSetManager]
   val taskIdToTaskSetId = new HashMap[Long, String]
   val taskSetTaskIds = new HashMap[String, HashSet[Long]]
@@ -92,15 +95,13 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
   var localActor: ActorRef = null
 
   override def start() {
-    //default scheduler is FIFO
-    val schedulingMode = System.getProperty("spark.cluster.schedulingmode", "FIFO")
-    //temporarily set rootPool name to empty
-    rootPool = new Pool("", SchedulingMode.withName(schedulingMode), 0, 0)
+    // temporarily set rootPool name to empty
+    rootPool = new Pool("", schedulingMode, 0, 0)
     schedulableBuilder = {
       schedulingMode match {
-        case "FIFO" =>
+        case SchedulingMode.FIFO =>
           new FIFOSchedulableBuilder(rootPool)
-        case "FAIR" =>
+        case SchedulingMode.FAIR =>
           new FairSchedulableBuilder(rootPool)
       }
     }
@@ -168,7 +169,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
     // Set the Spark execution environment for the worker thread
     SparkEnv.set(env)
     val ser = SparkEnv.get.closureSerializer.newInstance()
-    var attemptedTask: Option[Task[_]] = None
+    val objectSer = SparkEnv.get.serializer.newInstance()
+    var attemptedTask: Option[Task[_]] = None   
     val start = System.currentTimeMillis()
     var taskStart: Long = 0
     try {
@@ -192,9 +194,9 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
       // executor does. This is useful to catch serialization errors early
       // on in development (so when users move their local Spark programs
       // to the cluster, they don't get surprised by serialization errors).
-      val serResult = ser.serialize(result)
+      val serResult = objectSer.serialize(result)
       deserializedTask.metrics.get.resultSize = serResult.limit()
-      val resultToReturn = ser.deserialize[Any](serResult)
+      val resultToReturn = objectSer.deserialize[Any](serResult)
       val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
         ser.serialize(Accumulators.values))
       val serviceTime = System.currentTimeMillis() - taskStart
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index b29740c886e572b7f82413dce53097fe710ec1d4..c38eeb9e11eb96b23fad0efd906cfc927ce179af 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -42,7 +42,8 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
   val taskInfos = new HashMap[Long, TaskInfo]
   val numTasks = taskSet.tasks.size
   var numFinished = 0
-  val ser = SparkEnv.get.closureSerializer.newInstance()
+  val env = SparkEnv.get
+  val ser = env.closureSerializer.newInstance()
   val copiesRunning = new Array[Int](numTasks)
   val finished = new Array[Boolean](numTasks)
   val numFailures = new Array[Int](numTasks)
@@ -63,11 +64,11 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
   }
 
   override def addSchedulable(schedulable: Schedulable): Unit = {
-    //nothing
+    // nothing
   }
 
   override def removeSchedulable(schedulable: Schedulable): Unit = {
-    //nothing
+    // nothing
   }
 
   override def getSchedulableByName(name: String): Schedulable = {
@@ -75,7 +76,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
   }
 
   override def executorLost(executorId: String, host: String): Unit = {
-    //nothing
+    // nothing
   }
 
   override def checkSpeculatableTasks() = true
@@ -143,6 +144,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
   }
 
   override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+    SparkEnv.set(env)
     state match {
       case TaskState.FINISHED =>
         taskEnded(tid, state, serializedData)
diff --git a/core/src/main/scala/spark/ui/UIUtils.scala b/core/src/main/scala/spark/ui/UIUtils.scala
index 80c0bebc6646f0f288f3b56e32bcd6879b0f17fe..3b63e8b343e6b03286a7039d1b84a0da6f4526ca 100644
--- a/core/src/main/scala/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/spark/ui/UIUtils.scala
@@ -28,14 +28,14 @@ private[spark] object UIUtils {
   /** Returns a spark page with correctly formatted headers */
   def headerSparkPage(content: => Seq[Node], sc: SparkContext, title: String, page: Page.Value)
   : Seq[Node] = {
-    val storage = page match {
-      case Storage => <li class="active"><a href="/storage">Storage</a></li>
-      case _ => <li><a href="/storage">Storage</a></li>
-    }
     val jobs = page match {
       case Jobs => <li class="active"><a href="/stages">Jobs</a></li>
       case _ => <li><a href="/stages">Jobs</a></li>
     }
+    val storage = page match {
+      case Storage => <li class="active"><a href="/storage">Storage</a></li>
+      case _ => <li><a href="/storage">Storage</a></li>
+    }
     val environment = page match {
       case Environment => <li class="active"><a href="/environment">Environment</a></li>
       case _ => <li><a href="/environment">Environment</a></li>
@@ -65,17 +65,14 @@ private[spark] object UIUtils {
               <div class="navbar">
                 <div class="navbar-inner">
                   <div class="container">
-                    <div class="brand"><img src="/static/spark-logo-77x50px-hd.png" /></div>
-                    <ul class="nav nav-pills">
-                      {storage}
+                    <a href="/" class="brand"><img src="/static/spark-logo-77x50px-hd.png" /></a>
+                    <ul class="nav">
                       {jobs}
+                      {storage}
                       {environment}
                       {executors}
                     </ul>
-                    <ul id="infolist" class="text">
-                      <li>Application: <strong>{sc.appName}</strong></li>
-                      <li>Executors: <strong>{sc.getExecutorStorageStatus.size}</strong></li>
-                    </ul>
+                    <p class="navbar-text pull-right">Application: <strong>{sc.appName}</strong></p>
                   </div>
                 </div>
               </div>
diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
index a80e2d7002689873849b7e510f3bda9fd5ac7d24..97ea644021fff6c7c4490064850c3b5a3e73df4b 100644
--- a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
@@ -21,7 +21,8 @@ import scala.util.Random
 
 import spark.SparkContext
 import spark.SparkContext._
-
+import spark.scheduler.cluster.SchedulingMode
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 /**
  * Continuously generates jobs that expose various features of the WebUI (internal testing tool).
  *
@@ -29,18 +30,29 @@ import spark.SparkContext._
  */
 private[spark] object UIWorkloadGenerator {
   val NUM_PARTITIONS = 100
-  val INTER_JOB_WAIT_MS = 500
+  val INTER_JOB_WAIT_MS = 5000
 
   def main(args: Array[String]) {
+    if (args.length < 2) {
+      println("usage: ./run spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
+      System.exit(1)
+    }
     val master = args(0)
+    val schedulingMode = SchedulingMode.withName(args(1))
     val appName = "Spark UI Tester"
+
+    if (schedulingMode == SchedulingMode.FAIR) {
+      System.setProperty("spark.cluster.schedulingmode", "FAIR")
+    }
     val sc = new SparkContext(master, appName)
 
-    // NOTE: Right now there is no easy way for us to show spark.job.annotation for a given phase,
-    //       but we pass it here anyways since it will be useful once we do.
-    def setName(s: String) = {
-      sc.addLocalProperties("spark.job.annotation", s)
+    def setProperties(s: String) = {
+      if(schedulingMode == SchedulingMode.FAIR) {
+        sc.addLocalProperty("spark.scheduler.cluster.fair.pool", s)
+      }
+      sc.addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
     }
+
     val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
     def nextFloat() = (new Random()).nextFloat()
 
@@ -73,14 +85,18 @@ private[spark] object UIWorkloadGenerator {
 
     while (true) {
       for ((desc, job) <- jobs) {
-        try {
-          setName(desc)
-          job()
-          println("Job funished: " + desc)
-        } catch {
-          case e: Exception =>
-            println("Job Failed: " + desc)
-        }
+        new Thread {
+          override def run() {
+            try {
+              setProperties(desc)
+              job()
+              println("Job funished: " + desc)
+            } catch {
+              case e: Exception =>
+                println("Job Failed: " + desc)
+            }
+          }
+        }.start
         Thread.sleep(INTER_JOB_WAIT_MS)
       }
     }
diff --git a/core/src/main/scala/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/spark/ui/env/EnvironmentUI.scala
index 8c3e9804f7828399f6ea854936a2ce013e849cd1..dc39b91648e9b4cf35562c80c6a9140adc7dd1bb 100644
--- a/core/src/main/scala/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/spark/ui/env/EnvironmentUI.scala
@@ -44,7 +44,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
       ("Java Home", Properties.javaHome),
       ("Scala Version", Properties.versionString),
       ("Scala Home", Properties.scalaHome)
-    )
+    ).sorted
     def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
     def jvmTable = UIUtils.listingTable(Seq("Name", "Value"), jvmRow, jvmInformation)
 
@@ -53,8 +53,8 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
         .filter{case (k, v) => k.contains("java.class.path")}
         .headOption
         .getOrElse("", "")
-    val sparkProperties = properties.filter(_._1.startsWith("spark"))
-    val otherProperties = properties.diff(sparkProperties :+ classPathProperty)
+    val sparkProperties = properties.filter(_._1.startsWith("spark")).sorted
+    val otherProperties = properties.diff(sparkProperties :+ classPathProperty).sorted
 
     val propertyHeaders = Seq("Name", "Value")
     def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
@@ -67,7 +67,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
         .map(e => (e, "System Classpath"))
     val addedJars = sc.addedJars.iterator.toSeq.map{case (path, time) => (path, "Added By User")}
     val addedFiles = sc.addedFiles.iterator.toSeq.map{case (path, time) => (path, "Added By User")}
-    val classPath = addedJars ++ addedFiles ++ classPathEntries
+    val classPath = (addedJars ++ addedFiles ++ classPathEntries).sorted
 
     val classPathHeaders = Seq("Resource", "Source")
     def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index 4be2bfa413dc8be04d39ccec29e7987688dcefab..6ec48f70a456a79e79e982dcb20b21510b222f35 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -97,7 +97,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
       .getOrElse(0).toString
     val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
     val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
-    val totalTasks = listener.executorToTaskInfos(a.toString).size.toString
+    val totalTasks = activeTasks + failedTasks + completedTasks
 
     Seq(
       execId,
@@ -117,17 +117,11 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
     val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
     val executorToTasksComplete = HashMap[String, Int]()
     val executorToTasksFailed = HashMap[String, Int]()
-    val executorToTaskInfos =
-      HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
 
     override def onTaskStart(taskStart: SparkListenerTaskStart) {
       val eid = taskStart.taskInfo.executorId
       val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
       activeTasks += taskStart.taskInfo
-      val taskList = executorToTaskInfos.getOrElse(
-        eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-      taskList += ((taskStart.taskInfo, None, None))
-      executorToTaskInfos(eid) = taskList
     }
 
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
@@ -143,11 +137,6 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
             executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
             (None, Option(taskEnd.taskMetrics))
         }
-      val taskList = executorToTaskInfos.getOrElse(
-        eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-      taskList -= ((taskEnd.taskInfo, None, None))
-      taskList += ((taskEnd.taskInfo, metrics, failureInfo))
-      executorToTaskInfos(eid) = taskList
     }
   }
 }
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index e8bebc66513ab6591b74ca496eb75d820f927ec5..e34af1ab8939873b730f60834194d7efa44fa2e6 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -17,25 +17,18 @@
 
 package spark.ui.jobs
 
-import java.util.Date
-
 import javax.servlet.http.HttpServletRequest
 
-import scala.collection.mutable.HashSet
-import scala.Some
 import scala.xml.{NodeSeq, Node}
 
-import spark.scheduler.cluster.TaskInfo
-import spark.scheduler.Stage
-import spark.storage.StorageLevel
+import spark.scheduler.cluster.SchedulingMode
 import spark.ui.Page._
 import spark.ui.UIUtils._
 import spark.Utils
 
-/** Page showing list of all ongoing and recently finished stages */
+/** Page showing list of all ongoing and recently finished stages and pools*/
 private[spark] class IndexPage(parent: JobProgressUI) {
   def listener = parent.listener
-  val dateFmt = parent.dateFmt
 
   def render(request: HttpServletRequest): Seq[Node] = {
     val activeStages = listener.activeStages.toSeq
@@ -48,29 +41,19 @@ private[spark] class IndexPage(parent: JobProgressUI) {
       activeTime += t.timeRunning(now)
     }
 
-    /** Special table which merges two header cells. */
-    def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
-      <table class="table table-bordered table-striped table-condensed sortable">
-        <thead>
-          <th>Stage Id</th>
-          <th>Origin</th>
-          <th>Submitted</th>
-          <th>Duration</th>
-          <th colspan="2">Tasks: Complete/Total</th>
-          <th>Shuffle Read</th>
-          <th>Shuffle Write</th>
-          <th>Stored RDD</th>
-        </thead>
-        <tbody>
-          {rows.map(r => makeRow(r))}
-        </tbody>
-      </table>
-    }
+    val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
+    val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
+    val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
 
+    val poolTable = new PoolTable(listener.sc.getAllPools, listener)
     val summary: NodeSeq =
      <div>
        <ul class="unstyled">
-          <li>
+         <li>
+           <strong>Duration: </strong>
+           {parent.formatDuration(now - listener.sc.startTime)}
+         </li>
+         <li>
             <strong>CPU time: </strong>
             {parent.formatDuration(listener.totalTime + activeTime)}
           </li>
@@ -86,79 +69,35 @@ private[spark] class IndexPage(parent: JobProgressUI) {
               {Utils.memoryBytesToString(listener.totalShuffleWrite)}
             </li>
          }
+         <li>
+           <a href="#active"><strong>Active Stages:</strong></a>
+           {activeStages.size}
+         </li>
+         <li>
+           <a href="#completed"><strong>Completed Stages:</strong></a>
+           {completedStages.size}
+         </li>
+         <li>
+           <a href="#failed"><strong>Failed Stages:</strong></a>
+           {failedStages.size}
+         </li>
+         <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
        </ul>
      </div>
-    val activeStageTable: NodeSeq = stageTable(stageRow, activeStages)
-    val completedStageTable = stageTable(stageRow, completedStages)
-    val failedStageTable: NodeSeq = stageTable(stageRow, failedStages)
 
-    val content = summary ++
-                  <h4>Active Stages</h4> ++ activeStageTable ++
-                  <h4>Completed Stages</h4>  ++ completedStageTable ++
-                  <h4>Failed Stages</h4>  ++ failedStageTable
+    val content = summary ++ 
+      {if (listener.sc.getSchedulingMode == SchedulingMode.FAIR) {
+         <h4>Pools</h4> ++ poolTable.toNodeSeq
+      } else {
+        Seq()
+      }} ++
+      <h4 id="active">Active Stages : {activeStages.size}</h4> ++
+      activeStagesTable.toNodeSeq++
+      <h4 id="completed">Completed Stages : {completedStages.size}</h4> ++
+      completedStagesTable.toNodeSeq++
+      <h4 id ="failed">Failed Stages : {failedStages.size}</h4> ++
+      failedStagesTable.toNodeSeq
 
     headerSparkPage(content, parent.sc, "Spark Stages", Jobs)
   }
-
-  def getElapsedTime(submitted: Option[Long], completed: Long): String = {
-    submitted match {
-      case Some(t) => parent.formatDuration(completed - t)
-      case _ => "Unknown"
-    }
-  }
-
-  def makeProgressBar(started: Int, completed: Int, total: Int): Seq[Node] = {
-    val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
-    val startWidth = "width: %s%%".format((started.toDouble/total)*100)
-
-    <div class="progress" style="height: 15px; margin-bottom: 0px">
-      <div class="bar" style={completeWidth}></div>
-      <div class="bar bar-info" style={startWidth}></div>
-    </div>
-  }
-
-
-  def stageRow(s: Stage): Seq[Node] = {
-    val submissionTime = s.submissionTime match {
-      case Some(t) => dateFmt.format(new Date(t))
-      case None => "Unknown"
-    }
-
-    val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
-      case 0 => ""
-      case b => Utils.memoryBytesToString(b)
-    }
-    val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
-      case 0 => ""
-      case b => Utils.memoryBytesToString(b)
-    }
-
-    val startedTasks = listener.stageToTasksActive.getOrElse(s.id, HashSet[TaskInfo]()).size
-    val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
-    val totalTasks = s.numPartitions
-
-    <tr>
-      <td>{s.id}</td>
-      <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a></td>
-      <td>{submissionTime}</td>
-      <td>{getElapsedTime(s.submissionTime,
-             s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
-      <td class="progress-cell">{makeProgressBar(startedTasks, completedTasks, totalTasks)}</td>
-      <td style="border-left: 0; text-align: center;">
-        {completedTasks} / {totalTasks}
-        {listener.stageToTasksFailed.getOrElse(s.id, 0) match {
-        case f if f > 0 => "(%s failed)".format(f)
-        case _ =>
-        }}
-      </td>
-      <td>{shuffleRead}</td>
-      <td>{shuffleWrite}</td>
-      <td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) {
-             <a href={"/storage/rdd?id=%s".format(s.rdd.id)}>
-               {Option(s.rdd.name).getOrElse(s.rdd.id)}
-             </a>
-          }}
-      </td>
-    </tr>
-  }
 }
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
new file mode 100644
index 0000000000000000000000000000000000000000..c6103edcb0f180482d581b6343adf8a59497b704
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
@@ -0,0 +1,167 @@
+package spark.ui.jobs
+
+import scala.Seq
+import scala.collection.mutable.{ListBuffer, HashMap, HashSet}
+
+import spark.{ExceptionFailure, SparkContext, Success, Utils}
+import spark.scheduler._
+import spark.scheduler.cluster.TaskInfo
+import spark.executor.TaskMetrics
+import collection.mutable
+
+private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
+  // How many stages to remember
+  val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
+  val DEFAULT_POOL_NAME = "default"
+
+  val stageToPool = new HashMap[Stage, String]()
+  val stageToDescription = new HashMap[Stage, String]()
+  val poolToActiveStages = new HashMap[String, HashSet[Stage]]()
+
+  val activeStages = HashSet[Stage]()
+  val completedStages = ListBuffer[Stage]()
+  val failedStages = ListBuffer[Stage]()
+
+  // Total metrics reflect metrics only for completed tasks
+  var totalTime = 0L
+  var totalShuffleRead = 0L
+  var totalShuffleWrite = 0L
+
+  val stageToTime = HashMap[Int, Long]()
+  val stageToShuffleRead = HashMap[Int, Long]()
+  val stageToShuffleWrite = HashMap[Int, Long]()
+  val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
+  val stageToTasksComplete = HashMap[Int, Int]()
+  val stageToTasksFailed = HashMap[Int, Int]()
+  val stageToTaskInfos =
+    HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+
+  override def onJobStart(jobStart: SparkListenerJobStart) {}
+
+  override def onStageCompleted(stageCompleted: StageCompleted) = {
+    val stage = stageCompleted.stageInfo.stage
+    poolToActiveStages(stageToPool(stage)) -= stage
+    activeStages -= stage
+    completedStages += stage
+    trimIfNecessary(completedStages)
+  }
+
+  /** If stages is too large, remove and garbage collect old stages */
+  def trimIfNecessary(stages: ListBuffer[Stage]) {
+    if (stages.size > RETAINED_STAGES) {
+      val toRemove = RETAINED_STAGES / 10
+      stages.takeRight(toRemove).foreach( s => {
+        stageToTaskInfos.remove(s.id)
+        stageToTime.remove(s.id)
+        stageToShuffleRead.remove(s.id)
+        stageToShuffleWrite.remove(s.id)
+        stageToTasksActive.remove(s.id)
+        stageToTasksComplete.remove(s.id)
+        stageToTasksFailed.remove(s.id)
+        stageToPool.remove(s)
+        if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
+      })
+      stages.trimEnd(toRemove)
+    }
+  }
+
+  /** For FIFO, all stages are contained by "default" pool but "default" pool here is meaningless */
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
+    val stage = stageSubmitted.stage
+    activeStages += stage
+
+    val poolName = Option(stageSubmitted.properties).map {
+      p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
+    }.getOrElse(DEFAULT_POOL_NAME)
+    stageToPool(stage) = poolName
+
+    val description = Option(stageSubmitted.properties).flatMap {
+      p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
+    }
+    description.map(d => stageToDescription(stage) = d)
+
+    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
+    stages += stage
+  }
+  
+  override def onTaskStart(taskStart: SparkListenerTaskStart) {
+    val sid = taskStart.task.stageId
+    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+    tasksActive += taskStart.taskInfo
+    val taskList = stageToTaskInfos.getOrElse(
+      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+    taskList += ((taskStart.taskInfo, None, None))
+    stageToTaskInfos(sid) = taskList
+  }
+ 
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+    val sid = taskEnd.task.stageId
+    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+    tasksActive -= taskEnd.taskInfo
+    val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
+      taskEnd.reason match {
+        case e: ExceptionFailure =>
+          stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
+          (Some(e), e.metrics)
+        case _ =>
+          stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
+          (None, Option(taskEnd.taskMetrics))
+      }
+
+    stageToTime.getOrElseUpdate(sid, 0L)
+    val time = metrics.map(m => m.executorRunTime).getOrElse(0)
+    stageToTime(sid) += time
+    totalTime += time
+
+    stageToShuffleRead.getOrElseUpdate(sid, 0L)
+    val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
+      s.remoteBytesRead).getOrElse(0L)
+    stageToShuffleRead(sid) += shuffleRead
+    totalShuffleRead += shuffleRead
+
+    stageToShuffleWrite.getOrElseUpdate(sid, 0L)
+    val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
+      s.shuffleBytesWritten).getOrElse(0L)
+    stageToShuffleWrite(sid) += shuffleWrite
+    totalShuffleWrite += shuffleWrite
+
+    val taskList = stageToTaskInfos.getOrElse(
+      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+    taskList -= ((taskEnd.taskInfo, None, None))
+    taskList += ((taskEnd.taskInfo, metrics, failureInfo))
+    stageToTaskInfos(sid) = taskList
+  }
+
+  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+    jobEnd match {
+      case end: SparkListenerJobEnd =>
+        end.jobResult match {
+          case JobFailed(ex, Some(stage)) =>
+            activeStages -= stage
+            poolToActiveStages(stageToPool(stage)) -= stage
+            failedStages += stage
+            trimIfNecessary(failedStages)
+          case _ =>
+        }
+      case _ =>
+    }
+  }
+
+  /** Is this stage's input from a shuffle read. */
+  def hasShuffleRead(stageID: Int): Boolean = {
+    // This is written in a slightly complicated way to avoid having to scan all tasks
+    for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
+      if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
+    }
+    return false // No tasks have finished for this stage
+  }
+
+  /** Is this stage's output to a shuffle write. */
+  def hasShuffleWrite(stageID: Int): Boolean = {
+    // This is written in a slightly complicated way to avoid having to scan all tasks
+    for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
+      if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
+    }
+    return false // No tasks have finished for this stage
+  }
+}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index 09d24b6302047746d4bdf5a1339a91033d2d5718..c83f102ff32ff024dc1c776741209176bdeaf1f3 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -31,9 +31,9 @@ import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
 import spark.ui.JettyUtils._
 import spark.{ExceptionFailure, SparkContext, Success, Utils}
 import spark.scheduler._
-import spark.scheduler.cluster.TaskInfo
-import spark.executor.TaskMetrics
 import collection.mutable
+import spark.scheduler.cluster.SchedulingMode
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
 
 /** Web UI showing progress status of all jobs in the given SparkContext. */
 private[spark] class JobProgressUI(val sc: SparkContext) {
@@ -43,9 +43,10 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
 
   private val indexPage = new IndexPage(this)
   private val stagePage = new StagePage(this)
+  private val poolPage = new PoolPage(this)
 
   def start() {
-    _listener = Some(new JobProgressListener)
+    _listener = Some(new JobProgressListener(sc))
     sc.addSparkListener(listener)
   }
 
@@ -53,120 +54,7 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
 
   def getHandlers = Seq[(String, Handler)](
     ("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)),
+    ("/stages/pool", (request: HttpServletRequest) => poolPage.render(request)),
     ("/stages", (request: HttpServletRequest) => indexPage.render(request))
   )
 }
-
-private[spark] class JobProgressListener extends SparkListener {
-  // How many stages to remember
-  val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
-
-  val activeStages = HashSet[Stage]()
-  val completedStages = ListBuffer[Stage]()
-  val failedStages = ListBuffer[Stage]()
-
-  // Total metrics reflect metrics only for completed tasks
-  var totalTime = 0L
-  var totalShuffleRead = 0L
-  var totalShuffleWrite = 0L
-
-  val stageToTime = HashMap[Int, Long]()
-  val stageToShuffleRead = HashMap[Int, Long]()
-  val stageToShuffleWrite = HashMap[Int, Long]()
-  val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
-  val stageToTasksComplete = HashMap[Int, Int]()
-  val stageToTasksFailed = HashMap[Int, Int]()
-  val stageToTaskInfos =
-    HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
-
-  override def onJobStart(jobStart: SparkListenerJobStart) {}
-
-  override def onStageCompleted(stageCompleted: StageCompleted) = {
-    val stage = stageCompleted.stageInfo.stage
-    activeStages -= stage
-    completedStages += stage
-    trimIfNecessary(completedStages)
-  }
-
-  /** If stages is too large, remove and garbage collect old stages */
-  def trimIfNecessary(stages: ListBuffer[Stage]) {
-    if (stages.size > RETAINED_STAGES) {
-      val toRemove = RETAINED_STAGES / 10
-      stages.takeRight(toRemove).foreach( s => {
-        stageToTaskInfos.remove(s.id)
-        stageToTime.remove(s.id)
-        stageToShuffleRead.remove(s.id)
-        stageToShuffleWrite.remove(s.id)
-        stageToTasksActive.remove(s.id)
-        stageToTasksComplete.remove(s.id)
-        stageToTasksFailed.remove(s.id)
-      })
-      stages.trimEnd(toRemove)
-    }
-  }
-
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
-    activeStages += stageSubmitted.stage
-
-  override def onTaskStart(taskStart: SparkListenerTaskStart) {
-    val sid = taskStart.task.stageId
-    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
-    tasksActive += taskStart.taskInfo
-    val taskList = stageToTaskInfos.getOrElse(
-      sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-    taskList += ((taskStart.taskInfo, None, None))
-    stageToTaskInfos(sid) = taskList
-  }
-
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
-    val sid = taskEnd.task.stageId
-    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
-    tasksActive -= taskEnd.taskInfo
-    val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
-      taskEnd.reason match {
-        case e: ExceptionFailure =>
-          stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
-          (Some(e), e.metrics)
-        case _ =>
-          stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
-          (None, Option(taskEnd.taskMetrics))
-      }
-
-    stageToTime.getOrElseUpdate(sid, 0L)
-    val time = metrics.map(m => m.executorRunTime).getOrElse(0)
-    stageToTime(sid) += time
-    totalTime += time
-
-    stageToShuffleRead.getOrElseUpdate(sid, 0L)
-    val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
-      s.remoteBytesRead).getOrElse(0L)
-    stageToShuffleRead(sid) += shuffleRead
-    totalShuffleRead += shuffleRead
-
-    stageToShuffleWrite.getOrElseUpdate(sid, 0L)
-    val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
-      s.shuffleBytesWritten).getOrElse(0L)
-    stageToShuffleWrite(sid) += shuffleWrite
-    totalShuffleWrite += shuffleWrite
-
-    val taskList = stageToTaskInfos.getOrElse(
-      sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-    taskList -= ((taskEnd.taskInfo, None, None))
-    taskList += ((taskEnd.taskInfo, metrics, failureInfo))
-    stageToTaskInfos(sid) = taskList
-  }
-
-  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
-    jobEnd match {
-      case end: SparkListenerJobEnd =>
-        end.jobResult match {
-          case JobFailed(ex, Some(stage)) =>
-            activeStages -= stage
-            failedStages += stage
-            trimIfNecessary(failedStages)
-          case _ =>
-        }
-      case _ =>
-    }
-  }
-}
diff --git a/core/src/main/scala/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
new file mode 100644
index 0000000000000000000000000000000000000000..647c6d2ae3edb836e75f806073d1141c02dee0ea
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
@@ -0,0 +1,30 @@
+package spark.ui.jobs
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{NodeSeq, Node}
+import scala.collection.mutable.HashSet
+
+import spark.scheduler.Stage
+import spark.ui.UIUtils._
+import spark.ui.Page._
+
+/** Page showing specific pool details */
+private[spark] class PoolPage(parent: JobProgressUI) {
+  def listener = parent.listener
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val poolName = request.getParameter("poolname")
+    val poolToActiveStages = listener.poolToActiveStages
+    val activeStages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]).toSeq
+    val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
+
+    val pool = listener.sc.getPoolForName(poolName).get
+    val poolTable = new PoolTable(Seq(pool), listener)
+
+    val content = <h3>Pool </h3> ++ poolTable.toNodeSeq() ++
+                  <h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq()
+
+    headerSparkPage(content, parent.sc, "Spark Pool Details", Jobs)
+  }
+}
diff --git a/core/src/main/scala/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
new file mode 100644
index 0000000000000000000000000000000000000000..9cfe0d68f0eb6f921e5b70d2c92be07023ec6594
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
@@ -0,0 +1,49 @@
+package spark.ui.jobs
+
+import scala.xml.Node
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+
+import spark.scheduler.Stage
+import spark.scheduler.cluster.Schedulable
+
+/** Table showing list of pools */
+private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressListener) {
+
+  var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages
+
+  def toNodeSeq(): Seq[Node] = {
+    poolTable(poolRow, pools)
+  }
+
+  // pool tables
+  def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node],
+    rows: Seq[Schedulable]
+    ): Seq[Node] = {
+    <table class="table table-bordered table-striped table-condensed sortable">
+      <thead>
+        <th>Pool Name</th>
+        <th>Minimum Share</th>
+        <th>Pool Weight</th>
+        <td>Active Stages</td>
+        <td>Running Tasks</td>
+        <td>SchedulingMode</td>
+      </thead>
+      <tbody>
+        {rows.map(r => makeRow(r, poolToActiveStages))}
+      </tbody>
+    </table>
+  }
+
+  def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]]): Seq[Node] = {
+    <tr>
+      <td><a href={"/stages/pool?poolname=%s".format(p.name)}>{p.name}</a></td>
+      <td>{p.minShare}</td>
+      <td>{p.weight}</td>
+      <td>{poolToActiveStages.getOrElseUpdate(p.name, new HashSet[Stage]()).size}</td>
+      <td>{p.runningTasks}</td>
+      <td>{p.schedulingMode}</td>
+    </tr>
+  }
+}
+
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index b2bbbd9eb531a99060cc12e92634c0929695f8f5..02f9adf8a8fea7fcd3c7db6908c5b3e41a774a6b 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -48,7 +48,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
       return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
     }
 
-    val tasks = listener.stageToTaskInfos(stageId)
+    val tasks = listener.stageToTaskInfos(stageId).toSeq
 
     val shuffleRead = listener.stageToShuffleRead(stageId) > 0
     val shuffleWrite = listener.stageToShuffleWrite(stageId) > 0
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
new file mode 100644
index 0000000000000000000000000000000000000000..1df0e0913c6755cee8ecc5d5cf7b146a65fd8d7c
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala
@@ -0,0 +1,116 @@
+package spark.ui.jobs
+
+import java.util.Date
+import java.text.SimpleDateFormat
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.Some
+import scala.xml.{NodeSeq, Node}
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+
+import spark.scheduler.cluster.{SchedulingMode, TaskInfo}
+import spark.scheduler.Stage
+import spark.ui.UIUtils._
+import spark.ui.Page._
+import spark.Utils
+import spark.storage.StorageLevel
+
+/** Page showing list of all ongoing and recently finished stages */
+private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressUI) {
+
+  val listener = parent.listener
+  val dateFmt = parent.dateFmt
+  val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
+
+  def toNodeSeq(): Seq[Node] = {
+    stageTable(stageRow, stages)
+  }
+
+  /** Special table which merges two header cells. */
+  def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
+    <table class="table table-bordered table-striped table-condensed sortable">
+      <thead>
+        <th>Stage Id</th>
+        {if (isFairScheduler) {<th>Pool Name</th>} else {}}
+        <th>Description</th>
+        <th>Submitted</th>
+        <td>Duration</td>
+        <td>Tasks: Succeeded/Total</td>
+        <td>Shuffle Read</td>
+        <td>Shuffle Write</td>
+      </thead>
+      <tbody>
+        {rows.map(r => makeRow(r))}
+      </tbody>
+    </table>
+  }
+
+  def getElapsedTime(submitted: Option[Long], completed: Long): String = {
+    submitted match {
+      case Some(t) => parent.formatDuration(completed - t)
+      case _ => "Unknown"
+    }
+  }
+
+  def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = {
+    val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
+    val startWidth = "width: %s%%".format((started.toDouble/total)*100)
+
+    <div class="progress" style="height: 15px; margin-bottom: 0px; position: relative">
+      <span style="text-align:center; position:absolute; width:100%;">
+        {completed}/{total} {failed}
+      </span>
+      <div class="bar bar-completed" style={completeWidth}></div>
+      <div class="bar bar-running" style={startWidth}></div>
+    </div>
+  }
+
+
+  def stageRow(s: Stage): Seq[Node] = {
+    val submissionTime = s.submissionTime match {
+      case Some(t) => dateFmt.format(new Date(t))
+      case None => "Unknown"
+    }
+
+    val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
+      case 0 => ""
+      case b => Utils.memoryBytesToString(b)
+    }
+    val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
+      case 0 => ""
+      case b => Utils.memoryBytesToString(b)
+    }
+
+    val startedTasks = listener.stageToTasksActive.getOrElse(s.id, HashSet[TaskInfo]()).size
+    val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
+    val failedTasks = listener.stageToTasksFailed.getOrElse(s.id, 0) match {
+        case f if f > 0 => "(%s failed)".format(f)
+        case _ => ""
+    }
+    val totalTasks = s.numPartitions
+
+    val poolName = listener.stageToPool.get(s)
+
+    val nameLink = <a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a>
+    val description = listener.stageToDescription.get(s)
+      .map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
+
+    <tr>
+      <td>{s.id}</td>
+      {if (isFairScheduler) {
+        <td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>}
+      }
+      <td>{description}</td>
+      <td valign="middle">{submissionTime}</td>
+      <td>{getElapsedTime(s.submissionTime,
+             s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
+      <td class="progress-cell">
+        {makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
+      </td>
+      <td>{shuffleRead}</td>
+      <td>{shuffleWrite}</td>
+    </tr>
+  }
+}
diff --git a/core/src/main/scala/spark/ui/storage/RDDPage.scala b/core/src/main/scala/spark/ui/storage/RDDPage.scala
index 7f4ac830eb58dac753eff1d2cb0c8b8407af6321..4c3ee12c987a0f0c0594b48601af60d980aaebb8 100644
--- a/core/src/main/scala/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/spark/ui/storage/RDDPage.scala
@@ -83,18 +83,19 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
       <hr/>
       <div class="row">
         <div class="span12">
+          <h3> Data Distribution Summary </h3>
           {workerTable}
         </div>
       </div>
       <hr/>
       <div class="row">
         <div class="span12">
-          <h4> RDD Summary </h4>
+          <h4> Partitions </h4>
           {blockTable}
         </div>
       </div>;
 
-    headerSparkPage(content, parent.sc, "RDD Info: " + rddInfo.name, Jobs)
+    headerSparkPage(content, parent.sc, "RDD Info: " + rddInfo.name, Storage)
   }
 
   def blockRow(row: (String, BlockStatus, Seq[String])): Seq[Node] = {
diff --git a/core/src/test/scala/spark/KryoSerializerSuite.scala b/core/src/test/scala/spark/KryoSerializerSuite.scala
index 30d2d5282bc84bf887d25c5233020be9e7d06499..01390027c8de1bb5cfd5f1d8f8cc3f4a4c2a7d55 100644
--- a/core/src/test/scala/spark/KryoSerializerSuite.scala
+++ b/core/src/test/scala/spark/KryoSerializerSuite.scala
@@ -22,7 +22,9 @@ import scala.collection.mutable
 import org.scalatest.FunSuite
 import com.esotericsoftware.kryo._
 
-class KryoSerializerSuite extends FunSuite {
+import KryoTest._
+
+class KryoSerializerSuite extends FunSuite with SharedSparkContext {
   test("basic types") {
     val ser = (new KryoSerializer).newInstance()
     def check[T](t: T) {
@@ -124,6 +126,45 @@ class KryoSerializerSuite extends FunSuite {
 
     System.clearProperty("spark.kryo.registrator")
   }
+
+  test("kryo with collect") {
+    val control = 1 :: 2 :: Nil
+    val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_)).collect().map(_.x)
+    assert(control === result.toSeq)
+  }
+
+  test("kryo with parallelize") {
+    val control = 1 :: 2 :: Nil
+    val result = sc.parallelize(control.map(new ClassWithoutNoArgConstructor(_))).map(_.x).collect()
+    assert (control === result.toSeq)
+  }
+
+  test("kryo with reduce") {
+    val control = 1 :: 2 :: Nil
+    val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))
+        .reduce((t1, t2) => new ClassWithoutNoArgConstructor(t1.x + t2.x)).x
+    assert(control.sum === result)
+  }
+
+  // TODO: this still doesn't work
+  ignore("kryo with fold") {
+    val control = 1 :: 2 :: Nil
+    val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))
+        .fold(new ClassWithoutNoArgConstructor(10))((t1, t2) => new ClassWithoutNoArgConstructor(t1.x + t2.x)).x
+    assert(10 + control.sum === result)
+  }
+
+  override def beforeAll() {
+    System.setProperty("spark.serializer", "spark.KryoSerializer")
+    System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
+    super.beforeAll()
+  }
+
+  override def afterAll() {
+    super.afterAll()
+    System.clearProperty("spark.kryo.registrator")
+    System.clearProperty("spark.serializer")
+  }
 }
 
 object KryoTest {
diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
index a8b88d7936e4f5413d540c5c56c0fa094286aaff..caaf3209fd660f7f88b2b1eace053732d4e8bb51 100644
--- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
@@ -32,6 +32,10 @@ import spark.{Dependency, ShuffleDependency, OneToOneDependency}
 import spark.{FetchFailed, Success, TaskEndReason}
 import spark.storage.{BlockManagerId, BlockManagerMaster}
 
+import spark.scheduler.cluster.Pool
+import spark.scheduler.cluster.SchedulingMode
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
+
 /**
  * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
  * rather than spawning an event loop thread as happens in the real code. They use EasyMock
@@ -49,6 +53,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
   /** Set of TaskSets the DAGScheduler has requested executed. */
   val taskSets = scala.collection.mutable.Buffer[TaskSet]()
   val taskScheduler = new TaskScheduler() {
+    override def rootPool: Pool = null
+    override def schedulingMode: SchedulingMode = SchedulingMode.NONE
     override def start() = {}
     override def stop() = {}
     override def submitTasks(taskSet: TaskSet) = {
diff --git a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
index 0f855c38da4f49f7abc3e5e119cc9f1bdf09afd3..bb9e715f95e217bd2c2b7c5f7a3bcbe90c782ae9 100644
--- a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
@@ -57,7 +57,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
     val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID, None)
     val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID, None)
     
-    joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4))
+    joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4, null))
     joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
     parentRdd.setName("MyRDD")
     joblogger.getRddNameTest(parentRdd) should be ("MyRDD")
diff --git a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
index 14bb58731b7449a06a1d4a56c7099e016269090e..66fd59e8bbee62853c13dc77e2ba2c0546307eae 100644
--- a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
@@ -73,7 +73,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
     TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
     new Thread {
       if (poolName != null) {
-        sc.addLocalProperties("spark.scheduler.cluster.fair.pool",poolName)
+        sc.addLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
       }
       override def run() {
         val ans = nums.map(number => {
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 3986c0c79dceaecb0da46adbf07241b64229f650..7463844a4e36aba77ff99f63fbdabbebe32d2df0 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -43,7 +43,7 @@ Finally, the following configuration options can be passed to the master and wor
   </tr>
   <tr>
     <td><code>-p PORT</code>, <code>--port PORT</code></td>
-    <td>IP address or DNS name to listen on (default: 7077 for master, random for worker)</td>
+    <td>Port for service to listen on (default: 7077 for master, random for worker)</td>
   </tr>
   <tr>
     <td><code>--webui-port PORT</code></td>
diff --git a/examples/src/main/java/spark/examples/JavaPageRank.java b/examples/src/main/java/spark/examples/JavaPageRank.java
new file mode 100644
index 0000000000000000000000000000000000000000..9d90ef91748bc9bed3109d5d49e92f467f571858
--- /dev/null
+++ b/examples/src/main/java/spark/examples/JavaPageRank.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.examples;
+
+import scala.Tuple2;
+import spark.api.java.JavaPairRDD;
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.FlatMapFunction;
+import spark.api.java.function.Function;
+import spark.api.java.function.PairFlatMapFunction;
+import spark.api.java.function.PairFunction;
+
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * Computes the PageRank of URLs from an input file. Input file should
+ * be in format of:
+ * URL         neighbor URL
+ * URL         neighbor URL
+ * URL         neighbor URL
+ * ...
+ * where URL and their neighbors are separated by space(s).
+ */
+public class JavaPageRank {
+  private static double sum(List<Double> numbers) {
+    double out = 0.0;
+    for (double number : numbers) {
+      out += number;
+    }
+    return out;
+  }
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 3) {
+      System.err.println("Usage: JavaPageRank <master> <file> <number_of_iterations>");
+      System.exit(1);
+    }
+
+    JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank",
+      System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+
+    // Loads in input file. It should be in format of:
+    //     URL         neighbor URL
+    //     URL         neighbor URL
+    //     URL         neighbor URL
+    //     ...
+    JavaRDD<String> lines = ctx.textFile(args[1], 1);
+
+    // Loads all URLs from input file and initialize their neighbors.
+    JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() {
+      @Override
+      public Tuple2<String, String> call(String s) {
+        String[] parts = s.split("\\s+");
+        return new Tuple2<String, String>(parts[0], parts[1]);
+      }
+    }).distinct().groupByKey().cache();
+
+    // Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
+    JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() {
+      @Override
+      public Double call(List<String> rs) throws Exception {
+        return 1.0;
+      }
+    });
+
+    // Calculates and updates URL ranks continuously using PageRank algorithm.
+    for (int current = 0; current < Integer.parseInt(args[2]); current++) {
+      // Calculates URL contributions to the rank of other URLs.
+      JavaPairRDD<String, Double> contribs = links.join(ranks).values()
+        .flatMap(new PairFlatMapFunction<Tuple2<List<String>, Double>, String, Double>() {
+          @Override
+          public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) {
+            List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
+            for (String n : s._1) {
+              results.add(new Tuple2<String, Double>(n, s._2 / s._1.size()));
+            }
+
+            return results;
+          }
+      });
+
+      // Re-calculates URL ranks based on neighbor contributions.
+      ranks = contribs.groupByKey().mapValues(new Function<List<Double>, Double>() {
+        @Override
+        public Double call(List<Double> cs) throws Exception {
+          return 0.15 + sum(cs) * 0.85;
+        }
+      });
+    }
+
+    // Collects all URL ranks and dump them to console.
+    List<Tuple2<String, Double>> output = ranks.collect();
+    for (Tuple2 tuple : output) {
+        System.out.println(tuple._1 + " has rank: " + tuple._2 + ".");
+    }
+
+    System.exit(0);
+  }
+}
diff --git a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
index 7281b2fcb9196dd7056e72bacee15e0f984b06f6..6ecf0151a1d2f7ba6cc3093bc3a14ab7ec65e8b4 100644
--- a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
@@ -418,6 +418,7 @@ object ALS {
     System.setProperty("spark.serializer", "spark.KryoSerializer")
     System.setProperty("spark.kryo.registrator", classOf[ALSRegistrator].getName)
     System.setProperty("spark.kryo.referenceTracking", "false")
+    System.setProperty("spark.kryoserializer.buffer.mb", "8")
     System.setProperty("spark.locality.wait", "10000")
     val sc = new SparkContext(master, "ALS")
     val ratings = sc.textFile(ratingsFile).map { line =>
diff --git a/mllib/src/main/scala/spark/mllib/util/MFDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/MFDataGenerator.scala
new file mode 100644
index 0000000000000000000000000000000000000000..88992cde0cf8ed81a488673b74467e90d535ebd7
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/util/MFDataGenerator.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.recommendation
+
+import scala.util.Random
+
+import org.jblas.DoubleMatrix
+
+import spark.{RDD, SparkContext}
+import spark.mllib.util.MLUtils
+
+/**
+* Generate RDD(s) containing data for Matrix Factorization.
+*
+* This method samples training entries according to the oversampling factor
+* 'trainSampFact', which is a multiplicative factor of the number of
+* degrees of freedom of the matrix: rank*(m+n-rank).
+* 
+* It optionally samples entries for a testing matrix using 
+* 'testSampFact', the percentage of the number of training entries 
+* to use for testing.
+*
+* This method takes the following inputs:
+*   sparkMaster    (String) The master URL.
+*   outputPath     (String) Directory to save output.
+*   m              (Int) Number of rows in data matrix.
+*   n              (Int) Number of columns in data matrix.
+*   rank           (Int) Underlying rank of data matrix.
+*   trainSampFact  (Double) Oversampling factor.
+*   noise          (Boolean) Whether to add gaussian noise to training data.
+*   sigma          (Double) Standard deviation of added gaussian noise.
+*   test           (Boolean) Whether to create testing RDD.
+*   testSampFact   (Double) Percentage of training data to use as test data.
+*/
+
+object MFDataGenerator{
+
+  def main(args: Array[String]) {
+    if (args.length < 2) {
+      println("Usage: MFDataGenerator " +
+        "<master> <outputDir> [m] [n] [rank] [trainSampFact] [noise] [sigma] [test] [testSampFact]")
+      System.exit(1)
+    }
+
+    val sparkMaster: String = args(0)
+    val outputPath: String = args(1)
+    val m: Int = if (args.length > 2) args(2).toInt else 100
+    val n: Int = if (args.length > 3) args(3).toInt else 100
+    val rank: Int = if (args.length > 4) args(4).toInt else 10
+    val trainSampFact: Double = if (args.length > 5) args(5).toDouble else 1.0
+    val noise: Boolean = if (args.length > 6) args(6).toBoolean else false
+    val sigma: Double = if (args.length > 7) args(7).toDouble else 0.1
+    val test: Boolean = if (args.length > 8) args(8).toBoolean else false
+    val testSampFact: Double = if (args.length > 9) args(9).toDouble else 0.1
+
+    val sc = new SparkContext(sparkMaster, "MFDataGenerator")
+
+    val A = DoubleMatrix.randn(m, rank)
+    val B = DoubleMatrix.randn(rank, n)
+    val z = 1 / (scala.math.sqrt(scala.math.sqrt(rank)))
+    A.mmuli(z)
+    B.mmuli(z)
+    val fullData = A.mmul(B)
+
+    val df = rank * (m + n - rank)
+    val sampSize = scala.math.min(scala.math.round(trainSampFact * df),
+      scala.math.round(.99 * m * n)).toInt
+    val rand = new Random()
+    val mn = m * n
+    val shuffled = rand.shuffle(1 to mn toIterable)
+
+    val omega = shuffled.slice(0, sampSize)
+    val ordered = omega.sortWith(_ < _).toArray
+    val trainData: RDD[(Int, Int, Double)] = sc.parallelize(ordered)
+      .map(x => (fullData.indexRows(x - 1), fullData.indexColumns(x - 1), fullData.get(x - 1)))
+
+    // optionally add gaussian noise
+    if (noise) { 
+      trainData.map(x => (x._1, x._2, x._3 + rand.nextGaussian * sigma))
+    }
+
+    trainData.map(x => x._1 + "," + x._2 + "," + x._3).saveAsTextFile(outputPath)
+
+    // optionally generate testing data
+    if (test) {
+      val testSampSize = scala.math
+        .min(scala.math.round(sampSize * testSampFact),scala.math.round(mn - sampSize)).toInt
+      val testOmega = shuffled.slice(sampSize, sampSize + testSampSize)
+      val testOrdered = testOmega.sortWith(_ < _).toArray
+      val testData: RDD[(Int, Int, Double)] = sc.parallelize(testOrdered)
+        .map(x => (fullData.indexRows(x - 1), fullData.indexColumns(x - 1), fullData.get(x - 1)))
+      testData.map(x => x._1 + "," + x._2 + "," + x._3).saveAsTextFile(outputPath)
+    }
+        
+    sc.stop()
+  
+  }
+}
\ No newline at end of file
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index bb96ad4ae33ed0624dfd4cc593407f01c80d797f..c822f49e786c700ed92fd820eed3411017eb90f9 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -178,7 +178,7 @@ object SparkBuild extends Build {
       "it.unimi.dsi" % "fastutil" % "6.4.4",
       "colt" % "colt" % "1.2.0",
       "net.liftweb" % "lift-json_2.9.2" % "2.5",
-      "org.apache.mesos" % "mesos" % "0.12.0-incubating",
+      "org.apache.mesos" % "mesos" % "0.9.0-incubating",
       "io.netty" % "netty-all" % "4.0.0.Beta2",
       "org.apache.derby" % "derby" % "10.4.2.0" % "test",
       "com.codahale.metrics" % "metrics-core" % "3.0.0",
diff --git a/python/examples/logistic_regression.py b/python/examples/logistic_regression.py
index 3ac1bae4e9ad3d9852b7e7f13dbe57414a1bf4e6..1117dea5380e764a82174e21ca1b0bfbf9b9b974 100755
--- a/python/examples/logistic_regression.py
+++ b/python/examples/logistic_regression.py
@@ -16,7 +16,8 @@
 #
 
 """
-This example requires numpy (http://www.numpy.org/)
+A logistic regression implementation that uses NumPy (http://www.numpy.org) to act on batches
+of input data using efficient matrix operations.
 """
 from collections import namedtuple
 from math import exp
@@ -27,47 +28,45 @@ import numpy as np
 from pyspark import SparkContext
 
 
-N = 100000  # Number of data points
 D = 10  # Number of dimensions
-R = 0.7   # Scaling factor
-ITERATIONS = 5
-np.random.seed(42)
 
 
-DataPoint = namedtuple("DataPoint", ['x', 'y'])
-from logistic_regression import DataPoint  # So that DataPoint is properly serialized
-
-
-def generateData():
-    def generatePoint(i):
-        y = -1 if i % 2 == 0 else 1
-        x = np.random.normal(size=D) + (y * R)
-        return DataPoint(x, y)
-    return [generatePoint(i) for i in range(N)]
-
+# Read a batch of points from the input file into a NumPy matrix object. We operate on batches to
+# make further computations faster.
+# The data file contains lines of the form <label> <x1> <x2> ... <xD>. We load each block of these
+# into a NumPy array of size numLines * (D + 1) and pull out column 0 vs the others in gradient().
+def readPointBatch(iterator):
+    strs = list(iterator)
+    matrix = np.zeros((len(strs), D + 1))
+    for i in xrange(len(strs)):
+        matrix[i] = np.fromstring(strs[i].replace(',', ' '), dtype=np.float32, sep=' ')
+    return [matrix]
 
 if __name__ == "__main__":
-    if len(sys.argv) == 1:
-        print >> sys.stderr, "Usage: logistic_regression <master> [<slices>]"
+    if len(sys.argv) != 4:
+        print >> sys.stderr, "Usage: logistic_regression <master> <file> <iters>"
         exit(-1)
     sc = SparkContext(sys.argv[1], "PythonLR", pyFiles=[realpath(__file__)])
-    slices = int(sys.argv[2]) if len(sys.argv) > 2 else 2
-    points = sc.parallelize(generateData(), slices).cache()
+    points = sc.textFile(sys.argv[2]).mapPartitions(readPointBatch).cache()
+    iterations = int(sys.argv[3])
 
     # Initialize w to a random value
     w = 2 * np.random.ranf(size=D) - 1
     print "Initial w: " + str(w)
 
+    # Compute logistic regression gradient for a matrix of data points
+    def gradient(matrix, w):
+        Y = matrix[:,0]    # point labels (first column of input file)
+        X = matrix[:,1:]   # point coordinates
+        # For each point (x, y), compute gradient function, then sum these up
+        return ((1.0 / (1.0 + np.exp(-Y * X.dot(w))) - 1.0) * Y * X.T).sum(1)
+
     def add(x, y):
         x += y
         return x
 
-    for i in range(1, ITERATIONS + 1):
-        print "On iteration %i" % i
-
-        gradient = points.map(lambda p:
-            (1.0 / (1.0 + exp(-p.y * np.dot(w, p.x)))) * p.y * p.x
-        ).reduce(add)
-        w -= gradient
+    for i in range(iterations):
+        print "On iteration %i" % (i + 1)
+        w -= points.map(lambda m: gradient(m, w)).reduce(add)
 
     print "Final w: " + str(w)