Skip to content
Snippets Groups Projects
Commit d3699452 authored by Ryan Izard's avatar Ryan Izard
Browse files

Fixed a blunder in OFSwitchManager and Controller to allow switches to connect...

Fixed a blunder in OFSwitchManager and Controller to allow switches to connect if a server IP isn't explicitly given in floodlihgtdefault.properties. Still can't get sync's bootstrap test to pass. Feels like a race condition b/t update threads.
parent f9907670
No related branches found
No related tags found
No related merge requests found
...@@ -698,7 +698,7 @@ public class Controller implements IFloodlightProviderService, IStorageSourceLis ...@@ -698,7 +698,7 @@ public class Controller implements IFloodlightProviderService, IStorageSourceLis
log.info("Number of worker threads set to {}", this.workerThreads); log.info("Number of worker threads set to {}", this.workerThreads);
String addresses = configParams.get("openFlowAddresses"); String addresses = configParams.get("openFlowAddresses");
if (!Strings.isNullOrEmpty(ofPort)) { if (!Strings.isNullOrEmpty(addresses)) {
try { try {
openFlowAddresses = Collections.singleton(IPv4Address.of(addresses)); //TODO support list of addresses for multi-honed controllers openFlowAddresses = Collections.singleton(IPv4Address.of(addresses)); //TODO support list of addresses for multi-honed controllers
} catch (Exception e) { } catch (Exception e) {
...@@ -706,6 +706,8 @@ public class Controller implements IFloodlightProviderService, IStorageSourceLis ...@@ -706,6 +706,8 @@ public class Controller implements IFloodlightProviderService, IStorageSourceLis
throw new FloodlightModuleException("Invalid OpenFlow address of " + addresses + " in config"); throw new FloodlightModuleException("Invalid OpenFlow address of " + addresses + " in config");
} }
log.info("OpenFlow addresses set to {}", openFlowAddresses); log.info("OpenFlow addresses set to {}", openFlowAddresses);
} else {
openFlowAddresses.add(IPv4Address.NONE);
} }
} }
......
...@@ -1009,7 +1009,7 @@ public class OFSwitchManager implements IOFSwitchManager, INewOFConnectionListen ...@@ -1009,7 +1009,7 @@ public class OFSwitchManager implements IOFSwitchManager, INewOFConnectionListen
Set<InetSocketAddress> addrs = new HashSet<InetSocketAddress>(); Set<InetSocketAddress> addrs = new HashSet<InetSocketAddress>();
if (floodlightProvider.getOFAddresses().isEmpty()) { if (floodlightProvider.getOFAddresses().isEmpty()) {
cg.add(bootstrap.bind(addrs.iterator().next()).channel()); cg.add(bootstrap.bind(new InetSocketAddress(InetAddress.getByAddress(IPv4Address.NONE.getBytes()), floodlightProvider.getOFPort().getPort())).channel());
} else { } else {
for (IPv4Address ip : floodlightProvider.getOFAddresses()) { for (IPv4Address ip : floodlightProvider.getOFAddresses()) {
addrs.add(new InetSocketAddress(InetAddress.getByAddress(ip.getBytes()), floodlightProvider.getOFPort().getPort())); addrs.add(new InetSocketAddress(InetAddress.getByAddress(ip.getBytes()), floodlightProvider.getOFPort().getPort()));
......
...@@ -62,6 +62,7 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler { ...@@ -62,6 +62,7 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
@Override @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelHandlerContext ctx) throws Exception {
rpcService.getChannelGroup().add(ctx.channel()); rpcService.getChannelGroup().add(ctx.channel());
super.channelActive(ctx);
} }
@Override @Override
...@@ -69,6 +70,7 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler { ...@@ -69,6 +70,7 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
if (remoteNode != null) { if (remoteNode != null) {
rpcService.disconnectNode(remoteNode.getNodeId()); rpcService.disconnectNode(remoteNode.getNodeId());
} }
super.channelInactive(ctx);
} }
// ****************************************** // ******************************************
...@@ -100,7 +102,7 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler { ...@@ -100,7 +102,7 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
header.setTransactionId(getTransactionId()); header.setTransactionId(getTransactionId());
srm.setHeader(header); srm.setHeader(header);
SyncMessage bsm = new SyncMessage(MessageType.FULL_SYNC_REQUEST); SyncMessage bsm = new SyncMessage(MessageType.FULL_SYNC_REQUEST);
channel.write(bsm); channel.writeAndFlush(bsm);
// XXX - TODO - if last connection was longer ago than the tombstone // XXX - TODO - if last connection was longer ago than the tombstone
// timeout, then we need to do a complete flush and reload of our // timeout, then we need to do a complete flush and reload of our
...@@ -133,9 +135,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler { ...@@ -133,9 +135,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
SyncMessage bsm = new SyncMessage(MessageType.GET_RESPONSE); SyncMessage bsm = new SyncMessage(MessageType.GET_RESPONSE);
bsm.setGetResponse(m); bsm.setGetResponse(m);
channel.write(bsm); channel.writeAndFlush(bsm);
} catch (Exception e) { } catch (Exception e) {
channel.write(getError(request.getHeader().getTransactionId(), e, channel.writeAndFlush(getError(request.getHeader().getTransactionId(), e,
MessageType.GET_REQUEST)); MessageType.GET_REQUEST));
} }
} }
...@@ -178,9 +180,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler { ...@@ -178,9 +180,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
SyncMessage bsm = new SyncMessage(MessageType.PUT_RESPONSE); SyncMessage bsm = new SyncMessage(MessageType.PUT_RESPONSE);
bsm.setPutResponse(m); bsm.setPutResponse(m);
channel.write(bsm); channel.writeAndFlush(bsm);
} catch (Exception e) { } catch (Exception e) {
channel.write(getError(request.getHeader().getTransactionId(), e, channel.writeAndFlush(getError(request.getHeader().getTransactionId(), e,
MessageType.PUT_REQUEST)); MessageType.PUT_REQUEST));
} }
} }
...@@ -217,9 +219,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler { ...@@ -217,9 +219,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
SyncMessage bsm = SyncMessage bsm =
new SyncMessage(MessageType.DELETE_RESPONSE); new SyncMessage(MessageType.DELETE_RESPONSE);
bsm.setDeleteResponse(m); bsm.setDeleteResponse(m);
channel.write(bsm); channel.writeAndFlush(bsm);
} catch (Exception e) { } catch (Exception e) {
channel.write(getError(request.getHeader().getTransactionId(), e, channel.writeAndFlush(getError(request.getHeader().getTransactionId(), e,
MessageType.DELETE_REQUEST)); MessageType.DELETE_REQUEST));
} }
} }
...@@ -259,9 +261,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler { ...@@ -259,9 +261,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
updateCounter(SyncManager.counterReceivedValues, updateCounter(SyncManager.counterReceivedValues,
request.getValuesSize()); request.getValuesSize());
channel.write(bsm); channel.writeAndFlush(bsm);
} catch (Exception e) { } catch (Exception e) {
channel.write(getError(request.getHeader().getTransactionId(), e, channel.writeAndFlush(getError(request.getHeader().getTransactionId(), e,
MessageType.SYNC_VALUE)); MessageType.SYNC_VALUE));
} }
} }
...@@ -304,10 +306,10 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler { ...@@ -304,10 +306,10 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
getRemoteNodeIdString(), getRemoteNodeIdString(),
srm.getKeysSize()}); srm.getKeysSize()});
} }
channel.write(bsm); channel.writeAndFlush(bsm);
} catch (Exception e) { } catch (Exception e) {
channel.write(getError(request.getHeader().getTransactionId(), channel.writeAndFlush(getError(request.getHeader().getTransactionId(),
e, MessageType.SYNC_OFFER)); e, MessageType.SYNC_OFFER));
} }
} }
...@@ -346,7 +348,7 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler { ...@@ -346,7 +348,7 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
bsm)); bsm));
} }
} catch (Exception e) { } catch (Exception e) {
channel.write(getError(request.getHeader().getTransactionId(), e, channel.writeAndFlush(getError(request.getHeader().getTransactionId(), e,
MessageType.SYNC_REQUEST)); MessageType.SYNC_REQUEST));
} }
} }
...@@ -393,9 +395,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler { ...@@ -393,9 +395,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
SyncMessage bsm = SyncMessage bsm =
new SyncMessage(MessageType.CURSOR_RESPONSE); new SyncMessage(MessageType.CURSOR_RESPONSE);
bsm.setCursorResponse(m); bsm.setCursorResponse(m);
channel.write(bsm); channel.writeAndFlush(bsm);
} catch (Exception e) { } catch (Exception e) {
channel.write(getError(request.getHeader().getTransactionId(), channel.writeAndFlush(getError(request.getHeader().getTransactionId(),
e, MessageType.CURSOR_REQUEST)); e, MessageType.CURSOR_REQUEST));
} }
} }
...@@ -417,9 +419,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler { ...@@ -417,9 +419,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
SyncMessage bsm = SyncMessage bsm =
new SyncMessage(MessageType.REGISTER_RESPONSE); new SyncMessage(MessageType.REGISTER_RESPONSE);
bsm.setRegisterResponse(m); bsm.setRegisterResponse(m);
channel.write(bsm); channel.writeAndFlush(bsm);
} catch (Exception e) { } catch (Exception e) {
channel.write(getError(request.getHeader().getTransactionId(), e, channel.writeAndFlush(getError(request.getHeader().getTransactionId(), e,
MessageType.REGISTER_REQUEST)); MessageType.REGISTER_REQUEST));
} }
} }
...@@ -502,9 +504,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler { ...@@ -502,9 +504,9 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler {
SyncMessage bsm = SyncMessage bsm =
new SyncMessage(MessageType.CLUSTER_JOIN_RESPONSE); new SyncMessage(MessageType.CLUSTER_JOIN_RESPONSE);
bsm.setClusterJoinResponse(cjrm); bsm.setClusterJoinResponse(cjrm);
channel.write(bsm); channel.writeAndFlush(bsm);
} catch (Exception e) { } catch (Exception e) {
channel.write(getError(request.getHeader().getTransactionId(), e, channel.writeAndFlush(getError(request.getHeader().getTransactionId(), e,
MessageType.CLUSTER_JOIN_REQUEST)); MessageType.CLUSTER_JOIN_REQUEST));
} }
} }
......
...@@ -5,7 +5,6 @@ import io.netty.channel.ChannelInitializer; ...@@ -5,7 +5,6 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer; import io.netty.util.Timer;
import org.sdnplatform.sync.internal.SyncManager; import org.sdnplatform.sync.internal.SyncManager;
...@@ -25,12 +24,12 @@ public class RPCChannelInitializer extends ChannelInitializer<Channel> { ...@@ -25,12 +24,12 @@ public class RPCChannelInitializer extends ChannelInitializer<Channel> {
private static final int maxFrameSize = 512 * 1024; private static final int maxFrameSize = 512 * 1024;
public RPCChannelInitializer(SyncManager syncManager, public RPCChannelInitializer(SyncManager syncManager,
RPCService rpcService) { RPCService rpcService,
Timer timer) {
super(); super();
this.syncManager = syncManager; this.syncManager = syncManager;
this.rpcService = rpcService; this.rpcService = rpcService;
this.timer = timer;
this.timer = new HashedWheelTimer();
} }
@Override @Override
......
...@@ -31,6 +31,7 @@ import io.netty.channel.group.DefaultChannelGroup; ...@@ -31,6 +31,7 @@ import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.Timer;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
import org.sdnplatform.sync.internal.SyncManager; import org.sdnplatform.sync.internal.SyncManager;
...@@ -67,27 +68,22 @@ public class RPCService { ...@@ -67,27 +68,22 @@ public class RPCService {
/** /**
* {@link EventLoopGroup} used for netty boss threads * {@link EventLoopGroup} used for netty boss threads
*/ */
protected EventLoopGroup bossExecutor; protected EventLoopGroup bossGroup;
/** /**
* {@link EventLoopGroup} used for netty worker threads * {@link EventLoopGroup} used for netty worker threads
*/ */
protected EventLoopGroup workerExecutor; protected EventLoopGroup workerGroup;
/** /**
* Netty {@link ClientBootstrap} used for creating client connections * Netty {@link ClientBootstrap} used for creating client connections
*/ */
protected Bootstrap clientBootstrap; protected Bootstrap clientBootstrap;
/**
* Netty {@link ServerBootstrap} used for creating server connections
*/
protected ServerBootstrap serverBootstrap;
/** /**
* {@link RPCChannelInitializer} for creating connections * {@link RPCChannelInitializer} for creating connections
*/ */
protected RPCChannelInitializer pipelineFactory; protected RPCChannelInitializer channelInitializer;
/** /**
* Node connections * Node connections
...@@ -126,6 +122,11 @@ public class RPCService { ...@@ -126,6 +122,11 @@ public class RPCService {
*/ */
protected SingletonTask reconnectTask; protected SingletonTask reconnectTask;
/**
* Timer used for timeouts
*/
private final Timer timer;
/** /**
* If we want to rate-limit certain types of messages, we can do * If we want to rate-limit certain types of messages, we can do
* so by limiting the overall number of outstanding messages. * so by limiting the overall number of outstanding messages.
...@@ -166,10 +167,12 @@ public class RPCService { ...@@ -166,10 +167,12 @@ public class RPCService {
protected static final int MAX_PENDING_MESSAGES = 500; protected static final int MAX_PENDING_MESSAGES = 500;
public RPCService(SyncManager syncManager, public RPCService(SyncManager syncManager,
IDebugCounterService debugCounter) { IDebugCounterService debugCounter,
Timer timer) {
super(); super();
this.syncManager = syncManager; this.syncManager = syncManager;
this.debugCounter = debugCounter; this.debugCounter = debugCounter;
this.timer = timer;
messageWindows = new ConcurrentHashMap<Short, MessageWindow>(); messageWindows = new ConcurrentHashMap<Short, MessageWindow>();
} }
...@@ -209,13 +212,13 @@ public class RPCService { ...@@ -209,13 +212,13 @@ public class RPCService {
} }
}; };
bossExecutor = new NioEventLoopGroup(0, f2); bossGroup = new NioEventLoopGroup(0, f2);
workerExecutor = new NioEventLoopGroup(0, f2); workerGroup = new NioEventLoopGroup(0, f2);
pipelineFactory = new RPCChannelInitializer(syncManager, this); channelInitializer = new RPCChannelInitializer(syncManager, this, timer);
startServer(pipelineFactory); startServer(channelInitializer);
startClients(pipelineFactory); startClients(channelInitializer);
} }
/** /**
...@@ -230,14 +233,13 @@ public class RPCService { ...@@ -230,14 +233,13 @@ public class RPCService {
} }
clientBootstrap = null; clientBootstrap = null;
serverBootstrap = null; channelInitializer = null;
pipelineFactory = null; if (bossGroup != null)
if (bossExecutor != null) NettyUtils.shutdownAndWait("Sync RPC Service boss group", bossGroup);
NettyUtils.shutdownAndWait("boss group", bossExecutor); bossGroup = null;
bossExecutor = null; if (workerGroup != null)
if (workerExecutor != null) NettyUtils.shutdownAndWait("Sync RPC Service worker group", workerGroup);
NettyUtils.shutdownAndWait("worker group", workerExecutor); workerGroup = null;
workerExecutor = null;
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.warn("Interrupted while shutting down RPC server"); logger.warn("Interrupted while shutting down RPC server");
} }
...@@ -267,7 +269,7 @@ public class RPCService { ...@@ -267,7 +269,7 @@ public class RPCService {
NodeConnection nc = connections.get(nodeId); NodeConnection nc = connections.get(nodeId);
if (nc != null && nc.state == NodeConnectionState.CONNECTED) { if (nc != null && nc.state == NodeConnectionState.CONNECTED) {
waitForMessageWindow(bsm.getType(), nodeId, 0); waitForMessageWindow(bsm.getType(), nodeId, 0);
nc.nodeChannel.write(bsm); nc.nodeChannel.writeAndFlush(bsm);
return true; return true;
} }
return false; return false;
...@@ -423,7 +425,7 @@ public class RPCService { ...@@ -423,7 +425,7 @@ public class RPCService {
*/ */
protected void startServer(RPCChannelInitializer channelInitializer) { protected void startServer(RPCChannelInitializer channelInitializer) {
final ServerBootstrap bootstrap = new ServerBootstrap(); final ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossExecutor, workerExecutor) bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) .channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_KEEPALIVE, true)
...@@ -445,8 +447,6 @@ public class RPCService { ...@@ -445,8 +447,6 @@ public class RPCService {
ChannelFuture bindFuture = bootstrap.bind(sa); ChannelFuture bindFuture = bootstrap.bind(sa);
cg.add(bindFuture.channel()); cg.add(bindFuture.channel());
serverBootstrap = bootstrap;
logger.info("Listening for internal floodlight RPC on {}", sa); logger.info("Listening for internal floodlight RPC on {}", sa);
} }
...@@ -513,7 +513,7 @@ public class RPCService { ...@@ -513,7 +513,7 @@ public class RPCService {
*/ */
protected void startClients(RPCChannelInitializer channelInitializer) { protected void startClients(RPCChannelInitializer channelInitializer) {
final Bootstrap bootstrap = new Bootstrap(); final Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerExecutor) bootstrap.group(workerGroup)
.channel(NioSocketChannel.class) .channel(NioSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.SO_KEEPALIVE, true)
......
...@@ -70,6 +70,10 @@ public class BootstrapTest { ...@@ -70,6 +70,10 @@ public class BootstrapTest {
new File(dbFolder.getRoot(), new File(dbFolder.getRoot(),
"server" + i).getAbsolutePath(); "server" + i).getAbsolutePath();
fmc.addConfigParam(syncManager, "dbPath", dbPath); fmc.addConfigParam(syncManager, "dbPath", dbPath);
/*fmc.addConfigParam(syncManager, "keystorePath", keyStorePath);
fmc.addConfigParam(syncManager, "keystorePassword", keyStorePassword);
fmc.addConfigParam(syncManager, "authScheme", dbPath);
fmc.addConfigParam(syncManager, "port", dbPath);*/
tp.init(fmc); tp.init(fmc);
syncManager.init(fmc); syncManager.init(fmc);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment