Skip to content
Snippets Groups Projects
Commit 52dc58a9 authored by Srinivasan Ramasubramanian's avatar Srinivasan Ramasubramanian
Browse files

Move new topology instance computation as a singleton task.

parent 8ab3ceb2
No related branches found
No related tags found
No related merge requests found
...@@ -78,6 +78,15 @@ public interface ILinkDiscovery { ...@@ -78,6 +78,15 @@ public interface ILinkDiscovery {
public UpdateOperation getOperation() { public UpdateOperation getOperation() {
return operation; return operation;
} }
@Override
public String toString() {
return "LDUpdate [src=" + src + ", srcPort=" + srcPort
+ ", srcPortState=" + srcPortState + ", dst=" + dst
+ ", dstPort=" + dstPort + ", dstPortState=" + dstPortState
+ ", srcType=" + srcType + ", type=" + type + ", operation="
+ operation + "]";
}
} }
public enum SwitchType { public enum SwitchType {
......
...@@ -6,11 +6,17 @@ import java.util.HashMap; ...@@ -6,11 +6,17 @@ import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.module.FloodlightModuleContext; import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException; import net.floodlightcontroller.core.module.FloodlightModuleException;
import net.floodlightcontroller.core.module.IFloodlightModule; import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService; import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.core.util.SingletonTask;
import net.floodlightcontroller.linkdiscovery.ILinkDiscoveryListener; import net.floodlightcontroller.linkdiscovery.ILinkDiscoveryListener;
import net.floodlightcontroller.linkdiscovery.ILinkDiscoveryService; import net.floodlightcontroller.linkdiscovery.ILinkDiscoveryService;
import net.floodlightcontroller.routing.BroadcastTree; import net.floodlightcontroller.routing.BroadcastTree;
...@@ -40,12 +46,50 @@ IRoutingService, ILinkDiscoveryListener { ...@@ -40,12 +46,50 @@ IRoutingService, ILinkDiscoveryListener {
protected Map<NodePortTuple, Set<Link>> tunnelLinks; // set of tunnel links protected Map<NodePortTuple, Set<Link>> tunnelLinks; // set of tunnel links
protected ILinkDiscoveryService linkDiscovery; protected ILinkDiscoveryService linkDiscovery;
protected ArrayList<ITopologyListener> topologyAware; protected ArrayList<ITopologyListener> topologyAware;
protected IFloodlightProviderService floodlightProvider;
protected BlockingQueue<LDUpdate> ldUpdates;
protected TopologyInstance currentInstance; protected TopologyInstance currentInstance;
protected SingletonTask newInstanceTask;
public void recompute() { /**
createNewInstance(); * Thread for recomputing topology. The thread is always running,
informListeners(); * however the function applyUpdates() has a blocking call.
*/
protected class NewInstanceWorker implements Runnable {
@Override
public void run() {
applyUpdates();
createNewInstance();
informListeners();
}
}
public void applyUpdates() {
LDUpdate update = null;
while (ldUpdates.peek() != null) {
try {
update = ldUpdates.take();
} catch (Exception e) {
log.error("Error reading link discovery update. {}", e);
}
if (log.isTraceEnabled()) {
log.info("Applying update: {}", update);
}
if (update.getOperation() == UpdateOperation.ADD_OR_UPDATE) {
boolean added = (((update.getSrcPortState() & OFPortState.OFPPS_STP_MASK.getValue()) != OFPortState.OFPPS_STP_BLOCK.getValue()) &&
((update.getDstPortState() & OFPortState.OFPPS_STP_MASK.getValue()) != OFPortState.OFPPS_STP_BLOCK.getValue()));
if (added) {
addOrUpdateLink(update.getSrc(), update.getSrcPort(),
update.getDst(), update.getDstPort(),
update.getType());
} else {
removeLink(update.getSrc(), update.getSrcPort(), update.getDst(), update.getDstPort());
}
} else if (update.getOperation() == UpdateOperation.REMOVE) {
removeLink(update.getSrc(), update.getSrcPort(), update.getDst(), update.getDstPort());
}
}
} }
/** /**
...@@ -141,8 +185,6 @@ IRoutingService, ILinkDiscoveryListener { ...@@ -141,8 +185,6 @@ IRoutingService, ILinkDiscoveryListener {
addLinkToStructure(tunnelLinks, link); addLinkToStructure(tunnelLinks, link);
removeLinkFromStructure(portBroadcastDomainLinks, link); removeLinkFromStructure(portBroadcastDomainLinks, link);
} }
this.createNewInstance();
} }
public void removeLink(Link link) { public void removeLink(Link link) {
...@@ -172,7 +214,6 @@ IRoutingService, ILinkDiscoveryListener { ...@@ -172,7 +214,6 @@ IRoutingService, ILinkDiscoveryListener {
switchPorts.get(link.getDst()).isEmpty()) { switchPorts.get(link.getDst()).isEmpty()) {
switchPorts.remove(link.getDst()); switchPorts.remove(link.getDst());
} }
this.createNewInstance();
} }
public void removeLink(long srcId, short srcPort, long dstId, short dstPort) { public void removeLink(long srcId, short srcPort, long dstId, short dstPort) {
...@@ -216,18 +257,19 @@ IRoutingService, ILinkDiscoveryListener { ...@@ -216,18 +257,19 @@ IRoutingService, ILinkDiscoveryListener {
// //
public void linkDiscoveryUpdate(LDUpdate update) { public void linkDiscoveryUpdate(LDUpdate update) {
if (update.getOperation() == UpdateOperation.ADD_OR_UPDATE) { boolean scheduleFlag = false;
boolean added = (((update.getSrcPortState() & OFPortState.OFPPS_STP_MASK.getValue()) != OFPortState.OFPPS_STP_BLOCK.getValue()) && // if there's no udpates in the queue, then
((update.getDstPortState() & OFPortState.OFPPS_STP_MASK.getValue()) != OFPortState.OFPPS_STP_BLOCK.getValue())); // we need to schedule an update.
if (added) { if (ldUpdates.peek() == null)
addOrUpdateLink(update.getSrc(), update.getSrcPort(), scheduleFlag = true;
update.getDst(), update.getDstPort(),
update.getType()); if (log.isTraceEnabled()) {
} else { log.trace("Queuing update: {}", update);
removeLink(update.getSrc(), update.getSrcPort(), update.getDst(), update.getDstPort()); }
} ldUpdates.add(update);
} else if (update.getOperation() == UpdateOperation.REMOVE) {
removeLink(update.getSrc(), update.getSrcPort(), update.getDst(), update.getDstPort()); if (scheduleFlag) {
newInstanceTask.reschedule(1, TimeUnit.MICROSECONDS);
} }
} }
...@@ -264,6 +306,7 @@ IRoutingService, ILinkDiscoveryListener { ...@@ -264,6 +306,7 @@ IRoutingService, ILinkDiscoveryListener {
getModuleDependencies() { getModuleDependencies() {
Collection<Class<? extends IFloodlightService>> l = Collection<Class<? extends IFloodlightService>> l =
new ArrayList<Class<? extends IFloodlightService>>(); new ArrayList<Class<? extends IFloodlightService>>();
l.add(IFloodlightProviderService.class);
l.add(ILinkDiscoveryService.class); l.add(ILinkDiscoveryService.class);
return l; return l;
} }
...@@ -271,19 +314,23 @@ IRoutingService, ILinkDiscoveryListener { ...@@ -271,19 +314,23 @@ IRoutingService, ILinkDiscoveryListener {
@Override @Override
public void init(FloodlightModuleContext context) public void init(FloodlightModuleContext context)
throws FloodlightModuleException { throws FloodlightModuleException {
floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
linkDiscovery = context.getServiceImpl(ILinkDiscoveryService.class); linkDiscovery = context.getServiceImpl(ILinkDiscoveryService.class);
switchPorts = new HashMap<Long,Set<Short>>(); switchPorts = new HashMap<Long,Set<Short>>();
switchPortLinks = new HashMap<NodePortTuple, Set<Link>>(); switchPortLinks = new HashMap<NodePortTuple, Set<Link>>();
portBroadcastDomainLinks = new HashMap<NodePortTuple, Set<Link>>(); portBroadcastDomainLinks = new HashMap<NodePortTuple, Set<Link>>();
tunnelLinks = new HashMap<NodePortTuple, Set<Link>>(); tunnelLinks = new HashMap<NodePortTuple, Set<Link>>();
topologyAware = new ArrayList<ITopologyListener>(); topologyAware = new ArrayList<ITopologyListener>();
ldUpdates = new LinkedBlockingQueue<LDUpdate>();
ScheduledExecutorService ses = floodlightProvider.getScheduledExecutor();
newInstanceTask = new SingletonTask(ses, new NewInstanceWorker());
} }
@Override @Override
public void startUp(FloodlightModuleContext context) { public void startUp(FloodlightModuleContext context) {
// TODO Auto-generated method stub // TODO Auto-generated method stub
linkDiscovery.addListener(this); linkDiscovery.addListener(this);
this.createNewInstance(); newInstanceTask.reschedule(1, TimeUnit.MILLISECONDS);
} }
// //
...@@ -338,3 +385,4 @@ IRoutingService, ILinkDiscoveryListener { ...@@ -338,3 +385,4 @@ IRoutingService, ILinkDiscoveryListener {
return currentInstance.getBroadcastTreeForCluster(clusterId); return currentInstance.getBroadcastTreeForCluster(clusterId);
} }
} }
...@@ -48,10 +48,10 @@ import net.floodlightcontroller.topology.TopologyManager; ...@@ -48,10 +48,10 @@ import net.floodlightcontroller.topology.TopologyManager;
* *
* @author David Erickson (daviderickson@cs.stanford.edu) * @author David Erickson (daviderickson@cs.stanford.edu)
*/ */
public class TopologyImplTest extends FloodlightTestCase { public class LinkDiscoveryManagerTest extends FloodlightTestCase {
private LinkDiscoveryManager topology; private LinkDiscoveryManager topology;
protected static Logger log = LoggerFactory.getLogger(TopologyImplTest.class); protected static Logger log = LoggerFactory.getLogger(LinkDiscoveryManagerTest.class);
public LinkDiscoveryManager getTopology() { public LinkDiscoveryManager getTopology() {
return topology; return topology;
......
package net.floodlightcontroller.topology; package net.floodlightcontroller.topology;
import static org.junit.Assert.*;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import static org.junit.Assert.*;
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.module.FloodlightModuleContext; import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.test.MockFloodlightProvider;
import net.floodlightcontroller.linkdiscovery.ILinkDiscovery; import net.floodlightcontroller.linkdiscovery.ILinkDiscovery;
import net.floodlightcontroller.topology.NodePortTuple; import net.floodlightcontroller.topology.NodePortTuple;
import net.floodlightcontroller.topology.TopologyInstance; import net.floodlightcontroller.topology.TopologyInstance;
...@@ -23,6 +24,7 @@ public class TopologyInstanceTest { ...@@ -23,6 +24,7 @@ public class TopologyInstanceTest {
protected static Logger log = LoggerFactory.getLogger(TopologyInstanceTest.class); protected static Logger log = LoggerFactory.getLogger(TopologyInstanceTest.class);
protected TopologyManager topologyManager; protected TopologyManager topologyManager;
protected FloodlightModuleContext fmc; protected FloodlightModuleContext fmc;
protected MockFloodlightProvider mockFloodlightProvider;
protected int DIRECT_LINK = 1; protected int DIRECT_LINK = 1;
protected int MULTIHOP_LINK = 2; protected int MULTIHOP_LINK = 2;
...@@ -31,6 +33,8 @@ public class TopologyInstanceTest { ...@@ -31,6 +33,8 @@ public class TopologyInstanceTest {
@Before @Before
public void SetUp() throws Exception { public void SetUp() throws Exception {
fmc = new FloodlightModuleContext(); fmc = new FloodlightModuleContext();
mockFloodlightProvider = new MockFloodlightProvider();
fmc.addService(IFloodlightProviderService.class, mockFloodlightProvider);
topologyManager = new TopologyManager(); topologyManager = new TopologyManager();
topologyManager.init(fmc); topologyManager.init(fmc);
} }
...@@ -103,6 +107,7 @@ public class TopologyInstanceTest { ...@@ -103,6 +107,7 @@ public class TopologyInstanceTest {
topologyManager.addOrUpdateLink((long)r[0], (short)r[1], (long)r[2], (short)r[3], type); topologyManager.addOrUpdateLink((long)r[0], (short)r[1], (long)r[2], (short)r[3], type);
} }
topologyManager.createNewInstance();
} }
public TopologyManager getTopologyManager() { public TopologyManager getTopologyManager() {
...@@ -124,8 +129,8 @@ public class TopologyInstanceTest { ...@@ -124,8 +129,8 @@ public class TopologyInstanceTest {
{1,2,3}, {1,2,3},
{4} {4}
}; };
//tm.recompute();
createTopologyFromLinks(linkArray); createTopologyFromLinks(linkArray);
verifyClusters(expectedClusters); verifyClusters(expectedClusters);
} }
...@@ -219,6 +224,7 @@ public class TopologyInstanceTest { ...@@ -219,6 +224,7 @@ public class TopologyInstanceTest {
int [][] expectedClusters = { int [][] expectedClusters = {
{1,2,3,4,5}, {1,2,3,4,5},
}; };
topologyManager.createNewInstance();
verifyClusters(expectedClusters); verifyClusters(expectedClusters);
} }
...@@ -228,6 +234,7 @@ public class TopologyInstanceTest { ...@@ -228,6 +234,7 @@ public class TopologyInstanceTest {
int [][] expectedClusters = { int [][] expectedClusters = {
{1,2,3,5}, {1,2,3,5},
}; };
topologyManager.createNewInstance();
verifyClusters(expectedClusters); verifyClusters(expectedClusters);
} }
} }
...@@ -260,6 +267,7 @@ public class TopologyInstanceTest { ...@@ -260,6 +267,7 @@ public class TopologyInstanceTest {
}; };
createTopologyFromLinks(linkArray); createTopologyFromLinks(linkArray);
topologyManager.createNewInstance();
verifyClusters(expectedClusters); verifyClusters(expectedClusters);
verifyExpectedBroadcastPortsInClusters(expectedBroadcastPorts); verifyExpectedBroadcastPortsInClusters(expectedBroadcastPorts);
} }
...@@ -321,6 +329,7 @@ public class TopologyInstanceTest { ...@@ -321,6 +329,7 @@ public class TopologyInstanceTest {
}; };
createTopologyFromLinks(linkArray); createTopologyFromLinks(linkArray);
topologyManager.createNewInstance();
verifyClusters(expectedClusters); verifyClusters(expectedClusters);
verifyExpectedBroadcastPortsInClusters(expectedBroadcastPorts); verifyExpectedBroadcastPortsInClusters(expectedBroadcastPorts);
} }
...@@ -367,6 +376,7 @@ public class TopologyInstanceTest { ...@@ -367,6 +376,7 @@ public class TopologyInstanceTest {
}; };
createTopologyFromLinks(linkArray); createTopologyFromLinks(linkArray);
topologyManager.createNewInstance();
verifyClusters(expectedClusters); verifyClusters(expectedClusters);
verifyExpectedBroadcastPortsInClusters(expectedBroadcastPorts); verifyExpectedBroadcastPortsInClusters(expectedBroadcastPorts);
} }
......
package net.floodlightcontroller.topology; package net.floodlightcontroller.topology;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.module.FloodlightModuleContext; import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.test.MockFloodlightProvider;
import net.floodlightcontroller.linkdiscovery.ILinkDiscovery; import net.floodlightcontroller.linkdiscovery.ILinkDiscovery;
import net.floodlightcontroller.topology.TopologyManager; import net.floodlightcontroller.topology.TopologyManager;
...@@ -15,10 +16,13 @@ public class TopologyManagerTest { ...@@ -15,10 +16,13 @@ public class TopologyManagerTest {
protected static Logger log = LoggerFactory.getLogger(TopologyManagerTest.class); protected static Logger log = LoggerFactory.getLogger(TopologyManagerTest.class);
TopologyManager topologyManager; TopologyManager topologyManager;
FloodlightModuleContext fmc; FloodlightModuleContext fmc;
protected MockFloodlightProvider mockFloodlightProvider;
@Before @Before
public void SetUp() throws Exception { public void SetUp() throws Exception {
mockFloodlightProvider = new MockFloodlightProvider();
fmc = new FloodlightModuleContext(); fmc = new FloodlightModuleContext();
fmc.addService(IFloodlightProviderService.class, mockFloodlightProvider);
topologyManager = new TopologyManager(); topologyManager = new TopologyManager();
topologyManager.init(fmc); topologyManager.init(fmc);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment