Skip to content
Snippets Groups Projects
Commit 1e26fb39 authored by Mosharaf Chowdhury's avatar Mosharaf Chowdhury
Browse files

CustomBlockedLocalFileShuffle: reducers are reading multiple blocks per...

CustomBlockedLocalFileShuffle: reducers are reading multiple blocks per connections instead of just one.
Sometimes ShuffleServer fails to start for small shuffle data with small block size in local VM. No problem with large block size.
parent fb51df0b
No related branches found
No related tags found
No related merge requests found
-Dspark.shuffle.class=spark.CustomBlockedLocalFileShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=4096 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000 -Dspark.shuffle.class=spark.CustomBlockedLocalFileShuffle -Dspark.shuffle.masterHostAddress=127.0.0.1 -Dspark.shuffle.masterTrackerPort=22222 -Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy -Dspark.shuffle.maxRxConnections=2 -Dspark.shuffle.maxTxConnections=2 -Dspark.shuffle.blockSize=256 -Dspark.shuffle.minKnockInterval=100 -Dspark.shuffle.maxKnockInterval=2000 -Dspark.shuffle.maxChatTime=50
...@@ -298,6 +298,9 @@ extends Shuffle[K, V, C] with Logging { ...@@ -298,6 +298,9 @@ extends Shuffle[K, V, C] with Logging {
// Receive BLOCKNUM // Receive BLOCKNUM
totalBlocksInSplit(splitIndex) = oisSource.readObject.asInstanceOf[Int] totalBlocksInSplit(splitIndex) = oisSource.readObject.asInstanceOf[Int]
// Turn the timer OFF, if the sender responds before timeout
timeOutTimer.cancel()
// Request specific block // Request specific block
oosSource.writeObject(hasBlocksInSplit(splitIndex)) oosSource.writeObject(hasBlocksInSplit(splitIndex))
...@@ -305,9 +308,6 @@ extends Shuffle[K, V, C] with Logging { ...@@ -305,9 +308,6 @@ extends Shuffle[K, V, C] with Logging {
var requestedFileLen = oisSource.readObject.asInstanceOf[Int] var requestedFileLen = oisSource.readObject.asInstanceOf[Int]
logInfo("Received requestedFileLen = " + requestedFileLen) logInfo("Received requestedFileLen = " + requestedFileLen)
// Turn the timer OFF, if the sender responds before timeout
timeOutTimer.cancel()
val requestSplit = "%d/%d/%d-%d".format(shuffleId, inputId, myId, val requestSplit = "%d/%d/%d-%d".format(shuffleId, inputId, myId,
hasBlocksInSplit(splitIndex)) hasBlocksInSplit(splitIndex))
......
...@@ -285,69 +285,75 @@ extends Shuffle[K, V, C] with Logging { ...@@ -285,69 +285,75 @@ extends Shuffle[K, V, C] with Logging {
// Receive BLOCKNUM // Receive BLOCKNUM
totalBlocksInSplit(splitIndex) = oisSource.readObject.asInstanceOf[Int] totalBlocksInSplit(splitIndex) = oisSource.readObject.asInstanceOf[Int]
// Request specific block
oosSource.writeObject(hasBlocksInSplit(splitIndex))
// Good to go. First, receive the length of the requested file
var requestedFileLen = oisSource.readObject.asInstanceOf[Int]
logInfo("Received requestedFileLen = " + requestedFileLen)
// Turn the timer OFF, if the sender responds before timeout // Turn the timer OFF, if the sender responds before timeout
timeOutTimer.cancel() timeOutTimer.cancel()
val requestPath = "%d/%d/%d-%d".format(shuffleId, inputId, myId, while (hasBlocksInSplit(splitIndex) < totalBlocksInSplit(splitIndex)) {
hasBlocksInSplit(splitIndex)) // Set receptionSucceeded to false before trying for each block
receptionSucceeded = false
// Receive the file
if (requestedFileLen != -1) { // Request specific block
val readStartTime = System.currentTimeMillis oosSource.writeObject(hasBlocksInSplit(splitIndex))
logInfo("BEGIN READ: http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath))
// Receive data in an Array[Byte]
var recvByteArray = new Array[Byte](requestedFileLen)
var alreadyRead = 0
var bytesRead = 0
while (alreadyRead != requestedFileLen) { // Good to go. First, receive the length of the requested file
bytesRead = isSource.read(recvByteArray, alreadyRead, var requestedFileLen = oisSource.readObject.asInstanceOf[Int]
requestedFileLen - alreadyRead) logInfo("Received requestedFileLen = " + requestedFileLen)
if (bytesRead > 0) {
alreadyRead = alreadyRead + bytesRead val requestPath = "%d/%d/%d-%d".format(shuffleId, inputId, myId,
} hasBlocksInSplit(splitIndex))
}
// Make it available to the consumer // Receive the file
try { if (requestedFileLen != -1) {
receivedData.put((splitIndex, recvByteArray)) val readStartTime = System.currentTimeMillis
} catch { logInfo("BEGIN READ: http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath))
case e: Exception => {
logInfo("Exception during putting data into receivedData") // Receive data in an Array[Byte]
var recvByteArray = new Array[Byte](requestedFileLen)
var alreadyRead = 0
var bytesRead = 0
while (alreadyRead != requestedFileLen) {
bytesRead = isSource.read(recvByteArray, alreadyRead,
requestedFileLen - alreadyRead)
if (bytesRead > 0) {
alreadyRead = alreadyRead + bytesRead
}
}
// Make it available to the consumer
try {
receivedData.put((splitIndex, recvByteArray))
} catch {
case e: Exception => {
logInfo("Exception during putting data into receivedData")
}
} }
}
// TODO: Updating stats before consumption is completed
// TODO: Updating stats before consumption is completed hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1
hasBlocksInSplit(splitIndex) = hasBlocksInSplit(splitIndex) + 1
// Split has been received only if all the blocks have been received
// Split has been received only if all the blocks have been received if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) {
if (hasBlocksInSplit(splitIndex) == totalBlocksInSplit(splitIndex)) { hasSplitsBitVector.synchronized {
hasSplitsBitVector.synchronized { hasSplitsBitVector.set(splitIndex)
hasSplitsBitVector.set(splitIndex) }
hasSplits += 1
} }
hasSplits += 1
}
// We have received splitIndex // We have received splitIndex
splitsInRequestBitVector.synchronized { splitsInRequestBitVector.synchronized {
splitsInRequestBitVector.set(splitIndex, false) splitsInRequestBitVector.set(splitIndex, false)
} }
receptionSucceeded = true // Consistent state in accounting variables
receptionSucceeded = true
logInfo("END READ: http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath)) logInfo("END READ: http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath))
val readTime = System.currentTimeMillis - readStartTime val readTime = System.currentTimeMillis - readStartTime
logInfo("Reading http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath) + " took " + readTime + " millis.") logInfo("Reading http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath) + " took " + readTime + " millis.")
} else { } else {
throw new SparkException("ShuffleServer " + hostAddress + " does not have " + requestPath) throw new SparkException("ShuffleServer " + hostAddress + " does not have " + requestPath)
}
} }
} catch { } catch {
// EOFException is expected to happen because sender can break // EOFException is expected to happen because sender can break
...@@ -573,9 +579,9 @@ object CustomBlockedLocalFileShuffle extends Logging { ...@@ -573,9 +579,9 @@ object CustomBlockedLocalFileShuffle extends Logging {
val (shuffleId, myIndex, outputId) = val (shuffleId, myIndex, outputId) =
ois.readObject.asInstanceOf[(Int, Int, Int)] ois.readObject.asInstanceOf[(Int, Int, Int)]
var requestPath = "%d/%d/%d".format(shuffleId, myIndex, outputId) var requestPathBase = "%d/%d/%d".format(shuffleId, myIndex, outputId)
logInfo("requestPath: " + requestPath) logInfo("requestPathBase: " + requestPathBase)
// Read BLOCKNUM file and send back the total number of blocks // Read BLOCKNUM file and send back the total number of blocks
val blockNumFilePath = "%s/%d/%d/BLOCKNUM-%d".format(shuffleDir, val blockNumFilePath = "%s/%d/%d/BLOCKNUM-%d".format(shuffleDir,
...@@ -586,61 +592,81 @@ object CustomBlockedLocalFileShuffle extends Logging { ...@@ -586,61 +592,81 @@ object CustomBlockedLocalFileShuffle extends Logging {
blockNumIn.close() blockNumIn.close()
oos.writeObject(BLOCKNUM) oos.writeObject(BLOCKNUM)
// Receive specific block request
val blockId = ois.readObject.asInstanceOf[Int]
// Ready to send
requestPath = requestPath + "-" + blockId
// Open the file
var requestedFile: File = null
var requestedFileLen = -1
try {
requestedFile = new File(shuffleDir + "/" + requestPath)
requestedFileLen = requestedFile.length.toInt
} catch {
case e: Exception => { }
}
// Send the length of the requestPath to let the receiver know that
// transfer is about to start
// In the case of receiver timeout and connection close, this will
// throw a java.net.SocketException: Broken pipe
oos.writeObject(requestedFileLen)
oos.flush()
logInfo("requestedFileLen = " + requestedFileLen)
// Read and send the requested file val startTime = System.currentTimeMillis
if (requestedFileLen != -1) { var curTime = startTime
// Read var keepSending = true
var byteArray = new Array[Byte](requestedFileLen) var numBlocksToSend = Shuffle.MaxChatBlocks
val bis =
new BufferedInputStream(new FileInputStream(requestedFile)) while (keepSending && numBlocksToSend > 0) {
// Receive specific block request
var bytesRead = bis.read(byteArray, 0, byteArray.length) val blockId = ois.readObject.asInstanceOf[Int]
var alreadyRead = bytesRead
while (alreadyRead < requestedFileLen) {
bytesRead = bis.read(byteArray, alreadyRead,
(byteArray.length - alreadyRead))
if(bytesRead > 0) {
alreadyRead = alreadyRead + bytesRead
}
}
bis.close()
// Send // Ready to send
bos.write(byteArray, 0, byteArray.length) val requestPath = requestPathBase + "-" + blockId
bos.flush()
} else { // Open the file
// Close the connection var requestedFile: File = null
var requestedFileLen = -1
try {
requestedFile = new File(shuffleDir + "/" + requestPath)
requestedFileLen = requestedFile.length.toInt
} catch {
case e: Exception => { }
}
// Send the length of the requestPath to let the receiver know that
// transfer is about to start
// In the case of receiver timeout and connection close, this will
// throw a java.net.SocketException: Broken pipe
oos.writeObject(requestedFileLen)
oos.flush()
logInfo("requestedFileLen = " + requestedFileLen)
// Read and send the requested file
if (requestedFileLen != -1) {
// Read
var byteArray = new Array[Byte](requestedFileLen)
val bis =
new BufferedInputStream(new FileInputStream(requestedFile))
var bytesRead = bis.read(byteArray, 0, byteArray.length)
var alreadyRead = bytesRead
while (alreadyRead < requestedFileLen) {
bytesRead = bis.read(byteArray, alreadyRead,
(byteArray.length - alreadyRead))
if(bytesRead > 0) {
alreadyRead = alreadyRead + bytesRead
}
}
bis.close()
// Send
bos.write(byteArray, 0, byteArray.length)
bos.flush()
// Update loop variables
numBlocksToSend = numBlocksToSend - 1
curTime = System.currentTimeMillis
// Revoke sending only if there is anyone waiting in the queue
if (curTime - startTime >= Shuffle.MaxChatTime &&
threadPool.getQueue.size > 0) {
keepSending = false
}
} else {
// Close the connection
}
} }
} catch { } catch {
// If something went wrong, e.g., the worker at the other end died etc // If something went wrong, e.g., the worker at the other end died etc
// then close everything up // then close everything up
// Exception can happen if the receiver stops receiving // Exception can happen if the receiver stops receiving
// EOFException is expected to happen because receiver can break
// connection as soon as it has all the blocks
case eofe: java.io.EOFException => { }
case e: Exception => { case e: Exception => {
logInfo("ShuffleServerThread had a " + e) logInfo("ShuffleServerThread had a " + e)
} }
......
...@@ -44,6 +44,12 @@ extends Logging { ...@@ -44,6 +44,12 @@ extends Logging {
private var MaxTxConnections_ = System.getProperty( private var MaxTxConnections_ = System.getProperty(
"spark.shuffle.maxTxConnections", "8").toInt "spark.shuffle.maxTxConnections", "8").toInt
// Upper limit on receiving in blocked implementations (whichever comes first)
private var MaxChatTime_ = System.getProperty(
"spark.shuffle.maxChatTime", "250").toInt
private var MaxChatBlocks_ = System.getProperty(
"spark.shuffle.maxChatBlocks", "1024").toInt
def MasterHostAddress = MasterHostAddress_ def MasterHostAddress = MasterHostAddress_
def MasterTrackerPort = MasterTrackerPort_ def MasterTrackerPort = MasterTrackerPort_
...@@ -55,6 +61,9 @@ extends Logging { ...@@ -55,6 +61,9 @@ extends Logging {
def MaxRxConnections = MaxRxConnections_ def MaxRxConnections = MaxRxConnections_
def MaxTxConnections = MaxTxConnections_ def MaxTxConnections = MaxTxConnections_
def MaxChatTime = MaxChatTime_
def MaxChatBlocks = MaxChatBlocks_
// Returns a standard ThreadFactory except all threads are daemons // Returns a standard ThreadFactory except all threads are daemons
private def newDaemonThreadFactory: ThreadFactory = { private def newDaemonThreadFactory: ThreadFactory = {
new ThreadFactory { new ThreadFactory {
......
...@@ -458,6 +458,9 @@ extends Shuffle[K, V, C] with Logging { ...@@ -458,6 +458,9 @@ extends Shuffle[K, V, C] with Logging {
// Receive BLOCKNUM // Receive BLOCKNUM
totalBlocksInSplit(splitIndex) = oisSource.readObject.asInstanceOf[Int] totalBlocksInSplit(splitIndex) = oisSource.readObject.asInstanceOf[Int]
// Turn the timer OFF, if the sender responds before timeout
timeOutTimer.cancel()
// Request specific block // Request specific block
oosSource.writeObject(hasBlocksInSplit(splitIndex)) oosSource.writeObject(hasBlocksInSplit(splitIndex))
...@@ -465,9 +468,6 @@ extends Shuffle[K, V, C] with Logging { ...@@ -465,9 +468,6 @@ extends Shuffle[K, V, C] with Logging {
var requestedFileLen = oisSource.readObject.asInstanceOf[Int] var requestedFileLen = oisSource.readObject.asInstanceOf[Int]
logInfo("Received requestedFileLen = " + requestedFileLen) logInfo("Received requestedFileLen = " + requestedFileLen)
// Turn the timer OFF, if the sender responds before timeout
timeOutTimer.cancel()
// Create a temp variable to be used in different places // Create a temp variable to be used in different places
val requestPath = "http://%s:%d/shuffle/%s-%d".format( val requestPath = "http://%s:%d/shuffle/%s-%d".format(
serversplitInfo.hostAddress, serversplitInfo.listenPort, requestSplit, serversplitInfo.hostAddress, serversplitInfo.listenPort, requestSplit,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment