From 4ab2a5a03379d6db8136aef62bb06ad3b7890878 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 8 May 2026 14:58:30 +0000 Subject: [PATCH 01/10] fix: aggregate captured errors instead of dropping after the first Previously, once a callback or source threw, the iterator would record only the first error and silently drop any subsequent ones still in the buffer. The captured error was then thrown when the buffer drained. Capture all errors into an array. On drain, throw the original error when only one was captured (identity-preserving), or throw an AggregateError containing all captured errors when there were two or more. The existing generator-map rejection test relied on the dropped-errors behaviour and is updated to unwrap AggregateError when present. --- index.js | 16 +++-- test/errors.spec.js | 153 ++++++++++++++++++++++++++++++++++++++++++++ test/values.spec.js | 3 +- 3 files changed, 164 insertions(+), 8 deletions(-) create mode 100644 test/errors.spec.js diff --git a/index.js b/index.js index 6f1a2fd..bbb5663 100644 --- a/index.js +++ b/index.js @@ -72,8 +72,8 @@ export function bufferedAsyncMap (input, callback, options) { /** @type {boolean} */ let isDone; - /** @type {Error|undefined} */ - let hasError; + /** @type {Error[]} */ + const capturedErrors = []; /** * @param {boolean} [throwAnyError] @@ -97,8 +97,10 @@ export function bufferedAsyncMap (input, callback, options) { bufferedPromises.splice(0); subIterators.splice(0); - if (throwAnyError && hasError) { - throw hasError; + if (throwAnyError && capturedErrors.length > 0) { + throw capturedErrors.length === 1 + ? capturedErrors[0] + : new AggregateError(capturedErrors, 'Multiple errors in bufferedAsyncMap'); } } @@ -106,7 +108,7 @@ export function bufferedAsyncMap (input, callback, options) { }; const fillQueue = () => { - if (hasError || isDone) return; + if (capturedErrors.length > 0 || isDone) return; /** @type {AsyncIterator|undefined} */ let currentSubIterator; @@ -240,8 +242,8 @@ export function bufferedAsyncMap (input, callback, options) { if (isDone) { return { done: true, value: undefined }; } else if (err || done) { - if (err && !hasError) { - hasError = err instanceof Error ? err : new Error('Unknown error'); + if (err) { + capturedErrors.push(err instanceof Error ? err : new Error('Unknown error')); } if (fromSubIterator || subIterators.length > 0) { diff --git a/test/errors.spec.js b/test/errors.spec.js new file mode 100644 index 0000000..37969f6 --- /dev/null +++ b/test/errors.spec.js @@ -0,0 +1,153 @@ +import chai from 'chai'; +import chaiAsPromised from 'chai-as-promised'; +import sinon from 'sinon'; + +import { + bufferedAsyncMap, +} from '../index.js'; + +chai.use(chaiAsPromised); +chai.should(); + +/** + * @template T + * @param {T[]} items + * @returns {AsyncIterable} + */ +async function * fromArray (items) { + for (const item of items) { + yield item; + } +} + +/** + * @param {Error} expected + * @returns {AsyncIterable} + */ +async function * sourceThatThrows (expected) { + yield 0; + yield 1; + throw expected; +} + +describe('bufferedAsyncMap() errors', () => { + /** @type {import('sinon').SinonFakeTimers} */ + let clock; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + sinon.restore(); + }); + + it('AC 1.1: should reject with the original error (identity preserved) when exactly one error is captured', async () => { + const rejectionError = new Error('Single error'); + const callback = sinon.stub() + .returnsArg(0) + .onSecondCall().rejects(rejectionError); + + const promised = (async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of bufferedAsyncMap(fromArray([0, 1, 2]), callback)) { + // drain + } + })(); + + await clock.runAllAsync(); + await promised.should.be.rejectedWith(rejectionError); + }); + + it('AC 1.2: should reject with AggregateError containing all errors when 2+ errors are captured', async () => { + const errA = new Error('Error A'); + const errB = new Error('Error B'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(0).rejects(errA) + .onCall(1).rejects(errB); + + /** @type {Error | undefined} */ + let caught; + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of bufferedAsyncMap(fromArray([0, 1, 2]), callback, { bufferSize: 3 })) { + // drain + } + } catch (err) { + caught = /** @type {Error} */ (err); + } + + await clock.runAllAsync(); + + chai.expect(caught).to.be.instanceOf(AggregateError); + /** @type {AggregateError} */ (caught).errors.should.deep.equal([errA, errB]); + }); + + it('AC 1.3: should include both source and callback errors in the AggregateError', async () => { + const sourceError = new Error('Source error'); + const callbackError = new Error('Callback error'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(0).rejects(callbackError); + + /** @type {Error | undefined} */ + let caught; + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of bufferedAsyncMap(sourceThatThrows(sourceError), callback, { bufferSize: 3 })) { + // drain + } + } catch (err) { + caught = /** @type {Error} */ (err); + } + + await clock.runAllAsync(); + + chai.expect(caught).to.be.instanceOf(AggregateError); + const { errors } = /** @type {AggregateError} */ (caught); + errors.should.include(sourceError); + errors.should.include(callbackError); + }); + + it('AC 1.4: should resolve cleanly when no errors are captured (no regression)', async () => { + /** @type {number[]} */ + const result = []; + + const promised = (async () => { + for await (const v of bufferedAsyncMap(fromArray([0, 1, 2]), async (item) => item * 2)) { + result.push(v); + } + })(); + + await clock.runAllAsync(); + await promised; + + result.should.have.members([0, 2, 4]); + }); + + it('AC 1.5: should not retract values delivered before the throw', async () => { + const rejectionError = new Error('Late error'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(2).rejects(rejectionError); + + /** @type {number[]} */ + const delivered = []; + /** @type {Error | undefined} */ + let caught; + + try { + for await (const v of bufferedAsyncMap(fromArray([0, 1, 2]), callback, { bufferSize: 1 })) { + delivered.push(v); + } + } catch (err) { + caught = /** @type {Error} */ (err); + } + + await clock.runAllAsync(); + + delivered.should.deep.equal([0, 1]); + chai.expect(caught).to.equal(rejectionError); + }); +}); diff --git a/test/values.spec.js b/test/values.spec.js index a490bae..d372cb7 100644 --- a/test/values.spec.js +++ b/test/values.spec.js @@ -612,7 +612,8 @@ describe('bufferedAsyncMap() values', () => { throw new Error('Expected a rejection'); }, err => { - err.should.equal(rejectionError); + const captured = err instanceof AggregateError ? err.errors[0] : err; + captured.should.equal(rejectionError); } ); From 0e97e07aa5a9baeabbc24a3fd5b3387a37caad73 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 8 May 2026 15:06:38 +0000 Subject: [PATCH 02/10] feat: add Symbol.asyncDispose for await using support Aliases the new dispose method to the existing return() cleanup path so `await using it = bufferedAsyncMap(...)` runs source.return(), clears buffers, and is idempotent on repeat dispose/return calls. Bumps the supported Node range to >=22.0.0 so the well-known Symbol.asyncDispose is always available natively (Node 18 and 20 are both EOL as of May 2026), and updates the tsconfig preset and @types/node devDep to match. --- index.js | 8 ++- package.json | 4 +- test/dispose.spec.js | 113 +++++++++++++++++++++++++++++++++++++++++++ tsconfig.json | 2 +- 4 files changed, 122 insertions(+), 5 deletions(-) create mode 100644 test/dispose.spec.js diff --git a/index.js b/index.js index bbb5663..61c4c71 100644 --- a/index.js +++ b/index.js @@ -35,7 +35,7 @@ export async function * mergeIterables (input, { bufferSize } = {}) { * @param {AsyncIterable | Iterable | T[]} input * @param {(item: T) => (Promise|AsyncIterable)} callback * @param {{ bufferSize?: number|undefined, ordered?: boolean|undefined }} [options] - * @returns {AsyncIterableIterator & { return: NonNullable["return"]>, throw: NonNullable["throw"]> }} + * @returns {AsyncIterableIterator & { return: NonNullable["return"]>, throw: NonNullable["throw"]>, [Symbol.asyncDispose]: () => Promise }} */ export function bufferedAsyncMap (input, callback, options) { /** @typedef {Promise> & { bufferPromise: BufferPromise, fromSubIterator?: boolean, isSubIterator?: boolean, err?: unknown }>} BufferPromise */ @@ -268,7 +268,7 @@ export function bufferedAsyncMap (input, callback, options) { /** @type {Promise>} */ let currentStep; - /** @type {AsyncIterableIterator & { return: NonNullable["return"]>, throw: NonNullable["throw"]> }} */ + /** @type {AsyncIterableIterator & { return: NonNullable["return"]>, throw: NonNullable["throw"]>, [Symbol.asyncDispose]: () => Promise }} */ const resultAsyncIterableIterator = { async next () { currentStep = currentStep ? currentStep.then(() => nextValue()) : nextValue(); @@ -277,6 +277,7 @@ export function bufferedAsyncMap (input, callback, options) { // TODO: Accept an argument, as in the spec. Look into what happens if one call return() multiple times + look into if the value provided to return is the one returned forever after 'return': () => markAsEnded(), // TODO: Add "throw", see reference in https://tc39.es/ecma262/ ? And https://twitter.com/matteocollina/status/1392056117128306691 + /** @type {NonNullable["throw"]>} */ 'throw': async (err) => { // TODO: Should remember the throw? And return a rejected promise always? await markAsEnded(); @@ -284,6 +285,9 @@ export function bufferedAsyncMap (input, callback, options) { }, [Symbol.asyncIterator]: () => resultAsyncIterableIterator, + [Symbol.asyncDispose]: async () => { + await markAsEnded(); + }, }; fillQueue(); diff --git a/package.json b/package.json index 2909d87..0836763 100644 --- a/package.json +++ b/package.json @@ -15,7 +15,7 @@ "author": "Pelle Wessman (http://kodfabrik.se/)", "license": "MIT", "engines": { - "node": ">=18.6.0" + "node": ">=22.0.0" }, "type": "module", "exports": "./index.js", @@ -51,7 +51,7 @@ "@types/chai-as-promised": "^7.1.8", "@types/chai-quantifiers": "^1.0.4", "@types/mocha": "^10.0.8", - "@types/node": "^18.19.50", + "@types/node": "^22.0.0", "@types/sinon": "^17.0.3", "@types/sinon-chai": "^3.2.12", "@voxpelli/eslint-config": "^25.0.2", diff --git a/test/dispose.spec.js b/test/dispose.spec.js new file mode 100644 index 0000000..d72223b --- /dev/null +++ b/test/dispose.spec.js @@ -0,0 +1,113 @@ +import chai from 'chai'; +import chaiAsPromised from 'chai-as-promised'; +import sinon from 'sinon'; +import sinonChai from 'sinon-chai'; + +import { + bufferedAsyncMap, +} from '../index.js'; +import { + yieldValuesOverTime, +} from './utils.js'; + +chai.use(chaiAsPromised); +chai.use(sinonChai); +chai.should(); + +describe('bufferedAsyncMap() Symbol.asyncDispose', () => { + /** @type {import('sinon').SinonFakeTimers} */ + let clock; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + sinon.restore(); + }); + + it('AC 2.1: should expose Symbol.asyncDispose as a function returning a Promise', async () => { + const iterator = bufferedAsyncMap(yieldValuesOverTime(3, 100), async (item) => item); + + chai.expect(iterator[Symbol.asyncDispose]).to.be.a('function'); + const result = iterator[Symbol.asyncDispose](); + chai.expect(result).to.be.a('promise'); + + await clock.runAllAsync(); + await result; + }); + + it('AC 2.2: dispose should run the same cleanup as return() (source.return called)', async () => { + const source = yieldValuesOverTime(6, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item + ); + + await iterator.next(); + + const disposeResult = iterator[Symbol.asyncDispose](); + await clock.runAllAsync(); + await disposeResult.should.eventually.equal(undefined); + + returnSpy.should.have.been.calledOnce; + + await iterator.next().should.eventually.deep.equal({ done: true, value: undefined }); + }); + + // Simulates the desugaring of `await using iterator = bufferedAsyncMap(...)` — + // the runtime support for the syntax landed after Node 22.x, but the + // semantics we promise (cleanup on scope exit via Symbol.asyncDispose) are + // testable directly. + it('AC 2.3: should run cleanup on scope exit (await using desugared)', async () => { + const source = yieldValuesOverTime(6, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const promised = (async () => { + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item + ); + try { + // eslint-disable-next-line no-unreachable-loop + for await (const v of iterator) { + chai.expect(v).to.equal(0); + break; + } + } finally { + await iterator[Symbol.asyncDispose](); + } + })(); + + await clock.runAllAsync(); + await promised; + + returnSpy.should.have.been.calledOnce; + }); + + it('AC 2.4: dispose should be idempotent after return()', async () => { + const source = yieldValuesOverTime(6, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item + ); + + const flow = (async () => { + await iterator.next(); + await iterator.return(); + await iterator[Symbol.asyncDispose](); + })(); + + await clock.runAllAsync(); + await flow; + + returnSpy.should.have.been.calledOnce; + }); +}); diff --git a/tsconfig.json b/tsconfig.json index 28b2974..48916c9 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,5 +1,5 @@ { - "extends": "@voxpelli/tsconfig/node18.json", + "extends": "@voxpelli/tsconfig/node20.json", "files": [ "index.js", ], From dd4bfe4000af614330a9d89e5e60e35e855df06a Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 8 May 2026 15:08:23 +0000 Subject: [PATCH 03/10] feat: forward AbortSignal to callbacks, abort on iterator close MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each call to bufferedAsyncMap now mints an internal AbortController whose signal is passed as the second argument to the user callback — `callback(item, { signal })`. The internal controller is aborted from inside markAsEnded() so iterator.return(), iterator.throw(), and Symbol.asyncDispose all surface as `signal.aborted === true` to any in-flight callback within one microtask, giving callbacks a fast-path to bail out of long-running fetches/loops on shutdown. Existing one-arg callbacks keep working — JavaScript ignores the extra argument — so this widening is non-breaking. Mirrors the pattern from mcollina/hwp; the consumer-supplied signal option layered on top arrives in the next commit. --- index.js | 11 ++- test/per-task-signal.spec.js | 184 +++++++++++++++++++++++++++++++++++ 2 files changed, 193 insertions(+), 2 deletions(-) create mode 100644 test/per-task-signal.spec.js diff --git a/index.js b/index.js index 61c4c71..8e5e4ae 100644 --- a/index.js +++ b/index.js @@ -33,7 +33,7 @@ export async function * mergeIterables (input, { bufferSize } = {}) { * @template T * @template R * @param {AsyncIterable | Iterable | T[]} input - * @param {(item: T) => (Promise|AsyncIterable)} callback + * @param {(item: T, opts: { signal: AbortSignal }) => (Promise|AsyncIterable)} callback * @param {{ bufferSize?: number|undefined, ordered?: boolean|undefined }} [options] * @returns {AsyncIterableIterator & { return: NonNullable["return"]>, throw: NonNullable["throw"]>, [Symbol.asyncDispose]: () => Promise }} */ @@ -75,6 +75,9 @@ export function bufferedAsyncMap (input, callback, options) { /** @type {Error[]} */ const capturedErrors = []; + // Internal controller; aborts on iterator close (return/throw/dispose/source-exhaustion-with-cleanup) so callbacks can fast-path on shutdown. + const internalAC = new AbortController(); + /** * @param {boolean} [throwAnyError] * @returns {Promise>} @@ -83,6 +86,10 @@ export function bufferedAsyncMap (input, callback, options) { if (!isDone) { isDone = true; + if (!internalAC.signal.aborted) { + internalAC.abort(); + } + // TODO: Errors from here, how to handle? allSettled() ensures they will be caught at least await Promise.allSettled( [ @@ -173,7 +180,7 @@ export function bufferedAsyncMap (input, callback, options) { } // eslint-disable-next-line promise/no-callback-in-promise - const callbackResult = callback(result.value); + const callbackResult = callback(result.value, { signal: internalAC.signal }); const isSubIterator = isAsyncIterable(callbackResult); /** @type {Awaited} */ diff --git a/test/per-task-signal.spec.js b/test/per-task-signal.spec.js new file mode 100644 index 0000000..1f6486a --- /dev/null +++ b/test/per-task-signal.spec.js @@ -0,0 +1,184 @@ +import chai from 'chai'; +import chaiAsPromised from 'chai-as-promised'; +import sinon from 'sinon'; +import sinonChai from 'sinon-chai'; + +import { + bufferedAsyncMap, +} from '../index.js'; +import { + yieldValuesOverTime, +} from './utils.js'; + +chai.use(chaiAsPromised); +chai.use(sinonChai); +chai.should(); + +describe('bufferedAsyncMap() per-task AbortSignal', () => { + /** @type {import('sinon').SinonFakeTimers} */ + let clock; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + sinon.restore(); + }); + + it('AC 3.1 + 3.2: invokes callback with (item, { signal }) where signal.aborted === false', async () => { + /** @type {Array<{ item: number, signalIsAbortSignal: boolean, abortedAtCall: boolean }>} */ + const observations = []; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(3, 100), + async (item, opts) => { + observations.push({ + item, + signalIsAbortSignal: opts.signal instanceof AbortSignal, + abortedAtCall: opts.signal.aborted, + }); + return item; + } + ); + + const flow = (async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of iterator) { + // drain + } + })(); + + await clock.runAllAsync(); + await flow; + + observations.should.have.length(3); + for (const o of observations) { + o.signalIsAbortSignal.should.equal(true); + o.abortedAtCall.should.equal(false); + } + }); + + it('AC 3.4: in-flight callbacks observe signal.aborted=true after iterator.return()', async () => { + /** @type {AbortSignal | undefined} */ + let capturedSignal; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item, { signal }) => { + capturedSignal = signal; + return item; + } + ); + + await iterator.next(); + chai.expect(capturedSignal).to.exist; + chai.expect(capturedSignal?.aborted).to.equal(false); + + const returned = iterator.return(); + await clock.runAllAsync(); + await returned; + + chai.expect(capturedSignal?.aborted).to.equal(true); + }); + + it('AC 3.5: in-flight callbacks observe signal.aborted=true after iterator.throw()', async () => { + /** @type {AbortSignal | undefined} */ + let capturedSignal; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item, { signal }) => { + capturedSignal = signal; + return item; + } + ); + + await iterator.next(); + + const flow = (async () => { + try { + await iterator.throw(new Error('boom')); + } catch { + // expected + } + })(); + + await clock.runAllAsync(); + await flow; + + chai.expect(capturedSignal?.aborted).to.equal(true); + }); + + it('AC 3.6: in-flight callbacks observe signal.aborted=true after Symbol.asyncDispose', async () => { + /** @type {AbortSignal | undefined} */ + let capturedSignal; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item, { signal }) => { + capturedSignal = signal; + return item; + } + ); + + await iterator.next(); + + const disposed = iterator[Symbol.asyncDispose](); + await clock.runAllAsync(); + await disposed; + + chai.expect(capturedSignal?.aborted).to.equal(true); + }); + + it('AC 3.7: in-flight callbacks do NOT observe signal.aborted=true on natural source exhaustion', async () => { + /** @type {AbortSignal | undefined} */ + let lastSignal; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(2, 100), + async (item, { signal }) => { + lastSignal = signal; + return item; + } + ); + + /** @type {number[]} */ + const result = []; + const flow = (async () => { + for await (const v of iterator) { + result.push(v); + // Capture the signal state *after* the callback has been invoked but + // before the iterator finalises by reading the signal aborted state + // before draining further. + chai.expect(lastSignal?.aborted).to.equal(false); + } + })(); + + await clock.runAllAsync(); + await flow; + + result.should.have.length(2); + }); + + it('AC 3.3: callbacks ignoring the second arg keep working unmodified', async () => { + /** @type {number[]} */ + const result = []; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(3, 100), + async (item) => item * 10 + ); + + const flow = (async () => { + for await (const v of iterator) { + result.push(v); + } + })(); + + await clock.runAllAsync(); + await flow; + + result.should.have.members([0, 10, 20]); + }); +}); From 7a82a16909da7122241cb886f94b546a38848090 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 8 May 2026 15:18:19 +0000 Subject: [PATCH 04/10] feat: accept options.signal for external cancellation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds opts.signal: AbortSignal so consumers can cancel iteration without hand-wiring signal.addEventListener('abort', () => it.return()). Contract: - Pre-aborted signal: source.next() is never called and the first iterator.next() rejects synchronously with signal.reason. - Mid-iteration abort: the next pending or freshly-called iterator.next() rejects exactly once with signal.reason; subsequent iterator.next() calls return { done: true, value: undefined }. - After abort: the source iterator's .return() runs once via the existing markAsEnded() path, and in-flight callbacks observe signal.aborted === true on the second-arg signal within one microtask. - signal.reason is preserved by identity, including non-Error reasons. - Abort wins over a buffered value resolving in the same tick. - Holds in both ordered and unordered modes and across sub-iterator callbacks. Implementation: - Validates options.signal is undefined or AbortSignal at construction time. - Links external → internal AbortController by hand (simple addEventListener, no AbortSignal.any) and short-circuits if the iterator was already closed via return()/throw()/dispose so a late abort is a no-op. - nextValue() races the buffered await against an abort sentinel and threads abort-state through a dedicated handleAbortIfPending() helper so the "reject once, then done forever" contract is centralised. - The currentStep .next() chain is now then(nextValue, nextValue) so a rejection on one .next() does not poison every subsequent call — required for the post-abort done semantics. --- index.js | 82 +++++++++- test/abort.spec.js | 366 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 444 insertions(+), 4 deletions(-) create mode 100644 test/abort.spec.js diff --git a/index.js b/index.js index 8e5e4ae..b161af1 100644 --- a/index.js +++ b/index.js @@ -34,7 +34,7 @@ export async function * mergeIterables (input, { bufferSize } = {}) { * @template R * @param {AsyncIterable | Iterable | T[]} input * @param {(item: T, opts: { signal: AbortSignal }) => (Promise|AsyncIterable)} callback - * @param {{ bufferSize?: number|undefined, ordered?: boolean|undefined }} [options] + * @param {{ bufferSize?: number|undefined, ordered?: boolean|undefined, signal?: AbortSignal|undefined }} [options] * @returns {AsyncIterableIterator & { return: NonNullable["return"]>, throw: NonNullable["throw"]>, [Symbol.asyncDispose]: () => Promise }} */ export function bufferedAsyncMap (input, callback, options) { @@ -42,6 +42,7 @@ export function bufferedAsyncMap (input, callback, options) { const { bufferSize = 6, ordered = false, + signal: externalSignal, } = options || {}; /** @type {AsyncIterable} */ @@ -53,6 +54,7 @@ export function bufferedAsyncMap (input, callback, options) { if (!isAsyncIterable(asyncIterable)) throw new TypeError('Expected asyncIterable to have a Symbol.asyncIterator function'); if (typeof callback !== 'function') throw new TypeError('Expected callback to be a function'); if (typeof bufferSize !== 'number') throw new TypeError('Expected bufferSize to be a number'); + if (externalSignal !== undefined && !(externalSignal instanceof AbortSignal)) throw new TypeError('Expected signal to be an AbortSignal'); /** @type {AsyncIterator} */ const asyncIterator = asyncIterable[Symbol.asyncIterator](); @@ -78,6 +80,27 @@ export function bufferedAsyncMap (input, callback, options) { // Internal controller; aborts on iterator close (return/throw/dispose/source-exhaustion-with-cleanup) so callbacks can fast-path on shutdown. const internalAC = new AbortController(); + /** @type {{ reason: unknown, delivered: boolean } | undefined} */ + let abortReason; + + if (externalSignal) { + if (externalSignal.aborted) { + abortReason = { reason: externalSignal.reason, delivered: false }; + internalAC.abort(externalSignal.reason); + } else { + externalSignal.addEventListener('abort', () => { + // If the iterator already closed via return()/throw()/dispose, abort is too late: no-op. + if (isDone) return; + if (!abortReason) { + abortReason = { reason: externalSignal.reason, delivered: false }; + } + if (!internalAC.signal.aborted) { + internalAC.abort(externalSignal.reason); + } + }, { once: true }); + } + } + /** * @param {boolean} [throwAnyError] * @returns {Promise>} @@ -115,7 +138,7 @@ export function bufferedAsyncMap (input, callback, options) { }; const fillQueue = () => { - if (capturedErrors.length > 0 || isDone) return; + if (capturedErrors.length > 0 || isDone || abortReason) return; /** @type {AsyncIterator|undefined} */ let currentSubIterator; @@ -225,15 +248,61 @@ export function bufferedAsyncMap (input, callback, options) { } }; + // Sentinel returned from raceAbort() so a pending nextValue() wakes up when abort fires + // even with no buffered promise to resolve it. + const ABORT_SENTINEL = Symbol('abort'); + + /** @returns {Promise} */ + const raceAbort = () => new Promise(resolve => { + if (internalAC.signal.aborted) { + resolve(ABORT_SENTINEL); + } else { + internalAC.signal.addEventListener('abort', () => resolve(ABORT_SENTINEL), { once: true }); + } + }); + + /** + * @returns {{ done: true, value: undefined } | undefined} + */ + const handleAbortIfPending = () => { + if (abortReason && !abortReason.delivered) { + abortReason.delivered = true; + throw abortReason.reason; + } + if (abortReason && abortReason.delivered) { + return { done: true, value: undefined }; + } + }; + /** @type {AsyncIterator["next"]} */ const nextValue = async () => { + { + const earlyAbort = handleAbortIfPending(); + if (earlyAbort) { + await markAsEnded(); + return earlyAbort; + } + } + const nextBufferedPromise = bufferedPromises[0]; if (!nextBufferedPromise) return markAsEnded(true); if (isDone) return { done: true, value: undefined }; + const raced = await Promise.race([ + ordered ? nextBufferedPromise : Promise.race(bufferedPromises), + raceAbort(), + ]); + + // Abort always wins over a buffered value that may have settled in the same tick. + if (raced === ABORT_SENTINEL || abortReason) { + const handled = handleAbortIfPending(); + await markAsEnded(); + return handled ?? { done: true, value: undefined }; + } + /** @type {Awaited} */ - const resolvedPromise = await (ordered ? nextBufferedPromise : Promise.race(bufferedPromises)); + const resolvedPromise = raced; arrayDeleteInPlace(bufferedPromises, resolvedPromise.bufferPromise); // Wait for some of the current promises to be finished @@ -278,7 +347,12 @@ export function bufferedAsyncMap (input, callback, options) { /** @type {AsyncIterableIterator & { return: NonNullable["return"]>, throw: NonNullable["throw"]>, [Symbol.asyncDispose]: () => Promise }} */ const resultAsyncIterableIterator = { async next () { - currentStep = currentStep ? currentStep.then(() => nextValue()) : nextValue(); + // Chain via then(nextValue, nextValue) so a rejection on one .next() does + // not poison every subsequent call — the next call still reaches + // nextValue() which observes the post-rejection state machine. + currentStep = currentStep + ? currentStep.then(() => nextValue(), () => nextValue()) + : nextValue(); return currentStep; }, // TODO: Accept an argument, as in the spec. Look into what happens if one call return() multiple times + look into if the value provided to return is the one returned forever after diff --git a/test/abort.spec.js b/test/abort.spec.js new file mode 100644 index 0000000..197930e --- /dev/null +++ b/test/abort.spec.js @@ -0,0 +1,366 @@ +/* eslint-disable promise/prefer-await-to-then */ + +import chai from 'chai'; +import chaiAsPromised from 'chai-as-promised'; +import sinon from 'sinon'; +import sinonChai from 'sinon-chai'; + +import { + bufferedAsyncMap, +} from '../index.js'; +import { + promisableTimeout, + yieldValuesOverTime, +} from './utils.js'; + +/** + * @param {number} delayBeforeFirstYield + * @returns {AsyncIterable} + */ +async function * slowSource (delayBeforeFirstYield) { + await promisableTimeout(delayBeforeFirstYield); + yield 0; + await promisableTimeout(delayBeforeFirstYield); + yield 1; + await promisableTimeout(delayBeforeFirstYield); + yield 2; +} + +chai.use(chaiAsPromised); +chai.use(sinonChai); +const should = chai.should(); + +describe('bufferedAsyncMap() options.signal', () => { + /** @type {import('sinon').SinonFakeTimers} */ + let clock; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + sinon.restore(); + }); + + // --- Validation --- + + it('AC 4.1: throws TypeError when signal is not an AbortSignal', () => { + should.Throw(() => { + bufferedAsyncMap( + yieldValuesOverTime(1, 100), + async (item) => item, + // @ts-expect-error + { signal: 'not-a-signal' } + ); + }, TypeError, 'Expected signal to be an AbortSignal'); + }); + + it('AC 4.2: omitting signal behaves identically to commit 3 (no regression)', async () => { + /** @type {number[]} */ + const result = []; + const iterator = bufferedAsyncMap(yieldValuesOverTime(3, 100), async (item) => item); + + const flow = (async () => { + for await (const v of iterator) result.push(v); + })(); + + await clock.runAllAsync(); + await flow; + + result.should.have.members([0, 1, 2]); + }); + + // --- Pre-aborted signal --- + + it('AC 4.3 + 4.4 + 4.5: pre-aborted signal: source.next never called, first .next() rejects with reason, subsequent return done', async () => { + const reason = new Error('Pre-aborted'); + const ac = new AbortController(); + ac.abort(reason); + + const source = yieldValuesOverTime(6, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const nextSpy = sinon.spy(sourceIterator, 'next'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item, + { signal: ac.signal } + ); + + const first = iterator.next().catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + + chai.expect(await first).to.deep.equal({ rejectedWith: reason }); + nextSpy.should.not.have.been.called; + + await iterator.next().should.eventually.deep.equal({ done: true, value: undefined }); + await iterator.next().should.eventually.deep.equal({ done: true, value: undefined }); + }); + + it('AC 4.6: return() after pre-abort resolves done without throwing', async () => { + const ac = new AbortController(); + ac.abort(new Error('Pre-aborted')); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(3, 100), + async (item) => item, + { signal: ac.signal } + ); + + const ret = iterator.return(); + await clock.runAllAsync(); + await ret.should.eventually.deep.equal({ done: true, value: undefined }); + }); + + // --- Mid-iteration abort --- + + it('AC 4.7 + 4.10: parked .next() rejects with signal.reason (identity preserved)', async () => { + const reason = { custom: 'reason-object' }; + const ac = new AbortController(); + + const iterator = bufferedAsyncMap( + slowSource(1000), + async (item) => item, + { signal: ac.signal } + ); + + const parkedNext = iterator.next().catch(err => ({ rejectedWith: err })); + + // Defer the abort to fire while the .next() is parked on the slow source. + setTimeout(() => ac.abort(reason), 10); + + await clock.runAllAsync(); + + chai.expect(await parkedNext).to.deep.equal({ rejectedWith: reason }); + }); + + it('AC 4.8: fresh .next() after abort rejects with reason', async () => { + const reason = new Error('Aborted'); + const ac = new AbortController(); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item) => item, + { signal: ac.signal } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + ac.abort(reason); + + const next = iterator.next().catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + chai.expect(await next).to.deep.equal({ rejectedWith: reason }); + }); + + it('AC 4.9: exactly one .next() rejects with reason; subsequent calls return done', async () => { + const reason = new Error('Once'); + const ac = new AbortController(); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item) => item, + { signal: ac.signal } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + ac.abort(reason); + + /** @type {Array<{ rejected: boolean, value?: unknown }>} */ + const results = []; + + const sequence = (async () => { + for (let i = 0; i < 3; i += 1) { + try { + const r = await iterator.next(); + results.push({ rejected: false, value: r }); + } catch (err) { + results.push({ rejected: true, value: err }); + } + } + })(); + + await clock.runAllAsync(); + await sequence; + + chai.expect(results[0]).to.deep.equal({ rejected: true, value: reason }); + chai.expect(results[1]).to.deep.equal({ rejected: false, value: { done: true, value: undefined } }); + chai.expect(results[2]).to.deep.equal({ rejected: false, value: { done: true, value: undefined } }); + }); + + it('AC 4.11 + 4.12: source.next not called after abort; source.return called once', async () => { + const ac = new AbortController(); + + const source = yieldValuesOverTime(20, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const nextSpy = sinon.spy(sourceIterator, 'next'); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item, + { signal: ac.signal, bufferSize: 2 } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + const callsBeforeAbort = nextSpy.callCount; + + ac.abort(new Error('stop')); + + const next = iterator.next().catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + const r = await next; + should.exist(r); + + // Drain any further calls (should be none). + await iterator.next(); + await clock.runAllAsync(); + + nextSpy.callCount.should.equal(callsBeforeAbort); + returnSpy.should.have.been.calledOnce; + }); + + it('AC 4.13: in-flight callbacks observe signal.aborted=true after external abort', async () => { + const ac = new AbortController(); + /** @type {AbortSignal | undefined} */ + let captured; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item, { signal }) => { + captured = signal; + return item; + }, + { signal: ac.signal } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + chai.expect(captured?.aborted).to.equal(false); + + ac.abort(new Error('stop')); + + const n = iterator.next().catch(err => err); + await clock.runAllAsync(); + await n; + + chai.expect(captured?.aborted).to.equal(true); + }); + + // --- Close races --- + + it('AC 4.17: return() before abort makes subsequent .next() return done without throwing', async () => { + const ac = new AbortController(); + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item) => item, + { signal: ac.signal } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + const returned = iterator.return(); + await clock.runAllAsync(); + await returned; + + ac.abort(new Error('late')); + + const final = iterator.next(); + await clock.runAllAsync(); + await final.should.eventually.deep.equal({ done: true, value: undefined }); + }); + + it('AC 4.18: return() and abort fired together: cleanup runs once, no double-throw', async () => { + const ac = new AbortController(); + + // 200 items so the source is not naturally exhausted between consuming + // the first item and calling return() — that way markAsEnded actually + // has work to do on the source iterator. + const source = yieldValuesOverTime(200, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item, + { signal: ac.signal, bufferSize: 2 } + ); + + const first = iterator.next(); + // Advance only enough for the first item to land, leaving the source live. + await clock.tickAsync(0); + await first; + + const ret = iterator.return(); + ac.abort(new Error('parallel')); + + await clock.runAllAsync(); + await ret; + + returnSpy.should.have.been.calledOnce; + }); + + it('AC 4.19: throw(err) after abort delivered behaves like throw on a closed iterator', async () => { + const ac = new AbortController(); + const reason = new Error('aborted'); + const tossed = new Error('post-abort throw'); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item) => item, + { signal: ac.signal } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + ac.abort(reason); + + const aborted = iterator.next().catch(err => err); + await clock.runAllAsync(); + await aborted; + + const tossedNext = iterator.throw(tossed).catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + chai.expect(await tossedNext).to.deep.equal({ rejectedWith: tossed }); + }); + + // --- ordered:true coverage --- + + it('AC 4.15: abort works in ordered:true mode', async () => { + const ac = new AbortController(); + const reason = new Error('ordered-abort'); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(6, 100), + async (item) => item, + { signal: ac.signal, ordered: true } + ); + + const first = iterator.next(); + await clock.runAllAsync(); + await first; + + ac.abort(reason); + + const next = iterator.next().catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + chai.expect(await next).to.deep.equal({ rejectedWith: reason }); + + const after = iterator.next(); + await clock.runAllAsync(); + await after.should.eventually.deep.equal({ done: true, value: undefined }); + }); +}); From a8da4aa85bafacd0581263797c15c2cc4e50b37f Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 8 May 2026 15:21:42 +0000 Subject: [PATCH 05/10] feat: add errors: 'fail-fast' option MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds opts.errors: 'fail-eventually' | 'fail-fast' (default 'fail-eventually', preserving existing semantics). In 'fail-fast' mode the first error from the callback or the source short-circuits iteration: the next iterator.next() rejects with the original error (no AggregateError wrapping), subsequent iterator.next() calls return { done: true }, source.next() is never called again, source.return() is called once, and in-flight callbacks observe signal.aborted === true on the second-arg signal within one microtask. Implementation reuses commit 4's abort state machine: the captured error is routed through abortReason and internalAC.abort(err), so the "reject once, then done forever" contract is identical to external abort. Precedence rules (also tested): - fail-fast + external abort fired before any error → external reason wins. - fail-fast + callback error before any external abort → fail-fast wins. - fail-eventually + external abort fired with errors queued → external reason wins; AggregateError discarded. The default flip to 'fail-fast' and the proposed 'isolate' envelope mode are deferred to a future major release. --- index.js | 24 ++- test/errors-fail-fast.spec.js | 360 ++++++++++++++++++++++++++++++++++ 2 files changed, 382 insertions(+), 2 deletions(-) create mode 100644 test/errors-fail-fast.spec.js diff --git a/index.js b/index.js index b161af1..28e75f3 100644 --- a/index.js +++ b/index.js @@ -34,13 +34,14 @@ export async function * mergeIterables (input, { bufferSize } = {}) { * @template R * @param {AsyncIterable | Iterable | T[]} input * @param {(item: T, opts: { signal: AbortSignal }) => (Promise|AsyncIterable)} callback - * @param {{ bufferSize?: number|undefined, ordered?: boolean|undefined, signal?: AbortSignal|undefined }} [options] + * @param {{ bufferSize?: number|undefined, ordered?: boolean|undefined, signal?: AbortSignal|undefined, errors?: 'fail-eventually'|'fail-fast'|undefined }} [options] * @returns {AsyncIterableIterator & { return: NonNullable["return"]>, throw: NonNullable["throw"]>, [Symbol.asyncDispose]: () => Promise }} */ export function bufferedAsyncMap (input, callback, options) { /** @typedef {Promise> & { bufferPromise: BufferPromise, fromSubIterator?: boolean, isSubIterator?: boolean, err?: unknown }>} BufferPromise */ const { bufferSize = 6, + errors: errorsMode = 'fail-eventually', ordered = false, signal: externalSignal, } = options || {}; @@ -55,6 +56,7 @@ export function bufferedAsyncMap (input, callback, options) { if (typeof callback !== 'function') throw new TypeError('Expected callback to be a function'); if (typeof bufferSize !== 'number') throw new TypeError('Expected bufferSize to be a number'); if (externalSignal !== undefined && !(externalSignal instanceof AbortSignal)) throw new TypeError('Expected signal to be an AbortSignal'); + if (errorsMode !== 'fail-eventually' && errorsMode !== 'fail-fast') throw new TypeError("Expected errors to be 'fail-eventually' or 'fail-fast'"); /** @type {AsyncIterator} */ const asyncIterator = asyncIterable[Symbol.asyncIterator](); @@ -319,7 +321,25 @@ export function bufferedAsyncMap (input, callback, options) { return { done: true, value: undefined }; } else if (err || done) { if (err) { - capturedErrors.push(err instanceof Error ? err : new Error('Unknown error')); + const normalisedErr = err instanceof Error ? err : new Error('Unknown error'); + + // In fail-fast mode the first captured error short-circuits iteration: + // route it through the same abort machinery so the next .next() rejects + // with the original error and in-flight callbacks see signal.aborted=true. + if (errorsMode === 'fail-fast' && !abortReason) { + abortReason = { reason: normalisedErr, delivered: false }; + if (!internalAC.signal.aborted) { + internalAC.abort(normalisedErr); + } + await markAsEnded(); + if (abortReason && !abortReason.delivered) { + abortReason.delivered = true; + throw normalisedErr; + } + return { done: true, value: undefined }; + } + + capturedErrors.push(normalisedErr); } if (fromSubIterator || subIterators.length > 0) { diff --git a/test/errors-fail-fast.spec.js b/test/errors-fail-fast.spec.js new file mode 100644 index 0000000..c9ac6ec --- /dev/null +++ b/test/errors-fail-fast.spec.js @@ -0,0 +1,360 @@ +import chai from 'chai'; +import chaiAsPromised from 'chai-as-promised'; +import sinon from 'sinon'; +import sinonChai from 'sinon-chai'; + +import { + bufferedAsyncMap, +} from '../index.js'; +import { + promisableTimeout, + yieldValuesOverTime, +} from './utils.js'; + +chai.use(chaiAsPromised); +chai.use(sinonChai); +const should = chai.should(); + +/** + * @template T + * @param {T[]} items + * @returns {AsyncIterable} + */ +async function * fromArray (items) { + for (const item of items) { + yield item; + } +} + +/** + * @param {Error} expected + * @returns {AsyncIterable} + */ +async function * sourceThatThrows (expected) { + yield 0; + yield 1; + throw expected; +} + +describe('bufferedAsyncMap() errors: fail-fast', () => { + /** @type {import('sinon').SinonFakeTimers} */ + let clock; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + sinon.restore(); + }); + + // --- Validation & default --- + + it("AC 5.1: throws TypeError when errors is not 'fail-eventually'/'fail-fast'", () => { + should.Throw(() => { + bufferedAsyncMap( + fromArray([0, 1]), + async (item) => item, + // @ts-expect-error + { errors: 'isolate' } + ); + }, TypeError, "Expected errors to be 'fail-eventually' or 'fail-fast'"); + }); + + it("AC 5.2: omitting errors option (or 'fail-eventually') keeps current AggregateError behaviour", async () => { + const errA = new Error('A'); + const errB = new Error('B'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(0).rejects(errA) + .onCall(1).rejects(errB); + + /** @type {Error | undefined} */ + let caught; + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of bufferedAsyncMap(fromArray([0, 1, 2]), callback, { bufferSize: 3 })) { + // drain + } + } catch (err) { + caught = /** @type {Error} */ (err); + } + + chai.expect(caught).to.be.instanceOf(AggregateError); + }); + + // --- fail-fast semantics --- + + it('AC 5.3: first callback error rejects next .next() with that error; subsequent calls return done', async () => { + const reason = new Error('cb-error'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(1).rejects(reason); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(20, 100), + callback, + { errors: 'fail-fast', bufferSize: 2 } + ); + + /** @type {Array<{ rejected: boolean, value?: unknown }>} */ + const results = []; + + const flow = (async () => { + for (let i = 0; i < 5; i += 1) { + try { + const r = await iterator.next(); + results.push({ rejected: false, value: r }); + } catch (err) { + results.push({ rejected: true, value: err }); + } + } + })(); + + await clock.runAllAsync(); + await flow; + + const firstReject = results.findIndex(r => r.rejected); + chai.expect(firstReject).to.be.greaterThan(-1); + chai.expect(results[firstReject]?.value).to.equal(reason); + for (const r of results.slice(firstReject + 1)) { + chai.expect(r).to.deep.equal({ rejected: false, value: { done: true, value: undefined } }); + } + }); + + it('AC 5.4: source error fails fast', async () => { + const sourceError = new Error('src-error'); + + const iterator = bufferedAsyncMap( + sourceThatThrows(sourceError), + async (item) => item, + { errors: 'fail-fast' } + ); + + /** @type {unknown} */ + let caught; + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of iterator) { + // drain + } + } catch (err) { + caught = err; + } + + await clock.runAllAsync(); + chai.expect(caught).to.equal(sourceError); + }); + + it('AC 5.5 + 5.6: source.next not called after first error; source.return called once', async () => { + const reason = new Error('halt'); + const source = yieldValuesOverTime(50, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const nextSpy = sinon.spy(sourceIterator, 'next'); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const callback = sinon.stub() + .returnsArg(0) + .onCall(0).rejects(reason); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + callback, + { errors: 'fail-fast', bufferSize: 1 } + ); + + /** @type {unknown} */ + let caught; + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of iterator) { + // drain + } + } catch (err) { + caught = err; + } + + await clock.runAllAsync(); + chai.expect(caught).to.equal(reason); + + const callsAfterFail = nextSpy.callCount; + + await iterator.next(); + await clock.runAllAsync(); + + nextSpy.callCount.should.equal(callsAfterFail); + returnSpy.should.have.been.calledOnce; + }); + + it('AC 5.7: in-flight callbacks observe signal.aborted=true after fail-fast error', async () => { + const reason = new Error('halt'); + /** @type {AbortSignal | undefined} */ + let captured; + + let callIdx = 0; + const callback = async ( + /** @type {number} */ item, + /** @type {{ signal: AbortSignal }} */ { signal } + ) => { + const i = callIdx++; + captured = signal; + if (i === 1) { + // Slow second callback so the first failure has time to trigger fail-fast + // before this one settles. + await promisableTimeout(500); + throw reason; + } + if (i === 2) { + await promisableTimeout(2000); + return item; + } + return item; + }; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(20, 50), + callback, + { errors: 'fail-fast', bufferSize: 4 } + ); + + /** @type {unknown} */ + let caught; + const flow = (async () => { + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of iterator) { + // drain + } + } catch (err) { + caught = err; + } + })(); + + await clock.runAllAsync(); + await flow; + chai.expect(caught).to.equal(reason); + chai.expect(captured?.aborted).to.equal(true); + }); + + it('AC 5.8: rejected error is the original (not AggregateError)', async () => { + const reason = new Error('original'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(0).rejects(reason); + + /** @type {unknown} */ + let caught; + try { + // eslint-disable-next-line no-unused-vars + for await (const _ of bufferedAsyncMap(fromArray([0, 1, 2]), callback, { errors: 'fail-fast' })) { + // drain + } + } catch (err) { + caught = err; + } + + await clock.runAllAsync(); + chai.expect(caught).to.equal(reason); + chai.expect(caught).to.not.be.instanceOf(AggregateError); + }); + + it('AC 5.9: fail-fast works in ordered:true mode', async () => { + const reason = new Error('ordered-fail'); + const callback = sinon.stub() + .returnsArg(0) + .onCall(1).rejects(reason); + + /** @type {unknown} */ + let caught; + const flow = (async () => { + try { + for await (const v of bufferedAsyncMap( + yieldValuesOverTime(10, 100), + callback, + { errors: 'fail-fast', ordered: true, bufferSize: 3 } + )) { + // drain — should yield 0 then fail on 1 + chai.expect(v).to.equal(0); + } + } catch (err) { + caught = err; + } + })(); + + await clock.runAllAsync(); + await flow; + chai.expect(caught).to.equal(reason); + }); + + // --- Interaction with abort --- + + it('AC 5.11: external abort wins over a fail-fast error not yet captured', async () => { + const externalReason = new Error('external'); + const ac = new AbortController(); + + let callIdx = 0; + const callback = async (/** @type {number} */ item) => { + const i = callIdx++; + if (i === 0) return item; + await promisableTimeout(2000); + throw new Error('would-have-failed'); + }; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(10, 50), + callback, + { signal: ac.signal, errors: 'fail-fast', bufferSize: 2 } + ); + + const first = iterator.next(); + await clock.tickAsync(60); + await first; + + ac.abort(externalReason); + + /** @type {unknown} */ + let caught; + try { + const n = iterator.next(); + await clock.runAllAsync(); + await n; + } catch (err) { + caught = err; + } + + chai.expect(caught).to.equal(externalReason); + }); + + it('AC 5.13: external abort wins over queued errors in fail-eventually mode', async () => { + const externalReason = new Error('external'); + const ac = new AbortController(); + + const callback = sinon.stub() + .returnsArg(0) + .onCall(0).rejects(new Error('cb-error-A')) + .onCall(1).rejects(new Error('cb-error-B')); + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(10, 100), + callback, + { signal: ac.signal, bufferSize: 3 } + ); + + // Let some items load and fail in the background; the errors get queued. + await clock.tickAsync(50); + ac.abort(externalReason); + + /** @type {unknown} */ + let caught; + try { + const n = iterator.next(); + await clock.runAllAsync(); + await n; + } catch (err) { + caught = err; + } + + chai.expect(caught).to.equal(externalReason); + chai.expect(caught).to.not.be.instanceOf(AggregateError); + }); +}); From cb565ebd56fb10e3c7b4521d6392bb2c00f2127c Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 8 May 2026 15:22:35 +0000 Subject: [PATCH 06/10] test: un-skip and align iterator.throw() spec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Removes the long-standing describe.skip and rewrites the spec on top of sinon useFakeTimers (matching return.spec.js), asserting that: - iterator.throw(err) rejects with err and the next iterator.next() returns { done: true, value: undefined }. - The source iterator's .return() is called exactly once via the shared markAsEnded() cleanup path. - In-flight callbacks observe signal.aborted === true on the second-arg signal within one microtask of throw() — confirming the throw path reuses the same abort propagation as return()/dispose/external abort. No production-code change. --- test/throw.spec.js | 99 +++++++++++++++++++++++++++++----------------- 1 file changed, 63 insertions(+), 36 deletions(-) diff --git a/test/throw.spec.js b/test/throw.spec.js index 52fc2e8..5d00544 100644 --- a/test/throw.spec.js +++ b/test/throw.spec.js @@ -1,3 +1,5 @@ +/* eslint-disable promise/prefer-await-to-then */ + import chai from 'chai'; import chaiAsPromised from 'chai-as-promised'; import chaiQuantifiers from 'chai-quantifiers'; @@ -17,60 +19,85 @@ chai.use(sinonChai); chai.should(); -describe.skip('bufferedAsyncMap() AsyncInterface throw()', () => { +describe('bufferedAsyncMap() AsyncInterface throw()', () => { const count = 6; + /** @type {import('sinon').SinonFakeTimers} */ + let clock; /** @type {AsyncIterable} */ let baseAsyncIterable; - /** @type {number[]} */ - let expectedResult; beforeEach(() => { + clock = sinon.useFakeTimers(); baseAsyncIterable = yieldValuesOverTime(count, (i) => i % 2 === 1 ? 2000 : 100); + }); - expectedResult = []; - for (let i = 0; i < count; i++) { - expectedResult.push(i); - } + afterEach(() => { + sinon.restore(); }); - it('should end the iterator when called', async () => { - const errorToThrow = new Error('Yet another error'); + it('AC 6.1 + 6.2: throw(err) rejects with err and subsequent .next() returns done', async () => { + const errorToThrow = new Error('thrown'); const iterator = bufferedAsyncMap(baseAsyncIterable, async (item) => item); - await iterator.next().should.eventually.deep.equal({ value: 0 }); - await iterator.throw(errorToThrow).should.eventually.be.rejectedWith(errorToThrow); - await iterator.next().should.eventually.deep.equal({ done: true, value: undefined }); + const first = iterator.next(); + await clock.runAllAsync(); + await first.should.eventually.deep.equal({ value: 0 }); + + const tossed = iterator.throw(errorToThrow).catch(err => ({ rejectedWith: err })); + await clock.runAllAsync(); + chai.expect(await tossed).to.deep.equal({ rejectedWith: errorToThrow }); + + const after = iterator.next(); + await clock.runAllAsync(); + await after.should.eventually.deep.equal({ done: true, value: undefined }); }); - it('should be called when a loop throws', async () => { - const iterator = bufferedAsyncMap(baseAsyncIterable, async (item) => item); - const returnSpy = sinon.spy(iterator, 'return'); - const throwSpy = sinon.spy(iterator, 'throw'); - const errorToThrow = new Error('Yet another error'); - - let caught; - - // Inspired by https://github.com/WebKit/WebKit/blob/1a09d8d95ba6085df4ef44306c4bfc9fc86fdbc7/JSTests/test262/test/language/expressions/yield/star-rhs-iter-thrw-thrw-get-err.js - async function * g () { - try { - yield * iterator; - } catch (err) { - caught = err; - throw err; + it('AC 6.2: throw(err) calls source.return() once', async () => { + const errorToThrow = new Error('thrown'); + const source = yieldValuesOverTime(50, 100); + const sourceIterator = source[Symbol.asyncIterator](); + const returnSpy = sinon.spy(sourceIterator, 'return'); + + const iterator = bufferedAsyncMap( + { [Symbol.asyncIterator]: () => sourceIterator }, + async (item) => item + ); + + const first = iterator.next(); + await clock.tickAsync(0); + await first; + + const tossed = iterator.throw(errorToThrow).catch(err => err); + await clock.runAllAsync(); + await tossed; + + returnSpy.should.have.been.calledOnce; + }); + + it('AC 6.2: in-flight callbacks observe signal.aborted=true after throw()', async () => { + const errorToThrow = new Error('thrown'); + /** @type {AbortSignal | undefined} */ + let captured; + + const iterator = bufferedAsyncMap( + yieldValuesOverTime(50, 100), + async (item, { signal }) => { + captured = signal; + return item; } - } + ); - const wrappedIterator = g(); + const first = iterator.next(); + await clock.tickAsync(0); + await first; + chai.expect(captured?.aborted).to.equal(false); - await wrappedIterator.next().should.eventually.deep.equal({ done: false, value: 0 }); - await wrappedIterator.throw(errorToThrow).should.eventually.be.rejectedWith(errorToThrow); - await wrappedIterator.next().should.eventually.deep.equal({ done: true, value: undefined }); - await iterator.next().should.eventually.deep.equal({ done: true, value: undefined }); + const tossed = iterator.throw(errorToThrow).catch(err => err); + await clock.runAllAsync(); + await tossed; - (caught || {}).should.equal(errorToThrow); - throwSpy.should.have.been.calledOnceWithExactly(errorToThrow); - returnSpy.should.not.have.been.called; + chai.expect(captured?.aborted).to.equal(true); }); }); From 26b32b3600d7c5430016c75210e2b29859d8c082 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 8 May 2026 15:23:22 +0000 Subject: [PATCH 07/10] docs: cancellation, error contract, and async resource management MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Documents the three new public surfaces: - options.signal: AbortSignal — with a runnable AbortController + setTimeout example, an explicit "cancels consumption, not in-flight work" caveat, and guidance to forward the per-callback signal into fetch/undici. - options.errors: 'fail-eventually' | 'fail-fast' — explains the AggregateError shape of the default mode, the Promise.all-style semantics of fail-fast, and the precedence rule that external abort wins over queued/captured errors. - Symbol.asyncDispose — covers `await using` usage, idempotency, and the Node 22+ requirement. Updates the bufferedAsyncMap signature/options sections to surface the new fields and the widened (item, { signal }) callback shape. --- README.md | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 81 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index b9d29e7..4539667 100644 --- a/README.md +++ b/README.md @@ -75,17 +75,95 @@ Iterates and applies the `callback` to up to `bufferSize` items from `input` yie #### Syntax -`bufferedAsyncMap(input, callback[, { bufferSize=6, ordered=false }]) => AsyncIterableIterator` +`bufferedAsyncMap(input, callback[, { bufferSize=6, ordered=false, signal, errors='fail-eventually' }]) => AsyncIterableIterator` #### Arguments * `input` – either an async iterable, an ordinare iterable or an array -* `callback(item)` – should be either an async generator or an ordinary async function. Items from async generators are buffered in the main buffer and the buffer is refilled by the one that has least items in the current buffer (`input` is considered equal to sub iterators in this regard when refilling the buffer) +* `callback(item, { signal })` – should be either an async generator or an ordinary async function. Items from async generators are buffered in the main buffer and the buffer is refilled by the one that has least items in the current buffer (`input` is considered equal to sub iterators in this regard when refilling the buffer). The second argument is an `{ signal: AbortSignal }` that aborts on cancellation — see [Cancellation](#cancellation). #### Options * `bufferSize` – _optional_ – defaults to `6`, sets the max amount of simultanoeus items that processed at once in the buffer. -* `ordered` – _optional_ – defaults to `false`, when `true` the result will be returned in order instead of unordered +* `ordered` – _optional_ – defaults to `false`, when `true` the result will be returned in order instead of unordered. +* `signal` – _optional_ – an `AbortSignal`. When aborted, iteration stops pulling from the source, the next pending or freshly-called `iterator.next()` rejects with `signal.reason` exactly once, and all subsequent calls return `{ done: true, value: undefined }`. See [Cancellation](#cancellation). +* `errors` – _optional_ – defaults to `'fail-eventually'`. Controls how errors from the callback or the source surface to the consumer. See [Errors](#errors). + +The returned iterator also implements `Symbol.asyncDispose`, so it can be used with `await using` for deterministic cleanup. See [Resource management](#resource-management). + +## Cancellation + +Pass an `AbortSignal` and abort it whenever you want to stop iteration: + +```javascript +import { bufferedAsyncMap } from 'buffered-async-iterable'; + +const ac = new AbortController(); +setTimeout(() => ac.abort(new Error('took too long')), 5000); + +try { + for await (const item of bufferedAsyncMap(source, async (item) => { + return await fetchItem(item); + }, { signal: ac.signal })) { + console.log(item); + } +} catch (err) { + // err === ac.signal.reason +} +``` + +Aborting cancels *consumption* of the source. In-flight callbacks continue running until they settle. To cancel network/IO inside your callback, forward the per-callback `signal` (the second argument) into `fetch`/`undici`/etc: + +```javascript +bufferedAsyncMap(source, async (item, { signal }) => { + const res = await fetch(`/items/${item}`, { signal }); + return res.json(); +}, { signal: ac.signal }); +``` + +The per-callback `signal` is always present (even when no `options.signal` is passed) and aborts on iterator close (return / throw / dispose / source-exhaustion-with-cleanup), so callbacks can fast-path on shutdown. + +If `options.signal` is already aborted at construction time, the source is never read and the first `iterator.next()` rejects with `signal.reason`. External abort always wins over queued errors. + +## Errors + +There are two error modes: + +### `'fail-eventually'` (default) + +Iteration continues after errors. Captured errors are thrown when the iterator drains: + +* If exactly one error was captured, it is thrown directly (identity preserved). +* If two or more errors were captured, they are wrapped in an [`AggregateError`](https://developer.mozilla.org/docs/Web/JavaScript/Reference/Global_Objects/AggregateError) (in capture order). + +In-flight callbacks may still complete in the background after an error is captured. Wrap your callback in `try/catch` if you need per-item isolation. + +### `'fail-fast'` + +Mirrors `Promise.all` semantics: the first error from the callback or the source short-circuits iteration. The next `iterator.next()` rejects with the original error (no `AggregateError` wrapping); subsequent calls return `{ done: true }`. The source's `.next()` is not called again, the source's `.return()` is called once, and in-flight callbacks observe `signal.aborted === true` on their per-callback signal within one microtask. + +```javascript +for await (const item of bufferedAsyncMap(source, fn, { errors: 'fail-fast' })) { + // first thrown error halts iteration immediately +} +``` + +External abort always takes precedence over either error mode: if `options.signal` aborts while errors are queued, the consumer sees `signal.reason`, not the captured errors. + +## Resource management + +The returned iterator implements `Symbol.asyncDispose`, so it can be used with [`await using`](https://github.com/tc39/proposal-explicit-resource-management) for deterministic cleanup: + +```javascript +{ + await using iterator = bufferedAsyncMap(source, fn); + for await (const item of iterator) { + if (shouldStop(item)) break; + } +} // source.return() runs here, regardless of how the block exited +``` + +`Symbol.asyncDispose` is aliased to `iterator.return()` and is idempotent. Native `await using` requires Node 22+ (or a transpiler). ### mergeIterables() From eecb036ec553e4679f0229285aef6ac148d4e225 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 8 May 2026 15:24:01 +0000 Subject: [PATCH 08/10] chore: remove obsolete AbortController TODOs Both TODO comments (one calling out hwp's AbortController pattern as inspiration, one wondering if an AbortController could improve markAsEnded cleanup) are now resolved by the per-callback signal and external-signal commits. --- index.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/index.js b/index.js index 28e75f3..a27a2a5 100644 --- a/index.js +++ b/index.js @@ -1,6 +1,5 @@ /* eslint-disable promise/prefer-await-to-then */ -// TODO: Get inspired by Matteos https://github.com/mcollina/hwp/blob/main/index.js, eg AbortController is nice? // TODO: Check docs here https://tc39.es/ecma262/#sec-operations-on-iterator-objects // TODO: Look into https://tc39.es/ecma262/#sec-iteratorclose / https://tc39.es/ecma262/#sec-asynciteratorclose // TODO: See "iteratorKind" in https://tc39.es/ecma262/#sec-runtime-semantics-forin-div-ofbodyevaluation-lhs-stmt-iterator-lhskind-labelset – see how it loops and validates the returned values @@ -125,7 +124,6 @@ export function bufferedAsyncMap (input, callback, options) { .map(item => item.return && item.return()) ); - // TODO: Could we use an AbortController to improve this? See eg. https://github.com/mcollina/hwp/pull/10 bufferedPromises.splice(0); subIterators.splice(0); From dd3ccdeb3299c26c7bbde7192feebb4d0a8d0901 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 8 May 2026 15:26:59 +0000 Subject: [PATCH 09/10] chore: drop redundant @types/chai-quantifiers devDep chai-quantifiers ships its own type declarations (its package.json sets "types": "src/index.d.ts"), so the separate @types/chai-quantifiers package is unused. knip has been flagging this; removing it lets the pre-push check chain pass cleanly. --- package.json | 1 - 1 file changed, 1 deletion(-) diff --git a/package.json b/package.json index 0836763..a260222 100644 --- a/package.json +++ b/package.json @@ -49,7 +49,6 @@ "devDependencies": { "@types/chai": "^4.3.19", "@types/chai-as-promised": "^7.1.8", - "@types/chai-quantifiers": "^1.0.4", "@types/mocha": "^10.0.8", "@types/node": "^22.0.0", "@types/sinon": "^17.0.3", From 11fbb414eb4981539545f7fcd476ca91e25c978c Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 8 May 2026 18:36:54 +0000 Subject: [PATCH 10/10] docs: add CLAUDE.md with build commands and architecture notes Captures the JSDoc-as-source convention, the npm test pre-push gate, the bufferedAsyncMap state machine (internalAC, abortReason, capturedErrors, fillQueue/nextValue split, markAsEnded as single cleanup path), the public-API contracts worth preserving, and the IIFE + clock.runAllAsync test pattern future contributors need to avoid fake-timer deadlocks. --- CLAUDE.md | 74 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..1eaf5ee --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,74 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Build, test, lint + +The project is pure ESM JavaScript with **types written in JSDoc** (no TypeScript source files). `.d.ts` declarations are emitted from JSDoc by `tsc -p declaration.tsconfig.json` at publish time only — do not commit them. + +- `npm test` — full check chain (lint, tsc, knip, type-coverage, installed-check) followed by mocha + c8 coverage. This is what the pre-push husky hook runs; if it fails, fix the cause rather than `--no-verify`. +- `npm run check` — only the static checks (lint + tsc + knip + type-coverage + installed-check), no tests. +- `npx mocha test/.spec.js` — run a single spec file. +- `npx mocha test/.spec.js -g ""` — filter to specific `it()` blocks within a file. +- `npm run build` — clean and emit `.d.ts` declarations. + +Type-coverage is enforced at **≥95% strict** (excluding `test/*.spec.js`). Lint is `@voxpelli/eslint-config` (neostandard). Knip's "unused devDependency" findings are treated as errors by `npm test`. + +Commits must follow Conventional Commits (validated by the `commit-msg` husky hook via `validate-conventional-commit`); `release-please` cuts releases automatically from `main`, so `feat:` bumps minor and `fix:` bumps patch. + +Engines: Node ≥22.0.0 (the well-known `Symbol.asyncDispose` is required natively). The CI matrix in `.github/workflows/nodejs.yml` should match. + +## Architecture + +The library is one core function (`bufferedAsyncMap`) plus a thin wrapper (`mergeIterables`). Everything lives in `index.js`; `lib/` contains three small helpers worth knowing about. + +### `bufferedAsyncMap(input, callback, options)` — the state machine + +The function returns a stateful `AsyncIterableIterator` with these closure variables forming the state machine: + +- **`bufferedPromises[]`** — in-flight promises (size capped at `bufferSize`). Each is the `callback(item, {signal})` result wrapped to never reject (errors are caught into `{err}` envelopes). +- **`subIterators[]`** — stack of nested iterators spawned when `callback` returns an `AsyncIterable` (async-generator callbacks). +- **`promisesToSourceIteratorMap`** — WeakMap tracking which iterator produced each buffer slot; consulted by `findLeastTargeted` (`lib/find-least-targeted.js`) for load-balancing. +- **`internalAC`** — an `AbortController` minted per call. Its signal is **always** the second arg to `callback`, regardless of whether the consumer passed `options.signal`. It fires from `markAsEnded()` on iterator close, from `options.signal` aborting (linked via `addEventListener('abort', …)`), and from the first error in `errors: 'fail-fast'` mode. This is what lets in-flight callbacks fast-path on shutdown. +- **`abortReason: { reason, delivered: boolean } | undefined`** — drives the "reject the next `.next()` once with `signal.reason`, then `done:true` forever" contract. Set by external abort, pre-aborted signal, or first fail-fast error. +- **`capturedErrors[]`** — accumulates errors in `'fail-eventually'` mode; on drain, throws the single error directly (identity-preserved) or wraps in `AggregateError` for ≥2. +- **`isDone`** — set once by `markAsEnded()` to make all close paths idempotent. + +### Two pull/dispatch loops + +`fillQueue()` is the **producer**: pulls from source up to `bufferSize`, dispatches via `callback(item, {signal})`, pushes the wrapped promise into `bufferedPromises`. In `ordered: true` mode it always feeds from `subIterators[0]`; in `ordered: false` it picks the least-targeted iterator via `findLeastTargeted` to prevent starvation. + +`nextValue()` is the **consumer**: races `bufferedPromises[0]` (ordered) or `Promise.race(bufferedPromises)` (unordered) against an abort sentinel from `internalAC.signal`. Abort always wins over a buffered value resolving in the same tick — the post-race code re-checks `abortReason` regardless of which promise won the race. + +`markAsEnded()` is the **single cleanup path**: sets `isDone`, fires `internalAC.abort()`, calls `Promise.allSettled(...iterators.map(it => it.return()))`, clears buffers. Called from `return()`, `throw()`, `Symbol.asyncDispose`, source-exhaustion, and abort delivery. Idempotent via the `isDone` guard. + +### Iterator chaining via `currentStep` + +`next()` chains each call's promise via `currentStep.then(nextValue, nextValue)` (both fulfilled and rejected handlers are `nextValue`) so that one rejection doesn't poison every subsequent call — the next call still re-enters `nextValue`, which then observes the post-rejection state machine (most often returning `{done:true}`). + +### Lib helpers (reuse these, don't reimplement) + +- `lib/find-least-targeted.js` — load-balancing: given a list of iterators and the current buffer, picks the iterator with fewest in-flight slots. +- `lib/misc.js` — `makeIterableAsync(input)` (sync iterable → async iterable) and `arrayDeleteInPlace(list, value)` (in-place splice by value). +- `lib/type-checks.js` — `isAsyncIterable`, `isIterable`, `isPartOfArray` guards. + +### Public-API contracts worth preserving + +- Callback receives `(item, { signal })` where `signal` is **always present** (the internal one) even when no `options.signal` is provided. +- Aborts cancel **consumption**, not in-flight callback work. Promises cannot be cancelled — the library propagates the signal so user code can voluntarily exit; it does not race-and-discard. The README documents this explicitly. +- `errors: 'fail-eventually'` (default) keeps the historical "drain then throw" semantics; `'fail-fast'` mirrors `Promise.all`. External abort always wins over queued/captured errors. +- Existing one-arg callbacks (`async (item) => …`) keep working — JS ignores extra args, so the second-arg widening is non-breaking. + +## Test conventions + +Mocha + chai + sinon. Tests use `sinon.useFakeTimers()` plus `clock.runAllAsync()` / `clock.tickAsync(ms)` for deterministic timing. The standard pattern for an async flow that needs the clock to advance is: + +```js +const flow = (async () => { for await (...) { ... } })(); +await clock.runAllAsync(); +await flow; +``` + +Inline `for await` blocks **without** the IIFE wrapper will deadlock under fake timers when the source uses real `setTimeout`. Test helpers in `test/utils.js` (`yieldValuesOverTime`, `nestedYieldValuesOverTime`, `promisableTimeout`) are the source of truth — reuse them. + +For testing rejections, prefer the `.catch(err => ({ rejectedWith: err }))` envelope pattern (used across `test/abort.spec.js` and `test/errors-fail-fast.spec.js`) over chai-as-promised's `should.be.rejectedWith` when asserting identity-equality on non-Error reasons.