From 742ae54a2f60ce5a1acf2fd2d2b3c8e4c59cb82a Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Fri, 13 Dec 2024 16:52:38 +0000 Subject: [PATCH] Fix new warnings (#3026) Motivation: 6.0.3 added new warnings about Sendable issues, so let's fix those to keep things warnings free. Modifications: A bunch of Sendable usage fixes. Result: Warnings free builds again! --- .../NIOAsyncAwaitDemo/AsyncChannelIO.swift | 7 +- Sources/NIOPosix/Bootstrap.swift | 8 +- .../AsyncTestingChannelTests.swift | 16 ++-- .../HTTPClientUpgradeTests.swift | 4 +- .../HTTPServerUpgradeTests.swift | 4 +- .../AsyncChannelBootstrapTests.swift | 30 ++++---- .../NIOScheduledCallbackTests.swift | 76 +++++++++---------- .../RawSocketBootstrapTests.swift | 36 +++++++-- Tests/NIOPosixTests/SocketChannelTest.swift | 3 +- .../UniversalBootstrapSupportTest.swift | 8 +- .../WebSocketClientEndToEndTests.swift | 53 +++++++++---- 11 files changed, 150 insertions(+), 95 deletions(-) diff --git a/Sources/NIOAsyncAwaitDemo/AsyncChannelIO.swift b/Sources/NIOAsyncAwaitDemo/AsyncChannelIO.swift index e5612e1f69..5e6f04a38d 100644 --- a/Sources/NIOAsyncAwaitDemo/AsyncChannelIO.swift +++ b/Sources/NIOAsyncAwaitDemo/AsyncChannelIO.swift @@ -24,8 +24,11 @@ struct AsyncChannelIO { } func start() async throws -> AsyncChannelIO { - try await channel.pipeline.addHandler(RequestResponseHandler()) - .get() + try await channel.eventLoop.submit { + try channel.pipeline.syncOperations.addHandler( + RequestResponseHandler() + ) + }.get() return self } diff --git a/Sources/NIOPosix/Bootstrap.swift b/Sources/NIOPosix/Bootstrap.swift index fa117e8576..e3b5710ffe 100644 --- a/Sources/NIOPosix/Bootstrap.swift +++ b/Sources/NIOPosix/Bootstrap.swift @@ -2420,6 +2420,8 @@ extension NIOPipeBootstrap { let channel: PipeChannel let pipeChannelInput: SelectablePipeHandle? let pipeChannelOutput: SelectablePipeHandle? + let hasNoInputPipe: Bool + let hasNoOutputPipe: Bool do { if let input = input { try self.validateFileDescriptorIsNotAFile(input) @@ -2430,6 +2432,8 @@ extension NIOPipeBootstrap { pipeChannelInput = input.flatMap { SelectablePipeHandle(takingOwnershipOfDescriptor: $0) } pipeChannelOutput = output.flatMap { SelectablePipeHandle(takingOwnershipOfDescriptor: $0) } + hasNoInputPipe = pipeChannelInput == nil + hasNoOutputPipe = pipeChannelOutput == nil do { channel = try self.hooks.makePipeChannel( eventLoop: eventLoop as! SelectableEventLoop, @@ -2458,10 +2462,10 @@ extension NIOPipeBootstrap { channel.registerAlreadyConfigured0(promise: promise) return promise.futureResult.map { result } }.flatMap { result -> EventLoopFuture in - if pipeChannelInput == nil { + if hasNoInputPipe { return channel.close(mode: .input).map { result } } - if pipeChannelOutput == nil { + if hasNoOutputPipe { return channel.close(mode: .output).map { result } } return channel.selectableEventLoop.makeSucceededFuture(result) diff --git a/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift b/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift index 1619f95638..5fa112f6d1 100644 --- a/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift +++ b/Tests/NIOEmbeddedTests/AsyncTestingChannelTests.swift @@ -21,7 +21,7 @@ import XCTest @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) class AsyncTestingChannelTests: XCTestCase { func testSingleHandlerInit() async throws { - class Handler: ChannelInboundHandler { + final class Handler: ChannelInboundHandler, Sendable { typealias InboundIn = Never } @@ -43,7 +43,7 @@ class AsyncTestingChannelTests: XCTestCase { } func testMultipleHandlerInit() async throws { - class Handler: ChannelInboundHandler, RemovableChannelHandler { + final class Handler: ChannelInboundHandler, RemovableChannelHandler, Sendable { typealias InboundIn = Never let identifier: String @@ -334,7 +334,7 @@ class AsyncTestingChannelTests: XCTestCase { try await XCTAsyncAssertTrue(await channel.finish().isClean) // channelInactive should fire only once. - XCTAssertEqual(inactiveHandler.inactiveNotifications, 1) + XCTAssertEqual(inactiveHandler.inactiveNotifications.load(ordering: .sequentiallyConsistent), 1) } func testEmbeddedLifecycle() async throws { @@ -355,7 +355,7 @@ class AsyncTestingChannelTests: XCTestCase { XCTAssertFalse(channel.isActive) } - private final class ExceptionThrowingInboundHandler: ChannelInboundHandler { + private final class ExceptionThrowingInboundHandler: ChannelInboundHandler, Sendable { typealias InboundIn = String public func channelRead(context: ChannelHandlerContext, data: NIOAny) { @@ -363,7 +363,7 @@ class AsyncTestingChannelTests: XCTestCase { } } - private final class ExceptionThrowingOutboundHandler: ChannelOutboundHandler { + private final class ExceptionThrowingOutboundHandler: ChannelOutboundHandler, Sendable { typealias OutboundIn = String typealias OutboundOut = Never @@ -372,12 +372,12 @@ class AsyncTestingChannelTests: XCTestCase { } } - private final class CloseInChannelInactiveHandler: ChannelInboundHandler { + private final class CloseInChannelInactiveHandler: ChannelInboundHandler, Sendable { typealias InboundIn = ByteBuffer - public var inactiveNotifications = 0 + public let inactiveNotifications = ManagedAtomic(0) public func channelInactive(context: ChannelHandlerContext) { - inactiveNotifications += 1 + inactiveNotifications.wrappingIncrement(by: 1, ordering: .sequentiallyConsistent) context.close(promise: nil) } } diff --git a/Tests/NIOHTTP1Tests/HTTPClientUpgradeTests.swift b/Tests/NIOHTTP1Tests/HTTPClientUpgradeTests.swift index 43cfbb4c6b..97d9df956a 100644 --- a/Tests/NIOHTTP1Tests/HTTPClientUpgradeTests.swift +++ b/Tests/NIOHTTP1Tests/HTTPClientUpgradeTests.swift @@ -318,7 +318,7 @@ private func assertPipelineContainsUpgradeHandler(channel: Channel) { @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) class HTTPClientUpgradeTestCase: XCTestCase { func setUpClientChannel( - clientHTTPHandler: RemovableChannelHandler, + clientHTTPHandler: RemovableChannelHandler & Sendable, clientUpgraders: [any TypedAndUntypedHTTPClientProtocolUpgrader], _ upgradeCompletionHandler: @escaping (ChannelHandlerContext) -> Void ) throws -> EmbeddedChannel { @@ -1063,7 +1063,7 @@ class HTTPClientUpgradeTestCase: XCTestCase { @available(macOS 13, iOS 16, tvOS 16, watchOS 9, *) final class TypedHTTPClientUpgradeTestCase: HTTPClientUpgradeTestCase { override func setUpClientChannel( - clientHTTPHandler: RemovableChannelHandler, + clientHTTPHandler: RemovableChannelHandler & Sendable, clientUpgraders: [any TypedAndUntypedHTTPClientProtocolUpgrader], _ upgradeCompletionHandler: @escaping (ChannelHandlerContext) -> Void ) throws -> EmbeddedChannel { diff --git a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift index 3a69f806e3..9049c5122d 100644 --- a/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift +++ b/Tests/NIOHTTP1Tests/HTTPServerUpgradeTests.swift @@ -1388,7 +1388,7 @@ class HTTPServerUpgradeTestCase: XCTestCase { XCTAssertNil(upgradeRequest.wrappedValue) upgradeHandlerCbFired.wrappedValue = true - _ = context.channel.pipeline.addHandler( + try! context.channel.pipeline.syncOperations.addHandler( CheckWeReadInlineAndExtraData( firstByteDonePromise: firstByteDonePromise, secondByteDonePromise: secondByteDonePromise, @@ -2145,7 +2145,7 @@ final class TypedHTTPServerUpgradeTestCase: HTTPServerUpgradeTestCase { XCTAssertNotNil(upgradeRequest.wrappedValue) upgradeHandlerCbFired.wrappedValue = true - _ = context.channel.pipeline.addHandler( + try! context.channel.pipeline.syncOperations.addHandler( CheckWeReadInlineAndExtraData( firstByteDonePromise: firstByteDonePromise, secondByteDonePromise: secondByteDonePromise, diff --git a/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift b/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift index 847a5ac782..68604ef0f4 100644 --- a/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift +++ b/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift @@ -284,7 +284,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { port: 0 ) { channel in channel.eventLoop.makeCompletedFuture { - try self.configureProtocolNegotiationHandlers(channel: channel) + try Self.configureProtocolNegotiationHandlers(channel: channel) } } @@ -366,7 +366,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { port: 0 ) { channel in channel.eventLoop.makeCompletedFuture { - try self.configureNestedProtocolNegotiationHandlers(channel: channel) + try Self.configureNestedProtocolNegotiationHandlers(channel: channel) } } @@ -508,7 +508,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { port: 0 ) { channel in channel.eventLoop.makeCompletedFuture { - try self.configureProtocolNegotiationHandlers(channel: channel) + try Self.configureProtocolNegotiationHandlers(channel: channel) } } @@ -958,7 +958,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { output: pipe2WriteFD ) { channel in channel.eventLoop.makeCompletedFuture { - try self.configureProtocolNegotiationHandlers(channel: channel) + try Self.configureProtocolNegotiationHandlers(channel: channel) } } } catch { @@ -1251,7 +1251,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { try channel.pipeline.syncOperations.addHandler( AddressedEnvelopingHandler(remoteAddress: SocketAddress(ipAddress: "127.0.0.1", port: 0)) ) - return try self.configureProtocolNegotiationHandlers( + return try Self.configureProtocolNegotiationHandlers( channel: channel, proposedALPN: nil, inboundID: 1, @@ -1275,7 +1275,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { try channel.pipeline.syncOperations.addHandler( AddressedEnvelopingHandler(remoteAddress: SocketAddress(ipAddress: "127.0.0.1", port: 0)) ) - return try self.configureProtocolNegotiationHandlers( + return try Self.configureProtocolNegotiationHandlers( channel: channel, proposedALPN: proposedALPN, inboundID: 2, @@ -1329,7 +1329,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { to: .init(ipAddress: "127.0.0.1", port: port) ) { channel in channel.eventLoop.makeCompletedFuture { - try self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN) + try Self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN) } } } @@ -1345,7 +1345,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { to: .init(ipAddress: "127.0.0.1", port: port) ) { channel in channel.eventLoop.makeCompletedFuture { - try self.configureNestedProtocolNegotiationHandlers( + try Self.configureNestedProtocolNegotiationHandlers( channel: channel, proposedOuterALPN: proposedOuterALPN, proposedInnerALPN: proposedInnerALPN @@ -1382,7 +1382,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { ) { channel in channel.eventLoop.makeCompletedFuture { try channel.pipeline.syncOperations.addHandler(AddressedEnvelopingHandler()) - return try self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN) + return try Self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN) } } } @@ -1418,13 +1418,13 @@ final class AsyncChannelBootstrapTests: XCTestCase { ) { channel in channel.eventLoop.makeCompletedFuture { try channel.pipeline.syncOperations.addHandler(AddressedEnvelopingHandler()) - return try self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN) + return try Self.configureProtocolNegotiationHandlers(channel: channel, proposedALPN: proposedALPN) } } } @discardableResult - private func configureProtocolNegotiationHandlers( + private static func configureProtocolNegotiationHandlers( channel: Channel, proposedALPN: TLSUserEventHandler.ALPN? = nil, inboundID: UInt8? = nil, @@ -1437,7 +1437,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { } @discardableResult - private func configureNestedProtocolNegotiationHandlers( + private static func configureNestedProtocolNegotiationHandlers( channel: Channel, proposedOuterALPN: TLSUserEventHandler.ALPN? = nil, proposedInnerALPN: TLSUserEventHandler.ALPN? = nil @@ -1456,7 +1456,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { try channel.pipeline.syncOperations.addHandler( TLSUserEventHandler(proposedALPN: proposedInnerALPN) ) - let negotiationFuture = try self.addTypedApplicationProtocolNegotiationHandler(to: channel) + let negotiationFuture = try Self.addTypedApplicationProtocolNegotiationHandler(to: channel) return negotiationFuture } @@ -1465,7 +1465,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { try channel.pipeline.syncOperations.addHandler( TLSUserEventHandler(proposedALPN: proposedInnerALPN) ) - let negotiationHandler = try self.addTypedApplicationProtocolNegotiationHandler(to: channel) + let negotiationHandler = try Self.addTypedApplicationProtocolNegotiationHandler(to: channel) return negotiationHandler } @@ -1481,7 +1481,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { } @discardableResult - private func addTypedApplicationProtocolNegotiationHandler( + private static func addTypedApplicationProtocolNegotiationHandler( to channel: Channel ) throws -> EventLoopFuture { let negotiationHandler = NIOTypedApplicationProtocolNegotiationHandler { diff --git a/Tests/NIOPosixTests/NIOScheduledCallbackTests.swift b/Tests/NIOPosixTests/NIOScheduledCallbackTests.swift index b115ac2f8f..9940b74e12 100644 --- a/Tests/NIOPosixTests/NIOScheduledCallbackTests.swift +++ b/Tests/NIOPosixTests/NIOScheduledCallbackTests.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import Atomics import NIOCore import NIOEmbedded import NIOPosix @@ -26,9 +27,6 @@ protocol ScheduledCallbackTestRequirements { // ELG-backed ELs need to be shutdown via the ELG. func shutdownEventLoop() async throws - - // This is here for NIOAsyncTestingEventLoop only. - func maybeInContext(_ body: @escaping @Sendable () throws -> R) async throws -> R } final class MTELGScheduledCallbackTests: _BaseScheduledCallbackTests { @@ -43,10 +41,6 @@ final class MTELGScheduledCallbackTests: _BaseScheduledCallbackTests { func shutdownEventLoop() async throws { try await self.group.shutdownGracefully() } - - func maybeInContext(_ body: @escaping @Sendable () throws -> R) async throws -> R { - try body() - } } override func setUp() async throws { @@ -66,10 +60,6 @@ final class NIOAsyncTestingEventLoopScheduledCallbackTests: _BaseScheduledCallba func shutdownEventLoop() async throws { await self._loop.shutdownGracefully() } - - func maybeInContext(_ body: @escaping @Sendable () throws -> R) async throws -> R { - try await self._loop.executeInContext(body) - } } override func setUp() async throws { @@ -98,10 +88,6 @@ extension _BaseScheduledCallbackTests { func shutdownEventLoop() async throws { try await self.requirements.shutdownEventLoop() } - - func maybeInContext(_ body: @escaping @Sendable () throws -> R) async throws -> R { - try await self.requirements.maybeInContext(body) - } } // The tests, abstracted over any of the event loops. @@ -111,10 +97,10 @@ extension _BaseScheduledCallbackTests { let handler = MockScheduledCallbackHandler() _ = try self.loop.scheduleCallback(in: .milliseconds(1), handler: handler) - try await self.maybeInContext { handler.assert(callbackCount: 0, cancelCount: 0) } + handler.assert(callbackCount: 0, cancelCount: 0) try await self.advanceTime(by: .microseconds(1)) - try await self.maybeInContext { handler.assert(callbackCount: 0, cancelCount: 0) } + handler.assert(callbackCount: 0, cancelCount: 0) } func testSheduledCallbackExecutedAtDeadline() async throws { @@ -123,7 +109,7 @@ extension _BaseScheduledCallbackTests { _ = try self.loop.scheduleCallback(in: .milliseconds(1), handler: handler) try await self.advanceTime(by: .milliseconds(1)) try await handler.waitForCallback(timeout: .seconds(1)) - try await self.maybeInContext { handler.assert(callbackCount: 1, cancelCount: 0) } + handler.assert(callbackCount: 1, cancelCount: 0) } func testMultipleSheduledCallbacksUsingSameHandler() async throws { @@ -135,7 +121,7 @@ extension _BaseScheduledCallbackTests { try await self.advanceTime(by: .milliseconds(1)) try await handler.waitForCallback(timeout: .seconds(1)) try await handler.waitForCallback(timeout: .seconds(1)) - try await self.maybeInContext { handler.assert(callbackCount: 2, cancelCount: 0) } + handler.assert(callbackCount: 2, cancelCount: 0) _ = try self.loop.scheduleCallback(in: .milliseconds(2), handler: handler) _ = try self.loop.scheduleCallback(in: .milliseconds(3), handler: handler) @@ -143,7 +129,7 @@ extension _BaseScheduledCallbackTests { try await self.advanceTime(by: .milliseconds(3)) try await handler.waitForCallback(timeout: .seconds(1)) try await handler.waitForCallback(timeout: .seconds(1)) - try await self.maybeInContext { handler.assert(callbackCount: 4, cancelCount: 0) } + handler.assert(callbackCount: 4, cancelCount: 0) } func testMultipleSheduledCallbacksUsingDifferentHandlers() async throws { @@ -156,8 +142,8 @@ extension _BaseScheduledCallbackTests { try await self.advanceTime(by: .milliseconds(1)) try await handlerA.waitForCallback(timeout: .seconds(1)) try await handlerB.waitForCallback(timeout: .seconds(1)) - try await self.maybeInContext { handlerA.assert(callbackCount: 1, cancelCount: 0) } - try await self.maybeInContext { handlerB.assert(callbackCount: 1, cancelCount: 0) } + handlerA.assert(callbackCount: 1, cancelCount: 0) + handlerB.assert(callbackCount: 1, cancelCount: 0) } func testCancelExecutesCancellationCallback() async throws { @@ -165,7 +151,7 @@ extension _BaseScheduledCallbackTests { let scheduledCallback = try self.loop.scheduleCallback(in: .milliseconds(1), handler: handler) scheduledCallback.cancel() - try await self.maybeInContext { handler.assert(callbackCount: 0, cancelCount: 1) } + handler.assert(callbackCount: 0, cancelCount: 1) } func testCancelAfterDeadlineDoesNotExecutesCancellationCallback() async throws { @@ -175,7 +161,7 @@ extension _BaseScheduledCallbackTests { try await self.advanceTime(by: .milliseconds(1)) try await handler.waitForCallback(timeout: .seconds(1)) scheduledCallback.cancel() - try await self.maybeInContext { handler.assert(callbackCount: 1, cancelCount: 0) } + handler.assert(callbackCount: 1, cancelCount: 0) } func testCancelAfterCancelDoesNotCallCancellationCallbackAgain() async throws { @@ -184,7 +170,7 @@ extension _BaseScheduledCallbackTests { let scheduledCallback = try self.loop.scheduleCallback(in: .milliseconds(1), handler: handler) scheduledCallback.cancel() scheduledCallback.cancel() - try await self.maybeInContext { handler.assert(callbackCount: 0, cancelCount: 1) } + handler.assert(callbackCount: 0, cancelCount: 1) } func testCancelAfterShutdownDoesNotCallCancellationCallbackAgain() async throws { @@ -192,10 +178,10 @@ extension _BaseScheduledCallbackTests { let scheduledCallback = try self.loop.scheduleCallback(in: .milliseconds(1), handler: handler) try await self.shutdownEventLoop() - try await self.maybeInContext { handler.assert(callbackCount: 0, cancelCount: 1) } + handler.assert(callbackCount: 0, cancelCount: 1) scheduledCallback.cancel() - try await self.maybeInContext { handler.assert(callbackCount: 0, cancelCount: 1) } + handler.assert(callbackCount: 0, cancelCount: 1) } func testShutdownCancelsOutstandingScheduledCallbacks() async throws { @@ -203,7 +189,7 @@ extension _BaseScheduledCallbackTests { _ = try self.loop.scheduleCallback(in: .milliseconds(1), handler: handler) try await self.shutdownEventLoop() - try await self.maybeInContext { handler.assert(callbackCount: 0, cancelCount: 1) } + handler.assert(callbackCount: 0, cancelCount: 1) } func testShutdownDoesNotCancelCancelledCallbacksAgain() async throws { @@ -211,10 +197,10 @@ extension _BaseScheduledCallbackTests { let handle = try self.loop.scheduleCallback(in: .milliseconds(1), handler: handler) handle.cancel() - try await self.maybeInContext { handler.assert(callbackCount: 0, cancelCount: 1) } + handler.assert(callbackCount: 0, cancelCount: 1) try await self.shutdownEventLoop() - try await self.maybeInContext { handler.assert(callbackCount: 0, cancelCount: 1) } + handler.assert(callbackCount: 0, cancelCount: 1) } func testShutdownDoesNotCancelPastCallbacks() async throws { @@ -223,16 +209,16 @@ extension _BaseScheduledCallbackTests { _ = try self.loop.scheduleCallback(in: .milliseconds(1), handler: handler) try await self.advanceTime(by: .milliseconds(1)) try await handler.waitForCallback(timeout: .seconds(1)) - try await self.maybeInContext { handler.assert(callbackCount: 1, cancelCount: 0) } + handler.assert(callbackCount: 1, cancelCount: 0) try await self.shutdownEventLoop() - try await self.maybeInContext { handler.assert(callbackCount: 1, cancelCount: 0) } + handler.assert(callbackCount: 1, cancelCount: 0) } } -private final class MockScheduledCallbackHandler: NIOScheduledCallbackHandler { - var callbackCount = 0 - var cancelCount = 0 +private final class MockScheduledCallbackHandler: NIOScheduledCallbackHandler, Sendable { + let callbackCount = ManagedAtomic(0) + let cancelCount = ManagedAtomic(0) let callbackStream: AsyncStream private let callbackStreamContinuation: AsyncStream.Continuation @@ -246,17 +232,29 @@ private final class MockScheduledCallbackHandler: NIOScheduledCallbackHandler { } func handleScheduledCallback(eventLoop: some EventLoop) { - self.callbackCount += 1 + self.callbackCount.wrappingIncrement(by: 1, ordering: .sequentiallyConsistent) self.callbackStreamContinuation.yield() } func didCancelScheduledCallback(eventLoop: some EventLoop) { - self.cancelCount += 1 + self.cancelCount.wrappingIncrement(by: 1, ordering: .sequentiallyConsistent) } func assert(callbackCount: Int, cancelCount: Int, file: StaticString = #file, line: UInt = #line) { - XCTAssertEqual(self.callbackCount, callbackCount, "Unexpected callback count", file: file, line: line) - XCTAssertEqual(self.cancelCount, cancelCount, "Unexpected cancel count", file: file, line: line) + XCTAssertEqual( + self.callbackCount.load(ordering: .sequentiallyConsistent), + callbackCount, + "Unexpected callback count", + file: file, + line: line + ) + XCTAssertEqual( + self.cancelCount.load(ordering: .sequentiallyConsistent), + cancelCount, + "Unexpected cancel count", + file: file, + line: line + ) } func waitForCallback(timeout: TimeAmount, file: StaticString = #file, line: UInt = #line) async throws { diff --git a/Tests/NIOPosixTests/RawSocketBootstrapTests.swift b/Tests/NIOPosixTests/RawSocketBootstrapTests.swift index a65ad1ae91..a7c42c3c82 100644 --- a/Tests/NIOPosixTests/RawSocketBootstrapTests.swift +++ b/Tests/NIOPosixTests/RawSocketBootstrapTests.swift @@ -47,8 +47,13 @@ final class RawSocketBootstrapTests: XCTestCase { let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } let channel = try NIORawSocketBootstrap(group: elg) - .channelInitializer { - $0.pipeline.addHandler(DatagramReadRecorder(), name: "ByteReadRecorder") + .channelInitializer { channel in + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + DatagramReadRecorder(), + name: "ByteReadRecorder" + ) + } } .bind(host: "127.0.0.1", ipProtocol: .reservedForTesting).wait() defer { XCTAssertNoThrow(try channel.close().wait()) } @@ -93,15 +98,25 @@ final class RawSocketBootstrapTests: XCTestCase { let elg = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } let readChannel = try NIORawSocketBootstrap(group: elg) - .channelInitializer { - $0.pipeline.addHandler(DatagramReadRecorder(), name: "ByteReadRecorder") + .channelInitializer { channel in + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + DatagramReadRecorder(), + name: "ByteReadRecorder" + ) + } } .bind(host: "127.0.0.1", ipProtocol: .reservedForTesting).wait() defer { XCTAssertNoThrow(try readChannel.close().wait()) } let writeChannel = try NIORawSocketBootstrap(group: elg) - .channelInitializer { - $0.pipeline.addHandler(DatagramReadRecorder(), name: "ByteReadRecorder") + .channelInitializer { channel in + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + DatagramReadRecorder(), + name: "ByteReadRecorder" + ) + } } .bind(host: "127.0.0.1", ipProtocol: .reservedForTesting).wait() defer { XCTAssertNoThrow(try writeChannel.close().wait()) } @@ -147,8 +162,13 @@ final class RawSocketBootstrapTests: XCTestCase { defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } let channel = try NIORawSocketBootstrap(group: elg) .channelOption(.ipOption(.ip_hdrincl), value: 1) - .channelInitializer { - $0.pipeline.addHandler(DatagramReadRecorder(), name: "ByteReadRecorder") + .channelInitializer { channel in + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler( + DatagramReadRecorder(), + name: "ByteReadRecorder" + ) + } } .bind(host: "127.0.0.1", ipProtocol: .reservedForTesting).wait() defer { XCTAssertNoThrow(try channel.close().wait()) } diff --git a/Tests/NIOPosixTests/SocketChannelTest.swift b/Tests/NIOPosixTests/SocketChannelTest.swift index cb8161a7e5..a42eefb0fc 100644 --- a/Tests/NIOPosixTests/SocketChannelTest.swift +++ b/Tests/NIOPosixTests/SocketChannelTest.swift @@ -1138,11 +1138,12 @@ class DropAllReadsOnTheFloorHandler: ChannelDuplexHandler { // What we're trying to do here is forcing a close without calling `close`. We know that the other side of // the connection is fully closed but because we support half-closure, we need to write to 'learn' that the // other side has actually fully closed the socket. + let promise = self.waitUntilWriteFailedPromise func writeUntilError() { context.writeAndFlush(Self.wrapOutboundOut(buffer)).map { writeUntilError() }.whenFailure { (_: Error) in - self.waitUntilWriteFailedPromise.succeed(()) + promise.succeed(()) } } writeUntilError() diff --git a/Tests/NIOPosixTests/UniversalBootstrapSupportTest.swift b/Tests/NIOPosixTests/UniversalBootstrapSupportTest.swift index 8b671fce63..9d729c5532 100644 --- a/Tests/NIOPosixTests/UniversalBootstrapSupportTest.swift +++ b/Tests/NIOPosixTests/UniversalBootstrapSupportTest.swift @@ -83,7 +83,13 @@ class UniversalBootstrapSupportTest: XCTestCase { let client = try NIOClientTCPBootstrap(ClientBootstrap(group: group), tls: DummyTLSProvider()) .channelInitializer { channel in - channel.pipeline.addHandlers(counter1, DropChannelReadsHandler(), counter2) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandlers( + counter1, + DropChannelReadsHandler(), + counter2 + ) + } } .channelOption(.autoRead, value: false) .connectTimeout(.hours(1)) diff --git a/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift b/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift index 23202c2c4b..71a86bdc1e 100644 --- a/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift +++ b/Tests/NIOWebSocketTests/WebSocketClientEndToEndTests.swift @@ -170,7 +170,9 @@ class WebSocketClientEndToEndTests: XCTestCase { let basicUpgrader = NIOWebSocketClientUpgrader( requestKey: requestKey, upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -244,7 +246,9 @@ class WebSocketClientEndToEndTests: XCTestCase { let basicUpgrader = NIOWebSocketClientUpgrader( requestKey: requestKey, upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -275,7 +279,9 @@ class WebSocketClientEndToEndTests: XCTestCase { let basicUpgrader = NIOWebSocketClientUpgrader( requestKey: requestKey, upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -307,7 +313,9 @@ class WebSocketClientEndToEndTests: XCTestCase { let basicUpgrader = NIOWebSocketClientUpgrader( requestKey: requestKey, upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -332,13 +340,12 @@ class WebSocketClientEndToEndTests: XCTestCase { } fileprivate func runSuccessfulUpgrade() throws -> (EmbeddedChannel, WebSocketRecorderHandler) { - - let handler = WebSocketRecorderHandler() - let basicUpgrader = NIOWebSocketClientUpgrader( requestKey: "OfS0wDaT5NoxF2gqm7Zj2YtetzM=", upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(handler) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -361,6 +368,10 @@ class WebSocketClientEndToEndTests: XCTestCase { clientChannel.embeddedEventLoop.run() + // Ok, now grab the handler. We can do this with sync operations, because this is an + // EmbeddedChannel. + let handler = try clientChannel.pipeline.syncOperations.handler(type: WebSocketRecorderHandler.self) + return (clientChannel, handler) } @@ -501,7 +512,9 @@ final class TypedWebSocketClientEndToEndTests: WebSocketClientEndToEndTests { let basicUpgrader = NIOTypedWebSocketClientUpgrader( requestKey: requestKey, upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -570,7 +583,9 @@ final class TypedWebSocketClientEndToEndTests: WebSocketClientEndToEndTests { let basicUpgrader = NIOTypedWebSocketClientUpgrader( requestKey: requestKey, upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -603,7 +618,9 @@ final class TypedWebSocketClientEndToEndTests: WebSocketClientEndToEndTests { let basicUpgrader = NIOTypedWebSocketClientUpgrader( requestKey: requestKey, upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -637,7 +654,9 @@ final class TypedWebSocketClientEndToEndTests: WebSocketClientEndToEndTests { let basicUpgrader = NIOTypedWebSocketClientUpgrader( requestKey: requestKey, upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(WebSocketRecorderHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -665,12 +684,12 @@ final class TypedWebSocketClientEndToEndTests: WebSocketClientEndToEndTests { } override fileprivate func runSuccessfulUpgrade() throws -> (EmbeddedChannel, WebSocketRecorderHandler) { - let handler = WebSocketRecorderHandler() - let basicUpgrader = NIOTypedWebSocketClientUpgrader( requestKey: "OfS0wDaT5NoxF2gqm7Zj2YtetzM=", upgradePipelineHandler: { (channel: Channel, _: HTTPResponseHead) in - channel.pipeline.addHandler(handler) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(WebSocketRecorderHandler()) + } } ) @@ -695,6 +714,10 @@ final class TypedWebSocketClientEndToEndTests: WebSocketClientEndToEndTests { try upgradeResult.wait() + // Ok, now grab the handler. We can do this with sync operations, because this is an + // EmbeddedChannel. + let handler = try clientChannel.pipeline.syncOperations.handler(type: WebSocketRecorderHandler.self) + return (clientChannel, handler) } }