您现在的位置是:亿华云 > IT科技
构建一个即时消息应用(五):实时消息
亿华云2025-10-09 01:20:59【IT科技】0人已围观
简介本文是该系列的第五篇。第一篇: 模式 第二篇: OAuth 第三篇: 对话 第四篇: 消息 对于实时消息,我们将使用服务器发送事件Server-
本文是构建个即该系列的第五篇。
第一篇: 模式 第二篇: OAuth 第三篇: 对话 第四篇: 消息对于实时消息,时消时消我们将使用 服务器发送事件Server-Sent Events。息应息这是用实一个打开的连接,我们可以在其中传输数据流。构建个即我们会有个端点,时消时消用户会在其中订阅发送给他的息应息所有消息。
消息户端
在 HTTP 部分之前,用实让我们先编写一个映射map ,构建个即让所有客户端都监听消息。时消时消 像这样全局初始化:
type MessageClient struct { Messages chan Message UserID string } var messageClients sync.Map已创建的息应息新消息
还记得在 上一篇文章 中,当我们创建这条消息时,用实我们留下了一个 “TODO” 注释。构建个即在那里,时消时消我们将使用这个函数来调度一个 goroutine。息应息
go messageCreated(message)把这行代码插入到我们留注释的位置。
func messageCreated(message Message) error { if err := db.QueryRow(` SELECT user_id FROM participants WHERE user_id != $1 and conversation_id = $2 `, message.UserID, message.ConversationID). Scan(&message.ReceiverID); err != nil { return err } go broadcastMessage(message) return nil } func broadcastMessage(message Message) { messageClients.Range(func(key, _ interface{ }) bool { client := key.(*MessageClient) if client.UserID == message.ReceiverID { client.Messages <- message } return true }) }该函数查询接收者 ID(其他参与者 ID),并将消息发送给所有客户端。
订阅消息
让我们转到 main() 函数并添加以下路由:
router.HandleFunc("GET", "/api/messages", guard(subscribeToMessages))此端点处理 /api/messages 上的云服务器 GET 请求。请求应该是一个 EventSource 连接。它用一个事件流响应,其中的数据是 JSON 格式的。
func subscribeToMessages(w http.ResponseWriter, r *http.Request) { if a := r.Header.Get("Accept"); !strings.Contains(a, "text/event-stream") { http.Error(w, "This endpoint requires an EventSource connection", http.StatusNotAcceptable) return } f, ok := w.(http.Flusher) if !ok { respondError(w, errors.New("streaming unsupported")) return } ctx := r.Context() authUserID := ctx.Value(keyAuthUserID).(string) h := w.Header() h.Set("Cache-Control", "no-cache") h.Set("Connection", "keep-alive") h.Set("Content-Type", "text/event-stream") messages := make(chan Message) defer close(messages) client := &MessageClient{ Messages: messages, UserID: authUserID} messageClients.Store(client, nil) defer messageClients.Delete(client) for { select { case <-ctx.Done(): return case message := <-messages: if b, err := json.Marshal(message); err != nil { log.Printf("could not marshall message: %v\n", err) fmt.Fprintf(w, "event: error\ndata: %v\n\n", err) } else { fmt.Fprintf(w, "data: %s\n\n", b) } f.Flush() } } }首先,它检查请求头是否正确,并检查服务器是否支持流式传输。我们创建一个消息通道,用它来构建一个客户端,并将其存储在客户端映射中。每当创建新消息时,它都会进入这个通道,因此我们可以通过 for-select 循环从中读取。
服务器发送事件Server-Sent Events使用以下格式发送数据:
data: some data here\n\n我们以 JSON 格式发送:
data: { "foo":"bar"}\n\n我们使用 fmt.Fprintf() 以这种格式写入响应写入器writter,并在循环的每次迭代中刷新数据。
这个循环会一直运行,直到使用请求上下文关闭连接为止。我们延迟了通道的企商汇关闭和客户端的删除,因此,当循环结束时,通道将被关闭,客户端不会收到更多的消息。
注意,服务器发送事件Server-Sent Events(EventSource)的 JavaScript API 不支持设置自定义请求头😒,所以我们不能设置 Authorization: Bearer <token>。这就是为什么 guard() 中间件也会从 URL 查询字符串中读取令牌的原因。
实时消息部分到此结束。我想说的是,这就是后端的全部内容。但是为了编写前端代码,我将再增加一个登录端点:一个仅用于开发的登录。亿华云
源代码很赞哦!(116)
相关文章
- 只要我们做的是从目前的市场情况选择域名,从简单易记,从个性特征上,我们就可以找到一个好域名进行注册。域名注册进行域名记录和解析以及绑定网站后,客户可以通过URL登录您的网站。
- 鸿蒙HarmonyOS三方件开发指南(8)-RoundedImage
- 应用部署架构:如何降低云网络时延?
- 前端的批量接口如何快速响应?有没有通用解决方案?
- 个人域名转为公司需要什么条件?个人域名转为公司该怎么做?
- Dubbo 同步调用太慢,也许你可以试试异步处理
- 利用 Python 爬取网站的新手指南
- 如何在 C# 8 中使用 Index 和 Range
- 小白注册网站域名该怎么办?有什么步骤?
- 基础篇:Java.Security框架之签名、加密、摘要及证书