Skip to content
Snippets Groups Projects
Commit 4a872ef3 authored by paaguti-work's avatar paaguti-work
Browse files

Add IControllerCompletionListener to track when the events have been consumed

parent b5b1648d
No related branches found
No related tags found
No related merge requests found
......@@ -238,5 +238,19 @@ public interface IFloodlightProviderService extends
*/
public int getWorkerThreads();
// paag
/**
* Add a completion listener to the controller
*
* @param listener
*/
void addCompletionListener(IControllerCompletionListener listener);
/**
* Remove a completion listener from the controller
*
* @param listener
*/
void removeCompletionListener(IControllerCompletionListener listener);
}
......@@ -31,6 +31,7 @@ import java.util.Set;
import java.util.Stack;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
......@@ -41,6 +42,7 @@ import net.floodlightcontroller.core.ControllerId;
import net.floodlightcontroller.core.FloodlightContext;
import net.floodlightcontroller.core.HAListenerTypeMarker;
import net.floodlightcontroller.core.HARole;
import net.floodlightcontroller.core.IControllerCompletionListener;
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.IHAListener;
import net.floodlightcontroller.core.IInfoProvider;
......@@ -66,6 +68,7 @@ import org.projectfloodlight.openflow.protocol.OFPacketIn;
import org.projectfloodlight.openflow.protocol.OFPortDesc;
import org.projectfloodlight.openflow.protocol.OFType;
import org.projectfloodlight.openflow.types.DatapathId;
import org.python.modules.synchronize;
import net.floodlightcontroller.packet.Ethernet;
import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
......@@ -101,6 +104,9 @@ public class Controller implements IFloodlightProviderService, IStorageSourceLis
protected ConcurrentMap<OFType, ListenerDispatcher<OFType,IOFMessageListener>> messageListeners;
// paag
protected ConcurrentLinkedQueue<IControllerCompletionListener> completionListeners;
// The controllerNodeIPsCache maps Controller IDs to their IP address.
// It's only used by handleControllerNodeIPsChanged
protected HashMap<String, String> controllerNodeIPsCache;
......@@ -489,7 +495,12 @@ public class Controller implements IFloodlightProviderService, IStorageSourceLis
else
log.debug("Received a Barrier Reply, no listeners for it");
}
// paag
// And just before we exit the controller loop we see if anyone
// is interested in knowing that we are exiting the loop
for (IControllerCompletionListener listener:completionListeners)
listener.onMessageConsumed(sw, m, bc);
if ((bContext == null) && (bc != null)) flcontext_free(bc);
}
}
......@@ -521,6 +532,24 @@ public class Controller implements IFloodlightProviderService, IStorageSourceLis
return openFlowPort;
}
// paag
@Override
public synchronized void addCompletionListener(IControllerCompletionListener listener) {
completionListeners.add(listener);
}
//paag
@Override
public synchronized void removeCompletionListener(IControllerCompletionListener listener) {
String listenerName = listener.getName();
if (completionListeners.remove(listener)) {
log.debug("Removing completion listener {}" , listenerName);
} else {
log.warn("Trying to remove unknown completion listener {}" , listenerName);
}
listenerName=null; // help gc
}
@Override
public synchronized void addOFMessageListener(OFType type, IOFMessageListener listener) {
ListenerDispatcher<OFType, IOFMessageListener> ldd =
......
......@@ -40,6 +40,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.floodlightcontroller.core.FloodlightContext;
import net.floodlightcontroller.core.IControllerCompletionListener;
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.IOFMessageListener;
import net.floodlightcontroller.core.IOFSwitch;
......@@ -77,8 +78,9 @@ import org.projectfloodlight.openflow.util.LRULinkedHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// paag: with IControllerCompletionListener that logswhen an input event has been consumed
public class LearningSwitch
implements IFloodlightModule, ILearningSwitchService, IOFMessageListener {
implements IFloodlightModule, ILearningSwitchService, IOFMessageListener, IControllerCompletionListener {
protected static Logger log = LoggerFactory.getLogger(LearningSwitch.class);
// Module dependencies
......@@ -565,6 +567,9 @@ implements IFloodlightModule, ILearningSwitchService, IOFMessageListener {
@Override
public void startUp(FloodlightModuleContext context) {
// paag: register the IControllerCompletionListener
floodlightProviderService.addCompletionListener(this);
floodlightProviderService.addOFMessageListener(OFType.PACKET_IN, this);
floodlightProviderService.addOFMessageListener(OFType.FLOW_REMOVED, this);
floodlightProviderService.addOFMessageListener(OFType.ERROR, this);
......@@ -608,4 +613,11 @@ implements IFloodlightModule, ILearningSwitchService, IOFMessageListener {
counterFlowMod = debugCounterService.registerCounter(this.getName(), "flow-mods-written", "Flow mods written to switches by LearningSwitch", MetaData.WARN);
counterPacketOut = debugCounterService.registerCounter(this.getName(), "packet-outs-written", "Packet outs written to switches by LearningSwitch", MetaData.WARN);
}
// paag: to show the IControllerCompletion concept
// CAVEAT: extremely noisy
@Override
public void onMessageConsumed(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
log.debug("Learning switch: ended processing packet {}",msg.toString());
}
}
......@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -40,6 +41,7 @@ import org.jboss.netty.util.Timer;
import net.floodlightcontroller.core.FloodlightContext;
import net.floodlightcontroller.core.HAListenerTypeMarker;
import net.floodlightcontroller.core.HARole;
import net.floodlightcontroller.core.IControllerCompletionListener;
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.IHAListener;
import net.floodlightcontroller.core.IInfoProvider;
......@@ -79,6 +81,8 @@ public class MockFloodlightProvider implements IFloodlightModule, IFloodlightPro
private final boolean useAsyncUpdates;
private volatile ExecutorService executorService;
private volatile Future<?> mostRecentUpdateFuture;
// paag
private ConcurrentLinkedQueue<IControllerCompletionListener> completionListeners;
/**
*
......@@ -158,6 +162,9 @@ public class MockFloodlightProvider implements IFloodlightModule, IFloodlightPro
result = it.next().receive(sw, msg, bc);
}
}
// paag
for (IControllerCompletionListener listener:completionListeners)
listener.onMessageConsumed(sw, msg, bc);
}
@Override
......@@ -422,4 +429,16 @@ public class MockFloodlightProvider implements IFloodlightModule, IFloodlightPro
public int getWorkerThreads() {
return 0;
}
// paag
@Override
public void addCompletionListener(IControllerCompletionListener listener) {
completionListeners.add(listener);
}
// paag
@Override
public void removeCompletionListener(IControllerCompletionListener listener) {
completionListeners.remove(listener);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment