Skip to content

Commit

Permalink
feat: findProvidersAsync (#938)
Browse files Browse the repository at this point in the history
* feat: findProvidersAsync

* add: find providers tests

* add config tests
  • Loading branch information
dennis-tra authored Sep 26, 2023
1 parent 09dd7b0 commit dd5e537
Show file tree
Hide file tree
Showing 7 changed files with 402 additions and 16 deletions.
3 changes: 2 additions & 1 deletion v2/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ type Backend interface {
Store(ctx context.Context, key string, value any) (any, error)

// Fetch returns the record for the given path or a [ds.ErrNotFound] if it
// wasn't found or another error if any occurred.
// wasn't found or another error if any occurred. key won't contain the
// namespace prefix.
Fetch(ctx context.Context, key string) (any, error)
}

Expand Down
27 changes: 27 additions & 0 deletions v2/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,33 @@ func TestConfig_Validate(t *testing.T) {
assert.Error(t, cfg.Validate())
})

// Each of the following subtests removes exactly one of the three backends
// the IPFS protocol requires (providers, IPNS, public key) while registering
// an unrelated "another" backend, and asserts that Validate rejects the
// configuration. The backend count alone is therefore not sufficient — the
// specific namespaces must be present.

t.Run("backends for ipfs protocol (public key missing)", func(t *testing.T) {
cfg := DefaultConfig()
cfg.ProtocolID = ProtocolIPFS
cfg.Backends[namespaceProviders] = &RecordBackend{}
cfg.Backends[namespaceIPNS] = &RecordBackend{}
cfg.Backends["another"] = &RecordBackend{}
assert.Error(t, cfg.Validate())
})

t.Run("backends for ipfs protocol (ipns missing)", func(t *testing.T) {
cfg := DefaultConfig()
cfg.ProtocolID = ProtocolIPFS
cfg.Backends[namespaceProviders] = &RecordBackend{}
cfg.Backends["another"] = &RecordBackend{}
cfg.Backends[namespacePublicKey] = &RecordBackend{}
assert.Error(t, cfg.Validate())
})

t.Run("backends for ipfs protocol (providers missing)", func(t *testing.T) {
cfg := DefaultConfig()
cfg.ProtocolID = ProtocolIPFS
cfg.Backends["another"] = &RecordBackend{}
cfg.Backends[namespaceIPNS] = &RecordBackend{}
cfg.Backends[namespacePublicKey] = &RecordBackend{}
assert.Error(t, cfg.Validate())
})

t.Run("nil address filter", func(t *testing.T) {
cfg := DefaultConfig()
cfg.AddressFilter = nil
Expand Down
1 change: 1 addition & 0 deletions v2/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var rng = rand.New(rand.NewSource(1337))

// newTestDHT returns a DHT instance for use in tests. It starts from the
// default configuration and silences the logger so test output stays clean.
func newTestDHT(t testing.TB) *DHT {
	conf := DefaultConfig()
	conf.Logger = devnull
	return newTestDHTWithConfig(t, conf)
}
Expand Down
8 changes: 3 additions & 5 deletions v2/internal/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn coor
defer cancel()

if numResults < 1 {
numResults = 20
numResults = 20 // TODO: parameterize
}

seeds, err := c.GetClosestNodes(ctx, msg.Target(), numResults)
Expand Down Expand Up @@ -424,7 +424,7 @@ func (c *Coordinator) BroadcastRecord(ctx context.Context, msg *pb.Message) erro
ctx, cancel := context.WithCancel(ctx)
defer cancel()

seeds, err := c.GetClosestNodes(ctx, msg.Target(), 20)
seeds, err := c.GetClosestNodes(ctx, msg.Target(), 20) // TODO: parameterize
if err != nil {
return err
}
Expand All @@ -449,9 +449,7 @@ func (c *Coordinator) BroadcastRecord(ctx context.Context, msg *pb.Message) erro
// queue the start of the query
c.brdcstBehaviour.Notify(ctx, cmd)

contacted, errs, err := c.waitForBroadcast(ctx, waiter)
fmt.Println(contacted)
fmt.Println(errs)
_, _, err = c.waitForBroadcast(ctx, waiter)

return err
}
Expand Down
2 changes: 1 addition & 1 deletion v2/internal/coord/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type Query[K kad.Key[K], N kad.NodeID[K], M coordt.Message] struct {
findCloser bool
stats QueryStats

// finished indicates that that the query has completed its work or has been stopped.
// finished indicates that the query has completed its work or has been stopped.
finished bool

// targetNodes is the set of responsive nodes thought to be closest to the target.
Expand Down
99 changes: 90 additions & 9 deletions v2/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/libp2p/go-libp2p/core/routing"
"go.opentelemetry.io/otel/attribute"
otel "go.opentelemetry.io/otel/trace"
"golang.org/x/exp/slog"

"github.com/libp2p/go-libp2p-kad-dht/v2/internal/coord/coordt"
"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
Expand Down Expand Up @@ -110,20 +111,100 @@ func (d *DHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error {
}

// FindProvidersAsync satisfies the [routing.Routing] interface. It returns a
// channel on which provider records for the given CID are delivered as they
// are found, first from the local provider store and then from the network.
// The channel is closed when the lookup completes or ctx is cancelled. If
// count is 0 the lookup runs until the keyspace is exhausted; otherwise at
// most count providers are sent.
//
// Tracing for this operation is started inside findProvidersAsyncRoutine,
// which owns the full lifetime of the lookup; starting a span here as well
// would leave it unused and never ended.
func (d *DHT) FindProvidersAsync(ctx context.Context, c cid.Cid, count int) <-chan peer.AddrInfo {
	peerOut := make(chan peer.AddrInfo)
	go d.findProvidersAsyncRoutine(ctx, c, count, peerOut)
	return peerOut
}

// findProvidersAsyncRoutine performs the provider lookup for c and streams
// results to out. It first emits any provider records held in the local
// providers backend and then queries the network, deduplicating peers across
// both phases. out is closed when this routine returns, for any reason, so
// consumers ranging over the channel always terminate.
func (d *DHT) findProvidersAsyncRoutine(ctx context.Context, c cid.Cid, count int, out chan peer.AddrInfo) {
	_, span := d.tele.Tracer.Start(ctx, "DHT.findProvidersAsyncRoutine", otel.WithAttributes(attribute.String("cid", c.String()), attribute.Int("count", count)))
	defer span.End()

	// close the channel unconditionally so consumers don't block forever.
	defer close(out)

	// verify if this DHT supports provider records by checking
	// if a "providers" backend is registered.
	b, found := d.backends[namespaceProviders]
	if !found || !c.Defined() {
		span.RecordError(fmt.Errorf("no providers backend registered or CID undefined"))
		return
	}

	// first fetch the record locally
	stored, err := b.Fetch(ctx, string(c.Hash()))
	if err != nil {
		// NOTE(review): a [ds.ErrNotFound] from the backend also lands here
		// and aborts the network lookup — confirm that is intended, or the
		// not-found case should fall through to the query below.
		span.RecordError(err)
		d.log.Warn("Fetching value from provider store", slog.String("cid", c.String()), slog.String("err", err.Error()))
		return
	}

	ps, ok := stored.(*providerSet)
	if !ok {
		// err is necessarily nil at this point (checked above), so record a
		// descriptive error instead of the previous RecordError(nil).
		span.RecordError(fmt.Errorf("expected *providerSet in provider store, got %T", stored))
		d.log.Warn("Stored value is not a provider set", slog.String("cid", c.String()), slog.String("type", fmt.Sprintf("%T", stored)))
		return
	}

	// send all providers onto the out channel until the desired count
	// was reached. If no count was specified, continue with network lookup.
	providers := map[peer.ID]struct{}{}
	for _, provider := range ps.providers {
		providers[provider.ID] = struct{}{}

		select {
		case <-ctx.Done():
			return
		case out <- provider:
		}

		if count != 0 && len(providers) == count {
			return
		}
	}

	// Craft message to send to other peers
	msg := &pb.Message{
		Type: pb.Message_GET_PROVIDERS,
		Key:  c.Hash(),
	}

	// handle node response
	fn := func(ctx context.Context, id kadt.PeerID, resp *pb.Message, stats coordt.QueryStats) error {
		// loop through all providers that the remote peer returned
		for _, provider := range resp.ProviderAddrInfos() {
			// if we had already sent that peer on the channel -> do nothing
			if _, found := providers[provider.ID]; found {
				continue
			}

			// keep track that we will have sent this peer on the channel
			providers[provider.ID] = struct{}{}

			// actually send the provider information to the user
			select {
			case <-ctx.Done():
				return coordt.ErrSkipRemaining
			case out <- provider:
			}

			// if count is 0, we will wait until the query has exhausted the keyspace
			// if count isn't 0, we will stop if the number of providers we have sent
			// equals the number that the user has requested.
			if count != 0 && len(providers) == count {
				return coordt.ErrSkipRemaining
			}
		}

		return nil
	}

	_, err = d.kad.QueryMessage(ctx, msg, fn, 20) // TODO: parameterize
	if err != nil {
		span.RecordError(err)
		d.log.Warn("Failed querying", slog.String("cid", c.String()), slog.String("err", err.Error()))
		return
	}
}

// PutValue satisfies the [routing.Routing] interface and will add the given
Expand Down
Loading

0 comments on commit dd5e537

Please sign in to comment.