diff --git a/src/vennv/vapm/AwaitGroup.php b/src/vennv/vapm/AwaitGroup.php index 8f27ef4e..2a6a23ff 100644 --- a/src/vennv/vapm/AwaitGroup.php +++ b/src/vennv/vapm/AwaitGroup.php @@ -112,8 +112,10 @@ public function reset(): void public function wait(): void { + $gc = new GarbageCollection(); while ($this->count > 0) { CoroutineGen::run(); + $gc->collectWL(); } } diff --git a/src/vennv/vapm/CoroutineGen.php b/src/vennv/vapm/CoroutineGen.php index b445af68..f67fc01b 100644 --- a/src/vennv/vapm/CoroutineGen.php +++ b/src/vennv/vapm/CoroutineGen.php @@ -117,8 +117,11 @@ public static function runNonBlocking(mixed ...$coroutines): void public static function runBlocking(mixed ...$coroutines): void { self::runNonBlocking(...$coroutines); + + $gc = new GarbageCollection(); while (!self::$taskQueue?->isEmpty()) { self::run(); + $gc->collectWL(); } } @@ -141,11 +144,13 @@ private static function processCoroutine(mixed ...$coroutines): Closure public static function repeat(callable $callback, int $times): Closure { + $gc = new GarbageCollection(); for ($i = 0; $i < $times; $i++) { $result = $callback(); if ($result instanceof Generator) { $callback = self::processCoroutine($result); } + $gc->collectWL(); } return fn() => null; } diff --git a/src/vennv/vapm/Deferred.php b/src/vennv/vapm/Deferred.php index 71410c4d..6d8516e4 100644 --- a/src/vennv/vapm/Deferred.php +++ b/src/vennv/vapm/Deferred.php @@ -111,6 +111,7 @@ public static function awaitAll(DeferredInterface ...$deferreds): Generator } else { $childCoroutine->run(); } + yield; } yield; @@ -134,6 +135,7 @@ public static function awaitAny(DeferredInterface ...$deferreds): Generator } else { $childCoroutine->run(); } + yield; } yield; diff --git a/src/vennv/vapm/EventLoop.php b/src/vennv/vapm/EventLoop.php index c9589dfd..5d02a661 100644 --- a/src/vennv/vapm/EventLoop.php +++ b/src/vennv/vapm/EventLoop.php @@ -133,7 +133,11 @@ public static function getReturns(): Generator */ private static function clearGarbage(): void { - foreach (self::getReturns() as $id => $promise) if ($promise instanceof Promise && $promise->canDrop()) unset(self::$returns[$id]); + $gc = new GarbageCollection(); + foreach (self::getReturns() as $id => $promise) { + if ($promise instanceof Promise && $promise->canDrop()) unset(self::$returns[$id]); + $gc->collectWL(); + } } /** @@ -149,7 +153,10 @@ protected static function run(): void $promise = self::$queues->dequeue(); $fiber = $promise->getFiber(); if ($fiber->isSuspended()) $fiber->resume(); - if ($fiber->isTerminated() && ($promise->getStatus() !== StatusPromise::PENDING || $promise->isJustGetResult())) { + if ( + $fiber->isTerminated() && + ($promise->getStatus() !== StatusPromise::PENDING || $promise->isJustGetResult()) + ) { try { $promise->isJustGetResult() && $promise->setResult($fiber->getReturn()); } catch (Throwable $e) { @@ -172,7 +179,15 @@ protected static function run(): void */ protected static function runSingle(): void { - while (!self::$queues->isEmpty() || (CoroutineGen::getTaskQueue() !== null && !CoroutineGen::getTaskQueue()->isEmpty()) || MicroTask::isPrepare() || MacroTask::isPrepare()) self::run(); + $gc = new GarbageCollection(); + while ( + !self::$queues->isEmpty() || + (CoroutineGen::getTaskQueue() !== null && !CoroutineGen::getTaskQueue()->isEmpty()) || + MicroTask::isPrepare() || MacroTask::isPrepare() + ) { + self::run(); + $gc->collectWL(); + } } } diff --git a/src/vennv/vapm/GarbageCollection.php b/src/vennv/vapm/GarbageCollection.php new file mode 100644 index 00000000..d1d82fba --- /dev/null +++ b/src/vennv/vapm/GarbageCollection.php @@ -0,0 +1,88 @@ += 8.1 + * + * Copyright (C) 2023 VennDev + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + */ + +declare(strict_types=1); + +namespace vennv\vapm; + +use function gc_collect_cycles; + +interface GarbageCollectionInterface +{ + /** + * Set the limit of the loop to collect garbage + * + * @param int $limit + * @return void + */ + public function setLimitLoop(int $limit): void; + + /** + * Collect garbage with limit + * + * @return void + */ + public function collectWL(): void; + + /** + * Collect garbage + * + * @return void + */ + public static function collect(): void; + +} + +final class GarbageCollection implements GarbageCollectionInterface +{ + + + private int $countLoop = 0; + + /** + * @param int $limitLoop The limit of the loop to collect garbage + */ + public function __construct(private int $limitLoop = 1000) + { + // TODO: Implement __construct() method. + } + + public function setLimitLoop(int $limit): void + { + $this->limitLoop = $limit; + } + + public function collectWL(): void + { + if ($this->countLoop >= $this->limitLoop) { + gc_collect_cycles(); + $this->countLoop = 0; + } else { + ++$this->countLoop; + } + } + + public static function collect(): void + { + gc_collect_cycles(); + } + +} diff --git a/src/vennv/vapm/MacroTask.php b/src/vennv/vapm/MacroTask.php index a35ff987..a43f633b 100644 --- a/src/vennv/vapm/MacroTask.php +++ b/src/vennv/vapm/MacroTask.php @@ -73,12 +73,14 @@ public static function isPrepare(): bool public static function run(): void { + $gc = new GarbageCollection(); foreach (self::getTasks() as $task) { /** @var SampleMacro $task */ if ($task->checkTimeOut()) { $task->run(); !$task->isRepeat() ? self::removeTask($task) : $task->resetTimeOut(); } + $gc->collectWL(); } } diff --git a/src/vennv/vapm/MicroTask.php b/src/vennv/vapm/MicroTask.php index 2bc8a569..daa0d250 100644 --- a/src/vennv/vapm/MicroTask.php +++ b/src/vennv/vapm/MicroTask.php @@ -68,6 +68,7 @@ public static function isPrepare(): bool */ public static function run(): void { + $gc = new GarbageCollection(); foreach (self::getTasks() as $id => $promise) { /** @var Promise $promise */ $promise->useCallbacks(); @@ -75,6 +76,7 @@ public static function run(): void EventLoop::addReturn($promise); /** @var int $id */ self::removeTask($id); + $gc->collectWL(); } } diff --git a/src/vennv/vapm/Work.php b/src/vennv/vapm/Work.php index 6308e99b..e00903f3 100644 --- a/src/vennv/vapm/Work.php +++ b/src/vennv/vapm/Work.php @@ -149,9 +149,11 @@ public function getAll(): Generator public function run(): void { + $gc = new GarbageCollection(); while (!$this->queue->isEmpty()) { $work = $this->queue->dequeue(); if (is_callable($work)) $work(); + $gc->collectWL(); } } diff --git a/src/vennv/vapm/Worker.php b/src/vennv/vapm/Worker.php index 29fb1819..362eb590 100644 --- a/src/vennv/vapm/Worker.php +++ b/src/vennv/vapm/Worker.php @@ -225,6 +225,8 @@ public function run(callable $callback): Async if ($threads >= 1) { $promises = []; $totalCountWorks = $work->count(); + + $gc = new GarbageCollection(); while ($this->isLocked() || $totalCountWorks > 0) { if (!$this->isLocked()) { if (count($promises) < $threads && $work->count() > 0) { @@ -247,6 +249,7 @@ public function run(callable $callback): Async } } } + $gc->collectWL(); FiberManager::wait(); } @@ -264,6 +267,8 @@ public function run(callable $callback): Async $this->collect($worker->get()); $worker->done(); } + FiberManager::wait(); + $gc->collectWL(); } $data = Async::await(Stream::flattenArray($this->get()));