Skip to content
Snippets Groups Projects
Commit 8758aff3 authored by Kanzhe Jiang's avatar Kanzhe Jiang
Browse files

Add javadoc to packetstreamer

parent 60fd2fe7
No related branches found
No related tags found
No related merge requests found
......@@ -16,9 +16,16 @@ import org.apache.thrift.protocol.TProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The PacketStreamer Sample Client.
*/
public class PacketStreamerClient {
protected static Logger log = LoggerFactory.getLogger(PacketStreamerClient.class);
/**
* Main function entry point;
* @param args
*/
public static void main(String [] args) {
try {
int serverPort = Integer.parseInt(System.getProperty("net.floodlightcontroller.packetstreamer.port", "9090"));
......@@ -40,6 +47,14 @@ public class PacketStreamerClient {
}
}
/**
* Send test packets of the given OFMessageType to the packetstreamer server;
* @param client Packetstreamer client object
* @param numPackets number of test packets to be sent
* @param ofType OFMessageType of the test packets
* @param sync true if send with synchronous interface, false for asynchronous interface
* @throws TException
*/
private static void sendPackets(PacketStreamer.Client client, short numPackets, OFMessageType ofType, boolean sync)
throws TException {
while (numPackets-- > 0) {
......
......@@ -13,37 +13,56 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The PacketStreamer handler class that implements the service APIs.
*/
public class PacketStreamerHandler implements PacketStreamer.Iface {
/**
* The queue wrapper class that contains the queue for the streamed packets.
*/
protected class SessionQueue {
protected BlockingQueue<ByteBuffer> pQueue;
protected boolean terminated;
/**
* The queue wrapper constructor
*/
public SessionQueue() {
this.pQueue = new LinkedBlockingQueue<ByteBuffer>();
this.terminated = false;
}
public boolean isTerminated() {
return this.terminated;
}
public void setTerminated(boolean terminated) {
this.terminated = terminated;
}
/**
* The access method to get to the internal queue.
*/
public BlockingQueue<ByteBuffer> getQueue() {
return this.pQueue;
}
}
/**
* The class logger object
*/
protected static Logger log = LoggerFactory.getLogger(PacketStreamerServer.class);
/**
* A sessionId-to-queue mapping
*/
protected Map<String, SessionQueue> msgQueues;
/**
* The handler's constructor
*/
public PacketStreamerHandler() {
this.msgQueues = new ConcurrentHashMap<String, SessionQueue>();
}
/**
* The implementation for getPackets() function.
* This is a blocking API.
*
* @param sessionid
* @return A list of packets associated with the session
*/
@Override
public List<ByteBuffer> getPackets(String sessionid)
throws org.apache.thrift.TException {
......@@ -71,6 +90,13 @@ public class PacketStreamerHandler implements PacketStreamer.Iface {
return packets;
}
/**
* The implementation for pushMessageSync() function.
*
* @param msg
* @return 1 for success, 0 for failure
* @throws TException
*/
@Override
public int pushMessageSync(Message msg)
throws org.apache.thrift.TException {
......@@ -109,6 +135,12 @@ public class PacketStreamerHandler implements PacketStreamer.Iface {
return 1;
}
/**
* The implementation for pushMessageAsync() function.
*
* @param msg
* @throws TException
*/
@Override
public void pushMessageAsync(Message msg)
throws org.apache.thrift.TException {
......@@ -116,26 +148,32 @@ public class PacketStreamerHandler implements PacketStreamer.Iface {
return;
}
/**
* The implementation for terminateSession() function.
* It removes the session to queue association.
* @param sessionid
* @throws TException
*/
@Override
public void terminateSession(String sid)
public void terminateSession(String sessionid)
throws org.apache.thrift.TException {
if (!msgQueues.containsKey(sid)) {
if (!msgQueues.containsKey(sessionid)) {
return;
}
SessionQueue pQueue = msgQueues.get(sid);
SessionQueue pQueue = msgQueues.get(sessionid);
log.debug("terminateSession: SessionId: " + sid + "\n");
log.debug("terminateSession: SessionId: " + sessionid + "\n");
String data = "FilterTimeout";
ByteBuffer bb = ByteBuffer.wrap(data.getBytes());
BlockingQueue<ByteBuffer> queue = pQueue.getQueue();
if (queue != null) {
if (!queue.offer(bb)) {
log.error("Failed to queue message for session: " + sid);
log.error("Failed to queue message for session: " + sessionid);
}
msgQueues.remove(sid);
msgQueues.remove(sessionid);
} else {
log.error("queue for session {} is null", sid);
log.error("queue for session {} is null", sessionid);
}
}
}
......
......@@ -14,12 +14,20 @@ import org.slf4j.LoggerFactory;
// Generated code
import net.floodlightcontroller.packetstreamer.thrift.*;
/**
* The PacketStreamer Server that brokers the packet streaming service.
*/
public class PacketStreamerServer {
protected static Logger log = LoggerFactory.getLogger(PacketStreamerServer.class);
protected static int port = 9090;
protected static PacketStreamerHandler handler;
protected static PacketStreamer.Processor<PacketStreamerHandler> processor;
/**
* Main function entry point;
* @param args
*/
public static void main(String [] args) {
try {
port = Integer.parseInt(System.getProperty("net.floodlightcontroller.packetstreamer.port", "9090"));
......@@ -39,9 +47,13 @@ public class PacketStreamerServer {
}
}
/**
* The function to create a thrift Half-Sync and Half-Async Server.
* @param processor
*/
public static void hshaServer(PacketStreamer.Processor<PacketStreamerHandler> processor) {
try {
// Use this for a HsHa server
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(port);
THsHaServer.Args args = new THsHaServer.Args(serverTransport);
args.processor(processor);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment