From d9ea6d69a5171d397bb9d79c9dc4517f292f6a44 Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Sat, 6 Nov 2010 10:53:57 -0700
Subject: [PATCH] Create output files one by one instead of at the same time in
 the map phase of DfsShuffle.

---
 src/scala/spark/DfsShuffle.scala | 23 +++++++++++------------
 1 file changed, 11 insertions(+), 12 deletions(-)

diff --git a/src/scala/spark/DfsShuffle.scala b/src/scala/spark/DfsShuffle.scala
index 256bf4ea9c..2ef0321a63 100644
--- a/src/scala/spark/DfsShuffle.scala
+++ b/src/scala/spark/DfsShuffle.scala
@@ -38,26 +38,25 @@ extends Logging
     numberedSplitRdd.foreach((pair: (Int, Iterator[(K, V)])) => {
       val myIndex = pair._1
       val myIterator = pair._2
-      val combiners = new HashMap[K, C]
+      val buckets = Array.tabulate(numOutputSplits)(_ => new HashMap[K, C])
       for ((k, v) <- myIterator) {
-        combiners(k) = combiners.get(k) match {
+        var bucketId = k.hashCode % numOutputSplits
+        if (bucketId < 0) { // Fix bucket ID if hash code was negative
+          bucketId += numOutputSplits
+        }
+        val bucket = buckets(bucketId)
+        bucket(k) = bucket.get(k) match {
           case Some(c) => mergeValue(c, v)
           case None => createCombiner(v)
         }
       }
       val fs = DfsShuffle.getFileSystem()
-      val outputStreams = (0 until numOutputSplits).map(i => {
+      for (i <- 0 until numOutputSplits) {
         val path = new Path(dir, "%d-to-%d".format(myIndex, i))
-        new ObjectOutputStream(fs.create(path, true))
-      }).toArray
-      for ((k, c) <- combiners) {
-        var bucket = k.hashCode % numOutputSplits
-        if (bucket < 0) {
-          bucket += numOutputSplits
-        }
-        outputStreams(bucket).writeObject((k, c))
+        val out = new ObjectOutputStream(fs.create(path, true))
+        buckets(i).foreach(pair => out.writeObject(pair))
+        out.close()
       }
-      outputStreams.foreach(_.close())
     })
 
     // Return an RDD that does each of the merges for a given partition
-- 
GitLab