diff --git a/src/main/java/net/floodlightcontroller/simpleft/FT.java b/src/main/java/net/floodlightcontroller/simpleft/FT.java index e0ce2768f8ae3133c073b0b952d7227e79cebf08..1dd74659cdddfc5c407df995d483ceebbde5dda8 100644 --- a/src/main/java/net/floodlightcontroller/simpleft/FT.java +++ b/src/main/java/net/floodlightcontroller/simpleft/FT.java @@ -7,6 +7,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -15,6 +16,8 @@ import net.floodlightcontroller.core.IFloodlightProviderService; import net.floodlightcontroller.core.IOFConnectionBackend; import net.floodlightcontroller.core.IOFMessageListener; import net.floodlightcontroller.core.IOFSwitch; +import net.floodlightcontroller.core.IOFSwitchListener; +import net.floodlightcontroller.core.PortChangeType; import net.floodlightcontroller.core.internal.FloodlightProvider; import net.floodlightcontroller.core.internal.IOFConnectionListener; import net.floodlightcontroller.core.internal.IOFSwitchManager; @@ -31,8 +34,13 @@ import net.floodlightcontroller.threadpool.IThreadPoolService; import net.floodlightcontroller.threadpool.ThreadPool; import net.floodlightcontroller.util.TimedCache; +import org.projectfloodlight.openflow.protocol.OFControllerRole; import org.projectfloodlight.openflow.protocol.OFMessage; +import org.projectfloodlight.openflow.protocol.OFPortDesc; +import org.projectfloodlight.openflow.protocol.OFRoleReply; import org.projectfloodlight.openflow.protocol.OFType; +import org.projectfloodlight.openflow.types.DatapathId; +import org.projectfloodlight.openflow.types.U64; import org.sdnplatform.sync.IStoreClient; import org.sdnplatform.sync.IStoreListener; import org.sdnplatform.sync.ISyncService; @@ -46,29 +54,32 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.util.concurrent.ListenableFuture; public class FT implements IOFMessageListener, IFloodlightModule, -IStoreListener<String> +IStoreListener<String>, +IOFSwitchListener { private ISyncService syncService; - private IStoreClient<String, Long> storeClient; + private IStoreClient<String, String> storeClient; //private IFloodlightProviderService floodlightProvider; protected static Logger logger = LoggerFactory.getLogger(FT.class); protected static IThreadPoolService threadPoolService; - protected static SingletonTask testTask; + protected static SingletonTask syncTestTask; + protected static SingletonTask heartBeatTask; + private final int HEARTBEAT = 3;//time to discover a crashed node - protected IOFSwitchManager sm; - + protected static IOFSwitchService switchService; + private static UtilDurable utilDurable; + private String controllerId; - //private HashMap<String, Long> cluster; - //private TimedCache<String> tc = new TimedCache<>(10, 20000); - + private HashMap<Short, Integer> cluster; @Override public String getName() { @@ -115,6 +126,7 @@ IStoreListener<String> l.add(IStorageSourceService.class); l.add(ISyncService.class); l.add(IThreadPoolService.class); + l.add(IOFSwitchService.class); return l; } @@ -125,11 +137,14 @@ IStoreListener<String> this.syncService = context.getServiceImpl(ISyncService.class); threadPoolService = context.getServiceImpl(IThreadPoolService.class); + switchService = context.getServiceImpl(IOFSwitchService.class); //floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class); - + utilDurable = new UtilDurable(); Map<String, String> configParams = context.getConfigParams(FloodlightProvider.class); controllerId = configParams.get("controllerId"); + cluster = new HashMap<>(); + } @@ -137,51 +152,87 @@ IStoreListener<String> public void startUp(FloodlightModuleContext context) throws FloodlightModuleException { - + /** + * ############################################################## + * FAULT TOLERANCE BEGIN + * ############################################################## + * + */ + Collection<Node> nodes = syncService.getClusterConfig().getNodes(); + Iterator<Node> it = nodes.iterator(); + while (it.hasNext()) { + Node node = it.next(); + if(!controllerId.equals(""+node.getNodeId())){ + cluster.put(node.getNodeId(), 0); + } + } + logger.debug("ClusterConfig: {}", cluster); + + ScheduledExecutorService ses = threadPoolService.getScheduledExecutor(); + heartBeatTask = new SingletonTask(ses, new Runnable() { + @Override + public void run() { + HashMap<Short, Integer> mapC = syncService.getConnections(); + Iterator<Short> it = cluster.keySet().iterator(); + while (it.hasNext()) { + Short nodeId = it.next(); + if(!mapC.containsKey(nodeId)){ + logger.debug("Crashed nodeId: {}. Role request to switches...", nodeId); + String swIds=null; + try { + swIds = storeClient.get(""+nodeId).getValue(); + logger.debug("Switches managed by nodeId:{}, {}", nodeId, swIds); + } catch (SyncException e) { + e.printStackTrace(); + } + + if(swIds!= null){ + String swId[] = swIds.split(","); + for (int i = 0; i < swId.length; i++) { + setSwitchRole(OFControllerRole.ROLE_MASTER, swId[i]); + } + } + } + } + heartBeatTask.reschedule(HEARTBEAT, TimeUnit.SECONDS); + } + }); + heartBeatTask.reschedule(10, TimeUnit.SECONDS); + + + + /** + * ############################################################## + * UPDATE SWITCHES + * ############################################################## + */ + try { - this.syncService.registerStore("FT", Scope.GLOBAL); + this.syncService.registerStore("FT_Switches", Scope.GLOBAL); this.storeClient = this.syncService - .getStoreClient("FT", + .getStoreClient("FT_Switches", String.class, - Long.class); - this.storeClient.put(controllerId, new Long(0L)); + String.class); this.storeClient.addStoreListener(this); } catch (SyncException e) { throw new FloodlightModuleException("Error while setting up sync service", e); } - - - /*String obj; - try { - obj = this.storeClient.getValue("FT").toString(); - logger.debug("Sync retrieve: {}", obj); - this.storeClient.put("FT", new String("INIT")); - } catch (SyncException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - }*/ - - ScheduledExecutorService ses = threadPoolService.getScheduledExecutor(); - testTask = new SingletonTask(ses, new Runnable() { - int counter=0; - Long tsOld=0L; + ses = threadPoolService.getScheduledExecutor(); + syncTestTask = new SingletonTask(ses, new Runnable() { @Override - public void run() { - Random r = new Random(); + public void run() { try { - //storeClient.put(controllerId, new String("vl:"+r.nextInt(1000))); - storeClient.put(controllerId, new Long(System.nanoTime())); + storeClient.put(controllerId, getActiveSwitches()); } catch (SyncException e) { // TODO Auto-generated catch block e.printStackTrace(); } - - - testTask.reschedule(4, TimeUnit.SECONDS); + syncTestTask.reschedule(6, TimeUnit.SECONDS); } }); - testTask.reschedule(3, TimeUnit.SECONDS); + syncTestTask.reschedule(6, TimeUnit.SECONDS); + } @@ -208,12 +259,9 @@ IStoreListener<String> type.name()} );*/ if(type.name().equals("REMOTE")){ - Long ts = storeClient.get(k).getValue(); - logger.debug("REMOTE: k:{}, ts:{}", k, ts); - //cluster.put(k, ts); + String swIds = storeClient.get(k).getValue(); + logger.debug("REMOTE: key:{}, Value:{}", k, swIds); } - - //logger.debug("localNodeID: {}",this.syncService.getLocalNodeId()); } catch (SyncException e) { // TODO Auto-generated catch block e.printStackTrace(); @@ -222,6 +270,88 @@ IStoreListener<String> } } + @Override + public void switchAdded(DatapathId switchId) { + // TODO Auto-generated method stub + try { + this.storeClient.put(controllerId, getActiveSwitches()); + } catch (SyncException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + @Override + public void switchRemoved(DatapathId switchId) { + // TODO Auto-generated method stub + try { + this.storeClient.put(controllerId, getActiveSwitches()); + } catch (SyncException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + @Override + public void switchActivated(DatapathId switchId) { + // TODO Auto-generated method stub + try { + this.storeClient.put(controllerId, getActiveSwitches()); + } catch (SyncException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + @Override + public void switchPortChanged(DatapathId switchId, OFPortDesc port, + PortChangeType type) { + // TODO Auto-generated method stub + + } + + @Override + public void switchChanged(DatapathId switchId) { + // TODO Auto-generated method stub + try { + this.storeClient.put(controllerId, getActiveSwitches()); + } catch (SyncException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + public String getActiveSwitches(){ + String activeSwitches = ""; + Iterator<DatapathId> itDpid = switchService.getAllSwitchDpids().iterator(); + while (itDpid.hasNext()) { + DatapathId dpid = itDpid.next(); + if(switchService.getActiveSwitch(dpid).isActive()){ + activeSwitches += dpid; + if(itDpid.hasNext()) + activeSwitches += ","; + } + } + return activeSwitches; + } + + public void setSwitchRole(OFControllerRole role, String swId){ + + IOFSwitch sw = switchService.getActiveSwitch(DatapathId.of(swId)); + OFRoleReply reply=null; + UtilDurable utilDurable = new UtilDurable(); + reply = utilDurable.setSwitchRole(sw, role); + + if(reply!=null){ + logger.info("DEFINED {} as {}, reply.getRole:{}!", + new Object[]{ + sw.getId(), + role, + reply.getRole()}); + } + else + logger.info("Reply NULL!"); + } + } diff --git a/src/main/java/org/sdnplatform/sync/ISyncService.java b/src/main/java/org/sdnplatform/sync/ISyncService.java index 48feead4a3c378e61147292203db3ccaa6435900..5b3680ca61f00fabe5a8972e45a7abb9112b20a9 100644 --- a/src/main/java/org/sdnplatform/sync/ISyncService.java +++ b/src/main/java/org/sdnplatform/sync/ISyncService.java @@ -1,7 +1,11 @@ package org.sdnplatform.sync; +import java.util.HashMap; + import org.sdnplatform.sync.error.SyncException; import org.sdnplatform.sync.error.UnknownStoreException; +import org.sdnplatform.sync.internal.config.ClusterConfig; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.core.type.TypeReference; @@ -133,4 +137,11 @@ public interface ISyncService extends IFloodlightService { IInconsistencyResolver<Versioned<V>> resolver) throws UnknownStoreException; + + + + public ClusterConfig getClusterConfig(); + + public HashMap<Short, Integer> getConnections(); + } diff --git a/src/main/java/org/sdnplatform/sync/internal/SyncManager.java b/src/main/java/org/sdnplatform/sync/internal/SyncManager.java index 540fa42b1c1155ee49869caf8403c937e4d83052..b0692fab2fbef9550a86d96fd5631f9814c55fc3 100644 --- a/src/main/java/org/sdnplatform/sync/internal/SyncManager.java +++ b/src/main/java/org/sdnplatform/sync/internal/SyncManager.java @@ -6,11 +6,14 @@ import io.netty.util.Timer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -812,4 +815,12 @@ public class SyncManager extends AbstractSyncManager { return bsm; } } + + @Override + public HashMap<Short, Integer> getConnections() { + // TODO Auto-generated method stub + return rpcService.getConnections(); + } + + } diff --git a/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncManager.java b/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncManager.java index 23b8e7bc24b93d37d565081c5da17252be97a95f..29f61db1e09afe68891709cc9de7c50156d9bbe3 100644 --- a/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncManager.java +++ b/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncManager.java @@ -3,6 +3,7 @@ package org.sdnplatform.sync.internal.remote; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Collection; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; @@ -28,6 +29,7 @@ import org.sdnplatform.sync.error.SyncRuntimeException; import org.sdnplatform.sync.error.UnknownStoreException; import org.sdnplatform.sync.internal.AbstractSyncManager; import org.sdnplatform.sync.internal.config.AuthScheme; +import org.sdnplatform.sync.internal.config.ClusterConfig; import org.sdnplatform.sync.internal.rpc.RPCService; import org.sdnplatform.sync.internal.rpc.TProtocolUtil; import org.sdnplatform.sync.internal.store.IStore; @@ -370,4 +372,17 @@ public class RemoteSyncManager extends AbstractSyncManager { throw new RemoteStoreException("Error while waiting for reply", e); } } + + + @Override + public ClusterConfig getClusterConfig() { + // TODO Auto-generated method stub + return null; + } + + @Override + public HashMap<Short, Integer> getConnections() { + // TODO Auto-generated method stub + return null; + } } diff --git a/src/main/java/org/sdnplatform/sync/internal/rpc/RPCService.java b/src/main/java/org/sdnplatform/sync/internal/rpc/RPCService.java index 962e355437fd946335f5f56c7a2c95977ca75670..2835e21355c81d9abe7f2defc623c59226a8e13b 100644 --- a/src/main/java/org/sdnplatform/sync/internal/rpc/RPCService.java +++ b/src/main/java/org/sdnplatform/sync/internal/rpc/RPCService.java @@ -4,6 +4,8 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.EnumSet; import java.util.HashMap; +import java.util.Iterator; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -674,4 +676,18 @@ public class RPCService { } } } + + + public HashMap<Short, Integer> getConnections(){ + HashMap<Short, Integer> r = new HashMap<>(); + + Iterator<Short> it = connections.keySet().iterator(); + while (it.hasNext()) { + Short nodeId = it.next(); + r.put(nodeId, 0);// 0 it it for heartbeat count + } + return r; + } + + } \ No newline at end of file diff --git a/src/main/resources/floodlightdefault.properties b/src/main/resources/floodlightdefault.properties index 75aa64ea183acf90ecb8ee7c21b1af66328d17ca..936cbea2b26f58d422e513a969d72dac55818703 100644 --- a/src/main/resources/floodlightdefault.properties +++ b/src/main/resources/floodlightdefault.properties @@ -45,7 +45,7 @@ net.floodlightcontroller.core.internal.OFSwitchManager.keyStorePath=/path/to.jec net.floodlightcontroller.core.internal.OFSwitchManager.keyStorePassword=PassFL net.floodlightcontroller.core.internal.OFSwitchManager.useSsl=NO net.floodlightcontroller.core.internal.OFSwitchManager.supportedOpenFlowVersions=1.0, 1.1, 1.2, 1.3, 1.4 -net.floodlightcontroller.core.internal.OFSwitchManager.switchesInitialState={"00:00:00:00:00:00:00:01":"ROLE_MASTER","00:00:00:00:00:00:00:02":"ROLE_MASTER", "00:00:00:00:00:00:00:03":"ROLE_MASTER", "00:00:00:00:00:00:00:04":"ROLE_SLAVE","00:00:00:00:00:00:00:05":"ROLE_SLAVE"} +net.floodlightcontroller.core.internal.OFSwitchManager.switchesInitialState={"00:00:00:00:00:00:00:01":"ROLE_MASTER","00:00:00:00:00:00:00:02":"ROLE_MASTER", "00:00:00:00:00:00:00:03":"ROLE_MASTER", "00:00:00:00:00:00:00:04":"ROLE_MASTER","00:00:00:00:00:00:00:05":"ROLE_MASTER"} net.floodlightcontroller.restserver.RestApiServer.keyStorePath=/path/to/file net.floodlightcontroller.restserver.RestApiServer.keyStorePassword=passs net.floodlightcontroller.restserver.RestApiServer.httpsNeedClientAuthentication=NO diff --git a/src/main/resources/floodlightdefault.properties2 b/src/main/resources/floodlightdefault.properties2 index d9a4aeb7d867f611f016144f842985e75fd02809..002deb1729658884c670f076a59e04b8d04e478a 100644 --- a/src/main/resources/floodlightdefault.properties2 +++ b/src/main/resources/floodlightdefault.properties2 @@ -43,6 +43,6 @@ net.floodlightcontroller.core.internal.OFSwitchManager.keyStorePath=/path/to/fil net.floodlightcontroller.core.internal.OFSwitchManager.keyStorePassword=pass net.floodlightcontroller.core.internal.OFSwitchManager.useSsl=NO net.floodlightcontroller.core.internal.OFSwitchManager.supportedOpenFlowVersions=1.0, 1.1, 1.2, 1.3, 1.4 -net.floodlightcontroller.core.internal.OFSwitchManager.switchesInitialState={"00:00:00:00:00:00:00:01":"ROLE_SLAVE","00:00:00:00:00:00:00:02":"ROLE_SLAVE", "00:00:00:00:00:00:00:03":"ROLE_SLAVE", "00:00:00:00:00:00:00:04":"ROLE_MASTER","00:00:00:00:00:00:00:05":"ROLE_MASTER"} +net.floodlightcontroller.core.internal.OFSwitchManager.switchesInitialState={"00:00:00:00:00:00:00:01":"ROLE_SLAVE","00:00:00:00:00:00:00:02":"ROLE_SLAVE", "00:00:00:00:00:00:00:03":"ROLE_SLAVE", "00:00:00:00:00:00:00:04":"ROLE_SLAVE","00:00:00:00:00:00:00:05":"ROLE_SLAVE"} net.floodlightcontroller.statistics.StatisticsCollector.enable=FALSE net.floodlightcontroller.statistics.StatisticsCollector.collectionIntervalPortStatsSeconds=10 \ No newline at end of file diff --git a/src/main/resources/logback-test.xml b/src/main/resources/logback-test.xml index eb13c37c3e3fb44a411003f962dddd9b0acd6c8b..d0ad9e999eab9315211473a9d464a4accb2d9d1a 100644 --- a/src/main/resources/logback-test.xml +++ b/src/main/resources/logback-test.xml @@ -19,6 +19,7 @@ <logger name="org.sdnplatform.sync.internal.SyncManager" level="TRACE"></logger> <logger name="org.sdnplatform.sync.internal.config.StorageCCProvider" level="TRACE"></logger> <logger name="org.sdnplatform.sync.internal.config.PropertyCCProvider" level="TRACE"></logger> + <logger name="org.sdnplatform.sync.internal.rpc.RPCService" level="INFO"></logger> <logger name="net.floodlightcontroller.devicemanager" level="INFO"></logger> <logger name="net.floodlightcontroller.linkdiscovery" level="INFO"></logger> <logger name="net.floodlightcontroller.forwarding" level="INFO"></logger> diff --git a/src/test/java/org/sdnplatform/sync/test/MockSyncService.java b/src/test/java/org/sdnplatform/sync/test/MockSyncService.java index e4f1c63261b897bc55875d5db0c8bdfb4a51c5e9..4b9fca81dfaf0785a331cc6293f274aeacd6528a 100644 --- a/src/test/java/org/sdnplatform/sync/test/MockSyncService.java +++ b/src/test/java/org/sdnplatform/sync/test/MockSyncService.java @@ -7,6 +7,7 @@ import org.sdnplatform.sync.ISyncService; import org.sdnplatform.sync.error.SyncException; import org.sdnplatform.sync.error.UnknownStoreException; import org.sdnplatform.sync.internal.AbstractSyncManager; +import org.sdnplatform.sync.internal.config.ClusterConfig; import org.sdnplatform.sync.internal.store.IStorageEngine; import org.sdnplatform.sync.internal.store.IStore; import org.sdnplatform.sync.internal.store.InMemoryStorageEngine; @@ -119,4 +120,16 @@ public class MockSyncService extends AbstractSyncManager { public void reset() { localStores = new HashMap<String, ListenerStorageEngine>(); } + + @Override + public ClusterConfig getClusterConfig() { + // TODO Auto-generated method stub + return null; + } + + @Override + public HashMap<Short, Integer> getConnections() { + // TODO Auto-generated method stub + return null; + } }