diff --git a/core/src/main/java/spark/network/netty/FileServer.java b/core/src/main/java/spark/network/netty/FileServer.java
index dd3f12561cb655cfb14b822a0d61376550276196..dd3a557ae59522187f6e15f68d352f5e19c5c900 100644
--- a/core/src/main/java/spark/network/netty/FileServer.java
+++ b/core/src/main/java/spark/network/netty/FileServer.java
@@ -37,29 +37,33 @@ class FileServer {
         .childHandler(new FileServerChannelInitializer(pResolver));
     // Start the server.
     channelFuture = bootstrap.bind(addr);
-    this.port = addr.getPort();
+    try {
+      // Get the address we bound to.
+      InetSocketAddress boundAddress =
+        ((InetSocketAddress) channelFuture.sync().channel().localAddress());
+      this.port = boundAddress.getPort();
+    } catch (InterruptedException ie) {
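+      // Fall back to port 0 so callers can tell that binding did not complete.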
+      this.port = 0;
+    }
   }
 
   /**
    * Start the file server asynchronously in a new thread.
    */
   public void start() {
-    try {
-      blockingThread = new Thread() {
-        public void run() {
-          try {
-            Channel channel = channelFuture.sync().channel();
-            channel.closeFuture().sync();
-          } catch (InterruptedException e) {
-            LOG.error("File server start got interrupted", e);
-          }
+    blockingThread = new Thread() {
+      public void run() {
+        try {
+          channelFuture.channel().closeFuture().sync();
+          LOG.info("FileServer exiting");
+        } catch (InterruptedException e) {
+          LOG.error("File server start got interrupted", e);
         }
-      };
-      blockingThread.setDaemon(true);
-      blockingThread.start();
-    } finally {
-      bootstrap.shutdown();
-    }
+        // NOTE: bootstrap is shut down in stop()
+      }
+    };
+    blockingThread.setDaemon(true);
+    blockingThread.start();
   }
 
   public int getPort() {
@@ -67,17 +71,16 @@ class FileServer {
   }
 
   public void stop() {
-    if (blockingThread != null) {
-      blockingThread.stop();
-      blockingThread = null;
-    }
+    // Close the bound channel.
     if (channelFuture != null) {
-      channelFuture.channel().closeFuture();
+      channelFuture.channel().close();
       channelFuture = null;
     }
+    // Shutdown bootstrap.
     if (bootstrap != null) {
       bootstrap.shutdown();
       bootstrap = null;
     }
+    // TODO: Shut down all accepted channels as well?
   }
 }
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 2b0e697337610f8fee2670262b6274bc1834afc9..fa4bbfc76f3062f0b6384f020c0e6a4f4dbcc03b 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -10,6 +10,8 @@ import scala.collection.JavaConversions._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.mapred.FileOutputCommitter
 import org.apache.hadoop.mapred.FileOutputFormat
 import org.apache.hadoop.mapred.HadoopWriter
@@ -17,7 +19,7 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapred.OutputFormat
 
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil, TaskAttemptID, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil}
 
 import spark.partial.BoundedDouble
 import spark.partial.PartialResult
@@ -185,11 +187,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
    * partitioning of the resulting key-value pair RDD by passing a Partitioner.
    */
   def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
+    // groupByKey should not use map-side combine, because map-side combine does not
+    // reduce the amount of data shuffled and requires all map-side data to be inserted
+    // into a hash table, leading to more objects in the old generation.
     def createCombiner(v: V) = ArrayBuffer(v)
     def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
-    def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
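+    // Passing null for mergeCombiners below is safe because combiners are never merged
+    // when map-side combine is disabled.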
     val bufs = combineByKey[ArrayBuffer[V]](
-      createCombiner _, mergeValue _, mergeCombiners _, partitioner)
+      createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
     bufs.asInstanceOf[RDD[(K, Seq[V])]]
   }
 
@@ -515,6 +519,16 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
   }
 
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+   * supporting the key and value types K and V in this RDD. Compress the result with the
+   * supplied codec.
+   */
+  def saveAsHadoopFile[F <: OutputFormat[K, V]](
+      path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassManifest[F]) {
+    saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]], codec)
+  }
+
   /**
    * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
    * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
@@ -574,6 +588,20 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     jobCommitter.cleanupJob(jobTaskContext)
   }
 
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+   * supporting the key and value types K and V in this RDD. Compress with the supplied codec.
+   */
+  def saveAsHadoopFile(
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: OutputFormat[_, _]],
+      codec: Class[_ <: CompressionCodec]) {
+    saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass,
+      new JobConf(self.context.hadoopConfiguration), Some(codec))
+  }
+
   /**
    * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
    * supporting the key and value types K and V in this RDD.
@@ -583,11 +611,19 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      conf: JobConf = new JobConf(self.context.hadoopConfiguration)) {
+      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
+      codec: Option[Class[_ <: CompressionCodec]] = None) {
     conf.setOutputKeyClass(keyClass)
     conf.setOutputValueClass(valueClass)
     // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
     conf.set("mapred.output.format.class", outputFormatClass.getName)
+    for (c <- codec) {
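+      // Note that this also compresses intermediate map outputs with the same codec,
+      // in addition to the final output files.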
+      conf.setCompressMapOutput(true)
+      conf.set("mapred.output.compress", "true")
+      conf.setMapOutputCompressorClass(c)
+      conf.set("mapred.output.compression.codec", c.getCanonicalName)
+      conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
+    }
     conf.setOutputCommitter(classOf[FileOutputCommitter])
     FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf))
     saveAsHadoopDataset(conf)
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 152f7be9bb8680e91eeb3f57d2be2911e49bee84..05ff399a7b2dbd571203acd6d9d7884d3ebf9e56 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -1,15 +1,13 @@
 package spark
 
-import java.net.URL
-import java.util.{Date, Random}
-import java.util.{HashMap => JHashMap}
+import java.util.Random
 
 import scala.collection.Map
 import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
 
 import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.TextOutputFormat
@@ -33,13 +31,13 @@ import spark.rdd.MapPartitionsWithIndexRDD
 import spark.rdd.PipedRDD
 import spark.rdd.SampledRDD
 import spark.rdd.ShuffledRDD
-import spark.rdd.SubtractedRDD
 import spark.rdd.UnionRDD
 import spark.rdd.ZippedRDD
 import spark.rdd.ZippedPartitionsRDD2
 import spark.rdd.ZippedPartitionsRDD3
 import spark.rdd.ZippedPartitionsRDD4
 import spark.storage.StorageLevel
+import spark.util.BoundedPriorityQueue
 
 import SparkContext._
 
@@ -142,10 +140,15 @@ abstract class RDD[T: ClassManifest](
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
   def cache(): RDD[T] = persist()
 
-  /** Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. */
-  def unpersist(): RDD[T] = {
+  /**
+   * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+   *
+   * @param blocking Whether to block until all blocks are deleted.
+   * @return This RDD.
+   */
+  def unpersist(blocking: Boolean = true): RDD[T] = {
     logInfo("Removing RDD " + id + " from persistence list")
-    sc.env.blockManager.master.removeRdd(id)
+    sc.env.blockManager.master.removeRdd(id, blocking)
     sc.persistentRdds.remove(id)
     storageLevel = StorageLevel.NONE
     this
@@ -270,8 +273,8 @@ abstract class RDD[T: ClassManifest](
   def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
     var fraction = 0.0
     var total = 0
-    var multiplier = 3.0
-    var initialCount = count()
+    val multiplier = 3.0
+    val initialCount = count()
     var maxSelected = 0
 
     if (initialCount > Integer.MAX_VALUE - 1) {
@@ -739,6 +742,24 @@ abstract class RDD[T: ClassManifest](
     case _ => throw new UnsupportedOperationException("empty collection")
   }
 
+  /**
+   * Returns the top K elements from this RDD as defined by
+   * the specified implicit Ordering[T].
+   * @param num the number of top elements to return
+   * @param ord the implicit ordering for T
+   * @return an array of top elements
+   */
+  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
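+    // Keep a bounded priority queue of the num largest elements per partition,
+    // then merge the per-partition queues pairwise.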
+    mapPartitions { items =>
+      val queue = new BoundedPriorityQueue[T](num)
+      queue ++= items
+      Iterator.single(queue)
+    }.reduce { (queue1, queue2) =>
+      queue1 ++= queue2
+      queue1
+    }.toArray
+  }
+
   /**
    * Save this RDD as a text file, using string representations of elements.
    */
@@ -747,6 +768,14 @@ abstract class RDD[T: ClassManifest](
       .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
   }
 
+  /**
+   * Save this RDD as a compressed text file, using string representations of elements.
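+   * For example (GzipCodec is just one of the codecs that ship with Hadoop):
+   * `rdd.saveAsTextFile(path, classOf[org.apache.hadoop.io.compress.GzipCodec])`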
+   */
+  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
+    this.map(x => (NullWritable.get(), new Text(x.toString)))
+      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
+  }
+
   /**
    * Save this RDD as a SequenceFile of serialized objects.
    */
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index 518034e07bf172ac96cd515b43543415c274cbf9..2911f9036eb4a57a953763e15c304f4b921fe296 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -18,6 +18,7 @@ import org.apache.hadoop.mapred.TextOutputFormat
 import org.apache.hadoop.mapred.SequenceFileOutputFormat
 import org.apache.hadoop.mapred.OutputCommitter
 import org.apache.hadoop.mapred.FileOutputCommitter
+import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.io.BytesWritable
@@ -62,7 +63,7 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
    * byte arrays to BytesWritable, and Strings to Text. The `path` can be on any Hadoop-supported
    * file system.
    */
-  def saveAsSequenceFile(path: String) {
+  def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) {
     def anyToWritable[U <% Writable](u: U): Writable = u
 
     val keyClass = getWritableClass[K]
@@ -72,14 +73,18 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
 
     logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
     val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
+    val jobConf = new JobConf(self.context.hadoopConfiguration)
     if (!convertKey && !convertValue) {
-      self.saveAsHadoopFile(path, keyClass, valueClass, format)
+      self.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec)
     } else if (!convertKey && convertValue) {
-      self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
+      self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(
+        path, keyClass, valueClass, format, jobConf, codec)
     } else if (convertKey && !convertValue) {
-      self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format)
+      self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(
+        path, keyClass, valueClass, format, jobConf, codec)
     } else if (convertKey && convertValue) {
-      self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
+      self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(
+        path, keyClass, valueClass, format, jobConf, codec)
     }
   }
 }
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 84626df553a38de903b9968ead04e1e886a28c7d..ec15326014e8d1e8e7ab058c4c2c1118bfd26c79 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -4,20 +4,26 @@ import java.io._
 import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
 import java.util.{Locale, Random, UUID}
 import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
-import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+import java.util.regex.Pattern
+
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.collection.JavaConversions._
 import scala.io.Source
+
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
+
+import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+
 import spark.serializer.SerializerInstance
 import spark.deploy.SparkHadoopUtil
-import java.util.regex.Pattern
+
 
 /**
  * Various utility methods used by Spark.
  */
 private object Utils extends Logging {
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
@@ -68,7 +74,6 @@ private object Utils extends Logging {
     return buf
   }
 
-
   private val shutdownDeletePaths = new collection.mutable.HashSet[String]()
 
   // Register the path to be deleted via shutdown hook
@@ -87,19 +92,19 @@ private object Utils extends Logging {
     }
   }
 
-  // Note: if file is child of some registered path, while not equal to it, then return true; else false
-  // This is to ensure that two shutdown hooks do not try to delete each others paths - resulting in IOException
-  // and incomplete cleanup
+  // Note: if the file is a child of some registered path (but not equal to it), return true;
+  // otherwise return false. This ensures that two shutdown hooks do not try to delete each
+  // other's paths, which would result in an IOException and incomplete cleanup.
   def hasRootAsShutdownDeleteDir(file: File): Boolean = {
-
     val absolutePath = file.getAbsolutePath()
-
     val retval = shutdownDeletePaths.synchronized {
-      shutdownDeletePaths.find(path => ! absolutePath.equals(path) && absolutePath.startsWith(path) ).isDefined
+      shutdownDeletePaths.find { path =>
+        !absolutePath.equals(path) && absolutePath.startsWith(path)
+      }.isDefined
+    }
+    if (retval) {
+      logInfo("path = " + file + ", already present as root for deletion.")
     }
-
-    if (retval) logInfo("path = " + file + ", already present as root for deletion.")
-
     retval
   }
 
@@ -131,7 +136,7 @@ private object Utils extends Logging {
         if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
       }
     })
-    return dir
+    dir
   }
 
   /** Copy all data from an InputStream to an OutputStream */
@@ -174,35 +179,30 @@ private object Utils extends Logging {
         Utils.copyStream(in, out, true)
         if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
           tempFile.delete()
-          throw new SparkException("File " + targetFile + " exists and does not match contents of" +
-            " " + url)
+          throw new SparkException(
+            "File " + targetFile + " exists and does not match contents of " + url)
         } else {
           Files.move(tempFile, targetFile)
         }
       case "file" | null =>
-        val sourceFile = if (uri.isAbsolute) {
-          new File(uri)
-        } else {
-          new File(url)
-        }
-        if (targetFile.exists && !Files.equal(sourceFile, targetFile)) {
-          throw new SparkException("File " + targetFile + " exists and does not match contents of" +
-            " " + url)
-        } else {
-          // Remove the file if it already exists
-          targetFile.delete()
-          // Symlink the file locally.
-          if (uri.isAbsolute) {
-            // url is absolute, i.e. it starts with "file:///". Extract the source
-            // file's absolute path from the url.
-            val sourceFile = new File(uri)
-            logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
-            FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath)
+        // In the case of a local file, copy it to the target directory.
+        // Note the difference between uri and url.
+        val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
+        if (targetFile.exists) {
+          // The target file already exists; check whether its contents match the source file.
+          if (!Files.equal(sourceFile, targetFile)) {
+            throw new SparkException(
+              "File " + targetFile + " exists and does not match contents of" + " " + url)
           } else {
-            // url is not absolute, i.e. itself is the path to the source file.
-            logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath)
-            FileUtil.symLink(url, targetFile.getAbsolutePath)
+            // Do nothing if the file contents are the same, i.e. this file has been copied
+            // previously.
+            logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
+              + targetFile.getAbsolutePath)
           }
+        } else {
+          // The file does not exist in the target directory. Copy it there.
+          logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
+          Files.copy(sourceFile, targetFile)
         }
       case _ =>
         // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
@@ -323,8 +323,6 @@ private object Utils extends Logging {
     InetAddress.getByName(address).getHostName
   }
 
-
-
   def localHostPort(): String = {
     val retval = System.getProperty("spark.hostPort", null)
     if (retval == null) {
@@ -382,6 +380,7 @@ private object Utils extends Logging {
   // Typically, this will be of order of number of nodes in cluster
   // If not, we should change it to LRUCache or something.
   private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()
+
   def parseHostPort(hostPort: String): (String,  Int) = {
     {
       // Check cache first.
@@ -390,7 +389,8 @@ private object Utils extends Logging {
     }
 
     val indx: Int = hostPort.lastIndexOf(':')
-    // This is potentially broken - when dealing with ipv6 addresses for example, sigh ... but then hadoop does not support ipv6 right now.
+    // This is potentially broken - when dealing with ipv6 addresses for example, sigh ...
+    // but then hadoop does not support ipv6 right now.
     // For now, we assume that if port exists, then it is valid - not check if it is an int > 0
     if (-1 == indx) {
       val retval = (hostPort, 0)
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 30084df4e20fc99e5ed00a69f76d661b8047b61d..76051597b65f065fdd8b0adb04eed0ab9654ff6c 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -6,6 +6,7 @@ import java.util.Comparator
 import scala.Tuple2
 import scala.collection.JavaConversions._
 
+import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapred.OutputFormat
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
@@ -459,6 +460,16 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
     rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass)
   }
 
+  /** Output the RDD to any Hadoop-supported file system, compressing with the supplied codec. */
+  def saveAsHadoopFile[F <: OutputFormat[_, _]](
+    path: String,
+    keyClass: Class[_],
+    valueClass: Class[_],
+    outputFormatClass: Class[F],
+    codec: Class[_ <: CompressionCodec]) {
+    rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, codec)
+  }
+
   /** Output the RDD to any Hadoop-supported file system. */
   def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
     path: String,
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index eb81ed64cd62caaff652b1a99fc3733b7f9e6057..626b49945429ce688b94472b32c7a184349826b7 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -86,7 +86,6 @@ JavaRDDLike[T, JavaRDD[T]] {
    */
   def subtract(other: JavaRDD[T], p: Partitioner): JavaRDD[T] =
     wrapRDD(rdd.subtract(other, p))
-
 }
 
 object JavaRDD {
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 9b74d1226f1d9c994a1bcf13e5e5919710fd0877..b555f2030a4f397b1441cc65009f76a6d8279869 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -1,9 +1,10 @@
 package spark.api.java
 
-import java.util.{List => JList}
+import java.util.{List => JList, Comparator}
 import scala.Tuple2
 import scala.collection.JavaConversions._
 
+import org.apache.hadoop.io.compress.CompressionCodec
 import spark.{SparkContext, Partition, RDD, TaskContext}
 import spark.api.java.JavaPairRDD._
 import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
@@ -310,6 +311,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def saveAsTextFile(path: String) = rdd.saveAsTextFile(path)
 
+
+  /**
+   * Save this RDD as a compressed text file, using string representations of elements.
+   */
+  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) =
+    rdd.saveAsTextFile(path, codec)
+
   /**
    * Save this RDD as a SequenceFile of serialized objects.
    */
@@ -351,4 +359,29 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def toDebugString(): String = {
     rdd.toDebugString
   }
+
+  /**
+   * Returns the top K elements from this RDD as defined by
+   * the specified Comparator[T].
+   * @param num the number of top elements to return
+   * @param comp the comparator that defines the order
+   * @return a list of top elements
+   */
+  def top(num: Int, comp: Comparator[T]): JList[T] = {
+    import scala.collection.JavaConversions._
+    val topElems = rdd.top(num)(Ordering.comparatorToOrdering(comp))
+    val arr: java.util.Collection[T] = topElems.toSeq
+    new java.util.ArrayList(arr)
+  }
+
+  /**
+   * Returns the top K elements from this RDD using the
+   * natural ordering for T.
+   * @param num the number of top elements to return
+   * @return a list of top elements
+   */
+  def top(num: Int): JList[T] = {
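+    // Relies on T being Comparable at runtime; otherwise comparisons throw a ClassCastException.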
+    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[T]]
+    top(num, comp)
+  }
 }
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 7599ba1a0224b207c2b524bdea11805c35b9a67b..8966f9f86e3f8ac5fd9494b0993ec0579af1710c 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -6,7 +6,7 @@ import java.util.{HashMap => JHashMap}
 import scala.collection.JavaConversions
 import scala.collection.mutable.ArrayBuffer
 
-import spark.{Aggregator, Logging, Partition, Partitioner, RDD, SparkEnv, TaskContext}
+import spark.{Aggregator, Partition, Partitioner, RDD, SparkEnv, TaskContext}
 import spark.{Dependency, OneToOneDependency, ShuffleDependency}
 
 
@@ -49,12 +49,16 @@ private[spark] class CoGroupAggregator
  *
  * @param rdds parent RDDs.
  * @param part partitioner used to partition the shuffle output.
- * @param mapSideCombine flag indicating whether to merge values before shuffle step.
+ * @param mapSideCombine flag indicating whether to merge values before shuffle step. If the flag
+ *                       is on, Spark does an extra pass over the data on the map side to merge
+ *                       all values belonging to the same key together. This can reduce the amount
+ *                       of data shuffled if and only if the number of distinct keys is very small,
+ *                       and the ratio of key size to value size is also very small.
  */
 class CoGroupedRDD[K](
   @transient var rdds: Seq[RDD[(K, _)]],
   part: Partitioner,
-  val mapSideCombine: Boolean = true,
+  val mapSideCombine: Boolean = false,
   val serializerClass: String = null)
   extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
 
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
index a5d6285c993c407574007a92e9f03038195f85b1..f33310a34a29d44a1a7fafa2a1ca99f4845169c4 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingAlgorithm.scala
@@ -13,11 +13,11 @@ private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
   override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
     val priority1 = s1.priority
     val priority2 = s2.priority
-    var res = Math.signum(priority1 - priority2)
+    var res = math.signum(priority1 - priority2)
     if (res == 0) {
       val stageId1 = s1.stageId
       val stageId2 = s2.stageId
-      res = Math.signum(stageId1 - stageId2)
+      res = math.signum(stageId1 - stageId2)
     }
     if (res < 0) {
       return true
@@ -35,22 +35,30 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
     val runningTasks2 = s2.runningTasks
     val s1Needy = runningTasks1 < minShare1
     val s2Needy = runningTasks2 < minShare2
-    val minShareRatio1 = runningTasks1.toDouble / Math.max(minShare1, 1.0).toDouble
-    val minShareRatio2 = runningTasks2.toDouble / Math.max(minShare2, 1.0).toDouble
+    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
+    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
     val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
     val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
     var res:Boolean = true
+    var compare:Int = 0
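+    // A schedulable running below its minimum share comes first. Among needy schedulables,
+    // order by minShare ratio; among the rest, by the runningTasks-to-weight ratio. Ties are
+    // broken by name to keep the ordering deterministic.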
 
     if (s1Needy && !s2Needy) {
-      res = true
+      return true
     } else if (!s1Needy && s2Needy) {
-      res = false
+      return false
     } else if (s1Needy && s2Needy) {
-      res = minShareRatio1 <= minShareRatio2
+      compare = minShareRatio1.compareTo(minShareRatio2)
+    } else {
+      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
+    }
+
+    if (compare < 0) {
+      return true
+    } else if (compare > 0) {
+      return false
     } else {
-      res = taskToWeightRatio1 <= taskToWeightRatio2
+      return s1.name < s2.name
     }
-    return res
   }
 }
 
diff --git a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
index 95308c728273c1f7d03908ce1729b7fb94acb4cd..1d69d658f7d8a27aea617e883279a88552d019e1 100644
--- a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
@@ -124,6 +124,7 @@ object BlockFetcherIterator {
     protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
       // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
       // at most maxBytesInFlight in order to limit the amount of data in flight.
+      val originalTotalBlocks = _totalBlocks
       val remoteRequests = new ArrayBuffer[FetchRequest]
       for ((address, blockInfos) <- blocksByAddress) {
         if (address == blockManagerId) {
@@ -140,8 +141,15 @@ object BlockFetcherIterator {
           var curBlocks = new ArrayBuffer[(String, Long)]
           while (iterator.hasNext) {
             val (blockId, size) = iterator.next()
-            curBlocks += ((blockId, size))
-            curRequestSize += size
+            // Skip empty blocks
+            if (size > 0) {
+              curBlocks += ((blockId, size))
+              curRequestSize += size
+            } else if (size == 0) {
+              _totalBlocks -= 1
+            } else {
+              throw new BlockException(blockId, "Negative block size " + size)
+            }
             if (curRequestSize >= minRequestSize) {
               // Add this FetchRequest
               remoteRequests += new FetchRequest(address, curBlocks)
@@ -155,6 +163,8 @@ object BlockFetcherIterator {
           }
         }
       }
+      logInfo("Getting " + _totalBlocks + " non-zero-bytes blocks out of " +
+        originalTotalBlocks + " blocks")
       remoteRequests
     }
 
@@ -278,53 +288,6 @@ object BlockFetcherIterator {
       logDebug("Sent request for remote blocks " + req.blocks + " from " + req.address.host )
     }
 
-    override protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
-      // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
-      // at most maxBytesInFlight in order to limit the amount of data in flight.
-      val originalTotalBlocks = _totalBlocks;
-      val remoteRequests = new ArrayBuffer[FetchRequest]
-      for ((address, blockInfos) <- blocksByAddress) {
-        if (address == blockManagerId) {
-          localBlockIds ++= blockInfos.map(_._1)
-        } else {
-          remoteBlockIds ++= blockInfos.map(_._1)
-          // Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them
-          // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
-          // nodes, rather than blocking on reading output from one node.
-          val minRequestSize = math.max(maxBytesInFlight / 5, 1L)
-          logInfo("maxBytesInFlight: " + maxBytesInFlight + ", minRequest: " + minRequestSize)
-          val iterator = blockInfos.iterator
-          var curRequestSize = 0L
-          var curBlocks = new ArrayBuffer[(String, Long)]
-          while (iterator.hasNext) {
-            val (blockId, size) = iterator.next()
-            if (size > 0) {
-              curBlocks += ((blockId, size))
-              curRequestSize += size
-            } else if (size == 0) {
-              //here we changes the totalBlocks
-              _totalBlocks -= 1
-            } else {
-              throw new BlockException(blockId, "Negative block size " + size)
-            }
-            if (curRequestSize >= minRequestSize) {
-              // Add this FetchRequest
-              remoteRequests += new FetchRequest(address, curBlocks)
-              curRequestSize = 0
-              curBlocks = new ArrayBuffer[(String, Long)]
-            }
-          }
-          // Add in the final request
-          if (!curBlocks.isEmpty) {
-            remoteRequests += new FetchRequest(address, curBlocks)
-          }
-        }
-      }
-      logInfo("Getting " + _totalBlocks + " non-zero-bytes blocks out of " +
-        originalTotalBlocks + " blocks")
-      remoteRequests
-    }
-
     private var copiers: List[_ <: Thread] = null
 
     override def initialize() {
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index d35c43f194bcd914a2c19d2a123bb58f2d4835e0..9b39d3aadfd6d890add71250b00fb59ebc2d0de7 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -3,8 +3,7 @@ package spark.storage
 import java.io.{InputStream, OutputStream}
 import java.nio.{ByteBuffer, MappedByteBuffer}
 
-import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet, Queue}
-import scala.collection.JavaConversions._
+import scala.collection.mutable.{HashMap, ArrayBuffer, HashSet}
 
 import akka.actor.{ActorSystem, Cancellable, Props}
 import akka.dispatch.{Await, Future}
@@ -15,7 +14,7 @@ import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 
 import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
 
-import spark.{Logging, SizeEstimator, SparkEnv, SparkException, Utils}
+import spark.{Logging, SparkEnv, SparkException, Utils}
 import spark.network._
 import spark.serializer.Serializer
 import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap}
@@ -95,9 +94,11 @@ private[spark] class BlockManager(
     new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
 
   // If we use Netty for shuffle, start a new Netty-based shuffle sender service.
-  private val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean
-  private val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt
-  private val nettyPort = if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0
+  private val nettyPort: Int = {
+    val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean
+    val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt
+    if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0
+  }
 
   val connectionManager = new ConnectionManager(0)
   implicit val futureExecContext = connectionManager.futureExecContext
@@ -824,10 +825,24 @@ private[spark] class BlockManager(
     }
   }
 
+  /**
+   * Remove all blocks belonging to the given RDD.
+   * @return The number of blocks removed.
+   */
+  def removeRdd(rddId: Int): Int = {
+    // TODO: Instead of doing a linear scan on the blockInfo map, create another map that maps
+    // from RDD.id to blocks.
+    logInfo("Removing RDD " + rddId)
+    val rddPrefix = "rdd_" + rddId + "_"
+    val blocksToRemove = blockInfo.filter(_._1.startsWith(rddPrefix)).map(_._1)
+    blocksToRemove.foreach(blockId => removeBlock(blockId, false))
+    blocksToRemove.size
+  }
+
   /**
    * Remove a block from both memory and disk.
    */
-  def removeBlock(blockId: String) {
+  def removeBlock(blockId: String, tellMaster: Boolean = true) {
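+    // tellMaster is set to false by removeRdd, where the master has already removed its
+    // metadata for the RDD's blocks.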
     logInfo("Removing block " + blockId)
     val info = blockInfo.get(blockId).orNull
     if (info != null) info.synchronized {
@@ -839,7 +854,7 @@ private[spark] class BlockManager(
           "the disk or memory store")
       }
       blockInfo.remove(blockId)
-      if (info.tellMaster) {
+      if (tellMaster && info.tellMaster) {
         reportBlockStatus(blockId, info)
       }
     } else {
@@ -950,7 +965,7 @@ private[spark] object BlockManager extends Logging {
   }
 
   def getHeartBeatFrequencyFromSystemProperties: Long =
-    System.getProperty("spark.storage.blockManagerHeartBeatMs", "5000").toLong
+    System.getProperty("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4
 
   def getDisableHeartBeatsForTesting: Boolean =
     System.getProperty("spark.test.disableBlockManagerHeartBeat", "false").toBoolean
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index ac26c168670456d7631651c6f1676f26c5805145..58888b1ebb55b7176a214846371eb4030d498368 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -1,19 +1,11 @@
 package spark.storage
 
-import java.io._
-import java.util.{HashMap => JHashMap}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.util.Random
-
-import akka.actor.{Actor, ActorRef, ActorSystem, Props}
-import akka.dispatch.Await
+import akka.actor.ActorRef
+import akka.dispatch.{Await, Future}
 import akka.pattern.ask
-import akka.util.{Duration, Timeout}
-import akka.util.duration._
+import akka.util.Duration
 
-import spark.{Logging, SparkException, Utils}
+import spark.{Logging, SparkException}
 
 
 private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging {
@@ -91,15 +83,13 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi
   /**
    * Remove all blocks belonging to the given RDD.
    */
-  def removeRdd(rddId: Int) {
-    val rddBlockPrefix = "rdd_" + rddId + "_"
-    // Get the list of blocks in block manager, and remove ones that are part of this RDD.
-    // The runtime complexity is linear to the number of blocks persisted in the cluster.
-    // It could be expensive if the cluster is large and has a lot of blocks persisted.
-    getStorageStatus.flatMap(_.blocks).foreach { case(blockId, status) =>
-      if (blockId.startsWith(rddBlockPrefix)) {
-        removeBlock(blockId)
-      }
+  def removeRdd(rddId: Int, blocking: Boolean) {
+    val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
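+    // The driver replies with a Future that completes once every slave has removed its blocks.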
+    future onFailure {
+      case e: Throwable => logError("Failed to remove RDD " + rddId, e)
+    }
+    if (blocking) {
+      Await.result(future, timeout)
     }
   }
 
@@ -114,7 +104,7 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi
   }
 
   def getStorageStatus: Array[StorageStatus] = {
-    askDriverWithReply[ArrayBuffer[StorageStatus]](GetStorageStatus).toArray
+    askDriverWithReply[Array[StorageStatus]](GetStorageStatus)
   }
 
   /** Stop the driver actor, called only on the Spark driver node */
@@ -151,7 +141,7 @@ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Loggi
         val future = driverActor.ask(message)(timeout)
         val result = Await.result(future, timeout)
         if (result == null) {
-          throw new Exception("BlockManagerMaster returned null")
+          throw new SparkException("BlockManagerMaster returned null")
         }
         return result.asInstanceOf[T]
       } catch {
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index 9b64f95df80731cd8188edb5a2a159410ccda153..0d4384ba1f1fecc350f452784c5bbc7c540558ac 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -2,15 +2,16 @@ package spark.storage
 
 import java.util.{HashMap => JHashMap}
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable
 import scala.collection.JavaConversions._
-import scala.util.Random
 
 import akka.actor.{Actor, ActorRef, Cancellable}
-import akka.util.{Duration, Timeout}
+import akka.dispatch.Future
+import akka.pattern.ask
+import akka.util.Duration
 import akka.util.duration._
 
-import spark.{Logging, Utils}
+import spark.{Logging, Utils, SparkException}
 
 /**
  * BlockManagerMasterActor is an actor on the master node to track statuses of
@@ -21,13 +22,16 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
 
   // Mapping from block manager id to the block manager's information.
   private val blockManagerInfo =
-    new HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
+    new mutable.HashMap[BlockManagerId, BlockManagerMasterActor.BlockManagerInfo]
 
   // Mapping from executor ID to block manager ID.
-  private val blockManagerIdByExecutor = new HashMap[String, BlockManagerId]
+  private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]
 
   // Mapping from block id to the set of block managers that have the block.
-  private val blockLocations = new JHashMap[String, Pair[Int, HashSet[BlockManagerId]]]
+  private val blockLocations = new JHashMap[String, mutable.HashSet[BlockManagerId]]
+
+  val akkaTimeout = Duration.create(
+    System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
 
   initLogging()
 
@@ -35,7 +39,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     "" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
 
   val checkTimeoutInterval = System.getProperty("spark.storage.blockManagerTimeoutIntervalMs",
-    "5000").toLong
+    "60000").toLong
 
   var timeoutCheckingTask: Cancellable = null
 
@@ -50,28 +54,34 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
   def receive = {
     case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
       register(blockManagerId, maxMemSize, slaveActor)
+      sender ! true
 
     case UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
+      // TODO: Ideally we want to handle all the message replies in receive instead of in the
+      // individual private methods.
       updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size)
 
     case GetLocations(blockId) =>
-      getLocations(blockId)
+      sender ! getLocations(blockId)
 
     case GetLocationsMultipleBlockIds(blockIds) =>
-      getLocationsMultipleBlockIds(blockIds)
+      sender ! getLocationsMultipleBlockIds(blockIds)
 
     case GetPeers(blockManagerId, size) =>
-      getPeersDeterministic(blockManagerId, size)
-      /*getPeers(blockManagerId, size)*/
+      sender ! getPeers(blockManagerId, size)
 
     case GetMemoryStatus =>
-      getMemoryStatus
+      sender ! memoryStatus
 
     case GetStorageStatus =>
-      getStorageStatus
+      sender ! storageStatus
+
+    case RemoveRdd(rddId) =>
+      sender ! removeRdd(rddId)
 
     case RemoveBlock(blockId) =>
-      removeBlock(blockId)
+      removeBlockFromWorkers(blockId)
+      sender ! true
 
     case RemoveExecutor(execId) =>
       removeExecutor(execId)
@@ -81,7 +91,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
       logInfo("Stopping BlockManagerMaster")
       sender ! true
       if (timeoutCheckingTask != null) {
-        timeoutCheckingTask.cancel
+        timeoutCheckingTask.cancel()
       }
       context.stop(self)
 
@@ -89,13 +99,36 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
       expireDeadHosts()
 
     case HeartBeat(blockManagerId) =>
-      heartBeat(blockManagerId)
+      sender ! heartBeat(blockManagerId)
 
     case other =>
-      logInfo("Got unknown message: " + other)
+      logWarning("Got unknown message: " + other)
+  }
+
+  private def removeRdd(rddId: Int): Future[Seq[Int]] = {
+    // First remove the metadata for the given RDD, and then asynchronously remove the blocks
+    // from the slaves.
+
+    val prefix = "rdd_" + rddId + "_"
+    // Find all blocks for the given RDD, remove the block from both blockLocations and
+    // the blockManagerInfo that is tracking the blocks.
+    val blocks = blockLocations.keySet().filter(_.startsWith(prefix))
+    blocks.foreach { blockId =>
+      val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId)
+      bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
+      blockLocations.remove(blockId)
+    }
+
+    // Ask the slaves to remove the RDD, and put the result in a sequence of Futures.
+    // The dispatcher is used as an implicit argument to the Future.sequence construction.
+    import context.dispatcher
+    val removeMsg = RemoveRdd(rddId)
+    Future.sequence(blockManagerInfo.values.map { bm =>
+      bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
+    }.toSeq)
   }
 
-  def removeBlockManager(blockManagerId: BlockManagerId) {
+  private def removeBlockManager(blockManagerId: BlockManagerId) {
     val info = blockManagerInfo(blockManagerId)
 
     // Remove the block manager from blockManagerIdByExecutor.
@@ -106,7 +139,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     val iterator = info.blocks.keySet.iterator
     while (iterator.hasNext) {
       val blockId = iterator.next
-      val locations = blockLocations.get(blockId)._2
+      val locations = blockLocations.get(blockId)
       locations -= blockManagerId
       if (locations.size == 0) {
         blockLocations.remove(locations)
@@ -114,11 +147,11 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     }
   }
 
-  def expireDeadHosts() {
+  private def expireDeadHosts() {
     logTrace("Checking for hosts with no recent heart beats in BlockManagerMaster.")
     val now = System.currentTimeMillis()
     val minSeenTime = now - slaveTimeout
-    val toRemove = new HashSet[BlockManagerId]
+    val toRemove = new mutable.HashSet[BlockManagerId]
     for (info <- blockManagerInfo.values) {
       if (info.lastSeenMs < minSeenTime) {
         logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " +
@@ -129,31 +162,26 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     toRemove.foreach(removeBlockManager)
   }
 
-  def removeExecutor(execId: String) {
+  private def removeExecutor(execId: String) {
     logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.")
     blockManagerIdByExecutor.get(execId).foreach(removeBlockManager)
-    sender ! true
   }
 
-  def heartBeat(blockManagerId: BlockManagerId) {
+  private def heartBeat(blockManagerId: BlockManagerId): Boolean = {
     if (!blockManagerInfo.contains(blockManagerId)) {
-      if (blockManagerId.executorId == "<driver>" && !isLocal) {
-        sender ! true
-      } else {
-        sender ! false
-      }
+      blockManagerId.executorId == "<driver>" && !isLocal
     } else {
       blockManagerInfo(blockManagerId).updateLastSeenMs()
-      sender ! true
+      true
     }
   }
 
   // Remove a block from the slaves that have it. This can only be used to remove
   // blocks that the master knows about.
-  private def removeBlock(blockId: String) {
-    val block = blockLocations.get(blockId)
-    if (block != null) {
-      block._2.foreach { blockManagerId: BlockManagerId =>
+  private def removeBlockFromWorkers(blockId: String) {
+    val locations = blockLocations.get(blockId)
+    if (locations != null) {
+      locations.foreach { blockManagerId: BlockManagerId =>
         val blockManager = blockManagerInfo.get(blockManagerId)
         if (blockManager.isDefined) {
           // Remove the block from the slave's BlockManager.
@@ -163,23 +191,20 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
         }
       }
     }
-    sender ! true
   }
 
   // Return a map from the block manager id to max memory and remaining memory.
-  private def getMemoryStatus() {
-    val res = blockManagerInfo.map { case(blockManagerId, info) =>
+  private def memoryStatus: Map[BlockManagerId, (Long, Long)] = {
+    blockManagerInfo.map { case(blockManagerId, info) =>
       (blockManagerId, (info.maxMem, info.remainingMem))
     }.toMap
-    sender ! res
   }
 
-  private def getStorageStatus() {
-    val res = blockManagerInfo.map { case(blockManagerId, info) =>
+  private def storageStatus: Array[StorageStatus] = {
+    blockManagerInfo.map { case(blockManagerId, info) =>
       import collection.JavaConverters._
       StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala.toMap)
-    }
-    sender ! res
+    }.toArray
   }
 
   private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
@@ -188,7 +213,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     } else if (!blockManagerInfo.contains(id)) {
       blockManagerIdByExecutor.get(id.executorId) match {
         case Some(manager) =>
-          // A block manager of the same host name already exists
+          // A block manager of the same executor already exists.
+          // This should never happen. Let's just quit.
           logError("Got two different block manager registrations on " + id.executorId)
           System.exit(1)
         case None =>
@@ -197,7 +223,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
       blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo(
         id, System.currentTimeMillis(), maxMemSize, slaveActor)
     }
-    sender ! true
   }
 
   private def updateBlockInfo(
@@ -226,12 +251,12 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
 
     blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
 
-    var locations: HashSet[BlockManagerId] = null
+    var locations: mutable.HashSet[BlockManagerId] = null
     if (blockLocations.containsKey(blockId)) {
-      locations = blockLocations.get(blockId)._2
+      locations = blockLocations.get(blockId)
     } else {
-      locations = new HashSet[BlockManagerId]
-      blockLocations.put(blockId, (storageLevel.replication, locations))
+      locations = new mutable.HashSet[BlockManagerId]
+      blockLocations.put(blockId, locations)
     }
 
     if (storageLevel.isValid) {
@@ -247,70 +272,24 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
     sender ! true
   }
 
-  private def getLocations(blockId: String) {
-    val startTimeMs = System.currentTimeMillis()
-    val tmp = " " + blockId + " "
-    if (blockLocations.containsKey(blockId)) {
-      var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
-      res.appendAll(blockLocations.get(blockId)._2)
-      sender ! res.toSeq
-    } else {
-      var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
-      sender ! res
-    }
-  }
-
-  private def getLocationsMultipleBlockIds(blockIds: Array[String]) {
-    def getLocations(blockId: String): Seq[BlockManagerId] = {
-      val tmp = blockId
-      if (blockLocations.containsKey(blockId)) {
-        var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
-        res.appendAll(blockLocations.get(blockId)._2)
-        return res.toSeq
-      } else {
-        var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
-        return res.toSeq
-      }
-    }
-
-    var res: ArrayBuffer[Seq[BlockManagerId]] = new ArrayBuffer[Seq[BlockManagerId]]
-    for (blockId <- blockIds) {
-      res.append(getLocations(blockId))
-    }
-    sender ! res.toSeq
+  private def getLocations(blockId: String): Seq[BlockManagerId] = {
+    if (blockLocations.containsKey(blockId)) blockLocations.get(blockId).toSeq else Seq.empty
   }
 
-  private def getPeers(blockManagerId: BlockManagerId, size: Int) {
-    var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-    var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
-    res.appendAll(peers)
-    res -= blockManagerId
-    val rand = new Random(System.currentTimeMillis())
-    while (res.length > size) {
-      res.remove(rand.nextInt(res.length))
-    }
-    sender ! res.toSeq
+  private def getLocationsMultipleBlockIds(blockIds: Array[String]): Seq[Seq[BlockManagerId]] = {
+    blockIds.map(blockId => getLocations(blockId))
   }
 
-  private def getPeersDeterministic(blockManagerId: BlockManagerId, size: Int) {
-    var peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-    var res: ArrayBuffer[BlockManagerId] = new ArrayBuffer[BlockManagerId]
+  private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
+    val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
 
     val selfIndex = peers.indexOf(blockManagerId)
     if (selfIndex == -1) {
-      throw new Exception("Self index for " + blockManagerId + " not found")
+      throw new SparkException("Self index for " + blockManagerId + " not found")
     }
 
     // Note that this logic will select the same node multiple times if there aren't enough peers
-    var index = selfIndex
-    while (res.size < size) {
-      index += 1
-      if (index == selfIndex) {
-        throw new Exception("More peer expected than available")
-      }
-      res += peers(index % peers.size)
-    }
-    sender ! res.toSeq
+    Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
   }
 }
 
@@ -384,6 +363,13 @@ object BlockManagerMasterActor {
       }
     }
 
+    def removeBlock(blockId: String) {
+      if (_blocks.containsKey(blockId)) {
+        _remainingMem += _blocks.get(blockId).memSize
+        _blocks.remove(blockId)
+      }
+    }
+
     def remainingMem: Long = _remainingMem
 
     def lastSeenMs: Long = _lastSeenMs
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
index cff48d9909a161047a11cffa5056aee6ecefd751..0010726c8d32ecc2046b25cef2bdc729b4577c95 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -16,6 +16,9 @@ sealed trait ToBlockManagerSlave
 private[spark]
 case class RemoveBlock(blockId: String) extends ToBlockManagerSlave
 
+// Remove all blocks belonging to a specific RDD.
+private[spark] case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
+
 
 //////////////////////////////////////////////////////////////////////////////////
 // Messages from slaves to the master.
diff --git a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
index f570cdc52dd1b2347b8cca8eab62af80b318815e..b264d1deb59b9b7b4341a30be7736228610e397c 100644
--- a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
@@ -11,6 +11,12 @@ import spark.{Logging, SparkException, Utils}
  */
 class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {
   override def receive = {
-    case RemoveBlock(blockId) => blockManager.removeBlock(blockId)
+
+    case RemoveBlock(blockId) =>
+      blockManager.removeBlock(blockId)
+
+    case RemoveRdd(rddId) =>
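+      // Reply with the number of blocks removed so the master can aggregate the results.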
+      val numBlocksRemoved = blockManager.removeRdd(rddId)
+      sender ! numBlocksRemoved
   }
 }
diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
index 15225f93a626d05759a8fd94e4f2089a27899176..3057ade23395ea0ace8b17f347d0c7376f1cd3df 100644
--- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala
@@ -2,13 +2,7 @@ package spark.storage
 
 import java.nio.ByteBuffer
 
-import scala.actors._
-import scala.actors.Actor._
-import scala.actors.remote._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.util.Random
-
-import spark.{Logging, Utils, SparkEnv}
+import spark.{Logging, Utils}
 import spark.network._
 
 /**
@@ -88,8 +82,6 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
 
 private[spark] object BlockManagerWorker extends Logging {
   private var blockManagerWorker: BlockManagerWorker = null
-  private val DATA_TRANSFER_TIME_OUT_MS: Long = 500
-  private val REQUEST_RETRY_INTERVAL_MS: Long = 1000
 
   initLogging()
 
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 57d4dafefc56a10728e4e62a86b3a41460ddad01..c7281200e7e0086660e9cfeb65d288ce28f7875b 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -59,6 +59,8 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
     // Flush the partial writes, and set valid length to be the length of the entire file.
     // Return the number of bytes written for this commit.
     override def commit(): Long = {
+      // NOTE: Flush the serializer first and then the compressed/buffered output stream
+      objOut.flush()
       bs.flush()
       val prevPos = lastValidPosition
       lastValidPosition = channel.position()
@@ -68,6 +70,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
     override def revertPartialWrites() {
       // Discard current writes. We do this by flushing the outstanding writes and
       // truncate the file to the last valid position.
+      objOut.flush()
       bs.flush()
       channel.truncate(lastValidPosition)
     }
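
The added objOut.flush() matters because the serialization stream sits on top of the compressed/buffered stream bs: flushing bs alone leaves whatever the serializer has buffered internally unwritten, so commit() would read the channel position too early and revertPartialWrites() would truncate at the wrong place. A minimal sketch of the ordering with plain java.io streams (stand-ins, not the real DiskStore members):

    import java.io.{BufferedOutputStream, ByteArrayOutputStream, ObjectOutputStream}

    object FlushOrderSketch extends App {
      val sink = new ByteArrayOutputStream()
      val buffered = new BufferedOutputStream(sink, 1024)  // stands in for `bs`
      val objOut = new ObjectOutputStream(buffered)        // stands in for the serializer stream

      objOut.writeObject("some record")

      // Flush the outer serializer stream first so its internally buffered bytes reach the
      // sink; DiskStore additionally flushes the buffered stream before reading the position.
      objOut.flush()
      buffered.flush()

      println(sink.size() + " bytes reached the sink")
    }
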
diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
index 8f52168c241b3893bb6f6b68b42c6e0419cfece3..950c0cdf352f736b157a7a05d71563b95371f377 100644
--- a/core/src/main/scala/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -55,21 +55,21 @@ object StorageUtils {
     }.mapValues(_.values.toArray)
 
     // For each RDD, generate an RDDInfo object
-    val rddInfos = groupedRddBlocks.map { case(rddKey, rddBlocks) =>
-
+    val rddInfos = groupedRddBlocks.map { case (rddKey, rddBlocks) =>
       // Add up memory and disk sizes
       val memSize = rddBlocks.map(_.memSize).reduce(_ + _)
       val diskSize = rddBlocks.map(_.diskSize).reduce(_ + _)
 
       // Find the id of the RDD, e.g. rdd_1 => 1
       val rddId = rddKey.split("_").last.toInt
-      // Get the friendly name for the rdd, if available.
-      val rdd = sc.persistentRdds(rddId)
-      val rddName = Option(rdd.name).getOrElse(rddKey)
-      val rddStorageLevel = rdd.getStorageLevel
 
-      RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, rdd.partitions.size, memSize, diskSize)
-    }.toArray
+      // Get the friendly name and storage level for the RDD, if available
+      sc.persistentRdds.get(rddId).map { r =>
+        val rddName = Option(r.name).getOrElse(rddKey)
+        val rddStorageLevel = r.getStorageLevel
+        RDDInfo(rddId, rddName, rddStorageLevel, rddBlocks.length, r.partitions.size, memSize, diskSize)
+      }
+    }.flatten.toArray
 
     scala.util.Sorting.quickSort(rddInfos)
 
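
The rewritten rddInfos goes through sc.persistentRdds.get(rddId), so blocks whose RDD has already been unpersisted (and therefore dropped from persistentRdds) are skipped instead of failing on a missing key. A small sketch of the Option + flatten pattern with a plain Map standing in for sc.persistentRdds:

    object OptionFlattenSketch extends App {
      // Stand-in for sc.persistentRdds: only RDD 1 is still registered.
      val persistentRdds = Map(1 -> "rdd-1-object")
      val groupedRddBlocks = Map("rdd_1" -> Seq("rdd_1_0"), "rdd_2" -> Seq("rdd_2_0"))

      val infos = groupedRddBlocks.map { case (rddKey, blocks) =>
        val rddId = rddKey.split("_").last.toInt
        // None for rdd_2, so it simply drops out after flatten instead of throwing.
        persistentRdds.get(rddId).map(rdd => (rddId, blocks.length))
      }.flatten.toList

      println(infos)  // List((1,1))
    }
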
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index cd79bd2bdad0ceefd7d8a89c846b77448bd11204..e93cc3b4850668fa47ade3a839023b9ca172e329 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -29,7 +29,7 @@ private[spark] object AkkaUtils {
   def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
     val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
     val akkaBatchSize = System.getProperty("spark.akka.batchSize", "15").toInt
-    val akkaTimeout = System.getProperty("spark.akka.timeout", "20").toInt
+    val akkaTimeout = System.getProperty("spark.akka.timeout", "60").toInt
     val akkaFrameSize = System.getProperty("spark.akka.frameSize", "10").toInt
     val lifecycleEvents = if (System.getProperty("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off"
     // 10 seconds is the default akka timeout, but in a cluster, we need higher by default.
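
The higher 60-second default only applies when spark.akka.timeout is unset; it can still be tuned the same way as the other spark.akka.* knobs read here, for example:

    // Must be set before the actor system / SparkContext is created (value is in seconds).
    System.setProperty("spark.akka.timeout", "120")
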
diff --git a/core/src/main/scala/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/spark/util/BoundedPriorityQueue.scala
new file mode 100644
index 0000000000000000000000000000000000000000..4bc5db8bb7960c9c10842549809c31d1524aa5c3
--- /dev/null
+++ b/core/src/main/scala/spark/util/BoundedPriorityQueue.scala
@@ -0,0 +1,45 @@
+package spark.util
+
+import java.io.Serializable
+import java.util.{PriorityQueue => JPriorityQueue}
+import scala.collection.generic.Growable
+import scala.collection.JavaConverters._
+
+/**
+ * Bounded priority queue. This class wraps java.util.PriorityQueue and
+ * retains only the top K elements, where "top K" is defined by the
+ * implicit Ordering[A].
+ */
+class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Ordering[A])
+  extends Iterable[A] with Growable[A] with Serializable {
+
+  private val underlying = new JPriorityQueue[A](maxSize, ord)
+
+  override def iterator: Iterator[A] = underlying.iterator.asScala
+
+  override def ++=(xs: TraversableOnce[A]): this.type = {
+    xs.foreach { this += _ }
+    this
+  }
+
+  override def +=(elem: A): this.type = {
+    if (size < maxSize) underlying.offer(elem)
+    else maybeReplaceLowest(elem)
+    this
+  }
+
+  override def +=(elem1: A, elem2: A, elems: A*): this.type = {
+    this += elem1 += elem2 ++= elems
+  }
+
+  override def clear() { underlying.clear() }
+
+  private def maybeReplaceLowest(a: A): Boolean = {
+    val head = underlying.peek()
+    if (head != null && ord.gt(a, head)) {
+      underlying.poll()
+      underlying.offer(a)
+    } else false
+  }
+}
+
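
A quick usage sketch of the new BoundedPriorityQueue: once the queue is full, an insert only displaces the current minimum, so after any sequence of additions it holds the largest maxSize elements under the implicit ordering (iteration order is heap order, not sorted). This is presumably what backs the new RDD.top tests in RDDSuite further down.

    import spark.util.BoundedPriorityQueue

    object BoundedPriorityQueueExample extends App {
      val queue = new BoundedPriorityQueue[Int](3)
      queue ++= Seq(5, 1, 9, 7, 3)   // only the three largest survive
      println(queue.toList.sorted)   // List(5, 7, 9)
    }
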
diff --git a/core/src/test/scala/spark/FileSuite.scala b/core/src/test/scala/spark/FileSuite.scala
index 91b48c745659b2f1a7fa3136da036c4acd731ce9..e61ff7793d20cd343839f1774fa0079ca3c515a6 100644
--- a/core/src/test/scala/spark/FileSuite.scala
+++ b/core/src/test/scala/spark/FileSuite.scala
@@ -7,6 +7,8 @@ import scala.io.Source
 import com.google.common.io.Files
 import org.scalatest.FunSuite
 import org.apache.hadoop.io._
+import org.apache.hadoop.io.compress.{DefaultCodec, CompressionCodec, GzipCodec}
+
 
 import SparkContext._
 
@@ -26,6 +28,28 @@ class FileSuite extends FunSuite with LocalSparkContext {
     assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
   }
 
+  test("text files (compressed)") {
+    sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val normalDir = new File(tempDir, "output_normal").getAbsolutePath
+    val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
+    val codec = new DefaultCodec()
+
+    val data = sc.parallelize("a" * 10000, 1)
+    data.saveAsTextFile(normalDir)
+    data.saveAsTextFile(compressedOutputDir, classOf[DefaultCodec])
+
+    val normalFile = new File(normalDir, "part-00000")
+    val normalContent = sc.textFile(normalDir).collect
+    assert(normalContent === Array.fill(10000)("a"))
+
+    val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
+    val compressedContent = sc.textFile(compressedOutputDir).collect
+    assert(compressedContent === Array.fill(10000)("a"))
+
+    assert(compressedFile.length < normalFile.length)
+  }
+
   test("SequenceFiles") {
     sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
@@ -37,6 +61,28 @@ class FileSuite extends FunSuite with LocalSparkContext {
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
   }
 
+  test("SequenceFile (compressed)") {
+    sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val normalDir = new File(tempDir, "output_normal").getAbsolutePath
+    val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
+    val codec = new DefaultCodec()
+
+    val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x))
+    data.saveAsSequenceFile(normalDir)
+    data.saveAsSequenceFile(compressedOutputDir, Some(classOf[DefaultCodec]))
+
+    val normalFile = new File(normalDir, "part-00000")
+    val normalContent = sc.sequenceFile[String, String](normalDir).collect
+    assert(normalContent === Array.fill(100)("abc", "abc"))
+
+    val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
+    val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect
+    assert(compressedContent === Array.fill(100)("abc", "abc"))
+
+    assert(compressedFile.length < normalFile.length)
+  }
+
   test("SequenceFile with writable key") {
     sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
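
The two compression tests above use DefaultCodec, but the overloads they call (saveAsTextFile(path, codecClass) and saveAsSequenceFile(path, Some(codecClass))) should accept any Hadoop CompressionCodec, such as the GzipCodec that is also imported. A hedged sketch, assuming a local master and writable output paths that do not already exist:

    import org.apache.hadoop.io.compress.GzipCodec
    import spark.SparkContext
    import spark.SparkContext._  // pair-RDD and sequence-file implicits

    object CompressedSaveSketch extends App {
      val sc = new SparkContext("local", "compressed-save-sketch")
      // Part files pick up the codec's extension, e.g. part-00000.gz.
      sc.parallelize(1 to 100).saveAsTextFile("/tmp/ints_gz", classOf[GzipCodec])
      sc.parallelize(Seq(("k", "v"))).saveAsSequenceFile("/tmp/kv_gz", Some(classOf[GzipCodec]))
      sc.stop()
    }
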
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 93bb69b41c7907cf3645e3b310d5200b1d9a3808..d306124fca47deb9fad9eaa1b6411c23999b234f 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -8,6 +8,7 @@ import java.util.*;
 import scala.Tuple2;
 
 import com.google.common.base.Charsets;
+import org.apache.hadoop.io.compress.DefaultCodec;
 import com.google.common.io.Files;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
@@ -473,6 +474,19 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(expected, readRDD.collect());
   }
 
+  @Test
+  public void textFilesCompressed() throws IOException {
+    File tempDir = Files.createTempDir();
+    String outputDir = new File(tempDir, "output").getAbsolutePath();
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
+    rdd.saveAsTextFile(outputDir, DefaultCodec.class);
+
+    // Try reading it in as a text file RDD
+    List<String> expected = Arrays.asList("1", "2", "3", "4");
+    JavaRDD<String> readRDD = sc.textFile(outputDir);
+    Assert.assertEquals(expected, readRDD.collect());
+  }
+
   @Test
   public void sequenceFile() {
     File tempDir = Files.createTempDir();
@@ -619,6 +633,37 @@ public class JavaAPISuite implements Serializable {
     }).collect().toString());
   }
 
+  @Test
+  public void hadoopFileCompressed() {
+    File tempDir = Files.createTempDir();
+    String outputDir = new File(tempDir, "output_compressed").getAbsolutePath();
+    List<Tuple2<Integer, String>> pairs = Arrays.asList(
+        new Tuple2<Integer, String>(1, "a"),
+        new Tuple2<Integer, String>(2, "aa"),
+        new Tuple2<Integer, String>(3, "aaa")
+    );
+    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
+
+    rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
+      @Override
+      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
+        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
+      }
+    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class,
+        DefaultCodec.class);
+
+    JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
+        SequenceFileInputFormat.class, IntWritable.class, Text.class);
+
+    Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
+        String>() {
+      @Override
+      public String call(Tuple2<IntWritable, Text> x) {
+        return x.toString();
+      }
+    }).collect().toString());
+  }
+
   @Test
   public void zip() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index a761dd77c5118f6ae2a37e2a3cd5d55a3fbb2ffe..67f3332d444d83b68bdee5c3e0159d9a6e3edf8d 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -106,9 +106,9 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     sc = new SparkContext("local", "test")
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
     rdd.count
-    assert(sc.persistentRdds.isEmpty == false)
+    assert(sc.persistentRdds.isEmpty === false)
     rdd.unpersist()
-    assert(sc.persistentRdds.isEmpty == true)
+    assert(sc.persistentRdds.isEmpty === true)
 
     failAfter(Span(3000, Millis)) {
       try {
@@ -116,12 +116,12 @@ class RDDSuite extends FunSuite with LocalSparkContext {
           Thread.sleep(200)
         }
       } catch {
-        case e: Exception =>
+        case _: Exception => Thread.sleep(10)
           // Do nothing. We might see exceptions because block manager
           // is racing this thread to remove entries from the driver.
       }
     }
-    assert(sc.getRDDStorageInfo.isEmpty == true)
+    assert(sc.getRDDStorageInfo.isEmpty === true)
   }
 
   test("caching with failures") {
@@ -317,4 +317,23 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     assert(sample.size === checkSample.size)
     for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
   }
+
+  test("top with predefined ordering") {
+    sc = new SparkContext("local", "test")
+    val nums = Array.range(1, 100000)
+    val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2)
+    val topK = ints.top(5)
+    assert(topK.size === 5)
+    assert(topK.sorted === nums.sorted.takeRight(5))
+  }
+
+  test("top with custom ordering") {
+    sc = new SparkContext("local", "test")
+    val words = Vector("a", "b", "c", "d")
+    implicit val ord = implicitly[Ordering[String]].reverse
+    val rdd = sc.makeRDD(words, 2)
+    val topK = rdd.top(2)
+    assert(topK.size === 2)
+    assert(topK.sorted === Array("b", "a"))
+  }
 }
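
The two top() tests above line up with the new BoundedPriorityQueue: keeping a bounded queue per partition and merging the queues at the driver yields the top K without sorting or collecting the full dataset. The actual RDD.top implementation is not part of this hunk, so the following is only a sketch of the idea over local collections:

    import spark.util.BoundedPriorityQueue

    object TopKSketch extends App {
      val data = scala.util.Random.shuffle((1 to 1000).toList)
      val partitions = data.grouped(100).toList  // stand-in for RDD partitions

      // One bounded queue per "partition", merged pairwise at the "driver".
      val merged = partitions
        .map { part =>
          val queue = new BoundedPriorityQueue[Int](5)
          queue ++= part
          queue
        }
        .reduce { (a, b) => a ++= b; a }

      println(merged.toList.sorted)  // List(996, 997, 998, 999, 1000)
    }
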
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index fdee7ca38460554610b6d121c2d93fa22b3dca50..b967016cf726791b543781a9f42cf8c9607aab71 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -305,9 +305,32 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
     assert(c.partitioner.get === p)
   }
 
+  test("shuffle non-zero block size") {
+    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    val NUM_BLOCKS = 3
+
+    val a = sc.parallelize(1 to 10, 2)
+    val b = a.map { x =>
+      (x, new ShuffleSuite.NonJavaSerializableClass(x * 2))
+    }
+    // If the Kryo serializer is not used correctly, the shuffle would fail because the
+    // default Java serializer cannot handle the non-serializable class.
+    val c = new ShuffledRDD(b, new HashPartitioner(NUM_BLOCKS),
+      classOf[spark.KryoSerializer].getName)
+    val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
+
+    assert(c.count === 10)
+
+    // All blocks must have non-zero size
+    (0 until NUM_BLOCKS).foreach { id =>
+      val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, id)
+      assert(statuses.forall(s => s._2 > 0))
+    }
+  }
+
   test("shuffle serializer") {
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    sc = new SparkContext("local-cluster[1,2,512]", "test")
+    sc = new SparkContext("local-cluster[2,1,512]", "test")
     val a = sc.parallelize(1 to 10, 2)
     val b = a.map { x =>
       (x, new ShuffleSuite.NonJavaSerializableClass(x * 2))
@@ -317,6 +340,33 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
     val c = new ShuffledRDD(b, new HashPartitioner(3), classOf[spark.KryoSerializer].getName)
     assert(c.count === 10)
   }
+
+  test("zero sized blocks") {
+    // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+    sc = new SparkContext("local-cluster[2,1,512]", "test")
+
+    // 10 partitions from 4 keys
+    val NUM_BLOCKS = 10
+    val a = sc.parallelize(1 to 4, NUM_BLOCKS)
+    val b = a.map(x => (x, x*2))
+
+    // NOTE: The default Java serializer doesn't create zero-sized blocks,
+    //       so use Kryo here to make sure some blocks end up empty.
+    val c = new ShuffledRDD(b, new HashPartitioner(NUM_BLOCKS), classOf[spark.KryoSerializer].getName)
+
+    val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
+    assert(c.count === 4)
+
+    val blockSizes = (0 until NUM_BLOCKS).flatMap { id =>
+      val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, id)
+      statuses.map(x => x._2)
+    }
+    val nonEmptyBlocks = blockSizes.filter(x => x > 0)
+
+    // We should have at most 4 non-zero sized blocks, since there are only 4 keys
+    assert(nonEmptyBlocks.size <= 4)
+  }
+
 }
 
 object ShuffleSuite {
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
index a39418b71618fb132f55c45d4d8d3fb8143f5abd..c861597c6b0eefe21769237d1b711040c3bfc736 100644
--- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
@@ -88,7 +88,7 @@ class DummyTask(stageId: Int) extends Task[Int](stageId)
   }
 }
 
-class ClusterSchedulerSuite extends FunSuite with LocalSparkContext {
+class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
 
   def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): DummyTaskSetManager = {
     new DummyTaskSetManager(priority, stage, numTasks, cs , taskSet)
@@ -96,8 +96,11 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext {
 
   def resourceOffer(rootPool: Pool): Int = {
     val taskSetQueue = rootPool.getSortedTaskSetQueue()
-    for (taskSet <- taskSetQueue)
-    {
+    // Just for test: log the sorted task set queue before making offers.
+    for (manager <- taskSetQueue) {
+      logInfo("parentName:%s, parent running tasks:%d, name:%s, runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
+    }
+    for (taskSet <- taskSetQueue) {
       taskSet.slaveOffer("execId_1", "hostname_1", 1) match {
         case Some(task) =>
           return taskSet.stageId
diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
index bff2475686a477ae492c22d1396fccca8e510540..b9d5f9668e654d929e5e51a849897b9a8df09080 100644
--- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala
@@ -15,10 +15,10 @@ import org.scalatest.time.SpanSugar._
 import spark.JavaSerializer
 import spark.KryoSerializer
 import spark.SizeEstimator
-import spark.Utils
 import spark.util.AkkaUtils
 import spark.util.ByteBufferInputStream
 
+
 class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
   var store: BlockManager = null
   var store2: BlockManager = null
@@ -124,7 +124,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     // Putting a1, a2  and a3 in memory and telling master only about a1 and a2
     store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
     store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
-    store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, false)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // Checking whether blocks are in memory
     assert(store.getSingle("a1") != None, "a1 was not in store")
@@ -170,7 +170,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
     store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
     store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
-    store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, false)
+    store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
 
     // Checking whether blocks are in memory and memory size
     val memStatus = master.getMemoryStatus.head._2
@@ -218,7 +218,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY)
     store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY)
     store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
-    master.removeRdd(0)
+    master.removeRdd(0, blocking = false)
 
     eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
       store.getSingle("rdd_0_0") should be (None)
@@ -232,6 +232,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
       store.getSingle("nonrddblock") should not be (None)
       master.getLocations("nonrddblock") should have size (1)
     }
+
+    store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY)
+    master.removeRdd(0, blocking = true)
+    store.getSingle("rdd_0_0") should be (None)
+    master.getLocations("rdd_0_0") should have size 0
+    store.getSingle("rdd_0_1") should be (None)
+    master.getLocations("rdd_0_1") should have size 0
   }
 
   test("reregistration on heart beat") {
@@ -262,7 +270,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     master.removeExecutor(store.blockManagerId.executorId)
     assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
 
-    store.putSingle("a2", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
     store.waitForAsyncReregister()
 
     assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
@@ -280,7 +288,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
       master.removeExecutor(store.blockManagerId.executorId)
       val t1 = new Thread {
         override def run() {
-          store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, true)
+          store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
         }
       }
       val t2 = new Thread {
@@ -490,9 +498,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     val list1 = List(new Array[Byte](200), new Array[Byte](200))
     val list2 = List(new Array[Byte](200), new Array[Byte](200))
     val list3 = List(new Array[Byte](200), new Array[Byte](200))
-    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
-    store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, true)
-    store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, true)
+    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.get("list2") != None, "list2 was not in store")
     assert(store.get("list2").get.size == 2)
     assert(store.get("list3") != None, "list3 was not in store")
@@ -501,7 +509,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(store.get("list2") != None, "list2 was not in store")
     assert(store.get("list2").get.size == 2)
     // At this point list2 was gotten last, so LRU will getSingle rid of list3
-    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, true)
+    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
     assert(store.get("list1") != None, "list1 was not in store")
     assert(store.get("list1").get.size == 2)
     assert(store.get("list2") != None, "list2 was not in store")
@@ -516,9 +524,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     val list3 = List(new Array[Byte](200), new Array[Byte](200))
     val list4 = List(new Array[Byte](200), new Array[Byte](200))
     // First store list1 and list2, both in memory, and list3, on disk only
-    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, true)
-    store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, true)
-    store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, true)
+    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+    store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+    store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
     // At this point LRU should not kick in because list3 is only on disk
     assert(store.get("list1") != None, "list2 was not in store")
     assert(store.get("list1").get.size === 2)
@@ -533,7 +541,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
     assert(store.get("list3") != None, "list1 was not in store")
     assert(store.get("list3").get.size === 2)
     // Now let's add in list4, which uses both disk and memory; list1 should drop out
-    store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, true)
+    store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
     assert(store.get("list1") === None, "list1 was in store")
     assert(store.get("list2") != None, "list3 was not in store")
     assert(store.get("list2").get.size === 2)
diff --git a/pom.xml b/pom.xml
index 6ee64d07c26ec14072d25c73142ccdb3668f46e2..c893ec755eb4c2055fa184e4939993b6ba949d27 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,6 +59,9 @@
     <slf4j.version>1.6.1</slf4j.version>
     <cdh.version>4.1.2</cdh.version>
     <log4j.version>1.2.17</log4j.version>
+
+    <PermGen>64m</PermGen>
+    <MaxPermGen>512m</MaxPermGen>
   </properties>
 
   <repositories>
@@ -392,6 +395,8 @@
             <jvmArgs>
               <jvmArg>-Xms64m</jvmArg>
               <jvmArg>-Xmx1024m</jvmArg>
+              <jvmArg>-XX:PermSize=${PermGen}</jvmArg>
+              <jvmArg>-XX:MaxPermSize=${MaxPermGen}</jvmArg>
             </jvmArgs>
             <javacArgs>
               <javacArg>-source</javacArg>