Skip to content
Snippets Groups Projects
TopologyManager.java 14.07 KiB
package net.floodlightcontroller.topology;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.core.util.SingletonTask;
import net.floodlightcontroller.linkdiscovery.ILinkDiscoveryListener;
import net.floodlightcontroller.linkdiscovery.ILinkDiscoveryService;
import net.floodlightcontroller.routing.BroadcastTree;
import net.floodlightcontroller.routing.IRoutingService;
import net.floodlightcontroller.routing.Link;
import net.floodlightcontroller.routing.Route;
import net.floodlightcontroller.threadpool.IThreadPoolService;


import org.openflow.protocol.OFPhysicalPort.OFPortState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * @author srini
 *
 */

public class TopologyManager implements IFloodlightModule, ITopologyService, 
IRoutingService, ILinkDiscoveryListener {

    protected static Logger log = LoggerFactory.getLogger(TopologyManager.class);

    protected Map<Long, Set<Short>> switchPorts; // Set of ports for each switch
    protected Map<NodePortTuple, Set<Link>> switchPortLinks; // Set of links organized by node port tuple
    protected Map<NodePortTuple, Set<Link>> portBroadcastDomainLinks; // set of links that are broadcast domain links.
    protected Map<NodePortTuple, Set<Link>> tunnelLinks; // set of tunnel links
    protected ILinkDiscoveryService linkDiscovery;
    protected ArrayList<ITopologyListener> topologyAware;
    protected IThreadPoolService threadPool;

    protected BlockingQueue<LDUpdate> ldUpdates;
    protected TopologyInstance currentInstance;
    protected SingletonTask newInstanceTask;

    /**
     * Thread for recomputing topology.  The thread is always running, 
     * however the function applyUpdates() has a blocking call.
     */
    protected class NewInstanceWorker implements Runnable {
        @Override 
        public void run() {
            applyUpdates();
            createNewInstance();
            informListeners();
        }
    }

    public void applyUpdates() {
        LDUpdate update = null;
        while (ldUpdates.peek() != null) {
            try {
                update = ldUpdates.take();
            } catch (Exception e) {
                log.error("Error reading link discovery update. {}", e);
            }
            if (log.isTraceEnabled()) {
                log.info("Applying update: {}", update);
            }
            if (update.getOperation() == UpdateOperation.ADD_OR_UPDATE) {
                boolean added = (((update.getSrcPortState() & OFPortState.OFPPS_STP_MASK.getValue()) != OFPortState.OFPPS_STP_BLOCK.getValue()) &&
                        ((update.getDstPortState() & OFPortState.OFPPS_STP_MASK.getValue()) != OFPortState.OFPPS_STP_BLOCK.getValue()));
                if (added) {
                    addOrUpdateLink(update.getSrc(), update.getSrcPort(), 
                                    update.getDst(), update.getDstPort(), 
                                    update.getType());
                } else  {
                    removeLink(update.getSrc(), update.getSrcPort(), update.getDst(), update.getDstPort());
                }
            } else if (update.getOperation() == UpdateOperation.REMOVE) {
                removeLink(update.getSrc(), update.getSrcPort(), update.getDst(), update.getDstPort());
            }
        }
    }

    /**
     * This function computes a new topology.
     */
    public void createNewInstance() {
        TopologyInstance nt = new TopologyInstance(switchPorts, switchPortLinks, portBroadcastDomainLinks, tunnelLinks);
        nt.compute();
        currentInstance = nt;
    }

    public void informListeners() {
        for(int i=0; i<topologyAware.size(); ++i) {
            ITopologyListener listener = topologyAware.get(i);
            listener.toplogyChanged();
        }
    }

    public void addSwitch(long sid) {
        if (switchPorts.containsKey(sid) == false) {
            switchPorts.put(sid, new HashSet<Short>());
        }
    }

    private void addPortToSwitch(long s, short p) {
        addSwitch(s);
        switchPorts.get(s).add(p);
    }

    public void removeSwitch(long sid) {
        // Delete all the links in the switch, switch and all 
        // associated data should be deleted.
        if (switchPorts.containsKey(sid) == false) return;

        Set<Link> linksToRemove = new HashSet<Link>();
        for(Short p: switchPorts.get(sid)) {
            NodePortTuple n1 = new NodePortTuple(sid, p);
            linksToRemove.addAll(switchPortLinks.get(n1));
        }

        for(Link link: linksToRemove) {
            removeLink(link);
        }
    }

    private boolean addLinkToStructure(Map<NodePortTuple, Set<Link>> s, Link l) {
        boolean result1 = false, result2 = false; 
        NodePortTuple n1 = new NodePortTuple(l.getSrc(), l.getSrcPort());
        NodePortTuple n2 = new NodePortTuple(l.getDst(), l.getDstPort());

        if (s.get(n1) == null) {
            s.put(n1, new HashSet<Link>()); 
        }
        if (s.get(n2) == null) {
            s.put(n2, new HashSet<Link>()); 
        }
        result1 = s.get(n1).add(l);
        result2 = s.get(n2).add(l);

        return (result1 && result2);
    }

    private boolean removeLinkFromStructure(Map<NodePortTuple, Set<Link>> s, Link l) {

        boolean result1 = false, result2 = false;
        NodePortTuple n1 = new NodePortTuple(l.getSrc(), l.getSrcPort());
        NodePortTuple n2 = new NodePortTuple(l.getDst(), l.getDstPort());

        if (s.get(n1) != null) {
            result1 = s.get(n1).remove(l);
            if (s.get(n1).isEmpty()) s.remove(n1);
        }
        if (s.get(n2) != null) {
            result2 = s.get(n2).remove(l);
            if (s.get(n2).isEmpty()) s.remove(n2);
        }
        return result1 && result2; 
    }

    public void addOrUpdateLink(long srcId, short srcPort, long dstId, short dstPort, LinkType type) {
        Link link = new Link(srcId, srcPort, dstId, dstPort);

        addPortToSwitch(srcId, srcPort);
        addPortToSwitch(dstId, dstPort);

        addLinkToStructure(switchPortLinks, link);

        if (type.equals(LinkType.MULTIHOP_LINK)) {
            addLinkToStructure(portBroadcastDomainLinks, link);
            removeLinkFromStructure(tunnelLinks, link);
        } else if (type.equals(LinkType.TUNNEL)) {
            addLinkToStructure(tunnelLinks, link);
            removeLinkFromStructure(portBroadcastDomainLinks, link);
        } else if (type.equals(LinkType.DIRECT_LINK)) {
            removeLinkFromStructure(tunnelLinks, link);
            removeLinkFromStructure(portBroadcastDomainLinks, link);
        }
    }

    public void removeLink(Link link)  {
        removeLinkFromStructure(portBroadcastDomainLinks, link);
        removeLinkFromStructure(tunnelLinks, link);
        removeLinkFromStructure(switchPortLinks, link);

        NodePortTuple n1 = new NodePortTuple(link.getSrc(), link.getSrcPort());
        NodePortTuple n2 = new NodePortTuple(link.getDst(), link.getDstPort());

        // Remove switch ports if there are no links through those switch ports
        if (switchPortLinks.get(n1) == null) {
            if (switchPorts.get(link.getSrc()) != null)
                switchPorts.get(link.getSrc()).remove(link.getSrcPort());
        }
        if (switchPortLinks.get(n2) == null) {
            if (switchPorts.get(link.getDst()) != null)
                switchPorts.get(link.getDst()).remove(link.getDstPort());
        }
        // Remove the node if no ports are present
        if (switchPorts.get(link.getSrc())!=null && 
                switchPorts.get(link.getSrc()).isEmpty()) {
            switchPorts.remove(link.getSrc());
        }
        if (switchPorts.get(link.getDst())!=null && 
                switchPorts.get(link.getDst()).isEmpty()) {
            switchPorts.remove(link.getDst());
        }
    }

    public void removeLink(long srcId, short srcPort, long dstId, short dstPort) {
        Link link = new Link(srcId, srcPort, dstId, dstPort);
        removeLink(link);
    }

    public void clear() {
        switchPorts.clear();
        switchPortLinks.clear();
        portBroadcastDomainLinks.clear();
        tunnelLinks.clear();
    }


    /**
     * Getters.  No Setters.
     */
    public Map<Long, Set<Short>> getSwitchPorts() {
        return switchPorts;
    }

    public Map<NodePortTuple, Set<Link>> getSwitchPortLinks() {
        return switchPortLinks;
    }

    public Map<NodePortTuple, Set<Link>> getPortBroadcastDomainLinks() {
        return portBroadcastDomainLinks;
    }

    public Map<NodePortTuple, Set<Link>> getTunnelLinks() {
        return tunnelLinks;
    }

    public TopologyInstance getCurrentInstance() {
        return currentInstance;
    }

    //
    //  ILinkDiscoveryListener interface methods
    //

    public void linkDiscoveryUpdate(LDUpdate update) {
        boolean scheduleFlag = false;
        // if there's no udpates in the queue, then
        // we need to schedule an update.
        if (ldUpdates.peek() == null)
            scheduleFlag = true;

        if (log.isTraceEnabled()) {
            log.trace("Queuing update: {}", update);
        }
        ldUpdates.add(update);

        if (scheduleFlag) {
            newInstanceTask.reschedule(1, TimeUnit.MICROSECONDS);
        }
    }

    //
    //   IFloodlightModule interfaces
    //

    @Override
    public Collection<Class<? extends IFloodlightService>> getModuleServices() {
        // TODO Auto-generated method stub
        Collection<Class<? extends IFloodlightService>> l = 
                new ArrayList<Class<? extends IFloodlightService>>();
        l.add(ITopologyService.class);
        l.add(IRoutingService.class);
        return l;
    }

    @Override
    public Map<Class<? extends IFloodlightService>, IFloodlightService>
    getServiceImpls() {
        Map<Class<? extends IFloodlightService>,
        IFloodlightService> m = 
        new HashMap<Class<? extends IFloodlightService>,
        IFloodlightService>();
        // We are the class that implements the service
        m.put(ITopologyService.class, this);
        m.put(IRoutingService.class, this);
        return m;

    }

    @Override
    public Collection<Class<? extends IFloodlightService>>
    getModuleDependencies() {
        Collection<Class<? extends IFloodlightService>> l = 
                new ArrayList<Class<? extends IFloodlightService>>();
        l.add(ILinkDiscoveryService.class);
        l.add(IThreadPoolService.class);
        return l;
    }

    @Override
    public void init(FloodlightModuleContext context)
            throws FloodlightModuleException {
        linkDiscovery = context.getServiceImpl(ILinkDiscoveryService.class);
        threadPool = context.getServiceImpl(IThreadPoolService.class);
        
        switchPorts = new HashMap<Long,Set<Short>>();
        switchPortLinks = new HashMap<NodePortTuple, Set<Link>>();
        portBroadcastDomainLinks = new HashMap<NodePortTuple, Set<Link>>();
        tunnelLinks = new HashMap<NodePortTuple, Set<Link>>();
        topologyAware = new ArrayList<ITopologyListener>();
        ldUpdates = new LinkedBlockingQueue<LDUpdate>();
        
        ScheduledExecutorService ses = threadPool.getScheduledExecutor();
        newInstanceTask = new SingletonTask(ses, new NewInstanceWorker());
    }

    @Override
    public void startUp(FloodlightModuleContext context) {
        linkDiscovery.addListener(this);
        newInstanceTask.reschedule(1, TimeUnit.MILLISECONDS);
    }

    //
    // ITopologyService interface methods
    //
    @Override
    public boolean isInternal(long switchid, short port) {
        return currentInstance.isInternal(switchid, port);
    }

    @Override
    public long getSwitchClusterId(long switchId) {
        return currentInstance.getSwitchClusterId(switchId);
    }

    @Override
    public Set<Long> getSwitchesInCluster(long switchId) {
        return currentInstance.getSwitchesInCluster(switchId);
    }

    @Override
    public boolean inSameCluster(long switch1, long switch2) {
        return currentInstance.inSameCluster(switch1, switch2);
    }

    @Override
    public void addListener(ITopologyListener listener) {
        topologyAware.add(listener);
    }

    @Override
    public boolean isIncomingBroadcastAllowedOnSwitchPort(long sw, short portId) {
        return currentInstance.isIncomingBroadcastAllowedOnSwitchPort(sw, portId);
    }

    @Override
    public Set<Short> getPorts(long sw) {
        return currentInstance.getPorts(sw);
    }

    public Set<Short> getBroadcastPorts(long targetSw, long src, short srcPort) {
        return currentInstance.getBroadcastPorts(targetSw, src, srcPort);
    }

    //
    // IRoutingService interface methods
    //
    @Override
    public Route getRoute(long src, long dst) {
        Route r = currentInstance.getRoute(src, dst);
        return r;
    }

    @Override
    public boolean routeExists(long src, long dst) {
        return currentInstance.routeExists(src, dst);
    }

    @Override
    public BroadcastTree getBroadcastTreeForCluster(long clusterId) {
        return currentInstance.getBroadcastTreeForCluster(clusterId);
    }

    @Override
    public boolean isInSameBroadcastDomain(long s1, short p1, long s2, short p2) {
        return currentInstance.isInSameBroadcastDomain(s1, p1, s2, p2);

    }

    @Override
    public NodePortTuple getOutgoingSwitchPort(long src, short srcPort,
                                               long dst, short dstPort) {
        // Use this function to redirect traffic if needed.
        return currentInstance.getOutgoingSwitchPort(src, srcPort, dst, dstPort);
    }

	@Override
	public boolean isBroadcastDomainPort(long sw, short port) {
		return currentInstance.isBroadcastDomainPort(new NodePortTuple(sw, port));
	}
}