diff --git a/conf/java-opts b/conf/java-opts index 3bd5c7980388b73bc87c1e372e2e7f045b7968e8..168f0f0da1daffa5d6dafd18ba98d6c39e33f1b9 100644 --- a/conf/java-opts +++ b/conf/java-opts @@ -1 +1 @@ --Dspark.shuffle.class=spark.CustomBlockedLocalFileShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=4096 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000 -Dspark.shuffle.maxChatTime=500 +-Dspark.shuffle.class=spark.CustomBlockedInMemoryShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=4096 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000 -Dspark.shuffle.maxChatTime=500 diff --git a/src/scala/spark/CustomBlockedInMemoryShuffle.scala b/src/scala/spark/CustomBlockedInMemoryShuffle.scala index 382c39758932314e18a6ea7c0b473af5fa8c4dab..9a43e442b397c6c61943fe698b722eb659c2bdab 100644 --- a/src/scala/spark/CustomBlockedInMemoryShuffle.scala +++ b/src/scala/spark/CustomBlockedInMemoryShuffle.scala @@ -309,66 +309,72 @@ extends Shuffle[K, V, C] with Logging { // Turn the timer OFF, if the sender responds before timeout timeOutTimer.cancel() - // Request specific block - oosSource.writeObject(hasBlocksInSplit(splitIndex)) - - // Good to go. First, receive the length of the requested file - var requestedFileLen = oisSource.readObject.asInstanceOf[Int] - logInfo("Received requestedFileLen = " + requestedFileLen) + while (hasBlocksInSplit(splitIndex) < totalBlocksInSplit(splitIndex)) { + // Set receptionSucceeded to false before trying for each block + receptionSucceeded = false - val requestSplit = "%d/%d/%d-%d".format(shuffleId, inputId, myId, - hasBlocksInSplit(splitIndex)) - - // Receive the file - if (requestedFileLen != -1) { - val readStartTime = System.currentTimeMillis - logInfo("BEGIN READ: http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestSplit)) - - // Receive data in an Array[Byte] - var recvByteArray = new Array[Byte](requestedFileLen) - var alreadyRead = 0 - var bytesRead = 0 + // Request specific block + oosSource.writeObject(hasBlocksInSplit(splitIndex)) - while (alreadyRead != requestedFileLen) { - bytesRead = isSource.read(recvByteArray, alreadyRead, - requestedFileLen - alreadyRead) - if (bytesRead > 0) { - alreadyRead = alreadyRead + bytesRead - } - } + // Good to go. First, receive the length of the requested file + var requestedFileLen = oisSource.readObject.asInstanceOf[Int] + logInfo("Received requestedFileLen = " + requestedFileLen) + + val requestSplit = "%d/%d/%d-%d".format(shuffleId, inputId, myId, + hasBlocksInSplit(splitIndex)) - // Make it available to the consumer - try { - receivedData.put((splitIndex, recvByteArray)) - } catch { - case e: Exception => { - logInfo("Exception during putting data into receivedData") + // Receive the file + if (requestedFileLen != -1) { + val readStartTime = System.currentTimeMillis + logInfo("BEGIN READ: http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestSplit)) + + // Receive data in an Array[Byte] + var recvByteArray = new Array[Byte](requestedFileLen) + var alreadyRead = 0 + var bytesRead = 0 + + while (alreadyRead != requestedFileLen) { + bytesRead = isSource.read(recvByteArray, alreadyRead, + requestedFileLen - alreadyRead) + if (bytesRead > 0) { + alreadyRead = alreadyRead + bytesRead + } + } + + // Make it available to the consumer + try { + receivedData.put((splitIndex, recvByteArray)) + } catch { + case e: Exception => { + logInfo("Exception during putting data into receivedData") + } } - } - - // TODO: Updating stats before consumption is completed - hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1 - - // Split has been received only if all the blocks have been received - if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) { - hasSplitsBitVector.synchronized { - hasSplitsBitVector.set(splitIndex) + + // TODO: Updating stats before consumption is completed + hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1 + + // Split has been received only if all the blocks have been received + if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) { + hasSplitsBitVector.synchronized { + hasSplitsBitVector.set(splitIndex) + } + hasSplits += 1 } - hasSplits += 1 - } - // We have received splitIndex - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex, false) - } + // We have received splitIndex + splitsInRequestBitVector.synchronized { + splitsInRequestBitVector.set(splitIndex, false) + } - receptionSucceeded = true + // Consistent state in accounting variables + receptionSucceeded = true - logInfo("END READ: http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestSplit)) - val readTime = System.currentTimeMillis - readStartTime - logInfo("Reading http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestSplit) + " took " + readTime + " millis.") - } else { + logInfo("END READ: http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestSplit)) + val readTime = System.currentTimeMillis - readStartTime + logInfo("Reading http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestSplit) + " took " + readTime + " millis.") + } else { throw new SparkException("ShuffleServer " + hostAddress + " does not have " + requestSplit) + } } } catch { // EOFException is expected to happen because sender can break @@ -561,9 +567,9 @@ object CustomBlockedInMemoryShuffle extends Logging { val (shuffleId, myIndex, outputId) = ois.readObject.asInstanceOf[(Int, Int, Int)] - var requestedSplit = - "%s/%d/%d/%d".format(shuffleDir, shuffleId, myIndex, outputId) - logInfo("requestedSplit: " + requestedSplit) + var requestedSplitBase = "%s/%d/%d/%d".format( + shuffleDir, shuffleId, myIndex, outputId) + logInfo("requestedSplit: " + requestedSplitBase) // Read BLOCKNUM and send back the total number of blocks val blockNumName = "%s/%d/%d/BLOCKNUM-%d".format(shuffleDir, @@ -576,43 +582,63 @@ object CustomBlockedInMemoryShuffle extends Logging { oos.writeObject(BLOCKNUM) - // Receive specific block request - val blockId = ois.readObject.asInstanceOf[Int] - - // Ready to send - requestedSplit = requestedSplit + "-" + blockId + val startTime = System.currentTimeMillis + var curTime = startTime + var keepSending = true + var numBlocksToSend = Shuffle.MaxChatBlocks - // Send the length of the requestedSplit to let the receiver know that - // transfer is about to start - // In the case of receiver timeout and connection close, this will - // throw a java.net.SocketException: Broken pipe - var requestedSplitLen = -1 - - try { - requestedSplitLen = - CustomBlockedInMemoryShuffle.splitsCache(requestedSplit).length - } catch { - case e: Exception => { } - } + while (keepSending && numBlocksToSend > 0) { + // Receive specific block request + val blockId = ois.readObject.asInstanceOf[Int] + + // Ready to send + var requestedSplit = requestedSplitBase + "-" + blockId + + // Send the length of the requestedSplit to let the receiver know that + // transfer is about to start + // In the case of receiver timeout and connection close, this will + // throw a java.net.SocketException: Broken pipe + var requestedSplitLen = -1 + + try { + requestedSplitLen = + CustomBlockedInMemoryShuffle.splitsCache(requestedSplit).length + } catch { + case e: Exception => { } + } - oos.writeObject(requestedSplitLen) - oos.flush() - - logInfo("requestedSplitLen = " + requestedSplitLen) - - // Read and send the requested file - if (requestedSplitLen != -1) { - // Send - bos.write(CustomBlockedInMemoryShuffle.splitsCache(requestedSplit), - 0, requestedSplitLen) - bos.flush() - } else { - // Close the connection + oos.writeObject(requestedSplitLen) + oos.flush() + + logInfo("requestedSplitLen = " + requestedSplitLen) + + // Read and send the requested file + if (requestedSplitLen != -1) { + // Send + bos.write(CustomBlockedInMemoryShuffle.splitsCache(requestedSplit), + 0, requestedSplitLen) + bos.flush() + + // Update loop variables + numBlocksToSend = numBlocksToSend - 1 + + curTime = System.currentTimeMillis + // Revoke sending only if there is anyone waiting in the queue + if (curTime - startTime >= Shuffle.MaxChatTime && + threadPool.getQueue.size > 0) { + keepSending = false + } + } else { + // Close the connection + } } } catch { // If something went wrong, e.g., the worker at the other end died etc // then close everything up // Exception can happen if the receiver stops receiving + // EOFException is expected to happen because receiver can break + // connection as soon as it has all the blocks + case eofe: java.io.EOFException => { } case e: Exception => { logInfo("ShuffleServerThread had a " + e) }