Skip to content

Commit

Permalink
fixed a bug about setting nextPipelineFilter
Browse files Browse the repository at this point in the history
  • Loading branch information
kerryjiang committed Mar 24, 2024
1 parent cd8b62d commit b4589b0
Showing 1 changed file with 15 additions and 13 deletions.
28 changes: 15 additions & 13 deletions src/SuperSocket.Connection/PipeConnectionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,11 @@ protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<
{
if (buffer.Length > 0)
{
var needReadMore = ReaderBuffer(ref buffer, pipelineFilter, packagePipe, out consumed, out examined, out var nextPipelineFilter);
var needReadMore = ReaderBuffer(ref buffer, pipelineFilter, packagePipe, out consumed, out examined, out var currentPipelineFilter);

if (nextPipelineFilter != null)
if (currentPipelineFilter != null)
{
_pipelineFilter = pipelineFilter = nextPipelineFilter;
pipelineFilter = currentPipelineFilter;
}

if (!needReadMore)
Expand Down Expand Up @@ -299,7 +299,7 @@ protected void WriteEOFPackage()
_packagePipe.WirteEOF();
}

private bool ReaderBuffer<TPackageInfo>(ref ReadOnlySequence<byte> buffer, IPipelineFilter<TPackageInfo> pipelineFilter, IObjectPipe<TPackageInfo> packagePipe, out SequencePosition consumed, out SequencePosition examined, out IPipelineFilter<TPackageInfo> nextPipelineFilter)
private bool ReaderBuffer<TPackageInfo>(ref ReadOnlySequence<byte> buffer, IPipelineFilter<TPackageInfo> pipelineFilter, IObjectPipe<TPackageInfo> packagePipe, out SequencePosition consumed, out SequencePosition examined, out IPipelineFilter<TPackageInfo> currentPipelineFilter)
{
consumed = buffer.Start;
examined = buffer.End;
Expand All @@ -312,20 +312,22 @@ private bool ReaderBuffer<TPackageInfo>(ref ReadOnlySequence<byte> buffer, IPipe

while (true)
{
var currentPipelineFilter = pipelineFilter;
var prevPipelineFilter = pipelineFilter;
var filterSwitched = false;

var packageInfo = currentPipelineFilter.Filter(ref seqReader);
var packageInfo = pipelineFilter.Filter(ref seqReader);

nextPipelineFilter = currentPipelineFilter.NextFilter;
var nextFilter = pipelineFilter.NextFilter;

if (nextPipelineFilter != null)
if (nextFilter != null)
{
nextPipelineFilter.Context = currentPipelineFilter.Context; // pass through the context
pipelineFilter = nextPipelineFilter;
nextFilter.Context = pipelineFilter.Context; // pass through the context
_pipelineFilter = pipelineFilter = nextFilter;
filterSwitched = true;
}

currentPipelineFilter = pipelineFilter;

var bytesConsumed = seqReader.Consumed;
bytesConsumedTotal += bytesConsumed;

Expand Down Expand Up @@ -353,14 +355,14 @@ private bool ReaderBuffer<TPackageInfo>(ref ReadOnlySequence<byte> buffer, IPipe
consumed = buffer.GetPosition(bytesConsumedTotal);
return true;
}

// we should reset the previous pipeline filter after switch
currentPipelineFilter.Reset();
prevPipelineFilter.Reset();
}
else
{
// reset the pipeline filter after we parse one full package
currentPipelineFilter.Reset();
prevPipelineFilter.Reset();
packagePipe.Write(packageInfo);
}

Expand Down

0 comments on commit b4589b0

Please sign in to comment.