diff --git a/src/scala/spark/CustomBlockedLocalFileShuffle.scala b/src/scala/spark/CustomBlockedLocalFileShuffle.scala index f3d20c9f7e781d52971a594556b66388a2de31bf..319a9d360cf36e75e2539fdf8fd2b95beba65796 100644 --- a/src/scala/spark/CustomBlockedLocalFileShuffle.scala +++ b/src/scala/spark/CustomBlockedLocalFileShuffle.scala @@ -11,7 +11,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} /** * An implementation of shuffle using local files served through HTTP where * receivers create simultaneous connections to multiple servers by setting the - * 'spark.blockedLocalFileShuffle.maxConnections' config option. + * 'spark.blockedLocalFileShuffle.maxRxConnections' config option. * * By controlling the 'spark.blockedLocalFileShuffle.blockSize' config option * one can also control the largest block size to divide each map output into. @@ -138,11 +138,11 @@ extends Shuffle[K, V, C] with Logging { combiners = new HashMap[K, C] var threadPool = CustomBlockedLocalFileShuffle.newDaemonFixedThreadPool( - CustomBlockedLocalFileShuffle.MaxConnections) + CustomBlockedLocalFileShuffle.MaxRxConnections) while (hasSplits < totalSplits) { var numThreadsToCreate = - Math.min(totalSplits, CustomBlockedLocalFileShuffle.MaxConnections) - + Math.min(totalSplits, CustomBlockedLocalFileShuffle.MaxRxConnections) - threadPool.getActiveCount while (hasSplits < totalSplits && numThreadsToCreate > 0) { @@ -282,7 +282,7 @@ object CustomBlockedLocalFileShuffle extends Logging { private var MaxKnockInterval_ = 5000 // Maximum number of connections - private var MaxConnections_ = 4 + private var MaxRxConnections_ = 4 private var initialized = false private var nextShuffleId = new AtomicLong(0) @@ -306,8 +306,8 @@ object CustomBlockedLocalFileShuffle extends Logging { MaxKnockInterval_ = System.getProperty( "spark.blockedLocalFileShuffle.maxKnockInterval", "5000").toInt - MaxConnections_ = System.getProperty( - "spark.blockedLocalFileShuffle.maxConnections", "4").toInt + MaxRxConnections_ = System.getProperty( + "spark.blockedLocalFileShuffle.maxRxConnections", "4").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc @@ -365,7 +365,7 @@ object CustomBlockedLocalFileShuffle extends Logging { def MinKnockInterval = MinKnockInterval_ def MaxKnockInterval = MaxKnockInterval_ - def MaxConnections = MaxConnections_ + def MaxRxConnections = MaxRxConnections_ def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int, blockId: Int): File = { diff --git a/src/scala/spark/CustomParallelLocalFileShuffle.scala b/src/scala/spark/CustomParallelLocalFileShuffle.scala index a98b14fa5e51afabb99f215d5032cc844c5e871b..1b5e4ef9292dfcac2031fd8056e8774f752f2c44 100644 --- a/src/scala/spark/CustomParallelLocalFileShuffle.scala +++ b/src/scala/spark/CustomParallelLocalFileShuffle.scala @@ -11,7 +11,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} /** * An implementation of shuffle using local files served through custom server * where receivers create simultaneous connections to multiple servers by - * setting the 'spark.parallelLocalFileShuffle.maxConnections' config option. + * setting the 'spark.parallelLocalFileShuffle.maxRxConnections' config option. * * TODO: Add support for compression when spark.compress is set to true. */ @@ -90,11 +90,11 @@ extends Shuffle[K, V, C] with Logging { combiners = new HashMap[K, C] var threadPool = CustomParallelLocalFileShuffle.newDaemonFixedThreadPool( - CustomParallelLocalFileShuffle.MaxConnections) + CustomParallelLocalFileShuffle.MaxRxConnections) while (hasSplits < totalSplits) { var numThreadsToCreate = Math.min(totalSplits, - CustomParallelLocalFileShuffle.MaxConnections) - + CustomParallelLocalFileShuffle.MaxRxConnections) - threadPool.getActiveCount while (hasSplits < totalSplits && numThreadsToCreate > 0) { @@ -267,7 +267,8 @@ object CustomParallelLocalFileShuffle extends Logging { private var MaxKnockInterval_ = 5000 // Maximum number of connections - private var MaxConnections_ = 4 + private var MaxRxConnections_ = 4 + private var MaxTxConnections_ = 8 private var initialized = false private var nextShuffleId = new AtomicLong(0) @@ -286,12 +287,14 @@ object CustomParallelLocalFileShuffle extends Logging { if (!initialized) { // Load config parameters MinKnockInterval_ = System.getProperty( - "spark.parallelLocalFileShuffle.MinKnockInterval", "1000").toInt + "spark.parallelLocalFileShuffle.minKnockInterval", "1000").toInt MaxKnockInterval_ = System.getProperty( - "spark.parallelLocalFileShuffle.MaxKnockInterval", "5000").toInt + "spark.parallelLocalFileShuffle.maxKnockInterval", "5000").toInt - MaxConnections_ = System.getProperty( - "spark.parallelLocalFileShuffle.MaxConnections", "4").toInt + MaxRxConnections_ = System.getProperty( + "spark.parallelLocalFileShuffle.maxRxConnections", "4").toInt + MaxTxConnections_ = System.getProperty( + "spark.parallelLocalFileShuffle.maxTxConnections", "8").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc @@ -336,7 +339,8 @@ object CustomParallelLocalFileShuffle extends Logging { def MinKnockInterval = MinKnockInterval_ def MaxKnockInterval = MaxKnockInterval_ - def MaxConnections = MaxConnections_ + def MaxRxConnections = MaxRxConnections_ + def MaxTxConnections = MaxTxConnections_ def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = { initializeIfNeeded() @@ -374,7 +378,7 @@ object CustomParallelLocalFileShuffle extends Logging { class ShuffleServer extends Thread with Logging { var threadPool = - newDaemonFixedThreadPool(CustomParallelLocalFileShuffle.MaxConnections) + newDaemonFixedThreadPool(CustomParallelLocalFileShuffle.MaxTxConnections) var serverSocket: ServerSocket = null diff --git a/src/scala/spark/HttpBlockedLocalFileShuffle.scala b/src/scala/spark/HttpBlockedLocalFileShuffle.scala index be26dabc5f1fdbbad697c96a3ae630b19e9d38f1..cc927b6e9e203d8ea42feb5c48e3750ff56b76b5 100644 --- a/src/scala/spark/HttpBlockedLocalFileShuffle.scala +++ b/src/scala/spark/HttpBlockedLocalFileShuffle.scala @@ -11,7 +11,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} /** * An implementation of shuffle using local files served through HTTP where * receivers create simultaneous connections to multiple servers by setting the - * 'spark.blockedLocalFileShuffle.maxConnections' config option. + * 'spark.blockedLocalFileShuffle.maxRxConnections' config option. * * By controlling the 'spark.blockedLocalFileShuffle.blockSize' config option * one can also control the largest block size to retrieve by each reducers. @@ -132,11 +132,11 @@ extends Shuffle[K, V, C] with Logging { combiners = new HashMap[K, C] var threadPool = HttpBlockedLocalFileShuffle.newDaemonFixedThreadPool( - HttpBlockedLocalFileShuffle.MaxConnections) + HttpBlockedLocalFileShuffle.MaxRxConnections) while (hasSplits < totalSplits) { var numThreadsToCreate = - Math.min(totalSplits, HttpBlockedLocalFileShuffle.MaxConnections) - + Math.min(totalSplits, HttpBlockedLocalFileShuffle.MaxRxConnections) - threadPool.getActiveCount while (hasSplits < totalSplits && numThreadsToCreate > 0) { @@ -304,7 +304,7 @@ object HttpBlockedLocalFileShuffle extends Logging { private var MaxKnockInterval_ = 5000 // Maximum number of connections - private var MaxConnections_ = 4 + private var MaxRxConnections_ = 4 private var initialized = false private var nextShuffleId = new AtomicLong(0) @@ -328,8 +328,8 @@ object HttpBlockedLocalFileShuffle extends Logging { MaxKnockInterval_ = System.getProperty( "spark.blockedLocalFileShuffle.maxKnockInterval", "5000").toInt - MaxConnections_ = System.getProperty( - "spark.blockedLocalFileShuffle.maxConnections", "4").toInt + MaxRxConnections_ = System.getProperty( + "spark.blockedLocalFileShuffle.maxRxConnections", "4").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc @@ -387,7 +387,7 @@ object HttpBlockedLocalFileShuffle extends Logging { def MinKnockInterval = MinKnockInterval_ def MaxKnockInterval = MaxKnockInterval_ - def MaxConnections = MaxConnections_ + def MaxRxConnections = MaxRxConnections_ def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = { initializeIfNeeded() diff --git a/src/scala/spark/HttpParallelLocalFileShuffle.scala b/src/scala/spark/HttpParallelLocalFileShuffle.scala index b9baeaee30a2ed3386017d0b9c713a4e50ad85fa..935c9717b312a6799dcabc579f0fa7e88bb51478 100644 --- a/src/scala/spark/HttpParallelLocalFileShuffle.scala +++ b/src/scala/spark/HttpParallelLocalFileShuffle.scala @@ -11,7 +11,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} /** * An implementation of shuffle using local files served through HTTP where * receivers create simultaneous connections to multiple servers by setting the - * 'spark.parallelLocalFileShuffle.maxConnections' config option. + * 'spark.parallelLocalFileShuffle.maxRxConnections' config option. * * TODO: Add support for compression when spark.compress is set to true. */ @@ -88,11 +88,11 @@ extends Shuffle[K, V, C] with Logging { combiners = new HashMap[K, C] var threadPool = HttpParallelLocalFileShuffle.newDaemonFixedThreadPool( - HttpParallelLocalFileShuffle.MaxConnections) + HttpParallelLocalFileShuffle.MaxRxConnections) while (hasSplits < totalSplits) { var numThreadsToCreate = - Math.min(totalSplits, HttpParallelLocalFileShuffle.MaxConnections) - + Math.min(totalSplits, HttpParallelLocalFileShuffle.MaxRxConnections) - threadPool.getActiveCount while (hasSplits < totalSplits && numThreadsToCreate > 0) { @@ -213,7 +213,7 @@ object HttpParallelLocalFileShuffle extends Logging { private var MaxKnockInterval_ = 5000 // Maximum number of connections - private var MaxConnections_ = 4 + private var MaxRxConnections_ = 4 private var initialized = false private var nextShuffleId = new AtomicLong(0) @@ -234,8 +234,8 @@ object HttpParallelLocalFileShuffle extends Logging { MaxKnockInterval_ = System.getProperty( "spark.parallelLocalFileShuffle.maxKnockInterval", "5000").toInt - MaxConnections_ = System.getProperty( - "spark.parallelLocalFileShuffle.maxConnections", "4").toInt + MaxRxConnections_ = System.getProperty( + "spark.parallelLocalFileShuffle.maxRxConnections", "4").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc @@ -291,7 +291,7 @@ object HttpParallelLocalFileShuffle extends Logging { def MinKnockInterval = MinKnockInterval_ def MaxKnockInterval = MaxKnockInterval_ - def MaxConnections = MaxConnections_ + def MaxRxConnections = MaxRxConnections_ def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = { initializeIfNeeded()