diff --git a/PCL.Core.Test/App/EventBus/EventBusServiceTest.cs b/PCL.Core.Test/App/EventBus/EventBusServiceTest.cs new file mode 100644 index 000000000..73565e0f8 --- /dev/null +++ b/PCL.Core.Test/App/EventBus/EventBusServiceTest.cs @@ -0,0 +1,91 @@ +using Microsoft.VisualStudio.TestTools.UnitTesting; +using PCL.Core.App.EventBus; +using System; +using System.Threading.Tasks; + +namespace PCL.Core.Test.App.EventBus; + +[TestClass] +public class EventBusServiceTest +{ + private record MyEvent(Guid Id, string Name, int Value) : EventDataBase(Id, Name); + + [TestMethod] + public async Task Publish_Calls_Delegate_Handler() + { + var channel = "test-delegate-" + Guid.NewGuid(); + Assert.IsTrue(EventBusService.AddChannel(channel)); + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + using var sub = EventBusService.Subscribe(channel, ev => + { + tcs.TrySetResult(ev.Value == 42); + return Task.CompletedTask; + }); + + await EventBusService.PublishAsync(channel, new MyEvent(Guid.NewGuid(), "x", 42)); + + var completed = await Task.WhenAny(tcs.Task, Task.Delay(1000)); + Assert.AreEqual(tcs.Task, completed, "Handler was not invoked within timeout"); + Assert.IsTrue(await tcs.Task.ConfigureAwait(false)); + + EventBusService.RemoveChannel(channel); + } + + private class HandlerObject : IEventHandler + { + private readonly TaskCompletionSource _tcs; + public HandlerObject(TaskCompletionSource tcs) => _tcs = tcs; + public void Dispose() { } + public Task HandleEventAsync(MyEvent eventData) + { + _tcs.TrySetResult(eventData); + return _tcs.Task; + } + } + + [TestMethod] + public async Task Publish_Calls_IEventHandler_Instance() + { + var channel = "test-instance-" + Guid.NewGuid(); + Assert.IsTrue(EventBusService.AddChannel(channel)); + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var handler = new HandlerObject(tcs); + using var sub = EventBusService.Subscribe(channel, handler); + + await EventBusService.PublishAsync(channel, new MyEvent(Guid.NewGuid(), "y", 7)); + + var completed = await Task.WhenAny(tcs.Task, Task.Delay(1000)); + Assert.AreEqual(tcs.Task, completed, "IEventHandler instance was not invoked"); + Assert.AreEqual(7, (await tcs.Task.ConfigureAwait(false)).Value); + + EventBusService.RemoveChannel(channel); + } + + [TestMethod] + public async Task Unsubscribe_Prevents_Handler_Call() + { + var channel = "test-unsub-" + Guid.NewGuid(); + Assert.IsTrue(EventBusService.AddChannel(channel)); + + var called = false; + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var sub = EventBusService.Subscribe(channel, ev => + { + called = true; + tcs.TrySetResult(true); + return Task.CompletedTask; + }); + + sub.Dispose(); + + await EventBusService.PublishAsync(channel, new MyEvent(Guid.NewGuid(), "z", 1)); + + var completed = await Task.WhenAny(tcs.Task, Task.Delay(300)); + Assert.AreNotEqual(tcs.Task, completed, "Handler should not be called after unsubscribe"); + Assert.IsFalse(called, "Handler flag should remain false"); + + EventBusService.RemoveChannel(channel); + } +} diff --git a/PCL.Core/App/EventBus/EventBusService.cs b/PCL.Core/App/EventBus/EventBusService.cs new file mode 100644 index 000000000..3f6eef83e --- /dev/null +++ b/PCL.Core/App/EventBus/EventBusService.cs @@ -0,0 +1,261 @@ +using PCL.Core.App.IoC; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace PCL.Core.App.EventBus; + +[LifecycleService(LifecycleState.BeforeLoading)] +[LifecycleScope("eventbus", "EventBus")] +public sealed partial class EventBusService +{ + private static readonly ConcurrentDictionary Handler, WeakReference? OwnerRef)>>> _Channels = []; + + /// + /// 0 = running, 1 = stopping/closed + /// + private static int _isStopping; + + [LifecycleStop] + private static Task _StopAsync() + { + Interlocked.Exchange(ref _isStopping, 1); + try + { + var channelCount = _Channels.Count; + var handlerCount = _Channels.Values.Sum(c => c.Values.Sum(h => h.Count)); + _Channels.Clear(); + Context.Info($"EventBus stopping: cleared {channelCount} channels and {handlerCount} handlers."); + return Task.CompletedTask; + } + catch (Exception exception) + { + Context.Error($"Exception while stopping EventBus: {exception}"); + return Task.FromException(exception); + } + } + + /// + /// Publish an event to a channel. All handlers subscribed to this channel with compatible event data type will be invoked. + /// + /// EventBus is stopping + public static Task PublishAsync(string channelName, TEventData data) where TEventData : EventDataBase + { + if (Volatile.Read(ref _isStopping) != 0) throw new InvalidOperationException("EventBus is stopping"); + return _CallChannelAsync(channelName, data); + } + + /// + /// 订阅使用 IEventHandler{TEventData} 的对象实例。 + /// 返回 用于取消订阅。 + /// + /// EventBus is stopping + /// Failed to create channel + /// is + public static IDisposable Subscribe(string channel, IEventHandler handler, bool disposeOwnerOnUnsubscribe = false) + where TEventData : EventDataBase + { + if (Volatile.Read(ref _isStopping) != 0) throw new InvalidOperationException("EventBus is stopping"); + if (string.IsNullOrWhiteSpace(channel)) throw new ArgumentNullException(nameof(channel)); + if (handler == null) throw new ArgumentNullException(nameof(handler)); + + if (!_Channels.TryGetValue(channel, out var dataHandler)) + { + Context.Trace($"Channel {channel} not found."); + //throw new InvalidOperationException("No channel found for the given channel identification."); + + // create channel if not exist + var success = AddChannel(channel); + if (!success) + { + throw new InvalidOperationException("Failed to create channel."); + } + } + + dataHandler ??= _Channels[channel]; // ensure dataHandler is not null here + + var dataType = typeof(TEventData); + var handlers = dataHandler.GetOrAdd(dataType, _ => []); + + var ownerRef = new WeakReference(handler); + + var id = Guid.NewGuid(); + handlers.TryAdd(id, (Wrapper, ownerRef)); + + return new Subscription(() => + { + handlers.TryRemove(id, out _); + if (handlers.IsEmpty) + { + dataHandler.TryRemove(dataType, out _); + } + + // disposal responsibility: if this subscription requested owner disposal, try to dispose target if still alive + if (disposeOwnerOnUnsubscribe && + ownerRef.TryGetTarget(out var tgt) && + tgt is IDisposable d) + { + // check if any remaining subscription in this channel still references the same owner + var stillReferenced = dataHandler.Values.Any(dict => dict.Values.Any(e => e.OwnerRef != null && e.OwnerRef.TryGetTarget(out var other) && ReferenceEquals(other, tgt))); + + if (stillReferenced) return; + + try { d.Dispose(); } catch (Exception ex) { Context.Error($"Exception disposing subscription owner: {ex}"); } + } + }); + + Task Wrapper(EventDataBase ev) + { + if (ownerRef.TryGetTarget(out var target) && target is IEventHandler typed) + { + return typed.HandleEventAsync((TEventData)ev); + } + return Task.CompletedTask; + } + } + + /// + /// 订阅一个委托 + /// + /// EventBus is stopping + /// Failed to create channel + /// is + public static IDisposable Subscribe(string channel, Func handler) + where TEventData : EventDataBase + { + if (Volatile.Read(ref _isStopping) != 0) throw new InvalidOperationException("EventBus is stopping"); + if (string.IsNullOrWhiteSpace(channel)) throw new ArgumentNullException(nameof(channel)); + if (handler == null) throw new ArgumentNullException(nameof(handler)); + + if (!_Channels.TryGetValue(channel, out var dataHandler)) + { + Context.Trace($"Channel {channel} not found."); + //throw new InvalidOperationException("No channel found for the given channel identification."); + + // create channel if not exist + var success = AddChannel(channel); + if (!success) + { + throw new InvalidOperationException("Failed to create channel."); + } + + } + + dataHandler ??= _Channels[channel]; // ensure dataHandler is not null here + + var dataType = typeof(TEventData); + var handlers = dataHandler.GetOrAdd(dataType, _ => []); + + var id = Guid.NewGuid(); + handlers.TryAdd(id, (Wrapper, null)); + + return new Subscription(() => + { + handlers.TryRemove(id, out _); + if (handlers.IsEmpty) + { + dataHandler.TryRemove(dataType, out _); + } + }); + + Task Wrapper(EventDataBase ev) => handler((TEventData)ev); + } + + /// + /// 创建 channel(显式) + /// + public static bool AddChannel(string name) => !string.IsNullOrWhiteSpace(name) && _Channels.TryAdd(name, []); + + /// + /// Remove a channel and all its handlers. Use with caution. + /// + /// Channel name. + /// if the channel was removed; otherwise, . + public static bool RemoveChannel(string name) => _Channels.TryRemove(name, out _); + + private static Task _CallChannelAsync(string channel, TEventData data) + where TEventData : EventDataBase + { + if (!_Channels.TryGetValue(channel, out var eventHandlers)) + { + Context.Error($"Channel {channel} not found."); + throw new InvalidOperationException("No channel found for the given channel identification."); + } + + return _CallEventHandlerAsync(data, eventHandlers); + } + + private static Task _CallEventHandlerAsync(TEventData data, + ConcurrentDictionary Handler, WeakReference? OwnerRef)>> dataHandlers) + where TEventData : EventDataBase + { + var eventType = data.GetType(); + + var matching = new List>(); + foreach (var (registeredType, handlers) in dataHandlers) + { + if (registeredType.IsAssignableFrom(eventType)) + { + foreach (var kv in handlers.ToImmutableArray()) + { + var key = kv.Key; + var entry = kv.Value; + if (entry.OwnerRef is not null) + { + if (!entry.OwnerRef.TryGetTarget(out var _)) + { + // owner was collected, remove this subscription + handlers.TryRemove(key, out _); + continue; + } + } + matching.Add(entry.Handler); + } + } + } + + if (matching.Count == 0) + { + Context.Trace($"No handler found for event data type {eventType.Name}"); + return Task.CompletedTask; + // will not throw Exception + //throw new InvalidOperationException("No handler found for the given event data type."); + } + + var tasks = matching.Select(async h => + { + try + { + await h(data).ConfigureAwait(false); + } + catch (Exception ex) + { + Context.Error($"Event handler threw an exception: {ex}"); + } + }).ToImmutableArray(); + + return Task.WhenAll(tasks); + } + + + + private sealed class Subscription : IDisposable + { + private Action? _dispose; + + /// The dispose action is null. + public Subscription(Action dispose) => _dispose = dispose ?? throw new ArgumentNullException(nameof(dispose)); + + /// A delegate callback throws an exception. + public void Dispose() + { + var d = Interlocked.Exchange(ref _dispose, null); + d?.Invoke(); + } + } +} diff --git a/PCL.Core/App/EventBus/EventDataBase.cs b/PCL.Core/App/EventBus/EventDataBase.cs new file mode 100644 index 000000000..d83d5c70c --- /dev/null +++ b/PCL.Core/App/EventBus/EventDataBase.cs @@ -0,0 +1,5 @@ +using System; + +namespace PCL.Core.App.EventBus; + +public record EventDataBase(Guid Id, string Name); \ No newline at end of file diff --git a/PCL.Core/App/EventBus/IEventHandler.cs b/PCL.Core/App/EventBus/IEventHandler.cs new file mode 100644 index 000000000..24adbcb79 --- /dev/null +++ b/PCL.Core/App/EventBus/IEventHandler.cs @@ -0,0 +1,14 @@ +using System; +using System.Threading.Tasks; + +namespace PCL.Core.App.EventBus; + +public interface IEventHandler : IDisposable + where TEventData : EventDataBase +{ + /// + /// Handle a event with the data, and the event is published by a publisher. + /// + /// The data that published by a publisher. + Task HandleEventAsync(TEventData eventData); +} \ No newline at end of file diff --git a/PCL.Core/App/EventBus/IResponsibleEventHandler.cs b/PCL.Core/App/EventBus/IResponsibleEventHandler.cs new file mode 100644 index 000000000..cf9ef5ddd --- /dev/null +++ b/PCL.Core/App/EventBus/IResponsibleEventHandler.cs @@ -0,0 +1,13 @@ +using System.Threading.Tasks; + +namespace PCL.Core.App.EventBus; + +// I think this is too hard to implement. So this is Obsolete. +public interface IResponsibleEventHandler +{ + /// + /// Handle a event with the data, and the event is published by a publisher, and return a response to the publisher. + /// + /// The event data that published by a publisher + Task HandleEventAsync(EventDataBase eventData); +} \ No newline at end of file