From fc540c780ea44de80ecdc52e25113fd9093cc8fa Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 28 Oct 2024 10:55:21 -0400 Subject: [PATCH] chore: use AddrInfo instead of ID to match nwaku's libwaku ping function --- waku/v2/api/common/pinger.go | 8 +++++--- waku/v2/api/history/cycle.go | 29 +++++++++++++++++------------ waku/v2/api/history/sort.go | 2 +- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/waku/v2/api/common/pinger.go b/waku/v2/api/common/pinger.go index ba8c26a21..f99895dd4 100644 --- a/waku/v2/api/common/pinger.go +++ b/waku/v2/api/common/pinger.go @@ -6,11 +6,12 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) type Pinger interface { - PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) + PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error) } type defaultPingImpl struct { @@ -23,8 +24,9 @@ func NewDefaultPinger(host host.Host) Pinger { } } -func (d *defaultPingImpl) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) { - pingResultCh := ping.Ping(ctx, d.host, peerID) +func (d *defaultPingImpl) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error) { + d.host.Peerstore().AddAddrs(peerInfo.ID, peerInfo.Addrs, peerstore.AddressTTL) + pingResultCh := ping.Ping(ctx, d.host, peerInfo.ID) select { case <-ctx.Done(): return 0, ctx.Err() diff --git a/waku/v2/api/history/cycle.go b/waku/v2/api/history/cycle.go index 9404579bf..eb51ba040 100644 --- a/waku/v2/api/history/cycle.go +++ b/waku/v2/api/history/cycle.go @@ -45,8 +45,8 @@ type peerStatus struct { type StorenodeConfigProvider interface { UseStorenodes() (bool, error) - GetPinnedStorenode() (peer.ID, error) - Storenodes() ([]peer.ID, error) + GetPinnedStorenode() (peer.AddrInfo, error) + Storenodes() ([]peer.AddrInfo, error) } type StorenodeCycle struct { @@ -104,7 +104,7 @@ func (m *StorenodeCycle) connectToNewStorenodeAndWait(ctx context.Context) error } // If no pinned storenode, no need to disconnect and wait for it to be available - if pinnedStorenode == "" { + if pinnedStorenode.ID == "" { m.disconnectActiveStorenode(graylistBackoff) } @@ -180,21 +180,26 @@ func poolSize(fleetSize int) int { return int(math.Ceil(float64(fleetSize) / 4)) } -func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, allStorenodes []peer.ID) []peer.ID { +func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, allStorenodes []peer.AddrInfo) []peer.AddrInfo { + peerIDToInfo := make(map[peer.ID]peer.AddrInfo) + for _, p := range allStorenodes { + peerIDToInfo[p.ID] = p + } + availableStorenodes := make(map[peer.ID]time.Duration) availableStorenodesMutex := sync.Mutex{} availableStorenodesWg := sync.WaitGroup{} for _, storenode := range allStorenodes { availableStorenodesWg.Add(1) - go func(peerID peer.ID) { + go func(peerInfo peer.AddrInfo) { defer availableStorenodesWg.Done() ctx, cancel := context.WithTimeout(ctx, 4*time.Second) defer cancel() - rtt, err := m.pinger.PingPeer(ctx, peerID) + rtt, err := m.pinger.PingPeer(ctx, peerInfo) if err == nil { // pinging storenodes might fail, but we don't care availableStorenodesMutex.Lock() - availableStorenodes[peerID] = rtt + availableStorenodes[peerInfo.ID] = rtt availableStorenodesMutex.Unlock() } }(storenode) @@ -209,7 +214,7 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, var sortedStorenodes []SortedStorenode for storenodeID, rtt := range availableStorenodes { sortedStorenode := SortedStorenode{ - Storenode: storenodeID, + Storenode: peerIDToInfo[storenodeID], RTT: rtt, } m.peersMutex.Lock() @@ -222,7 +227,7 @@ func (m *StorenodeCycle) getAvailableStorenodesSortedByRTT(ctx context.Context, } sort.Sort(byRTTMsAndCanConnectBefore(sortedStorenodes)) - result := make([]peer.ID, len(sortedStorenodes)) + result := make([]peer.AddrInfo, len(sortedStorenodes)) for i, s := range sortedStorenodes { result[i] = s.Storenode } @@ -252,8 +257,8 @@ func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error { return err } - if pinnedStorenode != "" { - return m.setActiveStorenode(pinnedStorenode) + if pinnedStorenode.ID != "" { + return m.setActiveStorenode(pinnedStorenode.ID) } m.logger.Info("Finding a new storenode..") @@ -287,7 +292,7 @@ func (m *StorenodeCycle) findNewStorenode(ctx context.Context) error { } ms := allStorenodes[r.Int64()] - return m.setActiveStorenode(ms) + return m.setActiveStorenode(ms.ID) } func (m *StorenodeCycle) storenodeStatus(peerID peer.ID) connStatus { diff --git a/waku/v2/api/history/sort.go b/waku/v2/api/history/sort.go index 22e94c571..4f38941de 100644 --- a/waku/v2/api/history/sort.go +++ b/waku/v2/api/history/sort.go @@ -7,7 +7,7 @@ import ( ) type SortedStorenode struct { - Storenode peer.ID + Storenode peer.AddrInfo RTT time.Duration CanConnectAfter time.Time }