Commit f8599410 authored by Mosharaf Chowdhury

Merge branch 'mos-shuffle-parallel' into mos-shuffle-tracked

Conflicts:
	conf/java-opts
parents 90e46720 6c8e9cb2
--- a/conf/java-opts
+++ b/conf/java-opts
--Dspark.shuffle.class=spark.TrackedCustomParallelLocalFileShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.blockedLocalFileShuffle.maxRxConnections=2 -Dspark.blockedLocalFileShuffle.maxTxConnections=2 -Dspark.blockedLocalFileShuffle.blockSize=256 -Dspark.blockedLocalFileShuffle.minKnockInterval=50 -Dspark.blockedInMemoryShuffle.maxRxConnections=2 -Dspark.blockedInMemoryShuffle.maxTxConnections=2 -Dspark.blockedInMemoryShuffle.minKnockInterval=50 -Dspark.blockedInMemoryShuffle.maxKnockInterval=2000 -Dspark.blockedInMemoryShuffle.blockSize=256 -Dspark.parallelLocalFileShuffle.maxRxConnections=2 -Dspark.parallelLocalFileShuffle.maxTxConnections=2 -Dspark.parallelLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxKnockInterval=2000 -Dspark.parallelInMemoryShuffle.maxRxConnections=2 -Dspark.parallelInMemoryShuffle.maxTxConnections=2 -Dspark.parallelInMemoryShuffle.minKnockInterval=50 -Dspark.parallelInMemoryShuffle.maxKnockInterval=2000
+-Dspark.shuffle.class=spark.TrackedCustomParallelLocalFileShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=256 -Dspark.shuffle.minKnockInterval=50 -Dspark.shuffle.maxKnockInterval=2000
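These `-D` flags are ordinary JVM system properties. As a minimal sketch of how a shuffle implementation consumes the unified keys (the `ShuffleConf` object name and the defaults here are illustrative, not code from this commit):

```scala
// Illustrative sketch: the unified shuffle settings are plain JVM system
// properties, read with System.getProperty plus a string default.
// Object name and default values are for illustration only.
object ShuffleConf {
  def maxRxConnections: Int =
    System.getProperty("spark.shuffle.maxRxConnections", "4").toInt

  def maxTxConnections: Int =
    System.getProperty("spark.shuffle.maxTxConnections", "8").toInt

  // blockSize is specified in kilobytes and converted to bytes.
  def blockSizeBytes: Int =
    System.getProperty("spark.shuffle.blockSize", "1024").toInt * 1024
}
```

Launching the JVM with `-Dspark.shuffle.maxRxConnections=2`, as in the java-opts line above, would make `maxRxConnections` return 2; unset keys fall back to their defaults.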
@@ -13,15 +13,15 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
  *
  * An implementation of shuffle using local memory served through custom server
  * where receivers create simultaneous connections to multiple servers by
- * setting the 'spark.blockedInMemoryShuffle.maxRxConnections' config option.
+ * setting the 'spark.shuffle.maxRxConnections' config option.
  *
- * By controlling the 'spark.blockedInMemoryShuffle.blockSize' config option
- * one can also control the largest block size to divide each map output into.
- * Essentially, instead of creating one large output file for each reducer, maps
- * create multiple smaller files to enable finer level of engagement.
+ * By controlling the 'spark.shuffle.blockSize' config option one can also
+ * control the largest block size to divide each map output into. Essentially,
+ * instead of creating one large output file for each reducer, maps create
+ * multiple smaller files to enable finer level of engagement.
  *
- * 'spark.parallelLocalFileShuffle.maxTxConnections' enforces server-side cap.
- * Ideally maxTxConnections >= maxRxConnections * numReducersPerMachine
+ * 'spark.shuffle.maxTxConnections' enforces server-side cap. Ideally,
+ * maxTxConnections >= maxRxConnections * numReducersPerMachine
  *
  * TODO: Add support for compression when spark.compress is set to true.
  */
@@ -427,17 +427,17 @@ object CustomBlockedInMemoryShuffle extends Logging {
     if (!initialized) {
       // Load config parameters
       BlockSize_ = System.getProperty(
-        "spark.blockedInMemoryShuffle.blockSize", "1024").toInt * 1024
+        "spark.shuffle.blockSize", "1024").toInt * 1024
       MinKnockInterval_ = System.getProperty(
-        "spark.blockedInMemoryShuffle.minKnockInterval", "1000").toInt
+        "spark.shuffle.minKnockInterval", "1000").toInt
       MaxKnockInterval_ = System.getProperty(
-        "spark.blockedInMemoryShuffle.maxKnockInterval", "5000").toInt
+        "spark.shuffle.maxKnockInterval", "5000").toInt
       MaxRxConnections_ = System.getProperty(
-        "spark.blockedInMemoryShuffle.maxRxConnections", "4").toInt
+        "spark.shuffle.maxRxConnections", "4").toInt
       MaxTxConnections_ = System.getProperty(
-        "spark.blockedInMemoryShuffle.maxTxConnections", "8").toInt
+        "spark.shuffle.maxTxConnections", "8").toInt
       // TODO: localDir should be created by some mechanism common to Spark
       // so that it can be shared among shuffle, broadcast, etc
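As the doc comment above describes, `spark.shuffle.blockSize` caps the size of each block a map output is divided into. A hedged sketch of the resulting block count (the helper name is hypothetical, not from this patch):

```scala
// Hypothetical helper: number of blocks one map output is split into when
// each block is capped at blockSizeBytes. Mirrors the KB-to-bytes conversion
// used when the property is loaded ("1024" KB default -> 1048576 bytes).
def numBlocks(mapOutputBytes: Long, blockSizeBytes: Int): Int =
  if (mapOutputBytes == 0) 0
  else math.ceil(mapOutputBytes.toDouble / blockSizeBytes).toInt
```

With the 1024 KB default, a 5 MB map output would be served as 5 blocks rather than one large file per reducer, which is the "finer level of engagement" the comment refers to.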
@@ -11,15 +11,15 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 /**
  * An implementation of shuffle using local files served through custom server
  * where receivers create simultaneous connections to multiple servers by
- * setting the 'spark.blockedLocalFileShuffle.maxRxConnections' config option.
+ * setting the 'spark.shuffle.maxRxConnections' config option.
  *
- * By controlling the 'spark.blockedLocalFileShuffle.blockSize' config option
- * one can also control the largest block size to divide each map output into.
- * Essentially, instead of creating one large output file for each reducer, maps
- * create multiple smaller files to enable finer level of engagement.
+ * By controlling the 'spark.shuffle.blockSize' config option one can also
+ * control the largest block size to divide each map output into. Essentially,
+ * instead of creating one large output file for each reducer, maps create
+ * multiple smaller files to enable finer level of engagement.
  *
- * 'spark.parallelLocalFileShuffle.maxTxConnections' enforces server-side cap.
- * Ideally maxTxConnections >= maxRxConnections * numReducersPerMachine
+ * 'spark.shuffle.maxTxConnections' enforces server-side cap. Ideally,
+ * maxTxConnections >= maxRxConnections * numReducersPerMachine
  *
  * TODO: Add support for compression when spark.compress is set to true.
  */
@@ -411,17 +411,17 @@ object CustomBlockedLocalFileShuffle extends Logging {
     if (!initialized) {
       // Load config parameters
       BlockSize_ = System.getProperty(
-        "spark.blockedLocalFileShuffle.blockSize", "1024").toInt * 1024
+        "spark.shuffle.blockSize", "1024").toInt * 1024
       MinKnockInterval_ = System.getProperty(
-        "spark.blockedLocalFileShuffle.minKnockInterval", "1000").toInt
+        "spark.shuffle.minKnockInterval", "1000").toInt
       MaxKnockInterval_ = System.getProperty(
-        "spark.blockedLocalFileShuffle.maxKnockInterval", "5000").toInt
+        "spark.shuffle.maxKnockInterval", "5000").toInt
       MaxRxConnections_ = System.getProperty(
-        "spark.blockedLocalFileShuffle.maxRxConnections", "4").toInt
+        "spark.shuffle.maxRxConnections", "4").toInt
       MaxTxConnections_ = System.getProperty(
-        "spark.blockedLocalFileShuffle.maxTxConnections", "8").toInt
+        "spark.shuffle.maxTxConnections", "8").toInt
       // TODO: localDir should be created by some mechanism common to Spark
       // so that it can be shared among shuffle, broadcast, etc
@@ -13,7 +13,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
  *
  * An implementation of shuffle using local memory served through custom server
  * where receivers create simultaneous connections to multiple servers by
- * setting the 'spark.parallelInMemoryShuffle.maxRxConnections' config option.
+ * setting the 'spark.shuffle.maxRxConnections' config option.
  *
  * TODO: Add support for compression when spark.compress is set to true.
  */
@@ -356,14 +356,14 @@ object CustomParallelInMemoryShuffle extends Logging {
     if (!initialized) {
       // Load config parameters
       MinKnockInterval_ = System.getProperty(
-        "spark.parallelInMemoryShuffle.minKnockInterval", "1000").toInt
+        "spark.shuffle.minKnockInterval", "1000").toInt
       MaxKnockInterval_ = System.getProperty(
-        "spark.parallelInMemoryShuffle.maxKnockInterval", "5000").toInt
+        "spark.shuffle.maxKnockInterval", "5000").toInt
       MaxRxConnections_ = System.getProperty(
-        "spark.parallelInMemoryShuffle.maxRxConnections", "4").toInt
+        "spark.shuffle.maxRxConnections", "4").toInt
       MaxTxConnections_ = System.getProperty(
-        "spark.parallelInMemoryShuffle.maxTxConnections", "8").toInt
+        "spark.shuffle.maxTxConnections", "8").toInt
       // TODO: localDir should be created by some mechanism common to Spark
       // so that it can be shared among shuffle, broadcast, etc
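The min/max knock intervals above appear to bound how often a receiver re-polls ("knocks on") a busy server. One plausible clamped-backoff reading, purely an assumption on our part rather than code from this commit:

```scala
// Assumption, not code from this patch: grow the delay between successive
// "knocks" on a server, clamped to [minKnockMs, maxKnockMs] milliseconds.
def nextKnockDelay(currentMs: Int, minKnockMs: Int, maxKnockMs: Int): Int =
  math.min(math.max(currentMs * 2, minKnockMs), maxKnockMs)
```

Under this reading, the defaults in the patch (min 1000 ms, max 5000 ms) keep a receiver from hammering a server while capping how long it backs off.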
@@ -11,10 +11,10 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 /**
  * An implementation of shuffle using local files served through custom server
  * where receivers create simultaneous connections to multiple servers by
- * setting the 'spark.parallelLocalFileShuffle.maxRxConnections' config option.
+ * setting the 'spark.shuffle.maxRxConnections' config option.
  *
- * 'spark.parallelLocalFileShuffle.maxTxConnections' enforces server-side cap.
- * Ideally maxTxConnections >= maxRxConnections * numReducersPerMachine
+ * 'spark.shuffle.maxTxConnections' enforces server-side cap. Ideally,
+ * maxTxConnections >= maxRxConnections * numReducersPerMachine
  *
  * TODO: Add support for compression when spark.compress is set to true.
  */
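The doc comment's sizing guideline (maxTxConnections >= maxRxConnections * numReducersPerMachine) can be written as a small sanity check; the function below is a hypothetical illustration, not part of the patch:

```scala
// Hypothetical sanity check for the sizing guideline in the comment above:
// a server's transmit cap should admit every connection that one machine's
// reducers may open concurrently, otherwise receivers wait for free slots.
def txCapSufficient(maxTx: Int, maxRx: Int, reducersPerMachine: Int): Boolean =
  maxTx >= maxRx * reducersPerMachine
```

With this patch's defaults (maxRx = 4, maxTx = 8), the guideline holds only for up to two reducers per machine.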
@@ -344,14 +344,14 @@ object CustomParallelLocalFileShuffle extends Logging {
     if (!initialized) {
       // Load config parameters
       MinKnockInterval_ = System.getProperty(
-        "spark.parallelLocalFileShuffle.minKnockInterval", "1000").toInt
+        "spark.shuffle.minKnockInterval", "1000").toInt
       MaxKnockInterval_ = System.getProperty(
-        "spark.parallelLocalFileShuffle.maxKnockInterval", "5000").toInt
+        "spark.shuffle.maxKnockInterval", "5000").toInt
       MaxRxConnections_ = System.getProperty(
-        "spark.parallelLocalFileShuffle.maxRxConnections", "4").toInt
+        "spark.shuffle.maxRxConnections", "4").toInt
       MaxTxConnections_ = System.getProperty(
-        "spark.parallelLocalFileShuffle.maxTxConnections", "8").toInt
+        "spark.shuffle.maxTxConnections", "8").toInt
       // TODO: localDir should be created by some mechanism common to Spark
       // so that it can be shared among shuffle, broadcast, etc
@@ -11,12 +11,11 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 /**
  * An implementation of shuffle using local files served through HTTP where
  * receivers create simultaneous connections to multiple servers by setting the
- * 'spark.blockedLocalFileShuffle.maxRxConnections' config option.
+ * 'spark.shuffle.maxRxConnections' config option.
  *
- * By controlling the 'spark.blockedLocalFileShuffle.blockSize' config option
- * one can also control the largest block size to retrieve by each reducers.
- * An INDEX file keeps track of block boundaries instead of creating many
- * smaller files.
+ * By controlling the 'spark.shuffle.blockSize' config option one can also
+ * control the largest block size to retrieve by each reducers. An INDEX file
+ * keeps track of block boundaries instead of creating many smaller files.
  *
  * TODO: Add support for compression when spark.compress is set to true.
  */
@@ -376,17 +375,17 @@ object HttpBlockedLocalFileShuffle extends Logging {
     if (!initialized) {
       // Load config parameters
       BlockSize_ = System.getProperty(
-        "spark.blockedLocalFileShuffle.blockSize", "1024").toInt * 1024
+        "spark.shuffle.blockSize", "1024").toInt * 1024
       MinKnockInterval_ = System.getProperty(
-        "spark.blockedLocalFileShuffle.minKnockInterval", "1000").toInt
+        "spark.shuffle.minKnockInterval", "1000").toInt
       MaxKnockInterval_ = System.getProperty(
-        "spark.blockedLocalFileShuffle.maxKnockInterval", "5000").toInt
+        "spark.shuffle.maxKnockInterval", "5000").toInt
       MaxRxConnections_ = System.getProperty(
-        "spark.blockedLocalFileShuffle.maxRxConnections", "4").toInt
+        "spark.shuffle.maxRxConnections", "4").toInt
       MaxTxConnections_ = System.getProperty(
-        "spark.blockedLocalFileShuffle.maxTxConnections", "8").toInt
+        "spark.shuffle.maxTxConnections", "8").toInt
       // TODO: localDir should be created by some mechanism common to Spark
       // so that it can be shared among shuffle, broadcast, etc
@@ -11,7 +11,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 /**
  * An implementation of shuffle using local files served through HTTP where
  * receivers create simultaneous connections to multiple servers by setting the
- * 'spark.parallelLocalFileShuffle.maxRxConnections' config option.
+ * 'spark.shuffle.maxRxConnections' config option.
  *
  * TODO: Add support for compression when spark.compress is set to true.
  */
@@ -301,12 +301,12 @@ object HttpParallelLocalFileShuffle extends Logging {
     if (!initialized) {
       // Load config parameters
       MinKnockInterval_ = System.getProperty(
-        "spark.parallelLocalFileShuffle.minKnockInterval", "1000").toInt
+        "spark.shuffle.minKnockInterval", "1000").toInt
       MaxKnockInterval_ = System.getProperty(
-        "spark.parallelLocalFileShuffle.maxKnockInterval", "5000").toInt
+        "spark.shuffle.maxKnockInterval", "5000").toInt
       MaxRxConnections_ = System.getProperty(
-        "spark.parallelLocalFileShuffle.maxRxConnections", "4").toInt
+        "spark.shuffle.maxRxConnections", "4").toInt
       // TODO: localDir should be created by some mechanism common to Spark
       // so that it can be shared among shuffle, broadcast, etc
@@ -11,12 +11,12 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 /**
  * An implementation of shuffle using local files served through HTTP where
  * receivers create simultaneous connections to multiple servers by setting the
- * 'spark.blockedLocalFileShuffle.maxRxConnections' config option.
+ * 'spark.shuffle.maxRxConnections' config option.
  *
- * By controlling the 'spark.blockedLocalFileShuffle.blockSize' config option
- * one can also control the largest block size to divide each map output into.
- * Essentially, instead of creating one large output file for each reducer, maps
- * create multiple smaller files to enable finer level of engagement.
+ * By controlling the 'spark.shuffle.blockSize' config option one can also
+ * control the largest block size to divide each map output into. Essentially,
+ * instead of creating one large output file for each reducer, maps create
+ * multiple smaller files to enable finer level of engagement.
  *
  * TODO: Add support for compression when spark.compress is set to true.
  */
@@ -367,17 +367,17 @@ object ManualBlockedLocalFileShuffle extends Logging {
     if (!initialized) {
       // Load config parameters
       BlockSize_ = System.getProperty(
-        "spark.blockedLocalFileShuffle.blockSize", "1024").toInt * 1024
+        "spark.shuffle.blockSize", "1024").toInt * 1024
       MinKnockInterval_ = System.getProperty(
-        "spark.blockedLocalFileShuffle.minKnockInterval", "1000").toInt
+        "spark.shuffle.minKnockInterval", "1000").toInt
       MaxKnockInterval_ = System.getProperty(
-        "spark.blockedLocalFileShuffle.maxKnockInterval", "5000").toInt
+        "spark.shuffle.maxKnockInterval", "5000").toInt
       MaxRxConnections_ = System.getProperty(
-        "spark.blockedLocalFileShuffle.maxRxConnections", "4").toInt
+        "spark.shuffle.maxRxConnections", "4").toInt
       MaxTxConnections_ = System.getProperty(
-        "spark.blockedLocalFileShuffle.maxTxConnections", "8").toInt
+        "spark.shuffle.maxTxConnections", "8").toInt
       // TODO: localDir should be created by some mechanism common to Spark
       // so that it can be shared among shuffle, broadcast, etc
@@ -11,10 +11,10 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 /**
  * An implementation of shuffle using local files served through custom server
  * where receivers create simultaneous connections to multiple servers by
- * setting the 'spark.parallelLocalFileShuffle.maxRxConnections' config option.
+ * setting the 'spark.shuffle.maxRxConnections' config option.
  *
- * 'spark.parallelLocalFileShuffle.maxTxConnections' enforces server-side cap.
- * Ideally maxTxConnections >= maxRxConnections * numReducersPerMachine
+ * 'spark.shuffle.maxTxConnections' enforces server-side cap. Ideally,
+ * maxTxConnections >= maxRxConnections * numReducersPerMachine
  *
  * 'spark.shuffle.TrackerStrategy' decides which strategy to use
  *
@@ -649,14 +649,14 @@ object TrackedCustomParallelLocalFileShuffle extends Logging {
         System.getProperty ("spark.shuffle.masterTrackerPort", "22222").toInt
       MinKnockInterval_ = System.getProperty(
-        "spark.parallelLocalFileShuffle.minKnockInterval", "1000").toInt
+        "spark.shuffle.minKnockInterval", "1000").toInt
       MaxKnockInterval_ = System.getProperty(
-        "spark.parallelLocalFileShuffle.maxKnockInterval", "5000").toInt
+        "spark.shuffle.maxKnockInterval", "5000").toInt
       MaxRxConnections_ = System.getProperty(
-        "spark.parallelLocalFileShuffle.maxRxConnections", "4").toInt
+        "spark.shuffle.maxRxConnections", "4").toInt
       MaxTxConnections_ = System.getProperty(
-        "spark.parallelLocalFileShuffle.maxTxConnections", "8").toInt
+        "spark.shuffle.maxTxConnections", "8").toInt
       // TODO: localDir should be created by some mechanism common to Spark
       // so that it can be shared among shuffle, broadcast, etc