diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
index 6516bea157233444021b16eba6cb05b8d9722907..b0daa70cfdf14f3f59ee45ade35b251a1472c5e5 100644
--- a/core/src/main/scala/spark/Aggregator.scala
+++ b/core/src/main/scala/spark/Aggregator.scala
@@ -9,9 +9,9 @@ package spark
   *                       known as map-side aggregations. When set to false, 
   *                       mergeCombiners function is not used.
   */
-class Aggregator[K, V, C] (
+case class Aggregator[K, V, C] (
     val createCombiner: V => C,
     val mergeValue: (C, V) => C,
     val mergeCombiners: (C, C) => C,
     val mapSideCombine: Boolean = true)
-  extends Serializable
+
diff --git a/core/src/main/scala/spark/HttpFileServer.scala b/core/src/main/scala/spark/HttpFileServer.scala
index e6ad4dd28edd10fe4168833c247ebb529783fd87..05ca846c85ada98da4cd9345c63772a28e4f0620 100644
--- a/core/src/main/scala/spark/HttpFileServer.scala
+++ b/core/src/main/scala/spark/HttpFileServer.scala
@@ -20,7 +20,7 @@ class HttpFileServer extends Logging {
     fileDir.mkdir()
     jarDir.mkdir()
     logInfo("HTTP File server directory is " + baseDir)
-    httpServer = new HttpServer(fileDir)
+    httpServer = new HttpServer(baseDir)
     httpServer.start()
     serverUri = httpServer.uri
   }
@@ -30,11 +30,13 @@ class HttpFileServer extends Logging {
   }
   
   def addFile(file: File) : String = {
-    return addFileToDir(file, fileDir)
+    addFileToDir(file, fileDir)
+    return serverUri + "/files/" + file.getName
   }
   
   def addJar(file: File) : String = {
-    return addFileToDir(file, jarDir)
+    addFileToDir(file, jarDir)
+    return serverUri + "/jars/" + file.getName
   }
   
   def addFileToDir(file: File, dir: File) : String = {
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 64018f8c6bf1efaee6e5585fc6aae9c1c46dbc53..aa1d00c63c3f25b611236602e0867040b1bd9c23 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -1,11 +1,10 @@
 package spark
 
 import java.io.EOFException
-import java.net.URL
 import java.io.ObjectInputStream
+import java.net.URL
+import java.util.{Date, HashMap => JHashMap}
 import java.util.concurrent.atomic.AtomicLong
-import java.util.{HashMap => JHashMap}
-import java.util.Date
 import java.text.SimpleDateFormat
 
 import scala.collection.Map
@@ -50,9 +49,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   def combineByKey[C](createCombiner: V => C,
       mergeValue: (C, V) => C,
       mergeCombiners: (C, C) => C,
-      partitioner: Partitioner): RDD[(K, C)] = {
-    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
-    new ShuffledRDD(self, aggregator, partitioner)
+      partitioner: Partitioner,
+      mapSideCombine: Boolean = true): RDD[(K, C)] = {
+    val aggregator =
+      if (mapSideCombine) {
+        new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
+      } else {
+        // Don't apply map-side combiner.
+        // A sanity check to make sure mergeCombiners is not defined.
+        assert(mergeCombiners == null)
+        new Aggregator[K, V, C](createCombiner, mergeValue, null, false)
+      }
+    new ShuffledAggregatedRDD(self, aggregator, partitioner)
   }
 
   def combineByKey[C](createCombiner: V => C,
@@ -65,7 +73,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
     combineByKey[V]((v: V) => v, func, func, partitioner)
   }
-  
+
   def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
     def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
       val map = new JHashMap[K, V]
@@ -116,13 +124,24 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     groupByKey(new HashPartitioner(numSplits))
   }
 
-  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
-    def createCombiner(v: V) = ArrayBuffer(v)
-    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
-    def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
-    val bufs = combineByKey[ArrayBuffer[V]](
-      createCombiner _, mergeValue _, mergeCombiners _, partitioner)
-    bufs.flatMapValues(buf => buf)
+  /**
+   * Repartition the RDD using the specified partitioner. If mapSideCombine is
+   * true, Spark will group values of the same key together on the map side
+   * before the repartitioning. If a large number of duplicate keys is
+   * expected and the keys are large, mapSideCombine should be set to true.
+   */
+  def partitionBy(partitioner: Partitioner, mapSideCombine: Boolean = false): RDD[(K, V)] = {
+    if (mapSideCombine) {
+      def createCombiner(v: V) = ArrayBuffer(v)
+      def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
+      def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
+      val bufs = combineByKey[ArrayBuffer[V]](
+        createCombiner _, mergeValue _, mergeCombiners _, partitioner)
+      bufs.flatMapValues(buf => buf)
+    } else {
+      new RepartitionShuffledRDD(self, partitioner)
+    }
   }
 
   def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
@@ -194,17 +213,17 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   }
 
   def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
-  
+
   def mapValues[U](f: V => U): RDD[(K, U)] = {
     val cleanF = self.context.clean(f)
     new MappedValuesRDD(self, cleanF)
   }
-  
+
   def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
     val cleanF = self.context.clean(f)
     new FlatMappedValuesRDD(self, cleanF)
   }
-  
+
   def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
     val cg = new CoGroupedRDD[K](
         Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]),
@@ -215,12 +234,12 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
         (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
     }
   }
-  
+
   def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
       : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
     val cg = new CoGroupedRDD[K](
         Seq(self.asInstanceOf[RDD[(_, _)]],
-            other1.asInstanceOf[RDD[(_, _)]], 
+            other1.asInstanceOf[RDD[(_, _)]],
             other2.asInstanceOf[RDD[(_, _)]]),
         partitioner)
     val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
@@ -289,7 +308,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
     saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
   }
-  
+
   def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
     saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
   }
@@ -363,7 +382,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf))
     saveAsHadoopDataset(conf)
   }
-  
+
   def saveAsHadoopDataset(conf: JobConf) {
     val outputFormatClass = conf.getOutputFormat
     val keyClass = conf.getOutputKeyClass
@@ -377,7 +396,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     if (valueClass == null) {
       throw new SparkException("Output value class not set")
     }
-    
+
     logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
 
     val writer = new HadoopWriter(conf)
@@ -390,14 +409,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
 
       writer.setup(context.stageId, context.splitId, attemptNumber)
       writer.open()
-      
+
       var count = 0
       while(iter.hasNext) {
         val record = iter.next
         count += 1
         writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
       }
-    
+
       writer.close()
       writer.commit()
     }
@@ -413,28 +432,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
 
 class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
   self: RDD[(K, V)])
-  extends Logging 
+  extends Logging
   with Serializable {
 
   def sortByKey(ascending: Boolean = true): RDD[(K,V)] = {
-    val rangePartitionedRDD = self.partitionBy(new RangePartitioner(self.splits.size, self, ascending))
-    new SortedRDD(rangePartitionedRDD, ascending)
+    new ShuffledSortedRDD(self, ascending)
   }
 }
 
-class SortedRDD[K <% Ordered[K], V](prev: RDD[(K, V)], ascending: Boolean)
-  extends RDD[(K, V)](prev.context) {
-
-  override def splits = prev.splits
-  override val partitioner = prev.partitioner
-  override val dependencies = List(new OneToOneDependency(prev))
-
-  override def compute(split: Split) = {
-    prev.iterator(split).toArray
-      .sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator
-  }
-} 
- 
 class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev.context) {
   override def splits = prev.splits
   override val dependencies = List(new OneToOneDependency(prev))
@@ -444,7 +449,7 @@ class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)]
 
 class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U])
   extends RDD[(K, U)](prev.context) {
-  
+
   override def splits = prev.splits
   override val dependencies = List(new OneToOneDependency(prev))
   override val partitioner = prev.partitioner
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index d28f3593fe62d85df86be50540c847941186baa8..efe248896a16b8163abd166985d77f6850342d25 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -68,6 +68,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
   def preferredLocations(split: Split): Seq[String] = Nil
   
   def context = sc
+
+  def elementClassManifest: ClassManifest[T] = classManifest[T]
   
   // Get a unique ID for this RDD
   val id = sc.newRddId()
diff --git a/core/src/main/scala/spark/Serializer.scala b/core/src/main/scala/spark/Serializer.scala
index 61a70beaf1fd73566443f8cf7e05c2317eceafd4..5f26bd2a7b3eadad697452c6e7bd4bf850a92c04 100644
--- a/core/src/main/scala/spark/Serializer.scala
+++ b/core/src/main/scala/spark/Serializer.scala
@@ -43,7 +43,7 @@ trait SerializerInstance {
   def deserializeMany(buffer: ByteBuffer): Iterator[Any] = {
     // Default implementation uses deserializeStream
     buffer.rewind()
-    deserializeStream(new ByteBufferInputStream(buffer)).toIterator
+    deserializeStream(new ByteBufferInputStream(buffer)).asIterator
   }
 }
 
@@ -74,7 +74,7 @@ trait DeserializationStream {
    * Read the elements of this stream through an iterator. This can only be called once, as
    * reading each element will consume data from the input source.
    */
-  def toIterator: Iterator[Any] = new Iterator[Any] {
+  def asIterator: Iterator[Any] = new Iterator[Any] {
     var gotNext = false
     var finished = false
     var nextValue: Any = null
diff --git a/core/src/main/scala/spark/ShuffledRDD.scala b/core/src/main/scala/spark/ShuffledRDD.scala
index 3616d8e47ec85ffb7e819e2544ff20d29bbb9d41..a7346060b3ccb542abb5ee50a2697db7555de4ca 100644
--- a/core/src/main/scala/spark/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/ShuffledRDD.scala
@@ -1,29 +1,89 @@
 package spark
 
+import scala.collection.mutable.ArrayBuffer
 import java.util.{HashMap => JHashMap}
 
+
 class ShuffledRDDSplit(val idx: Int) extends Split {
   override val index = idx
   override def hashCode(): Int = idx
 }
 
-class ShuffledRDD[K, V, C](
+
+/**
+ * The resulting RDD from a shuffle (e.g. repartitioning of data).
+ */
+abstract class ShuffledRDD[K, V, C](
     @transient parent: RDD[(K, V)],
     aggregator: Aggregator[K, V, C],
-    part : Partitioner) 
+    part : Partitioner)
   extends RDD[(K, C)](parent.context) {
-  //override val partitioner = Some(part)
+
   override val partitioner = Some(part)
-  
+
   @transient
   val splits_ = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
 
   override def splits = splits_
-  
+
   override def preferredLocations(split: Split) = Nil
-  
+
   val dep = new ShuffleDependency(context.newShuffleId, parent, aggregator, part)
   override val dependencies = List(dep)
+}
+
+
+/**
+ * Repartition a key-value pair RDD.
+ */
+class RepartitionShuffledRDD[K, V](
+    @transient parent: RDD[(K, V)],
+    part : Partitioner)
+  extends ShuffledRDD[K, V, V](
+    parent,
+    Aggregator[K, V, V](null, null, null, false),
+    part) {
+
+  override def compute(split: Split): Iterator[(K, V)] = {
+    val buf = new ArrayBuffer[(K, V)]
+    val fetcher = SparkEnv.get.shuffleFetcher
+    def addTupleToBuffer(k: K, v: V) = { buf += Tuple2(k, v) }
+    fetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
+    buf.iterator
+  }
+}
+
+
+/**
+ * A sort-based shuffle (that doesn't apply aggregation). It does so by first
+ * repartitioning the RDD by range, and then sorting within each range.
+ */
+class ShuffledSortedRDD[K <% Ordered[K]: ClassManifest, V](
+    @transient parent: RDD[(K, V)],
+    ascending: Boolean)
+  extends RepartitionShuffledRDD[K, V](
+    parent,
+    new RangePartitioner(parent.splits.size, parent, ascending)) {
+
+  override def compute(split: Split): Iterator[(K, V)] = {
+    // By separating this from RepartitionShuffledRDD, we avoid a
+    // buf.iterator.toArray call and thus avoid building up the buffer twice.
+    val buf = new ArrayBuffer[(K, V)]
+    def addTupleToBuffer(k: K, v: V) = { buf += Tuple2(k, v) }
+    SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
+    buf.sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator
+  }
+}
+
+
+/**
+ * The resulting RDD from shuffling and running (hash-based) aggregation.
+ */
+class ShuffledAggregatedRDD[K, V, C](
+    @transient parent: RDD[(K, V)],
+    aggregator: Aggregator[K, V, C],
+    part : Partitioner)
+  extends ShuffledRDD[K, V, C](parent, aggregator, part) {
 
   override def compute(split: Split): Iterator[(K, C)] = {
     val combiners = new JHashMap[K, C]
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 7473b40aa346b0c720af61545ff4262491b341fa..6ffae8e85f12a2f0d521b6d9670aa0371b4869d1 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -40,6 +40,8 @@ class SparkEnv (
     blockManager.stop()
     blockManager.master.stop()
     actorSystem.shutdown()
+    // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit
+    Thread.sleep(100)
     actorSystem.awaitTermination()
     // Akka's awaitTermination doesn't actually wait until the port is unbound, so sleep a bit
     Thread.sleep(100)
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 07aa18e5404cc9411421316bb5ab13b13b324707..5a3f8bde4372ab922711f3e7282cda09ba1eb91b 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -163,6 +163,8 @@ object Utils extends Logging {
       logInfo("Untarring " + filename)
       Utils.execute(Seq("tar", "-xf", filename), targetDir)
     }
+    // Make the file executable - that's necessary for scripts
+    FileUtil.chmod(filename, "a+x")
   }
 
   /**
diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
index 31d48b82b990aa05117ffd3d1e3900e646e1c155..4c81a1b447b58bdd42520960a4b3cb2ddab025f8 100644
--- a/core/src/main/scala/spark/deploy/master/JobInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -31,4 +31,13 @@ class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, va
   }
 
   def coresLeft: Int = desc.cores - coresGranted
+
+  private var _retryCount = 0
+
+  def retryCount = _retryCount
+
+  def incrementRetryCount = {
+    _retryCount += 1
+    _retryCount
+  }
 }
diff --git a/core/src/main/scala/spark/deploy/master/JobState.scala b/core/src/main/scala/spark/deploy/master/JobState.scala
index 50b0c6f95bbb98493844b8cc9eb7e54a11663c4a..8d458ac39c2c219836f823e524ebb52ce1609bb9 100644
--- a/core/src/main/scala/spark/deploy/master/JobState.scala
+++ b/core/src/main/scala/spark/deploy/master/JobState.scala
@@ -4,4 +4,6 @@ object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED")
   type JobState = Value
 
   val WAITING, RUNNING, FINISHED, FAILED = Value
+
+  val MAX_NUM_RETRY = 10
 }
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index c98dddea7bf2e7003d0e48def7bbff14d2bcecde..5cc73633abaf4abac8a0450806b479c963c0f741 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -1,19 +1,18 @@
 package spark.deploy.master
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-
 import akka.actor._
-import spark.{Logging, Utils}
-import spark.util.AkkaUtils
+import akka.actor.Terminated
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+
 import java.text.SimpleDateFormat
 import java.util.Date
-import akka.remote.RemoteClientLifeCycleEvent
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
 import spark.deploy._
-import akka.remote.RemoteClientShutdown
-import akka.remote.RemoteClientDisconnected
-import spark.deploy.RegisterWorker
-import spark.deploy.RegisterWorkerFailed
-import akka.actor.Terminated
+import spark.{Logging, SparkException, Utils}
+import spark.util.AkkaUtils
+
 
 class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For job IDs
@@ -81,12 +80,22 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
           exec.state = state
           exec.job.actor ! ExecutorUpdated(execId, state, message)
           if (ExecutorState.isFinished(state)) {
+            val jobInfo = idToJob(jobId)
             // Remove this executor from the worker and job
             logInfo("Removing executor " + exec.fullId + " because it is " + state)
-            idToJob(jobId).removeExecutor(exec)
+            jobInfo.removeExecutor(exec)
             exec.worker.removeExecutor(exec)
-            // TODO: the worker would probably want to restart the executor a few times
-            schedule()
+
+            // Only retry a certain number of times so we don't go into an infinite loop.
+            if (jobInfo.incrementRetryCount <= JobState.MAX_NUM_RETRY) {
+              schedule()
+            } else {
+              val e = new SparkException("Job %s with ID %s failed %d times.".format(
+                jobInfo.desc.name, jobInfo.id, jobInfo.retryCount))
+              logError(e.getMessage, e)
+              throw e
+              //System.exit(1)
+            }
           }
         }
         case None =>
@@ -112,7 +121,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
       addressToWorker.get(address).foreach(removeWorker)
       addressToJob.get(address).foreach(removeJob)
     }
-    
+
     case RequestMasterState => {
       sender ! MasterState(ip + ":" + port, workers.toList, jobs.toList, completedJobs.toList)
     }
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 704336102019cf706c011911a508d33865fd70a6..e2a9df275afab842ebc4c3186e3ebdf552328b64 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -75,7 +75,8 @@ class ExecutorRunner(
 
   def buildCommandSeq(): Seq[String] = {
     val command = jobDesc.command
-    val runScript = new File(sparkHome, "run").getCanonicalPath
+    val script = if (System.getProperty("os.name").startsWith("Windows")) "run.cmd" else "run"
+    val runScript = new File(sparkHome, script).getCanonicalPath
     Seq(runScript, command.mainClass) ++ command.arguments.map(substituteVariables)
   }
 
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 8f975c52d4ccc7416e8334838555e8126b67f0e9..9999b6ba806ddb76ca9785ec7bac6812c27de7e7 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -38,6 +38,10 @@ class Executor extends Logging {
       System.setProperty(key, value)
     }
 
+    // Create our ClassLoader and set it on this thread
+    urlClassLoader = createClassLoader()
+    Thread.currentThread.setContextClassLoader(urlClassLoader)
+
     // Initialize Spark environment (using system properties read above)
     env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false)
     SparkEnv.set(env)
@@ -45,11 +49,6 @@ class Executor extends Logging {
     // Start worker thread pool
     threadPool = new ThreadPoolExecutor(
       1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
-
-    // Create our ClassLoader and set it on this thread
-    urlClassLoader = createClassLoader()
-    Thread.currentThread.setContextClassLoader(urlClassLoader)
-
   }
 
   def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 3bcc588015d0186325b09017d61432865eb9012c..745aa0c93915b3b6836f38d9e1d364ea6bb3c277 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -44,7 +44,8 @@ object ShuffleMapTask {
   }
 
   // Since both the JarSet and FileSet have the same format this is used for both.
-  def serializeFileSet(set : HashMap[String, Long], stageId: Int, cache : JHashMap[Int, Array[Byte]]) : Array[Byte] = {
+  def serializeFileSet(
+    set : HashMap[String, Long], stageId: Int, cache : JHashMap[Int, Array[Byte]]) : Array[Byte] = {
     val old = cache.get(stageId)
     if (old != null) {
       return old
@@ -59,7 +60,6 @@ object ShuffleMapTask {
     }
   }
 
-
   def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_,_]) = {
     synchronized {
       val loader = Thread.currentThread.getContextClassLoader
@@ -113,7 +113,8 @@ class ShuffleMapTask(
     out.writeInt(bytes.length)
     out.write(bytes)
 
-    val fileSetBytes = ShuffleMapTask.serializeFileSet(fileSet, stageId, ShuffleMapTask.fileSetCache)
+    val fileSetBytes = ShuffleMapTask.serializeFileSet(
+      fileSet, stageId, ShuffleMapTask.fileSetCache)
     out.writeInt(fileSetBytes.length)
     out.write(fileSetBytes)
     val jarSetBytes = ShuffleMapTask.serializeFileSet(jarSet, stageId, ShuffleMapTask.jarSetCache)
@@ -172,7 +173,7 @@ class ShuffleMapTask(
         buckets.map(_.iterator)
       } else {
         // No combiners (no map-side aggregation). Simply partition the map output.
-        val buckets = Array.tabulate(numOutputSplits)(_ => new ArrayBuffer[(Any, Any)])
+        val buckets = Array.fill(numOutputSplits)(new ArrayBuffer[(Any, Any)])
         for (elem <- rdd.iterator(split)) {
           val pair = elem.asInstanceOf[(Any, Any)]
           val bucketId = partitioner.getPartition(pair._1)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 3a51f6bd96cbcfc898e9ecac87dd2bbfa072feaa..15748b70d5ba47231bedd47be2918ebc7f3e554a 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -614,10 +614,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   }
 
   def dataDeserialize(bytes: ByteBuffer): Iterator[Any] = {
-    /*serializer.newInstance().deserializeMany(bytes)*/
-    val ser = serializer.newInstance()
     bytes.rewind()
-    return ser.deserializeStream(new ByteBufferInputStream(bytes)).toIterator
+    val ser = serializer.newInstance()
+    return ser.deserializeStream(new ByteBufferInputStream(bytes)).asIterator
   }
 
   def stop() {
@@ -632,7 +631,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
 object BlockManager {
 
   def getNumParallelFetchesFromSystemProperties(): Int = {
-    System.getProperty("spark.blockManager.parallelFetches", "8").toInt
+    System.getProperty("spark.blockManager.parallelFetches", "4").toInt
   }
 
   def getMaxMemoryFromSystemProperties(): Long = {
diff --git a/core/src/main/scala/spark/storage/BlockStore.scala b/core/src/main/scala/spark/storage/BlockStore.scala
index 09287faba04a3dad9571300baf2ecc455f93ad63..d505df66a7d72f4548d92c68400372a8295c4ba8 100644
--- a/core/src/main/scala/spark/storage/BlockStore.scala
+++ b/core/src/main/scala/spark/storage/BlockStore.scala
@@ -1,6 +1,6 @@
 package spark.storage
 
-import java.io.{File, RandomAccessFile}
+import java.io.{File, FileOutputStream, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
 import java.util.{LinkedHashMap, UUID}
@@ -8,12 +8,14 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
 
 import scala.collection.mutable.ArrayBuffer
 
+import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
+
 import spark.{Utils, Logging, Serializer, SizeEstimator}
 
 /**
  * Abstract class to store blocks
  */
-abstract class BlockStore(blockManager: BlockManager) extends Logging {
+abstract class BlockStore(val blockManager: BlockManager) extends Logging {
   initLogging()
 
   def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) 
@@ -131,7 +133,7 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       return None 
     }
     if (entry.deserialized) {
-      return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].toIterator)
+      return Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
     } else {
       return Some(dataDeserialize(entry.value.asInstanceOf[ByteBuffer].duplicate())) 
     }
@@ -217,25 +219,28 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
     logDebug("Attempting to put block " + blockId)
     val startTime = System.currentTimeMillis
     val file = createFile(blockId)
-    if (file != null) {
-      val channel = new RandomAccessFile(file, "rw").getChannel()
-      val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
-      buffer.put(bytes)
-      channel.close()
-      val finishTime = System.currentTimeMillis
-      logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
-        blockId, bytes.limit, (finishTime - startTime)))
-    } else {
-      logError("File not created for block " + blockId)
-    }
+    val channel = new RandomAccessFile(file, "rw").getChannel()
+    val buffer = channel.map(MapMode.READ_WRITE, 0, bytes.limit)
+    buffer.put(bytes)
+    channel.close()
+    val finishTime = System.currentTimeMillis
+    logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
+      blockId, bytes.limit, (finishTime - startTime)))
   }
 
   def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
-  : Either[Iterator[Any], ByteBuffer] = {
-    val bytes = dataSerialize(values) 
-    logDebug("Converted block " + blockId + " to " + bytes.limit + " bytes")
-    putBytes(blockId, bytes, level)
-    return Right(bytes)
+      : Either[Iterator[Any], ByteBuffer] = {
+
+    logDebug("Attempting to write values for block " + blockId)
+    val file = createFile(blockId)
+    val fileOut = new FastBufferedOutputStream(new FileOutputStream(file))
+    val objOut = blockManager.serializer.newInstance().serializeStream(fileOut)
+    objOut.writeAll(values)
+    objOut.close()
+
+    // Return a byte buffer for the contents of the file
+    val channel = new RandomAccessFile(file, "rw").getChannel()
+    Right(channel.map(MapMode.READ_WRITE, 0, channel.size()))
   }
 
   def getBytes(blockId: String): Option[ByteBuffer] = {
@@ -267,8 +272,7 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
       newFile.getParentFile.mkdirs()
       return newFile 
     } else {
-      logError("File for block " + blockId + " already exists on disk, " + file)
-      return null
+      throw new Exception("File for block " + blockId + " already exists on disk, " + file)
     }
   }
 
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index b7b8a79327748ba63092d46dc5a309e5a30a6e73..93b876d2057810bde02809b4bbe0831c3e4e8b32 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -18,7 +18,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
 
   val clusterUrl = "local-cluster[2,1,512]"
   
-  var sc: SparkContext = _
+  @transient var sc: SparkContext = _
   
   after {
     if (sc != null) {
diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala
index 500af1eb902bd61a1d492ff1b412738fe51cd99c..fd7a7bd589430e7f37d7e88e436d300eef82071c 100644
--- a/core/src/test/scala/spark/FileServerSuite.scala
+++ b/core/src/test/scala/spark/FileServerSuite.scala
@@ -3,14 +3,14 @@ package spark
 import com.google.common.io.Files
 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
-import java.io.{File, PrintWriter}
+import java.io.{File, PrintWriter, FileReader, BufferedReader}
 import SparkContext._
 
 class FileServerSuite extends FunSuite with BeforeAndAfter {
   
-  var sc: SparkContext = _
-  var tmpFile : File = _ 
-  var testJarFile : File = _ 
+  @transient var sc: SparkContext = _
+  @transient var tmpFile : File = _
+  @transient var testJarFile : File = _
   
   before {
     // Create a sample text file
@@ -38,7 +38,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
     sc.addFile(tmpFile.toString)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
     val result = sc.parallelize(testData).reduceByKey {
-      val in  = new java.io.BufferedReader(new java.io.FileReader(tmpFile))
+      val in  = new BufferedReader(new FileReader("FileServerSuite.txt"))
       val fileVal = in.readLine().toInt
       in.close()
       _ * fileVal + _ * fileVal
@@ -53,7 +53,9 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
     sc.addJar(sampleJarFile)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
     val result = sc.parallelize(testData).reduceByKey { (x,y) => 
-      val fac = Thread.currentThread.getContextClassLoader().loadClass("org.uncommons.maths.Maths").getDeclaredMethod("factorial", classOf[Int])
+      val fac = Thread.currentThread.getContextClassLoader()
+                                    .loadClass("org.uncommons.maths.Maths")
+                                    .getDeclaredMethod("factorial", classOf[Int])
       val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
       val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
       a + b
@@ -66,7 +68,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
     sc.addFile(tmpFile.toString)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
     val result = sc.parallelize(testData).reduceByKey {
-      val in  = new java.io.BufferedReader(new java.io.FileReader(tmpFile))
+      val in  = new BufferedReader(new FileReader("FileServerSuite.txt"))
       val fileVal = in.readLine().toInt
       in.close()
       _ * fileVal + _ * fileVal
@@ -75,19 +77,19 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
     assert(result.toSet === Set((1,200), (2,300), (3,500)))
   }
 
-
   test ("Dynamically adding JARS on a standalone cluster") {
     sc = new SparkContext("local-cluster[1,1,512]", "test")
     val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile()
     sc.addJar(sampleJarFile)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
     val result = sc.parallelize(testData).reduceByKey { (x,y) => 
-      val fac = Thread.currentThread.getContextClassLoader().loadClass("org.uncommons.maths.Maths").getDeclaredMethod("factorial", classOf[Int])
+      val fac = Thread.currentThread.getContextClassLoader()
+                                    .loadClass("org.uncommons.maths.Maths")
+                                    .getDeclaredMethod("factorial", classOf[Int])
       val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
       val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
       a + b
     }.collect()
     assert(result.toSet === Set((1,2), (2,7), (3,121)))
   }
-  
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index f622c413f7c5f8a51707f4edecb8eb4fc905e1d5..9d7e2591f1f9bd73ec0e756d01b9a8c9d4177c6f 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -15,16 +15,16 @@ import scala.collection.mutable.ArrayBuffer
 import SparkContext._
 
 class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
-  
+
   var sc: SparkContext = _
-  
+
   after {
     if (sc != null) {
       sc.stop()
       sc = null
     }
   }
-  
+
   test("groupByKey") {
     sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
@@ -57,7 +57,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
     val valuesFor2 = groups.find(_._1 == 2).get._2
     assert(valuesFor2.toList.sorted === List(1))
   }
-  
+
   test("groupByKey with many output partitions") {
     sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
@@ -187,7 +187,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
       (4, (ArrayBuffer(), ArrayBuffer('w')))
     ))
   }
-  
+
   test("zero-partition RDD") {
     sc = new SparkContext("local", "test")
     val emptyDir = Files.createTempDir()
@@ -195,7 +195,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
     assert(file.splits.size == 0)
     assert(file.collect().toList === Nil)
     // Test that a shuffle on the file works, because this used to be a bug
-    assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)    
+    assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
   }
 
   test("map-side combine") {
@@ -212,7 +212,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
       _+_,
       _+_,
       false)
-    val shuffledRdd = new ShuffledRDD(
+    val shuffledRdd = new ShuffledAggregatedRDD(
       pairs, aggregator, new HashPartitioner(2))
     assert(shuffledRdd.collect().toSet === Set((1, 8), (2, 1)))
 
@@ -220,7 +220,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
     // not see an exception because mergeCombine should not have been called.
     val aggregatorWithException = new Aggregator[Int, Int, Int](
       (v: Int) => v, _+_, ShuffleSuite.mergeCombineException, false)
-    val shuffledRdd1 = new ShuffledRDD(
+    val shuffledRdd1 = new ShuffledAggregatedRDD(
       pairs, aggregatorWithException, new HashPartitioner(2))
     assert(shuffledRdd1.collect().toSet === Set((1, 8), (2, 1)))
 
@@ -228,7 +228,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with BeforeAndAfter {
     // expect to see an exception thrown.
     val aggregatorWithException1 = new Aggregator[Int, Int, Int](
       (v: Int) => v, _+_, ShuffleSuite.mergeCombineException)
-    val shuffledRdd2 = new ShuffledRDD(
+    val shuffledRdd2 = new ShuffledAggregatedRDD(
       pairs, aggregatorWithException1, new HashPartitioner(2))
     evaluating { shuffledRdd2.collect() } should produce [SparkException]
   }
diff --git a/docs/bagel-programming-guide.md b/docs/bagel-programming-guide.md
index b133376a97a49a9f4d488c649e4455e13d15b587..0c925c176c62b0b42bf6e4beafa1420ed9f2829a 100644
--- a/docs/bagel-programming-guide.md
+++ b/docs/bagel-programming-guide.md
@@ -19,7 +19,7 @@ To write a Bagel application, you will need to add Spark, its dependencies, and
 
 ## Programming Model
 
-Bagel operates on a graph represented as a [distributed dataset]({{HOME_PATH}}programming-guide.html) of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.
+Bagel operates on a graph represented as a [distributed dataset]({{HOME_PATH}}scala-programming-guide.html) of (K, V) pairs, where keys are vertex IDs and values are vertices plus their associated state. In each superstep, Bagel runs a user-specified compute function on each vertex that takes as input the current vertex state and a list of messages sent to that vertex during the previous superstep, and returns the new vertex state and a list of outgoing messages.
 
 For example, we can use Bagel to implement PageRank. Here, vertices represent pages, edges represent links between pages, and messages represent shares of PageRank sent to the pages that a particular page links to. 
 
diff --git a/docs/index.md b/docs/index.md
index 3df638f629537f17c9891a2e572a881c647d74e3..69d55e505ed7bbac622bc311567597c98f592aa1 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -54,7 +54,7 @@ of `project/SparkBuild.scala`, then rebuilding Spark (`sbt/sbt clean compile`).
 
 # Where to Go from Here
 
-* [Spark Programming Guide]({{HOME_PATH}}programming-guide.html): how to get started using Spark, and details on the API
+* [Spark Programming Guide]({{HOME_PATH}}scala-programming-guide.html): how to get started using Spark, and details on the API
 * [Running Spark on Amazon EC2]({{HOME_PATH}}ec2-scripts.html): scripts that let you launch a cluster on EC2 in about 5 minutes
 * [Running Spark on Mesos]({{HOME_PATH}}running-on-mesos.html): instructions on how to deploy to a private cluster
 * [Running Spark on YARN]({{HOME_PATH}}running-on-yarn.html): instructions on how to run Spark on top of a YARN cluster
diff --git a/docs/java-programming-guide.md b/docs/java-programming-guide.md
index e3f644d7489a32a64b01ab3002aa7d6465500c4a..c63448a965bf1c37a39fb399490e64ea84790363 100644
--- a/docs/java-programming-guide.md
+++ b/docs/java-programming-guide.md
@@ -2,4 +2,172 @@
 layout: global
 title: Java Programming Guide
 ---
-TODO: Write Java programming guide!
+
+The Spark Java API
+([spark.api.java]({{HOME_PATH}}api/core/index.html#spark.api.java.package)) defines
+[`JavaSparkContext`]({{HOME_PATH}}api/core/index.html#spark.api.java.JavaSparkContext) and
+[`JavaRDD`]({{HOME_PATH}}api/core/index.html#spark.api.java.JavaRDD) classes,
+which support
+the same methods as their Scala counterparts but take Java functions and return
+Java data and collection types.
+
+Because the Java API is similar to the Scala API, this programming guide only
+covers Java-specific features;
+the [Scala Programming Guide]({{HOME_PATH}}scala-programming-guide.html)
+provides a more general introduction to Spark concepts and should be read
+first.
+
+
+# Key differences in the Java API
+There are a few key differences between the Java and Scala APIs:
+
+* Java does not support anonymous or first-class functions, so functions must
+  be implemented by extending the
+  [`spark.api.java.function.Function`]({{HOME_PATH}}api/core/index.html#spark.api.java.function.Function),
+  [`Function2`]({{HOME_PATH}}api/core/index.html#spark.api.java.function.Function2), etc.
+  classes.
+* To maintain type safety, the Java API defines specialized Function and RDD
+  classes for key-value pairs and doubles.
+* RDD methods like `collect` and `countByKey` return Java collection types,
+  such as `java.util.List` and `java.util.Map`.
+
+
+## RDD Classes
+Spark defines additional operations on RDDs of doubles and key-value pairs, such
+as `stdev` and `join`.
+
+In the Scala API, these methods are automatically added using Scala's
+[implicit conversions](http://www.scala-lang.org/node/130) mechanism.
+
+In the Java API, the extra methods are defined in the
+[`JavaDoubleRDD`]({{HOME_PATH}}api/core/index.html#spark.api.java.JavaDoubleRDD) and
+[`JavaPairRDD`]({{HOME_PATH}}api/core/index.html#spark.api.java.JavaPairRDD)
+classes.  RDD methods like `map` are overloaded by specialized `PairFunction`
+and `DoubleFunction` classes, allowing them to return RDDs of the appropriate
+types.  Common methods like `filter` and `sample` are implemented by
+each specialized RDD class, so filtering a `PairRDD` returns a new `PairRDD`,
+etc. (this achieves the "same-result-type" principle used by the [Scala collections
+framework](http://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html)).
+
+## Function Classes
+
+The following table lists the function classes used by the Java API.  Each
+class has a single abstract method, `call()`, that must be implemented.
+
+<table class="table">
+<tr><th>Class</th><th>Function Type</th></tr>
+
+<tr><td>Function&lt;T, R&gt;</td><td>T -&gt; R </td></tr>
+<tr><td>DoubleFunction&lt;T&gt;</td><td>T -&gt; Double </td></tr>
+<tr><td>PairFunction&lt;T, K, V&gt;</td><td>T -&gt; Tuple2&lt;K, V&gt; </td></tr>
+
+<tr><td>FlatMapFunction&lt;T, R&gt;</td><td>T -&gt; Iterable&lt;R&gt; </td></tr>
+<tr><td>DoubleFlatMapFunction&lt;T&gt;</td><td>T -&gt; Iterable&lt;Double&gt; </td></tr>
+<tr><td>PairFlatMapFunction&lt;T, K, V&gt;</td><td>T -&gt; Iterable&lt;Tuple2&lt;K, V&gt;&gt; </td></tr>
+
+<tr><td>Function2&lt;T1, T2, R&gt;</td><td>T1, T2 -&gt; R (function of two arguments)</td></tr>
+</table>
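+
+As a brief illustration of the pattern (a minimal sketch; `numbers` is assumed
+to be an existing `JavaRDD<Integer>`), a `Function` subclass can be passed to
+`map` to transform each element:
+
+{% highlight java %}
+// Doubles each element; map() invokes call() once per element.
+class DoubleValue extends Function<Integer, Integer> {
+  public Integer call(Integer x) {
+    return x * 2;
+  }
+}
+
+// "numbers" is an existing JavaRDD<Integer> (assumed for this sketch).
+JavaRDD<Integer> doubled = numbers.map(new DoubleValue());
+{% endhighlight %}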
+
+# Other Features
+The Java API supports other Spark features, including
+[accumulators]({{HOME_PATH}}scala-programming-guide.html#accumulators),
+[broadcast variables]({{HOME_PATH}}scala-programming-guide.html#broadcast_variables), and
+[caching]({{HOME_PATH}}scala-programming-guide.html#caching).
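+
+For instance, caching works the same way as in Scala (a minimal sketch,
+assuming `lines` is a `JavaRDD<String>` that will be reused):
+
+{% highlight java %}
+// Keep the RDD in memory across subsequent actions.
+JavaRDD<String> cachedLines = lines.cache();
+{% endhighlight %}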
+
+# Example
+
+As an example, we will implement word count using the Java API.
+
+{% highlight java %}
+import spark.api.java.*;
+import spark.api.java.function.*;
+
+JavaSparkContext sc = new JavaSparkContext(...);
+JavaRDD<String> lines = sc.textFile("hdfs://...");
+JavaRDD<String> words = lines.flatMap(
+  new FlatMapFunction<String, String>() {
+    public Iterable<String> call(String s) {
+      return Arrays.asList(s.split(" "));
+    }
+  }
+);
+{% endhighlight %}
+
+The word count program starts by creating a `JavaSparkContext`, which accepts
+the same parameters as its Scala counterpart.  `JavaSparkContext` supports the
+same data loading methods as the regular `SparkContext`; here, `textFile`
+loads lines from text files stored in HDFS.
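+
+For example, a context for local testing might be constructed as follows (a
+sketch; as with the Scala `SparkContext`, the arguments are a master URL and a
+job name):
+
+{% highlight java %}
+// "local" master and an illustrative job name, following the Scala API.
+JavaSparkContext sc = new JavaSparkContext("local", "JavaWordCount");
+{% endhighlight %}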
+
+To split the lines into words, we use `flatMap` to split each line on
+whitespace.  `flatMap` is passed a `FlatMapFunction` that accepts a string and
+returns a `java.lang.Iterable` of strings.
+
+Here, the `FlatMapFunction` was created inline; another option is to subclass
+`FlatMapFunction` and pass an instance to `flatMap`:
+
+{% highlight java %}
+class Split extends FlatMapFunction<String, String> {
+  public Iterable<String> call(String s) {
+    return Arrays.asList(s.split(" "));
+  }
+}
+JavaRDD<String> words = lines.flatMap(new Split());
+{% endhighlight %}
+
+Continuing with the word count example, we map each word to a `(word, 1)` pair:
+
+{% highlight java %}
+import scala.Tuple2;
+JavaPairRDD<String, Integer> ones = words.map(
+  new PairFunction<String, String, Integer>() {
+    public Tuple2<String, Integer> call(String s) {
+      return new Tuple2<String, Integer>(s, 1);
+    }
+  }
+);
+{% endhighlight %}
+
+Note that `map` was passed a `PairFunction<String, String, Integer>` and
+returned a `JavaPairRDD<String, Integer>`.
+
+To finish the word count program, we will use `reduceByKey` to count the
+occurrences of each word:
+
+{% highlight java %}
+JavaPairRDD<String, Integer> counts = ones.reduceByKey(
+  new Function2<Integer, Integer, Integer>() {
+    public Integer call(Integer i1, Integer i2) {
+      return i1 + i2;
+    }
+  }
+);
+{% endhighlight %}
+
+Here, `reduceByKey` is passed a `Function2`, which implements a function with
+two arguments.  The resulting `JavaPairRDD` contains `(word, count)` pairs.
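+
+Since the Java API returns Java collection types, the final counts can be
+retrieved with `collect()` as a `java.util.List` of `Tuple2` objects (a short
+sketch of how the output might be printed):
+
+{% highlight java %}
+import java.util.List;
+
+// Bring the (word, count) pairs back to the driver and print them.
+List<Tuple2<String, Integer>> output = counts.collect();
+for (Tuple2<String, Integer> tuple : output) {
+  System.out.println(tuple._1() + ": " + tuple._2());
+}
+{% endhighlight %}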
+
+In this example, we explicitly showed each intermediate RDD.  It is also
+possible to chain the RDD transformations, so the word count example could also
+be written as:
+
+{% highlight java %}
+JavaPairRDD<String, Integer> counts = lines.flatMap(
+    ...
+  ).map(
+    ...
+  ).reduceByKey(
+    ...
+  );
+{% endhighlight %}
+There is no performance difference between these approaches; the choice is
+a matter of style.
+
+
+# Where to Go from Here
+Spark includes several sample jobs using the Java API in
+`examples/src/main/java`.  You can run them by passing the class name to the
+`run` script included in Spark -- for example, `./run
+spark.examples.JavaWordCount`.  Each example program prints usage help when run
+without any arguments.
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 203001954aaddd17134fe970283b740d902db30d..0247b46de40e414eaa3cb25027b604d869748b65 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -22,7 +22,7 @@ object SparkBuild extends Build {
   def sharedSettings = Defaults.defaultSettings ++ Seq(
     organization := "org.spark-project",
     version := "0.6.0-SNAPSHOT",
-    scalaVersion := "2.9.1",
+    scalaVersion := "2.9.2",
     scalacOptions := Seq(/*"-deprecation",*/ "-unchecked", "-optimize"), // -deprecation is too noisy due to usage of old Hadoop API, enable it once that's no longer an issue
     unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
     retrieveManaged := true,
@@ -61,9 +61,9 @@ object SparkBuild extends Build {
       "asm" % "asm-all" % "3.3.1",
       "com.google.protobuf" % "protobuf-java" % "2.4.1",
       "de.javakaffee" % "kryo-serializers" % "0.9",
-      "com.typesafe.akka" % "akka-actor" % "2.0.2",
-      "com.typesafe.akka" % "akka-remote" % "2.0.2",
-      "com.typesafe.akka" % "akka-slf4j" % "2.0.2",
+      "com.typesafe.akka" % "akka-actor" % "2.0.3",
+      "com.typesafe.akka" % "akka-remote" % "2.0.3",
+      "com.typesafe.akka" % "akka-slf4j" % "2.0.3",
       "it.unimi.dsi" % "fastutil" % "6.4.4",
       "colt" % "colt" % "1.2.0",
       "cc.spray" % "spray-can" % "1.0-M2.1",
diff --git a/run b/run
index 2946a04d3f7e2d22c59c7a748dc396c19da9bf32..5f640789ffc9c3fb2bcc5b7dfa98a224c1e993f9 100755
--- a/run
+++ b/run
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-SCALA_VERSION=2.9.1
+SCALA_VERSION=2.9.2
 
 # Figure out where the Scala framework is installed
 FWDIR="$(cd `dirname $0`; pwd)"
diff --git a/run.cmd b/run.cmd
new file mode 100644
index 0000000000000000000000000000000000000000..cc5605f8a9e3e2b9fc7da725c86b1a9b7da73a58
--- /dev/null
+++ b/run.cmd
@@ -0,0 +1,2 @@
+@echo off
+cmd /V /E /C %~dp0run2.cmd %*
diff --git a/run2.cmd b/run2.cmd
new file mode 100644
index 0000000000000000000000000000000000000000..9fc4d5054b435282791ea078193d96d04084f521
--- /dev/null
+++ b/run2.cmd
@@ -0,0 +1,68 @@
+@echo off
+
+set SCALA_VERSION=2.9.2
+
+rem Figure out where the Spark framework is installed
+set FWDIR=%~dp0
+
+rem Export this as SPARK_HOME
+set SPARK_HOME=%FWDIR%
+
+rem Load environment variables from conf\spark-env.cmd, if it exists
+if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
+
+rem Check that SCALA_HOME has been specified
+if not "x%SCALA_HOME%"=="x" goto scala_exists
+  echo "SCALA_HOME is not set"
+  goto exit
+:scala_exists
+
+rem If the user specifies a Mesos JAR, put it before our included one on the classpath
+set MESOS_CLASSPATH=
+if not "x%MESOS_JAR%"=="x" set MESOS_CLASSPATH=%MESOS_JAR%
+
+rem Figure out how much memory to use per executor and set it as an environment
+rem variable so that our process sees it and can report it to Mesos
+if "x%SPARK_MEM%"=="x" set SPARK_MEM=512m
+
+rem Set JAVA_OPTS to be able to load native libraries and to set heap size
+set JAVA_OPTS=%SPARK_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%SPARK_MEM% -Xmx%SPARK_MEM%
+rem Load extra JAVA_OPTS from conf/java-opts, if it exists
+if exist "%FWDIR%conf\java-opts.cmd" call "%FWDIR%conf\java-opts.cmd"
+
+set CORE_DIR=%FWDIR%core
+set REPL_DIR=%FWDIR%repl
+set EXAMPLES_DIR=%FWDIR%examples
+set BAGEL_DIR=%FWDIR%bagel
+
+rem Build up classpath
+set CLASSPATH=%SPARK_CLASSPATH%;%MESOS_CLASSPATH%;%FWDIR%conf;%CORE_DIR%\target\scala-%SCALA_VERSION%\classes
+set CLASSPATH=%CLASSPATH%;%CORE_DIR%\target\scala-%SCALA_VERSION%\test-classes;%CORE_DIR%\src\main\resources
+set CLASSPATH=%CLASSPATH%;%REPL_DIR%\target\scala-%SCALA_VERSION%\classes;%EXAMPLES_DIR%\target\scala-%SCALA_VERSION%\classes
+for /R "%CORE_DIR%\lib" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j
+for /R "%FWDIR%\lib_managed\jars" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j
+for /R "%FWDIR%\lib_managed\bundles" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j
+for /R "%REPL_DIR%\lib" %%j in (*.jar) do set CLASSPATH=!CLASSPATH!;%%j
+set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes
+
+rem Figure out whether to run our class with java or with the scala launcher.
+rem In most cases, we'd prefer to execute our process with java because scala
+rem creates a shell script as the parent of its Java process, which makes it
+rem hard to kill the child with stuff like Process.destroy(). However, for
+rem the Spark shell, the wrapper is necessary to properly reset the terminal
+rem when we exit, so we allow it to set a variable to launch with scala.
+if "%SPARK_LAUNCH_WITH_SCALA%" NEQ 1 goto java_runner
+  set RUNNER=%SCALA_HOME%\bin\scala
+  rem Java options will be passed to scala as JAVA_OPTS
+  set EXTRA_ARGS=
+  goto run_spark
+:java_runner
+  set CLASSPATH=%CLASSPATH%;%SCALA_HOME%\lib\scala-library.jar;%SCALA_HOME%\lib\scala-compiler.jar;%SCALA_HOME%\lib\jline.jar
+  set RUNNER=java
+  if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
+  rem The JVM doesn't read JAVA_OPTS by default so we need to pass it in
+  set EXTRA_ARGS=%JAVA_OPTS%
+:run_spark
+
+%RUNNER% -cp "%CLASSPATH%" %EXTRA_ARGS% %*
+:exit
\ No newline at end of file
diff --git a/sbt/sbt.cmd b/sbt/sbt.cmd
new file mode 100644
index 0000000000000000000000000000000000000000..6b289ab44761c11846fb212984a5f022b9d52a3d
--- /dev/null
+++ b/sbt/sbt.cmd
@@ -0,0 +1,5 @@
+@echo off
+set EXTRA_ARGS=
+if not "%MESOS_HOME%x"=="x" set EXTRA_ARGS=-Djava.library.path=%MESOS_HOME%\lib\java
+set SPARK_HOME=%~dp0..
+java -Xmx1200M -XX:MaxPermSize=200m %EXTRA_ARGS% -jar %SPARK_HOME%\sbt\sbt-launch-*.jar "%*"
diff --git a/spark-shell.cmd b/spark-shell.cmd
new file mode 100644
index 0000000000000000000000000000000000000000..34697d52d73f4b4fd0cba4887b021a0d5381127a
--- /dev/null
+++ b/spark-shell.cmd
@@ -0,0 +1,4 @@
+@echo off
+set FWDIR=%~dp0
+set SPARK_LAUNCH_WITH_SCALA=1
+cmd /V /E /C %FWDIR%run2.cmd spark.repl.Main %*