From a37169687facd3bf36a4c23d0696c072c12453a1 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Tue, 5 Nov 2024 23:09:28 +0530 Subject: [PATCH] fix: use bad peer removal logic only for lightpush and filter --- waku/v2/api/filter/filter_manager.go | 1 + waku/v2/peermanager/peer_manager.go | 26 ++++++------------- waku/v2/protocol/filter/client.go | 5 ++++ .../v2/protocol/filter/filter_health_check.go | 2 +- waku/v2/protocol/lightpush/waku_lightpush.go | 5 ++++ 5 files changed, 20 insertions(+), 19 deletions(-) diff --git a/waku/v2/api/filter/filter_manager.go b/waku/v2/api/filter/filter_manager.go index 942ac6852..b85f10c66 100644 --- a/waku/v2/api/filter/filter_manager.go +++ b/waku/v2/api/filter/filter_manager.go @@ -197,6 +197,7 @@ func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus), zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs))) if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online + mgr.onlineChecker.SetOnline(newStatus) mgr.NetworkChange() mgr.logger.Debug("switching from offline to online") mgr.Lock() diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index c543cbe8e..69a0b23c1 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -102,7 +102,6 @@ const maxFailedAttempts = 5 const prunePeerStoreInterval = 10 * time.Minute const peerConnectivityLoopSecs = 15 const maxConnsToPeerRatio = 3 -const badPeersCleanupInterval = 1 * time.Minute const maxDialFailures = 2 // 80% relay peers 20% service peers @@ -258,14 +257,13 @@ func (pm *PeerManager) Start(ctx context.Context) { } } -func (pm *PeerManager) removeBadPeers() { - if !pm.RelayEnabled { - for _, peerID := range pm.host.Peerstore().Peers() { - if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures { - //delete peer from peerStore - pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID)) - pm.RemovePeer(peerID) - } +func (pm *PeerManager) CheckAndRemoveBadPeer(peerID peer.ID) { + if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures && + pm.peerConnector.onlineChecker.IsOnline() { + if origin, _ := pm.host.Peerstore().(wps.WakuPeerstore).Origin(peerID); origin != wps.Static { // delete only if a peer is discovered and not configured statically. + //delete peer from peerStore + pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID)) + pm.RemovePeer(peerID) } } } @@ -273,17 +271,13 @@ func (pm *PeerManager) removeBadPeers() { func (pm *PeerManager) peerStoreLoop(ctx context.Context) { defer utils.LogOnPanic() t := time.NewTicker(prunePeerStoreInterval) - t1 := time.NewTicker(badPeersCleanupInterval) defer t.Stop() - defer t1.Stop() for { select { case <-ctx.Done(): return case <-t.C: pm.prunePeerStore() - case <-t1.C: - pm.removeBadPeers() } } } @@ -749,6 +743,7 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) { if err == nil || errors.Is(err, context.Canceled) { return } + if pm.peerConnector != nil { pm.peerConnector.addConnectionBackoff(peerID) } @@ -762,9 +757,4 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) { pm.logger.Error("failed to emit DialError", zap.Error(emitterErr)) } } - if !pm.RelayEnabled && pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) >= maxDialFailures { - //delete peer from peerStore - pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID)) - pm.RemovePeer(peerID) - } } diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 878b0d48d..a53e99dc9 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -15,6 +15,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" @@ -249,6 +250,10 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, wf.metrics.RecordError(dialFailure) if wf.pm != nil { wf.pm.HandleDialError(err, peerID) + if errors.Is(err, swarm.ErrAllDialsFailed) || + errors.Is(err, swarm.ErrDialBackoff) || errors.Is(err, swarm.ErrNoAddresses) { + wf.pm.CheckAndRemoveBadPeer(peerID) + } } return err } diff --git a/waku/v2/protocol/filter/filter_health_check.go b/waku/v2/protocol/filter/filter_health_check.go index 126090d99..7bdd15694 100644 --- a/waku/v2/protocol/filter/filter_health_check.go +++ b/waku/v2/protocol/filter/filter_health_check.go @@ -24,7 +24,7 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) { ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout) defer cancel() err := wf.Ping(ctxWithTimeout, peer) - if err != nil { + if err != nil && wf.onlineChecker.IsOnline() { wf.log.Warn("Filter ping failed towards peer", zap.Stringer("peer", peer), zap.Error(err)) //quickly retry ping again before marking subscription as failure //Note that PingTimeout is a fraction of PingInterval so this shouldn't cause parallel pings being sent. diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 9d6744315..a97007073 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -13,6 +13,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/net/swarm" "github.com/libp2p/go-msgio/pbio" "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" @@ -198,6 +199,10 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p wakuLP.metrics.RecordError(dialFailure) if wakuLP.pm != nil { wakuLP.pm.HandleDialError(err, peerID) + if errors.Is(err, swarm.ErrAllDialsFailed) || + errors.Is(err, swarm.ErrDialBackoff) || errors.Is(err, swarm.ErrNoAddresses) { + wakuLP.pm.CheckAndRemoveBadPeer(peerID) + } } return nil, err }