diff --git a/backend_trace.go b/backend_trace.go index 6cb63d7..8b1aba1 100644 --- a/backend_trace.go +++ b/backend_trace.go @@ -3,6 +3,7 @@ package zikade import ( "context" + "github.com/plprobelab/zikade/tele" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" @@ -74,5 +75,5 @@ func (t *tracedBackend) Validate(ctx context.Context, key string, values ...any) // traceAttributes is a helper to build the trace attributes. func (t *tracedBackend) traceAttributes(key string) trace.SpanStartEventOption { - return trace.WithAttributes(attribute.String("namespace", t.namespace), attribute.String("key", key)) + return trace.WithAttributes(attribute.String("namespace", t.namespace), tele.AttrBinKey([]byte(key))) } diff --git a/internal/coord/query.go b/internal/coord/query.go index 05f23cb..bd8c73e 100644 --- a/internal/coord/query.go +++ b/internal/coord/query.go @@ -467,9 +467,10 @@ func (w *queryNotifier[E]) NotifyFinished(ctx context.Context, ev E) { w.DrainPending() close(w.monitor.NotifyProgressed()) + finishedChan := w.monitor.NotifyFinished() select { - case w.monitor.NotifyFinished() <- CtxEvent[E]{Ctx: ctx, Event: ev}: + case finishedChan <- CtxEvent[E]{Ctx: ctx, Event: ev}: default: } - close(w.monitor.NotifyFinished()) + close(finishedChan) } diff --git a/internal/coord/query_test.go b/internal/coord/query_test.go index 40d285b..0ab13c8 100644 --- a/internal/coord/query_test.go +++ b/internal/coord/query_test.go @@ -2,18 +2,17 @@ package coord import ( "context" - "sync" "testing" "github.com/benbjohnson/clock" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "github.com/plprobelab/zikade/internal/coord/coordt" "github.com/plprobelab/zikade/internal/kadtest" "github.com/plprobelab/zikade/internal/nettest" "github.com/plprobelab/zikade/kadt" "github.com/plprobelab/zikade/pb" + "github.com/stretchr/testify/suite" + + "github.com/stretchr/testify/require" ) func TestPooledQueryConfigValidate(t *testing.T) { @@ -275,7 +274,6 @@ func (ts *QueryBehaviourBaseTestSuite) TestNotifiesQueryFinished() { } func TestPooledQuery_deadlock_regression(t *testing.T) { - t.Skip() ctx := kadtest.CtxShort(t) msg := &pb.Message{} queryID := coordt.QueryID("test") @@ -342,16 +340,13 @@ func TestPooledQuery_deadlock_regression(t *testing.T) { // Advance the query pool state machine. Because we returned a new node // above, the query pool state machine wants to send another outbound query ev, _ = c.queryBehaviour.Perform(ctx) - require.IsType(t, &EventAddNode{}, ev) // event to notify the routing table - ev, _ = c.queryBehaviour.Perform(ctx) require.IsType(t, &EventOutboundSendMessage{}, ev) + ev, _ = c.queryBehaviour.Perform(ctx) + require.IsType(t, &EventAddNode{}, ev) // event to notify the routing table hasLock := make(chan struct{}) - var once sync.Once - wrappedWaiter.BeforeProgressed = func() { - once.Do(func() { - close(hasLock) - }) + wrappedWaiter.BeforeFinished = func() { + close(hasLock) } // Simulate a successful response from the new node. This node didn't return @@ -361,7 +356,11 @@ func TestPooledQuery_deadlock_regression(t *testing.T) { // of 1, the channel cannot hold both events. At the same time, the waiter // doesn't consume the messages because it's busy processing the previous // query event (because we haven't released the blocking waiterMsg call above). - go c.queryBehaviour.Notify(ctx, successMsg(nodes[2].NodeID)) + c.queryBehaviour.Notify(ctx, successMsg(nodes[2].NodeID)) + + ev, ok := c.queryBehaviour.Perform(ctx) + require.Nil(t, ev) + require.False(t, ok) // wait until the above Notify call was handled by waiting until the hasLock // channel was closed in the above BeforeNotify hook. If that hook is called diff --git a/router.go b/router.go index 90b5ce8..f64d900 100644 --- a/router.go +++ b/router.go @@ -2,7 +2,6 @@ package zikade import ( "context" - "encoding/base64" "fmt" "time" @@ -44,7 +43,7 @@ func (r *router) SendMessage(ctx context.Context, to kadt.PeerID, req *pb.Messag spanOpts := []trace.SpanStartOption{ trace.WithAttributes(tele.AttrMessageType(req.GetType().String())), trace.WithAttributes(tele.AttrPeerID(to.String())), - trace.WithAttributes(tele.AttrKey(base64.RawStdEncoding.EncodeToString(req.GetKey()))), + trace.WithAttributes(tele.AttrBinKey(req.GetKey())), } ctx, span := r.tele.Tracer.Start(ctx, "router.SendMessage", spanOpts...) defer func() { diff --git a/routing.go b/routing.go index f7ee5c7..5a0d35a 100644 --- a/routing.go +++ b/routing.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "runtime" "time" "github.com/ipfs/go-cid" @@ -256,7 +257,7 @@ func (d *DHT) PutValue(ctx context.Context, keyStr string, value []byte, opts .. // putValueLocal stores a value in the local datastore without reaching out to // the network. func (d *DHT) putValueLocal(ctx context.Context, key string, value []byte) error { - ctx, span := d.tele.Tracer.Start(ctx, "DHT.PutValueLocal") + ctx, span := d.tele.Tracer.Start(ctx, "DHT.putValueLocal") defer span.End() ns, path, err := record.SplitKey(key) @@ -308,7 +309,7 @@ func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) // SearchValue will search in the DHT for keyStr. keyStr must have the form // `/$namespace/$binary_id` func (d *DHT) SearchValue(ctx context.Context, keyStr string, options ...routing.Option) (<-chan []byte, error) { - _, span := d.tele.Tracer.Start(ctx, "DHT.SearchValue") + ctx, span := d.tele.Tracer.Start(ctx, "DHT.SearchValue") defer span.End() // first parse the routing options @@ -364,7 +365,7 @@ func (d *DHT) SearchValue(ctx context.Context, keyStr string, options ...routing } func (d *DHT) searchValueRoutine(ctx context.Context, backend Backend, ns string, path string, ropt *routing.Options, out chan<- []byte) { - _, span := d.tele.Tracer.Start(ctx, "DHT.searchValueRoutine") + ctx, span := d.tele.Tracer.Start(ctx, "DHT.searchValueRoutine") defer span.End() defer close(out) @@ -390,10 +391,14 @@ func (d *DHT) searchValueRoutine(ctx context.Context, backend Backend, ns string quorum := d.getQuorum(ropt) fn := func(ctx context.Context, id kadt.PeerID, resp *pb.Message, stats coordt.QueryStats) error { + _, innerSpan := d.tele.Tracer.Start(ctx, "DHT.searchValueRoutine.QueryFunc") + defer innerSpan.End() + rec := resp.GetRecord() if rec == nil { return nil } + runtime.Gosched() if !bytes.Equal(routingKey, rec.GetKey()) { return nil @@ -402,11 +407,13 @@ func (d *DHT) searchValueRoutine(ctx context.Context, backend Backend, ns string idx, _ := backend.Validate(ctx, path, best, rec.GetValue()) switch idx { case 0: // "best" is still the best value + innerSpan.SetAttributes(attribute.String("better", "old")) if bytes.Equal(best, rec.GetValue()) { quorumPeers[id] = struct{}{} } case 1: // rec.GetValue() is better than our current "best" + innerSpan.SetAttributes(attribute.String("better", "new")) // We have identified a better record. All peers that were currently // in our set of quorum peers need to be updated wit this new record @@ -422,6 +429,7 @@ func (d *DHT) searchValueRoutine(ctx context.Context, backend Backend, ns string best = rec.GetValue() out <- best case -1: // "best" and rec.GetValue() are both invalid + innerSpan.SetAttributes(attribute.String("better", "none")) return nil default: diff --git a/routing_test.go b/routing_test.go index d261f22..a1d894f 100644 --- a/routing_test.go +++ b/routing_test.go @@ -21,7 +21,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/plprobelab/zikade/internal/kadtest" + kadtest "github.com/plprobelab/zikade/internal/kadtest" "github.com/plprobelab/zikade/kadt" ) diff --git a/stream.go b/stream.go index af37a22..f79b9a6 100644 --- a/stream.go +++ b/stream.go @@ -120,7 +120,7 @@ func (d *DHT) handleNewStream(ctx context.Context, s network.Stream) error { ctx = tele.WithAttributes(ctx, tele.AttrMessageType(req.GetType().String()), - tele.AttrKey(base64.StdEncoding.EncodeToString(req.GetKey())), + tele.AttrBinKey(req.GetKey()), ) // extend metrics context and slogger with message information. @@ -128,7 +128,7 @@ func (d *DHT) handleNewStream(ctx context.Context, s network.Stream) error { // would already exist and tag.New would return an error. slogger = slogger.With( slog.String("type", req.GetType().String()), - slog.String("key", base64.StdEncoding.EncodeToString(req.GetKey())), + slog.String("key", base64.RawStdEncoding.EncodeToString(req.GetKey())), ) // track message metrics diff --git a/tele/tele.go b/tele/tele.go index 7a7cce2..4c17dd3 100644 --- a/tele/tele.go +++ b/tele/tele.go @@ -2,6 +2,7 @@ package tele import ( "context" + "encoding/base64" "fmt" "go.opentelemetry.io/otel" @@ -95,8 +96,11 @@ func AttrMessageType(val string) attribute.KeyValue { return attribute.String("message_type", val) } -func AttrKey(val string) attribute.KeyValue { - return attribute.String("key", val) +// AttrBinKey base64 encodes the given key to a trace/meter attribute. This is +// necessary as some binary keys conain invalid UTF-8 characters. +func AttrBinKey(key []byte) attribute.KeyValue { + val := base64.RawStdEncoding.EncodeToString(key) + return attribute.String("key_b64", val) } // AttrInEvent creates an attribute that records the type of an event