From f7adb0aabade01fe2657a8fe0ec92d5dfd40ca00 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 26 Jul 2024 16:27:43 +0530 Subject: [PATCH] fix: parallelize filter subs to different peers --- waku/v2/protocol/filter/client.go | 42 ++++++++++++++++++++----------- 1 file changed, 28 insertions(+), 14 deletions(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 52b4efa69..c52c90981 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -407,21 +407,35 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot paramsCopy := params.Copy() paramsCopy.selectedPeers = selectedPeers - for _, peer := range selectedPeers { - err := wf.request( - ctx, - params.requestID, - pb.FilterSubscribeRequest_SUBSCRIBE, - cFilter, - peer) - if err != nil { - wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), - zap.Error(err)) - failedContentTopics = append(failedContentTopics, cTopics...) - continue + var wg sync.WaitGroup + reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + tmpSubs := make([]*subscription.SubscriptionDetails, len(selectedPeers)) + for i, peerID := range selectedPeers { + wg.Add(1) + go func(index int, ID peer.ID) { + defer wg.Done() + err := wf.request( + reqCtx, + params.requestID, + pb.FilterSubscribeRequest_SUBSCRIBE, + cFilter, + ID) + if err != nil { + wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), + zap.Error(err)) + failedContentTopics = append(failedContentTopics, cTopics...) + } else { + wf.log.Debug("subscription successful", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Stringer("peer", ID)) + tmpSubs[index] = wf.subscriptions.NewSubscription(ID, cFilter) + } + }(i, peerID) + } + wg.Wait() + for _, sub := range tmpSubs { + if sub != nil { + subscriptions = append(subscriptions, sub) } - wf.log.Debug("subscription successful", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Stringer("peer", peer)) - subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(peer, cFilter)) } }