From 1fc62d9dd53c14c6bf56f8cb1f1718ec3cb22b6f Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 16 Feb 2024 14:07:26 +0800 Subject: [PATCH 01/29] test: connection gater --- waku/v2/peermanager/peer_manager_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index f85a80a39..73c09312f 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -337,3 +337,23 @@ func TestOnDemandPeerDiscovery(t *testing.T) { require.Equal(t, host1.ID(), peerIDs[0]) } + +func TestConnectionGater(t *testing.T) { + + logger := utils.Logger() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + h1, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + + connGater := NewConnectionGater(5, logger) + + allow := connGater.InterceptPeerDial(h1.ID()) + require.True(t, allow) + + addr := getAddr(h1) + allow = connGater.InterceptAddrDial(h1.ID(), addr) + require.True(t, allow) + +} From 371e35cf3bd6a4ab287635b1a0bf5ac96857f522 Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 16 Feb 2024 16:39:55 +0800 Subject: [PATCH 02/29] test: subscribe to topic event bus --- tests/utils.go | 27 +++++++++++++++++++ .../peermanager/topic_event_handler_test.go | 27 +++++++++++++++++++ 2 files changed, 54 insertions(+) create mode 100644 waku/v2/peermanager/topic_event_handler_test.go diff --git a/tests/utils.go b/tests/utils.go index 10c99be63..c99e83bf9 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -9,6 +9,10 @@ import ( "encoding/hex" "encoding/json" "fmt" + "github.com/prometheus/client_golang/prometheus" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/timesource" "io" "math" "math/big" @@ -210,6 +214,29 @@ func CreateHost(t *testing.T, opts ...config.Option) (host.Host, int, *ecdsa.Pri return host, 
port, privKey } +func MakeWakuRelay(t *testing.T, topic string, log *zap.Logger) (*relay.WakuRelay, *relay.Subscription, host.Host, relay.Broadcaster) { + + broadcaster := relay.NewBroadcaster(10) + require.NoError(t, broadcaster.Start(context.Background())) + + port, err := FindFreePort(t, "", 5) + require.NoError(t, err) + + host, err := MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + + relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, log) + relay.SetHost(host) + + err = relay.Start(context.Background()) + require.NoError(t, err) + + sub, err := relay.Subscribe(context.Background(), protocol.NewContentFilter(topic)) + require.NoError(t, err) + + return relay, sub[0], host, broadcaster +} + func ExtractIP(addr multiaddr.Multiaddr) (*net.TCPAddr, error) { ipStr, err := addr.ValueForProtocol(multiaddr.P_IP4) if err != nil { diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go new file mode 100644 index 000000000..aaf28f5e6 --- /dev/null +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -0,0 +1,27 @@ +package peermanager + +import ( + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/utils" + "testing" +) + +func TestSubscribeToRelayEvtBus(t *testing.T) { + log := utils.Logger() + + // Host 1 + r, _, h1, _ := tests.MakeWakuRelay(t, "testTopic", log) + + // Host 1 used by peer manager + pm := NewPeerManager(10, 20, utils.Logger()) + pm.SetHost(h1) + + // Create a new relay event bus + relayEvtBus := r.Events() + + // Subscribe to EventBus + err := pm.SubscribeToRelayEvtBus(relayEvtBus) + require.NoError(t, err) + +} From 69a0188dcebbce32cdde11588bf9268c9efbc32a Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 19 Feb 2024 14:39:46 +0800 Subject: [PATCH 03/29] test: handle new relay topic subscription --- tests/utils.go | 11 +---- 
.../peermanager/topic_event_handler_test.go | 47 ++++++++++++++++++- 2 files changed, 48 insertions(+), 10 deletions(-) diff --git a/tests/utils.go b/tests/utils.go index c99e83bf9..b1f9c646d 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -10,7 +10,6 @@ import ( "encoding/json" "fmt" "github.com/prometheus/client_golang/prometheus" - "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "io" @@ -214,7 +213,7 @@ func CreateHost(t *testing.T, opts ...config.Option) (host.Host, int, *ecdsa.Pri return host, port, privKey } -func MakeWakuRelay(t *testing.T, topic string, log *zap.Logger) (*relay.WakuRelay, *relay.Subscription, host.Host, relay.Broadcaster) { +func MakeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host, relay.Broadcaster) { broadcaster := relay.NewBroadcaster(10) require.NoError(t, broadcaster.Start(context.Background())) @@ -228,13 +227,7 @@ func MakeWakuRelay(t *testing.T, topic string, log *zap.Logger) (*relay.WakuRela relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, log) relay.SetHost(host) - err = relay.Start(context.Background()) - require.NoError(t, err) - - sub, err := relay.Subscribe(context.Background(), protocol.NewContentFilter(topic)) - require.NoError(t, err) - - return relay, sub[0], host, broadcaster + return relay, host, broadcaster } func ExtractIP(addr multiaddr.Multiaddr) (*net.TCPAddr, error) { diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index aaf28f5e6..5991cb7d1 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -1,8 +1,11 @@ package peermanager import ( + "context" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/protocol" + 
"github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" "testing" ) @@ -11,7 +14,7 @@ func TestSubscribeToRelayEvtBus(t *testing.T) { log := utils.Logger() // Host 1 - r, _, h1, _ := tests.MakeWakuRelay(t, "testTopic", log) + r, h1, _ := tests.MakeWakuRelay(t, log) // Host 1 used by peer manager pm := NewPeerManager(10, 20, utils.Logger()) @@ -25,3 +28,45 @@ func TestSubscribeToRelayEvtBus(t *testing.T) { require.NoError(t, err) } + +func TestHandleNewRelayTopicSubscription(t *testing.T) { + log := utils.Logger() + pubSubTopic := "/waku/2/go/pm/test" + ctx := context.Background() + + // Relay and Host + r, h1, _ := tests.MakeWakuRelay(t, log) + err := r.Start(ctx) + require.NoError(t, err) + + // Host 1 used by peer manager + pm := NewPeerManager(10, 20, utils.Logger()) + pm.SetHost(h1) + + // Create a new relay event bus + relayEvtBus := r.Events() + + // Subscribe to EventBus + err = pm.SubscribeToRelayEvtBus(relayEvtBus) + require.NoError(t, err) + pm.Start(ctx) + + // Subscribe to Pubsub topic + _, err = r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) + require.NoError(t, err) + + select { + case e := <-pm.sub.Out(): + switch e := e.(type) { + case relay.EvtRelaySubscribed: + { + eventDetails := (relay.EvtRelaySubscribed)(e) + pm.handleNewRelayTopicSubscription(eventDetails.Topic, eventDetails.TopicInst) + } + } + } + + //// Check Peer Manager knows about the topic + //_, ok := pm.subRelayTopics[pubSubTopic] + //require.True(t, ok) +} From a8e88563734bd84caaa0e208459c8beca0a36699 Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 19 Feb 2024 16:48:25 +0800 Subject: [PATCH 04/29] test: handleNewRelayTopicSubscription hangs --- waku/v2/peermanager/topic_event_handler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/waku/v2/peermanager/topic_event_handler.go b/waku/v2/peermanager/topic_event_handler.go index 1a39fee25..779dadb3a 100644 --- a/waku/v2/peermanager/topic_event_handler.go +++ 
b/waku/v2/peermanager/topic_event_handler.go @@ -77,6 +77,7 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic //TODO: Initiate on-demand discovery for this pubSubTopic. // Use peer-exchange and rendevouz? //Should we query discoverycache to find out if there are any more peers before triggering discovery? + pm.logger.Info("handleNewRelayTopicUnSubscription - finished") return } } From 34b2d58ea52616dc62bf142ea17c686a812f79a4 Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 20 Feb 2024 13:54:59 +0800 Subject: [PATCH 05/29] test: handle new topic subscription - restore handleNewRelayTopicSubscription original body --- waku/v2/peermanager/topic_event_handler.go | 1 - .../v2/peermanager/topic_event_handler_test.go | 18 ++++++++++++++---- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/waku/v2/peermanager/topic_event_handler.go b/waku/v2/peermanager/topic_event_handler.go index 779dadb3a..1a39fee25 100644 --- a/waku/v2/peermanager/topic_event_handler.go +++ b/waku/v2/peermanager/topic_event_handler.go @@ -77,7 +77,6 @@ func (pm *PeerManager) handleNewRelayTopicSubscription(pubsubTopic string, topic //TODO: Initiate on-demand discovery for this pubSubTopic. // Use peer-exchange and rendevouz? //Should we query discoverycache to find out if there are any more peers before triggering discovery? 
- pm.logger.Info("handleNewRelayTopicUnSubscription - finished") return } } diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index 5991cb7d1..db6c56464 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -49,12 +49,17 @@ func TestHandleNewRelayTopicSubscription(t *testing.T) { // Subscribe to EventBus err = pm.SubscribeToRelayEvtBus(relayEvtBus) require.NoError(t, err) - pm.Start(ctx) + + // Register necessary protocols and connect + pm.ctx = ctx + pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) + go pm.connectivityLoop(ctx) // Subscribe to Pubsub topic _, err = r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) require.NoError(t, err) + // Call the appropriate handler select { case e := <-pm.sub.Out(): switch e := e.(type) { @@ -63,10 +68,15 @@ func TestHandleNewRelayTopicSubscription(t *testing.T) { eventDetails := (relay.EvtRelaySubscribed)(e) pm.handleNewRelayTopicSubscription(eventDetails.Topic, eventDetails.TopicInst) } + default: + require.Fail(t, "unexpected event arrived") } + + case <-ctx.Done(): + require.Fail(t, "closed channel") } - //// Check Peer Manager knows about the topic - //_, ok := pm.subRelayTopics[pubSubTopic] - //require.True(t, ok) + // Check Peer Manager knows about the topic + _, ok := pm.subRelayTopics[pubSubTopic] + require.True(t, ok) } From a911e5a32cba2df43790b34575fc601a65732c38 Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 20 Feb 2024 14:11:38 +0800 Subject: [PATCH 06/29] test: add handle new topic unsubscription - fix package dependency cycle --- tests/utils.go | 20 ------- .../peermanager/topic_event_handler_test.go | 54 +++++++++++++++++-- 2 files changed, 51 insertions(+), 23 deletions(-) diff --git a/tests/utils.go b/tests/utils.go index b1f9c646d..10c99be63 100644 --- a/tests/utils.go +++ b/tests/utils.go @@ -9,9 +9,6 @@ import ( "encoding/hex" "encoding/json" "fmt" 
- "github.com/prometheus/client_golang/prometheus" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/timesource" "io" "math" "math/big" @@ -213,23 +210,6 @@ func CreateHost(t *testing.T, opts ...config.Option) (host.Host, int, *ecdsa.Pri return host, port, privKey } -func MakeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host, relay.Broadcaster) { - - broadcaster := relay.NewBroadcaster(10) - require.NoError(t, broadcaster.Start(context.Background())) - - port, err := FindFreePort(t, "", 5) - require.NoError(t, err) - - host, err := MakeHost(context.Background(), port, rand.Reader) - require.NoError(t, err) - - relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, log) - relay.SetHost(host) - - return relay, host, broadcaster -} - func ExtractIP(addr multiaddr.Multiaddr) (*net.TCPAddr, error) { ipStr, err := addr.ValueForProtocol(multiaddr.P_IP4) if err != nil { diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index db6c56464..5086d6560 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -2,19 +2,41 @@ package peermanager import ( "context" + "crypto/rand" + "github.com/libp2p/go-libp2p/core/host" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" + "go.uber.org/zap" "testing" ) +func makeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host, relay.Broadcaster) { + + broadcaster := relay.NewBroadcaster(10) + require.NoError(t, broadcaster.Start(context.Background())) + + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + host, 
err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + + relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, log) + relay.SetHost(host) + + return relay, host, broadcaster +} + func TestSubscribeToRelayEvtBus(t *testing.T) { log := utils.Logger() // Host 1 - r, h1, _ := tests.MakeWakuRelay(t, log) + r, h1, _ := makeWakuRelay(t, log) // Host 1 used by peer manager pm := NewPeerManager(10, 20, utils.Logger()) @@ -29,13 +51,13 @@ func TestSubscribeToRelayEvtBus(t *testing.T) { } -func TestHandleNewRelayTopicSubscription(t *testing.T) { +func TestHandleRelayTopicSubscription(t *testing.T) { log := utils.Logger() pubSubTopic := "/waku/2/go/pm/test" ctx := context.Background() // Relay and Host - r, h1, _ := tests.MakeWakuRelay(t, log) + r, h1, _ := makeWakuRelay(t, log) err := r.Start(ctx) require.NoError(t, err) @@ -79,4 +101,30 @@ func TestHandleNewRelayTopicSubscription(t *testing.T) { // Check Peer Manager knows about the topic _, ok := pm.subRelayTopics[pubSubTopic] require.True(t, ok) + + // UnSubscribe from Pubsub topic + err = r.Unsubscribe(ctx, protocol.NewContentFilter(pubSubTopic)) + require.NoError(t, err) + + // Call the appropriate handler + select { + case e := <-pm.sub.Out(): + switch e := e.(type) { + case relay.EvtRelayUnsubscribed: + { + eventDetails := (relay.EvtRelayUnsubscribed)(e) + pm.handleNewRelayTopicUnSubscription(eventDetails.Topic) + } + default: + require.Fail(t, "unexpected event arrived") + } + + case <-ctx.Done(): + require.Fail(t, "closed channel") + } + + // Check Peer Manager knows about the topic + _, ok = pm.subRelayTopics[pubSubTopic] + require.False(t, ok) + } From 0091a52cd21a969d19b1ac47a61d510e9977b2f4 Mon Sep 17 00:00:00 2001 From: Roman Date: Thu, 22 Feb 2024 13:26:01 +0800 Subject: [PATCH 07/29] test: try to get topic into healthy state --- .../peermanager/topic_event_handler_test.go | 160 ++++++++++++++++-- 1 file changed, 146 
insertions(+), 14 deletions(-) diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index 5086d6560..c3e771357 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -3,15 +3,19 @@ package peermanager import ( "context" "crypto/rand" + "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peerstore" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" + "strconv" "testing" ) @@ -32,6 +36,23 @@ func makeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host, return relay, host, broadcaster } +func makePeerManagerWithEventBus(t *testing.T, r *relay.WakuRelay, h host.Host) (*PeerManager, event.Bus) { + // Host 1 used by peer manager + pm := NewPeerManager(10, 20, utils.Logger()) + pm.SetHost(h) + + // Create a new relay event bus + relayEvtBus := r.Events() + + // Subscribe to EventBus + err := pm.SubscribeToRelayEvtBus(relayEvtBus) + require.NoError(t, err) + + // Register necessary protocols + pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) + return pm, relayEvtBus +} + func TestSubscribeToRelayEvtBus(t *testing.T) { log := utils.Logger() @@ -61,20 +82,9 @@ func TestHandleRelayTopicSubscription(t *testing.T) { err := r.Start(ctx) require.NoError(t, err) - // Host 1 used by peer manager - pm := NewPeerManager(10, 20, utils.Logger()) - pm.SetHost(h1) - - // Create a new relay event bus - relayEvtBus := r.Events() - - // Subscribe to EventBus - err = pm.SubscribeToRelayEvtBus(relayEvtBus) - require.NoError(t, err) - - // 
Register necessary protocols and connect + // Peermanager with event bus + pm, _ := makePeerManagerWithEventBus(t, r, h1) pm.ctx = ctx - pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) go pm.connectivityLoop(ctx) // Subscribe to Pubsub topic @@ -123,8 +133,130 @@ func TestHandleRelayTopicSubscription(t *testing.T) { require.Fail(t, "closed channel") } - // Check Peer Manager knows about the topic + // Check the original topic was removed from Peer Manager _, ok = pm.subRelayTopics[pubSubTopic] require.False(t, ok) } + +func TestHandlePeerTopicEvent(t *testing.T) { + log := utils.Logger() + pubSubTopic := "/waku/2/go/pm/test" + ctx := context.Background() + + // Relay and Host1 + r, h1, _ := makeWakuRelay(t, log) + err := r.Start(ctx) + require.NoError(t, err) + + // Relay and Host2 + r2, h2, _ := makeWakuRelay(t, log) + err = r2.Start(ctx) + require.NoError(t, err) + + // Peermanager with event bus + pm, eventBus := makePeerManagerWithEventBus(t, r, h1) + pm.ctx = ctx + go pm.connectivityLoop(ctx) + + // Add h2 peer + _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{""}, relay.WakuRelayID_v200) + require.NoError(t, err) + + h1.Peerstore().AddAddr(h2.ID(), tests.GetHostAddress(h2), peerstore.PermanentAddrTTL) + err = h1.Peerstore().AddProtocols(h2.ID(), relay.WakuRelayID_v200) + require.NoError(t, err) + + // Subscribe to Pubsub topic which also emits relay.PEER_JOINED + _, err = r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) + require.NoError(t, err) + + // peerEvt to find: relay.PEER_JOINED, relay.PEER_LEFT + peerEvt := relay.EvtPeerTopic{ + PubsubTopic: pubSubTopic, + PeerID: h1.ID(), + State: relay.PEER_JOINED, + } + + emitter, err := eventBus.Emitter(new(relay.EvtPeerTopic)) + require.NoError(t, err) + + err = emitter.Emit(peerEvt) + require.NoError(t, err) + + // Call the appropriate handler + for i := 0; i < 2; i++ { + select { + case e := <-pm.sub.Out(): + switch e := e.(type) { + case relay.EvtPeerTopic: + { + 
log.Info("Handling topic event...") + peerEvt := (relay.EvtPeerTopic)(e) + pm.handlerPeerTopicEvent(peerEvt) + for id := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { + log.Info("hosts before", zap.String("id", strconv.Itoa(id))) + } + } + case relay.EvtRelaySubscribed: + { + log.Info("Handling subscribe event...") + eventDetails := (relay.EvtRelaySubscribed)(e) + pm.handleNewRelayTopicSubscription(eventDetails.Topic, eventDetails.TopicInst) + } + default: + require.Fail(t, "unexpected event arrived") + } + + case <-ctx.Done(): + require.Fail(t, "closed channel") + } + } + + // Evaluate topic health - unhealthy at first, because no peers connected + peerTopic := pm.subRelayTopics[peerEvt.PubsubTopic] + //pm.checkAndUpdateTopicHealth(topic) + require.Equal(t, TopicHealth(UnHealthy), peerTopic.healthStatus) + + // peerEvt to find: relay.PEER_JOINED, relay.PEER_LEFT + peerEvt2 := relay.EvtPeerTopic{ + PubsubTopic: pubSubTopic, + PeerID: h2.ID(), + State: relay.PEER_JOINED, + } + + err = emitter.Emit(peerEvt2) + require.NoError(t, err) + + // Call the appropriate handler + select { + case e := <-pm.sub.Out(): + switch e := e.(type) { + case relay.EvtPeerTopic: + { + log.Info("Handling topic event...") + peerEvt := (relay.EvtPeerTopic)(e) + pm.handlerPeerTopicEvent(peerEvt) + for id := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { + log.Info("hosts after", zap.String("id", strconv.Itoa(id))) + } + } + default: + require.Fail(t, "unexpected event arrived") + } + + case <-ctx.Done(): + require.Fail(t, "closed channel") + } + + // Evaluate topic health - unhealthy at first, because no peers connected + peerTopic = pm.subRelayTopics[peerEvt.PubsubTopic] + + for id := range peerTopic.topic.ListPeers() { + log.Info("peers", zap.String("ID", strconv.Itoa(id))) + } + + pm.checkAndUpdateTopicHealth(pm.subRelayTopics[peerEvt.PubsubTopic]) + require.Equal(t, TopicHealth(MinimallyHealthy), 
peerTopic.healthStatus) + +} From 8d524cd5a19ebfc215a1e1c3bde1c0d028669399 Mon Sep 17 00:00:00 2001 From: Roman Date: Sat, 24 Feb 2024 10:26:57 +0800 Subject: [PATCH 08/29] test: change broadcaster to register for all types --- .../peermanager/topic_event_handler_test.go | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index c3e771357..7f63cbbb7 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -16,6 +16,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "strconv" + "testing" ) @@ -30,16 +31,18 @@ func makeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host, host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) + broadcaster.RegisterForAll() + relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, log) relay.SetHost(host) return relay, host, broadcaster } -func makePeerManagerWithEventBus(t *testing.T, r *relay.WakuRelay, h host.Host) (*PeerManager, event.Bus) { +func makePeerManagerWithEventBus(t *testing.T, r *relay.WakuRelay, h *host.Host) (*PeerManager, event.Bus) { // Host 1 used by peer manager pm := NewPeerManager(10, 20, utils.Logger()) - pm.SetHost(h) + pm.SetHost(*h) // Create a new relay event bus relayEvtBus := r.Events() @@ -83,7 +86,7 @@ func TestHandleRelayTopicSubscription(t *testing.T) { require.NoError(t, err) // Peermanager with event bus - pm, _ := makePeerManagerWithEventBus(t, r, h1) + pm, _ := makePeerManagerWithEventBus(t, r, &h1) pm.ctx = ctx go pm.connectivityLoop(ctx) @@ -155,7 +158,7 @@ func TestHandlePeerTopicEvent(t *testing.T) { require.NoError(t, err) // Peermanager with event bus - pm, eventBus := makePeerManagerWithEventBus(t, r, h1) + pm, eventBus := makePeerManagerWithEventBus(t, r, &h1) pm.ctx = ctx 
go pm.connectivityLoop(ctx) @@ -194,9 +197,12 @@ func TestHandlePeerTopicEvent(t *testing.T) { log.Info("Handling topic event...") peerEvt := (relay.EvtPeerTopic)(e) pm.handlerPeerTopicEvent(peerEvt) - for id := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { - log.Info("hosts before", zap.String("id", strconv.Itoa(id))) + for _, peer := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { + log.Info("hosts before", zap.String("peer", peer.String())) + log.Info("peer", zap.String("connectedness", string(rune(pm.host.Network().Connectedness(peer))))) + } + } case relay.EvtRelaySubscribed: { @@ -237,8 +243,9 @@ func TestHandlePeerTopicEvent(t *testing.T) { log.Info("Handling topic event...") peerEvt := (relay.EvtPeerTopic)(e) pm.handlerPeerTopicEvent(peerEvt) - for id := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { - log.Info("hosts after", zap.String("id", strconv.Itoa(id))) + for _, peer := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { + log.Info("hosts after", zap.String("id", peer.String())) + log.Info("peer", zap.String("connectedness", string(rune(pm.host.Network().Connectedness(peer))))) } } default: From b8b39a0194473c5cec5cd49c37b406680ad04747 Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 26 Feb 2024 13:26:48 +0800 Subject: [PATCH 09/29] test: peers for topic obtained as topic.topic.ListPeers() empty --- .../peermanager/topic_event_handler_test.go | 31 ++++++++++++++++--- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index 7f63cbbb7..6941b34db 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -16,6 +16,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "strconv" + "time" "testing" ) @@ -28,15 +29,15 @@ func 
makeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host, port, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) - host, err := tests.MakeHost(context.Background(), port, rand.Reader) + h, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) broadcaster.RegisterForAll() - relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, log) - relay.SetHost(host) + r := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, log) + r.SetHost(h) - return relay, host, broadcaster + return r, h, broadcaster } func makePeerManagerWithEventBus(t *testing.T, r *relay.WakuRelay, h *host.Host) (*PeerManager, event.Bus) { @@ -160,7 +161,7 @@ func TestHandlePeerTopicEvent(t *testing.T) { // Peermanager with event bus pm, eventBus := makePeerManagerWithEventBus(t, r, &h1) pm.ctx = ctx - go pm.connectivityLoop(ctx) + pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) // Add h2 peer _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{""}, relay.WakuRelayID_v200) @@ -170,6 +171,26 @@ func TestHandlePeerTopicEvent(t *testing.T) { err = h1.Peerstore().AddProtocols(h2.ID(), relay.WakuRelayID_v200) require.NoError(t, err) + go pm.connectivityLoop(ctx) + + pc, err := NewPeerConnectionStrategy(pm, 120*time.Second, pm.logger) + require.NoError(t, err) + err = pc.Start(ctx) + require.NoError(t, err) + + for _, peer := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { + log.Info("hosts initially", zap.String("peer", peer.String())) + log.Info("peer", zap.String("connectedness", string(rune(pm.host.Network().Connectedness(peer))))) + + } + + topic, ok := pm.subRelayTopics[pubSubTopic] + if ok { + log.Info("existing topic details", zap.String("topic", topic.topic.String())) + } + + time.Sleep(4 * time.Second) + // Subscribe to Pubsub topic which also emits relay.PEER_JOINED _, err = 
r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) require.NoError(t, err) From 20760e3457a86f6549af3310970fcaa71a8aed73 Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 26 Feb 2024 16:37:30 +0800 Subject: [PATCH 10/29] fix: extend TestServiceSlot with more peers --- waku/v2/peermanager/service_slot_test.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/waku/v2/peermanager/service_slot_test.go b/waku/v2/peermanager/service_slot_test.go index a9d944a79..9691d9e37 100644 --- a/waku/v2/peermanager/service_slot_test.go +++ b/waku/v2/peermanager/service_slot_test.go @@ -22,12 +22,31 @@ func TestServiceSlot(t *testing.T) { fetchedPeers, err := slots.getPeers(protocol).getRandom(1) require.NoError(t, err) require.Equal(t, peerID, maps.Keys(fetchedPeers)[0]) - //TODO: Add test to get more than 1 peers // slots.getPeers(protocol).remove(peerID) // _, err = slots.getPeers(protocol).getRandom(1) require.Equal(t, err, ErrNoPeersAvailable) + + // Test with more peers + peerID2 := peer.ID("peerId2") + peerID3 := peer.ID("peerId3") + + // + slots.getPeers(protocol).add(peerID2) + slots.getPeers(protocol).add(peerID3) + // + + fetchedPeers, err = slots.getPeers(protocol).getRandom(2) + require.NoError(t, err) + require.Equal(t, 2, len(maps.Keys(fetchedPeers))) + + slots.getPeers(protocol).remove(peerID2) + + fetchedPeers, err = slots.getPeers(protocol).getRandom(10) + require.NoError(t, err) + require.Equal(t, peerID3, maps.Keys(fetchedPeers)[0]) + } func TestServiceSlotRemovePeerFromAll(t *testing.T) { From 8e28eddf906843f933d21f484dcea13815a6bbe5 Mon Sep 17 00:00:00 2001 From: Roman Date: Thu, 29 Feb 2024 19:35:37 +0800 Subject: [PATCH 11/29] fix: rewrite Connection Gater test --- waku/v2/peermanager/connection_gater_test.go | 85 ++++++++++++++++++++ waku/v2/peermanager/peer_manager_test.go | 20 ----- 2 files changed, 85 insertions(+), 20 deletions(-) create mode 100644 waku/v2/peermanager/connection_gater_test.go diff --git 
a/waku/v2/peermanager/connection_gater_test.go b/waku/v2/peermanager/connection_gater_test.go new file mode 100644 index 000000000..f4733caf0 --- /dev/null +++ b/waku/v2/peermanager/connection_gater_test.go @@ -0,0 +1,85 @@ +package peermanager + +import ( + "context" + "github.com/libp2p/go-libp2p/core/control" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peerstore" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/v2/utils" + "testing" +) + +type mockConnMultiaddrs struct { + local, remote ma.Multiaddr +} + +func (m mockConnMultiaddrs) LocalMultiaddr() ma.Multiaddr { + return m.local +} + +func (m mockConnMultiaddrs) RemoteMultiaddr() ma.Multiaddr { + return m.remote +} + +func TestConnectionGater(t *testing.T) { + + log := utils.Logger() + + _, h1, _ := makeWakuRelay(t, log) + _, h2, _ := makeWakuRelay(t, log) + + h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) + err := h1.Connect(context.Background(), h2.Peerstore().PeerInfo(h2.ID())) + require.NoError(t, err) + + h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL) + err = h2.Connect(context.Background(), h1.Peerstore().PeerInfo(h1.ID())) + require.NoError(t, err) + + peerA := h1.ID() + + remoteMulti1 := ma.StringCast("/ip4/1.2.3.4/tcp/1234") + + connGater := NewConnectionGater(2, log) + + // Test peer blocking + allow := connGater.InterceptPeerDial(peerA) + require.True(t, allow) + + // Test connection was secured and upgraded + allow = connGater.InterceptSecured(network.DirInbound, peerA, &mockConnMultiaddrs{local: nil, remote: nil}) + require.True(t, allow) + + connection1 := h1.Network().Conns()[0] + + allow, reason := connGater.InterceptUpgraded(connection1) + require.True(t, allow) + require.Equal(t, control.DisconnectReason(0), reason) + + // Test addr and subnet blocking + allow = 
connGater.InterceptAddrDial(peerA, remoteMulti1) + require.True(t, allow) + + // Bellow the connection limit + allow = connGater.InterceptAccept(&mockConnMultiaddrs{local: nil, remote: remoteMulti1}) + require.True(t, allow) + + ip, err := manet.ToIP(remoteMulti1) + connGater.limiter[ip.String()] = 3 + + // Above the connection limit + allow = connGater.InterceptAccept(&mockConnMultiaddrs{local: nil, remote: remoteMulti1}) + require.False(t, allow) + + // Call twice NotifyDisconnect to get bellow the limit(2): 3 -> 1 + connGater.NotifyDisconnect(remoteMulti1) + connGater.NotifyDisconnect(remoteMulti1) + + // Bellow the connection limit again + allow = connGater.validateInboundConn(remoteMulti1) + require.True(t, allow) + +} diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 73c09312f..f85a80a39 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -337,23 +337,3 @@ func TestOnDemandPeerDiscovery(t *testing.T) { require.Equal(t, host1.ID(), peerIDs[0]) } - -func TestConnectionGater(t *testing.T) { - - logger := utils.Logger() - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - h1, err := tests.MakeHost(ctx, 0, rand.Reader) - require.NoError(t, err) - - connGater := NewConnectionGater(5, logger) - - allow := connGater.InterceptPeerDial(h1.ID()) - require.True(t, allow) - - addr := getAddr(h1) - allow = connGater.InterceptAddrDial(h1.ID(), addr) - require.True(t, allow) - -} From 4de04d5ad9f5644268033016e70745b5dc23f2d0 Mon Sep 17 00:00:00 2001 From: Roman Date: Sat, 2 Mar 2024 10:45:19 +0800 Subject: [PATCH 12/29] fix: rename variable for remote address --- waku/v2/peermanager/connection_gater_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/waku/v2/peermanager/connection_gater_test.go b/waku/v2/peermanager/connection_gater_test.go index f4733caf0..26a85671b 100644 --- 
a/waku/v2/peermanager/connection_gater_test.go +++ b/waku/v2/peermanager/connection_gater_test.go @@ -41,7 +41,7 @@ func TestConnectionGater(t *testing.T) { peerA := h1.ID() - remoteMulti1 := ma.StringCast("/ip4/1.2.3.4/tcp/1234") + remoteAddr1 := ma.StringCast("/ip4/1.2.3.4/tcp/1234") connGater := NewConnectionGater(2, log) @@ -60,26 +60,26 @@ func TestConnectionGater(t *testing.T) { require.Equal(t, control.DisconnectReason(0), reason) // Test addr and subnet blocking - allow = connGater.InterceptAddrDial(peerA, remoteMulti1) + allow = connGater.InterceptAddrDial(peerA, remoteAddr1) require.True(t, allow) // Bellow the connection limit - allow = connGater.InterceptAccept(&mockConnMultiaddrs{local: nil, remote: remoteMulti1}) + allow = connGater.InterceptAccept(&mockConnMultiaddrs{local: nil, remote: remoteAddr1}) require.True(t, allow) - ip, err := manet.ToIP(remoteMulti1) + ip, err := manet.ToIP(remoteAddr1) connGater.limiter[ip.String()] = 3 // Above the connection limit - allow = connGater.InterceptAccept(&mockConnMultiaddrs{local: nil, remote: remoteMulti1}) + allow = connGater.InterceptAccept(&mockConnMultiaddrs{local: nil, remote: remoteAddr1}) require.False(t, allow) // Call twice NotifyDisconnect to get bellow the limit(2): 3 -> 1 - connGater.NotifyDisconnect(remoteMulti1) - connGater.NotifyDisconnect(remoteMulti1) + connGater.NotifyDisconnect(remoteAddr1) + connGater.NotifyDisconnect(remoteAddr1) // Bellow the connection limit again - allow = connGater.validateInboundConn(remoteMulti1) + allow = connGater.validateInboundConn(remoteAddr1) require.True(t, allow) } From 2f7b6c6857ab1c6be16d03407648b50a25212a8e Mon Sep 17 00:00:00 2001 From: Roman Date: Sat, 2 Mar 2024 12:27:16 +0800 Subject: [PATCH 13/29] test: connectedness and health after four nodes added --- .../peermanager/topic_event_handler_test.go | 182 +++++++++++++++--- 1 file changed, 150 insertions(+), 32 deletions(-) diff --git a/waku/v2/peermanager/topic_event_handler_test.go 
b/waku/v2/peermanager/topic_event_handler_test.go index 6941b34db..ac89c3c7a 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" @@ -16,9 +17,8 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" "strconv" - "time" - "testing" + "time" ) func makeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host, relay.Broadcaster) { @@ -34,7 +34,9 @@ func makeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host, broadcaster.RegisterForAll() - r := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, log) + r := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), + prometheus.DefaultRegisterer, log) + r.SetHost(h) return r, h, broadcaster @@ -148,35 +150,60 @@ func TestHandlePeerTopicEvent(t *testing.T) { pubSubTopic := "/waku/2/go/pm/test" ctx := context.Background() - // Relay and Host1 - r, h1, _ := makeWakuRelay(t, log) - err := r.Start(ctx) - require.NoError(t, err) - - // Relay and Host2 - r2, h2, _ := makeWakuRelay(t, log) - err = r2.Start(ctx) - require.NoError(t, err) + //// Relay and Host1 + //r, h1, _ := makeWakuRelay(t, log) + //err := r.Start(ctx) + //require.NoError(t, err) + // + //// Relay and Host2 + //r2, h2, _ := makeWakuRelay(t, log) + //err = r2.Start(ctx) + //require.NoError(t, err) // Peermanager with event bus - pm, eventBus := makePeerManagerWithEventBus(t, r, &h1) + //pm, eventBus := makePeerManagerWithEventBus(t, r, &h1) + //pm.ctx = ctx + //pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) + + // Add h2 peer + //_, err = pm.AddPeer(getAddr(h2), wps.Static, 
[]string{""}, relay.WakuRelayID_v200) + //require.NoError(t, err) + + //h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) + //err = h1.Connect(context.Background(), h2.Peerstore().PeerInfo(h2.ID())) + //require.NoError(t, err) + // + //h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL) + //err = h2.Connect(context.Background(), h1.Peerstore().PeerInfo(h1.ID())) + //require.NoError(t, err) + + hosts := make([]host.Host, 5) + relays := make([]*relay.WakuRelay, 5) + + for i := 0; i < 5; i++ { + relays[i], hosts[i], _ = makeWakuRelay(t, log) + err := relays[i].Start(ctx) + require.NoError(t, err) + } + + pm, eventBus := makePeerManagerWithEventBus(t, relays[0], &hosts[0]) pm.ctx = ctx pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) - // Add h2 peer - _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{""}, relay.WakuRelayID_v200) - require.NoError(t, err) + for i := 1; i < 5; i++ { + pm.host.Peerstore().AddAddrs(hosts[i].ID(), hosts[i].Addrs(), peerstore.PermanentAddrTTL) + err := pm.host.Connect(ctx, hosts[i].Peerstore().PeerInfo(hosts[i].ID())) + require.NoError(t, err) + err = pm.host.Peerstore().(wps.WakuPeerstore).SetDirection(hosts[i].ID(), network.DirOutbound) + require.NoError(t, err) - h1.Peerstore().AddAddr(h2.ID(), tests.GetHostAddress(h2), peerstore.PermanentAddrTTL) - err = h1.Peerstore().AddProtocols(h2.ID(), relay.WakuRelayID_v200) - require.NoError(t, err) + //_, err = relays[i].Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) + //require.NoError(t, err) - go pm.connectivityLoop(ctx) + } - pc, err := NewPeerConnectionStrategy(pm, 120*time.Second, pm.logger) - require.NoError(t, err) - err = pc.Start(ctx) - require.NoError(t, err) + go pm.connectivityLoop(ctx) + time.Sleep(2 * time.Second) for _, peer := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { log.Info("hosts initially", zap.String("peer", peer.String())) @@ -189,16 +216,14 @@ func 
TestHandlePeerTopicEvent(t *testing.T) { log.Info("existing topic details", zap.String("topic", topic.topic.String())) } - time.Sleep(4 * time.Second) - // Subscribe to Pubsub topic which also emits relay.PEER_JOINED - _, err = r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) + _, err := relays[0].Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) require.NoError(t, err) // peerEvt to find: relay.PEER_JOINED, relay.PEER_LEFT peerEvt := relay.EvtPeerTopic{ PubsubTopic: pubSubTopic, - PeerID: h1.ID(), + PeerID: hosts[1].ID(), State: relay.PEER_JOINED, } @@ -241,14 +266,17 @@ func TestHandlePeerTopicEvent(t *testing.T) { } // Evaluate topic health - unhealthy at first, because no peers connected - peerTopic := pm.subRelayTopics[peerEvt.PubsubTopic] + peerTopic := pm.subRelayTopics[pubSubTopic] //pm.checkAndUpdateTopicHealth(topic) require.Equal(t, TopicHealth(UnHealthy), peerTopic.healthStatus) + time.Sleep(2 * time.Second) + + ///// Two // peerEvt to find: relay.PEER_JOINED, relay.PEER_LEFT peerEvt2 := relay.EvtPeerTopic{ PubsubTopic: pubSubTopic, - PeerID: h2.ID(), + PeerID: hosts[2].ID(), State: relay.PEER_JOINED, } @@ -278,13 +306,103 @@ func TestHandlePeerTopicEvent(t *testing.T) { } // Evaluate topic health - unhealthy at first, because no peers connected - peerTopic = pm.subRelayTopics[peerEvt.PubsubTopic] + peerTopic = pm.subRelayTopics[pubSubTopic] + + for id := range peerTopic.topic.ListPeers() { + log.Info("peers", zap.String("ID", strconv.Itoa(id))) + } + + pm.checkAndUpdateTopicHealth(pm.subRelayTopics[pubSubTopic]) + require.Equal(t, TopicHealth(UnHealthy), peerTopic.healthStatus) + time.Sleep(2 * time.Second) + + ////////// Three + + // peerEvt to find: relay.PEER_JOINED, relay.PEER_LEFT + peerEvt3 := relay.EvtPeerTopic{ + PubsubTopic: pubSubTopic, + PeerID: hosts[3].ID(), + State: relay.PEER_JOINED, + } + + err = emitter.Emit(peerEvt3) + require.NoError(t, err) + + // Call the appropriate handler + select { + case e := <-pm.sub.Out(): + 
switch e := e.(type) { + case relay.EvtPeerTopic: + { + log.Info("Handling topic event...") + peerEvt := (relay.EvtPeerTopic)(e) + pm.handlerPeerTopicEvent(peerEvt) + for _, peer := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { + log.Info("hosts after", zap.String("id", peer.String())) + log.Info("peer", zap.String("connectedness", string(rune(pm.host.Network().Connectedness(peer))))) + } + } + default: + require.Fail(t, "unexpected event arrived") + } + + case <-ctx.Done(): + require.Fail(t, "closed channel") + } + + // Evaluate topic health - unhealthy at first, because no peers connected + peerTopic = pm.subRelayTopics[pubSubTopic] + + for id := range peerTopic.topic.ListPeers() { + log.Info("peers", zap.String("ID", strconv.Itoa(id))) + } + + pm.checkAndUpdateTopicHealth(pm.subRelayTopics[pubSubTopic]) + require.Equal(t, TopicHealth(UnHealthy), peerTopic.healthStatus) + + ///// Four + time.Sleep(2 * time.Second) + + // peerEvt to find: relay.PEER_JOINED, relay.PEER_LEFT + peerEvt4 := relay.EvtPeerTopic{ + PubsubTopic: pubSubTopic, + PeerID: hosts[4].ID(), + State: relay.PEER_JOINED, + } + + err = emitter.Emit(peerEvt4) + require.NoError(t, err) + + // Call the appropriate handler + select { + case e := <-pm.sub.Out(): + switch e := e.(type) { + case relay.EvtPeerTopic: + { + log.Info("Handling topic event...") + peerEvt := (relay.EvtPeerTopic)(e) + pm.handlerPeerTopicEvent(peerEvt) + for _, peer := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { + log.Info("hosts after", zap.String("id", peer.String())) + log.Info("peer", zap.String("connectedness", string(rune(pm.host.Network().Connectedness(peer))))) + } + } + default: + require.Fail(t, "unexpected event arrived") + } + + case <-ctx.Done(): + require.Fail(t, "closed channel") + } + + // Evaluate topic health - unhealthy at first, because no peers connected + peerTopic = pm.subRelayTopics[pubSubTopic] for id := range 
peerTopic.topic.ListPeers() { log.Info("peers", zap.String("ID", strconv.Itoa(id))) } - pm.checkAndUpdateTopicHealth(pm.subRelayTopics[peerEvt.PubsubTopic]) + pm.checkAndUpdateTopicHealth(pm.subRelayTopics[pubSubTopic]) require.Equal(t, TopicHealth(MinimallyHealthy), peerTopic.healthStatus) } From f74bb4ee74335316aa88ce306f75e72cd64d5676 Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 4 Mar 2024 10:14:57 +0800 Subject: [PATCH 14/29] fix: clean up TestHandlePeerTopicEvent --- .../peermanager/topic_event_handler_test.go | 208 ++++-------------- 1 file changed, 44 insertions(+), 164 deletions(-) diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index ac89c3c7a..26a5e64a9 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -150,33 +150,6 @@ func TestHandlePeerTopicEvent(t *testing.T) { pubSubTopic := "/waku/2/go/pm/test" ctx := context.Background() - //// Relay and Host1 - //r, h1, _ := makeWakuRelay(t, log) - //err := r.Start(ctx) - //require.NoError(t, err) - // - //// Relay and Host2 - //r2, h2, _ := makeWakuRelay(t, log) - //err = r2.Start(ctx) - //require.NoError(t, err) - - // Peermanager with event bus - //pm, eventBus := makePeerManagerWithEventBus(t, r, &h1) - //pm.ctx = ctx - //pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) - - // Add h2 peer - //_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{""}, relay.WakuRelayID_v200) - //require.NoError(t, err) - - //h1.Peerstore().AddAddrs(h2.ID(), h2.Addrs(), peerstore.PermanentAddrTTL) - //err = h1.Connect(context.Background(), h2.Peerstore().PeerInfo(h2.ID())) - //require.NoError(t, err) - // - //h2.Peerstore().AddAddrs(h1.ID(), h1.Addrs(), peerstore.PermanentAddrTTL) - //err = h2.Connect(context.Background(), h1.Peerstore().PeerInfo(h1.ID())) - //require.NoError(t, err) - hosts := make([]host.Host, 5) relays := make([]*relay.WakuRelay, 5) @@ -186,10 +159,12 @@ 
func TestHandlePeerTopicEvent(t *testing.T) { require.NoError(t, err) } + // Create peer manager instance with the first hosts pm, eventBus := makePeerManagerWithEventBus(t, relays[0], &hosts[0]) pm.ctx = ctx pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) + // Connect hosts[0] with all other hosts to reach 4 connections for i := 1; i < 5; i++ { pm.host.Peerstore().AddAddrs(hosts[i].ID(), hosts[i].Addrs(), peerstore.PermanentAddrTTL) err := pm.host.Connect(ctx, hosts[i].Peerstore().PeerInfo(hosts[i].ID())) @@ -197,26 +172,17 @@ func TestHandlePeerTopicEvent(t *testing.T) { err = pm.host.Peerstore().(wps.WakuPeerstore).SetDirection(hosts[i].ID(), network.DirOutbound) require.NoError(t, err) - //_, err = relays[i].Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) - //require.NoError(t, err) - } go pm.connectivityLoop(ctx) - time.Sleep(2 * time.Second) - - for _, peer := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { - log.Info("hosts initially", zap.String("peer", peer.String())) - log.Info("peer", zap.String("connectedness", string(rune(pm.host.Network().Connectedness(peer))))) - } + time.Sleep(2 * time.Second) - topic, ok := pm.subRelayTopics[pubSubTopic] - if ok { - log.Info("existing topic details", zap.String("topic", topic.topic.String())) + if len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic)) == 0 { + log.Info("No peers for the topic yet") } - // Subscribe to Pubsub topic which also emits relay.PEER_JOINED + // Subscribe to Pubsub topic _, err := relays[0].Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) require.NoError(t, err) @@ -233,7 +199,7 @@ func TestHandlePeerTopicEvent(t *testing.T) { err = emitter.Emit(peerEvt) require.NoError(t, err) - // Call the appropriate handler + // Process subscribe event and first PEER_JOINED event for i := 0; i < 2; i++ { select { case e := <-pm.sub.Out(): @@ -246,7 +212,6 @@ func TestHandlePeerTopicEvent(t *testing.T) { 
for _, peer := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { log.Info("hosts before", zap.String("peer", peer.String())) log.Info("peer", zap.String("connectedness", string(rune(pm.host.Network().Connectedness(peer))))) - } } @@ -266,143 +231,58 @@ func TestHandlePeerTopicEvent(t *testing.T) { } // Evaluate topic health - unhealthy at first, because no peers connected - peerTopic := pm.subRelayTopics[pubSubTopic] - //pm.checkAndUpdateTopicHealth(topic) - require.Equal(t, TopicHealth(UnHealthy), peerTopic.healthStatus) - time.Sleep(2 * time.Second) - - ///// Two - - // peerEvt to find: relay.PEER_JOINED, relay.PEER_LEFT - peerEvt2 := relay.EvtPeerTopic{ - PubsubTopic: pubSubTopic, - PeerID: hosts[2].ID(), - State: relay.PEER_JOINED, - } - - err = emitter.Emit(peerEvt2) - require.NoError(t, err) - - // Call the appropriate handler - select { - case e := <-pm.sub.Out(): - switch e := e.(type) { - case relay.EvtPeerTopic: - { - log.Info("Handling topic event...") - peerEvt := (relay.EvtPeerTopic)(e) - pm.handlerPeerTopicEvent(peerEvt) - for _, peer := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { - log.Info("hosts after", zap.String("id", peer.String())) - log.Info("peer", zap.String("connectedness", string(rune(pm.host.Network().Connectedness(peer))))) - } - } - default: - require.Fail(t, "unexpected event arrived") - } - - case <-ctx.Done(): - require.Fail(t, "closed channel") - } - - // Evaluate topic health - unhealthy at first, because no peers connected - peerTopic = pm.subRelayTopics[pubSubTopic] - - for id := range peerTopic.topic.ListPeers() { - log.Info("peers", zap.String("ID", strconv.Itoa(id))) + peerTopic, ok := pm.subRelayTopics[pubSubTopic] + if ok { + log.Info("New topic subscribed", zap.String("topic", peerTopic.topic.String())) } - - pm.checkAndUpdateTopicHealth(pm.subRelayTopics[pubSubTopic]) + pm.checkAndUpdateTopicHealth(peerTopic) require.Equal(t, 
TopicHealth(UnHealthy), peerTopic.healthStatus) time.Sleep(2 * time.Second) - ////////// Three - - // peerEvt to find: relay.PEER_JOINED, relay.PEER_LEFT - peerEvt3 := relay.EvtPeerTopic{ - PubsubTopic: pubSubTopic, - PeerID: hosts[3].ID(), - State: relay.PEER_JOINED, - } + // Process second to fourth PEER_JOINED event + for i := 2; i < 4; i++ { - err = emitter.Emit(peerEvt3) - require.NoError(t, err) - - // Call the appropriate handler - select { - case e := <-pm.sub.Out(): - switch e := e.(type) { - case relay.EvtPeerTopic: - { - log.Info("Handling topic event...") - peerEvt := (relay.EvtPeerTopic)(e) - pm.handlerPeerTopicEvent(peerEvt) - for _, peer := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { - log.Info("hosts after", zap.String("id", peer.String())) - log.Info("peer", zap.String("connectedness", string(rune(pm.host.Network().Connectedness(peer))))) - } - } - default: - require.Fail(t, "unexpected event arrived") + peerEvt2 := relay.EvtPeerTopic{ + PubsubTopic: pubSubTopic, + PeerID: hosts[i].ID(), + State: relay.PEER_JOINED, } - case <-ctx.Done(): - require.Fail(t, "closed channel") - } - - // Evaluate topic health - unhealthy at first, because no peers connected - peerTopic = pm.subRelayTopics[pubSubTopic] - - for id := range peerTopic.topic.ListPeers() { - log.Info("peers", zap.String("ID", strconv.Itoa(id))) - } - - pm.checkAndUpdateTopicHealth(pm.subRelayTopics[pubSubTopic]) - require.Equal(t, TopicHealth(UnHealthy), peerTopic.healthStatus) - - ///// Four - time.Sleep(2 * time.Second) - - // peerEvt to find: relay.PEER_JOINED, relay.PEER_LEFT - peerEvt4 := relay.EvtPeerTopic{ - PubsubTopic: pubSubTopic, - PeerID: hosts[4].ID(), - State: relay.PEER_JOINED, - } - - err = emitter.Emit(peerEvt4) - require.NoError(t, err) + err = emitter.Emit(peerEvt2) + require.NoError(t, err) - // Call the appropriate handler - select { - case e := <-pm.sub.Out(): - switch e := e.(type) { - case relay.EvtPeerTopic: - { - 
log.Info("Handling topic event...") - peerEvt := (relay.EvtPeerTopic)(e) - pm.handlerPeerTopicEvent(peerEvt) - for _, peer := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { - log.Info("hosts after", zap.String("id", peer.String())) - log.Info("peer", zap.String("connectedness", string(rune(pm.host.Network().Connectedness(peer))))) + // Call the appropriate handler + select { + case e := <-pm.sub.Out(): + switch e := e.(type) { + case relay.EvtPeerTopic: + { + log.Info("Handling topic event...") + peerEvt := (relay.EvtPeerTopic)(e) + pm.handlerPeerTopicEvent(peerEvt) + for _, peer := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { + log.Info("hosts after", zap.String("id", peer.String())) + log.Info("peer", zap.String("connectedness", string(rune(pm.host.Network().Connectedness(peer))))) + } } + default: + require.Fail(t, "unexpected event arrived") } - default: - require.Fail(t, "unexpected event arrived") + + case <-ctx.Done(): + require.Fail(t, "closed channel") } - case <-ctx.Done(): - require.Fail(t, "closed channel") + // Evaluate topic health - unhealthy at first, because D > #peers connected + pm.checkAndUpdateTopicHealth(peerTopic) + require.Equal(t, TopicHealth(UnHealthy), peerTopic.healthStatus) + time.Sleep(2 * time.Second) } - // Evaluate topic health - unhealthy at first, because no peers connected - peerTopic = pm.subRelayTopics[pubSubTopic] - + // Evaluate topic health - should reach minimal health - 4 peers connected for id := range peerTopic.topic.ListPeers() { - log.Info("peers", zap.String("ID", strconv.Itoa(id))) + log.Info("peers joined", zap.String("ID", strconv.Itoa(id))) } - - pm.checkAndUpdateTopicHealth(pm.subRelayTopics[pubSubTopic]) + pm.checkAndUpdateTopicHealth(peerTopic) require.Equal(t, TopicHealth(MinimallyHealthy), peerTopic.healthStatus) - } From 4b6f6d224390d64ffe5812db9f2b1f61e144347d Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 4 Mar 2024 16:13:19 
+0800 Subject: [PATCH 15/29] fix: add IDS peers info --- waku/v2/peermanager/topic_event_handler_test.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index 26a5e64a9..4e81a0939 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -164,7 +164,7 @@ func TestHandlePeerTopicEvent(t *testing.T) { pm.ctx = ctx pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) - // Connect hosts[0] with all other hosts to reach 4 connections + // Connect host[0] with all other hosts to reach 4 connections for i := 1; i < 5; i++ { pm.host.Peerstore().AddAddrs(hosts[i].ID(), hosts[i].Addrs(), peerstore.PermanentAddrTTL) err := pm.host.Connect(ctx, hosts[i].Peerstore().PeerInfo(hosts[i].ID())) @@ -284,5 +284,12 @@ func TestHandlePeerTopicEvent(t *testing.T) { log.Info("peers joined", zap.String("ID", strconv.Itoa(id))) } pm.checkAndUpdateTopicHealth(peerTopic) + + peersIn, peersOut := pm.getRelayPeers() + log.Info("IDS peers", zap.String("in ", strconv.Itoa(len(peersIn))), zap.String("out", strconv.Itoa(len(peersOut)))) + + notConnectedPeers := pm.getNotConnectedPers(pubSubTopic) + log.Info("IDS peers", zap.String("not connected", strconv.Itoa(len(notConnectedPeers)))) + require.Equal(t, TopicHealth(MinimallyHealthy), peerTopic.healthStatus) } From 9640a8c614935a6b2fe946afb65564f5af4aad64 Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 5 Mar 2024 15:20:15 +0800 Subject: [PATCH 16/29] fix: subscribe to topic from all peers --- waku/v2/peermanager/topic_event_handler_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index 4e81a0939..15d497ff5 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go 
@@ -183,8 +183,10 @@ func TestHandlePeerTopicEvent(t *testing.T) { } // Subscribe to Pubsub topic - _, err := relays[0].Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) - require.NoError(t, err) + for i := 0; i < 5; i++ { + _, err := relays[i].Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) + require.NoError(t, err) + } // peerEvt to find: relay.PEER_JOINED, relay.PEER_LEFT peerEvt := relay.EvtPeerTopic{ From ba2f8b9d1ef933806cbc561f8dba20e10b95c210 Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 8 Mar 2024 16:34:52 +0800 Subject: [PATCH 17/29] fix: simplify test handle peer topic event - check peers were added to topic related Wakustore - check connectedness --- .../peermanager/topic_event_handler_test.go | 138 +++++------------- 1 file changed, 36 insertions(+), 102 deletions(-) diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index 15d497ff5..ba4a5f14e 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -6,6 +6,7 @@ import ( "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" @@ -16,7 +17,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" - "strconv" "testing" "time" ) @@ -59,6 +59,18 @@ func makePeerManagerWithEventBus(t *testing.T, r *relay.WakuRelay, h *host.Host) return pm, relayEvtBus } +func emitTopicEvent(t *testing.T, pubSubTopic string, peerID peer.ID, emitter event.Emitter, state relay.PeerTopicState) { + + peerEvt := relay.EvtPeerTopic{ + PubsubTopic: pubSubTopic, + PeerID: peerID, + State: state, + } + + err := emitter.Emit(peerEvt) + require.NoError(t, err) +} + func TestSubscribeToRelayEvtBus(t 
*testing.T) { log := utils.Logger() @@ -174,124 +186,46 @@ func TestHandlePeerTopicEvent(t *testing.T) { } - go pm.connectivityLoop(ctx) - + // Wait for connections to settle time.Sleep(2 * time.Second) if len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic)) == 0 { log.Info("No peers for the topic yet") } - // Subscribe to Pubsub topic - for i := 0; i < 5; i++ { - _, err := relays[i].Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) - require.NoError(t, err) - } + // Subscribe to Pubsub topic on first host only + _, err := relays[0].Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) + require.NoError(t, err) - // peerEvt to find: relay.PEER_JOINED, relay.PEER_LEFT - peerEvt := relay.EvtPeerTopic{ - PubsubTopic: pubSubTopic, - PeerID: hosts[1].ID(), - State: relay.PEER_JOINED, - } + // Start event loop to listen to events + ctxEventLoop := context.Background() + go pm.peerEventLoop(ctxEventLoop) + // Prepare emitter emitter, err := eventBus.Emitter(new(relay.EvtPeerTopic)) require.NoError(t, err) - err = emitter.Emit(peerEvt) - require.NoError(t, err) - - // Process subscribe event and first PEER_JOINED event - for i := 0; i < 2; i++ { - select { - case e := <-pm.sub.Out(): - switch e := e.(type) { - case relay.EvtPeerTopic: - { - log.Info("Handling topic event...") - peerEvt := (relay.EvtPeerTopic)(e) - pm.handlerPeerTopicEvent(peerEvt) - for _, peer := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { - log.Info("hosts before", zap.String("peer", peer.String())) - log.Info("peer", zap.String("connectedness", string(rune(pm.host.Network().Connectedness(peer))))) - } - - } - case relay.EvtRelaySubscribed: - { - log.Info("Handling subscribe event...") - eventDetails := (relay.EvtRelaySubscribed)(e) - pm.handleNewRelayTopicSubscription(eventDetails.Topic, eventDetails.TopicInst) - } - default: - require.Fail(t, "unexpected event arrived") - } - - case <-ctx.Done(): - require.Fail(t, "closed 
channel") - } - } - - // Evaluate topic health - unhealthy at first, because no peers connected - peerTopic, ok := pm.subRelayTopics[pubSubTopic] - if ok { - log.Info("New topic subscribed", zap.String("topic", peerTopic.topic.String())) + // Send PEER_JOINED events for hosts 2-5 + for i := 1; i < 5; i++ { + emitTopicEvent(t, pubSubTopic, hosts[i].ID(), emitter, relay.PEER_JOINED) + time.Sleep(100 * time.Millisecond) } - pm.checkAndUpdateTopicHealth(peerTopic) - require.Equal(t, TopicHealth(UnHealthy), peerTopic.healthStatus) - time.Sleep(2 * time.Second) - - // Process second to fourth PEER_JOINED event - for i := 2; i < 4; i++ { - peerEvt2 := relay.EvtPeerTopic{ - PubsubTopic: pubSubTopic, - PeerID: hosts[i].ID(), - State: relay.PEER_JOINED, - } - - err = emitter.Emit(peerEvt2) - require.NoError(t, err) - - // Call the appropriate handler - select { - case e := <-pm.sub.Out(): - switch e := e.(type) { - case relay.EvtPeerTopic: - { - log.Info("Handling topic event...") - peerEvt := (relay.EvtPeerTopic)(e) - pm.handlerPeerTopicEvent(peerEvt) - for _, peer := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { - log.Info("hosts after", zap.String("id", peer.String())) - log.Info("peer", zap.String("connectedness", string(rune(pm.host.Network().Connectedness(peer))))) - } - } - default: - require.Fail(t, "unexpected event arrived") - } + // Check four hosts have joined the topic + require.Equal(t, 4, len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic))) - case <-ctx.Done(): - require.Fail(t, "closed channel") - } - - // Evaluate topic health - unhealthy at first, because D > #peers connected - pm.checkAndUpdateTopicHealth(peerTopic) - require.Equal(t, TopicHealth(UnHealthy), peerTopic.healthStatus) - time.Sleep(2 * time.Second) + // Check all hosts have been connected + for _, peer := range pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic) { + require.Equal(t, network.Connected, 
pm.host.Network().Connectedness(peer)) } - // Evaluate topic health - should reach minimal health - 4 peers connected - for id := range peerTopic.topic.ListPeers() { - log.Info("peers joined", zap.String("ID", strconv.Itoa(id))) + // Send PEER_LEFT events for hosts 2-5 + for i := 1; i < 5; i++ { + emitTopicEvent(t, pubSubTopic, hosts[i].ID(), emitter, relay.PEER_LEFT) + time.Sleep(100 * time.Millisecond) } - pm.checkAndUpdateTopicHealth(peerTopic) - - peersIn, peersOut := pm.getRelayPeers() - log.Info("IDS peers", zap.String("in ", strconv.Itoa(len(peersIn))), zap.String("out", strconv.Itoa(len(peersOut)))) - notConnectedPeers := pm.getNotConnectedPers(pubSubTopic) - log.Info("IDS peers", zap.String("not connected", strconv.Itoa(len(notConnectedPeers)))) + // Check all hosts have left the topic + require.Equal(t, 0, len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic))) - require.Equal(t, TopicHealth(MinimallyHealthy), peerTopic.healthStatus) } From b70a0be6eae937eacf1e2add327d7c26e6d3a192 Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 8 Mar 2024 16:45:41 +0800 Subject: [PATCH 18/29] fix: simplify test handle relay topic subscription --- .../peermanager/topic_event_handler_test.go | 43 ++++--------------- 1 file changed, 9 insertions(+), 34 deletions(-) diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index ba4a5f14e..5b751d5a1 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -100,31 +100,20 @@ func TestHandleRelayTopicSubscription(t *testing.T) { err := r.Start(ctx) require.NoError(t, err) - // Peermanager with event bus + // Peer manager with event bus pm, _ := makePeerManagerWithEventBus(t, r, &h1) pm.ctx = ctx - go pm.connectivityLoop(ctx) + + // Start event loop to listen to events + ctxEventLoop := context.Background() + go pm.peerEventLoop(ctxEventLoop) // Subscribe to Pubsub topic _, err = 
r.Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) require.NoError(t, err) - // Call the appropriate handler - select { - case e := <-pm.sub.Out(): - switch e := e.(type) { - case relay.EvtRelaySubscribed: - { - eventDetails := (relay.EvtRelaySubscribed)(e) - pm.handleNewRelayTopicSubscription(eventDetails.Topic, eventDetails.TopicInst) - } - default: - require.Fail(t, "unexpected event arrived") - } - - case <-ctx.Done(): - require.Fail(t, "closed channel") - } + // Wait for event loop to call handler + time.Sleep(100 * time.Millisecond) // Check Peer Manager knows about the topic _, ok := pm.subRelayTopics[pubSubTopic] @@ -134,22 +123,8 @@ func TestHandleRelayTopicSubscription(t *testing.T) { err = r.Unsubscribe(ctx, protocol.NewContentFilter(pubSubTopic)) require.NoError(t, err) - // Call the appropriate handler - select { - case e := <-pm.sub.Out(): - switch e := e.(type) { - case relay.EvtRelayUnsubscribed: - { - eventDetails := (relay.EvtRelayUnsubscribed)(e) - pm.handleNewRelayTopicUnSubscription(eventDetails.Topic) - } - default: - require.Fail(t, "unexpected event arrived") - } - - case <-ctx.Done(): - require.Fail(t, "closed channel") - } + // Wait for event loop to call handler + time.Sleep(100 * time.Millisecond) // Check the original topic was removed from Peer Manager _, ok = pm.subRelayTopics[pubSubTopic] From 7e5138be15b31446b79e2ea77c4c73daf3fe58af Mon Sep 17 00:00:00 2001 From: Roman Date: Fri, 8 Mar 2024 17:03:32 +0800 Subject: [PATCH 19/29] fix: close event loop gracefully - TestHandleRelayTopicSubscription --- waku/v2/peermanager/topic_event_handler_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index 5b751d5a1..de6705d4d 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -105,7 +105,8 @@ func TestHandleRelayTopicSubscription(t *testing.T) { 
pm.ctx = ctx // Start event loop to listen to events - ctxEventLoop := context.Background() + ctxEventLoop, cancel := context.WithCancel(context.Background()) + defer cancel() go pm.peerEventLoop(ctxEventLoop) // Subscribe to Pubsub topic @@ -113,7 +114,7 @@ func TestHandleRelayTopicSubscription(t *testing.T) { require.NoError(t, err) // Wait for event loop to call handler - time.Sleep(100 * time.Millisecond) + time.Sleep(200 * time.Millisecond) // Check Peer Manager knows about the topic _, ok := pm.subRelayTopics[pubSubTopic] @@ -124,12 +125,13 @@ func TestHandleRelayTopicSubscription(t *testing.T) { require.NoError(t, err) // Wait for event loop to call handler - time.Sleep(100 * time.Millisecond) + time.Sleep(200 * time.Millisecond) // Check the original topic was removed from Peer Manager _, ok = pm.subRelayTopics[pubSubTopic] require.False(t, ok) + r.Stop() } func TestHandlePeerTopicEvent(t *testing.T) { From 621b5e1f2876396abffec777064d1613f4bf7869 Mon Sep 17 00:00:00 2001 From: Roman Date: Sat, 9 Mar 2024 09:44:40 +0800 Subject: [PATCH 20/29] fix: prevent data race at TestHandleRelayTopicSubscription --- waku/v2/peermanager/topic_event_handler_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index de6705d4d..57f3c39ec 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -117,8 +117,10 @@ func TestHandleRelayTopicSubscription(t *testing.T) { time.Sleep(200 * time.Millisecond) // Check Peer Manager knows about the topic + pm.topicMutex.RLock() _, ok := pm.subRelayTopics[pubSubTopic] require.True(t, ok) + pm.topicMutex.RUnlock() // UnSubscribe from Pubsub topic err = r.Unsubscribe(ctx, protocol.NewContentFilter(pubSubTopic)) @@ -128,8 +130,10 @@ func TestHandleRelayTopicSubscription(t *testing.T) { time.Sleep(200 * time.Millisecond) // Check the original topic was removed from Peer 
Manager + pm.topicMutex.RLock() _, ok = pm.subRelayTopics[pubSubTopic] require.False(t, ok) + pm.topicMutex.RUnlock() r.Stop() } From 1faed8bf16585f2af9ef22aa71e2d2b92aa074c0 Mon Sep 17 00:00:00 2001 From: Roman Date: Sat, 9 Mar 2024 16:21:37 +0800 Subject: [PATCH 21/29] fix: ineffectual assignment to err --- waku/v2/peermanager/connection_gater_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/waku/v2/peermanager/connection_gater_test.go b/waku/v2/peermanager/connection_gater_test.go index 26a85671b..17f74f462 100644 --- a/waku/v2/peermanager/connection_gater_test.go +++ b/waku/v2/peermanager/connection_gater_test.go @@ -68,6 +68,7 @@ func TestConnectionGater(t *testing.T) { require.True(t, allow) ip, err := manet.ToIP(remoteAddr1) + require.NoError(t, err) connGater.limiter[ip.String()] = 3 // Above the connection limit From e133ffba39622a78a4484ea82b5b82185939572a Mon Sep 17 00:00:00 2001 From: Roman Date: Mon, 11 Mar 2024 21:51:14 +0800 Subject: [PATCH 22/29] fix: lower data amount used at ValidPayloads tests for filter --- waku/v2/protocol/filter/filter_push_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/waku/v2/protocol/filter/filter_push_test.go b/waku/v2/protocol/filter/filter_push_test.go index e5bc530f3..126e95608 100644 --- a/waku/v2/protocol/filter/filter_push_test.go +++ b/waku/v2/protocol/filter/filter_push_test.go @@ -16,7 +16,7 @@ func (s *FilterTestSuite) TestValidPayloadsASCII() { s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) // Prepare data - messages := prepareData(100, false, false, true, tests.GenerateRandomASCIIString) + messages := prepareData(50, false, false, true, tests.GenerateRandomASCIIString) // All messages should be received s.waitForMessages(func() { @@ -34,7 +34,7 @@ func (s *FilterTestSuite) TestValidPayloadsUTF8() { s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) // Prepare data - messages := prepareData(100, 
false, false, true, tests.GenerateRandomUTF8String) + messages := prepareData(50, false, false, true, tests.GenerateRandomUTF8String) // All messages should be received s.waitForMessages(func() { @@ -52,7 +52,7 @@ func (s *FilterTestSuite) TestValidPayloadsBase64() { s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) // Prepare data - messages := prepareData(100, false, false, true, tests.GenerateRandomBase64String) + messages := prepareData(50, false, false, true, tests.GenerateRandomBase64String) // All messages should be received s.waitForMessages(func() { @@ -70,7 +70,7 @@ func (s *FilterTestSuite) TestValidPayloadsJSON() { s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) // Prepare data - messages := prepareData(100, false, false, true, tests.GenerateRandomJSONString) + messages := prepareData(50, false, false, true, tests.GenerateRandomJSONString) // All messages should be received s.waitForMessages(func() { @@ -88,7 +88,7 @@ func (s *FilterTestSuite) TestValidPayloadsURLEncoded() { s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) // Prepare data - messages := prepareData(100, false, false, true, tests.GenerateRandomURLEncodedString) + messages := prepareData(50, false, false, true, tests.GenerateRandomURLEncodedString) // All messages should be received s.waitForMessages(func() { @@ -106,7 +106,7 @@ func (s *FilterTestSuite) TestValidPayloadsSQL() { s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) // Prepare data - messages := prepareData(100, false, false, true, tests.GenerateRandomSQLInsert) + messages := prepareData(50, false, false, true, tests.GenerateRandomSQLInsert) // All messages should be received s.waitForMessages(func() { From 6b2d7435dedf3a0014b67e4d17eacdc149bea076 Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 12 Mar 2024 13:08:34 +0800 Subject: [PATCH 23/29] fix: simplify emitTopicEvent() --- 
waku/v2/peermanager/topic_event_handler_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index 57f3c39ec..14ee21055 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -59,7 +59,7 @@ func makePeerManagerWithEventBus(t *testing.T, r *relay.WakuRelay, h *host.Host) return pm, relayEvtBus } -func emitTopicEvent(t *testing.T, pubSubTopic string, peerID peer.ID, emitter event.Emitter, state relay.PeerTopicState) { +func emitTopicEvent(pubSubTopic string, peerID peer.ID, emitter event.Emitter, state relay.PeerTopicState) error { peerEvt := relay.EvtPeerTopic{ PubsubTopic: pubSubTopic, @@ -67,8 +67,7 @@ func emitTopicEvent(t *testing.T, pubSubTopic string, peerID peer.ID, emitter ev State: state, } - err := emitter.Emit(peerEvt) - require.NoError(t, err) + return emitter.Emit(peerEvt) } func TestSubscribeToRelayEvtBus(t *testing.T) { @@ -188,7 +187,8 @@ func TestHandlePeerTopicEvent(t *testing.T) { // Send PEER_JOINED events for hosts 2-5 for i := 1; i < 5; i++ { - emitTopicEvent(t, pubSubTopic, hosts[i].ID(), emitter, relay.PEER_JOINED) + err = emitTopicEvent(pubSubTopic, hosts[i].ID(), emitter, relay.PEER_JOINED) + require.NoError(t, err) time.Sleep(100 * time.Millisecond) } @@ -202,7 +202,8 @@ func TestHandlePeerTopicEvent(t *testing.T) { // Send PEER_LEFT events for hosts 2-5 for i := 1; i < 5; i++ { - emitTopicEvent(t, pubSubTopic, hosts[i].ID(), emitter, relay.PEER_LEFT) + err = emitTopicEvent(pubSubTopic, hosts[i].ID(), emitter, relay.PEER_LEFT) + require.NoError(t, err) time.Sleep(100 * time.Millisecond) } From d6fb227372a299eeec1c27655b10581fbdf53934 Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 12 Mar 2024 19:35:00 +0800 Subject: [PATCH 24/29] Revert "fix: lower data amount used at ValidPayloads tests for filter" This reverts commit 
e133ffba39622a78a4484ea82b5b82185939572a. --- waku/v2/protocol/filter/filter_push_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/waku/v2/protocol/filter/filter_push_test.go b/waku/v2/protocol/filter/filter_push_test.go index 126e95608..e5bc530f3 100644 --- a/waku/v2/protocol/filter/filter_push_test.go +++ b/waku/v2/protocol/filter/filter_push_test.go @@ -16,7 +16,7 @@ func (s *FilterTestSuite) TestValidPayloadsASCII() { s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) // Prepare data - messages := prepareData(50, false, false, true, tests.GenerateRandomASCIIString) + messages := prepareData(100, false, false, true, tests.GenerateRandomASCIIString) // All messages should be received s.waitForMessages(func() { @@ -34,7 +34,7 @@ func (s *FilterTestSuite) TestValidPayloadsUTF8() { s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) // Prepare data - messages := prepareData(50, false, false, true, tests.GenerateRandomUTF8String) + messages := prepareData(100, false, false, true, tests.GenerateRandomUTF8String) // All messages should be received s.waitForMessages(func() { @@ -52,7 +52,7 @@ func (s *FilterTestSuite) TestValidPayloadsBase64() { s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) // Prepare data - messages := prepareData(50, false, false, true, tests.GenerateRandomBase64String) + messages := prepareData(100, false, false, true, tests.GenerateRandomBase64String) // All messages should be received s.waitForMessages(func() { @@ -70,7 +70,7 @@ func (s *FilterTestSuite) TestValidPayloadsJSON() { s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) // Prepare data - messages := prepareData(50, false, false, true, tests.GenerateRandomJSONString) + messages := prepareData(100, false, false, true, tests.GenerateRandomJSONString) // All messages should be received s.waitForMessages(func() { @@ -88,7 +88,7 @@ 
func (s *FilterTestSuite) TestValidPayloadsURLEncoded() { s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) // Prepare data - messages := prepareData(50, false, false, true, tests.GenerateRandomURLEncodedString) + messages := prepareData(100, false, false, true, tests.GenerateRandomURLEncodedString) // All messages should be received s.waitForMessages(func() { @@ -106,7 +106,7 @@ func (s *FilterTestSuite) TestValidPayloadsSQL() { s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) // Prepare data - messages := prepareData(50, false, false, true, tests.GenerateRandomSQLInsert) + messages := prepareData(100, false, false, true, tests.GenerateRandomSQLInsert) // All messages should be received s.waitForMessages(func() { From dfdde56fd61209c567132fa067088db373bd2193 Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 12 Mar 2024 19:54:28 +0800 Subject: [PATCH 25/29] fix: add test for uniqueness to TestServiceSlot --- waku/v2/peermanager/service_slot_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/waku/v2/peermanager/service_slot_test.go b/waku/v2/peermanager/service_slot_test.go index 9691d9e37..80687bbfb 100644 --- a/waku/v2/peermanager/service_slot_test.go +++ b/waku/v2/peermanager/service_slot_test.go @@ -41,6 +41,9 @@ func TestServiceSlot(t *testing.T) { require.NoError(t, err) require.Equal(t, 2, len(maps.Keys(fetchedPeers))) + // Check for uniqueness + require.NotEqual(t, maps.Keys(fetchedPeers)[0], maps.Keys(fetchedPeers)[1]) + slots.getPeers(protocol).remove(peerID2) fetchedPeers, err = slots.getPeers(protocol).getRandom(10) From 6b94d07b746485c07ac6f75948a40efc162d04c9 Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 12 Mar 2024 20:44:42 +0800 Subject: [PATCH 26/29] test: subscribe all hosts experiment --- .../peermanager/topic_event_handler_test.go | 49 ++++++++++--------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/waku/v2/peermanager/topic_event_handler_test.go 
b/waku/v2/peermanager/topic_event_handler_test.go index 14ee21055..2986aea8d 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -152,7 +152,7 @@ func TestHandlePeerTopicEvent(t *testing.T) { } // Create peer manager instance with the first hosts - pm, eventBus := makePeerManagerWithEventBus(t, relays[0], &hosts[0]) + pm, _ := makePeerManagerWithEventBus(t, relays[0], &hosts[0]) pm.ctx = ctx pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) @@ -173,24 +173,29 @@ func TestHandlePeerTopicEvent(t *testing.T) { log.Info("No peers for the topic yet") } - // Subscribe to Pubsub topic on first host only - _, err := relays[0].Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) - require.NoError(t, err) + for i := 1; i < 5; i++ { + // Subscribe to Pubsub topic on first host only + _, err := relays[i].Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) + require.NoError(t, err) + } // Start event loop to listen to events ctxEventLoop := context.Background() go pm.peerEventLoop(ctxEventLoop) - // Prepare emitter - emitter, err := eventBus.Emitter(new(relay.EvtPeerTopic)) - require.NoError(t, err) + //// Prepare emitter + //emitter, err := eventBus.Emitter(new(relay.EvtPeerTopic)) + //require.NoError(t, err) - // Send PEER_JOINED events for hosts 2-5 - for i := 1; i < 5; i++ { - err = emitTopicEvent(pubSubTopic, hosts[i].ID(), emitter, relay.PEER_JOINED) - require.NoError(t, err) - time.Sleep(100 * time.Millisecond) - } + //// Send PEER_JOINED events for hosts 2-5 + //for i := 1; i < 5; i++ { + // err = emitTopicEvent(pubSubTopic, hosts[i].ID(), emitter, relay.PEER_JOINED) + // require.NoError(t, err) + // time.Sleep(100 * time.Millisecond) + //} + + // Wait for connections to settle + time.Sleep(30 * time.Second) // Check four hosts have joined the topic require.Equal(t, 4, len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic))) @@ -200,14 +205,14 @@ func 
TestHandlePeerTopicEvent(t *testing.T) { require.Equal(t, network.Connected, pm.host.Network().Connectedness(peer)) } - // Send PEER_LEFT events for hosts 2-5 - for i := 1; i < 5; i++ { - err = emitTopicEvent(pubSubTopic, hosts[i].ID(), emitter, relay.PEER_LEFT) - require.NoError(t, err) - time.Sleep(100 * time.Millisecond) - } - - // Check all hosts have left the topic - require.Equal(t, 0, len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic))) + //// Send PEER_LEFT events for hosts 2-5 + //for i := 1; i < 5; i++ { + // err = emitTopicEvent(pubSubTopic, hosts[i].ID(), emitter, relay.PEER_LEFT) + // require.NoError(t, err) + // time.Sleep(100 * time.Millisecond) + //} + // + //// Check all hosts have left the topic + //require.Equal(t, 0, len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic))) } From 0372719e4713f5969ded5060c4974acb7460be97 Mon Sep 17 00:00:00 2001 From: Roman Date: Wed, 13 Mar 2024 18:29:15 +0800 Subject: [PATCH 27/29] test: experiment - start event loop before subscribe --- waku/v2/peermanager/topic_event_handler_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index 2986aea8d..d5acc1042 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -173,16 +173,16 @@ func TestHandlePeerTopicEvent(t *testing.T) { log.Info("No peers for the topic yet") } + // Start event loop to listen to events + ctxEventLoop := context.Background() + go pm.peerEventLoop(ctxEventLoop) + for i := 1; i < 5; i++ { // Subscribe to Pubsub topic on first host only _, err := relays[i].Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) require.NoError(t, err) } - // Start event loop to listen to events - ctxEventLoop := context.Background() - go pm.peerEventLoop(ctxEventLoop) - //// Prepare emitter //emitter, err := 
eventBus.Emitter(new(relay.EvtPeerTopic)) //require.NoError(t, err) From ce77fd652e55471e322db8e4ffb01c4b98768cad Mon Sep 17 00:00:00 2001 From: Roman Date: Thu, 14 Mar 2024 20:36:39 +0800 Subject: [PATCH 28/29] fix: restore TestHandlePeerTopicEvent to limit its scope to pm events only --- .../peermanager/topic_event_handler_test.go | 47 +++++++++---------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index d5acc1042..14ee21055 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -152,7 +152,7 @@ func TestHandlePeerTopicEvent(t *testing.T) { } // Create peer manager instance with the first hosts - pm, _ := makePeerManagerWithEventBus(t, relays[0], &hosts[0]) + pm, eventBus := makePeerManagerWithEventBus(t, relays[0], &hosts[0]) pm.ctx = ctx pm.RegisterWakuProtocol(relay.WakuRelayID_v200, relay.WakuRelayENRField) @@ -173,30 +173,25 @@ func TestHandlePeerTopicEvent(t *testing.T) { log.Info("No peers for the topic yet") } + // Subscribe to Pubsub topic on first host only + _, err := relays[0].Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) + require.NoError(t, err) + // Start event loop to listen to events ctxEventLoop := context.Background() go pm.peerEventLoop(ctxEventLoop) + // Prepare emitter + emitter, err := eventBus.Emitter(new(relay.EvtPeerTopic)) + require.NoError(t, err) + + // Send PEER_JOINED events for hosts 2-5 for i := 1; i < 5; i++ { - // Subscribe to Pubsub topic on first host only - _, err := relays[i].Subscribe(ctx, protocol.NewContentFilter(pubSubTopic)) + err = emitTopicEvent(pubSubTopic, hosts[i].ID(), emitter, relay.PEER_JOINED) require.NoError(t, err) + time.Sleep(100 * time.Millisecond) } - //// Prepare emitter - //emitter, err := eventBus.Emitter(new(relay.EvtPeerTopic)) - //require.NoError(t, err) - - //// Send PEER_JOINED events for hosts 2-5 - //for i := 
1; i < 5; i++ { - // err = emitTopicEvent(pubSubTopic, hosts[i].ID(), emitter, relay.PEER_JOINED) - // require.NoError(t, err) - // time.Sleep(100 * time.Millisecond) - //} - - // Wait for connections to settle - time.Sleep(30 * time.Second) - // Check four hosts have joined the topic require.Equal(t, 4, len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic))) @@ -205,14 +200,14 @@ func TestHandlePeerTopicEvent(t *testing.T) { require.Equal(t, network.Connected, pm.host.Network().Connectedness(peer)) } - //// Send PEER_LEFT events for hosts 2-5 - //for i := 1; i < 5; i++ { - // err = emitTopicEvent(pubSubTopic, hosts[i].ID(), emitter, relay.PEER_LEFT) - // require.NoError(t, err) - // time.Sleep(100 * time.Millisecond) - //} - // - //// Check all hosts have left the topic - //require.Equal(t, 0, len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic))) + // Send PEER_LEFT events for hosts 2-5 + for i := 1; i < 5; i++ { + err = emitTopicEvent(pubSubTopic, hosts[i].ID(), emitter, relay.PEER_LEFT) + require.NoError(t, err) + time.Sleep(100 * time.Millisecond) + } + + // Check all hosts have left the topic + require.Equal(t, 0, len(pm.host.Peerstore().(*wps.WakuPeerstoreImpl).PeersByPubSubTopic(pubSubTopic))) } From 86fbbff3c5c4e89fa069735c719f94a2a8e08f4a Mon Sep 17 00:00:00 2001 From: Roman Date: Sat, 16 Mar 2024 09:15:02 +0800 Subject: [PATCH 29/29] fix: update NewPeerManager() call to include metadata --- waku/v2/peermanager/topic_event_handler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/waku/v2/peermanager/topic_event_handler_test.go b/waku/v2/peermanager/topic_event_handler_test.go index 14ee21055..8fc8da946 100644 --- a/waku/v2/peermanager/topic_event_handler_test.go +++ b/waku/v2/peermanager/topic_event_handler_test.go @@ -44,7 +44,7 @@ func makeWakuRelay(t *testing.T, log *zap.Logger) (*relay.WakuRelay, host.Host, func makePeerManagerWithEventBus(t *testing.T, r 
*relay.WakuRelay, h *host.Host) (*PeerManager, event.Bus) { // Host 1 used by peer manager - pm := NewPeerManager(10, 20, utils.Logger()) + pm := NewPeerManager(10, 20, nil, utils.Logger()) pm.SetHost(*h) // Create a new relay event bus @@ -77,7 +77,7 @@ func TestSubscribeToRelayEvtBus(t *testing.T) { r, h1, _ := makeWakuRelay(t, log) // Host 1 used by peer manager - pm := NewPeerManager(10, 20, utils.Logger()) + pm := NewPeerManager(10, 20, nil, utils.Logger()) pm.SetHost(h1) // Create a new relay event bus