diff --git a/conf/java-opts b/conf/java-opts
index 2acf4975eab4983cad094d8e9ee152a15a37d064..d753f59de44e673ed99ac6c4f24f93cb83d1dfdf 100644
--- a/conf/java-opts
+++ b/conf/java-opts
@@ -1 +1 @@
--Dspark.shuffle.class=spark.LocalFileShuffle
+-Dspark.shuffle.class=spark.LocalFileShuffle -Dspark.shuffle.UseHttpPipelining=false
diff --git a/src/scala/spark/LocalFileShuffle.scala b/src/scala/spark/LocalFileShuffle.scala
index 89d659d81cbc5071adfb576fdcd7ea1da6428aa0..7b829522bd01ca4178a729f3be71f6dffe583855 100644
--- a/src/scala/spark/LocalFileShuffle.scala
+++ b/src/scala/spark/LocalFileShuffle.scala
@@ -60,11 +60,27 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
       (myIndex, LocalFileShuffle.serverUri)
     }).collect()
 
-    // Build a hashmap from server URI to list of splits (to facillitate
-    // fetching all the URIs on a server within a single connection)
-    val splitsByUri = new HashMap[String, ArrayBuffer[Int]]
-    for ((inputId, serverUri) <- outputLocs) {
-      splitsByUri.getOrElseUpdate(serverUri, ArrayBuffer()) += inputId
+    // Load config option to decide whether to use HTTP pipelining
+    val UseHttpPipelining =
+      System.getProperty("spark.shuffle.UseHttpPipelining", "false").toBoolean
+
+    // Build a traversable list of (server URI, splits) pairs. Both branches
+    // must produce a TraversableOnce[(String, ArrayBuffer[Int])]
+    val splitsByUri = if (UseHttpPipelining) {
+      // Build a hashmap from server URI to list of splits (to facilitate
+      // fetching all the URIs on a server within a single connection)
+      val splitsByUriHM = new HashMap[String, ArrayBuffer[Int]]
+      for ((inputId, serverUri) <- outputLocs) {
+        splitsByUriHM.getOrElseUpdate(serverUri, ArrayBuffer()) += inputId
+      }
+      splitsByUriHM
+    } else {
+      // Don't use HTTP pipelining: fetch each split over its own connection
+      val splitsByUriAB = new ArrayBuffer[(String, ArrayBuffer[Int])]
+      for ((inputId, serverUri) <- outputLocs) {
+        splitsByUriAB += ((serverUri, ArrayBuffer(inputId)))
+      }
+      splitsByUriAB
     }
 
     // TODO: Could broadcast splitsByUri
@@ -110,7 +126,7 @@ object LocalFileShuffle extends Logging {
   private var shuffleDir: File = null
   private var server: HttpServer = null
   private var serverUri: String = null
-  
+
   private def initializeIfNeeded() = synchronized {
     if (!initialized) {
       // TODO: localDir should be created by some mechanism common to Spark
@@ -162,7 +178,7 @@ object LocalFileShuffle extends Logging {
       logInfo ("Local URI: " + serverUri)
     }
   }
-  
+
   def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = {
     initializeIfNeeded()
     val dir = new File(shuffleDir, shuffleId + "/" + inputId)
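
For context, here is a minimal, self-contained sketch of what the two branches above produce. The toy outputLocs data, the SplitsByUriSketch wrapper, and the fetch loop at the end are illustrative assumptions, not part of the patch: with pipelining on, each server appears once with all of its splits grouped together; with it off, every split gets its own single-split entry, and hence its own connection.

import scala.collection.mutable.{ArrayBuffer, HashMap}

object SplitsByUriSketch {
  def main(args: Array[String]): Unit = {
    // Toy stand-in for outputLocs: (inputId, serverUri) pairs (hypothetical data).
    val outputLocs = Array((0, "http://hostA:1234"), (1, "http://hostA:1234"),
                           (2, "http://hostB:1234"))

    val useHttpPipelining =
      System.getProperty("spark.shuffle.UseHttpPipelining", "false").toBoolean

    // Both branches yield (serverUri, splits) pairs, as in the patch.
    val splitsByUri: Seq[(String, ArrayBuffer[Int])] =
      if (useHttpPipelining) {
        // Pipelining: group all splits of a server under one entry,
        // so they can all be fetched over a single connection.
        val byServer = new HashMap[String, ArrayBuffer[Int]]
        for ((inputId, serverUri) <- outputLocs) {
          byServer.getOrElseUpdate(serverUri, ArrayBuffer()) += inputId
        }
        byServer.toSeq
      } else {
        // No pipelining: one single-split entry (one connection) per split.
        for ((inputId, serverUri) <- outputLocs.toSeq)
          yield (serverUri, ArrayBuffer(inputId))
      }

    // A downstream fetch loop can then iterate both shapes uniformly.
    for ((serverUri, splits) <- splitsByUri) {
      println("would fetch splits " + splits.mkString(", ") + " from " + serverUri)
    }
  }
}

Running with -Dspark.shuffle.UseHttpPipelining=true prints one line per server (hostA carrying splits 0 and 1); with the default of false it prints one line per split, which is why the common element type ArrayBuffer[Int] is kept even for the single-split case.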