Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,28 @@ export interface ShapeStreamOptions<T = never> {
* ```
*/
onError?: ShapeStreamErrorHandler

/**
* Delay in milliseconds before pausing sync when the tab is backgrounded.
* This allows sync to continue for a period after the user switches tabs,
* reducing the perceived "jump" when returning to the tab.
*
* Some browsers forcibly terminate long-running fetches in background tabs,
* so this is a trade-off between responsiveness and reliability.
*
* Defaults to 600000 (10 minutes).
*
* @example
* ```typescript
* // Pause immediately when tab is backgrounded
* const stream = new ShapeStream({
* url: 'http://localhost:3000/v1/shape',
* params: { table: 'todos' },
* backgroundPauseDelayMs: 0
* })
* ```
*/
backgroundPauseDelayMs?: number
}

export interface ShapeStreamInterface<T extends Row<unknown> = Row> {
Expand Down Expand Up @@ -577,6 +599,8 @@ export class ShapeStream<T extends Row<unknown> = Row>
#sseBackoffBaseDelay = 100 // Base delay for exponential backoff (ms)
#sseBackoffMaxDelay = 5000 // Maximum delay cap (ms)
#unsubscribeFromVisibilityChanges?: () => void
#backgroundPauseDelayMs: number
#backgroundPauseTimeoutId?: ReturnType<typeof setTimeout>
#staleCacheBuster?: string // Cache buster set when stale CDN response detected, used on retry requests to bypass cache
#staleCacheRetryCount = 0
#maxStaleCacheRetries = 3
Expand Down Expand Up @@ -621,6 +645,8 @@ export class ShapeStream<T extends Row<unknown> = Row>

this.#onError = this.options.onError
this.#mode = this.options.log ?? `full`
this.#backgroundPauseDelayMs =
this.options.backgroundPauseDelayMs ?? 10 * 60 * 1000 // 10 minutes

const baseFetchClient =
options.fetchClient ??
Expand Down Expand Up @@ -1406,6 +1432,11 @@ export class ShapeStream<T extends Row<unknown> = Row>
unsubscribeAll(): void {
this.#subscribers.clear()
this.#unsubscribeFromVisibilityChanges?.()
// Clear any pending background pause timeout
if (this.#backgroundPauseTimeoutId !== undefined) {
clearTimeout(this.#backgroundPauseTimeoutId)
this.#backgroundPauseTimeoutId = undefined
}
}

/** Unix time at which we last synced. Undefined when `isLoading` is true. */
Expand Down Expand Up @@ -1525,8 +1556,28 @@ export class ShapeStream<T extends Row<unknown> = Row>
) {
const visibilityHandler = () => {
if (document.hidden) {
this.#pause()
// Clear any existing timeout
if (this.#backgroundPauseTimeoutId !== undefined) {
clearTimeout(this.#backgroundPauseTimeoutId)
this.#backgroundPauseTimeoutId = undefined
}

if (this.#backgroundPauseDelayMs > 0) {
// Schedule a delayed pause
this.#backgroundPauseTimeoutId = setTimeout(() => {
this.#backgroundPauseTimeoutId = undefined
this.#pause()
}, this.#backgroundPauseDelayMs)
} else {
// Pause immediately (default behavior)
this.#pause()
}
} else {
// Clear any pending pause timeout when tab becomes visible
if (this.#backgroundPauseTimeoutId !== undefined) {
clearTimeout(this.#backgroundPauseTimeoutId)
this.#backgroundPauseTimeoutId = undefined
}
this.#resume()
}
}
Expand Down
103 changes: 103 additions & 0 deletions packages/typescript-client/test/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ describe.for(fetchAndSse)(`Shape (liveSSE=$liveSse)`, ({ liveSse }) => {
},
signal: aborter.signal,
liveSse,
backgroundPauseDelayMs: 0, // Pause immediately for this test
})

const unsubscribe = shapeStream.subscribe(() => unsubscribe())
Expand All @@ -474,6 +475,7 @@ describe.for(fetchAndSse)(`Shape (liveSSE=$liveSse)`, ({ liveSse }) => {
},
signal: aborter.signal,
liveSse,
backgroundPauseDelayMs: 0, // Pause immediately for this test
})
const shape = new Shape(shapeStream)

Expand Down Expand Up @@ -546,6 +548,107 @@ describe.for(fetchAndSse)(`Shape (liveSSE=$liveSse)`, ({ liveSse }) => {
])
})

// Skip SSE tests because fetch-event-source has its own visibility handler
// that immediately aborts SSE connections when the tab is hidden, which
// conflicts with our delayed pause behavior
it.skipIf(liveSse)(
`should delay pausing the stream when backgroundPauseDelayMs is set`,
async ({ issuesTableUrl, insertIssues, aborter }) => {
const { pause, resume } = mockVisibilityApi()
const shapeStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
params: {
table: issuesTableUrl,
},
signal: aborter.signal,
liveSse,
backgroundPauseDelayMs: 500, // 500ms delay before pausing
})
const shape = new Shape(shapeStream)

const values: Row[][] = []
shape.subscribe(({ rows }) => {
values.push(rows)
})

// Insert an issue and wait for the initial sync
await insertIssues({ title: `test title` })
await vi.waitFor(() => expect(values.length).toBeGreaterThan(0))
await vi.waitFor(() => expect(shapeStream.isConnected()).true)

// Pause (hide tab)
pause()

// Stream should still be connected immediately after pausing
// because of the 500ms delay
expect(shapeStream.isConnected()).true

// Insert another issue while the delay is pending
const [id2] = await insertIssues({ title: `during delay` })

// Wait a bit but less than the delay
await sleep(100)

// Should still be connected
expect(shapeStream.isConnected()).true

// Wait for the update to arrive (stream should still be active)
await vi.waitFor(
() => expect(values.some((rows) => rows.some((r) => r.id === id2))).true
)

// Now wait for the full delay to expire
await sleep(500)

// Stream should now be paused
await vi.waitFor(() => expect(shapeStream.isConnected()).false)

// Resume
resume()
await vi.waitFor(() => expect(shapeStream.isConnected()).true)
}
)

// Skip SSE tests because fetch-event-source has its own visibility handler
// that immediately aborts SSE connections when the tab is hidden
it.skipIf(liveSse)(
`should cancel delayed pause when tab becomes visible again`,
async ({ issuesTableUrl, aborter }) => {
const { pause, resume } = mockVisibilityApi()
const shapeStream = new ShapeStream({
url: `${BASE_URL}/v1/shape`,
params: {
table: issuesTableUrl,
},
signal: aborter.signal,
liveSse,
backgroundPauseDelayMs: 500,
})

const unsubscribe = shapeStream.subscribe(() => unsubscribe())

await vi.waitFor(() => expect(shapeStream.isConnected()).true)

// Pause (hide tab)
pause()

// Wait less than the delay
await sleep(100)

// Should still be connected
expect(shapeStream.isConnected()).true

// Resume before the delay expires
resume()

// Wait longer than the original delay
await sleep(600)

// Stream should still be connected (pause was cancelled)
expect(shapeStream.isConnected()).true
}
)

it(`should not throw error if an error handler is provided`, async ({
issuesTableUrl,
aborter,
Expand Down
Loading