74 changes: 74 additions & 0 deletions CLAUDE.md
@@ -0,0 +1,74 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Build, test, lint

The project is pure ESM JavaScript with **types written in JSDoc** (no TypeScript source files). `.d.ts` declarations are emitted from JSDoc by `tsc -p declaration.tsconfig.json` at publish time only — do not commit them.

- `npm test` — full check chain (lint, tsc, knip, type-coverage, installed-check) followed by mocha + c8 coverage. This is what the pre-push husky hook runs; if it fails, fix the cause rather than `--no-verify`.
- `npm run check` — only the static checks (lint + tsc + knip + type-coverage + installed-check), no tests.
- `npx mocha test/<name>.spec.js` — run a single spec file.
- `npx mocha test/<name>.spec.js -g "<pattern>"` — filter to specific `it()` blocks within a file.
- `npm run build` — clean and emit `.d.ts` declarations.

Type-coverage is enforced at **≥95% strict** (excluding `test/*.spec.js`). Lint is `@voxpelli/eslint-config` (neostandard). Knip's "unused devDependency" findings are treated as errors by `npm test`.

Commits must follow Conventional Commits (validated by the `commit-msg` husky hook via `validate-conventional-commit`); `release-please` cuts releases automatically from `main`, so `feat:` bumps minor and `fix:` bumps patch.

Engines: Node ≥22.0.0 (the well-known `Symbol.asyncDispose` is required natively). The CI matrix in `.github/workflows/nodejs.yml` should match.

## Architecture

The library is one core function (`bufferedAsyncMap`) plus a thin wrapper (`mergeIterables`). Both live in `index.js`; `lib/` contains three small helper modules worth knowing about.

### `bufferedAsyncMap(input, callback, options)` — the state machine

The function returns a stateful `AsyncIterableIterator` with these closure variables forming the state machine:

- **`bufferedPromises[]`** — in-flight promises (size capped at `bufferSize`). Each is the `callback(item, {signal})` result wrapped to never reject (errors are caught into `{err}` envelopes).
- **`subIterators[]`** — stack of nested iterators spawned when `callback` returns an `AsyncIterable<R>` (async-generator callbacks).
- **`promisesToSourceIteratorMap`** — WeakMap tracking which iterator produced each buffer slot; consulted by `findLeastTargeted` (`lib/find-least-targeted.js`) for load-balancing.
- **`internalAC`** — an `AbortController` minted per call. Its signal is **always** the second arg to `callback`, regardless of whether the consumer passed `options.signal`. It fires from `markAsEnded()` on iterator close, from `options.signal` aborting (linked via `addEventListener('abort', …)`), and from the first error in `errors: 'fail-fast'` mode. This is what lets in-flight callbacks fast-path on shutdown.
- **`abortReason: { reason, delivered: boolean } | undefined`** — drives the "reject the next `.next()` once with `signal.reason`, then `done:true` forever" contract. Set by external abort, pre-aborted signal, or first fail-fast error.
- **`capturedErrors[]`** — accumulates errors in `'fail-eventually'` mode; on drain, throws the single error directly (identity-preserved) or wraps in `AggregateError` for ≥2.
- **`isDone`** — set once by `markAsEnded()` to make all close paths idempotent.
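
As an illustration of how `internalAC` can be linked to `options.signal`, here is a minimal sketch. `createLinkedController` is a hypothetical name and not the library's code; it only shows the `addEventListener('abort', …)` linking pattern, including the pre-aborted case.

```js
// Hypothetical helper: illustrates the linking pattern only.
function createLinkedController (externalSignal) {
  const internal = new AbortController();

  if (externalSignal) {
    if (externalSignal.aborted) {
      // Pre-aborted signal: propagate the reason immediately.
      internal.abort(externalSignal.reason);
    } else {
      // Forward a later abort, together with its reason, to the internal controller.
      externalSignal.addEventListener(
        'abort',
        () => internal.abort(externalSignal.reason),
        { once: true }
      );
    }
  }

  return internal;
}

const external = new AbortController();
const linked = createLinkedController(external.signal);
const standalone = createLinkedController(undefined);

external.abort(new Error('stop'));
```

After the external abort, `linked.signal` is aborted with the same reason object, while `standalone.signal` stays usable because no external signal was supplied.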

### Two pull/dispatch loops

`fillQueue()` is the **producer**: pulls from source up to `bufferSize`, dispatches via `callback(item, {signal})`, pushes the wrapped promise into `bufferedPromises`. In `ordered: true` mode it always feeds from `subIterators[0]`; in `ordered: false` it picks the least-targeted iterator via `findLeastTargeted` to prevent starvation.

`nextValue()` is the **consumer**: races `bufferedPromises[0]` (ordered) or `Promise.race(bufferedPromises)` (unordered) against an abort sentinel from `internalAC.signal`. Abort always wins over a buffered value resolving in the same tick — the post-race code re-checks `abortReason` regardless of which promise won the race.
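
The "abort wins even when the value resolved in the same tick" rule can be sketched as follows. `ABORTED`, `abortSentinel`, and `raceWithAbort` are hypothetical names chosen for this example, and the sketch assumes a single buffered promise rather than the full buffer.

```js
// Hypothetical sketch of racing a buffered value against an abort sentinel.
const ABORTED = Symbol('aborted');

function abortSentinel (signal) {
  return new Promise((resolve) => {
    if (signal.aborted) return resolve(ABORTED);
    signal.addEventListener('abort', () => resolve(ABORTED), { once: true });
  });
}

async function raceWithAbort (valuePromise, signal) {
  const winner = await Promise.race([valuePromise, abortSentinel(signal)]);

  // Re-check after the race: an abort takes precedence even if the value won.
  if (winner === ABORTED || signal.aborted) {
    throw signal.reason;
  }

  return winner;
}

async function demoRace () {
  const ac = new AbortController();
  const value = Promise.resolve('ready'); // already settled, so it wins the race
  ac.abort(new Error('cancelled'));

  return raceWithAbort(value, ac.signal).catch((err) => err.message);
}
```

In `demoRace` the value promise is listed first and is already settled, so it wins `Promise.race`; the post-race check is what turns the result into a rejection with the abort reason.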

`markAsEnded()` is the **single cleanup path**: sets `isDone`, fires `internalAC.abort()`, calls `Promise.allSettled(iterators.map(it => it.return()))` (an array, since `Promise.allSettled` takes one iterable argument), and clears the buffers. Called from `return()`, `throw()`, `Symbol.asyncDispose`, source-exhaustion, and abort delivery. Idempotent via the `isDone` guard.

### Iterator chaining via `currentStep`

`next()` chains each call's promise via `currentStep.then(nextValue, nextValue)` (both fulfilled and rejected handlers are `nextValue`) so that one rejection doesn't poison every subsequent call — the next call still re-enters `nextValue`, which then observes the post-rejection state machine (most often returning `{done:true}`).
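
A minimal sketch of that chaining, assuming a standalone `nextValue` function; `createChainedNext` is a hypothetical wrapper used only for this illustration.

```js
// Hypothetical wrapper: serialises calls so each one runs after the previous settles.
function createChainedNext (nextValue) {
  let currentStep = Promise.resolve();

  return function next () {
    // Both handlers are nextValue, so a rejected step never blocks later calls.
    currentStep = currentStep.then(nextValue, nextValue);
    return currentStep;
  };
}

async function demoChain () {
  let calls = 0;
  const next = createChainedNext(async () => {
    calls += 1;
    if (calls === 1) throw new Error('boom');
    return { done: true, value: undefined };
  });

  const first = next().catch((err) => err.message);
  const second = next();

  return [await first, await second];
}
```

The first call rejects, yet the second call still runs `nextValue` and resolves with `{ done: true }`. With `currentStep.then(nextValue)` alone, the second call would inherit the first rejection instead.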

### Lib helpers (reuse these, don't reimplement)

- `lib/find-least-targeted.js` — load-balancing: given a list of iterators and the current buffer, picks the iterator with fewest in-flight slots.
- `lib/misc.js` — `makeIterableAsync(input)` (sync iterable → async iterable) and `arrayDeleteInPlace(list, value)` (in-place splice by value).
- `lib/type-checks.js` — `isAsyncIterable`, `isIterable`, `isPartOfArray` guards.

### Public-API contracts worth preserving

- Callback receives `(item, { signal })` where `signal` is **always present** (the internal one) even when no `options.signal` is provided.
- Aborts cancel **consumption**, not in-flight callback work. Promises cannot be cancelled — the library propagates the signal so user code can voluntarily exit; it does not race-and-discard. The README documents this explicitly.
- `errors: 'fail-eventually'` (default) keeps the historical "drain then throw" semantics; `'fail-fast'` mirrors `Promise.all`. External abort always wins over queued/captured errors.
- Existing one-arg callbacks (`async (item) => …`) keep working — JS ignores extra args, so the second-arg widening is non-breaking.
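
To illustrate why the second-argument widening is non-breaking, the sketch below calls a one-argument callback and a signal-aware callback through the same call shape. `dispatch` is a hypothetical stand-in for the library's call site, not its actual code.

```js
// Hypothetical call site: always passes (item, { signal }).
function dispatch (callback, item, signal) {
  return callback(item, { signal });
}

// Existing style: ignores the extra argument.
const legacyCallback = async (item) => item * 2;

// New style: opts in to the signal.
const signalAwareCallback = async (item, { signal }) => {
  signal.throwIfAborted();
  return item * 2;
};

async function demoDispatch () {
  const { signal } = new AbortController();

  return Promise.all([
    dispatch(legacyCallback, 21, signal),
    dispatch(signalAwareCallback, 21, signal),
  ]);
}
```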

## Test conventions

Mocha + chai + sinon. Tests use `sinon.useFakeTimers()` plus `clock.runAllAsync()` / `clock.tickAsync(ms)` for deterministic timing. The standard pattern for an async flow that needs the clock to advance is:

```js
const flow = (async () => { for await (...) { ... } })();
await clock.runAllAsync();
await flow;
```

Inline `for await` blocks **without** the IIFE wrapper will deadlock under fake timers when the source waits on `setTimeout`: the loop is awaited before anything advances the fake clock. Test helpers in `test/utils.js` (`yieldValuesOverTime`, `nestedYieldValuesOverTime`, `promisableTimeout`) are the source of truth, so reuse them.

For testing rejections, prefer the `.catch(err => ({ rejectedWith: err }))` envelope pattern (used across `test/abort.spec.js` and `test/errors-fail-fast.spec.js`) over chai-as-promised's `should.be.rejectedWith` when asserting identity-equality on non-Error reasons.
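
A minimal sketch of the envelope pattern with a non-Error abort reason. The `reason` object and the `demoEnvelope` wrapper are hypothetical; a real spec would assert on `result.rejectedWith` with chai instead of returning a boolean.

```js
async function demoEnvelope () {
  // A non-Error reason, where identity matters more than message matching.
  const reason = { code: 'E_STOP' };
  const ac = new AbortController();
  ac.abort(reason);

  // Stand-in for an iterator.next() call that rejects with signal.reason.
  const pending = Promise.reject(ac.signal.reason);

  // The envelope turns the rejection into a value that can be inspected.
  const result = await pending.catch((err) => ({ rejectedWith: err }));

  return result.rejectedWith === reason;
}
```

Because the envelope is a plain object, a strict-equality assertion on `rejectedWith` checks the exact reason object instead of matching on an error class or message.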
84 changes: 81 additions & 3 deletions README.md
@@ -75,17 +75,95 @@ Iterates and applies the `callback` to up to `bufferSize` items from `input` yie

#### Syntax

`bufferedAsyncMap(input, callback[, { bufferSize=6, ordered=false }]) => AsyncIterableIterator`
`bufferedAsyncMap(input, callback[, { bufferSize=6, ordered=false, signal, errors='fail-eventually' }]) => AsyncIterableIterator`

#### Arguments

* `input` – either an async iterable, an ordinary iterable or an array
* `callback(item)` – should be either an async generator or an ordinary async function. Items from async generators are buffered in the main buffer and the buffer is refilled by the one that has least items in the current buffer (`input` is considered equal to sub iterators in this regard when refilling the buffer)
* `callback(item, { signal })` – should be either an async generator or an ordinary async function. Items from async generators are buffered in the main buffer and the buffer is refilled by the one that has least items in the current buffer (`input` is considered equal to sub iterators in this regard when refilling the buffer). The second argument is an object `{ signal }` whose `AbortSignal` aborts on cancellation; see [Cancellation](#cancellation).

#### Options

* `bufferSize` – _optional_ – defaults to `6`, sets the max number of items that are processed simultaneously in the buffer.
* `ordered` – _optional_ – defaults to `false`, when `true` the result will be returned in order instead of unordered
* `ordered` – _optional_ – defaults to `false`, when `true` the result will be returned in order instead of unordered.
* `signal` – _optional_ – an `AbortSignal`. When aborted, iteration stops pulling from the source, the next pending or freshly-called `iterator.next()` rejects with `signal.reason` exactly once, and all subsequent calls return `{ done: true, value: undefined }`. See [Cancellation](#cancellation).
* `errors` – _optional_ – defaults to `'fail-eventually'`. Controls how errors from the callback or the source surface to the consumer. See [Errors](#errors).

The returned iterator also implements `Symbol.asyncDispose`, so it can be used with `await using` for deterministic cleanup. See [Resource management](#resource-management).

## Cancellation

Pass an `AbortSignal` and abort it whenever you want to stop iteration:

```javascript
import { bufferedAsyncMap } from 'buffered-async-iterable';

const ac = new AbortController();
setTimeout(() => ac.abort(new Error('took too long')), 5000);

try {
  for await (const item of bufferedAsyncMap(source, async (item) => {
    return await fetchItem(item);
  }, { signal: ac.signal })) {
    console.log(item);
  }
} catch (err) {
  // err === ac.signal.reason
}
```

Aborting cancels *consumption* of the source. In-flight callbacks continue running until they settle. To cancel network or I/O work inside your callback, forward the per-callback `signal` (destructured from the second argument) into `fetch`, `undici`, or similar:

```javascript
bufferedAsyncMap(source, async (item, { signal }) => {
  const res = await fetch(`/items/${item}`, { signal });
  return res.json();
}, { signal: ac.signal });
```

The per-callback `signal` is always present (even when no `options.signal` is passed) and aborts on iterator close (return / throw / dispose / source-exhaustion-with-cleanup), so callbacks can fast-path on shutdown.
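
For callbacks that wait on something other than `fetch`, a cooperative fast-path can be sketched like this. `abortableDelay` is a hypothetical helper, not part of the library, and the demo calls the callback directly with a hand-made signal rather than through `bufferedAsyncMap`.

```javascript
// Hypothetical helper: a delay that rejects with signal.reason as soon as the signal aborts.
function abortableDelay (ms, signal) {
  return new Promise((resolve, reject) => {
    if (signal.aborted) return reject(signal.reason);

    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort);
      resolve();
    }, ms);

    function onAbort () {
      clearTimeout(timer); // stop waiting so shutdown is not delayed
      reject(signal.reason);
    }

    signal.addEventListener('abort', onAbort, { once: true });
  });
}

async function demoFastPath () {
  const ac = new AbortController();
  const callback = async (item, { signal }) => {
    await abortableDelay(60_000, signal);
    return item;
  };

  const inFlight = callback('a', { signal: ac.signal }).catch((err) => err);
  const reason = new Error('shutting down');
  ac.abort(reason);

  return (await inFlight) === reason;
}
```

The callback exits as soon as the signal aborts instead of waiting out the full delay, which is the voluntary exit that the per-callback `signal` is meant to enable.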

If `options.signal` is already aborted at construction time, the source is never read and the first `iterator.next()` rejects with `signal.reason`. External abort always wins over queued errors.

## Errors

There are two error modes:

### `'fail-eventually'` (default)

Iteration continues after errors. Captured errors are thrown when the iterator drains:

* If exactly one error was captured, it is thrown directly (identity preserved).
* If two or more errors were captured, they are wrapped in an [`AggregateError`](https://developer.mozilla.org/docs/Web/JavaScript/Reference/Global_Objects/AggregateError) (in capture order).

In-flight callbacks may still complete in the background after an error is captured. Wrap your callback in `try/catch` if you need per-item isolation.
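
The drain rule above can be sketched as a small function. `throwCapturedErrors` and the `AggregateError` message are hypothetical; only the "one error is thrown directly, two or more are wrapped" behaviour comes from the description above.

```javascript
// Hypothetical sketch of the drain rule for captured errors.
function throwCapturedErrors (capturedErrors) {
  if (capturedErrors.length === 1) {
    // Exactly one error: rethrow it as-is so identity is preserved.
    throw capturedErrors[0];
  }

  if (capturedErrors.length > 1) {
    // Two or more: wrap them, keeping capture order.
    throw new AggregateError(capturedErrors, 'Multiple errors were captured');
  }
}

// Runs the drain and returns whatever was thrown (undefined when nothing was captured).
function drainOutcome (capturedErrors) {
  try {
    throwCapturedErrors(capturedErrors);
  } catch (err) {
    return err;
  }
  return undefined;
}

const first = new Error('first');
const second = new Error('second');

const singleOutcome = drainOutcome([first]);
const multiOutcome = drainOutcome([first, second]);
const emptyOutcome = drainOutcome([]);
```

Here `singleOutcome` is the `first` error object itself, `multiOutcome` is an `AggregateError` whose `errors` array lists `first` and then `second`, and `emptyOutcome` is `undefined`.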

### `'fail-fast'`

Mirrors `Promise.all` semantics: the first error from the callback or the source short-circuits iteration. The next `iterator.next()` rejects with the original error (no `AggregateError` wrapping); subsequent calls return `{ done: true }`. The source's `.next()` is not called again, the source's `.return()` is called once, and in-flight callbacks observe `signal.aborted === true` on their per-callback signal within one microtask.

```javascript
for await (const item of bufferedAsyncMap(source, fn, { errors: 'fail-fast' })) {
  // first thrown error halts iteration immediately
}
```

External abort always takes precedence over either error mode: if `options.signal` aborts while errors are queued, the consumer sees `signal.reason`, not the captured errors.

## Resource management

The returned iterator implements `Symbol.asyncDispose`, so it can be used with [`await using`](https://github.com/tc39/proposal-explicit-resource-management) for deterministic cleanup:

```javascript
{
  await using iterator = bufferedAsyncMap(source, fn);
  for await (const item of iterator) {
    if (shouldStop(item)) break;
  }
} // source.return() runs here, regardless of how the block exited
```

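`Symbol.asyncDispose` is aliased to `iterator.return()` and is idempotent. The `Symbol.asyncDispose` symbol itself is available in Node 22, but the native `await using` syntax requires a runtime that ships explicit resource management (Node 24+) or a transpiler.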
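
Where the `await using` syntax is not available, the same cleanup can be triggered by calling the dispose method in a `finally` block. The sketch below uses a hypothetical stand-in iterator (`createDisposableIterator`) instead of `bufferedAsyncMap`, purely to show the alias and its idempotency.

```javascript
// Hypothetical stand-in for an iterator whose dispose method is an alias of return().
function createDisposableIterator (onClose) {
  let closed = false;

  return {
    [Symbol.asyncIterator] () { return this; },
    async next () { return { done: true, value: undefined }; },
    async return () {
      if (!closed) {
        closed = true; // guard makes repeated closes a no-op
        await onClose();
      }
      return { done: true, value: undefined };
    },
    async [Symbol.asyncDispose] () {
      await this.return();
    },
  };
}

async function demoDispose () {
  let closeCount = 0;
  const iterator = createDisposableIterator(async () => { closeCount += 1; });

  try {
    await iterator.next();
  } finally {
    // Manual equivalent of what `await using` does at block exit.
    await iterator[Symbol.asyncDispose]();
  }

  await iterator.return(); // second close is ignored
  return closeCount;
}
```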

### mergeIterables()
