diff --git a/src/main/java/org/sdnplatform/sync/internal/SyncManager.java b/src/main/java/org/sdnplatform/sync/internal/SyncManager.java index 540fa42b1c1155ee49869caf8403c937e4d83052..a67700677cf840844fca78eed5c624e42aecb8e5 100644 --- a/src/main/java/org/sdnplatform/sync/internal/SyncManager.java +++ b/src/main/java/org/sdnplatform/sync/internal/SyncManager.java @@ -32,6 +32,7 @@ import org.sdnplatform.sync.error.UnknownStoreException; import org.sdnplatform.sync.internal.StoreRegistry.Hint; import org.sdnplatform.sync.internal.config.ClusterConfig; import org.sdnplatform.sync.internal.config.DelegatingCCProvider; +import org.sdnplatform.sync.internal.config.FTCCProvider; import org.sdnplatform.sync.internal.config.FallbackCCProvider; import org.sdnplatform.sync.internal.config.IClusterConfigProvider; import org.sdnplatform.sync.internal.config.Node; @@ -465,10 +466,16 @@ public class SyncManager extends AbstractSyncManager { storeRegistry = new StoreRegistry(this, config.get("dbPath")); String[] configProviders = - {PropertyCCProvider.class.getName(), - SyncStoreCCProvider.class.getName(), - StorageCCProvider.class.getName(), + { + /** + * Tulio Ribeiro + */ + FTCCProvider.class.getName(), + //PropertyCCProvider.class.getName(), + //SyncStoreCCProvider.class.getName(), + //StorageCCProvider.class.getName(), FallbackCCProvider.class.getName()}; + try { if (config.containsKey("persistenceEnabled")) { persistenceEnabled = @@ -476,12 +483,14 @@ public class SyncManager extends AbstractSyncManager { } if (config.containsKey("configProviders")) { configProviders = config.get("configProviders").split(","); + logger.info("configProviders: ",configProviders); } DelegatingCCProvider dprovider = new DelegatingCCProvider(); for (String configProvider : configProviders) { Class<?> cClass = Class.forName(configProvider); IClusterConfigProvider provider = (IClusterConfigProvider) cClass.newInstance(); + logger.info("Adding provider: {}",provider); dprovider.addProvider(provider); } dprovider.init(this, context); diff --git a/src/main/java/org/sdnplatform/sync/internal/config/ClusterConfig.java b/src/main/java/org/sdnplatform/sync/internal/config/ClusterConfig.java index 416f4ebaacddf5455a40e404ceefe251526a2617..563f2a1fd9b86e558d123ca40a61e787233c540a 100644 --- a/src/main/java/org/sdnplatform/sync/internal/config/ClusterConfig.java +++ b/src/main/java/org/sdnplatform/sync/internal/config/ClusterConfig.java @@ -9,6 +9,8 @@ import java.util.List; import org.sdnplatform.sync.error.SyncException; +import sun.util.logging.resources.logging; + /** * Represent the configuration of a cluster in the sync manager @@ -201,6 +203,7 @@ public class ClusterConfig { this.authScheme = AuthScheme.NO_AUTH; this.keyStorePath = keyStorePath; this.keyStorePassword = keyStorePassword; + } @Override diff --git a/src/main/java/org/sdnplatform/sync/internal/config/PropertyCCProvider.java b/src/main/java/org/sdnplatform/sync/internal/config/PropertyCCProvider.java index c78218c206be2ca0a7a54ac0971e828c407806c2..d88c156611e991614f4a1df25001526faf84aaf1 100644 --- a/src/main/java/org/sdnplatform/sync/internal/config/PropertyCCProvider.java +++ b/src/main/java/org/sdnplatform/sync/internal/config/PropertyCCProvider.java @@ -44,6 +44,9 @@ public class PropertyCCProvider implements IClusterConfigProvider { List<Node> nodes = mapper.readValue(config.get("nodes"), new TypeReference<List<Node>>() { }); + //nodes.add(new Node("192.168.1.131",6642,(short)1,(short)1)); + //nodes.add(new Node("192.168.1.131",6643,(short)2,(short)2)); + return new ClusterConfig(nodes, thisNodeId, authScheme, keyStorePath, diff --git a/src/main/java/org/sdnplatform/sync/internal/config/StorageCCProvider.java b/src/main/java/org/sdnplatform/sync/internal/config/StorageCCProvider.java index 7e22ad81e16ae712d8ee1783134a1b70958c12ea..ce6edc4db44402c62938dc25379adc3d5aa8e060 100644 --- a/src/main/java/org/sdnplatform/sync/internal/config/StorageCCProvider.java +++ b/src/main/java/org/sdnplatform/sync/internal/config/StorageCCProvider.java @@ -31,7 +31,8 @@ public class StorageCCProvider private IStorageSourceService storageSource; - HashMap<Short, Node> clusterNode; + private List<Node> clusterInitialNode; + String thisControllerID; AuthScheme authScheme; String keyStorePath; @@ -68,15 +69,10 @@ public class StorageCCProvider Map<String, String> config = context.getConfigParams(FloodlightProvider.class); - - thisControllerID = config.get("controllerId"); config = context.getConfigParams(SyncManager.class); - String clusterNodes = config.get("clusterNodes"); - clusterNode = jsonToNodeMap(clusterNodes, thisControllerID); - logger.info("Initial Cluster Nodes: {}",clusterNode); - logger.info("ControllerId at: {}", thisControllerID); + logger.info("ControllerId: {}", thisControllerID); keyStorePath = config.get("keyStorePath"); keyStorePassword = config.get("keyStorePassword"); @@ -84,6 +80,11 @@ public class StorageCCProvider try { authScheme = AuthScheme.valueOf(config.get("authScheme")); } catch (Exception e) {} + String clusterNodes = config.get("clusterNodes"); + clusterInitialNode = jsonToNodeMap(clusterNodes, config.get("nodeId")); + logger.info("Initial Cluster Node: {} {}",config.get("nodeId"),clusterInitialNode); + + } @Override @@ -149,11 +150,16 @@ public class StorageCCProvider if (res != null) res.close(); } + nodes.add(new Node("192.168.1.131", 6642, (short)1, (short)1)); + nodes.add(new Node("192.168.1.131", 6643, (short)2, (short)2)); + + if (nodes.size() == 0) throw new SyncException("No valid nodes found"); if (thisNodeId < 0) throw new SyncException("Could not find a node for the local node"); + logger.info("Nodes: {}",nodes); return new ClusterConfig(nodes, thisNodeId, authScheme, keyStorePath, keyStorePassword); } @@ -190,18 +196,19 @@ public class StorageCCProvider } } + /** - * Tulio Ribeiro - * @param String json + * @param String org.sdnplatform.sync.internal.SyncManager.clusterNodes foodlightdefault.properties. + * @param String controllerId * @return Map<String, Node> */ - private static HashMap<Short, Node> jsonToNodeMap(String json, String controllerId) { + private static List<Node> jsonToNodeMap(String json, String controllerId) { MappingJsonFactory f = new MappingJsonFactory(); JsonParser jp; - HashMap<Short, Node> retValue = new HashMap<Short, Node>(); + List<Node> nodes = new ArrayList<Node>(); if (json == null || json.isEmpty()) { - return retValue; + return nodes; } try { @@ -222,39 +229,40 @@ public class StorageCCProvider } String nodeId = jp.getCurrentName(); - + String host=null; String domainId = controllerId; String [] aux; int port; Node node=null; - + jp.nextToken(); if (jp.getText().equals("")) { continue; } host = jp.getValueAsString(); - + aux= host.split(":"); host = aux[0]; port = Integer.parseInt(aux[1]); try { - logger.info("Initialize node: {}:{} {} {}", + logger.debug("Creating node: {}:{} {} {}", new Object[]{host, port, nodeId, nodeId} - ); + ); node = new Node(host, port, Short.parseShort(nodeId), Short.parseShort(nodeId)); - retValue.put(Short.parseShort(nodeId), node); - //logger.info("Parsing JSON controllerId:{}, node:{}", controllerId, host+":"+port); + nodes.add(node); } catch(Exception e){ e.printStackTrace(); } - + } } catch (IOException e) { logger.error("Problem: {}", e); } - return retValue; + return nodes; } + + } diff --git a/src/main/java/org/sdnplatform/sync/internal/config/SyncStoreCCProvider.java b/src/main/java/org/sdnplatform/sync/internal/config/SyncStoreCCProvider.java index 6d436030c765e314a217828ae2da97d7b3d226b8..f1609c237c1cd9bf10da226aeb5ef858ebc597cc 100644 --- a/src/main/java/org/sdnplatform/sync/internal/config/SyncStoreCCProvider.java +++ b/src/main/java/org/sdnplatform/sync/internal/config/SyncStoreCCProvider.java @@ -1,5 +1,6 @@ package org.sdnplatform.sync.internal.config; +import java.io.IOException; import java.net.Inet4Address; import java.net.Inet6Address; import java.net.InetAddress; @@ -7,6 +8,7 @@ import java.net.NetworkInterface; import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -15,6 +17,7 @@ import java.util.concurrent.TimeUnit; import java.util.Random; import net.floodlightcontroller.core.module.FloodlightModuleContext; +import net.floodlightcontroller.core.module.IFloodlightModule; import net.floodlightcontroller.core.util.SingletonTask; import net.floodlightcontroller.threadpool.IThreadPoolService; @@ -30,6 +33,10 @@ import org.sdnplatform.sync.internal.config.bootstrap.BootstrapClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonToken; +import com.fasterxml.jackson.databind.MappingJsonFactory; import com.google.common.base.Joiner; import com.google.common.net.HostAndPort; @@ -39,369 +46,419 @@ import com.google.common.net.HostAndPort; * @author readams */ public class SyncStoreCCProvider - implements IClusterConfigProvider { - protected static final Logger logger = - LoggerFactory.getLogger(SyncStoreCCProvider.class); - - private SyncManager syncManager; - private IThreadPoolService threadPool; - - private SingletonTask bootstrapTask; - - private IStoreClient<Short, Node> nodeStoreClient; - private IStoreClient<String, String> unsyncStoreClient; - - private volatile AuthScheme authScheme; - private volatile String keyStorePath; - private volatile String keyStorePassword; - - private static final String PREFIX = - SyncManager.class.getCanonicalName(); - public static final String SYSTEM_NODE_STORE = - PREFIX + ".systemNodeStore"; - public static final String SYSTEM_UNSYNC_STORE = - PREFIX + ".systemUnsyncStore"; - - public static final String SEEDS = "seeds"; - public static final String LOCAL_NODE_ID = "localNodeId"; - public static final String LOCAL_NODE_IFACE = "localNodeIface"; - public static final String LOCAL_NODE_HOSTNAME = "localNodeHostname"; - public static final String LOCAL_NODE_PORT = "localNodePort"; - public static final String AUTH_SCHEME = "authScheme"; - public static final String KEY_STORE_PATH = "keyStorePath"; - public static final String KEY_STORE_PASSWORD = "keyStorePassword"; - - Map<String, String> config; - - // ********************** - // IClusterConfigProvider - // ********************** - - @Override - public void init(SyncManager syncManager, FloodlightModuleContext context) - throws SyncException { - this.syncManager = syncManager; - threadPool = context.getServiceImpl(IThreadPoolService.class); - syncManager.registerPersistentStore(SYSTEM_NODE_STORE, Scope.GLOBAL); - syncManager.registerPersistentStore(SYSTEM_UNSYNC_STORE, - Scope.UNSYNCHRONIZED); - this.nodeStoreClient = - syncManager.getStoreClient(SYSTEM_NODE_STORE, - Short.class, Node.class); - this.nodeStoreClient.addStoreListener(new ShortListener()); - this.unsyncStoreClient = - syncManager.getStoreClient(SYSTEM_UNSYNC_STORE, - String.class, String.class); - this.unsyncStoreClient.addStoreListener(new StringListener()); - - config = context.getConfigParams(syncManager); - } - - @Override - public ClusterConfig getConfig() throws SyncException { - if (bootstrapTask == null) - bootstrapTask = new SingletonTask(threadPool.getScheduledExecutor(), - new BootstrapTask()); - - keyStorePath = config.get("keyStorePath"); - keyStorePassword = config.get("keyStorePassword"); - try { - authScheme = AuthScheme.valueOf(config.get("authScheme")); - } catch (Exception e) { - authScheme = null; - } - - if (keyStorePath == null) - keyStorePath = unsyncStoreClient.getValue(KEY_STORE_PATH); - if (keyStorePassword == null) - keyStorePassword = - unsyncStoreClient.getValue(KEY_STORE_PASSWORD); - if (authScheme == null) { - try { - authScheme = - AuthScheme.valueOf(unsyncStoreClient. - getValue(AUTH_SCHEME)); - } catch (Exception e) { - authScheme = AuthScheme.NO_AUTH; - } - } - - Short localNodeId = getLocalNodeId(); - //Short localNodeId = 2; - - if (localNodeId == null) { - String seedStr = - unsyncStoreClient.getValue(SyncStoreCCProvider.SEEDS); - if (seedStr == null) { - throw new SyncException("No local node ID and no seeds"); - } - bootstrapTask.reschedule(0, TimeUnit.SECONDS); - throw new SyncException("Local node ID not yet configured"); - } - - IClosableIterator<Entry<Short, Versioned<Node>>> iter = - nodeStoreClient.entries(); - List<Node> nodes = new ArrayList<Node>(); - try { - while (iter.hasNext()) { - Entry<Short, Versioned<Node>> e = iter.next(); - if (e.getValue().getValue() != null) { - if (e.getValue().getValue().getNodeId() == localNodeId) - continue; - nodes.add(e.getValue().getValue()); - } - } - - Node oldLocalNode = null; - Node newLocalNode = null; - while (true) { - try { - Versioned<Node> v = - nodeStoreClient.get(Short.valueOf(localNodeId)); - oldLocalNode = v.getValue(); - if (oldLocalNode != null) { - newLocalNode = getLocalNode(oldLocalNode.getNodeId(), - oldLocalNode.getDomainId()); - v.setValue(newLocalNode); - } - break; - } catch (ObsoleteVersionException e) { } - } - if (newLocalNode == null) { - newLocalNode = getLocalNode(localNodeId, localNodeId); - } - nodes.add(newLocalNode); - - if (oldLocalNode == null || !oldLocalNode.equals(newLocalNode)) { - // If we have no local node or our hostname or port changes, - // we should trigger a new cluster join to ensure that the - // new value can propagate everywhere - bootstrapTask.reschedule(0, TimeUnit.SECONDS); - } - - ClusterConfig config = new ClusterConfig(nodes, localNodeId, - authScheme, - keyStorePath, - keyStorePassword); - updateSeeds(syncManager.getClusterConfig()); - return config; - } finally { - iter.close(); - } - } - - // ************* - // Local methods - // ************* - - private Short getLocalNodeId() throws SyncException { - String localNodeIdStr = unsyncStoreClient.getValue(LOCAL_NODE_ID); - if (localNodeIdStr == null) - return null; - - short localNodeId; - try { - localNodeId = Short.parseShort(localNodeIdStr); - } catch (NumberFormatException e) { - throw new SyncException("Failed to parse local node ID: " + - localNodeIdStr, e); - } - return localNodeId; - } - - private void updateSeeds(ClusterConfig config) throws SyncException { - List<String> hosts = new ArrayList<String>(); - for (Node n : config.getNodes()) { - if (!config.getNode().equals(n)) { - HostAndPort h = - HostAndPort.fromParts(n.getHostname(), n.getPort()); - hosts.add(h.toString()); - } - } - Collections.sort(hosts); - String seeds = Joiner.on(',').join(hosts); - while (true) { - try { - Versioned<String> sv = unsyncStoreClient.get(SEEDS); - if (sv.getValue() == null || !sv.getValue().equals(seeds)) { - if (logger.isDebugEnabled()) { - logger.debug("[{}] Updating seeds to \"{}\" from \"{}\"", - new Object[]{config.getNode().getNodeId(), - seeds, sv.getValue()}); - } - unsyncStoreClient.put(SEEDS, seeds); - } - break; - } catch (ObsoleteVersionException e) { } - } - } - - private Node getLocalNode(short nodeId, short domainId) - throws SyncException { - String hostname = unsyncStoreClient.getValue(LOCAL_NODE_HOSTNAME); - if (hostname == null) - hostname = getLocalHostname(); - - int port = 6642; - String portStr = unsyncStoreClient.getValue(LOCAL_NODE_PORT); - if (portStr != null) { - port = Integer.parseInt(portStr); - } - - return new Node(hostname, port, nodeId, domainId); - } - - private String getLocalHostname() throws SyncException { - String ifaceStr = unsyncStoreClient.getValue(LOCAL_NODE_IFACE); - - try { - Enumeration<NetworkInterface> ifaces = - NetworkInterface.getNetworkInterfaces(); - - InetAddress bestAddr = null; - for (NetworkInterface iface : Collections.list(ifaces)) { - try { - if (iface.isLoopback()) continue; - if (ifaceStr != null) { - if (!ifaceStr.equals(iface.getName())) - continue; - } - Enumeration<InetAddress> addrs = iface.getInetAddresses(); - for (InetAddress addr : Collections.list(addrs)) { - if (bestAddr == null || - (!addr.isLinkLocalAddress() && - bestAddr.isLinkLocalAddress()) || - (addr instanceof Inet6Address && - bestAddr instanceof Inet4Address)) { - bestAddr = addr; - } - } - } catch (Exception e) { - logger.debug("Failed to examine address", e); - } - } - if (bestAddr != null) - return bestAddr.getHostName(); - } catch (Exception e) { - throw new SyncException("Failed to find interface addresses", e); - } - throw new SyncException("No usable interface addresses found"); - } - - protected class BootstrapTask implements Runnable { - @Override - public void run() { - Short localNodeId = null; - try { - Node localNode = null; - - localNodeId = getLocalNodeId(); - if (localNodeId != null) - localNode = nodeStoreClient.getValue(localNodeId); - - String seedStr = - unsyncStoreClient.getValue(SyncStoreCCProvider.SEEDS); - if (seedStr == null) return; - - logger.debug("[{}] Attempting to bootstrap cluster", - localNodeId); - - if (seedStr.equals("")) { - localNode = setupLocalNode(localNode, localNodeId, true); - if (logger.isDebugEnabled()) { - logger.debug("[{}] First node configuration: {}", - localNode.getNodeId(), localNode); - } - - while (true) { - try { - nodeStoreClient.put(localNode.getNodeId(), - localNode); - break; - } catch (ObsoleteVersionException e) {} - } - - while (true) { - try { - unsyncStoreClient.put(LOCAL_NODE_ID, - Short.toString(localNode. - getNodeId())); - break; - } catch (ObsoleteVersionException e) {} - } - if (logger.isDebugEnabled()) { - logger.debug("[{}] Successfully bootstrapped", - localNode.getNodeId()); - } - } else { - localNode = setupLocalNode(localNode, localNodeId, false); - if (logger.isDebugEnabled()) { - logger.debug("[{}] Adding new node from seeds {}: {}", - new Object[]{localNodeId, seedStr, - localNode}); - } - - String[] seeds = seedStr.split(","); - ArrayList<HostAndPort> hosts = new ArrayList<HostAndPort>(); - for (String s : seeds) { - hosts.add(HostAndPort.fromString(s). - withDefaultPort(6642)); - } - BootstrapClient bs = new BootstrapClient(syncManager, - authScheme, - keyStorePath, - keyStorePassword); - bs.init(); - try { - for (HostAndPort host : hosts) { - if (bs.bootstrap(host, localNode)) - break; - } - } finally { - bs.shutdown(); - } - if (logger.isDebugEnabled()) { - logger.debug("[{}] Successfully bootstrapped", - unsyncStoreClient.getValue(LOCAL_NODE_ID)); - } - } - syncManager.updateConfiguration(); - } catch (Exception e) { - logger.error("[" + localNodeId + - "] Failed to bootstrap cluster", e); - } - } - - private Node setupLocalNode(Node localNode, Short localNodeId, - boolean firstNode) - throws SyncException { - short nodeId = -1; - short domainId = -1; - if (localNode != null) { - nodeId = localNode.getNodeId(); - domainId = localNode.getDomainId(); - } else if (localNodeId != null) { - domainId = nodeId = localNodeId; - } else if (firstNode) { - domainId = nodeId = - (short)(new Random().nextInt(Short.MAX_VALUE)); - } - Node n = getLocalNode(nodeId, domainId); - return n; - } - } - - protected class ShortListener implements IStoreListener<Short> { - @Override - public void keysModified(Iterator<Short> keys, UpdateType type) { - syncManager.updateConfiguration(); - } - } - - protected class StringListener implements IStoreListener<String> { - @Override - public void keysModified(Iterator<String> keys, UpdateType type) { - syncManager.updateConfiguration(); - } - } +implements IClusterConfigProvider { + protected static final Logger logger = + LoggerFactory.getLogger(SyncStoreCCProvider.class); + + private SyncManager syncManager; + private IThreadPoolService threadPool; + + private List<Node> clusterInitialNode; + + private SingletonTask bootstrapTask; + + private IStoreClient<Short, Node> nodeStoreClient; + private IStoreClient<String, String> unsyncStoreClient; + + private volatile AuthScheme authScheme; + private volatile String keyStorePath; + private volatile String keyStorePassword; + + private static final String PREFIX = + SyncManager.class.getCanonicalName(); + public static final String SYSTEM_NODE_STORE = + PREFIX + ".systemNodeStore"; + public static final String SYSTEM_UNSYNC_STORE = + PREFIX + ".systemUnsyncStore"; + + public static final String SEEDS = "seeds"; + public static final String LOCAL_NODE_ID = "localNodeId"; + public static final String LOCAL_NODE_IFACE = "localNodeIface"; + public static final String LOCAL_NODE_HOSTNAME = "localNodeHostname"; + public static final String LOCAL_NODE_PORT = "localNodePort"; + public static final String AUTH_SCHEME = "authScheme"; + public static final String KEY_STORE_PATH = "keyStorePath"; + public static final String KEY_STORE_PASSWORD = "keyStorePassword"; + + Map<String, String> config; + + // ********************** + // IClusterConfigProvider + // ********************** + + @Override + public void init(SyncManager syncManager, FloodlightModuleContext context) + throws SyncException { + + this.syncManager = syncManager; + + threadPool = context.getServiceImpl(IThreadPoolService.class); + syncManager.registerPersistentStore(SYSTEM_NODE_STORE, Scope.GLOBAL); + syncManager.registerPersistentStore(SYSTEM_UNSYNC_STORE, + Scope.UNSYNCHRONIZED); + this.nodeStoreClient = + syncManager.getStoreClient(SYSTEM_NODE_STORE, + Short.class, Node.class); + this.nodeStoreClient.addStoreListener(new ShortListener()); + this.unsyncStoreClient = + syncManager.getStoreClient(SYSTEM_UNSYNC_STORE, + String.class, String.class); + this.unsyncStoreClient.addStoreListener(new StringListener()); + + config = context.getConfigParams(syncManager); + logger.info("init config: ", config); + } + + @Override + public ClusterConfig getConfig() throws SyncException { + if (bootstrapTask == null) + bootstrapTask = new SingletonTask(threadPool.getScheduledExecutor(), + new BootstrapTask()); + + keyStorePath = config.get("keyStorePath"); + keyStorePassword = config.get("keyStorePassword"); + try { + authScheme = AuthScheme.valueOf(config.get("authScheme")); + } catch (Exception e) { + authScheme = null; + } + + if (keyStorePath == null) + keyStorePath = unsyncStoreClient.getValue(KEY_STORE_PATH); + if (keyStorePassword == null) + keyStorePassword = + unsyncStoreClient.getValue(KEY_STORE_PASSWORD); + if (authScheme == null) { + try { + authScheme = + AuthScheme.valueOf(unsyncStoreClient. + getValue(AUTH_SCHEME)); + } catch (Exception e) { + authScheme = AuthScheme.NO_AUTH; + } + } + + + Short localNodeId = getLocalNodeId(); + //Short localNodeId = Short.parseShort(config.get("nodeId").toString()); + + if (localNodeId == null) { + String seedStr = + unsyncStoreClient.getValue(SyncStoreCCProvider.SEEDS); + if (seedStr == null) { + logger.debug("localNodeId: {}",localNodeId); + throw new SyncException("No local node ID and no seeds"); + } + bootstrapTask.reschedule(0, TimeUnit.SECONDS); + throw new SyncException("Local node ID not yet configured"); + } + logger.info("localNodeId: {}",localNodeId); + ClusterConfig clusterConfig=null; + try { + clusterConfig = new ClusterConfig(clusterInitialNode, + Short.parseShort(config.get("nodeId").trim()), + authScheme, + keyStorePath, + keyStorePassword); + + } catch (Exception e) { + // TODO: handle exception + e.printStackTrace(); + //logger.debug("Algo ruim aconteceu: {}", e.toString()); + } + + + logger.info("config: {}", clusterConfig); + + updateSeeds(syncManager.getClusterConfig()); + + if(!clusterConfig.getNodes().isEmpty()) + return clusterConfig; + + + + + + IClosableIterator<Entry<Short, Versioned<Node>>> iter = + nodeStoreClient.entries(); + List<Node> nodes = new ArrayList<Node>(); + try { + while (iter.hasNext()) { + Entry<Short, Versioned<Node>> e = iter.next(); + if (e.getValue().getValue() != null) { + if (e.getValue().getValue().getNodeId() == localNodeId) + continue; + nodes.add(e.getValue().getValue()); + } + } + + Node oldLocalNode = null; + Node newLocalNode = null; + while (true) { + try { + Versioned<Node> v = + nodeStoreClient.get(Short.valueOf(localNodeId)); + oldLocalNode = v.getValue(); + if (oldLocalNode != null) { + newLocalNode = getLocalNode(oldLocalNode.getNodeId(), + oldLocalNode.getDomainId()); + v.setValue(newLocalNode); + } + break; + } catch (ObsoleteVersionException e) { } + } + if (newLocalNode == null) { + newLocalNode = getLocalNode(localNodeId, localNodeId); + } + nodes.add(newLocalNode); + + if (oldLocalNode == null || !oldLocalNode.equals(newLocalNode)) { + // If we have no local node or our hostname or port changes, + // we should trigger a new cluster join to ensure that the + // new value can propagate everywhere + bootstrapTask.reschedule(0, TimeUnit.SECONDS); + } + + // ClusterConfig config = new ClusterConfig(nodes, localNodeId, + // authScheme, + // keyStorePath, + // keyStorePassword); + //overwrite + + Iterator<Node> n = clusterInitialNode.iterator(); + while(n.hasNext()){ + Node a=n.next(); + nodeStoreClient.put(a.getNodeId(), a); + logger.info("adding node: {}",a); + } + ClusterConfig config = new ClusterConfig(clusterInitialNode, + localNodeId, + authScheme, + keyStorePath, + keyStorePassword); + + updateSeeds(syncManager.getClusterConfig()); + logger.info("config: {}", config); + return config; + } finally { + iter.close(); + } + } + + // ************* + // Local methods + // ************* + + private Short getLocalNodeId() throws SyncException { + String localNodeIdStr = unsyncStoreClient.getValue(LOCAL_NODE_ID); + if (localNodeIdStr == null) + return null; + + short localNodeId; + try { + localNodeId = Short.parseShort(localNodeIdStr); + } catch (NumberFormatException e) { + throw new SyncException("Failed to parse local node ID: " + + localNodeIdStr, e); + } + return localNodeId; + } + + private void updateSeeds(ClusterConfig config) throws SyncException { + List<String> hosts = new ArrayList<String>(); + for (Node n : config.getNodes()) { + if (!config.getNode().equals(n)) { + HostAndPort h = + HostAndPort.fromParts(n.getHostname(), n.getPort()); + hosts.add(h.toString()); + } + } + Collections.sort(hosts); + String seeds = Joiner.on(',').join(hosts); + while (true) { + try { + Versioned<String> sv = unsyncStoreClient.get(SEEDS); + if (sv.getValue() == null || !sv.getValue().equals(seeds)) { + if (logger.isDebugEnabled()) { + logger.debug("[{}] Updating seeds to \"{}\" from \"{}\"", + new Object[]{config.getNode().getNodeId(), + seeds, sv.getValue()}); + } + unsyncStoreClient.put(SEEDS, seeds); + } + break; + } catch (ObsoleteVersionException e) { } + } + } + + private Node getLocalNode(short nodeId, short domainId) + throws SyncException { + String hostname = unsyncStoreClient.getValue(LOCAL_NODE_HOSTNAME); + if (hostname == null) + hostname = getLocalHostname(); + + int port = 6642; + String portStr = unsyncStoreClient.getValue(LOCAL_NODE_PORT); + if (portStr != null) { + port = Integer.parseInt(portStr); + } + + return new Node(hostname, port, nodeId, domainId); + } + + private String getLocalHostname() throws SyncException { + String ifaceStr = unsyncStoreClient.getValue(LOCAL_NODE_IFACE); + + try { + Enumeration<NetworkInterface> ifaces = + NetworkInterface.getNetworkInterfaces(); + + InetAddress bestAddr = null; + for (NetworkInterface iface : Collections.list(ifaces)) { + try { + if (iface.isLoopback()) continue; + if (ifaceStr != null) { + if (!ifaceStr.equals(iface.getName())) + continue; + } + Enumeration<InetAddress> addrs = iface.getInetAddresses(); + for (InetAddress addr : Collections.list(addrs)) { + if (bestAddr == null || + (!addr.isLinkLocalAddress() && + bestAddr.isLinkLocalAddress()) || + (addr instanceof Inet6Address && + bestAddr instanceof Inet4Address)) { + bestAddr = addr; + } + } + } catch (Exception e) { + logger.debug("Failed to examine address", e); + } + } + if (bestAddr != null) + return bestAddr.getHostName(); + } catch (Exception e) { + throw new SyncException("Failed to find interface addresses", e); + } + throw new SyncException("No usable interface addresses found"); + } + + protected class BootstrapTask implements Runnable { + @Override + public void run() { + Short localNodeId = null; + try { + Node localNode = null; + + localNodeId = getLocalNodeId(); + if (localNodeId != null) + localNode = nodeStoreClient.getValue(localNodeId); + + String seedStr = + unsyncStoreClient.getValue(SyncStoreCCProvider.SEEDS); + if (seedStr == null) return; + + logger.debug("[{}] Attempting to bootstrap cluster", + localNodeId); + + if (seedStr.equals("")) { + localNode = setupLocalNode(localNode, localNodeId, true); + if (logger.isDebugEnabled()) { + logger.debug("[{}] First node configuration: {}", + localNode.getNodeId(), localNode); + } + + while (true) { + try { + nodeStoreClient.put(localNode.getNodeId(), + localNode); + break; + } catch (ObsoleteVersionException e) {} + } + + while (true) { + try { + unsyncStoreClient.put(LOCAL_NODE_ID, + Short.toString(localNode. + getNodeId())); + break; + } catch (ObsoleteVersionException e) {} + } + if (logger.isDebugEnabled()) { + logger.debug("[{}] Successfully bootstrapped", + localNode.getNodeId()); + } + } else { + localNode = setupLocalNode(localNode, localNodeId, false); + if (logger.isDebugEnabled()) { + logger.debug("[{}] Adding new node from seeds {}: {}", + new Object[]{localNodeId, seedStr, + localNode}); + } + + String[] seeds = seedStr.split(","); + ArrayList<HostAndPort> hosts = new ArrayList<HostAndPort>(); + for (String s : seeds) { + hosts.add(HostAndPort.fromString(s). + withDefaultPort(6642)); + } + BootstrapClient bs = new BootstrapClient(syncManager, + authScheme, + keyStorePath, + keyStorePassword); + bs.init(); + try { + for (HostAndPort host : hosts) { + if (bs.bootstrap(host, localNode)) + break; + } + } finally { + bs.shutdown(); + } + if (logger.isDebugEnabled()) { + logger.debug("[{}] Successfully bootstrapped", + unsyncStoreClient.getValue(LOCAL_NODE_ID)); + } + } + syncManager.updateConfiguration(); + } catch (Exception e) { + logger.error("[" + localNodeId + + "] Failed to bootstrap cluster", e); + } + } + + private Node setupLocalNode(Node localNode, Short localNodeId, + boolean firstNode) + throws SyncException { + short nodeId = -1; + short domainId = -1; + if (localNode != null) { + nodeId = localNode.getNodeId(); + domainId = localNode.getDomainId(); + } else if (localNodeId != null) { + domainId = nodeId = localNodeId; + } else if (firstNode) { + domainId = nodeId = + (short)(new Random().nextInt(Short.MAX_VALUE)); + } + Node n = getLocalNode(nodeId, domainId); + return n; + } + } + + protected class ShortListener implements IStoreListener<Short> { + @Override + public void keysModified(Iterator<Short> keys, UpdateType type) { + syncManager.updateConfiguration(); + } + } + + protected class StringListener implements IStoreListener<String> { + @Override + public void keysModified(Iterator<String> keys, UpdateType type) { + syncManager.updateConfiguration(); + } + } + + } diff --git a/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncManager.java b/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncManager.java index 23b8e7bc24b93d37d565081c5da17252be97a95f..4d3958f318e769f4015ef345a04217d8e1df9618 100644 --- a/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncManager.java +++ b/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncManager.java @@ -88,7 +88,7 @@ public class RemoteSyncManager extends AbstractSyncManager { /** * The hostname of the server to connect to */ - protected String hostname = "localhost"; + protected String hostname = "192.168.1.131"; /** * Port to connect to diff --git a/src/main/resources/floodlightdefault.properties b/src/main/resources/floodlightdefault.properties index 3b130ba1eccfe555979866af9287b92806be2b9f..aafa1560cf1c9237c9436d080114c5a0bc2e562b 100644 --- a/src/main/resources/floodlightdefault.properties +++ b/src/main/resources/floodlightdefault.properties @@ -22,6 +22,7 @@ org.sdnplatform.sync.internal.SyncManager.authScheme=CHALLENGE_RESPONSE org.sdnplatform.sync.internal.SyncManager.keyStorePath=/etc/floodlight/mykey.jceks org.sdnplatform.sync.internal.SyncManager.dbPath=/var/lib/floodlight/ org.sdnplatform.sync.internal.SyncManager.port=6642 +org.sdnplatform.sync.internal.SyncManager.nodeId=1 org.sdnplatform.sync.internal.SyncManager.clusterNodes={"1":"192.168.1.131:6642","2":"192.168.1.131:6643"} net.floodlightcontroller.forwarding.Forwarding.match=vlan, mac, ip, transport net.floodlightcontroller.forwarding.Forwarding.flood-arp=NO diff --git a/src/main/resources/floodlightdefault.properties2 b/src/main/resources/floodlightdefault.properties2 index c53b05f56e3ccbaba89f99b62625d22370e770a6..bf5efadfbf6d9e30c9b48691845fe46f4284772a 100644 --- a/src/main/resources/floodlightdefault.properties2 +++ b/src/main/resources/floodlightdefault.properties2 @@ -19,13 +19,15 @@ net.floodlightcontroller.statistics.StatisticsCollector org.sdnplatform.sync.internal.SyncManager.authScheme=CHALLENGE_RESPONSE org.sdnplatform.sync.internal.SyncManager.keyStorePath=/etc/floodlight/auth_credentials.jceks org.sdnplatform.sync.internal.SyncManager.dbPath=/var/lib/floodlight2/ -org.sdnplatform.sync.internal.SyncManager.port=6642 +org.sdnplatform.sync.internal.SyncManager.port=6643 +org.sdnplatform.sync.internal.SyncManager.nodeId=2 +org.sdnplatform.sync.internal.SyncManager.clusterNodes={"1":"192.168.1.131:6642","2":"192.168.1.131:6643"} net.floodlightcontroller.forwarding.Forwarding.match=vlan, mac, ip, transport net.floodlightcontroller.forwarding.Forwarding.flood-arp=NO net.floodlightcontroller.core.internal.FloodlightProvider.openFlowPort=7753 net.floodlightcontroller.core.internal.FloodlightProvider.role=ACTIVE net.floodlightcontroller.core.internal.FloodlightProvider.workerThreads=8 -net.floodlightcontroller.core.internal.FloodlightProvider.controllerId=C2 +net.floodlightcontroller.core.internal.FloodlightProvider.controllerId=2 net.floodlightcontroller.linkdiscovery.internal.LinkDiscoveryManager.latency-history-size=10 net.floodlightcontroller.linkdiscovery.internal.LinkDiscoveryManager.latency-update-threshold=0.5 net.floodlightcontroller.core.internal.OFSwitchManager.defaultMaxTablesToReceiveTableMissFlow=1 diff --git a/src/main/resources/logback-test.xml b/src/main/resources/logback-test.xml index 95055e0b5febc8adb207ee8c90da7b3523380489..85249ba766703f97e2f165ec97d126525d143dc1 100644 --- a/src/main/resources/logback-test.xml +++ b/src/main/resources/logback-test.xml @@ -15,12 +15,13 @@ <logger name="LogService" level="DEBUG"></logger> <!-- Restlet access logging --> <logger name="net.floodlightcontroller" level="INFO"/> <logger name="org.sdnplatform" level="INFO"></logger> + <logger name="org.sdnplatform.sync.internal.config.SyncStoreCCProvider" level="TRACE"></logger> <logger name="net.floodlightcontroller.devicemanager" level="INFO"></logger> <logger name="net.floodlightcontroller.linkdiscovery" level="INFO"></logger> <logger name="net.floodlightcontroller.forwarding" level="INFO"></logger> <logger name="net.floodlightcontroller.simpleft.FT" level="TRACE"></logger> <logger name="net.floodlightcontroller.core" level="INFO"></logger> - <logger name="net.floodlightcontroller.topology" level="TRACE" ></logger> + <logger name="net.floodlightcontroller.topology" level="INFO" ></logger> <logger name="org.projectfloodlight.openflow" level="INFO" ></logger> <logger name="net.floodlightcontroller.core.internal.OFSwitchManager" level="INFO"></logger> <logger name="net.floodlightcontroller.core.internal.OFSwitchHandshakeHandler" level="INFO"></logger>