diff --git a/src/scala/spark/BasicLocalFileShuffle.scala b/src/scala/spark/BasicLocalFileShuffle.scala index 95160badd49eef5e20f413c882a7c2ff14451f73..3c3f132083726d04f5e978dbcec119ccbe062a21 100644 --- a/src/scala/spark/BasicLocalFileShuffle.scala +++ b/src/scala/spark/BasicLocalFileShuffle.scala @@ -7,7 +7,6 @@ import java.util.concurrent.atomic.AtomicLong import scala.collection.mutable.{ArrayBuffer, HashMap} - /** * A basic implementation of shuffle using local files served through HTTP. * diff --git a/src/scala/spark/CustomBlockedLocalFileShuffle.scala b/src/scala/spark/CustomBlockedLocalFileShuffle.scala index 7f3ac231f9b8518fa850f7874bfcead52db9fa76..a28322196bcf276951df427c095691ed9ef10862 100644 --- a/src/scala/spark/CustomBlockedLocalFileShuffle.scala +++ b/src/scala/spark/CustomBlockedLocalFileShuffle.scala @@ -8,7 +8,6 @@ import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory} import scala.collection.mutable.{ArrayBuffer, HashMap} - /** * An implementation of shuffle using local files served through HTTP where * receivers create simultaneous connections to multiple servers by setting the @@ -272,7 +271,6 @@ class CustomBlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Loggi } } - object CustomBlockedLocalFileShuffle extends Logging { // Used thoughout the code for small and large waits/timeouts private var BlockSize_ = 1024 * 1024 diff --git a/src/scala/spark/DfsShuffle.scala b/src/scala/spark/DfsShuffle.scala index 7a42bf2d06f624b40ad7d9c17ba01e9849524772..bf91be7d2cb86f020d5097070ed5ee412f953a7b 100644 --- a/src/scala/spark/DfsShuffle.scala +++ b/src/scala/spark/DfsShuffle.scala @@ -9,7 +9,6 @@ import scala.collection.mutable.HashMap import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem} - /** * A simple implementation of shuffle using a distributed file system. 
* @@ -82,7 +81,6 @@ class DfsShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { } } - /** * Companion object of DfsShuffle; responsible for initializing a Hadoop * FileSystem object based on the spark.dfs property and generating names diff --git a/src/scala/spark/HttpBlockedLocalFileShuffle.scala b/src/scala/spark/HttpBlockedLocalFileShuffle.scala index 2b48db68867fdeb220eeab50441ec547f51f679f..cd4e0abceca3635e651d6eaee2db24ce7f3e31c4 100644 --- a/src/scala/spark/HttpBlockedLocalFileShuffle.scala +++ b/src/scala/spark/HttpBlockedLocalFileShuffle.scala @@ -8,16 +8,15 @@ import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory} import scala.collection.mutable.{ArrayBuffer, HashMap} - /** * An implementation of shuffle using local files served through HTTP where * receivers create simultaneous connections to multiple servers by setting the * 'spark.blockedLocalFileShuffle.maxConnections' config option. * * By controlling the 'spark.blockedLocalFileShuffle.blockSize' config option - * one can also control the largest block size to divide each map output into. - * Essentially, instead of creating one large output file for each reducer, maps - * create multiple smaller files to enable finer level of engagement. + * one can also control the largest block size retrieved by each reducer. + * An INDEX file keeps track of block boundaries instead of creating many + * smaller files. * * TODO: Add support for compression when spark.compress is set to true. 
*/ diff --git a/src/scala/spark/ParallelLocalFileShuffle.scala b/src/scala/spark/HttpParallelLocalFileShuffle.scala similarity index 93% rename from src/scala/spark/ParallelLocalFileShuffle.scala rename to src/scala/spark/HttpParallelLocalFileShuffle.scala index 1b15c27a05219d0687670e0bf128e214fb8660f1..89daccf7a732a6b5123eb792cf49745f28323e0c 100644 --- a/src/scala/spark/ParallelLocalFileShuffle.scala +++ b/src/scala/spark/HttpParallelLocalFileShuffle.scala @@ -8,7 +8,6 @@ import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory} import scala.collection.mutable.{ArrayBuffer, HashMap} - /** * An implementation of shuffle using local files served through HTTP where * receivers create simultaneous connections to multiple servers by setting the @@ -17,7 +16,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} * TODO: Add support for compression when spark.compress is set to true. */ @serializable -class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { +class HttpParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { @transient var totalSplits = 0 @transient var hasSplits = 0 @@ -34,7 +33,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { : RDD[(K, C)] = { val sc = input.sparkContext - val shuffleId = ParallelLocalFileShuffle.newShuffleId() + val shuffleId = HttpParallelLocalFileShuffle.newShuffleId() logInfo("Shuffle ID: " + shuffleId) val splitRdd = new NumberedSplitRDD(input) @@ -59,7 +58,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { } for (i <- 0 until numOutputSplits) { - val file = ParallelLocalFileShuffle.getOutputFile(shuffleId, myIndex, i) + val file = HttpParallelLocalFileShuffle.getOutputFile(shuffleId, myIndex, i) val writeStartTime = System.currentTimeMillis logInfo("BEGIN WRITE: " + file) val out = new ObjectOutputStream(new FileOutputStream(file)) @@ -70,7 +69,7 @@ class ParallelLocalFileShuffle[K, V, C] extends 
Shuffle[K, V, C] with Logging { logInfo("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.") } - (myIndex, ParallelLocalFileShuffle.serverUri) + (myIndex, HttpParallelLocalFileShuffle.serverUri) }).collect() // TODO: Could broadcast outputLocs @@ -86,12 +85,12 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { combiners = new HashMap[K, C] - var threadPool = ParallelLocalFileShuffle.newDaemonFixedThreadPool( - ParallelLocalFileShuffle.MaxConnections) + var threadPool = HttpParallelLocalFileShuffle.newDaemonFixedThreadPool( + HttpParallelLocalFileShuffle.MaxConnections) while (hasSplits < totalSplits) { var numThreadsToCreate = - Math.min(totalSplits, ParallelLocalFileShuffle.MaxConnections) - + Math.min(totalSplits, HttpParallelLocalFileShuffle.MaxConnections) - threadPool.getActiveCount while (hasSplits < totalSplits && numThreadsToCreate > 0) { @@ -114,7 +113,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { } // Sleep for a while before creating new threads - Thread.sleep(ParallelLocalFileShuffle.MinKnockInterval) + Thread.sleep(HttpParallelLocalFileShuffle.MinKnockInterval) } combiners }) @@ -132,7 +131,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { } if (requiredSplits.size > 0) { - requiredSplits(ParallelLocalFileShuffle.ranGen.nextInt( + requiredSplits(HttpParallelLocalFileShuffle.ranGen.nextInt( requiredSplits.size)) } else { -1 @@ -204,8 +203,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { } } - -object ParallelLocalFileShuffle extends Logging { +object HttpParallelLocalFileShuffle extends Logging { // Used thoughout the code for small and large waits/timeouts private var MinKnockInterval_ = 1000 private var MaxKnockInterval_ = 5000