link.go (forked from Azure/go-amqp)
package amqp
import (
"bytes"
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"github.com/Azure/go-amqp/internal/buffer"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
)
// link is a unidirectional route.
//
// May be used for sending or receiving.
type link struct {
Key linkKey // Name and direction
Handle uint32 // our handle
RemoteHandle uint32 // remote's handle
dynamicAddr bool // request a dynamic link address from the server
RX chan frames.FrameBody // session sends frames for this link on this channel
Transfers chan frames.PerformTransfer // sender uses to send transfer frames
closeOnce sync.Once // closeOnce protects close from being closed multiple times
// NOTE: `close` and `Detached` BOTH need to be checked to determine if the link
// is not in a "closed" state
// close signals the mux to shutdown. This indicates that `Close()` was called on this link.
close chan struct{}
// detached is closed by mux/muxDetach when the link is fully detached.
// This will be initiated if the service sends back an error or requests the link detach.
Detached chan struct{}
detachErrorMu sync.Mutex // protects detachError
detachError *Error // error to send to remote on detach, set by closeWithError
Session *Session // parent session
receiver *Receiver // allows link options to modify Receiver
Source *frames.Source // used for Receiver links
Target *frames.Target // used for Sender links
properties map[encoding.Symbol]interface{} // additional properties sent upon link attach
// Indicates whether we should allow detaches on disposition errors or not.
// Some AMQP servers (like Event Hubs) benefit from keeping the link open on disposition errors
// (for instance, if you're doing many parallel sends over the same link and you get back a
// throttling error, which is not fatal)
detachOnDispositionError bool
// "The delivery-count is initialized by the sender when a link endpoint is created,
// and is incremented whenever a message is sent. Only the sender MAY independently
// modify this field. The receiver's value is calculated based on the last known
// value from the sender and any subsequent messages received on the link. Note that,
// despite its name, the delivery-count is not a count but a sequence number
// initialized at an arbitrary point by the sender."
deliveryCount uint32
linkCredit uint32 // maximum number of messages allowed between flow updates
SenderSettleMode *SenderSettleMode // negotiated sender settlement mode
ReceiverSettleMode *ReceiverSettleMode // negotiated receiver settlement mode
MaxMessageSize uint64 // maximum message size, negotiated during attach
detachReceived bool // set when a detach frame is received from the peer
err error // err returned on Close()
// message receiving
Paused uint32 // atomically accessed; indicates that all link credits have been used by sender
ReceiverReady chan struct{} // receiver sends on this when mux is paused to indicate it can handle more messages
Messages chan Message // used to send completed messages to receiver
unsettledMessages map[string]struct{} // used to keep track of messages being handled downstream
unsettledMessagesLock sync.RWMutex // lock to protect concurrent access to unsettledMessages
buf buffer.Buffer // buffered bytes for current message
more bool // if true, buf contains a partial message
msg Message // current message being decoded
}
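
// newLink creates the base link used by both Sender and Receiver links and
// applies the provided LinkOptions. A non-nil Receiver indicates the link
// will be used for receiving.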
func newLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
l := &link{
Key: linkKey{randString(40), encoding.Role(r != nil)},
Session: s,
receiver: r,
close: make(chan struct{}),
Detached: make(chan struct{}),
ReceiverReady: make(chan struct{}, 1),
detachOnDispositionError: true,
}
// configure options
for _, o := range opts {
err := o(l)
if err != nil {
return nil, err
}
}
// sending unsettled messages when the receiver is in mode-second is currently
// broken and causes a hang after sending, so just disallow it for now.
if r == nil && senderSettleModeValue(l.SenderSettleMode) != ModeSettled && receiverSettleModeValue(l.ReceiverSettleMode) == ModeSecond {
return nil, errors.New("sender does not support exactly-once guarantee")
}
return l, nil
}
// attachLink is used by Receiver and Sender to create new links
func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
l, err := newLink(s, r, opts)
if err != nil {
return nil, err
}
isReceiver := r != nil
// buffer rx to linkCredit so that conn.mux won't block
// attempting to send to a slow reader
if isReceiver {
if l.receiver.manualCreditor != nil {
l.RX = make(chan frames.FrameBody, l.receiver.maxCredit)
} else {
l.RX = make(chan frames.FrameBody, l.linkCredit)
}
} else {
l.RX = make(chan frames.FrameBody, 1)
}
// request handle from Session.mux
select {
case <-s.done:
return nil, s.err
case s.allocateHandle <- l:
}
// wait for handle allocation
select {
case <-s.done:
return nil, s.err
case <-l.RX:
}
// check for link request error
if l.err != nil {
return nil, l.err
}
attach := &frames.PerformAttach{
Name: l.Key.name,
Handle: l.Handle,
ReceiverSettleMode: l.ReceiverSettleMode,
SenderSettleMode: l.SenderSettleMode,
MaxMessageSize: l.MaxMessageSize,
Source: l.Source,
Target: l.Target,
Properties: l.properties,
}
if isReceiver {
attach.Role = encoding.RoleReceiver
if attach.Source == nil {
attach.Source = new(frames.Source)
}
attach.Source.Dynamic = l.dynamicAddr
} else {
attach.Role = encoding.RoleSender
if attach.Target == nil {
attach.Target = new(frames.Target)
}
attach.Target.Dynamic = l.dynamicAddr
}
// send Attach frame
debug(1, "TX (attachLink): %s", attach)
_ = s.txFrame(attach, nil)
// wait for response
var fr frames.FrameBody
select {
case <-s.done:
return nil, s.err
case fr = <-l.RX:
}
debug(3, "RX (attachLink): %s", fr)
resp, ok := fr.(*frames.PerformAttach)
if !ok {
return nil, fmt.Errorf("unexpected attach response: %#v", fr)
}
// If the remote encounters an error during the attach it returns an Attach
// with no Source or Target. The remote then sends a Detach with an error.
//
// Note that if the application chooses not to create a terminus, the session
// endpoint will still create a link endpoint and issue an attach indicating
// that the link endpoint has no associated local terminus. In this case, the
// session endpoint MUST immediately detach the newly created link endpoint.
//
// http://docs.oasis-open.org/amqp/core/v1.0/csprd01/amqp-core-transport-v1.0-csprd01.html#doc-idp386144
if resp.Source == nil && resp.Target == nil {
// wait for detach
select {
case <-s.done:
return nil, s.err
case fr = <-l.RX:
}
detach, ok := fr.(*frames.PerformDetach)
if !ok {
return nil, fmt.Errorf("unexpected frame while waiting for detach: %#v", fr)
}
// send return detach
fr = &frames.PerformDetach{
Handle: l.Handle,
Closed: true,
}
debug(1, "TX (attachLink): %s", fr)
_ = s.txFrame(fr, nil)
if detach.Error == nil {
return nil, fmt.Errorf("received detach with no error specified")
}
return nil, detach.Error
}
if l.MaxMessageSize == 0 || resp.MaxMessageSize < l.MaxMessageSize {
l.MaxMessageSize = resp.MaxMessageSize
}
if isReceiver {
if l.Source == nil {
l.Source = new(frames.Source)
}
// if dynamic address requested, copy assigned name to address
if l.dynamicAddr && resp.Source != nil {
l.Source.Address = resp.Source.Address
}
// deliveryCount is a sequence number, must initialize to sender's initial sequence number
l.deliveryCount = resp.InitialDeliveryCount
// buffer receiver so that link.mux doesn't block
l.Messages = make(chan Message, l.receiver.maxCredit)
l.unsettledMessages = map[string]struct{}{}
// copy the received filter values
l.Source.Filter = resp.Source.Filter
} else {
if l.Target == nil {
l.Target = new(frames.Target)
}
// if dynamic address requested, copy assigned name to address
if l.dynamicAddr && resp.Target != nil {
l.Target.Address = resp.Target.Address
}
l.Transfers = make(chan frames.PerformTransfer)
}
err = l.setSettleModes(resp)
if err != nil {
l.muxDetach()
return nil, err
}
go l.mux()
return l, nil
}
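
// addUnsettled records the message's delivery tag in the set of messages
// awaiting settlement.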
func (l *link) addUnsettled(msg *Message) {
l.unsettledMessagesLock.Lock()
l.unsettledMessages[string(msg.DeliveryTag)] = struct{}{}
l.unsettledMessagesLock.Unlock()
}
// DeleteUnsettled removes the message from the map of unsettled messages.
func (l *link) DeleteUnsettled(msg *Message) {
l.unsettledMessagesLock.Lock()
delete(l.unsettledMessages, string(msg.DeliveryTag))
l.unsettledMessagesLock.Unlock()
}
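
// countUnsettled returns the number of messages currently awaiting settlement.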
func (l *link) countUnsettled() int {
l.unsettledMessagesLock.RLock()
count := len(l.unsettledMessages)
l.unsettledMessagesLock.RUnlock()
return count
}
// setSettleModes sets the settlement modes based on the resp frames.PerformAttach.
//
// If a settlement mode has been explicitly set locally and it was not honored by the
// server, an error is returned.
func (l *link) setSettleModes(resp *frames.PerformAttach) error {
var (
localRecvSettle = receiverSettleModeValue(l.ReceiverSettleMode)
respRecvSettle = receiverSettleModeValue(resp.ReceiverSettleMode)
)
if l.ReceiverSettleMode != nil && localRecvSettle != respRecvSettle {
return fmt.Errorf("amqp: receiver settlement mode %q requested, received %q from server", l.ReceiverSettleMode, &respRecvSettle)
}
l.ReceiverSettleMode = &respRecvSettle
var (
localSendSettle = senderSettleModeValue(l.SenderSettleMode)
respSendSettle = senderSettleModeValue(resp.SenderSettleMode)
)
if l.SenderSettleMode != nil && localSendSettle != respSendSettle {
return fmt.Errorf("amqp: sender settlement mode %q requested, received %q from server", l.SenderSettleMode, &respSendSettle)
}
l.SenderSettleMode = &respSendSettle
return nil
}
// doFlow handles the logical 'flow' event for a link.
// For receivers it will send (if needed) an AMQP flow frame, via `muxFlow`. If a fatal error
// occurs it will be set in `l.err` and 'ok' will be false.
// For senders it will indicate if we should try to send any outgoing transfers (the logical
// equivalent of a flow for a sender) by returning true for 'enableOutgoingTransfers'.
func (l *link) doFlow() (ok bool, enableOutgoingTransfers bool) {
var (
isReceiver = l.receiver != nil
isSender = !isReceiver
)
switch {
// enable outgoing transfers case if sender and credits are available
case isSender && l.linkCredit > 0:
debug(1, "Link Mux isSender: credit: %d, deliveryCount: %d, messages: %d, unsettled: %d", l.linkCredit, l.deliveryCount, len(l.Messages), l.countUnsettled())
return true, true
case isReceiver && l.receiver.manualCreditor != nil:
drain, credits := l.receiver.manualCreditor.FlowBits(l.linkCredit)
if drain || credits > 0 {
debug(1, "FLOW Link Mux (manual): source: %s, inflight: %d, credit: %d, creditsToAdd: %d, drain: %v, deliveryCount: %d, messages: %d, unsettled: %d, maxCredit : %d, settleMode: %s",
l.Source.Address, l.receiver.inFlight.len(), l.linkCredit, credits, drain, l.deliveryCount, len(l.Messages), l.countUnsettled(), l.receiver.maxCredit, l.ReceiverSettleMode.String())
// send a flow frame.
l.err = l.muxFlow(credits, drain)
}
// if receiver && half maxCredits have been processed, send more credits
case isReceiver && l.linkCredit+uint32(l.countUnsettled()) <= l.receiver.maxCredit/2:
debug(1, "FLOW Link Mux half: source: %s, inflight: %d, credit: %d, deliveryCount: %d, messages: %d, unsettled: %d, maxCredit : %d, settleMode: %s", l.Source.Address, l.receiver.inFlight.len(), l.linkCredit, l.deliveryCount, len(l.Messages), l.countUnsettled(), l.receiver.maxCredit, l.ReceiverSettleMode.String())
linkCredit := l.receiver.maxCredit - uint32(l.countUnsettled())
l.err = l.muxFlow(linkCredit, false)
if l.err != nil {
return false, false
}
atomic.StoreUint32(&l.Paused, 0)
case isReceiver && l.linkCredit == 0:
debug(1, "PAUSE Link Mux pause: inflight: %d, credit: %d, deliveryCount: %d, messages: %d, unsettled: %d, maxCredit : %d, settleMode: %s", l.receiver.inFlight.len(), l.linkCredit, l.deliveryCount, len(l.Messages), l.countUnsettled(), l.receiver.maxCredit, l.ReceiverSettleMode.String())
atomic.StoreUint32(&l.Paused, 1)
}
return true, false
}
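
// mux is the link's main loop. It manages flow control via doFlow, dispatches
// incoming frames to muxHandleFrame, and forwards outgoing transfers to the
// session until the link is closed or detached.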
func (l *link) mux() {
defer l.muxDetach()
Loop:
for {
var outgoingTransfers chan frames.PerformTransfer
ok, enableOutgoingTransfers := l.doFlow()
if !ok {
return
}
if enableOutgoingTransfers {
outgoingTransfers = l.Transfers
}
select {
// received frame
case fr := <-l.RX:
l.err = l.muxHandleFrame(fr)
if l.err != nil {
return
}
// send data
case tr := <-outgoingTransfers:
debug(3, "TX(link): %s", tr)
// Ensure the session mux is not blocked
for {
select {
case l.Session.txTransfer <- &tr:
// decrement link-credit after entire message transferred
if !tr.More {
l.deliveryCount++
l.linkCredit--
// we are the sender and we keep track of the peer's link credit
debug(3, "TX(link): key:%s, decremented linkCredit: %d", l.Key.name, l.linkCredit)
}
continue Loop
case fr := <-l.RX:
l.err = l.muxHandleFrame(fr)
if l.err != nil {
return
}
case <-l.close:
l.err = ErrLinkClosed
return
case <-l.Session.done:
l.err = l.Session.err
return
}
}
case <-l.ReceiverReady:
continue
case <-l.close:
l.err = ErrLinkClosed
return
case <-l.Session.done:
l.err = l.Session.err
return
}
}
}
// muxFlow sends tr to the session mux.
// l.linkCredit will also be updated to `linkCredit`
func (l *link) muxFlow(linkCredit uint32, drain bool) error {
var (
deliveryCount = l.deliveryCount
)
debug(3, "link.muxFlow(): len(l.Messages):%d - linkCredit: %d - deliveryCount: %d, inFlight: %d", len(l.Messages), linkCredit, deliveryCount, l.receiver.inFlight.len())
fr := &frames.PerformFlow{
Handle: &l.Handle,
DeliveryCount: &deliveryCount,
LinkCredit: &linkCredit, // max number of messages,
Drain: drain,
}
debug(3, "TX (muxFlow): %s", fr)
// Update credit. This must happen before entering the loop below
// because incoming messages handled while waiting to transmit the
// flow frame increment deliveryCount, which would otherwise leave
// the credit out of sync with the server.
if !drain {
// if we're draining we don't want to touch our internal credit - we're not changing it so any issued credits
// are still valid until drain completes, at which point they will be naturally zeroed.
l.linkCredit = linkCredit
}
// Ensure the session mux is not blocked
for {
select {
case l.Session.tx <- fr:
return nil
case fr := <-l.RX:
err := l.muxHandleFrame(fr)
if err != nil {
return err
}
case <-l.close:
return ErrLinkClosed
case <-l.Session.done:
return l.Session.err
}
}
}
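
// muxReceive accumulates the payload from incoming transfer frames and, once
// the final frame of a message arrives, unmarshals it and delivers it on the
// Messages channel.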
func (l *link) muxReceive(fr frames.PerformTransfer) error {
if !l.more {
// this is the first transfer of a message,
// record the delivery ID, message format,
// and delivery Tag
if fr.DeliveryID != nil {
l.msg.deliveryID = *fr.DeliveryID
}
if fr.MessageFormat != nil {
l.msg.Format = *fr.MessageFormat
}
l.msg.DeliveryTag = fr.DeliveryTag
// these fields are required on first transfer of a message
if fr.DeliveryID == nil {
msg := "received message without a delivery-id"
l.closeWithError(&Error{
Condition: ErrorNotAllowed,
Description: msg,
})
return errors.New(msg)
}
if fr.MessageFormat == nil {
msg := "received message without a message-format"
l.closeWithError(&Error{
Condition: ErrorNotAllowed,
Description: msg,
})
return errors.New(msg)
}
if fr.DeliveryTag == nil {
msg := "received message without a delivery-tag"
l.closeWithError(&Error{
Condition: ErrorNotAllowed,
Description: msg,
})
return errors.New(msg)
}
} else {
// this is a continuation of a multipart message
// some fields may be omitted on continuation transfers,
// but if they are included they must be consistent
// with the first.
if fr.DeliveryID != nil && *fr.DeliveryID != l.msg.deliveryID {
msg := fmt.Sprintf(
"received continuation transfer with inconsistent delivery-id: %d != %d",
*fr.DeliveryID, l.msg.deliveryID,
)
l.closeWithError(&Error{
Condition: ErrorNotAllowed,
Description: msg,
})
return errors.New(msg)
}
if fr.MessageFormat != nil && *fr.MessageFormat != l.msg.Format {
msg := fmt.Sprintf(
"received continuation transfer with inconsistent message-format: %d != %d",
*fr.MessageFormat, l.msg.Format,
)
l.closeWithError(&Error{
Condition: ErrorNotAllowed,
Description: msg,
})
return errors.New(msg)
}
if fr.DeliveryTag != nil && !bytes.Equal(fr.DeliveryTag, l.msg.DeliveryTag) {
msg := fmt.Sprintf(
"received continuation transfer with inconsistent delivery-tag: %q != %q",
fr.DeliveryTag, l.msg.DeliveryTag,
)
l.closeWithError(&Error{
Condition: ErrorNotAllowed,
Description: msg,
})
return errors.New(msg)
}
}
// discard message if it's been aborted
if fr.Aborted {
l.buf.Reset()
l.msg = Message{}
l.more = false
return nil
}
// ensure maxMessageSize will not be exceeded
if l.MaxMessageSize != 0 && uint64(l.buf.Len())+uint64(len(fr.Payload)) > l.MaxMessageSize {
msg := fmt.Sprintf("received message larger than max size of %d", l.MaxMessageSize)
l.closeWithError(&Error{
Condition: ErrorMessageSizeExceeded,
Description: msg,
})
return errors.New(msg)
}
// add the payload to the buffer
l.buf.Append(fr.Payload)
// mark as settled if at least one frame is settled
l.msg.settled = l.msg.settled || fr.Settled
// save in-progress status
l.more = fr.More
if fr.More {
return nil
}
// last frame in message
err := l.msg.Unmarshal(&l.buf)
if err != nil {
return err
}
debug(1, "deliveryID %d before push to receiver - deliveryCount : %d - linkCredit: %d, len(messages): %d, len(inflight): %d", l.msg.deliveryID, l.deliveryCount, l.linkCredit, len(l.Messages), l.receiver.inFlight.len())
// send to receiver, this should never block due to buffering
// and flow control.
if receiverSettleModeValue(l.ReceiverSettleMode) == ModeSecond {
l.addUnsettled(&l.msg)
}
l.Messages <- l.msg
debug(1, "deliveryID %d after push to receiver - deliveryCount : %d - linkCredit: %d, len(messages): %d, len(inflight): %d", l.msg.deliveryID, l.deliveryCount, l.linkCredit, len(l.Messages), l.receiver.inFlight.len())
// reset progress
l.buf.Reset()
l.msg = Message{}
// decrement link-credit after entire message received
l.deliveryCount++
l.linkCredit--
debug(1, "deliveryID %d before exit - deliveryCount : %d - linkCredit: %d, len(messages): %d", l.msg.deliveryID, l.deliveryCount, l.linkCredit, len(l.Messages))
return nil
}
// DrainCredit causes the next flow frame sent in 'mux()' to have
// 'drain' set to true.
// Applicable only when manual credit management has been enabled.
func (l *link) DrainCredit(ctx context.Context) error {
if l.receiver == nil || l.receiver.manualCreditor == nil {
return errors.New("drain can only be used with receiver links using manual credit management")
}
// cause mux() to check our flow conditions.
select {
case l.ReceiverReady <- struct{}{}:
default:
}
return l.receiver.manualCreditor.Drain(ctx)
}
// IssueCredit requests additional credits be issued for this link.
// Applicable only when manual credit management has been enabled.
func (l *link) IssueCredit(credit uint32) error {
if l.receiver == nil || l.receiver.manualCreditor == nil {
return errors.New("issueCredit can only be used with receiver links using manual credit management")
}
if err := l.receiver.manualCreditor.IssueCredit(credit); err != nil {
return err
}
// cause mux() to check our flow conditions.
select {
case l.ReceiverReady <- struct{}{}:
default:
}
return nil
}
// muxHandleFrame processes fr based on type.
func (l *link) muxHandleFrame(fr frames.FrameBody) error {
var (
isSender = l.receiver == nil
errOnRejectDisposition = l.detachOnDispositionError && (isSender && (l.ReceiverSettleMode == nil || *l.ReceiverSettleMode == ModeFirst))
)
switch fr := fr.(type) {
// message frame
case *frames.PerformTransfer:
debug(3, "RX (muxHandleFrame): %s", fr)
if isSender {
// Senders should never receive transfer frames, but handle it just in case.
l.closeWithError(&Error{
Condition: ErrorNotAllowed,
Description: "sender cannot process transfer frame",
})
return fmt.Errorf("sender received transfer frame")
}
return l.muxReceive(*fr)
// flow control frame
case *frames.PerformFlow:
debug(3, "RX (muxHandleFrame): %s", fr)
if isSender {
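// recompute our available credit from the peer's flow state:
// link-credit = delivery-count(rcv) + link-credit(rcv) - delivery-count(snd)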
linkCredit := *fr.LinkCredit - l.deliveryCount
if fr.DeliveryCount != nil {
// DeliveryCount can be nil if the receiver hasn't processed
// the attach. That shouldn't be the case here, but it's
// what ActiveMQ does.
linkCredit += *fr.DeliveryCount
}
l.linkCredit = linkCredit
}
if !fr.Echo {
// if the 'drain' flag has been set in the frame sent to the _receiver_ then
// we signal whoever is waiting (the service has seen and acknowledged our drain)
if fr.Drain && l.receiver != nil && l.receiver.manualCreditor != nil {
l.linkCredit = 0 // we have no active credits at this point.
l.receiver.manualCreditor.EndDrain()
}
return nil
}
var (
// copy because sent by pointer below; prevent race
linkCredit = l.linkCredit
deliveryCount = l.deliveryCount
)
// send flow
resp := &frames.PerformFlow{
Handle: &l.Handle,
DeliveryCount: &deliveryCount,
LinkCredit: &linkCredit, // max number of messages
}
debug(1, "TX (muxHandleFrame): %s", resp)
_ = l.Session.txFrame(resp, nil)
// remote side is closing links
case *frames.PerformDetach:
debug(1, "RX (muxHandleFrame): %s", fr)
// don't currently support link detach and reattach
if !fr.Closed {
return fmt.Errorf("non-closing detach not supported: %+v", fr)
}
// set detach received and close link
l.detachReceived = true
return fmt.Errorf("received detach frame %v", &DetachError{fr.Error})
case *frames.PerformDisposition:
debug(3, "RX (muxHandleFrame): %s", fr)
// Unblock receivers waiting for message disposition
if l.receiver != nil {
// bubble disposition error up to the receiver
var dispositionError error
if state, ok := fr.State.(*encoding.StateRejected); ok {
// state.Error isn't required to be filled out. For instance if you dead letter a message
// you will get a rejected response that doesn't contain an error.
if state.Error != nil {
dispositionError = state.Error
}
}
// removal from the in-flight map will also remove the message from the unsettled map
l.receiver.inFlight.remove(fr.First, fr.Last, dispositionError)
}
// If sending async and a message is rejected, cause a link error.
//
// This isn't ideal, but there isn't a clear better way to handle it.
if fr, ok := fr.State.(*encoding.StateRejected); ok && errOnRejectDisposition {
return fr.Error
}
if fr.Settled {
return nil
}
resp := &frames.PerformDisposition{
Role: encoding.RoleSender,
First: fr.First,
Last: fr.Last,
Settled: true,
}
debug(1, "TX (muxHandleFrame): %s", resp)
_ = l.Session.txFrame(resp, nil)
default:
// TODO: evaluate
debug(1, "muxHandleFrame: unexpected frame: %s\n", fr)
}
return nil
}
// Check checks the link state, returning an error if the link is closed (ErrLinkClosed) or if
// it is in a detached state (ErrLinkDetached)
func (l *link) Check() error {
select {
case <-l.Detached:
return ErrLinkDetached
case <-l.close:
return ErrLinkClosed
default:
return nil
}
}
// close closes and requests deletion of the link.
//
// No operations on link are valid after close.
//
// If ctx expires while waiting for the server's response, ctx.Err() will be returned.
// The session will continue to wait for the response until the Session or Client
// is closed.
func (l *link) Close(ctx context.Context) error {
l.closeOnce.Do(func() { close(l.close) })
select {
case <-l.Detached:
case <-ctx.Done():
return ctx.Err()
}
if l.err == ErrLinkClosed {
return nil
}
return l.err
}
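
// closeWithError initiates a close of the link, recording de as the error to
// send to the peer in the closing detach frame.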
func (l *link) closeWithError(de *Error) {
l.closeOnce.Do(func() {
l.detachErrorMu.Lock()
l.detachError = de
l.detachErrorMu.Unlock()
close(l.close)
})
}
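
// muxDetach drives the link's detach sequence: it sends a closing detach frame
// (including any detach error), waits for the peer's closing detach unless one
// was already received or we're detaching due to an error, then deallocates the
// handle and signals Detached.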
func (l *link) muxDetach() {
defer func() {
// final cleanup and signaling
// deallocate handle
select {
case l.Session.deallocateHandle <- l:
case <-l.Session.done:
if l.err == nil {
l.err = l.Session.err
}
}
// signal other goroutines that link is detached
close(l.Detached)
// unblock any in flight message dispositions
if l.receiver != nil {
l.receiver.inFlight.clear(l.err)
}
}()
// "A peer closes a link by sending the detach frame with the
// handle for the specified link, and the closed flag set to
// true. The partner will destroy the corresponding link
// endpoint, and reply with its own detach frame with the
// closed flag set to true.
//
// Note that one peer MAY send a closing detach while its
// partner is sending a non-closing detach. In this case,
// the partner MUST signal that it has closed the link by
// reattaching and then sending a closing detach."
l.detachErrorMu.Lock()
detachError := l.detachError
l.detachErrorMu.Unlock()
fr := &frames.PerformDetach{
Handle: l.Handle,
Closed: true,
Error: detachError,
}
Loop:
for {
select {
case l.Session.tx <- fr:
// after sending the detach frame, break the read loop
break Loop
case fr := <-l.RX:
// discard incoming frames to avoid blocking session.mux
if fr, ok := fr.(*frames.PerformDetach); ok && fr.Closed {
l.detachReceived = true
}
case <-l.Session.done:
if l.err == nil {
l.err = l.Session.err
}
return
}
}
// don't wait for remote to detach when already
// received or closing due to error
if l.detachReceived || detachError != nil {
return
}
for {
select {
// read from link until a detach with Closed == true is received,
// other frames are discarded.
case fr := <-l.RX:
if fr, ok := fr.(*frames.PerformDetach); ok && fr.Closed {
return
}
// connection has ended
case <-l.Session.done:
if l.err == nil {
l.err = l.Session.err
}
return
}
}
}