-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Postgresql transport #111
Postgresql transport #111
Conversation
2630738
to
7bacfdc
Compare
var propertiesOrdinal = reader.GetOrdinal("properties"); | ||
var isPropertiesNull = reader.IsDBNull(propertiesOrdinal); | ||
|
||
var postgresMessage = new PostgresMessage<DictionaryMessage> | ||
{ | ||
Id = reader.GetInt64(reader.GetOrdinal("message_id")), | ||
ReadCount = reader.GetInt32(reader.GetOrdinal("read_count")), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could move the GetOrdinal functions outwide the while loop since they won't change during the loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice catch
// var commands = new List<SamplePostgresMessage>(); | ||
// for (int i = 0; i < 10000; i++) | ||
// { | ||
// commands.Add(new SamplePostgresMessage { MessageBody = $"{i}_{Guid.NewGuid()}" }); | ||
// } | ||
// var chunks = commands.Chunk(1000); | ||
// foreach (var chunk in chunks) | ||
// { | ||
// await client.SendAsync(chunk); | ||
// } | ||
|
||
await client.SendAsync(new SamplePostgresMessage { MessageBody = Guid.NewGuid().ToString() }); | ||
|
||
//await client.SendAsync(new SamplePoisonPostgresMessage { MessageBody = $"error_{Guid.NewGuid()}" } ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this commented-out code be removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice :)
private readonly IMessageSerializer _serializer; | ||
private readonly SemaphoreSlim _maxConcurrent; | ||
private CancellationTokenSource _pumpDelayCancellationTokenSource = new(); | ||
private readonly TimeSpan _pollingSleepInterval = TimeSpan.FromSeconds(5); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be configurable? 5 seconds might be too long in some scenarios
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah I will have a go at it.
I would also like to explore a longer interval combined with LISTEN/NOTIFY. But that would still require polling since notifications are ephemeral.
_maxConcurrent = new SemaphoreSlim(settings.MaxConcurrentCalls); | ||
} | ||
|
||
public async Task StartAsync(CancellationToken cancellationToken) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix this warning. Return the task or Task.Completed
|
||
public static class NpgsqlDataReaderExtensions | ||
{ | ||
public static async Task<List<PostgresMessage<T>>> ReadDeadLetterRows<T>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IAsyncEnumerable instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will copy arrays multiple times in memory of not either sized properly or handled as IEnumerable
var mBody = _serializer.Serialize(messagesList[i]); | ||
command.Parameters.Add(new NpgsqlParameter { Value = mBody, NpgsqlDbType = NpgsqlDbType.Jsonb }); | ||
} | ||
var stringValues = values.ToString().TrimEnd(','); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optimization: do not add ',' if last row instead of modifying the entire string every time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes let's get rid of that. Especially better for large batch inserts when the cmd text can get big.
|
||
namespace KnightBus.PostgreSql; | ||
|
||
public interface IPostgresBus |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add CancellationToken support!
} | ||
|
||
public async Task<List<PostgresMessage<T>>> GetMessagesAsync(int count, int visibilityTimeout) | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CancellationToken?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm yes GetMessagesAsync should have support for cancellation. But not CompleteAsync, AbandonByErrorAsync and DeadLetterMessageAsync if we are in the midst of processing a message during shutdown. But no new ones should be fetched.
|
||
public static class QueueInitializer | ||
{ | ||
public static async Task InitQueue(PostgresQueueName queueName, NpgsqlDataSource npgsqlDataSource) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How will you manage to update to a newer version? Should schema version be included somewhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to think a bit about how this should be handled. I have deliberately left it out because I might have to invent some migration support that works for a lot of tables sharing the same schema.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Impressive work!
thanks! |
await using var transaction = await connection.BeginTransactionAsync(); | ||
|
||
await using var createSchema = new NpgsqlCommand(@$" | ||
CREATE SCHEMA IF NOT EXISTS {SchemaName}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have an idea on how to handle schema updates in the future, if needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even though it only uses a small set of tables we might end up in a situation where we want to add functionality. For example a priority column to order by?
I can think of two ways at the moment. Run migrations for each queue on first application start YOLO style or have some bundled CLI tool that does all the queue creations and migrations with a connection string.
foreach (var postgresMessage in messages) | ||
{ | ||
await _maxConcurrent.WaitAsync(cancellationToken).ConfigureAwait(false); | ||
var timeoutToken = new CancellationTokenSource(Settings.MessageLockTimeout); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will not work properly, the timer starts ticking when you get the messages, but you start the timer for the token after waiting for the semaphore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
472ad53 not perfect but should align better with message fetch time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm this won't work. The timeoutToken will get disposed if declared once above message loop.
a043efa
to
9cba7dd
Compare
This reverts commit 8460079.
189a9c8
to
2ff5154
Compare
No description provided.