Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions PCL.Core.Test/App/EventBus/EventBusServiceTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using PCL.Core.App.EventBus;
using System;
using System.Threading.Tasks;

namespace PCL.Core.Test.App.EventBus;

[TestClass]
public class EventBusServiceTest
{
private record MyEvent(Guid Id, string Name, int Value) : EventDataBase(Id, Name);

[TestMethod]
public async Task Publish_Calls_Delegate_Handler()
{
var channel = "test-delegate-" + Guid.NewGuid();
Assert.IsTrue(EventBusService.AddChannel(channel));

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
using var sub = EventBusService.Subscribe<MyEvent>(channel, ev =>
{
tcs.TrySetResult(ev.Value == 42);
return Task.CompletedTask;
});

await EventBusService.PublishAsync(channel, new MyEvent(Guid.NewGuid(), "x", 42));

var completed = await Task.WhenAny(tcs.Task, Task.Delay(1000));
Assert.AreEqual(tcs.Task, completed, "Handler was not invoked within timeout");
Assert.IsTrue(await tcs.Task.ConfigureAwait(false));

EventBusService.RemoveChannel(channel);
}

private class HandlerObject : IEventHandler<MyEvent>
{
private readonly TaskCompletionSource<MyEvent> _tcs;
public HandlerObject(TaskCompletionSource<MyEvent> tcs) => _tcs = tcs;
public void Dispose() { }
public Task HandleEventAsync(MyEvent eventData)
{
_tcs.TrySetResult(eventData);
return _tcs.Task;
}
}

[TestMethod]
public async Task Publish_Calls_IEventHandler_Instance()
{
var channel = "test-instance-" + Guid.NewGuid();
Assert.IsTrue(EventBusService.AddChannel(channel));

var tcs = new TaskCompletionSource<MyEvent>(TaskCreationOptions.RunContinuationsAsynchronously);
var handler = new HandlerObject(tcs);
using var sub = EventBusService.Subscribe<MyEvent>(channel, handler);

await EventBusService.PublishAsync(channel, new MyEvent(Guid.NewGuid(), "y", 7));

var completed = await Task.WhenAny(tcs.Task, Task.Delay(1000));
Assert.AreEqual(tcs.Task, completed, "IEventHandler instance was not invoked");
Assert.AreEqual(7, (await tcs.Task.ConfigureAwait(false)).Value);

EventBusService.RemoveChannel(channel);
}

[TestMethod]
public async Task Unsubscribe_Prevents_Handler_Call()
{
var channel = "test-unsub-" + Guid.NewGuid();
Assert.IsTrue(EventBusService.AddChannel(channel));

var called = false;
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var sub = EventBusService.Subscribe<MyEvent>(channel, ev =>
{
called = true;
tcs.TrySetResult(true);
return Task.CompletedTask;
});

sub.Dispose();

await EventBusService.PublishAsync(channel, new MyEvent(Guid.NewGuid(), "z", 1));

var completed = await Task.WhenAny(tcs.Task, Task.Delay(300));
Assert.AreNotEqual(tcs.Task, completed, "Handler should not be called after unsubscribe");
Assert.IsFalse(called, "Handler flag should remain false");

EventBusService.RemoveChannel(channel);
}
}
261 changes: 261 additions & 0 deletions PCL.Core/App/EventBus/EventBusService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
using PCL.Core.App.IoC;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace PCL.Core.App.EventBus;

[LifecycleService(LifecycleState.BeforeLoading)]
[LifecycleScope("eventbus", "EventBus")]
public sealed partial class EventBusService
{
private static readonly ConcurrentDictionary<string,
ConcurrentDictionary<Type, ConcurrentDictionary<Guid, (Func<EventDataBase, Task> Handler, WeakReference<object>? OwnerRef)>>> _Channels = [];

/// <summary>
/// 0 = running, 1 = stopping/closed
/// </summary>
private static int _isStopping;

[LifecycleStop]
private static Task _StopAsync()
{
Interlocked.Exchange(ref _isStopping, 1);
try
{
var channelCount = _Channels.Count;
var handlerCount = _Channels.Values.Sum(c => c.Values.Sum(h => h.Count));
_Channels.Clear();
Context.Info($"EventBus stopping: cleared {channelCount} channels and {handlerCount} handlers.");
return Task.CompletedTask;
}
catch (Exception exception)
{
Context.Error($"Exception while stopping EventBus: {exception}");
return Task.FromException(exception);
}
}

/// <summary>
/// Publish an event to a channel. All handlers subscribed to this channel with compatible event data type will be invoked.
/// </summary>
/// <exception cref="InvalidOperationException">EventBus is stopping</exception>
public static Task PublishAsync<TEventData>(string channelName, TEventData data) where TEventData : EventDataBase
{
if (Volatile.Read(ref _isStopping) != 0) throw new InvalidOperationException("EventBus is stopping");
return _CallChannelAsync(channelName, data);
}

/// <summary>
/// 订阅使用 <c>IEventHandler{TEventData}</c> 的对象实例。
/// 返回 <see cref="IDisposable"/> 用于取消订阅。
/// </summary>
/// <exception cref="InvalidOperationException">EventBus is stopping</exception>
/// <exception cref="InvalidOperationException">Failed to create channel</exception>
/// <exception cref="ArgumentNullException"><paramref name="channel"/> is <see langword="null"/></exception>
public static IDisposable Subscribe<TEventData>(string channel, IEventHandler<TEventData> handler, bool disposeOwnerOnUnsubscribe = false)
where TEventData : EventDataBase
{
if (Volatile.Read(ref _isStopping) != 0) throw new InvalidOperationException("EventBus is stopping");
if (string.IsNullOrWhiteSpace(channel)) throw new ArgumentNullException(nameof(channel));
if (handler == null) throw new ArgumentNullException(nameof(handler));

if (!_Channels.TryGetValue(channel, out var dataHandler))
{
Context.Trace($"Channel {channel} not found.");
//throw new InvalidOperationException("No channel found for the given channel identification.");

// create channel if not exist
var success = AddChannel(channel);
if (!success)
{
throw new InvalidOperationException("Failed to create channel.");
}
}

dataHandler ??= _Channels[channel]; // ensure dataHandler is not null here

var dataType = typeof(TEventData);
var handlers = dataHandler.GetOrAdd(dataType, _ => []);

var ownerRef = new WeakReference<object>(handler);

var id = Guid.NewGuid();
handlers.TryAdd(id, (Wrapper, ownerRef));

return new Subscription(() =>
{
handlers.TryRemove(id, out _);
if (handlers.IsEmpty)
{
dataHandler.TryRemove(dataType, out _);
}

// disposal responsibility: if this subscription requested owner disposal, try to dispose target if still alive
if (disposeOwnerOnUnsubscribe &&
ownerRef.TryGetTarget(out var tgt) &&
tgt is IDisposable d)
{
// check if any remaining subscription in this channel still references the same owner
var stillReferenced = dataHandler.Values.Any(dict => dict.Values.Any(e => e.OwnerRef != null && e.OwnerRef.TryGetTarget(out var other) && ReferenceEquals(other, tgt)));

if (stillReferenced) return;

try { d.Dispose(); } catch (Exception ex) { Context.Error($"Exception disposing subscription owner: {ex}"); }
}
});

Task Wrapper(EventDataBase ev)
{
if (ownerRef.TryGetTarget(out var target) && target is IEventHandler<TEventData> typed)
{
return typed.HandleEventAsync((TEventData)ev);
}
return Task.CompletedTask;
}
}

/// <summary>
/// 订阅一个委托
/// </summary>
/// <exception cref="InvalidOperationException">EventBus is stopping</exception>
/// <exception cref="InvalidOperationException">Failed to create channel</exception>
/// <exception cref="ArgumentNullException"><paramref name="channel"/> is <see langword="null"/></exception>
public static IDisposable Subscribe<TEventData>(string channel, Func<TEventData, Task> handler)
where TEventData : EventDataBase
{
if (Volatile.Read(ref _isStopping) != 0) throw new InvalidOperationException("EventBus is stopping");
if (string.IsNullOrWhiteSpace(channel)) throw new ArgumentNullException(nameof(channel));
if (handler == null) throw new ArgumentNullException(nameof(handler));

if (!_Channels.TryGetValue(channel, out var dataHandler))
{
Context.Trace($"Channel {channel} not found.");
//throw new InvalidOperationException("No channel found for the given channel identification.");

// create channel if not exist
var success = AddChannel(channel);
if (!success)
{
throw new InvalidOperationException("Failed to create channel.");
}

}

dataHandler ??= _Channels[channel]; // ensure dataHandler is not null here

var dataType = typeof(TEventData);
var handlers = dataHandler.GetOrAdd(dataType, _ => []);

var id = Guid.NewGuid();
handlers.TryAdd(id, (Wrapper, null));

return new Subscription(() =>
{
handlers.TryRemove(id, out _);
if (handlers.IsEmpty)
{
dataHandler.TryRemove(dataType, out _);
}
});

Task Wrapper(EventDataBase ev) => handler((TEventData)ev);
}

/// <summary>
/// 创建 channel(显式)
/// </summary>
public static bool AddChannel(string name) => !string.IsNullOrWhiteSpace(name) && _Channels.TryAdd(name, []);

/// <summary>
/// Remove a channel and all its handlers. Use with caution.
/// </summary>
/// <param name="name">Channel name.</param>
/// <returns><see langword="true"/> if the channel was removed; otherwise, <see langword="false"/>.</returns>
public static bool RemoveChannel(string name) => _Channels.TryRemove(name, out _);

private static Task _CallChannelAsync<TEventData>(string channel, TEventData data)
where TEventData : EventDataBase
{
if (!_Channels.TryGetValue(channel, out var eventHandlers))
{
Context.Error($"Channel {channel} not found.");
throw new InvalidOperationException("No channel found for the given channel identification.");
}

return _CallEventHandlerAsync(data, eventHandlers);
}

private static Task _CallEventHandlerAsync<TEventData>(TEventData data,
ConcurrentDictionary<Type, ConcurrentDictionary<Guid, (Func<EventDataBase, Task> Handler, WeakReference<object>? OwnerRef)>> dataHandlers)
where TEventData : EventDataBase
{
var eventType = data.GetType();

var matching = new List<Func<EventDataBase, Task>>();
foreach (var (registeredType, handlers) in dataHandlers)
{
if (registeredType.IsAssignableFrom(eventType))
{
foreach (var kv in handlers.ToImmutableArray())
{
var key = kv.Key;
var entry = kv.Value;
if (entry.OwnerRef is not null)
{
if (!entry.OwnerRef.TryGetTarget(out var _))
{
// owner was collected, remove this subscription
handlers.TryRemove(key, out _);
continue;
}
}
matching.Add(entry.Handler);
}
}
}

if (matching.Count == 0)
{
Context.Trace($"No handler found for event data type {eventType.Name}");
return Task.CompletedTask;
// will not throw Exception
//throw new InvalidOperationException("No handler found for the given event data type.");
}

var tasks = matching.Select(async h =>
{
try
{
await h(data).ConfigureAwait(false);
}
catch (Exception ex)
{
Context.Error($"Event handler threw an exception: {ex}");
}
}).ToImmutableArray();

return Task.WhenAll(tasks);
}



private sealed class Subscription : IDisposable
{
private Action? _dispose;

/// <exception cref="ArgumentNullException">The dispose action is null.</exception>
public Subscription(Action dispose) => _dispose = dispose ?? throw new ArgumentNullException(nameof(dispose));

/// <exception cref="Exception">A delegate callback throws an exception.</exception>
public void Dispose()
{
var d = Interlocked.Exchange(ref _dispose, null);
d?.Invoke();
}
}
}
5 changes: 5 additions & 0 deletions PCL.Core/App/EventBus/EventDataBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
using System;

namespace PCL.Core.App.EventBus;

public record EventDataBase(Guid Id, string Name);
14 changes: 14 additions & 0 deletions PCL.Core/App/EventBus/IEventHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using System.Threading.Tasks;

namespace PCL.Core.App.EventBus;

public interface IEventHandler<in TEventData> : IDisposable
where TEventData : EventDataBase
{
/// <summary>
/// Handle a event with the data, and the event is published by a publisher.
/// </summary>
/// <param name="eventData">The data that published by a publisher.</param>
Task HandleEventAsync(TEventData eventData);
}
13 changes: 13 additions & 0 deletions PCL.Core/App/EventBus/IResponsibleEventHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System.Threading.Tasks;

namespace PCL.Core.App.EventBus;

// I think this is too hard to implement. So this is Obsolete.
public interface IResponsibleEventHandler<TResponse>
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
Comment thread
whitecat346 marked this conversation as resolved.
{
/// <summary>
/// Handle a event with the data, and the event is published by a publisher, and return a response to the publisher.
/// </summary>
/// <param name="eventData">The event data that published by a publisher</param>
Task<TResponse> HandleEventAsync(EventDataBase eventData);
}
Loading