Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor response handling and update project structure #131

Merged
merged 3 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Elastic.Transport.sln
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Playground", "Playground\Pl
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Elastic.Elasticsearch.IntegrationTests", "tests\Elastic.Elasticsearch.IntegrationTests\Elastic.Elasticsearch.IntegrationTests.csproj", "{317C118F-FA1E-499A-B7F2-DC932DE66CB8}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elastic.Transport.Tests.Shared", "tests\Elastic.Transport.Tests.Shared\Elastic.Transport.Tests.Shared.csproj", "{13A2597D-F50C-4D7F-ADA9-716991C8E9DE}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -86,6 +88,10 @@ Global
{317C118F-FA1E-499A-B7F2-DC932DE66CB8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{317C118F-FA1E-499A-B7F2-DC932DE66CB8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{317C118F-FA1E-499A-B7F2-DC932DE66CB8}.Release|Any CPU.Build.0 = Release|Any CPU
{13A2597D-F50C-4D7F-ADA9-716991C8E9DE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{13A2597D-F50C-4D7F-ADA9-716991C8E9DE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{13A2597D-F50C-4D7F-ADA9-716991C8E9DE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{13A2597D-F50C-4D7F-ADA9-716991C8E9DE}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -100,6 +106,7 @@ Global
{ED4E89BE-FBE9-4876-979C-63A0E3BC5419} = {BBB0AC81-F09D-4895-84E2-7E933D608E78}
{5EE4DC72-B337-448B-802A-6158F4D90667} = {7610B796-BB3E-4CB2-8296-79BBFF6D23FC}
{317C118F-FA1E-499A-B7F2-DC932DE66CB8} = {3582B07D-C2B0-49CC-B676-EAF806EB010E}
{13A2597D-F50C-4D7F-ADA9-716991C8E9DE} = {3582B07D-C2B0-49CC-B676-EAF806EB010E}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {7F60C4BB-6216-4E50-B1E4-9C38EB484843}
Expand Down
2 changes: 1 addition & 1 deletion Playground/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@

var registration = new ElasticsearchProductRegistration(typeof(Elastic.Clients.Elasticsearch.ElasticsearchClient));

Console.WriteLine(registration.DefaultMimeType ?? "NOT SPECIFIED");
Console.WriteLine(registration.DefaultContentType ?? "NOT SPECIFIED");
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public ExposingPipelineFactory(TConfiguration configuration, DateTimeProvider da
{
DateTimeProvider = dateTimeProvider;
Configuration = configuration;
Pipeline = Create(new RequestData(Configuration, null, null), DateTimeProvider);
Pipeline = Create(new RequestData(Configuration, null), DateTimeProvider);
RequestHandler = new DistributedTransport<TConfiguration>(Configuration, this, DateTimeProvider);
}

Expand All @@ -28,3 +28,4 @@ public ExposingPipelineFactory(TConfiguration configuration, DateTimeProvider da
public override RequestPipeline Create(RequestData requestData, DateTimeProvider dateTimeProvider) =>
new DefaultRequestPipeline(requestData, DateTimeProvider);
}
#nullable restore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Transport.VirtualizedCluster.Products;
Expand Down Expand Up @@ -101,6 +102,8 @@ private static object DefaultResponse
}
}

public ResponseFactory ResponseFactory => _inMemoryRequestInvoker.ResponseFactory;

private void UpdateCluster(VirtualCluster cluster)
{
lock (Lock)
Expand All @@ -109,7 +112,6 @@ private void UpdateCluster(VirtualCluster cluster)
_calls = cluster.Nodes.ToDictionary(n => n.Uri.Port, v => new State());
_productRegistration = cluster.ProductRegistration;
}

}

private bool IsSniffRequest(Endpoint endpoint) => _productRegistration.IsSniffRequest(endpoint);
Expand Down Expand Up @@ -173,7 +175,7 @@ public TResponse Request<TResponse>(Endpoint endpoint, RequestData requestData,
}
catch (TheException e)
{
return requestData.ConnectionSettings.ProductRegistration.ResponseBuilder.ToResponse<TResponse>(endpoint, requestData, postData, e, null, null, Stream.Null, null, -1, null, null);
return ResponseFactory.Create<TResponse>(endpoint, requestData, postData, e, null, null, Stream.Null, null, -1, null, null);
}
}

Expand Down Expand Up @@ -326,3 +328,4 @@ private class State
public int Successes;
}
}
#nullable restore
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, Transport
path: RootPath,
postData: PostData.Serializable(new { }),
openTelemetryData: default,
localConfiguration: r?.Invoke(new RequestConfigurationDescriptor()),
responseBuilder: null
localConfiguration: r?.Invoke(new RequestConfigurationDescriptor())
);
_asyncCall = async (t, r) =>
{
Expand All @@ -43,14 +42,13 @@ internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, Transport
postData: PostData.Serializable(new { }),
openTelemetryData: default,
localConfiguration: r?.Invoke(new RequestConfigurationDescriptor()),
responseBuilder: null,
CancellationToken.None
).ConfigureAwait(false);
return res;
};
}

public VirtualClusterRequestInvoker Connection => RequestHandler.Configuration.Connection as VirtualClusterRequestInvoker;
public VirtualClusterRequestInvoker Connection => RequestHandler.Configuration.RequestInvoker as VirtualClusterRequestInvoker;
public NodePool ConnectionPool => RequestHandler.Configuration.NodePool;
public ITransport<ITransportConfiguration> RequestHandler => _exposingRequestPipeline?.RequestHandler;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public ElasticsearchVirtualCluster MasterEligible(params int[] ports)
foreach (var node in InternalNodes.Where(n => !ports.Contains(n.Uri.Port)))
{
var currentFeatures = node.Features.Count == 0 ? ElasticsearchNodeFeatures.Default : node.Features;
node.Features = currentFeatures.Except(new[] { ElasticsearchNodeFeatures.MasterEligible }).ToList().AsReadOnly();
node.Features = currentFeatures.Except([ElasticsearchNodeFeatures.MasterEligible]).ToList().AsReadOnly();
}
return this;
}
Expand All @@ -77,7 +77,7 @@ public ElasticsearchVirtualCluster StoresNoData(params int[] ports)
foreach (var node in InternalNodes.Where(n => ports.Contains(n.Uri.Port)))
{
var currentFeatures = node.Features.Count == 0 ? ElasticsearchNodeFeatures.Default : node.Features;
node.Features = currentFeatures.Except(new[] { ElasticsearchNodeFeatures.HoldsData }).ToList().AsReadOnly();
node.Features = currentFeatures.Except([ElasticsearchNodeFeatures.HoldsData]).ToList().AsReadOnly();
}
return this;
}
Expand All @@ -88,7 +88,7 @@ public VirtualCluster HttpDisabled(params int[] ports)
foreach (var node in InternalNodes.Where(n => ports.Contains(n.Uri.Port)))
{
var currentFeatures = node.Features.Count == 0 ? ElasticsearchNodeFeatures.Default : node.Features;
node.Features = currentFeatures.Except(new[] { ElasticsearchNodeFeatures.HttpEnabled }).ToList().AsReadOnly();
node.Features = currentFeatures.Except([ElasticsearchNodeFeatures.HttpEnabled]).ToList().AsReadOnly();
}
return this;
}
Expand Down
4 changes: 2 additions & 2 deletions src/Elastic.Transport.VirtualizedCluster/Rules/RuleBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ public TRule ReturnResponse<T>(T response)
r = ms.ToArray();
}
Self.ReturnResponse = r;
Self.ReturnContentType = RequestData.DefaultMimeType;
Self.ReturnContentType = RequestData.DefaultContentType;
return (TRule)this;
}

public TRule ReturnByteResponse(byte[] response, string responseContentType = RequestData.DefaultMimeType)
public TRule ReturnByteResponse(byte[] response, string responseContentType = RequestData.DefaultContentType)
{
Self.ReturnResponse = response;
Self.ReturnContentType = responseContentType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ public class DefaultRequestPipeline : RequestPipeline
private readonly MemoryStreamFactory _memoryStreamFactory;
private readonly Func<Node, bool> _nodePredicate;
private readonly ProductRegistration _productRegistration;
private readonly ResponseBuilder _responseBuilder;

private RequestConfiguration? _pingAndSniffRequestConfiguration;
private List<Audit>? _auditTrail;
private readonly ITransportConfiguration _settings;
Expand All @@ -36,13 +34,11 @@ internal DefaultRequestPipeline(RequestData requestData, DateTimeProvider dateTi
{
_requestData = requestData;
_settings = requestData.ConnectionSettings;

_nodePool = requestData.ConnectionSettings.NodePool;
_requestInvoker = requestData.ConnectionSettings.Connection;
_requestInvoker = requestData.ConnectionSettings.RequestInvoker;
_dateTimeProvider = dateTimeProvider;
_memoryStreamFactory = requestData.MemoryStreamFactory;
_productRegistration = requestData.ConnectionSettings.ProductRegistration;
_responseBuilder = _productRegistration.ResponseBuilder;
_nodePredicate = requestData.ConnectionSettings.NodePredicate ?? _productRegistration.NodePredicate;

StartedOn = dateTimeProvider.Now();
Expand Down Expand Up @@ -148,8 +144,8 @@ public override void BadResponse<TResponse>(ref TResponse response, ApiCallDetai
{
//make sure we copy over the error body in case we disabled direct streaming.
var s = callDetails?.ResponseBodyInBytes == null ? Stream.Null : _memoryStreamFactory.Create(callDetails.ResponseBodyInBytes);
var m = callDetails?.ResponseMimeType ?? RequestData.DefaultMimeType;
response = _responseBuilder.ToResponse<TResponse>(endpoint, data, postData, exception, callDetails?.HttpStatusCode, null, s, m, callDetails?.ResponseBodyInBytes?.Length ?? -1, null, null);
var m = callDetails?.ResponseContentType ?? RequestData.DefaultContentType;
response = _requestInvoker.ResponseFactory.Create<TResponse>(endpoint, data, postData, exception, callDetails?.HttpStatusCode, null, s, m, callDetails?.ResponseBodyInBytes?.Length ?? -1, null, null);
}

response.ApiCallDetails.AuditTrail = AuditTrail;
Expand Down Expand Up @@ -447,8 +443,9 @@ public async ValueTask SniffCoreAsync(bool isAsync, CancellationToken cancellati
foreach (var node in SniffNodes)
{
var sniffEndpoint = _productRegistration.CreateSniffEndpoint(node, PingAndSniffRequestConfiguration, _settings);

//TODO remove
var requestData = new RequestData(_settings, null, null);
var requestData = new RequestData(_settings, null);

using var audit = Audit(SniffSuccess, node);

Expand Down
Loading
Loading