Skip to content

Commit

Permalink
dispose clients properly when running in memory
Browse files Browse the repository at this point in the history
  • Loading branch information
Fredrik Arvidsson committed Jan 20, 2022
1 parent 3dafc7b commit 73e0b24
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 2 deletions.
15 changes: 15 additions & 0 deletions src/Kafka.TestFramework/AsyncDisposableExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Kafka.TestFramework
{
internal static class AsyncDisposableExtensions
{
internal static Task DisposeAllAsync(
this IEnumerable<IAsyncDisposable> disposables)
=> disposables.Select(client => client.DisposeAsync())
.WhenAllAsync();
}
}
34 changes: 32 additions & 2 deletions src/Kafka.TestFramework/InMemoryKafkaTestFramework.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using System.Threading;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Kafka.Protocol;

namespace Kafka.TestFramework
{
Expand All @@ -25,7 +28,34 @@ public async Task<IRequestClient> CreateRequestClientAsync(
await _clients
.SendAsync(responseClient, cancellationToken)
.ConfigureAwait(false);
return RequestClient.Start(requestClient);
return new DisposableRequestClientDecorator(RequestClient.Start(requestClient), responseClient, second, first);
}

private class DisposableRequestClientDecorator : IRequestClient
{
private readonly IRequestClient _requestClient;
private readonly List<IAsyncDisposable> _disposables;

public DisposableRequestClientDecorator(IRequestClient requestClient, params IAsyncDisposable[] disposables)
{
_requestClient = requestClient;
_disposables = new List<IAsyncDisposable>(disposables) { requestClient };
}
public async ValueTask DisposeAsync()
{
await _disposables.DisposeAllAsync()
.ConfigureAwait(false);
}

public ValueTask<ResponsePayload> ReadAsync(RequestPayload requestPayload, CancellationToken cancellationToken = default)
{
return _requestClient.ReadAsync(requestPayload, cancellationToken);
}

public ValueTask SendAsync(RequestPayload payload, CancellationToken cancellationToken = default)
{
return _requestClient.SendAsync(payload, cancellationToken);
}
}
}
}
17 changes: 17 additions & 0 deletions src/Kafka.TestFramework/ValueTaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Kafka.TestFramework
{
internal static class ValueTaskExtensions
{
internal static Task WhenAllAsync(
this IEnumerable<ValueTask> tasks)
=> Task.WhenAll(
tasks.Where(
valueTask
=> !valueTask.IsCompletedSuccessfully)
.Select(valueTask => valueTask.AsTask()));
}
}

0 comments on commit 73e0b24

Please sign in to comment.