From 035dc75a8a111b3e93e7927bccba6c0d04eeb87e Mon Sep 17 00:00:00 2001 From: Vitaliy Vlasov Date: Thu, 21 Sep 2023 13:36:04 +0300 Subject: [PATCH] Change UnsubscribeWithSubscription so that it's single sub-specific Also merge FilterSubscribe and FilterUnsubscribe options/params --- library/filter.go | 8 +- waku/v2/protocol/filter/client.go | 107 +++++++++++-------- waku/v2/protocol/filter/filter_test.go | 49 +++++++-- waku/v2/protocol/filter/options.go | 57 +++------- waku/v2/protocol/filter/options_test.go | 10 +- waku/v2/protocol/filter/subscriptions_map.go | 32 +----- 6 files changed, 132 insertions(+), 131 deletions(-) diff --git a/library/filter.go b/library/filter.go index a66a6fbef..009167653 100644 --- a/library/filter.go +++ b/library/filter.go @@ -134,13 +134,13 @@ func FilterUnsubscribe(filterJSON string, peerID string, ms int) error { ctx = context.Background() } - var fOptions []filter.FilterUnsubscribeOption + var fOptions []filter.FilterSubscribeOption if peerID != "" { p, err := peer.Decode(peerID) if err != nil { return err } - fOptions = append(fOptions, filter.Peer(p)) + fOptions = append(fOptions, filter.WithPeer(p)) } else { return errors.New("peerID is required") } @@ -176,13 +176,13 @@ func FilterUnsubscribeAll(peerID string, ms int) (string, error) { ctx = context.Background() } - var fOptions []filter.FilterUnsubscribeOption + var fOptions []filter.FilterSubscribeOption if peerID != "" { p, err := peer.Decode(peerID) if err != nil { return "", err } - fOptions = append(fOptions, filter.Peer(p)) + fOptions = append(fOptions, filter.WithPeer(p)) } else { fOptions = append(fOptions, filter.UnsubscribeAll()) } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index a07469a3a..dd47db61c 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -58,6 +58,10 @@ func (cf ContentFilter) ContentTopicsList() []string { return maps.Keys(cf.ContentTopics) } +func NewContentFilter(pubsubTopic string, contentTopics ...string) ContentFilter { + return ContentFilter{pubsubTopic, NewContentTopicSet(contentTopics...)} +} + type WakuFilterPushResult struct { Err error PeerID peer.ID @@ -149,7 +153,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str } else { pubSubTopic = *messagePush.PubsubTopic } - if !wf.subscriptions.Has(s.Conn().RemotePeer(), pubSubTopic, messagePush.WakuMessage.ContentTopic) { + if !wf.subscriptions.Has(s.Conn().RemotePeer(), NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) { logger.Warn("received messagepush with invalid subscription parameters", logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", pubSubTopic), zap.String("contentTopic", messagePush.WakuMessage.ContentTopic)) @@ -304,20 +308,14 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont var cFilter ContentFilter cFilter.PubsubTopic = pubSubTopic cFilter.ContentTopics = NewContentTopicSet(cTopics...) - existingSub := wf.subscriptions.Get(params.selectedPeer, contentFilter) - if existingSub != nil { - subscriptions = append(subscriptions, existingSub) - } else { - //TO OPTIMIZE: Should we parallelize these, if so till how many batches? - err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter) - if err != nil { - wf.log.Error("Failed to subscribe for conentTopics ", - zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), - zap.Error(err)) - failedContentTopics = append(failedContentTopics, cTopics...) - } - subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter)) + err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter) + if err != nil { + wf.log.Error("Failed to subscribe for contentTopics ", + zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), + zap.Error(err)) + failedContentTopics = append(failedContentTopics, cTopics...) } + subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter)) } if len(failedContentTopics) > 0 { @@ -335,15 +333,15 @@ func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter return nil, err } - if !wf.subscriptions.Has(peerID, contentFilter.PubsubTopic, contentFilter.ContentTopicsList()...) { + if !wf.subscriptions.Has(peerID, contentFilter) { return nil, errors.New("subscription does not exist") } return wf.subscriptions.NewSubscription(peerID, contentFilter), nil } -func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) { - params := new(FilterUnsubscribeParameters) +func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterSubscribeOption) (*FilterSubscribeParameters, error) { + params := new(FilterSubscribeParameters) params.log = wf.log opts = append(DefaultUnsubscribeOptions(), opts...) for _, opt := range opts { @@ -418,21 +416,18 @@ func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilte subscriptionDetail.Remove(contentFilter.ContentTopicsList()...) if len(subscriptionDetail.ContentFilter.ContentTopics) == 0 { delete(subscriptionDetailList, subscriptionDetailID) - } else { - subscriptionDetailList[subscriptionDetailID] = subscriptionDetail + subscriptionDetail.closeC() } } if len(subscriptionDetailList) == 0 { delete(wf.subscriptions.items[peerID].subsPerPubsubTopic, contentFilter.PubsubTopic) - } else { - wf.subscriptions.items[peerID].subsPerPubsubTopic[contentFilter.PubsubTopic] = subscriptionDetailList } } // Unsubscribe is used to stop receiving messages from a peer that match a content filter -func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { @@ -456,12 +451,10 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co if err != nil { return nil, err } - resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items)) for pTopic, cTopics := range pubSubTopicMap { - var cFilter ContentFilter - cFilter.PubsubTopic = pTopic - cFilter.ContentTopics = NewContentTopicSet(cTopics...) + cFilter := NewContentFilter(pTopic, cTopics...) + wf.log.Warn("cfilter", zap.Any("cf", cFilter)) for peerID := range wf.subscriptions.items { if params.selectedPeer != "" && peerID != params.selectedPeer { continue @@ -487,21 +480,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co params.wg.Done() } }() - - err := wf.request( - ctx, - &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, - pb.FilterSubscribeRequest_UNSUBSCRIBE, - cFilter) - if err != nil { - ferr, ok := err.(*FilterError) - if ok && ferr.Code == http.StatusNotFound { - wf.log.Warn("peer does not have a subscription", logging.HostID("peerID", peerID), zap.Error(err)) - } else { - wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) - return - } - } + err := wf.unsubscribeFromServer(ctx, &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, cFilter) if params.wg != nil { resultChan <- WakuFilterPushResult{ @@ -521,20 +500,54 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co return resultChan, nil } -// Unsubscribe is used to stop receiving messages from a peer that match a content filter -func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { +// UnsubscribeWithSubscription is used to close a particular subscription +// If there are no more subscriptions matching the passed [peer, contentFilter] pair, +// server unsubscribe is also performed +func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { return nil, err } - opts = append(opts, Peer(sub.PeerID)) + params, err := wf.getUnsubscribeParameters(opts...) + if err != nil { + return nil, err + } + + // Close this sub + sub.Close() + + resultChan := make(chan WakuFilterPushResult, 1) + + if !wf.subscriptions.Has(sub.PeerID, sub.ContentFilter) { + // Last sub for this [peer, contentFilter] pair + err = wf.unsubscribeFromServer(ctx, &FilterSubscribeParameters{selectedPeer: sub.PeerID, requestID: params.requestID}, sub.ContentFilter) + resultChan <- WakuFilterPushResult{ + Err: err, + PeerID: sub.PeerID, + } + } + close(resultChan) + return resultChan, err + +} + +func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, params *FilterSubscribeParameters, cFilter ContentFilter) error { + err := wf.request(ctx, params, pb.FilterSubscribeRequest_UNSUBSCRIBE, cFilter) + if err != nil { + ferr, ok := err.(*FilterError) + if ok && ferr.Code == http.StatusNotFound { + wf.log.Warn("peer does not have a subscription", logging.HostID("peerID", params.selectedPeer), zap.Error(err)) + } else { + wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", params.selectedPeer), zap.Error(err)) + } + } - return wf.Unsubscribe(ctx, sub.ContentFilter, opts...) + return err } -func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) { params, err := wf.getUnsubscribeParameters(opts...) if err != nil { return nil, err @@ -590,7 +603,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte } // UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions -func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index fb774cda0..5338f9a17 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -20,7 +20,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" - "golang.org/x/exp/maps" ) func TestFilterSuite(t *testing.T) { @@ -110,7 +109,7 @@ func (s *FilterTestSuite) waitForMsg(fn func(), ch chan *protocol.Envelope) { defer s.wg.Done() select { case env := <-ch: - s.Require().Equal(maps.Keys(s.contentFilter.ContentTopics)[0], env.Message().GetContentTopic()) + s.Require().Equal(s.contentFilter.ContentTopicsList()[0], env.Message().GetContentTopic()) case <-time.After(5 * time.Second): s.Require().Fail("Message timeout") case <-s.ctx.Done(): @@ -128,8 +127,10 @@ func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope) go func() { defer s.wg.Done() select { - case <-ch: - s.Require().Fail("should not receive another message") + case _, ok := <-ch: + if ok { + s.Require().Fail("should not receive another message") + } case <-time.After(1 * time.Second): // Timeout elapsed, all good case <-s.ctx.Done(): @@ -216,10 +217,44 @@ func (s *FilterTestSuite) TestWakuFilter() { s.publishMsg(s.testTopic, "TopicB", "second") }, s.subDetails[0].C) - _, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, Peer(s.fullNodeHost.ID())) + _, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID())) s.Require().NoError(err) - time.Sleep(1 * time.Second) + // Should not receive after unsubscribe + s.waitForTimeout(func() { + s.publishMsg(s.testTopic, s.testContentTopic, "third") + }, s.subDetails[0].C) + + // Two new subscriptions with same [peer, contentFilter] + s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + secondSub := s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) + + // Assert that we have 2 subscriptions now + s.Require().Equal(len(s.lightNode.Subscriptions()), 2) + + // Should be received on both subscriptions + s.waitForMsg(func() { + s.publishMsg(s.testTopic, s.testContentTopic, "first") + }, s.subDetails[0].C) + + s.waitForMsg(func() { + s.publishMsg(s.testTopic, s.testContentTopic, "first") + }, secondSub[0].C) + + // Unsubscribe from second sub only + _, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, secondSub[0]) + s.Require().NoError(err) + + // Should still receive + s.waitForMsg(func() { + s.publishMsg(s.testTopic, s.testContentTopic, "first") + }, s.subDetails[0].C) + + // Unsubscribe from first sub only + _, err = s.lightNode.UnsubscribeWithSubscription(s.ctx, s.subDetails[0]) + s.Require().NoError(err) + + s.Require().Equal(len(s.lightNode.Subscriptions()), 0) // Should not receive after unsubscribe s.waitForTimeout(func() { @@ -441,7 +476,7 @@ func (s *FilterTestSuite) TestAutoShard() { s.publishMsg(s.testTopic, "TopicB", "second") }, s.subDetails[0].C) - _, err = s.lightNode.Unsubscribe(s.ctx, s.contentFilter, Peer(s.fullNodeHost.ID())) + _, err = s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID())) s.Require().NoError(err) time.Sleep(1 * time.Second) diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index 6999a0928..9b23db1d7 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -15,18 +15,16 @@ import ( type ( FilterSubscribeParameters struct { - host host.Host selectedPeer peer.ID - pm *peermanager.PeerManager requestID []byte log *zap.Logger - } - FilterUnsubscribeParameters struct { + // Subscribe-specific + host host.Host + pm *peermanager.PeerManager + + // Unsubscribe-specific unsubscribeAll bool - selectedPeer peer.ID - requestID []byte - log *zap.Logger wg *sync.WaitGroup } @@ -37,8 +35,7 @@ type ( Option func(*FilterParameters) - FilterSubscribeOption func(*FilterSubscribeParameters) - FilterUnsubscribeOption func(*FilterUnsubscribeParameters) + FilterSubscribeOption func(*FilterSubscribeParameters) ) func WithTimeout(timeout time.Duration) Option { @@ -89,7 +86,7 @@ func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Fi } // WithRequestID is an option to set a specific request ID to be used when -// creating a filter subscription +// creating/removing a filter subscription func WithRequestID(requestID []byte) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { params.requestID = requestID @@ -111,51 +108,31 @@ func DefaultSubscriptionOptions() []FilterSubscribeOption { } } -func UnsubscribeAll() FilterUnsubscribeOption { - return func(params *FilterUnsubscribeParameters) { +func UnsubscribeAll() FilterSubscribeOption { + return func(params *FilterSubscribeParameters) { params.unsubscribeAll = true } } -func Peer(p peer.ID) FilterUnsubscribeOption { - return func(params *FilterUnsubscribeParameters) { - params.selectedPeer = p - } -} - -// RequestID is an option to set a specific request ID to be used when -// removing a subscription from a filter node -func RequestID(requestID []byte) FilterUnsubscribeOption { - return func(params *FilterUnsubscribeParameters) { - params.requestID = requestID - } -} - -func AutomaticRequestID() FilterUnsubscribeOption { - return func(params *FilterUnsubscribeParameters) { - params.requestID = protocol.GenerateRequestID() - } -} - -// WithWaitGroup allos specigying a waitgroup to wait until all +// WithWaitGroup allows specifying a waitgroup to wait until all // unsubscribe requests are complete before the function is complete -func WithWaitGroup(wg *sync.WaitGroup) FilterUnsubscribeOption { - return func(params *FilterUnsubscribeParameters) { +func WithWaitGroup(wg *sync.WaitGroup) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) { params.wg = wg } } // DontWait is used to fire and forget an unsubscription, and don't // care about the results of it -func DontWait() FilterUnsubscribeOption { - return func(params *FilterUnsubscribeParameters) { +func DontWait() FilterSubscribeOption { + return func(params *FilterSubscribeParameters) { params.wg = nil } } -func DefaultUnsubscribeOptions() []FilterUnsubscribeOption { - return []FilterUnsubscribeOption{ - AutomaticRequestID(), +func DefaultUnsubscribeOptions() []FilterSubscribeOption { + return []FilterSubscribeOption{ + WithAutomaticRequestID(), WithWaitGroup(&sync.WaitGroup{}), } } diff --git a/waku/v2/protocol/filter/options_test.go b/waku/v2/protocol/filter/options_test.go index 4bea34ee8..28ad20b2c 100644 --- a/waku/v2/protocol/filter/options_test.go +++ b/waku/v2/protocol/filter/options_test.go @@ -17,6 +17,7 @@ func TestFilterOption(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) + // subscribe options options := []FilterSubscribeOption{ WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), WithAutomaticPeerSelection(), @@ -34,13 +35,14 @@ func TestFilterOption(t *testing.T) { require.Equal(t, host, params.host) require.NotNil(t, params.selectedPeer) - options2 := []FilterUnsubscribeOption{ - AutomaticRequestID(), + // Unsubscribe options + options2 := []FilterSubscribeOption{ + WithAutomaticRequestID(), UnsubscribeAll(), - Peer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), + WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), } - params2 := new(FilterUnsubscribeParameters) + params2 := new(FilterSubscribeParameters) for _, opt := range options2 { opt(params2) diff --git a/waku/v2/protocol/filter/subscriptions_map.go b/waku/v2/protocol/filter/subscriptions_map.go index bcd516270..35ccc5aff 100644 --- a/waku/v2/protocol/filter/subscriptions_map.go +++ b/waku/v2/protocol/filter/subscriptions_map.go @@ -84,34 +84,8 @@ func (sub *SubscriptionsMap) IsSubscribedTo(peerID peer.ID) bool { return ok } -func (sub *SubscriptionsMap) Get(peerID peer.ID, cf ContentFilter) *SubscriptionDetails { - sub.RLock() - defer sub.RUnlock() - - // Check if peer exits - peerSubscription, ok := sub.items[peerID] - if !ok { - return nil - } - - // Check if pubsub topic exists - subscriptions, ok := peerSubscription.subsPerPubsubTopic[cf.PubsubTopic] - if !ok { - return nil - } - - // Check if the content topic exists within the list of subscriptions for this peer - for _, subscription := range subscriptions { - if maps.Equal(subscription.ContentFilter.ContentTopics, cf.ContentTopics) { - return subscription - } - } - - return nil -} - // Check if we have subscriptions for all (pubsubTopic, contentTopics[i]) pairs provided -func (sub *SubscriptionsMap) Has(peerID peer.ID, pubsubTopic string, contentTopics ...string) bool { +func (sub *SubscriptionsMap) Has(peerID peer.ID, cf ContentFilter) bool { sub.RLock() defer sub.RUnlock() @@ -122,13 +96,13 @@ func (sub *SubscriptionsMap) Has(peerID peer.ID, pubsubTopic string, contentTopi } //TODO: Handle pubsubTopic as null // Check if pubsub topic exists - subscriptions, ok := peerSubscription.subsPerPubsubTopic[pubsubTopic] + subscriptions, ok := peerSubscription.subsPerPubsubTopic[cf.PubsubTopic] if !ok { return false } // Check if the content topic exists within the list of subscriptions for this peer - for _, ct := range contentTopics { + for _, ct := range cf.ContentTopicsList() { found := false for _, subscription := range subscriptions { _, exists := subscription.ContentFilter.ContentTopics[ct]