-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstream_internal_test.go
More file actions
296 lines (270 loc) · 9.68 KB
/
stream_internal_test.go
File metadata and controls
296 lines (270 loc) · 9.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
// stream_internal_test.go — Start-time EnsureStream dispatch tests. Lives in
// the internal package so we can pre-seed the broker-instance registry with
// hand-built *clusterModule values + the mockRuntime/mockConn defined in
// module_internal_test.go. External-API tests (factory, validation, Init/Stop
// registry round-trip) remain in stream_test.go.
package eventbus
import (
"context"
"errors"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
eventbusv1 "github.com/GoCodeAlone/workflow-plugin-eventbus/gen"
"github.com/GoCodeAlone/workflow-plugin-eventbus/providers"
)
// ensureRecordingRuntime records every EnsureStream / EnsureConsumer call so
// the Start tests can assert the dispatch happened exactly once with the
// expected config. Methods not exercised by the tests delegate to mockRuntime
// (returning errors) so accidental dispatch fails loudly.
//
// The recorded slices are appended to while mu is held; tests should read them
// through snapshotStreamCalls / snapshotConsumerCalls rather than directly.
type ensureRecordingRuntime struct {
	// mockRuntime is embedded so every method not overridden on this type
	// falls through to the mock defined in module_internal_test.go.
	mockRuntime
	// mu is held by the Ensure* recorders and the snapshot* accessors.
	mu sync.Mutex
	// streamCalls holds the cfg argument of each EnsureStream call, in call order.
	streamCalls []*eventbusv1.StreamConfig
	// streamErr is returned as-is by every EnsureStream call (nil means success).
	streamErr error
	// consumerCalls holds the arguments of each EnsureConsumer call, in call order.
	consumerCalls []consumerCall
	// consumerErr is returned as-is by every EnsureConsumer call (nil means success).
	consumerErr error
}
// consumerCall captures the arguments of a single EnsureConsumer invocation
// as recorded by ensureRecordingRuntime.
type consumerCall struct {
	// streamName is the streamName argument that was passed to EnsureConsumer.
	streamName string
	// cfg is the ConsumerConfig pointer that was passed to EnsureConsumer,
	// stored without copying.
	cfg *eventbusv1.ConsumerConfig
}
// EnsureStream appends cfg to the recorded stream calls and returns the
// preconfigured streamErr. The context and connection arguments are ignored.
func (r *ensureRecordingRuntime) EnsureStream(_ context.Context, _ providers.Connection, cfg *eventbusv1.StreamConfig) error {
	r.mu.Lock()
	r.streamCalls = append(r.streamCalls, cfg)
	result := r.streamErr
	r.mu.Unlock()

	return result
}
// EnsureConsumer appends the (streamName, cfg) pair to the recorded consumer
// calls and returns the preconfigured consumerErr. The context and connection
// arguments are ignored.
func (r *ensureRecordingRuntime) EnsureConsumer(_ context.Context, _ providers.Connection, streamName string, cfg *eventbusv1.ConsumerConfig) error {
	call := consumerCall{streamName: streamName, cfg: cfg}

	r.mu.Lock()
	r.consumerCalls = append(r.consumerCalls, call)
	result := r.consumerErr
	r.mu.Unlock()

	return result
}
// snapshotStreamCalls returns a copy of the EnsureStream configs recorded so
// far, taken under the mutex. The returned slice is independent of the
// recorder's own slice; the *StreamConfig pointers inside it are shared.
func (r *ensureRecordingRuntime) snapshotStreamCalls() []*eventbusv1.StreamConfig {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Start from an empty, pre-sized, non-nil slice so the result is never nil.
	snapshot := make([]*eventbusv1.StreamConfig, 0, len(r.streamCalls))
	return append(snapshot, r.streamCalls...)
}
// snapshotConsumerCalls returns a copy of the EnsureConsumer calls recorded so
// far, taken under the mutex. The returned slice is independent of the
// recorder's own slice.
func (r *ensureRecordingRuntime) snapshotConsumerCalls() []consumerCall {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Start from an empty, pre-sized, non-nil slice so the result is never nil.
	snapshot := make([]consumerCall, 0, len(r.consumerCalls))
	return append(snapshot, r.consumerCalls...)
}
// seedBroker registers a hand-built *clusterModule under brokerName with the
// given runtime and connection already populated, so broker lookups succeed
// without going through the real Start path (which would require a live NATS
// server or Postgres).
//
// It returns nothing: the matching UnregisterBrokerInstance call is scheduled
// through t.Cleanup, so the registration is removed automatically when the
// test (and its subtests) finish.
func seedBroker(t *testing.T, brokerName string, rt providers.RuntimeBroker, conn providers.Connection) {
	t.Helper()

	RegisterBrokerInstance(brokerName, &clusterModule{
		instanceName: brokerName,
		runtime:      rt,
		conn:         conn,
	})

	t.Cleanup(func() {
		UnregisterBrokerInstance(brokerName)
	})
}
// ── stream Start tests ────────────────────────────────────────────────────────
// TestStreamModule_StartCallsEnsureStream checks that Start on a stream module
// whose BrokerRef names a registered broker dispatches EnsureStream exactly
// once, and that the runtime receives the very same *StreamConfig pointer the
// module was constructed with.
func TestStreamModule_StartCallsEnsureStream(t *testing.T) {
	const brokerName = "broker-stream-ensure"

	recorder := &ensureRecordingRuntime{}
	seedBroker(t, brokerName, recorder, &mockConn{})

	want := &eventbusv1.StreamConfig{
		Name:      "BMW_FULFILLMENT",
		Subjects:  []string{"fulfillment.>"},
		BrokerRef: brokerName,
	}
	mod, err := NewStreamModule("stream-ensure", want)
	if err != nil {
		t.Fatalf("create: %v", err)
	}

	err = mod.Start(context.Background())
	if err != nil {
		t.Fatalf("Start: %v", err)
	}

	got := recorder.snapshotStreamCalls()
	if count := len(got); count != 1 {
		t.Fatalf("EnsureStream calls = %d, want 1", count)
	}
	// Pointer identity, not deep equality: the config must be passed through untouched.
	if got[0] != want {
		t.Errorf("EnsureStream got cfg pointer %p, want %p", got[0], want)
	}
}
// TestStreamModule_StartRetries verifies that Start keeps retrying until the
// referenced broker appears. The broker is registered roughly 200ms after the
// test begins; Start must block at least ~100ms, finish well under 2s, and
// dispatch EnsureStream exactly once.
func TestStreamModule_StartRetries(t *testing.T) {
	rt := &ensureRecordingRuntime{}
	const brokerName = "broker-stream-retry"

	// Register the broker after ~200ms so the first lookup attempts fail and
	// the retry loop has to back off and try again. The 10s budget on Start
	// gives ample headroom for the ~200ms delay.
	//
	// stop lets the cleanup cancel a registration that has not happened yet,
	// and done lets it wait for the goroutine to exit. Without both, a test
	// that aborts inside the first 200ms would unregister first and then have
	// the goroutine register the broker afterwards, leaking it into the
	// package-level registry for later tests.
	stop := make(chan struct{})
	done := make(chan struct{})
	go func() {
		defer close(done)
		select {
		case <-time.After(200 * time.Millisecond):
			cm := &clusterModule{instanceName: brokerName, runtime: rt, conn: &mockConn{}}
			RegisterBrokerInstance(brokerName, cm)
		case <-stop:
			// Test ended before the delay elapsed; never register.
		}
	}()
	t.Cleanup(func() {
		close(stop)
		<-done // the goroutine has either registered or bailed out by now
		UnregisterBrokerInstance(brokerName)
	})

	cfg := &eventbusv1.StreamConfig{
		Name:      "BMW_RETRY",
		Subjects:  []string{"retry.>"},
		BrokerRef: brokerName,
	}
	m, err := NewStreamModule("stream-retry", cfg)
	if err != nil {
		t.Fatalf("create: %v", err)
	}

	start := time.Now()
	if err := m.Start(context.Background()); err != nil {
		t.Fatalf("Start: %v (elapsed %v)", err, time.Since(start))
	}
	elapsed := time.Since(start)

	// Lower bound: Start cannot have succeeded before the broker existed.
	if elapsed < 100*time.Millisecond {
		t.Errorf("Start returned in %v; expected at least ~100ms while waiting for broker", elapsed)
	}
	// Upper bound: once the broker is up, Start should notice promptly.
	if elapsed > 2*time.Second {
		t.Errorf("Start took %v; expected <2s (broker was up after 200ms)", elapsed)
	}
	if calls := rt.snapshotStreamCalls(); len(calls) != 1 {
		t.Errorf("EnsureStream calls = %d, want 1", len(calls))
	}
}
// TestStreamModule_StartTimesOut verifies the error Start reports when the
// referenced broker never becomes available.
//
// The broker name is never registered. Rather than wait out Start's full
// retry budget (10s, per the error text asserted below), the test calls the
// real Start with a context carrying a 200ms deadline so the call gives up
// early and the test stays well under a second.
//
// Only the "not available within 10s" wrapper text is pinned. Because the
// context comes from WithTimeout, the underlying cause is presumably
// context.DeadlineExceeded rather than context.Canceled; the assertion
// deliberately does not depend on which one is wrapped.
func TestStreamModule_StartTimesOut(t *testing.T) {
	cfg := &eventbusv1.StreamConfig{
		Name:      "BMW_TIMEOUT",
		Subjects:  []string{"timeout.>"},
		BrokerRef: "broker-stream-never-registered",
	}
	m, err := NewStreamModule("stream-timeout", cfg)
	if err != nil {
		t.Fatalf("create: %v", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	// Call Start through the value NewStreamModule returned, like every other
	// test in this file; asserting the concrete type first adds nothing and
	// would panic instead of failing cleanly if the implementation changed.
	err = m.Start(ctx)
	if err == nil {
		t.Fatal("expected error when broker never registered")
	}
	if !strings.Contains(err.Error(), "not available within 10s") {
		t.Errorf("error = %q, want substring \"not available within 10s\"", err.Error())
	}
}
// TestStreamModule_StartLegacyNoBrokerRef verifies that a stream module built
// without a BrokerRef starts successfully and returns promptly, i.e. without
// waiting on any broker.
func TestStreamModule_StartLegacyNoBrokerRef(t *testing.T) {
	// Ceiling for "returned without waiting". It is deliberately generous: a
	// tight wall-clock bound (the previous 10ms) can be blown by scheduler or
	// GC pauses under -race or on a loaded CI host even when Start does no
	// work, while one second is still far below the 10s broker-wait budget
	// this test guards against.
	const maxElapsed = time.Second

	cfg := &eventbusv1.StreamConfig{
		Name:     "BMW_LEGACY",
		Subjects: []string{"legacy.>"},
		// BrokerRef intentionally empty
	}
	m, err := NewStreamModule("stream-legacy", cfg)
	if err != nil {
		t.Fatalf("create: %v", err)
	}

	// Must return without contacting (or waiting for) any broker.
	start := time.Now()
	if err := m.Start(context.Background()); err != nil {
		t.Fatalf("Start: %v", err)
	}
	if elapsed := time.Since(start); elapsed > maxElapsed {
		t.Errorf("legacy Start took %v; expected near-instant return", elapsed)
	}
}
func TestStreamModule_StartCtxCancelled(t *testing.T) {
// Broker not registered; ctx is pre-cancelled. Start should return quickly
// without burning the full 10s budget. The first attempt runs (per
// retryWithBackoff contract) and fails with "not registered", then the
// select observes ctx.Done and returns ctx.Err — wrapped by Start as
// "not available within 10s: context canceled".
ctx, cancel := context.WithCancel(context.Background())
cancel()
cfg := &eventbusv1.StreamConfig{
Name: "BMW_CTX",
Subjects: []string{"ctx.>"},
BrokerRef: "broker-stream-ctx-cancelled",
}
m, err := NewStreamModule("stream-ctx-cancelled", cfg)
if err != nil {
t.Fatalf("create: %v", err)
}
start := time.Now()
err = m.Start(ctx)
elapsed := time.Since(start)
if err == nil {
t.Fatal("expected error from cancelled ctx")
}
if !errors.Is(err, context.Canceled) {
t.Errorf("err = %v, want errors.Is(context.Canceled)", err)
}
if elapsed > 100*time.Millisecond {
t.Errorf("ctx-cancelled Start took %v; expected fast return", elapsed)
}
}
// TestStreamModule_StartEnsurePropagatesError verifies that a non-nil
// EnsureStream error bubbles back out of Start: the returned error must unwrap
// to the runtime's error and its message must mention the module instance name.
func TestStreamModule_StartEnsurePropagatesError(t *testing.T) {
	// The broker name must not contain the instance name. With the previous
	// broker name "broker-stream-err", the substring check for "stream-err"
	// below would also pass on an error that mentioned only the broker, so it
	// could not prove the instance name was present.
	const (
		brokerName   = "broker-ensure-failure"
		instanceName = "stream-err"
	)

	sentinel := errors.New("ensure boom")
	rt := &ensureRecordingRuntime{streamErr: sentinel}
	seedBroker(t, brokerName, rt, &mockConn{})

	cfg := &eventbusv1.StreamConfig{
		Name:      "BMW_ERR",
		Subjects:  []string{"err.>"},
		BrokerRef: brokerName,
	}
	m, err := NewStreamModule(instanceName, cfg)
	if err != nil {
		t.Fatalf("create: %v", err)
	}

	err = m.Start(context.Background())
	// errors.Is(nil, sentinel) is false, so a nil error also stops here and
	// err.Error() below is never called on nil.
	if !errors.Is(err, sentinel) {
		t.Fatalf("err = %v, want errors.Is(sentinel)", err)
	}
	if !strings.Contains(err.Error(), instanceName) {
		t.Errorf("error %q missing instance-name prefix", err.Error())
	}
}
// ── concurrency probe — Start across many streams sharing a broker ───────────
// TestStreamModule_StartConcurrent makes sure the retry loop + broker lookup
// pair is safe under -race when many stream modules start concurrently against
// the same broker. Mirrors the real workflow runtime where modular boots
// dozens of modules in parallel goroutines.
//
// Each of the n goroutines builds its own module and calls Start. Every Start
// must succeed and the shared runtime must record exactly n EnsureStream calls.
func TestStreamModule_StartConcurrent(t *testing.T) {
	rt := &ensureRecordingRuntime{}
	seedBroker(t, "broker-stream-concurrent", rt, &mockConn{})

	const n = 32
	var ok atomic.Int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			cfg := &eventbusv1.StreamConfig{
				Name:      "BMW_CONCURRENT",
				Subjects:  []string{"concurrent.>"},
				BrokerRef: "broker-stream-concurrent",
			}
			// Report failures where they happen instead of dropping them, so a
			// broken run shows the actual error and not just a count mismatch.
			// t.Errorf may be called from other goroutines, and wg.Wait below
			// ensures they all finish before the test returns.
			m, err := NewStreamModule("stream-concurrent", cfg)
			if err != nil {
				t.Errorf("goroutine %d: create: %v", i, err)
				return
			}
			if err := m.Start(context.Background()); err != nil {
				t.Errorf("goroutine %d: Start: %v", i, err)
				return
			}
			ok.Add(1)
		}(i)
	}
	wg.Wait()

	if ok.Load() != int32(n) {
		t.Errorf("%d/%d concurrent Starts succeeded", ok.Load(), n)
	}
	if calls := rt.snapshotStreamCalls(); len(calls) != n {
		t.Errorf("EnsureStream calls = %d, want %d", len(calls), n)
	}
}