WebSocket服务器

WebSocket是一个基于消息的应用层协议(传输层基于TCP),而且浏览器中JavaScript脚本对于处理Json和XML是非常擅长的,所以WebSocket经常用于传输文本格式的Json或XML数据。因此,我们定义自己的协议报文其实非常简单,也不用像TCP一样考虑变长的报文拆装等问题了,直接传一个Json字符串即可。

实际上,WebSocket因为简单易用,相关框架多,在游戏、即时通讯等领域应用非常广泛。

这篇笔记中,我们介绍如何使用Netty实现服务端和浏览器端的WebSocket通信。

例子代码

这里我们实现一个和前篇一模一样的IM功能,支持消息和心跳两种Json报文格式,只不过客户端是浏览器。在服务端,我们使用com.fasterxml.jackson这个库作为Json报文的序列化和反序列化库。

服务端

WsImServer.java

package com.gacfox.demo.netty.ws.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WsImServer {

    private static Logger logger = LoggerFactory.getLogger(WsImServer.class);

    public static void main(String[] args) {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap
                    .group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new WsImServerInitializer());
            serverBootstrap
                    .bind(8080).sync()
                    .channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
        }
    }
}

WsServerInitializer.java

package com.gacfox.demo.netty.ws.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class WsImServerInitializer extends ChannelInitializer<SocketChannel> {

    private WsImServerHandler wsImServerHandler = new WsImServerHandler();

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // HTTP协议编解码器
        pipeline.addLast(new HttpServerCodec());
        // HTTP协议组装
        pipeline.addLast(new HttpObjectAggregator(65535));
        // 分块写入
        pipeline.addLast(new ChunkedWriteHandler());
        // WebSocket协议处理器
        pipeline.addLast(new WebSocketServerProtocolHandler("/", null, true));
        // 心跳超时处理器
        pipeline.addLast(new IdleStateHandler(5, 7, 10, TimeUnit.SECONDS));
        pipeline.addLast(new WsIdleEventHandler());
        // 业务逻辑处理器
        pipeline.addLast(wsImServerHandler);
    }
}

这里注意pipeline的配置,了解WebSocket协议就知道,WebSocket建立连接时握手还是基于HTTP,所以首先配置了HttpServerCodecHttpObjectAggregator这两个组件。

ChunkedWriteHandler能够实现分块写入,防止数据包太大导致JVM内存溢出。

WebSocketServerProtocolHandler用于处理WebSocket协议,第一个参数是服务端运行的地址。

关于心跳超时处理,和前一篇是一模一样的,这里就不再赘述了。

WsImServerHandler.java

package com.gacfox.demo.netty.ws.server;

import com.gacfox.demo.netty.ws.model.Sms;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ChannelHandler.Sharable
public class WsImServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    /**
     * ChannelGroup其实可以看成包含很多Channel的线程安全Set
     */
    private ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    private Logger logger = LoggerFactory.getLogger(this.getClass());
    private ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        broadcast("[" + ctx.channel().remoteAddress() + "] " + " 加入聊天");
        logger.info(ctx.channel().remoteAddress() + " 加入聊天");
        channels.add(ctx.channel());
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        broadcast("[" + ctx.channel().remoteAddress() + "] " + " 退出聊天");
        logger.info(ctx.channel().remoteAddress() + " 退出聊天");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
        Sms sms = objectMapper.readValue(frame.text(), Sms.class);
        if (sms != null) {
            if ("TYPE_SMS".equals(sms.getSmsType())) {
                broadcast("[" + ctx.channel().remoteAddress() + "] " + sms.getContent());
                logger.info(ctx.channel().remoteAddress() + " 收到消息");
            } else if ("TYPE_HEART_BEAT".equals(sms.getSmsType())) {
                echoHeartBeat(ctx.channel());
                logger.info(ctx.channel().remoteAddress() + " 收到心跳");
            }
        }
    }

    /**
     * 向一个Channel回复一个心跳包
     *
     * @param channel 通道
     */
    private void echoHeartBeat(Channel channel) {
        Sms sms = new Sms();
        sms.setId(1);
        sms.setContent("Heart beat echo!");
        sms.setSmsType("TYPE_HEART_BEAT");
        try {
            String smsStr = objectMapper.writeValueAsString(sms);
            TextWebSocketFrame frame = new TextWebSocketFrame(smsStr);
            channel.writeAndFlush(frame);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 向所有Channel广播消息
     *
     * @param message 消息字符串
     */
    private void broadcast(String message) {
        Sms sms = new Sms();
        sms.setId(1);
        sms.setContent(message);
        sms.setSmsType("TYPE_SMS");
        try {
            String smsStr = objectMapper.writeValueAsString(sms);
            TextWebSocketFrame frame = new TextWebSocketFrame(smsStr);
            channels.writeAndFlush(frame);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
}

对于WebSocket协议报文,我们需要处理的其实是Netty为我们包装的WebSocketFrame对象,而这里我们使用的是Json文本报文,因此直接使用其子类TextWebSocketFrame即可。具体逻辑非常简单,和之前其实是一样的,只不过把protobuf给换成了Json的序列化和反序列化器。

浏览器客户端

index.html

<!doctype html>
<html lang="zh-cmn-Hans">
<head>
    <meta charset="UTF-8">
    <meta name="viewport"
          content="width=device-width, user-scalable=no, initial-scale=1.0, maximum-scale=1.0, minimum-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">

    <script src="index.js"></script>
    <title>ws测试</title>
</head>
<body>
    <div id="info-block"></div>
    <div>
        <label for="reply">回复:</label>
        <textarea id="reply"></textarea>
        <button id="reply-btn">发送消息</button>
    </div>
</body>
</html>

index.js

window.onload = function () {
    var infoBlock = document.getElementById('info-block');
    var reply = document.getElementById('reply');
    var replyBtn = document.getElementById('reply-btn');

    var socket = new WebSocket('ws://127.0.0.1:8080/');
    // 收到消息时,解析json并显示
    socket.onmessage = function (msg) {
        var sms = JSON.parse(msg.data);
        if (sms.smsType === 'TYPE_SMS') {
            infoBlock.innerText += sms.content;
        }
    };
    // 发送消息时,使用序列化为json并写入socket
    replyBtn.onclick = function () {
        if (socket.readyState === WebSocket.OPEN) {
            var wsMsg = JSON.stringify({ id: 1, content: reply.value, smsType: 'TYPE_SMS' });
            socket.send(wsMsg);
        }
    };
    // 发送心跳包
    setInterval(function () {
        if (socket.readyState === WebSocket.OPEN) {
            var wsMsg = JSON.stringify({ id: 1, content: '', smsType: 'TYPE_HEART_BEAT' });
            socket.send(wsMsg);
        }
    }, 1000);
};

浏览器端就非常简单了,JavaScript天生支持Json,只需要调用WebSocket相关接口,读取消息和写入消息即可。

关于protobuf

实际上,我们也可以用WebSocket来传输protobuf序列化的二进制数据,protobuf比文本的json更加高效一些,但大多数情况Json已经足够使用了,而且文本格式方便调试,这里就不多介绍了。

关于帧长度

我们知道WebSocket虽然是基于消息的协议,而且也没有具体限制消息报文的大小,但消息也不能是无限长的,否则无论是网络可靠性,还是服务端内存,都是无法支撑的。

Netty默认WebSocket的最大消息大小是65535字节(64KB),这已经够大了,因此不建议再调的更大,客户端最好不要一次发送过大的消息,而是应在应用层上划分数据。比如传送一个较大的二进制图片,那么实际上我们可以将图片划分为若干个消息报文,每段报文注明所包含字节的偏移量,由接收端负责组装。

Netty中,如果向服务端发送了一个超过64KB的消息,WebSocket连接会立即断开并报错。

io.netty.handler.codec.http.websocketx.CorruptedWebSocketFrameException: Max frame length of 65536 has been exceeded.
    at io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder.protocolViolation(WebSocket08FrameDecoder.java:426)
    at io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder.decode(WebSocket08FrameDecoder.java:286)
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:505)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:444)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:283)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。
Copyright © 2017-2024 Gacfox All Rights Reserved.
Build with NextJS | Sitemap