From d369945246573381841b5b52c60e7befafb1bb8f Mon Sep 17 00:00:00 2001
From: Ryan Izard <rizard@g.clemson.edu>
Date: Fri, 18 Dec 2015 14:37:38 -0500
Subject: [PATCH] Fix OFSwitchManager and Controller so switches can connect
 when no OpenFlow server IP is given in floodlightdefault.properties

Controller was testing ofPort instead of the parsed openFlowAddresses value,
and OFSwitchManager tried to take a bind address from an empty set when no
address was configured. Fall back to the wildcard address (IPv4Address.NONE)
so the OpenFlow listener still binds when the property is omitted.

The sync service's bootstrap test still does not pass; it feels like a race
condition between the update threads.
---
 .../core/internal/Controller.java             |    4 +-
 .../core/internal/OFSwitchManager.java        |    2 +-
 .../sync/internal/SyncManager.java            | 1477 +++++++++--------
 .../sync/internal/rpc/RPCChannelHandler.java  |   38 +-
 .../internal/rpc/RPCChannelInitializer.java   |    7 +-
 .../sync/internal/rpc/RPCService.java         |   54 +-
 .../sync/internal/BootstrapTest.java          |    4 +
 7 files changed, 802 insertions(+), 784 deletions(-)
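
Note for reviewers: the OFSwitchManager hunk below folds the wildcard-bind
fallback into a single bootstrap.bind(...) call. The following is a minimal
sketch of that fallback for readability; it assumes the OpenFlowJ IPv4Address
type already used in the hunk, and the helper class and method names are
hypothetical, not Floodlight API. In floodlightdefault.properties this
corresponds to leaving the openFlowAddresses parameter unset (the exact
property prefix depends on the providing module's class name).

    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.net.UnknownHostException;
    import java.util.Set;

    import org.projectfloodlight.openflow.types.IPv4Address;

    // Hypothetical helper: choose the address the OpenFlow bootstrap binds to.
    class BindAddressFallback {
        static InetSocketAddress resolve(Set<IPv4Address> configured, int ofPort)
                throws UnknownHostException {
            if (configured.isEmpty()) {
                // No openFlowAddresses configured: bind to 0.0.0.0 (IPv4Address.NONE),
                // i.e. listen on all interfaces, rather than calling iterator().next()
                // on an empty set as the old code did.
                return new InetSocketAddress(
                        InetAddress.getByAddress(IPv4Address.NONE.getBytes()), ofPort);
            }
            // Otherwise bind to a configured address (the patch binds one per address).
            IPv4Address ip = configured.iterator().next();
            return new InetSocketAddress(
                    InetAddress.getByAddress(ip.getBytes()), ofPort);
        }
    }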

diff --git a/src/main/java/net/floodlightcontroller/core/internal/Controller.java b/src/main/java/net/floodlightcontroller/core/internal/Controller.java
index 4e27cb95f..fa8194649 100644
--- a/src/main/java/net/floodlightcontroller/core/internal/Controller.java
+++ b/src/main/java/net/floodlightcontroller/core/internal/Controller.java
@@ -698,7 +698,7 @@ public class Controller implements IFloodlightProviderService, IStorageSourceLis
         log.info("Number of worker threads set to {}", this.workerThreads);
         
         String addresses = configParams.get("openFlowAddresses");
-        if (!Strings.isNullOrEmpty(ofPort)) {
+        if (!Strings.isNullOrEmpty(addresses)) {
             try {
                 openFlowAddresses = Collections.singleton(IPv4Address.of(addresses)); //TODO support list of addresses for multi-honed controllers
             } catch (Exception e) {
@@ -706,6 +706,8 @@ public class Controller implements IFloodlightProviderService, IStorageSourceLis
                 throw new FloodlightModuleException("Invalid OpenFlow address of " + addresses + " in config");
             }
             log.info("OpenFlow addresses set to {}", openFlowAddresses);
+        } else {
+            openFlowAddresses.add(IPv4Address.NONE);
         }
     }
 
diff --git a/src/main/java/net/floodlightcontroller/core/internal/OFSwitchManager.java b/src/main/java/net/floodlightcontroller/core/internal/OFSwitchManager.java
index e0f30158c..9b2285153 100644
--- a/src/main/java/net/floodlightcontroller/core/internal/OFSwitchManager.java
+++ b/src/main/java/net/floodlightcontroller/core/internal/OFSwitchManager.java
@@ -1009,7 +1009,7 @@ public class OFSwitchManager implements IOFSwitchManager, INewOFConnectionListen
 
 			Set<InetSocketAddress> addrs = new HashSet<InetSocketAddress>();
 			if (floodlightProvider.getOFAddresses().isEmpty()) {
-				cg.add(bootstrap.bind(addrs.iterator().next()).channel());
+				cg.add(bootstrap.bind(new InetSocketAddress(InetAddress.getByAddress(IPv4Address.NONE.getBytes()), floodlightProvider.getOFPort().getPort())).channel());
 			} else {
 				for (IPv4Address ip : floodlightProvider.getOFAddresses()) {
 					addrs.add(new InetSocketAddress(InetAddress.getByAddress(ip.getBytes()), floodlightProvider.getOFPort().getPort()));
diff --git a/src/main/java/org/sdnplatform/sync/internal/SyncManager.java b/src/main/java/org/sdnplatform/sync/internal/SyncManager.java
index 9db240c9c..540fa42b1 100644
--- a/src/main/java/org/sdnplatform/sync/internal/SyncManager.java
+++ b/src/main/java/org/sdnplatform/sync/internal/SyncManager.java
@@ -1,5 +1,8 @@
 package org.sdnplatform.sync.internal;
 
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -68,737 +71,745 @@ import net.floodlightcontroller.threadpool.IThreadPoolService;
  * @see ISyncService
  */
 public class SyncManager extends AbstractSyncManager {
-    protected static final Logger logger =
-            LoggerFactory.getLogger(SyncManager.class.getName());
-
-    protected IThreadPoolService threadPool;
-    protected IDebugCounterService debugCounter;
-
-    /**
-     * The store registry holds the storage engines that provide
-     * access to the data
-     */
-    private StoreRegistry storeRegistry = null;
-
-    private IClusterConfigProvider clusterConfigProvider;
-    private ClusterConfig clusterConfig = new ClusterConfig();
-
-    protected RPCService rpcService = null;
-
-    /**
-     * Interval between cleanup tasks in seconds
-     */
-    private static final int CLEANUP_INTERVAL = 60 * 60;
-
-    /**
-     * Interval between antientropy tasks in seconds
-     */
-    private static final int ANTIENTROPY_INTERVAL = 5 * 60;
-
-    /**
-     * Interval between configuration rescans
-     */
-    private static final int CONFIG_RESCAN_INTERVAL = 10;
-
-    /**
-     * Task for performing periodic maintenance/cleanup on local stores
-     */
-    private SingletonTask cleanupTask;
-
-    /**
-     * Task for periodic antientropy between nodes
-     */
-    private SingletonTask antientropyTask;
-
-    /**
-     * Task to periodically rescan configuration
-     */
-    private SingletonTask updateConfigTask;
-
-    /**
-     * Number of {@link HintWorker} workers used to drain the queue of writes
-     * that need to be sent to the connected nodes
-     */
-    private static final int SYNC_WORKER_POOL = 2;
-
-    /**
-     * A thread pool for the {@link HintWorker} threads.
-     */
-    private ExecutorService hintThreadPool;
-
-    /**
-     * Random number generator
-     */
-    private final Random random = new Random();
-
-    /**
-     * A map of the currently-allocated cursors
-     */
-    private final Map<Integer, Cursor> cursorMap =
-            new ConcurrentHashMap<Integer, Cursor>();
-
-    /**
-     * Whether to allow persistent stores or to use in-memory even
-     * when persistence is requested
-     */
-    private boolean persistenceEnabled = true;
-
-    private static final String PACKAGE =
-            ISyncService.class.getPackage().getName();
-
-    /**
-     * Debug Counters
-     */
-    public static IDebugCounter counterHints;
-    public static IDebugCounter counterSentValues;
-    public static IDebugCounter counterReceivedValues;
-    public static IDebugCounter counterPuts;
-    public static IDebugCounter counterGets;
-    public static IDebugCounter counterIterators;
-    public static IDebugCounter counterErrorRemote;
-    public static IDebugCounter counterErrorProcessing;
-
-    // ************
-    // ISyncService
-    // ************
-
-    @Override
-    public void registerStore(String storeName, Scope scope) {
-        try {
-            storeRegistry.register(storeName, scope, false);
-        } catch (PersistException e) {
-            // not possible
-            throw new SyncRuntimeException(e);
-        }
-    }
-
-    @Override
-    public void registerPersistentStore(String storeName, Scope scope)
-            throws PersistException {
-        storeRegistry.register(storeName, scope, persistenceEnabled);
-    }
-
-    // **************************
-    // SyncManager public methods
-    // **************************
-
-    /**
-     * Get the cluster configuration object
-     * @return the {@link ClusterConfig} object
-     * @see ClusterConfig
-     */
-    public ClusterConfig getClusterConfig() {
-        return clusterConfig;
-    }
-
-    /**
-     * Perform periodic scheduled cleanup.  Note that this will be called
-     * automatically and you shouldn't generally call it directly except for
-     * testing
-     * @throws SyncException
-     */
-    public void cleanup() throws SyncException {
-        for (SynchronizingStorageEngine store : storeRegistry.values()) {
-            store.cleanupTask();
-        }
-    }
-
-    /**
-     * Perform a synchronization with the node specified
-     */
-    public void antientropy(Node node) {
-        if (!rpcService.isConnected(node.getNodeId())) return;
-
-        logger.info("[{}->{}] Synchronizing local state to remote node",
-                    getLocalNodeId(), node.getNodeId());
-
-        for (SynchronizingStorageEngine store : storeRegistry.values()) {
-            if (Scope.LOCAL.equals(store.getScope())) {
-                if (node.getDomainId() !=
-                        getClusterConfig().getNode().getDomainId())
-                    continue;
-            } else if (Scope.UNSYNCHRONIZED.equals(store.getScope())) {
-                continue;
-            }
-
-            IClosableIterator<Entry<ByteArray,
-                                  List<Versioned<byte[]>>>> entries =
-                    store.entries();
-            try {
-                SyncMessage bsm =
-                        TProtocolUtil.getTSyncOfferMessage(store.getName(),
-                                                           store.getScope(),
-                                                           store.isPersistent());
-                int count = 0;
-                while (entries.hasNext()) {
-                    if (!rpcService.isConnected(node.getNodeId())) return;
-
-                    Entry<ByteArray, List<Versioned<byte[]>>> pair =
-                            entries.next();
-                    KeyedVersions kv =
-                            TProtocolUtil.getTKeyedVersions(pair.getKey(),
-                                                            pair.getValue());
-                    bsm.getSyncOffer().addToVersions(kv);
-                    count += 1;
-                    if (count >= 50) {
-                        sendSyncOffer(node.getNodeId(), bsm);
-                        bsm.getSyncOffer().unsetVersions();
-                        count = 0;
-                    }
-                }
-                sendSyncOffer(node.getNodeId(), bsm);
-            } catch (InterruptedException e) {
-                // This can't really happen
-                throw new RuntimeException(e);
-            } finally {
-                entries.close();
-            }
-        }
-    }
-
-    /**
-     * Communicate with a random node and do a full synchronization of the
-     * all the stores on each node that have the appropriate scope.
-     */
-    public void antientropy() {
-        ArrayList<Node> candidates = new ArrayList<Node>();
-        for (Node n : clusterConfig.getNodes())
-            if (rpcService.isConnected(n.getNodeId()))
-                candidates.add(n);
-
-        int numNodes = candidates.size();
-        if (numNodes == 0) return;
-        Node[] nodes = candidates.toArray(new Node[numNodes]);
-        int rn = random.nextInt(numNodes);
-        antientropy(nodes[rn]);
-    }
-
-    /**
-     * Write a value synchronized from another node, bypassing some of the
-     * usual logic when a client writes data.  If the store is not known,
-     * this will automatically register it
-     * @param storeName the store name
-     * @param scope the scope for the store
-     * @param persist TODO
-     * @param key the key to write
-     * @param values a list of versions for the key to write
-     * @throws PersistException
-     */
-    public void writeSyncValue(String storeName, Scope scope,
-                               boolean persist,
-                               byte[] key, Iterable<Versioned<byte[]>> values)
-                                       throws PersistException {
-        SynchronizingStorageEngine store = storeRegistry.get(storeName);
-        if (store == null) {
-            store = storeRegistry.register(storeName, scope, persist);
-        }
-        store.writeSyncValue(new ByteArray(key), values);
-    }
-
-    /**
-     * Check whether any of the specified versions for the key are not older
-     * than the versions we already have
-     * @param storeName the store to check
-     * @param key the key to check
-     * @param versions an iterable over the versions
-     * @return true if we'd like a copy of the data indicated
-     * @throws SyncException
-     */
-    public boolean handleSyncOffer(String storeName,
-                                   byte[] key,
-                                   Iterable<VectorClock> versions)
-                                           throws SyncException {
-        SynchronizingStorageEngine store = storeRegistry.get(storeName);
-        if (store == null) return true;
-
-        List<Versioned<byte[]>> values = store.get(new ByteArray(key));
-        if (values == null || values.size() == 0) return true;
-
-        // check whether any of the versions are not older than what we have
-        for (VectorClock vc : versions) {
-            for (Versioned<byte[]> value : values) {
-                VectorClock existingVc = (VectorClock)value.getVersion();
-                if (!vc.compare(existingVc).equals(Occurred.BEFORE))
-                    return true;
-            }
-        }
-
-        return false;
-    }
-
-    /**
-     * Get access to the raw storage engine.  This is useful for some
-     * on-the-wire communication
-     * @param storeName the store name to get
-     * @return the {@link IStorageEngine}
-     * @throws UnknownStoreException
-     */
-    public IStorageEngine<ByteArray, byte[]> getRawStore(String storeName)
-            throws UnknownStoreException {
-        return getStoreInternal(storeName);
-    }
-
-    /**
-     * Return the threadpool
-     * @return the {@link IThreadPoolService}
-     */
-    public IThreadPoolService getThreadPool() {
-        return threadPool;
-    }
-
-    /**
-     * Queue a synchronization of the specified {@link KeyedValues} to all nodes
-     * assocatiated with the storage engine specified
-     * @param e the storage engine for the values
-     * @param kv the values to synchronize
-     */
-    public void queueSyncTask(SynchronizingStorageEngine e,
-                              ByteArray key, Versioned<byte[]> value) {
-        storeRegistry.queueHint(e.getName(), key, value);
-    }
-
-    @Override
-    public void addListener(String storeName, MappingStoreListener listener)
-            throws UnknownStoreException {
-        SynchronizingStorageEngine store = getStoreInternal(storeName);
-        store.addListener(listener);
-    }
-
-    /**
-     * Update the node configuration to add or remove nodes
-     * @throws FloodlightModuleException
-     */
-    public void updateConfiguration() {
-        if (updateConfigTask != null)
-            updateConfigTask.reschedule(500, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Retrieve the cursor, if any, for the given cursor ID
-     * @param cursorId the cursor ID
-     * @return the {@link Cursor}
-     */
-    public Cursor getCursor(int cursorId) {
-        return cursorMap.get(Integer.valueOf(cursorId));
-    }
-
-    /**
-     * Allocate a new cursor for the given store name
-     * @param storeName the store name
-     * @return the {@link Cursor}
-     * @throws SyncException
-     */
-    public Cursor newCursor(String storeName) throws UnknownStoreException {
-        IStore<ByteArray, byte[]> store = getStore(storeName);
-        int cursorId = rpcService.getTransactionId();
-        Cursor cursor = new Cursor(cursorId, store.entries());
-        cursorMap.put(Integer.valueOf(cursorId), cursor);
-        return cursor;
-    }
-
-    /**
-     * Close the given cursor and remove it from the map
-     * @param cursor the cursor to close
-     */
-    public void closeCursor(Cursor cursor) {
-        cursor.close();
-        cursorMap.remove(Integer.valueOf(cursor.getCursorId()));
-    }
-
-    // *******************
-    // AbstractSyncManager
-    // *******************
-
-    @Override
-    public IStore<ByteArray,byte[]> getStore(String storeName)
-            throws UnknownStoreException {
-        return getRawStore(storeName);
-    }
-
-    @Override
-    public short getLocalNodeId() {
-        Node l = clusterConfig.getNode();
-        if (l == null) return Short.MAX_VALUE;
-        return l.getNodeId();
-    }
-
-    @Override
-    public void shutdown() {
-        logger.debug("Shutting down Sync Manager: {} {}",
-                     clusterConfig.getNode().getHostname(),
-                     clusterConfig.getNode().getPort());
-
-        if (rpcService != null) {
-            rpcService.shutdown();
-        }
-        if (hintThreadPool != null) {
-            hintThreadPool.shutdown();
-        }
-        if (storeRegistry != null) {
-            storeRegistry.shutdown();
-        }
-        hintThreadPool = null;
-        rpcService = null;
-    }
-
-    // *****************
-    // IFloodlightModule
-    // *****************
-
-    @Override
-    public void init(FloodlightModuleContext context)
-            throws FloodlightModuleException {
-        threadPool = context.getServiceImpl(IThreadPoolService.class);
-        debugCounter = context.getServiceImpl(IDebugCounterService.class);
-        Map<String, String> config = context.getConfigParams(this);
-        storeRegistry = new StoreRegistry(this, config.get("dbPath"));
-
-        String[] configProviders =
-             {PropertyCCProvider.class.getName(),
-              SyncStoreCCProvider.class.getName(),
-              StorageCCProvider.class.getName(),
-              FallbackCCProvider.class.getName()};
-        try {
-            if (config.containsKey("persistenceEnabled")) {
-                persistenceEnabled =
-                        Boolean.parseBoolean(config.get("persistenceEnabled"));
-            }
-            if (config.containsKey("configProviders")) {
-                configProviders = config.get("configProviders").split(",");
-            }
-            DelegatingCCProvider dprovider = new DelegatingCCProvider();
-            for (String configProvider : configProviders) {
-                Class<?> cClass = Class.forName(configProvider);
-                IClusterConfigProvider provider =
-                        (IClusterConfigProvider) cClass.newInstance();
-                dprovider.addProvider(provider);
-            }
-            dprovider.init(this, context);
-            clusterConfigProvider = dprovider;
-        } catch (Exception e) {
-            throw new FloodlightModuleException("Could not instantiate config" +
-                    "providers " + Arrays.toString(configProviders), e);
-        }
-
-        String manualStoreString = config.get("manualStores");
-        if (manualStoreString != null) {
-            List<String> manualStores = null;
-            try {
-                manualStores =
-                        (new ObjectMapper()).readValue(manualStoreString,
-                                         new TypeReference<List<String>>() {});
-            } catch (Exception e) {
-                throw new FloodlightModuleException("Failed to parse sync " +
-                        "manager manual stores: " + manualStoreString, e);
-            }
-            for (String s : manualStores) {
-                registerStore(s, Scope.GLOBAL);
-            }
-        }
-        registerDebugCounters(context);
-    }
-
-    private void registerDebugCounters(FloodlightModuleContext context)
-            throws FloodlightModuleException {
-    	if (context != null) {
-    		debugCounter.registerModule(PACKAGE);
-    		counterHints = debugCounter.registerCounter(PACKAGE, "hints",
-    				"Queued sync events processed");
-    		counterSentValues = debugCounter.registerCounter(PACKAGE, "sent-values",
-    				"Values synced to remote node");
-    		counterReceivedValues = debugCounter.registerCounter(PACKAGE, "received-values",
-    				"Values received from remote node");
-    		counterPuts = debugCounter.registerCounter(PACKAGE, "puts",
-    				"Local puts to store");
-    		counterGets = debugCounter.registerCounter(PACKAGE, "gets",
-    				"Local gets from store");
-    		counterIterators = debugCounter.registerCounter(PACKAGE, "iterators",
-    				"Local iterators created over store");
-    		counterErrorRemote = debugCounter.registerCounter(PACKAGE, "error-remote",
-    				"Number of errors sent from remote clients",
-    				IDebugCounterService.MetaData.ERROR);
-    		counterErrorProcessing = debugCounter.registerCounter(PACKAGE,
-    				"error-processing",
-    				"Number of errors processing messages from remote clients",
-    				IDebugCounterService.MetaData.ERROR);
-    	}
-
-    }
-
-    @Override
-    public void startUp(FloodlightModuleContext context)
-            throws FloodlightModuleException {
-
-        rpcService = new RPCService(this, debugCounter);
-
-        cleanupTask = new SingletonTask(threadPool.getScheduledExecutor(),
-                                        new CleanupTask());
-        cleanupTask.reschedule(CLEANUP_INTERVAL +
-                               random.nextInt(30), TimeUnit.SECONDS);
-
-        antientropyTask = new SingletonTask(threadPool.getScheduledExecutor(),
-                                       new AntientropyTask());
-        antientropyTask.reschedule(ANTIENTROPY_INTERVAL +
-                                   random.nextInt(30), TimeUnit.SECONDS);
-
-        final ThreadGroup tg = new ThreadGroup("Hint Workers");
-        tg.setMaxPriority(Thread.NORM_PRIORITY - 2);
-        ThreadFactory f = new ThreadFactory() {
-            AtomicInteger id = new AtomicInteger();
-
-            @Override
-            public Thread newThread(Runnable runnable) {
-                return new Thread(tg, runnable,
-                                  "HintWorker-" + id.getAndIncrement());
-            }
-        };
-        hintThreadPool = Executors.newCachedThreadPool(f);
-        for (int i = 0; i < SYNC_WORKER_POOL; i++) {
-            hintThreadPool.execute(new HintWorker());
-        }
-
-        doUpdateConfiguration();
-        rpcService.run();
-
-        updateConfigTask =
-                new SingletonTask(threadPool.getScheduledExecutor(),
-                                  new UpdateConfigTask());
-        updateConfigTask.reschedule(CONFIG_RESCAN_INTERVAL, TimeUnit.SECONDS);
-    }
-
-    @Override
-    public Collection<Class<? extends IFloodlightService>>
-            getModuleDependencies() {
-        Collection<Class<? extends IFloodlightService>> l =
-                new ArrayList<Class<? extends IFloodlightService>>();
-        l.add(IThreadPoolService.class);
-        l.add(IStorageSourceService.class);
-        l.add(IDebugCounterService.class);
-        return l;
-    }
-
-    // ***************
-    // Local methods
-    // ***************
-
-    protected void doUpdateConfiguration()
-            throws FloodlightModuleException {
-
-        try {
-            ClusterConfig oldConfig = clusterConfig;
-            clusterConfig = clusterConfigProvider.getConfig();
-            if (clusterConfig.equals(oldConfig)) return;
-
-            logger.info("[{}] Updating sync configuration {}",
-                        clusterConfig.getNode().getNodeId(),
-                        clusterConfig);
-            if (oldConfig.getNode() != null &&
-                !clusterConfig.getNode().equals(oldConfig.getNode())) {
-                logger.info("[{}] Local node configuration changed; restarting sync" +
-                        "service", oldConfig.getNode().getNodeId());
-                shutdown();
-                startUp(null);
-            }
-
-            for (Node n : clusterConfig.getNodes()) {
-                Node existing = oldConfig.getNode(n.getNodeId());
-                if (existing != null && !n.equals(existing)) {
-                    // we already had this node's configuration, but it's
-                    // changed.  Disconnect from the node and let it
-                    // reinitialize
-                    logger.debug("[{}->{}] Configuration for node has changed",
-                                 getLocalNodeId(), n.getNodeId());
-                    rpcService.disconnectNode(n.getNodeId());
-                }
-            }
-            for (Node n : oldConfig.getNodes()) {
-                Node nn = clusterConfig.getNode(n.getNodeId());
-                if (nn == null) {
-                    // n is a node that doesn't appear in the new config
-                    logger.debug("[{}->{}] Disconnecting deconfigured node",
-                                 getLocalNodeId(), n.getNodeId());
-                    rpcService.disconnectNode(n.getNodeId());
-                }
-            }
-        } catch (Exception e) {
-            throw new FloodlightModuleException("Could not update " +
-                                                "configuration", e);
-        }
-    }
-
-    protected SynchronizingStorageEngine getStoreInternal(String storeName)
-            throws UnknownStoreException {
-        SynchronizingStorageEngine store = storeRegistry.get(storeName);
-        if (store == null) {
-            throw new UnknownStoreException("Store " + storeName +
-                                            " has not been registered");
-        }
-        return store;
-    }
-
-    private void sendSyncOffer(short nodeId, SyncMessage bsm)
-            throws InterruptedException {
-        SyncOfferMessage som = bsm.getSyncOffer();
-        if (!som.isSetVersions()) return;
-        if (logger.isTraceEnabled()) {
-            logger.trace("[{}->{}] Sending SyncOffer with {} elements",
-                         new Object[]{getLocalNodeId(), nodeId,
-                                      som.getVersionsSize()});
-        }
-
-        som.getHeader().setTransactionId(rpcService.getTransactionId());
-        rpcService.writeToNode(nodeId, bsm);
-    }
-
-    /**
-     * Periodically perform cleanup
-     * @author readams
-     */
-    protected class CleanupTask implements Runnable {
-        @Override
-        public void run() {
-            try {
-                if (rpcService != null)
-                    cleanup();
-            } catch (Exception e) {
-                logger.error("Cleanup task failed", e);
-            }
-
-            if (rpcService != null) {
-                cleanupTask.reschedule(CLEANUP_INTERVAL +
-                                       random.nextInt(30), TimeUnit.SECONDS);
-            }
-        }
-    }
-
-    /**
-     * Periodically perform antientropy
-     * @author readams
-     */
-    protected class AntientropyTask implements Runnable {
-        @Override
-        public void run() {
-            try {
-                if (rpcService != null)
-                    antientropy();
-            } catch (Exception e) {
-                logger.error("Antientropy task failed", e);
-            }
-
-            if (rpcService != null) {
-                antientropyTask.reschedule(ANTIENTROPY_INTERVAL +
-                                           random.nextInt(30),
-                                           TimeUnit.SECONDS);
-            }
-        }
-    }
-
-    /**
-     * Worker task to periodically rescan the configuration
-     * @author readams
-     */
-    protected class UpdateConfigTask implements Runnable {
-        @Override
-        public void run() {
-            try {
-                if (rpcService != null)
-                    doUpdateConfiguration();
-            } catch (Exception e) {
-                logger.error("Failed to update configuration", e);
-            }
-            if (rpcService != null) {
-                updateConfigTask.reschedule(CONFIG_RESCAN_INTERVAL,
-                                            TimeUnit.SECONDS);
-            }
-        }
-    }
-
-    /**
-     * Worker thread that will drain the sync item queue and write the
-     * appropriate messages to the node I/O channels
-     * @author readams
-     */
-    protected class HintWorker implements Runnable {
-        ArrayList<Hint> tasks = new ArrayList<Hint>(50);
-        protected Map<String, SyncMessage> messages =
-                new LinkedHashMap<String, SyncMessage>();
-
-        @Override
-        public void run() {
-            while (rpcService != null) {
-                try {
-                    // Batch up sync tasks so we use fewer, larger messages
-                    // XXX - todo - handle hints targeted to specific nodes
-                    storeRegistry.takeHints(tasks, 50);
-                    for (Hint task : tasks) {
-                        counterHints.increment();
-                        SynchronizingStorageEngine store =
-                                storeRegistry.get(task.getHintKey().
-                                                  getStoreName());
-                        SyncMessage bsm = getMessage(store);
-                        KeyedValues kv =
-                                TProtocolUtil.
-                                getTKeyedValues(task.getHintKey().getKey(),
-                                                task.getValues());
-                        bsm.getSyncValue().addToValues(kv);
-                    }
-
-                    Iterable<Node> nodes = getClusterConfig().getNodes();
-                    short localDomainId =
-                            getClusterConfig().getNode().getDomainId();
-                    short localNodeId =
-                            getClusterConfig().getNode().getNodeId();
-                    for (Node n : nodes) {
-                        if (localNodeId == n.getNodeId())
-                            continue;
-                        for (SyncMessage bsm : messages.values()) {
-                            SyncValueMessage svm = bsm.getSyncValue();
-                            if (svm.getStore().getScope().
-                                    equals(org.sdnplatform.sync.thrift.
-                                           Scope.LOCAL) &&
-                                           n.getDomainId() != localDomainId) {
-                                // This message is only for local domain
-                                continue;
-                            }
-
-                            svm.getHeader().
-                            setTransactionId(rpcService.getTransactionId());
-                            counterSentValues.add(bsm.getSyncValue().getValuesSize());
-                            rpcService.writeToNode(n.getNodeId(), bsm);
-                        }
-                    }
-                    tasks.clear();
-                    clearMessages();
-
-                } catch (Exception e) {
-                    logger.error("Error occured in synchronization worker", e);
-                }
-            }
-        }
-
-        /**
-         * Clear the current list of pending messages
-         */
-        private void clearMessages() {
-            messages.clear();
-        }
-
-        /**
-         * Allocate a partially-initialized {@link SyncMessage} object for
-         * the given store
-         * @param store the store
-         * @return the {@link SyncMessage} object
-         */
-        private SyncMessage getMessage(SynchronizingStorageEngine store) {
-            String storeName = store.getName();
-            SyncMessage bsm = messages.get(storeName);
-            if (bsm == null) {
-                bsm = TProtocolUtil.getTSyncValueMessage(storeName,
-                                                         store.getScope(),
-                                                         store.isPersistent());
-                messages.put(storeName, bsm);
-            }
-            return bsm;
-        }
-    }
+	protected static final Logger logger =
+			LoggerFactory.getLogger(SyncManager.class.getName());
+
+	protected IThreadPoolService threadPool;
+	protected IDebugCounterService debugCounter;
+
+	/**
+	 * The store registry holds the storage engines that provide
+	 * access to the data
+	 */
+	private StoreRegistry storeRegistry = null;
+	
+	private Timer timer;
+
+	private IClusterConfigProvider clusterConfigProvider;
+	private ClusterConfig clusterConfig = new ClusterConfig();
+
+	protected RPCService rpcService = null;
+
+	/**
+	 * Interval between cleanup tasks in seconds
+	 */
+	private static final int CLEANUP_INTERVAL = 60 * 60;
+
+	/**
+	 * Interval between antientropy tasks in seconds
+	 */
+	private static final int ANTIENTROPY_INTERVAL = 5 * 60;
+
+	/**
+	 * Interval between configuration rescans
+	 */
+	private static final int CONFIG_RESCAN_INTERVAL = 10;
+
+	/**
+	 * Task for performing periodic maintenance/cleanup on local stores
+	 */
+	private SingletonTask cleanupTask;
+
+	/**
+	 * Task for periodic antientropy between nodes
+	 */
+	private SingletonTask antientropyTask;
+
+	/**
+	 * Task to periodically rescan configuration
+	 */
+	private SingletonTask updateConfigTask;
+
+	/**
+	 * Number of {@link HintWorker} workers used to drain the queue of writes
+	 * that need to be sent to the connected nodes
+	 */
+	private static final int SYNC_WORKER_POOL = 2;
+
+	/**
+	 * A thread pool for the {@link HintWorker} threads.
+	 */
+	private ExecutorService hintThreadPool;
+
+	/**
+	 * Random number generator
+	 */
+	private final Random random = new Random();
+
+	/**
+	 * A map of the currently-allocated cursors
+	 */
+	private final Map<Integer, Cursor> cursorMap =
+			new ConcurrentHashMap<Integer, Cursor>();
+
+	/**
+	 * Whether to allow persistent stores or to use in-memory even
+	 * when persistence is requested
+	 */
+	private boolean persistenceEnabled = true;
+
+	private static final String PACKAGE =
+			ISyncService.class.getPackage().getName();
+
+	/**
+	 * Debug Counters
+	 */
+	public static IDebugCounter counterHints;
+	public static IDebugCounter counterSentValues;
+	public static IDebugCounter counterReceivedValues;
+	public static IDebugCounter counterPuts;
+	public static IDebugCounter counterGets;
+	public static IDebugCounter counterIterators;
+	public static IDebugCounter counterErrorRemote;
+	public static IDebugCounter counterErrorProcessing;
+
+	// ************
+	// ISyncService
+	// ************
+
+	@Override
+	public void registerStore(String storeName, Scope scope) {
+		try {
+			storeRegistry.register(storeName, scope, false);
+		} catch (PersistException e) {
+			// not possible
+			throw new SyncRuntimeException(e);
+		}
+	}
+
+	@Override
+	public void registerPersistentStore(String storeName, Scope scope)
+			throws PersistException {
+		storeRegistry.register(storeName, scope, persistenceEnabled);
+	}
+
+	// **************************
+	// SyncManager public methods
+	// **************************
+
+	/**
+	 * Get the cluster configuration object
+	 * @return the {@link ClusterConfig} object
+	 * @see ClusterConfig
+	 */
+	public ClusterConfig getClusterConfig() {
+		return clusterConfig;
+	}
+
+	/**
+	 * Perform periodic scheduled cleanup.  Note that this will be called
+	 * automatically and you shouldn't generally call it directly except for
+	 * testing
+	 * @throws SyncException
+	 */
+	public void cleanup() throws SyncException {
+		for (SynchronizingStorageEngine store : storeRegistry.values()) {
+			store.cleanupTask();
+		}
+	}
+
+	/**
+	 * Perform a synchronization with the node specified
+	 */
+	public void antientropy(Node node) {
+		if (!rpcService.isConnected(node.getNodeId())) return;
+
+		logger.info("[{}->{}] Synchronizing local state to remote node",
+				getLocalNodeId(), node.getNodeId());
+
+		for (SynchronizingStorageEngine store : storeRegistry.values()) {
+			if (Scope.LOCAL.equals(store.getScope())) {
+				if (node.getDomainId() !=
+						getClusterConfig().getNode().getDomainId())
+					continue;
+			} else if (Scope.UNSYNCHRONIZED.equals(store.getScope())) {
+				continue;
+			}
+
+			IClosableIterator<Entry<ByteArray,
+			List<Versioned<byte[]>>>> entries =
+			store.entries();
+			try {
+				SyncMessage bsm =
+						TProtocolUtil.getTSyncOfferMessage(store.getName(),
+								store.getScope(),
+								store.isPersistent());
+				int count = 0;
+				while (entries.hasNext()) {
+					if (!rpcService.isConnected(node.getNodeId())) return;
+
+					Entry<ByteArray, List<Versioned<byte[]>>> pair =
+							entries.next();
+					KeyedVersions kv =
+							TProtocolUtil.getTKeyedVersions(pair.getKey(),
+									pair.getValue());
+					bsm.getSyncOffer().addToVersions(kv);
+					count += 1;
+					if (count >= 50) {
+						sendSyncOffer(node.getNodeId(), bsm);
+						// realloc sync message - it is still queued up by netty!
+						bsm = TProtocolUtil.getTSyncOfferMessage(store.getName(),
+								store.getScope(),
+								store.isPersistent());
+						count = 0;
+					}
+				}
+				sendSyncOffer(node.getNodeId(), bsm);
+			} catch (InterruptedException e) {
+				// This can't really happen
+				throw new RuntimeException(e);
+			} finally {
+				entries.close();
+			}
+		}
+	}
+
+	/**
+	 * Communicate with a random node and do a full synchronization of
+	 * all the stores on each node that have the appropriate scope.
+	 */
+	public void antientropy() {
+		ArrayList<Node> candidates = new ArrayList<Node>();
+		for (Node n : clusterConfig.getNodes())
+			if (rpcService.isConnected(n.getNodeId()))
+				candidates.add(n);
+
+		int numNodes = candidates.size();
+		if (numNodes == 0) return;
+		Node[] nodes = candidates.toArray(new Node[numNodes]);
+		int rn = random.nextInt(numNodes);
+		antientropy(nodes[rn]);
+	}
+
+	/**
+	 * Write a value synchronized from another node, bypassing some of the
+	 * usual logic when a client writes data.  If the store is not known,
+	 * this will automatically register it
+	 * @param storeName the store name
+	 * @param scope the scope for the store
+	 * @param persist TODO
+	 * @param key the key to write
+	 * @param values a list of versions for the key to write
+	 * @throws PersistException
+	 */
+	public void writeSyncValue(String storeName, Scope scope,
+			boolean persist,
+			byte[] key, Iterable<Versioned<byte[]>> values)
+					throws PersistException {
+		SynchronizingStorageEngine store = storeRegistry.get(storeName);
+		if (store == null) {
+			store = storeRegistry.register(storeName, scope, persist);
+		}
+		store.writeSyncValue(new ByteArray(key), values);
+	}
+
+	/**
+	 * Check whether any of the specified versions for the key are not older
+	 * than the versions we already have
+	 * @param storeName the store to check
+	 * @param key the key to check
+	 * @param versions an iterable over the versions
+	 * @return true if we'd like a copy of the data indicated
+	 * @throws SyncException
+	 */
+	public boolean handleSyncOffer(String storeName,
+			byte[] key,
+			Iterable<VectorClock> versions)
+					throws SyncException {
+		SynchronizingStorageEngine store = storeRegistry.get(storeName);
+		if (store == null) return true;
+
+		List<Versioned<byte[]>> values = store.get(new ByteArray(key));
+		if (values == null || values.size() == 0) return true;
+
+		// check whether any of the versions are not older than what we have
+		for (VectorClock vc : versions) {
+			for (Versioned<byte[]> value : values) {
+				VectorClock existingVc = (VectorClock)value.getVersion();
+				if (!vc.compare(existingVc).equals(Occurred.BEFORE))
+					return true;
+			}
+		}
+
+		return false;
+	}
+
+	/**
+	 * Get access to the raw storage engine.  This is useful for some
+	 * on-the-wire communication
+	 * @param storeName the store name to get
+	 * @return the {@link IStorageEngine}
+	 * @throws UnknownStoreException
+	 */
+	public IStorageEngine<ByteArray, byte[]> getRawStore(String storeName)
+			throws UnknownStoreException {
+		return getStoreInternal(storeName);
+	}
+
+	/**
+	 * Return the threadpool
+	 * @return the {@link IThreadPoolService}
+	 */
+	public IThreadPoolService getThreadPool() {
+		return threadPool;
+	}
+
+	/**
+	 * Queue a synchronization of the specified {@link KeyedValues} to all nodes
+	 * associated with the storage engine specified
+	 * @param e the storage engine for the values
+	 * @param kv the values to synchronize
+	 */
+	public void queueSyncTask(SynchronizingStorageEngine e,
+			ByteArray key, Versioned<byte[]> value) {
+		storeRegistry.queueHint(e.getName(), key, value);
+	}
+
+	@Override
+	public void addListener(String storeName, MappingStoreListener listener)
+			throws UnknownStoreException {
+		SynchronizingStorageEngine store = getStoreInternal(storeName);
+		store.addListener(listener);
+	}
+
+	/**
+	 * Update the node configuration to add or remove nodes
+	 * @throws FloodlightModuleException
+	 */
+	public void updateConfiguration() {
+		if (updateConfigTask != null)
+			updateConfigTask.reschedule(500, TimeUnit.MILLISECONDS);
+	}
+
+	/**
+	 * Retrieve the cursor, if any, for the given cursor ID
+	 * @param cursorId the cursor ID
+	 * @return the {@link Cursor}
+	 */
+	public Cursor getCursor(int cursorId) {
+		return cursorMap.get(Integer.valueOf(cursorId));
+	}
+
+	/**
+	 * Allocate a new cursor for the given store name
+	 * @param storeName the store name
+	 * @return the {@link Cursor}
+	 * @throws SyncException
+	 */
+	public Cursor newCursor(String storeName) throws UnknownStoreException {
+		IStore<ByteArray, byte[]> store = getStore(storeName);
+		int cursorId = rpcService.getTransactionId();
+		Cursor cursor = new Cursor(cursorId, store.entries());
+		cursorMap.put(Integer.valueOf(cursorId), cursor);
+		return cursor;
+	}
+
+	/**
+	 * Close the given cursor and remove it from the map
+	 * @param cursor the cursor to close
+	 */
+	public void closeCursor(Cursor cursor) {
+		cursor.close();
+		cursorMap.remove(Integer.valueOf(cursor.getCursorId()));
+	}
+
+	// *******************
+	// AbstractSyncManager
+	// *******************
+
+	@Override
+	public IStore<ByteArray,byte[]> getStore(String storeName)
+			throws UnknownStoreException {
+		return getRawStore(storeName);
+	}
+
+	@Override
+	public short getLocalNodeId() {
+		Node l = clusterConfig.getNode();
+		if (l == null) return Short.MAX_VALUE;
+		return l.getNodeId();
+	}
+
+	@Override
+	public void shutdown() {
+		logger.info("Shutting down Sync Manager: {} {}",
+				clusterConfig.getNode().getHostname(),
+				clusterConfig.getNode().getPort());
+
+		if (rpcService != null) {
+			rpcService.shutdown();
+		}
+		if (hintThreadPool != null) {
+			hintThreadPool.shutdown();
+		}
+		if (storeRegistry != null) {
+			storeRegistry.shutdown();
+		}
+		if (timer != null)
+			timer.stop();
+		timer = null;
+		hintThreadPool = null;
+		rpcService = null;
+	}
+
+	// *****************
+	// IFloodlightModule
+	// *****************
+
+	@Override
+	public void init(FloodlightModuleContext context)
+			throws FloodlightModuleException {
+		threadPool = context.getServiceImpl(IThreadPoolService.class);
+		debugCounter = context.getServiceImpl(IDebugCounterService.class);
+		Map<String, String> config = context.getConfigParams(this);
+		storeRegistry = new StoreRegistry(this, config.get("dbPath"));
+
+		String[] configProviders =
+			{PropertyCCProvider.class.getName(),
+				SyncStoreCCProvider.class.getName(),
+				StorageCCProvider.class.getName(),
+				FallbackCCProvider.class.getName()};
+		try {
+			if (config.containsKey("persistenceEnabled")) {
+				persistenceEnabled =
+						Boolean.parseBoolean(config.get("persistenceEnabled"));
+			}
+			if (config.containsKey("configProviders")) {
+				configProviders = config.get("configProviders").split(",");
+			}
+			DelegatingCCProvider dprovider = new DelegatingCCProvider();
+			for (String configProvider : configProviders) {
+				Class<?> cClass = Class.forName(configProvider);
+				IClusterConfigProvider provider =
+						(IClusterConfigProvider) cClass.newInstance();
+				dprovider.addProvider(provider);
+			}
+			dprovider.init(this, context);
+			clusterConfigProvider = dprovider;
+		} catch (Exception e) {
+			throw new FloodlightModuleException("Could not instantiate config " +
+					"providers " + Arrays.toString(configProviders), e);
+		}
+
+		String manualStoreString = config.get("manualStores");
+		if (manualStoreString != null) {
+			List<String> manualStores = null;
+			try {
+				manualStores =
+						(new ObjectMapper()).readValue(manualStoreString,
+								new TypeReference<List<String>>() {});
+			} catch (Exception e) {
+				throw new FloodlightModuleException("Failed to parse sync " +
+						"manager manual stores: " + manualStoreString, e);
+			}
+			for (String s : manualStores) {
+				registerStore(s, Scope.GLOBAL);
+			}
+		}
+		registerDebugCounters(context);
+	}
+
+	private void registerDebugCounters(FloodlightModuleContext context)
+			throws FloodlightModuleException {
+		if (context != null) {
+			debugCounter.registerModule(PACKAGE);
+			counterHints = debugCounter.registerCounter(PACKAGE, "hints",
+					"Queued sync events processed");
+			counterSentValues = debugCounter.registerCounter(PACKAGE, "sent-values",
+					"Values synced to remote node");
+			counterReceivedValues = debugCounter.registerCounter(PACKAGE, "received-values",
+					"Values received from remote node");
+			counterPuts = debugCounter.registerCounter(PACKAGE, "puts",
+					"Local puts to store");
+			counterGets = debugCounter.registerCounter(PACKAGE, "gets",
+					"Local gets from store");
+			counterIterators = debugCounter.registerCounter(PACKAGE, "iterators",
+					"Local iterators created over store");
+			counterErrorRemote = debugCounter.registerCounter(PACKAGE, "error-remote",
+					"Number of errors sent from remote clients",
+					IDebugCounterService.MetaData.ERROR);
+			counterErrorProcessing = debugCounter.registerCounter(PACKAGE,
+					"error-processing",
+					"Number of errors processing messages from remote clients",
+					IDebugCounterService.MetaData.ERROR);
+		}
+
+	}
+
+	@Override
+	public void startUp(FloodlightModuleContext context)
+			throws FloodlightModuleException {
+
+		timer = new HashedWheelTimer();
+		rpcService = new RPCService(this, debugCounter, timer);
+
+		cleanupTask = new SingletonTask(threadPool.getScheduledExecutor(),
+				new CleanupTask());
+		cleanupTask.reschedule(CLEANUP_INTERVAL +
+				random.nextInt(30), TimeUnit.SECONDS);
+
+		antientropyTask = new SingletonTask(threadPool.getScheduledExecutor(),
+				new AntientropyTask());
+		antientropyTask.reschedule(ANTIENTROPY_INTERVAL +
+				random.nextInt(30), TimeUnit.SECONDS);
+
+		final ThreadGroup tg = new ThreadGroup("Hint Workers");
+		tg.setMaxPriority(Thread.NORM_PRIORITY - 2);
+		ThreadFactory f = new ThreadFactory() {
+			AtomicInteger id = new AtomicInteger();
+
+			@Override
+			public Thread newThread(Runnable runnable) {
+				return new Thread(tg, runnable,
+						"HintWorker-" + id.getAndIncrement());
+			}
+		};
+		hintThreadPool = Executors.newCachedThreadPool(f);
+		for (int i = 0; i < SYNC_WORKER_POOL; i++) {
+			hintThreadPool.execute(new HintWorker());
+		}
+
+		doUpdateConfiguration();
+		rpcService.run();
+
+		updateConfigTask =
+				new SingletonTask(threadPool.getScheduledExecutor(),
+						new UpdateConfigTask());
+		updateConfigTask.reschedule(CONFIG_RESCAN_INTERVAL, TimeUnit.SECONDS);
+	}
+
+	@Override
+	public Collection<Class<? extends IFloodlightService>>
+	getModuleDependencies() {
+		Collection<Class<? extends IFloodlightService>> l =
+				new ArrayList<Class<? extends IFloodlightService>>();
+		l.add(IThreadPoolService.class);
+		l.add(IStorageSourceService.class);
+		l.add(IDebugCounterService.class);
+		return l;
+	}
+
+	// ***************
+	// Local methods
+	// ***************
+
+	protected void doUpdateConfiguration()
+			throws FloodlightModuleException {
+
+		try {
+			ClusterConfig oldConfig = clusterConfig;
+			clusterConfig = clusterConfigProvider.getConfig();
+			if (clusterConfig.equals(oldConfig)) return;
+
+			logger.info("[{}] Updating sync configuration {}",
+					clusterConfig.getNode().getNodeId(),
+					clusterConfig);
+			if (oldConfig.getNode() != null &&
+					!clusterConfig.getNode().equals(oldConfig.getNode())) {
+				logger.info("[{}] Local node configuration changed; restarting sync " +
+						"service", oldConfig.getNode().getNodeId());
+				shutdown();
+				startUp(null);
+			}
+
+			for (Node n : clusterConfig.getNodes()) {
+				Node existing = oldConfig.getNode(n.getNodeId());
+				if (existing != null && !n.equals(existing)) {
+					// we already had this node's configuration, but it's
+					// changed.  Disconnect from the node and let it
+					// reinitialize
+					logger.info("[{}->{}] Configuration for node has changed",
+							getLocalNodeId(), n.getNodeId());
+					rpcService.disconnectNode(n.getNodeId());
+				}
+			}
+			for (Node n : oldConfig.getNodes()) {
+				Node nn = clusterConfig.getNode(n.getNodeId());
+				if (nn == null) {
+					// n is a node that doesn't appear in the new config
+					logger.info("[{}->{}] Disconnecting deconfigured node",
+							getLocalNodeId(), n.getNodeId());
+					rpcService.disconnectNode(n.getNodeId());
+				}
+			}
+		} catch (Exception e) {
+			throw new FloodlightModuleException("Could not update " +
+					"configuration", e);
+		}
+	}
+
+	protected SynchronizingStorageEngine getStoreInternal(String storeName)
+			throws UnknownStoreException {
+		SynchronizingStorageEngine store = storeRegistry.get(storeName);
+		if (store == null) {
+			throw new UnknownStoreException("Store " + storeName +
+					" has not been registered");
+		}
+		return store;
+	}
+
+	private void sendSyncOffer(short nodeId, SyncMessage bsm)
+			throws InterruptedException {
+		SyncOfferMessage som = bsm.getSyncOffer();
+		if (!som.isSetVersions()) return;
+		if (logger.isTraceEnabled()) {
+			logger.trace("[{}->{}] Sending SyncOffer with {} elements",
+					new Object[]{getLocalNodeId(), nodeId,
+					som.getVersionsSize()});
+		}
+
+		som.getHeader().setTransactionId(rpcService.getTransactionId());
+		rpcService.writeToNode(nodeId, bsm);
+	}
+
+	/**
+	 * Periodically perform cleanup
+	 * @author readams
+	 */
+	protected class CleanupTask implements Runnable {
+		@Override
+		public void run() {
+			try {
+				if (rpcService != null)
+					cleanup();
+			} catch (Exception e) {
+				logger.error("Cleanup task failed", e);
+			}
+
+			if (rpcService != null) {
+				cleanupTask.reschedule(CLEANUP_INTERVAL +
+						random.nextInt(30), TimeUnit.SECONDS);
+			}
+		}
+	}
+
+	/**
+	 * Periodically perform antientropy
+	 * @author readams
+	 */
+	protected class AntientropyTask implements Runnable {
+		@Override
+		public void run() {
+			try {
+				if (rpcService != null)
+					antientropy();
+			} catch (Exception e) {
+				logger.error("Antientropy task failed", e);
+			}
+
+			if (rpcService != null) {
+				antientropyTask.reschedule(ANTIENTROPY_INTERVAL +
+						random.nextInt(30),
+						TimeUnit.SECONDS);
+			}
+		}
+	}
+
+	/**
+	 * Worker task to periodically rescan the configuration
+	 * @author readams
+	 */
+	protected class UpdateConfigTask implements Runnable {
+		@Override
+		public void run() {
+			try {
+				if (rpcService != null)
+					doUpdateConfiguration();
+			} catch (Exception e) {
+				logger.error("Failed to update configuration", e);
+			}
+			if (rpcService != null) {
+				updateConfigTask.reschedule(CONFIG_RESCAN_INTERVAL,
+						TimeUnit.SECONDS);
+			}
+		}
+	}
+
+	/**
+	 * Worker thread that will drain the sync item queue and write the
+	 * appropriate messages to the node I/O channels
+	 * @author readams
+	 */
+	protected class HintWorker implements Runnable {
+		ArrayList<Hint> tasks = new ArrayList<Hint>(50);
+		protected Map<String, SyncMessage> messages =
+				new LinkedHashMap<String, SyncMessage>();
+
+		@Override
+		public void run() {
+			while (rpcService != null) {
+				try {
+					// Batch up sync tasks so we use fewer, larger messages
+					// XXX - todo - handle hints targeted to specific nodes
+					storeRegistry.takeHints(tasks, 50);
+					for (Hint task : tasks) {
+						counterHints.increment();
+						SynchronizingStorageEngine store =
+								storeRegistry.get(task.getHintKey().
+										getStoreName());
+						SyncMessage bsm = getMessage(store);
+						KeyedValues kv =
+								TProtocolUtil.
+								getTKeyedValues(task.getHintKey().getKey(),
+										task.getValues());
+						bsm.getSyncValue().addToValues(kv);
+					}
+
+					Iterable<Node> nodes = getClusterConfig().getNodes();
+					short localDomainId =
+							getClusterConfig().getNode().getDomainId();
+					short localNodeId =
+							getClusterConfig().getNode().getNodeId();
+					for (Node n : nodes) {
+						if (localNodeId == n.getNodeId())
+							continue;
+						for (SyncMessage bsm : messages.values()) {
+							SyncValueMessage svm = bsm.getSyncValue();
+							if (svm.getStore().getScope().
+									equals(org.sdnplatform.sync.thrift.
+											Scope.LOCAL) &&
+											n.getDomainId() != localDomainId) {
+								// This message is only for local domain
+								continue;
+							}
+
+							svm.getHeader().setTransactionId(rpcService.getTransactionId());
+							counterSentValues.add(bsm.getSyncValue().getValuesSize());
+							rpcService.writeToNode(n.getNodeId(), bsm);
+						}
+					}
+					tasks.clear();
+					clearMessages();
+
+				} catch (Exception e) {
+					logger.error("Error occurred in synchronization worker", e);
+				}
+			}
+		}
+
+		/**
+		 * Clear the current list of pending messages
+		 */
+		private void clearMessages() {
+			messages.clear();
+		}
+
+		/**
+		 * Allocate a partially-initialized {@link SyncMessage} object for
+		 * the given store
+		 * @param store the store
+		 * @return the {@link SyncMessage} object
+		 */
+		private SyncMessage getMessage(SynchronizingStorageEngine store) {
+			String storeName = store.getName();
+			SyncMessage bsm = messages.get(storeName);
+			if (bsm == null) {
+				bsm = TProtocolUtil.getTSyncValueMessage(storeName,
+						store.getScope(),
+						store.isPersistent());
+				messages.put(storeName, bsm);
+			}
+			return bsm;
+		}
+	}
 }
diff --git a/src/main/java/org/sdnplatform/sync/internal/rpc/RPCChannelHandler.java b/src/main/java/org/sdnplatform/sync/internal/rpc/RPCChannelHandler.java
index b6f75e6b5..a1c4fd7af 100644
--- a/src/main/java/org/sdnplatform/sync/internal/rpc/RPCChannelHandler.java
+++ b/src/main/java/org/sdnplatform/sync/internal/rpc/RPCChannelHandler.java
@@ -62,6 +62,7 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         rpcService.getChannelGroup().add(ctx.channel());
+        super.channelActive(ctx);
     }
 
     @Override
@@ -69,6 +70,7 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
         if (remoteNode != null) {
             rpcService.disconnectNode(remoteNode.getNodeId());
         }
+        super.channelInactive(ctx);
     }
 
     // ******************************************
@@ -100,7 +102,7 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
         header.setTransactionId(getTransactionId());
         srm.setHeader(header);
         SyncMessage bsm = new SyncMessage(MessageType.FULL_SYNC_REQUEST);
-        channel.write(bsm);
+        channel.writeAndFlush(bsm);
 
         // XXX - TODO - if last connection was longer ago than the tombstone
         // timeout, then we need to do a complete flush and reload of our
@@ -133,9 +135,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
 
             SyncMessage bsm = new SyncMessage(MessageType.GET_RESPONSE);
             bsm.setGetResponse(m);
-            channel.write(bsm);
+            channel.writeAndFlush(bsm);
         } catch (Exception e) {
-            channel.write(getError(request.getHeader().getTransactionId(), e,
+            channel.writeAndFlush(getError(request.getHeader().getTransactionId(), e,
                                    MessageType.GET_REQUEST));
         }
     }
@@ -178,9 +180,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
 
             SyncMessage bsm = new SyncMessage(MessageType.PUT_RESPONSE);
             bsm.setPutResponse(m);
-            channel.write(bsm);
+            channel.writeAndFlush(bsm);
         } catch (Exception e) {
-            channel.write(getError(request.getHeader().getTransactionId(), e,
+            channel.writeAndFlush(getError(request.getHeader().getTransactionId(), e,
                                    MessageType.PUT_REQUEST));
         }
     }
@@ -217,9 +219,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
             SyncMessage bsm =
                     new SyncMessage(MessageType.DELETE_RESPONSE);
             bsm.setDeleteResponse(m);
-            channel.write(bsm);
+            channel.writeAndFlush(bsm);
         } catch (Exception e) {
-            channel.write(getError(request.getHeader().getTransactionId(), e,
+            channel.writeAndFlush(getError(request.getHeader().getTransactionId(), e,
                                    MessageType.DELETE_REQUEST));
         }
     }
@@ -259,9 +261,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
 
             updateCounter(SyncManager.counterReceivedValues,
                           request.getValuesSize());
-            channel.write(bsm);
+            channel.writeAndFlush(bsm);
         } catch (Exception e) {
-            channel.write(getError(request.getHeader().getTransactionId(), e,
+            channel.writeAndFlush(getError(request.getHeader().getTransactionId(), e,
                                    MessageType.SYNC_VALUE));
         }
     }
@@ -304,10 +306,10 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
                                           getRemoteNodeIdString(),
                                           srm.getKeysSize()});
             }
-            channel.write(bsm);
+            channel.writeAndFlush(bsm);
 
         } catch (Exception e) {
-            channel.write(getError(request.getHeader().getTransactionId(),
+            channel.writeAndFlush(getError(request.getHeader().getTransactionId(),
                                    e, MessageType.SYNC_OFFER));
         }
     }
@@ -346,7 +348,7 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
                                                          bsm));
             }
         } catch (Exception e) {
-            channel.write(getError(request.getHeader().getTransactionId(), e,
+            channel.writeAndFlush(getError(request.getHeader().getTransactionId(), e,
                                    MessageType.SYNC_REQUEST));
         }
     }
@@ -393,9 +395,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
             SyncMessage bsm =
                     new SyncMessage(MessageType.CURSOR_RESPONSE);
             bsm.setCursorResponse(m);
-            channel.write(bsm);
+            channel.writeAndFlush(bsm);
         } catch (Exception e) {
-            channel.write(getError(request.getHeader().getTransactionId(),
+            channel.writeAndFlush(getError(request.getHeader().getTransactionId(),
                                    e, MessageType.CURSOR_REQUEST));
         }
     }
@@ -417,9 +419,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
             SyncMessage bsm =
                     new SyncMessage(MessageType.REGISTER_RESPONSE);
             bsm.setRegisterResponse(m);
-            channel.write(bsm);
+            channel.writeAndFlush(bsm);
         } catch (Exception e) {
-            channel.write(getError(request.getHeader().getTransactionId(), e,
+            channel.writeAndFlush(getError(request.getHeader().getTransactionId(), e,
                                    MessageType.REGISTER_REQUEST));
         }
     }
@@ -502,9 +504,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
             SyncMessage bsm =
                     new SyncMessage(MessageType.CLUSTER_JOIN_RESPONSE);
             bsm.setClusterJoinResponse(cjrm);
-            channel.write(bsm);
+            channel.writeAndFlush(bsm);
         } catch (Exception e) {
-            channel.write(getError(request.getHeader().getTransactionId(), e,
+            channel.writeAndFlush(getError(request.getHeader().getTransactionId(), e,
                                    MessageType.CLUSTER_JOIN_REQUEST));
         }
     }
diff --git a/src/main/java/org/sdnplatform/sync/internal/rpc/RPCChannelInitializer.java b/src/main/java/org/sdnplatform/sync/internal/rpc/RPCChannelInitializer.java
index 57ef76b38..28b96de13 100644
--- a/src/main/java/org/sdnplatform/sync/internal/rpc/RPCChannelInitializer.java
+++ b/src/main/java/org/sdnplatform/sync/internal/rpc/RPCChannelInitializer.java
@@ -5,7 +5,6 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.handler.timeout.ReadTimeoutHandler;
-import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 
 import org.sdnplatform.sync.internal.SyncManager;
@@ -25,12 +24,12 @@ public class RPCChannelInitializer extends ChannelInitializer<Channel> {
     private static final int maxFrameSize = 512 * 1024;
     
     public RPCChannelInitializer(SyncManager syncManager,
-                              RPCService rpcService) {
+                              RPCService rpcService,
+                              Timer timer) {
         super();
         this.syncManager = syncManager;
         this.rpcService = rpcService;
-
-        this.timer = new HashedWheelTimer();
+        this.timer = timer;
     }
 
     @Override
diff --git a/src/main/java/org/sdnplatform/sync/internal/rpc/RPCService.java b/src/main/java/org/sdnplatform/sync/internal/rpc/RPCService.java
index b75a88078..e7f30d9cd 100644
--- a/src/main/java/org/sdnplatform/sync/internal/rpc/RPCService.java
+++ b/src/main/java/org/sdnplatform/sync/internal/rpc/RPCService.java
@@ -31,6 +31,7 @@ import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.Timer;
 import io.netty.util.concurrent.GlobalEventExecutor;
 
 import org.sdnplatform.sync.internal.SyncManager;
@@ -67,27 +68,22 @@ public class RPCService {
     /**
      * {@link EventLoopGroup} used for netty boss threads
      */
-    protected EventLoopGroup bossExecutor;
+    protected EventLoopGroup bossGroup;
     
     /**
      * {@link EventLoopGroup} used for netty worker threads
      */
-    protected EventLoopGroup workerExecutor;
+    protected EventLoopGroup workerGroup;
 
     /**
      * Netty {@link ClientBootstrap} used for creating client connections 
      */
     protected Bootstrap clientBootstrap;
-    
-    /**
-     * Netty {@link ServerBootstrap} used for creating server connections 
-     */
-    protected ServerBootstrap serverBootstrap;
 
     /**
      * {@link RPCChannelInitializer} for creating connections 
      */
-    protected RPCChannelInitializer pipelineFactory;
+    protected RPCChannelInitializer channelInitializer;
 
     /**
      * Node connections
@@ -126,6 +122,11 @@ public class RPCService {
      */
     protected SingletonTask reconnectTask;
     
+    /**
+     * Shared timer used for timeout tasks; handed to the {@link RPCChannelInitializer}
+     */
+    private final Timer timer;
+    
     /**
      * If we want to rate-limit certain types of messages, we can do
      * so by limiting the overall number of outstanding messages.  
@@ -166,10 +167,12 @@ public class RPCService {
     protected static final int MAX_PENDING_MESSAGES = 500;
 
     public RPCService(SyncManager syncManager, 
-                      IDebugCounterService debugCounter) {
+                      IDebugCounterService debugCounter,
+                      Timer timer) {
         super();
         this.syncManager = syncManager;
         this.debugCounter = debugCounter;
+        this.timer = timer;
 
         messageWindows = new ConcurrentHashMap<Short, MessageWindow>();
     }
@@ -209,13 +212,13 @@ public class RPCService {
             }
         };
         
-        bossExecutor = new NioEventLoopGroup(0, f2);
-        workerExecutor = new NioEventLoopGroup(0, f2);
+        bossGroup = new NioEventLoopGroup(0, f2);
+        workerGroup = new NioEventLoopGroup(0, f2);
 
-        pipelineFactory = new RPCChannelInitializer(syncManager, this);
+        channelInitializer = new RPCChannelInitializer(syncManager, this, timer);
 
-        startServer(pipelineFactory);
-        startClients(pipelineFactory);
+        startServer(channelInitializer);
+        startClients(channelInitializer);
     }
 
     /**
@@ -230,14 +233,13 @@ public class RPCService {
             }
 
             clientBootstrap = null;
-            serverBootstrap = null;
-            pipelineFactory = null;
-            if (bossExecutor != null)
-            	NettyUtils.shutdownAndWait("boss group", bossExecutor);
-            bossExecutor = null;
-            if (workerExecutor != null)
-            	NettyUtils.shutdownAndWait("worker group", workerExecutor);
-            workerExecutor = null;
+            channelInitializer = null;
+            if (bossGroup != null)
+            	NettyUtils.shutdownAndWait("Sync RPC Service boss group", bossGroup);
+            bossGroup = null;
+            if (workerGroup != null)
+            	NettyUtils.shutdownAndWait("Sync RPC Service worker group", workerGroup);
+            workerGroup = null;
         } catch (InterruptedException e) {
             logger.warn("Interrupted while shutting down RPC server");
         }
@@ -267,7 +269,7 @@ public class RPCService {
         NodeConnection nc = connections.get(nodeId);
         if (nc != null && nc.state == NodeConnectionState.CONNECTED) {
             waitForMessageWindow(bsm.getType(), nodeId, 0);
-            nc.nodeChannel.write(bsm);
+            nc.nodeChannel.writeAndFlush(bsm);
             return true;
         }
         return false;
@@ -423,7 +425,7 @@ public class RPCService {
      */
     protected void startServer(RPCChannelInitializer channelInitializer) {
         final ServerBootstrap bootstrap = new ServerBootstrap();
-        bootstrap.group(bossExecutor, workerExecutor)
+        bootstrap.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_REUSEADDR, true)
         .option(ChannelOption.SO_KEEPALIVE, true)
@@ -445,8 +447,6 @@ public class RPCService {
         ChannelFuture bindFuture = bootstrap.bind(sa);
         cg.add(bindFuture.channel());
         
-        serverBootstrap = bootstrap;
-
         logger.info("Listening for internal floodlight RPC on {}", sa);
     }
 
@@ -513,7 +513,7 @@ public class RPCService {
      */
     protected void startClients(RPCChannelInitializer channelInitializer) {
         final Bootstrap bootstrap = new Bootstrap();
-        bootstrap.group(workerExecutor)
+        bootstrap.group(workerGroup)
         .channel(NioSocketChannel.class)
         .option(ChannelOption.SO_REUSEADDR, true)
         .option(ChannelOption.SO_KEEPALIVE, true)
diff --git a/src/test/java/org/sdnplatform/sync/internal/BootstrapTest.java b/src/test/java/org/sdnplatform/sync/internal/BootstrapTest.java
index 4aeb57ac9..378333a6d 100644
--- a/src/test/java/org/sdnplatform/sync/internal/BootstrapTest.java
+++ b/src/test/java/org/sdnplatform/sync/internal/BootstrapTest.java
@@ -70,6 +70,10 @@ public class BootstrapTest {
                     new File(dbFolder.getRoot(), 
                              "server" + i).getAbsolutePath();
             fmc.addConfigParam(syncManager, "dbPath", dbPath);
+            /*fmc.addConfigParam(syncManager, "keystorePath", keyStorePath);
+            fmc.addConfigParam(syncManager, "keystorePassword", keyStorePassword);
+            fmc.addConfigParam(syncManager, "authScheme", dbPath);
+            fmc.addConfigParam(syncManager, "port", dbPath);*/
 
             tp.init(fmc);
             syncManager.init(fmc);
-- 
GitLab