diff --git a/src/main/java/net/floodlightcontroller/core/util/NettyUtils.java b/src/main/java/net/floodlightcontroller/core/util/NettyUtils.java index 88e5a1b9e927f6fbb7c730c3427f6f08ad265c99..e45188df50cb2325124e6eeed9b7bc34c7cd7bcc 100644 --- a/src/main/java/net/floodlightcontroller/core/util/NettyUtils.java +++ b/src/main/java/net/floodlightcontroller/core/util/NettyUtils.java @@ -10,7 +10,7 @@ import org.slf4j.LoggerFactory; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.Future; -/** Collection of static utilitiy functions for netty. +/** Collection of static utility functions for netty. * * @author Andreas Wundsam <andreas.wundsam@bigswitch.com> */ diff --git a/src/main/java/net/floodlightcontroller/core/util/ThriftFrameDecoder.java b/src/main/java/net/floodlightcontroller/core/util/ThriftFrameDecoder.java new file mode 100644 index 0000000000000000000000000000000000000000..1dd2d9297c3243fc3d91d09702a99fe6c6796172 --- /dev/null +++ b/src/main/java/net/floodlightcontroller/core/util/ThriftFrameDecoder.java @@ -0,0 +1,64 @@ +package net.floodlightcontroller.core.util; + +import java.util.ArrayList; +import java.util.List; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; + +import org.apache.thrift.TBase; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.transport.TIOStreamTransport; + +/** + * Slice and decode Thrift Serialized Messages from the channel, push a {@link List} of Messages upstream. + * + * Each message is preceeded by a four-byte length field. + * + * Subclasses should implement {@link #allocateMessage()} to allocate an instance of the + * desired message type. + * + * Implementation note: this decoder class was initially built for netty3, and is unnecessarily + * complex and inelegant. In particular, netty has efficient support for handling lists of messages + * now, so this could be replaced by a plain {@link LengthFieldBasedFrameDecoder} to do the slicing + * and then a simple ThriftDecoder that would just decode one message at a time. + * + * @author readams + * @author Andreas Wundsam <andreas.wundsam@bigswitch.com> + */ +public abstract class ThriftFrameDecoder<T extends TBase<?,?>> extends LengthFieldBasedFrameDecoder { + public ThriftFrameDecoder(int maxSize) { + super(maxSize, 0, 4, 0, 4); + } + + protected abstract T allocateMessage(); + + @Override + protected final Object decode(ChannelHandlerContext ctx, + ByteBuf buffer) throws Exception { + /* This is initialized to null because the decode function must return + * null if the buffer does not contain a complete frame and cannot be + * decoded. + */ + List<T> ms = null; + ByteBuf frame = null; + while (null != (frame = (ByteBuf) super.decode(ctx, buffer))) { + if (ms == null) ms = new ArrayList<T>(); + ByteBufInputStream is = new ByteBufInputStream(frame); + TCompactProtocol thriftProtocol = + new TCompactProtocol(new TIOStreamTransport(is)); + T message = allocateMessage(); + message.read(thriftProtocol); + ms.add(message); + } + return ms; + } + + @Override + protected final ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, + int length) { + return buffer.slice(index, length); + } +} \ No newline at end of file diff --git a/src/main/java/net/floodlightcontroller/core/util/ThriftFrameEncoder.java b/src/main/java/net/floodlightcontroller/core/util/ThriftFrameEncoder.java new file mode 100644 index 0000000000000000000000000000000000000000..3abd51318477abf6bf132b1d9a7a979d91180493 --- /dev/null +++ b/src/main/java/net/floodlightcontroller/core/util/ThriftFrameEncoder.java @@ -0,0 +1,45 @@ +package net.floodlightcontroller.core.util; + +import org.apache.thrift.TBase; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.transport.TIOStreamTransport; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + + +/** + * Serialize a single Thrift message into the channel. + * + * It uses thrift's {@link TCompactProtocol} and frames the message by preprending a four + * byte length field (big endian). + * + * Note: needs to be subclasses with a concrete type implementing T. + * + * @author readams + * @author Andreas Wundsam <andreas.wundsam@bigswitch.com> + */ +public abstract class ThriftFrameEncoder<T extends TBase<?,?>> extends MessageToByteEncoder<T> { + @Override + protected void encode(ChannelHandlerContext ctx, T msg, ByteBuf out) + throws Exception { + + int lengthIndex = out.writerIndex(); + // length field, will be filled in later. + out.writeInt(0); + + int startIndex = out.writerIndex(); + ByteBufOutputStream os = new ByteBufOutputStream(out); + TCompactProtocol thriftProtocol = + new TCompactProtocol(new TIOStreamTransport(os)); + msg.write(thriftProtocol); + os.close(); + int endIndex = out.writerIndex(); + + // update the length field + int length = endIndex - startIndex; + out.setInt(lengthIndex, length); + } +} \ No newline at end of file diff --git a/src/main/java/net/floodlightcontroller/core/web/serializers/StatsReplySerializer.java b/src/main/java/net/floodlightcontroller/core/web/serializers/StatsReplySerializer.java index 0c6a520e3161a05f6589f09b1e152ebf78ff5b9c..e3981da5fcd74ae4b0abc54b52348c52d3830bdd 100644 --- a/src/main/java/net/floodlightcontroller/core/web/serializers/StatsReplySerializer.java +++ b/src/main/java/net/floodlightcontroller/core/web/serializers/StatsReplySerializer.java @@ -42,6 +42,7 @@ import org.projectfloodlight.openflow.protocol.OFGroupFeaturesStatsReply; import org.projectfloodlight.openflow.protocol.OFGroupStatsEntry; import org.projectfloodlight.openflow.protocol.OFGroupStatsReply; import org.projectfloodlight.openflow.protocol.OFMeterBandStats; +import org.projectfloodlight.openflow.protocol.OFMeterConfig; import org.projectfloodlight.openflow.protocol.OFMeterConfigStatsReply; import org.projectfloodlight.openflow.protocol.OFMeterFeatures; import org.projectfloodlight.openflow.protocol.OFMeterFeaturesStatsReply; @@ -409,37 +410,46 @@ public class StatsReplySerializer extends JsonSerializer<StatsReply> { jGen.writeStringField("version", meterConfigReply.getVersion().toString()); //return the enum name jGen.writeFieldName("meterConfig"); jGen.writeStartArray(); - for (OFMeterBand band : meterConfigReply.getEntries()) { + for (OFMeterConfig config : meterConfigReply.getEntries()) { jGen.writeStartObject(); - short type = (short)band.getType(); - jGen.writeNumberField("bandType",type); - - switch (type) { - case OFMeterBandTypeSerializerVer13.DROP_VAL: - OFMeterBandDrop bandDrop = (OFMeterBandDrop) band; - jGen.writeNumberField("rate", bandDrop.getRate()); - jGen.writeNumberField("burstSize", bandDrop.getBurstSize()); - break; + jGen.writeNumberField("meterId", config.getMeterId()); + jGen.writeNumberField("flags", config.getFlags()); + jGen.writeFieldName("meterBands"); + jGen.writeStartArray(); + for (OFMeterBand band : config.getEntries()) { + jGen.writeStartObject(); + short type = (short)band.getType(); + jGen.writeNumberField("bandType",type); + + switch (type) { + case OFMeterBandTypeSerializerVer13.DROP_VAL: + OFMeterBandDrop bandDrop = (OFMeterBandDrop) band; + jGen.writeNumberField("rate", bandDrop.getRate()); + jGen.writeNumberField("burstSize", bandDrop.getBurstSize()); + break; - case OFMeterBandTypeSerializerVer13.DSCP_REMARK_VAL: - OFMeterBandDscpRemark bandDscp = (OFMeterBandDscpRemark) band; - jGen.writeNumberField("rate", bandDscp.getRate()); - jGen.writeNumberField("burstSize", bandDscp.getBurstSize()); - jGen.writeNumberField("precLevel", bandDscp.getPrecLevel()); - break; + case OFMeterBandTypeSerializerVer13.DSCP_REMARK_VAL: + OFMeterBandDscpRemark bandDscp = (OFMeterBandDscpRemark) band; + jGen.writeNumberField("rate", bandDscp.getRate()); + jGen.writeNumberField("burstSize", bandDscp.getBurstSize()); + jGen.writeNumberField("precLevel", bandDscp.getPrecLevel()); + break; - case OFMeterBandTypeSerializerVer13.EXPERIMENTER_VAL: - OFMeterBandExperimenter bandExp = (OFMeterBandExperimenter) band; - jGen.writeNumberField("rate", bandExp.getRate()); - jGen.writeNumberField("burstSize", bandExp.getBurstSize()); - jGen.writeNumberField("experimenter", bandExp.getExperimenter()); - break; + case OFMeterBandTypeSerializerVer13.EXPERIMENTER_VAL: + OFMeterBandExperimenter bandExp = (OFMeterBandExperimenter) band; + jGen.writeNumberField("rate", bandExp.getRate()); + jGen.writeNumberField("burstSize", bandExp.getBurstSize()); + jGen.writeNumberField("experimenter", bandExp.getExperimenter()); + break; - default: - // shouldn't ever get here - break; - }//end of Switch Case + default: + // shouldn't ever get here + break; + }//end of Switch Case + jGen.writeEndObject(); + }//end of for loop + jGen.writeEndArray(); jGen.writeEndObject(); }//end of for loop jGen.writeEndArray(); diff --git a/src/main/java/org/sdnplatform/sync/internal/config/SyncStoreCCProvider.java b/src/main/java/org/sdnplatform/sync/internal/config/SyncStoreCCProvider.java index f1efbe49f5ae79ce1bfb529eeb720cb681085e54..4cd7442dfd745431dea125eb2ec544ec97326382 100644 --- a/src/main/java/org/sdnplatform/sync/internal/config/SyncStoreCCProvider.java +++ b/src/main/java/org/sdnplatform/sync/internal/config/SyncStoreCCProvider.java @@ -26,7 +26,7 @@ import org.sdnplatform.sync.Versioned; import org.sdnplatform.sync.error.ObsoleteVersionException; import org.sdnplatform.sync.error.SyncException; import org.sdnplatform.sync.internal.SyncManager; -import org.sdnplatform.sync.internal.config.bootstrap.Bootstrap; +import org.sdnplatform.sync.internal.config.bootstrap.BootstrapClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -345,7 +345,7 @@ public class SyncStoreCCProvider hosts.add(HostAndPort.fromString(s). withDefaultPort(6642)); } - Bootstrap bs = new Bootstrap(syncManager, + BootstrapClient bs = new BootstrapClient(syncManager, authScheme, keyStorePath, keyStorePassword); diff --git a/src/main/java/org/sdnplatform/sync/internal/config/bootstrap/BootstrapChannelHandler.java b/src/main/java/org/sdnplatform/sync/internal/config/bootstrap/BootstrapChannelHandler.java index 4e7802b7889db6c85b077fe9bc60a52ab1249973..1272dd849d0578d7a28a564bc6a05bd49206db13 100644 --- a/src/main/java/org/sdnplatform/sync/internal/config/bootstrap/BootstrapChannelHandler.java +++ b/src/main/java/org/sdnplatform/sync/internal/config/bootstrap/BootstrapChannelHandler.java @@ -1,8 +1,7 @@ package org.sdnplatform.sync.internal.config.bootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; import org.sdnplatform.sync.IStoreClient; import org.sdnplatform.sync.Versioned; import org.sdnplatform.sync.error.AuthException; @@ -30,10 +29,10 @@ public class BootstrapChannelHandler extends AbstractRPCChannelHandler { protected static final Logger logger = LoggerFactory.getLogger(BootstrapChannelHandler.class); - private Bootstrap bootstrap; + private BootstrapClient bootstrap; private Short remoteNodeId; - public BootstrapChannelHandler(Bootstrap bootstrap) { + public BootstrapChannelHandler(BootstrapClient bootstrap) { super(); this.bootstrap = bootstrap; } @@ -43,9 +42,9 @@ public class BootstrapChannelHandler extends AbstractRPCChannelHandler { // **************************** @Override - public void channelOpen(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception { - bootstrap.cg.add(ctx.getChannel()); + public void channelActive(ChannelHandlerContext ctx) throws Exception { + bootstrap.getChannelGroup().add(ctx.channel()); + super.channelActive(ctx); } // ****************************************** @@ -73,7 +72,7 @@ public class BootstrapChannelHandler extends AbstractRPCChannelHandler { SyncMessage bsm = new SyncMessage(MessageType.CLUSTER_JOIN_REQUEST); bsm.setClusterJoinRequest(cjrm); - channel.write(bsm); + channel.writeAndFlush(bsm); } @Override @@ -107,7 +106,7 @@ public class BootstrapChannelHandler extends AbstractRPCChannelHandler { bootstrap.succeeded = true; } catch (Exception e) { logger.error("Error processing cluster join response", e); - channel.write(getError(response.getHeader().getTransactionId(), e, + channel.writeAndFlush(getError(response.getHeader().getTransactionId(), e, MessageType.CLUSTER_JOIN_RESPONSE)); } channel.disconnect(); diff --git a/src/main/java/org/sdnplatform/sync/internal/config/bootstrap/BootstrapChannelInitializer.java b/src/main/java/org/sdnplatform/sync/internal/config/bootstrap/BootstrapChannelInitializer.java new file mode 100644 index 0000000000000000000000000000000000000000..845836ae1db61d1a46a78bee1720474fa98c71a2 --- /dev/null +++ b/src/main/java/org/sdnplatform/sync/internal/config/bootstrap/BootstrapChannelInitializer.java @@ -0,0 +1,39 @@ +package org.sdnplatform.sync.internal.config.bootstrap; + +import org.sdnplatform.sync.internal.rpc.SyncMessageDecoder; +import org.sdnplatform.sync.internal.rpc.SyncMessageEncoder; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.util.Timer; + + + +public class BootstrapChannelInitializer extends ChannelInitializer<Channel> { + private final BootstrapClient bootstrap; + private static final int maxFrameSize = 1024 * 1024 * 10; + protected Timer timer; + + public BootstrapChannelInitializer(Timer timer, BootstrapClient bootstrap) { + super(); + this.timer = timer; + this.bootstrap = bootstrap; + } + + @Override + protected void initChannel(Channel ch) throws Exception { + BootstrapChannelHandler handler = + new BootstrapChannelHandler(bootstrap); + + ChannelPipeline pipeline = ch.pipeline(); + + pipeline.addLast("syncMessageDecoder", new SyncMessageDecoder(maxFrameSize)); + + pipeline.addLast("syncMessageEncoder", new SyncMessageEncoder()); + + pipeline.addLast("timeout", new BootstrapTimeoutHandler(timer, 10)); + + pipeline.addLast("handler", handler); + } +} \ No newline at end of file diff --git a/src/main/java/org/sdnplatform/sync/internal/config/bootstrap/Bootstrap.java b/src/main/java/org/sdnplatform/sync/internal/config/bootstrap/BootstrapClient.java similarity index 53% rename from src/main/java/org/sdnplatform/sync/internal/config/bootstrap/Bootstrap.java rename to src/main/java/org/sdnplatform/sync/internal/config/bootstrap/BootstrapClient.java index 6b994e3c8dc3960eb716d1f822bf8d9a45a9c578..af6965210f81d253a9d96bdabec825ab69532767 100644 --- a/src/main/java/org/sdnplatform/sync/internal/config/bootstrap/Bootstrap.java +++ b/src/main/java/org/sdnplatform/sync/internal/config/bootstrap/BootstrapClient.java @@ -2,16 +2,23 @@ package org.sdnplatform.sync.internal.config.bootstrap; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.timeout.TimeoutException; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timer; +import io.netty.util.concurrent.GlobalEventExecutor; +import net.floodlightcontroller.core.util.NettyUtils; + import org.sdnplatform.sync.error.SyncException; import org.sdnplatform.sync.internal.SyncManager; import org.sdnplatform.sync.internal.config.AuthScheme; @@ -27,14 +34,14 @@ import com.google.common.net.HostAndPort; * local system store * @author readams */ -public class Bootstrap { +public class BootstrapClient { protected static final Logger logger = - LoggerFactory.getLogger(Bootstrap.class); + LoggerFactory.getLogger(BootstrapClient.class); /** * Channel group that will hold all our channels */ - protected ChannelGroup cg; + private ChannelGroup cg; /** * Transaction ID used in message headers in the RPC protocol @@ -49,15 +56,16 @@ public class Bootstrap { protected final String keyStorePath; protected final String keyStorePassword; - ExecutorService bossExecutor = null; - ExecutorService workerExecutor = null; - ClientBootstrap bootstrap = null; - BootstrapPipelineFactory pipelineFactory; + EventLoopGroup workerExecutor = null; + Bootstrap bootstrap = null; + BootstrapChannelInitializer pipelineFactory; protected Node localNode; protected volatile boolean succeeded = false; + + private Timer timer; - public Bootstrap(SyncManager syncManager, AuthScheme authScheme, + public BootstrapClient(SyncManager syncManager, AuthScheme authScheme, String keyStorePath, String keyStorePassword) { super(); this.syncManager = syncManager; @@ -67,25 +75,23 @@ public class Bootstrap { } public void init() throws SyncException { - cg = new DefaultChannelGroup("Cluster Bootstrap"); - - bossExecutor = Executors.newCachedThreadPool(); - workerExecutor = Executors.newCachedThreadPool(); + cg = new DefaultChannelGroup("Cluster Bootstrap", GlobalEventExecutor.INSTANCE); - bootstrap = - new ClientBootstrap(new NioClientSocketChannelFactory(bossExecutor, - workerExecutor)); - bootstrap.setOption("child.reuseAddr", true); - bootstrap.setOption("child.keepAlive", true); - bootstrap.setOption("child.tcpNoDelay", true); - bootstrap.setOption("child.sendBufferSize", - RPCService.SEND_BUFFER_SIZE); - bootstrap.setOption("child.receiveBufferSize", - RPCService.SEND_BUFFER_SIZE); - bootstrap.setOption("child.connectTimeoutMillis", - RPCService.CONNECT_TIMEOUT); - pipelineFactory = new BootstrapPipelineFactory(this); - bootstrap.setPipelineFactory(pipelineFactory); + workerExecutor = new NioEventLoopGroup(); + timer = new HashedWheelTimer(); + + bootstrap = new Bootstrap() + .group(workerExecutor) + .channel(NioSocketChannel.class) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_SNDBUF, RPCService.SEND_BUFFER_SIZE) + .option(ChannelOption.SO_RCVBUF, RPCService.SEND_BUFFER_SIZE) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, RPCService.CONNECT_TIMEOUT); + + pipelineFactory = new BootstrapChannelInitializer(timer, this); + bootstrap.handler(pipelineFactory); } public void shutdown() { @@ -93,18 +99,20 @@ public class Bootstrap { cg.close().awaitUninterruptibly(); cg = null; } - if (bootstrap != null) - bootstrap.releaseExternalResources(); bootstrap = null; - if (pipelineFactory != null) - pipelineFactory.releaseExternalResources(); pipelineFactory = null; - if (workerExecutor != null) - workerExecutor.shutdown(); - workerExecutor = null; - if (bossExecutor != null) - bossExecutor.shutdown(); - bossExecutor = null; + if (workerExecutor != null) { + try { + NettyUtils.shutdownAndWait("Sync BootstrapClient", workerExecutor); + } catch (InterruptedException | TimeoutException e) { + logger.warn("Error waiting for gracefull shutdown of BootstrapClient {}", e); + } + workerExecutor = null; + } + if (timer != null) { + timer.stop(); + timer = null; + } } public boolean bootstrap(HostAndPort seed, @@ -116,20 +124,24 @@ public class Bootstrap { ChannelFuture future = bootstrap.connect(sa); future.awaitUninterruptibly(); if (!future.isSuccess()) { - logger.debug("Could not connect to " + seed, future.getCause()); + logger.debug("Could not connect to " + seed, future.cause()); return false; } - Channel channel = future.getChannel(); + Channel channel = future.channel(); logger.debug("[{}] Connected to {}", localNode != null ? localNode.getNodeId() : null, seed); try { - channel.getCloseFuture().await(); + channel.closeFuture().await(); } catch (InterruptedException e) { logger.debug("Interrupted while waiting for bootstrap"); return succeeded; } return succeeded; } + + public ChannelGroup getChannelGroup() { + return cg; + } } diff --git a/src/main/java/org/sdnplatform/sync/internal/config/bootstrap/BootstrapPipelineFactory.java b/src/main/java/org/sdnplatform/sync/internal/config/bootstrap/BootstrapPipelineFactory.java deleted file mode 100644 index f8850150fed79064b32ae678e1144bac79c2cbf0..0000000000000000000000000000000000000000 --- a/src/main/java/org/sdnplatform/sync/internal/config/bootstrap/BootstrapPipelineFactory.java +++ /dev/null @@ -1,46 +0,0 @@ -package org.sdnplatform.sync.internal.config.bootstrap; - -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.util.ExternalResourceReleasable; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timer; -import org.sdnplatform.sync.internal.rpc.ThriftFrameDecoder; -import org.sdnplatform.sync.internal.rpc.ThriftFrameEncoder; - -public class BootstrapPipelineFactory - implements ChannelPipelineFactory, ExternalResourceReleasable { - private Bootstrap bootstrap; - private static final int maxFrameSize = 1024 * 1024 * 10; - protected Timer timer; - - public BootstrapPipelineFactory(Bootstrap bootstrap) { - super(); - this.bootstrap = bootstrap; - this.timer = new HashedWheelTimer(); - } - - @Override - public ChannelPipeline getPipeline() throws Exception { - BootstrapChannelHandler handler = - new BootstrapChannelHandler(bootstrap); - ChannelPipeline pipeline = Channels.pipeline(); - - pipeline.addLast("frameDecoder", - new ThriftFrameDecoder(maxFrameSize)); - pipeline.addLast("frameEncoder", - new ThriftFrameEncoder()); - pipeline.addLast("timeout", - new BootstrapTimeoutHandler(timer, 10)); - - pipeline.addLast("handler", handler); - - return pipeline; - } - - @Override - public void releaseExternalResources() { - timer.stop(); - } -} diff --git a/src/main/java/org/sdnplatform/sync/internal/config/bootstrap/BootstrapTimeoutHandler.java b/src/main/java/org/sdnplatform/sync/internal/config/bootstrap/BootstrapTimeoutHandler.java index fa2de52701d9f7f75ac5389d87bc1d7fb55c9414..9ad9cb7d30a449e460646a030f7e8b8556fa24d4 100644 --- a/src/main/java/org/sdnplatform/sync/internal/config/bootstrap/BootstrapTimeoutHandler.java +++ b/src/main/java/org/sdnplatform/sync/internal/config/bootstrap/BootstrapTimeoutHandler.java @@ -18,18 +18,16 @@ package org.sdnplatform.sync.internal.config.bootstrap; import java.util.concurrent.TimeUnit; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.Timer; -import org.jboss.netty.util.TimerTask; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; /** * Trigger a timeout if the bootstrap process stalls */ -public class BootstrapTimeoutHandler - extends SimpleChannelUpstreamHandler { +public class BootstrapTimeoutHandler extends ChannelInboundHandlerAdapter { final Timer timer; final long timeoutNanos; @@ -44,22 +42,23 @@ public class BootstrapTimeoutHandler } @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) + public void channelActive(ChannelHandlerContext ctx) throws Exception { if (timeoutNanos > 0) { timeout = timer.newTimeout(new HandshakeTimeoutTask(ctx), timeoutNanos, TimeUnit.NANOSECONDS); } - ctx.sendUpstream(e); + super.channelActive(ctx); } @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) + public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (timeout != null) { timeout.cancel(); timeout = null; } + super.channelInactive(ctx); } private final class HandshakeTimeoutTask implements TimerTask { @@ -76,10 +75,10 @@ public class BootstrapTimeoutHandler return; } - if (!ctx.getChannel().isOpen()) { + if (!ctx.channel().isOpen()) { return; } - ctx.getChannel().disconnect(); + ctx.channel().disconnect(); } } } diff --git a/src/main/java/org/sdnplatform/sync/internal/remote/RSHandshakeTimeoutHandler.java b/src/main/java/org/sdnplatform/sync/internal/remote/RSHandshakeTimeoutHandler.java index b8cf1b7e9aa33791d34e2f5ea55d8aacb85886ea..4c6ad767c2ba91ff7f622e98db20311fb4d320e8 100644 --- a/src/main/java/org/sdnplatform/sync/internal/remote/RSHandshakeTimeoutHandler.java +++ b/src/main/java/org/sdnplatform/sync/internal/remote/RSHandshakeTimeoutHandler.java @@ -18,18 +18,16 @@ package org.sdnplatform.sync.internal.remote; import java.util.concurrent.TimeUnit; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.Timer; -import org.jboss.netty.util.TimerTask; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; /** * Trigger a timeout if the bootstrap process stalls */ -public class RSHandshakeTimeoutHandler - extends SimpleChannelUpstreamHandler { +public class RSHandshakeTimeoutHandler extends ChannelInboundHandlerAdapter { final Timer timer; final long timeoutNanos; @@ -46,22 +44,23 @@ public class RSHandshakeTimeoutHandler } @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) + public void channelActive(ChannelHandlerContext ctx) throws Exception { if (timeoutNanos > 0) { timeout = timer.newTimeout(new HandshakeTimeoutTask(ctx), timeoutNanos, TimeUnit.NANOSECONDS); } - ctx.sendUpstream(e); + super.channelActive(ctx); } @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) + public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (timeout != null) { timeout.cancel(); timeout = null; } + super.channelInactive(ctx); } private final class HandshakeTimeoutTask implements TimerTask { @@ -78,11 +77,11 @@ public class RSHandshakeTimeoutHandler return; } - if (!ctx.getChannel().isOpen()) { + if (!ctx.channel().isOpen()) { return; } if (channelHandler.syncManager.ready == false) - ctx.getChannel().disconnect(); + ctx.channel().disconnect(); } } } diff --git a/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncChannelHandler.java b/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncChannelHandler.java index 9ce4ecbb375e24949c093847b6d44c5f10942881..38ce819579c27f611e4049acd73d95d0b9f9ee9a 100644 --- a/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncChannelHandler.java +++ b/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncChannelHandler.java @@ -2,9 +2,8 @@ package org.sdnplatform.sync.internal.remote; import java.util.List; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; import org.sdnplatform.sync.Versioned; import org.sdnplatform.sync.error.AuthException; import org.sdnplatform.sync.error.SyncException; @@ -42,17 +41,17 @@ public class RemoteSyncChannelHandler extends AbstractRPCChannelHandler { // **************************** @Override - public void channelOpen(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception { - syncManager.cg.add(ctx.getChannel()); + public void channelActive(ChannelHandlerContext ctx) throws Exception { + syncManager.cg.add(ctx.channel()); + super.channelActive(ctx); } @Override - public void channelDisconnected(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { this.syncManager.channel = null; syncManager.ready = false; syncManager.channelDisconnected(null); + super.channelInactive(ctx); } // ****************************************** diff --git a/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncChannelInitializer.java b/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncChannelInitializer.java new file mode 100644 index 0000000000000000000000000000000000000000..bba84651d504a73698085bb1d9d9cea3e5745ef9 --- /dev/null +++ b/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncChannelInitializer.java @@ -0,0 +1,47 @@ +package org.sdnplatform.sync.internal.remote; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.util.Timer; + +import org.sdnplatform.sync.internal.rpc.SyncMessageDecoder; +import org.sdnplatform.sync.internal.rpc.SyncMessageEncoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Pipeline factory for the remote sync service + * @author readams + */ +public class RemoteSyncChannelInitializer extends ChannelInitializer<Channel> { + protected static final Logger logger = + LoggerFactory.getLogger(RemoteSyncChannelInitializer.class.getName()); + + private final RemoteSyncManager syncManager; + private final Timer timer; + + private static final int maxFrameSize = 1024 * 1024 * 10; + + public RemoteSyncChannelInitializer(Timer timer, RemoteSyncManager syncManager) { + super(); + this.syncManager = syncManager; + this.timer = timer; + } + + @Override + protected void initChannel(Channel ch) throws Exception { + RemoteSyncChannelHandler channelHandler = + new RemoteSyncChannelHandler(syncManager); + + ChannelPipeline pipeline = ch.pipeline(); + + pipeline.addLast("syncMessageDecoder", new SyncMessageDecoder(maxFrameSize)); + + pipeline.addLast("syncMessageEncoder", new SyncMessageEncoder()); + + pipeline.addLast("timeout", new RSHandshakeTimeoutHandler(channelHandler, timer, 3)); + + pipeline.addLast("handler", channelHandler); + } +} \ No newline at end of file diff --git a/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncManager.java b/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncManager.java index efcc3ec67d602d722420d0b1989a77b97045a720..89491128d30ea75d1a4f653eb47613390871bfbe 100644 --- a/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncManager.java +++ b/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncManager.java @@ -5,19 +5,23 @@ import java.net.SocketAddress; import java.util.Collection; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.HashedWheelTimer; +import io.netty.util.concurrent.GlobalEventExecutor; + import org.sdnplatform.sync.error.RemoteStoreException; import org.sdnplatform.sync.error.SyncException; import org.sdnplatform.sync.error.SyncRuntimeException; @@ -40,6 +44,7 @@ import org.slf4j.LoggerFactory; import net.floodlightcontroller.core.module.FloodlightModuleContext; import net.floodlightcontroller.core.module.FloodlightModuleException; import net.floodlightcontroller.core.module.IFloodlightService; +import net.floodlightcontroller.core.util.NettyUtils; /** * Implementation of a sync service that passes its functionality off to a @@ -53,10 +58,9 @@ public class RemoteSyncManager extends AbstractSyncManager { /** * Channel group that will hold all our channels */ - final ChannelGroup cg = new DefaultChannelGroup("Internal RPC"); - RemoteSyncPipelineFactory pipelineFactory; - ExecutorService bossExecutor; - ExecutorService workerExecutor; + final ChannelGroup cg = new DefaultChannelGroup("Internal RPC", GlobalEventExecutor.INSTANCE); + RemoteSyncChannelInitializer pipelineFactory; + EventLoopGroup workerExecutor; /** * Active connection to server @@ -75,7 +79,7 @@ public class RemoteSyncManager extends AbstractSyncManager { /** * Client bootstrap */ - protected ClientBootstrap clientBootstrap; + protected Bootstrap clientBootstrap; /** * Transaction ID used in message headers in the RPC protocol @@ -92,6 +96,11 @@ public class RemoteSyncManager extends AbstractSyncManager { */ protected int port = 6642; + /** + * Timer for Netty + */ + private HashedWheelTimer timer; + protected AuthScheme authScheme; protected String keyStorePath; protected String keyStorePassword; @@ -153,19 +162,16 @@ public class RemoteSyncManager extends AbstractSyncManager { logger.debug("Failed to cleanly shut down remote sync"); return; } - if (clientBootstrap != null) { - clientBootstrap.releaseExternalResources(); - } clientBootstrap = null; - if (pipelineFactory != null) - pipelineFactory.releaseExternalResources(); pipelineFactory = null; - if (workerExecutor != null) - workerExecutor.shutdown(); - workerExecutor = null; - if (bossExecutor != null) - bossExecutor.shutdown(); - bossExecutor = null; + if (workerExecutor != null) { + NettyUtils.shutdownAndWait("worker group", workerExecutor); + workerExecutor = null; + } + if (timer != null) { + timer.stop(); + timer = null; + } } catch (InterruptedException e) { logger.debug("Interrupted while shutting down remote sync"); } @@ -195,24 +201,22 @@ public class RemoteSyncManager extends AbstractSyncManager { public void startUp(FloodlightModuleContext context) throws FloodlightModuleException { shutdown = false; - bossExecutor = Executors.newCachedThreadPool(); - workerExecutor = Executors.newCachedThreadPool(); + workerExecutor = new NioEventLoopGroup(); + timer = new HashedWheelTimer(); + + pipelineFactory = new RemoteSyncChannelInitializer(timer, this); + + final Bootstrap bootstrap = new Bootstrap() + .channel(NioSocketChannel.class) + .group(workerExecutor) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_SNDBUF, RPCService.SEND_BUFFER_SIZE) + .option(ChannelOption.SO_RCVBUF, RPCService.SEND_BUFFER_SIZE) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, RPCService.CONNECT_TIMEOUT) + .handler(pipelineFactory); - final ClientBootstrap bootstrap = - new ClientBootstrap( - new NioClientSocketChannelFactory(bossExecutor, - workerExecutor)); - bootstrap.setOption("child.reuseAddr", true); - bootstrap.setOption("child.keepAlive", true); - bootstrap.setOption("child.tcpNoDelay", true); - bootstrap.setOption("child.sendBufferSize", - RPCService.SEND_BUFFER_SIZE); - bootstrap.setOption("child.receiveBufferSize", - RPCService.SEND_BUFFER_SIZE); - bootstrap.setOption("child.connectTimeoutMillis", - RPCService.CONNECT_TIMEOUT); - pipelineFactory = new RemoteSyncPipelineFactory(this); - bootstrap.setPipelineFactory(pipelineFactory); clientBootstrap = bootstrap; } @@ -264,7 +268,7 @@ public class RemoteSyncManager extends AbstractSyncManager { } } } - channel.write(request); + channel.writeAndFlush(request); return future; } @@ -316,24 +320,24 @@ public class RemoteSyncManager extends AbstractSyncManager { protected boolean connect(String hostname, int port) { ready = false; - if (channel == null || !channel.isConnected()) { + if (channel == null || !channel.isActive()) { SocketAddress sa = new InetSocketAddress(hostname, port); ChannelFuture future = clientBootstrap.connect(sa); future.awaitUninterruptibly(); if (!future.isSuccess()) { logger.error("Could not connect to " + hostname + - ":" + port, future.getCause()); + ":" + port, future.cause()); return false; } - channel = future.getChannel(); + channel = future.channel(); } - while (!ready && channel != null && channel.isConnected()) { + while (!ready && channel != null && channel.isActive()) { try { Thread.sleep(10); } catch (InterruptedException e) { } } - if (!ready || channel == null || !channel.isConnected()) { + if (!ready || channel == null || !channel.isActive()) { logger.warn("Timed out connecting to {}:{}", hostname, port); return false; } diff --git a/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncPipelineFactory.java b/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncPipelineFactory.java deleted file mode 100644 index 4cf2d60bee6b1bcbe2958317d2ce0dda25530156..0000000000000000000000000000000000000000 --- a/src/main/java/org/sdnplatform/sync/internal/remote/RemoteSyncPipelineFactory.java +++ /dev/null @@ -1,56 +0,0 @@ -package org.sdnplatform.sync.internal.remote; - -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.util.ExternalResourceReleasable; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timer; -import org.sdnplatform.sync.internal.rpc.ThriftFrameDecoder; -import org.sdnplatform.sync.internal.rpc.ThriftFrameEncoder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Pipeline factory for the remote sync service - * @author readams - */ -public class RemoteSyncPipelineFactory - implements ChannelPipelineFactory, ExternalResourceReleasable { - protected static final Logger logger = - LoggerFactory.getLogger(RemoteSyncPipelineFactory.class.getName()); - - protected RemoteSyncManager syncManager; - protected Timer timer; - - private static final int maxFrameSize = 1024 * 1024 * 10; - - public RemoteSyncPipelineFactory(RemoteSyncManager syncManager) { - super(); - this.syncManager = syncManager; - this.timer = new HashedWheelTimer(); - } - - @Override - public ChannelPipeline getPipeline() throws Exception { - RemoteSyncChannelHandler channelHandler = - new RemoteSyncChannelHandler(syncManager); - ChannelPipeline pipeline = Channels.pipeline(); - - pipeline.addLast("frameDecoder", - new ThriftFrameDecoder(maxFrameSize)); - pipeline.addLast("frameEncoder", - new ThriftFrameEncoder()); - pipeline.addLast("timeout", - new RSHandshakeTimeoutHandler(channelHandler, - timer, 3)); - - pipeline.addLast("handler", channelHandler); - return pipeline; - } - - @Override - public void releaseExternalResources() { - timer.stop(); - } -} diff --git a/src/main/java/org/sdnplatform/sync/internal/rpc/AbstractRPCChannelHandler.java b/src/main/java/org/sdnplatform/sync/internal/rpc/AbstractRPCChannelHandler.java index 8fe90ff43552e1dbec4e9e9191039bddc89c8767..9d0f2b447908b3b413a2e837a5a4a71dddd38448 100644 --- a/src/main/java/org/sdnplatform/sync/internal/rpc/AbstractRPCChannelHandler.java +++ b/src/main/java/org/sdnplatform/sync/internal/rpc/AbstractRPCChannelHandler.java @@ -11,15 +11,12 @@ import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import javax.xml.bind.DatatypeConverter; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler; -import org.jboss.netty.handler.timeout.IdleStateEvent; -import org.jboss.netty.handler.timeout.ReadTimeoutException; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.ReadTimeoutException; + import org.sdnplatform.sync.error.AuthException; import org.sdnplatform.sync.error.HandshakeTimeoutException; import org.sdnplatform.sync.error.SyncException; @@ -61,8 +58,7 @@ import org.slf4j.LoggerFactory; * a {@link SyncMessage} which will provide specific type information. * @author readams */ -public abstract class AbstractRPCChannelHandler - extends IdleStateAwareChannelHandler { +public abstract class AbstractRPCChannelHandler extends ChannelInboundHandlerAdapter { protected static final Logger logger = LoggerFactory.getLogger(AbstractRPCChannelHandler.class); protected String currentChallenge; @@ -84,8 +80,7 @@ public abstract class AbstractRPCChannelHandler // **************************** @Override - public void channelConnected(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception { + public void channelActive(ChannelHandlerContext ctx) throws Exception { channelState = ChannelState.CONNECTED; HelloMessage m = new HelloMessage(); @@ -110,12 +105,18 @@ public abstract class AbstractRPCChannelHandler } SyncMessage bsm = new SyncMessage(MessageType.HELLO); bsm.setHello(m); - ctx.getChannel().write(bsm); + ctx.channel().writeAndFlush(bsm); } - + @Override - public void channelIdle(ChannelHandlerContext ctx, - IdleStateEvent e) throws Exception { + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + channelIdle(ctx, (IdleStateEvent) evt); + } + super.userEventTriggered(ctx, evt); + } + + public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception { // send an echo request EchoRequestMessage m = new EchoRequestMessage(); AsyncMessageHeader header = new AsyncMessageHeader(); @@ -123,53 +124,50 @@ public abstract class AbstractRPCChannelHandler m.setHeader(header); SyncMessage bsm = new SyncMessage(MessageType.ECHO_REQUEST); bsm.setEchoRequest(m); - ctx.getChannel().write(bsm); + ctx.channel().writeAndFlush(bsm); } @Override - public void exceptionCaught(ChannelHandlerContext ctx, - ExceptionEvent e) throws Exception { - if (e.getCause() instanceof ReadTimeoutException) { + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + if (cause instanceof ReadTimeoutException) { // read timeout logger.error("[{}->{}] Disconnecting RPC node due to read timeout", getLocalNodeIdString(), getRemoteNodeIdString()); - ctx.getChannel().close(); - } else if (e.getCause() instanceof HandshakeTimeoutException) { + ctx.channel().close(); + } else if (cause instanceof HandshakeTimeoutException) { // read timeout logger.error("[{}->{}] Disconnecting RPC node due to " + "handshake timeout", getLocalNodeIdString(), getRemoteNodeIdString()); - ctx.getChannel().close(); - } else if (e.getCause() instanceof ConnectException || - e.getCause() instanceof IOException) { + ctx.channel().close(); + } else if (cause instanceof ConnectException || + cause instanceof IOException) { logger.debug("[{}->{}] {}: {}", new Object[] {getLocalNodeIdString(), getRemoteNodeIdString(), - e.getCause().getClass().getName(), - e.getCause().getMessage()}); + cause.getClass().getName(), + cause.getMessage()}); } else { logger.error("[{}->{}] An error occurred on RPC channel", new Object[]{getLocalNodeIdString(), getRemoteNodeIdString(), - e.getCause()}); - ctx.getChannel().close(); + cause}); + ctx.channel().close(); } } @Override - public void messageReceived(ChannelHandlerContext ctx, - MessageEvent e) throws Exception { - Object message = e.getMessage(); + public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception { if (message instanceof SyncMessage) { - handleSyncMessage((SyncMessage)message, ctx.getChannel()); + handleSyncMessage((SyncMessage)message, ctx.channel()); } else if (message instanceof List) { for (Object i : (List<?>)message) { if (i instanceof SyncMessage) { try { handleSyncMessage((SyncMessage)i, - ctx.getChannel()); + ctx.channel()); } catch (Exception ex) { - Channels.fireExceptionCaught(ctx, ex); + ctx.fireExceptionCaught(ex); } } } @@ -323,7 +321,7 @@ public abstract class AbstractRPCChannelHandler new Object[]{getLocalNodeIdString(), getRemoteNodeIdString(), e.getMessage()}); - channel.write(getError(request.getHeader().getTransactionId(), + channel.writeAndFlush(getError(request.getHeader().getTransactionId(), e, MessageType.HELLO)); channel.close(); } @@ -355,7 +353,7 @@ public abstract class AbstractRPCChannelHandler AuthChallengeResponse reply = new AuthChallengeResponse(); reply.setResponse(generateResponse(cr.getChallenge())); m.setAuthChallengeResponse(reply); - channel.write(bsm); + channel.writeAndFlush(bsm); } else { throw new AuthException("No authentication data in " + "handshake message"); @@ -382,7 +380,7 @@ public abstract class AbstractRPCChannelHandler m.setHeader(header); SyncMessage bsm = new SyncMessage(MessageType.ECHO_REPLY); bsm.setEchoReply(m); - channel.write(bsm); + channel.writeAndFlush(bsm); } protected void handleGetRequest(GetRequestMessage request, @@ -548,7 +546,7 @@ public abstract class AbstractRPCChannelHandler new Object[]{getLocalNodeIdString(), getRemoteNodeIdString(), message}); - channel.write(getError(transactionId, + channel.writeAndFlush(getError(transactionId, new SyncException(message), type)); } diff --git a/src/main/java/org/sdnplatform/sync/internal/rpc/HandshakeTimeoutHandler.java b/src/main/java/org/sdnplatform/sync/internal/rpc/HandshakeTimeoutHandler.java index 2ba0e4bb9be944b6729ee23cf0bf31b70040e4d0..7fc9b7f9a59fa3d7f4fef2cdb35d6eafdb7809ee 100644 --- a/src/main/java/org/sdnplatform/sync/internal/rpc/HandshakeTimeoutHandler.java +++ b/src/main/java/org/sdnplatform/sync/internal/rpc/HandshakeTimeoutHandler.java @@ -19,23 +19,18 @@ package org.sdnplatform.sync.internal.rpc; import java.util.concurrent.TimeUnit; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.util.ExternalResourceReleasable; -import org.jboss.netty.util.Timeout; -import org.jboss.netty.util.Timer; -import org.jboss.netty.util.TimerTask; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; import org.sdnplatform.sync.error.HandshakeTimeoutException; /** * Trigger a timeout if a switch fails to complete handshake soon enough */ -public class HandshakeTimeoutHandler - extends SimpleChannelUpstreamHandler - implements ExternalResourceReleasable { +public class HandshakeTimeoutHandler extends ChannelInboundHandlerAdapter { static final HandshakeTimeoutException EXCEPTION = new HandshakeTimeoutException(); @@ -55,27 +50,23 @@ public class HandshakeTimeoutHandler } @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) + public void channelActive(ChannelHandlerContext ctx) throws Exception { if (timeoutNanos > 0) { timeout = timer.newTimeout(new HandshakeTimeoutTask(ctx), timeoutNanos, TimeUnit.NANOSECONDS); } - ctx.sendUpstream(e); + ctx.fireChannelActive(); } @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) + public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (timeout != null) { timeout.cancel(); timeout = null; } - } - - @Override - public void releaseExternalResources() { - timer.stop(); + ctx.fireChannelInactive(); } private final class HandshakeTimeoutTask implements TimerTask { @@ -92,14 +83,14 @@ public class HandshakeTimeoutHandler return; } - if (!ctx.getChannel().isOpen()) { + if (!ctx.channel().isOpen()) { return; } if (!handler.isClientConnection && ((handler.remoteNode == null || !handler.rpcService.isConnected(handler.remoteNode. getNodeId())))) - Channels.fireExceptionCaught(ctx, EXCEPTION); + ctx.fireExceptionCaught(EXCEPTION); } } } diff --git a/src/main/java/org/sdnplatform/sync/internal/rpc/RPCChannelHandler.java b/src/main/java/org/sdnplatform/sync/internal/rpc/RPCChannelHandler.java index bb7f942cf14abf72bd7aefba05b6d46291f3756a..b6f75e6b596aa9b71e24337c866e5825b801530d 100644 --- a/src/main/java/org/sdnplatform/sync/internal/rpc/RPCChannelHandler.java +++ b/src/main/java/org/sdnplatform/sync/internal/rpc/RPCChannelHandler.java @@ -8,10 +8,9 @@ import java.util.Map.Entry; import net.floodlightcontroller.debugcounter.IDebugCounter; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.MessageEvent; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; + import org.sdnplatform.sync.IClosableIterator; import org.sdnplatform.sync.IStoreClient; import org.sdnplatform.sync.IVersion; @@ -61,14 +60,12 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler { // **************************** @Override - public void channelOpen(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception { - rpcService.cg.add(ctx.getChannel()); + public void channelActive(ChannelHandlerContext ctx) throws Exception { + rpcService.getChannelGroup().add(ctx.channel()); } @Override - public void channelDisconnected(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (remoteNode != null) { rpcService.disconnectNode(remoteNode.getNodeId()); } @@ -78,12 +75,6 @@ public class RPCChannelHandler extends AbstractRPCChannelHandler { // AbstractRPCChannelHandler message handlers // ****************************************** - @Override - public void messageReceived(ChannelHandlerContext ctx, - MessageEvent e) throws Exception { - super.messageReceived(ctx, e); - } - @Override protected void handleHello(HelloMessage hello, Channel channel) { if (!hello.isSetNodeId()) { diff --git a/src/main/java/org/sdnplatform/sync/internal/rpc/RPCChannelInitializer.java b/src/main/java/org/sdnplatform/sync/internal/rpc/RPCChannelInitializer.java new file mode 100644 index 0000000000000000000000000000000000000000..57ef76b38f0ab45de1dfffe826c954d616f28b56 --- /dev/null +++ b/src/main/java/org/sdnplatform/sync/internal/rpc/RPCChannelInitializer.java @@ -0,0 +1,59 @@ +package org.sdnplatform.sync.internal.rpc; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timer; + +import org.sdnplatform.sync.internal.SyncManager; + + +/** + * Pipeline factory for the sync service. + * @see SyncManager + * @author readams + */ +public class RPCChannelInitializer extends ChannelInitializer<Channel> { + + protected SyncManager syncManager; + protected RPCService rpcService; + protected Timer timer; + + private static final int maxFrameSize = 512 * 1024; + + public RPCChannelInitializer(SyncManager syncManager, + RPCService rpcService) { + super(); + this.syncManager = syncManager; + this.rpcService = rpcService; + + this.timer = new HashedWheelTimer(); + } + + @Override + protected void initChannel(Channel ch) throws Exception { + RPCChannelHandler channelHandler = + new RPCChannelHandler(syncManager, rpcService); + + IdleStateHandler idleHandler = + new IdleStateHandler(5, 10, 0); + ReadTimeoutHandler readTimeoutHandler = + new ReadTimeoutHandler(30); + + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast("idle", idleHandler); + pipeline.addLast("timeout", readTimeoutHandler); + pipeline.addLast("handshaketimeout", + new HandshakeTimeoutHandler(channelHandler, timer, 10)); + + pipeline.addLast("syncMessageDecoder", + new SyncMessageDecoder(maxFrameSize)); + pipeline.addLast("syncMessageEncoder", + new SyncMessageEncoder()); + + pipeline.addLast("handler", channelHandler); + } +} diff --git a/src/main/java/org/sdnplatform/sync/internal/rpc/RPCPipelineFactory.java b/src/main/java/org/sdnplatform/sync/internal/rpc/RPCPipelineFactory.java deleted file mode 100644 index 4016711e57d8fe851f8a6aa5ad1288974f265e37..0000000000000000000000000000000000000000 --- a/src/main/java/org/sdnplatform/sync/internal/rpc/RPCPipelineFactory.java +++ /dev/null @@ -1,66 +0,0 @@ -package org.sdnplatform.sync.internal.rpc; - -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.handler.timeout.IdleStateHandler; -import org.jboss.netty.handler.timeout.ReadTimeoutHandler; -import org.jboss.netty.util.ExternalResourceReleasable; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timer; -import org.sdnplatform.sync.internal.SyncManager; - - -/** - * Pipeline factory for the sync service. - * @see SyncManager - * @author readams - */ -public class RPCPipelineFactory - implements ChannelPipelineFactory, ExternalResourceReleasable { - - protected SyncManager syncManager; - protected RPCService rpcService; - protected Timer timer; - - private static final int maxFrameSize = 512 * 1024; - - public RPCPipelineFactory(SyncManager syncManager, - RPCService rpcService) { - super(); - this.syncManager = syncManager; - this.rpcService = rpcService; - - this.timer = new HashedWheelTimer(); - } - - @Override - public ChannelPipeline getPipeline() throws Exception { - RPCChannelHandler channelHandler = - new RPCChannelHandler(syncManager, rpcService); - - IdleStateHandler idleHandler = - new IdleStateHandler(timer, 5, 10, 0); - ReadTimeoutHandler readTimeoutHandler = - new ReadTimeoutHandler(timer, 30); - - ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("idle", idleHandler); - pipeline.addLast("timeout", readTimeoutHandler); - pipeline.addLast("handshaketimeout", - new HandshakeTimeoutHandler(channelHandler, timer, 10)); - - pipeline.addLast("frameDecoder", - new ThriftFrameDecoder(maxFrameSize)); - pipeline.addLast("frameEncoder", - new ThriftFrameEncoder()); - - pipeline.addLast("handler", channelHandler); - return pipeline; - } - - @Override - public void releaseExternalResources() { - timer.stop(); - } -} diff --git a/src/main/java/org/sdnplatform/sync/internal/rpc/RPCService.java b/src/main/java/org/sdnplatform/sync/internal/rpc/RPCService.java index 60dbcbbc8e9089476db022de386e70c4bd51162f..b75a880788dd4cb930c612ab78eb6e3885d96038 100644 --- a/src/main/java/org/sdnplatform/sync/internal/rpc/RPCService.java +++ b/src/main/java/org/sdnplatform/sync/internal/rpc/RPCService.java @@ -16,19 +16,23 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.LinkedTransferQueue; +import net.floodlightcontroller.core.util.NettyUtils; import net.floodlightcontroller.core.util.SingletonTask; import net.floodlightcontroller.debugcounter.IDebugCounterService; +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.GlobalEventExecutor; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.sdnplatform.sync.internal.SyncManager; import org.sdnplatform.sync.internal.config.Node; import org.sdnplatform.sync.internal.util.Pair; @@ -58,22 +62,22 @@ public class RPCService { /** * Channel group that will hold all our channels */ - final ChannelGroup cg = new DefaultChannelGroup("Internal RPC"); + private final ChannelGroup cg = new DefaultChannelGroup("Internal RPC", GlobalEventExecutor.INSTANCE); /** - * {@link ExecutorService} used for netty boss threads + * {@link EventLoopGroup} used for netty boss threads */ - protected ExecutorService bossExecutor; + protected EventLoopGroup bossExecutor; /** - * {@link ExecutorService} used for netty worker threads + * {@link EventLoopGroup} used for netty worker threads */ - protected ExecutorService workerExecutor; + protected EventLoopGroup workerExecutor; /** * Netty {@link ClientBootstrap} used for creating client connections */ - protected ClientBootstrap clientBootstrap; + protected Bootstrap clientBootstrap; /** * Netty {@link ServerBootstrap} used for creating server connections @@ -81,9 +85,9 @@ public class RPCService { protected ServerBootstrap serverBootstrap; /** - * {@link ChannelPipelineFactory} for creating connections + * {@link RPCChannelInitializer} for creating connections */ - protected RPCPipelineFactory pipelineFactory; + protected RPCChannelInitializer pipelineFactory; /** * Node connections @@ -205,10 +209,10 @@ public class RPCService { } }; - bossExecutor = Executors.newCachedThreadPool(f2); - workerExecutor = Executors.newCachedThreadPool(f2); + bossExecutor = new NioEventLoopGroup(0, f2); + workerExecutor = new NioEventLoopGroup(0, f2); - pipelineFactory = new RPCPipelineFactory(syncManager, this); + pipelineFactory = new RPCChannelInitializer(syncManager, this); startServer(pipelineFactory); startClients(pipelineFactory); @@ -224,20 +228,15 @@ public class RPCService { logger.warn("Failed to cleanly shut down RPC server"); return; } - if (clientBootstrap != null) - clientBootstrap.releaseExternalResources(); + clientBootstrap = null; - if (serverBootstrap != null) - serverBootstrap.releaseExternalResources(); serverBootstrap = null; - if (pipelineFactory != null) - pipelineFactory.releaseExternalResources(); pipelineFactory = null; if (bossExecutor != null) - bossExecutor.shutdown(); + NettyUtils.shutdownAndWait("boss group", bossExecutor); bossExecutor = null; if (workerExecutor != null) - workerExecutor.shutdown(); + NettyUtils.shutdownAndWait("worker group", workerExecutor); workerExecutor = null; } catch (InterruptedException e) { logger.warn("Interrupted while shutting down RPC server"); @@ -422,19 +421,17 @@ public class RPCService { /** * Start listening sockets */ - protected void startServer(ChannelPipelineFactory pipelineFactory) { - final ServerBootstrap bootstrap = - new ServerBootstrap( - new NioServerSocketChannelFactory(bossExecutor, - workerExecutor)); - bootstrap.setOption("reuseAddr", true); - bootstrap.setOption("child.keepAlive", true); - bootstrap.setOption("child.tcpNoDelay", true); - bootstrap.setOption("child.sendBufferSize", SEND_BUFFER_SIZE); - bootstrap.setOption("child.receiveBufferSize", SEND_BUFFER_SIZE); - - bootstrap.setPipelineFactory(pipelineFactory); - serverBootstrap = bootstrap; + protected void startServer(RPCChannelInitializer channelInitializer) { + final ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossExecutor, workerExecutor) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_SNDBUF, SEND_BUFFER_SIZE) + .option(ChannelOption.SO_RCVBUF, SEND_BUFFER_SIZE) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT) + .childHandler(channelInitializer); int port = syncManager.getClusterConfig().getNode().getPort(); InetSocketAddress sa; @@ -445,7 +442,10 @@ public class RPCService { else sa = new InetSocketAddress(port); - cg.add(bootstrap.bind(sa)); + ChannelFuture bindFuture = bootstrap.bind(sa); + cg.add(bindFuture.channel()); + + serverBootstrap = bootstrap; logger.info("Listening for internal floodlight RPC on {}", sa); } @@ -468,12 +468,12 @@ public class RPCService { synchronized (connections) { NodeConnection c = connections.remove(node.getNodeId()); if (c != null) c.nuke(); - cf.getChannel().close(); + cf.channel().close(); } String message = "[unknown error]"; if (cf.isCancelled()) message = "Timed out on connect"; - if (cf.getCause() != null) message = cf.getCause().getMessage(); + if (cf.cause() != null) message = cf.cause().getMessage(); logger.debug("[{}->{}] Could not connect to RPC " + "node: {}", new Object[]{syncManager.getLocalNodeId(), @@ -511,17 +511,16 @@ public class RPCService { * any nodes with a lower ID so that there will be a single connection * between each pair of nodes which we'll use symmetrically */ - protected void startClients(ChannelPipelineFactory pipelineFactory) { - final ClientBootstrap bootstrap = - new ClientBootstrap( - new NioClientSocketChannelFactory(bossExecutor, - workerExecutor)); - bootstrap.setOption("child.reuseAddr", true); - bootstrap.setOption("child.keepAlive", true); - bootstrap.setOption("child.tcpNoDelay", true); - bootstrap.setOption("child.sendBufferSize", SEND_BUFFER_SIZE); - bootstrap.setOption("child.connectTimeoutMillis", CONNECT_TIMEOUT); - bootstrap.setPipelineFactory(pipelineFactory); + protected void startClients(RPCChannelInitializer channelInitializer) { + final Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(workerExecutor) + .channel(NioSocketChannel.class) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_SNDBUF, SEND_BUFFER_SIZE) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECT_TIMEOUT) + .handler(channelInitializer); clientBootstrap = bootstrap; ScheduledExecutorService ses = @@ -576,6 +575,14 @@ public class RPCService { doNodeConnect(n); } } + + /** + * Retrieve the Netty ChannelGroup + * @return + */ + protected ChannelGroup getChannelGroup() { + return cg; + } /** * Periodically ensure that all the node connections are alive @@ -617,7 +624,7 @@ public class RPCService { protected void nuke() { state = NodeConnectionState.NONE; - if (pendingFuture != null) pendingFuture.cancel(); + if (pendingFuture != null) pendingFuture.cancel(false); if (nodeChannel != null) nodeChannel.close(); pendingFuture = null; nodeChannel = null; @@ -668,4 +675,4 @@ public class RPCService { } } } -} +} \ No newline at end of file diff --git a/src/main/java/org/sdnplatform/sync/internal/rpc/SyncMessageDecoder.java b/src/main/java/org/sdnplatform/sync/internal/rpc/SyncMessageDecoder.java new file mode 100644 index 0000000000000000000000000000000000000000..c5ac20eb50bbaec865048c9050b6c86c66ac6e6e --- /dev/null +++ b/src/main/java/org/sdnplatform/sync/internal/rpc/SyncMessageDecoder.java @@ -0,0 +1,18 @@ +package org.sdnplatform.sync.internal.rpc; + +import net.floodlightcontroller.core.util.ThriftFrameDecoder; + +import org.sdnplatform.sync.thrift.SyncMessage; + +public class SyncMessageDecoder extends ThriftFrameDecoder<SyncMessage> { + + public SyncMessageDecoder(int maxSize) { + super(maxSize); + } + + @Override + protected SyncMessage allocateMessage() { + return new SyncMessage(); + } + +} diff --git a/src/main/java/org/sdnplatform/sync/internal/rpc/SyncMessageEncoder.java b/src/main/java/org/sdnplatform/sync/internal/rpc/SyncMessageEncoder.java new file mode 100644 index 0000000000000000000000000000000000000000..21e625f097b06b80721d0ea8b730c3527debb9ce --- /dev/null +++ b/src/main/java/org/sdnplatform/sync/internal/rpc/SyncMessageEncoder.java @@ -0,0 +1,9 @@ +package org.sdnplatform.sync.internal.rpc; + +import org.sdnplatform.sync.thrift.SyncMessage; + +import net.floodlightcontroller.core.util.ThriftFrameEncoder; + +public class SyncMessageEncoder extends ThriftFrameEncoder<SyncMessage> { + +} \ No newline at end of file diff --git a/src/main/java/org/sdnplatform/sync/internal/rpc/ThriftFrameDecoder.java b/src/main/java/org/sdnplatform/sync/internal/rpc/ThriftFrameDecoder.java deleted file mode 100644 index 50ec9d0f81f6613ba33555ed7366634baac2f759..0000000000000000000000000000000000000000 --- a/src/main/java/org/sdnplatform/sync/internal/rpc/ThriftFrameDecoder.java +++ /dev/null @@ -1,49 +0,0 @@ -package org.sdnplatform.sync.internal.rpc; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.thrift.protocol.TCompactProtocol; -import org.apache.thrift.transport.TIOStreamTransport; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferInputStream; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder; -import org.sdnplatform.sync.thrift.SyncMessage; - -/** - * Decode a {@link SyncMessage} from the channel - * @author readams - */ -public class ThriftFrameDecoder extends LengthFieldBasedFrameDecoder { - - public ThriftFrameDecoder(int maxSize) { - super(maxSize, 0, 4, 0, 4); - } - - @Override - protected Object decode(ChannelHandlerContext ctx, - Channel channel, - ChannelBuffer buffer) throws Exception { - List<SyncMessage> ms = null; - ChannelBuffer frame = null; - while (null != (frame = (ChannelBuffer) super.decode(ctx, channel, - buffer))) { - if (ms == null) ms = new ArrayList<SyncMessage>(); - ChannelBufferInputStream is = new ChannelBufferInputStream(frame); - TCompactProtocol thriftProtocol = - new TCompactProtocol(new TIOStreamTransport(is)); - SyncMessage bsm = new SyncMessage(); - bsm.read(thriftProtocol); - ms.add(bsm); - } - return ms; - } - - @Override - protected ChannelBuffer extractFrame(ChannelBuffer buffer, - int index, int length) { - return buffer.slice(index, length); - } -} diff --git a/src/main/java/org/sdnplatform/sync/internal/rpc/ThriftFrameEncoder.java b/src/main/java/org/sdnplatform/sync/internal/rpc/ThriftFrameEncoder.java deleted file mode 100644 index d71e8d2de8e51dd0285fa55b85c3d6e6c75bcc4e..0000000000000000000000000000000000000000 --- a/src/main/java/org/sdnplatform/sync/internal/rpc/ThriftFrameEncoder.java +++ /dev/null @@ -1,39 +0,0 @@ -package org.sdnplatform.sync.internal.rpc; - -import org.apache.thrift.protocol.TCompactProtocol; -import org.apache.thrift.transport.TIOStreamTransport; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferOutputStream; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.buffer.DynamicChannelBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; -import org.sdnplatform.sync.thrift.SyncMessage; - - -/** - * Encode a {@link SyncMessage} into the channel - * @author readams - * - */ -public class ThriftFrameEncoder extends OneToOneEncoder { - - @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, - Object message) throws Exception { - if (message instanceof SyncMessage) { - ChannelBuffer buf = new DynamicChannelBuffer(512); - ChannelBufferOutputStream os = new ChannelBufferOutputStream(buf); - TCompactProtocol thriftProtocol = - new TCompactProtocol(new TIOStreamTransport(os)); - ((SyncMessage) message).write(thriftProtocol); - - ChannelBuffer len = ChannelBuffers.buffer(4); - len.writeInt(buf.readableBytes()); - return ChannelBuffers.wrappedBuffer(len, buf); - } - return message; - } - -}