Skip to content

Commit

Permalink
fix: code review
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Nov 29, 2024
1 parent e37768a commit 3828f31
Show file tree
Hide file tree
Showing 20 changed files with 105 additions and 103 deletions.
5 changes: 3 additions & 2 deletions cmd/waku/server/rest/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
"golang.org/x/time/rate"
)

func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode {
Expand All @@ -37,8 +38,8 @@ func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode {

// node2 connects to node1
func twoFilterConnectedNodes(t *testing.T, pubSubTopics ...string) (*node.WakuNode, *node.WakuNode) {
node1 := createNode(t, node.WithWakuFilterFullNode()) // full node filter
node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter
node1 := createNode(t, node.WithWakuFilterFullNode(filter.WithFullNodeRateLimiter(rate.Inf, 0))) // full node filter
node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter

node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL)
err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), filter.FilterSubscribeID_v20beta1)
Expand Down
2 changes: 1 addition & 1 deletion examples/chat2-reliable/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
PubsubTopic: relay.DefaultWakuTopic,
ContentTopics: protocol.NewContentTopicSet(options.ContentTopic),
}
var filterOpt filter.SubscribeOption
var filterOpt filter.FilterSubscribeOption
peerID, err := options.Filter.NodePeerID()
if err != nil {
filterOpt = filter.WithAutomaticPeerSelection()
Expand Down
2 changes: 1 addition & 1 deletion examples/chat2/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
PubsubTopic: relay.DefaultWakuTopic,
ContentTopics: protocol.NewContentTopicSet(options.ContentTopic),
}
var filterOpt filter.SubscribeOption
var filterOpt filter.FilterSubscribeOption
peerID, err := options.Filter.NodePeerID()
if err != nil {
filterOpt = filter.WithAutomaticPeerSelection()
Expand Down
6 changes: 3 additions & 3 deletions library/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func FilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, m
ctx = instance.ctx
}

var fOptions []filter.SubscribeOption
var fOptions []filter.FilterSubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
Expand Down Expand Up @@ -141,7 +141,7 @@ func FilterUnsubscribe(instance *WakuInstance, filterJSON string, peerID string,
ctx = instance.ctx
}

var fOptions []filter.SubscribeOption
var fOptions []filter.FilterSubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
Expand Down Expand Up @@ -185,7 +185,7 @@ func FilterUnsubscribeAll(instance *WakuInstance, peerID string, ms int) (string
ctx = instance.ctx
}

var fOptions []filter.SubscribeOption
var fOptions []filter.FilterSubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/api/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func possibleRecursiveError(err error) bool {

func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int, peersToExclude ...peer.ID) ([]*subscription.SubscriptionDetails, error) {
// Low-level subscribe, returns a set of SubscriptionDetails
options := make([]filter.SubscribeOption, 0)
options := make([]filter.FilterSubscribeOption, 0)
options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount)))
for _, p := range apiSub.Config.Peers {
options = append(options, filter.WithPeer(p))
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/node/wakuoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type WakuNodeParameters struct {
enableRelay bool
enableFilterLightNode bool
enableFilterFullNode bool
filterOpts []filter.FullNodeOption
filterOpts []filter.Option
pubsubOpts []pubsub.Option
lightpushOpts []lightpush.Option

Expand Down Expand Up @@ -471,7 +471,7 @@ func WithWakuFilterLightNode() WakuNodeOption {

// WithWakuFilterFullNode enables the Waku Filter V2 protocol full node functionality.
// This WakuNodeOption accepts a list of WakuFilter options to setup the protocol
func WithWakuFilterFullNode(filterOpts ...filter.FullNodeOption) WakuNodeOption {
func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableFilterFullNode = true
params.filterOpts = filterOpts
Expand Down
25 changes: 14 additions & 11 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea

if !wf.limiter.Allow(peerID) {
wf.metrics.RecordError(rateLimitFailure)
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
}
return
}

Expand Down Expand Up @@ -318,8 +321,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
return nil
}

func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, contentFilter protocol.ContentFilter, opts []SubscribeOption) (*SubscribeParameters, map[string][]string, error) {
params := new(SubscribeParameters)
func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, contentFilter protocol.ContentFilter, opts []FilterSubscribeOption) (*FilterSubscribeParameters, map[string][]string, error) {
params := new(FilterSubscribeParameters)
params.log = wf.log
params.host = wf.h
params.pm = wf.pm
Expand Down Expand Up @@ -377,7 +380,7 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
// If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer.
// This may change if Filterv2 protocol is updated to handle such a scenario in a single request.
// Note: In case of partial failure, results are returned for successful subscriptions along with error indicating failed contentTopics.
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...SubscribeOption) ([]*subscription.SubscriptionDetails, error) {
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) ([]*subscription.SubscriptionDetails, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand Down Expand Up @@ -479,8 +482,8 @@ func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter
return wf.subscriptions.NewSubscription(peerID, contentFilter), nil
}

func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...SubscribeOption) (*SubscribeParameters, error) {
params := new(SubscribeParameters)
func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterSubscribeOption) (*FilterSubscribeParameters, error) {
params := new(FilterSubscribeParameters)
params.log = wf.log
opts = append(DefaultUnsubscribeOptions(), opts...)
for _, opt := range opts {
Expand All @@ -492,14 +495,14 @@ func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...SubscribeOption)
return params, nil
}

func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ...PingOption) error {
func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ...FilterPingOption) error {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
return err
}

params := &PingParameters{}
params := &FilterPingParameters{}
for _, opt := range opts {
opt(params)
}
Expand All @@ -516,7 +519,7 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ..
}

// Unsubscribe is used to stop receiving messages from specified peers for the content filter
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...SubscribeOption) (*WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand Down Expand Up @@ -621,7 +624,7 @@ func (wf *WakuFilterLightNode) IsListening(pubsubTopic, contentTopic string) boo
// If there are no more subscriptions matching the passed [peer, contentFilter] pair,
// server unsubscribe is also performed
func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *subscription.SubscriptionDetails,
opts ...SubscribeOption) (*WakuFilterPushResult, error) {
opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand Down Expand Up @@ -668,7 +671,7 @@ func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, reques

// close all subscribe for selectedPeer or if selectedPeer == "", then all peers
// send the unsubscribeAll request to the peers
func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...SubscribeOption) (*WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
params, err := wf.getUnsubscribeParameters(opts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -739,7 +742,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Subsc
}

// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...SubscribeOption) (*WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions waku/v2/protocol/filter/filter_proto_ident_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s *FilterTestSuite) TestMultipleMessages() {
s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "second"})
}

func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, params *SubscribeParameters,
func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, params *FilterSubscribeParameters,
reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter) error {

const FilterSubscribeID_Incorrect1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/abcd")
Expand Down Expand Up @@ -111,7 +111,7 @@ func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, pa
return nil
}

func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...SubscribeOption) ([]*subscription.SubscriptionDetails, error) {
func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) ([]*subscription.SubscriptionDetails, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand All @@ -129,7 +129,7 @@ func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFi
return nil, fmt.Errorf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest)
}

params := new(SubscribeParameters)
params := new(FilterSubscribeParameters)
params.log = wf.log
params.host = wf.h
params.pm = wf.pm
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/protocol/filter/filter_subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func (s *FilterTestSuite) TestHandleFilterSubscribeOptions() {
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())

// With valid peer
opts := []SubscribeOption{WithPeer(s.FullNodeHost.ID())}
opts := []FilterSubscribeOption{WithPeer(s.FullNodeHost.ID())}

// Positive case
_, _, err := s.LightNode.handleFilterSubscribeOptions(s.ctx, contentFilter, opts)
Expand All @@ -401,7 +401,7 @@ func (s *FilterTestSuite) TestHandleFilterSubscribeOptions() {
addr := s.FullNodeHost.Addrs()[0]

// Combine mutually exclusive options
opts = []SubscribeOption{WithPeer(s.FullNodeHost.ID()), WithPeerAddr(addr)}
opts = []FilterSubscribeOption{WithPeer(s.FullNodeHost.ID()), WithPeerAddr(addr)}

// Should fail on wrong option combination
_, _, err = s.LightNode.handleFilterSubscribeOptions(s.ctx, contentFilter, opts)
Expand Down
Loading

0 comments on commit 3828f31

Please sign in to comment.