Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try making SuperSocket.Kestrel support Detach #765

Merged
merged 6 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/SuperSocket.Connection/IObjectPipe.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ interface ISupplyController

void SupplyEnd();
}
}
}
16 changes: 13 additions & 3 deletions src/SuperSocket.Connection/PipeConnectionBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,16 @@ protected void UpdateLastActiveTime()
LastActiveTime = DateTimeOffset.Now;
}

public async override IAsyncEnumerable<TPackageInfo> RunAsync<TPackageInfo>(IPipelineFilter<TPackageInfo> pipelineFilter)
protected virtual IObjectPipe<TPackageInfo> CreatePackagePipe<TPackageInfo>(bool readAsDemand)
{
var packagePipe = !Options.ReadAsDemand
return !readAsDemand
? new DefaultObjectPipe<TPackageInfo>()
: new DefaultObjectPipeWithSupplyControl<TPackageInfo>();
}

public async override IAsyncEnumerable<TPackageInfo> RunAsync<TPackageInfo>(IPipelineFilter<TPackageInfo> pipelineFilter)
{
var packagePipe = CreatePackagePipe<TPackageInfo>(Options.ReadAsDemand);

_packagePipe = packagePipe;
_pipelineFilter = pipelineFilter;
Expand Down Expand Up @@ -304,7 +309,7 @@ protected async Task ReadPipeAsync<TPackageInfo>(PipeReader reader, IObjectPipe<
}
}

reader.Complete();
CompleteReader(reader, _isDetaching);
WriteEOFPackage();
}

Expand Down Expand Up @@ -412,5 +417,10 @@ protected void OnError(string message, Exception e = null)
else
Logger?.LogError(message);
}

protected virtual void CompleteReader(PipeReader reader, bool isDetaching)
{
reader.Complete();
}
}
}
31 changes: 20 additions & 11 deletions src/SuperSocket.Kestrel/KestrelPipeConnection.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace SuperSocket.Kestrel;
using Microsoft.Extensions.Logging;

namespace SuperSocket.Kestrel;

using System;
using System.IO;
Expand All @@ -21,6 +23,19 @@ public KestrelPipeConnection(ConnectionContext context, ConnectionOptions option
context.ConnectionClosed.Register(() => OnConnectionClosed());
LocalEndPoint = context.LocalEndPoint;
RemoteEndPoint = context.RemoteEndPoint;

if (options.ReadAsDemand)
{
Logger.LogWarning($"{nameof(KestrelPipeConnection)} doesn't support ReadAsDemand.");
}
}

protected override void CompleteReader(PipeReader reader, bool isDetaching)
{
if (!isDetaching)
{
reader.Complete();
}
}

protected override void OnClosed()
Expand All @@ -31,11 +46,6 @@ protected override void OnClosed()
base.OnClosed();
}

public override ValueTask DetachAsync()
{
throw new NotSupportedException($"Detach is not supported by {nameof(KestrelPipeConnection)}.");
}

protected override async void Close()
{
var context = _context;
Expand All @@ -51,10 +61,8 @@ protected override async void Close()

protected override void OnInputPipeRead(ReadResult result)
{
if (!result.IsCanceled && !result.IsCompleted)
{
if (result is { IsCanceled: false, IsCompleted: false })
UpdateLastActiveTime();
}
}

public override async ValueTask SendAsync(Action<PipeWriter> write, CancellationToken cancellationToken)
Expand All @@ -69,7 +77,8 @@ public override async ValueTask SendAsync(ReadOnlyMemory<byte> buffer, Cancellat
UpdateLastActiveTime();
}

public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package, CancellationToken cancellationToken)
public override async ValueTask SendAsync<TPackage>(IPackageEncoder<TPackage> packageEncoder, TPackage package,
CancellationToken cancellationToken)
{
await base.SendAsync(packageEncoder, package, cancellationToken);
UpdateLastActiveTime();
Expand All @@ -94,4 +103,4 @@ private void OnConnectionClosed()
{
Cancel();
}
}
}
111 changes: 64 additions & 47 deletions test/SuperSocket.Tests/ClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
using SuperSocket.Server.Abstractions;
using SuperSocket.Server.Abstractions.Session;
using System.Threading;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using SuperSocket.Kestrel;
using SuperSocket.WebSocket;

namespace SuperSocket.Tests
Expand Down Expand Up @@ -147,7 +150,7 @@
if (e.SocketErrorCode == SocketError.AccessDenied || e.SocketErrorCode == SocketError.AddressAlreadyInUse)
continue;

throw e;

Check warning on line 153 in test/SuperSocket.Tests/ClientTest.cs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

Re-throwing caught exception changes stack information (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca2200)

Check warning on line 153 in test/SuperSocket.Tests/ClientTest.cs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

Re-throwing caught exception changes stack information (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca2200)

Check warning on line 153 in test/SuperSocket.Tests/ClientTest.cs

View workflow job for this annotation

GitHub Actions / build (windows-latest)

Re-throwing caught exception changes stack information (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca2200)

Check warning on line 153 in test/SuperSocket.Tests/ClientTest.cs

View workflow job for this annotation

GitHub Actions / build (windows-latest)

Re-throwing caught exception changes stack information (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca2200)

Check warning on line 153 in test/SuperSocket.Tests/ClientTest.cs

View workflow job for this annotation

GitHub Actions / build (macos-latest)

Re-throwing caught exception changes stack information (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca2200)

Check warning on line 153 in test/SuperSocket.Tests/ClientTest.cs

View workflow job for this annotation

GitHub Actions / build (macos-latest)

Re-throwing caught exception changes stack information (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca2200)
}

break;
Expand Down Expand Up @@ -256,78 +259,92 @@
}
}

[Fact]
[Fact]
[Trait("Category", "TestDetachableConnection")]
public async Task TestDetachableConnection()
{
IHostConfigurator hostConfigurator = new RegularHostConfigurator();

await TestDetachableConnectionInternal(hostConfigurator, (_, socket) =>
new StreamPipeConnection(
hostConfigurator.GetClientStream(socket).Result,
socket.RemoteEndPoint,
socket.LocalEndPoint,
new ConnectionOptions
{
Logger = DefaultLoggerFactory.CreateLogger(nameof(TestDetachableConnection)),
ReadAsDemand = true
})
);

/* KestrelPipeConnection doesn't support Detach right now.
await TestDetachableConnectionInternal(new KestralConnectionHostConfigurator(), (server, socket) =>
new KestrelPipeConnection(
server.ServiceProvider.GetService<SocketConnectionContextFactory>().Create(socket),
using (var server = CreateSocketServerBuilder<TextPackageInfo, LinePipelineFilter>(hostConfigurator)
.UsePackageHandler(async (s, p) =>
{
await s.SendAsync(Utf8Encoding.GetBytes("PRE-" + p.Text + "\r\n"));
}).BuildAsServer())
{
Assert.Equal("TestServer", server.Name);

Assert.True(await server.StartAsync());
OutputHelper.WriteLine("Server started.");

using (var socket = hostConfigurator.CreateClient())
{
await TestDetachableConnectionInternal(hostConfigurator, server, ser => new StreamPipeConnection(
hostConfigurator.GetClientStream(socket).Result,
socket.RemoteEndPoint,
socket.LocalEndPoint,
new ConnectionOptions
{
Logger = DefaultLoggerFactory.CreateLogger(nameof(TestDetachableConnection)),
ReadAsDemand = false
}
)
);
*/
}
ReadAsDemand = true
}), () => socket.Connected);
}

await server.StopAsync();
}

async Task TestDetachableConnectionInternal(IHostConfigurator hostConfigurator, Func<IServer, Socket, IConnection> connectionFactory)
{
using (var server = CreateSocketServerBuilder<TextPackageInfo, LinePipelineFilter>(hostConfigurator)
.UsePackageHandler(async (s, p) =>
{
await s.SendAsync(Utf8Encoding.GetBytes("PRE-" + p.Text + "\r\n"));
}).BuildAsServer())
.ConfigureServices((ctx, services) => services.AddSocketConnectionFactory())
.UsePackageHandler(async (s, p) =>
{
await s.SendAsync(Utf8Encoding.GetBytes("PRE-" + p.Text + "\r\n"));
}).UseKestrelPipeConnection().BuildAsServer())
{

Assert.Equal("TestServer", server.Name);

Assert.True(await server.StartAsync());
OutputHelper.WriteLine("Server started.");

using (var socket = hostConfigurator.CreateClient())
var connectionFactory = server.ServiceProvider
.GetRequiredService<Microsoft.AspNetCore.Connections.IConnectionFactory>();
await using (var context = await connectionFactory.ConnectAsync(hostConfigurator.GetServerEndPoint()))
{
var connection = connectionFactory(server, socket);
await TestDetachableConnectionInternal(hostConfigurator, server, ser => new KestrelPipeConnection(
context,
new ConnectionOptions
{
Logger = DefaultLoggerFactory.CreateLogger(nameof(TestDetachableConnection)),
ReadAsDemand = false
}
), () => !context.ConnectionClosed.IsCancellationRequested);
}

await TestConnection(connection);
await server.StopAsync();
}
}

OutputHelper.WriteLine("Before DetachAsync");
async Task TestDetachableConnectionInternal(IHostConfigurator hostConfigurator,
IServer server,
Func<IServer, IConnection> connectionFactory,
Func<bool> checkConnectionFactory)
{
var connection = connectionFactory(server);

await connection.DetachAsync();
await TestConnection(connection);

// the connection is still alive in the server
Assert.Equal(1, server.SessionCount);
OutputHelper.WriteLine("Before DetachAsync");

// socket.Connected is is still connected
Assert.True(socket.Connected);
await connection.DetachAsync();

// Attach the socket with another connection
connection = connectionFactory(server, socket);
// the connection is still alive in the server
Assert.Equal(1, server.SessionCount);

await TestConnection(connection);
}
// socket.Connected is is still connected
Assert.True(checkConnectionFactory());

await server.StopAsync();
}
}
// Attach the socket with another connection
connection = connectionFactory(server);

await TestConnection(connection);
}
async Task TestConnection(IConnection connection)
{
var packagePipe = connection.RunAsync(new LinePipelineFilter());
Expand Down
25 changes: 25 additions & 0 deletions test/SuperSocket.Tests/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets;
using Microsoft.Extensions.DependencyInjection;

namespace SuperSocket.Tests;

public static class ServiceCollectionExtensions
{
private const string SocketConnectionFactoryTypeName =
"Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactory";

private static Type FindSocketConnectionFactory()
{
var assembly = typeof(SocketTransportOptions).Assembly;
var connectionFactoryType = assembly.GetType(SocketConnectionFactoryTypeName);
return connectionFactoryType ?? throw new NotSupportedException(SocketConnectionFactoryTypeName);
}

public static IServiceCollection AddSocketConnectionFactory(this IServiceCollection services)
{
var factoryType = FindSocketConnectionFactory();
return services.AddSingleton(typeof(IConnectionFactory), factoryType);
}
}
Loading