Skip to content

Commit

Permalink
fix: parallelize filter subs to different peers
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Jul 26, 2024
1 parent a9be17f commit f7adb0a
Showing 1 changed file with 28 additions and 14 deletions.
42 changes: 28 additions & 14 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,21 +407,35 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot

paramsCopy := params.Copy()
paramsCopy.selectedPeers = selectedPeers
for _, peer := range selectedPeers {
err := wf.request(
ctx,
params.requestID,
pb.FilterSubscribeRequest_SUBSCRIBE,
cFilter,
peer)
if err != nil {
wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
failedContentTopics = append(failedContentTopics, cTopics...)
continue
var wg sync.WaitGroup
reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
tmpSubs := make([]*subscription.SubscriptionDetails, len(selectedPeers))
for i, peerID := range selectedPeers {
wg.Add(1)
go func(index int, ID peer.ID) {
defer wg.Done()
err := wf.request(
reqCtx,
params.requestID,
pb.FilterSubscribeRequest_SUBSCRIBE,
cFilter,
ID)
if err != nil {
wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
failedContentTopics = append(failedContentTopics, cTopics...)
} else {
wf.log.Debug("subscription successful", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Stringer("peer", ID))
tmpSubs[index] = wf.subscriptions.NewSubscription(ID, cFilter)
}
}(i, peerID)
}
wg.Wait()
for _, sub := range tmpSubs {
if sub != nil {
subscriptions = append(subscriptions, sub)
}
wf.log.Debug("subscription successful", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), zap.Stringer("peer", peer))
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(peer, cFilter))
}
}

Expand Down

0 comments on commit f7adb0a

Please sign in to comment.