-
Gregor Maier authoredGregor Maier authored
LinkDiscoveryManager.java 61.83 KiB
/**
* Copyright 2011, Big Switch Networks, Inc.
* Originally created by David Erickson, Stanford University
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
**/
package net.floodlightcontroller.linkdiscovery.internal;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.floodlightcontroller.core.FloodlightContext;
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.IInfoProvider;
import net.floodlightcontroller.core.IOFMessageListener;
import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.core.IOFSwitchListener;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.core.util.SingletonTask;
import net.floodlightcontroller.linkdiscovery.ILinkDiscovery;
import net.floodlightcontroller.linkdiscovery.ILinkDiscovery.LinkType;
import net.floodlightcontroller.linkdiscovery.ILinkDiscovery.SwitchType;
import net.floodlightcontroller.linkdiscovery.ILinkDiscovery.LDUpdate;
import net.floodlightcontroller.linkdiscovery.ILinkDiscovery.UpdateOperation;
import net.floodlightcontroller.linkdiscovery.ILinkDiscoveryListener;
import net.floodlightcontroller.linkdiscovery.ILinkDiscoveryService;
import net.floodlightcontroller.linkdiscovery.LinkInfo;
import net.floodlightcontroller.linkdiscovery.LinkTuple;
import net.floodlightcontroller.linkdiscovery.SwitchPortTuple;
import net.floodlightcontroller.packet.BDDP;
import net.floodlightcontroller.packet.Ethernet;
import net.floodlightcontroller.packet.IPv4;
import net.floodlightcontroller.packet.LLDP;
import net.floodlightcontroller.packet.LLDPTLV;
import net.floodlightcontroller.restserver.IRestApiService;
import net.floodlightcontroller.routing.IRoutingService;
import net.floodlightcontroller.storage.IResultSet;
import net.floodlightcontroller.storage.IStorageSourceService;
import net.floodlightcontroller.storage.IStorageSourceListener;
import net.floodlightcontroller.storage.OperatorPredicate;
import net.floodlightcontroller.storage.StorageException;
import net.floodlightcontroller.threadpool.IThreadPoolService;
import net.floodlightcontroller.topology.web.TopologyWebRoutable;
import net.floodlightcontroller.util.EventHistory;
import net.floodlightcontroller.util.EventHistory.EvAction;
import org.openflow.protocol.OFMessage;
import org.openflow.protocol.OFPacketIn;
import org.openflow.protocol.OFPacketOut;
import org.openflow.protocol.OFPhysicalPort;
import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
import org.openflow.protocol.OFPhysicalPort.OFPortState;
import org.openflow.protocol.OFPort;
import org.openflow.protocol.OFPortStatus;
import org.openflow.protocol.OFPortStatus.OFPortReason;
import org.openflow.protocol.OFType;
import org.openflow.protocol.action.OFAction;
import org.openflow.protocol.action.OFActionOutput;
import org.openflow.util.HexString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class sends out LLDP messages containing the sending switch's datapath
* id as well as the outgoing port number. Received LLrescDP messages that
* match a known switch cause a new LinkTuple to be created according to the
* invariant rules listed below. This new LinkTuple is also passed to routing
* if it exists to trigger updates.
*
* This class also handles removing links that are associated to switch ports
* that go down, and switches that are disconnected.
*
* Invariants:
* -portLinks and switchLinks will not contain empty Sets outside of
* critical sections
* -portLinks contains LinkTuples where one of the src or dst
* SwitchPortTuple matches the map key
* -switchLinks contains LinkTuples where one of the src or dst
* SwitchPortTuple's id matches the switch id
* -Each LinkTuple will be indexed into switchLinks for both
* src.id and dst.id, and portLinks for each src and dst
* -The updates queue is only added to from within a held write lock
*
* @author David Erickson (daviderickson@cs.stanford.edu)
*/
public class LinkDiscoveryManager
implements IOFMessageListener, IOFSwitchListener,
IStorageSourceListener, ILinkDiscoveryService,
IFloodlightModule, IInfoProvider {
protected static Logger log = LoggerFactory.getLogger(LinkDiscoveryManager.class);
// Names of table/fields for links in the storage API
private static final String LINK_TABLE_NAME = "controller_link";
private static final String LINK_ID = "id";
private static final String LINK_SRC_SWITCH = "src_switch_id";
private static final String LINK_SRC_PORT = "src_port";
private static final String LINK_SRC_PORT_STATE = "src_port_state";
private static final String LINK_DST_SWITCH = "dst_switch_id";
private static final String LINK_DST_PORT = "dst_port";
private static final String LINK_DST_PORT_STATE = "dst_port_state";
private static final String LINK_VALID_TIME = "valid_time";
private static final String LINK_TYPE = "link_type";
private static final String SWITCH_TABLE_NAME = "controller_switch";
private static final String SWITCH_CORE_SWITCH = "core_switch";
protected IFloodlightProviderService floodlightProvider;
protected IStorageSourceService storageSource;
protected IRoutingService routingEngine;
protected IRestApiService restApi;
protected IThreadPoolService threadPool;
private static final byte[] LLDP_STANDARD_DST_MAC_STRING =
HexString.fromHexString("01:80:c2:00:00:00");
// BigSwitch OUI is 5C:16:C7, so 5D:16:C7 is the multicast version
// private static final String LLDP_BSN_DST_MAC_STRING = "5d:16:c7:00:00:01";
private static final String LLDP_BSN_DST_MAC_STRING = "ff:ff:ff:ff:ff:ff";
/**
* Map from link to the most recent time it was verified functioning
*/
protected Map<LinkTuple, LinkInfo> links;
protected int lldpFrequency = 15 * 1000; // sending frequency
protected int lldpTimeout = 35 * 1000; // timeout
protected LLDPTLV controllerTLV;
protected ReentrantReadWriteLock lock;
/**
* Map from a id:port to the set of links containing it as an endpoint
*/
protected Map<SwitchPortTuple, Set<LinkTuple>> portLinks;
/**
* Set of link tuples over which multicast LLDPs are received
* and unicast LLDPs are not received.
*/
protected Map<SwitchPortTuple, Set<LinkTuple>> portBroadcastDomainLinks;
SingletonTask loopDetectTask;
protected volatile boolean shuttingDown = false;
/**
* Map from switch id to a set of all links with it as an endpoint
*/
protected Map<IOFSwitch, Set<LinkTuple>> switchLinks;
/* topology aware components are called in the order they were added to the
* the array */
protected ArrayList<ILinkDiscoveryListener> linkDiscoveryAware;
protected BlockingQueue<LDUpdate> updates;
protected Thread updatesThread;
//This map provides the ids of broadcast domains connected to a switch cluster
protected Map<Long, Set<Long>> switchClusterBroadcastDomainMap;
protected boolean isTopologyValid = false;
public int getLldpFrequency() {
return lldpFrequency;
}
public int getLldpTimeout() {
return lldpTimeout;
}
public Map<SwitchPortTuple, Set<LinkTuple>> getPortLinks() {
return portLinks;
}
public boolean isShuttingDown() {
return shuttingDown;
}
private void doUpdatesThread() throws InterruptedException {
do {
LDUpdate update = updates.take();
if (log.isTraceEnabled()) {
log.trace("Dispatching link discovery update {} {} {} {} {} for {}",
new Object[]{update.getOperation(),
HexString.toHexString(update.getSrc()), update.getSrcPort(),
HexString.toHexString(update.getDst()), update.getDstPort(),
linkDiscoveryAware});
}
if (linkDiscoveryAware != null) {
for (ILinkDiscoveryListener lda : linkDiscoveryAware) { // order maintained
lda.linkDiscoveryUpdate(update);
}
}
} while (updates.peek() != null);
}
protected void sendLLDPs(IOFSwitch sw, OFPhysicalPort port, boolean isStandard) {
if (log.isTraceEnabled()) {
log.trace("Sending LLDP packet out of swich: {}, port: {}",
sw.getStringId(), port.getPortNumber());
}
LLDP lldp;
Ethernet ethernet;
if (isStandard) {
ethernet = new Ethernet()
.setSourceMACAddress(port.getHardwareAddress())
.setDestinationMACAddress(LLDP_STANDARD_DST_MAC_STRING)
.setEtherType(Ethernet.TYPE_LLDP);
lldp = new LLDP();
} else {
ethernet = new Ethernet()
.setSourceMACAddress(port.getHardwareAddress())
.setDestinationMACAddress(LLDP_BSN_DST_MAC_STRING)
.setEtherType(Ethernet.TYPE_BDDP);
lldp = new BDDP();
}
// using "nearest customer bridge" MAC address for broadest possible propagation
// through provider and TPMR bridges (see IEEE 802.1AB-2009 and 802.1Q-2011),
// in particular the Linux bridge which behaves mostly like a provider bridge
ethernet.setPayload(lldp);
byte[] chassisId = new byte[] {4, 0, 0, 0, 0, 0, 0}; // filled in later
byte[] portId = new byte[] {2, 0, 0}; // filled in later
lldp.setChassisId(new LLDPTLV().setType((byte) 1).setLength((short) 7).setValue(chassisId));
lldp.setPortId(new LLDPTLV().setType((byte) 2).setLength((short) 3).setValue(portId));
lldp.setTtl(new LLDPTLV().setType((byte) 3).setLength((short) 2).setValue(new byte[] {0, 0x78}));
// OpenFlow OUI - 00-26-E1
byte[] dpidTLVValue = new byte[] {0x0, 0x26, (byte) 0xe1, 0, 0, 0, 0, 0, 0, 0, 0, 0};
LLDPTLV dpidTLV = new LLDPTLV().setType((byte) 127).setLength((short) 12).setValue(dpidTLVValue);
lldp.getOptionalTLVList().add(dpidTLV);
// Add the controller identifier to the TLV value.
lldp.getOptionalTLVList().add(controllerTLV);
byte[] dpidArray = new byte[8];
ByteBuffer dpidBB = ByteBuffer.wrap(dpidArray);
ByteBuffer portBB = ByteBuffer.wrap(portId, 1, 2);
Long dpid = sw.getId();
dpidBB.putLong(dpid);
// set the ethernet source mac to last 6 bytes of dpid
System.arraycopy(dpidArray, 2, ethernet.getSourceMACAddress(), 0, 6);
// set the chassis id's value to last 6 bytes of dpid
System.arraycopy(dpidArray, 2, chassisId, 1, 6);
// set the optional tlv to the full dpid
System.arraycopy(dpidArray, 0, dpidTLVValue, 4, 8);
if (port.getPortNumber() == OFPort.OFPP_LOCAL.getValue())
return;
// set the portId to the outgoing port
portBB.putShort(port.getPortNumber());
if (log.isTraceEnabled()) {
log.trace("Sending LLDP out of interface: {}/{}",
sw.toString(), port.getPortNumber());
}
// serialize and wrap in a packet out
byte[] data = ethernet.serialize();
OFPacketOut po = (OFPacketOut) floodlightProvider.getOFMessageFactory().getMessage(OFType.PACKET_OUT);
po.setBufferId(OFPacketOut.BUFFER_ID_NONE);
po.setInPort(OFPort.OFPP_NONE);
// set actions
List<OFAction> actions = new ArrayList<OFAction>();
actions.add(new OFActionOutput(port.getPortNumber(), (short) 0));
po.setActions(actions);
po.setActionsLength((short) OFActionOutput.MINIMUM_LENGTH);
// set data
po.setLengthU(OFPacketOut.MINIMUM_LENGTH + po.getActionsLength() + data.length);
po.setPacketData(data);
// send
try {
sw.write(po, null);
} catch (IOException e) {
log.error("Failure sending LLDP out port {} on switch {}",
new Object[]{ port.getPortNumber(), sw }, e);
}
}
protected void sendLLDPs(SwitchPortTuple swt) {
IOFSwitch sw = swt.getSw();
if (sw == null) return;
OFPhysicalPort port = sw.getPort(swt.getPort());
if (port != null) {
sendLLDPs(sw, port, true);
sendLLDPs(sw, port, false);
}
}
protected void sendLLDPs(IOFSwitch sw) {
if (sw.getEnabledPorts() != null) {
for (OFPhysicalPort port : sw.getEnabledPorts()) {
sendLLDPs(sw, port, true);
sendLLDPs(sw, port, false);
}
}
sw.flush();
}
protected void sendLLDPs() {
if (log.isTraceEnabled()) {
log.trace("Sending LLDP packets out of all the enabled ports on switch {}");
}
Map<Long, IOFSwitch> switches = floodlightProvider.getSwitches();
for (Entry<Long, IOFSwitch> entry : switches.entrySet()) {
IOFSwitch sw = entry.getValue();
sendLLDPs(sw);
}
}
protected void setControllerTLV() {
//Setting the controllerTLVValue based on current nano time,
//controller's IP address, and the network interface object hash
//the corresponding IP address.
final int prime = 7867;
InetAddress localIPAddress = null;
NetworkInterface localInterface = null;
byte[] controllerTLVValue = new byte[] {0, 0, 0, 0, 0, 0, 0, 0}; // 8 byte value.
ByteBuffer bb = ByteBuffer.allocate(10);
try{
localIPAddress = java.net.InetAddress.getLocalHost();
localInterface = NetworkInterface.getByInetAddress(localIPAddress);
} catch (Exception e) {
e.printStackTrace();
}
long result = System.nanoTime();
if (localIPAddress != null)
result = result * prime + IPv4.toIPv4Address(localIPAddress.getHostAddress());
if (localInterface != null)
result = result * prime + localInterface.hashCode();
// set the first 4 bits to 0.
result = result & (0x0fffffffffffffffL);
bb.putLong(result);
log.info("Controller TLV: {}", result);
bb.rewind();
bb.get(controllerTLVValue, 0, 8);
this.controllerTLV = new LLDPTLV().setType((byte) 0x0c).setLength((short) 8).setValue(controllerTLVValue);
}
@Override
public String getName() {
return "topology";
}
@Override
public Command receive(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
switch (msg.getType()) {
case PACKET_IN:
return this.handlePacketIn(sw, (OFPacketIn) msg, cntx);
case PORT_STATUS:
return this.handlePortStatus(sw, (OFPortStatus) msg);
}
log.error("Received an unexpected message {} from switch {}", msg, sw);
return Command.CONTINUE;
}
private Command handleLldp(LLDP lldp, IOFSwitch sw, OFPacketIn pi, boolean isStandard, FloodlightContext cntx) {
// If this is a malformed LLDP, or not from us, exit
if (lldp.getPortId() == null || lldp.getPortId().getLength() != 3)
return Command.CONTINUE;
long myId = ByteBuffer.wrap(controllerTLV.getValue()).getLong();
long otherId = 0;
boolean myLLDP = false;
ByteBuffer portBB = ByteBuffer.wrap(lldp.getPortId().getValue());
portBB.position(1);
Short remotePort = portBB.getShort();
IOFSwitch remoteSwitch = null;
// Verify this LLDP packet matches what we're looking for
for (LLDPTLV lldptlv : lldp.getOptionalTLVList()) {
if (lldptlv.getType() == 127 && lldptlv.getLength() == 12 &&
lldptlv.getValue()[0] == 0x0 && lldptlv.getValue()[1] == 0x26 &&
lldptlv.getValue()[2] == (byte)0xe1 && lldptlv.getValue()[3] == 0x0) {
ByteBuffer dpidBB = ByteBuffer.wrap(lldptlv.getValue());
remoteSwitch = floodlightProvider.getSwitches().get(dpidBB.getLong(4));
} else if (lldptlv.getType() == 12 && lldptlv.getLength() == 8){
otherId = ByteBuffer.wrap(lldptlv.getValue()).getLong();
if (myId == otherId)
myLLDP = true;
}
}
if (myLLDP == false) {
// This is not the LLDP sent by this controller.
// If the LLDP message has multicast bit set, then we need to broadcast
// the packet as a regular packet.
if (isStandard) {
if (log.isTraceEnabled()) {
log.trace("Getting standard LLDP from a different controller and quelching it.");
}
return Command.STOP;
}
else if (myId < otherId) {
if (log.isTraceEnabled()) {
log.trace("Getting BDDP packets from a different controller" +
"and letting it go through normal processing chain.");
}
return Command.CONTINUE;
}
}
if (remoteSwitch == null) {
// Ignore LLDPs not generated by Floodlight, or from a switch that has recently
// disconnected, or from a switch connected to another Floodlight instance
if (log.isDebugEnabled()) {
log.debug("Received LLDP from remote switch not connected to the controller");
}
return Command.STOP;
}
if (!remoteSwitch.portEnabled(remotePort)) {
log.debug("Ignoring link with disabled source port: switch {} port {}", remoteSwitch, remotePort);
return Command.STOP;
}
if (!sw.portEnabled(pi.getInPort())) {
log.debug("Ignoring link with disabled dest port: switch {} port {}", sw, pi.getInPort());
return Command.STOP;
}
OFPhysicalPort physicalPort = remoteSwitch.getPort(remotePort);
int srcPortState = (physicalPort != null) ? physicalPort.getState() : 0;
physicalPort = sw.getPort(remotePort);
int dstPortState = (physicalPort != null) ? physicalPort.getState() : 0;
// Store the time of update to this link, and push it out to routingEngine
LinkTuple lt = new LinkTuple(new SwitchPortTuple(remoteSwitch, remotePort),
new SwitchPortTuple(sw, pi.getInPort()));
Integer srcPortStateObj = Integer.valueOf(srcPortState);
Integer dstPortStateObj = Integer.valueOf(dstPortState);
Long unicastValidTime = null;
Long multicastValidTime = null;
if (isStandard)
unicastValidTime = System.currentTimeMillis();
else
multicastValidTime = System.currentTimeMillis();
LinkInfo newLinkInfo =
new LinkInfo(unicastValidTime, multicastValidTime, srcPortStateObj, dstPortStateObj);
addOrUpdateLink(lt, newLinkInfo);
// Consume this message
return Command.STOP;
}
protected Command handlePacketIn(IOFSwitch sw, OFPacketIn pi,
FloodlightContext cntx) {
Ethernet eth =
IFloodlightProviderService.bcStore.get(cntx,
IFloodlightProviderService.CONTEXT_PI_PAYLOAD);
if(eth.getEtherType() == Ethernet.TYPE_BDDP) {
return handleLldp((LLDP) eth.getPayload(), sw, pi, false, cntx);
} else if (eth.getEtherType() == Ethernet.TYPE_LLDP) {
return handleLldp((LLDP) eth.getPayload(), sw, pi, true, cntx);
} else if (eth.getEtherType() < 1500 &&
Arrays.equals(eth.getDestinationMACAddress(),
LLDP_STANDARD_DST_MAC_STRING)) {
// drop any other link discovery/spanning tree protocols
return Command.STOP;
}
return Command.CONTINUE;
}
protected ILinkDiscovery.LinkType getLinkType(LinkTuple lt, LinkInfo info) {
if (info.getUnicastValidTime() != null) {
return ILinkDiscovery.LinkType.DIRECT_LINK;
} else if (info.getMulticastValidTime() != null) {
return ILinkDiscovery.LinkType.MULTIHOP_LINK;
}
return ILinkDiscovery.LinkType.INVALID_LINK;
}
protected void addOrUpdateLink(LinkTuple lt, LinkInfo newLinkInfo) {
lock.writeLock().lock();
try {
LinkInfo oldLinkInfo = links.put(lt, newLinkInfo);
if (log.isTraceEnabled()) {
log.trace("addOrUpdateLink: {} {}",
lt,
(newLinkInfo.getMulticastValidTime()!=null) ? "multicast" : "unicast");
}
UpdateOperation updateOperation = null;
boolean linkChanged = false;
if (oldLinkInfo == null) {
// index it by switch source
if (!switchLinks.containsKey(lt.getSrc().getSw()))
switchLinks.put(lt.getSrc().getSw(),
new HashSet<LinkTuple>());
switchLinks.get(lt.getSrc().getSw()).add(lt);
// index it by switch dest
if (!switchLinks.containsKey(lt.getDst().getSw()))
switchLinks.put(lt.getDst().getSw(),
new HashSet<LinkTuple>());
switchLinks.get(lt.getDst().getSw()).add(lt);
// index both ends by switch:port
if (!portLinks.containsKey(lt.getSrc()))
portLinks.put(lt.getSrc(), new HashSet<LinkTuple>());
portLinks.get(lt.getSrc()).add(lt);
if (!portLinks.containsKey(lt.getDst()))
portLinks.put(lt.getDst(), new HashSet<LinkTuple>());
portLinks.get(lt.getDst()).add(lt);
// Add to portNOFLinks if the unicast valid time is null
if (newLinkInfo.getUnicastValidTime() == null)
addLinkToBroadcastDomain(lt);
writeLink(lt, newLinkInfo);
updateOperation = UpdateOperation.ADD_OR_UPDATE;
linkChanged = true;
if (log.isDebugEnabled()) {
log.debug("Added link {}", lt);
}
// Add to event history
evHistTopoLink(lt.getSrc().getSw().getId(),
lt.getDst().getSw().getId(),
lt.getSrc().getPort(),
lt.getDst().getPort(),
newLinkInfo.getSrcPortState(), newLinkInfo.getDstPortState(),
EvAction.LINK_ADDED, "LLDP Recvd");
} else {
// Since the link info is already there, we need to
// update the right fields.
if (newLinkInfo.getUnicastValidTime() == null) {
// This is due to a multicast LLDP, so copy the old unicast
// value.
if (oldLinkInfo.getUnicastValidTime() != null) {
newLinkInfo.setUnicastValidTime(oldLinkInfo.getUnicastValidTime());
}
} else if (newLinkInfo.getMulticastValidTime() == null) {
// This is due to a unicast LLDP, so copy the old multicast
// value.
if (oldLinkInfo.getMulticastValidTime() != null) {
newLinkInfo.setMulticastValidTime(oldLinkInfo.getMulticastValidTime());
}
}
Long oldTime = oldLinkInfo.getUnicastValidTime();
Long newTime = newLinkInfo.getUnicastValidTime();
// the link has changed its state between openflow and non-openflow
// if the unicastValidTimes are null or not null
if (oldTime != null & newTime == null) {
// openflow -> non-openflow transition
// we need to add the link tuple to the portNOFLinks
addLinkToBroadcastDomain(lt);
linkChanged = true;
} else if (oldTime == null & newTime != null) {
// non-openflow -> openflow transition
// we need to remove the link from the portNOFLinks
removeLinkFromBroadcastDomain(lt);
linkChanged = true;
}
// Only update the port states if they've changed
if (newLinkInfo.getSrcPortState().intValue() != oldLinkInfo.getSrcPortState().intValue() ||
newLinkInfo.getDstPortState().intValue() != oldLinkInfo.getDstPortState().intValue())
linkChanged = true;
// Write changes to storage. This will always write the updated
// valid time, plus the port states if they've changed (i.e. if
// they weren't set to null in the previous block of code.
writeLink(lt, newLinkInfo);
if (linkChanged) {
updateOperation = UpdateOperation.ADD_OR_UPDATE;
if (log.isDebugEnabled()) {
log.debug("Updated link {}", lt);
}
// Add to event history
evHistTopoLink(lt.getSrc().getSw().getId(),
lt.getDst().getSw().getId(),
lt.getSrc().getPort(),
lt.getDst().getPort(),
newLinkInfo.getSrcPortState(), newLinkInfo.getDstPortState(),
EvAction.LINK_PORT_STATE_UPDATED,
"LLDP Recvd");
}
}
if (linkChanged) {
updates.add(new LDUpdate(lt, newLinkInfo.getSrcPortState(), newLinkInfo.getDstPortState(), getLinkType(lt, newLinkInfo), updateOperation));
}
} finally {
lock.writeLock().unlock();
}
}
public Map<IOFSwitch, Set<LinkTuple>> getSwitchLinks() {
return this.switchLinks;
}
/**
* Removes links from memory and storage.
* @param links The List of @LinkTuple to delete.
*/
protected void deleteLinks(List<LinkTuple> links, String reason) {
lock.writeLock().lock();
try {
for (LinkTuple lt : links) {
this.switchLinks.get(lt.getSrc().getSw()).remove(lt);
this.switchLinks.get(lt.getDst().getSw()).remove(lt);
if (this.switchLinks.containsKey(lt.getSrc().getSw()) &&
this.switchLinks.get(lt.getSrc().getSw()).isEmpty())
this.switchLinks.remove(lt.getSrc().getSw());
if (this.switchLinks.containsKey(lt.getDst().getSw()) &&
this.switchLinks.get(lt.getDst().getSw()).isEmpty())
this.switchLinks.remove(lt.getDst().getSw());
this.portLinks.get(lt.getSrc()).remove(lt);
if (this.portLinks.get(lt.getSrc()).isEmpty())
this.portLinks.remove(lt.getSrc());
this.portLinks.get(lt.getDst()).remove(lt);
if (this.portLinks.get(lt.getDst()).isEmpty())
this.portLinks.remove(lt.getDst());
this.links.remove(lt);
updates.add(new LDUpdate(lt, 0, 0, null, UpdateOperation.REMOVE));
// Update Event History
evHistTopoLink(lt.getSrc().getSw().getId(),
lt.getDst().getSw().getId(),
lt.getSrc().getPort(),
lt.getDst().getPort(),
0, 0, // Port states
EvAction.LINK_DELETED, reason);
// remove link from
removeLinkFromStorage(lt);
if (log.isDebugEnabled()) {
log.debug("Deleted link {}", lt);
}
}
} finally {
lock.writeLock().unlock();
}
}
/**
* Handles an OFPortStatus message from a switch. We will add or
* delete LinkTupes as well re-compute the topology if needed.
* @param sw The IOFSwitch that sent the port status message
* @param ps The OFPortStatus message
* @return The Command to continue or stop after we process this message
*/
protected Command handlePortStatus(IOFSwitch sw, OFPortStatus ps) {
if (log.isDebugEnabled()) {
log.debug("handlePortStatus: Switch {} port #{} reason {}; " +
"config is {} state is {}",
new Object[] {sw.getStringId(),
ps.getDesc().getPortNumber(),
ps.getReason(),
ps.getDesc().getConfig(),
ps.getDesc().getState()});
}
SwitchPortTuple tuple =
new SwitchPortTuple(sw, ps.getDesc().getPortNumber());
boolean link_deleted = false;
lock.writeLock().lock();
try {
boolean topologyChanged = false;
// if ps is a delete, or a modify where the port is down or
// configured down
if ((byte)OFPortReason.OFPPR_DELETE.ordinal() == ps.getReason() ||
((byte)OFPortReason.OFPPR_MODIFY.ordinal() ==
ps.getReason() && !portEnabled(ps.getDesc()))) {
List<LinkTuple> eraseList = new ArrayList<LinkTuple>();
if (this.portLinks.containsKey(tuple)) {
if (log.isDebugEnabled()) {
log.debug("handlePortStatus: Switch {} port #{} " +
"reason {}; removing links {}",
new Object[] {HexString.toHexString(sw.getId()),
ps.getDesc().getPortNumber(),
ps.getReason(),
this.portLinks.get(tuple)});
}
eraseList.addAll(this.portLinks.get(tuple));
deleteLinks(eraseList, "Port Status Changed");
topologyChanged = true;
link_deleted = true;
}
} else if (ps.getReason() ==
(byte)OFPortReason.OFPPR_MODIFY.ordinal()) {
// If ps is a port modification and the port state has changed
// that affects links in the topology
if (this.portLinks.containsKey(tuple)) {
for (LinkTuple link: this.portLinks.get(tuple)) {
LinkInfo linkInfo = links.get(link);
assert(linkInfo != null);
Integer updatedSrcPortState = null;
Integer updatedDstPortState = null;
if (link.getSrc().equals(tuple) &&
(linkInfo.getSrcPortState() !=
ps.getDesc().getState())) {
updatedSrcPortState = ps.getDesc().getState();
linkInfo.setSrcPortState(updatedSrcPortState);
}
if (link.getDst().equals(tuple) &&
(linkInfo.getDstPortState() !=
ps.getDesc().getState())) {
updatedDstPortState = ps.getDesc().getState();
linkInfo.setDstPortState(updatedDstPortState);
}
if ((updatedSrcPortState != null) ||
(updatedDstPortState != null)) {
writeLink(link, linkInfo);
topologyChanged = true;
}
}
}
}
if (topologyChanged) {
//updateClusters();
} else {
if (log.isDebugEnabled()) {
log.debug("handlePortStatus: Switch {} port #{} reason {};"+
" no links to update/remove",
new Object[] {HexString.toHexString(sw.getId()),
ps.getDesc().getPortNumber(),
ps.getReason()});
}
}
} finally {
lock.writeLock().unlock();
}
if (!link_deleted) {
// Send LLDP right away when port state is changed for faster
// cluster-merge. If it is a link delete then there is not need
// to send the LLDPs right away and instead we wait for the LLDPs
// to be sent on the timer as it is normally done
sendLLDPs(); // do it outside the write-lock
}
return Command.CONTINUE;
}
/**
* We send out LLDP messages when a switch is added to discover the topology
* @param sw The IOFSwitch that connected to the controller
*/
@Override
public void addedSwitch(IOFSwitch sw) {
// It's probably overkill to send LLDP from all switches, but we don't
// know which switches might be connected to the new switch.
// Need to optimize when supporting a large number of switches.
sendLLDPs();
// Update event history
evHistTopoSwitch(sw, EvAction.SWITCH_CONNECTED, "None");
}
/**
* When a switch disconnects we remove any links from our map and re-compute
* the topology.
* @param sw The IOFSwitch that disconnected from the controller
*/
@Override
public void removedSwitch(IOFSwitch sw) {
// Update event history
evHistTopoSwitch(sw, EvAction.SWITCH_DISCONNECTED, "None");
List<LinkTuple> eraseList = new ArrayList<LinkTuple>();
lock.writeLock().lock();
try {
if (switchLinks.containsKey(sw)) {
if (log.isDebugEnabled()) {
log.debug("Handle switchRemoved. Switch {}; removing links {}",
sw, switchLinks.get(sw));
}
// add all tuples with an endpoint on this switch to erase list
eraseList.addAll(switchLinks.get(sw));
deleteLinks(eraseList, "Switch Removed");
//updateClusters();
}
} finally {
lock.writeLock().unlock();
}
}
/**
* Iterates though @SwitchCluster links and then deletes ones
* that have timed out. The timeout is set by lldpTimeout.
* If links are deleted updateClusters() is then called.
*/
protected void timeoutLinks() {
List<LinkTuple> eraseList = new ArrayList<LinkTuple>();
Long curTime = System.currentTimeMillis();
boolean linkChanged = false;
// reentrant required here because deleteLink also write locks
lock.writeLock().lock();
try {
Iterator<Entry<LinkTuple, LinkInfo>> it =
this.links.entrySet().iterator();
while (it.hasNext()) {
Entry<LinkTuple, LinkInfo> entry = it.next();
LinkTuple lt = entry.getKey();
LinkInfo info = entry.getValue();
// Timeout the unicast and multicast LLDP valid times
// independently.
if ((info.getUnicastValidTime() != null) &&
(info.getUnicastValidTime() + this.lldpTimeout < curTime)){
info.setUnicastValidTime(null);
if (info.getMulticastValidTime() != null)
addLinkToBroadcastDomain(lt);
// Note that even if mTime becomes null later on,
// the link would be deleted, which would trigger updateClusters().
linkChanged = true;
}
if ((info.getMulticastValidTime()!= null) &&
(info.getMulticastValidTime()+ this.lldpTimeout < curTime)) {
info.setMulticastValidTime(null);
// if uTime is not null, then link will remain as openflow
// link. If uTime is null, it will be deleted. So, we
// don't care about linkChanged flag here.
removeLinkFromBroadcastDomain(lt);
linkChanged = true;
}
// Add to the erase list only if the unicast
// time is null.
if (info.getUnicastValidTime() == null &&
info.getMulticastValidTime() == null){
eraseList.add(entry.getKey());
} else if (linkChanged) {
updates.add(new LDUpdate(lt, info.getSrcPortState(),
info.getDstPortState(),
getLinkType(lt, info),
UpdateOperation.ADD_OR_UPDATE));
}
}
// if any link was deleted or any link was changed.
if ((eraseList.size() > 0) || linkChanged) {
deleteLinks(eraseList, "LLDP timeout");
//updateClusters();
}
} finally {
lock.writeLock().unlock();
}
}
private boolean portEnabled(OFPhysicalPort port) {
if (port == null)
return false;
if ((OFPortConfig.OFPPC_PORT_DOWN.getValue() & port.getConfig()) > 0)
return false;
if ((OFPortState.OFPPS_LINK_DOWN.getValue() & port.getState()) > 0)
return false;
// Port STP state doesn't work with multiple VLANs, so ignore it for now
//if ((port.getState() & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK.getValue())
// return false;
return true;
}
@Override
public LinkInfo getLinkInfo(SwitchPortTuple idPort, boolean isSrcPort) {
Set<LinkTuple> links = this.portLinks.get(idPort);
if (links == null) {
return null;
}
LinkTuple retLink = null;
for (LinkTuple link : links) {
if (log.isTraceEnabled()) {
log.trace("getLinkInfo: check link {} against swPortTuple {}",
link, idPort);
}
if (link.getSrc().equals(idPort) && isSrcPort) {
retLink = link;
} else if (link.getDst().equals(idPort) && !isSrcPort) {
retLink = link;
}
}
LinkInfo linkInfo = null;
if (retLink != null) {
linkInfo = this.links.get(retLink);
} else {
if (log.isDebugEnabled()) {
log.debug("getLinkInfo: No link out of {} links is from port {}, "+
"isSrcPort {}",
new Object[] {links.size(), idPort, isSrcPort});
}
}
return linkInfo;
}
public Map<SwitchPortTuple, Set<LinkTuple>> getPortBroadcastDomainLinks() {
return portBroadcastDomainLinks;
}
@Override
public Map<LinkTuple, LinkInfo> getLinks() {
lock.readLock().lock();
Map<LinkTuple, LinkInfo> result;
try {
result = new HashMap<LinkTuple, LinkInfo>(links);
} finally {
lock.readLock().unlock();
}
return result;
}
protected void addLinkToBroadcastDomain(LinkTuple lt) {
if (!portBroadcastDomainLinks.containsKey(lt.getSrc()))
portBroadcastDomainLinks.put(lt.getSrc(), new HashSet<LinkTuple>());
portBroadcastDomainLinks.get(lt.getSrc()).add(lt);
if (!portBroadcastDomainLinks.containsKey(lt.getDst()))
portBroadcastDomainLinks.put(lt.getDst(), new HashSet<LinkTuple>());
portBroadcastDomainLinks.get(lt.getDst()).add(lt);
}
protected void removeLinkFromBroadcastDomain(LinkTuple lt) {
if (portBroadcastDomainLinks.containsKey(lt.getSrc())) {
portBroadcastDomainLinks.get(lt.getSrc()).remove(lt);
if (portBroadcastDomainLinks.get(lt.getSrc()).isEmpty())
portBroadcastDomainLinks.remove(lt.getSrc());
}
if (portBroadcastDomainLinks.containsKey(lt.getDst())) {
portBroadcastDomainLinks.get(lt.getDst()).remove(lt);
if (portBroadcastDomainLinks.get(lt.getDst()).isEmpty())
portBroadcastDomainLinks.remove(lt.getDst());
}
}
// STORAGE METHODS
/**
* Deletes all links from storage
*/
void clearAllLinks() {
storageSource.deleteRowsAsync(LINK_TABLE_NAME, null);
}
/**
* Gets the storage key for a LinkTuple
* @param lt The LinkTuple to get
* @return The storage key as a String
*/
private String getLinkId(LinkTuple lt) {
String srcDpid = lt.getSrc().getSw().getStringId();
String dstDpid = lt.getDst().getSw().getStringId();
return srcDpid + "-" + lt.getSrc().getPort() + "-" +
dstDpid + "-" + lt.getDst().getPort();
}
/**
* Writes a LinkTuple and corresponding LinkInfo to storage
* @param lt The LinkTuple to write
* @param linkInfo The LinkInfo to write
*/
void writeLink(LinkTuple lt, LinkInfo linkInfo) {
Map<String, Object> rowValues = new HashMap<String, Object>();
String id = getLinkId(lt);
rowValues.put(LINK_ID, id);
rowValues.put(LINK_VALID_TIME, linkInfo.getUnicastValidTime());
String srcDpid = lt.getSrc().getSw().getStringId();
rowValues.put(LINK_SRC_SWITCH, srcDpid);
rowValues.put(LINK_SRC_PORT, lt.getSrc().getPort());
LinkType type = (this.getLinkType(lt, linkInfo));
if (type == LinkType.DIRECT_LINK)
rowValues.put(LINK_TYPE, "internal");
else if (type == LinkType.MULTIHOP_LINK)
rowValues.put(LINK_TYPE, "external");
else if (type == LinkType.TUNNEL)
rowValues.put(LINK_TYPE, "tunnel");
if (linkInfo.linkStpBlocked()) {
if (log.isTraceEnabled()) {
log.trace("writeLink, link {}, info {}, srcPortState Blocked",
lt, linkInfo);
}
rowValues.put(LINK_SRC_PORT_STATE,
OFPhysicalPort.OFPortState.OFPPS_STP_BLOCK.getValue());
} else {
if (log.isTraceEnabled()) {
log.trace("writeLink, link {}, info {}, srcPortState {}",
new Object[]{ lt, linkInfo, linkInfo.getSrcPortState() });
}
rowValues.put(LINK_SRC_PORT_STATE, linkInfo.getSrcPortState());
}
String dstDpid = lt.getDst().getSw().getStringId();
rowValues.put(LINK_DST_SWITCH, dstDpid);
rowValues.put(LINK_DST_PORT, lt.getDst().getPort());
if (linkInfo.linkStpBlocked()) {
if (log.isTraceEnabled()) {
log.trace("writeLink, link {}, info {}, dstPortState Blocked",
lt, linkInfo);
}
rowValues.put(LINK_DST_PORT_STATE,
OFPhysicalPort.OFPortState.OFPPS_STP_BLOCK.getValue());
} else {
if (log.isTraceEnabled()) {
log.trace("writeLink, link {}, info {}, dstPortState {}",
new Object[]{ lt, linkInfo, linkInfo.getDstPortState() });
}
rowValues.put(LINK_DST_PORT_STATE, linkInfo.getDstPortState());
}
storageSource.updateRowAsync(LINK_TABLE_NAME, rowValues);
}
public Long readLinkValidTime(LinkTuple lt) {
// FIXME: We're not currently using this right now, but if we start
// to use this again, we probably shouldn't use it in its current
// form, because it's doing synchronous storage calls. Depending
// on the context this may still be OK, but if it's being called
// on the packet in processing thread it should be reworked to
// use asynchronous storage calls.
Long validTime = null;
IResultSet resultSet = null;
try {
String[] columns = { LINK_VALID_TIME };
String id = getLinkId(lt);
resultSet = storageSource.executeQuery(LINK_TABLE_NAME, columns,
new OperatorPredicate(LINK_ID, OperatorPredicate.Operator.EQ, id), null);
if (resultSet.next())
validTime = resultSet.getLong(LINK_VALID_TIME);
}
finally {
if (resultSet != null)
resultSet.close();
}
return validTime;
}
public void writeLinkInfo(LinkTuple lt, LinkInfo linkInfo) {
if (linkInfo.getUnicastValidTime() != null) {
Map<String, Object> rowValues = new HashMap<String, Object>();
String id = getLinkId(lt);
rowValues.put(LINK_ID, id);
//LinkInfo linkInfo = links.get(lt);
if (linkInfo.getUnicastValidTime() != null)
rowValues.put(LINK_VALID_TIME, linkInfo.getUnicastValidTime());
if (linkInfo.getSrcPortState() != null) {
if (linkInfo != null && linkInfo.linkStpBlocked()) {
if (log.isTraceEnabled()) {
log.trace("writeLinkInfo, link {}, info {}, srcPortState Blocked",
lt, linkInfo);
}
rowValues.put(LINK_SRC_PORT_STATE,
OFPhysicalPort.OFPortState.OFPPS_STP_BLOCK.getValue());
} else {
if (log.isTraceEnabled()) {
log.trace("writeLinkInfo, link {}, info {}",
new Object[]{ lt, linkInfo});
}
rowValues.put(LINK_SRC_PORT_STATE, linkInfo.getSrcPortState());
}
}
if (linkInfo.getDstPortState() != null) {
if (linkInfo != null && linkInfo.linkStpBlocked()) {
if (log.isTraceEnabled()) {
log.trace("writeLinkInfo, link {}, info {}, dstPortState Blocked",
lt, linkInfo);
}
rowValues.put(LINK_DST_PORT_STATE,
OFPhysicalPort.OFPortState.OFPPS_STP_BLOCK.getValue());
} else {
if (log.isTraceEnabled()) {
log.trace("writeLinkInfo, link {}, info {}",
new Object[]{ lt, linkInfo});
}
rowValues.put(LINK_DST_PORT_STATE, linkInfo.getDstPortState());
}
}
storageSource.updateRowAsync(LINK_TABLE_NAME, id, rowValues);
}
}
/**
* Removes a link from storage using an asynchronous call.
* @param lt The LinkTuple to delete.
*/
void removeLinkFromStorage(LinkTuple lt) {
String id = getLinkId(lt);
storageSource.deleteRowAsync(LINK_TABLE_NAME, id);
}
@Override
public void addListener(ILinkDiscoveryListener listener) {
linkDiscoveryAware.add(listener);
}
/**
* Register a link discovery aware component
* @param linkDiscoveryAwareComponent
*/
public void addLinkDiscoveryAware(ILinkDiscoveryListener linkDiscoveryAwareComponent) {
// TODO make this a copy on write set or lock it somehow
this.linkDiscoveryAware.add(linkDiscoveryAwareComponent);
}
/**
* Deregister a link discovery aware component
* @param linkDiscoveryAwareComponent
*/
public void removeLinkDiscoveryAware(ILinkDiscoveryListener linkDiscoveryAwareComponent) {
// TODO make this a copy on write set or lock it somehow
this.linkDiscoveryAware.remove(linkDiscoveryAwareComponent);
}
/**
* Sets the IStorageSource to use for ITology
* @param storageSource the storage source to use
*/
public void setStorageSource(IStorageSourceService storageSource) {
this.storageSource = storageSource;
}
/**
* Gets the storage source for this ITopology
* @return The IStorageSource ITopology is writing to
*/
public IStorageSourceService getStorageSource() {
return storageSource;
}
/**
* @param routingEngine the storage source to use for persisting link info
*/
public void setRoutingEngine(IRoutingService routingEngine) {
this.routingEngine = routingEngine;
}
@Override
public boolean isCallbackOrderingPrereq(OFType type, String name) {
return false;
}
@Override
public boolean isCallbackOrderingPostreq(OFType type, String name) {
return false;
}
@Override
public void rowsModified(String tableName, Set<Object> rowKeys) {
Map<Long, IOFSwitch> switches = floodlightProvider.getSwitches();
ArrayList<IOFSwitch> updated_switches = new ArrayList<IOFSwitch>();
for(Object key: rowKeys) {
Long swId = new Long(HexString.toLong((String)key));
if (switches.containsKey(swId)) {
IOFSwitch sw = switches.get(swId);
boolean curr_status = sw.hasAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH);
boolean new_status = false;
IResultSet resultSet = null;
try {
resultSet = storageSource.getRow(tableName, key);
for (Iterator<IResultSet> it = resultSet.iterator(); it.hasNext();) {
// In case of multiple rows, use the status in last row?
Map<String, Object> row = it.next().getRow();
if (row.containsKey(SWITCH_CORE_SWITCH)) {
new_status = ((String)row.get(SWITCH_CORE_SWITCH)).equals("true");
}
}
}
finally {
if (resultSet != null)
resultSet.close();
}
if (curr_status != new_status) {
updated_switches.add(sw);
}
} else {
if (log.isDebugEnabled()) {
log.debug("Update for switch which has no entry in switch " +
"list (dpid={}), a delete action.", (String)key);
}
}
}
for (IOFSwitch sw : updated_switches) {
// Set SWITCH_IS_CORE_SWITCH to it's inverse value
if (sw.hasAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH)) {
sw.removeAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH);
if (log.isDebugEnabled()) {
log.debug("SWITCH_IS_CORE_SWITCH set to False for {}", sw);
}
updates.add(new LDUpdate(sw.getId(), SwitchType.BASIC_SWITCH));
}
else {
sw.setAttribute(IOFSwitch.SWITCH_IS_CORE_SWITCH, new Boolean(true));
if (log.isDebugEnabled()) {
log.debug("SWITCH_IS_CORE_SWITCH set to True for {}", sw);
}
updates.add(new LDUpdate(sw.getId(), SwitchType.CORE_SWITCH));
}
}
}
@Override
public void rowsDeleted(String tableName, Set<Object> rowKeys) {
// Ignore delete events, the switch delete will do the right thing on it's own
}
// IFloodlightModule classes
@Override
public Collection<Class<? extends IFloodlightService>> getModuleServices() {
Collection<Class<? extends IFloodlightService>> l =
new ArrayList<Class<? extends IFloodlightService>>();
l.add(ILinkDiscoveryService.class);
//l.add(ITopologyService.class);
return l;
}
@Override
public Map<Class<? extends IFloodlightService>, IFloodlightService>
getServiceImpls() {
Map<Class<? extends IFloodlightService>,
IFloodlightService> m =
new HashMap<Class<? extends IFloodlightService>,
IFloodlightService>();
// We are the class that implements the service
m.put(ILinkDiscoveryService.class, this);
//m.put(ITopologyService.class, this);
return m;
}
@Override
public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
Collection<Class<? extends IFloodlightService>> l =
new ArrayList<Class<? extends IFloodlightService>>();
l.add(IFloodlightProviderService.class);
l.add(IStorageSourceService.class);
l.add(IRoutingService.class);
l.add(IRestApiService.class);
l.add(IThreadPoolService.class);
return l;
}
@Override
public void init(FloodlightModuleContext context)
throws FloodlightModuleException {
floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
storageSource = context.getServiceImpl(IStorageSourceService.class);
routingEngine = context.getServiceImpl(IRoutingService.class);
restApi = context.getServiceImpl(IRestApiService.class);
threadPool = context.getServiceImpl(IThreadPoolService.class);
// We create this here because there is no ordering guarantee
this.linkDiscoveryAware = new ArrayList<ILinkDiscoveryListener>();
this.lock = new ReentrantReadWriteLock();
this.updates = new LinkedBlockingQueue<LDUpdate>();
this.links = new HashMap<LinkTuple, LinkInfo>();
this.portLinks = new HashMap<SwitchPortTuple, Set<LinkTuple>>();
this.portBroadcastDomainLinks = new HashMap<SwitchPortTuple, Set<LinkTuple>>();
this.switchLinks = new HashMap<IOFSwitch, Set<LinkTuple>>();
this.evHistTopologySwitch =
new EventHistory<EventHistoryTopologySwitch>("Topology: Switch");
this.evHistTopologyLink =
new EventHistory<EventHistoryTopologyLink>("Topology: Link");
this.evHistTopologyCluster =
new EventHistory<EventHistoryTopologyCluster>("Topology: Cluster");
}
@Override
public void startUp(FloodlightModuleContext context) {
// Create our storage tables
storageSource.createTable(LINK_TABLE_NAME, null);
storageSource.setTablePrimaryKeyName(LINK_TABLE_NAME, LINK_ID);
storageSource.createTable(LINK_TABLE_NAME, null);
storageSource.setTablePrimaryKeyName(LINK_TABLE_NAME, LINK_ID);
// Register for storage updates for the switch table
try {
storageSource.addListener(SWITCH_TABLE_NAME, this);
} catch (StorageException ex) {
log.error("Error in installing listener for switch table - {}", SWITCH_TABLE_NAME);
}
ScheduledExecutorService ses = threadPool.getScheduledExecutor();
// Setup sending out LLDPs
Runnable lldpSendTimer = new Runnable() {
@Override
public void run() {
try {
sendLLDPs();
if (!shuttingDown) {
ScheduledExecutorService ses =
threadPool.getScheduledExecutor();
ses.schedule(this, lldpFrequency,
TimeUnit.MILLISECONDS);
}
} catch (StorageException e) {
log.error("Storage exception in LLDP send timer; " +
"terminating process", e);
floodlightProvider.terminate();
} catch (Exception e) {
log.error("Exception in LLDP send timer", e);
}
}
};
ses.schedule(lldpSendTimer, 1000, TimeUnit.MILLISECONDS);
Runnable timeoutLinksTimer = new Runnable() {
@Override
public void run() {
if (log.isTraceEnabled()) {
log.trace("Running timeoutLinksTimer");
}
try {
timeoutLinks();
if (!shuttingDown) {
ScheduledExecutorService ses =
threadPool.getScheduledExecutor();
ses.schedule(this, lldpTimeout, TimeUnit.MILLISECONDS);
}
} catch (StorageException e) {
log.error("Storage exception in link timer; " +
"terminating process", e);
floodlightProvider.terminate();
} catch (Exception e) {
log.error("Exception in timeoutLinksTimer", e);
}
}
};
ses.schedule(timeoutLinksTimer, 1000, TimeUnit.MILLISECONDS);
updatesThread = new Thread(new Runnable () {
@Override
public void run() {
while (true) {
try {
doUpdatesThread();
} catch (InterruptedException e) {
return;
}
}
}}, "Topology Updates");
updatesThread.start();
// Register for the OpenFlow messages we want to receive
floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
floodlightProvider.addOFMessageListener(OFType.PORT_STATUS, this);
// Register for switch updates
floodlightProvider.addOFSwitchListener(this);
floodlightProvider.addInfoProvider("summary", this);
// init our rest api
if (restApi != null) {
restApi.addRestletRoutable(new TopologyWebRoutable());
} else {
log.error("Could not instantiate REST API");
}
setControllerTLV();
}
// ****************************************************
// Topology Manager's Event History members and methods
// ****************************************************
// Topology Manager event history
public EventHistory<EventHistoryTopologySwitch> evHistTopologySwitch;
public EventHistory<EventHistoryTopologyLink> evHistTopologyLink;
public EventHistory<EventHistoryTopologyCluster> evHistTopologyCluster;
public EventHistoryTopologySwitch evTopoSwitch;
public EventHistoryTopologyLink evTopoLink;
public EventHistoryTopologyCluster evTopoCluster;
// Switch Added/Deleted
private void evHistTopoSwitch(IOFSwitch sw, EvAction actn, String reason) {
if (evTopoSwitch == null) {
evTopoSwitch = new EventHistoryTopologySwitch();
}
evTopoSwitch.dpid = sw.getId();
if ((sw.getChannel() != null) &&
(SocketAddress.class.isInstance(
sw.getChannel().getRemoteAddress()))) {
evTopoSwitch.ipv4Addr =
((InetSocketAddress)(sw.getChannel().
getRemoteAddress())).getAddress().getAddress();
evTopoSwitch.l4Port =
(short)(((InetSocketAddress)(sw.getChannel().
getRemoteAddress())).getPort());
} else {
byte[] zeroIpa = new byte[] {(byte)0, (byte)0, (byte)0, (byte)0};
evTopoSwitch.ipv4Addr = zeroIpa;
evTopoSwitch.l4Port = 0;
}
evTopoSwitch.reason = reason;
evTopoSwitch = evHistTopologySwitch.put(evTopoSwitch, actn);
}
private void evHistTopoLink(long srcDpid, long dstDpid, short srcPort,
short dstPort, int srcPortState, int dstPortState,
EvAction actn, String reason) {
if (evTopoLink == null) {
evTopoLink = new EventHistoryTopologyLink();
}
evTopoLink.srcSwDpid = srcDpid;
evTopoLink.dstSwDpid = dstDpid;
evTopoLink.srcSwport = srcPort;
evTopoLink.dstSwport = dstPort;
evTopoLink.srcPortState = srcPortState;
evTopoLink.dstPortState = dstPortState;
evTopoLink.reason = reason;
evTopoLink = evHistTopologyLink.put(evTopoLink, actn);
}
public void evHistTopoCluster(long dpid, long clusterIdOld,
long clusterIdNew, EvAction action, String reason) {
if (evTopoCluster == null) {
evTopoCluster = new EventHistoryTopologyCluster();
}
evTopoCluster.dpid = dpid;
evTopoCluster.clusterIdOld = clusterIdOld;
evTopoCluster.clusterIdNew = clusterIdNew;
evTopoCluster.reason = reason;
evTopoCluster = evHistTopologyCluster.put(evTopoCluster, action);
}
@Override
public Map<String, Object> getInfo(String type) {
if (!"summary".equals(type)) return null;
Map<String, Object> info = new HashMap<String, Object>();
int num_links = 0;
for (Set<LinkTuple> links : switchLinks.values())
num_links += links.size();
info.put("# inter-switch links", num_links / 2);
return info;
}
}