Skip to content
Snippets Groups Projects
Commit 3620e5d1 authored by Gregor Maier's avatar Gregor Maier
Browse files

Add a cache to dampen open flow messages.

Remove non thread-safe counters from TimedCache
Add hashCode() and equals() to OFEchoRequest
parent c4f7aca7
No related branches found
No related tags found
No related merge requests found
/*
* Copyright Big Switch Networks 2012
*/
package net.floodlightcontroller.util;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Set;
import net.floodlightcontroller.core.FloodlightContext;
import net.floodlightcontroller.core.IOFSwitch;
import org.openflow.protocol.OFMessage;
import org.openflow.protocol.OFType;
/**
* Dampens OFMessages sent to an OF switch. A message is only written to
* a switch if the same message (as defined by .equals()) has not been written
* in the last n milliseconds. Timer granularity is based on TimedCache
* @author gregor
*
*/
public class OFMessageDamper {
/**
* An entry in the TimedCache. A cache entry consists of the sent message
* as well as the switch to which the message was sent.
*
* NOTE: We currently use the full OFMessage object. To save space, we
* could use a cryptographic hash (e.g., SHA-1). However, this would
* obviously be more time-consuming....
*
* We also store a reference to the actual IOFSwitch object and /not/
* the switch DPID. This way we are guarnteed to not dampen messages if
* a switch disconnects and then reconnects.
*
* @author gregor
*/
protected static class DamperEntry {
OFMessage msg;
IOFSwitch sw;
public DamperEntry(OFMessage msg, IOFSwitch sw) {
super();
this.msg = msg;
this.sw = sw;
}
/* (non-Javadoc)
* @see java.lang.Object#hashCode()
*/
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((msg == null) ? 0 : msg.hashCode());
result = prime * result + ((sw == null) ? 0 : sw.hashCode());
return result;
}
/* (non-Javadoc)
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
DamperEntry other = (DamperEntry) obj;
if (msg == null) {
if (other.msg != null) return false;
} else if (!msg.equals(other.msg)) return false;
if (sw == null) {
if (other.sw != null) return false;
} else if (!sw.equals(other.sw)) return false;
return true;
}
}
TimedCache<DamperEntry> cache;
EnumSet<OFType> msgTypesToCache;
/**
*
* @param capacity the maximum number of messages that should be
* kept
* @param typesToDampen The set of OFMessageTypes that should be
* dampened by this instance. Other types will be passed through
* @param timeout The dampening timeout. A message will only be
* written if the last write for the an equal message more than
* timeout ms ago.
*/
public OFMessageDamper(int capacity,
Set<OFType> typesToDampen,
int timeout) {
cache = new TimedCache<DamperEntry>(capacity, timeout);
msgTypesToCache = EnumSet.copyOf(typesToDampen);
}
/**
* write the messag to the switch according to our dampening settings
* @param sw
* @param msg
* @param cntx
* @return true if the message was written to the switch, false if
* the message was dampened.
* @throws IOException
*/
public boolean write(IOFSwitch sw, OFMessage msg, FloodlightContext cntx)
throws IOException {
if (! msgTypesToCache.contains(msg.getType())) {
sw.write(msg, cntx);
return true;
}
DamperEntry entry = new DamperEntry(msg, sw);
if (cache.update(entry)) {
// entry exists in cache. Dampening.
return false;
} else {
sw.write(msg, cntx);
return true;
}
}
}
\ No newline at end of file
......@@ -31,30 +31,24 @@ import java.util.concurrent.ConcurrentMap;
public class TimedCache<K> {
private final long timeoutInterval; //specified in milliseconds.
private ConcurrentMap<K, Long> cache;
private long cacheHits;
private long totalHits;
/**
*
* @param capacity the maximum number of entries in the cache before the
* oldest entry is evicted.
* @param timeToLive specified in milliseconds
*/
public TimedCache(int capacity, int timeToLive) {
cache = new ConcurrentLinkedHashMap.Builder<K, Long>()
.maximumWeightedCapacity(capacity)
.build();
this.timeoutInterval = timeToLive;
this.cacheHits = 0;
this.totalHits = 0;
}
public long getTimeoutInterval() {
return this.timeoutInterval;
}
public long getCacheHits() {
return cacheHits;
}
public long getTotalHits() {
return totalHits;
}
/**
* Always try to update the cache and set the last-seen value for this key.
*
......@@ -70,7 +64,6 @@ public class TimedCache<K> {
Long curr = new Long(System.currentTimeMillis());
Long prev = cache.putIfAbsent(key, curr);
this.totalHits++;
if (prev == null) {
return false;
}
......@@ -81,7 +74,6 @@ public class TimedCache<K> {
}
}
this.cacheHits++;
return true;
}
}
......@@ -18,6 +18,8 @@
package org.openflow.protocol;
import java.util.Arrays;
import org.jboss.netty.buffer.ChannelBuffer;
import org.openflow.util.U16;
......@@ -68,4 +70,32 @@ public class OFEchoRequest extends OFMessage {
if (payload != null)
bb.writeBytes(payload);
}
/* (non-Javadoc)
* @see java.lang.Object#hashCode()
*/
@Override
public int hashCode() {
final int prime = 31;
int result = super.hashCode();
result = prime * result + Arrays.hashCode(payload);
return result;
}
/* (non-Javadoc)
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (!super.equals(obj))
return false;
if (getClass() != obj.getClass())
return false;
OFEchoRequest other = (OFEchoRequest) obj;
if (!Arrays.equals(payload, other.payload))
return false;
return true;
}
}
package net.floodlightcontroller.util;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import net.floodlightcontroller.core.FloodlightContext;
import net.floodlightcontroller.core.IOFMessageListener;
import net.floodlightcontroller.core.IOFSwitch;
import net.floodlightcontroller.core.IFloodlightProviderService.Role;
import org.jboss.netty.channel.Channel;
import org.openflow.protocol.OFFeaturesReply;
import org.openflow.protocol.OFMessage;
import org.openflow.protocol.OFPhysicalPort;
import org.openflow.protocol.OFStatisticsRequest;
import org.openflow.protocol.statistics.OFDescriptionStatistics;
import org.openflow.protocol.statistics.OFStatistics;
/**
* A mock implementation of IFOSwitch we use for {@link OFMessageDamper}
*
* We need to mock equals() and hashCode() but alas, EasyMock doesn't support
* this. Sigh. And of course this happens to be the interface with the most
* methods.
* @author gregor
*
*/
public class OFMessageDamperMockSwitch implements IOFSwitch {
OFMessage writtenMessage;
FloodlightContext writtenContext;
public OFMessageDamperMockSwitch() {
reset();
}
/* reset this mock. I.e., clear the stored message previously written */
public void reset() {
writtenMessage = null;
writtenContext = null;
}
/* assert that a message was written to this switch and that the
* written message and context matches the expected values
* @param expected
* @param expectedContext
*/
public void assertMessageWasWritten(OFMessage expected,
FloodlightContext expectedContext) {
assertNotNull("No OFMessage was written", writtenMessage);
assertEquals(expected, writtenMessage);
assertEquals(expectedContext, writtenContext);
}
/*
* assert that no message was written
*/
public void assertNoMessageWritten() {
assertNull("OFMessage was written but didn't expect one",
writtenMessage);
assertNull("There was a context but didn't expect one",
writtenContext);
}
/*
* use hashCode() and equals() from Object
*/
//-------------------------------------------------------
// IOFSwitch: mocked methods
@Override
public void write(OFMessage m, FloodlightContext bc) throws IOException {
assertNull("write() called but already have message", writtenMessage);
assertNull("write() called but already have context", writtenContext);
writtenContext = bc;
writtenMessage = m;
}
//-------------------------------------------------------
// IOFSwitch: not-implemented methods
@Override
public void write(List<OFMessage> msglist, FloodlightContext bc)
throws IOException {
assertTrue("Unexpected method call", false);
}
@Override
public void disconnectOutputStream() {
assertTrue("Unexpected method call", false);
}
@Override
public Channel getChannel() {
assertTrue("Unexpected method call", false);
return null;
}
@Override
public OFFeaturesReply getFeaturesReply() {
assertTrue("Unexpected method call", false);
return null;
}
@Override
public void setFeaturesReply(OFFeaturesReply featuresReply) {
assertTrue("Unexpected method call", false);
}
@Override
public void setSwitchProperties(OFDescriptionStatistics description) {
assertTrue("Unexpected method call", false);
// TODO Auto-generated method stub
}
@Override
public Collection<OFPhysicalPort> getEnabledPorts() {
assertTrue("Unexpected method call", false);
return null;
}
@Override
public Collection<Short> getEnabledPortNumbers() {
assertTrue("Unexpected method call", false);
return null;
}
@Override
public OFPhysicalPort getPort(short portNumber) {
assertTrue("Unexpected method call", false);
return null;
}
@Override
public OFPhysicalPort getPort(String portName) {
assertTrue("Unexpected method call", false);
return null;
}
@Override
public void setPort(OFPhysicalPort port) {
assertTrue("Unexpected method call", false);
}
@Override
public void deletePort(short portNumber) {
assertTrue("Unexpected method call", false);
}
@Override
public void deletePort(String portName) {
assertTrue("Unexpected method call", false);
}
@Override
public Collection<OFPhysicalPort> getPorts() {
assertTrue("Unexpected method call", false);
return null;
}
@Override
public boolean portEnabled(short portName) {
assertTrue("Unexpected method call", false);
return false;
}
@Override
public boolean portEnabled(String portName) {
assertTrue("Unexpected method call", false);
return false;
}
@Override
public boolean portEnabled(OFPhysicalPort port) {
assertTrue("Unexpected method call", false);
return false;
}
@Override
public long getId() {
assertTrue("Unexpected method call", false);
return 0;
}
@Override
public String getStringId() {
assertTrue("Unexpected method call", false);
return null;
}
@Override
public Map<Object, Object> getAttributes() {
assertTrue("Unexpected method call", false);
return null;
}
@Override
public Date getConnectedSince() {
assertTrue("Unexpected method call", false);
return null;
}
@Override
public int getNextTransactionId() {
assertTrue("Unexpected method call", false);
return 0;
}
@Override
public Future<List<OFStatistics>>
getStatistics(OFStatisticsRequest request) throws IOException {
assertTrue("Unexpected method call", false);
return null;
}
@Override
public boolean isConnected() {
assertTrue("Unexpected method call", false);
return false;
}
@Override
public void setConnected(boolean connected) {
assertTrue("Unexpected method call", false);
}
@Override
public Role getRole() {
assertTrue("Unexpected method call", false);
return null;
}
@Override
public boolean isActive() {
assertTrue("Unexpected method call", false);
return false;
}
@Override
public void deliverStatisticsReply(OFMessage reply) {
assertTrue("Unexpected method call", false);
}
@Override
public void cancelStatisticsReply(int transactionId) {
assertTrue("Unexpected method call", false);
}
@Override
public void cancelAllStatisticsReplies() {
assertTrue("Unexpected method call", false);
}
@Override
public boolean hasAttribute(String name) {
assertTrue("Unexpected method call", false);
return false;
}
@Override
public Object getAttribute(String name) {
assertTrue("Unexpected method call", false);
return null;
}
@Override
public void setAttribute(String name, Object value) {
assertTrue("Unexpected method call", false);
}
@Override
public Object removeAttribute(String name) {
assertTrue("Unexpected method call", false);
return null;
}
@Override
public void clearAllFlowMods() {
assertTrue("Unexpected method call", false);
}
@Override
public boolean updateBroadcastCache(Long entry, Short port) {
assertTrue("Unexpected method call", false);
return false;
}
@Override
public Map<Short, Long> getPortBroadcastHits() {
assertTrue("Unexpected method call", false);
return null;
}
@Override
public void sendStatsQuery(OFStatisticsRequest request, int xid,
IOFMessageListener caller)
throws IOException {
assertTrue("Unexpected method call", false);
}
@Override
public void flush() {
assertTrue("Unexpected method call", false);
}
}
\ No newline at end of file
package net.floodlightcontroller.util;
import static org.junit.Assert.*;
import net.floodlightcontroller.core.FloodlightContext;
import org.junit.Before;
import org.junit.Test;
import org.openflow.protocol.OFEchoRequest;
import org.openflow.protocol.OFHello;
import org.openflow.protocol.OFMessage;
import org.openflow.protocol.OFType;
import org.openflow.protocol.factory.BasicFactory;
import org.openflow.protocol.factory.OFMessageFactory;
import java.io.IOException;
import java.util.EnumSet;
public class OFMessageDamperTest {
OFMessageFactory factory;
OFMessageDamper damper;
FloodlightContext cntx;
OFMessageDamperMockSwitch sw1;
OFMessageDamperMockSwitch sw2;
OFEchoRequest echoRequst1;
OFEchoRequest echoRequst1Clone;
OFEchoRequest echoRequst2;
OFHello hello1;
OFHello hello2;
@Before
public void setUp() throws IOException {
factory = new BasicFactory();
cntx = new FloodlightContext();
sw1 = new OFMessageDamperMockSwitch();
sw2 = new OFMessageDamperMockSwitch();
echoRequst1 = (OFEchoRequest)factory.getMessage(OFType.ECHO_REQUEST);
echoRequst1.setPayload(new byte[] { 1 });
echoRequst1Clone = (OFEchoRequest)
factory.getMessage(OFType.ECHO_REQUEST);
echoRequst1Clone.setPayload(new byte[] { 1 });
echoRequst2 = (OFEchoRequest)factory.getMessage(OFType.ECHO_REQUEST);
echoRequst2.setPayload(new byte[] { 2 });
hello1 = (OFHello)factory.getMessage(OFType.HELLO);
hello1.setXid(1);
hello2 = (OFHello)factory.getMessage(OFType.HELLO);
hello2.setXid(2);
}
protected void doWrite(boolean expectWrite,
OFMessageDamperMockSwitch sw,
OFMessage msg,
FloodlightContext cntx) throws IOException {
boolean result;
sw.reset();
result = damper.write(sw, msg, cntx);
if (expectWrite) {
assertEquals(true, result);
sw.assertMessageWasWritten(msg, cntx);
} else {
assertEquals(false, result);
sw.assertNoMessageWritten();
}
}
@Test
public void testOneMessageType() throws IOException, InterruptedException {
int timeout = 50;
int sleepTime = 60;
damper = new OFMessageDamper(100,
EnumSet.of(OFType.ECHO_REQUEST),
timeout);
// echo requests should be dampened
doWrite(true, sw1, echoRequst1, cntx);
doWrite(false, sw1, echoRequst1, cntx);
doWrite(false, sw1, echoRequst1Clone, cntx);
doWrite(true, sw1, echoRequst2, cntx);
doWrite(false, sw1, echoRequst2, cntx);
// we don't dampen hellos. All should succeed
doWrite(true, sw1, hello1, cntx);
doWrite(true, sw1, hello1, cntx);
doWrite(true, sw1, hello1, cntx);
// echo request should also be dampened on sw2
doWrite(true, sw2, echoRequst1, cntx);
doWrite(false, sw2, echoRequst1, cntx);
doWrite(true, sw2, echoRequst2, cntx);
Thread.sleep(sleepTime);
doWrite(true, sw1, echoRequst1, cntx);
doWrite(true, sw2, echoRequst1, cntx);
}
@Test
public void testTwoMessageTypes() throws IOException, InterruptedException {
int timeout = 50;
int sleepTime = 60;
damper = new OFMessageDamper(100,
EnumSet.of(OFType.ECHO_REQUEST,
OFType.HELLO),
timeout);
// echo requests should be dampened
doWrite(true, sw1, echoRequst1, cntx);
doWrite(false, sw1, echoRequst1, cntx);
doWrite(false, sw1, echoRequst1Clone, cntx);
doWrite(true, sw1, echoRequst2, cntx);
doWrite(false, sw1, echoRequst2, cntx);
// hello should be dampened as well
doWrite(true, sw1, hello1, cntx);
doWrite(false, sw1, hello1, cntx);
doWrite(false, sw1, hello1, cntx);
doWrite(true, sw1, hello2, cntx);
doWrite(false, sw1, hello2, cntx);
doWrite(false, sw1, hello2, cntx);
// echo request should also be dampened on sw2
doWrite(true, sw2, echoRequst1, cntx);
doWrite(false, sw2, echoRequst1, cntx);
doWrite(true, sw2, echoRequst2, cntx);
Thread.sleep(sleepTime);
doWrite(true, sw1, echoRequst1, cntx);
doWrite(true, sw2, echoRequst1, cntx);
doWrite(true, sw1, hello1, cntx);
doWrite(true, sw1, hello2, cntx);
}
}
package net.floodlightcontroller.util;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
public class TimedCacheTest {
public static class CacheEntry {
public int key;
public CacheEntry(int key) {
this.key = key;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + key;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
CacheEntry other = (CacheEntry) obj;
if (key != other.key)
return false;
return true;
}
}
protected TimedCache<CacheEntry> cache;
@Before
public void setUp() {
//
}
@Test
public void testCaching() throws InterruptedException {
int timeout = 50;
int timeToSleep = 60;
cache = new TimedCache<TimedCacheTest.CacheEntry>(100, timeout);
CacheEntry e1a = new CacheEntry(1);
CacheEntry e1b = new CacheEntry(1);
CacheEntry e1c = new CacheEntry(1);
CacheEntry e2 = new CacheEntry(2);
assertEquals(false, cache.update(e1a));
assertEquals(true, cache.update(e1a));
assertEquals(true, cache.update(e1b));
assertEquals(true, cache.update(e1c));
assertEquals(false, cache.update(e2));
assertEquals(true, cache.update(e2));
Thread.sleep(timeToSleep);
assertEquals(false, cache.update(e1a));
assertEquals(false, cache.update(e2));
}
@Test
public void testCapacity() throws InterruptedException {
int timeout = 5000;
cache = new TimedCache<TimedCacheTest.CacheEntry>(2, timeout);
// Testing the capacity is tricky since the capacity can be
// exceeded for short amounts of time, so we try to flood the cache
// to make sure the first entry is expired
CacheEntry e1 = new CacheEntry(1);
for (int i=0; i < 100; i++) {
CacheEntry e = new CacheEntry(i);
cache.update(e);
}
// entry 1 should have been expired due to capacity limits
assertEquals(false, cache.update(e1));
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment