From 9482fb3789fde779feb45f4796fd016243f10091 Mon Sep 17 00:00:00 2001 From: BEagle1984 Date: Sat, 25 Nov 2023 18:35:46 +0100 Subject: [PATCH] fix: respect IgnoreUnhandledException when batch processing (issue #216) --- .../Messaging/Sequences/SequenceBase`1.cs | 16 +++-- .../SequencerConsumerBehaviorBase.cs | 6 +- .../Kafka/BatchTests.cs | 68 +++++++++++++++++++ 3 files changed, 82 insertions(+), 8 deletions(-) diff --git a/src/Silverback.Integration/Messaging/Sequences/SequenceBase`1.cs b/src/Silverback.Integration/Messaging/Sequences/SequenceBase`1.cs index 69b58810d..ff531d282 100644 --- a/src/Silverback.Integration/Messaging/Sequences/SequenceBase`1.cs +++ b/src/Silverback.Integration/Messaging/Sequences/SequenceBase`1.cs @@ -299,6 +299,16 @@ protected virtual async Task AddCoreAsync( _abortCancellationTokenSource.Token.ThrowIfCancellationRequested(); + int pushedStreamsCount = await _streamProvider.PushAsync( + envelope, + throwIfUnhandled, + _abortCancellationTokenSource.Token) + .ConfigureAwait(false); + + // If no stream was pushed, the message was ignored (throwIfUnhandled must be false) + if (pushedStreamsCount == 0) + return AddToSequenceResult.Success(0); + Length++; if (TotalLength != null && Length == TotalLength || IsLastMessage(envelope)) @@ -316,12 +326,6 @@ protected virtual async Task AddCoreAsync( }); } - int pushedStreamsCount = await _streamProvider.PushAsync( - envelope, - throwIfUnhandled, - _abortCancellationTokenSource.Token) - .ConfigureAwait(false); - if (IsCompleting) await CompleteAsync().ConfigureAwait(false); diff --git a/src/Silverback.Integration/Messaging/Sequences/SequencerConsumerBehaviorBase.cs b/src/Silverback.Integration/Messaging/Sequences/SequencerConsumerBehaviorBase.cs index d6cb65173..bb04fbcbe 100644 --- a/src/Silverback.Integration/Messaging/Sequences/SequencerConsumerBehaviorBase.cs +++ b/src/Silverback.Integration/Messaging/Sequences/SequencerConsumerBehaviorBase.cs @@ -98,8 +98,10 @@ public virtual async Task HandleAsync(ConsumerPipelineContext context, ConsumerB continue; } - AddToSequenceResult addResult = - await sequence.AddAsync(originalEnvelope, previousSequence).ConfigureAwait(false); + AddToSequenceResult addResult = await sequence.AddAsync( + originalEnvelope, + previousSequence, + context.Envelope.Endpoint.ThrowIfUnhandled).ConfigureAwait(false); if (!addResult.IsSuccess) continue; diff --git a/tests/Silverback.Integration.Tests.E2E/Kafka/BatchTests.cs b/tests/Silverback.Integration.Tests.E2E/Kafka/BatchTests.cs index a04a82597..9242efc6b 100644 --- a/tests/Silverback.Integration.Tests.E2E/Kafka/BatchTests.cs +++ b/tests/Silverback.Integration.Tests.E2E/Kafka/BatchTests.cs @@ -1819,5 +1819,73 @@ public async Task Batch_WithMultiplePartitionsProcessedTogether_SingleStreamIsRe receivedBatches.Should().HaveCount(1); receivedBatches.Sum(batch => batch.Count).Should().Be(15); } + + [Fact] + public async Task Batch_IgnoreUnhandledMessages_UnhandledMessageIgnored() + { + var receivedBatches = new List>(); + var completedBatches = 0; + + Host.ConfigureServices( + services => services + .AddLogging() + .AddSilverback() + .UseModel() + .WithConnectionToMessageBroker( + options => options.AddMockedKafka( + mockedKafkaOptions => + mockedKafkaOptions.WithDefaultPartitionsCount(1))) + .AddKafkaEndpoints( + endpoints => endpoints + .Configure( + config => + { + config.BootstrapServers = "PLAINTEXT://e2e"; + }) + .AddOutbound(endpoint => endpoint.ProduceTo(DefaultTopicName)) + .AddInbound( + endpoint => endpoint + .ConsumeFrom(DefaultTopicName) + .Configure( + config => + { + config.GroupId = "consumer1"; + config.EnableAutoCommit = false; + config.CommitOffsetEach = 1; + }) + .EnableBatchProcessing(2) + .IgnoreUnhandledMessages())) + .AddDelegateSubscriber( + async (IMessageStreamEnumerable eventsStream) => + { + var list = new List(); + + receivedBatches.ThreadSafeAdd(list); + + await foreach (var message in eventsStream) + { + list.Add(message); + } + + Interlocked.Increment(ref completedBatches); + })) + .Run(); + + var publisher = Host.ScopedServiceProvider.GetRequiredService(); + + await publisher.PublishAsync(new TestEventOne { Content = "Handled message" }); + await publisher.PublishAsync(new TestEventTwo { Content = "Unhandled message" }); + await publisher.PublishAsync(new TestEventOne { Content = "Handled message" }); + await publisher.PublishAsync(new TestEventTwo { Content = "Unhandled message" }); + await publisher.PublishAsync(new TestEventOne { Content = "Handled message" }); + await publisher.PublishAsync(new TestEventOne { Content = "Handled message" }); + + await Helper.WaitUntilAllMessagesAreConsumedAsync(); + + receivedBatches.Should().HaveCount(2); + receivedBatches[0].Should().HaveCount(2); + receivedBatches[1].Should().HaveCount(2); + completedBatches.Should().Be(2); + } } }