Skip to content
Snippets Groups Projects
Commit e0db4e04 authored by Mosharaf Chowdhury's avatar Mosharaf Chowdhury
Browse files

- HDFS storing is in separate thread.

- Receivers now ask for a range instead of expecting the whole variable. But,
  they are still asking for the whole range from a single source.
- Next step: make receivers ask for different parts from different sources.
  Also, make sure that Master sends back a list of sources instead of a single
  one.
parent 10cf3828
No related branches found
No related tags found
No related merge requests found
......@@ -4,7 +4,7 @@
FWDIR=`dirname $0`
# Set JAVA_OPTS to be able to load libnexus.so and set various other misc options
export JAVA_OPTS="-Djava.library.path=$FWDIR/third_party:$FWDIR/src/native -Xmx2000m -Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=1024 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.serverSocketTimout=50000 -Dspark.broadcast.dualMode=false"
export JAVA_OPTS="-Djava.library.path=$FWDIR/third_party:$FWDIR/src/native -Xmx2000m -Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=4096 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.serverSocketTimout=50000 -Dspark.broadcast.dualMode=false"
if [ -e $FWDIR/conf/java-opts ] ; then
JAVA_OPTS+=" `cat $FWDIR/conf/java-opts`"
......
......@@ -3,7 +3,7 @@ import spark.SparkContext
object BroadcastTest {
def main(args: Array[String]) {
if (args.length == 0) {
System.err.println("Usage: BroadcastTest <host> [<slices>]")
System.err.println("Usage: BroadcastTest <host> [<slices>] [<array-len>]")
System.exit(1)
}
val spark = new SparkContext(args(0), "Broadcast Test")
......
......@@ -52,6 +52,8 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
@transient var hostAddress = InetAddress.getLocalHost.getHostAddress
@transient var listenPort = -1
@transient var guidePort = -1
@transient var hasCopyInHDFS = false
if (!local) { sendBroadcast }
......@@ -99,16 +101,17 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
guidePortLock.wait
}
}
BroadcastCS.synchronized {
BroadcastCS.registerValue (uuid, guidePort)
}
BroadcastCS.registerValue (uuid, guidePort)
// TODO: Make it a separate thread?
// Now store a persistent copy in HDFS, just in case
val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid))
out.writeObject (value_)
out.close
// Now store a persistent copy in HDFS, in a separate thread
new Runnable {
override def run = {
val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid))
out.writeObject (value_)
out.close
hasCopyInHDFS = true
}
}
}
private def readObject (in: ObjectInputStream) {
......@@ -218,12 +221,12 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
var oisTracker: ObjectInputStream = null
var oosTracker: ObjectOutputStream = null
var masterListenPort: Int = -1
// masterListenPort aka guidePort value legend
// 0 = missed the broadcast, read from HDFS;
// <0 = hasn't started yet, wait & retry; (never happens)
// <0 = hasn't started yet, wait & retry;
// >0 = Read from this port
var masterListenPort: Int = -1
var retriesLeft = BroadcastCS.maxRetryCount
do {
try {
......@@ -314,7 +317,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
return retByteArray
}
// Tries to receive broadcast from the Master and returns Boolean status.
// Tries to receive broadcast from the source and returns Boolean status.
// This might be called multiple times to retry a defined number of times.
private def receiveSingleTransmission(sourceInfo: SourceInfo): Array[Byte] = {
var clientSocketToSource: Socket = null
......@@ -334,6 +337,10 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// println (System.currentTimeMillis + ": " + "Inside receiveSingleTransmission")
// println (System.currentTimeMillis + ": " + "totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks)
// Send the range
oosSource.writeObject((0, totalBlocks))
retByteArray = new Array[Byte] (totalBytes)
for (i <- 0 until totalBlocks) {
val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
......@@ -379,7 +386,8 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
var keepAccepting = true
try {
while (keepAccepting) {
// Don't stop until there is a copy in HDFS
while (keepAccepting || !hasCopyInHDFS) {
var clientSocket: Socket = null
try {
serverSocket.setSoTimeout (BroadcastCS.serverSocketTimout)
......@@ -400,6 +408,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
}
}
}
BroadcastCS.unregisterValue (uuid)
} finally {
serverSocket.close
}
......@@ -549,9 +558,18 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
private val oos = new ObjectOutputStream (clientSocket.getOutputStream)
private val ois = new ObjectInputStream (clientSocket.getInputStream)
private var sendFrom = 0
private var sendUntil = totalBlocks
def run = {
try {
// println (System.currentTimeMillis + ": " + "new ServeSingleRequest is running")
// Receive range to send
var sendRange = ois.readObject.asInstanceOf[(Int, Int)]
sendFrom = sendRange._1
sendUntil = sendRange._2
sendObject
} catch {
// TODO: Need to add better exception handling here
......@@ -576,7 +594,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
}
}
for (i <- 0 until totalBlocks) {
for (i <- sendFrom until sendUntil) {
while (i == hasBlocks) {
hasBlocksLock.synchronized {
hasBlocksLock.wait
......@@ -744,9 +762,9 @@ private object BroadcastCS {
}
}
// TODO: Who call this and when?
def unregisterValue (uuid: UUID) {
valueToGuidePortMap.synchronized {
// Set to 0 to make sure that people read it from HDFS
valueToGuidePortMap (uuid) = 0
// println (System.currentTimeMillis + ": " + "Value unregistered from the Tracker " + valueToGuidePortMap)
}
......@@ -778,14 +796,14 @@ private object BroadcastCS {
if (clientSocket != null) {
try {
threadPool.execute (new Runnable {
def run = {
override def run = {
val oos = new ObjectOutputStream (clientSocket.getOutputStream)
val ois = new ObjectInputStream (clientSocket.getInputStream)
try {
val uuid = ois.readObject.asInstanceOf[UUID]
// masterListenPort/guidePort value legend
// 0 = missed the broadcast, read from HDFS;
// <0 = hasn't started yet, wait & retry; (never happens)
// <0 = hasn't started yet, wait & retry;
// >0 = Read from this port
var guidePort = if (valueToGuidePortMap.contains (uuid)) {
valueToGuidePortMap (uuid)
......
No preview for this file type
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment