Skip to content
Snippets Groups Projects
Commit f47fb444 authored by Mosharaf Chowdhury's avatar Mosharaf Chowdhury
Browse files

- Divided maxConnections into max[Rx|Tx]Connections.

 - Fixed config param loading bug in CustomParallelLocalFileShuffle
parent d92b0673
No related branches found
No related tags found
No related merge requests found
......@@ -11,7 +11,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
/**
* An implementation of shuffle using local files served through HTTP where
* receivers create simultaneous connections to multiple servers by setting the
* 'spark.blockedLocalFileShuffle.maxConnections' config option.
* 'spark.blockedLocalFileShuffle.maxRxConnections' config option.
*
* By controlling the 'spark.blockedLocalFileShuffle.blockSize' config option
* one can also control the largest block size to divide each map output into.
......@@ -138,11 +138,11 @@ extends Shuffle[K, V, C] with Logging {
combiners = new HashMap[K, C]
var threadPool = CustomBlockedLocalFileShuffle.newDaemonFixedThreadPool(
CustomBlockedLocalFileShuffle.MaxConnections)
CustomBlockedLocalFileShuffle.MaxRxConnections)
while (hasSplits < totalSplits) {
var numThreadsToCreate =
Math.min(totalSplits, CustomBlockedLocalFileShuffle.MaxConnections) -
Math.min(totalSplits, CustomBlockedLocalFileShuffle.MaxRxConnections) -
threadPool.getActiveCount
while (hasSplits < totalSplits && numThreadsToCreate > 0) {
......@@ -282,7 +282,7 @@ object CustomBlockedLocalFileShuffle extends Logging {
private var MaxKnockInterval_ = 5000
// Maximum number of connections
private var MaxConnections_ = 4
private var MaxRxConnections_ = 4
private var initialized = false
private var nextShuffleId = new AtomicLong(0)
......@@ -306,8 +306,8 @@ object CustomBlockedLocalFileShuffle extends Logging {
MaxKnockInterval_ = System.getProperty(
"spark.blockedLocalFileShuffle.maxKnockInterval", "5000").toInt
MaxConnections_ = System.getProperty(
"spark.blockedLocalFileShuffle.maxConnections", "4").toInt
MaxRxConnections_ = System.getProperty(
"spark.blockedLocalFileShuffle.maxRxConnections", "4").toInt
// TODO: localDir should be created by some mechanism common to Spark
// so that it can be shared among shuffle, broadcast, etc
......@@ -365,7 +365,7 @@ object CustomBlockedLocalFileShuffle extends Logging {
def MinKnockInterval = MinKnockInterval_
def MaxKnockInterval = MaxKnockInterval_
def MaxConnections = MaxConnections_
def MaxRxConnections = MaxRxConnections_
def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int,
blockId: Int): File = {
......
......@@ -11,7 +11,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
/**
* An implementation of shuffle using local files served through custom server
* where receivers create simultaneous connections to multiple servers by
* setting the 'spark.parallelLocalFileShuffle.maxConnections' config option.
* setting the 'spark.parallelLocalFileShuffle.maxRxConnections' config option.
*
* TODO: Add support for compression when spark.compress is set to true.
*/
......@@ -90,11 +90,11 @@ extends Shuffle[K, V, C] with Logging {
combiners = new HashMap[K, C]
var threadPool = CustomParallelLocalFileShuffle.newDaemonFixedThreadPool(
CustomParallelLocalFileShuffle.MaxConnections)
CustomParallelLocalFileShuffle.MaxRxConnections)
while (hasSplits < totalSplits) {
var numThreadsToCreate = Math.min(totalSplits,
CustomParallelLocalFileShuffle.MaxConnections) -
CustomParallelLocalFileShuffle.MaxRxConnections) -
threadPool.getActiveCount
while (hasSplits < totalSplits && numThreadsToCreate > 0) {
......@@ -267,7 +267,8 @@ object CustomParallelLocalFileShuffle extends Logging {
private var MaxKnockInterval_ = 5000
// Maximum number of connections
private var MaxConnections_ = 4
private var MaxRxConnections_ = 4
private var MaxTxConnections_ = 8
private var initialized = false
private var nextShuffleId = new AtomicLong(0)
......@@ -286,12 +287,14 @@ object CustomParallelLocalFileShuffle extends Logging {
if (!initialized) {
// Load config parameters
MinKnockInterval_ = System.getProperty(
"spark.parallelLocalFileShuffle.MinKnockInterval", "1000").toInt
"spark.parallelLocalFileShuffle.minKnockInterval", "1000").toInt
MaxKnockInterval_ = System.getProperty(
"spark.parallelLocalFileShuffle.MaxKnockInterval", "5000").toInt
"spark.parallelLocalFileShuffle.maxKnockInterval", "5000").toInt
MaxConnections_ = System.getProperty(
"spark.parallelLocalFileShuffle.MaxConnections", "4").toInt
MaxRxConnections_ = System.getProperty(
"spark.parallelLocalFileShuffle.maxRxConnections", "4").toInt
MaxTxConnections_ = System.getProperty(
"spark.parallelLocalFileShuffle.maxTxConnections", "8").toInt
// TODO: localDir should be created by some mechanism common to Spark
// so that it can be shared among shuffle, broadcast, etc
......@@ -336,7 +339,8 @@ object CustomParallelLocalFileShuffle extends Logging {
def MinKnockInterval = MinKnockInterval_
def MaxKnockInterval = MaxKnockInterval_
def MaxConnections = MaxConnections_
def MaxRxConnections = MaxRxConnections_
def MaxTxConnections = MaxTxConnections_
def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = {
initializeIfNeeded()
......@@ -374,7 +378,7 @@ object CustomParallelLocalFileShuffle extends Logging {
class ShuffleServer
extends Thread with Logging {
var threadPool =
newDaemonFixedThreadPool(CustomParallelLocalFileShuffle.MaxConnections)
newDaemonFixedThreadPool(CustomParallelLocalFileShuffle.MaxTxConnections)
var serverSocket: ServerSocket = null
......
......@@ -11,7 +11,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
/**
* An implementation of shuffle using local files served through HTTP where
* receivers create simultaneous connections to multiple servers by setting the
* 'spark.blockedLocalFileShuffle.maxConnections' config option.
* 'spark.blockedLocalFileShuffle.maxRxConnections' config option.
*
* By controlling the 'spark.blockedLocalFileShuffle.blockSize' config option
* one can also control the largest block size to retrieve by each reducers.
......@@ -132,11 +132,11 @@ extends Shuffle[K, V, C] with Logging {
combiners = new HashMap[K, C]
var threadPool = HttpBlockedLocalFileShuffle.newDaemonFixedThreadPool(
HttpBlockedLocalFileShuffle.MaxConnections)
HttpBlockedLocalFileShuffle.MaxRxConnections)
while (hasSplits < totalSplits) {
var numThreadsToCreate =
Math.min(totalSplits, HttpBlockedLocalFileShuffle.MaxConnections) -
Math.min(totalSplits, HttpBlockedLocalFileShuffle.MaxRxConnections) -
threadPool.getActiveCount
while (hasSplits < totalSplits && numThreadsToCreate > 0) {
......@@ -304,7 +304,7 @@ object HttpBlockedLocalFileShuffle extends Logging {
private var MaxKnockInterval_ = 5000
// Maximum number of connections
private var MaxConnections_ = 4
private var MaxRxConnections_ = 4
private var initialized = false
private var nextShuffleId = new AtomicLong(0)
......@@ -328,8 +328,8 @@ object HttpBlockedLocalFileShuffle extends Logging {
MaxKnockInterval_ = System.getProperty(
"spark.blockedLocalFileShuffle.maxKnockInterval", "5000").toInt
MaxConnections_ = System.getProperty(
"spark.blockedLocalFileShuffle.maxConnections", "4").toInt
MaxRxConnections_ = System.getProperty(
"spark.blockedLocalFileShuffle.maxRxConnections", "4").toInt
// TODO: localDir should be created by some mechanism common to Spark
// so that it can be shared among shuffle, broadcast, etc
......@@ -387,7 +387,7 @@ object HttpBlockedLocalFileShuffle extends Logging {
def MinKnockInterval = MinKnockInterval_
def MaxKnockInterval = MaxKnockInterval_
def MaxConnections = MaxConnections_
def MaxRxConnections = MaxRxConnections_
def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = {
initializeIfNeeded()
......
......@@ -11,7 +11,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
/**
* An implementation of shuffle using local files served through HTTP where
* receivers create simultaneous connections to multiple servers by setting the
* 'spark.parallelLocalFileShuffle.maxConnections' config option.
* 'spark.parallelLocalFileShuffle.maxRxConnections' config option.
*
* TODO: Add support for compression when spark.compress is set to true.
*/
......@@ -88,11 +88,11 @@ extends Shuffle[K, V, C] with Logging {
combiners = new HashMap[K, C]
var threadPool = HttpParallelLocalFileShuffle.newDaemonFixedThreadPool(
HttpParallelLocalFileShuffle.MaxConnections)
HttpParallelLocalFileShuffle.MaxRxConnections)
while (hasSplits < totalSplits) {
var numThreadsToCreate =
Math.min(totalSplits, HttpParallelLocalFileShuffle.MaxConnections) -
Math.min(totalSplits, HttpParallelLocalFileShuffle.MaxRxConnections) -
threadPool.getActiveCount
while (hasSplits < totalSplits && numThreadsToCreate > 0) {
......@@ -213,7 +213,7 @@ object HttpParallelLocalFileShuffle extends Logging {
private var MaxKnockInterval_ = 5000
// Maximum number of connections
private var MaxConnections_ = 4
private var MaxRxConnections_ = 4
private var initialized = false
private var nextShuffleId = new AtomicLong(0)
......@@ -234,8 +234,8 @@ object HttpParallelLocalFileShuffle extends Logging {
MaxKnockInterval_ = System.getProperty(
"spark.parallelLocalFileShuffle.maxKnockInterval", "5000").toInt
MaxConnections_ = System.getProperty(
"spark.parallelLocalFileShuffle.maxConnections", "4").toInt
MaxRxConnections_ = System.getProperty(
"spark.parallelLocalFileShuffle.maxRxConnections", "4").toInt
// TODO: localDir should be created by some mechanism common to Spark
// so that it can be shared among shuffle, broadcast, etc
......@@ -291,7 +291,7 @@ object HttpParallelLocalFileShuffle extends Logging {
def MinKnockInterval = MinKnockInterval_
def MaxKnockInterval = MaxKnockInterval_
def MaxConnections = MaxConnections_
def MaxRxConnections = MaxRxConnections_
def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = {
initializeIfNeeded()
......
0% or Loading failed.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment