diff --git a/knightbus-azureservicebus/src/KnightBus.Azure.ServiceBus.Management/IServiceBusTopicCreator.cs b/knightbus-azureservicebus/src/KnightBus.Azure.ServiceBus.Management/IServiceBusTopicCreator.cs new file mode 100644 index 00000000..a47a1685 --- /dev/null +++ b/knightbus-azureservicebus/src/KnightBus.Azure.ServiceBus.Management/IServiceBusTopicCreator.cs @@ -0,0 +1,9 @@ +using System.Threading; +using System.Threading.Tasks; + +namespace KnightBus.Azure.ServiceBus.Management; + +public interface IServiceBusTopicCreator +{ + Task CreateIfNotExists(string path, CancellationToken ct); +} diff --git a/knightbus-azureservicebus/src/KnightBus.Azure.ServiceBus.Management/ServiceBusExtensions.cs b/knightbus-azureservicebus/src/KnightBus.Azure.ServiceBus.Management/ServiceBusExtensions.cs index 4e3edeb3..cb1ab770 100644 --- a/knightbus-azureservicebus/src/KnightBus.Azure.ServiceBus.Management/ServiceBusExtensions.cs +++ b/knightbus-azureservicebus/src/KnightBus.Azure.ServiceBus.Management/ServiceBusExtensions.cs @@ -7,7 +7,10 @@ namespace KnightBus.Azure.ServiceBus.Management; public static class ServiceBusExtensions { - internal static QueueProperties ToQueueProperties(this QueueRuntimeProperties properties, IQueueManager manager) + internal static QueueProperties ToQueueProperties( + this QueueRuntimeProperties properties, + IQueueManager manager + ) { return new QueueProperties(properties.Name, manager, true) { @@ -23,7 +26,12 @@ internal static QueueProperties ToQueueProperties(this QueueRuntimeProperties pr UpdatedAt = properties.UpdatedAt }; } - internal static SubscriptionQueueProperties ToQueueProperties(this SubscriptionRuntimeProperties properties, IQueueManager manager, string topic) + + internal static SubscriptionQueueProperties ToQueueProperties( + this SubscriptionRuntimeProperties properties, + IQueueManager manager, + string topic + ) { return new SubscriptionQueueProperties(properties.SubscriptionName, manager, topic, false) { @@ -38,7 +46,10 @@ internal static SubscriptionQueueProperties ToQueueProperties(this SubscriptionR }; } - internal static QueueProperties ToQueueProperties(this TopicRuntimeProperties properties, IQueueManager manager) + internal static QueueProperties ToQueueProperties( + this TopicRuntimeProperties properties, + IQueueManager manager + ) { return new QueueProperties(properties.Name, manager, false, QueueType.Topic) { @@ -50,7 +61,10 @@ internal static QueueProperties ToQueueProperties(this TopicRuntimeProperties pr }; } - public static IServiceCollection AddServiceBusManagement(this IServiceCollection services, string connectionString) + public static IServiceCollection AddServiceBusManagement( + this IServiceCollection services, + string connectionString + ) { return services .AddScoped() @@ -60,4 +74,9 @@ public static IServiceCollection AddServiceBusManagement(this IServiceCollection .AddScoped() .UseServiceBus(c => c.ConnectionString = connectionString); } + + public static IServiceCollection AddTopicCreator(this IServiceCollection services) + { + return services.AddScoped(); + } } diff --git a/knightbus-azureservicebus/src/KnightBus.Azure.ServiceBus.Management/ServiceBusTopicManager.cs b/knightbus-azureservicebus/src/KnightBus.Azure.ServiceBus.Management/ServiceBusTopicManager.cs index 28593f96..5884e5ef 100644 --- a/knightbus-azureservicebus/src/KnightBus.Azure.ServiceBus.Management/ServiceBusTopicManager.cs +++ b/knightbus-azureservicebus/src/KnightBus.Azure.ServiceBus.Management/ServiceBusTopicManager.cs @@ -9,7 +9,7 @@ namespace KnightBus.Azure.ServiceBus.Management; -public class ServiceBusTopicManager : IQueueManager +public class ServiceBusTopicManager : IQueueManager, IServiceBusTopicCreator { private readonly ServiceBusAdministrationClient _adminClient; private readonly ServiceBusClient _client; @@ -19,20 +19,29 @@ public ServiceBusTopicManager(IServiceBusConfiguration configuration) _adminClient = new ServiceBusAdministrationClient(configuration.ConnectionString); _client = new ServiceBusClient(configuration.ConnectionString); } + public async Task> List(CancellationToken ct) { var queues = _adminClient.GetTopicsRuntimePropertiesAsync((ct)); var properties = new List(); await foreach (var q in queues) - properties.Add(q.ToQueueProperties(new ServiceBusSubscriptionManager(q.Name, _client, _adminClient))); + properties.Add( + q.ToQueueProperties( + new ServiceBusSubscriptionManager(q.Name, _client, _adminClient) + ) + ); return properties; } public async Task Get(string path, CancellationToken ct) { - var topic = await _adminClient.GetTopicRuntimePropertiesAsync(path, ct).ConfigureAwait(false); - return topic.Value.ToQueueProperties(new ServiceBusSubscriptionManager(path, _client, _adminClient)); + var topic = await _adminClient + .GetTopicRuntimePropertiesAsync(path, ct) + .ConfigureAwait(false); + return topic.Value.ToQueueProperties( + new ServiceBusSubscriptionManager(path, _client, _adminClient) + ); } public Task Delete(string path, CancellationToken ct) @@ -45,12 +54,20 @@ public Task> Peek(string name, int count, Cancellati throw new NotImplementedException(); } - public Task> PeekDeadLetter(string path, int count, CancellationToken ct) + public Task> PeekDeadLetter( + string path, + int count, + CancellationToken ct + ) { throw new NotImplementedException(); } - public Task> ReadDeadLetter(string path, int count, CancellationToken ct) + public Task> ReadDeadLetter( + string path, + int count, + CancellationToken ct + ) { throw new NotImplementedException(); } @@ -59,5 +76,31 @@ public Task MoveDeadLetters(string path, int count, CancellationToken ct) { throw new NotImplementedException(); } + + /// + ///Will try and get the topic and create one if one isn't found + /// + /// + /// + /// True if the topic was actually created, otherwise false, this does not neccesarily indicate that the Topic exists + public async Task CreateIfNotExists(string path, CancellationToken ct) + { + try + { + var topic = await _adminClient.GetTopicRuntimePropertiesAsync(path, ct); + return false; + } + catch (ServiceBusException ex) + { + if (ex.Reason == ServiceBusFailureReason.MessageNotFound) + { + var createdTopic = await _adminClient.CreateTopicAsync(path, ct); + if (createdTopic.HasValue) + return true; + } + } + return false; + } + public QueueType QueueType => QueueType.Topic; }