Skip to content

Commit

Permalink
Merge pull request #6 from Fresa/add-tests-for-kafka-2-5-0
Browse files Browse the repository at this point in the history
Add tests for kafka 2 5 0
  • Loading branch information
Fresa authored Jul 23, 2024
2 parents 15acdc3 + 2516213 commit e056789
Show file tree
Hide file tree
Showing 26 changed files with 1,038 additions and 178 deletions.
14 changes: 14 additions & 0 deletions Kafka.TestFramework.sln
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kafka.TestFramework", "src\
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Confluent.Kafka.1.1.0.Tests", "tests\Confluent.Kafka.1.1.0.Tests\Confluent.Kafka.1.1.0.Tests.csproj", "{12646283-CC9E-4D45-8E89-F52337C790B6}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kafka.TestSpecification", "tests\Kafka.TestSpecification\Kafka.TestSpecification.csproj", "{4676B0E5-6641-426B-986D-49D336B0E5E6}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Confluent.Kafka.2.4.0.Tests", "tests\Confluent.Kafka.2.4.0.Tests\Confluent.Kafka.2.4.0.Tests.csproj", "{B06C41D0-DB18-47F2-A280-0104FE5843CC}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -37,13 +41,23 @@ Global
{12646283-CC9E-4D45-8E89-F52337C790B6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{12646283-CC9E-4D45-8E89-F52337C790B6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{12646283-CC9E-4D45-8E89-F52337C790B6}.Release|Any CPU.Build.0 = Release|Any CPU
{4676B0E5-6641-426B-986D-49D336B0E5E6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4676B0E5-6641-426B-986D-49D336B0E5E6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4676B0E5-6641-426B-986D-49D336B0E5E6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4676B0E5-6641-426B-986D-49D336B0E5E6}.Release|Any CPU.Build.0 = Release|Any CPU
{B06C41D0-DB18-47F2-A280-0104FE5843CC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B06C41D0-DB18-47F2-A280-0104FE5843CC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B06C41D0-DB18-47F2-A280-0104FE5843CC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B06C41D0-DB18-47F2-A280-0104FE5843CC}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{FCFCC73C-B0A6-4D4E-A765-9A94303D00AD} = {0EEC4A41-7C67-4580-8A21-BFA01B2415F1}
{12646283-CC9E-4D45-8E89-F52337C790B6} = {0EEC4A41-7C67-4580-8A21-BFA01B2415F1}
{4676B0E5-6641-426B-986D-49D336B0E5E6} = {0EEC4A41-7C67-4580-8A21-BFA01B2415F1}
{B06C41D0-DB18-47F2-A280-0104FE5843CC} = {0EEC4A41-7C67-4580-8A21-BFA01B2415F1}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {B432BD60-300C-4955-9D44-011413D52F2D}
Expand Down
2 changes: 2 additions & 0 deletions Kafka.TestFramework.v3.ncrunchsolution
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
<SolutionConfiguration>
<Settings>
<AllowParallelTestExecution>True</AllowParallelTestExecution>
<EnableRDI>True</EnableRDI>
<RdiConfigured>True</RdiConfigured>
<SolutionConfigured>True</SolutionConfigured>
</Settings>
</SolutionConfiguration>
22 changes: 21 additions & 1 deletion src/Kafka.TestFramework/SocketServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,27 @@ private void StartAcceptingClients()
var clientSocket = await _clientAcceptingSocket
.AcceptAsync()
.ConfigureAwait(false);
Logger.Debug("Client connected {@clientSocket}", clientSocket);
Logger.Debug("Client connected {@clientSocket}", new
{
clientSocket.AddressFamily,
clientSocket.Available,
clientSocket.Connected,
clientSocket.ProtocolType,
clientSocket.ReceiveTimeout,
clientSocket.ReceiveBufferSize,
clientSocket.Ttl,
clientSocket.SendBufferSize,
clientSocket.SendTimeout,
RemoteEndPoint = new
{
clientSocket.RemoteEndPoint.AddressFamily
},
clientSocket.SocketType,
LocalEndPoint = new
{
clientSocket.LocalEndPoint.AddressFamily
}
});

await _waitingClients
.SendAsync(
Expand Down
13 changes: 0 additions & 13 deletions tests/Confluent.Kafka.1.1.0.Tests/BytesExtensions.cs

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.1.0" />
<PackageReference Include="Confluent.Kafka" Version="1.8.2" />
<PackageReference Include="FluentAssertions" Version="5.9.0" />
<PackageReference Include="Kafka.Protocol" Version="2.0.3" />
<PackageReference Include="Log.It.With.NLog" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.0.1" />
<PackageReference Include="NLog.Extensions.Logging" Version="1.6.1" />
<PackageReference Include="System.Linq.Async" Version="4.1.1" />
<PackageReference Include="Test.It" Version="2.2.0" />
<PackageReference Include="Test.It.With.XUnit" Version="2.2.4" />
Expand All @@ -24,6 +23,7 @@

<ItemGroup>
<ProjectReference Include="..\..\src\Kafka.TestFramework\Kafka.TestFramework.csproj" />
<ProjectReference Include="..\Kafka.TestSpecification\Kafka.TestSpecification.csproj" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using FluentAssertions;
using Kafka.Protocol;
using Kafka.Protocol.Records;
using Xunit;
using Xunit.Abstractions;
using Int32 = Kafka.Protocol.Int32;
using Record = Kafka.Protocol.Records.Record;

namespace Kafka.TestFramework.Tests
{
public partial class Given_a_socket_based_test_server
{
public class When_connecting_to_the_server_and_consuming_a_message : TestSpecificationAsync
{
private SocketBasedKafkaTestFramework _testServer;
private readonly List<string> _result = new List<string>();
private const int NumberOfMessage = 5;

public When_connecting_to_the_server_and_consuming_a_message(
ITestOutputHelper testOutputHelper)
: base(testOutputHelper)
{
}

protected override Task GivenAsync()
{
_testServer = KafkaTestFramework.WithSocket();

_testServer.On<ApiVersionsRequest, ApiVersionsResponse>(
request => request.Respond()
.WithAllApiKeys());

_testServer.On<MetadataRequest, MetadataResponse>(
request => request.Respond()
.WithTopicsCollection(
request.TopicsCollection?.Select(topic =>
new Func<MetadataResponse.MetadataResponseTopic,
MetadataResponse.MetadataResponseTopic>(
responseTopic =>
responseTopic
.WithName(topic.Name)
.WithPartitionsCollection(partition =>
partition
.WithLeaderId(0)
.WithPartitionIndex(0))))
.ToArray() ??
Array.Empty<Func<MetadataResponse.MetadataResponseTopic,
MetadataResponse.MetadataResponseTopic>>())
.WithBrokersCollection(broker => broker
.WithHost("localhost")
.WithPort(_testServer.Port))
);

_testServer.On<FindCoordinatorRequest, FindCoordinatorResponse>(
request => request.Respond()
.WithHost("localhost")
.WithPort(_testServer.Port)
);

_testServer.On<JoinGroupRequest, JoinGroupResponse>(
request => request.Respond()
.WithProtocolName(request.ProtocolsCollection.First().Value.Name));

_testServer.On<SyncGroupRequest, SyncGroupResponse>(
async (request, cancellationToken) => request.Respond()
.WithAssignment(
await new ConsumerProtocolAssignment(ConsumerProtocolAssignment.MaxVersion)
.WithAssignedPartitionsCollection(partition => partition
.WithTopic("topic1")
.WithPartitionsCollection(new Int32[] { 0 }))
.ToBytesAsync(cancellationToken)
.ConfigureAwait(false))
);

_testServer.On<OffsetFetchRequest, OffsetFetchResponse>(request => request.Respond()
.WithTopicsCollection(
request.TopicsCollection?.Select(topic =>
new Func<OffsetFetchResponse.OffsetFetchResponseTopic,
OffsetFetchResponse.OffsetFetchResponseTopic>(responseTopic =>
responseTopic
.WithName(topic.Name)
.WithPartitionsCollection(topic.PartitionIndexesCollection
.Select(partitionIndex =>
new Func<OffsetFetchResponse.OffsetFetchResponseTopic.
OffsetFetchResponsePartition,
OffsetFetchResponse.OffsetFetchResponseTopic.
OffsetFetchResponsePartition>(
partition => partition
.WithPartitionIndex(partitionIndex)))
.ToArray())))
.ToArray() ??
Array.Empty<Func<OffsetFetchResponse.OffsetFetchResponseTopic,
OffsetFetchResponse.OffsetFetchResponseTopic>>()));

var records = new Dictionary<long, Record>();
for (var i = 0; i < NumberOfMessage; i++)
{
records.Add(i, new Record
{
OffsetDelta = i,
Value = Encoding.UTF8.GetBytes(
$"data{i} fetched from broker")
});
}

_testServer.On<FetchRequest, FetchResponse>(async (request, cancellationToken) =>
{
var returnsData = false;
var response = request.Respond()
.WithResponsesCollection(
request.TopicsCollection.Select(topic =>
new Func<FetchResponse.FetchableTopicResponse, FetchResponse.FetchableTopicResponse>(
response => response
.WithTopic(topic.Topic)
.WithPartitionsCollection(topic.PartitionsCollection.Select(partition =>
new Func<FetchResponse.FetchableTopicResponse.PartitionData,
FetchResponse.FetchableTopicResponse.PartitionData>(data =>
{
var recordBatch = new NullableRecordBatch
{
LastOffsetDelta = (int)partition.FetchOffset,
Magic = 2,
Records = records.TryGetValue(partition.FetchOffset.Value,
out var record)
? new NullableArray<Record>(record)
: NullableArray<Record>.Default
};
returnsData = recordBatch.Records != NullableArray<Record>.Default;
return returnsData ? data.WithRecords(recordBatch) : data;
})).ToArray()
))).ToArray());
if (!returnsData)
{
await Task.Delay(request.MaxWaitMs, cancellationToken)
.ConfigureAwait(false);
}

return response;
});

_testServer.On<OffsetCommitRequest, OffsetCommitResponse>(request => request.Respond()
.WithTopicsCollection(request.TopicsCollection.Select(topic =>
new Func<OffsetCommitResponse.OffsetCommitResponseTopic,
OffsetCommitResponse.OffsetCommitResponseTopic>(responseTopic => responseTopic
.WithName(topic.Name)
.WithPartitionsCollection(topic.PartitionsCollection.Select(partition =>
new Func<OffsetCommitResponse.OffsetCommitResponseTopic.
OffsetCommitResponsePartition,
OffsetCommitResponse.OffsetCommitResponseTopic.
OffsetCommitResponsePartition>(responsePartition => responsePartition
.WithPartitionIndex(partition.PartitionIndex))).ToArray())
)).ToArray()
));

_testServer.On<LeaveGroupRequest, LeaveGroupResponse>(request => request.Respond());
_testServer.On<HeartbeatRequest, HeartbeatResponse>(request => request.Respond());

return Task.CompletedTask;
}

protected override async Task WhenAsync()
{
await using (_testServer.Start()
.ConfigureAwait(false))
{
ConsumeMessages("localhost", _testServer.Port, _testServer.Stopping);
}
}

[Fact]
public void It_should_have_read_the_messages_sent()
{
_result.Should().HaveCount(NumberOfMessage);
for (var i = 0; i < NumberOfMessage; i++)
{
_result.Should().Contain($"data{i} fetched from broker");
}
}

private void ConsumeMessages(string host,
int port, CancellationToken testServerStopping)
{
var consumerConfig = new ConsumerConfig(new Dictionary<string, string>
{
{ "log_level", "7" }
})
{
BootstrapServers = $"{host}:{port}",
ApiVersionRequestTimeoutMs = 30000,
Debug = "all",
GroupId = "group1",
};

using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
.SetLogHandler(this.Log)
.Build();

consumer.Subscribe("topic1");
var cancellationToken = CancellationTokenSource
.CreateLinkedTokenSource(testServerStopping, TimeoutCancellationToken).Token;
try
{
for (var i = 0; i < NumberOfMessage; i++)
{
_result.Add(consumer.Consume(cancellationToken).Message.Value);
}
}
finally
{
consumer.Close();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void It_should_have_read_the_message_sent()
_records.First().Value.EncodeToString(Encoding.UTF8).Should().Be("test");
}

private static async Task ProduceMessageFromClientAsync(string host,
private async Task ProduceMessageFromClientAsync(string host,
int port, CancellationToken testServerStopping)
{
var producerConfig = new ProducerConfig(new Dictionary<string, string>
Expand All @@ -129,12 +129,12 @@ private static async Task ProduceMessageFromClientAsync(string host,

using var producer =
new ProducerBuilder<Null, string>(producerConfig)
.SetLogHandler(LogExtensions.UseLogIt)
.SetLogHandler(this.Log)
.Build();

var report = await producer
.ProduceAsync("my-topic",
new Message<Null, string> { Value = "test" })
new Message<Null, string> { Value = "test" }, testServerStopping)
.ConfigureAwait(false);
LogFactory.Create("producer").Info("Produce report {@report}", report);

Expand Down
Loading

0 comments on commit e056789

Please sign in to comment.