Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import clerkAuthPlugin, { requireAuth, getUserEmail } from "./clerk-auth.js";
import { inferSchema } from "./pipeline/schema-inference.js";
import { datasetContextSchema } from "./pipeline/populate.js";
import { populateWorkflow } from "./mastra/workflows/populate.js";
import { updateWorkflow } from "./mastra/workflows/update.js";
import { convex, internal } from "./convex.js";
import { sendTransactionalEmail } from "./email/send.js";
import { datasetReadyTemplate } from "./email/templates/dataset-ready.js";
Expand Down Expand Up @@ -245,6 +246,54 @@ await fastify.register(async (instance) => {
return reply.code(502).send({ error: "Failed to populate dataset. Please try again." });
}
});

/**
 * POST /update — run the update workflow for a dataset owned by the caller.
 *
 * Flow: validate the body against `datasetContextSchema`, resolve the caller's
 * user id, load the dataset, enforce ownership, then start `updateWorkflow`
 * with an `authContext` carrying the user id and the workflow run id.
 *
 * Responses:
 *  - 400 invalid body, or the dataset lookup rejected the id
 *  - 401 no authenticated user on the request
 *  - 404 dataset not found
 *  - 403 dataset owned by a different user
 *  - 502 lookup/workflow failure (details are logged, not returned)
 *  - 200 `{ success: true, result }` when the workflow status is "success"
 */
instance.post("/update", async (req, reply) => {
  const parsed = datasetContextSchema.safeParse(req.body);
  if (!parsed.success) {
    return reply.code(400).send({
      error: "Invalid request",
      details: parsed.error.flatten().fieldErrors,
    });
  }

  // Narrow the user id once; the previous code mixed `req.auth.userId` and
  // `req.auth!.userId`, so a missing auth object surfaced as a 502.
  // NOTE(review): presumably an auth hook already rejects anonymous requests
  // before this handler runs — confirm; this guard is then purely defensive.
  const userId = req.auth?.userId;
  if (!userId) {
    return reply.code(401).send({ error: "Not authenticated" });
  }

  // Tracks whether the dataset lookup finished, so the "Invalid datasetId"
  // mapping below is only applied to errors raised by the lookup itself.
  let lookupDone = false;

  try {
    const dataset = await convex.query(internal.datasets.getInternal, {
      id: parsed.data.datasetId,
    });
    lookupDone = true;

    if (!dataset) {
      return reply.code(404).send({ error: "Dataset not found" });
    }
    if (dataset.ownerId !== userId) {
      return reply.code(403).send({ error: "Not authorized to update this dataset" });
    }

    const run = await updateWorkflow.createRun();
    const result = await run.start({
      inputData: {
        ...parsed.data,
        authContext: {
          authorizedUserId: userId,
          workflowRunId: run.runId,
        },
      },
    });

    req.log.info({ workflowStatus: result.status }, "Update workflow completed");

    if (result.status !== "success") {
      throw new Error(`Workflow ended with status: ${result.status}`);
    }

    return { success: true, result: result.result };
  } catch (err) {
    const msg = err instanceof Error ? err.message : String(err);

    // Only a failed lookup can mean "bad dataset id". Workflow errors that
    // happen to contain "Invalid" must fall through to the logged 502 path
    // instead of being reported (unlogged) as a client error.
    if (!lookupDone && (msg.includes("validator") || msg.includes("Invalid"))) {
      return reply.code(400).send({ error: "Invalid datasetId" });
    }

    req.log.error(err, "Update failed");
    return reply.code(502).send({ error: "Failed to update dataset. Please try again." });
  }
});
});

try {
Expand Down
3 changes: 2 additions & 1 deletion backend/src/mastra/agents/investigate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ export function buildInvestigateAgent(
id: "investigate-agent",
name: "Dataset Investigate Agent",
instructions: buildInvestigateInstructions(columns),
model: openrouter("anthropic/claude-sonnet-4-6"),
model: openrouter("moonshotai/kimi-k2-0905"),

tools: {
insert_row,
list_rows,
Expand Down
9 changes: 6 additions & 3 deletions backend/src/mastra/agents/populate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ const INSTRUCTIONS = `You fill datasets by finding real leads and handing them t
1. Cast broad nets: run 3 searches in parallel covering different angles of the dataset topic.
Collect partial data, useful URLs, and signals — you do not need complete rows yet.

2. Hand off leads: call investigate_row for each promising lead (up to 3 in parallel).
2. Hand off leads: call investigate_row for each promising lead.
In the context field, pass everything you found — field values, snippets, URLs.
- First batch: exactly 3 in parallel. Wait for all to finish and read every clue.
- Second batch: up to 10 in parallel. Wait for all to finish and read every clue.
- All subsequent batches: no limit — spawn as many as you have good leads.

3. Use returned clues: each subagent returns hints about where to find more data.
Feed those clues into the next batch of investigate_row calls.

4. Keep going until you have 10 inserted rows or have exhausted real leads.
4. Keep going until you have 20 inserted rows or have exhausted real leads.

Do not insert rows yourself — only investigate_row subagents can write to the dataset.
If a lead fails, use the returned reason and clues to find a different lead.`;
Expand All @@ -43,7 +46,7 @@ export function buildPopulateAgent(
id: "populate-agent",
name: "Dataset Populate Orchestrator",
instructions: INSTRUCTIONS,
model: openrouter("anthropic/claude-sonnet-4-6"),
model: openrouter("moonshotai/kimi-k2-0905"),
tools: {
search_web: searchWebTool,
fetch_page: fetchPageTool,
Expand Down
3 changes: 2 additions & 1 deletion backend/src/mastra/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Mastra } from "@mastra/core/mastra";
import { inferSchemaWorkflow } from "./workflows/infer-schema.js";
import { populateWorkflow } from "./workflows/populate.js";
import { updateWorkflow } from "./workflows/update.js";

/**
* Mastra registry.
Expand All @@ -14,5 +15,5 @@ import { populateWorkflow } from "./workflows/populate.js";
* registered, so Mastra Studio can inspect it end-to-end.
*/
export const mastra = new Mastra({
  // Single `workflows` entry: the object literal previously declared this key
  // twice (the shorter list was shadowed by the one that includes
  // `updateWorkflow`), which is a duplicate-property error.
  workflows: { inferSchemaWorkflow, populateWorkflow, updateWorkflow },
});
10 changes: 8 additions & 2 deletions backend/src/mastra/workflows/populate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,14 @@ const agentStep = createStep({
inputData.authContext,
inputData.columns,
);
const result = await agent.generate(inputData.prompt, { maxSteps: 80 });
return { text: result.text };
try {
const result = await agent.generate(inputData.prompt, { maxSteps: 80 });
return { text: result.text };
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
console.error(`[populate-agent] agent.generate failed: ${msg}`);
return { text: `Agent failed: ${msg}` };
Comment on lines +126 to +128
}
},
});

Expand Down
29 changes: 29 additions & 0 deletions backend/src/mastra/workflows/update.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { createStep, createWorkflow } from "@mastra/core/workflows";
import { z } from "zod";
import { datasetContextSchema } from "../../pipeline/populate.js";
import { authContextSchema } from "./populate.js";

/**
 * Input schema for the update workflow: every field of `datasetContextSchema`
 * plus an `authContext` object validated by `authContextSchema`.
 */
export const updateInputSchema = datasetContextSchema.extend({
  authContext: authContextSchema,
});
/** Static TypeScript type inferred from `updateInputSchema`. */
export type UpdateInput = z.infer<typeof updateInputSchema>;

/**
 * Placeholder step for the update workflow.
 *
 * Logs which dataset, user and run triggered it, then returns a fixed
 * message; no dataset changes are performed here.
 */
const updateStep = createStep({
  id: "update-dataset",
  inputSchema: updateInputSchema,
  outputSchema: z.object({ message: z.string() }),
  execute: async ({ inputData }) => {
    const { datasetId, authContext } = inputData;
    const { authorizedUserId, workflowRunId } = authContext;

    console.log(
      `[update-dataset] triggered dataset=${datasetId} user=${authorizedUserId} run=${workflowRunId}`,
    );

    const message = "Update workflow triggered — logic not yet implemented.";
    return { message };
  },
});

/**
 * Update workflow: takes `updateInputSchema` input, runs the single
 * `updateStep`, and outputs `{ message }`.
 */
export const updateWorkflow = createWorkflow({
  id: "update-workflow",
  inputSchema: updateInputSchema,
  outputSchema: z.object({ message: z.string() }),
})
  .then(updateStep)
  .commit();
35 changes: 34 additions & 1 deletion frontend/app/dataset/[id]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { useSelection } from "@/components/table/use-selection";
import { ThemeToggle } from "@/components/ThemeToggle";
import { StatusBadge } from "@/components/dataset/StatusBadge";
import { downloadCSV, downloadXLSX } from "@/lib/export";
import { populate } from "@/lib/backend";
import { populate, update } from "@/lib/backend";
import { EVENTS, captureException, track } from "@/lib/analytics";

export default function DatasetPage() {
Expand All @@ -21,6 +21,7 @@ export default function DatasetPage() {
const { userId, getToken } = useAuth();
const [exporting, setExporting] = useState<"csv" | "xlsx" | null>(null);
const [populating, setPopulating] = useState(false);
const [updating, setUpdating] = useState(false);

const datasetId = params.id as Id<"datasets">;
const dataset = useQuery(
Expand Down Expand Up @@ -91,6 +92,31 @@ export default function DatasetPage() {
}
}

/**
 * Click handler for the "Update Dataset" button.
 *
 * Does nothing when the dataset has not loaded or an update is already in
 * flight. Otherwise it fetches an auth token and calls the backend `update`
 * endpoint with the dataset's id, name, description and columns.
 *
 * Failures (including a missing token) are logged to the console and
 * reported through `captureException`; this function does not surface them
 * in the UI. The `updating` flag is always cleared afterwards.
 */
async function handleUpdate() {
  if (!dataset || updating) return;
  setUpdating(true);
  try {
    const token = await getToken();
    if (!token) throw new Error("Not authenticated");

    await update(
      dataset._id,
      dataset.name,
      dataset.description,
      dataset.columns,
      token,
    );
  } catch (err) {
    console.error("[update] failed", err);
    captureException(err, {
      operation: "dataset_update",
      datasetId: dataset._id,
    });
  } finally {
    // Re-enable the button whether the request succeeded or failed.
    setUpdating(false);
  }
}

async function handlePopulate() {
if (!dataset || populating) return;
setPopulating(true);
Expand Down Expand Up @@ -188,6 +214,13 @@ export default function DatasetPage() {
>
{xlsxLabel}
</button>
<button
onClick={handleUpdate}
disabled={updating}
className="border border-border px-3 py-1.5 text-xs font-medium text-foreground hover:bg-foreground/[0.03] transition-colors disabled:opacity-40 disabled:cursor-not-allowed"
>
{updating ? "Updating…" : "Update Dataset"}
</button>
<button
onClick={handlePopulate}
disabled={populating}
Expand Down
25 changes: 25 additions & 0 deletions frontend/lib/backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,28 @@ export async function populate(

return res.json();
}

/**
 * Call the backend `/update` endpoint for a dataset.
 *
 * Sends `{ datasetId, datasetName, description, columns }` as JSON with the
 * caller's bearer token.
 *
 * @returns The parsed JSON body of a successful response.
 * @throws Error whose message is the response body's `error` field when
 *   present, otherwise `Backend error (<status>)`.
 */
export async function update(
  datasetId: string,
  datasetName: string,
  description: string,
  columns: PopulateColumn[],
  token: string,
): Promise<PopulateResult> {
  const endpoint = `${BACKEND_URL}/update`;
  const payload = JSON.stringify({ datasetId, datasetName, description, columns });

  const response = await fetch(endpoint, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${token}`,
    },
    body: payload,
  });

  if (response.ok) {
    return response.json();
  }

  // The error body may be missing or not JSON; fall back to a status-only message.
  const failure = await response.json().catch(() => null);
  const reason = failure?.error || `Backend error (${response.status})`;
  throw new Error(reason);
}