diff --git a/packages/react-hooks/src/react-hooks.tsx b/packages/react-hooks/src/react-hooks.tsx index c795ce9cbb..7c6fab9c18 100644 --- a/packages/react-hooks/src/react-hooks.tsx +++ b/packages/react-hooks/src/react-hooks.tsx @@ -13,13 +13,43 @@ type UnknownShapeStream = ShapeStream> const streamCache = new Map() const shapeCache = new Map() +const refCountCache = new Map() + +function increaseStreamRef(shapeStream: UnknownShapeStream): void { + const count = (refCountCache.get(shapeStream) ?? 0) + 1 + refCountCache.set(shapeStream, count) + + if (count === 1) { + shapeStream.resume() + } +} + +function decreaseStreamRef(shapeStream: UnknownShapeStream): void { + if (!refCountCache.has(shapeStream)) return + + const count = refCountCache.get(shapeStream)! - 1 + refCountCache.set(shapeStream, count) + + if (count === 0) { + shapeStream.pause() + } +} export async function preloadShape = Row>( options: ShapeStreamOptions> ): Promise> { const shapeStream = getShapeStream(options) const shape = getShape(shapeStream) - await shape.rows + + shape.start() + increaseStreamRef(shapeStream) + + try { + await shape.rows + } finally { + decreaseStreamRef(shapeStream) + } + return shape } @@ -33,7 +63,6 @@ function sortObjectKeys(obj: any): any { return Object.keys(obj) .sort() - .reduce>((sorted, key) => { sorted[key] = sortObjectKeys(obj[key]) return sorted @@ -82,7 +111,7 @@ export function getShape>( shapeCache.delete(shapeStream) } - const newShape = new Shape(shapeStream) + const newShape = new Shape(shapeStream, { autoStart: false }) shapeCache.set(shapeStream, newShape) // Return the created shape @@ -159,6 +188,7 @@ function identity(arg: T): T { interface UseShapeOptions, Selection> extends ShapeStreamOptions> { selector?: (value: UseShapeResult) => Selection + enabled?: boolean } export function useShape< @@ -166,6 +196,7 @@ export function useShape< Selection = UseShapeResult, >({ selector = identity as (arg: UseShapeResult) => Selection, + enabled = true, ...options }: UseShapeOptions): Selection { const shapeStream = getShapeStream( @@ -182,6 +213,15 @@ export function useShape< } const subscribe = (onStoreChange: () => void) => { + if (!enabled) { + return () => { + // noop + } + } + + shape.start() + increaseStreamRef(shapeStream) + // check if shape has changed between the initial snapshot // and subscribing, as there are no guarantees that the // two will occur synchronously with each other @@ -191,10 +231,15 @@ export function useShape< onStoreChange() } - return shapeSubscribe(shape, () => { + const unsubscribe = shapeSubscribe(shape, () => { latestShapeData = parseShapeData(shape) onStoreChange() }) + + return () => { + unsubscribe() + decreaseStreamRef(shapeStream) + } } return () => { @@ -205,7 +250,7 @@ export function useShape< selector ) } - }, [shape, selector]) + }, [shapeStream, shape, selector, enabled]) return useShapeData() } diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index bc64a06054..55d6b7762f 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -480,6 +480,9 @@ export interface ShapeStreamInterface = Row> { ): () => void unsubscribeAll(): void + pause(): void + resume(): void + isLoading(): boolean lastSyncedAt(): number | undefined lastSynced(): number @@ -585,6 +588,7 @@ export class ShapeStream = Row> ] >() + #startLoopRunning = false #started = false #syncState: ShapeStreamState #connected: boolean = false @@ -727,6 +731,12 @@ export class ShapeStream = Row> } async #start(): Promise { + // Prevent concurrent loops + if (this.#startLoopRunning) { + return + } + + this.#startLoopRunning = true this.#started = true this.#subscribeToWakeDetection() @@ -791,6 +801,8 @@ export class ShapeStream = Row> } this.#teardown() throw err + } finally { + this.#startLoopRunning = false } this.#teardown() @@ -1549,6 +1561,24 @@ export class ShapeStream = Row> this.#unsubscribeFromWakeDetection?.() } + pause(): void { + if (!this.#started) return + + if (!this.#pauseLock.isHeldBy(`manual`)) { + // pause() could fail to stop the #requestShape loop + // if #requestAbortController is not yet initialized by #requestShape + this.#pauseLock.acquire(`manual`) + } + } + + resume(): void { + if (!this.#started) return + + if (this.#pauseLock.isHeldBy(`manual`)) { + this.#pauseLock.release(`manual`) + } + } + /** Unix time at which we last synced. Undefined until first successful up-to-date. */ lastSyncedAt(): number | undefined { return this.#syncState.lastSyncedAt diff --git a/packages/typescript-client/src/shape.ts b/packages/typescript-client/src/shape.ts index 35cf7b5dfe..0526a13bb4 100644 --- a/packages/typescript-client/src/shape.ts +++ b/packages/typescript-client/src/shape.ts @@ -59,15 +59,25 @@ export class Shape = Row> { readonly #insertedKeys = new Set() readonly #requestedSubSnapshots = new Set() #reexecuteSnapshotsPending = false + #started = false #status: ShapeStatus = `syncing` #error: FetchError | false = false - constructor(stream: ShapeStreamInterface) { + constructor(stream: ShapeStreamInterface, { autoStart = true } = {}) { this.stream = stream - this.stream.subscribe( - this.#process.bind(this), - this.#handleError.bind(this) - ) + + if (autoStart) { + this.#started = true + + this.stream.subscribe( + this.#process.bind(this), + this.#handleError.bind(this) + ) + } + } + + get started(): boolean { + return this.#started } get isUpToDate(): boolean { @@ -137,6 +147,17 @@ export class Shape = Row> { return this.stream.mode } + start(): void { + if (this.#started) return + + this.#started = true + + this.stream.subscribe( + this.#process.bind(this), + this.#handleError.bind(this) + ) + } + /** * Request a snapshot for subset of data. Only available when mode is changes_only. * Returns void; data will be emitted via the stream and processed by this Shape.