Skip to content

Commit

Permalink
fix: use bad peer removal logic only for lightpush and filter and opt…
Browse files Browse the repository at this point in the history
…ion to restart missing message verifier (#1244)
  • Loading branch information
chaitanyaprem authored Dec 9, 2024
1 parent 6550ff3 commit 809dba5
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 30 deletions.
5 changes: 4 additions & 1 deletion waku/v2/api/filter/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type EnevelopeProcessor interface {
OnNewEnvelope(env *protocol.Envelope) error
}

func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int,
envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
// This fn is being mocked in test
mgr := new(FilterManager)
mgr.ctx = ctx
Expand Down Expand Up @@ -162,6 +163,7 @@ func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
defer utils.LogOnPanic()
ctx, cancel := context.WithCancel(mgr.ctx)
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}

sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params)
mgr.Lock()
mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
Expand All @@ -188,6 +190,7 @@ func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus
mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs)))
if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online
mgr.onlineChecker.SetOnline(newStatus)
mgr.NetworkChange()
mgr.logger.Debug("switching from offline to online")
mgr.Lock()
Expand Down
42 changes: 34 additions & 8 deletions waku/v2/api/missing/missing_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type MessageTracker interface {
// MissingMessageVerifier is used to periodically retrieve missing messages from store nodes that have some specific criteria
type MissingMessageVerifier struct {
ctx context.Context
cancel context.CancelFunc
params missingMessageVerifierParams

storenodeRequestor common.StorenodeRequestor
Expand All @@ -43,10 +44,12 @@ type MissingMessageVerifier struct {
criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages
criteriaInterestMu sync.RWMutex

C <-chan *protocol.Envelope
C chan *protocol.Envelope

timesource timesource.Timesource
logger *zap.Logger
timesource timesource.Timesource
logger *zap.Logger
isRunning bool
runningMutex sync.RWMutex
}

// NewMissingMessageVerifier creates an instance of a MissingMessageVerifier
Expand All @@ -63,6 +66,8 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes
messageTracker: messageTracker,
logger: logger.Named("missing-msg-verifier"),
params: params,
criteriaInterest: make(map[string]criteriaInterest),
C: make(chan *protocol.Envelope, 1000),
}
}

Expand Down Expand Up @@ -97,12 +102,24 @@ func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilt
m.criteriaInterest[contentFilter.PubsubTopic] = criteriaInterest
}

func (m *MissingMessageVerifier) setRunning(running bool) {
m.runningMutex.Lock()
defer m.runningMutex.Unlock()
m.isRunning = running
}

func (m *MissingMessageVerifier) Start(ctx context.Context) {
m.ctx = ctx
m.criteriaInterest = make(map[string]criteriaInterest)
m.runningMutex.Lock()
if m.isRunning { //make sure verifier only runs once.
m.runningMutex.Unlock()
return
}
m.isRunning = true
m.runningMutex.Unlock()

c := make(chan *protocol.Envelope, 1000)
m.C = c
ctx, cancelFunc := context.WithCancel(ctx)
m.ctx = ctx
m.cancel = cancelFunc

go func() {
defer utils.LogOnPanic()
Expand All @@ -123,24 +140,33 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
for _, interest := range critIntList {
select {
case <-ctx.Done():
m.setRunning(false)
return
default:
semaphore <- struct{}{}
go func(interest criteriaInterest) {
defer utils.LogOnPanic()
m.fetchHistory(c, interest)
m.fetchHistory(m.C, interest)
<-semaphore
}(interest)
}
}

case <-ctx.Done():
m.setRunning(false)
return
}
}
}()
}

func (m *MissingMessageVerifier) Stop() {
m.cancel()
m.runningMutex.Lock()
defer m.runningMutex.Unlock()
m.isRunning = false
}

func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, interest criteriaInterest) {
contentTopics := interest.contentFilter.ContentTopics.ToList()
for i := 0; i < len(contentTopics); i += maxContentTopicsPerRequest {
Expand Down
26 changes: 8 additions & 18 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ const maxFailedAttempts = 5
const prunePeerStoreInterval = 10 * time.Minute
const peerConnectivityLoopSecs = 15
const maxConnsToPeerRatio = 3
const badPeersCleanupInterval = 1 * time.Minute
const maxDialFailures = 2

// 80% relay peers 20% service peers
Expand Down Expand Up @@ -258,32 +257,27 @@ func (pm *PeerManager) Start(ctx context.Context) {
}
}

func (pm *PeerManager) removeBadPeers() {
if !pm.RelayEnabled {
for _, peerID := range pm.host.Peerstore().Peers() {
if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures {
//delete peer from peerStore
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
pm.RemovePeer(peerID)
}
func (pm *PeerManager) CheckAndRemoveBadPeer(peerID peer.ID) {
if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures &&
pm.peerConnector.onlineChecker.IsOnline() {
if origin, _ := pm.host.Peerstore().(wps.WakuPeerstore).Origin(peerID); origin != wps.Static { // delete only if a peer is discovered and not configured statically.
//delete peer from peerStore
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
pm.RemovePeer(peerID)
}
}
}

func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
defer utils.LogOnPanic()
t := time.NewTicker(prunePeerStoreInterval)
t1 := time.NewTicker(badPeersCleanupInterval)
defer t.Stop()
defer t1.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
pm.prunePeerStore()
case <-t1.C:
pm.removeBadPeers()
}
}
}
Expand Down Expand Up @@ -749,6 +743,7 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
if err == nil || errors.Is(err, context.Canceled) {
return
}

if pm.peerConnector != nil {
pm.peerConnector.addConnectionBackoff(peerID)
}
Expand All @@ -762,9 +757,4 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
pm.logger.Error("failed to emit DialError", zap.Error(emitterErr))
}
}
if !pm.RelayEnabled && pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) >= maxDialFailures {
//delete peer from peerStore
pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
pm.RemovePeer(peerID)
}
}
14 changes: 12 additions & 2 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-msgio/pbio"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging"
Expand Down Expand Up @@ -267,6 +268,10 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
wf.metrics.RecordError(dialFailure)
if wf.pm != nil {
wf.pm.HandleDialError(err, peerID)
if errors.Is(err, swarm.ErrAllDialsFailed) ||
errors.Is(err, swarm.ErrDialBackoff) || errors.Is(err, swarm.ErrNoAddresses) {
wf.pm.CheckAndRemoveBadPeer(peerID)
}
}
return err
}
Expand Down Expand Up @@ -355,7 +360,7 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
if params.pm != nil && reqPeerCount > 0 {

wf.log.Debug("handleFilterSubscribeOptions", zap.Int("peerCount", reqPeerCount), zap.Int("excludePeersLen", len(params.peersToExclude)))
params.selectedPeers, err = wf.pm.SelectPeers(
selectedPeers, err := wf.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1,
Expand All @@ -368,7 +373,12 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
)
if err != nil {
wf.log.Error("peer selection returned err", zap.Error(err))
return nil, nil, err
if len(params.selectedPeers) == 0 {
return nil, nil, err
}
}
if len(selectedPeers) > 0 {
params.selectedPeers = append(params.selectedPeers, selectedPeers...)
}
}
wf.log.Debug("handleFilterSubscribeOptions exit", zap.Int("selectedPeerCount", len(params.selectedPeers)))
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/filter_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) {
ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout)
defer cancel()
err := wf.Ping(ctxWithTimeout, peer)
if err != nil {
if err != nil && wf.onlineChecker.IsOnline() {
wf.log.Warn("Filter ping failed towards peer", zap.Stringer("peer", peer), zap.Error(err))
//quickly retry ping again before marking subscription as failure
//Note that PingTimeout is a fraction of PingInterval so this shouldn't cause parallel pings being sent.
Expand Down
5 changes: 5 additions & 0 deletions waku/v2/protocol/lightpush/waku_lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-msgio/pbio"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging"
Expand Down Expand Up @@ -198,6 +199,10 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
wakuLP.metrics.RecordError(dialFailure)
if wakuLP.pm != nil {
wakuLP.pm.HandleDialError(err, peerID)
if errors.Is(err, swarm.ErrAllDialsFailed) ||
errors.Is(err, swarm.ErrDialBackoff) || errors.Is(err, swarm.ErrNoAddresses) {
wakuLP.pm.CheckAndRemoveBadPeer(peerID)
}
}
return nil, err
}
Expand Down

0 comments on commit 809dba5

Please sign in to comment.