diff --git a/src/ReactiveUI.Primitives/Disposables/Disposable.cs b/src/ReactiveUI.Primitives/Disposables/Disposable.cs index 8264706..43d8c14 100644 --- a/src/ReactiveUI.Primitives/Disposables/Disposable.cs +++ b/src/ReactiveUI.Primitives/Disposables/Disposable.cs @@ -22,4 +22,31 @@ public static class Disposable /// A action returns for backward compatibility with existing ReactiveUI.Primitives create pipelines. public static IDisposable Create(Action dispose) => dispose == null ? Empty : new ActionDisposable(dispose); + + /// + /// Creates a disposable object that invokes the specified stateful action when disposed. + /// + /// The state type. + /// State passed to . + /// Action to run during the first call to . + /// The disposable object that runs the given action upon disposal. + public static IDisposable Create(TState state, Action dispose) => + dispose == null ? Empty : new StateActionDisposable(state, dispose); + + /// + /// Combines two disposable resources into a single disposable. + /// + /// The first disposable. + /// The second disposable. + /// A disposable that disposes both supplied resources. + public static IDisposable Combine(IDisposable first, IDisposable second) => + MultipleDisposable.Create(first, second); + + /// + /// Combines disposable resources into a single disposable. + /// + /// The disposables to combine. + /// A disposable that disposes all supplied resources. + public static IDisposable Combine(params IDisposable[] disposables) => + MultipleDisposable.Create(disposables); } diff --git a/src/ReactiveUI.Primitives/Disposables/StateActionDisposable{TState}.cs b/src/ReactiveUI.Primitives/Disposables/StateActionDisposable{TState}.cs new file mode 100644 index 0000000..6a119bc --- /dev/null +++ b/src/ReactiveUI.Primitives/Disposables/StateActionDisposable{TState}.cs @@ -0,0 +1,37 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +namespace ReactiveUI.Primitives.Disposables; + +/// +/// Disposable that invokes a stateful action once. +/// +/// The state type. +internal sealed class StateActionDisposable : IDisposable +{ + /// + /// State supplied to the dispose action. + /// + private readonly TState _state; + + /// + /// Dispose action, cleared after the first invocation. + /// + private Action? _dispose; + + /// + /// Initializes a new instance of the class. + /// + /// State passed to the dispose action. + /// The dispose action. + public StateActionDisposable(TState state, Action dispose) + { + _state = state; + _dispose = dispose; + } + + /// + public void Dispose() => + Interlocked.Exchange(ref _dispose, null)?.Invoke(_state); +} diff --git a/src/ReactiveUI.Primitives/ObservableMixins.cs b/src/ReactiveUI.Primitives/ObservableMixins.cs new file mode 100644 index 0000000..9089d7b --- /dev/null +++ b/src/ReactiveUI.Primitives/ObservableMixins.cs @@ -0,0 +1,197 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using ReactiveUI.Primitives.Disposables; + +namespace ReactiveUI.Primitives; + +/// +/// Internal observable helpers for operators that ReactiveUI.Primitives needs but does not expose directly. +/// +internal static class ObservableMixins +{ + /// + /// Forwards source values until emits a value. Completion of without + /// a value does not stop the source. + /// + /// The source value type. + /// The cancellation value type. + /// The source observable. + /// The observable that stops the source when it emits. + /// An observable that completes when the source completes or emits. + /// or is . + public static IObservable TakeUntil(this IObservable source, IObservable other) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (other == null) + { + throw new ArgumentNullException(nameof(other)); + } + + return new TakeUntilSignal(source, other); + } + + /// Dedicated signal for TakeUntil that holds its sources without a per-subscription closure. + /// The source value type. + /// The cancellation value type. + private sealed class TakeUntilSignal : IObservable + { + /// The source observable. + private readonly IObservable _source; + + /// The observable that stops the source when it emits. + private readonly IObservable _other; + + /// Initializes a new instance of the class. + /// The source observable. + /// The observable that stops the source when it emits. + internal TakeUntilSignal(IObservable source, IObservable other) + { + _source = source; + _other = other; + } + + /// + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + { + throw new ArgumentNullException(nameof(observer)); + } + + var coordinator = new Coordinator(observer); + coordinator.Add(_other.Subscribe(new CancelObserver(coordinator))); + if (coordinator.IsStopped) + { + return coordinator; + } + + coordinator.Add(_source.Subscribe(new SourceObserver(coordinator))); + return coordinator; + } + + /// Coordinates serialized observer callbacks and subscription lifetime for the source and cancellation streams. + private sealed class Coordinator : IDisposable + { + /// The downstream observer. + private readonly IObserver _observer; + + /// Serializes downstream observer callbacks. + private readonly Lock _gate = new(); + + /// Tracks the source and cancellation subscriptions. + private readonly MultipleDisposable _subscriptions = new(); + + /// Indicates whether the sequence has stopped (0 = running, 1 = stopped). + private int _stopped; + + /// Initializes a new instance of the class. + /// The downstream observer. + internal Coordinator(IObserver observer) => _observer = observer; + + /// Gets a value indicating whether the sequence has stopped. + internal bool IsStopped => Volatile.Read(ref _stopped) != 0; + + /// + public void Dispose() => _subscriptions.Dispose(); + + /// Adds a subscription to the coordinator lifetime. + /// The subscription to add. + internal void Add(IDisposable subscription) => _subscriptions.Add(subscription); + + /// Forwards a source value when the sequence has not stopped. + /// The source value. + internal void Next(T value) + { + lock (_gate) + { + if (!IsStopped) + { + _observer.OnNext(value); + } + } + } + + /// Completes the downstream observer once and disposes all subscriptions. + internal void Complete() + { + if (Interlocked.Exchange(ref _stopped, 1) != 0) + { + return; + } + + lock (_gate) + { + _observer.OnCompleted(); + } + + _subscriptions.Dispose(); + } + + /// Sends an error to the downstream observer once and disposes all subscriptions. + /// The exception to forward. + internal void Error(Exception exception) + { + if (Interlocked.Exchange(ref _stopped, 1) != 0) + { + return; + } + + lock (_gate) + { + _observer.OnError(exception); + } + + _subscriptions.Dispose(); + } + } + + /// Observes the source stream and routes its notifications through the coordinator. + private sealed class SourceObserver : IObserver + { + /// The owning coordinator. + private readonly Coordinator _coordinator; + + /// Initializes a new instance of the class. + /// The owning coordinator. + internal SourceObserver(Coordinator coordinator) => _coordinator = coordinator; + + /// + public void OnNext(T value) => _coordinator.Next(value); + + /// + public void OnError(Exception error) => _coordinator.Error(error); + + /// + public void OnCompleted() => _coordinator.Complete(); + } + + /// Observes the cancellation stream; its first value (or error) stops the source. + private sealed class CancelObserver : IObserver + { + /// The owning coordinator. + private readonly Coordinator _coordinator; + + /// Initializes a new instance of the class. + /// The owning coordinator. + internal CancelObserver(Coordinator coordinator) => _coordinator = coordinator; + + /// + public void OnNext(TOther value) => _coordinator.Complete(); + + /// + public void OnError(Exception error) => _coordinator.Error(error); + + /// + public void OnCompleted() + { + // Completion of the cancellation stream without a value does not stop the source. + } + } + } +} diff --git a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet10_0.verified.txt b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet10_0.verified.txt index a102e5b..d6927b6 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet10_0.verified.txt +++ b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet10_0.verified.txt @@ -812,7 +812,10 @@ namespace ReactiveUI.Primitives.Disposables public static class Disposable { public static System.IDisposable Empty { get; } + public static System.IDisposable Combine(params System.IDisposable[] disposables) { } + public static System.IDisposable Combine(System.IDisposable first, System.IDisposable second) { } public static System.IDisposable Create(System.Action dispose) { } + public static System.IDisposable Create(TState state, System.Action dispose) { } } [System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] public sealed class DisposableBag : ReactiveUI.Primitives.Disposables.IsDisposed, System.IDisposable diff --git a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt index a102e5b..d6927b6 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt +++ b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt @@ -812,7 +812,10 @@ namespace ReactiveUI.Primitives.Disposables public static class Disposable { public static System.IDisposable Empty { get; } + public static System.IDisposable Combine(params System.IDisposable[] disposables) { } + public static System.IDisposable Combine(System.IDisposable first, System.IDisposable second) { } public static System.IDisposable Create(System.Action dispose) { } + public static System.IDisposable Create(TState state, System.Action dispose) { } } [System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] public sealed class DisposableBag : ReactiveUI.Primitives.Disposables.IsDisposed, System.IDisposable diff --git a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet9_0.verified.txt b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet9_0.verified.txt index a102e5b..d6927b6 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet9_0.verified.txt +++ b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet9_0.verified.txt @@ -812,7 +812,10 @@ namespace ReactiveUI.Primitives.Disposables public static class Disposable { public static System.IDisposable Empty { get; } + public static System.IDisposable Combine(params System.IDisposable[] disposables) { } + public static System.IDisposable Combine(System.IDisposable first, System.IDisposable second) { } public static System.IDisposable Create(System.Action dispose) { } + public static System.IDisposable Create(TState state, System.Action dispose) { } } [System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")] public sealed class DisposableBag : ReactiveUI.Primitives.Disposables.IsDisposed, System.IDisposable diff --git a/src/tests/ReactiveUI.Primitives.Tests/ObservableMixinsTests.cs b/src/tests/ReactiveUI.Primitives.Tests/ObservableMixinsTests.cs new file mode 100644 index 0000000..cf603c8 --- /dev/null +++ b/src/tests/ReactiveUI.Primitives.Tests/ObservableMixinsTests.cs @@ -0,0 +1,105 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using System.Reactive.Subjects; + +namespace ReactiveUI.Primitives.Tests; + +/// +/// Tests for internal observable helper operators. +/// +public class ObservableMixinsTests +{ + /// + /// First emitted source value. + /// + private const int FirstValue = 1; + + /// + /// Second emitted source value. + /// + private const int SecondValue = 2; + + /// + /// Stopper signal value. + /// + private const string StopValue = "stop"; + + /// + /// Verifies that TakeUntil completes when the other observable emits. + /// + [Test] + public void WhenOtherEmits_ThenCompletesAndStopsForwardingSource() + { + using var source = new Subject(); + using var other = new Subject(); + var values = new List(); + var completed = false; + + using var subscription = source.TakeUntil(other) + .Subscribe(values.Add, ThrowUnexpectedError, () => completed = true); + + source.OnNext(FirstValue); + other.OnNext(StopValue); + source.OnNext(SecondValue); + + Assert.Equal([FirstValue], values); + Assert.True(completed); + } + + /// + /// Verifies that TakeUntil keeps the source alive when the other observable completes without a value. + /// + [Test] + public void WhenOtherCompletesWithoutValue_ThenSourceContinues() + { + using var source = new Subject(); + using var other = new Subject(); + var values = new List(); + var completed = false; + + using var subscription = source.TakeUntil(other) + .Subscribe(values.Add, ThrowUnexpectedError, () => completed = true); + + source.OnNext(FirstValue); + other.OnCompleted(); + source.OnNext(SecondValue); + source.OnCompleted(); + + Assert.Equal([FirstValue, SecondValue], values); + Assert.True(completed); + } + + /// + /// Verifies that TakeUntil forwards errors from the other observable. + /// + [Test] + public void WhenOtherErrors_ThenErrorIsForwardedAndSourceStops() + { + using var source = new Subject(); + using var other = new Subject(); + var expected = new InvalidOperationException("expected"); + var values = new List(); + Exception? observed = null; + var completed = false; + + using var subscription = source.TakeUntil(other) + .Subscribe(values.Add, exception => observed = exception, () => completed = true); + + source.OnNext(FirstValue); + other.OnError(expected); + source.OnNext(SecondValue); + + Assert.Equal([FirstValue], values); + Assert.Same(expected, observed!); + Assert.False(completed); + } + + /// + /// Throws when an unexpected error arrives. + /// + /// The unexpected exception. + private static void ThrowUnexpectedError(Exception exception) => + throw new InvalidOperationException("Unexpected error.", exception); +}