This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit d34c9d9

Merge pull request #33 from mailgun/thrawn/develop

PIP-675: Add support for persistent store

thrawn01 authored Dec 10, 2019
2 parents 2708066 + 77af7dd

Showing 18 changed files with 838 additions and 345 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG
@@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.7.0] - 2019-12-10
### Added
* Added `Loader` interface for loading and saving rate limits at startup and shutdown only.
* Added `Store` interface for continuous synchronization between the persistent store and the cache.
### Changed
* Moved `cache.Cache` into the `gubernator` package.
* Changed the `Cache` interface to use `CacheItem` for storing and retrieving cached items.

## [0.6.0] - 2019-11-12
### Added
* DURATION_IS_GREGORIAN behavior to support interval based ratelimit durations
34 changes: 34 additions & 0 deletions README.md
@@ -140,6 +140,40 @@ Examples when using `Behavior = DURATION_IS_GREGORIAN`
* If `Duration = 0` (Minutes) then the rate limit will reset to `Current = 0` at the end of the minute the rate limit was created.
* If `Duration = 4` (Months) then the rate limit will reset to `Current = 0` at the end of the month the rate limit was created.
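
To make the behavior concrete, here is a rough sketch of building such a request. The `Name` and `UniqueKey` field names and the enum spellings are assumptions for illustration; check the proto definitions for the real names.

```go
package main

import "github.com/mailgun/gubernator"

// monthlyLimitReq builds a request whose counter resets at the end of
// the calendar month. Sketch only: the Name/UniqueKey fields and enum
// spellings are assumed; confirm them against the proto definitions.
func monthlyLimitReq() *gubernator.RateLimitReq {
	return &gubernator.RateLimitReq{
		Name:      "requests_per_month",
		UniqueKey: "account:12345",
		Algorithm: gubernator.Algorithm_TOKEN_BUCKET,
		Behavior:  gubernator.Behavior_DURATION_IS_GREGORIAN,
		Duration:  4, // 4 = Months, per the examples above
		Hits:      1,
		Limit:     1000,
	}
}
```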

## Gubernator as a library
If you are using Go, you can use Gubernator as a library. This is useful if
you wish to implement a rate limit service with your own company-specific model
on top. We do this internally here at mailgun with a service we creatively
called `ratelimits`, which keeps track of the limits imposed on a per-account
basis. In this way you can utilize the power and speed of Gubernator but still
layer business logic and integrate domain-specific problems into your rate
limiting service.

When you use the library, your service becomes a full member of the cluster,
participating in the same consistent hashing and caching as a standalone
Gubernator server would. All you need to do is provide the GRPC server instance
and tell Gubernator where the peers in your cluster are located. The
`cmd/gubernator/main.go` is a great example of how to use Gubernator as a
library.
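
The following is a minimal sketch of the idea only; the `gubernator.New`, `Config`, `SetPeers`, and `PeerInfo` names here are assumptions for illustration, and `cmd/gubernator/main.go` shows the real wiring.

```go
package main

import (
	"log"
	"net"

	"github.com/mailgun/gubernator"
	"google.golang.org/grpc"
)

func main() {
	// Create the GRPC server that Gubernator registers itself with.
	srv := grpc.NewServer()

	// Hypothetical constructor and config struct; see
	// cmd/gubernator/main.go for the actual names and options.
	guber, err := gubernator.New(gubernator.Config{GRPCServer: srv})
	if err != nil {
		log.Fatal(err)
	}

	// Tell Gubernator where the peers in the cluster are located.
	// The PeerInfo shape here is illustrative.
	guber.SetPeers([]gubernator.PeerInfo{{Address: "10.0.0.1:81"}})

	lis, err := net.Listen("tcp", "0.0.0.0:81")
	if err != nil {
		log.Fatal(err)
	}
	log.Fatal(srv.Serve(lis))
}
```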

### Optional Disk Persistence
While the Gubernator server currently doesn't directly support disk
persistence, the Gubernator library does provide interfaces through which
library users can implement persistence. The Gubernator library has two
interfaces available for disk persistence. Depending on the use case, an
implementer can implement the [Loader](/store.go) interface and only support
persistence of rate limits at startup and shutdown, or implement the
[Store](/store.go) interface, in which case Gubernator will continuously call
`OnChange()` and `Get()` to keep the in-memory cache and the persistent store
up to date with the latest rate limit data. Both interfaces *can* be
implemented simultaneously to ensure data is always saved to persistent
storage.
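
As a rough sketch, a file-backed `Loader` might look like the following. The channel-based `Load()`/`Save()` signatures are assumptions; see [store.go](/store.go) for the interface as actually defined.

```go
package persist

import (
	"encoding/json"
	"os"

	"github.com/mailgun/gubernator"
)

// FileLoader is a sketch of a Loader backed by a single JSON file.
// Note that CacheItem.Value holds an interface, so a real
// implementation must also restore the concrete value types
// (RateLimitResp or LeakyBucketItem) after decoding.
type FileLoader struct {
	Path string
}

// Load is called once at startup to warm the cache.
func (f *FileLoader) Load() (chan *gubernator.CacheItem, error) {
	fd, err := os.Open(f.Path)
	if err != nil {
		return nil, err
	}
	defer fd.Close()

	var items []*gubernator.CacheItem
	if err := json.NewDecoder(fd).Decode(&items); err != nil {
		return nil, err
	}

	out := make(chan *gubernator.CacheItem, len(items))
	for _, item := range items {
		out <- item
	}
	close(out)
	return out, nil
}

// Save is called once at shutdown with the current cache contents.
func (f *FileLoader) Save(in chan *gubernator.CacheItem) error {
	var items []*gubernator.CacheItem
	for item := range in {
		items = append(items, item)
	}

	fd, err := os.Create(f.Path)
	if err != nil {
		return err
	}
	defer fd.Close()
	return json.NewEncoder(fd).Encode(items)
}
```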

For those who choose to implement the `Store` interface, it is not required to
store ALL the rate limits received via `OnChange()`. For instance, if you wish
to support rate limit durations longer than a minute, day or month, calls to
`OnChange()` can check the duration of a rate limit and decide to only persist
those rate limits that have durations over a self-determined limit.
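
Here is a sketch of such a selective `Store`, using the method shapes visible in `algorithms.go` (`OnChange()`, `Get()`, `Remove()`); the map stands in for a real database and is not safe for concurrent use as written.

```go
package persist

import "github.com/mailgun/gubernator"

// LongLivedStore is a sketch of a Store that persists only rate limits
// with durations over one hour; shorter-lived limits stay in the cache
// alone. See store.go for the authoritative interface definition.
type LongLivedStore struct {
	backend map[string]*gubernator.CacheItem
}

func NewLongLivedStore() *LongLivedStore {
	return &LongLivedStore{backend: make(map[string]*gubernator.CacheItem)}
}

// OnChange persists only long-lived rate limits. Duration is in
// milliseconds (ignoring DURATION_IS_GREGORIAN for brevity).
func (s *LongLivedStore) OnChange(r *gubernator.RateLimitReq, item *gubernator.CacheItem) {
	if r.Duration <= 60*60*1000 {
		return
	}
	s.backend[item.Key] = item
}

// Get is called on a cache miss to check the persistent store.
func (s *LongLivedStore) Get(r *gubernator.RateLimitReq) (*gubernator.CacheItem, bool) {
	item, ok := s.backend[r.HashKey()]
	return item, ok
}

// Remove is called when an item should be dropped from the store.
func (s *LongLivedStore) Remove(key string) {
	delete(s.backend, key)
}
```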

### API
All methods are accessed via GRPC but are also exposed via HTTP using the
[GRPC Gateway](https://github.com/grpc-ecosystem/grpc-gateway).
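
As a hedged example, an HTTP client might call the gateway as sketched below; the `/v1/GetRateLimits` route and the JSON field names are assumptions to be confirmed against the generated gateway code.

```go
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"strings"
)

func main() {
	// Route and field names below are assumptions; confirm them
	// against the proto definitions and generated gateway code.
	body := `{"requests": [{"name": "requests_per_sec", "unique_key": "account:12345", "hits": 1, "limit": 10, "duration": 1000}]}`
	resp, err := http.Post("http://localhost:80/v1/GetRateLimits",
		"application/json", strings.NewReader(body))
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	out, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(out))
}
```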
127 changes: 86 additions & 41 deletions algorithms.go

```diff
@@ -17,35 +17,52 @@ limitations under the License.
 package gubernator
 
 import (
-	"github.com/mailgun/gubernator/cache"
 	"time"
 )
 
 // Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
-func tokenBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
-	item, ok := c.Get(r.HashKey())
+func tokenBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
+	item, ok := c.GetItem(r.HashKey())
+	if s != nil {
+		if !ok {
+			// Check our store for the item
+			if item, ok = s.Get(r); ok {
+				c.Add(item)
+			}
+		}
+	}
 
 	if ok {
 		// The following semantic allows for requests of more than the limit to be rejected, but subsequent
 		// requests within the same duration that are under the limit to succeed. IE: client attempts to
 		// send 1000 emails but 100 is their limit. The request is rejected as over the limit, but since we
 		// don't store OVER_LIMIT in the cache the client can retry within the same rate limit duration with
 		// 100 emails and the request will succeed.
 
-		rl, ok := item.(*RateLimitResp)
+		rl, ok := item.Value.(*RateLimitResp)
 		if !ok {
 			// Client switched algorithms; perhaps due to a migration?
 			c.Remove(r.HashKey())
-			return tokenBucket(c, r)
+			if s != nil {
+				s.Remove(r.HashKey())
+			}
+			return tokenBucket(s, c, r)
 		}
 
-		// If we are already at the limit
-		if rl.Remaining == 0 {
-			rl.Status = Status_OVER_LIMIT
+		// Client is only interested in retrieving the current status
+		if r.Hits == 0 {
 			return rl, nil
 		}
 
-		// Client is only interested in retrieving the current status
-		if r.Hits == 0 {
+		if s != nil {
+			defer func() {
+				s.OnChange(r, item)
+			}()
+		}
+
+		// If we are already at the limit
+		if rl.Remaining == 0 {
+			rl.Status = Status_OVER_LIMIT
 			return rl, nil
 		}
 
@@ -67,9 +84,8 @@ func tokenBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
 	}
 
 	// Add a new rate limit to the cache
-	expire := cache.MillisecondNow() + r.Duration
+	expire := MillisecondNow() + r.Duration
 	if r.Behavior == Behavior_DURATION_IS_GREGORIAN {
-		var err error
 		expire, err = GregorianExpiration(time.Now(), r.Duration)
 		if err != nil {
 			return nil, err
@@ -88,27 +104,42 @@ func tokenBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
 		status.Remaining = r.Limit
 	}
 
-	c.Add(r.HashKey(), status, expire)
+	item = &CacheItem{
+		Algorithm: r.Algorithm,
+		Key:       r.HashKey(),
+		Value:     status,
+		ExpireAt:  expire,
+	}
+
+	c.Add(item)
+	if s != nil {
+		s.OnChange(r, item)
+	}
 	return status, nil
 }
 
 // Implements leaky bucket algorithm for rate limiting https://en.wikipedia.org/wiki/Leaky_bucket
-func leakyBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
-	type LeakyBucket struct {
-		Limit          int64
-		Duration       int64
-		LimitRemaining int64
-		TimeStamp      int64
+func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
+	now := MillisecondNow()
+	item, ok := c.GetItem(r.HashKey())
+	if s != nil {
+		if !ok {
+			// Check our store for the item
+			if item, ok = s.Get(r); ok {
+				c.Add(item)
+			}
+		}
 	}
 
-	now := cache.MillisecondNow()
-	item, ok := c.Get(r.HashKey())
 	if ok {
-		b, ok := item.(*LeakyBucket)
+		b, ok := item.Value.(*LeakyBucketItem)
 		if !ok {
 			// Client switched algorithms; perhaps due to a migration?
 			c.Remove(r.HashKey())
-			return leakyBucket(c, r)
+			if s != nil {
+				s.Remove(r.HashKey())
+			}
+			return leakyBucket(s, c, r)
 		}
 
 		duration := r.Duration
@@ -134,19 +165,25 @@ func leakyBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
 		elapsed := now - b.TimeStamp
 		leak := int64(elapsed / rate)
 
-		b.LimitRemaining += leak
-		if b.LimitRemaining > b.Limit {
-			b.LimitRemaining = b.Limit
+		b.Remaining += leak
+		if b.Remaining > b.Limit {
+			b.Remaining = b.Limit
 		}
 
 		rl := &RateLimitResp{
 			Limit:     b.Limit,
-			Remaining: b.LimitRemaining,
+			Remaining: b.Remaining,
 			Status:    Status_UNDER_LIMIT,
 		}
 
+		if s != nil {
+			defer func() {
+				s.OnChange(r, item)
+			}()
+		}
+
 		// If we are already at the limit
-		if b.LimitRemaining == 0 {
+		if b.Remaining == 0 {
 			rl.Status = Status_OVER_LIMIT
 			rl.ResetTime = now + rate
 			return rl, nil
@@ -158,15 +195,15 @@ func leakyBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
 		}
 
 		// If requested hits takes the remainder
-		if b.LimitRemaining == r.Hits {
-			b.LimitRemaining = 0
+		if b.Remaining == r.Hits {
+			b.Remaining = 0
 			rl.Remaining = 0
 			return rl, nil
 		}
 
 		// If requested is more than available, then return over the limit
 		// without updating the bucket.
-		if r.Hits > b.LimitRemaining {
+		if r.Hits > b.Remaining {
 			rl.Status = Status_OVER_LIMIT
 			rl.ResetTime = now + rate
 			return rl, nil
@@ -177,8 +214,8 @@ func leakyBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
 			return rl, nil
 		}
 
-		b.LimitRemaining -= r.Hits
-		rl.Remaining = b.LimitRemaining
+		b.Remaining -= r.Hits
+		rl.Remaining = b.Remaining
 		c.UpdateExpiration(r.HashKey(), now*duration)
 		return rl, nil
 	}
@@ -196,11 +233,11 @@ func leakyBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
 	}
 
 	// Create a new leaky bucket
-	b := LeakyBucket{
-		LimitRemaining: r.Limit - r.Hits,
-		Limit:          r.Limit,
-		Duration:       duration,
-		TimeStamp:      now,
+	b := LeakyBucketItem{
+		Remaining: r.Limit - r.Hits,
+		Limit:     r.Limit,
+		Duration:  duration,
+		TimeStamp: now,
 	}
 
 	rl := RateLimitResp{
@@ -214,10 +251,18 @@ func leakyBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
 	if r.Hits > r.Limit {
 		rl.Status = Status_OVER_LIMIT
 		rl.Remaining = 0
-		b.LimitRemaining = 0
+		b.Remaining = 0
 	}
 
-	c.Add(r.HashKey(), &b, now+duration)
-
+	item = &CacheItem{
+		ExpireAt:  now + duration,
+		Algorithm: r.Algorithm,
+		Key:       r.HashKey(),
+		Value:     &b,
+	}
+	c.Add(item)
+	if s != nil {
+		s.OnChange(r, item)
+	}
 	return &rl, nil
 }
```

12 changes: 0 additions & 12 deletions architecture.md
```diff
@@ -76,17 +76,5 @@ limit request if the cluster is large enough. GLOBAL should only be used for
 extremely high volume rate limits that don't scale well with the traditional
 non `GLOBAL` behavior.
 
-## Gubernator as a library
-If you are using golang, you can use gubernator as a library. This is useful if
-you wish to implement a rate limit service with your own company specific model
-on top. We do this internally here at mailgun with a service we creatively
-called `ratelimits` which keeps track of the limits imposed on a per account
-basis. In this way you can utilize the power and speed of gubernator but still
-layer business logic and integrate domain specific problems into your rate
-limiting service.
 
-When you use the library, your service becomes a full member of the cluster
-participating in the same consistent hashing and caching as a stand alone
-gubernator server would. All you need to do is provide the GRPC server instance
-and tell gubernator where the peers in your cluster are located.
 
```
