Skip to content

Commit

Permalink
DynamoStoreSource logging improvements
Browse files Browse the repository at this point in the history
Includes update to Equinox 4.0.0-beta.4.2
  • Loading branch information
bartelink committed May 16, 2022
1 parent 70b6e6d commit 56e0220
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/Propulsion.CosmosStore/Propulsion.CosmosStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

<PackageReference Include="FSharp.Core" Version="4.5.4" />

<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-beta.2" />
<PackageReference Include="Equinox.CosmosStore" Version="4.0.0-beta.4.2" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.2.2" />
</ItemGroup>

Expand Down
4 changes: 0 additions & 4 deletions src/Propulsion.CosmosStore/ReaderCheckpoint.fs
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,7 @@ type Service internal (resolve : SourceId * TrancheId * string -> Decider<Events
member _.Start(source, tranche, ?establishOrigin) : Async<TimeSpan * Position> =
let decider = resolve (source, tranche, consumerGroupName)
let establishOrigin = match establishOrigin with None -> async { return Position.initial } | Some f -> f
#if !COSMOSV2 && !COSMOSV3
decider.Transact(decideStart establishOrigin DateTimeOffset.UtcNow defaultCheckpointFrequency)
#else
decider.TransactAsync(decideStart establishOrigin DateTimeOffset.UtcNow defaultCheckpointFrequency)
#endif

/// Ingest a position update
/// NB fails if not already initialized; caller should ensure correct initialization has taken place via Read -> Start
Expand Down
6 changes: 2 additions & 4 deletions src/Propulsion.DynamoStore/AppendsEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ module Ingest =
| Start es
| Append es -> es
| Discard -> () |]
let decide shouldClose (inputs : Events.StreamSpan seq) = function
let decide shouldClose (inputs : Events.StreamSpan seq) : _ -> _ * _ = function
| ({ closed = false; versions = cur } as state : Fold.State) ->
let closed, ingested, events =
match tryToIngested state inputs with
Expand All @@ -110,9 +110,7 @@ type Service internal (shouldClose, resolve : AppendsTrancheId * AppendsEpochId

let isSelf p = IndexStreamId.toStreamName p |> FsCodec.StreamName.splitCategoryAndId |> fst = Category
if spans |> Array.exists (function { p = p } -> isSelf p) then invalidArg (nameof spans) "Writes to indices should be filtered prior to indexing"
decider.TransactEx((fun (c : Equinox.ISyncContext<_>) -> async {
return Ingest.decide (shouldClose c.Version) spans c.State
}), (fun r _c -> r), if assumeEmpty = Some true then Equinox.AssumeEmpty else Equinox.AllowStale)
decider.TransactEx((fun c -> Ingest.decide (shouldClose c.Version) spans c.State), if assumeEmpty = Some true then Equinox.AssumeEmpty else Equinox.AllowStale)

module Config =

Expand Down
41 changes: 33 additions & 8 deletions src/Propulsion.DynamoStore/DynamoStoreSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,37 @@ module private Impl =
let! version = epochs.ReadVersion(trancheId, epochId)
return Checkpoint.ofEpochAndOffset epochId version |> Checkpoint.toPosition }

/// Builds the handler used to report exceptions raised while reading.
/// Whether Verbose logging is enabled on `storeLog` is checked once, when `storeLog` is supplied;
/// the returned function then reuses that result for every exception it is handed.
let logReadFailure (storeLog : Serilog.ILogger) =
    let verboseEnabled = storeLog.IsEnabled Serilog.Events.LogEventLevel.Verbose
    fun failure ->
        match failure with
        // Throughput-exceeded failures are only reported when Verbose logging is switched on
        | Exceptions.ProvisionedThroughputExceeded when not verboseEnabled -> ()
        // Anything else is always surfaced as a Warning on the store log
        | other -> storeLog.Warning(other, "DynamoDb read failure")

/// Builds the handler used to report exceptions raised while committing.
/// Whether Verbose logging is enabled on `storeLog` is checked once, when `storeLog` is supplied;
/// the returned function then reuses that result for every exception it is handed.
let logCommitFailure (storeLog : Serilog.ILogger) =
    let verboseEnabled = storeLog.IsEnabled Serilog.Events.LogEventLevel.Verbose
    fun failure ->
        match failure with
        // Throughput-exceeded failures are only reported when Verbose logging is switched on
        | Exceptions.ProvisionedThroughputExceeded when not verboseEnabled -> ()
        // Anything else is always surfaced as a Warning on the store log
        | other -> storeLog.Warning(other, "DynamoDb commit failure")

/// Assembles a feed Batch from the supplied items, converting `checkpoint` to a Position
/// and carrying the `isTail` flag through unchanged.
let mkBatch checkpoint isTail items : Propulsion.Feed.Internal.Batch<_> =
    let position = Checkpoint.toPosition checkpoint
    { items = items; checkpoint = position; isTail = isTail }
/// Produces a Batch whose checkpoint is derived from `epochId` and `offset`; it is never flagged as the tail.
let sliceBatch epochId offset items =
    let checkpoint = Checkpoint.ofEpochAndOffset epochId offset
    mkBatch checkpoint false items
/// Produces a Batch whose checkpoint is derived from `epochId`, the state's `closed` flag and `version`.
/// The batch is flagged as the tail exactly when the state is not closed.
let finalBatch epochId (version, state : AppendsEpoch.Reader.State) items : Propulsion.Feed.Internal.Batch<_> =
    let isTail = not state.closed
    let checkpoint = Checkpoint.ofEpochClosedAndVersion epochId state.closed version
    mkBatch checkpoint isTail items

let readIndexedSpansAsStreamEvents log (maybeLoad, loadDop) batchCutoff (context : DynamoStoreContext) (AppendsTrancheId.Parse tid, Checkpoint.Parse (epochId, offset))
let readIndexedSpansAsStreamEvents (log : Serilog.ILogger, sourceId, storeLog) (maybeLoad, loadDop) batchCutoff (context : DynamoStoreContext) (AppendsTrancheId.Parse tid, Checkpoint.Parse (epochId, offset))
: AsyncSeq<System.TimeSpan * Propulsion.Feed.Internal.Batch<byte[]>> = asyncSeq {
let epochs = AppendsEpoch.Reader.Config.create log context
let epochs = AppendsEpoch.Reader.Config.create storeLog context
let sw = System.Diagnostics.Stopwatch.StartNew()
let! version, state = epochs.Read(tid, epochId, offset)
log.Debug("Loaded {c} ingestion records from {tid} {eid} {off}", state.changes.Length, tid, epochId, offset)
sw.Stop()
let streamEvents =
let all = state.changes |> Seq.collect (fun struct (_i, xs) -> xs) |> AppendsEpoch.flatten
all |> Seq.choose (fun span -> maybeLoad (IndexStreamId.toStreamName span.p) (span.i, span.c) |> Option.map (fun load -> span.p, load)) |> dict
let totalStreams, streamEvents =
let all = state.changes |> Seq.collect (fun struct (_i, xs) -> xs) |> AppendsEpoch.flatten |> Array.ofSeq
all.Length, all |> Seq.choose (fun span -> maybeLoad (IndexStreamId.toStreamName span.p) (span.i, span.c) |> Option.map (fun load -> span.p, load)) |> dict
if streamEvents.Count > batchCutoff then
log.Information("Reader {sourceId}/{trancheId}/{epochId}@{offset} Loaded Changes {changes} Streams {loadingCount}/{streamsCount}", sourceId, tid, epochId, offset, state.changes.Length, streamEvents.Count, totalStreams)
let buffer, cache = ResizeArray<AppendsEpoch.Events.StreamSpan>(), ConcurrentDictionary()
// For each batch we produce, we load any streams we have not already loaded at this time
let materializeSpans : Async<StreamEvent array> = async {
Expand All @@ -88,10 +102,17 @@ module private Impl =
// TOCONSIDER revise logic to share session key etc to rule this out
let events = Array.sub items (span.i - items[0].Index |> int) span.c.Length
for e in events do ({ stream = IndexStreamId.toStreamName span.p; event = e } : StreamEvent) |] }
let mutable prevLoaded = 0L
for i, spans in state.changes do
let pending = spans |> Array.filter (fun (span : AppendsEpoch.Events.StreamSpan) -> streamEvents.ContainsKey(span.p))
if buffer.Count <> 0 && buffer.Count + pending.Length > batchCutoff then
let! hydrated = materializeSpans
match cache.Count with
| loadedNow when prevLoaded <> loadedNow ->
prevLoaded <- loadedNow
log.Information("Reader {sourceId}/{trancheId}/{epochId}@{offset} Loaded {streams}s {events}e",
sourceId, tid, epochId, i, cache.Count, cache.Values |> Seq.sumBy Array.length)
| _ -> ()
yield sw.Elapsed, sliceBatch epochId i hydrated // not i + 1 as the batch does not include these changes
sw.Reset()
buffer.Clear()
Expand Down Expand Up @@ -139,10 +160,11 @@ type DynamoStoreSource
// Override default start position to be at the tail of the index (Default: Always replay all events)
?fromTail,
// Separated log for DynamoStore calls in order to facilitate filtering and/or gathering metrics
?storeLog) =
?storeLog,
?readFailureSleepInterval) =
inherit Propulsion.Feed.Internal.TailingFeedSource(log, statsInterval, sourceId, tailSleepInterval,
Impl.readIndexedSpansAsStreamEvents
(defaultArg storeLog log)
(log, sourceId, defaultArg storeLog log)
(LoadMode.map (defaultArg storeLog log) loadMode)
eventBatchLimit
(DynamoStoreContext indexClient),
Expand All @@ -151,7 +173,10 @@ type DynamoStoreSource
else Some (Impl.readTailPositionForTranche
(defaultArg storeLog log)
(DynamoStoreContext indexClient))),
sink)
sink,
Impl.logReadFailure (defaultArg storeLog log),
(defaultArg readFailureSleepInterval (tailSleepInterval*2.)),
Impl.logCommitFailure (defaultArg storeLog log))

member internal _.Pump() =
let context = DynamoStoreContext(indexClient)
Expand Down
2 changes: 1 addition & 1 deletion src/Propulsion.DynamoStore/Propulsion.DynamoStore.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

<PackageReference Include="FSharp.Core" Version="4.7.2" />

<!-- <PackageReference Include="Equinox.DynamoStore" Version="4.0.0-beta.4" />-->
<PackageReference Include="Equinox.DynamoStore" Version="4.0.0-beta.4.2" />
<PackageReference Include="FsCodec.SystemTextJson" Version="3.0.0-rc.2.2" />
</ItemGroup>
<ItemGroup>
Expand Down
24 changes: 13 additions & 11 deletions src/Propulsion.Feed/FeedReader.fs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ module private Impl =
let mutable batchLastPosition = Position.parse -1L
let mutable batchCaughtUp = false

let mutable pagesRead, pagesEmpty = 0, 0
let mutable readLatency, recentPagesRead, itemsRead, recentPagesEmpty = TimeSpan.Zero, 0, 0, 0
let mutable pagesRead, pagesEmpty, events = 0, 0, 0L
let mutable readLatency, recentPagesRead, recentEvents, recentPagesEmpty = TimeSpan.Zero, 0, 0, 0

let mutable ingestLatency, currentBatches, maxBatches = TimeSpan.Zero, 0, 0

Expand All @@ -57,14 +57,14 @@ module private Impl =
let p pos = match pos with p when p = Position.parse -1L -> Nullable() | x -> Nullable x
let m = Log.Metric.Read {
source = source; tranche = tranche
token = p batchLastPosition; latency = readLatency; pages = recentPagesRead; items = itemsRead
token = p batchLastPosition; latency = readLatency; pages = recentPagesRead; items = recentEvents
ingestLatency = ingestLatency; ingestQueued = currentBatches }
let readS, postS = readLatency.TotalSeconds, ingestLatency.TotalSeconds
(log |> Log.metric m).Information(
"Reader {source:l}/{tranche:l} Position {readPosition} Tail {caughtUp} Committed {lastCommittedPosition} Pages {pagesRead} Empty {pagesEmpty} | Recent {l:f1}s Pages {recentPagesRead} Empty {recentPagesEmpty} Items {itemsRead} | Wait {pausedS:f1}s Ahead {cur}/{max}",
source, tranche, p batchLastPosition, batchCaughtUp, p lastCommittedPosition, pagesRead, pagesEmpty, readS, recentPagesRead, recentPagesEmpty, itemsRead, postS, currentBatches, maxBatches)
"Reader {source:l}/{tranche:l} Position {readPosition} Tail {caughtUp} Committed {lastCommittedPosition} Pages {pagesRead} Empty {pagesEmpty} Events {events} | Recent {l:f1}s Pages {recentPagesRead} Empty {recentPagesEmpty} Events {recentEvents} | Wait {pausedS:f1}s Ahead {cur}/{max}",
source, tranche, p batchLastPosition, batchCaughtUp, p lastCommittedPosition, pagesRead, pagesEmpty, events, readS, recentPagesRead, recentPagesEmpty, recentEvents, postS, currentBatches, maxBatches)
readLatency <- TimeSpan.Zero; ingestLatency <- TimeSpan.Zero;
recentPagesRead <- 0; itemsRead <- 0; recentPagesEmpty <- 0
recentPagesRead <- 0; recentEvents <- 0; recentPagesEmpty <- 0

member _.RecordBatch(readTime, batch: Batch<_>) =
readLatency <- readLatency + readTime
Expand All @@ -74,7 +74,8 @@ module private Impl =
| 0 -> pagesEmpty <- pagesEmpty + 1
recentPagesEmpty <- recentPagesEmpty + 1
| c -> pagesRead <- pagesRead + 1
itemsRead <- itemsRead + c
events <- events + int64 c
recentEvents <- recentEvents + c
recentPagesRead <- recentPagesRead + 1

member _.UpdateCommittedPosition(pos) =
Expand Down Expand Up @@ -118,7 +119,8 @@ type FeedReader
* TrancheId// identifiers of source and tranche within that; a checkpoint is maintained per such pairing
* Position // index representing next read position in stream
// permitted to throw if it fails; failures are counted and/or retried with throttling
-> Async<unit>) =
-> Async<unit>,
?logCommitFailure) =

let log = log.ForContext("source", sourceId).ForContext("tranche", trancheId)
let stats = Stats(log, statsInterval, sourceId, trancheId)
Expand All @@ -127,9 +129,9 @@ type FeedReader
try do! commitCheckpoint (sourceId, trancheId, position)
stats.UpdateCommittedPosition(position)
log.Debug("Committed checkpoint {position}", position)
with exc ->
log.Warning(exc, "Exception while committing checkpoint {position}", position)
return! Async.Raise exc }
with e ->
match logCommitFailure with None -> log.ForContext<FeedReader>().Debug(e, "Exception while committing checkpoint {position}", position) | Some l -> l e
return! Async.Raise e }

let submitPage (readLatency, batch : Batch<byte[]>) = async {
stats.RecordBatch(readLatency, batch)
Expand Down
18 changes: 11 additions & 7 deletions src/Propulsion.Feed/FeedSource.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ open System
type FeedSourceBase internal
( log : Serilog.ILogger, statsInterval : TimeSpan, sourceId,
checkpoints : IFeedCheckpointStore, establishOrigin : (TrancheId -> Async<Position>) option,
sink : ProjectorPipeline<Ingestion.Ingester<seq<StreamEvent<byte[]>>, Submission.SubmissionBatch<int,StreamEvent<byte[]>>>>) =
sink : ProjectorPipeline<Ingestion.Ingester<seq<StreamEvent<byte[]>>, Submission.SubmissionBatch<int,StreamEvent<byte[]>>>>,
?logCommitFailure) =

let log = log.ForContext("source", sourceId)

let pumpPartition crawl partitionId trancheId = async {
let log = log.ForContext("tranche", trancheId)
let ingester : Ingestion.Ingester<_, _> = sink.StartIngester(log, partitionId)
let reader = FeedReader(log, sourceId, trancheId, statsInterval, crawl trancheId, ingester.Submit, checkpoints.Commit)
let reader = FeedReader(log, sourceId, trancheId, statsInterval, crawl trancheId, ingester.Submit, checkpoints.Commit, ?logCommitFailure = logCommitFailure)
try let! freq, pos = checkpoints.Start(sourceId, trancheId, ?establishOrigin = (match establishOrigin with None -> None | Some f -> Some (f trancheId)))
log.Information("Reading {source:l}/{tranche:l} From {pos} Checkpoint Event interval {checkpointFreq:n1}m", sourceId, trancheId, pos, freq.TotalMinutes)
return! reader.Pump(pos)
Expand Down Expand Up @@ -45,18 +46,21 @@ type TailingFeedSource
sourceId, tailSleepInterval : TimeSpan,
crawl : TrancheId * Position -> AsyncSeq<TimeSpan * Batch<_>>,
checkpoints : IFeedCheckpointStore, establishOrigin : (TrancheId -> Async<Position>) option,
sink : ProjectorPipeline<Ingestion.Ingester<seq<StreamEvent<byte[]>>, Submission.SubmissionBatch<int,StreamEvent<byte[]>>>>) =
inherit FeedSourceBase(log, statsInterval, sourceId, checkpoints, establishOrigin, sink)
sink : ProjectorPipeline<Ingestion.Ingester<seq<StreamEvent<byte[]>>, Submission.SubmissionBatch<int,StreamEvent<byte[]>>>>,
?logReadFailure,
?readFailureSleepInterval : TimeSpan,
?logCommitFailure) =
inherit FeedSourceBase(log, statsInterval, sourceId, checkpoints, establishOrigin, sink, ?logCommitFailure = logCommitFailure)

let crawl trancheId (wasLast, startPos) = asyncSeq {
if wasLast then
do! Async.Sleep tailSleepInterval
try let batches = crawl (trancheId, startPos)
for batch in batches do
yield batch
with e ->
log.Warning(e, "Read failure")
do! Async.Sleep tailSleepInterval }
with e -> // Swallow (and sleep, if requested) if there's an issue reading from a tailing log
match logReadFailure with None -> log.ForContext<TailingFeedSource>().Warning(e, "Read failure") | Some l -> l e
match readFailureSleepInterval with None -> () | Some interval -> do! Async.Sleep interval }

member _.Pump(readTranches) =
base.Pump(readTranches, crawl)
Expand Down

0 comments on commit 56e0220

Please sign in to comment.