Skip to content

Commit

Permalink
fix(postgresbus): use batch commands
Browse files Browse the repository at this point in the history
  • Loading branch information
elmaxe committed Sep 13, 2024
1 parent 887eca7 commit 099ef9a
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<PackageIconUrl>https://raw.githubusercontent.com/BookBeat/knightbus/master/documentation/media/images/knighbus-64.png</PackageIconUrl>
<PackageIcon>knighbus-64.png</PackageIcon>
<RepositoryUrl>https://github.com/BookBeat/knightbus</RepositoryUrl>
<Version>1.1.4</Version>
<Version>1.1.5</Version>
<PackageTags>knightbus;postgresql;queues;messaging</PackageTags>
<LangVersion>latest</LangVersion>
<ImplicitUsings>enable</ImplicitUsings>
Expand Down
52 changes: 24 additions & 28 deletions knightbus-postgresql/src/KnightBus.PostgreSql/PostgresBus.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Data;
using System.Text;
using KnightBus.Core;
using KnightBus.Messages;
using KnightBus.PostgreSql.Messages;
Expand All @@ -25,7 +24,7 @@ public class PostgresBus : IPostgresBus
private readonly NpgsqlDataSource _npgsqlDataSource;
private readonly IMessageSerializer _serializer;

public PostgresBus([FromKeyedServices(PostgresConstants.NpgsqlDataSourceContainerKey)]NpgsqlDataSource npgsqlDataSource, IPostgresConfiguration postgresConfiguration)
public PostgresBus([FromKeyedServices(NpgsqlDataSourceContainerKey)] NpgsqlDataSource npgsqlDataSource, IPostgresConfiguration postgresConfiguration)
{
_npgsqlDataSource = npgsqlDataSource;
_serializer = postgresConfiguration.MessageSerializer;
Expand Down Expand Up @@ -66,37 +65,34 @@ private async Task SendAsyncInternal<T>(IEnumerable<T> messages, TimeSpan? delay
var queueName = AutoMessageMapper.GetQueueName<T>();
var messagesList = messages.ToList();

await using var connection = await _npgsqlDataSource.OpenConnectionAsync(ct).ConfigureAwait(false);
await using var command = new NpgsqlCommand(null, connection);

/*
* Build a cmd text with multirow VALUES syntax and parameter placeholders
* INSERT INTO queue (visibility_timeout, message) VALUES
* ('now() + optional delay', ($1)),
* ('now() + optional delay', ($2)),
* ('now() + optional delay', ($3));
*/
var values = new StringBuilder();
for (int i = 0; i < messagesList.Count; i++)
await using var connection = await _npgsqlDataSource
.OpenConnectionAsync(ct)
.ConfigureAwait(false);
await using var batch = new NpgsqlBatch(connection);

foreach (var message in messagesList)
{
var oneBasedIndex = i + 1;
var trailingComma = oneBasedIndex == messagesList.Count ? string.Empty : ",";
values.AppendFormat("((now() + interval '{0} seconds'), (${1})){2}",
delay?.TotalSeconds ?? 0, oneBasedIndex, trailingComma);
var mBody = _serializer.Serialize(message);

var mBody = _serializer.Serialize(messagesList[i]);
command.Parameters.Add(new NpgsqlParameter { Value = mBody, NpgsqlDbType = NpgsqlDbType.Jsonb });
}
var visibilityTimeout = $"now() + INTERVAL '{delay?.TotalSeconds ?? 0}'";
var cmd = new NpgsqlBatchCommand(
$"""
INSERT INTO {SchemaName}.{QueuePrefix}_{queueName} (visibility_timeout, message)
VALUES ({visibilityTimeout}, $1::JSONB);
"""
);

var stringValues = values.ToString();
cmd.Parameters.Add(
new NpgsqlParameter { Value = mBody, NpgsqlDbType = NpgsqlDbType.Jsonb }
);

command.CommandText = @$"
INSERT INTO {SchemaName}.{QueuePrefix}_{queueName} (visibility_timeout, message)
VALUES {stringValues};
";
await command.PrepareAsync(ct);
await command.ExecuteNonQueryAsync(ct);
batch.BatchCommands.Add(cmd);
}

await batch.PrepareAsync(ct).ConfigureAwait(false);
await batch.ExecuteNonQueryAsync(ct).ConfigureAwait(false);
}

private async Task PublishAsyncInternal<T>(IEnumerable<T> messages, CancellationToken ct) where T : IPostgresEvent
{
var topicName = AutoMessageMapper.GetQueueName<T>();
Expand Down
Loading

0 comments on commit 099ef9a

Please sign in to comment.