From 027bd022ee76927bf1b9f0c188744af461ea9f0d Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sun, 15 Dec 2024 19:30:20 -0600 Subject: [PATCH] Cancel request reading upon shutdown Fixes #367 and #370. --- src/Driver/Http1Driver.php | 42 +++++++++++++++++++++++++-------- test/Driver/Http1DriverTest.php | 36 ++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 10 deletions(-) diff --git a/src/Driver/Http1Driver.php b/src/Driver/Http1Driver.php index c81cdc24..41f26f48 100644 --- a/src/Driver/Http1Driver.php +++ b/src/Driver/Http1Driver.php @@ -100,9 +100,10 @@ public function handleClient( $this->insertTimeout(); $headerSizeLimit = $this->headerSizeLimit; + $cancellation = $this->deferredCancellation->getCancellation(); try { - $buffer = $readableStream->read(); + $buffer = $readableStream->read($cancellation); if ($buffer === null) { $this->removeTimeout(); return; @@ -141,7 +142,7 @@ public function handleClient( ); } - $chunk = $readableStream->read(); + $chunk = $readableStream->read($cancellation); if ($chunk === null) { return; } @@ -413,7 +414,8 @@ public function handleClient( $this->suspendTimeout(); $this->currentBuffer = $buffer; - $this->handleRequest($request); + $this->pendingResponse = async($this->handleRequest(...), $request); + $this->pendingResponse->await(); $this->pendingResponseCount--; continue; @@ -486,7 +488,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void { ); } - $chunk = $this->readableStream->read(); + $chunk = $this->readableStream->read($cancellation); if ($chunk === null) { return; } @@ -514,7 +516,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void { if ($chunkLengthRemaining === 0) { while (!isset($buffer[1])) { - $chunk = $readableStream->read(); + $chunk = $readableStream->read($cancellation); if ($chunk === null) { return; } @@ -546,7 +548,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void { ); } - $chunk = $this->readableStream->read(); + $chunk = $this->readableStream->read($cancellation); if ($chunk === null) { return; } @@ -599,7 +601,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void { $remaining -= $bodyBufferSize; } - $body = $readableStream->read(); + $body = $readableStream->read($cancellation); if ($body === null) { return; } @@ -635,7 +637,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void { $bufferLength = \strlen($buffer); if (!$bufferLength) { - $chunk = $readableStream->read(); + $chunk = $readableStream->read($cancellation); if ($chunk === null) { return; } @@ -647,7 +649,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void { // These first two (extreme) edge cases prevent errors where the packet boundary ends after // the \r and before the \n at the end of a chunk. if ($bufferLength === $chunkLengthRemaining || $bufferLength === $chunkLengthRemaining + 1) { - $chunk = $readableStream->read(); + $chunk = $readableStream->read($cancellation); if ($chunk === null) { return; } @@ -704,7 +706,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void { $bodySize += $bodyBufferSize; } - $chunk = $readableStream->read(); + $chunk = $readableStream->read($cancellation); if ($chunk === null) { return; } @@ -756,6 +758,12 @@ static function (int $bodySize) use (&$bodySizeLimit): void { } } catch (StreamException) { // Client disconnected, finally block will clean up. + } catch (CancelledException) { + // Server shutting down. + if ($this->bodyQueue === null || !$this->pendingResponseCount) { + // Send a service unavailable response only if another response has not already been sent. + $this->sendServiceUnavailableResponse($request ?? null)->await(); + } } finally { $this->pendingResponse->finally(function (): void { $this->removeTimeout(); @@ -1023,6 +1031,19 @@ private function upgrade(Request $request, Response $response): void } } + /** + * Creates a service unavailable response from the error handler and sends that response to the client. + * + * @return Future + */ + private function sendServiceUnavailableResponse(?Request $request): Future + { + $response = $this->errorHandler->handleError(HttpStatus::SERVICE_UNAVAILABLE, request: $request); + $response->setHeader("connection", "close"); + + return $this->lastWrite = async($this->send(...), $this->lastWrite, $response); + } + /** * Creates an error response from the error handler and sends that response to the client. * @@ -1062,6 +1083,7 @@ public function stop(): void $this->pendingResponse->await(); $this->lastWrite?->await(); + $this->deferredCancellation->cancel(); } public function getApplicationLayerProtocols(): array diff --git a/test/Driver/Http1DriverTest.php b/test/Driver/Http1DriverTest.php index 34108a06..8aee798c 100644 --- a/test/Driver/Http1DriverTest.php +++ b/test/Driver/Http1DriverTest.php @@ -1209,4 +1209,40 @@ public function testTimeoutSuspendedDuringRequestHandler(): void self::assertStringStartsWith('HTTP/1.1 202', $output->buffer()); } + + public function testShutdownDuringRequestRead(): void + { + $driver = new Http1Driver( + new ClosureRequestHandler(fn () => self::fail('Request handler not expected to be called')), + $this->createMock(ErrorHandler::class), + new NullLogger, + ); + + $input = new Queue(); + $input->pushAsync( + // Insufficient request headers + "POST /post HTTP/1.1\r\n" . + "Host: localhost\r\n" . + "Content-Length: 100\r\n" + ); + + $output = new WritableBuffer(); + + async(fn () => $driver->handleClient( + $this->createClientMock(), + new ReadableIterableStream($input->iterate()), + $output, + )); + + delay(0.1); // Allow parser generator to run. + + $driver->stop(); + + delay(0.1); // Give time for cancellation to be processed. + + $input->complete(); + $output->close(); + + self::assertStringStartsWith('HTTP/1.0 503', $output->buffer()); + } }