Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Merge pull request #41 from mailgun/thrawn/reset_behavior
Browse files Browse the repository at this point in the history
Added Behavior RESET_REMAINING
  • Loading branch information
thrawn01 authored Jan 7, 2020
2 parents fd90c50 + 8ab97c3 commit 61d160d
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 93 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
* Allow cache users to invalidate a ratelimit after a specific time
* Changing limit and duration before expire should now work correctly
* Added Behavior RESET_REMAINING to reset any hits recorded in the cache for the
specified rate limit

## Changes
* TokenBucketItem is now provided when `OnChange()` is called instead of `RateLimitResp`
Expand All @@ -17,6 +19,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* Always include reset_time in leaky bucket responses
* Fixed subtle bug during shutdown where PeerClient passed into goroutine could
be out of scope/changed when routine runs
* Behavior is now a flag, this should be a backward compatible change for
anyone using GLOBAL or NO_BATCHING but will break anyone using
DURATION_IS_GREGORIAN. Use `HasBehavior()` function to check for behavior
flags.

## [0.7.1] - 2019-12-10
### Added
Expand Down
26 changes: 21 additions & 5 deletions algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,24 @@ func tokenBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
}

if ok {
if HasBehavior(r.Behavior, Behavior_RESET_REMAINING) {
c.Remove(r.HashKey())
if s != nil {
s.Remove(r.HashKey())
}
return &RateLimitResp{
Status: Status_UNDER_LIMIT,
Limit: r.Limit,
Remaining: r.Limit,
ResetTime: 0,
}, nil
}

// The following semantic allows for requests of more than the limit to be rejected, but subsequent
// requests within the same duration that are under the limit to succeed. IE: client attempts to
// send 1000 emails but 100 is their limit. The request is rejected as over the limit, but since we
// don't store OVER_LIMIT in the cache the client can retry within the same rate limit duration with
// 100 emails and the request will succeed.

t, ok := item.Value.(*TokenBucketItem)
if !ok {
// Client switched algorithms; perhaps due to a migration?
Expand Down Expand Up @@ -74,7 +86,7 @@ func tokenBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
// If the duration config changed, update the new ExpireAt
if t.Duration != r.Duration {
expire := t.CreatedAt + r.Duration
if r.Behavior == Behavior_DURATION_IS_GREGORIAN {
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
expire, err = GregorianExpiration(time.Now(), r.Duration)
if err != nil {
return nil, err
Expand Down Expand Up @@ -124,7 +136,7 @@ func tokenBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
// Add a new rate limit to the cache
now := MillisecondNow()
expire := now + r.Duration
if r.Behavior == Behavior_DURATION_IS_GREGORIAN {
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
expire, err = GregorianExpiration(time.Now(), r.Duration)
if err != nil {
return nil, err
Expand Down Expand Up @@ -190,13 +202,17 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
return leakyBucket(s, c, r)
}

if HasBehavior(r.Behavior, Behavior_RESET_REMAINING) {
b.Remaining = r.Limit
}

// Update limit and duration if they changed
b.Limit = r.Limit
b.Duration = r.Duration

duration := r.Duration
rate := duration / r.Limit
if r.Behavior == Behavior_DURATION_IS_GREGORIAN {
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
d, err := GregorianDuration(time.Now(), r.Duration)
if err != nil {
return nil, err
Expand Down Expand Up @@ -273,7 +289,7 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
}

duration := r.Duration
if r.Behavior == Behavior_DURATION_IS_GREGORIAN {
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
n := time.Now()
expire, err := GregorianExpiration(n, r.Duration)
if err != nil {
Expand Down
72 changes: 72 additions & 0 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,4 +422,76 @@ func TestChangeLimit(t *testing.T) {
}
}

func TestResetRemaining(t *testing.T) {
client, errs := guber.DialV1Server(cluster.GetPeer())
require.Nil(t, errs)

tests := []struct {
Remaining int64
Algorithm guber.Algorithm
Behavior guber.Behavior
Status guber.Status
Name string
Limit int64
}{
{
Name: "Should subtract 1 from remaining",
Algorithm: guber.Algorithm_TOKEN_BUCKET,
Status: guber.Status_UNDER_LIMIT,
Behavior: guber.Behavior_BATCHING,
Remaining: 99,
Limit: 100,
},
{
Name: "Should subtract 2 from remaining",
Algorithm: guber.Algorithm_TOKEN_BUCKET,
Status: guber.Status_UNDER_LIMIT,
Behavior: guber.Behavior_BATCHING,
Remaining: 98,
Limit: 100,
},
{
Name: "Should reset the remaining",
Algorithm: guber.Algorithm_TOKEN_BUCKET,
Status: guber.Status_UNDER_LIMIT,
Behavior: guber.Behavior_RESET_REMAINING,
Remaining: 100,
Limit: 100,
},
{
Name: "Should subtract 1 from remaining after reset",
Algorithm: guber.Algorithm_TOKEN_BUCKET,
Status: guber.Status_UNDER_LIMIT,
Behavior: guber.Behavior_BATCHING,
Remaining: 99,
Limit: 100,
},
}

for _, tt := range tests {
t.Run(tt.Name, func(t *testing.T) {
resp, err := client.GetRateLimits(context.Background(), &guber.GetRateLimitsReq{
Requests: []*guber.RateLimitReq{
{
Name: "test_reset_remaining",
UniqueKey: "account:1234",
Algorithm: tt.Algorithm,
Duration: guber.Millisecond * 100,
Behavior: tt.Behavior,
Limit: tt.Limit,
Hits: 1,
},
},
})
require.Nil(t, err)

rl := resp.Responses[0]

assert.Equal(t, tt.Status, rl.Status)
assert.Equal(t, tt.Remaining, rl.Remaining)
assert.Equal(t, tt.Limit, rl.Limit)
})
}
}

// TODO: Add a test for sending no rate limits RateLimitReqList.RateLimits = nil
2 changes: 1 addition & 1 deletion global.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (gm *globalManager) updatePeers(updates map[string]*RateLimitReq) {
for _, rl := range updates {
// We are only sending the status of the rate limit so
// we clear the behavior flag so we don't get queued for update again.
rl.Behavior = 0
SetBehavior(&rl.Behavior, Behavior_GLOBAL, false)
rl.Hits = 0

status, err := gm.instance.getRateLimit(rl)
Expand Down
19 changes: 17 additions & 2 deletions gubernator.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (s *Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*Get
}
}
} else {
if inOut.In.Behavior == Behavior_GLOBAL {
if HasBehavior(inOut.In.Behavior, Behavior_GLOBAL) {
inOut.Out, err = s.getGlobalRateLimit(inOut.In)
if err != nil {
inOut.Out = &RateLimitResp{Error: err.Error()}
Expand Down Expand Up @@ -293,7 +293,7 @@ func (s *Instance) getRateLimit(r *RateLimitReq) (*RateLimitResp, error) {
s.conf.Cache.Lock()
defer s.conf.Cache.Unlock()

if r.Behavior == Behavior_GLOBAL {
if HasBehavior(r.Behavior, Behavior_GLOBAL) {
s.global.QueueUpdate(r)
}

Expand Down Expand Up @@ -408,3 +408,18 @@ func (s *Instance) Collect(ch chan<- prometheus.Metric) {
ch <- s.global.asyncMetrics
ch <- s.global.broadcastMetrics
}

// HasBehavior returns true if the provided behavior is set
func HasBehavior(b Behavior, flag Behavior) bool {
return b&flag != 0
}

// SetBehavior sets or clears the behavior depending on the boolean `set`
func SetBehavior(b *Behavior, flag Behavior, set bool) {
if set {
*b = *b | flag
} else {
mask := *b ^ flag
*b &= mask
}
}
117 changes: 63 additions & 54 deletions gubernator.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 61d160d

Please sign in to comment.