Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,5 @@ export const env = {
"REFRESH_SCHEDULER_STALE_AFTER_MS",
6 * 60 * 60 * 1000,
),

};
278 changes: 278 additions & 0 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import Fastify, { type FastifyBaseLogger, type FastifyReply } from "fastify";
import fastifyCors from "@fastify/cors";
import type { ClerkClient } from "@clerk/backend";
import { z } from "zod";

import { env } from "./env.js";
import clerkAuthPlugin, { requireAuth, getUserEmail } from "./clerk-auth.js";
Expand All @@ -15,6 +16,7 @@ import { capture, shutdown as shutdownAnalytics } from "./analytics/posthog.js";
import { EVENTS } from "./analytics/events.js";
import { registerDataset, deregisterDataset, abortDataset } from "./abort-registry.js";
import {
LOCAL_USER_ID,
clearLegacyPlaintextLocalCredentials,
exchangeOpenRouterOAuthCode,
getLocalSetupStatus,
Expand Down Expand Up @@ -47,6 +49,45 @@ type DatasetUpdateBeginOutcome =
| "already_updating";
type UpdateWorkflowRun = Awaited<ReturnType<typeof updateWorkflow.createRun>>;

// Request validation schemas for the local CLI routes.

// Allowed refresh cadence values accepted from the CLI.
const refreshCadenceSchema = z.enum(["manual", "30m", "6h", "12h", "daily", "weekly"]);
// Body of POST /cli/datasets.
const cliCreateDatasetSchema = z.object({
// Prompt text; trimmed, and must be non-empty after trimming.
prompt: z.string().trim().min(1),
// Integer between 1 and 2500 inclusive; 100 when omitted.
maxRowCount: z.number().int().min(1).max(2500).default(100),
// One of the cadence values above; "manual" when omitted.
refreshCadence: refreshCadenceSchema.default("manual"),
});
// Route params for the /cli/datasets/:datasetId routes; only requires a
// non-empty string here.
const cliDatasetIdParamsSchema = z.object({
datasetId: z.string().min(1),
});

/**
 * Converts an underscore-separated schema name into a display name.
 *
 * Each non-empty segment gets its first character upper-cased (the rest of
 * the segment is left untouched) and the segments are joined with single
 * spaces. Empty segments produced by leading, trailing, or repeated
 * underscores are dropped.
 *
 * @param name - Underscore-separated name, e.g. "top_ai_startups".
 * @returns The display name, e.g. "Top Ai Startups"; "" for an empty input.
 */
function cliDatasetNameFromSchemaName(name: string): string {
  const words: string[] = [];
  for (const segment of name.split("_")) {
    if (segment.length === 0) continue;
    const head = segment.substring(0, 1).toUpperCase();
    const tail = segment.substring(1);
    words.push(head + tail);
  }
  return words.join(" ");
}

/**
 * Maps an inferred-schema column type onto the column types a dataset stores.
 *
 * "url", "date", "number" and "boolean" pass through unchanged; every other
 * input ("string" and "enum") is stored as "text".
 *
 * @param type - Column type from the inferred schema.
 * @returns The corresponding dataset column type.
 */
function mapSchemaTypeToDatasetType(
  type: "string" | "url" | "date" | "number" | "boolean" | "enum",
): "text" | "number" | "boolean" | "url" | "date" {
  switch (type) {
    case "url":
    case "date":
    case "number":
    case "boolean":
      return type;
    default:
      return "text";
  }
}

/**
 * Gate for the local CLI routes.
 *
 * Returns `LOCAL_USER_ID` when the server is in local mode and a Convex admin
 * key is configured. Otherwise it sends the error response itself (404 when
 * not in local mode, 500 when the admin key is missing) and returns `null`,
 * so callers only need to stop when the result is falsy.
 *
 * @param reply - Reply object used to send the rejection response.
 * @returns The local owner id, or `null` if a rejection was already sent.
 */
function requireLocalCli(reply: FastifyReply): string | null {
  let rejection: { status: number; error: string } | null = null;
  if (!env.IS_LOCAL_MODE) {
    rejection = { status: 404, error: "Not found" };
  } else if (!env.CONVEX_ADMIN_KEY) {
    rejection = { status: 500, error: "CONVEX_SELF_HOSTED_ADMIN_KEY is required" };
  }

  if (rejection === null) {
    return LOCAL_USER_ID;
  }

  // The send is fire-and-forget; the caller returns right after seeing null.
  void reply.code(rejection.status).send({ error: rejection.error });
  return null;
}

function statusErrorMessage(err: unknown): string {
const message = err instanceof Error ? err.message : String(err);
return message.slice(0, 500);
Expand Down Expand Up @@ -782,6 +823,243 @@ fastify.get("/openrouter/models", async (req, reply) => {
}
});

// ────────────────────────────────────────────────────────────────────────
// Local trusted CLI routes
// ────────────────────────────────────────────────────────────────────────

/**
 * GET /cli/datasets — lists the datasets owned by the local CLI user.
 *
 * Responds with `{ datasets }` on success. `requireLocalCli` sends the
 * rejection itself when the route is unavailable. Any failure of the Convex
 * query is reported as 502.
 */
fastify.get("/cli/datasets", async (req, reply) => {
  const ownerId = requireLocalCli(reply);
  if (!ownerId) return;

  try {
    const datasets = await convex.query(internal.datasets.listByOwnerInternal, {
      ownerId,
    });
    return { datasets };
  } catch (err) {
    // Log through the request-scoped logger, as every other CLI route does,
    // so the entry carries this request's logging context.
    req.log.error(err, "CLI dataset list failed");
    return reply.code(502).send({ error: "Failed to list datasets" });
  }
});

/**
 * POST /cli/datasets — creates a dataset for the local CLI user from a prompt.
 *
 * Flow: gate on local CLI availability and local setup, validate the body,
 * infer a schema from the prompt, create the dataset in Convex, then read it
 * back and respond 201 with `{ dataset, schema }`.
 *
 * Responses: 400 for an invalid body (with per-field details); 502 for any
 * failure during schema inference or the Convex calls.
 */
fastify.post("/cli/datasets", async (req, reply) => {
  const ownerId = requireLocalCli(reply);
  if (!ownerId) return;

  const setupReady = await ensureLocalSetupReady(reply);
  if (!setupReady) return;

  const body = cliCreateDatasetSchema.safeParse(req.body);
  if (!body.success) {
    const details = body.error.flatten().fieldErrors;
    return reply.code(400).send({ error: "Invalid request", details });
  }
  const { prompt, refreshCadence, maxRowCount } = body.data;

  try {
    const schema = await inferSchema(prompt);

    // Translate inferred columns into the shape the dataset mutation takes.
    // A falsy primary-key flag is sent as `undefined` rather than `false`.
    const columns = schema.columns.map(
      ({ name, type, retrieval_hint, is_primary_key }) => ({
        name,
        type: mapSchemaTypeToDatasetType(type),
        description: retrieval_hint,
        isPrimaryKey: is_primary_key || undefined,
      }),
    );

    const datasetId = await convex.mutation(
      internal.datasets.createForOwnerInternal,
      {
        ownerId,
        name: cliDatasetNameFromSchemaName(schema.dataset_name),
        // The original prompt doubles as the dataset description.
        description: prompt,
        refreshCadence,
        maxRowCount,
        columns,
        retrievalStrategy: schema.retrieval_strategy,
        sourceHint: schema.source_hint,
      },
    );

    // Read the stored document back so the response reflects what was saved.
    const dataset = await convex.query(internal.datasets.getOwnedInternal, {
      id: datasetId,
      ownerId,
    });

    return reply.code(201).send({ dataset, schema });
  } catch (err) {
    req.log.error(err, "CLI dataset create failed");
    return reply.code(502).send({ error: "Failed to create dataset" });
  }
});

/**
 * GET /cli/datasets/:datasetId — fetches one dataset owned by the local CLI
 * user.
 *
 * Responses: `{ dataset }` on success; 400 when the param is missing; 404
 * when the owned-dataset query returns nothing; 400 "Invalid datasetId" when
 * the query itself throws.
 */
fastify.get("/cli/datasets/:datasetId", async (req, reply) => {
  const ownerId = requireLocalCli(reply);
  if (!ownerId) return;

  const parsedParams = cliDatasetIdParamsSchema.safeParse(req.params);
  if (!parsedParams.success) {
    return reply.code(400).send({ error: "datasetId is required" });
  }
  const { datasetId } = parsedParams.data;

  try {
    const dataset = await convex.query(internal.datasets.getOwnedInternal, {
      id: datasetId,
      ownerId,
    });
    return dataset
      ? { dataset }
      : reply.code(404).send({ error: "Dataset not found" });
  } catch (err) {
    // Any query failure is reported to the client as a bad id.
    req.log.error(err, "CLI dataset get failed");
    return reply.code(400).send({ error: "Invalid datasetId" });
  }
});

/**
 * GET /cli/datasets/:datasetId/rows — returns the rows of a dataset owned by
 * the local CLI user.
 *
 * Responses: `{ rows }` on success; 400 when the param is missing; 404 when
 * the rows query returns a falsy result; 400 "Invalid datasetId" when the
 * query itself throws.
 */
fastify.get("/cli/datasets/:datasetId/rows", async (req, reply) => {
  const ownerId = requireLocalCli(reply);
  if (!ownerId) return;

  const parsedParams = cliDatasetIdParamsSchema.safeParse(req.params);
  if (!parsedParams.success) {
    return reply.code(400).send({ error: "datasetId is required" });
  }
  const { datasetId } = parsedParams.data;

  try {
    const rows = await convex.query(
      internal.datasetRows.listByOwnedDatasetInternal,
      { datasetId, ownerId },
    );
    return rows
      ? { rows }
      : reply.code(404).send({ error: "Dataset not found" });
  } catch (err) {
    // Any query failure is reported to the client as a bad id.
    req.log.error(err, "CLI rows get failed");
    return reply.code(400).send({ error: "Invalid datasetId" });
  }
});

/**
 * POST /cli/datasets/:datasetId/populate — starts a populate run for a
 * dataset owned by the local CLI user.
 *
 * Flow: gate on local CLI availability and local setup, validate the param,
 * load the owned dataset, claim it via `beginDatasetPopulate`, then create a
 * workflow run and start it in the background without awaiting it.
 *
 * Responses: 202 `{ success, runId }` once the run is started; 400 for a
 * missing param; 404 when the dataset is not found; 409 when the claim is
 * refused; 502 for any thrown error.
 */
fastify.post("/cli/datasets/:datasetId/populate", async (req, reply) => {
  const ownerId = requireLocalCli(reply);
  if (!ownerId) return;
  if (!(await ensureLocalSetupReady(reply))) return;

  const params = cliDatasetIdParamsSchema.safeParse(req.params);
  if (!params.success) {
    return reply.code(400).send({ error: "datasetId is required" });
  }

  // Set only after the populate claim succeeds, so the catch block knows
  // whether there is a claim to release.
  let claimedDatasetId: string | null = null;
  try {
    const dataset = await convex.query(internal.datasets.getOwnedInternal, {
      id: params.data.datasetId,
      ownerId,
    });
    if (!dataset) return reply.code(404).send({ error: "Dataset not found" });

    const populateOutcome = await beginDatasetPopulate(dataset._id, ownerId);
    if (populateOutcome === "already_building") {
      return reply.code(409).send({ error: "Dataset is already being populated" });
    }
    if (populateOutcome === "already_updating") {
      return reply.code(409).send({ error: "Dataset is already being updated" });
    }
    if (populateOutcome !== "started") {
      return reply.code(409).send({ error: `Cannot populate dataset: ${populateOutcome}` });
    }
    claimedDatasetId = dataset._id;

    // From here on the dataset is claimed. If the model config lookup or run
    // creation below throws, the catch block marks the dataset as failed so
    // the claim is not left in place.
    const { getModelConfig } = await import("./config/models.js");
    const modelConfig = await getModelConfig(ownerId);
    const run = await populateWorkflow.createRun();
    const controller = registerDataset(dataset._id);

    // Fire-and-forget: the response does not wait for the workflow.
    void runPopulateWorkflowInBackground({
      input: {
        datasetId: dataset._id,
        datasetName: dataset.name,
        description: dataset.description,
        maxRowCount: dataset.maxRowCount ?? 100,
        columns: dataset.columns,
      },
      run,
      controller,
      authorizedUserId: ownerId,
      logger: req.log,
      clerk: req.server.clerk,
      modelConfig,
    });

    return reply.code(202).send({ success: true, runId: run.runId });
  } catch (err) {
    req.log.error(err, "CLI populate failed");
    if (claimedDatasetId) {
      try {
        await setDatasetPopulateStatus(
          claimedDatasetId,
          "failed",
          statusErrorMessage(err),
        );
      } catch (statusErr) {
        // Releasing the claim is best-effort; the client still gets the 502.
        req.log.error(
          { err: statusErr, datasetId: claimedDatasetId },
          "Failed to release CLI populate dataset claim",
        );
      }
    }
    return reply.code(502).send({ error: "Failed to populate dataset" });
  }
});

/**
 * POST /cli/datasets/:datasetId/stop — stops the running populate/update for
 * a dataset owned by the local CLI user.
 *
 * If an active run is registered for the dataset it is aborted and the route
 * responds 202. If none is registered (an orphaned "building"/"updating"
 * dataset), the dataset is forced to "failed" and the route responds 200.
 *
 * Other responses: 400 for a missing param; 404 when the dataset is not
 * found; 409 when the dataset is neither building nor updating; 502 for any
 * thrown error outside the forced transition.
 */
fastify.post("/cli/datasets/:datasetId/stop", async (req, reply) => {
  const ownerId = requireLocalCli(reply);
  if (!ownerId) return;

  const params = cliDatasetIdParamsSchema.safeParse(req.params);
  if (!params.success) {
    return reply.code(400).send({ error: "datasetId is required" });
  }

  try {
    const dataset = await convex.query(internal.datasets.getOwnedInternal, {
      id: params.data.datasetId,
      ownerId,
    });
    if (!dataset) return reply.code(404).send({ error: "Dataset not found" });
    if (dataset.status !== "building" && dataset.status !== "updating") {
      return reply.code(409).send({ error: "Dataset is not currently running" });
    }

    const aborted = abortDataset(dataset._id);
    if (!aborted) {
      req.log.warn(
        { datasetId: dataset._id },
        "CLI stop requested for orphaned dataset (no active run registered); forcing to failed",
      );
      try {
        // An interrupted update may have left rows flagged as pending; clear
        // those flags before marking the dataset as failed.
        if (dataset.status === "updating") {
          await convex.mutation(internal.datasetRows.clearAllPendingUpdateStatus, {
            datasetId: dataset._id,
          });
        }
        await setDatasetPopulateStatus(
          dataset._id,
          "failed",
          "Run interrupted: no active local CLI run registered",
        );
      } catch (statusErr) {
        req.log.error(
          { err: statusErr, datasetId: dataset._id },
          "Failed to force-transition orphaned CLI dataset to failed",
        );
      }
      // NOTE(review): this responds 200 even when the forced transition above
      // failed and was only logged — confirm that is intended.
      return reply.code(200).send({ success: true });
    }
    return reply.code(202).send({ success: true });
  } catch (err) {
    req.log.error(err, "CLI stop failed");
    return reply.code(502).send({ error: "Failed to stop dataset run" });
  }
});

// ────────────────────────────────────────────────────────────────────────
// Protected routes — gated by Clerk JWT verification
// ────────────────────────────────────────────────────────────────────────
Expand Down
16 changes: 16 additions & 0 deletions frontend/convex/datasetRows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,22 @@ export const listByDataset = query({
},
});

/**
 * Internal query: returns all rows of a dataset, but only when the dataset
 * exists and belongs to `ownerId`. Returns `null` otherwise, so callers can
 * tell "no such owned dataset" apart from an empty row list.
 */
export const listByOwnedDatasetInternal = internalQuery({
  args: {
    datasetId: v.id("datasets"),
    ownerId: v.string(),
  },
  handler: async (ctx, { datasetId, ownerId }) => {
    const parent = await ctx.db.get(datasetId);
    if (!parent) return null;
    if (parent.ownerId !== ownerId) return null;

    const rowsOfDataset = ctx.db
      .query("datasetRows")
      .withIndex("by_dataset", (q) => q.eq("datasetId", datasetId));
    return await rowsOfDataset.collect();
  },
});

/**
* Row writes are SYSTEM-LEVEL operations performed by the agent runner,
* never by end users directly. They are exposed as `internalMutation` so
Expand Down
56 changes: 56 additions & 0 deletions frontend/convex/datasets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,62 @@ export const create = mutation({
},
});

/**
 * Internal mutation: inserts a new dataset on behalf of `ownerId` and returns
 * the value of the insert.
 *
 * Before inserting it runs the owner, row-count and quota checks
 * (`assertNotReservedOwner`, `validateMaxRowCount`, `requireQuotaRemaining`),
 * which presumably throw on violation — verify against their definitions.
 *
 * New datasets start with status "paused", visibility "private" and a row
 * count of 0. Refresh is enabled for any cadence other than "manual".
 */
export const createForOwnerInternal = internalMutation({
  args: {
    ownerId: v.string(),
    name: v.string(),
    description: v.string(),
    refreshCadence: refreshCadenceValidator,
    maxRowCount: v.number(),
    columns: v.array(columnValidator),
    retrievalStrategy: v.optional(
      v.union(
        v.literal("search_fetch"),
        v.literal("browser"),
        v.literal("hybrid")
      )
    ),
    sourceHint: v.optional(v.string()),
  },
  handler: async (ctx, args) => {
    const { ownerId, maxRowCount, refreshCadence } = args;

    assertNotReservedOwner(ownerId);
    validateMaxRowCount(maxRowCount);
    await requireQuotaRemaining(ctx, ownerId, maxRowCount);

    const refreshEnabled = refreshCadence !== "manual";
    const nextRefreshAt = nextRefreshAtFor(refreshCadence, Date.now());

    // All validated args are stored as-is alongside the initial state fields.
    return await ctx.db.insert("datasets", {
      ...args,
      status: "paused",
      visibility: "private",
      rowCount: 0,
      refreshEnabled,
      nextRefreshAt,
    });
  },
});

/**
 * Internal query: returns every dataset whose `ownerId` matches, read through
 * the `by_owner` index in descending order.
 * NOTE(review): descending index order presumably means newest first —
 * confirm against the `by_owner` index definition.
 */
export const listByOwnerInternal = internalQuery({
  args: { ownerId: v.string() },
  handler: async (ctx, { ownerId }) => {
    const ownedDatasets = ctx.db
      .query("datasets")
      .withIndex("by_owner", (q) => q.eq("ownerId", ownerId));
    return await ownedDatasets.order("desc").collect();
  },
});

/**
 * Internal query: loads a dataset by id and returns it only if it belongs to
 * `ownerId`. A missing dataset and a dataset owned by someone else both
 * yield `null`.
 */
export const getOwnedInternal = internalQuery({
  args: {
    id: v.id("datasets"),
    ownerId: v.string(),
  },
  handler: async (ctx, { id, ownerId }) => {
    const found = await ctx.db.get(id);
    if (!found) return null;
    return found.ownerId === ownerId ? found : null;
  },
});

export const updateRefreshSettings = mutation({
args: {
id: v.id("datasets"),
Expand Down
Loading