-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodule_internal_test.go
More file actions
205 lines (185 loc) · 7.66 KB
/
module_internal_test.go
File metadata and controls
205 lines (185 loc) · 7.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
// module_internal_test.go — tests of the broker-instance registry +
// LookupRuntime that need to construct a *clusterModule directly (the type
// is unexported by design). External-API tests live in module_test.go.
package eventbus
import (
"context"
"errors"
"strings"
"sync"
"testing"
eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen"
"github.com/GoCodeAlone/workflow-plugin-eventbus/providers"
)
// ── mockConn / mockRuntime — the smallest providers.Connection /
// providers.RuntimeBroker doubles that the registry-level tests need. No
// real broker is ever contacted; runtime methods a registry test has no
// reason to call return an error, so a stray dispatch is noticed at once.

// mockConn is a connection double whose only state is whether Close ran.
type mockConn struct {
	closed bool
}

// Close marks the connection as closed and always reports success.
func (c *mockConn) Close() error {
	c.closed = true
	return nil
}

// Provider returns the fixed provider name "mock".
func (c *mockConn) Provider() string {
	return "mock"
}
// mockRuntime is a stateless runtime double. Connect succeeds and hands back
// a new mockConn; every other method fails with an error naming the method,
// so a registry test that accidentally dispatches to the broker fails loudly.
type mockRuntime struct{}

// Connect ignores its arguments and returns a fresh, open mockConn.
func (mockRuntime) Connect(context.Context, *eventbusv1.ClusterConfig) (providers.Connection, error) {
	return new(mockConn), nil
}

// EnsureStream always fails: registry tests must not create streams.
func (mockRuntime) EnsureStream(context.Context, providers.Connection, *eventbusv1.StreamConfig) error {
	const msg = "mockRuntime: EnsureStream not expected in registry tests"
	return errors.New(msg)
}

// EnsureConsumer always fails: registry tests must not create consumers.
func (mockRuntime) EnsureConsumer(context.Context, providers.Connection, string, *eventbusv1.ConsumerConfig) error {
	const msg = "mockRuntime: EnsureConsumer not expected in registry tests"
	return errors.New(msg)
}

// Publish always fails with a nil response: registry tests must not publish.
func (mockRuntime) Publish(context.Context, providers.Connection, *eventbusv1.PublishRequest) (*eventbusv1.PublishResponse, error) {
	const msg = "mockRuntime: Publish not expected in registry tests"
	return nil, errors.New(msg)
}

// Subscribe always fails: registry tests must not subscribe.
func (mockRuntime) Subscribe(context.Context, providers.Connection, string, string, providers.MessageHandler) error {
	const msg = "mockRuntime: Subscribe not expected in registry tests"
	return errors.New(msg)
}

// Consume always fails with a nil response: registry tests must not consume.
func (mockRuntime) Consume(context.Context, providers.Connection, string, *eventbusv1.ConsumeRequest) (*eventbusv1.ConsumeResponse, error) {
	const msg = "mockRuntime: Consume not expected in registry tests"
	return nil, errors.New(msg)
}

// Ack always fails: registry tests must not acknowledge messages.
func (mockRuntime) Ack(context.Context, providers.Connection, string) error {
	const msg = "mockRuntime: Ack not expected in registry tests"
	return errors.New(msg)
}
// TestBrokerInstanceRegistry_RegisterLookup walks one full registry cycle:
// register a module, look it up and check the very same pointer comes back,
// then unregister it and check that the lookup reports "not found".
func TestBrokerInstanceRegistry_RegisterLookup(t *testing.T) {
	const name = "register-lookup-bus"

	module := &clusterModule{instanceName: name}
	RegisterBrokerInstance(name, module)
	// Safety net in case the test bails out before the explicit Unregister
	// further down; the cleanup simply unregisters the same name again.
	t.Cleanup(func() { UnregisterBrokerInstance(name) })

	found, present := LookupBrokerInstance(name)
	switch {
	case !present:
		t.Fatal("expected to find broker instance after Register")
	case found != module:
		t.Errorf("LookupBrokerInstance returned different pointer; got %p, want %p", found, module)
	}

	UnregisterBrokerInstance(name)
	if _, stillPresent := LookupBrokerInstance(name); stillPresent {
		t.Fatal("expected Lookup to return false after Unregister")
	}
}
// TestLookupRuntime_NotStarted registers a module whose runtime and conn
// fields are still nil (i.e. nothing has populated them yet) and expects
// LookupRuntime to fail with an error mentioning "runtime not initialized"
// instead of handing back a nil runtime for the caller to dereference.
//
// Only that substring is asserted here; any further remediation guidance
// carried by the error message is not checked by this test.
func TestLookupRuntime_NotStarted(t *testing.T) {
	const (
		name     = "not-started-bus"
		wantText = "runtime not initialized"
	)

	module := &clusterModule{instanceName: name} // runtime and conn left nil
	RegisterBrokerInstance(name, module)
	t.Cleanup(func() { UnregisterBrokerInstance(name) })

	_, _, err := LookupRuntime(name)
	if err == nil {
		t.Fatal("expected error for not-yet-started broker")
	}
	if msg := err.Error(); !strings.Contains(msg, wantText) {
		t.Errorf("error = %q, want substring \"runtime not initialized\"", msg)
	}
}
// TestClusterModule_StartStopConcurrentLookup is a regression test for a
// race on the clusterModule runtime/conn field pair. One writer goroutine
// repeatedly publishes both fields (the "Start" side) and then clears them
// (the "Stop" side), always while holding cm.mu, while 100 reader goroutines
// call LookupRuntime in a tight loop.
//
// A reader may legitimately see an error (module unregistered, or runtime
// not yet set). What it must never see is a nil error together with a nil
// runtime or a nil conn: that would be a torn read of the pair. The test is
// meant to be run with `go test -race`, which would additionally flag any
// unsynchronised access to the two fields.
//
// The fields are written directly rather than through the module's real
// Start/Stop so that the test needs no external broker or database. The
// writer mimics the intended ordering by hand: set fields then register;
// unregister then clear fields.
func TestClusterModule_StartStopConcurrentLookup(t *testing.T) {
	const (
		readers           = 100
		writeCycles       = 500
		readsPerGoroutine = 1000
	)

	module := &clusterModule{instanceName: "start-stop-race-bus"}
	busName := module.instanceName
	RegisterBrokerInstance(busName, module)
	t.Cleanup(func() { UnregisterBrokerInstance(busName) })

	var (
		pending sync.WaitGroup
		done    = make(chan struct{}) // closed by the writer once all cycles ran
	)

	// writer flips the module between "started" and "stopped" writeCycles
	// times, then signals the readers to wind down.
	writer := func() {
		defer pending.Done()

		var (
			fakeConn    = new(mockConn)
			fakeRuntime mockRuntime
		)
		for cycle := 0; cycle < writeCycles; cycle++ {
			// "Start": publish both fields under the write lock, then
			// make the module discoverable.
			module.mu.Lock()
			module.runtime = fakeRuntime
			module.conn = fakeConn
			module.mu.Unlock()
			RegisterBrokerInstance(busName, module)

			// "Stop": hide the module first, then snapshot and clear the
			// pair under the write lock. Closing would happen outside it.
			UnregisterBrokerInstance(busName)
			module.mu.Lock()
			detached := module.conn
			module.conn, module.runtime = nil, nil
			module.mu.Unlock()
			_ = detached // would be detached.Close() in production

			// Put the (now empty) module back so readers keep hitting the
			// "registered but not initialised" path as well.
			RegisterBrokerInstance(busName, module)
		}
		close(done)
	}

	// reader performs up to readsPerGoroutine lookups, stopping early once
	// the writer is finished. Lookup errors are an acceptable outcome and
	// are skipped; a nil half without an error is reported as a torn read.
	reader := func() {
		defer pending.Done()

		for attempt := 0; attempt < readsPerGoroutine; attempt++ {
			select {
			case <-done:
				return
			default:
			}

			rt, conn, err := LookupRuntime(busName)
			if err == nil && (rt == nil || conn == nil) {
				t.Errorf("torn read: LookupRuntime returned nil pair without error (rt=%v conn=%v)", rt, conn)
				return
			}
		}
	}

	pending.Add(1 + readers)
	go writer()
	for n := 0; n < readers; n++ {
		go reader()
	}
	pending.Wait()
}
// TestLookupRuntime_Success registers a module that already has both its
// runtime and conn populated and checks that LookupRuntime succeeds, that
// the returned runtime has the mockRuntime dynamic type, and that the
// returned conn is the exact *mockConn stored on the module.
func TestLookupRuntime_Success(t *testing.T) {
	const name = "lookup-success-bus"

	wantConn := new(mockConn)
	module := &clusterModule{
		instanceName: name,
		runtime:      mockRuntime{},
		conn:         wantConn,
	}
	RegisterBrokerInstance(name, module)
	t.Cleanup(func() { UnregisterBrokerInstance(name) })

	gotRuntime, gotConn, err := LookupRuntime(name)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	switch gotRuntime.(type) {
	case mockRuntime:
		// expected dynamic type
	default:
		t.Errorf("LookupRuntime returned wrong runtime type: %T", gotRuntime)
	}
	if gotConn != wantConn {
		t.Errorf("LookupRuntime returned wrong conn pointer; got %p, want %p", gotConn, wantConn)
	}
}