Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 95 additions & 2 deletions SteamKit2/SteamKit2/Steam/SteamClient/CallbackMgr/CallbackMgr.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public CallbackManager( SteamClient client )
/// Runs a single queued callback.
/// If no callback is queued, this method will instantly return.
/// </summary>
/// <remarks>If any asynchronous callbacks are registered, they will be blocked on synchronously.
/// Use <see cref="RunWaitCallbackAsync"/> to properly await asynchronous callbacks.</remarks>
/// <returns>Returns true if a callback has been run, false otherwise.</returns>
public bool RunCallbacks()
{
Expand All @@ -61,6 +63,7 @@ public bool RunCallbacks()
/// If no callback is queued, the method will block for the given timeout or until a callback becomes available.
/// </summary>
/// <param name="timeout">The length of time to block.</param>
/// <remarks><inheritdoc cref="RunCallbacks" path="/remarks"/></remarks>
/// <returns>Returns true if a callback has been run, false otherwise.</returns>
public bool RunWaitCallbacks( TimeSpan timeout )
{
Expand All @@ -78,6 +81,7 @@ public bool RunWaitCallbacks( TimeSpan timeout )
/// This method returns once the queue has been emptied.
/// </summary>
/// <param name="timeout">The length of time to block.</param>
/// <remarks><inheritdoc cref="RunCallbacks" path="/remarks"/></remarks>
public void RunWaitAllCallbacks( TimeSpan timeout )
{
if ( !RunWaitCallbacks( timeout ) )
Expand All @@ -94,6 +98,7 @@ public void RunWaitAllCallbacks( TimeSpan timeout )
/// Blocks the current thread to run a single queued callback.
/// If no callback is queued, the method will block until one becomes available.
/// </summary>
/// <remarks><inheritdoc cref="RunCallbacks" path="/remarks"/></remarks>
public void RunWaitCallbacks()
{
var call = client.WaitForCallback();
Expand All @@ -106,7 +111,7 @@ public void RunWaitCallbacks()
public async Task RunWaitCallbackAsync( CancellationToken cancellationToken = default )
{
var call = await client.WaitForCallbackAsync( cancellationToken ).ConfigureAwait( false );
Handle( call );
await HandleAsync( call ).ConfigureAwait( false );
}

/// <summary>
Expand Down Expand Up @@ -139,6 +144,36 @@ public IDisposable Subscribe<TCallback>( Action<TCallback> callbackFunc )
return Subscribe( JobID.Invalid, callbackFunc );
}

/// <summary>
/// Registers the provided <see cref="Func{T, Task}"/> to receive callbacks of type <typeparamref name="TCallback" />.
/// </summary>
/// <param name="jobID">The <see cref="JobID"/> of the callbacks that should be subscribed to.
/// If this is <see cref="JobID.Invalid"/>, all callbacks of type <typeparamref name="TCallback" /> will be received.</param>
/// <param name="callbackFunc">The function to invoke with the callback.</param>
/// <typeparam name="TCallback">The type of callback to subscribe to.</typeparam>
/// <remarks>When subscribing to asynchronous methods, <see cref="RunWaitCallbackAsync"/> should be used for awaiting callbacks.</remarks>
/// <returns>An <see cref="IDisposable"/>. Disposing of the return value will unsubscribe the <paramref name="callbackFunc"/>.</returns>
public IDisposable Subscribe<TCallback>( JobID jobID, Func<TCallback, Task> callbackFunc ) where TCallback : CallbackMsg
{
ArgumentNullException.ThrowIfNull( jobID );
ArgumentNullException.ThrowIfNull( callbackFunc );

var callback = new Internal.AsyncCallback<TCallback>( callbackFunc, this, jobID );
return callback;
}

/// <summary>
/// Registers the provided <see cref="Func{T, Task}"/> to receive callbacks of type <typeparam name="TCallback" />.
/// </summary>
/// <param name="callbackFunc">The function to invoke with the callback.</param>
/// <remarks>When subscribing to asynchronous methods, <see cref="RunWaitCallbackAsync"/> should be used for awaiting callbacks.</remarks>
/// <returns>An <see cref="IDisposable"/>. Disposing of the return value will unsubscribe the <paramref name="callbackFunc"/>.</returns>
public IDisposable Subscribe<TCallback>( Func<TCallback, Task> callbackFunc )
where TCallback : CallbackMsg
{
return Subscribe( JobID.Invalid, callbackFunc );
}

/// <summary>
/// Registers the provided <see cref="Action{T}"/> to receive callbacks for notifications from the service of type <typeparam name="TService" />
/// with the notification message of type <typeparam name="TNotification"></typeparam>.
Expand All @@ -157,6 +192,25 @@ public IDisposable SubscribeServiceNotification<TService, TNotification>( Action
return callback;
}

/// <summary>
/// Registers the provided <see cref="Func{T, Task}"/> to receive callbacks for notifications from the service of type <typeparam name="TService" />
/// with the notification message of type <typeparam name="TNotification"></typeparam>.
/// </summary>
/// <param name="callbackFunc">The function to invoke with the callback.</param>
/// <remarks>When subscribing to asynchronous methods, <see cref="RunWaitCallbackAsync"/> should be used for awaiting callbacks.</remarks>
/// <returns>An <see cref="IDisposable"/>. Disposing of the return value will unsubscribe the <paramref name="callbackFunc"/>.</returns>
public IDisposable SubscribeServiceNotification<TService, TNotification>( Func<SteamUnifiedMessages.ServiceMethodNotification<TNotification>, Task> callbackFunc )
where TService : SteamUnifiedMessages.UnifiedService, new()
where TNotification : IExtensible, new()
{
ArgumentNullException.ThrowIfNull( callbackFunc );

steamUnifiedMessages.CreateService<TService>();

var callback = new AsyncCallback<SteamUnifiedMessages.ServiceMethodNotification<TNotification>>( callbackFunc, this, JobID.Invalid );
return callback;
}

/// <summary>
/// Registers the provided <see cref="Action{T}"/> to receive callbacks for responses of <see cref="SteamUnifiedMessages"/> requests
/// made by the service of type <typeparam name="TService" /> with the response of type <typeparam name="TResponse"></typeparam>.
Expand All @@ -175,6 +229,25 @@ public IDisposable SubscribeServiceResponse<TService, TResponse>( Action<SteamUn
return callback;
}

/// <summary>
/// Registers the provided <see cref="Func{T, Task}"/> to receive callbacks for responses of <see cref="SteamUnifiedMessages"/> requests
/// made by the service of type <typeparam name="TService" /> with the response of type <typeparam name="TResponse"></typeparam>.
/// </summary>
/// <param name="callbackFunc">The function to invoke with the callback.</param>
/// <remarks>When subscribing to asynchronous methods, <see cref="RunWaitCallbackAsync"/> should be used for awaiting callbacks.</remarks>
/// <returns>An <see cref="IDisposable"/>. Disposing of the return value will unsubscribe the <paramref name="callbackFunc"/>.</returns>
public IDisposable SubscribeServiceResponse<TService, TResponse>( Func<SteamUnifiedMessages.ServiceMethodResponse<TResponse>, Task> callbackFunc )
where TService : SteamUnifiedMessages.UnifiedService, new()
where TResponse : IExtensible, new()
{
ArgumentNullException.ThrowIfNull( callbackFunc );

steamUnifiedMessages.CreateService<TService>();

var callback = new AsyncCallback<SteamUnifiedMessages.ServiceMethodResponse<TResponse>>( callbackFunc, this, JobID.Invalid );
return callback;
}

internal void Register( CallbackBase call )
=> ImmutableInterlocked.Update( ref registeredCallbacks, static ( list, item ) => list.Add( item ), call );

Expand All @@ -191,7 +264,27 @@ void Handle( CallbackMsg call )
{
if ( callback.CallbackType.IsAssignableFrom( type ) )
{
callback.Run( call );
var task = callback.Run( call );

if ( !task.IsCompletedSuccessfully )
{
task.AsTask().Wait();
}
}
}
}

async Task HandleAsync( CallbackMsg call )
{
var callbacks = registeredCallbacks;
var type = call.GetType();

// find handlers interested in this callback
foreach ( var callback in callbacks )
{
if ( callback.CallbackType.IsAssignableFrom( type ) )
{
await callback.Run( call ).ConfigureAwait( false );
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@


using System;
using System.Threading.Tasks;

namespace SteamKit2.Internal
{
Expand All @@ -16,22 +17,24 @@ namespace SteamKit2.Internal
abstract class CallbackBase
{
internal abstract Type CallbackType { get; }
internal abstract void Run( CallbackMsg callback );
internal abstract ValueTask Run( CallbackMsg callback );
}

sealed class Callback<TCall> : CallbackBase, IDisposable
where TCall : CallbackMsg
{
CallbackManager? mgr;

public JobID JobID { get; set; }
public JobID JobID { get; }

public Action<TCall> OnRun { get; set; }
public Action<TCall> OnRun { get; }

internal override Type CallbackType => typeof( TCall );

public Callback( Action<TCall> func, CallbackManager mgr, JobID jobID )
{
ArgumentNullException.ThrowIfNull( func );

this.JobID = jobID;
this.OnRun = func;
this.mgr = mgr;
Expand All @@ -52,13 +55,60 @@ public void Dispose()
System.GC.SuppressFinalize( this );
}

internal override void Run( CallbackMsg callback )
internal override ValueTask Run( CallbackMsg callback )
{
var cb = callback as TCall;
if ( cb != null && ( cb.JobID == JobID || JobID == JobID.Invalid ) && OnRun != null )
if ( cb != null && ( cb.JobID == JobID || JobID == JobID.Invalid ) )
{
OnRun( cb );
}
return default;
}
}

sealed class AsyncCallback<TCall> : CallbackBase, IDisposable
where TCall : CallbackMsg
{
CallbackManager? mgr;

public JobID JobID { get; }

public Func<TCall, Task> OnRun { get; }

internal override Type CallbackType => typeof( TCall );

public AsyncCallback( Func<TCall, Task> func, CallbackManager mgr, JobID jobID )
{
ArgumentNullException.ThrowIfNull( func );

this.JobID = jobID;
this.OnRun = func;
this.mgr = mgr;

mgr.Register( this );
}

~AsyncCallback()
{
Dispose();
}

public void Dispose()
{
mgr?.Unregister( this );
mgr = null;

System.GC.SuppressFinalize( this );
}

internal override ValueTask Run( CallbackMsg callback )
{
var cb = callback as TCall;
if ( cb != null && ( cb.JobID == JobID || JobID == JobID.Invalid ) )
{
return new ValueTask( OnRun( cb ) );
}
return default;
}
}
}
Loading