Skip to content

Commit 45c94fd

Browse files
authored
Merge pull request #813 from patchlevel/refresh-subscription
allow to refresh subscriptions
2 parents 905b6c5 + 2211cfa commit 45c94fd

13 files changed

Lines changed: 798 additions & 187 deletions

composer.lock

Lines changed: 205 additions & 181 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs/pages/subscription.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1315,6 +1315,18 @@ foreach ($subscriptions as $subscription) {
13151315
echo $subscription->status()->value;
13161316
}
13171317
```
1318+
### Refresh
1319+
1320+
If you change the metadata of a subscriber in the code (e.g. `runMode`, `group` or `cleanupTasks`),
1321+
you can use the `refresh` method to update the existing subscriptions in the store.
1322+
1323+
```php
1324+
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngine;
1325+
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
1326+
1327+
/** @var SubscriptionEngine $subscriptionEngine */
1328+
$subscriptionEngine->refresh(new SubscriptionEngineCriteria());
1329+
```
13181330
## Learn more
13191331

13201332
* [How to use CLI commands](./cli.md)

phpstan-baseline.neon

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
parameters:
22
ignoreErrors:
33
-
4-
message: '#^Cannot unset offset ''url'' on array\{application_name\?\: string, charset\?\: string, dbname\?\: string, defaultTableOptions\?\: array\<string, mixed\>, driver\?\: ''ibm_db2''\|''mysqli''\|''oci8''\|''pdo_mysql''\|''pdo_oci''\|''pdo_pgsql''\|''pdo_sqlite''\|''pdo_sqlsrv''\|''pgsql''\|''sqlite3''\|''sqlsrv'', driverClass\?\: class\-string\<Doctrine\\DBAL\\Driver\>, driverOptions\?\: array\<mixed\>, host\?\: string, \.\.\.\}\.$#'
4+
message: '#^Cannot unset offset ''url'' on array\{application_name\?\: string, charset\?\: string, defaultTableOptions\?\: array\<string, mixed\>, driver\?\: ''ibm_db2''\|''mysqli''\|''oci8''\|''pdo_mysql''\|''pdo_oci''\|''pdo_pgsql''\|''pdo_sqlite''\|''pdo_sqlsrv''\|''pgsql''\|''sqlite3''\|''sqlsrv'', driverClass\?\: class\-string\<Doctrine\\DBAL\\Driver\>, driverOptions\?\: array\<mixed\>, host\?\: string, keepReplica\?\: bool, \.\.\.\}\.$#'
55
identifier: unset.offset
66
count: 1
77
path: src/Console/DoctrineHelper.php
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Patchlevel\EventSourcing\Console\Command;
6+
7+
use LogicException;
8+
use Patchlevel\EventSourcing\Subscription\Engine\CanRefreshSubscriptions;
9+
use Symfony\Component\Console\Attribute\AsCommand;
10+
use Symfony\Component\Console\Input\InputInterface;
11+
use Symfony\Component\Console\Output\OutputInterface;
12+
13+
use function sprintf;
14+
15+
#[AsCommand(
16+
'event-sourcing:subscription:refresh',
17+
'Refresh subscriptions (run-mode, group)',
18+
)]
19+
final class SubscriptionRefreshCommand extends SubscriptionCommand
20+
{
21+
protected function execute(InputInterface $input, OutputInterface $output): int
22+
{
23+
if (!$this->engine instanceof CanRefreshSubscriptions) {
24+
throw new LogicException(sprintf(
25+
'"%s" does not implement "%s" and cannot call refresh.',
26+
$this->engine::class,
27+
CanRefreshSubscriptions::class,
28+
));
29+
}
30+
31+
$criteria = $this->subscriptionEngineCriteria($input);
32+
$this->engine->refresh($criteria);
33+
34+
return 0;
35+
}
36+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Patchlevel\EventSourcing\Subscription\Engine;
6+
7+
interface CanRefreshSubscriptions
8+
{
9+
public function refresh(SubscriptionEngineCriteria|null $criteria = null): Result;
10+
}

src/Subscription/Engine/CatchUpSubscriptionEngine.php

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44

55
namespace Patchlevel\EventSourcing\Subscription\Engine;
66

7+
use LogicException;
78
use Patchlevel\EventSourcing\Subscription\Subscription;
89

910
use function array_merge;
11+
use function sprintf;
1012

1113
use const PHP_INT_MAX;
1214

13-
final class CatchUpSubscriptionEngine implements SubscriptionEngine
15+
final class CatchUpSubscriptionEngine implements SubscriptionEngine, CanRefreshSubscriptions
1416
{
1517
public function __construct(
1618
private readonly SubscriptionEngine $parent,
@@ -86,6 +88,19 @@ public function subscriptions(SubscriptionEngineCriteria|null $criteria = null):
8688
return $this->parent->subscriptions($criteria);
8789
}
8890

91+
public function refresh(SubscriptionEngineCriteria|null $criteria = null): Result
92+
{
93+
if (!$this->parent instanceof CanRefreshSubscriptions) {
94+
throw new LogicException(sprintf(
95+
'"%s" does not implement "%s" and cannot call refresh.',
96+
$this->parent::class,
97+
CanRefreshSubscriptions::class,
98+
));
99+
}
100+
101+
return $this->parent->refresh($criteria);
102+
}
103+
89104
private function mergeResult(ProcessedResult ...$results): ProcessedResult
90105
{
91106
$processedMessages = 0;

src/Subscription/Engine/DefaultSubscriptionEngine.php

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
use function count;
3030
use function sprintf;
3131

32-
final class DefaultSubscriptionEngine implements SubscriptionEngine
32+
final class DefaultSubscriptionEngine implements SubscriptionEngine, CanRefreshSubscriptions
3333
{
3434
private SubscriptionManager $subscriptionManager;
3535

@@ -823,6 +823,82 @@ public function subscriptions(SubscriptionEngineCriteria|null $criteria = null):
823823
);
824824
}
825825

826+
public function refresh(SubscriptionEngineCriteria|null $criteria = null): Result
827+
{
828+
$criteria ??= new SubscriptionEngineCriteria();
829+
830+
$this->discoverNewSubscriptions();
831+
832+
$subscriptions = $this->subscriptionManager->find(new SubscriptionCriteria(
833+
ids: $criteria->ids,
834+
groups: $criteria->groups,
835+
));
836+
837+
foreach ($subscriptions as $subscription) {
838+
$subscriber = $this->subscriber($subscription->id());
839+
840+
if (!$subscriber) {
841+
continue;
842+
}
843+
844+
$changed = false;
845+
846+
if ($subscription->runMode() !== $subscriber->runMode()) {
847+
$changed = true;
848+
$oldRunMode = $subscription->runMode();
849+
$subscription->changeRunMode($subscriber->runMode());
850+
851+
$this->logger?->info(
852+
sprintf(
853+
'Subscription Engine: Subscription "%s" run mode changed from "%s" to "%s".',
854+
$subscription->id(),
855+
$oldRunMode->value,
856+
$subscription->runMode()->value,
857+
),
858+
);
859+
}
860+
861+
if ($subscription->group() !== $subscriber->group()) {
862+
$changed = true;
863+
$oldGroup = $subscription->group();
864+
$subscription->changeGroup($subscriber->group());
865+
866+
$this->logger?->info(
867+
sprintf(
868+
'Subscription Engine: Subscription "%s" group changed from "%s" to "%s".',
869+
$subscription->id(),
870+
$oldGroup,
871+
$subscription->group(),
872+
),
873+
);
874+
}
875+
876+
$cleanupTasks = $this->cleanupTasks($subscriber);
877+
878+
if ($subscription->cleanupTasks() !== $cleanupTasks) {
879+
$changed = true;
880+
$subscription->replaceCleanupTasks($cleanupTasks);
881+
882+
$this->logger?->info(
883+
sprintf(
884+
'Subscription Engine: Subscription "%s" cleanup tasks changed.',
885+
$subscription->id(),
886+
),
887+
);
888+
}
889+
890+
if (!$changed) {
891+
continue;
892+
}
893+
894+
$this->subscriptionManager->update($subscription);
895+
}
896+
897+
$this->subscriptionManager->flush();
898+
899+
return new Result();
900+
}
901+
826902
private function handleMessage(int $index, Message $message, Subscription $subscription): Error|null
827903
{
828904
$subscriber = $this->subscriber($subscription->id());

src/Subscription/Engine/ThrowOnErrorSubscriptionEngine.php

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@
44

55
namespace Patchlevel\EventSourcing\Subscription\Engine;
66

7+
use LogicException;
78
use Patchlevel\EventSourcing\Subscription\Subscription;
89

9-
final class ThrowOnErrorSubscriptionEngine implements SubscriptionEngine
10+
use function sprintf;
11+
12+
final class ThrowOnErrorSubscriptionEngine implements SubscriptionEngine, CanRefreshSubscriptions
1013
{
1114
public function __construct(
1215
private readonly SubscriptionEngine $parent,
@@ -54,6 +57,19 @@ public function subscriptions(SubscriptionEngineCriteria|null $criteria = null):
5457
return $this->parent->subscriptions($criteria);
5558
}
5659

60+
public function refresh(SubscriptionEngineCriteria|null $criteria = null): Result
61+
{
62+
if (!$this->parent instanceof CanRefreshSubscriptions) {
63+
throw new LogicException(sprintf(
64+
'"%s" does not implement "%s" and cannot call refresh.',
65+
$this->parent::class,
66+
CanRefreshSubscriptions::class,
67+
));
68+
}
69+
70+
return $this->throwOnError($this->parent->refresh($criteria));
71+
}
72+
5773
/**
5874
* @param T $result
5975
*

src/Subscription/Subscription.php

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ final class Subscription
1414
/** @param list<object>|null $cleanupTasks */
1515
public function __construct(
1616
private readonly string $id,
17-
private readonly string $group = self::DEFAULT_GROUP,
18-
private readonly RunMode $runMode = RunMode::FromBeginning,
17+
private string $group = self::DEFAULT_GROUP,
18+
private RunMode $runMode = RunMode::FromBeginning,
1919
private Status $status = Status::New,
2020
private int $position = 0,
2121
private SubscriptionError|null $error = null,
@@ -35,11 +35,21 @@ public function group(): string
3535
return $this->group;
3636
}
3737

38+
public function changeGroup(string $group): void
39+
{
40+
$this->group = $group;
41+
}
42+
3843
public function runMode(): RunMode
3944
{
4045
return $this->runMode;
4146
}
4247

48+
public function changeRunMode(RunMode $runMode): void
49+
{
50+
$this->runMode = $runMode;
51+
}
52+
4353
public function status(): Status
4454
{
4555
return $this->status;
@@ -193,6 +203,12 @@ public function updateLastSavedAt(DateTimeImmutable $lastSavedAt): void
193203
$this->lastSavedAt = $lastSavedAt;
194204
}
195205

206+
/** @param list<object>|null $cleanupTask */
207+
public function replaceCleanupTasks(array|null $cleanupTask): void
208+
{
209+
$this->cleanupTasks = $cleanupTask;
210+
}
211+
196212
/** @return list<object>|null */
197213
public function cleanupTasks(): array|null
198214
{

tests/Integration/Subscription/SubscriptionTest.php

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
use Patchlevel\EventSourcing\Subscription\Engine\DefaultSubscriptionEngine;
2929
use Patchlevel\EventSourcing\Subscription\Engine\EventFilteredStoreMessageLoader;
3030
use Patchlevel\EventSourcing\Subscription\Engine\GapResolverStoreMessageLoader;
31+
use Patchlevel\EventSourcing\Subscription\Engine\MessageLoader;
3132
use Patchlevel\EventSourcing\Subscription\Engine\StoreMessageLoader;
3233
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionEngineCriteria;
3334
use Patchlevel\EventSourcing\Subscription\RetryStrategy\ClockBasedRetryStrategy;
@@ -1565,6 +1566,72 @@ public function testLookup(): void
15651566
self::assertSame('Hans', $result['name']);
15661567
}
15671568

1569+
public function testRefreshSubscriptions(): void
1570+
{
1571+
$store = new DoctrineDbalStore(
1572+
$this->connection,
1573+
DefaultEventSerializer::createFromPaths([__DIR__ . '/Events']),
1574+
);
1575+
1576+
$clock = new FrozenClock(new DateTimeImmutable('2021-01-01T00:00:00'));
1577+
1578+
$subscriptionStore = new DoctrineSubscriptionStore(
1579+
$this->connection,
1580+
$clock,
1581+
);
1582+
1583+
$schemaDirector = new DoctrineSchemaDirector(
1584+
$this->connection,
1585+
new ChainDoctrineSchemaConfigurator([
1586+
$store,
1587+
$subscriptionStore,
1588+
]),
1589+
);
1590+
1591+
$schemaDirector->create();
1592+
1593+
$subscriber = new #[Subscriber('test', RunMode::FromBeginning, group: 'default')]
1594+
class {
1595+
};
1596+
1597+
$subscriberRepository = new MetadataSubscriberAccessorRepository([$subscriber]);
1598+
1599+
$engine = new DefaultSubscriptionEngine(
1600+
$this->createMock(MessageLoader::class),
1601+
$subscriptionStore,
1602+
$subscriberRepository,
1603+
);
1604+
1605+
$engine->setup();
1606+
1607+
$subscriptions = $engine->subscriptions();
1608+
self::assertCount(1, $subscriptions);
1609+
self::assertEquals('test', $subscriptions[0]->id());
1610+
self::assertEquals('default', $subscriptions[0]->group());
1611+
self::assertEquals(RunMode::FromBeginning, $subscriptions[0]->runMode());
1612+
1613+
// change subscriber metadata
1614+
$newSubscriber = new #[Subscriber('test', RunMode::FromNow, group: 'new-group')]
1615+
class {
1616+
};
1617+
1618+
$newSubscriberRepository = new MetadataSubscriberAccessorRepository([$newSubscriber]);
1619+
1620+
$engine = new DefaultSubscriptionEngine(
1621+
$this->createMock(MessageLoader::class),
1622+
$subscriptionStore,
1623+
$newSubscriberRepository,
1624+
);
1625+
1626+
$engine->refresh();
1627+
1628+
$subscriptions = $engine->subscriptions();
1629+
self::assertCount(1, $subscriptions);
1630+
self::assertEquals('test', $subscriptions[0]->id());
1631+
self::assertEquals('new-group', $subscriptions[0]->group());
1632+
self::assertEquals(RunMode::FromNow, $subscriptions[0]->runMode());
1633+
}
1634+
15681635
/** @param list<Subscription> $subscriptions */
15691636
private static function findSubscription(array $subscriptions, string $id): Subscription
15701637
{

0 commit comments

Comments
 (0)