上一篇笔记中我们实现了简单的TCP服务器和客户端,但我们仍遗留了一个问题,之前我们试图使用TCP协议收发“消息”,但可能会出现多条消息黏在一起或一条消息被拆开处理的情况,这篇笔记我们通过设计消息分割机制来解决这个问题。
TCP是一种流传输协议,以读数据为例,操作系统网络协议栈的传输层收到数据后,数据可能会进入缓冲区,何时将缓冲区内的数据交给应用程序处理则跟协议栈的具体实现有关,它可以理解为设计上就是不确定的,我们只能以流的方式从连接中连续读取数据,没有数据就阻塞等待,发送一条“消息”在接收方则不一定是收到一条单一、完整的“消息”,此时收发消息时就可能会出现多条消息黏在一起或一条消息被拆开处理的情况。
有一些人喜欢将这种现象称为TCP的“粘包”和“拆包”问题,但我个人认为这个说法极具误导性,因为TCP本身就是一种流传输协议,协议中根本不存在“包”这个概念,所谓的“粘包”和“拆包”也就无从谈起了。我们想要传输的“包”这个东西,其实本质上是需要我们基于TCP之上再设计一种应用层协议,这个应用层协议才是以“包”作为数据的传输单元。为了避免误导大家,后文不会再出现“粘包”、“拆包”字样。
传输层基于TCP协议设计一种基于“包”的应用层协议非常简单,具体来说有几种方式:
\r\n
作为包分隔符,那么我们就连续读取数据直到碰到\r\n
结束,并将读取到的内容作为一个完整数据包,但使用这种方式就要考虑包内是否有数据和分隔符冲突得益于设计精良的标准库,Go语言实现包分割机制非常简洁易懂,这里我们以一个简单的例子形式进行介绍,下面例子中客户端可以向服务端发送真正的“消息”,这些消息带有包分割机制,不会出现黏在一起或被拆开处理的情况,出于简单起见这次我们的服务端就不回复内容了,因为回复消息的代码也是类似的。
在具体编写网络服务端和客户端之前,我们需要先定义什么是包(或者称之为消息、报文等)以及它的数据结构,下面代码我们设计了一个结构体,并编写了这个结构体和[]byte
二进制数据之间的转换方法。
package message
import (
"bytes"
"encoding/binary"
"io"
)
type Message struct {
ID uint32
Data string
}
func (m *Message) Marshal() ([]byte, error) {
buffer := &bytes.Buffer{}
dataBytes := []byte(m.Data)
// 写入消息长度,占用4字节
if err := binary.Write(buffer, binary.BigEndian, uint32(4+len(dataBytes))); err != nil {
return nil, err
}
// 写入ID字段,占用4字节
if err := binary.Write(buffer, binary.BigEndian, m.ID); err != nil {
return nil, err
}
// 写入Data字段,不定长度
if err := binary.Write(buffer, binary.BigEndian, dataBytes); err != nil {
return nil, err
}
return buffer.Bytes(), nil
}
func (m *Message) Unmarshal(data []byte) error {
reader := bytes.NewReader(data)
// 读取ID字段
if err := binary.Read(reader, binary.BigEndian, &m.ID); err != nil {
return err
}
// 读取Data字段
dataBytes, err := io.ReadAll(reader)
if err != nil {
return err
}
m.Data = string(dataBytes)
return nil
}
上面代码中,我们用到了标准库中的encoding/binary
模块,它用于顺序读写字节缓冲区,它在网络编程中非常常用。
我们定义了一个结构体Message
,它有一个4字节的ID
字段,还有一个不定长度的Data
字符串字段。代码中,Marshal
方法用于将结构体序列化为二进制字节数组,序列化时我们会先计算并写入结构体各个字段总共需要的字节数,然后顺序写入各个字段的二进制数据;Unmarshal
方法用于将字节数组转换回结构体,即反序列化,代码中我们先读取固定4字节的ID
字段,然后将其余所有数据读取出来作为Data
字段。
服务端代码的大致框架和之前章节的类似,我们主要关注handleConn()
函数如何处理和分割包的。
package main
import (
"bufio"
"demo-go/message"
"encoding/binary"
"io"
"log/slog"
"net"
"strconv"
)
func handleConn(conn net.Conn) {
defer func(conn net.Conn) {
err := conn.Close()
if err != nil {
slog.Error(err.Error())
return
}
}(conn)
reader := bufio.NewReader(conn)
for {
// 获取消息长度
lengthBuffer := make([]byte, 4)
if _, err := io.ReadFull(reader, lengthBuffer); err != nil {
if err == io.EOF {
slog.Info("Client EOF disconnected")
} else {
slog.Error(err.Error())
}
return
}
length := binary.BigEndian.Uint32(lengthBuffer)
// 根据消息长度读取消息
dataBuffer := make([]byte, length)
if _, err := io.ReadFull(reader, dataBuffer); err != nil {
if err == io.EOF {
slog.Info("Client EOF disconnected")
} else {
slog.Error(err.Error())
}
return
}
// 反序列化消息
var msg message.Message
if err := msg.Unmarshal(dataBuffer); err != nil {
slog.Error(err.Error())
return
}
// 打印消息
slog.Info("[" + strconv.Itoa(int(msg.ID)) + "] " + msg.Data)
}
}
func main() {
listener, err := net.Listen("tcp", "192.168.1.100:8000")
if err != nil {
slog.Error(err.Error())
return
}
slog.Info("TCP server listening on 192.168.1.100:8000...")
for {
conn, err := listener.Accept()
if err != nil {
slog.Error(err.Error())
continue
}
slog.Info("Connected from " + conn.RemoteAddr().String())
go handleConn(conn)
}
}
代码中,handleConn()
函数我们每次循环时都会先读取4字节的包长度,这里我们用到了io.ReadFull()
函数,它的作用是从Reader
中读取指定字节的数据,如果数据不足就会阻塞等待直到获得指定字节数的数据或是报错。获取数据包长度后,我们继续使用io.ReadFull()
读取指定长度字节数的数据,它会阻塞直到我们获取完整的数据包。最终我们调用之前定义的反序列化方法将数据重新组装为Message
结构体并打印出来。
此外还要注意,io.ReadFull()
对于数据量不大的数据包是简洁和有效的,但如果读取的是一个较大的数据包如文件、数据块等,还是需要我们实现手动循环读取。
下面是客户端部分具体的实现代码。
package main
import (
"bufio"
"demo-go/message"
"log/slog"
"net"
"os"
)
func main() {
conn, err := net.Dial("tcp", "192.168.1.100:8000")
if err != nil {
slog.Error(err.Error())
return
}
defer func(conn net.Conn) {
err := conn.Close()
if err != nil {
slog.Error(err.Error())
return
}
}(conn)
reader := bufio.NewReader(os.Stdin)
writer := bufio.NewWriter(conn)
var idx uint32 = 0
for {
// 从标准输入读取用户输入的内容
line, err := reader.ReadString('\n')
if err != nil {
slog.Error(err.Error())
return
}
// 序列化数据
msg := message.Message{ID: idx, Data: line}
msgBytes, err := msg.Marshal()
if err != nil {
slog.Error(err.Error())
return
}
// 写入TCP连接
if _, err := writer.Write(msgBytes); err != nil {
slog.Error(err.Error())
return
}
if err := writer.Flush(); err != nil {
slog.Error(err.Error())
return
}
idx++
}
}
代码中,我们从标准输入读取数据并组装为Message
结构体,然后调用Marshal()
方法将其序列化为字节数组并写入TCP套接字。
我们运行服务端和客户端,并在客户端输入内容,可以看到服务端收到了消息。