From 2a58297713c889191a24015b870cc2fbdf2778fe Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 5 Mar 2024 21:07:43 +0800 Subject: [PATCH 1/8] test: peer exchange options --- .../peer_exchange/waku_peer_exchange_test.go | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index 212683f23..e3a72b10c 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -2,6 +2,7 @@ package peer_exchange import ( "context" + "github.com/waku-org/go-waku/waku/v2/peermanager" "testing" "time" @@ -185,3 +186,51 @@ func TestRetrieveFilteredPeerExchangePeers(t *testing.T) { px1.Stop() px3.Stop() } + +func TestPeerExchangeOptions(t *testing.T) { + + // Prepare host1 + host1, _, prvKey1 := tests.CreateHost(t) + udpPort1, err := tests.FindFreePort(t, "127.0.0.1", 3) + require.NoError(t, err) + ip1, _ := tests.ExtractIP(host1.Addrs()[0]) + l1, err := tests.NewLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) + require.NoError(t, err) + discv5PeerConn1 := discv5.NewTestPeerDiscoverer() + d1, err := discv5.NewDiscoveryV5(prvKey1, l1, discv5PeerConn1, prometheus.DefaultRegisterer, utils.Logger(), discv5.WithUDPPort(uint(udpPort1))) + require.NoError(t, err) + d1.SetHost(host1) + + // Mount peer exchange + pxPeerConn1 := discv5.NewTestPeerDiscoverer() + px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) + require.NoError(t, err) + px1.SetHost(host1) + + // Test WithPeerAddr() + params := new(PeerExchangeParameters) + params.host = px1.h + params.log = px1.log + params.pm = px1.pm + + optList := DefaultOptions(px1.h) + optList = append(optList, WithPeerAddr(host1.Addrs()[0])) + for _, opt := range optList { + err := opt(params) + require.NoError(t, err) + } + + require.Equal(t, host1.Addrs()[0], params.peerAddr) + + // Test WithFastestPeerSelection() + optList = DefaultOptions(px1.h) + optList = append(optList, WithFastestPeerSelection(host1.ID())) + for _, opt := range optList { + err := opt(params) + require.NoError(t, err) + } + + require.Equal(t, peermanager.LowestRTT, params.peerSelectionType) + require.Equal(t, host1.ID(), params.preferredPeers[0]) + +} From 71878e3ee7eb1c60996b9ee35a1d3e9e1db772f4 Mon Sep 17 00:00:00 2001 From: Roman Date: Thu, 7 Mar 2024 21:13:16 +0800 Subject: [PATCH 2/8] test: peer exchange with PM and PeerAddr - with some debug info --- .../peer_exchange/waku_peer_exchange_test.go | 105 ++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index e3a72b10c..df6e5ad80 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -2,7 +2,14 @@ package peer_exchange import ( "context" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" + "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/peermanager" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + "go.uber.org/zap" + "golang.org/x/exp/slices" "testing" "time" @@ -18,6 +25,26 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" ) +func createHostWithDiscv5(t *testing.T, bootnode ...*enode.Node) (host.Host, *discv5.DiscoveryV5) { + ps, err := pstoremem.NewPeerstore() + require.NoError(t, err) + wakuPeerStore := wps.NewWakuPeerstore(ps) + + host, _, prvKey := tests.CreateHost(t, libp2p.Peerstore(wakuPeerStore)) + + port, err := tests.FindFreePort(t, "127.0.0.1", 3) + require.NoError(t, err) + ip, _ := tests.ExtractIP(host.Addrs()[0]) + l, err := tests.NewLocalnode(prvKey, ip, port, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) + require.NoError(t, err) + discv5PeerConn1 := discv5.NewTestPeerDiscoverer() + d, err := discv5.NewDiscoveryV5(prvKey, l, discv5PeerConn1, prometheus.DefaultRegisterer, utils.Logger(), discv5.WithUDPPort(uint(port)), discv5.WithBootnodes(bootnode)) + require.NoError(t, err) + d.SetHost(host) + + return host, d +} + func TestRetrieveProvidePeerExchangePeers(t *testing.T) { // H1 host1, _, prvKey1 := tests.CreateHost(t) @@ -234,3 +261,81 @@ func TestPeerExchangeOptions(t *testing.T) { require.Equal(t, host1.ID(), params.preferredPeers[0]) } + +func TestRetrieveProvidePeerExchangeWithPMAndPeerAddr(t *testing.T) { + log := utils.Logger() + + // H1 + H2 with discovery on + host1, d1 := createHostWithDiscv5(t) + host2, d2 := createHostWithDiscv5(t, d1.Node()) + + // H3 + ps3, err := pstoremem.NewPeerstore() + require.NoError(t, err) + wakuPeerStore3 := wps.NewWakuPeerstore(ps3) + host3, _, _ := tests.CreateHost(t, libp2p.Peerstore(wakuPeerStore3)) + + defer d1.Stop() + defer d2.Stop() + defer host1.Close() + defer host2.Close() + defer host3.Close() + + err = d1.Start(context.Background()) + require.NoError(t, err) + + err = d2.Start(context.Background()) + require.NoError(t, err) + + // Prepare peer manager for host3 + pm3 := peermanager.NewPeerManager(10, 20, log) + pm3.SetHost(host3) + pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, 30*time.Second, utils.Logger()) + require.NoError(t, err) + pm3.SetPeerConnector(pxPeerConn3) + pm3.Start(context.Background()) + + // mount peer exchange + pxPeerConn1 := discv5.NewTestPeerDiscoverer() + px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) + require.NoError(t, err) + px1.SetHost(host1) + + //pxPeerConn3 := discv5.NewTestPeerDiscoverer() + px3, err := NewWakuPeerExchange(nil, pxPeerConn3, pm3, prometheus.DefaultRegisterer, utils.Logger()) + require.NoError(t, err) + px3.SetHost(host3) + + err = px1.Start(context.Background()) + require.NoError(t, err) + + err = px3.Start(context.Background()) + require.NoError(t, err) + + time.Sleep(30 * time.Second) + + log.Info("Host1 is", zap.String("peer", host1.ID().String())) + log.Info("Host2 is", zap.String("peer", host2.ID().String())) + log.Info("Host3 is", zap.String("peer", host3.ID().String())) + + for _, peer := range host3.Peerstore().Peers() { + log.Info("Host3 knows before", zap.String("peer", peer.String())) + } + require.False(t, slices.Contains(host3.Peerstore().Peers(), host2.ID())) + + // Construct multi address like example "/ip4/0.0.0.0/tcp/30304/p2p/16Uiu2HAmBu5zRFzBGAzzMAuGWhaxN2EwcbW7CzibELQELzisf192" + host1MultiAddr, err := multiaddr.NewMultiaddr(host1.Addrs()[0].String() + "/p2p/" + host1.ID().String()) + require.NoError(t, err) + + log.Info("Connecting to peer", zap.String(host1MultiAddr.String(), "to provide 1 peer")) + err = px3.Request(context.Background(), 1, WithPeerAddr(host1MultiAddr)) + require.NoError(t, err) + + time.Sleep(30 * time.Second) + + for _, peer := range host3.Peerstore().Peers() { + log.Info("Host3 knows after", zap.String("peer", peer.String())) + } + require.True(t, slices.Contains(host3.Peerstore().Peers(), host2.ID())) + +} From f0cfb6a4880bb7048e18018d8b507ae24c75ad4f Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 11 Mar 2024 14:07:33 +0800 Subject: [PATCH 3/8] fix: items from PR review - remove unnecessary client's start - remove pm3.SetPeerConnector() - pxPeerConn3 needs to start - lower waiting time - reduce logging --- .../peer_exchange/waku_peer_exchange_test.go | 23 +++++-------------- 1 file changed, 6 insertions(+), 17 deletions(-) diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index df6e5ad80..a659132c9 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -292,8 +292,8 @@ func TestRetrieveProvidePeerExchangeWithPMAndPeerAddr(t *testing.T) { pm3.SetHost(host3) pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, 30*time.Second, utils.Logger()) require.NoError(t, err) - pm3.SetPeerConnector(pxPeerConn3) - pm3.Start(context.Background()) + pxPeerConn3.SetHost(host3) + pxPeerConn3.Start(context.Background()) // mount peer exchange pxPeerConn1 := discv5.NewTestPeerDiscoverer() @@ -309,18 +309,9 @@ func TestRetrieveProvidePeerExchangeWithPMAndPeerAddr(t *testing.T) { err = px1.Start(context.Background()) require.NoError(t, err) - err = px3.Start(context.Background()) - require.NoError(t, err) - - time.Sleep(30 * time.Second) + time.Sleep(5 * time.Second) - log.Info("Host1 is", zap.String("peer", host1.ID().String())) - log.Info("Host2 is", zap.String("peer", host2.ID().String())) - log.Info("Host3 is", zap.String("peer", host3.ID().String())) - - for _, peer := range host3.Peerstore().Peers() { - log.Info("Host3 knows before", zap.String("peer", peer.String())) - } + // Check peer store of the client doesn't contain peer2 before the request require.False(t, slices.Contains(host3.Peerstore().Peers(), host2.ID())) // Construct multi address like example "/ip4/0.0.0.0/tcp/30304/p2p/16Uiu2HAmBu5zRFzBGAzzMAuGWhaxN2EwcbW7CzibELQELzisf192" @@ -331,11 +322,9 @@ func TestRetrieveProvidePeerExchangeWithPMAndPeerAddr(t *testing.T) { err = px3.Request(context.Background(), 1, WithPeerAddr(host1MultiAddr)) require.NoError(t, err) - time.Sleep(30 * time.Second) + time.Sleep(5 * time.Second) - for _, peer := range host3.Peerstore().Peers() { - log.Info("Host3 knows after", zap.String("peer", peer.String())) - } + // Check peer store of the client does contain peer2 after the request require.True(t, slices.Contains(host3.Peerstore().Peers(), host2.ID())) } From ac28c93d101dc38cea49d8d3e162a490a18d8898 Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 11 Mar 2024 14:16:32 +0800 Subject: [PATCH 4/8] fix: sync test with renamed PeerExchangeRequestParameters --- waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index a659132c9..45b9a7626 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -235,7 +235,7 @@ func TestPeerExchangeOptions(t *testing.T) { px1.SetHost(host1) // Test WithPeerAddr() - params := new(PeerExchangeParameters) + params := new(PeerExchangeRequestParameters) params.host = px1.h params.log = px1.log params.pm = px1.pm From e19d84485648a7422d3f7a8f8251c8a8210e3caf Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 11 Mar 2024 14:21:57 +0800 Subject: [PATCH 5/8] fix: add error check for pxPeerConn3.Start --- waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index 45b9a7626..74cf7bab3 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -293,7 +293,8 @@ func TestRetrieveProvidePeerExchangeWithPMAndPeerAddr(t *testing.T) { pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, 30*time.Second, utils.Logger()) require.NoError(t, err) pxPeerConn3.SetHost(host3) - pxPeerConn3.Start(context.Background()) + err = pxPeerConn3.Start(context.Background()) + require.NoError(t, err) // mount peer exchange pxPeerConn1 := discv5.NewTestPeerDiscoverer() From f7ac589046f76a6e86b1130a8c5ddc4b2ad42bdc Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 11 Mar 2024 16:17:45 +0800 Subject: [PATCH 6/8] test: retrieve peers with peer manager only --- .../peer_exchange/waku_peer_exchange_test.go | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index 74cf7bab3..e830aa28d 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -329,3 +329,80 @@ func TestRetrieveProvidePeerExchangeWithPMAndPeerAddr(t *testing.T) { require.True(t, slices.Contains(host3.Peerstore().Peers(), host2.ID())) } + +func TestRetrieveProvidePeerExchangeWithPMOnly(t *testing.T) { + log := utils.Logger() + + // H1 + H2 + H4 with discovery on + host1, d1 := createHostWithDiscv5(t) + host2, d2 := createHostWithDiscv5(t, d1.Node()) + host4, d4 := createHostWithDiscv5(t, d1.Node()) + + // H3 + ps3, err := pstoremem.NewPeerstore() + require.NoError(t, err) + wakuPeerStore3 := wps.NewWakuPeerstore(ps3) + host3, _, _ := tests.CreateHost(t, libp2p.Peerstore(wakuPeerStore3)) + + defer d1.Stop() + defer d2.Stop() + defer d4.Stop() + defer host1.Close() + defer host2.Close() + defer host3.Close() + defer host4.Close() + + err = d1.Start(context.Background()) + require.NoError(t, err) + + err = d2.Start(context.Background()) + require.NoError(t, err) + + err = d4.Start(context.Background()) + require.NoError(t, err) + + // Prepare peer manager for host3 + pm3 := peermanager.NewPeerManager(10, 20, log) + pm3.SetHost(host3) + pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, 30*time.Second, utils.Logger()) + require.NoError(t, err) + pxPeerConn3.SetHost(host3) + err = pxPeerConn3.Start(context.Background()) + require.NoError(t, err) + + // Add peer exchange server to client's peer store + host3.Peerstore().AddAddrs(host1.ID(), host1.Addrs(), peerstore.PermanentAddrTTL) + err = host3.Peerstore().AddProtocols(host1.ID(), PeerExchangeID_v20alpha1) + require.NoError(t, err) + + // mount peer exchange + pxPeerConn1 := discv5.NewTestPeerDiscoverer() + px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger()) + require.NoError(t, err) + px1.SetHost(host1) + + //pxPeerConn3 := discv5.NewTestPeerDiscoverer() + px3, err := NewWakuPeerExchange(nil, pxPeerConn3, pm3, prometheus.DefaultRegisterer, utils.Logger()) + require.NoError(t, err) + px3.SetHost(host3) + + err = px1.Start(context.Background()) + require.NoError(t, err) + + time.Sleep(5 * time.Second) + + // Check peer store of the client doesn't contain peer2 and peer4 before the request + require.False(t, slices.Contains(host3.Peerstore().Peers(), host2.ID())) + require.False(t, slices.Contains(host3.Peerstore().Peers(), host4.ID())) + + // Connect to PM to get peers + err = px3.Request(context.Background(), 2, WithAutomaticPeerSelection(host1.ID())) + require.NoError(t, err) + + time.Sleep(5 * time.Second) + + // Check peer store of the client does contain peer2 and peer4 after the request + require.True(t, slices.Contains(host3.Peerstore().Peers(), host2.ID())) + require.True(t, slices.Contains(host3.Peerstore().Peers(), host4.ID())) + +} From 726acc7caf4a546639f95ec7669b2a74cb652de4 Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 11 Mar 2024 21:04:16 +0800 Subject: [PATCH 7/8] test: RecordError() with Prometheus client --- .../peer_exchange/waku_peer_exchange_test.go | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index e830aa28d..02f3fc004 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -17,6 +17,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/discv5" @@ -406,3 +407,25 @@ func TestRetrieveProvidePeerExchangeWithPMOnly(t *testing.T) { require.True(t, slices.Contains(host3.Peerstore().Peers(), host4.ID())) } + +func TestRecordError(t *testing.T) { + + m := newMetrics(prometheus.DefaultRegisterer) + + // Increment error counter for rateLimitFailure 7 times + for i := 0; i < 7; i++ { + m.RecordError(rateLimitFailure) + } + + // Retrieve metric values + counter, _ := peerExchangeErrors.GetMetricWithLabelValues(string(rateLimitFailure)) + rateLimitFailures := &dto.Metric{} + + // Store values into metric client struct + err := counter.Write(rateLimitFailures) + require.NoError(t, err) + + // Check the count is in + require.Equal(t, 7, int(rateLimitFailures.GetCounter().GetValue())) + +} From 2e8a07a252261aafdba60c33c15000056a03cc6d Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 11 Mar 2024 21:26:09 +0800 Subject: [PATCH 8/8] fix: construct multiaddr with encapsulation - remove some commented lines --- waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index 02f3fc004..972ff8a6c 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -2,6 +2,7 @@ package peer_exchange import ( "context" + "fmt" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" @@ -303,7 +304,6 @@ func TestRetrieveProvidePeerExchangeWithPMAndPeerAddr(t *testing.T) { require.NoError(t, err) px1.SetHost(host1) - //pxPeerConn3 := discv5.NewTestPeerDiscoverer() px3, err := NewWakuPeerExchange(nil, pxPeerConn3, pm3, prometheus.DefaultRegisterer, utils.Logger()) require.NoError(t, err) px3.SetHost(host3) @@ -317,7 +317,9 @@ func TestRetrieveProvidePeerExchangeWithPMAndPeerAddr(t *testing.T) { require.False(t, slices.Contains(host3.Peerstore().Peers(), host2.ID())) // Construct multi address like example "/ip4/0.0.0.0/tcp/30304/p2p/16Uiu2HAmBu5zRFzBGAzzMAuGWhaxN2EwcbW7CzibELQELzisf192" - host1MultiAddr, err := multiaddr.NewMultiaddr(host1.Addrs()[0].String() + "/p2p/" + host1.ID().String()) + hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", host1.ID())) + require.NoError(t, err) + host1MultiAddr := host1.Addrs()[0].Encapsulate(hostInfo) require.NoError(t, err) log.Info("Connecting to peer", zap.String(host1MultiAddr.String(), "to provide 1 peer")) @@ -382,7 +384,6 @@ func TestRetrieveProvidePeerExchangeWithPMOnly(t *testing.T) { require.NoError(t, err) px1.SetHost(host1) - //pxPeerConn3 := discv5.NewTestPeerDiscoverer() px3, err := NewWakuPeerExchange(nil, pxPeerConn3, pm3, prometheus.DefaultRegisterer, utils.Logger()) require.NoError(t, err) px3.SetHost(host3)