diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index c525144f0..23a57f344 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -64,8 +64,8 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node. if options.Filter.Enable { cf := filter.ContentFilter{ - Topic: relay.DefaultWakuTopic, - ContentTopics: []string{options.ContentTopic}, + PubsubTopic: relay.DefaultWakuTopic, + ContentTopics: filter.NewContentTopicSet(options.ContentTopic), } var filterOpt filter.FilterSubscribeOption peerID, err := options.Filter.NodePeerID() diff --git a/examples/filter2/main.go b/examples/filter2/main.go index 382723ab5..c35584ea9 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -98,7 +98,7 @@ func main() { // Send FilterRequest from light node to full node cf := filter.ContentFilter{ - ContentTopics: []string{contentTopic}, + ContentTopics: filter.NewContentTopicSet(contentTopic), } theFilter, err := lightNode.FilterLightnode().Subscribe(ctx, cf) diff --git a/flake.nix b/flake.nix index 5aa9e6bcb..46f1d588a 100644 --- a/flake.nix +++ b/flake.nix @@ -29,7 +29,7 @@ ]; doCheck = false; # FIXME: This needs to be manually changed when updating modules. - vendorSha256 = "sha256-eS/4YnNv2yGR+tVMq6xfx0Ntq8WosV+pTrbOb3mNYaA="; + vendorSha256 = "sha256-4xChSKAkwwrFp5/ZMnhtvsR4drVfw1cLE3YXwVHeW0A="; # Fix for 'nix run' trying to execute 'go-waku'. meta = { mainProgram = "waku"; }; }; diff --git a/library/filter.go b/library/filter.go index f450917ca..a66a6fbef 100644 --- a/library/filter.go +++ b/library/filter.go @@ -11,7 +11,7 @@ import ( ) type filterArgument struct { - Topic string `json:"pubsubTopic,omitempty"` + PubsubTopic string `json:"pubsubTopic,omitempty"` ContentTopics []string `json:"contentTopics,omitempty"` } @@ -23,8 +23,8 @@ func toContentFilter(filterJSON string) (filter.ContentFilter, error) { } return filter.ContentFilter{ - Topic: f.Topic, - ContentTopics: f.ContentTopics, + PubsubTopic: f.PubsubTopic, + ContentTopics: filter.NewContentTopicSet(f.ContentTopics...), }, nil } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index d89e4c0a8..a07469a3a 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -23,6 +23,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" + "golang.org/x/exp/maps" ) // FilterPushID_v20beta1 is the current Waku Filter protocol identifier used to allow @@ -49,8 +50,12 @@ type WakuFilterLightNode struct { // ContentTopics - Specify list of content topics to be filtered under a pubSubTopic (for named and static sharding), or a list of contentTopics (in case ofAuto sharding) // If pubSub topic is not specified, then content-topics are used to derive the shard and corresponding pubSubTopic using autosharding algorithm type ContentFilter struct { - Topic string - ContentTopics []string + PubsubTopic string + ContentTopics ContentTopicSet +} + +func (cf ContentFilter) ContentTopicsList() []string { + return maps.Keys(cf.ContentTopics) } type WakuFilterPushResult struct { @@ -186,8 +191,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr request := &pb.FilterSubscribeRequest{ RequestId: hex.EncodeToString(params.requestID), FilterSubscribeType: reqType, - PubsubTopic: &contentFilter.Topic, - ContentTopics: contentFilter.ContentTopics, + PubsubTopic: &contentFilter.PubsubTopic, + ContentTopics: contentFilter.ContentTopicsList(), } wf.log.Debug("sending FilterSubscribeRequest", zap.Stringer("request", request)) @@ -235,11 +240,11 @@ func getPubSubTopicFromContentTopic(cTopicString string) (string, error) { func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]string, error) { pubSubTopicMap := make(map[string][]string) - if contentFilter.Topic != "" { - pubSubTopicMap[contentFilter.Topic] = contentFilter.ContentTopics + if contentFilter.PubsubTopic != "" { + pubSubTopicMap[contentFilter.PubsubTopic] = contentFilter.ContentTopicsList() } else { //Parse the content-Topics to figure out shards. - for _, cTopicString := range contentFilter.ContentTopics { + for _, cTopicString := range contentFilter.ContentTopicsList() { pTopicStr, err := getPubSubTopicFromContentTopic(cTopicString) if err != nil { return nil, err @@ -297,18 +302,24 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont subscriptions := make([]*SubscriptionDetails, 0) for pubSubTopic, cTopics := range pubSubTopicMap { var cFilter ContentFilter - cFilter.Topic = pubSubTopic - cFilter.ContentTopics = cTopics - //TO OPTIMIZE: Should we parallelize these, if so till how many batches? - err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter) - if err != nil { - wf.log.Error("Failed to subscribe for conentTopics ", - zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), - zap.Error(err)) - failedContentTopics = append(failedContentTopics, cTopics...) + cFilter.PubsubTopic = pubSubTopic + cFilter.ContentTopics = NewContentTopicSet(cTopics...) + existingSub := wf.subscriptions.Get(params.selectedPeer, contentFilter) + if existingSub != nil { + subscriptions = append(subscriptions, existingSub) + } else { + //TO OPTIMIZE: Should we parallelize these, if so till how many batches? + err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter) + if err != nil { + wf.log.Error("Failed to subscribe for conentTopics ", + zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), + zap.Error(err)) + failedContentTopics = append(failedContentTopics, cTopics...) + } + subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter)) } - subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter.Topic, cFilter.ContentTopics)) } + if len(failedContentTopics) > 0 { return subscriptions, fmt.Errorf("subscriptions failed for contentTopics: %s", strings.Join(failedContentTopics, ",")) } else { @@ -324,11 +335,11 @@ func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter return nil, err } - if !wf.subscriptions.Has(peerID, contentFilter.Topic, contentFilter.ContentTopics...) { + if !wf.subscriptions.Has(peerID, contentFilter.PubsubTopic, contentFilter.ContentTopicsList()...) { return nil, errors.New("subscription does not exist") } - return wf.subscriptions.NewSubscription(peerID, contentFilter.Topic, contentFilter.ContentTopics), nil + return wf.subscriptions.NewSubscription(peerID, contentFilter), nil } func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) { @@ -379,8 +390,8 @@ func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails { var output []*SubscriptionDetails for _, peerSubscription := range wf.subscriptions.items { - for _, subscriptionPerTopic := range peerSubscription.subscriptionsPerTopic { - for _, subscriptionDetail := range subscriptionPerTopic { + for _, subscriptions := range peerSubscription.subsPerPubsubTopic { + for _, subscriptionDetail := range subscriptions { output = append(output, subscriptionDetail) } } @@ -398,14 +409,14 @@ func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilte return } - subscriptionDetailList, ok := peerSubscription.subscriptionsPerTopic[contentFilter.Topic] + subscriptionDetailList, ok := peerSubscription.subsPerPubsubTopic[contentFilter.PubsubTopic] if !ok { return } for subscriptionDetailID, subscriptionDetail := range subscriptionDetailList { - subscriptionDetail.Remove(contentFilter.ContentTopics...) - if len(subscriptionDetail.ContentTopics) == 0 { + subscriptionDetail.Remove(contentFilter.ContentTopicsList()...) + if len(subscriptionDetail.ContentFilter.ContentTopics) == 0 { delete(subscriptionDetailList, subscriptionDetailID) } else { subscriptionDetailList[subscriptionDetailID] = subscriptionDetail @@ -413,9 +424,9 @@ func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilte } if len(subscriptionDetailList) == 0 { - delete(wf.subscriptions.items[peerID].subscriptionsPerTopic, contentFilter.Topic) + delete(wf.subscriptions.items[peerID].subsPerPubsubTopic, contentFilter.PubsubTopic) } else { - wf.subscriptions.items[peerID].subscriptionsPerTopic[contentFilter.Topic] = subscriptionDetailList + wf.subscriptions.items[peerID].subsPerPubsubTopic[contentFilter.PubsubTopic] = subscriptionDetailList } } @@ -449,8 +460,8 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items)) for pTopic, cTopics := range pubSubTopicMap { var cFilter ContentFilter - cFilter.Topic = pTopic - cFilter.ContentTopics = cTopics + cFilter.PubsubTopic = pTopic + cFilter.ContentTopics = NewContentTopicSet(cTopics...) for peerID := range wf.subscriptions.items { if params.selectedPeer != "" && peerID != params.selectedPeer { continue @@ -462,7 +473,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co } wf.cleanupSubscriptions(peerID, cFilter) - if len(subscriptions.subscriptionsPerTopic) == 0 { + if len(subscriptions.subsPerPubsubTopic) == 0 { delete(wf.subscriptions.items, peerID) } @@ -518,14 +529,9 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, return nil, err } - var contentTopics []string - for k := range sub.ContentTopics { - contentTopics = append(contentTopics, k) - } - opts = append(opts, Peer(sub.PeerID)) - return wf.Unsubscribe(ctx, ContentFilter{Topic: sub.PubsubTopic, ContentTopics: contentTopics}, opts...) + return wf.Unsubscribe(ctx, sub.ContentFilter, opts...) } func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index 0cbed10b6..fb774cda0 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -20,6 +20,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" + "golang.org/x/exp/maps" ) func TestFilterSuite(t *testing.T) { @@ -109,7 +110,7 @@ func (s *FilterTestSuite) waitForMsg(fn func(), ch chan *protocol.Envelope) { defer s.wg.Done() select { case env := <-ch: - s.Require().Equal(s.contentFilter.ContentTopics[0], env.Message().GetContentTopic()) + s.Require().Equal(maps.Keys(s.contentFilter.ContentTopics)[0], env.Message().GetContentTopic()) case <-time.After(5 * time.Second): s.Require().Fail("Message timeout") case <-s.ctx.Done(): @@ -141,11 +142,8 @@ func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope) s.wg.Wait() } -func (s *FilterTestSuite) subscribe(topic string, contentTopic string, peer peer.ID) []*SubscriptionDetails { - s.contentFilter = ContentFilter{ - Topic: string(topic), - ContentTopics: []string{contentTopic}, - } +func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*SubscriptionDetails { + s.contentFilter = ContentFilter{pubsubTopic, NewContentTopicSet(contentTopic)} subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer)) s.Require().NoError(err) @@ -340,10 +338,7 @@ func (s *FilterTestSuite) TestRunningGuard() { s.lightNode.Stop() - contentFilter := ContentFilter{ - Topic: "test", - ContentTopics: []string{"test"}, - } + contentFilter := ContentFilter{"test", NewContentTopicSet("test")} _, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) @@ -359,10 +354,7 @@ func (s *FilterTestSuite) TestRunningGuard() { func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() { - contentFilter := ContentFilter{ - Topic: "test", - ContentTopics: []string{"test"}, - } + contentFilter := ContentFilter{"test", NewContentTopicSet("test")} _, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) s.Require().NoError(err) @@ -477,8 +469,8 @@ func (s *FilterTestSuite) TestAutoShard() { }, s.subDetails[0].C) _, err = s.lightNode.Unsubscribe(s.ctx, ContentFilter{ - Topic: s.testTopic, - ContentTopics: []string{newContentTopic}, + PubsubTopic: s.testTopic, + ContentTopics: NewContentTopicSet(newContentTopic), }) s.Require().NoError(err) diff --git a/waku/v2/protocol/filter/subscribers_map.go b/waku/v2/protocol/filter/subscribers_map.go index 4145a741f..b77d1332d 100644 --- a/waku/v2/protocol/filter/subscribers_map.go +++ b/waku/v2/protocol/filter/subscribers_map.go @@ -14,6 +14,14 @@ var ErrNotFound = errors.New("not found") type ContentTopicSet map[string]struct{} +func NewContentTopicSet(contentTopics ...string) ContentTopicSet { + s := make(ContentTopicSet, len(contentTopics)) + for _, ct := range contentTopics { + s[ct] = struct{}{} + } + return s +} + type PeerSet map[peer.ID]struct{} type PubsubTopics map[string]ContentTopicSet // pubsubTopic => contentTopics diff --git a/waku/v2/protocol/filter/subscribers_map_test.go b/waku/v2/protocol/filter/subscribers_map_test.go index 1bbf6cc1f..b4a0d6b88 100644 --- a/waku/v2/protocol/filter/subscribers_map_test.go +++ b/waku/v2/protocol/filter/subscribers_map_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/require" ) -const TOPIC = "/test/topic" +const PUBSUB_TOPIC = "/test/topic" func createPeerID(t *testing.T) peer.ID { peerId, err := test.RandPeerID() @@ -29,23 +29,23 @@ func TestAppend(t *testing.T) { subs := NewSubscribersMap(5 * time.Second) peerId := createPeerID(t) - subs.Set(peerId, TOPIC, []string{"topic1"}) + subs.Set(peerId, PUBSUB_TOPIC, []string{"topic1"}) - sub := firstSubscriber(subs, TOPIC, "topic1") + sub := firstSubscriber(subs, PUBSUB_TOPIC, "topic1") assert.NotEmpty(t, sub) // Adding to existing peer - subs.Set(peerId, TOPIC, []string{"topic2"}) + subs.Set(peerId, PUBSUB_TOPIC, []string{"topic2"}) - sub = firstSubscriber(subs, TOPIC, "topic2") + sub = firstSubscriber(subs, PUBSUB_TOPIC, "topic2") assert.NotEmpty(t, sub) - subs.Set(peerId, TOPIC+"2", []string{"topic1"}) + subs.Set(peerId, PUBSUB_TOPIC+"2", []string{"topic1"}) - sub = firstSubscriber(subs, TOPIC+"2", "topic1") + sub = firstSubscriber(subs, PUBSUB_TOPIC+"2", "topic1") assert.NotEmpty(t, sub) - sub = firstSubscriber(subs, TOPIC+"2", "topic2") + sub = firstSubscriber(subs, PUBSUB_TOPIC+"2", "topic2") assert.Empty(t, sub) } @@ -53,19 +53,19 @@ func TestRemove(t *testing.T) { subs := NewSubscribersMap(5 * time.Second) peerId := createPeerID(t) - subs.Set(peerId, TOPIC+"1", []string{"topic1", "topic2"}) - subs.Set(peerId, TOPIC+"2", []string{"topic1"}) + subs.Set(peerId, PUBSUB_TOPIC+"1", []string{"topic1", "topic2"}) + subs.Set(peerId, PUBSUB_TOPIC+"2", []string{"topic1"}) err := subs.DeleteAll(peerId) assert.Empty(t, err) - sub := firstSubscriber(subs, TOPIC+"1", "topic1") + sub := firstSubscriber(subs, PUBSUB_TOPIC+"1", "topic1") assert.Empty(t, sub) - sub = firstSubscriber(subs, TOPIC+"1", "topic2") + sub = firstSubscriber(subs, PUBSUB_TOPIC+"1", "topic2") assert.Empty(t, sub) - sub = firstSubscriber(subs, TOPIC+"2", "topic1") + sub = firstSubscriber(subs, PUBSUB_TOPIC+"2", "topic1") assert.Empty(t, sub) assert.False(t, subs.Has(peerId)) @@ -81,11 +81,11 @@ func TestRemovePartial(t *testing.T) { subs := NewSubscribersMap(5 * time.Second) peerId := createPeerID(t) - subs.Set(peerId, TOPIC, []string{"topic1", "topic2"}) - err := subs.Delete(peerId, TOPIC, []string{"topic1"}) + subs.Set(peerId, PUBSUB_TOPIC, []string{"topic1", "topic2"}) + err := subs.Delete(peerId, PUBSUB_TOPIC, []string{"topic1"}) require.NoError(t, err) - sub := firstSubscriber(subs, TOPIC, "topic2") + sub := firstSubscriber(subs, PUBSUB_TOPIC, "topic2") assert.NotEmpty(t, sub) } @@ -93,13 +93,13 @@ func TestRemoveBogus(t *testing.T) { subs := NewSubscribersMap(5 * time.Second) peerId := createPeerID(t) - subs.Set(peerId, TOPIC, []string{"topic1", "topic2"}) - err := subs.Delete(peerId, TOPIC, []string{"does not exist", "topic1"}) + subs.Set(peerId, PUBSUB_TOPIC, []string{"topic1", "topic2"}) + err := subs.Delete(peerId, PUBSUB_TOPIC, []string{"does not exist", "topic1"}) require.NoError(t, err) - sub := firstSubscriber(subs, TOPIC, "topic1") + sub := firstSubscriber(subs, PUBSUB_TOPIC, "topic1") assert.Empty(t, sub) - sub = firstSubscriber(subs, TOPIC, "does not exist") + sub = firstSubscriber(subs, PUBSUB_TOPIC, "does not exist") assert.Empty(t, sub) err = subs.Delete(peerId, "DOES_NOT_EXIST", []string{"topic1"}) @@ -110,7 +110,7 @@ func TestSuccessFailure(t *testing.T) { subs := NewSubscribersMap(5 * time.Second) peerId := createPeerID(t) - subs.Set(peerId, TOPIC, []string{"topic1", "topic2"}) + subs.Set(peerId, PUBSUB_TOPIC, []string{"topic1", "topic2"}) subs.FlagAsFailure(peerId) require.True(t, subs.IsFailedPeer(peerId)) @@ -118,7 +118,7 @@ func TestSuccessFailure(t *testing.T) { subs.FlagAsFailure(peerId) require.False(t, subs.Has(peerId)) - subs.Set(peerId, TOPIC, []string{"topic1", "topic2"}) + subs.Set(peerId, PUBSUB_TOPIC, []string{"topic1", "topic2"}) subs.FlagAsFailure(peerId) require.True(t, subs.IsFailedPeer(peerId)) diff --git a/waku/v2/protocol/filter/subscriptions_map.go b/waku/v2/protocol/filter/subscriptions_map.go index 167a2bccc..bcd516270 100644 --- a/waku/v2/protocol/filter/subscriptions_map.go +++ b/waku/v2/protocol/filter/subscriptions_map.go @@ -8,6 +8,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol" "go.uber.org/zap" + "golang.org/x/exp/maps" ) type SubscriptionDetails struct { @@ -19,16 +20,16 @@ type SubscriptionDetails struct { once sync.Once PeerID peer.ID - PubsubTopic string - ContentTopics map[string]struct{} + ContentFilter ContentFilter C chan *protocol.Envelope } +// Map of SubscriptionDetails.ID to subscriptions type SubscriptionSet map[string]*SubscriptionDetails type PeerSubscription struct { - peerID peer.ID - subscriptionsPerTopic map[string]SubscriptionSet + peerID peer.ID + subsPerPubsubTopic map[string]SubscriptionSet } type SubscriptionsMap struct { @@ -44,38 +45,33 @@ func NewSubscriptionMap(logger *zap.Logger) *SubscriptionsMap { } } -func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, topic string, contentTopics []string) *SubscriptionDetails { +func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf ContentFilter) *SubscriptionDetails { sub.Lock() defer sub.Unlock() peerSubscription, ok := sub.items[peerID] if !ok { peerSubscription = &PeerSubscription{ - peerID: peerID, - subscriptionsPerTopic: make(map[string]SubscriptionSet), + peerID: peerID, + subsPerPubsubTopic: make(map[string]SubscriptionSet), } sub.items[peerID] = peerSubscription } - _, ok = peerSubscription.subscriptionsPerTopic[topic] + _, ok = peerSubscription.subsPerPubsubTopic[cf.PubsubTopic] if !ok { - peerSubscription.subscriptionsPerTopic[topic] = make(SubscriptionSet) + peerSubscription.subsPerPubsubTopic[cf.PubsubTopic] = make(SubscriptionSet) } details := &SubscriptionDetails{ ID: uuid.NewString(), mapRef: sub, PeerID: peerID, - PubsubTopic: topic, C: make(chan *protocol.Envelope, 1024), - ContentTopics: make(map[string]struct{}), + ContentFilter: ContentFilter{cf.PubsubTopic, maps.Clone(cf.ContentTopics)}, } - for _, ct := range contentTopics { - details.ContentTopics[ct] = struct{}{} - } - - sub.items[peerID].subscriptionsPerTopic[topic][details.ID] = details + sub.items[peerID].subsPerPubsubTopic[cf.PubsubTopic][details.ID] = details return details } @@ -88,7 +84,34 @@ func (sub *SubscriptionsMap) IsSubscribedTo(peerID peer.ID) bool { return ok } -func (sub *SubscriptionsMap) Has(peerID peer.ID, topic string, contentTopics ...string) bool { +func (sub *SubscriptionsMap) Get(peerID peer.ID, cf ContentFilter) *SubscriptionDetails { + sub.RLock() + defer sub.RUnlock() + + // Check if peer exits + peerSubscription, ok := sub.items[peerID] + if !ok { + return nil + } + + // Check if pubsub topic exists + subscriptions, ok := peerSubscription.subsPerPubsubTopic[cf.PubsubTopic] + if !ok { + return nil + } + + // Check if the content topic exists within the list of subscriptions for this peer + for _, subscription := range subscriptions { + if maps.Equal(subscription.ContentFilter.ContentTopics, cf.ContentTopics) { + return subscription + } + } + + return nil +} + +// Check if we have subscriptions for all (pubsubTopic, contentTopics[i]) pairs provided +func (sub *SubscriptionsMap) Has(peerID peer.ID, pubsubTopic string, contentTopics ...string) bool { sub.RLock() defer sub.RUnlock() @@ -99,7 +122,7 @@ func (sub *SubscriptionsMap) Has(peerID peer.ID, topic string, contentTopics ... } //TODO: Handle pubsubTopic as null // Check if pubsub topic exists - subscriptions, ok := peerSubscription.subscriptionsPerTopic[topic] + subscriptions, ok := peerSubscription.subsPerPubsubTopic[pubsubTopic] if !ok { return false } @@ -108,7 +131,7 @@ func (sub *SubscriptionsMap) Has(peerID peer.ID, topic string, contentTopics ... for _, ct := range contentTopics { found := false for _, subscription := range subscriptions { - _, exists := subscription.ContentTopics[ct] + _, exists := subscription.ContentFilter.ContentTopics[ct] if exists { found = true break @@ -121,7 +144,6 @@ func (sub *SubscriptionsMap) Has(peerID peer.ID, topic string, contentTopics ... return true } - func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error { sub.Lock() defer sub.Unlock() @@ -131,7 +153,7 @@ func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error { return ErrNotFound } - delete(peerSubscription.subscriptionsPerTopic[subscription.PubsubTopic], subscription.ID) + delete(peerSubscription.subsPerPubsubTopic[subscription.ContentFilter.PubsubTopic], subscription.ID) return nil } @@ -141,7 +163,7 @@ func (s *SubscriptionDetails) Add(contentTopics ...string) { defer s.Unlock() for _, ct := range contentTopics { - s.ContentTopics[ct] = struct{}{} + s.ContentFilter.ContentTopics[ct] = struct{}{} } } @@ -150,7 +172,7 @@ func (s *SubscriptionDetails) Remove(contentTopics ...string) { defer s.Unlock() for _, ct := range contentTopics { - delete(s.ContentTopics, ct) + delete(s.ContentFilter.ContentTopics, ct) } } @@ -178,21 +200,16 @@ func (s *SubscriptionDetails) Clone() *SubscriptionDetails { mapRef: s.mapRef, Closed: false, PeerID: s.PeerID, - PubsubTopic: s.PubsubTopic, - ContentTopics: make(map[string]struct{}), + ContentFilter: ContentFilter{s.ContentFilter.PubsubTopic, maps.Clone(s.ContentFilter.ContentTopics)}, C: make(chan *protocol.Envelope), } - for k := range s.ContentTopics { - result.ContentTopics[k] = struct{}{} - } - return result } func (sub *SubscriptionsMap) clear() { for _, peerSubscription := range sub.items { - for _, subscriptionSet := range peerSubscription.subscriptionsPerTopic { + for _, subscriptionSet := range peerSubscription.subsPerPubsubTopic { for _, subscription := range subscriptionSet { subscription.closeC() } @@ -212,7 +229,7 @@ func (sub *SubscriptionsMap) Notify(peerID peer.ID, envelope *protocol.Envelope) sub.RLock() defer sub.RUnlock() - subscriptions, ok := sub.items[peerID].subscriptionsPerTopic[envelope.PubsubTopic()] + subscriptions, ok := sub.items[peerID].subsPerPubsubTopic[envelope.PubsubTopic()] if ok { iterateSubscriptionSet(sub.logger, subscriptions, envelope) } @@ -224,7 +241,7 @@ func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, e subscription.RLock() defer subscription.RUnlock() - _, ok := subscription.ContentTopics[envelope.Message().ContentTopic] + _, ok := subscription.ContentFilter.ContentTopics[envelope.Message().ContentTopic] if !ok { // only send the msg to subscriptions that have matching contentTopic return } @@ -249,10 +266,10 @@ func (s *SubscriptionDetails) MarshalJSON() ([]byte, error) { result := resultType{ PeerID: s.PeerID.Pretty(), - PubsubTopic: s.PubsubTopic, + PubsubTopic: s.ContentFilter.PubsubTopic, } - for c := range s.ContentTopics { + for c := range s.ContentFilter.ContentTopics { result.ContentTopics = append(result.ContentTopics, c) } diff --git a/waku/v2/protocol/filter/subscriptions_map_test.go b/waku/v2/protocol/filter/subscriptions_map_test.go index 436dc139a..382280bb8 100644 --- a/waku/v2/protocol/filter/subscriptions_map_test.go +++ b/waku/v2/protocol/filter/subscriptions_map_test.go @@ -15,23 +15,23 @@ import ( func TestSubscriptionMapAppend(t *testing.T) { fmap := NewSubscriptionMap(utils.Logger()) peerID := createPeerID(t) - contentTopics := []string{"ct1", "ct2"} + contentTopics := NewContentTopicSet("ct1", "ct2") - sub := fmap.NewSubscription(peerID, TOPIC, contentTopics) - _, found := sub.ContentTopics["ct1"] + sub := fmap.NewSubscription(peerID, ContentFilter{PUBSUB_TOPIC, contentTopics}) + _, found := sub.ContentFilter.ContentTopics["ct1"] require.True(t, found) - _, found = sub.ContentTopics["ct2"] + _, found = sub.ContentFilter.ContentTopics["ct2"] require.True(t, found) require.False(t, sub.Closed) require.Equal(t, sub.PeerID, peerID) - require.Equal(t, sub.PubsubTopic, TOPIC) + require.Equal(t, sub.ContentFilter.PubsubTopic, PUBSUB_TOPIC) sub.Add("ct3") - _, found = sub.ContentTopics["ct3"] + _, found = sub.ContentFilter.ContentTopics["ct3"] require.True(t, found) sub.Remove("ct3") - _, found = sub.ContentTopics["ct3"] + _, found = sub.ContentFilter.ContentTopics["ct3"] require.False(t, found) err := sub.Close() @@ -44,12 +44,12 @@ func TestSubscriptionMapAppend(t *testing.T) { func TestSubscriptionClear(t *testing.T) { fmap := NewSubscriptionMap(utils.Logger()) - contentTopics := []string{"ct1", "ct2"} + contentTopics := NewContentTopicSet("ct1", "ct2") var subscriptions = []*SubscriptionDetails{ - fmap.NewSubscription(createPeerID(t), TOPIC+"1", contentTopics), - fmap.NewSubscription(createPeerID(t), TOPIC+"2", contentTopics), - fmap.NewSubscription(createPeerID(t), TOPIC+"3", contentTopics), + fmap.NewSubscription(createPeerID(t), ContentFilter{PUBSUB_TOPIC + "1", contentTopics}), + fmap.NewSubscription(createPeerID(t), ContentFilter{PUBSUB_TOPIC + "2", contentTopics}), + fmap.NewSubscription(createPeerID(t), ContentFilter{PUBSUB_TOPIC + "3", contentTopics}), } ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) @@ -84,9 +84,9 @@ func TestSubscriptionsNotify(t *testing.T) { p1 := createPeerID(t) p2 := createPeerID(t) var subscriptions = []*SubscriptionDetails{ - fmap.NewSubscription(p1, TOPIC+"1", []string{"ct1", "ct2"}), - fmap.NewSubscription(p2, TOPIC+"1", []string{"ct1"}), - fmap.NewSubscription(p1, TOPIC+"2", []string{"ct1", "ct2"}), + fmap.NewSubscription(p1, ContentFilter{PUBSUB_TOPIC + "1", NewContentTopicSet("ct1", "ct2")}), + fmap.NewSubscription(p2, ContentFilter{PUBSUB_TOPIC + "1", NewContentTopicSet("ct1")}), + fmap.NewSubscription(p1, ContentFilter{PUBSUB_TOPIC + "2", NewContentTopicSet("ct1", "ct2")}), } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -140,7 +140,7 @@ func TestSubscriptionsNotify(t *testing.T) { go failOnReceive(ctx, 2) time.Sleep(200 * time.Millisecond) - envTopic1Ct1 := protocol.NewEnvelope(tests.CreateWakuMessage("ct1", 0), 0, TOPIC+"1") + envTopic1Ct1 := protocol.NewEnvelope(tests.CreateWakuMessage("ct1", 0), 0, PUBSUB_TOPIC+"1") wg.Add(1) go func() { defer wg.Done() @@ -164,7 +164,7 @@ func TestSubscriptionsNotify(t *testing.T) { go failOnReceive(ctx, 2) time.Sleep(200 * time.Millisecond) - envTopic1Ct2 := protocol.NewEnvelope(tests.CreateWakuMessage("ct2", 0), 0, TOPIC+"1") + envTopic1Ct2 := protocol.NewEnvelope(tests.CreateWakuMessage("ct2", 0), 0, PUBSUB_TOPIC+"1") wg.Add(1) go func() { defer wg.Done() @@ -193,7 +193,7 @@ func TestSubscriptionsNotify(t *testing.T) { go failOnReceive(ctx, 2) time.Sleep(200 * time.Millisecond) - envTopic1Ct1_2 := protocol.NewEnvelope(tests.CreateWakuMessage("ct1", 1), 1, TOPIC+"1") + envTopic1Ct1_2 := protocol.NewEnvelope(tests.CreateWakuMessage("ct1", 1), 1, PUBSUB_TOPIC+"1") wg.Add(1) go func() {