diff --git a/src/DoctrineOutbox/DoctrineTransactionalMessageRepository.php b/src/DoctrineOutbox/DoctrineTransactionalMessageRepository.php index 2c9a62c..00bf3b6 100644 --- a/src/DoctrineOutbox/DoctrineTransactionalMessageRepository.php +++ b/src/DoctrineOutbox/DoctrineTransactionalMessageRepository.php @@ -4,12 +4,14 @@ use Doctrine\DBAL\Connection; use EventSauce\EventSourcing\AggregateRootId; +use EventSauce\EventSourcing\Header; use EventSauce\EventSourcing\Message; use EventSauce\EventSourcing\MessageRepository; use EventSauce\EventSourcing\PaginationCursor; use EventSauce\EventSourcing\UnableToPersistMessages; use EventSauce\MessageOutbox\OutboxRepository; use Generator; +use Ramsey\Uuid\Uuid; use Throwable; class DoctrineTransactionalMessageRepository implements MessageRepository @@ -23,6 +25,12 @@ public function __construct( public function persist(Message ...$messages): void { try { + $messages = array_map(static function (Message $message) { + return $message->header(Header::EVENT_ID) === null + ? $message->withHeader(Header::EVENT_ID, Uuid::uuid4()->toString()) + : $message; + }, $messages); + $this->connection->beginTransaction(); try { diff --git a/src/DoctrineV2Outbox/DoctrineTransactionalMessageRepository.php b/src/DoctrineV2Outbox/DoctrineTransactionalMessageRepository.php index 1987bcd..62592cf 100644 --- a/src/DoctrineV2Outbox/DoctrineTransactionalMessageRepository.php +++ b/src/DoctrineV2Outbox/DoctrineTransactionalMessageRepository.php @@ -4,12 +4,14 @@ use Doctrine\DBAL\Connection; use EventSauce\EventSourcing\AggregateRootId; +use EventSauce\EventSourcing\Header; use EventSauce\EventSourcing\Message; use EventSauce\EventSourcing\MessageRepository; use EventSauce\EventSourcing\PaginationCursor; use EventSauce\EventSourcing\UnableToPersistMessages; use EventSauce\MessageOutbox\OutboxRepository; use Generator; +use Ramsey\Uuid\Uuid; use Throwable; class DoctrineTransactionalMessageRepository implements MessageRepository @@ -23,6 +25,12 @@ public function __construct( public function persist(Message ...$messages): void { try { + $messages = array_map(static function (Message $message) { + return $message->header(Header::EVENT_ID) === null + ? $message->withHeader(Header::EVENT_ID, Uuid::uuid4()->toString()) + : $message; + }, $messages); + $this->connection->beginTransaction(); try { diff --git a/src/IlluminateOutbox/IlluminateTransactionalMessageRepository.php b/src/IlluminateOutbox/IlluminateTransactionalMessageRepository.php index 1ae9740..554126d 100644 --- a/src/IlluminateOutbox/IlluminateTransactionalMessageRepository.php +++ b/src/IlluminateOutbox/IlluminateTransactionalMessageRepository.php @@ -3,6 +3,7 @@ namespace EventSauce\MessageOutbox\IlluminateOutbox; use EventSauce\EventSourcing\AggregateRootId; +use EventSauce\EventSourcing\Header; use EventSauce\EventSourcing\Message; use EventSauce\EventSourcing\MessageRepository; use EventSauce\EventSourcing\PaginationCursor; @@ -10,6 +11,7 @@ use EventSauce\MessageOutbox\OutboxRepository; use Generator; use Illuminate\Database\ConnectionInterface; +use Ramsey\Uuid\Uuid; use Throwable; class IlluminateTransactionalMessageRepository implements MessageRepository @@ -24,6 +26,12 @@ public function __construct( public function persist(Message ...$messages): void { try { + $messages = array_map(static function (Message $message) { + return $message->header(Header::EVENT_ID) === null + ? $message->withHeader(Header::EVENT_ID, Uuid::uuid4()->toString()) + : $message; + }, $messages); + $this->connection->beginTransaction(); try { diff --git a/src/TestTooling/TransactionalMessageRepositoryTestCase.php b/src/TestTooling/TransactionalMessageRepositoryTestCase.php index db9a845..17ad7d6 100644 --- a/src/TestTooling/TransactionalMessageRepositoryTestCase.php +++ b/src/TestTooling/TransactionalMessageRepositoryTestCase.php @@ -107,6 +107,34 @@ public function messages_are_not_stored_in_the_outbox_when_the_repository_fails( self::assertEquals(0, $outboxRepository->numberOfMessages()); } + /** + * @test + */ + public function messages_are_persisted_with_consistent_ids(): void + { + $messageRepository = $this->messageRepository(); + $outboxRepository = $this->outboxRepository(); + $transactionalRepository = $this->transactionalRepository(); + $message1 = $this->createMessageWithoutId('one', 1); + $message2 = $this->createMessageWithoutId('two', 2); + $message3 = $this->createMessageWithoutId('three', 3); + $message4 = $this->createMessageWithoutId('four', 4); + + $transactionalRepository->persist($message1, $message2, $message3, $message4); + + $messageIds = array_map( + static fn (Message $message) => $message->header(Header::EVENT_ID), + iterator_to_array($messageRepository->retrieveAll($this->aggregateRootId)), + ); + + $messageIdsInOutbox = array_map( + static fn (Message $message) => $message->header(Header::EVENT_ID), + iterator_to_array($outboxRepository->retrieveBatch(10)), + ); + + self::assertSame($messageIds, $messageIdsInOutbox); + } + /** * @test */ @@ -137,4 +165,13 @@ protected function createMessage(string $value, int $version): Message return (new DefaultHeadersDecorator())->decorate($message); } + + protected function createMessageWithoutId(string $value, int $version): Message + { + $message = (new Message(new DummyEvent($value))) + ->withHeader(Header::AGGREGATE_ROOT_ID, $this->aggregateRootId) + ->withHeader(Header::AGGREGATE_ROOT_VERSION, $version); + + return (new DefaultHeadersDecorator())->decorate($message); + } }