From 3620e5d1a3d73f2731e5e1dcb78306abece93761 Mon Sep 17 00:00:00 2001 From: Gregor Maier <gregor.maier@bigswitch.com> Date: Tue, 25 Sep 2012 12:46:51 -0700 Subject: [PATCH] Add a cache to dampen open flow messages. Remove non thread-safe counters from TimedCache Add hashCode() and equals() to OFEchoRequest --- .../util/OFMessageDamper.java | 123 +++++++ .../floodlightcontroller/util/TimedCache.java | 20 +- .../org/openflow/protocol/OFEchoRequest.java | 30 ++ .../util/OFMessageDamperMockSwitch.java | 312 ++++++++++++++++++ .../util/OFMessageDamperTest.java | 150 +++++++++ .../util/TimedCacheTest.java | 90 +++++ 6 files changed, 711 insertions(+), 14 deletions(-) create mode 100644 src/main/java/net/floodlightcontroller/util/OFMessageDamper.java create mode 100644 src/test/java/net/floodlightcontroller/util/OFMessageDamperMockSwitch.java create mode 100644 src/test/java/net/floodlightcontroller/util/OFMessageDamperTest.java create mode 100644 src/test/java/net/floodlightcontroller/util/TimedCacheTest.java diff --git a/src/main/java/net/floodlightcontroller/util/OFMessageDamper.java b/src/main/java/net/floodlightcontroller/util/OFMessageDamper.java new file mode 100644 index 000000000..d9ae386f9 --- /dev/null +++ b/src/main/java/net/floodlightcontroller/util/OFMessageDamper.java @@ -0,0 +1,123 @@ +/* + * Copyright Big Switch Networks 2012 + */ + +package net.floodlightcontroller.util; + +import java.io.IOException; +import java.util.EnumSet; +import java.util.Set; + +import net.floodlightcontroller.core.FloodlightContext; +import net.floodlightcontroller.core.IOFSwitch; + +import org.openflow.protocol.OFMessage; +import org.openflow.protocol.OFType; + +/** + * Dampens OFMessages sent to an OF switch. A message is only written to + * a switch if the same message (as defined by .equals()) has not been written + * in the last n milliseconds. Timer granularity is based on TimedCache + * @author gregor + * + */ +public class OFMessageDamper { + /** + * An entry in the TimedCache. A cache entry consists of the sent message + * as well as the switch to which the message was sent. + * + * NOTE: We currently use the full OFMessage object. To save space, we + * could use a cryptographic hash (e.g., SHA-1). However, this would + * obviously be more time-consuming.... + * + * We also store a reference to the actual IOFSwitch object and /not/ + * the switch DPID. This way we are guarnteed to not dampen messages if + * a switch disconnects and then reconnects. + * + * @author gregor + */ + protected static class DamperEntry { + OFMessage msg; + IOFSwitch sw; + public DamperEntry(OFMessage msg, IOFSwitch sw) { + super(); + this.msg = msg; + this.sw = sw; + } + /* (non-Javadoc) + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((msg == null) ? 0 : msg.hashCode()); + result = prime * result + ((sw == null) ? 0 : sw.hashCode()); + return result; + } + /* (non-Javadoc) + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (getClass() != obj.getClass()) return false; + DamperEntry other = (DamperEntry) obj; + if (msg == null) { + if (other.msg != null) return false; + } else if (!msg.equals(other.msg)) return false; + if (sw == null) { + if (other.sw != null) return false; + } else if (!sw.equals(other.sw)) return false; + return true; + } + + + } + TimedCache<DamperEntry> cache; + EnumSet<OFType> msgTypesToCache; + /** + * + * @param capacity the maximum number of messages that should be + * kept + * @param typesToDampen The set of OFMessageTypes that should be + * dampened by this instance. Other types will be passed through + * @param timeout The dampening timeout. A message will only be + * written if the last write for the an equal message more than + * timeout ms ago. + */ + public OFMessageDamper(int capacity, + Set<OFType> typesToDampen, + int timeout) { + cache = new TimedCache<DamperEntry>(capacity, timeout); + msgTypesToCache = EnumSet.copyOf(typesToDampen); + } + + + /** + * write the messag to the switch according to our dampening settings + * @param sw + * @param msg + * @param cntx + * @return true if the message was written to the switch, false if + * the message was dampened. + * @throws IOException + */ + public boolean write(IOFSwitch sw, OFMessage msg, FloodlightContext cntx) + throws IOException { + if (! msgTypesToCache.contains(msg.getType())) { + sw.write(msg, cntx); + return true; + } + + DamperEntry entry = new DamperEntry(msg, sw); + if (cache.update(entry)) { + // entry exists in cache. Dampening. + return false; + } else { + sw.write(msg, cntx); + return true; + } + } +} \ No newline at end of file diff --git a/src/main/java/net/floodlightcontroller/util/TimedCache.java b/src/main/java/net/floodlightcontroller/util/TimedCache.java index 857d57de9..7341df7d3 100644 --- a/src/main/java/net/floodlightcontroller/util/TimedCache.java +++ b/src/main/java/net/floodlightcontroller/util/TimedCache.java @@ -31,30 +31,24 @@ import java.util.concurrent.ConcurrentMap; public class TimedCache<K> { private final long timeoutInterval; //specified in milliseconds. private ConcurrentMap<K, Long> cache; - private long cacheHits; - private long totalHits; + /** + * + * @param capacity the maximum number of entries in the cache before the + * oldest entry is evicted. + * @param timeToLive specified in milliseconds + */ public TimedCache(int capacity, int timeToLive) { cache = new ConcurrentLinkedHashMap.Builder<K, Long>() .maximumWeightedCapacity(capacity) .build(); this.timeoutInterval = timeToLive; - this.cacheHits = 0; - this.totalHits = 0; } public long getTimeoutInterval() { return this.timeoutInterval; } - public long getCacheHits() { - return cacheHits; - } - - public long getTotalHits() { - return totalHits; - } - /** * Always try to update the cache and set the last-seen value for this key. * @@ -70,7 +64,6 @@ public class TimedCache<K> { Long curr = new Long(System.currentTimeMillis()); Long prev = cache.putIfAbsent(key, curr); - this.totalHits++; if (prev == null) { return false; } @@ -81,7 +74,6 @@ public class TimedCache<K> { } } - this.cacheHits++; return true; } } diff --git a/src/main/java/org/openflow/protocol/OFEchoRequest.java b/src/main/java/org/openflow/protocol/OFEchoRequest.java index 708c6ea81..295a3972e 100644 --- a/src/main/java/org/openflow/protocol/OFEchoRequest.java +++ b/src/main/java/org/openflow/protocol/OFEchoRequest.java @@ -18,6 +18,8 @@ package org.openflow.protocol; +import java.util.Arrays; + import org.jboss.netty.buffer.ChannelBuffer; import org.openflow.util.U16; @@ -68,4 +70,32 @@ public class OFEchoRequest extends OFMessage { if (payload != null) bb.writeBytes(payload); } + + /* (non-Javadoc) + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + Arrays.hashCode(payload); + return result; + } + + /* (non-Javadoc) + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (getClass() != obj.getClass()) + return false; + OFEchoRequest other = (OFEchoRequest) obj; + if (!Arrays.equals(payload, other.payload)) + return false; + return true; + } } diff --git a/src/test/java/net/floodlightcontroller/util/OFMessageDamperMockSwitch.java b/src/test/java/net/floodlightcontroller/util/OFMessageDamperMockSwitch.java new file mode 100644 index 000000000..693294857 --- /dev/null +++ b/src/test/java/net/floodlightcontroller/util/OFMessageDamperMockSwitch.java @@ -0,0 +1,312 @@ +package net.floodlightcontroller.util; + +import static org.junit.Assert.*; +import java.io.IOException; + +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +import net.floodlightcontroller.core.FloodlightContext; +import net.floodlightcontroller.core.IOFMessageListener; +import net.floodlightcontroller.core.IOFSwitch; +import net.floodlightcontroller.core.IFloodlightProviderService.Role; + +import org.jboss.netty.channel.Channel; +import org.openflow.protocol.OFFeaturesReply; +import org.openflow.protocol.OFMessage; +import org.openflow.protocol.OFPhysicalPort; +import org.openflow.protocol.OFStatisticsRequest; +import org.openflow.protocol.statistics.OFDescriptionStatistics; +import org.openflow.protocol.statistics.OFStatistics; + + +/** + * A mock implementation of IFOSwitch we use for {@link OFMessageDamper} + * + * We need to mock equals() and hashCode() but alas, EasyMock doesn't support + * this. Sigh. And of course this happens to be the interface with the most + * methods. + * @author gregor + * + */ +public class OFMessageDamperMockSwitch implements IOFSwitch { + OFMessage writtenMessage; + FloodlightContext writtenContext; + + public OFMessageDamperMockSwitch() { + reset(); + } + + /* reset this mock. I.e., clear the stored message previously written */ + public void reset() { + writtenMessage = null; + writtenContext = null; + } + + /* assert that a message was written to this switch and that the + * written message and context matches the expected values + * @param expected + * @param expectedContext + */ + public void assertMessageWasWritten(OFMessage expected, + FloodlightContext expectedContext) { + assertNotNull("No OFMessage was written", writtenMessage); + assertEquals(expected, writtenMessage); + assertEquals(expectedContext, writtenContext); + } + + /* + * assert that no message was written + */ + public void assertNoMessageWritten() { + assertNull("OFMessage was written but didn't expect one", + writtenMessage); + assertNull("There was a context but didn't expect one", + writtenContext); + } + + /* + * use hashCode() and equals() from Object + */ + + + //------------------------------------------------------- + // IOFSwitch: mocked methods + @Override + public void write(OFMessage m, FloodlightContext bc) throws IOException { + assertNull("write() called but already have message", writtenMessage); + assertNull("write() called but already have context", writtenContext); + writtenContext = bc; + writtenMessage = m; + } + + //------------------------------------------------------- + // IOFSwitch: not-implemented methods + @Override + public void write(List<OFMessage> msglist, FloodlightContext bc) + throws IOException { + assertTrue("Unexpected method call", false); + } + + @Override + public void disconnectOutputStream() { + assertTrue("Unexpected method call", false); + } + + @Override + public Channel getChannel() { + assertTrue("Unexpected method call", false); + return null; + } + + @Override + public OFFeaturesReply getFeaturesReply() { + assertTrue("Unexpected method call", false); + return null; + } + + @Override + public void setFeaturesReply(OFFeaturesReply featuresReply) { + assertTrue("Unexpected method call", false); + } + + @Override + public void setSwitchProperties(OFDescriptionStatistics description) { + assertTrue("Unexpected method call", false); + // TODO Auto-generated method stub + } + + @Override + public Collection<OFPhysicalPort> getEnabledPorts() { + assertTrue("Unexpected method call", false); + return null; + } + + @Override + public Collection<Short> getEnabledPortNumbers() { + assertTrue("Unexpected method call", false); + return null; + } + + @Override + public OFPhysicalPort getPort(short portNumber) { + assertTrue("Unexpected method call", false); + return null; + } + + @Override + public OFPhysicalPort getPort(String portName) { + assertTrue("Unexpected method call", false); + return null; + } + + @Override + public void setPort(OFPhysicalPort port) { + assertTrue("Unexpected method call", false); + } + + @Override + public void deletePort(short portNumber) { + assertTrue("Unexpected method call", false); + } + + @Override + public void deletePort(String portName) { + assertTrue("Unexpected method call", false); + } + + @Override + public Collection<OFPhysicalPort> getPorts() { + assertTrue("Unexpected method call", false); + return null; + } + + @Override + public boolean portEnabled(short portName) { + assertTrue("Unexpected method call", false); + return false; + } + + @Override + public boolean portEnabled(String portName) { + assertTrue("Unexpected method call", false); + return false; + } + + @Override + public boolean portEnabled(OFPhysicalPort port) { + assertTrue("Unexpected method call", false); + return false; + } + + @Override + public long getId() { + assertTrue("Unexpected method call", false); + return 0; + } + + @Override + public String getStringId() { + assertTrue("Unexpected method call", false); + return null; + } + + @Override + public Map<Object, Object> getAttributes() { + assertTrue("Unexpected method call", false); + return null; + } + + @Override + public Date getConnectedSince() { + assertTrue("Unexpected method call", false); + return null; + } + + @Override + public int getNextTransactionId() { + assertTrue("Unexpected method call", false); + return 0; + } + + @Override + public Future<List<OFStatistics>> + getStatistics(OFStatisticsRequest request) throws IOException { + assertTrue("Unexpected method call", false); + return null; + } + + @Override + public boolean isConnected() { + assertTrue("Unexpected method call", false); + return false; + } + + @Override + public void setConnected(boolean connected) { + assertTrue("Unexpected method call", false); + } + + @Override + public Role getRole() { + assertTrue("Unexpected method call", false); + return null; + } + + @Override + public boolean isActive() { + assertTrue("Unexpected method call", false); + return false; + } + + @Override + public void deliverStatisticsReply(OFMessage reply) { + assertTrue("Unexpected method call", false); + } + + @Override + public void cancelStatisticsReply(int transactionId) { + assertTrue("Unexpected method call", false); + } + + @Override + public void cancelAllStatisticsReplies() { + assertTrue("Unexpected method call", false); + } + + @Override + public boolean hasAttribute(String name) { + assertTrue("Unexpected method call", false); + return false; + } + + @Override + public Object getAttribute(String name) { + assertTrue("Unexpected method call", false); + return null; + } + + @Override + public void setAttribute(String name, Object value) { + assertTrue("Unexpected method call", false); + } + + @Override + public Object removeAttribute(String name) { + assertTrue("Unexpected method call", false); + return null; + } + + @Override + public void clearAllFlowMods() { + assertTrue("Unexpected method call", false); + } + + @Override + public boolean updateBroadcastCache(Long entry, Short port) { + assertTrue("Unexpected method call", false); + return false; + } + + @Override + public Map<Short, Long> getPortBroadcastHits() { + assertTrue("Unexpected method call", false); + return null; + } + + @Override + public void sendStatsQuery(OFStatisticsRequest request, int xid, + IOFMessageListener caller) + throws IOException { + assertTrue("Unexpected method call", false); + } + + @Override + public void flush() { + assertTrue("Unexpected method call", false); + } + +} \ No newline at end of file diff --git a/src/test/java/net/floodlightcontroller/util/OFMessageDamperTest.java b/src/test/java/net/floodlightcontroller/util/OFMessageDamperTest.java new file mode 100644 index 000000000..8cad75686 --- /dev/null +++ b/src/test/java/net/floodlightcontroller/util/OFMessageDamperTest.java @@ -0,0 +1,150 @@ +package net.floodlightcontroller.util; + +import static org.junit.Assert.*; + +import net.floodlightcontroller.core.FloodlightContext; + +import org.junit.Before; +import org.junit.Test; +import org.openflow.protocol.OFEchoRequest; +import org.openflow.protocol.OFHello; +import org.openflow.protocol.OFMessage; +import org.openflow.protocol.OFType; +import org.openflow.protocol.factory.BasicFactory; +import org.openflow.protocol.factory.OFMessageFactory; + +import java.io.IOException; +import java.util.EnumSet; + +public class OFMessageDamperTest { + OFMessageFactory factory; + OFMessageDamper damper; + FloodlightContext cntx; + + OFMessageDamperMockSwitch sw1; + OFMessageDamperMockSwitch sw2; + + OFEchoRequest echoRequst1; + OFEchoRequest echoRequst1Clone; + OFEchoRequest echoRequst2; + OFHello hello1; + OFHello hello2; + + + + @Before + public void setUp() throws IOException { + factory = new BasicFactory(); + cntx = new FloodlightContext(); + + sw1 = new OFMessageDamperMockSwitch(); + sw2 = new OFMessageDamperMockSwitch(); + + echoRequst1 = (OFEchoRequest)factory.getMessage(OFType.ECHO_REQUEST); + echoRequst1.setPayload(new byte[] { 1 }); + echoRequst1Clone = (OFEchoRequest) + factory.getMessage(OFType.ECHO_REQUEST); + echoRequst1Clone.setPayload(new byte[] { 1 }); + echoRequst2 = (OFEchoRequest)factory.getMessage(OFType.ECHO_REQUEST); + echoRequst2.setPayload(new byte[] { 2 }); + + hello1 = (OFHello)factory.getMessage(OFType.HELLO); + hello1.setXid(1); + hello2 = (OFHello)factory.getMessage(OFType.HELLO); + hello2.setXid(2); + + } + + protected void doWrite(boolean expectWrite, + OFMessageDamperMockSwitch sw, + OFMessage msg, + FloodlightContext cntx) throws IOException { + + boolean result; + sw.reset(); + result = damper.write(sw, msg, cntx); + + if (expectWrite) { + assertEquals(true, result); + sw.assertMessageWasWritten(msg, cntx); + } else { + assertEquals(false, result); + sw.assertNoMessageWritten(); + } + } + + + @Test + public void testOneMessageType() throws IOException, InterruptedException { + int timeout = 50; + int sleepTime = 60; + damper = new OFMessageDamper(100, + EnumSet.of(OFType.ECHO_REQUEST), + timeout); + + + + // echo requests should be dampened + doWrite(true, sw1, echoRequst1, cntx); + doWrite(false, sw1, echoRequst1, cntx); + doWrite(false, sw1, echoRequst1Clone, cntx); + doWrite(true, sw1, echoRequst2, cntx); + doWrite(false, sw1, echoRequst2, cntx); + + // we don't dampen hellos. All should succeed + doWrite(true, sw1, hello1, cntx); + doWrite(true, sw1, hello1, cntx); + doWrite(true, sw1, hello1, cntx); + + // echo request should also be dampened on sw2 + doWrite(true, sw2, echoRequst1, cntx); + doWrite(false, sw2, echoRequst1, cntx); + doWrite(true, sw2, echoRequst2, cntx); + + + Thread.sleep(sleepTime); + doWrite(true, sw1, echoRequst1, cntx); + doWrite(true, sw2, echoRequst1, cntx); + + } + + @Test + public void testTwoMessageTypes() throws IOException, InterruptedException { + int timeout = 50; + int sleepTime = 60; + damper = new OFMessageDamper(100, + EnumSet.of(OFType.ECHO_REQUEST, + OFType.HELLO), + timeout); + + + + // echo requests should be dampened + doWrite(true, sw1, echoRequst1, cntx); + doWrite(false, sw1, echoRequst1, cntx); + doWrite(false, sw1, echoRequst1Clone, cntx); + doWrite(true, sw1, echoRequst2, cntx); + doWrite(false, sw1, echoRequst2, cntx); + + // hello should be dampened as well + doWrite(true, sw1, hello1, cntx); + doWrite(false, sw1, hello1, cntx); + doWrite(false, sw1, hello1, cntx); + + doWrite(true, sw1, hello2, cntx); + doWrite(false, sw1, hello2, cntx); + doWrite(false, sw1, hello2, cntx); + + // echo request should also be dampened on sw2 + doWrite(true, sw2, echoRequst1, cntx); + doWrite(false, sw2, echoRequst1, cntx); + doWrite(true, sw2, echoRequst2, cntx); + + Thread.sleep(sleepTime); + doWrite(true, sw1, echoRequst1, cntx); + doWrite(true, sw2, echoRequst1, cntx); + doWrite(true, sw1, hello1, cntx); + doWrite(true, sw1, hello2, cntx); + } + +} diff --git a/src/test/java/net/floodlightcontroller/util/TimedCacheTest.java b/src/test/java/net/floodlightcontroller/util/TimedCacheTest.java new file mode 100644 index 000000000..5e70641cb --- /dev/null +++ b/src/test/java/net/floodlightcontroller/util/TimedCacheTest.java @@ -0,0 +1,90 @@ +package net.floodlightcontroller.util; + +import static org.junit.Assert.*; + +import org.junit.Before; +import org.junit.Test; + +public class TimedCacheTest { + public static class CacheEntry { + public int key; + + public CacheEntry(int key) { + this.key = key; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + key; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + CacheEntry other = (CacheEntry) obj; + if (key != other.key) + return false; + return true; + } + } + + protected TimedCache<CacheEntry> cache; + + @Before + public void setUp() { + // + } + + + @Test + public void testCaching() throws InterruptedException { + int timeout = 50; + int timeToSleep = 60; + cache = new TimedCache<TimedCacheTest.CacheEntry>(100, timeout); + + CacheEntry e1a = new CacheEntry(1); + CacheEntry e1b = new CacheEntry(1); + CacheEntry e1c = new CacheEntry(1); + CacheEntry e2 = new CacheEntry(2); + + assertEquals(false, cache.update(e1a)); + assertEquals(true, cache.update(e1a)); + assertEquals(true, cache.update(e1b)); + assertEquals(true, cache.update(e1c)); + assertEquals(false, cache.update(e2)); + assertEquals(true, cache.update(e2)); + + Thread.sleep(timeToSleep); + assertEquals(false, cache.update(e1a)); + assertEquals(false, cache.update(e2)); + } + + @Test + public void testCapacity() throws InterruptedException { + int timeout = 5000; + cache = new TimedCache<TimedCacheTest.CacheEntry>(2, timeout); + + // Testing the capacity is tricky since the capacity can be + // exceeded for short amounts of time, so we try to flood the cache + // to make sure the first entry is expired + CacheEntry e1 = new CacheEntry(1); + for (int i=0; i < 100; i++) { + CacheEntry e = new CacheEntry(i); + cache.update(e); + } + + // entry 1 should have been expired due to capacity limits + assertEquals(false, cache.update(e1)); + } + + + +} -- GitLab