Skip to content

Commit

Permalink
fix: respect IgnoreUnhandledException when batch processing (issue #216)
Browse files Browse the repository at this point in the history
  • Loading branch information
BEagle1984 committed Nov 25, 2023
1 parent 1623072 commit 9482fb3
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 8 deletions.
16 changes: 10 additions & 6 deletions src/Silverback.Integration/Messaging/Sequences/SequenceBase`1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,16 @@ protected virtual async Task<AddToSequenceResult> AddCoreAsync(

_abortCancellationTokenSource.Token.ThrowIfCancellationRequested();

int pushedStreamsCount = await _streamProvider.PushAsync(
envelope,
throwIfUnhandled,
_abortCancellationTokenSource.Token)
.ConfigureAwait(false);

// If no stream was pushed, the message was ignored (throwIfUnhandled must be false)
if (pushedStreamsCount == 0)
return AddToSequenceResult.Success(0);

Length++;

if (TotalLength != null && Length == TotalLength || IsLastMessage(envelope))
Expand All @@ -316,12 +326,6 @@ protected virtual async Task<AddToSequenceResult> AddCoreAsync(
});
}

int pushedStreamsCount = await _streamProvider.PushAsync(
envelope,
throwIfUnhandled,
_abortCancellationTokenSource.Token)
.ConfigureAwait(false);

if (IsCompleting)
await CompleteAsync().ConfigureAwait(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ public virtual async Task HandleAsync(ConsumerPipelineContext context, ConsumerB
continue;
}

AddToSequenceResult addResult =
await sequence.AddAsync(originalEnvelope, previousSequence).ConfigureAwait(false);
AddToSequenceResult addResult = await sequence.AddAsync(
originalEnvelope,
previousSequence,
context.Envelope.Endpoint.ThrowIfUnhandled).ConfigureAwait(false);

if (!addResult.IsSuccess)
continue;
Expand Down
68 changes: 68 additions & 0 deletions tests/Silverback.Integration.Tests.E2E/Kafka/BatchTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1819,5 +1819,73 @@ public async Task Batch_WithMultiplePartitionsProcessedTogether_SingleStreamIsRe
receivedBatches.Should().HaveCount(1);
receivedBatches.Sum(batch => batch.Count).Should().Be(15);
}

[Fact]
public async Task Batch_IgnoreUnhandledMessages_UnhandledMessageIgnored()
{
var receivedBatches = new List<List<TestEventOne>>();
var completedBatches = 0;

Host.ConfigureServices(
services => services
.AddLogging()
.AddSilverback()
.UseModel()
.WithConnectionToMessageBroker(
options => options.AddMockedKafka(
mockedKafkaOptions =>
mockedKafkaOptions.WithDefaultPartitionsCount(1)))
.AddKafkaEndpoints(
endpoints => endpoints
.Configure(
config =>
{
config.BootstrapServers = "PLAINTEXT://e2e";
})
.AddOutbound<IIntegrationEvent>(endpoint => endpoint.ProduceTo(DefaultTopicName))
.AddInbound(
endpoint => endpoint
.ConsumeFrom(DefaultTopicName)
.Configure(
config =>
{
config.GroupId = "consumer1";
config.EnableAutoCommit = false;
config.CommitOffsetEach = 1;
})
.EnableBatchProcessing(2)
.IgnoreUnhandledMessages()))
.AddDelegateSubscriber(
async (IMessageStreamEnumerable<TestEventOne> eventsStream) =>
{
var list = new List<TestEventOne>();

receivedBatches.ThreadSafeAdd(list);

await foreach (var message in eventsStream)
{
list.Add(message);
}

Interlocked.Increment(ref completedBatches);
}))
.Run();

var publisher = Host.ScopedServiceProvider.GetRequiredService<IEventPublisher>();

await publisher.PublishAsync(new TestEventOne { Content = "Handled message" });
await publisher.PublishAsync(new TestEventTwo { Content = "Unhandled message" });
await publisher.PublishAsync(new TestEventOne { Content = "Handled message" });
await publisher.PublishAsync(new TestEventTwo { Content = "Unhandled message" });
await publisher.PublishAsync(new TestEventOne { Content = "Handled message" });
await publisher.PublishAsync(new TestEventOne { Content = "Handled message" });

await Helper.WaitUntilAllMessagesAreConsumedAsync();

receivedBatches.Should().HaveCount(2);
receivedBatches[0].Should().HaveCount(2);
receivedBatches[1].Should().HaveCount(2);
completedBatches.Should().Be(2);
}
}
}

0 comments on commit 9482fb3

Please sign in to comment.