forked from doppar/queue
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathQueueRunCommand.php
More file actions
118 lines (99 loc) · 3.38 KB
/
QueueRunCommand.php
File metadata and controls
118 lines (99 loc) · 3.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
<?php
namespace Doppar\Queue\Commands;
use Phaseolies\Console\Schedule\Command;
use Doppar\Queue\QueueWorker;
use Doppar\Queue\QueueManager;
class QueueRunCommand extends Command
{
    /**
     * The console command name followed by its option signature.
     *
     * Options: --queue (default "default"), --sleep (default 3),
     * --memory (default 128), --timeout (default 0), --limit (no default).
     *
     * @var string
     */
    protected $name = 'queue:run {--queue=default} {--sleep=3} {--memory=128} {--timeout=0} {--limit=}';

    /**
     * The command description.
     *
     * @var string
     */
    protected $description = 'Process jobs on the queue one by one';

    /**
     * The queue manager instance handed to the worker.
     *
     * @var QueueManager
     */
    protected $manager;

    /**
     * The queue worker that runs the processing loop.
     *
     * @var QueueWorker
     */
    protected $worker;

    /**
     * Create a new command instance and build a worker around the manager.
     *
     * @param QueueManager $manager
     */
    public function __construct(QueueManager $manager)
    {
        parent::__construct();

        $this->manager = $manager;
        $this->worker = new QueueWorker($manager);
    }

    /**
     * Execute the console command.
     *
     * Reads the options, prints the effective configuration, registers the
     * per-job progress callbacks and then hands control to the worker's
     * daemon() loop.
     *
     * Example: php pool queue:run --queue=reports --sleep=10 --memory=1024 --timeout=0 --limit=
     *
     * @return int Whatever the inherited withTiming() returns for the closure below;
     *             the closure itself yields Command::SUCCESS when daemon() returns
     *             and Command::FAILURE when it throws.
     */
    public function handle(): int
    {
        return $this->withTiming(function () {
            $queue = $this->option('queue', 'default');
            $sleep = (int) $this->option('sleep', 3);
            $maxMemory = (int) $this->option('memory', 128);

            // Cast before comparing: a raw "0" string (or null) is never identical
            // to integer 0, so testing the uncast value would never pick the -1
            // sentinel. NOTE(review): -1 is assumed to mean "no time limit" to the
            // worker -- confirm against QueueWorker::daemon().
            $timeoutSeconds = (int) $this->option('timeout');
            $maxTime = $timeoutSeconds !== 0 ? $timeoutSeconds : -1;

            // An omitted, empty ("--limit="), non-numeric or non-positive limit means
            // "no limit". It is normalised to null so the worker receives the same
            // thing the banner below reports, instead of 0 from casting ''.
            $limitInput = $this->option('limit');
            $requestedLimit = is_numeric($limitInput) ? (int) $limitInput : 0;
            $maxLimit = $requestedLimit > 0 ? $requestedLimit : null;

            $this->displaySuccess("Starting queue worker on queue: {$queue}");

            $timeoutLabel = $maxTime > 0 ? "{$maxTime}s" : "unlimited";
            $limitLabel = $maxLimit !== null ? ", limit={$maxLimit} jobs" : ", limit=unlimited";

            $this->info("Configuration: sleep={$sleep}s, memory={$maxMemory}MB, timeout=" . $timeoutLabel . $limitLabel);

            try {
                // Announce each job as the worker picks it up.
                $this->worker->setOnJobProcessing(function ($job) {
                    $jobClass = get_class($job);
                    $jobId = $job->getJobId() ?? 'N/A';

                    $this->info("✔ Processing job [{$jobClass}] (ID: {$jobId})");
                });

                // Announce each job once the worker reports it as processed.
                $this->worker->setOnJobProcessed(function ($job) {
                    $jobClass = get_class($job);
                    $jobId = $job->getJobId() ?? 'N/A';

                    $this->info("✔ Processed job [{$jobClass}] (ID: {$jobId})");

                    // Flush system output buffer
                    flush();
                });

                $this->worker->daemon($queue, [
                    'sleep' => $sleep,
                    'maxMemory' => $maxMemory,
                    'maxExecutionTime' => $maxTime,
                    'maxJobs' => $maxLimit,
                ]);

                return Command::SUCCESS;
            } catch (\Throwable $e) {
                // Report the failure and its trace rather than letting it escape,
                // so the command can exit with a failure status.
                $this->error("Queue worker failed: " . $e->getMessage());
                $this->error($e->getTraceAsString());

                return Command::FAILURE;
            }
        }, 'Queue worker stopped gracefully.');
    }
}