diff --git a/apps/dashboard/src/lib/use-github-signal-stream.ts b/apps/dashboard/src/lib/use-github-signal-stream.ts index bc82728..6b647b7 100644 --- a/apps/dashboard/src/lib/use-github-signal-stream.ts +++ b/apps/dashboard/src/lib/use-github-signal-stream.ts @@ -1,5 +1,5 @@ import { type QueryKey, useQueryClient } from "@tanstack/react-query"; -import { useEffect, useMemo, useRef } from "react"; +import { type MutableRefObject, useEffect, useMemo, useRef } from "react"; import { debug } from "./debug"; import { getRevalidationSignalTimestamps } from "./github.functions"; import { type GitHubQueryScope, githubQueryKeys } from "./github.query"; @@ -133,9 +133,54 @@ export function invalidateTargets( return invalidatedCount; } +function signalStreamCompositeKey( + queryKey: QueryKey, + signalKey: string, +): string { + return `${JSON.stringify(queryKey)}\0${signalKey}`; +} + +/** Sync server signal timestamps with local query ages; mutates lastSeenTimestamps (per queryKey+signalKey). */ +function collectKeysToInvalidateAfterServerSync( + queryClient: ReturnType, + targets: readonly GitHubSignalStreamTarget[], + signals: Array<{ signalKey: string; updatedAt: number }>, + lastSeenTimestamps: Map, +): string[] { + const updatedKeys = new Set(); + + for (const signal of signals) { + for (const target of targets) { + if (!target.signalKeys.includes(signal.signalKey)) { + continue; + } + + const compositeKey = signalStreamCompositeKey( + target.queryKey, + signal.signalKey, + ); + const lastSeen = lastSeenTimestamps.get(compositeKey); + const qs = queryClient.getQueryState(target.queryKey); + + if (lastSeen === undefined) { + if (qs && qs.dataUpdatedAt > 0 && signal.updatedAt > qs.dataUpdatedAt) { + updatedKeys.add(signal.signalKey); + } + lastSeenTimestamps.set(compositeKey, signal.updatedAt); + } else if (signal.updatedAt > lastSeen) { + lastSeenTimestamps.set(compositeKey, signal.updatedAt); + updatedKeys.add(signal.signalKey); + } + } + } + + return Array.from(updatedKeys); +} + function useGitHubSignalStreamWebSocket( targets: readonly GitHubSignalStreamTarget[], signalKeysKey: string, + lastSeenTimestampsRef: MutableRefObject>, ) { const queryClient = useQueryClient(); const targetsRef = useRef(targets); @@ -151,6 +196,45 @@ function useGitHubSignalStreamWebSocket( let reconnectTimer: ReturnType | null = null; let disposed = false; + async function syncSignalsFromServer(source: string) { + try { + const signals = await getRevalidationSignalTimestamps({ + data: { signalKeys: keys }, + }); + if (disposed) return; + + const updatedKeys = collectKeysToInvalidateAfterServerSync( + queryClient, + targetsRef.current, + signals, + lastSeenTimestampsRef.current, + ); + + if (updatedKeys.length === 0) { + return; + } + + debug(source, "detected missed or stale cache vs signals", { + updatedKeys, + }); + + const invalidatedCount = invalidateTargets( + queryClient, + targetsRef.current, + new Set(updatedKeys), + source, + ); + + debug(source, "sync processed", { + updatedKeys, + invalidatedCount, + totalTargets: targetsRef.current.length, + }); + } catch (error) { + debug(source, "sync failed", { error }); + } + } + function sendSubscription(socket: WebSocket) { if (socket.readyState === WebSocket.OPEN) { debug("github-signal-stream", "subscribing to signal keys", { @@ -212,6 +296,7 @@ function useGitHubSignalStreamWebSocket( ws.addEventListener("open", () => { debug("github-signal-stream", "connected"); if (ws) sendSubscription(ws); + void syncSignalsFromServer("github-signal-ws-catchup"); }); ws.addEventListener("message", handleMessage); @@ -251,12 +336,13 @@ function useGitHubSignalStreamWebSocket( ws.close(); } }; - }, [signalKeysKey, queryClient]); + }, [signalKeysKey, queryClient, lastSeenTimestampsRef]); } function useGitHubSignalPoll( targets: readonly GitHubSignalStreamTarget[], signalKeysKey: string, + lastSeenTimestampsRef: MutableRefObject>, ) { const queryClient = useQueryClient(); const targetsRef = useRef(targets); @@ -270,7 +356,6 @@ function useGitHubSignalPoll( const keys = signalKeysKey.split(","); let pollTimer: ReturnType | null = null; let disposed = false; - const lastSeenTimestamps = new Map(); async function pollSignals() { if (disposed) return; @@ -282,16 +367,12 @@ function useGitHubSignalPoll( if (disposed) return; - const updatedKeys: string[] = []; - for (const signal of signals) { - const lastSeen = lastSeenTimestamps.get(signal.signalKey); - if (lastSeen === undefined) { - lastSeenTimestamps.set(signal.signalKey, signal.updatedAt); - } else if (signal.updatedAt > lastSeen) { - lastSeenTimestamps.set(signal.signalKey, signal.updatedAt); - updatedKeys.push(signal.signalKey); - } - } + const updatedKeys = collectKeysToInvalidateAfterServerSync( + queryClient, + targetsRef.current, + signals, + lastSeenTimestampsRef.current, + ); if (updatedKeys.length > 0) { debug("github-signal-poll", "detected missed signals", { @@ -335,7 +416,7 @@ function useGitHubSignalPoll( clearTimeout(pollTimer); } }; - }, [signalKeysKey, queryClient]); + }, [signalKeysKey, queryClient, lastSeenTimestampsRef]); } export function useGitHubSignalStream( @@ -356,6 +437,31 @@ export function useGitHubSignalStream( // not when the array reference changes. const signalKeysKey = allSignalKeys.join(","); - useGitHubSignalStreamWebSocket(mergedTargets, signalKeysKey); - useGitHubSignalPoll(mergedTargets, signalKeysKey); + const mergedTargetsIdentity = useMemo( + () => + mergedTargets + .map( + (t) => + `${JSON.stringify(t.queryKey)}\0${[...t.signalKeys].sort().join(",")}`, + ) + .sort() + .join("|"), + [mergedTargets], + ); + + const lastSeenTimestampsRef = useRef(new Map()); + + useEffect(() => { + // Reference deps so the reset runs when subscription identity changes (Biome exhaustive-deps). + void signalKeysKey; + void mergedTargetsIdentity; + lastSeenTimestampsRef.current = new Map(); + }, [signalKeysKey, mergedTargetsIdentity]); + + useGitHubSignalStreamWebSocket( + mergedTargets, + signalKeysKey, + lastSeenTimestampsRef, + ); + useGitHubSignalPoll(mergedTargets, signalKeysKey, lastSeenTimestampsRef); }