Skip to content
Snippets Groups Projects
Commit e31f062b authored by Gregor Maier's avatar Gregor Maier
Browse files

Fix continuing problems when switches with the same DPID connect.

After a switch is asynchronously removed due to another switch with the
same DPID connecting, the first switch might still have messages queued.
If we process these messages after removing the switch, switch listeners
can get confused.

[#23382219]
parent 0ba0e8bb
No related branches found
No related tags found
No related merge requests found
......@@ -22,6 +22,7 @@ import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import net.floodlightcontroller.core.types.MacVlanPair;
import net.floodlightcontroller.util.TimedCache;
......@@ -243,12 +244,14 @@ public interface IOFSwitch {
/**
 * Check whether the switch is still connected.
 * Only call while holding processMessageLock.
 * @return whether the switch is still connected
 */
public boolean isConnected();
/**
 * Set whether the switch is connected.
 * Only call while holding modifySwitchLock.
 * NOTE(review): no lock accessor named modifySwitchLock is visible here;
 * presumably asyncRemoveSwitchLock() is meant -- confirm.
 * @param connected whether the switch is connected
 */
public void setConnected(boolean connected);
......@@ -304,4 +307,19 @@ public interface IOFSwitch {
* @return
*/
public TimedCache<Long> getTimedCache();
/**
 * Return a lock that needs to be held while processing a message. Multiple threads
 * can hold this lock.
 * @return the lock to hold while processing a message from this switch
 */
public Lock processMessageLock();
/**
 * Return a lock that needs to be held while the switch is removed asynchronously, i.e.,
 * the removal is not triggered by events on this switch's channel.
 * Mutually exclusive with processMessageLock.
 * @return the lock to hold while asynchronously removing this switch
 */
public Lock asyncRemoveSwitchLock();
}
......@@ -325,6 +325,7 @@ public class Controller
/**
 * Writes an ECHO_REQUEST message to the channel that reported the idle event.
 *
 * @param ctx the channel handler context (not used by this method)
 * @param e the idle-state event whose channel receives the echo request
 * @throws Exception as declared by the overridden handler method
 */
@Override
public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
        throws Exception {
    // Can write to switch without holding processLock
    final OFMessage echoRequest = factory.getMessage(OFType.ECHO_REQUEST);
    e.getChannel().write(echoRequest);
}
......@@ -467,101 +468,109 @@ public class Controller
*/
protected void processOFMessage(OFMessage m)
throws IOException, SwitchStateException {
switch (m.getType()) {
case HELLO:
log.debug("HELLO from {}", sw);
if (state.hsState.equals(HandshakeState.START)) {
state.hsState = HandshakeState.HELLO;
sendHelloConfiguration();
} else {
throw new SwitchStateException("Unexpected HELLO from " + sw);
}
break;
case ECHO_REQUEST:
OFEchoReply reply =
(OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
reply.setXid(m.getXid());
sw.write(reply, null);
break;
case ECHO_REPLY:
break;
case FEATURES_REPLY:
log.debug("Features Reply from {}", sw);
if (state.hsState.equals(HandshakeState.HELLO)) {
sw.setFeaturesReply((OFFeaturesReply) m);
sendFeatureReplyConfiguration();
state.hsState = HandshakeState.FEATURES_REPLY;
// uncomment to enable "dumb" switches like cbench
// state.hsState = HandshakeState.READY;
// addSwitch(sw);
} else {
String em = "Unexpected FEATURES_REPLY from " + sw;
throw new SwitchStateException(em);
}
break;
case GET_CONFIG_REPLY:
if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
String em = "Unexpected GET_CONFIG_REPLY from " + sw;
throw new SwitchStateException(em);
}
OFGetConfigReply cr = (OFGetConfigReply) m;
if (cr.getMissSendLength() == (short)0xffff) {
log.debug("Config Reply from {} confirms " +
"miss length set to 0xffff", sw);
} else {
log.warn("Config Reply from {} has " +
"miss length set to {}",
sw, cr.getMissSendLength());
}
state.hasGetConfigReply = true;
if (state.hasDescription && state.hasGetConfigReply) {
addSwitch(sw);
state.hsState = HandshakeState.READY;
}
break;
case ERROR:
OFError error = (OFError) m;
logError(sw, error);
break;
case STATS_REPLY:
if (state.hsState.ordinal() <
HandshakeState.FEATURES_REPLY.ordinal()) {
String em = "Unexpected STATS_REPLY from " + sw;
throw new SwitchStateException(em);
}
sw.deliverStatisticsReply(m);
if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
processSwitchDescReply();
}
break;
/*
* "Trivial" server to test raw low-level throughput
case PACKET_IN:
OFPacketIn pi = (OFPacketIn)m;
OFFlowMod fm =
(OFFlowMod)factory.getMessage(OFType.FLOW_MOD);
OFMatch match = new OFMatch();
match.loadFromPacket(pi.getPacketData(), pi.getInPort());
fm.setBufferId(pi.getBufferId());
fm.setMatch(match);
sw.write(fm, null);
break;
*/
case PORT_STATUS:
boolean swadded =
state.hsState.equals(HandshakeState.READY);
handlePortStatusMessage(sw, (OFPortStatus)m, swadded);
// fall through
default:
if (!state.hsState.equals(HandshakeState.READY)) {
log.debug("Ignoring message type {} received " +
"from switch {} before switch is " +
"fully configured.", m.getType(), sw);
sw.processMessageLock().lock();
try {
if (!sw.isConnected())
return;
switch (m.getType()) {
case HELLO:
log.debug("HELLO from {}", sw);
if (state.hsState.equals(HandshakeState.START)) {
state.hsState = HandshakeState.HELLO;
sendHelloConfiguration();
} else {
throw new SwitchStateException("Unexpected HELLO from " + sw);
}
break;
}
handleMessage(sw, m, null);
break;
case ECHO_REQUEST:
OFEchoReply reply =
(OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
reply.setXid(m.getXid());
sw.write(reply, null);
break;
case ECHO_REPLY:
break;
case FEATURES_REPLY:
log.debug("Features Reply from {}", sw);
if (state.hsState.equals(HandshakeState.HELLO)) {
sw.setFeaturesReply((OFFeaturesReply) m);
sendFeatureReplyConfiguration();
state.hsState = HandshakeState.FEATURES_REPLY;
// uncomment to enable "dumb" switches like cbench
// state.hsState = HandshakeState.READY;
// addSwitch(sw);
} else {
String em = "Unexpected FEATURES_REPLY from " + sw;
throw new SwitchStateException(em);
}
break;
case GET_CONFIG_REPLY:
if (!state.hsState.equals(HandshakeState.FEATURES_REPLY)) {
String em = "Unexpected GET_CONFIG_REPLY from " + sw;
throw new SwitchStateException(em);
}
OFGetConfigReply cr = (OFGetConfigReply) m;
if (cr.getMissSendLength() == (short)0xffff) {
log.debug("Config Reply from {} confirms " +
"miss length set to 0xffff", sw);
} else {
log.warn("Config Reply from {} has " +
"miss length set to {}",
sw, cr.getMissSendLength());
}
state.hasGetConfigReply = true;
if (state.hasDescription && state.hasGetConfigReply) {
addSwitch(sw);
state.hsState = HandshakeState.READY;
}
break;
case ERROR:
OFError error = (OFError) m;
logError(sw, error);
break;
case STATS_REPLY:
if (state.hsState.ordinal() <
HandshakeState.FEATURES_REPLY.ordinal()) {
String em = "Unexpected STATS_REPLY from " + sw;
throw new SwitchStateException(em);
}
sw.deliverStatisticsReply(m);
if (sw.hasAttribute(IOFSwitch.SWITCH_DESCRIPTION_FUTURE)) {
processSwitchDescReply();
}
break;
/*
* "Trivial" server to test raw low-level throughput
case PACKET_IN:
OFPacketIn pi = (OFPacketIn)m;
OFFlowMod fm =
(OFFlowMod)factory.getMessage(OFType.FLOW_MOD);
OFMatch match = new OFMatch();
match.loadFromPacket(pi.getPacketData(), pi.getInPort());
fm.setBufferId(pi.getBufferId());
fm.setMatch(match);
sw.write(fm, null);
break;
*/
case PORT_STATUS:
boolean swadded =
state.hsState.equals(HandshakeState.READY);
handlePortStatusMessage(sw, (OFPortStatus)m, swadded);
// fall through
default:
if (!state.hsState.equals(HandshakeState.READY)) {
log.debug("Ignoring message type {} received " +
"from switch {} before switch is " +
"fully configured.", m.getType(), sw);
break;
}
handleMessage(sw, m, null);
break;
}
}
finally {
sw.processMessageLock().unlock();
}
}
}
......@@ -874,6 +883,8 @@ public class Controller
* @param sw the new switch
*/
protected void addSwitch(IOFSwitch sw) {
// TODO: is it safe to modify the HashMap without holding
// the old switch's lock?
IOFSwitch oldSw = this.switches.put(sw.getId(), sw);
if (sw == oldSw) {
// Note == for object equality, not .equals for value
......@@ -884,21 +895,27 @@ public class Controller
log.info("Switch handshake successful: {}", sw);
if (oldSw != null) {
log.error("New switch connection {} for already-connected switch {}",
sw, oldSw);
oldSw.setConnected(false);
updateInactiveSwitchInfo(oldSw);
// we need to clean out old switch state definitively
// before adding the new switch
if (switchListeners != null) {
for (IOFSwitchListener listener : switchListeners) {
listener.removedSwitch(oldSw);
oldSw.asyncRemoveSwitchLock().lock();
try {
log.error("New switch connection {} for already-connected switch {}",
sw, oldSw);
oldSw.setConnected(false);
updateInactiveSwitchInfo(oldSw);
// we need to clean out old switch state definitively
// before adding the new switch
if (switchListeners != null) {
for (IOFSwitchListener listener : switchListeners) {
listener.removedSwitch(oldSw);
}
}
// will eventually trigger a removeSwitch(), which will cause
// a "Not removing switch ...; already removed" debug message.
oldSw.getChannel().close();
}
finally {
oldSw.asyncRemoveSwitchLock().unlock();
}
// will eventually trigger a removeSwitch(), which will cause
// a "Not removing switch ...; already removed" debug message.
oldSw.getChannel().close();
}
updateActiveSwitchInfo(sw);
......@@ -915,12 +932,15 @@ public class Controller
* @param sw the switch that has disconnected
*/
protected void removeSwitch(IOFSwitch sw) {
// No need to acquire the asyncRemoveSwitch lock, since
// this method is only called after netty has processed all
// pending messages
if (!this.switches.remove(sw.getId(), sw) ||
(sw.isConnected() == false)) {
log.debug("Not removing switch {}; already removed", sw);
return;
}
sw.setConnected(false);
updateInactiveSwitchInfo(sw);
Update update = new Update(sw, false);
......
......@@ -26,6 +26,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.floodlightcontroller.core.FloodlightContext;
import net.floodlightcontroller.core.IFloodlightProvider;
......@@ -72,6 +74,7 @@ public class OFSwitchImpl implements IOFSwitch {
protected Map<Integer,OFStatisticsFuture> statsFutureMap;
protected boolean connected;
protected TimedCache<Long> timedCache;
protected ReentrantReadWriteLock lock;
public static IOFSwitchFeatures switchFeatures;
......@@ -87,6 +90,7 @@ public class OFSwitchImpl implements IOFSwitch {
this.connected = true;
this.statsFutureMap = new ConcurrentHashMap<Integer,OFStatisticsFuture>();
this.timedCache = new TimedCache<Long>(100, 5*1000 ); // 5 seconds interval
this.lock = new ReentrantReadWriteLock();
// Defaults properties for an ideal switch
this.setAttribute(PROP_FASTWILDCARDS, (Integer) OFMatch.OFPFW_ALL);
......@@ -360,4 +364,14 @@ public class OFSwitchImpl implements IOFSwitch {
public TimedCache<Long> getTimedCache() {
    // Hand back the cache instance held in this switch's timedCache field.
    return this.timedCache;
}
/**
 * Returns the read side of this switch's {@link ReentrantReadWriteLock}.
 *
 * @return the read lock, to be held while a message is processed
 */
@Override
public Lock processMessageLock() {
    final Lock readSide = this.lock.readLock();
    return readSide;
}
/**
 * Returns the write side of this switch's {@link ReentrantReadWriteLock}.
 *
 * @return the write lock, to be held while the switch is removed asynchronously
 */
@Override
public Lock asyncRemoveSwitchLock() {
    final Lock writeSide = this.lock.writeLock();
    return writeSide;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment