From 1c1ac3161de50af7fbdb70f735f7ff7d8cc597d6 Mon Sep 17 00:00:00 2001 From: Mosharaf Chowdhury <mosharaf@mosharaf-ubuntu.(none)> Date: Mon, 19 Apr 2010 20:32:17 -0700 Subject: [PATCH] More porting of SplitStream code. --- src/scala/spark/Broadcast.scala | 132 ++++++++++++++++++++++++++++++-- 1 file changed, 127 insertions(+), 5 deletions(-) diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index 1a95860e32..6aee3d5a26 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -640,7 +640,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) } @serializable -class SplitStreamBroadcast[T](@transient var value_ : T, local: Boolean) +class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean) extends BroadcastRecipe { def value = value_ @@ -664,6 +664,123 @@ class SplitStreamBroadcast[T](@transient var value_ : T, local: Boolean) } } } + + class SSClient (pastryNode: PastryNode) extends SplitStreamClient with Application { + // Length of a message in bytes. + val DATA_LENGTH = 10 + // Number of messages to publish. + val NUM_PUBLISHES = 10 + + /** + * The message sequence number. Will be incremented after each send. + * Out of laziness we are encoding this as a byte in the stream, so the range is limited + */ + var seqNum: Byte = 0 + + // Data source... + // protected RandomSource random; + + // This task kicks off publishing and anycasting. We keep it around in case + // we ever want to cancel the publishTask. + var publishTask: CancellableTask = null + + // The Endpoint represents the underlying node. By making calls on the + // Endpoint, it ensures that the message will be delivered to a MyApp on + // whichever node the message is intended for. 
+ protected val endpoint = pastryNode.buildEndpoint (this, "myInstance") + + // use this to generate data + // this.random = endpoint.getEnvironment().getRandomSource() + + // Handle to a SplitStream implementation + val mySplitStream = new SplitStreamImpl (pastryNode, "splitStreamImpl") + + // The ChannelId is constructed from a normal PastryId + val tmp = new PastryIdFactory (pastryNode.getEnvironment).buildId ("myChannel") + val myChannelId = new ChannelId (tmp) + + // The channel. + var myChannel: Channel= null + + // The stripes. Acquired from myChannel. + var myStripes: Array[Stripe] = null + + // Now we can receive messages + endpoint.register + + // Subscribes to all stripes in myChannelId. + def subscribe = { + // Attaching makes you part of the Channel, and volunteers to be an + // internal node of one of the trees + myChannel = mySplitStream.attachChannel (myChannelId) + + // Subscribing notifies your application when data comes through the tree + myStripes = myChannel.getStripes + for (curStripe <- myStripes) { curStripe.subscribe (this) } + } + + // Starts the publish task. + def startPublishTask = { + // TODO: The last two parameters are delays to wait before delivering the + // first and the subsequent messages. Needs tweaking. + publishTask = endpoint.scheduleMessage (new PublishContent, 5000, 5000) + } + + + // Part of the Application interface. Will receive PublishContent. + def deliver (id: rice.p2p.commonapi.Id, message: Message) = { + // TODO: Couldn't perform dynamic type checking. This can cause problems. + // if (message.isInstanceOf[PublishContent]) + { publish } + } + + // Called whenever we receive a published message. + def deliver (s: Stripe, data: Array[Byte]) = { + println(endpoint.getId()+" deliver("+s+"):seq:"+data(0)+" stripe:"+data(1)+" "+data+")") + } + + /** + * Multicasts data. 
+ */ + def publish = { + + for (curStripe <- myStripes) { + // format of the data: + // first byte: seqNum + // second byte: stripe + // rest: random + var data = new Array[Byte] (DATA_LENGTH) + + // yes, we waste some random bytes here + // random.nextBytes(data) + data(0) = seqNum + data(1) = 13 // curStripe + + // print what we are sending + println("Node "+endpoint.getLocalNodeHandle+" publishing "+seqNum+" "+data) + + // publish the data + curStripe.publish (data) + } + + // increment the sequence number + // seqNum = seqNum + 1 + + // cancel after sending all the messages + if (seqNum >= NUM_PUBLISHES) { publishTask.cancel } + } + + class PublishContent extends Message { + def getPriority: Int = { Message.MEDIUM_PRIORITY } + } + + // Error handling + def joinFailed(s: Stripe) = { println ("joinFailed(" + s + ")") } + + // Rest of the Application interface. NOT USED. + def forward (message: RouteMessage): Boolean = false + def update (handle: rice.p2p.commonapi.NodeHandle, joined: Boolean) = { } + } } @serializable @@ -965,7 +1082,7 @@ private object BroadcastSS { isMaster_ = isMaster__ // Initialize the SplitStream tree - initializeSplitStream + initializeSplitStream initialized = true } @@ -975,8 +1092,10 @@ private object BroadcastSS { def masterBootAddress = masterBootAddress_ def blockSize = blockSize_ def maxRetryCount = maxRetryCount_ + def pEnvironment = pEnvironment_ def pastryNode = pastryNode_ + def localBindPort = { if (localBindPort_ == -1) { if (isMaster) { localBindPort_ = masterBootPort_ } @@ -993,7 +1112,9 @@ private object BroadcastSS { def isMaster = isMaster_ - private def initializeSplitStream = { + private def initializeSplitStream: PastryNode = { + if (pastryNode != null) { return pastryNode } + pEnvironment_ = new Environment // Generate the NodeIds Randomly @@ -1006,9 +1127,9 @@ private object BroadcastSS { // Construct a Pastry node pastryNode_ = pastryNodeFactory.newNode - // Boot the node. If its Master, start a new ring. 
+ // Boot the node. If it's the Master, start a new ring. if (isMaster) { pastryNode.boot (null) } - else { pastryNode.boot (masterBootAddress)} + else { pastryNode.boot (masterBootAddress) } // The node may require sending several messages to fully boot into the ring pastryNode.synchronized { @@ -1023,6 +1144,7 @@ private object BroadcastSS { } } + return pastryNode // construct a new splitstream application // val app = new MySplitStreamClient(pastryNode) // app.subscribe -- GitLab