Skip to content

Commit

Permalink
feat: shard based filtering in peer exchange (#1194)
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem authored Aug 15, 2024
1 parent 3b2cde8 commit bc16c74
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
34 changes: 33 additions & 1 deletion waku/v2/protocol/peer_exchange/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"time"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
"github.com/waku-org/go-waku/waku/v2/service"
"go.uber.org/zap"
Expand Down Expand Up @@ -155,8 +157,38 @@ func (wakuPX *WakuPeerExchange) Stop() {
})
}

func (wakuPX *WakuPeerExchange) DefaultPredicate() discv5.Predicate {
return discv5.FilterPredicate(func(n *enode.Node) bool {
localRS, err := wenr.RelaySharding(wakuPX.disc.Node().Record())
if err != nil {
return false
}

if localRS == nil { // No shard registered, so no need to check for shards
return true
}

nodeRS, err := wenr.RelaySharding(n.Record())
if err != nil {
wakuPX.log.Debug("failed to get relay shards from node record", logging.ENode("node", n), zap.Error(err))
return false
}

if nodeRS == nil {
// Node has no shards registered.
return false
}

if nodeRS.ClusterID != localRS.ClusterID {
return false
}

return true
})
}

func (wakuPX *WakuPeerExchange) iterate(ctx context.Context) error {
iterator, err := wakuPX.disc.PeerIterator()
iterator, err := wakuPX.disc.PeerIterator(wakuPX.DefaultPredicate())
if err != nil {
return fmt.Errorf("obtaining iterator: %w", err)
}
Expand Down
4 changes: 4 additions & 0 deletions waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
ip1, _ := tests.ExtractIP(host1.Addrs()[0])
l1, err := tests.NewLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger())
require.NoError(t, err)
err = wenr.Update(utils.Logger(), l1, wenr.WithWakuRelaySharding(protocol.RelayShards{ClusterID: 16, ShardIDs: []uint16{32}}))
require.NoError(t, err)
discv5PeerConn1 := discv5.NewTestPeerDiscoverer()
d1, err := discv5.NewDiscoveryV5(prvKey1, l1, discv5PeerConn1, prometheus.DefaultRegisterer, utils.Logger(), discv5.WithUDPPort(uint(udpPort1)))
require.NoError(t, err)
Expand All @@ -69,6 +71,8 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
require.NoError(t, err)
l2, err := tests.NewLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger())
require.NoError(t, err)
err = wenr.Update(utils.Logger(), l2, wenr.WithWakuRelaySharding(protocol.RelayShards{ClusterID: 16, ShardIDs: []uint16{32}}))
require.NoError(t, err)
discv5PeerConn2 := discv5.NewTestPeerDiscoverer()
d2, err := discv5.NewDiscoveryV5(prvKey2, l2, discv5PeerConn2, prometheus.DefaultRegisterer, utils.Logger(), discv5.WithUDPPort(uint(udpPort2)), discv5.WithBootnodes([]*enode.Node{d1.Node()}))
require.NoError(t, err)
Expand Down

0 comments on commit bc16c74

Please sign in to comment.