From 3ce6055d75fa23d6c6f307d9ff97ceb6b6cb03c2 Mon Sep 17 00:00:00 2001 From: Jasper Smet Date: Wed, 17 Dec 2025 14:33:40 +0100 Subject: [PATCH 1/7] feat: initial sub-processor support --- phpunit.xml.dist | 2 +- src/Command/SubprocessJobRunnerCommand.php | 182 +++++++ src/Command/WorkerCommand.php | 24 +- src/Queue/SubprocessProcessor.php | 286 ++++++++++ src/QueuePlugin.php | 5 + .../SubprocessJobRunnerCommandTest.php | 255 +++++++++ tests/TestCase/Command/WorkerCommandTest.php | 66 +++ .../Mailer/Transport/QueueTransportTest.php | 14 +- tests/TestCase/PluginTest.php | 8 +- .../Queue/SubprocessProcessorTest.php | 505 ++++++++++++++++++ tests/TestCase/QueueManagerTest.php | 2 +- tests/bootstrap.php | 7 + tests/test_app/bin/cake | 75 +++ tests/test_app/bin/cake.php | 13 + tests/test_app/src/Application.php | 18 + 15 files changed, 1445 insertions(+), 17 deletions(-) create mode 100644 src/Command/SubprocessJobRunnerCommand.php create mode 100644 src/Queue/SubprocessProcessor.php create mode 100644 tests/TestCase/Command/SubprocessJobRunnerCommandTest.php create mode 100644 tests/TestCase/Queue/SubprocessProcessorTest.php create mode 100755 tests/test_app/bin/cake create mode 100644 tests/test_app/bin/cake.php diff --git a/phpunit.xml.dist b/phpunit.xml.dist index fa7442a..dfeea09 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -3,7 +3,7 @@ colors="true" cacheDirectory=".phpunit.cache" bootstrap="tests/bootstrap.php" - xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/10.1/phpunit.xsd"> + xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/12.0/phpunit.xsd"> tests/TestCase diff --git a/src/Command/SubprocessJobRunnerCommand.php b/src/Command/SubprocessJobRunnerCommand.php new file mode 100644 index 0000000..e6559c8 --- /dev/null +++ b/src/Command/SubprocessJobRunnerCommand.php @@ -0,0 +1,182 @@ +readInput($io); + + if (empty($input)) { + $this->outputResult($io, [ + 'success' => false, + 'error' => 'No input received', + ]); + + return self::CODE_ERROR; + } + + $data = json_decode($input, true); + if (json_last_error() !== JSON_ERROR_NONE) { + $this->outputResult($io, [ + 'success' => false, + 'error' => 'Invalid JSON input: ' . json_last_error_msg(), + ]); + + return self::CODE_ERROR; + } + + try { + $result = $this->executeJob($data); + $this->outputResult($io, [ + 'success' => true, + 'result' => $result, + ]); + + return self::CODE_SUCCESS; + } catch (Throwable $throwable) { + $this->outputResult($io, [ + 'success' => false, + 'result' => InteropProcessor::REQUEUE, + 'exception' => [ + 'class' => get_class($throwable), + 'message' => $throwable->getMessage(), + 'code' => $throwable->getCode(), + 'file' => $throwable->getFile(), + 'line' => $throwable->getLine(), + 'trace' => $throwable->getTraceAsString(), + ], + ]); + + return self::CODE_SUCCESS; + } + } + + /** + * Read input from STDIN or ConsoleIo + * + * @param \Cake\Console\ConsoleIo $io ConsoleIo + * @return string + */ + protected function readInput(ConsoleIo $io): string + { + $input = ''; + while (!feof(STDIN)) { + $chunk = fread(STDIN, 8192); + if ($chunk === false) { + break; + } + + $input .= $chunk; + if ($input !== '' && strlen($chunk) < 8192) { + break; + } + } + + return $input; + } + + /** + * Execute the job with the provided data. + * + * @param array $data Job data + * @return string + */ + protected function executeJob(array $data): string + { + $connectionFactory = new NullConnectionFactory(); + $context = $connectionFactory->createContext(); + + $messageClass = $data['messageClass'] ?? NullMessage::class; + $messageBody = json_encode($data['body']); + + /** @var \Interop\Queue\Message $queueMessage */ + $queueMessage = new $messageClass($messageBody); + + if (isset($data['properties']) && is_array($data['properties'])) { + foreach ($data['properties'] as $key => $value) { + $queueMessage->setProperty($key, $value); + } + } + + $message = new Message($queueMessage, $context, $this->container); + $processor = new Processor(new NullLogger(), $this->container); + + $result = $processor->processMessage($message); + + // Result is string|object (with __toString) + /** @phpstan-ignore cast.string */ + return is_string($result) ? $result : (string)$result; + } + + /** + * Output result as JSON to STDOUT. + * + * @param \Cake\Console\ConsoleIo $io ConsoleIo + * @param array $result Result data + * @return void + */ + protected function outputResult(ConsoleIo $io, array $result): void + { + $json = json_encode($result); + if ($json !== false) { + $io->out($json); + } + } +} diff --git a/src/Command/WorkerCommand.php b/src/Command/WorkerCommand.php index 44605d1..13cecb6 100644 --- a/src/Command/WorkerCommand.php +++ b/src/Command/WorkerCommand.php @@ -28,6 +28,7 @@ use Cake\Queue\Consumption\RemoveUniqueJobIdFromCacheExtension; use Cake\Queue\Listener\FailedJobsListener; use Cake\Queue\Queue\Processor; +use Cake\Queue\Queue\SubprocessProcessor; use Cake\Queue\QueueManager; use DateTime; use Enqueue\Consumption\ChainExtension; @@ -105,6 +106,11 @@ public function getOptionParser(): ConsoleOptionParser 'default' => null, 'short' => 'a', ]); + $parser->addOption('subprocess', [ + 'help' => 'Execute jobs in a subprocess. Useful for development to reload code for each job.', + 'boolean' => true, + 'default' => false, + ]); $parser->setDescription( 'Runs a queue worker that consumes from the named queue.', ); @@ -191,7 +197,23 @@ protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface $this->abort(); } - return new $processorClass($logger, $this->container); + $processor = new $processorClass($logger, $this->container); + + if ($args->getOption('subprocess') || ($config['subprocess']['enabled'] ?? false)) { + $subprocessConfig = array_merge( + $config['subprocess'] ?? [], + ['enabled' => true], + ); + + if (!($processor instanceof Processor)) { + $io->error('Subprocess mode is only supported with the default Processor class'); + $this->abort(); + } + + $processor = new SubprocessProcessor($logger, $subprocessConfig, $this->container); + } + + return $processor; } /** diff --git a/src/Queue/SubprocessProcessor.php b/src/Queue/SubprocessProcessor.php new file mode 100644 index 0000000..a1fefa9 --- /dev/null +++ b/src/Queue/SubprocessProcessor.php @@ -0,0 +1,286 @@ + $config Subprocess configuration + * @param \Cake\Core\ContainerInterface|null $container DI container instance + */ + public function __construct( + LoggerInterface $logger, + protected readonly array $config = [], + ?ContainerInterface $container = null, + ) { + parent::__construct($logger, $container); + } + + /** + * Process a message in a subprocess. + * Overrides parent to execute in subprocess, but reuses parent's event dispatching. + * + * @param \Interop\Queue\Message $message Message. + * @param \Interop\Queue\Context $context Context. + * @return object|string with __toString method implemented + */ + public function process(QueueMessage $message, Context $context): string|object + { + $this->dispatchEvent('Processor.message.seen', ['queueMessage' => $message]); + + $jobMessage = new Message($message, $context, $this->container); + try { + $jobMessage->getCallable(); + } catch (RuntimeException | Error $e) { + $this->logger->debug('Invalid callable for message. Rejecting message from queue.'); + $this->dispatchEvent('Processor.message.invalid', ['message' => $jobMessage]); + + return InteropProcessor::REJECT; + } + + $startTime = microtime(true) * 1000; + $this->dispatchEvent('Processor.message.start', ['message' => $jobMessage]); + + try { + $jobData = $this->prepareJobData($message); + $subprocessResult = $this->executeInSubprocess($jobData); + $response = $this->handleSubprocessResult($subprocessResult, $message); + } catch (Throwable $throwable) { + $message->setProperty('jobException', $throwable); + + $this->logger->debug(sprintf('Message encountered exception: %s', $throwable->getMessage())); + $this->dispatchEvent('Processor.message.exception', [ + 'message' => $jobMessage, + 'exception' => $throwable, + 'duration' => (int)((microtime(true) * 1000) - $startTime), + ]); + + return Result::requeue('Exception occurred while processing message'); + } + + $duration = (int)((microtime(true) * 1000) - $startTime); + + if ($response === InteropProcessor::ACK) { + $this->logger->debug('Message processed successfully'); + $this->dispatchEvent('Processor.message.success', [ + 'message' => $jobMessage, + 'duration' => $duration, + ]); + + return InteropProcessor::ACK; + } + + if ($response === InteropProcessor::REJECT) { + $this->logger->debug('Message processed with rejection'); + $this->dispatchEvent('Processor.message.reject', [ + 'message' => $jobMessage, + 'duration' => $duration, + ]); + + return InteropProcessor::REJECT; + } + + $this->logger->debug('Message processed with failure, requeuing'); + $this->dispatchEvent('Processor.message.failure', [ + 'message' => $jobMessage, + 'duration' => $duration, + ]); + + return InteropProcessor::REQUEUE; + } + + /** + * Handle subprocess result and return appropriate response. + * + * @param array $result Subprocess result + * @param \Interop\Queue\Message $message Original message + * @return string + * @throws \RuntimeException + */ + protected function handleSubprocessResult(array $result, QueueMessage $message): string + { + if ($result['success']) { + return $result['result']; + } + + if (isset($result['exception'])) { + $exception = $this->reconstructException($result['exception']); + $message->setProperty('jobException', $exception); + + throw $exception; + } + + throw new RuntimeException($result['error'] ?? 'Subprocess execution failed'); + } + + /** + * Prepare job data for subprocess execution. + * + * @param \Interop\Queue\Message $message Message + * @return array + */ + protected function prepareJobData(QueueMessage $message): array + { + $body = json_decode($message->getBody(), true); + if (json_last_error() !== JSON_ERROR_NONE) { + throw new RuntimeException('Invalid JSON in message body'); + } + + $properties = $message->getProperties(); + + return [ + 'messageClass' => get_class($message), + 'body' => $body, + 'properties' => $properties, + ]; + } + + /** + * Execute job in subprocess. + * + * @param array $jobData Job data + * @return array + */ + protected function executeInSubprocess(array $jobData): array + { + $command = $this->config['command'] ?? 'php bin/cake.php queue subprocess-runner'; + $timeout = $this->config['timeout'] ?? 300; + + $descriptors = [ + 0 => ['pipe', 'r'], + 1 => ['pipe', 'w'], + 2 => ['pipe', 'w'], + ]; + + $process = proc_open($command, $descriptors, $pipes); + + if (!is_resource($process)) { + throw new RuntimeException('Failed to create subprocess'); + } + + $jobDataJson = json_encode($jobData); + if ($jobDataJson !== false) { + fwrite($pipes[0], $jobDataJson); + } + + fclose($pipes[0]); + + $output = ''; + $errorOutput = ''; + $startTime = time(); + + stream_set_blocking($pipes[1], false); + stream_set_blocking($pipes[2], false); + + while (true) { + if ($timeout > 0 && (time() - $startTime) > $timeout) { + proc_terminate($process, 9); + fclose($pipes[1]); + fclose($pipes[2]); + proc_close($process); + + return [ + 'success' => false, + 'error' => sprintf('Subprocess execution timeout after %d seconds', $timeout), + ]; + } + + $read = [$pipes[1], $pipes[2]]; + $write = null; + $except = null; + $selectResult = stream_select($read, $write, $except, 1); + + if ($selectResult === false) { + break; + } + + if (in_array($pipes[1], $read)) { + $chunk = fread($pipes[1], 8192); + if ($chunk !== false) { + $output .= $chunk; + } + } + + if (in_array($pipes[2], $read)) { + $chunk = fread($pipes[2], 8192); + if ($chunk !== false) { + $errorOutput .= $chunk; + } + } + + if (feof($pipes[1]) && feof($pipes[2])) { + break; + } + } + + fclose($pipes[1]); + fclose($pipes[2]); + $exitCode = proc_close($process); + + if ($exitCode !== 0 && empty($output)) { + return [ + 'success' => false, + 'error' => sprintf('Subprocess exited with code %d. Error: %s', $exitCode, $errorOutput), + ]; + } + + $result = json_decode($output, true); + if (json_last_error() !== JSON_ERROR_NONE) { + return [ + 'success' => false, + 'error' => 'Invalid JSON output from subprocess: ' . $output, + ]; + } + + return $result; + } + + /** + * Reconstruct exception from array data. + * + * @param array $exceptionData Exception data + * @return \RuntimeException + */ + protected function reconstructException(array $exceptionData): RuntimeException + { + $message = sprintf( + '%s: %s in %s:%d', + $exceptionData['class'] ?? 'Exception', + $exceptionData['message'] ?? 'Unknown error', + $exceptionData['file'] ?? 'unknown', + $exceptionData['line'] ?? 0, + ); + + return new RuntimeException($message, (int)($exceptionData['code'] ?? 0)); + } +} diff --git a/src/QueuePlugin.php b/src/QueuePlugin.php index 9f7202b..8be616d 100644 --- a/src/QueuePlugin.php +++ b/src/QueuePlugin.php @@ -25,6 +25,7 @@ use Cake\Queue\Command\JobCommand; use Cake\Queue\Command\PurgeFailedCommand; use Cake\Queue\Command\RequeueCommand; +use Cake\Queue\Command\SubprocessJobRunnerCommand; use Cake\Queue\Command\WorkerCommand; use InvalidArgumentException; @@ -80,6 +81,7 @@ public function console(CommandCollection $commands): CommandCollection return $commands ->add('queue worker', WorkerCommand::class) ->add('worker', WorkerCommand::class) + ->add('queue subprocess-runner', SubprocessJobRunnerCommand::class) ->add('queue requeue', RequeueCommand::class) ->add('queue purge_failed', PurgeFailedCommand::class); } @@ -96,5 +98,8 @@ public function services(ContainerInterface $container): void $container ->add(WorkerCommand::class) ->addArgument(ContainerInterface::class); + $container + ->add(SubprocessJobRunnerCommand::class) + ->addArgument(ContainerInterface::class); } } diff --git a/tests/TestCase/Command/SubprocessJobRunnerCommandTest.php b/tests/TestCase/Command/SubprocessJobRunnerCommandTest.php new file mode 100644 index 0000000..b191a6a --- /dev/null +++ b/tests/TestCase/Command/SubprocessJobRunnerCommandTest.php @@ -0,0 +1,255 @@ + NullMessage::class, + 'body' => [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ], + 'properties' => [], + ]; + + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('executeJob'); + + $result = $method->invoke($command, $jobData); + + $this->assertSame(Processor::ACK, $result); + } + + /** + * Test that executeJob handles REJECT response + */ + public function testExecuteJobReturnsReject(): void + { + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => [ + 'class' => [TestProcessor::class, 'processReturnReject'], + 'args' => [], + ], + 'properties' => [], + ]; + + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('executeJob'); + + $result = $method->invoke($command, $jobData); + + $this->assertSame(Processor::REJECT, $result); + } + + /** + * Test that executeJob handles REQUEUE response + */ + public function testExecuteJobReturnsRequeue(): void + { + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => [ + 'class' => [TestProcessor::class, 'processReturnRequeue'], + 'args' => [], + ], + 'properties' => [], + ]; + + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('executeJob'); + + $result = $method->invoke($command, $jobData); + + $this->assertSame(Processor::REQUEUE, $result); + } + + /** + * Test that executeJob handles job returning null (defaults to ACK) + */ + public function testExecuteJobReturnsNull(): void + { + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => [ + 'class' => [TestProcessor::class, 'processReturnNull'], + 'args' => [], + ], + 'properties' => [], + ]; + + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('executeJob'); + + $result = $method->invoke($command, $jobData); + + $this->assertSame(Processor::ACK, $result); + } + + /** + * Test that executeJob handles properties correctly + */ + public function testExecuteJobWithProperties(): void + { + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ], + 'properties' => [ + 'attempts' => 1, + 'custom_property' => 'test_value', + ], + ]; + + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('executeJob'); + + $result = $method->invoke($command, $jobData); + + $this->assertSame(Processor::ACK, $result); + } + + /** + * Test execute with invalid JSON input + */ + public function testExecuteWithInvalidJson(): void + { + $command = $this->getMockBuilder(SubprocessJobRunnerCommand::class) + ->onlyMethods(['readInput']) + ->getMock(); + + $command->expects($this->once()) + ->method('readInput') + ->willReturn('invalid json {]'); + + $args = $this->createStub(Arguments::class); + $io = $this->createMock(ConsoleIo::class); + + $io->expects($this->once()) + ->method('out') + ->with($this->stringContains('Invalid JSON input')); + + $result = $command->execute($args, $io); + + $this->assertSame(SubprocessJobRunnerCommand::CODE_ERROR, $result); + } + + /** + * Test execute with empty input + */ + public function testExecuteWithEmptyInput(): void + { + $command = $this->getMockBuilder(SubprocessJobRunnerCommand::class) + ->onlyMethods(['readInput']) + ->getMock(); + + $command->expects($this->once()) + ->method('readInput') + ->willReturn(''); + + $args = $this->createStub(Arguments::class); + $io = $this->createMock(ConsoleIo::class); + + $io->expects($this->once()) + ->method('out') + ->with($this->stringContains('No input received')); + + $result = $command->execute($args, $io); + + $this->assertSame(SubprocessJobRunnerCommand::CODE_ERROR, $result); + } + + /** + * Test execute with job that throws exception + */ + public function testExecuteWithJobException(): void + { + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => [ + 'class' => [TestProcessor::class, 'processAndThrowException'], + 'args' => [], + ], + 'properties' => [], + ]; + + $command = $this->getMockBuilder(SubprocessJobRunnerCommand::class) + ->onlyMethods(['readInput']) + ->getMock(); + + $command->expects($this->once()) + ->method('readInput') + ->willReturn(json_encode($jobData)); + + $args = $this->createStub(Arguments::class); + $io = $this->createMock(ConsoleIo::class); + + $io->expects($this->once()) + ->method('out') + ->with($this->callback(function ($output) { + $result = json_decode($output, true); + + return $result['success'] === false && + isset($result['exception']) && + in_array($result['exception']['class'], ['RuntimeException', 'Exception']); + })); + + $result = $command->execute($args, $io); + + $this->assertSame(SubprocessJobRunnerCommand::CODE_SUCCESS, $result); + } + + /** + * Test outputResult method + */ + public function testOutputResult(): void + { + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('outputResult'); + + $io = $this->createMock(ConsoleIo::class); + $io->expects($this->once()) + ->method('out') + ->with('{"success":true,"result":"ack"}'); + + $method->invoke($command, $io, ['success' => true, 'result' => 'ack']); + } +} diff --git a/tests/TestCase/Command/WorkerCommandTest.php b/tests/TestCase/Command/WorkerCommandTest.php index f77552d..cbee63a 100644 --- a/tests/TestCase/Command/WorkerCommandTest.php +++ b/tests/TestCase/Command/WorkerCommandTest.php @@ -438,4 +438,70 @@ public function testCustomProcessorWithListener() $this->assertDebugLogContains('Debug job was run'); $this->assertDebugLogContains('TestCustomProcessor processing message'); } + + /** + * Test that queue processes job with subprocess flag + */ + #[RunInSeparateProcess] + public function testQueueProcessesJobWithSubprocessFlag() + { + $config = [ + 'queue' => 'default', + 'url' => 'file:///' . TMP . DS . 'queue', + 'receiveTimeout' => 100, + 'subprocess' => [ + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + ], + ]; + Configure::write('Queue', ['default' => $config]); + + Log::setConfig('debug', [ + 'className' => 'Array', + 'levels' => ['notice', 'info', 'debug'], + ]); + + QueueManager::setConfig('default', $config); + QueueManager::push(LogToDebugJob::class); + QueueManager::drop('default'); + + $this->exec('queue worker --max-jobs=1 --subprocess --logger=debug --verbose'); + + // In subprocess mode, logs from the job itself are isolated to the subprocess + // We can only verify that the parent process logged successful processing + $this->assertDebugLogContains('Message processed successfully'); + } + + /** + * Test that queue processes job with subprocess enabled in config + */ + #[RunInSeparateProcess] + public function testQueueProcessesJobWithSubprocessConfig() + { + $config = [ + 'queue' => 'default', + 'url' => 'file:///' . TMP . DS . 'queue', + 'receiveTimeout' => 100, + 'subprocess' => [ + 'enabled' => true, + 'timeout' => 30, + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + ], + ]; + Configure::write('Queue', ['default' => $config]); + + Log::setConfig('debug', [ + 'className' => 'Array', + 'levels' => ['notice', 'info', 'debug'], + ]); + + QueueManager::setConfig('default', $config); + QueueManager::push(LogToDebugJob::class); + QueueManager::drop('default'); + + $this->exec('queue worker --max-jobs=1 --logger=debug --verbose'); + + // In subprocess mode, logs from the job itself are isolated to the subprocess + // We can only verify that the parent process logged successful processing + $this->assertDebugLogContains('Message processed successfully'); + } } diff --git a/tests/TestCase/Mailer/Transport/QueueTransportTest.php b/tests/TestCase/Mailer/Transport/QueueTransportTest.php index 7da8041..761c6b3 100644 --- a/tests/TestCase/Mailer/Transport/QueueTransportTest.php +++ b/tests/TestCase/Mailer/Transport/QueueTransportTest.php @@ -27,7 +27,7 @@ class QueueTransportTest extends TestCase { use QueueTestTrait; - private $fsQueuePath = TMP . DS . 'queue'; + private string $fsQueuePath = TMP . DS . 'queue'; private function getFsQueueUrl(): string { @@ -41,10 +41,8 @@ private function getFsQueueFile(): string /** * Test send - * - * @return void */ - public function testSend() + public function testSend(): void { QueueManager::setConfig('default', [ 'queue' => 'default', @@ -87,10 +85,8 @@ public function testSend() /** * Test send custom transport - * - * @return void */ - public function testSendCustomTransport() + public function testSendCustomTransport(): void { QueueManager::setConfig('default', [ 'queue' => 'default', @@ -118,10 +114,8 @@ public function testSendCustomTransport() /** * Test send backwards compatibility transport config - * - * @return void */ - public function testSendBcTransport() + public function testSendBcTransport(): void { QueueManager::setConfig('default', [ 'queue' => 'default', diff --git a/tests/TestCase/PluginTest.php b/tests/TestCase/PluginTest.php index d695069..16cb70b 100644 --- a/tests/TestCase/PluginTest.php +++ b/tests/TestCase/PluginTest.php @@ -17,13 +17,13 @@ class PluginTest extends TestCase * * @return void */ - public function testBootstrapNoConfig() + public function testBootstrapNoConfig(): void { $this->expectException(InvalidArgumentException::class); $this->expectExceptionMessage('Missing `Queue` configuration key, please check the CakePHP Queue documentation to complete the plugin setup'); Configure::delete('Queue'); $plugin = new QueuePlugin(); - $app = $this->getMockBuilder(Application::class)->disableOriginalConstructor()->getMock(); + $app = $this->createStub(Application::class); $plugin->bootstrap($app); } @@ -32,7 +32,7 @@ public function testBootstrapNoConfig() * * @return void */ - public function testBootstrapWithConfig() + public function testBootstrapWithConfig(): void { $queueConfig = [ 'url' => 'null:', @@ -41,7 +41,7 @@ public function testBootstrapWithConfig() ]; Configure::write('Queue', ['default' => $queueConfig]); $plugin = new QueuePlugin(); - $app = $this->getMockBuilder(Application::class)->disableOriginalConstructor()->getMock(); + $app = $this->createStub(Application::class); $plugin->bootstrap($app); $queueConfig['url'] = [ 'transport' => 'null:', diff --git a/tests/TestCase/Queue/SubprocessProcessorTest.php b/tests/TestCase/Queue/SubprocessProcessorTest.php new file mode 100644 index 0000000..1cd6f07 --- /dev/null +++ b/tests/TestCase/Queue/SubprocessProcessorTest.php @@ -0,0 +1,505 @@ + [TestProcessor::class, 'processReturnAck'], + 'args' => ['test' => 'data'], + ]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + $queueMessage->setProperty('attempts', 1); + + $logger = new ArrayLog(); + $processor = new SubprocessProcessor($logger); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('prepareJobData'); + + $jobData = $method->invoke($processor, $queueMessage); + + $this->assertArrayHasKey('messageClass', $jobData); + $this->assertArrayHasKey('body', $jobData); + $this->assertArrayHasKey('properties', $jobData); + $this->assertSame($messageBody, $jobData['body']); + $this->assertArrayHasKey('attempts', $jobData['properties']); + $this->assertSame(1, $jobData['properties']['attempts']); + } + + /** + * Test reconstructException method + */ + public function testReconstructException(): void + { + $exceptionData = [ + 'class' => 'RuntimeException', + 'message' => 'Test error', + 'code' => 500, + 'file' => '/path/to/file.php', + 'line' => 42, + ]; + + $logger = new ArrayLog(); + $processor = new SubprocessProcessor($logger); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('reconstructException'); + + $exception = $method->invoke($processor, $exceptionData); + + $this->assertInstanceOf(RuntimeException::class, $exception); + $this->assertStringContainsString('Test error', $exception->getMessage()); + $this->assertStringContainsString('RuntimeException', $exception->getMessage()); + $this->assertSame(500, $exception->getCode()); + } + + /** + * Test handleSubprocessResult with success response + */ + public function testHandleSubprocessResultSuccess(): void + { + $result = [ + 'success' => true, + 'result' => InteropProcessor::ACK, + ]; + + $messageBody = ['class' => [TestProcessor::class, 'processReturnAck'], 'args' => []]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $processor = new SubprocessProcessor($logger); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('handleSubprocessResult'); + + $actual = $method->invoke($processor, $result, $queueMessage); + + $this->assertSame(InteropProcessor::ACK, $actual); + } + + /** + * Test handleSubprocessResult with reject response + */ + public function testHandleSubprocessResultReject(): void + { + $result = [ + 'success' => true, + 'result' => InteropProcessor::REJECT, + ]; + + $messageBody = ['class' => [TestProcessor::class, 'processReturnReject'], 'args' => []]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $processor = new SubprocessProcessor($logger); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('handleSubprocessResult'); + + $actual = $method->invoke($processor, $result, $queueMessage); + + $this->assertSame(InteropProcessor::REJECT, $actual); + } + + /** + * Test handleSubprocessResult with requeue response + */ + public function testHandleSubprocessResultRequeue(): void + { + $result = [ + 'success' => true, + 'result' => InteropProcessor::REQUEUE, + ]; + + $messageBody = ['class' => [TestProcessor::class, 'processReturnRequeue'], 'args' => []]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $processor = new SubprocessProcessor($logger); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('handleSubprocessResult'); + + $actual = $method->invoke($processor, $result, $queueMessage); + + $this->assertSame(InteropProcessor::REQUEUE, $actual); + } + + /** + * Test handleSubprocessResult with exception + */ + public function testHandleSubprocessResultWithException(): void + { + $result = [ + 'success' => false, + 'exception' => [ + 'class' => 'RuntimeException', + 'message' => 'Test error', + 'code' => 500, + 'file' => '/path/to/file.php', + 'line' => 42, + ], + ]; + + $messageBody = ['class' => [TestProcessor::class, 'processAndThrowException'], 'args' => []]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $processor = new SubprocessProcessor($logger); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('handleSubprocessResult'); + + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('Test error'); + + $method->invoke($processor, $result, $queueMessage); + } + + /** + * Test handleSubprocessResult with error message + */ + public function testHandleSubprocessResultWithError(): void + { + $result = [ + 'success' => false, + 'error' => 'Subprocess execution failed', + ]; + + $messageBody = ['class' => [TestProcessor::class, 'processReturnAck'], 'args' => []]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $processor = new SubprocessProcessor($logger); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('handleSubprocessResult'); + + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('Subprocess execution failed'); + + $method->invoke($processor, $result, $queueMessage); + } + + /** + * Test real subprocess execution with ACK result + */ + public function testRealSubprocessExecutionAck(): void + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $config = [ + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + $prepareMethod = $reflection->getMethod('prepareJobData'); + + $jobData = $prepareMethod->invoke($processor, $queueMessage); + $result = $method->invoke($processor, $jobData); + + $this->assertTrue($result['success']); + $this->assertSame(InteropProcessor::ACK, $result['result']); + } + + /** + * Test real subprocess execution with REJECT result + */ + public function testRealSubprocessExecutionReject(): void + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processReturnReject'], + 'args' => [], + ]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $config = [ + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + $prepareMethod = $reflection->getMethod('prepareJobData'); + + $jobData = $prepareMethod->invoke($processor, $queueMessage); + $result = $method->invoke($processor, $jobData); + + $this->assertTrue($result['success']); + $this->assertSame(InteropProcessor::REJECT, $result['result']); + } + + /** + * Test real subprocess execution with exception + */ + public function testRealSubprocessExecutionWithException(): void + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processAndThrowException'], + 'args' => [], + ]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $config = [ + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + $prepareMethod = $reflection->getMethod('prepareJobData'); + + $jobData = $prepareMethod->invoke($processor, $queueMessage); + $result = $method->invoke($processor, $jobData); + + $this->assertFalse($result['success']); + $this->assertArrayHasKey('exception', $result); + $this->assertContains($result['exception']['class'], ['RuntimeException', 'Exception']); + } + + /** + * Test subprocess timeout handling + */ + public function testSubprocessTimeout(): void + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ]; + + $logger = new ArrayLog(); + $config = [ + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + 'timeout' => 1, + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + + // Create job data that simulates a long-running process + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => $messageBody, + 'properties' => [], + ]; + + $result = $method->invoke($processor, $jobData); + + // Should complete successfully (fast job) or timeout + $this->assertIsArray($result); + $this->assertArrayHasKey('success', $result); + } + + /** + * Test subprocess with invalid command (non-existent binary) + */ + public function testSubprocessWithInvalidCommand(): void + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ]; + $message = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $config = [ + 'command' => '/nonexistent/binary queue subprocess-runner', + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + $prepareMethod = $reflection->getMethod('prepareJobData'); + + $jobData = $prepareMethod->invoke($processor, $message); + $result = $method->invoke($processor, $jobData); + + // Invalid command will fail with an error result + $this->assertFalse($result['success']); + $this->assertArrayHasKey('error', $result); + } + + /** + * Test subprocess with invalid JSON in message body + */ + public function testPrepareJobDataWithInvalidJson(): void + { + $queueMessage = new NullMessage('invalid json {]'); + + $logger = new ArrayLog(); + $processor = new SubprocessProcessor($logger); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('prepareJobData'); + + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('Invalid JSON in message body'); + + $method->invoke($processor, $queueMessage); + } + + /** + * Test executeInSubprocess returns error when subprocess returns invalid JSON + */ + public function testExecuteInSubprocessWithInvalidJsonOutput(): void + { + $logger = new ArrayLog(); + $config = [ + 'command' => 'echo invalid-json-output', + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => ['class' => [TestProcessor::class, 'processReturnAck'], 'args' => []], + 'properties' => [], + ]; + + $result = $method->invoke($processor, $jobData); + + $this->assertFalse($result['success']); + $this->assertArrayHasKey('error', $result); + $this->assertStringContainsString('Invalid JSON output from subprocess', $result['error']); + } + + /** + * Test executeInSubprocess handles non-zero exit code + */ + public function testExecuteInSubprocessWithNonZeroExitCode(): void + { + $logger = new ArrayLog(); + $config = [ + 'command' => 'php -r "exit(1);"', + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => ['class' => [TestProcessor::class, 'processReturnAck'], 'args' => []], + 'properties' => [], + ]; + + $result = $method->invoke($processor, $jobData); + + $this->assertFalse($result['success']); + $this->assertArrayHasKey('error', $result); + } + + /** + * Test full process() method with real subprocess execution + */ + public function testProcessJobInSubprocess(): void + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $config = [ + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + ]; + $processor = new SubprocessProcessor($logger, $config); + + $context = (new NullConnectionFactory())->createContext(); + $result = $processor->process($queueMessage, $context); + + $this->assertSame(InteropProcessor::ACK, $result); + } + + /** + * Test full process() with job that rejects + */ + public function testProcessJobInSubprocessReject(): void + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processReturnReject'], + 'args' => [], + ]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $config = [ + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + ]; + $processor = new SubprocessProcessor($logger, $config); + + $context = (new NullConnectionFactory())->createContext(); + $result = $processor->process($queueMessage, $context); + + $this->assertSame(InteropProcessor::REJECT, $result); + } + + /** + * Test full process() with job that throws exception + */ + public function testProcessJobInSubprocessWithException(): void + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processAndThrowException'], + 'args' => [], + ]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $config = [ + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + ]; + $processor = new SubprocessProcessor($logger, $config); + + $context = (new NullConnectionFactory())->createContext(); + $result = $processor->process($queueMessage, $context); + + // Should requeue on exception - result can be Result object or string + /** @phpstan-ignore cast.string */ + $this->assertStringContainsString('requeue', (string)$result); + } +} diff --git a/tests/TestCase/QueueManagerTest.php b/tests/TestCase/QueueManagerTest.php index 04576a0..85c8474 100644 --- a/tests/TestCase/QueueManagerTest.php +++ b/tests/TestCase/QueueManagerTest.php @@ -99,7 +99,7 @@ public function testSetMultipleConfigs() public function testSetConfigWithInvalidConfigValue() { $this->expectException(LogicException::class); - QueueManager::setConfig('test', null); + QueueManager::setConfig('test'); } public function testSetConfigInvalidKeyValue() diff --git a/tests/bootstrap.php b/tests/bootstrap.php index e2f7ee2..e0f2731 100644 --- a/tests/bootstrap.php +++ b/tests/bootstrap.php @@ -93,6 +93,13 @@ // The name of a configured logger, default: null 'logger' => 'stdout', + + // Subprocess configuration for development + 'subprocess' => [ + 'enabled' => false, + 'timeout' => 30, + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + ], ], ]); diff --git a/tests/test_app/bin/cake b/tests/test_app/bin/cake new file mode 100755 index 0000000..4b696c8 --- /dev/null +++ b/tests/test_app/bin/cake @@ -0,0 +1,75 @@ +#!/usr/bin/env sh +################################################################################ +# +# Cake is a shell script for invoking CakePHP shell commands +# +# CakePHP(tm) : Rapid Development Framework (https://cakephp.org) +# Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org) +# +# Licensed under The MIT License +# For full copyright and license information, please see the LICENSE.txt +# Redistributions of files must retain the above copyright notice. +# +# @copyright Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org) +# @link https://cakephp.org CakePHP(tm) Project +# @since 1.2.0 +# @license https://opensource.org/licenses/mit-license.php MIT License +# +################################################################################ + +# Canonicalize by following every symlink of the given name recursively +canonicalize() { + NAME="$1" + if [ -f "$NAME" ] + then + DIR=$(dirname -- "$NAME") + NAME=$(cd -P "$DIR" > /dev/null && pwd -P)/$(basename -- "$NAME") + fi + while [ -h "$NAME" ]; do + DIR=$(dirname -- "$NAME") + SYM=$(readlink "$NAME") + NAME=$(cd "$DIR" > /dev/null && cd "$(dirname -- "$SYM")" > /dev/null && pwd)/$(basename -- "$SYM") + done + echo "$NAME" +} + +# Find a CLI version of PHP +findCliPhp() { + for TESTEXEC in php php-cli /usr/local/bin/php + do + SAPI=$(echo "" | $TESTEXEC 2>/dev/null) + if [ "$SAPI" = "cli" ] + then + echo $TESTEXEC + return + fi + done + echo "Failed to find a CLI version of PHP; falling back to system standard php executable" >&2 + echo "php"; +} + +# If current path is a symlink, resolve to real path +realname="$0" +if [ -L "$realname" ] +then + realname=$(readlink -f "$0") +fi + +CONSOLE=$(dirname -- "$(canonicalize "$realname")") +APP=$(dirname "$CONSOLE") + +# If your CLI PHP is somewhere that this doesn't find, you can define a PHP environment +# variable with the correct path in it. +if [ -z "$PHP" ] +then + PHP=$(findCliPhp) +fi + +if [ "$(basename "$realname")" != 'cake' ] +then + exec "$PHP" "$CONSOLE"/cake.php "$(basename "$realname")" "$@" +else + exec "$PHP" "$CONSOLE"/cake.php "$@" +fi + +exit diff --git a/tests/test_app/bin/cake.php b/tests/test_app/bin/cake.php new file mode 100644 index 0000000..05dd85a --- /dev/null +++ b/tests/test_app/bin/cake.php @@ -0,0 +1,13 @@ +#!/usr/bin/php -q +run($argv)); diff --git a/tests/test_app/src/Application.php b/tests/test_app/src/Application.php index e8119b2..73f8964 100644 --- a/tests/test_app/src/Application.php +++ b/tests/test_app/src/Application.php @@ -3,6 +3,7 @@ namespace TestApp; +use Cake\Core\Configure; use Cake\Core\ContainerInterface; use Cake\Http\BaseApplication; use Cake\Http\MiddlewareQueue; @@ -24,6 +25,23 @@ public function bootstrap(): void { $this->addPlugin('Cake/Queue'); $this->addPlugin('Bake'); + + // Only set default Queue configuration if no Queue config exists at all + // This allows tests to fully control configuration + if (!Configure::check('Queue')) { + Configure::write('Queue', [ + 'default' => [ + 'url' => 'null:', + 'queue' => 'default', + 'logger' => 'stdout', + 'subprocess' => [ + 'enabled' => false, + 'timeout' => 30, + 'command' => 'php ' . dirname(__DIR__, 2) . '/bin/cake.php queue subprocess-runner', + ], + ], + ]); + } } public function services(ContainerInterface $container): void From 3ccfd9a0a6087c58b24cfc4b147d6649f10cfc21 Mon Sep 17 00:00:00 2001 From: Jasper Smet Date: Wed, 17 Dec 2025 15:39:07 +0100 Subject: [PATCH 2/7] Implement copilot suggestions if valid --- src/Command/SubprocessJobRunnerCommand.php | 11 +- src/Command/WorkerCommand.php | 7 +- src/Queue/SubprocessProcessor.php | 138 ++++++++++++++------- 3 files changed, 108 insertions(+), 48 deletions(-) diff --git a/src/Command/SubprocessJobRunnerCommand.php b/src/Command/SubprocessJobRunnerCommand.php index e6559c8..e36c8ba 100644 --- a/src/Command/SubprocessJobRunnerCommand.php +++ b/src/Command/SubprocessJobRunnerCommand.php @@ -24,8 +24,10 @@ use Cake\Queue\Queue\Processor; use Enqueue\Null\NullConnectionFactory; use Enqueue\Null\NullMessage; +use Interop\Queue\Message as QueueMessage; use Interop\Queue\Processor as InteropProcessor; use Psr\Log\NullLogger; +use RuntimeException; use Throwable; /** @@ -124,9 +126,6 @@ protected function readInput(ConsoleIo $io): string } $input .= $chunk; - if ($input !== '' && strlen($chunk) < 8192) { - break; - } } return $input; @@ -144,6 +143,12 @@ protected function executeJob(array $data): string $context = $connectionFactory->createContext(); $messageClass = $data['messageClass'] ?? NullMessage::class; + + // Validate message class for security + if (!class_exists($messageClass) || !is_subclass_of($messageClass, QueueMessage::class)) { + throw new RuntimeException(sprintf('Invalid message class: %s', $messageClass)); + } + $messageBody = json_encode($data['body']); /** @var \Interop\Queue\Message $queueMessage */ diff --git a/src/Command/WorkerCommand.php b/src/Command/WorkerCommand.php index 13cecb6..8c08ec0 100644 --- a/src/Command/WorkerCommand.php +++ b/src/Command/WorkerCommand.php @@ -197,20 +197,21 @@ protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface $this->abort(); } - $processor = new $processorClass($logger, $this->container); - + // Check subprocess mode before instantiating processor if ($args->getOption('subprocess') || ($config['subprocess']['enabled'] ?? false)) { $subprocessConfig = array_merge( $config['subprocess'] ?? [], ['enabled' => true], ); - if (!($processor instanceof Processor)) { + if ($processorClass !== Processor::class && !is_subclass_of($processorClass, Processor::class)) { $io->error('Subprocess mode is only supported with the default Processor class'); $this->abort(); } $processor = new SubprocessProcessor($logger, $subprocessConfig, $this->container); + } else { + $processor = new $processorClass($logger, $this->container); } return $processor; diff --git a/src/Queue/SubprocessProcessor.php b/src/Queue/SubprocessProcessor.php index a1fefa9..8cbbf9b 100644 --- a/src/Queue/SubprocessProcessor.php +++ b/src/Queue/SubprocessProcessor.php @@ -29,13 +29,36 @@ /** * Subprocess processor that executes jobs in isolated PHP processes. + * + * This processor spawns a new PHP process for each job, providing complete isolation + * between jobs. This is useful for development environments where code changes need + * to be reloaded without restarting the worker. + * + * Configuration options: + * - `command`: Full command to execute (default: 'php bin/cake.php queue subprocess-runner') + * - `timeout`: Maximum execution time in seconds (default: 300) + * - `maxOutputSize`: Maximum output size in bytes (default: 1048576 = 1MB) + * + * Example configuration: + * ``` + * 'Queue' => [ + * 'default' => [ + * 'subprocess' => [ + * 'command' => 'php bin/cake.php queue subprocess-runner', + * 'timeout' => 60, + * 'maxOutputSize' => 2097152, // 2MB + * ], + * ], + * ], + * ``` + * * Extends Processor to reuse event handling and processing logic (DRY principle). */ class SubprocessProcessor extends Processor { /** * @param \Psr\Log\LoggerInterface $logger Logger instance - * @param array $config Subprocess configuration + * @param array $config Subprocess configuration options * @param \Cake\Core\ContainerInterface|null $container DI container instance */ public function __construct( @@ -188,63 +211,94 @@ protected function executeInSubprocess(array $jobData): array throw new RuntimeException('Failed to create subprocess'); } - $jobDataJson = json_encode($jobData); - if ($jobDataJson !== false) { - fwrite($pipes[0], $jobDataJson); - } + try { + $jobDataJson = json_encode($jobData); + if ($jobDataJson !== false) { + fwrite($pipes[0], $jobDataJson); + } - fclose($pipes[0]); + fclose($pipes[0]); - $output = ''; - $errorOutput = ''; - $startTime = time(); + $output = ''; + $errorOutput = ''; + $startTime = time(); + $maxOutputSize = $this->config['maxOutputSize'] ?? 1048576; // 1MB default - stream_set_blocking($pipes[1], false); - stream_set_blocking($pipes[2], false); + stream_set_blocking($pipes[1], false); + stream_set_blocking($pipes[2], false); - while (true) { - if ($timeout > 0 && (time() - $startTime) > $timeout) { - proc_terminate($process, 9); - fclose($pipes[1]); - fclose($pipes[2]); - proc_close($process); + while (true) { + if ($timeout > 0 && (time() - $startTime) > $timeout) { + proc_terminate($process, 9); - return [ - 'success' => false, - 'error' => sprintf('Subprocess execution timeout after %d seconds', $timeout), - ]; - } + return [ + 'success' => false, + 'error' => sprintf('Subprocess execution timeout after %d seconds', $timeout), + ]; + } - $read = [$pipes[1], $pipes[2]]; - $write = null; - $except = null; - $selectResult = stream_select($read, $write, $except, 1); + $read = [$pipes[1], $pipes[2]]; + $write = null; + $except = null; + $selectResult = stream_select($read, $write, $except, 1); - if ($selectResult === false) { - break; - } + if ($selectResult === false) { + return [ + 'success' => false, + 'error' => 'Stream select failed', + ]; + } + + if (in_array($pipes[1], $read)) { + $chunk = fread($pipes[1], 8192); + if ($chunk !== false) { + if (strlen($output) + strlen($chunk) > $maxOutputSize) { + proc_terminate($process, 9); - if (in_array($pipes[1], $read)) { - $chunk = fread($pipes[1], 8192); - if ($chunk !== false) { - $output .= $chunk; + return [ + 'success' => false, + 'error' => sprintf('Subprocess output exceeded maximum size of %d bytes', $maxOutputSize), + ]; + } + + $output .= $chunk; + } } - } - if (in_array($pipes[2], $read)) { - $chunk = fread($pipes[2], 8192); - if ($chunk !== false) { - $errorOutput .= $chunk; + if (in_array($pipes[2], $read)) { + $chunk = fread($pipes[2], 8192); + if ($chunk !== false) { + if (strlen($errorOutput) + strlen($chunk) > $maxOutputSize) { + proc_terminate($process, 9); + + return [ + 'success' => false, + 'error' => sprintf( + 'Subprocess error output exceeded maximum size of %d bytes', + $maxOutputSize, + ), + ]; + } + + $errorOutput .= $chunk; + } + } + + if (feof($pipes[1]) && feof($pipes[2])) { + break; } } + } finally { + // Always cleanup resources + if (is_resource($pipes[1])) { + fclose($pipes[1]); + } - if (feof($pipes[1]) && feof($pipes[2])) { - break; + if (is_resource($pipes[2])) { + fclose($pipes[2]); } } - fclose($pipes[1]); - fclose($pipes[2]); $exitCode = proc_close($process); if ($exitCode !== 0 && empty($output)) { From 487b9547d5a81bd5a703c5ab56ea874c0de23071 Mon Sep 17 00:00:00 2001 From: Jasper Smet Date: Wed, 17 Dec 2025 16:09:00 +0100 Subject: [PATCH 3/7] Redirect stdout loggers to stderr preventing json response polutions --- src/Command/SubprocessJobRunnerCommand.php | 35 +++++++++- src/Command/WorkerCommand.php | 24 ++++++- src/Queue/SubprocessProcessor.php | 6 ++ .../SubprocessJobRunnerCommandTest.php | 64 +++++++++++++++++++ tests/test_app/src/Job/MultilineLogJob.php | 26 ++++++++ 5 files changed, 152 insertions(+), 3 deletions(-) create mode 100644 tests/test_app/src/Job/MultilineLogJob.php diff --git a/src/Command/SubprocessJobRunnerCommand.php b/src/Command/SubprocessJobRunnerCommand.php index e36c8ba..149fd71 100644 --- a/src/Command/SubprocessJobRunnerCommand.php +++ b/src/Command/SubprocessJobRunnerCommand.php @@ -20,12 +20,15 @@ use Cake\Console\Arguments; use Cake\Console\ConsoleIo; use Cake\Core\ContainerInterface; +use Cake\Log\Engine\ConsoleLog; +use Cake\Log\Log; use Cake\Queue\Job\Message; use Cake\Queue\Queue\Processor; use Enqueue\Null\NullConnectionFactory; use Enqueue\Null\NullMessage; use Interop\Queue\Message as QueueMessage; use Interop\Queue\Processor as InteropProcessor; +use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use RuntimeException; use Throwable; @@ -160,8 +163,10 @@ protected function executeJob(array $data): string } } + $logger = $this->configureLogging($data); + $message = new Message($queueMessage, $context, $this->container); - $processor = new Processor(new NullLogger(), $this->container); + $processor = new Processor($logger, $this->container); $result = $processor->processMessage($message); @@ -170,6 +175,34 @@ protected function executeJob(array $data): string return is_string($result) ? $result : (string)$result; } + /** + * Configure logging to use STDERR to prevent job logs from contaminating STDOUT. + * Reconfigures all CakePHP loggers to write to STDERR with no additional formatting. + * + * @param array $data Job data + * @return \Psr\Log\LoggerInterface + */ + protected function configureLogging(array $data): LoggerInterface + { + // Drop all existing loggers to prevent duplicate logging + foreach (Log::configured() as $loggerName) { + Log::drop($loggerName); + } + + // Configure a single stderr logger + Log::setConfig('default', [ + 'className' => ConsoleLog::class, + 'stream' => 'php://stderr', + ]); + + $logger = Log::engine('default'); + if (!$logger instanceof LoggerInterface) { + $logger = new NullLogger(); + } + + return $logger; + } + /** * Output result as JSON to STDOUT. * diff --git a/src/Command/WorkerCommand.php b/src/Command/WorkerCommand.php index 8c08ec0..cc845f6 100644 --- a/src/Command/WorkerCommand.php +++ b/src/Command/WorkerCommand.php @@ -172,6 +172,21 @@ protected function getLogger(Arguments $args): LoggerInterface return $logger ?? new NullLogger(); } + /** + * Get logger for subprocess output. + * Always returns a real logger for subprocess mode to show job logs. + * + * @param \Cake\Console\Arguments $args Arguments + * @return \Psr\Log\LoggerInterface + */ + protected function getSubprocessLogger(Arguments $args): LoggerInterface + { + $loggerName = (string)$args->getOption('logger'); + $logger = Log::engine($loggerName); + + return $logger ?? new NullLogger(); + } + /** * Creates and returns a Processor object * @@ -201,7 +216,10 @@ protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface if ($args->getOption('subprocess') || ($config['subprocess']['enabled'] ?? false)) { $subprocessConfig = array_merge( $config['subprocess'] ?? [], - ['enabled' => true], + [ + 'enabled' => true, + 'logger' => $config['logger'] ?? (string)$args->getOption('logger'), + ], ); if ($processorClass !== Processor::class && !is_subclass_of($processorClass, Processor::class)) { @@ -209,7 +227,9 @@ protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface $this->abort(); } - $processor = new SubprocessProcessor($logger, $subprocessConfig, $this->container); + // Use a real logger for subprocess output so logs are visible + $subprocessLogger = $this->getSubprocessLogger($args); + $processor = new SubprocessProcessor($subprocessLogger, $subprocessConfig, $this->container); } else { $processor = new $processorClass($logger, $this->container); } diff --git a/src/Queue/SubprocessProcessor.php b/src/Queue/SubprocessProcessor.php index 8cbbf9b..afabc6d 100644 --- a/src/Queue/SubprocessProcessor.php +++ b/src/Queue/SubprocessProcessor.php @@ -185,6 +185,7 @@ protected function prepareJobData(QueueMessage $message): array 'messageClass' => get_class($message), 'body' => $body, 'properties' => $properties, + 'logger' => $this->config['logger'] ?? 'stderr', ]; } @@ -281,6 +282,11 @@ protected function executeInSubprocess(array $jobData): array } $errorOutput .= $chunk; + // Stream subprocess logs to parent's stderr in real-time + // Skip in PHPUnit test context to avoid test framework issues + if (!defined('PHPUNIT_COMPOSER_INSTALL') && !defined('__PHPUNIT_PHAR__')) { + fwrite(STDERR, $chunk); + } } } diff --git a/tests/TestCase/Command/SubprocessJobRunnerCommandTest.php b/tests/TestCase/Command/SubprocessJobRunnerCommandTest.php index b191a6a..a020738 100644 --- a/tests/TestCase/Command/SubprocessJobRunnerCommandTest.php +++ b/tests/TestCase/Command/SubprocessJobRunnerCommandTest.php @@ -24,6 +24,7 @@ use Enqueue\Null\NullMessage; use Interop\Queue\Processor; use ReflectionClass; +use TestApp\Job\MultilineLogJob; use TestApp\TestProcessor; class SubprocessJobRunnerCommandTest extends TestCase @@ -252,4 +253,67 @@ public function testOutputResult(): void $method->invoke($command, $io, ['success' => true, 'result' => 'ack']); } + + /** + * Test that subprocess jobs with multiple log lines properly separate logs from JSON output + */ + public function testLogsRedirectedToStderr(): void + { + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => [ + 'class' => [MultilineLogJob::class, 'execute'], + 'args' => [], + ], + 'properties' => [], + 'logger' => 'debug', + ]; + + $command = 'php ' . ROOT . 'bin/cake.php queue subprocess-runner'; + + $descriptors = [ + 0 => ['pipe', 'r'], + 1 => ['pipe', 'w'], + 2 => ['pipe', 'w'], + ]; + + $process = proc_open($command, $descriptors, $pipes); + $this->assertIsResource($process); + + // Write job data to STDIN + $jobDataJson = json_encode($jobData); + if ($jobDataJson !== false) { + fwrite($pipes[0], $jobDataJson); + } + + fclose($pipes[0]); + + // Read STDOUT and STDERR + $stdout = stream_get_contents($pipes[1]); + fclose($pipes[1]); + + $stderr = stream_get_contents($pipes[2]); + fclose($pipes[2]); + + proc_close($process); + + // STDOUT should be valid JSON without any log messages + $result = json_decode($stdout, true); + $this->assertIsArray($result, 'STDOUT should contain valid JSON: ' . $stdout); + $this->assertArrayHasKey('success', $result); + $this->assertTrue($result['success']); + $this->assertSame(Processor::ACK, $result['result']); + + // STDOUT should not contain any job log messages + $this->assertStringNotContainsString('Job execution started', $stdout); + $this->assertStringNotContainsString('Processing step', $stdout); + $this->assertStringNotContainsString('Job execution finished', $stdout); + + // All log messages should be in STDERR + $this->assertStringContainsString('Job execution started', $stderr); + $this->assertStringContainsString('Processing step 1 completed', $stderr); + $this->assertStringContainsString('Processing step 2 completed', $stderr); + $this->assertStringContainsString('Processing step 3 completed', $stderr); + $this->assertStringContainsString('Job execution finished', $stderr); + } } diff --git a/tests/test_app/src/Job/MultilineLogJob.php b/tests/test_app/src/Job/MultilineLogJob.php new file mode 100644 index 0000000..b5624b5 --- /dev/null +++ b/tests/test_app/src/Job/MultilineLogJob.php @@ -0,0 +1,26 @@ +log('Job execution started', LogLevel::INFO); + $this->log('Processing step 1 completed', LogLevel::INFO); + $this->log('Processing step 2 completed', LogLevel::INFO); + $this->log('Processing step 3 completed', LogLevel::INFO); + $this->log('Job execution finished', LogLevel::INFO); + + return Processor::ACK; + } +} From aa87aeeea505ea93fa3f67f310d004982d729a92 Mon Sep 17 00:00:00 2001 From: Jasper Smet Date: Thu, 18 Dec 2025 13:23:13 +0100 Subject: [PATCH 4/7] refactor: subprocessProcessor now extends processor --- src/Queue/Processor.php | 14 +++++- src/Queue/SubprocessProcessor.php | 77 +++---------------------------- 2 files changed, 20 insertions(+), 71 deletions(-) diff --git a/src/Queue/Processor.php b/src/Queue/Processor.php index bff6628..e1fed4c 100644 --- a/src/Queue/Processor.php +++ b/src/Queue/Processor.php @@ -71,7 +71,7 @@ public function process(QueueMessage $message, Context $context): string|object $this->dispatchEvent('Processor.message.start', ['message' => $jobMessage]); try { - $response = $this->processMessage($jobMessage); + $response = $this->executeJob($jobMessage, $message); } catch (Throwable $throwable) { $message->setProperty('jobException', $throwable); @@ -116,6 +116,18 @@ public function process(QueueMessage $message, Context $context): string|object return InteropProcessor::REQUEUE; } + /** + * Execute the job and return the response. + * + * @param \Cake\Queue\Job\Message $jobMessage Job message wrapper + * @param \Interop\Queue\Message $queueMessage Original queue message + * @return object|string with __toString method implemented + */ + protected function executeJob(Message $jobMessage, QueueMessage $queueMessage): string|object + { + return $this->processMessage($jobMessage); + } + /** * @param \Cake\Queue\Job\Message $message Message. * @return object|string with __toString method implemented diff --git a/src/Queue/SubprocessProcessor.php b/src/Queue/SubprocessProcessor.php index afabc6d..f9127c1 100644 --- a/src/Queue/SubprocessProcessor.php +++ b/src/Queue/SubprocessProcessor.php @@ -18,14 +18,9 @@ use Cake\Core\ContainerInterface; use Cake\Queue\Job\Message; -use Enqueue\Consumption\Result; -use Error; -use Interop\Queue\Context; use Interop\Queue\Message as QueueMessage; -use Interop\Queue\Processor as InteropProcessor; use Psr\Log\LoggerInterface; use RuntimeException; -use Throwable; /** * Subprocess processor that executes jobs in isolated PHP processes. @@ -70,76 +65,18 @@ public function __construct( } /** - * Process a message in a subprocess. - * Overrides parent to execute in subprocess, but reuses parent's event dispatching. + * Execute the job in a subprocess. * - * @param \Interop\Queue\Message $message Message. - * @param \Interop\Queue\Context $context Context. + * @param \Cake\Queue\Job\Message $jobMessage Job message wrapper + * @param \Interop\Queue\Message $queueMessage Original queue message * @return object|string with __toString method implemented */ - public function process(QueueMessage $message, Context $context): string|object + protected function executeJob(Message $jobMessage, QueueMessage $queueMessage): string|object { - $this->dispatchEvent('Processor.message.seen', ['queueMessage' => $message]); + $jobData = $this->prepareJobData($queueMessage); + $subprocessResult = $this->executeInSubprocess($jobData); - $jobMessage = new Message($message, $context, $this->container); - try { - $jobMessage->getCallable(); - } catch (RuntimeException | Error $e) { - $this->logger->debug('Invalid callable for message. Rejecting message from queue.'); - $this->dispatchEvent('Processor.message.invalid', ['message' => $jobMessage]); - - return InteropProcessor::REJECT; - } - - $startTime = microtime(true) * 1000; - $this->dispatchEvent('Processor.message.start', ['message' => $jobMessage]); - - try { - $jobData = $this->prepareJobData($message); - $subprocessResult = $this->executeInSubprocess($jobData); - $response = $this->handleSubprocessResult($subprocessResult, $message); - } catch (Throwable $throwable) { - $message->setProperty('jobException', $throwable); - - $this->logger->debug(sprintf('Message encountered exception: %s', $throwable->getMessage())); - $this->dispatchEvent('Processor.message.exception', [ - 'message' => $jobMessage, - 'exception' => $throwable, - 'duration' => (int)((microtime(true) * 1000) - $startTime), - ]); - - return Result::requeue('Exception occurred while processing message'); - } - - $duration = (int)((microtime(true) * 1000) - $startTime); - - if ($response === InteropProcessor::ACK) { - $this->logger->debug('Message processed successfully'); - $this->dispatchEvent('Processor.message.success', [ - 'message' => $jobMessage, - 'duration' => $duration, - ]); - - return InteropProcessor::ACK; - } - - if ($response === InteropProcessor::REJECT) { - $this->logger->debug('Message processed with rejection'); - $this->dispatchEvent('Processor.message.reject', [ - 'message' => $jobMessage, - 'duration' => $duration, - ]); - - return InteropProcessor::REJECT; - } - - $this->logger->debug('Message processed with failure, requeuing'); - $this->dispatchEvent('Processor.message.failure', [ - 'message' => $jobMessage, - 'duration' => $duration, - ]); - - return InteropProcessor::REQUEUE; + return $this->handleSubprocessResult($subprocessResult, $queueMessage); } /** From 8d7a9ec98be3721cba5bb1bf352ab7865741fd5a Mon Sep 17 00:00:00 2001 From: Jasper Smet Date: Thu, 18 Dec 2025 13:36:05 +0100 Subject: [PATCH 5/7] refactor: consolidate logger methods and clean up Processor detection --- src/Command/WorkerCommand.php | 33 +++++++++------------------------ 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/src/Command/WorkerCommand.php b/src/Command/WorkerCommand.php index cc845f6..44ec0c1 100644 --- a/src/Command/WorkerCommand.php +++ b/src/Command/WorkerCommand.php @@ -160,33 +160,19 @@ protected function getQueueExtension(Arguments $args, LoggerInterface $logger): * Creates and returns a LoggerInterface object * * @param \Cake\Console\Arguments $args Arguments + * @param bool $forceLogger Force logger creation even without verbose flag * @return \Psr\Log\LoggerInterface */ - protected function getLogger(Arguments $args): LoggerInterface + protected function getLogger(Arguments $args, bool $forceLogger = false): LoggerInterface { $logger = null; - if (!empty($args->getOption('verbose'))) { + if ($forceLogger || !empty($args->getOption('verbose'))) { $logger = Log::engine((string)$args->getOption('logger')); } return $logger ?? new NullLogger(); } - /** - * Get logger for subprocess output. - * Always returns a real logger for subprocess mode to show job logs. - * - * @param \Cake\Console\Arguments $args Arguments - * @return \Psr\Log\LoggerInterface - */ - protected function getSubprocessLogger(Arguments $args): LoggerInterface - { - $loggerName = (string)$args->getOption('logger'); - $logger = Log::engine($loggerName); - - return $logger ?? new NullLogger(); - } - /** * Creates and returns a Processor object * @@ -214,6 +200,11 @@ protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface // Check subprocess mode before instantiating processor if ($args->getOption('subprocess') || ($config['subprocess']['enabled'] ?? false)) { + if ($processorClass !== Processor::class && !is_subclass_of($processorClass, Processor::class)) { + $io->error('Subprocess mode is only supported with the default Processor class'); + $this->abort(); + } + $subprocessConfig = array_merge( $config['subprocess'] ?? [], [ @@ -222,13 +213,7 @@ protected function getProcessor(Arguments $args, ConsoleIo $io, LoggerInterface ], ); - if ($processorClass !== Processor::class && !is_subclass_of($processorClass, Processor::class)) { - $io->error('Subprocess mode is only supported with the default Processor class'); - $this->abort(); - } - - // Use a real logger for subprocess output so logs are visible - $subprocessLogger = $this->getSubprocessLogger($args); + $subprocessLogger = $this->getLogger($args, forceLogger: true); $processor = new SubprocessProcessor($subprocessLogger, $subprocessConfig, $this->container); } else { $processor = new $processorClass($logger, $this->container); From 106562b10bc0a5d663db31cc209ab77e9acb191a Mon Sep 17 00:00:00 2001 From: Jasper Smet Date: Thu, 18 Dec 2025 14:11:10 +0100 Subject: [PATCH 6/7] test: add tests with code coverage bump in mind --- .../SubprocessJobRunnerCommandTest.php | 177 ++++++++++++++++++ .../Queue/SubprocessProcessorTest.php | 116 ++++++++++++ 2 files changed, 293 insertions(+) diff --git a/tests/TestCase/Command/SubprocessJobRunnerCommandTest.php b/tests/TestCase/Command/SubprocessJobRunnerCommandTest.php index a020738..35fb1e6 100644 --- a/tests/TestCase/Command/SubprocessJobRunnerCommandTest.php +++ b/tests/TestCase/Command/SubprocessJobRunnerCommandTest.php @@ -19,11 +19,15 @@ use Cake\Console\Arguments; use Cake\Console\ConsoleIo; +use Cake\Core\ContainerInterface; use Cake\Queue\Command\SubprocessJobRunnerCommand; use Cake\TestSuite\TestCase; use Enqueue\Null\NullMessage; use Interop\Queue\Processor; +use Psr\Log\LoggerInterface; use ReflectionClass; +use RuntimeException; +use stdClass; use TestApp\Job\MultilineLogJob; use TestApp\TestProcessor; @@ -316,4 +320,177 @@ public function testLogsRedirectedToStderr(): void $this->assertStringContainsString('Processing step 3 completed', $stderr); $this->assertStringContainsString('Job execution finished', $stderr); } + + /** + * Test defaultName method + */ + public function testDefaultName(): void + { + $this->assertSame('queue subprocess-runner', SubprocessJobRunnerCommand::defaultName()); + } + + /** + * Test executeJob with invalid message class (non-existent) + */ + public function testExecuteJobWithInvalidMessageClass(): void + { + $jobData = [ + 'messageClass' => 'NonExistentClass', + 'body' => [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ], + 'properties' => [], + ]; + + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('executeJob'); + + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('Invalid message class'); + + $method->invoke($command, $jobData); + } + + /** + * Test executeJob with non-QueueMessage class + */ + public function testExecuteJobWithNonQueueMessageClass(): void + { + $jobData = [ + 'messageClass' => stdClass::class, + 'body' => [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ], + 'properties' => [], + ]; + + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('executeJob'); + + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('Invalid message class'); + + $method->invoke($command, $jobData); + } + + /** + * Test configureLogging with fallback to NullLogger + */ + public function testConfigureLoggingConfiguresStderrLogger(): void + { + $jobData = ['logger' => 'stderr']; + + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('configureLogging'); + + $logger = $method->invoke($command, $jobData); + + $this->assertInstanceOf(LoggerInterface::class, $logger); + } + + /** + * Test outputResult with valid JSON + */ + public function testOutputResultWithValidData(): void + { + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('outputResult'); + + $io = $this->createMock(ConsoleIo::class); + $io->expects($this->once()) + ->method('out') + ->with('{"success":true,"result":"ack"}'); + + $method->invoke($command, $io, ['success' => true, 'result' => 'ack']); + } + + /** + * Test outputResult with data that cannot be JSON encoded + */ + public function testOutputResultWithInvalidJsonData(): void + { + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('outputResult'); + + $io = $this->createMock(ConsoleIo::class); + $io->expects($this->never()) + ->method('out'); + + // Create data with a resource which cannot be JSON encoded + $resource = fopen('php://memory', 'r'); + $this->assertIsResource($resource); + $method->invoke($command, $io, ['resource' => $resource]); + if (is_resource($resource)) { + fclose($resource); + } + } + + /** + * Test readInput with multiple chunks + */ + public function testReadInputWithLargeData(): void + { + // We can't easily mock STDIN, so we'll verify the method exists and is protected + // Large data reading is already covered by the integration test (testLogsRedirectedToStderr) + $reflection = new ReflectionClass(SubprocessJobRunnerCommand::class); + $this->assertTrue($reflection->hasMethod('readInput')); + + $method = $reflection->getMethod('readInput'); + $this->assertTrue($method->isProtected()); + } + + /** + * Test constructor with container + */ + public function testConstructorWithContainer(): void + { + $container = $this->createStub(ContainerInterface::class); + $command = new SubprocessJobRunnerCommand($container); + + $this->assertInstanceOf(SubprocessJobRunnerCommand::class, $command); + } + + /** + * Test constructor without container + */ + public function testConstructorWithoutContainer(): void + { + $command = new SubprocessJobRunnerCommand(); + + $this->assertInstanceOf(SubprocessJobRunnerCommand::class, $command); + } + + /** + * Test executeJob when message body json_encode fails + */ + public function testExecuteJobWithJsonEncodeFailure(): void + { + // PHP's json_encode can fail with certain data (like invalid UTF-8) + // However, in this code path json_encode is called on $data['body'] which is already decoded + // So this edge case is hard to trigger. We'll test normal flow is covered. + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ], + 'properties' => [], + ]; + + $command = new SubprocessJobRunnerCommand(); + $reflection = new ReflectionClass($command); + $method = $reflection->getMethod('executeJob'); + + $result = $method->invoke($command, $jobData); + + // Verify it successfully encodes and processes + $this->assertIsString($result); + } } diff --git a/tests/TestCase/Queue/SubprocessProcessorTest.php b/tests/TestCase/Queue/SubprocessProcessorTest.php index 1cd6f07..8dca13a 100644 --- a/tests/TestCase/Queue/SubprocessProcessorTest.php +++ b/tests/TestCase/Queue/SubprocessProcessorTest.php @@ -502,4 +502,120 @@ public function testProcessJobInSubprocessWithException(): void /** @phpstan-ignore cast.string */ $this->assertStringContainsString('requeue', (string)$result); } + + /** + * Test subprocess with maxOutputSize limit exceeded + */ + public function testSubprocessMaxOutputSizeExceeded(): void + { + $logger = new ArrayLog(); + $config = [ + 'command' => 'php -r "echo str_repeat(\'a\', 10000);"', + 'maxOutputSize' => 100, // Very small limit + 'timeout' => 5, + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => ['class' => [TestProcessor::class, 'processReturnAck'], 'args' => []], + 'properties' => [], + ]; + + $result = $method->invoke($processor, $jobData); + + $this->assertFalse($result['success']); + $this->assertArrayHasKey('error', $result); + $this->assertStringContainsString('output exceeded maximum size', $result['error']); + } + + /** + * Test subprocess with maxOutputSize limit on stderr + */ + public function testSubprocessMaxErrorOutputSizeExceeded(): void + { + $logger = new ArrayLog(); + $config = [ + 'command' => 'php -r "fwrite(STDERR, str_repeat(\'e\', 10000));"', + 'maxOutputSize' => 100, // Very small limit + 'timeout' => 5, + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => ['class' => [TestProcessor::class, 'processReturnAck'], 'args' => []], + 'properties' => [], + ]; + + $result = $method->invoke($processor, $jobData); + + $this->assertFalse($result['success']); + $this->assertArrayHasKey('error', $result); + $this->assertStringContainsString('error output exceeded maximum size', $result['error']); + } + + /** + * Test subprocess handles normal sized output correctly + */ + public function testSubprocessWithNormalOutputSize(): void + { + $messageBody = [ + 'class' => [TestProcessor::class, 'processReturnAck'], + 'args' => [], + ]; + $queueMessage = new NullMessage(json_encode($messageBody) ?: ''); + + $logger = new ArrayLog(); + $config = [ + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + 'maxOutputSize' => 1048576, // 1MB - normal size + 'timeout' => 30, + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + $prepareMethod = $reflection->getMethod('prepareJobData'); + + $jobData = $prepareMethod->invoke($processor, $queueMessage); + $result = $method->invoke($processor, $jobData); + + $this->assertTrue($result['success']); + $this->assertSame(InteropProcessor::ACK, $result['result']); + } + + /** + * Test executeInSubprocess with very short timeout + */ + public function testSubprocessWithVeryShortTimeout(): void + { + $logger = new ArrayLog(); + $config = [ + 'command' => 'php -r "sleep(5);"', + 'timeout' => 1, // 1 second timeout + ]; + $processor = new SubprocessProcessor($logger, $config); + + $reflection = new ReflectionClass($processor); + $method = $reflection->getMethod('executeInSubprocess'); + + $jobData = [ + 'messageClass' => NullMessage::class, + 'body' => ['class' => [TestProcessor::class, 'processReturnAck'], 'args' => []], + 'properties' => [], + ]; + + $result = $method->invoke($processor, $jobData); + + $this->assertFalse($result['success']); + $this->assertArrayHasKey('error', $result); + $this->assertStringContainsString('timeout', $result['error']); + } } From 1320cf6a50415de8dee3b59ff0159eded855d2ee Mon Sep 17 00:00:00 2001 From: Jasper Smet Date: Thu, 18 Dec 2025 15:43:14 +0100 Subject: [PATCH 7/7] process review feedback --- src/Command/SubprocessJobRunnerCommand.php | 14 ++++++++------ src/Queue/SubprocessProcessor.php | 8 ++++---- src/QueuePlugin.php | 2 +- .../Command/SubprocessJobRunnerCommandTest.php | 4 ++-- tests/TestCase/Command/WorkerCommandTest.php | 4 ++-- .../TestCase/Queue/SubprocessProcessorTest.php | 18 +++++++++--------- tests/bootstrap.php | 2 +- tests/test_app/src/Application.php | 2 +- 8 files changed, 28 insertions(+), 26 deletions(-) diff --git a/src/Command/SubprocessJobRunnerCommand.php b/src/Command/SubprocessJobRunnerCommand.php index 149fd71..53e2814 100644 --- a/src/Command/SubprocessJobRunnerCommand.php +++ b/src/Command/SubprocessJobRunnerCommand.php @@ -11,7 +11,7 @@ * * @copyright Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org/) * @link https://cakephp.org CakePHP(tm) Project - * @since 0.1.0 + * @since 2.2.0 * @license https://opensource.org/licenses/MIT MIT License */ namespace Cake\Queue\Command; @@ -28,6 +28,7 @@ use Enqueue\Null\NullMessage; use Interop\Queue\Message as QueueMessage; use Interop\Queue\Processor as InteropProcessor; +use JsonException; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; use RuntimeException; @@ -54,7 +55,7 @@ public function __construct( */ public static function defaultName(): string { - return 'queue subprocess-runner'; + return 'queue subprocess_runner'; } /** @@ -68,7 +69,7 @@ public function execute(Arguments $args, ConsoleIo $io): int { $input = $this->readInput($io); - if (empty($input)) { + if ($input === '') { $this->outputResult($io, [ 'success' => false, 'error' => 'No input received', @@ -77,11 +78,12 @@ public function execute(Arguments $args, ConsoleIo $io): int return self::CODE_ERROR; } - $data = json_decode($input, true); - if (json_last_error() !== JSON_ERROR_NONE) { + try { + $data = json_decode($input, true, 512, JSON_THROW_ON_ERROR); + } catch (JsonException $jsonException) { $this->outputResult($io, [ 'success' => false, - 'error' => 'Invalid JSON input: ' . json_last_error_msg(), + 'error' => 'Invalid JSON input: ' . $jsonException->getMessage(), ]); return self::CODE_ERROR; diff --git a/src/Queue/SubprocessProcessor.php b/src/Queue/SubprocessProcessor.php index f9127c1..494fc50 100644 --- a/src/Queue/SubprocessProcessor.php +++ b/src/Queue/SubprocessProcessor.php @@ -11,7 +11,7 @@ * * @copyright Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org/) * @link https://cakephp.org CakePHP(tm) Project - * @since 0.1.0 + * @since 2.2.0 * @license https://opensource.org/licenses/MIT MIT License */ namespace Cake\Queue\Queue; @@ -30,7 +30,7 @@ * to be reloaded without restarting the worker. * * Configuration options: - * - `command`: Full command to execute (default: 'php bin/cake.php queue subprocess-runner') + * - `command`: Full command to execute (default: 'php bin/cake.php queue subprocess_runner') * - `timeout`: Maximum execution time in seconds (default: 300) * - `maxOutputSize`: Maximum output size in bytes (default: 1048576 = 1MB) * @@ -39,7 +39,7 @@ * 'Queue' => [ * 'default' => [ * 'subprocess' => [ - * 'command' => 'php bin/cake.php queue subprocess-runner', + * 'command' => 'php bin/cake.php queue subprocess_runner', * 'timeout' => 60, * 'maxOutputSize' => 2097152, // 2MB * ], @@ -134,7 +134,7 @@ protected function prepareJobData(QueueMessage $message): array */ protected function executeInSubprocess(array $jobData): array { - $command = $this->config['command'] ?? 'php bin/cake.php queue subprocess-runner'; + $command = $this->config['command'] ?? 'php bin/cake.php queue subprocess_runner'; $timeout = $this->config['timeout'] ?? 300; $descriptors = [ diff --git a/src/QueuePlugin.php b/src/QueuePlugin.php index 8be616d..874e38a 100644 --- a/src/QueuePlugin.php +++ b/src/QueuePlugin.php @@ -81,7 +81,7 @@ public function console(CommandCollection $commands): CommandCollection return $commands ->add('queue worker', WorkerCommand::class) ->add('worker', WorkerCommand::class) - ->add('queue subprocess-runner', SubprocessJobRunnerCommand::class) + ->add('queue subprocess_runner', SubprocessJobRunnerCommand::class) ->add('queue requeue', RequeueCommand::class) ->add('queue purge_failed', PurgeFailedCommand::class); } diff --git a/tests/TestCase/Command/SubprocessJobRunnerCommandTest.php b/tests/TestCase/Command/SubprocessJobRunnerCommandTest.php index 35fb1e6..7ecb5c2 100644 --- a/tests/TestCase/Command/SubprocessJobRunnerCommandTest.php +++ b/tests/TestCase/Command/SubprocessJobRunnerCommandTest.php @@ -273,7 +273,7 @@ public function testLogsRedirectedToStderr(): void 'logger' => 'debug', ]; - $command = 'php ' . ROOT . 'bin/cake.php queue subprocess-runner'; + $command = 'php ' . ROOT . 'bin/cake.php queue subprocess_runner'; $descriptors = [ 0 => ['pipe', 'r'], @@ -326,7 +326,7 @@ public function testLogsRedirectedToStderr(): void */ public function testDefaultName(): void { - $this->assertSame('queue subprocess-runner', SubprocessJobRunnerCommand::defaultName()); + $this->assertSame('queue subprocess_runner', SubprocessJobRunnerCommand::defaultName()); } /** diff --git a/tests/TestCase/Command/WorkerCommandTest.php b/tests/TestCase/Command/WorkerCommandTest.php index cbee63a..9a29e82 100644 --- a/tests/TestCase/Command/WorkerCommandTest.php +++ b/tests/TestCase/Command/WorkerCommandTest.php @@ -450,7 +450,7 @@ public function testQueueProcessesJobWithSubprocessFlag() 'url' => 'file:///' . TMP . DS . 'queue', 'receiveTimeout' => 100, 'subprocess' => [ - 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', ], ]; Configure::write('Queue', ['default' => $config]); @@ -484,7 +484,7 @@ public function testQueueProcessesJobWithSubprocessConfig() 'subprocess' => [ 'enabled' => true, 'timeout' => 30, - 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', ], ]; Configure::write('Queue', ['default' => $config]); diff --git a/tests/TestCase/Queue/SubprocessProcessorTest.php b/tests/TestCase/Queue/SubprocessProcessorTest.php index 8dca13a..3f9540f 100644 --- a/tests/TestCase/Queue/SubprocessProcessorTest.php +++ b/tests/TestCase/Queue/SubprocessProcessorTest.php @@ -225,7 +225,7 @@ public function testRealSubprocessExecutionAck(): void $logger = new ArrayLog(); $config = [ - 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', ]; $processor = new SubprocessProcessor($logger, $config); @@ -253,7 +253,7 @@ public function testRealSubprocessExecutionReject(): void $logger = new ArrayLog(); $config = [ - 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', ]; $processor = new SubprocessProcessor($logger, $config); @@ -281,7 +281,7 @@ public function testRealSubprocessExecutionWithException(): void $logger = new ArrayLog(); $config = [ - 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', ]; $processor = new SubprocessProcessor($logger, $config); @@ -309,7 +309,7 @@ public function testSubprocessTimeout(): void $logger = new ArrayLog(); $config = [ - 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', 'timeout' => 1, ]; $processor = new SubprocessProcessor($logger, $config); @@ -344,7 +344,7 @@ public function testSubprocessWithInvalidCommand(): void $logger = new ArrayLog(); $config = [ - 'command' => '/nonexistent/binary queue subprocess-runner', + 'command' => '/nonexistent/binary queue subprocess_runner', ]; $processor = new SubprocessProcessor($logger, $config); @@ -445,7 +445,7 @@ public function testProcessJobInSubprocess(): void $logger = new ArrayLog(); $config = [ - 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', ]; $processor = new SubprocessProcessor($logger, $config); @@ -468,7 +468,7 @@ public function testProcessJobInSubprocessReject(): void $logger = new ArrayLog(); $config = [ - 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', ]; $processor = new SubprocessProcessor($logger, $config); @@ -491,7 +491,7 @@ public function testProcessJobInSubprocessWithException(): void $logger = new ArrayLog(); $config = [ - 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', ]; $processor = new SubprocessProcessor($logger, $config); @@ -574,7 +574,7 @@ public function testSubprocessWithNormalOutputSize(): void $logger = new ArrayLog(); $config = [ - 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', 'maxOutputSize' => 1048576, // 1MB - normal size 'timeout' => 30, ]; diff --git a/tests/bootstrap.php b/tests/bootstrap.php index e0f2731..a146dea 100644 --- a/tests/bootstrap.php +++ b/tests/bootstrap.php @@ -98,7 +98,7 @@ 'subprocess' => [ 'enabled' => false, 'timeout' => 30, - 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess-runner', + 'command' => 'php ' . ROOT . 'bin/cake.php queue subprocess_runner', ], ], ]); diff --git a/tests/test_app/src/Application.php b/tests/test_app/src/Application.php index 73f8964..30484d3 100644 --- a/tests/test_app/src/Application.php +++ b/tests/test_app/src/Application.php @@ -37,7 +37,7 @@ public function bootstrap(): void 'subprocess' => [ 'enabled' => false, 'timeout' => 30, - 'command' => 'php ' . dirname(__DIR__, 2) . '/bin/cake.php queue subprocess-runner', + 'command' => 'php ' . dirname(__DIR__, 2) . '/bin/cake.php queue subprocess_runner', ], ], ]);