From 3447f903da90ee9ba099fbd39bd5c398705a6647 Mon Sep 17 00:00:00 2001 From: Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> Date: Wed, 22 Dec 2010 17:17:33 -0800 Subject: [PATCH] Renamed CustomBlockedLocalFileShuffle to ManualBlockedLocalFileShuffle. There will be a new CustomBlockedLocalFileShuffle where 'Custom' will mean ManualBlockedLocalFileShuffle with custom server instead of jetty. --- conf/java-opts | 2 +- ...la => ManualBlockedLocalFileShuffle.scala} | 24 +++++++++---------- 2 files changed, 13 insertions(+), 13 deletions(-) rename src/scala/spark/{CustomBlockedLocalFileShuffle.scala => ManualBlockedLocalFileShuffle.scala} (95%) diff --git a/conf/java-opts b/conf/java-opts index 5b4ae25c06..c006c7cfc7 100644 --- a/conf/java-opts +++ b/conf/java-opts @@ -1 +1 @@ --Dspark.shuffle.class=spark.CustomParallelInMemoryShuffle -Dspark.blockedLocalFileShuffle.maxRxConnections=2 -Dspark.blockedLocalFileShuffle.blockSize=256 -Dspark.blockedLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxRxConnections=2 -Dspark.parallelLocalFileShuffle.maxTxConnections=2 -Dspark.parallelLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxKnockInterval=2000 -Dspark.parallelInMemoryShuffle.maxRxConnections=2 -Dspark.parallelInMemoryShuffle.maxTxConnections=2 -Dspark.parallelInMemoryShuffle.minKnockInterval=50 -Dspark.parallelInMemoryShuffle.maxKnockInterval=2000 +-Dspark.shuffle.class=spark.ManualBlockedLocalFileShuffle -Dspark.blockedLocalFileShuffle.maxRxConnections=2 -Dspark.blockedLocalFileShuffle.blockSize=256 -Dspark.blockedLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxRxConnections=2 -Dspark.parallelLocalFileShuffle.maxTxConnections=2 -Dspark.parallelLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxKnockInterval=2000 -Dspark.parallelInMemoryShuffle.maxRxConnections=2 -Dspark.parallelInMemoryShuffle.maxTxConnections=2 -Dspark.parallelInMemoryShuffle.minKnockInterval=50 -Dspark.parallelInMemoryShuffle.maxKnockInterval=2000 diff --git a/src/scala/spark/CustomBlockedLocalFileShuffle.scala b/src/scala/spark/ManualBlockedLocalFileShuffle.scala similarity index 95% rename from src/scala/spark/CustomBlockedLocalFileShuffle.scala rename to src/scala/spark/ManualBlockedLocalFileShuffle.scala index 75f8c0bffe..ffe3181353 100644 --- a/src/scala/spark/CustomBlockedLocalFileShuffle.scala +++ b/src/scala/spark/ManualBlockedLocalFileShuffle.scala @@ -21,7 +21,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap} * TODO: Add support for compression when spark.compress is set to true. */ @serializable -class CustomBlockedLocalFileShuffle[K, V, C] +class ManualBlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { @transient var totalSplits = 0 @transient var hasSplits = 0 @@ -43,7 +43,7 @@ extends Shuffle[K, V, C] with Logging { : RDD[(K, C)] = { val sc = input.sparkContext - val shuffleId = CustomBlockedLocalFileShuffle.newShuffleId() + val shuffleId = ManualBlockedLocalFileShuffle.newShuffleId() logInfo("Shuffle ID: " + shuffleId) val splitRdd = new NumberedSplitRDD(input) @@ -78,7 +78,7 @@ extends Shuffle[K, V, C] with Logging { buckets(i).foreach(pair => { // Open a new file if necessary if (!isDirty) { - file = CustomBlockedLocalFileShuffle.getOutputFile(shuffleId, + file = ManualBlockedLocalFileShuffle.getOutputFile(shuffleId, myIndex, i, blockNum) writeStartTime = System.currentTimeMillis logInfo("BEGIN WRITE: " + file) @@ -91,7 +91,7 @@ extends Shuffle[K, V, C] with Logging { isDirty = true // Close the old file if has crossed the blockSize limit - if (file.length > CustomBlockedLocalFileShuffle.BlockSize) { + if (file.length > ManualBlockedLocalFileShuffle.BlockSize) { out.close() logInfo("END WRITE: " + file) val writeTime = System.currentTimeMillis - writeStartTime @@ -112,14 +112,14 @@ extends Shuffle[K, V, C] with Logging { } // Write the BLOCKNUM file - file = CustomBlockedLocalFileShuffle.getBlockNumOutputFile(shuffleId, + file = ManualBlockedLocalFileShuffle.getBlockNumOutputFile(shuffleId, myIndex, i) out = new ObjectOutputStream(new FileOutputStream(file)) out.writeObject(blockNum) out.close() } - (myIndex, CustomBlockedLocalFileShuffle.serverUri) + (myIndex, ManualBlockedLocalFileShuffle.serverUri) }).collect() // TODO: Could broadcast outputLocs @@ -145,12 +145,12 @@ extends Shuffle[K, V, C] with Logging { shuffleConsumer.start() logInfo("ShuffleConsumer started...") - var threadPool = CustomBlockedLocalFileShuffle.newDaemonFixedThreadPool( - CustomBlockedLocalFileShuffle.MaxRxConnections) + var threadPool = ManualBlockedLocalFileShuffle.newDaemonFixedThreadPool( + ManualBlockedLocalFileShuffle.MaxRxConnections) while (hasSplits < totalSplits) { var numThreadsToCreate = - Math.min(totalSplits, CustomBlockedLocalFileShuffle.MaxRxConnections) - + Math.min(totalSplits, ManualBlockedLocalFileShuffle.MaxRxConnections) - threadPool.getActiveCount while (hasSplits < totalSplits && numThreadsToCreate > 0) { @@ -173,7 +173,7 @@ extends Shuffle[K, V, C] with Logging { } // Sleep for a while before creating new threads - Thread.sleep(CustomBlockedLocalFileShuffle.MinKnockInterval) + Thread.sleep(ManualBlockedLocalFileShuffle.MinKnockInterval) } threadPool.shutdown() @@ -193,7 +193,7 @@ extends Shuffle[K, V, C] with Logging { } if (requiredSplits.size > 0) { - requiredSplits(CustomBlockedLocalFileShuffle.ranGen.nextInt( + requiredSplits(ManualBlockedLocalFileShuffle.ranGen.nextInt( requiredSplits.size)) } else { -1 @@ -341,7 +341,7 @@ extends Shuffle[K, V, C] with Logging { } } -object CustomBlockedLocalFileShuffle extends Logging { +object ManualBlockedLocalFileShuffle extends Logging { // Used thoughout the code for small and large waits/timeouts private var BlockSize_ = 1024 * 1024 -- GitLab