主题思路
上次简单演示了一下SSE
使用,接下来使用SSE
来实现一个发布订阅模式,主体逻辑大概如下,服务端负责维护一个主题集合,主题可以由任何人通过POST
请求来创建,订阅者发送一个GET
请求,包含要订阅的主题名,然后和服务端建立一个SSE
连接,开始接收消息,发布者发送一个POST
请求,包含主题名和需要发布的消息,服务端收到发布者发布的信息,将这条信息推送给所有订阅了这个主题的人。代码实现也很简单,为了简化实现,我们用gin
来实现。
接下来先来定义一下主题
gotype Topic struct {
Name string
Subscribers map[string]chan<- string
}
其中每个订阅者都有一个消息发送通道,消息会先到chan
,后面到订阅时候通过轮询队列来发送信息
gotype PubSubServer struct {
Topics map[string]*Topic
}
func NewPubSubServer() *PubSubServer {
return &PubSubServer{
Topics: make(map[string]*Topic),
}
}
服务实例。接下来定义三个路由
gopubSubServer := NewPubSubServer()
router := gin.Default()
router.POST("/topic", pubSubServer.createTopicHandler)
router.GET("/subto", pubSubServer.subscribeHandler)
router.POST("/pushto", pubSubServer.pushToTopicHandler)
log.Println("PubSub server started on port 8000")
log.Fatal(router.Run(":8000"))
这就是主函数逻辑,在代码中我会尽量使用英文,为了提升英语能力,这三个路由主要是创建主题,订阅主题,发布主题。
接下来是代码主体
gofunc (ps *PubSubServer) createTopicHandler(c *gin.Context) {
topicName := c.PostForm("TopicName")
if topicName == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "TopicName is required"})
return
}
_, exists := ps.Topics[topicName]
if exists {
c.JSON(http.StatusBadRequest, gin.H{"error": "Topic already exists"})
return
}
topic := &Topic{
Name: topicName,
Subscribers: make(map[string]chan<- string),
}
ps.Topics[topicName] = topic
c.JSON(http.StatusCreated, gin.H{"message": fmt.Sprintf("Topic created: %s", topicName)})
}
func (ps *PubSubServer) subscribeHandler(c *gin.Context) {
topicName := c.Query("TopicName")
if topicName == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "TopicName is required"})
return
}
topic, exists := ps.Topics[topicName]
if !exists {
c.JSON(http.StatusBadRequest, gin.H{"error": "Topic does not exist"})
return
}
messageChan := make(chan string)
topic.Subscribers[c.ClientIP()] = messageChan
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("Access-Control-Allow-Origin", "*")
// Write the response to the HTTP response stream
flusher, _ := c.Writer.(http.Flusher)
for {
select {
case message := <-messageChan:
// Write message to response stream
fmt.Fprintf(c.Writer, "data: %s\n", message)
// Refresh the response stream and send data to the client
flusher.Flush()
case <-c.Writer.CloseNotify():
delete(topic.Subscribers, c.ClientIP())
log.Printf("Subscriber disconnected: %s", c.ClientIP())
return
}
}
}
func (ps *PubSubServer) pushToTopicHandler(c *gin.Context) {
topicName := c.PostForm("TopicName")
if topicName == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "TopicName is required"})
return
}
topic, exists := ps.Topics[topicName]
if !exists {
c.JSON(http.StatusBadRequest, gin.H{"error": "Topic does not exist"})
return
}
message := c.PostForm("Message")
if message == "" {
c.JSON(http.StatusBadRequest, gin.H{"error": "Message is required"})
return
}
for _, subscriber := range topic.Subscribers {
subscriber <- message
}
c.JSON(http.StatusOK, gin.H{"message": "Message pushed to topic"})
}
以上代码都很简单,不多讲解,接下来实现一个简单的基于命令行的客户端,
gopackage main
import (
"bufio"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"strings"
)
func main() {
reader := bufio.NewReader(os.Stdin)
for {
fmt.Println("Select an action:")
fmt.Println("1. Create a topic")
fmt.Println("2. Subscribe to a topic")
fmt.Println("3. Push a message to a topic")
fmt.Println("0. Exit")
input, _ := reader.ReadString('\n')
input = strings.TrimSpace(input)
switch input {
case "1":
createTopic(reader)
case "2":
subscribeToTopic(reader)
case "3":
pushMessageToTopic(reader)
case "0":
fmt.Println("Exiting...")
return
default:
fmt.Println("Invalid input. Please try again.")
}
}
}
func createTopic(reader *bufio.Reader) {
fmt.Print("Enter the topic name: ")
topicName, _ := reader.ReadString('\n')
topicName = strings.TrimSpace(topicName)
data := url.Values{}
data.Set("TopicName", topicName)
resp, err := http.PostForm("http://localhost:8000/topic", data)
if err != nil {
fmt.Println("Error creating topic:", err)
return
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusCreated {
fmt.Println("Topic created successfully")
} else {
fmt.Println("Error creating topic:", resp.Status)
}
}
func subscribeToTopic(reader *bufio.Reader) {
// 获取订阅主题
fmt.Print("Enter the topic name: ")
topicName, _ := reader.ReadString('\n')
topicName = strings.TrimSpace(topicName)
// 构建查询参数
queryParams := url.Values{}
queryParams.Set("TopicName", topicName)
// 创建 SSE 连接
url := "http://localhost:8000/subto?" + queryParams.Encode()
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Fatal("Error creating request:", err)
}
req.Header.Set("Accept", "text/event-stream")
client := &http.Client{}
fmt.Println("开始做")
resp, err := client.Do(req)
fmt.Println("做完了")
if err != nil {
log.Fatal("Error connecting to server:", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
log.Fatal("Failed to subscribe to topic:", resp.Status)
}
go readEvents(resp.Body)
select {}
}
func readEvents(body io.Reader) {
reader := bufio.NewReader(body)
for {
line, err := reader.ReadString('\n')
if err != nil {
log.Fatal("Error reading event:", err)
}
fmt.Println("Received event:", line)
}
}
func pushMessageToTopic(reader *bufio.Reader) {
fmt.Print("Enter the topic name: ")
topicName, _ := reader.ReadString('\n')
topicName = strings.TrimSpace(topicName)
fmt.Print("Enter the message to push: ")
message, _ := reader.ReadString('\n')
message = strings.TrimSpace(message)
data := url.Values{}
data.Set("TopicName", topicName)
data.Set("Message", message)
resp, err := http.PostForm("http://localhost:8000/pushto", data)
if err != nil {
fmt.Println("Error pushing message to topic:", err)
return
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
fmt.Println("Message pushed to topic successfully")
} else {
fmt.Println("Error pushing message to topic:", resp.Status)
}
}
我不是很了解gin
对于SSE的支持,所以上面推送部分还是用的传统方法,后续了解一下。,这是个简单的发布订阅这系统,也是一个IM当中常见的模型,当高并发的时候,通过维护一个长连接,来维护下行通道,短连接维护上行通道,可以减少资源滥用,通常长连接会选择websocket
,但是SSE也未尝不可
本文作者:yowayimono
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!