diff --git a/v2/internal/coord/routing/bootstrap.go b/v2/internal/coord/routing/bootstrap.go index bfd13b3e..5b77a440 100644 --- a/v2/internal/coord/routing/bootstrap.go +++ b/v2/internal/coord/routing/bootstrap.go @@ -3,7 +3,7 @@ package routing import ( "context" "fmt" - "sync" + "sync/atomic" "time" "github.com/benbjohnson/clock" @@ -28,9 +28,6 @@ type Bootstrap[K kad.Key[K], N kad.NodeID[K]] struct { // qry is the query used by the bootstrap process qry *query.Query[K, N, any] - // qryMu guards access to qry - qryMu sync.RWMutex - // cfg is a copy of the optional configuration supplied to the Bootstrap cfg BootstrapConfig @@ -45,6 +42,9 @@ type Bootstrap[K kad.Key[K], N kad.NodeID[K]] struct { // gaugeRunning is a gauge that tracks whether the bootstrap is running. gaugeRunning metric.Int64ObservableGauge + + // running records whether the bootstrap is running after the last state change so that it can be read asynchronously by gaugeRunning + running atomic.Bool } // BootstrapConfig specifies optional configuration for a Bootstrap @@ -167,12 +167,10 @@ func NewBootstrap[K kad.Key[K], N kad.NodeID[K]](self N, cfg *BootstrapConfig) ( metric.WithDescription("Whether or not the bootstrap is running"), metric.WithUnit("1"), metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error { - b.qryMu.RLock() - defer b.qryMu.RUnlock() - if b.qry == nil { - o.Observe(0) - } else { + if b.running.Load() { o.Observe(1) + } else { + o.Observe(0) } return nil }), @@ -185,15 +183,16 @@ func NewBootstrap[K kad.Key[K], N kad.NodeID[K]](self N, cfg *BootstrapConfig) ( } // Advance advances the state of the bootstrap by attempting to advance its query if running. -func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) BootstrapState { +func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) (out BootstrapState) { ctx, span := b.cfg.Tracer.Start(ctx, "Bootstrap.Advance", trace.WithAttributes(tele.AttrInEvent(ev))) - defer span.End() + defer func() { + b.running.Store(b.qry != nil) // record whether the bootstrap is still running for metrics + span.SetAttributes(tele.AttrOutEvent(out)) + span.End() + }() switch tev := ev.(type) { case *EventBootstrapStart[K, N]: - b.qryMu.Lock() - defer b.qryMu.Unlock() - if b.qry != nil { return b.advanceQuery(ctx, &query.EventQueryPoll{}) } @@ -233,8 +232,6 @@ func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) Bootst panic(fmt.Sprintf("unexpected event: %T", tev)) } - b.qryMu.Lock() - defer b.qryMu.Unlock() if b.qry != nil { return b.advanceQuery(ctx, &query.EventQueryPoll{}) } diff --git a/v2/internal/coord/routing/explore.go b/v2/internal/coord/routing/explore.go index 637f6161..cf731aa5 100644 --- a/v2/internal/coord/routing/explore.go +++ b/v2/internal/coord/routing/explore.go @@ -5,7 +5,7 @@ import ( "context" "fmt" "math/rand" - "sync" + "sync/atomic" "time" "github.com/benbjohnson/clock" @@ -49,12 +49,6 @@ type Explore[K kad.Key[K], N kad.NodeID[K]] struct { // qryCpl is the cpl the current query is exploring for qryCpl int - // cplAttributeSet holds the current cpl being explored in an attribute that may be used with metrics - cplAttributeSet attribute.Set - - // qryMu guards access to qry, qryCpl and cplAttributeSet - qryMu sync.RWMutex - // cfg is a copy of the optional configuration supplied to the Explore cfg ExploreConfig @@ -71,6 +65,12 @@ type Explore[K kad.Key[K], N kad.NodeID[K]] struct { // gaugeRunning is a gauge that tracks whether an explore is running. gaugeRunning metric.Int64ObservableGauge + + // running records whether an explore is running after the last state change so that it can be read asynchronously by gaugeRunning + running atomic.Bool + + // cplAttributeSet holds the current cpl being explored in an attribute that may be used with metrics + cplAttributeSet atomic.Value // holds a [attribute.Set] } // NodeIDForCplFunc is a function that given a cpl generates a [kad.NodeID] with a key that has @@ -180,6 +180,7 @@ func NewExplore[K kad.Key[K], N kad.NodeID[K]](self N, rt RoutingTableCpl[K, N], qryCpl: -1, schedule: schedule, } + e.cplAttributeSet.Store(attribute.NewSet()) var err error e.counterFindSent, err = cfg.Meter.Int64Counter( @@ -214,12 +215,10 @@ func NewExplore[K kad.Key[K], N kad.NodeID[K]](self N, rt RoutingTableCpl[K, N], metric.WithDescription("Whether or not the an explore is running for a cpl"), metric.WithUnit("1"), metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error { - e.qryMu.RLock() - defer e.qryMu.RUnlock() - if e.qry == nil { - o.Observe(0) + if e.running.Load() { + o.Observe(1, metric.WithAttributeSet(e.cplAttributeSet.Load().(attribute.Set))) } else { - o.Observe(1, metric.WithAttributeSet(e.cplAttributeSet)) + o.Observe(0) } return nil }), @@ -232,27 +231,25 @@ func NewExplore[K kad.Key[K], N kad.NodeID[K]](self N, rt RoutingTableCpl[K, N], } // Advance advances the state of the explore by attempting to advance its query if running. -func (e *Explore[K, N]) Advance(ctx context.Context, ev ExploreEvent) ExploreState { +func (e *Explore[K, N]) Advance(ctx context.Context, ev ExploreEvent) (out ExploreState) { ctx, span := e.cfg.Tracer.Start(ctx, "Explore.Advance", trace.WithAttributes(tele.AttrInEvent(ev))) - defer span.End() + defer func() { + e.running.Store(e.qry != nil) + span.SetAttributes(tele.AttrOutEvent(out)) + span.End() + }() switch tev := ev.(type) { case *EventExplorePoll: // ignore, nothing to do case *EventExploreFindCloserResponse[K, N]: - e.qryMu.RLock() - defer e.qryMu.RUnlock() - - e.counterFindSucceeded.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet)) + e.counterFindSucceeded.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet.Load().(attribute.Set))) return e.advanceQuery(ctx, &query.EventQueryNodeResponse[K, N]{ NodeID: tev.NodeID, CloserNodes: tev.CloserNodes, }) case *EventExploreFindCloserFailure[K, N]: - e.qryMu.RLock() - defer e.qryMu.RUnlock() - - e.counterFindFailed.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet)) + e.counterFindFailed.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet.Load().(attribute.Set))) span.RecordError(tev.Error) return e.advanceQuery(ctx, &query.EventQueryNodeFailure[K, N]{ NodeID: tev.NodeID, @@ -262,9 +259,6 @@ func (e *Explore[K, N]) Advance(ctx context.Context, ev ExploreEvent) ExploreSta panic(fmt.Sprintf("unexpected event: %T", tev)) } - e.qryMu.Lock() - defer e.qryMu.Unlock() - // if query is running, give it a chance to advance if e.qry != nil { return e.advanceQuery(ctx, &query.EventQueryPoll{}) @@ -302,7 +296,7 @@ func (e *Explore[K, N]) Advance(ctx context.Context, ev ExploreEvent) ExploreSta } e.qry = qry e.qryCpl = cpl - e.cplAttributeSet = attribute.NewSet(attribute.Int("cpl", cpl)) + e.cplAttributeSet.Store(attribute.NewSet(attribute.Int("cpl", cpl))) return e.advanceQuery(ctx, &query.EventQueryPoll{}) } @@ -311,12 +305,10 @@ func (e *Explore[K, N]) advanceQuery(ctx context.Context, qev query.QueryEvent) ctx, span := e.cfg.Tracer.Start(ctx, "Explore.advanceQuery") defer span.End() - // e.qryMu is held by Advance - state := e.qry.Advance(ctx, qev) switch st := state.(type) { case *query.StateQueryFindCloser[K, N]: - e.counterFindSent.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet)) + e.counterFindSent.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet.Load().(attribute.Set))) return &StateExploreFindCloser[K, N]{ Cpl: e.qryCpl, QueryID: st.QueryID, @@ -326,9 +318,7 @@ func (e *Explore[K, N]) advanceQuery(ctx context.Context, qev query.QueryEvent) } case *query.StateQueryFinished[K, N]: span.SetAttributes(attribute.String("out_state", "StateExploreFinished")) - e.qry = nil - e.qryCpl = -1 - e.cplAttributeSet = attribute.NewSet() + e.clearQuery() return &StateExploreQueryFinished{ Cpl: e.qryCpl, Stats: st.Stats, @@ -336,11 +326,9 @@ func (e *Explore[K, N]) advanceQuery(ctx context.Context, qev query.QueryEvent) case *query.StateQueryWaitingAtCapacity: elapsed := e.cfg.Clock.Since(st.Stats.Start) if elapsed > e.cfg.Timeout { - e.counterFindFailed.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet)) + e.counterFindFailed.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet.Load().(attribute.Set))) span.SetAttributes(attribute.String("out_state", "StateExploreTimeout")) - e.qry = nil - e.qryCpl = -1 - e.cplAttributeSet = attribute.NewSet() + e.clearQuery() return &StateExploreQueryTimeout{ Cpl: e.qryCpl, Stats: st.Stats, @@ -354,11 +342,9 @@ func (e *Explore[K, N]) advanceQuery(ctx context.Context, qev query.QueryEvent) case *query.StateQueryWaitingWithCapacity: elapsed := e.cfg.Clock.Since(st.Stats.Start) if elapsed > e.cfg.Timeout { - e.counterFindFailed.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet)) + e.counterFindFailed.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet.Load().(attribute.Set))) span.SetAttributes(attribute.String("out_state", "StateExploreTimeout")) - e.qry = nil - e.qryCpl = -1 - e.cplAttributeSet = attribute.NewSet() + e.clearQuery() return &StateExploreQueryTimeout{ Cpl: e.qryCpl, Stats: st.Stats, @@ -374,6 +360,12 @@ func (e *Explore[K, N]) advanceQuery(ctx context.Context, qev query.QueryEvent) } } +func (e *Explore[K, N]) clearQuery() { + e.qry = nil + e.qryCpl = -1 + e.cplAttributeSet.Store(attribute.NewSet()) +} + // ExploreState is the state of an [Explore]. type ExploreState interface { exploreState() diff --git a/v2/internal/coord/routing/include.go b/v2/internal/coord/routing/include.go index 5d92cbdd..4aad5383 100644 --- a/v2/internal/coord/routing/include.go +++ b/v2/internal/coord/routing/include.go @@ -3,7 +3,7 @@ package routing import ( "context" "fmt" - "sync" + "sync/atomic" "time" "github.com/benbjohnson/clock" @@ -30,9 +30,6 @@ type Include[K kad.Key[K], N kad.NodeID[K]] struct { // candidates is a list of nodes that are candidates for adding to the routing table candidates *nodeQueue[K, N] - // candidatesMu guards access to candidates - candidatesMu sync.RWMutex - // cfg is a copy of the optional configuration supplied to the Include cfg IncludeConfig @@ -53,6 +50,9 @@ type Include[K kad.Key[K], N kad.NodeID[K]] struct { // gaugeCandidateCount is a gauge that tracks the number of nodes in the probe's pending queue of scheduled checks. gaugeCandidateCount metric.Int64ObservableGauge + + // candidateCount holds the number of candidate nodes after the last state change so that it can be read asynchronously by gaugeCandidateCount + candidateCount atomic.Int64 } // IncludeConfig specifies optional configuration for an Include @@ -187,9 +187,7 @@ func NewInclude[K kad.Key[K], N kad.NodeID[K]](rt kad.RoutingTable[K, N], cfg *I metric.WithDescription("Total number of nodes in the include state machine's candidate queue"), metric.WithUnit("1"), metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error { - in.candidatesMu.RLock() - defer in.candidatesMu.RUnlock() - o.Observe(int64(in.candidates.Len())) + o.Observe(in.candidateCount.Load()) return nil }), ) @@ -204,6 +202,7 @@ func NewInclude[K kad.Key[K], N kad.NodeID[K]](rt kad.RoutingTable[K, N], cfg *I func (in *Include[K, N]) Advance(ctx context.Context, ev IncludeEvent) (out IncludeState) { ctx, span := in.cfg.Tracer.Start(ctx, "Include.Advance", trace.WithAttributes(tele.AttrInEvent(ev))) defer func() { + in.candidateCount.Store(int64(in.candidates.Len())) span.SetAttributes(tele.AttrOutEvent(out)) span.End() }() @@ -222,14 +221,11 @@ func (in *Include[K, N]) Advance(ctx context.Context, ev IncludeEvent) (out Incl } // TODO: potentially time out a check and make room in the queue - in.candidatesMu.Lock() if !in.candidates.HasCapacity() { - in.candidatesMu.Unlock() in.counterCandidatesDroppedCapacity.Add(ctx, 1) return &StateIncludeWaitingFull{} } in.candidates.Enqueue(ctx, tev.NodeID) - in.candidatesMu.Unlock() case *EventIncludeConnectivityCheckSuccess[K, N]: in.counterChecksPassed.Add(ctx, 1) @@ -253,9 +249,6 @@ func (in *Include[K, N]) Advance(ctx context.Context, ev IncludeEvent) (out Incl panic(fmt.Sprintf("unexpected event: %T", tev)) } - in.candidatesMu.Lock() - defer in.candidatesMu.Unlock() - if len(in.checks) == in.cfg.Concurrency { if !in.candidates.HasCapacity() { in.counterCandidatesDroppedCapacity.Add(ctx, 1) diff --git a/v2/internal/coord/routing/probe.go b/v2/internal/coord/routing/probe.go index 9207d19a..56f31146 100644 --- a/v2/internal/coord/routing/probe.go +++ b/v2/internal/coord/routing/probe.go @@ -5,7 +5,7 @@ import ( "context" "errors" "fmt" - "sync" + "sync/atomic" "time" "github.com/benbjohnson/clock" @@ -79,6 +79,9 @@ type Probe[K kad.Key[K], N kad.NodeID[K]] struct { // gaugePendingCount is a gauge that tracks the number of nodes in the probe's pending queue of scheduled checks. gaugePendingCount metric.Int64ObservableGauge + + // pendingCount holds the number of pending nodes after the last state change so that it can be read asynchronously by gaugePendingCount + pendingCount atomic.Int64 } // ProbeConfig specifies optional configuration for a Probe @@ -203,7 +206,7 @@ func NewProbe[K kad.Key[K], N kad.NodeID[K]](rt RoutingTableCpl[K, N], cfg *Prob metric.WithDescription("Total number of nodes being monitored by the probe state machine"), metric.WithUnit("1"), metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error { - o.Observe(int64(p.nvl.pendingCount())) + o.Observe(p.pendingCount.Load()) return nil }), ) @@ -218,6 +221,8 @@ func NewProbe[K kad.Key[K], N kad.NodeID[K]](rt RoutingTableCpl[K, N], cfg *Prob func (p *Probe[K, N]) Advance(ctx context.Context, ev ProbeEvent) (out ProbeState) { _, span := p.cfg.Tracer.Start(ctx, "Probe.Advance", trace.WithAttributes(tele.AttrInEvent(ev))) defer func() { + // update the pending count so gauge can read it asynchronously + p.pendingCount.Store(int64(p.nvl.pendingCount())) span.SetAttributes(tele.AttrOutEvent(out)) span.End() }() @@ -424,9 +429,6 @@ type nodeValueEntry[K kad.Key[K], N kad.NodeID[K]] struct { type nodeValueList[K kad.Key[K], N kad.NodeID[K]] struct { nodes map[string]*nodeValueEntry[K, N] - // pendingMu guards access to pending - pendingMu sync.RWMutex - // pending is a list of nodes ordered by the time of the next check pending *nodeValuePendingList[K, N] @@ -459,12 +461,10 @@ func (l *nodeValueList[K, N]) Put(nv *nodeValue[K, N]) { // nve.index is -1 when the node is not already in the pending list // this could be because it is new or if there is an ongoing check - l.pendingMu.Lock() if nve.index == -1 { heap.Push(l.pending, nve) } heap.Fix(l.pending, nve.index) - l.pendingMu.Unlock() l.removeFromOngoing(nv.NodeID) } @@ -479,8 +479,6 @@ func (l *nodeValueList[K, N]) Get(n N) (*nodeValue[K, N], bool) { } func (l *nodeValueList[K, N]) pendingCount() int { - l.pendingMu.RLock() - defer l.pendingMu.RUnlock() return len(*l.pending) } @@ -502,9 +500,7 @@ func (l *nodeValueList[K, N]) Remove(n N) { } delete(l.nodes, mk) if nve.index >= 0 { - l.pendingMu.Lock() heap.Remove(l.pending, nve.index) - l.pendingMu.Unlock() } l.removeFromOngoing(n) } @@ -548,8 +544,6 @@ func (l *nodeValueList[K, N]) removeFromOngoing(n N) { // PeekNext returns the next node that is due a connectivity check without removing it // from the pending list. func (l *nodeValueList[K, N]) PeekNext(ts time.Time) (*nodeValue[K, N], bool) { - l.pendingMu.Lock() - defer l.pendingMu.Unlock() if len(*l.pending) == 0 { return nil, false } @@ -574,9 +568,7 @@ func (l *nodeValueList[K, N]) MarkOngoing(n N, deadline time.Time) { } nve.nv.CheckDeadline = deadline l.nodes[mk] = nve - l.pendingMu.Lock() heap.Remove(l.pending, nve.index) - l.pendingMu.Unlock() l.ongoing = append(l.ongoing, nve.nv.NodeID) }