From c15f7ab2da570eed95fee973f5e377aae665d510 Mon Sep 17 00:00:00 2001 From: WhiteCAT <64885812+whitecat346@users.noreply.github.com> Date: Sat, 23 May 2026 14:21:18 +0800 Subject: [PATCH 1/4] feat: event bus finished --- .../App/EventBus/EventBusServiceTest.cs | 91 +++++++++ PCL.Core/App/EventBus/EventBusService.cs | 184 ++++++++++++++++++ PCL.Core/App/EventBus/EventDataBase.cs | 5 + PCL.Core/App/EventBus/IEventHandler.cs | 14 ++ .../App/EventBus/IResponsibleEventHandler.cs | 13 ++ 5 files changed, 307 insertions(+) create mode 100644 PCL.Core.Test/App/EventBus/EventBusServiceTest.cs create mode 100644 PCL.Core/App/EventBus/EventBusService.cs create mode 100644 PCL.Core/App/EventBus/EventDataBase.cs create mode 100644 PCL.Core/App/EventBus/IEventHandler.cs create mode 100644 PCL.Core/App/EventBus/IResponsibleEventHandler.cs diff --git a/PCL.Core.Test/App/EventBus/EventBusServiceTest.cs b/PCL.Core.Test/App/EventBus/EventBusServiceTest.cs new file mode 100644 index 000000000..73565e0f8 --- /dev/null +++ b/PCL.Core.Test/App/EventBus/EventBusServiceTest.cs @@ -0,0 +1,91 @@ +using Microsoft.VisualStudio.TestTools.UnitTesting; +using PCL.Core.App.EventBus; +using System; +using System.Threading.Tasks; + +namespace PCL.Core.Test.App.EventBus; + +[TestClass] +public class EventBusServiceTest +{ + private record MyEvent(Guid Id, string Name, int Value) : EventDataBase(Id, Name); + + [TestMethod] + public async Task Publish_Calls_Delegate_Handler() + { + var channel = "test-delegate-" + Guid.NewGuid(); + Assert.IsTrue(EventBusService.AddChannel(channel)); + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + using var sub = EventBusService.Subscribe(channel, ev => + { + tcs.TrySetResult(ev.Value == 42); + return Task.CompletedTask; + }); + + await EventBusService.PublishAsync(channel, new MyEvent(Guid.NewGuid(), "x", 42)); + + var completed = await Task.WhenAny(tcs.Task, Task.Delay(1000)); + Assert.AreEqual(tcs.Task, completed, "Handler was not invoked within timeout"); + Assert.IsTrue(await tcs.Task.ConfigureAwait(false)); + + EventBusService.RemoveChannel(channel); + } + + private class HandlerObject : IEventHandler + { + private readonly TaskCompletionSource _tcs; + public HandlerObject(TaskCompletionSource tcs) => _tcs = tcs; + public void Dispose() { } + public Task HandleEventAsync(MyEvent eventData) + { + _tcs.TrySetResult(eventData); + return _tcs.Task; + } + } + + [TestMethod] + public async Task Publish_Calls_IEventHandler_Instance() + { + var channel = "test-instance-" + Guid.NewGuid(); + Assert.IsTrue(EventBusService.AddChannel(channel)); + + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var handler = new HandlerObject(tcs); + using var sub = EventBusService.Subscribe(channel, handler); + + await EventBusService.PublishAsync(channel, new MyEvent(Guid.NewGuid(), "y", 7)); + + var completed = await Task.WhenAny(tcs.Task, Task.Delay(1000)); + Assert.AreEqual(tcs.Task, completed, "IEventHandler instance was not invoked"); + Assert.AreEqual(7, (await tcs.Task.ConfigureAwait(false)).Value); + + EventBusService.RemoveChannel(channel); + } + + [TestMethod] + public async Task Unsubscribe_Prevents_Handler_Call() + { + var channel = "test-unsub-" + Guid.NewGuid(); + Assert.IsTrue(EventBusService.AddChannel(channel)); + + var called = false; + var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var sub = EventBusService.Subscribe(channel, ev => + { + called = true; + tcs.TrySetResult(true); + return Task.CompletedTask; + }); + + sub.Dispose(); + + await EventBusService.PublishAsync(channel, new MyEvent(Guid.NewGuid(), "z", 1)); + + var completed = await Task.WhenAny(tcs.Task, Task.Delay(300)); + Assert.AreNotEqual(tcs.Task, completed, "Handler should not be called after unsubscribe"); + Assert.IsFalse(called, "Handler flag should remain false"); + + EventBusService.RemoveChannel(channel); + } +} diff --git a/PCL.Core/App/EventBus/EventBusService.cs b/PCL.Core/App/EventBus/EventBusService.cs new file mode 100644 index 000000000..e5faa7530 --- /dev/null +++ b/PCL.Core/App/EventBus/EventBusService.cs @@ -0,0 +1,184 @@ +using PCL.Core.App.IoC; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace PCL.Core.App.EventBus; + +[LifecycleService(LifecycleState.BeforeLoading)] +[LifecycleScope("eventbus", "EventBus")] +public sealed partial class EventBusService +{ + private static readonly ConcurrentDictionary Handler, object? Owner)>>> _Channels = []; + + [LifecycleStop] + private static Task _StopAsync() + { + try + { + foreach (var channel in _Channels.Values) + { + foreach (var handlersByType in channel.Values) + { + foreach (var entry in handlersByType.Values) + { + if (entry.Owner is IDisposable d) + { + try { d.Dispose(); } catch { /* ignore */ } + } + } + } + } + + _Channels.Clear(); + return Task.CompletedTask; + } + catch (Exception exception) + { + return Task.FromException(exception); + } + } + + public static Task PublishAsync(string channelName, TEventData data) where TEventData : EventDataBase + => _CallChannelAsync(channelName, data); + + /// + /// 订阅使用 IEventHandler{TEventData} 的对象实例。 + /// 返回 用于取消订阅。 + /// + public static IDisposable Subscribe(string channel, IEventHandler handler) + where TEventData : EventDataBase + { + if (string.IsNullOrWhiteSpace(channel)) throw new ArgumentNullException(nameof(channel)); + if (handler == null) throw new ArgumentNullException(nameof(handler)); + + if (!_Channels.TryGetValue(channel, out var dataHandler)) + { + Context.Error($"Channel {channel} not found."); + throw new InvalidOperationException("No channel found for the given channel identification."); + } + + var dataType = typeof(TEventData); + var handlers = dataHandler.GetOrAdd(dataType, _ => []); + + var id = Guid.NewGuid(); + handlers.TryAdd(id, (Wrapper, handler)); + + return new Subscription(() => + { + handlers.TryRemove(id, out _); + if (handlers.IsEmpty) + { + dataHandler.TryRemove(dataType, out _); + } + }); + + Task Wrapper(EventDataBase ev) => handler.HandleEventAsync((TEventData)ev); + } + + /// + /// 订阅一个委托(更轻量) + /// + public static IDisposable Subscribe(string channel, Func handler) + where TEventData : EventDataBase + { + if (string.IsNullOrWhiteSpace(channel)) throw new ArgumentNullException(nameof(channel)); + if (handler == null) throw new ArgumentNullException(nameof(handler)); + + if (!_Channels.TryGetValue(channel, out var dataHandler)) + { + Context.Error($"Channel {channel} not found."); + throw new InvalidOperationException("No channel found for the given channel identification."); + } + + var dataType = typeof(TEventData); + var handlers = dataHandler.GetOrAdd(dataType, _ => []); + + var id = Guid.NewGuid(); + handlers.TryAdd(id, (Wrapper, null)); + + return new Subscription(() => + { + handlers.TryRemove(id, out _); + if (handlers.IsEmpty) + { + dataHandler.TryRemove(dataType, out _); + } + }); + + Task Wrapper(EventDataBase ev) => handler((TEventData)ev); + } + + /// + /// 创建 channel(显式) + /// + public static bool AddChannel(string name) => !string.IsNullOrWhiteSpace(name) && _Channels.TryAdd(name, []); + + public static bool RemoveChannel(string name) => _Channels.TryRemove(name, out _); + + private static Task _CallChannelAsync(string channel, TEventData data) + where TEventData : EventDataBase + { + if (!_Channels.TryGetValue(channel, out var eventHandlers)) + { + Context.Error($"Channel {channel} not found."); + throw new InvalidOperationException("No channel found for the given channel identification."); + } + + return _CallEventHandlerAsync(data, eventHandlers); + } + + private static Task _CallEventHandlerAsync(TEventData data, ConcurrentDictionary Handler, object? Owner)>> dataHandlers) + where TEventData : EventDataBase + { + var eventType = data.GetType(); + + var matching = new List>(); + foreach (var (registeredType, handlers) in dataHandlers) + { + if (registeredType.IsAssignableFrom(eventType)) + { + foreach (var entry in handlers.Values.ToImmutableArray()) + { + matching.Add(entry.Handler); + } + } + } + + if (matching.Count == 0) + { + Context.Error($"No handler found for event data type {eventType.Name}"); + throw new InvalidOperationException("No handler found for the given event data type."); + } + + var tasks = matching.Select(async h => + { + try + { + await h(data).ConfigureAwait(false); + } + catch (Exception ex) + { + Context.Error($"Event handler threw an exception: {ex}"); + } + }).ToImmutableArray(); + + return Task.WhenAll(tasks); + } + + private sealed class Subscription : IDisposable + { + private Action? _dispose; + public Subscription(Action dispose) => _dispose = dispose ?? throw new ArgumentNullException(nameof(dispose)); + public void Dispose() + { + var d = Interlocked.Exchange(ref _dispose, null); + d?.Invoke(); + } + } +} diff --git a/PCL.Core/App/EventBus/EventDataBase.cs b/PCL.Core/App/EventBus/EventDataBase.cs new file mode 100644 index 000000000..d83d5c70c --- /dev/null +++ b/PCL.Core/App/EventBus/EventDataBase.cs @@ -0,0 +1,5 @@ +using System; + +namespace PCL.Core.App.EventBus; + +public record EventDataBase(Guid Id, string Name); \ No newline at end of file diff --git a/PCL.Core/App/EventBus/IEventHandler.cs b/PCL.Core/App/EventBus/IEventHandler.cs new file mode 100644 index 000000000..24adbcb79 --- /dev/null +++ b/PCL.Core/App/EventBus/IEventHandler.cs @@ -0,0 +1,14 @@ +using System; +using System.Threading.Tasks; + +namespace PCL.Core.App.EventBus; + +public interface IEventHandler : IDisposable + where TEventData : EventDataBase +{ + /// + /// Handle a event with the data, and the event is published by a publisher. + /// + /// The data that published by a publisher. + Task HandleEventAsync(TEventData eventData); +} \ No newline at end of file diff --git a/PCL.Core/App/EventBus/IResponsibleEventHandler.cs b/PCL.Core/App/EventBus/IResponsibleEventHandler.cs new file mode 100644 index 000000000..cf9ef5ddd --- /dev/null +++ b/PCL.Core/App/EventBus/IResponsibleEventHandler.cs @@ -0,0 +1,13 @@ +using System.Threading.Tasks; + +namespace PCL.Core.App.EventBus; + +// I think this is too hard to implement. So this is Obsolete. +public interface IResponsibleEventHandler +{ + /// + /// Handle a event with the data, and the event is published by a publisher, and return a response to the publisher. + /// + /// The event data that published by a publisher + Task HandleEventAsync(EventDataBase eventData); +} \ No newline at end of file From 3ece80402a034b8e27bd8d3b6642947b4cc75df5 Mon Sep 17 00:00:00 2001 From: WhiteCAT <64885812+whitecat346@users.noreply.github.com> Date: Sat, 23 May 2026 14:40:15 +0800 Subject: [PATCH 2/4] fix: fix some probs --- PCL.Core/App/EventBus/EventBusService.cs | 83 +++++++++++++++++------- 1 file changed, 60 insertions(+), 23 deletions(-) diff --git a/PCL.Core/App/EventBus/EventBusService.cs b/PCL.Core/App/EventBus/EventBusService.cs index e5faa7530..0ca08fbce 100644 --- a/PCL.Core/App/EventBus/EventBusService.cs +++ b/PCL.Core/App/EventBus/EventBusService.cs @@ -14,46 +14,49 @@ namespace PCL.Core.App.EventBus; public sealed partial class EventBusService { private static readonly ConcurrentDictionary Handler, object? Owner)>>> _Channels = []; + ConcurrentDictionary Handler, WeakReference? OwnerRef, bool OwnsOwner)>>> _Channels = []; + + /// + /// 0 = running, 1 = stopping/closed + /// + private static int _isStopping; [LifecycleStop] private static Task _StopAsync() { + Interlocked.Exchange(ref _isStopping, 1); try { - foreach (var channel in _Channels.Values) - { - foreach (var handlersByType in channel.Values) - { - foreach (var entry in handlersByType.Values) - { - if (entry.Owner is IDisposable d) - { - try { d.Dispose(); } catch { /* ignore */ } - } - } - } - } - + var channelCount = _Channels.Count; + var handlerCount = _Channels.Values.Sum(c => c.Values.Sum(h => h.Count)); _Channels.Clear(); + Context.Error($"EventBus stopping: cleared {channelCount} channels and {handlerCount} handlers."); return Task.CompletedTask; } catch (Exception exception) { + Context.Error($"Exception while stopping EventBus: {exception}"); return Task.FromException(exception); } } + /// EventBus is stopping public static Task PublishAsync(string channelName, TEventData data) where TEventData : EventDataBase - => _CallChannelAsync(channelName, data); + { + if (Volatile.Read(ref _isStopping) != 0) throw new InvalidOperationException("EventBus is stopping"); + return _CallChannelAsync(channelName, data); + } /// /// 订阅使用 IEventHandler{TEventData} 的对象实例。 /// 返回 用于取消订阅。 /// - public static IDisposable Subscribe(string channel, IEventHandler handler) + /// EventBus is stopping + /// is + public static IDisposable Subscribe(string channel, IEventHandler handler, bool disposeOwnerOnUnsubscribe = false) where TEventData : EventDataBase { + if (Volatile.Read(ref _isStopping) != 0) throw new InvalidOperationException("EventBus is stopping"); if (string.IsNullOrWhiteSpace(channel)) throw new ArgumentNullException(nameof(channel)); if (handler == null) throw new ArgumentNullException(nameof(handler)); @@ -64,10 +67,12 @@ public static IDisposable Subscribe(string channel, IEventHandler []); + var handlers = dataHandler.GetOrAdd(dataType, _ => new ConcurrentDictionary, WeakReference?, bool)>()); + + var ownerRef = new WeakReference(handler); var id = Guid.NewGuid(); - handlers.TryAdd(id, (Wrapper, handler)); + handlers.TryAdd(id, (Wrapper, ownerRef, disposeOwnerOnUnsubscribe)); return new Subscription(() => { @@ -76,9 +81,27 @@ public static IDisposable Subscribe(string channel, IEventHandler dict.Values.Any(e => e.OwnerRef != null && e.OwnerRef.TryGetTarget(out var other) && ReferenceEquals(other, tgt))); + + if (stillReferenced) return; + + try { d.Dispose(); } catch (Exception ex) { Context.Error($"Exception disposing subscription owner: {ex}"); } + } }); - Task Wrapper(EventDataBase ev) => handler.HandleEventAsync((TEventData)ev); + Task Wrapper(EventDataBase ev) + { + if (ownerRef.TryGetTarget(out var target) && target is IEventHandler typed) + { + return typed.HandleEventAsync((TEventData)ev); + } + return Task.CompletedTask; + } } /// @@ -87,6 +110,7 @@ public static IDisposable Subscribe(string channel, IEventHandler(string channel, Func handler) where TEventData : EventDataBase { + if (Volatile.Read(ref _isStopping) != 0) throw new InvalidOperationException("EventBus is stopping"); if (string.IsNullOrWhiteSpace(channel)) throw new ArgumentNullException(nameof(channel)); if (handler == null) throw new ArgumentNullException(nameof(handler)); @@ -100,7 +124,7 @@ public static IDisposable Subscribe(string channel, Func []); var id = Guid.NewGuid(); - handlers.TryAdd(id, (Wrapper, null)); + handlers.TryAdd(id, (Wrapper, null, false)); return new Subscription(() => { @@ -133,7 +157,7 @@ private static Task _CallChannelAsync(string channel, TEventData dat return _CallEventHandlerAsync(data, eventHandlers); } - private static Task _CallEventHandlerAsync(TEventData data, ConcurrentDictionary Handler, object? Owner)>> dataHandlers) + private static Task _CallEventHandlerAsync(TEventData data, ConcurrentDictionary Handler, WeakReference? OwnerRef, bool OwnsOwner)>> dataHandlers) where TEventData : EventDataBase { var eventType = data.GetType(); @@ -143,8 +167,19 @@ private static Task _CallEventHandlerAsync(TEventData data, Concurre { if (registeredType.IsAssignableFrom(eventType)) { - foreach (var entry in handlers.Values.ToImmutableArray()) + foreach (var kv in handlers.ToImmutableArray()) { + var key = kv.Key; + var entry = kv.Value; + if (entry.OwnerRef != null) + { + if (!entry.OwnerRef.TryGetTarget(out var _)) + { + // owner was collected, remove this subscription + handlers.TryRemove(key, out _); + continue; + } + } matching.Add(entry.Handler); } } @@ -171,6 +206,8 @@ private static Task _CallEventHandlerAsync(TEventData data, Concurre return Task.WhenAll(tasks); } + + private sealed class Subscription : IDisposable { private Action? _dispose; From 423d962c5cfadc454f2c1b4402b7ab5f93e09933 Mon Sep 17 00:00:00 2001 From: WhiteCAT <64885812+whitecat346@users.noreply.github.com> Date: Sat, 23 May 2026 14:46:35 +0800 Subject: [PATCH 3/4] fix: will not throw exception when publish an event without handler --- PCL.Core/App/EventBus/EventBusService.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/PCL.Core/App/EventBus/EventBusService.cs b/PCL.Core/App/EventBus/EventBusService.cs index 0ca08fbce..8532cc87c 100644 --- a/PCL.Core/App/EventBus/EventBusService.cs +++ b/PCL.Core/App/EventBus/EventBusService.cs @@ -188,7 +188,9 @@ private static Task _CallEventHandlerAsync(TEventData data, Concurre if (matching.Count == 0) { Context.Error($"No handler found for event data type {eventType.Name}"); - throw new InvalidOperationException("No handler found for the given event data type."); + return Task.CompletedTask; + // will not throw Exception + //throw new InvalidOperationException("No handler found for the given event data type."); } var tasks = matching.Select(async h => From e1c098d984c5915fefcf6d89c2088f122c68f44c Mon Sep 17 00:00:00 2001 From: WhiteCAT <64885812+whitecat346@users.noreply.github.com> Date: Sat, 23 May 2026 16:33:41 +0800 Subject: [PATCH 4/4] fix: fix some proms; add some comments --- PCL.Core/App/EventBus/EventBusService.cs | 66 +++++++++++++++++++----- 1 file changed, 52 insertions(+), 14 deletions(-) diff --git a/PCL.Core/App/EventBus/EventBusService.cs b/PCL.Core/App/EventBus/EventBusService.cs index 8532cc87c..3f6eef83e 100644 --- a/PCL.Core/App/EventBus/EventBusService.cs +++ b/PCL.Core/App/EventBus/EventBusService.cs @@ -14,7 +14,7 @@ namespace PCL.Core.App.EventBus; public sealed partial class EventBusService { private static readonly ConcurrentDictionary Handler, WeakReference? OwnerRef, bool OwnsOwner)>>> _Channels = []; + ConcurrentDictionary Handler, WeakReference? OwnerRef)>>> _Channels = []; /// /// 0 = running, 1 = stopping/closed @@ -30,7 +30,7 @@ private static Task _StopAsync() var channelCount = _Channels.Count; var handlerCount = _Channels.Values.Sum(c => c.Values.Sum(h => h.Count)); _Channels.Clear(); - Context.Error($"EventBus stopping: cleared {channelCount} channels and {handlerCount} handlers."); + Context.Info($"EventBus stopping: cleared {channelCount} channels and {handlerCount} handlers."); return Task.CompletedTask; } catch (Exception exception) @@ -40,6 +40,9 @@ private static Task _StopAsync() } } + /// + /// Publish an event to a channel. All handlers subscribed to this channel with compatible event data type will be invoked. + /// /// EventBus is stopping public static Task PublishAsync(string channelName, TEventData data) where TEventData : EventDataBase { @@ -52,6 +55,7 @@ public static Task PublishAsync(string channelName, TEventData data) /// 返回 用于取消订阅。 /// /// EventBus is stopping + /// Failed to create channel /// is public static IDisposable Subscribe(string channel, IEventHandler handler, bool disposeOwnerOnUnsubscribe = false) where TEventData : EventDataBase @@ -62,17 +66,26 @@ public static IDisposable Subscribe(string channel, IEventHandler new ConcurrentDictionary, WeakReference?, bool)>()); + var handlers = dataHandler.GetOrAdd(dataType, _ => []); var ownerRef = new WeakReference(handler); var id = Guid.NewGuid(); - handlers.TryAdd(id, (Wrapper, ownerRef, disposeOwnerOnUnsubscribe)); + handlers.TryAdd(id, (Wrapper, ownerRef)); return new Subscription(() => { @@ -83,7 +96,9 @@ public static IDisposable Subscribe(string channel, IEventHandler dict.Values.Any(e => e.OwnerRef != null && e.OwnerRef.TryGetTarget(out var other) && ReferenceEquals(other, tgt))); @@ -105,8 +120,11 @@ Task Wrapper(EventDataBase ev) } /// - /// 订阅一个委托(更轻量) + /// 订阅一个委托 /// + /// EventBus is stopping + /// Failed to create channel + /// is public static IDisposable Subscribe(string channel, Func handler) where TEventData : EventDataBase { @@ -116,15 +134,25 @@ public static IDisposable Subscribe(string channel, Func []); var id = Guid.NewGuid(); - handlers.TryAdd(id, (Wrapper, null, false)); + handlers.TryAdd(id, (Wrapper, null)); return new Subscription(() => { @@ -143,6 +171,11 @@ public static IDisposable Subscribe(string channel, Func public static bool AddChannel(string name) => !string.IsNullOrWhiteSpace(name) && _Channels.TryAdd(name, []); + /// + /// Remove a channel and all its handlers. Use with caution. + /// + /// Channel name. + /// if the channel was removed; otherwise, . public static bool RemoveChannel(string name) => _Channels.TryRemove(name, out _); private static Task _CallChannelAsync(string channel, TEventData data) @@ -157,7 +190,8 @@ private static Task _CallChannelAsync(string channel, TEventData dat return _CallEventHandlerAsync(data, eventHandlers); } - private static Task _CallEventHandlerAsync(TEventData data, ConcurrentDictionary Handler, WeakReference? OwnerRef, bool OwnsOwner)>> dataHandlers) + private static Task _CallEventHandlerAsync(TEventData data, + ConcurrentDictionary Handler, WeakReference? OwnerRef)>> dataHandlers) where TEventData : EventDataBase { var eventType = data.GetType(); @@ -171,7 +205,7 @@ private static Task _CallEventHandlerAsync(TEventData data, Concurre { var key = kv.Key; var entry = kv.Value; - if (entry.OwnerRef != null) + if (entry.OwnerRef is not null) { if (!entry.OwnerRef.TryGetTarget(out var _)) { @@ -187,7 +221,7 @@ private static Task _CallEventHandlerAsync(TEventData data, Concurre if (matching.Count == 0) { - Context.Error($"No handler found for event data type {eventType.Name}"); + Context.Trace($"No handler found for event data type {eventType.Name}"); return Task.CompletedTask; // will not throw Exception //throw new InvalidOperationException("No handler found for the given event data type."); @@ -213,7 +247,11 @@ private static Task _CallEventHandlerAsync(TEventData data, Concurre private sealed class Subscription : IDisposable { private Action? _dispose; + + /// The dispose action is null. public Subscription(Action dispose) => _dispose = dispose ?? throw new ArgumentNullException(nameof(dispose)); + + /// A delegate callback throws an exception. public void Dispose() { var d = Interlocked.Exchange(ref _dispose, null);