Netty 是一个基于 NIO 技术的网络编程框架,底层实现了对 java 中 NIO API的封装。它基于异步事件驱动,可以快速开发高性能网络应用程序,并在可维护性方面有很好的表现
Netty 的健壮性、功能、性能、可定制性和可扩展性在同类框架中都首屈一指,它已经得到成百上千的商用项目验证,当然这些都离不开它背后的 NIO 技术,线程技术的合理应用。
基于一个时间服务器对象与时间客户端对象的通讯为案例,分析一下netty 中 NIO 的应用. 本项目采用的 netty 为 4.x 的版本,例如
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.16.Final</version> </dependency>1. 服务端创建关键步骤
1) 创建服务端启动类对象(ServerBootstrap)2) 设置线程组(Boss 线程组和 Worker 线程组)3) 设置服务端 channel 对象(NioServerSocketChannel)4) 设置 ChanelHandler 对象5) 绑定并启动端口监听(等待客户端链接)2. 服务端代码实现
创建事件服务器:
public class TimeServer { public void start() throws Exception{ NioEventLoopGroup bossGroup=new NioEventLoopGroup(); EventLoopGroup workerGroup=new NioEventLoopGroup(); try { ServerBootstrap b=new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeServerHandler()); }; }) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f=b.bind(9999).sync(); f.channel().closeFuture().sync(); }finally{ bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args)throws Exception { new TimeServer().start(); } }创建服务端时间处理器:
public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { final ByteBuf time=ctx.alloc().buffer(8); time.writeLong(System.currentTimeMillis()); final ChannelFuture f=ctx.writeAndFlush(time); f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { assert f == future; ctx.close(); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }1. 客户端创建关键步骤
1) 创建客户端启动类对象(Bootstrap)2) 设置线程组(Worker 线程组)3) 设置客户端 channel 对象(NioSocketChannel)4) 设置 ChanelHandler 对象5) 连接服务端2. 客户端代码实现
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class TimeClient { public void connect(String ip, Integer port) throws Exception { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //Bootstrap客户端用于简单建立Channel Bootstrap b = new Bootstrap(); b.group(workerGroup); //NioSocketChannel用于客户端创建Channel b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { //指定使用的数据处理方式 ch.pipeline().addLast(new TimeClientHandler()); } ; }); //客户端开始连接 ChannelFuture f = b.connect(ip, port).sync(); //等待直到这个连接被关闭 f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new TimeClient().connect("127.0.0.1", 9999); } }创建客户端时间处理器:
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; public class TimeClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf m=(ByteBuf)msg; try { long currentTimeMillis = m.readLong(); System.out.println("from server:"+new Date(currentTimeMillis)); ctx.close(); } finally { m.release(); } } }