diff --git a/backend/src/mastra/agents/populate.ts b/backend/src/mastra/agents/populate.ts index 5fa792b..b2464bb 100644 --- a/backend/src/mastra/agents/populate.ts +++ b/backend/src/mastra/agents/populate.ts @@ -4,6 +4,7 @@ import { buildSubagentTool } from "../tools/investigate-tool.js"; import { searchWebTool, fetchPageTool } from "../tools/web-tools.js"; import type { AuthContext } from "../workflows/populate.js"; import type { PopulateColumn } from "../../pipeline/populate.js"; +import type { RunMetrics } from "../run-metrics.js"; const openrouter = createOpenRouter({ apiKey: process.env.OPENROUTER_API_KEY!, @@ -41,12 +42,13 @@ export function buildPopulateAgent( authorizedDatasetId: string, authContext: AuthContext, columns: PopulateColumn[], + metrics?: RunMetrics, ): Agent { return new Agent({ id: "populate-agent", name: "Dataset Populate Orchestrator", instructions: INSTRUCTIONS, - model: openrouter("qwen/qwen3.7-max"), + model: openrouter("deepseek/deepseek-v4-pro"), tools: { search_web: searchWebTool, fetch_page: fetchPageTool, @@ -54,6 +56,7 @@ export function buildPopulateAgent( authorizedDatasetId, authContext, columns, + metrics, ), }, }); diff --git a/backend/src/mastra/run-metrics.ts b/backend/src/mastra/run-metrics.ts new file mode 100644 index 0000000..031dbe4 --- /dev/null +++ b/backend/src/mastra/run-metrics.ts @@ -0,0 +1,66 @@ +/** + * Per-run metrics collector for the populate workflow. + * + * A single RunMetrics instance is created at the start of each agentStep, + * passed by reference into every tool factory and agent builder, and read + * once at the end to write the runStats Convex record. + * + * All operations are synchronous integer increments or field reads — zero + * I/O, zero meaningful overhead on the hot path. + */ + +interface AgentResult { + // Mastra FullOutput: totalUsage sums across all steps; usage is last-step only. + // Prefer totalUsage for accurate multi-step accounting. 
+ totalUsage?: { inputTokens?: number; outputTokens?: number }; + usage?: { inputTokens?: number; outputTokens?: number }; + steps?: unknown[]; +} + +function tokens(result: AgentResult): { input: number; output: number } { + const src = result.totalUsage ?? result.usage; + return { + input: src?.inputTokens ?? 0, + output: src?.outputTokens ?? 0, + }; +} + +export class RunMetrics { + searchCalls = 0; + fetchCalls = 0; + /** run_subagent tool calls dispatched by the orchestrator. */ + investigateCalls = 0; + /** Rows successfully inserted across all investigate subagents. */ + rowsInserted = 0; + + readonly orchestrator = { inputTokens: 0, outputTokens: 0, steps: 0 }; + readonly investigate = { + inputTokens: 0, + outputTokens: 0, + steps: 0, + runs: 0, + }; + + addOrchestratorResult(result: AgentResult): void { + const { input, output } = tokens(result); + this.orchestrator.inputTokens += input; + this.orchestrator.outputTokens += output; + this.orchestrator.steps += result.steps?.length ?? 0; + } + + addInvestigateResult(result: AgentResult): void { + const { input, output } = tokens(result); + this.investigate.inputTokens += input; + this.investigate.outputTokens += output; + this.investigate.steps += result.steps?.length ?? 
0; + this.investigate.runs += 1; + } + + totals(): { inputTokens: number; outputTokens: number } { + return { + inputTokens: this.orchestrator.inputTokens + this.investigate.inputTokens, + outputTokens: + this.orchestrator.outputTokens + this.investigate.outputTokens, + }; + } +} diff --git a/backend/src/mastra/save-run-metrics.ts b/backend/src/mastra/save-run-metrics.ts new file mode 100644 index 0000000..e0b771c --- /dev/null +++ b/backend/src/mastra/save-run-metrics.ts @@ -0,0 +1,52 @@ +import { convex, internal } from "../convex.js"; +import type { RunMetrics } from "./run-metrics.js"; + +export interface SaveRunMetricsInput { + workflowRunId: string; + datasetId: string; + userId: string; + startedAt: number; + finishedAt: number; + metrics: RunMetrics; + status: "success" | "error"; + error?: string; + isBenchmark?: boolean; +} + +/** + * Persist a completed run's metrics to the runStats Convex table. + * + * Called from the agentStep finally-block as a fire-and-forget operation — + * any error here is logged but must never propagate to the populate workflow. 
+ */ +export async function saveRunMetrics(input: SaveRunMetricsInput): Promise { + const totals = input.metrics.totals(); + await convex.mutation(internal.runStats.insert, { + workflowRunId: input.workflowRunId, + datasetId: input.datasetId, + userId: input.userId, + startedAt: input.startedAt, + finishedAt: input.finishedAt, + durationMs: input.finishedAt - input.startedAt, + + searchCalls: input.metrics.searchCalls, + fetchCalls: input.metrics.fetchCalls, + investigateCalls: input.metrics.investigateCalls, + rowsInserted: input.metrics.rowsInserted, + + tokensInput: totals.inputTokens, + tokensOutput: totals.outputTokens, + + orchestratorTokensInput: input.metrics.orchestrator.inputTokens, + orchestratorTokensOutput: input.metrics.orchestrator.outputTokens, + orchestratorSteps: input.metrics.orchestrator.steps, + investigateTokensInput: input.metrics.investigate.inputTokens, + investigateTokensOutput: input.metrics.investigate.outputTokens, + investigateSteps: input.metrics.investigate.steps, + investigateRuns: input.metrics.investigate.runs, + + status: input.status, + error: input.error, + isBenchmark: input.isBenchmark, + }); +} diff --git a/backend/src/mastra/tools/investigate-tool.ts b/backend/src/mastra/tools/investigate-tool.ts index 7746ef2..57cb455 100644 --- a/backend/src/mastra/tools/investigate-tool.ts +++ b/backend/src/mastra/tools/investigate-tool.ts @@ -3,6 +3,7 @@ import { z } from "zod"; import { buildInvestigateAgent } from "../agents/investigate.js"; import type { AuthContext } from "../workflows/populate.js"; import type { PopulateColumn } from "../../pipeline/populate.js"; +import type { RunMetrics } from "../run-metrics.js"; const investigateInputSchema = z.object({ entity_hint: z @@ -72,6 +73,7 @@ export function buildSubagentTool( authorizedDatasetId: string, authContext: AuthContext, columns: PopulateColumn[], + metrics?: RunMetrics, ) { return createTool({ id: "run_subagent", @@ -80,6 +82,7 @@ export function buildSubagentTool( 
inputSchema: investigateInputSchema, outputSchema: investigateOutputSchema, execute: async ({ entity_hint, primary_keys, context, urls, notes }) => { + if (metrics) metrics.investigateCalls++; console.log( `[run_subagent] spawning subagent user=${authContext.authorizedUserId} run=${authContext.workflowRunId} dataset=${authorizedDatasetId} entity="${entity_hint}" pk=${JSON.stringify(primary_keys)}`, ); @@ -110,9 +113,26 @@ Context (partial data already found): ${context}${urlsBlock}${notesBlock}`; const result = await agent.generate(prompt, { maxSteps: 10 }); + if (metrics) { + // Use result.toolCalls (the flat accumulated list across all steps) rather + // than iterating result.steps[n].toolCalls. The per-step arrays are snapshots + // captured at step-finish time; tool-call chunks that arrive after their + // step-finish event end up attributed to the wrong step, causing systematic + // miscounts. result.toolCalls is the authoritative list maintained by Mastra's + // stream processor as chunks arrive. + for (const tc of (result.toolCalls ?? []) as any[]) { + const name = tc.payload?.toolName ?? tc.toolName; + if (name === "search_web") metrics.searchCalls++; + else if (name === "fetch_page") metrics.fetchCalls++; + } + metrics.addInvestigateResult(result); + } + const parsed = parseInvestigateResult(result.text); + if (metrics && parsed.inserted) metrics.rowsInserted++; + console.log( - `[run_subagent] done entity="${entity_hint}" inserted=${parsed.inserted} steps=${result.steps?.length ?? "?"}` + + `[run_subagent] done entity="${entity_hint}" inserted=${parsed.inserted} steps=${result.steps?.length ?? "?"} toolCalls=${result.toolCalls?.length ?? "?"}` + (parsed.row_summary ? `\n summary: ${parsed.row_summary}` : "") + (parsed.reason ? `\n reason: ${parsed.reason}` : "") + (parsed.clues ? 
`\n clues: ${parsed.clues}` : ""), diff --git a/backend/src/mastra/workflows/populate.ts b/backend/src/mastra/workflows/populate.ts index 924457d..4e62ff0 100644 --- a/backend/src/mastra/workflows/populate.ts +++ b/backend/src/mastra/workflows/populate.ts @@ -5,6 +5,8 @@ import { createOpenRouter } from "@openrouter/ai-sdk-provider"; import { datasetContextSchema, populateColumnSchema } from "../../pipeline/populate.js"; import { convex, internal } from "../../convex.js"; import { buildPopulateAgent } from "../agents/populate.js"; +import { RunMetrics } from "../run-metrics.js"; +import { saveRunMetrics } from "../save-run-metrics.js"; /** * Server-set auth/run context threaded through every step. @@ -28,6 +30,7 @@ import { buildPopulateAgent } from "../agents/populate.js"; export const authContextSchema = z.object({ authorizedUserId: z.string().min(1), workflowRunId: z.string().min(1), + isBenchmark: z.boolean().optional(), }); export type AuthContext = z.infer; @@ -208,24 +211,60 @@ For each lead you find, call run_subagent with the primary key values and any co * capability scope; see tools/dataset-tools.ts). So this step does what * Mastra's agent-as-step adapter would do internally: build the agent, * call `.generate(prompt, { maxSteps })`, return the text. + * + * A RunMetrics instance is created here, threaded into every tool factory + * and agent builder, and saved to Convex in the finally block. The save is + * fire-and-forget — errors are logged but never propagate to the workflow. 
*/ const agentStep = createStep({ id: "populate-agent", inputSchema: buildPromptOutputSchema, outputSchema: z.object({ text: z.string() }), execute: async ({ inputData }) => { - const agent = buildPopulateAgent( - inputData.authorizedDatasetId, - inputData.authContext, - inputData.columns, - ); + const metrics = new RunMetrics(); + const startedAt = Date.now(); + let status: "success" | "error" = "success"; + let errorMsg: string | undefined; + try { + const agent = buildPopulateAgent( + inputData.authorizedDatasetId, + inputData.authContext, + inputData.columns, + metrics, + ); const result = await agent.generate(inputData.prompt, { maxSteps: 80 }); + metrics.addOrchestratorResult(result); + // Use result.toolCalls (flat accumulated list) — same reasoning as investigate-tool.ts. + for (const tc of (result.toolCalls ?? []) as any[]) { + const name = tc.payload?.toolName ?? tc.toolName; + if (name === "search_web") metrics.searchCalls++; + else if (name === "fetch_page") metrics.fetchCalls++; + } return { text: result.text }; } catch (err) { - const msg = err instanceof Error ? err.message : String(err); - console.error(`[populate-agent] agent.generate failed: ${msg}`); + status = "error"; + errorMsg = err instanceof Error ? 
err.message : String(err); + console.error(`[populate-agent] agent.generate failed: ${errorMsg}`); throw err; + } finally { + const finishedAt = Date.now(); + void saveRunMetrics({ + workflowRunId: inputData.authContext.workflowRunId, + datasetId: inputData.authorizedDatasetId, + userId: inputData.authContext.authorizedUserId, + startedAt, + finishedAt, + metrics, + status, + error: errorMsg, + isBenchmark: inputData.authContext.isBenchmark, + }).catch((err) => + console.error( + `[populate-agent] metrics save failed run=${inputData.authContext.workflowRunId}:`, + err, + ), + ); } }, }); diff --git a/benchmarks/README.md b/benchmarks/README.md new file mode 100644 index 0000000..f83b047 --- /dev/null +++ b/benchmarks/README.md @@ -0,0 +1,259 @@ +# BigSet Benchmarks + +This directory contains the benchmark runner for the BigSet populate workflow. It exercises the **real** populate pipeline — same agent code, same Convex writes, same TinyFish API calls — so the metrics reflect actual agent performance rather than a simulation. + +Every populate run (benchmark or real user session) automatically records its stats to the `runStats` Convex table. This means you can analyze cost and performance for both benchmark runs and live app usage from the same place. 
+ +--- + +## How metrics are collected + +When a user clicks "Populate" in the app, or when the benchmark runner triggers a run, the workflow instruments every tool call automatically: + +| Metric | What it counts | +|---|---| +| `searchCalls` | Calls to `search_web` (TinyFish search API) | +| `fetchCalls` | Calls to `fetch_page` (TinyFish fetch API) | +| `investigateCalls` | `run_subagent` dispatches from the orchestrator | +| `rowsInserted` | Rows successfully inserted into the dataset | +| `tokensInput` / `tokensOutput` | Total LLM tokens across all agents | +| `orchestratorTokens*` / `investigateTokens*` | Token breakdown per agent tier | +| `orchestratorSteps` / `investigateSteps` | Agent reasoning steps per tier | +| `investigateRuns` | How many investigate subagents completed | +| `durationMs` | Wall-clock time for the full populate run | + +Each run also records `status` (`success` / `error`), any error message, and an `isBenchmark` flag so you can filter benchmark runs from real sessions. + +> **Note:** The workflow includes an enumeration classification step that calls an LLM directly (not through an agent) to decide whether to use a scraper or search strategy. The tokens used by that step are **not** captured in the metrics above — they're a small fixed cost per run (~100–200 input tokens, ~5 output tokens) but worth knowing if you're doing precise cost accounting. + +--- + +## Prerequisites + +1. The dev stack must be running: `make dev` +2. Your root `.env` must have all required keys: + - `OPENROUTER_API_KEY` — for LLM calls + - `TINYFISH_API_KEY` — for web search and fetch + - `CONVEX_URL` — Convex instance URL (default: `http://127.0.0.1:3210`) + - `CONVEX_SELF_HOSTED_ADMIN_KEY` — to write benchmark datasets and read run records +3. 
`make convex-push` must have been run after the last schema change (or `make dev` does this automatically on startup) + +--- + +## Running benchmarks + +### Run all prompts + +```bash +make benchmark +``` + +This runs all 4 prompts in `benchmarks/prompts.json` sequentially, prints a JSON summary to stdout, and cleans up the temporary Convex datasets afterward. + +### Run a single prompt + +```bash +make benchmark ARGS="--prompt yc-recent-batch-companies" +``` + +Available prompt IDs (defined in [`prompts.json`](./prompts.json)): + +| ID | Dataset | +|---|---| +| `yc-recent-batch-companies` | YC W24/S24 companies | +| `b2b-saas-free-tier` | B2B SaaS tools with free tiers | +| `us-national-parks` | US National Parks | +| `ai-research-labs` | University AI research labs | + +### Save results to disk + +```bash +make benchmark ARGS="--out benchmark-results/" +``` + +Writes a timestamped JSON file to `benchmark-results/`. Useful for tracking performance across runs over time. + +### Keep the datasets after a run (for inspection) + +```bash +make benchmark ARGS="--no-cleanup" +``` + +The benchmark datasets will remain in your Convex instance under the owner `benchmark-runner`. You can inspect them in the Convex dashboard at http://localhost:6791. + +### Run multiple prompts in parallel + +```bash +make benchmark ARGS="--concurrency 2" +``` + +Runs up to 2 prompts concurrently. Note: parallel runs share TinyFish and OpenRouter rate limits — start with concurrency 1 for a clean baseline. + +### Run directly with Node (no Make) + +```bash +cd backend +npx tsx ../benchmarks/run.mts --prompt us-national-parks --out ../benchmark-results/ +``` + +--- + +## Understanding the JSON output + +The benchmark emits a JSON object to stdout when it finishes. 
Example: + +```json +{ + "completedAt": "2025-01-15T10:30:00.000Z", + "promptCount": 4, + "successCount": 4, + "failureCount": 0, + "aggregate": { + "rowsInserted": 62, + "searchCalls": 47, + "fetchCalls": 83, + "investigateCalls": 68, + "tokensInput": 1240000, + "tokensOutput": 84000, + "orchestratorSteps": 32, + "investigateSteps": 412, + "investigateRuns": 68, + "durationMs": 1842000 + }, + "perRunAverages": { + "rowsInserted": 15.5, + "searchCalls": 11.8, + "fetchCalls": 20.8, + "investigateCalls": 17.0, + "tokensTotal": 331000, + "durationSeconds": 460.5 + }, + "runs": [ + { + "promptId": "yc-recent-batch-companies", + "workflowRunId": "benchmark-yc-recent-batch-companies-a1b2c3d4", + "status": "success", + "durationMs": 430000, + "metrics": { + "rowsInserted": 18, + "searchCalls": 12, + "fetchCalls": 22, + "investigateCalls": 19, + "tokensInput": 310000, + "tokensOutput": 21000, + "orchestratorTokensInput": 95000, + "orchestratorTokensOutput": 4000, + "orchestratorSteps": 8, + "investigateTokensInput": 215000, + "investigateTokensOutput": 17000, + "investigateSteps": 98, + "investigateRuns": 19 + } + } + ] +} +``` + +--- + +## Viewing run stats from actual app sessions + +Every populate run triggered by a real user through the app UI is also recorded in `runStats`. You can query it directly via the Convex dashboard or CLI. + +### Convex dashboard + +Open http://localhost:6791 → select your Convex instance → go to **Data** → **runStats**. + +Each row corresponds to one populate run. The `isBenchmark` field is `true` for benchmark runs and absent/`undefined` for real user sessions. + +### Convex CLI queries + +All queries below use the internal query functions added alongside the schema. Run them with `npx convex run` from the `frontend/` directory (or via the dashboard's **Functions** tab). 
+ +**List all runs for a specific dataset:** +```bash +cd frontend +node ../scripts/with-root-env.mjs npx convex run runStats:listByDataset \ + --url http://127.0.0.1:3210 \ + --admin-key "$CONVEX_SELF_HOSTED_ADMIN_KEY" \ + '{"datasetId": "<dataset-id>"}' +``` + +**List all runs for a specific user:** +```bash +cd frontend +node ../scripts/with-root-env.mjs npx convex run runStats:listByUser \ + --url http://127.0.0.1:3210 \ + --admin-key "$CONVEX_SELF_HOSTED_ADMIN_KEY" \ + '{"userId": "<user-id>"}' +``` + +**Fetch a single run by workflow run ID:** +```bash +cd frontend +node ../scripts/with-root-env.mjs npx convex run runStats:getByWorkflowRunId \ + --url http://127.0.0.1:3210 \ + --admin-key "$CONVEX_SELF_HOSTED_ADMIN_KEY" \ + '{"workflowRunId": "<workflow-run-id>"}' +``` + +The `workflowRunId` appears in the backend logs when a populate starts: +``` +[populate-agent] <workflow-run-id> populate-agent start +``` +and also in the HTTP response from `POST /populate` as the `runId` field. + +### Exporting all benchmark runs to JSON + +To export all benchmark runs for offline analysis: + +```bash +cd frontend +node ../scripts/with-root-env.mjs npx convex run runStats:listByUser \ + --url http://127.0.0.1:3210 \ + --admin-key "$CONVEX_SELF_HOSTED_ADMIN_KEY" \ + '{"userId": "benchmark-runner"}' | jq '.' > benchmark-history.json +``` + +--- + +## Adding custom prompts + +Edit [`prompts.json`](./prompts.json) to add your own benchmark prompts. Each entry needs: + +```json +{ + "id": "my-prompt-id", + "datasetName": "Human-readable dataset name", + "description": "What the dataset is about — shown to the agent", + "columns": [ + { "name": "entity_name", "type": "text", "description": "The entity name", "isPrimaryKey": true }, + { "name": "other_field", "type": "text", "description": "What this field is" } + ] +} +``` + +Column types: `text`, `number`, `boolean`, `url`, `date`. 
+ +Mark at least one column as `"isPrimaryKey": true` — the orchestrator uses this to tell subagents which field is the unique identifier, and the workflow uses it to reject duplicate rows automatically. + +Then run: +```bash +make benchmark ARGS="--prompt my-prompt-id" +``` + +--- + +## Cost estimation + +Rough estimates based on DeepSeek V4 Pro pricing (as of writing): + +| Per run (20 rows target) | Approximate cost | +|---|---| +| Input tokens (~300k) | ~$0.84 | +| Output tokens (~20k) | ~$0.20 | +| TinyFish search (~12 calls) | ~$0.06 | +| TinyFish fetch (~20 calls) | ~$0.10 | +| **Total per run** | **~$1.20** | + +These numbers come from the `tokensInput` / `tokensOutput` fields in `runStats`. Actual costs vary by dataset complexity and row count achieved. diff --git a/benchmarks/prompts.json b/benchmarks/prompts.json new file mode 100644 index 0000000..8b7c7f9 --- /dev/null +++ b/benchmarks/prompts.json @@ -0,0 +1,46 @@ +[ + { + "id": "yc-recent-batch-companies", + "datasetName": "YC Recent Batch Companies", + "description": "Companies from recent Y Combinator batches (W24, S24). Include name, website, one-line description, and batch.", + "columns": [ + { "name": "company_name", "type": "text", "description": "Company name", "isPrimaryKey": true }, + { "name": "website", "type": "url", "description": "Company website URL" }, + { "name": "description", "type": "text", "description": "One-line description of what the company does" }, + { "name": "batch", "type": "text", "description": "YC batch, e.g. W24 or S24" } + ] + }, + { + "id": "b2b-saas-free-tier", + "datasetName": "B2B SaaS with Free Tiers", + "description": "Popular B2B SaaS tools that offer a free tier or freemium plan. Include tool name, category, free-tier summary, and pricing page URL.", + "columns": [ + { "name": "tool_name", "type": "text", "description": "Name of the SaaS tool", "isPrimaryKey": true }, + { "name": "category", "type": "text", "description": "Category, e.g. 
CRM, Analytics, DevOps" }, + { "name": "free_tier_summary", "type": "text", "description": "What the free tier includes" }, + { "name": "pricing_page_url", "type": "url", "description": "URL to the pricing page" } + ] + }, + { + "id": "us-national-parks", + "datasetName": "US National Parks", + "description": "All US National Parks with state, year established, and official NPS page URL.", + "columns": [ + { "name": "park_name", "type": "text", "description": "Official park name", "isPrimaryKey": true }, + { "name": "state", "type": "text", "description": "US state(s) the park is in" }, + { "name": "established_year", "type": "number", "description": "Year the park was established" }, + { "name": "official_page_url", "type": "url", "description": "URL on nps.gov" } + ] + }, + { + "id": "ai-research-labs", + "datasetName": "University AI Research Labs", + "description": "Academic AI research labs at major universities. Include lab name, university, research focus, and lab website URL.", + "columns": [ + { "name": "lab_name", "type": "text", "description": "Name of the research lab", "isPrimaryKey": true }, + { "name": "university", "type": "text", "description": "Host university" }, + { "name": "research_focus", "type": "text", "description": "Primary research area, e.g. NLP, robotics, RL" }, + { "name": "lab_website_url", "type": "url", "description": "Lab homepage URL" } + ] + } +] diff --git a/benchmarks/run.mts b/benchmarks/run.mts new file mode 100644 index 0000000..702b18f --- /dev/null +++ b/benchmarks/run.mts @@ -0,0 +1,373 @@ +#!/usr/bin/env node +/** + * Benchmark runner for the BigSet populate workflow. + * + * Calls the real populateWorkflow directly (no HTTP layer) using the + * same code path as production app sessions. Metrics are collected by + * the instrumented workflow and written to the runStats Convex table. + * After each run the script reads those metrics back and emits a JSON + * summary. 
+ * + * Usage (from repo root): + * node --import tsx/esm benchmarks/run.mts [options] + * + * Options: + * --prompt Run only the prompt with this id (repeatable) + * --prompts Path to prompts JSON (default: benchmarks/prompts.json) + * --out Write per-run JSON artifacts to this directory + * --no-cleanup Keep Convex datasets after benchmark (default: delete them) + * --concurrency Max parallel prompt runs (default: 1) + */ + +import { readFile, mkdir, writeFile } from "node:fs/promises"; +import { existsSync, readFileSync } from "node:fs"; +import { resolve, dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; +import { randomUUID } from "node:crypto"; + +// ─── Load root .env before importing backend modules ──────────────────────── + +const repoRoot = resolve(dirname(fileURLToPath(import.meta.url)), ".."); +const rootEnvPath = join(repoRoot, ".env"); + +if (existsSync(rootEnvPath)) { + for (const line of readFileSync(rootEnvPath, "utf8").split(/\r?\n/)) { + const trimmed = line.trim(); + if (!trimmed || trimmed.startsWith("#")) continue; + const sep = trimmed.indexOf("="); + if (sep <= 0) continue; + const key = trimmed.slice(0, sep).trim(); + let val = trimmed.slice(sep + 1).trim(); + if ((val.startsWith('"') && val.endsWith('"')) || (val.startsWith("'") && val.endsWith("'"))) + val = val.slice(1, -1); + process.env[key] ??= val; + } +} + +// ─── Imports (after env is loaded) ────────────────────────────────────────── + +// @ts-ignore — importing from sibling package; resolved at runtime via Node module resolution +import { populateWorkflow } from "../backend/src/mastra/workflows/populate.js"; +// @ts-ignore +import { convex, internal } from "../backend/src/convex.js"; + +// ─── Types ─────────────────────────────────────────────────────────────────── + +interface PromptDefinition { + id: string; + datasetName: string; + description: string; + columns: Array<{ + name: string; + type: "text" | "number" | "boolean" | "url" | "date"; + 
description?: string; + }>; +} + +interface RunResult { + promptId: string; + datasetName: string; + workflowRunId: string; + datasetId: string; + status: "success" | "error" | "timeout"; + error?: string; + metrics?: PopulateRunRecord; + durationMs: number; +} + +interface PopulateRunRecord { + workflowRunId: string; + datasetId: string; + userId: string; + startedAt: number; + finishedAt: number; + durationMs: number; + searchCalls: number; + fetchCalls: number; + investigateCalls: number; + rowsInserted: number; + tokensInput: number; + tokensOutput: number; + orchestratorTokensInput: number; + orchestratorTokensOutput: number; + orchestratorSteps: number; + investigateTokensInput: number; + investigateTokensOutput: number; + investigateSteps: number; + investigateRuns: number; + status: "success" | "error"; + error?: string; + isBenchmark?: boolean; +} + +// ─── CLI args ──────────────────────────────────────────────────────────────── + +function parseArgs(argv: string[]) { + const result = { + promptIds: [] as string[], + promptsFile: join(dirname(fileURLToPath(import.meta.url)), "prompts.json"), + outDir: null as string | null, + cleanup: true, + concurrency: 1, + }; + for (let i = 0; i < argv.length; i++) { + const arg = argv[i]; + if (arg === "--prompt") result.promptIds.push(argv[++i]); + else if (arg === "--prompts") result.promptsFile = argv[++i]; + else if (arg === "--out") result.outDir = argv[++i]; + else if (arg === "--no-cleanup") result.cleanup = false; + else if (arg === "--concurrency") result.concurrency = parseInt(argv[++i], 10); + } + return result; +} + +// ─── Convex helpers ────────────────────────────────────────────────────────── + +async function createBenchmarkDataset( + name: string, + description: string, + columns: PromptDefinition["columns"], +): Promise { + return await convex.mutation(internal.datasets.createInternal, { + name, + description, + columns, + ownerId: "benchmark-runner", + cadence: "manual", + visibility: "private", + 
}); +} + +async function deleteBenchmarkDataset(datasetId: string): Promise { + await convex.mutation(internal.datasets.deleteInternal, { id: datasetId }); +} + +async function fetchRunMetrics(workflowRunId: string): Promise { + return await convex.query(internal.runStats.getByWorkflowRunId, { workflowRunId }); +} + +// ─── Run a single prompt ───────────────────────────────────────────────────── + +async function runPrompt( + prompt: PromptDefinition, + config: ReturnType, +): Promise { + const wallStart = Date.now(); + const workflowRunId = `benchmark-${prompt.id}-${randomUUID().slice(0, 8)}`; + let datasetId = ""; + + console.error(`\n[benchmark] ▶ ${prompt.id}`); + + try { + datasetId = await createBenchmarkDataset( + `[benchmark] ${prompt.datasetName}`, + prompt.description, + prompt.columns, + ); + console.error(`[benchmark] created dataset ${datasetId}`); + + const run = await populateWorkflow.createRun({ runId: workflowRunId }); + const result = await run.start({ + inputData: { + datasetId, + datasetName: prompt.datasetName, + description: prompt.description, + columns: prompt.columns, + authContext: { + authorizedUserId: "benchmark-runner", + workflowRunId, + isBenchmark: true, + }, + }, + }); + + const wallDuration = Date.now() - wallStart; + if (result.status !== "success") { + throw new Error(`Workflow ended with status: ${result.status}`); + } + + // Give the fire-and-forget metrics save a moment to land in Convex + await sleep(2000); + const metrics = await fetchRunMetrics(workflowRunId); + + console.error( + `[benchmark] ✓ ${prompt.id} rows=${metrics?.rowsInserted ?? "?"} ` + + `tokens=${(metrics?.tokensInput ?? 0) + (metrics?.tokensOutput ?? 0)} ` + + `duration=${(wallDuration / 1000).toFixed(1)}s`, + ); + + return { + promptId: prompt.id, + datasetName: prompt.datasetName, + workflowRunId, + datasetId, + status: "success", + metrics: metrics ?? undefined, + durationMs: wallDuration, + }; + } catch (err) { + const msg = err instanceof Error ? 
err.message : String(err); + console.error(`[benchmark] ✗ ${prompt.id} error: ${msg}`); + + // Still try to fetch any partial metrics that landed + await sleep(1000); + const metrics = await fetchRunMetrics(workflowRunId).catch(() => null); + + return { + promptId: prompt.id, + datasetName: prompt.datasetName, + workflowRunId, + datasetId, + status: "error", + error: msg, + metrics: metrics ?? undefined, + durationMs: Date.now() - wallStart, + }; + } finally { + if (config.cleanup && datasetId) { + await deleteBenchmarkDataset(datasetId).catch((err) => + console.error(`[benchmark] cleanup failed for ${datasetId}:`, err), + ); + } + } +} + +// ─── Aggregate summary ─────────────────────────────────────────────────────── + +function buildSummary(results: RunResult[]) { + const successful = results.filter((r) => r.status === "success" && r.metrics); + const failed = results.filter((r) => r.status !== "success"); + + const totals = successful.reduce( + (acc, r) => { + const m = r.metrics!; + acc.rowsInserted += m.rowsInserted; + acc.searchCalls += m.searchCalls; + acc.fetchCalls += m.fetchCalls; + acc.investigateCalls += m.investigateCalls; + acc.tokensInput += m.tokensInput; + acc.tokensOutput += m.tokensOutput; + acc.orchestratorSteps += m.orchestratorSteps; + acc.investigateSteps += m.investigateSteps; + acc.investigateRuns += m.investigateRuns; + acc.durationMs += r.durationMs; + return acc; + }, + { + rowsInserted: 0, + searchCalls: 0, + fetchCalls: 0, + investigateCalls: 0, + tokensInput: 0, + tokensOutput: 0, + orchestratorSteps: 0, + investigateSteps: 0, + investigateRuns: 0, + durationMs: 0, + }, + ); + + const n = successful.length || 1; + + return { + completedAt: new Date().toISOString(), + promptCount: results.length, + successCount: successful.length, + failureCount: failed.length, + aggregate: totals, + perRunAverages: { + rowsInserted: +(totals.rowsInserted / n).toFixed(1), + searchCalls: +(totals.searchCalls / n).toFixed(1), + fetchCalls: 
+(totals.fetchCalls / n).toFixed(1), + investigateCalls: +(totals.investigateCalls / n).toFixed(1), + tokensTotal: +((totals.tokensInput + totals.tokensOutput) / n).toFixed(0), + durationSeconds: +((totals.durationMs / n / 1000).toFixed(1)), + }, + runs: results.map((r) => ({ + promptId: r.promptId, + datasetName: r.datasetName, + workflowRunId: r.workflowRunId, + status: r.status, + error: r.error, + durationMs: r.durationMs, + metrics: r.metrics + ? { + rowsInserted: r.metrics.rowsInserted, + searchCalls: r.metrics.searchCalls, + fetchCalls: r.metrics.fetchCalls, + investigateCalls: r.metrics.investigateCalls, + tokensInput: r.metrics.tokensInput, + tokensOutput: r.metrics.tokensOutput, + orchestratorTokensInput: r.metrics.orchestratorTokensInput, + orchestratorTokensOutput: r.metrics.orchestratorTokensOutput, + orchestratorSteps: r.metrics.orchestratorSteps, + investigateTokensInput: r.metrics.investigateTokensInput, + investigateTokensOutput: r.metrics.investigateTokensOutput, + investigateSteps: r.metrics.investigateSteps, + investigateRuns: r.metrics.investigateRuns, + } + : null, + })), + }; +} + +// ─── Main ──────────────────────────────────────────────────────────────────── + +async function main() { + const config = parseArgs(process.argv.slice(2)); + + // Validate required env vars + const missing = ["OPENROUTER_API_KEY", "TINYFISH_API_KEY", "CONVEX_URL", "CONVEX_SELF_HOSTED_ADMIN_KEY"].filter( + (k) => !process.env[k], + ); + if (missing.length) { + console.error(`[benchmark] Missing required env vars: ${missing.join(", ")}`); + console.error("Copy .env.example to .env and fill in the values, then re-run."); + process.exit(1); + } + + const allPrompts: PromptDefinition[] = JSON.parse( + await readFile(config.promptsFile, "utf8"), + ); + const prompts = + config.promptIds.length > 0 + ? 
allPrompts.filter((p) => config.promptIds.includes(p.id)) + : allPrompts; + + if (prompts.length === 0) { + console.error(`[benchmark] No matching prompts found.`); + process.exit(1); + } + + console.error(`[benchmark] Running ${prompts.length} prompt(s) (concurrency=${config.concurrency})`); + + const results: RunResult[] = []; + + // Run in batches of config.concurrency + for (let i = 0; i < prompts.length; i += config.concurrency) { + const batch = prompts.slice(i, i + config.concurrency); + const batchResults = await Promise.all(batch.map((p) => runPrompt(p, config))); + results.push(...batchResults); + } + + const summary = buildSummary(results); + + if (config.outDir) { + await mkdir(config.outDir, { recursive: true }); + const outFile = join(config.outDir, `benchmark-${Date.now()}.json`); + await writeFile(outFile, JSON.stringify(summary, null, 2)); + console.error(`[benchmark] Results written to ${outFile}`); + } + + // Emit JSON to stdout for piping / CI capture + console.log(JSON.stringify(summary, null, 2)); +} + +function sleep(ms: number) { + return new Promise((r) => setTimeout(r, ms)); +} + +main().catch((err) => { + console.error("[benchmark] Fatal:", err); + process.exit(1); +}); diff --git a/frontend/convex/datasets.ts b/frontend/convex/datasets.ts index 8637a39..cab4fe3 100644 --- a/frontend/convex/datasets.ts +++ b/frontend/convex/datasets.ts @@ -248,6 +248,53 @@ export const remove = mutation({ }, }); +/** + * Create a dataset without Clerk auth. Used exclusively by the benchmark + * runner, which calls populateWorkflow directly (no HTTP layer) and + * therefore has no Clerk identity. The ownerId must be supplied by the + * caller; for benchmark runs pass "benchmark-runner". 
+ */ +export const createInternal = internalMutation({ + args: { + name: v.string(), + description: v.string(), + cadence: v.string(), + columns: v.array(columnValidator), + ownerId: v.string(), + visibility: v.optional(v.union(v.literal("public"), v.literal("private"))), + }, + handler: async (ctx, args) => { + return await ctx.db.insert("datasets", { + name: args.name, + description: args.description, + cadence: args.cadence, + columns: args.columns, + ownerId: args.ownerId, + status: "paused", + visibility: args.visibility ?? "private", + rowCount: 0, + }); + }, +}); + +/** + * Delete a dataset and all its rows without Clerk auth. Used exclusively + * by the benchmark runner for cleanup after a benchmark run. + */ +export const deleteInternal = internalMutation({ + args: { id: v.id("datasets") }, + handler: async (ctx, args) => { + const rows = await ctx.db + .query("datasetRows") + .withIndex("by_dataset", (q) => q.eq("datasetId", args.id)) + .collect(); + for (const row of rows) { + await ctx.db.delete(row._id); + } + await ctx.db.delete(args.id); + }, +}); + /** * One-shot migration: scan every dataset, count its rows, and patch * `rowCount` to the true value. Idempotent and safe to re-run. diff --git a/frontend/convex/runStats.ts b/frontend/convex/runStats.ts new file mode 100644 index 0000000..9886db2 --- /dev/null +++ b/frontend/convex/runStats.ts @@ -0,0 +1,92 @@ +import { internalMutation, internalQuery } from "./_generated/server.js"; +import { v } from "convex/values"; + +/** + * Insert a populate-run metrics record. + * + * Called by the backend agent runner at the end of every populate workflow + * run (success or error). Never called from the browser. 
+ */ +export const insert = internalMutation({ + args: { + workflowRunId: v.string(), + datasetId: v.string(), + userId: v.string(), + startedAt: v.number(), + finishedAt: v.number(), + durationMs: v.number(), + + searchCalls: v.number(), + fetchCalls: v.number(), + investigateCalls: v.number(), + rowsInserted: v.number(), + + tokensInput: v.number(), + tokensOutput: v.number(), + + orchestratorTokensInput: v.number(), + orchestratorTokensOutput: v.number(), + orchestratorSteps: v.number(), + investigateTokensInput: v.number(), + investigateTokensOutput: v.number(), + investigateSteps: v.number(), + investigateRuns: v.number(), + + status: v.union(v.literal("success"), v.literal("error")), + error: v.optional(v.string()), + isBenchmark: v.optional(v.boolean()), + }, + handler: async (ctx, args) => { + if (args.status === "success" && args.error) { + throw new Error("runStats.insert: error must be absent on a successful run"); + } + if (args.status === "error" && !args.error) { + throw new Error("runStats.insert: error message is required on a failed run"); + } + await ctx.db.insert("runStats", args); + }, +}); + +/** + * Fetch a single run by its workflowRunId. Used by the benchmark runner to + * retrieve metrics after a workflow completes. + */ +export const getByWorkflowRunId = internalQuery({ + args: { workflowRunId: v.string() }, + handler: async (ctx, args) => { + return await ctx.db + .query("runStats") + .withIndex("by_workflow_run", (q) => + q.eq("workflowRunId", args.workflowRunId), + ) + .first(); + }, +}); + +/** + * List all runs for a dataset, newest first. + */ +export const listByDataset = internalQuery({ + args: { datasetId: v.string() }, + handler: async (ctx, args) => { + const runs = await ctx.db + .query("runStats") + .withIndex("by_dataset", (q) => q.eq("datasetId", args.datasetId)) + .collect(); + return runs.sort((a, b) => b.startedAt - a.startedAt); + }, +}); + +/** + * List all runs for a user, newest first. 
+ */ +export const listByUser = internalQuery({ + args: { userId: v.string() }, + handler: async (ctx, args) => { + const runs = await ctx.db + .query("runStats") + .withIndex("by_user", (q) => q.eq("userId", args.userId)) + .collect(); + return runs.sort((a, b) => b.startedAt - a.startedAt); + }, +}); diff --git a/frontend/convex/schema.ts b/frontend/convex/schema.ts index 11e635a..aa35a5d 100644 --- a/frontend/convex/schema.ts +++ b/frontend/convex/schema.ts @@ -99,4 +99,47 @@ export default defineSchema({ // "before current period", which forces a reset on next write. periodStart: v.optional(v.number()), }).index("by_user", ["userId"]), + + // One row per populate workflow run. Written once at the end of each run + // (success or error) by the backend agent runner — never by the frontend. + // Tracks tool-call counts, token usage, and timing so runs can be + // compared across datasets, users, and benchmark sessions. + runStats: defineTable({ + workflowRunId: v.string(), + // v.string() (not v.id) so benchmark runs can use synthetic dataset ids + // without needing a real Convex dataset document. + datasetId: v.string(), + userId: v.string(), + startedAt: v.number(), + finishedAt: v.number(), + durationMs: v.number(), + + // Tool-call counts + searchCalls: v.number(), + fetchCalls: v.number(), + investigateCalls: v.number(), + rowsInserted: v.number(), + + // Token usage — totals across all agent invocations in this run + tokensInput: v.number(), + tokensOutput: v.number(), + + // Breakdown by agent tier + orchestratorTokensInput: v.number(), + orchestratorTokensOutput: v.number(), + orchestratorSteps: v.number(), + investigateTokensInput: v.number(), + investigateTokensOutput: v.number(), + investigateSteps: v.number(), + investigateRuns: v.number(), + + status: v.union(v.literal("success"), v.literal("error")), + error: v.optional(v.string()), + + // True when written by the benchmark runner rather than a real user session. 
+ isBenchmark: v.optional(v.boolean()), + }) + .index("by_dataset", ["datasetId"]) + .index("by_user", ["userId"]) + .index("by_workflow_run", ["workflowRunId"]), }); diff --git a/makefiles/Makefile b/makefiles/Makefile index fb1778e..df56046 100644 --- a/makefiles/Makefile +++ b/makefiles/Makefile @@ -1,4 +1,4 @@ -.PHONY: all dev validate-dev-env down clean convex-push convex-env seed-public-datasets +.PHONY: all dev validate-dev-env down clean convex-push convex-env seed-public-datasets benchmark all: dev @@ -70,6 +70,13 @@ seed-public-datasets: @test -f .env || { echo "Error: .env not found. Run: cp .env.example .env"; exit 1; } @cd frontend && node ../scripts/with-root-env.mjs npx convex run publicSeed:seedPublicDatasets +## Run the benchmark suite against the real populate workflow. +## Requires: make dev running, .env filled in. +## Pass ARGS to forward CLI flags, e.g.: +## make benchmark ARGS="--prompt yc-recent-batch-companies --no-cleanup" +benchmark: + cd backend && npx tsx ../benchmarks/run.mts $(ARGS) + down: docker compose -f docker-compose.dev.yml down