Skip to content

Commit

Permalink
Ensure we track retry attempts per request (#146)
Browse files Browse the repository at this point in the history
As the pipeline is now created once per transport, we need to track the
retires outside of the pipeline.

It also fixes two other bugs where we don't null check correctly.

Closes #145
  • Loading branch information
stevejgordon authored Nov 25, 2024
1 parent 1bd2db0 commit 6dc8ff9
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 39 deletions.
17 changes: 7 additions & 10 deletions src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ private RequestConfiguration PingAndSniffRequestConfiguration
}
}

private bool DepletedRetries(DateTimeOffset startedOn) => Retried >= MaxRetries + 1 || IsTakingTooLong(startedOn);
private bool DepletedRetries(DateTimeOffset startedOn, int attemptedNodes) => attemptedNodes >= MaxRetries + 1 || IsTakingTooLong(startedOn);

private bool FirstPoolUsageNeedsSniffing =>
!RequestDisabledSniff
Expand All @@ -87,8 +87,6 @@ private bool IsTakingTooLong(DateTimeOffset startedOn)

private bool Refresh { get; set; }

private int Retried { get; set; }

private IEnumerable<Node> SniffNodes(Auditor? auditor) => _nodePool
.CreateView(auditor)
.ToList()
Expand Down Expand Up @@ -204,6 +202,7 @@ private async ValueTask<TResponse> CallProductEndpointCoreAsync<TResponse>(
Endpoint endpoint,
Auditor? auditor,
DateTimeOffset startedOn,
int attemptedNodes,
List<PipelineException>? seenExceptions
)
where TResponse : TransportResponse, new()
Expand Down Expand Up @@ -231,15 +230,15 @@ private async ValueTask<TResponse> CallProductEndpointCoreAsync<TResponse>(
auditor?.Emit(MaxTimeoutReached);
exceptionMessage = "Maximum timeout reached while retrying request";
}
else if (Retried >= MaxRetries && MaxRetries > 0)
else if (attemptedNodes >= MaxRetries && MaxRetries > 0)
{
pipelineFailure = PipelineFailure.MaxRetriesReached;
auditor?.Emit(MaxRetriesReached);
exceptionMessage = "Maximum number of retries reached";

var now = _dateTimeProvider.Now();
var activeNodes = _nodePool.Nodes.Count(n => n.IsAlive || n.DeadUntil <= now);
if (Retried >= activeNodes)
if (attemptedNodes >= activeNodes)
{
auditor?.Emit(FailedOverAllNodes);
exceptionMessage += ", failed over to all the known alive nodes before failing";
Expand Down Expand Up @@ -336,7 +335,6 @@ public void MarkDead(Node node)
{
var deadUntil = _dateTimeProvider.DeadTime(node.FailedAttempts, _settings.DeadTimeout, _settings.MaxDeadTimeout);
node.MarkDead(deadUntil);
Retried++;
}

/// Fast path for <see cref="NextNode"/> if only a single node could ever be yielded this save an IEnumerator allocation
Expand All @@ -355,12 +353,11 @@ public bool TryGetSingleNode(out Node? node)
}

/// returns a consistent enumerable view into the available nodes
public IEnumerable<Node> NextNode(DateTimeOffset startedOn, Auditor? auditor)
public IEnumerable<Node> NextNode(DateTimeOffset startedOn, int attemptedNodes, Auditor? auditor)
{
if (_boundConfiguration.ForceNode != null)
{
yield return new Node(_boundConfiguration.ForceNode);

yield break;
}

Expand All @@ -370,11 +367,11 @@ public IEnumerable<Node> NextNode(DateTimeOffset startedOn, Auditor? auditor)
var refreshed = false;
for (var i = 0; i < 100; i++)
{
if (DepletedRetries(startedOn)) yield break;
if (DepletedRetries(startedOn, attemptedNodes)) yield break;

foreach (var node in _nodePool.CreateView(auditor))
{
if (DepletedRetries(startedOn)) break;
if (DepletedRetries(startedOn, attemptedNodes)) break;

if (!_nodePredicate(node)) continue;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,6 @@ internal HttpRequestInvoker(ResponseFactory responseFactory)
public HttpRequestInvoker(Func<HttpMessageHandler, BoundConfiguration, HttpMessageHandler> wrappingHandler) :
this(wrappingHandler, new DefaultResponseFactory()) { }

/// <summary>
/// Allows consumers to inject their own HttpMessageHandler, and optionally call our default implementation.
/// </summary>
public HttpRequestInvoker(Func<HttpMessageHandler, BoundConfiguration, HttpMessageHandler> wrappingHandler, ITransportConfiguration transportConfiguration) :
this(wrappingHandler, new DefaultResponseFactory())
{ }

internal HttpRequestInvoker(Func<HttpMessageHandler, BoundConfiguration, HttpMessageHandler> wrappingHandler, ResponseFactory responseFactory)
{
ResponseFactory = responseFactory;
Expand Down Expand Up @@ -203,16 +196,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, End
receivedResponse?.Dispose();
}

if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false)))
return response;

var attributes = boundConfiguration.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails);

if (attributes is null) return response;

foreach (var attribute in attributes)
Activity.Current?.SetTag(attribute.Key, attribute.Value);

RequestInvokerHelpers.SetOtelAttributes(boundConfiguration, response);
return response;
}
catch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, End
receivedResponse?.Dispose();
}

if (OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners && (Activity.Current?.IsAllDataRequested ?? false))
{
var attributes = boundConfiguration.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails);
foreach (var attribute in attributes)
{
Activity.Current?.SetTag(attribute.Key, attribute.Value);
}
}
RequestInvokerHelpers.SetOtelAttributes(boundConfiguration, response);

return response;
}
Expand Down Expand Up @@ -421,7 +414,7 @@ protected virtual void SetProxyIfNeeded(HttpWebRequest request, BoundConfigurati
protected virtual void SetAuthenticationIfNeeded(Endpoint endpoint, BoundConfiguration boundConfiguration, HttpWebRequest request)
{
//If user manually specifies an Authorization Header give it preference
if (boundConfiguration.Headers.HasKeys() && boundConfiguration.Headers.AllKeys.Contains("Authorization"))
if (boundConfiguration.Headers is not null && boundConfiguration.Headers.HasKeys() && boundConfiguration.Headers.AllKeys.Contains("Authorization"))
{
var header = boundConfiguration.Headers["Authorization"];
request.Headers["Authorization"] = header;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System.Diagnostics;
using Elastic.Transport.Diagnostics;

namespace Elastic.Transport;

internal static class RequestInvokerHelpers
{
public static void SetOtelAttributes<TResponse>(BoundConfiguration boundConfiguration, TResponse response) where TResponse : TransportResponse
{
if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false)))
return;

var attributes = boundConfiguration.ConnectionSettings.ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails(response.ApiCallDetails);

if (attributes is null) return;

foreach (var attribute in attributes)
Activity.Current?.SetTag(attribute.Key, attribute.Value);
}
}
7 changes: 4 additions & 3 deletions src/Elastic.Transport/DistributedTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
}
else
{
foreach (var node in pipeline.NextNode(startedOn, auditor))
foreach (var node in pipeline.NextNode(startedOn, attemptedNodes, auditor))
{
attemptedNodes++;
endpoint = endpoint with { Node = node };
Expand Down Expand Up @@ -265,7 +265,7 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
if (activity is { IsAllDataRequested: true })
OpenTelemetry.SetCommonAttributes(activity, Configuration);

return FinalizeResponse(endpoint, boundConfiguration, data, pipeline, startedOn, auditor, seenExceptions, response);
return FinalizeResponse(endpoint, boundConfiguration, data, pipeline, startedOn, attemptedNodes, auditor, seenExceptions, response);
}
finally
{
Expand Down Expand Up @@ -303,6 +303,7 @@ private TResponse FinalizeResponse<TResponse>(
PostData? postData,
RequestPipeline pipeline,
DateTimeOffset startedOn,
int attemptedNodes,
Auditor auditor,
List<PipelineException>? seenExceptions,
TResponse? response
Expand All @@ -312,7 +313,7 @@ private TResponse FinalizeResponse<TResponse>(
pipeline.ThrowNoNodesAttempted(endpoint, auditor, seenExceptions);

var callDetails = GetMostRecentCallDetails(response, seenExceptions);
var clientException = pipeline.CreateClientException(response, callDetails, endpoint, auditor, startedOn, seenExceptions);
var clientException = pipeline.CreateClientException(response, callDetails, endpoint, auditor, startedOn, attemptedNodes, seenExceptions);

if (response?.ApiCallDetails == null)
pipeline.BadResponse(ref response, callDetails, endpoint, boundConfiguration, postData, clientException, auditor);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using Xunit;
using System;
using FluentAssertions;
using System.Linq;

namespace Elastic.Transport.Tests.Components.NodePool
{
public class StaticNodePoolTests
{
[Fact]
public void MultipleRequests_WhenOnlyASingleEndpointIsConfigured_AndTheEndpointIsUnavailable_DoNotThrowAnException()
{
Node[] nodes = [new Uri("http://localhost:9200")];
var pool = new StaticNodePool(nodes);
var transport = new DistributedTransport(new TransportConfiguration(pool));

var response = transport.Request<StringResponse>(HttpMethod.GET, "/", null, null);

response.ApiCallDetails.SuccessOrKnownError.Should().BeFalse();
response.ApiCallDetails.AuditTrail.Count.Should().Be(1);

var audit = response.ApiCallDetails.AuditTrail.First();
audit.Event.Should().Be(Diagnostics.Auditing.AuditEvent.BadRequest);
audit.Node.FailedAttempts.Should().Be(1);
audit.Node.IsAlive.Should().BeFalse();

response = transport.Request<StringResponse>(HttpMethod.GET, "/", null, null);

response.ApiCallDetails.SuccessOrKnownError.Should().BeFalse();

var eventCount = 0;

foreach (var a in response.ApiCallDetails.AuditTrail)
{
eventCount++;

if (eventCount == 1)
{
a.Event.Should().Be(Diagnostics.Auditing.AuditEvent.AllNodesDead);
}

if (eventCount == 2)
{
a.Event.Should().Be(Diagnostics.Auditing.AuditEvent.Resurrection);
}

if (eventCount == 3)
{
a.Event.Should().Be(Diagnostics.Auditing.AuditEvent.BadRequest);
audit.Node.FailedAttempts.Should().Be(2);
audit.Node.IsAlive.Should().BeFalse();
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using Xunit;
using System;
using FluentAssertions;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;
using System.Collections.Generic;
using System.IO;
using Elastic.Transport.Diagnostics;
using System.Net.NetworkInformation;

namespace Elastic.Transport.Tests.Components.TransportClient
{
public class RequestInvokerTests
{
[Fact]
public void NoExceptionShouldBeThrown_WhenHttpResponseDoesNotIncludeCloudHeaders()
{
// This test validates that if `ProductRegistration.ParseOpenTelemetryAttributesFromApiCallDetails` returns null,
// no exception is thrown and attributes are not set.

using var listener = new ActivityListener
{
ActivityStarted = _ => { },
ActivityStopped = activity => { },
ShouldListenTo = activitySource => activitySource.Name == OpenTelemetry.ElasticTransportActivitySourceName,
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData
};
ActivitySource.AddActivityListener(listener);

var requestInvoker = new HttpRequestInvoker(new TestResponseFactory());
var pool = new SingleNodePool(new Uri("http://localhost:9200"));
var config = new TransportConfiguration(pool, requestInvoker);
var transport = new DistributedTransport(config);

var response = transport.Head("/");
response.ApiCallDetails.HttpStatusCode.Should().Be(200);
}

private sealed class TestResponseFactory : ResponseFactory
{
public override TResponse Create<TResponse>(
Endpoint endpoint,
BoundConfiguration boundConfiguration,
PostData postData,
Exception ex,
int? statusCode,
Dictionary<string, IEnumerable<string>> headers,
Stream responseStream,
string contentType,
long contentLength,
IReadOnlyDictionary<string, ThreadPoolStatistics> threadPoolStats,
IReadOnlyDictionary<TcpState, int> tcpStats) => CreateResponse<TResponse>();

public override Task<TResponse> CreateAsync<TResponse>(
Endpoint endpoint,
BoundConfiguration boundConfiguration,
PostData postData,
Exception ex,
int? statusCode,
Dictionary<string, IEnumerable<string>> headers,
Stream responseStream,
string contentType,
long contentLength,
IReadOnlyDictionary<string, ThreadPoolStatistics> threadPoolStats,
IReadOnlyDictionary<TcpState, int> tcpStats,
CancellationToken cancellationToken = default)
{
var response = CreateResponse<TResponse>();
return Task.FromResult(response);
}

private static TResponse CreateResponse<TResponse>() where TResponse : TransportResponse, new() => new TResponse
{
ApiCallDetails = new() { HttpStatusCode = 200, Uri = new Uri("http://localhost/") }
};
}
}
}

0 comments on commit 6dc8ff9

Please sign in to comment.