Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/add-system-wake-detection.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-sql/client': patch
---

Fix ShapeStream hanging after system sleep in non-browser environments (Bun, Node.js). Stale in-flight HTTP requests are now automatically aborted and reconnected on wake, preventing hangs until TCP timeout.
74 changes: 67 additions & 7 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import {
REPLICA_PARAM,
FORCE_DISCONNECT_AND_REFRESH,
PAUSE_STREAM,
SYSTEM_WAKE,
EXPERIMENTAL_LIVE_SSE_QUERY_PARAM,
LIVE_SSE_QUERY_PARAM,
ELECTRIC_PROTOCOL_QUERY_PARAMS,
Expand Down Expand Up @@ -598,6 +599,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
#sseBackoffBaseDelay = 100 // Base delay for exponential backoff (ms)
#sseBackoffMaxDelay = 5000 // Maximum delay cap (ms)
#unsubscribeFromVisibilityChanges?: () => void
#unsubscribeFromWakeDetection?: () => void
#staleCacheBuster?: string // Cache buster set when stale CDN response detected, used on retry requests to bypass cache
#staleCacheRetryCount = 0
#maxStaleCacheRetries = 3
Expand Down Expand Up @@ -666,6 +668,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
this.#fetchClient = createFetchWithConsumedMessages(this.#sseFetchClient)

this.#subscribeToVisibilityChanges()
this.#subscribeToWakeDetection()
}

get shapeHandle() {
Expand Down Expand Up @@ -735,6 +738,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
}
this.#connected = false
this.#tickPromiseRejecter?.()
this.#unsubscribeFromWakeDetection?.()
return
}

Expand All @@ -745,12 +749,14 @@ export class ShapeStream<T extends Row<unknown> = Row>
}
this.#connected = false
this.#tickPromiseRejecter?.()
this.#unsubscribeFromWakeDetection?.()
throw err
}

// Normal completion, clean up
this.#connected = false
this.#tickPromiseRejecter?.()
this.#unsubscribeFromWakeDetection?.()
}

async #requestShape(): Promise<void> {
Expand Down Expand Up @@ -785,13 +791,16 @@ export class ShapeStream<T extends Row<unknown> = Row>
resumingFromPause,
})
} catch (e) {
// Handle abort error triggered by refresh
const abortReason = requestAbortController.signal.reason
const isRestartAbort =
requestAbortController.signal.aborted &&
(abortReason === FORCE_DISCONNECT_AND_REFRESH ||
abortReason === SYSTEM_WAKE)

if (
(e instanceof FetchError || e instanceof FetchBackoffAbortError) &&
requestAbortController.signal.aborted &&
requestAbortController.signal.reason === FORCE_DISCONNECT_AND_REFRESH
isRestartAbort
) {
// Start a new request
return this.#requestShape()
}

Expand Down Expand Up @@ -1432,6 +1441,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
unsubscribeAll(): void {
this.#subscribers.clear()
this.#unsubscribeFromVisibilityChanges?.()
this.#unsubscribeFromWakeDetection?.()
}

/** Unix time at which we last synced. Undefined when `isLoading` is true. */
Expand Down Expand Up @@ -1543,12 +1553,16 @@ export class ShapeStream<T extends Row<unknown> = Row>
})
}

#subscribeToVisibilityChanges() {
if (
#hasBrowserVisibilityAPI(): boolean {
return (
typeof document === `object` &&
typeof document.hidden === `boolean` &&
typeof document.addEventListener === `function`
) {
)
}

#subscribeToVisibilityChanges() {
if (this.#hasBrowserVisibilityAPI()) {
const visibilityHandler = () => {
if (document.hidden) {
this.#pause()
Expand All @@ -1566,6 +1580,52 @@ export class ShapeStream<T extends Row<unknown> = Row>
}
}

/**
* Detects system wake from sleep using timer gap detection.
* When the system sleeps, setInterval timers are paused. On wake,
* the elapsed wall-clock time since the last tick will be much larger
* than the interval period, indicating the system was asleep.
*
* Only active in non-browser environments (Bun, Node.js) where
* `document.visibilitychange` is not available. In browsers,
* `#subscribeToVisibilityChanges` handles this instead. Without wake
* detection, in-flight HTTP requests (long-poll or SSE) may hang until
* the OS TCP timeout.
*/
#subscribeToWakeDetection() {
if (this.#hasBrowserVisibilityAPI()) return

const INTERVAL_MS = 2_000
const WAKE_THRESHOLD_MS = 4_000

let lastTickTime = Date.now()

const timer = setInterval(() => {
const now = Date.now()
const elapsed = now - lastTickTime
lastTickTime = now

if (elapsed > INTERVAL_MS + WAKE_THRESHOLD_MS) {
if (this.#state === `active` && this.#requestAbortController) {
this.#isRefreshing = true
this.#requestAbortController.abort(SYSTEM_WAKE)
queueMicrotask(() => {
this.#isRefreshing = false
})
}
}
}, INTERVAL_MS)

// Ensure the timer doesn't prevent the process from exiting
if (typeof timer === `object` && `unref` in timer) {
timer.unref()
}

this.#unsubscribeFromWakeDetection = () => {
clearInterval(timer)
}
}

/**
* Resets the state of the stream, optionally with a provided
* shape handle
Expand Down
1 change: 1 addition & 0 deletions packages/typescript-client/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export const EXPERIMENTAL_LIVE_SSE_QUERY_PARAM = `experimental_live_sse`
export const LIVE_SSE_QUERY_PARAM = `live_sse`
export const FORCE_DISCONNECT_AND_REFRESH = `force-disconnect-and-refresh`
export const PAUSE_STREAM = `pause-stream`
export const SYSTEM_WAKE = `system-wake`
export const LOG_MODE_QUERY_PARAM = `log`
export const SUBSET_PARAM_WHERE = `subset__where`
export const SUBSET_PARAM_LIMIT = `subset__limit`
Expand Down
120 changes: 120 additions & 0 deletions packages/typescript-client/test/wake-detection.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// @vitest-environment node
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { ShapeStream } from '../src'
import { resolveInMacrotask } from './support/test-helpers'

describe(`Wake detection`, () => {
const shapeUrl = `https://example.com/v1/shape`
let aborter: AbortController

beforeEach(() => {
aborter = new AbortController()
})

afterEach(() => {
aborter.abort()
vi.useRealTimers()
vi.restoreAllMocks()
})

it(`should set up wake detection timer in non-browser environments`, async () => {
const clearIntervalSpy = vi.spyOn(globalThis, `clearInterval`)

const fetchWrapper = (): Promise<Response> => {
return resolveInMacrotask(Response.error())
}

const stream = new ShapeStream({
url: shapeUrl,
params: { table: `foo` },
signal: aborter.signal,
fetchClient: fetchWrapper,
})
const unsub = stream.subscribe(() => {})

stream.unsubscribeAll()
expect(clearIntervalSpy.mock.calls.length).toBeGreaterThanOrEqual(1)

unsub()
})

it(`should NOT set up wake detection timer in browser environments`, async () => {
;(globalThis as Record<string, unknown>).document = {
hidden: false,
addEventListener: vi.fn(),
removeEventListener: vi.fn(),
}

const clearIntervalSpy = vi.spyOn(globalThis, `clearInterval`)

const fetchWrapper = (): Promise<Response> => {
return resolveInMacrotask(Response.error())
}

const stream = new ShapeStream({
url: shapeUrl,
params: { table: `foo` },
signal: aborter.signal,
fetchClient: fetchWrapper,
})
const unsub = stream.subscribe(() => {})

stream.unsubscribeAll()
expect(clearIntervalSpy.mock.calls.length).toBe(0)

unsub()
delete (globalThis as Record<string, unknown>).document
})

it(`should detect time gap and abort stale fetch after system wake`, async () => {
vi.useFakeTimers()

const fetchSignals: AbortSignal[] = []
let fetchCallCount = 0

const fetchWrapper = (
...args: Parameters<typeof fetch>
): Promise<Response> => {
const signal = args[1]?.signal
if (signal) fetchSignals.push(signal)
fetchCallCount++
return new Promise<Response>((_resolve, reject) => {
signal?.addEventListener(`abort`, () => reject(new Error(`aborted`)), {
once: true,
})
})
}

const stream = new ShapeStream({
url: shapeUrl,
params: { table: `foo` },
signal: aborter.signal,
fetchClient: fetchWrapper,
})
const unsub = stream.subscribe(() => {})

// Wait for first fetch
await vi.advanceTimersByTimeAsync(0)
expect(fetchCallCount).toBeGreaterThanOrEqual(1)
const initialFetchCount = fetchCallCount

// Advance one normal interval (2s) — should NOT trigger wake detection
await vi.advanceTimersByTimeAsync(2_001)
expect(fetchSignals[fetchSignals.length - 1]?.aborted).toBe(false)

// Simulate system sleep by jumping Date.now() forward 10s
const currentTime = Date.now()
vi.setSystemTime(currentTime + 10_000)

// Trigger the next interval tick and allow async restart
await vi.advanceTimersByTimeAsync(2_001)
await vi.advanceTimersByTimeAsync(100)

expect(fetchSignals[0]?.aborted).toBe(true)
expect(fetchSignals[0]?.reason).toBe(`system-wake`)
expect(fetchCallCount).toBeGreaterThan(initialFetchCount)

unsub()
aborter.abort()
})
})
1 change: 1 addition & 0 deletions packages/typescript-client/vitest.unit.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export default defineConfig({
`test/parser.test.ts`,
`test/snapshot-tracker.test.ts`,
`test/expired-shapes-cache.test.ts`,
`test/wake-detection.test.ts`,
],
testTimeout: 30000,
environment: `jsdom`,
Expand Down
Loading