心跳检测和断线重连

基于TCP的长连接服务器,为了时刻保持服务端到客户端在“传输层”这一抽象概念上的连通性,最常采用的是心跳包技术。当检测到连接意外断开,客户端需要主动尝试重连,恢复连接。如果最终无法连接,再做出相应的错误处理。

注:本篇笔记基于上一篇基于TCP的服务端和客户端代码。

心跳检测

一般来说,心跳包是由客户端定时发给服务端的,由服务端对客户端是否还在线做出判断。Netty中,心跳包具有三种类型的超时时间:

  • 读超时:在规定时间内服务端没有从Channel读到数据,触发该超时
  • 写超时:在规定时间内服务端没有向Channel写数据,触发该超时
  • 读写超时:在规定时间内既没有从Channel读也没有向Channel写,触发该超时

IdleStateHandler这个处理器已经帮我们封装好了对三种超时方式的判断。

服务端超时处理器

下面服务端代码中,我们配置IdleStateHandler,设定三个超时时间参数,读超时5秒,写超时7秒,读写超时10秒。

TcpServerInitializer.java

package com.gacfox.demo.netty.tcp.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;

import java.util.concurrent.TimeUnit;

public class TcpServerInitializer extends ChannelInitializer<SocketChannel> {

    public static final int MAX_FRAME_LENGTH = 1024;
    private TcpServerHandler tcpServerHandler = new TcpServerHandler();

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, 4, 0, 4));
        pipeline.addLast(new LengthFieldPrepender(4));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new IdleStateHandler(5, 7, 10, TimeUnit.SECONDS));
        pipeline.addLast(new IdleEventHandler());
        pipeline.addLast(tcpServerHandler);
    }
}

IdleEventHandler是我们自定义的一个处理器,它的作用是用来捕获超时事件。我们处理超时的逻辑很简单,将Channel断开即可。

IdleEventHandler.java

package com.gacfox.demo.netty.tcp.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IdleEventHandler extends ChannelInboundHandlerAdapter {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            String evtType;
            switch (idleStateEvent.state()) {
                case READER_IDLE:
                    evtType = "读超时";
                    break;
                case WRITER_IDLE:
                    evtType = "写超时";
                    break;
                case ALL_IDLE:
                    evtType = "读写超时";
                    break;
                default:
                    evtType = "未知原因";
                    break;
            }
            ctx.channel().close();
            logger.info(ctx.channel().remoteAddress()+ " 通道关闭 " + evtType);
        }
    }
}

注意,这里我们继承的是ChannelInboundHandlerAdapter,这便于我们重写userEventTriggered()方法,上一步超时处理器会以传入事件的形式,回调该方法。我们可以通过读取方法的evt参数,得知超时事件的类型。

此外还要注意,TcpServerInitializer中处理器的消息传递问题,ChannelInboundHandlerAdapter默认收到消息的处理机制是ctx.fireChannelRead(msg);,它默认会将消息传递给下一个处理器,因此我们的TcpServerHandler才能正确的收到消息。

客户端实现定时心跳

客户端需要定时向服务端发送心跳包。实际上,对于这类需求,我们一般会基于TCP设计一套应用层协议,协议规定了通用的数据包报文格式,心跳包会作为报文的一种。我们这里简单起见,就不搞那么复杂了,直接发送一个字符串heart beat

private void pingServer(Channel channel) {
    // 发送心跳
    channel.eventLoop().scheduleAtFixedRate(() -> {
        if (channel.isActive()) {
            // 通道仍连通,发送心跳
            // 这里只是测试,因此用字符串'heart beat'代表心跳包内容
            channel.writeAndFlush("heart beat");
        }
    }, 1,1, TimeUnit.SECONDS);
}

代码可能和你想象的不太一样,我们没有单起一个线程,sleep()一段时间向Channel写一段数据。这是因为Netty中对很多操作做了封装,我们这里使用一个执行器来定时执行我们的操作(实际上Java本身也具有该功能,但是Netty封装的执行器能更好的和Netty整合)。

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    super.channelRegistered(ctx);
    // channel注册上时设置到对象成员变量,供输入接收线程使用
    channel = ctx.channel();

    pingServer(channel);
}

pingServer()方法在Channel注册时将首次调用。

客户端超时处理器

一般来说,服务端收到心跳包都会再响应一个心跳包,客户端如果很久都没收到来自服务端的任何响应,就要考虑自己是否是掉线了。

我们之前在服务端使用的IdleStateHandler也同样可以用于客户端,这里就不多介绍了。

断线重连

断线重连的逻辑主要还是由客户端实现的。当捕获到Channel未预期的突然关闭,或者发出的心跳包很久都没有收到响应,就要考虑重新和服务端建立连接。

当然,重连也是有一定策略的,最简单的策略:侦测到断线后,每隔5秒重试一次,共重试3次,如果仍不能重新建立连接,就把网络错误报告给用户。

如果使用Netty开发客户端,这里要注意的一个问题就是断线的检测,让我纠结了很久的一个错误做法是在处理器中的channelUnregistered实现重连逻辑。因为重连时,无论客户端是否能连上,都会经历channelRegistered -> channelUnregistered这个过程。正确的做法是为客户端连接返回的Future对象设置监听器。

/**
* 重试次数
*/
public static AtomicInteger retries = new AtomicInteger(1);
/**
* 最大重试次数
*/
public static final int MAX_RETRIES = 3;
/**
* 是否已连接
*/
public static AtomicBoolean connected = new AtomicBoolean(false);

我们代码中有这么几个用来记录连接状态的变量:

  • retries:记录已重试次数
  • MAX_RETRIES:记录最大的重试次数
  • connected:记录客户端应用是否已连接
ChannelFuture future = bootstrap.connect(HOST, PORT);
future.addListener(future1 -> {
    if (future.isSuccess()) {
        connected.set(true);
        retries.set(1);
    } else {
        connected.set(false);
        System.out.println("出错:" + future1.cause().getMessage());
    }
});

上面代码中,我们可以通过future.isSuccess判断连接是否成功,如果连接成功,就将应用状态设置为已连接,重试变量设为1。除上述代码外,我们还有一个额外的线程用来监控连接状态,一旦状态变为未连接,就开始重试。

// 进入断线监控循环
while (true) {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if (!connected.get() && retries.getAndIncrement() <= MAX_RETRIES) {
        // 断线,停止监控循环,重新connect次
        System.out.println("第" + (retries.get() - 1) + "次重试...");
        connect();
        break;
    } else if(retries.get() > MAX_RETRIES) {
        System.out.println("不行了。。。");
        System.exit(-1);
    }
}

我们可以在主线程的最后,添加上述逻辑,或者新开一个线程。

当然,客户端可能并不是Netty写的,那就需要使用对应的应用框架实现相应的逻辑,这里不多作介绍了。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。
Copyright © 2017-2024 Gacfox All Rights Reserved.
Build with NextJS | Sitemap