Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Merge pull request #54 from mailgun/thrawn/develop
Browse files Browse the repository at this point in the history
Fixed race conditions in Peer.Shutdown() and globalManager
  • Loading branch information
thrawn01 authored May 11, 2020
2 parents 5ccb6b4 + 9932701 commit 171909a
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 10 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ VERSION=$(shell cat version)

LDFLAGS="-X main.Version=$(VERSION)"

test:
go test ./... -race -count=1

docker:
docker build --build-arg VERSION=$(VERSION) -t thrawn01/gubernator:$(VERSION) .
docker tag thrawn01/gubernator:$(VERSION) thrawn01/gubernator:latest
Expand Down
6 changes: 4 additions & 2 deletions global.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,15 @@ func (gm *globalManager) updatePeers(updates map[string]*RateLimitReq) {
var req UpdatePeerGlobalsReq
start := time.Now()

for _, rl := range updates {
for _, r := range updates {
// Copy the original since we removing the GLOBAL behavior
rl := *r
// We are only sending the status of the rate limit so
// we clear the behavior flag so we don't get queued for update again.
SetBehavior(&rl.Behavior, Behavior_GLOBAL, false)
rl.Hits = 0

status, err := gm.instance.getRateLimit(rl)
status, err := gm.instance.getRateLimit(&rl)
if err != nil {
gm.log.WithError(err).Errorf("while sending global updates to peers for: '%s'", rl.HashKey())
continue
Expand Down
2 changes: 1 addition & 1 deletion interval.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func NewInterval(d time.Duration) *Interval {
C: make(chan struct{}, 1),
in: make(chan struct{}),
}
go i.run(d)
i.run(d)
return &i
}

Expand Down
20 changes: 13 additions & 7 deletions peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,15 @@ func (c *PeerClient) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimits
c.mutex.RUnlock()
return nil, ErrClosing
}
c.mutex.RUnlock()

// NOTE: This must be done within the RLock since calling Wait() in Shutdown() causes
// a race condition if called within a separate go routine if the internal wg is `0`
// when Wait() is called then Add(1) is called concurrently.
c.wg.Add(1)
defer c.wg.Done()

c.mutex.RUnlock()

resp, err := c.client.GetPeerRateLimits(ctx, r)
if err != nil {
return nil, err
Expand All @@ -123,11 +127,13 @@ func (c *PeerClient) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobals
c.mutex.RUnlock()
return nil, ErrClosing
}
c.mutex.RUnlock()

// See NOTE above about RLock and wg.Add(1)
c.wg.Add(1)
defer c.wg.Done()

c.mutex.RUnlock()

return c.client.UpdatePeerGlobals(ctx, r)
}

Expand All @@ -143,12 +149,13 @@ func (c *PeerClient) getPeerRateLimitsBatch(ctx context.Context, r *RateLimitReq
// Enqueue the request to be sent
c.queue <- &req

// Unlock to prevent the chan from being closed
c.mutex.RUnlock()

// See NOTE above about RLock and wg.Add(1)
c.wg.Add(1)
defer c.wg.Done()

// Unlock to prevent the chan from being closed
c.mutex.RUnlock()

// Wait for a response or context cancel
select {
case resp := <-req.resp:
Expand Down Expand Up @@ -260,13 +267,12 @@ func (c *PeerClient) Shutdown(ctx context.Context) error {
c.mutex.Unlock()
return nil
}
defer c.mutex.Unlock()

c.isClosing = true
// We need to close the chan here to prevent a possible race
close(c.queue)

c.mutex.Unlock()

defer func() {
if c.conn != nil {
c.conn.Close()
Expand Down

0 comments on commit 171909a

Please sign in to comment.