From bb3fff3ad9b8383a8c072c5317e6fbbc9ed815ea Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 29 May 2024 14:31:33 +0530 Subject: [PATCH 1/4] fix: avoid closing channel during cleanup in order to avoid panic --- waku/v2/api/filter.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/waku/v2/api/filter.go b/waku/v2/api/filter.go index 7281b9158..09451aefe 100644 --- a/waku/v2/api/filter.go +++ b/waku/v2/api/filter.go @@ -75,6 +75,7 @@ func (apiSub *Sub) waitOnSubClose() { case subId := <-apiSub.closing: //trigger closing and resubscribe flow for subscription. apiSub.closeAndResubscribe(subId) + } } } @@ -89,13 +90,9 @@ func (apiSub *Sub) closeAndResubscribe(subId string) { } func (apiSub *Sub) cleanup() { - apiSub.log.Debug("ENTER cleanup()") - defer func() { - apiSub.log.Debug("EXIT cleanup()") - }() + apiSub.log.Debug("Cleaning up subscription", zap.Stringer("config", apiSub.Config)) for _, s := range apiSub.subs { - close(s.Closing) _, err := apiSub.wf.UnsubscribeWithSubscription(apiSub.ctx, s) if err != nil { //Logging with info as this is part of cleanup @@ -168,10 +165,14 @@ func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) { } }(subDetails) go func(subDetails *subscription.SubscriptionDetails) { - <-subDetails.Closing - apiSub.log.Debug("sub closing", zap.String("subID", subDetails.ID)) + select { + case <-apiSub.ctx.Done(): + return + case <-subDetails.Closing: + apiSub.log.Debug("sub closing", zap.String("subID", subDetails.ID)) - apiSub.closing <- subDetails.ID + apiSub.closing <- subDetails.ID + } }(subDetails) } } From 57378a59fe2044ae0d430550b1addebce3131e7b Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 29 May 2024 16:19:40 +0530 Subject: [PATCH 2/4] fix: use bool channel to avoid double close race condition --- waku/v2/protocol/filter/filter_health_check.go | 2 +- waku/v2/protocol/subscription/subscription_details.go | 3 ++- waku/v2/protocol/subscription/subscriptions_map.go | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/waku/v2/protocol/filter/filter_health_check.go b/waku/v2/protocol/filter/filter_health_check.go index 11b9a7200..872aabeaa 100644 --- a/waku/v2/protocol/filter/filter_health_check.go +++ b/waku/v2/protocol/filter/filter_health_check.go @@ -28,7 +28,7 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) { wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID)) //Indicating that subscription is closing, - close(subscription.Closing) + subscription.Closing <- true } } } diff --git a/waku/v2/protocol/subscription/subscription_details.go b/waku/v2/protocol/subscription/subscription_details.go index f2ec88706..be9ae1e31 100644 --- a/waku/v2/protocol/subscription/subscription_details.go +++ b/waku/v2/protocol/subscription/subscription_details.go @@ -29,7 +29,7 @@ type SubscriptionDetails struct { mapRef *SubscriptionsMap Closed bool `json:"-"` once sync.Once - Closing chan struct{} + Closing chan bool PeerID peer.ID `json:"peerID"` ContentFilter protocol.ContentFilter `json:"contentFilters"` @@ -99,6 +99,7 @@ func (s *SubscriptionDetails) CloseC() { defer s.Unlock() s.Closed = true close(s.C) + close(s.Closing) }) } diff --git a/waku/v2/protocol/subscription/subscriptions_map.go b/waku/v2/protocol/subscription/subscriptions_map.go index c308d9bba..927826121 100644 --- a/waku/v2/protocol/subscription/subscriptions_map.go +++ b/waku/v2/protocol/subscription/subscriptions_map.go @@ -75,7 +75,7 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.Content PeerID: peerID, C: make(chan *protocol.Envelope, 1024), ContentFilter: protocol.ContentFilter{PubsubTopic: cf.PubsubTopic, ContentTopics: maps.Clone(cf.ContentTopics)}, - Closing: make(chan struct{}), + Closing: make(chan bool), } // Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair From 6d7f8f86c32f9a67388e74fe6a2d59499b5d5203 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Mon, 3 Jun 2024 15:25:47 +0530 Subject: [PATCH 3/4] fix: local channel close to avoid panics --- waku/v2/protocol/filter/filter_health_check.go | 3 +-- waku/v2/protocol/subscription/subscription_details.go | 8 ++++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/waku/v2/protocol/filter/filter_health_check.go b/waku/v2/protocol/filter/filter_health_check.go index 872aabeaa..836175b53 100644 --- a/waku/v2/protocol/filter/filter_health_check.go +++ b/waku/v2/protocol/filter/filter_health_check.go @@ -26,9 +26,8 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) { subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer) for _, subscription := range subscriptions { wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID)) - //Indicating that subscription is closing, - subscription.Closing <- true + subscription.SetClosing() } } } diff --git a/waku/v2/protocol/subscription/subscription_details.go b/waku/v2/protocol/subscription/subscription_details.go index be9ae1e31..4ce928db0 100644 --- a/waku/v2/protocol/subscription/subscription_details.go +++ b/waku/v2/protocol/subscription/subscription_details.go @@ -108,6 +108,14 @@ func (s *SubscriptionDetails) Close() error { return s.mapRef.Delete(s) } +func (s *SubscriptionDetails) SetClosing() { + s.Lock() + defer s.Unlock() + if !s.Closed { + s.Closing <- true + } +} + func (s *SubscriptionDetails) MarshalJSON() ([]byte, error) { result := struct { PeerID peer.ID `json:"peerID"` From 234ab6ad67140d311d9905f50a36f0569983ba29 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 5 Jun 2024 21:44:18 +0530 Subject: [PATCH 4/4] fix: remove stale entry in subscription map and update filter ping interval (#1119) --- waku/v2/protocol/filter/client.go | 2 +- waku/v2/protocol/subscription/subscriptions_map.go | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index edbba8d3f..3186ac499 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -88,7 +88,7 @@ func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerM wf.pm = pm wf.CommonService = service.NewCommonService() wf.metrics = newMetrics(reg) - wf.peerPingInterval = 5 * time.Second + wf.peerPingInterval = 1 * time.Minute return wf } diff --git a/waku/v2/protocol/subscription/subscriptions_map.go b/waku/v2/protocol/subscription/subscriptions_map.go index 927826121..14b3680c2 100644 --- a/waku/v2/protocol/subscription/subscriptions_map.go +++ b/waku/v2/protocol/subscription/subscriptions_map.go @@ -147,6 +147,11 @@ func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error { sub.decreaseSubFor(contentFilter.PubsubTopic, contentTopic) } + if len(peerSubscription.SubsPerPubsubTopic) == 0 { + sub.logger.Debug("no more subs for peer", zap.Stringer("id", subscription.PeerID)) + delete(sub.items, subscription.PeerID) + } + return nil }