1515using System ;
1616using System . Threading ;
1717using System . Threading . Tasks ;
18+ using ServiceStack . AsyncEx ;
1819
1920namespace ServiceStack . Redis
2021{
2122 public partial class PooledRedisClientManager
2223 : IRedisClientsManagerAsync
2324 {
25+ /// <summary>
26+ /// Use previous client resolving behavior
27+ /// </summary>
28+ public static bool UseGetClientBlocking = false ;
29+
2430 ValueTask < ICacheClientAsync > IRedisClientsManagerAsync . GetCacheClientAsync ( CancellationToken token )
2531 => new RedisClientManagerCacheClient ( this ) . AsValueTaskResult < ICacheClientAsync > ( ) ;
2632
27- ValueTask < IRedisClientAsync > IRedisClientsManagerAsync . GetClientAsync ( CancellationToken token )
28- => GetClientAsync ( ) ;
33+ ValueTask < IRedisClientAsync > IRedisClientsManagerAsync . GetClientAsync ( CancellationToken token ) => UseGetClientBlocking
34+ ? GetClientBlocking ( ) . AsValueTaskResult < IRedisClientAsync > ( )
35+ : GetClientAsync ( ) ;
2936
3037 ValueTask < ICacheClientAsync > IRedisClientsManagerAsync . GetReadOnlyCacheClientAsync ( CancellationToken token )
3138 => new RedisClientManagerCacheClient ( this ) { ReadOnly = true } . AsValueTaskResult < ICacheClientAsync > ( ) ;
3239
33- ValueTask < IRedisClientAsync > IRedisClientsManagerAsync . GetReadOnlyClientAsync ( CancellationToken token )
34- => GetReadOnlyClient ( true ) . AsValueTaskResult < IRedisClientAsync > ( ) ;
40+ ValueTask < IRedisClientAsync > IRedisClientsManagerAsync . GetReadOnlyClientAsync ( CancellationToken token ) => UseGetClientBlocking
41+ ? GetReadOnlyClientBlocking ( ) . AsValueTaskResult < IRedisClientAsync > ( )
42+ : GetReadOnlyClientAsync ( ) ;
3543
3644 ValueTask IAsyncDisposable . DisposeAsync ( )
3745 {
3846 Dispose ( ) ;
3947 return default ;
4048 }
49+
50+ private AsyncManualResetEvent readAsyncEvent ;
51+ partial void PulseAllReadAsync ( )
52+ {
53+ readAsyncEvent ? . Set ( ) ;
54+ readAsyncEvent ? . Reset ( ) ;
55+ }
56+
57+ private AsyncManualResetEvent writeAsyncEvent ;
58+ partial void PulseAllWriteAsync ( )
59+ {
60+ writeAsyncEvent ? . Set ( ) ;
61+ writeAsyncEvent ? . Reset ( ) ;
62+ }
63+
64+ private async Task < bool > WaitForWriter ( int msTimeout )
65+ {
66+ // If we're not doing async, no need to create this till we need it.
67+ writeAsyncEvent ??= new AsyncManualResetEvent ( false ) ;
68+ var cts = new CancellationTokenSource ( TimeSpan . FromMilliseconds ( msTimeout ) ) ;
69+ try
70+ {
71+ await writeAsyncEvent . WaitAsync ( cts . Token ) ;
72+ }
73+ catch ( OperationCanceledException ) { return false ; }
74+ return true ;
75+ }
76+
77+ private async ValueTask < IRedisClientAsync > GetClientAsync ( )
78+ {
79+ try
80+ {
81+ var inactivePoolIndex = - 1 ;
82+ do
83+ {
84+ lock ( writeClients )
85+ {
86+ AssertValidReadWritePool ( ) ;
87+
88+ // If it's -1, then we want to try again after a delay of some kind. So if it's NOT negative one, process it...
89+ if ( ( inactivePoolIndex = GetInActiveWriteClient ( out var inActiveClient ) ) != - 1 )
90+ {
91+ //inActiveClient != null only for Valid InActive Clients
92+ if ( inActiveClient != null )
93+ {
94+ WritePoolIndex ++ ;
95+ inActiveClient . Activate ( ) ;
96+
97+ InitClient ( inActiveClient ) ;
98+
99+ return inActiveClient ;
100+ }
101+ else
102+ {
103+ // Still need to be in lock for this!
104+ break ;
105+ }
106+ }
107+ }
108+
109+ if ( PoolTimeout . HasValue )
110+ {
111+ // We have a timeout value set - so try to not wait longer than this.
112+ if ( ! await WaitForWriter ( PoolTimeout . Value ) )
113+ {
114+ throw new TimeoutException ( PoolTimeoutError ) ;
115+ }
116+ }
117+ else
118+ {
119+ // Wait forever, so just retry till we get one.
120+ await WaitForWriter ( RecheckPoolAfterMs ) ;
121+ }
122+ } while ( true ) ; // Just keep repeating until we get a slot.
123+
124+ //Reaches here when there's no Valid InActive Clients, but we have a slot for one!
125+ try
126+ {
127+ //inactivePoolIndex = index of reservedSlot || index of invalid client
128+ var existingClient = writeClients [ inactivePoolIndex ] ;
129+ if ( existingClient != null && existingClient != reservedSlot && existingClient . HadExceptions )
130+ {
131+ RedisState . DeactivateClient ( existingClient ) ;
132+ }
133+
134+ var newClient = InitNewClient ( RedisResolver . CreateMasterClient ( inactivePoolIndex ) ) ;
135+
136+ //Put all blocking I/O or potential Exceptions before lock
137+ lock ( writeClients )
138+ {
139+ //If existingClient at inactivePoolIndex changed (failover) return new client outside of pool
140+ if ( writeClients [ inactivePoolIndex ] != existingClient )
141+ {
142+ if ( Log . IsDebugEnabled )
143+ Log . Debug ( "writeClients[inactivePoolIndex] != existingClient: {0}" . Fmt ( writeClients [ inactivePoolIndex ] ) ) ;
144+
145+ return newClient ; //return client outside of pool
146+ }
147+
148+ WritePoolIndex ++ ;
149+ writeClients [ inactivePoolIndex ] = newClient ;
150+
151+ return ! AssertAccessOnlyOnSameThread
152+ ? newClient
153+ : newClient . LimitAccessToThread ( Thread . CurrentThread . ManagedThreadId , Environment . StackTrace ) ;
154+ }
155+ }
156+ catch
157+ {
158+ //Revert free-slot for any I/O exceptions that can throw (before lock)
159+ lock ( writeClients )
160+ {
161+ writeClients [ inactivePoolIndex ] = null ; //free slot
162+ }
163+ throw ;
164+ }
165+ }
166+ finally
167+ {
168+ RedisState . DisposeExpiredClients ( ) ;
169+ }
170+ }
171+
172+ private async Task < bool > WaitForReader ( int msTimeout )
173+ {
174+ // If we're not doing async, no need to create this till we need it.
175+ readAsyncEvent ??= new AsyncManualResetEvent ( false ) ;
176+ var cts = new CancellationTokenSource ( TimeSpan . FromMilliseconds ( msTimeout ) ) ;
177+ try
178+ {
179+ await readAsyncEvent . WaitAsync ( cts . Token ) ;
180+ }
181+ catch ( OperationCanceledException ) { return false ; }
182+ return true ;
183+ }
184+
185+ private async ValueTask < IRedisClientAsync > GetReadOnlyClientAsync ( )
186+ {
187+ try
188+ {
189+ var inactivePoolIndex = - 1 ;
190+ do
191+ {
192+ lock ( readClients )
193+ {
194+ AssertValidReadOnlyPool ( ) ;
195+
196+ // If it's -1, then we want to try again after a delay of some kind. So if it's NOT negative one, process it...
197+ if ( ( inactivePoolIndex = GetInActiveReadClient ( out var inActiveClient ) ) != - 1 )
198+ {
199+ //inActiveClient != null only for Valid InActive Clients
200+ if ( inActiveClient != null )
201+ {
202+ ReadPoolIndex ++ ;
203+ inActiveClient . Activate ( ) ;
204+
205+ InitClient ( inActiveClient ) ;
206+
207+ return inActiveClient ;
208+ }
209+ else
210+ {
211+ // Still need to be in lock for this!
212+ break ;
213+ }
214+ }
215+ }
216+
217+ if ( PoolTimeout . HasValue )
218+ {
219+ // We have a timeout value set - so try to not wait longer than this.
220+ if ( ! await WaitForReader ( PoolTimeout . Value ) )
221+ {
222+ throw new TimeoutException ( PoolTimeoutError ) ;
223+ }
224+ }
225+ else
226+ {
227+ // Wait forever, so just retry till we get one.
228+ await WaitForReader ( RecheckPoolAfterMs ) ;
229+ }
230+ } while ( true ) ; // Just keep repeating until we get a slot.
231+
232+ //Reaches here when there's no Valid InActive Clients
233+ try
234+ {
235+ //inactivePoolIndex = index of reservedSlot || index of invalid client
236+ var existingClient = readClients [ inactivePoolIndex ] ;
237+ if ( existingClient != null && existingClient != reservedSlot && existingClient . HadExceptions )
238+ {
239+ RedisState . DeactivateClient ( existingClient ) ;
240+ }
241+
242+ var newClient = InitNewClient ( RedisResolver . CreateSlaveClient ( inactivePoolIndex ) ) ;
243+
244+ //Put all blocking I/O or potential Exceptions before lock
245+ lock ( readClients )
246+ {
247+ //If existingClient at inactivePoolIndex changed (failover) return new client outside of pool
248+ if ( readClients [ inactivePoolIndex ] != existingClient )
249+ {
250+ if ( Log . IsDebugEnabled )
251+ Log . Debug ( "readClients[inactivePoolIndex] != existingClient: {0}" . Fmt ( readClients [ inactivePoolIndex ] ) ) ;
252+
253+ Interlocked . Increment ( ref RedisState . TotalClientsCreatedOutsidePool ) ;
254+
255+ //Don't handle callbacks for new client outside pool
256+ newClient . ClientManager = null ;
257+ return newClient ; //return client outside of pool
258+ }
259+
260+ ReadPoolIndex ++ ;
261+ readClients [ inactivePoolIndex ] = newClient ;
262+ return newClient ;
263+ }
264+ }
265+ catch
266+ {
267+ //Revert free-slot for any I/O exceptions that can throw
268+ lock ( readClients )
269+ {
270+ readClients [ inactivePoolIndex ] = null ; //free slot
271+ }
272+ throw ;
273+ }
274+ }
275+ finally
276+ {
277+ RedisState . DisposeExpiredClients ( ) ;
278+ }
279+ }
280+
41281 }
42282
43283}
0 commit comments