diff --git a/conf/java-opts b/conf/java-opts index 6b5ee590641b4659c10f930413a3a18c7cd8d36d..e615f9dbfe7165aa481757dd19fb190b139a98c6 100644 --- a/conf/java-opts +++ b/conf/java-opts @@ -1 +1 @@ --Dspark.shuffle.class=spark.TrackedCustomParallelLocalFileShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=256 -Dspark.shuffle.minKnockInterval=50 -Dspark.shuffle.maxKnockInterval=2000 +-Dspark.shuffle.class=spark.TrackedCustomParallelLocalFileShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=256 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000 diff --git a/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala b/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala index 5b14f1e2d1916a5de4af6d64e06a3511121c4806..42dcb88ba44257faf6a2ac4ec9a4dc2c28573020 100644 --- a/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala +++ b/src/scala/spark/TrackedCustomParallelLocalFileShuffle.scala @@ -574,7 +574,7 @@ extends ShuffleTrackerStrategy with Logging { totalConnectionsPerLoc(splitIndex) = totalConnectionsPerLoc(splitIndex) + 1 - totalConnectionsPerLoc.foreach { i => + curConnectionsPerLoc.foreach { i => print ("" + i + " ") } println("") @@ -590,7 +590,7 @@ extends ShuffleTrackerStrategy with Logging { curConnectionsPerLoc(serverSplitIndex) = curConnectionsPerLoc(serverSplitIndex) - 1 - totalConnectionsPerLoc.foreach { i => + curConnectionsPerLoc.foreach { i => print ("" + i + " ") } println("") @@ -619,18 +619,6 @@ object SplitInfo { } object TrackedCustomParallelLocalFileShuffle extends Logging { - // ShuffleTracker info - private var MasterHostAddress_ = InetAddress.getLocalHost.getHostAddress - private var MasterTrackerPort_ : Int = 22222 - - // Used thoughout the code for small and large waits/timeouts - private var MinKnockInterval_ = 1000 - private var MaxKnockInterval_ = 5000 - - // Maximum number of connections - private var MaxRxConnections_ = 4 - private var MaxTxConnections_ = 8 - // Tracker communication constants val ReducerEntering = 0 val ReducerLeaving = 1 @@ -648,24 +636,29 @@ object TrackedCustomParallelLocalFileShuffle extends Logging { // Random number generator var ranGen = new Random + // Load config parameters + + // ShuffleTracker info + private var MasterHostAddress_ = System.getProperty( + "spark.shuffle.masterHostAddress", InetAddress.getLocalHost.getHostAddress) + private var MasterTrackerPort_ = System.getProperty( + "spark.shuffle.masterTrackerPort", "22222").toInt + + // Used thoughout the code for small and large waits/timeouts + private var MinKnockInterval_ = System.getProperty( + "spark.shuffle.minKnockInterval", "1000").toInt + private var MaxKnockInterval_ = System.getProperty( + "spark.shuffle.maxKnockInterval", "5000").toInt + + // Maximum number of connections + private var MaxRxConnections_ = System.getProperty( + "spark.shuffle.maxRxConnections", "4").toInt + private var MaxTxConnections_ = System.getProperty( + "spark.shuffle.maxTxConnections", "8").toInt + + private def initializeIfNeeded() = synchronized { if (!initialized) { - // Load config parameters - MasterHostAddress_ = - System.getProperty ("spark.shuffle.masterHostAddress", "127.0.0.1") - MasterTrackerPort_ = - System.getProperty ("spark.shuffle.masterTrackerPort", "22222").toInt - - MinKnockInterval_ = System.getProperty( - "spark.shuffle.minKnockInterval", "1000").toInt - MaxKnockInterval_ = System.getProperty( - "spark.shuffle.maxKnockInterval", "5000").toInt - - MaxRxConnections_ = System.getProperty( - "spark.shuffle.maxRxConnections", "4").toInt - MaxTxConnections_ = System.getProperty( - "spark.shuffle.maxTxConnections", "8").toInt - // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc val localDirRoot = System.getProperty("spark.local.dir", "/tmp")