diff --git a/config/autoload/messenger.local.php.dist b/config/autoload/messenger.local.php.dist
index 78a2c4a..a61d1e4 100644
--- a/config/autoload/messenger.local.php.dist
+++ b/config/autoload/messenger.local.php.dist
@@ -8,8 +8,8 @@ use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface as SymfonySerializer;
return [
- "symfony" => [
- "messenger" => [
+ 'symfony' => [
+ 'messenger' => [
'transports' => [
'redis_transport' => [
'dsn' => 'redis://127.0.0.1:6379/messages',
@@ -31,10 +31,10 @@ return [
'failure_transport' => 'failed',
],
],
- "dependencies" => [
- "factories" => [
- "redis_transport" => [TransportFactory::class, 'redis_transport'],
- "failed" => [TransportFactory::class, 'failed'],
+ 'dependencies' => [
+ 'factories' => [
+ 'redis_transport' => [TransportFactory::class, 'redis_transport'],
+ 'failed' => [TransportFactory::class, 'failed'],
SymfonySerializer::class => fn(ContainerInterface $container) => new PhpSerializer(),
],
],
diff --git a/src/Swoole/Command/GetQueuedMessagesCommand.php b/src/Swoole/Command/GetQueuedMessagesCommand.php
index ed039cd..9a4a769 100644
--- a/src/Swoole/Command/GetQueuedMessagesCommand.php
+++ b/src/Swoole/Command/GetQueuedMessagesCommand.php
@@ -7,16 +7,28 @@
use Dot\DependencyInjection\Attribute\Inject;
use Redis;
use RedisException;
+use ReflectionClass;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use function count;
+use function is_array;
+use function is_string;
+use function json_decode;
use function json_encode;
+use function method_exists;
+use function preg_match;
use function str_repeat;
+use function stripcslashes;
+use function unserialize;
use const JSON_PRETTY_PRINT;
+use const JSON_UNESCAPED_SLASHES;
use const JSON_UNESCAPED_UNICODE;
#[AsCommand(
@@ -39,7 +51,8 @@ public function __construct(Redis $redis)
protected function configure(): void
{
- $this->setDescription('Get all queued messages from Redis stream "messages"');
+ $this->setDescription('Get all queued messages from Redis stream "messages"')
+ ->addOption('stream', null, InputOption::VALUE_REQUIRED, 'stream name');
}
/**
@@ -47,21 +60,84 @@ protected function configure(): void
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
- $entries = $this->redis->xRange('messages', '-', '+');
+ $streamName = (string) $input->getOption('stream');
+ $entries = $this->redis->xRange($streamName, '-', '+');
if (empty($entries)) {
- $output->writeln('No messages queued found in Redis stream "messages".');
+ $output->writeln("No messages queued found in Redis stream $streamName");
return Command::SUCCESS;
}
foreach ($entries as $id => $entry) {
- $output->writeln("Message ID: $id");
- $output->writeln(json_encode($entry, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE));
+ $output->writeln("Message ID: $id");
+
+ foreach ($entry as $field => $value) {
+ $raw = is_string($value) ? $value : (string) $value;
+
+ if (preg_match('/^s:\d+:\".*\";$/s', $raw)) {
+ $raw = @unserialize($raw, ['allowed_classes' => true]);
+ }
+
+ $json = json_decode((string) $raw, true);
+
+ if (is_array($json) && isset($json['body'])) {
+ $body = stripcslashes($json['body']);
+ $envelope = @unserialize($body, ['allowed_classes' => true]);
+
+ if ($envelope instanceof Envelope) {
+ $message = $envelope->getMessage();
+ $output->writeln(" $field:");
+ $output->writeln(" Message Class: " . $message::class);
+
+ $payload = null;
+ if (method_exists($message, 'getPayload')) {
+ $payload = $message->getPayload();
+ } else {
+ try {
+ $refClass = new ReflectionClass($message);
+ if ($refClass->hasProperty('payload')) {
+ $prop = $refClass->getProperty('payload');
+ $payload = $prop->getValue($message);
+ }
+ } catch (\Throwable $e) {
+ $payload = '[unavailable: ' . $e->getMessage() . ']';
+ }
+ }
+
+ if ($payload !== null) {
+ $output->writeln(" Payload:");
+ $output->writeln(
+ json_encode(
+ $payload,
+ JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES
+ )
+ );
+ }
+
+ $redeliveryStamps = $envelope->all(RedeliveryStamp::class);
+ if (! empty($redeliveryStamps)) {
+ $output->writeln(" Timestamps:");
+ foreach ($redeliveryStamps as $stamp) {
+ if ($stamp instanceof RedeliveryStamp) {
+ $output->writeln(" - "
+ . $stamp->getRedeliveredAt()->format('Y-m-d H:i:s')
+ . " (retry count: " . $stamp->getRetryCount() . ")");
+ }
+ }
+ }
+ } else {
+ $output->writeln(" $field: failed to unserialize envelope");
+ }
+ } else {
+ $output->writeln(" $field: $raw");
+ }
+ }
+
$output->writeln(str_repeat('-', 40));
}
$total = count($entries);
- $output->writeln("Total queued messages in stream 'messages': $total");
+ $output->writeln("Total queued messages in stream $streamName: $total");
$output->writeln(str_repeat('-', 40));
return Command::SUCCESS;
diff --git a/test/Swoole/Command/GetQueuedMessagesCommandTest.php b/test/Swoole/Command/GetQueuedMessagesCommandTest.php
index 969842a..60fa213 100644
--- a/test/Swoole/Command/GetQueuedMessagesCommandTest.php
+++ b/test/Swoole/Command/GetQueuedMessagesCommandTest.php
@@ -7,6 +7,7 @@
use PHPUnit\Framework\MockObject\Exception;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
+use Queue\App\Message\Message;
use Queue\Swoole\Command\GetQueuedMessagesCommand;
use Redis;
use RedisException;
@@ -14,9 +15,14 @@
use Symfony\Component\Console\Exception\ExceptionInterface;
use Symfony\Component\Console\Input\ArrayInput;
use Symfony\Component\Console\Output\BufferedOutput;
+use Symfony\Component\Messenger\Envelope;
+use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
+use function addslashes;
use function array_keys;
use function count;
+use function json_encode;
+use function serialize;
class GetQueuedMessagesCommandTest extends TestCase
{
@@ -42,7 +48,7 @@ public function testExecuteWithNoMessages(): void
->willReturn([]);
$command = new GetQueuedMessagesCommand($this->redisMock);
- $input = new ArrayInput([]);
+ $input = new ArrayInput(['--stream' => 'messages']);
$output = new BufferedOutput();
$exitCode = $command->run($input, $output);
@@ -55,11 +61,11 @@ public function testExecuteWithNoMessages(): void
/**
* @throws ExceptionInterface
*/
- public function testExecuteWithMessages(): void
+ public function testExecuteWithSimpleMessages(): void
{
$fakeMessages = [
- '1691000000000-0' => ['type' => 'testEmail', 'payload' => '{"to":"test@dotkernel.com"}'],
- '1691000000001-0' => ['type' => 'testSms', 'payload' => '{"to":"+123456789"}'],
+ '1691000000000-0' => ['type' => 'testEmail', 'payload' => '{"to":"test@example.com"}'],
+ '1691000000001-0' => ['type' => 'testEmail2', 'payload' => '{"to":"test@example2.com"}'],
];
$this->redisMock
@@ -69,7 +75,7 @@ public function testExecuteWithMessages(): void
->willReturn($fakeMessages);
$command = new GetQueuedMessagesCommand($this->redisMock);
- $input = new ArrayInput([]);
+ $input = new ArrayInput(['--stream' => 'messages']);
$output = new BufferedOutput();
$exitCode = $command->run($input, $output);
@@ -97,10 +103,59 @@ public function testRedisThrowsException(): void
->willThrowException(new RedisException('Redis unavailable'));
$command = new GetQueuedMessagesCommand($this->redisMock);
- $input = new ArrayInput([]);
+ $input = new ArrayInput(['--stream' => 'messages']);
$output = new BufferedOutput();
$this->expectException(RedisException::class);
$command->run($input, $output);
}
+
+ public function testExecuteWithInvalidBody(): void
+ {
+ $invalidBodyJson = json_encode(['body' => 'not-a-serialized-envelope']);
+
+ $this->redisMock
+ ->expects($this->once())
+ ->method('xRange')
+ ->willReturn([
+ '2-0' => ['body' => $invalidBodyJson],
+ ]);
+
+ $command = new GetQueuedMessagesCommand($this->redisMock);
+ $input = new ArrayInput(['--stream' => 'messages']);
+ $output = new BufferedOutput();
+
+ $exitCode = $command->run($input, $output);
+ $outputText = $output->fetch();
+
+ $this->assertEquals(Command::SUCCESS, $exitCode);
+ $this->assertStringContainsString('failed to unserialize envelope', $outputText);
+ }
+
+ public function testExecuteWithValidEnvelope(): void
+ {
+ $message = new Message(['foo' => 'bar']);
+ $envelope = new Envelope($message, [new RedeliveryStamp(1)]);
+ $serializedEnvelope = serialize($envelope);
+ $jsonBody = json_encode(['body' => addslashes($serializedEnvelope)]);
+
+ $this->redisMock
+ ->expects($this->once())
+ ->method('xRange')
+ ->willReturn([
+ '100-0' => ['body' => $jsonBody],
+ ]);
+
+ $command = new GetQueuedMessagesCommand($this->redisMock);
+ $input = new ArrayInput(['--stream' => 'messages']);
+ $output = new BufferedOutput();
+
+ $exitCode = $command->run($input, $output);
+ $outputText = $output->fetch();
+
+ $this->assertEquals(Command::SUCCESS, $exitCode);
+ $this->assertStringContainsString('Message Class', $outputText);
+ $this->assertStringContainsString('Payload', $outputText);
+ $this->assertStringContainsString('Timestamps', $outputText);
+ }
}