-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstore_test.go
More file actions
458 lines (382 loc) · 11 KB
/
store_test.go
File metadata and controls
458 lines (382 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
package debugmonitor
import (
"sync"
"testing"
"time"
)
// TestStore_Add adds three records to a store created with NewStore(5) and
// checks that Len and GetSince(0) both report three entries, and that the
// returned IDs are positive, unique and strictly increasing.
func TestStore_Add(t *testing.T) {
	const want = 3

	s := NewStore(5)
	for n := 1; n <= want; n++ {
		s.Add(map[string]any{"message": "test", "index": n})
	}

	if got := s.Len(); got != want {
		t.Errorf("Expected 3 records, got %d", got)
	}

	// GetSince(0) is used here to fetch everything that was stored.
	records := s.GetSince(0)
	if got := len(records); got != want {
		t.Errorf("Expected 3 records, got %d", got)
	}

	// Walk the entries once, tracking which IDs were already seen and the
	// previous ID so that uniqueness and ordering are checked together.
	var last int64
	used := map[int64]bool{}
	for _, rec := range records {
		id := rec.Id

		if id <= 0 {
			t.Errorf("Expected positive ID, got %d", id)
		}
		if used[id] {
			t.Errorf("Duplicate ID found: %d", id)
		}
		if id <= last {
			t.Errorf("IDs not in ascending order: prev=%d, current=%d", last, id)
		}

		used[id] = true
		last = id
	}
}
// TestStore_Get is removed because the Get method is no longer needed.
// Use GetSince to retrieve records by ID range.

// TestStore_MaxRecords adds five records to a store created with NewStore(3)
// and checks that only three remain and that they are the most recently added
// ones (payload "index" values 3, 4 and 5, in that order).
func TestStore_MaxRecords(t *testing.T) {
	store := NewStore(3)

	// Add 5 records (exceeds limit of 3).
	for i := 1; i <= 5; i++ {
		store.Add(map[string]any{"index": i})
	}

	// Should only have 3 records.
	if got := store.Len(); got != 3 {
		t.Errorf("Expected 3 records, got %d", got)
	}

	// Fetch everything that is still stored. This check is fatal because the
	// loop below indexes expectedIndexes by position; continuing with more
	// than 3 entries would panic instead of reporting a test failure.
	allData := store.GetSince(0)
	if len(allData) != 3 {
		t.Fatalf("Expected 3 records, got %d", len(allData))
	}

	// Verify the remaining records carry the index values 3, 4, 5.
	expectedIndexes := []int{3, 4, 5}
	for i, entry := range allData {
		// Use the comma-ok form so an unexpected payload type is reported as
		// a failure rather than a panic.
		payloadMap, ok := entry.Payload.(map[string]any)
		if !ok {
			t.Errorf("Expected map[string]any payload at position %d, got %T", i, entry.Payload)
			continue
		}
		if payloadMap["index"] != expectedIndexes[i] {
			t.Errorf("Expected index %d at position %d, got %v", expectedIndexes[i], i, payloadMap["index"])
		}
	}
}
// TestStore_GetLatest adds five records and checks that GetLatest returns all
// of them newest-first, that GetLatestWithLimit(3) returns the three newest in
// the same order, and that a limit larger than the record count returns every
// record.
func TestStore_GetLatest(t *testing.T) {
	store := NewStore(10)

	// Add records.
	for i := 1; i <= 5; i++ {
		store.Add(map[string]any{"index": i})
	}

	// Fatal on a length mismatch: the comparison loop below indexes
	// expectedIDs by position and would panic on a longer result.
	all := store.GetLatest()
	if len(all) != 5 {
		t.Fatalf("Expected 5 records, got %d", len(all))
	}

	// Collect the IDs via GetSince(0); the expectations below treat this
	// slice as oldest-first. Guard the length because ids[4] is read next.
	allData := store.GetSince(0)
	if len(allData) != 5 {
		t.Fatalf("Expected 5 records from GetSince(0), got %d", len(allData))
	}
	ids := make([]int64, 0, len(allData))
	for _, entry := range allData {
		ids = append(ids, entry.Id)
	}

	// Should be in reverse chronological order (newest first).
	expectedIDs := []int64{ids[4], ids[3], ids[2], ids[1], ids[0]}
	for i, entry := range all {
		if entry.Id != expectedIDs[i] {
			t.Errorf("Expected ID %d at position %d, got %v", expectedIDs[i], i, entry.Id)
		}
	}

	// GetLatestWithLimit should return only the newest `limit` records.
	latest := store.GetLatestWithLimit(3)
	if len(latest) != 3 {
		t.Fatalf("Expected 3 records, got %d", len(latest))
	}

	// Should be in reverse chronological order (newest first).
	expectedIDsLimited := []int64{ids[4], ids[3], ids[2]}
	for i, entry := range latest {
		if entry.Id != expectedIDsLimited[i] {
			t.Errorf("Expected ID %d at position %d, got %v", expectedIDsLimited[i], i, entry.Id)
		}
	}

	// Requesting more than is available should return everything.
	allWithLimit := store.GetLatestWithLimit(100)
	if len(allWithLimit) != 5 {
		t.Errorf("Expected 5 records, got %d", len(allWithLimit))
	}
}
// TestStore_GetSince adds five records and checks that GetSince(id) returns
// only the records whose ID follows the given one, in chronological order:
// three records after the second ID, all five for 0, and none after the last
// ID.
func TestStore_GetSince(t *testing.T) {
	store := NewStore(10)

	// Add records.
	for i := 1; i <= 5; i++ {
		store.Add(map[string]any{"index": i})
	}

	// Get all records first to extract IDs. The length is checked fatally
	// because ids[1] and ids[4] are read below and would panic otherwise.
	allData := store.GetSince(0)
	if len(allData) != 5 {
		t.Fatalf("Expected 5 records from GetSince(0), got %d", len(allData))
	}
	ids := make([]int64, 0, len(allData))
	for _, entry := range allData {
		ids = append(ids, entry.Id)
	}

	// Get records since the second record's ID (third, fourth and fifth
	// records). Fatal on mismatch: the loop indexes expectedIDs by position.
	since := store.GetSince(ids[1])
	if len(since) != 3 {
		t.Fatalf("Expected 3 records, got %d", len(since))
	}

	// Should be in chronological order.
	expectedIDs := []int64{ids[2], ids[3], ids[4]}
	for i, entry := range since {
		if entry.Id != expectedIDs[i] {
			t.Errorf("Expected ID %d at position %d, got %v", expectedIDs[i], i, entry.Id)
		}
	}

	// Get since 0 (all records).
	since = store.GetSince(0)
	if len(since) != 5 {
		t.Errorf("Expected 5 records, got %d", len(since))
	}

	// Get since last ID (no records).
	since = store.GetSince(ids[4])
	if len(since) != 0 {
		t.Errorf("Expected 0 records, got %d", len(since))
	}
}
// TestStore_GetSince_WithRemovedID adds five records to a store created with
// NewStore(3), recording each record's ID right after it is added, and then
// checks GetSince with an ID that is no longer stored (the first) and with one
// that still is (the third).
func TestStore_GetSince_WithRemovedID(t *testing.T) {
	store := NewStore(3)

	// Add 5 records and capture the ID of each one straight after adding it,
	// because the earliest records are expected to be gone by the end.
	ids := make([]int64, 0, 5)
	for i := 1; i <= 5; i++ {
		store.Add(map[string]any{"index": i})

		// The newest record is the last element of GetSince(0). An empty
		// result right after Add would leave ids short and misaligned, so
		// stop here instead of panicking on ids[...] further down.
		allData := store.GetSince(0)
		if len(allData) == 0 {
			t.Fatalf("Expected at least 1 record after adding record %d, got 0", i)
		}
		ids = append(ids, allData[len(allData)-1].Id)
	}

	// GetSince with an ID that was removed (first ID) should return all
	// records that exist with ID > first ID (which are the last 3).
	// Fatal on mismatch: the loop indexes expectedIDs by position.
	since := store.GetSince(ids[0])
	if len(since) != 3 {
		t.Fatalf("Expected 3 records, got %d", len(since))
	}
	expectedIDs := []int64{ids[2], ids[3], ids[4]}
	for i, entry := range since {
		if entry.Id != expectedIDs[i] {
			t.Errorf("Expected ID %d at position %d, got %v", expectedIDs[i], i, entry.Id)
		}
	}

	// GetSince with an ID that exists (third ID) should return the last 2
	// records.
	since = store.GetSince(ids[2])
	if len(since) != 2 {
		t.Fatalf("Expected 2 records, got %d", len(since))
	}
	expectedIDs = []int64{ids[3], ids[4]}
	for i, entry := range since {
		if entry.Id != expectedIDs[i] {
			t.Errorf("Expected ID %d at position %d, got %v", expectedIDs[i], i, entry.Id)
		}
	}
}
// TestStore_GetAll_ViaGetSince stands in for the former TestStore_GetAll,
// which was removed because the GetAll method is no longer needed. It checks
// that GetSince(0) returns every stored record, with positive IDs in strictly
// increasing order.
func TestStore_GetAll_ViaGetSince(t *testing.T) {
	s := NewStore(10)
	for n := 1; n <= 5; n++ {
		s.Add(map[string]any{"index": n})
	}

	records := s.GetSince(0)
	if got := len(records); got != 5 {
		t.Errorf("Expected 5 records, got %d", got)
	}

	// Compare each ID against its predecessor; the first entry is compared
	// against zero.
	last := int64(0)
	for pos, rec := range records {
		id := rec.Id
		if id <= 0 {
			t.Errorf("Expected positive ID at position %d, got %d", pos, id)
		}
		if id <= last {
			t.Errorf("IDs not in chronological order at position %d: prev=%d, current=%d", pos, last, id)
		}
		last = id
	}
}
// TestStore_Clear fills a store with five records, calls Clear, and checks
// that both Len and GetSince(0) report an empty store afterwards.
func TestStore_Clear(t *testing.T) {
	s := NewStore(10)
	for n := 1; n <= 5; n++ {
		s.Add(map[string]any{"index": n})
	}

	// Sanity check: the records are there before clearing.
	if n := s.Len(); n != 5 {
		t.Errorf("Expected 5 records before clear, got %d", n)
	}

	s.Clear()

	if n := s.Len(); n != 0 {
		t.Errorf("Expected 0 records after clear, got %d", n)
	}

	// Len alone is not trusted; also confirm nothing can be read back.
	if remaining := s.GetSince(0); len(remaining) != 0 {
		t.Error("Records should not exist after clear")
	}
}
// TestStore_Concurrency has 50 goroutines add 20 records each to one store,
// then checks the total count and ID uniqueness, and finally issues reads
// from 50 goroutines at once. The read phase asserts nothing; it only
// exercises GetLatestWithLimit and GetSince concurrently.
func TestStore_Concurrency(t *testing.T) {
	const (
		numGoroutines       = 50
		recordsPerGoroutine = 20
	)

	s := NewStore(1000)
	var wg sync.WaitGroup

	// Writer phase: every goroutine tags its records with its own offset.
	wg.Add(numGoroutines)
	for g := 0; g < numGoroutines; g++ {
		go func(offset int) {
			defer wg.Done()
			for j := 0; j < recordsPerGoroutine; j++ {
				s.Add(map[string]any{"goroutine": offset, "index": j})
			}
		}(g)
	}
	wg.Wait()

	// Every record from every writer must have been stored.
	if want, got := numGoroutines*recordsPerGoroutine, s.Len(); got != want {
		t.Errorf("Expected %d records, got %d", want, got)
	}

	// No two records may share an ID.
	unique := make(map[int64]bool)
	for _, rec := range s.GetSince(0) {
		if unique[rec.Id] {
			t.Errorf("Duplicate ID found: %d", rec.Id)
		}
		unique[rec.Id] = true
	}

	// Reader phase: results are discarded on purpose.
	wg.Add(numGoroutines)
	for g := 0; g < numGoroutines; g++ {
		go func() {
			defer wg.Done()
			_ = s.GetLatestWithLimit(10)
			if newest := s.GetLatestWithLimit(1); len(newest) > 0 {
				_ = s.GetSince(newest[0].Id)
			}
			_ = s.GetSince(0)
		}()
	}
	wg.Wait()
}
// TestStore_DefaultMaxRecords creates a store with an invalid maxRecords of 0
// and expects it to hold at most 1000 records: after 1001 additions Len must
// report exactly 1000.
func TestStore_DefaultMaxRecords(t *testing.T) {
	s := NewStore(0)

	// One more record than the expected default capacity.
	for n := 0; n < 1001; n++ {
		s.Add(map[string]any{"index": n + 1})
	}

	if got := s.Len(); got != 1000 {
		t.Errorf("Expected default 1000 records, got %d", got)
	}
}
// TestStore_NewAddEvent subscribes with NewAddEvent, adds one record, and
// expects that record to arrive on the subscription's channel within one
// second, carrying the original payload and a positive ID.
func TestStore_NewAddEvent(t *testing.T) {
	s := NewStore(10)

	sub := s.NewAddEvent()
	defer sub.Close()

	s.Add(map[string]any{"message": "test notification"})

	// Bound the wait so a missing notification fails the test instead of
	// hanging it.
	timeout := time.After(time.Second)
	select {
	case <-timeout:
		t.Error("Timeout waiting for notification")
	case got := <-sub.C:
		if msg := got.Payload.(map[string]any)["message"]; msg != "test notification" {
			t.Errorf("Expected notification with 'test notification', got %v", got.Payload)
		}
		if got.Id <= 0 {
			t.Errorf("Expected positive ID in notification, got %d", got.Id)
		}
	}
}
// TestStore_MultipleAddSubscribers registers three Add subscriptions, adds a
// single record, and expects each subscription to receive that record within
// one second.
func TestStore_MultipleAddSubscribers(t *testing.T) {
	const numSubscribers = 3

	s := NewStore(10)

	// The deferred Close calls run when the test function returns, so every
	// subscription stays open for the whole test.
	subs := make([]*AddEvent, 0, numSubscribers)
	for len(subs) < numSubscribers {
		ev := s.NewAddEvent()
		defer ev.Close()
		subs = append(subs, ev)
	}

	s.Add(map[string]any{"index": 1})

	// Each subscriber gets its own one-second window.
	for i, ev := range subs {
		select {
		case <-time.After(time.Second):
			t.Errorf("Subscriber %d did not receive notification", i)
		case got := <-ev.C:
			if idx := got.Payload.(map[string]any)["index"]; idx != 1 {
				t.Errorf("Subscriber %d: Expected index 1, got %v", i, got.Payload)
			}
		}
	}
}
// TestStore_NewClearEvent subscribes to both Add and Clear events, adds a
// record and waits for its Add notification, then clears the store and waits
// for the Clear notification. It finishes by checking that the store is empty.
func TestStore_NewClearEvent(t *testing.T) {
	s := NewStore(10)

	added := s.NewAddEvent()
	defer added.Close()
	cleared := s.NewClearEvent()
	defer cleared.Close()

	// Step 1: the Add must be announced with the payload that was stored.
	s.Add(map[string]any{"message": "test"})
	select {
	case <-time.After(time.Second):
		t.Error("Timeout waiting for Add event")
	case got := <-added.C:
		if msg := got.Payload.(map[string]any)["message"]; msg != "test" {
			t.Errorf("Expected 'test', got %v", got.Payload)
		}
	}

	// Step 2: the Clear must be announced; the received value is not used.
	s.Clear()
	select {
	case <-time.After(time.Second):
		t.Error("Timeout waiting for Clear event")
	case <-cleared.C:
	}

	// Step 3: the store itself must be empty.
	if n := s.Len(); n != 0 {
		t.Errorf("Expected store length 0 after clear, got %d", n)
	}
}
// TestStore_EventClose closes an Add subscription before any record is added
// and expects its channel to be closed: a receive must complete within 100ms
// and report ok == false. It also calls Close a second time, which must be
// safe.
func TestStore_EventClose(t *testing.T) {
	s := NewStore(10)

	sub := s.NewAddEvent()
	sub.Close()

	// This record is added after Close, so the subscription must not get it.
	s.Add(map[string]any{"message": "test"})

	select {
	case <-time.After(100 * time.Millisecond):
		// A closed channel never blocks a receive, so reaching the timeout
		// means the channel is still open and empty.
		t.Error("Expected channel to be closed immediately")
	case _, open := <-sub.C:
		if open {
			t.Error("Expected channel to be closed")
		}
	}

	// A second Close should be safe.
	sub.Close()
}