Skip to content

Commit

Permalink
Update Eventloop + CoroutineGen!
Browse files Browse the repository at this point in the history
  • Loading branch information
VennDev authored Aug 29, 2024
1 parent 13ab195 commit 781971e
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 73 deletions.
6 changes: 6 additions & 0 deletions src/vennv/vapm/AwaitGroup.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@

use Generator;

/**
* @author VennDev <[email protected]>
* @package vennv\vapm
*
* This interface is used to create a await group object that can be used to wait for a group of coroutines to complete.
*/
interface AwaitGroupInterface
{

Expand Down
35 changes: 20 additions & 15 deletions src/vennv/vapm/CoroutineGen.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
use SplQueue;
use Generator;
use Throwable;
use function call_user_func;

interface CoroutineGenInterface
{
Expand Down Expand Up @@ -102,13 +101,12 @@ public static function runNonBlocking(mixed ...$coroutines): void
System::init();
self::$taskQueue ??= new SplQueue();
foreach ($coroutines as $coroutine) {
if (is_callable($coroutine)) $coroutine = call_user_func($coroutine);
if ($coroutine instanceof Generator) {
self::schedule(new ChildCoroutine($coroutine));
} else {
call_user_func(fn() => $coroutine);
}
$result = is_callable($coroutine) ? $coroutine() : $coroutine;
$result instanceof Generator
? self::schedule(new ChildCoroutine($result))
: $result;
}
self::run();
}

/**
Expand All @@ -119,7 +117,9 @@ public static function runNonBlocking(mixed ...$coroutines): void
public static function runBlocking(mixed ...$coroutines): void
{
self::runNonBlocking(...$coroutines);
while (self::$taskQueue?->isEmpty() === false) self::run();
while (!self::$taskQueue?->isEmpty()) {
self::run();
}
}

/**
Expand All @@ -130,18 +130,23 @@ private static function processCoroutine(mixed ...$coroutines): Closure
{
return function () use ($coroutines): void {
foreach ($coroutines as $coroutine) {
if (is_callable($coroutine)) {
$coroutine = call_user_func($coroutine);
}
!$coroutine instanceof Generator ? call_user_func(fn() => $coroutine) : self::schedule(new ChildCoroutine($coroutine));
$result = is_callable($coroutine) ? $coroutine() : $coroutine;
$result instanceof Generator
? self::schedule(new ChildCoroutine($result))
: $result;
}
self::run();
};
}

public static function repeat(callable $callback, int $times): Closure
{
for ($i = 0; $i <= $times; $i++) if (call_user_func($callback) instanceof Generator) $callback = self::processCoroutine($callback);
for ($i = 0; $i < $times; $i++) {
$result = $callback();
if ($result instanceof Generator) {
$callback = self::processCoroutine($result);
}
}
return fn() => null;
}

Expand All @@ -161,8 +166,8 @@ private static function schedule(ChildCoroutine $childCoroutine): void
*/
public static function run(): void
{
if (self::$taskQueue?->isEmpty() === false) {
$coroutine = self::$taskQueue->dequeue();
if (!self::$taskQueue?->isEmpty()) {
$coroutine = self::$taskQueue?->dequeue();
if ($coroutine instanceof ChildCoroutine && !$coroutine->isFinished()) {
self::schedule($coroutine->run());
}
Expand Down
6 changes: 6 additions & 0 deletions src/vennv/vapm/Deferred.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@

use Generator;

/**
* @author VennDev <[email protected]>
* @package vennv\vapm
*
* This interface is used to create a deferred object that can be used to get the result of a coroutine.
*/
interface DeferredInterface
{

Expand Down
27 changes: 10 additions & 17 deletions src/vennv/vapm/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
use SplQueue;
use Generator;
use Throwable;
use function count;
use const PHP_INT_MAX;

interface EventLoopInterface
Expand Down Expand Up @@ -58,12 +57,12 @@ public static function getReturns(): Generator;
class EventLoop implements EventLoopInterface
{

protected const LIMIT = 20;
protected const LIMIT = 20; // 20 times run

protected static int $nextId = 0;

/**
* @var SplQueue
* @var SplQueue<Promise>
*/
protected static SplQueue $queues;

Expand All @@ -74,7 +73,7 @@ class EventLoop implements EventLoopInterface

public static function init(): void
{
if (!isset(self::$queues)) self::$queues = new SplQueue();
self::$queues ??= new SplQueue();
}

public static function generateId(): int
Expand Down Expand Up @@ -142,34 +141,28 @@ private static function clearGarbage(): void
*/
protected static function run(): void
{
CoroutineGen::run(); // Run CoroutineGen
CoroutineGen::run();

$i = 0;
while (!self::$queues->isEmpty()) {
if ($i++ >= self::LIMIT) break;
/**
* @var Promise $promise
*/
while (!self::$queues->isEmpty() && $i++ < self::LIMIT) {
/** @var Promise $promise */
$promise = self::$queues->dequeue();

$id = $promise->getId();
$fiber = $promise->getFiber();

if ($fiber->isSuspended()) $fiber->resume();
if ($fiber->isTerminated() && ($promise->getStatus() !== StatusPromise::PENDING || $promise->isJustGetResult())) {
try {
if ($promise->isJustGetResult()) $promise->setResult($fiber->getReturn());
$promise->isJustGetResult() && $promise->setResult($fiber->getReturn());
} catch (Throwable $e) {
echo $e->getMessage();
}
MicroTask::addTask($id, $promise);
MicroTask::addTask($promise->getId(), $promise);
} else {
self::$queues->enqueue($promise);
}
}

if (MicroTask::isPrepare()) MicroTask::run();
if (MacroTask::isPrepare()) MacroTask::run();
MicroTask::isPrepare() && MicroTask::run();
MacroTask::isPrepare() && MacroTask::run();

self::clearGarbage();
}
Expand Down
7 changes: 7 additions & 0 deletions src/vennv/vapm/Mutex.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@

use Generator;

/**
* @author VennDev <[email protected]>
* @package vennv\vapm
*
* This class is used to create a mutex object that can be used to synchronize access to shared resources.
* Note: this just for coroutine, if you want to use it in other places, you need to implement it yourself.
*/
interface MutexInterface
{

Expand Down
105 changes: 66 additions & 39 deletions src/vennv/vapm/PHPUtils.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

use Throwable;

final class PHPUtils
interface PHPUtilsInterface
{

/**
Expand All @@ -35,7 +35,72 @@ final class PHPUtils
*
* @phpstan-param array<int|float|string|object> $array
* @throws Throwable
*
* This function is used to iterate over an array and call a callback function for each element.
*/
public static function forEach(array $array, callable $callback): Async;

/**
* @param array<int|float|string|object> $array
* @param callable $callback
* @return Async
*
* @phpstan-param array<int|float|string|object> $array
* @throws Throwable
*
* This function is used to map over an array and apply a callback function to each element.
*/
public static function arrayMap(array $array, callable $callback): Async;

/**
* @param array<int|float|string|object> $array
* @param callable $callback
* @return Async
*
* @phpstan-param array<int|float|string|object> $array
* @throws Throwable
*/
public static function arrayFilter(array $array, callable $callback): Async;

/**
* @param array<int|float|string|object> $array
* @param callable $callback
* @param mixed $initialValue
* @return Async
*
* @throws Throwable
*
* This function is used to reduce an array to a single value by applying a callback function to each element.
*/
public static function arrayReduce(array $array, callable $callback, mixed $initialValue): Async;

/**
* @param array<int|float|string|object> $array
* @param string $className
* @return Async
*
* @throws Throwable
*
* This function is used to check if all elements in an array are instances of a specific class.
*/
public static function instanceOfAll(array $array, string $className): Async;

/**
* @param array<int|float|string|object> $array
* @param string $className
* @return Async
*
* @throws Throwable
*
* This function is used to check if any element in an array is an instance of a specific class.
*/
public static function instanceOfAny(array $array, string $className): Async;

}

final class PHPUtils implements PHPUtilsInterface
{

public static function forEach(array $array, callable $callback): Async
{
return new Async(function () use ($array, $callback) {
Expand All @@ -46,14 +111,6 @@ public static function forEach(array $array, callable $callback): Async
});
}

/**
* @param array<int|float|string|object> $array
* @param callable $callback
* @return Async
*
* @phpstan-param array<int|float|string|object> $array
* @throws Throwable
*/
public static function arrayMap(array $array, callable $callback): Async
{
return new Async(function () use ($array, $callback) {
Expand All @@ -66,14 +123,6 @@ public static function arrayMap(array $array, callable $callback): Async
});
}

/**
* @param array<int|float|string|object> $array
* @param callable $callback
* @return Async
*
* @phpstan-param array<int|float|string|object> $array
* @throws Throwable
*/
public static function arrayFilter(array $array, callable $callback): Async
{
return new Async(function () use ($array, $callback) {
Expand All @@ -88,14 +137,6 @@ public static function arrayFilter(array $array, callable $callback): Async
});
}

/**
* @param array<int|float|string|object> $array
* @param callable $callback
* @param mixed $initialValue
* @return Async
*
* @throws Throwable
*/
public static function arrayReduce(array $array, callable $callback, mixed $initialValue): Async
{
return new Async(function () use ($array, $callback, $initialValue) {
Expand All @@ -108,13 +149,6 @@ public static function arrayReduce(array $array, callable $callback, mixed $init
});
}

/**
* @param array<int|float|string|object> $array
* @param string $className
* @return Async
*
* @throws Throwable
*/
public static function instanceOfAll(array $array, string $className): Async
{
return new Async(function () use ($array, $className) {
Expand All @@ -126,13 +160,6 @@ public static function instanceOfAll(array $array, string $className): Async
});
}

/**
* @param array<int|float|string|object> $array
* @param string $className
* @return Async
*
* @throws Throwable
*/
public static function instanceOfAny(array $array, string $className): Async
{
return new Async(function () use ($array, $className) {
Expand Down
6 changes: 6 additions & 0 deletions src/vennv/vapm/Promise.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
use function is_callable;
use function microtime;

/**
* @author VennDev <[email protected]>
* @package vennv\vapm
*
* This interface is used to create a promise object that can be used to get the result of a coroutine.
*/
interface PromiseInterface
{

Expand Down
Loading

0 comments on commit 781971e

Please sign in to comment.