Skip to content

Commit

Permalink
Add generic message pump and let StorageQueueMessagePump use it (#112)
Browse files Browse the repository at this point in the history
* Add generic message pump and let StorageQueueMessagePump use it
* Adjust for locktime elapsed
* wait for threads to become available instead of polling
* fill all available threads and add prefetch as a buffer
* Add MaxFetch to respect limits for transports
* Let redis use the GenericMessagePump
* Obsolete the SemaphoreQueue
  • Loading branch information
NiklasArbin authored Apr 17, 2024
1 parent 688061a commit 2a21ef2
Show file tree
Hide file tree
Showing 11 changed files with 319 additions and 281 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
<PackageIconUrl>https://raw.githubusercontent.com/BookBeat/knightbus/master/documentation/media/images/knighbus-64.png</PackageIconUrl>
<PackageIcon>knighbus-64.png</PackageIcon>
<RepositoryUrl>https://github.com/BookBeat/knightbus</RepositoryUrl>
<Version>16.0.1</Version>
<Version>16.1.0</Version>
<PackageTags>knightbus;azure storage;blob;queues;messaging</PackageTags>
</PropertyGroup>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
using System;
using System.Linq;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Azure;
using KnightBus.Azure.Storage.Messages;
Expand All @@ -10,118 +9,40 @@

namespace KnightBus.Azure.Storage;

internal class StorageQueueMessagePump
internal class StorageQueueMessagePump : GenericMessagePump<StorageQueueMessage, IStorageQueueCommand>
{
private readonly IStorageQueueClient _storageQueueClient;
private readonly IProcessingSettings _settings;
private readonly ILogger _log;
private readonly TimeSpan _pollingInterval = TimeSpan.FromMilliseconds(5000);
internal readonly SemaphoreSlim _maxConcurrent;
private Task _runningTask;

public StorageQueueMessagePump(IStorageQueueClient storageQueueClient, IProcessingSettings settings, ILogger log)
public StorageQueueMessagePump(IStorageQueueClient storageQueueClient, IProcessingSettings settings, ILogger log) : base(settings, log)
{
_storageQueueClient = storageQueueClient;
_settings = settings;
_log = log;
_maxConcurrent = new SemaphoreSlim(_settings.MaxConcurrentCalls);
}

public Task StartAsync<T>(Func<StorageQueueMessage, CancellationToken, Task> action, CancellationToken cancellationToken) where T : IStorageQueueCommand

protected override async IAsyncEnumerable<StorageQueueMessage> GetMessagesAsync<TMessage>(int count, TimeSpan? lockDuration)
{
_runningTask = Task.Run(async () =>
var messages = await _storageQueueClient.GetMessagesAsync<TMessage>(count, lockDuration);
foreach (var message in messages)
{
while (!cancellationToken.IsCancellationRequested)
{
await PumpAsync<T>(action, cancellationToken).ConfigureAwait(false);
}
});
return Task.CompletedTask;
yield return message;
}
}

internal async Task PumpAsync<T>(Func<StorageQueueMessage, CancellationToken, Task> action, CancellationToken cancellationToken) where T : IStorageQueueCommand
protected override async Task CreateChannel(Type messageType)
{
var messagesFound = false;
try
{
//Do not fetch and lock messages if we won't be able to process them
if (_maxConcurrent.CurrentCount == 0) return;

var queueName = AutoMessageMapper.GetQueueName<T>();

var prefetchCount = _settings.PrefetchCount > 0 ? _settings.PrefetchCount : 1;

TimeSpan visibilityTimeout;
if (_settings is IExtendMessageLockTimeout extendMessageLockTimeout)
{
visibilityTimeout = extendMessageLockTimeout.ExtensionDuration;
}
else
{
visibilityTimeout = _settings.MessageLockTimeout;
}

//Make sure the lock still exist when the process is cancelled by token, otherwise the message cannot be abandoned
visibilityTimeout += TimeSpan.FromMinutes(2);

var messages = await _storageQueueClient.GetMessagesAsync<T>(prefetchCount, visibilityTimeout)
.ConfigureAwait(false);
messagesFound = messages.Any();

_log.LogDebug("Prefetched {MessageCount} messages from {QueueName} in {Name}", messages.Count, queueName,
nameof(StorageQueueMessagePump));

foreach (var message in messages)
{
var timeoutToken = new CancellationTokenSource(_settings.MessageLockTimeout);
var linkedToken =
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutToken.Token);
try
{
_log.LogDebug("Processing {@Message} in {Name}", message, nameof(StorageQueueMessagePump));
_log.LogDebug("{ThreadCount} remaining threads that can process messages in {QueueName} in {Name}",
_maxConcurrent.CurrentCount, queueName, nameof(StorageQueueMessagePump));

await _maxConcurrent.WaitAsync(timeoutToken.Token).ConfigureAwait(false);
}
catch (OperationCanceledException operationCanceledException)
{
_log.LogDebug(operationCanceledException,
"Operation canceled for {@Message} in {QueueName} in {Name}", message, queueName,
nameof(StorageQueueMessagePump));
await _storageQueueClient.CreateIfNotExistsAsync().ConfigureAwait(false);
}

//If we are still waiting when the message has not been scheduled for execution timeouts
protected override bool ShouldCreateChannel(Exception e)
{
return e is RequestFailedException { Status: (int)HttpStatusCode.NotFound };
}

continue;
}
#pragma warning disable 4014 //No need to await the result, let's keep the pump going
Task.Run(async () => await action.Invoke(message, linkedToken.Token).ConfigureAwait(false),
timeoutToken.Token)
.ContinueWith(task =>
{
_maxConcurrent.Release();
timeoutToken.Dispose();
linkedToken.Dispose();
}).ConfigureAwait(false);
#pragma warning restore 4014
}
}
catch (RequestFailedException e) when (e.Status is (int)HttpStatusCode.NotFound)
{
_log.LogInformation($"{typeof(T).Name} not found. Creating.");
await _storageQueueClient.CreateIfNotExistsAsync().ConfigureAwait(false);
}
catch (Exception e)
{
_log.LogError(e, "StorageQueueMessagePump error in {MessageType}", typeof(T));
}
finally
{
if (!messagesFound)
{
//Only delay pump if no messages were found
await Task.Delay(_pollingInterval).ConfigureAwait(false);
}
}
protected override Task CleanupResources()
{
return Task.CompletedTask;
}

protected override TimeSpan PollingDelay => TimeSpan.FromSeconds(5);
protected override int MaxFetch => 32;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public async Task Should_prefetch_messages()
messages.Add(new StorageQueueMessage(new LongRunningTestCommand { Message = i.ToString() }));
}

_clientMock.Setup(x => x.GetMessagesAsync<LongRunningTestCommand>(10, It.IsAny<TimeSpan?>()))
_clientMock.Setup(x => x.GetMessagesAsync<LongRunningTestCommand>(11, It.IsAny<TimeSpan?>()))
.ReturnsAsync(messages);
var pump = new StorageQueueMessagePump(_clientMock.Object, settings, Mock.Of<ILogger>());
var invocations = 0;
Expand All @@ -59,7 +59,7 @@ public async Task Should_not_fetch_more_messages_when_max_concurrent_is_reached(
PrefetchCount = 0,
MaxConcurrentCalls = 1
};
var messages = new List<StorageQueueMessage> { new StorageQueueMessage(new LongRunningTestCommand { Message = 1.ToString() }) };
var messages = new List<StorageQueueMessage> { new(new LongRunningTestCommand { Message = 1.ToString() }) };


_clientMock.Setup(x => x.GetMessagesAsync<LongRunningTestCommand>(1, It.IsAny<TimeSpan?>()))
Expand All @@ -75,10 +75,11 @@ async Task Function(StorageQueueMessage a, CancellationToken b)

//act
await pump.PumpAsync<LongRunningTestCommand>(Function, CancellationToken.None);
await pump.PumpAsync<LongRunningTestCommand>(Function, CancellationToken.None);
var secondTask = pump.PumpAsync<LongRunningTestCommand>(Function, CancellationToken.None);
await Task.Delay(100);
//assert
invocations.Should().Be(1, "Max concurrent = 1, and Prefetch = 0");
await secondTask;
}

[Test]
Expand Down Expand Up @@ -109,7 +110,7 @@ public async Task Should_release_semaphore_if_exception()
await pump.PumpAsync<LongRunningTestCommand>(function, CancellationToken.None);
await Task.Delay(100);
//assert
pump._maxConcurrent.CurrentCount.Should().Be(1);
pump.AvailableThreads.Should().Be(1);

}
[Test]
Expand Down Expand Up @@ -154,7 +155,7 @@ public async Task Should_not_exceed_max_concurrent_when_prefetch_is_high()
messages.Add(new StorageQueueMessage(new LongRunningTestCommand { Message = i.ToString() }));
}

_clientMock.Setup(x => x.GetMessagesAsync<LongRunningTestCommand>(20, It.IsAny<TimeSpan?>()))
_clientMock.Setup(x => x.GetMessagesAsync<LongRunningTestCommand>(30, It.IsAny<TimeSpan?>()))
.ReturnsAsync(messages);
var pump = new StorageQueueMessagePump(_clientMock.Object, settings, Mock.Of<ILogger>());
var invocations = 0;
Expand All @@ -179,7 +180,7 @@ public async Task Should_not_release_semaphore_until_task_is_completed()
var settings = new TestMessageSettings
{
DeadLetterDeliveryLimit = 1,
PrefetchCount = 1,
PrefetchCount = 0,
MaxConcurrentCalls = 1
};
var messageCount = 1;
Expand All @@ -202,7 +203,7 @@ Task Function(StorageQueueMessage a, CancellationToken b)
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
await Task.Delay(100);
//assert
pump._maxConcurrent.CurrentCount.Should().Be(0);
pump.AvailableThreads.Should().Be(0);
}

[Test]
Expand Down Expand Up @@ -238,7 +239,7 @@ async Task Function(StorageQueueMessage a, CancellationToken b)
#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed
await Task.Delay(150);
//assert
pump._maxConcurrent.CurrentCount.Should().Be(1);
pump.AvailableThreads.Should().Be(1);
countable.Verify(x => x.Count(), Times.Never);
}

Expand Down
2 changes: 1 addition & 1 deletion knightbus-redis/src/KnightBus.Redis/KnightBus.Redis.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<PackageIconUrl>https://raw.githubusercontent.com/BookBeat/knightbus/master/documentation/media/images/knighbus-64.png</PackageIconUrl>
<PackageIcon>knighbus-64.png</PackageIcon>
<RepositoryUrl>https://github.com/BookBeat/knightbus</RepositoryUrl>
<Version>13.0.0</Version>
<Version>13.1.0</Version>
<PackageTags>knightbus;redis;queues;messaging</PackageTags>
<LangVersion>latest</LangVersion>
</PropertyGroup>
Expand Down
103 changes: 12 additions & 91 deletions knightbus-redis/src/KnightBus.Redis/RedisChannelReceiver.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
using System;
using System.Threading;
using System.Threading;
using System.Threading.Tasks;
using KnightBus.Core;
using KnightBus.Messages;
using KnightBus.Redis.Messages;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;

namespace KnightBus.Redis;
Expand All @@ -14,110 +11,34 @@ internal abstract class RedisChannelReceiver<T> : IChannelReceiver
{
private readonly RedisConfiguration _redisConfiguration;
private readonly IHostConfiguration _hostConfiguration;
private readonly SemaphoreQueue _maxConcurrent;
private readonly IMessageProcessor _processor;
private readonly string _queueName;
protected readonly IConnectionMultiplexer ConnectionMultiplexer;
private readonly IProcessingSettings _settings;
private readonly string _queueName;
private readonly IMessageSerializer _serializer;
private CancellationTokenSource _pumpDelayCancellationTokenSource = new CancellationTokenSource();
private Task _messagePumpTask;
private Task _lostMessageTask;
private LostMessageBackgroundService<T> _lostMessageService;
private RedisQueueClient<T> _queueClient;


protected RedisChannelReceiver(IConnectionMultiplexer connectionMultiplexer, string queueName, IProcessingSettings settings, IMessageSerializer serializer, RedisConfiguration redisConfiguration, IHostConfiguration hostConfiguration, IMessageProcessor processor)
{
ConnectionMultiplexer = connectionMultiplexer;
_settings = settings;
_queueName = queueName;
_serializer = serializer;
Settings = settings;
_redisConfiguration = redisConfiguration;
_hostConfiguration = hostConfiguration;
_processor = processor;
_queueName = queueName;
_maxConcurrent = new SemaphoreQueue(settings.MaxConcurrentCalls);
}

public virtual async Task StartAsync(CancellationToken cancellationToken)
{
_queueClient = new RedisQueueClient<T>(ConnectionMultiplexer.GetDatabase(_redisConfiguration.DatabaseId), AutoMessageMapper.GetQueueName<T>(), _serializer, _hostConfiguration.Log);
var sub = ConnectionMultiplexer.GetSubscriber();
await sub.SubscribeAsync(new RedisChannel(_queueName, RedisChannel.PatternMode.Literal), MessageSignalReceivedHandler);

_messagePumpTask = Task.Factory.StartNew(async () =>
{
while (!cancellationToken.IsCancellationRequested)
if (!await PumpAsync(cancellationToken).ConfigureAwait(false))
await Delay(_pumpDelayCancellationTokenSource.Token).ConfigureAwait(false);
});
_lostMessageService = new LostMessageBackgroundService<T>(ConnectionMultiplexer, _redisConfiguration.DatabaseId, _serializer, _hostConfiguration.Log, _settings.MessageLockTimeout, _queueName);
_lostMessageTask = _lostMessageService.Start(cancellationToken);
}

public IProcessingSettings Settings { get; set; }

private async Task Delay(CancellationToken cancellationToken)
{
try
{
await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
//reset the delay
_pumpDelayCancellationTokenSource = new CancellationTokenSource();
}
}

private void MessageSignalReceivedHandler(RedisChannel channel, RedisValue redisValue)
private async Task ProcessMessageAsync(RedisMessage<T> redisMessage, CancellationToken cancellationToken)
{
//Cancel the pumps delay
_pumpDelayCancellationTokenSource.Cancel();
var stateHandler = new RedisMessageStateHandler<T>(ConnectionMultiplexer, _redisConfiguration, redisMessage, Settings.DeadLetterDeliveryLimit, _hostConfiguration.DependencyInjection, _hostConfiguration.Log);
await _processor.ProcessAsync(stateHandler, cancellationToken).ConfigureAwait(false);
}

private async Task<bool> PumpAsync(CancellationToken cancellationToken)
public virtual async Task StartAsync(CancellationToken cancellationToken)
{
try
{
var prefetchCount = _settings.PrefetchCount > 0 ? _settings.PrefetchCount : 1;
var messages = await _queueClient.GetMessagesAsync(prefetchCount).ConfigureAwait(false);
if (messages.Length == 0) return false;

foreach (var redisMessage in messages)
{
if (redisMessage != null)
{
await _maxConcurrent.WaitAsync(cancellationToken).ConfigureAwait(false);
var timeoutToken = new CancellationTokenSource(_settings.MessageLockTimeout);
var linkedToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutToken.Token);
#pragma warning disable 4014
Task.Run(async () => await ProcessMessageAsync(redisMessage, linkedToken.Token).ConfigureAwait(false), timeoutToken.Token)
.ContinueWith(task2 =>
{
_maxConcurrent.Release();
timeoutToken.Dispose();
linkedToken.Dispose();
}).ConfigureAwait(false);
#pragma warning restore 4014
}
else
{
return false;
}
}

return true;
}
catch (Exception e)
{
_hostConfiguration.Log.LogError(e, "Redis message pump error");
return false;
}
var messagePump = new RedisMessagePump<T>(ConnectionMultiplexer, _queueName, _serializer, _redisConfiguration, Settings, _hostConfiguration.Log);
await messagePump.StartAsync<T>(ProcessMessageAsync, cancellationToken);
}

private async Task ProcessMessageAsync(RedisMessage<T> redisMessage, CancellationToken cancellationToken)
{
var stateHandler = new RedisMessageStateHandler<T>(ConnectionMultiplexer, _redisConfiguration, redisMessage, _settings.DeadLetterDeliveryLimit, _hostConfiguration.DependencyInjection, _hostConfiguration.Log);
await _processor.ProcessAsync(stateHandler, cancellationToken).ConfigureAwait(false);
}
public IProcessingSettings Settings { get; set; }
}
Loading

0 comments on commit 2a21ef2

Please sign in to comment.