Skip to content

Commit

Permalink
Fix issue #371
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Dec 14, 2024
1 parent 955b246 commit 8870bdd
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 19 deletions.
9 changes: 8 additions & 1 deletion src/Driver/Http1Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,8 @@ public function handleClient(
continue;
}

$this->suspendTimeout();

$this->currentBuffer = $buffer;
$this->handleRequest($request);
$this->pendingResponseCount--;
Expand Down Expand Up @@ -740,7 +742,7 @@ static function (int $bodySize) use (&$bodySizeLimit): void {
$this->bodyQueue = null;
$queue->complete();

$this->updateTimeout();
$this->suspendTimeout();

if ($this->http2driver) {
continue;
Expand Down Expand Up @@ -802,6 +804,11 @@ private function updateTimeout(): void
self::getTimeoutQueue()->update($this->client, 0, $this->connectionTimeout);
}

private function suspendTimeout(): void
{
self::getTimeoutQueue()->suspend($this->client, 0);
}

private function removeTimeout(): void
{
self::getTimeoutQueue()->remove($this->client, 0);
Expand Down
46 changes: 30 additions & 16 deletions src/Driver/Http2Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use Amp\Http\Server\ClientException;
use Amp\Http\Server\Driver\Internal\AbstractHttpDriver;
use Amp\Http\Server\Driver\Internal\Http2Stream;
use Amp\Http\Server\Driver\Internal\StreamTimeoutTracker;
use Amp\Http\Server\ErrorHandler;
use Amp\Http\Server\Push;
use Amp\Http\Server\Request;
Expand Down Expand Up @@ -67,6 +68,8 @@ final class Http2Driver extends AbstractHttpDriver implements Http2Processor
private ReadableStream $readableStream;
private WritableStream $writableStream;

private StreamTimeoutTracker $timeoutTracker;

private int $serverWindow = self::DEFAULT_WINDOW_SIZE;

private int $clientWindow = self::DEFAULT_WINDOW_SIZE;
Expand Down Expand Up @@ -140,9 +143,12 @@ public function handleClient(
$this->readableStream = $readableStream;
$this->writableStream = $writableStream;

self::getTimeoutQueue()->insert($this->client, 0, fn () => $this->shutdown(
new ClientException($this->client, 'Shutting down connection due to inactivity'),
), $this->streamTimeout);
$this->timeoutTracker = new StreamTimeoutTracker(
$this->client,
self::getTimeoutQueue(),
$this->connectionTimeout,
fn () => $this->shutdown(new ClientException($this->client, 'Shutting down connection due to inactivity')),
);

$this->processClientInput();
}
Expand Down Expand Up @@ -196,9 +202,12 @@ public function handleClientWithBuffer(string $buffer, ReadableStream $readableS

$this->readableStream = $readableStream;

self::getTimeoutQueue()->insert($this->client, 0, fn () => $this->shutdown(
new ClientException($this->client, 'Shutting down connection due to inactivity'),
), $this->streamTimeout);
$this->timeoutTracker = new StreamTimeoutTracker(
$this->client,
self::getTimeoutQueue(),
$this->connectionTimeout,
fn () => $this->shutdown(new ClientException($this->client, 'Shutting down connection due to inactivity')),
);

$this->processClientInput($buffer);
}
Expand Down Expand Up @@ -231,7 +240,6 @@ private function processClientInput(?string $chunk = null): void
));
} finally {
$parser->cancel();
self::getTimeoutQueue()->remove($this->client, 0);
}
}

Expand Down Expand Up @@ -646,10 +654,9 @@ private function createStream(int $id, int $bodySizeLimit, int $flags = Http2Str
\assert(!isset($this->streams[$id]));

if ($id & 1) {
self::getTimeoutQueue()->insert(
$this->client,
$this->timeoutTracker->insert(
$id,
fn () => $this->releaseStream(
fn (int $id) => $this->releaseStream(
$id,
new ClientException($this->client, "Closing stream due to inactivity"),
),
Expand All @@ -672,7 +679,7 @@ private function releaseStream(int $id, ?ClientException $exception = null): voi
$this->streams[$id]->deferredCancellation->cancel();

if ($id & 1) {
self::getTimeoutQueue()->remove($this->client, $id);
$this->timeoutTracker->remove($id);
}

($this->bodyQueues[$id] ?? null)?->error(
Expand All @@ -692,10 +699,15 @@ private function releaseStream(int $id, ?ClientException $exception = null): voi

private function updateTimeout(int $id): void
{
self::getTimeoutQueue()->update($this->client, 0, $this->connectionTimeout);
if ($id & 1) {
$this->timeoutTracker->update($id, $this->streamTimeout);
}
}

private function suspendTimeout(int $id): void
{
if ($id & 1) {
self::getTimeoutQueue()->update($this->client, $id, $this->streamTimeout);
$this->timeoutTracker->suspend($id);
}
}

Expand Down Expand Up @@ -786,7 +798,7 @@ public function handlePing(string $data): void
{
if (!$this->pinged) {
// Ensure there are a few extra seconds for request after first ping.
self::getTimeoutQueue()->update($this->client, 0, 5);
$this->timeoutTracker->ping(5);
}

$this->pinged++;
Expand Down Expand Up @@ -899,8 +911,6 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool
// Header frames can be received on previously opened streams (trailer headers).
$this->remoteStreamId = \max($streamId, $this->remoteStreamId);

$this->updateTimeout($streamId);

if (isset($this->trailerDeferreds[$streamId]) && $stream->state & Http2Stream::RESERVED) {
if (!$streamEnded) {
throw new Http2ConnectionException(
Expand Down Expand Up @@ -932,12 +942,16 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool

unset($this->bodyQueues[$streamId], $this->trailerDeferreds[$streamId]);

$this->suspendTimeout($streamId);

$queue->complete();
$deferred->complete($headers);

return;
}

$this->updateTimeout($streamId);

if ($stream->state & Http2Stream::RESERVED) {
throw new Http2StreamException(
"Stream already reserved",
Expand Down
84 changes: 84 additions & 0 deletions src/Driver/Internal/StreamTimeoutTracker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
<?php

namespace Amp\Http\Server\Driver\Internal;

use Amp\Http\Server\Driver\Client;
use function Amp\async;
use function Amp\weakClosure;

/** @internal */
final class StreamTimeoutTracker
{
private readonly \Closure $onStreamTimeout;

/** @var array<int, \Closure(int): void> */
private array $callbacks = [];

private int $pingTimeout = 0;

/**
* @param \Closure():void $onConnectionTimeout
*/
public function __construct(
private readonly Client $client,
private readonly TimeoutQueue $timeoutQueue,
private readonly int $connectionTimeout,
\Closure $onConnectionTimeout,
) {
$this->onStreamTimeout = weakClosure(function (Client $client, int $streamId): void {
\assert(isset($this->callbacks[$streamId]), "Callback missing for stream ID " . $streamId);

$callback = $this->callbacks[$streamId];
unset($this->callbacks[$streamId]);

async($callback, $streamId)->ignore();

if (!$this->callbacks) {
$this->timeoutQueue->update($this->client, 0, \min(0, $this->pingTimeout - \time()));
}
});

$timeoutQueue->insert($this->client, 0, $onConnectionTimeout, $this->connectionTimeout);
}

public function __destruct()
{
$this->timeoutQueue->remove($this->client, 0);
}

public function insert(int $streamId, \Closure $onTimeout, int $timeout): void
{
$this->timeoutQueue->insert($this->client, $streamId, $this->onStreamTimeout, $timeout);
$this->callbacks[$streamId] = $onTimeout;
$this->timeoutQueue->suspend($this->client, 0);
}

public function ping(int $timeout): void
{
$this->pingTimeout = \time() + $timeout;

if (!$this->callbacks) {
$this->timeoutQueue->update($this->client, 0, $timeout);
}
}

public function update(int $streamId, int $timeout): void
{
$this->timeoutQueue->update($this->client, $streamId, $timeout);
}

public function suspend(int $streamId): void
{
$this->timeoutQueue->suspend($this->client, $streamId);
}

public function remove(int $streamId): void
{
$this->timeoutQueue->remove($this->client, $streamId);
unset($this->callbacks[$streamId]);

if (!$this->callbacks) {
$this->timeoutQueue->update($this->client, 0, $this->connectionTimeout);
}
}
}
10 changes: 9 additions & 1 deletion src/Driver/Internal/TimeoutQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,19 @@ private function makeId(Client $client, int $streamId): string
public function update(Client $client, int $streamId, int $timeout): void
{
$cacheId = $this->makeId($client, $streamId);
\assert(isset($this->callbacks[$cacheId]));
\assert(isset($this->callbacks[$cacheId], $this->streamNames[$client][$streamId]));

$this->timeoutCache->update($cacheId, $this->now + $timeout);
}

public function suspend(Client $client, int $streamId): void
{
$cacheId = $this->makeId($client, $streamId);
\assert(isset($this->callbacks[$cacheId], $this->streamNames[$client][$streamId]));

$this->timeoutCache->clear($cacheId);
}

/**
* Remove the given stream ID.
*/
Expand Down
2 changes: 1 addition & 1 deletion test/Driver/Http2DriverTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ public function testStreamClosesWhileAwaitingResponseRead(): void

$requestQueue->push(self::packFrame(\pack("N", 0), Http2Parser::RST_STREAM, Http2Parser::NO_FLAG, 1));

delay(0); // Invoke onDispose handler.
delay(0.1); // Invoke onDispose handler.

self::assertTrue($invoked);
}
Expand Down

0 comments on commit 8870bdd

Please sign in to comment.