编辑
2023-09-22
后端
00
请注意,本文编写于 596 天前,最后修改于 596 天前,其中某些信息可能已经过时。

主题思路

上次简单演示了一下SSE使用,接下来使用SSE来实现一个发布订阅模式,主体逻辑大概如下,服务端负责维护一个主题集合,主题可以由任何人通过POST请求来创建,订阅者发送一个GET请求,包含要订阅的主题名,然后和服务端建立一个SSE连接,开始接收消息,发布者发送一个POST请求,包含主题名和需要发布的消息,服务端收到发布者发布的信息,将这条信息推送给所有订阅了这个主题的人。代码实现也很简单,为了简化实现,我们用gin来实现。

接下来先来定义一下主题

go
type Topic struct { Name string Subscribers map[string]chan<- string }

其中每个订阅者都有一个消息发送通道,消息会先到chan,后面到订阅时候通过轮询队列来发送信息

go
type PubSubServer struct { Topics map[string]*Topic } func NewPubSubServer() *PubSubServer { return &PubSubServer{ Topics: make(map[string]*Topic), } }

服务实例。接下来定义三个路由

go
pubSubServer := NewPubSubServer() router := gin.Default() router.POST("/topic", pubSubServer.createTopicHandler) router.GET("/subto", pubSubServer.subscribeHandler) router.POST("/pushto", pubSubServer.pushToTopicHandler) log.Println("PubSub server started on port 8000") log.Fatal(router.Run(":8000"))

这就是主函数逻辑,在代码中我会尽量使用英文,为了提升英语能力,这三个路由主要是创建主题,订阅主题,发布主题。

接下来是代码主体

go
func (ps *PubSubServer) createTopicHandler(c *gin.Context) { topicName := c.PostForm("TopicName") if topicName == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "TopicName is required"}) return } _, exists := ps.Topics[topicName] if exists { c.JSON(http.StatusBadRequest, gin.H{"error": "Topic already exists"}) return } topic := &Topic{ Name: topicName, Subscribers: make(map[string]chan<- string), } ps.Topics[topicName] = topic c.JSON(http.StatusCreated, gin.H{"message": fmt.Sprintf("Topic created: %s", topicName)}) } func (ps *PubSubServer) subscribeHandler(c *gin.Context) { topicName := c.Query("TopicName") if topicName == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "TopicName is required"}) return } topic, exists := ps.Topics[topicName] if !exists { c.JSON(http.StatusBadRequest, gin.H{"error": "Topic does not exist"}) return } messageChan := make(chan string) topic.Subscribers[c.ClientIP()] = messageChan c.Header("Content-Type", "text/event-stream") c.Header("Cache-Control", "no-cache") c.Header("Connection", "keep-alive") c.Header("Access-Control-Allow-Origin", "*") // Write the response to the HTTP response stream flusher, _ := c.Writer.(http.Flusher) for { select { case message := <-messageChan: // Write message to response stream fmt.Fprintf(c.Writer, "data: %s\n", message) // Refresh the response stream and send data to the client flusher.Flush() case <-c.Writer.CloseNotify(): delete(topic.Subscribers, c.ClientIP()) log.Printf("Subscriber disconnected: %s", c.ClientIP()) return } } } func (ps *PubSubServer) pushToTopicHandler(c *gin.Context) { topicName := c.PostForm("TopicName") if topicName == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "TopicName is required"}) return } topic, exists := ps.Topics[topicName] if !exists { c.JSON(http.StatusBadRequest, gin.H{"error": "Topic does not exist"}) return } message := c.PostForm("Message") if message == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "Message is required"}) return } for _, subscriber := range topic.Subscribers { subscriber <- message } c.JSON(http.StatusOK, gin.H{"message": "Message pushed to topic"}) }

以上代码都很简单,不多讲解,接下来实现一个简单的基于命令行的客户端,

go
package main import ( "bufio" "fmt" "io" "log" "net/http" "net/url" "os" "strings" ) func main() { reader := bufio.NewReader(os.Stdin) for { fmt.Println("Select an action:") fmt.Println("1. Create a topic") fmt.Println("2. Subscribe to a topic") fmt.Println("3. Push a message to a topic") fmt.Println("0. Exit") input, _ := reader.ReadString('\n') input = strings.TrimSpace(input) switch input { case "1": createTopic(reader) case "2": subscribeToTopic(reader) case "3": pushMessageToTopic(reader) case "0": fmt.Println("Exiting...") return default: fmt.Println("Invalid input. Please try again.") } } } func createTopic(reader *bufio.Reader) { fmt.Print("Enter the topic name: ") topicName, _ := reader.ReadString('\n') topicName = strings.TrimSpace(topicName) data := url.Values{} data.Set("TopicName", topicName) resp, err := http.PostForm("http://localhost:8000/topic", data) if err != nil { fmt.Println("Error creating topic:", err) return } defer resp.Body.Close() if resp.StatusCode == http.StatusCreated { fmt.Println("Topic created successfully") } else { fmt.Println("Error creating topic:", resp.Status) } } func subscribeToTopic(reader *bufio.Reader) { // 获取订阅主题 fmt.Print("Enter the topic name: ") topicName, _ := reader.ReadString('\n') topicName = strings.TrimSpace(topicName) // 构建查询参数 queryParams := url.Values{} queryParams.Set("TopicName", topicName) // 创建 SSE 连接 url := "http://localhost:8000/subto?" + queryParams.Encode() req, err := http.NewRequest("GET", url, nil) if err != nil { log.Fatal("Error creating request:", err) } req.Header.Set("Accept", "text/event-stream") client := &http.Client{} fmt.Println("开始做") resp, err := client.Do(req) fmt.Println("做完了") if err != nil { log.Fatal("Error connecting to server:", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { log.Fatal("Failed to subscribe to topic:", resp.Status) } go readEvents(resp.Body) select {} } func readEvents(body io.Reader) { reader := bufio.NewReader(body) for { line, err := reader.ReadString('\n') if err != nil { log.Fatal("Error reading event:", err) } fmt.Println("Received event:", line) } } func pushMessageToTopic(reader *bufio.Reader) { fmt.Print("Enter the topic name: ") topicName, _ := reader.ReadString('\n') topicName = strings.TrimSpace(topicName) fmt.Print("Enter the message to push: ") message, _ := reader.ReadString('\n') message = strings.TrimSpace(message) data := url.Values{} data.Set("TopicName", topicName) data.Set("Message", message) resp, err := http.PostForm("http://localhost:8000/pushto", data) if err != nil { fmt.Println("Error pushing message to topic:", err) return } defer resp.Body.Close() if resp.StatusCode == http.StatusOK { fmt.Println("Message pushed to topic successfully") } else { fmt.Println("Error pushing message to topic:", resp.Status) } }

我不是很了解gin对于SSE的支持,所以上面推送部分还是用的传统方法,后续了解一下。,这是个简单的发布订阅这系统,也是一个IM当中常见的模型,当高并发的时候,通过维护一个长连接,来维护下行通道,短连接维护上行通道,可以减少资源滥用,通常长连接会选择websocket,但是SSE也未尝不可

本文作者:yowayimono

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!