From 126089e2b37054c3b8c60adfd4c74b257375192a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mikl=C3=B3s=20M=C3=A1rton?= Date: Thu, 24 Oct 2013 02:56:11 +0200 Subject: [PATCH] Migrated to Netty 4.0.10.final --- handshake/pom.xml | 16 +--- .../handshake/client/Client.java | 59 ++++++------ .../handshake/client/ClientHandler.java | 24 ++--- .../client/ClientHandshakeHandler.java | 92 +++++++++---------- .../handshake/common/ByteCounter.java | 54 ++++++----- .../handshake/common/HandshakeEvent.java | 32 ++----- .../handshake/common/MessageCounter.java | 32 +++---- .../handshake/server/Server.java | 67 +++++++------- .../handshake/server/ServerHandler.java | 32 +++---- .../server/ServerHandshakeHandler.java | 91 ++++++++---------- 10 files changed, 220 insertions(+), 279 deletions(-) diff --git a/handshake/pom.xml b/handshake/pom.xml index 842d2dc..57a6a9f 100644 --- a/handshake/pom.xml +++ b/handshake/pom.xml @@ -32,16 +32,6 @@ UTF-8 - - - repository.jboss.org - http://repository.jboss.org/nexus/content/groups/public/ - - false - - - - junit @@ -50,9 +40,9 @@ test - org.jboss.netty - netty - 3.2.3.Final + io.netty + netty-all + 4.0.10.Final compile diff --git a/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/client/Client.java b/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/client/Client.java index 5ec4a69..b871756 100644 --- a/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/client/Client.java +++ b/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/client/Client.java @@ -2,22 +2,18 @@ import com.biasedbit.nettytutorials.handshake.common.ByteCounter; import com.biasedbit.nettytutorials.handshake.common.MessageCounter; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; -import org.jboss.netty.handler.codec.frame.Delimiters; -import org.jboss.netty.handler.codec.string.StringDecoder; -import org.jboss.netty.handler.codec.string.StringEncoder; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.codec.Delimiters; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; import java.net.InetSocketAddress; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; /** * @author Bruno de Carvalho @@ -29,7 +25,7 @@ public class Client { private final String id; private final String serverId; private final ClientListener listener; - private ClientBootstrap bootstrap; + private Bootstrap bootstrap; private Channel connector; // constructors ----------------------------------------------------------- @@ -44,26 +40,27 @@ public Client(String id, String serverId, ClientListener listener) { public boolean start() { // Standard netty bootstrapping stuff. - Executor bossPool = Executors.newCachedThreadPool(); - Executor workerPool = Executors.newCachedThreadPool(); - ChannelFactory factory = - new NioClientSocketChannelFactory(bossPool, workerPool); - this.bootstrap = new ClientBootstrap(factory); + NioEventLoopGroup workerGroup = new NioEventLoopGroup(); + this.bootstrap = new Bootstrap() + .channel(NioSocketChannel.class) + .group(workerGroup); // Declared outside to fit under 80 char limit final DelimiterBasedFrameDecoder frameDecoder = new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()); - this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() { - public ChannelPipeline getPipeline() throws Exception { + this.bootstrap.handler(new ChannelInitializer() { + + @Override + protected void initChannel(Channel ch) throws Exception { ByteCounter byteCounter = new ByteCounter("--- CLIENT-COUNTER :: "); MessageCounter messageCounter = new MessageCounter("--- CLIENT-MSGCOUNTER :: "); ClientHandshakeHandler handshakeHandler = new ClientHandshakeHandler(id, serverId, 5000); - - return Channels.pipeline(byteCounter, + + ch.pipeline().addLast(byteCounter, frameDecoder, new StringDecoder(), new StringEncoder(), @@ -78,29 +75,29 @@ public ChannelPipeline getPipeline() throws Exception { if (!future.awaitUninterruptibly().isSuccess()) { System.out.println("--- CLIENT - Failed to connect to server at " + "localhost:12345."); - this.bootstrap.releaseExternalResources(); + workerGroup.shutdownGracefully(); return false; } - this.connector = future.getChannel(); - return this.connector.isConnected(); + this.connector = future.channel(); + return this.connector.isActive(); } public void stop() { if (this.connector != null) { this.connector.close().awaitUninterruptibly(); } - this.bootstrap.releaseExternalResources(); + this.bootstrap.group().shutdownGracefully(); System.out.println("--- CLIENT - Stopped."); } public boolean sendMessage(String message) { - if (this.connector.isConnected()) { + if (this.connector.isActive()) { // Append \n if it's not present, because of the frame delimiter if (!message.endsWith("\n")) { - this.connector.write(message + '\n'); + this.connector.writeAndFlush(message + '\n'); } else { - this.connector.write(message); + this.connector.writeAndFlush(message); } return true; } diff --git a/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/client/ClientHandler.java b/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/client/ClientHandler.java index a84426a..3e3ad2b 100644 --- a/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/client/ClientHandler.java +++ b/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/client/ClientHandler.java @@ -1,18 +1,15 @@ package com.biasedbit.nettytutorials.handshake.client; import com.biasedbit.nettytutorials.handshake.common.HandshakeEvent; -import org.jboss.netty.channel.ChannelEvent; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.concurrent.atomic.AtomicInteger; /** * @author Bruno de Carvalho */ -public class ClientHandler extends SimpleChannelUpstreamHandler { +public class ClientHandler extends ChannelInboundHandlerAdapter { // internal vars ---------------------------------------------------------- @@ -29,7 +26,7 @@ public ClientHandler(ClientListener listener) { // SimpleChannelUpstreamHandler ------------------------------------------- @Override - public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) + public void userEventTriggered(ChannelHandlerContext ctx, Object e) throws Exception { if (e instanceof HandshakeEvent) { if (((HandshakeEvent) e).isSuccessful()) { @@ -41,22 +38,21 @@ public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) return; } - super.handleUpstream(ctx, e); + super.userEventTriggered(ctx, e); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { this.counter.incrementAndGet(); - this.listener.messageReceived(e.getMessage().toString()); + this.listener.messageReceived(msg.toString()); } @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { - super.channelClosed(ctx, e); + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); out("--- CLIENT-HANDLER :: Channel closed, received " + - this.counter.get() + " messages: " + e.getChannel()); + this.counter.get() + " messages: " + ctx.channel()); } // private static helpers ------------------------------------------------- diff --git a/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/client/ClientHandshakeHandler.java b/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/client/ClientHandshakeHandler.java index 6cddb66..6d10bff 100644 --- a/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/client/ClientHandshakeHandler.java +++ b/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/client/ClientHandshakeHandler.java @@ -2,16 +2,12 @@ import com.biasedbit.nettytutorials.handshake.common.Challenge; import com.biasedbit.nettytutorials.handshake.common.HandshakeEvent; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.DownstreamMessageEvent; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelHandler; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import java.nio.channels.Channels; import java.util.ArrayDeque; import java.util.Queue; @@ -22,7 +18,7 @@ /** * @author Bruno de Carvalho */ -public class ClientHandshakeHandler extends SimpleChannelHandler { +public class ClientHandshakeHandler extends ChannelDuplexHandler { // internal vars ---------------------------------------------------------- @@ -32,7 +28,7 @@ public class ClientHandshakeHandler extends SimpleChannelHandler { private final AtomicBoolean handshakeComplete; private final AtomicBoolean handshakeFailed; private final CountDownLatch latch = new CountDownLatch(1); - private final Queue messages = new ArrayDeque(); + private final Queue messages = new ArrayDeque(); private final Object handshakeMutex = new Object(); private String challenge; @@ -50,8 +46,8 @@ public ClientHandshakeHandler(String localId, String remoteId, // SimpleChannelHandler --------------------------------------------------- @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { + public void channelRead(ChannelHandlerContext ctx, + Object msg) throws Exception { if (this.handshakeFailed.get()) { // Bail out fast if handshake already failed return; @@ -62,7 +58,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) // handler, then immediately send it upwards. // Chances are it's the last time a message passes through // this handler... - super.messageReceived(ctx, e); + super.channelRead(ctx, msg); return; } @@ -74,13 +70,13 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) } if (this.handshakeComplete.get()) { - super.messageReceived(ctx, e); + super.channelRead(ctx, msg); return; } // Parse the challenge. // Expected format is "clientId:serverId:challenge" - String[] params = ((String) e.getMessage()).trim().split(":"); + String[] params = ((String) msg).trim().split(":"); if (params.length != 3) { out("--- CLIENT-HS :: Invalid handshake: expected 3 params, " + "got " + params.length); @@ -125,12 +121,13 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) // to pass through this class's writeRequest() and be re-queued. out("--- CLIENT-HS :: " + this.messages.size() + " messages in queue to be flushed."); - for (MessageEvent message : this.messages) { - ctx.sendDownstream(message); + for (Object message : this.messages) { + ctx.write(message); } + ctx.flush(); // Remove this handler from the pipeline; its job is finished. - ctx.getPipeline().remove(this); + ctx.pipeline().remove(this); // Finally fire success message upwards. this.fireHandshakeSucceeded(this.remoteId, ctx); @@ -138,13 +135,20 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) } @Override - public void channelConnected(final ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception { + public void channelActive(final ChannelHandlerContext ctx) + throws Exception { out("--- CLIENT-HS :: Outgoing connection established to: " + - e.getChannel().getRemoteAddress()); + ctx.channel().remoteAddress()); // Write the handshake & add a timeout listener. - ChannelFuture f = Channels.future(ctx.getChannel()); + this.challenge = Challenge.generateChallenge(); + String handshake = + this.localId + ':' + this.remoteId + ':' + challenge + '\n'; + + // We need to write to ctx directly rather than call + // ctx.channel().write() otherwise the message would pass through this + // class's write() method defined below. + ChannelFuture f = ctx.writeAndFlush(handshake); f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) @@ -213,23 +217,10 @@ public void run() { }.start(); } }); - - this.challenge = Challenge.generateChallenge(); - String handshake = - this.localId + ':' + this.remoteId + ':' + challenge + '\n'; - Channel c = ctx.getChannel(); - // Passing null as remoteAddress, since constructor in - // DownstreamMessageEvent will use remote address from the channel if - // remoteAddress is null. - // Also, we need to send the data directly downstream rather than - // call c.write() otherwise the message would pass through this - // class's writeRequested() method defined below. - ctx.sendDownstream(new DownstreamMessageEvent(c, f, handshake, null)); } @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { out("--- CLIENT-HS :: Channel closed."); if (!this.handshakeComplete.get()) { this.fireHandshakeFailed(ctx); @@ -237,13 +228,13 @@ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { out("--- CLIENT-HS :: Exception caught."); - e.getCause().printStackTrace(); - if (e.getChannel().isConnected()) { + cause.printStackTrace(); + if (ctx.channel().isActive()) { // Closing the channel will trigger handshake failure. - e.getChannel().close(); + ctx.channel().close(); } else { // Channel didn't open, so we must fire handshake failure directly. this.fireHandshakeFailed(ctx); @@ -251,8 +242,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) } @Override - public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, + ChannelPromise promise) throws Exception { // Before doing anything, ensure that noone else is working by // acquiring a lock on the handshakeMutex. synchronized (this.handshakeMutex) { @@ -266,13 +257,13 @@ public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) // them downwards. if (this.handshakeComplete.get()) { out("--- CLIENT-HS :: Handshake already completed, not " + - "appending '" + e.getMessage().toString().trim() + + "appending '" + msg.toString().trim() + "' to queue!"); - super.writeRequested(ctx, e); + super.write(ctx, msg, promise); } else { // Otherwise, queue messages in order until the handshake // completes. - this.messages.offer(e); + this.messages.offer(msg); } } } @@ -289,8 +280,8 @@ private void fireHandshakeFailed(ChannelHandlerContext ctx) { this.handshakeComplete.set(true); this.handshakeFailed.set(true); this.latch.countDown(); - ctx.getChannel().close(); - ctx.sendUpstream(HandshakeEvent.handshakeFailed(ctx.getChannel())); + ctx.channel().close(); + ctx.fireUserEventTriggered(HandshakeEvent.handshakeFailed()); } private void fireHandshakeSucceeded(String server, @@ -298,7 +289,6 @@ private void fireHandshakeSucceeded(String server, this.handshakeComplete.set(true); this.handshakeFailed.set(false); this.latch.countDown(); - ctx.sendUpstream(HandshakeEvent - .handshakeSucceeded(server, ctx.getChannel())); + ctx.fireUserEventTriggered(HandshakeEvent.handshakeSucceeded(server)); } } diff --git a/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/common/ByteCounter.java b/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/common/ByteCounter.java index bbe2a3d..8cff910 100644 --- a/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/common/ByteCounter.java +++ b/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/common/ByteCounter.java @@ -1,18 +1,17 @@ package com.biasedbit.nettytutorials.handshake.common; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.channel.WriteCompletionEvent; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufHolder; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import java.util.concurrent.atomic.AtomicLong; /** * @author Bruno de Carvalho */ -public class ByteCounter extends SimpleChannelUpstreamHandler { +public class ByteCounter extends ChannelDuplexHandler { // internal vars ---------------------------------------------------------- @@ -29,30 +28,41 @@ public ByteCounter(String id) { } // SimpleChannelUpstreamHandler ------------------------------------------- - + @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (e.getMessage() instanceof ChannelBuffer) { - this.readBytes.addAndGet(((ChannelBuffer) e.getMessage()) - .readableBytes()); + long size; + if (msg instanceof ByteBuf) { + size = ((ByteBuf) msg).readableBytes(); + } else if (msg instanceof ByteBufHolder) { + size = ((ByteBufHolder) msg).content().readableBytes(); + } else { + size = -1; } - - super.messageReceived(ctx, e); + readBytes.addAndGet(size); + ctx.fireChannelRead(msg); } @Override - public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) - throws Exception { - super.writeComplete(ctx, e); - this.writtenBytes.addAndGet(e.getWrittenAmount()); + public void write(ChannelHandlerContext ctx, Object msg, + ChannelPromise promise) throws Exception { + long size; + if (msg instanceof ByteBuf) { + size = ((ByteBuf) msg).writableBytes(); + } else if (msg instanceof ByteBufHolder) { + size = ((ByteBufHolder) msg).content().writableBytes(); + } else { + size = -1; + } + writtenBytes.addAndGet(size); + ctx.write(msg, promise); } @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { - super.channelClosed(ctx, e); - System.out.println(this.id + ctx.getChannel() + " -> sent: " + + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelInactive(); + System.out.println(this.id + ctx.channel() + " -> sent: " + this.getWrittenBytes() + "b, recv: " + this.getReadBytes() + "b"); } diff --git a/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/common/HandshakeEvent.java b/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/common/HandshakeEvent.java index 77ff980..7ab1bcf 100644 --- a/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/common/HandshakeEvent.java +++ b/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/common/HandshakeEvent.java @@ -1,50 +1,30 @@ package com.biasedbit.nettytutorials.handshake.common; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelEvent; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.Channels; - /** * @author Bruno de Carvalho */ -public class HandshakeEvent implements ChannelEvent { +public class HandshakeEvent { // internal vars ---------------------------------------------------------- private final boolean successful; private final String remoteId; - private final Channel channel; // constructors ----------------------------------------------------------- - private HandshakeEvent(String remoteId, Channel channel) { + private HandshakeEvent(String remoteId) { this.remoteId = remoteId; this.successful = remoteId != null; - this.channel = channel; } // public static methods -------------------------------------------------- - public static HandshakeEvent handshakeSucceeded(String remoteId, - Channel channel) { - return new HandshakeEvent(remoteId, channel); - } - - public static HandshakeEvent handshakeFailed(Channel channel) { - return new HandshakeEvent(null, channel); - } - - // ChannelEvent ----------------------------------------------------------- - - @Override - public Channel getChannel() { - return this.channel; + public static HandshakeEvent handshakeSucceeded(String remoteId) { + return new HandshakeEvent(remoteId); } - @Override - public ChannelFuture getFuture() { - return Channels.succeededFuture(this.channel); + public static HandshakeEvent handshakeFailed() { + return new HandshakeEvent(null); } // getters & setters ------------------------------------------------------ diff --git a/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/common/MessageCounter.java b/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/common/MessageCounter.java index 11368e2..98ecb85 100644 --- a/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/common/MessageCounter.java +++ b/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/common/MessageCounter.java @@ -1,16 +1,15 @@ package com.biasedbit.nettytutorials.handshake.common; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelHandler; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; import java.util.concurrent.atomic.AtomicLong; /** * @author Bruno de Carvalho */ -public class MessageCounter extends SimpleChannelHandler { +public class MessageCounter extends ChannelDuplexHandler { // internal vars ---------------------------------------------------------- @@ -29,24 +28,23 @@ public MessageCounter(String id) { // SimpleChannelHandler --------------------------------------------------- @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { - this.readMessages.incrementAndGet(); - super.messageReceived(ctx, e); + public void channelRead(ChannelHandlerContext ctx, + Object msg) throws Exception { + readMessages.incrementAndGet(); + ctx.fireChannelRead(msg); } @Override - public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { - this.writtenMessages.incrementAndGet(); - super.writeRequested(ctx, e); + public void write(ChannelHandlerContext ctx, Object msg, + ChannelPromise promise) throws Exception { + writtenMessages.incrementAndGet(); + ctx.write(msg, promise); } @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { - super.channelClosed(ctx, e); - System.out.println(this.id + ctx.getChannel() + " -> sent: " + + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelInactive(); + System.out.println(this.id + ctx.channel() + " -> sent: " + this.getWrittenMessages() + ", recv: " + this.getReadMessages()); } diff --git a/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/server/Server.java b/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/server/Server.java index 90c9f90..eb7c1db 100644 --- a/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/server/Server.java +++ b/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/server/Server.java @@ -2,24 +2,21 @@ import com.biasedbit.nettytutorials.handshake.common.ByteCounter; import com.biasedbit.nettytutorials.handshake.common.MessageCounter; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelHandler; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; -import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; -import org.jboss.netty.handler.codec.frame.Delimiters; -import org.jboss.netty.handler.codec.string.StringDecoder; -import org.jboss.netty.handler.codec.string.StringEncoder; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.codec.Delimiters; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.util.concurrent.GlobalEventExecutor; import java.net.InetSocketAddress; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; /** * @author Bruno de Carvalho @@ -45,30 +42,31 @@ public Server(String id, ServerListener listener) { public boolean start() { // Pretty standard Netty startup stuff... // boss/worker executors, channel factory, channel group, pipeline, ... - Executor bossPool = Executors.newCachedThreadPool(); - Executor workerPool = Executors.newCachedThreadPool(); - ChannelFactory factory = - new NioServerSocketChannelFactory(bossPool, workerPool); - this.bootstrap = new ServerBootstrap(factory); + NioEventLoopGroup bossGroup = new NioEventLoopGroup(); + NioEventLoopGroup workerGroup = new NioEventLoopGroup(); + this.bootstrap = new ServerBootstrap() + .channel(NioServerSocketChannel.class) + .group(bossGroup, workerGroup); - this.channelGroup = new DefaultChannelGroup(this.id + "-all-channels"); + this.channelGroup = new DefaultChannelGroup(this.id + "-all-channels", + GlobalEventExecutor.INSTANCE); - // declared here to fit under the 80 char limit - final ChannelHandler delimiter = - new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, - Delimiters.lineDelimiter()); - this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() { + this.bootstrap.childHandler(new ChannelInitializer() { @Override - public ChannelPipeline getPipeline() throws Exception { + protected void initChannel(Channel ch) throws Exception { ByteCounter counter = new ByteCounter("+++ SERVER-COUNTER :: "); + ChannelHandler delimiter = + new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, + Delimiters.lineDelimiter()); MessageCounter messageCounter = new MessageCounter("+++ SERVER-MSGCOUNTER :: "); ServerHandshakeHandler handshakeHandler = new ServerHandshakeHandler(id, channelGroup, 5000); - return Channels.pipeline(counter, + + ch.pipeline().addLast(counter, delimiter, new StringDecoder(), new StringEncoder(), @@ -78,21 +76,24 @@ public ChannelPipeline getPipeline() throws Exception { } }); - Channel acceptor = this.bootstrap.bind(new InetSocketAddress(12345)); - if (acceptor.isBound()) { + Channel acceptor = this.bootstrap.bind(new InetSocketAddress(12345)) + .awaitUninterruptibly().channel(); + if (acceptor.isActive()) { System.err.println("+++ SERVER - bound to *:12345"); this.channelGroup.add(acceptor); return true; } else { System.err.println("+++ SERVER - Failed to bind to *:12345"); - this.bootstrap.releaseExternalResources(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); return false; } } public void stop() { this.channelGroup.close().awaitUninterruptibly(); - this.bootstrap.releaseExternalResources(); + this.bootstrap.group().shutdownGracefully(); + this.bootstrap.childGroup().shutdownGracefully(); System.err.println("+++ SERVER - Stopped."); } } diff --git a/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/server/ServerHandler.java b/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/server/ServerHandler.java index 46220ec..4b46195 100644 --- a/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/server/ServerHandler.java +++ b/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/server/ServerHandler.java @@ -1,19 +1,16 @@ package com.biasedbit.nettytutorials.handshake.server; import com.biasedbit.nettytutorials.handshake.common.HandshakeEvent; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelEvent; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.concurrent.atomic.AtomicInteger; /** * @author Bruno de Carvalho */ -public class ServerHandler extends SimpleChannelUpstreamHandler { +public class ServerHandler extends ChannelInboundHandlerAdapter { // internal vars ---------------------------------------------------------- @@ -32,14 +29,14 @@ public ServerHandler(ServerListener listener) { // SimpleChannelUpstreamHandler ------------------------------------------- @Override - public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) + public void userEventTriggered(ChannelHandlerContext ctx, Object e) throws Exception { if (e instanceof HandshakeEvent) { if (((HandshakeEvent) e).isSuccessful()) { out("+++ SERVER-HANDLER :: Handshake successful, connection " + "to " + ((HandshakeEvent) e).getRemoteId() + " is up."); this.remoteId = ((HandshakeEvent) e).getRemoteId(); - this.channel = ctx.getChannel(); + this.channel = ctx.channel(); // Notify the listener that a new connection is now READY this.listener.connectionOpen(this); } else { @@ -48,31 +45,30 @@ public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) return; } - super.handleUpstream(ctx, e); + super.userEventTriggered(ctx, e); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { this.counter.incrementAndGet(); - this.listener.messageReceived(this, e.getMessage().toString()); + this.listener.messageReceived(this, msg.toString()); } @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { - super.channelClosed(ctx, e); + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); out("+++ SERVER-HANDLER :: Channel closed, received " + - this.counter.get() + " messages: " + e.getChannel()); + this.counter.get() + " messages: " + ctx.channel()); } // public methods --------------------------------------------------------- public void sendMessage(String message) { if (!message.endsWith("\n")) { - this.channel.write(message + '\n'); + this.channel.writeAndFlush(message + '\n'); } else { - this.channel.write(message); + this.channel.writeAndFlush(message); } } diff --git a/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/server/ServerHandshakeHandler.java b/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/server/ServerHandshakeHandler.java index de913cc..a82bd50 100644 --- a/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/server/ServerHandshakeHandler.java +++ b/handshake/src/main/java/com/biasedbit/nettytutorials/handshake/server/ServerHandshakeHandler.java @@ -2,18 +2,11 @@ import com.biasedbit.nettytutorials.handshake.common.Challenge; import com.biasedbit.nettytutorials.handshake.common.HandshakeEvent; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.DownstreamMessageEvent; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelHandler; -import org.jboss.netty.channel.group.ChannelGroup; - -import java.net.SocketAddress; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.group.ChannelGroup; + import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.CountDownLatch; @@ -23,7 +16,7 @@ /** * @author Bruno de Carvalho */ -public class ServerHandshakeHandler extends SimpleChannelHandler { +public class ServerHandshakeHandler extends ChannelDuplexHandler { // internal vars ---------------------------------------------------------- @@ -33,7 +26,7 @@ public class ServerHandshakeHandler extends SimpleChannelHandler { private final AtomicBoolean handshakeComplete; private final AtomicBoolean handshakeFailed; private final Object handshakeMutex = new Object(); - private final Queue messages = new ArrayDeque(); + private final Queue messages = new ArrayDeque(); private final CountDownLatch latch = new CountDownLatch(1); // constructors ----------------------------------------------------------- @@ -50,7 +43,7 @@ public ServerHandshakeHandler(String localId, ChannelGroup group, // SimpleChannelHandler --------------------------------------------------- @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (this.handshakeFailed.get()) { // Bail out fast if handshake already failed @@ -60,7 +53,7 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) if (this.handshakeComplete.get()) { // If handshake succeeded but message still came through this // handler, then immediately send it upwards. - super.messageReceived(ctx, e); + super.channelRead(ctx, msg); return; } @@ -72,12 +65,12 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) } if (this.handshakeComplete.get()) { - super.messageReceived(ctx, e); + super.channelRead(ctx, msg); return; } // Validate handshake - String handshake = (String) e.getMessage(); + String handshake = (String) msg; // 1. Validate expected clientId:serverId:challenge format String[] params = handshake.trim().split(":"); if (params.length != 3) { @@ -109,31 +102,32 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) "removing handshake handler from pipeline."); String response = params[0] + ':' + params[1] + ':' + Challenge.generateResponse(params[2]) + '\n'; - this.writeDownstream(ctx, response); + ctx.write(response); // Flush any pending messages (in this tutorial, no messages will // ever be queued because the server does not take the initiative // of sending messages to clients on its own... out("+++ SERVER-HS :: " + this.messages.size() + " messages in queue to be flushed."); - for (MessageEvent message : this.messages) { - ctx.sendDownstream(message); + for (Object message : this.messages) { + ctx.write(message); } + ctx.flush(); // Finally, remove this handler from the pipeline and fire success // event up the pipeline. out("+++ SERVER-HS :: Removing handshake handler from pipeline."); - ctx.getPipeline().remove(this); + ctx.pipeline().remove(this); this.fireHandshakeSucceeded(client, ctx); } } @Override - public void channelConnected(final ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception { - this.group.add(ctx.getChannel()); + public void channelActive(final ChannelHandlerContext ctx) + throws Exception { + this.group.add(ctx.channel()); out("+++ SERVER-HS :: Incoming connection established from: " + - e.getChannel().getRemoteAddress()); + ctx.channel().remoteAddress()); // Fire up the handshake handler timeout checker. // Wait X seconds for the handshake then disconnect. @@ -171,10 +165,10 @@ public void run() { if (!handshakeComplete.get()) { out("+++ SERVER-HS :: (synchro) Handshake timeout " + "checker: timed out, killing connection."); - ctx.sendUpstream(HandshakeEvent - .handshakeFailed(ctx.getChannel())); + ctx.fireUserEventTriggered(HandshakeEvent + .handshakeFailed()); handshakeFailed.set(true); - ctx.getChannel().close(); + ctx.channel().close(); } else { out("+++ SERVER-HS :: (synchro) Handshake timeout " + "checker: discarded (handshake OK)"); @@ -185,8 +179,7 @@ public void run() { } @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { out("+++ SERVER-HS :: Channel closed."); if (!this.handshakeComplete.get()) { this.fireHandshakeFailed(ctx); @@ -194,13 +187,13 @@ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { out("+++ SERVER-HS :: Exception caught."); - e.getCause().printStackTrace(); - if (e.getChannel().isConnected()) { + cause.printStackTrace(); + if (ctx.channel().isActive()) { // Closing the channel will trigger handshake failure. - e.getChannel().close(); + ctx.channel().close(); } else { // Channel didn't open, so we must fire handshake failure directly. this.fireHandshakeFailed(ctx); @@ -209,8 +202,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) @Override - public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, + ChannelPromise promise) throws Exception { // Before doing anything, ensure that noone else is working by // acquiring a lock on the handshakeMutex. synchronized (this.handshakeMutex) { @@ -224,13 +217,13 @@ public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) // them downwards. if (this.handshakeComplete.get()) { out("+++ SERVER-HS :: Handshake already completed, not " + - "appending '" + e.getMessage().toString().trim() + + "appending '" + msg.toString().trim() + "' to queue!"); - super.writeRequested(ctx, e); + super.write(ctx, msg, promise); } else { // Otherwise, queue messages in order until the handshake // completes. - this.messages.offer(e); + this.messages.offer(msg); } } } @@ -243,22 +236,12 @@ private static void out(String s) { // private helpers -------------------------------------------------------- - private void writeDownstream(ChannelHandlerContext ctx, Object data) { - // Just declaring these variables so that last statement in this - // method fits inside the 80 char limit... I typically use 120 :) - ChannelFuture f = Channels.succeededFuture(ctx.getChannel()); - SocketAddress address = ctx.getChannel().getRemoteAddress(); - Channel c = ctx.getChannel(); - - ctx.sendDownstream(new DownstreamMessageEvent(c, f, data, address)); - } - private void fireHandshakeFailed(ChannelHandlerContext ctx) { this.handshakeComplete.set(true); this.handshakeFailed.set(true); this.latch.countDown(); - ctx.getChannel().close(); - ctx.sendUpstream(HandshakeEvent.handshakeFailed(ctx.getChannel())); + ctx.channel().close(); + ctx.fireUserEventTriggered(HandshakeEvent.handshakeFailed()); } private void fireHandshakeSucceeded(String client, @@ -266,7 +249,7 @@ private void fireHandshakeSucceeded(String client, this.handshakeComplete.set(true); this.handshakeFailed.set(false); this.latch.countDown(); - ctx.sendUpstream(HandshakeEvent - .handshakeSucceeded(client, ctx.getChannel())); + ctx.fireUserEventTriggered(HandshakeEvent + .handshakeSucceeded(client)); } }