Skip to content

Commit

Permalink
fix: fix deadlock caused by BatchSequence with timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
BEagle1984 committed May 9, 2023
1 parent d14ee3b commit 2d43f1d
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 16 deletions.
1 change: 1 addition & 0 deletions docs/releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ uid: releases
### Fixes

* Prevent an `ObjectDisposedException` to be thrown by the `BrokerCallbackInvoker` during application shutdown
* Fix possible deadlock in [BatchSequence](xref:Silverback.Messaging.Sequences.Batch.BatchSequence) with timeout

## [4.2.0](https://github.com/BEagle1984/silverback/releases/tag/v4.2.0)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,6 @@ protected virtual void Dispose(bool disposing)
_addingSemaphoreSlim.Wait();
_addingSemaphoreSlim.Dispose();

Context.Dispose();

// If necessary cancel the SequencerBehaviorsTask (if an error occurs between the two behaviors)
if (!SequencerBehaviorsTask.IsCompleted)
_sequencerBehaviorsTaskCompletionSource.TrySetCanceled();
Expand Down Expand Up @@ -621,8 +619,6 @@ private async Task<bool> RollbackTransactionAndNotifyProcessingCompletedAsync(Ex
case SequenceAbortReason.IncompleteSequence:
await Context.TransactionManager.RollbackAsync(exception, true).ConfigureAwait(false);
break;
case SequenceAbortReason.None:
throw new InvalidOperationException("Reason shouldn't be None.");
case SequenceAbortReason.ConsumerAborted:
case SequenceAbortReason.Disposing:
done = await Context.TransactionManager.RollbackAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,18 @@ public virtual async Task HandleAsync(ConsumerPipelineContext context, ConsumerB

// Loop again if the retrieved sequence has completed already in the meanwhile
if (!sequence.IsPending || sequence.IsCompleting)
{
// If the sequence was new, it means it was never handed over to the transaction handler
// (no message was added to the sequence so far, the timeout elapsed before the sequence
// was used).
if (sequence.IsNew)
{
await context.SequenceStore.RemoveAsync(sequence.SequenceId).ConfigureAwait(false);
sequence.Dispose();
}

continue;
}

AddToSequenceResult addResult =
await sequence.AddAsync(originalEnvelope, previousSequence).ConfigureAwait(false);
Expand All @@ -101,18 +112,7 @@ public virtual async Task HandleAsync(ConsumerPipelineContext context, ConsumerB
}

if (sequence.IsComplete)
{
await AwaitOtherBehaviorIfNeededAsync(sequence).ConfigureAwait(false);

// Mark the envelope as the end of the sequence only if the sequence wasn't swapped (e.g. chunk -> batch)
if (sequence.Context.Sequence == null || sequence == sequence.Context.Sequence ||
sequence.Context.Sequence.IsCompleting || sequence.Context.Sequence.IsComplete)
{
context.SetIsSequenceEnd();
}

_logger.LogSequenceCompleted(sequence);
}
await HandleCompletedSequenceAsync(context, sequence).ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -250,5 +250,19 @@ private static void StartActivityIfNeeded(ISequence sequence)

return sequence;
}

private async Task HandleCompletedSequenceAsync(ConsumerPipelineContext context, ISequence sequence)
{
await AwaitOtherBehaviorIfNeededAsync(sequence).ConfigureAwait(false);

// Mark the envelope as the end of the sequence only if the sequence wasn't swapped (e.g. chunk -> batch)
if (sequence.Context.Sequence == null || sequence == sequence.Context.Sequence ||
sequence.Context.Sequence.IsCompleting || sequence.Context.Sequence.IsComplete)
{
context.SetIsSequenceEnd();
}

_logger.LogSequenceCompleted(sequence);
}
}
}

0 comments on commit 2d43f1d

Please sign in to comment.