Postgresql transport #111

Merged
merged 40 commits into from
Apr 17, 2024

Contributor

@viktor-h viktor-h commented Apr 4, 2024

No description provided.

@viktor-h viktor-h force-pushed the postgresql-transport branch from 2630738 to 7bacfdc Compare April 14, 2024 14:42
@viktor-h viktor-h marked this pull request as ready for review April 14, 2024 21:20
@viktor-h viktor-h requested a review from NiklasArbin April 15, 2024 07:02
Comment on lines 118 to 124
var propertiesOrdinal = reader.GetOrdinal("properties");
var isPropertiesNull = reader.IsDBNull(propertiesOrdinal);

var postgresMessage = new PostgresMessage<DictionaryMessage>
{
Id = reader.GetInt64(reader.GetOrdinal("message_id")),
ReadCount = reader.GetInt32(reader.GetOrdinal("read_count")),

@niclaslindstedt niclaslindstedt Apr 15, 2024

You could move the GetOrdinal calls outside the while loop, since the ordinals won't change during the loop.

Contributor Author

nice catch
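
For illustration, the hoisting suggested in this thread could look roughly like the sketch below. It is not the code from the PR: it uses a `DataTable` reader as a stand-in for `NpgsqlDataReader` (both derive from `DbDataReader`) so that it runs without a database, and the `ReadRows` helper and its tuple shape are assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;

// Stand-in result set with the column names seen in the diff.
var table = new DataTable();
table.Columns.Add("message_id", typeof(long));
table.Columns.Add("read_count", typeof(int));
table.Columns.Add("properties", typeof(string));
table.Rows.Add(1L, 0, DBNull.Value);
table.Rows.Add(2L, 3, "{}");

var rows = ReadRows(table.CreateDataReader());
foreach (var row in rows)
    Console.WriteLine($"{row.Id}: read {row.ReadCount} times, has properties: {row.HasProperties}");

// Hypothetical helper: resolves each ordinal once, then reuses it for every row.
static List<(long Id, int ReadCount, bool HasProperties)> ReadRows(DbDataReader reader)
{
    // Column positions are fixed for the lifetime of the result set.
    var idOrdinal = reader.GetOrdinal("message_id");
    var readCountOrdinal = reader.GetOrdinal("read_count");
    var propertiesOrdinal = reader.GetOrdinal("properties");

    var result = new List<(long Id, int ReadCount, bool HasProperties)>();
    while (reader.Read())
    {
        result.Add((
            reader.GetInt64(idOrdinal),
            reader.GetInt32(readCountOrdinal),
            !reader.IsDBNull(propertiesOrdinal)));
    }
    return result;
}
```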

Comment on lines 50 to 63
// var commands = new List<SamplePostgresMessage>();
// for (int i = 0; i < 10000; i++)
// {
// commands.Add(new SamplePostgresMessage { MessageBody = $"{i}_{Guid.NewGuid()}" });
// }
// var chunks = commands.Chunk(1000);
// foreach (var chunk in chunks)
// {
// await client.SendAsync(chunk);
// }

await client.SendAsync(new SamplePostgresMessage { MessageBody = Guid.NewGuid().ToString() });

//await client.SendAsync(new SamplePoisonPostgresMessage { MessageBody = $"error_{Guid.NewGuid()}" } );

Should this commented-out code be removed?

@niclaslindstedt niclaslindstedt left a comment

Very nice :)

private readonly IMessageSerializer _serializer;
private readonly SemaphoreSlim _maxConcurrent;
private CancellationTokenSource _pumpDelayCancellationTokenSource = new();
private readonly TimeSpan _pollingSleepInterval = TimeSpan.FromSeconds(5);
Member

Should this be configurable? 5 seconds might be too long in some scenarios

Contributor Author

yeah I will have a go at it.

I would also like to explore a longer interval combined with LISTEN/NOTIFY. But that would still require polling since notifications are ephemeral.
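
One way to picture the combination discussed here is a wait that ends either when a configurable delay elapses or when something signals that new work has arrived. The sketch below is only an assumption about how that could be shaped: `pollingDelay`, `wakeSignal` and `WaitForNextPollAsync` are hypothetical names, and the actual LISTEN/NOTIFY wiring is left out.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical setting; the PR hard-codes 5 seconds in _pollingSleepInterval.
var pollingDelay = TimeSpan.FromSeconds(5);

// Released by whatever observes new work (for example a NOTIFY handler, not shown).
var wakeSignal = new SemaphoreSlim(0);

// Waits until either the polling delay elapses or a wake-up arrives.
// Returns true when woken early, false when the delay ran out.
Task<bool> WaitForNextPollAsync(CancellationToken cancellationToken) =>
    wakeSignal.WaitAsync(pollingDelay, cancellationToken);

wakeSignal.Release(); // simulate a notification arriving
var wokenEarly = await WaitForNextPollAsync(CancellationToken.None);
Console.WriteLine($"Woken early: {wokenEarly}");
```

Because a missed notification only costs one polling interval, the delay acts as the safety net and the signal as the fast path.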

_maxConcurrent = new SemaphoreSlim(settings.MaxConcurrentCalls);
}

public async Task StartAsync(CancellationToken cancellationToken)
Member

Fix this warning. Return the task or Task.CompletedTask.
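
Assuming the warning is the compiler's complaint about an `async` method that never awaits (CS1998), a minimal sketch of the suggested fix is to drop the `async` modifier and return a completed task. `PumpAsync` and the `pump` field below are hypothetical stand-ins, not the PR's code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

Task pump = Task.CompletedTask;

// Hypothetical message pump: runs until cancellation is requested.
async Task PumpAsync(CancellationToken cancellationToken)
{
    try { await Task.Delay(Timeout.Infinite, cancellationToken); }
    catch (OperationCanceledException) { /* normal shutdown */ }
}

// No async modifier: nothing is awaited here, so return a completed task explicitly.
Task StartAsync(CancellationToken cancellationToken)
{
    pump = Task.Run(() => PumpAsync(cancellationToken));
    return Task.CompletedTask;
}

using var cts = new CancellationTokenSource();
var startTask = StartAsync(cts.Token);
Console.WriteLine($"StartAsync already completed: {startTask.IsCompleted}");
cts.Cancel();
await pump; // the pump observes the cancellation and finishes
```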


public static class NpgsqlDataReaderExtensions
{
public static async Task<List<PostgresMessage<T>>> ReadDeadLetterRows<T>(
Member

IAsyncEnumerable instead?

Member

This will copy arrays multiple times in memory if the list is not either sized properly or handled as IEnumerable.
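
As a rough sketch of the streaming alternative raised above, rows can be yielded one at a time through `IAsyncEnumerable` instead of being collected into a growing `List`. The example assumes a plain `DbDataReader` (here backed by a `DataTable` so it runs without a database) and a hypothetical `ReadIdsAsync` helper; the PR's `ReadDeadLetterRows<T>` would yield `PostgresMessage<T>` instead of a bare id.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

var table = new DataTable();
table.Columns.Add("message_id", typeof(long));
table.Rows.Add(10L);
table.Rows.Add(11L);

await foreach (var id in ReadIdsAsync(table.CreateDataReader()))
    Console.WriteLine(id);

// Hypothetical helper: streams rows to the caller, so no intermediate list is resized.
static async IAsyncEnumerable<long> ReadIdsAsync(
    DbDataReader reader,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var idOrdinal = reader.GetOrdinal("message_id");
    while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
        yield return reader.GetInt64(idOrdinal);
}
```

If a materialized list is still wanted, constructing it with a known capacity (for example the requested row count) avoids the repeated internal array copies the comment refers to.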

var mBody = _serializer.Serialize(messagesList[i]);
command.Parameters.Add(new NpgsqlParameter { Value = mBody, NpgsqlDbType = NpgsqlDbType.Jsonb });
}
var stringValues = values.ToString().TrimEnd(',');
Member

Optimization: skip the ',' for the last row instead of building the whole string and trimming it afterwards.

Contributor Author

Yes let's get rid of that. Especially better for large batch inserts when the cmd text can get big.
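
A small sketch of what this could look like: write the separator before every row except the first, so there is nothing to trim at the end. `BuildValuesClause`, the positional `$n` placeholders and the table name `some_queue` are assumptions for the example, not taken from the PR.

```csharp
using System;
using System.Text;

// Hypothetical helper: builds "($1), ($2), ..." for a batch insert.
static string BuildValuesClause(int rowCount)
{
    var values = new StringBuilder();
    for (var i = 0; i < rowCount; i++)
    {
        // Separator goes before every row except the first, so no trailing comma appears.
        if (i > 0) values.Append(", ");
        values.Append("($").Append(i + 1).Append(')');
    }
    return values.ToString();
}

Console.WriteLine($"INSERT INTO some_queue (message) VALUES {BuildValuesClause(3)}");
```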


namespace KnightBus.PostgreSql;

public interface IPostgresBus
Member

Add CancellationToken support!

}

public async Task<List<PostgresMessage<T>>> GetMessagesAsync(int count, int visibilityTimeout)
{
Member

CancellationToken?

Contributor Author

Hmm, yes, GetMessagesAsync should support cancellation. But CompleteAsync, AbandonByErrorAsync and DeadLetterMessageAsync should not, in case we are in the midst of processing a message during shutdown. Only fetching new messages should stop.
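
The split described here can be sketched as follows: the fetch observes the shutdown token, while settling a message that is already in flight is called with `CancellationToken.None`. The in-memory `GetMessagesAsync` and `CompleteAsync` below are hypothetical stand-ins with simplified signatures, not the transport's real methods.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var completed = new List<long>();

// Hypothetical stand-in for the fetch: honors cancellation.
Task<List<long>> GetMessagesAsync(int count, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();
    return Task.FromResult(Enumerable.Range(1, count).Select(i => (long)i).ToList());
}

// Hypothetical stand-in for settling a message.
Task CompleteAsync(long id, CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested();
    completed.Add(id);
    return Task.CompletedTask;
}

using var shutdown = new CancellationTokenSource();
var batch = await GetMessagesAsync(3, shutdown.Token);

shutdown.Cancel(); // shutdown is requested while the batch is in flight

foreach (var id in batch)
    await CompleteAsync(id, CancellationToken.None); // already-fetched messages still settle

var fetchedAfterShutdown = true;
try { await GetMessagesAsync(3, shutdown.Token); }
catch (OperationCanceledException) { fetchedAfterShutdown = false; } // no new fetches

Console.WriteLine($"{completed.Count} completed, fetched after shutdown: {fetchedAfterShutdown}");
```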


public static class QueueInitializer
{
public static async Task InitQueue(PostgresQueueName queueName, NpgsqlDataSource npgsqlDataSource)
Member

How will you manage to update to a newer version? Should schema version be included somewhere?

Contributor Author

Need to think a bit about how this should be handled. I have deliberately left it out because I might have to invent some migration support that works for a lot of tables sharing the same schema.

Member

@NiklasArbin NiklasArbin left a comment

Impressive work!

@viktor-h
Contributor Author

Impressive work!

thanks!

await using var transaction = await connection.BeginTransactionAsync();

await using var createSchema = new NpgsqlCommand(@$"
CREATE SCHEMA IF NOT EXISTS {SchemaName};
Member

Do we have an idea on how to handle schema updates in the future, if needed?

Contributor Author

Even though it only uses a small set of tables we might end up in a situation where we want to add functionality. For example a priority column to order by?

I can think of two ways at the moment. Run migrations for each queue on first application start YOLO style or have some bundled CLI tool that does all the queue creations and migrations with a connection string.

foreach (var postgresMessage in messages)
{
await _maxConcurrent.WaitAsync(cancellationToken).ConfigureAwait(false);
var timeoutToken = new CancellationTokenSource(Settings.MessageLockTimeout);
Member

This will not work properly: the lock timer starts ticking when you fetch the messages, but the timer for the token only starts after waiting for the semaphore.

Contributor Author

472ad53 not perfect but should align better with message fetch time.

Contributor Author

Hmm, this won't work. The timeoutToken will get disposed if it is declared once above the message loop.
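
One possible way to reconcile both points in this thread, sketched under assumptions rather than taken from the merged commit: keep a separate `CancellationTokenSource` per message, but give it only the time left on the lock, measured from the moment the batch was fetched. `RemainingLockTime` and the 30-second `lockTimeout` are hypothetical.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical helper: time left on the message lock, never negative.
static TimeSpan RemainingLockTime(TimeSpan lockTimeout, TimeSpan elapsedSinceFetch)
{
    var remaining = lockTimeout - elapsedSinceFetch;
    return remaining > TimeSpan.Zero ? remaining : TimeSpan.Zero;
}

var lockTimeout = TimeSpan.FromSeconds(30);  // stands in for Settings.MessageLockTimeout
var sinceFetch = Stopwatch.StartNew();       // started right after the fetch returns

// ... waiting for a free slot on the semaphore would happen here ...

// Created per message, so disposing it does not affect other messages in the batch.
using var timeout = new CancellationTokenSource(
    RemainingLockTime(lockTimeout, sinceFetch.Elapsed));
Console.WriteLine($"Already timed out: {timeout.IsCancellationRequested}");
```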

@viktor-h viktor-h force-pushed the postgresql-transport branch 3 times, most recently from a043efa to 9cba7dd Compare April 16, 2024 21:17
@viktor-h viktor-h force-pushed the postgresql-transport branch from 189a9c8 to 2ff5154 Compare April 17, 2024 10:23
@viktor-h viktor-h merged commit 3c686c6 into master Apr 17, 2024
3 checks passed
@viktor-h viktor-h deleted the postgresql-transport branch April 17, 2024 11:26
4 participants