Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions backend/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"dotenv": "^16.4.0",
"fastify": "^5.0.0",
"fastify-plugin": "^5.1.0",
"playwright-core": "^1.60.0",
"posthog-node": "^5.35.1",
"resend": "^6.12.3",
"zod": "^4.4.3"
Expand Down
59 changes: 59 additions & 0 deletions backend/src/config/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,46 @@ export const DEFAULT_MODEL_IDS = {
INVESTIGATE_SUBAGENT: env.INVESTIGATE_SUBAGENT_MODEL,
} as const;

const ROW_EXTRACTOR_CONCURRENCY_MIN = 1;
export const ROW_EXTRACTOR_CONCURRENCY_MAX = 100;
const ROW_EXTRACTOR_BROWSER_ATTEMPTS_MIN = 1;
export const ROW_EXTRACTOR_BROWSER_ATTEMPTS_MAX = 10;

export function normalizeRowExtractorConcurrency(value: unknown): number {
return normalizeIntegerSetting(
value,
5,
ROW_EXTRACTOR_CONCURRENCY_MIN,
ROW_EXTRACTOR_CONCURRENCY_MAX,
);
}

export function normalizeRowExtractorBrowserAttempts(value: unknown): number {
return normalizeIntegerSetting(
value,
2,
ROW_EXTRACTOR_BROWSER_ATTEMPTS_MIN,
ROW_EXTRACTOR_BROWSER_ATTEMPTS_MAX,
);
}

function normalizeIntegerSetting(
value: unknown,
fallback: number,
min: number,
max: number,
): number {
const parsed = typeof value === "number" ? value : Number(value);
if (!Number.isFinite(parsed)) return fallback;
return Math.min(max, Math.max(min, Math.trunc(parsed)));
}

const DEFAULT_ROW_EXTRACTOR_CONCURRENCY = normalizeRowExtractorConcurrency(
env.ROW_EXTRACTOR_CONCURRENCY,
);
const DEFAULT_ROW_EXTRACTOR_BROWSER_ATTEMPTS =
normalizeRowExtractorBrowserAttempts(env.ROW_EXTRACTOR_BROWSER_ATTEMPTS);

/**
* Model roles for the settings UI.
*/
Expand Down Expand Up @@ -96,13 +136,23 @@ export async function upsertModelConfig(
schemaInference?: string;
populateOrchestrator?: string;
investigateSubagent?: string;
rowExtractorConcurrency?: number;
rowExtractorBrowserAttempts?: number;
}
): Promise<void> {
await convex.mutation(internal.modelConfig.upsertInternal, {
userId,
schemaInference: config.schemaInference ?? undefined,
populateOrchestrator: config.populateOrchestrator ?? undefined,
investigateSubagent: config.investigateSubagent ?? undefined,
rowExtractorConcurrency:
config.rowExtractorConcurrency !== undefined
? normalizeRowExtractorConcurrency(config.rowExtractorConcurrency)
: undefined,
rowExtractorBrowserAttempts:
config.rowExtractorBrowserAttempts !== undefined
? normalizeRowExtractorBrowserAttempts(config.rowExtractorBrowserAttempts)
: undefined,
});
}

Expand All @@ -117,12 +167,21 @@ export async function getModelConfig(
schemaInference: string;
populateOrchestrator: string;
investigateSubagent: string;
rowExtractorConcurrency: number;
rowExtractorBrowserAttempts: number;
}> {
const config = await convex.query(internal.modelConfig.getInternal, { userId });
return {
schemaInference: config?.schemaInference ?? DEFAULT_MODEL_IDS.SCHEMA_INFERENCE,
populateOrchestrator: config?.populateOrchestrator ?? DEFAULT_MODEL_IDS.POPULATE_ORCHESTRATOR,
investigateSubagent: config?.investigateSubagent ?? DEFAULT_MODEL_IDS.INVESTIGATE_SUBAGENT,
rowExtractorConcurrency: normalizeRowExtractorConcurrency(
config?.rowExtractorConcurrency ?? DEFAULT_ROW_EXTRACTOR_CONCURRENCY,
),
rowExtractorBrowserAttempts: normalizeRowExtractorBrowserAttempts(
config?.rowExtractorBrowserAttempts ??
DEFAULT_ROW_EXTRACTOR_BROWSER_ATTEMPTS,
),
};
}

Expand Down
5 changes: 5 additions & 0 deletions backend/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ export const env = {
process.env.POPULATE_ORCHESTRATOR_MODEL ?? "qwen/qwen3.7-max",
INVESTIGATE_SUBAGENT_MODEL:
process.env.INVESTIGATE_SUBAGENT_MODEL ?? "qwen/qwen3.7-max",
ROW_EXTRACTOR_CONCURRENCY: numberFromEnv("ROW_EXTRACTOR_CONCURRENCY", 5),
ROW_EXTRACTOR_BROWSER_ATTEMPTS: numberFromEnv(
"ROW_EXTRACTOR_BROWSER_ATTEMPTS",
2,
),

// Resend (transactional email). Optional — when RESEND_API_KEY is unset
// the email module no-ops with a log line, so local dev works without
Expand Down
29 changes: 23 additions & 6 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1079,12 +1079,27 @@ await fastify.register(async (instance) => {
schemaInference?: string | null;
populateOrchestrator?: string | null;
investigateSubagent?: string | null;
rowExtractorConcurrency?: number | null;
rowExtractorBrowserAttempts?: number | null;
};
const config = {
schemaInference: typeof body.schemaInference === "string" ? body.schemaInference.trim() || undefined : undefined,
populateOrchestrator: typeof body.populateOrchestrator === "string" ? body.populateOrchestrator.trim() || undefined : undefined,
investigateSubagent: typeof body.investigateSubagent === "string" ? body.investigateSubagent.trim() || undefined : undefined,
rowExtractorConcurrency:
typeof body.rowExtractorConcurrency === "number"
? body.rowExtractorConcurrency
: undefined,
rowExtractorBrowserAttempts:
typeof body.rowExtractorBrowserAttempts === "number"
? body.rowExtractorBrowserAttempts
: undefined,
};

const toValidate: Array<{ role: "schemaInference" | "populateOrchestrator" | "investigateSubagent"; slug: string }> = [];
if (body.schemaInference) toValidate.push({ role: "schemaInference", slug: body.schemaInference });
if (body.populateOrchestrator) toValidate.push({ role: "populateOrchestrator", slug: body.populateOrchestrator });
if (body.investigateSubagent) toValidate.push({ role: "investigateSubagent", slug: body.investigateSubagent });
if (config.schemaInference) toValidate.push({ role: "schemaInference", slug: config.schemaInference });
if (config.populateOrchestrator) toValidate.push({ role: "populateOrchestrator", slug: config.populateOrchestrator });
if (config.investigateSubagent) toValidate.push({ role: "investigateSubagent", slug: config.investigateSubagent });

if (toValidate.length > 0) {
try {
Expand All @@ -1104,9 +1119,11 @@ await fastify.register(async (instance) => {

try {
await upsertModelConfig(req.auth!.userId, {
schemaInference: body.schemaInference ?? undefined,
populateOrchestrator: body.populateOrchestrator ?? undefined,
investigateSubagent: body.investigateSubagent ?? undefined,
schemaInference: config.schemaInference,
populateOrchestrator: config.populateOrchestrator,
investigateSubagent: config.investigateSubagent,
rowExtractorConcurrency: config.rowExtractorConcurrency,
rowExtractorBrowserAttempts: config.rowExtractorBrowserAttempts,
});
return { success: true };
} catch (err) {
Expand Down
36 changes: 36 additions & 0 deletions backend/src/mastra/tools/investigate-tool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import type { AuthContext } from "../workflows/populate.js";
import type { PopulateColumn } from "../../pipeline/populate.js";
import type { RunMetrics } from "../run-metrics.js";
import { getSignal } from "../../abort-registry.js";
import { tryRowExtractor } from "../../row-extractors/try-row-extractor.js";

const investigateInputSchema = z.object({
entity_hint: z
Expand Down Expand Up @@ -100,6 +101,41 @@ export function buildSubagentTool(
}

if (metrics) metrics.investigateCalls++;

const extractorResult = await tryRowExtractor({
datasetId: authorizedDatasetId,
columns,
primaryKeys: primary_keys,
urls,
context,
browserAttempts: authContext.modelConfig.rowExtractorBrowserAttempts,
});
if (extractorResult.status === "inserted") {
if (metrics) metrics.rowsInserted++;
console.log(
`[run_subagent] row extractor inserted entity="${entity_hint}" reason="${extractorResult.reason}"`,
);
return {
inserted: true,
reason: extractorResult.reason,
row_summary: extractorResult.rowSummary,
clues: undefined,
};
}
if (/duplicate/i.test(extractorResult.reason)) {
return {
inserted: false,
reason: extractorResult.reason,
row_summary: undefined,
clues: undefined,
};
}
if (extractorResult.status === "failed") {
console.warn(
`[run_subagent] row extractor failed entity="${entity_hint}" reason="${extractorResult.reason}"`,
);
}

console.log(
`[run_subagent] spawning subagent user=${authContext.authorizedUserId} run=${authContext.workflowRunId} dataset=${authorizedDatasetId} entity="${entity_hint}" pk=${JSON.stringify(primary_keys)}`,
);
Expand Down
2 changes: 2 additions & 0 deletions backend/src/mastra/workflows/populate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ export const authContextSchema = z.object({
schemaInference: z.string().min(1),
populateOrchestrator: z.string().min(1),
investigateSubagent: z.string().min(1),
rowExtractorConcurrency: z.number().int().min(1).max(100).default(5),
rowExtractorBrowserAttempts: z.number().int().min(1).max(10).default(2),
}),
isBenchmark: z.boolean().optional(),
});
Expand Down
62 changes: 56 additions & 6 deletions backend/src/mastra/workflows/update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { requireOpenRouterApiKey } from "../../local-credentials.js";
import { RunMetrics } from "../run-metrics.js";
import { saveRunMetrics } from "../save-run-metrics.js";
import { getSignal } from "../../abort-registry.js";
import { tryRefreshRowExtractor } from "../../row-extractors/try-row-extractor.js";

export const updateInputSchema = datasetContextSchema.extend({
authContext: authContextSchema,
Expand Down Expand Up @@ -69,8 +70,6 @@ const refreshOutputSchema = z.object({
errors: z.number(),
});

const MAX_CONCURRENT = 5;

async function processWithConcurrency<T>(
items: T[],
handler: (item: T) => Promise<void>,
Expand Down Expand Up @@ -100,17 +99,62 @@ const refreshRowsStep = createStep({

const metrics = new RunMetrics();
const startedAt = Date.now();
const openRouterApiKey = await requireOpenRouterApiKey();
let openRouterApiKeyPromise: ReturnType<typeof requireOpenRouterApiKey> | undefined;
const getOpenRouterApiKey = () => {
openRouterApiKeyPromise ??= requireOpenRouterApiKey();
return openRouterApiKeyPromise;
};

const pkColumns = columns.filter((c) => c.isPrimaryKey);
const maxConcurrent = authContext.modelConfig.rowExtractorConcurrency;

async function processRow(row: z.infer<typeof rowSchema>) {
try {
const primaryKeyRecord = Object.fromEntries(
pkColumns
.map((column) => [column.name, String(row.data[column.name] ?? "").trim()])
.filter(([, value]) => value.length > 0),
);

const extractorResult = await tryRefreshRowExtractor({
datasetId,
rowId: row._id,
columns,
primaryKeys: primaryKeyRecord,
existingData: row.data,
urls: row.sources,
context: [row.rowSummary, row.howFound].filter(Boolean).join("\n"),
browserAttempts: authContext.modelConfig.rowExtractorBrowserAttempts,
});

if (extractorResult.status === "updated") {
updatedCount++;
metrics.rowsUpdated++;
console.log(
`[refresh-rows] Row ${row._id}: updated=true via=row_extractor reason="${extractorResult.reason}"`,
);
return;
}

if (extractorResult.status === "unchanged") {
console.log(
`[refresh-rows] Row ${row._id}: updated=false via=row_extractor reason="${extractorResult.reason}"`,
);
return;
}

if (extractorResult.status === "failed") {
if (getSignal(datasetId)?.aborted) throwAbortError();
console.warn(
`[refresh-rows] Row ${row._id}: row extractor failed; falling back to refresh agent: ${extractorResult.reason}`,
);
}

const agent = buildRefreshAgent(
datasetId,
authContext,
columns,
openRouterApiKey,
await getOpenRouterApiKey(),
);

const pkBlock =
Expand Down Expand Up @@ -191,9 +235,9 @@ ${row.howFound ? `\nPreviously found via: ${row.howFound}` : ""}`;
}

console.log(
`[refresh-rows] Processing ${rows.length} rows (max ${MAX_CONCURRENT} concurrent)`,
`[refresh-rows] Processing ${rows.length} rows (max ${maxConcurrent} concurrent)`,
);
await processWithConcurrency(rows, processRow, MAX_CONCURRENT);
await processWithConcurrency(rows, processRow, maxConcurrent);
Comment on lines +238 to +240
const finishedAt = Date.now();

// If the run was stopped mid-update, workers exited early via AbortError.
Expand Down Expand Up @@ -242,6 +286,12 @@ ${row.howFound ? `\nPreviously found via: ${row.howFound}` : ""}`;
},
});

function throwAbortError(): never {
const err = new Error("Run was stopped");
err.name = "AbortError";
throw err;
}

export const updateWorkflow = createWorkflow({
id: "update-workflow",
inputSchema: updateInputSchema,
Expand Down
Loading