diff --git a/src/main/java/net/floodlightcontroller/core/IOFSwitch.java b/src/main/java/net/floodlightcontroller/core/IOFSwitch.java index e1ba7ef84ef32c53068a2e66d646ea6c1b846f2c..023769ee40f054477b81c0a6bdc56d3f053701fe 100644 --- a/src/main/java/net/floodlightcontroller/core/IOFSwitch.java +++ b/src/main/java/net/floodlightcontroller/core/IOFSwitch.java @@ -22,6 +22,7 @@ import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.locks.Lock; import net.floodlightcontroller.core.types.MacVlanPair; import net.floodlightcontroller.util.TimedCache; @@ -243,12 +244,14 @@ public interface IOFSwitch { /** * Check if the switch is still connected; + * Only call while holding processMessageLock * @return whether the switch is still disconnected */ public boolean isConnected(); /** * Set whether the switch is connected + * Only call while holding modifySwitchLock * @param connected whether the switch is connected */ public void setConnected(boolean connected); @@ -304,4 +307,19 @@ public interface IOFSwitch { * @return */ public TimedCache<Long> getTimedCache(); + + /** + * Return a lock that need to be held while processing a message. Multiple threads + * can hold this lock. + * @return + */ + public Lock processMessageLock(); + + /** + * Return a lock that needs to be held while the switch is removed asynchronously, i.e., + * the removing is not triggered by events on this switch's channel. + * Mutex with processMessageLock + * @return + */ + public Lock asyncRemoveSwitchLock(); } diff --git a/src/main/java/net/floodlightcontroller/core/internal/Controller.java b/src/main/java/net/floodlightcontroller/core/internal/Controller.java index a6b4197e884544402119328fe36abc0d464234ff..6c9ef337c57d83a5d33f7b81d1ea2e782cf7c95f 100644 --- a/src/main/java/net/floodlightcontroller/core/internal/Controller.java +++ b/src/main/java/net/floodlightcontroller/core/internal/Controller.java @@ -325,6 +325,7 @@ public class Controller @Override public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception { + // Can write to switch without holding processLock OFMessage m = factory.getMessage(OFType.ECHO_REQUEST); e.getChannel().write(m); } @@ -467,101 +468,109 @@ public class Controller */ protected void processOFMessage(OFMessage m) throws IOException, SwitchStateException { - switch (m.getType()) { - case HELLO: - log.debug("HELLO from {}", sw); - if (state.hsState.equals(HandshakeState.START)) { - state.hsState = HandshakeState.HELLO; - sendHelloConfiguration(); - } else { - throw new SwitchStateException("Unexpected HELLO from " + sw); - } - break; - case ECHO_REQUEST: - OFEchoReply reply = - (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY); - reply.setXid(m.getXid()); - sw.write(reply, null); - break; - case ECHO_REPLY: - break; - case FEATURES_REPLY: - log.debug("Features Reply from {}", sw); - if (state.hsState.equals(HandshakeState.HELLO)) { - sw.setFeaturesReply((OFFeaturesReply) m); - sendFeatureReplyConfiguration(); - state.hsState = HandshakeState.FEATURES_REPLY; - // uncomment to enable "dumb" switches like cbench - // state.hsState = HandshakeState.READY; - // addSwitch(sw); - } else { - String em = "Unexpected FEATURES_REPLY from " + sw; - throw new SwitchStateException(em); - } - break; - case GET_CONFIG_REPLY: - if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) { - String em = "Unexpected GET_CONFIG_REPLY from " + sw; - throw new SwitchStateException(em); - } - OFGetConfigReply cr = (OFGetConfigReply) m; - if (cr.getMissSendLength() == (short)0xffff) { - log.debug("Config Reply from {} confirms " + - "miss length set to 0xffff", sw); - } else { - log.warn("Config Reply from {} has " + - "miss length set to {}", - sw, cr.getMissSendLength()); - } - state.hasGetConfigReply = true; - if (state.hasDescription && state.hasGetConfigReply) { - addSwitch(sw); - state.hsState = HandshakeState.READY; - } - break; - case ERROR: - OFError error = (OFError) m; - logError(sw, error); - break; - case STATS_REPLY: - if (state.hsState.ordinal() < - HandshakeState.FEATURES_REPLY.ordinal()) { - String em = "Unexpected STATS_REPLY from " + sw; - throw new SwitchStateException(em); - } - sw.deliverStatisticsReply(m); - if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) { - processSwitchDescReply(); - } - break; - /* - * "Trivial" server to test raw low-level throughput - case PACKET_IN: - OFPacketIn pi = (OFPacketIn)m; - - OFFlowMod fm = - (OFFlowMod)factory.getMessage(OFType.FLOW_MOD); - OFMatch match = new OFMatch(); - match.loadFromPacket(pi.getPacketData(), pi.getInPort()); - fm.setBufferId(pi.getBufferId()); - fm.setMatch(match); - sw.write(fm, null); - break; - */ - case PORT_STATUS: - boolean swadded = - state.hsState.equals(HandshakeState.READY); - handlePortStatusMessage(sw, (OFPortStatus)m, swadded); - // fall through - default: - if (!state.hsState.equals(HandshakeState.READY)) { - log.debug("Ignoring message type {} received " + - "from switch {} before switch is " + - "fully configured.", m.getType(), sw); + sw.processMessageLock().lock(); + try { + if (!sw.isConnected()) + return; + switch (m.getType()) { + case HELLO: + log.debug("HELLO from {}", sw); + if (state.hsState.equals(HandshakeState.START)) { + state.hsState = HandshakeState.HELLO; + sendHelloConfiguration(); + } else { + throw new SwitchStateException("Unexpected HELLO from " + sw); + } break; - } - handleMessage(sw, m, null); - break; + case ECHO_REQUEST: + OFEchoReply reply = + (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY); + reply.setXid(m.getXid()); + sw.write(reply, null); + break; + case ECHO_REPLY: + break; + case FEATURES_REPLY: + log.debug("Features Reply from {}", sw); + if (state.hsState.equals(HandshakeState.HELLO)) { + sw.setFeaturesReply((OFFeaturesReply) m); + sendFeatureReplyConfiguration(); + state.hsState = HandshakeState.FEATURES_REPLY; + // uncomment to enable "dumb" switches like cbench + // state.hsState = HandshakeState.READY; + // addSwitch(sw); + } else { + String em = "Unexpected FEATURES_REPLY from " + sw; + throw new SwitchStateException(em); + } + break; + case GET_CONFIG_REPLY: + if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) { + String em = "Unexpected GET_CONFIG_REPLY from " + sw; + throw new SwitchStateException(em); + } + OFGetConfigReply cr = (OFGetConfigReply) m; + if (cr.getMissSendLength() == (short)0xffff) { + log.debug("Config Reply from {} confirms " + + "miss length set to 0xffff", sw); + } else { + log.warn("Config Reply from {} has " + + "miss length set to {}", + sw, cr.getMissSendLength()); + } + state.hasGetConfigReply = true; + if (state.hasDescription && state.hasGetConfigReply) { + addSwitch(sw); + state.hsState = HandshakeState.READY; + } + break; + case ERROR: + OFError error = (OFError) m; + logError(sw, error); + break; + case STATS_REPLY: + if (state.hsState.ordinal() < + HandshakeState.FEATURES_REPLY.ordinal()) { + String em = "Unexpected STATS_REPLY from " + sw; + throw new SwitchStateException(em); + } + sw.deliverStatisticsReply(m); + if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) { + processSwitchDescReply(); + } + break; + /* + * "Trivial" server to test raw low-level throughput + case PACKET_IN: + OFPacketIn pi = (OFPacketIn)m; + + OFFlowMod fm = + (OFFlowMod)factory.getMessage(OFType.FLOW_MOD); + OFMatch match = new OFMatch(); + match.loadFromPacket(pi.getPacketData(), pi.getInPort()); + fm.setBufferId(pi.getBufferId()); + fm.setMatch(match); + sw.write(fm, null); + break; + */ + case PORT_STATUS: + boolean swadded = + state.hsState.equals(HandshakeState.READY); + handlePortStatusMessage(sw, (OFPortStatus)m, swadded); + // fall through + default: + if (!state.hsState.equals(HandshakeState.READY)) { + log.debug("Ignoring message type {} received " + + "from switch {} before switch is " + + "fully configured.", m.getType(), sw); + break; + } + handleMessage(sw, m, null); + break; + } + } + finally { + sw.processMessageLock().unlock(); } } } @@ -874,6 +883,8 @@ public class Controller * @param sw the new switch */ protected void addSwitch(IOFSwitch sw) { + // TODO: is it save to modify the HashMap without holding + // the old switch's lock IOFSwitch oldSw = this.switches.put(sw.getId(), sw); if (sw == oldSw) { // Note == for object equality, not .equals for value @@ -884,21 +895,27 @@ public class Controller log.info("Switch handshake successful: {}", sw); if (oldSw != null) { - log.error("New switch connection {} for already-connected switch {}", - sw, oldSw); - oldSw.setConnected(false); - updateInactiveSwitchInfo(oldSw); - - // we need to clean out old switch state definitively - // before adding the new switch - if (switchListeners != null) { - for (IOFSwitchListener listener : switchListeners) { - listener.removedSwitch(oldSw); + oldSw.asyncRemoveSwitchLock().lock(); + try { + log.error("New switch connection {} for already-connected switch {}", + sw, oldSw); + oldSw.setConnected(false); + updateInactiveSwitchInfo(oldSw); + + // we need to clean out old switch state definitively + // before adding the new switch + if (switchListeners != null) { + for (IOFSwitchListener listener : switchListeners) { + listener.removedSwitch(oldSw); + } } + // will eventually trigger a removeSwitch(), which will cause + // a "Not removing Switch ... already removed debug message. + oldSw.getChannel().close(); + } + finally { + oldSw.asyncRemoveSwitchLock().unlock(); } - // will eventually trigger a removeSwitch(), which will cause - // a "Not removing Switch ... already removed debug message. - oldSw.getChannel().close(); } updateActiveSwitchInfo(sw); @@ -915,12 +932,15 @@ public class Controller * @param sw the switch that has disconnected */ protected void removeSwitch(IOFSwitch sw) { + // No need to acquire the asyncRemoveSwitch lock, since + // this method is only called after netty has processed all + // pending messages if (!this.switches.remove(sw.getId(), sw) || (sw.isConnected() == false)) { log.debug("Not removing switch {}; already removed", sw); return; } - + sw.setConnected(false); updateInactiveSwitchInfo(sw); Update update = new Update(sw, false); diff --git a/src/main/java/net/floodlightcontroller/core/internal/OFSwitchImpl.java b/src/main/java/net/floodlightcontroller/core/internal/OFSwitchImpl.java index 46a3c9bf91182ec70cabc61929c40bcbaa52951d..afdd7beb97645bde699b1566609b6df2ff17ae93 100644 --- a/src/main/java/net/floodlightcontroller/core/internal/OFSwitchImpl.java +++ b/src/main/java/net/floodlightcontroller/core/internal/OFSwitchImpl.java @@ -26,6 +26,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import net.floodlightcontroller.core.FloodlightContext; import net.floodlightcontroller.core.IFloodlightProvider; @@ -72,6 +74,7 @@ public class OFSwitchImpl implements IOFSwitch { protected Map<Integer,OFStatisticsFuture> statsFutureMap; protected boolean connected; protected TimedCache<Long> timedCache; + protected ReentrantReadWriteLock lock; public static IOFSwitchFeatures switchFeatures; @@ -87,6 +90,7 @@ public class OFSwitchImpl implements IOFSwitch { this.connected = true; this.statsFutureMap = new ConcurrentHashMap<Integer,OFStatisticsFuture>(); this.timedCache = new TimedCache<Long>(100, 5*1000 ); // 5 seconds interval + this.lock = new ReentrantReadWriteLock(); // Defaults properties for an ideal switch this.setAttribute(PROP_FASTWILDCARDS, (Integer) OFMatch.OFPFW_ALL); @@ -360,4 +364,14 @@ public class OFSwitchImpl implements IOFSwitch { public TimedCache<Long> getTimedCache() { return timedCache; } + + @Override + public Lock processMessageLock() { + return lock.readLock(); + } + + @Override + public Lock asyncRemoveSwitchLock() { + return lock.writeLock(); + } }