diff --git a/conf/java-opts b/conf/java-opts index 02ea42776014fa2490eea8edcb6bfb1620f960ac..997be5c9e80dc454b2d4bf39a598b6a54729d681 100644 --- a/conf/java-opts +++ b/conf/java-opts @@ -1 +1 @@ --Dspark.shuffle.class=spark.LocalFileShuffle -Dspark.shuffle.UseHttpPipelining=true +-Dspark.shuffle.class=spark.CustomParallelLocalFileShuffle diff --git a/src/scala/spark/LocalFileShuffle.scala b/src/scala/spark/CustomParallelLocalFileShuffle.scala similarity index 91% rename from src/scala/spark/LocalFileShuffle.scala rename to src/scala/spark/CustomParallelLocalFileShuffle.scala index a26bb08dbb2278f4eb22935f7e2f1080d3f4a07c..8f5fad4a85fb57c738fd977849fb6f418ef53b73 100644 --- a/src/scala/spark/LocalFileShuffle.scala +++ b/src/scala/spark/CustomParallelLocalFileShuffle.scala @@ -9,12 +9,14 @@ import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory} import scala.collection.mutable.{ArrayBuffer, HashMap} /** - * A simple implementation of shuffle using local files served through HTTP. + * An implementation of shuffle using local files served through custom server + * where receivers create simultaneous connections to multiple servers by + * setting the 'spark.parallelLocalFileShuffle.maxConnections' config option. * * TODO: Add support for compression when spark.compress is set to true. */ @serializable -class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { +class CustomParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { @transient var totalSplits = 0 @transient var hasSplits = 0 @transient var hasSplitsBitVector: BitSet = null @@ -30,7 +32,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { : RDD[(K, C)] = { val sc = input.sparkContext - val shuffleId = LocalFileShuffle.newShuffleId() + val shuffleId = CustomParallelLocalFileShuffle.newShuffleId() logInfo("Shuffle ID: " + shuffleId) val splitRdd = new NumberedSplitRDD(input) @@ -55,7 +57,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { } for (i <- 0 until numOutputSplits) { - val file = LocalFileShuffle.getOutputFile(shuffleId, myIndex, i) + val file = CustomParallelLocalFileShuffle.getOutputFile(shuffleId, myIndex, i) val writeStartTime = System.currentTimeMillis logInfo ("BEGIN WRITE: " + file) val out = new ObjectOutputStream(new FileOutputStream(file)) @@ -65,7 +67,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { val writeTime = (System.currentTimeMillis - writeStartTime) logInfo ("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.") } - (myIndex, LocalFileShuffle.serverAddress, LocalFileShuffle.serverPort) + (myIndex, CustomParallelLocalFileShuffle.serverAddress, CustomParallelLocalFileShuffle.serverPort) }).collect() val splitsByUri = new ArrayBuffer[(String, Int, Int)] @@ -85,11 +87,11 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { combiners = new HashMap[K, C] var threadPool = - LocalFileShuffle.newDaemonFixedThreadPool (LocalFileShuffle.MaxConnections) + CustomParallelLocalFileShuffle.newDaemonFixedThreadPool (CustomParallelLocalFileShuffle.MaxConnections) while (hasSplits < totalSplits) { var numThreadsToCreate = - Math.min (totalSplits, LocalFileShuffle.MaxConnections) - + Math.min (totalSplits, CustomParallelLocalFileShuffle.MaxConnections) - threadPool.getActiveCount while (hasSplits < totalSplits && numThreadsToCreate > 0) { @@ -113,7 +115,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { } // Sleep for a while before creating new threads - Thread.sleep (LocalFileShuffle.MinKnockInterval) + Thread.sleep (CustomParallelLocalFileShuffle.MinKnockInterval) } threadPool.shutdown @@ -133,7 +135,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { } if (requiredSplits.size > 0) { - requiredSplits(LocalFileShuffle.ranGen.nextInt (requiredSplits.size)) + requiredSplits(CustomParallelLocalFileShuffle.ranGen.nextInt (requiredSplits.size)) } else { -1 } @@ -160,7 +162,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { } var timeOutTimer = new Timer - timeOutTimer.schedule (timeOutTask, LocalFileShuffle.MaxKnockInterval) + timeOutTimer.schedule (timeOutTask, CustomParallelLocalFileShuffle.MaxKnockInterval) logInfo ("ShuffleClient started... => %s:%d#%s".format(hostAddress, listenPort, requestPath)) @@ -254,7 +256,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { } } -object LocalFileShuffle extends Logging { +object CustomParallelLocalFileShuffle extends Logging { // Used thoughout the code for small and large waits/timeouts private var MinKnockInterval_ = 1000 private var MaxKnockInterval_ = 5000 @@ -279,12 +281,12 @@ object LocalFileShuffle extends Logging { if (!initialized) { // Load config parameters MinKnockInterval_ = - System.getProperty ("spark.shuffle.MinKnockInterval", "1000").toInt + System.getProperty ("spark.parallelLocalFileShuffle.MinKnockInterval", "1000").toInt MaxKnockInterval_ = - System.getProperty ("spark.shuffle.MaxKnockInterval", "5000").toInt + System.getProperty ("spark.parallelLocalFileShuffle.MaxKnockInterval", "5000").toInt MaxConnections_ = - System.getProperty ("spark.shuffle.MaxConnections", "4").toInt + System.getProperty ("spark.parallelLocalFileShuffle.MaxConnections", "4").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc @@ -366,7 +368,7 @@ object LocalFileShuffle extends Logging { class ShuffleServer extends Thread with Logging { - var threadPool = newDaemonFixedThreadPool(LocalFileShuffle.MaxConnections) + var threadPool = newDaemonFixedThreadPool(CustomParallelLocalFileShuffle.MaxConnections) var serverSocket: ServerSocket = null