实现流媒体实时通讯系统

前言

最近学习了一下 webrtc 的相关知识,于是就想着自己实现一个完整一点的项目,本文记录一下项目的开发思路及过程。

预览

githubhttps://github.com/lvboda/quick-chat

测试用户1账号/密码:useruser1/123456

测试用户2账号/密码:useruser2/123456

注意:请不要在一个浏览器同时登陆两个账号!

这里就不放截图了,部署在服务器上想看的直接去看就可以了。

思路

首先明确一下整体思路

项目为前后端分离的单体架构,前端 Vue 框架,后端 Gin 框架,数据库为 mysql。

后端实现登陆、注册等常规功能接口,并向前端提供一个 socket 服务用于传输实时消息及 RTC 的转发服务。

而流媒体实时通讯主要是在前端,使用 RTCPeerConnection(底层)可以实现 p2p 的流媒体数据传输。

数据库主要存储用户信息和关系数据。

还有一点是需要搭建一个 TURN 中继服务作为 NAT 穿透失败的候选

思路捋清了就开始干活。

数据库

表的设计比较简单,一共四张表 chat_record(聊天记录表)、community(群聊表)、user_base(用户基础信息表)、user_relation(用户关系表),主要就讲一下 user_base 和 user_relation 这两张表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
DROP TABLE IF EXISTS `user_base`;
CREATE TABLE `user_base` (
`id` varchar(32) NOT NULL COMMENT '用户id',
`created_at` datetime(3) DEFAULT NULL COMMENT '创建时间',
`updated_at` datetime(3) DEFAULT NULL COMMENT '更新时间',
`deleted_at` datetime(3) DEFAULT NULL COMMENT '删除时间',
`nick_name` varchar(32) NOT NULL DEFAULT '' COMMENT '用户昵称',
`user_id` varchar(32) NOT NULL DEFAULT '' COMMENT '用户id',
`password` varchar(100) NOT NULL DEFAULT '' COMMENT '用户密码',
`user_role` tinyint unsigned NOT NULL DEFAULT '1' COMMENT '用户类型: 1正常用户 2封禁用户 3管理员',
`gender` tinyint unsigned NOT NULL DEFAULT '1' COMMENT '用户性别: 1男 2女',
`signature` varchar(255) NOT NULL DEFAULT '' COMMENT '用户个人签名',
`mobile` varchar(16) NOT NULL DEFAULT '' COMMENT '手机号码',
`face` varchar(100) NOT NULL DEFAULT '' COMMENT '头像',
`extend1` varchar(100) NOT NULL DEFAULT '' COMMENT '扩展字段1',
`extend2` varchar(100) NOT NULL DEFAULT '' COMMENT '扩展字段2',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='用户基础信息表';

用户的信息表,也没什么好说的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
DROP TABLE IF EXISTS `user_relation`;
CREATE TABLE `user_relation` (
`id` varchar(32) NOT NULL COMMENT '关系id',
`created_at` datetime(3) DEFAULT NULL COMMENT '创建时间',
`updated_at` datetime(3) DEFAULT NULL COMMENT '更新时间',
`deleted_at` datetime(3) DEFAULT NULL COMMENT '删除时间',
`user_id` varchar(32) NOT NULL COMMENT '用户id',
`friend_id` varchar(32) NOT NULL COMMENT '好友id',
`relation_type` tinyint unsigned NOT NULL DEFAULT '1' COMMENT '关系类型: 1验证 2双向关系 3单项被删除关系',
`role_type` tinyint unsigned NOT NULL DEFAULT '1' COMMENT '角色类型: 1好友 2群聊',
`memo` varchar(120) DEFAULT NULL COMMENT '描述',
`extend` varchar(100) NOT NULL DEFAULT '' COMMENT '扩展字段',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='用户关系表';

用户的关系表,这张表存两个用户或用户与群聊的关系,relation_type 这个字段用于表明关系类型,举个例子:

用户1给用户2发送了添加好友的验证请求则在表里插入一条数据 user_id 为用户1,friend_id 为用户2,relation_type 为1。

用户2同意了用户1的添加好友的验证请求则更新刚才数据的 relation_type 为2并新增一条 user_id 为用户2,friend_id 为用户1,relation_type 为2的数据。

为什么同意好友请求需要新增一条数据呢?

因为同意了好友验证则用户1和用户2为双向的好友关系,所以用户1和用户2都要有自己的关系数据,这样在查的时候都能查到了。

role_type 为角色类型,1为好友 2为群聊,如果是2那么 friend_id 为群聊 id。

聊天记录表本来是想做离线的聊天记录存储的,但是后来没用到,直接存内存了,所以这里就不说了。

后端

后端这块就说一下好友关系和 socket 服务的相关实现,像什么登陆、注册、权限验证的就不讲了,比较简单,直接看代码就可以。

好友关系相关接口

前面数据库那里大概说了一下好友关系是怎么算的,如果懂了的话就跳过这里。

发送验证信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func SendValidate(c *gin.Context) {
var relation model.RelationEntity
var query struct {
UserId string `binding:"required"`
FriendId string `binding:"required"`
Memo string `binding:"required"`
RoleType int `binding:"required"`
}

relation.UserId = query.UserId
relation.FriendId = query.FriendId
relation.Memo = query.Memo
relation.RoleType = query.RoleType
relation.RelationType = 1

if code := relation.Insert(); code != status.SUCCESS {
c.AbortWithStatusJSON(http.StatusOK, status.GetResponse(status.ERROR_RELATION_VALIDATE_SEND, nil, nil))
return
}

c.JSON(http.StatusOK, status.GetResponse(status.SUCCESS, nil, nil))
}

关系表插入一条 relation_type 为1的数据

添加关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (relation RelationEntity) AddFriend() int {
err := utils.Db.Transaction(func(tx *gorm.DB) error {
updateFields := map[string]any{
"user_id": relation.UserId,
"friend_id": relation.FriendId,
"relation_type": relation.RelationType,
}

if err := tx.Model(&relation).Where("user_id = ? AND friend_id = ?", relation.UserId, relation.FriendId).Updates(&updateFields).Error; err != nil {
return err
}

relation.Id = utils.UUID()
temp := relation.UserId
relation.UserId = relation.FriendId
relation.FriendId = temp
if err := tx.Create(&relation).Error; err != nil {
return err
}

return nil
})

if err != nil {
return status.ERROR
}
return status.SUCCESS
}

添加关系也就是同意好友验证也可以理解为创建双向关系,就两步:

1、更新 relation_type 为1的那条数据为2。

2、把更新的那条数据的 user_id 和 friend_id 反过来,relation_type 为2插入新的数据。

这里走了一个事务。

删除关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (relation RelationEntity) RemoveFriend() int {
err := utils.Db.Transaction(func(tx *gorm.DB) error {
updateFields := map[string]any{
"user_id": relation.UserId,
"friend_id": relation.FriendId,
"relation_type": relation.RelationType,
}

if err := tx.Model(&relation).Where("user_id = ? AND friend_id = ?", relation.UserId, relation.FriendId).Updates(&updateFields).Error; err != nil {
return err
}

if err := tx.Where("user_id = ? AND friend_id = ?", relation.FriendId, relation.UserId).Delete(&relation).Error; err != nil {
return err
}

return nil
})

if err != nil {
return status.ERROR
}
return status.SUCCESS
}

删除关系两步:

1、根据 user_id 和 friend_id 查找数据并更新 relation_type 为3,3表示单项被删除关系。

2、调换 user_id 和 friend_id 查找数据并删除。

因为删除好友了就是单向关系了所以删除一个关系,同样也走事务。

查询关系

查询接口三个入参 FriendId、RelationType、RoleType,就比较开放了,怎么查全靠前端传参,我这里大概说一下。

查询验证消息:FriendId为当前登陆用户id、RelationType为1

查询好友列表:FriendId为当前登陆用户id、RelationType为2

查询删除好友:FriendId为当前登陆用户id、RelationType为3

RoleType1为好友关系,2为群聊

socket服务

socket 服务用于实时传输数据和作为 RTC 的转发

首先看一下 model 层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
var globalNodeGroup = model.NewNodeGroup()

// 全局node存储 {key:uid value:node}
type NodeGroup struct {
NodeMap map[string]*Node
Locker sync.RWMutex // 涉及到多线程操作的全局数据,加锁
}

// NewNodeGroup 新建NodeGroup 全局维护一个
func NewNodeGroup() *NodeGroup {
var flag bool
var ng *NodeGroup
return func() *NodeGroup {
if flag {
return ng
} else {
flag = true
ng = &NodeGroup{
NodeMap: make(map[string]*Node),
Locker: sync.RWMutex{},
}
return ng
}
}()
}

// 一个用户对应一个node
type Node struct {
// websocket连接
Conn *websocket.Conn
// 数据存储队列
DataQueue chan []byte
// 群id的set
GroupSets set.Interface
}

// NewNode 创建新的node
func NewNode(conn *websocket.Conn) *Node {
return &Node{
Conn: conn,
DataQueue: make(chan []byte, 50),
GroupSets: set.New(set.ThreadSafe),
}
}

可以理解为一个用户就是一个 Node,在用户登陆成功后前端发起 socket 连接并携带 user_id 参数,后端拿到 user_id 会创建 Node 并插入到全局的 node 存储也就是 globalNodeGroup,key为 user_id,如下面代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func Chat(c *gin.Context, conn *websocket.Conn) {
uid := c.Param("uid")

node, ok := globalNodeGroup.Add(uid, conn)
if !ok {
return
}

utils.Logger.Infof("ws:用户%s连接成功", uid)
}
// Add 添加node
func (ng *NodeGroup) Add(id string, conn *websocket.Conn) (node *Node, ok bool) {
ng.Locker.Lock()
if _, has := ng.NodeMap[id]; !has {
node = NewNode(conn)
ng.NodeMap[id] = node
ok = true
}
ng.Locker.Unlock()
return
}

单聊消息收发

首先创建两个线程,一个处理发送数据,一个处理接收数据

1
2
3
4
5
6
7
8
9
10
11
12
13
func Chat(c *gin.Context, conn *websocket.Conn) {
uid := c.Param("uid")

node, ok := globalNodeGroup.Add(uid, conn)
if !ok {
return
}

utils.Logger.Infof("ws:用户%s连接成功", uid)

go sendLoop(uid, node) // 发送
go receiveLoop(uid, node) // 接受
}

先看一下 sendLoop 的实现,比较简单,它是一个死循环,一直在等待 DataQueue 有新数据,拿到新数据后调用 socket 的 WriteMessage 方法就相当于把数据推给了前端,至于数据是谁发的我们先不管:

1
2
3
4
5
6
7
8
9
10
11
12
// sendLoop 发送线程
func sendLoop(uid string, node *model.Node) {
for {
data := <-node.DataQueue
err := node.Conn.WriteMessage(websocket.TextMessage, data)

if err != nil {
closeChan <- true
return
}
}
}

receiveLoop 接收线程比较复杂,在这之前先看一下前后端传输的消息的数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 传输消息结构
type Message struct {
// 消息id
Id string `json:"id"`
// 发送者id
SenderId string `json:"senderId"`
// 接受者id
ReceiverId string `json:"receiverId"`
// 消息内容
Content string `json:"content"`
// 附加信息
Extra string `json:"extra"`
// 消息类型 前端用来判断 后端不处理
ContentType int `json:"contentType"`
// 处理类型 见status
ProcessType int `json:"processType"`
// 发送时间
SendTime string `json:"sendTime"`
// 源数据
Resource []byte `json:"resource"`
}

然后看 receiveLoop 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// receiveLoop 接收线程
func receiveLoop(uid string, node *model.Node) {
// 一直去读数据,有新的数据进来了交给 dispatchProcess 处理,这个数据是前端传过来的
for {
_, data, err := node.Conn.ReadMessage()
if err != nil {
closeChan <- true
return
}

dispatchProcess(data)
}
}

// dispatchProcess 根据 ProcessType 分发不同的处理函数
func dispatchProcess(data []byte) {
// 做一个类型转换
msg := model.ToMessage(data)

switch msg.ProcessType {
case status.WS_PROCESS_SINGLE_MSG:
// 接收到的是单聊的消息
sendMessage(msg)
case status.WS_PROCESS_GROUP_MSG:
// 接收到的是群聊的消息
sendGroupMessage(msg)
case status.WS_PROCESS_CLOSE:
// 关闭 socket 连接
closeChan <- true
case status.WS_PROCESS_HEART:
// ❤️
}
}

看一下 sendMessage 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// sendMessage 发送消息
func sendMessage(msg model.Message) {
globalNodeGroup.SendMessage(msg)
}

// SendMessage 发送消息
func (ng *NodeGroup) SendMessage(msg Message) (ok bool) {
ng.Locker.Lock()
// 如果在全局的 nodeGroup 里找到了接收者的 Node 说明接收者在线,直接把数据推进接受者 Node 的 DataQueue 里
if node, has := ng.NodeMap[msg.ReceiverId]; has {
node.DataQueue <- msg.Resource
ok = true
}
ng.Locker.Unlock()
return
}

到这单聊的功能就完成了

群消息收发

紧接着上面的 dispatchProcess 分发函数往下走,看一下 sendGroupMessage 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
// SendGroupMessage 发送群消息
func (ng *NodeGroup) SendGroupMessage(msg Message) {
ng.Locker.Lock()
// 遍历每一个在线的 Node
for _, node := range ng.NodeMap {
// 如果 set 里有这个 ReceiverId,也就是群聊id,说明当前 Node 在这个群聊里
if node.GroupSets.Has(msg.ReceiverId) {
// 推数据
node.DataQueue <- msg.Resource
}
}
ng.Locker.Unlock()
}

GroupSets 里的群聊 id 是在入口处进行 push 的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func Chat(c *gin.Context, conn *websocket.Conn) {
// ...

go sendLoop(uid, node)
go receiveLoop(uid, node)
// 这里push
pushGroupId(uid, node)
// ...
}

// pushGroupId 添加群id,就是去关系表里查群关系数据,说白了就是查当前用户加入的群聊数据
func pushGroupId(uid string, node *model.Node) {
var query struct {
FriendId string
RelationType int
RoleType int
}
query.FriendId = uid
query.RelationType = 2
query.RoleType = 2
relationList, _ := model.RelationEntity{}.SelectListBy(query, query.RoleType)

for _, relation := range relationList {
if relation.CommunityInfo.CommunityId != "" {
// 把群聊 id push 进 GroupSets
node.GroupSets.Add(relation.CommunityInfo.CommunityId)
}
}
}

离线消息推送

离线消息,也就是接收者的 Node 没在 NodeGroup 中,即为离线,首先在全局维护了一个离线消息存储 map:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
var globalOfflineGroup = model.NewOfflineGroup()

// 离线消息存储
type OfflineGroup struct {
// OfflineMap:{key: 接收者id,value:离线消息存储队列}
OfflineMap map[string][]Message
// 加锁
Locker sync.RWMutex
}

// NewOfflineGroup 新建OfflineGroup 全局维护一个
func NewOfflineGroup() *OfflineGroup {
var flag bool
var og *OfflineGroup
return func() *OfflineGroup {
if flag {
return og
} else {
flag = true
og = &OfflineGroup{
OfflineMap: map[string][]Message{},
Locker: sync.RWMutex{},
}
return og
}
}()
}

然后来到 sendMessage 函数,做了一个判断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// sendMessage 发送消息
func sendMessage(msg model.Message) {
// 返回值如果为 false 则说明没有找到接收者 Node,即为离线状态
if ok := globalNodeGroup.SendMessage(msg); !ok {
// 把消息 push 到离线存储 map 中
globalOfflineGroup.Add(msg)
}
}

// Add 添加消息
func (og *OfflineGroup) Add(msg Message) {
og.Locker.Lock()
// 接收者 id 为 key
og.OfflineMap[msg.ReceiverId] = append(og.OfflineMap[msg.ReceiverId], msg)
og.Locker.Unlock()
}

那接收者上线是怎么收到这些离线消息的呢?回头来看入口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func Chat(c *gin.Context, conn *websocket.Conn) {
// ...

go sendLoop(uid, node)
go receiveLoop(uid, node)
// 这里把离线消息推过去
pushOfflineMsg(uid)
// ...
}

// pushOfflineMsg 离线消息推送
func pushOfflineMsg(uid string) {
// 如果有离线消息
if msgQueue, has := globalOfflineGroup.OfflineMap[uid]; has {
// 发送消息
for _, msg := range msgQueue {
sendMessage(msg)
}

// 清除
globalOfflineGroup.Delete(uid)
}
}

在建立 socket 连接成功后就去全局离线消息存储中检索,通过 user_id 查有没有当前 Node 的离线消息,如果有就直接发送并清除,这样就实现了离线消息的接受,确保用户在离线状态也不会发生消息丢失的情况。

这里有个问题,因为数据都是存在内存中,可能会出现问题,这个后续可以考虑存表或 redis 里。

前端

前端用到了 simple-peer 这个库,这个库是 RTCPeerConnection 的封装,使用起来比较简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
let peer: any = null;
let iceRemoteVideo: HTMLVideoElement | null = null;
let iceLocalVideo: HTMLVideoElement | null = null;
async function createPeer(isSender: boolean, receiverId: string) {
peer = new window.SimplePeer({ initiator: isSender, config: null });

peer.on("signal", (signal: any) => {
const msg = createMessage(receiverId, JSON.stringify(signal), 2);
chatClient?.send(msg);
});

peer.on("stream", (stream: any) => {
iceRemoteVideo = document.getElementById("iceRemoteVideo") as HTMLVideoElement;
if (iceRemoteVideo) iceRemoteVideo.srcObject = stream;
});

const webcamStream = await navigator.mediaDevices.getUserMedia({
video: true,
audio: true,
});

const ms = new MediaStream();
webcamStream.getVideoTracks().forEach((track) => {
ms.addTrack(track);
});

iceLocalVideo = document.getElementById("iceLocalVideo") as HTMLVideoElement;
if (iceLocalVideo) iceLocalVideo.srcObject = ms;

peer.addStream(webcamStream);

return () => {
if (iceRemoteVideo) iceRemoteVideo.srcObject = null;
if (iceLocalVideo) iceLocalVideo.srcObject = null;
peer.removeStream(webcamStream);
webcamStream.getTracks().forEach((item) => item.stop());
ms.getTracks().forEach((item) => item.stop());
peer.destroy();
};
}

async function onSignal(msg: Message) {
if (!peer) await createPeer(false, msg.senderId);
peer.signal(JSON.parse(msg.content));
}

代码贴上来看看就可以了,比较简单,主要是这篇字数有点太多了,前端就不细讲了。

最后

项目的 github 地址:https://github.com/lvboda/quick-chat

参考资料

实现流媒体实时通讯系统

https://lvboda-blog.pages.dev/183f.html

作者

Boda Lü

发布于

2022-11-15

更新于

2026-03-24

许可协议


评论