Skip to content
Snippets Groups Projects
Commit 22f44f01 authored by Gregor Maier's avatar Gregor Maier
Browse files

Remove switches from store after master transition that didn't reconnect.

parent 61a65b59
No related branches found
No related tags found
No related merge requests found
...@@ -42,6 +42,8 @@ import java.util.concurrent.CopyOnWriteArraySet; ...@@ -42,6 +42,8 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import net.floodlightcontroller.core.FloodlightContext; import net.floodlightcontroller.core.FloodlightContext;
import net.floodlightcontroller.core.HAListenerTypeMarker; import net.floodlightcontroller.core.HAListenerTypeMarker;
import net.floodlightcontroller.core.IFloodlightProviderService; import net.floodlightcontroller.core.IFloodlightProviderService;
...@@ -63,6 +65,7 @@ import net.floodlightcontroller.core.util.ListenerDispatcher; ...@@ -63,6 +65,7 @@ import net.floodlightcontroller.core.util.ListenerDispatcher;
import net.floodlightcontroller.core.web.CoreWebRoutable; import net.floodlightcontroller.core.web.CoreWebRoutable;
import net.floodlightcontroller.counter.ICounterStoreService; import net.floodlightcontroller.counter.ICounterStoreService;
import net.floodlightcontroller.debugcounter.IDebugCounterService; import net.floodlightcontroller.debugcounter.IDebugCounterService;
import net.floodlightcontroller.devicemanager.internal.DeviceSyncRepresentation;
import net.floodlightcontroller.flowcache.IFlowCacheService; import net.floodlightcontroller.flowcache.IFlowCacheService;
import net.floodlightcontroller.packet.Ethernet; import net.floodlightcontroller.packet.Ethernet;
import net.floodlightcontroller.perfmon.IPktInProcessingTimeService; import net.floodlightcontroller.perfmon.IPktInProcessingTimeService;
...@@ -87,6 +90,7 @@ import org.openflow.protocol.factory.BasicFactory; ...@@ -87,6 +90,7 @@ import org.openflow.protocol.factory.BasicFactory;
import org.openflow.protocol.statistics.OFDescriptionStatistics; import org.openflow.protocol.statistics.OFDescriptionStatistics;
import org.openflow.util.HexString; import org.openflow.util.HexString;
import org.openflow.vendor.nicira.OFNiciraVendorExtensions; import org.openflow.vendor.nicira.OFNiciraVendorExtensions;
import org.sdnplatform.sync.IClosableIterator;
import org.sdnplatform.sync.IStoreClient; import org.sdnplatform.sync.IStoreClient;
import org.sdnplatform.sync.IStoreListener; import org.sdnplatform.sync.IStoreListener;
import org.sdnplatform.sync.ISyncService; import org.sdnplatform.sync.ISyncService;
...@@ -157,6 +161,11 @@ public class Controller implements IFloodlightProviderService, ...@@ -157,6 +161,11 @@ public class Controller implements IFloodlightProviderService,
private RoleManager roleManager; private RoleManager roleManager;
private SwitchManager switchManager; private SwitchManager switchManager;
private static final int DEFAULT_CONSOLIDATE_STORE_TIME_DELAY_MS =
15*1000; // 15s
private int consolidateStoreTimeDelayMs =
DEFAULT_CONSOLIDATE_STORE_TIME_DELAY_MS;
// Flag to always flush flow table on switch reconnect (HA or otherwise) // Flag to always flush flow table on switch reconnect (HA or otherwise)
private boolean alwaysClearFlowsOnSwAdd = false; private boolean alwaysClearFlowsOnSwAdd = false;
...@@ -319,6 +328,9 @@ public class Controller implements IFloodlightProviderService, ...@@ -319,6 +328,9 @@ public class Controller implements IFloodlightProviderService,
roleChangeDescription); roleChangeDescription);
System.exit(0); System.exit(0);
} }
// At this point we are guaranteed that we will execute the code
// below exactly once during the lifetime of this process!
log.info("Received role request for {} (reason: {})." log.info("Received role request for {} (reason: {})."
+ " Initiating transition", role, roleChangeDescription); + " Initiating transition", role, roleChangeDescription);
...@@ -402,6 +414,15 @@ public class Controller implements IFloodlightProviderService, ...@@ -402,6 +414,15 @@ public class Controller implements IFloodlightProviderService,
public synchronized void setRole(Role role) { public synchronized void setRole(Role role) {
this.role = role; this.role = role;
Runnable consolidateStoreTask = new Runnable() {
@Override
public void run() {
consolidateStore();
}
};
Controller.this.ses.schedule(consolidateStoreTask,
consolidateStoreTimeDelayMs,
TimeUnit.MILLISECONDS);
} }
@LogMessageDoc(level="ERROR", @LogMessageDoc(level="ERROR",
...@@ -451,15 +472,18 @@ public class Controller implements IFloodlightProviderService, ...@@ -451,15 +472,18 @@ public class Controller implements IFloodlightProviderService,
// added. One could argue that a switchChanged notification // added. One could argue that a switchChanged notification
// might be more appropriate in this case.... // might be more appropriate in this case....
oldSw.cancelAllStatisticsReplies(); oldSw.cancelAllStatisticsReplies();
addUpdateToQueue(new SwitchUpdate(oldSw, SwitchUpdateType.REMOVED)); addUpdateToQueue(new SwitchUpdate(dpid,
SwitchUpdateType.REMOVED));
oldSw.disconnectOutputStream(); oldSw.disconnectOutputStream();
// Add the new switch and clear FlowMods // Add the new switch and clear FlowMods
// TODO: if this is the same switch re-connecting rather than // TODO: if this is the same switch re-connecting rather than
// a DPID collision it would make sense to not wipe the flow // a DPID collision it would make sense to not wipe the flow
// table. // table.
sw.clearAllFlowMods(); sw.clearAllFlowMods();
addUpdateToQueue(new SwitchUpdate(sw, SwitchUpdateType.ADDED)); addUpdateToQueue(new SwitchUpdate(dpid,
addUpdateToQueue(new SwitchUpdate(sw, SwitchUpdateType.ACTIVATED)); SwitchUpdateType.ADDED));
addUpdateToQueue(new SwitchUpdate(dpid,
SwitchUpdateType.ACTIVATED));
return; return;
} }
...@@ -471,15 +495,18 @@ public class Controller implements IFloodlightProviderService, ...@@ -471,15 +495,18 @@ public class Controller implements IFloodlightProviderService,
// TODO: if we switch was recently (seconds) connected we // TODO: if we switch was recently (seconds) connected we
// might decide to not wipe the flow table. // might decide to not wipe the flow table.
sw.clearAllFlowMods(); sw.clearAllFlowMods();
addUpdateToQueue(new SwitchUpdate(sw, SwitchUpdateType.ADDED)); addUpdateToQueue(new SwitchUpdate(dpid,
addUpdateToQueue(new SwitchUpdate(sw, SwitchUpdateType.ACTIVATED)); SwitchUpdateType.ADDED));
this.syncedSwitches.remove(sw.getId()); addUpdateToQueue(new SwitchUpdate(dpid,
SwitchUpdateType.ACTIVATED));
this.syncedSwitches.remove(dpid);
} else { } else {
// FIXME: switch was in store. check if ports or anything else // FIXME: switch was in store. check if ports or anything else
// has changed and send update. // has changed and send update.
if (alwaysClearFlowsOnSwAdd) if (alwaysClearFlowsOnSwAdd)
sw.clearAllFlowMods(); sw.clearAllFlowMods();
addUpdateToQueue(new SwitchUpdate(sw, SwitchUpdateType.ACTIVATED)); addUpdateToQueue(new SwitchUpdate(dpid,
SwitchUpdateType.ACTIVATED));
sendNotificationsIfSwitchDiffers(oldSw, sw); sendNotificationsIfSwitchDiffers(oldSw, sw);
} }
} }
...@@ -492,7 +519,7 @@ public class Controller implements IFloodlightProviderService, ...@@ -492,7 +519,7 @@ public class Controller implements IFloodlightProviderService,
IOFSwitch oldSw = syncedSwitches.put(dpid, sw); IOFSwitch oldSw = syncedSwitches.put(dpid, sw);
if (oldSw == null) { if (oldSw == null) {
addUpdateToQueue(new SwitchUpdate(sw, SwitchUpdateType.ADDED)); addUpdateToQueue(new SwitchUpdate(dpid, SwitchUpdateType.ADDED));
} else { } else {
// The switch already exists in storage, see if anything // The switch already exists in storage, see if anything
// has changed // has changed
...@@ -520,13 +547,14 @@ public class Controller implements IFloodlightProviderService, ...@@ -520,13 +547,14 @@ public class Controller implements IFloodlightProviderService,
} }
log.debug("removeSwitch {}", sw); log.debug("removeSwitch {}", sw);
this.activeSwitches.remove(sw.getId()); this.activeSwitches.remove(sw.getId());
removeSwitchFromStore(sw); removeSwitchFromStore(sw.getId());
// We cancel all outstanding statistics replies if the switch transition // We cancel all outstanding statistics replies if the switch transition
// from active. In the future we might allow statistics requests // from active. In the future we might allow statistics requests
// from slave controllers. Then we need to move this cancelation // from slave controllers. Then we need to move this cancelation
// to switch disconnect // to switch disconnect
sw.cancelAllStatisticsReplies(); sw.cancelAllStatisticsReplies();
addUpdateToQueue(new SwitchUpdate(sw, SwitchUpdateType.REMOVED)); addUpdateToQueue(new SwitchUpdate(sw.getId(),
SwitchUpdateType.REMOVED));
} }
private synchronized void addSwitchToStore(IOFSwitch sw) { private synchronized void addSwitchToStore(IOFSwitch sw) {
...@@ -544,11 +572,14 @@ public class Controller implements IFloodlightProviderService, ...@@ -544,11 +572,14 @@ public class Controller implements IFloodlightProviderService,
} }
} }
private synchronized void removeSwitchFromStore(IOFSwitch sw) { private synchronized void removeSwitchFromStore(long dpid) {
try { try {
storeClient.delete(sw.getId()); storeClient.delete(dpid);
} catch (SyncException e) { } catch (SyncException e) {
log.error("Could not remove switch " + sw.getStringId() + // ObsoleteVerisonException can't happend because all
// store modifications are synchronized
log.error("Could not remove switch " +
HexString.toHexString(dpid) +
" from sync store:", e); " from sync store:", e);
} }
} }
...@@ -562,12 +593,46 @@ public class Controller implements IFloodlightProviderService, ...@@ -562,12 +593,46 @@ public class Controller implements IFloodlightProviderService,
new HashSet<OFPhysicalPort>(sw1.getPorts()); new HashSet<OFPhysicalPort>(sw1.getPorts());
if (! sw1Ports.equals(sw2Ports)) { if (! sw1Ports.equals(sw2Ports)) {
addUpdateToQueue( addUpdateToQueue(
new SwitchUpdate(sw2, SwitchUpdateType.PORTCHANGED)); new SwitchUpdate(sw2.getId(),
SwitchUpdateType.PORTCHANGED));
} }
if (false) { if (false) {
// FIXME: IF ANYTHING ELSE HAS CHANGED // FIXME: IF ANYTHING ELSE HAS CHANGED
addUpdateToQueue( addUpdateToQueue(
new SwitchUpdate(sw2, SwitchUpdateType.OTHERCHANGE)); new SwitchUpdate(sw2.getId(),
SwitchUpdateType.OTHERCHANGE));
}
}
/**
* Remove all entries from the store that don't correspond to an
* active switch.
* TODO: is it a problem that this is fully synchronized
*/
private synchronized void consolidateStore() {
if (role == Role.SLAVE)
return;
this.syncedSwitches.clear();
IClosableIterator<Map.Entry<Long,Versioned<SwitchSyncRepresentation>>>
iter = null;
try {
iter = storeClient.entries();
} catch (SyncException e) {
log.error("Failed to read switches from sync store", e);
return;
}
try {
while(iter.hasNext()) {
Entry<Long, Versioned<SwitchSyncRepresentation>> entry =
iter.next();
if (!this.activeSwitches.contains(entry.getKey())) {
removeSwitchFromStore(entry.getKey());
//addUpdateToQueue(new SwitchUpdate(sw, switchUpdateType))
}
}
} finally {
if (iter != null)
iter.close();
} }
} }
...@@ -600,8 +665,6 @@ public class Controller implements IFloodlightProviderService, ...@@ -600,8 +665,6 @@ public class Controller implements IFloodlightProviderService,
return sw; return sw;
return this.syncedSwitches.get(dpid); return this.syncedSwitches.get(dpid);
} }
} }
...@@ -626,38 +689,38 @@ public class Controller implements IFloodlightProviderService, ...@@ -626,38 +689,38 @@ public class Controller implements IFloodlightProviderService,
* Update message indicating a switch was added or removed * Update message indicating a switch was added or removed
*/ */
class SwitchUpdate implements IUpdate { class SwitchUpdate implements IUpdate {
public IOFSwitch sw; public long swId;
public SwitchUpdateType switchUpdateType; public SwitchUpdateType switchUpdateType;
public SwitchUpdate(IOFSwitch sw, SwitchUpdateType switchUpdateType) { public SwitchUpdate(long swId, SwitchUpdateType switchUpdateType) {
this.sw = sw; this.swId = swId;
this.switchUpdateType = switchUpdateType; this.switchUpdateType = switchUpdateType;
} }
@Override @Override
public void dispatch() { public void dispatch() {
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
log.trace("Dispatching switch update {} {}", log.trace("Dispatching switch update {} {}",
sw, switchUpdateType); HexString.toHexString(swId), switchUpdateType);
} }
if (switchListeners != null) { if (switchListeners != null) {
for (IOFSwitchListener listener : switchListeners) { for (IOFSwitchListener listener : switchListeners) {
switch(switchUpdateType) { switch(switchUpdateType) {
case ADDED: case ADDED:
listener.switchAdded(sw.getId()); listener.switchAdded(swId);
break; break;
case REMOVED: case REMOVED:
listener.switchRemoved(sw.getId()); listener.switchRemoved(swId);
break; break;
case PORTCHANGED: case PORTCHANGED:
listener.switchPortChanged(sw.getId()); listener.switchPortChanged(swId);
break; break;
case ACTIVATED: case ACTIVATED:
listener.switchActivated(sw.getId()); listener.switchActivated(swId);
break; break;
case DEACTIVATED: case DEACTIVATED:
// ignore // ignore
break; break;
case OTHERCHANGE: case OTHERCHANGE:
listener.switchChanged(sw.getId()); listener.switchChanged(swId);
break; break;
} }
} }
...@@ -807,7 +870,8 @@ public class Controller implements IFloodlightProviderService, ...@@ -807,7 +870,8 @@ public class Controller implements IFloodlightProviderService,
* @param sw * @param sw
*/ */
protected void notifyPortChanged(IOFSwitch sw) { protected void notifyPortChanged(IOFSwitch sw) {
SwitchUpdate update = new SwitchUpdate(sw, SwitchUpdateType.PORTCHANGED); SwitchUpdate update = new SwitchUpdate(sw.getId(),
SwitchUpdateType.PORTCHANGED);
addUpdateToQueue(update); addUpdateToQueue(update);
} }
...@@ -1613,6 +1677,14 @@ public class Controller implements IFloodlightProviderService, ...@@ -1613,6 +1677,14 @@ public class Controller implements IFloodlightProviderService,
} }
} }
/**
* FOR TESTING ONLY
* @param update
*/
void setConsolidateStoreTaskDelay(int consolidateStoreTaskDelayMs) {
this.consolidateStoreTimeDelayMs = consolidateStoreTaskDelayMs;
}
@LogMessageDoc(level="WARN", @LogMessageDoc(level="WARN",
message="Failure adding update {} to queue", message="Failure adding update {} to queue",
explanation="The controller tried to add an internal notification" + explanation="The controller tried to add an internal notification" +
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment