Skip to content

Commit

Permalink
Simplify use of gauges with atomics
Browse files Browse the repository at this point in the history
  • Loading branch information
iand committed Sep 25, 2023
1 parent cba95d4 commit feda524
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 84 deletions.
29 changes: 13 additions & 16 deletions v2/internal/coord/routing/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package routing
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/benbjohnson/clock"
Expand All @@ -28,9 +28,6 @@ type Bootstrap[K kad.Key[K], N kad.NodeID[K]] struct {
// qry is the query used by the bootstrap process
qry *query.Query[K, N, any]

// qryMu guards access to qry
qryMu sync.RWMutex

// cfg is a copy of the optional configuration supplied to the Bootstrap
cfg BootstrapConfig

Expand All @@ -45,6 +42,9 @@ type Bootstrap[K kad.Key[K], N kad.NodeID[K]] struct {

// gaugeRunning is a gauge that tracks whether the bootstrap is running.
gaugeRunning metric.Int64ObservableGauge

// running records whether the bootstrap is running after the last state change so that it can be read asynchronously by gaugeRunning
running atomic.Bool
}

// BootstrapConfig specifies optional configuration for a Bootstrap
Expand Down Expand Up @@ -167,12 +167,10 @@ func NewBootstrap[K kad.Key[K], N kad.NodeID[K]](self N, cfg *BootstrapConfig) (
metric.WithDescription("Whether or not the bootstrap is running"),
metric.WithUnit("1"),
metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
b.qryMu.RLock()
defer b.qryMu.RUnlock()
if b.qry == nil {
o.Observe(0)
} else {
if b.running.Load() {
o.Observe(1)
} else {
o.Observe(0)
}
return nil
}),
Expand All @@ -185,15 +183,16 @@ func NewBootstrap[K kad.Key[K], N kad.NodeID[K]](self N, cfg *BootstrapConfig) (
}

// Advance advances the state of the bootstrap by attempting to advance its query if running.
func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) BootstrapState {
func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) (out BootstrapState) {
ctx, span := b.cfg.Tracer.Start(ctx, "Bootstrap.Advance", trace.WithAttributes(tele.AttrInEvent(ev)))
defer span.End()
defer func() {
b.running.Store(b.qry != nil) // record whether the bootstrap is still running for metrics
span.SetAttributes(tele.AttrOutEvent(out))
span.End()
}()

switch tev := ev.(type) {
case *EventBootstrapStart[K, N]:
b.qryMu.Lock()
defer b.qryMu.Unlock()

if b.qry != nil {
return b.advanceQuery(ctx, &query.EventQueryPoll{})
}
Expand Down Expand Up @@ -233,8 +232,6 @@ func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) Bootst
panic(fmt.Sprintf("unexpected event: %T", tev))
}

b.qryMu.Lock()
defer b.qryMu.Unlock()
if b.qry != nil {
return b.advanceQuery(ctx, &query.EventQueryPoll{})
}
Expand Down
72 changes: 32 additions & 40 deletions v2/internal/coord/routing/explore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/benbjohnson/clock"
Expand Down Expand Up @@ -49,12 +49,6 @@ type Explore[K kad.Key[K], N kad.NodeID[K]] struct {
// qryCpl is the cpl the current query is exploring for
qryCpl int

// cplAttributeSet holds the current cpl being explored in an attribute that may be used with metrics
cplAttributeSet attribute.Set

// qryMu guards access to qry, qryCpl and cplAttributeSet
qryMu sync.RWMutex

// cfg is a copy of the optional configuration supplied to the Explore
cfg ExploreConfig

Expand All @@ -71,6 +65,12 @@ type Explore[K kad.Key[K], N kad.NodeID[K]] struct {

// gaugeRunning is a gauge that tracks whether an explore is running.
gaugeRunning metric.Int64ObservableGauge

// running records whether an explore is running after the last state change so that it can be read asynchronously by gaugeRunning
running atomic.Bool

// cplAttributeSet holds the current cpl being explored in an attribute that may be used with metrics
cplAttributeSet atomic.Value // holds a [attribute.Set]
}

// NodeIDForCplFunc is a function that given a cpl generates a [kad.NodeID] with a key that has
Expand Down Expand Up @@ -180,6 +180,7 @@ func NewExplore[K kad.Key[K], N kad.NodeID[K]](self N, rt RoutingTableCpl[K, N],
qryCpl: -1,
schedule: schedule,
}
e.cplAttributeSet.Store(attribute.NewSet())

var err error
e.counterFindSent, err = cfg.Meter.Int64Counter(
Expand Down Expand Up @@ -214,12 +215,10 @@ func NewExplore[K kad.Key[K], N kad.NodeID[K]](self N, rt RoutingTableCpl[K, N],
metric.WithDescription("Whether or not the an explore is running for a cpl"),
metric.WithUnit("1"),
metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
e.qryMu.RLock()
defer e.qryMu.RUnlock()
if e.qry == nil {
o.Observe(0)
if e.running.Load() {
o.Observe(1, metric.WithAttributeSet(e.cplAttributeSet.Load().(attribute.Set)))
} else {
o.Observe(1, metric.WithAttributeSet(e.cplAttributeSet))
o.Observe(0)
}
return nil
}),
Expand All @@ -232,27 +231,25 @@ func NewExplore[K kad.Key[K], N kad.NodeID[K]](self N, rt RoutingTableCpl[K, N],
}

// Advance advances the state of the explore by attempting to advance its query if running.
func (e *Explore[K, N]) Advance(ctx context.Context, ev ExploreEvent) ExploreState {
func (e *Explore[K, N]) Advance(ctx context.Context, ev ExploreEvent) (out ExploreState) {
ctx, span := e.cfg.Tracer.Start(ctx, "Explore.Advance", trace.WithAttributes(tele.AttrInEvent(ev)))
defer span.End()
defer func() {
e.running.Store(e.qry != nil)
span.SetAttributes(tele.AttrOutEvent(out))
span.End()
}()

switch tev := ev.(type) {
case *EventExplorePoll:
// ignore, nothing to do
case *EventExploreFindCloserResponse[K, N]:
e.qryMu.RLock()
defer e.qryMu.RUnlock()

e.counterFindSucceeded.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet))
e.counterFindSucceeded.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet.Load().(attribute.Set)))
return e.advanceQuery(ctx, &query.EventQueryNodeResponse[K, N]{
NodeID: tev.NodeID,
CloserNodes: tev.CloserNodes,
})
case *EventExploreFindCloserFailure[K, N]:
e.qryMu.RLock()
defer e.qryMu.RUnlock()

e.counterFindFailed.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet))
e.counterFindFailed.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet.Load().(attribute.Set)))
span.RecordError(tev.Error)
return e.advanceQuery(ctx, &query.EventQueryNodeFailure[K, N]{
NodeID: tev.NodeID,
Expand All @@ -262,9 +259,6 @@ func (e *Explore[K, N]) Advance(ctx context.Context, ev ExploreEvent) ExploreSta
panic(fmt.Sprintf("unexpected event: %T", tev))
}

e.qryMu.Lock()
defer e.qryMu.Unlock()

// if query is running, give it a chance to advance
if e.qry != nil {
return e.advanceQuery(ctx, &query.EventQueryPoll{})
Expand Down Expand Up @@ -302,7 +296,7 @@ func (e *Explore[K, N]) Advance(ctx context.Context, ev ExploreEvent) ExploreSta
}
e.qry = qry
e.qryCpl = cpl
e.cplAttributeSet = attribute.NewSet(attribute.Int("cpl", cpl))
e.cplAttributeSet.Store(attribute.NewSet(attribute.Int("cpl", cpl)))

return e.advanceQuery(ctx, &query.EventQueryPoll{})
}
Expand All @@ -311,12 +305,10 @@ func (e *Explore[K, N]) advanceQuery(ctx context.Context, qev query.QueryEvent)
ctx, span := e.cfg.Tracer.Start(ctx, "Explore.advanceQuery")
defer span.End()

// e.qryMu is held by Advance

state := e.qry.Advance(ctx, qev)
switch st := state.(type) {
case *query.StateQueryFindCloser[K, N]:
e.counterFindSent.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet))
e.counterFindSent.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet.Load().(attribute.Set)))
return &StateExploreFindCloser[K, N]{
Cpl: e.qryCpl,
QueryID: st.QueryID,
Expand All @@ -326,21 +318,17 @@ func (e *Explore[K, N]) advanceQuery(ctx context.Context, qev query.QueryEvent)
}
case *query.StateQueryFinished[K, N]:
span.SetAttributes(attribute.String("out_state", "StateExploreFinished"))
e.qry = nil
e.qryCpl = -1
e.cplAttributeSet = attribute.NewSet()
e.clearQuery()
return &StateExploreQueryFinished{
Cpl: e.qryCpl,
Stats: st.Stats,
}
case *query.StateQueryWaitingAtCapacity:
elapsed := e.cfg.Clock.Since(st.Stats.Start)
if elapsed > e.cfg.Timeout {
e.counterFindFailed.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet))
e.counterFindFailed.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet.Load().(attribute.Set)))
span.SetAttributes(attribute.String("out_state", "StateExploreTimeout"))
e.qry = nil
e.qryCpl = -1
e.cplAttributeSet = attribute.NewSet()
e.clearQuery()
return &StateExploreQueryTimeout{
Cpl: e.qryCpl,
Stats: st.Stats,
Expand All @@ -354,11 +342,9 @@ func (e *Explore[K, N]) advanceQuery(ctx context.Context, qev query.QueryEvent)
case *query.StateQueryWaitingWithCapacity:
elapsed := e.cfg.Clock.Since(st.Stats.Start)
if elapsed > e.cfg.Timeout {
e.counterFindFailed.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet))
e.counterFindFailed.Add(ctx, 1, metric.WithAttributeSet(e.cplAttributeSet.Load().(attribute.Set)))
span.SetAttributes(attribute.String("out_state", "StateExploreTimeout"))
e.qry = nil
e.qryCpl = -1
e.cplAttributeSet = attribute.NewSet()
e.clearQuery()
return &StateExploreQueryTimeout{
Cpl: e.qryCpl,
Stats: st.Stats,
Expand All @@ -374,6 +360,12 @@ func (e *Explore[K, N]) advanceQuery(ctx context.Context, qev query.QueryEvent)
}
}

// clearQuery resets the explore's per-query state once a query has finished
// or timed out: the current query is dropped, the cpl being explored is reset
// to the -1 sentinel, and the metric attribute set is replaced with an empty
// one so that subsequent metric observations no longer carry a cpl attribute.
func (e *Explore[K, N]) clearQuery() {
	e.qry = nil
	e.qryCpl = -1
	// Store an empty attribute.Set rather than leaving the old value in place;
	// atomic.Value requires every Store to use the same concrete type.
	e.cplAttributeSet.Store(attribute.NewSet())
}

// ExploreState is the state of an [Explore].
type ExploreState interface {
exploreState()
Expand Down
19 changes: 6 additions & 13 deletions v2/internal/coord/routing/include.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package routing
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/benbjohnson/clock"
Expand All @@ -30,9 +30,6 @@ type Include[K kad.Key[K], N kad.NodeID[K]] struct {
// candidates is a list of nodes that are candidates for adding to the routing table
candidates *nodeQueue[K, N]

// candidatesMu guards access to candidates
candidatesMu sync.RWMutex

// cfg is a copy of the optional configuration supplied to the Include
cfg IncludeConfig

Expand All @@ -53,6 +50,9 @@ type Include[K kad.Key[K], N kad.NodeID[K]] struct {

// gaugeCandidateCount is a gauge that tracks the number of nodes in the probe's pending queue of scheduled checks.
gaugeCandidateCount metric.Int64ObservableGauge

// candidateCount holds the number of candidate nodes after the last state change so that it can be read asynchronously by gaugeCandidateCount
candidateCount atomic.Int64
}

// IncludeConfig specifies optional configuration for an Include
Expand Down Expand Up @@ -187,9 +187,7 @@ func NewInclude[K kad.Key[K], N kad.NodeID[K]](rt kad.RoutingTable[K, N], cfg *I
metric.WithDescription("Total number of nodes in the include state machine's candidate queue"),
metric.WithUnit("1"),
metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
in.candidatesMu.RLock()
defer in.candidatesMu.RUnlock()
o.Observe(int64(in.candidates.Len()))
o.Observe(in.candidateCount.Load())
return nil
}),
)
Expand All @@ -204,6 +202,7 @@ func NewInclude[K kad.Key[K], N kad.NodeID[K]](rt kad.RoutingTable[K, N], cfg *I
func (in *Include[K, N]) Advance(ctx context.Context, ev IncludeEvent) (out IncludeState) {
ctx, span := in.cfg.Tracer.Start(ctx, "Include.Advance", trace.WithAttributes(tele.AttrInEvent(ev)))
defer func() {
in.candidateCount.Store(int64(in.candidates.Len()))
span.SetAttributes(tele.AttrOutEvent(out))
span.End()
}()
Expand All @@ -222,14 +221,11 @@ func (in *Include[K, N]) Advance(ctx context.Context, ev IncludeEvent) (out Incl
}

// TODO: potentially time out a check and make room in the queue
in.candidatesMu.Lock()
if !in.candidates.HasCapacity() {
in.candidatesMu.Unlock()
in.counterCandidatesDroppedCapacity.Add(ctx, 1)
return &StateIncludeWaitingFull{}
}
in.candidates.Enqueue(ctx, tev.NodeID)
in.candidatesMu.Unlock()

case *EventIncludeConnectivityCheckSuccess[K, N]:
in.counterChecksPassed.Add(ctx, 1)
Expand All @@ -253,9 +249,6 @@ func (in *Include[K, N]) Advance(ctx context.Context, ev IncludeEvent) (out Incl
panic(fmt.Sprintf("unexpected event: %T", tev))
}

in.candidatesMu.Lock()
defer in.candidatesMu.Unlock()

if len(in.checks) == in.cfg.Concurrency {
if !in.candidates.HasCapacity() {
in.counterCandidatesDroppedCapacity.Add(ctx, 1)
Expand Down
Loading

0 comments on commit feda524

Please sign in to comment.