diff --git a/Propulsion.sln b/Propulsion.sln index 74b061e5..79c5c36a 100644 --- a/Propulsion.sln +++ b/Propulsion.sln @@ -53,6 +53,8 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.DynamoStore", "s EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Propulsion.DynamoStore.Lambda", "src\Propulsion.DynamoStore.Lambda\Propulsion.DynamoStore.Lambda.fsproj", "{0EE2957C-92FC-4D8D-9783-A48B9BE032B7}" EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.DynamoStore", "..\equinox\src\Equinox.DynamoStore\Equinox.DynamoStore.fsproj", "{BC68A82F-B702-4742-97A7-7B04D4293E47}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -115,6 +117,10 @@ Global {0EE2957C-92FC-4D8D-9783-A48B9BE032B7}.Debug|Any CPU.Build.0 = Debug|Any CPU {0EE2957C-92FC-4D8D-9783-A48B9BE032B7}.Release|Any CPU.ActiveCfg = Release|Any CPU {0EE2957C-92FC-4D8D-9783-A48B9BE032B7}.Release|Any CPU.Build.0 = Release|Any CPU + {BC68A82F-B702-4742-97A7-7B04D4293E47}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BC68A82F-B702-4742-97A7-7B04D4293E47}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BC68A82F-B702-4742-97A7-7B04D4293E47}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BC68A82F-B702-4742-97A7-7B04D4293E47}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -134,6 +140,7 @@ Global {A1964569-C6A9-46D3-8DD9-01E04AF8CC81} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898} {0B1D505F-FD80-428E-92E5-4367E95E96DF} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898} {0EE2957C-92FC-4D8D-9783-A48B9BE032B7} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898} + {BC68A82F-B702-4742-97A7-7B04D4293E47} = {4670F7C4-A4FD-4E3F-B97C-99F9B3FC1898} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {DF04AF73-7412-46E5-9CC8-15CB48E3139A} diff --git a/src/Propulsion.DynamoStore.Lambda/Connector.fs b/src/Propulsion.DynamoStore.Lambda/Connector.fs index 45ad7da9..e9b95ca1 100644 --- a/src/Propulsion.DynamoStore.Lambda/Connector.fs +++ b/src/Propulsion.DynamoStore.Lambda/Connector.fs @@ -21,7 +21,7 @@ type Configuration(?tryGet) = open Equinox.DynamoStore type Connector(serviceUrl, accessKey, secretKey, table) = - let retries, timeout, queryMaxItems, maxBytes = 10, System.TimeSpan.FromSeconds 60., 50, 64*1024 + let retries, timeout, queryMaxItems, maxBytes = 10, System.TimeSpan.FromSeconds 60., 100, 48*1024 let conn = DynamoStoreConnector(serviceUrl, accessKey, secretKey, retries, timeout) let client = conn.CreateClient() diff --git a/src/Propulsion.DynamoStore.Lambda/aws-lambda-tools-defaults.json b/src/Propulsion.DynamoStore.Lambda/aws-lambda-tools-defaults.json index ea836ca0..357908c9 100644 --- a/src/Propulsion.DynamoStore.Lambda/aws-lambda-tools-defaults.json +++ b/src/Propulsion.DynamoStore.Lambda/aws-lambda-tools-defaults.json @@ -9,8 +9,8 @@ "region": "", "configuration": "Release", "function-runtime": "dotnet6", - "function-memory-size": 128, - "function-timeout": 120, + "function-memory-size": 192, + "function-timeout": 180, "function-architecture": "arm64", "function-handler": "Propulsion.DynamoStore.Lambda::Propulsion.DynamoStore.Lambda.Function::FunctionHandler" } diff --git a/src/Propulsion.DynamoStore/AppendsEpoch.fs b/src/Propulsion.DynamoStore/AppendsEpoch.fs index 2de08dc4..02c73cd0 100644 --- a/src/Propulsion.DynamoStore/AppendsEpoch.fs +++ b/src/Propulsion.DynamoStore/AppendsEpoch.fs @@ -110,15 +110,20 @@ type Service internal (shouldClose, resolve : AppendsTrancheId * AppendsEpochId let isSelf p = IndexStreamId.toStreamName p |> FsCodec.StreamName.splitCategoryAndId |> fst = Category if spans |> Array.exists (function { p = p } -> isSelf p) then invalidArg (nameof spans) "Writes to indices should be filtered prior to indexing" - decider.Transact(Ingest.decide shouldClose spans, if assumeEmpty = Some true then Equinox.AssumeEmpty else Equinox.AllowStale) + decider.TransactEx((fun (c : Equinox.ISyncContext<_>) -> async { + return Ingest.decide (shouldClose c.Version) spans c.State + }), (fun r _c -> r), if assumeEmpty = Some true then Equinox.AssumeEmpty else Equinox.AllowStale) module Config = let private resolveStream (context, cache) = let cat = Config.createUnoptimized Events.codec Fold.initial Fold.fold (context, Some cache) cat.Resolve - let create log maxItemsPerEpoch store = - let shouldClose totalItems = totalItems >= maxItemsPerEpoch + let create (log : Serilog.ILogger) (maxVersion, maxItemsPerEpoch) store = + let shouldClose version totalStreams = + let closing = version >= maxVersion || totalStreams >= maxItemsPerEpoch + if closing then log.Information("Closing v{version} Streams {streams}", version, totalStreams) + closing let resolve = streamName >> resolveStream store >> Config.createDecider log Service(shouldClose, resolve) diff --git a/src/Propulsion.DynamoStore/DynamoStoreIndexer.fs b/src/Propulsion.DynamoStore/DynamoStoreIndexer.fs index dc612320..12150661 100644 --- a/src/Propulsion.DynamoStore/DynamoStoreIndexer.fs +++ b/src/Propulsion.DynamoStore/DynamoStoreIndexer.fs @@ -1,13 +1,14 @@ namespace Propulsion.DynamoStore -type DynamoStoreIndexer(log : Serilog.ILogger, context, cache, ?maxItemsPerEpoch, ?storeLog) = +type DynamoStoreIndexer(log : Serilog.ILogger, context, cache, ?maxItemsPerEpoch, ?maxVersion, ?storeLog) = + let maxVersion = defaultArg maxVersion 2_000 let maxItemsPerEpoch = defaultArg maxItemsPerEpoch 100_000 do if maxItemsPerEpoch > AppendsEpoch.MaxItemsPerEpoch then invalidArg (nameof maxItemsPerEpoch) "Cannot exceed AppendsEpoch.MaxItemsPerEpoch" let storeLog = defaultArg storeLog log let log = log.ForContext() let ingester = - let epochs = AppendsEpoch.Config.create storeLog maxItemsPerEpoch (context, cache) + let epochs = AppendsEpoch.Config.create storeLog (maxVersion, maxItemsPerEpoch) (context, cache) let index = AppendsIndex.Config.create storeLog (context, cache) let createIngester trancheId = let log = log.ForContext("trancheId", trancheId) diff --git a/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj b/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj index aa29476b..01795afa 100644 --- a/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj +++ b/src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj @@ -24,11 +24,13 @@ - + + +