-
Notifications
You must be signed in to change notification settings - Fork 1
/
topic.go
144 lines (127 loc) · 3.23 KB
/
topic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package redispubsub
import (
"context"
"encoding/json"
"errors"
"github.com/go-redis/redis/v9"
"gocloud.dev/gcerrors"
"gocloud.dev/pubsub"
"gocloud.dev/pubsub/batcher"
"gocloud.dev/pubsub/driver"
)
var sendBatcherOpts = &batcher.Options{
MaxBatchSize: 100,
MaxHandlers: 100, // max concurrency for sends
}
var errNotInitialized = errors.New("redispubsub: topic not initialized")
type topic struct {
producer *redis.Client
topicName string
opts TopicOptions
}
// TopicOptions contains configuration options for topics.
type TopicOptions struct {
// BatcherOptions adds constraints to the default batching done for sends.
BatcherOptions batcher.Options
MaxLen int64
}
// OpenTopic creates a pubsub.Topic that sends to a Redis topic.
func OpenTopic(broker *redis.Client, topicName string, opts *TopicOptions) (*pubsub.Topic, error) {
dt, err := openTopic(broker, topicName, opts)
if err != nil {
return nil, err
}
bo := sendBatcherOpts.NewMergedOptions(&opts.BatcherOptions)
return pubsub.NewTopic(dt, bo), nil
}
// openTopic returns the driver for OpenTopic. This function exists so the test
// harness can get the driver interface implementation if it needs to.
func openTopic(broker *redis.Client, topicName string, opts *TopicOptions) (driver.Topic, error) {
if opts == nil {
opts = &TopicOptions{}
}
return &topic{producer: broker, topicName: topicName, opts: *opts}, nil
}
// SendBatch implements driver.Topic.SendBatch.
func (t *topic) SendBatch(ctx context.Context, dms []*driver.Message) error {
if t == nil || t.producer == nil {
return errNotInitialized
}
// Convert the messages to a slice of redis.XAddArgs.
for _, dm := range dms {
bm, err := json.Marshal(dm.Metadata)
if err != nil {
return err
}
msg := map[string]interface{}{
"headers": bm,
"body": dm.Body,
}
args := &redis.XAddArgs{
Stream: t.topicName,
MaxLen: t.opts.MaxLen,
Values: msg,
}
if dm.BeforeSend != nil {
asFunc := func(i interface{}) bool {
if p, ok := i.(**redis.XAddArgs); ok {
*p = args
return true
}
return false
}
if e := dm.BeforeSend(asFunc); e != nil {
return e
}
}
res, err := t.producer.XAdd(context.Background(), args).Result()
if err != nil {
return err
}
if dm.AfterSend != nil {
asFunc := func(i interface{}) bool {
if p, ok := i.(*string); ok {
*p = res
return true
}
return false
}
if err := dm.AfterSend(asFunc); err != nil {
return err
}
}
}
return nil
}
// Close implements io.Closer.
func (t *topic) Close() error {
return nil
}
// IsRetryable implements driver.Topic.IsRetryable.
func (t *topic) IsRetryable(error) bool {
return false
}
// As implements driver.Topic.As.
func (t *topic) As(i interface{}) bool {
if p, ok := i.(**redis.Client); ok {
*p = t.producer
return true
}
return false
}
// ErrorAs implements driver.Topic.ErrorAs.
func (t *topic) ErrorAs(err error, i interface{}) bool {
return errorAs(err, i)
}
// ErrorCode implements driver.Topic.ErrorCode.
func (t *topic) ErrorCode(err error) gcerrors.ErrorCode {
switch err {
case nil:
return gcerrors.OK
case context.Canceled:
return gcerrors.Canceled
case errNotInitialized:
return gcerrors.NotFound
}
return gcerrors.Unknown
}