diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index fb1a877a714d4078a82967864fc013d3cc9a429b..40353decdea59e04189dd43cf5b040d80913a8c7 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -136,10 +136,11 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) val start = System.nanoTime - val retByteArray = receiveBroadcast (uuid) + val receptionSucceeded = receiveBroadcast (uuid) // If does not succeed, then get from HDFS copy - if (retByteArray != null) { - value_ = byteArrayToObject[T] (retByteArray) + if (receptionSucceeded) { + // value_ = byteArrayToObject[T] (retByteArray) + value_ = unBlockifyObject[T] BroadcastCS.values.put (uuid, value_) } else { val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid)) @@ -199,6 +200,15 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) return variableInfo } + private def unBlockifyObject[A]: A = { + var retByteArray = new Array[Byte] (totalBytes) + for (i <- 0 until totalBlocks) { + System.arraycopy (arrayOfBlocks(i).byteArray, 0, retByteArray, + i * BroadcastCS.blockSize, arrayOfBlocks(i).byteArray.length) + } + byteArrayToObject (retByteArray) + } + private def byteArrayToObject[A] (bytes: Array[Byte]): A = { val in = new ObjectInputStream (new ByteArrayInputStream (bytes)) val retVal = in.readObject.asInstanceOf[A] @@ -215,15 +225,15 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) return bOut } - def receiveBroadcast (variableUUID: UUID): Array[Byte] = { + // masterListenPort aka guidePort value legend + // 0 = missed the broadcast, read from HDFS; + // <0 = hasn't started yet, wait & retry; + // >0 = Read from this port + def getMasterListenPort (variableUUID: UUID): Int = { var clientSocketToTracker: Socket = null var oisTracker: ObjectInputStream = null var oosTracker: ObjectOutputStream = null - // masterListenPort aka guidePort value legend - // 0 = missed the broadcast, read from HDFS; - // <0 = hasn't started yet, wait & retry; - // >0 = Read from this port var masterListenPort: Int = -1 var retriesLeft = BroadcastCS.maxRetryCount @@ -248,13 +258,18 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) if (oosTracker != null) { oosTracker.close } if (clientSocketToTracker != null) { clientSocketToTracker.close } } - retriesLeft -= 1 + // TODO: Should wait before retrying } while (retriesLeft > 0 && masterListenPort < 0) // println (System.currentTimeMillis + ": " + "Got this guidePort from Tracker: " + masterListenPort) - + return masterListenPort + } + + def receiveBroadcast (variableUUID: UUID): Boolean = { + // Get masterListenPort for this variable from the Tracker + val masterListenPort = getMasterListenPort (variableUUID) // If Tracker says that there is no guide for this object, read from HDFS - if (masterListenPort == 0) { return null } + if (masterListenPort == 0) { return false } // Wait until hostAddress and listenPort are created by the // ServeMultipleRequests thread @@ -266,8 +281,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) // Connect and receive broadcast from the specified source, retrying the // specified number of times in case of failures - retriesLeft = BroadcastCS.maxRetryCount - var retByteArray: Array[Byte] = null + var retriesLeft = BroadcastCS.maxRetryCount do { // Connect to Master and send this worker's Information val clientSocketToMaster = @@ -294,13 +308,13 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) // println (System.currentTimeMillis + ": " + "Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort) val start = System.nanoTime - retByteArray = receiveSingleTransmission (sourceInfo) + val receptionSucceeded = receiveSingleTransmission (sourceInfo) val time = (System.nanoTime - start) / 1e9 // println (System.currentTimeMillis + ": " + "I got this from receiveSingleTransmission: " + retByteArray) // Updating some statistics in sourceInfo. Master will be using them later - if (retByteArray == null) { sourceInfo.receptionFailed = true } + if (!receptionSucceeded) { sourceInfo.receptionFailed = true } sourceInfo.MBps = (sourceInfo.totalBytes.toDouble / 1048576) / time // Send back statistics to the Master @@ -308,23 +322,22 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) oisMaster.close oosMaster.close - clientSocketToMaster.close + clientSocketToMaster.close retriesLeft -= 1 - } while (retriesLeft > 0 && retByteArray == null) + } while (retriesLeft > 0 && hasBlocks != totalBlocks) - return retByteArray + return (hasBlocks == totalBlocks) } // Tries to receive broadcast from the source and returns Boolean status. // This might be called multiple times to retry a defined number of times. - private def receiveSingleTransmission(sourceInfo: SourceInfo): Array[Byte] = { + private def receiveSingleTransmission(sourceInfo: SourceInfo): Boolean = { var clientSocketToSource: Socket = null var oisSource: ObjectInputStream = null var oosSource: ObjectOutputStream = null - var retByteArray:Array[Byte] = null - + var receptionSucceeded = false try { // Connect to the source to get the object itself clientSocketToSource = @@ -338,25 +351,22 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) // println (System.currentTimeMillis + ": " + "totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks) // Send the range - oosSource.writeObject((0, totalBlocks)) + oosSource.writeObject((hasBlocks, totalBlocks)) - retByteArray = new Array[Byte] (totalBytes) - for (i <- 0 until totalBlocks) { + for (i <- hasBlocks until totalBlocks) { val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock] - System.arraycopy (bcBlock.byteArray, 0, retByteArray, - i * BroadcastCS.blockSize, bcBlock.byteArray.length) arrayOfBlocks(hasBlocks) = bcBlock hasBlocks += 1 + // Set to true if at least one block is received + receptionSucceeded = true hasBlocksLock.synchronized { hasBlocksLock.notifyAll } // println (System.currentTimeMillis + ": " + "Received block: " + i + " " + bcBlock) } - assert (hasBlocks == totalBlocks) // println (System.currentTimeMillis + ": " + "After the receive loop") } catch { case e: Exception => { - retByteArray = null // println (System.currentTimeMillis + ": " + "receiveSingleTransmission had a " + e) } } finally { @@ -365,7 +375,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) if (clientSocketToSource != null) { clientSocketToSource.close } } - return retByteArray + return receptionSucceeded } class GuideMultipleRequests extends Thread {