Skip to content

Commit 85efcfc

Browse files
FRW-11110 Queue workers logging improvements. (#22)
FRW-11110 Queue workers logging improvements.
1 parent b0d90a8 commit 85efcfc

15 files changed

Lines changed: 1026 additions & 137 deletions
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
/**
4+
* Copyright © 2016-present Spryker Systems GmbH. All rights reserved.
5+
* Use of this software requires acceptance of the Evaluation License Agreement. See LICENSE file.
6+
*/
7+
8+
namespace Spryker\Zed\Queue\Business\Logger;
9+
10+
use Monolog\Formatter\FormatterInterface;
11+
12+
class QueueErrorLogFormatter implements FormatterInterface
13+
{
14+
/**
15+
* @param array $record
16+
*
17+
* @return string
18+
*/
19+
public function format(array $record): string
20+
{
21+
return $record['message'] ?? '';
22+
}
23+
24+
/**
25+
* @param array<array> $records
26+
*
27+
* @return array<string>
28+
*/
29+
public function formatBatch(array $records): array
30+
{
31+
$formatted = [];
32+
33+
foreach ($records as $record) {
34+
$formatted[] = $this->format($record);
35+
}
36+
37+
return $formatted;
38+
}
39+
}
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
<?php
2+
3+
/**
4+
* Copyright © 2016-present Spryker Systems GmbH. All rights reserved.
5+
* Use of this software requires acceptance of the Evaluation License Agreement. See LICENSE file.
6+
*/
7+
8+
namespace Spryker\Zed\Queue\Business\Logger;
9+
10+
use Psr\Log\LoggerInterface;
11+
12+
class QueueErrorLogger implements QueueErrorLoggerInterface
13+
{
14+
/**
15+
* @var string
16+
*/
17+
protected const MESSAGE_BODY_KEY_ERROR_MESSAGE = 'errorMessage';
18+
19+
/**
20+
* @var string
21+
*/
22+
protected const COLOR_YELLOW = "\033[33m";
23+
24+
/**
25+
* @var string
26+
*/
27+
protected const COLOR_RED = "\033[31m";
28+
29+
/**
30+
* @var string
31+
*/
32+
protected const COLOR_RESET = "\033[0m";
33+
34+
/**
35+
* @param \Psr\Log\LoggerInterface $logger
36+
*/
37+
public function __construct(protected LoggerInterface $logger)
38+
{
39+
}
40+
41+
/**
42+
* @param string $queueName
43+
* @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $messages
44+
*
45+
* @return void
46+
*/
47+
public function logFailedMessages(string $queueName, array $messages): void
48+
{
49+
$failedMessages = $this->filterFailedMessages($messages);
50+
51+
if ($failedMessages === []) {
52+
return;
53+
}
54+
55+
$this->outputToStderr($queueName, $failedMessages);
56+
}
57+
58+
/**
59+
* @param string $queueName
60+
* @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $failedMessages
61+
*
62+
* @return void
63+
*/
64+
protected function outputToStderr(string $queueName, array $failedMessages): void
65+
{
66+
$output = $this->formatHeader($queueName, count($failedMessages));
67+
$output .= $this->formatMessages($failedMessages);
68+
69+
$this->logger->error($output);
70+
}
71+
72+
/**
73+
* @param string $queueName
74+
* @param int $failedMessagesCount
75+
*
76+
* @return string
77+
*/
78+
protected function formatHeader(string $queueName, int $failedMessagesCount): string
79+
{
80+
return sprintf(
81+
'%s%s[Queue Message Errors]%s%s' .
82+
'Queue: %s%s%s | Failed messages: %s%d%s%s%s',
83+
PHP_EOL,
84+
static::COLOR_YELLOW,
85+
static::COLOR_RESET,
86+
PHP_EOL,
87+
static::COLOR_YELLOW,
88+
$queueName,
89+
static::COLOR_RESET,
90+
static::COLOR_RED,
91+
$failedMessagesCount,
92+
static::COLOR_RESET,
93+
PHP_EOL,
94+
PHP_EOL,
95+
);
96+
}
97+
98+
/**
99+
* @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $failedMessages
100+
*
101+
* @return string
102+
*/
103+
protected function formatMessages(array $failedMessages): string
104+
{
105+
$output = '';
106+
107+
foreach ($failedMessages as $index => $messageTransfer) {
108+
$queueMessage = $messageTransfer->getQueueMessage();
109+
110+
if ($queueMessage === null) {
111+
continue;
112+
}
113+
114+
$output .= $this->formatSingleMessage($index + 1, $queueMessage->getBody());
115+
}
116+
117+
return $output;
118+
}
119+
120+
/**
121+
* @param int $messageNumber
122+
* @param string $body
123+
*
124+
* @return string
125+
*/
126+
protected function formatSingleMessage(int $messageNumber, string $body): string
127+
{
128+
$output = sprintf('Message #%d:%s', $messageNumber, PHP_EOL);
129+
$bodyData = json_decode($body, true);
130+
131+
if (!is_array($bodyData) || !isset($bodyData[static::MESSAGE_BODY_KEY_ERROR_MESSAGE])) {
132+
return $output . sprintf(' Body: %s%s%s', $body, PHP_EOL, PHP_EOL);
133+
}
134+
135+
$errorMessage = $bodyData[static::MESSAGE_BODY_KEY_ERROR_MESSAGE];
136+
unset($bodyData[static::MESSAGE_BODY_KEY_ERROR_MESSAGE]);
137+
138+
$output .= $this->formatMessageBody($bodyData);
139+
$output .= $this->formatErrorMessage($errorMessage);
140+
141+
return $output;
142+
}
143+
144+
/**
145+
* @param array $bodyData
146+
*
147+
* @return string
148+
*/
149+
protected function formatMessageBody(array $bodyData): string
150+
{
151+
$bodyWithoutError = json_encode($bodyData, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE);
152+
153+
return sprintf(' Body: %s%s%s', $bodyWithoutError, PHP_EOL, PHP_EOL);
154+
}
155+
156+
/**
157+
* @param string $errorMessage
158+
*
159+
* @return string
160+
*/
161+
protected function formatErrorMessage(string $errorMessage): string
162+
{
163+
return sprintf(
164+
' Error: %s%s%s%s%s',
165+
static::COLOR_RED,
166+
$errorMessage,
167+
static::COLOR_RESET,
168+
PHP_EOL,
169+
PHP_EOL,
170+
);
171+
}
172+
173+
/**
174+
* @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $messages
175+
*
176+
* @return array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer>
177+
*/
178+
protected function filterFailedMessages(array $messages): array
179+
{
180+
$failedMessages = [];
181+
182+
foreach ($messages as $message) {
183+
if (!$message->getHasError()) {
184+
continue;
185+
}
186+
187+
$failedMessages[] = $message;
188+
}
189+
190+
return $failedMessages;
191+
}
192+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
/**
4+
* Copyright © 2016-present Spryker Systems GmbH. All rights reserved.
5+
* Use of this software requires acceptance of the Evaluation License Agreement. See LICENSE file.
6+
*/
7+
8+
namespace Spryker\Zed\Queue\Business\Logger;
9+
10+
interface QueueErrorLoggerInterface
11+
{
12+
/**
13+
* @param string $queueName
14+
* @param array<\Generated\Shared\Transfer\QueueReceiveMessageTransfer> $messages
15+
*
16+
* @return void
17+
*/
18+
public function logFailedMessages(string $queueName, array $messages): void;
19+
}

src/Spryker/Zed/Queue/Business/Process/ProcessManager.php

Lines changed: 76 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ class ProcessManager implements ProcessManagerInterface
2929
*/
3030
protected $serverUniqueId;
3131

32+
/**
33+
* @var array<string>
34+
*/
35+
protected $errorBuffer = [];
36+
3237
/**
3338
* @var array<string, int>
3439
*/
@@ -53,7 +58,10 @@ public function __construct(QueueQueryContainerInterface $queryContainer, $serve
5358
public function triggerQueueProcess($command, $queue)
5459
{
5560
$process = $this->createProcess($command);
56-
$process->start();
61+
62+
$process->start(function ($type, $buffer) {
63+
$this->forwardOutputWithoutBootstrapInfo($buffer);
64+
});
5765

5866
if ($process->isRunning()) {
5967
$queueProcessTransfer = $this->createQueueProcessTransfer($queue, $process->getPid());
@@ -89,6 +97,30 @@ public function getBusyProcessNumber($queueName)
8997
return $busyProcessIndex;
9098
}
9199

100+
/**
101+
* Get running process PIDs for queue
102+
*
103+
* @param string $queueName
104+
*
105+
* @return array<int>
106+
*/
107+
public function getRunningProcessPids(string $queueName): array
108+
{
109+
/** @var array<int> $processIds */
110+
$processIds = $this->queryContainer
111+
->queryProcessesByServerIdAndQueueName($this->serverUniqueId, $queueName)
112+
->find();
113+
114+
$runningPids = [];
115+
foreach ($processIds as $processId) {
116+
if ($this->isProcessRunning($processId)) {
117+
$runningPids[] = $processId;
118+
}
119+
}
120+
121+
return $runningPids;
122+
}
123+
92124
/**
93125
* @return void
94126
*/
@@ -104,6 +136,49 @@ public function flushIdleProcesses()
104136
}
105137
}
106138

139+
/**
140+
* @return array<string>
141+
*/
142+
public function flushErrorBuffer(): array
143+
{
144+
$errors = $this->errorBuffer;
145+
$this->errorBuffer = [];
146+
147+
return $errors;
148+
}
149+
150+
/**
151+
* Capture process output to error buffer, filtering out console bootstrap info
152+
*
153+
* @param string $buffer
154+
*
155+
* @return void
156+
*/
157+
protected function forwardOutputWithoutBootstrapInfo(string $buffer): void
158+
{
159+
$lines = explode("\n", $buffer);
160+
161+
foreach ($lines as $line) {
162+
if ($line === '' || $this->isConsoleBootstrapInfo($line)) {
163+
continue;
164+
}
165+
166+
$this->errorBuffer[] = $line;
167+
}
168+
}
169+
170+
/**
171+
* @param string $line
172+
*
173+
* @return bool
174+
*/
175+
protected function isConsoleBootstrapInfo(string $line): bool
176+
{
177+
return str_contains($line, 'Region:')
178+
&& str_contains($line, 'Code bucket')
179+
&& str_contains($line, 'Environment:');
180+
}
181+
107182
/**
108183
* @return void
109184
*/
@@ -225,12 +300,6 @@ protected function deleteProcesses(array $processIds)
225300
*/
226301
protected function createProcess($command)
227302
{
228-
// Shim for Symfony 3.x, to be removed when Symfony dependency becomes 4.2+
229-
if (!method_exists(Process::class, 'fromShellCommandline')) {
230-
//@phpstan-ignore-next-line
231-
return new Process($command);
232-
}
233-
234303
return Process::fromShellCommandline($command, APPLICATION_ROOT_DIR);
235304
}
236305

src/Spryker/Zed/Queue/Business/Process/ProcessManagerInterface.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,16 @@ public function flushAllWorkerProcesses(): void;
4040
* @return bool
4141
*/
4242
public function isProcessRunning($processId);
43+
44+
/**
45+
* @param string $queueName
46+
*
47+
* @return array<int>
48+
*/
49+
public function getRunningProcessPids(string $queueName): array;
50+
51+
/**
52+
* @return array<string>
53+
*/
54+
public function flushErrorBuffer(): array;
4355
}

0 commit comments

Comments
 (0)