转载自易吉欢的博客
在Netty中实现重连的操作比较简单,Netty已经封装好了,我们只需要稍微扩展一下即可。
连接的操作是客户端这边执行的,重连的逻辑也得加在客户端,首先我们来看启动时要是连接不上怎么去重试
增加一个负责重试逻辑的监听器,代码如下:
import java.util.concurrent.TimeUnit; import com.netty.im.client.ImClientApp; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.EventLoop; /** * 负责监听启动时连接失败,重新连接功能 * @author yinjihuan * */ public class ConnectionListener implements ChannelFutureListener { private ImConnection imConnection = new ImConnection(); @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (!channelFuture.isSuccess()) { final EventLoop loop = channelFuture.channel().eventLoop(); loop.schedule(new Runnable() { @Override public void run() { System.err.println("服务端链接不上,开始重连操作..."); imConnection.connect(ImClientApp.HOST, ImClientApp.PORT); } }, 1L, TimeUnit.SECONDS); } else { System.err.println("服务端链接成功..."); } } }通过channelFuture.isSuccess()可以知道在连接的时候是成功了还是失败了,如果失败了我们就启动一个单独的线程来执行重新连接的操作。
只需要在ConnectionListener添加到ChannelFuture中去即可使用
public class ImConnection { private Channel channel; public Channel connect(String host, int port) { doConnect(host, port); return this.channel; } private void doConnect(String host, int port) { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // 实体类传输数据,protobuf序列化 ch.pipeline().addLast("decoder", new ProtobufDecoder(MessageProto.Message.getDefaultInstance())); ch.pipeline().addLast("encoder", new ProtobufEncoder()); ch.pipeline().addLast(new ClientPoHandlerProto()); } }); ChannelFuture f = b.connect(host, port); f.addListener(new ConnectionListener()); channel = f.channel(); } catch(Exception e) { e.printStackTrace(); } } }使用的过程中服务端突然挂了,就得用另一种方式来重连了,可以在处理数据的Handler中进行处理。
public class ClientPoHandlerProto extends ChannelInboundHandlerAdapter { private ImConnection imConnection = new ImConnection(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { MessageProto.Message message = (MessageProto.Message) msg; System.out.println("client:" + message.getContent()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.err.println("掉线了..."); //使用过程中断线重连 final EventLoop eventLoop = ctx.channel().eventLoop(); eventLoop.schedule(new Runnable() { @Override public void run() { imConnection.connect(ImClientApp.HOST, ImClientApp.PORT); } }, 1L, TimeUnit.SECONDS); super.channelInactive(ctx); } }