11using System ;
22using System . Collections . Generic ;
3+ using System . Reflection ;
34using System . Threading . Tasks ;
5+ using BitFaster . Caching . Buffers ;
46using BitFaster . Caching . Lfu ;
57using BitFaster . Caching . Scheduler ;
68using FluentAssertions ;
@@ -12,10 +14,101 @@ namespace BitFaster.Caching.UnitTests.Lfu
1214 [ Collection ( "Soak" ) ]
1315 public class ConcurrentLfuSoakTests
1416 {
17+ private const int iterations = 10 ;
1518 private readonly ITestOutputHelper output ;
1619 public ConcurrentLfuSoakTests ( ITestOutputHelper testOutputHelper )
1720 {
1821 this . output = testOutputHelper ;
22+ }
23+
24+ [ Theory ]
25+ [ Repeat ( iterations ) ]
26+ public async Task WhenConcurrentGetCacheEndsInConsistentState ( int iteration )
27+ {
28+ var scheduler = new BackgroundThreadScheduler ( ) ;
29+ var lfu = new ConcurrentLfuBuilder < int , string > ( ) . WithCapacity ( 9 ) . WithScheduler ( scheduler ) . Build ( ) as ConcurrentLfu < int , string > ;
30+
31+ await Threaded . Run ( 4 , ( ) => {
32+ for ( int i = 0 ; i < 100000 ; i ++ )
33+ {
34+ lfu . GetOrAdd ( i + 1 , i => i . ToString ( ) ) ;
35+ }
36+ } ) ;
37+
38+ this . output . WriteLine ( $ "iteration { iteration } keys={ string . Join ( " " , lfu . Keys ) } ") ;
39+
40+ scheduler . Dispose ( ) ;
41+ await scheduler . Completion ;
42+
43+ RunIntegrityCheck ( lfu ) ;
44+ }
45+
46+ [ Theory ]
47+ [ Repeat ( iterations ) ]
48+ public async Task WhenConcurrentGetAsyncCacheEndsInConsistentState ( int iteration )
49+ {
50+ var scheduler = new BackgroundThreadScheduler ( ) ;
51+ var lfu = new ConcurrentLfuBuilder < int , string > ( ) . WithCapacity ( 9 ) . WithScheduler ( scheduler ) . Build ( ) as ConcurrentLfu < int , string > ;
52+
53+ await Threaded . RunAsync ( 4 , async ( ) => {
54+ for ( int i = 0 ; i < 100000 ; i ++ )
55+ {
56+ await lfu . GetOrAddAsync ( i + 1 , i => Task . FromResult ( i . ToString ( ) ) ) ;
57+ }
58+ } ) ;
59+
60+ this . output . WriteLine ( $ "iteration { iteration } keys={ string . Join ( " " , lfu . Keys ) } ") ;
61+
62+ scheduler . Dispose ( ) ;
63+ await scheduler . Completion ;
64+
65+ RunIntegrityCheck ( lfu ) ;
66+ }
67+
68+ [ Theory ]
69+ [ Repeat ( iterations ) ]
70+ public async Task WhenConcurrentGetWithArgCacheEndsInConsistentState ( int iteration )
71+ {
72+ var scheduler = new BackgroundThreadScheduler ( ) ;
73+ var lfu = new ConcurrentLfuBuilder < int , string > ( ) . WithCapacity ( 9 ) . WithScheduler ( scheduler ) . Build ( ) as ConcurrentLfu < int , string > ;
74+
75+ await Threaded . Run ( 4 , ( ) => {
76+ for ( int i = 0 ; i < 100000 ; i ++ )
77+ {
78+ // use the arg overload
79+ lfu . GetOrAdd ( i + 1 , ( i , s ) => i . ToString ( ) , "Foo" ) ;
80+ }
81+ } ) ;
82+
83+ this . output . WriteLine ( $ "iteration { iteration } keys={ string . Join ( " " , lfu . Keys ) } ") ;
84+
85+ scheduler . Dispose ( ) ;
86+ await scheduler . Completion ;
87+
88+ RunIntegrityCheck ( lfu ) ;
89+ }
90+
91+ [ Theory ]
92+ [ Repeat ( iterations ) ]
93+ public async Task WhenConcurrentGetAsyncWithArgCacheEndsInConsistentState ( int iteration )
94+ {
95+ var scheduler = new BackgroundThreadScheduler ( ) ;
96+ var lfu = new ConcurrentLfuBuilder < int , string > ( ) . WithCapacity ( 9 ) . WithScheduler ( scheduler ) . Build ( ) as ConcurrentLfu < int , string > ;
97+
98+ await Threaded . RunAsync ( 4 , async ( ) => {
99+ for ( int i = 0 ; i < 100000 ; i ++ )
100+ {
101+ // use the arg overload
102+ await lfu . GetOrAddAsync ( i + 1 , ( i , s ) => Task . FromResult ( i . ToString ( ) ) , "Foo" ) ;
103+ }
104+ } ) ;
105+
106+ this . output . WriteLine ( $ "iteration { iteration } keys={ string . Join ( " " , lfu . Keys ) } ") ;
107+
108+ scheduler . Dispose ( ) ;
109+ await scheduler . Completion ;
110+
111+ RunIntegrityCheck ( lfu ) ;
19112 }
20113
21114 [ Fact ]
@@ -45,6 +138,105 @@ await Threaded.Run(threads, i =>
45138 this . output . WriteLine ( $ "Maintenance ops { cache . Scheduler . RunCount } ") ;
46139
47140 cache . Metrics . Value . Misses . Should ( ) . Be ( iterations * threads ) ;
141+ RunIntegrityCheck ( cache ) ;
142+ }
143+
144+ private void RunIntegrityCheck < K , V > ( ConcurrentLfu < K , V > cache )
145+ {
146+ new ConcurrentLfuIntegrityChecker < K , V > ( cache ) . Validate ( ) ;
147+ }
148+ }
149+
150+ public class ConcurrentLfuIntegrityChecker < K , V >
151+ {
152+ private readonly ConcurrentLfu < K , V > cache ;
153+
154+ private readonly LfuNodeList < K , V > windowLru ;
155+ private readonly LfuNodeList < K , V > probationLru ;
156+ private readonly LfuNodeList < K , V > protectedLru ;
157+
158+ private readonly StripedMpscBuffer < LfuNode < K , V > > readBuffer ;
159+ private readonly MpscBoundedBuffer < LfuNode < K , V > > writeBuffer ;
160+
161+ private static FieldInfo windowLruField = typeof ( ConcurrentLfu < K , V > ) . GetField ( "windowLru" , BindingFlags . NonPublic | BindingFlags . Instance ) ;
162+ private static FieldInfo probationLruField = typeof ( ConcurrentLfu < K , V > ) . GetField ( "probationLru" , BindingFlags . NonPublic | BindingFlags . Instance ) ;
163+ private static FieldInfo protectedLruField = typeof ( ConcurrentLfu < K , V > ) . GetField ( "protectedLru" , BindingFlags . NonPublic | BindingFlags . Instance ) ;
164+
165+ private static FieldInfo readBufferField = typeof ( ConcurrentLfu < K , V > ) . GetField ( "readBuffer" , BindingFlags . NonPublic | BindingFlags . Instance ) ;
166+ private static FieldInfo writeBufferField = typeof ( ConcurrentLfu < K , V > ) . GetField ( "writeBuffer" , BindingFlags . NonPublic | BindingFlags . Instance ) ;
167+
168+ public ConcurrentLfuIntegrityChecker ( ConcurrentLfu < K , V > cache )
169+ {
170+ this . cache = cache ;
171+
172+ // get lrus via reflection
173+ this . windowLru = ( LfuNodeList < K , V > ) windowLruField . GetValue ( cache ) ;
174+ this . probationLru = ( LfuNodeList < K , V > ) probationLruField . GetValue ( cache ) ;
175+ this . protectedLru = ( LfuNodeList < K , V > ) protectedLruField . GetValue ( cache ) ;
176+
177+ this . readBuffer = ( StripedMpscBuffer < LfuNode < K , V > > ) readBufferField . GetValue ( cache ) ;
178+ this . writeBuffer = ( MpscBoundedBuffer < LfuNode < K , V > > ) writeBufferField . GetValue ( cache ) ;
179+ }
180+
181+ public void Validate ( )
182+ {
183+ cache . DoMaintenance ( ) ;
184+
185+ // buffers should be empty after maintenance
186+ this . readBuffer . Count . Should ( ) . Be ( 0 ) ;
187+ this . writeBuffer . Count . Should ( ) . Be ( 0 ) ;
188+
189+ // all the items in the LRUs must exist in the dictionary.
190+ // no items should be marked as removed after maintenance has run
191+ VerifyLruInDictionary ( this . windowLru ) ;
192+ VerifyLruInDictionary ( this . probationLru ) ;
193+ VerifyLruInDictionary ( this . protectedLru ) ;
194+
195+ // all the items in the dictionary must exist in the node list
196+ VerifyDictionaryInLrus ( ) ;
197+
198+ // cache must be within capacity
199+ cache . Count . Should ( ) . BeLessThanOrEqualTo ( cache . Capacity , "capacity out of valid range" ) ;
200+ }
201+
202+ private void VerifyLruInDictionary ( LfuNodeList < K , V > lfuNodes )
203+ {
204+ var node = lfuNodes . First ;
205+
206+ while ( node != null )
207+ {
208+ node . WasRemoved . Should ( ) . BeFalse ( ) ;
209+ node . WasDeleted . Should ( ) . BeFalse ( ) ;
210+ cache . TryGet ( node . Key , out _ ) . Should ( ) . BeTrue ( ) ;
211+
212+ node = node . Next ;
213+ }
214+ }
215+
216+ private void VerifyDictionaryInLrus ( )
217+ {
218+ foreach ( var kvp in this . cache )
219+ {
220+ var exists = Exists ( kvp , this . windowLru ) || Exists ( kvp , this . probationLru ) || Exists ( kvp , this . protectedLru ) ;
221+ exists . Should ( ) . BeTrue ( $ "key { kvp . Key } should exist in LRU lists") ;
222+ }
223+ }
224+
225+ private static bool Exists ( KeyValuePair < K , V > kvp , LfuNodeList < K , V > lfuNodes )
226+ {
227+ var node = lfuNodes . First ;
228+
229+ while ( node != null )
230+ {
231+ if ( EqualityComparer < K > . Default . Equals ( node . Key , kvp . Key ) )
232+ {
233+ return true ;
234+ }
235+
236+ node = node . Next ;
237+ }
238+
239+ return false ;
48240 }
49241 }
50242}
0 commit comments