From 81f78282e10a852b4fd81ff0d170e4c92fa6d33b Mon Sep 17 00:00:00 2001 From: Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> Date: Sun, 19 Dec 2010 14:32:40 -0800 Subject: [PATCH] All shuffle implementations are now in the same place. Time to work on new things. --- src/scala/spark/BasicLocalFileShuffle.scala | 1 - .../spark/CustomBlockedLocalFileShuffle.scala | 2 -- src/scala/spark/DfsShuffle.scala | 2 -- .../spark/HttpBlockedLocalFileShuffle.scala | 7 +++--- ...ala => HttpParallelLocalFileShuffle.scala} | 22 +++++++++---------- 5 files changed, 13 insertions(+), 21 deletions(-) rename src/scala/spark/{ParallelLocalFileShuffle.scala => HttpParallelLocalFileShuffle.scala} (93%) diff --git a/src/scala/spark/BasicLocalFileShuffle.scala b/src/scala/spark/BasicLocalFileShuffle.scala index 95160badd4..3c3f132083 100644 --- a/src/scala/spark/BasicLocalFileShuffle.scala +++ b/src/scala/spark/BasicLocalFileShuffle.scala @@ -7,7 +7,6 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.{ArrayBuffer, HashMap} - /** * A basic implementation of shuffle using local files served through HTTP. * diff --git a/src/scala/spark/CustomBlockedLocalFileShuffle.scala b/src/scala/spark/CustomBlockedLocalFileShuffle.scala index 7f3ac231f9..a28322196b 100644 --- a/src/scala/spark/CustomBlockedLocalFileShuffle.scala +++ b/src/scala/spark/CustomBlockedLocalFileShuffle.scala @@ -8,7 +8,6 @@ import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory} import scala.collection.mutable.{ArrayBuffer, HashMap} - /** * An implementation of shuffle using local files served through HTTP where * receivers create simultaneous connections to multiple servers by setting the @@ -272,7 +271,6 @@ class CustomBlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Loggi } } - object CustomBlockedLocalFileShuffle extends Logging { // Used thoughout the code for small and large waits/timeouts private var BlockSize_ = 1024 * 1024 diff --git a/src/scala/spark/DfsShuffle.scala b/src/scala/spark/DfsShuffle.scala index 7a42bf2d06..bf91be7d2c 100644 --- a/src/scala/spark/DfsShuffle.scala +++ b/src/scala/spark/DfsShuffle.scala @@ -9,7 +9,6 @@ import scala.collection.mutable.HashMap import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem} - /** * A simple implementation of shuffle using a distributed file system. * @@ -82,7 +81,6 @@ class DfsShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { } } - /** * Companion object of DfsShuffle; responsible for initializing a Hadoop * FileSystem object based on the spark.dfs property and generating names diff --git a/src/scala/spark/HttpBlockedLocalFileShuffle.scala b/src/scala/spark/HttpBlockedLocalFileShuffle.scala index 2b48db6886..cd4e0abcec 100644 --- a/src/scala/spark/HttpBlockedLocalFileShuffle.scala +++ b/src/scala/spark/HttpBlockedLocalFileShuffle.scala @@ -8,16 +8,15 @@ import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory} import scala.collection.mutable.{ArrayBuffer, HashMap} - /** * An implementation of shuffle using local files served through HTTP where * receivers create simultaneous connections to multiple servers by setting the * 'spark.blockedLocalFileShuffle.maxConnections' config option. * * By controlling the 'spark.blockedLocalFileShuffle.blockSize' config option - * one can also control the largest block size to divide each map output into. 
- * Essentially, instead of creating one large output file for each reducer, maps
- * create multiple smaller files to enable finer level of engagement.
+ * one can also control the largest block size each reducer retrieves. An
+ * INDEX file keeps track of block boundaries, instead of splitting each map
+ * output into many smaller files.
  *
  * TODO: Add support for compression when spark.compress is set to true.
  */
diff --git a/src/scala/spark/ParallelLocalFileShuffle.scala b/src/scala/spark/HttpParallelLocalFileShuffle.scala
similarity index 93%
rename from src/scala/spark/ParallelLocalFileShuffle.scala
rename to src/scala/spark/HttpParallelLocalFileShuffle.scala
index 1b15c27a05..89daccf7a7 100644
--- a/src/scala/spark/ParallelLocalFileShuffle.scala
+++ b/src/scala/spark/HttpParallelLocalFileShuffle.scala
@@ -8,7 +8,6 @@ import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 
-
 /**
  * An implementation of shuffle using local files served through HTTP where
  * receivers create simultaneous connections to multiple servers by setting the
@@ -17,7 +16,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
  * TODO: Add support for compression when spark.compress is set to true.
  */
 @serializable
-class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
+class HttpParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
   @transient var totalSplits = 0
   @transient var hasSplits = 0
 
@@ -34,7 +33,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
     : RDD[(K, C)] = {
     val sc = input.sparkContext
 
-    val shuffleId = ParallelLocalFileShuffle.newShuffleId()
+    val shuffleId = HttpParallelLocalFileShuffle.newShuffleId()
     logInfo("Shuffle ID: " + shuffleId)
 
     val splitRdd = new NumberedSplitRDD(input)
@@ -59,7 +58,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
       }
 
       for (i <- 0 until numOutputSplits) {
-        val file = ParallelLocalFileShuffle.getOutputFile(shuffleId, myIndex, i)
+        val file = HttpParallelLocalFileShuffle.getOutputFile(shuffleId, myIndex, i)
         val writeStartTime = System.currentTimeMillis
         logInfo("BEGIN WRITE: " + file)
         val out = new ObjectOutputStream(new FileOutputStream(file))
@@ -70,7 +69,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
         logInfo("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.")
       }
 
-      (myIndex, ParallelLocalFileShuffle.serverUri)
+      (myIndex, HttpParallelLocalFileShuffle.serverUri)
     }).collect()
 
     // TODO: Could broadcast outputLocs
@@ -86,12 +85,12 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
 
       combiners = new HashMap[K, C]
 
-      var threadPool = ParallelLocalFileShuffle.newDaemonFixedThreadPool(
-        ParallelLocalFileShuffle.MaxConnections)
+      var threadPool = HttpParallelLocalFileShuffle.newDaemonFixedThreadPool(
+        HttpParallelLocalFileShuffle.MaxConnections)
 
       while (hasSplits < totalSplits) {
         var numThreadsToCreate =
-          Math.min(totalSplits, ParallelLocalFileShuffle.MaxConnections) -
+          Math.min(totalSplits, HttpParallelLocalFileShuffle.MaxConnections) -
           threadPool.getActiveCount
 
         while (hasSplits < totalSplits && numThreadsToCreate > 0) {
@@ -114,7 +113,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
         }
 
         // Sleep for a while before creating new threads
-        Thread.sleep(ParallelLocalFileShuffle.MinKnockInterval)
+        Thread.sleep(HttpParallelLocalFileShuffle.MinKnockInterval)
       }
 
       combiners
     })
@@ -132,7 +131,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
       }
 
       if (requiredSplits.size > 0) {
-        requiredSplits(ParallelLocalFileShuffle.ranGen.nextInt(
+        requiredSplits(HttpParallelLocalFileShuffle.ranGen.nextInt(
           requiredSplits.size))
       } else {
         -1
@@ -204,8 +203,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
   }
 }
 
-
-object ParallelLocalFileShuffle extends Logging {
+object HttpParallelLocalFileShuffle extends Logging {
   // Used thoughout the code for small and large waits/timeouts
   private var MinKnockInterval_ = 1000
   private var MaxKnockInterval_ = 5000
-- 
GitLab
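
The reworded HttpBlockedLocalFileShuffle comment above describes an INDEX file that records block boundaries within a single map-output file, letting a reducer fetch one block at a time over HTTP instead of many small files. The patch does not show the index format; the sketch below assumes a flat sequence of Long end-offsets written with DataOutputStream, and derives the (start, length) byte ranges a reducer would request. IndexFileSketch and readBlockRanges are illustrative names, not APIs from this codebase.

import java.io.{DataInputStream, EOFException, FileInputStream}

import scala.collection.mutable.ArrayBuffer

object IndexFileSketch {
  // Reads block end-offsets until EOF and converts them to (start, length)
  // pairs; block i covers bytes [start, start + length) of the output file.
  def readBlockRanges(indexPath: String): Seq[(Long, Long)] = {
    val in = new DataInputStream(new FileInputStream(indexPath))
    val ends = new ArrayBuffer[Long]
    try {
      while (true) {
        ends += in.readLong()
      }
    } catch {
      case eof: EOFException => // reached the end of the INDEX file
    } finally {
      in.close()
    }
    var start = 0L
    ends.map { end =>
      val range = (start, end - start)
      start = end
      range
    }
  }
}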
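
Both the blocked and parallel receivers throttle their fetches through newDaemonFixedThreadPool(MaxConnections); only the call sites appear in this diff. A minimal sketch of such a helper, assuming the standard java.util.concurrent factories (DaemonPoolSketch is an illustrative name, and the body is a guess rather than the repository's actual implementation):

import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}

object DaemonPoolSketch {
  // Fixed-size pool whose threads are daemons, so in-flight shuffle fetches
  // cannot keep the JVM alive after the user program finishes.
  def newDaemonFixedThreadPool(numThreads: Int): ThreadPoolExecutor = {
    val threadFactory = new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r)
        t.setDaemon(true)
        t
      }
    }
    // Executors.newFixedThreadPool is backed by a ThreadPoolExecutor; the
    // cast exposes getActiveCount(), which the fetch loop depends on.
    Executors.newFixedThreadPool(numThreads, threadFactory)
      .asInstanceOf[ThreadPoolExecutor]
  }
}

With such a pool, each round of the loop shown in the HttpParallelLocalFileShuffle hunk submits at most Math.min(totalSplits, MaxConnections) - threadPool.getActiveCount new fetchers, then sleeps MinKnockInterval before checking again.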