-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodule.go
More file actions
571 lines (525 loc) · 23.5 KB
/
module.go
File metadata and controls
571 lines (525 loc) · 23.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
// Package eventbus implements the workflow-plugin-eventbus plugin.
// It provides eventbus.broker, eventbus.stream, and eventbus.consumer
// module types plus step and trigger types for durable event-bus integration.
package eventbus
import (
"context"
"fmt"
"os"
"sort"
"strings"
"sync"
"time"
"github.com/nats-io/nats.go"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen"
"github.com/GoCodeAlone/workflow-plugin-eventbus/providers"
natsruntime "github.com/GoCodeAlone/workflow-plugin-eventbus/providers/nats"
pgchannelruntime "github.com/GoCodeAlone/workflow-plugin-eventbus/providers/pgchannel"
sdk "github.com/GoCodeAlone/workflow/plugin/external/sdk"
)
// ── cluster registry ──────────────────────────────────────────────────────────
// clusterRegistry maps a broker module instance name to the ClusterConfig that
// RegisterCluster stored for it. clusterMu guards every access to the map:
// writers (RegisterCluster, UnregisterCluster) take the write lock, readers
// (GetCluster, DefaultBusConn) take the read lock.
var (
clusterMu sync.RWMutex
clusterRegistry = make(map[string]*eventbusv1.ClusterConfig)
)
// RegisterCluster records cfg in the package-wide cluster registry under
// instanceName. Any entry already stored under that name is replaced.
func RegisterCluster(instanceName string, cfg *eventbusv1.ClusterConfig) {
	clusterMu.Lock()
	clusterRegistry[instanceName] = cfg
	clusterMu.Unlock()
}
// GetCluster returns the ClusterConfig registered under instanceName. The
// boolean is false when no entry exists for that name.
func GetCluster(instanceName string) (*eventbusv1.ClusterConfig, bool) {
	clusterMu.RLock()
	stored, found := clusterRegistry[instanceName]
	clusterMu.RUnlock()
	return stored, found
}
// UnregisterCluster drops the ClusterConfig stored under instanceName from the
// cluster registry. Removing a name that was never registered is a no-op.
func UnregisterCluster(instanceName string) {
	clusterMu.Lock()
	delete(clusterRegistry, instanceName)
	clusterMu.Unlock()
}
// ── bus URI registry ──────────────────────────────────────────────────────────
// busURIRegistry stores broker connection URIs keyed by module instance name.
// clusterModule.Init writes an entry when it resolves a URI from the
// environment; GetOrDialNATSConn and clusterModule.Start read it back through
// GetBusURI. urlMu guards every access to the map.
var (
urlMu sync.RWMutex
busURIRegistry = make(map[string]string)
)
// RegisterBusURI associates uri with instanceName in the bus URI registry,
// replacing any URI previously stored under that name.
func RegisterBusURI(instanceName, uri string) {
	urlMu.Lock()
	busURIRegistry[instanceName] = uri
	urlMu.Unlock()
}
// GetBusURI reports the broker URI registered under instanceName. The boolean
// is false when nothing has been registered for that name.
func GetBusURI(instanceName string) (string, bool) {
	urlMu.RLock()
	stored, found := busURIRegistry[instanceName]
	urlMu.RUnlock()
	return stored, found
}
// UnregisterBusURI deletes the URI stored under instanceName. Removing a name
// that has no entry is a no-op.
func UnregisterBusURI(instanceName string) {
	urlMu.Lock()
	delete(busURIRegistry, instanceName)
	urlMu.Unlock()
}
// ── broker instance registry ──────────────────────────────────────────────────
// brokerInstanceRegistry maps broker module instance names to their loaded
// *clusterModule. Callers resolve a broker by name through LookupRuntime to
// obtain the provider runtime and its cached Connection.
//
// Registration happens in `Start`; the degraded-mode nats branch registers
// the module while its runtime/conn are still nil, which preserves the
// LookupRuntime → "runtime not initialized" contract. Removal happens in
// `Stop`. A lookup for a name that was never registered fails with
// "not registered"; a lookup for a registered module without a live
// runtime/conn fails with "runtime not initialized" plus provider-specific
// remediation guidance, so callers see a clear lifecycle error rather than a
// nil deref.
//
// Keys are always strings and values are always *clusterModule;
// LookupBrokerInstance relies on the value type when it asserts.
var brokerInstanceRegistry sync.Map // string → *clusterModule
// RegisterBrokerInstance publishes m in the broker instance registry under
// name, replacing any module already stored there. It is exported so that
// integration tests can seed the registry with a hand-built clusterModule.
func RegisterBrokerInstance(name string, m *clusterModule) { brokerInstanceRegistry.Store(name, m) }
// UnregisterBrokerInstance deletes the registry entry stored under name.
// Calling it for a name with no entry is harmless, so it can be repeated.
func UnregisterBrokerInstance(name string) {
	brokerInstanceRegistry.Delete(name)
}
// LookupBrokerInstance fetches the *clusterModule stored under name. The
// boolean is false when the registry holds no entry for name. It exists mainly
// for tests; production code should go through LookupRuntime, which also
// checks that the module has a live runtime and connection.
func LookupBrokerInstance(name string) (*clusterModule, bool) {
	if stored, found := brokerInstanceRegistry.Load(name); found {
		// The registry only ever holds *clusterModule values.
		return stored.(*clusterModule), true
	}
	return nil, false
}
// LookupRuntime returns the RuntimeBroker and cached Connection for the broker
// module registered under name. Stream/consumer modules, step factories and
// the trigger use it to dispatch through the provider abstraction.
//
// It returns an error when:
//   - no broker module is registered under name (or a nil module was stored);
//   - the module is registered but has no live runtime/conn, either because
//     Start has not completed or because Start skipped Connect.
//
// For the nats provider the error names the instance-specific environment
// variable using the same derivation as Init (upper-case, dashes replaced by
// underscores), so the remediation hint matches what Init actually reads.
func LookupRuntime(name string) (providers.RuntimeBroker, providers.Connection, error) {
	m, ok := LookupBrokerInstance(name)
	if !ok || m == nil {
		// A nil module cannot be locked or inspected; treat it as absent
		// rather than dereferencing it.
		return nil, nil, fmt.Errorf("eventbus.broker %q not registered", name)
	}

	// Snapshot runtime + conn under the read lock so a concurrent Start/Stop
	// cannot change one of them between the two reads. The nil checks run
	// after the lock is released to keep the critical section minimal.
	m.mu.RLock()
	rt, conn := m.runtime, m.conn
	provider := m.config.GetProvider()
	m.mu.RUnlock()

	if rt != nil && conn != nil {
		return rt, conn, nil
	}

	// Tailor the message per provider so operators can tell whether to fix
	// module ordering or the cluster configuration.
	switch provider {
	case "nats":
		// Must mirror the env var derivation in Init, otherwise the hint
		// points at a variable that is never consulted.
		envKey := strings.ToUpper(strings.ReplaceAll(name, "-", "_"))
		return nil, nil, fmt.Errorf("eventbus.broker %q (provider=nats): runtime not initialized — Start either has not run yet or skipped Connect because no broker URL was available; set ClusterConfig.dsn, the EVENTBUS_%s_URI env var, or NATS_URL", name, envKey)
	case "pgchannel":
		return nil, nil, fmt.Errorf("eventbus.broker %q (provider=pgchannel): runtime not initialized — Start either has not run yet or failed; set ClusterConfig.dsn (Postgres DSN) and verify Start completed without error", name)
	default:
		return nil, nil, fmt.Errorf("eventbus.broker %q (provider=%q): runtime not initialized — Start either has not run yet or skipped Connect", name, provider)
	}
}
// LookupRuntimeWithFallback resolves a RuntimeBroker + Connection from an
// optional brokerRef.
//
//   - brokerRef non-empty: identical to LookupRuntime(brokerRef).
//   - brokerRef empty and exactly one broker registered: that broker is used
//     (the legacy single-bus fallback).
//   - brokerRef empty and zero or several brokers registered: an error is
//     returned telling the caller to add a broker module or set broker_ref.
//
// Step factories and the trigger use it so that configs written before the
// broker_ref field existed keep working in single-bus deployments.
func LookupRuntimeWithFallback(brokerRef string) (providers.RuntimeBroker, providers.Connection, error) {
	if brokerRef != "" {
		return LookupRuntime(brokerRef)
	}

	// Gather every registered broker name; the count decides the outcome.
	var registered []string
	brokerInstanceRegistry.Range(func(key, _ any) bool {
		if s, isString := key.(string); isString {
			registered = append(registered, s)
		}
		return true
	})

	if len(registered) == 1 {
		return LookupRuntime(registered[0])
	}
	if len(registered) == 0 {
		return nil, nil, fmt.Errorf("eventbus: no broker_ref provided and no broker module registered; add an eventbus.broker module or set broker_ref on the request")
	}

	// Sort so the ambiguity message lists the candidates deterministically.
	sort.Strings(registered)
	return nil, nil, fmt.Errorf("eventbus: no broker_ref provided and multiple brokers are registered (%v); set broker_ref on the request to disambiguate", registered)
}
// ── NATS connection cache ─────────────────────────────────────────────────────
// natsConnCache holds at most one *nats.Conn per bus instance name.
// Entries are created lazily by GetOrDialNATSConn or stored directly by
// RegisterNATSConn; a cached entry may be nil or no longer connected, which
// GetOrDialNATSConn treats as stale and evicts before re-dialling.
// clusterModule.Stop closes and evicts the entry via closeNATSConn.
// connCacheMu guards every access to the map.
var (
connCacheMu sync.Mutex
natsConnCache = make(map[string]*nats.Conn)
)
// RegisterNATSConn places conn in the connection cache under instanceName,
// replacing whatever was cached there before. It is exported so integration
// tests and the trigger can pre-populate the cache.
//
// Deprecated: this helper predates the providers.RuntimeBroker abstraction.
// New code should build an eventbus.broker module and run Init/Start, which
// publishes the runtime + connection through LookupRuntime. It is kept for
// legacy callers and for the teardown path in clusterModule.Stop.
func RegisterNATSConn(instanceName string, conn *nats.Conn) {
	connCacheMu.Lock()
	natsConnCache[instanceName] = conn
	connCacheMu.Unlock()
}
// UnregisterNATSConn evicts the cache entry for instanceName but does not
// close the connection. It is meant for tests that own the connection's
// lifetime themselves (e.g. nc.Close() plus embedded-server shutdown).
//
// Deprecated: see RegisterNATSConn.
func UnregisterNATSConn(instanceName string) {
	connCacheMu.Lock()
	delete(natsConnCache, instanceName)
	connCacheMu.Unlock()
}
// GetNATSConn reports the *nats.Conn cached under instanceName. The boolean is
// false when the cache has no entry for that name. The returned connection is
// handed back as stored, without checking whether it is still connected.
//
// Deprecated: see RegisterNATSConn.
func GetNATSConn(instanceName string) (*nats.Conn, bool) {
	connCacheMu.Lock()
	cachedConn, present := natsConnCache[instanceName]
	connCacheMu.Unlock()
	return cachedConn, present
}
// GetOrDialNATSConn hands back the cached NATS connection for instanceName
// when it is connected; otherwise it dials a fresh one through natsDialFn and
// caches it. It fails when no URI is registered for instanceName or when the
// dial itself fails.
//
// Lock ordering: connCacheMu and urlMu (taken inside GetBusURI) are never held
// at the same time. The URI lookup and the dial both run between releasing
// connCacheMu on the fast path and re-acquiring it to publish the result.
//
// NOTE(review): a cached entry that is non-nil but not connected is evicted
// without being closed, so whoever still holds that pointer keeps ownership
// of it — confirm this is the intended contract.
//
// Deprecated: new code should use LookupRuntime / LookupRuntimeWithFallback,
// which dispatch through providers.RuntimeBroker. Retained for source-compat
// with external consumers that still hold a direct *nats.Conn.
func GetOrDialNATSConn(instanceName string) (*nats.Conn, error) {
	// Fast path: a connected cached entry is returned without touching urlMu.
	connCacheMu.Lock()
	if cachedConn, hit := natsConnCache[instanceName]; hit && cachedConn != nil && cachedConn.IsConnected() {
		connCacheMu.Unlock()
		return cachedConn, nil
	}
	// Whatever is cached (nil or disconnected) is stale; drop it while locked.
	delete(natsConnCache, instanceName)
	connCacheMu.Unlock()

	// Slow path: look up the URI with no lock held.
	uri, registered := GetBusURI(instanceName)
	if !registered || uri == "" {
		envKey := strings.ToUpper(strings.ReplaceAll(instanceName, "-", "_"))
		return nil, fmt.Errorf("eventbus.broker: no URI registered for bus %q; set EVENTBUS_%s_URI or NATS_URL", instanceName, envKey)
	}

	// Dial with no lock held: natsDialFn may block up to its connect timeout.
	dialled, err := natsDialFn(uri)
	if err != nil {
		return nil, fmt.Errorf("eventbus.broker: dial NATS for bus %q at %s: %w", instanceName, uri, err)
	}

	// Publish the new connection unless another goroutine won the race and
	// already cached a connected one while we were dialling.
	connCacheMu.Lock()
	defer connCacheMu.Unlock()
	winner, present := natsConnCache[instanceName]
	if !present || winner == nil || !winner.IsConnected() {
		natsConnCache[instanceName] = dialled
		return dialled, nil
	}
	dialled.Close() // our dial is redundant; keep the winner's connection
	return winner, nil
}
// closeNATSConn closes the connection cached under instanceName (when it is
// non-nil) and removes the cache entry. A missing entry is not an error, so
// the call can safely be repeated.
func closeNATSConn(instanceName string) {
	connCacheMu.Lock()
	defer connCacheMu.Unlock()

	cachedConn, present := natsConnCache[instanceName]
	if !present {
		return
	}
	if cachedConn != nil {
		cachedConn.Close()
	}
	delete(natsConnCache, instanceName)
}
// natsDialFn creates NATS connections for GetOrDialNATSConn. It is a
// package-level variable so tests can swap in a fake dialer and avoid needing
// a real NATS server. The default dials with a negative MaxReconnects, a
// 2-second reconnect wait and a 5-second connect timeout.
var natsDialFn = func(uri string) (*nats.Conn, error) {
	opts := []nats.Option{
		nats.MaxReconnects(-1),
		nats.ReconnectWait(2 * time.Second),
		nats.Timeout(5 * time.Second),
	}
	return nats.Connect(uri, opts...)
}
// DefaultBusConn returns a NATS connection for the lexicographically smallest
// instance name present in the cluster registry. Choosing the smallest name
// keeps the selection deterministic across calls and goroutines even when
// several buses are registered. Multi-bus workflows should call
// GetOrDialNATSConn(instanceName) directly instead.
//
// Deprecated: superseded by LookupRuntimeWithFallback, which routes through
// providers.RuntimeBroker and works across nats / pgchannel / future
// providers. Retained for source-compat with callers that still hold a
// direct *nats.Conn.
func DefaultBusConn() (*nats.Conn, error) {
	var (
		first string
		found bool
	)

	// Scan for the smallest registered name under the read lock; the dial
	// below runs after the lock has been released.
	clusterMu.RLock()
	for name := range clusterRegistry {
		if !found || name < first {
			first, found = name, true
		}
	}
	clusterMu.RUnlock()

	if !found {
		return nil, fmt.Errorf("eventbus.broker: no bus module registered; add an eventbus.broker module to your workflow config")
	}
	return GetOrDialNATSConn(first)
}
// ── ClusterModuleFactory (TypedModuleProvider) ────────────────────────────────
// ClusterModuleFactory implements sdk.TypedModuleProvider for the eventbus.broker
// module type. It is stateless: TypedModuleTypes advertises the single type it
// serves and CreateTypedModule decodes the typed config and delegates to
// NewClusterModule.
type ClusterModuleFactory struct{}
// Compile-time assertion: ClusterModuleFactory implements sdk.TypedModuleProvider.
// The build fails here if a required method is missing or has the wrong signature.
var _ sdk.TypedModuleProvider = (*ClusterModuleFactory)(nil)
// TypedModuleTypes lists the module types this factory can build, which is
// only "eventbus.broker".
func (f *ClusterModuleFactory) TypedModuleTypes() []string {
	supported := []string{"eventbus.broker"}
	return supported
}
// CreateTypedModule builds an eventbus.broker module instance called name.
//
// A typeName other than "eventbus.broker" is rejected with an error wrapping
// sdk.ErrTypedContractNotHandled. A nil config is treated as an empty
// ClusterConfig; otherwise config is unpacked into a ClusterConfig and an
// unpack failure is returned wrapped. The decoded config is then handed to
// NewClusterModule, which performs validation.
func (f *ClusterModuleFactory) CreateTypedModule(typeName, name string, config *anypb.Any) (sdk.ModuleInstance, error) {
	if typeName != "eventbus.broker" {
		return nil, fmt.Errorf("%w: module type %q", sdk.ErrTypedContractNotHandled, typeName)
	}

	cfg := new(eventbusv1.ClusterConfig)
	if config == nil {
		return NewClusterModule(name, cfg)
	}
	if err := config.UnmarshalTo(cfg); err != nil {
		return nil, fmt.Errorf("eventbus.broker %q: unmarshal typed config: %w", name, err)
	}
	return NewClusterModule(name, cfg)
}
// ── clusterModule (ModuleInstance) ───────────────────────────────────────────
// clusterModule implements sdk.ModuleInstance for the eventbus.broker module type.
// The ClusterConfig it holds is validated by NewClusterModule before the
// module is constructed. Init() registers the config and (when resolvable)
// the broker URI, Start() selects a provider runtime + opens a connection,
// and Stop() tears both down and removes the registry entries.
//
// The runtime + conn fields are populated by Start() based on config.Provider.
// They are nil before Start runs; LookupRuntime guards against that state so
// callers see a clear lifecycle error rather than a nil deref.
type clusterModule struct {
// instanceName is the module's name; it is the key used in every
// package-level registry (cluster, bus URI, broker instance, NATS cache).
instanceName string
// config is the ClusterConfig supplied at construction. Init publishes
// this pointer via RegisterCluster, and Start works on a clone rather
// than mutating it.
config *eventbusv1.ClusterConfig
// mu guards runtime + conn so concurrent LookupRuntime callers can't
// observe a torn read while Start/Stop is flipping the pointers. The
// two fields are read together (LookupRuntime returns both), so a
// single RWMutex is simpler and cheaper than two atomic.Pointers
// with a separate consistency story.
mu sync.RWMutex
// runtime is the provider-specific RuntimeBroker selected at Start time.
// nil before Start, nil after Stop. Guarded by mu.
runtime providers.RuntimeBroker
// conn is the live broker Connection opened via runtime.Connect at Start.
// nil before Start, nil after Stop. Guarded by mu.
conn providers.Connection
}
// Compile-time assertion: clusterModule implements sdk.ModuleInstance.
// The build fails here if a required method is missing or has the wrong signature.
var _ sdk.ModuleInstance = (*clusterModule)(nil)
// NewClusterModule validates cfg and wraps it in a clusterModule named
// instanceName. No registry is touched and no connection is opened here.
//
// Validation depends on cfg.provider:
//
//   - "" is rejected: the provider is mandatory.
//   - "pgchannel" requires broker_target=in_process and a non-empty dsn
//     (the Postgres connection string); deploy_target is not consulted.
//   - "nats", "kafka" and "kinesis" require a non-empty deploy_target that
//     providers.ValidateProviderTarget accepts for that provider.
//   - any other provider is rejected as unsupported.
//
// On success the returned module keeps the cfg pointer as-is (no copy).
func NewClusterModule(instanceName string, cfg *eventbusv1.ClusterConfig) (sdk.ModuleInstance, error) {
	switch provider := cfg.GetProvider(); provider {
	case "":
		return nil, fmt.Errorf("eventbus.broker %q: config.provider is required", instanceName)

	case "pgchannel":
		if brokerTarget := cfg.GetBrokerTarget(); brokerTarget != "in_process" {
			return nil, fmt.Errorf("eventbus.broker %q: pgchannel requires broker_target=in_process (got %q)", instanceName, brokerTarget)
		}
		if cfg.GetDsn() == "" {
			return nil, fmt.Errorf("eventbus.broker %q: pgchannel requires dsn (Postgres connection string)", instanceName)
		}

	case "nats", "kafka", "kinesis":
		deployTarget := cfg.GetDeployTarget()
		if deployTarget == "" {
			return nil, fmt.Errorf("eventbus.broker %q: %s requires deploy_target", instanceName, provider)
		}
		if err := providers.ValidateProviderTarget(provider, providers.DeployTarget(deployTarget)); err != nil {
			return nil, fmt.Errorf("eventbus.broker %q: %w", instanceName, err)
		}

	default:
		return nil, fmt.Errorf("eventbus.broker %q: unsupported provider %q (supported: pgchannel, nats, kafka, kinesis)", instanceName, provider)
	}

	return &clusterModule{instanceName: instanceName, config: cfg}, nil
}
// Init publishes the module's ClusterConfig in the cluster registry and, when
// a broker URI can be resolved from the environment, registers that too.
//
// URI resolution order:
//  1. EVENTBUS_<NAME>_URI, where <NAME> is the instance name upper-cased with
//     dashes turned into underscores (e.g. EVENTBUS_BMW_EVENTBUS_URI)
//  2. NATS_URL (consulted only when the provider is "nats")
//
// When neither variable yields a value no URI is registered and Init still
// succeeds, which keeps the module usable for IaC-only (plan/apply)
// workflows. Init always returns nil.
func (m *clusterModule) Init() error {
	RegisterCluster(m.instanceName, m.config)

	envName := "EVENTBUS_" + strings.ToUpper(strings.ReplaceAll(m.instanceName, "-", "_")) + "_URI"
	uri := os.Getenv(envName)
	if uri == "" && m.config.GetProvider() == "nats" {
		uri = os.Getenv("NATS_URL")
	}
	if uri == "" {
		return nil
	}

	RegisterBusURI(m.instanceName, uri)
	return nil
}
// Start selects a provider runtime based on m.config.Provider, opens a broker
// Connection, and registers the module in the broker-instance registry so
// callers can resolve the runtime by instance name through LookupRuntime.
//
// Provider selection:
//   - "nats"      → providers/nats.NewRuntime()
//   - "pgchannel" → providers/pgchannel.NewRuntime()
//   - anything else → error
//
// DSN resolution for nats:
//
// The runtime expects cfg.Dsn to carry the broker URL. When cfg.Dsn is empty
// Start falls back to the URI that Init resolved from the environment into
// the bus URI registry. If no DSN is available at all, Start runs in degraded
// mode: it registers the module with runtime/conn left nil and returns nil,
// so LookupRuntime later reports "runtime not initialized".
//
// Connect receives a clone of m.config with the resolved Dsn set on the
// clone; the original m.config pointer (already published via RegisterCluster
// in Init) is never mutated.
//
// Start returns an error when the provider is unsupported, when the config
// cannot be cloned into a usable *ClusterConfig, or when Connect fails. In
// all of those cases the module is not registered.
func (m *clusterModule) Start(ctx context.Context) error {
	provider := m.config.GetProvider()
	// Resolve the DSN into a local so that holders of the pointer published
	// by Init (e.g. via GetCluster) never observe a write to m.config.
	dsn := m.config.GetDsn()

	var rt providers.RuntimeBroker
	switch provider {
	case "nats":
		// Backwards-compat: with an empty Dsn fall back to the URI that
		// Init resolved from env vars.
		if dsn == "" {
			if uri, ok := GetBusURI(m.instanceName); ok && uri != "" {
				dsn = uri
			}
		}
		// Degraded mode: no DSN/URI is available (IaC-only / plan-apply
		// flow), so skip Connect and leave runtime/conn nil. Registering
		// anyway lets LookupRuntime return its provider-specific
		// "runtime not initialized" error instead of "not registered".
		if dsn == "" {
			RegisterBrokerInstance(m.instanceName, m)
			return nil
		}
		rt = natsruntime.NewRuntime()
	case "pgchannel":
		rt = pgchannelruntime.NewRuntime()
	default:
		return fmt.Errorf("eventbus.broker %q: unsupported provider %q for Start (must be nats|pgchannel)", m.instanceName, provider)
	}

	// Deep-copy the config so Connect sees the resolved Dsn without the
	// shared m.config being mutated. The assertion result is checked rather
	// than discarded: writing to a nil clone would panic.
	cfgClone, ok := proto.Clone(m.config).(*eventbusv1.ClusterConfig)
	if !ok || cfgClone == nil {
		return fmt.Errorf("eventbus.broker %q: clone cluster config: result is not a usable *ClusterConfig", m.instanceName)
	}
	cfgClone.Dsn = dsn

	conn, err := rt.Connect(ctx, cfgClone)
	if err != nil {
		return fmt.Errorf("eventbus.broker %q: connect: %w", m.instanceName, err)
	}

	// Publish runtime + conn together under the write lock so concurrent
	// LookupRuntime readers never observe a half-set pair.
	m.mu.Lock()
	m.runtime = rt
	m.conn = conn
	m.mu.Unlock()

	RegisterBrokerInstance(m.instanceName, m)
	return nil
}
// Stop releases everything Init and Start set up, in this order:
//  1. remove the module from the broker-instance registry so new
//     LookupRuntime calls fail fast;
//  2. clear runtime/conn and close the runtime Connection, if one was open;
//  3. close and evict any legacy *nats.Conn cached for the instance;
//  4. remove the bus URI and the cluster config from their registries.
//
// Every step runs even when closing the Connection fails; that failure is
// reported, wrapped, as the return value. Stop is safe to call when Start
// never ran: with a nil conn the Close step is simply skipped.
func (m *clusterModule) Stop(_ context.Context) error {
	UnregisterBrokerInstance(m.instanceName)

	// Take the connection out of the module and clear both fields while
	// holding the write lock, so a concurrent LookupRuntime sees a clean
	// "runtime not initialized" state instead of a runtime paired with a
	// closed conn. Close itself runs after the lock is released because it
	// may be slow and must not stall readers.
	m.mu.Lock()
	liveConn := m.conn
	m.runtime, m.conn = nil, nil
	m.mu.Unlock()

	var stopErr error
	if liveConn != nil {
		if cerr := liveConn.Close(); cerr != nil {
			stopErr = fmt.Errorf("eventbus.broker %q: close runtime connection: %w", m.instanceName, cerr)
		}
	}

	// Legacy cleanup proceeds regardless of the Close outcome.
	closeNATSConn(m.instanceName) // closes + evicts the cached *nats.Conn; no-op if absent
	UnregisterBusURI(m.instanceName)
	UnregisterCluster(m.instanceName)

	return stopErr
}