diff --git a/conf/java-opts b/conf/java-opts index 21795fbd8d3b676ee2989d77dbc654ad14c7b7b7..59a9d98fc93b6ca28e14b1dd0ff9bc15c2442607 100644 --- a/conf/java-opts +++ b/conf/java-opts @@ -1 +1 @@ --Dspark.shuffle.class=spark.CustomBlockedLocalFileShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=4096 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000 +-Dspark.shuffle.class=spark.CustomBlockedLocalFileShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=256 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000 -Dspark.shuffle.maxChatTime=50 diff --git a/src/scala/spark/CustomBlockedInMemoryShuffle.scala b/src/scala/spark/CustomBlockedInMemoryShuffle.scala index 4a8aae57325b3983f9434e16029eb5a86883bf87..7f844883a545b839cf01f51f5eece03fed67ee5e 100644 --- a/src/scala/spark/CustomBlockedInMemoryShuffle.scala +++ b/src/scala/spark/CustomBlockedInMemoryShuffle.scala @@ -298,6 +298,9 @@ extends Shuffle[K, V, C] with Logging { // Receive BLOCKNUM totalBlocksInSplit(splitIndex) = oisSource.readObject.asInstanceOf[Int] + // Turn the timer OFF, if the sender responds before timeout + timeOutTimer.cancel() + // Request specific block oosSource.writeObject(hasBlocksInSplit(splitIndex)) @@ -305,9 +308,6 @@ extends Shuffle[K, V, C] with Logging { var requestedFileLen = oisSource.readObject.asInstanceOf[Int] logInfo("Received requestedFileLen = " + requestedFileLen) - // Turn the timer OFF, if the sender responds before timeout - timeOutTimer.cancel() - val requestSplit = "%d/%d/%d-%d".format(shuffleId, inputId, myId, hasBlocksInSplit(splitIndex)) diff --git a/src/scala/spark/CustomBlockedLocalFileShuffle.scala b/src/scala/spark/CustomBlockedLocalFileShuffle.scala index 2853e29cd9a82954db7176a81105d0e63a090c4b..96ac923b25b35a10cc1a46f7c84fd5252ec6c364 100644 --- a/src/scala/spark/CustomBlockedLocalFileShuffle.scala +++ b/src/scala/spark/CustomBlockedLocalFileShuffle.scala @@ -285,69 +285,75 @@ extends Shuffle[K, V, C] with Logging { // Receive BLOCKNUM totalBlocksInSplit(splitIndex) = oisSource.readObject.asInstanceOf[Int] - // Request specific block - oosSource.writeObject(hasBlocksInSplit(splitIndex)) - - // Good to go. First, receive the length of the requested file - var requestedFileLen = oisSource.readObject.asInstanceOf[Int] - logInfo("Received requestedFileLen = " + requestedFileLen) - // Turn the timer OFF, if the sender responds before timeout timeOutTimer.cancel() - - val requestPath = "%d/%d/%d-%d".format(shuffleId, inputId, myId, - hasBlocksInSplit(splitIndex)) - - // Receive the file - if (requestedFileLen != -1) { - val readStartTime = System.currentTimeMillis - logInfo("BEGIN READ: http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath)) - - // Receive data in an Array[Byte] - var recvByteArray = new Array[Byte](requestedFileLen) - var alreadyRead = 0 - var bytesRead = 0 + + while (hasBlocksInSplit(splitIndex) < totalBlocksInSplit(splitIndex)) { + // Set receptionSucceeded to false before trying for each block + receptionSucceeded = false + + // Request specific block + oosSource.writeObject(hasBlocksInSplit(splitIndex)) - while (alreadyRead != requestedFileLen) { - bytesRead = isSource.read(recvByteArray, alreadyRead, - requestedFileLen - alreadyRead) - if (bytesRead > 0) { - alreadyRead = alreadyRead + bytesRead - } - } + // Good to go. First, receive the length of the requested file + var requestedFileLen = oisSource.readObject.asInstanceOf[Int] + logInfo("Received requestedFileLen = " + requestedFileLen) + + val requestPath = "%d/%d/%d-%d".format(shuffleId, inputId, myId, + hasBlocksInSplit(splitIndex)) - // Make it available to the consumer - try { - receivedData.put((splitIndex, recvByteArray)) - } catch { - case e: Exception => { - logInfo("Exception during putting data into receivedData") + // Receive the file + if (requestedFileLen != -1) { + val readStartTime = System.currentTimeMillis + logInfo("BEGIN READ: http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath)) + + // Receive data in an Array[Byte] + var recvByteArray = new Array[Byte](requestedFileLen) + var alreadyRead = 0 + var bytesRead = 0 + + while (alreadyRead != requestedFileLen) { + bytesRead = isSource.read(recvByteArray, alreadyRead, + requestedFileLen - alreadyRead) + if (bytesRead > 0) { + alreadyRead = alreadyRead + bytesRead + } + } + + // Make it available to the consumer + try { + receivedData.put((splitIndex, recvByteArray)) + } catch { + case e: Exception => { + logInfo("Exception during putting data into receivedData") + } } - } - - // TODO: Updating stats before consumption is completed - hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1 - - // Split has been received only if all the blocks have been received - if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) { - hasSplitsBitVector.synchronized { - hasSplitsBitVector.set(splitIndex) + + // TODO: Updating stats before consumption is completed + hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1 + + // Split has been received only if all the blocks have been received + if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) { + hasSplitsBitVector.synchronized { + hasSplitsBitVector.set(splitIndex) + } + hasSplits += 1 } - hasSplits += 1 - } - // We have received splitIndex - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex, false) - } + // We have received splitIndex + splitsInRequestBitVector.synchronized { + splitsInRequestBitVector.set(splitIndex, false) + } - receptionSucceeded = true + // Consistent state in accounting variables + receptionSucceeded = true - logInfo("END READ: http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath)) - val readTime = System.currentTimeMillis - readStartTime - logInfo("Reading http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath) + " took " + readTime + " millis.") - } else { + logInfo("END READ: http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath)) + val readTime = System.currentTimeMillis - readStartTime + logInfo("Reading http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath) + " took " + readTime + " millis.") + } else { throw new SparkException("ShuffleServer " + hostAddress + " does not have " + requestPath) + } } } catch { // EOFException is expected to happen because sender can break @@ -573,9 +579,9 @@ object CustomBlockedLocalFileShuffle extends Logging { val (shuffleId, myIndex, outputId) = ois.readObject.asInstanceOf[(Int, Int, Int)] - var requestPath = "%d/%d/%d".format(shuffleId, myIndex, outputId) + var requestPathBase = "%d/%d/%d".format(shuffleId, myIndex, outputId) - logInfo("requestPath: " + requestPath) + logInfo("requestPathBase: " + requestPathBase) // Read BLOCKNUM file and send back the total number of blocks val blockNumFilePath = "%s/%d/%d/BLOCKNUM-%d".format(shuffleDir, @@ -586,61 +592,81 @@ object CustomBlockedLocalFileShuffle extends Logging { blockNumIn.close() oos.writeObject(BLOCKNUM) - - // Receive specific block request - val blockId = ois.readObject.asInstanceOf[Int] - - // Ready to send - requestPath = requestPath + "-" + blockId - - // Open the file - var requestedFile: File = null - var requestedFileLen = -1 - try { - requestedFile = new File(shuffleDir + "/" + requestPath) - requestedFileLen = requestedFile.length.toInt - } catch { - case e: Exception => { } - } - - // Send the length of the requestPath to let the receiver know that - // transfer is about to start - // In the case of receiver timeout and connection close, this will - // throw a java.net.SocketException: Broken pipe - oos.writeObject(requestedFileLen) - oos.flush() - - logInfo("requestedFileLen = " + requestedFileLen) - // Read and send the requested file - if (requestedFileLen != -1) { - // Read - var byteArray = new Array[Byte](requestedFileLen) - val bis = - new BufferedInputStream(new FileInputStream(requestedFile)) - - var bytesRead = bis.read(byteArray, 0, byteArray.length) - var alreadyRead = bytesRead - - while (alreadyRead < requestedFileLen) { - bytesRead = bis.read(byteArray, alreadyRead, - (byteArray.length - alreadyRead)) - if(bytesRead > 0) { - alreadyRead = alreadyRead + bytesRead - } - } - bis.close() + val startTime = System.currentTimeMillis + var curTime = startTime + var keepSending = true + var numBlocksToSend = Shuffle.MaxChatBlocks + + while (keepSending && numBlocksToSend > 0) { + // Receive specific block request + val blockId = ois.readObject.asInstanceOf[Int] - // Send - bos.write(byteArray, 0, byteArray.length) - bos.flush() - } else { - // Close the connection + // Ready to send + val requestPath = requestPathBase + "-" + blockId + + // Open the file + var requestedFile: File = null + var requestedFileLen = -1 + try { + requestedFile = new File(shuffleDir + "/" + requestPath) + requestedFileLen = requestedFile.length.toInt + } catch { + case e: Exception => { } + } + + // Send the length of the requestPath to let the receiver know that + // transfer is about to start + // In the case of receiver timeout and connection close, this will + // throw a java.net.SocketException: Broken pipe + oos.writeObject(requestedFileLen) + oos.flush() + + logInfo("requestedFileLen = " + requestedFileLen) + + // Read and send the requested file + if (requestedFileLen != -1) { + // Read + var byteArray = new Array[Byte](requestedFileLen) + val bis = + new BufferedInputStream(new FileInputStream(requestedFile)) + + var bytesRead = bis.read(byteArray, 0, byteArray.length) + var alreadyRead = bytesRead + + while (alreadyRead < requestedFileLen) { + bytesRead = bis.read(byteArray, alreadyRead, + (byteArray.length - alreadyRead)) + if(bytesRead > 0) { + alreadyRead = alreadyRead + bytesRead + } + } + bis.close() + + // Send + bos.write(byteArray, 0, byteArray.length) + bos.flush() + + // Update loop variables + numBlocksToSend = numBlocksToSend - 1 + + curTime = System.currentTimeMillis + // Revoke sending only if there is anyone waiting in the queue + if (curTime - startTime >= Shuffle.MaxChatTime && + threadPool.getQueue.size > 0) { + keepSending = false + } + } else { + // Close the connection + } } } catch { // If something went wrong, e.g., the worker at the other end died etc // then close everything up // Exception can happen if the receiver stops receiving + // EOFException is expected to happen because receiver can break + // connection as soon as it has all the blocks + case eofe: java.io.EOFException => { } case e: Exception => { logInfo("ShuffleServerThread had a " + e) } diff --git a/src/scala/spark/Shuffle.scala b/src/scala/spark/Shuffle.scala index 31ae0976658becbd03754220cef67bf71888f328..be78bf9a3c4aa858abf96056e683c135137d4640 100644 --- a/src/scala/spark/Shuffle.scala +++ b/src/scala/spark/Shuffle.scala @@ -44,6 +44,12 @@ extends Logging { private var MaxTxConnections_ = System.getProperty( "spark.shuffle.maxTxConnections", "8").toInt + // Upper limit on receiving in blocked implementations (whichever comes first) + private var MaxChatTime_ = System.getProperty( + "spark.shuffle.maxChatTime", "250").toInt + private var MaxChatBlocks_ = System.getProperty( + "spark.shuffle.maxChatBlocks", "1024").toInt + def MasterHostAddress = MasterHostAddress_ def MasterTrackerPort = MasterTrackerPort_ @@ -55,6 +61,9 @@ extends Logging { def MaxRxConnections = MaxRxConnections_ def MaxTxConnections = MaxTxConnections_ + def MaxChatTime = MaxChatTime_ + def MaxChatBlocks = MaxChatBlocks_ + // Returns a standard ThreadFactory except all threads are daemons private def newDaemonThreadFactory: ThreadFactory = { new ThreadFactory { diff --git a/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala b/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala index 156c9d4dcc0f327249a9da44bde69a3500651790..f0534bebc11ad12e996e18e8e7783288bce1aec2 100644 --- a/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala +++ b/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala @@ -458,6 +458,9 @@ extends Shuffle[K, V, C] with Logging { // Receive BLOCKNUM totalBlocksInSplit(splitIndex) = oisSource.readObject.asInstanceOf[Int] + // Turn the timer OFF, if the sender responds before timeout + timeOutTimer.cancel() + // Request specific block oosSource.writeObject(hasBlocksInSplit(splitIndex)) @@ -465,9 +468,6 @@ extends Shuffle[K, V, C] with Logging { var requestedFileLen = oisSource.readObject.asInstanceOf[Int] logInfo("Received requestedFileLen = " + requestedFileLen) - // Turn the timer OFF, if the sender responds before timeout - timeOutTimer.cancel() - // Create a temp variable to be used in different places val requestPath = "http://%s:%d/shuffle/%s-%d".format( serversplitInfo.hostAddress, serversplitInfo.listenPort, requestSplit,