From 9a4b2b1ac56d32206dc8ee94b2baf78e7cfa959b Mon Sep 17 00:00:00 2001 From: bota Date: Fri, 5 Sep 2025 11:25:24 +0300 Subject: [PATCH 1/2] Issue #59: dynamic stream flag and readable stream messages Signed-off-by: bota --- config/autoload/messenger.local.php.dist | 12 +-- .../Command/GetQueuedMessagesCommand.php | 92 +++++++++++++++++-- .../Command/GetQueuedMessagesCommandTest.php | 67 ++++++++++++-- 3 files changed, 150 insertions(+), 21 deletions(-) diff --git a/config/autoload/messenger.local.php.dist b/config/autoload/messenger.local.php.dist index 78a2c4a..a61d1e4 100644 --- a/config/autoload/messenger.local.php.dist +++ b/config/autoload/messenger.local.php.dist @@ -8,8 +8,8 @@ use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface as SymfonySerializer; return [ - "symfony" => [ - "messenger" => [ + 'symfony' => [ + 'messenger' => [ 'transports' => [ 'redis_transport' => [ 'dsn' => 'redis://127.0.0.1:6379/messages', @@ -31,10 +31,10 @@ return [ 'failure_transport' => 'failed', ], ], - "dependencies" => [ - "factories" => [ - "redis_transport" => [TransportFactory::class, 'redis_transport'], - "failed" => [TransportFactory::class, 'failed'], + 'dependencies' => [ + 'factories' => [ + 'redis_transport' => [TransportFactory::class, 'redis_transport'], + 'failed' => [TransportFactory::class, 'failed'], SymfonySerializer::class => fn(ContainerInterface $container) => new PhpSerializer(), ], ], diff --git a/src/Swoole/Command/GetQueuedMessagesCommand.php b/src/Swoole/Command/GetQueuedMessagesCommand.php index ed039cd..4217c28 100644 --- a/src/Swoole/Command/GetQueuedMessagesCommand.php +++ b/src/Swoole/Command/GetQueuedMessagesCommand.php @@ -7,16 +7,28 @@ use Dot\DependencyInjection\Attribute\Inject; use Redis; use RedisException; +use ReflectionClass; use Symfony\Component\Console\Attribute\AsCommand; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use function count; +use function is_array; +use function is_string; +use function json_decode; use function json_encode; +use function method_exists; +use function preg_match; use function str_repeat; +use function stripcslashes; +use function unserialize; use const JSON_PRETTY_PRINT; +use const JSON_UNESCAPED_SLASHES; use const JSON_UNESCAPED_UNICODE; #[AsCommand( @@ -25,9 +37,7 @@ )] class GetQueuedMessagesCommand extends Command { - /** @var string $defaultName */ - protected static $defaultName = 'inventory'; - + protected static string $defaultName = 'inventory'; private Redis $redis; #[Inject('redis')] @@ -39,7 +49,8 @@ public function __construct(Redis $redis) protected function configure(): void { - $this->setDescription('Get all queued messages from Redis stream "messages"'); + $this->setDescription('Get all queued messages from Redis stream "messages"') + ->addOption('stream', null, InputOption::VALUE_REQUIRED, 'stream name'); } /** @@ -47,21 +58,84 @@ protected function configure(): void */ protected function execute(InputInterface $input, OutputInterface $output): int { - $entries = $this->redis->xRange('messages', '-', '+'); + $streamName = (string) $input->getOption('stream'); + $entries = $this->redis->xRange($streamName, '-', '+'); if (empty($entries)) { - $output->writeln('No messages queued found in Redis stream "messages".'); + $output->writeln("No messages queued found in Redis stream $streamName"); return Command::SUCCESS; } foreach ($entries as $id => $entry) { - $output->writeln("Message ID: $id"); - $output->writeln(json_encode($entry, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE)); + $output->writeln("Message ID: $id"); + + foreach ($entry as $field => $value) { + $raw = is_string($value) ? $value : (string) $value; + + if (preg_match('/^s:\d+:\".*\";$/s', $raw)) { + $raw = @unserialize($raw, ['allowed_classes' => true]); + } + + $json = json_decode((string) $raw, true); + + if (is_array($json) && isset($json['body'])) { + $body = stripcslashes($json['body']); + $envelope = @unserialize($body, ['allowed_classes' => true]); + + if ($envelope instanceof Envelope) { + $message = $envelope->getMessage(); + $output->writeln(" $field:"); + $output->writeln(" Message Class: " . $message::class); + + $payload = null; + if (method_exists($message, 'getPayload')) { + $payload = $message->getPayload(); + } else { + try { + $refClass = new ReflectionClass($message); + if ($refClass->hasProperty('payload')) { + $prop = $refClass->getProperty('payload'); + $payload = $prop->getValue($message); + } + } catch (\Throwable $e) { + $payload = '[unavailable: ' . $e->getMessage() . ']'; + } + } + + if ($payload !== null) { + $output->writeln(" Payload:"); + $output->writeln( + json_encode( + $payload, + JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES + ) + ); + } + + $redeliveryStamps = $envelope->all(RedeliveryStamp::class); + if (! empty($redeliveryStamps)) { + $output->writeln(" Timestamps:"); + foreach ($redeliveryStamps as $stamp) { + if ($stamp instanceof RedeliveryStamp) { + $output->writeln(" - " + . $stamp->getRedeliveredAt()->format('Y-m-d H:i:s') + . " (retry count: " . $stamp->getRetryCount() . ")"); + } + } + } + } else { + $output->writeln(" $field: failed to unserialize envelope"); + } + } else { + $output->writeln(" $field: $raw"); + } + } + $output->writeln(str_repeat('-', 40)); } $total = count($entries); - $output->writeln("Total queued messages in stream 'messages': $total"); + $output->writeln("Total queued messages in stream $streamName: $total"); $output->writeln(str_repeat('-', 40)); return Command::SUCCESS; diff --git a/test/Swoole/Command/GetQueuedMessagesCommandTest.php b/test/Swoole/Command/GetQueuedMessagesCommandTest.php index 969842a..60fa213 100644 --- a/test/Swoole/Command/GetQueuedMessagesCommandTest.php +++ b/test/Swoole/Command/GetQueuedMessagesCommandTest.php @@ -7,6 +7,7 @@ use PHPUnit\Framework\MockObject\Exception; use PHPUnit\Framework\MockObject\MockObject; use PHPUnit\Framework\TestCase; +use Queue\App\Message\Message; use Queue\Swoole\Command\GetQueuedMessagesCommand; use Redis; use RedisException; @@ -14,9 +15,14 @@ use Symfony\Component\Console\Exception\ExceptionInterface; use Symfony\Component\Console\Input\ArrayInput; use Symfony\Component\Console\Output\BufferedOutput; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; +use function addslashes; use function array_keys; use function count; +use function json_encode; +use function serialize; class GetQueuedMessagesCommandTest extends TestCase { @@ -42,7 +48,7 @@ public function testExecuteWithNoMessages(): void ->willReturn([]); $command = new GetQueuedMessagesCommand($this->redisMock); - $input = new ArrayInput([]); + $input = new ArrayInput(['--stream' => 'messages']); $output = new BufferedOutput(); $exitCode = $command->run($input, $output); @@ -55,11 +61,11 @@ public function testExecuteWithNoMessages(): void /** * @throws ExceptionInterface */ - public function testExecuteWithMessages(): void + public function testExecuteWithSimpleMessages(): void { $fakeMessages = [ - '1691000000000-0' => ['type' => 'testEmail', 'payload' => '{"to":"test@dotkernel.com"}'], - '1691000000001-0' => ['type' => 'testSms', 'payload' => '{"to":"+123456789"}'], + '1691000000000-0' => ['type' => 'testEmail', 'payload' => '{"to":"test@example.com"}'], + '1691000000001-0' => ['type' => 'testEmail2', 'payload' => '{"to":"test@example2.com"}'], ]; $this->redisMock @@ -69,7 +75,7 @@ public function testExecuteWithMessages(): void ->willReturn($fakeMessages); $command = new GetQueuedMessagesCommand($this->redisMock); - $input = new ArrayInput([]); + $input = new ArrayInput(['--stream' => 'messages']); $output = new BufferedOutput(); $exitCode = $command->run($input, $output); @@ -97,10 +103,59 @@ public function testRedisThrowsException(): void ->willThrowException(new RedisException('Redis unavailable')); $command = new GetQueuedMessagesCommand($this->redisMock); - $input = new ArrayInput([]); + $input = new ArrayInput(['--stream' => 'messages']); $output = new BufferedOutput(); $this->expectException(RedisException::class); $command->run($input, $output); } + + public function testExecuteWithInvalidBody(): void + { + $invalidBodyJson = json_encode(['body' => 'not-a-serialized-envelope']); + + $this->redisMock + ->expects($this->once()) + ->method('xRange') + ->willReturn([ + '2-0' => ['body' => $invalidBodyJson], + ]); + + $command = new GetQueuedMessagesCommand($this->redisMock); + $input = new ArrayInput(['--stream' => 'messages']); + $output = new BufferedOutput(); + + $exitCode = $command->run($input, $output); + $outputText = $output->fetch(); + + $this->assertEquals(Command::SUCCESS, $exitCode); + $this->assertStringContainsString('failed to unserialize envelope', $outputText); + } + + public function testExecuteWithValidEnvelope(): void + { + $message = new Message(['foo' => 'bar']); + $envelope = new Envelope($message, [new RedeliveryStamp(1)]); + $serializedEnvelope = serialize($envelope); + $jsonBody = json_encode(['body' => addslashes($serializedEnvelope)]); + + $this->redisMock + ->expects($this->once()) + ->method('xRange') + ->willReturn([ + '100-0' => ['body' => $jsonBody], + ]); + + $command = new GetQueuedMessagesCommand($this->redisMock); + $input = new ArrayInput(['--stream' => 'messages']); + $output = new BufferedOutput(); + + $exitCode = $command->run($input, $output); + $outputText = $output->fetch(); + + $this->assertEquals(Command::SUCCESS, $exitCode); + $this->assertStringContainsString('Message Class', $outputText); + $this->assertStringContainsString('Payload', $outputText); + $this->assertStringContainsString('Timestamps', $outputText); + } } From 0dac6853c4696e23d8177bbb65b71ab0a73d582b Mon Sep 17 00:00:00 2001 From: bota Date: Fri, 5 Sep 2025 11:41:37 +0300 Subject: [PATCH 2/2] must not be defined error Signed-off-by: bota --- src/Swoole/Command/GetQueuedMessagesCommand.php | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Swoole/Command/GetQueuedMessagesCommand.php b/src/Swoole/Command/GetQueuedMessagesCommand.php index 4217c28..9a4a769 100644 --- a/src/Swoole/Command/GetQueuedMessagesCommand.php +++ b/src/Swoole/Command/GetQueuedMessagesCommand.php @@ -37,7 +37,9 @@ )] class GetQueuedMessagesCommand extends Command { - protected static string $defaultName = 'inventory'; + /** @var string $defaultName */ + protected static $defaultName = 'inventory'; + private Redis $redis; #[Inject('redis')]