From 92f3387e10d6e27ed552288af5319dfa9b5798a3 Mon Sep 17 00:00:00 2001 From: MMeteorL Date: Wed, 27 May 2026 16:46:44 -0700 Subject: [PATCH 1/8] Add cost tracking for populate workflow runs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Records search/fetch/investigate call counts, token usage (input/output), and timing per populate run to a new Convex `populateRuns` table. Metrics are counted by scanning result.steps[].toolCalls after agent.generate() returns — no factory wrappers needed. The save is fire-and-forget in the finally block so it never blocks or fails the workflow. New files: - backend/src/mastra/run-metrics.ts — RunMetrics accumulator class - backend/src/mastra/save-run-metrics.ts — writes completed run to Convex - frontend/convex/populateRuns.ts — insert mutation + list/get queries Co-Authored-By: Claude Sonnet 4.6 --- backend/src/mastra/agents/populate.ts | 5 +- backend/src/mastra/run-metrics.ts | 53 ++++++++++++ backend/src/mastra/save-run-metrics.ts | 52 ++++++++++++ backend/src/mastra/tools/investigate-tool.ts | 15 ++++ backend/src/mastra/workflows/populate.ts | 44 +++++++++- frontend/convex/populateRuns.ts | 86 ++++++++++++++++++++ frontend/convex/schema.ts | 43 ++++++++++ 7 files changed, 295 insertions(+), 3 deletions(-) create mode 100644 backend/src/mastra/run-metrics.ts create mode 100644 backend/src/mastra/save-run-metrics.ts create mode 100644 frontend/convex/populateRuns.ts diff --git a/backend/src/mastra/agents/populate.ts b/backend/src/mastra/agents/populate.ts index 5fa792b..b2464bb 100644 --- a/backend/src/mastra/agents/populate.ts +++ b/backend/src/mastra/agents/populate.ts @@ -4,6 +4,7 @@ import { buildSubagentTool } from "../tools/investigate-tool.js"; import { searchWebTool, fetchPageTool } from "../tools/web-tools.js"; import type { AuthContext } from "../workflows/populate.js"; import type { PopulateColumn } from "../../pipeline/populate.js"; +import type { RunMetrics } from "../run-metrics.js"; const openrouter = createOpenRouter({ apiKey: process.env.OPENROUTER_API_KEY!, @@ -41,12 +42,13 @@ export function buildPopulateAgent( authorizedDatasetId: string, authContext: AuthContext, columns: PopulateColumn[], + metrics?: RunMetrics, ): Agent { return new Agent({ id: "populate-agent", name: "Dataset Populate Orchestrator", instructions: INSTRUCTIONS, - model: openrouter("qwen/qwen3.7-max"), + model: openrouter("deepseek/deepseek-v4-pro"), tools: { search_web: searchWebTool, fetch_page: fetchPageTool, @@ -54,6 +56,7 @@ export function buildPopulateAgent( authorizedDatasetId, authContext, columns, + metrics, ), }, }); diff --git a/backend/src/mastra/run-metrics.ts b/backend/src/mastra/run-metrics.ts new file mode 100644 index 0000000..70c0c61 --- /dev/null +++ b/backend/src/mastra/run-metrics.ts @@ -0,0 +1,53 @@ +/** + * Per-run metrics collector for the populate workflow. + * + * A single RunMetrics instance is created at the start of each agentStep, + * passed by reference into every tool factory and agent builder, and read + * once at the end to write the populateRuns Convex record. + * + * All operations are synchronous integer increments or field reads — zero + * I/O, zero meaningful overhead on the hot path. + */ + +interface AgentResult { + usage?: { inputTokens?: number; outputTokens?: number }; + steps?: unknown[]; +} + +export class RunMetrics { + searchCalls = 0; + fetchCalls = 0; + /** investigate_row tool calls dispatched by the orchestrator. */ + investigateCalls = 0; + /** Rows successfully inserted across all investigate subagents. */ + rowsInserted = 0; + + readonly orchestrator = { inputTokens: 0, outputTokens: 0, steps: 0 }; + readonly investigate = { + inputTokens: 0, + outputTokens: 0, + steps: 0, + runs: 0, + }; + + addOrchestratorResult(result: AgentResult): void { + this.orchestrator.inputTokens += result.usage?.inputTokens ?? 0; + this.orchestrator.outputTokens += result.usage?.outputTokens ?? 0; + this.orchestrator.steps += result.steps?.length ?? 0; + } + + addInvestigateResult(result: AgentResult): void { + this.investigate.inputTokens += result.usage?.inputTokens ?? 0; + this.investigate.outputTokens += result.usage?.outputTokens ?? 0; + this.investigate.steps += result.steps?.length ?? 0; + this.investigate.runs += 1; + } + + totals(): { inputTokens: number; outputTokens: number } { + return { + inputTokens: this.orchestrator.inputTokens + this.investigate.inputTokens, + outputTokens: + this.orchestrator.outputTokens + this.investigate.outputTokens, + }; + } +} diff --git a/backend/src/mastra/save-run-metrics.ts b/backend/src/mastra/save-run-metrics.ts new file mode 100644 index 0000000..d10440f --- /dev/null +++ b/backend/src/mastra/save-run-metrics.ts @@ -0,0 +1,52 @@ +import { convex, internal } from "../convex.js"; +import type { RunMetrics } from "./run-metrics.js"; + +export interface SaveRunMetricsInput { + workflowRunId: string; + datasetId: string; + userId: string; + startedAt: number; + finishedAt: number; + metrics: RunMetrics; + status: "success" | "error"; + error?: string; + isBenchmark?: boolean; +} + +/** + * Persist a completed run's metrics to the populateRuns Convex table. + * + * Called from the agentStep finally-block as a fire-and-forget operation — + * any error here is logged but must never propagate to the populate workflow. + */ +export async function saveRunMetrics(input: SaveRunMetricsInput): Promise { + const totals = input.metrics.totals(); + await convex.mutation(internal.populateRuns.insert, { + workflowRunId: input.workflowRunId, + datasetId: input.datasetId, + userId: input.userId, + startedAt: input.startedAt, + finishedAt: input.finishedAt, + durationMs: input.finishedAt - input.startedAt, + + searchCalls: input.metrics.searchCalls, + fetchCalls: input.metrics.fetchCalls, + investigateCalls: input.metrics.investigateCalls, + rowsInserted: input.metrics.rowsInserted, + + tokensInput: totals.inputTokens, + tokensOutput: totals.outputTokens, + + orchestratorTokensInput: input.metrics.orchestrator.inputTokens, + orchestratorTokensOutput: input.metrics.orchestrator.outputTokens, + orchestratorSteps: input.metrics.orchestrator.steps, + investigateTokensInput: input.metrics.investigate.inputTokens, + investigateTokensOutput: input.metrics.investigate.outputTokens, + investigateSteps: input.metrics.investigate.steps, + investigateRuns: input.metrics.investigate.runs, + + status: input.status, + error: input.error, + isBenchmark: input.isBenchmark, + }); +} diff --git a/backend/src/mastra/tools/investigate-tool.ts b/backend/src/mastra/tools/investigate-tool.ts index 7746ef2..4560b6f 100644 --- a/backend/src/mastra/tools/investigate-tool.ts +++ b/backend/src/mastra/tools/investigate-tool.ts @@ -3,6 +3,7 @@ import { z } from "zod"; import { buildInvestigateAgent } from "../agents/investigate.js"; import type { AuthContext } from "../workflows/populate.js"; import type { PopulateColumn } from "../../pipeline/populate.js"; +import type { RunMetrics } from "../run-metrics.js"; const investigateInputSchema = z.object({ entity_hint: z @@ -72,6 +73,7 @@ export function buildSubagentTool( authorizedDatasetId: string, authContext: AuthContext, columns: PopulateColumn[], + metrics?: RunMetrics, ) { return createTool({ id: "run_subagent", @@ -80,6 +82,7 @@ export function buildSubagentTool( inputSchema: investigateInputSchema, outputSchema: investigateOutputSchema, execute: async ({ entity_hint, primary_keys, context, urls, notes }) => { + if (metrics) metrics.investigateCalls++; console.log( `[run_subagent] spawning subagent user=${authContext.authorizedUserId} run=${authContext.workflowRunId} dataset=${authorizedDatasetId} entity="${entity_hint}" pk=${JSON.stringify(primary_keys)}`, ); @@ -110,7 +113,19 @@ Context (partial data already found): ${context}${urlsBlock}${notesBlock}`; const result = await agent.generate(prompt, { maxSteps: 10 }); + if (metrics) { + for (const step of (result.steps ?? []) as any[]) { + for (const tc of (step.toolCalls ?? []) as any[]) { + if (tc.toolName === "search_web") metrics.searchCalls++; + else if (tc.toolName === "fetch_page") metrics.fetchCalls++; + } + } + metrics.addInvestigateResult(result); + } + const parsed = parseInvestigateResult(result.text); + if (metrics && parsed.inserted) metrics.rowsInserted++; + console.log( `[run_subagent] done entity="${entity_hint}" inserted=${parsed.inserted} steps=${result.steps?.length ?? "?"}` + (parsed.row_summary ? `\n summary: ${parsed.row_summary}` : "") + diff --git a/backend/src/mastra/workflows/populate.ts b/backend/src/mastra/workflows/populate.ts index 924457d..10f216a 100644 --- a/backend/src/mastra/workflows/populate.ts +++ b/backend/src/mastra/workflows/populate.ts @@ -5,6 +5,8 @@ import { createOpenRouter } from "@openrouter/ai-sdk-provider"; import { datasetContextSchema, populateColumnSchema } from "../../pipeline/populate.js"; import { convex, internal } from "../../convex.js"; import { buildPopulateAgent } from "../agents/populate.js"; +import { RunMetrics } from "../run-metrics.js"; +import { saveRunMetrics } from "../save-run-metrics.js"; /** * Server-set auth/run context threaded through every step. @@ -28,6 +30,7 @@ import { buildPopulateAgent } from "../agents/populate.js"; export const authContextSchema = z.object({ authorizedUserId: z.string().min(1), workflowRunId: z.string().min(1), + isBenchmark: z.boolean().optional(), }); export type AuthContext = z.infer; @@ -208,24 +211,61 @@ For each lead you find, call run_subagent with the primary key values and any co * capability scope; see tools/dataset-tools.ts). So this step does what * Mastra's agent-as-step adapter would do internally: build the agent, * call `.generate(prompt, { maxSteps })`, return the text. + * + * A RunMetrics instance is created here, threaded into every tool factory + * and agent builder, and saved to Convex in the finally block. The save is + * fire-and-forget — errors are logged but never propagate to the workflow. */ const agentStep = createStep({ id: "populate-agent", inputSchema: buildPromptOutputSchema, outputSchema: z.object({ text: z.string() }), execute: async ({ inputData }) => { + const metrics = new RunMetrics(); + const startedAt = Date.now(); + let status: "success" | "error" = "success"; + let errorMsg: string | undefined; + const agent = buildPopulateAgent( inputData.authorizedDatasetId, inputData.authContext, inputData.columns, + metrics, ); + try { const result = await agent.generate(inputData.prompt, { maxSteps: 80 }); + metrics.addOrchestratorResult(result); + for (const step of (result.steps ?? []) as any[]) { + for (const tc of (step.toolCalls ?? []) as any[]) { + if (tc.toolName === "search_web") metrics.searchCalls++; + else if (tc.toolName === "fetch_page") metrics.fetchCalls++; + } + } return { text: result.text }; } catch (err) { - const msg = err instanceof Error ? err.message : String(err); - console.error(`[populate-agent] agent.generate failed: ${msg}`); + status = "error"; + errorMsg = err instanceof Error ? err.message : String(err); + console.error(`[populate-agent] agent.generate failed: ${errorMsg}`); throw err; + } finally { + const finishedAt = Date.now(); + void saveRunMetrics({ + workflowRunId: inputData.authContext.workflowRunId, + datasetId: inputData.authorizedDatasetId, + userId: inputData.authContext.authorizedUserId, + startedAt, + finishedAt, + metrics, + status, + error: errorMsg, + isBenchmark: inputData.authContext.isBenchmark, + }).catch((err) => + console.error( + `[populate-agent] metrics save failed run=${inputData.authContext.workflowRunId}:`, + err, + ), + ); } }, }); diff --git a/frontend/convex/populateRuns.ts b/frontend/convex/populateRuns.ts new file mode 100644 index 0000000..97adec4 --- /dev/null +++ b/frontend/convex/populateRuns.ts @@ -0,0 +1,86 @@ +import { internalMutation, internalQuery } from "./_generated/server.js"; +import { v } from "convex/values"; + +/** + * Insert a populate-run metrics record. + * + * Called by the backend agent runner at the end of every populate workflow + * run (success or error). Never called from the browser. + */ +export const insert = internalMutation({ + args: { + workflowRunId: v.string(), + datasetId: v.string(), + userId: v.string(), + startedAt: v.number(), + finishedAt: v.number(), + durationMs: v.number(), + + searchCalls: v.number(), + fetchCalls: v.number(), + investigateCalls: v.number(), + rowsInserted: v.number(), + + tokensInput: v.number(), + tokensOutput: v.number(), + + orchestratorTokensInput: v.number(), + orchestratorTokensOutput: v.number(), + orchestratorSteps: v.number(), + investigateTokensInput: v.number(), + investigateTokensOutput: v.number(), + investigateSteps: v.number(), + investigateRuns: v.number(), + + status: v.union(v.literal("success"), v.literal("error")), + error: v.optional(v.string()), + isBenchmark: v.optional(v.boolean()), + }, + handler: async (ctx, args) => { + await ctx.db.insert("populateRuns", args); + }, +}); + +/** + * Fetch a single run by its workflowRunId. Used by the benchmark runner to + * retrieve metrics after a workflow completes. + */ +export const getByWorkflowRunId = internalQuery({ + args: { workflowRunId: v.string() }, + handler: async (ctx, args) => { + return await ctx.db + .query("populateRuns") + .withIndex("by_workflow_run", (q) => + q.eq("workflowRunId", args.workflowRunId), + ) + .first(); + }, +}); + +/** + * List all runs for a dataset, newest first. + */ +export const listByDataset = internalQuery({ + args: { datasetId: v.string() }, + handler: async (ctx, args) => { + const runs = await ctx.db + .query("populateRuns") + .withIndex("by_dataset", (q) => q.eq("datasetId", args.datasetId)) + .collect(); + return runs.sort((a, b) => b.startedAt - a.startedAt); + }, +}); + +/** + * List all runs for a user, newest first. + */ +export const listByUser = internalQuery({ + args: { userId: v.string() }, + handler: async (ctx, args) => { + const runs = await ctx.db + .query("populateRuns") + .withIndex("by_user", (q) => q.eq("userId", args.userId)) + .collect(); + return runs.sort((a, b) => b.startedAt - a.startedAt); + }, +}); diff --git a/frontend/convex/schema.ts b/frontend/convex/schema.ts index 11e635a..65feebc 100644 --- a/frontend/convex/schema.ts +++ b/frontend/convex/schema.ts @@ -99,4 +99,47 @@ export default defineSchema({ // "before current period", which forces a reset on next write. periodStart: v.optional(v.number()), }).index("by_user", ["userId"]), + + // One row per populate workflow run. Written once at the end of each run + // (success or error) by the backend agent runner — never by the frontend. + // Tracks tool-call counts, token usage, and timing so runs can be + // compared across datasets, users, and benchmark sessions. + populateRuns: defineTable({ + workflowRunId: v.string(), + // v.string() (not v.id) so benchmark runs can use synthetic dataset ids + // without needing a real Convex dataset document. + datasetId: v.string(), + userId: v.string(), + startedAt: v.number(), + finishedAt: v.number(), + durationMs: v.number(), + + // Tool-call counts + searchCalls: v.number(), + fetchCalls: v.number(), + investigateCalls: v.number(), + rowsInserted: v.number(), + + // Token usage — totals across all agent invocations in this run + tokensInput: v.number(), + tokensOutput: v.number(), + + // Breakdown by agent tier + orchestratorTokensInput: v.number(), + orchestratorTokensOutput: v.number(), + orchestratorSteps: v.number(), + investigateTokensInput: v.number(), + investigateTokensOutput: v.number(), + investigateSteps: v.number(), + investigateRuns: v.number(), + + status: v.union(v.literal("success"), v.literal("error")), + error: v.optional(v.string()), + + // True when written by the benchmark runner rather than a real user session. + isBenchmark: v.optional(v.boolean()), + }) + .index("by_dataset", ["datasetId"]) + .index("by_user", ["userId"]) + .index("by_workflow_run", ["workflowRunId"]), }); From 31ae767a4ef0745d7303159889ccf941ae24f598 Mon Sep 17 00:00:00 2001 From: MMeteorL Date: Wed, 27 May 2026 17:35:43 -0700 Subject: [PATCH 2/8] Address CodeRabbit feedback on cost tracking - Move buildPopulateAgent into the try block so any construction error is caught and the finally block (metrics save) always executes - Add status/error consistency guard to populateRuns.insert so a "success" record can't carry an error string and an "error" record can't be written without a message Co-Authored-By: Claude Sonnet 4.6 --- backend/src/mastra/workflows/populate.ts | 13 ++++++------- frontend/convex/populateRuns.ts | 6 ++++++ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/backend/src/mastra/workflows/populate.ts b/backend/src/mastra/workflows/populate.ts index 10f216a..f5bd2a5 100644 --- a/backend/src/mastra/workflows/populate.ts +++ b/backend/src/mastra/workflows/populate.ts @@ -226,14 +226,13 @@ const agentStep = createStep({ let status: "success" | "error" = "success"; let errorMsg: string | undefined; - const agent = buildPopulateAgent( - inputData.authorizedDatasetId, - inputData.authContext, - inputData.columns, - metrics, - ); - try { + const agent = buildPopulateAgent( + inputData.authorizedDatasetId, + inputData.authContext, + inputData.columns, + metrics, + ); const result = await agent.generate(inputData.prompt, { maxSteps: 80 }); metrics.addOrchestratorResult(result); for (const step of (result.steps ?? []) as any[]) { diff --git a/frontend/convex/populateRuns.ts b/frontend/convex/populateRuns.ts index 97adec4..851c60c 100644 --- a/frontend/convex/populateRuns.ts +++ b/frontend/convex/populateRuns.ts @@ -37,6 +37,12 @@ export const insert = internalMutation({ isBenchmark: v.optional(v.boolean()), }, handler: async (ctx, args) => { + if (args.status === "success" && args.error) { + throw new Error("populateRuns.insert: error must be absent on a successful run"); + } + if (args.status === "error" && !args.error) { + throw new Error("populateRuns.insert: error message is required on a failed run"); + } await ctx.db.insert("populateRuns", args); }, }); From 031094c7ee698951f96cc832086afdfbb9e33a88 Mon Sep 17 00:00:00 2001 From: MMeteorL Date: Wed, 27 May 2026 21:47:31 -0700 Subject: [PATCH 3/8] =?UTF-8?q?Rename=20populateRuns=20=E2=86=92=20runStat?= =?UTF-8?q?s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Renames the Convex table and module that stores per-run metrics from populateRuns to runStats for a cleaner, more descriptive name. - frontend/convex/populateRuns.ts → runStats.ts - schema.ts: table definition populateRuns → runStats - save-run-metrics.ts: internal.populateRuns.insert → internal.runStats.insert - run-metrics.ts: update comment Co-Authored-By: Claude Sonnet 4.6 --- backend/src/mastra/run-metrics.ts | 2 +- backend/src/mastra/save-run-metrics.ts | 4 ++-- frontend/convex/{populateRuns.ts => runStats.ts} | 12 ++++++------ frontend/convex/schema.ts | 2 +- 4 files changed, 10 insertions(+), 10 deletions(-) rename frontend/convex/{populateRuns.ts => runStats.ts} (88%) diff --git a/backend/src/mastra/run-metrics.ts b/backend/src/mastra/run-metrics.ts index 70c0c61..5a06896 100644 --- a/backend/src/mastra/run-metrics.ts +++ b/backend/src/mastra/run-metrics.ts @@ -3,7 +3,7 @@ * * A single RunMetrics instance is created at the start of each agentStep, * passed by reference into every tool factory and agent builder, and read - * once at the end to write the populateRuns Convex record. + * once at the end to write the runStats Convex record. * * All operations are synchronous integer increments or field reads — zero * I/O, zero meaningful overhead on the hot path. diff --git a/backend/src/mastra/save-run-metrics.ts b/backend/src/mastra/save-run-metrics.ts index d10440f..e0b771c 100644 --- a/backend/src/mastra/save-run-metrics.ts +++ b/backend/src/mastra/save-run-metrics.ts @@ -14,14 +14,14 @@ export interface SaveRunMetricsInput { } /** - * Persist a completed run's metrics to the populateRuns Convex table. + * Persist a completed run's metrics to the runStats Convex table. * * Called from the agentStep finally-block as a fire-and-forget operation — * any error here is logged but must never propagate to the populate workflow. */ export async function saveRunMetrics(input: SaveRunMetricsInput): Promise { const totals = input.metrics.totals(); - await convex.mutation(internal.populateRuns.insert, { + await convex.mutation(internal.runStats.insert, { workflowRunId: input.workflowRunId, datasetId: input.datasetId, userId: input.userId, diff --git a/frontend/convex/populateRuns.ts b/frontend/convex/runStats.ts similarity index 88% rename from frontend/convex/populateRuns.ts rename to frontend/convex/runStats.ts index 851c60c..9886db2 100644 --- a/frontend/convex/populateRuns.ts +++ b/frontend/convex/runStats.ts @@ -38,12 +38,12 @@ export const insert = internalMutation({ }, handler: async (ctx, args) => { if (args.status === "success" && args.error) { - throw new Error("populateRuns.insert: error must be absent on a successful run"); + throw new Error("runStats.insert: error must be absent on a successful run"); } if (args.status === "error" && !args.error) { - throw new Error("populateRuns.insert: error message is required on a failed run"); + throw new Error("runStats.insert: error message is required on a failed run"); } - await ctx.db.insert("populateRuns", args); + await ctx.db.insert("runStats", args); }, }); @@ -55,7 +55,7 @@ export const getByWorkflowRunId = internalQuery({ args: { workflowRunId: v.string() }, handler: async (ctx, args) => { return await ctx.db - .query("populateRuns") + .query("runStats") .withIndex("by_workflow_run", (q) => q.eq("workflowRunId", args.workflowRunId), ) @@ -70,7 +70,7 @@ export const listByDataset = internalQuery({ args: { datasetId: v.string() }, handler: async (ctx, args) => { const runs = await ctx.db - .query("populateRuns") + .query("runStats") .withIndex("by_dataset", (q) => q.eq("datasetId", args.datasetId)) .collect(); return runs.sort((a, b) => b.startedAt - a.startedAt); @@ -84,7 +84,7 @@ export const listByUser = internalQuery({ args: { userId: v.string() }, handler: async (ctx, args) => { const runs = await ctx.db - .query("populateRuns") + .query("runStats") .withIndex("by_user", (q) => q.eq("userId", args.userId)) .collect(); return runs.sort((a, b) => b.startedAt - a.startedAt); diff --git a/frontend/convex/schema.ts b/frontend/convex/schema.ts index 65feebc..aa35a5d 100644 --- a/frontend/convex/schema.ts +++ b/frontend/convex/schema.ts @@ -104,7 +104,7 @@ export default defineSchema({ // (success or error) by the backend agent runner — never by the frontend. // Tracks tool-call counts, token usage, and timing so runs can be // compared across datasets, users, and benchmark sessions. - populateRuns: defineTable({ + runStats: defineTable({ workflowRunId: v.string(), // v.string() (not v.id) so benchmark runs can use synthetic dataset ids // without needing a real Convex dataset document. From d5a4370bb56a5b27499f21b9b4035ae6a9df7fce Mon Sep 17 00:00:00 2001 From: MMeteorL Date: Wed, 27 May 2026 22:53:57 -0700 Subject: [PATCH 4/8] Fix searchCalls/fetchCalls always 0 and token undercounting Two bugs in the Mastra FullOutput step scanning: 1. Tool name is nested at tc.payload.toolName, not tc.toolName Mastra's ToolCallChunk wraps everything in a payload object (type: 'tool-call', payload: { toolName, toolCallId, args }). The scan now reads tc.payload?.toolName with tc.toolName as fallback. 2. result.usage is last-step tokens only; result.totalUsage is the sum across all steps (per Mastra FullOutput type definition). Switch addOrchestratorResult/addInvestigateResult to prefer totalUsage so multi-step runs aren't undercounted. Co-Authored-By: Claude Sonnet 4.6 --- backend/src/mastra/run-metrics.ts | 23 +++++++++++++++----- backend/src/mastra/tools/investigate-tool.ts | 5 +++-- backend/src/mastra/workflows/populate.ts | 5 +++-- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/backend/src/mastra/run-metrics.ts b/backend/src/mastra/run-metrics.ts index 5a06896..031dbe4 100644 --- a/backend/src/mastra/run-metrics.ts +++ b/backend/src/mastra/run-metrics.ts @@ -10,14 +10,25 @@ */ interface AgentResult { + // Mastra FullOutput: totalUsage sums across all steps; usage is last-step only. + // Prefer totalUsage for accurate multi-step accounting. + totalUsage?: { inputTokens?: number; outputTokens?: number }; usage?: { inputTokens?: number; outputTokens?: number }; steps?: unknown[]; } +function tokens(result: AgentResult): { input: number; output: number } { + const src = result.totalUsage ?? result.usage; + return { + input: src?.inputTokens ?? 0, + output: src?.outputTokens ?? 0, + }; +} + export class RunMetrics { searchCalls = 0; fetchCalls = 0; - /** investigate_row tool calls dispatched by the orchestrator. */ + /** run_subagent tool calls dispatched by the orchestrator. */ investigateCalls = 0; /** Rows successfully inserted across all investigate subagents. */ rowsInserted = 0; @@ -31,14 +42,16 @@ export class RunMetrics { }; addOrchestratorResult(result: AgentResult): void { - this.orchestrator.inputTokens += result.usage?.inputTokens ?? 0; - this.orchestrator.outputTokens += result.usage?.outputTokens ?? 0; + const { input, output } = tokens(result); + this.orchestrator.inputTokens += input; + this.orchestrator.outputTokens += output; this.orchestrator.steps += result.steps?.length ?? 0; } addInvestigateResult(result: AgentResult): void { - this.investigate.inputTokens += result.usage?.inputTokens ?? 0; - this.investigate.outputTokens += result.usage?.outputTokens ?? 0; + const { input, output } = tokens(result); + this.investigate.inputTokens += input; + this.investigate.outputTokens += output; this.investigate.steps += result.steps?.length ?? 0; this.investigate.runs += 1; } diff --git a/backend/src/mastra/tools/investigate-tool.ts b/backend/src/mastra/tools/investigate-tool.ts index 4560b6f..ed7de20 100644 --- a/backend/src/mastra/tools/investigate-tool.ts +++ b/backend/src/mastra/tools/investigate-tool.ts @@ -116,8 +116,9 @@ ${context}${urlsBlock}${notesBlock}`; if (metrics) { for (const step of (result.steps ?? []) as any[]) { for (const tc of (step.toolCalls ?? []) as any[]) { - if (tc.toolName === "search_web") metrics.searchCalls++; - else if (tc.toolName === "fetch_page") metrics.fetchCalls++; + const name = tc.payload?.toolName ?? tc.toolName; + if (name === "search_web") metrics.searchCalls++; + else if (name === "fetch_page") metrics.fetchCalls++; } } metrics.addInvestigateResult(result); diff --git a/backend/src/mastra/workflows/populate.ts b/backend/src/mastra/workflows/populate.ts index f5bd2a5..470733f 100644 --- a/backend/src/mastra/workflows/populate.ts +++ b/backend/src/mastra/workflows/populate.ts @@ -237,8 +237,9 @@ const agentStep = createStep({ metrics.addOrchestratorResult(result); for (const step of (result.steps ?? []) as any[]) { for (const tc of (step.toolCalls ?? []) as any[]) { - if (tc.toolName === "search_web") metrics.searchCalls++; - else if (tc.toolName === "fetch_page") metrics.fetchCalls++; + const name = tc.payload?.toolName ?? tc.toolName; + if (name === "search_web") metrics.searchCalls++; + else if (name === "fetch_page") metrics.fetchCalls++; } } return { text: result.text }; From 97904b3838a6664cccb4447d83dfc3de6f22dc73 Mon Sep 17 00:00:00 2001 From: MMeteorL Date: Thu, 28 May 2026 00:05:54 -0700 Subject: [PATCH 5/8] Fix tool call counting: use result.toolCalls instead of step iteration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per-step toolCalls snapshots are captured at step-finish time. Tool-call chunks that arrive after their step's step-finish event (e.g. non-streaming tool-call events that follow a streaming-end synthetic) land in the next step's buffer, causing systematic miscounts. result.toolCalls is the flat accumulated list maintained by Mastra's stream processor as chunks arrive — it is the authoritative source. Using it directly avoids the step-ordering sensitivity entirely. Also adds toolCalls length to the [run_subagent] done log line for easier verification. Co-Authored-By: Claude Sonnet 4.6 --- backend/src/mastra/tools/investigate-tool.ts | 18 +++++++++++------- backend/src/mastra/workflows/populate.ts | 11 +++++------ 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/backend/src/mastra/tools/investigate-tool.ts b/backend/src/mastra/tools/investigate-tool.ts index ed7de20..57cb455 100644 --- a/backend/src/mastra/tools/investigate-tool.ts +++ b/backend/src/mastra/tools/investigate-tool.ts @@ -114,12 +114,16 @@ ${context}${urlsBlock}${notesBlock}`; const result = await agent.generate(prompt, { maxSteps: 10 }); if (metrics) { - for (const step of (result.steps ?? []) as any[]) { - for (const tc of (step.toolCalls ?? []) as any[]) { - const name = tc.payload?.toolName ?? tc.toolName; - if (name === "search_web") metrics.searchCalls++; - else if (name === "fetch_page") metrics.fetchCalls++; - } + // Use result.toolCalls (the flat accumulated list across all steps) rather + // than iterating result.steps[n].toolCalls. The per-step arrays are snapshots + // captured at step-finish time; tool-call chunks that arrive after their + // step-finish event end up attributed to the wrong step, causing systematic + // miscounts. result.toolCalls is the authoritative list maintained by Mastra's + // stream processor as chunks arrive. + for (const tc of (result.toolCalls ?? []) as any[]) { + const name = tc.payload?.toolName ?? tc.toolName; + if (name === "search_web") metrics.searchCalls++; + else if (name === "fetch_page") metrics.fetchCalls++; } metrics.addInvestigateResult(result); } @@ -128,7 +132,7 @@ ${context}${urlsBlock}${notesBlock}`; if (metrics && parsed.inserted) metrics.rowsInserted++; console.log( - `[run_subagent] done entity="${entity_hint}" inserted=${parsed.inserted} steps=${result.steps?.length ?? "?"}` + + `[run_subagent] done entity="${entity_hint}" inserted=${parsed.inserted} steps=${result.steps?.length ?? "?"} toolCalls=${result.toolCalls?.length ?? "?"}` + (parsed.row_summary ? `\n summary: ${parsed.row_summary}` : "") + (parsed.reason ? `\n reason: ${parsed.reason}` : "") + (parsed.clues ? `\n clues: ${parsed.clues}` : ""), diff --git a/backend/src/mastra/workflows/populate.ts b/backend/src/mastra/workflows/populate.ts index 470733f..4e62ff0 100644 --- a/backend/src/mastra/workflows/populate.ts +++ b/backend/src/mastra/workflows/populate.ts @@ -235,12 +235,11 @@ const agentStep = createStep({ ); const result = await agent.generate(inputData.prompt, { maxSteps: 80 }); metrics.addOrchestratorResult(result); - for (const step of (result.steps ?? []) as any[]) { - for (const tc of (step.toolCalls ?? []) as any[]) { - const name = tc.payload?.toolName ?? tc.toolName; - if (name === "search_web") metrics.searchCalls++; - else if (name === "fetch_page") metrics.fetchCalls++; - } + // Use result.toolCalls (flat accumulated list) — same reasoning as investigate-tool.ts. + for (const tc of (result.toolCalls ?? []) as any[]) { + const name = tc.payload?.toolName ?? tc.toolName; + if (name === "search_web") metrics.searchCalls++; + else if (name === "fetch_page") metrics.fetchCalls++; } return { text: result.text }; } catch (err) { From d0d34f1cf299ea3ddf3253c85851231eb0494d9a Mon Sep 17 00:00:00 2001 From: MMeteorL Date: Wed, 27 May 2026 16:19:41 -0700 Subject: [PATCH 6/8] Add benchmark runner for populate workflow Adds a benchmark suite that exercises the real populateWorkflow directly (no HTTP layer) so metrics reflect actual agent performance. Results are stored in the same populateRuns Convex table as live app sessions, tagged with isBenchmark: true for filtering. - benchmarks/run.mts: runner that creates temporary Convex datasets, calls populateWorkflow, reads back metrics, and emits a JSON summary - benchmarks/prompts.json: 4 test prompts (YC companies, B2B SaaS, national parks, AI research labs) - benchmarks/README.md: instructions for running benchmarks and querying live run stats via the Convex dashboard or CLI - datasets.createInternal / deleteInternal: Convex mutations for benchmark dataset lifecycle (no Clerk auth required) - make benchmark target for convenient invocation Co-Authored-By: Claude Sonnet 4.6 --- benchmarks/README.md | 254 ++++++++++++++++++++++++ benchmarks/prompts.json | 46 +++++ benchmarks/run.mts | 373 ++++++++++++++++++++++++++++++++++++ frontend/convex/datasets.ts | 47 +++++ makefiles/Makefile | 9 +- 5 files changed, 728 insertions(+), 1 deletion(-) create mode 100644 benchmarks/README.md create mode 100644 benchmarks/prompts.json create mode 100644 benchmarks/run.mts diff --git a/benchmarks/README.md b/benchmarks/README.md new file mode 100644 index 0000000..fc5571e --- /dev/null +++ b/benchmarks/README.md @@ -0,0 +1,254 @@ +# BigSet Benchmarks + +This directory contains the benchmark runner for the BigSet populate workflow. It exercises the **real** populate pipeline — same agent code, same Convex writes, same TinyFish API calls — so the metrics reflect actual agent performance rather than a simulation. + +Every populate run (benchmark or real user session) automatically records its stats to the `populateRuns` Convex table. This means you can analyze cost and performance for both benchmark runs and live app usage from the same place. + +--- + +## How metrics are collected + +When a user clicks "Populate" in the app, or when the benchmark runner triggers a run, the workflow instruments every tool call automatically: + +| Metric | What it counts | +|---|---| +| `searchCalls` | Calls to `search_web` (TinyFish search API) | +| `fetchCalls` | Calls to `fetch_page` (TinyFish fetch API) | +| `investigateCalls` | `investigate_row` dispatches from the orchestrator | +| `rowsInserted` | Rows successfully inserted into the dataset | +| `tokensInput` / `tokensOutput` | Total LLM tokens across all agents | +| `orchestratorTokens*` / `investigateTokens*` | Token breakdown per agent tier | +| `orchestratorSteps` / `investigateSteps` | Agent reasoning steps per tier | +| `investigateRuns` | How many investigate subagents completed | +| `durationMs` | Wall-clock time for the full populate run | + +Each run also records `status` (`success` / `error`), any error message, and an `isBenchmark` flag so you can filter benchmark runs from real sessions. + +--- + +## Prerequisites + +1. The dev stack must be running: `make dev` +2. Your root `.env` must have all required keys: + - `OPENROUTER_API_KEY` — for LLM calls + - `TINYFISH_API_KEY` — for web search and fetch + - `CONVEX_URL` — Convex instance URL (default: `http://127.0.0.1:3210`) + - `CONVEX_SELF_HOSTED_ADMIN_KEY` — to write benchmark datasets and read run records +3. `make convex-push` must have been run after the last schema change (or `make dev` does this automatically on startup) + +--- + +## Running benchmarks + +### Run all prompts + +```bash +make benchmark +``` + +This runs all 4 prompts in `benchmarks/prompts.json` sequentially, prints a JSON summary to stdout, and cleans up the temporary Convex datasets afterward. + +### Run a single prompt + +```bash +make benchmark ARGS="--prompt yc-recent-batch-companies" +``` + +Available prompt IDs (defined in [`prompts.json`](./prompts.json)): + +| ID | Dataset | +|---|---| +| `yc-recent-batch-companies` | YC W24/S24 companies | +| `b2b-saas-free-tier` | B2B SaaS tools with free tiers | +| `us-national-parks` | US National Parks | +| `ai-research-labs` | University AI research labs | + +### Save results to disk + +```bash +make benchmark ARGS="--out benchmark-results/" +``` + +Writes a timestamped JSON file to `benchmark-results/`. Useful for tracking performance across runs over time. + +### Keep the datasets after a run (for inspection) + +```bash +make benchmark ARGS="--no-cleanup" +``` + +The benchmark datasets will remain in your Convex instance under the owner `benchmark-runner`. You can inspect them in the Convex dashboard at http://localhost:6791. + +### Run multiple prompts in parallel + +```bash +make benchmark ARGS="--concurrency 2" +``` + +Runs up to 2 prompts concurrently. Note: parallel runs share TinyFish and OpenRouter rate limits — start with concurrency 1 for a clean baseline. + +### Run directly with Node (no Make) + +```bash +cd backend +npx tsx ../benchmarks/run.mts --prompt us-national-parks --out ../benchmark-results/ +``` + +--- + +## Understanding the JSON output + +The benchmark emits a JSON object to stdout when it finishes. Example: + +```json +{ + "completedAt": "2025-01-15T10:30:00.000Z", + "promptCount": 4, + "successCount": 4, + "failureCount": 0, + "aggregate": { + "rowsInserted": 62, + "searchCalls": 47, + "fetchCalls": 83, + "investigateCalls": 68, + "tokensInput": 1240000, + "tokensOutput": 84000, + "orchestratorSteps": 32, + "investigateSteps": 412, + "investigateRuns": 68, + "durationMs": 1842000 + }, + "perRunAverages": { + "rowsInserted": 15.5, + "searchCalls": 11.8, + "fetchCalls": 20.8, + "investigateCalls": 17.0, + "tokensTotal": 331000, + "durationSeconds": 460.5 + }, + "runs": [ + { + "promptId": "yc-recent-batch-companies", + "workflowRunId": "benchmark-yc-recent-batch-companies-a1b2c3d4", + "status": "success", + "durationMs": 430000, + "metrics": { + "rowsInserted": 18, + "searchCalls": 12, + "fetchCalls": 22, + "investigateCalls": 19, + "tokensInput": 310000, + "tokensOutput": 21000, + "orchestratorTokensInput": 95000, + "orchestratorTokensOutput": 4000, + "orchestratorSteps": 8, + "investigateTokensInput": 215000, + "investigateTokensOutput": 17000, + "investigateSteps": 98, + "investigateRuns": 19 + } + } + ] +} +``` + +--- + +## Viewing run stats from actual app sessions + +Every populate run triggered by a real user through the app UI is also recorded in `populateRuns`. You can query it directly via the Convex dashboard or CLI. + +### Convex dashboard + +Open http://localhost:6791 → select your Convex instance → go to **Data** → **populateRuns**. + +Each row corresponds to one populate run. The `isBenchmark` field is `true` for benchmark runs and absent/`undefined` for real user sessions. + +### Convex CLI queries + +All queries below use the internal query functions added alongside the schema. Run them with `npx convex run` from the `frontend/` directory (or via the dashboard's **Functions** tab). + +**List all runs for a specific dataset:** +```bash +cd frontend +node ../scripts/with-root-env.mjs npx convex run populateRuns:listByDataset \ + --url http://127.0.0.1:3210 \ + --admin-key "$CONVEX_SELF_HOSTED_ADMIN_KEY" \ + '{"datasetId": ""}' +``` + +**List all runs for a specific user:** +```bash +cd frontend +node ../scripts/with-root-env.mjs npx convex run populateRuns:listByUser \ + --url http://127.0.0.1:3210 \ + --admin-key "$CONVEX_SELF_HOSTED_ADMIN_KEY" \ + '{"userId": ""}' +``` + +**Fetch a single run by workflow run ID:** +```bash +cd frontend +node ../scripts/with-root-env.mjs npx convex run populateRuns:getByWorkflowRunId \ + --url http://127.0.0.1:3210 \ + --admin-key "$CONVEX_SELF_HOSTED_ADMIN_KEY" \ + '{"workflowRunId": ""}' +``` + +The `workflowRunId` appears in the backend logs when a populate starts: +``` +[populate-agent] populate-agent start +``` +and also in the HTTP response from `POST /populate` as the `runId` field. + +### Exporting all benchmark runs to JSON + +To export all benchmark runs for offline analysis: + +```bash +cd frontend +node ../scripts/with-root-env.mjs npx convex run populateRuns:listByUser \ + --url http://127.0.0.1:3210 \ + --admin-key "$CONVEX_SELF_HOSTED_ADMIN_KEY" \ + '{"userId": "benchmark-runner"}' | jq '.' > benchmark-history.json +``` + +--- + +## Adding custom prompts + +Edit [`prompts.json`](./prompts.json) to add your own benchmark prompts. Each entry needs: + +```json +{ + "id": "my-prompt-id", + "datasetName": "Human-readable dataset name", + "description": "What the dataset is about — shown to the agent", + "columns": [ + { "name": "column_name", "type": "text", "description": "What this field is" } + ] +} +``` + +Column types: `text`, `number`, `boolean`, `url`, `date`. + +Then run: +```bash +make benchmark ARGS="--prompt my-prompt-id" +``` + +--- + +## Cost estimation + +Rough estimates based on Kimi K2 pricing (as of writing): + +| Per run (20 rows target) | Approximate cost | +|---|---| +| Input tokens (~300k) | ~$0.84 | +| Output tokens (~20k) | ~$0.20 | +| TinyFish search (~12 calls) | ~$0.06 | +| TinyFish fetch (~20 calls) | ~$0.10 | +| **Total per run** | **~$1.20** | + +These numbers come from the `tokensInput` / `tokensOutput` fields in `populateRuns`. Actual costs vary by dataset complexity and row count achieved. diff --git a/benchmarks/prompts.json b/benchmarks/prompts.json new file mode 100644 index 0000000..bf11c5e --- /dev/null +++ b/benchmarks/prompts.json @@ -0,0 +1,46 @@ +[ + { + "id": "yc-recent-batch-companies", + "datasetName": "YC Recent Batch Companies", + "description": "Companies from recent Y Combinator batches (W24, S24). Include name, website, one-line description, and batch.", + "columns": [ + { "name": "company_name", "type": "text", "description": "Company name" }, + { "name": "website", "type": "url", "description": "Company website URL" }, + { "name": "description", "type": "text", "description": "One-line description of what the company does" }, + { "name": "batch", "type": "text", "description": "YC batch, e.g. W24 or S24" } + ] + }, + { + "id": "b2b-saas-free-tier", + "datasetName": "B2B SaaS with Free Tiers", + "description": "Popular B2B SaaS tools that offer a free tier or freemium plan. Include tool name, category, free-tier summary, and pricing page URL.", + "columns": [ + { "name": "tool_name", "type": "text", "description": "Name of the SaaS tool" }, + { "name": "category", "type": "text", "description": "Category, e.g. CRM, Analytics, DevOps" }, + { "name": "free_tier_summary", "type": "text", "description": "What the free tier includes" }, + { "name": "pricing_page_url", "type": "url", "description": "URL to the pricing page" } + ] + }, + { + "id": "us-national-parks", + "datasetName": "US National Parks", + "description": "All US National Parks with state, year established, and official NPS page URL.", + "columns": [ + { "name": "park_name", "type": "text", "description": "Official park name" }, + { "name": "state", "type": "text", "description": "US state(s) the park is in" }, + { "name": "established_year", "type": "number", "description": "Year the park was established" }, + { "name": "official_page_url", "type": "url", "description": "URL on nps.gov" } + ] + }, + { + "id": "ai-research-labs", + "datasetName": "University AI Research Labs", + "description": "Academic AI research labs at major universities. Include lab name, university, research focus, and lab website URL.", + "columns": [ + { "name": "lab_name", "type": "text", "description": "Name of the research lab" }, + { "name": "university", "type": "text", "description": "Host university" }, + { "name": "research_focus", "type": "text", "description": "Primary research area, e.g. NLP, robotics, RL" }, + { "name": "lab_website_url", "type": "url", "description": "Lab homepage URL" } + ] + } +] diff --git a/benchmarks/run.mts b/benchmarks/run.mts new file mode 100644 index 0000000..c1758a2 --- /dev/null +++ b/benchmarks/run.mts @@ -0,0 +1,373 @@ +#!/usr/bin/env node +/** + * Benchmark runner for the BigSet populate workflow. + * + * Calls the real populateWorkflow directly (no HTTP layer) using the + * same code path as production app sessions. Metrics are collected by + * the instrumented workflow and written to the populateRuns Convex table. + * After each run the script reads those metrics back and emits a JSON + * summary. + * + * Usage (from repo root): + * node --import tsx/esm benchmarks/run.mts [options] + * + * Options: + * --prompt Run only the prompt with this id (repeatable) + * --prompts Path to prompts JSON (default: benchmarks/prompts.json) + * --out Write per-run JSON artifacts to this directory + * --no-cleanup Keep Convex datasets after benchmark (default: delete them) + * --concurrency Max parallel prompt runs (default: 1) + */ + +import { readFile, mkdir, writeFile } from "node:fs/promises"; +import { existsSync, readFileSync } from "node:fs"; +import { resolve, dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; +import { randomUUID } from "node:crypto"; + +// ─── Load root .env before importing backend modules ──────────────────────── + +const repoRoot = resolve(dirname(fileURLToPath(import.meta.url)), ".."); +const rootEnvPath = join(repoRoot, ".env"); + +if (existsSync(rootEnvPath)) { + for (const line of readFileSync(rootEnvPath, "utf8").split(/\r?\n/)) { + const trimmed = line.trim(); + if (!trimmed || trimmed.startsWith("#")) continue; + const sep = trimmed.indexOf("="); + if (sep <= 0) continue; + const key = trimmed.slice(0, sep).trim(); + let val = trimmed.slice(sep + 1).trim(); + if ((val.startsWith('"') && val.endsWith('"')) || (val.startsWith("'") && val.endsWith("'"))) + val = val.slice(1, -1); + process.env[key] ??= val; + } +} + +// ─── Imports (after env is loaded) ────────────────────────────────────────── + +// @ts-ignore — importing from sibling package; resolved at runtime via Node module resolution +import { populateWorkflow } from "../backend/src/mastra/workflows/populate.js"; +// @ts-ignore +import { convex, internal } from "../backend/src/convex.js"; + +// ─── Types ─────────────────────────────────────────────────────────────────── + +interface PromptDefinition { + id: string; + datasetName: string; + description: string; + columns: Array<{ + name: string; + type: "text" | "number" | "boolean" | "url" | "date"; + description?: string; + }>; +} + +interface RunResult { + promptId: string; + datasetName: string; + workflowRunId: string; + datasetId: string; + status: "success" | "error" | "timeout"; + error?: string; + metrics?: PopulateRunRecord; + durationMs: number; +} + +interface PopulateRunRecord { + workflowRunId: string; + datasetId: string; + userId: string; + startedAt: number; + finishedAt: number; + durationMs: number; + searchCalls: number; + fetchCalls: number; + investigateCalls: number; + rowsInserted: number; + tokensInput: number; + tokensOutput: number; + orchestratorTokensInput: number; + orchestratorTokensOutput: number; + orchestratorSteps: number; + investigateTokensInput: number; + investigateTokensOutput: number; + investigateSteps: number; + investigateRuns: number; + status: "success" | "error"; + error?: string; + isBenchmark?: boolean; +} + +// ─── CLI args ──────────────────────────────────────────────────────────────── + +function parseArgs(argv: string[]) { + const result = { + promptIds: [] as string[], + promptsFile: join(dirname(fileURLToPath(import.meta.url)), "prompts.json"), + outDir: null as string | null, + cleanup: true, + concurrency: 1, + }; + for (let i = 0; i < argv.length; i++) { + const arg = argv[i]; + if (arg === "--prompt") result.promptIds.push(argv[++i]); + else if (arg === "--prompts") result.promptsFile = argv[++i]; + else if (arg === "--out") result.outDir = argv[++i]; + else if (arg === "--no-cleanup") result.cleanup = false; + else if (arg === "--concurrency") result.concurrency = parseInt(argv[++i], 10); + } + return result; +} + +// ─── Convex helpers ────────────────────────────────────────────────────────── + +async function createBenchmarkDataset( + name: string, + description: string, + columns: PromptDefinition["columns"], +): Promise { + return await convex.mutation(internal.datasets.createInternal, { + name, + description, + columns, + ownerId: "benchmark-runner", + cadence: "manual", + visibility: "private", + }); +} + +async function deleteBenchmarkDataset(datasetId: string): Promise { + await convex.mutation(internal.datasets.deleteInternal, { id: datasetId }); +} + +async function fetchRunMetrics(workflowRunId: string): Promise { + return await convex.query(internal.populateRuns.getByWorkflowRunId, { workflowRunId }); +} + +// ─── Run a single prompt ───────────────────────────────────────────────────── + +async function runPrompt( + prompt: PromptDefinition, + config: ReturnType, +): Promise { + const wallStart = Date.now(); + const workflowRunId = `benchmark-${prompt.id}-${randomUUID().slice(0, 8)}`; + let datasetId = ""; + + console.error(`\n[benchmark] ▶ ${prompt.id}`); + + try { + datasetId = await createBenchmarkDataset( + `[benchmark] ${prompt.datasetName}`, + prompt.description, + prompt.columns, + ); + console.error(`[benchmark] created dataset ${datasetId}`); + + const run = await populateWorkflow.createRun({ runId: workflowRunId }); + const result = await run.start({ + inputData: { + datasetId, + datasetName: prompt.datasetName, + description: prompt.description, + columns: prompt.columns, + authContext: { + authorizedUserId: "benchmark-runner", + workflowRunId, + isBenchmark: true, + }, + }, + }); + + const wallDuration = Date.now() - wallStart; + if (result.status !== "success") { + throw new Error(`Workflow ended with status: ${result.status}`); + } + + // Give the fire-and-forget metrics save a moment to land in Convex + await sleep(2000); + const metrics = await fetchRunMetrics(workflowRunId); + + console.error( + `[benchmark] ✓ ${prompt.id} rows=${metrics?.rowsInserted ?? "?"} ` + + `tokens=${(metrics?.tokensInput ?? 0) + (metrics?.tokensOutput ?? 0)} ` + + `duration=${(wallDuration / 1000).toFixed(1)}s`, + ); + + return { + promptId: prompt.id, + datasetName: prompt.datasetName, + workflowRunId, + datasetId, + status: "success", + metrics: metrics ?? undefined, + durationMs: wallDuration, + }; + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + console.error(`[benchmark] ✗ ${prompt.id} error: ${msg}`); + + // Still try to fetch any partial metrics that landed + await sleep(1000); + const metrics = await fetchRunMetrics(workflowRunId).catch(() => null); + + return { + promptId: prompt.id, + datasetName: prompt.datasetName, + workflowRunId, + datasetId, + status: "error", + error: msg, + metrics: metrics ?? undefined, + durationMs: Date.now() - wallStart, + }; + } finally { + if (config.cleanup && datasetId) { + await deleteBenchmarkDataset(datasetId).catch((err) => + console.error(`[benchmark] cleanup failed for ${datasetId}:`, err), + ); + } + } +} + +// ─── Aggregate summary ─────────────────────────────────────────────────────── + +function buildSummary(results: RunResult[]) { + const successful = results.filter((r) => r.status === "success" && r.metrics); + const failed = results.filter((r) => r.status !== "success"); + + const totals = successful.reduce( + (acc, r) => { + const m = r.metrics!; + acc.rowsInserted += m.rowsInserted; + acc.searchCalls += m.searchCalls; + acc.fetchCalls += m.fetchCalls; + acc.investigateCalls += m.investigateCalls; + acc.tokensInput += m.tokensInput; + acc.tokensOutput += m.tokensOutput; + acc.orchestratorSteps += m.orchestratorSteps; + acc.investigateSteps += m.investigateSteps; + acc.investigateRuns += m.investigateRuns; + acc.durationMs += r.durationMs; + return acc; + }, + { + rowsInserted: 0, + searchCalls: 0, + fetchCalls: 0, + investigateCalls: 0, + tokensInput: 0, + tokensOutput: 0, + orchestratorSteps: 0, + investigateSteps: 0, + investigateRuns: 0, + durationMs: 0, + }, + ); + + const n = successful.length || 1; + + return { + completedAt: new Date().toISOString(), + promptCount: results.length, + successCount: successful.length, + failureCount: failed.length, + aggregate: totals, + perRunAverages: { + rowsInserted: +(totals.rowsInserted / n).toFixed(1), + searchCalls: +(totals.searchCalls / n).toFixed(1), + fetchCalls: +(totals.fetchCalls / n).toFixed(1), + investigateCalls: +(totals.investigateCalls / n).toFixed(1), + tokensTotal: +((totals.tokensInput + totals.tokensOutput) / n).toFixed(0), + durationSeconds: +((totals.durationMs / n / 1000).toFixed(1)), + }, + runs: results.map((r) => ({ + promptId: r.promptId, + datasetName: r.datasetName, + workflowRunId: r.workflowRunId, + status: r.status, + error: r.error, + durationMs: r.durationMs, + metrics: r.metrics + ? { + rowsInserted: r.metrics.rowsInserted, + searchCalls: r.metrics.searchCalls, + fetchCalls: r.metrics.fetchCalls, + investigateCalls: r.metrics.investigateCalls, + tokensInput: r.metrics.tokensInput, + tokensOutput: r.metrics.tokensOutput, + orchestratorTokensInput: r.metrics.orchestratorTokensInput, + orchestratorTokensOutput: r.metrics.orchestratorTokensOutput, + orchestratorSteps: r.metrics.orchestratorSteps, + investigateTokensInput: r.metrics.investigateTokensInput, + investigateTokensOutput: r.metrics.investigateTokensOutput, + investigateSteps: r.metrics.investigateSteps, + investigateRuns: r.metrics.investigateRuns, + } + : null, + })), + }; +} + +// ─── Main ──────────────────────────────────────────────────────────────────── + +async function main() { + const config = parseArgs(process.argv.slice(2)); + + // Validate required env vars + const missing = ["OPENROUTER_API_KEY", "TINYFISH_API_KEY", "CONVEX_URL", "CONVEX_SELF_HOSTED_ADMIN_KEY"].filter( + (k) => !process.env[k], + ); + if (missing.length) { + console.error(`[benchmark] Missing required env vars: ${missing.join(", ")}`); + console.error("Copy .env.example to .env and fill in the values, then re-run."); + process.exit(1); + } + + const allPrompts: PromptDefinition[] = JSON.parse( + await readFile(config.promptsFile, "utf8"), + ); + const prompts = + config.promptIds.length > 0 + ? allPrompts.filter((p) => config.promptIds.includes(p.id)) + : allPrompts; + + if (prompts.length === 0) { + console.error(`[benchmark] No matching prompts found.`); + process.exit(1); + } + + console.error(`[benchmark] Running ${prompts.length} prompt(s) (concurrency=${config.concurrency})`); + + const results: RunResult[] = []; + + // Run in batches of config.concurrency + for (let i = 0; i < prompts.length; i += config.concurrency) { + const batch = prompts.slice(i, i + config.concurrency); + const batchResults = await Promise.all(batch.map((p) => runPrompt(p, config))); + results.push(...batchResults); + } + + const summary = buildSummary(results); + + if (config.outDir) { + await mkdir(config.outDir, { recursive: true }); + const outFile = join(config.outDir, `benchmark-${Date.now()}.json`); + await writeFile(outFile, JSON.stringify(summary, null, 2)); + console.error(`[benchmark] Results written to ${outFile}`); + } + + // Emit JSON to stdout for piping / CI capture + console.log(JSON.stringify(summary, null, 2)); +} + +function sleep(ms: number) { + return new Promise((r) => setTimeout(r, ms)); +} + +main().catch((err) => { + console.error("[benchmark] Fatal:", err); + process.exit(1); +}); diff --git a/frontend/convex/datasets.ts b/frontend/convex/datasets.ts index 8637a39..cab4fe3 100644 --- a/frontend/convex/datasets.ts +++ b/frontend/convex/datasets.ts @@ -248,6 +248,53 @@ export const remove = mutation({ }, }); +/** + * Create a dataset without Clerk auth. Used exclusively by the benchmark + * runner, which calls populateWorkflow directly (no HTTP layer) and + * therefore has no Clerk identity. The ownerId must be supplied by the + * caller; for benchmark runs pass "benchmark-runner". + */ +export const createInternal = internalMutation({ + args: { + name: v.string(), + description: v.string(), + cadence: v.string(), + columns: v.array(columnValidator), + ownerId: v.string(), + visibility: v.optional(v.union(v.literal("public"), v.literal("private"))), + }, + handler: async (ctx, args) => { + return await ctx.db.insert("datasets", { + name: args.name, + description: args.description, + cadence: args.cadence, + columns: args.columns, + ownerId: args.ownerId, + status: "paused", + visibility: args.visibility ?? "private", + rowCount: 0, + }); + }, +}); + +/** + * Delete a dataset and all its rows without Clerk auth. Used exclusively + * by the benchmark runner for cleanup after a benchmark run. + */ +export const deleteInternal = internalMutation({ + args: { id: v.id("datasets") }, + handler: async (ctx, args) => { + const rows = await ctx.db + .query("datasetRows") + .withIndex("by_dataset", (q) => q.eq("datasetId", args.id)) + .collect(); + for (const row of rows) { + await ctx.db.delete(row._id); + } + await ctx.db.delete(args.id); + }, +}); + /** * One-shot migration: scan every dataset, count its rows, and patch * `rowCount` to the true value. Idempotent and safe to re-run. diff --git a/makefiles/Makefile b/makefiles/Makefile index fb1778e..df56046 100644 --- a/makefiles/Makefile +++ b/makefiles/Makefile @@ -1,4 +1,4 @@ -.PHONY: all dev validate-dev-env down clean convex-push convex-env seed-public-datasets +.PHONY: all dev validate-dev-env down clean convex-push convex-env seed-public-datasets benchmark all: dev @@ -70,6 +70,13 @@ seed-public-datasets: @test -f .env || { echo "Error: .env not found. Run: cp .env.example .env"; exit 1; } @cd frontend && node ../scripts/with-root-env.mjs npx convex run publicSeed:seedPublicDatasets +## Run the benchmark suite against the real populate workflow. +## Requires: make dev running, .env filled in. +## Pass ARGS to forward CLI flags, e.g.: +## make benchmark ARGS="--prompt yc-recent-batch-companies --no-cleanup" +benchmark: + cd backend && npx tsx ../benchmarks/run.mts $(ARGS) + down: docker compose -f docker-compose.dev.yml down From 253b96022eae4219564745ebc9f97b50127ea141 Mon Sep 17 00:00:00 2001 From: MMeteorL Date: Wed, 27 May 2026 17:55:00 -0700 Subject: [PATCH 7/8] Update benchmark prompts and README for post-rebase correctness MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add isPrimaryKey: true to the natural PK column in each of the 4 benchmark prompts so the orchestrator gets explicit pk guidance and deduplication works correctly with the run_subagent tool's .refine() enforcement - Fix investigateCalls description: investigate_row → run_subagent - Add note that enumeration-step tokens are not captured in RunMetrics - Update custom-prompt example to show isPrimaryKey usage - Update cost table model name: Kimi K2 → DeepSeek V4 Pro Co-Authored-By: Claude Sonnet 4.6 --- benchmarks/README.md | 11 ++++++++--- benchmarks/prompts.json | 8 ++++---- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index fc5571e..0b4c43b 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -14,7 +14,7 @@ When a user clicks "Populate" in the app, or when the benchmark runner triggers |---|---| | `searchCalls` | Calls to `search_web` (TinyFish search API) | | `fetchCalls` | Calls to `fetch_page` (TinyFish fetch API) | -| `investigateCalls` | `investigate_row` dispatches from the orchestrator | +| `investigateCalls` | `run_subagent` dispatches from the orchestrator | | `rowsInserted` | Rows successfully inserted into the dataset | | `tokensInput` / `tokensOutput` | Total LLM tokens across all agents | | `orchestratorTokens*` / `investigateTokens*` | Token breakdown per agent tier | @@ -24,6 +24,8 @@ When a user clicks "Populate" in the app, or when the benchmark runner triggers Each run also records `status` (`success` / `error`), any error message, and an `isBenchmark` flag so you can filter benchmark runs from real sessions. +> **Note:** The workflow includes an enumeration classification step that calls an LLM directly (not through an agent) to decide whether to use a scraper or search strategy. The tokens used by that step are **not** captured in the metrics above — they're a small fixed cost per run (~100–200 input tokens, ~5 output tokens) but worth knowing if you're doing precise cost accounting. + --- ## Prerequisites @@ -225,13 +227,16 @@ Edit [`prompts.json`](./prompts.json) to add your own benchmark prompts. Each en "datasetName": "Human-readable dataset name", "description": "What the dataset is about — shown to the agent", "columns": [ - { "name": "column_name", "type": "text", "description": "What this field is" } + { "name": "entity_name", "type": "text", "description": "The entity name", "isPrimaryKey": true }, + { "name": "other_field", "type": "text", "description": "What this field is" } ] } ``` Column types: `text`, `number`, `boolean`, `url`, `date`. +Mark at least one column as `"isPrimaryKey": true` — the orchestrator uses this to tell subagents which field is the unique identifier, and the workflow uses it to reject duplicate rows automatically. + Then run: ```bash make benchmark ARGS="--prompt my-prompt-id" @@ -241,7 +246,7 @@ make benchmark ARGS="--prompt my-prompt-id" ## Cost estimation -Rough estimates based on Kimi K2 pricing (as of writing): +Rough estimates based on DeepSeek V4 Pro pricing (as of writing): | Per run (20 rows target) | Approximate cost | |---|---| diff --git a/benchmarks/prompts.json b/benchmarks/prompts.json index bf11c5e..8b7c7f9 100644 --- a/benchmarks/prompts.json +++ b/benchmarks/prompts.json @@ -4,7 +4,7 @@ "datasetName": "YC Recent Batch Companies", "description": "Companies from recent Y Combinator batches (W24, S24). Include name, website, one-line description, and batch.", "columns": [ - { "name": "company_name", "type": "text", "description": "Company name" }, + { "name": "company_name", "type": "text", "description": "Company name", "isPrimaryKey": true }, { "name": "website", "type": "url", "description": "Company website URL" }, { "name": "description", "type": "text", "description": "One-line description of what the company does" }, { "name": "batch", "type": "text", "description": "YC batch, e.g. W24 or S24" } @@ -15,7 +15,7 @@ "datasetName": "B2B SaaS with Free Tiers", "description": "Popular B2B SaaS tools that offer a free tier or freemium plan. Include tool name, category, free-tier summary, and pricing page URL.", "columns": [ - { "name": "tool_name", "type": "text", "description": "Name of the SaaS tool" }, + { "name": "tool_name", "type": "text", "description": "Name of the SaaS tool", "isPrimaryKey": true }, { "name": "category", "type": "text", "description": "Category, e.g. CRM, Analytics, DevOps" }, { "name": "free_tier_summary", "type": "text", "description": "What the free tier includes" }, { "name": "pricing_page_url", "type": "url", "description": "URL to the pricing page" } @@ -26,7 +26,7 @@ "datasetName": "US National Parks", "description": "All US National Parks with state, year established, and official NPS page URL.", "columns": [ - { "name": "park_name", "type": "text", "description": "Official park name" }, + { "name": "park_name", "type": "text", "description": "Official park name", "isPrimaryKey": true }, { "name": "state", "type": "text", "description": "US state(s) the park is in" }, { "name": "established_year", "type": "number", "description": "Year the park was established" }, { "name": "official_page_url", "type": "url", "description": "URL on nps.gov" } @@ -37,7 +37,7 @@ "datasetName": "University AI Research Labs", "description": "Academic AI research labs at major universities. Include lab name, university, research focus, and lab website URL.", "columns": [ - { "name": "lab_name", "type": "text", "description": "Name of the research lab" }, + { "name": "lab_name", "type": "text", "description": "Name of the research lab", "isPrimaryKey": true }, { "name": "university", "type": "text", "description": "Host university" }, { "name": "research_focus", "type": "text", "description": "Primary research area, e.g. NLP, robotics, RL" }, { "name": "lab_website_url", "type": "url", "description": "Lab homepage URL" } From a6429451d510817b7efc4d55c98f94dc00e93463 Mon Sep 17 00:00:00 2001 From: MMeteorL Date: Wed, 27 May 2026 21:55:30 -0700 Subject: [PATCH 8/8] =?UTF-8?q?Update=20benchmark=20files:=20populateRuns?= =?UTF-8?q?=20=E2=86=92=20runStats?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follows the rename in feature/cost-tracking. Updates the Convex query call and all documentation references in run.mts and README.md. Co-Authored-By: Claude Sonnet 4.6 --- benchmarks/README.md | 16 ++++++++-------- benchmarks/run.mts | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index 0b4c43b..f83b047 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -2,7 +2,7 @@ This directory contains the benchmark runner for the BigSet populate workflow. It exercises the **real** populate pipeline — same agent code, same Convex writes, same TinyFish API calls — so the metrics reflect actual agent performance rather than a simulation. -Every populate run (benchmark or real user session) automatically records its stats to the `populateRuns` Convex table. This means you can analyze cost and performance for both benchmark runs and live app usage from the same place. +Every populate run (benchmark or real user session) automatically records its stats to the `runStats` Convex table. This means you can analyze cost and performance for both benchmark runs and live app usage from the same place. --- @@ -158,11 +158,11 @@ The benchmark emits a JSON object to stdout when it finishes. Example: ## Viewing run stats from actual app sessions -Every populate run triggered by a real user through the app UI is also recorded in `populateRuns`. You can query it directly via the Convex dashboard or CLI. +Every populate run triggered by a real user through the app UI is also recorded in `runStats`. You can query it directly via the Convex dashboard or CLI. ### Convex dashboard -Open http://localhost:6791 → select your Convex instance → go to **Data** → **populateRuns**. +Open http://localhost:6791 → select your Convex instance → go to **Data** → **runStats**. Each row corresponds to one populate run. The `isBenchmark` field is `true` for benchmark runs and absent/`undefined` for real user sessions. @@ -173,7 +173,7 @@ All queries below use the internal query functions added alongside the schema. R **List all runs for a specific dataset:** ```bash cd frontend -node ../scripts/with-root-env.mjs npx convex run populateRuns:listByDataset \ +node ../scripts/with-root-env.mjs npx convex run runStats:listByDataset \ --url http://127.0.0.1:3210 \ --admin-key "$CONVEX_SELF_HOSTED_ADMIN_KEY" \ '{"datasetId": ""}' @@ -182,7 +182,7 @@ node ../scripts/with-root-env.mjs npx convex run populateRuns:listByDataset \ **List all runs for a specific user:** ```bash cd frontend -node ../scripts/with-root-env.mjs npx convex run populateRuns:listByUser \ +node ../scripts/with-root-env.mjs npx convex run runStats:listByUser \ --url http://127.0.0.1:3210 \ --admin-key "$CONVEX_SELF_HOSTED_ADMIN_KEY" \ '{"userId": ""}' @@ -191,7 +191,7 @@ node ../scripts/with-root-env.mjs npx convex run populateRuns:listByUser \ **Fetch a single run by workflow run ID:** ```bash cd frontend -node ../scripts/with-root-env.mjs npx convex run populateRuns:getByWorkflowRunId \ +node ../scripts/with-root-env.mjs npx convex run runStats:getByWorkflowRunId \ --url http://127.0.0.1:3210 \ --admin-key "$CONVEX_SELF_HOSTED_ADMIN_KEY" \ '{"workflowRunId": ""}' @@ -209,7 +209,7 @@ To export all benchmark runs for offline analysis: ```bash cd frontend -node ../scripts/with-root-env.mjs npx convex run populateRuns:listByUser \ +node ../scripts/with-root-env.mjs npx convex run runStats:listByUser \ --url http://127.0.0.1:3210 \ --admin-key "$CONVEX_SELF_HOSTED_ADMIN_KEY" \ '{"userId": "benchmark-runner"}' | jq '.' > benchmark-history.json @@ -256,4 +256,4 @@ Rough estimates based on DeepSeek V4 Pro pricing (as of writing): | TinyFish fetch (~20 calls) | ~$0.10 | | **Total per run** | **~$1.20** | -These numbers come from the `tokensInput` / `tokensOutput` fields in `populateRuns`. Actual costs vary by dataset complexity and row count achieved. +These numbers come from the `tokensInput` / `tokensOutput` fields in `runStats`. Actual costs vary by dataset complexity and row count achieved. diff --git a/benchmarks/run.mts b/benchmarks/run.mts index c1758a2..702b18f 100644 --- a/benchmarks/run.mts +++ b/benchmarks/run.mts @@ -4,7 +4,7 @@ * * Calls the real populateWorkflow directly (no HTTP layer) using the * same code path as production app sessions. Metrics are collected by - * the instrumented workflow and written to the populateRuns Convex table. + * the instrumented workflow and written to the runStats Convex table. * After each run the script reads those metrics back and emits a JSON * summary. * @@ -143,7 +143,7 @@ async function deleteBenchmarkDataset(datasetId: string): Promise { } async function fetchRunMetrics(workflowRunId: string): Promise { - return await convex.query(internal.populateRuns.getByWorkflowRunId, { workflowRunId }); + return await convex.query(internal.runStats.getByWorkflowRunId, { workflowRunId }); } // ─── Run a single prompt ─────────────────────────────────────────────────────