From 7d12cd0fb91e73363d7619901d1fa9e8caf2c3ab Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 10 Nov 2023 15:41:34 +0530 Subject: [PATCH] connect to the store peer that is added via API --- waku/v2/node/wakunode2.go | 2 +- waku/v2/peermanager/peer_manager.go | 10 +++++++++- waku/v2/peermanager/peer_manager_test.go | 14 +++++++------- waku/v2/protocol/store/waku_store_client.go | 3 +-- 4 files changed, 18 insertions(+), 11 deletions(-) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 0bb611f73..c50cb0a8b 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -713,7 +713,7 @@ func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) erro // AddPeer is used to add a peer and the protocols it support to the node peerstore // TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics. func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) { - return w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...) + return w.peermanager.AddPeer(address, origin, pubSubTopics, false, protocols...) } // AddDiscoveredPeer to add a discovered peer to the node peerStore diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 18e0f4048..430a2760c 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -420,7 +420,7 @@ func (pm *PeerManager) addPeer(ID peer.ID, addrs []ma.Multiaddr, origin wps.Orig } // AddPeer adds peer to the peerStore and also to service slots -func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) { +func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, connectNow bool, protocols ...protocol.ID) (peer.ID, error) { //Assuming all addresses have peerId info, err := peer.AddrInfoFromP2pAddr(address) if err != nil { @@ -438,6 +438,14 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTo return "", err } + if connectNow { + go pm.peerConnector.PushToChan(service.PeerData{ + Origin: origin, + AddrInfo: *info, + PubSubTopics: pubSubTopics, + }) + } + return info.ID, nil } diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 5b2890930..4af828cef 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -71,7 +71,7 @@ func TestServiceSlots(t *testing.T) { // add h2 peer to peer manager t.Log(h2.ID()) - _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{""}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{""}, false, libp2pProtocol.ID(protocol)) require.NoError(t, err) /////////////// @@ -84,7 +84,7 @@ func TestServiceSlots(t *testing.T) { require.Equal(t, peerID, h2.ID()) // add h3 peer to peer manager - _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{""}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{""}, false, libp2pProtocol.ID(protocol)) require.NoError(t, err) // check that returned peer is h2 or h3 peer @@ -108,7 +108,7 @@ func TestServiceSlots(t *testing.T) { require.Error(t, err, ErrNoPeersAvailable) // add h4 peer for protocol1 - _, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1)) + _, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, false, libp2pProtocol.ID(protocol1)) require.NoError(t, err) //Test peer selection for protocol1 @@ -134,10 +134,10 @@ func TestPeerSelection(t *testing.T) { defer h3.Close() protocol := libp2pProtocol.ID("test/protocol") - _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/2/rs/2/1", "/waku/2/rs/2/2"}, false, libp2pProtocol.ID(protocol)) require.NoError(t, err) - _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, libp2pProtocol.ID(protocol)) + _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/2/rs/2/1"}, false, libp2pProtocol.ID(protocol)) require.NoError(t, err) _, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol}) @@ -176,7 +176,7 @@ func TestDefaultProtocol(t *testing.T) { defer h5.Close() //Test peer selection for relay protocol from peer store - _, err = pm.AddPeer(getAddr(h5), wps.Static, []string{""}, relay.WakuRelayID_v200) + _, err = pm.AddPeer(getAddr(h5), wps.Static, []string{""}, false, relay.WakuRelayID_v200) require.NoError(t, err) // since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol. @@ -197,7 +197,7 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) { require.NoError(t, err) defer h6.Close() - _, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2) + _, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, false, protocol2) require.NoError(t, err) peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2}) diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index 137c85a81..4b83efd5d 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -281,12 +281,11 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR //Add Peer to peerstore. if store.pm != nil && params.peerAddr != nil { - peerId, err := store.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, StoreID_v20beta4) + peerId, err := store.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, true, StoreID_v20beta4) if err != nil { return nil, err } params.selectedPeer = peerId - } if store.pm != nil && params.selectedPeer == "" { var err error