Skip to content

Commit

Permalink
Add per request configuration and response builder support (#126)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mpdreamz authored Oct 30, 2024
1 parent 63e90cb commit 6f6f064
Show file tree
Hide file tree
Showing 18 changed files with 166 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

#nullable enable
namespace Elastic.Transport.VirtualizedCluster.Components;

/// <summary>
Expand All @@ -14,7 +15,7 @@ public ExposingPipelineFactory(TConfiguration configuration, DateTimeProvider da
DateTimeProvider = dateTimeProvider;
MemoryStreamFactory = TransportConfiguration.DefaultMemoryStreamFactory;
Configuration = configuration;
Pipeline = Create(Configuration, DateTimeProvider, MemoryStreamFactory, new DefaultRequestParameters());
Pipeline = Create(Configuration, DateTimeProvider, MemoryStreamFactory, null);
RequestHandler = new DistributedTransport<TConfiguration>(Configuration, this, DateTimeProvider, MemoryStreamFactory);
}

Expand All @@ -26,6 +27,6 @@ public ExposingPipelineFactory(TConfiguration configuration, DateTimeProvider da
public ITransport<TConfiguration> RequestHandler { get; }

public override RequestPipeline Create(TConfiguration configurationValues, DateTimeProvider dateTimeProvider,
MemoryStreamFactory memoryStreamFactory, RequestParameters requestParameters) =>
new DefaultRequestPipeline<TConfiguration>(Configuration, DateTimeProvider, MemoryStreamFactory, requestParameters ?? new DefaultRequestParameters());
MemoryStreamFactory memoryStreamFactory, IRequestConfiguration? requestConfiguration) =>
new DefaultRequestPipeline<TConfiguration>(Configuration, DateTimeProvider, MemoryStreamFactory, requestConfiguration);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,28 @@ internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, Transport
_exposingRequestPipeline = new ExposingPipelineFactory<ITransportConfiguration>(settings, _dateTimeProvider);

_syncCall = (t, r) => t.Request<VirtualResponse>(
HttpMethod.GET, "/",
PostData.Serializable(new {}), new DefaultRequestParameters()
{
RequestConfiguration = r?.Invoke(new RequestConfigurationDescriptor(null))
});
method: HttpMethod.GET,
path: "/",
postData: PostData.Serializable(new { }),
requestParameters: new DefaultRequestParameters(),
openTelemetryData: default,
localConfiguration: r?.Invoke(new RequestConfigurationDescriptor(null)),
responseBuilder: null
);
_asyncCall = async (t, r) =>
{
var res = await t.RequestAsync<VirtualResponse>
(
HttpMethod.GET, "/",
PostData.Serializable(new { }),
new DefaultRequestParameters()
{
RequestConfiguration = r?.Invoke(new RequestConfigurationDescriptor(null))
},
method: HttpMethod.GET,
path: "/",
postData: PostData.Serializable(new { }),
requestParameters: new DefaultRequestParameters(),
openTelemetryData: default,
localConfiguration: r?.Invoke(new RequestConfigurationDescriptor(null)),
responseBuilder: null,
CancellationToken.None
).ConfigureAwait(false);
return (TransportResponse)res;
return res;
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ internal DefaultRequestPipeline(
TConfiguration configurationValues,
DateTimeProvider dateTimeProvider,
MemoryStreamFactory memoryStreamFactory,
RequestParameters requestParameters
IRequestConfiguration? requestConfiguration
)
{
_settings = configurationValues;
Expand All @@ -47,7 +47,7 @@ RequestParameters requestParameters
_productRegistration = configurationValues.ProductRegistration;
_responseBuilder = _productRegistration.ResponseBuilder;
_nodePredicate = _settings.NodePredicate ?? _productRegistration.NodePredicate;
RequestConfiguration = requestParameters?.RequestConfiguration;
RequestConfiguration = requestConfiguration;
StartedOn = dateTimeProvider.Now();
}

Expand Down
71 changes: 15 additions & 56 deletions src/Elastic.Transport/Components/Pipeline/RequestData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,37 +28,29 @@ public sealed class RequestData
public const string OpaqueIdHeader = "X-Opaque-Id";
public const string RunAsSecurityHeader = "es-security-runas-user";

private Uri _requestUri;
private Node _node;
private Uri? _requestUri;
private Node? _node;

public RequestData(
HttpMethod method, string path,
PostData data,
HttpMethod method,
string pathAndQuery,
PostData? data,
ITransportConfiguration global,
RequestParameters local,
IRequestConfiguration? local,
CustomResponseBuilder? customResponseBuilder,
MemoryStreamFactory memoryStreamFactory,
OpenTelemetryData openTelemetryData
)
: this(method, data, global, local?.RequestConfiguration, memoryStreamFactory)
{
_path = path;
OpenTelemetryData = openTelemetryData;
CustomResponseBuilder = local?.CustomResponseBuilder;
PathAndQuery = CreatePathWithQueryStrings(path, ConnectionSettings, local);
}

private RequestData(HttpMethod method,
PostData data,
ITransportConfiguration global,
IRequestConfiguration local,
MemoryStreamFactory memoryStreamFactory
)
{
CustomResponseBuilder = customResponseBuilder;
ConnectionSettings = global;
MemoryStreamFactory = memoryStreamFactory;
Method = method;
PostData = data;

PathAndQuery = pathAndQuery;

if (data != null)
data.DisableDirectStreaming = local?.DisableDirectStreaming ?? global.DisableDirectStreaming;

Expand Down Expand Up @@ -119,19 +111,15 @@ MemoryStreamFactory memoryStreamFactory
ResponseHeadersToParse = new HeadersList(local.ResponseHeadersToParse, global.ResponseHeadersToParse);
}
else
{
ResponseHeadersToParse = global.ResponseHeadersToParse;
}
}

private readonly string _path;

public string Accept { get; }
public IReadOnlyCollection<int> AllowedStatusCodes { get; }
public AuthorizationHeader AuthenticationHeader { get; }
public X509CertificateCollection ClientCertificates { get; }
public ITransportConfiguration ConnectionSettings { get; }
public CustomResponseBuilder CustomResponseBuilder { get; }
public CustomResponseBuilder? CustomResponseBuilder { get; }
public bool DisableAutomaticProxyDetection { get; }
public HeadersList ResponseHeadersToParse { get; }
public bool ParseAllHeaders { get; }
Expand All @@ -143,7 +131,7 @@ MemoryStreamFactory memoryStreamFactory
public MemoryStreamFactory MemoryStreamFactory { get; }
public HttpMethod Method { get; }

public Node? Node
public Node Node
{
get => _node;
set
Expand All @@ -159,13 +147,13 @@ public Node? Node
public string PathAndQuery { get; }
public TimeSpan PingTimeout { get; }
public bool Pipelined { get; }
public PostData PostData { get; }
public PostData? PostData { get; }
public string ProxyAddress { get; }
public string ProxyPassword { get; }
public string ProxyUsername { get; }
public string ContentType { get; }
public TimeSpan RequestTimeout { get; }
public string RunAs { get; }
public string? RunAs { get; }
public IReadOnlyCollection<int> SkipDeserializationForStatusCodes { get; }
public bool ThrowExceptions { get; }
public UserAgent UserAgent { get; }
Expand Down Expand Up @@ -197,35 +185,7 @@ public Uri Uri

internal OpenTelemetryData OpenTelemetryData { get; }

public override string ToString() => $"{Method.GetStringValue()} {_path}";

// TODO This feels like its in the wrong place
private string CreatePathWithQueryStrings(string path, ITransportConfiguration global, RequestParameters request)
{
path ??= string.Empty;
if (path.Contains("?"))
throw new ArgumentException($"{nameof(path)} can not contain querystring parameters and needs to be already escaped");

var g = global.QueryStringParameters;
var l = request?.QueryString;

if ((g == null || g.Count == 0) && (l == null || l.Count == 0)) return path;

//create a copy of the global query string collection if needed.
var nv = g == null ? new NameValueCollection() : new NameValueCollection(g);

//set all querystring pairs from local `l` on the querystring collection
var formatter = ConnectionSettings.UrlFormatter;
nv.UpdateFromDictionary(l, formatter);

//if nv has no keys simply return path as provided
if (!nv.HasKeys()) return path;

//create string for query string collection where key and value are escaped properly.
var queryString = ToQueryString(nv);
path += queryString;
return path;
}
public override string ToString() => $"{Method.GetStringValue()} {PathAndQuery}";

internal bool ValidateResponseContentType(string responseMimeType)
{
Expand All @@ -248,6 +208,5 @@ internal bool ValidateResponseContentType(string responseMimeType)
|| trimmedAccept.Contains("application/vnd.elasticsearch+json") && trimmedResponseMimeType.StartsWith(DefaultMimeType, StringComparison.OrdinalIgnoreCase);
}

public static string ToQueryString(NameValueCollection collection) => collection.ToQueryString();
#pragma warning restore 1591
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ internal sealed class DefaultRequestPipelineFactory<TConfiguration> : RequestPip
/// returns instances of <see cref="DefaultRequestPipeline{TConfiguration}"/>
/// </summary>
public override RequestPipeline Create(TConfiguration configurationValues, DateTimeProvider dateTimeProvider,
MemoryStreamFactory memoryStreamFactory, RequestParameters requestParameters) =>
new DefaultRequestPipeline<TConfiguration>(configurationValues, dateTimeProvider, memoryStreamFactory, requestParameters);
MemoryStreamFactory memoryStreamFactory, IRequestConfiguration? requestConfiguration) =>
new DefaultRequestPipeline<TConfiguration>(configurationValues, dateTimeProvider, memoryStreamFactory, requestConfiguration);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ internal RequestPipelineFactory() { }

/// <summary> Create an instance of <see cref="RequestPipeline"/> </summary>
public abstract RequestPipeline Create(TConfiguration configuration, DateTimeProvider dateTimeProvider,
MemoryStreamFactory memoryStreamFactory, RequestParameters requestParameters);
MemoryStreamFactory memoryStreamFactory, IRequestConfiguration? requestParameters);
}
10 changes: 5 additions & 5 deletions src/Elastic.Transport/Configuration/ITransportConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public interface ITransportConfiguration : IDisposable
/// <summary>
/// Try to send these headers for every request
/// </summary>
NameValueCollection Headers { get; }
NameValueCollection? Headers { get; }

/// <summary>
/// Whether HTTP pipelining is enabled. The default is <c>true</c>
Expand Down Expand Up @@ -143,18 +143,18 @@ public interface ITransportConfiguration : IDisposable
/// When using static or single node connection pooling it is assumed the list of node you instantiate the client with should be taken
/// verbatim.
/// </summary>
Func<Node, bool> NodePredicate { get; }
Func<Node, bool>? NodePredicate { get; }

/// <summary>
/// Allows you to register a callback every time a an API call is returned
/// </summary>
Action<ApiCallDetails> OnRequestCompleted { get; }
Action<ApiCallDetails>? OnRequestCompleted { get; }

/// <summary>
/// An action to run when the <see cref="RequestData" /> for a request has been
/// created.
/// </summary>
Action<RequestData> OnRequestDataCreated { get; }
Action<RequestData>? OnRequestDataCreated { get; }

/// <summary>
/// When enabled, all headers from the HTTP response will be included in the <see cref="ApiCallDetails"/>.
Expand Down Expand Up @@ -184,7 +184,7 @@ public interface ITransportConfiguration : IDisposable
/// <summary>
/// Append these query string parameters automatically to every request
/// </summary>
NameValueCollection QueryStringParameters { get; }
NameValueCollection? QueryStringParameters { get; }

/// <summary>The serializer to use to serialize requests and deserialize responses</summary>
Serializer RequestResponseSerializer { get; }
Expand Down
4 changes: 2 additions & 2 deletions src/Elastic.Transport/Configuration/RequestConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public interface IRequestConfiguration
/// <summary>
/// Holds additional meta data about the request.
/// </summary>
RequestMetaData RequestMetaData { get; set; }
RequestMetaData? RequestMetaData { get; set; }
}

/// <inheritdoc cref="IRequestConfiguration"/>
Expand Down Expand Up @@ -192,7 +192,7 @@ public class RequestConfiguration : IRequestConfiguration
public class RequestConfigurationDescriptor : IRequestConfiguration
{
/// <inheritdoc cref="IRequestConfiguration"/>
public RequestConfigurationDescriptor(IRequestConfiguration config)
public RequestConfigurationDescriptor(IRequestConfiguration? config)
{
Self.RequestTimeout = config?.RequestTimeout;
Self.PingTimeout = config?.PingTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ public readonly struct OpenTelemetryData
/// <summary>
/// The name to use for spans relating to a request.
/// </summary>
public readonly string? SpanName { get; init; }
public string? SpanName { get; init; }

/// <summary>
/// Additional span attributes for transport spans relating to a request.
/// </summary>
public readonly Dictionary<string, object>? SpanAttributes { get; init; }
public Dictionary<string, object>? SpanAttributes { get; init; }
}
20 changes: 14 additions & 6 deletions src/Elastic.Transport/DistributedTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,13 @@ public TResponse Request<TResponse>(
string path,
PostData? data,
RequestParameters? requestParameters,
in OpenTelemetryData openTelemetryData
in OpenTelemetryData openTelemetryData,
IRequestConfiguration? localConfiguration,
CustomResponseBuilder? responseBuilder
)
where TResponse : TransportResponse, new() =>
RequestCoreAsync<TResponse>(false, method, path, data, requestParameters, openTelemetryData).EnsureCompleted();
RequestCoreAsync<TResponse>(isAsync: false,
method, path, data, requestParameters, openTelemetryData, localConfiguration, responseBuilder).EnsureCompleted();

/// <inheritdoc cref="ITransport.RequestAsync{TResponse}"/>
public Task<TResponse> RequestAsync<TResponse>(
Expand All @@ -108,10 +111,13 @@ public Task<TResponse> RequestAsync<TResponse>(
PostData? data,
RequestParameters? requestParameters,
in OpenTelemetryData openTelemetryData,
IRequestConfiguration? localConfiguration,
CustomResponseBuilder? responseBuilder,
CancellationToken cancellationToken = default
)
where TResponse : TransportResponse, new() =>
RequestCoreAsync<TResponse>(true, method, path, data, requestParameters, openTelemetryData, cancellationToken).AsTask();
RequestCoreAsync<TResponse>(isAsync: true,
method, path, data, requestParameters, openTelemetryData, localConfiguration, responseBuilder, cancellationToken).AsTask();

private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
bool isAsync,
Expand All @@ -120,6 +126,8 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
PostData? data,
RequestParameters? requestParameters,
OpenTelemetryData openTelemetryData,
IRequestConfiguration? localRequestConfiguration,
CustomResponseBuilder? customResponseBuilder,
CancellationToken cancellationToken = default
)
where TResponse : TransportResponse, new()
Expand All @@ -132,15 +140,15 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(

try
{
using var pipeline =
PipelineProvider.Create(Configuration, DateTimeProvider, MemoryStreamFactory, requestParameters);
using var pipeline = PipelineProvider.Create(Configuration, DateTimeProvider, MemoryStreamFactory, localRequestConfiguration);

if (isAsync)
await pipeline.FirstPoolUsageAsync(Configuration.BootstrapLock, cancellationToken).ConfigureAwait(false);
else
pipeline.FirstPoolUsage(Configuration.BootstrapLock);

var requestData = new RequestData(method, path, data, Configuration, requestParameters, MemoryStreamFactory, openTelemetryData);
var pathAndQuery = requestParameters?.CreatePathWithQueryStrings(path, Configuration) ?? path;
var requestData = new RequestData(method, pathAndQuery, data, Configuration, localRequestConfiguration, customResponseBuilder, MemoryStreamFactory, openTelemetryData);
Configuration.OnRequestDataCreated?.Invoke(requestData);
TResponse response = null;

Expand Down
Loading

0 comments on commit 6f6f064

Please sign in to comment.