Skip to content

Commit 71b8989

Browse files
authored
Real-time webhook revalidation via Durable Objects and WebSockets (#98)
* Add real-time webhook revalidation via Durable Objects and WebSockets Replace the one-shot polling system (useGitHubSignalRefresh + SSE endpoint) with a persistent WebSocket connection backed by a Cloudflare Durable Object. When a GitHub webhook arrives, the signal is broadcast to all connected clients, which then invalidate only the matching React Query caches instantly. * Keep WebSocket connected when tab loses focus Removes the visibility change listener that disconnected/reconnected the signal stream on tab blur/focus, preventing missed webhook updates.
1 parent 9ec025c commit 71b8989

16 files changed

Lines changed: 472 additions & 161 deletions

apps/dashboard/src/components/issues/detail/issue-detail-page.tsx

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
githubQueryKeys,
1313
} from "#/lib/github.query";
1414
import { githubRevalidationSignalKeys } from "#/lib/github-revalidation";
15-
import { useGitHubSignalRefresh } from "#/lib/use-github-signal-refresh";
15+
import { useGitHubSignalStream } from "#/lib/use-github-signal-stream";
1616
import { useHasMounted } from "#/lib/use-has-mounted";
1717
import { useRegisterTab } from "#/lib/use-register-tab";
1818
import { IssueDetailActivitySection } from "./issue-detail-activity";
@@ -49,11 +49,7 @@ export function IssueDetailPage() {
4949
...githubIssuePageQueryOptions(scope, input),
5050
enabled: hasMounted,
5151
});
52-
useGitHubSignalRefresh({
53-
enabled:
54-
hasMounted && pageQuery.data !== undefined && !pageQuery.isFetching,
55-
targets: webhookRefreshTargets,
56-
});
52+
useGitHubSignalStream(webhookRefreshTargets);
5753

5854
const issue = pageQuery.data?.detail;
5955
const comments = pageQuery.data?.comments;

apps/dashboard/src/components/pulls/detail/pull-detail-page.tsx

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import {
1313
githubViewerQueryOptions,
1414
} from "#/lib/github.query";
1515
import { githubRevalidationSignalKeys } from "#/lib/github-revalidation";
16-
import { useGitHubSignalRefresh } from "#/lib/use-github-signal-refresh";
16+
import { useGitHubSignalStream } from "#/lib/use-github-signal-stream";
1717
import { useHasMounted } from "#/lib/use-has-mounted";
1818
import { useRegisterTab } from "#/lib/use-register-tab";
1919
import { PullBodySection } from "./pull-body-section";
@@ -59,11 +59,7 @@ export function PullDetailPage() {
5959
...githubViewerQueryOptions(scope),
6060
enabled: hasMounted,
6161
});
62-
useGitHubSignalRefresh({
63-
enabled:
64-
hasMounted && pageQuery.data !== undefined && !pageQuery.isFetching,
65-
targets: webhookRefreshTargets,
66-
});
62+
useGitHubSignalStream(webhookRefreshTargets);
6763

6864
const pr = pageQuery.data?.detail;
6965
const comments = pageQuery.data?.comments;

apps/dashboard/src/components/pulls/review/review-page.tsx

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ import type {
5353
PullReviewComment,
5454
} from "#/lib/github.types";
5555
import { githubRevalidationSignalKeys } from "#/lib/github-revalidation";
56-
import { useGitHubSignalRefresh } from "#/lib/use-github-signal-refresh";
56+
import { useGitHubSignalStream } from "#/lib/use-github-signal-stream";
5757
import { useRegisterTab } from "#/lib/use-register-tab";
5858
import { checkPermissionWarning } from "#/lib/warning-store";
5959
import type { ReviewDiffPaneHandle } from "./review-diff-pane";
@@ -174,15 +174,7 @@ export function ReviewPage() {
174174
enabled: hasDiffPayload,
175175
refetchOnWindowFocus: false,
176176
});
177-
useGitHubSignalRefresh({
178-
enabled:
179-
pageQuery.data !== undefined &&
180-
!pageQuery.isFetching &&
181-
!fileSummariesQuery.isFetching &&
182-
!filesQuery.isFetching &&
183-
!reviewCommentsQuery.isFetching,
184-
targets: webhookRefreshTargets,
185-
});
177+
useGitHubSignalStream(webhookRefreshTargets);
186178

187179
const pr = pageQuery.data?.detail ?? null;
188180
const sidebarFiles = fileSummariesQuery.data ?? [];

apps/dashboard/src/entry-worker.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import startEntry from "@tanstack/react-start/server-entry";
2+
3+
export { SignalRelay } from "./lib/signal-relay.server";
4+
5+
async function handleWebSocketUpgrade(
6+
request: Request,
7+
env: Record<string, unknown>,
8+
): Promise<Response> {
9+
const { getAuth } = await import("#/lib/auth.server");
10+
const session = await getAuth().api.getSession({
11+
headers: request.headers,
12+
});
13+
14+
if (!session) {
15+
return new Response("Unauthorized", { status: 401 });
16+
}
17+
18+
const signalRelay = env.SIGNAL_RELAY as DurableObjectNamespace | undefined;
19+
if (!signalRelay) {
20+
return new Response("Signal relay not configured", { status: 503 });
21+
}
22+
23+
const id = signalRelay.idFromName("global");
24+
const stub = signalRelay.get(id);
25+
const doUrl = new URL(request.url);
26+
doUrl.pathname = "/connect";
27+
28+
return stub.fetch(
29+
new Request(doUrl.toString(), { headers: request.headers }),
30+
);
31+
}
32+
33+
export default {
34+
async fetch(
35+
request: Request,
36+
env: Record<string, unknown>,
37+
ctx: ExecutionContext,
38+
): Promise<Response> {
39+
const url = new URL(request.url);
40+
41+
if (
42+
url.pathname === "/api/ws/signals" &&
43+
request.headers.get("Upgrade") === "websocket"
44+
) {
45+
return handleWebSocketUpgrade(request, env);
46+
}
47+
48+
// TanStack Start's type only declares (request, env?) but the runtime
49+
// handler created by @cloudflare/vite-plugin passes (request, env, ctx)
50+
// through to the underlying Worker fetch signature.
51+
type WorkerFetch = (
52+
request: Request,
53+
env: Record<string, unknown>,
54+
ctx: ExecutionContext,
55+
) => Promise<Response>;
56+
return (startEntry.fetch as unknown as WorkerFetch)(request, env, ctx);
57+
},
58+
};

apps/dashboard/src/env.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@ declare namespace Cloudflare {
1515
GITHUB_CLIENT_SECRET?: string;
1616
BETTER_AUTH_SECRET: string;
1717
BETTER_AUTH_URL: string;
18+
SIGNAL_RELAY: DurableObjectNamespace;
1819
}
1920
}

apps/dashboard/src/lib/github-revalidation.ts

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,6 @@ export const githubRevalidationSignalKeys = {
2424
`repoCode:${input.owner}/${input.repo}`,
2525
} as const;
2626

27-
export type GitHubRevalidationSignalRecord = {
28-
signalKey: string;
29-
updatedAt: number;
30-
};
31-
32-
export type GitHubRevalidationSignalInput = {
33-
signalKeys: string[];
34-
};
35-
3627
function isRecord(value: unknown): value is Record<string, unknown> {
3728
return Boolean(value) && typeof value === "object";
3829
}

apps/dashboard/src/lib/github.functions.ts

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,10 @@ import {
6060
createGitHubResponseMetadata,
6161
type GitHubConditionalHeaders,
6262
type GitHubFetchResult,
63-
getGitHubRevalidationSignals,
6463
getOrRevalidateGitHubResource,
6564
} from "./github-cache";
6665
import { githubCachePolicy } from "./github-cache-policy";
67-
import {
68-
type GitHubRevalidationSignalInput,
69-
githubRevalidationSignalKeys,
70-
} from "./github-revalidation";
66+
import { githubRevalidationSignalKeys } from "./github-revalidation";
7167

7268
type GitHubClient = OctokitType;
7369
type AuthSession = {
@@ -4326,20 +4322,6 @@ function identityValidator<TInput>(data: TInput) {
43264322
return data;
43274323
}
43284324

4329-
export const getGitHubRevalidationSignalRecords = createServerFn({
4330-
method: "POST",
4331-
})
4332-
.inputValidator(identityValidator<GitHubRevalidationSignalInput>)
4333-
.handler(async ({ data }) => {
4334-
const { getRequestSession } = await import("./auth-runtime");
4335-
const session = await getRequestSession();
4336-
if (!session) {
4337-
return [];
4338-
}
4339-
4340-
return getGitHubRevalidationSignals(data.signalKeys);
4341-
});
4342-
43434325
export const getGitHubViewer = createServerFn({ method: "GET" }).handler(
43444326
async () => {
43454327
const context = await getGitHubContext();
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import "@tanstack/react-start/server-only";
2+
import { debug } from "./debug";
3+
4+
export async function broadcastSignalKeys(signalKeys: string[]) {
5+
try {
6+
const { env } = await import("cloudflare:workers");
7+
const workerEnv = env as typeof env & {
8+
SIGNAL_RELAY?: DurableObjectNamespace;
9+
};
10+
11+
if (!workerEnv.SIGNAL_RELAY) {
12+
debug(
13+
"signal-relay",
14+
"SIGNAL_RELAY binding not available, skipping broadcast",
15+
);
16+
return;
17+
}
18+
19+
const id = workerEnv.SIGNAL_RELAY.idFromName("global");
20+
const stub = workerEnv.SIGNAL_RELAY.get(id);
21+
22+
await stub.fetch("https://signal-relay/broadcast", {
23+
method: "POST",
24+
headers: { "Content-Type": "application/json" },
25+
body: JSON.stringify({ signalKeys }),
26+
});
27+
} catch (error) {
28+
debug("signal-relay", "broadcast failed", {
29+
error: error instanceof Error ? error.message : String(error),
30+
});
31+
}
32+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import "@tanstack/react-start/server-only";
2+
import { DurableObject } from "cloudflare:workers";
3+
4+
type SubscribeMessage = {
5+
type: "subscribe";
6+
keys: string[];
7+
};
8+
9+
function isSubscribeMessage(data: unknown): data is SubscribeMessage {
10+
return (
11+
typeof data === "object" &&
12+
data !== null &&
13+
"type" in data &&
14+
data.type === "subscribe" &&
15+
"keys" in data &&
16+
Array.isArray(data.keys) &&
17+
data.keys.every((k: unknown) => typeof k === "string")
18+
);
19+
}
20+
21+
export class SignalRelay extends DurableObject {
22+
private subscriptions = new Map<WebSocket, Set<string>>();
23+
24+
async fetch(request: Request): Promise<Response> {
25+
const url = new URL(request.url);
26+
27+
if (url.pathname === "/broadcast" && request.method === "POST") {
28+
return this.handleBroadcast(request);
29+
}
30+
31+
if (url.pathname === "/connect") {
32+
return this.handleConnect(request);
33+
}
34+
35+
return new Response("Not found", { status: 404 });
36+
}
37+
38+
private handleConnect(request: Request): Response {
39+
const upgradeHeader = request.headers.get("Upgrade");
40+
if (upgradeHeader !== "websocket") {
41+
return new Response("Expected WebSocket upgrade", { status: 426 });
42+
}
43+
44+
const pair = new WebSocketPair();
45+
const [client, server] = Object.values(pair);
46+
47+
server.accept();
48+
this.subscriptions.set(server, new Set());
49+
50+
server.addEventListener("message", (event) => {
51+
if (typeof event.data !== "string") return;
52+
53+
try {
54+
const message: unknown = JSON.parse(event.data);
55+
if (isSubscribeMessage(message)) {
56+
this.subscriptions.set(server, new Set(message.keys));
57+
}
58+
} catch {
59+
// ignore malformed messages
60+
}
61+
});
62+
63+
server.addEventListener("close", () => {
64+
this.subscriptions.delete(server);
65+
});
66+
67+
server.addEventListener("error", () => {
68+
this.subscriptions.delete(server);
69+
});
70+
71+
return new Response(null, {
72+
status: 101,
73+
webSocket: client,
74+
} as ResponseInit);
75+
}
76+
77+
private async handleBroadcast(request: Request): Promise<Response> {
78+
const body = (await request.json()) as { signalKeys?: string[] };
79+
const signalKeys = body.signalKeys;
80+
if (!Array.isArray(signalKeys) || signalKeys.length === 0) {
81+
return new Response("Missing signalKeys", { status: 400 });
82+
}
83+
84+
const signalSet = new Set(signalKeys);
85+
const payload = JSON.stringify({ type: "signals", keys: signalKeys });
86+
let notified = 0;
87+
88+
for (const [ws, subscribedKeys] of this.subscriptions) {
89+
const hasMatch = [...subscribedKeys].some((key) => signalSet.has(key));
90+
if (!hasMatch) continue;
91+
92+
try {
93+
ws.send(payload);
94+
notified++;
95+
} catch {
96+
this.subscriptions.delete(ws);
97+
}
98+
}
99+
100+
return Response.json({ ok: true, notified });
101+
}
102+
}

0 commit comments

Comments
 (0)