diff --git a/backend/src/mastra/agents/investigate.ts b/backend/src/mastra/agents/investigate.ts index 4cdc32e..3e0d1b8 100644 --- a/backend/src/mastra/agents/investigate.ts +++ b/backend/src/mastra/agents/investigate.ts @@ -1,5 +1,6 @@ import { Agent } from "@mastra/core/agent"; import { createOpenRouter } from "@openrouter/ai-sdk-provider"; +import { wrapModelWithTokenLimit } from "../model-wrapper.js"; import { buildPopulateTools } from "../tools/dataset-tools.js"; import { searchWebTool, fetchPageTool } from "../tools/web-tools.js"; import type { AuthContext } from "../workflows/populate.js"; @@ -28,18 +29,19 @@ RULES: - You have at most 6 tool calls total. Budget them: 1 fetch + 1 search + 1 fetch + 1 insert = done. - ALWAYS insert a row, even if some fields are incomplete. Use "" for unknown fields. Partial real data is better than no row. - Never fabricate values. Use "" for anything you cannot verify. +- For every field value you extract and fill in "data", you MUST record the cell-level provenance (the source URL, the search query used to find it, and the exact text snippet context showing the value) in the "provenance" parameter of insert_row/update_row. - insert_row rejects duplicates based on primary key columns. If you get a "Duplicate" error, do NOT retry — report INSERTED: false and move on. 
TOOL CALL FORMAT — every tool call argument must be a JSON object wrapped in curly braces: search_web: {"query": "your search terms"} fetch_page: {"url": "https://example.com"} - insert_row: {"data": {${columnNames.map((n) => `"${n}": "value"`).join(", ")}}, "sources": ["https://url-you-fetched.com"], "row_summary": "one line about this entity", "how_found": "step by step guide on how to extract the data so an agent in the future can do it too"} + insert_row: {"data": {${columnNames.map((n) => `"${n}": "value"`).join(", ")}}, "sources": ["https://url-you-fetched.com"], "provenance": {${columnNames.map((n) => `"${n}": {"url": "https://url-you-fetched.com", "query": "search query used", "snippet": "exact context snippet from page"}`).join(", ")}}, "row_summary": "one line about this entity", "how_found": "step by step guide on how to extract the data so an agent in the future can do it too"} WORKFLOW: 1. Fetch 1-2 of the provided URLs to get real data (if URLs were given). 2. If you need more, run ONE search and fetch the best result. 3. Call insert_row with whatever real data you have. Use "" for missing fields. - Include "sources" (URLs you fetched), "row_summary" (one line about this entity), and "how_found" (a step by step guide on how you found this data. eg, 1. fetch the contents of this url "", 2. Look for the pricing field, and title name field, 3. etc...) + Include "sources" (URLs you fetched), "provenance" (mapping of column names to their detailed source details), "row_summary" (one line about this entity), and "how_found" (a step by step guide on how you found this data. eg, 1. fetch the contents of this url "", 2. Look for the pricing field, and title name field, 3. etc...) 4. 
Write your final response: INSERTED: true/false SUMMARY: one line @@ -70,7 +72,7 @@ export function buildInvestigateAgent( id: "investigate-agent", name: "Dataset Investigate Agent", instructions: buildInvestigateInstructions(columns), - model: openrouter(modelSlug), + model: wrapModelWithTokenLimit(openrouter(modelSlug)), tools: { insert_row, diff --git a/backend/src/mastra/agents/populate.ts b/backend/src/mastra/agents/populate.ts index 155492a..682489c 100644 --- a/backend/src/mastra/agents/populate.ts +++ b/backend/src/mastra/agents/populate.ts @@ -1,5 +1,6 @@ import { Agent } from "@mastra/core/agent"; import { createOpenRouter } from "@openrouter/ai-sdk-provider"; +import { wrapModelWithTokenLimit } from "../model-wrapper.js"; import { buildSubagentTool } from "../tools/investigate-tool.js"; import { searchWebTool, fetchPageTool } from "../tools/web-tools.js"; import type { AuthContext } from "../workflows/populate.js"; @@ -53,7 +54,7 @@ export function buildPopulateAgent( id: "populate-agent", name: "Dataset Populate Orchestrator", instructions: buildInstructions(maxRowCount), - model: openrouter(modelSlug), + model: wrapModelWithTokenLimit(openrouter(modelSlug)), tools: { search_web: searchWebTool, fetch_page: fetchPageTool, diff --git a/backend/src/mastra/agents/refresh.ts b/backend/src/mastra/agents/refresh.ts index 2215686..fad49bc 100644 --- a/backend/src/mastra/agents/refresh.ts +++ b/backend/src/mastra/agents/refresh.ts @@ -1,5 +1,6 @@ import { Agent } from "@mastra/core/agent"; import { createOpenRouter } from "@openrouter/ai-sdk-provider"; +import { wrapModelWithTokenLimit } from "../model-wrapper.js"; import { buildPopulateTools } from "../tools/dataset-tools.js"; import { searchWebTool, fetchPageTool } from "../tools/web-tools.js"; import type { AuthContext } from "../workflows/populate.js"; @@ -64,7 +65,7 @@ export function buildRefreshAgent( id: "refresh-agent", name: "Dataset Refresh Agent", instructions: buildRefreshInstructions(columns), - 
model: openrouter("qwen/qwen3.7-max"), + model: wrapModelWithTokenLimit(openrouter("qwen/qwen3.7-max")), tools: { update_row, search_web: searchWebTool, diff --git a/backend/src/mastra/model-wrapper.test.ts b/backend/src/mastra/model-wrapper.test.ts new file mode 100644 index 0000000..ebb810a --- /dev/null +++ b/backend/src/mastra/model-wrapper.test.ts @@ -0,0 +1,62 @@ +import test from "node:test"; +import assert from "node:assert"; +import { wrapModelWithTokenLimit } from "./model-wrapper.js"; + +test("wrapModelWithTokenLimit - doGenerate intercepts and caps maxTokens", async () => { + let receivedOptions: any = null; + + const mockModel: any = { + provider: "test-provider", + modelId: "test-model", + doGenerate: async (options: any) => { + receivedOptions = options; + return { text: "mock response" }; + }, + doStream: async (options: any) => { + receivedOptions = options; + return { stream: "mock stream" }; + }, + }; + + const wrapped = wrapModelWithTokenLimit(mockModel, 4096); + + // 1. Default maxTokens when not provided + await wrapped.doGenerate({ prompt: "hello" }); + assert.strictEqual(receivedOptions.maxTokens, 4096); + + // 2. Cap maxTokens when it exceeds the limit + await wrapped.doGenerate({ prompt: "hello", maxTokens: 99999 }); + assert.strictEqual(receivedOptions.maxTokens, 4096); + + // 3. Keep maxTokens when it is below the limit + await wrapped.doGenerate({ prompt: "hello", maxTokens: 1000 }); + assert.strictEqual(receivedOptions.maxTokens, 1000); + + // 4. Test doStream default + await wrapped.doStream({ prompt: "hello" }); + assert.strictEqual(receivedOptions.maxTokens, 4096); + + // 5. Test doStream cap + await wrapped.doStream({ prompt: "hello", maxTokens: 99999 }); + assert.strictEqual(receivedOptions.maxTokens, 4096); + + // 6. 
/**
 * Wraps a language model in a Proxy that caps (or defaults) the output-token
 * budget handed to `doGenerate` and `doStream`.
 *
 * This prevents OpenRouter 402 errors caused by requesting the default 65535
 * output tokens.
 *
 * The budget may reach the model under either `maxTokens` or
 * `maxOutputTokens` (callers in this codebase pass `maxOutputTokens` to
 * `generateText`), so both keys are normalised: a finite numeric value is
 * clamped to `maxTokensLimit`, anything else is replaced by `maxTokensLimit`.
 * The caller's options object is never mutated.
 *
 * @param model - The model instance to wrap. Every property other than
 *   `doGenerate` / `doStream` is forwarded; function properties are bound to
 *   the underlying model so their `this` is the real instance, not the proxy.
 * @param maxTokensLimit - Upper bound (and default) for the output-token
 *   budget. Defaults to 8192.
 * @returns A Proxy around `model` with the capped call options.
 */
export function wrapModelWithTokenLimit(
  model: any,
  maxTokensLimit: number = 8192,
): any {
  // Both spellings of the budget are handled so the cap applies regardless of
  // which call-options shape the underlying model reads.
  const budgetKeys = ["maxTokens", "maxOutputTokens"] as const;

  // Returns a shallow copy of `options` with every budget key capped/defaulted.
  const withCappedBudget = (options: any): any => {
    const capped = { ...options };
    for (const key of budgetKeys) {
      const requested = capped[key];
      // NaN/Infinity are `typeof "number"` but are not usable budgets;
      // treat them like a missing value instead of forwarding them.
      capped[key] =
        typeof requested === "number" && Number.isFinite(requested)
          ? Math.min(requested, maxTokensLimit)
          : maxTokensLimit;
    }
    return capped;
  };

  return new Proxy(model, {
    get(target, prop, receiver) {
      if (prop === "doGenerate" || prop === "doStream") {
        // Call through `target` (not the proxy) so the model's own method
        // runs with its real `this`.
        return async function (options: any) {
          return target[prop](withCappedBudget(options));
        };
      }
      const val = Reflect.get(target, prop, receiver);
      return typeof val === "function" ? val.bind(target) : val;
    },
  });
}
{ howFound: how_found } : {}), }); @@ -265,6 +277,17 @@ export function buildPopulateTools( .array(z.string()) .optional() .describe("Updated source URLs where this data was verified"), + provenance: z + .record( + z.string(), + z.object({ + url: z.string(), + query: z.string().optional(), + snippet: z.string().optional(), + }) + ) + .optional() + .describe("Updated mapping of column names to their detailed source provenance (url, query, snippet)"), row_summary: z .string() .optional() @@ -275,7 +298,7 @@ export function buildPopulateTools( .describe("Brief description of how the updated data was found"), }), outputSchema: writeResultSchema, - execute: async ({ rowId, data, sources, row_summary, how_found }) => { + execute: async ({ rowId, data, sources, provenance, row_summary, how_found }) => { if (!rowId) return { success: false, error: "rowId is required." }; if (!data || Object.keys(data).length === 0) return { @@ -293,6 +316,7 @@ export function buildPopulateTools( expectedDatasetId: authorizedDatasetId, data: cleanedData, ...(sources !== undefined ? { sources } : {}), + ...(provenance !== undefined ? { provenance } : {}), ...(row_summary !== undefined ? { rowSummary: row_summary } : {}), ...(how_found !== undefined ? 
{ howFound: how_found } : {}), }); diff --git a/backend/src/mastra/workflows/populate.ts b/backend/src/mastra/workflows/populate.ts index 35db3b1..917dafe 100644 --- a/backend/src/mastra/workflows/populate.ts +++ b/backend/src/mastra/workflows/populate.ts @@ -2,6 +2,7 @@ import { createStep, createWorkflow } from "@mastra/core/workflows"; import { z } from "zod"; import { generateText } from "ai"; import { createOpenRouter } from "@openrouter/ai-sdk-provider"; +import { wrapModelWithTokenLimit } from "../model-wrapper.js"; import { datasetContextSchema, populateColumnSchema } from "../../pipeline/populate.js"; import { convex, internal } from "../../convex.js"; import { DEFAULT_MODEL_IDS } from "../../config/models.js"; @@ -114,7 +115,7 @@ Respond with EXACTLY one word: scraper or search`; const modelSlug = inputData.authContext?.modelConfig?.schemaInference ?? DEFAULT_MODEL_IDS.SCHEMA_INFERENCE; const result = await generateText({ - model: openrouter(modelSlug), + model: wrapModelWithTokenLimit(openrouter(modelSlug)), prompt: classificationPrompt, maxOutputTokens: 10, abortSignal: getSignal(inputData.datasetId), diff --git a/backend/src/pipeline/schema-inference.ts b/backend/src/pipeline/schema-inference.ts index 1f1ea2a..a70e9ba 100644 --- a/backend/src/pipeline/schema-inference.ts +++ b/backend/src/pipeline/schema-inference.ts @@ -1,5 +1,6 @@ import { generateText, Output, NoObjectGeneratedError } from "ai"; import { createOpenRouter } from "@openrouter/ai-sdk-provider"; +import { wrapModelWithTokenLimit } from "../mastra/model-wrapper.js"; import { DEFAULT_MODEL_IDS } from "../config/models.js"; import { datasetSchemaSchema, type DatasetSchema } from "./types.js"; @@ -33,7 +34,7 @@ function getModel(modelSlug?: string) { } const openrouter = createOpenRouter({ apiKey }); const resolvedSlug = modelSlug ?? 
DEFAULT_MODEL_IDS.SCHEMA_INFERENCE; - return openrouter(resolvedSlug); + return wrapModelWithTokenLimit(openrouter(resolvedSlug)); } export async function inferSchema(prompt: string, modelSlug?: string): Promise { diff --git a/frontend/app/dataset/[id]/page.tsx b/frontend/app/dataset/[id]/page.tsx index d28abda..d04a6f2 100644 --- a/frontend/app/dataset/[id]/page.tsx +++ b/frontend/app/dataset/[id]/page.tsx @@ -43,6 +43,11 @@ export default function DatasetPage() { column: DatasetColumn; value: unknown; sources?: string[]; + provenance?: { + url: string; + query?: string; + snippet?: string; + }; } | null>(null); const datasetId = params.id as Id<"datasets">; @@ -103,7 +108,12 @@ export default function DatasetPage() { const col = dataset.columns.find((c) => c.name === columnName); if (!col) return; const row = rows.find((r) => r._id === rowId); - setCellDetail({ column: col, value, sources: row?.sources }); + setCellDetail({ + column: col, + value, + sources: row?.sources, + provenance: row?.provenance?.[columnName], + }); }, [dataset, rows]); const openedFired = useRef(null); @@ -462,6 +472,7 @@ export default function DatasetPage() { column={cellDetail.column} value={cellDetail.value} sources={cellDetail.sources} + provenance={cellDetail.provenance} /> )} diff --git a/frontend/components/SideSheet.tsx b/frontend/components/SideSheet.tsx index 215d477..972631c 100644 --- a/frontend/components/SideSheet.tsx +++ b/frontend/components/SideSheet.tsx @@ -119,6 +119,12 @@ interface CellDetailProps { value: unknown; /** Row-level sources stored by the populate agent. */ sources?: string[]; + /** Cell-level provenance metadata. 
*/ + provenance?: { + url: string; + query?: string; + snippet?: string; + }; } function isValidHttpUrl(src: string): boolean { @@ -130,7 +136,7 @@ function isValidHttpUrl(src: string): boolean { } } -export function CellDetail({ column, value, sources }: CellDetailProps) { +export function CellDetail({ column, value, sources, provenance }: CellDetailProps) { const [copied, setCopied] = useState(false); const copyTimerRef = useRef | null>(null); const displayValue = value == null || value === "" ? "—" : String(value); @@ -192,6 +198,65 @@ export function CellDetail({ column, value, sources }: CellDetailProps) { + {/* Cell Provenance */} + {provenance && ( +
+
+ + + + Verified Source Origin +
+ +
+ {/* Source URL */} +
+

Source URL

+ {isValidHttpUrl(provenance.url) ? ( + + + {provenance.url} + + ) : ( +

+ {provenance.url} +

+ )} +
+ + {/* Search Query */} + {provenance.query && ( +
+

Search Query Used

+
+ + + + {provenance.query} +
+
+ )} + + {/* Text Snippet */} + {provenance.snippet && ( +
+

Snippet Context

+
+ +

{provenance.snippet}

+
+
+ )} +
+
+ )} + {/* Sources */} {sources && sources.length > 0 && (
diff --git a/frontend/components/table/DataRow.tsx b/frontend/components/table/DataRow.tsx index 2d54661..0a456d5 100644 --- a/frontend/components/table/DataRow.tsx +++ b/frontend/components/table/DataRow.tsx @@ -117,6 +117,7 @@ function DataRowImpl({ const value = row.original.data[col.name]; const isPending = pendingRowIds.has(row.original._id); const isFlashing = flashingCells.has(`${row.original._id}:${col.name}`); + const hasProvenance = !!row.original.provenance?.[col.name]; return (
- +
+
+ +
+ {hasProvenance && ( + + )} +