基于TCP传输二进制报文

上一篇笔记中我们实现了简单的TCP服务器和客户端,但我们仍遗留了一个问题,之前我们试图使用TCP协议收发“消息”,但可能会出现多条消息黏在一起或一条消息被拆开处理的情况,这篇笔记我们通过设计消息分割机制来解决这个问题。

什么是流传输协议

TCP是一种流传输协议,以读数据为例,操作系统网络协议栈的传输层收到数据后,数据可能会进入缓冲区,何时将缓冲区内的数据交给应用程序处理则跟协议栈的具体实现有关,它可以理解为设计上就是不确定的,我们只能以流的方式从连接中连续读取数据,没有数据就阻塞等待,发送一条“消息”在接收方则不一定是收到一条单一、完整的“消息”,此时收发消息时就可能会出现多条消息黏在一起或一条消息被拆开处理的情况。

有一些人喜欢将这种现象称为TCP的“粘包”和“拆包”问题,但我个人认为这个说法极具误导性,因为TCP本身就是一种流传输协议,协议中根本不存在“包”这个概念,所谓的“粘包”和“拆包”也就无从谈起了。我们想要传输的“包”这个东西,其实本质上是需要我们基于TCP之上再设计一种应用层协议,这个应用层协议才是以“包”作为数据的传输单元。为了避免误导大家,后文不会再出现“粘包”、“拆包”字样。

包分割机制

传输层基于TCP协议设计一种基于“包”的应用层协议非常简单,具体来说有几种方式:

  1. 定长数据包:例如将所有数据包都设计为512字节,那么我们每读取到512字节就看作获得了一个完整数据包,继续读取的数据则是下一个包的
  2. 附加包长度前缀:例如每个数据包前加一个4字节前缀标识包长度,读取到包长度后继续读取指定长度的字节数作为一个完整数据包
  3. 基于包分隔符:例如使用\r\n作为包分隔符,那么我们就连续读取数据直到碰到\r\n结束,并将读取到的内容作为一个完整数据包,但使用这种方式就要考虑包内是否有数据和分隔符冲突

实现有包分割机制的客户端和服务端

得益于设计精良的标准库,Go语言实现包分割机制非常简洁易懂,这里我们以一个简单的例子形式进行介绍,下面例子中客户端可以向服务端发送真正的“消息”,这些消息带有包分割机制,不会出现黏在一起或被拆开处理的情况,出于简单起见这次我们的服务端就不回复内容了,因为回复消息的代码也是类似的。

结构体序列化和反序列化

在具体编写网络服务端和客户端之前,我们需要先定义什么是包(或者称之为消息、报文等)以及它的数据结构,下面代码我们设计了一个结构体,并编写了这个结构体和[]byte二进制数据之间的转换方法。

package message

import (
    "bytes"
    "encoding/binary"
    "io"
)

type Message struct {
    ID   uint32
    Data string
}

func (m *Message) Marshal() ([]byte, error) {
    buffer := &bytes.Buffer{}
    dataBytes := []byte(m.Data)
    // 写入消息长度,占用4字节
    if err := binary.Write(buffer, binary.BigEndian, uint32(4+len(dataBytes))); err != nil {
        return nil, err
    }
    // 写入ID字段,占用4字节
    if err := binary.Write(buffer, binary.BigEndian, m.ID); err != nil {
        return nil, err
    }
    // 写入Data字段,不定长度
    if err := binary.Write(buffer, binary.BigEndian, dataBytes); err != nil {
        return nil, err
    }
    return buffer.Bytes(), nil
}

func (m *Message) Unmarshal(data []byte) error {
    reader := bytes.NewReader(data)
    // 读取ID字段
    if err := binary.Read(reader, binary.BigEndian, &m.ID); err != nil {
        return err
    }
    // 读取Data字段
    dataBytes, err := io.ReadAll(reader)
    if err != nil {
        return err
    }
    m.Data = string(dataBytes)
    return nil
}

上面代码中,我们用到了标准库中的encoding/binary模块,它用于顺序读写字节缓冲区,它在网络编程中非常常用。

我们定义了一个结构体Message,它有一个4字节的ID字段,还有一个不定长度的Data字符串字段。代码中,Marshal方法用于将结构体序列化为二进制字节数组,序列化时我们会先计算并写入结构体各个字段总共需要的字节数,然后顺序写入各个字段的二进制数据;Unmarshal方法用于将字节数组转换回结构体,即反序列化,代码中我们先读取固定4字节的ID字段,然后将其余所有数据读取出来作为Data字段。

服务端代码实现

服务端代码的大致框架和之前章节的类似,我们主要关注handleConn()函数如何处理和分割包的。

package main

import (
    "bufio"
    "demo-go/message"
    "encoding/binary"
    "io"
    "log/slog"
    "net"
    "strconv"
)

func handleConn(conn net.Conn) {
    defer func(conn net.Conn) {
        err := conn.Close()
        if err != nil {
            slog.Error(err.Error())
            return
        }
    }(conn)
    reader := bufio.NewReader(conn)
    for {
        // 获取消息长度
        lengthBuffer := make([]byte, 4)
        if _, err := io.ReadFull(reader, lengthBuffer); err != nil {
            if err == io.EOF {
                slog.Info("Client EOF disconnected")
            } else {
                slog.Error(err.Error())
            }
            return
        }
        length := binary.BigEndian.Uint32(lengthBuffer)

        // 根据消息长度读取消息
        dataBuffer := make([]byte, length)
        if _, err := io.ReadFull(reader, dataBuffer); err != nil {
            if err == io.EOF {
                slog.Info("Client EOF disconnected")
            } else {
                slog.Error(err.Error())
            }
            return
        }

        // 反序列化消息
        var msg message.Message
        if err := msg.Unmarshal(dataBuffer); err != nil {
            slog.Error(err.Error())
            return
        }

        // 打印消息
        slog.Info("[" + strconv.Itoa(int(msg.ID)) + "] " + msg.Data)
    }
}

func main() {
    listener, err := net.Listen("tcp", "192.168.1.100:8000")
    if err != nil {
        slog.Error(err.Error())
        return
    }
    slog.Info("TCP server listening on 192.168.1.100:8000...")
    for {
        conn, err := listener.Accept()
        if err != nil {
            slog.Error(err.Error())
            continue
        }
        slog.Info("Connected from " + conn.RemoteAddr().String())
        go handleConn(conn)
    }
}

代码中,handleConn()函数我们每次循环时都会先读取4字节的包长度,这里我们用到了io.ReadFull()函数,它的作用是从Reader中读取指定字节的数据,如果数据不足就会阻塞等待直到获得指定字节数的数据或是报错。获取数据包长度后,我们继续使用io.ReadFull()读取指定长度字节数的数据,它会阻塞直到我们获取完整的数据包。最终我们调用之前定义的反序列化方法将数据重新组装为Message结构体并打印出来。

此外还要注意,io.ReadFull()对于数据量不大的数据包是简洁和有效的,但如果读取的是一个较大的数据包如文件、数据块等,还是需要我们实现手动循环读取。

客户端代码实现

下面是客户端部分具体的实现代码。

package main

import (
    "bufio"
    "demo-go/message"
    "log/slog"
    "net"
    "os"
)

func main() {
    conn, err := net.Dial("tcp", "192.168.1.100:8000")
    if err != nil {
        slog.Error(err.Error())
        return
    }
    defer func(conn net.Conn) {
        err := conn.Close()
        if err != nil {
            slog.Error(err.Error())
            return
        }
    }(conn)
    reader := bufio.NewReader(os.Stdin)
    writer := bufio.NewWriter(conn)
    var idx uint32 = 0
    for {
        // 从标准输入读取用户输入的内容
        line, err := reader.ReadString('\n')
        if err != nil {
            slog.Error(err.Error())
            return
        }
        // 序列化数据
        msg := message.Message{ID: idx, Data: line}
        msgBytes, err := msg.Marshal()
        if err != nil {
            slog.Error(err.Error())
            return
        }
        // 写入TCP连接
        if _, err := writer.Write(msgBytes); err != nil {
            slog.Error(err.Error())
            return
        }
        if err := writer.Flush(); err != nil {
            slog.Error(err.Error())
            return
        }
        idx++
    }
}

代码中,我们从标准输入读取数据并组装为Message结构体,然后调用Marshal()方法将其序列化为字节数组并写入TCP套接字。

运行服务端和客户端

我们运行服务端和客户端,并在客户端输入内容,可以看到服务端收到了消息。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。
Copyright © 2017-2024 Gacfox All Rights Reserved.
Build with NextJS | Sitemap