From e31f062bc52ff79b86a1bf2bd3a0ffed97795f8c Mon Sep 17 00:00:00 2001 From: Gregor Maier <gregor.maier@bigswitch.com> Date: Mon, 23 Jan 2012 11:55:04 -0800 Subject: [PATCH] Fix continuing problems when switches with same DPID connect. After a switch is asynchronously removed due to another switch with the same DPID connecting, the first switch might still have messages queued. If we process these messages after removing the switch, switch listeners can get confused. [#23382219] --- .../floodlightcontroller/core/IOFSwitch.java | 18 ++ .../core/internal/Controller.java | 236 ++++++++++-------- .../core/internal/OFSwitchImpl.java | 14 ++ 3 files changed, 160 insertions(+), 108 deletions(-) diff --git a/src/main/java/net/floodlightcontroller/core/IOFSwitch.java b/src/main/java/net/floodlightcontroller/core/IOFSwitch.java index e1ba7ef84..023769ee4 100644 --- a/src/main/java/net/floodlightcontroller/core/IOFSwitch.java +++ b/src/main/java/net/floodlightcontroller/core/IOFSwitch.java @@ -22,6 +22,7 @@ import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.locks.Lock; import net.floodlightcontroller.core.types.MacVlanPair; import net.floodlightcontroller.util.TimedCache; @@ -243,12 +244,14 @@ public interface IOFSwitch { /** * Check if the switch is still connected; + * Only call while holding processMessageLock * @return whether the switch is still disconnected */ public boolean isConnected(); /** * Set whether the switch is connected + * Only call while holding asyncRemoveSwitchLock * @param connected whether the switch is connected */ public void setConnected(boolean connected); @@ -304,4 +307,19 @@ public interface IOFSwitch { * @return */ public TimedCache<Long> getTimedCache(); + + /** + * Return a lock that needs to be held while processing a message. Multiple threads + * can hold this lock. 
+ * @return + */ + public Lock processMessageLock(); + + /** + * Return a lock that needs to be held while the switch is removed asynchronously, i.e., + * the removing is not triggered by events on this switch's channel. + * Mutex with processMessageLock + * @return + */ + public Lock asyncRemoveSwitchLock(); } diff --git a/src/main/java/net/floodlightcontroller/core/internal/Controller.java b/src/main/java/net/floodlightcontroller/core/internal/Controller.java index a6b4197e8..6c9ef337c 100644 --- a/src/main/java/net/floodlightcontroller/core/internal/Controller.java +++ b/src/main/java/net/floodlightcontroller/core/internal/Controller.java @@ -325,6 +325,7 @@ public class Controller @Override public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception { + // Can write to switch without holding processLock OFMessage m = factory.getMessage(OFType.ECHO_REQUEST); e.getChannel().write(m); } @@ -467,101 +468,109 @@ public class Controller */ protected void processOFMessage(OFMessage m) throws IOException, SwitchStateException { - switch (m.getType()) { - case HELLO: - log.debug("HELLO from {}", sw); - if (state.hsState.equals(HandshakeState.START)) { - state.hsState = HandshakeState.HELLO; - sendHelloConfiguration(); - } else { - throw new SwitchStateException("Unexpected HELLO from " + sw); - } - break; - case ECHO_REQUEST: - OFEchoReply reply = - (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY); - reply.setXid(m.getXid()); - sw.write(reply, null); - break; - case ECHO_REPLY: - break; - case FEATURES_REPLY: - log.debug("Features Reply from {}", sw); - if (state.hsState.equals(HandshakeState.HELLO)) { - sw.setFeaturesReply((OFFeaturesReply) m); - sendFeatureReplyConfiguration(); - state.hsState = HandshakeState.FEATURES_REPLY; - // uncomment to enable "dumb" switches like cbench - // state.hsState = HandshakeState.READY; - // addSwitch(sw); - } else { - String em = "Unexpected FEATURES_REPLY from " + sw; - throw new 
SwitchStateException(em); - } - break; - case GET_CONFIG_REPLY: - if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) { - String em = "Unexpected GET_CONFIG_REPLY from " + sw; - throw new SwitchStateException(em); - } - OFGetConfigReply cr = (OFGetConfigReply) m; - if (cr.getMissSendLength() == (short)0xffff) { - log.debug("Config Reply from {} confirms " + - "miss length set to 0xffff", sw); - } else { - log.warn("Config Reply from {} has " + - "miss length set to {}", - sw, cr.getMissSendLength()); - } - state.hasGetConfigReply = true; - if (state.hasDescription && state.hasGetConfigReply) { - addSwitch(sw); - state.hsState = HandshakeState.READY; - } - break; - case ERROR: - OFError error = (OFError) m; - logError(sw, error); - break; - case STATS_REPLY: - if (state.hsState.ordinal() < - HandshakeState.FEATURES_REPLY.ordinal()) { - String em = "Unexpected STATS_REPLY from " + sw; - throw new SwitchStateException(em); - } - sw.deliverStatisticsReply(m); - if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) { - processSwitchDescReply(); - } - break; - /* - * "Trivial" server to test raw low-level throughput - case PACKET_IN: - OFPacketIn pi = (OFPacketIn)m; - - OFFlowMod fm = - (OFFlowMod)factory.getMessage(OFType.FLOW_MOD); - OFMatch match = new OFMatch(); - match.loadFromPacket(pi.getPacketData(), pi.getInPort()); - fm.setBufferId(pi.getBufferId()); - fm.setMatch(match); - sw.write(fm, null); - break; - */ - case PORT_STATUS: - boolean swadded = - state.hsState.equals(HandshakeState.READY); - handlePortStatusMessage(sw, (OFPortStatus)m, swadded); - // fall through - default: - if (!state.hsState.equals(HandshakeState.READY)) { - log.debug("Ignoring message type {} received " + - "from switch {} before switch is " + - "fully configured.", m.getType(), sw); + sw.processMessageLock().lock(); + try { + if (!sw.isConnected()) + return; + switch (m.getType()) { + case HELLO: + log.debug("HELLO from {}", sw); + if 
(state.hsState.equals(HandshakeState.START)) { + state.hsState = HandshakeState.HELLO; + sendHelloConfiguration(); + } else { + throw new SwitchStateException("Unexpected HELLO from " + sw); + } break; - } - handleMessage(sw, m, null); - break; + case ECHO_REQUEST: + OFEchoReply reply = + (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY); + reply.setXid(m.getXid()); + sw.write(reply, null); + break; + case ECHO_REPLY: + break; + case FEATURES_REPLY: + log.debug("Features Reply from {}", sw); + if (state.hsState.equals(HandshakeState.HELLO)) { + sw.setFeaturesReply((OFFeaturesReply) m); + sendFeatureReplyConfiguration(); + state.hsState = HandshakeState.FEATURES_REPLY; + // uncomment to enable "dumb" switches like cbench + // state.hsState = HandshakeState.READY; + // addSwitch(sw); + } else { + String em = "Unexpected FEATURES_REPLY from " + sw; + throw new SwitchStateException(em); + } + break; + case GET_CONFIG_REPLY: + if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) { + String em = "Unexpected GET_CONFIG_REPLY from " + sw; + throw new SwitchStateException(em); + } + OFGetConfigReply cr = (OFGetConfigReply) m; + if (cr.getMissSendLength() == (short)0xffff) { + log.debug("Config Reply from {} confirms " + + "miss length set to 0xffff", sw); + } else { + log.warn("Config Reply from {} has " + + "miss length set to {}", + sw, cr.getMissSendLength()); + } + state.hasGetConfigReply = true; + if (state.hasDescription && state.hasGetConfigReply) { + addSwitch(sw); + state.hsState = HandshakeState.READY; + } + break; + case ERROR: + OFError error = (OFError) m; + logError(sw, error); + break; + case STATS_REPLY: + if (state.hsState.ordinal() < + HandshakeState.FEATURES_REPLY.ordinal()) { + String em = "Unexpected STATS_REPLY from " + sw; + throw new SwitchStateException(em); + } + sw.deliverStatisticsReply(m); + if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) { + processSwitchDescReply(); + } + break; + /* + * "Trivial" server to test raw low-level 
throughput + case PACKET_IN: + OFPacketIn pi = (OFPacketIn)m; + + OFFlowMod fm = + (OFFlowMod)factory.getMessage(OFType.FLOW_MOD); + OFMatch match = new OFMatch(); + match.loadFromPacket(pi.getPacketData(), pi.getInPort()); + fm.setBufferId(pi.getBufferId()); + fm.setMatch(match); + sw.write(fm, null); + break; + */ + case PORT_STATUS: + boolean swadded = + state.hsState.equals(HandshakeState.READY); + handlePortStatusMessage(sw, (OFPortStatus)m, swadded); + // fall through + default: + if (!state.hsState.equals(HandshakeState.READY)) { + log.debug("Ignoring message type {} received " + + "from switch {} before switch is " + + "fully configured.", m.getType(), sw); + break; + } + handleMessage(sw, m, null); + break; + } + } + finally { + sw.processMessageLock().unlock(); } } } @@ -874,6 +883,8 @@ public class Controller * @param sw the new switch */ protected void addSwitch(IOFSwitch sw) { + // TODO: is it safe to modify the HashMap without holding + // the old switch's lock? IOFSwitch oldSw = this.switches.put(sw.getId(), sw); if (sw == oldSw) { // Note == for object equality, not .equals for value @@ -884,21 +895,27 @@ public class Controller log.info("Switch handshake successful: {}", sw); if (oldSw != null) { - log.error("New switch connection {} for already-connected switch {}", - sw, oldSw); - oldSw.setConnected(false); - updateInactiveSwitchInfo(oldSw); - - // we need to clean out old switch state definitively - // before adding the new switch - if (switchListeners != null) { - for (IOFSwitchListener listener : switchListeners) { - listener.removedSwitch(oldSw); + oldSw.asyncRemoveSwitchLock().lock(); + try { + log.error("New switch connection {} for already-connected switch {}", + sw, oldSw); + oldSw.setConnected(false); + updateInactiveSwitchInfo(oldSw); + + // we need to clean out old switch state definitively + // before adding the new switch + if (switchListeners != null) { + for (IOFSwitchListener listener : switchListeners) { + 
listener.removedSwitch(oldSw); + } } + // will eventually trigger a removeSwitch(), which will cause + // a "Not removing Switch ... already removed debug message. + oldSw.getChannel().close(); + } + finally { + oldSw.asyncRemoveSwitchLock().unlock(); } - // will eventually trigger a removeSwitch(), which will cause - // a "Not removing Switch ... already removed debug message. - oldSw.getChannel().close(); } updateActiveSwitchInfo(sw); @@ -915,12 +932,15 @@ public class Controller * @param sw the switch that has disconnected */ protected void removeSwitch(IOFSwitch sw) { + // No need to acquire the asyncRemoveSwitch lock, since + // this method is only called after netty has processed all + // pending messages if (!this.switches.remove(sw.getId(), sw) || (sw.isConnected() == false)) { log.debug("Not removing switch {}; already removed", sw); return; } - + sw.setConnected(false); updateInactiveSwitchInfo(sw); Update update = new Update(sw, false); diff --git a/src/main/java/net/floodlightcontroller/core/internal/OFSwitchImpl.java b/src/main/java/net/floodlightcontroller/core/internal/OFSwitchImpl.java index 46a3c9bf9..afdd7beb9 100644 --- a/src/main/java/net/floodlightcontroller/core/internal/OFSwitchImpl.java +++ b/src/main/java/net/floodlightcontroller/core/internal/OFSwitchImpl.java @@ -26,6 +26,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import net.floodlightcontroller.core.FloodlightContext; import net.floodlightcontroller.core.IFloodlightProvider; @@ -72,6 +74,7 @@ public class OFSwitchImpl implements IOFSwitch { protected Map<Integer,OFStatisticsFuture> statsFutureMap; protected boolean connected; protected TimedCache<Long> timedCache; + protected ReentrantReadWriteLock lock; public static IOFSwitchFeatures switchFeatures; @@ 
-87,6 +90,7 @@ public class OFSwitchImpl implements IOFSwitch { this.connected = true; this.statsFutureMap = new ConcurrentHashMap<Integer,OFStatisticsFuture>(); this.timedCache = new TimedCache<Long>(100, 5*1000 ); // 5 seconds interval + this.lock = new ReentrantReadWriteLock(); // Defaults properties for an ideal switch this.setAttribute(PROP_FASTWILDCARDS, (Integer) OFMatch.OFPFW_ALL); @@ -360,4 +364,14 @@ public class OFSwitchImpl implements IOFSwitch { public TimedCache<Long> getTimedCache() { return timedCache; } + + @Override + public Lock processMessageLock() { + return lock.readLock(); + } + + @Override + public Lock asyncRemoveSwitchLock() { + return lock.writeLock(); + } } -- GitLab