-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDefaultWorker.php
More file actions
125 lines (97 loc) · 3.82 KB
/
DefaultWorker.php
File metadata and controls
125 lines (97 loc) · 3.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
<?php
declare(strict_types=1);
namespace Patchlevel\Worker;
use Closure;
use Patchlevel\Worker\Event\WorkerRunningEvent;
use Patchlevel\Worker\Event\WorkerStartedEvent;
use Patchlevel\Worker\Event\WorkerStoppedEvent;
use Patchlevel\Worker\Listener\StopWorkerOnIterationLimitListener;
use Patchlevel\Worker\Listener\StopWorkerOnMemoryLimitListener;
use Patchlevel\Worker\Listener\StopWorkerOnSigtermSignalListener;
use Patchlevel\Worker\Listener\StopWorkerOnTimeLimitListener;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;

use function hrtime;
use function max;
use function microtime;
use function round;
use function usleep;
/**
 * Worker that invokes a job closure in a loop until it is told to stop.
 *
 * A started event is dispatched before the loop, a running event after every
 * job invocation, and a stopped event once the loop has ended. Each event is
 * constructed with this worker instance.
 */
final class DefaultWorker implements Worker
{
    /** Set to true by stop(); checked before each iteration and again before sleeping. */
    private bool $shouldStop = false;

    /**
     * @param Closure(Closure):void    $job             invoked once per iteration with this worker's stop() method as a closure
     * @param EventDispatcherInterface $eventDispatcher receives the started, running and stopped events
     * @param LoggerInterface|null     $logger          optional debug logger; nothing is logged when null
     */
    public function __construct(
        private readonly Closure $job,
        private readonly EventDispatcherInterface $eventDispatcher,
        private readonly LoggerInterface|null $logger = null,
    ) {
    }

    /**
     * Runs the job repeatedly until stop() has been called.
     *
     * After each job invocation the worker sleeps for whatever is left of
     * $sleepTimer once the job's own duration has been subtracted. If the job
     * took at least as long as $sleepTimer, the next iteration starts
     * immediately.
     *
     * @param positive-int|0 $sleepTimer in milliseconds
     */
    public function run(int $sleepTimer = 1000): void
    {
        $this->logger?->debug('Worker starting');
        $this->eventDispatcher->dispatch(new WorkerStartedEvent($this));

        while (!$this->shouldStop) {
            $this->logger?->debug('Worker starting job run');

            // Measure with the monotonic high-resolution clock rather than wall-clock
            // time: a system clock adjustment during the job would otherwise produce
            // a negative or inflated duration and make the worker sleep far longer
            // than $sleepTimer (or not at all).
            $startedAt = hrtime(true);
            ($this->job)($this->stop(...));
            // hrtime(true) is in nanoseconds; convert the elapsed time to milliseconds.
            $ranTime = (int) round((hrtime(true) - $startedAt) / 1_000_000);

            $this->logger?->debug('Worker finished job run ({ranTime}ms)', ['ranTime' => $ranTime]);
            $this->eventDispatcher->dispatch(new WorkerRunningEvent($this));

            // The job or a running-event listener may have called stop(); skip the sleep then.
            if ($this->shouldStop) {
                break;
            }

            $sleepFor = max($sleepTimer - $ranTime, 0);

            if ($sleepFor <= 0) {
                continue;
            }

            $this->logger?->debug('Worker sleep for {sleepTimer}ms', ['sleepTimer' => $sleepFor]);
            // usleep() takes microseconds.
            usleep($sleepFor * 1000);
        }

        $this->logger?->debug('Worker stopped');
        $this->eventDispatcher->dispatch(new WorkerStoppedEvent($this));
        $this->logger?->debug('Worker terminated');
    }

    /**
     * Requests the run loop to end.
     *
     * Only sets a flag: a job invocation that is in progress is not interrupted,
     * the loop exits the next time the flag is checked.
     */
    public function stop(): void
    {
        $this->logger?->debug('Worker received stop signal');
        $this->shouldStop = true;
    }

    /**
     * Builds a worker and registers the stop listeners on its event dispatcher.
     *
     * The SIGTERM listener is always registered. The iteration, memory and time
     * limit listeners are registered only when the matching option is present
     * and not null. When $eventDispatcher is supplied, the listeners are added
     * to that instance; otherwise a new EventDispatcher is created.
     *
     * @param Closure(Closure):void $job
     * @param array{runLimit?: (positive-int|null), memoryLimit?: (string|null), timeLimit?: (positive-int|null)} $options
     */
    public static function create(
        Closure $job,
        array $options = [],
        LoggerInterface $logger = new NullLogger(),
        EventDispatcherInterface|null $eventDispatcher = null,
    ): self {
        $eventDispatcher ??= new EventDispatcher();

        $eventDispatcher->addSubscriber(new StopWorkerOnSigtermSignalListener($logger));

        if (isset($options['runLimit'])) {
            $eventDispatcher->addSubscriber(
                new StopWorkerOnIterationLimitListener($options['runLimit'], $logger),
            );
        }

        if (isset($options['memoryLimit'])) {
            $eventDispatcher->addSubscriber(
                // The memory limit is given as a string and parsed by Bytes::parseFromString().
                new StopWorkerOnMemoryLimitListener(Bytes::parseFromString($options['memoryLimit']), $logger),
            );
        }

        if (isset($options['timeLimit'])) {
            $eventDispatcher->addSubscriber(
                new StopWorkerOnTimeLimitListener($options['timeLimit'], $logger),
            );
        }

        return new self(
            $job,
            $eventDispatcher,
            $logger,
        );
    }
}