Skip to content

Commit

Permalink
Refactor pipeline and node pool components
Browse files Browse the repository at this point in the history
Refactor to decouple date provider and introduce Auditor usage.

Renamed and altered functions within pipeline components to utilize the `Auditor` class, improving flexibility and modularity. Removed the embedded `DateTimeProvider` instance from several classes and ensured that such dependencies are injected or fetched through associated components like the node pools. This change enhances monitoring and logging capabilities during request processing.

Adjust time tolerance in TransportConfigurationTests

Increased the time tolerance for the 'LastUpdate' field comparison from 100 milliseconds to 2 seconds. This change enhances the reliability of the test by accommodating larger variations in timing.

(cherry picked from commit 14207cb)

Simplify RequestPipeline and reuse a singleton instance if we can

Add DateTimeProvider and RequestPipelineFactory properties

This commit introduces `DateTimeProvider` and `RequestPipelineFactory` properties to the transport configuration. The changes ensure that these properties are properly initialized and accessed throughout various components, enhancing configurability and testability of date and request pipeline behaviors.
  • Loading branch information
Mpdreamz committed Nov 1, 2024
1 parent 5c18b96 commit 95ea6c6
Show file tree
Hide file tree
Showing 32 changed files with 778 additions and 865 deletions.
2 changes: 1 addition & 1 deletion src/Elastic.Transport.VirtualizedCluster/Audit/Auditor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private Auditor(Components.VirtualizedCluster cluster, Components.VirtualizedClu

public IEnumerable<Diagnostics.Auditing.Audit> AsyncAuditTrail { get; set; }
public IEnumerable<Diagnostics.Auditing.Audit> AuditTrail { get; set; }
public Func<Components.VirtualizedCluster> Cluster { get; set; }
public Func<Components.VirtualizedCluster> Cluster { get; }

public TransportResponse Response { get; internal set; }
public TransportResponse ResponseAsync { get; internal set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,15 @@ namespace Elastic.Transport.VirtualizedCluster.Components;
public sealed class ExposingPipelineFactory<TConfiguration> : RequestPipelineFactory
where TConfiguration : class, ITransportConfiguration
{
public ExposingPipelineFactory(TConfiguration configuration, DateTimeProvider dateTimeProvider)
public ExposingPipelineFactory(TConfiguration configuration)
{
DateTimeProvider = dateTimeProvider;
Configuration = configuration;
Pipeline = Create(new RequestData(Configuration, null, null), DateTimeProvider);
RequestHandler = new DistributedTransport<TConfiguration>(Configuration, this, DateTimeProvider);
Transport = new DistributedTransport<TConfiguration>(Configuration);
}

// ReSharper disable once MemberCanBePrivate.Global
public RequestPipeline Pipeline { get; }
private DateTimeProvider DateTimeProvider { get; }
private TConfiguration Configuration { get; }
public ITransport<TConfiguration> RequestHandler { get; }
public ITransport<TConfiguration> Transport { get; }

public override RequestPipeline Create(RequestData requestData, DateTimeProvider dateTimeProvider) =>
new DefaultRequestPipeline(requestData, DateTimeProvider);
public override RequestPipeline Create(RequestData requestData) =>
new RequestPipeline(requestData);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,21 @@ internal SealedVirtualCluster(VirtualCluster cluster, NodePool pool, TestableDat
private TransportConfigurationDescriptor CreateSettings() =>
new(_nodePool, _requestInvoker, serializer: null, _productRegistration.ProductRegistration);


/// <summary> Create the cluster using all defaults on <see cref="TransportConfigurationDescriptor"/> </summary>
public VirtualizedCluster AllDefaults() =>
new(_dateTimeProvider, CreateSettings());
new(CreateSettings());

/// <summary> Create the cluster using <paramref name="selector"/> to provide configuration changes </summary>
/// <param name="selector">Provide custom configuration options</param>
public VirtualizedCluster Settings(Func<TransportConfigurationDescriptor, TransportConfigurationDescriptor> selector) =>
new(_dateTimeProvider, selector(CreateSettings()));
new(selector(CreateSettings()));

/// <summary>
/// Allows you to create an instance of `<see cref="VirtualClusterConnection"/> using the DSL provided by <see cref="Virtual"/>
/// </summary>
/// <param name="selector">Provide custom configuration options</param>
public VirtualClusterRequestInvoker VirtualClusterConnection(Func<TransportConfigurationDescriptor, TransportConfigurationDescriptor> selector = null) =>
new VirtualizedCluster(_dateTimeProvider, selector == null ? CreateSettings() : selector(CreateSettings()))
new VirtualizedCluster(selector == null ? CreateSettings() : selector(CreateSettings()))
.Connection;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ protected VirtualCluster(IEnumerable<Node> nodes, MockProductRegistration produc
InternalNodes = nodes.ToList();
}

public List<IClientCallRule> ClientCallRules { get; } = new List<IClientCallRule>();
public TestableDateTimeProvider DateTimeProvider { get; } = new TestableDateTimeProvider();
public List<IClientCallRule> ClientCallRules { get; } = new();
private TestableDateTimeProvider TestDateTimeProvider { get; } = new();

protected List<Node> InternalNodes { get; }
public IReadOnlyList<Node> Nodes => InternalNodes;
public List<IRule> PingingRules { get; } = new List<IRule>();
public List<IRule> PingingRules { get; } = new();

public List<ISniffRule> SniffingRules { get; } = new List<ISniffRule>();
public List<ISniffRule> SniffingRules { get; } = new();
internal string PublishAddressOverride { get; private set; }

internal bool SniffShouldReturnFqnd { get; private set; }
Expand Down Expand Up @@ -73,32 +73,34 @@ public VirtualCluster ClientCalls(Func<ClientCallRule, IClientCallRule> selector
public SealedVirtualCluster SingleNodeConnection(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new SingleNodePool(nodes.First().Uri), DateTimeProvider, ProductRegistration);
return new SealedVirtualCluster(this, new SingleNodePool(nodes.First().Uri), TestDateTimeProvider, ProductRegistration);
}

public SealedVirtualCluster StaticNodePool(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new StaticNodePool(nodes, false, DateTimeProvider), DateTimeProvider, ProductRegistration);
var dateTimeProvider = TestDateTimeProvider;
var nodePool = new StaticNodePool(nodes, false) { DateTimeProvider = dateTimeProvider };
return new SealedVirtualCluster(this, nodePool , TestDateTimeProvider, ProductRegistration);
}

public SealedVirtualCluster SniffingNodePool(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new SniffingNodePool(nodes, false, DateTimeProvider), DateTimeProvider, ProductRegistration);
return new SealedVirtualCluster(this, new SniffingNodePool(nodes, false) { DateTimeProvider = TestDateTimeProvider }, TestDateTimeProvider, ProductRegistration);
}

public SealedVirtualCluster StickyNodePool(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new StickyNodePool(nodes, DateTimeProvider), DateTimeProvider, ProductRegistration);
return new SealedVirtualCluster(this, new StickyNodePool(nodes) { DateTimeProvider = TestDateTimeProvider}, TestDateTimeProvider, ProductRegistration);
}

public SealedVirtualCluster StickySniffingNodePool(Func<Node, float> sorter = null,
Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null
)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new StickySniffingNodePool(nodes, sorter, DateTimeProvider), DateTimeProvider, ProductRegistration);
return new SealedVirtualCluster(this, new StickySniffingNodePool(nodes, sorter) { DateTimeProvider = TestDateTimeProvider }, TestDateTimeProvider, ProductRegistration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ private class VirtualResponse : TransportResponse;

private static readonly EndpointPath RootPath = new(HttpMethod.GET, "/");

internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, TransportConfigurationDescriptor settings)
internal VirtualizedCluster(TransportConfigurationDescriptor settings)
{
_dateTimeProvider = dateTimeProvider;
_settings = settings;
_exposingRequestPipeline = new ExposingPipelineFactory<ITransportConfiguration>(settings, _dateTimeProvider);
_dateTimeProvider = ((ITransportConfiguration)_settings).DateTimeProvider as TestableDateTimeProvider
?? throw new ArgumentException("DateTime provider is not a TestableDateTimeProvider", nameof(_dateTimeProvider));
_exposingRequestPipeline = new ExposingPipelineFactory<ITransportConfiguration>(settings);

_syncCall = (t, r) => t.Request<VirtualResponse>(
path: RootPath,
Expand All @@ -52,7 +53,7 @@ internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, Transport

public VirtualClusterRequestInvoker Connection => RequestHandler.Configuration.Connection as VirtualClusterRequestInvoker;
public NodePool ConnectionPool => RequestHandler.Configuration.NodePool;
public ITransport<ITransportConfiguration> RequestHandler => _exposingRequestPipeline?.RequestHandler;
public ITransport<ITransportConfiguration> RequestHandler => _exposingRequestPipeline?.Transport;

public VirtualizedCluster TransportProxiesTo(
Func<ITransport<ITransportConfiguration>, Func<RequestConfigurationDescriptor, IRequestConfiguration>, TransportResponse> sync,
Expand Down
8 changes: 2 additions & 6 deletions src/Elastic.Transport/Components/NodePool/CloudNodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ public sealed class CloudNodePool : SingleNodePool
/// <para> Read more here: https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html</para>
/// </param>
/// <param name="credentials"></param>
/// <param name="dateTimeProvider">Optionally inject an instance of <see cref="DateTimeProvider"/> used to set <see cref="NodePool.LastUpdate"/></param>
public CloudNodePool(string cloudId, AuthorizationHeader credentials, DateTimeProvider dateTimeProvider = null) : this(ParseCloudId(cloudId), dateTimeProvider) =>
public CloudNodePool(string cloudId, AuthorizationHeader credentials) : this(ParseCloudId(cloudId)) =>
AuthenticationHeader = credentials;

private CloudNodePool(ParsedCloudId parsedCloudId, DateTimeProvider dateTimeProvider = null) : base(parsedCloudId.Uri, dateTimeProvider) =>
private CloudNodePool(ParsedCloudId parsedCloudId) : base(parsedCloudId.Uri) =>
ClusterName = parsedCloudId.Name;

//TODO implement debugger display for NodePool implementations and display it there and its ToString()
Expand Down Expand Up @@ -92,7 +91,4 @@ private static ParsedCloudId ParseCloudId(string cloudId)

return new ParsedCloudId(clusterName, new Uri($"https://{elasticsearchUuid}.{domainName}"));
}

/// <inheritdoc />
protected override void Dispose(bool disposing) => base.Dispose(disposing);
}
14 changes: 6 additions & 8 deletions src/Elastic.Transport/Components/NodePool/NodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ public abstract class NodePool : IDisposable
{
private bool _disposed;

internal NodePool() { }

/// <summary>
/// The last time that this instance was updated.
/// </summary>
public abstract DateTimeOffset LastUpdate { get; protected set; }
public abstract DateTimeOffset? LastUpdate { get; protected set; }

/// <inheritdoc cref="DateTimeProvider"/>>
public DateTimeProvider DateTimeProvider { get; set; } = DefaultDateTimeProvider.Default;

/// <summary>
/// Returns the default maximum retries for the connection pool implementation.
Expand Down Expand Up @@ -82,18 +83,15 @@ public void Dispose()
/// <param name="disposing"></param>
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
_disposed = true;
}
if (!_disposed) _disposed = true;
}

/// <summary>
/// Creates a view over the nodes, with changing starting positions, that wraps over on each call
/// e.g Thread A might get 1,2,3,4,5 and thread B will get 2,3,4,5,1.
/// if there are no live nodes yields a different dead node to try once
/// </summary>
public abstract IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null);
public abstract IEnumerable<Node> CreateView(Auditor? auditor = null);

/// <summary>
/// Reseeds the nodes. The implementation is responsible for thread safety.
Expand Down
10 changes: 3 additions & 7 deletions src/Elastic.Transport/Components/NodePool/SingleNodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ namespace Elastic.Transport;
public class SingleNodePool : NodePool
{
/// <inheritdoc cref="SingleNodePool"/>
public SingleNodePool(Uri uri, DateTimeProvider dateTimeProvider = null)
public SingleNodePool(Uri uri)
{
var node = new Node(uri);
UsingSsl = node.Uri.Scheme == "https";
Nodes = new List<Node> { node };
LastUpdate = (dateTimeProvider ?? DefaultDateTimeProvider.Default).Now();
}

/// <inheritdoc />
public override DateTimeOffset LastUpdate { get; protected set; }
public override DateTimeOffset? LastUpdate { get; protected set; }

/// <inheritdoc />
public override int MaxRetries => 0;
Expand All @@ -39,11 +38,8 @@ public SingleNodePool(Uri uri, DateTimeProvider dateTimeProvider = null)
public override bool UsingSsl { get; protected set; }

/// <inheritdoc />
public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null) => Nodes;
public override IEnumerable<Node> CreateView(Auditor? auditor) => Nodes;

/// <inheritdoc />
public override void Reseed(IEnumerable<Node> nodes) { } //ignored

/// <inheritdoc />
protected override void Dispose(bool disposing) => base.Dispose(disposing);
}
13 changes: 5 additions & 8 deletions src/Elastic.Transport/Components/NodePool/SniffingNodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@ public class SniffingNodePool : StaticNodePool
private readonly ReaderWriterLockSlim _readerWriter = new();

/// <inheritdoc cref="SniffingNodePool"/>>
public SniffingNodePool(IEnumerable<Uri> uris, bool randomize = true, DateTimeProvider dateTimeProvider = null)
: base(uris, randomize, dateTimeProvider) { }
public SniffingNodePool(IEnumerable<Uri> uris, bool randomize = true) : base(uris, randomize) { }

/// <inheritdoc cref="SniffingNodePool"/>>
public SniffingNodePool(IEnumerable<Node> nodes, bool randomize = true, DateTimeProvider dateTimeProvider = null)
: base(nodes, randomize, dateTimeProvider) { }
public SniffingNodePool(IEnumerable<Node> nodes, bool randomize = true) : base(nodes, randomize) { }

/// <inheritdoc cref="SniffingNodePool"/>>
public SniffingNodePool(IEnumerable<Node> nodes, Func<Node, float> nodeScorer, DateTimeProvider dateTimeProvider = null)
: base(nodes, nodeScorer, dateTimeProvider) { }
public SniffingNodePool(IEnumerable<Node> nodes, Func<Node, float> nodeScorer) : base(nodes, nodeScorer) { }

/// <inheritdoc />
public override IReadOnlyCollection<Node> Nodes
Expand Down Expand Up @@ -81,12 +78,12 @@ public override void Reseed(IEnumerable<Node> nodes)
}

/// <inheritdoc />
public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null)
public override IEnumerable<Node> CreateView(Auditor? auditor)
{
_readerWriter.EnterReadLock();
try
{
return base.CreateView(audit);
return base.CreateView(auditor);
}
finally
{
Expand Down
Loading

0 comments on commit 95ea6c6

Please sign in to comment.