Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 145 additions & 23 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,18 @@ type DatasetPopulateBeginOutcome =
| "started"
| "not_found"
| "forbidden"
| "already_building";
| "already_building"
| "already_updating";
type PopulateWorkflowRun = Awaited<ReturnType<typeof populateWorkflow.createRun>>;

/**
 * Outcome reported when trying to claim a dataset for an update run.
 * Only "started" means the claim succeeded; every other value names the
 * reason the update must not be launched.
 */
type DatasetUpdateBeginOutcome =
  | "started"
  | ("not_found" | "forbidden")
  | ("already_building" | "already_updating");

/** The run handle that `updateWorkflow.createRun()` resolves to. */
type UpdateWorkflowRun = Awaited<
  ReturnType<typeof updateWorkflow.createRun>
>;

function statusErrorMessage(err: unknown): string {
const message = err instanceof Error ? err.message : String(err);
return message.slice(0, 500);
Expand Down Expand Up @@ -64,19 +73,21 @@ async function sendDatasetReadyNotification({
datasetId,
datasetName,
rowCount,
workflowType = "populate",
}: {
logger: FastifyBaseLogger;
clerk: ClerkClient;
userId: string;
datasetId: string;
datasetName: string;
rowCount: number;
workflowType?: "populate" | "update";
}): Promise<void> {
const baseProps = {
datasetId,
datasetName,
rowCount,
workflowType: "populate" as const,
workflowType,
};

try {
Expand Down Expand Up @@ -130,6 +141,106 @@ async function sendDatasetReadyNotification({
}
}

/**
 * Runs the `beginUpdateInternal` Convex mutation for a dataset on behalf of
 * an owner and hands back the outcome the mutation reports.
 *
 * @param datasetId - Id of the dataset the caller wants to update.
 * @param ownerId - Id of the user requesting the update.
 * @returns The `outcome` field of the mutation result.
 */
async function beginDatasetUpdate(
  datasetId: string,
  ownerId: string,
): Promise<DatasetUpdateBeginOutcome> {
  const { outcome } = await convex.mutation(
    internal.datasets.beginUpdateInternal,
    { id: datasetId, ownerId },
  );
  return outcome;
}

/**
 * Serializes workflow step output for a log line, capped at 2000 characters.
 * Never throws: `JSON.stringify` returns `undefined` for `undefined` input and
 * throws on circular structures or BigInt values, and a logging problem must
 * not be mistaken for a workflow failure.
 */
function serializeUpdateStepsForLog(steps: unknown): string {
  try {
    return (JSON.stringify(steps) ?? "").slice(0, 2000);
  } catch {
    return "[unserializable workflow steps]";
  }
}

/**
 * Starts an update workflow run and records its outcome on the dataset.
 *
 * On success the dataset status is set to "live" and a "dataset ready"
 * notification (workflowType "update") is sent. On failure the status is set
 * to "failed" together with a truncated error message. If the dataset no
 * longer exists when the run finishes, no status transition is attempted.
 *
 * The route handler launches this with `void`, so every failure is handled
 * and logged here instead of being propagated.
 *
 * @param input - Dataset context forwarded to the workflow as input data.
 * @param run - Workflow run created by `updateWorkflow.createRun()`.
 * @param authorizedUserId - User on whose behalf the workflow runs; also the
 *   notification recipient.
 * @param logger - Request logger used for progress and error reporting.
 * @param clerk - Clerk client passed through to the notification helper.
 */
async function runUpdateWorkflowInBackground({
  input,
  run,
  authorizedUserId,
  logger,
  clerk,
}: {
  input: DatasetContext;
  run: UpdateWorkflowRun;
  authorizedUserId: string;
  logger: FastifyBaseLogger;
  clerk: ClerkClient;
}): Promise<void> {
  const datasetId = input.datasetId;

  try {
    const result = await run.start({
      inputData: {
        ...input,
        authContext: {
          authorizedUserId,
          workflowRunId: run.runId,
        },
      },
    });

    logger.info(
      {
        workflowStatus: result.status,
        steps: serializeUpdateStepsForLog(result.steps),
      },
      "Update workflow completed",
    );

    if (result.status !== "success") {
      throw new Error(`Workflow ended with status: ${result.status}`);
    }

    const currentDataset = await convex.query(internal.datasets.getInternal, {
      id: datasetId,
    });
    if (!currentDataset) {
      logger.info(
        { datasetId },
        "Dataset no longer exists post-update; skipping status transition",
      );
      return;
    }

    await setDatasetPopulateStatus(datasetId, "live");

    // The update itself has succeeded at this point. Counting rows and sending
    // the notification are best-effort: a failure here must not fall through
    // to the outer catch, which would flip the dataset to "failed".
    try {
      const rowCount = await convex.query(
        internal.datasetRows.countByDataset,
        { datasetId },
      );
      await sendDatasetReadyNotification({
        logger,
        clerk,
        userId: authorizedUserId,
        datasetId,
        datasetName: currentDataset.name,
        rowCount,
        workflowType: "update",
      });
    } catch (notifyErr) {
      logger.error(
        { err: notifyErr, datasetId },
        "Dataset updated but the ready notification could not be sent",
      );
    }
  } catch (err) {
    const lastStatusError = statusErrorMessage(err);
    logger.error({ err, datasetId }, "Update background workflow failed");

    try {
      // Re-read the dataset: it may have been deleted while the run was active.
      const currentDataset = await convex.query(internal.datasets.getInternal, {
        id: datasetId,
      });
      if (!currentDataset) {
        logger.info(
          { datasetId },
          "Dataset no longer exists after failed update; skipping failed status transition",
        );
        return;
      }
      await setDatasetPopulateStatus(datasetId, "failed", lastStatusError);
    } catch (statusErr) {
      logger.error(
        { err: statusErr, datasetId },
        "Failed to transition dataset status to 'failed' after update",
      );
    }
  }
}

async function runPopulateWorkflowInBackground({
input,
run,
Expand Down Expand Up @@ -350,34 +461,45 @@ await fastify.register(async (instance) => {
return reply.code(401).send({ error: "Authentication required" });
}

const dataset = await convex.query(internal.datasets.getInternal, {
id: parsed.data.datasetId,
});
if (!dataset) {
const updateOutcome = await beginDatasetUpdate(
parsed.data.datasetId,
auth.userId,
);

if (updateOutcome === "not_found") {
return reply.code(404).send({ error: "Dataset not found" });
}
if (dataset.ownerId !== auth.userId) {
if (updateOutcome === "forbidden") {
return reply.code(403).send({ error: "Not authorized to update this dataset" });
}
if (updateOutcome === "already_building") {
return reply.code(409).send({ error: "Dataset is being populated" });
}
if (updateOutcome === "already_updating") {
return reply.code(409).send({ error: "Dataset is already being updated" });
}
if (updateOutcome !== "started") {
throw new Error(`Unexpected update claim outcome: ${updateOutcome}`);
}

const run = await updateWorkflow.createRun();
const result = await run.start({
inputData: {
...parsed.data,
authContext: {
authorizedUserId: auth.userId,
workflowRunId: run.runId,
},
},
});

req.log.info({ workflowStatus: result.status }, "Update workflow completed");

if (result.status !== "success") {
throw new Error(`Workflow ended with status: ${result.status}`);
let run: UpdateWorkflowRun;
try {
run = await updateWorkflow.createRun();
} catch (runErr) {
req.log.error(runErr, "Failed to create update workflow run; reverting dataset status");
await setDatasetPopulateStatus(parsed.data.datasetId, "live");
return reply.code(502).send({ error: "Failed to update dataset. Please try again." });
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

return { success: true, result: result.result };
void runUpdateWorkflowInBackground({
input: parsed.data,
run,
authorizedUserId: auth.userId,
logger: req.log,
clerk: req.server.clerk,
});

return reply.code(202).send({ success: true, runId: run.runId });
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
if (msg.includes("validator") || msg.includes("Invalid")) {
Expand Down
6 changes: 3 additions & 3 deletions backend/src/mastra/agents/investigate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,19 @@ RULES:
TOOL CALL FORMAT — every tool call argument must be a JSON object wrapped in curly braces:
search_web: {"query": "your search terms"}
fetch_page: {"url": "https://example.com"}
insert_row: {"data": {${columnNames.map((n) => `"${n}": "value"`).join(", ")}}}
insert_row: {"data": {${columnNames.map((n) => `"${n}": "value"`).join(", ")}}, "sources": ["https://url-you-fetched.com"], "row_summary": "one line about this entity", "how_found": "step by step guide on how to extract the data so an agent in the future can do it too"}

WORKFLOW:
1. Fetch 1-2 of the provided URLs to get real data (if URLs were given).
2. If you need more, run ONE search and fetch the best result.
3. Call insert_row with whatever real data you have. Use "" for missing fields.
Include "sources" (URLs you fetched), "row_summary" (one line about this entity), and "how_found" (a step by step guide on how you found this data. eg, 1. fetch the contents of this url "<insert url>", 2. Look for the pricing field, and title name field, 3. etc...)
4. Write your final response:
INSERTED: true/false
SUMMARY: one line
CLUES: hints for finding more entities
REASON: why you succeeded or what was missing

You are scoped to ONE dataset. Do not pass a datasetId to any tool.`;
`;
}

/**
Expand Down
74 changes: 74 additions & 0 deletions backend/src/mastra/agents/refresh.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { Agent } from "@mastra/core/agent";
import { createOpenRouter } from "@openrouter/ai-sdk-provider";
import { buildPopulateTools } from "../tools/dataset-tools.js";
import { searchWebTool, fetchPageTool } from "../tools/web-tools.js";
import type { AuthContext } from "../workflows/populate.js";
import type { PopulateColumn } from "../../pipeline/populate.js";

// OpenRouter provider used by buildRefreshAgent to select the agent's model.
// The `!` only satisfies the type checker: when OPENROUTER_API_KEY is unset,
// `apiKey` is `undefined` at runtime.
const openRouterApiKey = process.env.OPENROUTER_API_KEY!;
const openrouter = createOpenRouter({ apiKey: openRouterApiKey });

/**
 * Builds the system prompt for the refresh agent from the dataset's columns.
 *
 * The prompt lists every column (name, type, primary-key marker, optional
 * description) and includes a sample `update_row` call whose `data` object
 * has one `"<column>": "value"` entry per column.
 *
 * @param columns - Column definitions of the dataset being refreshed.
 * @returns The complete instruction text.
 */
function buildRefreshInstructions(columns: PopulateColumn[]): string {
  const columnsDesc = columns
    .map((c) => {
      const keyTag = c.isPrimaryKey ? " [PRIMARY KEY]" : "";
      const detail = c.description ? `: ${c.description}` : "";
      return `- "${c.name}" (${c.type})${keyTag}${detail}`;
    })
    .join("\n");

  // JSON.stringify escapes quotes and backslashes in column names so the
  // sample tool call stays valid JSON; ordinary names render exactly as
  // "name": "value".
  const exampleData = columns
    .map((c) => `${JSON.stringify(String(c.name))}: "value"`)
    .join(", ");

  return `You are refreshing data for one existing row. Be fast — you have very few steps.

Columns:
${columnsDesc}

RULES:
- You have at most 6 tool calls total.
- Start by following the steps in the "Previously found via" section — it describes exactly how the data was originally extracted (which URLs to fetch, which fields to look for). Reproduce those steps to get fresh data.
- If no "Previously found via" steps are provided, fall back to fetching the source URLs directly.
- If a source returns a 404, timeout, or is blocked, note it and move to the next.
- Compare the fetched data with the existing row data carefully.
- If data has MEANINGFULLY changed (not just formatting differences), call update_row with the FULL updated data object (all columns, not just changed ones), plus updated sources, row_summary, and how_found.
- If NO sources work (all 404/blocked), try ONE web search using the primary key values to find a current source.
- If the data is unchanged, do NOT call update_row. Just report your findings.
- Never fabricate values. If you can't verify a field, keep the existing value.

TOOL CALL FORMAT — every tool call argument must be a JSON object wrapped in curly braces:
fetch_page: {"url": "https://example.com"}
search_web: {"query": "your search terms"}
update_row: {"rowId": "<id>", "data": {${exampleData}}, "sources": ["https://..."], "row_summary": "one line about this entity", "how_found": "how you verified this data"}

WORKFLOW:
1. Fetch the provided source URLs (1-2 calls).
2. Compare fetched data with existing row data.
3. If changed: call update_row with the full updated data.
If sources broken: try ONE search using primary key values, then fetch the best result.
4. Write your final response:
UPDATED: true/false
CHANGES: what changed (or "no changes")
REASON: why you updated or didn't
`;
}

/**
 * Creates the agent that refreshes existing rows of one dataset.
 *
 * The agent receives the refresh instructions generated from `columns` and
 * three tools: `update_row` (taken from the populate tools built for
 * `authorizedDatasetId` and `authContext`), `search_web`, and `fetch_page`.
 *
 * @param authorizedDatasetId - Dataset id passed to `buildPopulateTools`.
 * @param authContext - Auth context passed to `buildPopulateTools`.
 * @param columns - Column definitions used to render the instructions.
 * @returns A configured `Agent` instance.
 */
export function buildRefreshAgent(
  authorizedDatasetId: string,
  authContext: AuthContext,
  columns: PopulateColumn[],
): Agent {
  const populateTools = buildPopulateTools(authorizedDatasetId, authContext);
  const instructions = buildRefreshInstructions(columns);

  return new Agent({
    id: "refresh-agent",
    name: "Dataset Refresh Agent",
    instructions,
    model: openrouter("qwen/qwen3.7-max"),
    tools: {
      update_row: populateTools.update_row,
      search_web: searchWebTool,
      fetch_page: fetchPageTool,
    },
  });
}
39 changes: 33 additions & 6 deletions backend/src/mastra/tools/dataset-tools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,21 @@ export function buildPopulateTools(
"Insert a single row into the dataset you are populating. Call this each time you have a row ready — don't wait to batch them.",
inputSchema: z.object({
data: z.record(z.string(), z.any()),
sources: z
.array(z.string())
.optional()
.describe("URLs you visited or used to gather data for this row"),
row_summary: z
.string()
.optional()
.describe("One-line summary of this entity"),
how_found: z
.string()
.optional()
.describe("Brief description of how you found and verified this data"),
}),
outputSchema: writeResultSchema,
execute: async ({ data }) => {
execute: async ({ data, sources, row_summary, how_found }) => {
if (!data || Object.keys(data).length === 0)
return {
success: false,
Expand All @@ -139,12 +151,15 @@ export function buildPopulateTools(

const cleanedData = cleanDataKeys(data);
console.log(
`[insert_row] ${logCtx} cols=${Object.keys(cleanedData).length}`,
`[insert_row] ${logCtx} cols=${Object.keys(cleanedData).length} sources=${sources?.length ?? 0}`,
);
try {
await convex.mutation(internal.datasetRows.insert, {
datasetId: authorizedDatasetId,
data: cleanedData,
...(sources !== undefined ? { sources } : {}),
...(row_summary !== undefined ? { rowSummary: row_summary } : {}),
...(how_found !== undefined ? { howFound: how_found } : {}),
});
return { success: true };
} catch (err) {
Expand Down Expand Up @@ -246,9 +261,21 @@ export function buildPopulateTools(
inputSchema: z.object({
rowId: z.string(),
data: z.record(z.string(), z.any()),
sources: z
.array(z.string())
.optional()
.describe("Updated source URLs where this data was verified"),
row_summary: z
.string()
.optional()
.describe("Updated one-line summary of this entity"),
how_found: z
.string()
.optional()
.describe("Brief description of how the updated data was found"),
}),
outputSchema: writeResultSchema,
execute: async ({ rowId, data }) => {
execute: async ({ rowId, data, sources, row_summary, how_found }) => {
if (!rowId) return { success: false, error: "rowId is required." };
if (!data || Object.keys(data).length === 0)
return {
Expand All @@ -261,13 +288,13 @@ export function buildPopulateTools(
`[update_row] ${logCtx} row=${rowId} cols=${Object.keys(cleanedData).length}`,
);
try {
// expectedDatasetId pins the Convex-side atomic capability check.
// If `rowId` belongs to another dataset, the mutation throws
// "Row not found" — uniform with the get_row policy.
await convex.mutation(internal.datasetRows.update, {
id: rowId,
expectedDatasetId: authorizedDatasetId,
data: cleanedData,
...(sources !== undefined ? { sources } : {}),
...(row_summary !== undefined ? { rowSummary: row_summary } : {}),
...(how_found !== undefined ? { howFound: how_found } : {}),
});
return { success: true };
} catch (err) {
Expand Down
Loading