Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Commit

Permalink
Merge pull request #14 from mailgun/thrawn/develop
Browse files Browse the repository at this point in the history
Release Candidate 0.4.0
  • Loading branch information
thrawn01 authored Jul 19, 2019
2 parents 5eea10b + 22ed42a commit b326bac
Show file tree
Hide file tree
Showing 30 changed files with 99 additions and 69 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.4.0] - 2019-07-16
### Added
* Support for GLOBAL behavior
* Improved README documentation
### Changes
* GetRateLimits() now fetches rate limits asynchronously

## [0.3.2] - 2019-06-03
### Changes
* Now properly respecting the maxBatchLimit when talking with peers
Expand Down
2 changes: 1 addition & 1 deletion golang/algorithms.go → algorithms.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package gubernator

import (
"github.com/mailgun/gubernator/golang/cache"
"github.com/mailgun/gubernator/cache"
)

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
Expand Down
4 changes: 2 additions & 2 deletions golang/benchmark_test.go → benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package gubernator_test

import (
"context"
guber "github.com/mailgun/gubernator/golang"
"github.com/mailgun/gubernator/golang/cluster"
guber "github.com/mailgun/gubernator"
"github.com/mailgun/gubernator/cluster"
"github.com/mailgun/holster"
"testing"
)
Expand Down
2 changes: 1 addition & 1 deletion golang/cache.go → cache.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package gubernator

import (
"github.com/mailgun/gubernator/golang/cache"
"github.com/mailgun/gubernator/cache"
"github.com/mailgun/holster"
)

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion golang/cluster/cluster.go → cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package cluster

import (
"fmt"
"github.com/mailgun/gubernator/golang"
"github.com/mailgun/gubernator"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"os"
"time"

guber "github.com/mailgun/gubernator/golang"
guber "github.com/mailgun/gubernator"
"github.com/mailgun/holster"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"os"
"os/signal"

"github.com/mailgun/gubernator/golang/cluster"
"github.com/mailgun/gubernator/cluster"
"github.com/sirupsen/logrus"
)

Expand Down
4 changes: 2 additions & 2 deletions golang/cmd/gubernator/main.go → cmd/gubernator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
etcd "github.com/coreos/etcd/clientv3"
"github.com/ghodss/yaml"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/mailgun/gubernator/golang"
"github.com/mailgun/gubernator/golang/cache"
"github.com/mailgun/gubernator"
"github.com/mailgun/gubernator/cache"
"github.com/mailgun/holster"
"github.com/mailgun/holster/clock"
"github.com/mailgun/holster/etcdutil"
Expand Down
2 changes: 1 addition & 1 deletion golang/config.go → config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package gubernator

import (
"fmt"
"github.com/mailgun/gubernator/golang/cache"
"github.com/mailgun/gubernator/cache"
"github.com/mailgun/holster"
"google.golang.org/grpc"
"time"
Expand Down
File renamed without changes.
File renamed without changes.
4 changes: 2 additions & 2 deletions golang/functional_test.go → functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
"testing"
"time"

guber "github.com/mailgun/gubernator/golang"
"github.com/mailgun/gubernator/golang/cluster"
guber "github.com/mailgun/gubernator"
"github.com/mailgun/gubernator/cluster"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down
File renamed without changes.
125 changes: 76 additions & 49 deletions golang/gubernator.go → gubernator.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,64 +63,91 @@ func (s *Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*Get
"Requests.RateLimits list too large; max size is '%d'", maxBatchSize)
}

// TODO: Support getting multiple keys in an async manner (FanOut)
for _, req := range r.Requests {
globalKey := req.Name + "_" + req.UniqueKey
var rl *RateLimitResp
var peer *PeerClient
var err error

if len(req.UniqueKey) == 0 {
rl = &RateLimitResp{Error: "field 'unique_key' cannot be empty"}
goto NextRateLimit
}

if len(req.Name) == 0 {
rl = &RateLimitResp{Error: "field 'namespace' cannot be empty"}
goto NextRateLimit
}
type InOut struct {
In *RateLimitReq
Idx int
Out *RateLimitResp
}

peer, err = s.GetPeer(globalKey)
if err != nil {
rl = &RateLimitResp{
Error: fmt.Sprintf("while finding peer that owns rate limit '%s' - '%s'", globalKey, err),
}
goto NextRateLimit
}
// Asynchronously fetch rate limits
out := make(chan InOut)
go func() {
fan := holster.NewFanOut(1000)
// For each item in the request body
for i, item := range r.Requests {
fan.Run(func(data interface{}) error {
inOut := data.(InOut)

globalKey := inOut.In.Name + "_" + inOut.In.UniqueKey
var peer *PeerClient
var err error

if len(inOut.In.UniqueKey) == 0 {
inOut.Out = &RateLimitResp{Error: "field 'unique_key' cannot be empty"}
out <- inOut
return nil
}

// If our server instance is the owner of this rate limit
if peer.isOwner {
// Apply our rate limit algorithm to the request
rl, err = s.getRateLimit(req)
if err != nil {
rl = &RateLimitResp{
Error: fmt.Sprintf("while applying rate limit for '%s' - '%s'", globalKey, err),
if len(inOut.In.Name) == 0 {
inOut.Out = &RateLimitResp{Error: "field 'namespace' cannot be empty"}
out <- inOut
return nil
}
goto NextRateLimit
}
} else {
if req.Behavior == Behavior_GLOBAL {
rl, err = s.getGlobalRateLimit(req)

peer, err = s.GetPeer(globalKey)
if err != nil {
rl = &RateLimitResp{Error: err.Error()}
inOut.Out = &RateLimitResp{
Error: fmt.Sprintf("while finding peer that owns rate limit '%s' - '%s'", globalKey, err),
}
out <- inOut
return nil
}
goto NextRateLimit
}

// Make an RPC call to the peer that owns this rate limit
rl, err = peer.GetPeerRateLimit(ctx, req)
if err != nil {
rl = &RateLimitResp{
Error: fmt.Sprintf("while fetching rate limit '%s' from peer - '%s'", globalKey, err),

// If our server instance is the owner of this rate limit
if peer.isOwner {
// Apply our rate limit algorithm to the request
inOut.Out, err = s.getRateLimit(inOut.In)
if err != nil {
inOut.Out = &RateLimitResp{
Error: fmt.Sprintf("while applying rate limit for '%s' - '%s'", globalKey, err),
}
}
} else {
if inOut.In.Behavior == Behavior_GLOBAL {
inOut.Out, err = s.getGlobalRateLimit(inOut.In)
if err != nil {
inOut.Out = &RateLimitResp{Error: err.Error()}
}
out <- inOut
return nil
}

// Make an RPC call to the peer that owns this rate limit
inOut.Out, err = peer.GetPeerRateLimit(ctx, inOut.In)
if err != nil {
inOut.Out = &RateLimitResp{
Error: fmt.Sprintf("while fetching rate limit '%s' from peer - '%s'", globalKey, err),
}
}

// Inform the client of the owner key of the key
inOut.Out.Metadata = map[string]string{"owner": peer.host}
}
}

// Inform the client of the owner key of the key
rl.Metadata = map[string]string{"owner": peer.host}
out <- inOut
return nil
}, InOut{In: item, Idx: i})
}
NextRateLimit:
resp.Responses = append(resp.Responses, rl)
fan.Wait()
close(out)
}()

resp.Responses = make([]*RateLimitResp, len(r.Requests))
// Collect the async responses as they return
for i := range out {
resp.Responses[i.Idx] = i.Out
}

return &resp, nil
}

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion golang/metrics.go → metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"time"

"github.com/mailgun/gubernator/golang/cache"
"github.com/mailgun/gubernator/cache"
"google.golang.org/grpc/stats"
)

Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
4 changes: 2 additions & 2 deletions setup.py → python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
author="Derrick J. Wippler",
author_email='[email protected]',
url='https://github.com/mailgun/gubernator',
package_dir={'': 'python'},
packages=find_packages('python', exclude=['tests']),
package_dir={'': '.'},
packages=find_packages('.', exclude=['tests']),
install_requires=requirements,
license="Apache Software License 2.0",
python_requires='>=2.7',
Expand Down
2 changes: 1 addition & 1 deletion golang/statsd.go → statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"strings"
"time"

"github.com/mailgun/gubernator/golang/cache"
"github.com/mailgun/gubernator/cache"
"github.com/mailgun/holster"
"github.com/smira/go-statsd"
"google.golang.org/grpc/stats"
Expand Down
4 changes: 0 additions & 4 deletions stub.go

This file was deleted.

0 comments on commit b326bac

Please sign in to comment.