diff --git a/packages/durable-stream-db-collection/README.md b/packages/durable-stream-db-collection/README.md new file mode 100644 index 000000000..f87c95d8e --- /dev/null +++ b/packages/durable-stream-db-collection/README.md @@ -0,0 +1,240 @@ +# @tanstack/durable-stream-db-collection + +TanStack DB collection for [Durable Streams](https://github.com/durable-streams/durable-streams). + +## Installation + +```bash +npm install @tanstack/durable-stream-db-collection @tanstack/db @durable-streams/client +``` + +> **Note:** `@durable-streams/client` is a peer dependency. Install a compatible Durable Streams client that implements the [Durable Streams protocol](https://github.com/durable-streams/durable-streams). + +## Quick Start + +```typescript +import { createCollection } from '@tanstack/db' +import { durableStreamCollectionOptions } from '@tanstack/durable-stream-db-collection' + +const eventsCollection = createCollection( + durableStreamCollectionOptions({ + url: 'https://api.example.com/v1/stream/events', + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + }) +) +``` + +## Key Concepts + +### Batch-Level Offsets + +Durable Streams uses batch-level offsets, not row-level. When resuming from an offset, the entire batch may replay. This package handles deduplication automatically using your `getDeduplicationKey` function. + +### JSON Mode Requirement + +This package requires Durable Streams servers running in **JSON mode** (`content-type: application/json`). In JSON mode: + +- Each append is a valid JSON value +- Reads return parsed JSON arrays +- Message boundaries are preserved + +### Read-Only + +This collection is read-only. To write data, use your stream's append endpoint directly or through a wrapper protocol. + +### Offset Persistence + +Offsets are automatically persisted to localStorage (configurable) for cross-session resumption. 
+ +## API Reference + +### `durableStreamCollectionOptions` + +Creates TanStack DB collection configuration for a Durable Stream. + +```typescript +interface DurableStreamCollectionConfig { + // Required + url: string // URL of the Durable Stream endpoint + getKey: (row: TRow) => string | number // Extract primary key from row + getDeduplicationKey: (row: TRow) => string // Extract deduplication key from row + + // Optional + id?: string // Collection ID (auto-generated from URL if not provided) + schema?: StandardSchemaV1 // Standard Schema for validation + initialOffset?: string // Initial offset (default: '-1' for beginning) + headers?: Record // HTTP headers for requests + reconnectDelay?: number // Delay before reconnecting after error (default: 5000ms) + liveMode?: 'long-poll' | 'sse' // Live mode (default: 'long-poll') + storageKey?: string | false // Storage key prefix (default: 'durable-stream') + storage?: OffsetStorage // Custom storage adapter +} +``` + +### Output Type + +Each row from the collection includes the batch offset: + +```typescript +type RowWithOffset = TRow & { offset: string } +``` + +## Usage Examples + +### Basic Usage + +```typescript +import { createCollection } from '@tanstack/db' +import { durableStreamCollectionOptions } from '@tanstack/durable-stream-db-collection' + +interface Event { + id: string + type: string + payload: unknown + timestamp: string +} + +const eventsCollection = createCollection( + durableStreamCollectionOptions({ + url: 'https://api.example.com/v1/stream/events', + getKey: (row) => row.id, + getDeduplicationKey: (row) => row.id, + }) +) + +// Preload the collection +await eventsCollection.preload() + +// Access data +const events = eventsCollection.toArray +console.log(`Loaded ${events.length} events`) +``` + +### With Schema Validation + +```typescript +import { z } from 'zod' +import { createCollection } from '@tanstack/db' +import { durableStreamCollectionOptions } from 
'@tanstack/durable-stream-db-collection' + +const eventSchema = z.object({ + id: z.string(), + type: z.string(), + payload: z.unknown(), + timestamp: z.string(), + seq: z.number(), +}) + +type Event = z.infer + +const eventsCollection = createCollection( + durableStreamCollectionOptions({ + url: 'https://api.example.com/v1/stream/events', + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + schema: eventSchema, + }) +) +``` + +### With Authentication + +```typescript +const eventsCollection = createCollection( + durableStreamCollectionOptions({ + url: 'https://api.example.com/v1/stream/events', + getKey: (row) => row.id, + getDeduplicationKey: (row) => row.id, + headers: { + 'Authorization': `Bearer ${token}`, + }, + }) +) +``` + +### Custom Storage Adapter + +```typescript +// For React Native with AsyncStorage +import AsyncStorage from '@react-native-async-storage/async-storage' + +const eventsCollection = createCollection( + durableStreamCollectionOptions({ + url: 'https://api.example.com/v1/stream/events', + getKey: (row) => row.id, + getDeduplicationKey: (row) => row.id, + storage: AsyncStorage, + }) +) +``` + +### Disable Offset Persistence + +```typescript +const eventsCollection = createCollection( + durableStreamCollectionOptions({ + url: 'https://api.example.com/v1/stream/events', + getKey: (row) => row.id, + getDeduplicationKey: (row) => row.id, + storageKey: false, // No persistence + }) +) +``` + +### With React + +```typescript +import { useLiveQuery } from '@tanstack/react-db' +import { eq } from '@tanstack/db' + +function EventList() { + const { data: events } = useLiveQuery((q) => + q.from({ event: eventsCollection }) + .where(({ event }) => eq(event.type, 'user.created')) + .orderBy(({ event }) => event.timestamp, 'desc') + ) + + return ( +
+    <ul>
+      {events.map(event => (
+        <li key={event.id}>
+          {event.type}: {JSON.stringify(event.payload)}
+        </li>
+      ))}
+ ) +} +``` + +## Deduplication Strategy + +When resuming from a batch offset, Durable Streams may replay the entire batch. The `getDeduplicationKey` function is critical for filtering out already-seen rows. + +**Common patterns:** + +```typescript +// Rows with unique IDs +getDeduplicationKey: (row) => row.id + +// Rows with sequence numbers per entity +getDeduplicationKey: (row) => `${row.entityId}:${row.seq}` + +// Composite keys +getDeduplicationKey: (row) => `${row.timestamp}:${row.id}` +``` + +The deduplication key must be: +- **Unique** within the stream +- **Deterministic** - the same row always produces the same key + +## Reconnection Behavior + +On error, the collection will: +1. Mark as ready (if not already) to avoid blocking UI +2. Wait for `reconnectDelay` milliseconds (default: 5000) +3. Reconnect and resume from the last successful offset + +## License + +MIT diff --git a/packages/durable-stream-db-collection/package.json b/packages/durable-stream-db-collection/package.json new file mode 100644 index 000000000..a41abb238 --- /dev/null +++ b/packages/durable-stream-db-collection/package.json @@ -0,0 +1,65 @@ +{ + "name": "@tanstack/durable-stream-db-collection", + "description": "Durable Streams collection for TanStack DB", + "version": "0.0.1", + "dependencies": { + "@standard-schema/spec": "^1.0.0", + "@tanstack/db": "workspace:*" + }, + "peerDependencies": { + "@durable-streams/client": ">=0.1.0" + }, + "peerDependenciesMeta": { + "@durable-streams/client": { + "optional": true + } + }, + "devDependencies": { + "@vitest/coverage-istanbul": "^3.2.4", + "zod": "^3.23.0" + }, + "exports": { + ".": { + "import": { + "types": "./dist/esm/index.d.ts", + "default": "./dist/esm/index.js" + }, + "require": { + "types": "./dist/cjs/index.d.cts", + "default": "./dist/cjs/index.cjs" + } + }, + "./package.json": "./package.json" + }, + "files": [ + "dist", + "src" + ], + "main": "dist/cjs/index.cjs", + "module": "dist/esm/index.js", + "packageManager": 
"pnpm@10.24.0", + "author": "TanStack", + "license": "MIT", + "repository": { + "type": "git", + "url": "https://github.com/TanStack/db.git", + "directory": "packages/durable-stream-db-collection" + }, + "homepage": "https://tanstack.com/db", + "keywords": [ + "durable-streams", + "streaming", + "sync", + "optimistic", + "typescript" + ], + "scripts": { + "build": "vite build", + "dev": "vite build --watch", + "lint": "eslint . --fix", + "test": "npx vitest run" + }, + "sideEffects": false, + "type": "module", + "types": "dist/esm/index.d.ts" +} diff --git a/packages/durable-stream-db-collection/src/collection.ts b/packages/durable-stream-db-collection/src/collection.ts new file mode 100644 index 000000000..e5b0ecd4c --- /dev/null +++ b/packages/durable-stream-db-collection/src/collection.ts @@ -0,0 +1,194 @@ +import { DurableStream } from '@durable-streams/client' +import { loadOffset, saveOffset } from './offset-storage' +import type { CollectionConfig, SyncConfig } from '@tanstack/db' +import type { StandardSchemaV1 } from '@standard-schema/spec' +import type { + DurableStreamCollectionConfig, + DurableStreamResult, + RowWithOffset, +} from './types' + +/** + * Helper type to extract the output type from a standard schema + */ +type InferSchemaOutput = T extends StandardSchemaV1 + ? StandardSchemaV1.InferOutput extends object + ? StandardSchemaV1.InferOutput + : Record + : Record + +/** + * Creates Durable Stream collection options for use with a standard Collection. + * + * This is a read-only collection that syncs data from a Durable Streams server + * in JSON mode. Each row is annotated with the batch offset for tracking. 
+ * + * @template TRow - The type of items in the collection + * @param config - Configuration options for the Durable Stream collection + * @returns Collection configuration compatible with TanStack DB createCollection() + * + * @example + * ```typescript + * import { createCollection } from '@tanstack/db' + * import { durableStreamCollectionOptions } from '@tanstack/durable-stream-db-collection' + * + * const eventsCollection = createCollection( + * durableStreamCollectionOptions({ + * url: 'https://api.example.com/v1/stream/events', + * getKey: (row) => row.id, + * getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + * }) + * ) + * ``` + */ + +// Overload for when schema is provided +export function durableStreamCollectionOptions< + T extends StandardSchemaV1, + TRow extends object = InferSchemaOutput, +>( + config: DurableStreamCollectionConfig & { + schema: T + }, +): Omit, string | number, T>, `utils`> & { + id: string + schema: T +} + +// Overload for when no schema is provided +export function durableStreamCollectionOptions( + config: DurableStreamCollectionConfig & { + schema?: never + }, +): Omit, string | number>, `utils`> & { + id: string + schema?: never +} + +// Implementation +export function durableStreamCollectionOptions( + config: DurableStreamCollectionConfig, +): Omit< + CollectionConfig, string | number, any>, + `utils` +> & { + id: string + schema?: any +} { + const collectionId = config.id ?? `durable-stream:${config.url}` + + const sync: SyncConfig>[`sync`] = (params) => { + const { begin, write, commit, markReady } = params + + let aborted = false + + // Track seen deduplication keys to filter replayed rows + const seenKeys = new Set() + + const syncLoop = async () => { + let isFirstBatch = true + + // Load persisted offset or use initial offset + const persistedOffset = await loadOffset(config) + let currentOffset = persistedOffset ?? config.initialOffset ?? 
`-1` + + // Create the Durable Stream client + const stream = new DurableStream({ + url: config.url, + headers: config.headers, + }) + + try { + const followOptions = { + offset: currentOffset, + live: config.liveMode ?? `long-poll`, + } + + for await (const result of stream.follow( + followOptions, + ) as AsyncIterable>) { + if (aborted) break + + // In JSON mode, result.data is the parsed array + const rows = result.data + + // Only start a transaction if we have rows to process + if (rows.length > 0) { + begin() + + for (const row of rows) { + // Deduplicate - batch offsets may cause replay on resume + const dedupKey = config.getDeduplicationKey(row) + if (seenKeys.has(dedupKey)) { + continue + } + seenKeys.add(dedupKey) + + // Attach batch offset to row + const rowWithOffset: RowWithOffset = { + ...row, + offset: result.offset, + } + + write({ + type: `insert`, + value: rowWithOffset, + }) + } + + commit() + } + + // Update offset for next iteration / persistence + currentOffset = result.offset + await saveOffset(config, currentOffset) + + // Mark ready after first successful batch + if (isFirstBatch) { + markReady() + isFirstBatch = false + } + } + } catch (error) { + console.error(`Durable stream sync error:`, error) + + // Ensure markReady is called even on error so UI doesn't hang + if (isFirstBatch) { + markReady() + } + + // Reconnect after delay if not aborted + if (!aborted) { + const delay = config.reconnectDelay ?? 
5000 + setTimeout(syncLoop, delay) + } + } + } + + // Start sync loop + syncLoop() + + // Return cleanup function + return { + cleanup: () => { + aborted = true + }, + } + } + + // Create the getKey function that extracts from RowWithOffset + const getKey = (row: RowWithOffset): string | number => { + // Extract the original row (without offset) for the user's getKey function + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const { offset: _offset, ...originalRow } = row + return config.getKey(originalRow as TRow) + } + + return { + id: collectionId, + schema: config.schema, + getKey, + sync: { sync }, + // No mutation handlers - this is a read-only sync + } +} diff --git a/packages/durable-stream-db-collection/src/durable-streams-client.d.ts b/packages/durable-stream-db-collection/src/durable-streams-client.d.ts new file mode 100644 index 000000000..80538ec62 --- /dev/null +++ b/packages/durable-stream-db-collection/src/durable-streams-client.d.ts @@ -0,0 +1,90 @@ +/** + * Type declarations for @durable-streams/client + * + * This module provides client types for the Durable Streams protocol. + * See: https://github.com/durable-streams/durable-streams + */ + +declare module '@durable-streams/client' { + export interface DurableStreamOptions { + /** + * URL of the Durable Stream endpoint. + */ + url: string + + /** + * HTTP headers to include in requests. + */ + headers?: Record + } + + export interface FollowOptions { + /** + * The offset to start reading from. + * Use '-1' to read from the beginning. + */ + offset: string + + /** + * Live mode for following the stream. + * - 'long-poll': HTTP long-polling (default) + * - 'sse': Server-Sent Events + */ + live?: 'long-poll' | 'sse' + } + + export interface StreamResult { + /** + * The data from this batch. + * In JSON mode, this is an array of parsed JSON objects. + */ + data: TData + + /** + * The Stream-Next-Offset for this batch. + * Use this offset to resume from this point. 
+ */ + offset: string + } + + export interface ReadOptions { + /** + * The offset to start reading from. + */ + offset?: string + } + + export interface ReadResult extends StreamResult {} + + /** + * Durable Streams client for reading from a Durable Stream. + * + * @example + * ```typescript + * const stream = new DurableStream({ url: 'https://api.example.com/v1/stream/events' }) + * + * // Read from a specific offset + * const result = await stream.read({ offset: '-1' }) + * console.log(result.data, result.offset) + * + * // Follow the stream live + * for await (const result of stream.follow({ offset: '-1', live: 'long-poll' })) { + * console.log(result.data, result.offset) + * } + * ``` + */ + export class DurableStream { + constructor(options: DurableStreamOptions) + + /** + * Read data from the stream starting at the given offset. + */ + read(options?: ReadOptions): Promise> + + /** + * Follow the stream from a given offset, yielding results as they arrive. + * This is an async iterator that yields results continuously. + */ + follow(options: FollowOptions): AsyncIterable> + } +} diff --git a/packages/durable-stream-db-collection/src/index.ts b/packages/durable-stream-db-collection/src/index.ts new file mode 100644 index 000000000..b1f6d3bdd --- /dev/null +++ b/packages/durable-stream-db-collection/src/index.ts @@ -0,0 +1,7 @@ +export { durableStreamCollectionOptions } from './collection' +export type { + DurableStreamCollectionConfig, + RowWithOffset, + LiveMode, + OffsetStorage, +} from './types' diff --git a/packages/durable-stream-db-collection/src/offset-storage.ts b/packages/durable-stream-db-collection/src/offset-storage.ts new file mode 100644 index 000000000..d22eba0e9 --- /dev/null +++ b/packages/durable-stream-db-collection/src/offset-storage.ts @@ -0,0 +1,123 @@ +import type { DurableStreamCollectionConfig, OffsetStorage } from './types' + +/** + * Get the storage key for persisting offset. 
+ * @returns The storage key, or null if persistence is disabled. + */ +export function getStorageKey( + config: DurableStreamCollectionConfig, +): string | null { + if (config.storageKey === false) { + return null + } + const prefix = config.storageKey ?? `durable-stream` + return `${prefix}:${config.url}:offset` +} + +/** + * Get the default storage adapter. + * Returns localStorage if available, otherwise null. + */ +function getDefaultStorage(): OffsetStorage | null { + if (typeof localStorage !== `undefined`) { + return localStorage + } + return null +} + +/** + * Get the storage adapter to use. + * Returns the configured storage, or the default storage. + */ +function getStorage( + config: DurableStreamCollectionConfig, +): OffsetStorage | null { + return config.storage ?? getDefaultStorage() +} + +/** + * Load the persisted offset from storage. + * @returns The persisted offset, or null if not found or persistence is disabled. + */ +export async function loadOffset( + config: DurableStreamCollectionConfig, +): Promise { + const key = getStorageKey(config) + if (!key) { + return null + } + + const storage = getStorage(config) + if (!storage) { + return null + } + + try { + const result = storage.getItem(key) + // Handle both sync and async storage + if (result instanceof Promise) { + return (await result) ?? null + } + return result ?? null + } catch { + // Ignore storage errors (e.g., SecurityError in some browsers) + return null + } +} + +/** + * Save the offset to storage. + * Does nothing if persistence is disabled or storage is unavailable. 
+ */ +export async function saveOffset( + config: DurableStreamCollectionConfig, + offset: string, +): Promise { + const key = getStorageKey(config) + if (!key) { + return + } + + const storage = getStorage(config) + if (!storage) { + return + } + + try { + const result = storage.setItem(key, offset) + // Handle both sync and async storage + if (result instanceof Promise) { + await result + } + } catch { + // Ignore storage errors (e.g., QuotaExceededError, SecurityError) + } +} + +/** + * Clear the persisted offset from storage. + * Useful for resetting sync state. + */ +export async function clearOffset( + config: DurableStreamCollectionConfig, +): Promise { + const key = getStorageKey(config) + if (!key) { + return + } + + const storage = getStorage(config) + if (!storage) { + return + } + + try { + // Use setItem with empty string as a fallback since not all storage adapters have removeItem + const result = storage.setItem(key, ``) + if (result instanceof Promise) { + await result + } + } catch { + // Ignore storage errors + } +} diff --git a/packages/durable-stream-db-collection/src/types.ts b/packages/durable-stream-db-collection/src/types.ts new file mode 100644 index 000000000..a25f92330 --- /dev/null +++ b/packages/durable-stream-db-collection/src/types.ts @@ -0,0 +1,173 @@ +import type { StandardSchemaV1 } from '@standard-schema/spec' + +/** + * Storage adapter interface for offset persistence. + * Compatible with localStorage, sessionStorage, AsyncStorage, etc. + */ +export interface OffsetStorage { + getItem: (key: string) => string | null | Promise + setItem: (key: string, value: string) => void | Promise +} + +/** + * Live mode options for following a Durable Stream. + */ +export type LiveMode = 'long-poll' | 'sse' + +/** + * Configuration interface for Durable Stream collection options. 
+ * @template TRow - The type of items in the collection + */ +export interface DurableStreamCollectionConfig { + // ═══════════════════════════════════════════════════════════════════ + // Required + // ═══════════════════════════════════════════════════════════════════ + + /** + * URL of the Durable Stream endpoint. + * Must be a stream in JSON mode. + */ + url: string + + /** + * Extract a unique key from each row. + * Used as the collection's primary key for lookups and updates. + */ + getKey: (row: TRow) => string | number + + /** + * Extract a deduplication key from each row. + * Used to filter out replayed rows when resuming from a batch offset. + * + * This key must be unique within the stream and deterministic - + * the same row must always produce the same deduplication key. + * + * Common patterns: + * - `${row.id}` for rows with unique IDs + * - `${row.groupId}:${row.seq}` for rows with sequence numbers per group + */ + getDeduplicationKey: (row: TRow) => string + + // ═══════════════════════════════════════════════════════════════════ + // Optional + // ═══════════════════════════════════════════════════════════════════ + + /** + * Unique identifier for the collection. + * Auto-generated from URL if not provided. + */ + id?: string + + /** + * Schema for runtime validation and type inference. + * Must be a Standard Schema compatible schema (Zod, Valibot, etc.) + */ + schema?: StandardSchemaV1 + + /** + * Initial offset to start reading from. + * Use '-1' to read from the beginning. + * + * @default '-1' + */ + initialOffset?: string + + /** + * HTTP headers to include in stream requests. + * Useful for authentication tokens. + */ + headers?: Record + + /** + * Delay in milliseconds before reconnecting after an error. + * + * @default 5000 + */ + reconnectDelay?: number + + /** + * Live mode for following the stream. + * + * @default 'long-poll' + */ + liveMode?: LiveMode + + /** + * Storage key prefix for persisting offsets. 
+ * Set to false to disable offset persistence. + * + * @default 'durable-stream' + */ + storageKey?: string | false + + /** + * Custom storage adapter for offset persistence. + * Defaults to localStorage in browsers. + */ + storage?: OffsetStorage +} + +/** + * Output row type includes the batch offset. + * Each row from a Durable Stream batch is annotated with the batch's offset. + */ +export type RowWithOffset = TRow & { offset: string } + +/** + * Result from a Durable Stream follow iteration. + * In JSON mode, data is the parsed array of rows. + */ +export interface DurableStreamResult { + /** + * The data from this batch. In JSON mode, this is an array of parsed JSON objects. + */ + data: Array + + /** + * The Stream-Next-Offset for this batch. + * Use this offset to resume from this point. + */ + offset: string +} + +/** + * Options for the DurableStream.follow() method. + */ +export interface FollowOptions { + /** + * The offset to start reading from. + * Use '-1' to read from the beginning. + */ + offset: string + + /** + * Live mode for following the stream. + */ + live?: LiveMode +} + +/** + * Interface for the Durable Streams client. + * This matches the @durable-streams/client package API. + */ +export interface DurableStreamClient { + /** + * Follow the stream from a given offset, yielding results as they arrive. + */ + follow(options: FollowOptions): AsyncIterable> +} + +/** + * Constructor options for DurableStream client. + */ +export interface DurableStreamOptions { + /** + * URL of the Durable Stream endpoint. + */ + url: string + + /** + * HTTP headers to include in requests. 
+ */ + headers?: Record +} diff --git a/packages/durable-stream-db-collection/tests/collection.test-d.ts b/packages/durable-stream-db-collection/tests/collection.test-d.ts new file mode 100644 index 000000000..29360de13 --- /dev/null +++ b/packages/durable-stream-db-collection/tests/collection.test-d.ts @@ -0,0 +1,204 @@ +import { describe, expectTypeOf, it } from 'vitest' +import { createCollection } from '@tanstack/db' +import { durableStreamCollectionOptions } from '../src/collection' +import type { RowWithOffset } from '../src/types' +import { z } from 'zod' + +describe(`durableStreamCollectionOptions types`, () => { + it(`should infer row type from getKey function`, () => { + interface Event { + id: string + type: string + payload: unknown + } + + const options = durableStreamCollectionOptions({ + url: `http://example.com/stream`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => row.id, + }) + + // The collection should have RowWithOffset as the item type + const collection = createCollection(options) + + // Get should return RowWithOffset | undefined + const item = collection.get(`test`) + expectTypeOf(item).toEqualTypeOf | undefined>() + + if (item) { + expectTypeOf(item.id).toEqualTypeOf() + expectTypeOf(item.type).toEqualTypeOf() + expectTypeOf(item.payload).toEqualTypeOf() + expectTypeOf(item.offset).toEqualTypeOf() + } + }) + + it(`should infer row type from schema`, () => { + const eventSchema = z.object({ + id: z.string(), + type: z.string(), + timestamp: z.number(), + }) + + type Event = z.infer + + const options = durableStreamCollectionOptions({ + url: `http://example.com/stream`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => row.id, + schema: eventSchema, + }) + + const collection = createCollection(options) + const item = collection.get(`test`) + + if (item) { + expectTypeOf(item.id).toEqualTypeOf() + expectTypeOf(item.type).toEqualTypeOf() + expectTypeOf(item.timestamp).toEqualTypeOf() + 
expectTypeOf(item.offset).toEqualTypeOf() + } + }) + + it(`should allow string or number keys`, () => { + interface Event { + id: number + name: string + } + + const options = durableStreamCollectionOptions({ + url: `http://example.com/stream`, + getKey: (row) => row.id, // number key + getDeduplicationKey: (row) => String(row.id), + }) + + const collection = createCollection(options) + + // Should accept number keys + const item = collection.get(123) + expectTypeOf(item).toEqualTypeOf | undefined>() + }) + + it(`should require getKey and getDeduplicationKey`, () => { + interface Event { + id: string + } + + // @ts-expect-error - missing required getKey + durableStreamCollectionOptions({ + url: `http://example.com/stream`, + getDeduplicationKey: (row) => row.id, + }) + + // @ts-expect-error - missing required getDeduplicationKey + durableStreamCollectionOptions({ + url: `http://example.com/stream`, + getKey: (row) => row.id, + }) + + // @ts-expect-error - missing required url + durableStreamCollectionOptions({ + getKey: (row) => row.id, + getDeduplicationKey: (row) => row.id, + }) + }) + + it(`should type headers correctly`, () => { + interface Event { + id: string + } + + // Should allow Record + durableStreamCollectionOptions({ + url: `http://example.com/stream`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => row.id, + headers: { + Authorization: `Bearer token`, + 'Content-Type': `application/json`, + }, + }) + }) + + it(`should type storage interface correctly`, () => { + interface Event { + id: string + } + + // Sync storage + const syncStorage = { + getItem: (key: string): string | null => null, + setItem: (key: string, value: string): void => {}, + } + + durableStreamCollectionOptions({ + url: `http://example.com/stream`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => row.id, + storage: syncStorage, + }) + + // Async storage + const asyncStorage = { + getItem: async (key: string): Promise => null, + setItem: async (key: string, value: 
string): Promise => {}, + } + + durableStreamCollectionOptions({ + url: `http://example.com/stream`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => row.id, + storage: asyncStorage, + }) + }) + + it(`should allow storageKey to be false or string`, () => { + interface Event { + id: string + } + + // String prefix + durableStreamCollectionOptions({ + url: `http://example.com/stream`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => row.id, + storageKey: `my-app`, + }) + + // Disabled + durableStreamCollectionOptions({ + url: `http://example.com/stream`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => row.id, + storageKey: false, + }) + }) + + it(`should type liveMode correctly`, () => { + interface Event { + id: string + } + + durableStreamCollectionOptions({ + url: `http://example.com/stream`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => row.id, + liveMode: `long-poll`, + }) + + durableStreamCollectionOptions({ + url: `http://example.com/stream`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => row.id, + liveMode: `sse`, + }) + + // @ts-expect-error - invalid live mode + durableStreamCollectionOptions({ + url: `http://example.com/stream`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => row.id, + liveMode: `invalid`, + }) + }) +}) diff --git a/packages/durable-stream-db-collection/tests/collection.test.ts b/packages/durable-stream-db-collection/tests/collection.test.ts new file mode 100644 index 000000000..7ccfcd4a9 --- /dev/null +++ b/packages/durable-stream-db-collection/tests/collection.test.ts @@ -0,0 +1,603 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { createCollection } from '@tanstack/db' +import { durableStreamCollectionOptions } from '../src/collection' +import type { DurableStreamResult, RowWithOffset } from '../src/types' + +// Test row type +interface TestRow { + id: string + name: string + seq: number +} + +// Mock controller for the follow iterator 
interface MockFollowController {
  emit: (result: DurableStreamResult<TestRow>) => void
  complete: () => void
  error: (err: Error) => void
}

// Mock the @durable-streams/client module
let mockFollowController: MockFollowController | null = null
const mockFollow = vi.fn()

vi.mock(`@durable-streams/client`, () => {
  return {
    DurableStream: vi.fn().mockImplementation(() => ({
      follow: mockFollow,
    })),
  }
})

// Helper to create an async iterator from a controller
function createMockFollowIterator(): AsyncIterable<
  DurableStreamResult<TestRow>
> {
  const queue: Array<DurableStreamResult<TestRow>> = []
  let resolveNext:
    | ((value: IteratorResult<DurableStreamResult<TestRow>>) => void)
    | null = null
  // Reject side of the pending next() promise, so error() can fail it
  // immediately rather than only on the following iteration.
  let rejectNext: ((err: Error) => void) | null = null
  let isDone = false
  let error: Error | null = null

  mockFollowController = {
    emit: (result) => {
      if (resolveNext) {
        resolveNext({ value: result, done: false })
        resolveNext = null
        rejectNext = null
      } else {
        queue.push(result)
      }
    },
    complete: () => {
      isDone = true
      if (resolveNext) {
        resolveNext({ value: undefined as any, done: true })
        resolveNext = null
        rejectNext = null
      }
    },
    error: (err) => {
      error = err
      // Reject a pending next() right away. The original implementation
      // could not do this (it only stored a resolve callback), so an
      // injected error was silently deferred until the next iteration.
      if (rejectNext) {
        rejectNext(err)
        resolveNext = null
        rejectNext = null
      }
    },
  }

  return {
    [Symbol.asyncIterator](): AsyncIterator<DurableStreamResult<TestRow>> {
      return {
        async next() {
          if (error) {
            throw error
          }
          if (queue.length > 0) {
            return { value: queue.shift()!, done: false }
          }
          if (isDone) {
            return { value: undefined as any, done: true }
          }
          return new Promise((resolve, reject) => {
            resolveNext = resolve
            rejectNext = reject
          })
        },
      }
    },
  }
}

describe(`durableStreamCollectionOptions`, () => {
  beforeEach(() => {
    vi.clearAllMocks()
    mockFollowController = null

    // Setup mock follow to return our controlled iterator
    mockFollow.mockImplementation(() => createMockFollowIterator())
  })

  it(`should create a collection with correct id from url`, () => {
    const options = durableStreamCollectionOptions<TestRow>({
      url: `http://example.com/stream/events`,
      getKey: (row) => row.id,
      getDeduplicationKey:
(row) => `${row.id}:${row.seq}`, + }) + + expect(options.id).toBe(`durable-stream:http://example.com/stream/events`) + }) + + it(`should use custom id when provided`, () => { + const options = durableStreamCollectionOptions({ + id: `my-custom-id`, + url: `http://example.com/stream/events`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + }) + + expect(options.id).toBe(`my-custom-id`) + }) + + it(`should sync data from stream and mark ready after first batch`, async () => { + const options = durableStreamCollectionOptions({ + url: `http://example.com/stream/events`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + storageKey: false, // Disable persistence for test + }) + + const collection = createCollection(options) + collection.startSyncImmediate() + + // Wait for sync to start + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Emit first batch + mockFollowController?.emit({ + data: [ + { id: `1`, name: `Test 1`, seq: 0 }, + { id: `2`, name: `Test 2`, seq: 0 }, + ], + offset: `offset-1`, + }) + + // Wait for processing + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Collection should be ready and have data + expect(collection.isReady()).toBe(true) + expect(collection.size).toBe(2) + expect(collection.get(`1`)).toEqual({ + id: `1`, + name: `Test 1`, + seq: 0, + offset: `offset-1`, + }) + expect(collection.get(`2`)).toEqual({ + id: `2`, + name: `Test 2`, + seq: 0, + offset: `offset-1`, + }) + + await collection.cleanup() + }) + + it(`should attach batch offset to each row`, async () => { + const options = durableStreamCollectionOptions({ + url: `http://example.com/stream/events`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + storageKey: false, + }) + + const collection = createCollection(options) + collection.startSyncImmediate() + + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Emit batch with specific offset 
+ mockFollowController?.emit({ + data: [{ id: `1`, name: `Test`, seq: 0 }], + offset: `batch-offset-123`, + }) + + await new Promise((resolve) => setTimeout(resolve, 10)) + + const row = collection.get(`1`) as RowWithOffset + expect(row.offset).toBe(`batch-offset-123`) + + await collection.cleanup() + }) + + it(`should deduplicate replayed rows on resume`, async () => { + const options = durableStreamCollectionOptions({ + url: `http://example.com/stream/events`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + storageKey: false, + }) + + const collection = createCollection(options) + collection.startSyncImmediate() + + await new Promise((resolve) => setTimeout(resolve, 10)) + + // First batch with seq 0 and 1 + mockFollowController?.emit({ + data: [ + { id: `1`, name: `Test`, seq: 0 }, + { id: `1`, name: `Test`, seq: 1 }, + ], + offset: `offset-a`, + }) + + await new Promise((resolve) => setTimeout(resolve, 10)) + + expect(collection.size).toBe(2) + + // Second batch with seq 1 replayed and seq 2 new + mockFollowController?.emit({ + data: [ + { id: `1`, name: `Test`, seq: 1 }, // Replayed - should be deduplicated + { id: `1`, name: `Test`, seq: 2 }, // New + ], + offset: `offset-b`, + }) + + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Should have 3 items, not 4 (seq 1 deduplicated) + expect(collection.size).toBe(3) + + await collection.cleanup() + }) + + it(`should handle empty batches without starting transaction`, async () => { + const options = durableStreamCollectionOptions({ + url: `http://example.com/stream/events`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + storageKey: false, + }) + + const collection = createCollection(options) + collection.startSyncImmediate() + + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Emit empty batch + mockFollowController?.emit({ + data: [], + offset: `offset-empty`, + }) + + await new Promise((resolve) => 
setTimeout(resolve, 10)) + + // Collection should be ready even with empty batch + expect(collection.isReady()).toBe(true) + expect(collection.size).toBe(0) + + await collection.cleanup() + }) + + it(`should handle multiple sequential batches`, async () => { + const options = durableStreamCollectionOptions({ + url: `http://example.com/stream/events`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + storageKey: false, + }) + + const collection = createCollection(options) + collection.startSyncImmediate() + + await new Promise((resolve) => setTimeout(resolve, 10)) + + // First batch + mockFollowController?.emit({ + data: [{ id: `1`, name: `Test 1`, seq: 0 }], + offset: `offset-1`, + }) + + await new Promise((resolve) => setTimeout(resolve, 10)) + + expect(collection.size).toBe(1) + + // Second batch + mockFollowController?.emit({ + data: [{ id: `2`, name: `Test 2`, seq: 0 }], + offset: `offset-2`, + }) + + await new Promise((resolve) => setTimeout(resolve, 10)) + + expect(collection.size).toBe(2) + + // Third batch + mockFollowController?.emit({ + data: [{ id: `3`, name: `Test 3`, seq: 0 }], + offset: `offset-3`, + }) + + await new Promise((resolve) => setTimeout(resolve, 10)) + + expect(collection.size).toBe(3) + + await collection.cleanup() + }) + + it(`should use getKey function correctly with offset stripped`, async () => { + const options = durableStreamCollectionOptions({ + url: `http://example.com/stream/events`, + // getKey should work on the original row type + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + storageKey: false, + }) + + const collection = createCollection(options) + collection.startSyncImmediate() + + await new Promise((resolve) => setTimeout(resolve, 10)) + + mockFollowController?.emit({ + data: [{ id: `test-id-123`, name: `Test`, seq: 0 }], + offset: `offset-1`, + }) + + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Should be able to get by the original 
key + expect(collection.has(`test-id-123`)).toBe(true) + expect(collection.get(`test-id-123`)).toBeDefined() + + await collection.cleanup() + }) + + it(`should pass headers to DurableStream client`, () => { + durableStreamCollectionOptions({ + url: `http://example.com/stream/events`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + headers: { + Authorization: `Bearer test-token`, + 'X-Custom-Header': `custom-value`, + }, + storageKey: false, + }) + + // Check that DurableStream was instantiated with the headers + const { DurableStream } = require(`@durable-streams/client`) + + // Create collection to trigger the sync + const options = durableStreamCollectionOptions({ + url: `http://example.com/stream/events`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + headers: { + Authorization: `Bearer test-token`, + }, + storageKey: false, + }) + + const collection = createCollection(options) + collection.startSyncImmediate() + + expect(DurableStream).toHaveBeenCalledWith({ + url: `http://example.com/stream/events`, + headers: { + Authorization: `Bearer test-token`, + }, + }) + + collection.cleanup() + }) +}) + +describe(`offset-storage`, () => { + it(`should load offset from storage on start`, async () => { + const mockStorage = { + getItem: vi.fn().mockReturnValue(`saved-offset-123`), + setItem: vi.fn(), + } + + const options = durableStreamCollectionOptions({ + url: `http://example.com/stream/events`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + storage: mockStorage, + }) + + const collection = createCollection(options) + collection.startSyncImmediate() + + // Wait for async storage operations + await new Promise((resolve) => setTimeout(resolve, 50)) + + // Should have loaded from storage + expect(mockStorage.getItem).toHaveBeenCalledWith( + `durable-stream:http://example.com/stream/events:offset`, + ) + + // follow should have been called with the saved offset 
+ expect(mockFollow).toHaveBeenCalledWith({ + offset: `saved-offset-123`, + live: `long-poll`, + }) + + await collection.cleanup() + }) + + it(`should save offset to storage after each batch`, async () => { + const mockStorage = { + getItem: vi.fn().mockReturnValue(null), + setItem: vi.fn(), + } + + const options = durableStreamCollectionOptions({ + url: `http://example.com/stream/events`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + storage: mockStorage, + }) + + const collection = createCollection(options) + collection.startSyncImmediate() + + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Emit batch + mockFollowController?.emit({ + data: [{ id: `1`, name: `Test`, seq: 0 }], + offset: `new-offset-456`, + }) + + await new Promise((resolve) => setTimeout(resolve, 50)) + + // Should have saved the new offset + expect(mockStorage.setItem).toHaveBeenCalledWith( + `durable-stream:http://example.com/stream/events:offset`, + `new-offset-456`, + ) + + await collection.cleanup() + }) + + it(`should use custom storage key prefix`, async () => { + const mockStorage = { + getItem: vi.fn().mockReturnValue(null), + setItem: vi.fn(), + } + + const options = durableStreamCollectionOptions({ + url: `http://example.com/stream/events`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + storage: mockStorage, + storageKey: `my-app`, + }) + + const collection = createCollection(options) + collection.startSyncImmediate() + + await new Promise((resolve) => setTimeout(resolve, 10)) + + // Should use custom prefix + expect(mockStorage.getItem).toHaveBeenCalledWith( + `my-app:http://example.com/stream/events:offset`, + ) + + await collection.cleanup() + }) + + it(`should not persist when storageKey is false`, async () => { + const mockStorage = { + getItem: vi.fn().mockReturnValue(null), + setItem: vi.fn(), + } + + const options = durableStreamCollectionOptions({ + url: 
`http://example.com/stream/events`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + storage: mockStorage, + storageKey: false, + }) + + const collection = createCollection(options) + collection.startSyncImmediate() + + await new Promise((resolve) => setTimeout(resolve, 10)) + + mockFollowController?.emit({ + data: [{ id: `1`, name: `Test`, seq: 0 }], + offset: `some-offset`, + }) + + await new Promise((resolve) => setTimeout(resolve, 50)) + + // Should not have called storage at all + expect(mockStorage.getItem).not.toHaveBeenCalled() + expect(mockStorage.setItem).not.toHaveBeenCalled() + + await collection.cleanup() + }) + + it(`should use initialOffset when no persisted offset exists`, async () => { + const mockStorage = { + getItem: vi.fn().mockReturnValue(null), + setItem: vi.fn(), + } + + const options = durableStreamCollectionOptions({ + url: `http://example.com/stream/events`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + storage: mockStorage, + initialOffset: `custom-initial-offset`, + }) + + const collection = createCollection(options) + collection.startSyncImmediate() + + await new Promise((resolve) => setTimeout(resolve, 50)) + + // follow should have been called with the initial offset + expect(mockFollow).toHaveBeenCalledWith({ + offset: `custom-initial-offset`, + live: `long-poll`, + }) + + await collection.cleanup() + }) + + it(`should default to -1 offset when no persisted or initial offset`, async () => { + const mockStorage = { + getItem: vi.fn().mockReturnValue(null), + setItem: vi.fn(), + } + + const options = durableStreamCollectionOptions({ + url: `http://example.com/stream/events`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + storage: mockStorage, + }) + + const collection = createCollection(options) + collection.startSyncImmediate() + + await new Promise((resolve) => setTimeout(resolve, 50)) + + // follow should have been 
called with -1 (default) + expect(mockFollow).toHaveBeenCalledWith({ + offset: `-1`, + live: `long-poll`, + }) + + await collection.cleanup() + }) +}) + +describe(`live mode configuration`, () => { + it(`should use long-poll mode by default`, async () => { + const options = durableStreamCollectionOptions({ + url: `http://example.com/stream/events`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + storageKey: false, + }) + + const collection = createCollection(options) + collection.startSyncImmediate() + + await new Promise((resolve) => setTimeout(resolve, 50)) + + expect(mockFollow).toHaveBeenCalledWith( + expect.objectContaining({ + live: `long-poll`, + }), + ) + + await collection.cleanup() + }) + + it(`should use sse mode when configured`, async () => { + const options = durableStreamCollectionOptions({ + url: `http://example.com/stream/events`, + getKey: (row) => row.id, + getDeduplicationKey: (row) => `${row.id}:${row.seq}`, + liveMode: `sse`, + storageKey: false, + }) + + const collection = createCollection(options) + collection.startSyncImmediate() + + await new Promise((resolve) => setTimeout(resolve, 50)) + + expect(mockFollow).toHaveBeenCalledWith( + expect.objectContaining({ + live: `sse`, + }), + ) + + await collection.cleanup() + }) +}) diff --git a/packages/durable-stream-db-collection/tsconfig.json b/packages/durable-stream-db-collection/tsconfig.json new file mode 100644 index 000000000..ad23edbc0 --- /dev/null +++ b/packages/durable-stream-db-collection/tsconfig.json @@ -0,0 +1,20 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "target": "ES2020", + "module": "ESNext", + "moduleResolution": "Bundler", + "declaration": true, + "outDir": "dist", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "paths": { + "@tanstack/db-ivm": ["../db-ivm/src"], + "@tanstack/db": ["../db/src"] + } + }, + "include": ["src", "tests", 
"vite.config.ts"], + "exclude": ["node_modules", "dist"] +} diff --git a/packages/durable-stream-db-collection/vite.config.ts b/packages/durable-stream-db-collection/vite.config.ts new file mode 100644 index 000000000..6b9e3c255 --- /dev/null +++ b/packages/durable-stream-db-collection/vite.config.ts @@ -0,0 +1,24 @@ +import { defineConfig, mergeConfig } from 'vitest/config' +import { tanstackViteConfig } from '@tanstack/vite-config' +import packageJson from './package.json' + +const config = defineConfig({ + test: { + name: packageJson.name, + include: [`tests/**/*.test.ts`], + environment: `jsdom`, + coverage: { enabled: true, provider: `istanbul`, include: [`src/**/*`] }, + typecheck: { + enabled: true, + include: [`tests/**/*.test.ts`, `tests/**/*.test-d.ts`], + }, + }, +}) + +export default mergeConfig( + config, + tanstackViteConfig({ + entry: `./src/index.ts`, + srcDir: `./src`, + }), +)