前言 最近学习了一下 webrtc 的相关知识,于是就想着自己实现一个完整一点的项目,本文记录一下项目的开发思路及过程。
预览 github :https://github.com/lvboda/quick-chat
测试用户1账号/密码 :useruser1/123456
测试用户2账号/密码 :useruser2/123456
注意:请不要在一个浏览器同时登陆两个账号!
这里就不放截图了,部署在服务器上想看的直接去看就可以了。
思路 首先明确一下整体思路
项目为前后端分离的单体架构,前端 Vue 框架,后端 Gin 框架,数据库为 mysql。
后端实现登陆、注册等常规功能接口,并向前端提供一个 socket 服务用于传输实时消息及 RTC 的转发服务。
而流媒体实时通讯主要是在前端,使用 RTCPeerConnection(底层)可以实现 p2p 的流媒体数据传输。
数据库主要存储用户信息和关系数据。
还有一点是需要搭建一个 TURN 中继服务作为 NAT 穿透失败的候选
思路捋清了就开始干活。
数据库 表的设计比较简单,一共四张表 chat_record(聊天记录表)、community(群聊表)、user_base(用户基础信息表)、user_relation(用户关系表),主要就讲一下 user_base 和 user_relation 这两张表。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 DROP TABLE IF EXISTS `user_base`;CREATE TABLE `user_base` ( `id` varchar (32 ) NOT NULL COMMENT '用户id' , `created_at` datetime(3 ) DEFAULT NULL COMMENT '创建时间' , `updated_at` datetime(3 ) DEFAULT NULL COMMENT '更新时间' , `deleted_at` datetime(3 ) DEFAULT NULL COMMENT '删除时间' , `nick_name` varchar (32 ) NOT NULL DEFAULT '' COMMENT '用户昵称' , `user_id` varchar (32 ) NOT NULL DEFAULT '' COMMENT '用户id' , `password` varchar (100 ) NOT NULL DEFAULT '' COMMENT '用户密码' , `user_role` tinyint unsigned NOT NULL DEFAULT '1' COMMENT '用户类型: 1正常用户 2封禁用户 3管理员' , `gender` tinyint unsigned NOT NULL DEFAULT '1' COMMENT '用户性别: 1男 2女' , `signature` varchar (255 ) NOT NULL DEFAULT '' COMMENT '用户个人签名' , `mobile` varchar (16 ) NOT NULL DEFAULT '' COMMENT '手机号码' , `face` varchar (100 ) NOT NULL DEFAULT '' COMMENT '头像' , `extend1` varchar (100 ) NOT NULL DEFAULT '' COMMENT '扩展字段1' , `extend2` varchar (100 ) NOT NULL DEFAULT '' COMMENT '扩展字段2' , PRIMARY KEY (`id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT= '用户基础信息表' ;
用户的信息表,也没什么好说的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 DROP TABLE IF EXISTS `user_relation`;CREATE TABLE `user_relation` ( `id` varchar (32 ) NOT NULL COMMENT '关系id' , `created_at` datetime(3 ) DEFAULT NULL COMMENT '创建时间' , `updated_at` datetime(3 ) DEFAULT NULL COMMENT '更新时间' , `deleted_at` datetime(3 ) DEFAULT NULL COMMENT '删除时间' , `user_id` varchar (32 ) NOT NULL COMMENT '用户id' , `friend_id` varchar (32 ) NOT NULL COMMENT '好友id' , `relation_type` tinyint unsigned NOT NULL DEFAULT '1' COMMENT '关系类型: 1验证 2双向关系 3单项被删除关系' , `role_type` tinyint unsigned NOT NULL DEFAULT '1' COMMENT '角色类型: 1好友 2群聊' , `memo` varchar (120 ) DEFAULT NULL COMMENT '描述' , `extend` varchar (100 ) NOT NULL DEFAULT '' COMMENT '扩展字段' , PRIMARY KEY (`id`) ) ENGINE= InnoDB DEFAULT CHARSET= utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT= '用户关系表' ;
用户的关系表,这张表存两个用户或用户与群聊的关系,relation_type 这个字段用于表明关系类型,举个例子:
用户1给用户2发送了添加好友的验证请求则在表里插入一条数据 user_id 为用户1,friend_id 为用户2,relation_type 为1。
用户2同意了用户1的添加好友的验证请求则更新刚才数据的 relation_type 为2并新增一条 user_id 为用户2,friend_id 为用户1,relation_type 为2的数据。
为什么同意好友请求需要新增一条数据呢?
因为同意了好友验证则用户1和用户2为双向的好友关系,所以用户1和用户2都要有自己的关系数据,这样在查的时候都能查到了。
role_type 为角色类型,1为好友 2为群聊,如果是2那么 friend_id 为群聊 id。
聊天记录表本来是想做离线的聊天记录存储的,但是后来没用到,直接存内存了,所以这里就不说了。
后端 后端这块就说一下好友关系和 socket 服务的相关实现,像什么登陆、注册、权限验证的就不讲了,比较简单,直接看代码就可以。
好友关系相关接口 前面数据库那里大概说了一下好友关系是怎么算的,如果懂了的话就跳过这里。
发送验证信息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func SendValidate (c *gin.Context) { var relation model.RelationEntity var query struct { UserId string `binding:"required"` FriendId string `binding:"required"` Memo string `binding:"required"` RoleType int `binding:"required"` } relation.UserId = query.UserId relation.FriendId = query.FriendId relation.Memo = query.Memo relation.RoleType = query.RoleType relation.RelationType = 1 if code := relation.Insert(); code != status.SUCCESS { c.AbortWithStatusJSON(http.StatusOK, status.GetResponse(status.ERROR_RELATION_VALIDATE_SEND, nil , nil )) return } c.JSON(http.StatusOK, status.GetResponse(status.SUCCESS, nil , nil )) }
关系表插入一条 relation_type 为1的数据
添加关系 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 func (relation RelationEntity) AddFriend() int { err := utils.Db.Transaction(func (tx *gorm.DB) error { updateFields := map [string ]any{ "user_id" : relation.UserId, "friend_id" : relation.FriendId, "relation_type" : relation.RelationType, } if err := tx.Model(&relation).Where("user_id = ? AND friend_id = ?" , relation.UserId, relation.FriendId).Updates(&updateFields).Error; err != nil { return err } relation.Id = utils.UUID() temp := relation.UserId relation.UserId = relation.FriendId relation.FriendId = temp if err := tx.Create(&relation).Error; err != nil { return err } return nil }) if err != nil { return status.ERROR } return status.SUCCESS }
添加关系也就是同意好友验证也可以理解为创建双向关系,就两步:
1、更新 relation_type 为1的那条数据为2。
2、把更新的那条数据的 user_id 和 friend_id 反过来,relation_type 为2插入新的数据。
这里走了一个事务。
删除关系 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func (relation RelationEntity) RemoveFriend() int { err := utils.Db.Transaction(func (tx *gorm.DB) error { updateFields := map [string ]any{ "user_id" : relation.UserId, "friend_id" : relation.FriendId, "relation_type" : relation.RelationType, } if err := tx.Model(&relation).Where("user_id = ? AND friend_id = ?" , relation.UserId, relation.FriendId).Updates(&updateFields).Error; err != nil { return err } if err := tx.Where("user_id = ? AND friend_id = ?" , relation.FriendId, relation.UserId).Delete(&relation).Error; err != nil { return err } return nil }) if err != nil { return status.ERROR } return status.SUCCESS }
删除关系两步:
1、根据 user_id 和 friend_id 查找数据并更新 relation_type 为3,3表示单项被删除关系。
2、调换 user_id 和 friend_id 查找数据并删除。
因为删除好友了就是单向关系了所以删除一个关系,同样也走事务。
查询关系 查询接口三个入参 FriendId、RelationType、RoleType,就比较开放了,怎么查全靠前端传参,我这里大概说一下。
查询验证消息:FriendId为当前登陆用户id、RelationType为1
查询好友列表:FriendId为当前登陆用户id、RelationType为2
查询删除好友:FriendId为当前登陆用户id、RelationType为3
RoleType1为好友关系,2为群聊
socket服务 socket 服务用于实时传输数据和作为 RTC 的转发
首先看一下 model 层:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 var globalNodeGroup = model.NewNodeGroup()type NodeGroup struct { NodeMap map [string ]*Node Locker sync.RWMutex } func NewNodeGroup () *NodeGroup { var flag bool var ng *NodeGroup return func () *NodeGroup { if flag { return ng } else { flag = true ng = &NodeGroup{ NodeMap: make (map [string ]*Node), Locker: sync.RWMutex{}, } return ng } }() } type Node struct { Conn *websocket.Conn DataQueue chan []byte GroupSets set.Interface } func NewNode (conn *websocket.Conn) *Node { return &Node{ Conn: conn, DataQueue: make (chan []byte , 50 ), GroupSets: set.New(set.ThreadSafe), } }
可以理解为一个用户就是一个 Node,在用户登陆成功后前端发起 socket 连接并携带 user_id 参数,后端拿到 user_id 会创建 Node 并插入到全局的 node 存储也就是 globalNodeGroup,key为 user_id,如下面代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func Chat (c *gin.Context, conn *websocket.Conn) { uid := c.Param("uid" ) node, ok := globalNodeGroup.Add(uid, conn) if !ok { return } utils.Logger.Infof("ws:用户%s连接成功" , uid) } func (ng *NodeGroup) Add(id string , conn *websocket.Conn) (node *Node, ok bool ) { ng.Locker.Lock() if _, has := ng.NodeMap[id]; !has { node = NewNode(conn) ng.NodeMap[id] = node ok = true } ng.Locker.Unlock() return }
单聊消息收发 首先创建两个线程,一个处理发送数据,一个处理接收数据
1 2 3 4 5 6 7 8 9 10 11 12 13 func Chat (c *gin.Context, conn *websocket.Conn) { uid := c.Param("uid" ) node, ok := globalNodeGroup.Add(uid, conn) if !ok { return } utils.Logger.Infof("ws:用户%s连接成功" , uid) go sendLoop(uid, node) go receiveLoop(uid, node) }
先看一下 sendLoop 的实现,比较简单,它是一个死循环,一直在等待 DataQueue 有新数据,拿到新数据后调用 socket 的 WriteMessage 方法就相当于把数据推给了前端,至于数据是谁发的我们先不管:
1 2 3 4 5 6 7 8 9 10 11 12 func sendLoop (uid string , node *model.Node) { for { data := <-node.DataQueue err := node.Conn.WriteMessage(websocket.TextMessage, data) if err != nil { closeChan <- true return } } }
receiveLoop 接收线程比较复杂,在这之前先看一下前后端传输的消息的数据结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 type Message struct { Id string `json:"id"` SenderId string `json:"senderId"` ReceiverId string `json:"receiverId"` Content string `json:"content"` Extra string `json:"extra"` ContentType int `json:"contentType"` ProcessType int `json:"processType"` SendTime string `json:"sendTime"` Resource []byte `json:"resource"` }
然后看 receiveLoop 的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 func receiveLoop (uid string , node *model.Node) { for { _, data, err := node.Conn.ReadMessage() if err != nil { closeChan <- true return } dispatchProcess(data) } } func dispatchProcess (data []byte ) { msg := model.ToMessage(data) switch msg.ProcessType { case status.WS_PROCESS_SINGLE_MSG: sendMessage(msg) case status.WS_PROCESS_GROUP_MSG: sendGroupMessage(msg) case status.WS_PROCESS_CLOSE: closeChan <- true case status.WS_PROCESS_HEART: } }
看一下 sendMessage 的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func sendMessage (msg model.Message) { globalNodeGroup.SendMessage(msg) } func (ng *NodeGroup) SendMessage(msg Message) (ok bool ) { ng.Locker.Lock() if node, has := ng.NodeMap[msg.ReceiverId]; has { node.DataQueue <- msg.Resource ok = true } ng.Locker.Unlock() return }
到这单聊的功能就完成了
群消息收发 紧接着上面的 dispatchProcess 分发函数往下走,看一下 sendGroupMessage 的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 func (ng *NodeGroup) SendGroupMessage(msg Message) { ng.Locker.Lock() for _, node := range ng.NodeMap { if node.GroupSets.Has(msg.ReceiverId) { node.DataQueue <- msg.Resource } } ng.Locker.Unlock() }
GroupSets 里的群聊 id 是在入口处进行 push 的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func Chat (c *gin.Context, conn *websocket.Conn) { go sendLoop(uid, node) go receiveLoop(uid, node) pushGroupId(uid, node) } func pushGroupId (uid string , node *model.Node) { var query struct { FriendId string RelationType int RoleType int } query.FriendId = uid query.RelationType = 2 query.RoleType = 2 relationList, _ := model.RelationEntity{}.SelectListBy(query, query.RoleType) for _, relation := range relationList { if relation.CommunityInfo.CommunityId != "" { node.GroupSets.Add(relation.CommunityInfo.CommunityId) } } }
离线消息推送 离线消息,也就是接收者的 Node 没在 NodeGroup 中,即为离线,首先在全局维护了一个离线消息存储 map:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 var globalOfflineGroup = model.NewOfflineGroup()type OfflineGroup struct { OfflineMap map [string ][]Message Locker sync.RWMutex } func NewOfflineGroup () *OfflineGroup { var flag bool var og *OfflineGroup return func () *OfflineGroup { if flag { return og } else { flag = true og = &OfflineGroup{ OfflineMap: map [string ][]Message{}, Locker: sync.RWMutex{}, } return og } }() }
然后来到 sendMessage 函数,做了一个判断:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func sendMessage (msg model.Message) { if ok := globalNodeGroup.SendMessage(msg); !ok { globalOfflineGroup.Add(msg) } } func (og *OfflineGroup) Add(msg Message) { og.Locker.Lock() og.OfflineMap[msg.ReceiverId] = append (og.OfflineMap[msg.ReceiverId], msg) og.Locker.Unlock() }
那接收者上线是怎么收到这些离线消息的呢?回头来看入口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func Chat (c *gin.Context, conn *websocket.Conn) { go sendLoop(uid, node) go receiveLoop(uid, node) pushOfflineMsg(uid) } func pushOfflineMsg (uid string ) { if msgQueue, has := globalOfflineGroup.OfflineMap[uid]; has { for _, msg := range msgQueue { sendMessage(msg) } globalOfflineGroup.Delete(uid) } }
在建立 socket 连接成功后就去全局离线消息存储中检索,通过 user_id 查有没有当前 Node 的离线消息,如果有就直接发送并清除,这样就实现了离线消息的接受,确保用户在离线状态也不会发生消息丢失的情况。
这里有个问题,因为数据都是存在内存中,可能会出现问题,这个后续可以考虑存表或 redis 里。
前端 前端用到了 simple-peer 这个库,这个库是 RTCPeerConnection 的封装,使用起来比较简单。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 let peer : any = null ;let iceRemoteVideo : HTMLVideoElement | null = null ;let iceLocalVideo : HTMLVideoElement | null = null ;async function createPeer (isSender: boolean, receiverId: string ) { peer = new window .SimplePeer ({ initiator : isSender, config : null }); peer.on ("signal" , (signal: any ) => { const msg = createMessage (receiverId, JSON .stringify (signal), 2 ); chatClient?.send (msg); }); peer.on ("stream" , (stream: any ) => { iceRemoteVideo = document .getElementById ("iceRemoteVideo" ) as HTMLVideoElement ; if (iceRemoteVideo) iceRemoteVideo.srcObject = stream; }); const webcamStream = await navigator.mediaDevices .getUserMedia ({ video : true , audio : true , }); const ms = new MediaStream (); webcamStream.getVideoTracks ().forEach ((track ) => { ms.addTrack (track); }); iceLocalVideo = document .getElementById ("iceLocalVideo" ) as HTMLVideoElement ; if (iceLocalVideo) iceLocalVideo.srcObject = ms; peer.addStream (webcamStream); return () => { if (iceRemoteVideo) iceRemoteVideo.srcObject = null ; if (iceLocalVideo) iceLocalVideo.srcObject = null ; peer.removeStream (webcamStream); webcamStream.getTracks ().forEach ((item ) => item.stop ()); ms.getTracks ().forEach ((item ) => item.stop ()); peer.destroy (); }; } async function onSignal (msg: Message ) { if (!peer) await createPeer (false , msg.senderId ); peer.signal (JSON .parse (msg.content )); }
代码贴上来看看就可以了,比较简单,主要是这篇字数有点太多了,前端就不细讲了。
最后 项目的 github 地址:https://github.com/lvboda/quick-chat
参考资料