diff --git a/conf/java-opts b/conf/java-opts index d2fb332b923376e5f4cbe43c644e9d4daf3d4f15..29b5faa743c31b05209bfc9777290e39ff9a55ec 100644 --- a/conf/java-opts +++ b/conf/java-opts @@ -1 +1 @@ --Dspark.shuffle.class=spark.CustomBlockedInMemoryShuffle -Dspark.blockedLocalFileShuffle.maxRxConnections=2 -Dspark.blockedLocalFileShuffle.maxTxConnections=2 -Dspark.blockedLocalFileShuffle.blockSize=256 -Dspark.blockedLocalFileShuffle.minKnockInterval=50 -Dspark.blockedInMemoryShuffle.maxRxConnections=2 -Dspark.blockedInMemoryShuffle.maxTxConnections=2 -Dspark.blockedInMemoryShuffle.minKnockInterval=50 -Dspark.blockedInMemoryShuffle.maxKnockInterval=2000 -Dspark.blockedInMemoryShuffle.blockSize=256 -Dspark.parallelLocalFileShuffle.maxRxConnections=2 -Dspark.parallelLocalFileShuffle.maxTxConnections=2 -Dspark.parallelLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxKnockInterval=2000 -Dspark.parallelInMemoryShuffle.maxRxConnections=2 -Dspark.parallelInMemoryShuffle.maxTxConnections=2 -Dspark.parallelInMemoryShuffle.minKnockInterval=50 -Dspark.parallelInMemoryShuffle.maxKnockInterval=2000 +-Dspark.shuffle.class=spark.CustomBlockedInMemoryShuffle -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=256 -Dspark.shuffle.minKnockInterval=50 -Dspark.shuffle.maxKnockInterval=2000 diff --git a/src/scala/spark/CustomBlockedInMemoryShuffle.scala b/src/scala/spark/CustomBlockedInMemoryShuffle.scala index 1c5fe0f7b5a4a6520e1d284e3399d83d1f67938a..ca159e1d10b1efd099a4d1461e60d6bfaff9f2d7 100644 --- a/src/scala/spark/CustomBlockedInMemoryShuffle.scala +++ b/src/scala/spark/CustomBlockedInMemoryShuffle.scala @@ -13,15 +13,15 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} * * An implementation of shuffle using local memory served through custom server * where receivers create simultaneous connections to multiple servers by - * setting the 'spark.blockedInMemoryShuffle.maxRxConnections' config option. 
+ * setting the 'spark.shuffle.maxRxConnections' config option. * - * By controlling the 'spark.blockedInMemoryShuffle.blockSize' config option - * one can also control the largest block size to divide each map output into. - * Essentially, instead of creating one large output file for each reducer, maps - * create multiple smaller files to enable finer level of engagement. + * By controlling the 'spark.shuffle.blockSize' config option one can also + * control the largest block size to divide each map output into. Essentially, + * instead of creating one large output file for each reducer, maps create + * multiple smaller files to enable finer level of engagement. * - * 'spark.parallelLocalFileShuffle.maxTxConnections' enforces server-side cap. - * Ideally maxTxConnections >= maxRxConnections * numReducersPerMachine + * 'spark.shuffle.maxTxConnections' enforces server-side cap. Ideally, + * maxTxConnections >= maxRxConnections * numReducersPerMachine * * TODO: Add support for compression when spark.compress is set to true. 
*/ @@ -427,17 +427,17 @@ object CustomBlockedInMemoryShuffle extends Logging { if (!initialized) { // Load config parameters BlockSize_ = System.getProperty( - "spark.blockedInMemoryShuffle.blockSize", "1024").toInt * 1024 + "spark.shuffle.blockSize", "1024").toInt * 1024 MinKnockInterval_ = System.getProperty( - "spark.blockedInMemoryShuffle.minKnockInterval", "1000").toInt + "spark.shuffle.minKnockInterval", "1000").toInt MaxKnockInterval_ = System.getProperty( - "spark.blockedInMemoryShuffle.maxKnockInterval", "5000").toInt + "spark.shuffle.maxKnockInterval", "5000").toInt MaxRxConnections_ = System.getProperty( - "spark.blockedInMemoryShuffle.maxRxConnections", "4").toInt + "spark.shuffle.maxRxConnections", "4").toInt MaxTxConnections_ = System.getProperty( - "spark.blockedInMemoryShuffle.maxTxConnections", "8").toInt + "spark.shuffle.maxTxConnections", "8").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc diff --git a/src/scala/spark/CustomBlockedLocalFileShuffle.scala b/src/scala/spark/CustomBlockedLocalFileShuffle.scala index 19ce1f242a0d23000075f835e1b1c40180175923..f4d1ab9b0a6fb931f36473f890c80db0fdc97f5d 100644 --- a/src/scala/spark/CustomBlockedLocalFileShuffle.scala +++ b/src/scala/spark/CustomBlockedLocalFileShuffle.scala @@ -11,15 +11,15 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} /** * An implementation of shuffle using local files served through custom server * where receivers create simultaneous connections to multiple servers by - * setting the 'spark.blockedLocalFileShuffle.maxRxConnections' config option. + * setting the 'spark.shuffle.maxRxConnections' config option. * - * By controlling the 'spark.blockedLocalFileShuffle.blockSize' config option - * one can also control the largest block size to divide each map output into. 
- * Essentially, instead of creating one large output file for each reducer, maps - * create multiple smaller files to enable finer level of engagement. + * By controlling the 'spark.shuffle.blockSize' config option one can also + * control the largest block size to divide each map output into. Essentially, + * instead of creating one large output file for each reducer, maps create + * multiple smaller files to enable finer level of engagement. * - * 'spark.parallelLocalFileShuffle.maxTxConnections' enforces server-side cap. - * Ideally maxTxConnections >= maxRxConnections * numReducersPerMachine + * 'spark.shuffle.maxTxConnections' enforces server-side cap. Ideally, + * maxTxConnections >= maxRxConnections * numReducersPerMachine * * TODO: Add support for compression when spark.compress is set to true. */ @@ -411,17 +411,17 @@ object CustomBlockedLocalFileShuffle extends Logging { if (!initialized) { // Load config parameters BlockSize_ = System.getProperty( - "spark.blockedLocalFileShuffle.blockSize", "1024").toInt * 1024 + "spark.shuffle.blockSize", "1024").toInt * 1024 MinKnockInterval_ = System.getProperty( - "spark.blockedLocalFileShuffle.minKnockInterval", "1000").toInt + "spark.shuffle.minKnockInterval", "1000").toInt MaxKnockInterval_ = System.getProperty( - "spark.blockedLocalFileShuffle.maxKnockInterval", "5000").toInt + "spark.shuffle.maxKnockInterval", "5000").toInt MaxRxConnections_ = System.getProperty( - "spark.blockedLocalFileShuffle.maxRxConnections", "4").toInt + "spark.shuffle.maxRxConnections", "4").toInt MaxTxConnections_ = System.getProperty( - "spark.blockedLocalFileShuffle.maxTxConnections", "8").toInt + "spark.shuffle.maxTxConnections", "8").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc diff --git a/src/scala/spark/CustomParallelInMemoryShuffle.scala b/src/scala/spark/CustomParallelInMemoryShuffle.scala index 
33bad1d9f7ea55f3855671401258f653fe521682..7d4c6f1cc386840ea7c52bcfce60163fa0a8e508 100644 --- a/src/scala/spark/CustomParallelInMemoryShuffle.scala +++ b/src/scala/spark/CustomParallelInMemoryShuffle.scala @@ -13,7 +13,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} * * An implementation of shuffle using local memory served through custom server * where receivers create simultaneous connections to multiple servers by - * setting the 'spark.parallelInMemoryShuffle.maxRxConnections' config option. + * setting the 'spark.shuffle.maxRxConnections' config option. * * TODO: Add support for compression when spark.compress is set to true. */ @@ -356,14 +356,14 @@ object CustomParallelInMemoryShuffle extends Logging { if (!initialized) { // Load config parameters MinKnockInterval_ = System.getProperty( - "spark.parallelInMemoryShuffle.minKnockInterval", "1000").toInt + "spark.shuffle.minKnockInterval", "1000").toInt MaxKnockInterval_ = System.getProperty( - "spark.parallelInMemoryShuffle.maxKnockInterval", "5000").toInt + "spark.shuffle.maxKnockInterval", "5000").toInt MaxRxConnections_ = System.getProperty( - "spark.parallelInMemoryShuffle.maxRxConnections", "4").toInt + "spark.shuffle.maxRxConnections", "4").toInt MaxTxConnections_ = System.getProperty( - "spark.parallelInMemoryShuffle.maxTxConnections", "8").toInt + "spark.shuffle.maxTxConnections", "8").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc diff --git a/src/scala/spark/CustomParallelLocalFileShuffle.scala b/src/scala/spark/CustomParallelLocalFileShuffle.scala index 20669d4cce288f72b67608d20b3aeb5ea27b9ff8..d41057b0d95a3e4305e2dccbd88758478eca6018 100644 --- a/src/scala/spark/CustomParallelLocalFileShuffle.scala +++ b/src/scala/spark/CustomParallelLocalFileShuffle.scala @@ -11,10 +11,10 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} /** * An implementation of shuffle using local files served through 
custom server * where receivers create simultaneous connections to multiple servers by - * setting the 'spark.parallelLocalFileShuffle.maxRxConnections' config option. + * setting the 'spark.shuffle.maxRxConnections' config option. * - * 'spark.parallelLocalFileShuffle.maxTxConnections' enforces server-side cap. - * Ideally maxTxConnections >= maxRxConnections * numReducersPerMachine + * 'spark.shuffle.maxTxConnections' enforces server-side cap. Ideally, + * maxTxConnections >= maxRxConnections * numReducersPerMachine * * TODO: Add support for compression when spark.compress is set to true. */ @@ -344,14 +344,14 @@ object CustomParallelLocalFileShuffle extends Logging { if (!initialized) { // Load config parameters MinKnockInterval_ = System.getProperty( - "spark.parallelLocalFileShuffle.minKnockInterval", "1000").toInt + "spark.shuffle.minKnockInterval", "1000").toInt MaxKnockInterval_ = System.getProperty( - "spark.parallelLocalFileShuffle.maxKnockInterval", "5000").toInt + "spark.shuffle.maxKnockInterval", "5000").toInt MaxRxConnections_ = System.getProperty( - "spark.parallelLocalFileShuffle.maxRxConnections", "4").toInt + "spark.shuffle.maxRxConnections", "4").toInt MaxTxConnections_ = System.getProperty( - "spark.parallelLocalFileShuffle.maxTxConnections", "8").toInt + "spark.shuffle.maxTxConnections", "8").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc diff --git a/src/scala/spark/HttpBlockedLocalFileShuffle.scala b/src/scala/spark/HttpBlockedLocalFileShuffle.scala index 07efe7f6203cf6cea03a391bc7b78ce6ed8f7678..bd6f274264068669a69a4f139aee6b8336de2fd2 100644 --- a/src/scala/spark/HttpBlockedLocalFileShuffle.scala +++ b/src/scala/spark/HttpBlockedLocalFileShuffle.scala @@ -11,12 +11,11 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} /** * An implementation of shuffle using local files served through HTTP where * receivers create simultaneous connections to 
multiple servers by setting the - * 'spark.blockedLocalFileShuffle.maxRxConnections' config option. + * 'spark.shuffle.maxRxConnections' config option. * - * By controlling the 'spark.blockedLocalFileShuffle.blockSize' config option - * one can also control the largest block size to retrieve by each reducers. - * An INDEX file keeps track of block boundaries instead of creating many - * smaller files. + * By controlling the 'spark.shuffle.blockSize' config option one can also + * control the largest block size to retrieve by each reducer. An INDEX file + * keeps track of block boundaries instead of creating many smaller files. * * TODO: Add support for compression when spark.compress is set to true. */ @@ -376,17 +375,17 @@ object HttpBlockedLocalFileShuffle extends Logging { if (!initialized) { // Load config parameters BlockSize_ = System.getProperty( - "spark.blockedLocalFileShuffle.blockSize", "1024").toInt * 1024 + "spark.shuffle.blockSize", "1024").toInt * 1024 MinKnockInterval_ = System.getProperty( - "spark.blockedLocalFileShuffle.minKnockInterval", "1000").toInt + "spark.shuffle.minKnockInterval", "1000").toInt MaxKnockInterval_ = System.getProperty( - "spark.blockedLocalFileShuffle.maxKnockInterval", "5000").toInt + "spark.shuffle.maxKnockInterval", "5000").toInt MaxRxConnections_ = System.getProperty( - "spark.blockedLocalFileShuffle.maxRxConnections", "4").toInt + "spark.shuffle.maxRxConnections", "4").toInt MaxTxConnections_ = System.getProperty( - "spark.blockedLocalFileShuffle.maxTxConnections", "8").toInt + "spark.shuffle.maxTxConnections", "8").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc diff --git a/src/scala/spark/HttpParallelLocalFileShuffle.scala b/src/scala/spark/HttpParallelLocalFileShuffle.scala index 629cb075ea5c1a244e3ecdbd43549dfbc20b96ac..cdc961da32931862e185b006d0d8540c11b75ea8 100644 --- a/src/scala/spark/HttpParallelLocalFileShuffle.scala +++ 
b/src/scala/spark/HttpParallelLocalFileShuffle.scala @@ -11,7 +11,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} /** * An implementation of shuffle using local files served through HTTP where * receivers create simultaneous connections to multiple servers by setting the - * 'spark.parallelLocalFileShuffle.maxRxConnections' config option. + * 'spark.shuffle.maxRxConnections' config option. * * TODO: Add support for compression when spark.compress is set to true. */ @@ -301,12 +301,12 @@ object HttpParallelLocalFileShuffle extends Logging { if (!initialized) { // Load config parameters MinKnockInterval_ = System.getProperty( - "spark.parallelLocalFileShuffle.minKnockInterval", "1000").toInt + "spark.shuffle.minKnockInterval", "1000").toInt MaxKnockInterval_ = System.getProperty( - "spark.parallelLocalFileShuffle.maxKnockInterval", "5000").toInt + "spark.shuffle.maxKnockInterval", "5000").toInt MaxRxConnections_ = System.getProperty( - "spark.parallelLocalFileShuffle.maxRxConnections", "4").toInt + "spark.shuffle.maxRxConnections", "4").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc diff --git a/src/scala/spark/ManualBlockedLocalFileShuffle.scala b/src/scala/spark/ManualBlockedLocalFileShuffle.scala index fe625d9c9c62f517f92a393007a48b19fd632bbb..03b7b384150ace9a6c6f4ea6dd4230ef0bc8ecb8 100644 --- a/src/scala/spark/ManualBlockedLocalFileShuffle.scala +++ b/src/scala/spark/ManualBlockedLocalFileShuffle.scala @@ -11,12 +11,12 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} /** * An implementation of shuffle using local files served through HTTP where * receivers create simultaneous connections to multiple servers by setting the - * 'spark.blockedLocalFileShuffle.maxRxConnections' config option. + * 'spark.shuffle.maxRxConnections' config option. 
* - * By controlling the 'spark.blockedLocalFileShuffle.blockSize' config option - * one can also control the largest block size to divide each map output into. - * Essentially, instead of creating one large output file for each reducer, maps - * create multiple smaller files to enable finer level of engagement. + * By controlling the 'spark.shuffle.blockSize' config option one can also + * control the largest block size to divide each map output into. Essentially, + * instead of creating one large output file for each reducer, maps create + * multiple smaller files to enable finer level of engagement. * * TODO: Add support for compression when spark.compress is set to true. */ @@ -367,17 +367,17 @@ object ManualBlockedLocalFileShuffle extends Logging { if (!initialized) { // Load config parameters BlockSize_ = System.getProperty( - "spark.blockedLocalFileShuffle.blockSize", "1024").toInt * 1024 + "spark.shuffle.blockSize", "1024").toInt * 1024 MinKnockInterval_ = System.getProperty( - "spark.blockedLocalFileShuffle.minKnockInterval", "1000").toInt + "spark.shuffle.minKnockInterval", "1000").toInt MaxKnockInterval_ = System.getProperty( - "spark.blockedLocalFileShuffle.maxKnockInterval", "5000").toInt + "spark.shuffle.maxKnockInterval", "5000").toInt MaxRxConnections_ = System.getProperty( - "spark.blockedLocalFileShuffle.maxRxConnections", "4").toInt + "spark.shuffle.maxRxConnections", "4").toInt MaxTxConnections_ = System.getProperty( - "spark.blockedLocalFileShuffle.maxTxConnections", "8").toInt + "spark.shuffle.maxTxConnections", "8").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc