Skip to content

Commit

Permalink
Add metrics to routing state machines (#939)
Browse files Browse the repository at this point in the history
* Add metrics to routing state machines

* Simplify use of gauges with atomics
  • Loading branch information
iand authored Sep 27, 2023
1 parent dd5e537 commit dedca86
Show file tree
Hide file tree
Showing 12 changed files with 558 additions and 73 deletions.
1 change: 1 addition & 0 deletions v2/internal/coord/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func DefaultCoordinatorConfig() *CoordinatorConfig {
cfg.Routing.Clock = cfg.Clock
cfg.Routing.Logger = cfg.Logger.With("behaviour", "routing")
cfg.Routing.Tracer = cfg.TracerProvider.Tracer(tele.TracerName)
cfg.Routing.Meter = cfg.MeterProvider.Meter(tele.MeterName)

return cfg
}
Expand Down
20 changes: 20 additions & 0 deletions v2/internal/coord/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/benbjohnson/clock"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"golang.org/x/exp/slog"

Expand Down Expand Up @@ -39,6 +40,9 @@ type RoutingConfig struct {
// Tracer is the tracer that should be used to trace execution.
Tracer trace.Tracer

// Meter is the meter that should be used to record metrics.
Meter metric.Meter

// BootstrapTimeout is the time the behaviour should wait before terminating a bootstrap if it is not making progress.
BootstrapTimeout time.Duration

Expand Down Expand Up @@ -118,6 +122,13 @@ func (cfg *RoutingConfig) Validate() error {
}
}

if cfg.Meter == nil {
return &errs.ConfigurationError{
Component: "RoutingConfig",
Err: fmt.Errorf("meter must not be nil"),
}
}

if cfg.BootstrapTimeout < 1 {
return &errs.ConfigurationError{
Component: "RoutingConfig",
Expand Down Expand Up @@ -246,6 +257,7 @@ func DefaultRoutingConfig() *RoutingConfig {
Clock: clock.New(),
Logger: tele.DefaultLogger("coord"),
Tracer: tele.NoopTracer(),
Meter: tele.NoopMeter(),

BootstrapTimeout: 5 * time.Minute, // MAGIC
BootstrapRequestConcurrency: 3, // MAGIC
Expand Down Expand Up @@ -304,6 +316,8 @@ func NewRoutingBehaviour(self kadt.PeerID, rt routing.RoutingTableCpl[kadt.Key,

bootstrapCfg := routing.DefaultBootstrapConfig()
bootstrapCfg.Clock = cfg.Clock
bootstrapCfg.Tracer = cfg.Tracer
bootstrapCfg.Meter = cfg.Meter
bootstrapCfg.Timeout = cfg.BootstrapTimeout
bootstrapCfg.RequestConcurrency = cfg.BootstrapRequestConcurrency
bootstrapCfg.RequestTimeout = cfg.BootstrapRequestTimeout
Expand All @@ -315,6 +329,8 @@ func NewRoutingBehaviour(self kadt.PeerID, rt routing.RoutingTableCpl[kadt.Key,

includeCfg := routing.DefaultIncludeConfig()
includeCfg.Clock = cfg.Clock
includeCfg.Tracer = cfg.Tracer
includeCfg.Meter = cfg.Meter
includeCfg.Timeout = cfg.ConnectivityCheckTimeout
includeCfg.QueueCapacity = cfg.IncludeQueueCapacity
includeCfg.Concurrency = cfg.IncludeRequestConcurrency
Expand All @@ -326,6 +342,8 @@ func NewRoutingBehaviour(self kadt.PeerID, rt routing.RoutingTableCpl[kadt.Key,

probeCfg := routing.DefaultProbeConfig()
probeCfg.Clock = cfg.Clock
probeCfg.Tracer = cfg.Tracer
probeCfg.Meter = cfg.Meter
probeCfg.Timeout = cfg.ConnectivityCheckTimeout
probeCfg.Concurrency = cfg.ProbeRequestConcurrency
probeCfg.CheckInterval = cfg.ProbeCheckInterval
Expand All @@ -337,6 +355,8 @@ func NewRoutingBehaviour(self kadt.PeerID, rt routing.RoutingTableCpl[kadt.Key,

exploreCfg := routing.DefaultExploreConfig()
exploreCfg.Clock = cfg.Clock
exploreCfg.Tracer = cfg.Tracer
exploreCfg.Meter = cfg.Meter
exploreCfg.Timeout = cfg.ExploreTimeout
exploreCfg.RequestConcurrency = cfg.ExploreRequestConcurrency
exploreCfg.RequestTimeout = cfg.ExploreRequestTimeout
Expand Down
115 changes: 107 additions & 8 deletions v2/internal/coord/routing/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package routing
import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/benbjohnson/clock"
"github.com/plprobelab/go-kademlia/kad"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"

"github.com/libp2p/go-libp2p-kad-dht/v2/errs"
Expand All @@ -28,6 +30,21 @@ type Bootstrap[K kad.Key[K], N kad.NodeID[K]] struct {

// cfg is a copy of the optional configuration supplied to the Bootstrap
cfg BootstrapConfig

// counterFindSent is a counter that tracks the number of requests to find closer nodes sent.
counterFindSent metric.Int64Counter

// counterFindSucceeded is a counter that tracks the number of requests to find closer nodes that succeeded.
counterFindSucceeded metric.Int64Counter

// counterFindFailed is a counter that tracks the number of requests to find closer nodes that failed.
counterFindFailed metric.Int64Counter

// gaugeRunning is a gauge that tracks whether the bootstrap is running.
gaugeRunning metric.Int64ObservableGauge

// running records whether the bootstrap is running after the last state change so that it can be read asynchronously by gaugeRunning
running atomic.Bool
}

// BootstrapConfig specifies optional configuration for a Bootstrap
Expand All @@ -36,6 +53,12 @@ type BootstrapConfig struct {
RequestConcurrency int // the maximum number of concurrent requests that each query may have in flight
RequestTimeout time.Duration // the timeout queries should use for contacting a single node
Clock clock.Clock // a clock that may be replaced by a mock when testing

// Tracer is the tracer that should be used to trace execution.
Tracer trace.Tracer

// Meter is the meter that should be used to record metrics.
Meter metric.Meter
}

// Validate checks the configuration options and returns an error if any have invalid values.
Expand All @@ -47,6 +70,20 @@ func (cfg *BootstrapConfig) Validate() error {
}
}

if cfg.Tracer == nil {
return &errs.ConfigurationError{
Component: "BootstrapConfig",
Err: fmt.Errorf("tracer must not be nil"),
}
}

if cfg.Meter == nil {
return &errs.ConfigurationError{
Component: "BootstrapConfig",
Err: fmt.Errorf("meter must not be nil"),
}
}

if cfg.Timeout < 1 {
return &errs.ConfigurationError{
Component: "BootstrapConfig",
Expand Down Expand Up @@ -75,7 +112,10 @@ func (cfg *BootstrapConfig) Validate() error {
// Options may be overridden before passing to NewBootstrap
func DefaultBootstrapConfig() *BootstrapConfig {
return &BootstrapConfig{
Clock: clock.New(), // use standard time
Clock: clock.New(), // use standard time
Tracer: tele.NoopTracer(),
Meter: tele.NoopMeter(),

Timeout: 5 * time.Minute, // MAGIC
RequestConcurrency: 3, // MAGIC
RequestTimeout: time.Minute, // MAGIC
Expand All @@ -89,20 +129,74 @@ func NewBootstrap[K kad.Key[K], N kad.NodeID[K]](self N, cfg *BootstrapConfig) (
return nil, err
}

return &Bootstrap[K, N]{
b := &Bootstrap[K, N]{
self: self,
cfg: *cfg,
}, nil
}

var err error
b.counterFindSent, err = cfg.Meter.Int64Counter(
"bootstrap_find_sent",
metric.WithDescription("Total number of find closer nodes requests sent by the bootstrap state machine"),
metric.WithUnit("1"),
)
if err != nil {
return nil, fmt.Errorf("create bootstrap_find_sent counter: %w", err)
}

b.counterFindSucceeded, err = cfg.Meter.Int64Counter(
"bootstrap_find_succeeded",
metric.WithDescription("Total number of find closer nodes requests sent by the bootstrap state machine that were successful"),
metric.WithUnit("1"),
)
if err != nil {
return nil, fmt.Errorf("create bootstrap_find_succeeded counter: %w", err)
}

b.counterFindFailed, err = cfg.Meter.Int64Counter(
"bootstrap_find_failed",
metric.WithDescription("Total number of find closer nodes requests sent by the bootstrap state machine that failed"),
metric.WithUnit("1"),
)
if err != nil {
return nil, fmt.Errorf("create bootstrap_find_failed counter: %w", err)
}

b.gaugeRunning, err = cfg.Meter.Int64ObservableGauge(
"bootstrap_running",
metric.WithDescription("Whether or not the bootstrap is running"),
metric.WithUnit("1"),
metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
if b.running.Load() {
o.Observe(1)
} else {
o.Observe(0)
}
return nil
}),
)
if err != nil {
return nil, fmt.Errorf("create bootstrap_running gauge: %w", err)
}

return b, nil
}

// Advance advances the state of the bootstrap by attempting to advance its query if running.
func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) BootstrapState {
ctx, span := tele.StartSpan(ctx, "Bootstrap.Advance", trace.WithAttributes(tele.AttrInEvent(ev)))
defer span.End()
func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) (out BootstrapState) {
ctx, span := b.cfg.Tracer.Start(ctx, "Bootstrap.Advance", trace.WithAttributes(tele.AttrInEvent(ev)))
defer func() {
b.running.Store(b.qry != nil) // record whether the bootstrap is still running for metrics
span.SetAttributes(tele.AttrOutEvent(out))
span.End()
}()

switch tev := ev.(type) {
case *EventBootstrapStart[K, N]:
// TODO: ignore start event if query is already in progress
if b.qry != nil {
return b.advanceQuery(ctx, &query.EventQueryPoll{})
}

iter := query.NewClosestNodesIter[K, N](b.self.Key())

qryCfg := query.DefaultQueryConfig()
Expand All @@ -119,11 +213,13 @@ func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) Bootst
return b.advanceQuery(ctx, &query.EventQueryPoll{})

case *EventBootstrapFindCloserResponse[K, N]:
b.counterFindSucceeded.Add(ctx, 1)
return b.advanceQuery(ctx, &query.EventQueryNodeResponse[K, N]{
NodeID: tev.NodeID,
CloserNodes: tev.CloserNodes,
})
case *EventBootstrapFindCloserFailure[K, N]:
b.counterFindFailed.Add(ctx, 1)
span.RecordError(tev.Error)
return b.advanceQuery(ctx, &query.EventQueryNodeFailure[K, N]{
NodeID: tev.NodeID,
Expand All @@ -144,11 +240,12 @@ func (b *Bootstrap[K, N]) Advance(ctx context.Context, ev BootstrapEvent) Bootst
}

func (b *Bootstrap[K, N]) advanceQuery(ctx context.Context, qev query.QueryEvent) BootstrapState {
ctx, span := tele.StartSpan(ctx, "Bootstrap.advanceQuery")
ctx, span := b.cfg.Tracer.Start(ctx, "Bootstrap.advanceQuery")
defer span.End()
state := b.qry.Advance(ctx, qev)
switch st := state.(type) {
case *query.StateQueryFindCloser[K, N]:
b.counterFindSent.Add(ctx, 1)
span.SetAttributes(attribute.String("out_state", "StateQueryFindCloser"))
return &StateBootstrapFindCloser[K, N]{
QueryID: st.QueryID,
Expand All @@ -164,6 +261,7 @@ func (b *Bootstrap[K, N]) advanceQuery(ctx context.Context, qev query.QueryEvent
case *query.StateQueryWaitingAtCapacity:
elapsed := b.cfg.Clock.Since(st.Stats.Start)
if elapsed > b.cfg.Timeout {
b.counterFindFailed.Add(ctx, 1)
span.SetAttributes(attribute.String("out_state", "StateBootstrapTimeout"))
return &StateBootstrapTimeout{
Stats: st.Stats,
Expand All @@ -176,6 +274,7 @@ func (b *Bootstrap[K, N]) advanceQuery(ctx context.Context, qev query.QueryEvent
case *query.StateQueryWaitingWithCapacity:
elapsed := b.cfg.Clock.Since(st.Stats.Start)
if elapsed > b.cfg.Timeout {
b.counterFindFailed.Add(ctx, 1)
span.SetAttributes(attribute.String("out_state", "StateBootstrapTimeout"))
return &StateBootstrapTimeout{
Stats: st.Stats,
Expand Down
12 changes: 12 additions & 0 deletions v2/internal/coord/routing/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ func TestBootstrapConfigValidate(t *testing.T) {
require.Error(t, cfg.Validate())
})

t.Run("tracer is not nil", func(t *testing.T) {
cfg := DefaultBootstrapConfig()
cfg.Tracer = nil
require.Error(t, cfg.Validate())
})

t.Run("meter is not nil", func(t *testing.T) {
cfg := DefaultBootstrapConfig()
cfg.Meter = nil
require.Error(t, cfg.Validate())
})

t.Run("timeout positive", func(t *testing.T) {
cfg := DefaultBootstrapConfig()
cfg.Timeout = 0
Expand Down
Loading

0 comments on commit dedca86

Please sign in to comment.