Skip to content

Commit f574662

Browse files
committed
Allow queueing of callbacks in destructors
1 parent 358572c commit f574662

3 files changed

Lines changed: 158 additions & 22 deletions

File tree

src/EventLoop/Internal/AbstractDriver.php

Lines changed: 32 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ abstract class AbstractDriver implements Driver
4848

4949
private readonly \Closure $interruptCallback;
5050
private readonly \Closure $queueCallback;
51+
/** @var \Closure(): (?\Closure(): mixed) */
5152
private readonly \Closure $runCallback;
5253

5354
private readonly \stdClass $internalSuspensionMarker;
@@ -87,13 +88,16 @@ public function __construct()
8788
/** @psalm-suppress InvalidArgument */
8889
$this->interruptCallback = $this->setInterrupt(...);
8990
$this->queueCallback = $this->queue(...);
90-
$this->runCallback = function () {
91-
if ($this->fiber->isTerminated()) {
92-
$this->createLoopFiber();
93-
}
91+
$this->runCallback =
92+
/** @return ?\Closure(): mixed */
93+
function (): ?\Closure {
94+
if ($this->fiber->isTerminated()) {
95+
$this->createLoopFiber();
96+
}
9497

95-
return $this->fiber->isStarted() ? $this->fiber->resume() : $this->fiber->start();
96-
};
98+
// Returns a callback that returns the value of the {main} fiber, or null in case of deadlock.
99+
return $this->fiber->isStarted() ? $this->fiber->resume() : $this->fiber->start();
100+
};
97101
}
98102

99103
public function run(): void
@@ -533,26 +537,32 @@ private function createLoopFiber(): void
533537
{
534538
$this->fiber = new \Fiber(function (): void {
535539
$this->stopped = false;
540+
do {
541+
// Invoke microtasks if we have some
542+
$this->invokeCallbacks();
536543

537-
// Invoke microtasks if we have some
538-
$this->invokeCallbacks();
539-
540-
/** @psalm-suppress RedundantCondition $this->stopped may be changed by $this->invokeCallbacks(). */
541-
while (!$this->stopped) {
542-
if ($this->interrupt) {
543-
$this->invokeInterrupt();
544-
}
544+
/** @psalm-suppress RedundantCondition $this->stopped may be changed by $this->invokeCallbacks(). */
545+
while (!$this->stopped) {
546+
if ($this->interrupt) {
547+
$this->invokeInterrupt();
548+
}
545549

546-
if ($this->isEmpty()) {
547-
return;
548-
}
550+
if ($this->isEmpty()) {
551+
while (\gc_collect_cycles());
552+
if (!$this->microtaskQueue->isEmpty() || !$this->callbackQueue->isEmpty() || $this->interrupt) {
553+
continue 2;
554+
}
555+
return;
556+
}
549557

550-
$previousIdle = $this->idle;
551-
$this->idle = true;
558+
$previousIdle = $this->idle;
559+
$this->idle = true;
552560

553-
$this->tick($previousIdle);
554-
$this->invokeCallbacks();
555-
}
561+
$this->tick($previousIdle);
562+
$this->invokeCallbacks();
563+
}
564+
return;
565+
} while (true);
556566
});
557567
}
558568

src/EventLoop/Internal/DriverSuspension.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ final class DriverSuspension implements Suspension
2828
private bool $deadMain = false;
2929

3030
/**
31+
* @param \Closure(): (?\Closure(): mixed) $run
3132
* @param \WeakMap<object, \WeakReference<DriverSuspension>> $suspensions
3233
*/
3334
public function __construct(
@@ -145,6 +146,7 @@ public function suspend(): mixed
145146
throw new \Error('Event loop terminated without resuming the current suspension (the cause is either a fiber deadlock, or an incorrectly unreferenced/canceled watcher):' . $info);
146147
}
147148

149+
assert($result !== null);
148150
return $result();
149151
}
150152

test/EventLoopTest.php

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,130 @@
99

1010
class EventLoopTest extends TestCase
1111
{
12+
public function testSuspensionResumptionWithQueueInGarbageCollection(): void
13+
{
14+
$suspension = EventLoop::getSuspension();
15+
16+
$class = new class ($suspension) {
17+
public function __construct(public Suspension $suspension)
18+
{
19+
}
20+
public function __destruct()
21+
{
22+
$this->suspension->resume(true);
23+
}
24+
};
25+
$cycle = [$class, &$cycle];
26+
unset($class, $cycle);
27+
28+
$ended = $suspension->suspend();
29+
30+
$this->assertTrue($ended);
31+
}
32+
33+
public function testEventLoopResumptionWithQueueInGarbageCollection(): void
34+
{
35+
$suspension = EventLoop::getSuspension();
36+
37+
$class = new class ($suspension) {
38+
public function __construct(public Suspension $suspension)
39+
{
40+
}
41+
public function __destruct()
42+
{
43+
EventLoop::queue($this->suspension->resume(...), true);
44+
}
45+
};
46+
$cycle = [$class, &$cycle];
47+
unset($class, $cycle);
48+
49+
$ended = $suspension->suspend();
50+
51+
$this->assertTrue($ended);
52+
}
53+
54+
55+
public function testSuspensionResumptionWithQueueInGarbageCollectionNested(): void
56+
{
57+
$suspension = EventLoop::getSuspension();
58+
59+
$resumer = new class ($suspension) {
60+
public function __construct(public Suspension $suspension)
61+
{
62+
}
63+
public function __destruct()
64+
{
65+
$this->suspension->resume(true);
66+
}
67+
};
68+
69+
$class = new class ($resumer) {
70+
public static ?object $staticReference = null;
71+
public function __construct(object $resumer)
72+
{
73+
self::$staticReference = $resumer;
74+
}
75+
public function __destruct()
76+
{
77+
EventLoop::queue(function () {
78+
$class = self::$staticReference;
79+
$cycle = [$class, &$cycle];
80+
unset($class, $cycle);
81+
82+
self::$staticReference = null;
83+
});
84+
}
85+
};
86+
$cycle = [$class, &$cycle];
87+
unset($class, $resumer, $cycle);
88+
89+
90+
$ended = $suspension->suspend();
91+
92+
$this->assertTrue($ended);
93+
}
94+
95+
public function testEventLoopResumptionWithQueueInGarbageCollectionNested(): void
96+
{
97+
$suspension = EventLoop::getSuspension();
98+
99+
$resumer = new class ($suspension) {
100+
public function __construct(public Suspension $suspension)
101+
{
102+
}
103+
public function __destruct()
104+
{
105+
EventLoop::queue($this->suspension->resume(...), true);
106+
}
107+
};
108+
109+
$class = new class ($resumer) {
110+
public static ?object $staticReference = null;
111+
public function __construct(object $resumer)
112+
{
113+
self::$staticReference = $resumer;
114+
}
115+
public function __destruct()
116+
{
117+
EventLoop::queue(function () {
118+
$class = self::$staticReference;
119+
$cycle = [$class, &$cycle];
120+
unset($class, $cycle);
121+
122+
self::$staticReference = null;
123+
});
124+
}
125+
};
126+
$cycle = [$class, &$cycle];
127+
unset($class, $resumer, $cycle);
128+
129+
130+
$ended = $suspension->suspend();
131+
132+
$this->assertTrue($ended);
133+
}
134+
135+
12136
public function testDelayWithNegativeDelay(): void
13137
{
14138
$this->expectException(\Error::class);

0 commit comments

Comments
 (0)