Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
githubQueryKeys,
} from "#/lib/github.query";
import { githubRevalidationSignalKeys } from "#/lib/github-revalidation";
import { useGitHubSignalRefresh } from "#/lib/use-github-signal-refresh";
import { useGitHubSignalStream } from "#/lib/use-github-signal-stream";
import { useHasMounted } from "#/lib/use-has-mounted";
import { useRegisterTab } from "#/lib/use-register-tab";
import { IssueDetailActivitySection } from "./issue-detail-activity";
Expand Down Expand Up @@ -49,11 +49,7 @@ export function IssueDetailPage() {
...githubIssuePageQueryOptions(scope, input),
enabled: hasMounted,
});
useGitHubSignalRefresh({
enabled:
hasMounted && pageQuery.data !== undefined && !pageQuery.isFetching,
targets: webhookRefreshTargets,
});
useGitHubSignalStream(webhookRefreshTargets);

const issue = pageQuery.data?.detail;
const comments = pageQuery.data?.comments;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
githubViewerQueryOptions,
} from "#/lib/github.query";
import { githubRevalidationSignalKeys } from "#/lib/github-revalidation";
import { useGitHubSignalRefresh } from "#/lib/use-github-signal-refresh";
import { useGitHubSignalStream } from "#/lib/use-github-signal-stream";
import { useHasMounted } from "#/lib/use-has-mounted";
import { useRegisterTab } from "#/lib/use-register-tab";
import { PullBodySection } from "./pull-body-section";
Expand Down Expand Up @@ -59,11 +59,7 @@ export function PullDetailPage() {
...githubViewerQueryOptions(scope),
enabled: hasMounted,
});
useGitHubSignalRefresh({
enabled:
hasMounted && pageQuery.data !== undefined && !pageQuery.isFetching,
targets: webhookRefreshTargets,
});
useGitHubSignalStream(webhookRefreshTargets);

const pr = pageQuery.data?.detail;
const comments = pageQuery.data?.comments;
Expand Down
12 changes: 2 additions & 10 deletions apps/dashboard/src/components/pulls/review/review-page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import type {
PullReviewComment,
} from "#/lib/github.types";
import { githubRevalidationSignalKeys } from "#/lib/github-revalidation";
import { useGitHubSignalRefresh } from "#/lib/use-github-signal-refresh";
import { useGitHubSignalStream } from "#/lib/use-github-signal-stream";
import { useRegisterTab } from "#/lib/use-register-tab";
import { checkPermissionWarning } from "#/lib/warning-store";
import type { ReviewDiffPaneHandle } from "./review-diff-pane";
Expand Down Expand Up @@ -174,15 +174,7 @@ export function ReviewPage() {
enabled: hasDiffPayload,
refetchOnWindowFocus: false,
});
useGitHubSignalRefresh({
enabled:
pageQuery.data !== undefined &&
!pageQuery.isFetching &&
!fileSummariesQuery.isFetching &&
!filesQuery.isFetching &&
!reviewCommentsQuery.isFetching,
targets: webhookRefreshTargets,
});
useGitHubSignalStream(webhookRefreshTargets);

const pr = pageQuery.data?.detail ?? null;
const sidebarFiles = fileSummariesQuery.data ?? [];
Expand Down
58 changes: 58 additions & 0 deletions apps/dashboard/src/entry-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import startEntry from "@tanstack/react-start/server-entry";

export { SignalRelay } from "./lib/signal-relay.server";

async function handleWebSocketUpgrade(
request: Request,
env: Record<string, unknown>,
): Promise<Response> {
const { getAuth } = await import("#/lib/auth.server");
const session = await getAuth().api.getSession({
headers: request.headers,
});

if (!session) {
return new Response("Unauthorized", { status: 401 });
}

const signalRelay = env.SIGNAL_RELAY as DurableObjectNamespace | undefined;
if (!signalRelay) {
return new Response("Signal relay not configured", { status: 503 });
}

const id = signalRelay.idFromName("global");
const stub = signalRelay.get(id);
const doUrl = new URL(request.url);
doUrl.pathname = "/connect";

return stub.fetch(
new Request(doUrl.toString(), { headers: request.headers }),
);
}

export default {
async fetch(
request: Request,
env: Record<string, unknown>,
ctx: ExecutionContext,
): Promise<Response> {
const url = new URL(request.url);

if (
url.pathname === "/api/ws/signals" &&
request.headers.get("Upgrade") === "websocket"
) {
return handleWebSocketUpgrade(request, env);
}

// TanStack Start's type only declares (request, env?) but the runtime
// handler created by @cloudflare/vite-plugin passes (request, env, ctx)
// through to the underlying Worker fetch signature.
type WorkerFetch = (
request: Request,
env: Record<string, unknown>,
ctx: ExecutionContext,
) => Promise<Response>;
return (startEntry.fetch as unknown as WorkerFetch)(request, env, ctx);
},
};
1 change: 1 addition & 0 deletions apps/dashboard/src/env.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ declare namespace Cloudflare {
GITHUB_CLIENT_SECRET?: string;
BETTER_AUTH_SECRET: string;
BETTER_AUTH_URL: string;
SIGNAL_RELAY: DurableObjectNamespace;
}
}
9 changes: 0 additions & 9 deletions apps/dashboard/src/lib/github-revalidation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,6 @@ export const githubRevalidationSignalKeys = {
`repoCode:${input.owner}/${input.repo}`,
} as const;

export type GitHubRevalidationSignalRecord = {
signalKey: string;
updatedAt: number;
};

export type GitHubRevalidationSignalInput = {
signalKeys: string[];
};

function isRecord(value: unknown): value is Record<string, unknown> {
return Boolean(value) && typeof value === "object";
}
Expand Down
20 changes: 1 addition & 19 deletions apps/dashboard/src/lib/github.functions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,10 @@ import {
createGitHubResponseMetadata,
type GitHubConditionalHeaders,
type GitHubFetchResult,
getGitHubRevalidationSignals,
getOrRevalidateGitHubResource,
} from "./github-cache";
import { githubCachePolicy } from "./github-cache-policy";
import {
type GitHubRevalidationSignalInput,
githubRevalidationSignalKeys,
} from "./github-revalidation";
import { githubRevalidationSignalKeys } from "./github-revalidation";

type GitHubClient = OctokitType;
type AuthSession = {
Expand Down Expand Up @@ -4326,20 +4322,6 @@ function identityValidator<TInput>(data: TInput) {
return data;
}

export const getGitHubRevalidationSignalRecords = createServerFn({
method: "POST",
})
.inputValidator(identityValidator<GitHubRevalidationSignalInput>)
.handler(async ({ data }) => {
const { getRequestSession } = await import("./auth-runtime");
const session = await getRequestSession();
if (!session) {
return [];
}

return getGitHubRevalidationSignals(data.signalKeys);
});

export const getGitHubViewer = createServerFn({ method: "GET" }).handler(
async () => {
const context = await getGitHubContext();
Expand Down
32 changes: 32 additions & 0 deletions apps/dashboard/src/lib/signal-relay-broadcast.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import "@tanstack/react-start/server-only";
import { debug } from "./debug";

export async function broadcastSignalKeys(signalKeys: string[]) {
try {
const { env } = await import("cloudflare:workers");
const workerEnv = env as typeof env & {
SIGNAL_RELAY?: DurableObjectNamespace;
};

if (!workerEnv.SIGNAL_RELAY) {
debug(
"signal-relay",
"SIGNAL_RELAY binding not available, skipping broadcast",
);
return;
}

const id = workerEnv.SIGNAL_RELAY.idFromName("global");
const stub = workerEnv.SIGNAL_RELAY.get(id);

await stub.fetch("https://signal-relay/broadcast", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ signalKeys }),
});
} catch (error) {
debug("signal-relay", "broadcast failed", {
error: error instanceof Error ? error.message : String(error),
});
}
}
102 changes: 102 additions & 0 deletions apps/dashboard/src/lib/signal-relay.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import "@tanstack/react-start/server-only";
import { DurableObject } from "cloudflare:workers";

type SubscribeMessage = {
type: "subscribe";
keys: string[];
};

function isSubscribeMessage(data: unknown): data is SubscribeMessage {
return (
typeof data === "object" &&
data !== null &&
"type" in data &&
data.type === "subscribe" &&
"keys" in data &&
Array.isArray(data.keys) &&
data.keys.every((k: unknown) => typeof k === "string")
);
}

export class SignalRelay extends DurableObject {
private subscriptions = new Map<WebSocket, Set<string>>();

async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);

if (url.pathname === "/broadcast" && request.method === "POST") {
return this.handleBroadcast(request);
}

if (url.pathname === "/connect") {
return this.handleConnect(request);
}

return new Response("Not found", { status: 404 });
}

private handleConnect(request: Request): Response {
const upgradeHeader = request.headers.get("Upgrade");
if (upgradeHeader !== "websocket") {
return new Response("Expected WebSocket upgrade", { status: 426 });
}

const pair = new WebSocketPair();
const [client, server] = Object.values(pair);

server.accept();
this.subscriptions.set(server, new Set());

server.addEventListener("message", (event) => {
if (typeof event.data !== "string") return;

try {
const message: unknown = JSON.parse(event.data);
if (isSubscribeMessage(message)) {
this.subscriptions.set(server, new Set(message.keys));
}
} catch {
// ignore malformed messages
}
});

server.addEventListener("close", () => {
this.subscriptions.delete(server);
});

server.addEventListener("error", () => {
this.subscriptions.delete(server);
});

return new Response(null, {
status: 101,
webSocket: client,
} as ResponseInit);
}

private async handleBroadcast(request: Request): Promise<Response> {
const body = (await request.json()) as { signalKeys?: string[] };
const signalKeys = body.signalKeys;
if (!Array.isArray(signalKeys) || signalKeys.length === 0) {
return new Response("Missing signalKeys", { status: 400 });
}

const signalSet = new Set(signalKeys);
const payload = JSON.stringify({ type: "signals", keys: signalKeys });
let notified = 0;

for (const [ws, subscribedKeys] of this.subscriptions) {
const hasMatch = [...subscribedKeys].some((key) => signalSet.has(key));
if (!hasMatch) continue;

try {
ws.send(payload);
notified++;
} catch {
this.subscriptions.delete(ws);
}
}

return Response.json({ ok: true, notified });
}
}
Loading
Loading