Skip to content

Commit

Permalink
Add Garbage Collection for Eventloop!
Browse files Browse the repository at this point in the history
  • Loading branch information
VennDev authored Aug 30, 2024
1 parent cf444d3 commit f25aa0a
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 3 deletions.
2 changes: 2 additions & 0 deletions src/vennv/vapm/AwaitGroup.php
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,10 @@ public function reset(): void

public function wait(): void
{
$gc = new GarbageCollection();
while ($this->count > 0) {
CoroutineGen::run();
$gc->collectWL();
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/vennv/vapm/CoroutineGen.php
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,11 @@ public static function runNonBlocking(mixed ...$coroutines): void
public static function runBlocking(mixed ...$coroutines): void
{
self::runNonBlocking(...$coroutines);

$gc = new GarbageCollection();
while (!self::$taskQueue?->isEmpty()) {
self::run();
$gc->collectWL();
}
}

Expand All @@ -141,11 +144,13 @@ private static function processCoroutine(mixed ...$coroutines): Closure

public static function repeat(callable $callback, int $times): Closure
{
$gc = new GarbageCollection();
for ($i = 0; $i < $times; $i++) {
$result = $callback();
if ($result instanceof Generator) {
$callback = self::processCoroutine($result);
}
$gc->collectWL();
}
return fn() => null;
}
Expand Down
2 changes: 2 additions & 0 deletions src/vennv/vapm/Deferred.php
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public static function awaitAll(DeferredInterface ...$deferreds): Generator
} else {
$childCoroutine->run();
}
yield;
}

yield;
Expand All @@ -134,6 +135,7 @@ public static function awaitAny(DeferredInterface ...$deferreds): Generator
} else {
$childCoroutine->run();
}
yield;
}

yield;
Expand Down
21 changes: 18 additions & 3 deletions src/vennv/vapm/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ public static function getReturns(): Generator
*/
private static function clearGarbage(): void
{
foreach (self::getReturns() as $id => $promise) if ($promise instanceof Promise && $promise->canDrop()) unset(self::$returns[$id]);
$gc = new GarbageCollection();
foreach (self::getReturns() as $id => $promise) {
if ($promise instanceof Promise && $promise->canDrop()) unset(self::$returns[$id]);
$gc->collectWL();
}
}

/**
Expand All @@ -149,7 +153,10 @@ protected static function run(): void
$promise = self::$queues->dequeue();
$fiber = $promise->getFiber();
if ($fiber->isSuspended()) $fiber->resume();
if ($fiber->isTerminated() && ($promise->getStatus() !== StatusPromise::PENDING || $promise->isJustGetResult())) {
if (
$fiber->isTerminated() &&
($promise->getStatus() !== StatusPromise::PENDING || $promise->isJustGetResult())
) {
try {
$promise->isJustGetResult() && $promise->setResult($fiber->getReturn());
} catch (Throwable $e) {
Expand All @@ -172,7 +179,15 @@ protected static function run(): void
*/
protected static function runSingle(): void
{
while (!self::$queues->isEmpty() || (CoroutineGen::getTaskQueue() !== null && !CoroutineGen::getTaskQueue()->isEmpty()) || MicroTask::isPrepare() || MacroTask::isPrepare()) self::run();
$gc = new GarbageCollection();
while (
!self::$queues->isEmpty() ||
(CoroutineGen::getTaskQueue() !== null && !CoroutineGen::getTaskQueue()->isEmpty()) ||
MicroTask::isPrepare() || MacroTask::isPrepare()
) {
self::run();
$gc->collectWL();
}
}

}
88 changes: 88 additions & 0 deletions src/vennv/vapm/GarbageCollection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
<?php

/**
* Vapm - A library support for PHP about Async, Promise, Coroutine, Thread, GreenThread
* and other non-blocking methods. The library also includes some Javascript packages
* such as Express. The method is based on Fibers & Generator & Processes, requires
* you to have php version from >= 8.1
*
* Copyright (C) 2023 VennDev
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*/

declare(strict_types=1);

namespace vennv\vapm;

use function gc_collect_cycles;

interface GarbageCollectionInterface
{
/**
* Set the limit of the loop to collect garbage
*
* @param int $limit
* @return void
*/
public function setLimitLoop(int $limit): void;

/**
* Collect garbage with limit
*
* @return void
*/
public function collectWL(): void;

/**
* Collect garbage
*
* @return void
*/
public static function collect(): void;

}

final class GarbageCollection implements GarbageCollectionInterface
{


private int $countLoop = 0;

/**
* @param int $limitLoop The limit of the loop to collect garbage
*/
public function __construct(private int $limitLoop = 1000)
{
// TODO: Implement __construct() method.
}

public function setLimitLoop(int $limit): void
{
$this->limitLoop = $limit;
}

public function collectWL(): void
{
if ($this->countLoop >= $this->limitLoop) {
gc_collect_cycles();
$this->countLoop = 0;
} else {
++$this->countLoop;
}
}

public static function collect(): void
{
gc_collect_cycles();
}

}
2 changes: 2 additions & 0 deletions src/vennv/vapm/MacroTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,14 @@ public static function isPrepare(): bool

public static function run(): void
{
$gc = new GarbageCollection();
foreach (self::getTasks() as $task) {
/** @var SampleMacro $task */
if ($task->checkTimeOut()) {
$task->run();
!$task->isRepeat() ? self::removeTask($task) : $task->resetTimeOut();
}
$gc->collectWL();
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/vennv/vapm/MicroTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,15 @@ public static function isPrepare(): bool
*/
public static function run(): void
{
$gc = new GarbageCollection();
foreach (self::getTasks() as $id => $promise) {
/** @var Promise $promise */
$promise->useCallbacks();
$promise->setTimeEnd(microtime(true));
EventLoop::addReturn($promise);
/** @var int $id */
self::removeTask($id);
$gc->collectWL();
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/vennv/vapm/Work.php
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,11 @@ public function getAll(): Generator

public function run(): void
{
$gc = new GarbageCollection();
while (!$this->queue->isEmpty()) {
$work = $this->queue->dequeue();
if (is_callable($work)) $work();
$gc->collectWL();
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/vennv/vapm/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ public function run(callable $callback): Async
if ($threads >= 1) {
$promises = [];
$totalCountWorks = $work->count();

$gc = new GarbageCollection();
while ($this->isLocked() || $totalCountWorks > 0) {
if (!$this->isLocked()) {
if (count($promises) < $threads && $work->count() > 0) {
Expand All @@ -247,6 +249,7 @@ public function run(callable $callback): Async
}
}
}
$gc->collectWL();
FiberManager::wait();
}

Expand All @@ -264,6 +267,8 @@ public function run(callable $callback): Async
$this->collect($worker->get());
$worker->done();
}
FiberManager::wait();
$gc->collectWL();
}

$data = Async::await(Stream::flattenArray($this->get()));
Expand Down

0 comments on commit f25aa0a

Please sign in to comment.