Skip to content
Snippets Groups Projects
Commit ca37e7b3 authored by Mosharaf Chowdhury's avatar Mosharaf Chowdhury
Browse files

Renamed CustomParallelLocalFileShuffle

parent c6df327d
No related branches found
No related tags found
No related merge requests found
-Dspark.shuffle.class=spark.LocalFileShuffle -Dspark.shuffle.UseHttpPipelining=true
-Dspark.shuffle.class=spark.CustomParallelLocalFileShuffle
......@@ -9,12 +9,14 @@ import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory}
import scala.collection.mutable.{ArrayBuffer, HashMap}
/**
* A simple implementation of shuffle using local files served through HTTP.
* An implementation of shuffle using local files served through custom server
* where receivers create simultaneous connections to multiple servers by
* setting the 'spark.parallelLocalFileShuffle.maxConnections' config option.
*
* TODO: Add support for compression when spark.compress is set to true.
*/
@serializable
class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
class CustomParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
@transient var totalSplits = 0
@transient var hasSplits = 0
@transient var hasSplitsBitVector: BitSet = null
......@@ -30,7 +32,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
: RDD[(K, C)] =
{
val sc = input.sparkContext
val shuffleId = LocalFileShuffle.newShuffleId()
val shuffleId = CustomParallelLocalFileShuffle.newShuffleId()
logInfo("Shuffle ID: " + shuffleId)
val splitRdd = new NumberedSplitRDD(input)
......@@ -55,7 +57,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
}
for (i <- 0 until numOutputSplits) {
val file = LocalFileShuffle.getOutputFile(shuffleId, myIndex, i)
val file = CustomParallelLocalFileShuffle.getOutputFile(shuffleId, myIndex, i)
val writeStartTime = System.currentTimeMillis
logInfo ("BEGIN WRITE: " + file)
val out = new ObjectOutputStream(new FileOutputStream(file))
......@@ -65,7 +67,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
val writeTime = (System.currentTimeMillis - writeStartTime)
logInfo ("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.")
}
(myIndex, LocalFileShuffle.serverAddress, LocalFileShuffle.serverPort)
(myIndex, CustomParallelLocalFileShuffle.serverAddress, CustomParallelLocalFileShuffle.serverPort)
}).collect()
val splitsByUri = new ArrayBuffer[(String, Int, Int)]
......@@ -85,11 +87,11 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
combiners = new HashMap[K, C]
var threadPool =
LocalFileShuffle.newDaemonFixedThreadPool (LocalFileShuffle.MaxConnections)
CustomParallelLocalFileShuffle.newDaemonFixedThreadPool (CustomParallelLocalFileShuffle.MaxConnections)
while (hasSplits < totalSplits) {
var numThreadsToCreate =
Math.min (totalSplits, LocalFileShuffle.MaxConnections) -
Math.min (totalSplits, CustomParallelLocalFileShuffle.MaxConnections) -
threadPool.getActiveCount
while (hasSplits < totalSplits && numThreadsToCreate > 0) {
......@@ -113,7 +115,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
}
// Sleep for a while before creating new threads
Thread.sleep (LocalFileShuffle.MinKnockInterval)
Thread.sleep (CustomParallelLocalFileShuffle.MinKnockInterval)
}
threadPool.shutdown
......@@ -133,7 +135,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
}
if (requiredSplits.size > 0) {
requiredSplits(LocalFileShuffle.ranGen.nextInt (requiredSplits.size))
requiredSplits(CustomParallelLocalFileShuffle.ranGen.nextInt (requiredSplits.size))
} else {
-1
}
......@@ -160,7 +162,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
}
var timeOutTimer = new Timer
timeOutTimer.schedule (timeOutTask, LocalFileShuffle.MaxKnockInterval)
timeOutTimer.schedule (timeOutTask, CustomParallelLocalFileShuffle.MaxKnockInterval)
logInfo ("ShuffleClient started... => %s:%d#%s".format(hostAddress, listenPort, requestPath))
......@@ -254,7 +256,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
}
}
object LocalFileShuffle extends Logging {
object CustomParallelLocalFileShuffle extends Logging {
// Used throughout the code for small and large waits/timeouts
private var MinKnockInterval_ = 1000
private var MaxKnockInterval_ = 5000
......@@ -279,12 +281,12 @@ object LocalFileShuffle extends Logging {
if (!initialized) {
// Load config parameters
MinKnockInterval_ =
System.getProperty ("spark.shuffle.MinKnockInterval", "1000").toInt
System.getProperty ("spark.parallelLocalFileShuffle.MinKnockInterval", "1000").toInt
MaxKnockInterval_ =
System.getProperty ("spark.shuffle.MaxKnockInterval", "5000").toInt
System.getProperty ("spark.parallelLocalFileShuffle.MaxKnockInterval", "5000").toInt
MaxConnections_ =
System.getProperty ("spark.shuffle.MaxConnections", "4").toInt
System.getProperty ("spark.parallelLocalFileShuffle.MaxConnections", "4").toInt
// TODO: localDir should be created by some mechanism common to Spark
// so that it can be shared among shuffle, broadcast, etc
......@@ -366,7 +368,7 @@ object LocalFileShuffle extends Logging {
class ShuffleServer
extends Thread with Logging {
var threadPool = newDaemonFixedThreadPool(LocalFileShuffle.MaxConnections)
var threadPool = newDaemonFixedThreadPool(CustomParallelLocalFileShuffle.MaxConnections)
var serverSocket: ServerSocket = null
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment