diff --git a/src/scala/spark/BasicLocalFileShuffle.scala b/src/scala/spark/BasicLocalFileShuffle.scala
index f258d6757d7e0bcc4b2f9418b1e7cc40e12b1551..95160badd49eef5e20f413c882a7c2ff14451f73 100644
--- a/src/scala/spark/BasicLocalFileShuffle.scala
+++ b/src/scala/spark/BasicLocalFileShuffle.scala
@@ -50,13 +50,13 @@ class BasicLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
       for (i <- 0 until numOutputSplits) {
         val file = BasicLocalFileShuffle.getOutputFile(shuffleId, myIndex, i)
         val writeStartTime = System.currentTimeMillis
-        logInfo ("BEGIN WRITE: " + file)
+        logInfo("BEGIN WRITE: " + file)
         val out = new ObjectOutputStream(new FileOutputStream(file))
         buckets(i).foreach(pair => out.writeObject(pair))
         out.close()
-        logInfo ("END WRITE: " + file)
+        logInfo("END WRITE: " + file)
         val writeTime = (System.currentTimeMillis - writeStartTime)
-        logInfo ("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.")
+        logInfo("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.")
       }

       (myIndex, BasicLocalFileShuffle.serverUri)
@@ -79,7 +79,7 @@ class BasicLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
         for (i <- inputIds) {
           val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, myId)
           val readStartTime = System.currentTimeMillis
-          logInfo ("BEGIN READ: " + url)
+          logInfo("BEGIN READ: " + url)
           val inputStream = new ObjectInputStream(new URL(url).openStream())
           try {
             while (true) {
@@ -93,9 +93,9 @@ class BasicLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
             case e: EOFException => {}
           }
           inputStream.close()
-          logInfo ("END READ: " + url)
-          val readTime = (System.currentTimeMillis - readStartTime)
-          logInfo ("Reading " + url + " took " + readTime + " millis.")
+          logInfo("END READ: " + url)
+          val readTime = System.currentTimeMillis - readStartTime
+          logInfo("Reading " + url + " took " + readTime + " millis.")
         }
       }
       combiners
@@ -103,7 +103,6 @@ class BasicLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
   }
 }

-
 object BasicLocalFileShuffle extends Logging {
   private var initialized = false
   private var nextShuffleId = new AtomicLong(0)
@@ -125,9 +124,9 @@ object BasicLocalFileShuffle extends Logging {
       while (!foundLocalDir && tries < 10) {
         tries += 1
         try {
-          localDirUuid = UUID.randomUUID()
+          localDirUuid = UUID.randomUUID
           localDir = new File(localDirRoot, "spark-local-" + localDirUuid)
-          if (!localDir.exists()) {
+          if (!localDir.exists) {
             localDir.mkdirs()
             foundLocalDir = true
           }
@@ -162,7 +161,7 @@ object BasicLocalFileShuffle extends Logging {
         serverUri = server.uri
       }
       initialized = true
-      logInfo ("Local URI: " + serverUri)
+      logInfo("Local URI: " + serverUri)
     }
   }

diff --git a/src/scala/spark/BlockedLocalFileShuffle.scala b/src/scala/spark/BlockedLocalFileShuffle.scala
index 2685424a645c22e61fd59bdcb6f1cc11960709fd..bd88a263b9925200ed8dd39442c4905afac688f4 100644
--- a/src/scala/spark/BlockedLocalFileShuffle.scala
+++ b/src/scala/spark/BlockedLocalFileShuffle.scala
@@ -80,7 +80,7 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
           file = BlockedLocalFileShuffle.getOutputFile(shuffleId, myIndex, i,
             blockNum)
           writeStartTime = System.currentTimeMillis
-          logInfo ("BEGIN WRITE: " + file)
+          logInfo("BEGIN WRITE: " + file)
           out = new ObjectOutputStream(new FileOutputStream(file))
         }

@@ -92,9 +92,9 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
         // Close the old file if has crossed the blockSize limit
         if (file.length > BlockedLocalFileShuffle.BlockSize) {
           out.close()
-          logInfo ("END WRITE: " + file)
-          val writeTime = (System.currentTimeMillis - writeStartTime)
-          logInfo ("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.")
+          logInfo("END WRITE: " + file)
+          val writeTime = System.currentTimeMillis - writeStartTime
+          logInfo("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.")

           blockNum = blockNum + 1
           isDirty = false
@@ -103,9 +103,9 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {

       if (isDirty) {
         out.close()
-        logInfo ("END WRITE: " + file)
-        val writeTime = (System.currentTimeMillis - writeStartTime)
-        logInfo ("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.")
+        logInfo("END WRITE: " + file)
+        val writeTime = System.currentTimeMillis - writeStartTime
+        logInfo("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.")

         blockNum = blockNum + 1
       }
@@ -132,17 +132,17 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
       totalBlocksInSplit = Array.tabulate(totalSplits)(_ => -1)
       hasBlocksInSplit = Array.tabulate(totalSplits)(_ => 0)

-      hasSplitsBitVector = new BitSet (totalSplits)
-      splitsInRequestBitVector = new BitSet (totalSplits)
+      hasSplitsBitVector = new BitSet(totalSplits)
+      splitsInRequestBitVector = new BitSet(totalSplits)

       combiners = new HashMap[K, C]

-      var threadPool = BlockedLocalFileShuffle.newDaemonFixedThreadPool (
+      var threadPool = BlockedLocalFileShuffle.newDaemonFixedThreadPool(
         BlockedLocalFileShuffle.MaxConnections)

       while (hasSplits < totalSplits) {
         var numThreadsToCreate =
-          Math.min (totalSplits, BlockedLocalFileShuffle.MaxConnections) -
+          Math.min(totalSplits, BlockedLocalFileShuffle.MaxConnections) -
           threadPool.getActiveCount

         while (hasSplits < totalSplits && numThreadsToCreate > 0) {
@@ -150,14 +150,14 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
           val splitIndex = selectRandomSplit

           if (splitIndex != -1) {
-            val (inputId, serverUri) = outputLocs (splitIndex)
+            val (inputId, serverUri) = outputLocs(splitIndex)

-            threadPool.execute ( new ShuffleClient (serverUri, shuffleId.toInt,
+            threadPool.execute(new ShuffleClient(serverUri, shuffleId.toInt,
               inputId, myId, splitIndex, mergeCombiners))

             // splitIndex is in transit. Will be unset in the ShuffleClient
             splitsInRequestBitVector.synchronized {
-              splitsInRequestBitVector.set (splitIndex)
+              splitsInRequestBitVector.set(splitIndex)
             }
           }

@@ -165,7 +165,7 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
         }

         // Sleep for a while before creating new threads
-        Thread.sleep (BlockedLocalFileShuffle.MinKnockInterval)
+        Thread.sleep(BlockedLocalFileShuffle.MinKnockInterval)
       }
       combiners
     })
@@ -183,14 +183,14 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
     }

     if (requiredSplits.size > 0) {
-      requiredSplits(BlockedLocalFileShuffle.ranGen.nextInt (
+      requiredSplits(BlockedLocalFileShuffle.ranGen.nextInt(
         requiredSplits.size))
     } else {
       -1
     }
   }

-  class ShuffleClient (serverUri: String, shuffleId: Int,
+  class ShuffleClient(serverUri: String, shuffleId: Int,
     inputId: Int, myId: Int, splitIndex: Int,
     mergeCombiners: (C, C) => C)
   extends Thread with Logging {
@@ -203,7 +203,6 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
       if (totalBlocksInSplit(inputId) == -1) {
         val url = "%s/shuffle/%d/%d/BLOCKNUM-%d".format(serverUri, shuffleId,
           inputId, myId)
-        logInfo (url)
         val inputStream = new ObjectInputStream(new URL(url).openStream())
         totalBlocksInSplit(inputId) =
           inputStream.readObject().asInstanceOf[Int]
@@ -215,7 +214,7 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
           myId, hasBlocksInSplit(inputId))

         val readStartTime = System.currentTimeMillis
-        logInfo ("BEGIN READ: " + url)
+        logInfo("BEGIN READ: " + url)

         val inputStream = new ObjectInputStream(new URL(url).openStream())
         try {
@@ -233,9 +232,9 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
         }

         inputStream.close()
-        logInfo ("END READ: " + url)
-        val readTime = (System.currentTimeMillis - readStartTime)
-        logInfo ("Reading " + url + " took " + readTime + " millis.")
+        logInfo("END READ: " + url)
+        val readTime = System.currentTimeMillis - readStartTime
+        logInfo("Reading " + url + " took " + readTime + " millis.")

         // Reception completed. Update stats.
         hasBlocksInSplit(inputId) = hasBlocksInSplit(inputId) + 1
@@ -243,14 +242,14 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
         // Split has been received only if all the blocks have been received
         if (hasBlocksInSplit(inputId) == totalBlocksInSplit(inputId)) {
           hasSplitsBitVector.synchronized {
-            hasSplitsBitVector.set (splitIndex)
+            hasSplitsBitVector.set(splitIndex)
           }
           hasSplits += 1
         }

         // We have received splitIndex
         splitsInRequestBitVector.synchronized {
-          splitsInRequestBitVector.set (splitIndex, false)
+          splitsInRequestBitVector.set(splitIndex, false)
         }

         receptionSucceeded = true
@@ -259,13 +258,13 @@ class BlockedLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
         // connection due to timeout
         case eofe: java.io.EOFException => { }
         case e: Exception => {
-          logInfo ("ShuffleClient had a " + e)
+          logInfo("ShuffleClient had a " + e)
         }
       } finally {
         // If reception failed, unset for future retry
         if (!receptionSucceeded) {
           splitsInRequestBitVector.synchronized {
-            splitsInRequestBitVector.set (splitIndex, false)
+            splitsInRequestBitVector.set(splitIndex, false)
           }
         }

@@ -298,15 +297,15 @@ object BlockedLocalFileShuffle extends Logging {

   private def initializeIfNeeded() = synchronized {
     if (!initialized) {
       // Load config parameters
-      BlockSize_ = System.getProperty (
+      BlockSize_ = System.getProperty(
         "spark.blockedLocalFileShuffle.blockSize", "1024").toInt * 1024
-      MinKnockInterval_ = System.getProperty (
+      MinKnockInterval_ = System.getProperty(
         "spark.blockedLocalFileShuffle.minKnockInterval", "1000").toInt
-      MaxKnockInterval_ = System.getProperty (
+      MaxKnockInterval_ = System.getProperty(
         "spark.blockedLocalFileShuffle.maxKnockInterval", "5000").toInt
-      MaxConnections_ = System.getProperty (
+      MaxConnections_ = System.getProperty(
         "spark.blockedLocalFileShuffle.maxConnections", "4").toInt

       // TODO: localDir should be created by some mechanism common to Spark
@@ -319,9 +318,9 @@ object BlockedLocalFileShuffle extends Logging {
       while (!foundLocalDir && tries < 10) {
         tries += 1
         try {
-          localDirUuid = UUID.randomUUID()
+          localDirUuid = UUID.randomUUID
           localDir = new File(localDirRoot, "spark-local-" + localDirUuid)
-          if (!localDir.exists()) {
+          if (!localDir.exists) {
             localDir.mkdirs()
             foundLocalDir = true
           }
@@ -356,7 +355,7 @@ object BlockedLocalFileShuffle extends Logging {
         serverUri = server.uri
       }
       initialized = true
-      logInfo ("Local URI: " + serverUri)
+      logInfo("Local URI: " + serverUri)
     }
   }

@@ -398,19 +397,19 @@ object BlockedLocalFileShuffle extends Logging {
   private def newDaemonThreadFactory: ThreadFactory = {
     new ThreadFactory {
       def newThread(r: Runnable): Thread = {
-        var t = Executors.defaultThreadFactory.newThread (r)
-        t.setDaemon (true)
+        var t = Executors.defaultThreadFactory.newThread(r)
+        t.setDaemon(true)
         return t
       }
     }
   }

   // Wrapper over newFixedThreadPool
-  def newDaemonFixedThreadPool (nThreads: Int): ThreadPoolExecutor = {
+  def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor = {
     var threadPool =
-      Executors.newFixedThreadPool (nThreads).asInstanceOf[ThreadPoolExecutor]
+      Executors.newFixedThreadPool(nThreads).asInstanceOf[ThreadPoolExecutor]

-    threadPool.setThreadFactory (newDaemonThreadFactory)
+    threadPool.setThreadFactory(newDaemonThreadFactory)

     return threadPool
   }
diff --git a/src/scala/spark/ParallelLocalFileShuffle.scala b/src/scala/spark/ParallelLocalFileShuffle.scala
index c37f5b2b42fb60847ba17fc217d44ce613d05067..1b15c27a05219d0687670e0bf128e214fb8660f1 100644
--- a/src/scala/spark/ParallelLocalFileShuffle.scala
+++ b/src/scala/spark/ParallelLocalFileShuffle.scala
@@ -61,13 +61,13 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
       for (i <- 0 until numOutputSplits) {
         val file = ParallelLocalFileShuffle.getOutputFile(shuffleId, myIndex, i)
         val writeStartTime = System.currentTimeMillis
-        logInfo ("BEGIN WRITE: " + file)
+        logInfo("BEGIN WRITE: " + file)
         val out = new ObjectOutputStream(new FileOutputStream(file))
         buckets(i).foreach(pair => out.writeObject(pair))
         out.close()
-        logInfo ("END WRITE: " + file)
-        val writeTime = (System.currentTimeMillis - writeStartTime)
-        logInfo ("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.")
+        logInfo("END WRITE: " + file)
+        val writeTime = System.currentTimeMillis - writeStartTime
+        logInfo("Writing " + file + " of size " + file.length + " bytes took " + writeTime + " millis.")
       }

       (myIndex, ParallelLocalFileShuffle.serverUri)
@@ -81,17 +81,17 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
       totalSplits = outputLocs.size
       hasSplits = 0

-      hasSplitsBitVector = new BitSet (totalSplits)
-      splitsInRequestBitVector = new BitSet (totalSplits)
+      hasSplitsBitVector = new BitSet(totalSplits)
+      splitsInRequestBitVector = new BitSet(totalSplits)

       combiners = new HashMap[K, C]

-      var threadPool = ParallelLocalFileShuffle.newDaemonFixedThreadPool (
+      var threadPool = ParallelLocalFileShuffle.newDaemonFixedThreadPool(
         ParallelLocalFileShuffle.MaxConnections)

       while (hasSplits < totalSplits) {
         var numThreadsToCreate =
-          Math.min (totalSplits, ParallelLocalFileShuffle.MaxConnections) -
+          Math.min(totalSplits, ParallelLocalFileShuffle.MaxConnections) -
           threadPool.getActiveCount

         while (hasSplits < totalSplits && numThreadsToCreate > 0) {
@@ -99,14 +99,14 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
           val splitIndex = selectRandomSplit

           if (splitIndex != -1) {
-            val (inputId, serverUri) = outputLocs (splitIndex)
+            val (inputId, serverUri) = outputLocs(splitIndex)

-            threadPool.execute ( new ShuffleClient (serverUri, shuffleId.toInt,
+            threadPool.execute(new ShuffleClient(serverUri, shuffleId.toInt,
               inputId, myId, splitIndex, mergeCombiners))

             // splitIndex is in transit. Will be unset in the ShuffleClient
             splitsInRequestBitVector.synchronized {
-              splitsInRequestBitVector.set (splitIndex)
+              splitsInRequestBitVector.set(splitIndex)
             }
           }

@@ -114,7 +114,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
         }

         // Sleep for a while before creating new threads
-        Thread.sleep (ParallelLocalFileShuffle.MinKnockInterval)
+        Thread.sleep(ParallelLocalFileShuffle.MinKnockInterval)
       }
       combiners
     })
@@ -132,14 +132,14 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
     }

     if (requiredSplits.size > 0) {
-      requiredSplits(ParallelLocalFileShuffle.ranGen.nextInt (
+      requiredSplits(ParallelLocalFileShuffle.ranGen.nextInt(
         requiredSplits.size))
     } else {
       -1
     }
   }

-  class ShuffleClient (serverUri: String, shuffleId: Int,
+  class ShuffleClient(serverUri: String, shuffleId: Int,
     inputId: Int, myId: Int, splitIndex: Int,
     mergeCombiners: (C, C) => C)
   extends Thread with Logging {
@@ -151,7 +151,7 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
         "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, inputId, myId)

       val readStartTime = System.currentTimeMillis
-      logInfo ("BEGIN READ: " + url)
+      logInfo("BEGIN READ: " + url)

       val inputStream = new ObjectInputStream(new URL(url).openStream())
       try {
@@ -169,19 +169,19 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
       }

       inputStream.close()
-      logInfo ("END READ: " + url)
-      val readTime = (System.currentTimeMillis - readStartTime)
-      logInfo ("Reading " + url + " took " + readTime + " millis.")
+      logInfo("END READ: " + url)
+      val readTime = System.currentTimeMillis - readStartTime
+      logInfo("Reading " + url + " took " + readTime + " millis.")

       // Reception completed. Update stats.
       hasSplitsBitVector.synchronized {
-        hasSplitsBitVector.set (splitIndex)
+        hasSplitsBitVector.set(splitIndex)
       }
       hasSplits += 1

       // We have received splitIndex
       splitsInRequestBitVector.synchronized {
-        splitsInRequestBitVector.set (splitIndex, false)
+        splitsInRequestBitVector.set(splitIndex, false)
       }

       receptionSucceeded = true
@@ -190,13 +190,13 @@ class ParallelLocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
         // connection due to timeout
         case eofe: java.io.EOFException => { }
         case e: Exception => {
-          logInfo ("ShuffleClient had a " + e)
+          logInfo("ShuffleClient had a " + e)
         }
       } finally {
         // If reception failed, unset for future retry
         if (!receptionSucceeded) {
           splitsInRequestBitVector.synchronized {
-            splitsInRequestBitVector.set (splitIndex, false)
+            splitsInRequestBitVector.set(splitIndex, false)
           }
         }

@@ -227,12 +227,12 @@ object ParallelLocalFileShuffle extends Logging {

   private def initializeIfNeeded() = synchronized {
     if (!initialized) {
       // Load config parameters
-      MinKnockInterval_ = System.getProperty (
+      MinKnockInterval_ = System.getProperty(
         "spark.parallelLocalFileShuffle.minKnockInterval", "1000").toInt
-      MaxKnockInterval_ = System.getProperty (
+      MaxKnockInterval_ = System.getProperty(
         "spark.parallelLocalFileShuffle.maxKnockInterval", "5000").toInt
-      MaxConnections_ = System.getProperty (
+      MaxConnections_ = System.getProperty(
         "spark.parallelLocalFileShuffle.maxConnections", "4").toInt

       // TODO: localDir should be created by some mechanism common to Spark
@@ -245,9 +245,9 @@ object ParallelLocalFileShuffle extends Logging {
       while (!foundLocalDir && tries < 10) {
         tries += 1
         try {
-          localDirUuid = UUID.randomUUID()
+          localDirUuid = UUID.randomUUID
           localDir = new File(localDirRoot, "spark-local-" + localDirUuid)
-          if (!localDir.exists()) {
+          if (!localDir.exists) {
            localDir.mkdirs()
            foundLocalDir = true
          }
@@ -282,7 +282,7 @@ object ParallelLocalFileShuffle extends Logging {
         serverUri = server.uri
       }
       initialized = true
-      logInfo ("Local URI: " + serverUri)
+      logInfo("Local URI: " + serverUri)
     }
   }

@@ -312,19 +312,19 @@ object ParallelLocalFileShuffle extends Logging {
   private def newDaemonThreadFactory: ThreadFactory = {
     new ThreadFactory {
       def newThread(r: Runnable): Thread = {
-        var t = Executors.defaultThreadFactory.newThread (r)
-        t.setDaemon (true)
+        var t = Executors.defaultThreadFactory.newThread(r)
+        t.setDaemon(true)
         return t
       }
     }
   }

   // Wrapper over newFixedThreadPool
-  def newDaemonFixedThreadPool (nThreads: Int): ThreadPoolExecutor = {
+  def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor = {
     var threadPool =
-      Executors.newFixedThreadPool (nThreads).asInstanceOf[ThreadPoolExecutor]
+      Executors.newFixedThreadPool(nThreads).asInstanceOf[ThreadPoolExecutor]

-    threadPool.setThreadFactory (newDaemonThreadFactory)
+    threadPool.setThreadFactory(newDaemonThreadFactory)

     return threadPool
   }
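For context on the last hunk of each file: both shuffle implementations build their fetch pools from a fixed-size executor whose threads are marked as daemons, so in-flight fetch threads cannot keep the JVM alive once the job finishes. Below is a minimal standalone sketch of that idiom, assuming nothing beyond the JDK; the object name DaemonPoolSketch and the main driver are illustrative additions, and only newDaemonThreadFactory and newDaemonFixedThreadPool mirror the patched helpers.

import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}

object DaemonPoolSketch {
  // Wrap the default factory so every pool thread is a daemon and
  // cannot block JVM shutdown.
  private def newDaemonThreadFactory: ThreadFactory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = Executors.defaultThreadFactory.newThread(r)
      t.setDaemon(true)
      t
    }
  }

  // Fixed-size pool built on the daemon factory; setting the factory before
  // any task is submitted works because worker threads are created lazily.
  def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor = {
    val pool =
      Executors.newFixedThreadPool(nThreads).asInstanceOf[ThreadPoolExecutor]
    pool.setThreadFactory(newDaemonThreadFactory)
    pool
  }

  def main(args: Array[String]): Unit = {
    val pool = newDaemonFixedThreadPool(4)
    pool.execute(new Runnable {
      def run(): Unit = println("running on " + Thread.currentThread.getName)
    })
    pool.shutdown()
  }
}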