Skip to content
Snippets Groups Projects
Commit efbf8f57 authored by Ryan Izard's avatar Ryan Izard
Browse files

Merge pull request #614 from paaguti-work/master

Add a callback after an input event has been consumed
parents 22c771e5 d0a900cd
No related branches found
No related tags found
No related merge requests found
......@@ -14,3 +14,4 @@ findbugs-results
/thrift
*.idea/*
*.iml
.metadata/
package net.floodlightcontroller.core;
import org.projectfloodlight.openflow.protocol.OFMessage;
public interface IControllerCompletionListener {
/**
* This mimics the behaviour of the IOFMessageListener. Will be called at the end of the message processing loop
* Modules implementing this interface will know when the message processing queue has digested an input event
*
* @param sw
* @param msg
* @param cntx
*
*/
public void onMessageConsumed(IOFSwitch sw, OFMessage msg, FloodlightContext cntx);
public String getName();
}
......@@ -238,5 +238,19 @@ public interface IFloodlightProviderService extends
*/
public int getWorkerThreads();
// paag
/**
* Add a completion listener to the controller
*
* @param listener
*/
void addCompletionListener(IControllerCompletionListener listener);
/**
* Remove a completion listener from the controller
*
* @param listener
*/
void removeCompletionListener(IControllerCompletionListener listener);
}
......@@ -32,6 +32,7 @@ import java.util.Set;
import java.util.Stack;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
......@@ -41,6 +42,7 @@ import net.floodlightcontroller.core.ControllerId;
import net.floodlightcontroller.core.FloodlightContext;
import net.floodlightcontroller.core.HAListenerTypeMarker;
import net.floodlightcontroller.core.HARole;
import net.floodlightcontroller.core.IControllerCompletionListener;
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.IHAListener;
import net.floodlightcontroller.core.IInfoProvider;
......@@ -103,6 +105,9 @@ public class Controller implements IFloodlightProviderService, IStorageSourceLis
protected ConcurrentMap<OFType, ListenerDispatcher<OFType,IOFMessageListener>> messageListeners;
// paag
protected ConcurrentLinkedQueue<IControllerCompletionListener> completionListeners;
// The controllerNodeIPsCache maps Controller IDs to their IP address.
// It's only used by handleControllerNodeIPsChanged
protected HashMap<String, String> controllerNodeIPsCache;
......@@ -487,7 +492,12 @@ public class Controller implements IFloodlightProviderService, IStorageSourceLis
else
log.debug("Received a Barrier Reply, no listeners for it");
}
// paag
// And just before we exit the controller loop we see if anyone
// is interested in knowing that we are exiting the loop
for (IControllerCompletionListener listener:completionListeners)
listener.onMessageConsumed(sw, m, bc);
if ((bContext == null) && (bc != null)) flcontext_free(bc);
}
}
......@@ -520,6 +530,24 @@ public class Controller implements IFloodlightProviderService, IStorageSourceLis
return openFlowPort;
}
// paag
@Override
public synchronized void addCompletionListener(IControllerCompletionListener listener) {
completionListeners.add(listener);
}
//paag
@Override
public synchronized void removeCompletionListener(IControllerCompletionListener listener) {
String listenerName = listener.getName();
if (completionListeners.remove(listener)) {
log.debug("Removing completion listener {}" , listenerName);
} else {
log.warn("Trying to remove unknown completion listener {}" , listenerName);
}
listenerName=null; // help gc
}
@Override
public synchronized void addOFMessageListener(OFType type, IOFMessageListener listener) {
ListenerDispatcher<OFType, IOFMessageListener> ldd =
......@@ -720,7 +748,8 @@ public class Controller implements IFloodlightProviderService, IStorageSourceLis
this.controllerNodeIPsCache = new HashMap<String, String>();
this.updates = new LinkedBlockingQueue<IUpdate>();
this.providerMap = new HashMap<String, List<IInfoProvider>>();
this.completionListeners = new ConcurrentLinkedQueue<IControllerCompletionListener>();
setConfigParams(configParams);
HARole initialRole = getInitialRole(configParams);
......
......@@ -40,6 +40,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.floodlightcontroller.core.FloodlightContext;
import net.floodlightcontroller.core.IControllerCompletionListener;
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.IOFMessageListener;
import net.floodlightcontroller.core.IOFSwitch;
......@@ -77,8 +78,9 @@ import org.projectfloodlight.openflow.util.LRULinkedHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// paag: with IControllerCompletionListener that logswhen an input event has been consumed
public class LearningSwitch
implements IFloodlightModule, ILearningSwitchService, IOFMessageListener {
implements IFloodlightModule, ILearningSwitchService, IOFMessageListener, IControllerCompletionListener {
protected static Logger log = LoggerFactory.getLogger(LearningSwitch.class);
// Module dependencies
......@@ -112,6 +114,10 @@ implements IFloodlightModule, ILearningSwitchService, IOFMessageListener {
// normally, setup reverse flow as well. Disable only for using cbench for comparison with NOX etc.
protected static final boolean LEARNING_SWITCH_REVERSE_FLOW = true;
// set this flag to true if you want to see the completion messages and
// have the switch flushed
protected final boolean flushAtCompletion = false;
/**
* @param floodlightProvider the floodlightProvider to set
*/
......@@ -565,6 +571,8 @@ implements IFloodlightModule, ILearningSwitchService, IOFMessageListener {
@Override
public void startUp(FloodlightModuleContext context) {
// paag: register the IControllerCompletionListener
floodlightProviderService.addCompletionListener(this);
floodlightProviderService.addOFMessageListener(OFType.PACKET_IN, this);
floodlightProviderService.addOFMessageListener(OFType.FLOW_REMOVED, this);
floodlightProviderService.addOFMessageListener(OFType.ERROR, this);
......@@ -608,4 +616,13 @@ implements IFloodlightModule, ILearningSwitchService, IOFMessageListener {
counterFlowMod = debugCounterService.registerCounter(this.getName(), "flow-mods-written", "Flow mods written to switches by LearningSwitch", MetaData.WARN);
counterPacketOut = debugCounterService.registerCounter(this.getName(), "packet-outs-written", "Packet outs written to switches by LearningSwitch", MetaData.WARN);
}
// paag: to show the IControllerCompletion concept
// CAVEAT: extremely noisy when tracking enabled
@Override
public void onMessageConsumed(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
if (this.flushAtCompletion) {
log.debug("Learning switch: ended processing packet {}",msg.toString());
}
}
}
......@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -39,6 +40,7 @@ import io.netty.util.Timer;
import net.floodlightcontroller.core.FloodlightContext;
import net.floodlightcontroller.core.HAListenerTypeMarker;
import net.floodlightcontroller.core.HARole;
import net.floodlightcontroller.core.IControllerCompletionListener;
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.IHAListener;
import net.floodlightcontroller.core.IInfoProvider;
......@@ -80,6 +82,8 @@ public class MockFloodlightProvider implements IFloodlightModule, IFloodlightPro
private final boolean useAsyncUpdates;
private volatile ExecutorService executorService;
private volatile Future<?> mostRecentUpdateFuture;
// paag
private ConcurrentLinkedQueue<IControllerCompletionListener> completionListeners;
/**
*
......@@ -89,7 +93,8 @@ public class MockFloodlightProvider implements IFloodlightModule, IFloodlightPro
IOFMessageListener>>();
haListeners =
new ListenerDispatcher<HAListenerTypeMarker, IHAListener>();
completionListeners =
new ConcurrentLinkedQueue<IControllerCompletionListener>();
role = null;
this.useAsyncUpdates = useAsyncUpdates;
}
......@@ -159,6 +164,9 @@ public class MockFloodlightProvider implements IFloodlightModule, IFloodlightPro
result = it.next().receive(sw, msg, bc);
}
}
// paag
for (IControllerCompletionListener listener:completionListeners)
listener.onMessageConsumed(sw, msg, bc);
}
@Override
......@@ -423,4 +431,16 @@ public class MockFloodlightProvider implements IFloodlightModule, IFloodlightPro
public int getWorkerThreads() {
return 0;
}
// paag
@Override
public void addCompletionListener(IControllerCompletionListener listener) {
completionListeners.add(listener);
}
// paag
@Override
public void removeCompletionListener(IControllerCompletionListener listener) {
completionListeners.remove(listener);
}
}
......@@ -171,7 +171,7 @@ public class LearningSwitchTest extends FloodlightTestCase {
this.learningSwitch.startUp(fmc);
this.mockFloodlightProvider.addOFMessageListener(OFType.PACKET_IN, learningSwitch);
this.mockFloodlightProvider.addCompletionListener(learningSwitch);
}
@Test
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment