Skip to content

Commit

Permalink
Merge branch 'master' into feat/autoshard-filter
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Sep 19, 2023
2 parents 36ca48d + 81638fe commit f43d6ac
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 131 deletions.
4 changes: 2 additions & 2 deletions examples/chat2/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.

if options.Filter.Enable {
cf := filter.ContentFilter{
Topic: relay.DefaultWakuTopic,
ContentTopics: []string{options.ContentTopic},
PubsubTopic: relay.DefaultWakuTopic,
ContentTopics: filter.NewContentTopicSet(options.ContentTopic),
}
var filterOpt filter.FilterSubscribeOption
peerID, err := options.Filter.NodePeerID()
Expand Down
2 changes: 1 addition & 1 deletion examples/filter2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func main() {

// Send FilterRequest from light node to full node
cf := filter.ContentFilter{
ContentTopics: []string{contentTopic},
ContentTopics: filter.NewContentTopicSet(contentTopic),
}

theFilter, err := lightNode.FilterLightnode().Subscribe(ctx, cf)
Expand Down
2 changes: 1 addition & 1 deletion flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
];
doCheck = false;
# FIXME: This needs to be manually changed when updating modules.
vendorSha256 = "sha256-eS/4YnNv2yGR+tVMq6xfx0Ntq8WosV+pTrbOb3mNYaA=";
vendorSha256 = "sha256-4xChSKAkwwrFp5/ZMnhtvsR4drVfw1cLE3YXwVHeW0A=";
# Fix for 'nix run' trying to execute 'go-waku'.
meta = { mainProgram = "waku"; };
};
Expand Down
6 changes: 3 additions & 3 deletions library/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

type filterArgument struct {
Topic string `json:"pubsubTopic,omitempty"`
PubsubTopic string `json:"pubsubTopic,omitempty"`
ContentTopics []string `json:"contentTopics,omitempty"`
}

Expand All @@ -23,8 +23,8 @@ func toContentFilter(filterJSON string) (filter.ContentFilter, error) {
}

return filter.ContentFilter{
Topic: f.Topic,
ContentTopics: f.ContentTopics,
PubsubTopic: f.PubsubTopic,
ContentTopics: filter.NewContentTopicSet(f.ContentTopics...),
}, nil
}

Expand Down
76 changes: 41 additions & 35 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

// FilterPushID_v20beta1 is the current Waku Filter protocol identifier used to allow
Expand All @@ -49,8 +50,12 @@ type WakuFilterLightNode struct {
// ContentTopics - Specify list of content topics to be filtered under a pubSubTopic (for named and static sharding), or a list of contentTopics (in case ofAuto sharding)
// If pubSub topic is not specified, then content-topics are used to derive the shard and corresponding pubSubTopic using autosharding algorithm
type ContentFilter struct {
Topic string
ContentTopics []string
PubsubTopic string
ContentTopics ContentTopicSet
}

func (cf ContentFilter) ContentTopicsList() []string {
return maps.Keys(cf.ContentTopics)
}

type WakuFilterPushResult struct {
Expand Down Expand Up @@ -186,8 +191,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
request := &pb.FilterSubscribeRequest{
RequestId: hex.EncodeToString(params.requestID),
FilterSubscribeType: reqType,
PubsubTopic: &contentFilter.Topic,
ContentTopics: contentFilter.ContentTopics,
PubsubTopic: &contentFilter.PubsubTopic,
ContentTopics: contentFilter.ContentTopicsList(),
}

wf.log.Debug("sending FilterSubscribeRequest", zap.Stringer("request", request))
Expand Down Expand Up @@ -235,11 +240,11 @@ func getPubSubTopicFromContentTopic(cTopicString string) (string, error) {
func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]string, error) {
pubSubTopicMap := make(map[string][]string)

if contentFilter.Topic != "" {
pubSubTopicMap[contentFilter.Topic] = contentFilter.ContentTopics
if contentFilter.PubsubTopic != "" {
pubSubTopicMap[contentFilter.PubsubTopic] = contentFilter.ContentTopicsList()
} else {
//Parse the content-Topics to figure out shards.
for _, cTopicString := range contentFilter.ContentTopics {
for _, cTopicString := range contentFilter.ContentTopicsList() {
pTopicStr, err := getPubSubTopicFromContentTopic(cTopicString)
if err != nil {
return nil, err
Expand Down Expand Up @@ -297,18 +302,24 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont
subscriptions := make([]*SubscriptionDetails, 0)
for pubSubTopic, cTopics := range pubSubTopicMap {
var cFilter ContentFilter
cFilter.Topic = pubSubTopic
cFilter.ContentTopics = cTopics
//TO OPTIMIZE: Should we parallelize these, if so till how many batches?
err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter)
if err != nil {
wf.log.Error("Failed to subscribe for conentTopics ",
zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
failedContentTopics = append(failedContentTopics, cTopics...)
cFilter.PubsubTopic = pubSubTopic
cFilter.ContentTopics = NewContentTopicSet(cTopics...)
existingSub := wf.subscriptions.Get(params.selectedPeer, contentFilter)
if existingSub != nil {
subscriptions = append(subscriptions, existingSub)
} else {
//TO OPTIMIZE: Should we parallelize these, if so till how many batches?
err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter)
if err != nil {
wf.log.Error("Failed to subscribe for conentTopics ",
zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
failedContentTopics = append(failedContentTopics, cTopics...)
}
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter))
}
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter.Topic, cFilter.ContentTopics))
}

if len(failedContentTopics) > 0 {
return subscriptions, fmt.Errorf("subscriptions failed for contentTopics: %s", strings.Join(failedContentTopics, ","))
} else {
Expand All @@ -324,11 +335,11 @@ func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter
return nil, err
}

if !wf.subscriptions.Has(peerID, contentFilter.Topic, contentFilter.ContentTopics...) {
if !wf.subscriptions.Has(peerID, contentFilter.PubsubTopic, contentFilter.ContentTopicsList()...) {
return nil, errors.New("subscription does not exist")
}

return wf.subscriptions.NewSubscription(peerID, contentFilter.Topic, contentFilter.ContentTopics), nil
return wf.subscriptions.NewSubscription(peerID, contentFilter), nil
}

func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) {
Expand Down Expand Up @@ -379,8 +390,8 @@ func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails {
var output []*SubscriptionDetails

for _, peerSubscription := range wf.subscriptions.items {
for _, subscriptionPerTopic := range peerSubscription.subscriptionsPerTopic {
for _, subscriptionDetail := range subscriptionPerTopic {
for _, subscriptions := range peerSubscription.subsPerPubsubTopic {
for _, subscriptionDetail := range subscriptions {
output = append(output, subscriptionDetail)
}
}
Expand All @@ -398,24 +409,24 @@ func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilte
return
}

subscriptionDetailList, ok := peerSubscription.subscriptionsPerTopic[contentFilter.Topic]
subscriptionDetailList, ok := peerSubscription.subsPerPubsubTopic[contentFilter.PubsubTopic]
if !ok {
return
}

for subscriptionDetailID, subscriptionDetail := range subscriptionDetailList {
subscriptionDetail.Remove(contentFilter.ContentTopics...)
if len(subscriptionDetail.ContentTopics) == 0 {
subscriptionDetail.Remove(contentFilter.ContentTopicsList()...)
if len(subscriptionDetail.ContentFilter.ContentTopics) == 0 {
delete(subscriptionDetailList, subscriptionDetailID)
} else {
subscriptionDetailList[subscriptionDetailID] = subscriptionDetail
}
}

if len(subscriptionDetailList) == 0 {
delete(wf.subscriptions.items[peerID].subscriptionsPerTopic, contentFilter.Topic)
delete(wf.subscriptions.items[peerID].subsPerPubsubTopic, contentFilter.PubsubTopic)
} else {
wf.subscriptions.items[peerID].subscriptionsPerTopic[contentFilter.Topic] = subscriptionDetailList
wf.subscriptions.items[peerID].subsPerPubsubTopic[contentFilter.PubsubTopic] = subscriptionDetailList
}

}
Expand Down Expand Up @@ -449,8 +460,8 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items))
for pTopic, cTopics := range pubSubTopicMap {
var cFilter ContentFilter
cFilter.Topic = pTopic
cFilter.ContentTopics = cTopics
cFilter.PubsubTopic = pTopic
cFilter.ContentTopics = NewContentTopicSet(cTopics...)
for peerID := range wf.subscriptions.items {
if params.selectedPeer != "" && peerID != params.selectedPeer {
continue
Expand All @@ -462,7 +473,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
}

wf.cleanupSubscriptions(peerID, cFilter)
if len(subscriptions.subscriptionsPerTopic) == 0 {
if len(subscriptions.subsPerPubsubTopic) == 0 {
delete(wf.subscriptions.items, peerID)
}

Expand Down Expand Up @@ -518,14 +529,9 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context,
return nil, err
}

var contentTopics []string
for k := range sub.ContentTopics {
contentTopics = append(contentTopics, k)
}

opts = append(opts, Peer(sub.PeerID))

return wf.Unsubscribe(ctx, ContentFilter{Topic: sub.PubsubTopic, ContentTopics: contentTopics}, opts...)
return wf.Unsubscribe(ctx, sub.ContentFilter, opts...)
}

func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
Expand Down
24 changes: 8 additions & 16 deletions waku/v2/protocol/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

func TestFilterSuite(t *testing.T) {
Expand Down Expand Up @@ -109,7 +110,7 @@ func (s *FilterTestSuite) waitForMsg(fn func(), ch chan *protocol.Envelope) {
defer s.wg.Done()
select {
case env := <-ch:
s.Require().Equal(s.contentFilter.ContentTopics[0], env.Message().GetContentTopic())
s.Require().Equal(maps.Keys(s.contentFilter.ContentTopics)[0], env.Message().GetContentTopic())
case <-time.After(5 * time.Second):
s.Require().Fail("Message timeout")
case <-s.ctx.Done():
Expand Down Expand Up @@ -141,11 +142,8 @@ func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope)
s.wg.Wait()
}

func (s *FilterTestSuite) subscribe(topic string, contentTopic string, peer peer.ID) []*SubscriptionDetails {
s.contentFilter = ContentFilter{
Topic: string(topic),
ContentTopics: []string{contentTopic},
}
func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*SubscriptionDetails {
s.contentFilter = ContentFilter{pubsubTopic, NewContentTopicSet(contentTopic)}

subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer))
s.Require().NoError(err)
Expand Down Expand Up @@ -340,10 +338,7 @@ func (s *FilterTestSuite) TestRunningGuard() {

s.lightNode.Stop()

contentFilter := ContentFilter{
Topic: "test",
ContentTopics: []string{"test"},
}
contentFilter := ContentFilter{"test", NewContentTopicSet("test")}

_, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))

Expand All @@ -359,10 +354,7 @@ func (s *FilterTestSuite) TestRunningGuard() {

func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() {

contentFilter := ContentFilter{
Topic: "test",
ContentTopics: []string{"test"},
}
contentFilter := ContentFilter{"test", NewContentTopicSet("test")}

_, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().NoError(err)
Expand Down Expand Up @@ -477,8 +469,8 @@ func (s *FilterTestSuite) TestAutoShard() {

}, s.subDetails[0].C)
_, err = s.lightNode.Unsubscribe(s.ctx, ContentFilter{
Topic: s.testTopic,
ContentTopics: []string{newContentTopic},
PubsubTopic: s.testTopic,
ContentTopics: NewContentTopicSet(newContentTopic),
})
s.Require().NoError(err)

Expand Down
8 changes: 8 additions & 0 deletions waku/v2/protocol/filter/subscribers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ var ErrNotFound = errors.New("not found")

type ContentTopicSet map[string]struct{}

func NewContentTopicSet(contentTopics ...string) ContentTopicSet {
s := make(ContentTopicSet, len(contentTopics))
for _, ct := range contentTopics {
s[ct] = struct{}{}
}
return s
}

type PeerSet map[peer.ID]struct{}

type PubsubTopics map[string]ContentTopicSet // pubsubTopic => contentTopics
Expand Down
Loading

0 comments on commit f43d6ac

Please sign in to comment.