Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions apps/dashboard/src/lib/github.functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6499,3 +6499,24 @@ export const getRepoContributors = createServerFn({ method: "GET" })
},
});
});

type RevalidationSignalTimestampsInput = { signalKeys: string[] };

export const getRevalidationSignalTimestamps = createServerFn({
method: "GET",
})
.inputValidator(identityValidator<RevalidationSignalTimestampsInput>)
.handler(
async ({
data,
}): Promise<Array<{ signalKey: string; updatedAt: number }>> => {
const { getRequestSession } = await import("./auth-runtime");
const session = await getRequestSession();
if (!session) {
return [];
}

const { getGitHubRevalidationSignals } = await import("./github-cache");
return getGitHubRevalidationSignals(data.signalKeys);
},
);
216 changes: 158 additions & 58 deletions apps/dashboard/src/lib/use-github-signal-stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { type QueryKey, useQueryClient } from "@tanstack/react-query";
import { useEffect, useMemo, useRef } from "react";
import { debug } from "./debug";
import { getRevalidationSignalTimestamps } from "./github.functions";

export type GitHubSignalStreamTarget = {
queryKey: QueryKey;
Expand All @@ -24,29 +25,69 @@ function isSignalMessage(data: unknown): data is SignalMessage {
}

const RECONNECT_DELAY_MS = 3_000;
const POLL_INTERVAL_MS = 5 * 60 * 1_000;

function getWebSocketUrl() {
const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
return `${protocol}//${window.location.host}/api/ws/signals`;
}

export function useGitHubSignalStream(
export function invalidateTargets(
queryClient: ReturnType<typeof useQueryClient>,
targets: readonly GitHubSignalStreamTarget[],
receivedKeys: Set<string>,
source: string,
) {
let invalidatedCount = 0;

for (const target of targets) {
const matchedKeys = target.signalKeys.filter((key) =>
receivedKeys.has(key),
);
if (matchedKeys.length === 0) continue;

const queryState = queryClient.getQueryState(target.queryKey);
if (
!queryState ||
queryState.dataUpdatedAt === 0 ||
queryState.fetchStatus === "fetching"
) {
debug(source, "skipping query (no data or already fetching)", {
queryKey: target.queryKey,
matchedKeys,
reason: !queryState
? "no-state"
: queryState.dataUpdatedAt === 0
? "no-data"
: "fetching",
});
continue;
}

debug(source, "invalidating query", {
queryKey: target.queryKey,
matchedKeys,
});

void queryClient.invalidateQueries({
queryKey: target.queryKey,
exact: true,
refetchType: "active",
});
invalidatedCount++;
}

return invalidatedCount;
}

function useGitHubSignalStreamWebSocket(
targets: readonly GitHubSignalStreamTarget[],
signalKeysKey: string,
) {
const queryClient = useQueryClient();
const targetsRef = useRef(targets);
targetsRef.current = targets;

const allSignalKeys = useMemo(() => {
return Array.from(
new Set(targets.flatMap((target) => [...target.signalKeys])),
).sort();
}, [targets]);

// Stable string so the effect only re-runs when the actual keys change,
// not when the array reference changes.
const signalKeysKey = allSignalKeys.join(",");

useEffect(() => {
if (signalKeysKey.length === 0) {
return;
Expand Down Expand Up @@ -92,48 +133,12 @@ export function useGitHubSignalStream(

const receivedKeys = new Set(message.keys);
const currentTargets = targetsRef.current;
let invalidatedCount = 0;

for (const target of currentTargets) {
const matchedKeys = target.signalKeys.filter((key) =>
receivedKeys.has(key),
);
if (matchedKeys.length === 0) continue;

const queryState = queryClient.getQueryState(target.queryKey);
if (
!queryState ||
queryState.dataUpdatedAt === 0 ||
queryState.fetchStatus === "fetching"
) {
debug(
"github-signal-stream",
"skipping query (no data or already fetching)",
{
queryKey: target.queryKey,
matchedKeys,
reason: !queryState
? "no-state"
: queryState.dataUpdatedAt === 0
? "no-data"
: "fetching",
},
);
continue;
}

debug("github-signal-stream", "invalidating query", {
queryKey: target.queryKey,
matchedKeys,
});

void queryClient.invalidateQueries({
queryKey: target.queryKey,
exact: true,
refetchType: "active",
});
invalidatedCount++;
}
const invalidatedCount = invalidateTargets(
queryClient,
currentTargets,
receivedKeys,
"github-signal-stream",
);

debug("github-signal-stream", "broadcast processed", {
receivedKeys: message.keys,
Expand Down Expand Up @@ -182,22 +187,117 @@ export function useGitHubSignalStream(
reconnectTimer = setTimeout(connect, RECONNECT_DELAY_MS);
}

function cleanup() {
connect();

return () => {
disposed = true;
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
if (ws) {
ws.close();
ws = null;
}
};
}, [signalKeysKey, queryClient]);
}

function useGitHubSignalPoll(
targets: readonly GitHubSignalStreamTarget[],
signalKeysKey: string,
) {
const queryClient = useQueryClient();
const targetsRef = useRef(targets);
targetsRef.current = targets;

useEffect(() => {
if (signalKeysKey.length === 0) {
return;
}

const keys = signalKeysKey.split(",");
let pollTimer: ReturnType<typeof setTimeout> | null = null;
let disposed = false;
const lastSeenTimestamps = new Map<string, number>();

async function pollSignals() {
if (disposed) return;

try {
const signals = await getRevalidationSignalTimestamps({
data: { signalKeys: keys },
});

if (disposed) return;

const updatedKeys: string[] = [];
for (const signal of signals) {
const lastSeen = lastSeenTimestamps.get(signal.signalKey);
if (lastSeen === undefined) {
lastSeenTimestamps.set(signal.signalKey, signal.updatedAt);
} else if (signal.updatedAt > lastSeen) {
lastSeenTimestamps.set(signal.signalKey, signal.updatedAt);
updatedKeys.push(signal.signalKey);
}
}

if (updatedKeys.length > 0) {
debug("github-signal-poll", "detected missed signals", {
updatedKeys,
});

const currentTargets = targetsRef.current;
const invalidatedCount = invalidateTargets(
queryClient,
currentTargets,
new Set(updatedKeys),
"github-signal-poll",
);

debug("github-signal-poll", "poll processed", {
updatedKeys,
invalidatedCount,
totalTargets: currentTargets.length,
});
} else {
debug("github-signal-poll", "no missed signals");
}
} catch (error) {
debug("github-signal-poll", "poll failed", { error });
}

schedulePoll();
}

connect();
function schedulePoll() {
if (disposed) return;
pollTimer = setTimeout(pollSignals, POLL_INTERVAL_MS);
}

// Seed timestamps immediately, then poll every 5 minutes
void pollSignals();

return () => {
disposed = true;
cleanup();
if (pollTimer) {
clearTimeout(pollTimer);
}
};
}, [signalKeysKey, queryClient]);
}

export function useGitHubSignalStream(
targets: readonly GitHubSignalStreamTarget[],
) {
const allSignalKeys = useMemo(() => {
return Array.from(
new Set(targets.flatMap((target) => [...target.signalKeys])),
).sort();
}, [targets]);

// Stable string so the effects only re-run when the actual keys change,
// not when the array reference changes.
const signalKeysKey = allSignalKeys.join(",");

useGitHubSignalStreamWebSocket(targets, signalKeysKey);
useGitHubSignalPoll(targets, signalKeysKey);
}
Loading
Loading