Skip to content

Commit 123d90b

Browse files
committed
allow to refresh subscriptions
1 parent 52f45dc commit 123d90b

6 files changed

Lines changed: 147 additions & 5 deletions

File tree

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Patchlevel\EventSourcing\Console\Command;
6+
7+
use LogicException;
8+
use Patchlevel\EventSourcing\Subscription\Engine\SubscriptionRefreshable;
9+
use Symfony\Component\Console\Attribute\AsCommand;
10+
use Symfony\Component\Console\Input\InputInterface;
11+
use Symfony\Component\Console\Output\OutputInterface;
12+
13+
#[AsCommand(
14+
'event-sourcing:subscription:refresh',
15+
'Refresh subscriptions (run-mode, group)',
16+
)]
17+
final class SubscriptionRefreshCommand extends SubscriptionCommand
18+
{
19+
protected function execute(InputInterface $input, OutputInterface $output): int
20+
{
21+
if (!$this->engine instanceof SubscriptionRefreshable) {
22+
throw new LogicException(sprintf(
23+
'"%s" does not implement "%s" and can therefore not refresh subscriptions.',
24+
$this->engine::class,
25+
SubscriptionRefreshable::class,
26+
));
27+
}
28+
29+
$criteria = $this->subscriptionEngineCriteria($input);
30+
$this->engine->refreshSubscriptions($criteria);
31+
32+
return 0;
33+
}
34+
}

src/Subscription/Engine/CatchUpSubscriptionEngine.php

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@
44

55
namespace Patchlevel\EventSourcing\Subscription\Engine;
66

7+
use LogicException;
78
use Patchlevel\EventSourcing\Subscription\Subscription;
89

910
use function array_merge;
1011

1112
use const PHP_INT_MAX;
1213

13-
final class CatchUpSubscriptionEngine implements SubscriptionEngine
14+
final class CatchUpSubscriptionEngine implements SubscriptionEngine, SubscriptionRefreshable
1415
{
1516
public function __construct(
1617
private readonly SubscriptionEngine $parent,
@@ -86,6 +87,19 @@ public function subscriptions(SubscriptionEngineCriteria|null $criteria = null):
8687
return $this->parent->subscriptions($criteria);
8788
}
8889

90+
public function refreshSubscriptions(SubscriptionEngineCriteria|null $criteria = null): Result
91+
{
92+
if (!$this->parent instanceof SubscriptionRefreshable) {
93+
throw new LogicException(sprintf(
94+
'"%s" does not implement "%s" and can therefore not refresh subscriptions.',
95+
$this->parent::class,
96+
SubscriptionRefreshable::class,
97+
));
98+
}
99+
100+
return $this->parent->refreshSubscriptions($criteria);
101+
}
102+
89103
private function mergeResult(ProcessedResult ...$results): ProcessedResult
90104
{
91105
$processedMessages = 0;

src/Subscription/Engine/DefaultSubscriptionEngine.php

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
use function count;
2828
use function sprintf;
2929

30-
final class DefaultSubscriptionEngine implements SubscriptionEngine
30+
final class DefaultSubscriptionEngine implements SubscriptionEngine, SubscriptionRefreshable
3131
{
3232
private SubscriptionManager $subscriptionManager;
3333

@@ -800,6 +800,68 @@ public function subscriptions(SubscriptionEngineCriteria|null $criteria = null):
800800
);
801801
}
802802

803+
public function refreshSubscriptions(SubscriptionEngineCriteria|null $criteria = null): Result
804+
{
805+
$criteria ??= new SubscriptionEngineCriteria();
806+
807+
$this->discoverNewSubscriptions();
808+
809+
$subscriptions = $this->subscriptionManager->find(new SubscriptionCriteria(
810+
ids: $criteria->ids,
811+
groups: $criteria->groups,
812+
));
813+
814+
foreach ($subscriptions as $subscription) {
815+
$subscriber = $this->subscriber($subscription->id());
816+
817+
if (!$subscriber) {
818+
continue;
819+
}
820+
821+
$changed = false;
822+
823+
if ($subscription->runMode() !== $subscriber->runMode()) {
824+
$changed = true;
825+
$oldRunMode = $subscription->runMode();
826+
$subscription->changeRunMode($subscriber->runMode());
827+
828+
$this->logger?->info(
829+
sprintf(
830+
'Subscription Engine: Subscription "%s" run mode changed from "%s" to "%s".',
831+
$subscription->id(),
832+
$oldRunMode->value,
833+
$subscription->runMode()->value,
834+
),
835+
);
836+
}
837+
838+
if ($subscription->group() !== $subscriber->group()) {
839+
$changed = true;
840+
$oldGroup = $subscription->group();
841+
$subscription->changeGroup($subscriber->group());
842+
843+
$this->logger?->info(
844+
sprintf(
845+
'Subscription Engine: Subscription "%s" group changed from "%s" to "%s".',
846+
$subscription->id(),
847+
$oldGroup,
848+
$subscription->group(),
849+
),
850+
);
851+
}
852+
853+
if (!$changed) {
854+
continue;
855+
}
856+
857+
$this->subscriptionManager->update($subscription);
858+
}
859+
860+
$this->subscriptionManager->flush();
861+
862+
return new Result();
863+
}
864+
803865
private function handleMessage(int $index, Message $message, Subscription $subscription): Error|null
804866
{
805867
$subscriber = $this->subscriber($subscription->id());
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<?php
2+
3+
namespace Patchlevel\EventSourcing\Subscription\Engine;
4+
5+
interface SubscriptionRefreshable
6+
{
7+
public function refreshSubscriptions(SubscriptionEngineCriteria|null $criteria = null): Result;
8+
}

src/Subscription/Engine/ThrowOnErrorSubscriptionEngine.php

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44

55
namespace Patchlevel\EventSourcing\Subscription\Engine;
66

7+
use LogicException;
78
use Patchlevel\EventSourcing\Subscription\Subscription;
89

9-
final class ThrowOnErrorSubscriptionEngine implements SubscriptionEngine
10+
final class ThrowOnErrorSubscriptionEngine implements SubscriptionEngine, SubscriptionRefreshable
1011
{
1112
public function __construct(
1213
private readonly SubscriptionEngine $parent,
@@ -54,6 +55,19 @@ public function subscriptions(SubscriptionEngineCriteria|null $criteria = null):
5455
return $this->parent->subscriptions($criteria);
5556
}
5657

58+
public function refreshSubscriptions(SubscriptionEngineCriteria|null $criteria = null): Result
59+
{
60+
if (!$this->parent instanceof SubscriptionRefreshable) {
61+
throw new LogicException(sprintf(
62+
'"%s" does not implement "%s" and can therefore not refresh subscriptions.',
63+
$this->parent::class,
64+
SubscriptionRefreshable::class,
65+
));
66+
}
67+
68+
return $this->throwOnError($this->parent->refreshSubscriptions($criteria));
69+
}
70+
5771
/**
5872
* @param T $result
5973
*

src/Subscription/Subscription.php

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ final class Subscription
1313

1414
public function __construct(
1515
private readonly string $id,
16-
private readonly string $group = self::DEFAULT_GROUP,
17-
private readonly RunMode $runMode = RunMode::FromBeginning,
16+
private string $group = self::DEFAULT_GROUP,
17+
private RunMode $runMode = RunMode::FromBeginning,
1818
private Status $status = Status::New,
1919
private int $position = 0,
2020
private SubscriptionError|null $error = null,
@@ -33,11 +33,21 @@ public function group(): string
3333
return $this->group;
3434
}
3535

36+
public function changeGroup(string $group): void
37+
{
38+
$this->group = $group;
39+
}
40+
3641
public function runMode(): RunMode
3742
{
3843
return $this->runMode;
3944
}
4045

46+
public function changeRunMode(RunMode $runMode): void
47+
{
48+
$this->runMode = $runMode;
49+
}
50+
4151
public function status(): Status
4252
{
4353
return $this->status;

0 commit comments

Comments
 (0)