Skip to content

Commit

Permalink
Add shared and redis tests for MessageStateHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasarbin committed Apr 12, 2024
1 parent f1ec844 commit b4f2ce1
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using KnightBus.Core;
using KnightBus.Core.PreProcessors;
using KnightBus.Redis.Management;
using KnightBus.Shared.Tests.Integration;
using Microsoft.Extensions.Logging;
using Moq;
using NUnit.Framework;

namespace KnightBus.Redis.Tests.Integration;

[TestFixture]
public class RedisMessageStateHandlerTests : MessageStateHandlerTests<TestCommand>
{
private RedisBus _bus;

public override async Task Setup()
{
_bus = new RedisBus(RedisTestBase.Configuration.ConnectionString, Array.Empty<IMessagePreProcessor>());
var logger = new Mock<ILogger<RedisManagementClient>>();
var managementClient = new RedisManagementClient(RedisTestBase.Configuration, logger.Object);
var qm = new RedisQueueManager(managementClient, RedisTestBase.Configuration);
_bus = new RedisBus(RedisTestBase.Configuration.ConnectionString, Array.Empty<IMessagePreProcessor>());
await qm.Delete(AutoMessageMapper.GetQueueName<TestCommand>(), CancellationToken.None);
}

protected override async Task<List<TestCommand>> GetMessages(int count)
{
var queueName = AutoMessageMapper.GetQueueName<TestCommand>();
var q = new RedisQueueClient<TestCommand>(RedisTestBase.Database, queueName, RedisTestBase.Configuration.MessageSerializer, Mock.Of<ILogger>());
var m = await q.PeekMessagesAsync(count).ToListAsync();
return m.Select(x => x.Message).ToList();
}

protected override async Task<List<TestCommand>> GetDeadLetterMessages(int count)
{
var queueName = AutoMessageMapper.GetQueueName<TestCommand>();
var q = new RedisQueueClient<TestCommand>(RedisTestBase.Database, queueName, RedisTestBase.Configuration.MessageSerializer, Mock.Of<ILogger>());
var m = await q.PeekDeadlettersAsync(count).ToListAsync();
return m.Select(x => x.Message.Body).ToList();
}

protected override async Task<string> SendMessage(string message)
{
await _bus.SendAsync(new TestCommand(message));
return AutoMessageMapper.GetQueueName<TestCommand>();
}

protected override async Task<IMessageStateHandler<TestCommand>> GetMessageStateHandler()
{
var queueName = AutoMessageMapper.GetQueueName<TestCommand>();
var q = new RedisQueueClient<TestCommand>(RedisTestBase.Database, queueName, RedisTestBase.Configuration.MessageSerializer, Mock.Of<ILogger>());
var m = await q.GetMessagesAsync(1);
return new RedisMessageStateHandler<TestCommand>(RedisTestBase.Multiplexer, RedisTestBase.Configuration, m.First(), 5, null, Mock.Of<ILogger>());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,17 @@
using System.Threading;
using System.Threading.Tasks;
using KnightBus.Core;
using KnightBus.Core.DependencyInjection;
using KnightBus.Core.Management;
using KnightBus.Core.PreProcessors;
using KnightBus.Redis.Management;
using KnightBus.Redis.Messages;
using KnightBus.Shared.Tests.Integration;
using Microsoft.Extensions.Logging;
using Moq;
using NUnit.Framework;
using static KnightBus.Redis.Tests.Integration.RedisTestBase;

namespace KnightBus.Redis.Tests.Integration;



[TestFixture]
public class RedisQueueManagerTests : QueueManagerTests<TestCommand>
{
Expand All @@ -25,12 +22,12 @@ public class RedisQueueManagerTests : QueueManagerTests<TestCommand>
public override async Task Setup()
{
var logger = new Mock<ILogger<RedisManagementClient>>();
var managementClient = new RedisManagementClient(RedisTestBase.Configuration, logger.Object);
QueueManager = new RedisQueueManager(managementClient, RedisTestBase.Configuration);
var managementClient = new RedisManagementClient(Configuration, logger.Object);
QueueManager = new RedisQueueManager(managementClient, Configuration);
QueueType = QueueType.Queue;
_bus = new RedisBus(RedisTestBase.Configuration.ConnectionString, Array.Empty<IMessagePreProcessor>());
_bus = new RedisBus(Configuration.ConnectionString, Array.Empty<IMessagePreProcessor>());
var queues = await QueueManager.List(CancellationToken.None);
await QueueManager.Delete("test", CancellationToken.None);
await QueueManager.Delete(AutoMessageMapper.GetQueueName<TestCommand>(), CancellationToken.None);
foreach (var queue in queues)
{
await QueueManager.Delete(queue.Name, CancellationToken.None);
Expand All @@ -40,7 +37,7 @@ public override async Task Setup()
public override async Task<string> CreateQueue()
{
var queueName = Guid.NewGuid().ToString("N");
await RedisTestBase.Database.SetAddAsync(RedisQueueConventions.QueueListKey, queueName).ConfigureAwait(false);
await Database.SetAddAsync(RedisQueueConventions.QueueListKey, queueName).ConfigureAwait(false);
return queueName;
}

Expand All @@ -52,8 +49,8 @@ public override async Task<string> SendMessage(string message)

public override async Task<IMessageStateHandler<TestCommand>> GetMessageStateHandler(string queueName)
{
var q = new RedisQueueClient<TestCommand>(RedisTestBase.Database, queueName, RedisTestBase.Configuration.MessageSerializer, Mock.Of<ILogger>());
var q = new RedisQueueClient<TestCommand>(Database, queueName, Configuration.MessageSerializer, Mock.Of<ILogger>());
var m = await q.GetMessagesAsync(1);
return new RedisMessageStateHandler<TestCommand>(RedisTestBase.Multiplexer, RedisTestBase.Configuration, m.First(), 5, null, Mock.Of<ILogger>());
return new RedisMessageStateHandler<TestCommand>(Multiplexer, Configuration, m.First(), 5, null, Mock.Of<ILogger>());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using FluentAssertions;
using KnightBus.Core;
using KnightBus.Messages;
using NUnit.Framework;

namespace KnightBus.Shared.Tests.Integration;

[TestFixture]
public abstract class MessageStateHandlerTests<TCommand> where TCommand : class, IMessage
{
[SetUp]
public abstract Task Setup();
protected abstract Task<List<TCommand>> GetMessages(int count);
protected abstract Task<List<TCommand>> GetDeadLetterMessages(int count);
protected abstract Task SendMessage(string message);
protected abstract Task<IMessageStateHandler<TCommand>> GetMessageStateHandler();

[Test]
public async Task Should_complete_the_message_and_remove_it_from_the_queue()
{
//arrange
await SendMessage("Testing Complete");
var stateHandler = await GetMessageStateHandler();

//act
await stateHandler.CompleteAsync();

//assert
var messages = await GetMessages(1);
messages.Should().BeEmpty();
}

[Test]
public async Task Should_abandon_the_message_and_return_it_to_the_queue()
{
//arrange
await SendMessage("Testing Abandon");
var stateHandler = await GetMessageStateHandler();

//act
await stateHandler.AbandonByErrorAsync(new Exception());

//assert
var messages = await GetMessages(10);
messages.Should().HaveCount(1);
}
[Test]
public async Task Should_dead_letter_the_message_and_move_it_to_the_dl_queue()
{
//arrange
await SendMessage("Testing Dead Letters");
var stateHandler = await GetMessageStateHandler();

//act
await stateHandler.DeadLetterAsync(0);

//assert
var messages = await GetMessages(1);
messages.Should().HaveCount(0);
var deadLetters = await GetDeadLetterMessages(10);
deadLetters.Count.Should().Be(1);
}
}

0 comments on commit b4f2ce1

Please sign in to comment.