Skip to content

Commit

Permalink
Add files via upload
Browse files Browse the repository at this point in the history
  • Loading branch information
VennDev authored Aug 9, 2024
1 parent 0e03338 commit a27fa37
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 44 deletions.
68 changes: 30 additions & 38 deletions src/vennv/vapm/EventLoop.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

namespace vennv\vapm;

use Generator;
use SplObjectStorage;
use Throwable;
use function count;
Expand All @@ -44,9 +45,9 @@ public static function isQueue(int $id): bool;
public static function getQueue(int $id): ?Promise;

/**
* @return SplObjectStorage
* @return Generator
*/
public static function getQueues(): SplObjectStorage;
public static function getQueues(): Generator;

public static function addReturn(Promise $promise): void;

Expand All @@ -57,9 +58,9 @@ public static function isReturn(int $id): bool;
public static function getReturn(int $id): ?Promise;

/**
* @return SplObjectStorage
* @return Generator
*/
public static function getReturns(): SplObjectStorage;
public static function getReturns(): Generator;

}

Expand All @@ -74,14 +75,14 @@ class EventLoop implements EventLoopInterface
protected static SplObjectStorage $queues;

/**
* @var SplObjectStorage
* @var array<int, Promise>
*/
protected static SplObjectStorage $returns;
protected static array $returns = [];
protected static bool $isCleaningGarbage = false;

public static function init(): void
{
if (!isset(self::$queues)) self::$queues = new SplObjectStorage();
if (!isset(self::$returns)) self::$returns = new SplObjectStorage();
}

public static function generateId(): int
Expand All @@ -97,11 +98,8 @@ public static function addQueue(Promise $promise): void

public static function removeQueue(int $id): void
{
/**
* @var Promise $promise
*/
foreach (self::$queues as $promise) {
if ($promise->getId() === $id) {
if ($promise instanceof Promise && $promise->getId() === $id) {
self::$queues->offsetUnset($promise);
break;
}
Expand All @@ -123,57 +121,51 @@ public static function getQueue(int $id): ?Promise
}

/**
* @return SplObjectStorage
* @return Generator
*/
public static function getQueues(): SplObjectStorage
public static function getQueues(): Generator
{
return self::$queues;
foreach (self::$queues as $promise) {
yield $promise;
}
}

public static function addReturn(Promise $promise): void
{
if (!self::getReturn($promise->getId())) self::$returns->offsetSet($promise);
if (!isset(self::$returns[$promise->getId()])) self::$returns[$promise->getId()] = $promise;
}

public static function isReturn(int $id): bool
{
/* @var Promise $promise */
foreach (self::$returns as $promise) if ($promise instanceof Promise && $promise->getId() === $id) return true;
return false;
return isset(self::$returns[$id]);
}

public static function removeReturn(int $id): void
{
/**
* @var Promise $promise
*/
foreach (self::$returns as $promise) {
if ($promise->getId() === $id) {
self::$returns->offsetUnset($promise);
break;
}
}
if (self::isReturn($id)) unset(self::$returns[$id]);
}

public static function getReturn(int $id): ?Promise
{
/* @var Promise $promise */
foreach (self::$returns as $promise) if ($promise instanceof Promise && $promise->getId() === $id) return $promise;
return null;
return self::$returns[$id] ?? null;
}

/**
* @return SplObjectStorage
* @return Generator
*/
public static function getReturns(): SplObjectStorage
public static function getReturns(): Generator
{
return self::$returns;
foreach (self::$returns as $id => $promise) {
yield $id => $promise;
}
}

/**
* @throws Throwable
*/
private static function clearGarbage(): void
{
/* @var Promise $promise */
foreach (self::$returns as $promise) if ($promise instanceof Promise && $promise->canDrop()) self::removeReturn($promise->getId());
foreach (self::getReturns() as $id => $promise) if ($promise instanceof Promise && $promise->canDrop()) unset(self::$returns[$id]);
}

/**
Expand All @@ -186,7 +178,7 @@ protected static function run(): void
/**
* @var Promise $promise
*/
foreach (self::$queues as $promise) {
foreach (self::getQueues() as $promise) {
$id = $promise->getId();
$fiber = $promise->getFiber();

Expand All @@ -203,7 +195,7 @@ protected static function run(): void
echo $e->getMessage();
}
MicroTask::addTask($id, $promise);
self::removeQueue($id);
self::$queues->offsetUnset($promise); // Remove from queue
}
}

Expand All @@ -221,4 +213,4 @@ protected static function runSingle(): void
while (count(self::$queues) > 0 || count(MicroTask::getTasks()) > 0 || count(MacroTask::getTasks()) > 0 || count(GreenThread::getFibers()) > 0) self::run();
}

}
}
21 changes: 15 additions & 6 deletions src/vennv/vapm/Promise.php
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ public function finally(callable $callback): Promise
return $this;
}

/**
/**
* @throws Throwable
*/
public function useCallbacks(): void
Expand All @@ -402,11 +402,12 @@ public function useCallbacks(): void
$callbacks = $this->callbacksResolve;

/** @var callable $master */
$master = $callbacks["master"];

$this->result = call_user_func($master, $result);
$master = $callbacks["master"] ?? null;

unset($callbacks["master"]);
if (is_callable($master)) {
$this->result = call_user_func($master, $result);
unset($callbacks["master"]);
}

if (count($callbacks) > 0) {
/** @var callable $callback */
Expand Down Expand Up @@ -497,6 +498,7 @@ public static function all(array $promises): Promise
if ($return?->isRejected() === true) {
$reject($return->getResult());
$isSolved = true;
break;
}

if ($return?->isResolved() === true) {
Expand All @@ -509,6 +511,7 @@ public static function all(array $promises): Promise
$resolve($results);
$isSolved = true;
}
FiberManager::wait();
}

if (!$isSolved) FiberManager::wait();
Expand Down Expand Up @@ -549,6 +552,7 @@ public static function allSettled(array $promises): Promise
$resolve($results);
$isSolved = true;
}
FiberManager::wait();
}

if (!$isSolved === false) FiberManager::wait();
Expand Down Expand Up @@ -587,13 +591,15 @@ public static function any(array $promises): Promise
if ($return?->isResolved() === true) {
$resolve($return->getResult());
$isSolved = true;
break;
}
}

if (count($results) === $count) {
$reject($results);
$isSolved = true;
}
FiberManager::wait();
}

if ($isSolved === false) FiberManager::wait();
Expand Down Expand Up @@ -624,13 +630,16 @@ public static function race(array $promises): Promise
if ($return?->isRejected() === true) {
$reject($return->getResult());
$isSolved = true;
break;
}

if ($return?->isResolved() === true) {
$resolve($return->getResult());
$isSolved = true;
break;
}
}
FiberManager::wait();
}

if ($isSolved === false) FiberManager::wait();
Expand All @@ -642,4 +651,4 @@ public static function race(array $promises): Promise
return $promise;
}

}
}

0 comments on commit a27fa37

Please sign in to comment.