Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/dry-chefs-accept.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"react-realtime-hooks": patch
---

Improve heartbeat failure handling in useHeartbeat/useWebSocket by surfacing beat errors and adding timeout-driven close/reconnect behavior.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ export function NetworkIndicator() {
| `parseMessage` | `(event) => TIncoming` | raw `event.data` | Incoming parser |
| `serializeMessage` | `(message) => ...` | JSON/string passthrough | Outgoing serializer |
| `reconnect` | `false \| UseReconnectOptions` | enabled | Reconnect configuration |
| `heartbeat` | `false \| UseHeartbeatOptions` | disabled unless configured | Heartbeat configuration |
| `heartbeat` | `false \| UseWebSocketHeartbeatOptions` | disabled unless configured | Heartbeat configuration |
| `shouldReconnect` | `(event) => boolean` | `true` | Reconnect gate on close |
| `onOpen` | `(event, socket) => void` | `undefined` | Open callback |
| `onMessage` | `(message, event) => void` | `undefined` | Message callback |
Expand All @@ -375,6 +375,10 @@ export function NetworkIndicator() {
| `reconnect` | `() => void` | Manual reconnect |
| `send` | `(message) => boolean` | Sends an outgoing payload |

When you configure `useWebSocket` heartbeat, you can also set `timeoutAction` and
`errorAction` to `"none"`, `"close"`, or `"reconnect"`. The default is
`"reconnect"` when reconnect is enabled and `"close"` otherwise.

</details>

<details>
Expand Down Expand Up @@ -464,6 +468,7 @@ export function NetworkIndicator() {
| `startOnMount` | `boolean` | `true` | Starts immediately |
| `onBeat` | `() => void` | `undefined` | Called on every beat |
| `onTimeout` | `() => void` | `undefined` | Called on timeout |
| `onError` | `(error) => void` | `undefined` | Called when `beat()` throws or rejects |

### Result

Expand Down
45 changes: 41 additions & 4 deletions src/hooks/useHeartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export const useHeartbeat = <
const startOnMount = options.startOnMount ?? true;
const intervalRef = useRef(createManagedInterval());
const timeoutRef = useRef(createManagedTimeout());
const generationRef = useRef(0);
const [state, setState] = useState<HeartbeatState>(() =>
createInitialState(enabled && startOnMount)
);
Expand Down Expand Up @@ -71,9 +72,7 @@ export const useHeartbeat = <
}, options.timeoutMs);
});

const runBeat = useEffectEvent(() => {
const performedAt = Date.now();

const handleBeatSuccess = useEffectEvent((performedAt: number) => {
commitState((current) => ({
...current,
hasTimedOut: false,
Expand All @@ -82,8 +81,44 @@ export const useHeartbeat = <

scheduleTimeout();
options.onBeat?.();
});

const handleBeatError = useEffectEvent((error: unknown) => {
timeoutRef.current.cancel();
options.onError?.(error);
});

const runBeat = useEffectEvent(() => {
const generation = generationRef.current;

void options.beat?.();
const completeBeat = (result: void | boolean): void => {
if (generation !== generationRef.current || result === false) {
return;
}

handleBeatSuccess(Date.now());
};

const failBeat = (error: unknown): void => {
if (generation !== generationRef.current) {
return;
}

handleBeatError(error);
};

try {
const result = options.beat?.();

if (result !== null && typeof result === "object" && "then" in result) {
void Promise.resolve(result).then(completeBeat, failBeat);
return;
}

completeBeat(result);
} catch (error) {
failBeat(error);
}
});

const start = (): void => {
Expand All @@ -105,6 +140,7 @@ export const useHeartbeat = <
};

const stop = (): void => {
generationRef.current += 1;
intervalRef.current.cancel();
timeoutRef.current.cancel();
commitState((current) => ({
Expand Down Expand Up @@ -166,6 +202,7 @@ export const useHeartbeat = <
}, [enabled, options.intervalMs, startOnMount]);

useEffect(() => () => {
generationRef.current += 1;
intervalRef.current.cancel();
timeoutRef.current.cancel();
}, []);
Expand Down
120 changes: 114 additions & 6 deletions src/hooks/useWebSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { useReconnect } from "./useReconnect";
import type { UseHeartbeatOptions } from "../types/useHeartbeat";
import type {
UseWebSocketHook,
WebSocketHeartbeatAction,
UseWebSocketHeartbeatOptions,
UseWebSocketOptions,
UseWebSocketResult
} from "../types/useWebSocket";
Expand Down Expand Up @@ -71,9 +73,12 @@ const toProtocolsDependency = (protocols: string | string[] | undefined): string

const toHeartbeatConfig = <TOutgoing, TIncoming>(
heartbeat: UseWebSocketOptions<TIncoming, TOutgoing>["heartbeat"]
): UseHeartbeatOptions<TOutgoing, TIncoming> | null =>
): UseWebSocketHeartbeatOptions<TOutgoing, TIncoming> | null =>
heartbeat === undefined || heartbeat === false ? null : heartbeat;

const isSocketActive = (socket: WebSocket): boolean =>
socket.readyState === WebSocket.OPEN || socket.readyState === WebSocket.CONNECTING;

export const useWebSocket: UseWebSocketHook = <
TIncoming = unknown,
TOutgoing = TIncoming
Expand All @@ -91,6 +96,10 @@ export const useWebSocket: UseWebSocketHook = <
const manualOpenRef = useRef(false);
const skipCloseReconnectRef = useRef(false);
const suppressReconnectRef = useRef(false);
const pendingCloseActionRef = useRef<{
error: Event | null;
reconnectTrigger: "heartbeat-timeout" | "error" | null;
} | null>(null);
const terminalErrorRef = useRef<Event | null>(null);
const [openNonce, setOpenNonce] = useState(0);
const [state, setState] = useState<WebSocketState<TIncoming>>(() =>
Expand All @@ -115,6 +124,8 @@ export const useWebSocket: UseWebSocketHook = <
const heartbeatConfig = toHeartbeatConfig<TOutgoing, TIncoming>(
options.heartbeat
);
const defaultHeartbeatAction: WebSocketHeartbeatAction =
options.reconnect === false ? "close" : "reconnect";
const heartbeatHookOptions: UseHeartbeatOptions<TOutgoing, TIncoming> =
heartbeatConfig === null
? {
Expand Down Expand Up @@ -161,6 +172,33 @@ export const useWebSocket: UseWebSocketHook = <
heartbeatHookOptions.onTimeout = heartbeatConfig.onTimeout;
}

if (heartbeatConfig !== null) {
const onTimeout = heartbeatHookOptions.onTimeout;
heartbeatHookOptions.onTimeout = () => {
applyHeartbeatAction(
heartbeatConfig.timeoutAction ?? defaultHeartbeatAction,
new Event("heartbeat-timeout"),
"heartbeat-timeout"
);
onTimeout?.();
};

const onError = heartbeatConfig.onError;
heartbeatHookOptions.onError = (error) => {
const event =
error instanceof Event ? error : new Event("heartbeat-error");

applyHeartbeatAction(
heartbeatConfig.errorAction ??
heartbeatConfig.timeoutAction ??
defaultHeartbeatAction,
event,
"error"
);
onError?.(error);
};
}

const heartbeat = useHeartbeat<TOutgoing, TIncoming>(
heartbeatHookOptions
);
Expand All @@ -185,11 +223,61 @@ export const useWebSocket: UseWebSocketHook = <
socketRef.current = null;
socketKeyRef.current = null;

if (socket.readyState === WebSocket.OPEN || socket.readyState === WebSocket.CONNECTING) {
if (isSocketActive(socket)) {
socket.close(code, reason);
}
});

const applyHeartbeatAction = useEffectEvent(
(
action: WebSocketHeartbeatAction,
error: Event,
reconnectTrigger: "heartbeat-timeout" | "error"
) => {
heartbeat.stop();

if (action === "none") {
commitState((current) => ({
...current,
lastChangedAt: Date.now(),
lastError: error
}));
return;
}

const shouldReconnect =
action === "reconnect" &&
reconnectEnabled &&
(options.shouldReconnect?.(error) ?? true);
manualOpenRef.current = false;
terminalErrorRef.current = shouldReconnect ? null : error;
const socket = socketRef.current;

if (socket === null || !isSocketActive(socket)) {
commitState((current) => ({
...current,
lastChangedAt: Date.now(),
lastError: error,
status: shouldReconnect ? "reconnecting" : "error"
}));

if (shouldReconnect) {
reconnect.schedule(reconnectTrigger);
}

return;
}

pendingCloseActionRef.current = {
error,
reconnectTrigger: shouldReconnect ? reconnectTrigger : null
};
skipCloseReconnectRef.current = true;
suppressReconnectRef.current = true;
closeSocket();
}
);

const parseMessage = useEffectEvent((event: MessageEvent<unknown>) => {
const parser = options.parseMessage ?? defaultParseMessage<TIncoming>;
return parser(event);
Expand Down Expand Up @@ -267,10 +355,33 @@ export const useWebSocket: UseWebSocketHook = <
socketKeyRef.current = null;
heartbeat.stop();
updateBufferedAmount();
const pendingCloseAction = pendingCloseActionRef.current;
pendingCloseActionRef.current = null;
const terminalError = terminalErrorRef.current;
const skipCloseReconnect = skipCloseReconnectRef.current;
skipCloseReconnectRef.current = false;

if (pendingCloseAction !== null) {
suppressReconnectRef.current = false;

commitState((current) => ({
...current,
lastChangedAt: Date.now(),
lastCloseEvent: event,
lastError: pendingCloseAction.error ?? current.lastError,
status:
pendingCloseAction.reconnectTrigger === null ? "error" : "reconnecting"
}));

options.onClose?.(event);

if (pendingCloseAction.reconnectTrigger !== null) {
reconnect.schedule(pendingCloseAction.reconnectTrigger);
}

return;
}

if (terminalError !== null) {
suppressReconnectRef.current = false;

Expand Down Expand Up @@ -475,10 +586,7 @@ export const useWebSocket: UseWebSocketHook = <
return;
}

if (
socket.readyState === WebSocket.OPEN ||
socket.readyState === WebSocket.CONNECTING
) {
if (isSocketActive(socket)) {
socket.close();
}
}, []);
Expand Down
1 change: 1 addition & 0 deletions src/types/useHeartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export interface UseHeartbeatOptions<TOutgoing = unknown, TIncoming = TOutgoing>
startOnMount?: boolean;
onBeat?: () => void;
onTimeout?: () => void;
onError?: (error: unknown) => void;
}

export interface UseHeartbeatResult<TIncoming = unknown> {
Expand Down
12 changes: 11 additions & 1 deletion src/types/useWebSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ import type {
} from "./useHeartbeat";
import type { UseReconnectOptions, UseReconnectResult } from "./useReconnect";

export type WebSocketHeartbeatAction = "none" | "close" | "reconnect";

export interface UseWebSocketHeartbeatOptions<
TOutgoing = unknown,
TIncoming = TOutgoing
> extends UseHeartbeatOptions<TOutgoing, TIncoming> {
timeoutAction?: WebSocketHeartbeatAction;
errorAction?: WebSocketHeartbeatAction;
}

export interface UseWebSocketOptions<TIncoming = unknown, TOutgoing = TIncoming> {
url: UrlProvider;
protocols?: string | string[];
Expand All @@ -19,7 +29,7 @@ export interface UseWebSocketOptions<TIncoming = unknown, TOutgoing = TIncoming>
parseMessage?: MessageParser<TIncoming>;
serializeMessage?: MessageSerializer<TOutgoing>;
reconnect?: false | UseReconnectOptions;
heartbeat?: false | UseHeartbeatOptions<TOutgoing, TIncoming>;
heartbeat?: false | UseWebSocketHeartbeatOptions<TOutgoing, TIncoming>;
shouldReconnect?: (event: CloseEvent | Event | undefined) => boolean;
onOpen?: (event: Event, socket: WebSocket) => void;
onMessage?: (message: TIncoming, event: MessageEvent<unknown>) => void;
Expand Down
4 changes: 2 additions & 2 deletions test/api-contract.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import type {
RealtimeConnectionStatus,
UseEventSourceOptions,
UseEventSourceResult,
UseHeartbeatOptions,
UseOnlineStatusResult,
UseReconnectOptions,
UseReconnectResult,
UseWebSocketHeartbeatOptions,
UseWebSocketOptions,
UseWebSocketResult
} from "../src";
Expand Down Expand Up @@ -35,7 +35,7 @@ describe("api contracts", () => {
expectTypeOf<
UseWebSocketOptions<string, { type: "ping" }>["heartbeat"]
>().toEqualTypeOf<
false | UseHeartbeatOptions<{ type: "ping" }, string> | undefined
false | UseWebSocketHeartbeatOptions<{ type: "ping" }, string> | undefined
>();

expectTypeOf<UseWebSocketResult<string>["transport"]>().toEqualTypeOf<"websocket">();
Expand Down
Loading
Loading