-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlazywritecache_lockfree_test.go
More file actions
1075 lines (917 loc) · 42.5 KB
/
lazywritecache_lockfree_test.go
File metadata and controls
1075 lines (917 loc) · 42.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// MIT License
//
// Copyright (c) 2023 Seth Osher
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package lazywritercache
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"strconv"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"
)
// testItemLF is a minimal implementation of CacheableLF used by the tests.
// It carries only its key; a real cache item would also hold payload and
// database-managed fields.
type testItemLF[K comparable] struct {
	id K // the cache key for this item
}
// Key returns the identifier for the testItemLF, satisfying the
// CacheableLF key accessor.
func (i testItemLF[K]) Key() K {
	return i.id
}
// CopyKeyDataFrom copies the key from another CacheableLF item.
// In a real scenario, this would copy database-managed fields.
// Note the value receiver: the original is not mutated; the updated
// copy is returned and must be used by the caller.
func (i testItemLF[K]) CopyKeyDataFrom(from CacheableLF[K]) CacheableLF[K] {
	i.id = from.Key()
	return i
}
// String renders the item's id using default formatting, satisfying fmt.Stringer.
func (i testItemLF[K]) String() string {
	return fmt.Sprint(i.id)
}
// newtestItemLF builds a testItemLF keyed by the supplied value.
func newtestItemLF[K comparable](key K) testItemLF[K] {
	return testItemLF[K]{id: key}
}
// newNoOpTestConfigLF returns a test configuration backed by a
// NoOpReaderWriterLF, with the background writer and purger disabled
// (zero WriteFreq/PurgeFreq) so tests can drive those paths explicitly.
func newNoOpTestConfigLF[K comparable]() ConfigLF[K, testItemLF[K]] {
	cfg := ConfigLF[K, testItemLF[K]]{
		Handler:      NewNoOpReaderWriterLF[K, testItemLF[K]](newtestItemLF),
		Limit:        1000,
		LookupOnMiss: false,
		WriteFreq:    0,
		PurgeFreq:    0,
	}
	return cfg
}
// TestCacheStoreLoadLF tests the basic Save and Load functionality.
// It verifies that items saved to the cache can be retrieved correctly,
// and that loading a missing item returns false.
func TestCacheStoreLoadLF(t *testing.T) {
ctx := context.Background()
item := testItemLF[string]{id: "test1"}
itemLF := testItemLF[string]{id: "testLF"}
cache := NewLazyWriterCacheLF[string, testItemLF[string]](newNoOpTestConfigLF[string]())
defer cache.Shutdown()
cache.Save(item)
cache.Save(itemLF)
item3, ok := cache.Load(ctx, "test1")
assert.Truef(t, ok, "loaded test")
assert.Equal(t, item, item3)
item4, ok := cache.Load(ctx, "testLF")
assert.Truef(t, ok, "loaded testLF")
assert.Equal(t, itemLF, item4)
_, ok = cache.Load(ctx, "missing")
assert.Falsef(t, ok, "not loaded missing")
}
// TestCacheDirtyListLF tests the management of the dirty items list.
// It checks that saving items marks them as dirty, and re-saving an
// already dirty item doesn't change the dirty count.
func TestCacheDirtyListLF(t *testing.T) {
	ctx := context.Background()
	item := testItemLF[string]{id: "test11"}
	itemLF := testItemLF[string]{id: "testLFLF"}
	cache := NewLazyWriterCacheLF[string, testItemLF[string]](newNoOpTestConfigLF[string]())
	defer cache.Shutdown()
	// Each Save should register its key in the dirty set exactly once.
	cache.Save(item)
	cache.Save(itemLF)
	assert.Equal(t, 2, cache.dirty.Size(), "dirty records")
	_, ok := cache.dirty.Load(item.Key())
	assert.True(t, ok, "item should be in dirty list")
	_, ok = cache.dirty.Load(itemLF.Key())
	assert.True(t, ok, "itemLF should be in dirty list")
	// Re-saving an already-dirty item must be idempotent for the dirty set.
	cache.Save(itemLF) // Save again
	assert.Equal(t, 2, cache.dirty.Size(), "dirty records count should not change on re-save")
	_, ok = cache.dirty.Load(itemLF.Key())
	assert.True(t, ok, "itemLF should still be in dirty list after re-save")
	// Use ctx to avoid unused variable warning
	_, _ = cache.Load(ctx, "nonexistent")
}
// TestCacheLockUnlockNoPanicsLF ensures that basic cache operations
// like Load and Save do not cause panics, even when items are missing.
func TestCacheLockUnlockNoPanicsLF(t *testing.T) {
	ctx := context.Background()
	cache := NewLazyWriterCacheLF(newNoOpTestConfigLF[string]())
	defer cache.Shutdown()

	// Loading a missing key must not panic.
	assert.NotPanics(t, func() {
		cache.Load(ctx, "missing")
	}, "get and Unlock")

	// A miss followed by a save must not panic either.
	assert.NotPanics(t, func() {
		cache.Load(ctx, "missing")
		cache.Save(testItemLF[string]{id: "test"})
	}, "get and Save")
}
// BenchmarkCacheWriteMax20kLF benchmarks cache write performance with a
// cache key space of 20,000 (keys cycle modulo the size; see cacheWriteLF).
func BenchmarkCacheWriteMax20kLF(b *testing.B) {
	cacheWriteLF(b, 20000)
}
// BenchmarkCacheWriteMax100kLF benchmarks cache write performance with a
// cache key space of 100,000 (keys cycle modulo the size; see cacheWriteLF).
func BenchmarkCacheWriteMax100kLF(b *testing.B) {
	cacheWriteLF(b, 100000)
}
// BenchmarkCacheRead20kLF benchmarks single-threaded cache read performance
// against a pre-populated cache of 20,000 items (see cacheReadLF).
func BenchmarkCacheRead20kLF(b *testing.B) {
	cacheReadLF(b, 20000)
}
// BenchmarkCacheRead100kLF benchmarks single-threaded cache read performance
// against a pre-populated cache of 100,000 items (see cacheReadLF).
func BenchmarkCacheRead100kLF(b *testing.B) {
	cacheReadLF(b, 100000)
}
// BenchmarkParallel_x5_CacheRead20kLF benchmarks parallel cache read
// performance: 20,000 cached items read by 5 concurrent goroutines.
func BenchmarkParallel_x5_CacheRead20kLF(b *testing.B) {
	parallelRunLF(b, 20000, 5)
}
// BenchmarkParallel_x10_CacheRead20kLF benchmarks parallel cache read
// performance: 20,000 cached items read by 10 concurrent goroutines.
func BenchmarkParallel_x10_CacheRead20kLF(b *testing.B) {
	parallelRunLF(b, 20000, 10)
}
// BenchmarkParallel_x20_CacheRead20kLF benchmarks parallel cache read
// performance: 20,000 cached items read by 20 concurrent goroutines.
func BenchmarkParallel_x20_CacheRead20kLF(b *testing.B) {
	parallelRunLF(b, 20000, 20)
}
// parallelRunLF is a helper function for benchmarking parallel cache reads.
// It populates the cache with cacheSize items, then runs nThreads goroutines,
// each performing b.N random-key Loads concurrently.
//
// Fixes over the previous version: the dead `if ok {}` block is removed
// (staticcheck SA9003), the timer is reset after setup so population cost
// is not charged to the benchmark, the keys slice is pre-sized, and the
// goroutine loop no longer shadows the outer loop variable.
func parallelRunLF(b *testing.B, cacheSize int, nThreads int) {
	ctx := context.Background()
	cache := NewLazyWriterCacheLF(newNoOpTestConfigLF[string]())
	defer cache.Shutdown()

	// Populate the cache with cacheSize distinct keys "0".."cacheSize-1".
	keys := make([]string, 0, cacheSize)
	for i := 0; i < cacheSize; i++ {
		id := strconv.Itoa(i)
		keys = append(keys, id)
		cache.Save(testItemLF[string]{id: id})
	}

	b.ReportAllocs()
	b.ResetTimer() // exclude cache population from the timed region

	var wait sync.WaitGroup
	for g := 0; g < nThreads; g++ {
		wait.Add(1)
		go func() {
			defer wait.Done()
			for n := 0; n < b.N; n++ {
				// Result intentionally discarded; we only measure Load cost.
				_, _ = cache.Load(ctx, keys[rand.Intn(cacheSize)])
			}
		}()
	}
	wait.Wait()
}
// cacheWriteLF is a helper function for benchmarking cache write operations.
// It repeatedly saves items to the cache, cycling through keys up to cacheSize.
func cacheWriteLF(b *testing.B, cacheSize int) {
	cache := NewLazyWriterCacheLF(newNoOpTestConfigLF[string]())
	defer cache.Shutdown()
	b.ReportAllocs()
	for n := 0; n < b.N; n++ {
		cache.Save(testItemLF[string]{id: strconv.Itoa(n % cacheSize)})
	}
}
// cacheReadLF is a helper function for benchmarking cache read operations.
// It first populates the cache with cacheSize distinct keys and then
// repeatedly loads random keys, counting the hits.
//
// Fix over the previous version: the timer is reset after population so
// setup cost is not charged to the benchmark, and the keys slice is
// pre-sized to avoid growth copies during setup.
func cacheReadLF(b *testing.B, cacheSize int) {
	ctx := context.Background()
	cache := NewLazyWriterCacheLF(newNoOpTestConfigLF[string]())
	defer cache.Shutdown()

	// Populate the cache with cacheSize distinct keys.
	keys := make([]string, 0, cacheSize)
	for i := 0; i < cacheSize; i++ {
		id := strconv.Itoa(i)
		keys = append(keys, id)
		cache.Save(testItemLF[string]{id: id})
	}

	b.ReportAllocs()
	b.ResetTimer() // exclude cache population from the timed region

	hits := 0
	for n := 0; n < b.N; n++ {
		if _, ok := cache.Load(ctx, keys[rand.Intn(cacheSize)]); ok {
			hits++
		}
	}
	// Every key was saved above, so at least one hit is expected.
	assert.Truef(b, hits > 0, "critical failure")
}
// TestCacheEvictionLF tests the cache eviction mechanism.
// It populates the cache beyond its limit, then triggers eviction
// and verifies that the cache size is reduced to the limit,
// evicting items in FIFO order (oldest items first, after they are no longer dirty).
func TestCacheEvictionLF(t *testing.T) {
	ctx := context.Background()
	cfg := newNoOpTestConfigLF[string]()
	cfg.Limit = 20
	cache := NewLazyWriterCacheLF(cfg)
	defer cache.Shutdown()
	// Insert 30 items keyed "0".."29" — 10 more than the limit of 20.
	for i := 0; i < 30; i++ {
		id := strconv.Itoa(i)
		item := testItemLF[string]{id: id}
		cache.Save(item)
	}
	assert.Equal(t, 30, cache.cache.Size())
	cache.evictionProcessor(ctx)
	assert.Equal(t, 30, cache.cache.Size(), "nothing evicted until flushed") // Dirty items are not evicted
	cache.Flush(ctx) // Mark items as not dirty
	cache.evictionProcessor(ctx) // Now eviction should occur
	assert.Equal(t, 20, cache.cache.Size())
	// Check which items were evicted and which remain, assuming FIFO after flush.
	// The eviction queue (fifo) holds keys "0" through "29" in insertion order.
	// evictionProcessor dequeues from the front; once all items are non-dirty
	// it evicts until the size reaches the limit, removing "0" through "9"
	// and leaving "10" through "29" in place.
	_, ok := cache.cache.Load("0")
	assert.Falsef(t, ok, "0 has been evicted")
	_, ok = cache.cache.Load("1")
	assert.Falsef(t, ok, "1 has been evicted")
	_, ok = cache.cache.Load("9")
	assert.Falsef(t, ok, "9 has been evicted")
	_, ok = cache.cache.Load("10")
	assert.True(t, ok, "10 has not been evicted")
	_, ok = cache.cache.Load("11")
	assert.Truef(t, ok, "11 has not been evicted")
	_, ok = cache.cache.Load("15")
	assert.Truef(t, ok, "15 has not been evicted")
	_, ok = cache.cache.Load("29")
	assert.Truef(t, ok, "29 should not have been evicted")
}
// TestCacheEvictionEmptyFIFOLF tests the scenario where the FIFO queue is empty
// but the cache still has items. This represents an inconsistent internal state
// (items stored without going through Save), and eviction must cope without
// panicking or looping forever.
func TestCacheEvictionEmptyFIFOLF(t *testing.T) {
	ctx := context.Background()
	cfg := newNoOpTestConfigLF[string]()
	cfg.Limit = 5 // Set a small limit
	cache := NewLazyWriterCacheLF(cfg)
	defer cache.Shutdown()
	// Add items to the cache directly without using the FIFO queue
	for i := 0; i < 10; i++ {
		id := strconv.Itoa(i)
		item := testItemLF[string]{id: id}
		// Use direct access to cache map to bypass the normal Save method
		// which would add items to the FIFO queue
		cache.cache.Store(id, item)
	}
	// Verify the cache has items but FIFO is empty
	assert.Equal(t, 10, cache.cache.Size(), "Cache should have 10 items")
	_, fifoOk := cache.fifo.Peek()
	assert.False(t, fifoOk, "FIFO queue should be empty")
	// Run the eviction processor
	cache.evictionProcessor(ctx)
	// Verify the cache size remains the same since eviction can't proceed with empty FIFO
	assert.Equal(t, 10, cache.cache.Size(), "Cache size should remain unchanged when FIFO is empty")
}
// TestCacheEvictionDirtyItemNotInCacheLF tests the scenario where an item is marked as dirty
// but is not in the cache because it has been deleted while dirty. The eviction
// processor should clean the orphaned dirty entry rather than leave it behind.
func TestCacheEvictionDirtyItemNotInCacheLF(t *testing.T) {
	ctx := context.Background()
	cfg := newNoOpTestConfigLF[string]()
	cfg.Limit = 5 // Set a small limit
	cache := NewLazyWriterCacheLF(cfg)
	defer cache.Shutdown()
	// Add an item to the cache and mark it as dirty
	testKey := "test-key"
	item := testItemLF[string]{id: testKey}
	cache.Save(item)
	// Verify the item is in the cache and marked as dirty
	_, inCache := cache.cache.Load(testKey)
	assert.True(t, inCache, "Item should be in the cache")
	isDirty, inDirty := cache.dirty.Load(testKey)
	assert.True(t, inDirty, "Item should be in the dirty list")
	assert.True(t, isDirty, "Item should be marked as dirty")
	// Remove the item from the cache but leave it in the dirty list,
	// manufacturing the inconsistent state under test.
	cache.cache.Delete(testKey)
	// Verify the item is not in the cache but still in the dirty list
	_, inCache = cache.cache.Load(testKey)
	assert.False(t, inCache, "Item should not be in the cache")
	isDirty, inDirty = cache.dirty.Load(testKey)
	assert.True(t, inDirty, "Item should still be in the dirty list")
	assert.True(t, isDirty, "Item should still be marked as dirty")
	// Make sure the cache size is above the limit to trigger eviction
	for i := 0; i < 10; i++ {
		id := strconv.Itoa(i)
		item := testItemLF[string]{id: id}
		cache.cache.Store(id, item)
	}
	// Ensure our test key is at the head of the FIFO queue
	// First, empty the queue
	for {
		_, ok := cache.fifo.Dequeue()
		if !ok {
			break
		}
	}
	// Then add our test key as the only item
	cache.fifo.Enqueue(testKey)
	// Run the eviction processor
	cache.evictionProcessor(ctx)
	// Verify the item has been removed from the dirty list
	_, inDirty = cache.dirty.Load(testKey)
	assert.False(t, inDirty, "Item should have been removed from the dirty list")
}
// TestCacheEvictionRaceConditionLF tests the race condition between two eviction processors
// where one evicts the head between peek and removing the item. The race is
// simulated deterministically by manually dequeuing after a peek, not by
// running two goroutines.
func TestCacheEvictionRaceConditionLF(t *testing.T) {
	ctx := context.Background()
	cfg := newNoOpTestConfigLF[string]()
	cfg.Limit = 5 // Set a small limit
	cache := NewLazyWriterCacheLF(cfg)
	defer cache.Shutdown()
	// Add items to the cache
	for i := 0; i < 10; i++ {
		id := strconv.Itoa(i)
		item := testItemLF[string]{id: id}
		cache.Save(item)
	}
	// Flush to make items non-dirty (dirty items are never evicted)
	cache.Flush(ctx)
	// Simulate a race condition by manually manipulating the FIFO queue
	// First, peek at the head of the queue (this would be done by one eviction processor)
	keyToEvict, ok := cache.fifo.Peek()
	assert.True(t, ok, "Should be able to peek at the head of the queue")
	// Now simulate another eviction processor dequeuing the head
	evictedKey, ok := cache.fifo.Dequeue()
	assert.True(t, ok, "Should be able to dequeue the head")
	assert.Equal(t, keyToEvict, evictedKey, "The dequeued key should match the peeked key")
	// Now run the eviction processor, which will try to dequeue the same key
	// but will find a different key at the head
	cache.evictionProcessor(ctx)
	// Verify the cache size has been reduced
	assert.LessOrEqual(t, cache.cache.Size(), cfg.Limit, "Cache size should be at or below the limit after eviction")
}
// Test_GetAndReleaseLF is a simple test for Save and Load, similar to TestCacheStoreLoadLF.
// It ensures items can be saved and then retrieved.
func Test_GetAndReleaseLF(t *testing.T) {
ctx := context.Background()
item := testItemLF[string]{id: "test1"}
itemLF := testItemLF[string]{id: "testLF"}
cache := NewLazyWriterCacheLF(newNoOpTestConfigLF[string]())
defer cache.Shutdown()
cache.Save(item)
cache.Save(itemLF)
item3, ok := cache.Load(ctx, "test1")
assert.Truef(t, ok, "loaded test")
assert.Equal(t, item, item3)
}
// Test_GetAndReleaseWithForcedPanicLF tests the behavior when LookupOnMiss is true
// and the underlying data Handler (NoOpReaderWriterLF) is configured to panic.
// It verifies that a call to Load results in a panic.
func Test_GetAndReleaseWithForcedPanicLF(t *testing.T) {
	ctx := context.Background()
	item := testItemLF[string]{id: "test1"}
	itemLF := testItemLF[string]{id: "testLF"}
	cfg := newNoOpTestConfigLF[string]()
	cache := NewLazyWriterCacheLF(cfg)
	defer cache.Shutdown()
	cache.LookupOnMiss = true // Enable lookup on miss for this test
	cache.Save(item)
	cache.Save(itemLF)
	item3, ok := cache.Load(ctx, "test1")
	assert.Truef(t, ok, "loaded test")
	assert.Equal(t, item, item3)
	// Configure the mock Handler to panic on the next Find operation.
	// NOTE(review): the type assertion to the value type copies the handler
	// struct; this only works if panicOnNext is shared state (e.g. a pointer
	// to an atomic) — confirm against NoOpReaderWriterLF's definition.
	cfg.Handler.(NoOpReaderWriterLF[string, testItemLF[string]]).panicOnNext.Store(true)
	assert.Panics(t, func() {
		_, ok := cache.Load(ctx, "test4_non_existent_to_trigger_find") // Key doesn't exist, so Find will be called
		assert.Falsef(t, ok, "should not be found or panic should prevent reaching this")
	})
	assert.Equal(t, int64(1), cache.Misses.Load(), "1 miss expected before panic")
}
// TestCacheStats_JSONLF tests the JSON serialization of cache statistics.
// It round-trips the stats through JSON and checks that the 'hits'
// counter is present and starts at zero.
func TestCacheStats_JSONLF(t *testing.T) {
	cache := NewLazyWriterCacheLF(newNoOpTestConfigLF[string]())
	defer cache.Shutdown()

	var stats map[string]int64
	err := json.Unmarshal([]byte(cache.JSON()), &stats)
	assert.Nil(t, err, "json parses")

	hits, ok := stats["hits"]
	assert.Truef(t, ok, "found in map")
	assert.Equal(t, int64(0), hits)
}
// TestRangeLF tests the Range method for iterating over all items in the cache.
// It verifies that the provided action function is called for each item.
func TestRangeLF(t *testing.T) {
item := testItemLF[string]{id: "test1"}
itemLF := testItemLF[string]{id: "testLF"}
item3 := testItemLF[string]{id: "test3"}
cache := NewLazyWriterCacheLF[string, testItemLF[string]](newNoOpTestConfigLF[string]())
defer cache.Shutdown()
cache.Save(item)
cache.Save(itemLF)
cache.Save(item3)
n := 0
cache.Range(func(k string, v testItemLF[string]) bool {
n++
return true // Continue iteration
})
assert.Equal(t, 3, n, "iterated over all cache items")
}
// TestRangeAbortLF tests the ability to abort iteration in the Range method.
// It verifies that if the action function returns false, the iteration stops.
func TestRangeAbortLF(t *testing.T) {
item := testItemLF[string]{id: "test1"}
itemLF := testItemLF[string]{id: "testLF"}
item3 := testItemLF[string]{id: "test3"}
cache := NewLazyWriterCacheLF[string, testItemLF[string]](newNoOpTestConfigLF[string]())
defer cache.Shutdown()
cache.Save(item)
cache.Save(itemLF)
cache.Save(item3)
n := 0
cache.Range(func(k string, v testItemLF[string]) bool {
n++
if n == 2 {
return false // Stop iteration
}
return true
})
assert.Equal(t, 2, n, "iterated over all cache items")
}
// TestNoGoroutineLeaksLF uses goleak to verify that no goroutines are leaked
// after creating a cache with a context, saving an item, and then shutting it down.
// NOTE(review): the fixed sleeps make this test timing-sensitive on slow CI
// machines; goleak's VerifyNone retries internally, which mitigates flakiness.
func TestNoGoroutineLeaksLF(t *testing.T) {
	defer goleak.VerifyNone(t) // Verifies no goroutines leaked at the end of the test
	ctx, cancel := context.WithCancel(context.Background())
	cache := NewLazyWriterCacheWithContextLF[string, testItemLF[string]](ctx, newNoOpTestConfigLF[string]())
	cache.Save(testItemLF[string]{id: "test"})
	time.Sleep(100 * time.Millisecond) // Allow time for any goroutines to start
	cancel() // Signal goroutines to stop
	cache.Shutdown() // Explicitly shutdown
	time.Sleep(100 * time.Millisecond) // Allow time for goroutines to exit
}
// TestNewDefaultConfigLF tests the NewDefaultConfigLF constructor.
// It verifies that the returned configuration has the expected default values
// for Limit, LookupOnMiss, WriteFreq, PurgeFreq, and FlushOnShutdown.
// This test pins the library's documented defaults so an accidental change
// to them fails loudly.
func TestNewDefaultConfigLF(t *testing.T) {
	handler := NewNoOpReaderWriterLF[string, testItemLF[string]](newtestItemLF)
	config := NewDefaultConfigLF[string, testItemLF[string]](handler)
	// Verify default values
	assert.NotNil(t, config.Handler, "Handler should not be nil")
	assert.Equal(t, 10000, config.Limit, "Default limit should be 10000")
	assert.True(t, config.LookupOnMiss, "LookupOnMiss should be true by default")
	assert.Equal(t, 500*time.Millisecond, config.WriteFreq, "Default WriteFreq should be 500ms")
	assert.Equal(t, 10*time.Second, config.PurgeFreq, "Default PurgeFreq should be 10s")
	assert.False(t, config.FlushOnShutdown, "FlushOnShutdown should be false by default")
}
// TestEmptyCacheableLF tests the EmptyCacheableLF placeholder type.
// It verifies that its Key method returns an empty string and
// CopyKeyDataFrom returns the input item unchanged.
func TestEmptyCacheableLF(t *testing.T) {
	var empty EmptyCacheableLF

	// The placeholder's key is always the zero string.
	assert.Equal(t, "", empty.Key(), "EmptyCacheableLF.Key should return empty string")

	// CopyKeyDataFrom is a pass-through: the input comes back untouched.
	in := testItemLF[string]{id: "test"}
	out := empty.CopyKeyDataFrom(in)
	assert.Equal(t, in, out, "EmptyCacheableLF.CopyKeyDataFrom should return the input item")
}
// TestClearDirtyLF tests the ClearDirty method.
// It verifies that after saving items (making them dirty),
// calling ClearDirty empties the dirty list.
func TestClearDirtyLF(t *testing.T) {
item := testItemLF[string]{id: "test1"}
item2 := testItemLF[string]{id: "test2"}
cache := NewLazyWriterCacheLF[string, testItemLF[string]](newNoOpTestConfigLF[string]())
defer cache.Shutdown()
// Add items to the cache and make them dirty
cache.Save(item)
cache.Save(item2)
assert.Equal(t, 2, cache.dirty.Size(), "Should have 2 dirty items")
// Test ClearDirty
cache.ClearDirty()
assert.Equal(t, 0, cache.dirty.Size(), "Dirty list should be empty after ClearDirty")
}
// TestPeriodicSaveLF tests the periodic lazy writer functionality.
// It configures a short WriteFreq, saves items, and verifies that
// the items are eventually written (dirty list becomes empty) and
// the DirtyWrites counter is incremented.
func TestPeriodicSaveLF(t *testing.T) {
	// Create a cache with a short write frequency so the background
	// writer fires within the test's Eventually window.
	cfg := newNoOpTestConfigLF[string]()
	cfg.WriteFreq = 50 * time.Millisecond
	cache := NewLazyWriterCacheLF[string, testItemLF[string]](cfg)
	defer cache.Shutdown()
	// Add items to the cache
	cache.Save(testItemLF[string]{id: "test1"})
	cache.Save(testItemLF[string]{id: "test2"})
	// Verify items are marked as dirty
	assert.Equal(t, 2, cache.dirty.Size(), "Should have 2 dirty items")
	// Wait for the lazy writer to process the dirty items; polling avoids
	// a fixed sleep and tolerates scheduler jitter.
	assert.Eventuallyf(t, func() bool {
		return !cache.IsDirty()
	}, 200*time.Millisecond, 10*time.Millisecond, "Cache should not be dirty after save")
	// Verify dirty items were processed
	assert.Equal(t, 0, cache.dirty.Size(), "Dirty list should be empty after lazy writer runs")
	assert.GreaterOrEqual(t, cache.DirtyWrites.Load(), int64(1), "DirtyWrites counter should be incremented")
}
// TestPeriodicEvictionsLF tests the periodic eviction manager.
// It sets a small cache Limit and short PurgeFreq, adds more items
// than the limit, and verifies that the cache size is eventually
// reduced to the limit and the Evictions counter is incremented.
func TestPeriodicEvictionsLF(t *testing.T) {
	// Create a cache with a small limit and short purge frequency
	cfg := newNoOpTestConfigLF[string]()
	cfg.Limit = 5
	cfg.PurgeFreq = 50 * time.Millisecond
	cfg.WriteFreq = 10 * time.Millisecond // Need to flush dirty items for eviction to work
	cache := NewLazyWriterCacheLF[string, testItemLF[string]](cfg)
	defer cache.Shutdown()
	// Add more items than the limit
	for i := 0; i < 10; i++ {
		cache.Save(testItemLF[string]{id: strconv.Itoa(i)})
	}
	// Verify all items are in the cache initially
	assert.Equal(t, 10, cache.cache.Size(), "Should have 10 items in cache initially")
	// Wait for eviction manager to run multiple times and for items to become non-dirty.
	// Both conditions must hold: dirty items are never evicted, so the writer
	// must flush first and the purger must then shrink the cache.
	assert.Eventuallyf(t, func() bool {
		// Items must be non-dirty to be evicted. The periodic writer handles this.
		return cache.cache.Size() <= cfg.Limit && cache.dirty.Size() == 0
	}, 500*time.Millisecond, 10*time.Millisecond, "Cache size should be at or below the limit after eviction and items non-dirty")
	// Verify cache size is now at or below the limit
	assert.LessOrEqual(t, cache.cache.Size(), cfg.Limit, "Cache size should be at or below the limit after eviction")
	assert.Greater(t, cache.Evictions.Load(), int64(0), "Evictions counter should be incremented")
}
// TestFlushOnShutdownLF tests the FlushOnShutdown functionality.
// It enables FlushOnShutdown, saves items (making them dirty),
// then cancels the cache's context (simulating shutdown) and verifies
// that the dirty items are flushed.
func TestFlushOnShutdownLF(t *testing.T) {
	// Create a cache with FlushOnShutdown enabled. WriteFreq is set very
	// long so any flush observed can only have come from the shutdown path.
	cfg := newNoOpTestConfigLF[string]()
	cfg.WriteFreq = 1 * time.Hour // Long enough that it won't trigger during the test
	cfg.FlushOnShutdown = true
	ctx, cancel := context.WithCancel(context.Background())
	cache := NewLazyWriterCacheWithContextLF[string, testItemLF[string]](ctx, cfg)
	// Add items to the cache
	cache.Save(testItemLF[string]{id: "test1"})
	cache.Save(testItemLF[string]{id: "test2"})
	// Verify items are marked as dirty
	assert.Equal(t, 2, cache.dirty.Size(), "Should have 2 dirty items")
	// Cancel the context to trigger shutdown
	cancel()
	// Wait a bit for the shutdown to complete and flush to occur
	assert.Eventuallyf(t, func() bool {
		return cache.dirty.Size() == 0
	}, 200*time.Millisecond, 10*time.Millisecond, "Cache should be empty after shutdown with FlushOnShutdown=true")
	// Verify dirty items were processed during shutdown
	assert.Equal(t, 0, cache.dirty.Size(), "Dirty list should be empty after shutdown with FlushOnShutdown=true")
	assert.GreaterOrEqual(t, cache.DirtyWrites.Load(), int64(1), "DirtyWrites counter should be incremented")
}
// TestRequeueRecoverableErrLF tests that items are re-queued (remain dirty)
// when a recoverable error (e.g., "save deadlock") occurs during a Save operation in Flush.
// NOTE(review): the exact info/warn counts are coupled to the log calls
// inside Flush and NoOpReaderWriterLF — update them together if the
// logging there changes.
func TestRequeueRecoverableErrLF(t *testing.T) {
	ctx := context.Background()
	item := testItemLF[string]{id: "test1"}
	itemLF := testItemLF[string]{id: "testLF"}
	cfg := newNoOpTestConfigLF[string]()
	testHandler := cfg.Handler.(NoOpReaderWriterLF[string, testItemLF[string]])
	cache := NewLazyWriterCacheLF(cfg)
	defer cache.Shutdown()
	cache.Save(item)
	cache.Save(itemLF)
	assert.Equal(t, 2, cache.dirty.Size(), "2 items should be in the cache")
	testHandler.errorOnNext.Store("save deadlock") // Simulate recoverable error
	cache.Flush(ctx)
	assert.Equal(t, int64(0), testHandler.warnCount.Load(), "No warnings for recoverable save error (it's an Info)")
	assert.Equal(t, int64(3), testHandler.infoCount.Load(), "Info for 'Found N dirty', 'Recoverable error saving...', 'Transaction rolled back...'")
	assert.Equal(t, 2, cache.dirty.Size(), "2 items should still be dirty and in the cache for retry")
}
// TestRequeueSkipsNonRecoverableErrLF tests that items are NOT re-queued (one is removed from dirty)
// when a non-recoverable error (e.g., "save duplicate key") occurs during a Save operation in Flush.
// The transaction is rolled back, but the failing item is not marked for retry.
// NOTE(review): the log-count assertions are coupled to Flush's logging; see
// TestRequeueRecoverableErrLF.
func TestRequeueSkipsNonRecoverableErrLF(t *testing.T) {
	ctx := context.Background()
	item := testItemLF[string]{id: "test1"}
	itemLF := testItemLF[string]{id: "testLF"}
	cfg := newNoOpTestConfigLF[string]()
	testHandler := cfg.Handler.(NoOpReaderWriterLF[string, testItemLF[string]])
	cache := NewLazyWriterCacheLF(cfg)
	defer cache.Shutdown()
	cache.Save(item)
	cache.Save(itemLF)
	assert.Equal(t, 2, cache.dirty.Size(), "2 items should be in the cache")
	testHandler.errorOnNext.Store("save duplicate key") // Simulate non-recoverable error
	cache.Flush(ctx)
	assert.Equal(t, int64(1), testHandler.warnCount.Load(), "Warning for unrecoverable save error")
	assert.Equal(t, int64(2), testHandler.infoCount.Load(), "Info for 'Found N dirty' and 'Transaction rolled back...'")
	assert.Equal(t, 1, cache.dirty.Size(), "1 item should remain dirty (the one that didn't encounter the error)")
}
// TestRequeueCommitRecoverableErrLF tests that items are re-queued (remain dirty)
// when a recoverable error (e.g., "commit deadlock") occurs during the CommitTx operation in Flush.
// Unlike a recoverable save error, a commit failure affects the whole batch,
// so both items stay dirty for the next flush cycle.
func TestRequeueCommitRecoverableErrLF(t *testing.T) {
	ctx := context.Background()
	item := testItemLF[string]{id: "test1"}
	itemLF := testItemLF[string]{id: "testLF"}
	cfg := newNoOpTestConfigLF[string]()
	testHandler := cfg.Handler.(NoOpReaderWriterLF[string, testItemLF[string]])
	cache := NewLazyWriterCacheLF(cfg)
	defer cache.Shutdown()
	cache.Save(item)
	cache.Save(itemLF)
	assert.Equal(t, 2, cache.dirty.Size(), "2 items should be in the cache")
	testHandler.errorOnNext.Store("commit deadlock") // Simulate recoverable commit error
	cache.Flush(ctx)
	assert.Equal(t, int64(2), testHandler.warnCount.Load(), "Warnings for 'Recoverable error from CommitTx' and 'Error committing transaction'")
	assert.Equal(t, 2, cache.dirty.Size(), "2 items should still be dirty for retry")
}
// TestRequeueCommitSkipsNonRecoverableErrLF tests that items are NOT re-queued (dirty list becomes empty)
// when a non-recoverable error (e.g., "commit duplicate key") occurs during CommitTx in Flush.
// The items are considered lost from the dirty perspective.
func TestRequeueCommitSkipsNonRecoverableErrLF(t *testing.T) {
ctx := context.Background()
item := testItemLF[string]{id: "test1"}
itemLF := testItemLF[string]{id: "testLF"}
cfg := newNoOpTestConfigLF[string]()
testHandler := cfg.Handler.(NoOpReaderWriterLF[string, testItemLF[string]])
cache := NewLazyWriterCacheLF(cfg)
defer cache.Shutdown()
cache.Save(item)
cache.Save(itemLF)
assert.Equal(t, 2, cache.dirty.Size(), "2 items should be in the cache")
testHandler.errorOnNext.Store("commit duplicate key") // Simulate non-recoverable commit error
cache.Flush(ctx)
assert.Equal(t, int64(2), testHandler.warnCount.Load(), "Warnings for 'Unrecoverable error from CommitTx' and 'Error committing transaction'")
assert.Equal(t, 0, cache.dirty.Size(), "0 items should be dirty as commit failed unrecoverably")
}
// TestRequeueBeginRecoverableErrLF tests that items remain dirty and the batch is retried
// when a recoverable error occurs during BeginTx.
func TestRequeueBeginRecoverableErrLF(t *testing.T) {
ctx := context.Background()
item := testItemLF[string]{id: "test1"}
itemLF := testItemLF[string]{id: "testLF"}
cfg := newNoOpTestConfigLF[string]()
testHandler := cfg.Handler.(NoOpReaderWriterLF[string, testItemLF[string]])
cache := NewLazyWriterCacheLF(cfg)
defer cache.Shutdown()
cache.Save(item)
cache.Save(itemLF)
assert.Equal(t, 2, cache.dirty.Size(), "2 items should be in the cache")
testHandler.errorOnNext.Store("begin db bad connection") // Simulate recoverable BeginTx error
cache.Flush(ctx)
assert.Equal(t, int64(2), testHandler.infoCount.Load(), "Info for 'Found N dirty' and 'Recoverable error with BeginTx'")
assert.Equal(t, int64(0), testHandler.warnCount.Load(), "No warnings as BeginTx error is Info")
assert.Equal(t, 2, cache.dirty.Size(), "2 items should still be dirty for retry")
}
// TestRequeueRollbackRecoverableErrLF tests that items are re-queued (remain dirty)
// when a recoverable error occurs during Save, followed by another recoverable error during RollbackTx.
func TestRequeueRollbackRecoverableErrLF(t *testing.T) {
ctx := context.Background()
item := testItemLF[string]{id: "test1"}
itemLF := testItemLF[string]{id: "testLF"}
cfg := newNoOpTestConfigLF[string]()
testHandler := cfg.Handler.(NoOpReaderWriterLF[string, testItemLF[string]])
cache := NewLazyWriterCacheLF(cfg)
defer cache.Shutdown()
cache.Save(item)
cache.Save(itemLF)
assert.Equal(t, 2, cache.dirty.Size(), "2 items should be in the cache")
testHandler.errorOnNext.Store("save deadlock,rollback deadlock") // Recoverable save, then recoverable rollback
cache.Flush(ctx)
assert.Equal(t, "", testHandler.errorOnNext.Load(), "both errors handled")
assert.Equal(t, 2, cache.dirty.Size(), "2 items should still be dirty for retry")
assert.Equal(t, int64(0), testHandler.warnCount.Load(), "No warnings as all errors were Info-level recoverable")
assert.Equal(t, int64(4), testHandler.infoCount.Load(), "Info for 'Found N dirty', 'Recoverable save', 'Error rolling back (recoverable)', 'Transaction rolled back'")
}
// TestRequeueRollbackUnrecoverableErrLF tests behavior when a recoverable Save error
// is followed by an unrecoverable RollbackTx error.
// Items are not re-queued because the unrecoverable rollback implies the batch is aborted.
func TestRequeueRollbackUnrecoverableErrLF(t *testing.T) {
ctx := context.Background()
item := testItemLF[string]{id: "test1"}
itemLF := testItemLF[string]{id: "testLF"}
cfg := newNoOpTestConfigLF[string]()
testHandler := cfg.Handler.(NoOpReaderWriterLF[string, testItemLF[string]])
cache := NewLazyWriterCacheLF(cfg)
defer cache.Shutdown()
cache.Save(item)
cache.Save(itemLF)
assert.Equal(t, 2, cache.dirty.Size(), "2 items should be in the cache")
testHandler.errorOnNext.Store("save deadlock,rollback failed") // Recoverable save, then unrecoverable rollback
cache.Flush(ctx)
assert.Equal(t, "", testHandler.errorOnNext.Load(), "both errors handled")
// The logic `fail += len(unCommitted)` and `return` means items are not re-added to dirty.
assert.Equal(t, 1, cache.dirty.Size(), "1 items should be dirty after unrecoverable rollback aborts batch")
assert.Equal(t, int64(1), testHandler.warnCount.Load(), "Warning for unrecoverable rollback")
assert.Equal(t, int64(2), testHandler.infoCount.Load(), "Info for 'Found N dirty' and 'Recoverable save'")
}
// TestPanicHandlerLF tests the panic recovery mechanism within saveDirtyToDB.
// It simulates a panic after a recoverable save error and verifies that
// a warning is logged and the cache doesn't crash.
// One item (the one causing the panic during its processing) is lost from dirty.
func TestPanicHandlerLF(t *testing.T) {
ctx := context.Background()
item := testItemLF[string]{id: "test1"}
itemLF := testItemLF[string]{id: "testLF"}
cfg := newNoOpTestConfigLF[string]()
testHandler := cfg.Handler.(NoOpReaderWriterLF[string, testItemLF[string]])
cache := NewLazyWriterCacheLF(cfg)
defer cache.Shutdown()
cache.Save(item)
cache.Save(itemLF)
assert.Equal(t, 2, cache.dirty.Size(), "2 items should be in the cache")
testHandler.errorOnNext.Store("save deadlock,panic") // Recoverable save, then panic during next item or rollback
cache.Flush(ctx)
assert.Equal(t, "", testHandler.errorOnNext.Load(), "both errors handled (or panic consumed the second part)")
// The panic occurs within the c.dirty.Range. The item causing the panic (or the one being processed)
// might have already been deleted from dirty optimistically.
// The transaction will be rolled back due to the panic (if it happens before commit/rollback call)
// or the panic Handler itself. Items in unCommitted might not be re-added.
// The exact dirty count depends on when the panic happens relative to dirty.Delete and unCommitted append.
// Given the NoOp mock, the panic is likely in RollbackTx.
// If panic in RollbackTx: unCommitted items are not re-added.
assert.Equal(t, 1, cache.dirty.Size(), "1 items should be dirty after panic during rollback")
assert.Equal(t, int64(1), testHandler.warnCount.Load(), "Warning for panic in lazy write")
}
// TestSaveDirtyToDB_AllowConcurrentWrites runs two Flush calls in parallel
// with AllowConcurrentWrites enabled and checks that they neither deadlock
// nor leave anything dirty behind.
func TestSaveDirtyToDB_AllowConcurrentWrites(t *testing.T) {
	ctx := context.Background()
	cfg := newNoOpTestConfigLF[string]()
	cfg.AllowConcurrentWrites = true // Explicitly enable
	cfg.WriteFreq = 0                // Manual flushing only — no background writer
	handler := cfg.Handler.(NoOpReaderWriterLF[string, testItemLF[string]])
	cache := NewLazyWriterCacheLF[string, testItemLF[string]](cfg)
	defer cache.Shutdown()

	cache.Save(newtestItemLF("item1"))
	cache.Save(newtestItemLF("item2"))

	// Launch two simultaneous flushes and wait for both to finish.
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			cache.Flush(ctx)
		}()
	}
	wg.Wait()

	// The NoOp mock cannot distinguish which flush wrote which item, so the
	// checks here are intentionally loose: no deadlock occurred, the dirty
	// set drained, and at least one flush actually did work.
	assert.Equal(t, 0, cache.dirty.Size(), "Dirty list should be empty")
	assert.GreaterOrEqual(t, handler.infoCount.Load(), int64(1), "At least one flush attempt should log info")
}
// TestSaveDirtyToDB_DisallowConcurrentWrites verifies that with
// AllowConcurrentWrites disabled, a Flush issued while another write is in
// progress (simulated via the cache.writing flag) returns immediately without
// touching the dirty set, and that flushing works again once the flag clears.
func TestSaveDirtyToDB_DisallowConcurrentWrites(t *testing.T) {
	ctx := context.Background()
	cfg := newNoOpTestConfigLF[string]()
	cfg.AllowConcurrentWrites = false // Explicitly disable
	cfg.WriteFreq = 0                 // Manual flushing only — no background writer
	handler := cfg.Handler.(NoOpReaderWriterLF[string, testItemLF[string]])
	cache := NewLazyWriterCacheLF[string, testItemLF[string]](cfg)
	defer cache.Shutdown()

	cache.Save(newtestItemLF("item1"))

	// Pretend a write is already underway; this Flush must bail out early.
	cache.writing.Store(true)
	cache.Flush(ctx)
	assert.Equal(t, 1, cache.dirty.Size(), "Item should remain dirty as concurrent write was skipped")
	assert.Equal(t, int64(0), handler.infoCount.Load(), "No info should be logged by the skipped flush") // skipped flush never reaches "Found N dirty"

	// Clear the in-progress flag; the next Flush proceeds normally.
	cache.writing.Store(false)
	cache.Flush(ctx)
	assert.Equal(t, 0, cache.dirty.Size(), "Dirty list should be empty after successful flush")
	assert.Equal(t, int64(2), handler.infoCount.Load(), "Info for 'Found N dirty' and 'Completed DB Sync'")
}
// TestSaveDirtyToDB_BeginTx_UnrecoverableError verifies that an unrecoverable
// BeginTx failure aborts the batch with a warning while the item stays dirty.
func TestSaveDirtyToDB_BeginTx_UnrecoverableError(t *testing.T) {
	ctx := context.Background()
	cfg := newNoOpTestConfigLF[string]()
	cfg.WriteFreq = 0 // Manual flushing only — no background writer
	handler := cfg.Handler.(NoOpReaderWriterLF[string, testItemLF[string]])
	cache := NewLazyWriterCacheLF[string, testItemLF[string]](cfg)
	defer cache.Shutdown()

	cache.Save(newtestItemLF("item1"))
	assert.Equal(t, 1, cache.dirty.Size(), "Item should be dirty")

	// Inject a BeginTx error whose text does not match any of the mock's
	// IsRecoverable patterns, so it is treated as unrecoverable. (A richer
	// mock could drive IsRecoverable directly; here we assert via logging.)
	handler.errorOnNext.Store("begin unrecoverable_db_error")
	cache.Flush(ctx)

	assert.Equal(t, int64(1), handler.warnCount.Load(), "Warn should be logged for unrecoverable BeginTx error")
	assert.Equal(t, int64(1), handler.infoCount.Load(), "Info for 'Found N dirty records' still logs")
	assert.Equal(t, 1, cache.dirty.Size(), "Item should remain dirty as BeginTx failed unrecoverably and batch aborted")
}
// TestSaveDirtyToDB_RollbackTx_UnrecoverableError tests the scenario where a save operation
// causes a recoverable error, leading to a transaction rollback, but the RollbackTx itself
// fails with an unrecoverable error. In this case, the batch is aborted, and items are
// not re-added to the dirty list.
func TestSaveDirtyToDB_RollbackTx_UnrecoverableError(t *testing.T) {
ctx := context.Background()
cfg := newNoOpTestConfigLF[string]()
cfg.WriteFreq = 0
mockHandler := cfg.Handler.(NoOpReaderWriterLF[string, testItemLF[string]])
cache := NewLazyWriterCacheLF[string, testItemLF[string]](cfg)
defer cache.Shutdown()
cache.Save(newtestItemLF("itemToFailSave"))
cache.Save(newtestItemLF("itemToSucceedSaveButFailRollback")) // This item won't be processed due to first item's save failure