Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: peer exchange tests coverage improvement #1046

Merged
merged 9 commits into from
Mar 11, 2024
245 changes: 245 additions & 0 deletions waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,23 @@ package peer_exchange

import (
"context"
"fmt"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/peermanager"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"testing"
"time"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/discv5"
Expand All @@ -17,6 +27,26 @@ import (
"github.com/waku-org/go-waku/waku/v2/utils"
)

func createHostWithDiscv5(t *testing.T, bootnode ...*enode.Node) (host.Host, *discv5.DiscoveryV5) {
ps, err := pstoremem.NewPeerstore()
require.NoError(t, err)
wakuPeerStore := wps.NewWakuPeerstore(ps)

host, _, prvKey := tests.CreateHost(t, libp2p.Peerstore(wakuPeerStore))

port, err := tests.FindFreePort(t, "127.0.0.1", 3)
require.NoError(t, err)
ip, _ := tests.ExtractIP(host.Addrs()[0])
l, err := tests.NewLocalnode(prvKey, ip, port, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger())
require.NoError(t, err)
discv5PeerConn1 := discv5.NewTestPeerDiscoverer()
d, err := discv5.NewDiscoveryV5(prvKey, l, discv5PeerConn1, prometheus.DefaultRegisterer, utils.Logger(), discv5.WithUDPPort(uint(port)), discv5.WithBootnodes(bootnode))
require.NoError(t, err)
d.SetHost(host)

return host, d
}

func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
// H1
host1, _, prvKey1 := tests.CreateHost(t)
Expand Down Expand Up @@ -185,3 +215,218 @@ func TestRetrieveFilteredPeerExchangePeers(t *testing.T) {
px1.Stop()
px3.Stop()
}

func TestPeerExchangeOptions(t *testing.T) {

// Prepare host1
host1, _, prvKey1 := tests.CreateHost(t)
udpPort1, err := tests.FindFreePort(t, "127.0.0.1", 3)
require.NoError(t, err)
ip1, _ := tests.ExtractIP(host1.Addrs()[0])
l1, err := tests.NewLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger())
require.NoError(t, err)
discv5PeerConn1 := discv5.NewTestPeerDiscoverer()
d1, err := discv5.NewDiscoveryV5(prvKey1, l1, discv5PeerConn1, prometheus.DefaultRegisterer, utils.Logger(), discv5.WithUDPPort(uint(udpPort1)))
require.NoError(t, err)
d1.SetHost(host1)

// Mount peer exchange
pxPeerConn1 := discv5.NewTestPeerDiscoverer()
px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger())
require.NoError(t, err)
px1.SetHost(host1)

// Test WithPeerAddr()
params := new(PeerExchangeRequestParameters)
params.host = px1.h
params.log = px1.log
params.pm = px1.pm

optList := DefaultOptions(px1.h)
optList = append(optList, WithPeerAddr(host1.Addrs()[0]))
for _, opt := range optList {
err := opt(params)
require.NoError(t, err)
}

require.Equal(t, host1.Addrs()[0], params.peerAddr)

// Test WithFastestPeerSelection()
optList = DefaultOptions(px1.h)
optList = append(optList, WithFastestPeerSelection(host1.ID()))
for _, opt := range optList {
err := opt(params)
require.NoError(t, err)
}

require.Equal(t, peermanager.LowestRTT, params.peerSelectionType)
require.Equal(t, host1.ID(), params.preferredPeers[0])

}

func TestRetrieveProvidePeerExchangeWithPMAndPeerAddr(t *testing.T) {
log := utils.Logger()

// H1 + H2 with discovery on
host1, d1 := createHostWithDiscv5(t)
host2, d2 := createHostWithDiscv5(t, d1.Node())

// H3
ps3, err := pstoremem.NewPeerstore()
require.NoError(t, err)
wakuPeerStore3 := wps.NewWakuPeerstore(ps3)
host3, _, _ := tests.CreateHost(t, libp2p.Peerstore(wakuPeerStore3))

defer d1.Stop()
defer d2.Stop()
defer host1.Close()
defer host2.Close()
defer host3.Close()

err = d1.Start(context.Background())
require.NoError(t, err)

err = d2.Start(context.Background())
require.NoError(t, err)

// Prepare peer manager for host3
pm3 := peermanager.NewPeerManager(10, 20, log)
pm3.SetHost(host3)
pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, 30*time.Second, utils.Logger())
require.NoError(t, err)
pxPeerConn3.SetHost(host3)
err = pxPeerConn3.Start(context.Background())
require.NoError(t, err)

// mount peer exchange
pxPeerConn1 := discv5.NewTestPeerDiscoverer()
px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger())
require.NoError(t, err)
px1.SetHost(host1)

px3, err := NewWakuPeerExchange(nil, pxPeerConn3, pm3, prometheus.DefaultRegisterer, utils.Logger())
require.NoError(t, err)
px3.SetHost(host3)

err = px1.Start(context.Background())
require.NoError(t, err)

time.Sleep(5 * time.Second)

// Check peer store of the client doesn't contain peer2 before the request
require.False(t, slices.Contains(host3.Peerstore().Peers(), host2.ID()))

// Construct multi address like example "/ip4/0.0.0.0/tcp/30304/p2p/16Uiu2HAmBu5zRFzBGAzzMAuGWhaxN2EwcbW7CzibELQELzisf192"
hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", host1.ID()))
require.NoError(t, err)
host1MultiAddr := host1.Addrs()[0].Encapsulate(hostInfo)
require.NoError(t, err)

log.Info("Connecting to peer", zap.String(host1MultiAddr.String(), "to provide 1 peer"))
err = px3.Request(context.Background(), 1, WithPeerAddr(host1MultiAddr))
require.NoError(t, err)

time.Sleep(5 * time.Second)

// Check peer store of the client does contain peer2 after the request
require.True(t, slices.Contains(host3.Peerstore().Peers(), host2.ID()))

}

func TestRetrieveProvidePeerExchangeWithPMOnly(t *testing.T) {
log := utils.Logger()

// H1 + H2 + H4 with discovery on
host1, d1 := createHostWithDiscv5(t)
host2, d2 := createHostWithDiscv5(t, d1.Node())
host4, d4 := createHostWithDiscv5(t, d1.Node())

// H3
ps3, err := pstoremem.NewPeerstore()
require.NoError(t, err)
wakuPeerStore3 := wps.NewWakuPeerstore(ps3)
host3, _, _ := tests.CreateHost(t, libp2p.Peerstore(wakuPeerStore3))

defer d1.Stop()
defer d2.Stop()
defer d4.Stop()
defer host1.Close()
defer host2.Close()
defer host3.Close()
defer host4.Close()

err = d1.Start(context.Background())
require.NoError(t, err)

err = d2.Start(context.Background())
require.NoError(t, err)

err = d4.Start(context.Background())
require.NoError(t, err)

// Prepare peer manager for host3
pm3 := peermanager.NewPeerManager(10, 20, log)
pm3.SetHost(host3)
pxPeerConn3, err := peermanager.NewPeerConnectionStrategy(pm3, 30*time.Second, utils.Logger())
require.NoError(t, err)
pxPeerConn3.SetHost(host3)
err = pxPeerConn3.Start(context.Background())
require.NoError(t, err)

// Add peer exchange server to client's peer store
host3.Peerstore().AddAddrs(host1.ID(), host1.Addrs(), peerstore.PermanentAddrTTL)
err = host3.Peerstore().AddProtocols(host1.ID(), PeerExchangeID_v20alpha1)
require.NoError(t, err)

// mount peer exchange
pxPeerConn1 := discv5.NewTestPeerDiscoverer()
px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger())
require.NoError(t, err)
px1.SetHost(host1)

px3, err := NewWakuPeerExchange(nil, pxPeerConn3, pm3, prometheus.DefaultRegisterer, utils.Logger())
require.NoError(t, err)
px3.SetHost(host3)

err = px1.Start(context.Background())
require.NoError(t, err)

time.Sleep(5 * time.Second)

// Check peer store of the client doesn't contain peer2 and peer4 before the request
require.False(t, slices.Contains(host3.Peerstore().Peers(), host2.ID()))
require.False(t, slices.Contains(host3.Peerstore().Peers(), host4.ID()))

// Connect to PM to get peers
err = px3.Request(context.Background(), 2, WithAutomaticPeerSelection(host1.ID()))
require.NoError(t, err)

time.Sleep(5 * time.Second)

// Check peer store of the client does contain peer2 and peer4 after the request
require.True(t, slices.Contains(host3.Peerstore().Peers(), host2.ID()))
require.True(t, slices.Contains(host3.Peerstore().Peers(), host4.ID()))

}

func TestRecordError(t *testing.T) {

m := newMetrics(prometheus.DefaultRegisterer)

// Increment error counter for rateLimitFailure 7 times
for i := 0; i < 7; i++ {
m.RecordError(rateLimitFailure)
}

// Retrieve metric values
counter, _ := peerExchangeErrors.GetMetricWithLabelValues(string(rateLimitFailure))
rateLimitFailures := &dto.Metric{}

// Store values into metric client struct
err := counter.Write(rateLimitFailures)
require.NoError(t, err)

// Check the count is in
require.Equal(t, 7, int(rateLimitFailures.GetCounter().GetValue()))

}
Loading