Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions src/ReactiveUI.Primitives/Disposables/Disposable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,31 @@ public static class Disposable
/// <remarks>A <see langword="null"/> action returns <see cref="Empty"/> for backward compatibility with existing ReactiveUI.Primitives create pipelines.</remarks>
public static IDisposable Create(Action dispose) =>
dispose == null ? Empty : new ActionDisposable(dispose);

/// <summary>
/// Creates a disposable object that invokes the specified stateful action when disposed.
/// </summary>
/// <typeparam name="TState">The state type.</typeparam>
/// <param name="state">State passed to <paramref name="dispose"/>.</param>
/// <param name="dispose">Action to run during the first call to <see cref="IDisposable.Dispose"/>.</param>
/// <returns>The disposable object that runs the given action upon disposal.</returns>
public static IDisposable Create<TState>(TState state, Action<TState> dispose) =>
dispose == null ? Empty : new StateActionDisposable<TState>(state, dispose);

/// <summary>
/// Combines two disposable resources into a single disposable.
/// </summary>
/// <param name="first">The first disposable.</param>
/// <param name="second">The second disposable.</param>
/// <returns>A disposable that disposes both supplied resources.</returns>
public static IDisposable Combine(IDisposable first, IDisposable second) =>
MultipleDisposable.Create(first, second);

/// <summary>
/// Combines disposable resources into a single disposable.
/// </summary>
/// <param name="disposables">The disposables to combine.</param>
/// <returns>A disposable that disposes all supplied resources.</returns>
public static IDisposable Combine(params IDisposable[] disposables) =>
MultipleDisposable.Create(disposables);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

namespace ReactiveUI.Primitives.Disposables;

/// <summary>
/// Disposable that invokes a stateful action once.
/// </summary>
/// <typeparam name="TState">The state type.</typeparam>
internal sealed class StateActionDisposable<TState> : IDisposable
{
/// <summary>
/// State supplied to the dispose action.
/// </summary>
private readonly TState _state;

/// <summary>
/// Dispose action, cleared after the first invocation.
/// </summary>
private Action<TState>? _dispose;

/// <summary>
/// Initializes a new instance of the <see cref="StateActionDisposable{TState}"/> class.
/// </summary>
/// <param name="state">State passed to the dispose action.</param>
/// <param name="dispose">The dispose action.</param>
public StateActionDisposable(TState state, Action<TState> dispose)
{
_state = state;
_dispose = dispose;
}

/// <inheritdoc/>
public void Dispose() =>
Interlocked.Exchange(ref _dispose, null)?.Invoke(_state);
}
197 changes: 197 additions & 0 deletions src/ReactiveUI.Primitives/ObservableMixins.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved.
// ReactiveUI Association Incorporated licenses this file to you under the MIT license.
// See the LICENSE file in the project root for full license information.

using ReactiveUI.Primitives.Disposables;

namespace ReactiveUI.Primitives;

/// <summary>
/// Internal observable helpers for operators that ReactiveUI.Primitives needs but does not expose directly.
/// </summary>
internal static class ObservableMixins
{
/// <summary>
/// Forwards source values until <paramref name="other"/> emits a value. Completion of <paramref name="other"/> without
/// a value does not stop the source.
/// </summary>
/// <typeparam name="T">The source value type.</typeparam>
/// <typeparam name="TOther">The cancellation value type.</typeparam>
/// <param name="source">The source observable.</param>
/// <param name="other">The observable that stops the source when it emits.</param>
/// <returns>An observable that completes when the source completes or <paramref name="other"/> emits.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="other"/> is <see langword="null"/>.</exception>
public static IObservable<T> TakeUntil<T, TOther>(this IObservable<T> source, IObservable<TOther> other)
{
if (source == null)
{
throw new ArgumentNullException(nameof(source));
}

if (other == null)
{
throw new ArgumentNullException(nameof(other));
}

return new TakeUntilSignal<T, TOther>(source, other);
}

/// <summary>Dedicated signal for <c>TakeUntil</c> that holds its sources without a per-subscription closure.</summary>
/// <typeparam name="T">The source value type.</typeparam>
/// <typeparam name="TOther">The cancellation value type.</typeparam>
private sealed class TakeUntilSignal<T, TOther> : IObservable<T>
{
/// <summary>The source observable.</summary>
private readonly IObservable<T> _source;

/// <summary>The observable that stops the source when it emits.</summary>
private readonly IObservable<TOther> _other;

/// <summary>Initializes a new instance of the <see cref="TakeUntilSignal{T, TOther}"/> class.</summary>
/// <param name="source">The source observable.</param>
/// <param name="other">The observable that stops the source when it emits.</param>
internal TakeUntilSignal(IObservable<T> source, IObservable<TOther> other)
{
_source = source;
_other = other;
}

/// <inheritdoc/>
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null)
{
throw new ArgumentNullException(nameof(observer));
}

var coordinator = new Coordinator(observer);
coordinator.Add(_other.Subscribe(new CancelObserver(coordinator)));
if (coordinator.IsStopped)
{
return coordinator;
}

coordinator.Add(_source.Subscribe(new SourceObserver(coordinator)));
return coordinator;
}

/// <summary>Coordinates serialized observer callbacks and subscription lifetime for the source and cancellation streams.</summary>
private sealed class Coordinator : IDisposable
{
/// <summary>The downstream observer.</summary>
private readonly IObserver<T> _observer;

/// <summary>Serializes downstream observer callbacks.</summary>
private readonly Lock _gate = new();

/// <summary>Tracks the source and cancellation subscriptions.</summary>
private readonly MultipleDisposable _subscriptions = new();

/// <summary>Indicates whether the sequence has stopped (0 = running, 1 = stopped).</summary>
private int _stopped;

/// <summary>Initializes a new instance of the <see cref="Coordinator"/> class.</summary>
/// <param name="observer">The downstream observer.</param>
internal Coordinator(IObserver<T> observer) => _observer = observer;

/// <summary>Gets a value indicating whether the sequence has stopped.</summary>
internal bool IsStopped => Volatile.Read(ref _stopped) != 0;

/// <inheritdoc/>
public void Dispose() => _subscriptions.Dispose();

/// <summary>Adds a subscription to the coordinator lifetime.</summary>
/// <param name="subscription">The subscription to add.</param>
internal void Add(IDisposable subscription) => _subscriptions.Add(subscription);

/// <summary>Forwards a source value when the sequence has not stopped.</summary>
/// <param name="value">The source value.</param>
internal void Next(T value)
{
lock (_gate)
{
if (!IsStopped)
{
_observer.OnNext(value);
}
}
}

/// <summary>Completes the downstream observer once and disposes all subscriptions.</summary>
internal void Complete()
{
if (Interlocked.Exchange(ref _stopped, 1) != 0)
{
return;
}

lock (_gate)
{
_observer.OnCompleted();
}

_subscriptions.Dispose();
}

/// <summary>Sends an error to the downstream observer once and disposes all subscriptions.</summary>
/// <param name="exception">The exception to forward.</param>
internal void Error(Exception exception)
{
if (Interlocked.Exchange(ref _stopped, 1) != 0)
{
return;
}

lock (_gate)
{
_observer.OnError(exception);
}

_subscriptions.Dispose();
}
}

/// <summary>Observes the source stream and routes its notifications through the coordinator.</summary>
private sealed class SourceObserver : IObserver<T>
{
/// <summary>The owning coordinator.</summary>
private readonly Coordinator _coordinator;

/// <summary>Initializes a new instance of the <see cref="SourceObserver"/> class.</summary>
/// <param name="coordinator">The owning coordinator.</param>
internal SourceObserver(Coordinator coordinator) => _coordinator = coordinator;

/// <inheritdoc/>
public void OnNext(T value) => _coordinator.Next(value);

/// <inheritdoc/>
public void OnError(Exception error) => _coordinator.Error(error);

/// <inheritdoc/>
public void OnCompleted() => _coordinator.Complete();
}

/// <summary>Observes the cancellation stream; its first value (or error) stops the source.</summary>
private sealed class CancelObserver : IObserver<TOther>
{
/// <summary>The owning coordinator.</summary>
private readonly Coordinator _coordinator;

/// <summary>Initializes a new instance of the <see cref="CancelObserver"/> class.</summary>
/// <param name="coordinator">The owning coordinator.</param>
internal CancelObserver(Coordinator coordinator) => _coordinator = coordinator;

/// <inheritdoc/>
public void OnNext(TOther value) => _coordinator.Complete();

/// <inheritdoc/>
public void OnError(Exception error) => _coordinator.Error(error);

/// <inheritdoc/>
public void OnCompleted()
{
// Completion of the cancellation stream without a value does not stop the source.
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,10 @@ namespace ReactiveUI.Primitives.Disposables
public static class Disposable
{
public static System.IDisposable Empty { get; }
public static System.IDisposable Combine(params System.IDisposable[] disposables) { }
public static System.IDisposable Combine(System.IDisposable first, System.IDisposable second) { }
public static System.IDisposable Create(System.Action dispose) { }
public static System.IDisposable Create<TState>(TState state, System.Action<TState> dispose) { }
}
[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")]
public sealed class DisposableBag : ReactiveUI.Primitives.Disposables.IsDisposed, System.IDisposable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,10 @@ namespace ReactiveUI.Primitives.Disposables
public static class Disposable
{
public static System.IDisposable Empty { get; }
public static System.IDisposable Combine(params System.IDisposable[] disposables) { }
public static System.IDisposable Combine(System.IDisposable first, System.IDisposable second) { }
public static System.IDisposable Create(System.Action dispose) { }
public static System.IDisposable Create<TState>(TState state, System.Action<TState> dispose) { }
}
[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")]
public sealed class DisposableBag : ReactiveUI.Primitives.Disposables.IsDisposed, System.IDisposable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,10 @@ namespace ReactiveUI.Primitives.Disposables
public static class Disposable
{
public static System.IDisposable Empty { get; }
public static System.IDisposable Combine(params System.IDisposable[] disposables) { }
public static System.IDisposable Combine(System.IDisposable first, System.IDisposable second) { }
public static System.IDisposable Create(System.Action dispose) { }
public static System.IDisposable Create<TState>(TState state, System.Action<TState> dispose) { }
}
[System.Diagnostics.DebuggerDisplay("{DebuggerDisplay,nq}")]
public sealed class DisposableBag : ReactiveUI.Primitives.Disposables.IsDisposed, System.IDisposable
Expand Down
Loading
Loading