diff --git a/README.md b/README.md
index b0fc3524fa6b769c5c6f8ba95fae8cbadd0970da..1f8f7b6876c4cb24a8d941d0ef3d15bdc8822046 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,8 @@ which is packaged with it. To build Spark and its example programs, run:
 
     sbt/sbt package
 
+Spark also supports building using Maven. If you would like to build using Maven, see the [instructions for building Spark with Maven](http://spark-project.org/docs/latest/building-with-maven.html) in the spark documentation..
+
 To run Spark, you will need to have Scala's bin directory in your `PATH`, or
 you will need to set the `SCALA_HOME` environment variable to point to where
 you've installed Scala. Scala must be accessible through one of these
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index b88d711dcfe2bb648dc0006e64efdb26b9463905..33dc7627a3dc3636a32478f9c9b9defe9ee210a9 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -372,6 +372,62 @@ abstract class RDD[T: ClassManifest](
     preservesPartitioning: Boolean = false): RDD[U] =
     new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
 
+  /**
+   * Maps f over this RDD, where f takes an additional parameter of type A.  This
+   * additional parameter is produced by constructA, which is called in each
+   * partition with the index of that partition.
+   */
+  def mapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
+    (f:(T, A) => U): RDD[U] = {
+      def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
+        val a = constructA(index)
+        iter.map(t => f(t, a))
+      }
+    new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
+  }
+
+  /**
+   * FlatMaps f over this RDD, where f takes an additional parameter of type A.  This
+   * additional parameter is produced by constructA, which is called in each
+   * partition with the index of that partition.
+   */
+  def flatMapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
+    (f:(T, A) => Seq[U]): RDD[U] = {
+      def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
+        val a = constructA(index)
+        iter.flatMap(t => f(t, a))
+      }
+    new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
+  }
+
+  /**
+   * Applies f to each element of this RDD, where f takes an additional parameter of type A.
+   * This additional parameter is produced by constructA, which is called in each
+   * partition with the index of that partition.
+   */
+  def foreachWith[A: ClassManifest](constructA: Int => A)
+    (f:(T, A) => Unit) {
+      def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
+        val a = constructA(index)
+        iter.map(t => {f(t, a); t})
+      }
+    (new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)).foreach(_ => {})
+  }
+
+  /**
+   * Filters this RDD with p, where p takes an additional parameter of type A.  This
+   * additional parameter is produced by constructA, which is called in each
+   * partition with the index of that partition.
+   */
+  def filterWith[A: ClassManifest](constructA: Int => A)
+    (p:(T, A) => Boolean): RDD[T] = {
+      def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
+        val a = constructA(index)
+        iter.filter(t => p(t, a))
+      }
+    new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)
+  }
+
   /**
    * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
    * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
@@ -390,6 +446,14 @@ abstract class RDD[T: ClassManifest](
     sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
   }
 
+  /**
+   * Applies a function f to each partition of this RDD.
+   */
+  def foreachPartition(f: Iterator[T] => Unit) {
+    val cleanF = sc.clean(f)
+    sc.runJob(this, (iter: Iterator[T]) => f(iter))
+  }
+
   /**
    * Return an array that contains all of the elements in this RDD.
    */
@@ -412,7 +476,7 @@ abstract class RDD[T: ClassManifest](
 
   /**
    * Return an RDD with the elements from `this` that are not in `other`.
-   * 
+   *
    * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
    * RDD will be <= us.
    */
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 4474ef4593703423215a172d103d7b98a3952b36..3e7407b58d8e6dacc52e405f1407c5b713087667 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -16,66 +16,61 @@ import java.nio.ByteBuffer
 /**
  * The Mesos executor for Spark.
  */
-private[spark] class Executor extends Logging {
-  var urlClassLoader : ExecutorURLClassLoader = null
-  var threadPool: ExecutorService = null
-  var env: SparkEnv = null
-
+private[spark] class Executor(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) extends Logging {
+  
   // Application dependencies (added through SparkContext) that we've fetched so far on this node.
   // Each map holds the master's timestamp for the version of that file or JAR we got.
-  val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
-  val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
+  private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
+  private val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
 
-  val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
+  private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
 
   initLogging()
 
-  def initialize(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) {
-    // Make sure the local hostname we report matches the cluster scheduler's name for this host
-    Utils.setCustomHostname(slaveHostname)
+  // Make sure the local hostname we report matches the cluster scheduler's name for this host
+  Utils.setCustomHostname(slaveHostname)
 
-    // Set spark.* system properties from executor arg
-    for ((key, value) <- properties) {
-      System.setProperty(key, value)
-    }
+  // Set spark.* system properties from executor arg
+  for ((key, value) <- properties) {
+    System.setProperty(key, value)
+  }
+
+  // Create our ClassLoader and set it on this thread
+  private val urlClassLoader = createClassLoader()
+  Thread.currentThread.setContextClassLoader(urlClassLoader)
 
-    // Create our ClassLoader and set it on this thread
-    urlClassLoader = createClassLoader()
-    Thread.currentThread.setContextClassLoader(urlClassLoader)
-
-    // Make any thread terminations due to uncaught exceptions kill the entire
-    // executor process to avoid surprising stalls.
-    Thread.setDefaultUncaughtExceptionHandler(
-      new Thread.UncaughtExceptionHandler {
-        override def uncaughtException(thread: Thread, exception: Throwable) {
-          try {
-            logError("Uncaught exception in thread " + thread, exception)
-            
-            // We may have been called from a shutdown hook. If so, we must not call System.exit().
-            // (If we do, we will deadlock.)
-            if (!Utils.inShutdown()) {
-              if (exception.isInstanceOf[OutOfMemoryError]) {
-                System.exit(ExecutorExitCode.OOM)
-              } else {
-                System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
-              }
+  // Make any thread terminations due to uncaught exceptions kill the entire
+  // executor process to avoid surprising stalls.
+  Thread.setDefaultUncaughtExceptionHandler(
+    new Thread.UncaughtExceptionHandler {
+      override def uncaughtException(thread: Thread, exception: Throwable) {
+        try {
+          logError("Uncaught exception in thread " + thread, exception)
+          
+          // We may have been called from a shutdown hook. If so, we must not call System.exit().
+          // (If we do, we will deadlock.)
+          if (!Utils.inShutdown()) {
+            if (exception.isInstanceOf[OutOfMemoryError]) {
+              System.exit(ExecutorExitCode.OOM)
+            } else {
+              System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
             }
-          } catch {
-            case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-            case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
           }
+        } catch {
+          case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+          case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
         }
       }
-    )
+    }
+  )
 
-    // Initialize Spark environment (using system properties read above)
-    env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
-    SparkEnv.set(env)
+  // Initialize Spark environment (using system properties read above)
+  val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
+  SparkEnv.set(env)
 
-    // Start worker thread pool
-    threadPool = new ThreadPoolExecutor(
-      1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
-  }
+  // Start worker thread pool
+  val threadPool = new ThreadPoolExecutor(
+    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
 
   def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
     threadPool.execute(new TaskRunner(context, taskId, serializedTask))
diff --git a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
index 818d6d1dda7f9774c60abfbf6ee0dff7730983a2..10f3531df0d350ee15487a0fc73ca6122a4150fd 100644
--- a/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -8,11 +8,12 @@ import com.google.protobuf.ByteString
 import spark.{Utils, Logging}
 import spark.TaskState
 
-private[spark] class MesosExecutorBackend(executor: Executor)
+private[spark] class MesosExecutorBackend
   extends MesosExecutor
   with ExecutorBackend
   with Logging {
 
+  var executor: Executor = null
   var driver: ExecutorDriver = null
 
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
@@ -32,16 +33,19 @@ private[spark] class MesosExecutorBackend(executor: Executor)
     logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
     this.driver = driver
     val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
-    executor.initialize(
+    executor = new Executor(
       executorInfo.getExecutorId.getValue,
       slaveInfo.getHostname,
-      properties
-    )
+      properties)
   }
 
   override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
     val taskId = taskInfo.getTaskId.getValue.toLong
-    executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+    if (executor == null) {
+      logError("Received launchTask but executor was null")
+    } else {
+      executor.launchTask(this, taskId, taskInfo.getData.asReadOnlyByteBuffer)
+    }
   }
 
   override def error(d: ExecutorDriver, message: String) {
@@ -68,7 +72,7 @@ private[spark] object MesosExecutorBackend {
   def main(args: Array[String]) {
     MesosNativeLibrary.load()
     // Create a new Executor and start it running
-    val runner = new MesosExecutorBackend(new Executor)
+    val runner = new MesosExecutorBackend()
     new MesosExecutorDriver(runner).run()
   }
 }
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index 9a82c3054c0fcf3ff6d0ecf5dc36c54d658fdaa0..1047f71c6ae0dc13eaf873049fbfeceaa4d1ff69 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -14,7 +14,6 @@ import spark.scheduler.cluster.RegisterExecutorFailed
 import spark.scheduler.cluster.RegisterExecutor
 
 private[spark] class StandaloneExecutorBackend(
-    executor: Executor,
     driverUrl: String,
     executorId: String,
     hostname: String,
@@ -23,6 +22,7 @@ private[spark] class StandaloneExecutorBackend(
   with ExecutorBackend
   with Logging {
 
+  var executor: Executor = null
   var driver: ActorRef = null
 
   override def preStart() {
@@ -36,7 +36,7 @@ private[spark] class StandaloneExecutorBackend(
   override def receive = {
     case RegisteredExecutor(sparkProperties) =>
       logInfo("Successfully registered with driver")
-      executor.initialize(executorId, hostname, sparkProperties)
+      executor = new Executor(executorId, hostname, sparkProperties)
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
@@ -44,7 +44,12 @@ private[spark] class StandaloneExecutorBackend(
 
     case LaunchTask(taskDesc) =>
       logInfo("Got assigned task " + taskDesc.taskId)
-      executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+      if (executor == null) {
+        logError("Received launchTask but executor was null")
+        System.exit(1)
+      } else {
+        executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
+      }
 
     case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
       logError("Driver terminated or disconnected! Shutting down.")
@@ -62,7 +67,7 @@ private[spark] object StandaloneExecutorBackend {
     // before getting started with all our system properties, etc
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
     val actor = actorSystem.actorOf(
-      Props(new StandaloneExecutorBackend(new Executor, driverUrl, executorId, hostname, cores)),
+      Props(new StandaloneExecutorBackend(driverUrl, executorId, hostname, cores)),
       name = "Executor")
     actorSystem.awaitTermination()
   }
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 65b4621b87ed0370a82e8a73769a7b3e28ef33c2..9213513e80914fd7ad67df953e80f04ddc1f07da 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -2,10 +2,11 @@ package spark.rdd
 
 import java.io.{ObjectOutputStream, IOException}
 import java.util.{HashMap => JHashMap}
+
 import scala.collection.JavaConversions
 import scala.collection.mutable.ArrayBuffer
 
-import spark.{Aggregator, Logging, Partitioner, RDD, SparkEnv, Partition, TaskContext}
+import spark.{Aggregator, Logging, Partition, Partitioner, RDD, SparkEnv, TaskContext}
 import spark.{Dependency, OneToOneDependency, ShuffleDependency}
 
 
@@ -28,7 +29,8 @@ private[spark] case class NarrowCoGroupSplitDep(
 private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
 
 private[spark]
-class CoGroupPartition(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Partition with Serializable {
+class CoGroupPartition(idx: Int, val deps: Seq[CoGroupSplitDep])
+  extends Partition with Serializable {
   override val index: Int = idx
   override def hashCode(): Int = idx
 }
@@ -40,7 +42,19 @@ private[spark] class CoGroupAggregator
     { (b1, b2) => b1 ++ b2 })
   with Serializable
 
-class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
+
+/**
+ * A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
+ * tuple with the list of values for that key.
+ *
+ * @param rdds parent RDDs.
+ * @param part partitioner used to partition the shuffle output.
+ * @param mapSideCombine flag indicating whether to merge values before shuffle step.
+ */
+class CoGroupedRDD[K](
+  @transient var rdds: Seq[RDD[(K, _)]],
+  part: Partitioner,
+  val mapSideCombine: Boolean = true)
   extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
 
   private val aggr = new CoGroupAggregator
@@ -52,8 +66,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
         new OneToOneDependency(rdd)
       } else {
         logInfo("Adding shuffle dependency with " + rdd)
-        val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
-        new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part)
+        if (mapSideCombine) {
+          val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
+          new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part)
+        } else {
+          new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part)
+        }
       }
     }
   }
@@ -82,6 +100,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
     val numRdds = split.deps.size
     // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
     val map = new JHashMap[K, Seq[ArrayBuffer[Any]]]
+
     def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
       val seq = map.get(k)
       if (seq != null) {
@@ -92,6 +111,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
         seq
       }
     }
+
     for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
       case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
         // Read them from the parent
@@ -102,9 +122,16 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
       case ShuffleCoGroupSplitDep(shuffleId) => {
         // Read map outputs of shuffle
         val fetcher = SparkEnv.get.shuffleFetcher
-        val fetchItr = fetcher.fetch[K, Seq[Any]](shuffleId, split.index, context.taskMetrics)
-        for ((k, vs) <- fetchItr) {
-          getSeq(k)(depNum) ++= vs
+        if (mapSideCombine) {
+          // With map side combine on, for each key, the shuffle fetcher returns a list of values.
+          fetcher.fetch[K, Seq[Any]](shuffleId, split.index, context.taskMetrics).foreach {
+            case (key, values) => getSeq(key)(depNum) ++= values
+          }
+        } else {
+          // With map side combine off, for each key the shuffle fetcher returns a single value.
+          fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics).foreach {
+            case (key, value) => getSeq(key)(depNum) += value
+          }
         }
       }
     }
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index bcbb472f6c2191f9813b7970210ec902b362d87c..a6178867bc6e07f663bc32bf39619a11f36f0143 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -3,7 +3,7 @@ package spark
 import scala.collection.mutable.HashMap
 import org.scalatest.FunSuite
 import spark.SparkContext._
-import spark.rdd.{CoalescedRDD, PartitionPruningRDD, ShuffledRDD}
+import spark.rdd.{CoalescedRDD, CoGroupedRDD, PartitionPruningRDD, ShuffledRDD}
 
 class RDDSuite extends FunSuite with LocalSparkContext {
 
@@ -123,6 +123,36 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     assert(rdd.collect().toList === List(1, 2, 3, 4))
   }
 
+  test("cogrouped RDDs") {
+    sc = new SparkContext("local", "test")
+    val rdd1 = sc.makeRDD(Array((1, "one"), (1, "another one"), (2, "two"), (3, "three")), 2)
+    val rdd2 = sc.makeRDD(Array((1, "one1"), (1, "another one1"), (2, "two1")), 2)
+
+    // Use cogroup function
+    val cogrouped = rdd1.cogroup(rdd2).collectAsMap()
+    assert(cogrouped(1) === (Seq("one", "another one"), Seq("one1", "another one1")))
+    assert(cogrouped(2) === (Seq("two"), Seq("two1")))
+    assert(cogrouped(3) === (Seq("three"), Seq()))
+
+    // Construct CoGroupedRDD directly, with map side combine enabled
+    val cogrouped1 = new CoGroupedRDD[Int](
+      Seq(rdd1.asInstanceOf[RDD[(Int, Any)]], rdd2.asInstanceOf[RDD[(Int, Any)]]),
+      new HashPartitioner(3),
+      true).collectAsMap()
+    assert(cogrouped1(1).toSeq === Seq(Seq("one", "another one"), Seq("one1", "another one1")))
+    assert(cogrouped1(2).toSeq === Seq(Seq("two"), Seq("two1")))
+    assert(cogrouped1(3).toSeq === Seq(Seq("three"), Seq()))
+
+    // Construct CoGroupedRDD directly, with map side combine disabled
+    val cogrouped2 = new CoGroupedRDD[Int](
+      Seq(rdd1.asInstanceOf[RDD[(Int, Any)]], rdd2.asInstanceOf[RDD[(Int, Any)]]),
+      new HashPartitioner(3),
+      false).collectAsMap()
+    assert(cogrouped2(1).toSeq === Seq(Seq("one", "another one"), Seq("one1", "another one1")))
+    assert(cogrouped2(2).toSeq === Seq(Seq("two"), Seq("two1")))
+    assert(cogrouped2(3).toSeq === Seq(Seq("three"), Seq()))
+  }
+
   test("coalesced RDDs") {
     sc = new SparkContext("local", "test")
     val data = sc.parallelize(1 to 10, 10)
@@ -182,4 +212,64 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     assert(prunedData.size === 1)
     assert(prunedData(0) === 10)
   }
+
+  test("mapWith") {
+    import java.util.Random
+    sc = new SparkContext("local", "test")
+    val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
+    val randoms = ones.mapWith(
+      (index: Int) => new Random(index + 42))
+      {(t: Int, prng: Random) => prng.nextDouble * t}.collect()
+    val prn42_3 = {
+      val prng42 = new Random(42)
+      prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
+    }
+    val prn43_3 = {
+      val prng43 = new Random(43)
+      prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble()
+    }
+    assert(randoms(2) === prn42_3)
+    assert(randoms(5) === prn43_3)
+  }
+
+  test("flatMapWith") {
+    import java.util.Random
+    sc = new SparkContext("local", "test")
+    val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
+    val randoms = ones.flatMapWith(
+      (index: Int) => new Random(index + 42))
+      {(t: Int, prng: Random) =>
+        val random = prng.nextDouble()
+        Seq(random * t, random * t * 10)}.
+      collect()
+    val prn42_3 = {
+      val prng42 = new Random(42)
+      prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
+    }
+    val prn43_3 = {
+      val prng43 = new Random(43)
+      prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble()
+    }
+    assert(randoms(5) === prn42_3 * 10)
+    assert(randoms(11) === prn43_3 * 10)
+  }
+
+  test("filterWith") {
+    import java.util.Random
+    sc = new SparkContext("local", "test")
+    val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
+    val sample = ints.filterWith(
+      (index: Int) => new Random(index + 42))
+      {(t: Int, prng: Random) => prng.nextInt(3) == 0}.
+      collect()
+    val checkSample = {
+      val prng42 = new Random(42)
+      val prng43 = new Random(43)
+      Array(1, 2, 3, 4, 5, 6).filter{i =>
+	      if (i < 4) 0 == prng42.nextInt(3)
+	      else 0 == prng43.nextInt(3)}
+    }
+    assert(sample.size === checkSample.size)
+    for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
+  }
 }
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index 280ead03232adaa7da876b8016d0e779ef5c8a33..f06ab2d5b08ccf70cbca7c6f7a7ba34d0e0b9945 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -90,6 +90,7 @@
                         <li class="dropdown">
                             <a href="api.html" class="dropdown-toggle" data-toggle="dropdown">More<b class="caret"></b></a>
                             <ul class="dropdown-menu">
+                                <li><a href="building-with-maven.html">Building Spark with Maven</a></li>
                                 <li><a href="configuration.html">Configuration</a></li>
                                 <li><a href="tuning.html">Tuning Guide</a></li>
                                 <li><a href="bagel-programming-guide.html">Bagel (Pregel on Spark)</a></li>
diff --git a/docs/building-with-maven.md b/docs/building-with-maven.md
new file mode 100644
index 0000000000000000000000000000000000000000..c2eeafd07af751f4a8768e70b23e60ad0fad6a21
--- /dev/null
+++ b/docs/building-with-maven.md
@@ -0,0 +1,66 @@
+---
+layout: global
+title: Building Spark with Maven
+---
+
+* This will become a table of contents (this text will be scraped).
+{:toc}
+
+Building Spark using Maven Requires Maven 3 (the build process is tested with Maven 3.0.4) and Java 1.6 or newer.
+
+Building with Maven requires that a Hadoop profile be specified explicitly at the command line, there is no default. There are two profiles to choose from, one for building for Hadoop 1 or Hadoop 2.
+
+for Hadoop 1 (using 0.20.205.0) use:
+
+    $ mvn -Phadoop1 clean install
+
+
+for Hadoop 2 (using 2.0.0-mr1-cdh4.1.1) use:
+
+    $ mvn -Phadoop2 clean install
+
+It uses the scala-maven-plugin which supports incremental and continuous compilation. E.g.
+
+    $ mvn -Phadoop2 scala:cc
+
+…should run continuous compilation (i.e. wait for changes). However, this has not been tested extensively.
+
+## Spark Tests in Maven ##
+
+Tests are run by default via the scalatest-maven-plugin. With this you can do things like:
+
+Skip test execution (but not compilation):
+
+    $ mvn -DskipTests -Phadoop2 clean install
+
+To run a specific test suite:
+
+    $ mvn -Phadoop2 -Dsuites=spark.repl.ReplSuite test
+
+
+## Setting up JVM Memory Usage Via Maven ##
+
+You might run into the following errors if you're using a vanilla installation of Maven:
+
+    [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/andyk/Development/spark/core/target/scala-2.9.2/classes...
+    [ERROR] PermGen space -> [Help 1]
+
+    [INFO] Compiling 203 Scala sources and 9 Java sources to /Users/andyk/Development/spark/core/target/scala-2.9.2/classes...
+    [ERROR] Java heap space -> [Help 1]
+
+To fix these, you can do the following:
+
+    export MAVEN_OPTS="-Xmx1024m -XX:MaxPermSize=128M"
+
+
+## Using With IntelliJ IDEA ##
+
+This setup works fine in IntelliJ IDEA 11.1.4. After opening the project via the pom.xml file in the project root folder, you only need to activate either the hadoop1 or hadoop2 profile in the "Maven Properties" popout. We have not tried Eclipse/Scala IDE with this.
+
+## Building Spark Debian Packages ##
+
+It includes support for building a Debian package containing a 'fat-jar' which includes the repl, the examples and bagel. This can be created by specifying the deb profile:
+
+    $ mvn -Phadoop2,deb clean install
+
+The debian package can then be found under repl/target. We added the short commit hash to the file name so that we can distinguish individual packages build for SNAPSHOT versions.
diff --git a/docs/index.md b/docs/index.md
index 45facd8e63f32494edb5759f2b2f8eb46421c17c..51d505e1fa8c969397d969bc26fddd792f46f4aa 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -22,6 +22,8 @@ Spark uses [Simple Build Tool](https://github.com/harrah/xsbt/wiki), which is bu
 
     sbt/sbt package
 
+Spark also supports building using Maven. If you would like to build using Maven, see the [instructions for building Spark with Maven](building-with-maven.html).
+
 # Testing the Build
 
 Spark comes with a number of sample programs in the `examples` directory.
@@ -72,6 +74,7 @@ of `project/SparkBuild.scala`, then rebuilding Spark (`sbt/sbt clean compile`).
 
 **Other documents:**
 
+* [Building Spark With Maven](building-with-maven.html): Build Spark using the Maven build tool
 * [Configuration](configuration.html): customize Spark via its configuration system
 * [Tuning Guide](tuning.html): best practices to optimize performance and memory use
 * [Bagel](bagel-programming-guide.html): an implementation of Google's Pregel on Spark