
sgmonda/streamfu


streamfu

Streams should feel like arrays. Now they do.

License: MIT · 100% test coverage

One streaming paradigm for Node, Deno, Bun, Cloudflare Workers and the browser — built on Web Streams.



One paradigm, every runtime

streamfu is built on the Web Streams API — the streaming primitive that ships natively in Node, Deno, Bun, Cloudflare Workers and every modern browser. So the same pipeline runs everywhere, unchanged:

import { map, ndjson, pipe } from "@sgmonda/streamfu"

// identical code in Node, Deno, Bun, a Worker, or the browser
// (`response` is a fetch() Response; `normalize` is your own per-event function)
const events = pipe(response.body!, ndjson, map(normalize))

No per-runtime imports, no polyfills, no node:stream lock-in. If your code reads from fetch().body, a file, a socket or a Blob, it reads the same way on the server and on the edge. That portability — standard primitive, one codebase, five runtimes — is what sets streamfu apart from node:stream (Node-only) and from generic async-iterable toolkits (which don't speak the platform's stream model). And because the API mirrors Array.prototype, it stays readable while it travels.


What streamfu is — and isn't

streamfu is a set of functional, array-like utilities for the Web Streams API, plus a small set of time-based operators for real-world data pipelines. The mental model is Array.prototype extended to streams: map, filter, reduce, pipe, concat, zip, branch, slice. The temporal additions (delay, throttle, debounce, bufferTime, available since v0.9.0) cover the cases where pure array-thinking isn't enough: SSE batching, debounced input streams, rate-limited fetch pipelines.

streamfu is not a reactive programming library. There are no Subjects, no schedulers, no hot/cold stream model, no multicast / combineLatest / withLatestFrom. If you need those, you need RxJS — and streamfu won't try to replace it.

Use streamfu when you're building data pipelines that flow chunks through transformations: NDJSON / CSV / JSONL ingest, fetch() streaming, database cursors, file processing, log aggregation, S3/R2 transform-and-upload — and you want that pipeline to run unchanged across runtimes (see above).

Use RxJS (or similar) when you're orchestrating events: UI state, multicast streams shared across subscribers, complex hot/cold semantics, schedulers, or anything where the producers are events rather than data chunks.


When NOT to use streamfu

streamfu is a focused tool, not a default. Reach for something else when:

  • Your data already fits in memory. If you can hold the whole dataset in an array, Array.prototype (.map, .filter, .reduce) is simpler, faster, and needs no dependency. Streams earn their keep with data that is large, infinite, or arrives over time — not with a 1,000-row array you already have in hand.
  • You're orchestrating events, not data. UI state, multicast to many subscribers, hot/cold semantics, schedulers, combineLatest / withLatestFrom — that's reactive programming. Use RxJS; streamfu deliberately has none of it (see What streamfu is — and isn't).
  • You target Node only and already lean on node:stream. If your code will never leave Node and you're comfortable with stream.pipeline, Readable.from, and object-mode streams, the native module is one less dependency. streamfu's edge is the same code across runtimes — if you don't need that, you may not need streamfu.
  • It's a single trivial transform. A one-off await response.text() then JSON.parse, or a lone .map, doesn't need a pipeline. streamfu shines when you're composing several steps over a flowing source.

If none of these fit — you're composing transformations over large, streaming, or cross-runtime data — streamfu is built for exactly that.


How streamfu compares

The honest question isn't "streamfu vs writing readers by hand" — it's "why not the tool I already know?". Here's where each fits:

| Criterion | streamfu | streaming-iterables | RxJS |
| --- | --- | --- | --- |
| Core primitive | Web Streams (ReadableStream) | Async iterables | Observable |
| Paradigm | Functional pipelines over data | Functional utils over async iterables | Reactive event streams |
| Best for | Cross-runtime data pipelines | Async-iterable pipelines | Event orchestration, UI, multicast |
| Backpressure | Native (the platform's stream model) | Partial (Node-stream interop) | Manual (operators / schedulers) |
| Runtime model | Same code on Node/Deno/Bun/Workers/browser | Any async-iterable runtime; stream interop is Node-oriented | All JS runtimes |
| Dependencies | Zero | Zero | One (tslib), plus a larger tree-shakeable core |
| Learning curve | Know Array.prototype? You're done | Functional utils | Steep: Observables, Subjects, schedulers |

Pick streamfu when you're transforming flowing data (NDJSON, CSV, fetch() bodies, file or DB cursors) and want the same code to run everywhere on the platform's native stream type. Pick streaming-iterables if you'd rather model everything as async iterables than as Web Streams. Pick RxJS when your producers are events and you need multicast, hot/cold, or schedulers — see When NOT to use streamfu.

Bundle size. Zero dependencies; the full barrel import is ~2.8 KB minzipped. The package ships per-operator subpath exports (@sgmonda/streamfu/map, .../filter, …) and is marked side-effect-free, so importing a single operator pulls in only what that operator needs — a fraction of the whole. The minzipped badge tracks the current figure.


Performance

Honesty first: streamfu is not the throughput winner, and it won't pretend to be. The trade it makes — composability and bounded memory for a constant per-stage cost — is worth seeing in numbers.

A synthetic NDJSON ingest pipeline (parse → filter ~50% → map → sink). Apple M4 Max · Deno 2.7.12 · 2026-05-28 — indicative, reproduce with deno task bench.

Throughput — 200k rows, 16.23 MB in memory:

| Contender | Throughput | vs streamfu |
| --- | --- | --- |
| raw Web Streams | ~300 MB/s | 4.4× faster |
| streaming-iterables | ~139 MB/s | 2.0× faster |
| streamfu | ~68 MB/s | baseline |

Peak memory — 2,000,000 rows:

| Variant | Peak RSS | Memory growth |
| --- | --- | --- |
| raw Web Streams (streaming) | 141 MiB | bounded |
| streamfu (streaming) | 180 MiB | bounded |
| load all into an array | 595 MiB | grows O(n) |

Chaining TransformStream stages costs throughput, and an in-memory micro-ingest maximizes that overhead — with real I/O-bound work (a network fetch() body, a file or DB cursor) the per-stage cost is amortized against the wait. What you get back is bounded memory — like a hand-written reader, unlike materializing everything into an array, whose footprint grows with the dataset.

Concurrency — where it flips. Enriching 2000 items with an I/O-bound call (1 ms each):

| Contender | Time per run (2000 items) | vs serial loop | Note |
| --- | --- | --- | --- |
| Promise.all (unbounded) | 6.5 ms | ~710× faster | no limit; melts real I/O |
| streamfu mapAsync({ concurrency: 20 }) | 266.3 ms | 17.4× faster | bounded + backpressure |
| serial loop (naive baseline) | 4.6 s | baseline | one task at a time |

The moment work is I/O-bound, mapAsync beats the serial loop a developer reaches for first by ~17×, with a bounded pool. Promise.all is faster only because it's unbounded — the kind of fast that exhausts a connection pool in production.

Where does the throughput gap come from? Not the Web Streams primitive: a single fused TransformStream (71 ms) actually beats a chain of async generators (108 ms, which is how streaming-iterables composes). The cost is the number of chained stages. Every operator is a pipeThrough with its own queue and per-chunk scheduling, so the idiomatic 3-stage pipeline (with ndjson fused internally since v0.9.0) costs about 3.4× a single fused hop: 3.4 × 71 ms ≈ 240 ms, roughly 80 ms per hop. The gap shrinks with fewer operators, which is why composing tightly (or relying on operators that fuse internally) is the lever for throughput.

Composing for throughput: fewer stages = faster

Since the cost is per stage, the most effective thing you can do without leaving streamfu is to collapse adjacent transforms into a single operator call. map is variadic for exactly this: map(f, g, h) applies all three functions inside one TransformStream, while map(f), map(g), map(h) creates three.

// 4 hops — one TransformStream per map.
pipe(stream, ndjson<Row>(), map((r) => r.id), map((id) => id * 2), map(String))

// 2 hops — `map` applies all three transforms inside one TransformStream.
pipe(stream, ndjson<Row>(), map((r) => r.id, (id) => id * 2, String))
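The hop count is easy to reproduce with plain Web Streams. In the sketch below (no streamfu import), `fusedMap` is a hypothetical helper that runs several functions inside one TransformStream; it illustrates the idea behind passing several functions to a single map call, not streamfu's actual source.

```typescript
// Helper: a ReadableStream over an in-memory array.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item)
      controller.close()
    },
  })
}

// Helper: drain a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader()
  const out: T[] = []
  while (true) {
    const result = await reader.read()
    if (result.done) return out
    out.push(result.value)
  }
}

// Hypothetical helper (not streamfu's source): compose synchronous functions
// inside ONE TransformStream, so each chunk crosses one queue instead of N.
function fusedMap(...fns: Array<(value: any) => any>): TransformStream<unknown, unknown> {
  return new TransformStream<unknown, unknown>({
    transform(chunk, controller) {
      let value: unknown = chunk
      for (const fn of fns) value = fn(value) // run every step before enqueueing
      controller.enqueue(value)
    },
  })
}

// Same three steps as `map((r) => r.id, (id) => id * 2, String)`, in a single hop.
const idStrings = fromArray([{ id: 1 }, { id: 2 }, { id: 3 }]).pipeThrough(
  fusedMap((r: { id: number }) => r.id, (id: number) => id * 2, String),
)
```

Replacing the single fusedMap call with three chained pipeThrough calls would give the same output while pushing every chunk through three queues.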

The same idea is what ndjson does internally as of v0.9.0: line splitting and JSON.parse are one fused TransformStream instead of two — that's why the canonical ingest pipeline costs 3 hops instead of 4. You don't need to do anything to benefit from it; it's there because the throughput-per-stage trade-off applies inside the library too.

When to ignore this. The moment your pipeline waits on real I/O — a fetch() body over the network, a database cursor, a file read — the constant per-stage cost is amortized against the wait. A stage adds tens of milliseconds across an entire ingest; one network round-trip dwarfs that. Optimize stage count for CPU-bound, in-memory work; leave it alone otherwise.

Anti-pattern: pipe(stream, map(parse), map(enrich), map(format)) — three hops for what is really one logical "transform the chunk" step. Pass them as arguments to a single map instead. Same code, one stage.

Pick streamfu when the data is large, infinite, or arriving over time; if it fits in RAM and you just want raw CPU speed, an array wins. Full methodology and how to reproduce: benchmarks/.


Recipes

Complete, copy-pasteable pipelines for real tasks, each mirrored by an executable, type-checked file under examples/recipes/. The full collection is in RECIPES.md.

The headline case — ingest an NDJSON endpoint, drop heartbeats, enrich with bounded concurrency, drain to a sink — is one pipeline:

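import { filter, fromResponse, mapAsync, ndjson, pipeTo } from "@sgmonda/streamfu"

// RawEvent, enrich and sink are placeholders for your own event type, enrichment function and destination.
await pipeTo(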
  ndjson<RawEvent>(fromResponse(await fetch("/events.ndjson"))),
  filter((event) => event.type !== "heartbeat"),
  mapAsync(enrich, { concurrency: 10 }),
  sink,
)

Stability

streamfu is in 0.x and follows stricter-than-SemVer rules:

  • Patch releases (0.x.y → 0.x.y+1) never break the public API.
  • Minor releases (0.x.0 → 0.x+1.0) may include breaking changes — always called out as BREAKING: in the changelog.

Pinning to ^0.9.x is safe for patch updates. The full policy, current API status by group, deprecation rules, and the exit criteria for v1.0 live in STABILITY.md.


Why streamfu?

Streams are one of the most powerful primitives in JavaScript. They handle infinite data, backpressure, and async flows — things arrays simply can't do.

But the standard API makes you pay for that power with boilerplate, footguns, and unreadable code.

The problem: Native streams are painful

Here's a real scenario — read a stream of numbers, keep only even ones, double them, and collect the results:

// ❌ Native Web Streams — imperative, verbose, error-prone
const reader = readable.getReader()
const results: number[] = []

while (true) {
  const { done, value } = await reader.read()
  if (done) break
  if (value % 2 === 0) {
    results.push(value * 2)
  }
}

reader.releaseLock()

Manual reader management. Mutable state. An infinite while (true) loop. And this is the simple case.

Need to split a stream? Native tee() only gives you two copies. Want to merge streams? Build your own. Want to zip? Good luck.

// ❌ Native — splitting a stream into 4 branches
const [a, rest1] = stream.tee()
const [b, rest2] = rest1.tee()
const [c, d] = rest2.tee()
// Hope you got the order right...

The solution: streamfu

// ✅ streamfu — declarative, composable, readable
import { createReadable, filter, list, map, pipe } from "@sgmonda/streamfu"

const readable = createReadable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

const stream = pipe(
  readable,
  filter((n) => n % 2 === 0),
  map((n) => n * 2),
)
const results = await list(stream)

Same result. No manual readers. No mutable state. No while (true). Just pure transformations.

// ✅ streamfu — branch into any number of copies
import { branch } from "@sgmonda/streamfu"

const [a, b, c, d] = branch(stream, 4)
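For comparison, here is roughly what an n-way split looks like when assembled from tee() by hand. `branchSketch` is a hypothetical helper (no streamfu import, and not necessarily how branch works internally): it peels one copy off per tee() call and returns the last remainder as the final branch. Like tee() itself, it buffers chunks for any branch that is read more slowly than the others.

```typescript
// Helper: a ReadableStream over an in-memory array.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item)
      controller.close()
    },
  })
}

// Helper: drain a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader()
  const out: T[] = []
  while (true) {
    const result = await reader.read()
    if (result.done) return out
    out.push(result.value)
  }
}

// Hypothetical sketch: split one stream into n copies with repeated tee().
function branchSketch<T>(stream: ReadableStream<T>, n: number): ReadableStream<T>[] {
  if (!Number.isInteger(n) || n < 1) throw new RangeError("n must be a positive integer")
  const copies: ReadableStream<T>[] = []
  let rest = stream
  for (let i = 0; i < n - 1; i++) {
    const [copy, remaining] = rest.tee() // one copy out, keep teeing the remainder
    copies.push(copy)
    rest = remaining
  }
  copies.push(rest) // the last remainder is the final branch
  return copies
}

const branches = branchSketch(fromArray([1, 2, 3]), 4)
```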

Side-by-side comparison

| Task | Native Streams | streamfu |
| --- | --- | --- |
| Transform each chunk | pipeThrough(new TransformStream({...})) | map(stream, fn) |
| Filter chunks | Manual reader loop + condition | filter(stream, fn) |
| Reduce to value | Manual reader loop + accumulator | reduce(stream, fn, init) |
| Combine streams | Manual reader orchestration | zip(s1, s2, s3) |
| Concatenate streams | Complex async pull logic | concat(s1, s2, s3) |
| Split stream | Nested .tee() chains | branch(stream, n) |
| Get element at index | Manual counter + reader | at(stream, i) |
| Check if value exists | Manual loop + early exit | includes(stream, val) |
| Chain operations | Deeply nested pipeThrough | pipe(stream, f1, f2, f3) |

If you know Array.prototype, you already know streamfu.


Getting Started

Install

Node.js / Bun — from npm
npm install @sgmonda/streamfu
yarn add @sgmonda/streamfu
pnpm add @sgmonda/streamfu
bun add @sgmonda/streamfu

Ships a dual CJS + ESM build with classic main / types fields, so it works in any project — including legacy setups with "module": "commonjs" and "moduleResolution": "node10". No tsconfig.json changes required.

Deno — from JSR
deno add jsr:@sgmonda/streamfu
Node.js / Bun — from JSR (modern TS only)
npx jsr add @sgmonda/streamfu

Requires "moduleResolution": "node16", "nodenext", or "bundler" in your tsconfig.json. If you're on "node10", install from npm instead (see above).

Quick start

import { createReadable, filter, map, pipe, reduce } from "@sgmonda/streamfu"

// Create a stream from any iterable
const numbers = createReadable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

// Compose transformations with pipe
const stream = pipe(
  numbers,
  filter((n) => n % 2 === 0), // keep even: 2, 4, 6, 8, 10
  map((n) => n * 2), // double:   4, 8, 12, 16, 20
)
const sumOfDoubledEvens = await reduce(stream, (a, b) => a + b, 0) // sum: 60

console.log(sumOfDoubledEvens) // 60

Curried form (v0.9.0+)

Every non-consuming transform (map, mapAsync, mapSettled, catchError, filter, flat, flatMap, slice, splice, take, drop, tap, inspect, batch, partition, groupConsecutive, unique, uniqueConsecutive, scan, ndjson, csv, delay, throttle, debounce, bufferTime) accepts two call shapes:

// Direct: pass the stream as the first argument
filter(stream, (n) => n > 0)
map(stream, (n) => n * 2)

// Curried: omit the stream, get back a (stream) => stream function
filter((n) => n > 0)
map((n) => n * 2)

// Both compose with pipe, but the curried form reads like an Array.prototype chain
pipe(
  numbers,
  filter((n) => n % 2 === 0),
  map((n) => n * 2),
)

The forms are interchangeable. The direct form is unchanged from prior versions — picking which to use is a style choice, not a migration step. lines already has the (stream) => stream shape and works directly inside pipe(s, lines) without a curried overload.

Consumers (reduce, list, at, some, every, includes, indexOf, forEach, count, join, toBuffer, pipeTo) are intentionally not curried — they return a Promise, so they don't fit pipe's (stream) => stream shape.

Async work with bounded concurrency: mapAsync (v0.9.0+)

map with an async function processes one chunk at a time — fine for CPU-bound transforms, slow for I/O. For pipelines that need parallel I/O (HTTP calls, DB lookups, file reads) use mapAsync:

// Up to 10 fetches in flight at any time (a sliding pool, not fixed batches); output preserves input order
const users = pipe(
  userIds,
  mapAsync((id) => fetch(`/api/users/${id}`).then((r) => r.json()), { concurrency: 10 }),
)

| Option | Default | Effect |
| --- | --- | --- |
| concurrency | 10 | Max tasks in flight. Must be a positive integer (Infinity is rejected). |
| ordered | true | true preserves input order; false emits each chunk as soon as its task completes. |

When to use which:

| Use | Why |
| --- | --- |
| map(stream, async fn) | Sequential async work. Each chunk waits for the previous. Strict order, no parallelism. |
| mapAsync(stream, fn, opts?) | Parallel I/O. Up to concurrency tasks in flight. Backpressure preserved end-to-end. |

Backpressure: mapAsync only reads from upstream when a pool slot is free, and total memory (in-flight tasks + buffered results awaiting emission) is bounded by concurrency. If the consumer slows down, the pool naturally throttles.

Errors: the first task error aborts the stream and cancels the upstream. Tasks already in flight keep running but their results are discarded — mapAsync does not accept an AbortSignal today, so design your fn to be cheap-to-abandon if you expect errors.
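The pool behaviour described above can be sketched directly on ReadableStream. `mapAsyncSketch` below is hypothetical (no streamfu import, no `ordered: false` option): it keeps started tasks in a FIFO capped at `concurrency`, reads upstream only inside pull() and only while a slot is free, and always emits the oldest task next, so output order matches input order. On the first task failure it cancels the upstream and errors the stream; results of other in-flight tasks are dropped.

```typescript
// Helper: a ReadableStream over an in-memory array.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item)
      controller.close()
    },
  })
}

// Helper: drain a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader()
  const out: T[] = []
  while (true) {
    const result = await reader.read()
    if (result.done) return out
    out.push(result.value)
  }
}

// Hypothetical sketch of an ordered, bounded-concurrency map (not streamfu's implementation).
function mapAsyncSketch<A, B>(
  source: ReadableStream<A>,
  fn: (chunk: A, index: number) => B | Promise<B>,
  concurrency = 10,
): ReadableStream<B> {
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new RangeError("concurrency must be a positive integer") // rejects Infinity too
  }
  const reader = source.getReader()
  const inFlight: Promise<B>[] = [] // started tasks, oldest first
  let nextIndex = 0
  let upstreamDone = false

  // Start tasks until the pool is full or the upstream is exhausted.
  async function fill(): Promise<void> {
    while (!upstreamDone && inFlight.length < concurrency) {
      const result = await reader.read()
      if (result.done) {
        upstreamDone = true
        return
      }
      const value = result.value
      const index = nextIndex++
      const task = Promise.resolve().then(() => fn(value, index))
      task.catch(() => {}) // awaited later; this only avoids unhandled-rejection noise
      inFlight.push(task)
    }
  }

  return new ReadableStream<B>({
    // pull() runs only when the consumer wants more data: the backpressure hook.
    async pull(controller) {
      await fill()
      const oldest = inFlight.shift()
      if (oldest === undefined) {
        controller.close()
        return
      }
      try {
        controller.enqueue(await oldest) // emit in input order
      } catch (error) {
        await reader.cancel(error) // first failure cancels the upstream...
        throw error // ...and errors this stream
      }
    },
    cancel(reason) {
      return reader.cancel(reason)
    },
  })
}

let active = 0
let peak = 0
// Simulated I/O where later items finish sooner, so completion order differs from input order.
async function slowDouble(n: number): Promise<number> {
  active++
  peak = Math.max(peak, active)
  await new Promise((resolve) => setTimeout(resolve, 20 - n))
  active--
  return n * 2
}

const doubled = mapAsyncSketch<number, number>(fromArray([1, 2, 3, 4, 5, 6, 7, 8]), slowDouble, 3)
```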

Error recovery: mapSettled and catchError (v0.9.0+)

A single throw in any transform aborts the whole pipeline. That is the wrong default for NDJSON ingest with the occasional malformed line, CSV with badly encoded rows, or fetch enrichment against a flaky API. streamfu offers two complementary operators:

  • mapSettled — like mapAsync, but converts per-chunk failures into tagged results instead of aborting. Each chunk produces a SettledResult<A, B>:

    type SettledResult<A, B> =
      | { status: "fulfilled"; value: B; chunk: A; index: number }
      | { status: "rejected"; reason: unknown; chunk: A; index: number }

    The original chunk and index are preserved on both branches so you can log, requeue, or diff after the fact.

    // Parse NDJSON tolerating bad lines.
    const tagged = pipe(lines, mapSettled(JSON.parse))
    for await (const r of tagged) {
      if (r.status === "fulfilled") use(r.value)
      else console.warn(`line ${r.index} failed:`, r.reason, "raw:", r.chunk)
    }

    Defaults match mapAsync: { concurrency: 10, ordered: true }. Upstream errors (i.e. the source itself errors, not the fn) still propagate — mapSettled only catches per-chunk failures.

  • catchError — stream-level recovery. When the upstream errors, the handler is invoked once with the error and may return a fallback chunk (emitted then the stream closes), return undefined/void (close silently), or throw to replace the original error.

    const safe = pipe(
      flakyStream,
      catchError((e) => {
        reportToSentry(e)
        return { fallback: true } // emitted as a final chunk; stream closes
      }),
    )

    catchError fires once and closes the stream — it cannot resume reading. For per-chunk recovery (continue processing the rest), use mapSettled.
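The fire-once-then-close contract can be sketched on a plain ReadableStream. `catchErrorSketch` is hypothetical (no streamfu import): it forwards chunks until an upstream read rejects, calls the handler exactly once, enqueues the handler's result unless it is undefined, and closes. A handler that throws errors the stream with its own error instead. The upstream is already errored at that point, which is why there is nothing to resume.

```typescript
// Helper: drain a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader()
  const out: T[] = []
  while (true) {
    const result = await reader.read()
    if (result.done) return out
    out.push(result.value)
  }
}

// Hypothetical sketch of stream-level recovery (not streamfu's implementation).
function catchErrorSketch<T>(
  source: ReadableStream<T>,
  handler: (error: unknown) => T | undefined | void,
): ReadableStream<T> {
  const reader = source.getReader()
  return new ReadableStream<T>({
    async pull(controller) {
      try {
        const result = await reader.read()
        if (result.done) controller.close()
        else controller.enqueue(result.value)
      } catch (error) {
        // Upstream failed: call the handler once. If it throws, that error
        // escapes pull() and becomes this stream's error.
        const fallback = handler(error)
        if (fallback !== undefined) controller.enqueue(fallback as T)
        controller.close() // no resume: the source is already errored
      }
    },
    cancel(reason) {
      return reader.cancel(reason)
    },
  })
}

// A source that emits 1 and 2, then fails on the next pull.
let pulls = 0
const flaky = new ReadableStream<number>({
  pull(controller) {
    pulls++
    if (pulls <= 2) controller.enqueue(pulls)
    else controller.error(new Error("connection reset"))
  },
})

let handlerCalls = 0
const recovered = catchErrorSketch(flaky, () => {
  handlerCalls++ // stand-in for reporting the error
  return -1 // emitted as the final chunk; the stream then closes
})
```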

Recipe: skipErrors

streamfu does not export skipErrors as its own symbol; it is a short composition over mapSettled:

import { filter, map, mapSettled, pipe, type SettledResult } from "@sgmonda/streamfu"

const skipErrors = <A, B>(fn: (a: A, i: number) => B | Promise<B>) => (stream: ReadableStream<A>) =>
  pipe(
    stream,
    mapSettled<A, B>(fn),
    filter((r: SettledResult<A, B>) => r.status === "fulfilled"),
    map((r) => (r as Extract<SettledResult<A, B>, { status: "fulfilled" }>).value),
  )
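The per-chunk tagging that this recipe builds on can be sketched with a single TransformStream. `settledMapSketch` is hypothetical: it is sequential (no concurrency pool, unlike mapSettled), does not import streamfu, and reuses the SettledResult shape shown earlier. A throwing or rejecting fn becomes a "rejected" record instead of erroring the stream, while an error from the source itself still propagates through pipeThrough.

```typescript
type SettledResult<A, B> =
  | { status: "fulfilled"; value: B; chunk: A; index: number }
  | { status: "rejected"; reason: unknown; chunk: A; index: number }

// Helper: a ReadableStream over an in-memory array.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item)
      controller.close()
    },
  })
}

// Helper: drain a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader()
  const out: T[] = []
  while (true) {
    const result = await reader.read()
    if (result.done) return out
    out.push(result.value)
  }
}

// Hypothetical sequential sketch: per-chunk failures become tagged records.
function settledMapSketch<A, B>(
  fn: (chunk: A, index: number) => B | Promise<B>,
): TransformStream<A, SettledResult<A, B>> {
  let nextIndex = 0
  return new TransformStream<A, SettledResult<A, B>>({
    async transform(chunk, controller) {
      const index = nextIndex++
      let result: SettledResult<A, B>
      try {
        result = { status: "fulfilled", value: await fn(chunk, index), chunk, index }
      } catch (reason) {
        result = { status: "rejected", reason, chunk, index } // keep the raw chunk for logging
      }
      controller.enqueue(result)
    },
  })
}

const tagged = fromArray(['{"a":1}', "not json", '{"a":2}']).pipeThrough(
  settledMapSketch((line: string): { a: number } => JSON.parse(line)),
)
```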

retry is not in v0.9.0

A retry(stream, fn, { times, delay, backoff }) operator is on the roadmap but is its own design problem (backoff strategies, jitter, transient-vs-permanent predicates, cancellation, interaction with mapAsync.concurrency). Until it ships, retry manually inside the transform:

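// `fetchUser` and `User` stand in for your own API call and type.
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

const fetchWithRetry = async (id: string, attempts = 3): Promise<User> => {
  for (let i = 0; i < attempts; i++) {
    try {
      return await fetchUser(id)
    } catch (e) {
      if (i === attempts - 1) throw e
      await sleep(2 ** i * 100) // exponential backoff: 100 ms, 200 ms, ...
    }
  }
  throw new Error("unreachable")
}

// Wrap the call so any extra callback argument (such as a chunk index) can't land in `attempts`.
pipe(ids, mapAsync((id: string) => fetchWithRetry(id), { concurrency: 10 }))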

Partitioning: partition (v0.9.0+)

partition(stream, predicate) classifies each chunk into one of two output streams in a single pass — matching (predicate truthy) and nonMatching (falsy). The predicate runs exactly once per chunk, even when it is async.

// Triage NDJSON: valid records to the DB, invalid to a dead-letter log.
const [valid, invalid] = partition(records, isValid)
await Promise.all([
  forEach(pipe(valid, mapAsync(saveToDb, { concurrency: 10 })), noop),
  forEach(invalid, logRejection),
])

Async predicates work the same way:

const [authenticated, anonymous] = partition(requests, async (req) => {
  const token = req.headers.get("authorization")
  return token ? await verifyToken(token) : false
})

Backpressure is shared between sides. If consumer A is slower than consumer B, A throttles B — because the demultiplexer pauses upstream reads when either side is full. This is the honest trade-off for evaluating the predicate exactly once. If you need fully independent rates at the cost of memory, use branch(s, 2) and apply filter to each branch:

const [a, b] = branch(records, 2)
const valid = filter(a, isValid)
const invalid = filter(b, async (r, i) => !(await isValid(r, i)))

branch + filter evaluates the predicate twice per chunk and buffers each side independently — divergent if the predicate has side effects or randomness, and unbounded memory if one side stalls (unless you pass branch(records, 2, { maxBuffer }); see Branching and memory below). Prefer partition whenever the predicate is non-trivial or both sides will be consumed at comparable rates.

The curried form is reusable across streams:

const splitByPriority = partition<Event>((e) => e.priority > 5)
const [hotA, coldA] = splitByPriority(streamA)
const [hotB, coldB] = splitByPriority(streamB)

If the predicate or the upstream errors, both outputs terminate with that error.

Grouping: groupBy and groupConsecutive (v0.9.0+)

Two operators with disjoint responsibilities — pick the one that matches your data shape and memory budget:

| Need | Use | Memory |
| --- | --- | --- |
| Eager Map<K, T[]>, dataset fits in RAM | groupBy | O(n) |
| Streaming runs by key, input ordered by key | groupConsecutive | O(largest run) |
| Streaming runs over an unordered input | sort upstream, then groupConsecutive; or groupBy if it fits | varies |

groupBy(stream, keyFn): Promise<Map<K, T[]>> is a consumer — it reads the entire stream into memory and returns a Map keyed by keyFn. Insertion order is preserved inside each bucket. The bucket is mutated in place, so a stream with one dominant key stays O(n), not O(n²). Supports async keyFn:

const byCountry = await groupBy(users, (u) => u.country)
for (const [country, list] of byCountry) {
  console.log(country, list.length)
}

const byCanonical = await groupBy(records, async (r) => canonicalKey(r))

groupConsecutive(stream, keyFn): ReadableStream<readonly [K, T[]]> is a transform — it emits one [key, chunks] tuple every time the key changes, and a final tuple on close. Memory stays at the size of the largest run, so it scales to streams that don't fit in RAM as long as the input is ordered by key:

// CSV rows pre-sorted by country → one [country, rows] emission per country.
for await (const [country, rows] of groupConsecutive(csvRows, (r) => r.country)) {
  await processCountry(country, rows)
}

Keys are compared with ===. Two distinct objects with the same shape count as different keys. If you need structural equality, normalize to a primitive upstream (e.g. map(x => ({ ...x, _key: hash(x) })) then group by _key).

groupConsecutive on unordered input emits the same key multiple times — one tuple per non-contiguous run. That is intentional and the reason the operator is named "consecutive" rather than "by". Use groupBy (or sort upstream first) when you need uniqueness.

If the upstream or keyFn errors, the partial run in groupConsecutive is not emitted — the resulting stream errors immediately. groupBy's promise rejects with the same error.
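The run-splitting logic fits in a short TransformStream. `groupConsecutiveSketch` is hypothetical (no streamfu import, synchronous keyFn only): it buffers the current run, emits it when the key changes under ===, and emits the final run from flush(). Because flush() runs only when the upstream closes normally, an errored stream never emits its partial run. The unordered input shows a key being emitted twice.

```typescript
// Helper: a ReadableStream over an in-memory array.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item)
      controller.close()
    },
  })
}

// Helper: drain a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader()
  const out: T[] = []
  while (true) {
    const result = await reader.read()
    if (result.done) return out
    out.push(result.value)
  }
}

// Hypothetical sketch: one [key, run] tuple per contiguous run of equal keys.
function groupConsecutiveSketch<T, K>(keyFn: (chunk: T) => K): TransformStream<T, [K, T[]]> {
  let currentKey: K | undefined
  let run: T[] = [] // memory is bounded by the largest run
  return new TransformStream<T, [K, T[]]>({
    transform(chunk, controller) {
      const key = keyFn(chunk)
      if (run.length > 0 && key !== currentKey) {
        controller.enqueue([currentKey as K, run]) // key changed: close the run
        run = []
      }
      currentKey = key
      run.push(chunk)
    },
    flush(controller) {
      if (run.length > 0) controller.enqueue([currentKey as K, run]) // final run
    },
  })
}

type Row = { country: string; n: number }
const rows: Row[] = [
  { country: "ES", n: 1 },
  { country: "ES", n: 2 },
  { country: "FR", n: 3 },
  { country: "ES", n: 4 }, // not contiguous with the first ES run
]
const runs = fromArray(rows).pipeThrough(groupConsecutiveSketch((r: Row) => r.country))
```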

The curried form of groupConsecutive follows the same convention as the other transforms:

const runByDay = groupConsecutive<Event, string>((e) => e.day)
const daysA = runByDay(streamA) // one [day, events] tuple per run
const daysB = runByDay(streamB)

groupBy has no curried form — consumers don't, by convention (see STABILITY.md).

Deduplication: unique and uniqueConsecutive (v0.9.0+)

Same shape as the grouping pair: pick based on whether you need global dedup (O(n) memory) or just runs of consecutive duplicates (O(1) memory).

| Need | Use | Memory | Equality |
| --- | --- | --- | --- |
| Dedup globally by key | unique | O(unique keys) | SameValueZero (Set) |
| Suppress consecutive same-key duplicates only | uniqueConsecutive | O(1) | === |

unique(stream, keyFn?): ReadableStream<T> keeps an internal Set and emits the first occurrence of each key. keyFn is optional; when omitted, each chunk is its own key, so primitives are deduplicated by value and objects by reference:

// Dedup primitives.
const ids = await list(unique(idStream))

// Dedup by id, keeping the first occurrence of each event.
const events = pipe(rawEvents, unique((e) => e.id))

// Async keys work the same way.
const canonical = pipe(records, unique(async (r) => await canonicalKey(r)))

uniqueConsecutive(stream, keyFn?): ReadableStream<T> remembers only the last key and drops a chunk whose key matches it. Memory is O(1), so it scales to streams that don't fit in RAM:

// SSE: don't re-emit messages with the same signal as the previous one.
const dedup = pipe(sse, uniqueConsecutive((m) => m.signal))

Different NaN semantics. unique uses Set equality (SameValueZero), so two NaN chunks collapse to one. uniqueConsecutive compares with ===, so two consecutive NaN chunks both pass through. If you need Set semantics on consecutive duplicates too, normalize upstream (map(x => Number.isNaN(x) ? "NaN" : x)) before deduping.
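The NaN difference follows directly from the two equality checks. In this sketch (no streamfu import; both helpers are hypothetical and take an optional synchronous keyFn), uniqueSketch checks membership in a Set, which uses SameValueZero, while uniqueConsecutiveSketch compares each key with the previous one using ===.

```typescript
// Helper: a ReadableStream over an in-memory array.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item)
      controller.close()
    },
  })
}

// Helper: drain a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader()
  const out: T[] = []
  while (true) {
    const result = await reader.read()
    if (result.done) return out
    out.push(result.value)
  }
}

// Hypothetical: global dedup. Set membership uses SameValueZero, so NaN matches NaN.
function uniqueSketch<T, K = T>(keyFn: (chunk: T) => K = (chunk) => chunk as unknown as K): TransformStream<T, T> {
  const seen = new Set<K>()
  return new TransformStream<T, T>({
    transform(chunk, controller) {
      const key = keyFn(chunk)
      if (seen.has(key)) return
      seen.add(key)
      controller.enqueue(chunk)
    },
  })
}

// Hypothetical: drop a chunk only when its key === the previous key (O(1) memory; NaN !== NaN).
function uniqueConsecutiveSketch<T, K = T>(
  keyFn: (chunk: T) => K = (chunk) => chunk as unknown as K,
): TransformStream<T, T> {
  let hasPrevious = false
  let previous: K | undefined
  return new TransformStream<T, T>({
    transform(chunk, controller) {
      const key = keyFn(chunk)
      if (hasPrevious && key === previous) return
      hasPrevious = true
      previous = key
      controller.enqueue(chunk)
    },
  })
}

const samples = [1, NaN, NaN, 2, 2, 1]
const globallyUnique = fromArray(samples).pipeThrough(uniqueSketch<number>())
const withoutRepeats = fromArray(samples).pipeThrough(uniqueConsecutiveSketch<number>())
```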

uniqueConsecutive on unordered input lets the same key pass through multiple times — one chunk per non-contiguous run. That is intentional; use unique if you need global deduplication.

Both operators support the curried form:

const dedupById = unique<Event, string>((e) => e.id)
const a = dedupById(streamA)
const b = dedupById(streamB) // each call gets an independent Set

const dropRepeats = uniqueConsecutive<number>()

If keyFn or the upstream errors, the resulting stream errors.

Running state: scan (v0.9.0+)

scan is the missing sibling of reduce: instead of consuming the stream and resolving with the final accumulator, it emits the accumulator after each chunk. For N input chunks, exactly N chunks come out — the seed is never emitted. This is the idiomatic operator for running totals, sliding stats, watermarks, and any evolving state in a streaming pipeline.

// Running total: emits 1, 3, 6, 10, 15.
const totals = pipe(numbers, scan((acc, n) => acc + n, 0))

// Sliding stats with an immutable accumulator.
const stats = pipe(
  events,
  scan(
    (acc, e) => ({
      count: acc.count + 1,
      sum: acc.sum + e.value,
      avg: (acc.sum + e.value) / (acc.count + 1),
    }),
    { count: 0, sum: 0, avg: 0 },
  ),
)

// Async reducer (state that depends on I/O).
const enriched = pipe(
  events,
  scan(async (acc, e) => ({ ...acc, last: await dbLookup(e.id) }), { last: null }),
)
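Mechanically, scan is a transform that keeps one accumulator and enqueues it after every chunk. `scanSketch` is hypothetical (no streamfu import) and shows the two rules above: N chunks in, N chunks out, and the seed is never emitted, so an empty input produces nothing. Each TransformStream instance owns its accumulator, which matches the note about independent state per curried call.

```typescript
// Helper: a ReadableStream over an in-memory array.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item)
      controller.close()
    },
  })
}

// Helper: drain a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader()
  const out: T[] = []
  while (true) {
    const result = await reader.read()
    if (result.done) return out
    out.push(result.value)
  }
}

// Hypothetical sketch: emit the running accumulator after each chunk.
function scanSketch<T, Acc>(
  reducer: (acc: Acc, chunk: T) => Acc | Promise<Acc>,
  init: Acc,
): TransformStream<T, Acc> {
  let acc = init // private to this TransformStream instance
  return new TransformStream<T, Acc>({
    async transform(chunk, controller) {
      acc = await reducer(acc, chunk)
      controller.enqueue(acc) // one output per input; `init` itself is never enqueued
    },
  })
}

const runningTotals = fromArray([1, 2, 3, 4, 5]).pipeThrough(scanSketch((acc: number, n: number) => acc + n, 0))
const emptyTotals = fromArray<number>([]).pipeThrough(scanSketch((acc: number, n: number) => acc + n, 0))
```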

The seed is internal state, not a chunk. Empty streams produce zero emissions — init is never observed. If you want to see the starting state, prepend it explicitly:

const totalsWithSeed = concat(createReadable([0]), pipe(numbers, scan((acc, n) => acc + n, 0)))

Mutation is a footgun. Returning the same acc reference every iteration is allowed, but every emitted chunk then shares that reference. Anything that collects them (await list(...)) sees the final value in every position. Return a new value per iteration unless you have measured this matters.

The curried form composes naturally inside pipe:

const runningTotal = scan<number, number>((acc, n) => acc + n, 0)
const a = runningTotal(streamA) // each call has its own internal acc
const b = runningTotal(streamB)

If the reducer throws/rejects or the upstream errors, the resulting stream errors.


Find by predicate: find (v0.9.0+)

find is the streaming sibling of Array.prototype.find: it walks the stream chunk by chunk and resolves with the first chunk that satisfies a predicate, or undefined if none do. Unlike filter-based workarounds, it terminates as soon as the predicate is satisfied — the upstream is cancelled and no further chunks are pulled.

// First paid order — stops at the first match.
const paid = await find(orders, (o) => o.status === "paid")

// Predicate receives (chunk, index), like filter/map.
const atEvenIndex = await find(stream, (chunk, i) => i % 2 === 0 && chunk.kind === "x")

// Async predicate — useful when matching depends on I/O.
const premium = await find(users, async (u) => await isPremium(u.id))

// Combine with `??` for a default.
const target = (await find(stream, isMatch)) ?? defaultValue

undefined is ambiguous when the stream emits undefined legitimately. This is the same trade-off as Array.prototype.find: the caller cannot distinguish "found, value is undefined" from "not found". If that matters, record the match yourself inside the predicate (for example, set a found flag before returning true) instead of relying on the resolved value.

If the predicate throws/rejects or the upstream errors before a match, the returned promise rejects with that error.
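Early termination is what separates find from filtering and taking the first result. `findSketch` is hypothetical (no streamfu import): it reads one chunk at a time, awaits the predicate, and on the first match calls reader.cancel() so the source stops producing. The demo source is infinite and counts its pulls, which makes the early stop observable.

```typescript
// Hypothetical sketch: resolve with the first matching chunk, then stop the source.
async function findSketch<T>(
  stream: ReadableStream<T>,
  predicate: (chunk: T, index: number) => boolean | Promise<boolean>,
): Promise<T | undefined> {
  const reader = stream.getReader()
  let index = 0
  while (true) {
    const result = await reader.read()
    if (result.done) return undefined // exhausted without a match
    if (await predicate(result.value, index++)) {
      await reader.cancel() // no further chunks are pulled
      return result.value
    }
  }
}

type Order = { id: number; status: string }
let pulled = 0
let cancelled = false
// Infinite source; highWaterMark 0 means it only produces when a read is pending.
const orders = new ReadableStream<Order>(
  {
    pull(controller) {
      pulled++
      controller.enqueue({ id: pulled, status: pulled === 3 ? "paid" : "pending" })
    },
    cancel() {
      cancelled = true
    },
  },
  { highWaterMark: 0 },
)

const firstPaid = findSketch(orders, (o) => o.status === "paid")
```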

find is a consumer (returns Promise<T | undefined>); like every other consumer (reduce, some, every, at, includes, indexOf, forEach, count, join, toBuffer, groupBy, list), it is intentionally not curried.


reduce without an initial value (v0.9.0+)

reduce(stream, fn) mirrors Array.prototype.reduce(fn): the first chunk becomes the accumulator and fn is invoked from the second chunk with index = 1. Useful when the operation has no natural neutral element — Math.max, Math.min, "pick the most recent record", string concatenation when the empty string would change semantics.

import { createReadable, reduce } from "@sgmonda/streamfu"

// No need to remember -Infinity / Infinity:
const max = await reduce(createReadable([4, 1, 9, 3]), (a, b) => Math.max(a, b)) // 9
const min = await reduce(createReadable([4, 1, 9, 3]), (a, b) => Math.min(a, b)) // 1

// The seeded overload is unchanged — pick the form that matches the operation.
const sum = await reduce(createReadable([1, 2, 3]), (a, b) => a + b) // 6
const sumWithBias = await reduce(createReadable([1, 2, 3]), (a, b) => a + b, 100) // 106

Empty stream. Calling reduce(s, fn) on an empty stream rejects the returned promise with TypeError("reduce of empty stream with no initial value"), matching Array.prototype.reduce's behaviour on an empty array. The rejection is asynchronous, because the stream can only be known to be empty after a pull. If your stream may be empty, either pass an explicit initialValue or handle the rejection with .catch() or a try/catch around the await.
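Both rules (the first chunk seeds the accumulator, an empty stream rejects with TypeError) fit in a few lines on a raw reader. `reduceSketch` is hypothetical, handles synchronous reducers only, and does not import streamfu; it reuses the error message quoted above.

```typescript
// Helper: a ReadableStream over an in-memory array.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item)
      controller.close()
    },
  })
}

// Hypothetical sketch of reduce without an initial value.
async function reduceSketch<T>(
  stream: ReadableStream<T>,
  fn: (acc: T, chunk: T, index: number) => T,
): Promise<T> {
  const reader = stream.getReader()
  const first = await reader.read()
  if (first.done) throw new TypeError("reduce of empty stream with no initial value")
  let acc = first.value // the first chunk becomes the accumulator
  let index = 1 // so fn is first invoked for the second chunk
  while (true) {
    const next = await reader.read()
    if (next.done) return acc
    acc = fn(acc, next.value, index++)
  }
}

const largest = reduceSketch(fromArray([4, 1, 9, 3]), (a, b) => Math.max(a, b))

// Reducing an empty stream without a seed should reject with a TypeError.
const emptyOutcome = reduceSketch(fromArray<number>([]), (a, b) => a + b).then(
  () => "resolved",
  (error: unknown) => (error instanceof TypeError ? "TypeError" : "other error"),
)
```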

When to keep the seeded overload. Prefer reduce(s, fn, init) when the accumulator is a different type than the chunks (e.g. building a Map from items), when the seed is not a neutral element (such as the 100 bias in sumWithBias above; 0 for a sum is neutral and could be dropped), or when the stream is allowed to be empty and the seeded result is meaningful.


NDJSON ingest: ndjson (v0.9.0+)

ndjson parses Newline-Delimited JSON (also known as JSON Lines / .jsonl) in a single call. It is what you actually want when the recipe "split by lines, then JSON.parse each" comes up — three real-world details that the naïve composition gets wrong are handled by default:

  1. Empty and whitespace-only lines are skipped. A trailing \n at the end of the file no longer explodes the pipeline with JSON.parse("").
  2. A leading UTF-8 BOM is stripped. Files exported from Windows, Excel, or some log appenders parse without manual cleanup.
  3. Parse failures error the stream with NdjsonParseError, carrying line (1-indexed, counting empty lines so it matches the editor) and raw (the offending line). The original SyntaxError is preserved as cause.

Accepts the same input as lines: a stream of strings or Uint8Array chunks (UTF-8). Lines split across chunks parse correctly.
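All of the behaviour above fits in one TransformStream, which is also why a fused ndjson costs a single hop. The sketch is hypothetical: `ndjsonSketch` and `NdjsonLineError` are illustrative names (the latter is not streamfu's NdjsonParseError), nothing is imported from streamfu, and the original SyntaxError's message is folded into the new error rather than kept as cause.

```typescript
// Helper: a ReadableStream over an in-memory array.
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item)
      controller.close()
    },
  })
}

// Helper: drain a stream into an array.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader()
  const out: T[] = []
  while (true) {
    const result = await reader.read()
    if (result.done) return out
    out.push(result.value)
  }
}

// Illustrative error type carrying the 1-indexed line number and raw text.
class NdjsonLineError extends Error {
  line: number
  raw: string
  constructor(line: number, raw: string, message: string) {
    super(`NDJSON parse error on line ${line}: ${message}`)
    this.name = "NdjsonLineError"
    this.line = line
    this.raw = raw
  }
}

// Hypothetical sketch: split lines across chunks, strip a BOM, skip blank lines,
// and JSON.parse each remaining line, all in one stage.
function ndjsonSketch<T>(): TransformStream<string | Uint8Array, T> {
  const decoder = new TextDecoder() // UTF-8; drops a leading BOM in byte input
  let buffer = ""
  let lineNumber = 0

  const handleLine = (rawLine: string, controller: { enqueue(value: T): void }) => {
    lineNumber++ // blank lines count too, so numbers match an editor
    const line = lineNumber === 1 && rawLine.startsWith("\uFEFF") ? rawLine.slice(1) : rawLine
    if (line.trim() === "") return // skip empty and whitespace-only lines
    let value: T
    try {
      value = JSON.parse(line)
    } catch (error) {
      throw new NdjsonLineError(lineNumber, line, (error as Error).message)
    }
    controller.enqueue(value)
  }

  return new TransformStream<string | Uint8Array, T>({
    transform(chunk, controller) {
      // `stream: true` keeps a multi-byte character split across chunks intact.
      buffer += typeof chunk === "string" ? chunk : decoder.decode(chunk, { stream: true })
      const parts = buffer.split("\n")
      buffer = parts.pop() ?? "" // the last piece may be an incomplete line
      for (const part of parts) handleLine(part, controller)
    },
    flush(controller) {
      buffer += decoder.decode() // flush any bytes still held by the decoder
      if (buffer !== "") handleLine(buffer, controller) // final line without "\n"
    },
  })
}

// Chunk boundaries fall mid-line; the input has a BOM, blank lines and a trailing "\n".
const parsed = fromArray<string | Uint8Array>([
  '\uFEFF{"id":1}\n{"i',
  'd":2}\n\n   \n',
  new TextEncoder().encode('{"id":3}\n'),
]).pipeThrough(ndjsonSketch<{ id: number }>())

const brokenOutcome = collect(fromArray(['{"id":1}\n\nnot json\n']).pipeThrough(ndjsonSketch())).then(
  () => "parsed",
  (error: unknown) => (error instanceof NdjsonLineError ? `line ${error.line}: ${error.raw}` : "other error"),
)
```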

import { ndjson } from "@sgmonda/streamfu"

// Direct form — typed parsing in one call.
const response = await fetch("https://api.example.com/events.ndjson")
for await (const event of ndjson<Event>(response.body!)) handle(event)

// Curried form, composes inside `pipe`.
const errors = pipe(
  fileStream,
  ndjson<LogEvent>(),
  filter((e) => e.severity === "error"),
)
await forEach(errors, sendToSink) // forEach is a consumer: it takes the stream as its first argument

// Fail-fast with a final fallback chunk — `catchError` runs once and closes
// the stream (it does not resume reading). Useful for "give me a sentinel if
// the file is corrupt at all", not for "skip every bad line".
import { catchError, ndjson, NdjsonParseError } from "@sgmonda/streamfu"
const withFallback = catchError(ndjson<Event>(fileStream), (err) => {
  if (err instanceof NdjsonParseError) {
    console.warn(`Stopped at line ${err.line}: ${err.raw}`)
    return { kind: "halted", reason: err.message } satisfies Event
  }
  throw err
})

// Tolerate many bad lines and keep going — drop down to `lines + mapSettled`
// for per-chunk recovery. You lose the typed `NdjsonParseError` and BOM
// stripping, so add them explicitly if needed.
const robust = pipe(
  fileStream,
  lines,
  filter((line) => line.trim().length > 0),
  mapSettled<string, Event>((line) => JSON.parse(line)),
  filter((r) => r.status === "fulfilled"),
  map((r) => (r as { status: "fulfilled"; value: Event }).value),
)

Strict by default. A single invalid line errors the whole stream. Composing with catchError gives you a final-fallback hook but does not resume the stream (see the Error recovery section above). For per-line tolerance, drop down to the lines + mapSettled recipe shown above. Skipping empty lines and stripping the BOM are not configurable in ndjson; both are always the right behaviour for real NDJSON.
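The lines + mapSettled recipe depends on a step that turns each outcome into a tagged result instead of erroring the stream. Below is a minimal sketch on plain Web Streams. `settleEach` and `Settled` are hypothetical names. Unlike mapSettled, the sketch processes one chunk at a time and takes no options.

```typescript
// Tagged result, shaped like the entries returned by Promise.allSettled.
type Settled<T> =
  | { status: "fulfilled"; value: T }
  | { status: "rejected"; reason: unknown }

// Hypothetical per-chunk recovery step: a failing chunk becomes a "rejected"
// entry and the stream keeps flowing, so later lines are still processed.
function settleEach<I, O>(fn: (chunk: I) => O | Promise<O>): TransformStream<I, Settled<O>> {
  return new TransformStream<I, Settled<O>>({
    async transform(chunk, controller) {
      let result: Settled<O>
      try {
        result = { status: "fulfilled", value: await fn(chunk) }
      } catch (reason) {
        result = { status: "rejected", reason }
      }
      controller.enqueue(result)
    },
  })
}
```

Filtering on status === "fulfilled" afterwards keeps only the lines that parsed, which is what the recipe above does.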


CSV parsing: csv (v0.9.0+)

csv parses RFC 4180 CSV into a stream of objects or arrays. It handles quoted fields, doubled-quote escapes (""), \n inside quoted fields, a configurable delimiter and quote character, and a leading UTF-8 BOM. It is what you actually want when the recipe "split records by lines, then split fields by commas" comes up: every detail that String.prototype.split(",") gets wrong is handled by default.

Three modes, picked by opts.header:

import { csv, filter, forEach, pipe } from "@sgmonda/streamfu"

// 1) Auto header (default) — first row becomes the keys, subsequent rows are
// Record<string, string>.
for await (const row of csv(fileStream)) save(row)
// row: Record<string, string>

// 2) Explicit header — keys come from `opts.header` (a literal-typed array
// gives you a narrowed Record). The first row is NOT consumed.
const users = csv(fileStream, { header: ["name", "age", "email"] as const })
for await (const u of users) {
  // u: Record<"name" | "age" | "email", string>
  saveUser({ ...u, age: Number(u.age) })
}

// 3) Arrays mode — no header mapping. Each emission is a string[].
for await (const cells of csv(fileStream, { header: false })) {
  // cells: string[]
}

// Curried, with a European delimiter.
await pipe(
  fileStream,
  csv({ delimiter: ";" }),
  filter((row) => row.country === "ES"),
  forEach(insertDb),
)

Parsing errors surface as CsvParseError, carrying the 1-indexed line (counting header and empty rows), the raw record, and — when applicable — the column where parsing bailed or the expected vs actual column count for cardinality mismatches:

import { csv, CsvParseError } from "@sgmonda/streamfu"

try {
  for await (const row of csv(fileStream)) save(row)
} catch (e) {
  if (e instanceof CsvParseError) {
    console.error(`CSV: line ${e.line}: ${e.message} (raw: ${e.raw})`)
  } else {
    throw e
  }
}

For per-row tolerance — keep parsing past malformed records — drop down to csvLines + mapSettled:

import { csvLines, filter, map, mapSettled, pipe } from "@sgmonda/streamfu"

const robust = pipe(
  fileStream,
  csvLines,
  mapSettled<string, MyRow>(parseOneRow), // your row-level parser
  filter((r) => r.status === "fulfilled"),
  map((r) => (r as { status: "fulfilled"; value: MyRow }).value),
)

Caveats. Whitespace around unquoted fields is preserved (not trimmed): "a, b ,c" parses as ["a", " b ", "c"]. Every cell is a string: csv never casts to numbers or booleans, so do that explicitly in a map. Legacy Mac line endings (\r alone) are not supported (\n and \r\n are). Cardinality mismatches in object modes throw CsvParseError; pass header: false to opt out of that check.
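The field-level rules above fit in a small state machine. This sketch handles one already-assembled record, the kind of string csvLines hands you with any \n inside quotes kept. It is not streamfu's parser. `splitRecord` is a hypothetical name, and it throws a plain Error rather than CsvParseError.

```typescript
// Split one CSV record into fields, RFC 4180 style: quoted fields may contain
// the delimiter or newlines, "" inside quotes is a literal quote, and
// whitespace around unquoted fields is preserved as-is.
function splitRecord(record: string, delimiter = ",", quote = '"'): string[] {
  const fields: string[] = []
  let field = ""
  let inQuotes = false
  for (let i = 0; i < record.length; i++) {
    const ch = record[i]
    if (inQuotes) {
      if (ch === quote) {
        if (record[i + 1] === quote) {
          field += quote // doubled quote -> one literal quote
          i++
        } else {
          inQuotes = false // closing quote
        }
      } else {
        field += ch // delimiters and \n are ordinary characters inside quotes
      }
    } else if (ch === quote && field === "") {
      inQuotes = true // opening quote at the start of a field
    } else if (ch === delimiter) {
      fields.push(field)
      field = ""
    } else {
      field += ch // no trimming: " b " stays " b "
    }
  }
  if (inQuotes) throw new Error("Unterminated quoted field")
  fields.push(field)
  return fields
}
```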


Sources for web pipelines: fromEvents / fromPromise / fromResponse (v0.9.0+)

Three sources that turn the most common web inputs — DOM/WebSocket/EventSource events, deferred promises, and fetch responses — into a ReadableStream you can drop into pipe without boilerplate. They sit alongside createReadable, range, iterate, and words; like those, they are not curried (they take an external input, not an upstream stream).

fromEvents(target, type, opts?)

Bridges any EventTarget (DOM elements, WebSocket, EventSource, MessagePort, AbortSignal, BroadcastChannel, …) into ReadableStream<E>. Cancelling the stream, aborting opts.signal, or matching opts.once all detach every registered listener, so there are no leaks regardless of which path closes the pipeline.

import { forEach, fromEvents, map, pipe } from "@sgmonda/streamfu"

// 1) DOM clicks — coordinated abort via AbortSignal.
const ac = new AbortController()
pipe(
  fromEvents<MouseEvent>(button, "click", { signal: ac.signal }),
  forEach((e) => console.log(e.clientX)),
)
// later: ac.abort() — closes the stream and removes the listener in one step.

// 2) WebSocket messages with typed payload.
const ws = new WebSocket("wss://example.com/events")
const events = pipe(
  fromEvents<MessageEvent<string>>(ws, "message"),
  map((e) => JSON.parse(e.data) as Event),
)

// 3) Coordinated page-close handler — listen to multiple event types at once,
// then close after the first one fires.
pipe(
  fromEvents(window, ["pagehide", "beforeunload"], { once: true }),
  forEach(() => flushPendingWrites()),
)

Buffering. fromEvents uses the underlying ReadableStream queue with no upper bound: events are push-based and the queue grows if the consumer falls behind. To bound memory, rate-limit high-frequency sources (mousemove, scroll, pointermove) downstream with throttle or debounce (see Time-based operators below), or cap them with slice(0, N).
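A small bridge built only on standard APIs shows both the detach-on-every-exit behaviour and the unbounded queue described above. This is a sketch, not streamfu's fromEvents. `eventStream` is a hypothetical name; it accepts a single event type and leaves out the once option.

```typescript
// Hypothetical EventTarget -> ReadableStream bridge. Cancelling the stream or
// aborting `signal` removes every listener it registered.
function eventStream<E extends Event>(
  target: EventTarget,
  type: string,
  signal?: AbortSignal,
): ReadableStream<E> {
  let controller: ReadableStreamDefaultController<E> | undefined
  // Push-based: enqueue ignores desiredSize, so the queue is unbounded.
  const onEvent = (event: Event) => controller?.enqueue(event as E)
  const detach = () => {
    target.removeEventListener(type, onEvent)
    signal?.removeEventListener("abort", onAbort)
  }
  const onAbort = () => {
    detach()
    controller?.close()
  }
  return new ReadableStream<E>({
    start(c) {
      controller = c
      if (signal?.aborted) {
        c.close()
        return
      }
      target.addEventListener(type, onEvent)
      signal?.addEventListener("abort", onAbort)
    },
    cancel: detach, // the consumer stopped reading
  })
}
```

If a listener were left attached after the stream closed, the next event would call enqueue on a closed stream and throw. Detaching on both paths avoids that.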

Node EventEmitter is not directly supported — the surface uses the standard EventTarget interface. Bridge an emitter with events.on and createReadable:

import { on } from "node:events"
import { createReadable } from "@sgmonda/streamfu"
const data = createReadable(on(emitter, "data")) // async-iterable adapter; each chunk is the listener's argument array, e.g. [data]

fromPromise(promise | () => promise)

Turns a Promise<T> into a one-emission stream. Resolution emits the value and closes; rejection errors the stream without wrapping the reason. Two overloads:

import { concat, fromPromise } from "@sgmonda/streamfu"

// 1) Eager — natural for the common case.
const user = fromPromise(fetchUser(id))
for await (const u of user) render(u)

// 2) Lazy factory: the promise isn't created at this call site; fromPromise
// invokes the factory itself when the stream starts. With `concat`, the cached
// value is served first and `liveStream` follows once it has drained.
const stream = concat(
  fromPromise(() => fetchCached()), // not called until start() runs
  liveStream,
)

The promise cannot be cancelled. Cancelling the returned stream stops the stream from emitting, but JavaScript has no general-purpose promise cancellation, so the underlying work runs to completion. For HTTP requests, pass an AbortSignal to fetch itself — that cancels the request — and then wrap the resulting Response with fromResponse.
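The contract above (emit-and-close on resolve, unwrapped rejection, an optional lazy factory) can be sketched with a pull-based ReadableStream. This is illustrative only. `promiseStream` is a hypothetical name, and the sketch defers the factory until the first read, which may be later than streamfu's own timing.

```typescript
// Hypothetical one-emission source: resolve -> emit the value and close,
// reject -> error the stream with the original reason (no wrapping).
function promiseStream<T>(input: Promise<T> | (() => Promise<T>)): ReadableStream<T> {
  // Mark an eager promise as handled so a rejection isn't reported as
  // "unhandled" before anyone reads; the await below still observes it.
  if (typeof input !== "function") input.catch(() => {})
  return new ReadableStream<T>(
    {
      async pull(controller) {
        // With highWaterMark 0, pull only runs once a consumer reads,
        // so a factory is not invoked before then.
        const promise = typeof input === "function" ? input() : input
        try {
          controller.enqueue(await promise)
          controller.close()
        } catch (reason) {
          controller.error(reason)
        }
      },
      // No cancel hook: there is nothing to abort on a bare promise.
    },
    { highWaterMark: 0 },
  )
}
```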

fromResponse(response, opts?)

Returns response.body as ReadableStream<Uint8Array>, but with two safe defaults that catch the most common footguns of raw response.body!:

  1. !response.ok throws synchronously with a FromResponseError. This catches the case where an HTML error page from a 500 would otherwise be parsed downstream as NDJSON / CSV and fail much later, with a confusing message.
  2. response.body === null throws synchronously (e.g. 204 No Content, HEAD responses).

Both checks are opt-out via throwOnError: false / throwOnEmpty: false.

import { csv, forEach, fromResponse, FromResponseError, ndjson, pipe } from "@sgmonda/streamfu"

// 1) fetch → NDJSON.
const events = ndjson<Event>(fromResponse(await fetch("/events.ndjson")))
for await (const ev of events) handle(ev)

// 2) fetch → CSV with explicit HTTP error handling.
try {
  await pipe(
    fromResponse(await fetch("/export.csv")),
    csv(),
    forEach(saveRow),
  )
} catch (e) {
  if (e instanceof FromResponseError) {
    console.error(`HTTP ${e.status} ${e.statusText} for ${e.url}`)
  } else {
    throw e
  }
}

// 3) Consume an error body anyway (e.g. JSON error envelopes that ship with 4xx).
const errorBody = fromResponse(await fetch("/api/data"), { throwOnError: false })

Cancellation. Cancelling the returned stream closes the underlying body connection (the runtime aborts the download). To cancel the request itself, pass an AbortSignal to fetch; the body cancellation cascade then completes the cleanup.

Synchronous validation. FromResponseError is thrown the moment you call fromResponse, not on the first read(). This makes try / catch ergonomic on the line where you build the source and surfaces HTTP failures before the pipeline runs.
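Both default checks can be sketched as a plain synchronous function, which also shows why they fire on the call rather than on the first read(). `bodyOf` and `HttpStatusError` are hypothetical stand-ins for fromResponse and FromResponseError. Returning an empty stream when the body is null and throwOnEmpty is false is an assumption of this sketch.

```typescript
// Hypothetical stand-in for FromResponseError.
class HttpStatusError extends Error {
  readonly status: number
  readonly statusText: string
  readonly url: string
  constructor(status: number, statusText: string, url: string) {
    super(`HTTP ${status} ${statusText} for ${url || "<unknown url>"}`)
    this.name = "HttpStatusError"
    this.status = status
    this.statusText = statusText
    this.url = url
  }
}

// Validate synchronously, then hand back the raw body stream.
function bodyOf(
  response: Response,
  { throwOnError = true, throwOnEmpty = true } = {},
): ReadableStream<Uint8Array> {
  if (throwOnError && !response.ok) {
    // e.g. an HTML 500 page that would otherwise reach an NDJSON/CSV parser
    throw new HttpStatusError(response.status, response.statusText, response.url)
  }
  if (response.body === null) {
    // e.g. 204 No Content or HEAD responses
    if (throwOnEmpty) throw new TypeError("Response has no body")
    return new ReadableStream<Uint8Array>({ start: (c) => c.close() }) // assumption: empty stream
  }
  return response.body
}
```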


Debugging pipelines: inspect (v0.9.0+)

inspect(stream, label?) logs every chunk as it flows through and forwards it untouched — a tap with a consistent, ready-made format. It exists to answer "what's actually going through this pipe?" without hand-rolling a console.log at each stage.

Debug-only — not for production. Each chunk is serialized and written to a sink, which adds overhead and log noise. The default sink is console.error (stderr), so it never pollutes data written to stdout.

import { filter, inspect, map, ndjson, pipe } from "@sgmonda/streamfu"

const users = pipe(
  response.body!,
  ndjson,
  inspect("parsed"),
  map((u) => ({ ...u, name: u.name.trim() })),
  inspect("mapped"),
  filter((u) => u.active),
  inspect("active"),
)

// stderr — `@t` is milliseconds since the operator was created, so you can read inter-chunk cadence:
// [parsed] #0 @0.0ms {"id":1,"name":" Ada "}
// [mapped] #0 @0.4ms {"id":1,"name":"Ada"}
// [active] #0 @0.5ms {"id":1,"name":"Ada","active":true}

The default line is [label] #N @t.tms value (the [label] prefix is dropped when no label is given). Objects are serialized with JSON.stringify, falling back to String(value) for non-serializable values (e.g. circular references). For full control, pass options instead of a bare label:

// Send lines somewhere other than stderr, or format them yourself.
inspect(stream, { label: "rows", logger: (line) => myLogger.debug(line) })
inspect(stream, { formatter: (chunk, i) => `${i}: ${chunk.id}` })

The output format is @experimental — it may change before v1.0. The behavior (passthrough, chunks untouched) is stable. See STABILITY.md.
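A tap with the default line format described above takes only a few lines on top of TransformStream. This sketch approximates that format and the JSON.stringify / String fallback. `inspectSketch` is a hypothetical name, and because streamfu's output is @experimental, the exact text may differ.

```typescript
// Hypothetical debug tap: logs "[label] #N @t.tms value" per chunk and forwards
// every chunk untouched. `t` is milliseconds since the step was created.
function inspectSketch<T>(
  label?: string,
  logger: (line: string) => void = (line) => console.error(line),
): TransformStream<T, T> {
  const createdAt = performance.now()
  let index = 0
  const show = (value: unknown): string => {
    try {
      const json = JSON.stringify(value)
      return typeof json === "string" ? json : String(value) // e.g. undefined, functions
    } catch {
      return String(value) // e.g. circular references
    }
  }
  return new TransformStream<T, T>({
    transform(chunk, controller) {
      const prefix = label ? `[${label}] ` : "" // prefix dropped when no label is given
      const elapsed = (performance.now() - createdAt).toFixed(1)
      logger(`${prefix}#${index++} @${elapsed}ms ${show(chunk)}`)
      controller.enqueue(chunk) // passthrough: the chunk itself is not modified
    },
  })
}
```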

Reading stack traces

When a callback throws inside an operator, streamfu propagates the error with the stack intact — it does not wrap or swallow it. Two frames matter:

Error: Cannot read properties of undefined (reading 'name')
    at /my-app/index.ts:4:30      ← your callback, exactly where it threw
    at Object.transform (…/streamfu/src/map.ts:282)  ← the operator that ran it

Your callback keeps its own frame (file and line), and the streamfu frame points to the operator's source file (src/map.ts for map, src/filter.ts for filter, …), so you can tell which operator was running. The deeper TransformStreamDefaultController frames belong to the runtime's Streams implementation, not to streamfu.


Branching and memory: branch (v0.9.0+)

branch(stream, n) splits a stream into n independent copies. Under the hood it chains ReadableStream#tee(), which inherits a memory footgun from the spec: if one branch consumes slower than the others, its queue grows without limit. For pipelines that touch large files, infinite event streams, or long-running fetch bodies, that means O(stream) memory, silently.
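The chained-tee() approach described above can be sketched in a few lines on the standard API. `teeN` is a hypothetical helper, not streamfu's source, and it has no maxBuffer option. Each tee() call yields two branches with unbounded internal queues, which is exactly where the memory issue comes from.

```typescript
// Hypothetical n-way split by chaining tee(): each call peels off one branch
// and tees the remaining half again. Every tee() queue is unbounded, so a branch
// that is read slowly (or later) buffers whatever the other branches have pulled.
function teeN<T>(stream: ReadableStream<T>, n: number): ReadableStream<T>[] {
  if (!Number.isInteger(n) || n < 1) throw new RangeError("n must be a positive integer")
  const branches: ReadableStream<T>[] = []
  let rest = stream
  for (let i = 0; i < n - 1; i++) {
    const [head, tail] = rest.tee()
    branches.push(head)
    rest = tail
  }
  branches.push(rest) // the last remaining half is the final branch
  return branches
}
```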

There are two ways to keep memory bounded:

1. Consume every branch in parallel

When the branches do roughly the same amount of work, Promise.all is enough — every branch drains at a similar rate and no single one falls far behind:

import { branch, reduce } from "@sgmonda/streamfu"

const [forSum, forMax] = branch(numbers, 2)
const [sum, max] = await Promise.all([
  reduce(forSum, (a, b) => a + b, 0),
  reduce(forMax, (a, b) => Math.max(a, b), -Infinity),
])

The trap is the sequential pattern: await reduce(forSum, …) then await reduce(forMax, …) drains every chunk of forSum while the demultiplexer buffers all of them for forMax. Cheap for arrays of 10 numbers, catastrophic for a million-row CSV.

2. Cap each branch's queue with maxBuffer

When the branches are intrinsically asymmetric (e.g. one writes to a database, the other counts in memory), pass { maxBuffer } to make the slow branch throttle the rest instead of letting memory grow:

import { branch, forEach, mapAsync, reduce } from "@sgmonda/streamfu"

const [forDb, forStats] = branch(records, 2, { maxBuffer: 1024 })

await Promise.all([
  forEach(mapAsync(forDb, writeRow, { concurrency: 8 }), () => {}),
  reduce(forStats, accumulate, initial),
])

With maxBuffer: 1024 chunks, neither branch can ever queue more than 1024 records ahead of the other. If the database side stalls, the demultiplexer waits to pull the next chunk from upstream — the stats side experiences backpressure but does not lose data and does not throw. The slowest active branch sets the pace for the group.

Default is unbounded. Omitting maxBuffer keeps the historical behaviour (chained tee(), zero overhead). The opt-in form is the only way to get the bounded version today; the default may tighten in a future major release once benchmarks settle the right size.

No drop policies. maxBuffer only does backpressure. If a future release adds onOverflow: "throw" | "drop-oldest" | "drop-newest", the current { maxBuffer } semantics remain the implicit default ("block").

Cross-link: the same footgun shows up in the branch + filter alternative discussed in the partition caveat above — there too, branch(records, 2, { maxBuffer }) makes the pattern safe.


Time-based operators: delay / throttle / debounce / bufferTime (v0.9.0+)

streamfu ships a small, focused set of time-aware operators for real-world pipelines — SSE, WebSockets, UI events, log batching — without turning into RxJS (no Subjects, schedulers, or hot/cold streams; see STABILITY.md and the scope note above). All four take a duration in milliseconds and have a curried form.

import { bufferTime, debounce, delay, forEach, pipe, throttle } from "@sgmonda/streamfu"

// forEach is the terminal consumer that keeps each pipeline flowing.

// debounce: emit only after the user stops typing for 300ms
pipe(searchInput, debounce(300), forEach((q) => search(q)))

// throttle: at most one scroll event every 100ms (leading edge)
pipe(scrollEvents, throttle(100), forEach(render))

// bufferTime: flush the log lines collected each second as one array
pipe(logLines, bufferTime(1000), forEach((batch) => ship(batch)))

// delay: space out retries by 200ms each
pipe(retryJobs, delay(200), forEach(run))

Semantics, chosen for predictability:

Operator When it emits
delay(ms) Each chunk ms after it arrives. Order and backpressure are preserved.
throttle(ms) The first chunk immediately, then drops chunks for ms, then repeats (leading edge — no trailing emit).
debounce(ms) A chunk only once ms passes with no newer chunk; a pending chunk is flushed when the upstream closes.
bufferTime(ms) One array per ms window. Empty windows are skipped (no [] while idle); a partial buffer is flushed on close.

Backpressure caveat. throttle, debounce, and bufferTime drain the upstream eagerly (they must read chunks to drop them, reset the timer, or fill a window), so they do not apply backpressure. For an unbounded, fast source feeding a slow consumer, bufferTime can accumulate an arbitrarily large batch within a single window. delay is the exception: it preserves backpressure (the next chunk is not pulled until the current one is emitted). Cancelling any of them clears its pending timer and cancels the upstream.
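The debounce row of the table (trailing edge, with a pending chunk flushed when the upstream closes) and its eager draining can be sketched as a TransformStream. `debounceSketch` is a hypothetical name. Unlike the real operator, this sketch does not clear its timer when the downstream cancels.

```typescript
// Hypothetical trailing-edge debounce: a chunk is emitted only after `ms`
// of silence; a chunk still pending when the upstream closes is flushed.
function debounceSketch<T>(ms: number): TransformStream<T, T> {
  let pending: { value: T } | undefined // wrapper so an `undefined` chunk still counts
  let timer: ReturnType<typeof setTimeout> | undefined
  return new TransformStream<T, T>({
    transform(chunk, controller) {
      // Every chunk is read just to reset the timer, which is why debounce
      // drains its upstream eagerly instead of applying backpressure.
      pending = { value: chunk }
      if (timer !== undefined) clearTimeout(timer)
      timer = setTimeout(() => {
        timer = undefined
        if (pending) controller.enqueue(pending.value)
        pending = undefined
      }, ms)
    },
    flush(controller) {
      if (timer !== undefined) clearTimeout(timer)
      if (pending) controller.enqueue(pending.value) // flush on upstream close
      pending = undefined
    },
  })
}
```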


Draining to a sink: pipeTo (v0.9.0+)

pipeTo(readable, ...transforms?, writable) runs a source through any number of optional transforms and drains the result into a WritableStream, resolving when the flow completes. It's the terminal counterpart of pipe: a variadic streamfu equivalent of ReadableStream.prototype.pipeTo, with composable middle steps:

import { createWritable, filter, fromResponse, mapAsync, ndjson, pipeTo } from "@sgmonda/streamfu"

await pipeTo(
  fromResponse(res),
  ndjson<Row>(),
  filter((r) => r.active),
  mapAsync(enrich, { concurrency: 8 }),
  createWritable(persist),
)

Errors in the source, any transform, or the sink reject the returned promise. When the sink fails, the source is cancelled — no unbounded buffering upstream.
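When every middle step is already a TransformStream, the terminal behaviour above maps directly onto the standard pipeThrough / pipeTo pair. In this sketch, `drainTo` is a hypothetical name. Unlike streamfu's pipeTo, it accepts only TransformStream steps, not curried functions.

```typescript
// Hypothetical variadic drain: source -> zero or more TransformStreams -> sink.
// Resolves when the sink has received everything; rejects on any error.
async function drainTo<T>(
  source: ReadableStream<T>,
  ...rest: [...TransformStream<any, any>[], WritableStream<any>]
): Promise<void> {
  const sink = rest[rest.length - 1] as WritableStream<unknown>
  const transforms = rest.slice(0, -1) as TransformStream<unknown, unknown>[]
  let stream: ReadableStream<unknown> = source
  for (const t of transforms) stream = stream.pipeThrough(t)
  // Standard pipeTo semantics: if the sink errors, the chain is cancelled back
  // to the source (no unbounded buffering); a failing transform aborts the sink.
  await stream.pipeTo(sink)
}
```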

pipe vs pipeTo:

Operator Returns Role
pipe ReadableStream<…> Composition: the result can be branched, zipped, or read by another consumer.
pipeTo Promise<void> Terminal: the pipeline runs to completion into a WritableStream.

Writing your own operator: public types (v0.9.0+)

streamfu exports the same type aliases its built-in operators use, so a custom transform composes cleanly with pipe, map, reduce, scan and zip:

import {
  pipe,
  type StreamPipeFn,
  type StreamReducer,
  type StreamTransformer,
  type StreamTuple,
} from "@sgmonda/streamfu"

// (chunk, index) => value | Promise<value> — same shape that `map` accepts.
const double: StreamTransformer<number, number> = (n) => n * 2

// A pipe-able step: either a function or a TransformStream.
const onlyEven: StreamPipeFn<number, number> = (rs) =>
  rs.pipeThrough(
    new TransformStream({
      transform(n, c) {
        if (n % 2 === 0) c.enqueue(n)
      },
    }),
  )

// (acc, chunk, index) => acc | Promise<acc> — what `reduce` and `scan` expect.
const sum: StreamReducer<number, number> = (acc, n) => acc + n

// `zip` infers chunk tuples from a tuple of ReadableStreams via StreamTuple.
type Row = StreamTuple<readonly [ReadableStream<number>, ReadableStream<string>]>
// → [number, string]

These names are part of the Stable surface — see STABILITY.md.


API Reference

Stream creation

Function Description
createReadable(iterable) Create a stream from arrays, generators, sets, strings — any iterable
createWritable(fn | opts) Create a writable stream from a callback or { write, close?, abort?, highWaterMark? } config
range(min, max, step?) Generate a stream of numbers in a range
words(chars, length) Generate a stream of random strings
iterate(fn) Generate a stream by calling fn(i) until it returns null
fromEvents(target, type, opts?) Bridge an EventTarget into a stream (DOM, WebSocket, AbortSignal, …)
fromPromise(promise | () => p) Turn a promise (or lazy factory) into a one-emission stream
fromResponse(response, opts?) Read a fetch Response body with safe defaults (!ok / null-body throws)

Transformations (non-consuming)

These return a new stream — the original is not consumed.

Function Description
map(stream, ...fns) Transform each chunk (supports chaining multiple transforms)
mapAsync(stream, fn, opts?) Async transform with bounded concurrency for I/O
mapSettled(stream, fn, opts?) Like mapAsync but never aborts — emits tagged results
catchError(stream, handler) Stream-level recovery: fallback chunk + close on error
filter(stream, fn) Keep only chunks matching a predicate
partition(stream, fn) Split into [matching, nonMatching] in one pass
groupConsecutive(stream, keyFn) Emit [key, run] tuples for consecutive same-key chunks
unique(stream, keyFn?) Dedup globally by key (O(unique keys) memory, Set semantics)
uniqueConsecutive(stream, keyFn?) Drop consecutive same-key duplicates only (O(1) memory)
scan(stream, fn, init) Like reduce but emits the accumulator after each chunk
tap(stream, fn) Run a side-effect on each chunk, forward chunks untouched
inspect(stream, label?) Debug-only: log each chunk with a consistent format, forward untouched
flat(stream, depth?) Flatten a stream of arrays
flatMap(stream, fn) Map + flatten in one step
batch(stream, size) Group consecutive chunks into arrays of size
slice(stream, start, end?) Extract a portion of the stream
take(stream, n) Keep the first n chunks (alias of slice(s, 0, n))
drop(stream, n) Skip the first n chunks (alias of slice(s, n))
splice(stream, start, count, ...items) Remove/insert chunks at a position
concat(...sources) Concatenate streams in order; sources may be lazy factories/promises
merge(...streams) Interleave multiple streams as chunks arrive
zip(...streams) Combine streams into tuples; stops at the shortest stream
zipLongest(...streams, opts?) Like zip but runs to the longest; pads exhausted streams
lines(stream) Split a text or byte stream into lines (UTF-8)
csvLines(stream) Like lines(), but preserves \n inside quoted CSV fields
csv(stream, opts?) Parse CSV (RFC 4180) into objects or arrays with header opts
ndjson(stream) Parse NDJSON / JSON Lines into typed objects (line-aware)
pipe(stream, ...fns) Chain functions or TransformStreams
branch(stream, n, opts?) Split a stream into n copies; opts.maxBuffer caps per-branch memory
delay(stream, ms) Time-shift: emit each chunk ms after arrival (backpressure preserved)
throttle(stream, ms) Leading-edge rate limit: at most one chunk per ms window
debounce(stream, ms) Trailing edge: emit a chunk only after ms of silence
bufferTime(stream, ms) Group chunks into one array per ms window (empty windows skipped)

Consumers (consuming)

These consume the stream — it cannot be reused afterward.

Function Description
reduce(stream, fn, init?) Reduce to a single value; init optional (first chunk is the seed when omitted)
groupBy(stream, keyFn) Eager Map<K, T[]> keyed by keyFn (O(n) memory)
list(stream) Collect all chunks into an array
count(stream) Count the number of chunks
some(stream, fn) Check if any chunk matches
every(stream, fn) Check if all chunks match
forEach(stream, fn) Execute a function for each chunk
pipeTo(stream, ...fns, sink) Drain through optional transforms into a WritableStream; resolves when done
includes(stream, value) Check if a value exists in the stream
at(stream, index) Get the chunk at a specific index
indexOf(stream, value) Find the index of a value
find(stream, fn) First chunk matching a predicate, or undefined
join(stream, separator?) Join all chunks into a string
toBuffer(stream) Concatenate binary/text chunks into a Uint8Array

Consuming vs non-consuming

Rule of thumb: If it returns a ReadableStream, it's non-consuming. If it returns a Promise, it consumes the stream.

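Need to consume a stream multiple times? Use branch() first, then consume the branches in parallel so that neither one buffers the whole stream (see Branching and memory above):

const [forSum, forCount] = branch(numbers, 2)

const [sum, count] = await Promise.all([
  reduce(forSum, (a, b) => a + b, 0),
  reduce(forCount, (acc) => acc + 1, 0),
])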

tap vs forEach

These two look alike — both run a function for every chunk — but they sit on opposite sides of that rule:

  • tap = side-effect mid-pipe. Returns a ReadableStream, so the chunks keep flowing to the next stage. Use it to log, measure, or collect while the pipeline continues.
  • forEach = consumer terminal. Returns a Promise<void> and drains the stream, which can't be reused afterward. Use it as the final step when there's nothing left to transform.
// tap — side-effect, stream keeps going (returns ReadableStream)
const evens = pipe(
  numbers,
  tap((n, i) => console.log(`#${i}: ${n}`)), // logs, forwards untouched
  filter((n) => n % 2 === 0),
) // ← still a stream; pipe continues

// forEach — terminal, drains the stream (returns Promise<void>)
await pipe(
  numbers,
  filter((n) => n % 2 === 0),
  forEach((n) => console.log(n)), // ← nothing flows after this
)
Operator Returns Consumes? Fits in pipe? Use for
tap(stream, fn) ReadableStream No Yes (mid-pipe) Logging/metrics while continuing
forEach(stream, fn) Promise<void> Yes Yes (terminal) Final side-effect, no more stages
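The difference shown in the table comes down to return types. Here is a minimal sketch on standard APIs; `tapStep` and `forEachChunk` are hypothetical names, not streamfu's implementations. The tap step is a TransformStream, so it slots into pipeThrough and forwards every chunk. The forEach-style consumer reads through a reader, so it locks and drains the stream and hands back a Promise.

```typescript
// Mid-pipe side effect: returns a TransformStream, chunks keep flowing.
function tapStep<T>(fn: (chunk: T, index: number) => void): TransformStream<T, T> {
  let index = 0
  return new TransformStream<T, T>({
    transform(chunk, controller) {
      fn(chunk, index++)
      controller.enqueue(chunk) // forwarded untouched
    },
  })
}

// Terminal consumer: returns a Promise and drains the stream. The reader is
// never released, so the stream stays locked and cannot be reused.
async function forEachChunk<T>(stream: ReadableStream<T>, fn: (chunk: T, index: number) => void): Promise<void> {
  const reader = stream.getReader()
  for (let index = 0; ; index++) {
    const { done, value } = await reader.read()
    if (done) return
    fn(value, index)
  }
}
```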

Streams the Hard Way vs streamfu

Every example below shows a real task done the hard way with native Web Streams, then the easy way with streamfu.

1. Transform every chunk — Parse CSV lines and uppercase names

Before — native Web Streams:

const transform1 = new TransformStream({
  transform(line, ctrl) {
    ctrl.enqueue(line.split(","))
  },
})
const transform2 = new TransformStream({
  transform(cols, ctrl) {
    ctrl.enqueue({ name: cols[0].toUpperCase(), age: Number(cols[1]) })
  },
})

const reader = csvStream.pipeThrough(transform1).pipeThrough(transform2).getReader()
const results = []
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  results.push(value)
}
reader.releaseLock()

After — streamfu:

import { list, map } from "@sgmonda/streamfu"

const stream = map(
  csvStream,
  (line) => line.split(","),
  (cols) => ({ name: cols[0].toUpperCase(), age: Number(cols[1]) }),
)
const results = await list(stream)
2. Filter and collect — Keep only active users from a stream

Before — native Web Streams:

const filter = new TransformStream({
  transform(user, ctrl) {
    if (user.status === "active") ctrl.enqueue(user)
  },
})

const reader = usersStream.pipeThrough(filter).getReader()
const active = []
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  active.push(value)
}
reader.releaseLock()

After — streamfu:

import { filter, list } from "@sgmonda/streamfu"

const active = await list(filter(usersStream, (u) => u.status === "active"))
3. Reduce to a single value — Sum a stream of numbers

Before — native Web Streams:

const reader = numbersStream.getReader()
let total = 0
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  total += value
}
reader.releaseLock()

After — streamfu:

import { reduce } from "@sgmonda/streamfu"

const total = await reduce(numbersStream, (sum, n) => sum + n, 0)
4. Multi-step pipeline — Filter, transform, and collect in one chain

Before — native Web Streams:

const filterStep = new TransformStream({
  transform(n, ctrl) {
    if (n % 2 === 0) ctrl.enqueue(n)
  },
})
const doubleStep = new TransformStream({
  transform(n, ctrl) {
    ctrl.enqueue(n * 2)
  },
})
const toStringStep = new TransformStream({
  transform(n, ctrl) {
    ctrl.enqueue(`Value: ${n}`)
  },
})

const reader = numbersStream
  .pipeThrough(filterStep)
  .pipeThrough(doubleStep)
  .pipeThrough(toStringStep)
  .getReader()
const results = []
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  results.push(value)
}
reader.releaseLock()

After — streamfu:

import { filter, list, map, pipe } from "@sgmonda/streamfu"

const results = await list(pipe(
  numbersStream,
  filter((n) => n % 2 === 0),
  map((n) => n * 2),
  map((n) => `Value: ${n}`),
))
5. Concatenate multiple sources — Chain API pages into one stream and take the first 50

Before — native Web Streams:

const sources = [page1Stream, page2Stream, page3Stream]
const reader1 = sources[0].getReader()
const reader2 = sources[1].getReader()
const reader3 = sources[2].getReader()
const all = []

for (const reader of [reader1, reader2, reader3]) {
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    all.push(value)
  }
  reader.releaseLock()
}
const first50 = all.slice(0, 50)

After — streamfu:

import { concat, list, slice } from "@sgmonda/streamfu"

const first50 = await list(slice(concat(page1Stream, page2Stream, page3Stream), 0, 50))

Each concat source can also be a factory () => ReadableStream | Promise<ReadableStream>. The factory runs only when the previous source is drained — so a page is fetched lazily, and pages past an early stop (here slice(0, 50)) are never requested at all:

const first50 = await list(slice(concat(() => fetchPage(0), () => fetchPage(1), () => fetchPage(2)), 0, 50))
// fetchPage(2) never runs if the first two pages already cover 50 rows

A plain Promise<ReadableStream> is accepted too, but it's created eagerly — only factories defer the work.
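The factory-based laziness described above can be sketched on top of the standard reader API. `lazyConcat` and `Source` are hypothetical names. The sketch uses a highWaterMark of 0 so nothing is pre-fetched, which may differ from streamfu's buffering.

```typescript
// A source is a stream, a promise of one, or a factory that creates one on demand.
type Source<T> =
  | ReadableStream<T>
  | Promise<ReadableStream<T>>
  | (() => ReadableStream<T> | Promise<ReadableStream<T>>)

// Hypothetical lazy concat: source i+1 is only opened (and a factory only
// called) once source i is fully drained.
function lazyConcat<T>(...sources: Source<T>[]): ReadableStream<T> {
  let next = 0
  let reader: ReadableStreamDefaultReader<T> | undefined
  return new ReadableStream<T>(
    {
      async pull(controller) {
        while (true) {
          if (!reader) {
            if (next >= sources.length) return controller.close()
            const source = sources[next++]
            const stream = await (typeof source === "function" ? source() : source)
            reader = stream.getReader()
          }
          const result = await reader.read()
          if (!result.done) return controller.enqueue(result.value)
          reader.releaseLock() // current source drained: move on to the next one
          reader = undefined
        }
      },
      // Stopping early (e.g. after a slice) cancels only the source in use;
      // later factories are simply never called.
      async cancel(reason) {
        await reader?.cancel(reason)
      },
    },
    { highWaterMark: 0 },
  )
}
```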

6. Zip parallel streams — Pair names with scores into labeled strings

Before — native Web Streams:

const readerA = namesStream.getReader()
const readerB = scoresStream.getReader()
const leaderboard = []

while (true) {
  const [a, b] = await Promise.all([readerA.read(), readerB.read()])
  if (a.done || b.done) break
  leaderboard.push(`${a.value}: ${b.value}`)
}
readerA.releaseLock()
readerB.releaseLock()

After — streamfu:

import { list, map, zip } from "@sgmonda/streamfu"

const leaderboard = await list(map(zip(namesStream, scoresStream), ([name, score]) => `${name}: ${score}`))

zip stops as soon as any input is exhausted — the shortest stream sets the output length. Need to run to the longest instead, padding the streams that finish early? Use zipLongest:

import { list, zipLongest } from "@sgmonda/streamfu"

// names: ["Ada", "Lin"], scores: [90, 80, 70] (each call below needs its own fresh pair of streams)
await list(zipLongest(names, scores)) // [["Ada", 90], ["Lin", 80], [undefined, 70]]
await list(zipLongest(names, scores, { fillValue: "?" })) // [["Ada", 90], ["Lin", 80], ["?", 70]]
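Padding to the longest input comes down to remembering which inputs have finished. Below is a minimal two-input sketch on standard APIs. `zipLongest2`, `step`, and the required `fill` argument are assumptions of this sketch; streamfu's zipLongest is variadic and takes an options object.

```typescript
type Step<T> = { done: true } | { done: false; value: T }

// Read one chunk unless that input is already exhausted.
async function step<T>(reader: ReadableStreamDefaultReader<T>, finished: boolean): Promise<Step<T>> {
  if (finished) return { done: true }
  const r = await reader.read()
  return r.done ? { done: true } : { done: false, value: r.value }
}

// Hypothetical two-input zipLongest: reads both inputs in lockstep and pads
// whichever finished first with `fill` until the other is exhausted too.
function zipLongest2<A, B, F>(
  a: ReadableStream<A>,
  b: ReadableStream<B>,
  fill: F,
): ReadableStream<[A | F, B | F]> {
  const readerA = a.getReader()
  const readerB = b.getReader()
  let doneA = false
  let doneB = false
  return new ReadableStream<[A | F, B | F]>(
    {
      async pull(controller) {
        const [x, y] = await Promise.all([step(readerA, doneA), step(readerB, doneB)])
        doneA = x.done
        doneB = y.done
        if (x.done && y.done) return controller.close() // both exhausted: stop
        controller.enqueue([x.done ? fill : x.value, y.done ? fill : y.value])
      },
      async cancel(reason) {
        await Promise.all([readerA.cancel(reason), readerB.cancel(reason)])
      },
    },
    { highWaterMark: 0 },
  )
}
```

Plain zip would instead close as soon as either `step` reports done.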
7. Consume a stream twice — Get both the sum and the max from the same stream

Before — native Web Streams:

const [copy1, copy2] = numbersStream.tee()

const reader1 = copy1.getReader()
let sum = 0
while (true) {
  const { done, value } = await reader1.read()
  if (done) break
  sum += value
}
reader1.releaseLock()

const reader2 = copy2.getReader()
let max = -Infinity
while (true) {
  const { done, value } = await reader2.read()
  if (done) break
  if (value > max) max = value
}
reader2.releaseLock()

After — streamfu:

import { branch, reduce } from "@sgmonda/streamfu"

const [forSum, forMax] = branch(numbersStream, 2)

const [sum, max] = await Promise.all([
  reduce(forSum, (a, b) => a + b, 0),
  reduce(forMax, (a, b) => (b > a ? b : a), -Infinity),
])
8. Flatten paginated results — Expand arrays of items into individual chunks

Before — native Web Streams:

// Each chunk is an array like [item1, item2, item3] from a paginated API
const flatten = new TransformStream({
  transform(page, ctrl) {
    for (const item of page) ctrl.enqueue(item)
  },
})
const label = new TransformStream({
  transform(item, ctrl) {
    ctrl.enqueue(`#${item.id}: ${item.title}`)
  },
})

const reader = pagesStream.pipeThrough(flatten).pipeThrough(label).getReader()
const items = []
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  items.push(value)
}
reader.releaseLock()

After — streamfu:

import { flatMap, list, map } from "@sgmonda/streamfu"

const items = await list(map(flatMap(pagesStream, (page) => page), (item) => `#${item.id}: ${item.title}`))
9. Quick queries on a stream — Answer five questions about the data without five manual loops

Before — native Web Streams:

// Need 5 copies — tee() only gives 2, so we chain:
const [s1, rest1] = dataStream.tee()
const [s2, rest2] = rest1.tee()
const [s3, rest3] = rest2.tee()
const [s4, s5] = rest3.tee()

// Does it contain 42?
let hasFortyTwo = false
const r1 = s1.getReader()
while (true) {
  const { done, value } = await r1.read()
  if (done) break
  if (value === 42) {
    hasFortyTwo = true
    break
  }
}
r1.releaseLock()

// Are all values positive?
let allPositive = true
const r2 = s2.getReader()
while (true) {
  const { done, value } = await r2.read()
  if (done) break
  if (value <= 0) {
    allPositive = false
    break
  }
}
r2.releaseLock()

// Is any value greater than 100?
let anyOver100 = false
const r3 = s3.getReader()
while (true) {
  const { done, value } = await r3.read()
  if (done) break
  if (value > 100) {
    anyOver100 = true
    break
  }
}
r3.releaseLock()

// Where is 7?
let indexOf7 = -1
let idx = 0
const r4 = s4.getReader()
while (true) {
  const { done, value } = await r4.read()
  if (done) break
  if (value === 7) {
    indexOf7 = idx
    break
  }
  idx++
}
r4.releaseLock()

// What's the third element?
let third = undefined
let count = 0
const r5 = s5.getReader()
while (true) {
  const { done, value } = await r5.read()
  if (done) break
  if (count === 2) {
    third = value
    break
  }
  count++
}
r5.releaseLock()

After — streamfu:

import { at, branch, every, includes, indexOf, some } from "@sgmonda/streamfu"

const [s1, s2, s3, s4, s5] = branch(dataStream, 5)

const [hasFortyTwo, allPositive, anyOver100, indexOf7, third] = await Promise.all([
  includes(s1, 42),
  every(s2, (n) => n > 0),
  some(s3, (n) => n > 100),
  indexOf(s4, 7),
  at(s5, 2),
])
10. Generate, splice, and process — Create a range, replace elements, and log results

Before — native Web Streams:

// Generate 1..10 manually
const numbers = new ReadableStream({
  start(ctrl) {
    for (let i = 1; i <= 10; i++) ctrl.enqueue(i)
    ctrl.close()
  },
})

// Splice: remove 3 items at index 3, insert 99 and 100
let idx = 0
const removed = 3
const spliceTransform = new TransformStream({
  transform(chunk, ctrl) {
    if (idx === 3 + removed) {
      ctrl.enqueue(99)
      ctrl.enqueue(100)
    }
    if (idx < 3 || idx >= 3 + removed) {
      ctrl.enqueue(chunk)
    }
    idx++
  },
  flush(ctrl) {
    if (idx <= 3 + removed) {
      ctrl.enqueue(99)
      ctrl.enqueue(100)
    }
  },
})

// Double each value
const doubleTransform = new TransformStream({
  transform(n, ctrl) {
    ctrl.enqueue(n * 2)
  },
})

const reader = numbers.pipeThrough(spliceTransform).pipeThrough(doubleTransform).getReader()
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  console.log(value)
}
reader.releaseLock()

After — streamfu:

import { forEach, map, pipe, range, splice } from "@sgmonda/streamfu"

await forEach(
  pipe(
    range(1, 10),
    splice(3, 3, 99, 100),
    map((n) => n * 2),
  ),
  (value) => console.log(value),
)
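The Before version enqueues all ten numbers inside start(), so the whole range sits in the stream's queue before anything reads it. That is harmless for ten values but not for ten million. A lazier source produces values from pull(), which the stream calls only while its queue is below the high-water mark. Below is a minimal sketch with a hypothetical rangeStream() helper. It is inclusive on both ends to match the Before code, and it is not necessarily how streamfu's range() is implemented.

```typescript
// Hypothetical lazy range: yields from..to (inclusive), one value per pull().
function rangeStream(from: number, to: number): ReadableStream<number> {
  let next = from
  return new ReadableStream<number>(
    {
      // pull() runs only while the queue has room, so values are produced
      // on demand instead of all at once.
      pull(ctrl) {
        if (next > to) {
          ctrl.close()
          return
        }
        ctrl.enqueue(next++)
      },
    },
    // Keep at most one value buffered ahead of the reader.
    { highWaterMark: 1 },
  )
}

// Drain a stream into an array with a standard reader loop.
async function collect(stream: ReadableStream<number>): Promise<number[]> {
  const out: number[] = []
  const reader = stream.getReader()
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    out.push(value)
  }
  return out
}

const values = await collect(rangeStream(1, 10))
```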

Design principles

  • Functional & pure — No side effects, no mutations. Every transformation returns a new stream instead of modifying its input.
  • Familiar API — Mirrors Array.prototype methods. Zero learning curve.
  • Universal — Built on the Web Streams API. Works in Node.js, Deno, Bun, Cloudflare Workers, and browsers.
  • Scoped — Data pipelines, not event systems. See What streamfu is — and isn't.
  • Type-safe — Full TypeScript support with precise generics.
  • Tested — 100% code coverage. Every function, every edge case.

Error handling

One of the trickiest parts of working with streams is error handling. The traditional Node.js approach relies on event listeners, which split your logic across multiple callbacks and make the control flow hard to follow:

// ❌ Event-based error handling: scattered logic, easy to forget a listener
stream.on("data", (chunk) => {/* process chunk */})
stream.on("error", (err) => {/* handle error */})
stream.on("end", () => {/* cleanup */})

This pattern has several problems:

  • Error handling is disconnected from the processing logic
  • Forgetting the error listener can cause unhandled exceptions that crash your process
  • Coordinating end and error to know when the stream is truly done requires extra state
  • It doesn't compose well with async/await code

In streamfu, errors propagate automatically through chained operations. If a map() transformer or a filter() predicate throws, the error bubbles up and rejects the promise returned by any consuming operation (list(), reduce(), every(), etc.). This means you can use a standard try/catch block:

import { createReadable, filter, list, map, pipe } from "@sgmonda/streamfu"

// ✅ Errors propagate through the entire chain
const stream = pipe(
  createReadable(data),
  map(transformFn), // if this throws...
  filter(predicateFn),
)

try {
  const results = await list(stream) // ...the error rejects here
} catch (err) {
  console.error("Something failed:", err)
}

No special error listeners, no extra plumbing. Errors flow naturally through map(), filter(), and any other chained transformation, all the way to whichever consumer ends the pipeline.
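This propagation follows the Web Streams API's own rules. When a TransformStream's transform() throws, the stream errors, downstream pipeThrough() stages are aborted with the same reason, and the consumer's pending read() rejects. Below is a minimal sketch that uses only standard APIs. The failing step (failOnTwo) is made up and stands in for a throwing map() callback.

```typescript
const source = new ReadableStream<number>({
  start(ctrl) {
    for (const n of [1, 2, 3]) ctrl.enqueue(n)
    ctrl.close()
  },
})

// Made-up transform that throws on the value 2.
const failOnTwo = new TransformStream<number, number>({
  transform(n, ctrl) {
    if (n === 2) throw new Error("bad value: 2")
    ctrl.enqueue(n * 10)
  },
})

// A later stage that never throws; the error still travels through it.
const addOne = new TransformStream<number, number>({
  transform(n, ctrl) {
    ctrl.enqueue(n + 1)
  },
})

let caught: unknown
try {
  const reader = source.pipeThrough(failOnTwo).pipeThrough(addOne).getReader()
  while (!(await reader.read()).done) {
    // read until done or until the pipeline errors
  }
} catch (err) {
  caught = err // the Error thrown inside failOnTwo lands here
}
```

Chunks that were still queued when the error hit may be discarded. Treat anything read before the rejection as possibly incomplete.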


Working with Node.js streams

streamfu operates on Web Streams (ReadableStream, WritableStream, TransformStream). On the Node side, most APIs hand you a Node Readable instead — fs.createReadStream, multer's file upload, TypeORM's queryRunner.stream(), HTTP request bodies, and so on.

Bridge with the standard Readable.toWeb() helper (Node 17+, stable since 18.7):

import fs from "node:fs"
import { Readable } from "node:stream"
import { filter, list, map, pipe } from "@sgmonda/streamfu"

const nodeReadable = fs.createReadStream("./users.csv")
const webStream = Readable.toWeb(nodeReadable) as ReadableStream<Uint8Array>
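To try the bridge without a file on disk, an in-memory Readable.from() source works just as well. Below is a minimal sketch for Node 18+, where the three string chunks are placeholders. Readable.from() creates an object-mode stream, so each string comes through toWeb() as one chunk. A byte stream such as fs.createReadStream() yields Uint8Array chunks instead.

```typescript
import { Readable } from "node:stream"

// In-memory stand-in for fs.createReadStream(...).
const nodeReadable = Readable.from(["alpha", "beta", "gamma"])

// Bridge to a Web ReadableStream; the cast narrows Node's loose typing.
const webStream = Readable.toWeb(nodeReadable) as ReadableStream<string>

// Consume it with a standard Web Streams reader.
const chunks: string[] = []
const reader = webStream.getReader()
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  chunks.push(value)
}
```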

From there it's plain streamfu. End-to-end CSV ingest example:

import { Readable } from "node:stream"
import fs from "node:fs"
import { csvLines, filter, forEach, lines, map, pipe } from "@sgmonda/streamfu"

const file = Readable.toWeb(fs.createReadStream("./users.csv")) as ReadableStream<Uint8Array>

await forEach(
  pipe(
    file,
    csvLines, // ReadableStream<string>
    map((line: string) => line.split(",")), // ReadableStream<string[]>
    filter((cols) => cols[1] === "active"),
  ),
  (cols) => insertUser(cols),
)
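The byte-to-line step hides the fiddly part of streaming text: chunk boundaries can fall anywhere, including mid-line. Below is a minimal sketch built only from standard pieces, for illustration or for use outside streamfu. TextDecoderStream decodes bytes and handles multi-byte characters split across chunks. A hypothetical splitLines() TransformStream then carries the unfinished tail of each chunk into the next. It is not streamfu's csvLines, and like the split(",") above it ignores quoted CSV fields.

```typescript
// Hypothetical line splitter: text chunks in, one string per line out.
function splitLines(): TransformStream<string, string> {
  let carry = "" // partial line left over from the previous chunk
  return new TransformStream<string, string>({
    transform(chunk, ctrl) {
      const parts = (carry + chunk).split(/\r?\n/)
      carry = parts.pop() ?? "" // the last piece may be an incomplete line
      for (const line of parts) ctrl.enqueue(line)
    },
    flush(ctrl) {
      // Emit a final line that had no trailing newline.
      if (carry !== "") ctrl.enqueue(carry)
    },
  })
}

const encoder = new TextEncoder()
const bytes = new ReadableStream<Uint8Array>({
  start(ctrl) {
    // Chunk boundaries deliberately fall in the middle of lines.
    ctrl.enqueue(encoder.encode("name,status\nann,act"))
    ctrl.enqueue(encoder.encode("ive\nbob,inactive\r\ncat,active"))
    ctrl.close()
  },
})

const rows: string[][] = []
const reader = bytes.pipeThrough(new TextDecoderStream()).pipeThrough(splitLines()).getReader()
while (true) {
  const { done, value } = await reader.read()
  if (done) break
  const cols = value.split(",")
  if (cols[1] === "active") rows.push(cols)
}
```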

Tips:

  • The cast as ReadableStream<T> after Readable.toWeb() is unavoidable today — Node's types return a wider ReadableStream<any>. It's safe as long as the source you're wrapping really produces T chunks.

  • TypeORM's queryRunner.stream() resolves to a Node Readable backed by a database connection. Make sure to release the QueryRunner once the stream ends or errors. Otherwise the connection is never returned and the pool is eventually exhausted. A small wrapper pattern:

    import { Readable } from "node:stream"
    import type { EntityManager } from "typeorm"

    const safeQueryRunner = async <T>(em: EntityManager, sql: string): Promise<ReadableStream<T>> => {
      const qr = em.connection.createQueryRunner()
      try {
        const nodeStream = await qr.stream(sql)
        const release = async () => {
          if (!qr.isReleased) await qr.release()
        }
        nodeStream.on("end", release).on("error", release).on("close", release)
        return Readable.toWeb(nodeStream) as ReadableStream<T>
      } catch (e) {
        if (!qr.isReleased) await qr.release()
        throw e
      }
    }
  • toBuffer() returns a Uint8Array. In Node, Buffer extends Uint8Array, so the result is already usable wherever a Buffer is expected; call Buffer.from(bytes) if you want the explicit type.
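The TypeORM wrapper wires up end, error and close by hand. An alternative is Node's stream.finished(), which invokes a single callback once the stream has ended, errored or closed prematurely. Below is a minimal sketch with a hypothetical releaseWhenDone() helper. The in-memory Readable.from() source stands in for what qr.stream(sql) resolves to, and the counter stands in for qr.release().

```typescript
import { finished, Readable } from "node:stream"

// Hypothetical helper: run an async cleanup once the Node stream is done,
// whether it ended normally, errored, or was closed early.
function releaseWhenDone(stream: Readable, release: () => Promise<void>): void {
  finished(stream, () => {
    // Surface cleanup failures instead of leaving an unhandled rejection.
    release().catch((e) => console.error("release failed:", e))
  })
}

// In-memory stand-in for the Node stream that qr.stream(sql) resolves to.
const source = Readable.from(["row 1", "row 2"])

let releaseCount = 0
const released = new Promise<void>((resolve) => {
  releaseWhenDone(source, async () => {
    releaseCount++ // real code would call qr.release() here
    resolve()
  })
})

// Drain it through the Web Streams bridge, as the tip does.
const reader = (Readable.toWeb(source) as ReadableStream<string>).getReader()
while (!(await reader.read()).done) {
  // discard rows; only completion matters here
}
await released
```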

Contributing

Contributions welcome! Fork the repo, make your changes, and submit a PR.

deno task test

Requirements:

Publishing

Published to JSR automatically by GitHub CI whenever the version in deno.json changes on main.


MIT License · Made with care by @sgmonda and contributors

About

Functional stream utilities for JavaScript & TypeScript
