Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add deadlock regression test #61

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion backend_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package zikade
import (
"context"

"github.com/plprobelab/zikade/tele"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -74,5 +75,5 @@ func (t *tracedBackend) Validate(ctx context.Context, key string, values ...any)

// traceAttributes is a helper to build the trace attributes.
func (t *tracedBackend) traceAttributes(key string) trace.SpanStartEventOption {
return trace.WithAttributes(attribute.String("namespace", t.namespace), attribute.String("key", key))
return trace.WithAttributes(attribute.String("namespace", t.namespace), tele.AttrBinKey([]byte(key)))
}
5 changes: 3 additions & 2 deletions internal/coord/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,10 @@ func (w *queryNotifier[E]) NotifyFinished(ctx context.Context, ev E) {
w.DrainPending()
close(w.monitor.NotifyProgressed())

finishedChan := w.monitor.NotifyFinished()
select {
case w.monitor.NotifyFinished() <- CtxEvent[E]{Ctx: ctx, Event: ev}:
case finishedChan <- CtxEvent[E]{Ctx: ctx, Event: ev}:
default:
}
close(w.monitor.NotifyFinished())
close(finishedChan)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needed to change this, because BeforeFinished was called twice here.

}
25 changes: 12 additions & 13 deletions internal/coord/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@ package coord

import (
"context"
"sync"
"testing"

"github.com/benbjohnson/clock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/plprobelab/zikade/internal/coord/coordt"
"github.com/plprobelab/zikade/internal/kadtest"
"github.com/plprobelab/zikade/internal/nettest"
"github.com/plprobelab/zikade/kadt"
"github.com/plprobelab/zikade/pb"
"github.com/stretchr/testify/suite"

"github.com/stretchr/testify/require"
)

func TestPooledQueryConfigValidate(t *testing.T) {
Expand Down Expand Up @@ -275,7 +274,6 @@ func (ts *QueryBehaviourBaseTestSuite) TestNotifiesQueryFinished() {
}

func TestPooledQuery_deadlock_regression(t *testing.T) {
t.Skip()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uncommented this test

ctx := kadtest.CtxShort(t)
msg := &pb.Message{}
queryID := coordt.QueryID("test")
Expand Down Expand Up @@ -342,16 +340,13 @@ func TestPooledQuery_deadlock_regression(t *testing.T) {
// Advance the query pool state machine. Because we returned a new node
// above, the query pool state machine wants to send another outbound query
ev, _ = c.queryBehaviour.Perform(ctx)
require.IsType(t, &EventAddNode{}, ev) // event to notify the routing table
ev, _ = c.queryBehaviour.Perform(ctx)
require.IsType(t, &EventOutboundSendMessage{}, ev)
ev, _ = c.queryBehaviour.Perform(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Best to use DrainBehaviour from #64 here. We just want to run the behaviour until it has completed all its work. Future changes might introduce new events that would affect this test.

require.IsType(t, &EventAddNode{}, ev) // event to notify the routing table

hasLock := make(chan struct{})
var once sync.Once
wrappedWaiter.BeforeProgressed = func() {
once.Do(func() {
close(hasLock)
})
wrappedWaiter.BeforeFinished = func() {
close(hasLock)
}

// Simulate a successful response from the new node. This node didn't return
Expand All @@ -361,7 +356,11 @@ func TestPooledQuery_deadlock_regression(t *testing.T) {
// of 1, the channel cannot hold both events. At the same time, the waiter
// doesn't consume the messages because it's busy processing the previous
// query event (because we haven't released the blocking waiterMsg call above).
go c.queryBehaviour.Notify(ctx, successMsg(nodes[2].NodeID))
c.queryBehaviour.Notify(ctx, successMsg(nodes[2].NodeID))

ev, ok := c.queryBehaviour.Perform(ctx)
require.Nil(t, ev)
require.False(t, ok)

// wait until the above Notify call was handled by waiting until the hasLock
// channel was closed in the above BeforeNotify hook. If that hook is called
Expand Down
3 changes: 1 addition & 2 deletions router.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package zikade

import (
"context"
"encoding/base64"
"fmt"
"time"

Expand Down Expand Up @@ -44,7 +43,7 @@ func (r *router) SendMessage(ctx context.Context, to kadt.PeerID, req *pb.Messag
spanOpts := []trace.SpanStartOption{
trace.WithAttributes(tele.AttrMessageType(req.GetType().String())),
trace.WithAttributes(tele.AttrPeerID(to.String())),
trace.WithAttributes(tele.AttrKey(base64.RawStdEncoding.EncodeToString(req.GetKey()))),
trace.WithAttributes(tele.AttrBinKey(req.GetKey())),
}
ctx, span := r.tele.Tracer.Start(ctx, "router.SendMessage", spanOpts...)
defer func() {
Expand Down
14 changes: 11 additions & 3 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"runtime"
"time"

"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -256,7 +257,7 @@ func (d *DHT) PutValue(ctx context.Context, keyStr string, value []byte, opts ..
// putValueLocal stores a value in the local datastore without reaching out to
// the network.
func (d *DHT) putValueLocal(ctx context.Context, key string, value []byte) error {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.PutValueLocal")
ctx, span := d.tele.Tracer.Start(ctx, "DHT.putValueLocal")
defer span.End()

ns, path, err := record.SplitKey(key)
Expand Down Expand Up @@ -308,7 +309,7 @@ func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option)
// SearchValue will search in the DHT for keyStr. keyStr must have the form
// `/$namespace/$binary_id`
func (d *DHT) SearchValue(ctx context.Context, keyStr string, options ...routing.Option) (<-chan []byte, error) {
_, span := d.tele.Tracer.Start(ctx, "DHT.SearchValue")
ctx, span := d.tele.Tracer.Start(ctx, "DHT.SearchValue")
defer span.End()

// first parse the routing options
Expand Down Expand Up @@ -364,7 +365,7 @@ func (d *DHT) SearchValue(ctx context.Context, keyStr string, options ...routing
}

func (d *DHT) searchValueRoutine(ctx context.Context, backend Backend, ns string, path string, ropt *routing.Options, out chan<- []byte) {
_, span := d.tele.Tracer.Start(ctx, "DHT.searchValueRoutine")
ctx, span := d.tele.Tracer.Start(ctx, "DHT.searchValueRoutine")
defer span.End()
defer close(out)

Expand All @@ -390,10 +391,14 @@ func (d *DHT) searchValueRoutine(ctx context.Context, backend Backend, ns string
quorum := d.getQuorum(ropt)

fn := func(ctx context.Context, id kadt.PeerID, resp *pb.Message, stats coordt.QueryStats) error {
_, innerSpan := d.tele.Tracer.Start(ctx, "DHT.searchValueRoutine.QueryFunc")
defer innerSpan.End()

rec := resp.GetRecord()
if rec == nil {
return nil
}
runtime.Gosched()

if !bytes.Equal(routingKey, rec.GetKey()) {
return nil
Expand All @@ -402,11 +407,13 @@ func (d *DHT) searchValueRoutine(ctx context.Context, backend Backend, ns string
idx, _ := backend.Validate(ctx, path, best, rec.GetValue())
switch idx {
case 0: // "best" is still the best value
innerSpan.SetAttributes(attribute.String("better", "old"))
if bytes.Equal(best, rec.GetValue()) {
quorumPeers[id] = struct{}{}
}

case 1: // rec.GetValue() is better than our current "best"
innerSpan.SetAttributes(attribute.String("better", "new"))

// We have identified a better record. All peers that were currently
// in our set of quorum peers need to be updated wit this new record
Expand All @@ -422,6 +429,7 @@ func (d *DHT) searchValueRoutine(ctx context.Context, backend Backend, ns string
best = rec.GetValue()
out <- best
case -1: // "best" and rec.GetValue() are both invalid
innerSpan.SetAttributes(attribute.String("better", "none"))
return nil

default:
Expand Down
2 changes: 1 addition & 1 deletion routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/plprobelab/zikade/internal/kadtest"
kadtest "github.com/plprobelab/zikade/internal/kadtest"
"github.com/plprobelab/zikade/kadt"
)

Expand Down
4 changes: 2 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,15 @@ func (d *DHT) handleNewStream(ctx context.Context, s network.Stream) error {

ctx = tele.WithAttributes(ctx,
tele.AttrMessageType(req.GetType().String()),
tele.AttrKey(base64.StdEncoding.EncodeToString(req.GetKey())),
tele.AttrBinKey(req.GetKey()),
)

// extend metrics context and slogger with message information.
// ctx must be overwritten because in the next iteration metrics.KeyMessageType
// would already exist and tag.New would return an error.
slogger = slogger.With(
slog.String("type", req.GetType().String()),
slog.String("key", base64.StdEncoding.EncodeToString(req.GetKey())),
slog.String("key", base64.RawStdEncoding.EncodeToString(req.GetKey())),
)

// track message metrics
Expand Down
8 changes: 6 additions & 2 deletions tele/tele.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tele

import (
"context"
"encoding/base64"
"fmt"

"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -95,8 +96,11 @@ func AttrMessageType(val string) attribute.KeyValue {
return attribute.String("message_type", val)
}

func AttrKey(val string) attribute.KeyValue {
return attribute.String("key", val)
// AttrBinKey base64 encodes the given key to a trace/meter attribute. This is
// necessary as some binary keys conain invalid UTF-8 characters.
func AttrBinKey(key []byte) attribute.KeyValue {
val := base64.RawStdEncoding.EncodeToString(key)
return attribute.String("key_b64", val)
}

// AttrInEvent creates an attribute that records the type of an event
Expand Down