Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: panic due to closed channel #1115

Merged
merged 7 commits into from
Jun 7, 2024
17 changes: 9 additions & 8 deletions waku/v2/api/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (apiSub *Sub) waitOnSubClose() {
case subId := <-apiSub.closing:
//trigger closing and resubscribe flow for subscription.
apiSub.closeAndResubscribe(subId)

}
}
}
Expand All @@ -89,13 +90,9 @@ func (apiSub *Sub) closeAndResubscribe(subId string) {
}

func (apiSub *Sub) cleanup() {
apiSub.log.Debug("ENTER cleanup()")
defer func() {
apiSub.log.Debug("EXIT cleanup()")
}()
apiSub.log.Debug("Cleaning up subscription", zap.Stringer("config", apiSub.Config))

for _, s := range apiSub.subs {
close(s.Closing)
_, err := apiSub.wf.UnsubscribeWithSubscription(apiSub.ctx, s)
if err != nil {
//Logging with info as this is part of cleanup
Expand Down Expand Up @@ -168,10 +165,14 @@ func (apiSub *Sub) multiplex(subs []*subscription.SubscriptionDetails) {
}
}(subDetails)
go func(subDetails *subscription.SubscriptionDetails) {
<-subDetails.Closing
apiSub.log.Debug("sub closing", zap.String("subID", subDetails.ID))
select {
case <-apiSub.ctx.Done():
return
case <-subDetails.Closing:
apiSub.log.Debug("sub closing", zap.String("subID", subDetails.ID))

apiSub.closing <- subDetails.ID
apiSub.closing <- subDetails.ID
}
}(subDetails)
}
}
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerM
wf.pm = pm
wf.CommonService = service.NewCommonService()
wf.metrics = newMetrics(reg)
wf.peerPingInterval = 5 * time.Second
wf.peerPingInterval = 1 * time.Minute
return wf
}

Expand Down
3 changes: 1 addition & 2 deletions waku/v2/protocol/filter/filter_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) {
subscriptions := wf.subscriptions.GetAllSubscriptionsForPeer(peer)
for _, subscription := range subscriptions {
wf.log.Debug("Notifying sub closing", zap.String("subID", subscription.ID))

//Indicating that subscription is closing,
close(subscription.Closing)
subscription.SetClosing()
}
}
}
Expand Down
11 changes: 10 additions & 1 deletion waku/v2/protocol/subscription/subscription_details.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type SubscriptionDetails struct {
mapRef *SubscriptionsMap
Closed bool `json:"-"`
once sync.Once
Closing chan struct{}
Closing chan bool

PeerID peer.ID `json:"peerID"`
ContentFilter protocol.ContentFilter `json:"contentFilters"`
Expand Down Expand Up @@ -99,6 +99,7 @@ func (s *SubscriptionDetails) CloseC() {
defer s.Unlock()
s.Closed = true
close(s.C)
close(s.Closing)
})
}

Expand All @@ -107,6 +108,14 @@ func (s *SubscriptionDetails) Close() error {
return s.mapRef.Delete(s)
}

func (s *SubscriptionDetails) SetClosing() {
s.Lock()
defer s.Unlock()
if !s.Closed {
s.Closing <- true
}
}

func (s *SubscriptionDetails) MarshalJSON() ([]byte, error) {
result := struct {
PeerID peer.ID `json:"peerID"`
Expand Down
7 changes: 6 additions & 1 deletion waku/v2/protocol/subscription/subscriptions_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.Content
PeerID: peerID,
C: make(chan *protocol.Envelope, 1024),
ContentFilter: protocol.ContentFilter{PubsubTopic: cf.PubsubTopic, ContentTopics: maps.Clone(cf.ContentTopics)},
Closing: make(chan struct{}),
Closing: make(chan bool),
}

// Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair
Expand Down Expand Up @@ -147,6 +147,11 @@ func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error {
sub.decreaseSubFor(contentFilter.PubsubTopic, contentTopic)
}

if len(peerSubscription.SubsPerPubsubTopic) == 0 {
sub.logger.Debug("no more subs for peer", zap.Stringer("id", subscription.PeerID))
delete(sub.items, subscription.PeerID)
}

return nil
}

Expand Down
Loading