diff --git a/backend/src/env.ts b/backend/src/env.ts index 97c410f..adcec52 100644 --- a/backend/src/env.ts +++ b/backend/src/env.ts @@ -85,4 +85,5 @@ export const env = { "REFRESH_SCHEDULER_STALE_AFTER_MS", 6 * 60 * 60 * 1000, ), + }; diff --git a/backend/src/index.ts b/backend/src/index.ts index cb57cd1..4d63f42 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -1,6 +1,7 @@ import Fastify, { type FastifyBaseLogger, type FastifyReply } from "fastify"; import fastifyCors from "@fastify/cors"; import type { ClerkClient } from "@clerk/backend"; +import { z } from "zod"; import { env } from "./env.js"; import clerkAuthPlugin, { requireAuth, getUserEmail } from "./clerk-auth.js"; @@ -15,6 +16,7 @@ import { capture, shutdown as shutdownAnalytics } from "./analytics/posthog.js"; import { EVENTS } from "./analytics/events.js"; import { registerDataset, deregisterDataset, abortDataset } from "./abort-registry.js"; import { + LOCAL_USER_ID, clearLegacyPlaintextLocalCredentials, exchangeOpenRouterOAuthCode, getLocalSetupStatus, @@ -47,6 +49,45 @@ type DatasetUpdateBeginOutcome = | "already_updating"; type UpdateWorkflowRun = Awaited>; +const refreshCadenceSchema = z.enum(["manual", "30m", "6h", "12h", "daily", "weekly"]); +const cliCreateDatasetSchema = z.object({ + prompt: z.string().trim().min(1), + maxRowCount: z.number().int().min(1).max(2500).default(100), + refreshCadence: refreshCadenceSchema.default("manual"), +}); +const cliDatasetIdParamsSchema = z.object({ + datasetId: z.string().min(1), +}); + +function cliDatasetNameFromSchemaName(name: string): string { + return name + .split("_") + .filter(Boolean) + .map((part) => part.charAt(0).toUpperCase() + part.slice(1)) + .join(" "); +} + +function mapSchemaTypeToDatasetType( + type: "string" | "url" | "date" | "number" | "boolean" | "enum", +): "text" | "number" | "boolean" | "url" | "date" { + if (type === "url" || type === "date" || type === "number" || type === "boolean") { + return type; + } + return "text"; +} + +function requireLocalCli(reply: FastifyReply): string | null { + if (!env.IS_LOCAL_MODE) { + void reply.code(404).send({ error: "Not found" }); + return null; + } + if (!env.CONVEX_ADMIN_KEY) { + void reply.code(500).send({ error: "CONVEX_SELF_HOSTED_ADMIN_KEY is required" }); + return null; + } + return LOCAL_USER_ID; +} + function statusErrorMessage(err: unknown): string { const message = err instanceof Error ? err.message : String(err); return message.slice(0, 500); @@ -782,6 +823,243 @@ fastify.get("/openrouter/models", async (req, reply) => { } }); +// ──────────────────────────────────────────────────────────────────────── +// Local trusted CLI routes +// ──────────────────────────────────────────────────────────────────────── + +fastify.get("/cli/datasets", async (_req, reply) => { + const ownerId = requireLocalCli(reply); + if (!ownerId) return; + + try { + const datasets = await convex.query(internal.datasets.listByOwnerInternal, { + ownerId, + }); + return { datasets }; + } catch (err) { + fastify.log.error(err, "CLI dataset list failed"); + return reply.code(502).send({ error: "Failed to list datasets" }); + } +}); + +fastify.post("/cli/datasets", async (req, reply) => { + const ownerId = requireLocalCli(reply); + if (!ownerId) return; + if (!(await ensureLocalSetupReady(reply))) return; + + const parsed = cliCreateDatasetSchema.safeParse(req.body); + if (!parsed.success) { + return reply.code(400).send({ + error: "Invalid request", + details: parsed.error.flatten().fieldErrors, + }); + } + + try { + const schema = await inferSchema(parsed.data.prompt); + const columns = schema.columns.map((column) => ({ + name: column.name, + type: mapSchemaTypeToDatasetType(column.type), + description: column.retrieval_hint, + isPrimaryKey: column.is_primary_key || undefined, + })); + + const datasetId = await convex.mutation( + internal.datasets.createForOwnerInternal, + { + ownerId, + name: cliDatasetNameFromSchemaName(schema.dataset_name), + description: parsed.data.prompt, + refreshCadence: parsed.data.refreshCadence, + maxRowCount: parsed.data.maxRowCount, + columns, + retrievalStrategy: schema.retrieval_strategy, + sourceHint: schema.source_hint, + }, + ); + + const dataset = await convex.query(internal.datasets.getOwnedInternal, { + id: datasetId, + ownerId, + }); + + return reply.code(201).send({ dataset, schema }); + } catch (err) { + req.log.error(err, "CLI dataset create failed"); + return reply.code(502).send({ error: "Failed to create dataset" }); + } +}); + +fastify.get("/cli/datasets/:datasetId", async (req, reply) => { + const ownerId = requireLocalCli(reply); + if (!ownerId) return; + + const params = cliDatasetIdParamsSchema.safeParse(req.params); + if (!params.success) { + return reply.code(400).send({ error: "datasetId is required" }); + } + + try { + const dataset = await convex.query(internal.datasets.getOwnedInternal, { + id: params.data.datasetId, + ownerId, + }); + if (!dataset) return reply.code(404).send({ error: "Dataset not found" }); + return { dataset }; + } catch (err) { + req.log.error(err, "CLI dataset get failed"); + return reply.code(400).send({ error: "Invalid datasetId" }); + } +}); + +fastify.get("/cli/datasets/:datasetId/rows", async (req, reply) => { + const ownerId = requireLocalCli(reply); + if (!ownerId) return; + + const params = cliDatasetIdParamsSchema.safeParse(req.params); + if (!params.success) { + return reply.code(400).send({ error: "datasetId is required" }); + } + + try { + const rows = await convex.query( + internal.datasetRows.listByOwnedDatasetInternal, + { + datasetId: params.data.datasetId, + ownerId, + }, + ); + if (!rows) return reply.code(404).send({ error: "Dataset not found" }); + return { rows }; + } catch (err) { + req.log.error(err, "CLI rows get failed"); + return reply.code(400).send({ error: "Invalid datasetId" }); + } +}); + +fastify.post("/cli/datasets/:datasetId/populate", async (req, reply) => { + const ownerId = requireLocalCli(reply); + if (!ownerId) return; + if (!(await ensureLocalSetupReady(reply))) return; + + const params = cliDatasetIdParamsSchema.safeParse(req.params); + if (!params.success) { + return reply.code(400).send({ error: "datasetId is required" }); + } + + let claimedDatasetId: string | null = null; + try { + const dataset = await convex.query(internal.datasets.getOwnedInternal, { + id: params.data.datasetId, + ownerId, + }); + if (!dataset) return reply.code(404).send({ error: "Dataset not found" }); + + const populateOutcome = await beginDatasetPopulate(dataset._id, ownerId); + if (populateOutcome === "already_building") { + return reply.code(409).send({ error: "Dataset is already being populated" }); + } + if (populateOutcome === "already_updating") { + return reply.code(409).send({ error: "Dataset is already being updated" }); + } + if (populateOutcome !== "started") { + return reply.code(409).send({ error: `Cannot populate dataset: ${populateOutcome}` }); + } + claimedDatasetId = dataset._id; + + const { getModelConfig } = await import("./config/models.js"); + const modelConfig = await getModelConfig(ownerId); + const run = await populateWorkflow.createRun(); + const controller = registerDataset(dataset._id); + + void runPopulateWorkflowInBackground({ + input: { + datasetId: dataset._id, + datasetName: dataset.name, + description: dataset.description, + maxRowCount: dataset.maxRowCount ?? 100, + columns: dataset.columns, + }, + run, + controller, + authorizedUserId: ownerId, + logger: req.log, + clerk: req.server.clerk, + modelConfig, + }); + + return reply.code(202).send({ success: true, runId: run.runId }); + } catch (err) { + req.log.error(err, "CLI populate failed"); + if (claimedDatasetId) { + try { + await setDatasetPopulateStatus( + claimedDatasetId, + "failed", + statusErrorMessage(err), + ); + } catch (statusErr) { + req.log.error( + { err: statusErr, datasetId: claimedDatasetId }, + "Failed to release CLI populate dataset claim", + ); + } + } + return reply.code(502).send({ error: "Failed to populate dataset" }); + } +}); + +fastify.post("/cli/datasets/:datasetId/stop", async (req, reply) => { + const ownerId = requireLocalCli(reply); + if (!ownerId) return; + + const params = cliDatasetIdParamsSchema.safeParse(req.params); + if (!params.success) { + return reply.code(400).send({ error: "datasetId is required" }); + } + + try { + const dataset = await convex.query(internal.datasets.getOwnedInternal, { + id: params.data.datasetId, + ownerId, + }); + if (!dataset) return reply.code(404).send({ error: "Dataset not found" }); + if (dataset.status !== "building" && dataset.status !== "updating") { + return reply.code(409).send({ error: "Dataset is not currently running" }); + } + + const aborted = abortDataset(dataset._id); + if (!aborted) { + req.log.warn( + { datasetId: dataset._id }, + "CLI stop requested for orphaned dataset (no active run registered); forcing to failed", + ); + try { + if (dataset.status === "updating") { + await convex.mutation(internal.datasetRows.clearAllPendingUpdateStatus, { + datasetId: dataset._id, + }); + } + await setDatasetPopulateStatus( + dataset._id, + "failed", + "Run interrupted: no active local CLI run registered", + ); + } catch (statusErr) { + req.log.error( + { err: statusErr, datasetId: dataset._id }, + "Failed to force-transition orphaned CLI dataset to failed", + ); + } + return reply.code(200).send({ success: true }); + } + return reply.code(202).send({ success: true }); + } catch (err) { + req.log.error(err, "CLI stop failed"); + return reply.code(502).send({ error: "Failed to stop dataset run" }); + } +}); + // ──────────────────────────────────────────────────────────────────────── // Protected routes — gated by Clerk JWT verification // ──────────────────────────────────────────────────────────────────────── diff --git a/frontend/convex/datasetRows.ts b/frontend/convex/datasetRows.ts index eac39ea..a4a877c 100644 --- a/frontend/convex/datasetRows.ts +++ b/frontend/convex/datasetRows.ts @@ -44,6 +44,22 @@ export const listByDataset = query({ }, }); +export const listByOwnedDatasetInternal = internalQuery({ + args: { + datasetId: v.id("datasets"), + ownerId: v.string(), + }, + handler: async (ctx, args) => { + const dataset = await ctx.db.get(args.datasetId); + if (!dataset || dataset.ownerId !== args.ownerId) return null; + + return await ctx.db + .query("datasetRows") + .withIndex("by_dataset", (q) => q.eq("datasetId", args.datasetId)) + .collect(); + }, +}); + /** * Row writes are SYSTEM-LEVEL operations performed by the agent runner, * never by end users directly. They are exposed as `internalMutation` so diff --git a/frontend/convex/datasets.ts b/frontend/convex/datasets.ts index d295327..791671b 100644 --- a/frontend/convex/datasets.ts +++ b/frontend/convex/datasets.ts @@ -406,6 +406,62 @@ export const create = mutation({ }, }); +export const createForOwnerInternal = internalMutation({ + args: { + ownerId: v.string(), + name: v.string(), + description: v.string(), + refreshCadence: refreshCadenceValidator, + maxRowCount: v.number(), + columns: v.array(columnValidator), + retrievalStrategy: v.optional( + v.union( + v.literal("search_fetch"), + v.literal("browser"), + v.literal("hybrid") + ) + ), + sourceHint: v.optional(v.string()), + }, + handler: async (ctx, args) => { + assertNotReservedOwner(args.ownerId); + validateMaxRowCount(args.maxRowCount); + await requireQuotaRemaining(ctx, args.ownerId, args.maxRowCount); + + return await ctx.db.insert("datasets", { + ...args, + status: "paused", + visibility: "private", + rowCount: 0, + refreshEnabled: args.refreshCadence !== "manual", + nextRefreshAt: nextRefreshAtFor(args.refreshCadence, Date.now()), + }); + }, +}); + +export const listByOwnerInternal = internalQuery({ + args: { ownerId: v.string() }, + handler: async (ctx, args) => { + return await ctx.db + .query("datasets") + .withIndex("by_owner", (q) => q.eq("ownerId", args.ownerId)) + .order("desc") + .collect(); + }, +}); + +export const getOwnedInternal = internalQuery({ + args: { + id: v.id("datasets"), + ownerId: v.string(), + }, + handler: async (ctx, args) => { + const dataset = await ctx.db.get(args.id); + if (!dataset || dataset.ownerId !== args.ownerId) return null; + return dataset; + }, +}); + export const updateRefreshSettings = mutation({ args: { id: v.id("datasets"), diff --git a/scripts/build-release.mjs b/scripts/build-release.mjs index 45e3ea3..9405e92 100644 --- a/scripts/build-release.mjs +++ b/scripts/build-release.mjs @@ -138,7 +138,7 @@ start( start( "frontend", process.execPath, - [fromRoot("./frontend/server.js")], + [fromRoot("./frontend/frontend/server.js")], { ...process.env, PORT: frontendPort, @@ -148,7 +148,7 @@ start( NEXT_PUBLIC_PROD: process.env.NEXT_PUBLIC_PROD || "", PROD: process.env.PROD || "", }, - fromRoot("./frontend"), + fromRoot("./frontend/frontend"), ); console.log(""); @@ -208,6 +208,10 @@ async function bundleBackend() { } async function copyConvexRuntime() { + await cp( + join(frontendDir, "package.json"), + join(packageRoot, "frontend", "package.json"), + ); await cp(join(frontendDir, "convex"), join(packageRoot, "frontend", "convex"), { recursive: true, }); @@ -250,12 +254,12 @@ async function main() { console.log("Assembling release directory..."); await cp(standaloneDir, join(packageRoot, "frontend"), { recursive: true }); - await cp(join(frontendDir, "public"), join(packageRoot, "frontend", "public"), { + await cp(join(frontendDir, "public"), join(packageRoot, "frontend", "frontend", "public"), { recursive: true, }); await cp( join(frontendDir, ".next", "static"), - join(packageRoot, "frontend", ".next", "static"), + join(packageRoot, "frontend", "frontend", ".next", "static"), { recursive: true }, ); await copyConvexRuntime();