Commit 81f78282 authored by Mosharaf Chowdhury

All shuffle implementations are now in the same place. Time to work on new things.

parent 272c72b4
@@ -7,7 +7,6 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.{ArrayBuffer, HashMap}
/**
* A basic implementation of shuffle using local files served through HTTP.
*
......
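The doc comment above describes the basic scheme: each map task writes its combined output for every reducer to a local file, and a local HTTP server then serves those files to the reducers. A minimal sketch of the write side, assuming a flat per-shuffle naming scheme (writeMapOutput and the file name format are hypothetical, not this file's actual API):

import java.io.{File, FileOutputStream, ObjectOutputStream}

// Hypothetical sketch: one local file per (shuffle, map, reduce) triple,
// later served to reducers by a local HTTP server rooted at shuffleDir.
def writeMapOutput[K, C](shuffleDir: File, shuffleId: Int, mapId: Int,
    reduceId: Int, bucket: Iterable[(K, C)]): File = {
  val file = new File(shuffleDir, "shuffle-%d-%d-%d".format(shuffleId, mapId, reduceId))
  val out = new ObjectOutputStream(new FileOutputStream(file))
  bucket.foreach(out.writeObject)  // one serialized (key, combined value) pair per record
  out.close()
  file
}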
@@ -8,7 +8,6 @@ import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory}
import scala.collection.mutable.{ArrayBuffer, HashMap}
/**
* An implementation of shuffle using local files served through HTTP where
* receivers create simultaneous connections to multiple servers by setting the
@@ -272,7 +271,6 @@ class CustomBlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging
}
}
object CustomBlockedLocalFileShuffle extends Logging {
// Used throughout the code for small and large waits/timeouts
private var BlockSize_ = 1024 * 1024
......
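BlockSize_ defaults to 1 MB here. Given the 'spark.blockedLocalFileShuffle.blockSize' option named in the doc comment later in this commit, the initialization presumably reads a system property along these lines (a sketch; treating the property value as kilobytes is an assumption):

// Assumed initialization: block size in KB from the config option,
// falling back to the 1 MB default shown above.
private var BlockSize_ =
  System.getProperty("spark.blockedLocalFileShuffle.blockSize", "1024").toInt * 1024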
@@ -9,7 +9,6 @@ import scala.collection.mutable.HashMap
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
/**
* A simple implementation of shuffle using a distributed file system.
*
@@ -82,7 +81,6 @@ class DfsShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
}
}
/**
* Companion object of DfsShuffle; responsible for initializing a Hadoop
* FileSystem object based on the spark.dfs property and generating names
......
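Per the comment above, the companion object builds a Hadoop FileSystem from the spark.dfs property. A self-contained sketch of that initialization, assuming the property holds a filesystem URI (getFileSystem and the default value are illustrative):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// Sketch: resolve the shuffle filesystem from the spark.dfs property,
// e.g. "hdfs://namenode:9000", defaulting to the local filesystem.
def getFileSystem(): FileSystem = {
  val uri = System.getProperty("spark.dfs", "file:///")
  FileSystem.get(new URI(uri), new Configuration())
}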
@@ -8,16 +8,15 @@ import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory}
import scala.collection.mutable.{ArrayBuffer, HashMap}
/**
* An implementation of shuffle using local files served through HTTP where
* receivers create simultaneous connections to multiple servers by setting the
* 'spark.blockedLocalFileShuffle.maxConnections' config option.
*
* By controlling the 'spark.blockedLocalFileShuffle.blockSize' config option
-* one can also control the largest block size to divide each map output into.
-* Essentially, instead of creating one large output file for each reducer, maps
-* create multiple smaller files to enable a finer level of engagement.
+* one can also control the largest block size retrieved by each reducer.
+* An INDEX file keeps track of block boundaries instead of creating many
+* smaller files.
*
* TODO: Add support for compression when spark.compress is set to true.
*/
......
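To make the INDEX-file idea concrete: each map writes a single data file per reducer and records a byte offset whenever the current block reaches the block size, so a reducer can fetch block i as the byte range [offsets(i), offsets(i+1)), with the last block running to the end of the file. A sketch under those assumptions (all names hypothetical):

import java.io.{DataOutputStream, FileOutputStream}
import scala.collection.mutable.ArrayBuffer

// offsets(i) is the byte offset where block i of the data file starts.
val offsets = new ArrayBuffer[Long]()
offsets += 0L

// Called after writing each record: start a new block once the current
// one has grown to at least blockSize bytes.
def maybeEndBlock(bytesWrittenSoFar: Long, blockSize: Long): Unit = {
  if (bytesWrittenSoFar - offsets.last >= blockSize) offsets += bytesWrittenSoFar
}

// Persist the INDEX file next to the data file as a sequence of longs.
def writeIndex(indexPath: String): Unit = {
  val out = new DataOutputStream(new FileOutputStream(indexPath))
  offsets.foreach(out.writeLong)
  out.close()
}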
@@ -8,7 +8,6 @@ import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory}
import scala.collection.mutable.{ArrayBuffer, HashMap}
/**
* An implementation of shuffle using local files served through HTTP where
* receivers create simultaneous connections to multiple servers by setting the
@@ -17,7 +16,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
* TODO: Add support for compression when spark.compress is set to true.
*/
@serializable
-class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
+class HttpParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
@transient var totalSplits = 0
@transient var hasSplits = 0
@@ -34,7 +33,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
: RDD[(K, C)] =
{
val sc = input.sparkContext
-val shuffleId = ParallelLocalFileShuffle.newShuffleId()
+val shuffleId = HttpParallelLocalFileShuffle.newShuffleId()
logInfo("Shuffle ID: " + shuffleId)
val splitRdd = new NumberedSplitRDD(input)
@@ -59,7 +58,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
}
for (i <- 0 until numOutputSplits) {
-val file = ParallelLocalFileShuffle.getOutputFile(shuffleId, myIndex, i)
+val file = HttpParallelLocalFileShuffle.getOutputFile(shuffleId, myIndex, i)
val writeStartTime = System.currentTimeMillis
logInfo("BEGIN WRITE: " + file)
val out = new ObjectOutputStream(new FileOutputStream(file))
@@ -70,7 +69,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
logInfo("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.")
}
-(myIndex, ParallelLocalFileShuffle.serverUri)
+(myIndex, HttpParallelLocalFileShuffle.serverUri)
}).collect()
// TODO: Could broadcast outputLocs
@@ -86,12 +85,12 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
combiners = new HashMap[K, C]
-var threadPool = ParallelLocalFileShuffle.newDaemonFixedThreadPool(
-ParallelLocalFileShuffle.MaxConnections)
+var threadPool = HttpParallelLocalFileShuffle.newDaemonFixedThreadPool(
+HttpParallelLocalFileShuffle.MaxConnections)
while (hasSplits < totalSplits) {
var numThreadsToCreate =
-Math.min(totalSplits, ParallelLocalFileShuffle.MaxConnections) -
+Math.min(totalSplits, HttpParallelLocalFileShuffle.MaxConnections) -
threadPool.getActiveCount
while (hasSplits < totalSplits && numThreadsToCreate > 0) {
@@ -114,7 +113,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
}
// Sleep for a while before creating new threads
-Thread.sleep(ParallelLocalFileShuffle.MinKnockInterval)
+Thread.sleep(HttpParallelLocalFileShuffle.MinKnockInterval)
}
combiners
})
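Rename aside, the receive path above is worth spelling out: a fixed pool of at most MaxConnections daemon threads fetches splits, and the loop tops the pool up every MinKnockInterval milliseconds until all splits have arrived. The helper visible in those calls plausibly looks like this (a sketch; only the name newDaemonFixedThreadPool is taken from the diff):

import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}

// Fixed-size pool of daemon threads, so in-flight fetchers cannot keep
// the JVM alive after the job finishes.
def newDaemonFixedThreadPool(numThreads: Int): ThreadPoolExecutor = {
  val factory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r)
      t.setDaemon(true)
      t
    }
  }
  // Returned as ThreadPoolExecutor so callers can poll getActiveCount,
  // as the loop above does.
  Executors.newFixedThreadPool(numThreads, factory).asInstanceOf[ThreadPoolExecutor]
}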
@@ -132,7 +131,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
}
if (requiredSplits.size > 0) {
-requiredSplits(ParallelLocalFileShuffle.ranGen.nextInt(
+requiredSplits(HttpParallelLocalFileShuffle.ranGen.nextInt(
requiredSplits.size))
} else {
-1
@@ -204,8 +203,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
}
}
-object ParallelLocalFileShuffle extends Logging {
+object HttpParallelLocalFileShuffle extends Logging {
// Used throughout the code for small and large waits/timeouts
private var MinKnockInterval_ = 1000
private var MaxKnockInterval_ = 5000
......