Skip to content

Commit

Permalink
Extract queue client query text
Browse files Browse the repository at this point in the history
  • Loading branch information
Viktor Hartenberger committed Apr 14, 2024
1 parent e329305 commit 8460079
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using KnightBus.PostgreSql.Messages;
using Npgsql;
using NpgsqlTypes;
using static KnightBus.PostgreSql.PostgresConstants;
using static KnightBus.PostgreSql.Query;

namespace KnightBus.PostgreSql;

Expand All @@ -23,24 +23,7 @@ public PostgresQueueClient(NpgsqlDataSource npgsqlDataSource, IMessageSerializer
public async Task<PostgresMessage<T>[]> GetMessagesAsync(int count, int visibilityTimeout)
{
await using var connection = await _npgsqlDataSource.OpenConnectionAsync();
await using var command = new NpgsqlCommand(@$"
WITH cte AS
(
SELECT message_id
FROM {SchemaName}.{QueuePrefix}_{_queueName}
WHERE visibility_timeout <= clock_timestamp()
ORDER BY message_id ASC
LIMIT ($1)
FOR UPDATE SKIP LOCKED
)
UPDATE {SchemaName}.{QueuePrefix}_{_queueName} t
SET
visibility_timeout = clock_timestamp() + ($2),
read_count = read_count + 1
FROM cte
WHERE t.message_id = cte.message_id
RETURNING *;
", connection);
await using var command = new NpgsqlCommand(GetMessages(_queueName), connection);

command.Parameters.Add(new NpgsqlParameter<int> { TypedValue = count });
command.Parameters.Add(new NpgsqlParameter<TimeSpan> { TypedValue = TimeSpan.FromSeconds(visibilityTimeout) });
Expand Down Expand Up @@ -77,10 +60,7 @@ FROM cte

public async Task CompleteAsync(PostgresMessage<T> message)
{
await using var command = _npgsqlDataSource.CreateCommand(@$"
DELETE FROM {SchemaName}.{QueuePrefix}_{_queueName}
WHERE message_id = ($1);
");
await using var command = _npgsqlDataSource.CreateCommand(CompleteMessage(_queueName));
command.Parameters.Add(new NpgsqlParameter<long> { Value = message.Id });
await command.ExecuteNonQueryAsync();
}
Expand All @@ -90,11 +70,7 @@ public async Task AbandonByErrorAsync(PostgresMessage<T> message, Exception exce
var errorString = exception.ToString();
message.Properties["error_message"] = errorString;

await using var command = _npgsqlDataSource.CreateCommand(@$"
UPDATE {SchemaName}.{QueuePrefix}_{_queueName}
SET properties = ($1), visibility_timeout = now()
WHERE message_id = ($2);
");
await using var command = _npgsqlDataSource.CreateCommand(AbandonByError(_queueName));
command.Parameters.Add(new NpgsqlParameter
{
Value = _serializer.Serialize(message.Properties), NpgsqlDbType = NpgsqlDbType.Jsonb
Expand All @@ -105,29 +81,14 @@ public async Task AbandonByErrorAsync(PostgresMessage<T> message, Exception exce

public async Task DeadLetterMessageAsync(PostgresMessage<T> message)
{
await using var command = _npgsqlDataSource.CreateCommand(@$"
WITH DeadLetter AS (
DELETE FROM {SchemaName}.{QueuePrefix}_{_queueName}
WHERE message_id = ($1)
RETURNING message_id, enqueued_at, message, properties
)
INSERT INTO {SchemaName}.{DlQueuePrefix}_{_queueName} (message_id, enqueued_at, created_at, message, properties)
SELECT message_id, enqueued_at, now(), message, properties
FROM DeadLetter;
");
await using var command = _npgsqlDataSource.CreateCommand(DeadLetterMessage(_queueName));
command.Parameters.Add(new NpgsqlParameter<long> { Value = message.Id });
await command.ExecuteNonQueryAsync();
}

public async Task<List<PostgresMessage<T>>> PeekDeadLetterMessagesAsync(int count, CancellationToken ct)
{
await using var command = _npgsqlDataSource.CreateCommand(@$"
SELECT message_id, enqueued_at, created_at, message, properties
FROM {SchemaName}.{DlQueuePrefix}_{_queueName}
ORDER BY message_id ASC
LIMIT ($1);
");

await using var command = _npgsqlDataSource.CreateCommand(PeekDeadLetterMessage(_queueName));
command.Parameters.Add(new NpgsqlParameter<int> { TypedValue = count });
await using var reader = await command.ExecuteReaderAsync(ct);
return await reader.ReadDeadLetterRows<T>(_serializer, ct);
Expand Down
53 changes: 53 additions & 0 deletions knightbus-postgresql/src/KnightBus.PostgreSql/Query.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
namespace KnightBus.PostgreSql;
using static PostgresConstants;

public static class Query
{
public static string GetMessages(PostgresQueueName queueName) => @$"
WITH cte AS
(
SELECT message_id
FROM {SchemaName}.{QueuePrefix}_{queueName}
WHERE visibility_timeout <= clock_timestamp()
ORDER BY message_id ASC
LIMIT ($1)
FOR UPDATE SKIP LOCKED
)
UPDATE {SchemaName}.{QueuePrefix}_{queueName} t
SET
visibility_timeout = clock_timestamp() + ($2),
read_count = read_count + 1
FROM cte
WHERE t.message_id = cte.message_id
RETURNING *;
";

public static string CompleteMessage(PostgresQueueName queueName) => @$"
DELETE FROM {SchemaName}.{QueuePrefix}_{queueName}
WHERE message_id = ($1);
";

public static string AbandonByError(PostgresQueueName queueName) => @$"
UPDATE {SchemaName}.{QueuePrefix}_{queueName}
SET properties = ($1), visibility_timeout = now()
WHERE message_id = ($2);
";

public static string DeadLetterMessage(PostgresQueueName queueName) => @$"
WITH DeadLetter AS (
DELETE FROM {SchemaName}.{QueuePrefix}_{queueName}
WHERE message_id = ($1)
RETURNING message_id, enqueued_at, message, properties
)
INSERT INTO {SchemaName}.{DlQueuePrefix}_{queueName} (message_id, enqueued_at, created_at, message, properties)
SELECT message_id, enqueued_at, now(), message, properties
FROM DeadLetter;
";

public static string PeekDeadLetterMessage(PostgresQueueName queueName) => @$"
SELECT message_id, enqueued_at, created_at, message, properties
FROM {SchemaName}.{DlQueuePrefix}_{queueName}
ORDER BY message_id ASC
LIMIT ($1);
";
}

0 comments on commit 8460079

Please sign in to comment.