diff --git a/src/SuperSocket.Connection/IObjectPipe.cs b/src/SuperSocket.Connection/IObjectPipe.cs index 5655eef90..66b05aafb 100644 --- a/src/SuperSocket.Connection/IObjectPipe.cs +++ b/src/SuperSocket.Connection/IObjectPipe.cs @@ -26,4 +26,4 @@ interface ISupplyController void SupplyEnd(); } -} +} \ No newline at end of file diff --git a/src/SuperSocket.Connection/PipeConnectionBase.cs b/src/SuperSocket.Connection/PipeConnectionBase.cs index faa0d6c4d..51a26be93 100644 --- a/src/SuperSocket.Connection/PipeConnectionBase.cs +++ b/src/SuperSocket.Connection/PipeConnectionBase.cs @@ -66,11 +66,16 @@ protected void UpdateLastActiveTime() LastActiveTime = DateTimeOffset.Now; } - public async override IAsyncEnumerable RunAsync(IPipelineFilter pipelineFilter) + protected virtual IObjectPipe CreatePackagePipe(bool readAsDemand) { - var packagePipe = !Options.ReadAsDemand + return !readAsDemand ? new DefaultObjectPipe() : new DefaultObjectPipeWithSupplyControl(); + } + + public async override IAsyncEnumerable RunAsync(IPipelineFilter pipelineFilter) + { + var packagePipe = CreatePackagePipe(Options.ReadAsDemand); _packagePipe = packagePipe; _pipelineFilter = pipelineFilter; @@ -304,7 +309,7 @@ protected async Task ReadPipeAsync(PipeReader reader, IObjectPipe< } } - reader.Complete(); + CompleteReader(reader, _isDetaching); WriteEOFPackage(); } @@ -412,5 +417,10 @@ protected void OnError(string message, Exception e = null) else Logger?.LogError(message); } + + protected virtual void CompleteReader(PipeReader reader, bool isDetaching) + { + reader.Complete(); + } } } diff --git a/src/SuperSocket.Kestrel/KestrelPipeConnection.cs b/src/SuperSocket.Kestrel/KestrelPipeConnection.cs index 8ed40dd93..af5d9772e 100644 --- a/src/SuperSocket.Kestrel/KestrelPipeConnection.cs +++ b/src/SuperSocket.Kestrel/KestrelPipeConnection.cs @@ -1,4 +1,6 @@ -namespace SuperSocket.Kestrel; +using Microsoft.Extensions.Logging; + +namespace SuperSocket.Kestrel; using System; using System.IO; @@ -21,6 +23,19 @@ public KestrelPipeConnection(ConnectionContext context, ConnectionOptions option context.ConnectionClosed.Register(() => OnConnectionClosed()); LocalEndPoint = context.LocalEndPoint; RemoteEndPoint = context.RemoteEndPoint; + + if (options.ReadAsDemand) + { + Logger.LogWarning($"{nameof(KestrelPipeConnection)} doesn't support ReadAsDemand."); + } + } + + protected override void CompleteReader(PipeReader reader, bool isDetaching) + { + if (!isDetaching) + { + reader.Complete(); + } } protected override void OnClosed() @@ -31,11 +46,6 @@ protected override void OnClosed() base.OnClosed(); } - public override ValueTask DetachAsync() - { - throw new NotSupportedException($"Detach is not supported by {nameof(KestrelPipeConnection)}."); - } - protected override async void Close() { var context = _context; @@ -51,10 +61,8 @@ protected override async void Close() protected override void OnInputPipeRead(ReadResult result) { - if (!result.IsCanceled && !result.IsCompleted) - { + if (result is { IsCanceled: false, IsCompleted: false }) UpdateLastActiveTime(); - } } public override async ValueTask SendAsync(Action write, CancellationToken cancellationToken) @@ -69,7 +77,8 @@ public override async ValueTask SendAsync(ReadOnlyMemory buffer, Cancellat UpdateLastActiveTime(); } - public override async ValueTask SendAsync(IPackageEncoder packageEncoder, TPackage package, CancellationToken cancellationToken) + public override async ValueTask SendAsync(IPackageEncoder packageEncoder, TPackage package, + CancellationToken cancellationToken) { await base.SendAsync(packageEncoder, package, cancellationToken); UpdateLastActiveTime(); @@ -94,4 +103,4 @@ private void OnConnectionClosed() { Cancel(); } -} +} \ No newline at end of file diff --git a/test/SuperSocket.Tests/ClientTest.cs b/test/SuperSocket.Tests/ClientTest.cs index c190f8174..38116096a 100644 --- a/test/SuperSocket.Tests/ClientTest.cs +++ b/test/SuperSocket.Tests/ClientTest.cs @@ -14,7 +14,10 @@ using SuperSocket.Server.Abstractions; using SuperSocket.Server.Abstractions.Session; using System.Threading; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging.Abstractions; +using SuperSocket.Kestrel; using SuperSocket.WebSocket; namespace SuperSocket.Tests @@ -256,78 +259,92 @@ public async Task TestCommandLine(Type hostConfiguratorType) } } - [Fact] +[Fact] [Trait("Category", "TestDetachableConnection")] public async Task TestDetachableConnection() { IHostConfigurator hostConfigurator = new RegularHostConfigurator(); - await TestDetachableConnectionInternal(hostConfigurator, (_, socket) => - new StreamPipeConnection( - hostConfigurator.GetClientStream(socket).Result, - socket.RemoteEndPoint, - socket.LocalEndPoint, - new ConnectionOptions - { - Logger = DefaultLoggerFactory.CreateLogger(nameof(TestDetachableConnection)), - ReadAsDemand = true - }) - ); - - /* KestrelPipeConnection doesn't support Detach right now. - await TestDetachableConnectionInternal(new KestralConnectionHostConfigurator(), (server, socket) => - new KestrelPipeConnection( - server.ServiceProvider.GetService().Create(socket), + using (var server = CreateSocketServerBuilder(hostConfigurator) + .UsePackageHandler(async (s, p) => + { + await s.SendAsync(Utf8Encoding.GetBytes("PRE-" + p.Text + "\r\n")); + }).BuildAsServer()) + { + Assert.Equal("TestServer", server.Name); + + Assert.True(await server.StartAsync()); + OutputHelper.WriteLine("Server started."); + + using (var socket = hostConfigurator.CreateClient()) + { + await TestDetachableConnectionInternal(hostConfigurator, server, ser => new StreamPipeConnection( + hostConfigurator.GetClientStream(socket).Result, + socket.RemoteEndPoint, + socket.LocalEndPoint, new ConnectionOptions { Logger = DefaultLoggerFactory.CreateLogger(nameof(TestDetachableConnection)), - ReadAsDemand = false - } - ) - ); - */ - } + ReadAsDemand = true + }), () => socket.Connected); + } + + await server.StopAsync(); + } - async Task TestDetachableConnectionInternal(IHostConfigurator hostConfigurator, Func connectionFactory) - { using (var server = CreateSocketServerBuilder(hostConfigurator) - .UsePackageHandler(async (s, p) => - { - await s.SendAsync(Utf8Encoding.GetBytes("PRE-" + p.Text + "\r\n")); - }).BuildAsServer()) + .ConfigureServices((ctx, services) => services.AddSocketConnectionFactory()) + .UsePackageHandler(async (s, p) => + { + await s.SendAsync(Utf8Encoding.GetBytes("PRE-" + p.Text + "\r\n")); + }).UseKestrelPipeConnection().BuildAsServer()) { - Assert.Equal("TestServer", server.Name); Assert.True(await server.StartAsync()); OutputHelper.WriteLine("Server started."); - - using (var socket = hostConfigurator.CreateClient()) + var connectionFactory = server.ServiceProvider + .GetRequiredService(); + await using (var context = await connectionFactory.ConnectAsync(hostConfigurator.GetServerEndPoint())) { - var connection = connectionFactory(server, socket); + await TestDetachableConnectionInternal(hostConfigurator, server, ser => new KestrelPipeConnection( + context, + new ConnectionOptions + { + Logger = DefaultLoggerFactory.CreateLogger(nameof(TestDetachableConnection)), + ReadAsDemand = false + } + ), () => !context.ConnectionClosed.IsCancellationRequested); + } - await TestConnection(connection); + await server.StopAsync(); + } + } - OutputHelper.WriteLine("Before DetachAsync"); + async Task TestDetachableConnectionInternal(IHostConfigurator hostConfigurator, + IServer server, + Func connectionFactory, + Func checkConnectionFactory) + { + var connection = connectionFactory(server); - await connection.DetachAsync(); + await TestConnection(connection); - // the connection is still alive in the server - Assert.Equal(1, server.SessionCount); + OutputHelper.WriteLine("Before DetachAsync"); - // socket.Connected is is still connected - Assert.True(socket.Connected); + await connection.DetachAsync(); - // Attach the socket with another connection - connection = connectionFactory(server, socket); + // the connection is still alive in the server + Assert.Equal(1, server.SessionCount); - await TestConnection(connection); - } + // socket.Connected is is still connected + Assert.True(checkConnectionFactory()); - await server.StopAsync(); - } - } + // Attach the socket with another connection + connection = connectionFactory(server); + await TestConnection(connection); + } async Task TestConnection(IConnection connection) { var packagePipe = connection.RunAsync(new LinePipelineFilter()); diff --git a/test/SuperSocket.Tests/ServiceCollectionExtensions.cs b/test/SuperSocket.Tests/ServiceCollectionExtensions.cs new file mode 100644 index 000000000..bdc2932a7 --- /dev/null +++ b/test/SuperSocket.Tests/ServiceCollectionExtensions.cs @@ -0,0 +1,25 @@ +using System; +using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets; +using Microsoft.Extensions.DependencyInjection; + +namespace SuperSocket.Tests; + +public static class ServiceCollectionExtensions +{ + private const string SocketConnectionFactoryTypeName = + "Microsoft.AspNetCore.Server.Kestrel.Transport.Sockets.SocketConnectionFactory"; + + private static Type FindSocketConnectionFactory() + { + var assembly = typeof(SocketTransportOptions).Assembly; + var connectionFactoryType = assembly.GetType(SocketConnectionFactoryTypeName); + return connectionFactoryType ?? throw new NotSupportedException(SocketConnectionFactoryTypeName); + } + + public static IServiceCollection AddSocketConnectionFactory(this IServiceCollection services) + { + var factoryType = FindSocketConnectionFactory(); + return services.AddSingleton(typeof(IConnectionFactory), factoryType); + } +} \ No newline at end of file