diff --git a/conf/java-opts b/conf/java-opts index d753f59de44e673ed99ac6c4f24f93cb83d1dfdf..02ea42776014fa2490eea8edcb6bfb1620f960ac 100644 --- a/conf/java-opts +++ b/conf/java-opts @@ -1 +1 @@ --Dspark.shuffle.class=spark.LocalFileShuffle -Dspark.shuffle.UseHttpPipelining=false +-Dspark.shuffle.class=spark.LocalFileShuffle -Dspark.shuffle.UseHttpPipelining=true diff --git a/src/scala/spark/LocalFileShuffle.scala b/src/scala/spark/LocalFileShuffle.scala index 7b829522bd01ca4178a729f3be71f6dffe583855..03b6931f0a0330dcf8e35cecbba6776dafb2839c 100644 --- a/src/scala/spark/LocalFileShuffle.scala +++ b/src/scala/spark/LocalFileShuffle.scala @@ -62,7 +62,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { // Load config option to decide whether or not to use HTTP pipelining val UseHttpPipelining = - System.getProperty("spark.shuffle.UseHttpPipelining", "false").toBoolean + System.getProperty("spark.shuffle.UseHttpPipelining", "true").toBoolean // Build a traversable list of pairs of server URI and split. Needs to be // of type TraversableOnce[(String, ArrayBuffer[Int])] @@ -94,6 +94,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, myId) val readStartTime = System.currentTimeMillis logInfo ("BEGIN READ: " + url) + // TODO: Insert data transfer code before this place val inputStream = new ObjectInputStream(new URL(url).openStream()) try { while (true) {