-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathParallelHelper.cs
More file actions
115 lines (99 loc) · 4.01 KB
/
ParallelHelper.cs
File metadata and controls
115 lines (99 loc) · 4.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
using System.Runtime.CompilerServices;
namespace DGEMMSharp;
internal static class ParallelHelper
{/// <summary>
/// Executes a specified action in an optimized parallel loop.
/// </summary>
/// <typeparam name="TAction">The type of action (implementing <see cref="IAction"/>) to invoke for each iteration index.</typeparam>
/// <param name="start">The starting iteration index.</param>
/// <param name="end">The final iteration index (exclusive).</param>
/// <param name="action">The <typeparamref name="TAction"/> instance representing the action to invoke.</param>
/// <param name="minimumActionsPerThread">
/// The minimum number of actions to run per individual thread. Set to 1 if all invocations
/// should be parallelized, or to a greater number if each individual invocation is fast
/// enough that it is more efficient to set a lower bound per each running thread.
/// </param>
internal static void For<TAction>(int start, int end,
in TAction action, int minimumActionsPerThread, int maxDegreeOfParallelism)
where TAction : struct, IAction
{
ArgumentOutOfRangeException.ThrowIfNegativeOrZero(minimumActionsPerThread,
nameof(minimumActionsPerThread));
ArgumentOutOfRangeException.ThrowIfGreaterThan(start, end,
nameof(start));
if (start == end)
{
return;
}
int count = Math.Abs(start - end);
int maxBatches = 1 + ((count - 1) / minimumActionsPerThread);
int cores = Environment.ProcessorCount;
int numBatches = Math.Min(maxBatches, cores);
// Skip the parallel invocation when a single batch is needed
if (numBatches == 1 || maxDegreeOfParallelism == 1)
{
for (int i = start; i < end; i++)
{
Unsafe.AsRef(in action).Invoke(i);
}
return;
}
int batchSize = 1 + ((count - 1) / numBatches);
ActionInvoker<TAction> actionInvoker = new(start, end, batchSize, action);
int degreeOfParallelism = Math.Min(numBatches, maxDegreeOfParallelism);
// Run the batched operations in parallel
_ = Parallel.For(
0,
numBatches,
new ParallelOptions { MaxDegreeOfParallelism = degreeOfParallelism },
actionInvoker.Invoke);
}
// Wrapping struct acting as explicit closure to execute the processing batches
private readonly struct ActionInvoker<TAction>
where TAction : struct, IAction
{
private readonly int start;
private readonly int end;
private readonly int batchSize;
private readonly TAction action;
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ActionInvoker(
int start,
int end,
int batchSize,
in TAction action)
{
this.start = start;
this.end = end;
this.batchSize = batchSize;
this.action = action;
}
/// <summary>
/// Processes the batch of actions at a specified index
/// </summary>
/// <param name="i">The index of the batch to process</param>
public void Invoke(int i)
{
int offset = i * this.batchSize;
int low = this.start + offset;
int high = low + this.batchSize;
int stop = Math.Min(high, this.end);
for (int j = low; j < stop; j++)
{
Unsafe.AsRef(in this.action).Invoke(j);
}
}
}
}
/// <summary>
/// A contract for actions being executed with an input index.
/// </summary>
/// <remarks>If the <see cref="Invoke"/> method is small enough, it is highly recommended to mark it with <see cref="MethodImplOptions.AggressiveInlining"/>.</remarks>
public interface IAction
{
/// <summary>
/// Executes the action associated with a specific index.
/// </summary>
/// <param name="i">The current index for the action to execute.</param>
void Invoke(int i);
}