From f47fb44479da45a00cad57031163b42e845685cf Mon Sep 17 00:00:00 2001 From: Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> Date: Tue, 21 Dec 2010 17:34:51 -0800 Subject: [PATCH] - Divided maxConnections to max[Rx|Tx]Connections. - Fixed config param loading bug in CustomParallelLFS --- .../spark/CustomBlockedLocalFileShuffle.scala | 14 +++++------ .../CustomParallelLocalFileShuffle.scala | 24 +++++++++++-------- .../spark/HttpBlockedLocalFileShuffle.scala | 14 +++++------ .../spark/HttpParallelLocalFileShuffle.scala | 14 +++++------ 4 files changed, 35 insertions(+), 31 deletions(-) diff --git a/src/scala/spark/CustomBlockedLocalFileShuffle.scala b/src/scala/spark/CustomBlockedLocalFileShuffle.scala index f3d20c9f7e..319a9d360c 100644 --- a/src/scala/spark/CustomBlockedLocalFileShuffle.scala +++ b/src/scala/spark/CustomBlockedLocalFileShuffle.scala @@ -11,7 +11,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} /** * An implementation of shuffle using local files served through HTTP where * receivers create simultaneous connections to multiple servers by setting the - * 'spark.blockedLocalFileShuffle.maxConnections' config option. + * 'spark.blockedLocalFileShuffle.maxRxConnections' config option. * * By controlling the 'spark.blockedLocalFileShuffle.blockSize' config option * one can also control the largest block size to divide each map output into. @@ -138,11 +138,11 @@ extends Shuffle[K, V, C] with Logging { combiners = new HashMap[K, C] var threadPool = CustomBlockedLocalFileShuffle.newDaemonFixedThreadPool( - CustomBlockedLocalFileShuffle.MaxConnections) + CustomBlockedLocalFileShuffle.MaxRxConnections) while (hasSplits < totalSplits) { var numThreadsToCreate = - Math.min(totalSplits, CustomBlockedLocalFileShuffle.MaxConnections) - + Math.min(totalSplits, CustomBlockedLocalFileShuffle.MaxRxConnections) - threadPool.getActiveCount while (hasSplits < totalSplits && numThreadsToCreate > 0) { @@ -282,7 +282,7 @@ object CustomBlockedLocalFileShuffle extends Logging { private var MaxKnockInterval_ = 5000 // Maximum number of connections - private var MaxConnections_ = 4 + private var MaxRxConnections_ = 4 private var initialized = false private var nextShuffleId = new AtomicLong(0) @@ -306,8 +306,8 @@ object CustomBlockedLocalFileShuffle extends Logging { MaxKnockInterval_ = System.getProperty( "spark.blockedLocalFileShuffle.maxKnockInterval", "5000").toInt - MaxConnections_ = System.getProperty( - "spark.blockedLocalFileShuffle.maxConnections", "4").toInt + MaxRxConnections_ = System.getProperty( + "spark.blockedLocalFileShuffle.maxRxConnections", "4").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc @@ -365,7 +365,7 @@ object CustomBlockedLocalFileShuffle extends Logging { def MinKnockInterval = MinKnockInterval_ def MaxKnockInterval = MaxKnockInterval_ - def MaxConnections = MaxConnections_ + def MaxRxConnections = MaxRxConnections_ def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int, blockId: Int): File = { diff --git a/src/scala/spark/CustomParallelLocalFileShuffle.scala b/src/scala/spark/CustomParallelLocalFileShuffle.scala index a98b14fa5e..1b5e4ef929 100644 --- a/src/scala/spark/CustomParallelLocalFileShuffle.scala +++ b/src/scala/spark/CustomParallelLocalFileShuffle.scala @@ -11,7 +11,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} /** * An implementation of shuffle using local files served through custom server * where receivers create simultaneous connections to multiple servers by - * setting the 'spark.parallelLocalFileShuffle.maxConnections' config option. + * setting the 'spark.parallelLocalFileShuffle.maxRxConnections' config option. * * TODO: Add support for compression when spark.compress is set to true. */ @@ -90,11 +90,11 @@ extends Shuffle[K, V, C] with Logging { combiners = new HashMap[K, C] var threadPool = CustomParallelLocalFileShuffle.newDaemonFixedThreadPool( - CustomParallelLocalFileShuffle.MaxConnections) + CustomParallelLocalFileShuffle.MaxRxConnections) while (hasSplits < totalSplits) { var numThreadsToCreate = Math.min(totalSplits, - CustomParallelLocalFileShuffle.MaxConnections) - + CustomParallelLocalFileShuffle.MaxRxConnections) - threadPool.getActiveCount while (hasSplits < totalSplits && numThreadsToCreate > 0) { @@ -267,7 +267,8 @@ object CustomParallelLocalFileShuffle extends Logging { private var MaxKnockInterval_ = 5000 // Maximum number of connections - private var MaxConnections_ = 4 + private var MaxRxConnections_ = 4 + private var MaxTxConnections_ = 8 private var initialized = false private var nextShuffleId = new AtomicLong(0) @@ -286,12 +287,14 @@ object CustomParallelLocalFileShuffle extends Logging { if (!initialized) { // Load config parameters MinKnockInterval_ = System.getProperty( - "spark.parallelLocalFileShuffle.MinKnockInterval", "1000").toInt + "spark.parallelLocalFileShuffle.minKnockInterval", "1000").toInt MaxKnockInterval_ = System.getProperty( - "spark.parallelLocalFileShuffle.MaxKnockInterval", "5000").toInt + "spark.parallelLocalFileShuffle.maxKnockInterval", "5000").toInt - MaxConnections_ = System.getProperty( - "spark.parallelLocalFileShuffle.MaxConnections", "4").toInt + MaxRxConnections_ = System.getProperty( + "spark.parallelLocalFileShuffle.maxRxConnections", "4").toInt + MaxTxConnections_ = System.getProperty( + "spark.parallelLocalFileShuffle.maxTxConnections", "8").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc @@ -336,7 +339,8 @@ object CustomParallelLocalFileShuffle extends Logging { def MinKnockInterval = MinKnockInterval_ def MaxKnockInterval = MaxKnockInterval_ - def MaxConnections = MaxConnections_ + def MaxRxConnections = MaxRxConnections_ + def MaxTxConnections = MaxTxConnections_ def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = { initializeIfNeeded() @@ -374,7 +378,7 @@ object CustomParallelLocalFileShuffle extends Logging { class ShuffleServer extends Thread with Logging { var threadPool = - newDaemonFixedThreadPool(CustomParallelLocalFileShuffle.MaxConnections) + newDaemonFixedThreadPool(CustomParallelLocalFileShuffle.MaxTxConnections) var serverSocket: ServerSocket = null diff --git a/src/scala/spark/HttpBlockedLocalFileShuffle.scala b/src/scala/spark/HttpBlockedLocalFileShuffle.scala index be26dabc5f..cc927b6e9e 100644 --- a/src/scala/spark/HttpBlockedLocalFileShuffle.scala +++ b/src/scala/spark/HttpBlockedLocalFileShuffle.scala @@ -11,7 +11,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} /** * An implementation of shuffle using local files served through HTTP where * receivers create simultaneous connections to multiple servers by setting the - * 'spark.blockedLocalFileShuffle.maxConnections' config option. + * 'spark.blockedLocalFileShuffle.maxRxConnections' config option. * * By controlling the 'spark.blockedLocalFileShuffle.blockSize' config option * one can also control the largest block size to retrieve by each reducers. @@ -132,11 +132,11 @@ extends Shuffle[K, V, C] with Logging { combiners = new HashMap[K, C] var threadPool = HttpBlockedLocalFileShuffle.newDaemonFixedThreadPool( - HttpBlockedLocalFileShuffle.MaxConnections) + HttpBlockedLocalFileShuffle.MaxRxConnections) while (hasSplits < totalSplits) { var numThreadsToCreate = - Math.min(totalSplits, HttpBlockedLocalFileShuffle.MaxConnections) - + Math.min(totalSplits, HttpBlockedLocalFileShuffle.MaxRxConnections) - threadPool.getActiveCount while (hasSplits < totalSplits && numThreadsToCreate > 0) { @@ -304,7 +304,7 @@ object HttpBlockedLocalFileShuffle extends Logging { private var MaxKnockInterval_ = 5000 // Maximum number of connections - private var MaxConnections_ = 4 + private var MaxRxConnections_ = 4 private var initialized = false private var nextShuffleId = new AtomicLong(0) @@ -328,8 +328,8 @@ object HttpBlockedLocalFileShuffle extends Logging { MaxKnockInterval_ = System.getProperty( "spark.blockedLocalFileShuffle.maxKnockInterval", "5000").toInt - MaxConnections_ = System.getProperty( - "spark.blockedLocalFileShuffle.maxConnections", "4").toInt + MaxRxConnections_ = System.getProperty( + "spark.blockedLocalFileShuffle.maxRxConnections", "4").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc @@ -387,7 +387,7 @@ object HttpBlockedLocalFileShuffle extends Logging { def MinKnockInterval = MinKnockInterval_ def MaxKnockInterval = MaxKnockInterval_ - def MaxConnections = MaxConnections_ + def MaxRxConnections = MaxRxConnections_ def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = { initializeIfNeeded() diff --git a/src/scala/spark/HttpParallelLocalFileShuffle.scala b/src/scala/spark/HttpParallelLocalFileShuffle.scala index b9baeaee30..935c9717b3 100644 --- a/src/scala/spark/HttpParallelLocalFileShuffle.scala +++ b/src/scala/spark/HttpParallelLocalFileShuffle.scala @@ -11,7 +11,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} /** * An implementation of shuffle using local files served through HTTP where * receivers create simultaneous connections to multiple servers by setting the - * 'spark.parallelLocalFileShuffle.maxConnections' config option. + * 'spark.parallelLocalFileShuffle.maxRxConnections' config option. * * TODO: Add support for compression when spark.compress is set to true. */ @@ -88,11 +88,11 @@ extends Shuffle[K, V, C] with Logging { combiners = new HashMap[K, C] var threadPool = HttpParallelLocalFileShuffle.newDaemonFixedThreadPool( - HttpParallelLocalFileShuffle.MaxConnections) + HttpParallelLocalFileShuffle.MaxRxConnections) while (hasSplits < totalSplits) { var numThreadsToCreate = - Math.min(totalSplits, HttpParallelLocalFileShuffle.MaxConnections) - + Math.min(totalSplits, HttpParallelLocalFileShuffle.MaxRxConnections) - threadPool.getActiveCount while (hasSplits < totalSplits && numThreadsToCreate > 0) { @@ -213,7 +213,7 @@ object HttpParallelLocalFileShuffle extends Logging { private var MaxKnockInterval_ = 5000 // Maximum number of connections - private var MaxConnections_ = 4 + private var MaxRxConnections_ = 4 private var initialized = false private var nextShuffleId = new AtomicLong(0) @@ -234,8 +234,8 @@ object HttpParallelLocalFileShuffle extends Logging { MaxKnockInterval_ = System.getProperty( "spark.parallelLocalFileShuffle.maxKnockInterval", "5000").toInt - MaxConnections_ = System.getProperty( - "spark.parallelLocalFileShuffle.maxConnections", "4").toInt + MaxRxConnections_ = System.getProperty( + "spark.parallelLocalFileShuffle.maxRxConnections", "4").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc @@ -291,7 +291,7 @@ object HttpParallelLocalFileShuffle extends Logging { def MinKnockInterval = MinKnockInterval_ def MaxKnockInterval = MaxKnockInterval_ - def MaxConnections = MaxConnections_ + def MaxRxConnections = MaxRxConnections_ def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = { initializeIfNeeded() -- GitLab