From 37f06aea1c832b35926d62082a38febed4aa1c4c Mon Sep 17 00:00:00 2001 From: BEagle1984 Date: Thu, 18 May 2023 21:28:34 +0200 Subject: [PATCH] fix: prevent deadlock in OutboxWorker --- Directory.Build.props | 2 +- docs/releases.md | 6 +++ .../TransactionalOutbox/OutboxWorker.cs | 28 +++++++--- .../Silverback.Storage.Memory.csproj | 34 +++++++++++++ .../Silverback.Storage.Sqlite.csproj | 36 +++++++++++++ .../TransactionalOutbox/OutboxWorkerTests.cs | 51 +++++++++++++++++++ .../TestTypes/TestBroker.cs | 2 + .../TestTypes/TestProducer.cs | 19 +++++-- 8 files changed, 165 insertions(+), 13 deletions(-) create mode 100644 src/Silverback.Storage.Memory/Silverback.Storage.Memory.csproj create mode 100644 src/Silverback.Storage.Sqlite/Silverback.Storage.Sqlite.csproj diff --git a/Directory.Build.props b/Directory.Build.props index 92fc695b8..d2701737e 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,7 +1,7 @@ - 4.3.0$(BaseVersionSuffix) + 4.3.1$(BaseVersionSuffix) 1 $(BaseVersionSuffix) diff --git a/docs/releases.md b/docs/releases.md index 41213a73a..8fd76d87d 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -4,6 +4,12 @@ uid: releases # Releases +## [4.3.1](https://github.com/BEagle1984/silverback/releases/tag/v4.3.1) + +### Fixes + +* Fix deadlock in `OutboxWorker` when `enforceMessageOrder=true` (default) + ## [4.3.0](https://github.com/BEagle1984/silverback/releases/tag/v4.3.0) ### What's new diff --git a/src/Silverback.Integration/Messaging/Outbound/TransactionalOutbox/OutboxWorker.cs b/src/Silverback.Integration/Messaging/Outbound/TransactionalOutbox/OutboxWorker.cs index 604fea418..4bf9a58d1 100644 --- a/src/Silverback.Integration/Messaging/Outbound/TransactionalOutbox/OutboxWorker.cs +++ b/src/Silverback.Integration/Messaging/Outbound/TransactionalOutbox/OutboxWorker.cs @@ -165,15 +165,28 @@ private async Task ProcessQueueAsync( { _logger.LogProcessingOutboxStoredMessage(i + 1, outboxMessages.Count); - await ProcessMessageAsync( - outboxMessages[i], - failedMessages, - outboxReader, - serviceProvider) - .ConfigureAwait(false); + try + { + await ProcessMessageAsync( + outboxMessages[i], + failedMessages, + outboxReader, + serviceProvider) + .ConfigureAwait(false); + } + catch (Exception) + { + // Subtract the produce operations that will never be initiated + Interlocked.Add(ref _pendingProduceOperations, -(outboxMessages.Count - i - 1)); + throw; + } if (stoppingToken.IsCancellationRequested) + { + // Subtract the produce operations that will never be initiated + Interlocked.Add(ref _pendingProduceOperations, -(outboxMessages.Count - i - 1)); break; + } } } finally @@ -247,8 +260,7 @@ private IProducerEndpoint GetTargetEndpoint( var targetEndpoint = outboundRoutes .SelectMany(route => route.GetOutboundRouter(serviceProvider).Endpoints) - .FirstOrDefault( - endpoint => endpoint.Name == endpointName || endpoint.DisplayName == endpointName); + .FirstOrDefault(endpoint => endpoint.Name == endpointName || endpoint.DisplayName == endpointName); if (targetEndpoint == null) { diff --git a/src/Silverback.Storage.Memory/Silverback.Storage.Memory.csproj b/src/Silverback.Storage.Memory/Silverback.Storage.Memory.csproj new file mode 100644 index 000000000..cdb9505d4 --- /dev/null +++ b/src/Silverback.Storage.Memory/Silverback.Storage.Memory.csproj @@ -0,0 +1,34 @@ + + + + $(DefaultTargetFrameworks) + $(RootNamespace) + $(BaseVersion) + $(Authors) + $(Company) + $(License) + $(Copyright) + $(ProjectUrl) + $(RepositoryUrl) + $(RepositoryType) + ${GeneratePackageOnBuild} + $(Description) This package contains an implementation of Silverback.Storage that stores the data in memory. + $(IconUrl) + $(Tags) + $(LangVersion) + true + Silverback.Storage.Memory + Silverback.Storage.Memory + + + + + + + + + + + + + diff --git a/src/Silverback.Storage.Sqlite/Silverback.Storage.Sqlite.csproj b/src/Silverback.Storage.Sqlite/Silverback.Storage.Sqlite.csproj new file mode 100644 index 000000000..dbc116951 --- /dev/null +++ b/src/Silverback.Storage.Sqlite/Silverback.Storage.Sqlite.csproj @@ -0,0 +1,36 @@ + + + + $(DefaultTargetFrameworks) + $(RootNamespace) + $(BaseVersion) + $(Authors) + $(Company) + $(License) + $(Copyright) + $(ProjectUrl) + $(RepositoryUrl) + $(RepositoryType) + ${GeneratePackageOnBuild} + $(Description) This package contains an implementation of Silverback.Storage that stores the data in Sqlite. + $(IconUrl) + $(Tags) + $(LangVersion) + true + Silverback.Storage.Sqlite + Silverback.Storage.Sqlite + + + + + + + + + + + + + + + diff --git a/tests/Silverback.Integration.Tests/Messaging/Outbound/TransactionalOutbox/OutboxWorkerTests.cs b/tests/Silverback.Integration.Tests/Messaging/Outbound/TransactionalOutbox/OutboxWorkerTests.cs index d5e388704..a4058a125 100644 --- a/tests/Silverback.Integration.Tests/Messaging/Outbound/TransactionalOutbox/OutboxWorkerTests.cs +++ b/tests/Silverback.Integration.Tests/Messaging/Outbound/TransactionalOutbox/OutboxWorkerTests.cs @@ -167,5 +167,56 @@ await _outboxWriter.WriteAsync( _broker.ProducedMessages.Should().HaveCount(3); } + + [Fact] + public async Task ProcessQueue_ProduceError_Retried() + { + await _outboxWriter.WriteAsync( + new TestEventOne { Content = "Test" }, + null, + null, + "topic1", + "topic1"); + await _outboxWriter.WriteAsync( + new TestEventTwo { Content = "Test" }, + null, + null, + "topic2", + "topic2"); + await _outboxWriter.WriteAsync( + new TestEventOne { Content = "Test" }, + null, + null, + "topic1", + "topic1"); + await _outboxWriter.WriteAsync( + new TestEventOne { Content = "Test" }, + null, + null, + "topic1", + "topic1"); + await _outboxWriter.CommitAsync(); + + _broker.FailProduceNumber = new[] { 2, 3 }; // Note: counter is per producer / topic + + await _worker.ProcessQueueAsync(CancellationToken.None); + + _broker.ProducedMessages.Should().HaveCount(2); + _broker.ProducedMessages[0].Endpoint.Name.Should().Be("topic1"); + _broker.ProducedMessages[1].Endpoint.Name.Should().Be("topic2"); + + await _worker.ProcessQueueAsync(CancellationToken.None); + + _broker.ProducedMessages.Should().HaveCount(2); + _broker.ProducedMessages[0].Endpoint.Name.Should().Be("topic1"); + _broker.ProducedMessages[1].Endpoint.Name.Should().Be("topic2"); + + await _worker.ProcessQueueAsync(CancellationToken.None); + + _broker.ProducedMessages.Should().HaveCount(3); + _broker.ProducedMessages[0].Endpoint.Name.Should().Be("topic1"); + _broker.ProducedMessages[1].Endpoint.Name.Should().Be("topic2"); + _broker.ProducedMessages[2].Endpoint.Name.Should().Be("topic1"); + } } } diff --git a/tests/Silverback.Integration.Tests/TestTypes/TestBroker.cs b/tests/Silverback.Integration.Tests/TestTypes/TestBroker.cs index d2e4418d8..22b991131 100644 --- a/tests/Silverback.Integration.Tests/TestTypes/TestBroker.cs +++ b/tests/Silverback.Integration.Tests/TestTypes/TestBroker.cs @@ -22,6 +22,8 @@ public TestBroker(IServiceProvider serviceProvider) public bool SimulateConnectIssues { get; set; } + public IReadOnlyCollection? FailProduceNumber { get; set; } + protected override Task ConnectAsync( IReadOnlyCollection producers, IReadOnlyCollection consumers) diff --git a/tests/Silverback.Integration.Tests/TestTypes/TestProducer.cs b/tests/Silverback.Integration.Tests/TestTypes/TestProducer.cs index 80842487d..ebd308acd 100644 --- a/tests/Silverback.Integration.Tests/TestTypes/TestProducer.cs +++ b/tests/Silverback.Integration.Tests/TestTypes/TestProducer.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; using System.IO; +using System.Linq; using System.Threading.Tasks; using NSubstitute; using Silverback.Diagnostics; @@ -17,6 +18,8 @@ namespace Silverback.Tests.Integration.TestTypes { public class TestProducer : Producer { + private int _produceCount; + public TestProducer( TestBroker broker, TestProducerEndpoint endpoint, @@ -51,7 +54,7 @@ public TestProducer( IReadOnlyCollection? headers, string actualEndpointName) { - ProducedMessages.Add(new ProducedMessage(messageBytes, headers, Endpoint)); + PerformMockProduce(messageBytes, headers); return null; } @@ -80,7 +83,7 @@ protected override void ProduceCore( Action onSuccess, Action onError) { - ProducedMessages.Add(new ProducedMessage(messageBytes, headers, Endpoint)); + PerformMockProduce(messageBytes, headers); onSuccess.Invoke(null); } @@ -101,7 +104,7 @@ await messageStream.ReadAllAsync().ConfigureAwait(false), IReadOnlyCollection? headers, string actualEndpointName) { - ProducedMessages.Add(new ProducedMessage(messageBytes, headers, Endpoint)); + PerformMockProduce(messageBytes, headers); return Task.FromResult(null); } @@ -128,9 +131,17 @@ protected override Task ProduceCoreAsync( Action onSuccess, Action onError) { - ProducedMessages.Add(new ProducedMessage(messageBytes, headers, Endpoint)); + PerformMockProduce(messageBytes, headers); onSuccess.Invoke(null); return Task.CompletedTask; } + + private void PerformMockProduce(byte[]? messageBytes, IReadOnlyCollection? headers) + { + if (Broker.FailProduceNumber != null && Broker.FailProduceNumber.Contains(++_produceCount)) + throw new InvalidOperationException("Produce failed (mock)."); + + ProducedMessages.Add(new ProducedMessage(messageBytes, headers, Endpoint)); + } } }