Skip to content

Commit c6ab658

Browse files
committed
Adds async iterators
1 parent ce21504 commit c6ab658

File tree

2 files changed

+203
-0
lines changed

2 files changed

+203
-0
lines changed

source/PlainBytes.System.Extensions/Collections/Iterators.cs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
using System.Collections;
33
using System.Collections.Generic;
44
using System.Runtime.CompilerServices;
5+
using System.Threading;
6+
using System.Threading.Tasks;
57

68
namespace PlainBytes.System.Extensions.Collections
79
{
@@ -120,5 +122,71 @@ public static IEnumerable<T> Append<T>(this IEnumerable<T> collection, IEnumerab
120122
yield return item;
121123
}
122124
}
125+
126+
/// <summary>
127+
/// Asynchronously enumerates the elements of the source sequence and invokes the specified asynchronous action
128+
/// for each element.
129+
/// </summary>
130+
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
131+
/// <param name="source">The asynchronous sequence whose elements are to be processed.</param>
132+
/// <param name="action">An asynchronous delegate to invoke for each element in the source sequence.</param>
133+
/// <param name="token">A cancellation token that can be used to cancel the operation.</param>
134+
/// <returns>A task that represents the asynchronous operation. The task completes when all elements have been processed
135+
/// or the operation is canceled.</returns>
136+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
137+
public static async Task ForEachAsync<T>(this IAsyncEnumerable<T> source, Func<T, CancellationToken, ValueTask> action, CancellationToken token = default)
138+
{
139+
await foreach (var item in source.WithCancellation(token))
140+
{
141+
token.ThrowIfCancellationRequested();
142+
143+
await action(item, token).ConfigureAwait(false);
144+
}
145+
}
146+
147+
148+
/// <summary>
149+
/// Iterates through the asynchronous sequence while processing the elements.
150+
/// </summary>
151+
/// <param name="source">The source asynchronous sequence to process.</param>
152+
/// <param name="selector">Transformer that processes the iterated items.</param>
153+
/// <param name="token">A cancellation token that can be used to cancel the asynchronous iteration.</param>
154+
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
155+
/// <typeparam name="TR">Type of result item.</typeparam>
156+
/// <returns></returns>
157+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
158+
public static async IAsyncEnumerable<TR> SelectAsync<T, TR>(this IAsyncEnumerable<T> source, Func<T, TR> selector, [EnumeratorCancellation] CancellationToken token = default)
159+
{
160+
await foreach (var item in source.WithCancellation(token))
161+
{
162+
token.ThrowIfCancellationRequested();
163+
164+
yield return selector(item);
165+
}
166+
}
167+
168+
/// <summary>
169+
/// Filters the elements of an asynchronous sequence based on a specified predicate.
170+
/// </summary>
171+
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
172+
/// <param name="source">The source asynchronous sequence to filter.</param>
173+
/// <param name="predicate">A function to test each element for a condition. The element is included in the result if the function
174+
/// returns <see langword="true"/>.</param>
175+
/// <param name="token">A cancellation token that can be used to cancel the asynchronous iteration.</param>
176+
/// <returns>An asynchronous sequence that contains elements from the source sequence that satisfy the condition
177+
/// specified by the predicate.</returns>
178+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
179+
public static async IAsyncEnumerable<T> WhereAsync<T>(this IAsyncEnumerable<T> source, Func<T, bool> predicate, [EnumeratorCancellation] CancellationToken token = default)
180+
{
181+
await foreach (var item in source.WithCancellation(token))
182+
{
183+
token.ThrowIfCancellationRequested();
184+
185+
if (predicate(item))
186+
{
187+
yield return item;
188+
}
189+
}
190+
}
123191
}
124192
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using PlainBytes.System.Extensions.Collections;
7+
using Xunit;
8+
9+
namespace PlainBytes.System.Extensions.Tests.Collections
10+
{
11+
public class AsyncIteratorsTests
12+
{
13+
private static async IAsyncEnumerable<T> ToAsync<T>(IEnumerable<T> source)
14+
{
15+
foreach (var item in source)
16+
{
17+
yield return item;
18+
await Task.Yield(); // ensure asynchronous execution path
19+
}
20+
}
21+
22+
[Fact]
23+
public async Task ForEachAsync_GivenNullSource_ShouldThrow()
24+
{
25+
// Arrange
26+
IAsyncEnumerable<int> source = null;
27+
28+
// Act / Assert
29+
await Assert.ThrowsAsync<NullReferenceException>(async () => await source.ForEachAsync((i, ct) => ValueTask.CompletedTask));
30+
}
31+
32+
[Fact]
33+
public async Task ForEachAsync_GivenSource_ActionIsExecutedForEachElement()
34+
{
35+
// Arrange
36+
var source = Enumerable.Range(0, 10).ToArray();
37+
var results = new List<int>();
38+
39+
// Act
40+
await ToAsync(source).ForEachAsync((item, ct) =>
41+
{
42+
results.Add(item);
43+
return ValueTask.CompletedTask;
44+
});
45+
46+
// Assert
47+
Assert.Equal(source, results);
48+
}
49+
50+
[Fact]
51+
public async Task ForEachAsync_GivenCancellation_ShouldThrowOperationCanceled()
52+
{
53+
// Arrange
54+
var cts = new CancellationTokenSource();
55+
var source = Enumerable.Range(0, 20);
56+
57+
ValueTask Action(int item, CancellationToken token)
58+
{
59+
if (item == 3)
60+
{
61+
cts.Cancel();
62+
}
63+
return ValueTask.CompletedTask;
64+
}
65+
66+
// Act / Assert
67+
await Assert.ThrowsAsync<OperationCanceledException>(async () => await ToAsync(source).ForEachAsync(Action, cts.Token));
68+
}
69+
70+
[Fact]
71+
public async Task SelectAsync_GivenNullSource_ShouldThrow()
72+
{
73+
// Arrange
74+
IAsyncEnumerable<int> source = null;
75+
76+
// Act / Assert
77+
await Assert.ThrowsAsync<NullReferenceException>(async () =>
78+
{
79+
await foreach (var _ in source.SelectAsync(i => i)) { }
80+
});
81+
}
82+
83+
[Fact]
84+
public async Task SelectAsync_GivenSelector_ProjectsAllElements()
85+
{
86+
// Arrange
87+
var source = Enumerable.Range(0, 15).ToArray();
88+
var results = new List<string>();
89+
90+
// Act
91+
await foreach (var item in ToAsync(source).SelectAsync(i => $"#{i}"))
92+
{
93+
results.Add(item);
94+
}
95+
96+
// Assert
97+
Assert.Equal(source.Length, results.Count);
98+
for (int i = 0; i < source.Length; i++)
99+
{
100+
Assert.Equal($"#{source[i]}", results[i]);
101+
}
102+
}
103+
104+
[Fact]
105+
public async Task WhereAsync_GivenNullSource_ShouldThrow()
106+
{
107+
// Arrange
108+
IAsyncEnumerable<int> source = null;
109+
110+
// Act / Assert
111+
await Assert.ThrowsAsync<NullReferenceException>(async () =>
112+
{
113+
await foreach (var _ in source.WhereAsync(i => true)) { }
114+
});
115+
}
116+
117+
[Fact]
118+
public async Task WhereAsync_GivenPredicate_FiltersElements()
119+
{
120+
// Arrange
121+
var source = Enumerable.Range(0, 30).ToArray();
122+
var result = new List<int>();
123+
124+
// Act
125+
await foreach (var item in ToAsync(source).WhereAsync(i => i % 2 == 0))
126+
{
127+
result.Add(item);
128+
}
129+
130+
// Assert
131+
Assert.All(result, x => Assert.True(x % 2 == 0));
132+
Assert.Equal(source.Where(i => i % 2 == 0), result);
133+
}
134+
}
135+
}

0 commit comments

Comments
 (0)