Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v2 Peer Diversity Filter #953

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions v2/diversity_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package dht

import (
"sync"

"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/plprobelab/go-libdht/kad/triert"
)

var _ triert.NodeFilter[kadt.Key, kadt.PeerID] = (*TrieRTPeerDiversityFilter)(nil)

// TrieRTPeerDiversityFilter is a wrapper around the `peerdiversity.Filter` used
// as `triert.NodeFilter` to configure the diversity filter for the TrieRT
// Routing Table. TrieRTPeerDiversityFilter should be provided as in the TrieRT
// config, and is not applied directly on the `DHT` instance.
// Please see the docs for `peerdiversity.Filter` for more details
type TrieRTPeerDiversityFilter struct {
*peerdiversity.Filter
}

// NewRTPeerDiversityFilter constructs the `TrieRTPeerDiversityFilter` defining
// the diversity filter for the TrieRT Routing Table.
// `maxPerCpl` represents the maximum number of peers per common prefix length
// allowed to share the same /16 IP group.
// `maxForTable` represents the maximum number of peers in the routing table
// allowed to share the same /16 IP group.
func NewRTPeerDiversityFilter(h host.Host, maxPerCpl, maxForTable int) (*TrieRTPeerDiversityFilter, error) {
multiaddrsFn := func(p peer.ID) []ma.Multiaddr {
cs := h.Network().ConnsToPeer(p)
addr := make([]ma.Multiaddr, 0, len(cs))
for _, c := range cs {
addr = append(addr, c.RemoteMultiaddr())
}
return addr
}

peerIpGroupFilter := newRtPeerIPGroupFilter(maxPerCpl, maxForTable, multiaddrsFn)
filter, err := peerdiversity.NewFilter(peerIpGroupFilter, "triert/diversity",
func(p peer.ID) int {
return kadt.PeerID(h.ID()).Key().CommonPrefixLength(kadt.PeerID(p).Key())
})

if err != nil {
return nil, err
}

Check warning on line 49 in v2/diversity_filter.go

View check run for this annotation

Codecov / codecov/patch

v2/diversity_filter.go#L48-L49

Added lines #L48 - L49 were not covered by tests

return &TrieRTPeerDiversityFilter{
Filter: filter,
}, nil
}

// TryAdd is called by TrieRT when a new node is added to the routing table.
func (f *TrieRTPeerDiversityFilter) TryAdd(rt *triert.TrieRT[kadt.Key, kadt.PeerID], n kadt.PeerID) bool {
return f.Filter.TryAdd(peer.ID(n))
}

// Remove is called by TrieRT when a node is removed from the routing table.
func (f *TrieRTPeerDiversityFilter) Remove(n kadt.PeerID) {
f.Filter.Remove(peer.ID(n))
}

var _ peerdiversity.PeerIPGroupFilter = (*rtPeerIPGroupFilter)(nil)

// rtPeerIPGroupFilter is an implementation of `peerdiversity.PeerIPGroupFilter`.
// Please see the docs for `peerdiversity.PeerIPGroupFilter` for more details.
type rtPeerIPGroupFilter struct {
mu sync.RWMutex

maxPerCpl int
maxForTable int

multiaddrsFn func(peer.ID) []ma.Multiaddr

cplIpGroupCount map[int]map[peerdiversity.PeerIPGroupKey]int
tableIpGroupCount map[peerdiversity.PeerIPGroupKey]int
}

// newRtPeerIPGroupFilter constructs the `PeerIPGroupFilter` that will be used
// to configure the diversity filter for the Routing Table.
func newRtPeerIPGroupFilter(maxPerCpl, maxForTable int,
multiaddrsFn func(peer.ID) []ma.Multiaddr) *rtPeerIPGroupFilter {
return &rtPeerIPGroupFilter{
multiaddrsFn: multiaddrsFn,

maxPerCpl: maxPerCpl,
maxForTable: maxForTable,

cplIpGroupCount: make(map[int]map[peerdiversity.PeerIPGroupKey]int),
tableIpGroupCount: make(map[peerdiversity.PeerIPGroupKey]int),
}

}

// Allow is called by the `peerdiversity.Filter` to check if a peer is allowed
// to be added to the routing table.
func (r *rtPeerIPGroupFilter) Allow(g peerdiversity.PeerGroupInfo) bool {
r.mu.RLock()
defer r.mu.RUnlock()

key := g.IPGroupKey
cpl := g.Cpl

if r.tableIpGroupCount[key] >= r.maxForTable {

return false
}

c, ok := r.cplIpGroupCount[cpl]
allow := !ok || c[key] < r.maxPerCpl
return allow
}

// Increment is called by the `peerdiversity.Filter` when a peer is added to the
// routing table.
func (r *rtPeerIPGroupFilter) Increment(g peerdiversity.PeerGroupInfo) {
r.mu.Lock()
defer r.mu.Unlock()

key := g.IPGroupKey
cpl := g.Cpl

r.tableIpGroupCount[key] = r.tableIpGroupCount[key] + 1
if _, ok := r.cplIpGroupCount[cpl]; !ok {
r.cplIpGroupCount[cpl] = make(map[peerdiversity.PeerIPGroupKey]int)
}

r.cplIpGroupCount[cpl][key] = r.cplIpGroupCount[cpl][key] + 1
}

// Decrement is called by the `peerdiversity.Filter` when a peer is removed from
// the routing table.
func (r *rtPeerIPGroupFilter) Decrement(g peerdiversity.PeerGroupInfo) {
r.mu.Lock()
defer r.mu.Unlock()

key := g.IPGroupKey
cpl := g.Cpl

r.tableIpGroupCount[key] = r.tableIpGroupCount[key] - 1
if r.tableIpGroupCount[key] == 0 {
delete(r.tableIpGroupCount, key)
}

r.cplIpGroupCount[cpl][key] = r.cplIpGroupCount[cpl][key] - 1
if r.cplIpGroupCount[cpl][key] == 0 {
delete(r.cplIpGroupCount[cpl], key)
}
if len(r.cplIpGroupCount[cpl]) == 0 {
delete(r.cplIpGroupCount, cpl)
}
}

// PeerAddresses is called by the `peerdiversity.Filter` to get the list of
// addresses of a peer.
func (r *rtPeerIPGroupFilter) PeerAddresses(p peer.ID) []ma.Multiaddr {
return r.multiaddrsFn(p)
}
182 changes: 182 additions & 0 deletions v2/diversity_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package dht

import (
"context"
"testing"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
"github.com/libp2p/go-libp2p/core/peer"
ma "github.com/multiformats/go-multiaddr"
"github.com/plprobelab/go-libdht/kad/triert"
"github.com/stretchr/testify/require"
)

type peerInfo struct {
ID string
Addrs []string
}

var inputPeers []peerInfo = []peerInfo{ // first bits of Kad Key, bucket index wrt. 1EoooPEER1
{"1EoooPEER1", []string{"/ip4/0.0.0.0/tcp/4001"}}, // 0100 1101
{"1EoooPEER2", []string{"/ip4/1.1.1.1/tcp/4001"}}, // 1110 -> b0
{"1EoooPEER3", []string{"/ip4/1.1.64.64/tcp/4001"}}, // 0110 -> b2
{"1EoooPEER4", []string{"/ip4/1.1.128.128/tcp/4001"}}, // 0110 -> b2
{"1EoooPEER5", []string{"/ip4/1.1.192.192/tcp/4001"}}, // 0100 1110 -> b6
{"1EoooPEER6", []string{"/ip4/1.1.2.2/tcp/4001"}}, // 1010 -> b0
{"1EoooPEER7", []string{"/ip4/1.2.1.1/tcp/4001"}}, // 0000 -> b1
{"1EoooPEER8", []string{"/ip4/1.1.255.255/tcp/4001"}}, // 0111 -> b2
{"1EoooPEER9", []string{}}, // 1111 -> b0

{"1EooPEER11", []string{"/ip6/2000:1234::/tcp/4001", "/ip4/1.1.3.3/tcp/4001"}}, // 1100 -> b0
{"1EooPEER12", []string{"/ip6/2000:1234::1/tcp/4001"}}, // 0001 -> b1
{"1EooPEER13", []string{"/ip6/2000:1234:5678::/tcp/4001"}}, // 0110 -> b2
{"1EooPEER14", []string{"/ip6/2000:1234::2/tcp/4001", "/ip4/3.3.3.3/tcp/4001"}}, // 0000 -> b1
{"1EooPEER15", []string{"/ip6/2000:1234:5678::1/tcp/4001"}}, // 0000 -> b1
}

var peers map[string]peer.AddrInfo = make(map[string]peer.AddrInfo)

func init() {
for _, p := range inputPeers {
pid, err := peer.Decode(p.ID)
if err != nil {
panic(err)
}
addrs := make([]ma.Multiaddr, 0, len(p.Addrs))
for _, a := range p.Addrs {
addr, err := ma.NewMultiaddr(a)
if err != nil {
panic(err)
}
addrs = append(addrs, addr)
}
peers[p.ID] = peer.AddrInfo{ID: pid, Addrs: addrs}
}
}

func TestRtPeerIPGroupFilter(t *testing.T) {
gf := newRtPeerIPGroupFilter(2, 3, func(p peer.ID) []ma.Multiaddr {
for _, pi := range peers {
if pi.ID == p {
return pi.Addrs
}
}
return nil
})

filter, err := peerdiversity.NewFilter(gf, "triert/diversity",
func(p peer.ID) int {
return kadt.PeerID(peers["1EoooPEER1"].ID).Key().CommonPrefixLength(
kadt.PeerID(p).Key())
})
require.NoError(t, err)

// generate routing table using 1EoooPEER1 as the local peer, and the
// diversity filter
rtcfg := &triert.Config[kadt.Key, kadt.PeerID]{
NodeFilter: &TrieRTPeerDiversityFilter{Filter: filter},
}
rt, err := triert.New[kadt.Key, kadt.PeerID](
kadt.PeerID(peers["1EoooPEER1"].ID), rtcfg)
require.NoError(t, err)

// add 3 peers with the same IP group (1.1.0.0/16)
success := rt.AddNode(kadt.PeerID(peers["1EoooPEER2"].ID))
require.True(t, success)
success = rt.AddNode(kadt.PeerID(peers["1EoooPEER3"].ID))
require.True(t, success)
success = rt.AddNode(kadt.PeerID(peers["1EoooPEER4"].ID))
require.True(t, success)

// add another peer with the same IP group (1.1.0.0/16) will fail
// (maxForTable = 3)
success = rt.AddNode(kadt.PeerID(peers["1EoooPEER5"].ID))
require.False(t, success)

// remove 1EoooPEER2 from the routing table
success = rt.RemoveKey(kadt.PeerID(peers["1EoooPEER2"].ID).Key())
require.True(t, success)

// adding 1EoooPEER8 will fail, because it falls in the same bucket (2) as
// 1EoooPEER3 and 1EoooPEER4 and it has the same IP group (maxPerCpl = 2)
success = rt.AddNode(kadt.PeerID(peers["1EoooPEER8"].ID))
require.False(t, success)

// adding 1EoooPEER6 will succeed, because it belongs to bucket 0, which has
// no other peers using the same IP group
success = rt.AddNode(kadt.PeerID(peers["1EoooPEER6"].ID))
require.True(t, success)

// adding 1EoooPEER7 will succeed, because it doesn't share the same IP
// group with any other peer in the routing table
success = rt.AddNode(kadt.PeerID(peers["1EoooPEER7"].ID))
require.True(t, success)
// removing the last peer from an IP group
success = rt.RemoveKey(kadt.PeerID(peers["1EoooPEER7"].ID).Key())
require.True(t, success)

// adding 1EoooPEER9 will fail, because it doesn't have a valid multiaddr
success = rt.AddNode(kadt.PeerID(peers["1EoooPEER9"].ID))
require.False(t, success)

// adding 1EooPEER11 will fail, because out of its 2 multiaddrs, one belongs
// to an IP group that already has 3 peers in the routing table
success = rt.AddNode(kadt.PeerID(peers["1EooPEER11"].ID))
require.False(t, success)

// adding 1EooPEER12 will succeed, because it is the first peer in its
// ip6 group
success = rt.AddNode(kadt.PeerID(peers["1EooPEER12"].ID))
require.True(t, success)

// adding 1EooPEER14 will succeed because both its multiaddrs belong to non
// full ip groups
success = rt.AddNode(kadt.PeerID(peers["1EooPEER14"].ID))
require.True(t, success)

// adding 1EooPEER15 will fail because its ip6 group is full for cpl = 1
success = rt.AddNode(kadt.PeerID(peers["1EooPEER15"].ID))
require.False(t, success)

// adding 1EooPEER13 will succeed, because even tough it shares the same
// ip6 group with 1EooPEER15, it has a different cpl
success = rt.AddNode(kadt.PeerID(peers["1EooPEER13"].ID))
require.True(t, success)
}

func TestRTPeerDiversityFilter(t *testing.T) {
ctx := context.Background()
h, err := libp2p.New()
require.NoError(t, err)

// create 2 remote peers
h1, err := libp2p.New()
require.NoError(t, err)
h2, err := libp2p.New()
require.NoError(t, err)

// connect h to h1 and h2
err = h.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()})
require.NoError(t, err)
err = h.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})
require.NoError(t, err)

// create peer filter and routing table
peerFilter, err := NewRTPeerDiversityFilter(h, 1, 1)
require.NoError(t, err)
rtcfg := &triert.Config[kadt.Key, kadt.PeerID]{
NodeFilter: peerFilter,
}
rt, err := triert.New[kadt.Key, kadt.PeerID](kadt.PeerID(h.ID()), rtcfg)
require.NoError(t, err)

// try to add h1 to the routing table. succeeds because it is the first peer
success := rt.AddNode(kadt.PeerID(h1.ID()))
require.True(t, success)

// try to add h2 to the routing table. fails because it is the second peer
success = rt.AddNode(kadt.PeerID(h2.ID()))
require.False(t, success)
}
6 changes: 4 additions & 2 deletions v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ require (
github.com/ipfs/go-ds-leveldb v0.5.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/libp2p/go-libp2p v0.30.0
github.com/libp2p/go-libp2p-kbucket v0.5.0
github.com/libp2p/go-libp2p-record v0.2.0
github.com/libp2p/go-msgio v0.3.0
github.com/multiformats/go-base32 v0.1.0
github.com/multiformats/go-multiaddr v0.11.0
github.com/multiformats/go-multihash v0.2.3
github.com/pkg/errors v0.9.1 // indirect
github.com/plprobelab/go-libdht v0.0.0-20230928202609-8c74cc7954b3
github.com/prometheus/client_golang v1.16.0 // indirect
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/otel v1.18.0
Expand All @@ -30,8 +32,6 @@ require (
google.golang.org/protobuf v1.31.0
)

require github.com/plprobelab/go-libdht v0.0.0-20230928112736-796722ce828d

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
Expand Down Expand Up @@ -59,6 +59,7 @@ require (
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/huin/goupnp v1.2.0 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipld/go-ipld-prime v0.21.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
Expand Down Expand Up @@ -92,6 +93,7 @@ require (
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo/v2 v2.11.0 // indirect
github.com/opencontainers/runtime-spec v1.1.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
Expand Down
Loading
Loading