Skip to content
Snippets Groups Projects
Commit 1c1ac316 authored by Mosharaf Chowdhury's avatar Mosharaf Chowdhury
Browse files

More porting of SplitStream code.

parent dc2c69e6
Branches demo_pandas2
No related tags found
No related merge requests found
...@@ -640,7 +640,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) ...@@ -640,7 +640,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
} }
@serializable @serializable
class SplitStreamBroadcast[T](@transient var value_ : T, local: Boolean) class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean)
extends BroadcastRecipe { extends BroadcastRecipe {
def value = value_ def value = value_
...@@ -664,6 +664,123 @@ class SplitStreamBroadcast[T](@transient var value_ : T, local: Boolean) ...@@ -664,6 +664,123 @@ class SplitStreamBroadcast[T](@transient var value_ : T, local: Boolean)
} }
} }
} }
class SSClient (pastryNode: PastryNode) extends SplitStreamClient with Application {
// Length of a message in bytes.
val DATA_LENGTH = 10
// Number of messages to publish.
val NUM_PUBLISHES = 10
/**
* The message sequence number. Will be incremented after each send.
* Out of laziness we are encoding this as a byte in the stream, so the range is limited
*/
var seqNum: Byte = 0
// Data source...
// protected RandomSource random;
//This task kicks off publishing and anycasting. We hold it around in case
// we ever want to cancel the publishTask.
var publishTask: CancellableTask = null
// The Endpoint represents the underlying node. By making calls on the
// Endpoint, it assures that the message will be delivered to a MyApp on
// whichever node the message is intended for.
protected val endpoint = pastryNode.buildEndpoint (this, "myInstance")
// use this to generate data
// this.random = endpoint.getEnvironment().getRandomSource()
// Handle to a SplitStream implementation
val mySplitStream = new SplitStreamImpl (pastryNode, "splitStreamImpl")
// The ChannelId is constructed from a normal PastryId
val tmp = new PastryIdFactory (pastryNode.getEnvironment).buildId ("myChannel")
val myChannelId = new ChannelId (tmp)
// The channel.
var myChannel: Channel= null
// The stripes. Acquired from myChannel.
var myStripes: Array[Stripe] = null
// Now we can receive messages
endpoint.register
// Subscribes to all stripes in myChannelId.
def subscribe = {
// Attaching makes you part of the Channel, and volunteers to be an
// internal node of one of the trees
myChannel = mySplitStream.attachChannel (myChannelId)
// Subscribing notifies your application when data comes through the tree
myStripes = myChannel.getStripes
for (curStripe <- myStripes) { curStripe.subscribe (this) }
}
// Starts the publish task.
def startPublishTask = {
// TODO: The last two parameters are delays to wait before delivering the
// first and the subsequent messages. Needs tweaking.
publishTask = endpoint.scheduleMessage (new PublishContent, 5000, 5000)
}
// Part of the Application interface. Will receive PublishContent.
def deliver (id: rice.p2p.commonapi.Id, message: Message) = {
// TODO: Couldn't perform dynamic type checking. This can cause problems.
// if (message.isInstanceof[PublishContent])
{ publish }
}
// Called whenever we receive a published message.
def deliver (s: Stripe, data: Array[Byte]) = {
println(endpoint.getId()+" deliver("+s+"):seq:"+data(0)+" stripe:"+data(1)+" "+data+")")
}
/**
* Multicasts data.
*/
def publish = {
for (curStripe <- myStripes) {
// format of the data:
// first byte: seqNum
// second byte: stripe
// rest: random
var data = new Array[Byte] (DATA_LENGTH)
// yes, we waste some random bytes here
// random.nextBytes(data)
data(0) = seqNum
data(1) = 13 // curStripe
// print what we are sending
println("Node "+endpoint.getLocalNodeHandle+" publishing "+seqNum+" "+data)
// publish the data
curStripe.publish (data)
}
// increment the sequence number
// seqNum = seqNum + 1
// cancel after sending all the messages
if (seqNum >= NUM_PUBLISHES) { publishTask.cancel }
}
class PublishContent extends Message {
def getPriority: Int = { Message.MEDIUM_PRIORITY }
}
// Error handling
def joinFailed(s: Stripe) = { println ("joinFailed(" + s + ")") }
// Rest of the Application interface. NOT USED.
def forward (message: RouteMessage): Boolean = false
def update (handle: rice.p2p.commonapi.NodeHandle, joined: Boolean) = { }
}
} }
@serializable @serializable
...@@ -965,7 +1082,7 @@ private object BroadcastSS { ...@@ -965,7 +1082,7 @@ private object BroadcastSS {
isMaster_ = isMaster__ isMaster_ = isMaster__
// Initialize the SplitStream tree // Initialize the SplitStream tree
initializeSplitStream initializeSplitStream
initialized = true initialized = true
} }
...@@ -975,8 +1092,10 @@ private object BroadcastSS { ...@@ -975,8 +1092,10 @@ private object BroadcastSS {
def masterBootAddress = masterBootAddress_ def masterBootAddress = masterBootAddress_
def blockSize = blockSize_ def blockSize = blockSize_
def maxRetryCount = maxRetryCount_ def maxRetryCount = maxRetryCount_
def pEnvironment = pEnvironment_ def pEnvironment = pEnvironment_
def pastryNode = pastryNode_ def pastryNode = pastryNode_
def localBindPort = { def localBindPort = {
if (localBindPort_ == -1) { if (localBindPort_ == -1) {
if (isMaster) { localBindPort_ = masterBootPort_ } if (isMaster) { localBindPort_ = masterBootPort_ }
...@@ -993,7 +1112,9 @@ private object BroadcastSS { ...@@ -993,7 +1112,9 @@ private object BroadcastSS {
def isMaster = isMaster_ def isMaster = isMaster_
private def initializeSplitStream = { private def initializeSplitStream: PastryNode = {
if (pastryNode != null) { return pastryNode }
pEnvironment_ = new Environment pEnvironment_ = new Environment
// Generate the NodeIds Randomly // Generate the NodeIds Randomly
...@@ -1006,9 +1127,9 @@ private object BroadcastSS { ...@@ -1006,9 +1127,9 @@ private object BroadcastSS {
// Construct a Pastry node // Construct a Pastry node
pastryNode_ = pastryNodeFactory.newNode pastryNode_ = pastryNodeFactory.newNode
// Boot the node. If its Master, start a new ring. // Boot the node. If its the Master, start a new ring.
if (isMaster) { pastryNode.boot (null) } if (isMaster) { pastryNode.boot (null) }
else { pastryNode.boot (masterBootAddress)} else { pastryNode.boot (masterBootAddress) }
// The node may require sending several messages to fully boot into the ring // The node may require sending several messages to fully boot into the ring
pastryNode.synchronized { pastryNode.synchronized {
...@@ -1023,6 +1144,7 @@ private object BroadcastSS { ...@@ -1023,6 +1144,7 @@ private object BroadcastSS {
} }
} }
return pastryNode
// construct a new splitstream application // construct a new splitstream application
// val app = new MySplitStreamClient(pastryNode) // val app = new MySplitStreamClient(pastryNode)
// app.subscribe // app.subscribe
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment