Skip to content

Commit

Permalink
fix: flaky test by skipping connectivity checks
Browse files Browse the repository at this point in the history
By skipping connectivity checks we reduce the chances of simultaneously opening a stream that will block connection establishment.

Context: libp2p/go-libp2p#2589
  • Loading branch information
dennis-tra committed Sep 29, 2023
1 parent 509eee4 commit 042bba3
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 11 deletions.
15 changes: 10 additions & 5 deletions v2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,16 +395,21 @@ type QueryConfig struct {
// operation. A DefaultQuorum of 0 means that we search the network until
// we have exhausted the keyspace.
DefaultQuorum int

// SkipConnectivityCheck defines whether we skip the connectivity check that
// is otherwise performed before we add peers to the routing table.
SkipConnectivityCheck bool
}

// DefaultQueryConfig returns the default query configuration options for a DHT.
//
// Each field is keyed exactly once; Go rejects a struct literal that assigns
// the same field twice. By default connectivity checks are not skipped
// (SkipConnectivityCheck is false).
func DefaultQueryConfig() *QueryConfig {
	return &QueryConfig{
		Concurrency:           3,               // MAGIC
		Timeout:               5 * time.Minute, // MAGIC
		RequestConcurrency:    3,               // MAGIC
		RequestTimeout:        time.Minute,     // MAGIC
		DefaultQuorum:         0,               // MAGIC
		SkipConnectivityCheck: false,
	}
}

Expand Down
1 change: 1 addition & 0 deletions v2/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func New(h host.Host, cfg *Config) (*DHT, error) {
coordCfg.Routing.Logger = cfg.Logger.With("behaviour", "routing")
coordCfg.Routing.Tracer = cfg.TracerProvider.Tracer(tele.TracerName)
coordCfg.Routing.Meter = cfg.MeterProvider.Meter(tele.MeterName)
coordCfg.Routing.IncludeSkipCheck = cfg.Query.SkipConnectivityCheck

rtr := &router{
host: h,
Expand Down
21 changes: 21 additions & 0 deletions v2/internal/coord/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ type RoutingConfig struct {
// ProbeCheckInterval is the time interval the behaviour should use between connectivity checks for the same node in the routing table.
ProbeCheckInterval time.Duration

// IncludeSkipCheck indicates whether we skip the connectivity check that is otherwise performed before we add a peer to the routing table.
IncludeSkipCheck bool

// IncludeQueueCapacity is the maximum number of nodes the behaviour should keep queued as candidates for inclusion in the routing table.
IncludeQueueCapacity int

Expand Down Expand Up @@ -268,6 +271,7 @@ func DefaultRoutingConfig() *RoutingConfig {
ProbeRequestConcurrency: 3, // MAGIC
ProbeCheckInterval: 6 * time.Hour, // MAGIC

IncludeSkipCheck: false,
IncludeRequestConcurrency: 3, // MAGIC
IncludeQueueCapacity: 128, // MAGIC

Expand Down Expand Up @@ -307,6 +311,22 @@ type RoutingBehaviour struct {
ready chan struct{}
}

// Recording2SM is a minimal state machine stub: every call to Advance returns
// the same fixed state and stores the event it was given.
//
// NOTE(review): Codecov flagged the bodies of NewRecording2SM and Advance as
// not covered by tests in the commit that introduced them.
type Recording2SM[E any, S any] struct {
	// State is the value returned by every call to Advance.
	State S
	// Received is the event passed to the most recent Advance call. It holds
	// the zero value of E until Advance has been called.
	Received E
}

// NewRecording2SM returns a Recording2SM whose State is set to response.
// Received is left at its zero value.
func NewRecording2SM[E any, S any](response S) *Recording2SM[E, S] {
	return &Recording2SM[E, S]{
		State: response,
	}
}

// Advance stores e in Received, overwriting any previously stored event, and
// returns State. The ctx argument is not used.
func (r *Recording2SM[E, S]) Advance(ctx context.Context, e E) S {
	r.Received = e
	return r.State
}

func NewRoutingBehaviour(self kadt.PeerID, rt routing.RoutingTableCpl[kadt.Key, kadt.PeerID], cfg *RoutingConfig) (*RoutingBehaviour, error) {
if cfg == nil {
cfg = DefaultRoutingConfig()

Check warning on line 332 in v2/internal/coord/routing.go

View check run for this annotation

Codecov / codecov/patch

v2/internal/coord/routing.go#L332

Added line #L332 was not covered by tests
Expand All @@ -331,6 +351,7 @@ func NewRoutingBehaviour(self kadt.PeerID, rt routing.RoutingTableCpl[kadt.Key,
includeCfg.Clock = cfg.Clock
includeCfg.Tracer = cfg.Tracer
includeCfg.Meter = cfg.Meter
includeCfg.SkipCheck = cfg.IncludeSkipCheck
includeCfg.Timeout = cfg.ConnectivityCheckTimeout
includeCfg.QueueCapacity = cfg.IncludeQueueCapacity
includeCfg.Concurrency = cfg.IncludeRequestConcurrency
Expand Down
19 changes: 13 additions & 6 deletions v2/internal/coord/routing/include.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,9 @@ type IncludeConfig struct {
Concurrency int // the maximum number of include checks that may be in progress at any one time
Timeout time.Duration // the time to wait before terminating a check that is not making progress
Clock clock.Clock // a clock that may be replaced by a mock when testing

// Tracer is the tracer that should be used to trace execution.
Tracer trace.Tracer

// Meter is the meter that should be used to record metrics.
Meter metric.Meter
SkipCheck bool // whether to skip connectivity checks and add any node passed to this state machine
Tracer trace.Tracer // Tracer is the tracer that should be used to trace execution.
Meter metric.Meter // Meter is the meter that should be used to record metrics.
}

// Validate checks the configuration options and returns an error if any have invalid values.
Expand Down Expand Up @@ -124,6 +121,7 @@ func DefaultIncludeConfig() *IncludeConfig {
Tracer: tele.NoopTracer(),
Meter: tele.NoopMeter(),

SkipCheck: false,
Concurrency: 3,
Timeout: time.Minute,
QueueCapacity: 128,
Expand Down Expand Up @@ -209,6 +207,15 @@ func (in *Include[K, N]) Advance(ctx context.Context, ev IncludeEvent) (out Incl

switch tev := ev.(type) {
case *EventIncludeAddCandidate[K, N]:

if in.cfg.SkipCheck {
if in.rt.AddNode(tev.NodeID) {
return &StateIncludeRoutingUpdated[K, N]{NodeID: tev.NodeID}
} else {
return &StateIncludeIdle{}
}
}

// Ignore if already running a check
_, checking := in.checks[key.HexString(tev.NodeID.Key())]
if checking {
Expand Down
2 changes: 2 additions & 0 deletions v2/routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,8 @@ func (suite *SearchValueQuorumTestSuite) SetupTest() {

cfg := DefaultConfig()
cfg.Clock = clk
cfg.Query.SkipConnectivityCheck = true

top := NewTopology(t)

// init privileged DHT server
Expand Down

0 comments on commit 042bba3

Please sign in to comment.