diff --git a/src/main/java/org/sdnplatform/sync/IStoreListener.java b/src/main/java/org/sdnplatform/sync/IStoreListener.java index 9bfe53dfd9b24960696a47d5afb4bf2b6182f05f..48a12f25d025356730be7a13741356c8e619b6fb 100644 --- a/src/main/java/org/sdnplatform/sync/IStoreListener.java +++ b/src/main/java/org/sdnplatform/sync/IStoreListener.java @@ -2,9 +2,33 @@ package org.sdnplatform.sync; import java.util.Iterator; +/** + * A listener interface that will receive updates on a particular store + * @author readams + * @param <K> the key type for the store + */ public interface IStoreListener<K> { + /** + * The origin of the update + * @author readams + */ + public enum UpdateType { + /** + * An update that originated from a write to the local store + */ + LOCAL, + /** + * An update that originated from a value synchronized from a remote + * node. Note that it is still possible that this includes only + * information that originated from the current node. + */ + REMOTE + }; + /** * Called when keys in the store are modified or deleted. + * @param type the type of the update + * @see UpdateType */ - public void keysModified(Iterator<K> keys); + public void keysModified(Iterator<K> keys, UpdateType type); } diff --git a/src/main/java/org/sdnplatform/sync/internal/store/ListenerStorageEngine.java b/src/main/java/org/sdnplatform/sync/internal/store/ListenerStorageEngine.java index 3bfd07b01144553838aceaeb1beb335b86461873..56ced90b80300c668e2bc40114ac64722f5b2062 100644 --- a/src/main/java/org/sdnplatform/sync/internal/store/ListenerStorageEngine.java +++ b/src/main/java/org/sdnplatform/sync/internal/store/ListenerStorageEngine.java @@ -11,6 +11,7 @@ import net.floodlightcontroller.debugcounter.IDebugCounterService; import org.sdnplatform.sync.IClosableIterator; import org.sdnplatform.sync.IVersion; import org.sdnplatform.sync.Versioned; +import org.sdnplatform.sync.IStoreListener.UpdateType; import org.sdnplatform.sync.error.SyncException; import org.sdnplatform.sync.internal.SyncManager; import org.sdnplatform.sync.internal.util.ByteArray; @@ -73,7 +74,7 @@ public class ListenerStorageEngine throws SyncException { updateCounter(SyncManager.COUNTER_PUTS); localStorage.put(key, value); - notifyListeners(key); + notifyListeners(key, UpdateType.LOCAL); } @Override @@ -105,7 +106,7 @@ public class ListenerStorageEngine public boolean writeSyncValue(ByteArray key, Iterable<Versioned<byte[]>> values) { boolean r = localStorage.writeSyncValue(key, values); - if (r) notifyListeners(key); + if (r) notifyListeners(key, UpdateType.REMOTE); return r; } @@ -132,13 +133,13 @@ public class ListenerStorageEngine listeners.add(listener); } - protected void notifyListeners(ByteArray key) { - notifyListeners(Collections.singleton(key).iterator()); + protected void notifyListeners(ByteArray key, UpdateType type) { + notifyListeners(Collections.singleton(key).iterator(), type); } - protected void notifyListeners(Iterator<ByteArray> keys) { + protected void notifyListeners(Iterator<ByteArray> keys, UpdateType type) { for (MappingStoreListener msl : listeners) { - msl.notify(keys); + msl.notify(keys, type); } } diff --git a/src/main/java/org/sdnplatform/sync/internal/store/MappingStoreListener.java b/src/main/java/org/sdnplatform/sync/internal/store/MappingStoreListener.java index 3af70ccd7480a6e00e7a3e9f7374d1700e86f4eb..920c60f421e783689d6424e699bbef8d4f68ff3b 100644 --- a/src/main/java/org/sdnplatform/sync/internal/store/MappingStoreListener.java +++ b/src/main/java/org/sdnplatform/sync/internal/store/MappingStoreListener.java @@ -4,6 +4,7 @@ import java.util.Iterator; import java.util.NoSuchElementException; import org.sdnplatform.sync.IStoreListener; +import org.sdnplatform.sync.IStoreListener.UpdateType; import org.sdnplatform.sync.internal.util.ByteArray; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,8 +33,8 @@ public class MappingStoreListener { this.listener = listener; } - public void notify(Iterator<ByteArray> keys) { - listener.keysModified(new MappingIterator(keys)); + public void notify(Iterator<ByteArray> keys, UpdateType type) { + listener.keysModified(new MappingIterator(keys), type); } class MappingIterator implements Iterator { diff --git a/src/test/java/org/sdnplatform/sync/internal/SyncManagerTest.java b/src/test/java/org/sdnplatform/sync/internal/SyncManagerTest.java index 3d4b6d11d4d4e0b970c02f0d81fe6a19638e6987..9f0a56bdf100f398c8ab44526006a7f83f1b028b 100644 --- a/src/test/java/org/sdnplatform/sync/internal/SyncManagerTest.java +++ b/src/test/java/org/sdnplatform/sync/internal/SyncManagerTest.java @@ -28,6 +28,7 @@ import org.sdnplatform.sync.IClosableIterator; import org.sdnplatform.sync.IInconsistencyResolver; import org.sdnplatform.sync.IStoreClient; import org.sdnplatform.sync.IStoreListener; +import org.sdnplatform.sync.IStoreListener.UpdateType; import org.sdnplatform.sync.ISyncService; import org.sdnplatform.sync.Versioned; import org.sdnplatform.sync.ISyncService.Scope; @@ -396,13 +397,53 @@ public class SyncManagerTest { } } + protected class Update { + String key; + UpdateType type; + + public Update(String key, UpdateType type) { + super(); + this.key = key; + this.type = type; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + getOuterType().hashCode(); + result = prime * result + ((key == null) ? 0 : key.hashCode()); + result = prime * result + ((type == null) ? 0 : type.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null) return false; + if (getClass() != obj.getClass()) return false; + Update other = (Update) obj; + if (!getOuterType().equals(other.getOuterType())) return false; + if (key == null) { + if (other.key != null) return false; + } else if (!key.equals(other.key)) return false; + if (type != other.type) return false; + return true; + } + + private SyncManagerTest getOuterType() { + return SyncManagerTest.this; + } + } + protected class TestListener implements IStoreListener<String> { - HashSet<String> notified = new HashSet<String>(); + HashSet<Update> notified = new HashSet<Update>(); @Override - public void keysModified(Iterator<String> keys) { + public void keysModified(Iterator<String> keys, + UpdateType type) { while (keys.hasNext()) - notified.add(keys.next()); + notified.add(new Update(keys.next(), type)); } } @@ -438,15 +479,21 @@ public class SyncManagerTest { client0.put("test0", "value"); client2.put("test2", "value"); - HashSet<String> c = new HashSet<String>(); - c.add("test0"); - c.add("test2"); + HashSet<Update> c0 = new HashSet<Update>(); + c0.add(new Update("test0", UpdateType.LOCAL)); + c0.add(new Update("test2", UpdateType.REMOTE)); + HashSet<Update> c2 = new HashSet<Update>(); + c2.add(new Update("test0", UpdateType.REMOTE)); + c2.add(new Update("test2", UpdateType.LOCAL)); - waitForNotify(t0, c, 2000); - waitForNotify(t2, c, 2000); + waitForNotify(t0, c0, 2000); + waitForNotify(t2, c2, 2000); assertEquals(2, t0.notified.size()); assertEquals(2, t2.notified.size()); + t0.notified.clear(); + t2.notified.clear(); + Versioned<String> v0 = client0.get("test0"); v0.setValue("newvalue"); client0.put("test0", v0); @@ -455,8 +502,8 @@ public class SyncManagerTest { v2.setValue("newvalue"); client2.put("test2", v2); - waitForNotify(t0, c, 2000); - waitForNotify(t2, c, 2000); + waitForNotify(t0, c0, 2000); + waitForNotify(t2, c2, 2000); assertEquals(2, t0.notified.size()); assertEquals(2, t2.notified.size()); @@ -466,8 +513,8 @@ public class SyncManagerTest { client0.delete("test0"); client2.delete("test2"); - waitForNotify(t0, c, 2000); - waitForNotify(t2, c, 2000); + waitForNotify(t0, c0, 2000); + waitForNotify(t2, c2, 2000); assertEquals(2, t0.notified.size()); assertEquals(2, t2.notified.size()); }