diff --git a/.changeset/dry-chefs-accept.md b/.changeset/dry-chefs-accept.md new file mode 100644 index 0000000..cc13f6b --- /dev/null +++ b/.changeset/dry-chefs-accept.md @@ -0,0 +1,5 @@ +--- +"react-realtime-hooks": patch +--- + +Improve heartbeat failure handling in useHeartbeat/useWebSocket by surfacing beat errors and adding timeout-driven close/reconnect behavior. diff --git a/README.md b/README.md index 2b92466..897ca47 100644 --- a/README.md +++ b/README.md @@ -350,7 +350,7 @@ export function NetworkIndicator() { | `parseMessage` | `(event) => TIncoming` | raw `event.data` | Incoming parser | | `serializeMessage` | `(message) => ...` | JSON/string passthrough | Outgoing serializer | | `reconnect` | `false \| UseReconnectOptions` | enabled | Reconnect configuration | -| `heartbeat` | `false \| UseHeartbeatOptions` | disabled unless configured | Heartbeat configuration | +| `heartbeat` | `false \| UseWebSocketHeartbeatOptions` | disabled unless configured | Heartbeat configuration | | `shouldReconnect` | `(event) => boolean` | `true` | Reconnect gate on close | | `onOpen` | `(event, socket) => void` | `undefined` | Open callback | | `onMessage` | `(message, event) => void` | `undefined` | Message callback | @@ -375,6 +375,10 @@ export function NetworkIndicator() { | `reconnect` | `() => void` | Manual reconnect | | `send` | `(message) => boolean` | Sends an outgoing payload | +When you configure `useWebSocket` heartbeat, you can also set `timeoutAction` and +`errorAction` to `"none"`, `"close"`, or `"reconnect"`. The default is +`"reconnect"` when reconnect is enabled and `"close"` otherwise. +
@@ -464,6 +468,7 @@ export function NetworkIndicator() { | `startOnMount` | `boolean` | `true` | Starts immediately | | `onBeat` | `() => void` | `undefined` | Called on every beat | | `onTimeout` | `() => void` | `undefined` | Called on timeout | +| `onError` | `(error) => void` | `undefined` | Called when `beat()` throws or rejects | ### Result diff --git a/src/hooks/useHeartbeat.ts b/src/hooks/useHeartbeat.ts index 8779895..1bd22cc 100644 --- a/src/hooks/useHeartbeat.ts +++ b/src/hooks/useHeartbeat.ts @@ -33,6 +33,7 @@ export const useHeartbeat = < const startOnMount = options.startOnMount ?? true; const intervalRef = useRef(createManagedInterval()); const timeoutRef = useRef(createManagedTimeout()); + const generationRef = useRef(0); const [state, setState] = useState(() => createInitialState(enabled && startOnMount) ); @@ -71,9 +72,7 @@ export const useHeartbeat = < }, options.timeoutMs); }); - const runBeat = useEffectEvent(() => { - const performedAt = Date.now(); - + const handleBeatSuccess = useEffectEvent((performedAt: number) => { commitState((current) => ({ ...current, hasTimedOut: false, @@ -82,8 +81,44 @@ export const useHeartbeat = < scheduleTimeout(); options.onBeat?.(); + }); + + const handleBeatError = useEffectEvent((error: unknown) => { + timeoutRef.current.cancel(); + options.onError?.(error); + }); + + const runBeat = useEffectEvent(() => { + const generation = generationRef.current; - void options.beat?.(); + const completeBeat = (result: void | boolean): void => { + if (generation !== generationRef.current || result === false) { + return; + } + + handleBeatSuccess(Date.now()); + }; + + const failBeat = (error: unknown): void => { + if (generation !== generationRef.current) { + return; + } + + handleBeatError(error); + }; + + try { + const result = options.beat?.(); + + if (result !== null && typeof result === "object" && "then" in result) { + void Promise.resolve(result).then(completeBeat, failBeat); + return; + } + + completeBeat(result); + } catch (error) { + failBeat(error); + } }); const start = (): void => { @@ -105,6 +140,7 @@ export const useHeartbeat = < }; const stop = (): void => { + generationRef.current += 1; intervalRef.current.cancel(); timeoutRef.current.cancel(); commitState((current) => ({ @@ -166,6 +202,7 @@ export const useHeartbeat = < }, [enabled, options.intervalMs, startOnMount]); useEffect(() => () => { + generationRef.current += 1; intervalRef.current.cancel(); timeoutRef.current.cancel(); }, []); diff --git a/src/hooks/useWebSocket.ts b/src/hooks/useWebSocket.ts index d9afbd4..411714a 100644 --- a/src/hooks/useWebSocket.ts +++ b/src/hooks/useWebSocket.ts @@ -8,6 +8,8 @@ import { useReconnect } from "./useReconnect"; import type { UseHeartbeatOptions } from "../types/useHeartbeat"; import type { UseWebSocketHook, + WebSocketHeartbeatAction, + UseWebSocketHeartbeatOptions, UseWebSocketOptions, UseWebSocketResult } from "../types/useWebSocket"; @@ -71,9 +73,12 @@ const toProtocolsDependency = (protocols: string | string[] | undefined): string const toHeartbeatConfig = ( heartbeat: UseWebSocketOptions["heartbeat"] -): UseHeartbeatOptions | null => +): UseWebSocketHeartbeatOptions | null => heartbeat === undefined || heartbeat === false ? null : heartbeat; +const isSocketActive = (socket: WebSocket): boolean => + socket.readyState === WebSocket.OPEN || socket.readyState === WebSocket.CONNECTING; + export const useWebSocket: UseWebSocketHook = < TIncoming = unknown, TOutgoing = TIncoming @@ -91,6 +96,10 @@ export const useWebSocket: UseWebSocketHook = < const manualOpenRef = useRef(false); const skipCloseReconnectRef = useRef(false); const suppressReconnectRef = useRef(false); + const pendingCloseActionRef = useRef<{ + error: Event | null; + reconnectTrigger: "heartbeat-timeout" | "error" | null; + } | null>(null); const terminalErrorRef = useRef(null); const [openNonce, setOpenNonce] = useState(0); const [state, setState] = useState>(() => @@ -115,6 +124,8 @@ export const useWebSocket: UseWebSocketHook = < const heartbeatConfig = toHeartbeatConfig( options.heartbeat ); + const defaultHeartbeatAction: WebSocketHeartbeatAction = + options.reconnect === false ? "close" : "reconnect"; const heartbeatHookOptions: UseHeartbeatOptions = heartbeatConfig === null ? { @@ -161,6 +172,33 @@ export const useWebSocket: UseWebSocketHook = < heartbeatHookOptions.onTimeout = heartbeatConfig.onTimeout; } + if (heartbeatConfig !== null) { + const onTimeout = heartbeatHookOptions.onTimeout; + heartbeatHookOptions.onTimeout = () => { + applyHeartbeatAction( + heartbeatConfig.timeoutAction ?? defaultHeartbeatAction, + new Event("heartbeat-timeout"), + "heartbeat-timeout" + ); + onTimeout?.(); + }; + + const onError = heartbeatConfig.onError; + heartbeatHookOptions.onError = (error) => { + const event = + error instanceof Event ? error : new Event("heartbeat-error"); + + applyHeartbeatAction( + heartbeatConfig.errorAction ?? + heartbeatConfig.timeoutAction ?? + defaultHeartbeatAction, + event, + "error" + ); + onError?.(error); + }; + } + const heartbeat = useHeartbeat( heartbeatHookOptions ); @@ -185,11 +223,61 @@ export const useWebSocket: UseWebSocketHook = < socketRef.current = null; socketKeyRef.current = null; - if (socket.readyState === WebSocket.OPEN || socket.readyState === WebSocket.CONNECTING) { + if (isSocketActive(socket)) { socket.close(code, reason); } }); + const applyHeartbeatAction = useEffectEvent( + ( + action: WebSocketHeartbeatAction, + error: Event, + reconnectTrigger: "heartbeat-timeout" | "error" + ) => { + heartbeat.stop(); + + if (action === "none") { + commitState((current) => ({ + ...current, + lastChangedAt: Date.now(), + lastError: error + })); + return; + } + + const shouldReconnect = + action === "reconnect" && + reconnectEnabled && + (options.shouldReconnect?.(error) ?? true); + manualOpenRef.current = false; + terminalErrorRef.current = shouldReconnect ? null : error; + const socket = socketRef.current; + + if (socket === null || !isSocketActive(socket)) { + commitState((current) => ({ + ...current, + lastChangedAt: Date.now(), + lastError: error, + status: shouldReconnect ? "reconnecting" : "error" + })); + + if (shouldReconnect) { + reconnect.schedule(reconnectTrigger); + } + + return; + } + + pendingCloseActionRef.current = { + error, + reconnectTrigger: shouldReconnect ? reconnectTrigger : null + }; + skipCloseReconnectRef.current = true; + suppressReconnectRef.current = true; + closeSocket(); + } + ); + const parseMessage = useEffectEvent((event: MessageEvent) => { const parser = options.parseMessage ?? defaultParseMessage; return parser(event); @@ -267,10 +355,33 @@ export const useWebSocket: UseWebSocketHook = < socketKeyRef.current = null; heartbeat.stop(); updateBufferedAmount(); + const pendingCloseAction = pendingCloseActionRef.current; + pendingCloseActionRef.current = null; const terminalError = terminalErrorRef.current; const skipCloseReconnect = skipCloseReconnectRef.current; skipCloseReconnectRef.current = false; + if (pendingCloseAction !== null) { + suppressReconnectRef.current = false; + + commitState((current) => ({ + ...current, + lastChangedAt: Date.now(), + lastCloseEvent: event, + lastError: pendingCloseAction.error ?? current.lastError, + status: + pendingCloseAction.reconnectTrigger === null ? "error" : "reconnecting" + })); + + options.onClose?.(event); + + if (pendingCloseAction.reconnectTrigger !== null) { + reconnect.schedule(pendingCloseAction.reconnectTrigger); + } + + return; + } + if (terminalError !== null) { suppressReconnectRef.current = false; @@ -475,10 +586,7 @@ export const useWebSocket: UseWebSocketHook = < return; } - if ( - socket.readyState === WebSocket.OPEN || - socket.readyState === WebSocket.CONNECTING - ) { + if (isSocketActive(socket)) { socket.close(); } }, []); diff --git a/src/types/useHeartbeat.ts b/src/types/useHeartbeat.ts index 6cee37c..b32b231 100644 --- a/src/types/useHeartbeat.ts +++ b/src/types/useHeartbeat.ts @@ -12,6 +12,7 @@ export interface UseHeartbeatOptions startOnMount?: boolean; onBeat?: () => void; onTimeout?: () => void; + onError?: (error: unknown) => void; } export interface UseHeartbeatResult { diff --git a/src/types/useWebSocket.ts b/src/types/useWebSocket.ts index a4c8dbf..ad6b059 100644 --- a/src/types/useWebSocket.ts +++ b/src/types/useWebSocket.ts @@ -11,6 +11,16 @@ import type { } from "./useHeartbeat"; import type { UseReconnectOptions, UseReconnectResult } from "./useReconnect"; +export type WebSocketHeartbeatAction = "none" | "close" | "reconnect"; + +export interface UseWebSocketHeartbeatOptions< + TOutgoing = unknown, + TIncoming = TOutgoing +> extends UseHeartbeatOptions { + timeoutAction?: WebSocketHeartbeatAction; + errorAction?: WebSocketHeartbeatAction; +} + export interface UseWebSocketOptions { url: UrlProvider; protocols?: string | string[]; @@ -19,7 +29,7 @@ export interface UseWebSocketOptions parseMessage?: MessageParser; serializeMessage?: MessageSerializer; reconnect?: false | UseReconnectOptions; - heartbeat?: false | UseHeartbeatOptions; + heartbeat?: false | UseWebSocketHeartbeatOptions; shouldReconnect?: (event: CloseEvent | Event | undefined) => boolean; onOpen?: (event: Event, socket: WebSocket) => void; onMessage?: (message: TIncoming, event: MessageEvent) => void; diff --git a/test/api-contract.test.ts b/test/api-contract.test.ts index 783b62a..40436f8 100644 --- a/test/api-contract.test.ts +++ b/test/api-contract.test.ts @@ -4,10 +4,10 @@ import type { RealtimeConnectionStatus, UseEventSourceOptions, UseEventSourceResult, - UseHeartbeatOptions, UseOnlineStatusResult, UseReconnectOptions, UseReconnectResult, + UseWebSocketHeartbeatOptions, UseWebSocketOptions, UseWebSocketResult } from "../src"; @@ -35,7 +35,7 @@ describe("api contracts", () => { expectTypeOf< UseWebSocketOptions["heartbeat"] >().toEqualTypeOf< - false | UseHeartbeatOptions<{ type: "ping" }, string> | undefined + false | UseWebSocketHeartbeatOptions<{ type: "ping" }, string> | undefined >(); expectTypeOf["transport"]>().toEqualTypeOf<"websocket">(); diff --git a/test/hooks/useHeartbeat.test.ts b/test/hooks/useHeartbeat.test.ts index 5b7a409..9a6d366 100644 --- a/test/hooks/useHeartbeat.test.ts +++ b/test/hooks/useHeartbeat.test.ts @@ -161,6 +161,62 @@ describe("useHeartbeat", () => { expect(onTimeout).toHaveBeenCalledTimes(1); }); + it("does not record a beat when beat returns false", () => { + vi.useFakeTimers(); + + const onBeat = vi.fn(); + const beat = vi.fn(() => false); + const { result } = renderHook(() => + useHeartbeat({ + beat, + intervalMs: 100, + onBeat, + startOnMount: false, + timeoutMs: 200 + }) + ); + + act(() => { + result.current.beat(); + vi.advanceTimersByTime(250); + }); + + expect(beat).toHaveBeenCalledTimes(1); + expect(onBeat).not.toHaveBeenCalled(); + expect(result.current.lastBeatAt).toBeNull(); + expect(result.current.hasTimedOut).toBe(false); + }); + + it("reports rejected beat promises without timing out", async () => { + vi.useFakeTimers(); + + const onError = vi.fn(); + const beat = vi.fn().mockRejectedValue(new Error("heartbeat failed")); + const { result } = renderHook(() => + useHeartbeat({ + beat, + intervalMs: 100, + onError, + startOnMount: false, + timeoutMs: 200 + }) + ); + + act(() => { + result.current.beat(); + }); + + await act(async () => { + await Promise.resolve(); + await Promise.resolve(); + vi.advanceTimersByTime(250); + }); + + expect(onError).toHaveBeenCalledTimes(1); + expect(result.current.lastBeatAt).toBeNull(); + expect(result.current.hasTimedOut).toBe(false); + }); + it("does nothing when disabled", () => { vi.useFakeTimers(); diff --git a/test/hooks/useWebSocket.test.ts b/test/hooks/useWebSocket.test.ts index 1c74f7c..32ee0f4 100644 --- a/test/hooks/useWebSocket.test.ts +++ b/test/hooks/useWebSocket.test.ts @@ -342,6 +342,105 @@ describe("useWebSocket", () => { expect(socket?.sent).toEqual(["ping"]); }); + it("reconnects on heartbeat timeout by default", async () => { + vi.useFakeTimers(); + + const { result } = renderHook(() => + useWebSocket({ + heartbeat: { + intervalMs: 100, + message: "ping", + timeoutMs: 50 + }, + reconnect: { + initialDelayMs: 0, + jitterRatio: 0 + }, + url: "ws://localhost:1234" + }) + ); + + const socket = MockWebSocket.instances[0]; + + act(() => { + socket?.emitOpen(); + vi.advanceTimersByTime(150); + }); + + await act(async () => { + await vi.runAllTimersAsync(); + }); + + expect(socket?.closeCalls).toBe(1); + expect(MockWebSocket.instances).toHaveLength(2); + expect(result.current.status).toBe("reconnecting"); + expect(result.current.lastError?.type).toBe("heartbeat-timeout"); + }); + + it("moves to error on heartbeat timeout when reconnect is disabled", () => { + vi.useFakeTimers(); + + const { result } = renderHook(() => + useWebSocket({ + heartbeat: { + intervalMs: 100, + message: "ping", + timeoutMs: 50 + }, + reconnect: false, + url: "ws://localhost:1234" + }) + ); + + const socket = MockWebSocket.instances[0]; + + act(() => { + socket?.emitOpen(); + vi.advanceTimersByTime(150); + }); + + expect(socket?.closeCalls).toBe(1); + expect(result.current.status).toBe("error"); + expect(result.current.lastError?.type).toBe("heartbeat-timeout"); + }); + + it("reconnects when the heartbeat beat throws", async () => { + vi.useFakeTimers(); + + const { result } = renderHook(() => + useWebSocket({ + heartbeat: { + beat: () => { + throw new Error("beat failed"); + }, + intervalMs: 100, + timeoutMs: 50 + }, + reconnect: { + initialDelayMs: 0, + jitterRatio: 0 + }, + url: "ws://localhost:1234" + }) + ); + + const socket = MockWebSocket.instances[0]; + + act(() => { + socket?.emitOpen(); + vi.advanceTimersByTime(100); + }); + + await act(async () => { + await vi.runAllTimersAsync(); + }); + + expect(socket?.closeCalls).toBe(1); + expect(MockWebSocket.instances).toHaveLength(2); + expect(result.current.status).toBe("reconnecting"); + expect(result.current.lastError?.type).toBe("heartbeat-error"); + }); + it("closes the socket and stops reconnect after parse errors", () => { vi.useFakeTimers();