我们来写一个简单的SSE服务端,SSE全名Server-Send-Event
服务端五秒钟向客户端推送一条消息,SSE建立连接主要有接下来几个过程
Content-Type
为text/event-stream
keeplive
代码如下
go
// 设置响应头,指定内容类型为text/event-stream
w.Header().Set("Content-Type", "text/event-stream")
// 设置连接保持活跃,以便服务器可以持续向客户端发送事件
w.Header().Set("Connection", "keep-alive")
// 禁用缓存
w.Header().Set("Cache-Control", "no-cache")
接下来就是用GET
请求进行握手,像WebSocket
一样,中间要检查一次客户端是否支持SSE
,因为它是新标准,按理说主流浏览器都已支持,握手之后,客户端和服务端就会保持一个长连接,服务端可以主动给客户端推送信息,但是客户端不能像服务端推送信息,他只能收信息。
gopackage main
import (
"fmt"
"net/http"
"time"
)
func main() {
http.HandleFunc("/events", eventHandler)
http.ListenAndServe(":8080", nil)
}
func eventHandler(w http.ResponseWriter, r *http.Request) {
// 设置响应头,指定内容类型为text/event-stream
w.Header().Set("Content-Type", "text/event-stream")
// 设置连接保持活跃,以便服务器可以持续向客户端发送事件
w.Header().Set("Connection", "keep-alive")
// 禁用缓存
w.Header().Set("Cache-Control", "no-cache")
// 将响应写入到HTTP响应流
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
return
}
// 模拟每5秒向客户端推送一条消息
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 生成要发送的消息
message := fmt.Sprintf("Message received at: %s\n", time.Now().Format("2006-01-02 15:04:05"))
// 将消息写入响应流
fmt.Fprintf(w, "data: %s\n\n", message)
// 刷新响应流,将数据发送给客户端
flusher.Flush()
case <-r.Context().Done():
// 如果客户端关闭连接,则终止循环
return
}
}
}
有一些IM系统利用SSE来进行和客户端的长连接,负责消息下行通道,,上行通道有一个维持一两分钟的短连接来进行,这样可以减少资源的占用,主流做法是上行通道和下行通道各维持一个长链接(通常是websockt
),对于高并发的场景,可能很浪费资源。
客户端也很简单,就是发送一个GET
请求握手,之后在一个死循环里等待消息到来。
gopackage main
import (
"bufio"
"fmt"
"log"
"net/http"
"strings"
"time"
)
func main() {
client := &http.Client{}
req, err := http.NewRequest("GET", "http://localhost:8080/events", nil)
if err != nil {
log.Fatal("Failed to create request:", err)
}
req.Header.Set("Accept", "text/event-stream")
resp, err := client.Do(req)
if err != nil {
log.Fatal("Failed to connect to SSE server:", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.Fatal("SSE server returned an error:", resp.Status)
}
reader := bufio.NewReader(resp.Body)
for {
event, err := readSSEEvent(reader)
if err != nil {
log.Fatal("Failed to read SSE event:", err)
}
fmt.Println(event.Data)
time.Sleep(2 * time.Second)
}
}
func readSSEEvent(reader *bufio.Reader) (*Event, error) {
event := &Event{}
for {
line, err := reader.ReadString('\n')
if err != nil {
return nil, err
}
line = strings.TrimSuffix(line, "\n")
if line == "" {
break
}
event.Data = line
}
return event, nil
}
type Event struct {
Data string
}
这就是全部代码。接下来基于SSE
实现一个发布订阅系统。
本文作者:yowayimono
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!