Skip to content
Snippets Groups Projects
Commit b6647d02 authored by Rob Adams's avatar Rob Adams
Browse files

Add parameter to IStoreListener to allow differentiating local vs. remote updates

parent de5be349
No related branches found
No related tags found
No related merge requests found
......@@ -2,9 +2,33 @@ package org.sdnplatform.sync;
import java.util.Iterator;
/**
* A listener interface that will receive updates on a particular store
* @author readams
* @param <K> the key type for the store
*/
public interface IStoreListener<K> {
/**
* The origin of the update
* @author readams
*/
public enum UpdateType {
/**
* An update that originated from a write to the local store
*/
LOCAL,
/**
* An update that originated from a value synchronized from a remote
* node. Note that it is still possible that this includes only
* information that originated from the current node.
*/
REMOTE
};
/**
* Called when keys in the store are modified or deleted.
* @param type the type of the update
* @see UpdateType
*/
public void keysModified(Iterator<K> keys);
public void keysModified(Iterator<K> keys, UpdateType type);
}
......@@ -11,6 +11,7 @@ import net.floodlightcontroller.debugcounter.IDebugCounterService;
import org.sdnplatform.sync.IClosableIterator;
import org.sdnplatform.sync.IVersion;
import org.sdnplatform.sync.Versioned;
import org.sdnplatform.sync.IStoreListener.UpdateType;
import org.sdnplatform.sync.error.SyncException;
import org.sdnplatform.sync.internal.SyncManager;
import org.sdnplatform.sync.internal.util.ByteArray;
......@@ -73,7 +74,7 @@ public class ListenerStorageEngine
throws SyncException {
updateCounter(SyncManager.COUNTER_PUTS);
localStorage.put(key, value);
notifyListeners(key);
notifyListeners(key, UpdateType.LOCAL);
}
@Override
......@@ -105,7 +106,7 @@ public class ListenerStorageEngine
public boolean writeSyncValue(ByteArray key,
Iterable<Versioned<byte[]>> values) {
boolean r = localStorage.writeSyncValue(key, values);
if (r) notifyListeners(key);
if (r) notifyListeners(key, UpdateType.REMOTE);
return r;
}
......@@ -132,13 +133,13 @@ public class ListenerStorageEngine
listeners.add(listener);
}
protected void notifyListeners(ByteArray key) {
notifyListeners(Collections.singleton(key).iterator());
protected void notifyListeners(ByteArray key, UpdateType type) {
notifyListeners(Collections.singleton(key).iterator(), type);
}
protected void notifyListeners(Iterator<ByteArray> keys) {
protected void notifyListeners(Iterator<ByteArray> keys, UpdateType type) {
for (MappingStoreListener msl : listeners) {
msl.notify(keys);
msl.notify(keys, type);
}
}
......
......@@ -4,6 +4,7 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import org.sdnplatform.sync.IStoreListener;
import org.sdnplatform.sync.IStoreListener.UpdateType;
import org.sdnplatform.sync.internal.util.ByteArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -32,8 +33,8 @@ public class MappingStoreListener {
this.listener = listener;
}
public void notify(Iterator<ByteArray> keys) {
listener.keysModified(new MappingIterator(keys));
public void notify(Iterator<ByteArray> keys, UpdateType type) {
listener.keysModified(new MappingIterator(keys), type);
}
class MappingIterator implements Iterator {
......
......@@ -28,6 +28,7 @@ import org.sdnplatform.sync.IClosableIterator;
import org.sdnplatform.sync.IInconsistencyResolver;
import org.sdnplatform.sync.IStoreClient;
import org.sdnplatform.sync.IStoreListener;
import org.sdnplatform.sync.IStoreListener.UpdateType;
import org.sdnplatform.sync.ISyncService;
import org.sdnplatform.sync.Versioned;
import org.sdnplatform.sync.ISyncService.Scope;
......@@ -396,13 +397,53 @@ public class SyncManagerTest {
}
}
protected class Update {
String key;
UpdateType type;
public Update(String key, UpdateType type) {
super();
this.key = key;
this.type = type;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + getOuterType().hashCode();
result = prime * result + ((key == null) ? 0 : key.hashCode());
result = prime * result + ((type == null) ? 0 : type.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass()) return false;
Update other = (Update) obj;
if (!getOuterType().equals(other.getOuterType())) return false;
if (key == null) {
if (other.key != null) return false;
} else if (!key.equals(other.key)) return false;
if (type != other.type) return false;
return true;
}
private SyncManagerTest getOuterType() {
return SyncManagerTest.this;
}
}
protected class TestListener implements IStoreListener<String> {
HashSet<String> notified = new HashSet<String>();
HashSet<Update> notified = new HashSet<Update>();
@Override
public void keysModified(Iterator<String> keys) {
public void keysModified(Iterator<String> keys,
UpdateType type) {
while (keys.hasNext())
notified.add(keys.next());
notified.add(new Update(keys.next(), type));
}
}
......@@ -438,15 +479,21 @@ public class SyncManagerTest {
client0.put("test0", "value");
client2.put("test2", "value");
HashSet<String> c = new HashSet<String>();
c.add("test0");
c.add("test2");
HashSet<Update> c0 = new HashSet<Update>();
c0.add(new Update("test0", UpdateType.LOCAL));
c0.add(new Update("test2", UpdateType.REMOTE));
HashSet<Update> c2 = new HashSet<Update>();
c2.add(new Update("test0", UpdateType.REMOTE));
c2.add(new Update("test2", UpdateType.LOCAL));
waitForNotify(t0, c, 2000);
waitForNotify(t2, c, 2000);
waitForNotify(t0, c0, 2000);
waitForNotify(t2, c2, 2000);
assertEquals(2, t0.notified.size());
assertEquals(2, t2.notified.size());
t0.notified.clear();
t2.notified.clear();
Versioned<String> v0 = client0.get("test0");
v0.setValue("newvalue");
client0.put("test0", v0);
......@@ -455,8 +502,8 @@ public class SyncManagerTest {
v2.setValue("newvalue");
client2.put("test2", v2);
waitForNotify(t0, c, 2000);
waitForNotify(t2, c, 2000);
waitForNotify(t0, c0, 2000);
waitForNotify(t2, c2, 2000);
assertEquals(2, t0.notified.size());
assertEquals(2, t2.notified.size());
......@@ -466,8 +513,8 @@ public class SyncManagerTest {
client0.delete("test0");
client2.delete("test2");
waitForNotify(t0, c, 2000);
waitForNotify(t2, c, 2000);
waitForNotify(t0, c0, 2000);
waitForNotify(t2, c2, 2000);
assertEquals(2, t0.notified.size());
assertEquals(2, t2.notified.size());
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment