Skip to content

Commit

Permalink
Feature/postgres events (#119)
Browse files Browse the repository at this point in the history
* Add pqsql tables for topic and subscriptions

* Add knightbus plumbing for events

* create publish function

* Use test containers for postgres

* Working SubscriptionManager tests

* use raw mode parameters

* Refactor the read methods to use the same functions
  • Loading branch information
NiklasArbin authored May 24, 2024
1 parent 7754020 commit 3da304c
Show file tree
Hide file tree
Showing 25 changed files with 991 additions and 286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<PackageIconUrl>https://raw.githubusercontent.com/BookBeat/knightbus/master/documentation/media/images/knighbus-64.png</PackageIconUrl>
<PackageIcon>knighbus-64.png</PackageIcon>
<RepositoryUrl>https://github.com/BookBeat/knightbus</RepositoryUrl>
<Version>1.1.3</Version>
<Version>1.2.0</Version>
<PackageTags>knightbus;postgresql;queues;messaging</PackageTags>
<LangVersion>latest</LangVersion>
<ImplicitUsings>enable</ImplicitUsings>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,54 @@ public async Task<List<PostgresQueueMetadata>> ListQueues(CancellationToken ct)
return queueMetas;
}

public async Task<List<PostgresQueueMetadata>> ListSubscriptions(string topic, CancellationToken ct)
{
await using var command = _npgsqlDataSource.CreateCommand(@$"
SELECT subscription_name, created_at
FROM {SchemaName}.{TopicPrefix}_{topic};
");

await using var reader = await command.ExecuteReaderAsync(ct);
var queueMetas = new List<PostgresQueueMetadata>();
while (await reader.ReadAsync(ct))
{
queueMetas.Add(new PostgresQueueMetadata
{
Name = reader.GetString(0),
CreatedAt = reader.GetFieldValue<DateTimeOffset>(1)
});
}

await using var conn = await _npgsqlDataSource.OpenConnectionAsync(ct);
foreach (var queueMetadata in queueMetas)
{
await using var batch = new NpgsqlBatch(conn)
{
BatchCommands =
{
new NpgsqlBatchCommand($@"
SELECT COUNT(*) FROM {SchemaName}.{SubscriptionPrefix}_{topic}_{queueMetadata.Name};"),
new NpgsqlBatchCommand($@"
SELECT COUNT(*) FROM {SchemaName}.{DlQueuePrefix}_{topic}_{queueMetadata.Name};")

}
};

await batch.PrepareAsync(ct);

await using var batchReader = await batch.ExecuteReaderAsync(ct);
await batchReader.ReadAsync(ct);
queueMetadata.ActiveMessagesCount = batchReader.GetInt32(0);
await batchReader.NextResultAsync(ct);
await batchReader.ReadAsync(ct);
queueMetadata.DeadLetterMessagesCount = batchReader.GetInt32(0);
await batchReader.NextResultAsync(ct);
await batchReader.ReadAsync(ct);
}

return queueMetas;
}

public async Task<PostgresQueueMetadata> GetQueue(PostgresQueueName queueName, CancellationToken ct)
{
await using var conn = await _npgsqlDataSource.OpenConnectionAsync(ct);
Expand All @@ -85,11 +133,16 @@ SELECT created_at FROM {SchemaName}.metadata

metadataCommand.Parameters.Add(new NpgsqlParameter<string> { TypedValue = queueName.Value });
batch.BatchCommands.Add(metadataCommand);

await batch.PrepareAsync(ct);

var queueMeta = new PostgresQueueMetadata { Name = queueName.Value };
await using var reader = await batch.ExecuteReaderAsync(ct);
return await GetQueueMetadata(reader, queueName, ct);
}

private static async Task<PostgresQueueMetadata> GetQueueMetadata(NpgsqlDataReader reader, PostgresQueueName queueName, CancellationToken ct)
{
var queueMeta = new PostgresQueueMetadata { Name = queueName.Value };
await reader.ReadAsync(ct);
queueMeta.ActiveMessagesCount = reader.GetInt32(0);
await reader.NextResultAsync(ct);
Expand All @@ -101,6 +154,34 @@ SELECT created_at FROM {SchemaName}.metadata
return queueMeta;
}

public async Task<PostgresQueueMetadata> GetSubscription(string topic, PostgresQueueName subscription, CancellationToken ct)
{
await using var conn = await _npgsqlDataSource.OpenConnectionAsync(ct);
await using var batch = new NpgsqlBatch(conn)
{
BatchCommands =
{
new NpgsqlBatchCommand($@"
SELECT COUNT(*) FROM {SchemaName}.{SubscriptionPrefix}_{topic}_{subscription};"),
new NpgsqlBatchCommand($@"
SELECT COUNT(*) FROM {SchemaName}.{DlQueuePrefix}_{topic}_{subscription};")

}
};

var metadataCommand = new NpgsqlBatchCommand($@"
SELECT created_at FROM {SchemaName}.{TopicPrefix}_{topic}
WHERE subscription_name = ($1);");

metadataCommand.Parameters.Add(new NpgsqlParameter<string> { TypedValue = subscription.Value });
batch.BatchCommands.Add(metadataCommand);

await batch.PrepareAsync(ct);

await using var reader = await batch.ExecuteReaderAsync(ct);
return await GetQueueMetadata(reader, subscription, ct);
}

public async IAsyncEnumerable<PostgresMessage<DictionaryMessage>> PeekMessagesAsync(PostgresQueueName queueName, int count, [EnumeratorCancellation] CancellationToken ct)
{
await using var command = _npgsqlDataSource.CreateCommand(@$"
Expand All @@ -113,9 +194,19 @@ ORDER BY message_id ASC
command.Parameters.Add(new NpgsqlParameter<int> { TypedValue = count });

await using var reader = await command.ExecuteReaderAsync(ct);
await foreach (var p in ReadMessages(reader, false, ct))
{
yield return p;
}
}

private async IAsyncEnumerable<PostgresMessage<DictionaryMessage>> ReadMessages(NpgsqlDataReader reader, bool isDeadLetter,
[EnumeratorCancellation] CancellationToken ct)
{

var propertiesOrdinal = reader.GetOrdinal("properties");
var messageIdOrdinal = reader.GetOrdinal("message_id");
var readCountOrdinal = reader.GetOrdinal("read_count");
var readCountOrdinal = !isDeadLetter ? reader.GetOrdinal("read_count") : 0;
var messageOrdinal = reader.GetOrdinal("message");

while (await reader.ReadAsync(ct))
Expand All @@ -140,6 +231,24 @@ ORDER BY message_id ASC
}
}

public async IAsyncEnumerable<PostgresMessage<DictionaryMessage>> PeekMessagesAsync(string topic, PostgresQueueName subscription, int count, [EnumeratorCancellation] CancellationToken ct)
{
await using var command = _npgsqlDataSource.CreateCommand(@$"
SELECT message_id, enqueued_at, read_count, message, properties
FROM {SchemaName}.{SubscriptionPrefix}_{topic}_{subscription}
ORDER BY message_id ASC
LIMIT ($1);
");

command.Parameters.Add(new NpgsqlParameter<int> { TypedValue = count });

await using var reader = await command.ExecuteReaderAsync(ct);
await foreach (var p in ReadMessages(reader, false, ct))
{
yield return p;
}
}

public async IAsyncEnumerable<PostgresMessage<DictionaryMessage>> PeekDeadLettersAsync(PostgresQueueName queueName, int count, [EnumeratorCancellation] CancellationToken ct)
{
await using var command = _npgsqlDataSource.CreateCommand(@$"
Expand All @@ -152,28 +261,27 @@ ORDER BY message_id ASC
command.Parameters.Add(new NpgsqlParameter<int> { TypedValue = count });
await using var reader = await command.ExecuteReaderAsync(ct);

var propertiesOrdinal = reader.GetOrdinal("properties");
var messageIdOrdinal = reader.GetOrdinal("message_id");
var messageOrdinal = reader.GetOrdinal("message");

while (await reader.ReadAsync(ct))
await foreach (var p in ReadMessages(reader, true, ct))
{
var isPropertiesNull = reader.IsDBNull(propertiesOrdinal);
yield return p;
}
}

public async IAsyncEnumerable<PostgresMessage<DictionaryMessage>> PeekDeadLettersAsync(string topic, PostgresQueueName subscription, int count, [EnumeratorCancellation] CancellationToken ct)
{
await using var command = _npgsqlDataSource.CreateCommand(@$"
SELECT message_id, enqueued_at, created_at, message, properties
FROM {SchemaName}.{DlQueuePrefix}_{topic}_{subscription}
ORDER BY message_id ASC
LIMIT ($1);
");

var postgresMessage = new PostgresMessage<DictionaryMessage>
{
Id = reader.GetInt64(messageIdOrdinal),
Message = _serializer
.Deserialize<DictionaryMessage>(reader.GetFieldValue<byte[]>(messageOrdinal)
.AsMemory()),
Properties = isPropertiesNull
? new Dictionary<string, string>()
: _serializer
.Deserialize<Dictionary<string, string>>(
reader.GetFieldValue<byte[]>(propertiesOrdinal)
.AsMemory())
};
yield return postgresMessage;
command.Parameters.Add(new NpgsqlParameter<int> { TypedValue = count });
await using var reader = await command.ExecuteReaderAsync(ct);

await foreach (var p in ReadMessages(reader, true, ct))
{
yield return p;
}
}

Expand All @@ -198,28 +306,36 @@ FOR UPDATE
command.Parameters.Add(new NpgsqlParameter<int> { TypedValue = count });
await using var reader = await command.ExecuteReaderAsync(ct);

var propertiesOrdinal = reader.GetOrdinal("properties");
var messageIdOrdinal = reader.GetOrdinal("message_id");
var messageOrdinal = reader.GetOrdinal("message");

while (await reader.ReadAsync(ct))
await foreach (var p in ReadMessages(reader, true, ct))
{
var isPropertiesNull = reader.IsDBNull(propertiesOrdinal);
yield return p;
}
}

public async IAsyncEnumerable<PostgresMessage<DictionaryMessage>> ReadDeadLettersAsync(string topic, PostgresQueueName subscription, int count, [EnumeratorCancellation] CancellationToken ct)
{
await using var command = _npgsqlDataSource.CreateCommand(@$"
WITH deleted_rows AS (
DELETE FROM {SchemaName}.{DlQueuePrefix}_{topic}_{subscription}
WHERE message_id IN (
SELECT message_id
FROM {SchemaName}.{DlQueuePrefix}_{topic}_{subscription}
ORDER BY message_id ASC
LIMIT ($1)
FOR UPDATE
)
RETURNING *
)
SELECT message_id, enqueued_at, created_at, message, properties
FROM deleted_rows;
");

var postgresMessage = new PostgresMessage<DictionaryMessage>
{
Id = reader.GetInt64(messageIdOrdinal),
Message = _serializer
.Deserialize<DictionaryMessage>(reader.GetFieldValue<byte[]>(messageOrdinal)
.AsMemory()),
Properties = isPropertiesNull
? new Dictionary<string, string>()
: _serializer
.Deserialize<Dictionary<string, string>>(
reader.GetFieldValue<byte[]>(propertiesOrdinal)
.AsMemory())
};
yield return postgresMessage;
command.Parameters.Add(new NpgsqlParameter<int> { TypedValue = count });
await using var reader = await command.ExecuteReaderAsync(ct);

await foreach (var p in ReadMessages(reader, true, ct))
{
yield return p;
}
}

Expand All @@ -243,6 +359,32 @@ FROM deleted_rows
RETURNING *
)
SELECT COUNT(*) FROM inserted_rows;
");
command.Parameters.Add(new NpgsqlParameter<int> { TypedValue = count });
var result = await command.ExecuteScalarAsync(ct);
return (long)(result ?? 0);
}

public async Task<long> RequeueDeadLettersAsync(string topic, PostgresQueueName subscription, int count, CancellationToken ct)
{
await using var command = _npgsqlDataSource.CreateCommand(@$"
WITH deleted_rows AS (
DELETE FROM {SchemaName}.{DlQueuePrefix}_{topic}_{subscription}
WHERE message_id IN (
SELECT message_id
FROM {SchemaName}.{DlQueuePrefix}_{topic}_{subscription}
ORDER BY message_id ASC
LIMIT ($1)
FOR UPDATE
)
RETURNING *
), inserted_rows AS (
INSERT INTO {SchemaName}.{SubscriptionPrefix}_{topic}_{subscription} (visibility_timeout, message)
SELECT now(), message
FROM deleted_rows
RETURNING *
)
SELECT COUNT(*) FROM inserted_rows;
");
command.Parameters.Add(new NpgsqlParameter<int> { TypedValue = count });
var result = await command.ExecuteScalarAsync(ct);
Expand Down Expand Up @@ -271,6 +413,28 @@ DELETE FROM {SchemaName}.metadata
await deleteMetadata.ExecuteNonQueryAsync(ct);
await transaction.CommitAsync(ct);
}
public async Task DeleteSubscription(string topic, PostgresQueueName subscription, CancellationToken ct)
{
await using var connection = await _npgsqlDataSource.OpenConnectionAsync(ct);
await using var transaction = await connection.BeginTransactionAsync(ct);

await using var truncateQueue = _npgsqlDataSource.CreateCommand(@$"
DROP TABLE IF EXISTS {SchemaName}.{SubscriptionPrefix}_{topic}_{subscription};
");
await using var truncateDlQueue = _npgsqlDataSource.CreateCommand(@$"
DROP TABLE IF EXISTS {SchemaName}.{DlQueuePrefix}_{topic}_{subscription};
");
await using var deleteMetadata = _npgsqlDataSource.CreateCommand(@$"
DELETE FROM {SchemaName}.{TopicPrefix}_{topic}
WHERE subscription_name = ($1);
");
deleteMetadata.Parameters.Add(new NpgsqlParameter<string> { TypedValue = subscription.Value });

await truncateQueue.ExecuteNonQueryAsync(ct);
await truncateDlQueue.ExecuteNonQueryAsync(ct);
await deleteMetadata.ExecuteNonQueryAsync(ct);
await transaction.CommitAsync(ct);
}

public async Task PurgeDeadLetterQueue(PostgresQueueName queueName)
{
Expand Down
Loading

0 comments on commit 3da304c

Please sign in to comment.