Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/sonarcloud.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
sonarOrganization: reactiveui
sonarExclusions: '**/tests/**,**/tools/**,**/benchmarks/**,**/TestResults/**'
sonarCoverageExclusions: '**/tests/**,**/tools/**,**/benchmarks/**,**/*Tests/**,**/*Tests.cs,**/Generated/**'
sonarCpdExclusions: '**/tests/**,**/tools/**,**/benchmarks/**,**/CombineLatest?.cs,**/CombineLatest??.cs,**/ReactiveUI.Primitives.Blazor/**,**/ReactiveUI.Primitives.Maui/**,**/ReactiveUI.Primitives.WinUI/**,**/ReactiveUI.Primitives.WinForms/**,**/ReactiveUI.Primitives.Wpf/**'
sonarCpdExclusions: '**/tests/**,**/tools/**,**/benchmarks/**,**/CombineLatest?.cs,**/CombineLatest??.cs,**/ReactiveUI.Primitives.Blazor/**,**/ReactiveUI.Primitives.Maui/**,**/ReactiveUI.Primitives.WinUI/**,**/ReactiveUI.Primitives.WinForms/**,**/ReactiveUI.Primitives.Wpf/**,**/SignalOperatorParityMixins.RxNames.cs'
sonarTestExclusions: '**/tests/**,**/tools/**,**/benchmarks/**'
testTimeout: '15m'
secrets:
Expand Down
36 changes: 35 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,41 @@ IObservable<string> source = Signal.CreateSafe<string>(observer =>

## Operators

Operators are extension methods over `IObservable<T>`. ReactiveUI.Primitives uses a distinct vocabulary for operators that would otherwise collide with System.Reactive or R3.
Operators are extension methods over `IObservable<T>`. ReactiveUI.Primitives has a distinct vocabulary (`Map`, `Keep`, `Fold`, `Blend`, `SwitchTo`, …) that avoids ambiguous-call collisions with System.Reactive or R3 — but the familiar System.Reactive / LINQ names are also available (see below), so you can write whichever reads best.

### System.Reactive / LINQ name layer

The everyday System.Reactive / LINQ names are first-class operators that build the **same sink** as their Primitives-named counterpart — identical behavior and allocation profile, not wrappers. Both name sets are fully supported and interchangeable; pick whichever reads best for your code.

| LINQ / System.Reactive name | Primitives name | | LINQ / System.Reactive name | Primitives name |
|---|---|---|---|---|
| `Select` | `Map` | | `Merge` | `Blend` |
| `SelectWith` | `MapWith` | | `Concat` | `Chain` |
| `Where` | `Keep` | | `Amb` | `Race` |
| `WhereWith` | `KeepWith` | | `Switch` | `SwitchTo` |
| `WhereNotNull` | `KeepNotNull` | | `Zip` | `Pair` |
| `Do` | `Tap` | | `CombineLatest` | `SyncLatest` |
| `DoWith` | `TapWith` | | `WithLatestFrom` | `Latch` |
| `Scan` | `Fold` | | `SelectMany` | `FlatMap` |
| `Aggregate` | `Reduce` | | `Delay` | `Shift` |
| `DistinctUntilChanged` | `Unique` | | `Timeout` | `Expire` |
| `DistinctUntilChangedBy` | `UniqueBy` | | `Sample` | `Probe` |
| `IgnoreElements` | `IgnoreValues` | | `Retry` | `Reattempt` |
| `Materialize` | `Spark` | | `Dematerialize` | `Unspark` |

```csharp
using ReactiveUI.Primitives;
using ReactiveUI.Primitives.Signals;

// Reads exactly like System.Reactive — and builds the identical sinks as Map/Keep/Fold.
using var subscription = Signal.Sequence(1, 10)
.Where(value => value % 2 == 0)
.Select(value => value * value)
.Scan(0, (total, value) => total + value)
.Subscribe(Console.WriteLine);
```

> Caveat: because these names live in the `ReactiveUI.Primitives` namespace, a file that *also* imports `System.Reactive.Linq` will get ambiguous-call errors on shared names like `.Select`/`.Where`. Use the Primitives names (`Map`/`Keep`) in those mixed files, or migrate the file fully off System.Reactive.

### Transformation and filtering

Expand Down
45 changes: 39 additions & 6 deletions src/ReactiveUI.Primitives/SignalOperatorMixins.Coordinators.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,28 @@ public IDisposable Subscribe(IObserver<TResult> observer)
/// <typeparam name="T">The value type.</typeparam>
private sealed class ChainSignal<T> : IObservable<T>
{
/// <summary>The outer sequence of inner sources.</summary>
private readonly IObservable<IObservable<T>> _sources;
/// <summary>The outer sequence of inner sources, when constructed from a source-of-sources.</summary>
private readonly IObservable<IObservable<T>>? _sources;

/// <summary>The first inner source, when constructed from two sources.</summary>
private readonly IObservable<T>? _first;

/// <summary>Initializes a new instance of the <see cref="ChainSignal{T}"/> class.</summary>
/// <summary>The second inner source, when constructed from two sources.</summary>
private readonly IObservable<T>? _second;

/// <summary>Initializes a new instance of the <see cref="ChainSignal{T}"/> class from a source-of-sources.</summary>
/// <param name="sources">The outer sequence of inner sources.</param>
internal ChainSignal(IObservable<IObservable<T>> sources) => _sources = sources;

/// <summary>Initializes a new instance of the <see cref="ChainSignal{T}"/> class from two sources.</summary>
/// <param name="first">The first source.</param>
/// <param name="second">The second source.</param>
internal ChainSignal(IObservable<T> first, IObservable<T> second)
{
_first = first;
_second = second;
}

/// <inheritdoc/>
public IDisposable Subscribe(IObserver<T> observer)
{
Expand All @@ -202,7 +217,8 @@ public IDisposable Subscribe(IObserver<T> observer)
throw new ArgumentNullException(nameof(observer));
}

return new ChainCoordinator<T>(observer).Run(_sources);
var coordinator = new ChainCoordinator<T>(observer);
return _sources is not null ? coordinator.Run(_sources) : coordinator.Run(_first!, _second!);
}
}

Expand Down Expand Up @@ -244,6 +260,23 @@ internal ChainCoordinator<T> Run(IObservable<IObservable<T>> sources)
return this;
}

/// <summary>Subscribes the two fixed inner sources in order.</summary>
/// <param name="first">The first source.</param>
/// <param name="second">The second source.</param>
/// <returns>The coordinator that owns the subscription cleanup.</returns>
internal ChainCoordinator<T> Run(IObservable<T> first, IObservable<T> second)
{
lock (_gate)
{
_queue.Enqueue(first);
_queue.Enqueue(second);
_outerCompleted = true;
}

Drain();
return this;
}

/// <summary>Queues a new inner source and pumps the drain.</summary>
/// <param name="source">The inner source.</param>
private void OnSource(IObservable<T> source)
Expand Down Expand Up @@ -757,7 +790,7 @@ public void OnNext(T value)
/// <returns>The coordinator that owns the subscription cleanup.</returns>
internal ExpireCoordinator<T> Run()
{
_timer = _sequencer.Schedule(this, _dueTime, static (_, coordinator) => coordinator.Timeout());
_timer = _sequencer.Schedule(this, _dueTime, static (_, coordinator) => coordinator.EmitTimeout());
_subscription = _source.Subscribe(this);
if (Volatile.Read(ref _done) == 0)
{
Expand All @@ -772,7 +805,7 @@ internal ExpireCoordinator<T> Run()
/// Emits the timeout error.
/// </summary>
/// <returns>An empty disposable.</returns>
private IDisposable Timeout()
private IDisposable EmitTimeout()
{
if (Interlocked.Exchange(ref _done, 1) != 0)
{
Expand Down
56 changes: 45 additions & 11 deletions src/ReactiveUI.Primitives/SignalOperatorMixins.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,17 @@ public static IObservable<TResult> Map<TSource, TResult>(this IObservable<TSourc
/// <exception cref="ArgumentNullException"><paramref name="selector"/> is <see langword="null"/>.</exception>
public static IObservable<TResult> MapWith<TSource, TState, TResult>(this IObservable<TSource> source, TState state, Func<TState, TSource, TResult> selector)
{
if (source == null)
{
throw new ArgumentNullException(nameof(source));
}

if (selector == null)
{
throw new ArgumentNullException(nameof(selector));
}

return source.Map(value => selector(state, value));
return new MapWithSignal<TSource, TState, TResult>(source, state, selector);
}

/// <summary>
Expand Down Expand Up @@ -100,12 +105,17 @@ public static IObservable<T> Keep<T>(this IObservable<T> source, Func<T, bool> p
/// <exception cref="ArgumentNullException"><paramref name="predicate"/> is <see langword="null"/>.</exception>
public static IObservable<T> KeepWith<T, TState>(this IObservable<T> source, TState state, Func<TState, T, bool> predicate)
{
if (source == null)
{
throw new ArgumentNullException(nameof(source));
}

if (predicate == null)
{
throw new ArgumentNullException(nameof(predicate));
}

return source.Keep(value => predicate(state, value));
return new KeepWithSignal<T, TState>(source, state, predicate);
}

/// <summary>
Expand Down Expand Up @@ -178,16 +188,17 @@ public static IObservable<TResult> CastTo<TResult>(this IObservable<object?> sou
/// <exception cref="ArgumentNullException"><paramref name="onNext"/> is <see langword="null"/>.</exception>
public static IObservable<T> Tap<T>(this IObservable<T> source, Action<T> onNext)
{
if (source == null)
{
throw new ArgumentNullException(nameof(source));
}

if (onNext == null)
{
throw new ArgumentNullException(nameof(onNext));
}

return source.Map(value =>
{
onNext(value);
return value;
});
return new TapSignal<T>(source, onNext, static _ => { }, static () => { });
}

/// <summary>
Expand All @@ -202,12 +213,17 @@ public static IObservable<T> Tap<T>(this IObservable<T> source, Action<T> onNext
/// <exception cref="ArgumentNullException"><paramref name="onNext"/> is <see langword="null"/>.</exception>
public static IObservable<T> TapWith<T, TState>(this IObservable<T> source, TState state, Action<TState, T> onNext)
{
if (source == null)
{
throw new ArgumentNullException(nameof(source));
}

if (onNext == null)
{
throw new ArgumentNullException(nameof(onNext));
}

return source.Tap(value => onNext(state, value));
return new TapWithSignal<T, TState>(source, state, onNext);
}

/// <summary>
Expand Down Expand Up @@ -428,8 +444,21 @@ public static IObservable<T> Chain<T>(this IObservable<IObservable<T>> sources)
/// <param name="first">The first sequence.</param>
/// <param name="second">The second sequence.</param>
/// <returns>A sequence that emits <paramref name="second"/> after <paramref name="first"/> completes.</returns>
public static IObservable<T> Chain<T>(this IObservable<T> first, IObservable<T> second) =>
Signal.Chain(first, second);
/// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is <see langword="null"/>.</exception>
public static IObservable<T> Chain<T>(this IObservable<T> first, IObservable<T> second)
{
if (first == null)
{
throw new ArgumentNullException(nameof(first));
}

if (second == null)
{
throw new ArgumentNullException(nameof(second));
}

return new ChainSignal<T>(first, second);
}

/// <summary>
/// Subscribes to all inner sequences and forwards their values as they arrive.
Expand Down Expand Up @@ -652,12 +681,17 @@ public static IObservable<T> Rescue<T>(this IObservable<T> source, Func<Exceptio
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="fallback"/> is <see langword="null"/>.</exception>
public static IObservable<T> Resume<T>(this IObservable<T> source, IObservable<T> fallback)
{
if (source == null)
{
throw new ArgumentNullException(nameof(source));
}

if (fallback == null)
{
throw new ArgumentNullException(nameof(fallback));
}

return source.Recover<T, Exception>(_ => fallback);
return new ResumeSignal<T>(source, fallback);
}

/// <summary>
Expand Down
Loading
Loading