diff --git a/run b/run index 415d50cabf2c678d863f3abccce8923ee7708eef..2e78025d7c26cfb6a9f7bb8d0eb98f9788d8a6d3 100755 --- a/run +++ b/run @@ -4,7 +4,7 @@ FWDIR=`dirname $0` # Set JAVA_OPTS to be able to load libnexus.so and set various other misc options -export JAVA_OPTS="-Djava.library.path=$FWDIR/third_party:$FWDIR/src/native -Xmx2000m -Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=4096 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.serverSocketTimout=50000 -Dspark.broadcast.dualMode=false" +export JAVA_OPTS="-Djava.library.path=$FWDIR/third_party:$FWDIR/src/native -Xmx2000m -Dspark.broadcast.masterHostAddress=127.0.0.1 -Dspark.broadcast.masterTrackerPort=11111 -Dspark.broadcast.blockSize=4096 -Dspark.broadcast.maxRetryCount=2 -Dspark.broadcast.serverSocketTimout=5000 -Dspark.broadcast.dualMode=false" if [ -e $FWDIR/conf/java-opts ] ; then JAVA_OPTS+=" `cat $FWDIR/conf/java-opts`" diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index cc81bac01f4118579f8f103bf0485432e03b5a75..e6cac5389ff86a3dd975fc5af58dcf62b720f121 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -104,14 +104,16 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) BroadcastCS.registerValue (uuid, guidePort) // Now store a persistent copy in HDFS, in a separate thread - new Runnable { - override def run = { + // new Runnable { + // override def run = { + // TODO: When threaded, its not written to file + // TODO: On second thought, its better to have it stored before anything starts val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid)) out.writeObject (value_) out.close hasCopyInHDFS = true - } - } + // } + // } } private def readObject (in: ObjectInputStream) { @@ -139,8 +141,6 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) if (retByteArray != null) { value_ = byteArrayToObject[T] (retByteArray) BroadcastCS.values.put (uuid, value_) - // val variableInfo = blockifyObject (value_, BroadcastCS.blockSize) - // BroadcastCS.valueInfos.put (uuid, variableInfo) } else { val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid)) value_ = fileIn.readObject.asInstanceOf[T] @@ -294,15 +294,17 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) // println (System.currentTimeMillis + ": " + "Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort) + val start = System.nanoTime retByteArray = receiveSingleTransmission (sourceInfo) + val time = (System.nanoTime - start) / 1e9 // println (System.currentTimeMillis + ": " + "I got this from receiveSingleTransmission: " + retByteArray) // TODO: Update sourceInfo to add error notifactions for Master if (retByteArray == null) { sourceInfo.receptionFailed = true } - // TODO: Supposed to update values here, but we don't support advanced - // statistics right now. Master can handle leecherCount by itself. + // Updating some statistics here. Master will be using them later + sourceInfo.MBps = (sourceInfo.totalBytes.toDouble / 1048576) / time // Send back statistics to the Master oosMaster.writeObject (sourceInfo) @@ -362,9 +364,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) } } finally { if (oisSource != null) { oisSource.close } - if (oosSource != null) { - oosSource.close - } + if (oosSource != null) { oosSource.close } if (clientSocketToSource != null) { clientSocketToSource.close } } @@ -373,6 +373,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) class GuideMultipleRequests extends Thread { override def run = { + // TODO: Cached threadpool has 60 s keep alive timer var threadPool = Executors.newCachedThreadPool var serverSocket: ServerSocket = null @@ -394,7 +395,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) clientSocket = serverSocket.accept } catch { case e: Exception => { - // println ("GuideMultipleRequests Timeout. Stopping listening...") + // println ("GuideMultipleRequests Timeout. Stopping listening..." + hasCopyInHDFS) keepAccepting = false } } @@ -453,11 +454,18 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) // Remove first pqOfSources.remove (selectedSourceInfo) // TODO: Removing a source based on just one failure notification! - // Update leecher count and put it back in IF reception succeeded + // Update sourceInfo and put it back in, IF reception succeeded if (!sourceInfo.receptionFailed) { selectedSourceInfo.currentLeechers -= 1 + selectedSourceInfo.MBps = sourceInfo.MBps + + // Put it back pqOfSources.add (selectedSourceInfo) + // Update global source speed statistics + BroadcastCS.setSourceSpeed ( + sourceInfo.hostAddress, sourceInfo.MBps) + // No need to find and update thisWorkerInfo, but add its replica if (BroadcastCS.dualMode) { pqOfSources.add (new SourceInfo (thisWorkerInfo.hostAddress, @@ -469,7 +477,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) // If something went wrong, e.g., the worker at the other end died etc. // then close everything up case e: Exception => { - // Assuming that exception caused due to receiver worker failure + // Assuming that exception caused due to receiver worker failure. // Remove failed worker from pqOfSources and update leecherCount of // corresponding source worker pqOfSources.synchronized { @@ -659,8 +667,27 @@ case class SourceInfo (val hostAddress: String, val listenPort: Int, var currentLeechers = 0 var receptionFailed = false + var MBps: Double = 0.0 - def compareTo (o: SourceInfo): Int = (currentLeechers - o.currentLeechers) + // Ascending sort based on leecher count + // def compareTo (o: SourceInfo): Int = (currentLeechers - o.currentLeechers) + + // Descending sort based on speed + // def compareTo (o: SourceInfo): Int = { + // if (MBps > o.MBps) -1 + // else if (MBps < o.MBps) 1 + // else 0 + // } + + // Descending sort based on globally stored speed + def compareTo (o: SourceInfo): Int = { + val mySpeed = BroadcastCS.getSourceSpeed (hostAddress) + val urSpeed = BroadcastCS.getSourceSpeed (o.hostAddress) + + if (mySpeed > urSpeed) -1 + else if (mySpeed < urSpeed) 1 + else 0 + } } @serializable @@ -700,9 +727,10 @@ private object Broadcast { private object BroadcastCS { val values = new MapMaker ().softValues ().makeMap[UUID, Any] - // val valueInfos = new MapMaker ().softValues ().makeMap[UUID, Any] var valueToGuidePortMap = Map[UUID, Int] () + + var sourceToSpeedMap = Map[String, Double] () private var initialized = false private var isMaster_ = false @@ -716,6 +744,9 @@ private object BroadcastCS { private var trackMV: TrackMultipleValues = null + // newSpeed = ALPHA * oldSpeed + (1 - ALPHA) * curSpeed + private val ALPHA = 0.7 + def initialize (isMaster__ : Boolean) { synchronized { if (!initialized) { @@ -736,7 +767,7 @@ private object BroadcastCS { if (isMaster) { trackMV = new TrackMultipleValues - // trackMV.setDaemon (true) + trackMV.setDaemon (true) trackMV.start // println (System.currentTimeMillis + ": " + "TrackMultipleValues started") } @@ -762,7 +793,7 @@ private object BroadcastCS { } } - def unregisterValue (uuid: UUID) { + def unregisterValue (uuid: UUID) = { valueToGuidePortMap.synchronized { // Set to 0 to make sure that people read it from HDFS valueToGuidePortMap (uuid) = 0 @@ -770,6 +801,20 @@ private object BroadcastCS { } } + def getSourceSpeed (hostAddress: String): Double = { + sourceToSpeedMap.synchronized { + sourceToSpeedMap.getOrElseUpdate(hostAddress, 0.0) + } + } + + def setSourceSpeed (hostAddress: String, MBps: Double) = { + sourceToSpeedMap.synchronized { + var oldSpeed = sourceToSpeedMap.getOrElseUpdate(hostAddress, 0.0) + var newSpeed = ALPHA * oldSpeed + (1 - ALPHA) * MBps + sourceToSpeedMap.update (hostAddress, newSpeed) + } + } + class TrackMultipleValues extends Thread { override def run = { var threadPool = Executors.newCachedThreadPool @@ -789,7 +834,8 @@ private object BroadcastCS { } catch { case e: Exception => { // println ("TrackMultipleValues Timeout. Stopping listening...") - keepAccepting = false + // TODO: Tracking should be explicitly stopped by the SparkContext + // keepAccepting = false } }