From c265185e1ad9ad8fe0d49893e479456f56b9d04f Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 18 Nov 2022 23:07:53 +0000 Subject: [PATCH] MessageDb: propulsion checkpoint --- CHANGELOG.md | 2 +- README.md | 10 +- src/Propulsion.MessageDb/Internal.fs | 13 -- src/Propulsion.MessageDb/MessageDbSource.fs | 74 ++++++------ .../Propulsion.MessageDb.fsproj | 5 +- src/Propulsion.MessageDb/ReaderCheckpoint.fs | 112 +++++++++--------- src/Propulsion.MessageDb/Readme.md | 7 +- src/Propulsion.MessageDb/Types.fs | 6 + .../Propulsion.MessageDb.Integration/Tests.fs | 44 ++++--- tools/Propulsion.Tool/Args.fs | 25 ++-- tools/Propulsion.Tool/Program.fs | 30 +++-- 11 files changed, 169 insertions(+), 159 deletions(-) delete mode 100644 src/Propulsion.MessageDb/Internal.fs create mode 100644 src/Propulsion.MessageDb/Types.fs diff --git a/CHANGELOG.md b/CHANGELOG.md index 86554491..16d2258e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,7 +20,7 @@ The `Unreleased` section name is replaced by the expected version of next releas - `Propulsion.EventStoreDb`: Ported `EventStore` to target `Equinox.EventStore` >= `4.0.0` (using the gRPC interface) [#139](https://github.com/jet/propulsion/pull/139) - `Propulsion.CosmosStore3`: Special cased version of `Propulsion.CosmosStore` to target `Equinox.CosmosStore` v `[3.0.7`-`3.99.0]` **Deprecated; Please migrate to `Propulsion.CosmosStore` by updating `Equinox.CosmosStore` dependencies to `4.0.0`** [#139](https://github.com/jet/propulsion/pull/139) - `Propulsion.DynamoStore`: `Equinox.CosmosStore`-equivalent functionality for `Equinox.DynamoStore`. 
Combines elements of `CosmosStore`, `SqlStreamStore`, `Feed` [#140](https://github.com/jet/propulsion/pull/143) [#140](https://github.com/jet/propulsion/pull/143) [#177](https://github.com/jet/propulsion/pull/177) -- `Propulsion.MessageDb`: `FeedSource` for [MessageDb](http://docs.eventide-project.org/user-guide/message-db/) [#181](https://github.com/jet/propulsion/pull/181) :pray: [@nordfjord](https://github.com/nordfjord) +- `Propulsion.MessageDb`: `FeedSource` and `CheckpointStore` for [MessageDb](http://docs.eventide-project.org/user-guide/message-db/) [#181](https://github.com/jet/propulsion/pull/181) :pray: [@nordfjord](https://github.com/nordfjord) - `Propulsion.MemoryStore`: `MemoryStoreSource` to align with other sources for integration testing. Includes *deterministic* `AwaitCompletion` as per `Propulsion.Feed`-based Sources [#165](https://github.com/jet/propulsion/pull/165) - `Propulsion.SqlStreamStore`: Added `startFromTail` [#173](https://github.com/jet/propulsion/pull/173) - `Propulsion.Tool`: `checkpoint` commandline option; enables viewing or overriding checkpoints [#141](https://github.com/jet/propulsion/pull/141) diff --git a/README.md b/README.md index 1fd7766d..21a7de38 100644 --- a/README.md +++ b/README.md @@ -108,11 +108,11 @@ The ubiquitous `Serilog` dependency is solely on the core module, not any sinks. - `Propulsion.Tool` [![Tool NuGet](https://img.shields.io/nuget/v/Propulsion.Tool.svg)](https://www.nuget.org/packages/Propulsion.Tool/): Tool used to initialize a Change Feed Processor `aux` container for `Propulsion.Cosmos` and demonstrate basic projection, including to Kafka. See [quickstart](#quickstart). 
- - CosmosDB: Initialize `-aux` Container for ChangeFeedProcessor - - CosmosDB/DynamoStore/EventStoreDB/Feed/SqlStreamStore: adjust checkpoints - - CosmosDB/DynamoStore/EventStoreDB/MessageDb: walk change feeds/indexes and/or project to Kafka - - DynamoStore: validate and/or reindex DynamoStore Index - - MessageDb: Initialize a checkpoints table in a Postgres Database + - `init`: CosmosDB: Initialize an `-aux` Container for ChangeFeedProcessor + - `initpg`: MessageDb: Initialize a checkpoints table in a Postgres Database + - `index`: DynamoStore: validate and/or reindex DynamoStore Index + - `checkpoint`: CosmosDB/DynamoStore/EventStoreDB/Feed/SqlStreamStore: adjust checkpoints in DynamoStore/CosmosStore/Postgres + - `project`: CosmosDB/DynamoStore/EventStoreDB/MessageDb: walk change feeds/indexes and/or project to Kafka ## Deprecated components diff --git a/src/Propulsion.MessageDb/Internal.fs b/src/Propulsion.MessageDb/Internal.fs deleted file mode 100644 index 8b842704..00000000 --- a/src/Propulsion.MessageDb/Internal.fs +++ /dev/null @@ -1,13 +0,0 @@ -namespace Propulsion.MessageDb - -open FSharp.UMX -open Npgsql - -module internal FeedSourceId = - let wellKnownId : Propulsion.Feed.SourceId = UMX.tag "messageDb" - -module internal Npgsql = - let connect connectionString ct = task { - let conn = new NpgsqlConnection(connectionString) - do! 
conn.OpenAsync(ct) - return conn } diff --git a/src/Propulsion.MessageDb/MessageDbSource.fs b/src/Propulsion.MessageDb/MessageDbSource.fs index 5e02322b..34cfc96d 100644 --- a/src/Propulsion.MessageDb/MessageDbSource.fs +++ b/src/Propulsion.MessageDb/MessageDbSource.fs @@ -1,24 +1,31 @@ namespace Propulsion.MessageDb -open FSharp.Control open FsCodec -open FsCodec.Core +open FSharp.Control open NpgsqlTypes -open Propulsion.Feed -open Propulsion.Feed.Core open Propulsion.Internal open System open System.Data.Common -open System.Diagnostics +module internal Npgsql = + + let connect connectionString ct = task { + let conn = new Npgsql.NpgsqlConnection(connectionString) + do! conn.OpenAsync(ct) + return conn } + +module Internal = + + open Propulsion.Feed + open System.Threading.Tasks + open Propulsion.Infrastructure // AwaitTaskCorrect -module Core = type MessageDbCategoryClient(connectionString) = let connect = Npgsql.connect connectionString let parseRow (reader: DbDataReader) = let readNullableString idx = if reader.IsDBNull(idx) then None else Some (reader.GetString idx) let streamName = reader.GetString(8) - let event = TimelineEvent.Create( + let event = FsCodec.Core.TimelineEvent.Create( index = reader.GetInt64(0), eventType = reader.GetString(1), data = ReadOnlyMemory(Text.Encoding.UTF8.GetBytes(reader.GetString 2)), @@ -28,9 +35,9 @@ module Core = ?causationId = readNullableString 6, context = reader.GetInt64(9), timestamp = DateTimeOffset(DateTime.SpecifyKind(reader.GetDateTime(7), DateTimeKind.Utc))) - struct(StreamName.parse streamName, event) - member _.ReadCategoryMessages(category: TrancheId, fromPositionInclusive: int64, batchSize: int, ct) = task { + + member _.ReadCategoryMessages(category: TrancheId, fromPositionInclusive: int64, batchSize: int, ct) : Task> = task { use! 
conn = connect ct let command = conn.CreateCommand(CommandText = "select position, type, data, metadata, id::uuid, (metadata::jsonb->>'$correlationId')::text, @@ -44,12 +51,13 @@ module Core = let mutable checkpoint = fromPositionInclusive use! reader = command.ExecuteReaderAsync(ct) - let events = [| while reader.Read() do yield parseRow reader |] + let events = [| while reader.Read() do parseRow reader |] checkpoint <- match Array.tryLast events with Some (_, ev) -> unbox ev.Context | None -> checkpoint - return { checkpoint = Position.parse checkpoint; items = events; isTail = events.Length = 0 } } - member _.ReadCategoryLastVersion(category: TrancheId, ct) = task { + return ({ checkpoint = Position.parse checkpoint; items = events; isTail = events.Length = 0 } : Propulsion.Feed.Core.Batch<_>) } + + member _.ReadCategoryLastVersion(category: TrancheId, ct) : Task = task { use! conn = connect ct let command = conn.CreateCommand(CommandText = "select max(global_position) from messages where category(stream_name) = @Category;") command.Parameters.AddWithValue("Category", NpgsqlDbType.Text, TrancheId.toString category) |> ignore @@ -57,45 +65,44 @@ module Core = use! reader = command.ExecuteReaderAsync(ct) return if reader.Read() then reader.GetInt64(0) else 0L } -module private Impl = - open Core - open Propulsion.Infrastructure // AwaitTaskCorrect - - let readBatch batchSize (store : MessageDbCategoryClient) (category, pos) : Async> = async { + let internal readBatch batchSize (store : MessageDbCategoryClient) (category, pos) : Async> = async { let! ct = Async.CancellationToken let positionInclusive = Position.toInt64 pos - let! x = store.ReadCategoryMessages(category, positionInclusive, batchSize, ct) |> Async.AwaitTaskCorrect - return x } + return! 
store.ReadCategoryMessages(category, positionInclusive, batchSize, ct) |> Async.AwaitTaskCorrect } - let readTailPositionForTranche (store : MessageDbCategoryClient) trancheId : Async = async { + let internal readTailPositionForTranche (store : MessageDbCategoryClient) trancheId : Async = async { let! ct = Async.CancellationToken let! lastEventPos = store.ReadCategoryLastVersion(trancheId, ct) |> Async.AwaitTaskCorrect return Position.parse lastEventPos } -type MessageDbSource +type MessageDbSource internal ( log : Serilog.ILogger, statsInterval, - client: Core.MessageDbCategoryClient, batchSize, tailSleepInterval, + client: Internal.MessageDbCategoryClient, batchSize, tailSleepInterval, checkpoints : Propulsion.Feed.IFeedCheckpointStore, sink : Propulsion.Streams.Default.Sink, - categories, - // Override default start position to be at the tail of the index. Default: Replay all events. - ?startFromTail, - ?sourceId) = + tranches, ?startFromTail, ?sourceId) = inherit Propulsion.Feed.Core.TailingFeedSource ( log, statsInterval, defaultArg sourceId FeedSourceId.wellKnownId, tailSleepInterval, checkpoints, ( if startFromTail <> Some true then None - else Some (Impl.readTailPositionForTranche client)), + else Some (Internal.readTailPositionForTranche client)), sink, (fun req -> asyncSeq { - let sw = Stopwatch.StartNew() - let! b = Impl.readBatch batchSize client req + let sw = Stopwatch.start () + let! 
b = Internal.readBatch batchSize client req yield sw.Elapsed, b }), string) - new (log, statsInterval, connectionString, batchSize, tailSleepInterval, checkpoints, sink, trancheIds, ?startFromTail, ?sourceId) = - MessageDbSource(log, statsInterval, Core.MessageDbCategoryClient(connectionString), - batchSize, tailSleepInterval, checkpoints, sink, trancheIds, ?startFromTail=startFromTail, ?sourceId=sourceId) + new( log, statsInterval, + connectionString, batchSize, tailSleepInterval, + checkpoints, sink, + categories, + // Override default start position to be at the tail of the index. Default: Replay all events. + ?startFromTail, ?sourceId) = + MessageDbSource(log, statsInterval, Internal.MessageDbCategoryClient(connectionString), + batchSize, tailSleepInterval, checkpoints, sink, + categories |> Array.map Propulsion.Feed.TrancheId.parse, + ?startFromTail=startFromTail, ?sourceId=sourceId) abstract member ListTranches : unit -> Async - default _.ListTranches() = async { return categories |> Array.map TrancheId.parse } + default _.ListTranches() = async { return tranches } abstract member Pump : unit -> Async default x.Pump() = base.Pump(x.ListTranches) @@ -103,7 +110,6 @@ type MessageDbSource abstract member Start : unit -> Propulsion.SourcePipeline default x.Start() = base.Start(x.Pump()) - /// Pumps to the Sink until either the specified timeout has been reached, or all items in the Source have been fully consumed member x.RunUntilCaughtUp(timeout : TimeSpan, statsInterval : IntervalTimer) = task { let sw = Stopwatch.start () diff --git a/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj b/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj index 3d4bde9c..7ee75d73 100644 --- a/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj +++ b/src/Propulsion.MessageDb/Propulsion.MessageDb.fsproj @@ -2,14 +2,13 @@ net6.0 - true Infrastructure.fs - + @@ -17,10 +16,10 @@ + - diff --git a/src/Propulsion.MessageDb/ReaderCheckpoint.fs 
b/src/Propulsion.MessageDb/ReaderCheckpoint.fs index e20fc2f4..c24c2a8e 100644 --- a/src/Propulsion.MessageDb/ReaderCheckpoint.fs +++ b/src/Propulsion.MessageDb/ReaderCheckpoint.fs @@ -5,69 +5,75 @@ open NpgsqlTypes open Propulsion.Feed open Propulsion.Infrastructure +let [] TableName = "propulsion_checkpoint" -let table = "propulsion_checkpoint" +module internal Impl = -let createIfNotExists (conn : NpgsqlConnection, schema: string) = - let cmd = conn.CreateCommand(CommandText = $"create table if not exists {schema}.{table} ( - source text not null, - tranche text not null, - consumer_group text not null, - position bigint not null, - primary key (source, tranche, consumer_group));") - cmd.ExecuteNonQueryAsync() |> Async.AwaitTaskCorrect |> Async.Ignore + open System.Threading + open System.Threading.Tasks -let commitPosition (conn : NpgsqlConnection, schema: string) source tranche (consumerGroup : string) (position : int64) - = async { - let cmd = conn.CreateCommand(CommandText = $"insert into {schema}.{table}(source, tranche, consumer_group, position) - values (@Source, @Tranche, @ConsumerGroup, @Position) - on conflict (source, tranche, consumer_group) - do update set position = @Position;") - cmd.Parameters.AddWithValue("Source", NpgsqlDbType.Text, SourceId.toString source) |> ignore - cmd.Parameters.AddWithValue("Tranche", NpgsqlDbType.Text, TrancheId.toString tranche) |> ignore - cmd.Parameters.AddWithValue("ConsumerGroup", NpgsqlDbType.Text, consumerGroup) |> ignore - cmd.Parameters.AddWithValue("Position", NpgsqlDbType.Bigint, position) |> ignore + let createIfNotExists (conn : NpgsqlConnection, schema: string) ct = task { + let cmd = conn.CreateCommand(CommandText = $"create table if not exists {schema}.{TableName} ( + source text not null, + tranche text not null, + consumer_group text not null, + position bigint not null, + primary key (source, tranche, consumer_group));") + do! cmd.ExecuteNonQueryAsync(ct) : Task } - let! 
ct = Async.CancellationToken - do! cmd.ExecuteNonQueryAsync(ct) |> Async.AwaitTaskCorrect |> Async.Ignore } + let commitPosition (conn : NpgsqlConnection, schema: string) source tranche (consumerGroup : string) (position : int64) ct = task { + let cmd = conn.CreateCommand(CommandText = $"insert into {schema}.{TableName}(source, tranche, consumer_group, position) + values (@Source, @Tranche, @ConsumerGroup, @Position) + on conflict (source, tranche, consumer_group) + do update set position = @Position;") + cmd.Parameters.AddWithValue("Source", NpgsqlDbType.Text, SourceId.toString source) |> ignore + cmd.Parameters.AddWithValue("Tranche", NpgsqlDbType.Text, TrancheId.toString tranche) |> ignore + cmd.Parameters.AddWithValue("ConsumerGroup", NpgsqlDbType.Text, consumerGroup) |> ignore + cmd.Parameters.AddWithValue("Position", NpgsqlDbType.Bigint, position) |> ignore + do! cmd.ExecuteNonQueryAsync(ct) :> Task } -let tryGetPosition (conn : NpgsqlConnection, schema : string) source tranche (consumerGroup : string) = async { - let cmd = conn.CreateCommand(CommandText = $"select position from {schema}.{table} - where source = @Source - and tranche = @Tranche - and consumer_group = @ConsumerGroup") - cmd.Parameters.AddWithValue("Source", NpgsqlDbType.Text, SourceId.toString source) |> ignore - cmd.Parameters.AddWithValue("Tranche", NpgsqlDbType.Text, TrancheId.toString tranche) |> ignore - cmd.Parameters.AddWithValue("ConsumerGroup", NpgsqlDbType.Text, consumerGroup) |> ignore + let tryGetPosition (conn : NpgsqlConnection, schema : string) source tranche (consumerGroup : string) (ct : CancellationToken) = task { + let cmd = conn.CreateCommand(CommandText = $"select position from {schema}.{TableName} + where source = @Source + and tranche = @Tranche + and consumer_group = @ConsumerGroup") + cmd.Parameters.AddWithValue("Source", NpgsqlDbType.Text, SourceId.toString source) |> ignore + cmd.Parameters.AddWithValue("Tranche", NpgsqlDbType.Text, TrancheId.toString tranche) |> 
ignore + cmd.Parameters.AddWithValue("ConsumerGroup", NpgsqlDbType.Text, consumerGroup) |> ignore + use! reader = cmd.ExecuteReaderAsync(ct) + return if reader.Read() then ValueSome (reader.GetInt64 0) else ValueNone } - let! ct = Async.CancellationToken - use! reader = cmd.ExecuteReaderAsync(ct) |> Async.AwaitTaskCorrect - return if reader.Read() then ValueSome (reader.GetInt64 0) else ValueNone } + let exec connString f= async { + let! ct = Async.CancellationToken + use! conn = connect connString ct |> Async.AwaitTaskCorrect + return! f conn ct |> Async.AwaitTaskCorrect } -type CheckpointStore(connString : string, schema: string, consumerGroupName, defaultCheckpointFrequency) = - let connect = Npgsql.connect connString +type CheckpointStore(connString : string, schema : string, consumerGroupName, defaultCheckpointFrequency : System.TimeSpan) = - member _.CreateSchemaIfNotExists() = async { - let! ct = Async.CancellationToken - use! conn = connect ct |> Async.AwaitTaskCorrect - return! createIfNotExists (conn, schema) } + let exec f = Impl.exec connString f + let setPos source tranche pos = + let commit conn = Impl.commitPosition (conn, schema) source tranche consumerGroupName (Position.toInt64 pos) + exec commit - interface IFeedCheckpointStore with + member _.CreateSchemaIfNotExists() : Async = + let creat conn = Impl.createIfNotExists (conn, schema) + exec creat - member _.Start(source, tranche, ?establishOrigin) = async { - let! ct = Async.CancellationToken - use! conn = connect ct |> Async.AwaitTaskCorrect - let! maybePos = tryGetPosition (conn, schema) source tranche consumerGroupName - let! 
pos = - match maybePos, establishOrigin with - | ValueSome pos, _ -> async { return Position.parse pos } - | ValueNone, Some f -> f - | ValueNone, None -> async { return Position.initial } - return defaultCheckpointFrequency, pos } + member _.Override(source, tranche, pos : Position) : Async = + setPos source tranche pos - member _.Commit(source, tranche, pos) = async { - let! ct = Async.CancellationToken - use! conn = connect ct |> Async.AwaitTaskCorrect - return! commitPosition (conn, schema) source tranche consumerGroupName (Position.toInt64 pos) } + interface IFeedCheckpointStore with + member _.Start(source, tranche, ?establishOrigin) = + let start conn ct = task { + let! maybePos = Impl.tryGetPosition (conn, schema) source tranche consumerGroupName ct |> Async.AwaitTaskCorrect + let! pos = + match maybePos, establishOrigin with + | ValueSome pos, _ -> async { return Position.parse pos } + | ValueNone, Some f -> f + | ValueNone, None -> async { return Position.initial } + return struct (defaultCheckpointFrequency, pos) } + exec start + member _.Commit(source, tranche, pos) : Async = + setPos source tranche pos diff --git a/src/Propulsion.MessageDb/Readme.md b/src/Propulsion.MessageDb/Readme.md index d7c6cd14..f8cc2135 100644 --- a/src/Propulsion.MessageDb/Readme.md +++ b/src/Propulsion.MessageDb/Readme.md @@ -14,12 +14,13 @@ let quickStart log stats categories handle = async { // The checkpoint store will receive the highest version // that has been handled and flushes it to the // table on an interval - let checkpoints = ReaderCheckpoint.CheckpointStore("Host=localhost; Port=5433; Username=postgres; Password=postgres", "public", groupName, TimeSpan.FromSeconds 10) + let checkpoints = ReaderCheckpoint.CheckpointStore("Host=localhost; Port=5433; Username=postgres; Password=postgres", + "public", groupName, TimeSpan.FromSeconds 10) // Creates the checkpoint table in the schema // You can also create this manually do! 
checkpoints.CreateSchemaIfNotExists() - let client = MessageDbCategoryClient("Host=localhost; Database=message_store; Port=5433; Username=message_store; Password=;") + let connStr = "Host=localhost; Database=message_store; Port=5433; Username=message_store; Password=;" let maxReadAhead = 100 let maxConcurrentStreams = 2 use sink = @@ -30,7 +31,7 @@ let quickStart log stats categories handle = async { use src = MessageDbSource( log, statsInterval = TimeSpan.FromMinutes 1, - client, batchSize = 1000, + connStr, batchSize = 1000, // Controls the time to wait once fully caught up // before requesting a new batch of events tailSleepInterval = TimeSpan.FromMilliseconds 100, diff --git a/src/Propulsion.MessageDb/Types.fs b/src/Propulsion.MessageDb/Types.fs new file mode 100644 index 00000000..6e0fbb7b --- /dev/null +++ b/src/Propulsion.MessageDb/Types.fs @@ -0,0 +1,6 @@ +namespace Propulsion.MessageDb + +open FSharp.UMX + +module internal FeedSourceId = + let wellKnownId : Propulsion.Feed.SourceId = UMX.tag "messageDb" diff --git a/tests/Propulsion.MessageDb.Integration/Tests.fs b/tests/Propulsion.MessageDb.Integration/Tests.fs index de960f1b..56c81164 100644 --- a/tests/Propulsion.MessageDb.Integration/Tests.fs +++ b/tests/Propulsion.MessageDb.Integration/Tests.fs @@ -1,25 +1,22 @@ -module Tests +module Propulsion.MessageDb.Integration.Tests -open System -open System.Collections.Generic -open System.Threading.Tasks open FSharp.Control open Npgsql open NpgsqlTypes -open Propulsion.Feed -open Propulsion.Streams -open TypeShape.UnionContract -open Xunit open Propulsion.MessageDb -open Swensen.Unquote open Propulsion.Infrastructure open Propulsion.Internal +open Swensen.Unquote +open System +open System.Collections.Generic +open System.Threading.Tasks +open Xunit module Simple = - type Hello = {name: string} + type Hello = { name : string} type Event = | Hello of Hello - interface IUnionContract + interface TypeShape.UnionContract.IUnionContract let codec = 
FsCodec.SystemTextJson.Codec.Create() let writeMessagesToCategory category = task { @@ -47,37 +44,38 @@ let ``It processes events for a category`` () = async { let category2 = $"{Guid.NewGuid():N}" do! writeMessagesToCategory category1 |> Async.AwaitTaskCorrect do! writeMessagesToCategory category2 |> Async.AwaitTaskCorrect - let reader = Core.MessageDbCategoryClient("Host=localhost; Database=message_store; Port=5433; Username=message_store; Password=;") + let connString = "Host=localhost; Database=message_store; Port=5433; Username=message_store; Password=;" let checkpoints = ReaderCheckpoint.CheckpointStore("Host=localhost; Database=message_store; Port=5433; Username=postgres; Password=postgres", "public", $"TestGroup{consumerGroup}", TimeSpan.FromSeconds 10) do! checkpoints.CreateSchemaIfNotExists() let stats = { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromMinutes 1, TimeSpan.FromMinutes 1) with member _.HandleExn(log, x) = () member _.HandleOk x = () } - let stop = ref (fun () -> ()) + let mutable stop = ignore let handled = HashSet<_>() - let handle struct(stream, evts: StreamSpan<_>) = async { - lock handled (fun _ -> for evt in evts do handled.Add((stream, evt.Index)) |> ignore) - test <@ Array.chooseV Simple.codec.TryDecode evts |> Array.forall ((=) (Simple.Hello {name = "world"})) @> + let handle struct (stream, events: Propulsion.Streams.Default.StreamSpan) = async { + lock handled (fun _ -> + for evt in events do + handled.Add((stream, evt.Index)) |> ignore) + test <@ Array.chooseV Simple.codec.TryDecode events |> Array.forall ((=) (Simple.Hello { name = "world" })) @> if handled.Count >= 2000 then - stop.contents() + stop () return struct (Propulsion.Streams.SpanResult.AllProcessed, ()) } use sink = Propulsion.Streams.Default.Config.Start(log, 2, 2, handle, stats, TimeSpan.FromMinutes 1) let source = MessageDbSource( log, TimeSpan.FromMinutes 1, - reader, 1000, TimeSpan.FromMilliseconds 100, + connString, 1000, TimeSpan.FromMilliseconds 100, 
checkpoints, sink, [| category1; category2 |]) use src = source.Start() - // who says you can't do backwards referencing in F# - stop.contents <- src.Stop + stop <- src.Stop Task.Delay(TimeSpan.FromSeconds 30).ContinueWith(fun _ -> src.Stop()) |> ignore do! src.AwaitShutdown() + // 2000 total events test <@ handled.Count = 2000 @> // 20 in each stream test <@ handled |> Array.ofSeq |> Array.groupBy fst |> Array.map (snd >> Array.length) |> Array.forall ((=) 20) @> // they were handled in order within streams - let ordering = handled |> Array.ofSeq |> Array.groupBy fst |> Array.map (snd >> Array.map snd) - test <@ ordering |> Array.forall ((=) [| 0L..19L |]) @> -} + let ordering = handled |> Seq.groupBy fst |> Seq.map (snd >> Seq.map snd >> Seq.toArray) |> Seq.toArray + test <@ ordering |> Array.forall ((=) [| 0L..19L |]) @> } diff --git a/tools/Propulsion.Tool/Args.fs b/tools/Propulsion.Tool/Args.fs index 303a128f..87f90cc3 100644 --- a/tools/Propulsion.Tool/Args.fs +++ b/tools/Propulsion.Tool/Args.fs @@ -271,25 +271,26 @@ module Mdb = | [] Category of string interface IArgParserTemplate with member a.Usage = a |> function - | ConnectionString _ -> $"Connection string for the postgres database housing message-db. (Optional if environment variable {CONNECTION_STRING} is defined)" + | ConnectionString _ -> $"Connection string for the postgres database housing message-db. (Optional if environment variable {CONNECTION_STRING} is defined)" | CheckpointConnectionString _ -> "Connection string used for the checkpoint store. 
If not specified, defaults to the connection string argument" - | Schema _ -> $"Schema that should contain the checkpoints table Optional if environment variable {SCHEMA} is defined" - | Category _ -> "The message-db categories to load" + | Schema _ -> $"Schema that should contain the checkpoints table Optional if environment variable {SCHEMA} is defined" + | Category _ -> "The message-db category to load (must specify >1 when projecting)" type Arguments(c : Configuration, p : ParseResults) = - let conn = p.TryGetResult ConnectionString |> Option.defaultWith (fun () -> c.MdbConnectionString) - let checkpointConn = p.TryGetResult CheckpointConnectionString |> Option.defaultValue conn + let conn () = p.TryGetResult ConnectionString |> Option.defaultWith (fun () -> c.MdbConnectionString) + let checkpointConn () = p.TryGetResult CheckpointConnectionString |> Option.defaultWith conn let schema = p.TryGetResult Schema |> Option.defaultWith (fun () -> c.MdbSchema) - member x.CreateClient() = Array.ofList (p.GetResults Category), Propulsion.MessageDb.Core.MessageDbCategoryClient(conn) + member x.CreateClient() = + Array.ofList (p.GetResults Category), conn () member x.CreateCheckpointStore(group) = - Propulsion.MessageDb.ReaderCheckpoint.CheckpointStore(checkpointConn, schema, group, TimeSpan.FromSeconds 5.) + Propulsion.MessageDb.ReaderCheckpoint.CheckpointStore(checkpointConn (), schema, group, TimeSpan.FromSeconds 5.) 
+ member x.CreateCheckpointStoreTable() = async { - let log = Log.Logger - let connStringWithoutPassword = NpgsqlConnectionStringBuilder(checkpointConn, Password = null) - log.Information("Authenticating with postgres using {connectionString}", connStringWithoutPassword.ToString()) - log.Information("Creating checkpoints table as {table}", $"{schema}.{Propulsion.MessageDb.ReaderCheckpoint.table}") + let connStringWithoutPassword = NpgsqlConnectionStringBuilder(checkpointConn (), Password = null) + Log.Information("Authenticating with postgres using {connectionString}", connStringWithoutPassword.ToString()) + Log.Information("Creating checkpoints table as {table}", $"{schema}.{Propulsion.MessageDb.ReaderCheckpoint.TableName}") let checkpointStore = x.CreateCheckpointStore("nil") do! checkpointStore.CreateSchemaIfNotExists() - log.Information("Table created") } + Log.Information("Table created") } diff --git a/tools/Propulsion.Tool/Program.fs b/tools/Propulsion.Tool/Program.fs index 73c56125..7152f417 100644 --- a/tools/Propulsion.Tool/Program.fs +++ b/tools/Propulsion.Tool/Program.fs @@ -1,7 +1,6 @@ module Propulsion.Tool.Program open Argu -open Propulsion.Feed open Propulsion.Internal // AwaitKeyboardInterruptAsTaskCanceledException open Propulsion.Tool.Args open Serilog @@ -70,7 +69,7 @@ and [] IndexParameters = | DynamoDbJson _ -> "Source DynamoDB JSON filename(s) to import (optional, omitting displays current state)" | MinSizeK _ -> "Index Stream minimum Item size in KiB. Default 48" | EventsPerBatch _ -> "Maximum Events to Ingest as a single batch. Default 10000" - | GapsLimit _ -> "Max Number of gaps to ouput to console. Default 10" + | GapsLimit _ -> "Max Number of gaps to output to console. Default 10" | Dynamo _ -> "Specify DynamoDB parameters." 
@@ -82,6 +81,7 @@ and [] CheckpointParameters = | [] Cosmos of ParseResults | [] Dynamo of ParseResults + | [] Pg of ParseResults interface IArgParserTemplate with member a.Usage = a |> function | Group _ -> "Consumer Group" @@ -91,6 +91,7 @@ and [] CheckpointParameters = | Cosmos _ -> "Specify CosmosDB parameters." | Dynamo _ -> "Specify DynamoDB parameters." + | Pg _ -> "Specify MessageDb parameters." and [] ProjectParameters = | [] ConsumerGroupName of string @@ -164,9 +165,10 @@ module Checkpoints = type Arguments(c, p : ParseResults) = member val StoreArgs = match p.GetSubCommand() with - | CheckpointParameters.Cosmos p -> Choice1Of2 (Args.Cosmos.Arguments (c, p)) - | CheckpointParameters.Dynamo p -> Choice2Of2 (Args.Dynamo.Arguments (c, p)) - | _ -> missingArg "Must specify `cosmos` or `dynamo` store" + | CheckpointParameters.Cosmos p -> Choice1Of3 (Args.Cosmos.Arguments (c, p)) + | CheckpointParameters.Dynamo p -> Choice2Of3 (Args.Dynamo.Arguments (c, p)) + | CheckpointParameters.Pg p -> Choice3Of3 (Args.Mdb.Arguments (c, p)) + | x -> missingArg $"unexpected subcommand %A{x}" let readOrOverride (c, p : ParseResults) = async { let a = Arguments(c, p) @@ -174,12 +176,15 @@ module Checkpoints = let! store, storeSpecFragment, overridePosition = async { let cache = Equinox.Cache (appName, sizeMb = 1) match a.StoreArgs with - | Choice1Of2 a -> + | Choice1Of3 a -> let! 
store = a.CreateCheckpointStore(group, cache, Log.forMetrics) return (store : Propulsion.Feed.IFeedCheckpointStore), "cosmos", fun pos -> store.Override(source, tranche, pos) - | Choice2Of2 a -> + | Choice2Of3 a -> let store = a.CreateCheckpointStore(group, cache, Log.forMetrics) - return store, $"dynamo -t {a.IndexTable}", fun pos -> store.Override(source, tranche, pos) } + return store, $"dynamo -t {a.IndexTable}", fun pos -> store.Override(source, tranche, pos) + | Choice3Of3 a -> + let store = a.CreateCheckpointStore(group) + return store, null, fun pos -> store.Override(source, tranche, pos) } Log.Information("Checkpoint Source {source} Tranche {tranche} Consumer Group {group}", source, tranche, group) match p.TryGetResult OverridePosition with | None -> @@ -188,9 +193,10 @@ module Checkpoints = | Some pos -> Log.Warning("Checkpoint Overriding to {pos}...", pos) do! overridePosition pos - let sid = Propulsion.Feed.ReaderCheckpoint.streamId (source, tranche, group) - let cmd = $"eqx dump '{Propulsion.Feed.ReaderCheckpoint.Category}-{sid}' {storeSpecFragment}" - Log.Information("Inspect via 👉 {cmd}", cmd) } + if storeSpecFragment <> null then + let sid = Propulsion.Feed.ReaderCheckpoint.streamId (source, tranche, group) + let cmd = $"eqx dump '{Propulsion.Feed.ReaderCheckpoint.Category}-{sid}' {storeSpecFragment}" + Log.Information("Inspect via 👉 {cmd}", cmd) } module Indexer = @@ -367,8 +373,8 @@ module Project = ?trancheIds = indexFilter ).Start() | Choice3Of3 sa -> - let checkpoints = sa.CreateCheckpointStore(group) let categories, client = sa.CreateClient() + let checkpoints = sa.CreateCheckpointStore(group) Propulsion.MessageDb.MessageDbSource( Log.Logger, stats.StatsInterval, client, defaultArg maxItems 100, TimeSpan.FromSeconds 0.5,