Skip to content

Commit

Permalink
feat: rate limit with rln configuration (#1262)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaichaosun authored Dec 24, 2024
1 parent 78b522d commit 6dcf177
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 18 deletions.
16 changes: 9 additions & 7 deletions waku/v2/api/publish/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,9 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

const DefaultPeersToPublishForLightpush = 2
const DefaultPublishingLimiterRate = rate.Limit(5)
const DefaultPublishingLimitBurst = 10

type PublishMethod int

Expand Down Expand Up @@ -53,7 +50,7 @@ type MessageSender struct {
publishMethod PublishMethod
publisher Publisher
messageSentCheck ISentCheck
rateLimiter *PublishRateLimiter
rateLimiter PublishRateLimiter
logger *zap.Logger
evtMessageSent event.Emitter
}
Expand Down Expand Up @@ -82,14 +79,19 @@ func (r *Request) WithPublishMethod(publishMethod PublishMethod) *Request {
return r
}

func NewMessageSender(publishMethod PublishMethod, publisher Publisher, logger *zap.Logger) (*MessageSender, error) {
func NewMessageSender(publishMethod PublishMethod, publisher Publisher, rateLimiter PublishRateLimiter, logger *zap.Logger) (*MessageSender, error) {
if publishMethod == UnknownMethod {
return nil, errors.New("publish method is required")
}

if rateLimiter == nil {
rateLimiter = NewDefaultRateLimiter(DefaultPublishingLimiterRate, DefaultPublishingLimitBurst)
}

return &MessageSender{
publishMethod: publishMethod,
publisher: publisher,
rateLimiter: NewPublishRateLimiter(DefaultPublishingLimiterRate, DefaultPublishingLimitBurst),
rateLimiter: rateLimiter,
logger: logger,
}, nil
}
Expand All @@ -99,7 +101,7 @@ func (ms *MessageSender) WithMessageSentCheck(messageSentCheck ISentCheck) *Mess
return ms
}

func (ms *MessageSender) WithRateLimiting(rateLimiter *PublishRateLimiter) *MessageSender {
func (ms *MessageSender) WithRateLimiting(rateLimiter PublishRateLimiter) *MessageSender {
ms.rateLimiter = rateLimiter
return ms
}
Expand Down
10 changes: 5 additions & 5 deletions waku/v2/api/publish/message_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (m *MockMessageSentCheck) Start() {
}

func TestNewSenderWithUnknownMethod(t *testing.T) {
sender, err := NewMessageSender(UnknownMethod, nil, nil)
sender, err := NewMessageSender(UnknownMethod, nil, nil, nil)
require.NotNil(t, err)
require.Nil(t, sender)
}
Expand All @@ -55,7 +55,7 @@ func TestNewSenderWithRelay(t *testing.T) {
_, err = relayNode.Subscribe(context.Background(), protocol.NewContentFilter("test-pubsub-topic"))
require.Nil(t, err)
publisher := NewDefaultPublisher(nil, relayNode)
sender, err := NewMessageSender(Relay, publisher, utils.Logger())
sender, err := NewMessageSender(Relay, publisher, nil, utils.Logger())
require.Nil(t, err)
require.NotNil(t, sender)
require.Nil(t, sender.messageSentCheck)
Expand All @@ -81,7 +81,7 @@ func TestNewSenderWithRelayAndMessageSentCheck(t *testing.T) {
_, err = relayNode.Subscribe(context.Background(), protocol.NewContentFilter("test-pubsub-topic"))
require.Nil(t, err)
publisher := NewDefaultPublisher(nil, relayNode)
sender, err := NewMessageSender(Relay, publisher, utils.Logger())
sender, err := NewMessageSender(Relay, publisher, nil, utils.Logger())

check := &MockMessageSentCheck{Messages: make(map[string]map[common.Hash]uint32)}
sender.WithMessageSentCheck(check)
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestNewSenderWithRelayAndMessageSentCheck(t *testing.T) {
}

func TestNewSenderWithLightPush(t *testing.T) {
sender, err := NewMessageSender(LightPush, nil, nil)
sender, err := NewMessageSender(LightPush, nil, nil, nil)
require.Nil(t, err)
require.NotNil(t, sender)
require.Equal(t, LightPush, sender.publishMethod)
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestMessageSentEmitter(t *testing.T) {
_, err = relayNode.Subscribe(context.Background(), protocol.NewContentFilter("test-pubsub-topic"))
require.Nil(t, err)
publisher := NewDefaultPublisher(nil, relayNode)
sender, err := NewMessageSender(Relay, publisher, utils.Logger())
sender, err := NewMessageSender(Relay, publisher, nil, utils.Logger())
require.Nil(t, err)

check := &MockMessageSentCheck{Messages: make(map[string]map[common.Hash]uint32)}
Expand Down
18 changes: 13 additions & 5 deletions waku/v2/api/publish/rate_limiting.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,30 @@ import (
"golang.org/x/time/rate"
)

const DefaultPublishingLimiterRate = rate.Limit(5)
const DefaultPublishingLimitBurst = 10

// RateLimiter
type PublishRateLimiter interface {
Check(ctx context.Context, logger *zap.Logger) error
}

// PublishRateLimiter is used to decorate publish functions to limit the
// number of messages per second that can be published
type PublishRateLimiter struct {
type DefaultRateLimiter struct {
limiter *rate.Limiter
}

// NewPublishRateLimiter will create a new instance of PublishRateLimiter.
// You can specify an rate.Inf value to in practice ignore the rate limiting
func NewPublishRateLimiter(r rate.Limit, b int) *PublishRateLimiter {
return &PublishRateLimiter{
func NewDefaultRateLimiter(r rate.Limit, b int) *DefaultRateLimiter {
return &DefaultRateLimiter{
limiter: rate.NewLimiter(r, b),
}
}

// ThrottlePublishFn is used to decorate a PublishFn so rate limiting is applied
func (p *PublishRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn PublishFn) PublishFn {
func (p *DefaultRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn PublishFn) PublishFn {
return func(envelope *protocol.Envelope, logger *zap.Logger) error {
if err := p.Check(ctx, logger); err != nil {
return err
Expand All @@ -33,7 +41,7 @@ func (p *PublishRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn Pu
}
}

func (p *PublishRateLimiter) Check(ctx context.Context, logger *zap.Logger) error {
func (p *DefaultRateLimiter) Check(ctx context.Context, logger *zap.Logger) error {
if err := p.limiter.Wait(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error("could not send message (limiter)", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/api/publish/rate_limiting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func TestRateLimit(t *testing.T) {
r := NewPublishRateLimiter(rate.Limit(1), 1)
r := NewDefaultRateLimiter(rate.Limit(1), 1)
l := utils.Logger()

var counter atomic.Int32
Expand Down
63 changes: 63 additions & 0 deletions waku/v2/api/publish/rln_rate_limiting.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package publish

import (
"context"
"errors"
"sync"
"time"

"go.uber.org/zap"
)

var ErrRateLimited = errors.New("rate limit exceeded")

const RlnLimiterCapacity = 100
const RlnLimiterRefillInterval = 10 * time.Minute

// RlnRateLimiter is used to rate limit the outgoing messages,
// The capacity and refillInterval comes from RLN contract configuration.
type RlnRateLimiter struct {
mu sync.Mutex
capacity int
tokens int
refillInterval time.Duration
lastRefill time.Time
}

// NewRlnPublishRateLimiter creates a new rate limiter, starts with a full capacity bucket.
func NewRlnRateLimiter(capacity int, refillInterval time.Duration) *RlnRateLimiter {
return &RlnRateLimiter{
capacity: capacity,
tokens: capacity, // Start with a full bucket
refillInterval: refillInterval,
lastRefill: time.Now(),
}
}

// Allow checks if a token can be consumed, and refills the bucket if necessary
func (rl *RlnRateLimiter) Allow() bool {
rl.mu.Lock()
defer rl.mu.Unlock()

// Refill tokens if the refill interval has passed
now := time.Now()
if now.Sub(rl.lastRefill) >= rl.refillInterval {
rl.tokens = rl.capacity // Refill the bucket
rl.lastRefill = now
}

// Check if there are tokens available
if rl.tokens > 0 {
rl.tokens--
return true
}

return false
}

func (rl *RlnRateLimiter) Check(ctx context.Context, logger *zap.Logger) error {
if rl.Allow() {
return nil
}
return ErrRateLimited
}
26 changes: 26 additions & 0 deletions waku/v2/api/publish/rln_rate_limiting_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package publish

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/utils"
)

func TestRlnRateLimit(t *testing.T) {
r := NewRlnRateLimiter(3, 5*time.Second)
l := utils.Logger()

for i := 0; i < 3; i++ {
require.NoError(t, r.Check(context.Background(), l))
}
require.ErrorIs(t, r.Check(context.Background(), l), ErrRateLimited)

time.Sleep(6 * time.Second)
for i := 0; i < 3; i++ {
require.NoError(t, r.Check(context.Background(), l))
}
require.ErrorIs(t, r.Check(context.Background(), l), ErrRateLimited)
}

0 comments on commit 6dcf177

Please sign in to comment.