diff --git a/src/scala/spark/CustomBlockedInMemoryShuffle.scala b/src/scala/spark/CustomBlockedInMemoryShuffle.scala
index 771255e4414b5c39850e06038edce160c91b9b5c..382c39758932314e18a6ea7c0b473af5fa8c4dab 100644
--- a/src/scala/spark/CustomBlockedInMemoryShuffle.scala
+++ b/src/scala/spark/CustomBlockedInMemoryShuffle.scala
@@ -99,7 +99,7 @@ extends Shuffle[K, V, C] with Logging {
         isDirty = true
 
         // Close the old file if has crossed the blockSize limit
-        if (baos.size > CustomBlockedInMemoryShuffle.BlockSize) {
+        if (baos.size > Shuffle.BlockSize) {
           CustomBlockedInMemoryShuffle.splitsCache(splitName) =
             baos.toByteArray
@@ -162,12 +162,12 @@ extends Shuffle[K, V, C] with Logging {
     receivedData = new LinkedBlockingQueue[(Int, Array[Byte])]
     combiners = new HashMap[K, C]
 
-    var threadPool = CustomBlockedInMemoryShuffle.newDaemonFixedThreadPool(
-      CustomBlockedInMemoryShuffle.MaxRxConnections)
+    var threadPool = Shuffle.newDaemonFixedThreadPool(
+      Shuffle.MaxRxConnections)
 
     while (hasSplits < totalSplits) {
       var numThreadsToCreate =
-        Math.min(totalSplits, CustomBlockedInMemoryShuffle.MaxRxConnections) -
+        Math.min(totalSplits, Shuffle.MaxRxConnections) -
         threadPool.getActiveCount
 
       while (hasSplits < totalSplits && numThreadsToCreate > 0) {
@@ -190,7 +190,7 @@ extends Shuffle[K, V, C] with Logging {
       }
 
       // Sleep for a while before creating new threads
-      Thread.sleep(CustomBlockedInMemoryShuffle.MinKnockInterval)
+      Thread.sleep(Shuffle.MinKnockInterval)
     }
 
     threadPool.shutdown()
@@ -206,7 +206,7 @@ extends Shuffle[K, V, C] with Logging {
     // Don't return until consumption is finished
     // TODO: Replace with a lock later.
     while (receivedData.size > 0) {
-      Thread.sleep(CustomBlockedLocalFileShuffle.MinKnockInterval)
+      Thread.sleep(Shuffle.MinKnockInterval)
     }
 
     combiners
@@ -287,8 +287,7 @@ extends Shuffle[K, V, C] with Logging {
       }
 
       var timeOutTimer = new Timer
-      timeOutTimer.schedule(timeOutTask,
-        CustomBlockedLocalFileShuffle.MaxKnockInterval)
+      timeOutTimer.schedule(timeOutTask, Shuffle.MaxKnockInterval)
 
       try {
         // Everything will break if BLOCKNUM is not correctly received
@@ -407,16 +406,6 @@ object CustomBlockedInMemoryShuffle extends Logging {
   // Cache for keeping the splits around
   val splitsCache = new HashMap[String, Array[Byte]]
 
-  // Used thoughout the code for small and large waits/timeouts
-  private var BlockSize_ = 1024 * 1024
-
-  private var MinKnockInterval_ = 1000
-  private var MaxKnockInterval_ = 5000
-
-  // Maximum number of connections
-  private var MaxRxConnections_ = 4
-  private var MaxTxConnections_ = 8
-
   private var initialized = false
   private var nextShuffleId = new AtomicLong(0)
@@ -432,20 +421,6 @@ object CustomBlockedInMemoryShuffle extends Logging {
 
   private def initializeIfNeeded() = synchronized {
     if (!initialized) {
-      // Load config parameters
-      BlockSize_ = System.getProperty(
-        "spark.shuffle.blockSize", "1024").toInt * 1024
-
-      MinKnockInterval_ = System.getProperty(
-        "spark.shuffle.minKnockInterval", "1000").toInt
-      MaxKnockInterval_ = System.getProperty(
-        "spark.shuffle.maxKnockInterval", "5000").toInt
-
-      MaxRxConnections_ = System.getProperty(
-        "spark.shuffle.maxRxConnections", "4").toInt
-      MaxTxConnections_ = System.getProperty(
-        "spark.shuffle.maxTxConnections", "8").toInt
-
       // TODO: localDir should be created by some mechanism common to Spark
       // so that it can be shared among shuffle, broadcast, etc
       val localDirRoot = System.getProperty("spark.local.dir", "/tmp")
@@ -485,14 +460,6 @@ object CustomBlockedInMemoryShuffle extends Logging {
     }
   }
 
-  def BlockSize = BlockSize_
-
-  def MinKnockInterval = MinKnockInterval_
-  def MaxKnockInterval = MaxKnockInterval_
-
-  def MaxRxConnections = MaxRxConnections_
-  def MaxTxConnections = MaxTxConnections_
-
   def getSplitName(shuffleId: Long, inputId: Int, outputId: Int,
     blockId: Int): String = {
     initializeIfNeeded()
@@ -535,8 +502,7 @@ object CustomBlockedInMemoryShuffle extends Logging {
 
   class ShuffleServer
   extends Thread with Logging {
-    var threadPool =
-      newDaemonFixedThreadPool(CustomBlockedInMemoryShuffle.MaxTxConnections)
+    var threadPool = newDaemonFixedThreadPool(Shuffle.MaxTxConnections)
 
     var serverSocket: ServerSocket = null
diff --git a/src/scala/spark/CustomBlockedLocalFileShuffle.scala b/src/scala/spark/CustomBlockedLocalFileShuffle.scala
index 220289b007a8ad24313e95e3beb6ce066ab22936..7f03d4a525d362d68afc02a8ff0c6e3b0207fa6b 100644
--- a/src/scala/spark/CustomBlockedLocalFileShuffle.scala
+++ b/src/scala/spark/CustomBlockedLocalFileShuffle.scala
@@ -94,7 +94,7 @@ extends Shuffle[K, V, C] with Logging {
         isDirty = true
 
         // Close the old file if has crossed the blockSize limit
-        if (file.length > CustomBlockedLocalFileShuffle.BlockSize) {
+        if (file.length > Shuffle.BlockSize) {
           out.close()
           logInfo("END WRITE: " + file)
           val writeTime = System.currentTimeMillis - writeStartTime
@@ -148,12 +148,12 @@ extends Shuffle[K, V, C] with Logging {
     receivedData = new LinkedBlockingQueue[(Int, Array[Byte])]
     combiners = new HashMap[K, C]
 
-    var threadPool = CustomBlockedLocalFileShuffle.newDaemonFixedThreadPool(
-      CustomBlockedLocalFileShuffle.MaxRxConnections)
+    var threadPool = Shuffle.newDaemonFixedThreadPool(
+      Shuffle.MaxRxConnections)
 
     while (hasSplits < totalSplits) {
       var numThreadsToCreate =
-        Math.min(totalSplits, CustomBlockedLocalFileShuffle.MaxRxConnections) -
+        Math.min(totalSplits, Shuffle.MaxRxConnections) -
         threadPool.getActiveCount
 
       while (hasSplits < totalSplits && numThreadsToCreate > 0) {
@@ -177,7 +177,7 @@ extends Shuffle[K, V, C] with Logging {
       }
 
      // Sleep for a while before creating new threads
-      Thread.sleep(CustomBlockedLocalFileShuffle.MinKnockInterval)
+      Thread.sleep(Shuffle.MinKnockInterval)
     }
 
     threadPool.shutdown()
@@ -193,7 +193,7 @@ extends Shuffle[K, V, C] with Logging {
     // Don't return until consumption is finished
     // TODO: Replace with a lock later.
     while (receivedData.size > 0) {
-      Thread.sleep(CustomBlockedLocalFileShuffle.MinKnockInterval)
+      Thread.sleep(Shuffle.MinKnockInterval)
     }
 
     combiners
@@ -274,8 +274,7 @@ extends Shuffle[K, V, C] with Logging {
       }
 
       var timeOutTimer = new Timer
-      timeOutTimer.schedule(timeOutTask,
-        CustomBlockedLocalFileShuffle.MaxKnockInterval)
+      timeOutTimer.schedule(timeOutTask, Shuffle.MaxKnockInterval)
 
       try {
         // Everything will break if BLOCKNUM is not correctly received
@@ -397,16 +396,6 @@ extends Shuffle[K, V, C] with Logging {
 }
 
 object CustomBlockedLocalFileShuffle extends Logging {
-  // Used thoughout the code for small and large waits/timeouts
-  private var BlockSize_ = 1024 * 1024
-
-  private var MinKnockInterval_ = 1000
-  private var MaxKnockInterval_ = 5000
-
-  // Maximum number of connections
-  private var MaxRxConnections_ = 4
-  private var MaxTxConnections_ = 8
-
   private var initialized = false
   private var nextShuffleId = new AtomicLong(0)
@@ -422,20 +411,6 @@ object CustomBlockedLocalFileShuffle extends Logging {
 
   private def initializeIfNeeded() = synchronized {
     if (!initialized) {
-      // Load config parameters
-      BlockSize_ = System.getProperty(
-        "spark.shuffle.blockSize", "1024").toInt * 1024
-
-      MinKnockInterval_ = System.getProperty(
-        "spark.shuffle.minKnockInterval", "1000").toInt
-      MaxKnockInterval_ = System.getProperty(
-        "spark.shuffle.maxKnockInterval", "5000").toInt
-
-      MaxRxConnections_ = System.getProperty(
-        "spark.shuffle.maxRxConnections", "4").toInt
-      MaxTxConnections_ = System.getProperty(
-        "spark.shuffle.maxTxConnections", "8").toInt
-
       // TODO: localDir should be created by some mechanism common to Spark
       // so that it can be shared among shuffle, broadcast, etc
       val localDirRoot = System.getProperty("spark.local.dir", "/tmp")
@@ -475,14 +450,6 @@ object CustomBlockedLocalFileShuffle extends Logging {
     }
   }
 
-  def BlockSize = BlockSize_
-
-  def MinKnockInterval = MinKnockInterval_
-  def MaxKnockInterval = MaxKnockInterval_
-
-  def MaxRxConnections = MaxRxConnections_
-  def MaxTxConnections = MaxTxConnections_
-
   def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int,
     blockId: Int): File = {
     initializeIfNeeded()
@@ -529,7 +496,7 @@ object CustomBlockedLocalFileShuffle extends Logging {
 
   class ShuffleServer
   extends Thread with Logging {
     var threadPool =
-      newDaemonFixedThreadPool(CustomBlockedLocalFileShuffle.MaxTxConnections)
+      newDaemonFixedThreadPool(Shuffle.MaxTxConnections)
 
     var serverSocket: ServerSocket = null
diff --git a/src/scala/spark/CustomParallelInMemoryShuffle.scala b/src/scala/spark/CustomParallelInMemoryShuffle.scala
index 4806e14e80e7ab2f0d713859de2b1abd18de20ff..1a47c24b714208a6a1ec482a14e5e7720c26d4a5 100644
--- a/src/scala/spark/CustomParallelInMemoryShuffle.scala
+++ b/src/scala/spark/CustomParallelInMemoryShuffle.scala
@@ -105,12 +105,12 @@ extends Shuffle[K, V, C] with Logging {
     receivedData = new LinkedBlockingQueue[(Int, Array[Byte])]
     combiners = new HashMap[K, C]
 
-    var threadPool = CustomParallelInMemoryShuffle.newDaemonFixedThreadPool(
-      CustomParallelInMemoryShuffle.MaxRxConnections)
+    var threadPool = Shuffle.newDaemonFixedThreadPool(
+      Shuffle.MaxRxConnections)
 
     while (hasSplits < totalSplits) {
       var numThreadsToCreate = Math.min(totalSplits,
-        CustomParallelInMemoryShuffle.MaxRxConnections) -
+        Shuffle.MaxRxConnections) -
         threadPool.getActiveCount
 
       while (hasSplits < totalSplits && numThreadsToCreate > 0) {
@@ -134,7 +134,7 @@ extends Shuffle[K, V, C] with Logging {
       }
 
       // Sleep for a while before creating new threads
-      Thread.sleep(CustomParallelInMemoryShuffle.MinKnockInterval)
+      Thread.sleep(Shuffle.MinKnockInterval)
     }
 
     threadPool.shutdown()
@@ -150,7 +150,7 @@ extends Shuffle[K, V, C] with Logging {
     // Don't return until consumption is finished
     // TODO: Replace with a lock later.
     while (receivedData.size > 0) {
-      Thread.sleep(CustomBlockedLocalFileShuffle.MinKnockInterval)
+      Thread.sleep(Shuffle.MinKnockInterval)
     }
 
     combiners
@@ -231,8 +231,7 @@ extends Shuffle[K, V, C] with Logging {
       }
 
       var timeOutTimer = new Timer
-      timeOutTimer.schedule(timeOutTask,
-        CustomParallelInMemoryShuffle.MaxKnockInterval)
+      timeOutTimer.schedule(timeOutTask, Shuffle.MaxKnockInterval)
 
       logInfo("ShuffleClient started... => %s:%d#%s".format(hostAddress,
         listenPort, requestSplit))
@@ -337,14 +336,6 @@ object CustomParallelInMemoryShuffle extends Logging {
   // Cache for keeping the splits around
   val splitsCache = new HashMap[String, Array[Byte]]
 
-  // Used thoughout the code for small and large waits/timeouts
-  private var MinKnockInterval_ = 1000
-  private var MaxKnockInterval_ = 5000
-
-  // Maximum number of connections
-  private var MaxRxConnections_ = 4
-  private var MaxTxConnections_ = 8
-
   private var initialized = false
   private var nextShuffleId = new AtomicLong(0)
@@ -360,17 +351,6 @@ object CustomParallelInMemoryShuffle extends Logging {
 
   private def initializeIfNeeded() = synchronized {
     if (!initialized) {
-      // Load config parameters
-      MinKnockInterval_ = System.getProperty(
-        "spark.shuffle.minKnockInterval", "1000").toInt
-      MaxKnockInterval_ = System.getProperty(
-        "spark.shuffle.maxKnockInterval", "5000").toInt
-
-      MaxRxConnections_ = System.getProperty(
-        "spark.shuffle.maxRxConnections", "4").toInt
-      MaxTxConnections_ = System.getProperty(
-        "spark.shuffle.maxTxConnections", "8").toInt
-
      // TODO: localDir should be created by some mechanism common to Spark
      // so that it can be shared among shuffle, broadcast, etc
      val localDirRoot = System.getProperty("spark.local.dir", "/tmp")
@@ -411,12 +391,6 @@ object CustomParallelInMemoryShuffle extends Logging {
     }
   }
 
-  def MinKnockInterval = MinKnockInterval_
-  def MaxKnockInterval = MaxKnockInterval_
-
-  def MaxRxConnections = MaxRxConnections_
-  def MaxTxConnections = MaxTxConnections_
-
   def getSplitName(shuffleId: Long, inputId: Int, outputId: Int): String = {
     initializeIfNeeded()
     // Adding shuffleDir is unnecessary. Added to keep the parsers working
@@ -450,8 +424,7 @@ object CustomParallelInMemoryShuffle extends Logging {
 
   class ShuffleServer
   extends Thread with Logging {
-    var threadPool =
-      newDaemonFixedThreadPool(CustomParallelInMemoryShuffle.MaxTxConnections)
+    var threadPool = newDaemonFixedThreadPool(Shuffle.MaxTxConnections)
 
     var serverSocket: ServerSocket = null
diff --git a/src/scala/spark/CustomParallelLocalFileShuffle.scala b/src/scala/spark/CustomParallelLocalFileShuffle.scala
index 45f629e414d277cab5ba662b4d1897317c6ab19a..68059c0e6783ef8881d5bdd04f8fd89f360fb25b 100644
--- a/src/scala/spark/CustomParallelLocalFileShuffle.scala
+++ b/src/scala/spark/CustomParallelLocalFileShuffle.scala
@@ -96,12 +96,12 @@ extends Shuffle[K, V, C] with Logging {
     receivedData = new LinkedBlockingQueue[(Int, Array[Byte])]
     combiners = new HashMap[K, C]
 
-    var threadPool = CustomParallelLocalFileShuffle.newDaemonFixedThreadPool(
-      CustomParallelLocalFileShuffle.MaxRxConnections)
+    var threadPool = Shuffle.newDaemonFixedThreadPool(
+      Shuffle.MaxRxConnections)
 
     while (hasSplits < totalSplits) {
       var numThreadsToCreate = Math.min(totalSplits,
-        CustomParallelLocalFileShuffle.MaxRxConnections) -
+        Shuffle.MaxRxConnections) -
         threadPool.getActiveCount
 
       while (hasSplits < totalSplits && numThreadsToCreate > 0) {
@@ -125,7 +125,7 @@ extends Shuffle[K, V, C] with Logging {
       }
 
       // Sleep for a while before creating new threads
-      Thread.sleep(CustomParallelLocalFileShuffle.MinKnockInterval)
+      Thread.sleep(Shuffle.MinKnockInterval)
     }
 
     threadPool.shutdown()
@@ -141,7 +141,7 @@ extends Shuffle[K, V, C] with Logging {
     // Don't return until consumption is finished
     // TODO: Replace with a lock later.
     while (receivedData.size > 0) {
-      Thread.sleep(CustomBlockedLocalFileShuffle.MinKnockInterval)
+      Thread.sleep(Shuffle.MinKnockInterval)
     }
 
     combiners
@@ -222,8 +222,7 @@ extends Shuffle[K, V, C] with Logging {
       }
 
       var timeOutTimer = new Timer
-      timeOutTimer.schedule(timeOutTask,
-        CustomParallelLocalFileShuffle.MaxKnockInterval)
+      timeOutTimer.schedule(timeOutTask, Shuffle.MaxKnockInterval)
 
       logInfo("ShuffleClient started... => %s:%d#%s".format(hostAddress,
         listenPort, requestPath))
@@ -325,14 +324,6 @@ extends Shuffle[K, V, C] with Logging {
 }
 
 object CustomParallelLocalFileShuffle extends Logging {
-  // Used thoughout the code for small and large waits/timeouts
-  private var MinKnockInterval_ = 1000
-  private var MaxKnockInterval_ = 5000
-
-  // Maximum number of connections
-  private var MaxRxConnections_ = 4
-  private var MaxTxConnections_ = 8
-
   private var initialized = false
   private var nextShuffleId = new AtomicLong(0)
@@ -348,17 +339,6 @@ object CustomParallelLocalFileShuffle extends Logging {
 
   private def initializeIfNeeded() = synchronized {
     if (!initialized) {
-      // Load config parameters
-      MinKnockInterval_ = System.getProperty(
-        "spark.shuffle.minKnockInterval", "1000").toInt
-      MaxKnockInterval_ = System.getProperty(
-        "spark.shuffle.maxKnockInterval", "5000").toInt
-
-      MaxRxConnections_ = System.getProperty(
-        "spark.shuffle.maxRxConnections", "4").toInt
-      MaxTxConnections_ = System.getProperty(
-        "spark.shuffle.maxTxConnections", "8").toInt
-
       // TODO: localDir should be created by some mechanism common to Spark
       // so that it can be shared among shuffle, broadcast, etc
       val localDirRoot = System.getProperty("spark.local.dir", "/tmp")
@@ -399,12 +379,6 @@ object CustomParallelLocalFileShuffle extends Logging {
     }
   }
 
-  def MinKnockInterval = MinKnockInterval_
-  def MaxKnockInterval = MaxKnockInterval_
-
-  def MaxRxConnections = MaxRxConnections_
-  def MaxTxConnections = MaxTxConnections_
-
   def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = {
     initializeIfNeeded()
     val dir = new File(shuffleDir, shuffleId + "/" + inputId)
@@ -441,7 +415,7 @@ object CustomParallelLocalFileShuffle extends Logging {
 
   class ShuffleServer
   extends Thread with Logging {
     var threadPool =
-      newDaemonFixedThreadPool(CustomParallelLocalFileShuffle.MaxTxConnections)
+      newDaemonFixedThreadPool(Shuffle.MaxTxConnections)
 
     var serverSocket: ServerSocket = null
diff --git a/src/scala/spark/HttpBlockedLocalFileShuffle.scala b/src/scala/spark/HttpBlockedLocalFileShuffle.scala
index e6579fc5aa4e2fec6723e24445a202f43e57e002..171d07eb18e36ba9eb4b079745c30830df2c4db5 100644
--- a/src/scala/spark/HttpBlockedLocalFileShuffle.scala
+++ b/src/scala/spark/HttpBlockedLocalFileShuffle.scala
@@ -90,8 +90,7 @@ extends Shuffle[K, V, C] with Logging {
         indexDirty = true
 
         // Update the INDEX file if more than blockSize limit has been written
-        if (file.length - alreadyWritten >
-            HttpBlockedLocalFileShuffle.BlockSize) {
+        if (file.length - alreadyWritten > Shuffle.BlockSize) {
           indexOut.writeObject(file.length)
           indexDirty = false
           alreadyWritten = file.length
@@ -132,12 +131,12 @@ extends Shuffle[K, V, C] with Logging {
     receivedData = new LinkedBlockingQueue[(Int, Array[Byte])]
     combiners = new HashMap[K, C]
 
-    var threadPool = HttpBlockedLocalFileShuffle.newDaemonFixedThreadPool(
-      HttpBlockedLocalFileShuffle.MaxRxConnections)
+    var threadPool = Shuffle.newDaemonFixedThreadPool(
+      Shuffle.MaxRxConnections)
 
     while (hasSplits < totalSplits) {
-      var numThreadsToCreate =
-        Math.min(totalSplits, HttpBlockedLocalFileShuffle.MaxRxConnections) -
+      var numThreadsToCreate =
+        Math.min(totalSplits, Shuffle.MaxRxConnections) -
         threadPool.getActiveCount
 
       while (hasSplits < totalSplits && numThreadsToCreate > 0) {
@@ -160,7 +159,7 @@ extends Shuffle[K, V, C] with Logging {
       }
 
       // Sleep for a while before creating new threads
-      Thread.sleep(HttpBlockedLocalFileShuffle.MinKnockInterval)
+      Thread.sleep(Shuffle.MinKnockInterval)
     }
 
     threadPool.shutdown()
@@ -176,7 +175,7 @@ extends Shuffle[K, V, C] with Logging {
     // Don't return until consumption is finished
     // TODO: Replace with a lock later.
     while (receivedData.size > 0) {
-      Thread.sleep(CustomBlockedLocalFileShuffle.MinKnockInterval)
+      Thread.sleep(Shuffle.MinKnockInterval)
     }
 
     combiners
@@ -357,16 +356,6 @@ extends Shuffle[K, V, C] with Logging {
 }
 
 object HttpBlockedLocalFileShuffle extends Logging {
-  // Used thoughout the code for small and large waits/timeouts
-  private var BlockSize_ = 1024 * 1024
-
-  private var MinKnockInterval_ = 1000
-  private var MaxKnockInterval_ = 5000
-
-  // Maximum number of connections
-  private var MaxRxConnections_ = 4
-  private var MaxTxConnections_ = 8
-
   private var initialized = false
   private var nextShuffleId = new AtomicLong(0)
@@ -380,20 +369,6 @@ object HttpBlockedLocalFileShuffle extends Logging {
 
   private def initializeIfNeeded() = synchronized {
     if (!initialized) {
-      // Load config parameters
-      BlockSize_ = System.getProperty(
-        "spark.shuffle.blockSize", "1024").toInt * 1024
-
-      MinKnockInterval_ = System.getProperty(
-        "spark.shuffle.minKnockInterval", "1000").toInt
-      MaxKnockInterval_ = System.getProperty(
-        "spark.shuffle.maxKnockInterval", "5000").toInt
-
-      MaxRxConnections_ = System.getProperty(
-        "spark.shuffle.maxRxConnections", "4").toInt
-      MaxTxConnections_ = System.getProperty(
-        "spark.shuffle.maxTxConnections", "8").toInt
-
       // TODO: localDir should be created by some mechanism common to Spark
       // so that it can be shared among shuffle, broadcast, etc
       val localDirRoot = System.getProperty("spark.local.dir", "/tmp")
@@ -445,14 +420,6 @@ object HttpBlockedLocalFileShuffle extends Logging {
     }
   }
 
-  def BlockSize = BlockSize_
-
-  def MinKnockInterval = MinKnockInterval_
-  def MaxKnockInterval = MaxKnockInterval_
-
-  def MaxRxConnections = MaxRxConnections_
-  def MaxTxConnections = MaxTxConnections_
-
   def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = {
     initializeIfNeeded()
     val dir = new File(shuffleDir, shuffleId + "/" + inputId)
diff --git a/src/scala/spark/HttpParallelLocalFileShuffle.scala b/src/scala/spark/HttpParallelLocalFileShuffle.scala
index b6ddee6d2fc32e8dca46abaaf5a12720000150d6..1a6f65314dc4f30e7772cd4cf650803e02436167 100644
--- a/src/scala/spark/HttpParallelLocalFileShuffle.scala
+++ b/src/scala/spark/HttpParallelLocalFileShuffle.scala
@@ -93,12 +93,12 @@ extends Shuffle[K, V, C] with Logging {
     receivedData = new LinkedBlockingQueue[(Int, Array[Byte])]
     combiners = new HashMap[K, C]
 
-    var threadPool = HttpParallelLocalFileShuffle.newDaemonFixedThreadPool(
-      HttpParallelLocalFileShuffle.MaxRxConnections)
+    var threadPool = Shuffle.newDaemonFixedThreadPool(
+      Shuffle.MaxRxConnections)
 
     while (hasSplits < totalSplits) {
       var numThreadsToCreate =
-        Math.min(totalSplits, HttpParallelLocalFileShuffle.MaxRxConnections) -
+        Math.min(totalSplits, Shuffle.MaxRxConnections) -
         threadPool.getActiveCount
 
       while (hasSplits < totalSplits && numThreadsToCreate > 0) {
@@ -121,7 +121,7 @@ extends Shuffle[K, V, C] with Logging {
       }
 
      // Sleep for a while before creating new threads
-      Thread.sleep(HttpParallelLocalFileShuffle.MinKnockInterval)
+      Thread.sleep(Shuffle.MinKnockInterval)
     }
 
     threadPool.shutdown()
@@ -137,7 +137,7 @@ extends Shuffle[K, V, C] with Logging {
     // Don't return until consumption is finished
     // TODO: Replace with a lock later.
     while (receivedData.size > 0) {
-      Thread.sleep(CustomBlockedLocalFileShuffle.MinKnockInterval)
+      Thread.sleep(Shuffle.MinKnockInterval)
     }
 
     combiners
@@ -285,13 +285,6 @@ extends Shuffle[K, V, C] with Logging {
 }
 
 object HttpParallelLocalFileShuffle extends Logging {
-  // Used thoughout the code for small and large waits/timeouts
-  private var MinKnockInterval_ = 1000
-  private var MaxKnockInterval_ = 5000
-
-  // Maximum number of connections
-  private var MaxRxConnections_ = 4
-
   private var initialized = false
   private var nextShuffleId = new AtomicLong(0)
@@ -305,15 +298,6 @@ object HttpParallelLocalFileShuffle extends Logging {
 
   private def initializeIfNeeded() = synchronized {
     if (!initialized) {
-      // Load config parameters
-      MinKnockInterval_ = System.getProperty(
-        "spark.shuffle.minKnockInterval", "1000").toInt
-      MaxKnockInterval_ = System.getProperty(
-        "spark.shuffle.maxKnockInterval", "5000").toInt
-
-      MaxRxConnections_ = System.getProperty(
-        "spark.shuffle.maxRxConnections", "4").toInt
-
       // TODO: localDir should be created by some mechanism common to Spark
       // so that it can be shared among shuffle, broadcast, etc
       val localDirRoot = System.getProperty("spark.local.dir", "/tmp")
@@ -365,11 +349,6 @@ object HttpParallelLocalFileShuffle extends Logging {
     }
   }
 
-  def MinKnockInterval = MinKnockInterval_
-  def MaxKnockInterval = MaxKnockInterval_
-
-  def MaxRxConnections = MaxRxConnections_
-
   def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = {
     initializeIfNeeded()
     val dir = new File(shuffleDir, shuffleId + "/" + inputId)
diff --git a/src/scala/spark/ManualBlockedLocalFileShuffle.scala b/src/scala/spark/ManualBlockedLocalFileShuffle.scala
index 13b90ce40ff2ebd6ab7015822747cb816972ea9d..782593349549eeafcb9cd18ecdab9659095173b4 100644
--- a/src/scala/spark/ManualBlockedLocalFileShuffle.scala
+++ b/src/scala/spark/ManualBlockedLocalFileShuffle.scala
@@ -91,7 +91,7 @@ extends Shuffle[K, V, C] with Logging {
         isDirty = true
 
         // Close the old file if has crossed the blockSize limit
-        if (file.length > ManualBlockedLocalFileShuffle.BlockSize) {
+        if (file.length > Shuffle.BlockSize) {
           out.close()
           logInfo("END WRITE: " + file)
           val writeTime = System.currentTimeMillis - writeStartTime
@@ -139,12 +139,12 @@ extends Shuffle[K, V, C] with Logging {
     receivedData = new LinkedBlockingQueue[(Int, Array[Byte])]
     combiners = new HashMap[K, C]
 
-    var threadPool = ManualBlockedLocalFileShuffle.newDaemonFixedThreadPool(
-      ManualBlockedLocalFileShuffle.MaxRxConnections)
+    var threadPool = Shuffle.newDaemonFixedThreadPool(
+      Shuffle.MaxRxConnections)
 
     while (hasSplits < totalSplits) {
       var numThreadsToCreate =
-        Math.min(totalSplits, ManualBlockedLocalFileShuffle.MaxRxConnections) -
+        Math.min(totalSplits, Shuffle.MaxRxConnections) -
         threadPool.getActiveCount
 
       while (hasSplits < totalSplits && numThreadsToCreate > 0) {
@@ -167,7 +167,7 @@ extends Shuffle[K, V, C] with Logging {
       }
 
      // Sleep for a while before creating new threads
-      Thread.sleep(ManualBlockedLocalFileShuffle.MinKnockInterval)
+      Thread.sleep(Shuffle.MinKnockInterval)
     }
 
     threadPool.shutdown()
@@ -183,7 +183,7 @@ extends Shuffle[K, V, C] with Logging {
     // Don't return until consumption is finished
     // TODO: Replace with a lock later.
     while (receivedData.size > 0) {
-      Thread.sleep(CustomBlockedLocalFileShuffle.MinKnockInterval)
+      Thread.sleep(Shuffle.MinKnockInterval)
     }
 
     combiners
@@ -349,16 +349,6 @@ extends Shuffle[K, V, C] with Logging {
 }
 
 object ManualBlockedLocalFileShuffle extends Logging {
-  // Used thoughout the code for small and large waits/timeouts
-  private var BlockSize_ = 1024 * 1024
-
-  private var MinKnockInterval_ = 1000
-  private var MaxKnockInterval_ = 5000
-
-  // Maximum number of connections
-  private var MaxRxConnections_ = 4
-  private var MaxTxConnections_ = 8
-
   private var initialized = false
   private var nextShuffleId = new AtomicLong(0)
@@ -372,20 +362,6 @@ object ManualBlockedLocalFileShuffle extends Logging {
 
   private def initializeIfNeeded() = synchronized {
     if (!initialized) {
-      // Load config parameters
-      BlockSize_ = System.getProperty(
-        "spark.shuffle.blockSize", "1024").toInt * 1024
-
-      MinKnockInterval_ = System.getProperty(
-        "spark.shuffle.minKnockInterval", "1000").toInt
-      MaxKnockInterval_ = System.getProperty(
-        "spark.shuffle.maxKnockInterval", "5000").toInt
-
-      MaxRxConnections_ = System.getProperty(
-        "spark.shuffle.maxRxConnections", "4").toInt
-      MaxTxConnections_ = System.getProperty(
-        "spark.shuffle.maxTxConnections", "8").toInt
-
       // TODO: localDir should be created by some mechanism common to Spark
       // so that it can be shared among shuffle, broadcast, etc
       val localDirRoot = System.getProperty("spark.local.dir", "/tmp")
@@ -437,14 +413,6 @@ object ManualBlockedLocalFileShuffle extends Logging {
     }
   }
 
-  def BlockSize = BlockSize_
-
-  def MinKnockInterval = MinKnockInterval_
-  def MaxKnockInterval = MaxKnockInterval_
-
-  def MaxRxConnections = MaxRxConnections_
-  def MaxTxConnections = MaxTxConnections_
-
   def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int,
     blockId: Int): File = {
     initializeIfNeeded()
diff --git a/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala b/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala
index 936e8bee48fa7a2bf86176239377e0c12640bfd7..f84d2e3ae31d8b8b9a7a01c4bb7217560257f196 100644
--- a/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala
+++ b/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala
@@ -195,7 +195,7 @@ extends Shuffle[K, V, C] with Logging {
     // Don't return until consumption is finished
     // TODO: Replace with a lock later.
     while (receivedData.size > 0) {
-      Thread.sleep(CustomBlockedLocalFileShuffle.MinKnockInterval)
+      Thread.sleep(Shuffle.MinKnockInterval)
     }
 
     combiners
diff --git a/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala b/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala
index 54b4e8e13006302276e469cc843fda611ac947b7..1a58155a85847419e6fba243183e61a5fdddc86b 100644
--- a/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala
+++ b/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala
@@ -142,7 +142,7 @@ extends Shuffle[K, V, C] with Logging {
     // Don't return until consumption is finished
     // TODO: Replace with a lock later.
     while (receivedData.size > 0) {
-      Thread.sleep(CustomBlockedLocalFileShuffle.MinKnockInterval)
+      Thread.sleep(Shuffle.MinKnockInterval)
     }
 
     combiners
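
Note: every call site above now resolves to a single shared Shuffle companion object that is not itself part of this patch. The following is a minimal sketch of what that object presumably provides, reassembled from the per-implementation config blocks deleted above. The system property names and defaults are taken verbatim from the removed lines; the object's structure, the eager val initialization, and the body of the thread-pool helper are assumptions for illustration only.

import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}

object Shuffle {
  // Block size at which writers roll over to a new block (file or byte array).
  // As in the removed code, "spark.shuffle.blockSize" is given in kilobytes.
  val BlockSize: Int =
    System.getProperty("spark.shuffle.blockSize", "1024").toInt * 1024

  // Small and large waits/timeouts used throughout the shuffle code
  val MinKnockInterval: Int =
    System.getProperty("spark.shuffle.minKnockInterval", "1000").toInt
  val MaxKnockInterval: Int =
    System.getProperty("spark.shuffle.maxKnockInterval", "5000").toInt

  // Maximum number of receiving (Rx) and transmitting (Tx) connections
  val MaxRxConnections: Int =
    System.getProperty("spark.shuffle.maxRxConnections", "4").toInt
  val MaxTxConnections: Int =
    System.getProperty("spark.shuffle.maxTxConnections", "8").toInt

  // Fixed-size pool of daemon threads, so pending shuffle transfers never
  // keep the JVM alive on shutdown. Hypothetical body: each deleted object
  // carried its own copy of a helper with this name.
  def newDaemonFixedThreadPool(numThreads: Int): ThreadPoolExecutor = {
    val pool =
      Executors.newFixedThreadPool(numThreads).asInstanceOf[ThreadPoolExecutor]
    // Install the daemon factory before any task is submitted, since a fixed
    // pool creates its worker threads lazily
    pool.setThreadFactory(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r)
        t.setDaemon(true)
        t
      }
    })
    pool
  }
}

Beyond removing the duplicated getters, centralizing these values also fixes the stray cross-class references the old code had accumulated, such as CustomParallelInMemoryShuffle and HttpParallelLocalFileShuffle sleeping on CustomBlockedLocalFileShuffle.MinKnockInterval: every implementation now reads the same configuration.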