diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..1eaf5ee --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,74 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Build, test, lint + +The project is pure ESM JavaScript with **types written in JSDoc** (no TypeScript source files). `.d.ts` declarations are emitted from JSDoc by `tsc -p declaration.tsconfig.json` at publish time only — do not commit them. + +- `npm test` — full check chain (lint, tsc, knip, type-coverage, installed-check) followed by mocha + c8 coverage. This is what the pre-push husky hook runs; if it fails, fix the cause rather than `--no-verify`. +- `npm run check` — only the static checks (lint + tsc + knip + type-coverage + installed-check), no tests. +- `npx mocha test/.spec.js` — run a single spec file. +- `npx mocha test/.spec.js -g ""` — filter to specific `it()` blocks within a file. +- `npm run build` — clean and emit `.d.ts` declarations. + +Type-coverage is enforced at **≥95% strict** (excluding `test/*.spec.js`). Lint is `@voxpelli/eslint-config` (neostandard). Knip's "unused devDependency" findings are treated as errors by `npm test`. + +Commits must follow Conventional Commits (validated by the `commit-msg` husky hook via `validate-conventional-commit`); `release-please` cuts releases automatically from `main`, so `feat:` bumps minor and `fix:` bumps patch. + +Engines: Node ≥22.0.0 (the well-known `Symbol.asyncDispose` is required natively). The CI matrix in `.github/workflows/nodejs.yml` should match. + +## Architecture + +The library is one core function (`bufferedAsyncMap`) plus a thin wrapper (`mergeIterables`). Everything lives in `index.js`; `lib/` contains three small helpers worth knowing about. + +### `bufferedAsyncMap(input, callback, options)` — the state machine + +The function returns a stateful `AsyncIterableIterator` with these closure variables forming the state machine: + +- **`bufferedPromises[]`** — in-flight promises (size capped at `bufferSize`). Each is the `callback(item, {signal})` result wrapped to never reject (errors are caught into `{err}` envelopes). +- **`subIterators[]`** — stack of nested iterators spawned when `callback` returns an `AsyncIterable` (async-generator callbacks). +- **`promisesToSourceIteratorMap`** — WeakMap tracking which iterator produced each buffer slot; consulted by `findLeastTargeted` (`lib/find-least-targeted.js`) for load-balancing. +- **`internalAC`** — an `AbortController` minted per call. Its signal is **always** the second arg to `callback`, regardless of whether the consumer passed `options.signal`. It fires from `markAsEnded()` on iterator close, from `options.signal` aborting (linked via `addEventListener('abort', …)`), and from the first error in `errors: 'fail-fast'` mode. This is what lets in-flight callbacks fast-path on shutdown. +- **`abortReason: { reason, delivered: boolean } | undefined`** — drives the "reject the next `.next()` once with `signal.reason`, then `done:true` forever" contract. Set by external abort, pre-aborted signal, or first fail-fast error. +- **`capturedErrors[]`** — accumulates errors in `'fail-eventually'` mode; on drain, throws the single error directly (identity-preserved) or wraps in `AggregateError` for ≥2. +- **`isDone`** — set once by `markAsEnded()` to make all close paths idempotent. + +### Two pull/dispatch loops + +`fillQueue()` is the **producer**: pulls from source up to `bufferSize`, dispatches via `callback(item, {signal})`, pushes the wrapped promise into `bufferedPromises`. In `ordered: true` mode it always feeds from `subIterators[0]`; in `ordered: false` it picks the least-targeted iterator via `findLeastTargeted` to prevent starvation. + +`nextValue()` is the **consumer**: races `bufferedPromises[0]` (ordered) or `Promise.race(bufferedPromises)` (unordered) against an abort sentinel from `internalAC.signal`. Abort always wins over a buffered value resolving in the same tick — the post-race code re-checks `abortReason` regardless of which promise won the race. + +`markAsEnded()` is the **single cleanup path**: sets `isDone`, fires `internalAC.abort()`, calls `Promise.allSettled(...iterators.map(it => it.return()))`, clears buffers. Called from `return()`, `throw()`, `Symbol.asyncDispose`, source-exhaustion, and abort delivery. Idempotent via the `isDone` guard. + +### Iterator chaining via `currentStep` + +`next()` chains each call's promise via `currentStep.then(nextValue, nextValue)` (both fulfilled and rejected handlers are `nextValue`) so that one rejection doesn't poison every subsequent call — the next call still re-enters `nextValue`, which then observes the post-rejection state machine (most often returning `{done:true}`). + +### Lib helpers (reuse these, don't reimplement) + +- `lib/find-least-targeted.js` — load-balancing: given a list of iterators and the current buffer, picks the iterator with fewest in-flight slots. +- `lib/misc.js` — `makeIterableAsync(input)` (sync iterable → async iterable) and `arrayDeleteInPlace(list, value)` (in-place splice by value). +- `lib/type-checks.js` — `isAsyncIterable`, `isIterable`, `isPartOfArray` guards. + +### Public-API contracts worth preserving + +- Callback receives `(item, { signal })` where `signal` is **always present** (the internal one) even when no `options.signal` is provided. +- Aborts cancel **consumption**, not in-flight callback work. Promises cannot be cancelled — the library propagates the signal so user code can voluntarily exit; it does not race-and-discard. The README documents this explicitly. +- `errors: 'fail-eventually'` (default) keeps the historical "drain then throw" semantics; `'fail-fast'` mirrors `Promise.all`. External abort always wins over queued/captured errors. +- Existing one-arg callbacks (`async (item) => …`) keep working — JS ignores extra args, so the second-arg widening is non-breaking. + +## Test conventions + +Mocha + chai + sinon. Tests use `sinon.useFakeTimers()` plus `clock.runAllAsync()` / `clock.tickAsync(ms)` for deterministic timing. The standard pattern for an async flow that needs the clock to advance is: + +```js +const flow = (async () => { for await (...) { ... } })(); +await clock.runAllAsync(); +await flow; +``` + +Inline `for await` blocks **without** the IIFE wrapper will deadlock under fake timers when the source uses real `setTimeout`. Test helpers in `test/utils.js` (`yieldValuesOverTime`, `nestedYieldValuesOverTime`, `promisableTimeout`) are the source of truth — reuse them. + +For testing rejections, prefer the `.catch(err => ({ rejectedWith: err }))` envelope pattern (used across `test/abort.spec.js` and `test/errors-fail-fast.spec.js`) over chai-as-promised's `should.be.rejectedWith` when asserting identity-equality on non-Error reasons. diff --git a/README.md b/README.md index b9d29e7..4539667 100644 --- a/README.md +++ b/README.md @@ -75,17 +75,95 @@ Iterates and applies the `callback` to up to `bufferSize` items from `input` yie #### Syntax -`bufferedAsyncMap(input, callback[, { bufferSize=6, ordered=false }]) => AsyncIterableIterator` +`bufferedAsyncMap(input, callback[, { bufferSize=6, ordered=false, signal, errors='fail-eventually' }]) => AsyncIterableIterator` #### Arguments * `input` – either an async iterable, an ordinare iterable or an array -* `callback(item)` – should be either an async generator or an ordinary async function. Items from async generators are buffered in the main buffer and the buffer is refilled by the one that has least items in the current buffer (`input` is considered equal to sub iterators in this regard when refilling the buffer) +* `callback(item, { signal })` – should be either an async generator or an ordinary async function. Items from async generators are buffered in the main buffer and the buffer is refilled by the one that has least items in the current buffer (`input` is considered equal to sub iterators in this regard when refilling the buffer). The second argument is an `{ signal: AbortSignal }` that aborts on cancellation — see [Cancellation](#cancellation). #### Options * `bufferSize` – _optional_ – defaults to `6`, sets the max amount of simultanoeus items that processed at once in the buffer. -* `ordered` – _optional_ – defaults to `false`, when `true` the result will be returned in order instead of unordered +* `ordered` – _optional_ – defaults to `false`, when `true` the result will be returned in order instead of unordered. +* `signal` – _optional_ – an `AbortSignal`. When aborted, iteration stops pulling from the source, the next pending or freshly-called `iterator.next()` rejects with `signal.reason` exactly once, and all subsequent calls return `{ done: true, value: undefined }`. See [Cancellation](#cancellation). +* `errors` – _optional_ – defaults to `'fail-eventually'`. Controls how errors from the callback or the source surface to the consumer. See [Errors](#errors). + +The returned iterator also implements `Symbol.asyncDispose`, so it can be used with `await using` for deterministic cleanup. See [Resource management](#resource-management). + +## Cancellation + +Pass an `AbortSignal` and abort it whenever you want to stop iteration: + +```javascript +import { bufferedAsyncMap } from 'buffered-async-iterable'; + +const ac = new AbortController(); +setTimeout(() => ac.abort(new Error('took too long')), 5000); + +try { + for await (const item of bufferedAsyncMap(source, async (item) => { + return await fetchItem(item); + }, { signal: ac.signal })) { + console.log(item); + } +} catch (err) { + // err === ac.signal.reason +} +``` + +Aborting cancels *consumption* of the source. In-flight callbacks continue running until they settle. To cancel network/IO inside your callback, forward the per-callback `signal` (the second argument) into `fetch`/`undici`/etc: + +```javascript +bufferedAsyncMap(source, async (item, { signal }) => { + const res = await fetch(`/items/${item}`, { signal }); + return res.json(); +}, { signal: ac.signal }); +``` + +The per-callback `signal` is always present (even when no `options.signal` is passed) and aborts on iterator close (return / throw / dispose / source-exhaustion-with-cleanup), so callbacks can fast-path on shutdown. + +If `options.signal` is already aborted at construction time, the source is never read and the first `iterator.next()` rejects with `signal.reason`. External abort always wins over queued errors. + +## Errors + +There are two error modes: + +### `'fail-eventually'` (default) + +Iteration continues after errors. Captured errors are thrown when the iterator drains: + +* If exactly one error was captured, it is thrown directly (identity preserved). +* If two or more errors were captured, they are wrapped in an [`AggregateError`](https://developer.mozilla.org/docs/Web/JavaScript/Reference/Global_Objects/AggregateError) (in capture order). + +In-flight callbacks may still complete in the background after an error is captured. Wrap your callback in `try/catch` if you need per-item isolation. + +### `'fail-fast'` + +Mirrors `Promise.all` semantics: the first error from the callback or the source short-circuits iteration. The next `iterator.next()` rejects with the original error (no `AggregateError` wrapping); subsequent calls return `{ done: true }`. The source's `.next()` is not called again, the source's `.return()` is called once, and in-flight callbacks observe `signal.aborted === true` on their per-callback signal within one microtask. + +```javascript +for await (const item of bufferedAsyncMap(source, fn, { errors: 'fail-fast' })) { + // first thrown error halts iteration immediately +} +``` + +External abort always takes precedence over either error mode: if `options.signal` aborts while errors are queued, the consumer sees `signal.reason`, not the captured errors. + +## Resource management + +The returned iterator implements `Symbol.asyncDispose`, so it can be used with [`await using`](https://github.com/tc39/proposal-explicit-resource-management) for deterministic cleanup: + +```javascript +{ + await using iterator = bufferedAsyncMap(source, fn); + for await (const item of iterator) { + if (shouldStop(item)) break; + } +} // source.return() runs here, regardless of how the block exited +``` + +`Symbol.asyncDispose` is aliased to `iterator.return()` and is idempotent. Native `await using` requires Node 22+ (or a transpiler). ### mergeIterables() diff --git a/index.js b/index.js index 6f1a2fd..a27a2a5 100644 --- a/index.js +++ b/index.js @@ -1,6 +1,5 @@ /* eslint-disable promise/prefer-await-to-then */ -// TODO: Get inspired by Matteos https://github.com/mcollina/hwp/blob/main/index.js, eg AbortController is nice? // TODO: Check docs here https://tc39.es/ecma262/#sec-operations-on-iterator-objects // TODO: Look into https://tc39.es/ecma262/#sec-iteratorclose / https://tc39.es/ecma262/#sec-asynciteratorclose // TODO: See "iteratorKind" in https://tc39.es/ecma262/#sec-runtime-semantics-forin-div-ofbodyevaluation-lhs-stmt-iterator-lhskind-labelset – see how it loops and validates the returned values @@ -33,15 +32,17 @@ export async function * mergeIterables (input, { bufferSize } = {}) { * @template T * @template R * @param {AsyncIterable | Iterable | T[]} input - * @param {(item: T) => (Promise|AsyncIterable)} callback - * @param {{ bufferSize?: number|undefined, ordered?: boolean|undefined }} [options] - * @returns {AsyncIterableIterator & { return: NonNullable["return"]>, throw: NonNullable["throw"]> }} + * @param {(item: T, opts: { signal: AbortSignal }) => (Promise|AsyncIterable)} callback + * @param {{ bufferSize?: number|undefined, ordered?: boolean|undefined, signal?: AbortSignal|undefined, errors?: 'fail-eventually'|'fail-fast'|undefined }} [options] + * @returns {AsyncIterableIterator & { return: NonNullable["return"]>, throw: NonNullable["throw"]>, [Symbol.asyncDispose]: () => Promise }} */ export function bufferedAsyncMap (input, callback, options) { /** @typedef {Promise> & { bufferPromise: BufferPromise, fromSubIterator?: boolean, isSubIterator?: boolean, err?: unknown }>} BufferPromise */ const { bufferSize = 6, + errors: errorsMode = 'fail-eventually', ordered = false, + signal: externalSignal, } = options || {}; /** @type {AsyncIterable} */ @@ -53,6 +54,8 @@ export function bufferedAsyncMap (input, callback, options) { if (!isAsyncIterable(asyncIterable)) throw new TypeError('Expected asyncIterable to have a Symbol.asyncIterator function'); if (typeof callback !== 'function') throw new TypeError('Expected callback to be a function'); if (typeof bufferSize !== 'number') throw new TypeError('Expected bufferSize to be a number'); + if (externalSignal !== undefined && !(externalSignal instanceof AbortSignal)) throw new TypeError('Expected signal to be an AbortSignal'); + if (errorsMode !== 'fail-eventually' && errorsMode !== 'fail-fast') throw new TypeError("Expected errors to be 'fail-eventually' or 'fail-fast'"); /** @type {AsyncIterator} */ const asyncIterator = asyncIterable[Symbol.asyncIterator](); @@ -72,8 +75,32 @@ export function bufferedAsyncMap (input, callback, options) { /** @type {boolean} */ let isDone; - /** @type {Error|undefined} */ - let hasError; + /** @type {Error[]} */ + const capturedErrors = []; + + // Internal controller; aborts on iterator close (return/throw/dispose/source-exhaustion-with-cleanup) so callbacks can fast-path on shutdown. + const internalAC = new AbortController(); + + /** @type {{ reason: unknown, delivered: boolean } | undefined} */ + let abortReason; + + if (externalSignal) { + if (externalSignal.aborted) { + abortReason = { reason: externalSignal.reason, delivered: false }; + internalAC.abort(externalSignal.reason); + } else { + externalSignal.addEventListener('abort', () => { + // If the iterator already closed via return()/throw()/dispose, abort is too late: no-op. + if (isDone) return; + if (!abortReason) { + abortReason = { reason: externalSignal.reason, delivered: false }; + } + if (!internalAC.signal.aborted) { + internalAC.abort(externalSignal.reason); + } + }, { once: true }); + } + } /** * @param {boolean} [throwAnyError] @@ -83,6 +110,10 @@ export function bufferedAsyncMap (input, callback, options) { if (!isDone) { isDone = true; + if (!internalAC.signal.aborted) { + internalAC.abort(); + } + // TODO: Errors from here, how to handle? allSettled() ensures they will be caught at least await Promise.allSettled( [ @@ -93,12 +124,13 @@ export function bufferedAsyncMap (input, callback, options) { .map(item => item.return && item.return()) ); - // TODO: Could we use an AbortController to improve this? See eg. https://github.com/mcollina/hwp/pull/10 bufferedPromises.splice(0); subIterators.splice(0); - if (throwAnyError && hasError) { - throw hasError; + if (throwAnyError && capturedErrors.length > 0) { + throw capturedErrors.length === 1 + ? capturedErrors[0] + : new AggregateError(capturedErrors, 'Multiple errors in bufferedAsyncMap'); } } @@ -106,7 +138,7 @@ export function bufferedAsyncMap (input, callback, options) { }; const fillQueue = () => { - if (hasError || isDone) return; + if (capturedErrors.length > 0 || isDone || abortReason) return; /** @type {AsyncIterator|undefined} */ let currentSubIterator; @@ -171,7 +203,7 @@ export function bufferedAsyncMap (input, callback, options) { } // eslint-disable-next-line promise/no-callback-in-promise - const callbackResult = callback(result.value); + const callbackResult = callback(result.value, { signal: internalAC.signal }); const isSubIterator = isAsyncIterable(callbackResult); /** @type {Awaited} */ @@ -216,15 +248,61 @@ export function bufferedAsyncMap (input, callback, options) { } }; + // Sentinel returned from raceAbort() so a pending nextValue() wakes up when abort fires + // even with no buffered promise to resolve it. + const ABORT_SENTINEL = Symbol('abort'); + + /** @returns {Promise} */ + const raceAbort = () => new Promise(resolve => { + if (internalAC.signal.aborted) { + resolve(ABORT_SENTINEL); + } else { + internalAC.signal.addEventListener('abort', () => resolve(ABORT_SENTINEL), { once: true }); + } + }); + + /** + * @returns {{ done: true, value: undefined } | undefined} + */ + const handleAbortIfPending = () => { + if (abortReason && !abortReason.delivered) { + abortReason.delivered = true; + throw abortReason.reason; + } + if (abortReason && abortReason.delivered) { + return { done: true, value: undefined }; + } + }; + /** @type {AsyncIterator["next"]} */ const nextValue = async () => { + { + const earlyAbort = handleAbortIfPending(); + if (earlyAbort) { + await markAsEnded(); + return earlyAbort; + } + } + const nextBufferedPromise = bufferedPromises[0]; if (!nextBufferedPromise) return markAsEnded(true); if (isDone) return { done: true, value: undefined }; + const raced = await Promise.race([ + ordered ? nextBufferedPromise : Promise.race(bufferedPromises), + raceAbort(), + ]); + + // Abort always wins over a buffered value that may have settled in the same tick. + if (raced === ABORT_SENTINEL || abortReason) { + const handled = handleAbortIfPending(); + await markAsEnded(); + return handled ?? { done: true, value: undefined }; + } + /** @type {Awaited} */ - const resolvedPromise = await (ordered ? nextBufferedPromise : Promise.race(bufferedPromises)); + const resolvedPromise = raced; arrayDeleteInPlace(bufferedPromises, resolvedPromise.bufferPromise); // Wait for some of the current promises to be finished @@ -240,8 +318,26 @@ export function bufferedAsyncMap (input, callback, options) { if (isDone) { return { done: true, value: undefined }; } else if (err || done) { - if (err && !hasError) { - hasError = err instanceof Error ? err : new Error('Unknown error'); + if (err) { + const normalisedErr = err instanceof Error ? err : new Error('Unknown error'); + + // In fail-fast mode the first captured error short-circuits iteration: + // route it through the same abort machinery so the next .next() rejects + // with the original error and in-flight callbacks see signal.aborted=true. + if (errorsMode === 'fail-fast' && !abortReason) { + abortReason = { reason: normalisedErr, delivered: false }; + if (!internalAC.signal.aborted) { + internalAC.abort(normalisedErr); + } + await markAsEnded(); + if (abortReason && !abortReason.delivered) { + abortReason.delivered = true; + throw normalisedErr; + } + return { done: true, value: undefined }; + } + + capturedErrors.push(normalisedErr); } if (fromSubIterator || subIterators.length > 0) { @@ -266,15 +362,21 @@ export function bufferedAsyncMap (input, callback, options) { /** @type {Promise>} */ let currentStep; - /** @type {AsyncIterableIterator & { return: NonNullable["return"]>, throw: NonNullable["throw"]> }} */ + /** @type {AsyncIterableIterator & { return: NonNullable["return"]>, throw: NonNullable["throw"]>, [Symbol.asyncDispose]: () => Promise }} */ const resultAsyncIterableIterator = { async next () { - currentStep = currentStep ? currentStep.then(() => nextValue()) : nextValue(); + // Chain via then(nextValue, nextValue) so a rejection on one .next() does + // not poison every subsequent call — the next call still reaches + // nextValue() which observes the post-rejection state machine. + currentStep = currentStep + ? currentStep.then(() => nextValue(), () => nextValue()) + : nextValue(); return currentStep; }, // TODO: Accept an argument, as in the spec. Look into what happens if one call return() multiple times + look into if the value provided to return is the one returned forever after 'return': () => markAsEnded(), // TODO: Add "throw", see reference in https://tc39.es/ecma262/ ? And https://twitter.com/matteocollina/status/1392056117128306691 + /** @type {NonNullable["throw"]>} */ 'throw': async (err) => { // TODO: Should remember the throw? And return a rejected promise always? await markAsEnded(); @@ -282,6 +384,9 @@ export function bufferedAsyncMap (input, callback, options) { }, [Symbol.asyncIterator]: () => resultAsyncIterableIterator, + [Symbol.asyncDispose]: async () => { + await markAsEnded(); + }, }; fillQueue(); diff --git a/package.json b/package.json index 2909d87..a260222 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,7 @@ "author": "Pelle Wessman (http://kodfabrik.se/)", "license": "MIT", "engines": { - "node": ">=18.6.0" + "node": ">=22.0.0" }, "type": "module", "exports": "./index.js", @@ -49,9 +49,8 @@ "devDependencies": { "@types/chai": "^4.3.19", "@types/chai-as-promised": "^7.1.8", - "@types/chai-quantifiers": "^1.0.4", "@types/mocha": "^10.0.8", - "@types/node": "^18.19.50", + "@types/node": "^22.0.0", "@types/sinon": "^17.0.3", "@types/sinon-chai": "^3.2.12", "@voxpelli/eslint-config": "^25.0.2", diff --git a/test/abort.spec.js b/test/abort.spec.js new file mode 100644 index 0000000..197930e --- /dev/null +++ b/test/abort.spec.js @@ -0,0 +1,366 @@ +/* eslint-disable promise/prefer-await-to-then */ + +import chai from 'chai'; +import chaiAsPromised from 'chai-as-promised'; +import sinon from 'sinon'; +import sinonChai from 'sinon-chai'; + +import { + bufferedAsyncMap, +} from '../index.js'; +import { + promisableTimeout, + yieldValuesOverTime, +} from './utils.js'; + +/** + * @param {number} delayBeforeFirstYield + * @returns {AsyncIterable} + */ +async function * slowSource (delayBeforeFirstYield) { + await promisableTimeout(delayBeforeFirstYield); + yield 0; + await promisableTimeout(delayBeforeFirstYield); + yield 1; + await promisableTimeout(delayBeforeFirstYield); + yield 2; +} + +chai.use(chaiAsPromised); +chai.use(sinonChai); +const should = chai.should(); + +describe('bufferedAsyncMap() options.signal', () => { + /** @type {import('sinon').SinonFakeTimers} */ + let clock; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + sinon.restore(); + }); + + // --- Validation --- + + it('AC 4.1: throws TypeError when signal is not an AbortSignal', () => { + should.Throw(() => { + bufferedAsyncMap( + yieldValuesOverTime(1, 100), + async (item) => item, + // @ts-expect-error + { signal: 'not-a-signal' } + ); + }, TypeError, 'Expected signal to be an AbortSignal'); + }); + + it('AC 4.2: omitting signal behaves identically to commit 3 (no regression)', async () => { + /** @type {number[]} */ + const result = []; + const iterator = bufferedAsyncMap(yieldValuesOverTime(3, 100), async (item) => item); + + const flow = (async () => { + for await (const v of iterator) result.push(v); + })(); + + await clock.runAllAsync(); + await flow; + + result.should.have.members([0, 1, 2]); + }); + + // --- Pre-aborted signal --- + + it('AC 4.3 + 4.4 + 4.5: pre-aborted signal: source.next never called, first .next() rejects with reason, subsequent return done', async () => { + const reason = new Error('Pre-aborted'); + const ac = new AbortController(); + ac.abort(reason); + + const source = yieldValuesOverTime(6, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const nextSpy = sinon.spy(sourceIterator, 'next'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item, + { signal: ac.signal } + ); + + const first = iterator.next().catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + + chai.expect(await first).to.deep.equal({ rejectedWith: reason }); + nextSpy.should.not.have.been.called; + + await iterator.next().should.eventually.deep.equal({ done: true, value: undefined }); + await iterator.next().should.eventually.deep.equal({ done: true, value: undefined }); + }); + + it('AC 4.6: return() after pre-abort resolves done without throwing', async () => { + const ac = new AbortController(); + ac.abort(new Error('Pre-aborted')); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(3, 100), + async (item) => item, + { signal: ac.signal } + ); + + const ret = iterator.return(); + await clock.runAllAsync(); + await ret.should.eventually.deep.equal({ done: true, value: undefined }); + }); + + // --- Mid-iteration abort --- + + it('AC 4.7 + 4.10: parked .next() rejects with signal.reason (identity preserved)', async () => { + const reason = { custom: 'reason-object' }; + const ac = new AbortController(); + + const iterator = bufferedAsyncMap( + slowSource(1000), + async (item) => item, + { signal: ac.signal } + ); + + const parkedNext = iterator.next().catch(err => ({ rejectedWith: err })); + + // Defer the abort to fire while the .next() is parked on the slow source. + setTimeout(() => ac.abort(reason), 10); + + await clock.runAllAsync(); + + chai.expect(await parkedNext).to.deep.equal({ rejectedWith: reason }); + }); + + it('AC 4.8: fresh .next() after abort rejects with reason', async () => { + const reason = new Error('Aborted'); + const ac = new AbortController(); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item) => item, + { signal: ac.signal } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + ac.abort(reason); + + const next = iterator.next().catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + chai.expect(await next).to.deep.equal({ rejectedWith: reason }); + }); + + it('AC 4.9: exactly one .next() rejects with reason; subsequent calls return done', async () => { + const reason = new Error('Once'); + const ac = new AbortController(); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item) => item, + { signal: ac.signal } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + ac.abort(reason); + + /** @type {Array<{ rejected: boolean, value?: unknown }>} */ + const results = []; + + const sequence = (async () => { + for (let i = 0; i < 3; i += 1) { + try { + const r = await iterator.next(); + results.push({ rejected: false, value: r }); + } catch (err) { + results.push({ rejected: true, value: err }); + } + } + })(); + + await clock.runAllAsync(); + await sequence; + + chai.expect(results[0]).to.deep.equal({ rejected: true, value: reason }); + chai.expect(results[1]).to.deep.equal({ rejected: false, value: { done: true, value: undefined } }); + chai.expect(results[2]).to.deep.equal({ rejected: false, value: { done: true, value: undefined } }); + }); + + it('AC 4.11 + 4.12: source.next not called after abort; source.return called once', async () => { + const ac = new AbortController(); + + const source = yieldValuesOverTime(20, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const nextSpy = sinon.spy(sourceIterator, 'next'); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item, + { signal: ac.signal, bufferSize: 2 } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + const callsBeforeAbort = nextSpy.callCount; + + ac.abort(new Error('stop')); + + const next = iterator.next().catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + const r = await next; + should.exist(r); + + // Drain any further calls (should be none). + await iterator.next(); + await clock.runAllAsync(); + + nextSpy.callCount.should.equal(callsBeforeAbort); + returnSpy.should.have.been.calledOnce; + }); + + it('AC 4.13: in-flight callbacks observe signal.aborted=true after external abort', async () => { + const ac = new AbortController(); + /** @type {AbortSignal | undefined} */ + let captured; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item, { signal }) => { + captured = signal; + return item; + }, + { signal: ac.signal } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + chai.expect(captured?.aborted).to.equal(false); + + ac.abort(new Error('stop')); + + const n = iterator.next().catch(err => err); + await clock.runAllAsync(); + await n; + + chai.expect(captured?.aborted).to.equal(true); + }); + + // --- Close races --- + + it('AC 4.17: return() before abort makes subsequent .next() return done without throwing', async () => { + const ac = new AbortController(); + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item) => item, + { signal: ac.signal } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + const returned = iterator.return(); + await clock.runAllAsync(); + await returned; + + ac.abort(new Error('late')); + + const final = iterator.next(); + await clock.runAllAsync(); + await final.should.eventually.deep.equal({ done: true, value: undefined }); + }); + + it('AC 4.18: return() and abort fired together: cleanup runs once, no double-throw', async () => { + const ac = new AbortController(); + + // 200 items so the source is not naturally exhausted between consuming + // the first item and calling return() — that way markAsEnded actually + // has work to do on the source iterator. + const source = yieldValuesOverTime(200, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item, + { signal: ac.signal, bufferSize: 2 } + ); + + const first = iterator.next(); + // Advance only enough for the first item to land, leaving the source live. + await clock.tickAsync(0); + await first; + + const ret = iterator.return(); + ac.abort(new Error('parallel')); + + await clock.runAllAsync(); + await ret; + + returnSpy.should.have.been.calledOnce; + }); + + it('AC 4.19: throw(err) after abort delivered behaves like throw on a closed iterator', async () => { + const ac = new AbortController(); + const reason = new Error('aborted'); + const tossed = new Error('post-abort throw'); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item) => item, + { signal: ac.signal } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + ac.abort(reason); + + const aborted = iterator.next().catch(err => err); + await clock.runAllAsync(); + await aborted; + + const tossedNext = iterator.throw(tossed).catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + chai.expect(await tossedNext).to.deep.equal({ rejectedWith: tossed }); + }); + + // --- ordered:true coverage --- + + it('AC 4.15: abort works in ordered:true mode', async () => { + const ac = new AbortController(); + const reason = new Error('ordered-abort'); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item) => item, + { signal: ac.signal, ordered: true } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + ac.abort(reason); + + const next = iterator.next().catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + chai.expect(await next).to.deep.equal({ rejectedWith: reason }); + + const after = iterator.next(); + await clock.runAllAsync(); + await after.should.eventually.deep.equal({ done: true, value: undefined }); + }); +}); diff --git a/test/dispose.spec.js b/test/dispose.spec.js new file mode 100644 index 0000000..d72223b --- /dev/null +++ b/test/dispose.spec.js @@ -0,0 +1,113 @@ +import chai from 'chai'; +import chaiAsPromised from 'chai-as-promised'; +import sinon from 'sinon'; +import sinonChai from 'sinon-chai'; + +import { + bufferedAsyncMap, +} from '../index.js'; +import { + yieldValuesOverTime, +} from './utils.js'; + +chai.use(chaiAsPromised); +chai.use(sinonChai); +chai.should(); + +describe('bufferedAsyncMap() Symbol.asyncDispose', () => { + /** @type {import('sinon').SinonFakeTimers} */ + let clock; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + sinon.restore(); + }); + + it('AC 2.1: should expose Symbol.asyncDispose as a function returning a Promise', async () => { + const iterator = bufferedAsyncMap(yieldValuesOverTime(3, 100), async (item) => item); + + chai.expect(iterator[Symbol.asyncDispose]).to.be.a('function'); + const result = iterator[Symbol.asyncDispose](); + chai.expect(result).to.be.a('promise'); + + await clock.runAllAsync(); + await result; + }); + + it('AC 2.2: dispose should run the same cleanup as return() (source.return called)', async () => { + const source = yieldValuesOverTime(6, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item + ); + + await iterator.next(); + + const disposeResult = iterator[Symbol.asyncDispose](); + await clock.runAllAsync(); + await disposeResult.should.eventually.equal(undefined); + + returnSpy.should.have.been.calledOnce; + + await iterator.next().should.eventually.deep.equal({ done: true, value: undefined }); + }); + + // Simulates the desugaring of `await using iterator = bufferedAsyncMap(...)` — + // the runtime support for the syntax landed after Node 22.x, but the + // semantics we promise (cleanup on scope exit via Symbol.asyncDispose) are + // testable directly. + it('AC 2.3: should run cleanup on scope exit (await using desugared)', async () => { + const source = yieldValuesOverTime(6, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const promised = (async () => { + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item + ); + try { + // eslint-disable-next-line no-unreachable-loop + for await (const v of iterator) { + chai.expect(v).to.equal(0); + break; + } + } finally { + await iterator[Symbol.asyncDispose](); + } + })(); + + await clock.runAllAsync(); + await promised; + + returnSpy.should.have.been.calledOnce; + }); + + it('AC 2.4: dispose should be idempotent after return()', async () => { + const source = yieldValuesOverTime(6, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item + ); + + const flow = (async () => { + await iterator.next(); + await iterator.return(); + await iterator[Symbol.asyncDispose](); + })(); + + await clock.runAllAsync(); + await flow; + + returnSpy.should.have.been.calledOnce; + }); +}); diff --git a/test/errors-fail-fast.spec.js b/test/errors-fail-fast.spec.js new file mode 100644 index 0000000..c9ac6ec --- /dev/null +++ b/test/errors-fail-fast.spec.js @@ -0,0 +1,360 @@ +import chai from 'chai'; +import chaiAsPromised from 'chai-as-promised'; +import sinon from 'sinon'; +import sinonChai from 'sinon-chai'; + +import { + bufferedAsyncMap, +} from '../index.js'; +import { + promisableTimeout, + yieldValuesOverTime, +} from './utils.js'; + +chai.use(chaiAsPromised); +chai.use(sinonChai); +const should = chai.should(); + +/** + * @template T + * @param {T[]} items + * @returns {AsyncIterable} + */ +async function * fromArray (items) { + for (const item of items) { + yield item; + } +} + +/** + * @param {Error} expected + * @returns {AsyncIterable} + */ +async function * sourceThatThrows (expected) { + yield 0; + yield 1; + throw expected; +} + +describe('bufferedAsyncMap() errors: fail-fast', () => { + /** @type {import('sinon').SinonFakeTimers} */ + let clock; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + sinon.restore(); + }); + + // --- Validation & default --- + + it("AC 5.1: throws TypeError when errors is not 'fail-eventually'/'fail-fast'", () => { + should.Throw(() => { + bufferedAsyncMap( + fromArray([0, 1]), + async (item) => item, + // @ts-expect-error + { errors: 'isolate' } + ); + }, TypeError, "Expected errors to be 'fail-eventually' or 'fail-fast'"); + }); + + it("AC 5.2: omitting errors option (or 'fail-eventually') keeps current AggregateError behaviour", async () => { + const errA = new Error('A'); + const errB = new Error('B'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(0).rejects(errA) + .onCall(1).rejects(errB); + + /** @type {Error | undefined} */ + let caught; + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of bufferedAsyncMap(fromArray([0, 1, 2]), callback, { bufferSize: 3 })) { + // drain + } + } catch (err) { + caught = /** @type {Error} */ (err); + } + + chai.expect(caught).to.be.instanceOf(AggregateError); + }); + + // --- fail-fast semantics --- + + it('AC 5.3: first callback error rejects next .next() with that error; subsequent calls return done', async () => { + const reason = new Error('cb-error'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(1).rejects(reason); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(20, 100), + callback, + { errors: 'fail-fast', bufferSize: 2 } + ); + + /** @type {Array<{ rejected: boolean, value?: unknown }>} */ + const results = []; + + const flow = (async () => { + for (let i = 0; i < 5; i += 1) { + try { + const r = await iterator.next(); + results.push({ rejected: false, value: r }); + } catch (err) { + results.push({ rejected: true, value: err }); + } + } + })(); + + await clock.runAllAsync(); + await flow; + + const firstReject = results.findIndex(r => r.rejected); + chai.expect(firstReject).to.be.greaterThan(-1); + chai.expect(results[firstReject]?.value).to.equal(reason); + for (const r of results.slice(firstReject + 1)) { + chai.expect(r).to.deep.equal({ rejected: false, value: { done: true, value: undefined } }); + } + }); + + it('AC 5.4: source error fails fast', async () => { + const sourceError = new Error('src-error'); + + const iterator = bufferedAsyncMap( + sourceThatThrows(sourceError), + async (item) => item, + { errors: 'fail-fast' } + ); + + /** @type {unknown} */ + let caught; + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of iterator) { + // drain + } + } catch (err) { + caught = err; + } + + await clock.runAllAsync(); + chai.expect(caught).to.equal(sourceError); + }); + + it('AC 5.5 + 5.6: source.next not called after first error; source.return called once', async () => { + const reason = new Error('halt'); + const source = yieldValuesOverTime(50, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const nextSpy = sinon.spy(sourceIterator, 'next'); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const callback = sinon.stub() + .returnsArg(0) + .onCall(0).rejects(reason); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + callback, + { errors: 'fail-fast', bufferSize: 1 } + ); + + /** @type {unknown} */ + let caught; + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of iterator) { + // drain + } + } catch (err) { + caught = err; + } + + await clock.runAllAsync(); + chai.expect(caught).to.equal(reason); + + const callsAfterFail = nextSpy.callCount; + + await iterator.next(); + await clock.runAllAsync(); + + nextSpy.callCount.should.equal(callsAfterFail); + returnSpy.should.have.been.calledOnce; + }); + + it('AC 5.7: in-flight callbacks observe signal.aborted=true after fail-fast error', async () => { + const reason = new Error('halt'); + /** @type {AbortSignal | undefined} */ + let captured; + + let callIdx = 0; + const callback = async ( + /** @type {number} */ item, + /** @type {{ signal: AbortSignal }} */ { signal } + ) => { + const i = callIdx++; + captured = signal; + if (i === 1) { + // Slow second callback so the first failure has time to trigger fail-fast + // before this one settles. + await promisableTimeout(500); + throw reason; + } + if (i === 2) { + await promisableTimeout(2000); + return item; + } + return item; + }; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(20, 50), + callback, + { errors: 'fail-fast', bufferSize: 4 } + ); + + /** @type {unknown} */ + let caught; + const flow = (async () => { + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of iterator) { + // drain + } + } catch (err) { + caught = err; + } + })(); + + await clock.runAllAsync(); + await flow; + chai.expect(caught).to.equal(reason); + chai.expect(captured?.aborted).to.equal(true); + }); + + it('AC 5.8: rejected error is the original (not AggregateError)', async () => { + const reason = new Error('original'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(0).rejects(reason); + + /** @type {unknown} */ + let caught; + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of bufferedAsyncMap(fromArray([0, 1, 2]), callback, { errors: 'fail-fast' })) { + // drain + } + } catch (err) { + caught = err; + } + + await clock.runAllAsync(); + chai.expect(caught).to.equal(reason); + chai.expect(caught).to.not.be.instanceOf(AggregateError); + }); + + it('AC 5.9: fail-fast works in ordered:true mode', async () => { + const reason = new Error('ordered-fail'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(1).rejects(reason); + + /** @type {unknown} */ + let caught; + const flow = (async () => { + try { + for await (const v of bufferedAsyncMap( + yieldValuesOverTime(10, 100), + callback, + { errors: 'fail-fast', ordered: true, bufferSize: 3 } + )) { + // drain — should yield 0 then fail on 1 + chai.expect(v).to.equal(0); + } + } catch (err) { + caught = err; + } + })(); + + await clock.runAllAsync(); + await flow; + chai.expect(caught).to.equal(reason); + }); + + // --- Interaction with abort --- + + it('AC 5.11: external abort wins over a fail-fast error not yet captured', async () => { + const externalReason = new Error('external'); + const ac = new AbortController(); + + let callIdx = 0; + const callback = async (/** @type {number} */ item) => { + const i = callIdx++; + if (i === 0) return item; + await promisableTimeout(2000); + throw new Error('would-have-failed'); + }; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(10, 50), + callback, + { signal: ac.signal, errors: 'fail-fast', bufferSize: 2 } + ); + + const first = iterator.next(); + await clock.tickAsync(60); + await first; + + ac.abort(externalReason); + + /** @type {unknown} */ + let caught; + try { + const n = iterator.next(); + await clock.runAllAsync(); + await n; + } catch (err) { + caught = err; + } + + chai.expect(caught).to.equal(externalReason); + }); + + it('AC 5.13: external abort wins over queued errors in fail-eventually mode', async () => { + const externalReason = new Error('external'); + const ac = new AbortController(); + + const callback = sinon.stub() + .returnsArg(0) + .onCall(0).rejects(new Error('cb-error-A')) + .onCall(1).rejects(new Error('cb-error-B')); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(10, 100), + callback, + { signal: ac.signal, bufferSize: 3 } + ); + + // Let some items load and fail in the background; the errors get queued. + await clock.tickAsync(50); + ac.abort(externalReason); + + /** @type {unknown} */ + let caught; + try { + const n = iterator.next(); + await clock.runAllAsync(); + await n; + } catch (err) { + caught = err; + } + + chai.expect(caught).to.equal(externalReason); + chai.expect(caught).to.not.be.instanceOf(AggregateError); + }); +}); diff --git a/test/errors.spec.js b/test/errors.spec.js new file mode 100644 index 0000000..37969f6 --- /dev/null +++ b/test/errors.spec.js @@ -0,0 +1,153 @@ +import chai from 'chai'; +import chaiAsPromised from 'chai-as-promised'; +import sinon from 'sinon'; + +import { + bufferedAsyncMap, +} from '../index.js'; + +chai.use(chaiAsPromised); +chai.should(); + +/** + * @template T + * @param {T[]} items + * @returns {AsyncIterable} + */ +async function * fromArray (items) { + for (const item of items) { + yield item; + } +} + +/** + * @param {Error} expected + * @returns {AsyncIterable} + */ +async function * sourceThatThrows (expected) { + yield 0; + yield 1; + throw expected; +} + +describe('bufferedAsyncMap() errors', () => { + /** @type {import('sinon').SinonFakeTimers} */ + let clock; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + sinon.restore(); + }); + + it('AC 1.1: should reject with the original error (identity preserved) when exactly one error is captured', async () => { + const rejectionError = new Error('Single error'); + const callback = sinon.stub() + .returnsArg(0) + .onSecondCall().rejects(rejectionError); + + const promised = (async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of bufferedAsyncMap(fromArray([0, 1, 2]), callback)) { + // drain + } + })(); + + await clock.runAllAsync(); + await promised.should.be.rejectedWith(rejectionError); + }); + + it('AC 1.2: should reject with AggregateError containing all errors when 2+ errors are captured', async () => { + const errA = new Error('Error A'); + const errB = new Error('Error B'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(0).rejects(errA) + .onCall(1).rejects(errB); + + /** @type {Error | undefined} */ + let caught; + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of bufferedAsyncMap(fromArray([0, 1, 2]), callback, { bufferSize: 3 })) { + // drain + } + } catch (err) { + caught = /** @type {Error} */ (err); + } + + await clock.runAllAsync(); + + chai.expect(caught).to.be.instanceOf(AggregateError); + /** @type {AggregateError} */ (caught).errors.should.deep.equal([errA, errB]); + }); + + it('AC 1.3: should include both source and callback errors in the AggregateError', async () => { + const sourceError = new Error('Source error'); + const callbackError = new Error('Callback error'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(0).rejects(callbackError); + + /** @type {Error | undefined} */ + let caught; + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of bufferedAsyncMap(sourceThatThrows(sourceError), callback, { bufferSize: 3 })) { + // drain + } + } catch (err) { + caught = /** @type {Error} */ (err); + } + + await clock.runAllAsync(); + + chai.expect(caught).to.be.instanceOf(AggregateError); + const { errors } = /** @type {AggregateError} */ (caught); + errors.should.include(sourceError); + errors.should.include(callbackError); + }); + + it('AC 1.4: should resolve cleanly when no errors are captured (no regression)', async () => { + /** @type {number[]} */ + const result = []; + + const promised = (async () => { + for await (const v of bufferedAsyncMap(fromArray([0, 1, 2]), async (item) => item * 2)) { + result.push(v); + } + })(); + + await clock.runAllAsync(); + await promised; + + result.should.have.members([0, 2, 4]); + }); + + it('AC 1.5: should not retract values delivered before the throw', async () => { + const rejectionError = new Error('Late error'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(2).rejects(rejectionError); + + /** @type {number[]} */ + const delivered = []; + /** @type {Error | undefined} */ + let caught; + + try { + for await (const v of bufferedAsyncMap(fromArray([0, 1, 2]), callback, { bufferSize: 1 })) { + delivered.push(v); + } + } catch (err) { + caught = /** @type {Error} */ (err); + } + + await clock.runAllAsync(); + + delivered.should.deep.equal([0, 1]); + chai.expect(caught).to.equal(rejectionError); + }); +}); diff --git a/test/per-task-signal.spec.js b/test/per-task-signal.spec.js new file mode 100644 index 0000000..1f6486a --- /dev/null +++ b/test/per-task-signal.spec.js @@ -0,0 +1,184 @@ +import chai from 'chai'; +import chaiAsPromised from 'chai-as-promised'; +import sinon from 'sinon'; +import sinonChai from 'sinon-chai'; + +import { + bufferedAsyncMap, +} from '../index.js'; +import { + yieldValuesOverTime, +} from './utils.js'; + +chai.use(chaiAsPromised); +chai.use(sinonChai); +chai.should(); + +describe('bufferedAsyncMap() per-task AbortSignal', () => { + /** @type {import('sinon').SinonFakeTimers} */ + let clock; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + sinon.restore(); + }); + + it('AC 3.1 + 3.2: invokes callback with (item, { signal }) where signal.aborted === false', async () => { + /** @type {Array<{ item: number, signalIsAbortSignal: boolean, abortedAtCall: boolean }>} */ + const observations = []; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(3, 100), + async (item, opts) => { + observations.push({ + item, + signalIsAbortSignal: opts.signal instanceof AbortSignal, + abortedAtCall: opts.signal.aborted, + }); + return item; + } + ); + + const flow = (async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of iterator) { + // drain + } + })(); + + await clock.runAllAsync(); + await flow; + + observations.should.have.length(3); + for (const o of observations) { + o.signalIsAbortSignal.should.equal(true); + o.abortedAtCall.should.equal(false); + } + }); + + it('AC 3.4: in-flight callbacks observe signal.aborted=true after iterator.return()', async () => { + /** @type {AbortSignal | undefined} */ + let capturedSignal; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item, { signal }) => { + capturedSignal = signal; + return item; + } + ); + + await iterator.next(); + chai.expect(capturedSignal).to.exist; + chai.expect(capturedSignal?.aborted).to.equal(false); + + const returned = iterator.return(); + await clock.runAllAsync(); + await returned; + + chai.expect(capturedSignal?.aborted).to.equal(true); + }); + + it('AC 3.5: in-flight callbacks observe signal.aborted=true after iterator.throw()', async () => { + /** @type {AbortSignal | undefined} */ + let capturedSignal; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item, { signal }) => { + capturedSignal = signal; + return item; + } + ); + + await iterator.next(); + + const flow = (async () => { + try { + await iterator.throw(new Error('boom')); + } catch { + // expected + } + })(); + + await clock.runAllAsync(); + await flow; + + chai.expect(capturedSignal?.aborted).to.equal(true); + }); + + it('AC 3.6: in-flight callbacks observe signal.aborted=true after Symbol.asyncDispose', async () => { + /** @type {AbortSignal | undefined} */ + let capturedSignal; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item, { signal }) => { + capturedSignal = signal; + return item; + } + ); + + await iterator.next(); + + const disposed = iterator[Symbol.asyncDispose](); + await clock.runAllAsync(); + await disposed; + + chai.expect(capturedSignal?.aborted).to.equal(true); + }); + + it('AC 3.7: in-flight callbacks do NOT observe signal.aborted=true on natural source exhaustion', async () => { + /** @type {AbortSignal | undefined} */ + let lastSignal; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(2, 100), + async (item, { signal }) => { + lastSignal = signal; + return item; + } + ); + + /** @type {number[]} */ + const result = []; + const flow = (async () => { + for await (const v of iterator) { + result.push(v); + // Capture the signal state *after* the callback has been invoked but + // before the iterator finalises by reading the signal aborted state + // before draining further. + chai.expect(lastSignal?.aborted).to.equal(false); + } + })(); + + await clock.runAllAsync(); + await flow; + + result.should.have.length(2); + }); + + it('AC 3.3: callbacks ignoring the second arg keep working unmodified', async () => { + /** @type {number[]} */ + const result = []; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(3, 100), + async (item) => item * 10 + ); + + const flow = (async () => { + for await (const v of iterator) { + result.push(v); + } + })(); + + await clock.runAllAsync(); + await flow; + + result.should.have.members([0, 10, 20]); + }); +}); diff --git a/test/throw.spec.js b/test/throw.spec.js index 52fc2e8..5d00544 100644 --- a/test/throw.spec.js +++ b/test/throw.spec.js @@ -1,3 +1,5 @@ +/* eslint-disable promise/prefer-await-to-then */ + import chai from 'chai'; import chaiAsPromised from 'chai-as-promised'; import chaiQuantifiers from 'chai-quantifiers'; @@ -17,60 +19,85 @@ chai.use(sinonChai); chai.should(); -describe.skip('bufferedAsyncMap() AsyncInterface throw()', () => { +describe('bufferedAsyncMap() AsyncInterface throw()', () => { const count = 6; + /** @type {import('sinon').SinonFakeTimers} */ + let clock; /** @type {AsyncIterable} */ let baseAsyncIterable; - /** @type {number[]} */ - let expectedResult; beforeEach(() => { + clock = sinon.useFakeTimers(); baseAsyncIterable = yieldValuesOverTime(count, (i) => i % 2 === 1 ? 2000 : 100); + }); - expectedResult = []; - for (let i = 0; i < count; i++) { - expectedResult.push(i); - } + afterEach(() => { + sinon.restore(); }); - it('should end the iterator when called', async () => { - const errorToThrow = new Error('Yet another error'); + it('AC 6.1 + 6.2: throw(err) rejects with err and subsequent .next() returns done', async () => { + const errorToThrow = new Error('thrown'); const iterator = bufferedAsyncMap(baseAsyncIterable, async (item) => item); - await iterator.next().should.eventually.deep.equal({ value: 0 }); - await iterator.throw(errorToThrow).should.eventually.be.rejectedWith(errorToThrow); - await iterator.next().should.eventually.deep.equal({ done: true, value: undefined }); + const first = iterator.next(); + await clock.runAllAsync(); + await first.should.eventually.deep.equal({ value: 0 }); + + const tossed = iterator.throw(errorToThrow).catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + chai.expect(await tossed).to.deep.equal({ rejectedWith: errorToThrow }); + + const after = iterator.next(); + await clock.runAllAsync(); + await after.should.eventually.deep.equal({ done: true, value: undefined }); }); - it('should be called when a loop throws', async () => { - const iterator = bufferedAsyncMap(baseAsyncIterable, async (item) => item); - const returnSpy = sinon.spy(iterator, 'return'); - const throwSpy = sinon.spy(iterator, 'throw'); - const errorToThrow = new Error('Yet another error'); - - let caught; - - // Inspired by https://github.com/WebKit/WebKit/blob/1a09d8d95ba6085df4ef44306c4bfc9fc86fdbc7/JSTests/test262/test/language/expressions/yield/star-rhs-iter-thrw-thrw-get-err.js - async function * g () { - try { - yield * iterator; - } catch (err) { - caught = err; - throw err; + it('AC 6.2: throw(err) calls source.return() once', async () => { + const errorToThrow = new Error('thrown'); + const source = yieldValuesOverTime(50, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item + ); + + const first = iterator.next(); + await clock.tickAsync(0); + await first; + + const tossed = iterator.throw(errorToThrow).catch(err => err); + await clock.runAllAsync(); + await tossed; + + returnSpy.should.have.been.calledOnce; + }); + + it('AC 6.2: in-flight callbacks observe signal.aborted=true after throw()', async () => { + const errorToThrow = new Error('thrown'); + /** @type {AbortSignal | undefined} */ + let captured; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(50, 100), + async (item, { signal }) => { + captured = signal; + return item; } - } + ); - const wrappedIterator = g(); + const first = iterator.next(); + await clock.tickAsync(0); + await first; + chai.expect(captured?.aborted).to.equal(false); - await wrappedIterator.next().should.eventually.deep.equal({ done: false, value: 0 }); - await wrappedIterator.throw(errorToThrow).should.eventually.be.rejectedWith(errorToThrow); - await wrappedIterator.next().should.eventually.deep.equal({ done: true, value: undefined }); - await iterator.next().should.eventually.deep.equal({ done: true, value: undefined }); + const tossed = iterator.throw(errorToThrow).catch(err => err); + await clock.runAllAsync(); + await tossed; - (caught || {}).should.equal(errorToThrow); - throwSpy.should.have.been.calledOnceWithExactly(errorToThrow); - returnSpy.should.not.have.been.called; + chai.expect(captured?.aborted).to.equal(true); }); }); diff --git a/test/values.spec.js b/test/values.spec.js index a490bae..d372cb7 100644 --- a/test/values.spec.js +++ b/test/values.spec.js @@ -612,7 +612,8 @@ describe('bufferedAsyncMap() values', () => { throw new Error('Expected a rejection'); }, err => { - err.should.equal(rejectionError); + const captured = err instanceof AggregateError ? err.errors[0] : err; + captured.should.equal(rejectionError); } ); diff --git a/tsconfig.json b/tsconfig.json index 28b2974..48916c9 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,5 +1,5 @@ { - "extends": "@voxpelli/tsconfig/node18.json", + "extends": "@voxpelli/tsconfig/node20.json", "files": [ "index.js", ],