diff --git a/backend/package-lock.json b/backend/package-lock.json index e231b48..8239c4e 100644 --- a/backend/package-lock.json +++ b/backend/package-lock.json @@ -18,6 +18,7 @@ "dotenv": "^16.4.0", "fastify": "^5.0.0", "fastify-plugin": "^5.1.0", + "playwright-core": "^1.60.0", "posthog-node": "^5.35.1", "resend": "^6.12.3", "zod": "^4.4.3" @@ -7623,6 +7624,18 @@ "pathe": "^2.0.1" } }, + "node_modules/playwright-core": { + "version": "1.60.0", + "resolved": "https://registry.npmjs.org/playwright-core/-/playwright-core-1.60.0.tgz", + "integrity": "sha512-9bW6zvX/m0lEbgTKJ6YppOKx8H3VOPBMOCFh2irXFOT4BbHgrx5hPjwJYLT40Lu+4qtD36qKc/Hn56StUW57IA==", + "license": "Apache-2.0", + "bin": { + "playwright-core": "cli.js" + }, + "engines": { + "node": ">=18" + } + }, "node_modules/postal-mime": { "version": "2.7.4", "resolved": "https://registry.npmjs.org/postal-mime/-/postal-mime-2.7.4.tgz", diff --git a/backend/package.json b/backend/package.json index 11df78b..0e161cb 100644 --- a/backend/package.json +++ b/backend/package.json @@ -20,6 +20,7 @@ "dotenv": "^16.4.0", "fastify": "^5.0.0", "fastify-plugin": "^5.1.0", + "playwright-core": "^1.60.0", "posthog-node": "^5.35.1", "resend": "^6.12.3", "zod": "^4.4.3" diff --git a/backend/src/config/models.ts b/backend/src/config/models.ts index 1d28cbf..3cb3e17 100644 --- a/backend/src/config/models.ts +++ b/backend/src/config/models.ts @@ -28,6 +28,46 @@ export const DEFAULT_MODEL_IDS = { INVESTIGATE_SUBAGENT: env.INVESTIGATE_SUBAGENT_MODEL, } as const; +const ROW_EXTRACTOR_CONCURRENCY_MIN = 1; +export const ROW_EXTRACTOR_CONCURRENCY_MAX = 100; +const ROW_EXTRACTOR_BROWSER_ATTEMPTS_MIN = 1; +export const ROW_EXTRACTOR_BROWSER_ATTEMPTS_MAX = 10; + +export function normalizeRowExtractorConcurrency(value: unknown): number { + return normalizeIntegerSetting( + value, + 5, + ROW_EXTRACTOR_CONCURRENCY_MIN, + ROW_EXTRACTOR_CONCURRENCY_MAX, + ); +} + +export function normalizeRowExtractorBrowserAttempts(value: unknown): number { + return normalizeIntegerSetting( + value, + 2, + ROW_EXTRACTOR_BROWSER_ATTEMPTS_MIN, + ROW_EXTRACTOR_BROWSER_ATTEMPTS_MAX, + ); +} + +function normalizeIntegerSetting( + value: unknown, + fallback: number, + min: number, + max: number, +): number { + const parsed = typeof value === "number" ? value : Number(value); + if (!Number.isFinite(parsed)) return fallback; + return Math.min(max, Math.max(min, Math.trunc(parsed))); +} + +const DEFAULT_ROW_EXTRACTOR_CONCURRENCY = normalizeRowExtractorConcurrency( + env.ROW_EXTRACTOR_CONCURRENCY, +); +const DEFAULT_ROW_EXTRACTOR_BROWSER_ATTEMPTS = + normalizeRowExtractorBrowserAttempts(env.ROW_EXTRACTOR_BROWSER_ATTEMPTS); + /** * Model roles for the settings UI. */ @@ -96,6 +136,8 @@ export async function upsertModelConfig( schemaInference?: string; populateOrchestrator?: string; investigateSubagent?: string; + rowExtractorConcurrency?: number; + rowExtractorBrowserAttempts?: number; } ): Promise { await convex.mutation(internal.modelConfig.upsertInternal, { @@ -103,6 +145,14 @@ export async function upsertModelConfig( schemaInference: config.schemaInference ?? undefined, populateOrchestrator: config.populateOrchestrator ?? undefined, investigateSubagent: config.investigateSubagent ?? undefined, + rowExtractorConcurrency: + config.rowExtractorConcurrency !== undefined + ? normalizeRowExtractorConcurrency(config.rowExtractorConcurrency) + : undefined, + rowExtractorBrowserAttempts: + config.rowExtractorBrowserAttempts !== undefined + ? normalizeRowExtractorBrowserAttempts(config.rowExtractorBrowserAttempts) + : undefined, }); } @@ -117,12 +167,21 @@ export async function getModelConfig( schemaInference: string; populateOrchestrator: string; investigateSubagent: string; + rowExtractorConcurrency: number; + rowExtractorBrowserAttempts: number; }> { const config = await convex.query(internal.modelConfig.getInternal, { userId }); return { schemaInference: config?.schemaInference ?? DEFAULT_MODEL_IDS.SCHEMA_INFERENCE, populateOrchestrator: config?.populateOrchestrator ?? DEFAULT_MODEL_IDS.POPULATE_ORCHESTRATOR, investigateSubagent: config?.investigateSubagent ?? DEFAULT_MODEL_IDS.INVESTIGATE_SUBAGENT, + rowExtractorConcurrency: normalizeRowExtractorConcurrency( + config?.rowExtractorConcurrency ?? DEFAULT_ROW_EXTRACTOR_CONCURRENCY, + ), + rowExtractorBrowserAttempts: normalizeRowExtractorBrowserAttempts( + config?.rowExtractorBrowserAttempts ?? + DEFAULT_ROW_EXTRACTOR_BROWSER_ATTEMPTS, + ), }; } diff --git a/backend/src/env.ts b/backend/src/env.ts index adcec52..a818e29 100644 --- a/backend/src/env.ts +++ b/backend/src/env.ts @@ -52,6 +52,11 @@ export const env = { process.env.POPULATE_ORCHESTRATOR_MODEL ?? "qwen/qwen3.7-max", INVESTIGATE_SUBAGENT_MODEL: process.env.INVESTIGATE_SUBAGENT_MODEL ?? "qwen/qwen3.7-max", + ROW_EXTRACTOR_CONCURRENCY: numberFromEnv("ROW_EXTRACTOR_CONCURRENCY", 5), + ROW_EXTRACTOR_BROWSER_ATTEMPTS: numberFromEnv( + "ROW_EXTRACTOR_BROWSER_ATTEMPTS", + 2, + ), // Resend (transactional email). Optional — when RESEND_API_KEY is unset // the email module no-ops with a log line, so local dev works without diff --git a/backend/src/index.ts b/backend/src/index.ts index 4d63f42..6a8df6d 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -1079,12 +1079,27 @@ await fastify.register(async (instance) => { schemaInference?: string | null; populateOrchestrator?: string | null; investigateSubagent?: string | null; + rowExtractorConcurrency?: number | null; + rowExtractorBrowserAttempts?: number | null; + }; + const config = { + schemaInference: typeof body.schemaInference === "string" ? body.schemaInference.trim() || undefined : undefined, + populateOrchestrator: typeof body.populateOrchestrator === "string" ? body.populateOrchestrator.trim() || undefined : undefined, + investigateSubagent: typeof body.investigateSubagent === "string" ? body.investigateSubagent.trim() || undefined : undefined, + rowExtractorConcurrency: + typeof body.rowExtractorConcurrency === "number" + ? body.rowExtractorConcurrency + : undefined, + rowExtractorBrowserAttempts: + typeof body.rowExtractorBrowserAttempts === "number" + ? body.rowExtractorBrowserAttempts + : undefined, }; const toValidate: Array<{ role: "schemaInference" | "populateOrchestrator" | "investigateSubagent"; slug: string }> = []; - if (body.schemaInference) toValidate.push({ role: "schemaInference", slug: body.schemaInference }); - if (body.populateOrchestrator) toValidate.push({ role: "populateOrchestrator", slug: body.populateOrchestrator }); - if (body.investigateSubagent) toValidate.push({ role: "investigateSubagent", slug: body.investigateSubagent }); + if (config.schemaInference) toValidate.push({ role: "schemaInference", slug: config.schemaInference }); + if (config.populateOrchestrator) toValidate.push({ role: "populateOrchestrator", slug: config.populateOrchestrator }); + if (config.investigateSubagent) toValidate.push({ role: "investigateSubagent", slug: config.investigateSubagent }); if (toValidate.length > 0) { try { @@ -1104,9 +1119,11 @@ await fastify.register(async (instance) => { try { await upsertModelConfig(req.auth!.userId, { - schemaInference: body.schemaInference ?? undefined, - populateOrchestrator: body.populateOrchestrator ?? undefined, - investigateSubagent: body.investigateSubagent ?? undefined, + schemaInference: config.schemaInference, + populateOrchestrator: config.populateOrchestrator, + investigateSubagent: config.investigateSubagent, + rowExtractorConcurrency: config.rowExtractorConcurrency, + rowExtractorBrowserAttempts: config.rowExtractorBrowserAttempts, }); return { success: true }; } catch (err) { diff --git a/backend/src/mastra/tools/investigate-tool.ts b/backend/src/mastra/tools/investigate-tool.ts index 0139aa4..9b13ceb 100644 --- a/backend/src/mastra/tools/investigate-tool.ts +++ b/backend/src/mastra/tools/investigate-tool.ts @@ -6,6 +6,7 @@ import type { AuthContext } from "../workflows/populate.js"; import type { PopulateColumn } from "../../pipeline/populate.js"; import type { RunMetrics } from "../run-metrics.js"; import { getSignal } from "../../abort-registry.js"; +import { tryRowExtractor } from "../../row-extractors/try-row-extractor.js"; const investigateInputSchema = z.object({ entity_hint: z @@ -100,6 +101,41 @@ export function buildSubagentTool( } if (metrics) metrics.investigateCalls++; + + const extractorResult = await tryRowExtractor({ + datasetId: authorizedDatasetId, + columns, + primaryKeys: primary_keys, + urls, + context, + browserAttempts: authContext.modelConfig.rowExtractorBrowserAttempts, + }); + if (extractorResult.status === "inserted") { + if (metrics) metrics.rowsInserted++; + console.log( + `[run_subagent] row extractor inserted entity="${entity_hint}" reason="${extractorResult.reason}"`, + ); + return { + inserted: true, + reason: extractorResult.reason, + row_summary: extractorResult.rowSummary, + clues: undefined, + }; + } + if (/duplicate/i.test(extractorResult.reason)) { + return { + inserted: false, + reason: extractorResult.reason, + row_summary: undefined, + clues: undefined, + }; + } + if (extractorResult.status === "failed") { + console.warn( + `[run_subagent] row extractor failed entity="${entity_hint}" reason="${extractorResult.reason}"`, + ); + } + console.log( `[run_subagent] spawning subagent user=${authContext.authorizedUserId} run=${authContext.workflowRunId} dataset=${authorizedDatasetId} entity="${entity_hint}" pk=${JSON.stringify(primary_keys)}`, ); diff --git a/backend/src/mastra/workflows/populate.ts b/backend/src/mastra/workflows/populate.ts index e07ed8e..b0c6299 100644 --- a/backend/src/mastra/workflows/populate.ts +++ b/backend/src/mastra/workflows/populate.ts @@ -37,6 +37,8 @@ export const authContextSchema = z.object({ schemaInference: z.string().min(1), populateOrchestrator: z.string().min(1), investigateSubagent: z.string().min(1), + rowExtractorConcurrency: z.number().int().min(1).max(100).default(5), + rowExtractorBrowserAttempts: z.number().int().min(1).max(10).default(2), }), isBenchmark: z.boolean().optional(), }); diff --git a/backend/src/mastra/workflows/update.ts b/backend/src/mastra/workflows/update.ts index 28aadee..55a0f13 100644 --- a/backend/src/mastra/workflows/update.ts +++ b/backend/src/mastra/workflows/update.ts @@ -8,6 +8,7 @@ import { requireOpenRouterApiKey } from "../../local-credentials.js"; import { RunMetrics } from "../run-metrics.js"; import { saveRunMetrics } from "../save-run-metrics.js"; import { getSignal } from "../../abort-registry.js"; +import { tryRefreshRowExtractor } from "../../row-extractors/try-row-extractor.js"; export const updateInputSchema = datasetContextSchema.extend({ authContext: authContextSchema, @@ -69,8 +70,6 @@ const refreshOutputSchema = z.object({ errors: z.number(), }); -const MAX_CONCURRENT = 5; - async function processWithConcurrency( items: T[], handler: (item: T) => Promise, @@ -100,17 +99,62 @@ const refreshRowsStep = createStep({ const metrics = new RunMetrics(); const startedAt = Date.now(); - const openRouterApiKey = await requireOpenRouterApiKey(); + let openRouterApiKeyPromise: ReturnType | undefined; + const getOpenRouterApiKey = () => { + openRouterApiKeyPromise ??= requireOpenRouterApiKey(); + return openRouterApiKeyPromise; + }; const pkColumns = columns.filter((c) => c.isPrimaryKey); + const maxConcurrent = authContext.modelConfig.rowExtractorConcurrency; async function processRow(row: z.infer) { try { + const primaryKeyRecord = Object.fromEntries( + pkColumns + .map((column) => [column.name, String(row.data[column.name] ?? "").trim()]) + .filter(([, value]) => value.length > 0), + ); + + const extractorResult = await tryRefreshRowExtractor({ + datasetId, + rowId: row._id, + columns, + primaryKeys: primaryKeyRecord, + existingData: row.data, + urls: row.sources, + context: [row.rowSummary, row.howFound].filter(Boolean).join("\n"), + browserAttempts: authContext.modelConfig.rowExtractorBrowserAttempts, + }); + + if (extractorResult.status === "updated") { + updatedCount++; + metrics.rowsUpdated++; + console.log( + `[refresh-rows] Row ${row._id}: updated=true via=row_extractor reason="${extractorResult.reason}"`, + ); + return; + } + + if (extractorResult.status === "unchanged") { + console.log( + `[refresh-rows] Row ${row._id}: updated=false via=row_extractor reason="${extractorResult.reason}"`, + ); + return; + } + + if (extractorResult.status === "failed") { + if (getSignal(datasetId)?.aborted) throwAbortError(); + console.warn( + `[refresh-rows] Row ${row._id}: row extractor failed; falling back to refresh agent: ${extractorResult.reason}`, + ); + } + const agent = buildRefreshAgent( datasetId, authContext, columns, - openRouterApiKey, + await getOpenRouterApiKey(), ); const pkBlock = @@ -191,9 +235,9 @@ ${row.howFound ? `\nPreviously found via: ${row.howFound}` : ""}`; } console.log( - `[refresh-rows] Processing ${rows.length} rows (max ${MAX_CONCURRENT} concurrent)`, + `[refresh-rows] Processing ${rows.length} rows (max ${maxConcurrent} concurrent)`, ); - await processWithConcurrency(rows, processRow, MAX_CONCURRENT); + await processWithConcurrency(rows, processRow, maxConcurrent); const finishedAt = Date.now(); // If the run was stopped mid-update, workers exited early via AbortError. @@ -242,6 +286,12 @@ ${row.howFound ? `\nPreviously found via: ${row.howFound}` : ""}`; }, }); +function throwAbortError(): never { + const err = new Error("Run was stopped"); + err.name = "AbortError"; + throw err; +} + export const updateWorkflow = createWorkflow({ id: "update-workflow", inputSchema: updateInputSchema, diff --git a/backend/src/row-extractors/try-row-extractor.ts b/backend/src/row-extractors/try-row-extractor.ts new file mode 100644 index 0000000..29dda9f --- /dev/null +++ b/backend/src/row-extractors/try-row-extractor.ts @@ -0,0 +1,709 @@ +import { chromium, type Browser, type Page } from "playwright-core"; + +import { getSignal } from "../abort-registry.js"; +import { convex, internal } from "../convex.js"; +import { FETCH_TIMEOUT_MS } from "../fetch-timeout.js"; +import { getTinyFishApiKey, tinyFishHeaders } from "../local-credentials.js"; +import type { PopulateColumn } from "../pipeline/populate.js"; + +type ExtractorStatus = "inserted" | "updated" | "unchanged" | "miss" | "failed"; + +export interface TryRowExtractorInput { + datasetId: string; + columns: PopulateColumn[]; + primaryKeys: Record; + urls?: string[]; + context?: string; + browserAttempts?: number; +} + +export interface TryRefreshRowExtractorInput extends TryRowExtractorInput { + rowId: string; + existingData: Record; +} + +export interface TryRowExtractorResult { + status: ExtractorStatus; + reason: string; + rowSummary?: string; + sources?: string[]; +} + +interface TinyFishBrowserSession { + session_id: string; + cdp_url: string; + base_url: string; +} + +interface GitHubRepoFacts { + owner: string; + repo: string; + fullName: string; + url: string; + description?: string; + stars?: number; + forks?: number; + watchers?: number; + issues?: number; + pullRequests?: number; + language?: string; + license?: string; + latestCommitAt?: string; + updatedAt?: string; + createdAt?: string; + homepage?: string; + archived?: boolean; +} + +interface RawGitHubRepoDomFacts { + description?: string; + stars?: string; + forks?: string; + watchers?: string; + issues?: string; + pullRequests?: string; + language?: string; + license?: string; + latestCommitAt?: string; + homepage?: string; + archived?: boolean; +} + +const ENABLED_VALUES = new Set(["1", "true", "yes", "on"]); +const GITHUB_HOSTS = new Set(["github.com", "www.github.com"]); +const BROWSER_TIMEOUT_MS = 45_000; +const CDP_CONNECT_TIMEOUT_MS = 45_000; +const DEFAULT_BROWSER_ATTEMPTS = 2; +const GITHUB_EXTRACTOR_HOW_FOUND = + "Opened the GitHub repository URL with TinyFish Browser and extracted repository facts from the rendered page."; +const GITHUB_REFRESH_HOW_FOUND = + "Refreshed the GitHub repository URL with TinyFish Browser and extracted repository facts from the rendered page."; + +export async function tryRowExtractor( + input: TryRowExtractorInput, +): Promise { + if (!ENABLED_VALUES.has((process.env.ROW_EXTRACTORS_ENABLED ?? "").toLowerCase())) { + return { status: "miss", reason: "row extractors are disabled" }; + } + + const url = firstCandidateUrl(input); + if (!url) return { status: "miss", reason: "no URL primary key or candidate URL" }; + + const repoRef = parseGitHubRepoUrl(url); + if (!repoRef) { + return { status: "miss", reason: `unsupported URL host: ${safeHost(url)}` }; + } + + try { + const facts = await extractGitHubRepoFacts( + url, + input.datasetId, + input.browserAttempts, + ); + const row = buildGitHubRow(input.columns, input.primaryKeys, facts); + if (!row) { + return { + status: "miss", + reason: "GitHub extractor could not satisfy all requested columns", + }; + } + + await convex.mutation(internal.datasetRows.insert, { + datasetId: input.datasetId, + data: row, + sources: [facts.url], + rowSummary: githubRowSummary(facts), + howFound: GITHUB_EXTRACTOR_HOW_FOUND, + }); + + return { + status: "inserted", + reason: "Inserted by GitHub row extractor", + rowSummary: githubRowSummary(facts), + sources: [facts.url], + }; + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + if (/duplicate/i.test(msg)) { + return { + status: "miss", + reason: `${msg} Move on to the next entity.`, + }; + } + return { status: "failed", reason: msg }; + } +} + +export async function tryRefreshRowExtractor( + input: TryRefreshRowExtractorInput, +): Promise { + if (!ENABLED_VALUES.has((process.env.ROW_EXTRACTORS_ENABLED ?? "").toLowerCase())) { + return { status: "miss", reason: "row extractors are disabled" }; + } + + const url = firstCandidateUrl(input); + if (!url) return { status: "miss", reason: "no URL primary key or candidate URL" }; + + const repoRef = parseGitHubRepoUrl(url); + if (!repoRef) { + return { status: "miss", reason: `unsupported URL host: ${safeHost(url)}` }; + } + + try { + const facts = await extractGitHubRepoFacts( + url, + input.datasetId, + input.browserAttempts, + ); + const row = buildGitHubRow(input.columns, input.primaryKeys, facts); + if (!row) { + return { + status: "miss", + reason: "GitHub extractor could not satisfy all requested columns", + }; + } + + const changedColumns = changedColumnNames(row, input.existingData, input.columns); + if (changedColumns.length === 0) { + return { + status: "unchanged", + reason: "Verified unchanged by GitHub row extractor", + rowSummary: githubRowSummary(facts), + sources: [facts.url], + }; + } + + await convex.mutation(internal.datasetRows.update, { + id: input.rowId, + expectedDatasetId: input.datasetId, + data: row, + sources: [facts.url], + rowSummary: githubRowSummary(facts), + howFound: GITHUB_REFRESH_HOW_FOUND, + }); + + return { + status: "updated", + reason: `Updated by GitHub row extractor (${changedColumns.join(", ")})`, + rowSummary: githubRowSummary(facts), + sources: [facts.url], + }; + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + return { status: "failed", reason: msg }; + } +} + +function firstCandidateUrl(input: TryRowExtractorInput): string | undefined { + const fromPrimaryKey = Object.values(input.primaryKeys).find((value) => + isHttpUrl(value), + ); + if (fromPrimaryKey) return normalizeUrl(fromPrimaryKey); + + const fromUrls = input.urls?.find(isHttpUrl); + if (fromUrls) return normalizeUrl(fromUrls); + + const fromContext = input.context?.match(/https?:\/\/[^\s)>"']+/i)?.[0]; + return fromContext ? normalizeUrl(fromContext) : undefined; +} + +function normalizeUrl(value: string): string { + return value.trim().replace(/[.,;:]+$/, ""); +} + +function isHttpUrl(value: string | undefined): value is string { + if (!value) return false; + try { + const parsed = new URL(normalizeUrl(value)); + return parsed.protocol === "http:" || parsed.protocol === "https:"; + } catch { + return false; + } +} + +function safeHost(value: string): string { + try { + return new URL(value).host; + } catch { + return "invalid-url"; + } +} + +function parseGitHubRepoUrl(value: string): { owner: string; repo: string } | null { + try { + const url = new URL(value); + if (!GITHUB_HOSTS.has(url.hostname.toLowerCase())) return null; + const [owner, repo] = url.pathname + .split("/") + .filter(Boolean) + .map((part) => part.trim()); + if (!owner || !repo) return null; + if (["orgs", "topics", "marketplace", "features"].includes(owner)) return null; + return { owner, repo: repo.replace(/\.git$/i, "") }; + } catch { + return null; + } +} + +async function extractGitHubRepoFacts( + url: string, + datasetId: string, + browserAttempts: number | undefined, +): Promise { + const apiKey = await getTinyFishApiKey(); + if (!apiKey) throw new Error("TINYFISH_API_KEY is not configured"); + + const attempts = normalizedBrowserAttempts(browserAttempts); + let lastError: unknown; + for (let attempt = 1; attempt <= attempts; attempt++) { + try { + return await extractGitHubRepoFactsOnce(apiKey, url, datasetId); + } catch (err) { + lastError = err; + if (getSignal(datasetId)?.aborted || attempt === attempts) break; + const msg = err instanceof Error ? err.message : String(err); + console.warn( + `[row_extractor] GitHub browser attempt ${attempt}/${attempts} failed; retrying: ${msg}`, + ); + } + } + + throw lastError instanceof Error ? lastError : new Error(String(lastError)); +} + +function normalizedBrowserAttempts(value: number | undefined): number { + if (typeof value !== "number" || !Number.isFinite(value)) { + return DEFAULT_BROWSER_ATTEMPTS; + } + return Math.min(10, Math.max(1, Math.trunc(value))); +} + +async function extractGitHubRepoFactsOnce( + apiKey: string, + url: string, + datasetId: string, +): Promise { + const session = await createTinyFishBrowserSession(apiKey, url, datasetId); + let browser: Browser | undefined; + try { + browser = await chromium.connectOverCDP(session.cdp_url, { + timeout: CDP_CONNECT_TIMEOUT_MS, + }); + const context = browser.contexts()[0] ?? (await browser.newContext()); + const page = context.pages()[0] ?? (await context.newPage()); + await page.goto(url, { + waitUntil: "domcontentloaded", + timeout: BROWSER_TIMEOUT_MS, + }); + await page.waitForLoadState("networkidle", { timeout: 10_000 }).catch(() => { + // GitHub may keep long-lived requests open. DOMContentLoaded is enough. + }); + return await readGitHubRepoFacts(page); + } finally { + await browser?.close().catch(() => undefined); + } +} + +async function createTinyFishBrowserSession( + apiKey: string, + url: string, + datasetId: string, +): Promise { + const response = await withRunTimeoutSignal(datasetId, FETCH_TIMEOUT_MS, (signal) => + fetch("https://agent.tinyfish.ai/v1/browser", { + method: "POST", + headers: { + ...tinyFishHeaders(apiKey), + "Content-Type": "application/json", + Accept: "application/json", + }, + body: JSON.stringify({ url }), + signal, + }), + ); + + if (!response.ok) { + const body = await response.text().catch(() => ""); + throw new Error( + `TinyFish Browser returned HTTP ${response.status}: ${body.slice(0, 200)}`, + ); + } + + const data = (await response.json()) as Partial; + if (!data.session_id || !data.cdp_url || !data.base_url) { + throw new Error("TinyFish Browser response did not include CDP connection details"); + } + + return { + session_id: data.session_id, + cdp_url: data.cdp_url, + base_url: data.base_url, + }; +} + +async function withRunTimeoutSignal( + datasetId: string, + timeoutMs: number, + operation: (signal: AbortSignal) => Promise, +): Promise { + const runSignal = getSignal(datasetId); + if (runSignal?.aborted) throw new DOMException("Run was stopped", "AbortError"); + + const controller = new AbortController(); + const timeout = setTimeout( + () => controller.abort(new DOMException("Timed out", "TimeoutError")), + timeoutMs, + ); + const abortFromRun = () => + controller.abort(runSignal?.reason ?? new DOMException("Run was stopped", "AbortError")); + + runSignal?.addEventListener("abort", abortFromRun, { once: true }); + try { + return await operation(controller.signal); + } finally { + clearTimeout(timeout); + runSignal?.removeEventListener("abort", abortFromRun); + } +} + +async function readGitHubRepoFacts(page: Page): Promise { + const url = page.url(); + const repoRef = parseGitHubRepoUrl(url); + if (!repoRef) throw new Error(`Not a GitHub repository page: ${url}`); + + const facts = (await page.evaluate(` + (() => { + const text = (selector) => + document.querySelector(selector)?.textContent?.trim() || undefined; + const attr = (selector, name) => + document.querySelector(selector)?.getAttribute(name) || undefined; + const firstCandidateText = (selector, predicate) => + Array.from(document.querySelectorAll(selector)) + .map((el) => el.textContent?.trim()) + .filter(Boolean) + .find((value) => !predicate || predicate(value)); + const language = () => + text("[itemprop=\\"programmingLanguage\\"]") ?? + text("a[href*=\\"search?l=\\"] span.color-fg-default.text-bold") ?? + text("a[href*=\\"search?l=\\"] .text-bold"); + const license = () => + firstCandidateText( + "a[href*=\\"LICENSE\\"], a[href*=\\"license\\"], [data-testid*=\\"license\\"]", + (value) => /licensed|MIT|Apache|BSD|GPL|MPL|ISC/i.test(value), + ) ?? + firstCandidateText( + "a[href*=\\"LICENSE\\"], a[href*=\\"license\\"], [data-testid*=\\"license\\"]", + (value) => !/^(license|view license)$/i.test(value), + ) ?? + firstCandidateText( + "a[href*=\\"LICENSE\\"], a[href*=\\"license\\"], [data-testid*=\\"license\\"]", + ) ?? + text("svg.octicon-law + span"); + const bodyText = document.body?.innerText ?? ""; + + return { + description: + text("[data-pjax=\\"#repo-content-pjax-container\\"] [itemprop=\\"about\\"]") ?? + text("[itemprop=\\"about\\"]") ?? + attr("meta[name='description']", "content"), + stars: + text("#repo-stars-counter-star") ?? + text("a[href$='/stargazers'] strong") ?? + text("a[href$='/stargazers']"), + forks: + text("#repo-network-counter") ?? + text("a[href$='/forks'] strong") ?? + text("a[href$='/forks']"), + watchers: + text("a[href$='/watchers'] strong") ?? + text("a[href$='/watchers']"), + issues: + text("#issues-tab span.Counter") ?? + text("a[href$=\\"/issues\\"] span.Counter") ?? + text("a[data-tab-item=\\"i1issues-tab\\"] span.Counter"), + pullRequests: + text("#pull-requests-tab span.Counter") ?? + text("a[href$=\\"/pulls\\"] span.Counter") ?? + text("a[data-tab-item=\\"i2pull-requests-tab\\"] span.Counter"), + language: language(), + license: license(), + latestCommitAt: + attr("relative-time[datetime]", "datetime") ?? + attr("time-ago[datetime]", "datetime"), + homepage: attr("[itemprop='url']", "href"), + archived: /This repository has been archived/i.test(bodyText), + }; + })() + `)) as RawGitHubRepoDomFacts; + + const apiFacts = await fetchGitHubApiFacts(page, repoRef.owner, repoRef.repo).catch( + () => undefined, + ); + + return { + owner: repoRef.owner, + repo: repoRef.repo, + fullName: `${repoRef.owner}/${repoRef.repo}`, + url, + description: apiFacts?.description ?? cleanOptionalText(facts.description), + stars: apiFacts?.stars ?? parseCompactNumber(facts.stars), + forks: apiFacts?.forks ?? parseCompactNumber(facts.forks), + watchers: apiFacts?.watchers ?? parseCompactNumber(facts.watchers), + issues: parseCompactNumber(facts.issues) ?? apiFacts?.issues, + pullRequests: parseCompactNumber(facts.pullRequests) ?? apiFacts?.pullRequests, + language: apiFacts?.language ?? cleanOptionalText(facts.language), + license: apiFacts?.license ?? cleanOptionalText(facts.license), + latestCommitAt: apiFacts?.latestCommitAt ?? facts.latestCommitAt, + updatedAt: apiFacts?.updatedAt, + createdAt: apiFacts?.createdAt, + homepage: apiFacts?.homepage ?? cleanOptionalText(facts.homepage), + archived: apiFacts?.archived ?? facts.archived, + }; +} + +async function fetchGitHubApiFacts( + page: Page, + owner: string, + repo: string, +): Promise> { + const response = await page.request.get( + `https://api.github.com/repos/${encodeURIComponent(owner)}/${encodeURIComponent(repo)}`, + { + headers: { + Accept: "application/vnd.github+json", + }, + timeout: FETCH_TIMEOUT_MS, + }, + ); + if (!response.ok()) { + throw new Error(`GitHub API returned HTTP ${response.status()}`); + } + + const data = (await response.json()) as { + description?: string | null; + stargazers_count?: number; + forks_count?: number; + watchers_count?: number; + open_issues_count?: number; + language?: string | null; + license?: { spdx_id?: string | null; name?: string | null } | null; + pushed_at?: string | null; + updated_at?: string | null; + created_at?: string | null; + homepage?: string | null; + archived?: boolean; + html_url?: string; + }; + + return { + url: data.html_url, + description: data.description ?? undefined, + stars: data.stargazers_count, + forks: data.forks_count, + watchers: data.watchers_count, + issues: data.open_issues_count, + language: data.language ?? undefined, + license: data.license?.spdx_id || data.license?.name || undefined, + latestCommitAt: data.pushed_at ?? undefined, + updatedAt: data.updated_at ?? undefined, + createdAt: data.created_at ?? undefined, + homepage: data.homepage || undefined, + archived: data.archived, + }; +} + +function buildGitHubRow( + columns: PopulateColumn[], + primaryKeys: Record, + facts: GitHubRepoFacts, +): Record | null { + const row: Record = {}; + + for (const column of columns) { + const pkValue = findPrimaryKeyValue(column.name, primaryKeys); + const rawValue = pkValue ?? valueForGitHubColumn(column.name, facts); + const value = coerceColumnValue(rawValue, column); + if (value === undefined) return null; + row[column.name] = value; + } + + return row; +} + +function githubRowSummary(facts: GitHubRepoFacts): string { + return facts.description ? `${facts.fullName}: ${facts.description}` : facts.fullName; +} + +function changedColumnNames( + nextRow: Record, + existingData: Record, + columns: PopulateColumn[], +): string[] { + return columns + .filter((column) => !valuesEqualForColumn(nextRow[column.name], existingData[column.name], column)) + .map((column) => column.name); +} + +function valuesEqualForColumn( + nextValue: string | number | boolean | undefined, + existingValue: unknown, + column: PopulateColumn, +): boolean { + if (nextValue === undefined) return existingValue === undefined || existingValue === ""; + + switch (column.type) { + case "number": { + const existingNumber = + typeof existingValue === "number" + ? existingValue + : Number(String(existingValue ?? "").replace(/,/g, "")); + return Number.isFinite(existingNumber) && existingNumber === nextValue; + } + case "boolean": + if (typeof existingValue === "boolean") return existingValue === nextValue; + if (/^(true|yes)$/i.test(String(existingValue))) return nextValue === true; + if (/^(false|no)$/i.test(String(existingValue))) return nextValue === false; + return false; + case "date": { + const nextDate = new Date(String(nextValue)); + const existingDate = new Date(String(existingValue ?? "")); + return ( + Number.isFinite(nextDate.getTime()) && + Number.isFinite(existingDate.getTime()) && + nextDate.getTime() === existingDate.getTime() + ); + } + case "url": + case "text": + return String(existingValue ?? "").trim() === String(nextValue).trim(); + } +} + +function findPrimaryKeyValue( + columnName: string, + primaryKeys: Record, +): string | undefined { + if (primaryKeys[columnName]) return primaryKeys[columnName]; + const normalizedColumn = normalizeFieldName(columnName); + const entry = Object.entries(primaryKeys).find( + ([key]) => normalizeFieldName(key) === normalizedColumn, + ); + return entry?.[1]; +} + +function valueForGitHubColumn( + columnName: string, + facts: GitHubRepoFacts, +): string | number | boolean | undefined { + const normalized = normalizeFieldName(columnName); + if (matches(normalized, ["repository_url", "repo_url", "github_url", "url", "link"])) { + return facts.url; + } + if (matches(normalized, ["repository_name", "repo_name"])) { + return facts.fullName; + } + if (matches(normalized, ["repository", "repo", "name"])) { + return facts.repo; + } + if (matches(normalized, ["full_name", "repository_full_name", "repo_full_name"])) { + return facts.fullName; + } + if (matches(normalized, ["owner", "organization", "org", "user"])) { + return facts.owner; + } + if (matches(normalized, ["description", "summary", "about"])) { + return facts.description; + } + if (matches(normalized, ["stars", "star_count", "stargazers", "stargazer_count"])) { + return facts.stars; + } + if (matches(normalized, ["forks", "fork_count"])) { + return facts.forks; + } + if (matches(normalized, ["watchers", "watcher_count"])) { + return facts.watchers; + } + if (matches(normalized, ["issues", "open_issues", "open_issue_count"])) { + return facts.issues; + } + if (matches(normalized, ["pull_requests", "open_pull_requests", "prs", "open_prs", "pr_count", "open_pr_count"])) { + return facts.pullRequests; + } + if (matches(normalized, ["language", "primary_language"])) { + return facts.language; + } + if (matches(normalized, ["license", "license_type", "license_spdx"])) { + return facts.license; + } + if (matches(normalized, ["latest_commit", "latest_commit_at", "last_commit", "pushed_at", "activity", "last_activity"])) { + return facts.latestCommitAt; + } + if (matches(normalized, ["updated", "updated_at", "last_updated"])) { + return facts.updatedAt; + } + if (matches(normalized, ["created", "created_at"])) { + return facts.createdAt; + } + if (matches(normalized, ["homepage", "website", "site"])) { + return facts.homepage; + } + if (matches(normalized, ["archived", "is_archived"])) { + return facts.archived; + } + return undefined; +} + +function coerceColumnValue( + value: string | number | boolean | undefined, + column: PopulateColumn, +): string | number | boolean | undefined { + if (value === undefined || value === "") return undefined; + switch (column.type) { + case "number": { + if (typeof value === "number") return Number.isFinite(value) ? value : undefined; + const parsed = Number(String(value).replace(/,/g, "")); + return Number.isFinite(parsed) ? parsed : undefined; + } + case "boolean": + if (typeof value === "boolean") return value; + if (/^(true|yes)$/i.test(String(value))) return true; + if (/^(false|no)$/i.test(String(value))) return false; + return undefined; + case "url": + return isHttpUrl(String(value)) ? normalizeUrl(String(value)) : undefined; + case "date": { + const date = new Date(String(value)); + return Number.isNaN(date.getTime()) ? undefined : date.toISOString(); + } + case "text": + return String(value).trim(); + } +} + +function parseCompactNumber(value: string | undefined): number | undefined { + if (!value) return undefined; + const match = value.replace(/,/g, "").match(/([\d.]+)\s*([kmb])?/i); + if (!match) return undefined; + const base = Number(match[1]); + if (!Number.isFinite(base)) return undefined; + const suffix = match[2]?.toLowerCase(); + const multiplier = suffix === "k" ? 1_000 : suffix === "m" ? 1_000_000 : suffix === "b" ? 1_000_000_000 : 1; + return Math.round(base * multiplier); +} + +function cleanOptionalText(value: string | undefined | null): string | undefined { + const trimmed = value?.trim(); + return trimmed || undefined; +} + +function normalizeFieldName(value: string): string { + return value + .trim() + .toLowerCase() + .replace(/[^a-z0-9]+/g, "_") + .replace(/^_+|_+$/g, ""); +} + +function matches(value: string, candidates: string[]): boolean { + return candidates.includes(value); +} diff --git a/frontend/app/dashboard/settings/models/page.tsx b/frontend/app/dashboard/settings/models/page.tsx index 6a3acb4..48099a1 100644 --- a/frontend/app/dashboard/settings/models/page.tsx +++ b/frontend/app/dashboard/settings/models/page.tsx @@ -12,6 +12,7 @@ import { ModelSideSheet } from "@/components/settings/ModelSideSheet"; import { MODEL_ROLES, type ModelRole } from "@/components/settings/types"; import { SkeletonList } from "@/components/settings/Skeleton"; import { useAppAuth } from "@/lib/app-auth"; +import { isLocalMode } from "@/lib/app-mode"; export default function ModelSettingsPage() { const { getToken } = useAppAuth(); @@ -23,6 +24,9 @@ export default function ModelSettingsPage() { const [sheetModels, setSheetModels] = useState([]); const [activeSheet, setActiveSheet] = useState<{ role: ModelRole } | null>(null); const [isSavingModel, setIsSavingModel] = useState(false); + const [isSavingExtractorSettings, setIsSavingExtractorSettings] = useState(false); + const [extractorConcurrency, setExtractorConcurrency] = useState(5); + const [extractorAttempts, setExtractorAttempts] = useState(2); const isLoading = convexModels === undefined || isLoadingConfig; @@ -32,7 +36,11 @@ export default function ModelSettingsPage() { if (!token) throw new Error("Not authenticated"); return getModelConfig(token); }) - .then((config) => setEffectiveConfig(config)) + .then((config) => { + setEffectiveConfig(config); + setExtractorConcurrency(config.rowExtractorConcurrency); + setExtractorAttempts(config.rowExtractorBrowserAttempts); + }) .catch(() => setEffectiveConfig(null)) .finally(() => setIsLoadingConfig(false)); }, [getToken]); @@ -48,7 +56,7 @@ export default function ModelSettingsPage() { : []; function getSelectedModel(role: ModelRole): string { - return effectiveConfig?.[role.key as keyof typeof effectiveConfig] ?? ""; + return String(effectiveConfig?.[role.key as keyof typeof effectiveConfig] ?? ""); } async function handleModelSelect(role: ModelRole, model: OpenRouterModel) { @@ -68,6 +76,34 @@ export default function ModelSettingsPage() { } } + async function saveExtractorSettings() { + setIsSavingExtractorSettings(true); + try { + const token = await getToken(); + if (!token) throw new Error("Not authenticated"); + await saveModelConfig( + { + rowExtractorConcurrency: extractorConcurrency, + rowExtractorBrowserAttempts: extractorAttempts, + }, + token, + ); + setEffectiveConfig((prev: EffectiveModelConfig | null) => + prev + ? { + ...prev, + rowExtractorConcurrency: extractorConcurrency, + rowExtractorBrowserAttempts: extractorAttempts, + } + : prev, + ); + } catch { + // we will add toast later + } finally { + setIsSavingExtractorSettings(false); + } + } + function openSideSheet(role: ModelRole) { if (sheetModels.length === 0) { getOpenRouterModels() @@ -139,6 +175,72 @@ export default function ModelSettingsPage() { )) )} + + {isLocalMode && !isLoading && ( +
+ + +
+ + + +
+ +
+ +
+
+ )} {activeSheet && ( diff --git a/frontend/convex/modelConfig.ts b/frontend/convex/modelConfig.ts index e10fd3d..4b0835c 100644 --- a/frontend/convex/modelConfig.ts +++ b/frontend/convex/modelConfig.ts @@ -30,6 +30,8 @@ export const upsert = mutation({ schemaInference: v.optional(v.string()), populateOrchestrator: v.optional(v.string()), investigateSubagent: v.optional(v.string()), + rowExtractorConcurrency: v.optional(v.number()), + rowExtractorBrowserAttempts: v.optional(v.number()), }, handler: async (ctx, args) => { const identity = await getIdentity(ctx); @@ -43,10 +45,18 @@ export const upsert = mutation({ if (existing) { // Partial update — only touch fields that were explicitly provided. // Omitting a field preserves its current database value. - const patch: Record = {}; + const patch: { + schemaInference?: string; + populateOrchestrator?: string; + investigateSubagent?: string; + rowExtractorConcurrency?: number; + rowExtractorBrowserAttempts?: number; + } = {}; if (args.schemaInference !== undefined) patch.schemaInference = args.schemaInference; if (args.populateOrchestrator !== undefined) patch.populateOrchestrator = args.populateOrchestrator; if (args.investigateSubagent !== undefined) patch.investigateSubagent = args.investigateSubagent; + if (args.rowExtractorConcurrency !== undefined) patch.rowExtractorConcurrency = args.rowExtractorConcurrency; + if (args.rowExtractorBrowserAttempts !== undefined) patch.rowExtractorBrowserAttempts = args.rowExtractorBrowserAttempts; await ctx.db.patch(existing._id, patch); } else { // First-time save — build insert object from provided fields only. @@ -56,10 +66,14 @@ export const upsert = mutation({ schemaInference?: string; populateOrchestrator?: string; investigateSubagent?: string; + rowExtractorConcurrency?: number; + rowExtractorBrowserAttempts?: number; } = { userId: identity.subject }; if (args.schemaInference !== undefined) insert.schemaInference = args.schemaInference; if (args.populateOrchestrator !== undefined) insert.populateOrchestrator = args.populateOrchestrator; if (args.investigateSubagent !== undefined) insert.investigateSubagent = args.investigateSubagent; + if (args.rowExtractorConcurrency !== undefined) insert.rowExtractorConcurrency = args.rowExtractorConcurrency; + if (args.rowExtractorBrowserAttempts !== undefined) insert.rowExtractorBrowserAttempts = args.rowExtractorBrowserAttempts; await ctx.db.insert("modelConfig", insert); } }, @@ -88,6 +102,8 @@ export const upsertInternal = internalMutation({ schemaInference: v.optional(v.string()), populateOrchestrator: v.optional(v.string()), investigateSubagent: v.optional(v.string()), + rowExtractorConcurrency: v.optional(v.number()), + rowExtractorBrowserAttempts: v.optional(v.number()), }, handler: async (ctx, args) => { const existing = await ctx.db @@ -95,10 +111,18 @@ export const upsertInternal = internalMutation({ .withIndex("by_user", (q) => q.eq("userId", args.userId)) .first(); - const patch: Record = {}; + const patch: { + schemaInference?: string; + populateOrchestrator?: string; + investigateSubagent?: string; + rowExtractorConcurrency?: number; + rowExtractorBrowserAttempts?: number; + } = {}; if (args.schemaInference !== undefined) patch.schemaInference = args.schemaInference; if (args.populateOrchestrator !== undefined) patch.populateOrchestrator = args.populateOrchestrator; if (args.investigateSubagent !== undefined) patch.investigateSubagent = args.investigateSubagent; + if (args.rowExtractorConcurrency !== undefined) patch.rowExtractorConcurrency = args.rowExtractorConcurrency; + if (args.rowExtractorBrowserAttempts !== undefined) patch.rowExtractorBrowserAttempts = args.rowExtractorBrowserAttempts; if (existing) { await ctx.db.patch(existing._id, patch); diff --git a/frontend/convex/schema.ts b/frontend/convex/schema.ts index 83ea007..3427ed7 100644 --- a/frontend/convex/schema.ts +++ b/frontend/convex/schema.ts @@ -138,6 +138,8 @@ export default defineSchema({ schemaInference: v.optional(v.string()), populateOrchestrator: v.optional(v.string()), investigateSubagent: v.optional(v.string()), + rowExtractorConcurrency: v.optional(v.number()), + rowExtractorBrowserAttempts: v.optional(v.number()), }).index("by_user", ["userId"]), localCredentials: defineTable({ diff --git a/frontend/lib/backend.ts b/frontend/lib/backend.ts index f5ea3e5..44310a9 100644 --- a/frontend/lib/backend.ts +++ b/frontend/lib/backend.ts @@ -43,6 +43,8 @@ export interface EffectiveModelConfig { schemaInference: string; populateOrchestrator: string; investigateSubagent: string; + rowExtractorConcurrency: number; + rowExtractorBrowserAttempts: number; } /** @@ -53,6 +55,8 @@ export interface SavedModelConfig { schemaInference: string | null; populateOrchestrator: string | null; investigateSubagent: string | null; + rowExtractorConcurrency: number | null; + rowExtractorBrowserAttempts: number | null; } export interface OpenRouterModel { @@ -82,6 +86,48 @@ export interface LocalSetupStatus { const BACKEND_URL = process.env.NEXT_PUBLIC_BACKEND_URL || "http://localhost:3501"; +const DEFAULT_ROW_EXTRACTOR_CONCURRENCY = 5; +const DEFAULT_ROW_EXTRACTOR_BROWSER_ATTEMPTS = 2; + +function normalizeIntegerSetting( + value: unknown, + fallback: number, + min: number, + max: number, +): number { + const parsed = typeof value === "number" ? value : Number(value); + if (!Number.isFinite(parsed)) return fallback; + return Math.min(max, Math.max(min, Math.trunc(parsed))); +} + +function normalizeEffectiveModelConfig( + config: Partial | null | undefined, +): EffectiveModelConfig { + return { + schemaInference: + typeof config?.schemaInference === "string" ? config.schemaInference : "", + populateOrchestrator: + typeof config?.populateOrchestrator === "string" + ? config.populateOrchestrator + : "", + investigateSubagent: + typeof config?.investigateSubagent === "string" + ? config.investigateSubagent + : "", + rowExtractorConcurrency: normalizeIntegerSetting( + config?.rowExtractorConcurrency, + DEFAULT_ROW_EXTRACTOR_CONCURRENCY, + 1, + 100, + ), + rowExtractorBrowserAttempts: normalizeIntegerSetting( + config?.rowExtractorBrowserAttempts, + DEFAULT_ROW_EXTRACTOR_BROWSER_ATTEMPTS, + 1, + 10, + ), + }; +} async function errorMessage(res: Response): Promise { const body = await res.json().catch(() => null); @@ -177,7 +223,7 @@ export async function getModelConfig(token: string): Promise