diff --git a/backend/src/index.ts b/backend/src/index.ts index 6baf22c..8c9f195 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -6,6 +6,7 @@ import clerkAuthPlugin, { requireAuth, getUserEmail } from "./clerk-auth.js"; import { inferSchema } from "./pipeline/schema-inference.js"; import { datasetContextSchema } from "./pipeline/populate.js"; import { populateWorkflow } from "./mastra/workflows/populate.js"; +import { updateWorkflow } from "./mastra/workflows/update.js"; import { convex, internal } from "./convex.js"; import { sendTransactionalEmail } from "./email/send.js"; import { datasetReadyTemplate } from "./email/templates/dataset-ready.js"; @@ -245,6 +246,54 @@ await fastify.register(async (instance) => { return reply.code(502).send({ error: "Failed to populate dataset. Please try again." }); } }); + + instance.post("/update", async (req, reply) => { + const parsed = datasetContextSchema.safeParse(req.body); + if (!parsed.success) { + return reply.code(400).send({ + error: "Invalid request", + details: parsed.error.flatten().fieldErrors, + }); + } + + try { + const dataset = await convex.query(internal.datasets.getInternal, { + id: parsed.data.datasetId, + }); + if (!dataset) { + return reply.code(404).send({ error: "Dataset not found" }); + } + if (dataset.ownerId !== req.auth.userId) { + return reply.code(403).send({ error: "Not authorized to update this dataset" }); + } + + const run = await updateWorkflow.createRun(); + const result = await run.start({ + inputData: { + ...parsed.data, + authContext: { + authorizedUserId: req.auth!.userId, + workflowRunId: run.runId, + }, + }, + }); + + req.log.info({ workflowStatus: result.status }, "Update workflow completed"); + + if (result.status !== "success") { + throw new Error(`Workflow ended with status: ${result.status}`); + } + + return { success: true, result: result.result }; + } catch (err) { + const msg = err instanceof Error ? 
err.message : String(err); + if (msg.includes("validator") || msg.includes("Invalid")) { + return reply.code(400).send({ error: "Invalid datasetId" }); + } + req.log.error(err, "Update failed"); + return reply.code(502).send({ error: "Failed to update dataset. Please try again." }); + } + }); }); try { diff --git a/backend/src/mastra/agents/investigate.ts b/backend/src/mastra/agents/investigate.ts index eab2747..c2d3361 100644 --- a/backend/src/mastra/agents/investigate.ts +++ b/backend/src/mastra/agents/investigate.ts @@ -63,7 +63,8 @@ export function buildInvestigateAgent( id: "investigate-agent", name: "Dataset Investigate Agent", instructions: buildInvestigateInstructions(columns), - model: openrouter("anthropic/claude-sonnet-4-6"), + model: openrouter("moonshotai/kimi-k2-0905"), + tools: { insert_row, list_rows, diff --git a/backend/src/mastra/agents/populate.ts b/backend/src/mastra/agents/populate.ts index 5551b52..febce00 100644 --- a/backend/src/mastra/agents/populate.ts +++ b/backend/src/mastra/agents/populate.ts @@ -14,13 +14,16 @@ const INSTRUCTIONS = `You fill datasets by finding real leads and handing them t 1. Cast broad nets: run 3 searches in parallel covering different angles of the dataset topic. Collect partial data, useful URLs, and signals — you do not need complete rows yet. -2. Hand off leads: call investigate_row for each promising lead (up to 3 in parallel). +2. Hand off leads: call investigate_row for each promising lead. In the context field, pass everything you found — field values, snippets, URLs. + - First batch: exactly 3 in parallel. Wait for all to finish and read every clue. + - Second batch: up to 10 in parallel. Wait for all to finish and read every clue. + - All subsequent batches: no limit — spawn as many as you have good leads. 3. Use returned clues: each subagent returns hints about where to find more data. Feed those clues into the next batch of investigate_row calls. -4. 
Keep going until you have 10 inserted rows or have exhausted real leads. +4. Keep going until you have 20 inserted rows or have exhausted real leads. Do not insert rows yourself — only investigate_row subagents can write to the dataset. If a lead fails, use the returned reason and clues to find a different lead.`; @@ -43,7 +46,7 @@ export function buildPopulateAgent( id: "populate-agent", name: "Dataset Populate Orchestrator", instructions: INSTRUCTIONS, - model: openrouter("anthropic/claude-sonnet-4-6"), + model: openrouter("moonshotai/kimi-k2-0905"), tools: { search_web: searchWebTool, fetch_page: fetchPageTool, diff --git a/backend/src/mastra/index.ts b/backend/src/mastra/index.ts index ede5535..34d97e6 100644 --- a/backend/src/mastra/index.ts +++ b/backend/src/mastra/index.ts @@ -1,6 +1,7 @@ import { Mastra } from "@mastra/core/mastra"; import { inferSchemaWorkflow } from "./workflows/infer-schema.js"; import { populateWorkflow } from "./workflows/populate.js"; +import { updateWorkflow } from "./workflows/update.js"; /** * Mastra registry. @@ -14,5 +15,5 @@ import { populateWorkflow } from "./workflows/populate.js"; * registered, so Mastra Studio can inspect it end-to-end. */ export const mastra = new Mastra({ - workflows: { inferSchemaWorkflow, populateWorkflow }, + workflows: { inferSchemaWorkflow, populateWorkflow, updateWorkflow }, }); diff --git a/backend/src/mastra/workflows/populate.ts b/backend/src/mastra/workflows/populate.ts index 1b5b0c1..ae518af 100644 --- a/backend/src/mastra/workflows/populate.ts +++ b/backend/src/mastra/workflows/populate.ts @@ -119,8 +119,14 @@ const agentStep = createStep({ inputData.authContext, inputData.columns, ); - const result = await agent.generate(inputData.prompt, { maxSteps: 80 }); - return { text: result.text }; + try { + const result = await agent.generate(inputData.prompt, { maxSteps: 80 }); + return { text: result.text }; + } catch (err) { + const msg = err instanceof Error ? 
err.message : String(err); + console.error(`[populate-agent] agent.generate failed: ${msg}`); + return { text: `Agent failed: ${msg}` }; + } }, }); diff --git a/backend/src/mastra/workflows/update.ts b/backend/src/mastra/workflows/update.ts new file mode 100644 index 0000000..b320bd6 --- /dev/null +++ b/backend/src/mastra/workflows/update.ts @@ -0,0 +1,29 @@ +import { createStep, createWorkflow } from "@mastra/core/workflows"; +import { z } from "zod"; +import { datasetContextSchema } from "../../pipeline/populate.js"; +import { authContextSchema } from "./populate.js"; + +export const updateInputSchema = datasetContextSchema.extend({ + authContext: authContextSchema, +}); +export type UpdateInput = z.infer<typeof updateInputSchema>; + +const updateStep = createStep({ + id: "update-dataset", + inputSchema: updateInputSchema, + outputSchema: z.object({ message: z.string() }), + execute: async ({ inputData }) => { + console.log( + `[update-dataset] triggered dataset=${inputData.datasetId} user=${inputData.authContext.authorizedUserId} run=${inputData.authContext.workflowRunId}`, + ); + return { message: "Update workflow triggered — logic not yet implemented."
}; + }, +}); + +export const updateWorkflow = createWorkflow({ + id: "update-workflow", + inputSchema: updateInputSchema, + outputSchema: z.object({ message: z.string() }), +}) + .then(updateStep) + .commit(); diff --git a/frontend/app/dataset/[id]/page.tsx b/frontend/app/dataset/[id]/page.tsx index 3a158ae..b5d9048 100644 --- a/frontend/app/dataset/[id]/page.tsx +++ b/frontend/app/dataset/[id]/page.tsx @@ -12,7 +12,7 @@ import { useSelection } from "@/components/table/use-selection"; import { ThemeToggle } from "@/components/ThemeToggle"; import { StatusBadge } from "@/components/dataset/StatusBadge"; import { downloadCSV, downloadXLSX } from "@/lib/export"; -import { populate } from "@/lib/backend"; +import { populate, update } from "@/lib/backend"; import { EVENTS, captureException, track } from "@/lib/analytics"; export default function DatasetPage() { @@ -21,6 +21,7 @@ export default function DatasetPage() { const { userId, getToken } = useAuth(); const [exporting, setExporting] = useState<"csv" | "xlsx" | null>(null); const [populating, setPopulating] = useState(false); + const [updating, setUpdating] = useState(false); const datasetId = params.id as Id<"datasets">; const dataset = useQuery( @@ -91,6 +92,31 @@ export default function DatasetPage() { } } + async function handleUpdate() { + if (!dataset || updating) return; + setUpdating(true); + try { + const token = await getToken(); + if (!token) throw new Error("Not authenticated"); + + await update( + dataset._id, + dataset.name, + dataset.description, + dataset.columns, + token, + ); + } catch (err) { + console.error("[update] failed", err); + captureException(err, { + operation: "dataset_update", + datasetId: dataset._id, + }); + } finally { + setUpdating(false); + } + } + async function handlePopulate() { if (!dataset || populating) return; setPopulating(true); @@ -188,6 +214,13 @@ export default function DatasetPage() { > {xlsxLabel} +