diff --git a/v2/routing.go b/v2/routing.go index a68df5c2..60d25463 100644 --- a/v2/routing.go +++ b/v2/routing.go @@ -330,24 +330,27 @@ func (d *DHT) SearchValue(ctx context.Context, keyStr string, options ...routing return nil, routing.ErrNotSupported } - if rOpt.Offline { - val, err := b.Fetch(ctx, path) - if err != nil { - if errors.Is(err, ds.ErrNotFound) { - return nil, routing.ErrNotFound - } + val, err := b.Fetch(ctx, path) + if err != nil { + if !errors.Is(err, ds.ErrNotFound) { return nil, fmt.Errorf("fetch from backend: %w", err) } - rec, ok := val.(*recpb.Record) - if !ok { - return nil, fmt.Errorf("expected *recpb.Record from backend, got: %T", val) - } - - if rec == nil { + if rOpt.Offline { return nil, routing.ErrNotFound } + out := make(chan []byte) + go d.searchValueRoutine(ctx, b, ns, path, rOpt, out) + return out, nil + } + + rec, ok := val.(*recpb.Record) + if !ok { + return nil, fmt.Errorf("expected *recpb.Record from backend, got: %T", val) + } + + if rOpt.Offline { out := make(chan []byte, 1) defer close(out) out <- rec.GetValue() @@ -355,29 +358,24 @@ func (d *DHT) SearchValue(ctx context.Context, keyStr string, options ...routing } out := make(chan []byte) - go d.searchValueRoutine(ctx, keyStr, rOpt, out) + go func() { + out <- rec.GetValue() + d.searchValueRoutine(ctx, b, ns, path, rOpt, out) + }() + return out, nil } -func (d *DHT) searchValueRoutine(ctx context.Context, keyStr string, ropt *routing.Options, out chan<- []byte) { +func (d *DHT) searchValueRoutine(ctx context.Context, backend Backend, ns string, path string, ropt *routing.Options, out chan<- []byte) { _, span := d.tele.Tracer.Start(ctx, "DHT.searchValueRoutine") defer span.End() defer close(out) - // TODO: remove below duplication - ns, path, err := record.SplitKey(keyStr) - if err != nil { - return - } - - b, found := d.backends[ns] - if !found { - return - } + routingKey := []byte(newRoutingKey(ns, path)) req := &pb.Message{ Type: pb.Message_GET_VALUE, - Key: []byte(keyStr), + Key: routingKey, } // The currently known best value for keyStr @@ -397,12 +395,12 @@ func (d *DHT) searchValueRoutine(ctx context.Context, keyStr string, ropt *routi return nil } - if !bytes.Equal([]byte(keyStr), rec.GetKey()) { + if !bytes.Equal(routingKey, rec.GetKey()) { d.log.Debug("record key mismatch") return nil } - idx, _ := b.Validate(ctx, path, best, rec.GetValue()) + idx, _ := backend.Validate(ctx, path, best, rec.GetValue()) switch idx { case 0: if bytes.Equal(best, rec.GetValue()) { @@ -428,7 +426,7 @@ func (d *DHT) searchValueRoutine(ctx context.Context, keyStr string, ropt *routi return nil } - _, err = d.kad.QueryMessage(ctx, req, fn, d.cfg.BucketSize) + _, err := d.kad.QueryMessage(ctx, req, fn, d.cfg.BucketSize) if err != nil { d.log.Warn("Search value query failed", slog.String("err", err.Error())) } diff --git a/v2/routing_test.go b/v2/routing_test.go index 7ca6185a..20c5077b 100644 --- a/v2/routing_test.go +++ b/v2/routing_test.go @@ -502,47 +502,6 @@ func TestDHT_SearchValue_simple(t *testing.T) { assertClosed(t, ctx, valChan) } -func TestDHT_SearchValue_offline(t *testing.T) { - // Test setup: - // There is just one other server that returns a valid value. - ctx := kadtest.CtxShort(t) - d := newTestDHT(t) - - key, v := makePkKeyValue(t) - err := d.putValueLocal(ctx, key, v) - require.NoError(t, err) - - valChan, err := d.SearchValue(ctx, key, routing.Offline) - require.NoError(t, err) - - val := readItem(t, ctx, valChan) - assert.Equal(t, v, val) - - assertClosed(t, ctx, valChan) -} - -func TestDHT_SearchValue_offline_not_found_locally(t *testing.T) { - // Test setup: - // We are connected to a peer that holds the record but require an offline - // lookup. Assert that we don't receive the record - ctx := kadtest.CtxShort(t) - - key, v := makePkKeyValue(t) - - top := NewTopology(t) - d1 := top.AddServer(nil) - d2 := top.AddServer(nil) - - top.Connect(ctx, d1, d2) - - err := d2.putValueLocal(ctx, key, v) - require.NoError(t, err) - - valChan, err := d1.SearchValue(ctx, key, routing.Offline) - assert.ErrorIs(t, err, routing.ErrNotFound) - assert.Nil(t, valChan) -} - func TestDHT_SearchValue_returns_best_values(t *testing.T) { // Test setup: // d2 returns no value @@ -794,3 +753,78 @@ func TestDHT_SearchValue_stops_with_cancelled_context(t *testing.T) { assert.NoError(t, err) assertClosed(t, ctx, valueChan) } + +func TestDHT_SearchValue_has_record_locally(t *testing.T) { + // Test setup: + // There is just one other server that returns a valid value. + ctx := kadtest.CtxShort(t) + clk := clock.New() + + _, priv := newIdentity(t) + _, validValue := makeIPNSKeyValue(t, clk, priv, 1, time.Hour) + key, betterValue := makeIPNSKeyValue(t, clk, priv, 2, time.Hour) + + top := NewTopology(t) + d1 := top.AddServer(nil) + d2 := top.AddServer(nil) + + top.Connect(ctx, d1, d2) + + err := d1.putValueLocal(ctx, key, validValue) + require.NoError(t, err) + + err = d2.putValueLocal(ctx, key, betterValue) + require.NoError(t, err) + + valChan, err := d1.SearchValue(ctx, key) + require.NoError(t, err) + + val := readItem(t, ctx, valChan) // from local store + assert.Equal(t, validValue, val) + + val = readItem(t, ctx, valChan) + assert.Equal(t, betterValue, val) + + assertClosed(t, ctx, valChan) +} + +func TestDHT_SearchValue_offline(t *testing.T) { + // Test setup: + // There is just one other server that returns a valid value. + ctx := kadtest.CtxShort(t) + d := newTestDHT(t) + + key, v := makePkKeyValue(t) + err := d.putValueLocal(ctx, key, v) + require.NoError(t, err) + + valChan, err := d.SearchValue(ctx, key, routing.Offline) + require.NoError(t, err) + + val := readItem(t, ctx, valChan) + assert.Equal(t, v, val) + + assertClosed(t, ctx, valChan) +} + +func TestDHT_SearchValue_offline_not_found_locally(t *testing.T) { + // Test setup: + // We are connected to a peer that holds the record but require an offline + // lookup. Assert that we don't receive the record + ctx := kadtest.CtxShort(t) + + key, v := makePkKeyValue(t) + + top := NewTopology(t) + d1 := top.AddServer(nil) + d2 := top.AddServer(nil) + + top.Connect(ctx, d1, d2) + + err := d2.putValueLocal(ctx, key, v) + require.NoError(t, err) + + valChan, err := d1.SearchValue(ctx, key, routing.Offline) + assert.ErrorIs(t, err, routing.ErrNotFound) + assert.Nil(t, valChan) +}