Skip to content
Snippets Groups Projects
DeviceManagerImpl.java 24.91 KiB
/**
*    Copyright 2011,2012 Big Switch Networks, Inc. 
*    Originally created by David Erickson, Stanford University
* 
*    Licensed under the Apache License, Version 2.0 (the "License"); you may
*    not use this file except in compliance with the License. You may obtain
*    a copy of the License at
*
*         http://www.apache.org/licenses/LICENSE-2.0
*
*    Unless required by applicable law or agreed to in writing, software
*    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
*    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
*    License for the specific language governing permissions and limitations
*    under the License.
**/

package net.floodlightcontroller.devicemanager.internal;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import net.floodlightcontroller.core.FloodlightContext;
import net.floodlightcontroller.core.IFloodlightProvider;
import net.floodlightcontroller.core.IOFMessageListener;
import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.core.IOFSwitchListener;
import net.floodlightcontroller.devicemanager.IDevice;
import net.floodlightcontroller.devicemanager.IDeviceManager;
import net.floodlightcontroller.devicemanager.IEntityClass;
import net.floodlightcontroller.devicemanager.IEntityClassifier;
import net.floodlightcontroller.devicemanager.IEntityClassifier.EntityField;
import net.floodlightcontroller.packet.ARP;
import net.floodlightcontroller.packet.DHCP;
import net.floodlightcontroller.packet.Ethernet;
import net.floodlightcontroller.packet.IPv4;
import net.floodlightcontroller.packet.UDP;
import net.floodlightcontroller.storage.IStorageSource;
import net.floodlightcontroller.storage.IStorageSourceListener;
import net.floodlightcontroller.topology.ITopology;
import net.floodlightcontroller.topology.ITopologyAware;
import org.openflow.protocol.OFMessage;
import org.openflow.protocol.OFPacketIn;
import org.openflow.protocol.OFPortStatus;
import org.openflow.protocol.OFType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * DeviceManager creates Devices based upon MAC addresses seen in the network.
 * It tracks any network addresses mapped to the Device, and its location
 * within the network.
 * @author readams
 */
public class DeviceManagerImpl implements IDeviceManager, IOFMessageListener,
        IOFSwitchListener, ITopologyAware, IStorageSourceListener {  
    protected static Logger logger = 
        LoggerFactory.getLogger(DeviceManagerImpl.class);

    protected IFloodlightProvider floodlightProvider;
    protected ITopology topology;
    protected IStorageSource storageSource;
    
    /**
     * This is the master device map that maps device IDs to {@link Device}
     * objects.
     */
    protected ConcurrentHashMap<Long, Device> deviceMap;
    
    /**
     * Counter used to generate device keys
     */
    protected long deviceKeyCounter = 0;
    
    /**
     * Lock for incrementing the device key counter
     */
    protected Object deviceKeyLock = new Object();
    
    /**
     * This is the primary entity index that maps entities to device IDs.
     */
    protected ConcurrentHashMap<IndexedEntity, Long> primaryIndex;
    
    /**
     * The primary key fields used in the primary index
     */
    protected Set<EntityField> primaryKeyFields;
    
    /**
     * This map contains secondary indices for each of the configured {@ref IEntityClass} 
     * that exist
     */
    protected ConcurrentHashMap<IEntityClass, 
                                ConcurrentHashMap<IndexedEntity, 
                                                  Long>> classIndexMap;

    /**
     * The entity classifier currently in use
     */
    IEntityClassifier entityClassifier;
    
    // **************
    // IDeviceManager
    // **************

    @Override
    public void setEntityClassifier(IEntityClassifier classifier) {
        entityClassifier = classifier;
        primaryKeyFields = classifier.getKeyFields();
    }
    
    @Override
    public void flushEntityCache(IEntityClass entityClass, 
                                 boolean reclassify) {
        // TODO Auto-generated method stub
    }

    @Override
    public IDevice findDevice(long macAddress, Integer ipv4Address, 
                              Short vlan, Long switchDPID, 
                              Integer switchPort) {
        return findDeviceByEntity(new Entity(macAddress, vlan, 
                                             ipv4Address, switchDPID, 
                                             switchPort, null));
    }

    @Override
    public Collection<? extends IDevice> getAllDevices() {
        return Collections.unmodifiableCollection(deviceMap.values());
    }
    
    // ******************
    // IOFMessageListener
    // ******************


    @Override
    public String getName() {
        return "devicemanager";
    }

    @Override
    public int getId() {
        return IOFMessageListener.FlListenerID.DEVICEMANAGERIMPL;
    }

    @Override
    public boolean isCallbackOrderingPrereq(OFType type, String name) {
        // TODO Auto-generated method stub
        return false;
    }

    @Override
    public boolean isCallbackOrderingPostreq(OFType type, String name) {
        // TODO Auto-generated method stub
        return false;
    }
    
    @Override
    public Command receive(IOFSwitch sw, OFMessage msg, 
                           FloodlightContext cntx) {
        switch (msg.getType()) {
            case PACKET_IN:
                return this.processPacketInMessage(sw, 
                                                   (OFPacketIn) msg, cntx);
            case PORT_STATUS:
                return this.processPortStatusMessage(sw, 
                                                     (OFPortStatus) msg);
        }

        logger.error("received an unexpected message {} from switch {}", 
                     msg, sw);
        return Command.CONTINUE;
    }
    
    // **********************
    // IStorageSourceListener
    // **********************
    
    @Override
    public void rowsModified(String tableName, Set<Object> rowKeys) {
        // TODO Auto-generated method stub
        
    }

    @Override
    public void rowsDeleted(String tableName, Set<Object> rowKeys) {
        // TODO Auto-generated method stub
        
    }

    // **************
    // ITopologyAware
    // **************
    
    @Override
    public void addedLink(IOFSwitch srcSw, short srcPort, int srcPortState,
                          IOFSwitch dstSw, short dstPort, int dstPortState) {
        // Nothing to do
    }

    @Override
    public void updatedLink(IOFSwitch srcSw, short srcPort,
                            int srcPortState, IOFSwitch dstSw,
                            short dstPort, int dstPortState) {
        // TODO Auto-generated method stub
    }

    @Override
    public void removedLink(IOFSwitch srcSw, short srcPort, IOFSwitch dstSw,
                            short dstPort) {
        // TODO Auto-generated method stub
    }

    @Override
    public void clusterMerged() {
        // TODO Auto-generated method stub
    }

    // *****************
    // IOFSwitchListener
    // *****************
    
    @Override
    public void updatedSwitch(IOFSwitch sw) {
        // TODO Auto-generated method stub
    }


    @Override
    public void addedSwitch(IOFSwitch sw) {
        // TODO Auto-generated method stub
        
    }

    @Override
    public void removedSwitch(IOFSwitch sw) {
        // TODO Auto-generated method stub
    }

    // **************
    // Initialization
    // **************

    public void setFloodlightProvider(IFloodlightProvider floodlightProvider) {
        this.floodlightProvider = floodlightProvider;
    }

    public void setStorageSource(IStorageSource storageSource) {
        this.storageSource = storageSource;
    }

    public void setTopology(ITopology topology) {
        this.topology = topology;
    }
    
    public void init() {
        
    }

    public void startUp() {
        if (entityClassifier == null)
            setEntityClassifier(new DefaultEntityClassifier());
        
        deviceMap = new ConcurrentHashMap<Long, Device>();
        primaryIndex = new ConcurrentHashMap<IndexedEntity, Long>();
        classIndexMap = 
                new ConcurrentHashMap<IEntityClass, 
                                      ConcurrentHashMap<IndexedEntity, 
                                                        Long>>();
        
        floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
        floodlightProvider.addOFMessageListener(OFType.PORT_STATUS, this);
        
        // XXX - TODO entity aging timer
    }
    
    // ****************
    // Internal methods
    // ****************
    
    protected Command processPortStatusMessage(IOFSwitch sw, OFPortStatus ps) {
        // XXX - TODO
        return null;        
    }

    /**
     * This method is called for every packet-in and should be optimized for
     * performance.
     * @param sw
     * @param pi
     * @param cntx
     * @return
     */
    protected Command processPacketInMessage(IOFSwitch sw, OFPacketIn pi, 
                                             FloodlightContext cntx) {
        try {
            Ethernet eth = 
                    IFloodlightProvider.bcStore.
                    get(cntx,IFloodlightProvider.CONTEXT_PI_PAYLOAD);

            // Extract source entity information
            Entity srcEntity = 
                    getSourceEntityFromPacket(eth, sw, pi.getInPort());
            if (srcEntity == null)
                return Command.STOP;
            
            if (isGratArp(eth)) {
                // XXX - TODO - Clear attachment points from other clusters
            }
            
            // Learn/lookup device information
            Device srcDevice = learnDeviceByEntity(srcEntity);
            if (srcDevice == null)
                return Command.STOP;
            
            // Store the source device in the context
            fcStore.put(cntx, CONTEXT_SRC_DEVICE, srcDevice);
            
            // Find the device matching the destination from the entity
            // classes of the source.
            Entity dstEntity = getDestEntityFromPacket(eth);
            if (dstEntity != null) {
                Device dstDevice = findDeviceByEntity(dstEntity);

                if (dstDevice == null) {
                    // This can only happen if we have attachment point
                    // key fields since attachment point information isn't
                    // available for destination devices.
                    /*
                    ArrayList<Device> candidates = new ArrayList<Device>();
                    for (IEntityClass clazz : srcDevice.getEntityClasses()) {
                        Device c = findDeviceInClassByEntity(clazz, dstEntity);
                        if (c != null)
                            candidates.add(c);
                    }
                    if (candidates.size() == 1) {
                        dstDevice = candidates.get(0);
                    } else if (candidates.size() > 1) {
                        // ambiguous device.  A higher-order component will 
                        // need to deal with it by assigning priority
                        // XXX - TODO
                    }
                    */

                }
                if (dstDevice != null)
                    fcStore.put(cntx, CONTEXT_DST_DEVICE, dstDevice);
            }
            
            return Command.CONTINUE;

        } finally {
            processUpdates();
        }
        
    }
    
    private void processUpdates() {
        // XXX - TODO
    }
    
    /**
     * Check whether the port is a physical port. We should not learn 
     * attachment points on "special" ports.
     * @param port the port to check
     * @return
     */
    private boolean isValidInputPort(short port) {
        return ((int)port & 0xff00) != 0xff00 ||
                     port == (short)0xfffe;
    }
    
    private boolean isGratArp(Ethernet eth) {
        if (eth.getPayload() instanceof ARP) {
            ARP arp = (ARP) eth.getPayload();
            if (arp.isGratuitous()) {
                return true;
            }
        }
        return false;
    }

    private int getSrcNwAddr(Ethernet eth, long dlAddr) {
        if (eth.getPayload() instanceof ARP) {
            ARP arp = (ARP) eth.getPayload();
            if ((arp.getProtocolType() == ARP.PROTO_TYPE_IP) &&
                (Ethernet.toLong(arp.getSenderHardwareAddress()) == dlAddr)) {
                return IPv4.toIPv4Address(arp.getSenderProtocolAddress());
            }
        } else if (eth.getPayload() instanceof IPv4) {
            IPv4 ipv4 = (IPv4) eth.getPayload();
            if (ipv4.getPayload() instanceof UDP) {
                UDP udp = (UDP)ipv4.getPayload();
                if (udp.getPayload() instanceof DHCP) {
                    DHCP dhcp = (DHCP)udp.getPayload();
                    if (dhcp.getOpCode() == DHCP.OPCODE_REPLY) {
                        return ipv4.getSourceAddress();
                    }
                }
            }
        }
        return 0;
    }
    
    /**
     * Parse an entity from an {@link Ethernet} packet.
     * @param eth the packet to parse
     * @param sw the switch on which the packet arrived
     * @param pi the original packetin
     * @return the entity from the packet
     */
    private Entity getSourceEntityFromPacket(Ethernet eth, 
                                             IOFSwitch sw, 
                                             int port) {
        byte[] dlAddrArr = eth.getSourceMACAddress();
        long dlAddr = Ethernet.toLong(dlAddrArr);

        // Ignore broadcast/multicast source
        if ((dlAddrArr[0] & 0x1) != 0)
            return null;

        boolean learnap = true;
        if (topology.isInternal(sw, (short)port) ||
            !isValidInputPort((short)port)) {
            // If this is an internal port or we otherwise don't want
            // to learn on these ports.  In the future, we should
            // handle this case by labeling flows with something that
            // will give us the entity class.  For now, we'll do our
            // best assuming attachment point information isn't used
            // as a key field.
            learnap = false;
        }
       
        short vlan = eth.getVlanID();
        int nwSrc = getSrcNwAddr(eth, dlAddr);
        return new Entity(dlAddr,
                          ((vlan >= 0) ? vlan : null),
                          ((nwSrc != 0) ? nwSrc : null),
                          (learnap ? sw.getId() : null),
                          (learnap ? port : null),
                          new Date());
    }
    
    /**
     * Get a (partial) entity for the destination from the packet. 
     * @param eth
     * @return
     */
    private Entity getDestEntityFromPacket(Ethernet eth) {
        byte[] dlAddrArr = eth.getDestinationMACAddress();
        long dlAddr = Ethernet.toLong(dlAddrArr);
        short vlan = eth.getVlanID();
        int nwDst = 0;

        // Ignore broadcast/multicast destination
        if ((dlAddrArr[0] & 0x1) != 0)
            return null;

        if (eth.getPayload() instanceof IPv4) {
            IPv4 ipv4 = (IPv4) eth.getPayload();
            nwDst = ipv4.getDestinationAddress();
        }
        
        return new Entity(dlAddr,
                          ((vlan >= 0) ? vlan : null),
                          ((nwDst != 0) ? nwDst : null),
                          null,
                          null,
                          null);
    }

    /**
     * Look up a {@link Device} based on the provided {@link Entity}.
     * @param entity the entity to search for
     * @return The {@link Device} object if found
     */
    protected Device findDeviceByEntity(Entity entity) {
        IndexedEntity ie = new IndexedEntity(primaryKeyFields, entity);
        Long deviceKey = primaryIndex.get(ie);
        if (deviceKey == null)
            return null;
        return deviceMap.get(deviceKey);
    }

    /**
     * Look up a {@link Device} within a particular entity class based on 
     * the provided {@link Entity}.
     * @param clazz the entity class to search for the entity
     * @param entity the entity to search for
     * @return The {@link Device} object if found
     */
    protected Device findDeviceInClassByEntity(IEntityClass clazz,
                                               Entity entity) {
        // XXX - TODO
        throw new UnsupportedOperationException();
    }
 
    /**
     * Look up a {@link Device} based on the provided {@link Entity}.  Also learns
     * based on the new entity, and will update existing devices as required. 
     * 
     * @param entity the {@link Entity}
     * @return The {@link Device} object if found
     */
    protected Device learnDeviceByEntity(Entity entity) {
        IndexedEntity ie = new IndexedEntity(primaryKeyFields, entity);
        ArrayList<Long> deleteQueue = null;
        Device device = null;
        
        // we may need to restart the learning process if we detect
        // concurrent modification.  Note that we ensure that at least
        // one thread should always succeed so we don't get into infinite
        // starvation loops
        while (true) {
            // Look up the fully-qualified entity to see if it already
            // exists in the primary entity index.
            Long deviceKey = primaryIndex.get(ie);
            Collection<IEntityClass> classes = null;
            if (deviceKey == null) {
                // If the entity does not exist in the primary entity index, 
                // use the entity classifier for find the classes for the 
                // entity. Look up the entity in each of the returned classes'
                // class entity indexes.
                classes = entityClassifier.classifyEntity(entity);
                for (IEntityClass clazz : classes) {
                    // ensure that a class index exists for the class if
                    // needed
                    ConcurrentHashMap<IndexedEntity, Long> classIndex;
                    try {
                        classIndex = getClassIndex(clazz);
                    } catch (ConcurrentModificationException e) {
                        continue;
                    }
                        
                    if (classIndex != null) {
                        IndexedEntity sie = 
                                new IndexedEntity(clazz.getKeyFields(), 
                                                  entity);
                        deviceKey = classIndex.get(sie);
                    }
                }
            }
            if (deviceKey != null) {
                // If the primary or secondary index contains the entity, 
                // update the entity timestamp, then use resulting device 
                // key to look up the device in the device map, and
                // use the referenced Device below.
                entity.setLastSeenTimestamp(new Date());
                device = deviceMap.get(deviceKey);
                if (device == null)
                    continue;
            } else {
                // If the secondary index does not contain the entity, 
                // create a new Device object containing the entity, and 
                // generate a new device ID
                synchronized (deviceKeyLock) {
                    deviceKey = Long.valueOf(deviceKeyCounter++);
                }
                device = new Device(deviceKey, entity, classes);
                
                // Add the new device to the primary map with a simple put
                deviceMap.put(deviceKey, device);
                
                if (!updateIndices(device, deviceKey)) {
                    if (deleteQueue == null)
                        deleteQueue = new ArrayList<Long>();
                    deleteQueue.add(deviceKey);
                    continue;
                }
            }
            
            if (device.containsEntity(entity)) {
                break;
            } else {
                Device newDevice = new Device(device, entity, classes);
                // XXX - TODO When adding an entity, any existing entities on the
                // same OpenFlow switch cluster but a different attachment point
                // should be removed. If an entity being removed contains an 
                // IP address but the new entity does not contain that IP, 
                // then a new entity should be added containing the IP 
                // (including updating the entity caches), preserving the old 
                // timestamp of the entity.
                
                // XXX - TODO Handle port channels
                
                // XXX - TODO Handle broadcast domains
                
                // XXX - TODO Prevent flapping of entities
                
                boolean res = deviceMap.replace(deviceKey, device, newDevice);
                // If replace returns false, restart the process from the 
                // beginning (this implies another thread concurrently 
                // modified this Device).
                if (!res)
                    continue;
                
                device = newDevice;
                
                if (!updateIndices(device, deviceKey)) {
                    continue;
                }
            }
        }
        
        if (deleteQueue != null) {
            for (Long l : deleteQueue) {
                deviceMap.remove(l);
            }
        }
        return device;
    }
    
    /**
     * Get the secondary index for a class.  Will return null if the 
     * secondary index was created concurrently in another thread. 
     * @param clazz the class for the index
     * @return
     */
    private ConcurrentHashMap<IndexedEntity, 
                              Long> getClassIndex(IEntityClass clazz) 
                                      throws ConcurrentModificationException {
        ConcurrentHashMap<IndexedEntity, Long> classIndex =
                classIndexMap.get(clazz);
        if (classIndex != null) return classIndex;
        
        if (primaryKeyFields.equals(clazz.getKeyFields())) {
            return null;
        }
        
        classIndex = 
                new ConcurrentHashMap<IndexedEntity, Long>();
        ConcurrentHashMap<IndexedEntity, Long> r = 
                classIndexMap.putIfAbsent(clazz, 
                                          classIndex);
        if (r != null) {
            // concurrent add; restart
            throw new ConcurrentModificationException();
        }
        return classIndex;
    }
    
    /**
     * Update both the primary and class indices for the provided device.
     * If the update fails because of a concurrent update, will return false.
     * @param device the device to update
     * @param deviceKey the device key for the device
     * @return true if the update succeeded, false otherwise.
     */
    private boolean updateIndices(Device device, Long deviceKey) {
        if (!updateIndex(device, deviceKey, 
                         primaryIndex, primaryKeyFields)) {
            return false;
        }
        for (IEntityClass clazz : device.getEntityClasses()) {
            Set<EntityField> ef = clazz.getKeyFields();
            if (primaryKeyFields.equals(ef))
                continue;
            ConcurrentHashMap<IndexedEntity, Long> classIndex;
            try {
                classIndex = getClassIndex(clazz); 
            } catch (ConcurrentModificationException e) {
                return false;
            }
            if (classIndex != null &&
                !updateIndex(device, deviceKey, classIndex, ef))
                return false;
        }
        // XXX - TODO handle indexed views into data
        return true;
    }                     
    
    /**
     * Attempt to update an index with the entities in the provided
     * {@link Device}.  If the update fails because of a concurrent update,
     * will return false.
     * @param device the device to update
     * @param deviceKey the device key for the device
     * @param index the index to update
     * @param keyFields the key fields to use for the index
     * @return true if the update succeeded, false otherwise.
     */
    private boolean updateIndex(Device device, 
                                Long deviceKey,
                                ConcurrentHashMap<IndexedEntity, 
                                                  Long> index, 
                                Set<EntityField> keyFields) {
        for (Entity e : device.entities) {
            IndexedEntity ie = new IndexedEntity(keyFields, e);
            Long ret = index.putIfAbsent(ie, deviceKey);
            if (ret != null) {
                // If the return value is non-null, then fail the insert 
                // (this implies that a device using this entity has 
                // already been created in another thread).
                return false;
            }
        }
        
        return true;
    }
    
}