From 6b30855f564d5cbe90bcaa9ef981d7bd70d3dafc Mon Sep 17 00:00:00 2001
From: pablo
Date: Tue, 10 Dec 2024 14:54:32 +0200
Subject: [PATCH] chore_: bump go-waku fixes on peers and concurrent access
---
go.mod | 2 +-
go.sum | 4 +-
.../waku/v2/api/filter/filter_manager.go | 5 ++-
.../waku/v2/api/missing/missing_messages.go | 42 +++++++++++++++----
.../waku/v2/peermanager/peer_manager.go | 26 ++++--------
.../go-waku/waku/v2/protocol/filter/client.go | 14 ++++++-
.../v2/protocol/filter/filter_health_check.go | 2 +-
.../v2/protocol/lightpush/waku_lightpush.go | 5 +++
.../go-waku/waku/v2/protocol/store/client.go | 4 ++
vendor/modules.txt | 2 +-
10 files changed, 72 insertions(+), 34 deletions(-)
diff --git a/go.mod b/go.mod
index 567f093e6ad..8ed069f69da 100644
--- a/go.mod
+++ b/go.mod
@@ -97,7 +97,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2
- github.com/waku-org/go-waku v0.8.1-0.20241203032230-6550ff35bc71
+ github.com/waku-org/go-waku v0.8.1-0.20241210120804-9a243696d77f
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1
diff --git a/go.sum b/go.sum
index d7b5baeee35..a2858b6152f 100644
--- a/go.sum
+++ b/go.sum
@@ -2152,8 +2152,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27
github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
-github.com/waku-org/go-waku v0.8.1-0.20241203032230-6550ff35bc71 h1:P9sQncEeeBqBRQEtiLdgQe5oWcTlAV5IVA5VGMqGslc=
-github.com/waku-org/go-waku v0.8.1-0.20241203032230-6550ff35bc71/go.mod h1:zYhLgqwBE3sGP2vP+aNiM5moOKlf/uSoIv36puAj9WI=
+github.com/waku-org/go-waku v0.8.1-0.20241210120804-9a243696d77f h1:PIQzUgCdDAtoi1q4C6qNshHSAPtkgxLb9S5jujeJFmg=
+github.com/waku-org/go-waku v0.8.1-0.20241210120804-9a243696d77f/go.mod h1:zYhLgqwBE3sGP2vP+aNiM5moOKlf/uSoIv36puAj9WI=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go
index a43c3c3963c..665d577bd0f 100644
--- a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go
+++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go
@@ -61,7 +61,8 @@ type EnevelopeProcessor interface {
OnNewEnvelope(env *protocol.Envelope) error
}
-func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
+func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int,
+ envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
// This fn is being mocked in test
mgr := new(FilterManager)
mgr.ctx = ctx
@@ -162,6 +163,7 @@ func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
defer utils.LogOnPanic()
ctx, cancel := context.WithCancel(mgr.ctx)
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
+
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params)
mgr.Lock()
mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
@@ -188,6 +190,7 @@ func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus
mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs)))
if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online
+ mgr.onlineChecker.SetOnline(newStatus)
mgr.NetworkChange()
mgr.logger.Debug("switching from offline to online")
mgr.Lock()
diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go
index 72ac4f9f355..927ffb9c931 100644
--- a/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go
+++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/missing/missing_messages.go
@@ -35,6 +35,7 @@ type MessageTracker interface {
// MissingMessageVerifier is used to periodically retrieve missing messages from store nodes that have some specific criteria
type MissingMessageVerifier struct {
ctx context.Context
+ cancel context.CancelFunc
params missingMessageVerifierParams
storenodeRequestor common.StorenodeRequestor
@@ -43,10 +44,12 @@ type MissingMessageVerifier struct {
criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages
criteriaInterestMu sync.RWMutex
- C <-chan *protocol.Envelope
+ C chan *protocol.Envelope
- timesource timesource.Timesource
- logger *zap.Logger
+ timesource timesource.Timesource
+ logger *zap.Logger
+ isRunning bool
+ runningMutex sync.RWMutex
}
// NewMissingMessageVerifier creates an instance of a MissingMessageVerifier
@@ -63,6 +66,8 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes
messageTracker: messageTracker,
logger: logger.Named("missing-msg-verifier"),
params: params,
+ criteriaInterest: make(map[string]criteriaInterest),
+ C: make(chan *protocol.Envelope, 1000),
}
}
@@ -97,12 +102,24 @@ func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilt
m.criteriaInterest[contentFilter.PubsubTopic] = criteriaInterest
}
+func (m *MissingMessageVerifier) setRunning(running bool) {
+ m.runningMutex.Lock()
+ defer m.runningMutex.Unlock()
+ m.isRunning = running
+}
+
func (m *MissingMessageVerifier) Start(ctx context.Context) {
- m.ctx = ctx
- m.criteriaInterest = make(map[string]criteriaInterest)
+ m.runningMutex.Lock()
+ if m.isRunning { //make sure verifier only runs once.
+ m.runningMutex.Unlock()
+ return
+ }
+ m.isRunning = true
+ m.runningMutex.Unlock()
- c := make(chan *protocol.Envelope, 1000)
- m.C = c
+ ctx, cancelFunc := context.WithCancel(ctx)
+ m.ctx = ctx
+ m.cancel = cancelFunc
go func() {
defer utils.LogOnPanic()
@@ -123,24 +140,33 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) {
for _, interest := range critIntList {
select {
case <-ctx.Done():
+ m.setRunning(false)
return
default:
semaphore <- struct{}{}
go func(interest criteriaInterest) {
defer utils.LogOnPanic()
- m.fetchHistory(c, interest)
+ m.fetchHistory(m.C, interest)
<-semaphore
}(interest)
}
}
case <-ctx.Done():
+ m.setRunning(false)
return
}
}
}()
}
+func (m *MissingMessageVerifier) Stop() {
+ m.cancel()
+ m.runningMutex.Lock()
+ defer m.runningMutex.Unlock()
+ m.isRunning = false
+}
+
func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, interest criteriaInterest) {
contentTopics := interest.contentFilter.ContentTopics.ToList()
for i := 0; i < len(contentTopics); i += maxContentTopicsPerRequest {
diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go
index c543cbe8e30..69a0b23c11f 100644
--- a/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go
+++ b/vendor/github.com/waku-org/go-waku/waku/v2/peermanager/peer_manager.go
@@ -102,7 +102,6 @@ const maxFailedAttempts = 5
const prunePeerStoreInterval = 10 * time.Minute
const peerConnectivityLoopSecs = 15
const maxConnsToPeerRatio = 3
-const badPeersCleanupInterval = 1 * time.Minute
const maxDialFailures = 2
// 80% relay peers 20% service peers
@@ -258,14 +257,13 @@ func (pm *PeerManager) Start(ctx context.Context) {
}
}
-func (pm *PeerManager) removeBadPeers() {
- if !pm.RelayEnabled {
- for _, peerID := range pm.host.Peerstore().Peers() {
- if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures {
- //delete peer from peerStore
- pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
- pm.RemovePeer(peerID)
- }
+func (pm *PeerManager) CheckAndRemoveBadPeer(peerID peer.ID) {
+ if pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) > maxDialFailures &&
+ pm.peerConnector.onlineChecker.IsOnline() {
+ if origin, _ := pm.host.Peerstore().(wps.WakuPeerstore).Origin(peerID); origin != wps.Static { // delete only if a peer is discovered and not configured statically.
+ //delete peer from peerStore
+ pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
+ pm.RemovePeer(peerID)
}
}
}
@@ -273,17 +271,13 @@ func (pm *PeerManager) removeBadPeers() {
func (pm *PeerManager) peerStoreLoop(ctx context.Context) {
defer utils.LogOnPanic()
t := time.NewTicker(prunePeerStoreInterval)
- t1 := time.NewTicker(badPeersCleanupInterval)
defer t.Stop()
- defer t1.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
pm.prunePeerStore()
- case <-t1.C:
- pm.removeBadPeers()
}
}
}
@@ -749,6 +743,7 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
if err == nil || errors.Is(err, context.Canceled) {
return
}
+
if pm.peerConnector != nil {
pm.peerConnector.addConnectionBackoff(peerID)
}
@@ -762,9 +757,4 @@ func (pm *PeerManager) HandleDialError(err error, peerID peer.ID) {
pm.logger.Error("failed to emit DialError", zap.Error(emitterErr))
}
}
- if !pm.RelayEnabled && pm.host.Peerstore().(wps.WakuPeerstore).ConnFailures(peerID) >= maxDialFailures {
- //delete peer from peerStore
- pm.logger.Debug("removing bad peer due to recurring dial failures", zap.Stringer("peerID", peerID))
- pm.RemovePeer(peerID)
- }
}
diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go
index 3d81048d667..8fbcd91c138 100644
--- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go
+++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go
@@ -15,6 +15,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
+ "github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-msgio/pbio"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging"
@@ -267,6 +268,10 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
wf.metrics.RecordError(dialFailure)
if wf.pm != nil {
wf.pm.HandleDialError(err, peerID)
+ if errors.Is(err, swarm.ErrAllDialsFailed) ||
+ errors.Is(err, swarm.ErrDialBackoff) || errors.Is(err, swarm.ErrNoAddresses) {
+ wf.pm.CheckAndRemoveBadPeer(peerID)
+ }
}
return err
}
@@ -355,7 +360,7 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
if params.pm != nil && reqPeerCount > 0 {
wf.log.Debug("handleFilterSubscribeOptions", zap.Int("peerCount", reqPeerCount), zap.Int("excludePeersLen", len(params.peersToExclude)))
- params.selectedPeers, err = wf.pm.SelectPeers(
+ selectedPeers, err := wf.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1,
@@ -368,7 +373,12 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
)
if err != nil {
wf.log.Error("peer selection returned err", zap.Error(err))
- return nil, nil, err
+ if len(params.selectedPeers) == 0 {
+ return nil, nil, err
+ }
+ }
+ if len(selectedPeers) > 0 {
+ params.selectedPeers = append(params.selectedPeers, selectedPeers...)
}
}
wf.log.Debug("handleFilterSubscribeOptions exit", zap.Int("selectedPeerCount", len(params.selectedPeers)))
diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go
index 126090d9939..7bdd1569489 100644
--- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go
+++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/filter_health_check.go
@@ -24,7 +24,7 @@ func (wf *WakuFilterLightNode) PingPeer(peer peer.ID) {
ctxWithTimeout, cancel := context.WithTimeout(wf.CommonService.Context(), PingTimeout)
defer cancel()
err := wf.Ping(ctxWithTimeout, peer)
- if err != nil {
+ if err != nil && wf.onlineChecker.IsOnline() {
wf.log.Warn("Filter ping failed towards peer", zap.Stringer("peer", peer), zap.Error(err))
//quickly retry ping again before marking subscription as failure
//Note that PingTimeout is a fraction of PingInterval so this shouldn't cause parallel pings being sent.
diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
index 7e411a4ac89..c6bed8c2d49 100644
--- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
+++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
@@ -13,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
+ "github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-msgio/pbio"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging"
@@ -198,6 +199,10 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
wakuLP.metrics.RecordError(dialFailure)
if wakuLP.pm != nil {
wakuLP.pm.HandleDialError(err, peerID)
+ if errors.Is(err, swarm.ErrAllDialsFailed) ||
+ errors.Is(err, swarm.ErrDialBackoff) || errors.Is(err, swarm.ErrNoAddresses) {
+ wakuLP.pm.CheckAndRemoveBadPeer(peerID)
+ }
}
return nil, err
}
diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go
index febb863e508..efb6448b97c 100644
--- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go
+++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/store/client.go
@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math"
+ "sync"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
@@ -73,6 +74,7 @@ type WakuStore struct {
defaultRatelimit rate.Limit
rateLimiters map[peer.ID]*rate.Limiter
+ rateLimitersMux sync.Mutex
}
// NewWakuStore is used to instantiate a StoreV3 client
@@ -297,11 +299,13 @@ func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRe
logger.Debug("sending store request")
if !params.skipRatelimit {
+ s.rateLimitersMux.Lock()
rateLimiter, ok := s.rateLimiters[params.selectedPeer]
if !ok {
rateLimiter = rate.NewLimiter(s.defaultRatelimit, 1)
s.rateLimiters[params.selectedPeer] = rateLimiter
}
+ s.rateLimitersMux.Unlock()
err := rateLimiter.Wait(ctx)
if err != nil {
return nil, err
diff --git a/vendor/modules.txt b/vendor/modules.txt
index eece9e4f735..800eb59ae90 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1044,7 +1044,7 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb
-# github.com/waku-org/go-waku v0.8.1-0.20241203032230-6550ff35bc71
+# github.com/waku-org/go-waku v0.8.1-0.20241210120804-9a243696d77f
## explicit; go 1.21
github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/tests