From f940cbee5f23cb6c25b01211479263875b525ef7 Mon Sep 17 00:00:00 2001 From: Glenn Watson <5834289+glennawatson@users.noreply.github.com> Date: Wed, 3 Jun 2026 14:23:18 +1000 Subject: [PATCH 1/8] feat(LinqMixins): add System.Reactive/LINQ name layer + dedicated sinks Add 33 familiar System.Reactive/LINQ operator names (Select, Where, Scan, Aggregate, Merge, Concat, Amb, Switch, Zip, CombineLatest, WithLatestFrom, Do, DoWith, SelectWith, WhereWith, WhereNotNull, DistinctUntilChanged(+By), IgnoreElements, SelectMany, Delay, Timeout, Sample, Retry, Materialize, Dematerialize) as first-class operators. Each builds the SAME sink as its Primitives-named counterpart directly (no forwarding/redirect), so the two names are interchangeable with identical behaviour and allocation profile. Both name sets are fully supported. Replace closure-piggyback operator bodies with dedicated sinks so no per-call closure is allocated: MapWith/KeepWith/TapWith get internal *WithSignal sinks; Tap/Do reuse the existing TapSignal; Resume gets an internal ResumeSignal (shared with a future Catch(fallback)); Timestamp gets an internal TimestampSignal for the non-range path. Tests: genericified parity tests (RxNameParityTests) drive each operator pair once via TUnit [MethodDataSource] and assert each name matches the expected sequence and is behaviorally identical to its twin (unary, higher-order, binary, and virtual-clock time families). Add CPD exclusions for the deliberately duplicated Rx-name/sink files. Update the 3 API-approval snapshots and document the name layer in the README. --- .github/workflows/sonarcloud.yml | 2 +- README.md | 36 +- .../SignalOperatorMixins.Coordinators.cs | 4 +- .../SignalOperatorMixins.cs | 39 +- .../SignalOperatorParityMixins.RxNames.cs | 717 ++++++++++++++++++ .../SignalOperatorParityMixins.cs | 2 +- .../Signals/Core/ResumeSignal{T}.cs | 161 ++++ .../Signals/KeepWithSignal{T,TState}.cs | 142 ++++ .../MapWithSignal{TSource,TState,TResult}.cs | 138 ++++ .../Signals/TapWithSignal{T,TState}.cs | 127 ++++ .../Signals/TimestampSignal{T}.cs | 112 +++ ...alTests.Primitives.DotNet10_0.verified.txt | 34 + ...valTests.Primitives.DotNet8_0.verified.txt | 34 + ...valTests.Primitives.DotNet9_0.verified.txt | 34 + .../RxNameParityTests.cs | 448 +++++++++++ 15 files changed, 2016 insertions(+), 14 deletions(-) create mode 100644 src/ReactiveUI.Primitives/SignalOperatorParityMixins.RxNames.cs create mode 100644 src/ReactiveUI.Primitives/Signals/Core/ResumeSignal{T}.cs create mode 100644 src/ReactiveUI.Primitives/Signals/KeepWithSignal{T,TState}.cs create mode 100644 src/ReactiveUI.Primitives/Signals/MapWithSignal{TSource,TState,TResult}.cs create mode 100644 src/ReactiveUI.Primitives/Signals/TapWithSignal{T,TState}.cs create mode 100644 src/ReactiveUI.Primitives/Signals/TimestampSignal{T}.cs create mode 100644 src/tests/ReactiveUI.Primitives.Tests/RxNameParityTests.cs diff --git a/.github/workflows/sonarcloud.yml b/.github/workflows/sonarcloud.yml index ead0a18..51b66e1 100644 --- a/.github/workflows/sonarcloud.yml +++ b/.github/workflows/sonarcloud.yml @@ -40,7 +40,7 @@ jobs: sonarOrganization: reactiveui sonarExclusions: '**/tests/**,**/tools/**,**/benchmarks/**,**/TestResults/**' sonarCoverageExclusions: '**/tests/**,**/tools/**,**/benchmarks/**,**/*Tests/**,**/*Tests.cs,**/Generated/**' - sonarCpdExclusions: '**/tests/**,**/tools/**,**/benchmarks/**,**/CombineLatest?.cs,**/CombineLatest??.cs,**/ReactiveUI.Primitives.Blazor/**,**/ReactiveUI.Primitives.Maui/**,**/ReactiveUI.Primitives.WinUI/**,**/ReactiveUI.Primitives.WinForms/**,**/ReactiveUI.Primitives.Wpf/**' + sonarCpdExclusions: '**/tests/**,**/tools/**,**/benchmarks/**,**/CombineLatest?.cs,**/CombineLatest??.cs,**/ReactiveUI.Primitives.Blazor/**,**/ReactiveUI.Primitives.Maui/**,**/ReactiveUI.Primitives.WinUI/**,**/ReactiveUI.Primitives.WinForms/**,**/ReactiveUI.Primitives.Wpf/**,**/SignalOperatorParityMixins.RxNames.cs,**/MapWithSignal*.cs,**/KeepWithSignal*.cs,**/TapWithSignal*.cs' sonarTestExclusions: '**/tests/**,**/tools/**,**/benchmarks/**' testTimeout: '15m' secrets: diff --git a/README.md b/README.md index cda1a3d..36e0ff3 100644 --- a/README.md +++ b/README.md @@ -276,7 +276,41 @@ IObservable source = Signal.CreateSafe(observer => ## Operators -Operators are extension methods over `IObservable`. ReactiveUI.Primitives uses a distinct vocabulary for operators that would otherwise collide with System.Reactive or R3. +Operators are extension methods over `IObservable`. ReactiveUI.Primitives has a distinct vocabulary (`Map`, `Keep`, `Fold`, `Blend`, `SwitchTo`, …) that avoids ambiguous-call collisions with System.Reactive or R3 — but the familiar System.Reactive / LINQ names are also available (see below), so you can write whichever reads best. + +### System.Reactive / LINQ name layer + +The everyday System.Reactive / LINQ names are first-class operators that build the **same sink** as their Primitives-named counterpart — identical behavior and allocation profile, not wrappers. Both name sets are fully supported and interchangeable; pick whichever reads best for your code. + +| LINQ / System.Reactive name | Primitives name | | LINQ / System.Reactive name | Primitives name | +|---|---|---|---|---| +| `Select` | `Map` | | `Merge` | `Blend` | +| `SelectWith` | `MapWith` | | `Concat` | `Chain` | +| `Where` | `Keep` | | `Amb` | `Race` | +| `WhereWith` | `KeepWith` | | `Switch` | `SwitchTo` | +| `WhereNotNull` | `KeepNotNull` | | `Zip` | `Pair` | +| `Do` | `Tap` | | `CombineLatest` | `SyncLatest` | +| `DoWith` | `TapWith` | | `WithLatestFrom` | `Latch` | +| `Scan` | `Fold` | | `SelectMany` | `FlatMap` | +| `Aggregate` | `Reduce` | | `Delay` | `Shift` | +| `DistinctUntilChanged` | `Unique` | | `Timeout` | `Expire` | +| `DistinctUntilChangedBy` | `UniqueBy` | | `Sample` | `Probe` | +| `IgnoreElements` | `IgnoreValues` | | `Retry` | `Reattempt` | +| `Materialize` | `Spark` | | `Dematerialize` | `Unspark` | + +```csharp +using ReactiveUI.Primitives; +using ReactiveUI.Primitives.Signals; + +// Reads exactly like System.Reactive — and builds the identical sinks as Map/Keep/Fold. +using var subscription = Signal.Sequence(1, 10) + .Where(value => value % 2 == 0) + .Select(value => value * value) + .Scan(0, (total, value) => total + value) + .Subscribe(Console.WriteLine); +``` + +> Caveat: because these names live in the `ReactiveUI.Primitives` namespace, a file that *also* imports `System.Reactive.Linq` will get ambiguous-call errors on shared names like `.Select`/`.Where`. Use the Primitives names (`Map`/`Keep`) in those mixed files, or migrate the file fully off System.Reactive. ### Transformation and filtering diff --git a/src/ReactiveUI.Primitives/SignalOperatorMixins.Coordinators.cs b/src/ReactiveUI.Primitives/SignalOperatorMixins.Coordinators.cs index 22f02d8..407a90d 100644 --- a/src/ReactiveUI.Primitives/SignalOperatorMixins.Coordinators.cs +++ b/src/ReactiveUI.Primitives/SignalOperatorMixins.Coordinators.cs @@ -757,7 +757,7 @@ public void OnNext(T value) /// The coordinator that owns the subscription cleanup. internal ExpireCoordinator Run() { - _timer = _sequencer.Schedule(this, _dueTime, static (_, coordinator) => coordinator.Timeout()); + _timer = _sequencer.Schedule(this, _dueTime, static (_, coordinator) => coordinator.EmitTimeout()); _subscription = _source.Subscribe(this); if (Volatile.Read(ref _done) == 0) { @@ -772,7 +772,7 @@ internal ExpireCoordinator Run() /// Emits the timeout error. /// /// An empty disposable. - private IDisposable Timeout() + private IDisposable EmitTimeout() { if (Interlocked.Exchange(ref _done, 1) != 0) { diff --git a/src/ReactiveUI.Primitives/SignalOperatorMixins.cs b/src/ReactiveUI.Primitives/SignalOperatorMixins.cs index 8eecbfe..5411b87 100644 --- a/src/ReactiveUI.Primitives/SignalOperatorMixins.cs +++ b/src/ReactiveUI.Primitives/SignalOperatorMixins.cs @@ -55,12 +55,17 @@ public static IObservable Map(this IObservable is . public static IObservable MapWith(this IObservable source, TState state, Func selector) { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + if (selector == null) { throw new ArgumentNullException(nameof(selector)); } - return source.Map(value => selector(state, value)); + return new MapWithSignal(source, state, selector); } /// @@ -100,12 +105,17 @@ public static IObservable Keep(this IObservable source, Func p /// is . public static IObservable KeepWith(this IObservable source, TState state, Func predicate) { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + if (predicate == null) { throw new ArgumentNullException(nameof(predicate)); } - return source.Keep(value => predicate(state, value)); + return new KeepWithSignal(source, state, predicate); } /// @@ -178,16 +188,17 @@ public static IObservable CastTo(this IObservable sou /// is . public static IObservable Tap(this IObservable source, Action onNext) { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + if (onNext == null) { throw new ArgumentNullException(nameof(onNext)); } - return source.Map(value => - { - onNext(value); - return value; - }); + return new TapSignal(source, onNext, static _ => { }, static () => { }); } /// @@ -202,12 +213,17 @@ public static IObservable Tap(this IObservable source, Action onNext /// is . public static IObservable TapWith(this IObservable source, TState state, Action onNext) { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + if (onNext == null) { throw new ArgumentNullException(nameof(onNext)); } - return source.Tap(value => onNext(state, value)); + return new TapWithSignal(source, state, onNext); } /// @@ -652,12 +668,17 @@ public static IObservable Rescue(this IObservable source, Func or is . public static IObservable Resume(this IObservable source, IObservable fallback) { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + if (fallback == null) { throw new ArgumentNullException(nameof(fallback)); } - return source.Recover(_ => fallback); + return new ResumeSignal(source, fallback); } /// diff --git a/src/ReactiveUI.Primitives/SignalOperatorParityMixins.RxNames.cs b/src/ReactiveUI.Primitives/SignalOperatorParityMixins.RxNames.cs new file mode 100644 index 0000000..67ec01b --- /dev/null +++ b/src/ReactiveUI.Primitives/SignalOperatorParityMixins.RxNames.cs @@ -0,0 +1,717 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using ReactiveUI.Primitives.Concurrency; +using ReactiveUI.Primitives.Core; +using ReactiveUI.Primitives.Signals; +using ReactiveUI.Primitives.Signals.Core; + +namespace ReactiveUI.Primitives; + +/// +/// System.Reactive / LINQ familiar names for the Primitives operator vocabulary. Each method builds the same sink as +/// its Primitives-named counterpart directly, so the two names are interchangeable with identical behaviour and +/// allocation profile. Both name sets are fully supported. +/// +public static partial class LinqMixins +{ + /// + /// Projects each element of an observable sequence into a new form. LINQ name for Map. + /// + /// The type of the elements in the source sequence. + /// The type of the elements in the result sequence. + /// An observable sequence of elements to project. + /// A transform function to apply to each element. + /// An observable sequence whose elements are the result of invoking the transform function on each source element. + /// or is . + public static IObservable Select(this IObservable source, Func selector) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (selector == null) + { + throw new ArgumentNullException(nameof(selector)); + } + + return new MapSignal(source, selector); + } + + /// + /// Projects each element into a new form using external state passed to the selector. State-carrying name for MapWith. + /// + /// The type of the elements in the source sequence. + /// The type of the state used in the selector function. + /// The type of the elements in the result sequence. + /// An observable sequence of elements to project. + /// The state to pass to the selector function. + /// A transform function to apply to each source element along with the state. + /// An observable sequence whose elements are the result of invoking the transform on each source element and the state. + /// or is . + public static IObservable SelectWith(this IObservable source, TState state, Func selector) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (selector == null) + { + throw new ArgumentNullException(nameof(selector)); + } + + return new MapWithSignal(source, state, selector); + } + + /// + /// Filters an observable sequence to elements that satisfy a predicate. LINQ name for Keep. + /// + /// The type of elements in the observable sequence. + /// The source observable sequence to filter. + /// A function to test each element for a condition. + /// An observable sequence containing the elements that satisfy . + /// or is . + public static IObservable Where(this IObservable source, Func predicate) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (predicate == null) + { + throw new ArgumentNullException(nameof(predicate)); + } + + return new KeepSignal(source, predicate); + } + + /// + /// Filters elements using a predicate that uses external state. State-carrying name for KeepWith. + /// + /// The type of elements in the source sequence. + /// The type of the state parameter passed to the predicate. + /// The source observable sequence to filter. + /// The state value to pass to the predicate for each element. + /// A function to test each element along with the state. + /// An observable sequence containing only the elements that satisfy the predicate. + /// or is . + public static IObservable WhereWith(this IObservable source, TState state, Func predicate) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (predicate == null) + { + throw new ArgumentNullException(nameof(predicate)); + } + + return new KeepWithSignal(source, state, predicate); + } + + /// + /// Filters out null values, emitting only non-null values. Familiar name for KeepNotNull. + /// + /// The type of elements in the observable sequence. + /// The source observable sequence to filter. + /// An observable sequence that emits only the non-null values from the source sequence. + /// is . + public static IObservable WhereNotNull(this IObservable source) + where T : class + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + return new KeepNotNullSignal(source); + } + + /// + /// Invokes an action for each value while preserving the sequence. System.Reactive name for Tap. + /// + /// The value type. + /// The source sequence. + /// The action to invoke for each value. + /// The source values after the action has run. + /// or is . + public static IObservable Do(this IObservable source, Action onNext) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (onNext == null) + { + throw new ArgumentNullException(nameof(onNext)); + } + + return new TapSignal(source, onNext, static _ => { }, static () => { }); + } + + /// + /// Invokes a stateful action for each value while preserving the sequence. State-carrying name for TapWith. + /// + /// The value type. + /// The state type. + /// The source sequence. + /// The state passed to . + /// The action to invoke for each value. + /// The source values after the action has run. + /// or is . + public static IObservable DoWith(this IObservable source, TState state, Action onNext) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (onNext == null) + { + throw new ArgumentNullException(nameof(onNext)); + } + + return new TapWithSignal(source, state, onNext); + } + + /// + /// Emits the accumulated state after each source value. System.Reactive name for Fold. + /// + /// The source value type. + /// The accumulated value type. + /// The source sequence. + /// The initial accumulated value. + /// The function that combines the current state with the next source value. + /// A sequence of intermediate accumulated values. + /// or is . + public static IObservable Scan(this IObservable source, TAccumulate seed, Func accumulator) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (accumulator == null) + { + throw new ArgumentNullException(nameof(accumulator)); + } + + return new FoldSignal(source, seed, accumulator); + } + + /// + /// Emits the final accumulated state when the source completes. System.Reactive name for Reduce. + /// + /// The source value type. + /// The accumulated value type. + /// The source sequence. + /// The initial accumulated value. + /// The function that combines the current state with the next source value. + /// A sequence that emits one accumulated value on completion. + /// or is . + public static IObservable Aggregate(this IObservable source, TAccumulate seed, Func accumulator) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (accumulator == null) + { + throw new ArgumentNullException(nameof(accumulator)); + } + + return new ReduceSignal(source, seed, accumulator); + } + + /// + /// Suppresses adjacent duplicate values. System.Reactive name for Unique. + /// + /// The value type. + /// The source sequence. + /// A sequence with adjacent duplicates removed. + /// is . + public static IObservable DistinctUntilChanged(this IObservable source) => + DistinctUntilChanged(source, null); + + /// + /// Suppresses adjacent duplicate values using the supplied comparer. System.Reactive name for Unique. + /// + /// The value type. + /// The source sequence. + /// The comparer used to compare adjacent values. + /// A sequence with adjacent duplicates removed. + /// is . + public static IObservable DistinctUntilChanged(this IObservable source, IEqualityComparer? comparer) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + comparer ??= EqualityComparer.Default; + return new UniqueSignal(source, comparer); + } + + /// + /// Suppresses adjacent values with duplicate keys. System.Reactive name for UniqueBy. + /// + /// The value type. + /// The key type. + /// The source sequence. + /// The function that selects the comparison key. + /// A sequence with adjacent duplicate keys removed. + /// or is . + public static IObservable DistinctUntilChangedBy(this IObservable source, Func keySelector) => + DistinctUntilChangedBy(source, keySelector, null); + + /// + /// Suppresses adjacent values with duplicate keys using the supplied comparer. System.Reactive name for UniqueBy. + /// + /// The value type. + /// The key type. + /// The source sequence. + /// The function that selects the comparison key. + /// The comparer used to compare adjacent keys. + /// A sequence with adjacent duplicate keys removed. + /// or is . + public static IObservable DistinctUntilChangedBy(this IObservable source, Func keySelector, IEqualityComparer? comparer) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (keySelector == null) + { + throw new ArgumentNullException(nameof(keySelector)); + } + + comparer ??= EqualityComparer.Default; + return new UniqueBySignal(source, keySelector, comparer); + } + + /// + /// Drops every value, forwarding only the terminal notification. System.Reactive name for IgnoreValues. + /// + /// The value type. + /// The source sequence. + /// A sequence that forwards only completion or error. + /// is . + public static IObservable IgnoreElements(this IObservable source) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + return new IgnoreValuesSignal(source); + } + + /// + /// Projects each value to an inner sequence and merges the results. LINQ name for FlatMap. + /// + /// The source value type. + /// The inner value type. + /// The source sequence. + /// The function that projects each source value to an inner sequence. + /// A sequence containing the merged values of every inner sequence. + /// or is . + public static IObservable SelectMany(this IObservable source, Func> selector) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (selector == null) + { + throw new ArgumentNullException(nameof(selector)); + } + + return new FlatMapSignal(source, selector); + } + + /// + /// Projects each value to an inner sequence and combines each pair with a result selector. LINQ name for FlatMap. + /// + /// The source value type. + /// The inner value type. + /// The result value type. + /// The source sequence. + /// The function that projects each source value to an inner sequence. + /// The function that combines a source value with each inner value. + /// A sequence containing selected outer/inner combinations. + /// or is . + public static IObservable SelectMany( + this IObservable source, + Func> collectionSelector, + Func resultSelector) + { + if (collectionSelector == null) + { + throw new ArgumentNullException(nameof(collectionSelector)); + } + + if (resultSelector == null) + { + throw new ArgumentNullException(nameof(resultSelector)); + } + + return new FlatMapResultSignal(source, collectionSelector, resultSelector); + } + + /// + /// Subscribes to all inner sequences and forwards their values as they arrive. System.Reactive name for Blend. + /// + /// The value type. + /// The outer sequence of inner sequences. + /// A sequence containing values from all inner sequences. + /// is . + public static IObservable Merge(this IObservable> sources) + { + if (sources == null) + { + throw new ArgumentNullException(nameof(sources)); + } + + return new BlendSignal(sources); + } + + /// + /// Subscribes to inner sequences one at a time in source order. System.Reactive name for Chain. + /// + /// The value type. + /// The outer sequence of inner sequences. + /// A sequence that emits each inner sequence after the previous one completes. + /// is . + public static IObservable Concat(this IObservable> sources) + { + if (sources == null) + { + throw new ArgumentNullException(nameof(sources)); + } + + return new ChainSignal(sources); + } + + /// + /// Concatenates two sequences. System.Reactive name for Chain. + /// + /// The value type. + /// The first sequence. + /// The second sequence. + /// A sequence that emits after completes. + public static IObservable Concat(this IObservable first, IObservable second) => + Signal.Chain(first, second); + + /// + /// Mirrors the first inner sequence to produce any notification. System.Reactive name for Race. + /// + /// The value type. + /// The competing inner sequences. + /// A sequence that mirrors the winning inner sequence. + /// is . + public static IObservable Amb(this IObservable> sources) + { + if (sources == null) + { + throw new ArgumentNullException(nameof(sources)); + } + + return new RaceSignal(sources); + } + + /// + /// Switches to the most recent inner sequence. System.Reactive name for SwitchTo. + /// + /// The value type. + /// The outer sequence of inner sequences. + /// A sequence that mirrors only the latest inner sequence. + /// is . + public static IObservable Switch(this IObservable> sources) + { + if (sources == null) + { + throw new ArgumentNullException(nameof(sources)); + } + + if (TryCreateSynchronousSwitchRangeSignal(sources, out var rangeSignal)) + { + return rangeSignal; + } + + return new SwitchSignal(sources); + } + + /// + /// Combines paired values from two sequences by index. System.Reactive name for Pair. + /// + /// The left value type. + /// The right value type. + /// The result value type. + /// The left sequence. + /// The right sequence. + /// The function that combines paired values. + /// A sequence containing one result for each available value pair. + /// , , or is . + public static IObservable Zip(this IObservable left, IObservable right, Func selector) + { + if (left == null) + { + throw new ArgumentNullException(nameof(left)); + } + + if (right == null) + { + throw new ArgumentNullException(nameof(right)); + } + + if (selector == null) + { + throw new ArgumentNullException(nameof(selector)); + } + + if (typeof(TLeft) == typeof(int) && typeof(TRight) == typeof(int) && left is RangeSignal leftRange && right is RangeSignal rightRange) + { + return new RangeZipSignal(leftRange, rightRange, (Func)(object)selector); + } + + return new ZipSignal(left, right, selector); + } + + /// + /// Combines the latest values once both sequences have produced a value. System.Reactive name for SyncLatest. + /// + /// The left value type. + /// The right value type. + /// The result value type. + /// The left sequence. + /// The right sequence. + /// The function that combines the latest values. + /// A sequence containing selected latest-value combinations. + /// , , or is . + public static IObservable CombineLatest(this IObservable left, IObservable right, Func selector) + { + if (left == null) + { + throw new ArgumentNullException(nameof(left)); + } + + if (right == null) + { + throw new ArgumentNullException(nameof(right)); + } + + if (selector == null) + { + throw new ArgumentNullException(nameof(selector)); + } + + if (typeof(TLeft) == typeof(int) && typeof(TRight) == typeof(int) && left is RangeSignal leftRange && right is RangeSignal rightRange) + { + return CreateRangeCombineLatestSignal(leftRange, rightRange, (Func)(object)selector); + } + + return new CombineLatestSignal(left, right, selector); + } + + /// + /// Combines each left value with the latest right value. System.Reactive name for Latch. + /// + /// The left value type. + /// The right value type. + /// The result value type. + /// The triggering sequence. + /// The sequence that supplies the latest value. + /// The function that combines the left value with the latest right value. + /// A sequence containing selected left/latest-right combinations. + /// , , or is . + public static IObservable WithLatestFrom(this IObservable left, IObservable right, Func selector) + { + if (left == null) + { + throw new ArgumentNullException(nameof(left)); + } + + if (right == null) + { + throw new ArgumentNullException(nameof(right)); + } + + if (selector == null) + { + throw new ArgumentNullException(nameof(selector)); + } + + if (typeof(TLeft) == typeof(int) && typeof(TRight) == typeof(int) && left is RangeSignal leftRange && right is RangeSignal rightRange) + { + return CreateRangeWithLatestSignal(leftRange, rightRange, (Func)(object)selector); + } + + return new LatchSignal(left, right, selector); + } + + /// + /// Delays source notifications by the specified duration. System.Reactive name for Shift. + /// + /// The value type. + /// The source sequence. + /// The delay applied to each notification. + /// A sequence that forwards source notifications after the delay. + public static IObservable Delay(this IObservable source, TimeSpan dueTime) => + Delay(source, dueTime, null); + + /// + /// Delays source notifications by the specified duration on a sequencer. System.Reactive name for Shift. + /// + /// The value type. + /// The source sequence. + /// The delay applied to each notification. + /// The sequencer used to schedule delayed notifications. + /// A sequence that forwards source notifications after the delay. + public static IObservable Delay(this IObservable source, TimeSpan dueTime, ISequencer? scheduler) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + scheduler ??= ThreadPoolSequencer.Instance; + if (source is RangeSignal range && CanReadRangeAs(typeof(T))) + { + return new ShiftedRangeSignal(range, Sequencer.Normalize(dueTime), scheduler); + } + + return new ShiftSignal(source, dueTime, scheduler); + } + + /// + /// Fails the sequence if it does not terminate before the timeout. System.Reactive name for Expire. + /// + /// The value type. + /// The source sequence. + /// The timeout duration. + /// A sequence that errors with when the timeout elapses first. + public static IObservable Timeout(this IObservable source, TimeSpan dueTime) => + Timeout(source, dueTime, null); + + /// + /// Fails the sequence if it does not terminate before the sequencer timeout. System.Reactive name for Expire. + /// + /// The value type. + /// The source sequence. + /// The timeout duration. + /// The sequencer used to schedule the timeout. + /// A sequence that errors with when the timeout elapses first. + public static IObservable Timeout(this IObservable source, TimeSpan dueTime, ISequencer? scheduler) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + scheduler ??= ThreadPoolSequencer.Instance; + return new ExpireSignal(source, dueTime, scheduler); + } + + /// + /// Emits the most recent value at the end of each sampling period. System.Reactive name for Probe. + /// + /// The value type. + /// The source sequence. + /// The sampling period. + /// A sequence containing the latest source value sampled at each period boundary. + public static IObservable Sample(this IObservable source, TimeSpan interval) => + Sample(source, interval, null); + + /// + /// Emits the most recent value at the end of each sampling period on a sequencer. System.Reactive name for Probe. + /// + /// The value type. + /// The source sequence. + /// The sampling period. + /// The sequencer used to schedule sampling. + /// A sequence containing the latest source value sampled at each period boundary. + /// is . + /// is less than . + public static IObservable Sample(this IObservable source, TimeSpan interval, ISequencer? scheduler) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (interval < TimeSpan.Zero) + { + throw new ArgumentOutOfRangeException(nameof(interval)); + } + + scheduler ??= ThreadPoolSequencer.Instance; + return new ProbeSignal(source, interval, scheduler); + } + + /// + /// Resubscribes to the source after an error up to times. System.Reactive name for Reattempt. + /// + /// The value type. + /// The source sequence. + /// The maximum number of retry attempts after the initial subscription. + /// A sequence that retries the source before forwarding the final error. + /// is . + /// is less than zero. + public static IObservable Retry(this IObservable source, int retryCount) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + if (retryCount < 0) + { + throw new ArgumentOutOfRangeException(nameof(retryCount)); + } + + return new ReattemptSignal(source, retryCount); + } + + /// + /// Converts source values and terminal notifications into values. System.Reactive name for Spark. + /// + /// The value type. + /// The source sequence. + /// A sequence of spark values representing source notifications. + /// is . + public static IObservable> Materialize(this IObservable source) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + return new SparkSignal(source); + } + + /// + /// Converts values back into observer notifications. System.Reactive name for Unspark. + /// + /// The value type. + /// The spark sequence. + /// A sequence represented by the supplied spark values. + /// is . + public static IObservable Dematerialize(this IObservable> source) + { + if (source == null) + { + throw new ArgumentNullException(nameof(source)); + } + + return new UnsparkSignal(source); + } +} diff --git a/src/ReactiveUI.Primitives/SignalOperatorParityMixins.cs b/src/ReactiveUI.Primitives/SignalOperatorParityMixins.cs index 612f993..1771723 100644 --- a/src/ReactiveUI.Primitives/SignalOperatorParityMixins.cs +++ b/src/ReactiveUI.Primitives/SignalOperatorParityMixins.cs @@ -841,7 +841,7 @@ public static IObservable> Timestamp(this IObservable source, IS return new TimestampRangeSignal(range, scheduler); } - return source.Map(value => new Moment(value, scheduler.Now)); + return new TimestampSignal(source, scheduler); } /// diff --git a/src/ReactiveUI.Primitives/Signals/Core/ResumeSignal{T}.cs b/src/ReactiveUI.Primitives/Signals/Core/ResumeSignal{T}.cs new file mode 100644 index 0000000..78189ea --- /dev/null +++ b/src/ReactiveUI.Primitives/Signals/Core/ResumeSignal{T}.cs @@ -0,0 +1,161 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using ReactiveUI.Primitives.Concurrency; +using ReactiveUI.Primitives.Core; +using ReactiveUI.Primitives.Disposables; + +namespace ReactiveUI.Primitives.Signals.Core; + +/// +/// Dedicated cold signal for Resume (continue with a fixed fallback sequence after any error). Holds the +/// fallback observable directly so no per-subscription closure is allocated, mirroring the slot-based subscription +/// management of . +/// +/// The value type. +internal sealed class ResumeSignal : IRequireCurrentThread +{ + /// The source observable. + private readonly IObservable _source; + + /// The fallback observable subscribed to after the source errors. + private readonly IObservable _fallback; + + /// Initializes a new instance of the class. + /// The source observable. + /// The fallback observable subscribed to after the source errors. + internal ResumeSignal(IObservable source, IObservable fallback) + { + _source = source; + _fallback = fallback; + } + + /// + public bool IsRequiredSubscribeOnCurrentThread() => true; + + /// + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + { + throw new ArgumentNullException(nameof(observer)); + } + + if (!CurrentThreadSequencer.IsScheduleRequired) + { + return Run(observer); + } + + var subscription = new SingleDisposable(); + Sequencer.CurrentThread.Schedule(() => subscription.Create(Run(observer))); + return subscription; + } + + /// Builds the sink and subscribes it to the source. + /// The downstream observer. + /// The sink, which is the subscription. + private ResumeObserver Run(IObserver observer) => new ResumeObserver(observer, _fallback).Run(_source); + + /// Forwards source values and, on any error, switches to the fallback sequence. + private sealed class ResumeObserver : IObserver, IDisposable + { + /// Marker stored in a slot once the sink is disposed. + private static readonly IDisposable Disposed = new DisposedMarker(); + + /// The downstream observer. + private readonly IObserver _observer; + + /// The fallback observable. + private readonly IObservable _fallback; + + /// The source subscription slot. + private IDisposable? _sourceSubscription; + + /// The fallback subscription slot, populated after an error. + private IDisposable? _fallbackSubscription; + + /// Initializes a new instance of the class. + /// The downstream observer. + /// The fallback observable. + internal ResumeObserver(IObserver observer, IObservable fallback) + { + _observer = observer; + _fallback = fallback; + } + + /// + public void OnNext(T value) => _observer.OnNext(value); + + /// + public void OnError(Exception error) => SetFallback(_fallback.Subscribe(_observer)); + + /// + public void OnCompleted() + { + try + { + _observer.OnCompleted(); + } + finally + { + Dispose(); + } + } + + /// + public void Dispose() + { + Release(ref _sourceSubscription); + Release(ref _fallbackSubscription); + } + + /// Subscribes to the source and returns the sink. + /// The source observable. + /// This sink, which is the subscription. + internal ResumeObserver Run(IObservable source) + { + Assign(ref _sourceSubscription, source.Subscribe(this)); + return this; + } + + /// Exchanges a slot for the disposed marker and releases any live subscription. + /// The slot to release. + private static void Release(ref IDisposable? slot) + { + var current = Interlocked.Exchange(ref slot, Disposed); + if (current == null || ReferenceEquals(current, Disposed)) + { + return; + } + + current.Dispose(); + } + + /// Stores a subscription into an empty slot, disposing it instead if the sink is already disposed. + /// The target slot. + /// The subscription to store. + private static void Assign(ref IDisposable? slot, IDisposable subscription) + { + if (Interlocked.CompareExchange(ref slot, subscription, null) == null) + { + return; + } + + subscription.Dispose(); + } + + /// Stores the fallback subscription. + /// The fallback subscription. + private void SetFallback(IDisposable subscription) => Assign(ref _fallbackSubscription, subscription); + + /// No-op disposable used as the disposed-slot sentinel. + private sealed class DisposedMarker : IDisposable + { + /// + public void Dispose() + { + } + } + } +} diff --git a/src/ReactiveUI.Primitives/Signals/KeepWithSignal{T,TState}.cs b/src/ReactiveUI.Primitives/Signals/KeepWithSignal{T,TState}.cs new file mode 100644 index 0000000..4bd8174 --- /dev/null +++ b/src/ReactiveUI.Primitives/Signals/KeepWithSignal{T,TState}.cs @@ -0,0 +1,142 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using ReactiveUI.Primitives.Core; + +namespace ReactiveUI.Primitives.Signals; + +/// +/// Filters source values using a caller-supplied state value, without allocating a per-value closure: the state is +/// stored on the sink and passed to the predicate for each element. +/// +/// The value type. +/// The state type passed to the predicate. +internal sealed class KeepWithSignal : IRequireCurrentThread +{ + /// The source sequence. + private readonly IObservable _source; + + /// The state passed to the predicate. + private readonly TState _state; + + /// The predicate applied to each source value and the state. + private readonly Func _predicate; + + /// + /// Initializes a new instance of the class. + /// + /// The source sequence. + /// The state passed to the predicate. + /// The predicate applied to each source value and the state. + public KeepWithSignal(IObservable source, TState state, Func predicate) + { + _source = source; + _state = state; + _predicate = predicate; + } + + /// + /// Determines whether the sink must subscribe on the current thread. + /// + /// when the source requires current-thread subscription. + public bool IsRequiredSubscribeOnCurrentThread() => + _source is IRequireCurrentThread currentThread && currentThread.IsRequiredSubscribeOnCurrentThread(); + + /// + /// Subscribes the observer to the filtered sequence. + /// + /// The downstream observer. + /// The subscription handle. + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + { + throw new ArgumentNullException(nameof(observer)); + } + + return _source.Subscribe(new KeepWithObserver(observer, _state, _predicate)); + } + + /// Applies the stateful predicate to each source value. + private sealed class KeepWithObserver : IObserver + { + /// The downstream observer. + private readonly IObserver _observer; + + /// The state passed to the predicate. + private readonly TState _state; + + /// The predicate applied to each source value and the state. + private readonly Func _predicate; + + /// Whether a terminal notification has been forwarded. + private bool _stopped; + + /// + /// Initializes a new instance of the class. + /// + /// The downstream observer. + /// The state passed to the predicate. + /// The predicate applied to each source value and the state. + public KeepWithObserver(IObserver observer, TState state, Func predicate) + { + _observer = observer; + _state = state; + _predicate = predicate; + } + + /// Forwards completion downstream. + public void OnCompleted() + { + if (_stopped) + { + return; + } + + _stopped = true; + _observer.OnCompleted(); + } + + /// Forwards an error downstream. + /// The error value. + public void OnError(Exception error) + { + if (_stopped) + { + return; + } + + _stopped = true; + _observer.OnError(error); + } + + /// Filters and forwards a source value. + /// The source value. + public void OnNext(T value) + { + if (_stopped) + { + return; + } + + bool keep; + try + { + keep = _predicate(_state, value); + } + catch (Exception error) + { + OnError(error); + return; + } + + if (!keep) + { + return; + } + + _observer.OnNext(value); + } + } +} diff --git a/src/ReactiveUI.Primitives/Signals/MapWithSignal{TSource,TState,TResult}.cs b/src/ReactiveUI.Primitives/Signals/MapWithSignal{TSource,TState,TResult}.cs new file mode 100644 index 0000000..6c71e49 --- /dev/null +++ b/src/ReactiveUI.Primitives/Signals/MapWithSignal{TSource,TState,TResult}.cs @@ -0,0 +1,138 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using ReactiveUI.Primitives.Core; + +namespace ReactiveUI.Primitives.Signals; + +/// +/// Projects each source value into a new form using a caller-supplied state value, without allocating a per-value +/// closure: the state is stored on the sink and passed to the selector for each element. +/// +/// The source value type. +/// The state type passed to the selector. +/// The result value type. +internal sealed class MapWithSignal : IRequireCurrentThread +{ + /// The source sequence. + private readonly IObservable _source; + + /// The state passed to the selector. + private readonly TState _state; + + /// The transform applied to each source value and the state. + private readonly Func _selector; + + /// + /// Initializes a new instance of the class. + /// + /// The source sequence. + /// The state passed to the selector. + /// The transform applied to each source value and the state. + public MapWithSignal(IObservable source, TState state, Func selector) + { + _source = source; + _state = state; + _selector = selector; + } + + /// + /// Determines whether the sink must subscribe on the current thread. + /// + /// when the source requires current-thread subscription. + public bool IsRequiredSubscribeOnCurrentThread() => + _source is IRequireCurrentThread currentThread && currentThread.IsRequiredSubscribeOnCurrentThread(); + + /// + /// Subscribes the observer to the projected sequence. + /// + /// The downstream observer. + /// The subscription handle. + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + { + throw new ArgumentNullException(nameof(observer)); + } + + return _source.Subscribe(new MapWithObserver(observer, _state, _selector)); + } + + /// Applies the stateful selector to each source value. + private sealed class MapWithObserver : IObserver + { + /// The downstream observer. + private readonly IObserver _observer; + + /// The state passed to the selector. + private readonly TState _state; + + /// The transform applied to each source value and the state. + private readonly Func _selector; + + /// Whether a terminal notification has been forwarded. + private bool _stopped; + + /// + /// Initializes a new instance of the class. + /// + /// The downstream observer. + /// The state passed to the selector. + /// The transform applied to each source value and the state. + public MapWithObserver(IObserver observer, TState state, Func selector) + { + _observer = observer; + _state = state; + _selector = selector; + } + + /// Forwards completion downstream. + public void OnCompleted() + { + if (_stopped) + { + return; + } + + _stopped = true; + _observer.OnCompleted(); + } + + /// Forwards an error downstream. + /// The error value. + public void OnError(Exception error) + { + if (_stopped) + { + return; + } + + _stopped = true; + _observer.OnError(error); + } + + /// Projects and forwards a source value. + /// The source value. + public void OnNext(TSource value) + { + if (_stopped) + { + return; + } + + TResult result; + try + { + result = _selector(_state, value); + } + catch (Exception error) + { + OnError(error); + return; + } + + _observer.OnNext(result); + } + } +} diff --git a/src/ReactiveUI.Primitives/Signals/TapWithSignal{T,TState}.cs b/src/ReactiveUI.Primitives/Signals/TapWithSignal{T,TState}.cs new file mode 100644 index 0000000..a0c0d13 --- /dev/null +++ b/src/ReactiveUI.Primitives/Signals/TapWithSignal{T,TState}.cs @@ -0,0 +1,127 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using ReactiveUI.Primitives.Core; + +namespace ReactiveUI.Primitives.Signals; + +/// +/// Invokes a stateful action for each source value while forwarding the value unchanged, without allocating a +/// per-value closure: the state is stored on the sink and passed to the action for each element. +/// +/// The value type. +/// The state type passed to the action. +internal sealed class TapWithSignal : IRequireCurrentThread +{ + /// The source sequence. + private readonly IObservable _source; + + /// The state passed to the action. + private readonly TState _state; + + /// The action invoked for each value and the state. + private readonly Action _onNext; + + /// + /// Initializes a new instance of the class. + /// + /// The source sequence. + /// The state passed to the action. + /// The action invoked for each value and the state. + public TapWithSignal(IObservable source, TState state, Action onNext) + { + _source = source; + _state = state; + _onNext = onNext; + } + + /// + /// Determines whether the sink must subscribe on the current thread. + /// + /// when the source requires current-thread subscription. + public bool IsRequiredSubscribeOnCurrentThread() => + _source is IRequireCurrentThread currentThread && currentThread.IsRequiredSubscribeOnCurrentThread(); + + /// + /// Subscribes the observer to the tapped sequence. + /// + /// The downstream observer. + /// The subscription handle. + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + { + throw new ArgumentNullException(nameof(observer)); + } + + return _source.Subscribe(new TapWithObserver(observer, _state, _onNext)); + } + + /// Invokes the stateful side-effect action and forwards each value. + private sealed class TapWithObserver : IObserver + { + /// The downstream observer. + private readonly IObserver _observer; + + /// The state passed to the action. + private readonly TState _state; + + /// The action invoked for each value and the state. + private readonly Action _onNext; + + /// Whether a terminal notification has been forwarded. + private bool _stopped; + + /// + /// Initializes a new instance of the class. + /// + /// The downstream observer. + /// The state passed to the action. + /// The action invoked for each value and the state. + public TapWithObserver(IObserver observer, TState state, Action onNext) + { + _observer = observer; + _state = state; + _onNext = onNext; + } + + /// Forwards completion downstream. + public void OnCompleted() + { + if (_stopped) + { + return; + } + + _stopped = true; + _observer.OnCompleted(); + } + + /// Forwards an error downstream. + /// The error value. + public void OnError(Exception error) + { + if (_stopped) + { + return; + } + + _stopped = true; + _observer.OnError(error); + } + + /// Runs the stateful side effect and forwards a source value. + /// The source value. + public void OnNext(T value) + { + if (_stopped) + { + return; + } + + _onNext(_state, value); + _observer.OnNext(value); + } + } +} diff --git a/src/ReactiveUI.Primitives/Signals/TimestampSignal{T}.cs b/src/ReactiveUI.Primitives/Signals/TimestampSignal{T}.cs new file mode 100644 index 0000000..98a5ff3 --- /dev/null +++ b/src/ReactiveUI.Primitives/Signals/TimestampSignal{T}.cs @@ -0,0 +1,112 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using ReactiveUI.Primitives.Concurrency; +using ReactiveUI.Primitives.Core; + +namespace ReactiveUI.Primitives.Signals; + +/// +/// Annotates each source value with the scheduler timestamp at which it was observed, holding the sequencer directly +/// so no per-subscription closure is allocated. +/// +/// The value type. +internal sealed class TimestampSignal : IRequireCurrentThread> +{ + /// The source sequence. + private readonly IObservable _source; + + /// The sequencer that supplies timestamps. + private readonly ISequencer _scheduler; + + /// Initializes a new instance of the class. + /// The source sequence. + /// The sequencer that supplies timestamps. + internal TimestampSignal(IObservable source, ISequencer scheduler) + { + _source = source; + _scheduler = scheduler; + } + + /// + /// Determines whether the sink must subscribe on the current thread. + /// + /// when the source requires current-thread subscription. + public bool IsRequiredSubscribeOnCurrentThread() => + _source is IRequireCurrentThread currentThread && currentThread.IsRequiredSubscribeOnCurrentThread(); + + /// + /// Subscribes the observer to the timestamped sequence. + /// + /// The downstream observer. + /// The subscription handle. + public IDisposable Subscribe(IObserver> observer) + { + if (observer == null) + { + throw new ArgumentNullException(nameof(observer)); + } + + return _source.Subscribe(new TimestampObserver(observer, _scheduler)); + } + + /// Stamps each source value with the current scheduler time. + private sealed class TimestampObserver : IObserver + { + /// The downstream observer. + private readonly IObserver> _observer; + + /// The sequencer that supplies timestamps. + private readonly ISequencer _scheduler; + + /// Whether a terminal notification has been forwarded. + private bool _stopped; + + /// Initializes a new instance of the class. + /// The downstream observer. + /// The sequencer that supplies timestamps. + public TimestampObserver(IObserver> observer, ISequencer scheduler) + { + _observer = observer; + _scheduler = scheduler; + } + + /// Forwards completion downstream. + public void OnCompleted() + { + if (_stopped) + { + return; + } + + _stopped = true; + _observer.OnCompleted(); + } + + /// Forwards an error downstream. + /// The error value. + public void OnError(Exception error) + { + if (_stopped) + { + return; + } + + _stopped = true; + _observer.OnError(error); + } + + /// Stamps and forwards a source value. + /// The source value. + public void OnNext(T value) + { + if (_stopped) + { + return; + } + + _observer.OnNext(new Moment(value, _scheduler.Now)); + } + } +} diff --git a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet10_0.verified.txt b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet10_0.verified.txt index 08cb2ff..a102e5b 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet10_0.verified.txt +++ b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet10_0.verified.txt @@ -178,7 +178,9 @@ namespace ReactiveUI.Primitives } public static class LinqMixins { + public static System.IObservable Aggregate(this System.IObservable source, TAccumulate seed, System.Func accumulator) { } public static System.IObservable All(this System.IObservable source, System.Func predicate) { } + public static System.IObservable Amb(this System.IObservable> sources) { } public static System.IObservable Any(this System.IObservable source) { } public static System.IObservable Any(this System.IObservable source, System.Func predicate) { } public static System.Threading.Tasks.Task AnyAsync(this System.IObservable source) { } @@ -205,6 +207,9 @@ namespace ReactiveUI.Primitives public static System.Threading.Tasks.Task CollectArrayAsync(this System.IObservable source) { } public static System.IObservable> CollectList(this System.IObservable source) { } public static System.Threading.Tasks.Task> CollectListAsync(this System.IObservable source) { } + public static System.IObservable CombineLatest(this System.IObservable left, System.IObservable right, System.Func selector) { } + public static System.IObservable Concat(this System.IObservable> sources) { } + public static System.IObservable Concat(this System.IObservable first, System.IObservable second) { } public static System.IObservable Contains(this System.IObservable source, T value) { } public static System.IObservable Contains(this System.IObservable source, T value, System.Collections.Generic.IEqualityComparer? comparer) { } public static System.IObservable Count(this System.IObservable source) { } @@ -215,10 +220,13 @@ namespace ReactiveUI.Primitives public static System.Threading.Tasks.Task CountAsync(this System.IObservable source, System.Func predicate, System.Threading.CancellationToken cancellationToken) { } public static System.IObservable DefaultIfEmpty(this System.IObservable source) { } public static System.IObservable DefaultIfEmpty(this System.IObservable source, T defaultValue) { } + public static System.IObservable Delay(this System.IObservable source, System.TimeSpan dueTime) { } + public static System.IObservable Delay(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.IObservable DelayStart(this System.IObservable source, System.TimeSpan dueTime) { } public static System.IObservable DelayStart(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.IObservable DelaySubscription(this System.IObservable source, System.TimeSpan dueTime) { } public static System.IObservable DelaySubscription(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } + public static System.IObservable Dematerialize(this System.IObservable> source) { } public static ReactiveUI.Primitives.Disposables.SingleDisposable DisposeWith(this System.IDisposable disposable) { } public static System.IDisposable DisposeWith(this System.IDisposable disposable, ReactiveUI.Primitives.Disposables.MultipleDisposable disposables) { } public static ReactiveUI.Primitives.Disposables.SingleDisposable DisposeWith(this System.IDisposable disposable, System.Action? action) { } @@ -226,6 +234,12 @@ namespace ReactiveUI.Primitives public static System.IObservable Distinct(this System.IObservable source, System.Collections.Generic.IEqualityComparer? comparer) { } public static System.IObservable DistinctBy(this System.IObservable source, System.Func keySelector) { } public static System.IObservable DistinctBy(this System.IObservable source, System.Func keySelector, System.Collections.Generic.IEqualityComparer? comparer) { } + public static System.IObservable DistinctUntilChanged(this System.IObservable source) { } + public static System.IObservable DistinctUntilChanged(this System.IObservable source, System.Collections.Generic.IEqualityComparer? comparer) { } + public static System.IObservable DistinctUntilChangedBy(this System.IObservable source, System.Func keySelector) { } + public static System.IObservable DistinctUntilChangedBy(this System.IObservable source, System.Func keySelector, System.Collections.Generic.IEqualityComparer? comparer) { } + public static System.IObservable Do(this System.IObservable source, System.Action onNext) { } + public static System.IObservable DoWith(this System.IObservable source, TState state, System.Action onNext) { } public static System.IObservable Expire(this System.IObservable source, System.TimeSpan dueTime) { } public static System.IObservable Expire(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.Threading.Tasks.Task FirstAsync(this System.IObservable source) { } @@ -236,6 +250,7 @@ namespace ReactiveUI.Primitives public static System.IObservable Fold(this System.IObservable source, TAccumulate seed, System.Func accumulator) { } public static System.IObservable ForkJoin(this System.IObservable left, System.IObservable right, System.Func selector) { } public static System.IObservable FuseLatest(this System.IObservable left, System.IObservable right, System.Func selector) { } + public static System.IObservable IgnoreElements(this System.IObservable source) { } public static System.IObservable IgnoreValues(this System.IObservable source) { } public static System.IObservable IsEmpty(this System.IObservable source) { } public static System.IObservable Keep(this System.IObservable source, System.Func predicate) { } @@ -252,6 +267,8 @@ namespace ReactiveUI.Primitives public static System.IObservable LongCount(this System.IObservable source, System.Func predicate) { } public static System.IObservable Map(this System.IObservable source, System.Func selector) { } public static System.IObservable MapWith(this System.IObservable source, TState state, System.Func selector) { } + public static System.IObservable> Materialize(this System.IObservable source) { } + public static System.IObservable Merge(this System.IObservable> sources) { } public static System.IObservable ObserveOn(this System.IObservable source, ReactiveUI.Primitives.Concurrency.ISequencer scheduler) { } public static System.IObservable Pair(this System.IObservable left, System.IObservable right, System.Func selector) { } public static System.IObservable PairLatest(this System.IObservable left, System.IObservable right, System.Func selector) { } @@ -266,6 +283,14 @@ namespace ReactiveUI.Primitives public static System.IObservable Reduce(this System.IObservable source, TAccumulate seed, System.Func accumulator) { } public static System.IObservable Rescue(this System.IObservable source, System.Func> handler) { } public static System.IObservable Resume(this System.IObservable source, System.IObservable fallback) { } + public static System.IObservable Retry(this System.IObservable source, int retryCount) { } + public static System.IObservable Sample(this System.IObservable source, System.TimeSpan interval) { } + public static System.IObservable Sample(this System.IObservable source, System.TimeSpan interval, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } + public static System.IObservable Scan(this System.IObservable source, TAccumulate seed, System.Func accumulator) { } + public static System.IObservable Select(this System.IObservable source, System.Func selector) { } + public static System.IObservable SelectMany(this System.IObservable source, System.Func> selector) { } + public static System.IObservable SelectMany(this System.IObservable source, System.Func> collectionSelector, System.Func resultSelector) { } + public static System.IObservable SelectWith(this System.IObservable source, TState state, System.Func selector) { } public static System.IObservable Shift(this System.IObservable source, System.TimeSpan dueTime) { } public static System.IObservable Shift(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.IObservable Skip(this System.IObservable source, int count) { } @@ -274,6 +299,7 @@ namespace ReactiveUI.Primitives public static System.IObservable Stabilize(this System.IObservable source, System.TimeSpan dueTime) { } public static System.IObservable Stabilize(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.IObservable SubscribeOn(this System.IObservable source, ReactiveUI.Primitives.Concurrency.ISequencer scheduler) { } + public static System.IObservable Switch(this System.IObservable> sources) { } public static System.IObservable SwitchSelect(this System.IObservable source, System.Func> selector) { } public static System.IObservable SwitchTo(this System.IObservable> sources) { } public static System.IObservable SyncLatest(this System.IObservable left, System.IObservable right, System.Func selector) { } @@ -284,6 +310,8 @@ namespace ReactiveUI.Primitives public static System.IObservable TapWith(this System.IObservable source, TState state, System.Action onNext) { } public static System.IObservable> TimeInterval(this System.IObservable source) { } public static System.IObservable> TimeInterval(this System.IObservable source, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } + public static System.IObservable Timeout(this System.IObservable source, System.TimeSpan dueTime) { } + public static System.IObservable Timeout(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.IObservable> Timestamp(this System.IObservable source) { } public static System.IObservable> Timestamp(this System.IObservable source, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.IObservable ToArray(this System.IObservable source) { } @@ -303,6 +331,12 @@ namespace ReactiveUI.Primitives public static System.IObservable UniqueBy(this System.IObservable source, System.Func keySelector) { } public static System.IObservable UniqueBy(this System.IObservable source, System.Func keySelector, System.Collections.Generic.IEqualityComparer? comparer) { } public static System.IObservable Unspark(this System.IObservable> source) { } + public static System.IObservable Where(this System.IObservable source, System.Func predicate) { } + public static System.IObservable WhereNotNull(this System.IObservable source) + where T : class { } + public static System.IObservable WhereWith(this System.IObservable source, TState state, System.Func predicate) { } + public static System.IObservable WithLatestFrom(this System.IObservable left, System.IObservable right, System.Func selector) { } + public static System.IObservable Zip(this System.IObservable left, System.IObservable right, System.Func selector) { } } public sealed class LongCountObserver : ReactiveUI.Primitives.SingleSourceObserver { diff --git a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt index 08cb2ff..a102e5b 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt +++ b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt @@ -178,7 +178,9 @@ namespace ReactiveUI.Primitives } public static class LinqMixins { + public static System.IObservable Aggregate(this System.IObservable source, TAccumulate seed, System.Func accumulator) { } public static System.IObservable All(this System.IObservable source, System.Func predicate) { } + public static System.IObservable Amb(this System.IObservable> sources) { } public static System.IObservable Any(this System.IObservable source) { } public static System.IObservable Any(this System.IObservable source, System.Func predicate) { } public static System.Threading.Tasks.Task AnyAsync(this System.IObservable source) { } @@ -205,6 +207,9 @@ namespace ReactiveUI.Primitives public static System.Threading.Tasks.Task CollectArrayAsync(this System.IObservable source) { } public static System.IObservable> CollectList(this System.IObservable source) { } public static System.Threading.Tasks.Task> CollectListAsync(this System.IObservable source) { } + public static System.IObservable CombineLatest(this System.IObservable left, System.IObservable right, System.Func selector) { } + public static System.IObservable Concat(this System.IObservable> sources) { } + public static System.IObservable Concat(this System.IObservable first, System.IObservable second) { } public static System.IObservable Contains(this System.IObservable source, T value) { } public static System.IObservable Contains(this System.IObservable source, T value, System.Collections.Generic.IEqualityComparer? comparer) { } public static System.IObservable Count(this System.IObservable source) { } @@ -215,10 +220,13 @@ namespace ReactiveUI.Primitives public static System.Threading.Tasks.Task CountAsync(this System.IObservable source, System.Func predicate, System.Threading.CancellationToken cancellationToken) { } public static System.IObservable DefaultIfEmpty(this System.IObservable source) { } public static System.IObservable DefaultIfEmpty(this System.IObservable source, T defaultValue) { } + public static System.IObservable Delay(this System.IObservable source, System.TimeSpan dueTime) { } + public static System.IObservable Delay(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.IObservable DelayStart(this System.IObservable source, System.TimeSpan dueTime) { } public static System.IObservable DelayStart(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.IObservable DelaySubscription(this System.IObservable source, System.TimeSpan dueTime) { } public static System.IObservable DelaySubscription(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } + public static System.IObservable Dematerialize(this System.IObservable> source) { } public static ReactiveUI.Primitives.Disposables.SingleDisposable DisposeWith(this System.IDisposable disposable) { } public static System.IDisposable DisposeWith(this System.IDisposable disposable, ReactiveUI.Primitives.Disposables.MultipleDisposable disposables) { } public static ReactiveUI.Primitives.Disposables.SingleDisposable DisposeWith(this System.IDisposable disposable, System.Action? action) { } @@ -226,6 +234,12 @@ namespace ReactiveUI.Primitives public static System.IObservable Distinct(this System.IObservable source, System.Collections.Generic.IEqualityComparer? comparer) { } public static System.IObservable DistinctBy(this System.IObservable source, System.Func keySelector) { } public static System.IObservable DistinctBy(this System.IObservable source, System.Func keySelector, System.Collections.Generic.IEqualityComparer? comparer) { } + public static System.IObservable DistinctUntilChanged(this System.IObservable source) { } + public static System.IObservable DistinctUntilChanged(this System.IObservable source, System.Collections.Generic.IEqualityComparer? comparer) { } + public static System.IObservable DistinctUntilChangedBy(this System.IObservable source, System.Func keySelector) { } + public static System.IObservable DistinctUntilChangedBy(this System.IObservable source, System.Func keySelector, System.Collections.Generic.IEqualityComparer? comparer) { } + public static System.IObservable Do(this System.IObservable source, System.Action onNext) { } + public static System.IObservable DoWith(this System.IObservable source, TState state, System.Action onNext) { } public static System.IObservable Expire(this System.IObservable source, System.TimeSpan dueTime) { } public static System.IObservable Expire(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.Threading.Tasks.Task FirstAsync(this System.IObservable source) { } @@ -236,6 +250,7 @@ namespace ReactiveUI.Primitives public static System.IObservable Fold(this System.IObservable source, TAccumulate seed, System.Func accumulator) { } public static System.IObservable ForkJoin(this System.IObservable left, System.IObservable right, System.Func selector) { } public static System.IObservable FuseLatest(this System.IObservable left, System.IObservable right, System.Func selector) { } + public static System.IObservable IgnoreElements(this System.IObservable source) { } public static System.IObservable IgnoreValues(this System.IObservable source) { } public static System.IObservable IsEmpty(this System.IObservable source) { } public static System.IObservable Keep(this System.IObservable source, System.Func predicate) { } @@ -252,6 +267,8 @@ namespace ReactiveUI.Primitives public static System.IObservable LongCount(this System.IObservable source, System.Func predicate) { } public static System.IObservable Map(this System.IObservable source, System.Func selector) { } public static System.IObservable MapWith(this System.IObservable source, TState state, System.Func selector) { } + public static System.IObservable> Materialize(this System.IObservable source) { } + public static System.IObservable Merge(this System.IObservable> sources) { } public static System.IObservable ObserveOn(this System.IObservable source, ReactiveUI.Primitives.Concurrency.ISequencer scheduler) { } public static System.IObservable Pair(this System.IObservable left, System.IObservable right, System.Func selector) { } public static System.IObservable PairLatest(this System.IObservable left, System.IObservable right, System.Func selector) { } @@ -266,6 +283,14 @@ namespace ReactiveUI.Primitives public static System.IObservable Reduce(this System.IObservable source, TAccumulate seed, System.Func accumulator) { } public static System.IObservable Rescue(this System.IObservable source, System.Func> handler) { } public static System.IObservable Resume(this System.IObservable source, System.IObservable fallback) { } + public static System.IObservable Retry(this System.IObservable source, int retryCount) { } + public static System.IObservable Sample(this System.IObservable source, System.TimeSpan interval) { } + public static System.IObservable Sample(this System.IObservable source, System.TimeSpan interval, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } + public static System.IObservable Scan(this System.IObservable source, TAccumulate seed, System.Func accumulator) { } + public static System.IObservable Select(this System.IObservable source, System.Func selector) { } + public static System.IObservable SelectMany(this System.IObservable source, System.Func> selector) { } + public static System.IObservable SelectMany(this System.IObservable source, System.Func> collectionSelector, System.Func resultSelector) { } + public static System.IObservable SelectWith(this System.IObservable source, TState state, System.Func selector) { } public static System.IObservable Shift(this System.IObservable source, System.TimeSpan dueTime) { } public static System.IObservable Shift(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.IObservable Skip(this System.IObservable source, int count) { } @@ -274,6 +299,7 @@ namespace ReactiveUI.Primitives public static System.IObservable Stabilize(this System.IObservable source, System.TimeSpan dueTime) { } public static System.IObservable Stabilize(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.IObservable SubscribeOn(this System.IObservable source, ReactiveUI.Primitives.Concurrency.ISequencer scheduler) { } + public static System.IObservable Switch(this System.IObservable> sources) { } public static System.IObservable SwitchSelect(this System.IObservable source, System.Func> selector) { } public static System.IObservable SwitchTo(this System.IObservable> sources) { } public static System.IObservable SyncLatest(this System.IObservable left, System.IObservable right, System.Func selector) { } @@ -284,6 +310,8 @@ namespace ReactiveUI.Primitives public static System.IObservable TapWith(this System.IObservable source, TState state, System.Action onNext) { } public static System.IObservable> TimeInterval(this System.IObservable source) { } public static System.IObservable> TimeInterval(this System.IObservable source, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } + public static System.IObservable Timeout(this System.IObservable source, System.TimeSpan dueTime) { } + public static System.IObservable Timeout(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.IObservable> Timestamp(this System.IObservable source) { } public static System.IObservable> Timestamp(this System.IObservable source, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.IObservable ToArray(this System.IObservable source) { } @@ -303,6 +331,12 @@ namespace ReactiveUI.Primitives public static System.IObservable UniqueBy(this System.IObservable source, System.Func keySelector) { } public static System.IObservable UniqueBy(this System.IObservable source, System.Func keySelector, System.Collections.Generic.IEqualityComparer? comparer) { } public static System.IObservable Unspark(this System.IObservable> source) { } + public static System.IObservable Where(this System.IObservable source, System.Func predicate) { } + public static System.IObservable WhereNotNull(this System.IObservable source) + where T : class { } + public static System.IObservable WhereWith(this System.IObservable source, TState state, System.Func predicate) { } + public static System.IObservable WithLatestFrom(this System.IObservable left, System.IObservable right, System.Func selector) { } + public static System.IObservable Zip(this System.IObservable left, System.IObservable right, System.Func selector) { } } public sealed class LongCountObserver : ReactiveUI.Primitives.SingleSourceObserver { diff --git a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet9_0.verified.txt b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet9_0.verified.txt index 08cb2ff..a102e5b 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet9_0.verified.txt +++ b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet9_0.verified.txt @@ -178,7 +178,9 @@ namespace ReactiveUI.Primitives } public static class LinqMixins { + public static System.IObservable Aggregate(this System.IObservable source, TAccumulate seed, System.Func accumulator) { } public static System.IObservable All(this System.IObservable source, System.Func predicate) { } + public static System.IObservable Amb(this System.IObservable> sources) { } public static System.IObservable Any(this System.IObservable source) { } public static System.IObservable Any(this System.IObservable source, System.Func predicate) { } public static System.Threading.Tasks.Task AnyAsync(this System.IObservable source) { } @@ -205,6 +207,9 @@ namespace ReactiveUI.Primitives public static System.Threading.Tasks.Task CollectArrayAsync(this System.IObservable source) { } public static System.IObservable> CollectList(this System.IObservable source) { } public static System.Threading.Tasks.Task> CollectListAsync(this System.IObservable source) { } + public static System.IObservable CombineLatest(this System.IObservable left, System.IObservable right, System.Func selector) { } + public static System.IObservable Concat(this System.IObservable> sources) { } + public static System.IObservable Concat(this System.IObservable first, System.IObservable second) { } public static System.IObservable Contains(this System.IObservable source, T value) { } public static System.IObservable Contains(this System.IObservable source, T value, System.Collections.Generic.IEqualityComparer? comparer) { } public static System.IObservable Count(this System.IObservable source) { } @@ -215,10 +220,13 @@ namespace ReactiveUI.Primitives public static System.Threading.Tasks.Task CountAsync(this System.IObservable source, System.Func predicate, System.Threading.CancellationToken cancellationToken) { } public static System.IObservable DefaultIfEmpty(this System.IObservable source) { } public static System.IObservable DefaultIfEmpty(this System.IObservable source, T defaultValue) { } + public static System.IObservable Delay(this System.IObservable source, System.TimeSpan dueTime) { } + public static System.IObservable Delay(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.IObservable DelayStart(this System.IObservable source, System.TimeSpan dueTime) { } public static System.IObservable DelayStart(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.IObservable DelaySubscription(this System.IObservable source, System.TimeSpan dueTime) { } public static System.IObservable DelaySubscription(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } + public static System.IObservable Dematerialize(this System.IObservable> source) { } public static ReactiveUI.Primitives.Disposables.SingleDisposable DisposeWith(this System.IDisposable disposable) { } public static System.IDisposable DisposeWith(this System.IDisposable disposable, ReactiveUI.Primitives.Disposables.MultipleDisposable disposables) { } public static ReactiveUI.Primitives.Disposables.SingleDisposable DisposeWith(this System.IDisposable disposable, System.Action? action) { } @@ -226,6 +234,12 @@ namespace ReactiveUI.Primitives public static System.IObservable Distinct(this System.IObservable source, System.Collections.Generic.IEqualityComparer? comparer) { } public static System.IObservable DistinctBy(this System.IObservable source, System.Func keySelector) { } public static System.IObservable DistinctBy(this System.IObservable source, System.Func keySelector, System.Collections.Generic.IEqualityComparer? comparer) { } + public static System.IObservable DistinctUntilChanged(this System.IObservable source) { } + public static System.IObservable DistinctUntilChanged(this System.IObservable source, System.Collections.Generic.IEqualityComparer? comparer) { } + public static System.IObservable DistinctUntilChangedBy(this System.IObservable source, System.Func keySelector) { } + public static System.IObservable DistinctUntilChangedBy(this System.IObservable source, System.Func keySelector, System.Collections.Generic.IEqualityComparer? comparer) { } + public static System.IObservable Do(this System.IObservable source, System.Action onNext) { } + public static System.IObservable DoWith(this System.IObservable source, TState state, System.Action onNext) { } public static System.IObservable Expire(this System.IObservable source, System.TimeSpan dueTime) { } public static System.IObservable Expire(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.Threading.Tasks.Task FirstAsync(this System.IObservable source) { } @@ -236,6 +250,7 @@ namespace ReactiveUI.Primitives public static System.IObservable Fold(this System.IObservable source, TAccumulate seed, System.Func accumulator) { } public static System.IObservable ForkJoin(this System.IObservable left, System.IObservable right, System.Func selector) { } public static System.IObservable FuseLatest(this System.IObservable left, System.IObservable right, System.Func selector) { } + public static System.IObservable IgnoreElements(this System.IObservable source) { } public static System.IObservable IgnoreValues(this System.IObservable source) { } public static System.IObservable IsEmpty(this System.IObservable source) { } public static System.IObservable Keep(this System.IObservable source, System.Func predicate) { } @@ -252,6 +267,8 @@ namespace ReactiveUI.Primitives public static System.IObservable LongCount(this System.IObservable source, System.Func predicate) { } public static System.IObservable Map(this System.IObservable source, System.Func selector) { } public static System.IObservable MapWith(this System.IObservable source, TState state, System.Func selector) { } + public static System.IObservable> Materialize(this System.IObservable source) { } + public static System.IObservable Merge(this System.IObservable> sources) { } public static System.IObservable ObserveOn(this System.IObservable source, ReactiveUI.Primitives.Concurrency.ISequencer scheduler) { } public static System.IObservable Pair(this System.IObservable left, System.IObservable right, System.Func selector) { } public static System.IObservable PairLatest(this System.IObservable left, System.IObservable right, System.Func selector) { } @@ -266,6 +283,14 @@ namespace ReactiveUI.Primitives public static System.IObservable Reduce(this System.IObservable source, TAccumulate seed, System.Func accumulator) { } public static System.IObservable Rescue(this System.IObservable source, System.Func> handler) { } public static System.IObservable Resume(this System.IObservable source, System.IObservable fallback) { } + public static System.IObservable Retry(this System.IObservable source, int retryCount) { } + public static System.IObservable Sample(this System.IObservable source, System.TimeSpan interval) { } + public static System.IObservable Sample(this System.IObservable source, System.TimeSpan interval, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } + public static System.IObservable Scan(this System.IObservable source, TAccumulate seed, System.Func accumulator) { } + public static System.IObservable Select(this System.IObservable source, System.Func selector) { } + public static System.IObservable SelectMany(this System.IObservable source, System.Func> selector) { } + public static System.IObservable SelectMany(this System.IObservable source, System.Func> collectionSelector, System.Func resultSelector) { } + public static System.IObservable SelectWith(this System.IObservable source, TState state, System.Func selector) { } public static System.IObservable Shift(this System.IObservable source, System.TimeSpan dueTime) { } public static System.IObservable Shift(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.IObservable Skip(this System.IObservable source, int count) { } @@ -274,6 +299,7 @@ namespace ReactiveUI.Primitives public static System.IObservable Stabilize(this System.IObservable source, System.TimeSpan dueTime) { } public static System.IObservable Stabilize(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.IObservable SubscribeOn(this System.IObservable source, ReactiveUI.Primitives.Concurrency.ISequencer scheduler) { } + public static System.IObservable Switch(this System.IObservable> sources) { } public static System.IObservable SwitchSelect(this System.IObservable source, System.Func> selector) { } public static System.IObservable SwitchTo(this System.IObservable> sources) { } public static System.IObservable SyncLatest(this System.IObservable left, System.IObservable right, System.Func selector) { } @@ -284,6 +310,8 @@ namespace ReactiveUI.Primitives public static System.IObservable TapWith(this System.IObservable source, TState state, System.Action onNext) { } public static System.IObservable> TimeInterval(this System.IObservable source) { } public static System.IObservable> TimeInterval(this System.IObservable source, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } + public static System.IObservable Timeout(this System.IObservable source, System.TimeSpan dueTime) { } + public static System.IObservable Timeout(this System.IObservable source, System.TimeSpan dueTime, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.IObservable> Timestamp(this System.IObservable source) { } public static System.IObservable> Timestamp(this System.IObservable source, ReactiveUI.Primitives.Concurrency.ISequencer? scheduler) { } public static System.IObservable ToArray(this System.IObservable source) { } @@ -303,6 +331,12 @@ namespace ReactiveUI.Primitives public static System.IObservable UniqueBy(this System.IObservable source, System.Func keySelector) { } public static System.IObservable UniqueBy(this System.IObservable source, System.Func keySelector, System.Collections.Generic.IEqualityComparer? comparer) { } public static System.IObservable Unspark(this System.IObservable> source) { } + public static System.IObservable Where(this System.IObservable source, System.Func predicate) { } + public static System.IObservable WhereNotNull(this System.IObservable source) + where T : class { } + public static System.IObservable WhereWith(this System.IObservable source, TState state, System.Func predicate) { } + public static System.IObservable WithLatestFrom(this System.IObservable left, System.IObservable right, System.Func selector) { } + public static System.IObservable Zip(this System.IObservable left, System.IObservable right, System.Func selector) { } } public sealed class LongCountObserver : ReactiveUI.Primitives.SingleSourceObserver { diff --git a/src/tests/ReactiveUI.Primitives.Tests/RxNameParityTests.cs b/src/tests/ReactiveUI.Primitives.Tests/RxNameParityTests.cs new file mode 100644 index 0000000..79ece63 --- /dev/null +++ b/src/tests/ReactiveUI.Primitives.Tests/RxNameParityTests.cs @@ -0,0 +1,448 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using ReactiveUI.Primitives.Concurrency; +using ReactiveUI.Primitives.Signals; + +namespace ReactiveUI.Primitives.Tests; + +/// +/// Data-driven parity tests proving each System.Reactive/LINQ name builds a behaviorally identical sink to its +/// Primitives-named counterpart. Each operator pair is one data-source row consumed by a single test body, so the +/// behavior is asserted once and checked for both names (and for identity between them). +/// +public class RxNameParityTests +{ + /// The multiplier/state used by projection cases. + private const int Ten = 10; + + /// The divisor used to select even values. + private const int Two = 2; + + /// The fold/aggregate seed. + private const int Seed = 0; + + /// The value one, pushed by the binary drive scripts. + private const int One = 1; + + /// The value three, pushed by the binary drive scripts. + private const int Three = 3; + + /// The value twenty, pushed by the binary drive scripts. + private const int Twenty = 20; + + /// The value thirty, pushed by the binary drive scripts. + private const int Thirty = 30; + + /// The fixed delay/timeout in ticks. + private const long DueTicks = 1; + + /// The amount the virtual clock is advanced, comfortably past . + private const long AdvanceTicks = 5; + + /// Source values 1..5. + private static readonly int[] _oneToFive = [1, 2, 3, 4, 5]; + + /// Source values 1..3. + private static readonly int[] _oneToThree = [1, 2, 3]; + + /// Each of 1..5 doubled. + private static readonly int[] _doubled = [2, 4, 6, 8, 10]; + + /// Even values of 1..5. + private static readonly int[] _evens = [2, 4]; + + /// Running sum of 1..5. + private static readonly int[] _runningSum = [1, 3, 6, 10, 15]; + + /// Final sum of 1..5. + private static readonly int[] _finalSum = [15]; + + /// Each of 1..5 plus the state value . + private static readonly int[] _plusTen = [11, 12, 13, 14, 15]; + + /// Each of 1..3 emitted twice. + private static readonly int[] _fanned = [1, 1, 2, 2, 3, 3]; + + /// A sequence containing adjacent duplicate values. + private static readonly int[] _adjacentDuplicates = [1, 1, 2, 3, 3]; + + /// The adjacent-duplicate sequence with duplicates removed. + private static readonly int[] _deduplicated = [1, 2, 3]; + + /// An empty result. + private static readonly int[] _empty = []; + + /// Inner sequences for the higher-order family. + private static readonly int[][] _twoInners = [[1, 2], [3, 4]]; + + /// The flattened higher-order result (merge/concat/switch of synchronous inners). + private static readonly int[] _flattened = [1, 2, 3, 4]; + + /// The first inner sequence (the Amb/Race winner). + private static readonly int[] _firstInner = [1, 2]; + + /// Expected output for the Zip drive script. + private static readonly int[] _zipped = [11, 22, 33]; + + /// Expected output for the CombineLatest drive script. + private static readonly int[] _combined = [11, 12, 22]; + + /// Expected output for the WithLatestFrom drive script. + private static readonly int[] _latched = [11, 12, 23]; + + /// Provides the unary IObservable<int> -> IObservable<int> parity cases. + /// The unary parity cases. + public static IEnumerable UnaryCases() + { + yield return new("Select-Map", s => s.Map(Double), s => s.Select(Double), _oneToFive, _doubled); + yield return new("Where-Keep", s => s.Keep(IsEven), s => s.Where(IsEven), _oneToFive, _evens); + yield return new("Scan-Fold", s => s.Fold(Seed, Add), s => s.Scan(Seed, Add), _oneToFive, _runningSum); + yield return new("Aggregate-Reduce", s => s.Reduce(Seed, Add), s => s.Aggregate(Seed, Add), _oneToFive, _finalSum); + yield return new("DistinctUntilChanged-Unique", s => s.Unique(), s => s.DistinctUntilChanged(), _adjacentDuplicates, _deduplicated); + yield return new("DistinctUntilChangedBy-UniqueBy", s => s.UniqueBy(Identity), s => s.DistinctUntilChangedBy(Identity), _adjacentDuplicates, _deduplicated); + yield return new("IgnoreElements-IgnoreValues", s => s.IgnoreValues(), s => s.IgnoreElements(), _oneToFive, _empty); + yield return new("SelectWith-MapWith", s => s.MapWith(Ten, AddState), s => s.SelectWith(Ten, AddState), _oneToFive, _plusTen); + yield return new("WhereWith-KeepWith", s => s.KeepWith(Two, IsMultiple), s => s.WhereWith(Two, IsMultiple), _oneToFive, _evens); + yield return new("Do-Tap", s => s.Tap(Ignore), s => s.Do(Ignore), _oneToFive, _oneToFive); + yield return new("DoWith-TapWith", s => s.TapWith(Ten, IgnoreState), s => s.DoWith(Ten, IgnoreState), _oneToFive, _oneToFive); + yield return new("SelectMany-FlatMap", s => s.FlatMap(Fan), s => s.SelectMany(Fan), _oneToThree, _fanned); + yield return new("Materialize-Spark", s => s.Spark().Unspark(), s => s.Materialize().Dematerialize(), _oneToFive, _oneToFive); + } + + /// Provides the higher-order source-of-sources parity cases. + /// The higher-order parity cases. + public static IEnumerable HigherOrderCases() + { + yield return new("Merge-Blend", o => o.Blend(), o => o.Merge(), _twoInners, _flattened); + yield return new("Concat-Chain", o => o.Chain(), o => o.Concat(), _twoInners, _flattened); + yield return new("Switch-SwitchTo", o => o.SwitchTo(), o => o.Switch(), _twoInners, _flattened); + yield return new("Amb-Race", o => o.Race(), o => o.Amb(), _twoInners, _firstInner); + } + + /// Provides the binary (left, right) -> result parity cases. + /// The binary parity cases. + public static IEnumerable BinaryCases() + { + yield return new("Zip-Pair", (l, r) => l.Pair(r, Add), (l, r) => l.Zip(r, Add), DriveZip, _zipped); + yield return new("CombineLatest-SyncLatest", (l, r) => l.SyncLatest(r, Add), (l, r) => l.CombineLatest(r, Add), DriveCombine, _combined); + yield return new("WithLatestFrom-Latch", (l, r) => l.Latch(r, Add), (l, r) => l.WithLatestFrom(r, Add), DriveLatch, _latched); + } + + /// Provides the time-based parity cases, driven by a virtual clock. + /// The time-based parity cases. + public static IEnumerable TimeCases() + { + yield return new("Delay-Shift", (s, c) => s.Shift(TimeSpan.FromTicks(DueTicks), c), (s, c) => s.Delay(TimeSpan.FromTicks(DueTicks), c), FromOneToThree, _oneToThree, false); + yield return new("Timeout-Expire", (s, c) => s.Expire(TimeSpan.FromTicks(DueTicks), c), (s, c) => s.Timeout(TimeSpan.FromTicks(DueTicks), c), Silent, _empty, true); + } + + /// Verifies each unary name produces the expected sequence and is identical to its counterpart. + /// The parity case under test. + [Test] + [MethodDataSource(nameof(UnaryCases))] + public void UnaryNamesAreBehaviorallyIdentical(UnaryCase testCase) + { + var deviant = RunUnary(testCase.Deviant, testCase.Input); + var rx = RunUnary(testCase.Rx, testCase.Input); + + Assert.Equal(testCase.Expected, deviant); + Assert.Equal(testCase.Expected, rx); + Assert.Equal(deviant, rx); + } + + /// Verifies each higher-order name produces the expected sequence and is identical to its counterpart. + /// The parity case under test. + [Test] + [MethodDataSource(nameof(HigherOrderCases))] + public void HigherOrderNamesAreBehaviorallyIdentical(HigherOrderCase testCase) + { + var deviant = RunHigherOrder(testCase.Deviant, testCase.Inners); + var rx = RunHigherOrder(testCase.Rx, testCase.Inners); + + Assert.Equal(testCase.Expected, deviant); + Assert.Equal(testCase.Expected, rx); + Assert.Equal(deviant, rx); + } + + /// Verifies each binary name produces the expected sequence and is identical to its counterpart. + /// The parity case under test. + [Test] + [MethodDataSource(nameof(BinaryCases))] + public void BinaryNamesAreBehaviorallyIdentical(BinaryCase testCase) + { + var deviant = RunBinary(testCase.Deviant, testCase.Drive); + var rx = RunBinary(testCase.Rx, testCase.Drive); + + Assert.Equal(testCase.Expected, deviant); + Assert.Equal(testCase.Expected, rx); + Assert.Equal(deviant, rx); + } + + /// Verifies each time-based name produces the expected sequence/error and is identical to its counterpart. + /// The parity case under test. + [Test] + [MethodDataSource(nameof(TimeCases))] + public void TimeNamesAreBehaviorallyIdentical(TimeCase testCase) + { + var (deviantValues, deviantError) = RunTimed(testCase.Deviant, testCase.Source); + var (rxValues, rxError) = RunTimed(testCase.Rx, testCase.Source); + + Assert.Equal(testCase.Expected, deviantValues); + Assert.Equal(testCase.Expected, rxValues); + Assert.Equal(deviantValues, rxValues); + Assert.Equal(testCase.ExpectsTimeout, deviantError is TimeoutException); + Assert.Equal(testCase.ExpectsTimeout, rxError is TimeoutException); + } + + /// Verifies the WhereNotNull/KeepNotNull reference-type pair filters nulls identically. + [Test] + public void WhereNotNullMatchesKeepNotNull() + { + var keep = new List(); + var where = new List(); + + Signal.FromEnumerable(["a", null, "b"]).KeepNotNull().Subscribe(keep.Add); + Signal.FromEnumerable(["a", null, "b"]).WhereNotNull().Subscribe(where.Add); + + Assert.Equal(keep, where); + Assert.Equal(Two, where.Count); + } + + /// Verifies the binary Concat/Chain overload concatenates two sequences identically. + [Test] + public void BinaryConcatMatchesChain() + { + var chain = new List(); + var concat = new List(); + + Signal.FromEnumerable(_oneToThree).Chain(Signal.FromEnumerable(_oneToThree)).Subscribe(chain.Add); + Signal.FromEnumerable(_oneToThree).Concat(Signal.FromEnumerable(_oneToThree)).Subscribe(concat.Add); + + Assert.Equal(chain, concat); + } + + /// Doubles a value. + /// The source value. + /// The doubled value. + private static int Double(int value) => value * Two; + + /// Determines whether a value is even. + /// The source value. + /// when the value is even. + private static bool IsEven(int value) => value % Two == 0; + + /// Adds a value to an accumulator. + /// The accumulated value. + /// The source value. + /// The new accumulated value. + private static int Add(int accumulated, int value) => accumulated + value; + + /// Returns the value unchanged (key selector). + /// The source value. + /// The value. + private static int Identity(int value) => value; + + /// Adds the state value to a source value. + /// The state value. + /// The source value. + /// The sum of the state and the value. + private static int AddState(int state, int value) => value + state; + + /// Determines whether a value is a multiple of the divisor state. + /// The divisor state. + /// The source value. + /// when the value is a multiple of the divisor. + private static bool IsMultiple(int divisor, int value) => value % divisor == 0; + + /// Consumes a value without effect (the side-effect under test is irrelevant to the output). + /// The source value. + private static void Ignore(int value) + { + // Intentionally empty: Do/Tap forward values unchanged regardless of the side effect. + } + + /// Consumes a state and value without effect. + /// The state value. + /// The source value. + private static void IgnoreState(int state, int value) + { + // Intentionally empty: DoWith/TapWith forward values unchanged regardless of the side effect. + } + + /// Projects a value to an inner sequence that emits it twice. + /// The source value. + /// An inner sequence of two copies of the value. + private static IObservable Fan(int value) => Signal.FromEnumerable([value, value]); + + /// Builds the 1..3 source used by the delay case. + /// A source emitting 1..3. + private static IObservable FromOneToThree() => Signal.FromEnumerable(_oneToThree); + + /// Builds a non-terminating source used by the timeout case. + /// A source that never emits or completes. + private static IObservable Silent() => Signal.Silent(); + + /// Pushes index-paired values so Zip/Pair emits 11, 22, 33. + /// The left subject. + /// The right subject. + private static void DriveZip(Signal left, Signal right) + { + left.OnNext(One); + right.OnNext(Ten); + left.OnNext(Two); + right.OnNext(Twenty); + left.OnNext(Three); + right.OnNext(Thirty); + } + + /// Pushes interleaved values so CombineLatest/SyncLatest emits 11, 12, 22. + /// The left subject. + /// The right subject. + private static void DriveCombine(Signal left, Signal right) + { + left.OnNext(One); + right.OnNext(Ten); + left.OnNext(Two); + right.OnNext(Twenty); + } + + /// Pushes triggers and latest values so WithLatestFrom/Latch emits 11, 12, 23. + /// The triggering subject. + /// The latest-value subject. + private static void DriveLatch(Signal left, Signal right) + { + right.OnNext(Ten); + left.OnNext(One); + left.OnNext(Two); + right.OnNext(Twenty); + left.OnNext(Three); + } + + /// Runs a unary operator over a cold source and collects the forwarded values. + /// The operator under test. + /// The source values. + /// The forwarded values. + private static List RunUnary(Func, IObservable> op, int[] input) + { + var values = new List(); + op(Signal.FromEnumerable(input)).Subscribe(values.Add); + return values; + } + + /// Runs a higher-order operator over a source of cold inner sources and collects the forwarded values. + /// The operator under test. + /// The inner source values. + /// The forwarded values. + private static List RunHigherOrder(Func>, IObservable> op, int[][] inners) + { + var outer = Signal.FromEnumerable(Array.ConvertAll(inners, ToSource)); + var values = new List(); + op(outer).Subscribe(values.Add); + return values; + } + + /// Wraps an inner value array in a cold source. + /// The inner values. + /// A cold source over the inner values. + private static IObservable ToSource(int[] inner) => Signal.FromEnumerable(inner); + + /// Runs a binary operator over two manual subjects driven by a script and collects the forwarded values. + /// The operator under test. + /// The script that pushes values into the subjects. + /// The forwarded values. + private static List RunBinary(Func, IObservable, IObservable> op, Action, Signal> drive) + { + var left = new Signal(); + var right = new Signal(); + var values = new List(); + using var subscription = op(left, right).Subscribe(values.Add); + drive(left, right); + return values; + } + + /// Runs a time-based operator against a virtual clock and collects the forwarded values and any error. + /// The operator under test. + /// The source factory. + /// The forwarded values and any terminal error. + private static (List Values, Exception? Error) RunTimed(Func, ISequencer, IObservable> op, Func> source) + { + var clock = new TestClock(DateTimeOffset.UnixEpoch); + var values = new List(); + Exception? error = null; + using var subscription = op(source(), clock).Subscribe(values.Add, captured => error = captured, () => { }); + clock.AdvanceBy(TimeSpan.FromTicks(AdvanceTicks)); + return (values, error); + } + + /// A unary parity case: a Primitives-named builder and its Rx-named twin over one source. + /// The pair name. + /// The Primitives-named builder. + /// The Rx/LINQ-named builder. + /// The source values. + /// The expected forwarded values. + public sealed record UnaryCase( + string Name, + Func, IObservable> Deviant, + Func, IObservable> Rx, + int[] Input, + int[] Expected) + { + /// + public override string ToString() => Name; + } + + /// A higher-order parity case operating over a source of inner sources. + /// The pair name. + /// The Primitives-named builder. + /// The Rx/LINQ-named builder. + /// The inner source values. + /// The expected forwarded values. + public sealed record HigherOrderCase( + string Name, + Func>, IObservable> Deviant, + Func>, IObservable> Rx, + int[][] Inners, + int[] Expected) + { + /// + public override string ToString() => Name; + } + + /// A binary parity case driven by a scripted interleaving of two manual subjects. + /// The pair name. + /// The Primitives-named builder. + /// The Rx/LINQ-named builder. + /// The script that pushes values into the left and right subjects. + /// The expected forwarded values. + public sealed record BinaryCase( + string Name, + Func, IObservable, IObservable> Deviant, + Func, IObservable, IObservable> Rx, + Action, Signal> Drive, + int[] Expected) + { + /// + public override string ToString() => Name; + } + + /// A time-based parity case driven by a virtual clock. + /// The pair name. + /// The Primitives-named builder. + /// The Rx/LINQ-named builder. + /// The source factory. + /// The expected forwarded values. + /// Whether a is expected. + public sealed record TimeCase( + string Name, + Func, ISequencer, IObservable> Deviant, + Func, ISequencer, IObservable> Rx, + Func> Source, + int[] Expected, + bool ExpectsTimeout) + { + /// + public override string ToString() => Name; + } +} From d9f7cfc5963d61814bb07a3770c900fedebee3bf Mon Sep 17 00:00:00 2001 From: Glenn Watson <5834289+glennawatson@users.noreply.github.com> Date: Wed, 3 Jun 2026 14:35:41 +1000 Subject: [PATCH 2/8] refactor(Chain): add direct two-source ChainSignal so Concat/Chain share a sink MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The binary Concat (Rx name) was calling Signal.Chain (a Primitives-vocabulary factory) rather than constructing a sink directly. Add a two-source ChainSignal constructor and a matching ChainCoordinator.Run(first, second) that enqueues both inners directly, and have both the binary Concat and binary Chain build it — no FromEnumerable wrapper, no cross-vocabulary call. Every Rx name now constructs its sink directly. --- .../SignalOperatorMixins.Coordinators.cs | 41 +++++++++++++++++-- .../SignalOperatorMixins.cs | 17 +++++++- .../SignalOperatorParityMixins.RxNames.cs | 17 +++++++- 3 files changed, 67 insertions(+), 8 deletions(-) diff --git a/src/ReactiveUI.Primitives/SignalOperatorMixins.Coordinators.cs b/src/ReactiveUI.Primitives/SignalOperatorMixins.Coordinators.cs index 407a90d..0f3f40a 100644 --- a/src/ReactiveUI.Primitives/SignalOperatorMixins.Coordinators.cs +++ b/src/ReactiveUI.Primitives/SignalOperatorMixins.Coordinators.cs @@ -187,13 +187,28 @@ public IDisposable Subscribe(IObserver observer) /// The value type. private sealed class ChainSignal : IObservable { - /// The outer sequence of inner sources. - private readonly IObservable> _sources; + /// The outer sequence of inner sources, when constructed from a source-of-sources. + private readonly IObservable>? _sources; + + /// The first inner source, when constructed from two sources. + private readonly IObservable? _first; - /// Initializes a new instance of the class. + /// The second inner source, when constructed from two sources. + private readonly IObservable? _second; + + /// Initializes a new instance of the class from a source-of-sources. /// The outer sequence of inner sources. internal ChainSignal(IObservable> sources) => _sources = sources; + /// Initializes a new instance of the class from two sources. + /// The first source. + /// The second source. + internal ChainSignal(IObservable first, IObservable second) + { + _first = first; + _second = second; + } + /// public IDisposable Subscribe(IObserver observer) { @@ -202,7 +217,8 @@ public IDisposable Subscribe(IObserver observer) throw new ArgumentNullException(nameof(observer)); } - return new ChainCoordinator(observer).Run(_sources); + var coordinator = new ChainCoordinator(observer); + return _sources is not null ? coordinator.Run(_sources) : coordinator.Run(_first!, _second!); } } @@ -244,6 +260,23 @@ internal ChainCoordinator Run(IObservable> sources) return this; } + /// Subscribes the two fixed inner sources in order. + /// The first source. + /// The second source. + /// The coordinator that owns the subscription cleanup. + internal ChainCoordinator Run(IObservable first, IObservable second) + { + lock (_gate) + { + _queue.Enqueue(first); + _queue.Enqueue(second); + _outerCompleted = true; + } + + Drain(); + return this; + } + /// Queues a new inner source and pumps the drain. /// The inner source. private void OnSource(IObservable source) diff --git a/src/ReactiveUI.Primitives/SignalOperatorMixins.cs b/src/ReactiveUI.Primitives/SignalOperatorMixins.cs index 5411b87..239dbe7 100644 --- a/src/ReactiveUI.Primitives/SignalOperatorMixins.cs +++ b/src/ReactiveUI.Primitives/SignalOperatorMixins.cs @@ -444,8 +444,21 @@ public static IObservable Chain(this IObservable> sources) /// The first sequence. /// The second sequence. /// A sequence that emits after completes. - public static IObservable Chain(this IObservable first, IObservable second) => - Signal.Chain(first, second); + /// or is . + public static IObservable Chain(this IObservable first, IObservable second) + { + if (first == null) + { + throw new ArgumentNullException(nameof(first)); + } + + if (second == null) + { + throw new ArgumentNullException(nameof(second)); + } + + return new ChainSignal(first, second); + } /// /// Subscribes to all inner sequences and forwards their values as they arrive. diff --git a/src/ReactiveUI.Primitives/SignalOperatorParityMixins.RxNames.cs b/src/ReactiveUI.Primitives/SignalOperatorParityMixins.RxNames.cs index 67ec01b..b4a2ca0 100644 --- a/src/ReactiveUI.Primitives/SignalOperatorParityMixins.RxNames.cs +++ b/src/ReactiveUI.Primitives/SignalOperatorParityMixins.RxNames.cs @@ -408,8 +408,21 @@ public static IObservable Concat(this IObservable> sources) /// The first sequence. /// The second sequence. /// A sequence that emits after completes. - public static IObservable Concat(this IObservable first, IObservable second) => - Signal.Chain(first, second); + /// or is . + public static IObservable Concat(this IObservable first, IObservable second) + { + if (first == null) + { + throw new ArgumentNullException(nameof(first)); + } + + if (second == null) + { + throw new ArgumentNullException(nameof(second)); + } + + return new ChainSignal(first, second); + } /// /// Mirrors the first inner sequence to produce any notification. System.Reactive name for Race. From f1ba7ec17e09a98f17e95e55bdb5d9b6d3a3fb71 Mon Sep 17 00:00:00 2001 From: Glenn Watson <5834289+glennawatson@users.noreply.github.com> Date: Wed, 3 Jun 2026 14:49:42 +1000 Subject: [PATCH 3/8] refactor: share subscription-slot management; drop duplicated sinks and CPD exclusions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract the interlocked two-slot subscription management that RecoverSignal and ResumeSignal both copied (Release/Assign + a per-class DisposedMarker) into a shared internal static SubscriptionSlots helper, reusing the existing public DisposedMarker sentinel. Eliminate TimestampSignal entirely: Timestamp is a stateful map, so it now builds MapWithSignal with a non-capturing CreateMoment selector. Remove the sink CPD exclusions added earlier (RecoverSignal/ResumeSignal no longer duplicate; TimestampSignal is gone) — only the deliberately-duplicated Rx-name operator-body file remains excluded. --- .github/workflows/sonarcloud.yml | 2 +- .../SignalOperatorParityMixins.cs | 9 +- .../Core/RecoverSignal{T,TException}.cs | 46 +------ .../Signals/Core/ResumeSignal{T}.cs | 46 +------ .../Signals/Core/SubscriptionSlots.cs | 43 +++++++ .../Signals/TimestampSignal{T}.cs | 112 ------------------ 6 files changed, 60 insertions(+), 198 deletions(-) create mode 100644 src/ReactiveUI.Primitives/Signals/Core/SubscriptionSlots.cs delete mode 100644 src/ReactiveUI.Primitives/Signals/TimestampSignal{T}.cs diff --git a/.github/workflows/sonarcloud.yml b/.github/workflows/sonarcloud.yml index 51b66e1..3110997 100644 --- a/.github/workflows/sonarcloud.yml +++ b/.github/workflows/sonarcloud.yml @@ -40,7 +40,7 @@ jobs: sonarOrganization: reactiveui sonarExclusions: '**/tests/**,**/tools/**,**/benchmarks/**,**/TestResults/**' sonarCoverageExclusions: '**/tests/**,**/tools/**,**/benchmarks/**,**/*Tests/**,**/*Tests.cs,**/Generated/**' - sonarCpdExclusions: '**/tests/**,**/tools/**,**/benchmarks/**,**/CombineLatest?.cs,**/CombineLatest??.cs,**/ReactiveUI.Primitives.Blazor/**,**/ReactiveUI.Primitives.Maui/**,**/ReactiveUI.Primitives.WinUI/**,**/ReactiveUI.Primitives.WinForms/**,**/ReactiveUI.Primitives.Wpf/**,**/SignalOperatorParityMixins.RxNames.cs,**/MapWithSignal*.cs,**/KeepWithSignal*.cs,**/TapWithSignal*.cs' + sonarCpdExclusions: '**/tests/**,**/tools/**,**/benchmarks/**,**/CombineLatest?.cs,**/CombineLatest??.cs,**/ReactiveUI.Primitives.Blazor/**,**/ReactiveUI.Primitives.Maui/**,**/ReactiveUI.Primitives.WinUI/**,**/ReactiveUI.Primitives.WinForms/**,**/ReactiveUI.Primitives.Wpf/**,**/SignalOperatorParityMixins.RxNames.cs' sonarTestExclusions: '**/tests/**,**/tools/**,**/benchmarks/**' testTimeout: '15m' secrets: diff --git a/src/ReactiveUI.Primitives/SignalOperatorParityMixins.cs b/src/ReactiveUI.Primitives/SignalOperatorParityMixins.cs index 1771723..2b36cea 100644 --- a/src/ReactiveUI.Primitives/SignalOperatorParityMixins.cs +++ b/src/ReactiveUI.Primitives/SignalOperatorParityMixins.cs @@ -841,7 +841,7 @@ public static IObservable> Timestamp(this IObservable source, IS return new TimestampRangeSignal(range, scheduler); } - return new TimestampSignal(source, scheduler); + return new MapWithSignal>(source, scheduler, CreateMoment); } /// @@ -1452,4 +1452,11 @@ private static async Task CollectAsyncEnumerableArrayAsync(IAsyncEnumera source is RangeSignal range && CanReadRangeAs(typeof(T)) ? Task.FromResult((T)(object)(range.Start + range.Count - 1)) : null; + + /// Stamps a value with the supplied scheduler's current time. A non-capturing selector reused by Timestamp via MapWith. + /// The value type. + /// The sequencer that supplies the timestamp. + /// The value to stamp. + /// The value paired with the scheduler timestamp. + private static Moment CreateMoment(ISequencer scheduler, T value) => new(value, scheduler.Now); } diff --git a/src/ReactiveUI.Primitives/Signals/Core/RecoverSignal{T,TException}.cs b/src/ReactiveUI.Primitives/Signals/Core/RecoverSignal{T,TException}.cs index d9a5d79..b2d8178 100644 --- a/src/ReactiveUI.Primitives/Signals/Core/RecoverSignal{T,TException}.cs +++ b/src/ReactiveUI.Primitives/Signals/Core/RecoverSignal{T,TException}.cs @@ -62,9 +62,6 @@ public IDisposable Subscribe(IObserver observer) /// Forwards source values and, on a caught error, switches to the fallback sequence. private sealed class RecoverObserver : IObserver, IDisposable { - /// Marker stored in a slot once the sink is disposed. - private static readonly IDisposable Disposed = new DisposedMarker(); - /// The downstream observer. private readonly IObserver _observer; @@ -144,8 +141,8 @@ public void OnCompleted() /// public void Dispose() { - Release(ref _sourceSubscription); - Release(ref _fallbackSubscription); + SubscriptionSlots.Release(ref _sourceSubscription); + SubscriptionSlots.Release(ref _fallbackSubscription); } /// Subscribes to the source and returns the sink. @@ -153,47 +150,12 @@ public void Dispose() /// This sink, which is the subscription. internal RecoverObserver Run(IObservable source) { - Assign(ref _sourceSubscription, source.Subscribe(this)); + SubscriptionSlots.Assign(ref _sourceSubscription, source.Subscribe(this)); return this; } - /// Exchanges a slot for the disposed marker and releases any live subscription. - /// The slot to release. - private static void Release(ref IDisposable? slot) - { - var current = Interlocked.Exchange(ref slot, Disposed); - if (current == null || ReferenceEquals(current, Disposed)) - { - return; - } - - current.Dispose(); - } - - /// Stores a subscription into an empty slot, disposing it instead if the sink is already disposed. - /// The target slot. - /// The subscription to store. - private static void Assign(ref IDisposable? slot, IDisposable subscription) - { - if (Interlocked.CompareExchange(ref slot, subscription, null) == null) - { - return; - } - - subscription.Dispose(); - } - /// Stores the fallback subscription. /// The fallback subscription. - private void SetFallback(IDisposable subscription) => Assign(ref _fallbackSubscription, subscription); - - /// No-op disposable used as the disposed-slot sentinel. - private sealed class DisposedMarker : IDisposable - { - /// - public void Dispose() - { - } - } + private void SetFallback(IDisposable subscription) => SubscriptionSlots.Assign(ref _fallbackSubscription, subscription); } } diff --git a/src/ReactiveUI.Primitives/Signals/Core/ResumeSignal{T}.cs b/src/ReactiveUI.Primitives/Signals/Core/ResumeSignal{T}.cs index 78189ea..2c7631b 100644 --- a/src/ReactiveUI.Primitives/Signals/Core/ResumeSignal{T}.cs +++ b/src/ReactiveUI.Primitives/Signals/Core/ResumeSignal{T}.cs @@ -60,9 +60,6 @@ public IDisposable Subscribe(IObserver observer) /// Forwards source values and, on any error, switches to the fallback sequence. private sealed class ResumeObserver : IObserver, IDisposable { - /// Marker stored in a slot once the sink is disposed. - private static readonly IDisposable Disposed = new DisposedMarker(); - /// The downstream observer. private readonly IObserver _observer; @@ -106,8 +103,8 @@ public void OnCompleted() /// public void Dispose() { - Release(ref _sourceSubscription); - Release(ref _fallbackSubscription); + SubscriptionSlots.Release(ref _sourceSubscription); + SubscriptionSlots.Release(ref _fallbackSubscription); } /// Subscribes to the source and returns the sink. @@ -115,47 +112,12 @@ public void Dispose() /// This sink, which is the subscription. internal ResumeObserver Run(IObservable source) { - Assign(ref _sourceSubscription, source.Subscribe(this)); + SubscriptionSlots.Assign(ref _sourceSubscription, source.Subscribe(this)); return this; } - /// Exchanges a slot for the disposed marker and releases any live subscription. - /// The slot to release. - private static void Release(ref IDisposable? slot) - { - var current = Interlocked.Exchange(ref slot, Disposed); - if (current == null || ReferenceEquals(current, Disposed)) - { - return; - } - - current.Dispose(); - } - - /// Stores a subscription into an empty slot, disposing it instead if the sink is already disposed. - /// The target slot. - /// The subscription to store. - private static void Assign(ref IDisposable? slot, IDisposable subscription) - { - if (Interlocked.CompareExchange(ref slot, subscription, null) == null) - { - return; - } - - subscription.Dispose(); - } - /// Stores the fallback subscription. /// The fallback subscription. - private void SetFallback(IDisposable subscription) => Assign(ref _fallbackSubscription, subscription); - - /// No-op disposable used as the disposed-slot sentinel. - private sealed class DisposedMarker : IDisposable - { - /// - public void Dispose() - { - } - } + private void SetFallback(IDisposable subscription) => SubscriptionSlots.Assign(ref _fallbackSubscription, subscription); } } diff --git a/src/ReactiveUI.Primitives/Signals/Core/SubscriptionSlots.cs b/src/ReactiveUI.Primitives/Signals/Core/SubscriptionSlots.cs new file mode 100644 index 0000000..24b5658 --- /dev/null +++ b/src/ReactiveUI.Primitives/Signals/Core/SubscriptionSlots.cs @@ -0,0 +1,43 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +namespace ReactiveUI.Primitives.Signals.Core; + +/// +/// Helpers for the interlocked single-assignment subscription slots shared by the catch-style sinks +/// (, ): a slot holds at most one live +/// subscription and, once the sink is disposed, swaps to a sentinel so a late assignment is disposed instead of +/// stored. +/// +internal static class SubscriptionSlots +{ + /// The sentinel stored in a slot once it has been released. + public static readonly IDisposable Disposed = new DisposedMarker(); + + /// Exchanges a slot for the disposed sentinel and disposes any live subscription it held. + /// The slot to release. + public static void Release(ref IDisposable? slot) + { + var current = Interlocked.Exchange(ref slot, Disposed); + if (current is null || ReferenceEquals(current, Disposed)) + { + return; + } + + current.Dispose(); + } + + /// Stores a subscription into an empty slot, disposing it instead if the slot is already released. + /// The target slot. + /// The subscription to store. + public static void Assign(ref IDisposable? slot, IDisposable subscription) + { + if (Interlocked.CompareExchange(ref slot, subscription, null) is null) + { + return; + } + + subscription.Dispose(); + } +} diff --git a/src/ReactiveUI.Primitives/Signals/TimestampSignal{T}.cs b/src/ReactiveUI.Primitives/Signals/TimestampSignal{T}.cs deleted file mode 100644 index 98a5ff3..0000000 --- a/src/ReactiveUI.Primitives/Signals/TimestampSignal{T}.cs +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. -// ReactiveUI Association Incorporated licenses this file to you under the MIT license. -// See the LICENSE file in the project root for full license information. - -using ReactiveUI.Primitives.Concurrency; -using ReactiveUI.Primitives.Core; - -namespace ReactiveUI.Primitives.Signals; - -/// -/// Annotates each source value with the scheduler timestamp at which it was observed, holding the sequencer directly -/// so no per-subscription closure is allocated. -/// -/// The value type. -internal sealed class TimestampSignal : IRequireCurrentThread> -{ - /// The source sequence. - private readonly IObservable _source; - - /// The sequencer that supplies timestamps. - private readonly ISequencer _scheduler; - - /// Initializes a new instance of the class. - /// The source sequence. - /// The sequencer that supplies timestamps. - internal TimestampSignal(IObservable source, ISequencer scheduler) - { - _source = source; - _scheduler = scheduler; - } - - /// - /// Determines whether the sink must subscribe on the current thread. - /// - /// when the source requires current-thread subscription. - public bool IsRequiredSubscribeOnCurrentThread() => - _source is IRequireCurrentThread currentThread && currentThread.IsRequiredSubscribeOnCurrentThread(); - - /// - /// Subscribes the observer to the timestamped sequence. - /// - /// The downstream observer. - /// The subscription handle. - public IDisposable Subscribe(IObserver> observer) - { - if (observer == null) - { - throw new ArgumentNullException(nameof(observer)); - } - - return _source.Subscribe(new TimestampObserver(observer, _scheduler)); - } - - /// Stamps each source value with the current scheduler time. - private sealed class TimestampObserver : IObserver - { - /// The downstream observer. - private readonly IObserver> _observer; - - /// The sequencer that supplies timestamps. - private readonly ISequencer _scheduler; - - /// Whether a terminal notification has been forwarded. - private bool _stopped; - - /// Initializes a new instance of the class. - /// The downstream observer. - /// The sequencer that supplies timestamps. - public TimestampObserver(IObserver> observer, ISequencer scheduler) - { - _observer = observer; - _scheduler = scheduler; - } - - /// Forwards completion downstream. - public void OnCompleted() - { - if (_stopped) - { - return; - } - - _stopped = true; - _observer.OnCompleted(); - } - - /// Forwards an error downstream. - /// The error value. - public void OnError(Exception error) - { - if (_stopped) - { - return; - } - - _stopped = true; - _observer.OnError(error); - } - - /// Stamps and forwards a source value. - /// The source value. - public void OnNext(T value) - { - if (_stopped) - { - return; - } - - _observer.OnNext(new Moment(value, _scheduler.Now)); - } - } -} From 55111516b093a734555eaf66cc349be7f2218eba Mon Sep 17 00:00:00 2001 From: Glenn Watson <5834289+glennawatson@users.noreply.github.com> Date: Wed, 3 Jun 2026 15:06:45 +1000 Subject: [PATCH 4/8] fix(benchmarks): force System.Reactive resolution where the Rx name layer collides The benchmark namespace nests under ReactiveUI.Primitives, so the new LinqMixins Rx-named extensions are preferred over the imported System.Reactive.Linq ones. Two SystemReactive competitor calls broke the build: Materialize (Spark vs Notification return type) and a Where whose lambda IDE0200 wanted removed (a false positive once our single-overload Where was selected). Call both via the RxObservable static alias so they unambiguously measure System.Reactive. --- .../OperatorPassThroughBenchmarks.cs | 2 +- .../ReactiveExtensionsComparisonBenchmarks.Competitors.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorPassThroughBenchmarks.cs b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorPassThroughBenchmarks.cs index 0aef44a..104290e 100644 --- a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorPassThroughBenchmarks.cs +++ b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorPassThroughBenchmarks.cs @@ -122,7 +122,7 @@ public int PrimitivesMaterializeRange() public int SystemReactiveMaterializeRange() { var observer = new CountingSignalObserver>(); - using var subscription = RxObservable.Range(1, Count).Materialize().Subscribe(observer); + using var subscription = RxObservable.Materialize(RxObservable.Range(1, Count)).Subscribe(observer); return observer.Count; } diff --git a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/ReactiveExtensionsComparisonBenchmarks.Competitors.cs b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/ReactiveExtensionsComparisonBenchmarks.Competitors.cs index bcef675..6fb3050 100644 --- a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/ReactiveExtensionsComparisonBenchmarks.Competitors.cs +++ b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/ReactiveExtensionsComparisonBenchmarks.Competitors.cs @@ -54,7 +54,7 @@ private static int SystemReactiveCombineLatestValuesAreAllTrue() => private static int SystemReactiveFilter() { var regex = EvenRegex(); - return DrainString(RxObservable.ToObservable(StringValues).Where(value => regex.IsMatch(value))); + return DrainString(RxObservable.Where(RxObservable.ToObservable(StringValues), value => regex.IsMatch(value))); } /// From a645cdae874aa23ec7d98648c6c8f11d31c863b9 Mon Sep 17 00:00:00 2001 From: Glenn Watson <5834289+glennawatson@users.noreply.github.com> Date: Wed, 3 Jun 2026 15:15:51 +1000 Subject: [PATCH 5/8] fix(benchmarks): force System.Reactive resolution across all SR competitors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The benchmark namespace nests under ReactiveUI.Primitives, so instance-style calls of the new Rx name-layer operators (Select/Where/Do/Scan/SelectMany/Materialize/ Dematerialize/IgnoreElements/Retry/Zip/CombineLatest, plus Buffer in one chain) silently bound to Primitives instead of System.Reactive — making the SystemReactive* benchmarks measure the wrong library. Rewrite those competitor calls in the explicit static form (RxObservable.Op(source, ...)) so they bind to System.Reactive. That static-call style is what RCS1196 flags, so it is disabled for the benchmark project via a scoped .editorconfig with a justification. --- .../.editorconfig | 7 ++++ .../OperatorFilterCastBenchmarks.cs | 4 +- .../OperatorFlatMapRangeBenchmarks.cs | 3 +- .../OperatorPassThroughBenchmarks.cs | 8 ++-- .../OperatorStatefulVariantBenchmarks.cs | 6 +-- .../OperatorZipBenchmarks.cs | 3 +- ...ensionsComparisonBenchmarks.Competitors.cs | 37 +++++++++---------- 7 files changed, 36 insertions(+), 32 deletions(-) create mode 100644 src/benchmarks/ReactiveUI.Primitives.Benchmarks/.editorconfig diff --git a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/.editorconfig b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/.editorconfig new file mode 100644 index 0000000..004c849 --- /dev/null +++ b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/.editorconfig @@ -0,0 +1,7 @@ +# The SystemReactive competitor benchmarks call System.Reactive operators in their static form +# (e.g. RxObservable.Select(source, selector)) on purpose: the benchmark namespace nests under +# ReactiveUI.Primitives, so an instance-style call (source.Select(...)) would bind to the +# ReactiveUI.Primitives name-layer extensions instead of System.Reactive and silently benchmark the +# wrong library. RCS1196 (call extension method as instance method) is therefore disabled here. +[*.cs] +dotnet_diagnostic.RCS1196.severity = none diff --git a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorFilterCastBenchmarks.cs b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorFilterCastBenchmarks.cs index 7ef8496..de38819 100644 --- a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorFilterCastBenchmarks.cs +++ b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorFilterCastBenchmarks.cs @@ -90,7 +90,7 @@ public int PrimitivesKeepType() public int SystemReactiveKeepType() { var observer = new CountingSignalObserver(); - using var subscription = RxObservable.Range(1, Count).Select(static _ => (object)Shared).OfType().Subscribe(observer); + using var subscription = RxObservable.Select(RxObservable.Range(1, Count), static _ => (object)Shared).OfType().Subscribe(observer); return observer.Count; } @@ -127,7 +127,7 @@ public int PrimitivesCastTo() public int SystemReactiveCastTo() { var observer = new CountingSignalObserver(); - using var subscription = RxObservable.Range(1, Count).Select(static _ => (object)Shared).Cast().Subscribe(observer); + using var subscription = RxObservable.Select(RxObservable.Range(1, Count), static _ => (object)Shared).Cast().Subscribe(observer); return observer.Count; } diff --git a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorFlatMapRangeBenchmarks.cs b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorFlatMapRangeBenchmarks.cs index f8430e3..329448b 100644 --- a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorFlatMapRangeBenchmarks.cs +++ b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorFlatMapRangeBenchmarks.cs @@ -36,8 +36,7 @@ public int PrimitivesFlatMapRange() public int SystemReactiveSelectManyRange() { var observer = new IntSignalObserver(); - using var subscription = RxObservable.Range(1, 8) - .SelectMany(static x => RxObservable.Range(x * 10, 2)) + using var subscription = RxObservable.SelectMany(RxObservable.Range(1, 8), static x => RxObservable.Range(x * 10, 2)) .Subscribe(observer); return observer.Total; } diff --git a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorPassThroughBenchmarks.cs b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorPassThroughBenchmarks.cs index 104290e..991765e 100644 --- a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorPassThroughBenchmarks.cs +++ b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorPassThroughBenchmarks.cs @@ -49,7 +49,7 @@ public int PrimitivesTapRange() public int SystemReactiveTapRange() { var observer = new IntSignalObserver(); - using var subscription = RxObservable.Range(1, Count).Do(static _ => { }).Subscribe(observer); + using var subscription = RxObservable.Do(RxObservable.Range(1, Count), static _ => { }).Subscribe(observer); return observer.Total; } @@ -86,7 +86,7 @@ public int PrimitivesIgnoreValuesRange() public int SystemReactiveIgnoreValuesRange() { var observer = new IntSignalObserver(); - using var subscription = RxObservable.Range(1, Count).IgnoreElements().Subscribe(observer); + using var subscription = RxObservable.IgnoreElements(RxObservable.Range(1, Count)).Subscribe(observer); return observer.CompletionCount; } @@ -158,7 +158,7 @@ public int PrimitivesDematerializeRange() public int SystemReactiveDematerializeRange() { var observer = new IntSignalObserver(); - using var subscription = RxObservable.Range(1, Count).Materialize().Dematerialize().Subscribe(observer); + using var subscription = RxObservable.Dematerialize(RxObservable.Materialize(RxObservable.Range(1, Count))).Subscribe(observer); return observer.Total; } @@ -235,7 +235,7 @@ public int PrimitivesReattemptRange() public int SystemReactiveReattemptRange() { var observer = new IntSignalObserver(); - using var subscription = RxObservable.Range(1, Count).Retry(RetryCount).Subscribe(observer); + using var subscription = RxObservable.Retry(RxObservable.Range(1, Count), RetryCount).Subscribe(observer); return observer.Total; } diff --git a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorStatefulVariantBenchmarks.cs b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorStatefulVariantBenchmarks.cs index e4290d1..bf17cbf 100644 --- a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorStatefulVariantBenchmarks.cs +++ b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorStatefulVariantBenchmarks.cs @@ -54,7 +54,7 @@ public int SystemReactiveSelectClosure() { var factor = _factor; var observer = new IntSignalObserver(); - using var subscription = RxObservable.Range(1, Count).Select(x => x * factor).Subscribe(observer); + using var subscription = RxObservable.Select(RxObservable.Range(1, Count), x => x * factor).Subscribe(observer); return observer.Total; } @@ -92,7 +92,7 @@ public int SystemReactiveWhereClosure() { var threshold = _threshold; var observer = new IntSignalObserver(); - using var subscription = RxObservable.Range(1, Count).Where(x => x > threshold).Subscribe(observer); + using var subscription = RxObservable.Where(RxObservable.Range(1, Count), x => x > threshold).Subscribe(observer); return observer.Total; } @@ -130,7 +130,7 @@ public int SystemReactiveDoClosure() { var factor = _factor; var observer = new IntSignalObserver(); - using var subscription = RxObservable.Range(1, Count).Do(x => _ = x * factor).Subscribe(observer); + using var subscription = RxObservable.Do(RxObservable.Range(1, Count), x => _ = x * factor).Subscribe(observer); return observer.Total; } diff --git a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorZipBenchmarks.cs b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorZipBenchmarks.cs index d2f27c7..12c865a 100644 --- a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorZipBenchmarks.cs +++ b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/OperatorZipBenchmarks.cs @@ -54,8 +54,7 @@ public int PrimitivesZip() public int SystemReactiveZip() { var observer = new IntSignalObserver(); - using var subscription = RxObservable.Range(LeftStart, Count) - .Zip(RxObservable.Range(RightStart, Count), static (left, right) => left + right) + using var subscription = RxObservable.Zip(RxObservable.Range(LeftStart, Count), RxObservable.Range(RightStart, Count), static (left, right) => left + right) .Subscribe(observer); return observer.Total; } diff --git a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/ReactiveExtensionsComparisonBenchmarks.Competitors.cs b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/ReactiveExtensionsComparisonBenchmarks.Competitors.cs index 6fb3050..6b53b8c 100644 --- a/src/benchmarks/ReactiveUI.Primitives.Benchmarks/ReactiveExtensionsComparisonBenchmarks.Competitors.cs +++ b/src/benchmarks/ReactiveUI.Primitives.Benchmarks/ReactiveExtensionsComparisonBenchmarks.Competitors.cs @@ -17,7 +17,7 @@ public partial class ReactiveExtensionsComparisonBenchmarks /// /// The SystemReactiveAsSignal result. private static int SystemReactiveAsSignal() => - DrainPrimitiveUnit(RxObservable.Range(0, Count).Select(static _ => RxVoid.Default)); + DrainPrimitiveUnit(RxObservable.Select(RxObservable.Range(0, Count), static _ => RxVoid.Default)); /// /// Executes the SystemReactiveCatchAndReturn benchmark helper. @@ -76,21 +76,21 @@ private static int SystemReactiveFromArray() => /// /// The SystemReactiveGetMax result. private static int SystemReactiveGetMax() => - DrainInt(RxObservable.Return(FirstValue).CombineLatest(RxObservable.Return(SecondValue), static (left, right) => Math.Max(left, right))); + DrainInt(RxObservable.CombineLatest(RxObservable.Return(FirstValue), RxObservable.Return(SecondValue), static (left, right) => Math.Max(left, right))); /// /// Executes the SystemReactiveGetMin benchmark helper. /// /// The SystemReactiveGetMin result. private static int SystemReactiveGetMin() => - DrainInt(RxObservable.Return(FirstValue).CombineLatest(RxObservable.Return(SecondValue), static (left, right) => Math.Min(left, right))); + DrainInt(RxObservable.CombineLatest(RxObservable.Return(FirstValue), RxObservable.Return(SecondValue), static (left, right) => Math.Min(left, right))); /// /// Executes the SystemReactiveNot benchmark helper. /// /// The SystemReactiveNot result. private static int SystemReactiveNot() => - DrainBool(RxObservable.ToObservable(BooleanValues).Select(static value => !value)); + DrainBool(RxObservable.Select(RxObservable.ToObservable(BooleanValues), static value => !value)); /// /// Executes the SystemReactivePairwise benchmark helper. @@ -99,10 +99,11 @@ private static int SystemReactiveNot() => private static int SystemReactivePairwise() { var observer = new PairObserver(); - using var subscription = RxObservable.Range(0, Count) - .Buffer(2, 1) - .Where(static values => values.Count == 2) - .Select(static values => (Previous: values[0], Current: values[1])) + using var subscription = RxObservable.Select( + RxObservable.Where( + RxObservable.Buffer(RxObservable.Range(0, Count), 2, 1), + static values => values.Count == 2), + static values => (Previous: values[0], Current: values[1])) .Subscribe(observer); return observer.Total; } @@ -119,37 +120,35 @@ private static int SystemReactiveReturn() => /// /// The SystemReactiveScanWithInitial result. private static int SystemReactiveScanWithInitial() => - DrainInt(RxObservable.Range(0, Count).Scan(0, static (acc, value) => acc + value)); + DrainInt(RxObservable.Scan(RxObservable.Range(0, Count), 0, static (acc, value) => acc + value)); /// /// Executes the SystemReactiveSelectAsyncScenario benchmark helper. /// /// The SystemReactiveSelectAsyncScenario result. private static int SystemReactiveSelectAsyncScenario() => - DrainInt(RxObservable.Range(0, Count).SelectMany(static value => RxObservable.FromAsync(() => Task.FromResult(value + 1)))); + DrainInt(RxObservable.SelectMany(RxObservable.Range(0, Count), static value => RxObservable.FromAsync(() => Task.FromResult(value + 1)))); /// /// Executes the SystemReactiveSelectConstant benchmark helper. /// /// The SystemReactiveSelectConstant result. private static int SystemReactiveSelectConstant() => - DrainInt(RxObservable.Range(0, Count).Select(static _ => Value)); + DrainInt(RxObservable.Select(RxObservable.Range(0, Count), static _ => Value)); /// /// Executes the SystemReactiveSelectManyThen benchmark helper. /// /// The SystemReactiveSelectManyThen result. private static int SystemReactiveSelectManyThen() => - DrainInt(RxObservable.Return(Value) - .SelectMany(static value => RxObservable.Return(value + 1)) - .SelectMany(static value => RxObservable.Return(value + 1))); + DrainInt(RxObservable.SelectMany(RxObservable.SelectMany(RxObservable.Return(Value), static value => RxObservable.Return(value + 1)), static value => RxObservable.Return(value + 1))); /// /// Executes the SystemReactiveSkipWhileNull benchmark helper. /// /// The SystemReactiveSkipWhileNull result. private static int SystemReactiveSkipWhileNull() => - DrainString(RxObservable.ToObservable(NullableStrings).SkipWhile(static value => value is null).Select(static value => value!)); + DrainString(RxObservable.Select(RxObservable.ToObservable(NullableStrings).SkipWhile(static value => value is null), static value => value!)); /// /// Executes the SystemReactiveTakeUntil benchmark helper. @@ -177,28 +176,28 @@ private static int SystemReactiveWaitUntil() => /// /// The SystemReactiveWhereFalse result. private static int SystemReactiveWhereFalse() => - DrainBool(RxObservable.ToObservable(BooleanValues).Where(static value => !value)); + DrainBool(RxObservable.Where(RxObservable.ToObservable(BooleanValues), static value => !value)); /// /// Executes the SystemReactiveWhereIsNotNull benchmark helper. /// /// The SystemReactiveWhereIsNotNull result. private static int SystemReactiveWhereIsNotNull() => - DrainString(RxObservable.ToObservable(NullableStrings).Where(static value => value is not null).Select(static value => value!)); + DrainString(RxObservable.Select(RxObservable.Where(RxObservable.ToObservable(NullableStrings), static value => value is not null), static value => value!)); /// /// Executes the SystemReactiveWhereSelect benchmark helper. /// /// The SystemReactiveWhereSelect result. private static int SystemReactiveWhereSelect() => - DrainInt(RxObservable.Range(0, Count).Where(static value => (value & 1) == 0).Select(static value => value * ResultMultiplier)); + DrainInt(RxObservable.Select(RxObservable.Where(RxObservable.Range(0, Count), static value => (value & 1) == 0), static value => value * ResultMultiplier)); /// /// Executes the SystemReactiveWhereTrue benchmark helper. /// /// The SystemReactiveWhereTrue result. private static int SystemReactiveWhereTrue() => - DrainBool(RxObservable.ToObservable(BooleanValues).Where(static value => value)); + DrainBool(RxObservable.Where(RxObservable.ToObservable(BooleanValues), static value => value)); /// /// Executes the R3AsSignal benchmark helper. From f9d2ef91d08a76712afa999e9569ce550f609603 Mon Sep 17 00:00:00 2001 From: Glenn Watson <5834289+glennawatson@users.noreply.github.com> Date: Wed, 3 Jun 2026 15:38:50 +1000 Subject: [PATCH 6/8] test: cover the Rx name-layer null guards, stateful-sink error paths, and Resume New-code coverage on the name layer was 60% (gate is 80%). Add: null-argument and out-of-range guard tests across all Rx names (covers the throw branches in RxNames, the bulk of the gap); stateful-sink value-then-error and throwing- projection tests (MapWith/KeepWith/TapWith OnError + catch paths); Resume fallback-on-error, plain-completion, and dispose tests (ResumeSignal); a Sample/Probe identity test and a 3-arg SelectMany/FlatMap parity test. New files now 83-100% covered. --- .../RxNameParityTests.cs | 223 ++++++++++++++++++ 1 file changed, 223 insertions(+) diff --git a/src/tests/ReactiveUI.Primitives.Tests/RxNameParityTests.cs b/src/tests/ReactiveUI.Primitives.Tests/RxNameParityTests.cs index 79ece63..2429cb2 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/RxNameParityTests.cs +++ b/src/tests/ReactiveUI.Primitives.Tests/RxNameParityTests.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for full license information. using ReactiveUI.Primitives.Concurrency; +using ReactiveUI.Primitives.Core; using ReactiveUI.Primitives.Signals; namespace ReactiveUI.Primitives.Tests; @@ -35,6 +36,12 @@ public class RxNameParityTests /// The value thirty, pushed by the binary drive scripts. private const int Thirty = 30; + /// An invalid negative count/interval used by the out-of-range tests. + private const int NegativeOne = -1; + + /// A shared error message. + private const string Boom = "boom"; + /// The fixed delay/timeout in ticks. private const long DueTicks = 1; @@ -92,6 +99,12 @@ public class RxNameParityTests /// Expected output for the WithLatestFrom drive script. private static readonly int[] _latched = [11, 12, 23]; + /// A single forwarded value before a terminal notification. + private static readonly int[] _tenOnly = [10]; + + /// A source value followed by the fallback sequence after Resume switches. + private static readonly int[] _tenThenFallback = [10, 1, 2, 3]; + /// Provides the unary IObservable<int> -> IObservable<int> parity cases. /// The unary parity cases. public static IEnumerable UnaryCases() @@ -223,6 +236,156 @@ public void BinaryConcatMatchesChain() Assert.Equal(chain, concat); } + /// Verifies every Rx name throws for a null source. + [Test] + public void RxNamesThrowOnNullSource() + { + var other = Signal.FromEnumerable(_oneToThree); + Assert.Throws(() => default(IObservable)!.Select(Double)); + Assert.Throws(() => default(IObservable)!.SelectWith(Ten, AddState)); + Assert.Throws(() => default(IObservable)!.Where(IsEven)); + Assert.Throws(() => default(IObservable)!.WhereWith(Two, IsMultiple)); + Assert.Throws(() => default(IObservable)!.WhereNotNull()); + Assert.Throws(() => default(IObservable)!.Do(Ignore)); + Assert.Throws(() => default(IObservable)!.DoWith(Ten, IgnoreState)); + Assert.Throws(() => default(IObservable)!.Scan(Seed, Add)); + Assert.Throws(() => default(IObservable)!.Aggregate(Seed, Add)); + Assert.Throws(() => default(IObservable)!.DistinctUntilChanged()); + Assert.Throws(() => default(IObservable)!.DistinctUntilChangedBy(Identity)); + Assert.Throws(() => default(IObservable)!.IgnoreElements()); + Assert.Throws(() => default(IObservable)!.SelectMany(Fan)); + Assert.Throws(() => default(IObservable>)!.Merge()); + Assert.Throws(() => default(IObservable>)!.Concat()); + Assert.Throws(() => default(IObservable)!.Concat(other)); + Assert.Throws(() => default(IObservable>)!.Amb()); + Assert.Throws(() => default(IObservable>)!.Switch()); + Assert.Throws(() => default(IObservable)!.Zip(other, Add)); + Assert.Throws(() => default(IObservable)!.CombineLatest(other, Add)); + Assert.Throws(() => default(IObservable)!.WithLatestFrom(other, Add)); + Assert.Throws(() => default(IObservable)!.Delay(TimeSpan.FromTicks(DueTicks))); + Assert.Throws(() => default(IObservable)!.Timeout(TimeSpan.FromTicks(DueTicks))); + Assert.Throws(() => default(IObservable)!.Sample(TimeSpan.FromTicks(DueTicks))); + Assert.Throws(() => default(IObservable)!.Retry(Two)); + Assert.Throws(() => default(IObservable)!.Materialize()); + Assert.Throws(() => default(IObservable>)!.Dematerialize()); + Assert.Throws(() => default(IObservable)!.Resume(other)); + Assert.Throws(() => other.Resume(null!)); + } + + /// Verifies the Rx names throw for a null projection/predicate. + [Test] + public void RxNamesThrowOnNullSelector() + { + var source = Signal.FromEnumerable(_oneToFive); + Assert.Throws(() => source.Select(null!)); + Assert.Throws(() => source.SelectWith(Ten, null!)); + Assert.Throws(() => source.Where(null!)); + Assert.Throws(() => source.WhereWith(Two, null!)); + Assert.Throws(() => source.Do(null!)); + Assert.Throws(() => source.DoWith(Ten, null!)); + Assert.Throws(() => source.Scan(Seed, null!)); + Assert.Throws(() => source.Aggregate(Seed, null!)); + Assert.Throws(() => source.DistinctUntilChangedBy(null!)); + Assert.Throws(() => source.SelectMany(null!)); + Assert.Throws(() => source.Zip(source, null!)); + Assert.Throws(() => source.CombineLatest(source, null!)); + Assert.Throws(() => source.WithLatestFrom(source, null!)); + } + + /// Verifies the count/interval guards throw . + [Test] + public void RxNamesThrowOnNegativeArguments() + { + var source = Signal.FromEnumerable(_oneToFive); + Assert.Throws(() => source.Retry(NegativeOne)); + Assert.Throws(() => source.Sample(TimeSpan.FromTicks(NegativeOne))); + } + + /// Verifies the stateful sinks forward a value and then an error (covers their error path). + [Test] + public void StatefulSinksForwardValueThenError() + { + Assert.True(RunStatefulError(s => s.SelectWith(Ten, AddState))); + Assert.True(RunStatefulError(s => s.WhereWith(Two, IsMultiple))); + Assert.True(RunStatefulError(s => s.DoWith(Ten, IgnoreState))); + } + + /// Verifies the stateful projection sinks forward an exception thrown by the projection (covers their catch path). + [Test] + public void StatefulProjectionForwardsThrownError() + { + Assert.True(RunStatefulThrow(s => s.SelectWith(Ten, ThrowProjection))); + Assert.True(RunStatefulThrow(s => s.WhereWith(Two, ThrowPredicate))); + } + + /// Verifies Resume switches to the fallback sequence after the source errors. + [Test] + public void ResumeSwitchesToFallbackOnError() + { + var source = new Signal(); + var values = new List(); + var completed = 0; + + using var subscription = source.Resume(Signal.FromEnumerable(_oneToThree)).Subscribe(values.Add, static ex => throw ex, () => completed++); + source.OnNext(Ten); + source.OnError(new InvalidOperationException(Boom)); + + Assert.Equal(_tenThenFallback, values); + Assert.Equal(One, completed); + } + + /// Verifies Resume forwards source completion without subscribing the fallback. + [Test] + public void ResumeForwardsCompletionWithoutFallback() + { + var source = new Signal(); + var values = new List(); + var completed = 0; + + using var subscription = source.Resume(Signal.FromEnumerable(_oneToThree)).Subscribe(values.Add, static ex => throw ex, () => completed++); + source.OnNext(Ten); + source.OnCompleted(); + + Assert.Equal(_tenOnly, values); + Assert.Equal(One, completed); + } + + /// Verifies disposing Resume stops forwarding from the source. + [Test] + public void ResumeDisposeStopsForwarding() + { + var source = new Signal(); + var values = new List(); + + var subscription = source.Resume(Signal.FromEnumerable(_oneToThree)).Subscribe(values.Add); + source.OnNext(Ten); + subscription.Dispose(); + source.OnNext(Twenty); + + Assert.Equal(_tenOnly, values); + } + + /// Verifies Sample mirrors Probe when sampled against an identical virtual clock drive. + [Test] + public void SampleMatchesProbe() + { + Assert.Equal(RunSampling((s, c) => s.Probe(TimeSpan.FromTicks(Two), c)), RunSampling((s, c) => s.Sample(TimeSpan.FromTicks(Two), c))); + } + + /// Verifies the 3-arg SelectMany mirrors the 3-arg FlatMap. + [Test] + public void SelectManyWithResultSelectorMatchesFlatMap() + { + var flatMap = new List(); + var selectMany = new List(); + + Signal.FromEnumerable(_oneToThree).FlatMap(Fan, AddPair).Subscribe(flatMap.Add); + Signal.FromEnumerable(_oneToThree).SelectMany(Fan, AddPair).Subscribe(selectMany.Add); + + Assert.Equal(flatMap, selectMany); + Assert.True(selectMany.Count > 0); + } + /// Doubles a value. /// The source value. /// The doubled value. @@ -376,6 +539,66 @@ private static (List Values, Exception? Error) RunTimed(FuncPushes one value then an error through a stateful sink and reports whether both were forwarded. + /// The stateful operator under test. + /// when one value and the error were forwarded. + private static bool RunStatefulError(Func, IObservable> op) + { + var source = new Signal(); + var values = new List(); + Exception? error = null; + using var subscription = op(source).Subscribe(values.Add, captured => error = captured, () => { }); + source.OnNext(Two); + source.OnError(new InvalidOperationException(Boom)); + return values.Count == One && error is InvalidOperationException; + } + + /// Pushes a value through a sink whose projection throws and reports whether the error was forwarded. + /// The stateful operator under test. + /// when the thrown error was forwarded downstream. + private static bool RunStatefulThrow(Func, IObservable> op) + { + var source = new Signal(); + Exception? error = null; + using var subscription = op(source).Subscribe(static _ => { }, captured => error = captured, () => { }); + source.OnNext(One); + return error is InvalidOperationException; + } + + /// A stateful projection that always throws (drives the sink catch path). + /// The unused state. + /// The unused value. + /// Never returns; always throws. + private static int ThrowProjection(int state, int value) => throw new InvalidOperationException(Boom); + + /// A stateful predicate that always throws (drives the sink catch path). + /// The unused state. + /// The unused value. + /// Never returns; always throws. + private static bool ThrowPredicate(int state, int value) => throw new InvalidOperationException(Boom); + + /// Runs a sampling operator against a virtual clock with a fixed drive and collects the sampled values. + /// The sampling operator under test. + /// The sampled values. + private static List RunSampling(Func, ISequencer, IObservable> op) + { + var clock = new TestClock(DateTimeOffset.UnixEpoch); + var source = new Signal(); + var values = new List(); + using var subscription = op(source, clock).Subscribe(values.Add); + source.OnNext(One); + clock.AdvanceBy(TimeSpan.FromTicks(Two)); + source.OnNext(Three); + clock.AdvanceBy(TimeSpan.FromTicks(Two)); + return values; + } + + /// Combines a source value with an inner value (result selector for the 3-arg SelectMany/FlatMap). + /// The source value. + /// The inner value. + /// The combined value. + private static int AddPair(int source, int inner) => source + inner; + /// A unary parity case: a Primitives-named builder and its Rx-named twin over one source. /// The pair name. /// The Primitives-named builder. From 6bb1fc7fea065c256ab7af7526911d14b0f230b5 Mon Sep 17 00:00:00 2001 From: Glenn Watson <5834289+glennawatson@users.noreply.github.com> Date: Wed, 3 Jun 2026 15:50:08 +1000 Subject: [PATCH 7/8] test: bring all new name-layer files to 100% coverage Cover the remaining branches: the int-range fast paths (Zip/CombineLatest/ WithLatestFrom/Switch/Delay over ranges), the default-sequencer time overloads, Retry's happy path, and the rest of the null/right-operand guards in RxNames; the stateful sinks' null-observer rejection, post-terminal drop guards (via a manual source), and current-thread-requirement propagation (constructed directly); and ResumeSignal's null-observer, scheduled subscription path, and current-thread flag. RxNames/MapWith/KeepWith/TapWith/ResumeSignal/SubscriptionSlots are now 100%. --- .../RxNameParityTests.cs | 165 ++++++++++++++++++ 1 file changed, 165 insertions(+) diff --git a/src/tests/ReactiveUI.Primitives.Tests/RxNameParityTests.cs b/src/tests/ReactiveUI.Primitives.Tests/RxNameParityTests.cs index 2429cb2..806f672 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/RxNameParityTests.cs +++ b/src/tests/ReactiveUI.Primitives.Tests/RxNameParityTests.cs @@ -4,7 +4,9 @@ using ReactiveUI.Primitives.Concurrency; using ReactiveUI.Primitives.Core; +using ReactiveUI.Primitives.Disposables; using ReactiveUI.Primitives.Signals; +using ReactiveUI.Primitives.Signals.Core; namespace ReactiveUI.Primitives.Tests; @@ -290,6 +292,12 @@ public void RxNamesThrowOnNullSelector() Assert.Throws(() => source.Zip(source, null!)); Assert.Throws(() => source.CombineLatest(source, null!)); Assert.Throws(() => source.WithLatestFrom(source, null!)); + Assert.Throws(() => source.Zip((IObservable)null!, Add)); + Assert.Throws(() => source.CombineLatest((IObservable)null!, Add)); + Assert.Throws(() => source.WithLatestFrom((IObservable)null!, Add)); + Assert.Throws(() => source.Concat((IObservable)null!)); + Assert.Throws(() => source.SelectMany(null!, AddPair)); + Assert.Throws(() => source.SelectMany(Fan, null!)); } /// Verifies the count/interval guards throw . @@ -386,6 +394,88 @@ public void SelectManyWithResultSelectorMatchesFlatMap() Assert.True(selectMany.Count > 0); } + /// Verifies the int-range fast paths of the binary/higher-order names match their counterparts. + [Test] + public void RxNamesRangeFastPathsMatchCounterparts() + { + Assert.Equal( + Collect(Signal.Sequence(One, Three).Pair(Signal.Sequence(Ten, Three), Add)), + Collect(Signal.Sequence(One, Three).Zip(Signal.Sequence(Ten, Three), Add))); + Assert.Equal( + Collect(Signal.Sequence(One, Three).SyncLatest(Signal.Sequence(Ten, Three), Add)), + Collect(Signal.Sequence(One, Three).CombineLatest(Signal.Sequence(Ten, Three), Add))); + Assert.Equal( + Collect(Signal.Sequence(One, Three).Latch(Signal.Sequence(Ten, Three), Add)), + Collect(Signal.Sequence(One, Three).WithLatestFrom(Signal.Sequence(Ten, Three), Add))); + Assert.Equal( + Collect(RangeInners().SwitchTo()), + Collect(RangeInners().Switch())); + } + + /// Verifies Retry mirrors the source when no error occurs (covers the happy path). + [Test] + public void RetryMirrorsSourceWhenNoError() + { + Assert.Equal(_oneToThree, Collect(Signal.FromEnumerable(_oneToThree).Retry(Two))); + } + + /// Exercises the default-sequencer (no-scheduler) overloads of the time operators. + [Test] + public void TimeOperatorsAcceptDefaultSequencer() + { + Signal.Sequence(One, Three).Delay(TimeSpan.FromTicks(DueTicks)).Subscribe(static _ => { }).Dispose(); + Signal.FromEnumerable(_oneToThree).Timeout(TimeSpan.FromSeconds(AdvanceTicks)).Subscribe(static _ => { }).Dispose(); + Signal.FromEnumerable(_oneToThree).Sample(TimeSpan.FromTicks(DueTicks)).Subscribe(static _ => { }).Dispose(); + Assert.True(true); + } + + /// Verifies the stateful sinks drop notifications that arrive after a terminal notification. + [Test] + public void StatefulSinksDropNotificationsAfterTerminal() + { + Assert.True(RunStopGuards(s => s.SelectWith(Ten, AddState))); + Assert.True(RunStopGuards(s => s.WhereWith(Two, IsMultiple))); + Assert.True(RunStopGuards(s => s.DoWith(Ten, IgnoreState))); + } + + /// Verifies the stateful sinks reject a null observer. + [Test] + public void StatefulSinksThrowOnNullObserver() + { + var source = Signal.FromEnumerable(_oneToFive); + Assert.Throws(() => source.SelectWith(Ten, AddState).Subscribe((IObserver)null!)); + Assert.Throws(() => source.WhereWith(Two, IsMultiple).Subscribe((IObserver)null!)); + Assert.Throws(() => source.DoWith(Ten, IgnoreState).Subscribe((IObserver)null!)); + } + + /// Verifies the stateful sinks propagate the source's current-thread subscription requirement. + [Test] + public void StatefulSinksReportCurrentThreadRequirement() + { + Assert.True(new MapWithSignal(new CurrentThreadSource(), Ten, AddState).IsRequiredSubscribeOnCurrentThread()); + Assert.True(new KeepWithSignal(new CurrentThreadSource(), Two, IsMultiple).IsRequiredSubscribeOnCurrentThread()); + Assert.True(new TapWithSignal(new CurrentThreadSource(), Ten, IgnoreState).IsRequiredSubscribeOnCurrentThread()); + Assert.True(!new MapWithSignal(new ManualSource(), Ten, AddState).IsRequiredSubscribeOnCurrentThread()); + Assert.True(!new KeepWithSignal(new ManualSource(), Two, IsMultiple).IsRequiredSubscribeOnCurrentThread()); + Assert.True(!new TapWithSignal(new ManualSource(), Ten, IgnoreState).IsRequiredSubscribeOnCurrentThread()); + } + + /// Verifies Resume rejects a null observer. + [Test] + public void ResumeThrowsOnNullObserver() => + Assert.Throws(() => Signal.FromEnumerable(_oneToFive).Resume(Signal.FromEnumerable(_oneToThree)).Subscribe((IObserver)null!)); + + /// Verifies Resume takes the scheduled subscription path when a current-thread sequencer is already active. + [Test] + public void ResumeSchedulesWhenCurrentThreadSequencerActive() + { + var values = new List(); + Sequencer.CurrentThread.Schedule(() => + new Signal().Resume(Signal.FromEnumerable(_oneToThree)).Subscribe(values.Add)); + Assert.Equal(0, values.Count); + Assert.True(new ResumeSignal(Signal.FromEnumerable(_oneToThree), Signal.FromEnumerable(_oneToThree)).IsRequiredSubscribeOnCurrentThread()); + } + /// Doubles a value. /// The source value. /// The doubled value. @@ -599,6 +689,81 @@ private static List RunSampling(Func, ISequencer, IObserva /// The combined value. private static int AddPair(int source, int inner) => source + inner; + /// Subscribes to a source and collects its forwarded values. + /// The source sequence. + /// The forwarded values. + private static List Collect(IObservable source) + { + var values = new List(); + source.Subscribe(values.Add); + return values; + } + + /// Builds a source of two int-range inner sources (exercises the synchronous Switch range fast path). + /// An outer source of two range inners. + private static IObservable> RangeInners() => + Signal.FromEnumerable>([Signal.Sequence(One, Two), Signal.Sequence(Three, Two)]); + + /// + /// Drives a stateful sink through a value, a terminal completion, and then further notifications, reporting + /// whether the post-terminal notifications were dropped (exactly one completion, no leaked error). + /// + /// The stateful operator under test. + /// when notifications after the terminal were dropped. + private static bool RunStopGuards(Func, IObservable> op) + { + var source = new ManualSource(); + var completed = 0; + Exception? error = null; + using var subscription = op(source).Subscribe(static _ => { }, captured => error = captured, () => completed++); + source.Next(Two); + source.Complete(); + source.Next(Three); + source.Error(new InvalidOperationException(Boom)); + source.Complete(); + return completed == One && error is null; + } + + /// + /// An observable whose subscription retains its observer and ignores disposal, letting a test push raw + /// notifications (including ones after a terminal notification) to exercise a sink's terminal guards. + /// + /// The element type. + private sealed class ManualSource : IObservable + { + /// The observer retained from the most recent subscription. + private IObserver? _observer; + + /// + public IDisposable Subscribe(IObserver observer) + { + _observer = observer; + return Disposable.Empty; + } + + /// Pushes a value to the retained observer. + /// The value to push. + public void Next(T value) => _observer?.OnNext(value); + + /// Pushes an error to the retained observer. + /// The error to push. + public void Error(Exception exception) => _observer?.OnError(exception); + + /// Pushes completion to the retained observer. + public void Complete() => _observer?.OnCompleted(); + } + + /// A source that reports it requires current-thread subscription (drives the sink's propagation check). + /// The element type. + private sealed class CurrentThreadSource : IRequireCurrentThread + { + /// + public bool IsRequiredSubscribeOnCurrentThread() => true; + + /// + public IDisposable Subscribe(IObserver observer) => Disposable.Empty; + } + /// A unary parity case: a Primitives-named builder and its Rx-named twin over one source. /// The pair name. /// The Primitives-named builder. From 9070f23dc0e4760e14ce27e50cef707817245f8c Mon Sep 17 00:00:00 2001 From: Glenn Watson <5834289+glennawatson@users.noreply.github.com> Date: Wed, 3 Jun 2026 16:07:13 +1000 Subject: [PATCH 8/8] test: cover the deviant Chain(first, second) null guards Codecov flagged the binary Chain first/second null checks (the Rx Concat nulls were tested, the deviant Chain ones were not). Adds both, bringing the patch to full coverage. --- src/tests/ReactiveUI.Primitives.Tests/RxNameParityTests.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/tests/ReactiveUI.Primitives.Tests/RxNameParityTests.cs b/src/tests/ReactiveUI.Primitives.Tests/RxNameParityTests.cs index 806f672..f67dff5 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/RxNameParityTests.cs +++ b/src/tests/ReactiveUI.Primitives.Tests/RxNameParityTests.cs @@ -272,6 +272,8 @@ public void RxNamesThrowOnNullSource() Assert.Throws(() => default(IObservable>)!.Dematerialize()); Assert.Throws(() => default(IObservable)!.Resume(other)); Assert.Throws(() => other.Resume(null!)); + Assert.Throws(() => default(IObservable)!.Chain(other)); + Assert.Throws(() => other.Chain((IObservable)null!)); } /// Verifies the Rx names throw for a null projection/predicate.