From d3970775f3e145b46f8588c8773d561029ecf23d Mon Sep 17 00:00:00 2001 From: Chris Pulman Date: Tue, 2 Jun 2026 14:15:32 +0100 Subject: [PATCH 1/3] Add serial/state disposables and ObservableMixins Introduce SerialDisposable and StateActionDisposable to support replaceable disposables and stateful disposal semantics, both implemented with thread-safe Interlocked/Volatile usage. Extend Disposable with Create and Combine overloads to create stateful disposables and combine multiple disposables via MultipleDisposable. Add internal ObservableMixins.TakeUntil helper to forward values until another observable emits, using a gate and MultipleDisposable for coordinated completion/error handling. --- .../Disposables/Disposable.cs | 27 +++++ .../Disposables/SerialDisposable.cs | 73 +++++++++++++ .../StateActionDisposable{TState}.cs | 37 +++++++ src/ReactiveUI.Primitives/ObservableMixins.cs | 102 ++++++++++++++++++ 4 files changed, 239 insertions(+) create mode 100644 src/ReactiveUI.Primitives/Disposables/SerialDisposable.cs create mode 100644 src/ReactiveUI.Primitives/Disposables/StateActionDisposable{TState}.cs create mode 100644 src/ReactiveUI.Primitives/ObservableMixins.cs diff --git a/src/ReactiveUI.Primitives/Disposables/Disposable.cs b/src/ReactiveUI.Primitives/Disposables/Disposable.cs index 8264706..43d8c14 100644 --- a/src/ReactiveUI.Primitives/Disposables/Disposable.cs +++ b/src/ReactiveUI.Primitives/Disposables/Disposable.cs @@ -22,4 +22,31 @@ public static class Disposable /// A action returns for backward compatibility with existing ReactiveUI.Primitives create pipelines. public static IDisposable Create(Action dispose) => dispose == null ? Empty : new ActionDisposable(dispose); + + /// + /// Creates a disposable object that invokes the specified stateful action when disposed. + /// + /// The state type. + /// State passed to . + /// Action to run during the first call to . + /// The disposable object that runs the given action upon disposal. + public static IDisposable Create(TState state, Action dispose) => + dispose == null ? Empty : new StateActionDisposable(state, dispose); + + /// + /// Combines two disposable resources into a single disposable. + /// + /// The first disposable. + /// The second disposable. + /// A disposable that disposes both supplied resources. + public static IDisposable Combine(IDisposable first, IDisposable second) => + MultipleDisposable.Create(first, second); + + /// + /// Combines disposable resources into a single disposable. + /// + /// The disposables to combine. + /// A disposable that disposes all supplied resources. + public static IDisposable Combine(params IDisposable[] disposables) => + MultipleDisposable.Create(disposables); } diff --git a/src/ReactiveUI.Primitives/Disposables/SerialDisposable.cs b/src/ReactiveUI.Primitives/Disposables/SerialDisposable.cs new file mode 100644 index 0000000..cff504b --- /dev/null +++ b/src/ReactiveUI.Primitives/Disposables/SerialDisposable.cs @@ -0,0 +1,73 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +namespace ReactiveUI.Primitives.Disposables; + +/// +/// Holds a replaceable disposable and disposes the previous assignment when changed. +/// +public sealed class SerialDisposable : IDisposable +{ + /// + /// Marker used after disposal. + /// + private static readonly IDisposable DisposedSentinel = new DisposedMarker(); + + /// + /// Current disposable assignment. + /// + private IDisposable? _current; + + /// + /// Gets or sets the current disposable. + /// + public IDisposable? Disposable + { + get + { + var current = Volatile.Read(ref _current); + return ReferenceEquals(current, DisposedSentinel) ? null : current; + } + + set + { + IDisposable? current; + do + { + current = Volatile.Read(ref _current); + if (ReferenceEquals(current, DisposedSentinel)) + { + value?.Dispose(); + return; + } + } + while (!ReferenceEquals(Interlocked.CompareExchange(ref _current, value, current), current)); + + current?.Dispose(); + } + } + + /// + public void Dispose() + { + var current = Interlocked.Exchange(ref _current, DisposedSentinel); + if (current == null || ReferenceEquals(current, DisposedSentinel)) + { + return; + } + + current.Dispose(); + } + + /// + /// Disposable marker for disposed slots. + /// + private sealed class DisposedMarker : IDisposable + { + /// + public void Dispose() + { + } + } +} diff --git a/src/ReactiveUI.Primitives/Disposables/StateActionDisposable{TState}.cs b/src/ReactiveUI.Primitives/Disposables/StateActionDisposable{TState}.cs new file mode 100644 index 0000000..6a119bc --- /dev/null +++ b/src/ReactiveUI.Primitives/Disposables/StateActionDisposable{TState}.cs @@ -0,0 +1,37 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +namespace ReactiveUI.Primitives.Disposables; + +/// +/// Disposable that invokes a stateful action once. +/// +/// The state type. +internal sealed class StateActionDisposable : IDisposable +{ + /// + /// State supplied to the dispose action. + /// + private readonly TState _state; + + /// + /// Dispose action, cleared after the first invocation. + /// + private Action? _dispose; + + /// + /// Initializes a new instance of the class. + /// + /// State passed to the dispose action. + /// The dispose action. + public StateActionDisposable(TState state, Action dispose) + { + _state = state; + _dispose = dispose; + } + + /// + public void Dispose() => + Interlocked.Exchange(ref _dispose, null)?.Invoke(_state); +} diff --git a/src/ReactiveUI.Primitives/ObservableMixins.cs b/src/ReactiveUI.Primitives/ObservableMixins.cs new file mode 100644 index 0000000..a3ad658 --- /dev/null +++ b/src/ReactiveUI.Primitives/ObservableMixins.cs @@ -0,0 +1,102 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System; +using System.Threading; +using ReactiveUI.Primitives.Disposables; +using ReactiveUI.Primitives.Signals; + +namespace ReactiveUI.Primitives; + +/// +/// Internal observable helpers for operators that ReactiveUI.Primitives needs but does not expose directly. +/// +internal static class ObservableMixins +{ + /// + /// Forwards source values until emits a value. Completion of without + /// a value does not stop the source. + /// + /// The source value type. + /// The cancellation value type. + /// The source observable. + /// The observable that stops the source when it emits. + /// An observable that completes when the source completes or emits. + public static IObservable TakeUntil(this IObservable source, IObservable other) + { +#if NET8_0_OR_GREATER + ArgumentNullException.ThrowIfNull(source); + ArgumentNullException.ThrowIfNull(other); +#else + if (source is null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (other is null) + { + throw new ArgumentNullException(nameof(other)); + } +#endif + + return Signal.Create(observer => + { + var subscriptions = new MultipleDisposable(); + var gate = new Lock(); + var stopped = 0; + + void Complete() + { + if (Interlocked.Exchange(ref stopped, 1) != 0) + { + return; + } + + lock (gate) + { + observer.OnCompleted(); + } + + subscriptions.Dispose(); + } + + void Error(Exception error) + { + if (Interlocked.Exchange(ref stopped, 1) != 0) + { + return; + } + + lock (gate) + { + observer.OnError(error); + } + + subscriptions.Dispose(); + } + + subscriptions.Add(other.Subscribe(_ => Complete(), Error)); + if (Volatile.Read(ref stopped) != 0) + { + return subscriptions; + } + + subscriptions.Add(source.Subscribe( + value => + { + lock (gate) + { + if (Volatile.Read(ref stopped) == 0) + { + observer.OnNext(value); + } + } + }, + Error, + Complete)); + + return subscriptions; + }); + } +} From 00fba1da09d4a0f46e14c4d8722fd3ef34fff9c2 Mon Sep 17 00:00:00 2001 From: Chris Pulman Date: Wed, 3 Jun 2026 07:21:10 +0100 Subject: [PATCH 2/3] Refactor TakeUntil to coordinator and add tests Replace inline locking/subscription management in ObservableMixins.TakeUntil with a dedicated TakeUntilCoordinator that serializes observer callbacks, manages subscriptions, and ensures single completion/error handling and proper disposal. Add ObservableMixinsTests to verify TakeUntil behavior when the other observable emits, completes, or errors. Update API approval file to reflect added Disposable helpers and SerialDisposable types. --- src/ReactiveUI.Primitives/ObservableMixins.cs | 137 ++++++++++++------ ...valTests.Primitives.DotNet8_0.verified.txt | 9 ++ .../ObservableMixinsTests.cs | 105 ++++++++++++++ 3 files changed, 210 insertions(+), 41 deletions(-) create mode 100644 src/tests/ReactiveUI.Primitives.Tests/ObservableMixinsTests.cs diff --git a/src/ReactiveUI.Primitives/ObservableMixins.cs b/src/ReactiveUI.Primitives/ObservableMixins.cs index a3ad658..3761996 100644 --- a/src/ReactiveUI.Primitives/ObservableMixins.cs +++ b/src/ReactiveUI.Primitives/ObservableMixins.cs @@ -42,61 +42,116 @@ public static IObservable TakeUntil(this IObservable source, IO return Signal.Create(observer => { - var subscriptions = new MultipleDisposable(); - var gate = new Lock(); - var stopped = 0; + var coordinator = new TakeUntilCoordinator(observer); - void Complete() + coordinator.Add(other.Subscribe(_ => coordinator.Complete(), coordinator.Error)); + if (coordinator.IsStopped) { - if (Interlocked.Exchange(ref stopped, 1) != 0) - { - return; - } + return coordinator; + } - lock (gate) - { - observer.OnCompleted(); - } + coordinator.Add(source.Subscribe(coordinator.Next, coordinator.Error, coordinator.Complete)); + + return coordinator; + }); + } + + /// + /// Coordinates serialized observer callbacks and subscription lifetime for . + /// + /// The source value type. + private sealed class TakeUntilCoordinator : IDisposable + { + /// + /// The downstream observer. + /// + private readonly IObserver _observer; + + /// + /// Serializes downstream observer callbacks. + /// + private readonly Lock _gate = new(); + + /// + /// Tracks the source and cancellation subscriptions. + /// + private readonly MultipleDisposable _subscriptions = new(); + + /// + /// Indicates whether the sequence has already stopped. + /// + private int _stopped; + + /// + /// Initializes a new instance of the class. + /// + /// The downstream observer. + public TakeUntilCoordinator(IObserver observer) => _observer = observer; + + /// + /// Gets a value indicating whether the sequence has stopped. + /// + public bool IsStopped => Volatile.Read(ref _stopped) != 0; + + /// + /// Adds a subscription to the coordinator lifetime. + /// + /// The subscription to add. + public void Add(IDisposable subscription) => _subscriptions.Add(subscription); - subscriptions.Dispose(); + /// + /// Completes the downstream observer once and disposes all subscriptions. + /// + public void Complete() + { + if (Interlocked.Exchange(ref _stopped, 1) != 0) + { + return; } - void Error(Exception error) + lock (_gate) { - if (Interlocked.Exchange(ref stopped, 1) != 0) - { - return; - } + _observer.OnCompleted(); + } - lock (gate) - { - observer.OnError(error); - } + _subscriptions.Dispose(); + } - subscriptions.Dispose(); + /// + /// Sends an error to the downstream observer once and disposes all subscriptions. + /// + /// The exception to forward. + public void Error(Exception exception) + { + if (Interlocked.Exchange(ref _stopped, 1) != 0) + { + return; } - subscriptions.Add(other.Subscribe(_ => Complete(), Error)); - if (Volatile.Read(ref stopped) != 0) + lock (_gate) { - return subscriptions; + _observer.OnError(exception); } - subscriptions.Add(source.Subscribe( - value => + _subscriptions.Dispose(); + } + + /// + /// Forwards a source value when the sequence has not stopped. + /// + /// The source value. + public void Next(T value) + { + lock (_gate) + { + if (!IsStopped) { - lock (gate) - { - if (Volatile.Read(ref stopped) == 0) - { - observer.OnNext(value); - } - } - }, - Error, - Complete)); - - return subscriptions; - }); + _observer.OnNext(value); + } + } + } + + /// + public void Dispose() => _subscriptions.Dispose(); } } diff --git a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt index a102e5b..bc71ea0 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt +++ b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt @@ -812,7 +812,10 @@ namespace ReactiveUI.Primitives.Disposables public static class Disposable { public static System.IDisposable Empty { get; } + public static System.IDisposable Combine(params System.IDisposable[] disposables) { } + public static System.IDisposable Combine(System.IDisposable first, System.IDisposable second) { } public static System.IDisposable Create(System.Action dispose) { } + public static System.IDisposable Create(TState state, System.Action dispose) { } } [System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] public sealed class DisposableBag : ReactiveUI.Primitives.Disposables.IsDisposed, System.IDisposable @@ -873,6 +876,12 @@ namespace ReactiveUI.Primitives.Disposables public Pocket(System.IDisposable first, System.IDisposable second) { } public Pocket(System.IDisposable first, System.IDisposable second, System.IDisposable third) { } } + public sealed class SerialDisposable : System.IDisposable + { + public SerialDisposable() { } + public System.IDisposable? Disposable { get; set; } + public void Dispose() { } + } [System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] public class SingleDisposable : ReactiveUI.Primitives.Disposables.IsDisposed, System.IDisposable { diff --git a/src/tests/ReactiveUI.Primitives.Tests/ObservableMixinsTests.cs b/src/tests/ReactiveUI.Primitives.Tests/ObservableMixinsTests.cs new file mode 100644 index 0000000..cf603c8 --- /dev/null +++ b/src/tests/ReactiveUI.Primitives.Tests/ObservableMixinsTests.cs @@ -0,0 +1,105 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive.Subjects; + +namespace ReactiveUI.Primitives.Tests; + +/// +/// Tests for internal observable helper operators. +/// +public class ObservableMixinsTests +{ + /// + /// First emitted source value. + /// + private const int FirstValue = 1; + + /// + /// Second emitted source value. + /// + private const int SecondValue = 2; + + /// + /// Stopper signal value. + /// + private const string StopValue = "stop"; + + /// + /// Verifies that TakeUntil completes when the other observable emits. + /// + [Test] + public void WhenOtherEmits_ThenCompletesAndStopsForwardingSource() + { + using var source = new Subject(); + using var other = new Subject(); + var values = new List(); + var completed = false; + + using var subscription = source.TakeUntil(other) + .Subscribe(values.Add, ThrowUnexpectedError, () => completed = true); + + source.OnNext(FirstValue); + other.OnNext(StopValue); + source.OnNext(SecondValue); + + Assert.Equal([FirstValue], values); + Assert.True(completed); + } + + /// + /// Verifies that TakeUntil keeps the source alive when the other observable completes without a value. + /// + [Test] + public void WhenOtherCompletesWithoutValue_ThenSourceContinues() + { + using var source = new Subject(); + using var other = new Subject(); + var values = new List(); + var completed = false; + + using var subscription = source.TakeUntil(other) + .Subscribe(values.Add, ThrowUnexpectedError, () => completed = true); + + source.OnNext(FirstValue); + other.OnCompleted(); + source.OnNext(SecondValue); + source.OnCompleted(); + + Assert.Equal([FirstValue, SecondValue], values); + Assert.True(completed); + } + + /// + /// Verifies that TakeUntil forwards errors from the other observable. + /// + [Test] + public void WhenOtherErrors_ThenErrorIsForwardedAndSourceStops() + { + using var source = new Subject(); + using var other = new Subject(); + var expected = new InvalidOperationException("expected"); + var values = new List(); + Exception? observed = null; + var completed = false; + + using var subscription = source.TakeUntil(other) + .Subscribe(values.Add, exception => observed = exception, () => completed = true); + + source.OnNext(FirstValue); + other.OnError(expected); + source.OnNext(SecondValue); + + Assert.Equal([FirstValue], values); + Assert.Same(expected, observed!); + Assert.False(completed); + } + + /// + /// Throws when an unexpected error arrives. + /// + /// The unexpected exception. + private static void ThrowUnexpectedError(Exception exception) => + throw new InvalidOperationException("Unexpected error.", exception); +} From f01ea5d0a0796c11c8eeabcd8c5dc991a3038011 Mon Sep 17 00:00:00 2001 From: Glenn Watson <5834289+glennawatson@users.noreply.github.com> Date: Wed, 3 Jun 2026 16:51:20 +1000 Subject: [PATCH 3/3] refactor: consolidate disposable + lighten TakeUntil on the disposables work MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Validate the AddDisposables changes for perf/alloc/thread-safety in line with the codebase: - Drop the new SerialDisposable (it conflicted with the System.Reactive name and reimplemented the lock-free replaceable-disposable with a private DisposedMarker). The existing public SwapDisposable already provides it under a non-conflicting name, backed by the shared DisposableSlotHelper — one implementation. - Rewrite TakeUntil as a dedicated TakeUntilSignal with nested source/ cancel IObserver classes instead of Signal.Create(closure) + delegate-based Subscribe, removing the per-subscription closure and delegate allocations while preserving the Interlocked one-shot + gate thread-safety. - Complete the API snapshots (net9/net10 were missing Combine/Create; net8 no longer lists SerialDisposable). Co-Authored-By: Chris Pulman --- .../Disposables/SerialDisposable.cs | 73 ------ src/ReactiveUI.Primitives/ObservableMixins.cs | 232 ++++++++++-------- ...alTests.Primitives.DotNet10_0.verified.txt | 3 + ...valTests.Primitives.DotNet8_0.verified.txt | 6 - ...valTests.Primitives.DotNet9_0.verified.txt | 3 + 5 files changed, 142 insertions(+), 175 deletions(-) delete mode 100644 src/ReactiveUI.Primitives/Disposables/SerialDisposable.cs diff --git a/src/ReactiveUI.Primitives/Disposables/SerialDisposable.cs b/src/ReactiveUI.Primitives/Disposables/SerialDisposable.cs deleted file mode 100644 index cff504b..0000000 --- a/src/ReactiveUI.Primitives/Disposables/SerialDisposable.cs +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. -// ReactiveUI Association Incorporated licenses this file to you under the MIT license. -// See the LICENSE file in the project root for full license information. - -namespace ReactiveUI.Primitives.Disposables; - -/// -/// Holds a replaceable disposable and disposes the previous assignment when changed. -/// -public sealed class SerialDisposable : IDisposable -{ - /// - /// Marker used after disposal. - /// - private static readonly IDisposable DisposedSentinel = new DisposedMarker(); - - /// - /// Current disposable assignment. - /// - private IDisposable? _current; - - /// - /// Gets or sets the current disposable. - /// - public IDisposable? Disposable - { - get - { - var current = Volatile.Read(ref _current); - return ReferenceEquals(current, DisposedSentinel) ? null : current; - } - - set - { - IDisposable? current; - do - { - current = Volatile.Read(ref _current); - if (ReferenceEquals(current, DisposedSentinel)) - { - value?.Dispose(); - return; - } - } - while (!ReferenceEquals(Interlocked.CompareExchange(ref _current, value, current), current)); - - current?.Dispose(); - } - } - - /// - public void Dispose() - { - var current = Interlocked.Exchange(ref _current, DisposedSentinel); - if (current == null || ReferenceEquals(current, DisposedSentinel)) - { - return; - } - - current.Dispose(); - } - - /// - /// Disposable marker for disposed slots. - /// - private sealed class DisposedMarker : IDisposable - { - /// - public void Dispose() - { - } - } -} diff --git a/src/ReactiveUI.Primitives/ObservableMixins.cs b/src/ReactiveUI.Primitives/ObservableMixins.cs index 3761996..9089d7b 100644 --- a/src/ReactiveUI.Primitives/ObservableMixins.cs +++ b/src/ReactiveUI.Primitives/ObservableMixins.cs @@ -2,10 +2,7 @@ // ReactiveUI Association Incorporated licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. -using System; -using System.Threading; using ReactiveUI.Primitives.Disposables; -using ReactiveUI.Primitives.Signals; namespace ReactiveUI.Primitives; @@ -23,135 +20,178 @@ internal static class ObservableMixins /// The source observable. /// The observable that stops the source when it emits. /// An observable that completes when the source completes or emits. + /// or is . public static IObservable TakeUntil(this IObservable source, IObservable other) { -#if NET8_0_OR_GREATER - ArgumentNullException.ThrowIfNull(source); - ArgumentNullException.ThrowIfNull(other); -#else - if (source is null) + if (source == null) { throw new ArgumentNullException(nameof(source)); } - if (other is null) + if (other == null) { throw new ArgumentNullException(nameof(other)); } -#endif - return Signal.Create(observer => + return new TakeUntilSignal(source, other); + } + + /// Dedicated signal for TakeUntil that holds its sources without a per-subscription closure. + /// The source value type. + /// The cancellation value type. + private sealed class TakeUntilSignal : IObservable + { + /// The source observable. + private readonly IObservable _source; + + /// The observable that stops the source when it emits. + private readonly IObservable _other; + + /// Initializes a new instance of the class. + /// The source observable. + /// The observable that stops the source when it emits. + internal TakeUntilSignal(IObservable source, IObservable other) { - var coordinator = new TakeUntilCoordinator(observer); + _source = source; + _other = other; + } - coordinator.Add(other.Subscribe(_ => coordinator.Complete(), coordinator.Error)); + /// + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + { + throw new ArgumentNullException(nameof(observer)); + } + + var coordinator = new Coordinator(observer); + coordinator.Add(_other.Subscribe(new CancelObserver(coordinator))); if (coordinator.IsStopped) { return coordinator; } - coordinator.Add(source.Subscribe(coordinator.Next, coordinator.Error, coordinator.Complete)); - + coordinator.Add(_source.Subscribe(new SourceObserver(coordinator))); return coordinator; - }); - } + } - /// - /// Coordinates serialized observer callbacks and subscription lifetime for . - /// - /// The source value type. - private sealed class TakeUntilCoordinator : IDisposable - { - /// - /// The downstream observer. - /// - private readonly IObserver _observer; - - /// - /// Serializes downstream observer callbacks. - /// - private readonly Lock _gate = new(); - - /// - /// Tracks the source and cancellation subscriptions. - /// - private readonly MultipleDisposable _subscriptions = new(); - - /// - /// Indicates whether the sequence has already stopped. - /// - private int _stopped; - - /// - /// Initializes a new instance of the class. - /// - /// The downstream observer. - public TakeUntilCoordinator(IObserver observer) => _observer = observer; - - /// - /// Gets a value indicating whether the sequence has stopped. - /// - public bool IsStopped => Volatile.Read(ref _stopped) != 0; - - /// - /// Adds a subscription to the coordinator lifetime. - /// - /// The subscription to add. - public void Add(IDisposable subscription) => _subscriptions.Add(subscription); - - /// - /// Completes the downstream observer once and disposes all subscriptions. - /// - public void Complete() + /// Coordinates serialized observer callbacks and subscription lifetime for the source and cancellation streams. + private sealed class Coordinator : IDisposable { - if (Interlocked.Exchange(ref _stopped, 1) != 0) + /// The downstream observer. + private readonly IObserver _observer; + + /// Serializes downstream observer callbacks. + private readonly Lock _gate = new(); + + /// Tracks the source and cancellation subscriptions. + private readonly MultipleDisposable _subscriptions = new(); + + /// Indicates whether the sequence has stopped (0 = running, 1 = stopped). + private int _stopped; + + /// Initializes a new instance of the class. + /// The downstream observer. + internal Coordinator(IObserver observer) => _observer = observer; + + /// Gets a value indicating whether the sequence has stopped. + internal bool IsStopped => Volatile.Read(ref _stopped) != 0; + + /// + public void Dispose() => _subscriptions.Dispose(); + + /// Adds a subscription to the coordinator lifetime. + /// The subscription to add. + internal void Add(IDisposable subscription) => _subscriptions.Add(subscription); + + /// Forwards a source value when the sequence has not stopped. + /// The source value. + internal void Next(T value) { - return; + lock (_gate) + { + if (!IsStopped) + { + _observer.OnNext(value); + } + } } - lock (_gate) + /// Completes the downstream observer once and disposes all subscriptions. + internal void Complete() { - _observer.OnCompleted(); - } + if (Interlocked.Exchange(ref _stopped, 1) != 0) + { + return; + } - _subscriptions.Dispose(); - } + lock (_gate) + { + _observer.OnCompleted(); + } - /// - /// Sends an error to the downstream observer once and disposes all subscriptions. - /// - /// The exception to forward. - public void Error(Exception exception) - { - if (Interlocked.Exchange(ref _stopped, 1) != 0) - { - return; + _subscriptions.Dispose(); } - lock (_gate) + /// Sends an error to the downstream observer once and disposes all subscriptions. + /// The exception to forward. + internal void Error(Exception exception) { - _observer.OnError(exception); + if (Interlocked.Exchange(ref _stopped, 1) != 0) + { + return; + } + + lock (_gate) + { + _observer.OnError(exception); + } + + _subscriptions.Dispose(); } + } + + /// Observes the source stream and routes its notifications through the coordinator. + private sealed class SourceObserver : IObserver + { + /// The owning coordinator. + private readonly Coordinator _coordinator; + + /// Initializes a new instance of the class. + /// The owning coordinator. + internal SourceObserver(Coordinator coordinator) => _coordinator = coordinator; - _subscriptions.Dispose(); + /// + public void OnNext(T value) => _coordinator.Next(value); + + /// + public void OnError(Exception error) => _coordinator.Error(error); + + /// + public void OnCompleted() => _coordinator.Complete(); } - /// - /// Forwards a source value when the sequence has not stopped. - /// - /// The source value. - public void Next(T value) + /// Observes the cancellation stream; its first value (or error) stops the source. + private sealed class CancelObserver : IObserver { - lock (_gate) + /// The owning coordinator. + private readonly Coordinator _coordinator; + + /// Initializes a new instance of the class. + /// The owning coordinator. + internal CancelObserver(Coordinator coordinator) => _coordinator = coordinator; + + /// + public void OnNext(TOther value) => _coordinator.Complete(); + + /// + public void OnError(Exception error) => _coordinator.Error(error); + + /// + public void OnCompleted() { - if (!IsStopped) - { - _observer.OnNext(value); - } + // Completion of the cancellation stream without a value does not stop the source. } } - - /// - public void Dispose() => _subscriptions.Dispose(); } } diff --git a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet10_0.verified.txt b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet10_0.verified.txt index a102e5b..d6927b6 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet10_0.verified.txt +++ b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet10_0.verified.txt @@ -812,7 +812,10 @@ namespace ReactiveUI.Primitives.Disposables public static class Disposable { public static System.IDisposable Empty { get; } + public static System.IDisposable Combine(params System.IDisposable[] disposables) { } + public static System.IDisposable Combine(System.IDisposable first, System.IDisposable second) { } public static System.IDisposable Create(System.Action dispose) { } + public static System.IDisposable Create(TState state, System.Action dispose) { } } [System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] public sealed class DisposableBag : ReactiveUI.Primitives.Disposables.IsDisposed, System.IDisposable diff --git a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt index bc71ea0..d6927b6 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt +++ b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt @@ -876,12 +876,6 @@ namespace ReactiveUI.Primitives.Disposables public Pocket(System.IDisposable first, System.IDisposable second) { } public Pocket(System.IDisposable first, System.IDisposable second, System.IDisposable third) { } } - public sealed class SerialDisposable : System.IDisposable - { - public SerialDisposable() { } - public System.IDisposable? Disposable { get; set; } - public void Dispose() { } - } [System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] public class SingleDisposable : ReactiveUI.Primitives.Disposables.IsDisposed, System.IDisposable { diff --git a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet9_0.verified.txt b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet9_0.verified.txt index a102e5b..d6927b6 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet9_0.verified.txt +++ b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet9_0.verified.txt @@ -812,7 +812,10 @@ namespace ReactiveUI.Primitives.Disposables public static class Disposable { public static System.IDisposable Empty { get; } + public static System.IDisposable Combine(params System.IDisposable[] disposables) { } + public static System.IDisposable Combine(System.IDisposable first, System.IDisposable second) { } public static System.IDisposable Create(System.Action dispose) { } + public static System.IDisposable Create(TState state, System.Action dispose) { } } [System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] public sealed class DisposableBag : ReactiveUI.Primitives.Disposables.IsDisposed, System.IDisposable