Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions src/Parallel/Pool/Swoole/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -350,21 +350,36 @@ public function shutdown(): void
if (\is_int($pid) && isset($pidsToWait[$pid])) {
unset($pidsToWait[$pid]);
}
} else {
if (\time() - $startTime > $maxWaitTime) {
foreach ($this->workers as $worker) {
if (isset($pidsToWait[$worker->pid])) {
SwooleProcess::kill($worker->pid, SIGKILL);
}
}
break;
continue;
}

// No child was ready to reap. Drop any workers that have already
// exited and been reaped elsewhere so we neither wait on nor signal
// a dead PID. kill($pid, 0) only probes for existence (no signal is
// sent) and, unlike SIGKILL, does not warn when the PID is gone.
foreach (\array_keys($pidsToWait) as $pid) {
if (!SwooleProcess::kill($pid, 0)) {
unset($pidsToWait[$pid]);
}
}

if (SwooleCoroutine::getCid() > 0) {
SwooleCoroutine::sleep(0.001); // 1ms
} else {
\usleep(1000);
if (empty($pidsToWait)) {
break;
}

if (\time() - $startTime > $maxWaitTime) {
foreach (\array_keys($pidsToWait) as $pid) {
if (SwooleProcess::kill($pid, 0)) {
SwooleProcess::kill($pid, SIGKILL);
}
}
break;
}

if (SwooleCoroutine::getCid() > 0) {
SwooleCoroutine::sleep(0.001); // 1ms
} else {
\usleep(1000);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/Promise/Adapter/Swoole/Coroutine.php
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ public static function all(array $promises): static
$channel->push(true);
return $value;
}, function ($err) use ($channel, &$error) {
$channel->push(true);
if ($error === null) {
$error = $err;
}
$channel->push(true);
});
$key++;
}
Expand Down Expand Up @@ -159,7 +159,7 @@ public static function race(array $promises): static
{
return self::create(function (callable $resolve, callable $reject) use ($promises) {
if (empty($promises)) {
$reject(new PromiseException('Cannot race with an empty array of promises'));
$reject(new Promise('Cannot race with an empty array of promises'));
return;
}

Expand Down
1 change: 1 addition & 0 deletions src/Timer/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* Static methods are provided for API consistency with other facades.
*
* @internal Use Utopia\Async\Timer facade instead
* @phpstan-consistent-constructor
* @package Utopia\Async\Timer
*/
abstract class Adapter
Expand Down
18 changes: 2 additions & 16 deletions src/Timer/Adapter/Sync.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@
*/
class Sync extends Adapter
{
/**
* Track whether tick timers should continue running
*
* @var array<int, bool>
*/
private array $running = [];

/**
* Sync adapter is always supported as it has no dependencies.
*
Expand Down Expand Up @@ -81,15 +74,10 @@ protected function doTick(int $milliseconds, callable $callback): int
'interval' => $milliseconds,
'type' => 'tick',
];
$this->running[$timerId] = true;

while ($this->running[$timerId] ?? false) {
while ($this->doExists($timerId)) {
\usleep($milliseconds * 1000);
if (isset($this->timers[$timerId]) && ($this->running[$timerId] ?? false)) {
$callback($timerId);
} else {
break;
}
$callback($timerId);
}

return $timerId;
Expand All @@ -108,7 +96,6 @@ protected function doClear(int $timerId): bool
}

unset($this->timers[$timerId]);
unset($this->running[$timerId]);

return true;
}
Expand All @@ -120,7 +107,6 @@ protected function doClear(int $timerId): bool
*/
protected function doClearAll(): void
{
$this->running = [];
$this->timers = [];
}
}
28 changes: 28 additions & 0 deletions tests/E2e/Promise/Swoole/CoroutineTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,34 @@ public function testAllWithRejection(): void
});
}

public function testAllRejectsWhenABranchRejectsAfterYielding(): void
{
// Regression: the rejection handler in all() must record the error
// before signalling the channel. If it pushes first, the awaiting
// coroutine can drain the channel and read a still-null error, causing
// all() to resolve instead of reject. Async branches that yield before
// rejecting (the realistic case) are what expose the ordering.
SwooleCoroutine\run(function () {
$promises = [
Coroutine::async(function () {
SwooleCoroutine::sleep(0.01);
throw new \Exception('delayed failure');
}),
Coroutine::async(function () {
SwooleCoroutine::sleep(0.01);
return 'ok';
}),
];

try {
Coroutine::all($promises)->await();
$this->fail('Expected rejection to propagate from all()');
} catch (\Exception $e) {
$this->assertEquals('delayed failure', $e->getMessage());
}
});
}

public function testRace(): void
{
SwooleCoroutine\run(function () {
Expand Down
2 changes: 0 additions & 2 deletions tests/E2e/Timer/Swoole/CoroutineTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ public function testAfterReturnsTimerId(): void
SwooleCoroutine\run(function () {
$timerId = Coroutine::after(100, function () {});

$this->assertIsInt($timerId);
$this->assertGreaterThan(0, $timerId);

Coroutine::clear($timerId);
Expand All @@ -160,7 +159,6 @@ public function testTickReturnsTimerId(): void
SwooleCoroutine\run(function () {
$timerId = Coroutine::tick(100, function () {});

$this->assertIsInt($timerId);
$this->assertGreaterThan(0, $timerId);

Coroutine::clear($timerId);
Expand Down
18 changes: 4 additions & 14 deletions tests/E2e/Timer/SyncTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,15 @@ public function testTickWithExternalClear(): void
public function testAfterReturnsTimerId(): void
{
$timerId = Sync::after(1, function () {});
$this->assertIsInt($timerId);
$this->assertGreaterThan(0, $timerId);
}

public function testTickReturnsTimerId(): void
{
$count = 0;
$timerId = Sync::tick(1, function (int $id) use (&$count) {
$count++;
if ($count >= 1) {
Sync::clear($id);
}
$timerId = Sync::tick(1, function (int $id) {
Sync::clear($id);
});

$this->assertIsInt($timerId);
$this->assertGreaterThan(0, $timerId);
}

Expand All @@ -136,14 +130,10 @@ public function testTimerIdsAreUnique(): void
public function testCallbackReceivesTimerId(): void
{
$receivedId = null;
$count = 0;

Sync::tick(1, function (int $id) use (&$receivedId, &$count) {
Sync::tick(1, function (int $id) use (&$receivedId) {
$receivedId = $id;
$count++;
if ($count >= 1) {
Sync::clear($id);
}
Sync::clear($id);
});

$this->assertIsInt($receivedId);
Expand Down
11 changes: 2 additions & 9 deletions tests/Unit/TimerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ public function testFacadeAutoDetectsAdapter(): void

$timerId = Timer::after(1, function () {});

$this->assertIsInt($timerId);
$this->assertGreaterThan(0, $timerId);
}

Expand All @@ -140,23 +139,17 @@ public function testAfterReturnsTimerId(): void

$timerId = Timer::after(1, function () {});

$this->assertIsInt($timerId);
$this->assertGreaterThan(0, $timerId);
}

public function testTickReturnsTimerId(): void
{
Timer::setAdapter(Sync::class);

$count = 0;
$timerId = Timer::tick(1, function (int $id) use (&$count) {
$count++;
if ($count >= 1) {
Timer::clear($id);
}
$timerId = Timer::tick(1, function (int $id) {
Timer::clear($id);
});

$this->assertIsInt($timerId);
$this->assertGreaterThan(0, $timerId);
}
}
Loading