Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ OPENROUTER_API_KEY=sk-or-...
# Generate at https://agent.tinyfish.ai/api-keys
TINYFISH_API_KEY=

# Populate agent row cap (optional). The populate agent stops when this many
# fully-complete rows have been inserted. Defaults to 20 when unset.
# Increase for larger datasets; decrease for faster/cheaper test runs.
BIGSET_POPULATE_TARGET_ROWS=20

# Generate once after the first `make dev` with:
# docker compose exec convex ./generate_admin_key.sh
# Used by the backend container to call internal Convex functions.
Expand Down
31 changes: 25 additions & 6 deletions backend/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,35 @@ The pipeline is a pure function (`inferSchema(prompt) → DatasetSchema`). It is

`src/mastra/` — wraps pipelines into Mastra workflows. Runs as a separate Docker service on :4111 with `mastra dev`, which provides a Studio UI for inspecting and testing workflows.

- `src/mastra/index.ts` — registers workflows with the `Mastra` instance (the populate agent is built per-run, not registered as a singleton)
- `src/mastra/index.ts` — registers workflows with the `Mastra` instance (agents are built per-run, not registered as singletons)
- `src/mastra/workflows/infer-schema.ts` — `inferSchemaWorkflow`, a single-step workflow wrapping `inferSchema()`
- `src/mastra/workflows/populate.ts` — `populateWorkflow`, 3-step workflow: clear rows → build prompt → run populate agent
- `src/mastra/agents/populate.ts` — `buildPopulateAgent(authorizedDatasetId, authContext, columns)`, builds the orchestrator agent (Claude Sonnet 4.6) with 3 tools: `search_web`, `fetch_page`, `investigate_row`. No write access — all inserts go through investigate subagents.
- `src/mastra/agents/investigate.ts` — `buildInvestigateAgent(authorizedDatasetId, authContext, columns)`, builds a per-entity subagent with `insert_row`, `list_rows`, `search_web`, `fetch_page`. Researches one entity, inserts at most one row, returns structured feedback (`INSERTED/SUMMARY/CLUES/REASON`).
- `src/mastra/tools/investigate-tool.ts` — `buildInvestigateTool(authorizedDatasetId, authContext, columns)` creates the `investigate_row` tool. The orchestrator calls it to hand off a lead; it spawns a fresh investigate agent, runs it (maxSteps: 25), parses the structured output, and returns it to the orchestrator. Errors are caught and returned as structured failures so the orchestrator can self-correct.
- `src/mastra/tools/dataset-tools.ts` — `buildPopulateTools(authorizedDatasetId, authContext)` factory returning 5 Convex-backed tools: `insert_row`, `list_rows`, `get_row`, `update_row`, `delete_row`. The dataset id is captured by closure so the LLM cannot redirect writes to other datasets; `authContext` (Clerk userId + workflow run id) is captured for caller-attribution in security logs and the `CAPABILITY_VIOLATION` PostHog event. See the security note at the top of the file.

### Tri-agent architecture

The populate pipeline uses three layers of agents, each with a narrow scope:

1. **Populate Orchestrator** (`src/mastra/agents/populate.ts`) — `buildPopulateAgent(authorizedDatasetId, authContext, columns, targetRows)`. Per-iteration: (1) runs parallel searches, (2) batches qualifying URLs and calls `extract_rows` in parallel (up to 5 URLs per call), (3) calls `list_rows` once to see all rows and which are incomplete, (4) calls `investigate_entity` in parallel for every incomplete row. Stops when `targetRows` complete rows are reached or 2 consecutive stagnant iterations occur.

2. **Extract Agent** (`src/mastra/agents/extract.ts`) — `buildExtractAgent(columns, primaryKeyColumn, batchInsertRowsTool)`. Receives exactly 1 URL. Fetches the page, extracts every matching entity, and calls `batch_insert_rows` once. Returns LEADS/SOURCE_QUALITY for the orchestrator's next search round. No triage step, no investigation — purely fetch → extract → insert. Orchestrator instructions prefer single-page editorial sources over paginated browse directories to avoid multi-fetch spirals.

3. **Investigate Agent** (`src/mastra/agents/investigate.ts`) — `buildInvestigateAgent(columns, primaryKeyColumn, updateRowByKeyTool)`. Researches ONE specific entity to fill its missing columns. Has `search_web` + `fetch_page` + `update_row_by_key`. Returns structured output (`INSERTED: false / SUMMARY / CLUES / REASON`).

### Tool factories

- `src/mastra/tools/investigate-tool.ts` — `buildExtractTool(authorizedDatasetId, authContext, columns, targetRows)` returns `{ extractRowsTool, listRowsTool, investigateEntityTool }`. All three share a single in-memory `rowIndex` (Map of primary-key → `{rowId, confidence, cells}`) and a `pendingInserts` Set. `extract_rows` dispatches one URL to a fresh extract agent (maxSteps: 20); the extract agent prompt receives only a compact row summary (count + 30 sample primary keys) rather than the full row dump — dedup is handled by the tool, not the agent. `list_rows` returns a compact text summary for the orchestrator; `investigate_entity` (exposed to the orchestrator, not to extract agents) spawns a fresh investigate agent (maxSteps: 20). `pendingInserts` prevents two parallel extract agents from double-inserting the same entity — the check+add is atomic in JS's single-threaded event loop. A global `Semaphore(10)` caps concurrent investigate agents. The rowIndex refresh loop at the start of each `extract_rows` call picks up rows written by other parallel agents since the last refresh.
- `src/mastra/tools/dataset-tools.ts` — `buildPopulateTools(authorizedDatasetId, authContext)` factory returning 5 Convex-backed tools: `insert_row`, `list_rows`, `get_row`, `update_row`, `delete_row`. Not used by the populate agent itself — used by other callers. The dataset id is captured by closure so the LLM cannot redirect writes to other datasets; `authContext` (Clerk userId + workflow run id) is captured for caller-attribution in security logs and the `CAPABILITY_VIOLATION` PostHog event. See the security note at the top of the file.
- `src/mastra/tools/web-tools.ts` — 2 TinyFish API tools: `search_web`, `fetch_page`

The populate workflow builds a fresh orchestrator per run via `buildPopulateAgent(...)` and calls `.generate(prompt, { maxSteps: 80 })`. The orchestrator spawns up to 3 investigate subagents in parallel via `investigate_row`. Per-run construction is required by the capability-scoping security model (closure-bound dataset id); do not cache or share agents across runs.
### Confidence and merge semantics

`update_row_by_key` uses per-field blank-aware merge rules, enforced atomically in the `datasetRows.mergeUpdate` Convex mutation:
- **Blank cells**: always filled with any non-empty incoming value, regardless of confidence.
- **Non-blank cells**: only overwritten when the new confidence is strictly higher than the row's existing confidence.

The authoritative check lives in Convex (not in the tool layer) because the in-memory `rowIndex` is stale during parallel agent runs. Two concurrent investigate agents reading the same cached confidence could both pass a client-side check, and the slower/weaker write could win. Moving the compare-and-merge into a single Convex transaction eliminates that race.

The populate workflow builds a fresh orchestrator per run via `buildPopulateAgent(...)` and calls `.generate(prompt, { maxSteps: 80 })`. Per-run construction is required by the capability-scoping security model (closure-bound dataset id); do not cache or share agents across runs.

All tools return structured error messages (not thrown exceptions) so the agent can self-correct.

Expand Down
39 changes: 13 additions & 26 deletions backend/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@
"@types/node": "^22.0.0",
"mastra": "^1.10.0",
"tsx": "^4.0.0",
"typescript": "^5.0.0"
"typescript": "^5.8.3"
}
}
9 changes: 9 additions & 0 deletions backend/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ export const env = {

OPENROUTER_API_KEY: process.env.OPENROUTER_API_KEY,

// Hard cap on the number of fully-complete rows the populate agent will
// insert per run. The agent stops as soon as this count is reached.
// Override with BIGSET_POPULATE_TARGET_ROWS=N in the root .env file.
// Invalid values (NaN, ≤0, non-integer) fall back to the default of 20.
POPULATE_TARGET_ROWS: (() => {
const parsed = Number(process.env.BIGSET_POPULATE_TARGET_ROWS);
return Number.isFinite(parsed) && parsed > 0 ? Math.floor(parsed) : 20;
})(),

// Resend (transactional email). Optional — when RESEND_API_KEY is unset
// the email module no-ops with a log line, so local dev works without
// a Resend account. EMAIL_FROM must be a domain that's verified in the
Expand Down
20 changes: 20 additions & 0 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,26 @@ async function runPopulateWorkflowInBackground({
return;
}

// ── Prune incomplete rows ────────────────────────────────────────
// Delete any row the agent inserted but never fully filled, so only
// complete rows appear in the live dataset. Best-effort: log on
// failure but don't block the status transition.
try {
const columnNames = input.columns.map((c) => c.name);
const { deletedCount } = await convex.mutation(
internal.datasetRows.deleteIncomplete,
{ datasetId, columnNames },
);
if (deletedCount > 0) {
logger.info({ deletedCount, datasetId }, "Pruned incomplete rows post-workflow");
}
} catch (pruneErr) {
logger.warn(
{ err: pruneErr, datasetId },
"Failed to prune incomplete rows; proceeding with status transition",
);
}

const rowCount = await convex.query(
internal.datasetRows.countByDataset,
{ datasetId },
Expand Down
87 changes: 87 additions & 0 deletions backend/src/mastra/agents/extract.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { Agent } from "@mastra/core/agent";
import { createOpenRouter } from "@openrouter/ai-sdk-provider";
import type { PopulateColumn } from "../../pipeline/populate.js";

const openrouter = createOpenRouter({
apiKey: process.env.OPENROUTER_API_KEY!,
});

function buildExtractInstructions(
columns: PopulateColumn[],
primaryKeyColumn: string,
): string {
const columnNames = columns.map((c) => c.name);
const columnsDesc = columns
.map(
(c) =>
`- "${c.name}" (${c.type})${c.description ? `: ${c.description}` : ""}`,
)
.join("\n");

return `You receive exactly ONE URL. Your entire job fits in 2 tool calls.

━━ HARD BUDGET ━━
Tool call 1: fetch_page — call it ONCE for the URL in your prompt.
Tool call 2: batch_insert_rows — call it ONCE with every entity you found.
That's it. 2 tool calls total. Do not make any other tool calls.

━━ STRICT CONSTRAINTS ━━
- Do NOT call fetch_page more than once. No pagination. No following links.
If the page is paginated, extract only what is on the first response.
Add the other page URLs (e.g. ?page=2) to LEADS — do not fetch them yourself.
- Do NOT call batch_insert_rows more than once.
- If no matching entities were found, skip batch_insert_rows entirely and go straight to FINAL OUTPUT.

━━ DATASET SCHEMA ━━
Columns:
${columnsDesc}

Primary key column: "${primaryKeyColumn}"
Tool call data/sources keys MUST be exactly: ${JSON.stringify(columnNames)}

━━ PROCEDURE ━━
1. Call fetch_page for the URL in your prompt. (tool call 1)
2. Read the content. Extract every entity that matches the schema.
- Use "" for any column you cannot confirm from this page. Never fabricate.
- Record the page URL as source for every column you fill.
3. Call batch_insert_rows with all entities in one call. (tool call 2)
4. Write FINAL OUTPUT.

━━ FINAL OUTPUT ━━
After all tool calls are done, write a summary with exactly these labels:

LEADS: <list each URL on its own line with a dash (- https://...);
include pagination URLs you did NOT fetch, related list pages you noticed,
and search queries that would find more entities of this type>
SOURCE_QUALITY: <brief assessment: data richness, entity coverage, reliability>`;
}

/**
* Build a fresh extract Agent for one extract_rows call.
*
* The agent receives one URL, fetches the page, extracts every matching
* entity, and calls batch_insert_rows once with the full entity list.
*
* Both fetchTool and batchInsertRowsTool are passed in (not imported here)
* so investigate-tool.ts can supply a single-use fetch_page wrapper that
* enforces the "one fetch per agent" hard limit at the code level.
*
* A fresh agent instance is constructed per extract_rows call; do not cache.
*/
export function buildExtractAgent(
columns: PopulateColumn[],
primaryKeyColumn: string,
batchInsertRowsTool: ReturnType<typeof import("@mastra/core/tools").createTool>,
fetchTool: ReturnType<typeof import("@mastra/core/tools").createTool>,
): Agent {
return new Agent({
id: "extract-agent",
name: "Dataset Extract Agent",
instructions: buildExtractInstructions(columns, primaryKeyColumn),
model: openrouter("deepseek/deepseek-v4-pro"),
tools: {
fetch_page: fetchTool,
batch_insert_rows: batchInsertRowsTool,
},
});
}
Loading