diff --git a/conf/java-opts b/conf/java-opts index 971cb32dc9a73531dce857884695acdffe223710..a72d82d9fa707780a2a3ce560d76b1a55ff67a06 100644 --- a/conf/java-opts +++ b/conf/java-opts @@ -1 +1 @@ --Dspark.shuffle.class=spark.LocalFileShuffle -Dspark.shuffle.UseHttpPipelining=true -Dspark.shuffle.MaxConnections=2 +-Dspark.shuffle.class=spark.ParallelLocalFileShuffle -Dspark.shuffle.UseHttpPipelining=true -Dspark.parallelLocalFileShuffle.MaxConnections=2 diff --git a/src/scala/spark/BasicLocalFileShuffle.scala b/src/scala/spark/BasicLocalFileShuffle.scala index 6d8b42e58bbc2638b71db6c50ba2a584bbdec555..aa83e5cf8ca1400ede880618fa3cb0dd04fb72fe 100644 --- a/src/scala/spark/BasicLocalFileShuffle.scala +++ b/src/scala/spark/BasicLocalFileShuffle.scala @@ -141,6 +141,7 @@ object BasicLocalFileShuffle extends Logging { shuffleDir = new File(localDir, "shuffle") shuffleDir.mkdirs() logInfo("Shuffle dir: " + shuffleDir) + val extServerPort = System.getProperty( "spark.localFileShuffle.external.server.port", "-1").toInt if (extServerPort != -1) { diff --git a/src/scala/spark/ParallelLocalFileShuffle.scala b/src/scala/spark/ParallelLocalFileShuffle.scala index 208fad10739815174fc6649300db2cab7535d629..462ad8129ad7d11d6546a2f46c95373ede8941e7 100644 --- a/src/scala/spark/ParallelLocalFileShuffle.scala +++ b/src/scala/spark/ParallelLocalFileShuffle.scala @@ -244,13 +244,13 @@ object ParallelLocalFileShuffle extends Logging { private def initializeIfNeeded() = synchronized { if (!initialized) { // Load config parameters - MinKnockInterval_ = - System.getProperty ("spark.shuffle.MinKnockInterval", "1000").toInt - MaxKnockInterval_ = - System.getProperty ("spark.shuffle.MaxKnockInterval", "5000").toInt + MinKnockInterval_ = System.getProperty ( + "spark.parallelLocalFileShuffle.MinKnockInterval", "1000").toInt + MaxKnockInterval_ = System.getProperty ( + "spark.parallelLocalFileShuffle.MaxKnockInterval", "5000").toInt - MaxConnections_ = - System.getProperty ("spark.shuffle.MaxConnections", "4").toInt + MaxConnections_ = System.getProperty ( + "spark.parallelLocalFileShuffle.MaxConnections", "4").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc @@ -280,6 +280,7 @@ object ParallelLocalFileShuffle extends Logging { shuffleDir = new File(localDir, "shuffle") shuffleDir.mkdirs() logInfo("Shuffle dir: " + shuffleDir) + val extServerPort = System.getProperty( "spark.localFileShuffle.external.server.port", "-1").toInt if (extServerPort != -1) {