Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions config/autoload/messenger.local.php.dist
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface as SymfonySerializer;

return [
"symfony" => [
"messenger" => [
'symfony' => [
'messenger' => [
'transports' => [
'redis_transport' => [
'dsn' => 'redis://127.0.0.1:6379/messages',
Expand All @@ -31,10 +31,10 @@ return [
'failure_transport' => 'failed',
],
],
"dependencies" => [
"factories" => [
"redis_transport" => [TransportFactory::class, 'redis_transport'],
"failed" => [TransportFactory::class, 'failed'],
'dependencies' => [
'factories' => [
'redis_transport' => [TransportFactory::class, 'redis_transport'],
'failed' => [TransportFactory::class, 'failed'],
SymfonySerializer::class => fn(ContainerInterface $container) => new PhpSerializer(),
],
],
Expand Down
88 changes: 82 additions & 6 deletions src/Swoole/Command/GetQueuedMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,28 @@
use Dot\DependencyInjection\Attribute\Inject;
use Redis;
use RedisException;
use ReflectionClass;
use Symfony\Component\Console\Attribute\AsCommand;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;

use function count;
use function is_array;
use function is_string;
use function json_decode;
use function json_encode;
use function method_exists;
use function preg_match;
use function str_repeat;
use function stripcslashes;
use function unserialize;

use const JSON_PRETTY_PRINT;
use const JSON_UNESCAPED_SLASHES;
use const JSON_UNESCAPED_UNICODE;

#[AsCommand(
Expand All @@ -39,29 +51,93 @@ public function __construct(Redis $redis)

protected function configure(): void
{
$this->setDescription('Get all queued messages from Redis stream "messages"');
$this->setDescription('Get all queued messages from Redis stream "messages"')
->addOption('stream', null, InputOption::VALUE_REQUIRED, 'stream name');
}

/**
* @throws RedisException
*/
protected function execute(InputInterface $input, OutputInterface $output): int
{
$entries = $this->redis->xRange('messages', '-', '+');
$streamName = (string) $input->getOption('stream');
$entries = $this->redis->xRange($streamName, '-', '+');

if (empty($entries)) {
$output->writeln('<info>No messages queued found in Redis stream "messages".</info>');
$output->writeln("<info>No messages queued found in Redis stream $streamName</info>");
return Command::SUCCESS;
}

foreach ($entries as $id => $entry) {
$output->writeln("<info>Message ID:</info> $id");
$output->writeln(json_encode($entry, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE));
$output->writeln("<comment>Message ID:</comment> $id");

foreach ($entry as $field => $value) {
$raw = is_string($value) ? $value : (string) $value;

if (preg_match('/^s:\d+:\".*\";$/s', $raw)) {
$raw = @unserialize($raw, ['allowed_classes' => true]);
}

$json = json_decode((string) $raw, true);

if (is_array($json) && isset($json['body'])) {
$body = stripcslashes($json['body']);
$envelope = @unserialize($body, ['allowed_classes' => true]);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ is a code smell ...
You should remove it


if ($envelope instanceof Envelope) {
$message = $envelope->getMessage();
$output->writeln(" <info>$field</info>:");
$output->writeln(" <comment>Message Class</comment>: " . $message::class);

$payload = null;
if (method_exists($message, 'getPayload')) {
$payload = $message->getPayload();
} else {
try {
$refClass = new ReflectionClass($message);
if ($refClass->hasProperty('payload')) {
$prop = $refClass->getProperty('payload');
$payload = $prop->getValue($message);
}
} catch (\Throwable $e) {
$payload = '[unavailable: ' . $e->getMessage() . ']';
}
}

if ($payload !== null) {
$output->writeln(" <comment>Payload</comment>:");
$output->writeln(
json_encode(
$payload,
JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES
)
);
}

$redeliveryStamps = $envelope->all(RedeliveryStamp::class);
if (! empty($redeliveryStamps)) {
$output->writeln(" <comment>Timestamps</comment>:");
foreach ($redeliveryStamps as $stamp) {
if ($stamp instanceof RedeliveryStamp) {
$output->writeln(" - "
. $stamp->getRedeliveredAt()->format('Y-m-d H:i:s')
. " (retry count: " . $stamp->getRetryCount() . ")");
}
}
}
} else {
$output->writeln(" <info>$field</info>: failed to unserialize envelope");
}
} else {
$output->writeln(" <info>$field</info>: $raw");
}
}

$output->writeln(str_repeat('-', 40));
}

$total = count($entries);
$output->writeln("<info>Total queued messages in stream 'messages':</info> $total");
$output->writeln("<info>Total queued messages in stream $streamName:</info> $total");
$output->writeln(str_repeat('-', 40));

return Command::SUCCESS;
Expand Down
67 changes: 61 additions & 6 deletions test/Swoole/Command/GetQueuedMessagesCommandTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,22 @@
use PHPUnit\Framework\MockObject\Exception;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Queue\App\Message\Message;
use Queue\Swoole\Command\GetQueuedMessagesCommand;
use Redis;
use RedisException;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Exception\ExceptionInterface;
use Symfony\Component\Console\Input\ArrayInput;
use Symfony\Component\Console\Output\BufferedOutput;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;

use function addslashes;
use function array_keys;
use function count;
use function json_encode;
use function serialize;

class GetQueuedMessagesCommandTest extends TestCase
{
Expand All @@ -42,7 +48,7 @@ public function testExecuteWithNoMessages(): void
->willReturn([]);

$command = new GetQueuedMessagesCommand($this->redisMock);
$input = new ArrayInput([]);
$input = new ArrayInput(['--stream' => 'messages']);
$output = new BufferedOutput();

$exitCode = $command->run($input, $output);
Expand All @@ -55,11 +61,11 @@ public function testExecuteWithNoMessages(): void
/**
* @throws ExceptionInterface
*/
public function testExecuteWithMessages(): void
public function testExecuteWithSimpleMessages(): void
{
$fakeMessages = [
'1691000000000-0' => ['type' => 'testEmail', 'payload' => '{"to":"test@dotkernel.com"}'],
'1691000000001-0' => ['type' => 'testSms', 'payload' => '{"to":"+123456789"}'],
'1691000000000-0' => ['type' => 'testEmail', 'payload' => '{"to":"test@example.com"}'],
'1691000000001-0' => ['type' => 'testEmail2', 'payload' => '{"to":"test@example2.com"}'],
];

$this->redisMock
Expand All @@ -69,7 +75,7 @@ public function testExecuteWithMessages(): void
->willReturn($fakeMessages);

$command = new GetQueuedMessagesCommand($this->redisMock);
$input = new ArrayInput([]);
$input = new ArrayInput(['--stream' => 'messages']);
$output = new BufferedOutput();

$exitCode = $command->run($input, $output);
Expand Down Expand Up @@ -97,10 +103,59 @@ public function testRedisThrowsException(): void
->willThrowException(new RedisException('Redis unavailable'));

$command = new GetQueuedMessagesCommand($this->redisMock);
$input = new ArrayInput([]);
$input = new ArrayInput(['--stream' => 'messages']);
$output = new BufferedOutput();

$this->expectException(RedisException::class);
$command->run($input, $output);
}

public function testExecuteWithInvalidBody(): void
{
$invalidBodyJson = json_encode(['body' => 'not-a-serialized-envelope']);

$this->redisMock
->expects($this->once())
->method('xRange')
->willReturn([
'2-0' => ['body' => $invalidBodyJson],
]);

$command = new GetQueuedMessagesCommand($this->redisMock);
$input = new ArrayInput(['--stream' => 'messages']);
$output = new BufferedOutput();

$exitCode = $command->run($input, $output);
$outputText = $output->fetch();

$this->assertEquals(Command::SUCCESS, $exitCode);
$this->assertStringContainsString('failed to unserialize envelope', $outputText);
}

public function testExecuteWithValidEnvelope(): void
{
$message = new Message(['foo' => 'bar']);
$envelope = new Envelope($message, [new RedeliveryStamp(1)]);
$serializedEnvelope = serialize($envelope);
$jsonBody = json_encode(['body' => addslashes($serializedEnvelope)]);

$this->redisMock
->expects($this->once())
->method('xRange')
->willReturn([
'100-0' => ['body' => $jsonBody],
]);

$command = new GetQueuedMessagesCommand($this->redisMock);
$input = new ArrayInput(['--stream' => 'messages']);
$output = new BufferedOutput();

$exitCode = $command->run($input, $output);
$outputText = $output->fetch();

$this->assertEquals(Command::SUCCESS, $exitCode);
$this->assertStringContainsString('Message Class', $outputText);
$this->assertStringContainsString('Payload', $outputText);
$this->assertStringContainsString('Timestamps', $outputText);
}
}
Loading