-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparallel.ts
More file actions
69 lines (65 loc) · 1.86 KB
/
parallel.ts
File metadata and controls
69 lines (65 loc) · 1.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
* @fileoverview Parallel iteration helpers — `parallelMap()` and the
* result-discarding `parallelEach()`. Built on top of
* `streaming-iterables.parallelMap` with the project's `pRetry`
* wrapper applied per item.
*/
import { parallelMap as siParallelMap } from '../external/streaming-iterables'
import { normalizeIterationOptions } from '../promises/options'
import { pRetry } from '../promises/retry'
import type { IterationOptions } from '../promises/types'
/**
 * Run `func` for every item of `iterable` in parallel and discard the results.
 *
 * Delegates to `parallelMap()` and drains the async iterable it returns, so
 * the returned promise settles only once the whole iterable has been
 * consumed. An error raised while draining rejects the returned promise.
 *
 * @param iterable - Sync or async source of items.
 * @param func - Async callback invoked with each item; its resolved value is
 *   ignored.
 * @param options - Forwarded unchanged to `parallelMap()`: either a bare
 *   number (presumably the concurrency limit — confirm against
 *   `normalizeIterationOptions`) or an `IterationOptions` object.
 * @returns A promise that resolves with no value when iteration finishes.
 *
 * @example
 * ```typescript
 * const urls = ['https://a.io', 'https://b.io']
 * await parallelEach(urls, async (url) => {
 *   await fetch(url)
 * }, { concurrency: 4 })
 * ```
 */
// Intentionally NOT annotated with `@__NO_SIDE_EFFECTS__`: this function is
// called purely for the side effects of `func`, and bundlers honouring that
// hint may drop calls whose return value is unused (e.g. a non-awaited call).
export async function parallelEach<T>(
  iterable: Iterable<T> | AsyncIterable<T>,
  func: (item: T) => Promise<unknown>,
  options?: number | IterationOptions,
): Promise<void> {
  // Consuming the iterable is what drives the work; the values are unused.
  for await (const _ of parallelMap(iterable, func, options)) {
    /* empty block */
  }
}
/**
 * Transform each item of `iterable` with `func`, running the calls in
 * parallel through `streaming-iterables`' `parallelMap`.
 *
 * Every call to `func` goes through the project's `pRetry` wrapper, which
 * receives the normalized `retries` settings plus the current item as its
 * single call argument.
 *
 * NOTE(review): the order in which results are yielded is decided by the
 * external `parallelMap` helper — confirm before relying on it.
 *
 * @param iterable - Sync or async source of items.
 * @param func - Async mapper applied to each item.
 * @param options - A number or an `IterationOptions` object; normalized by
 *   `normalizeIterationOptions()` into the `concurrency` and `retries`
 *   values used here.
 * @returns An async iterable of the values `func` resolves to.
 *
 * @example
 * ```typescript
 * const ids = [1, 2, 3]
 * for await (const result of parallelMap(ids, async (id) => {
 *   return await fetchData(id)
 * }, 4)) {
 *   console.log(result)
 * }
 * ```
 */
/*@__NO_SIDE_EFFECTS__*/
export function parallelMap<T, U>(
  iterable: Iterable<T> | AsyncIterable<T>,
  func: (item: T) => Promise<U>,
  options?: number | IterationOptions,
): AsyncIterable<U> {
  const settings = normalizeIterationOptions(options)

  // Per-item worker: the item is handed to `pRetry` through `args`, and the
  // callback reads it back from its first positional argument.
  const mapWithRetry = async (item: T): Promise<U> => {
    const value = await pRetry(
      (...callArgs: unknown[]) => func(callArgs[0] as T),
      {
        ...settings.retries,
        args: [item],
      },
    )
    return value as U
  }

  /* c8 ignore next - External streaming-iterables call */
  return siParallelMap(settings.concurrency, mapWithRetry, iterable) as AsyncIterable<U>
}