Skip to content
Snippets Groups Projects
Commit c6962f51 authored by Mosharaf Chowdhury's avatar Mosharaf Chowdhury
Browse files

Several things, but the most important one is that now we are using node speed

to select source instead of leecher count.
parent e0db4e04
No related branches found
No related tags found
No related merge requests found
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
FWDIR=`dirname $0` FWDIR=`dirname $0`
# Set JAVA_OPTS to be able to load libnexus.so and set various other misc options # Set JAVA_OPTS to be able to load libnexus.so and set various other misc options
export JAVA_OPTS="-Djava.library.path=$FWDIR/third_party:$FWDIR/src/native -Xmx2000m -Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=4096 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.serverSocketTimout=50000 -Dspark.broadcast.dualMode=false" export JAVA_OPTS="-Djava.library.path=$FWDIR/third_party:$FWDIR/src/native -Xmx2000m -Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=4096 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.serverSocketTimout=5000 -Dspark.broadcast.dualMode=false"
if [ -e $FWDIR/conf/java-opts ] ; then if [ -e $FWDIR/conf/java-opts ] ; then
JAVA_OPTS+=" `cat $FWDIR/conf/java-opts`" JAVA_OPTS+=" `cat $FWDIR/conf/java-opts`"
......
...@@ -104,14 +104,16 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) ...@@ -104,14 +104,16 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
BroadcastCS.registerValue (uuid, guidePort) BroadcastCS.registerValue (uuid, guidePort)
// Now store a persistent copy in HDFS, in a separate thread // Now store a persistent copy in HDFS, in a separate thread
new Runnable { // new Runnable {
override def run = { // override def run = {
// TODO: When threaded, it's not written to file
// TODO: On second thought, it's better to have it stored before anything starts
val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid)) val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid))
out.writeObject (value_) out.writeObject (value_)
out.close out.close
hasCopyInHDFS = true hasCopyInHDFS = true
} // }
} // }
} }
private def readObject (in: ObjectInputStream) { private def readObject (in: ObjectInputStream) {
...@@ -139,8 +141,6 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) ...@@ -139,8 +141,6 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
if (retByteArray != null) { if (retByteArray != null) {
value_ = byteArrayToObject[T] (retByteArray) value_ = byteArrayToObject[T] (retByteArray)
BroadcastCS.values.put (uuid, value_) BroadcastCS.values.put (uuid, value_)
// val variableInfo = blockifyObject (value_, BroadcastCS.blockSize)
// BroadcastCS.valueInfos.put (uuid, variableInfo)
} else { } else {
val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid)) val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid))
value_ = fileIn.readObject.asInstanceOf[T] value_ = fileIn.readObject.asInstanceOf[T]
...@@ -294,15 +294,17 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) ...@@ -294,15 +294,17 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// println (System.currentTimeMillis + ": " + "Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort) // println (System.currentTimeMillis + ": " + "Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort)
val start = System.nanoTime
retByteArray = receiveSingleTransmission (sourceInfo) retByteArray = receiveSingleTransmission (sourceInfo)
val time = (System.nanoTime - start) / 1e9
// println (System.currentTimeMillis + ": " + "I got this from receiveSingleTransmission: " + retByteArray) // println (System.currentTimeMillis + ": " + "I got this from receiveSingleTransmission: " + retByteArray)
// TODO: Update sourceInfo to add error notifications for Master // TODO: Update sourceInfo to add error notifications for Master
if (retByteArray == null) { sourceInfo.receptionFailed = true } if (retByteArray == null) { sourceInfo.receptionFailed = true }
// TODO: Supposed to update values here, but we don't support advanced // Updating some statistics here. Master will be using them later
// statistics right now. Master can handle leecherCount by itself. sourceInfo.MBps = (sourceInfo.totalBytes.toDouble / 1048576) / time
// Send back statistics to the Master // Send back statistics to the Master
oosMaster.writeObject (sourceInfo) oosMaster.writeObject (sourceInfo)
...@@ -362,9 +364,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) ...@@ -362,9 +364,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
} }
} finally { } finally {
if (oisSource != null) { oisSource.close } if (oisSource != null) { oisSource.close }
if (oosSource != null) { if (oosSource != null) { oosSource.close }
oosSource.close
}
if (clientSocketToSource != null) { clientSocketToSource.close } if (clientSocketToSource != null) { clientSocketToSource.close }
} }
...@@ -373,6 +373,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) ...@@ -373,6 +373,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
class GuideMultipleRequests extends Thread { class GuideMultipleRequests extends Thread {
override def run = { override def run = {
// TODO: Cached threadpool has 60 s keep alive timer
var threadPool = Executors.newCachedThreadPool var threadPool = Executors.newCachedThreadPool
var serverSocket: ServerSocket = null var serverSocket: ServerSocket = null
...@@ -394,7 +395,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) ...@@ -394,7 +395,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
clientSocket = serverSocket.accept clientSocket = serverSocket.accept
} catch { } catch {
case e: Exception => { case e: Exception => {
// println ("GuideMultipleRequests Timeout. Stopping listening...") // println ("GuideMultipleRequests Timeout. Stopping listening..." + hasCopyInHDFS)
keepAccepting = false keepAccepting = false
} }
} }
...@@ -453,11 +454,18 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) ...@@ -453,11 +454,18 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// Remove first // Remove first
pqOfSources.remove (selectedSourceInfo) pqOfSources.remove (selectedSourceInfo)
// TODO: Removing a source based on just one failure notification! // TODO: Removing a source based on just one failure notification!
// Update leecher count and put it back in IF reception succeeded // Update sourceInfo and put it back in, IF reception succeeded
if (!sourceInfo.receptionFailed) { if (!sourceInfo.receptionFailed) {
selectedSourceInfo.currentLeechers -= 1 selectedSourceInfo.currentLeechers -= 1
selectedSourceInfo.MBps = sourceInfo.MBps
// Put it back
pqOfSources.add (selectedSourceInfo) pqOfSources.add (selectedSourceInfo)
// Update global source speed statistics
BroadcastCS.setSourceSpeed (
sourceInfo.hostAddress, sourceInfo.MBps)
// No need to find and update thisWorkerInfo, but add its replica // No need to find and update thisWorkerInfo, but add its replica
if (BroadcastCS.dualMode) { if (BroadcastCS.dualMode) {
pqOfSources.add (new SourceInfo (thisWorkerInfo.hostAddress, pqOfSources.add (new SourceInfo (thisWorkerInfo.hostAddress,
...@@ -469,7 +477,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) ...@@ -469,7 +477,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// If something went wrong, e.g., the worker at the other end died etc. // If something went wrong, e.g., the worker at the other end died etc.
// then close everything up // then close everything up
case e: Exception => { case e: Exception => {
// Assuming that exception caused due to receiver worker failure // Assuming that exception caused due to receiver worker failure.
// Remove failed worker from pqOfSources and update leecherCount of // Remove failed worker from pqOfSources and update leecherCount of
// corresponding source worker // corresponding source worker
pqOfSources.synchronized { pqOfSources.synchronized {
...@@ -659,8 +667,27 @@ case class SourceInfo (val hostAddress: String, val listenPort: Int, ...@@ -659,8 +667,27 @@ case class SourceInfo (val hostAddress: String, val listenPort: Int,
var currentLeechers = 0 var currentLeechers = 0
var receptionFailed = false var receptionFailed = false
var MBps: Double = 0.0
def compareTo (o: SourceInfo): Int = (currentLeechers - o.currentLeechers) // Ascending sort based on leecher count
// def compareTo (o: SourceInfo): Int = (currentLeechers - o.currentLeechers)
// Descending sort based on speed
// def compareTo (o: SourceInfo): Int = {
// if (MBps > o.MBps) -1
// else if (MBps < o.MBps) 1
// else 0
// }
// Descending sort based on globally stored speed
def compareTo (o: SourceInfo): Int = {
  // Orders sources by the speed recorded in BroadcastCS for their host
  // address, fastest first: a negative result means this source is faster
  // than `o`, a positive result means it is slower, and 0 means a tie.
  // Speeds are looked up at comparison time, not cached in this object.
  val thisHostSpeed = BroadcastCS.getSourceSpeed (hostAddress)
  val otherHostSpeed = BroadcastCS.getSourceSpeed (o.hostAddress)
  if (thisHostSpeed > otherHostSpeed) {
    -1
  } else if (thisHostSpeed < otherHostSpeed) {
    1
  } else {
    0
  }
}
} }
@serializable @serializable
...@@ -700,9 +727,10 @@ private object Broadcast { ...@@ -700,9 +727,10 @@ private object Broadcast {
private object BroadcastCS { private object BroadcastCS {
val values = new MapMaker ().softValues ().makeMap[UUID, Any] val values = new MapMaker ().softValues ().makeMap[UUID, Any]
// val valueInfos = new MapMaker ().softValues ().makeMap[UUID, Any]
var valueToGuidePortMap = Map[UUID, Int] () var valueToGuidePortMap = Map[UUID, Int] ()
var sourceToSpeedMap = Map[String, Double] ()
private var initialized = false private var initialized = false
private var isMaster_ = false private var isMaster_ = false
...@@ -716,6 +744,9 @@ private object BroadcastCS { ...@@ -716,6 +744,9 @@ private object BroadcastCS {
private var trackMV: TrackMultipleValues = null private var trackMV: TrackMultipleValues = null
// newSpeed = ALPHA * oldSpeed + (1 - ALPHA) * curSpeed
private val ALPHA = 0.7
def initialize (isMaster__ : Boolean) { def initialize (isMaster__ : Boolean) {
synchronized { synchronized {
if (!initialized) { if (!initialized) {
...@@ -736,7 +767,7 @@ private object BroadcastCS { ...@@ -736,7 +767,7 @@ private object BroadcastCS {
if (isMaster) { if (isMaster) {
trackMV = new TrackMultipleValues trackMV = new TrackMultipleValues
// trackMV.setDaemon (true) trackMV.setDaemon (true)
trackMV.start trackMV.start
// println (System.currentTimeMillis + ": " + "TrackMultipleValues started") // println (System.currentTimeMillis + ": " + "TrackMultipleValues started")
} }
...@@ -762,7 +793,7 @@ private object BroadcastCS { ...@@ -762,7 +793,7 @@ private object BroadcastCS {
} }
} }
def unregisterValue (uuid: UUID) { def unregisterValue (uuid: UUID) = {
valueToGuidePortMap.synchronized { valueToGuidePortMap.synchronized {
// Set to 0 to make sure that people read it from HDFS // Set to 0 to make sure that people read it from HDFS
valueToGuidePortMap (uuid) = 0 valueToGuidePortMap (uuid) = 0
...@@ -770,6 +801,20 @@ private object BroadcastCS { ...@@ -770,6 +801,20 @@ private object BroadcastCS {
} }
} }
// Returns the speed currently stored for hostAddress. A host that has no
// entry yet is registered with a speed of 0.0, and 0.0 is returned.
def getSourceSpeed (hostAddress: String): Double = {
  sourceToSpeedMap.synchronized {
    sourceToSpeedMap.get (hostAddress) match {
      case Some (knownSpeed) => knownSpeed
      case None => {
        // First lookup for this host: remember the default before returning it
        val initialSpeed = 0.0
        sourceToSpeedMap.update (hostAddress, initialSpeed)
        initialSpeed
      }
    }
  }
}
// Folds a new speed sample (MBps) for hostAddress into the stored value:
//   stored = ALPHA * previouslyStored + (1 - ALPHA) * MBps
// A host without an entry starts from a stored speed of 0.0.
def setSourceSpeed (hostAddress: String, MBps: Double) = {
  sourceToSpeedMap.synchronized {
    val previousSpeed = sourceToSpeedMap.getOrElseUpdate (hostAddress, 0.0)
    val smoothedSpeed = ALPHA * previousSpeed + (1 - ALPHA) * MBps
    sourceToSpeedMap.update (hostAddress, smoothedSpeed)
  }
}
class TrackMultipleValues extends Thread { class TrackMultipleValues extends Thread {
override def run = { override def run = {
var threadPool = Executors.newCachedThreadPool var threadPool = Executors.newCachedThreadPool
...@@ -789,7 +834,8 @@ private object BroadcastCS { ...@@ -789,7 +834,8 @@ private object BroadcastCS {
} catch { } catch {
case e: Exception => { case e: Exception => {
// println ("TrackMultipleValues Timeout. Stopping listening...") // println ("TrackMultipleValues Timeout. Stopping listening...")
keepAccepting = false // TODO: Tracking should be explicitly stopped by the SparkContext
// keepAccepting = false
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment