From bc16c74f2ec9642ff961bf30ebeeca4f3dbf5e87 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 15 Aug 2024 07:27:56 +0530 Subject: [PATCH] feat: shard based filtering in peer exchange (#1194) --- waku/v2/protocol/peer_exchange/protocol.go | 34 ++++++++++++++++++- .../peer_exchange/waku_peer_exchange_test.go | 4 +++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/waku/v2/protocol/peer_exchange/protocol.go b/waku/v2/protocol/peer_exchange/protocol.go index 08e5051ca..3f33b2ec8 100644 --- a/waku/v2/protocol/peer_exchange/protocol.go +++ b/waku/v2/protocol/peer_exchange/protocol.go @@ -7,6 +7,7 @@ import ( "math" "time" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" @@ -17,6 +18,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/enr" + wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb" "github.com/waku-org/go-waku/waku/v2/service" "go.uber.org/zap" @@ -155,8 +157,38 @@ func (wakuPX *WakuPeerExchange) Stop() { }) } +func (wakuPX *WakuPeerExchange) DefaultPredicate() discv5.Predicate { + return discv5.FilterPredicate(func(n *enode.Node) bool { + localRS, err := wenr.RelaySharding(wakuPX.disc.Node().Record()) + if err != nil { + return false + } + + if localRS == nil { // No shard registered, so no need to check for shards + return true + } + + nodeRS, err := wenr.RelaySharding(n.Record()) + if err != nil { + wakuPX.log.Debug("failed to get relay shards from node record", logging.ENode("node", n), zap.Error(err)) + return false + } + + if nodeRS == nil { + // Node has no shards registered. + return false + } + + if nodeRS.ClusterID != localRS.ClusterID { + return false + } + + return true + }) +} + func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) error { - iterator, err := wakuPX.disc.PeerIterator() + iterator, err := wakuPX.disc.PeerIterator(wakuPX.DefaultPredicate()) if err != nil { return fmt.Errorf("obtaining iterator: %w", err) } diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go index 0078aaa19..b0de6c5ab 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go @@ -57,6 +57,8 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { ip1, _ := tests.ExtractIP(host1.Addrs()[0]) l1, err := tests.NewLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) require.NoError(t, err) + err = wenr.Update(utils.Logger(), l1, wenr.WithWakuRelaySharding(protocol.RelayShards{ClusterID: 16, ShardIDs: []uint16{32}})) + require.NoError(t, err) discv5PeerConn1 := discv5.NewTestPeerDiscoverer() d1, err := discv5.NewDiscoveryV5(prvKey1, l1, discv5PeerConn1, prometheus.DefaultRegisterer, utils.Logger(), discv5.WithUDPPort(uint(udpPort1))) require.NoError(t, err) @@ -69,6 +71,8 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) { require.NoError(t, err) l2, err := tests.NewLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger()) require.NoError(t, err) + err = wenr.Update(utils.Logger(), l2, wenr.WithWakuRelaySharding(protocol.RelayShards{ClusterID: 16, ShardIDs: []uint16{32}})) + require.NoError(t, err) discv5PeerConn2 := discv5.NewTestPeerDiscoverer() d2, err := discv5.NewDiscoveryV5(prvKey2, l2, discv5PeerConn2, prometheus.DefaultRegisterer, utils.Logger(), discv5.WithUDPPort(uint(udpPort2)), discv5.WithBootnodes([]*enode.Node{d1.Node()})) require.NoError(t, err)