Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: peermanager tests coverage improvement #1035

Merged
merged 36 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
1fc62d9
test: connection gater
romanzac Feb 16, 2024
371e35c
test: subscribe to topic event bus
romanzac Feb 16, 2024
4deaa68
Merge branch 'master' into chore-peermanager-tests-coverage-improvement
romanzac Feb 17, 2024
69a0188
test: handle new relay topic subscription
romanzac Feb 19, 2024
a8e8856
test: handleNewRelayTopicSubscription hangs
romanzac Feb 19, 2024
34b2d58
test: handle new topic subscription
romanzac Feb 20, 2024
a911e5a
test: add handle new topic unsubscription
romanzac Feb 20, 2024
0091a52
test: try to get topic into healthy state
romanzac Feb 22, 2024
8d524cd
test: change broadcaster to register ror all type
romanzac Feb 24, 2024
b8b39a0
test: peers for topic obtained as topic.topic.ListPeers() empty
romanzac Feb 26, 2024
20760e3
fix: extend TestServiceSlot with more peers
romanzac Feb 26, 2024
8e28edd
fix: rewrite Connection Gater test with
romanzac Feb 29, 2024
4de04d5
fix: rename variable for remote address
romanzac Mar 2, 2024
2f7b6c6
test: connectedness and health after four nodes added
romanzac Mar 2, 2024
f74bb4e
fix: clean up TestHandlePeerTopicEvent
romanzac Mar 4, 2024
4b6f6d2
fix: add IDS peers info
romanzac Mar 4, 2024
d3acab7
Merge branch 'master' into chore-peermanager-tests-coverage-improvement
romanzac Mar 4, 2024
9640a8c
fix: subscribe to topic from all peers
romanzac Mar 5, 2024
ba2f8b9
fix: simplify test handle peer topic event
romanzac Mar 8, 2024
b70a0be
fix: simplify test handle relay topic subscription
romanzac Mar 8, 2024
7e5138b
fix: close event loop gracefully
romanzac Mar 8, 2024
621b5e1
fix: prevent data race at TestHandleRelayTopicSubscription
romanzac Mar 9, 2024
1faed8b
fix: ineffectual assignment to err
romanzac Mar 9, 2024
ae0e73a
Merge branch 'master' into chore-peermanager-tests-coverage-improvement
romanzac Mar 11, 2024
dca4502
Merge branch 'master' into chore-peermanager-tests-coverage-improvement
romanzac Mar 11, 2024
e133ffb
fix: lower data amount used at ValidPayloads tests for filter
romanzac Mar 11, 2024
6b2d743
fix: simplify emitTopicEvent()
romanzac Mar 12, 2024
d6fb227
Revert "fix: lower data amount used at ValidPayloads tests for filter"
romanzac Mar 12, 2024
dfdde56
fix: add test for uniqueness to TestServiceSlot
romanzac Mar 12, 2024
6b94d07
test: subscribe all hosts experiment
romanzac Mar 12, 2024
0372719
test: experiment - start event loop before subscribe
romanzac Mar 13, 2024
ce77fd6
fix: restore TestHandlePeerTopicEvent to limit its scope to pm events…
romanzac Mar 14, 2024
6811fc6
Merge branch 'master' into chore-peermanager-tests-coverage-improvement
romanzac Mar 14, 2024
f8a67f8
Merge branch 'master' into chore-peermanager-tests-coverage-improvement
romanzac Mar 15, 2024
0d4cb71
Merge branch 'master' into chore-peermanager-tests-coverage-improvement
romanzac Mar 16, 2024
86fbbff
fix: update NewPeerManager() call to include metadata
romanzac Mar 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions waku/v2/peermanager/connection_gater_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package peermanager

import (
"context"
"github.com/libp2p/go-libp2p/core/control"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peerstore"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/utils"
"testing"
)

type mockConnMultiaddrs struct {
local, remote ma.Multiaddr
}

func (m mockConnMultiaddrs) LocalMultiaddr() ma.Multiaddr {
return m.local
}

func (m mockConnMultiaddrs) RemoteMultiaddr() ma.Multiaddr {
return m.remote
}

func TestConnectionGater(t *testing.T) {

log := utils.Logger()

_, h1, _ := makeWakuRelay(t, log)
_, h2, _ := makeWakuRelay(t, log)

h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL)
err := h1.Connect(context.Background(), h2.Peerstore().PeerInfo(h2.ID()))
require.NoError(t, err)

h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL)
err = h2.Connect(context.Background(), h1.Peerstore().PeerInfo(h1.ID()))
require.NoError(t, err)

peerA := h1.ID()

remoteAddr1 := ma.StringCast("/ip4/1.2.3.4/tcp/1234")

connGater := NewConnectionGater(2, log)

// Test peer blocking
allow := connGater.InterceptPeerDial(peerA)
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
require.True(t, allow)

// Test connection was secured and upgraded
allow = connGater.InterceptSecured(network.DirInbound, peerA, &mockConnMultiaddrs{local: nil, remote: nil})
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
require.True(t, allow)

connection1 := h1.Network().Conns()[0]

allow, reason := connGater.InterceptUpgraded(connection1)
require.True(t, allow)
require.Equal(t, control.DisconnectReason(0), reason)

// Test addr and subnet blocking
allow = connGater.InterceptAddrDial(peerA, remoteAddr1)
require.True(t, allow)

// Bellow the connection limit
allow = connGater.InterceptAccept(&mockConnMultiaddrs{local: nil, remote: remoteAddr1})
require.True(t, allow)

ip, err := manet.ToIP(remoteAddr1)
require.NoError(t, err)
connGater.limiter[ip.String()] = 3

// Above the connection limit
allow = connGater.InterceptAccept(&mockConnMultiaddrs{local: nil, remote: remoteAddr1})
require.False(t, allow)

// Call twice NotifyDisconnect to get bellow the limit(2): 3 -> 1
connGater.NotifyDisconnect(remoteAddr1)
connGater.NotifyDisconnect(remoteAddr1)

// Bellow the connection limit again
allow = connGater.validateInboundConn(remoteAddr1)
require.True(t, allow)

}
21 changes: 20 additions & 1 deletion waku/v2/peermanager/service_slot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,31 @@ func TestServiceSlot(t *testing.T) {
fetchedPeers, err := slots.getPeers(protocol).getRandom(1)
require.NoError(t, err)
require.Equal(t, peerID, maps.Keys(fetchedPeers)[0])
//TODO: Add test to get more than 1 peers
//
slots.getPeers(protocol).remove(peerID)
//
_, err = slots.getPeers(protocol).getRandom(1)
require.Equal(t, err, ErrNoPeersAvailable)

// Test with more peers
peerID2 := peer.ID("peerId2")
peerID3 := peer.ID("peerId3")

//
slots.getPeers(protocol).add(peerID2)
slots.getPeers(protocol).add(peerID3)
//

fetchedPeers, err = slots.getPeers(protocol).getRandom(2)
require.NoError(t, err)
require.Equal(t, 2, len(maps.Keys(fetchedPeers)))
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved

slots.getPeers(protocol).remove(peerID2)

fetchedPeers, err = slots.getPeers(protocol).getRandom(10)
require.NoError(t, err)
require.Equal(t, peerID3, maps.Keys(fetchedPeers)[0])

}

func TestServiceSlotRemovePeerFromAll(t *testing.T) {
Expand Down
213 changes: 213 additions & 0 deletions waku/v2/peermanager/topic_event_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package peermanager

import (
"context"
"crypto/rand"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"testing"
"time"
)

func makeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host, relay.Broadcaster) {

broadcaster := relay.NewBroadcaster(10)
require.NoError(t, broadcaster.Start(context.Background()))

port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)

h, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)

broadcaster.RegisterForAll()

r := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(),
prometheus.DefaultRegisterer, log)

r.SetHost(h)

return r, h, broadcaster
}

func makePeerManagerWithEventBus(t *testing.T, r *relay.WakuRelay, h *host.Host) (*PeerManager, event.Bus) {
// Host 1 used by peer manager
pm := NewPeerManager(10, 20, utils.Logger())
pm.SetHost(*h)

// Create a new relay event bus
relayEvtBus := r.Events()

// Subscribe to EventBus
err := pm.SubscribeToRelayEvtBus(relayEvtBus)
require.NoError(t, err)

// Register necessary protocols
pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField)
return pm, relayEvtBus
}

func emitTopicEvent(pubSubTopic string, peerID peer.ID, emitter event.Emitter, state relay.PeerTopicState) error {

peerEvt := relay.EvtPeerTopic{
PubsubTopic: pubSubTopic,
PeerID: peerID,
State: state,
}

return emitter.Emit(peerEvt)
}

func TestSubscribeToRelayEvtBus(t *testing.T) {
log := utils.Logger()

// Host 1
r, h1, _ := makeWakuRelay(t, log)

// Host 1 used by peer manager
pm := NewPeerManager(10, 20, utils.Logger())
pm.SetHost(h1)

// Create a new relay event bus
relayEvtBus := r.Events()

// Subscribe to EventBus
err := pm.SubscribeToRelayEvtBus(relayEvtBus)
require.NoError(t, err)

}

func TestHandleRelayTopicSubscription(t *testing.T) {
log := utils.Logger()
pubSubTopic := "/waku/2/go/pm/test"
ctx := context.Background()

// Relay and Host
r, h1, _ := makeWakuRelay(t, log)
err := r.Start(ctx)
require.NoError(t, err)

// Peer manager with event bus
pm, _ := makePeerManagerWithEventBus(t, r, &h1)
pm.ctx = ctx

// Start event loop to listen to events
ctxEventLoop, cancel := context.WithCancel(context.Background())
defer cancel()
go pm.peerEventLoop(ctxEventLoop)

// Subscribe to Pubsub topic
_, err = r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic))
require.NoError(t, err)

// Wait for event loop to call handler
time.Sleep(200 * time.Millisecond)

// Check Peer Manager knows about the topic
pm.topicMutex.RLock()
_, ok := pm.subRelayTopics[pubSubTopic]
require.True(t, ok)
pm.topicMutex.RUnlock()

// UnSubscribe from Pubsub topic
err = r.Unsubscribe(ctx, protocol.NewContentFilter(pubSubTopic))
require.NoError(t, err)

// Wait for event loop to call handler
time.Sleep(200 * time.Millisecond)

// Check the original topic was removed from Peer Manager
pm.topicMutex.RLock()
_, ok = pm.subRelayTopics[pubSubTopic]
require.False(t, ok)
pm.topicMutex.RUnlock()

r.Stop()
}

func TestHandlePeerTopicEvent(t *testing.T) {
log := utils.Logger()
pubSubTopic := "/waku/2/go/pm/test"
ctx := context.Background()

hosts := make([]host.Host, 5)
relays := make([]*relay.WakuRelay, 5)

for i := 0; i < 5; i++ {
relays[i], hosts[i], _ = makeWakuRelay(t, log)
err := relays[i].Start(ctx)
require.NoError(t, err)
}

// Create peer manager instance with the first hosts
pm, eventBus := makePeerManagerWithEventBus(t, relays[0], &hosts[0])
pm.ctx = ctx
pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField)

// Connect host[0] with all other hosts to reach 4 connections
for i := 1; i < 5; i++ {
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
pm.host.Peerstore().AddAddrs(hosts[i].ID(), hosts[i].Addrs(), peerstore.PermanentAddrTTL)
err := pm.host.Connect(ctx, hosts[i].Peerstore().PeerInfo(hosts[i].ID()))
require.NoError(t, err)
err = pm.host.Peerstore().(wps.WakuPeerstore).SetDirection(hosts[i].ID(), network.DirOutbound)
require.NoError(t, err)

}

// Wait for connections to settle
time.Sleep(2 * time.Second)

if len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic)) == 0 {
log.Info("No peers for the topic yet")
}

// Subscribe to Pubsub topic on first host only
_, err := relays[0].Subscribe(ctx, protocol.NewContentFilter(pubSubTopic))
require.NoError(t, err)

// Start event loop to listen to events
ctxEventLoop := context.Background()
go pm.peerEventLoop(ctxEventLoop)
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved

// Prepare emitter
emitter, err := eventBus.Emitter(new(relay.EvtPeerTopic))
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)

// Send PEER_JOINED events for hosts 2-5
for i := 1; i < 5; i++ {
err = emitTopicEvent(pubSubTopic, hosts[i].ID(), emitter, relay.PEER_JOINED)
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
}

// Check four hosts have joined the topic
require.Equal(t, 4, len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic)))

// Check all hosts have been connected
for _, peer := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) {
require.Equal(t, network.Connected, pm.host.Network().Connectedness(peer))
}

// Send PEER_LEFT events for hosts 2-5
for i := 1; i < 5; i++ {
err = emitTopicEvent(pubSubTopic, hosts[i].ID(), emitter, relay.PEER_LEFT)
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
}

// Check all hosts have left the topic
require.Equal(t, 0, len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic)))

}
12 changes: 6 additions & 6 deletions waku/v2/protocol/filter/filter_push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (s *FilterTestSuite) TestValidPayloadsASCII() {
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())

// Prepare data
messages := prepareData(100, false, false, true, tests.GenerateRandomASCIIString)
messages := prepareData(50, false, false, true, tests.GenerateRandomASCIIString)
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved

// All messages should be received
s.waitForMessages(func() {
Expand All @@ -34,7 +34,7 @@ func (s *FilterTestSuite) TestValidPayloadsUTF8() {
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())

// Prepare data
messages := prepareData(100, false, false, true, tests.GenerateRandomUTF8String)
messages := prepareData(50, false, false, true, tests.GenerateRandomUTF8String)

// All messages should be received
s.waitForMessages(func() {
Expand All @@ -52,7 +52,7 @@ func (s *FilterTestSuite) TestValidPayloadsBase64() {
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())

// Prepare data
messages := prepareData(100, false, false, true, tests.GenerateRandomBase64String)
messages := prepareData(50, false, false, true, tests.GenerateRandomBase64String)

// All messages should be received
s.waitForMessages(func() {
Expand All @@ -70,7 +70,7 @@ func (s *FilterTestSuite) TestValidPayloadsJSON() {
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())

// Prepare data
messages := prepareData(100, false, false, true, tests.GenerateRandomJSONString)
messages := prepareData(50, false, false, true, tests.GenerateRandomJSONString)

// All messages should be received
s.waitForMessages(func() {
Expand All @@ -88,7 +88,7 @@ func (s *FilterTestSuite) TestValidPayloadsURLEncoded() {
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())

// Prepare data
messages := prepareData(100, false, false, true, tests.GenerateRandomURLEncodedString)
messages := prepareData(50, false, false, true, tests.GenerateRandomURLEncodedString)

// All messages should be received
s.waitForMessages(func() {
Expand All @@ -106,7 +106,7 @@ func (s *FilterTestSuite) TestValidPayloadsSQL() {
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())

// Prepare data
messages := prepareData(100, false, false, true, tests.GenerateRandomSQLInsert)
messages := prepareData(50, false, false, true, tests.GenerateRandomSQLInsert)

// All messages should be received
s.waitForMessages(func() {
Expand Down
Loading