From 95ea6c6f4d6da04641bed5029cdcc8ae66e0558c Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Fri, 1 Nov 2024 13:08:02 +0100 Subject: [PATCH] Refactor pipeline and node pool components Refactor to decouple date provider and introduce Auditor usage. Renamed and altered functions within pipeline components to utilize the `Auditor` class, improving flexibility and modularity. Removed the embedded `DateTimeProvider` instance from several classes and ensured that such dependencies are injected or fetched through associated components like the node pools. This change enhances monitoring and logging capabilities during request processing. Adjust time tolerance in TransportConfigurationTests Increased the time tolerance for the 'LastUpdate' field comparison from 100 milliseconds to 2 seconds. This change enhances the reliability of the test by accommodating larger variations in timing. (cherry picked from commit 14207cb2a36fbb2f6b7335734c6750ebeae2503a) Simplify RequestPipeline and reuse a singleton instance if we can Add DateTimeProvider and RequestPipelineFactory properties This commit introduces `DateTimeProvider` and `RequestPipelineFactory` properties to the transport configuration. The changes ensure that these properties are properly initialized and accessed throughout various components, enhancing configurability and testability of date and request pipeline behaviors. --- .../Audit/Auditor.cs | 2 +- .../Components/ExposingPipelineFactory.cs | 15 +- .../Components/SealedVirtualCluster.cs | 7 +- .../Components/VirtualCluster.cs | 20 +- .../Components/VirtualizedCluster.cs | 9 +- .../Components/NodePool/CloudNodePool.cs | 8 +- .../Components/NodePool/NodePool.cs | 14 +- .../Components/NodePool/SingleNodePool.cs | 10 +- .../Components/NodePool/SniffingNodePool.cs | 13 +- .../Components/NodePool/StaticNodePool.cs | 48 +- .../Components/NodePool/StickyNodePool.cs | 12 +- .../NodePool/StickySniffingNodePool.cs | 14 +- .../Pipeline/DefaultRequestPipeline.cs | 559 ----------------- .../Components/Pipeline/PipelineException.cs | 2 +- .../Components/Pipeline/PipelineFailure.cs | 4 +- .../Components/Pipeline/RequestPipeline.cs | 591 +++++++++++++++--- .../DefaultRequestPipelineFactory.cs | 9 +- .../Providers/RequestPipelineFactory.cs | 2 +- .../Configuration/HeadersList.cs | 15 +- .../Configuration/ITransportConfiguration.cs | 7 + .../Configuration/TransportConfiguration.cs | 21 +- .../TransportConfigurationDescriptor.cs | 21 +- .../Diagnostics/AuditDiagnosticObserver.cs | 2 +- .../Diagnostics/Auditing/Audit.cs | 2 +- .../Diagnostics/Auditing/Auditable.cs | 56 +- .../Diagnostics/DiagnosticSources.cs | 4 +- .../RequestPipelineDiagnosticObserver.cs | 2 +- src/Elastic.Transport/DistributedTransport.cs | 138 ++-- .../Exceptions/TransportException.cs | 4 +- .../Extensions/EmptyEnumerator.cs | 23 + .../Responses/HttpDetails/ApiCallDetails.cs | 6 +- .../TransportConfigurationTests.cs | 3 - 32 files changed, 778 insertions(+), 865 deletions(-) delete mode 100644 src/Elastic.Transport/Components/Pipeline/DefaultRequestPipeline.cs create mode 100644 src/Elastic.Transport/Extensions/EmptyEnumerator.cs diff --git a/src/Elastic.Transport.VirtualizedCluster/Audit/Auditor.cs b/src/Elastic.Transport.VirtualizedCluster/Audit/Auditor.cs index eb996fd..5b594cb 100644 --- a/src/Elastic.Transport.VirtualizedCluster/Audit/Auditor.cs +++ b/src/Elastic.Transport.VirtualizedCluster/Audit/Auditor.cs @@ -34,7 +34,7 @@ private Auditor(Components.VirtualizedCluster cluster, Components.VirtualizedClu public IEnumerable AsyncAuditTrail { get; set; } public IEnumerable AuditTrail { get; set; } - public Func Cluster { get; set; } + public Func Cluster { get; } public TransportResponse Response { get; internal set; } public TransportResponse ResponseAsync { get; internal set; } diff --git a/src/Elastic.Transport.VirtualizedCluster/Components/ExposingPipelineFactory.cs b/src/Elastic.Transport.VirtualizedCluster/Components/ExposingPipelineFactory.cs index b2601fe..59b6b33 100644 --- a/src/Elastic.Transport.VirtualizedCluster/Components/ExposingPipelineFactory.cs +++ b/src/Elastic.Transport.VirtualizedCluster/Components/ExposingPipelineFactory.cs @@ -11,20 +11,15 @@ namespace Elastic.Transport.VirtualizedCluster.Components; public sealed class ExposingPipelineFactory : RequestPipelineFactory where TConfiguration : class, ITransportConfiguration { - public ExposingPipelineFactory(TConfiguration configuration, DateTimeProvider dateTimeProvider) + public ExposingPipelineFactory(TConfiguration configuration) { - DateTimeProvider = dateTimeProvider; Configuration = configuration; - Pipeline = Create(new RequestData(Configuration, null, null), DateTimeProvider); - RequestHandler = new DistributedTransport(Configuration, this, DateTimeProvider); + Transport = new DistributedTransport(Configuration); } - // ReSharper disable once MemberCanBePrivate.Global - public RequestPipeline Pipeline { get; } - private DateTimeProvider DateTimeProvider { get; } private TConfiguration Configuration { get; } - public ITransport RequestHandler { get; } + public ITransport Transport { get; } - public override RequestPipeline Create(RequestData requestData, DateTimeProvider dateTimeProvider) => - new DefaultRequestPipeline(requestData, DateTimeProvider); + public override RequestPipeline Create(RequestData requestData) => + new RequestPipeline(requestData); } diff --git a/src/Elastic.Transport.VirtualizedCluster/Components/SealedVirtualCluster.cs b/src/Elastic.Transport.VirtualizedCluster/Components/SealedVirtualCluster.cs index f2b1c1b..bbe7651 100644 --- a/src/Elastic.Transport.VirtualizedCluster/Components/SealedVirtualCluster.cs +++ b/src/Elastic.Transport.VirtualizedCluster/Components/SealedVirtualCluster.cs @@ -31,20 +31,21 @@ internal SealedVirtualCluster(VirtualCluster cluster, NodePool pool, TestableDat private TransportConfigurationDescriptor CreateSettings() => new(_nodePool, _requestInvoker, serializer: null, _productRegistration.ProductRegistration); + /// Create the cluster using all defaults on public VirtualizedCluster AllDefaults() => - new(_dateTimeProvider, CreateSettings()); + new(CreateSettings()); /// Create the cluster using to provide configuration changes /// Provide custom configuration options public VirtualizedCluster Settings(Func selector) => - new(_dateTimeProvider, selector(CreateSettings())); + new(selector(CreateSettings())); /// /// Allows you to create an instance of ` using the DSL provided by /// /// Provide custom configuration options public VirtualClusterRequestInvoker VirtualClusterConnection(Func selector = null) => - new VirtualizedCluster(_dateTimeProvider, selector == null ? CreateSettings() : selector(CreateSettings())) + new VirtualizedCluster(selector == null ? CreateSettings() : selector(CreateSettings())) .Connection; } diff --git a/src/Elastic.Transport.VirtualizedCluster/Components/VirtualCluster.cs b/src/Elastic.Transport.VirtualizedCluster/Components/VirtualCluster.cs index 9f0ac77..4c1025a 100644 --- a/src/Elastic.Transport.VirtualizedCluster/Components/VirtualCluster.cs +++ b/src/Elastic.Transport.VirtualizedCluster/Components/VirtualCluster.cs @@ -19,14 +19,14 @@ protected VirtualCluster(IEnumerable nodes, MockProductRegistration produc InternalNodes = nodes.ToList(); } - public List ClientCallRules { get; } = new List(); - public TestableDateTimeProvider DateTimeProvider { get; } = new TestableDateTimeProvider(); + public List ClientCallRules { get; } = new(); + private TestableDateTimeProvider TestDateTimeProvider { get; } = new(); protected List InternalNodes { get; } public IReadOnlyList Nodes => InternalNodes; - public List PingingRules { get; } = new List(); + public List PingingRules { get; } = new(); - public List SniffingRules { get; } = new List(); + public List SniffingRules { get; } = new(); internal string PublishAddressOverride { get; private set; } internal bool SniffShouldReturnFqnd { get; private set; } @@ -73,25 +73,27 @@ public VirtualCluster ClientCalls(Func selector public SealedVirtualCluster SingleNodeConnection(Func, IEnumerable> seedNodesSelector = null) { var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes; - return new SealedVirtualCluster(this, new SingleNodePool(nodes.First().Uri), DateTimeProvider, ProductRegistration); + return new SealedVirtualCluster(this, new SingleNodePool(nodes.First().Uri), TestDateTimeProvider, ProductRegistration); } public SealedVirtualCluster StaticNodePool(Func, IEnumerable> seedNodesSelector = null) { var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes; - return new SealedVirtualCluster(this, new StaticNodePool(nodes, false, DateTimeProvider), DateTimeProvider, ProductRegistration); + var dateTimeProvider = TestDateTimeProvider; + var nodePool = new StaticNodePool(nodes, false) { DateTimeProvider = dateTimeProvider }; + return new SealedVirtualCluster(this, nodePool , TestDateTimeProvider, ProductRegistration); } public SealedVirtualCluster SniffingNodePool(Func, IEnumerable> seedNodesSelector = null) { var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes; - return new SealedVirtualCluster(this, new SniffingNodePool(nodes, false, DateTimeProvider), DateTimeProvider, ProductRegistration); + return new SealedVirtualCluster(this, new SniffingNodePool(nodes, false) { DateTimeProvider = TestDateTimeProvider }, TestDateTimeProvider, ProductRegistration); } public SealedVirtualCluster StickyNodePool(Func, IEnumerable> seedNodesSelector = null) { var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes; - return new SealedVirtualCluster(this, new StickyNodePool(nodes, DateTimeProvider), DateTimeProvider, ProductRegistration); + return new SealedVirtualCluster(this, new StickyNodePool(nodes) { DateTimeProvider = TestDateTimeProvider}, TestDateTimeProvider, ProductRegistration); } public SealedVirtualCluster StickySniffingNodePool(Func sorter = null, @@ -99,6 +101,6 @@ public SealedVirtualCluster StickySniffingNodePool(Func sorter = nu ) { var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes; - return new SealedVirtualCluster(this, new StickySniffingNodePool(nodes, sorter, DateTimeProvider), DateTimeProvider, ProductRegistration); + return new SealedVirtualCluster(this, new StickySniffingNodePool(nodes, sorter) { DateTimeProvider = TestDateTimeProvider }, TestDateTimeProvider, ProductRegistration); } } diff --git a/src/Elastic.Transport.VirtualizedCluster/Components/VirtualizedCluster.cs b/src/Elastic.Transport.VirtualizedCluster/Components/VirtualizedCluster.cs index 20ddf3f..268ef73 100644 --- a/src/Elastic.Transport.VirtualizedCluster/Components/VirtualizedCluster.cs +++ b/src/Elastic.Transport.VirtualizedCluster/Components/VirtualizedCluster.cs @@ -22,11 +22,12 @@ private class VirtualResponse : TransportResponse; private static readonly EndpointPath RootPath = new(HttpMethod.GET, "/"); - internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, TransportConfigurationDescriptor settings) + internal VirtualizedCluster(TransportConfigurationDescriptor settings) { - _dateTimeProvider = dateTimeProvider; _settings = settings; - _exposingRequestPipeline = new ExposingPipelineFactory(settings, _dateTimeProvider); + _dateTimeProvider = ((ITransportConfiguration)_settings).DateTimeProvider as TestableDateTimeProvider + ?? throw new ArgumentException("DateTime provider is not a TestableDateTimeProvider", nameof(_dateTimeProvider)); + _exposingRequestPipeline = new ExposingPipelineFactory(settings); _syncCall = (t, r) => t.Request( path: RootPath, @@ -52,7 +53,7 @@ internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, Transport public VirtualClusterRequestInvoker Connection => RequestHandler.Configuration.Connection as VirtualClusterRequestInvoker; public NodePool ConnectionPool => RequestHandler.Configuration.NodePool; - public ITransport RequestHandler => _exposingRequestPipeline?.RequestHandler; + public ITransport RequestHandler => _exposingRequestPipeline?.Transport; public VirtualizedCluster TransportProxiesTo( Func, Func, TransportResponse> sync, diff --git a/src/Elastic.Transport/Components/NodePool/CloudNodePool.cs b/src/Elastic.Transport/Components/NodePool/CloudNodePool.cs index 54282d9..15cff9c 100644 --- a/src/Elastic.Transport/Components/NodePool/CloudNodePool.cs +++ b/src/Elastic.Transport/Components/NodePool/CloudNodePool.cs @@ -36,11 +36,10 @@ public sealed class CloudNodePool : SingleNodePool /// Read more here: https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html /// /// - /// Optionally inject an instance of used to set - public CloudNodePool(string cloudId, AuthorizationHeader credentials, DateTimeProvider dateTimeProvider = null) : this(ParseCloudId(cloudId), dateTimeProvider) => + public CloudNodePool(string cloudId, AuthorizationHeader credentials) : this(ParseCloudId(cloudId)) => AuthenticationHeader = credentials; - private CloudNodePool(ParsedCloudId parsedCloudId, DateTimeProvider dateTimeProvider = null) : base(parsedCloudId.Uri, dateTimeProvider) => + private CloudNodePool(ParsedCloudId parsedCloudId) : base(parsedCloudId.Uri) => ClusterName = parsedCloudId.Name; //TODO implement debugger display for NodePool implementations and display it there and its ToString() @@ -92,7 +91,4 @@ private static ParsedCloudId ParseCloudId(string cloudId) return new ParsedCloudId(clusterName, new Uri($"https://{elasticsearchUuid}.{domainName}")); } - - /// - protected override void Dispose(bool disposing) => base.Dispose(disposing); } diff --git a/src/Elastic.Transport/Components/NodePool/NodePool.cs b/src/Elastic.Transport/Components/NodePool/NodePool.cs index 97bcc07..592ec5d 100644 --- a/src/Elastic.Transport/Components/NodePool/NodePool.cs +++ b/src/Elastic.Transport/Components/NodePool/NodePool.cs @@ -20,12 +20,13 @@ public abstract class NodePool : IDisposable { private bool _disposed; - internal NodePool() { } - /// /// The last time that this instance was updated. /// - public abstract DateTimeOffset LastUpdate { get; protected set; } + public abstract DateTimeOffset? LastUpdate { get; protected set; } + + /// > + public DateTimeProvider DateTimeProvider { get; set; } = DefaultDateTimeProvider.Default; /// /// Returns the default maximum retries for the connection pool implementation. @@ -82,10 +83,7 @@ public void Dispose() /// protected virtual void Dispose(bool disposing) { - if (!_disposed) - { - _disposed = true; - } + if (!_disposed) _disposed = true; } /// @@ -93,7 +91,7 @@ protected virtual void Dispose(bool disposing) /// e.g Thread A might get 1,2,3,4,5 and thread B will get 2,3,4,5,1. /// if there are no live nodes yields a different dead node to try once /// - public abstract IEnumerable CreateView(Action audit = null); + public abstract IEnumerable CreateView(Auditor? auditor = null); /// /// Reseeds the nodes. The implementation is responsible for thread safety. diff --git a/src/Elastic.Transport/Components/NodePool/SingleNodePool.cs b/src/Elastic.Transport/Components/NodePool/SingleNodePool.cs index b1e0424..1d5462a 100644 --- a/src/Elastic.Transport/Components/NodePool/SingleNodePool.cs +++ b/src/Elastic.Transport/Components/NodePool/SingleNodePool.cs @@ -12,16 +12,15 @@ namespace Elastic.Transport; public class SingleNodePool : NodePool { /// - public SingleNodePool(Uri uri, DateTimeProvider dateTimeProvider = null) + public SingleNodePool(Uri uri) { var node = new Node(uri); UsingSsl = node.Uri.Scheme == "https"; Nodes = new List { node }; - LastUpdate = (dateTimeProvider ?? DefaultDateTimeProvider.Default).Now(); } /// - public override DateTimeOffset LastUpdate { get; protected set; } + public override DateTimeOffset? LastUpdate { get; protected set; } /// public override int MaxRetries => 0; @@ -39,11 +38,8 @@ public SingleNodePool(Uri uri, DateTimeProvider dateTimeProvider = null) public override bool UsingSsl { get; protected set; } /// - public override IEnumerable CreateView(Action audit = null) => Nodes; + public override IEnumerable CreateView(Auditor? auditor) => Nodes; /// public override void Reseed(IEnumerable nodes) { } //ignored - - /// - protected override void Dispose(bool disposing) => base.Dispose(disposing); } diff --git a/src/Elastic.Transport/Components/NodePool/SniffingNodePool.cs b/src/Elastic.Transport/Components/NodePool/SniffingNodePool.cs index 29ebbbb..1c996f5 100644 --- a/src/Elastic.Transport/Components/NodePool/SniffingNodePool.cs +++ b/src/Elastic.Transport/Components/NodePool/SniffingNodePool.cs @@ -22,16 +22,13 @@ public class SniffingNodePool : StaticNodePool private readonly ReaderWriterLockSlim _readerWriter = new(); /// > - public SniffingNodePool(IEnumerable uris, bool randomize = true, DateTimeProvider dateTimeProvider = null) - : base(uris, randomize, dateTimeProvider) { } + public SniffingNodePool(IEnumerable uris, bool randomize = true) : base(uris, randomize) { } /// > - public SniffingNodePool(IEnumerable nodes, bool randomize = true, DateTimeProvider dateTimeProvider = null) - : base(nodes, randomize, dateTimeProvider) { } + public SniffingNodePool(IEnumerable nodes, bool randomize = true) : base(nodes, randomize) { } /// > - public SniffingNodePool(IEnumerable nodes, Func nodeScorer, DateTimeProvider dateTimeProvider = null) - : base(nodes, nodeScorer, dateTimeProvider) { } + public SniffingNodePool(IEnumerable nodes, Func nodeScorer) : base(nodes, nodeScorer) { } /// public override IReadOnlyCollection Nodes @@ -81,12 +78,12 @@ public override void Reseed(IEnumerable nodes) } /// - public override IEnumerable CreateView(Action audit = null) + public override IEnumerable CreateView(Auditor? auditor) { _readerWriter.EnterReadLock(); try { - return base.CreateView(audit); + return base.CreateView(auditor); } finally { diff --git a/src/Elastic.Transport/Components/NodePool/StaticNodePool.cs b/src/Elastic.Transport/Components/NodePool/StaticNodePool.cs index 560fbbf..95a77dc 100644 --- a/src/Elastic.Transport/Components/NodePool/StaticNodePool.cs +++ b/src/Elastic.Transport/Components/NodePool/StaticNodePool.cs @@ -28,37 +28,36 @@ public class StaticNodePool : NodePool private readonly Func _nodeScorer; /// - public StaticNodePool(IEnumerable uris, bool randomize = true, DateTimeProvider dateTimeProvider = null) - : this(uris.Select(uri => new Node(uri)), randomize, null, dateTimeProvider) { } + public StaticNodePool(IEnumerable uris, bool randomize = true) + : this(uris.Select(uri => new Node(uri)), randomize, null) { } /// - public StaticNodePool(IEnumerable nodes, bool randomize = true, DateTimeProvider dateTimeProvider = null) - : this(nodes, randomize, null, dateTimeProvider) { } + public StaticNodePool(IEnumerable nodes, bool randomize = true) + : this(nodes, randomize, null) { } /// - protected StaticNodePool(IEnumerable nodes, bool randomize, int? randomizeSeed = null, DateTimeProvider dateTimeProvider = null) + protected StaticNodePool(IEnumerable nodes, bool randomize, int? randomizeSeed = null) { Randomize = randomize; Random = !randomize || !randomizeSeed.HasValue ? new Random() : new Random(randomizeSeed.Value); - Initialize(nodes, dateTimeProvider); + Initialize(nodes); } //this constructor is protected because nodeScorer only makes sense on subclasses that support reseeding otherwise just manually sort `nodes` before instantiating. /// - protected StaticNodePool(IEnumerable nodes, Func nodeScorer = null, DateTimeProvider dateTimeProvider = null) + protected StaticNodePool(IEnumerable nodes, Func nodeScorer = null) { _nodeScorer = nodeScorer; - Initialize(nodes, dateTimeProvider); + Initialize(nodes); } - private void Initialize(IEnumerable nodes, DateTimeProvider dateTimeProvider) + private void Initialize(IEnumerable nodes) { var nodesProvided = nodes?.ToList() ?? throw new ArgumentNullException(nameof(nodes)); nodesProvided.ThrowIfEmpty(nameof(nodes)); - DateTimeProvider = dateTimeProvider ?? Elastic.Transport.DefaultDateTimeProvider.Default; string scheme = null; foreach (var node in nodesProvided) @@ -76,11 +75,10 @@ private void Initialize(IEnumerable nodes, DateTimeProvider dateTimeProvid InternalNodes = SortNodes(nodesProvided) .DistinctByCustom(n => n.Uri) .ToList(); - LastUpdate = DateTimeProvider.Now(); } /// - public override DateTimeOffset LastUpdate { get; protected set; } + public override DateTimeOffset? LastUpdate { get; protected set; } /// public override int MaxRetries => InternalNodes.Count - 1; @@ -112,9 +110,6 @@ protected IReadOnlyList AliveNodes } } - /// > - protected DateTimeProvider DateTimeProvider { get; private set; } - /// /// The list of nodes we are operating over. This is protected so that subclasses that DO implement /// can update this list. Its up to subclasses to make this thread safe. @@ -137,7 +132,7 @@ protected IReadOnlyList AliveNodes /// e.g Thread A might get 1,2,3,4,5 and thread B will get 2,3,4,5,1. /// if there are no live nodes yields a different dead node to try once /// - public override IEnumerable CreateView(Action audit = null) + public override IEnumerable CreateView(Auditor? auditor) { var nodes = AliveNodes; @@ -146,13 +141,13 @@ public override IEnumerable CreateView(Action audit = nu if (nodes.Count == 0) { //could not find a suitable node retrying on first node off globalCursor - yield return RetryInternalNodes(globalCursor, audit); + yield return RetryInternalNodes(globalCursor, auditor); yield break; } var localCursor = globalCursor % nodes.Count; - foreach (var aliveNode in SelectAliveNodes(localCursor, nodes, audit)) yield return aliveNode; + foreach (var aliveNode in SelectAliveNodes(localCursor, nodes, auditor)) yield return aliveNode; } /// @@ -164,14 +159,13 @@ public override void Reseed(IEnumerable nodes) { } //ignored /// /// /// - /// Trace action to document the fact all nodes were dead and were resurrecting one at random - protected Node RetryInternalNodes(int globalCursor, Action audit = null) + /// Trace action to document the fact all nodes were dead and were resurrecting one at random + protected Node RetryInternalNodes(int globalCursor, Auditor? auditor = null) { - audit?.Invoke(AuditEvent.AllNodesDead, null); + auditor?.Emit(AuditEvent.AllNodesDead); var node = InternalNodes[globalCursor % InternalNodes.Count]; node.IsResurrected = true; - audit?.Invoke(AuditEvent.Resurrection, node); - + auditor?.Emit(AuditEvent.Resurrection, node); return node; } @@ -181,8 +175,8 @@ protected Node RetryInternalNodes(int globalCursor, Action aud /// /// The starting point into from wich to start. /// - /// Trace action to notify if a resurrection occured - protected static IEnumerable SelectAliveNodes(int cursor, IReadOnlyList aliveNodes, Action audit = null) + /// Trace action to notify if a resurrection occured + protected static IEnumerable SelectAliveNodes(int cursor, IReadOnlyList aliveNodes, Auditor? auditor = null) { // ReSharper disable once ForCanBeConvertedToForeach for (var attempts = 0; attempts < aliveNodes.Count; attempts++) @@ -192,7 +186,7 @@ protected static IEnumerable SelectAliveNodes(int cursor, IReadOnlyList SortNodes(IEnumerable nodes) => ? nodes.OrderByDescending(_nodeScorer) : nodes.OrderBy(n => Randomize ? Random.Next() : 1); - /// - protected override void Dispose(bool disposing) => base.Dispose(disposing); } diff --git a/src/Elastic.Transport/Components/NodePool/StickyNodePool.cs b/src/Elastic.Transport/Components/NodePool/StickyNodePool.cs index fab319e..8f40cf8 100644 --- a/src/Elastic.Transport/Components/NodePool/StickyNodePool.cs +++ b/src/Elastic.Transport/Components/NodePool/StickyNodePool.cs @@ -16,15 +16,13 @@ namespace Elastic.Transport; public sealed class StickyNodePool : StaticNodePool { /// - public StickyNodePool(IEnumerable uris, DateTimeProvider dateTimeProvider = null) - : base(uris, false, dateTimeProvider) { } + public StickyNodePool(IEnumerable uris) : base(uris, false) { } /// - public StickyNodePool(IEnumerable nodes, DateTimeProvider dateTimeProvider = null) - : base(nodes, false, dateTimeProvider) { } + public StickyNodePool(IEnumerable nodes) : base(nodes, false) { } /// - public override IEnumerable CreateView(Action audit = null) + public override IEnumerable CreateView(Auditor? auditor) { var nodes = AliveNodes; @@ -33,7 +31,7 @@ public override IEnumerable CreateView(Action audit = nu var globalCursor = Interlocked.Increment(ref GlobalCursor); //could not find a suitable node retrying on first node off globalCursor - yield return RetryInternalNodes(globalCursor, audit); + yield return RetryInternalNodes(globalCursor, auditor); yield break; } @@ -44,7 +42,7 @@ public override IEnumerable CreateView(Action audit = nu Interlocked.Exchange(ref GlobalCursor, -1); var localCursor = 0; - foreach (var aliveNode in SelectAliveNodes(localCursor, nodes, audit)) + foreach (var aliveNode in SelectAliveNodes(localCursor, nodes, auditor)) yield return aliveNode; } diff --git a/src/Elastic.Transport/Components/NodePool/StickySniffingNodePool.cs b/src/Elastic.Transport/Components/NodePool/StickySniffingNodePool.cs index e4344f4..fd8f201 100644 --- a/src/Elastic.Transport/Components/NodePool/StickySniffingNodePool.cs +++ b/src/Elastic.Transport/Components/NodePool/StickySniffingNodePool.cs @@ -17,12 +17,12 @@ namespace Elastic.Transport; public sealed class StickySniffingNodePool : SniffingNodePool { /// - public StickySniffingNodePool(IEnumerable uris, Func nodeScorer, DateTimeProvider dateTimeProvider = null) - : base(uris.Select(uri => new Node(uri)), nodeScorer ?? DefaultNodeScore, dateTimeProvider) { } + public StickySniffingNodePool(IEnumerable uris, Func nodeScorer) + : base(uris.Select(uri => new Node(uri)), nodeScorer ?? DefaultNodeScore) { } /// - public StickySniffingNodePool(IEnumerable nodes, Func nodeScorer, DateTimeProvider dateTimeProvider = null) - : base(nodes, nodeScorer ?? DefaultNodeScore, dateTimeProvider) { } + public StickySniffingNodePool(IEnumerable nodes, Func nodeScorer) + : base(nodes, nodeScorer ?? DefaultNodeScore) { } /// public override bool SupportsPinging => true; @@ -31,7 +31,7 @@ public StickySniffingNodePool(IEnumerable nodes, Func nodeSco public override bool SupportsReseeding => true; /// - public override IEnumerable CreateView(Action audit = null) + public override IEnumerable CreateView(Auditor? auditor) { var nodes = AliveNodes; @@ -40,7 +40,7 @@ public override IEnumerable CreateView(Action audit = nu var globalCursor = Interlocked.Increment(ref GlobalCursor); //could not find a suitable node retrying on first node off globalCursor - yield return RetryInternalNodes(globalCursor, audit); + yield return RetryInternalNodes(globalCursor, auditor); yield break; } @@ -51,7 +51,7 @@ public override IEnumerable CreateView(Action audit = nu Interlocked.Exchange(ref GlobalCursor, -1); var localCursor = 0; - foreach (var aliveNode in SelectAliveNodes(localCursor, nodes, audit)) + foreach (var aliveNode in SelectAliveNodes(localCursor, nodes, auditor)) yield return aliveNode; } diff --git a/src/Elastic.Transport/Components/Pipeline/DefaultRequestPipeline.cs b/src/Elastic.Transport/Components/Pipeline/DefaultRequestPipeline.cs deleted file mode 100644 index 19b7d64..0000000 --- a/src/Elastic.Transport/Components/Pipeline/DefaultRequestPipeline.cs +++ /dev/null @@ -1,559 +0,0 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -// See the LICENSE file in the project root for more information - -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Elastic.Transport.Diagnostics.Auditing; -using Elastic.Transport.Extensions; -using Elastic.Transport.Products; -using static Elastic.Transport.Diagnostics.Auditing.AuditEvent; - -namespace Elastic.Transport; - -/// -public class DefaultRequestPipeline : RequestPipeline -{ - private readonly IRequestInvoker _requestInvoker; - private readonly NodePool _nodePool; - private readonly RequestData _requestData; - private readonly DateTimeProvider _dateTimeProvider; - private readonly MemoryStreamFactory _memoryStreamFactory; - private readonly Func _nodePredicate; - private readonly ProductRegistration _productRegistration; - private readonly ResponseBuilder _responseBuilder; - - private RequestConfiguration? _pingAndSniffRequestConfiguration; - private List? _auditTrail; - private readonly ITransportConfiguration _settings; - - /// - internal DefaultRequestPipeline(RequestData requestData, DateTimeProvider dateTimeProvider) - { - _requestData = requestData; - _settings = requestData.ConnectionSettings; - - _nodePool = requestData.ConnectionSettings.NodePool; - _requestInvoker = requestData.ConnectionSettings.Connection; - _dateTimeProvider = dateTimeProvider; - _memoryStreamFactory = requestData.MemoryStreamFactory; - _productRegistration = requestData.ConnectionSettings.ProductRegistration; - _responseBuilder = _productRegistration.ResponseBuilder; - _nodePredicate = requestData.ConnectionSettings.NodePredicate ?? _productRegistration.NodePredicate; - - StartedOn = dateTimeProvider.Now(); - } - - /// - public override IEnumerable AuditTrail => _auditTrail; - - private RequestConfiguration PingAndSniffRequestConfiguration - { - // Lazily loaded when first required, since not all node pools and configurations support pinging and sniffing. - // This avoids allocating 192B per request for those which do not need to ping or sniff. - get - { - if (_pingAndSniffRequestConfiguration is not null) return _pingAndSniffRequestConfiguration; - - _pingAndSniffRequestConfiguration = new RequestConfiguration - { - PingTimeout = PingTimeout, - RequestTimeout = PingTimeout, - Authentication = _requestData.AuthenticationHeader, - EnableHttpPipelining = _requestData.HttpPipeliningEnabled, - ForceNode = _requestData.ForceNode - }; - - return _pingAndSniffRequestConfiguration; - } - } - - //TODO xmldocs -#pragma warning disable 1591 - public bool DepletedRetries => Retried >= MaxRetries + 1 || IsTakingTooLong; - - public override bool FirstPoolUsageNeedsSniffing => - !RequestDisabledSniff - && _nodePool.SupportsReseeding && _settings.SniffsOnStartup && !_nodePool.SniffedOnStartup; - - public override bool IsTakingTooLong - { - get - { - var timeout = _settings.MaxRetryTimeout.GetValueOrDefault(RequestTimeout); - var now = _dateTimeProvider.Now(); - - //we apply a soft margin so that if a request times out at 59 seconds when the maximum is 60 we also abort. - var margin = timeout.TotalMilliseconds / 100.0 * 98; - var marginTimeSpan = TimeSpan.FromMilliseconds(margin); - var timespanCall = now - StartedOn; - var tookToLong = timespanCall >= marginTimeSpan; - return tookToLong; - } - } - - public override int MaxRetries => _requestData.MaxRetries; - - public bool Refresh { get; private set; } - - public int Retried { get; private set; } - - public IEnumerable SniffNodes => _nodePool - .CreateView(LazyAuditable) - .ToList() - .OrderBy(n => _productRegistration.SniffOrder(n)); - - public override bool SniffsOnConnectionFailure => - !RequestDisabledSniff - && _nodePool.SupportsReseeding && _settings.SniffsOnConnectionFault; - - public override bool SniffsOnStaleCluster => - !RequestDisabledSniff - && _nodePool.SupportsReseeding && _settings.SniffInformationLifeSpan.HasValue; - - public override bool StaleClusterState - { - get - { - if (!SniffsOnStaleCluster) return false; - - // ReSharper disable once PossibleInvalidOperationException - // already checked by SniffsOnStaleCluster - var sniffLifeSpan = _settings.SniffInformationLifeSpan.Value; - - var now = _dateTimeProvider.Now(); - var lastSniff = _nodePool.LastUpdate; - - return sniffLifeSpan < now - lastSniff; - } - } - - public override DateTimeOffset StartedOn { get; } - - private TimeSpan PingTimeout => _requestData.PingTimeout; - - private bool RequestDisabledSniff => _requestData.DisableSniff; - - private TimeSpan RequestTimeout => _requestData.RequestTimeout; - - public override void AuditCancellationRequested() => Audit(CancellationRequested)?.Dispose(); - - public override void BadResponse(ref TResponse response, ApiCallDetails callDetails, Endpoint endpoint, RequestData data, PostData? postData, TransportException exception) - { - if (response == null) - { - //make sure we copy over the error body in case we disabled direct streaming. - var s = callDetails?.ResponseBodyInBytes == null ? Stream.Null : _memoryStreamFactory.Create(callDetails.ResponseBodyInBytes); - var m = callDetails?.ResponseMimeType ?? RequestData.DefaultMimeType; - response = _responseBuilder.ToResponse(endpoint, data, postData, exception, callDetails?.HttpStatusCode, null, s, m, callDetails?.ResponseBodyInBytes?.Length ?? -1, null, null); - } - - response.ApiCallDetails.AuditTrail = AuditTrail; - } - - public override TResponse CallProductEndpoint(Endpoint endpoint, RequestData requestData, PostData? postData) - => CallProductEndpointCoreAsync(false, endpoint, requestData, postData).EnsureCompleted(); - - public override Task CallProductEndpointAsync(Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken = default) - => CallProductEndpointCoreAsync(true, endpoint, requestData, postData, cancellationToken).AsTask(); - - private async ValueTask CallProductEndpointCoreAsync(bool isAsync, Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken = default) - where TResponse : TransportResponse, new() - { - using var audit = Audit(HealthyResponse, endpoint.Node); - - if (audit is not null) - audit.PathAndQuery = endpoint.PathAndQuery; - - try - { - TResponse response; - - if (isAsync) - response = await _requestInvoker.RequestAsync(endpoint, requestData, postData, cancellationToken).ConfigureAwait(false); - else - response = _requestInvoker.Request(endpoint, requestData, postData); - - response.ApiCallDetails.AuditTrail = AuditTrail; - - ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails, response); - - if (!response.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType && audit is not null) - { - var @event = response.ApiCallDetails.HttpStatusCode != null ? AuditEvent.BadResponse : BadRequest; - audit.Event = @event; - } - - return response; - } - catch (Exception e) when (audit is not null) - { - var @event = e is TransportException t && t.ApiCallDetails.HttpStatusCode != null ? AuditEvent.BadResponse : BadRequest; - audit.Event = @event; - audit.Exception = e; - throw; - } - } - - public override TransportException? CreateClientException( - TResponse response, - ApiCallDetails? callDetails, - Endpoint endpoint, - RequestData data, - List? seenExceptions - ) - { - if (callDetails?.HasSuccessfulStatusCodeAndExpectedContentType ?? false) return null; - - var pipelineFailure = callDetails?.HttpStatusCode != null ? PipelineFailure.BadResponse : PipelineFailure.BadRequest; - var innerException = callDetails?.OriginalException; - if (seenExceptions is not null && seenExceptions.HasAny(out var exs)) - { - pipelineFailure = exs.Last().FailureReason; - innerException = exs.AsAggregateOrFirst(); - } - - var statusCode = callDetails?.HttpStatusCode != null ? callDetails.HttpStatusCode.Value.ToString() : "unknown"; - var resource = callDetails == null - ? "unknown resource" - : $"Status code {statusCode} from: {callDetails.HttpMethod} {callDetails.Uri.PathAndQuery}"; - - var exceptionMessage = innerException?.Message ?? "Request failed to execute"; - - if (IsTakingTooLong) - { - pipelineFailure = PipelineFailure.MaxTimeoutReached; - Audit(MaxTimeoutReached); - exceptionMessage = "Maximum timeout reached while retrying request"; - } - else if (Retried >= MaxRetries && MaxRetries > 0) - { - pipelineFailure = PipelineFailure.MaxRetriesReached; - Audit(MaxRetriesReached); - exceptionMessage = "Maximum number of retries reached"; - - var now = _dateTimeProvider.Now(); - var activeNodes = _nodePool.Nodes.Count(n => n.IsAlive || n.DeadUntil <= now); - if (Retried >= activeNodes) - { - Audit(FailedOverAllNodes); - exceptionMessage += ", failed over to all the known alive nodes before failing"; - } - } - - exceptionMessage += !exceptionMessage.EndsWith(".", StringComparison.Ordinal) ? $". Call: {resource}" : $" Call: {resource}"; - if (response != null && _productRegistration.TryGetServerErrorReason(response, out var reason)) - exceptionMessage += $". ServerError: {reason}"; - - var clientException = new TransportException(pipelineFailure, exceptionMessage, innerException) - { - Endpoint = endpoint, - ApiCallDetails = callDetails, - AuditTrail = AuditTrail - }; - - return clientException; - } - - public override void FirstPoolUsage(SemaphoreSlim semaphore) - { - if (!FirstPoolUsageNeedsSniffing) return; - - if (!semaphore.Wait(RequestTimeout)) - { - if (FirstPoolUsageNeedsSniffing) - throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, null); - - return; - } - - if (!FirstPoolUsageNeedsSniffing) - { - semaphore.Release(); - return; - } - - try - { - using (Audit(SniffOnStartup)) - { - Sniff(); - _nodePool.MarkAsSniffed(); - } - } - finally - { - semaphore.Release(); - } - } - - public override async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken cancellationToken) - { - if (!FirstPoolUsageNeedsSniffing) return; - - // TODO cancellationToken could throw here and will bubble out as OperationCancelledException - // everywhere else it would bubble out wrapped in a `UnexpectedTransportException` - var success = await semaphore.WaitAsync(RequestTimeout, cancellationToken).ConfigureAwait(false); - if (!success) - { - if (FirstPoolUsageNeedsSniffing) - throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, null); - - return; - } - - if (!FirstPoolUsageNeedsSniffing) - { - semaphore.Release(); - return; - } - try - { - using (Audit(SniffOnStartup)) - { - await SniffAsync(cancellationToken).ConfigureAwait(false); - _nodePool.MarkAsSniffed(); - } - } - finally - { - semaphore.Release(); - } - } - - public override void MarkAlive(Node node) => node.MarkAlive(); - - public override void MarkDead(Node node) - { - var deadUntil = _dateTimeProvider.DeadTime(node.FailedAttempts, _settings.DeadTimeout, _settings.MaxDeadTimeout); - node.MarkDead(deadUntil); - Retried++; - } - - /// - public override bool TryGetSingleNode(out Node node) - { - if (_nodePool.Nodes.Count <= 1 && _nodePool.MaxRetries <= _nodePool.Nodes.Count && - !_nodePool.SupportsPinging && !_nodePool.SupportsReseeding) - { - node = _nodePool.Nodes.FirstOrDefault(); - - if (node is not null && _nodePredicate(node)) return true; - } - - node = null; - return false; - } - - public override IEnumerable NextNode() - { - if (_requestData.ForceNode != null) - { - yield return new Node(_requestData.ForceNode); - - yield break; - } - - //This for loop allows to break out of the view state machine if we need to - //force a refresh (after reseeding node pool). We have a hardcoded limit of only - //allowing 100 of these refreshes per call - var refreshed = false; - for (var i = 0; i < 100; i++) - { - if (DepletedRetries) yield break; - - foreach (var node in _nodePool.CreateView(LazyAuditable)) - { - if (DepletedRetries) break; - - if (!_nodePredicate(node)) continue; - - yield return node; - - if (!Refresh) continue; - - Refresh = false; - refreshed = true; - break; - } - //unless a refresh was requested we will not iterate over more then a single view. - //keep in mind refreshes are also still bound to overall maxretry count/timeout. - if (!refreshed) break; - } - } - - public override void Ping(Node node) => PingCoreAsync(false, node).EnsureCompleted(); - - public override Task PingAsync(Node node, CancellationToken cancellationToken = default) - => PingCoreAsync(true, node, cancellationToken).AsTask(); - - public async ValueTask PingCoreAsync(bool isAsync, Node node, CancellationToken cancellationToken = default) - { - if (!_productRegistration.SupportsPing) return; - if (PingDisabled(node)) return; - - var pingEndpoint = _productRegistration.CreatePingEndpoint(node, PingAndSniffRequestConfiguration); - - using var audit = Audit(PingSuccess, node); - - if (audit is not null) - audit.PathAndQuery = pingEndpoint.PathAndQuery; - - TransportResponse response; - - try - { - if (isAsync) - response = await _productRegistration.PingAsync(_requestInvoker, pingEndpoint, _requestData, cancellationToken).ConfigureAwait(false); - else - response = _productRegistration.Ping(_requestInvoker, pingEndpoint, _requestData); - - ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails); - - //ping should not silently accept bad but valid http responses - if (!response.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType) - { - var pipelineFailure = response.ApiCallDetails.HttpStatusCode != null ? PipelineFailure.BadResponse : PipelineFailure.BadRequest; - throw new PipelineException(pipelineFailure, response.ApiCallDetails.OriginalException) { Response = response }; - } - } - catch (Exception e) - { - response = (e as PipelineException)?.Response; - if (audit is not null) - { - audit.Event = PingFailure; - audit.Exception = e; - } - throw new PipelineException(PipelineFailure.PingFailure, e) { Response = response }; - } - } - - public override void Sniff() => SniffCoreAsync(false).EnsureCompleted(); - - public override Task SniffAsync(CancellationToken cancellationToken = default) - => SniffCoreAsync(true, cancellationToken).AsTask(); - - public async ValueTask SniffCoreAsync(bool isAsync, CancellationToken cancellationToken = default) - { - if (!_productRegistration.SupportsSniff) return; - - var exceptions = new List(); - - foreach (var node in SniffNodes) - { - var sniffEndpoint = _productRegistration.CreateSniffEndpoint(node, PingAndSniffRequestConfiguration, _settings); - //TODO remove - var requestData = new RequestData(_settings, null, null); - - using var audit = Audit(SniffSuccess, node); - - if (audit is not null) - audit.PathAndQuery = sniffEndpoint.PathAndQuery; - - Tuple> result; - - try - { - if (isAsync) - result = await _productRegistration - .SniffAsync(_requestInvoker, _nodePool.UsingSsl, sniffEndpoint, requestData, cancellationToken) - .ConfigureAwait(false); - else - result = _productRegistration - .Sniff(_requestInvoker, _nodePool.UsingSsl, sniffEndpoint, requestData); - - ThrowBadAuthPipelineExceptionWhenNeeded(result.Item1.ApiCallDetails); - - //sniff should not silently accept bad but valid http responses - if (!result.Item1.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType) - { - var pipelineFailure = result.Item1.ApiCallDetails.HttpStatusCode != null ? PipelineFailure.BadResponse : PipelineFailure.BadRequest; - throw new PipelineException(pipelineFailure, result.Item1.ApiCallDetails.OriginalException) { Response = result.Item1 }; - } - - _nodePool.Reseed(result.Item2); - Refresh = true; - - return; - } - catch (Exception e) - { - if (audit is not null) - { - audit.Event = SniffFailure; - audit.Exception = e; - } - exceptions.Add(e); - } - - throw new PipelineException(PipelineFailure.SniffFailure, exceptions.AsAggregateOrFirst()); - } - } - - public override void SniffOnConnectionFailure() - { - if (!SniffsOnConnectionFailure) return; - - using (Audit(SniffOnFail)) - Sniff(); - } - - public override async Task SniffOnConnectionFailureAsync(CancellationToken cancellationToken) - { - if (!SniffsOnConnectionFailure) return; - - using (Audit(SniffOnFail)) - await SniffAsync(cancellationToken).ConfigureAwait(false); - } - - public override void SniffOnStaleCluster() - { - if (!StaleClusterState) return; - - using (Audit(AuditEvent.SniffOnStaleCluster)) - { - Sniff(); - _nodePool.MarkAsSniffed(); - } - } - - public override async Task SniffOnStaleClusterAsync(CancellationToken cancellationToken) - { - if (!StaleClusterState) return; - - using (Audit(AuditEvent.SniffOnStaleCluster)) - { - await SniffAsync(cancellationToken).ConfigureAwait(false); - _nodePool.MarkAsSniffed(); - } - } - - public override void ThrowNoNodesAttempted(Endpoint endpoint, List? seenExceptions) - { - var clientException = new TransportException(PipelineFailure.NoNodesAttempted, RequestPipelineStatics.NoNodesAttemptedMessage, (Exception)null); - using (Audit(NoNodesAttempted)) - throw new UnexpectedTransportException(clientException, seenExceptions) { Endpoint = endpoint, AuditTrail = AuditTrail }; - } - - private bool PingDisabled(Node node) => _requestData.DisablePings || !node.IsResurrected; - - private Auditable? Audit(AuditEvent type, Node node = null) => - !_settings.DisableAuditTrail ?? true ? new(type, ref _auditTrail, _dateTimeProvider, node) : null; - - private static void ThrowBadAuthPipelineExceptionWhenNeeded(ApiCallDetails details, TransportResponse response = null) - { - if (details?.HttpStatusCode == 401) - throw new PipelineException(PipelineFailure.BadAuthentication, details.OriginalException) { Response = response }; - } - - private void LazyAuditable(AuditEvent e, Node n) - { - using (new Auditable(e, ref _auditTrail, _dateTimeProvider, n)) { } - } -} -#pragma warning restore 1591 diff --git a/src/Elastic.Transport/Components/Pipeline/PipelineException.cs b/src/Elastic.Transport/Components/Pipeline/PipelineException.cs index 37f92e2..ca68411 100644 --- a/src/Elastic.Transport/Components/Pipeline/PipelineException.cs +++ b/src/Elastic.Transport/Components/Pipeline/PipelineException.cs @@ -7,7 +7,7 @@ namespace Elastic.Transport; /// -/// A pipeline exception is throw when ever a known failing exit point is reached in +/// A pipeline exception is throw when ever a known failing exit point is reached in /// See for known exits points /// public class PipelineException : Exception diff --git a/src/Elastic.Transport/Components/Pipeline/PipelineFailure.cs b/src/Elastic.Transport/Components/Pipeline/PipelineFailure.cs index 4312188..7252d6e 100644 --- a/src/Elastic.Transport/Components/Pipeline/PipelineFailure.cs +++ b/src/Elastic.Transport/Components/Pipeline/PipelineFailure.cs @@ -7,7 +7,7 @@ namespace Elastic.Transport; /// -/// A failure in 's workflow that caused it to end prematurely. +/// A failure in 's workflow that caused it to end prematurely. /// public enum PipelineFailure { @@ -43,7 +43,7 @@ public enum PipelineFailure MaxRetriesReached, /// - /// An exception occurred during that could not be handled + /// An exception occurred during that could not be handled /// Unexpected, diff --git a/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs b/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs index 99343b3..ac379e9 100644 --- a/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs +++ b/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs @@ -4,129 +4,568 @@ using System; using System.Collections.Generic; +using System.IO; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Elastic.Transport.Diagnostics.Auditing; +using Elastic.Transport.Extensions; +using Elastic.Transport.Products; +using static Elastic.Transport.Diagnostics.Auditing.AuditEvent; namespace Elastic.Transport; -/// /// Models the workflow of a request to multiple nodes -/// -public abstract class RequestPipeline : IDisposable +public class RequestPipeline { - private bool _disposed; + private readonly IRequestInvoker _requestInvoker; + private readonly NodePool _nodePool; + private readonly RequestData _requestData; + private readonly DateTimeProvider _dateTimeProvider; + private readonly MemoryStreamFactory _memoryStreamFactory; + private readonly Func _nodePredicate; + private readonly ProductRegistration _productRegistration; + private readonly ResponseBuilder _responseBuilder; + + private RequestConfiguration? _pingAndSniffRequestConfiguration; + //private List? _auditTrail; + private readonly ITransportConfiguration _settings; + + /// + internal RequestPipeline(RequestData requestData) + { + _requestData = requestData; + _settings = requestData.ConnectionSettings; + + _nodePool = requestData.ConnectionSettings.NodePool; + _requestInvoker = requestData.ConnectionSettings.Connection; + _dateTimeProvider = requestData.ConnectionSettings.DateTimeProvider; + _memoryStreamFactory = requestData.MemoryStreamFactory; + _productRegistration = requestData.ConnectionSettings.ProductRegistration; + _responseBuilder = _productRegistration.ResponseBuilder; + _nodePredicate = requestData.ConnectionSettings.NodePredicate ?? _productRegistration.NodePredicate; + } - internal RequestPipeline() { } + /// A list of events + //public IEnumerable? AuditTrail => _auditTrail; - /// - /// An audit trail that can be used for logging and debugging purposes. Giving insights into how - /// the request made its way through the workflow - /// - public abstract IEnumerable AuditTrail { get; } + private RequestConfiguration PingAndSniffRequestConfiguration + { + // Lazily loaded when first required, since not all node pools and configurations support pinging and sniffing. + // This avoids allocating 192B per request for those which do not need to ping or sniff. + get + { + if (_pingAndSniffRequestConfiguration is not null) return _pingAndSniffRequestConfiguration; + + _pingAndSniffRequestConfiguration = new RequestConfiguration + { + PingTimeout = PingTimeout, + RequestTimeout = PingTimeout, + Authentication = _requestData.AuthenticationHeader, + EnableHttpPipelining = _requestData.HttpPipeliningEnabled, + ForceNode = _requestData.ForceNode + }; + + return _pingAndSniffRequestConfiguration; + } + } - /// - /// Should the workflow attempt the initial sniff as requested by - /// - /// - public abstract bool FirstPoolUsageNeedsSniffing { get; } + private bool DepletedRetries(DateTimeOffset startedOn) => Retried >= MaxRetries + 1 || IsTakingTooLong(startedOn); - //TODO xmldocs -#pragma warning disable 1591 - public abstract bool IsTakingTooLong { get; } + private bool FirstPoolUsageNeedsSniffing => + !RequestDisabledSniff + && _nodePool.SupportsReseeding && _settings.SniffsOnStartup && !_nodePool.SniffedOnStartup; - public abstract int MaxRetries { get; } + private bool IsTakingTooLong(DateTimeOffset startedOn) + { + var timeout = _settings.MaxRetryTimeout.GetValueOrDefault(RequestTimeout); + var now = _dateTimeProvider.Now(); + + //we apply a soft margin so that if a request times out at 59 seconds when the maximum is 60 we also abort. + var margin = timeout.TotalMilliseconds / 100.0 * 98; + var marginTimeSpan = TimeSpan.FromMilliseconds(margin); + var timespanCall = now - startedOn; + var tookToLong = timespanCall >= marginTimeSpan; + return tookToLong; + } - public abstract bool SniffsOnConnectionFailure { get; } + private int MaxRetries => _requestData.MaxRetries; - public abstract bool SniffsOnStaleCluster { get; } + private bool Refresh { get; set; } - public abstract bool StaleClusterState { get; } + private int Retried { get; set; } - public abstract DateTimeOffset StartedOn { get; } + private IEnumerable SniffNodes(Auditor? auditor) => _nodePool + .CreateView(auditor) + .ToList() + .OrderBy(n => _productRegistration.SniffOrder(n)); - public abstract TResponse CallProductEndpoint(Endpoint endpoint, RequestData requestData, PostData? postData) - where TResponse : TransportResponse, new(); + private bool SniffsOnConnectionFailure => + !RequestDisabledSniff + && _nodePool.SupportsReseeding && _settings.SniffsOnConnectionFault; - public abstract Task CallProductEndpointAsync(Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken) - where TResponse : TransportResponse, new(); + private bool SniffsOnStaleCluster => + !RequestDisabledSniff + && _nodePool.SupportsReseeding && _settings.SniffInformationLifeSpan.HasValue; - public abstract void MarkAlive(Node node); + private bool StaleClusterState + { + get + { + if (!SniffsOnStaleCluster) return false; - public abstract void MarkDead(Node node); + // ReSharper disable once PossibleInvalidOperationException + // already checked by SniffsOnStaleCluster + var sniffLifeSpan = _settings.SniffInformationLifeSpan.Value; - /// - /// Attempt to get a single node when the underlying connection pool contains only one node. - /// - /// This provides an optimised path for single node pools by avoiding an Enumerator on each call. - /// - /// - /// - /// true when a single node exists which has been set on the . - public abstract bool TryGetSingleNode(out Node node); + var now = _dateTimeProvider.Now(); + var lastSniff = _nodePool.LastUpdate; - public abstract IEnumerable NextNode(); + return sniffLifeSpan < now - lastSniff; + } + } - public abstract void Ping(Node node); + private TimeSpan PingTimeout => _requestData.PingTimeout; - public abstract Task PingAsync(Node node, CancellationToken cancellationToken); + private bool RequestDisabledSniff => _requestData.DisableSniff; - public abstract void FirstPoolUsage(SemaphoreSlim semaphore); + private TimeSpan RequestTimeout => _requestData.RequestTimeout; - public abstract Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken cancellationToken); + /// Emit event + public void AuditCancellationRequested(Auditor? auditor) => auditor?.Emit(CancellationRequested); - public abstract void Sniff(); + /// Ensures a response is returned with + public void BadResponse( + ref TResponse? response, + ApiCallDetails? callDetails, + Endpoint endpoint, + RequestData data, + PostData? postData, + TransportException exception, + IReadOnlyCollection? auditTrail + ) + where TResponse : TransportResponse, new() + { + if (response == null) + { + //make sure we copy over the error body in case we disabled direct streaming. + var s = callDetails?.ResponseBodyInBytes == null ? Stream.Null : _memoryStreamFactory.Create(callDetails.ResponseBodyInBytes); + var m = callDetails?.ResponseMimeType ?? RequestData.DefaultMimeType; + response = _responseBuilder.ToResponse(endpoint, data, postData, exception, callDetails?.HttpStatusCode, null, s, m, callDetails?.ResponseBodyInBytes?.Length ?? -1, null, null); + } - public abstract Task SniffAsync(CancellationToken cancellationToken); + response.ApiCallDetails.AuditTrail = auditTrail; + } - public abstract void SniffOnStaleCluster(); + /// Call the product's API endpoint ensuring rich enough exceptions are thrown + public TResponse CallProductEndpoint(Endpoint endpoint, RequestData requestData, PostData? postData, Auditor? auditor) + where TResponse : TransportResponse, new() + => CallProductEndpointCoreAsync(false, endpoint, requestData, postData, auditor).EnsureCompleted(); - public abstract Task SniffOnStaleClusterAsync(CancellationToken cancellationToken); + /// Call the product's API endpoint ensuring rich enough exceptions are thrown + public Task CallProductEndpointAsync(Endpoint endpoint, RequestData requestData, PostData? postData, Auditor? auditor, CancellationToken cancellationToken = default) + where TResponse : TransportResponse, new() + => CallProductEndpointCoreAsync(true, endpoint, requestData, postData, auditor, cancellationToken).AsTask(); - public abstract void SniffOnConnectionFailure(); + private async ValueTask CallProductEndpointCoreAsync( + bool isAsync, Endpoint endpoint, RequestData requestData, PostData? postData, Auditor? auditor, CancellationToken cancellationToken = default) + where TResponse : TransportResponse, new() + { + using var audit = auditor?.Add(HealthyResponse, _dateTimeProvider, endpoint.Node); - public abstract Task SniffOnConnectionFailureAsync(CancellationToken cancellationToken); + if (audit is not null) + audit.PathAndQuery = endpoint.PathAndQuery; - public abstract void BadResponse(ref TResponse response, ApiCallDetails callDetails, Endpoint endpoint, RequestData data, PostData? postData, TransportException exception) - where TResponse : TransportResponse, new(); + try + { + TResponse response; - public abstract void ThrowNoNodesAttempted(Endpoint endpoint, List? seenExceptions); + if (isAsync) + response = await _requestInvoker.RequestAsync(endpoint, requestData, postData, cancellationToken).ConfigureAwait(false); + else + response = _requestInvoker.Request(endpoint, requestData, postData); - public abstract void AuditCancellationRequested(); + response.ApiCallDetails.AuditTrail = auditor; - public abstract TransportException? CreateClientException(TResponse? response, ApiCallDetails? callDetails, - Endpoint endpoint, RequestData data, List? seenExceptions) - where TResponse : TransportResponse, new(); -#pragma warning restore 1591 + ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails, response); - /// - /// - /// - public void Dispose() + if (!response.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType && audit is not null) + { + var @event = response.ApiCallDetails.HttpStatusCode != null ? AuditEvent.BadResponse : BadRequest; + audit.Event = @event; + } + + return response; + } + catch (Exception e) when (audit is not null) + { + var @event = e is TransportException t && t.ApiCallDetails.HttpStatusCode != null ? AuditEvent.BadResponse : BadRequest; + audit.Event = @event; + audit.Exception = e; + throw; + } + } + + /// Create a rich enough + public TransportException? CreateClientException( + TResponse response, + ApiCallDetails? callDetails, + Endpoint endpoint, + Auditor? auditor, + DateTimeOffset startedOn, + List? seenExceptions + ) + where TResponse : TransportResponse, new() { - Dispose(disposing: true); - GC.SuppressFinalize(this); + if (callDetails?.HasSuccessfulStatusCodeAndExpectedContentType ?? false) return null; + + var pipelineFailure = callDetails?.HttpStatusCode != null ? PipelineFailure.BadResponse : PipelineFailure.BadRequest; + var innerException = callDetails?.OriginalException; + if (seenExceptions is not null && seenExceptions.HasAny(out var exs)) + { + pipelineFailure = exs.Last().FailureReason; + innerException = exs.AsAggregateOrFirst(); + } + + var statusCode = callDetails?.HttpStatusCode != null ? callDetails.HttpStatusCode.Value.ToString() : "unknown"; + var resource = callDetails == null + ? "unknown resource" + : $"Status code {statusCode} from: {callDetails.HttpMethod} {callDetails.Uri.PathAndQuery}"; + + var exceptionMessage = innerException?.Message ?? "Request failed to execute"; + + if (IsTakingTooLong(startedOn)) + { + pipelineFailure = PipelineFailure.MaxTimeoutReached; + auditor?.Emit(MaxTimeoutReached); + exceptionMessage = "Maximum timeout reached while retrying request"; + } + else if (Retried >= MaxRetries && MaxRetries > 0) + { + pipelineFailure = PipelineFailure.MaxRetriesReached; + auditor?.Emit(MaxRetriesReached); + exceptionMessage = "Maximum number of retries reached"; + + var now = _dateTimeProvider.Now(); + var activeNodes = _nodePool.Nodes.Count(n => n.IsAlive || n.DeadUntil <= now); + if (Retried >= activeNodes) + { + auditor?.Emit(FailedOverAllNodes); + exceptionMessage += ", failed over to all the known alive nodes before failing"; + } + } + + exceptionMessage += !exceptionMessage.EndsWith(".", StringComparison.Ordinal) ? $". Call: {resource}" : $" Call: {resource}"; + if (response != null && _productRegistration.TryGetServerErrorReason(response, out var reason)) + exceptionMessage += $". ServerError: {reason}"; + + var clientException = new TransportException(pipelineFailure, exceptionMessage, innerException) + { + Endpoint = endpoint, + ApiCallDetails = callDetails, + AuditTrail = auditor + }; + + return clientException; + } + + /// Routine for the first call into the product, potentially sniffing to discover the network topology + public void FirstPoolUsage(SemaphoreSlim semaphore, Auditor? auditor) + { + if (!FirstPoolUsageNeedsSniffing) return; + + if (!semaphore.Wait(RequestTimeout)) + { + if (FirstPoolUsageNeedsSniffing) + throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, null); + + return; + } + + if (!FirstPoolUsageNeedsSniffing) + { + semaphore.Release(); + return; + } + + try + { + using (auditor?.Add(SniffOnStartup, _dateTimeProvider)) + { + Sniff(auditor); + _nodePool.MarkAsSniffed(); + } + } + finally + { + semaphore.Release(); + } } - /// - /// - /// - /// - protected virtual void Dispose(bool disposing) + /// + public async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, Auditor? auditor, CancellationToken cancellationToken) { - if (_disposed) + if (!FirstPoolUsageNeedsSniffing) return; + + // TODO cancellationToken could throw here and will bubble out as OperationCancelledException + // everywhere else it would bubble out wrapped in a `UnexpectedTransportException` + var success = await semaphore.WaitAsync(RequestTimeout, cancellationToken).ConfigureAwait(false); + if (!success) + { + if (FirstPoolUsageNeedsSniffing) + throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, null); + return; + } - if (disposing) + if (!FirstPoolUsageNeedsSniffing) { - DisposeManagedResources(); + semaphore.Release(); + return; } + try + { + using (auditor?.Add(SniffOnStartup, _dateTimeProvider)) + { + await SniffAsync(auditor, cancellationToken).ConfigureAwait(false); + _nodePool.MarkAsSniffed(); + } + } + finally + { + semaphore.Release(); + } + } - _disposed = true; + /// Mark as alive putting it back in rotation. + public void MarkAlive(Node node) => node.MarkAlive(); + + /// Mark as dead, taking it out of rotation. + public void MarkDead(Node node) + { + var deadUntil = _dateTimeProvider.DeadTime(node.FailedAttempts, _settings.DeadTimeout, _settings.MaxDeadTimeout); + node.MarkDead(deadUntil); + Retried++; } - /// - /// - /// - protected virtual void DisposeManagedResources() { } + /// Fast path for if only a single node could ever be yielded this save an IEnumerator allocation + public bool TryGetSingleNode(out Node? node) + { + if (_nodePool.Nodes.Count <= 1 && _nodePool.MaxRetries <= _nodePool.Nodes.Count && + !_nodePool.SupportsPinging && !_nodePool.SupportsReseeding) + { + node = _nodePool.Nodes.FirstOrDefault(); + + if (node is not null && _nodePredicate(node)) return true; + } + + node = null; + return false; + } + + /// returns a consistent enumerable view into the available nodes + public IEnumerable NextNode(DateTimeOffset startedOn, Auditor? auditor) + { + if (_requestData.ForceNode != null) + { + yield return new Node(_requestData.ForceNode); + + yield break; + } + + //This for loop allows to break out of the view state machine if we need to + //force a refresh (after reseeding node pool). We have a hardcoded limit of only + //allowing 100 of these refreshes per call + var refreshed = false; + for (var i = 0; i < 100; i++) + { + if (DepletedRetries(startedOn)) yield break; + + foreach (var node in _nodePool.CreateView(auditor)) + { + if (DepletedRetries(startedOn)) break; + + if (!_nodePredicate(node)) continue; + + yield return node; + + if (!Refresh) continue; + + Refresh = false; + refreshed = true; + break; + } + //unless a refresh was requested we will not iterate over more then a single view. + //keep in mind refreshes are also still bound to overall maxretry count/timeout. + if (!refreshed) break; + } + } + + /// ping as a fast path ensuring its alive + public void Ping(Node node, Auditor? auditor) => PingCoreAsync(false, node, auditor).EnsureCompleted(); + + /// ping as a fast path ensuring its alive + public Task PingAsync(Node node, Auditor? auditor, CancellationToken cancellationToken = default) + => PingCoreAsync(true, node, auditor, cancellationToken).AsTask(); + + private async ValueTask PingCoreAsync(bool isAsync, Node node, Auditor? auditor, CancellationToken cancellationToken = default) + { + if (!_productRegistration.SupportsPing) return; + if (PingDisabled(node)) return; + + var pingEndpoint = _productRegistration.CreatePingEndpoint(node, PingAndSniffRequestConfiguration); + + using var audit = auditor?.Add(PingSuccess, _dateTimeProvider, node); + + if (audit is not null) + audit.PathAndQuery = pingEndpoint.PathAndQuery; + + TransportResponse response; + + try + { + if (isAsync) + response = await _productRegistration.PingAsync(_requestInvoker, pingEndpoint, _requestData, cancellationToken).ConfigureAwait(false); + else + response = _productRegistration.Ping(_requestInvoker, pingEndpoint, _requestData); + + ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails); + + //ping should not silently accept bad but valid http responses + if (!response.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType) + { + var pipelineFailure = response.ApiCallDetails.HttpStatusCode != null ? PipelineFailure.BadResponse : PipelineFailure.BadRequest; + throw new PipelineException(pipelineFailure, response.ApiCallDetails.OriginalException) { Response = response }; + } + } + catch (Exception e) + { + response = (e as PipelineException)?.Response; + if (audit is not null) + { + audit.Event = PingFailure; + audit.Exception = e; + } + throw new PipelineException(PipelineFailure.PingFailure, e) { Response = response }; + } + } + + /// Discover the products network topology to yield all available nodes + public void Sniff(Auditor? auditor) => SniffCoreAsync(false, auditor).EnsureCompleted(); + + /// Discover the products network topology to yield all available nodes + public Task SniffAsync(Auditor? auditor, CancellationToken cancellationToken = default) + => SniffCoreAsync(true, auditor, cancellationToken).AsTask(); + + private async ValueTask SniffCoreAsync(bool isAsync, Auditor? auditor, CancellationToken cancellationToken = default) + { + if (!_productRegistration.SupportsSniff) return; + + var exceptions = new List(); + + foreach (var node in SniffNodes(auditor)) + { + var sniffEndpoint = _productRegistration.CreateSniffEndpoint(node, PingAndSniffRequestConfiguration, _settings); + using var audit = auditor?.Add(SniffSuccess, _dateTimeProvider, node); + + if (audit is not null) + audit.PathAndQuery = sniffEndpoint.PathAndQuery; + + Tuple> result; + + try + { + if (isAsync) + result = await _productRegistration + .SniffAsync(_requestInvoker, _nodePool.UsingSsl, sniffEndpoint, _requestData, cancellationToken) + .ConfigureAwait(false); + else + result = _productRegistration + .Sniff(_requestInvoker, _nodePool.UsingSsl, sniffEndpoint, _requestData); + + ThrowBadAuthPipelineExceptionWhenNeeded(result.Item1.ApiCallDetails); + + //sniff should not silently accept bad but valid http responses + if (!result.Item1.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType) + { + var pipelineFailure = result.Item1.ApiCallDetails.HttpStatusCode != null ? PipelineFailure.BadResponse : PipelineFailure.BadRequest; + throw new PipelineException(pipelineFailure, result.Item1.ApiCallDetails.OriginalException) { Response = result.Item1 }; + } + + _nodePool.Reseed(result.Item2); + Refresh = true; + + return; + } + catch (Exception e) + { + if (audit is not null) + { + audit.Event = SniffFailure; + audit.Exception = e; + } + exceptions.Add(e); + } + + throw new PipelineException(PipelineFailure.SniffFailure, exceptions.AsAggregateOrFirst()); + } + } + + /// sniff the topology when a connection failure happens + public void SniffOnConnectionFailure(Auditor? auditor) + { + if (!SniffsOnConnectionFailure) return; + + using (auditor?.Add(SniffOnFail, _dateTimeProvider)) + Sniff(auditor); + } + + /// sniff the topology when a connection failure happens + public async Task SniffOnConnectionFailureAsync(Auditor? auditor, CancellationToken cancellationToken) + { + if (!SniffsOnConnectionFailure) return; + + using (auditor?.Add(SniffOnFail, _dateTimeProvider)) + await SniffAsync(auditor, cancellationToken).ConfigureAwait(false); + } + + /// sniff the topology after a set period to ensure it's up to date + public void SniffOnStaleCluster(Auditor? auditor) + { + if (!StaleClusterState) return; + + using (auditor?.Add(AuditEvent.SniffOnStaleCluster, _dateTimeProvider)) + { + Sniff(auditor); + _nodePool.MarkAsSniffed(); + } + } + + /// sniff the topology after a set period to ensure its up to date + public async Task SniffOnStaleClusterAsync(Auditor? auditor, CancellationToken cancellationToken) + { + if (!StaleClusterState) return; + + using (auditor?.Add(AuditEvent.SniffOnStaleCluster, _dateTimeProvider)) + { + await SniffAsync(auditor, cancellationToken).ConfigureAwait(false); + _nodePool.MarkAsSniffed(); + } + } + + /// emit event in case no nodes were available + public void ThrowNoNodesAttempted(Endpoint endpoint, Auditor? auditor, List? seenExceptions) + { + var clientException = new TransportException(PipelineFailure.NoNodesAttempted, RequestPipelineStatics.NoNodesAttemptedMessage); + using (auditor?.Add(NoNodesAttempted, _dateTimeProvider)) + throw new UnexpectedTransportException(clientException, seenExceptions) { Endpoint = endpoint, AuditTrail = auditor }; + } + + private bool PingDisabled(Node node) => _requestData.DisablePings || !node.IsResurrected; + + private static void ThrowBadAuthPipelineExceptionWhenNeeded(ApiCallDetails details, TransportResponse? response = null) + { + if (details.HttpStatusCode == 401) + throw new PipelineException(PipelineFailure.BadAuthentication, details.OriginalException) { Response = response }; + } } diff --git a/src/Elastic.Transport/Components/Providers/DefaultRequestPipelineFactory.cs b/src/Elastic.Transport/Components/Providers/DefaultRequestPipelineFactory.cs index 0d137b3..399381d 100644 --- a/src/Elastic.Transport/Components/Providers/DefaultRequestPipelineFactory.cs +++ b/src/Elastic.Transport/Components/Providers/DefaultRequestPipelineFactory.cs @@ -5,13 +5,14 @@ namespace Elastic.Transport; /// -/// The default implementation for that returns +/// The default implementation for that returns /// internal sealed class DefaultRequestPipelineFactory : RequestPipelineFactory { + public static readonly DefaultRequestPipelineFactory Default = new (); /// - /// returns instances of + /// returns instances of /// - public override RequestPipeline Create(RequestData requestData, DateTimeProvider dateTimeProvider) => - new DefaultRequestPipeline(requestData, dateTimeProvider); + public override RequestPipeline Create(RequestData requestData) => + new RequestPipeline(requestData); } diff --git a/src/Elastic.Transport/Components/Providers/RequestPipelineFactory.cs b/src/Elastic.Transport/Components/Providers/RequestPipelineFactory.cs index ff4ef38..ee0d6ff 100644 --- a/src/Elastic.Transport/Components/Providers/RequestPipelineFactory.cs +++ b/src/Elastic.Transport/Components/Providers/RequestPipelineFactory.cs @@ -10,5 +10,5 @@ public abstract class RequestPipelineFactory internal RequestPipelineFactory() { } /// Create an instance of - public abstract RequestPipeline Create(RequestData requestData, DateTimeProvider dateTimeProvider); + public abstract RequestPipeline Create(RequestData requestData); } diff --git a/src/Elastic.Transport/Configuration/HeadersList.cs b/src/Elastic.Transport/Configuration/HeadersList.cs index a08a889..a01905d 100644 --- a/src/Elastic.Transport/Configuration/HeadersList.cs +++ b/src/Elastic.Transport/Configuration/HeadersList.cs @@ -6,6 +6,7 @@ using System.Collections; using System.Collections.Generic; using System.Linq; +using Elastic.Transport.Extensions; namespace Elastic.Transport; @@ -89,18 +90,4 @@ private void AddToHeaders(HeadersList? headers) IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); - internal struct EmptyEnumerator : IEnumerator - { - public T Current => default; - object IEnumerator.Current => Current; - public bool MoveNext() => false; - - public void Reset() - { - } - - public void Dispose() - { - } - } } diff --git a/src/Elastic.Transport/Configuration/ITransportConfiguration.cs b/src/Elastic.Transport/Configuration/ITransportConfiguration.cs index 0cd334b..bf875c0 100644 --- a/src/Elastic.Transport/Configuration/ITransportConfiguration.cs +++ b/src/Elastic.Transport/Configuration/ITransportConfiguration.cs @@ -46,6 +46,13 @@ public interface ITransportConfiguration : IRequestConfiguration, IDisposable /// ProductRegistration ProductRegistration { get; } + /// Allows you to wrap calls to , mainly for testing purposes to not have to rely + /// on the wall clock + DateTimeProvider DateTimeProvider { get; } + + /// In charge of create a new + RequestPipelineFactory PipelineProvider { get; } + /// /// The time to put dead nodes out of rotation (this will be multiplied by the number of times they've been dead) /// diff --git a/src/Elastic.Transport/Configuration/TransportConfiguration.cs b/src/Elastic.Transport/Configuration/TransportConfiguration.cs index db9c150..72ef3be 100644 --- a/src/Elastic.Transport/Configuration/TransportConfiguration.cs +++ b/src/Elastic.Transport/Configuration/TransportConfiguration.cs @@ -88,9 +88,14 @@ public TransportConfiguration( NodePool = nodePool; ProductRegistration = productRegistration ?? DefaultProductRegistration.Default; Connection = invoker ?? new HttpRequestInvoker(); - Accept = productRegistration?.DefaultMimeType; RequestResponseSerializer = serializer ?? new LowLevelRequestResponseSerializer(); + DateTimeProvider = nodePool.DateTimeProvider; + MetaHeaderProvider = productRegistration?.MetaHeaderProvider; + UrlFormatter = new UrlFormatter(this); + + PipelineProvider = DefaultRequestPipelineFactory.Default; + Accept = productRegistration?.DefaultMimeType; ConnectionLimit = DefaultConnectionLimit; DnsRefreshTimeout = DefaultDnsRefreshTimeout; MemoryStreamFactory = DefaultMemoryStreamFactory; @@ -98,8 +103,6 @@ public TransportConfiguration( SniffsOnStartup = true; SniffInformationLifeSpan = TimeSpan.FromHours(1); - MetaHeaderProvider = productRegistration?.MetaHeaderProvider; - UrlFormatter = new UrlFormatter(this); StatusCodeToResponseSuccess = ProductRegistration.HttpStatusCodeClassifier; UserAgent = UserAgent.Create(ProductRegistration.Name, ProductRegistration.GetType()); @@ -118,6 +121,10 @@ public TransportConfiguration(ITransportConfiguration config) if (config is null) throw new ArgumentNullException(nameof(config)); + // it's important url formatter is repointed to the new instance of ITransportConfiguration + UrlFormatter = new UrlFormatter(this); + + Accept = config.Accept; AllowedStatusCodes = config.AllowedStatusCodes; Authentication = config.Authentication; @@ -127,6 +134,7 @@ public TransportConfiguration(ITransportConfiguration config) Connection = config.Connection; ConnectionLimit = config.ConnectionLimit; ContentType = config.ContentType; + DateTimeProvider = config.DateTimeProvider; DeadTimeout = config.DeadTimeout; DisableAuditTrail = config.DisableAuditTrail; DisableAutomaticProxyDetection = config.DisableAutomaticProxyDetection; @@ -154,6 +162,7 @@ public TransportConfiguration(ITransportConfiguration config) OpaqueId = config.OpaqueId; ParseAllHeaders = config.ParseAllHeaders; PingTimeout = config.PingTimeout; + PipelineProvider = config.PipelineProvider; PrettyJson = config.PrettyJson; ProductRegistration = config.ProductRegistration; ProxyAddress = config.ProxyAddress; @@ -173,7 +182,6 @@ public TransportConfiguration(ITransportConfiguration config) StatusCodeToResponseSuccess = config.StatusCodeToResponseSuccess; ThrowExceptions = config.ThrowExceptions; TransferEncodingChunked = config.TransferEncodingChunked; - UrlFormatter = config.UrlFormatter; UserAgent = config.UserAgent; } @@ -203,7 +211,12 @@ public virtual bool DebugMode public IRequestInvoker Connection { get; } /// public Serializer RequestResponseSerializer { get; } + /// + public DateTimeProvider DateTimeProvider { get; } + + /// + public RequestPipelineFactory PipelineProvider { get; init; } /// // ReSharper disable UnusedAutoPropertyAccessor.Global diff --git a/src/Elastic.Transport/Configuration/TransportConfigurationDescriptor.cs b/src/Elastic.Transport/Configuration/TransportConfigurationDescriptor.cs index 73b2fca..161a706 100644 --- a/src/Elastic.Transport/Configuration/TransportConfigurationDescriptor.cs +++ b/src/Elastic.Transport/Configuration/TransportConfigurationDescriptor.cs @@ -75,11 +75,14 @@ protected TransportConfigurationDescriptorBase(NodePool nodePool, IRequestInvoke _nodePool = nodePool; _productRegistration = productRegistration ?? DefaultProductRegistration.Default; _connection = requestInvoker ?? new HttpRequestInvoker(); - _accept = productRegistration?.DefaultMimeType; - _bootstrapLock = new(1, 1); - _requestResponseSerializer = requestResponseSerializer ?? new LowLevelRequestResponseSerializer(); + _pipelineProvider = DefaultRequestPipelineFactory.Default; + _dateTimeProvider = nodePool.DateTimeProvider; + _bootstrapLock = new(1, 1); + _metaHeaderProvider = productRegistration?.MetaHeaderProvider; + _urlFormatter = new UrlFormatter(this); + _accept = productRegistration?.DefaultMimeType; _connectionLimit = TransportConfiguration.DefaultConnectionLimit; _dnsRefreshTimeout = TransportConfiguration.DefaultDnsRefreshTimeout; _memoryStreamFactory = TransportConfiguration.DefaultMemoryStreamFactory; @@ -87,9 +90,6 @@ protected TransportConfigurationDescriptorBase(NodePool nodePool, IRequestInvoke _sniffsOnStartup = true; _sniffInformationLifeSpan = TimeSpan.FromHours(1); - _metaHeaderProvider = productRegistration?.MetaHeaderProvider; - - _urlFormatter = new UrlFormatter(this); _statusCodeToResponseSuccess = _productRegistration.HttpStatusCodeClassifier; _userAgent = Transport.UserAgent.Create(_productRegistration.Name, _productRegistration.GetType()); @@ -165,12 +165,18 @@ protected TransportConfigurationDescriptorBase(NodePool nodePool, IRequestInvoke private MetaHeaderProvider? _metaHeaderProvider; private HeadersList? _responseHeadersToParse; private bool? _parseAllHeaders; + private DateTimeProvider _dateTimeProvider; + private RequestPipelineFactory _pipelineProvider; SemaphoreSlim ITransportConfiguration.BootstrapLock => _bootstrapLock; IRequestInvoker ITransportConfiguration.Connection => _connection; int ITransportConfiguration.ConnectionLimit => _connectionLimit; NodePool ITransportConfiguration.NodePool => _nodePool; ProductRegistration ITransportConfiguration.ProductRegistration => _productRegistration; + + DateTimeProvider? ITransportConfiguration.DateTimeProvider => _dateTimeProvider; + RequestPipelineFactory? ITransportConfiguration.PipelineProvider => _pipelineProvider; + TimeSpan? ITransportConfiguration.DeadTimeout => _deadTimeout; bool ITransportConfiguration.DisableAutomaticProxyDetection => _disableAutomaticProxyDetection; TimeSpan? ITransportConfiguration.KeepAliveInterval => _keepAliveInterval; @@ -429,6 +435,9 @@ public T SkipDeserializationForStatusCodes(params int[] statusCodes) => /// public T MemoryStreamFactory(MemoryStreamFactory memoryStreamFactory) => Assign(memoryStreamFactory, static (a, v) => a._memoryStreamFactory = v); + /// > + public T PipelineProvider(RequestPipelineFactory provider) => Assign(provider, static (a, v) => a._pipelineProvider = v); + /// > public T EnableTcpStats(bool enableTcpStats = true) => Assign(enableTcpStats, static (a, v) => a._enableTcpStats = v); diff --git a/src/Elastic.Transport/Diagnostics/AuditDiagnosticObserver.cs b/src/Elastic.Transport/Diagnostics/AuditDiagnosticObserver.cs index f9dc741..6e99a1d 100644 --- a/src/Elastic.Transport/Diagnostics/AuditDiagnosticObserver.cs +++ b/src/Elastic.Transport/Diagnostics/AuditDiagnosticObserver.cs @@ -8,7 +8,7 @@ namespace Elastic.Transport.Diagnostics; -/// Provides a typed listener to events that emits +/// Provides a typed listener to events that emits internal sealed class AuditDiagnosticObserver : TypedDiagnosticObserver { /// diff --git a/src/Elastic.Transport/Diagnostics/Auditing/Audit.cs b/src/Elastic.Transport/Diagnostics/Auditing/Audit.cs index 53954ff..1059016 100644 --- a/src/Elastic.Transport/Diagnostics/Auditing/Audit.cs +++ b/src/Elastic.Transport/Diagnostics/Auditing/Audit.cs @@ -24,7 +24,7 @@ internal Audit(AuditEvent type, DateTimeOffset started) /// /// The node on which the request was made. /// - public Node Node { get; internal set; } + public Node? Node { get; internal init; } /// /// The path of the request. diff --git a/src/Elastic.Transport/Diagnostics/Auditing/Auditable.cs b/src/Elastic.Transport/Diagnostics/Auditing/Auditable.cs index a0c40f2..bd8e757 100644 --- a/src/Elastic.Transport/Diagnostics/Auditing/Auditable.cs +++ b/src/Elastic.Transport/Diagnostics/Auditing/Auditable.cs @@ -3,46 +3,82 @@ // See the LICENSE file in the project root for more information using System; +using System.Collections; using System.Collections.Generic; +using Elastic.Transport.Extensions; namespace Elastic.Transport.Diagnostics.Auditing; +/// Collects events +public class Auditor : IReadOnlyCollection +{ + private readonly DateTimeProvider _dateTimeProvider; + private List? _audits; + + internal Auditor(DateTimeProvider dateTimeProvider) => _dateTimeProvider = dateTimeProvider; + + /// + public IEnumerator GetEnumerator() => + _audits?.GetEnumerator() ?? (IEnumerator)new EmptyEnumerator(); + + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + internal Auditable Add(Auditable auditable) + { + _audits ??= new List(); + _audits.Add(auditable.Audit); + return auditable; + } + internal Auditable Add(AuditEvent type, DateTimeProvider dateTimeProvider, Node? node = null) + { + _audits ??= new List(); + var auditable = new Auditable(type, dateTimeProvider, node); + _audits.Add(auditable.Audit); + return auditable; + } + + /// Emits an event that does not need to track a duration + public void Emit(AuditEvent type) => Add(type, _dateTimeProvider).Dispose(); + /// Emits an event that does not need to track a duration + public void Emit(AuditEvent type, Node node) => Add(type, _dateTimeProvider, node).Dispose(); + + /// + public int Count => _audits?.Count ?? 0; +} + internal class Auditable : IDisposable { private readonly Audit _audit; private readonly DateTimeProvider _dateTimeProvider; - public Auditable(AuditEvent type, ref List auditTrail, DateTimeProvider dateTimeProvider, Node node) + public Auditable(AuditEvent type, DateTimeProvider dateTimeProvider, Node? node) { - auditTrail ??= new List(); - _dateTimeProvider = dateTimeProvider; var started = _dateTimeProvider.Now(); - _audit = new Audit(type, started) { Node = node }; - - auditTrail.Add(_audit); } public AuditEvent Event { - set => _audit.Event = value; + set => Audit.Event = value; } public Exception Exception { - set => _audit.Exception = value; + set => Audit.Exception = value; } public string PathAndQuery { - set => _audit.PathAndQuery = value; + set => Audit.PathAndQuery = value; } - public void Dispose() => _audit.Ended = _dateTimeProvider.Now(); + public Audit Audit => _audit; + + public void Dispose() => Audit.Ended = _dateTimeProvider.Now(); } diff --git a/src/Elastic.Transport/Diagnostics/DiagnosticSources.cs b/src/Elastic.Transport/Diagnostics/DiagnosticSources.cs index 618f1e4..257d038 100644 --- a/src/Elastic.Transport/Diagnostics/DiagnosticSources.cs +++ b/src/Elastic.Transport/Diagnostics/DiagnosticSources.cs @@ -76,7 +76,7 @@ public class SerializerDiagnosticKeys : IDiagnosticsKeys } /// - /// Provides access to the string event names that emits + /// Provides access to the string event names that emits /// public class RequestPipelineDiagnosticKeys : IDiagnosticsKeys { @@ -97,7 +97,7 @@ public class RequestPipelineDiagnosticKeys : IDiagnosticsKeys /// /// Reference to the diagnostic source name that allows you to listen to all decisions that - /// makes. Events it emits are the names on + /// makes. Events it emits are the names on /// public class AuditDiagnosticKeys : IDiagnosticsKeys { diff --git a/src/Elastic.Transport/Diagnostics/RequestPipelineDiagnosticObserver.cs b/src/Elastic.Transport/Diagnostics/RequestPipelineDiagnosticObserver.cs index 77f15a1..dcac354 100644 --- a/src/Elastic.Transport/Diagnostics/RequestPipelineDiagnosticObserver.cs +++ b/src/Elastic.Transport/Diagnostics/RequestPipelineDiagnosticObserver.cs @@ -7,7 +7,7 @@ namespace Elastic.Transport.Diagnostics; -/// Provides a typed listener to actions that takes e.g sniff, ping, or making an API call ; +/// Provides a typed listener to actions that takes e.g sniff, ping, or making an API call ; internal sealed class RequestPipelineDiagnosticObserver : TypedDiagnosticObserver { /// diff --git a/src/Elastic.Transport/DistributedTransport.cs b/src/Elastic.Transport/DistributedTransport.cs index c24dfc0..8b45e06 100644 --- a/src/Elastic.Transport/DistributedTransport.cs +++ b/src/Elastic.Transport/DistributedTransport.cs @@ -10,6 +10,7 @@ using System.Threading; using System.Threading.Tasks; using Elastic.Transport.Diagnostics; +using Elastic.Transport.Diagnostics.Auditing; using Elastic.Transport.Extensions; using Elastic.Transport.Products; @@ -23,27 +24,12 @@ namespace Elastic.Transport; public sealed class DistributedTransport : DistributedTransport { /// - /// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on - /// different - /// nodes + /// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on + /// different nodes /// - /// The configuration to use for this transport - public DistributedTransport(ITransportConfiguration configurationValues) : base(configurationValues, null, null) { } - - /// - /// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on - /// different - /// nodes - /// - /// The configuration to use for this transport - /// In charge of create a new pipeline, safe to pass null to use the default - /// The date time proved to use, safe to pass null to use the default - internal DistributedTransport( - ITransportConfiguration configurationValues, - RequestPipelineFactory? pipelineProvider = null, - DateTimeProvider? dateTimeProvider = null - ) - : base(configurationValues, pipelineProvider, dateTimeProvider) { } + /// The configuration to use for this transport + public DistributedTransport(ITransportConfiguration configuration) + : base(configuration) { } } /// @@ -53,36 +39,26 @@ public class DistributedTransport : ITransport private readonly ProductRegistration _productRegistration; /// - /// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on - /// different - /// nodes + /// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on + /// different nodes /// - /// The configuration to use for this transport - /// In charge of create a new pipeline, safe to pass null to use the default - /// The date time proved to use, safe to pass null to use the default - public DistributedTransport( - TConfiguration configurationValues, - RequestPipelineFactory? pipelineProvider = null, - DateTimeProvider? dateTimeProvider = null - ) + /// The configuration to use for this transport + public DistributedTransport(TConfiguration configuration) { - configurationValues.ThrowIfNull(nameof(configurationValues)); - configurationValues.NodePool.ThrowIfNull(nameof(configurationValues.NodePool)); - configurationValues.Connection.ThrowIfNull(nameof(configurationValues.Connection)); - configurationValues.RequestResponseSerializer.ThrowIfNull(nameof(configurationValues - .RequestResponseSerializer)); - - _productRegistration = configurationValues.ProductRegistration; - Configuration = configurationValues; + configuration.ThrowIfNull(nameof(configuration)); + configuration.NodePool.ThrowIfNull(nameof(configuration.NodePool)); + configuration.Connection.ThrowIfNull(nameof(configuration.Connection)); + configuration.RequestResponseSerializer.ThrowIfNull(nameof(configuration.RequestResponseSerializer)); + + _productRegistration = configuration.ProductRegistration; + Configuration = configuration; + MemoryStreamFactory = configuration.MemoryStreamFactory; TransportRequestData = new RequestData(Configuration); - PipelineProvider = pipelineProvider ?? new DefaultRequestPipelineFactory(); - DateTimeProvider = dateTimeProvider ?? DefaultDateTimeProvider.Default; - MemoryStreamFactory = configurationValues.MemoryStreamFactory; + TransportPipeline = Configuration.PipelineProvider.Create(TransportRequestData); } - private DateTimeProvider DateTimeProvider { get; } + private RequestPipeline TransportPipeline { get; } private MemoryStreamFactory MemoryStreamFactory { get; } - private RequestPipelineFactory PipelineProvider { get; } private RequestData TransportRequestData { get; } /// @@ -141,12 +117,14 @@ private async ValueTask RequestCoreAsync( Configuration.OnRequestDataCreated?.Invoke(requestData); - using var pipeline = PipelineProvider.Create(requestData, DateTimeProvider); + var pipeline = requestData == TransportRequestData ? TransportPipeline :Configuration.PipelineProvider.Create(requestData); + var startedOn = Configuration.DateTimeProvider.Now(); + var auditor = Configuration.DisableAuditTrail.GetValueOrDefault(false) ? null : new Auditor(Configuration.DateTimeProvider); if (isAsync) - await pipeline.FirstPoolUsageAsync(Configuration.BootstrapLock, cancellationToken).ConfigureAwait(false); + await pipeline.FirstPoolUsageAsync(Configuration.BootstrapLock, auditor, cancellationToken).ConfigureAwait(false); else - pipeline.FirstPoolUsage(Configuration.BootstrapLock); + pipeline.FirstPoolUsage(Configuration.BootstrapLock, auditor); TResponse response = null; @@ -187,10 +165,10 @@ private async ValueTask RequestCoreAsync( try { if (isAsync) - response = await pipeline.CallProductEndpointAsync(endpoint, requestData, data, cancellationToken) + response = await pipeline.CallProductEndpointAsync(endpoint, requestData, data, auditor, cancellationToken) .ConfigureAwait(false); else - response = pipeline.CallProductEndpoint(endpoint, requestData, data); + response = pipeline.CallProductEndpoint(endpoint, requestData, data, auditor); } catch (PipelineException pipelineException) when (!pipelineException.Recoverable) { @@ -202,12 +180,12 @@ private async ValueTask RequestCoreAsync( } catch (Exception killerException) { - ThrowUnexpectedTransportException(killerException, seenExceptions, endpoint, response, pipeline); + ThrowUnexpectedTransportException(killerException, seenExceptions, endpoint, response, auditor); } } else { - foreach (var node in pipeline.NextNode()) + foreach (var node in pipeline.NextNode(startedOn, auditor)) { attemptedNodes++; endpoint = endpoint with { Node = node }; @@ -223,23 +201,23 @@ private async ValueTask RequestCoreAsync( if (_productRegistration.SupportsSniff) { if (isAsync) - await pipeline.SniffOnStaleClusterAsync(cancellationToken).ConfigureAwait(false); + await pipeline.SniffOnStaleClusterAsync(auditor, cancellationToken).ConfigureAwait(false); else - pipeline.SniffOnStaleCluster(); + pipeline.SniffOnStaleCluster(auditor); } if (_productRegistration.SupportsPing) { if (isAsync) - await PingAsync(pipeline, node, cancellationToken).ConfigureAwait(false); + await PingAsync(pipeline, node, auditor, cancellationToken).ConfigureAwait(false); else - Ping(pipeline, node); + Ping(pipeline, node, auditor); } if (isAsync) - response = await pipeline.CallProductEndpointAsync(endpoint, requestData, data, cancellationToken) + response = await pipeline.CallProductEndpointAsync(endpoint, requestData, data, auditor, cancellationToken) .ConfigureAwait(false); else - response = pipeline.CallProductEndpoint(endpoint, requestData, data); + response = pipeline.CallProductEndpoint(endpoint, requestData, data, auditor); if (!response.ApiCallDetails.SuccessOrKnownError) { @@ -248,9 +226,9 @@ private async ValueTask RequestCoreAsync( if (_productRegistration.SupportsSniff) { if (isAsync) - await pipeline.SniffOnConnectionFailureAsync(cancellationToken).ConfigureAwait(false); + await pipeline.SniffOnConnectionFailureAsync(auditor, cancellationToken).ConfigureAwait(false); else - pipeline.SniffOnConnectionFailure(); + pipeline.SniffOnConnectionFailure(auditor); } } } @@ -266,19 +244,19 @@ private async ValueTask RequestCoreAsync( catch (Exception killerException) { if (killerException is OperationCanceledException && cancellationToken.IsCancellationRequested) - pipeline.AuditCancellationRequested(); + pipeline.AuditCancellationRequested(auditor); throw new UnexpectedTransportException(killerException, seenExceptions) { Endpoint = endpoint, ApiCallDetails = response?.ApiCallDetails, - AuditTrail = pipeline.AuditTrail + AuditTrail = auditor }; } if (cancellationToken.IsCancellationRequested) { - pipeline.AuditCancellationRequested(); + pipeline.AuditCancellationRequested(auditor); break; } @@ -296,7 +274,7 @@ private async ValueTask RequestCoreAsync( activity?.SetTag(SemanticConventions.HttpResponseStatusCode, response.ApiCallDetails.HttpStatusCode); activity?.SetTag(OpenTelemetryAttributes.ElasticTransportAttemptedNodes, attemptedNodes); - return FinalizeResponse(endpoint, requestData, data, pipeline, seenExceptions, response); + return FinalizeResponse(endpoint, requestData, data, pipeline, startedOn, auditor, seenExceptions, response); } finally { @@ -307,13 +285,13 @@ private async ValueTask RequestCoreAsync( private static void ThrowUnexpectedTransportException(Exception killerException, List seenExceptions, Endpoint endpoint, - TResponse response, RequestPipeline pipeline + TResponse response, IReadOnlyCollection? auditTrail ) where TResponse : TransportResponse, new() => throw new UnexpectedTransportException(killerException, seenExceptions) { Endpoint = endpoint, ApiCallDetails = response?.ApiCallDetails, - AuditTrail = pipeline.AuditTrail + AuditTrail = auditTrail }; private static void HandlePipelineException( @@ -328,19 +306,25 @@ ref List seenExceptions seenExceptions.Add(ex); } - private TResponse FinalizeResponse(Endpoint endpoint, RequestData requestData, PostData? postData, RequestPipeline pipeline, + private TResponse FinalizeResponse( + Endpoint endpoint, + RequestData requestData, + PostData? postData, + RequestPipeline pipeline, + DateTimeOffset startedOn, + Auditor auditor, List? seenExceptions, TResponse? response ) where TResponse : TransportResponse, new() { if (endpoint.IsEmpty) //foreach never ran - pipeline.ThrowNoNodesAttempted(endpoint, seenExceptions); + pipeline.ThrowNoNodesAttempted(endpoint, auditor, seenExceptions); var callDetails = GetMostRecentCallDetails(response, seenExceptions); - var clientException = pipeline.CreateClientException(response, callDetails, endpoint, requestData, seenExceptions); + var clientException = pipeline.CreateClientException(response, callDetails, endpoint, auditor, startedOn, seenExceptions); if (response?.ApiCallDetails == null) - pipeline.BadResponse(ref response, callDetails, endpoint, requestData, postData, clientException); + pipeline.BadResponse(ref response, callDetails, endpoint, requestData, postData, clientException, auditor); HandleTransportException(requestData, clientException, response); return response; @@ -367,8 +351,8 @@ private void HandleTransportException(RequestData data, Exception clientExceptio a.OriginalException = clientException; //On .NET Core the TransportClient implementation throws exceptions on bad responses //This causes it to behave differently to .NET FULL. We already wrapped the WebException - //under TransportException and it exposes way more information as part of it's - //exception message e.g the the root cause of the server error body. + //under TransportException, and it exposes way more information as part of its + //exception message e.g. the root cause of the server error body. #if NETFRAMEWORK if (a.OriginalException is WebException) a.OriginalException = clientException; @@ -379,30 +363,30 @@ private void HandleTransportException(RequestData data, Exception clientExceptio if (data != null && clientException != null && data.ThrowExceptions) throw clientException; } - private void Ping(RequestPipeline pipeline, Node node) + private void Ping(RequestPipeline pipeline, Node node, Auditor? auditor) { try { - pipeline.Ping(node); + pipeline.Ping(node, auditor); } catch (PipelineException e) when (e.Recoverable) { if (_productRegistration.SupportsSniff) - pipeline.SniffOnConnectionFailure(); + pipeline.SniffOnConnectionFailure(auditor); throw; } } - private async Task PingAsync(RequestPipeline pipeline, Node node, CancellationToken cancellationToken) + private async Task PingAsync(RequestPipeline pipeline, Node node, Auditor? auditor, CancellationToken cancellationToken) { try { - await pipeline.PingAsync(node, cancellationToken).ConfigureAwait(false); + await pipeline.PingAsync(node, auditor, cancellationToken).ConfigureAwait(false); } catch (PipelineException e) when (e.Recoverable) { if (_productRegistration.SupportsSniff) - await pipeline.SniffOnConnectionFailureAsync(cancellationToken).ConfigureAwait(false); + await pipeline.SniffOnConnectionFailureAsync(auditor, cancellationToken).ConfigureAwait(false); throw; } } diff --git a/src/Elastic.Transport/Exceptions/TransportException.cs b/src/Elastic.Transport/Exceptions/TransportException.cs index fe94bb0..602626f 100644 --- a/src/Elastic.Transport/Exceptions/TransportException.cs +++ b/src/Elastic.Transport/Exceptions/TransportException.cs @@ -25,7 +25,7 @@ public class TransportException : Exception public TransportException(string message) : base(message) => FailureReason = PipelineFailure.Unexpected; /// - public TransportException(PipelineFailure failure, string message, Exception innerException) + public TransportException(PipelineFailure failure, string message, Exception? innerException = null) : base(message, innerException) => FailureReason = failure; /// @@ -41,7 +41,7 @@ public TransportException(PipelineFailure failure, string message, TransportResp /// The audit trail keeping track of what happened during the invocation of /// a request, up until the moment of this exception. /// - public IEnumerable AuditTrail { get; internal set; } + public IReadOnlyCollection? AuditTrail { get; internal init; } /// /// The reason this exception occurred was one of the well defined exit points as modelled by diff --git a/src/Elastic.Transport/Extensions/EmptyEnumerator.cs b/src/Elastic.Transport/Extensions/EmptyEnumerator.cs new file mode 100644 index 0000000..cac38ec --- /dev/null +++ b/src/Elastic.Transport/Extensions/EmptyEnumerator.cs @@ -0,0 +1,23 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Collections; +using System.Collections.Generic; + +namespace Elastic.Transport.Extensions; + +internal struct EmptyEnumerator : IEnumerator +{ + public T Current => default; + object IEnumerator.Current => Current!; + public bool MoveNext() => false; + + public void Reset() + { + } + + public void Dispose() + { + } +} diff --git a/src/Elastic.Transport/Responses/HttpDetails/ApiCallDetails.cs b/src/Elastic.Transport/Responses/HttpDetails/ApiCallDetails.cs index 08bc240..34016df 100644 --- a/src/Elastic.Transport/Responses/HttpDetails/ApiCallDetails.cs +++ b/src/Elastic.Transport/Responses/HttpDetails/ApiCallDetails.cs @@ -24,17 +24,17 @@ internal ApiCallDetails() { } /// /// /// > - public IEnumerable AuditTrail { get; internal set; } + public IReadOnlyCollection? AuditTrail { get; internal set; } /// /// /// - internal IReadOnlyDictionary ThreadPoolStats { get; set; } + internal IReadOnlyDictionary? ThreadPoolStats { get; init; } /// /// /// - internal IReadOnlyDictionary TcpStats { get; set; } + internal IReadOnlyDictionary? TcpStats { get; init; } /// /// diff --git a/tests/Elastic.Transport.Tests/Configuration/TransportConfigurationTests.cs b/tests/Elastic.Transport.Tests/Configuration/TransportConfigurationTests.cs index 9e7e8f5..369f7a5 100644 --- a/tests/Elastic.Transport.Tests/Configuration/TransportConfigurationTests.cs +++ b/tests/Elastic.Transport.Tests/Configuration/TransportConfigurationTests.cs @@ -32,12 +32,9 @@ public void SameDefaults() config.Should().BeEquivalentTo(newConfig, c => c .Excluding(p=>p.BootstrapLock) - .Excluding(p=>p.NodePool.LastUpdate) ); config.BootstrapLock.CurrentCount.Should().Be(newConfig.BootstrapLock.CurrentCount); - config.NodePool.LastUpdate - .Should().BeCloseTo(newConfig.NodePool.LastUpdate, TimeSpan.FromMilliseconds(100)); } #if !NETFRAMEWORK