From 3828f3174183c1a07c1db835199e7b8d9784c703 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 28 Nov 2024 15:58:44 -0400 Subject: [PATCH] fix: code review --- cmd/waku/server/rest/filter_test.go | 5 +- examples/chat2-reliable/chat.go | 2 +- examples/chat2/chat.go | 2 +- library/filter.go | 6 +- waku/v2/api/filter/filter.go | 2 +- waku/v2/node/wakuoptions.go | 4 +- waku/v2/protocol/filter/client.go | 25 ++--- .../filter/filter_proto_ident_test.go | 6 +- .../protocol/filter/filter_subscribe_test.go | 4 +- waku/v2/protocol/filter/options.go | 96 +++++++++---------- waku/v2/protocol/filter/options_test.go | 12 +-- waku/v2/protocol/filter/server.go | 6 +- waku/v2/protocol/lightpush/waku_lightpush.go | 4 +- .../lightpush/waku_lightpush_option.go | 8 +- .../protocol/lightpush/waku_lightpush_test.go | 7 +- waku/v2/protocol/peer_exchange/client.go | 2 +- waku/v2/protocol/peer_exchange/protocol.go | 2 +- .../waku_peer_exchange_option.go | 8 +- .../peer_exchange/waku_peer_exchange_test.go | 4 +- waku/v2/utils/limiter.go | 3 - 20 files changed, 105 insertions(+), 103 deletions(-) diff --git a/cmd/waku/server/rest/filter_test.go b/cmd/waku/server/rest/filter_test.go index 1641175b7..7877b61ff 100644 --- a/cmd/waku/server/rest/filter_test.go +++ b/cmd/waku/server/rest/filter_test.go @@ -23,6 +23,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/utils" + "golang.org/x/time/rate" ) func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode { @@ -37,8 +38,8 @@ func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode { // node2 connects to node1 func twoFilterConnectedNodes(t *testing.T, pubSubTopics ...string) (*node.WakuNode, *node.WakuNode) { - node1 := createNode(t, node.WithWakuFilterFullNode()) // full node filter - node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter + node1 := createNode(t, node.WithWakuFilterFullNode(filter.WithFullNodeRateLimiter(rate.Inf, 0))) // full node filter + node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL) err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), filter.FilterSubscribeID_v20beta1) diff --git a/examples/chat2-reliable/chat.go b/examples/chat2-reliable/chat.go index bbfe60bc3..689a4a460 100644 --- a/examples/chat2-reliable/chat.go +++ b/examples/chat2-reliable/chat.go @@ -79,7 +79,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node. PubsubTopic: relay.DefaultWakuTopic, ContentTopics: protocol.NewContentTopicSet(options.ContentTopic), } - var filterOpt filter.SubscribeOption + var filterOpt filter.FilterSubscribeOption peerID, err := options.Filter.NodePeerID() if err != nil { filterOpt = filter.WithAutomaticPeerSelection() diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index c33667765..48c39d4fd 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -65,7 +65,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node. PubsubTopic: relay.DefaultWakuTopic, ContentTopics: protocol.NewContentTopicSet(options.ContentTopic), } - var filterOpt filter.SubscribeOption + var filterOpt filter.FilterSubscribeOption peerID, err := options.Filter.NodePeerID() if err != nil { filterOpt = filter.WithAutomaticPeerSelection() diff --git a/library/filter.go b/library/filter.go index af8a1e4db..ef1d8cc0d 100644 --- a/library/filter.go +++ b/library/filter.go @@ -57,7 +57,7 @@ func FilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, m ctx = instance.ctx } - var fOptions []filter.SubscribeOption + var fOptions []filter.FilterSubscribeOption if peerID != "" { p, err := peer.Decode(peerID) if err != nil { @@ -141,7 +141,7 @@ func FilterUnsubscribe(instance *WakuInstance, filterJSON string, peerID string, ctx = instance.ctx } - var fOptions []filter.SubscribeOption + var fOptions []filter.FilterSubscribeOption if peerID != "" { p, err := peer.Decode(peerID) if err != nil { @@ -185,7 +185,7 @@ func FilterUnsubscribeAll(instance *WakuInstance, peerID string, ms int) (string ctx = instance.ctx } - var fOptions []filter.SubscribeOption + var fOptions []filter.FilterSubscribeOption if peerID != "" { p, err := peer.Decode(peerID) if err != nil { diff --git a/waku/v2/api/filter/filter.go b/waku/v2/api/filter/filter.go index e9d1ebd04..020bb23f5 100644 --- a/waku/v2/api/filter/filter.go +++ b/waku/v2/api/filter/filter.go @@ -194,7 +194,7 @@ func possibleRecursiveError(err error) bool { func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int, peersToExclude ...peer.ID) ([]*subscription.SubscriptionDetails, error) { // Low-level subscribe, returns a set of SubscriptionDetails - options := make([]filter.SubscribeOption, 0) + options := make([]filter.FilterSubscribeOption, 0) options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount))) for _, p := range apiSub.Config.Peers { options = append(options, filter.WithPeer(p)) diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 14df19215..7e7c5e103 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -85,7 +85,7 @@ type WakuNodeParameters struct { enableRelay bool enableFilterLightNode bool enableFilterFullNode bool - filterOpts []filter.FullNodeOption + filterOpts []filter.Option pubsubOpts []pubsub.Option lightpushOpts []lightpush.Option @@ -471,7 +471,7 @@ func WithWakuFilterLightNode() WakuNodeOption { // WithWakuFilterFullNode enables the Waku Filter V2 protocol full node functionality. // This WakuNodeOption accepts a list of WakuFilter options to setup the protocol -func WithWakuFilterFullNode(filterOpts ...filter.FullNodeOption) WakuNodeOption { +func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableFilterFullNode = true params.filterOpts = filterOpts diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index c9e5b7454..3d81048d6 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -167,6 +167,9 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea if !wf.limiter.Allow(peerID) { wf.metrics.RecordError(rateLimitFailure) + if err := stream.Reset(); err != nil { + wf.log.Error("resetting connection", zap.Error(err)) + } return } @@ -318,8 +321,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, return nil } -func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, contentFilter protocol.ContentFilter, opts []SubscribeOption) (*SubscribeParameters, map[string][]string, error) { - params := new(SubscribeParameters) +func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, contentFilter protocol.ContentFilter, opts []FilterSubscribeOption) (*FilterSubscribeParameters, map[string][]string, error) { + params := new(FilterSubscribeParameters) params.log = wf.log params.host = wf.h params.pm = wf.pm @@ -377,7 +380,7 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, // If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer. // This may change if Filterv2 protocol is updated to handle such a scenario in a single request. // Note: In case of partial failure, results are returned for successful subscriptions along with error indicating failed contentTopics. -func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...SubscribeOption) ([]*subscription.SubscriptionDetails, error) { +func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) ([]*subscription.SubscriptionDetails, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { @@ -479,8 +482,8 @@ func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter return wf.subscriptions.NewSubscription(peerID, contentFilter), nil } -func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...SubscribeOption) (*SubscribeParameters, error) { - params := new(SubscribeParameters) +func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterSubscribeOption) (*FilterSubscribeParameters, error) { + params := new(FilterSubscribeParameters) params.log = wf.log opts = append(DefaultUnsubscribeOptions(), opts...) for _, opt := range opts { @@ -492,14 +495,14 @@ func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...SubscribeOption) return params, nil } -func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ...PingOption) error { +func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ...FilterPingOption) error { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { return err } - params := &PingParameters{} + params := &FilterPingParameters{} for _, opt := range opts { opt(params) } @@ -516,7 +519,7 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts .. } // Unsubscribe is used to stop receiving messages from specified peers for the content filter -func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...SubscribeOption) (*WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { @@ -621,7 +624,7 @@ func (wf *WakuFilterLightNode) IsListening(pubsubTopic, contentTopic string) boo // If there are no more subscriptions matching the passed [peer, contentFilter] pair, // server unsubscribe is also performed func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *subscription.SubscriptionDetails, - opts ...SubscribeOption) (*WakuFilterPushResult, error) { + opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { @@ -668,7 +671,7 @@ func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, reques // close all subscribe for selectedPeer or if selectedPeer == "", then all peers // send the unsubscribeAll request to the peers -func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...SubscribeOption) (*WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) { params, err := wf.getUnsubscribeParameters(opts...) if err != nil { return nil, err @@ -739,7 +742,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Subsc } // UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions -func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...SubscribeOption) (*WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { diff --git a/waku/v2/protocol/filter/filter_proto_ident_test.go b/waku/v2/protocol/filter/filter_proto_ident_test.go index 3c87ef8d9..6614bfdc6 100644 --- a/waku/v2/protocol/filter/filter_proto_ident_test.go +++ b/waku/v2/protocol/filter/filter_proto_ident_test.go @@ -58,7 +58,7 @@ func (s *FilterTestSuite) TestMultipleMessages() { s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "second"}) } -func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, params *SubscribeParameters, +func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter) error { const FilterSubscribeID_Incorrect1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/abcd") @@ -111,7 +111,7 @@ func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, pa return nil } -func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...SubscribeOption) ([]*subscription.SubscriptionDetails, error) { +func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) ([]*subscription.SubscriptionDetails, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { @@ -129,7 +129,7 @@ func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFi return nil, fmt.Errorf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest) } - params := new(SubscribeParameters) + params := new(FilterSubscribeParameters) params.log = wf.log params.host = wf.h params.pm = wf.pm diff --git a/waku/v2/protocol/filter/filter_subscribe_test.go b/waku/v2/protocol/filter/filter_subscribe_test.go index edaa793f5..c8ec33c9f 100644 --- a/waku/v2/protocol/filter/filter_subscribe_test.go +++ b/waku/v2/protocol/filter/filter_subscribe_test.go @@ -392,7 +392,7 @@ func (s *FilterTestSuite) TestHandleFilterSubscribeOptions() { s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) // With valid peer - opts := []SubscribeOption{WithPeer(s.FullNodeHost.ID())} + opts := []FilterSubscribeOption{WithPeer(s.FullNodeHost.ID())} // Positive case _, _, err := s.LightNode.handleFilterSubscribeOptions(s.ctx, contentFilter, opts) @@ -401,7 +401,7 @@ func (s *FilterTestSuite) TestHandleFilterSubscribeOptions() { addr := s.FullNodeHost.Addrs()[0] // Combine mutually exclusive options - opts = []SubscribeOption{WithPeer(s.FullNodeHost.ID()), WithPeerAddr(addr)} + opts = []FilterSubscribeOption{WithPeer(s.FullNodeHost.ID()), WithPeerAddr(addr)} // Should fail on wrong option combination _, _, err = s.LightNode.handleFilterSubscribeOptions(s.ctx, contentFilter, opts) diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index 6f1957b5d..bde105e47 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -14,28 +14,28 @@ import ( "golang.org/x/time/rate" ) -func (old *SubscribeParameters) Copy() *SubscribeParameters { - return &SubscribeParameters{ +func (old *FilterSubscribeParameters) Copy() *FilterSubscribeParameters { + return &FilterSubscribeParameters{ selectedPeers: old.selectedPeers, requestID: old.requestID, } } type ( - PingParameters struct { + FilterPingParameters struct { requestID []byte } - PingOption func(*PingParameters) + FilterPingOption func(*FilterPingParameters) ) -func WithPingRequestId(requestId []byte) PingOption { - return func(params *PingParameters) { +func WithPingRequestId(requestId []byte) FilterPingOption { + return func(params *FilterPingParameters) { params.requestID = requestId } } type ( - SubscribeParameters struct { + FilterSubscribeParameters struct { selectedPeers peer.IDSlice peerAddr multiaddr.Multiaddr peerSelectionType peermanager.PeerSelection @@ -54,7 +54,7 @@ type ( wg *sync.WaitGroup } - FullNodeParameters struct { + FilterParameters struct { Timeout time.Duration MaxSubscribers int pm *peermanager.PeerManager @@ -62,7 +62,7 @@ type ( limitB int } - FullNodeOption func(*FullNodeParameters) + Option func(*FilterParameters) LightNodeParameters struct { limitR rate.Limit @@ -71,7 +71,7 @@ type ( LightNodeOption func(*LightNodeParameters) - SubscribeOption func(*SubscribeParameters) error + FilterSubscribeOption func(*FilterSubscribeParameters) error ) func WithLightNodeRateLimiter(r rate.Limit, b int) LightNodeOption { @@ -83,20 +83,20 @@ func WithLightNodeRateLimiter(r rate.Limit, b int) LightNodeOption { func DefaultLightNodeOptions() []LightNodeOption { return []LightNodeOption{ - WithLightNodeRateLimiter(rate.Inf, 0), + WithLightNodeRateLimiter(1, 1), } } -func WithTimeout(timeout time.Duration) FullNodeOption { - return func(params *FullNodeParameters) { +func WithTimeout(timeout time.Duration) Option { + return func(params *FilterParameters) { params.Timeout = timeout } } // WithPeer is an option used to specify the peerID to request the message history. // Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. -func WithPeer(p peer.ID) SubscribeOption { - return func(params *SubscribeParameters) error { +func WithPeer(p peer.ID) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) error { params.selectedPeers = append(params.selectedPeers, p) if params.peerAddr != nil { return errors.New("peerAddr and peerId options are mutually exclusive") @@ -108,8 +108,8 @@ func WithPeer(p peer.ID) SubscribeOption { // WithPeerAddr is an option used to specify a peerAddress. // This new peer will be added to peerStore. // Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. -func WithPeerAddr(pAddr multiaddr.Multiaddr) SubscribeOption { - return func(params *SubscribeParameters) error { +func WithPeerAddr(pAddr multiaddr.Multiaddr) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) error { params.peerAddr = pAddr if len(params.selectedPeers) != 0 { return errors.New("peerAddr and peerId options are mutually exclusive") @@ -118,16 +118,16 @@ func WithPeerAddr(pAddr multiaddr.Multiaddr) SubscribeOption { } } -func WithMaxPeersPerContentFilter(numPeers int) SubscribeOption { - return func(params *SubscribeParameters) error { +func WithMaxPeersPerContentFilter(numPeers int) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) error { params.maxPeers = numPeers return nil } } // WithPeersToExclude option excludes the peers that are specified from selection -func WithPeersToExclude(peers ...peer.ID) SubscribeOption { - return func(params *SubscribeParameters) error { +func WithPeersToExclude(peers ...peer.ID) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) error { params.peersToExclude = peermanager.PeerSliceToMap(peers) return nil } @@ -136,8 +136,8 @@ func WithPeersToExclude(peers ...peer.ID) SubscribeOption { // WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store. // If a list of specific peers is passed, the peer will be chosen from that list assuming it // supports the chosen protocol, otherwise it will chose a peer from the node peerstore -func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) SubscribeOption { - return func(params *SubscribeParameters) error { +func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) error { params.peerSelectionType = peermanager.Automatic params.preferredPeers = fromThesePeers return nil @@ -148,8 +148,8 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) SubscribeOption { // with the lowest ping If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a // peer from the node peerstore -func WithFastestPeerSelection(fromThesePeers ...peer.ID) SubscribeOption { - return func(params *SubscribeParameters) error { +func WithFastestPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) error { params.peerSelectionType = peermanager.LowestRTT return nil } @@ -157,8 +157,8 @@ func WithFastestPeerSelection(fromThesePeers ...peer.ID) SubscribeOption { // WithRequestID is an option to set a specific request ID to be used when // creating/removing a filter subscription -func WithRequestID(requestID []byte) SubscribeOption { - return func(params *SubscribeParameters) error { +func WithRequestID(requestID []byte) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) error { params.requestID = requestID return nil } @@ -166,23 +166,23 @@ func WithRequestID(requestID []byte) SubscribeOption { // WithAutomaticRequestID is an option to automatically generate a request ID // when creating a filter subscription -func WithAutomaticRequestID() SubscribeOption { - return func(params *SubscribeParameters) error { +func WithAutomaticRequestID() FilterSubscribeOption { + return func(params *FilterSubscribeParameters) error { params.requestID = protocol.GenerateRequestID() return nil } } -func DefaultSubscriptionOptions() []SubscribeOption { - return []SubscribeOption{ +func DefaultSubscriptionOptions() []FilterSubscribeOption { + return []FilterSubscribeOption{ WithAutomaticPeerSelection(), WithAutomaticRequestID(), WithMaxPeersPerContentFilter(1), } } -func UnsubscribeAll() SubscribeOption { - return func(params *SubscribeParameters) error { +func UnsubscribeAll() FilterSubscribeOption { + return func(params *FilterSubscribeParameters) error { params.unsubscribeAll = true return nil } @@ -190,8 +190,8 @@ func UnsubscribeAll() SubscribeOption { // WithWaitGroup allows specifying a waitgroup to wait until all // unsubscribe requests are complete before the function is complete -func WithWaitGroup(wg *sync.WaitGroup) SubscribeOption { - return func(params *SubscribeParameters) error { +func WithWaitGroup(wg *sync.WaitGroup) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) error { params.wg = wg return nil } @@ -199,43 +199,43 @@ func WithWaitGroup(wg *sync.WaitGroup) SubscribeOption { // DontWait is used to fire and forget an unsubscription, and don't // care about the results of it -func DontWait() SubscribeOption { - return func(params *SubscribeParameters) error { +func DontWait() FilterSubscribeOption { + return func(params *FilterSubscribeParameters) error { params.wg = nil return nil } } -func DefaultUnsubscribeOptions() []SubscribeOption { - return []SubscribeOption{ +func DefaultUnsubscribeOptions() []FilterSubscribeOption { + return []FilterSubscribeOption{ WithAutomaticRequestID(), WithWaitGroup(&sync.WaitGroup{}), } } -func WithMaxSubscribers(maxSubscribers int) FullNodeOption { - return func(params *FullNodeParameters) { +func WithMaxSubscribers(maxSubscribers int) Option { + return func(params *FilterParameters) { params.MaxSubscribers = maxSubscribers } } -func WithPeerManager(pm *peermanager.PeerManager) FullNodeOption { - return func(params *FullNodeParameters) { +func WithPeerManager(pm *peermanager.PeerManager) Option { + return func(params *FilterParameters) { params.pm = pm } } -func WithFullNodeRateLimiter(r rate.Limit, b int) FullNodeOption { - return func(params *FullNodeParameters) { +func WithFullNodeRateLimiter(r rate.Limit, b int) Option { + return func(params *FilterParameters) { params.limitR = r params.limitB = b } } -func DefaultFullNodeOptions() []FullNodeOption { - return []FullNodeOption{ +func DefaultOptions() []Option { + return []Option{ WithTimeout(DefaultIdleSubscriptionTimeout), WithMaxSubscribers(DefaultMaxSubscribers), - WithFullNodeRateLimiter(rate.Inf, 0), + WithFullNodeRateLimiter(1, 1), } } diff --git a/waku/v2/protocol/filter/options_test.go b/waku/v2/protocol/filter/options_test.go index a101ef2be..a8a361485 100644 --- a/waku/v2/protocol/filter/options_test.go +++ b/waku/v2/protocol/filter/options_test.go @@ -19,13 +19,13 @@ func TestFilterOption(t *testing.T) { require.NoError(t, err) // subscribe options - options := []SubscribeOption{ + options := []FilterSubscribeOption{ WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), WithAutomaticPeerSelection(), WithFastestPeerSelection(), } - params := new(SubscribeParameters) + params := new(FilterSubscribeParameters) params.host = host params.log = utils.Logger() @@ -38,13 +38,13 @@ func TestFilterOption(t *testing.T) { require.NotEqual(t, 0, params.selectedPeers) // Unsubscribe options - options2 := []SubscribeOption{ + options2 := []FilterSubscribeOption{ WithAutomaticRequestID(), UnsubscribeAll(), WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), } - params2 := new(SubscribeParameters) + params2 := new(FilterSubscribeParameters) for _, opt := range options2 { err := opt(params2) @@ -57,12 +57,12 @@ func TestFilterOption(t *testing.T) { // Mutually Exclusive options maddr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/12345/p2p/16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy") require.NoError(t, err) - options3 := []SubscribeOption{ + options3 := []FilterSubscribeOption{ WithPeer("16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy"), WithPeerAddr(maddr), } - params3 := new(SubscribeParameters) + params3 := new(FilterSubscribeParameters) for idx, opt := range options3 { err := opt(params3) diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index b9c88b451..82c4c47da 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -45,12 +45,12 @@ type ( ) // NewWakuFilterFullNode returns a new instance of Waku Filter struct setup according to the chosen parameter and options -func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Registerer, log *zap.Logger, opts ...FullNodeOption) *WakuFilterFullNode { +func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Registerer, log *zap.Logger, opts ...Option) *WakuFilterFullNode { wf := new(WakuFilterFullNode) wf.log = log.Named("filterv2-fullnode") - params := new(FullNodeParameters) - optList := DefaultFullNodeOptions() + params := new(FilterParameters) + optList := DefaultOptions() optList = append(optList, opts...) for _, opt := range optList { opt(params) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 8f286491e..7e411a4ac 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -58,7 +58,7 @@ func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg p wakuLP.metrics = newMetrics(reg) params := &LightpushParameters{} - opts = append(DefaultOptions(), opts...) + opts = append(DefaultLightpushOptions(), opts...) for _, opt := range opts { opt(params) } @@ -257,7 +257,7 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe params.pm = wakuLP.pm var err error - optList := append(DefaultRequestOptions(wakuLP.h), opts...) + optList := append(DefaultOptions(wakuLP.h), opts...) for _, opt := range optList { err := opt(params) if err != nil { diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index a399ee59e..b9740ab4c 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -28,9 +28,9 @@ func WithRateLimiter(r rate.Limit, b int) Option { } } -func DefaultOptions() []Option { +func DefaultLightpushOptions() []Option { return []Option{ - WithRateLimiter(rate.Inf, 0), + WithRateLimiter(1, 1), } } @@ -138,8 +138,8 @@ func WithAutomaticRequestID() RequestOption { } } -// DefaultRequestOptions are the default options to be used when using the lightpush protocol -func DefaultRequestOptions(host host.Host) []RequestOption { +// DefaultOptions are the default options to be used when using the lightpush protocol +func DefaultOptions(host host.Host) []RequestOption { return []RequestOption{ WithAutomaticPeerSelection(), WithMaxPeers(1), //keeping default as 2 for status use-case diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 6ff17f634..5b4e9111b 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/waku-org/go-waku/waku/v2/peermanager" + "golang.org/x/time/rate" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peerstore" @@ -273,7 +274,7 @@ func TestWakuLightPushCornerCases(t *testing.T) { defer node2.Stop() defer sub2.Unsubscribe() - lightPushNode2 := NewWakuLightPush(node2, pm, prometheus.DefaultRegisterer, utils.Logger()) + lightPushNode2 := NewWakuLightPush(node2, pm, prometheus.DefaultRegisterer, utils.Logger(), WithRateLimiter(rate.Inf, 0)) lightPushNode2.SetHost(host2) err := lightPushNode2.Start(ctx) require.NoError(t, err) @@ -358,7 +359,7 @@ func TestWakuLightPushWithStaticSharding(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger()) + client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger(), WithRateLimiter(rate.Inf, 0)) client.SetHost(clientHost) // Node2 @@ -366,7 +367,7 @@ func TestWakuLightPushWithStaticSharding(t *testing.T) { defer node2.Stop() defer sub2.Unsubscribe() - lightPushNode2 := NewWakuLightPush(node2, nil, prometheus.DefaultRegisterer, utils.Logger()) + lightPushNode2 := NewWakuLightPush(node2, nil, prometheus.DefaultRegisterer, utils.Logger(), WithRateLimiter(rate.Inf, 0)) lightPushNode2.SetHost(host2) err = lightPushNode2.Start(ctx) require.NoError(t, err) diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index 01bcfaefb..ef1f7bb9a 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -26,7 +26,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts params.log = wakuPX.log params.pm = wakuPX.pm - optList := DefaultRequestOptions(wakuPX.h) + optList := DefaultOptions(wakuPX.h) optList = append(optList, opts...) for _, opt := range optList { err := opt(params) diff --git a/waku/v2/protocol/peer_exchange/protocol.go b/waku/v2/protocol/peer_exchange/protocol.go index 89f6cca8e..dc181fb42 100644 --- a/waku/v2/protocol/peer_exchange/protocol.go +++ b/waku/v2/protocol/peer_exchange/protocol.go @@ -67,7 +67,7 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, clusterID uint16, peerConnect wakuPX.CommonService = service.NewCommonService() params := &PeerExchangeParameters{} - opts = append(DefaultOptions(), opts...) + opts = append(DefaultPeerExchangeOptions(), opts...) for _, opt := range opts { opt(params) } diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go index a1e7288a6..c25078b73 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go @@ -26,9 +26,9 @@ func WithRateLimiter(r rate.Limit, b int) Option { } } -func DefaultOptions() []Option { +func DefaultPeerExchangeOptions() []Option { return []Option{ - WithRateLimiter(rate.Inf, 0), + WithRateLimiter(1, 1), } } @@ -95,8 +95,8 @@ func WithFastestPeerSelection(fromThesePeers ...peer.ID) RequestOption { } } -// DefaultRequestOptions are the default options to be used when using the lightpush protocol -func DefaultRequestOptions(host host.Host) []RequestOption { +// DefaultOptions are the default options to be used when using the lightpush protocol +func DefaultOptions(host host.Host) []RequestOption { return []RequestOption{ WithAutomaticPeerSelection(), } diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index b2b816149..b0de6c5ab 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -248,7 +248,7 @@ func TestPeerExchangeOptions(t *testing.T) { params.log = px1.log params.pm = px1.pm - optList := DefaultRequestOptions(px1.h) + optList := DefaultOptions(px1.h) optList = append(optList, WithPeerAddr(host1.Addrs()[0])) for _, opt := range optList { err := opt(params) @@ -258,7 +258,7 @@ func TestPeerExchangeOptions(t *testing.T) { require.Equal(t, host1.Addrs()[0], params.peerAddr) // Test WithFastestPeerSelection() - optList = DefaultRequestOptions(px1.h) + optList = DefaultOptions(px1.h) optList = append(optList, WithFastestPeerSelection(host1.ID())) for _, opt := range optList { err := opt(params) diff --git a/waku/v2/utils/limiter.go b/waku/v2/utils/limiter.go index 3c865bcb7..a587659f8 100644 --- a/waku/v2/utils/limiter.go +++ b/waku/v2/utils/limiter.go @@ -65,8 +65,5 @@ func (r *RateLimiter) Allow(peerID peer.ID) bool { } func (r *RateLimiter) Wait(ctx context.Context, peerID peer.ID) error { - r.Lock() - defer r.Unlock() - return r.getOrCreate(peerID).Wait(ctx) }