From 56d8a2afa1b294bda95b1285f7d99280360ff228 Mon Sep 17 00:00:00 2001 From: Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> Date: Wed, 15 Dec 2010 20:56:22 -0800 Subject: [PATCH] - Updated java-opts file of this branch. - Renamed some ParallelLocalFileShuffle config options for clarity. --- conf/java-opts | 2 +- src/scala/spark/BasicLocalFileShuffle.scala | 1 + src/scala/spark/ParallelLocalFileShuffle.scala | 13 +++++++------ 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/conf/java-opts b/conf/java-opts index 971cb32dc9..a72d82d9fa 100644 --- a/conf/java-opts +++ b/conf/java-opts @@ -1 +1 @@ --Dspark.shuffle.class=spark.LocalFileShuffle -Dspark.shuffle.UseHttpPipelining=true -Dspark.shuffle.MaxConnections=2 +-Dspark.shuffle.class=spark.ParallelLocalFileShuffle -Dspark.shuffle.UseHttpPipelining=true -Dspark.parallelLocalFileShuffle.MaxConnections=2 diff --git a/src/scala/spark/BasicLocalFileShuffle.scala b/src/scala/spark/BasicLocalFileShuffle.scala index 6d8b42e58b..aa83e5cf8c 100644 --- a/src/scala/spark/BasicLocalFileShuffle.scala +++ b/src/scala/spark/BasicLocalFileShuffle.scala @@ -141,6 +141,7 @@ object BasicLocalFileShuffle extends Logging { shuffleDir = new File(localDir, "shuffle") shuffleDir.mkdirs() logInfo("Shuffle dir: " + shuffleDir) + val extServerPort = System.getProperty( "spark.localFileShuffle.external.server.port", "-1").toInt if (extServerPort != -1) { diff --git a/src/scala/spark/ParallelLocalFileShuffle.scala b/src/scala/spark/ParallelLocalFileShuffle.scala index 208fad1073..462ad8129a 100644 --- a/src/scala/spark/ParallelLocalFileShuffle.scala +++ b/src/scala/spark/ParallelLocalFileShuffle.scala @@ -244,13 +244,13 @@ object ParallelLocalFileShuffle extends Logging { private def initializeIfNeeded() = synchronized { if (!initialized) { // Load config parameters - MinKnockInterval_ = - System.getProperty ("spark.shuffle.MinKnockInterval", "1000").toInt - MaxKnockInterval_ = - System.getProperty ("spark.shuffle.MaxKnockInterval", "5000").toInt + MinKnockInterval_ = System.getProperty ( + "spark.parallelLocalFileShuffle.MinKnockInterval", "1000").toInt + MaxKnockInterval_ = System.getProperty ( + "spark.parallelLocalFileShuffle.MaxKnockInterval", "5000").toInt - MaxConnections_ = - System.getProperty ("spark.shuffle.MaxConnections", "4").toInt + MaxConnections_ = System.getProperty ( + "spark.parallelLocalFileShuffle.MaxConnections", "4").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc @@ -280,6 +280,7 @@ object ParallelLocalFileShuffle extends Logging { shuffleDir = new File(localDir, "shuffle") shuffleDir.mkdirs() logInfo("Shuffle dir: " + shuffleDir) + val extServerPort = System.getProperty( "spark.localFileShuffle.external.server.port", "-1").toInt if (extServerPort != -1) { -- GitLab