forked from Azure/go-amqp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
receiver.go
386 lines (339 loc) · 10.6 KB
/
receiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
package amqp
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
)
type messageDisposition struct {
id uint32
state encoding.DeliveryState
}
// Receiver receives messages on a single AMQP link.
type Receiver struct {
link *link // underlying link
batching bool // enable batching of message dispositions
batchMaxAge time.Duration // maximum time between the start n batch and sending the batch to the server
dispositions chan messageDisposition // message dispositions are sent on this channel when batching is enabled
maxCredit uint32 // maximum allowed inflight messages
inFlight inFlight // used to track message disposition when rcv-settle-mode == second
manualCreditor *manualCreditor // allows for credits to be managed manually (via calls to IssueCredit/DrainCredit)
}
// IssueCredit adds credits to be requested in the next flow
// request.
func (r *Receiver) IssueCredit(credit uint32) error {
return r.link.IssueCredit(credit)
}
// DrainCredit sets the drain flag on the next flow frame and
// waits for the drain to be acknowledged.
func (r *Receiver) DrainCredit(ctx context.Context) error {
return r.link.DrainCredit(ctx)
}
// Prefetched returns the next message that is stored in the Receiver's
// prefetch cache. It does NOT wait for the remote sender to send messages
// and returns immediately if the prefetch cache is empty. To receive from the
// prefetch and wait for messages from the remote Sender use `Receive`.
//
// When using ModeSecond, you *must* take an action on the message by calling
// one of the following: AcceptMessage, RejectMessage, ReleaseMessage, ModifyMessage.
// When using ModeFirst, the message is spontaneously Accepted at reception.
func (r *Receiver) Prefetched(ctx context.Context) (*Message, error) {
if atomic.LoadUint32(&r.link.Paused) == 1 {
select {
case r.link.ReceiverReady <- struct{}{}:
default:
}
}
// non-blocking receive to ensure buffered messages are
// delivered regardless of whether the link has been closed.
select {
case msg := <-r.link.Messages:
debug(3, "Receive() non blocking %d", msg.deliveryID)
msg.link = r.link
return acceptIfModeFirst(ctx, r, &msg)
case <-ctx.Done():
return nil, ctx.Err()
default:
// done draining messages
return nil, nil
}
}
// Receive returns the next message from the sender.
//
// Blocks until a message is received, ctx completes, or an error occurs.
// When using ModeSecond, you *must* take an action on the message by calling
// one of the following: AcceptMessage, RejectMessage, ReleaseMessage, ModifyMessage.
// When using ModeFirst, the message is spontaneously Accepted at reception.
func (r *Receiver) Receive(ctx context.Context) (*Message, error) {
msg, err := r.Prefetched(ctx)
if err != nil || msg != nil {
return msg, err
}
// wait for the next message
select {
case msg := <-r.link.Messages:
debug(3, "Receive() blocking %d", msg.deliveryID)
msg.link = r.link
return acceptIfModeFirst(ctx, r, &msg)
case <-r.link.close:
return nil, r.link.err
case <-r.link.Detached:
return nil, r.link.err
case <-ctx.Done():
return nil, ctx.Err()
}
}
// acceptIfModeFirst auto-accepts a message if we are in mode first, otherwise it no-ops.
func acceptIfModeFirst(ctx context.Context, r *Receiver, msg *Message) (*Message, error) {
// for ModeFirst, auto-accept the message
if receiverSettleModeValue(r.link.ReceiverSettleMode) == ModeSecond {
return msg, nil
}
if err := r.AcceptMessage(ctx, msg); err != nil {
return nil, err
}
return msg, nil
}
// Accept notifies the server that the message has been
// accepted and does not require redelivery.
func (r *Receiver) AcceptMessage(ctx context.Context, msg *Message) error {
if !msg.shouldSendDisposition() {
return nil
}
return r.messageDisposition(ctx, msg, &encoding.StateAccepted{})
}
// Reject notifies the server that the message is invalid.
//
// Rejection error is optional.
func (r *Receiver) RejectMessage(ctx context.Context, msg *Message, e *Error) error {
if !msg.shouldSendDisposition() {
return nil
}
return r.messageDisposition(ctx, msg, &encoding.StateRejected{Error: e})
}
// Release releases the message back to the server. The message
// may be redelivered to this or another consumer.
func (r *Receiver) ReleaseMessage(ctx context.Context, msg *Message) error {
if !msg.shouldSendDisposition() {
return nil
}
return r.messageDisposition(ctx, msg, &encoding.StateReleased{})
}
// Modify notifies the server that the message was not acted upon
// and should be modifed.
//
// deliveryFailed indicates that the server must consider this and
// unsuccessful delivery attempt and increment the delivery count.
//
// undeliverableHere indicates that the server must not redeliver
// the message to this link.
//
// messageAnnotations is an optional annotation map to be merged
// with the existing message annotations, overwriting existing keys
// if necessary.
func (r *Receiver) ModifyMessage(ctx context.Context, msg *Message, deliveryFailed, undeliverableHere bool, messageAnnotations Annotations) error {
if !msg.shouldSendDisposition() {
return nil
}
return r.messageDisposition(ctx,
msg, &encoding.StateModified{
DeliveryFailed: deliveryFailed,
UndeliverableHere: undeliverableHere,
MessageAnnotations: messageAnnotations,
})
}
// Address returns the link's address.
func (r *Receiver) Address() string {
if r.link.Source == nil {
return ""
}
return r.link.Source.Address
}
// LinkName returns associated link name or an empty string if link is not defined.
func (r *Receiver) LinkName() string {
return r.link.Key.name
}
// LinkSourceFilterValue retrieves the specified link source filter value or nil if it doesn't exist.
func (r *Receiver) LinkSourceFilterValue(name string) interface{} {
if r.link.Source == nil {
return nil
}
filter, ok := r.link.Source.Filter[encoding.Symbol(name)]
if !ok {
return nil
}
return filter.Value
}
// Close closes the Receiver and AMQP link.
//
// If ctx expires while waiting for servers response, ctx.Err() will be returned.
// The session will continue to wait for the response until the Session or Client
// is closed.
func (r *Receiver) Close(ctx context.Context) error {
return r.link.Close(ctx)
}
func (r *Receiver) dispositionBatcher() {
// batch operations:
// Keep track of the first and last delivery ID, incrementing as
// Accept() is called. After last-first == batchSize, send disposition.
// If Reject()/Release() is called, send one disposition for previously
// accepted, and one for the rejected/released message. If messages are
// accepted out of order, send any existing batch and the current message.
var (
batchSize = r.maxCredit
batchStarted bool
first uint32
last uint32
)
// create an unstarted timer
batchTimer := time.NewTimer(1 * time.Minute)
batchTimer.Stop()
defer batchTimer.Stop()
for {
select {
case msgDis := <-r.dispositions:
// not accepted or batch out of order
_, isAccept := msgDis.state.(*encoding.StateAccepted)
if !isAccept || (batchStarted && last+1 != msgDis.id) {
// send the current batch, if any
if batchStarted {
lastCopy := last
err := r.sendDisposition(first, &lastCopy, &encoding.StateAccepted{})
if err != nil {
r.inFlight.remove(first, &lastCopy, err)
}
batchStarted = false
}
// send the current message
err := r.sendDisposition(msgDis.id, nil, msgDis.state)
if err != nil {
r.inFlight.remove(msgDis.id, nil, err)
}
continue
}
if batchStarted {
// increment last
last++
} else {
// start new batch
batchStarted = true
first = msgDis.id
last = msgDis.id
batchTimer.Reset(r.batchMaxAge)
}
// send batch if current size == batchSize
if last-first+1 >= batchSize {
lastCopy := last
err := r.sendDisposition(first, &lastCopy, &encoding.StateAccepted{})
if err != nil {
r.inFlight.remove(first, &lastCopy, err)
}
batchStarted = false
if !batchTimer.Stop() {
<-batchTimer.C // batch timer must be drained if stop returns false
}
}
// maxBatchAge elapsed, send batch
case <-batchTimer.C:
lastCopy := last
err := r.sendDisposition(first, &lastCopy, &encoding.StateAccepted{})
if err != nil {
r.inFlight.remove(first, &lastCopy, err)
}
batchStarted = false
batchTimer.Stop()
case <-r.link.Detached:
return
}
}
}
// sendDisposition sends a disposition frame to the peer
func (r *Receiver) sendDisposition(first uint32, last *uint32, state encoding.DeliveryState) error {
fr := &frames.PerformDisposition{
Role: encoding.RoleReceiver,
First: first,
Last: last,
Settled: r.link.ReceiverSettleMode == nil || *r.link.ReceiverSettleMode == ModeFirst,
State: state,
}
debug(1, "TX (sendDisposition): %s", fr)
return r.link.Session.txFrame(fr, nil)
}
func (r *Receiver) messageDisposition(ctx context.Context, msg *Message, state encoding.DeliveryState) error {
var wait chan error
if r.link.ReceiverSettleMode != nil && *r.link.ReceiverSettleMode == ModeSecond {
debug(3, "RX (messageDisposition): add %d to inflight", msg.deliveryID)
wait = r.inFlight.add(msg.deliveryID)
}
if r.batching {
r.dispositions <- messageDisposition{id: msg.deliveryID, state: state}
} else {
err := r.sendDisposition(msg.deliveryID, nil, state)
if err != nil {
return err
}
}
if wait == nil {
return nil
}
select {
case err := <-wait:
// we've received confirmation of disposition
r.link.DeleteUnsettled(msg)
return err
case <-ctx.Done():
return ctx.Err()
}
}
// inFlight tracks in-flight message dispositions allowing receivers
// to block waiting for the server to respond when an appropriate
// settlement mode is configured.
type inFlight struct {
mu sync.RWMutex
m map[uint32]chan error
}
func (f *inFlight) add(id uint32) chan error {
wait := make(chan error, 1)
f.mu.Lock()
if f.m == nil {
f.m = map[uint32]chan error{id: wait}
} else {
f.m[id] = wait
}
f.mu.Unlock()
return wait
}
func (f *inFlight) remove(first uint32, last *uint32, err error) {
f.mu.Lock()
if f.m == nil {
f.mu.Unlock()
return
}
ll := first
if last != nil {
ll = *last
}
for i := first; i <= ll; i++ {
wait, ok := f.m[i]
if ok {
wait <- err
delete(f.m, i)
}
}
f.mu.Unlock()
}
func (f *inFlight) clear(err error) {
f.mu.Lock()
for id, wait := range f.m {
wait <- err
delete(f.m, id)
}
f.mu.Unlock()
}
func (f *inFlight) len() int {
f.mu.RLock()
defer f.mu.RUnlock()
return len(f.m)
}