基于TCP的长连接服务器,为了时刻保持服务端到客户端在“传输层”这一抽象概念上的连通性,最常采用的是心跳包技术。当检测到连接意外断开,客户端需要主动尝试重连,恢复连接。如果最终无法连接,再做出相应的错误处理。
注:本篇笔记基于上一篇基于TCP的服务端和客户端代码。
一般来说,心跳包是由客户端定时发给服务端的,由服务端对客户端是否还在线做出判断。Netty中,心跳包具有三种类型的超时时间:
Channel
读到数据,触发该超时Channel
写数据,触发该超时Channel
读也没有向Channel
写,触发该超时IdleStateHandler
这个处理器已经帮我们封装好了对三种超时方式的判断。
下面服务端代码中,我们配置IdleStateHandler
,设定三个超时时间参数,读超时5秒,写超时7秒,读写超时10秒。
TcpServerInitializer.java
package com.gacfox.demo.netty.tcp.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import java.util.concurrent.TimeUnit;
public class TcpServerInitializer extends ChannelInitializer<SocketChannel> {
public static final int MAX_FRAME_LENGTH = 1024;
private TcpServerHandler tcpServerHandler = new TcpServerHandler();
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast(new IdleStateHandler(5, 7, 10, TimeUnit.SECONDS));
pipeline.addLast(new IdleEventHandler());
pipeline.addLast(tcpServerHandler);
}
}
IdleEventHandler
是我们自定义的一个处理器,它的作用是用来捕获超时事件。我们处理超时的逻辑很简单,将Channel
断开即可。
IdleEventHandler.java
package com.gacfox.demo.netty.tcp.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IdleEventHandler extends ChannelInboundHandlerAdapter {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
String evtType;
switch (idleStateEvent.state()) {
case READER_IDLE:
evtType = "读超时";
break;
case WRITER_IDLE:
evtType = "写超时";
break;
case ALL_IDLE:
evtType = "读写超时";
break;
default:
evtType = "未知原因";
break;
}
ctx.channel().close();
logger.info(ctx.channel().remoteAddress()+ " 通道关闭 " + evtType);
}
}
}
注意,这里我们继承的是ChannelInboundHandlerAdapter
,这便于我们重写userEventTriggered()
方法,上一步超时处理器会以传入事件的形式,回调该方法。我们可以通过读取方法的evt
参数,得知超时事件的类型。
此外还要注意,TcpServerInitializer
中处理器的消息传递问题,ChannelInboundHandlerAdapter
默认收到消息的处理机制是ctx.fireChannelRead(msg);
,它默认会将消息传递给下一个处理器,因此我们的TcpServerHandler
才能正确的收到消息。
客户端需要定时向服务端发送心跳包。实际上,对于这类需求,我们一般会基于TCP设计一套应用层协议,协议规定了通用的数据包报文格式,心跳包会作为报文的一种。我们这里简单起见,就不搞那么复杂了,直接发送一个字符串heart beat
。
private void pingServer(Channel channel) {
// 发送心跳
channel.eventLoop().scheduleAtFixedRate(() -> {
if (channel.isActive()) {
// 通道仍连通,发送心跳
// 这里只是测试,因此用字符串'heart beat'代表心跳包内容
channel.writeAndFlush("heart beat");
}
}, 1,1, TimeUnit.SECONDS);
}
代码可能和你想象的不太一样,我们没有单起一个线程,sleep()
一段时间向Channel
写一段数据。这是因为Netty中对很多操作做了封装,我们这里使用一个执行器来定时执行我们的操作(实际上Java本身也具有该功能,但是Netty封装的执行器能更好的和Netty整合)。
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
// channel注册上时设置到对象成员变量,供输入接收线程使用
channel = ctx.channel();
pingServer(channel);
}
pingServer()
方法在Channel
注册时将首次调用。
一般来说,服务端收到心跳包都会再响应一个心跳包,客户端如果很久都没收到来自服务端的任何响应,就要考虑自己是否是掉线了。
我们之前在服务端使用的IdleStateHandler
也同样可以用于客户端,这里就不多介绍了。
断线重连的逻辑主要还是由客户端实现的。当捕获到Channel
未预期的突然关闭,或者发出的心跳包很久都没有收到响应,就要考虑重新和服务端建立连接。
当然,重连也是有一定策略的,最简单的策略:侦测到断线后,每隔5秒重试一次,共重试3次,如果仍不能重新建立连接,就把网络错误报告给用户。
如果使用Netty开发客户端,这里要注意的一个问题就是断线的检测,让我纠结了很久的一个错误做法是在处理器中的channelUnregistered
实现重连逻辑。因为重连时,无论客户端是否能连上,都会经历channelRegistered -> channelUnregistered
这个过程。正确的做法是为客户端连接返回的Future
对象设置监听器。
/**
* 重试次数
*/
public static AtomicInteger retries = new AtomicInteger(1);
/**
* 最大重试次数
*/
public static final int MAX_RETRIES = 3;
/**
* 是否已连接
*/
public static AtomicBoolean connected = new AtomicBoolean(false);
我们代码中有这么几个用来记录连接状态的变量:
retries
:记录已重试次数MAX_RETRIES
:记录最大的重试次数connected
:记录客户端应用是否已连接ChannelFuture future = bootstrap.connect(HOST, PORT);
future.addListener(future1 -> {
if (future.isSuccess()) {
connected.set(true);
retries.set(1);
} else {
connected.set(false);
System.out.println("出错:" + future1.cause().getMessage());
}
});
上面代码中,我们可以通过future.isSuccess
判断连接是否成功,如果连接成功,就将应用状态设置为已连接,重试变量设为1
。除上述代码外,我们还有一个额外的线程用来监控连接状态,一旦状态变为未连接,就开始重试。
// 进入断线监控循环
while (true) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (!connected.get() && retries.getAndIncrement() <= MAX_RETRIES) {
// 断线,停止监控循环,重新connect次
System.out.println("第" + (retries.get() - 1) + "次重试...");
connect();
break;
} else if(retries.get() > MAX_RETRIES) {
System.out.println("不行了。。。");
System.exit(-1);
}
}
我们可以在主线程的最后,添加上述逻辑,或者新开一个线程。
当然,客户端可能并不是Netty写的,那就需要使用对应的应用框架实现相应的逻辑,这里不多作介绍了。