diff --git a/.changeset/add-system-wake-detection.md b/.changeset/add-system-wake-detection.md new file mode 100644 index 0000000000..da68bca8e6 --- /dev/null +++ b/.changeset/add-system-wake-detection.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/client': patch +--- + +Fix ShapeStream hanging after system sleep in non-browser environments (Bun, Node.js). Stale in-flight HTTP requests are now automatically aborted and reconnected on wake, preventing hangs until TCP timeout. diff --git a/packages/typescript-client/src/client.ts b/packages/typescript-client/src/client.ts index 512378b560..20f407c0a7 100644 --- a/packages/typescript-client/src/client.ts +++ b/packages/typescript-client/src/client.ts @@ -51,6 +51,7 @@ import { REPLICA_PARAM, FORCE_DISCONNECT_AND_REFRESH, PAUSE_STREAM, + SYSTEM_WAKE, EXPERIMENTAL_LIVE_SSE_QUERY_PARAM, LIVE_SSE_QUERY_PARAM, ELECTRIC_PROTOCOL_QUERY_PARAMS, @@ -598,6 +599,7 @@ export class ShapeStream = Row> #sseBackoffBaseDelay = 100 // Base delay for exponential backoff (ms) #sseBackoffMaxDelay = 5000 // Maximum delay cap (ms) #unsubscribeFromVisibilityChanges?: () => void + #unsubscribeFromWakeDetection?: () => void #staleCacheBuster?: string // Cache buster set when stale CDN response detected, used on retry requests to bypass cache #staleCacheRetryCount = 0 #maxStaleCacheRetries = 3 @@ -666,6 +668,7 @@ export class ShapeStream = Row> this.#fetchClient = createFetchWithConsumedMessages(this.#sseFetchClient) this.#subscribeToVisibilityChanges() + this.#subscribeToWakeDetection() } get shapeHandle() { @@ -735,6 +738,7 @@ export class ShapeStream = Row> } this.#connected = false this.#tickPromiseRejecter?.() + this.#unsubscribeFromWakeDetection?.() return } @@ -745,12 +749,14 @@ export class ShapeStream = Row> } this.#connected = false this.#tickPromiseRejecter?.() + this.#unsubscribeFromWakeDetection?.() throw err } // Normal completion, clean up this.#connected = false this.#tickPromiseRejecter?.() + this.#unsubscribeFromWakeDetection?.() } async #requestShape(): Promise { @@ -785,13 +791,16 @@ export class ShapeStream = Row> resumingFromPause, }) } catch (e) { - // Handle abort error triggered by refresh + const abortReason = requestAbortController.signal.reason + const isRestartAbort = + requestAbortController.signal.aborted && + (abortReason === FORCE_DISCONNECT_AND_REFRESH || + abortReason === SYSTEM_WAKE) + if ( (e instanceof FetchError || e instanceof FetchBackoffAbortError) && - requestAbortController.signal.aborted && - requestAbortController.signal.reason === FORCE_DISCONNECT_AND_REFRESH + isRestartAbort ) { - // Start a new request return this.#requestShape() } @@ -1432,6 +1441,7 @@ export class ShapeStream = Row> unsubscribeAll(): void { this.#subscribers.clear() this.#unsubscribeFromVisibilityChanges?.() + this.#unsubscribeFromWakeDetection?.() } /** Unix time at which we last synced. Undefined when `isLoading` is true. */ @@ -1543,12 +1553,16 @@ export class ShapeStream = Row> }) } - #subscribeToVisibilityChanges() { - if ( + #hasBrowserVisibilityAPI(): boolean { + return ( typeof document === `object` && typeof document.hidden === `boolean` && typeof document.addEventListener === `function` - ) { + ) + } + + #subscribeToVisibilityChanges() { + if (this.#hasBrowserVisibilityAPI()) { const visibilityHandler = () => { if (document.hidden) { this.#pause() @@ -1566,6 +1580,52 @@ export class ShapeStream = Row> } } + /** + * Detects system wake from sleep using timer gap detection. + * When the system sleeps, setInterval timers are paused. On wake, + * the elapsed wall-clock time since the last tick will be much larger + * than the interval period, indicating the system was asleep. + * + * Only active in non-browser environments (Bun, Node.js) where + * `document.visibilitychange` is not available. In browsers, + * `#subscribeToVisibilityChanges` handles this instead. Without wake + * detection, in-flight HTTP requests (long-poll or SSE) may hang until + * the OS TCP timeout. + */ + #subscribeToWakeDetection() { + if (this.#hasBrowserVisibilityAPI()) return + + const INTERVAL_MS = 2_000 + const WAKE_THRESHOLD_MS = 4_000 + + let lastTickTime = Date.now() + + const timer = setInterval(() => { + const now = Date.now() + const elapsed = now - lastTickTime + lastTickTime = now + + if (elapsed > INTERVAL_MS + WAKE_THRESHOLD_MS) { + if (this.#state === `active` && this.#requestAbortController) { + this.#isRefreshing = true + this.#requestAbortController.abort(SYSTEM_WAKE) + queueMicrotask(() => { + this.#isRefreshing = false + }) + } + } + }, INTERVAL_MS) + + // Ensure the timer doesn't prevent the process from exiting + if (typeof timer === `object` && `unref` in timer) { + timer.unref() + } + + this.#unsubscribeFromWakeDetection = () => { + clearInterval(timer) + } + } + /** * Resets the state of the stream, optionally with a provided * shape handle diff --git a/packages/typescript-client/src/constants.ts b/packages/typescript-client/src/constants.ts index c143966304..08643002d5 100644 --- a/packages/typescript-client/src/constants.ts +++ b/packages/typescript-client/src/constants.ts @@ -20,6 +20,7 @@ export const EXPERIMENTAL_LIVE_SSE_QUERY_PARAM = `experimental_live_sse` export const LIVE_SSE_QUERY_PARAM = `live_sse` export const FORCE_DISCONNECT_AND_REFRESH = `force-disconnect-and-refresh` export const PAUSE_STREAM = `pause-stream` +export const SYSTEM_WAKE = `system-wake` export const LOG_MODE_QUERY_PARAM = `log` export const SUBSET_PARAM_WHERE = `subset__where` export const SUBSET_PARAM_LIMIT = `subset__limit` diff --git a/packages/typescript-client/test/wake-detection.test.ts b/packages/typescript-client/test/wake-detection.test.ts new file mode 100644 index 0000000000..a3193ffb89 --- /dev/null +++ b/packages/typescript-client/test/wake-detection.test.ts @@ -0,0 +1,120 @@ +// @vitest-environment node +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { ShapeStream } from '../src' +import { resolveInMacrotask } from './support/test-helpers' + +describe(`Wake detection`, () => { + const shapeUrl = `https://example.com/v1/shape` + let aborter: AbortController + + beforeEach(() => { + aborter = new AbortController() + }) + + afterEach(() => { + aborter.abort() + vi.useRealTimers() + vi.restoreAllMocks() + }) + + it(`should set up wake detection timer in non-browser environments`, async () => { + const clearIntervalSpy = vi.spyOn(globalThis, `clearInterval`) + + const fetchWrapper = (): Promise => { + return resolveInMacrotask(Response.error()) + } + + const stream = new ShapeStream({ + url: shapeUrl, + params: { table: `foo` }, + signal: aborter.signal, + fetchClient: fetchWrapper, + }) + const unsub = stream.subscribe(() => {}) + + stream.unsubscribeAll() + expect(clearIntervalSpy.mock.calls.length).toBeGreaterThanOrEqual(1) + + unsub() + }) + + it(`should NOT set up wake detection timer in browser environments`, async () => { + ;(globalThis as Record).document = { + hidden: false, + addEventListener: vi.fn(), + removeEventListener: vi.fn(), + } + + const clearIntervalSpy = vi.spyOn(globalThis, `clearInterval`) + + const fetchWrapper = (): Promise => { + return resolveInMacrotask(Response.error()) + } + + const stream = new ShapeStream({ + url: shapeUrl, + params: { table: `foo` }, + signal: aborter.signal, + fetchClient: fetchWrapper, + }) + const unsub = stream.subscribe(() => {}) + + stream.unsubscribeAll() + expect(clearIntervalSpy.mock.calls.length).toBe(0) + + unsub() + delete (globalThis as Record).document + }) + + it(`should detect time gap and abort stale fetch after system wake`, async () => { + vi.useFakeTimers() + + const fetchSignals: AbortSignal[] = [] + let fetchCallCount = 0 + + const fetchWrapper = ( + ...args: Parameters + ): Promise => { + const signal = args[1]?.signal + if (signal) fetchSignals.push(signal) + fetchCallCount++ + return new Promise((_resolve, reject) => { + signal?.addEventListener(`abort`, () => reject(new Error(`aborted`)), { + once: true, + }) + }) + } + + const stream = new ShapeStream({ + url: shapeUrl, + params: { table: `foo` }, + signal: aborter.signal, + fetchClient: fetchWrapper, + }) + const unsub = stream.subscribe(() => {}) + + // Wait for first fetch + await vi.advanceTimersByTimeAsync(0) + expect(fetchCallCount).toBeGreaterThanOrEqual(1) + const initialFetchCount = fetchCallCount + + // Advance one normal interval (2s) — should NOT trigger wake detection + await vi.advanceTimersByTimeAsync(2_001) + expect(fetchSignals[fetchSignals.length - 1]?.aborted).toBe(false) + + // Simulate system sleep by jumping Date.now() forward 10s + const currentTime = Date.now() + vi.setSystemTime(currentTime + 10_000) + + // Trigger the next interval tick and allow async restart + await vi.advanceTimersByTimeAsync(2_001) + await vi.advanceTimersByTimeAsync(100) + + expect(fetchSignals[0]?.aborted).toBe(true) + expect(fetchSignals[0]?.reason).toBe(`system-wake`) + expect(fetchCallCount).toBeGreaterThan(initialFetchCount) + + unsub() + aborter.abort() + }) +}) diff --git a/packages/typescript-client/vitest.unit.config.ts b/packages/typescript-client/vitest.unit.config.ts index 36e6a90da9..a6ac751167 100644 --- a/packages/typescript-client/vitest.unit.config.ts +++ b/packages/typescript-client/vitest.unit.config.ts @@ -10,6 +10,7 @@ export default defineConfig({ `test/parser.test.ts`, `test/snapshot-tracker.test.ts`, `test/expired-shapes-cache.test.ts`, + `test/wake-detection.test.ts`, ], testTimeout: 30000, environment: `jsdom`,