Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 50 additions & 5 deletions packages/react-hooks/src/react-hooks.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,43 @@ type UnknownShapeStream = ShapeStream<Row<unknown>>

const streamCache = new Map<string, UnknownShapeStream>()
const shapeCache = new Map<UnknownShapeStream, UnknownShape>()
const refCountCache = new Map<UnknownShapeStream, number>()

function increaseStreamRef(shapeStream: UnknownShapeStream): void {
const count = (refCountCache.get(shapeStream) ?? 0) + 1
refCountCache.set(shapeStream, count)

if (count === 1) {
shapeStream.resume()
}
}

function decreaseStreamRef(shapeStream: UnknownShapeStream): void {
if (!refCountCache.has(shapeStream)) return

const count = refCountCache.get(shapeStream)! - 1
refCountCache.set(shapeStream, count)

if (count === 0) {
shapeStream.pause()
}
}

export async function preloadShape<T extends Row<unknown> = Row>(
options: ShapeStreamOptions<GetExtensions<T>>
): Promise<Shape<T>> {
const shapeStream = getShapeStream<T>(options)
const shape = getShape<T>(shapeStream)
await shape.rows

shape.start()
increaseStreamRef(shapeStream)

try {
await shape.rows
} finally {
decreaseStreamRef(shapeStream)
}

return shape
}

Expand All @@ -33,7 +63,6 @@ function sortObjectKeys(obj: any): any {

return Object.keys(obj)
.sort()

.reduce<Record<string, any>>((sorted, key) => {
sorted[key] = sortObjectKeys(obj[key])
return sorted
Expand Down Expand Up @@ -82,7 +111,7 @@ export function getShape<T extends Row<unknown>>(
shapeCache.delete(shapeStream)
}

const newShape = new Shape<T>(shapeStream)
const newShape = new Shape<T>(shapeStream, { autoStart: false })
shapeCache.set(shapeStream, newShape)

// Return the created shape
Expand Down Expand Up @@ -159,13 +188,15 @@ function identity<T>(arg: T): T {
interface UseShapeOptions<SourceData extends Row<unknown>, Selection>
extends ShapeStreamOptions<GetExtensions<SourceData>> {
selector?: (value: UseShapeResult<SourceData>) => Selection
enabled?: boolean
}

export function useShape<
SourceData extends Row<unknown> = Row,
Selection = UseShapeResult<SourceData>,
>({
selector = identity as (arg: UseShapeResult<SourceData>) => Selection,
enabled = true,
...options
}: UseShapeOptions<SourceData, Selection>): Selection {
const shapeStream = getShapeStream<SourceData>(
Expand All @@ -182,6 +213,15 @@ export function useShape<
}

const subscribe = (onStoreChange: () => void) => {
if (!enabled) {
return () => {
// noop
}
}

shape.start()
increaseStreamRef(shapeStream)

// check if shape has changed between the initial snapshot
// and subscribing, as there are no guarantees that the
// two will occur synchronously with each other
Expand All @@ -191,10 +231,15 @@ export function useShape<
onStoreChange()
}

return shapeSubscribe(shape, () => {
const unsubscribe = shapeSubscribe(shape, () => {
latestShapeData = parseShapeData(shape)
onStoreChange()
})

return () => {
unsubscribe()
decreaseStreamRef(shapeStream)
}
}

return () => {
Expand All @@ -205,7 +250,7 @@ export function useShape<
selector
)
}
}, [shape, selector])
}, [shapeStream, shape, selector, enabled])

return useShapeData()
}
30 changes: 30 additions & 0 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,9 @@ export interface ShapeStreamInterface<T extends Row<unknown> = Row> {
): () => void
unsubscribeAll(): void

pause(): void
resume(): void

isLoading(): boolean
lastSyncedAt(): number | undefined
lastSynced(): number
Expand Down Expand Up @@ -585,6 +588,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
]
>()

#startLoopRunning = false
#started = false
#syncState: ShapeStreamState
#connected: boolean = false
Expand Down Expand Up @@ -727,6 +731,12 @@ export class ShapeStream<T extends Row<unknown> = Row>
}

async #start(): Promise<void> {
// Prevent concurrent loops
if (this.#startLoopRunning) {
return
}

this.#startLoopRunning = true
this.#started = true
this.#subscribeToWakeDetection()

Expand Down Expand Up @@ -791,6 +801,8 @@ export class ShapeStream<T extends Row<unknown> = Row>
}
this.#teardown()
throw err
} finally {
this.#startLoopRunning = false
}

this.#teardown()
Expand Down Expand Up @@ -1549,6 +1561,24 @@ export class ShapeStream<T extends Row<unknown> = Row>
this.#unsubscribeFromWakeDetection?.()
}

pause(): void {
if (!this.#started) return

if (!this.#pauseLock.isHeldBy(`manual`)) {
// pause() could fail to stop the #requestShape loop
// if #requestAbortController is not yet initialized by #requestShape
this.#pauseLock.acquire(`manual`)
}
}

resume(): void {
if (!this.#started) return

if (this.#pauseLock.isHeldBy(`manual`)) {
this.#pauseLock.release(`manual`)
}
}

/** Unix time at which we last synced. Undefined until first successful up-to-date. */
lastSyncedAt(): number | undefined {
return this.#syncState.lastSyncedAt
Expand Down
31 changes: 26 additions & 5 deletions packages/typescript-client/src/shape.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,25 @@ export class Shape<T extends Row<unknown> = Row> {
readonly #insertedKeys = new Set<string>()
readonly #requestedSubSnapshots = new Set<string>()
#reexecuteSnapshotsPending = false
#started = false
#status: ShapeStatus = `syncing`
#error: FetchError | false = false

constructor(stream: ShapeStreamInterface<T>) {
constructor(stream: ShapeStreamInterface<T>, { autoStart = true } = {}) {
this.stream = stream
this.stream.subscribe(
this.#process.bind(this),
this.#handleError.bind(this)
)

if (autoStart) {
this.#started = true

this.stream.subscribe(
this.#process.bind(this),
this.#handleError.bind(this)
)
}
}

get started(): boolean {
return this.#started
}

get isUpToDate(): boolean {
Expand Down Expand Up @@ -137,6 +147,17 @@ export class Shape<T extends Row<unknown> = Row> {
return this.stream.mode
}

start(): void {
if (this.#started) return

this.#started = true

this.stream.subscribe(
this.#process.bind(this),
this.#handleError.bind(this)
)
}

/**
* Request a snapshot for subset of data. Only available when mode is changes_only.
* Returns void; data will be emitted via the stream and processed by this Shape.
Expand Down