|
28 | 28 |
|
29 | 29 | class Consumer |
30 | 30 | { |
31 | | - public readonly Config $config; |
| 31 | + public Config $config; |
32 | 32 |
|
33 | | - public readonly string $name; |
| 33 | + public string $name; |
34 | 34 |
|
35 | | - public readonly string $identifier; |
| 35 | + public string $identifier; |
36 | 36 |
|
37 | | - public readonly ?HealthMonitor $healthMonitor; |
| 37 | + public ?HealthMonitor $healthMonitor = null; |
38 | 38 |
|
39 | | - public readonly ?ServerMutexInterface $serverMutex; |
| 39 | + public ?ServerMutexInterface $serverMutex = null; |
40 | 40 |
|
41 | | - public readonly BinLogCurrentSnapshotInterface $binLogCurrentSnapshot; |
| 41 | + public BinLogCurrentSnapshotInterface $binLogCurrentSnapshot; |
42 | 42 |
|
43 | 43 | private bool $stopped = false; |
44 | 44 |
|
45 | 45 | public function __construct( |
46 | | - protected readonly SubscriberManager $subscriberManager, |
47 | | - protected readonly TriggerManager $triggerManager, |
48 | | - public readonly string $connection = 'default', |
| 46 | + protected SubscriberManager $subscriberManager, |
| 47 | + protected TriggerManager $triggerManager, |
| 48 | + public string $connection = 'default', |
49 | 49 | array $options = [], |
50 | | - public readonly ?LoggerInterface $logger = null |
| 50 | + public ?LoggerInterface $logger = null |
51 | 51 | ) { |
52 | 52 | $this->name = $options['name'] ?? sprintf('trigger.%s', $this->connection); |
53 | 53 | $this->identifier = $options['identifier'] ?? sprintf('trigger.%s', $this->connection); |
54 | 54 | $this->config = new Config($options); |
55 | 55 |
|
56 | 56 | $this->binLogCurrentSnapshot = make(BinLogCurrentSnapshotInterface::class, ['consumer' => $this]); |
57 | | - $this->healthMonitor = $this->config->get('health_monitor.enable', true) ? make(HealthMonitor::class, ['consumer' => $this]) : null; |
58 | | - $this->serverMutex = $this->config->get('server_mutex.enable', true) ? make(ServerMutexInterface::class, [ |
59 | | - 'name' => 'trigger:mutex:' . $this->connection, |
60 | | - 'owner' => Util::getInternalIp(), |
61 | | - 'options' => $this->config->get('server_mutex', []) + ['connection' => $this->connection], |
62 | | - 'logger' => $this->logger, |
63 | | - ]) : null; |
| 57 | + |
| 58 | + if ($this->config->get('health_monitor.enable', true)) { |
| 59 | + $this->healthMonitor = make(HealthMonitor::class, ['consumer' => $this]); |
| 60 | + } |
| 61 | + |
| 62 | + if ($this->config->get('server_mutex.enable', true)) { |
| 63 | + $this->serverMutex = make(ServerMutexInterface::class, [ |
| 64 | + 'connection' => $this->connection, |
| 65 | + 'options' => (array) $this->config->get('server_mutex', []), |
| 66 | + 'owner' => Util::getInternalIp(), |
| 67 | + 'logger' => $this->logger, |
| 68 | + ]); |
| 69 | + } |
64 | 70 | } |
65 | 71 |
|
66 | 72 | public function start(): void |
|
0 commit comments