编辑
2023-09-21
后端
00
请注意,本文编写于 597 天前,最后修改于 597 天前,其中某些信息可能已经过时。

我们来写一个简单的SSE服务端,SSE全名Server-Send-Event服务端五秒钟向客户端推送一条消息,SSE建立连接主要有接下来几个过程

  • 设置Content-Typetext/event-stream
  • s设置keeplive
  • 禁用浏览器缓存

代码如下

go
// 设置响应头,指定内容类型为text/event-stream w.Header().Set("Content-Type", "text/event-stream") // 设置连接保持活跃,以便服务器可以持续向客户端发送事件 w.Header().Set("Connection", "keep-alive") // 禁用缓存 w.Header().Set("Cache-Control", "no-cache")

接下来就是用GET请求进行握手,像WebSocket一样,中间要检查一次客户端是否支持SSE,因为它是新标准,按理说主流浏览器都已支持,握手之后,客户端和服务端就会保持一个长连接,服务端可以主动给客户端推送信息,但是客户端不能像服务端推送信息,他只能收信息。

go
package main import ( "fmt" "net/http" "time" ) func main() { http.HandleFunc("/events", eventHandler) http.ListenAndServe(":8080", nil) } func eventHandler(w http.ResponseWriter, r *http.Request) { // 设置响应头,指定内容类型为text/event-stream w.Header().Set("Content-Type", "text/event-stream") // 设置连接保持活跃,以便服务器可以持续向客户端发送事件 w.Header().Set("Connection", "keep-alive") // 禁用缓存 w.Header().Set("Cache-Control", "no-cache") // 将响应写入到HTTP响应流 flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) return } // 模拟每5秒向客户端推送一条消息 ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: // 生成要发送的消息 message := fmt.Sprintf("Message received at: %s\n", time.Now().Format("2006-01-02 15:04:05")) // 将消息写入响应流 fmt.Fprintf(w, "data: %s\n\n", message) // 刷新响应流,将数据发送给客户端 flusher.Flush() case <-r.Context().Done(): // 如果客户端关闭连接,则终止循环 return } } }

有一些IM系统利用SSE来进行和客户端的长连接,负责消息下行通道,,上行通道有一个维持一两分钟的短连接来进行,这样可以减少资源的占用,主流做法是上行通道和下行通道各维持一个长链接(通常是websockt),对于高并发的场景,可能很浪费资源。

客户端也很简单,就是发送一个GET请求握手,之后在一个死循环里等待消息到来。

go
package main import ( "bufio" "fmt" "log" "net/http" "strings" "time" ) func main() { client := &http.Client{} req, err := http.NewRequest("GET", "http://localhost:8080/events", nil) if err != nil { log.Fatal("Failed to create request:", err) } req.Header.Set("Accept", "text/event-stream") resp, err := client.Do(req) if err != nil { log.Fatal("Failed to connect to SSE server:", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { log.Fatal("SSE server returned an error:", resp.Status) } reader := bufio.NewReader(resp.Body) for { event, err := readSSEEvent(reader) if err != nil { log.Fatal("Failed to read SSE event:", err) } fmt.Println(event.Data) time.Sleep(2 * time.Second) } } func readSSEEvent(reader *bufio.Reader) (*Event, error) { event := &Event{} for { line, err := reader.ReadString('\n') if err != nil { return nil, err } line = strings.TrimSuffix(line, "\n") if line == "" { break } event.Data = line } return event, nil } type Event struct { Data string }

这就是全部代码。接下来基于SSE实现一个发布订阅系统。

本文作者:yowayimono

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!