-
Notifications
You must be signed in to change notification settings - Fork 0
/
broker.go
128 lines (100 loc) · 2.93 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package gosse
import (
"context"
"encoding/json"
"github.com/go-redis/redis/v8"
"log"
"net/http"
"sync"
"time"
)
type Broker struct {
streams map[string]eventStream // key is stream ID
streamM sync.RWMutex
redisClient *redis.Client
streamFinishedCH chan string // stream ID
}
func NewBroker(redisClient *redis.Client) *Broker {
b := Broker{
streams: make(map[string]eventStream),
redisClient: redisClient,
streamFinishedCH: make(chan string, 1),
}
go b.run()
return &b
}
func (b *Broker) createStreamIfNotExists(streamID string) error {
b.streamM.RLock()
_, ok := b.streams[streamID]
b.streamM.RUnlock()
if ok {
return nil
}
stream := newEventStream(streamID, b.streamFinishedCH, b.redisClient)
b.streamM.Lock()
defer b.streamM.Unlock()
b.streams[stream.id] = stream
return nil
}
func (b *Broker) removeStream(streamID string) error {
b.streamM.Lock()
defer b.streamM.Unlock()
stream := b.streams[streamID]
stream.Done() // exit goroutine
delete(b.streams, streamID)
return nil
}
// We don't need to worry about whether a stream with the passed ID exists. We know it does.
// Because this method is only called internally by an http handler created in our package.
// And that handler only has streamIDs that exist.
func (b *Broker) subscribe(streamID, clientID string) (sseClient, error) {
err := b.createStreamIfNotExists(streamID)
if err != nil {
return sseClient{}, err
}
b.streamM.RLock()
stream := b.streams[streamID]
b.streamM.RUnlock()
c := newClient(clientID, streamID, stream.clientUnsubscribedCH)
stream.Subscribe(c)
return c, nil
}
// Broadcast Will broadcast a message to all connected clients that are subscribed to
// the specified eventStreamID. If you want to exclude a client, so that a client does not
// receive a message, you may pass it's ID for the arg excludeClientID. Even though excludeClientID
// is a variadic argument, only the first string passed will be used.
func (b *Broker) Broadcast(eventStreamID, eventName, eventData string, excludeClientID ...string) error {
var excludedClientID string
if len(excludeClientID) > 0 {
excludedClientID = excludeClientID[0]
}
ev := serverSentEvent{
EventName: eventName,
Data: eventData,
ExcludedClientID: excludedClientID,
}
evJson, err := json.Marshal(&ev)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = b.redisClient.Publish(ctx, redisChannelName(eventStreamID), evJson).Result()
return err
}
// HttpHandler Returns a new http handler that will send events
// to clients for specified event stream ID. stream ID is the query param "stream_id".
func (b *Broker) HttpHandler() http.HandlerFunc {
return newSSEHttpHandler(b)
}
func (b *Broker) run() {
for {
select {
case streamID := <-b.streamFinishedCH:
err := b.removeStream(streamID)
if err != nil {
log.Println(err)
}
}
}
}