-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtransform.ts
More file actions
46 lines (43 loc) · 1.26 KB
/
transform.ts
File metadata and controls
46 lines (43 loc) · 1.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* @fileoverview Streaming transform helper — `transform()` wraps
* `streaming-iterables.transform` with the project's `pRetry`
* per-item retry policy.
*/
import { transform as siTransform } from '../external/streaming-iterables'
import { normalizeIterationOptions } from '../promises/options'
import { pRetry } from '../promises/retry'
import type { IterationOptions } from '../promises/types'
/**
 * Map each item of `iterable` through the async `func`, retrying per item.
 *
 * Every item is handed to `func` via the project's `pRetry`, configured with
 * the `retries` settings from the normalized options. The mapping itself is
 * delegated to `streaming-iterables.transform`, which receives the normalized
 * `concurrency` value.
 *
 * @param iterable - Sync or async source of items.
 * @param func - Async mapper invoked with a single item.
 * @param options - A number or an `IterationOptions` object; it is passed
 *   through `normalizeIterationOptions`, which supplies `concurrency` and
 *   `retries`.
 * @returns An async iterable of the values produced by `func`.
 *
 * @example
 * ```typescript
 * const lines = ['hello', 'world']
 * for await (const upper of transform(lines, async (line) => {
 *   return line.toUpperCase()
 * })) {
 *   console.log(upper) // 'HELLO', 'WORLD'
 * }
 * ```
 */
/*@__NO_SIDE_EFFECTS__*/
export function transform<T, U>(
  iterable: Iterable<T> | AsyncIterable<T>,
  func: (item: T) => Promise<U>,
  options?: number | IterationOptions,
): AsyncIterable<U> {
  const { concurrency, retries } = normalizeIterationOptions(options)

  // Per-item worker: the item travels through pRetry's `args` option and is
  // read back as the first positional argument of the retried callback.
  const mapWithRetry = async (item: T) => {
    const forwardToFunc = (...retryArgs: unknown[]) => func(retryArgs[0] as T)
    // `args` is listed after the spread so it always wins over any `args`
    // key carried by the retry settings.
    const mapped = await pRetry(forwardToFunc, { ...retries, args: [item] })
    return mapped as U
  }

  /* c8 ignore next - External streaming-iterables call */
  return siTransform(concurrency, mapWithRetry, iterable) as AsyncIterable<U>
}