Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion backend/src/mastra/agents/populate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { buildSubagentTool } from "../tools/investigate-tool.js";
import { searchWebTool, fetchPageTool } from "../tools/web-tools.js";
import type { AuthContext } from "../workflows/populate.js";
import type { PopulateColumn } from "../../pipeline/populate.js";
import type { RunMetrics } from "../run-metrics.js";

const openrouter = createOpenRouter({
apiKey: process.env.OPENROUTER_API_KEY!,
Expand Down Expand Up @@ -41,19 +42,21 @@ export function buildPopulateAgent(
authorizedDatasetId: string,
authContext: AuthContext,
columns: PopulateColumn[],
metrics?: RunMetrics,
): Agent {
return new Agent({
id: "populate-agent",
name: "Dataset Populate Orchestrator",
instructions: INSTRUCTIONS,
model: openrouter("qwen/qwen3.7-max"),
model: openrouter("deepseek/deepseek-v4-pro"),
tools: {
search_web: searchWebTool,
fetch_page: fetchPageTool,
run_subagent: buildSubagentTool(
authorizedDatasetId,
authContext,
columns,
metrics,
),
},
});
Expand Down
66 changes: 66 additions & 0 deletions backend/src/mastra/run-metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Per-run metrics collector for the populate workflow.
*
* A single RunMetrics instance is created at the start of each agentStep,
* passed by reference into every tool factory and agent builder, and read
* once at the end to write the runStats Convex record.
*
* All operations are synchronous integer increments or field reads — zero
* I/O, zero meaningful overhead on the hot path.
*/

interface AgentResult {
// Mastra FullOutput: totalUsage sums across all steps; usage is last-step only.
// Prefer totalUsage for accurate multi-step accounting.
totalUsage?: { inputTokens?: number; outputTokens?: number };
usage?: { inputTokens?: number; outputTokens?: number };
steps?: unknown[];
}

function tokens(result: AgentResult): { input: number; output: number } {
const src = result.totalUsage ?? result.usage;
return {
input: src?.inputTokens ?? 0,
output: src?.outputTokens ?? 0,
};
}

export class RunMetrics {
searchCalls = 0;
fetchCalls = 0;
/** run_subagent tool calls dispatched by the orchestrator. */
investigateCalls = 0;
/** Rows successfully inserted across all investigate subagents. */
rowsInserted = 0;

readonly orchestrator = { inputTokens: 0, outputTokens: 0, steps: 0 };
readonly investigate = {
inputTokens: 0,
outputTokens: 0,
steps: 0,
runs: 0,
};

addOrchestratorResult(result: AgentResult): void {
const { input, output } = tokens(result);
this.orchestrator.inputTokens += input;
this.orchestrator.outputTokens += output;
this.orchestrator.steps += result.steps?.length ?? 0;
}

addInvestigateResult(result: AgentResult): void {
const { input, output } = tokens(result);
this.investigate.inputTokens += input;
this.investigate.outputTokens += output;
this.investigate.steps += result.steps?.length ?? 0;
this.investigate.runs += 1;
}

totals(): { inputTokens: number; outputTokens: number } {
return {
inputTokens: this.orchestrator.inputTokens + this.investigate.inputTokens,
outputTokens:
this.orchestrator.outputTokens + this.investigate.outputTokens,
};
}
}
52 changes: 52 additions & 0 deletions backend/src/mastra/save-run-metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { convex, internal } from "../convex.js";
import type { RunMetrics } from "./run-metrics.js";

export interface SaveRunMetricsInput {
workflowRunId: string;
datasetId: string;
userId: string;
startedAt: number;
finishedAt: number;
metrics: RunMetrics;
status: "success" | "error";
error?: string;
isBenchmark?: boolean;
}

/**
* Persist a completed run's metrics to the runStats Convex table.
*
* Called from the agentStep finally-block as a fire-and-forget operation —
* any error here is logged but must never propagate to the populate workflow.
*/
export async function saveRunMetrics(input: SaveRunMetricsInput): Promise<void> {
const totals = input.metrics.totals();
await convex.mutation(internal.runStats.insert, {
workflowRunId: input.workflowRunId,
datasetId: input.datasetId,
userId: input.userId,
startedAt: input.startedAt,
finishedAt: input.finishedAt,
durationMs: input.finishedAt - input.startedAt,

searchCalls: input.metrics.searchCalls,
fetchCalls: input.metrics.fetchCalls,
investigateCalls: input.metrics.investigateCalls,
rowsInserted: input.metrics.rowsInserted,

tokensInput: totals.inputTokens,
tokensOutput: totals.outputTokens,

orchestratorTokensInput: input.metrics.orchestrator.inputTokens,
orchestratorTokensOutput: input.metrics.orchestrator.outputTokens,
orchestratorSteps: input.metrics.orchestrator.steps,
investigateTokensInput: input.metrics.investigate.inputTokens,
investigateTokensOutput: input.metrics.investigate.outputTokens,
investigateSteps: input.metrics.investigate.steps,
investigateRuns: input.metrics.investigate.runs,

status: input.status,
error: input.error,
isBenchmark: input.isBenchmark,
});
}
22 changes: 21 additions & 1 deletion backend/src/mastra/tools/investigate-tool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { z } from "zod";
import { buildInvestigateAgent } from "../agents/investigate.js";
import type { AuthContext } from "../workflows/populate.js";
import type { PopulateColumn } from "../../pipeline/populate.js";
import type { RunMetrics } from "../run-metrics.js";

const investigateInputSchema = z.object({
entity_hint: z
Expand Down Expand Up @@ -72,6 +73,7 @@ export function buildSubagentTool(
authorizedDatasetId: string,
authContext: AuthContext,
columns: PopulateColumn[],
metrics?: RunMetrics,
) {
return createTool({
id: "run_subagent",
Expand All @@ -80,6 +82,7 @@ export function buildSubagentTool(
inputSchema: investigateInputSchema,
outputSchema: investigateOutputSchema,
execute: async ({ entity_hint, primary_keys, context, urls, notes }) => {
if (metrics) metrics.investigateCalls++;
console.log(
`[run_subagent] spawning subagent user=${authContext.authorizedUserId} run=${authContext.workflowRunId} dataset=${authorizedDatasetId} entity="${entity_hint}" pk=${JSON.stringify(primary_keys)}`,
);
Expand Down Expand Up @@ -110,9 +113,26 @@ Context (partial data already found):
${context}${urlsBlock}${notesBlock}`;

const result = await agent.generate(prompt, { maxSteps: 10 });
if (metrics) {
// Use result.toolCalls (the flat accumulated list across all steps) rather
// than iterating result.steps[n].toolCalls. The per-step arrays are snapshots
// captured at step-finish time; tool-call chunks that arrive after their
// step-finish event end up attributed to the wrong step, causing systematic
// miscounts. result.toolCalls is the authoritative list maintained by Mastra's
// stream processor as chunks arrive.
for (const tc of (result.toolCalls ?? []) as any[]) {
const name = tc.payload?.toolName ?? tc.toolName;
if (name === "search_web") metrics.searchCalls++;
else if (name === "fetch_page") metrics.fetchCalls++;
}
metrics.addInvestigateResult(result);
}

const parsed = parseInvestigateResult(result.text);
if (metrics && parsed.inserted) metrics.rowsInserted++;

console.log(
`[run_subagent] done entity="${entity_hint}" inserted=${parsed.inserted} steps=${result.steps?.length ?? "?"}` +
`[run_subagent] done entity="${entity_hint}" inserted=${parsed.inserted} steps=${result.steps?.length ?? "?"} toolCalls=${result.toolCalls?.length ?? "?"}` +
(parsed.row_summary ? `\n summary: ${parsed.row_summary}` : "") +
(parsed.reason ? `\n reason: ${parsed.reason}` : "") +
(parsed.clues ? `\n clues: ${parsed.clues}` : ""),
Expand Down
53 changes: 46 additions & 7 deletions backend/src/mastra/workflows/populate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { createOpenRouter } from "@openrouter/ai-sdk-provider";
import { datasetContextSchema, populateColumnSchema } from "../../pipeline/populate.js";
import { convex, internal } from "../../convex.js";
import { buildPopulateAgent } from "../agents/populate.js";
import { RunMetrics } from "../run-metrics.js";
import { saveRunMetrics } from "../save-run-metrics.js";

/**
* Server-set auth/run context threaded through every step.
Expand All @@ -28,6 +30,7 @@ import { buildPopulateAgent } from "../agents/populate.js";
export const authContextSchema = z.object({
authorizedUserId: z.string().min(1),
workflowRunId: z.string().min(1),
isBenchmark: z.boolean().optional(),
});
export type AuthContext = z.infer<typeof authContextSchema>;

Expand Down Expand Up @@ -208,24 +211,60 @@ For each lead you find, call run_subagent with the primary key values and any co
* capability scope; see tools/dataset-tools.ts). So this step does what
* Mastra's agent-as-step adapter would do internally: build the agent,
* call `.generate(prompt, { maxSteps })`, return the text.
*
* A RunMetrics instance is created here, threaded into every tool factory
* and agent builder, and saved to Convex in the finally block. The save is
* fire-and-forget — errors are logged but never propagate to the workflow.
*/
const agentStep = createStep({
id: "populate-agent",
inputSchema: buildPromptOutputSchema,
outputSchema: z.object({ text: z.string() }),
execute: async ({ inputData }) => {
const agent = buildPopulateAgent(
inputData.authorizedDatasetId,
inputData.authContext,
inputData.columns,
);
const metrics = new RunMetrics();
const startedAt = Date.now();
let status: "success" | "error" = "success";
let errorMsg: string | undefined;

try {
const agent = buildPopulateAgent(
inputData.authorizedDatasetId,
inputData.authContext,
inputData.columns,
metrics,
);
const result = await agent.generate(inputData.prompt, { maxSteps: 80 });
metrics.addOrchestratorResult(result);
// Use result.toolCalls (flat accumulated list) — same reasoning as investigate-tool.ts.
for (const tc of (result.toolCalls ?? []) as any[]) {
const name = tc.payload?.toolName ?? tc.toolName;
if (name === "search_web") metrics.searchCalls++;
else if (name === "fetch_page") metrics.fetchCalls++;
}
return { text: result.text };
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
console.error(`[populate-agent] agent.generate failed: ${msg}`);
status = "error";
errorMsg = err instanceof Error ? err.message : String(err);
console.error(`[populate-agent] agent.generate failed: ${errorMsg}`);
throw err;
} finally {
const finishedAt = Date.now();
void saveRunMetrics({
workflowRunId: inputData.authContext.workflowRunId,
datasetId: inputData.authorizedDatasetId,
userId: inputData.authContext.authorizedUserId,
startedAt,
finishedAt,
metrics,
status,
error: errorMsg,
isBenchmark: inputData.authContext.isBenchmark,
}).catch((err) =>
console.error(
`[populate-agent] metrics save failed run=${inputData.authContext.workflowRunId}:`,
err,
),
);
}
},
});
Expand Down
Loading