From 0de859fbe2528330d7f96b5710d4fa77742d4702 Mon Sep 17 00:00:00 2001
From: Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)>
Date: Thu, 2 Dec 2010 02:32:44 -0800
Subject: [PATCH] Enabling/disabling HTTP pipelining is a config option now.
 Performance tradeoffs are not obvious yet.

---
 conf/java-opts                         |  2 +-
 src/scala/spark/LocalFileShuffle.scala | 30 ++++++++++++++++++++------
 2 files changed, 24 insertions(+), 8 deletions(-)

diff --git a/conf/java-opts b/conf/java-opts
index 2acf4975ea..d753f59de4 100644
--- a/conf/java-opts
+++ b/conf/java-opts
@@ -1 +1 @@
--Dspark.shuffle.class=spark.LocalFileShuffle
+-Dspark.shuffle.class=spark.LocalFileShuffle -Dspark.shuffle.UseHttpPipelining=false
diff --git a/src/scala/spark/LocalFileShuffle.scala b/src/scala/spark/LocalFileShuffle.scala
index 89d659d81c..7b829522bd 100644
--- a/src/scala/spark/LocalFileShuffle.scala
+++ b/src/scala/spark/LocalFileShuffle.scala
@@ -60,11 +60,27 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
       (myIndex, LocalFileShuffle.serverUri)
     }).collect()
 
-    // Build a hashmap from server URI to list of splits (to facillitate
-    // fetching all the URIs on a server within a single connection)
-    val splitsByUri = new HashMap[String, ArrayBuffer[Int]]
-    for ((inputId, serverUri) <- outputLocs) {
-      splitsByUri.getOrElseUpdate(serverUri, ArrayBuffer()) += inputId
+    // Load config option to decide whether or not to use HTTP pipelining
+    val UseHttpPipelining = 
+        System.getProperty("spark.shuffle.UseHttpPipelining", "false").toBoolean
+
+    // Build a traversable list of pairs of server URI and split. Needs to be
+    // of type TraversableOnce[(String, ArrayBuffer[Int])]
+    val splitsByUri = if (UseHttpPipelining) {
+      // Build a hashmap from server URI to list of splits (to facilitate
+      // fetching all the URIs on a server within a single connection)
+      val splitsByUriHM = new HashMap[String, ArrayBuffer[Int]]
+      for ((inputId, serverUri) <- outputLocs) {
+        splitsByUriHM.getOrElseUpdate(serverUri, ArrayBuffer()) += inputId
+      }
+      splitsByUriHM
+    } else {
+      // Don't use HTTP pipelining
+      val splitsByUriAB = new ArrayBuffer[(String, ArrayBuffer[Int])]
+      for ((inputId, serverUri) <- outputLocs) {
+        splitsByUriAB += ((serverUri, new ArrayBuffer[Int] += inputId))
+      }
+      splitsByUriAB
     }
 
     // TODO: Could broadcast splitsByUri
@@ -110,7 +126,7 @@ object LocalFileShuffle extends Logging {
   private var shuffleDir: File = null
   private var server: HttpServer = null
   private var serverUri: String = null
-
+  
   private def initializeIfNeeded() = synchronized {
     if (!initialized) {
       // TODO: localDir should be created by some mechanism common to Spark
@@ -162,7 +178,7 @@ object LocalFileShuffle extends Logging {
       logInfo ("Local URI: " + serverUri)
     }
   }
-
+  
   def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = {
     initializeIfNeeded()
     val dir = new File(shuffleDir, shuffleId + "/" + inputId)
-- 
GitLab