Skip to content

Commit 5aabe27

Browse files
authored
Add support for subscribing to callbacks that return Task (#1486)
1 parent cd995a1 commit 5aabe27

3 files changed

Lines changed: 345 additions & 8 deletions

File tree

SteamKit2/SteamKit2/Steam/SteamClient/CallbackMgr/CallbackMgr.cs

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public CallbackManager( SteamClient client )
4545
/// Runs a single queued callback.
4646
/// If no callback is queued, this method will instantly return.
4747
/// </summary>
48+
/// <remarks>If any asynchronous callbacks are registered, they will be blocked on synchronously.
49+
/// Use <see cref="RunWaitCallbackAsync"/> to properly await asynchronous callbacks.</remarks>
4850
/// <returns>Returns true if a callback has been run, false otherwise.</returns>
4951
public bool RunCallbacks()
5052
{
@@ -61,6 +63,7 @@ public bool RunCallbacks()
6163
/// If no callback is queued, the method will block for the given timeout or until a callback becomes available.
6264
/// </summary>
6365
/// <param name="timeout">The length of time to block.</param>
66+
/// <remarks><inheritdoc cref="RunCallbacks" path="/remarks"/></remarks>
6467
/// <returns>Returns true if a callback has been run, false otherwise.</returns>
6568
public bool RunWaitCallbacks( TimeSpan timeout )
6669
{
@@ -78,6 +81,7 @@ public bool RunWaitCallbacks( TimeSpan timeout )
7881
/// This method returns once the queue has been emptied.
7982
/// </summary>
8083
/// <param name="timeout">The length of time to block.</param>
84+
/// <remarks><inheritdoc cref="RunCallbacks" path="/remarks"/></remarks>
8185
public void RunWaitAllCallbacks( TimeSpan timeout )
8286
{
8387
if ( !RunWaitCallbacks( timeout ) )
@@ -94,6 +98,7 @@ public void RunWaitAllCallbacks( TimeSpan timeout )
9498
/// Blocks the current thread to run a single queued callback.
9599
/// If no callback is queued, the method will block until one becomes available.
96100
/// </summary>
101+
/// <remarks><inheritdoc cref="RunCallbacks" path="/remarks"/></remarks>
97102
public void RunWaitCallbacks()
98103
{
99104
var call = client.WaitForCallback();
@@ -106,7 +111,7 @@ public void RunWaitCallbacks()
106111
public async Task RunWaitCallbackAsync( CancellationToken cancellationToken = default )
107112
{
108113
var call = await client.WaitForCallbackAsync( cancellationToken ).ConfigureAwait( false );
109-
Handle( call );
114+
await HandleAsync( call ).ConfigureAwait( false );
110115
}
111116

112117
/// <summary>
@@ -139,6 +144,36 @@ public IDisposable Subscribe<TCallback>( Action<TCallback> callbackFunc )
139144
return Subscribe( JobID.Invalid, callbackFunc );
140145
}
141146

147+
/// <summary>
148+
/// Registers the provided <see cref="Func{T, Task}"/> to receive callbacks of type <typeparamref name="TCallback" />.
149+
/// </summary>
150+
/// <param name="jobID">The <see cref="JobID"/> of the callbacks that should be subscribed to.
151+
/// If this is <see cref="JobID.Invalid"/>, all callbacks of type <typeparamref name="TCallback" /> will be received.</param>
152+
/// <param name="callbackFunc">The function to invoke with the callback.</param>
153+
/// <typeparam name="TCallback">The type of callback to subscribe to.</typeparam>
154+
/// <remarks>When subscribing to asynchronous methods, <see cref="RunWaitCallbackAsync"/> should be used for awaiting callbacks.</remarks>
155+
/// <returns>An <see cref="IDisposable"/>. Disposing of the return value will unsubscribe the <paramref name="callbackFunc"/>.</returns>
156+
public IDisposable Subscribe<TCallback>( JobID jobID, Func<TCallback, Task> callbackFunc ) where TCallback : CallbackMsg
157+
{
158+
ArgumentNullException.ThrowIfNull( jobID );
159+
ArgumentNullException.ThrowIfNull( callbackFunc );
160+
161+
var callback = new Internal.AsyncCallback<TCallback>( callbackFunc, this, jobID );
162+
return callback;
163+
}
164+
165+
/// <summary>
166+
/// Registers the provided <see cref="Func{T, Task}"/> to receive callbacks of type <typeparam name="TCallback" />.
167+
/// </summary>
168+
/// <param name="callbackFunc">The function to invoke with the callback.</param>
169+
/// <remarks>When subscribing to asynchronous methods, <see cref="RunWaitCallbackAsync"/> should be used for awaiting callbacks.</remarks>
170+
/// <returns>An <see cref="IDisposable"/>. Disposing of the return value will unsubscribe the <paramref name="callbackFunc"/>.</returns>
171+
public IDisposable Subscribe<TCallback>( Func<TCallback, Task> callbackFunc )
172+
where TCallback : CallbackMsg
173+
{
174+
return Subscribe( JobID.Invalid, callbackFunc );
175+
}
176+
142177
/// <summary>
143178
/// Registers the provided <see cref="Action{T}"/> to receive callbacks for notifications from the service of type <typeparam name="TService" />
144179
/// with the notification message of type <typeparam name="TNotification"></typeparam>.
@@ -157,6 +192,25 @@ public IDisposable SubscribeServiceNotification<TService, TNotification>( Action
157192
return callback;
158193
}
159194

195+
/// <summary>
196+
/// Registers the provided <see cref="Func{T, Task}"/> to receive callbacks for notifications from the service of type <typeparam name="TService" />
197+
/// with the notification message of type <typeparam name="TNotification"></typeparam>.
198+
/// </summary>
199+
/// <param name="callbackFunc">The function to invoke with the callback.</param>
200+
/// <remarks>When subscribing to asynchronous methods, <see cref="RunWaitCallbackAsync"/> should be used for awaiting callbacks.</remarks>
201+
/// <returns>An <see cref="IDisposable"/>. Disposing of the return value will unsubscribe the <paramref name="callbackFunc"/>.</returns>
202+
public IDisposable SubscribeServiceNotification<TService, TNotification>( Func<SteamUnifiedMessages.ServiceMethodNotification<TNotification>, Task> callbackFunc )
203+
where TService : SteamUnifiedMessages.UnifiedService, new()
204+
where TNotification : IExtensible, new()
205+
{
206+
ArgumentNullException.ThrowIfNull( callbackFunc );
207+
208+
steamUnifiedMessages.CreateService<TService>();
209+
210+
var callback = new AsyncCallback<SteamUnifiedMessages.ServiceMethodNotification<TNotification>>( callbackFunc, this, JobID.Invalid );
211+
return callback;
212+
}
213+
160214
/// <summary>
161215
/// Registers the provided <see cref="Action{T}"/> to receive callbacks for responses of <see cref="SteamUnifiedMessages"/> requests
162216
/// made by the service of type <typeparam name="TService" /> with the response of type <typeparam name="TResponse"></typeparam>.
@@ -175,6 +229,25 @@ public IDisposable SubscribeServiceResponse<TService, TResponse>( Action<SteamUn
175229
return callback;
176230
}
177231

232+
/// <summary>
233+
/// Registers the provided <see cref="Func{T, Task}"/> to receive callbacks for responses of <see cref="SteamUnifiedMessages"/> requests
234+
/// made by the service of type <typeparam name="TService" /> with the response of type <typeparam name="TResponse"></typeparam>.
235+
/// </summary>
236+
/// <param name="callbackFunc">The function to invoke with the callback.</param>
237+
/// <remarks>When subscribing to asynchronous methods, <see cref="RunWaitCallbackAsync"/> should be used for awaiting callbacks.</remarks>
238+
/// <returns>An <see cref="IDisposable"/>. Disposing of the return value will unsubscribe the <paramref name="callbackFunc"/>.</returns>
239+
public IDisposable SubscribeServiceResponse<TService, TResponse>( Func<SteamUnifiedMessages.ServiceMethodResponse<TResponse>, Task> callbackFunc )
240+
where TService : SteamUnifiedMessages.UnifiedService, new()
241+
where TResponse : IExtensible, new()
242+
{
243+
ArgumentNullException.ThrowIfNull( callbackFunc );
244+
245+
steamUnifiedMessages.CreateService<TService>();
246+
247+
var callback = new AsyncCallback<SteamUnifiedMessages.ServiceMethodResponse<TResponse>>( callbackFunc, this, JobID.Invalid );
248+
return callback;
249+
}
250+
178251
internal void Register( CallbackBase call )
179252
=> ImmutableInterlocked.Update( ref registeredCallbacks, static ( list, item ) => list.Add( item ), call );
180253

@@ -191,7 +264,27 @@ void Handle( CallbackMsg call )
191264
{
192265
if ( callback.CallbackType.IsAssignableFrom( type ) )
193266
{
194-
callback.Run( call );
267+
var task = callback.Run( call );
268+
269+
if ( !task.IsCompletedSuccessfully )
270+
{
271+
task.AsTask().Wait();
272+
}
273+
}
274+
}
275+
}
276+
277+
async Task HandleAsync( CallbackMsg call )
278+
{
279+
var callbacks = registeredCallbacks;
280+
var type = call.GetType();
281+
282+
// find handlers interested in this callback
283+
foreach ( var callback in callbacks )
284+
{
285+
if ( callback.CallbackType.IsAssignableFrom( type ) )
286+
{
287+
await callback.Run( call ).ConfigureAwait( false );
195288
}
196289
}
197290
}

SteamKit2/SteamKit2/Steam/SteamClient/CallbackMgr/CallbackMgr_Internals.cs

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77

88
using System;
9+
using System.Threading.Tasks;
910

1011
namespace SteamKit2.Internal
1112
{
@@ -16,22 +17,24 @@ namespace SteamKit2.Internal
1617
abstract class CallbackBase
1718
{
1819
internal abstract Type CallbackType { get; }
19-
internal abstract void Run( CallbackMsg callback );
20+
internal abstract ValueTask Run( CallbackMsg callback );
2021
}
2122

2223
sealed class Callback<TCall> : CallbackBase, IDisposable
2324
where TCall : CallbackMsg
2425
{
2526
CallbackManager? mgr;
2627

27-
public JobID JobID { get; set; }
28+
public JobID JobID { get; }
2829

29-
public Action<TCall> OnRun { get; set; }
30+
public Action<TCall> OnRun { get; }
3031

3132
internal override Type CallbackType => typeof( TCall );
3233

3334
public Callback( Action<TCall> func, CallbackManager mgr, JobID jobID )
3435
{
36+
ArgumentNullException.ThrowIfNull( func );
37+
3538
this.JobID = jobID;
3639
this.OnRun = func;
3740
this.mgr = mgr;
@@ -52,13 +55,60 @@ public void Dispose()
5255
System.GC.SuppressFinalize( this );
5356
}
5457

55-
internal override void Run( CallbackMsg callback )
58+
internal override ValueTask Run( CallbackMsg callback )
5659
{
5760
var cb = callback as TCall;
58-
if ( cb != null && ( cb.JobID == JobID || JobID == JobID.Invalid ) && OnRun != null )
61+
if ( cb != null && ( cb.JobID == JobID || JobID == JobID.Invalid ) )
5962
{
6063
OnRun( cb );
6164
}
65+
return default;
66+
}
67+
}
68+
69+
sealed class AsyncCallback<TCall> : CallbackBase, IDisposable
70+
where TCall : CallbackMsg
71+
{
72+
CallbackManager? mgr;
73+
74+
public JobID JobID { get; }
75+
76+
public Func<TCall, Task> OnRun { get; }
77+
78+
internal override Type CallbackType => typeof( TCall );
79+
80+
public AsyncCallback( Func<TCall, Task> func, CallbackManager mgr, JobID jobID )
81+
{
82+
ArgumentNullException.ThrowIfNull( func );
83+
84+
this.JobID = jobID;
85+
this.OnRun = func;
86+
this.mgr = mgr;
87+
88+
mgr.Register( this );
89+
}
90+
91+
~AsyncCallback()
92+
{
93+
Dispose();
94+
}
95+
96+
public void Dispose()
97+
{
98+
mgr?.Unregister( this );
99+
mgr = null;
100+
101+
System.GC.SuppressFinalize( this );
102+
}
103+
104+
internal override ValueTask Run( CallbackMsg callback )
105+
{
106+
var cb = callback as TCall;
107+
if ( cb != null && ( cb.JobID == JobID || JobID == JobID.Invalid ) )
108+
{
109+
return new ValueTask( OnRun( cb ) );
110+
}
111+
return default;
62112
}
63113
}
64114
}

0 commit comments

Comments
 (0)