Skip to content
Snippets Groups Projects
Commit bb0178d1 authored by Mosharaf Chowdhury's avatar Mosharaf Chowdhury
Browse files

- Receiving retry now starts from where the last try left off, not from the very

  beginning.
- Some refactoring.
parent ee6c524f
No related branches found
No related tags found
No related merge requests found
...@@ -136,10 +136,11 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) ...@@ -136,10 +136,11 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
val start = System.nanoTime val start = System.nanoTime
val retByteArray = receiveBroadcast (uuid) val receptionSucceeded = receiveBroadcast (uuid)
// If does not succeed, then get from HDFS copy // If does not succeed, then get from HDFS copy
if (retByteArray != null) { if (receptionSucceeded) {
value_ = byteArrayToObject[T] (retByteArray) // value_ = byteArrayToObject[T] (retByteArray)
value_ = unBlockifyObject[T]
BroadcastCS.values.put (uuid, value_) BroadcastCS.values.put (uuid, value_)
} else { } else {
val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid)) val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid))
...@@ -199,6 +200,15 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) ...@@ -199,6 +200,15 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
return variableInfo return variableInfo
} }
private def unBlockifyObject[A]: A = {
var retByteArray = new Array[Byte] (totalBytes)
for (i <- 0 until totalBlocks) {
System.arraycopy (arrayOfBlocks(i).byteArray, 0, retByteArray,
i * BroadcastCS.blockSize, arrayOfBlocks(i).byteArray.length)
}
byteArrayToObject (retByteArray)
}
private def byteArrayToObject[A] (bytes: Array[Byte]): A = { private def byteArrayToObject[A] (bytes: Array[Byte]): A = {
val in = new ObjectInputStream (new ByteArrayInputStream (bytes)) val in = new ObjectInputStream (new ByteArrayInputStream (bytes))
val retVal = in.readObject.asInstanceOf[A] val retVal = in.readObject.asInstanceOf[A]
...@@ -215,15 +225,15 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) ...@@ -215,15 +225,15 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
return bOut return bOut
} }
def receiveBroadcast (variableUUID: UUID): Array[Byte] = { // masterListenPort aka guidePort value legend
// 0 = missed the broadcast, read from HDFS;
// <0 = hasn't started yet, wait & retry;
// >0 = Read from this port
def getMasterListenPort (variableUUID: UUID): Int = {
var clientSocketToTracker: Socket = null var clientSocketToTracker: Socket = null
var oisTracker: ObjectInputStream = null var oisTracker: ObjectInputStream = null
var oosTracker: ObjectOutputStream = null var oosTracker: ObjectOutputStream = null
// masterListenPort aka guidePort value legend
// 0 = missed the broadcast, read from HDFS;
// <0 = hasn't started yet, wait & retry;
// >0 = Read from this port
var masterListenPort: Int = -1 var masterListenPort: Int = -1
var retriesLeft = BroadcastCS.maxRetryCount var retriesLeft = BroadcastCS.maxRetryCount
...@@ -248,13 +258,18 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) ...@@ -248,13 +258,18 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
if (oosTracker != null) { oosTracker.close } if (oosTracker != null) { oosTracker.close }
if (clientSocketToTracker != null) { clientSocketToTracker.close } if (clientSocketToTracker != null) { clientSocketToTracker.close }
} }
retriesLeft -= 1 retriesLeft -= 1
// TODO: Should wait before retrying
} while (retriesLeft > 0 && masterListenPort < 0) } while (retriesLeft > 0 && masterListenPort < 0)
// println (System.currentTimeMillis + ": " + "Got this guidePort from Tracker: " + masterListenPort) // println (System.currentTimeMillis + ": " + "Got this guidePort from Tracker: " + masterListenPort)
return masterListenPort
}
def receiveBroadcast (variableUUID: UUID): Boolean = {
// Get masterListenPort for this variable from the Tracker
val masterListenPort = getMasterListenPort (variableUUID)
// If Tracker says that there is no guide for this object, read from HDFS // If Tracker says that there is no guide for this object, read from HDFS
if (masterListenPort == 0) { return null } if (masterListenPort == 0) { return false }
// Wait until hostAddress and listenPort are created by the // Wait until hostAddress and listenPort are created by the
// ServeMultipleRequests thread // ServeMultipleRequests thread
...@@ -266,8 +281,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) ...@@ -266,8 +281,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// Connect and receive broadcast from the specified source, retrying the // Connect and receive broadcast from the specified source, retrying the
// specified number of times in case of failures // specified number of times in case of failures
retriesLeft = BroadcastCS.maxRetryCount var retriesLeft = BroadcastCS.maxRetryCount
var retByteArray: Array[Byte] = null
do { do {
// Connect to Master and send this worker's Information // Connect to Master and send this worker's Information
val clientSocketToMaster = val clientSocketToMaster =
...@@ -294,13 +308,13 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) ...@@ -294,13 +308,13 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// println (System.currentTimeMillis + ": " + "Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort) // println (System.currentTimeMillis + ": " + "Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort)
val start = System.nanoTime val start = System.nanoTime
retByteArray = receiveSingleTransmission (sourceInfo) val receptionSucceeded = receiveSingleTransmission (sourceInfo)
val time = (System.nanoTime - start) / 1e9 val time = (System.nanoTime - start) / 1e9
// println (System.currentTimeMillis + ": " + "I got this from receiveSingleTransmission: " + retByteArray) // println (System.currentTimeMillis + ": " + "I got this from receiveSingleTransmission: " + retByteArray)
// Updating some statistics in sourceInfo. Master will be using them later // Updating some statistics in sourceInfo. Master will be using them later
if (retByteArray == null) { sourceInfo.receptionFailed = true } if (!receptionSucceeded) { sourceInfo.receptionFailed = true }
sourceInfo.MBps = (sourceInfo.totalBytes.toDouble / 1048576) / time sourceInfo.MBps = (sourceInfo.totalBytes.toDouble / 1048576) / time
// Send back statistics to the Master // Send back statistics to the Master
...@@ -308,23 +322,22 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) ...@@ -308,23 +322,22 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
oisMaster.close oisMaster.close
oosMaster.close oosMaster.close
clientSocketToMaster.close clientSocketToMaster.close
retriesLeft -= 1 retriesLeft -= 1
} while (retriesLeft > 0 && retByteArray == null) } while (retriesLeft > 0 && hasBlocks != totalBlocks)
return retByteArray return (hasBlocks == totalBlocks)
} }
// Tries to receive broadcast from the source and returns Boolean status. // Tries to receive broadcast from the source and returns Boolean status.
// This might be called multiple times to retry a defined number of times. // This might be called multiple times to retry a defined number of times.
private def receiveSingleTransmission(sourceInfo: SourceInfo): Array[Byte] = { private def receiveSingleTransmission(sourceInfo: SourceInfo): Boolean = {
var clientSocketToSource: Socket = null var clientSocketToSource: Socket = null
var oisSource: ObjectInputStream = null var oisSource: ObjectInputStream = null
var oosSource: ObjectOutputStream = null var oosSource: ObjectOutputStream = null
var retByteArray:Array[Byte] = null var receptionSucceeded = false
try { try {
// Connect to the source to get the object itself // Connect to the source to get the object itself
clientSocketToSource = clientSocketToSource =
...@@ -338,25 +351,22 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) ...@@ -338,25 +351,22 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// println (System.currentTimeMillis + ": " + "totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks) // println (System.currentTimeMillis + ": " + "totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks)
// Send the range // Send the range
oosSource.writeObject((0, totalBlocks)) oosSource.writeObject((hasBlocks, totalBlocks))
retByteArray = new Array[Byte] (totalBytes) for (i <- hasBlocks until totalBlocks) {
for (i <- 0 until totalBlocks) {
val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock] val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
System.arraycopy (bcBlock.byteArray, 0, retByteArray,
i * BroadcastCS.blockSize, bcBlock.byteArray.length)
arrayOfBlocks(hasBlocks) = bcBlock arrayOfBlocks(hasBlocks) = bcBlock
hasBlocks += 1 hasBlocks += 1
// Set to true if at least one block is received
receptionSucceeded = true
hasBlocksLock.synchronized { hasBlocksLock.synchronized {
hasBlocksLock.notifyAll hasBlocksLock.notifyAll
} }
// println (System.currentTimeMillis + ": " + "Received block: " + i + " " + bcBlock) // println (System.currentTimeMillis + ": " + "Received block: " + i + " " + bcBlock)
} }
assert (hasBlocks == totalBlocks)
// println (System.currentTimeMillis + ": " + "After the receive loop") // println (System.currentTimeMillis + ": " + "After the receive loop")
} catch { } catch {
case e: Exception => { case e: Exception => {
retByteArray = null
// println (System.currentTimeMillis + ": " + "receiveSingleTransmission had a " + e) // println (System.currentTimeMillis + ": " + "receiveSingleTransmission had a " + e)
} }
} finally { } finally {
...@@ -365,7 +375,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) ...@@ -365,7 +375,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
if (clientSocketToSource != null) { clientSocketToSource.close } if (clientSocketToSource != null) { clientSocketToSource.close }
} }
return retByteArray return receptionSucceeded
} }
class GuideMultipleRequests extends Thread { class GuideMultipleRequests extends Thread {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment