Skip to content

A Web Worker pool with bounded concurrency, plus a companion package that accelerates zarrita codec operations on Web Workers.

License

Notifications You must be signed in to change notification settings

fideus-labs/worker-pool

Repository files navigation

@fideus-labs/worker-pool

CI

A Web Worker pool with bounded concurrency, plus a companion @fideus-labs/fizarrita package that accelerates zarrita codec operations on Web Workers.

Features

  • Bounded concurrency — at most poolSize workers run simultaneously.
  • Worker recycling — workers are reused (LIFO) across tasks instead of being re-created.
  • ChunkQueue interfaceadd() + onIdle(), compatible with zarrita.js and p-queue patterns.
  • Batch interfacerunTasks() with progress reporting and cancellation.
  • Zero runtime dependencies.

Installation

npm add @fideus-labs/worker-pool

Usage

Task function contract

Every task function receives an available Worker (or null when the pool needs a new worker created) and must return an object with the worker to recycle and the result:

type WorkerPoolTask<T> = (
  worker: Worker | null
) => Promise<{ worker: Worker; result: T }>

ChunkQueue interface (add / onIdle)

import { WorkerPool } from '@fideus-labs/worker-pool'

const workerUrl = new URL('./my-worker.js', import.meta.url).href

function createTask(input: number) {
  return (worker: Worker | null) => {
    const w = worker ?? new Worker(workerUrl, { type: 'module' })
    return new Promise<{ worker: Worker; result: number }>((resolve) => {
      w.onmessage = (e) => resolve({ worker: w, result: e.data })
      w.postMessage(input)
    })
  }
}

const pool = new WorkerPool(4) // 4 concurrent workers

pool.add(createTask(1))
pool.add(createTask(2))
pool.add(createTask(3))

const results = await pool.onIdle<number>()
// results: [result1, result2, result3] — in add() order

pool.terminateWorkers()

Batch interface (runTasks)

Submit an array of tasks at once with optional progress reporting and cancellation:

const pool = new WorkerPool(2)

const tasks = inputs.map((input) => createTask(input))

const { promise, runId } = pool.runTasks(tasks, (completed, total) => {
  console.log(`${completed}/${total}`)
})

// Cancel if needed:
// pool.cancel(runId)

const results = await promise
pool.terminateWorkers()

API

new WorkerPool(poolSize: number)

Create a pool with at most poolSize concurrent workers.

pool.add<T>(fn: WorkerPoolTask<T>): void

Enqueue a task. Tasks are started when onIdle() is called.

pool.onIdle<T>(): Promise<T[]>

Execute all enqueued tasks and wait for completion. Returns results in the order tasks were added.

pool.runTasks<T>(taskFns, progressCallback?): { promise, runId }

Submit a batch of tasks. The promise resolves with ordered results. The optional progressCallback is invoked as (completedTasks: number, totalTasks: number) => void after each task completes.

pool.cancel(runId: number): void

Cancel a pending runTasks batch. The promise rejects with 'Remaining tasks canceled'.

pool.terminateWorkers(): void

Terminate all idle workers. The pool can still be used after this — new workers will be created as needed.


zarrita.js Integration

The @fideus-labs/fizarrita package provides getWorker and setWorker as drop-in replacements for zarrita's get and set, offloading codec encode/decode to Web Workers via the worker pool.

Installation

pnpm add @fideus-labs/fizarrita @fideus-labs/worker-pool zarrita

Basic usage

import { WorkerPool } from '@fideus-labs/worker-pool'
import { getWorker, setWorker } from '@fideus-labs/fizarrita'
import * as zarr from 'zarrita'

const pool = new WorkerPool(4)

// Open an array
const store = new zarr.FetchStore('https://example.com/data.zarr')
const arr = await zarr.open(store, { kind: 'array' })

// Read with codec decode offloaded to workers
const chunk = await getWorker(arr, null, { pool })

// Write with codec encode offloaded to workers
await setWorker(arr, null, chunk, { pool })

pool.terminateWorkers()

SharedArrayBuffer support

Both getWorker and setWorker support a useSharedArrayBuffer option for additional performance:

// Read — output allocated on SharedArrayBuffer, workers decode directly
// into shared memory (eliminates one transfer + one copy per chunk)
const chunk = await getWorker(arr, null, {
  pool,
  useSharedArrayBuffer: true,
})

// chunk.data.buffer instanceof SharedArrayBuffer === true
// The chunk can be shared with other workers without copying.

// Write — intermediate buffers use SharedArrayBuffer for zero-transfer
// sharing between main thread and codec workers
await setWorker(arr, null, chunk, {
  pool,
  useSharedArrayBuffer: true,
})

getWorker with SAB:

  • Output TypedArray is backed by SharedArrayBuffer
  • Codec workers decode chunks AND write directly into the shared output buffer via the decode_into message protocol
  • Eliminates 1 ArrayBuffer transfer (worker to main) and 1 main-thread set_from_chunk copy per chunk
  • Fill-value chunks are still handled on the main thread

setWorker with SAB:

  • Intermediate chunk buffers for partial updates use SharedArrayBuffer
  • Reduces ArrayBuffer transfers between main thread and codec workers during the decode-modify-encode cycle

COOP/COEP headers

SharedArrayBuffer requires the page to be served with these HTTP headers:

Cross-Origin-Opener-Policy: same-origin
Cross-Origin-Embedder-Policy: require-corp

If these headers are missing, useSharedArrayBuffer: true will throw with a descriptive error message.

Vite example:

// vite.config.ts
export default defineConfig({
  server: {
    headers: {
      'Cross-Origin-Opener-Policy': 'same-origin',
      'Cross-Origin-Embedder-Policy': 'require-corp',
    },
  },
})

getWorker options

Option Type Description
pool WorkerPool Required. The worker pool to use.
workerUrl string | URL URL of the codec worker script. Uses built-in default if omitted.
opts StoreOpts Pass-through options for the store's get method (e.g., RequestInit).
useSharedArrayBuffer boolean Allocate output on SharedArrayBuffer with decode-into-shared optimization.

setWorker options

Option Type Description
pool WorkerPool Required. The worker pool to use.
workerUrl string | URL URL of the codec worker script. Uses built-in default if omitted.
useSharedArrayBuffer boolean Use SharedArrayBuffer for intermediate chunk buffers during partial updates.

Worker message protocol

The codec worker handles four message types:

Request Response Description
init init_ok Register codec metadata (sent once per worker per unique array config)
decode decoded Decode raw bytes, transfer decoded ArrayBuffer back
decode_into decode_into_ok Decode raw bytes and write directly into SharedArrayBuffer output
encode encoded Encode chunk data, transfer encoded bytes back

Benchmark

Live Benchmark — try it in your browser.

The repository includes a benchmark app that compares vanilla zarrita get/set with getWorker/setWorker (with and without SharedArrayBuffer):

pnpm bench
# Opens at http://localhost:5174

The benchmark supports both synthetic in-memory arrays and remote OME-Zarr datasets from AWS S3. The live version is deployed to GitHub Pages on every push to main.

Development

pnpm install
pnpm dev           # Start test app dev server (port 5173)
pnpm bench         # Start benchmark app (port 5174)
pnpm test          # Run Playwright browser tests
pnpm test:ui       # Interactive Playwright UI

License

MIT

About

A Web Worker pool with bounded concurrency, plus a companion package that accelerates zarrita codec operations on Web Workers.

Topics

Resources

License

Stars

Watchers

Forks

Contributors