diff --git a/backend/src/index.ts b/backend/src/index.ts index e4e6155..f3eb7e0 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -25,9 +25,18 @@ type DatasetPopulateBeginOutcome = | "started" | "not_found" | "forbidden" - | "already_building"; + | "already_building" + | "already_updating"; type PopulateWorkflowRun = Awaited>; +type DatasetUpdateBeginOutcome = + | "started" + | "not_found" + | "forbidden" + | "already_building" + | "already_updating"; +type UpdateWorkflowRun = Awaited>; + function statusErrorMessage(err: unknown): string { const message = err instanceof Error ? err.message : String(err); return message.slice(0, 500); @@ -64,6 +73,7 @@ async function sendDatasetReadyNotification({ datasetId, datasetName, rowCount, + workflowType = "populate", }: { logger: FastifyBaseLogger; clerk: ClerkClient; @@ -71,12 +81,13 @@ async function sendDatasetReadyNotification({ datasetId: string; datasetName: string; rowCount: number; + workflowType?: "populate" | "update"; }): Promise { const baseProps = { datasetId, datasetName, rowCount, - workflowType: "populate" as const, + workflowType, }; try { @@ -130,6 +141,106 @@ async function sendDatasetReadyNotification({ } } +async function beginDatasetUpdate( + datasetId: string, + ownerId: string, +): Promise { + const claim = await convex.mutation(internal.datasets.beginUpdateInternal, { + id: datasetId, + ownerId, + }); + return claim.outcome; +} + +async function runUpdateWorkflowInBackground({ + input, + run, + authorizedUserId, + logger, + clerk, +}: { + input: DatasetContext; + run: UpdateWorkflowRun; + authorizedUserId: string; + logger: FastifyBaseLogger; + clerk: ClerkClient; +}): Promise { + const datasetId = input.datasetId; + + try { + const result = await run.start({ + inputData: { + ...input, + authContext: { + authorizedUserId, + workflowRunId: run.runId, + }, + }, + }); + + logger.info( + { + workflowStatus: result.status, + steps: JSON.stringify(result.steps).slice(0, 2000), + }, + "Update workflow completed", + ); + + if (result.status !== "success") { + throw new Error(`Workflow ended with status: ${result.status}`); + } + + const currentDataset = await convex.query(internal.datasets.getInternal, { + id: datasetId, + }); + if (!currentDataset) { + logger.info( + { datasetId }, + "Dataset no longer exists post-update; skipping status transition", + ); + return; + } + + await setDatasetPopulateStatus(datasetId, "live"); + + const rowCount = await convex.query( + internal.datasetRows.countByDataset, + { datasetId }, + ); + await sendDatasetReadyNotification({ + logger, + clerk, + userId: authorizedUserId, + datasetId, + datasetName: currentDataset.name, + rowCount, + workflowType: "update", + }); + } catch (err) { + const lastStatusError = statusErrorMessage(err); + logger.error({ err, datasetId }, "Update background workflow failed"); + + try { + const currentDataset = await convex.query(internal.datasets.getInternal, { + id: datasetId, + }); + if (!currentDataset) { + logger.info( + { datasetId }, + "Dataset no longer exists after failed update; skipping failed status transition", + ); + return; + } + await setDatasetPopulateStatus(datasetId, "failed", lastStatusError); + } catch (statusErr) { + logger.error( + { err: statusErr, datasetId }, + "Failed to transition dataset status to 'failed' after update", + ); + } + } +} + async function runPopulateWorkflowInBackground({ input, run, @@ -350,34 +461,45 @@ await fastify.register(async (instance) => { return reply.code(401).send({ error: "Authentication required" }); } - const dataset = await convex.query(internal.datasets.getInternal, { - id: parsed.data.datasetId, - }); - if (!dataset) { + const updateOutcome = await beginDatasetUpdate( + parsed.data.datasetId, + auth.userId, + ); + + if (updateOutcome === "not_found") { return reply.code(404).send({ error: "Dataset not found" }); } - if (dataset.ownerId !== auth.userId) { + if (updateOutcome === "forbidden") { return reply.code(403).send({ error: "Not authorized to update this dataset" }); } + if (updateOutcome === "already_building") { + return reply.code(409).send({ error: "Dataset is being populated" }); + } + if (updateOutcome === "already_updating") { + return reply.code(409).send({ error: "Dataset is already being updated" }); + } + if (updateOutcome !== "started") { + throw new Error(`Unexpected update claim outcome: ${updateOutcome}`); + } - const run = await updateWorkflow.createRun(); - const result = await run.start({ - inputData: { - ...parsed.data, - authContext: { - authorizedUserId: auth.userId, - workflowRunId: run.runId, - }, - }, - }); - - req.log.info({ workflowStatus: result.status }, "Update workflow completed"); - - if (result.status !== "success") { - throw new Error(`Workflow ended with status: ${result.status}`); + let run: UpdateWorkflowRun; + try { + run = await updateWorkflow.createRun(); + } catch (runErr) { + req.log.error(runErr, "Failed to create update workflow run; reverting dataset status"); + await setDatasetPopulateStatus(parsed.data.datasetId, "live"); + return reply.code(502).send({ error: "Failed to update dataset. Please try again." }); } - return { success: true, result: result.result }; + void runUpdateWorkflowInBackground({ + input: parsed.data, + run, + authorizedUserId: auth.userId, + logger: req.log, + clerk: req.server.clerk, + }); + + return reply.code(202).send({ success: true, runId: run.runId }); } catch (err) { const msg = err instanceof Error ? err.message : String(err); if (msg.includes("validator") || msg.includes("Invalid")) { diff --git a/backend/src/mastra/agents/investigate.ts b/backend/src/mastra/agents/investigate.ts index ad8ce1b..10faaae 100644 --- a/backend/src/mastra/agents/investigate.ts +++ b/backend/src/mastra/agents/investigate.ts @@ -33,19 +33,19 @@ RULES: TOOL CALL FORMAT — every tool call argument must be a JSON object wrapped in curly braces: search_web: {"query": "your search terms"} fetch_page: {"url": "https://example.com"} - insert_row: {"data": {${columnNames.map((n) => `"${n}": "value"`).join(", ")}}} + insert_row: {"data": {${columnNames.map((n) => `"${n}": "value"`).join(", ")}}, "sources": ["https://url-you-fetched.com"], "row_summary": "one line about this entity", "how_found": "step by step guide on how to extract the data so an agent in the future can do it too"} WORKFLOW: 1. Fetch 1-2 of the provided URLs to get real data (if URLs were given). 2. If you need more, run ONE search and fetch the best result. 3. Call insert_row with whatever real data you have. Use "" for missing fields. + Include "sources" (URLs you fetched), "row_summary" (one line about this entity), and "how_found" (a step by step guide on how you found this data. eg, 1. fetch the contents of this url "", 2. Look for the pricing field, and title name field, 3. etc...) 4. Write your final response: INSERTED: true/false SUMMARY: one line CLUES: hints for finding more entities REASON: why you succeeded or what was missing - -You are scoped to ONE dataset. Do not pass a datasetId to any tool.`; +`; } /** diff --git a/backend/src/mastra/agents/refresh.ts b/backend/src/mastra/agents/refresh.ts new file mode 100644 index 0000000..2215686 --- /dev/null +++ b/backend/src/mastra/agents/refresh.ts @@ -0,0 +1,74 @@ +import { Agent } from "@mastra/core/agent"; +import { createOpenRouter } from "@openrouter/ai-sdk-provider"; +import { buildPopulateTools } from "../tools/dataset-tools.js"; +import { searchWebTool, fetchPageTool } from "../tools/web-tools.js"; +import type { AuthContext } from "../workflows/populate.js"; +import type { PopulateColumn } from "../../pipeline/populate.js"; + +const openrouter = createOpenRouter({ + apiKey: process.env.OPENROUTER_API_KEY!, +}); + +function buildRefreshInstructions(columns: PopulateColumn[]): string { + const columnNames = columns.map((c) => c.name); + const columnsDesc = columns + .map( + (c) => + `- "${c.name}" (${c.type})${c.isPrimaryKey ? " [PRIMARY KEY]" : ""}${c.description ? `: ${c.description}` : ""}`, + ) + .join("\n"); + + return `You are refreshing data for one existing row. Be fast — you have very few steps. + +Columns: +${columnsDesc} + +RULES: +- You have at most 6 tool calls total. +- Start by following the steps in the "Previously found via" section — it describes exactly how the data was originally extracted (which URLs to fetch, which fields to look for). Reproduce those steps to get fresh data. +- If no "Previously found via" steps are provided, fall back to fetching the source URLs directly. +- If a source returns a 404, timeout, or is blocked, note it and move to the next. +- Compare the fetched data with the existing row data carefully. +- If data has MEANINGFULLY changed (not just formatting differences), call update_row with the FULL updated data object (all columns, not just changed ones), plus updated sources, row_summary, and how_found. +- If NO sources work (all 404/blocked), try ONE web search using the primary key values to find a current source. +- If the data is unchanged, do NOT call update_row. Just report your findings. +- Never fabricate values. If you can't verify a field, keep the existing value. + +TOOL CALL FORMAT — every tool call argument must be a JSON object wrapped in curly braces: + fetch_page: {"url": "https://example.com"} + search_web: {"query": "your search terms"} + update_row: {"rowId": "", "data": {${columnNames.map((n) => `"${n}": "value"`).join(", ")}}, "sources": ["https://..."], "row_summary": "one line about this entity", "how_found": "how you verified this data"} + +WORKFLOW: +1. Fetch the provided source URLs (1-2 calls). +2. Compare fetched data with existing row data. +3. If changed: call update_row with the full updated data. + If sources broken: try ONE search using primary key values, then fetch the best result. +4. Write your final response: + UPDATED: true/false + CHANGES: what changed (or "no changes") + REASON: why you updated or didn't +`; +} + +export function buildRefreshAgent( + authorizedDatasetId: string, + authContext: AuthContext, + columns: PopulateColumn[], +): Agent { + const { update_row } = buildPopulateTools( + authorizedDatasetId, + authContext, + ); + return new Agent({ + id: "refresh-agent", + name: "Dataset Refresh Agent", + instructions: buildRefreshInstructions(columns), + model: openrouter("qwen/qwen3.7-max"), + tools: { + update_row, + search_web: searchWebTool, + fetch_page: fetchPageTool, + }, + }); +} diff --git a/backend/src/mastra/tools/dataset-tools.ts b/backend/src/mastra/tools/dataset-tools.ts index a3ecac8..1fc016e 100644 --- a/backend/src/mastra/tools/dataset-tools.ts +++ b/backend/src/mastra/tools/dataset-tools.ts @@ -127,9 +127,21 @@ export function buildPopulateTools( "Insert a single row into the dataset you are populating. Call this each time you have a row ready — don't wait to batch them.", inputSchema: z.object({ data: z.record(z.string(), z.any()), + sources: z + .array(z.string()) + .optional() + .describe("URLs you visited or used to gather data for this row"), + row_summary: z + .string() + .optional() + .describe("One-line summary of this entity"), + how_found: z + .string() + .optional() + .describe("Brief description of how you found and verified this data"), }), outputSchema: writeResultSchema, - execute: async ({ data }) => { + execute: async ({ data, sources, row_summary, how_found }) => { if (!data || Object.keys(data).length === 0) return { success: false, @@ -139,12 +151,15 @@ export function buildPopulateTools( const cleanedData = cleanDataKeys(data); console.log( - `[insert_row] ${logCtx} cols=${Object.keys(cleanedData).length}`, + `[insert_row] ${logCtx} cols=${Object.keys(cleanedData).length} sources=${sources?.length ?? 0}`, ); try { await convex.mutation(internal.datasetRows.insert, { datasetId: authorizedDatasetId, data: cleanedData, + ...(sources !== undefined ? { sources } : {}), + ...(row_summary !== undefined ? { rowSummary: row_summary } : {}), + ...(how_found !== undefined ? { howFound: how_found } : {}), }); return { success: true }; } catch (err) { @@ -246,9 +261,21 @@ export function buildPopulateTools( inputSchema: z.object({ rowId: z.string(), data: z.record(z.string(), z.any()), + sources: z + .array(z.string()) + .optional() + .describe("Updated source URLs where this data was verified"), + row_summary: z + .string() + .optional() + .describe("Updated one-line summary of this entity"), + how_found: z + .string() + .optional() + .describe("Brief description of how the updated data was found"), }), outputSchema: writeResultSchema, - execute: async ({ rowId, data }) => { + execute: async ({ rowId, data, sources, row_summary, how_found }) => { if (!rowId) return { success: false, error: "rowId is required." }; if (!data || Object.keys(data).length === 0) return { @@ -261,13 +288,13 @@ export function buildPopulateTools( `[update_row] ${logCtx} row=${rowId} cols=${Object.keys(cleanedData).length}`, ); try { - // expectedDatasetId pins the Convex-side atomic capability check. - // If `rowId` belongs to another dataset, the mutation throws - // "Row not found" — uniform with the get_row policy. await convex.mutation(internal.datasetRows.update, { id: rowId, expectedDatasetId: authorizedDatasetId, data: cleanedData, + ...(sources !== undefined ? { sources } : {}), + ...(row_summary !== undefined ? { rowSummary: row_summary } : {}), + ...(how_found !== undefined ? { howFound: how_found } : {}), }); return { success: true }; } catch (err) { diff --git a/backend/src/mastra/workflows/update.ts b/backend/src/mastra/workflows/update.ts index b320bd6..c5d44cf 100644 --- a/backend/src/mastra/workflows/update.ts +++ b/backend/src/mastra/workflows/update.ts @@ -1,6 +1,8 @@ import { createStep, createWorkflow } from "@mastra/core/workflows"; import { z } from "zod"; -import { datasetContextSchema } from "../../pipeline/populate.js"; +import { datasetContextSchema, populateColumnSchema } from "../../pipeline/populate.js"; +import { convex, internal } from "../../convex.js"; +import { buildRefreshAgent } from "../agents/refresh.js"; import { authContextSchema } from "./populate.js"; export const updateInputSchema = datasetContextSchema.extend({ @@ -8,22 +10,167 @@ export const updateInputSchema = datasetContextSchema.extend({ }); export type UpdateInput = z.infer; -const updateStep = createStep({ - id: "update-dataset", +const rowSchema = z.object({ + _id: z.string(), + data: z.record(z.string(), z.any()), + sources: z.array(z.string()).optional(), + rowSummary: z.string().optional(), + howFound: z.string().optional(), +}); + +const markAndFetchOutputSchema = updateInputSchema.extend({ + rows: z.array(rowSchema), +}); + +const markAndFetchStep = createStep({ + id: "mark-and-fetch", inputSchema: updateInputSchema, - outputSchema: z.object({ message: z.string() }), + outputSchema: markAndFetchOutputSchema, execute: async ({ inputData }) => { + const selective = inputData.rowIds && inputData.rowIds.length > 0; console.log( - `[update-dataset] triggered dataset=${inputData.datasetId} user=${inputData.authContext.authorizedUserId} run=${inputData.authContext.workflowRunId}`, + `[mark-and-fetch] Marking ${selective ? inputData.rowIds!.length : "all"} rows for dataset ${inputData.datasetId}`, ); - return { message: "Update workflow triggered — logic not yet implemented." }; + + const markedCount = await convex.mutation(internal.datasetRows.markForUpdate, { + datasetId: inputData.datasetId, + ...(selective ? { rowIds: inputData.rowIds } : {}), + }); + + const rawRows = await convex.query(internal.datasetRows.listInternal, { + datasetId: inputData.datasetId, + }); + + let rows = (rawRows as Record[]).map((r) => ({ + _id: String(r._id), + data: (r.data ?? {}) as Record, + sources: r.sources as string[] | undefined, + rowSummary: r.rowSummary as string | undefined, + howFound: r.howFound as string | undefined, + })); + + if (selective) { + const selectedSet = new Set(inputData.rowIds); + rows = rows.filter((r) => selectedSet.has(r._id)); + } + + console.log(`[mark-and-fetch] Marked ${markedCount}, processing ${rows.length} rows`); + return { ...inputData, rows }; + }, +}); + +const refreshOutputSchema = z.object({ + updatedCount: z.number(), + totalCount: z.number(), + errors: z.number(), +}); + +const MAX_CONCURRENT = 5; + +async function processWithConcurrency( + items: T[], + handler: (item: T) => Promise, + max: number, +): Promise { + let idx = 0; + const workers = Array.from( + { length: Math.min(max, items.length) }, + async () => { + while (idx < items.length) { + const i = idx++; + await handler(items[i]); + } + }, + ); + await Promise.allSettled(workers); +} + +const refreshRowsStep = createStep({ + id: "refresh-rows", + inputSchema: markAndFetchOutputSchema, + outputSchema: refreshOutputSchema, + execute: async ({ inputData }) => { + const { datasetId, columns, authContext, rows } = inputData; + let updatedCount = 0; + let errors = 0; + + const pkColumns = columns.filter((c) => c.isPrimaryKey); + + async function processRow(row: z.infer) { + try { + const agent = buildRefreshAgent(datasetId, authContext, columns); + + const pkBlock = + pkColumns.length > 0 + ? pkColumns + .map((c) => `- ${c.name}: ${row.data[c.name] ?? ""}`) + .join("\n") + : "(no primary keys defined)"; + const existingDataBlock = Object.entries(row.data) + .map(([k, v]) => `- ${k}: ${v}`) + .join("\n"); + const sourcesBlock = + row.sources && row.sources.length > 0 + ? `\nSource URLs to check:\n${row.sources.map((s) => `- ${s}`).join("\n")}` + : "\nNo source URLs recorded — search the web using the primary key values."; + + const prompt = `Refresh this existing row and update it if the data has changed. + +Row ID: ${row._id} + +Primary keys: +${pkBlock} + +Existing data: +${existingDataBlock} +${sourcesBlock} +${row.rowSummary ? `\nPrevious summary: ${row.rowSummary}` : ""} +${row.howFound ? `\nPreviously found via: ${row.howFound}` : ""}`; + + const result = await agent.generate(prompt, { maxSteps: 10 }); + const text = result.text.toLowerCase(); + if (text.includes("updated: true")) { + updatedCount++; + } + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + console.error( + `[refresh-rows] Row ${row._id} failed: ${msg}`, + ); + errors++; + } finally { + try { + await convex.mutation(internal.datasetRows.clearUpdateStatus, { + id: row._id, + expectedDatasetId: datasetId, + }); + } catch (cleanupErr) { + const cleanupMsg = cleanupErr instanceof Error ? cleanupErr.message : String(cleanupErr); + if (/not found/i.test(cleanupMsg)) { + return; + } + console.error(`[refresh-rows] Failed to clear update status for row ${row._id}: ${cleanupMsg}`); + } + } + } + + console.log( + `[refresh-rows] Processing ${rows.length} rows (max ${MAX_CONCURRENT} concurrent)`, + ); + await processWithConcurrency(rows, processRow, MAX_CONCURRENT); + console.log( + `[refresh-rows] Done: ${updatedCount} updated, ${errors} errors, ${rows.length - updatedCount - errors} unchanged`, + ); + + return { updatedCount, totalCount: rows.length, errors }; }, }); export const updateWorkflow = createWorkflow({ id: "update-workflow", inputSchema: updateInputSchema, - outputSchema: z.object({ message: z.string() }), + outputSchema: refreshOutputSchema, }) - .then(updateStep) + .then(markAndFetchStep) + .then(refreshRowsStep) .commit(); diff --git a/backend/src/pipeline/populate.ts b/backend/src/pipeline/populate.ts index 9f5f3a5..55d37aa 100644 --- a/backend/src/pipeline/populate.ts +++ b/backend/src/pipeline/populate.ts @@ -13,5 +13,6 @@ export const datasetContextSchema = z.object({ datasetName: z.string(), description: z.string(), columns: z.array(populateColumnSchema).min(1), + rowIds: z.array(z.string()).min(1).optional(), }); export type DatasetContext = z.infer; diff --git a/frontend/app/dataset/[id]/page.tsx b/frontend/app/dataset/[id]/page.tsx index cc98ef5..a196172 100644 --- a/frontend/app/dataset/[id]/page.tsx +++ b/frontend/app/dataset/[id]/page.tsx @@ -135,19 +135,29 @@ export default function DatasetPage() { } async function handleUpdate() { - if (!dataset || updating || dataset.status === "building") return; + if (!dataset || updating || dataset.status === "building" || dataset.status === "updating") return; setUpdating(true); try { const token = await getToken(); if (!token) throw new Error("Not authenticated"); - await update( + const selectedRowIds = selectedCount > 0 ? Array.from(selection.selected) : undefined; + const result = await update( dataset._id, dataset.name, dataset.description, dataset.columns, token, + selectedRowIds, ); + if (selectedCount > 0) selection.clear(); + track(EVENTS.DATASET_UPDATE_STARTED, { + datasetId: dataset._id, + column_count: dataset.columns.length, + row_count: selectedRowIds?.length ?? rows.length, + selective: selectedCount > 0, + runId: result.runId, + }); } catch (err) { console.error("[update] failed", err); captureException(err, { @@ -172,16 +182,20 @@ export default function DatasetPage() { // the "Dataset not found" UI. const exportDisabled = exporting !== null || rows.length === 0; - const isDatasetBuilding = dataset.status === "building"; - const updateDisabled = updating || isDatasetBuilding; - const populateDisabled = populating || isDatasetBuilding; - const updateLabel = isDatasetBuilding - ? "Building…" - : updating - ? "Updating…" - : "Update Dataset"; - const populateLabel = isDatasetBuilding - ? "Building…" + const isDatasetBusy = dataset.status === "building" || dataset.status === "updating"; + const updateDisabled = updating || isDatasetBusy; + const populateDisabled = populating || isDatasetBusy; + const updateLabel = dataset.status === "updating" + ? "Updating…" + : dataset.status === "building" + ? "Building…" + : updating + ? "Starting…" + : selectedCount > 0 + ? `Update (${selectedCount})` + : "Update Dataset"; + const populateLabel = isDatasetBusy + ? (dataset.status === "updating" ? "Updating…" : "Building…") : populating ? "Starting…" : dataset.status === "failed" diff --git a/frontend/app/globals.css b/frontend/app/globals.css index 1b9363a..10b5ef4 100644 --- a/frontend/app/globals.css +++ b/frontend/app/globals.css @@ -97,3 +97,34 @@ body { :root[data-theme="dark"] .group:hover .card-glow::before { opacity: 0.45; } + +@keyframes shimmer { + 0% { background-position: -200% 0; } + 100% { background-position: 200% 0; } +} + +@keyframes cell-flash { + 0% { background-color: rgba(16, 185, 129, 0.25); } + 100% { background-color: transparent; } +} + +.shimmer-overlay { + background: linear-gradient( + 90deg, + transparent 0%, + var(--foreground) 50%, + transparent 100% + ); + background-size: 200% 100%; + animation: shimmer 1.8s ease-in-out infinite; + opacity: 0.04; + pointer-events: none; +} + +:root[data-theme="dark"] .shimmer-overlay { + opacity: 0.06; +} + +.cell-flash { + animation: cell-flash 1.5s ease-out forwards; +} diff --git a/frontend/components/dataset/StatusBadge.tsx b/frontend/components/dataset/StatusBadge.tsx index 81f71d7..1a50b5b 100644 --- a/frontend/components/dataset/StatusBadge.tsx +++ b/frontend/components/dataset/StatusBadge.tsx @@ -1,9 +1,10 @@ -export type DatasetStatus = "live" | "paused" | "building" | "failed"; +export type DatasetStatus = "live" | "paused" | "building" | "updating" | "failed"; const STYLES: Record = { live: "border-emerald-600/20 bg-emerald-600/5 text-emerald-700 dark:text-emerald-400", paused: "border-border bg-background text-muted", building: "border-amber-600/20 bg-amber-600/5 text-amber-700 dark:text-amber-400", + updating: "border-blue-600/20 bg-blue-600/5 text-blue-700 dark:text-blue-400", failed: "border-red-600/20 bg-red-600/5 text-red-700 dark:text-red-400", }; @@ -11,6 +12,7 @@ const LABELS: Record = { live: "Live", paused: "Paused", building: "Building...", + updating: "Updating...", failed: "Failed", }; @@ -25,6 +27,9 @@ export function StatusBadge({ status }: { status: DatasetStatus }) { {status === "building" && ( )} + {status === "updating" && ( + + )} {status === "failed" && ( )} diff --git a/frontend/components/table/DataRow.tsx b/frontend/components/table/DataRow.tsx index b57c5a3..1fe0974 100644 --- a/frontend/components/table/DataRow.tsx +++ b/frontend/components/table/DataRow.tsx @@ -14,6 +14,8 @@ export interface DataRowData { isSelected: (id: string) => boolean; toggleRow: (id: string, shiftKey: boolean) => void; isBuilding: boolean; + pendingRowIds: Set; + flashingCells: Set; } function DataRowImpl({ @@ -25,7 +27,7 @@ function DataRowImpl({ index: number; style: CSSProperties; }) { - const { rows, columns, columnWidths, isSelected, toggleRow, isBuilding } = data; + const { rows, columns, columnWidths, isSelected, toggleRow, isBuilding, pendingRowIds, flashingCells } = data; const row = rows[index]; if (!row) { @@ -104,26 +106,24 @@ function DataRowImpl({ {columns.map((col, cellIdx) => { const width = floorWidth(columnWidths[cellIdx + 1] ?? 150); const value = row.original.data[col.name]; + const isPending = pendingRowIds.has(row.original._id); + const isFlashing = flashingCells.has(`${row.original._id}:${col.name}`); return (
+ {isPending &&
}
); })} diff --git a/frontend/components/table/DatasetTable.tsx b/frontend/components/table/DatasetTable.tsx index 18792da..bdf9d6a 100644 --- a/frontend/components/table/DatasetTable.tsx +++ b/frontend/components/table/DatasetTable.tsx @@ -13,6 +13,7 @@ import type { useSelection } from "./use-selection"; import { usePersistedColumnWidths } from "./use-persisted-widths"; import { TableHeader } from "./TableHeader"; import { DataRow, type DataRowData } from "./DataRow"; +import { useRowChangeDetection } from "./use-row-change-detection"; type Selection = ReturnType; @@ -112,6 +113,7 @@ export function DatasetTable({ const tableRows = table.getRowModel().rows; const columnWidths = useMemo(() => headers.map((h) => h.getSize()), [headers]); const isBuilding = dataset.status === "building"; + const { flashingCells, pendingRowIds } = useRowChangeDetection(rows); const displayCount = isBuilding ? Math.max(tableRows.length, GHOST_ROW_COUNT) : tableRows.length; const totalWidth = columnWidths.reduce((sum, w) => sum + w, 0); const tableContentWidth = totalWidth + LAST_COLUMN_RESIZE_GUTTER; @@ -140,8 +142,10 @@ export function DatasetTable({ isSelected: selection.has, toggleRow, isBuilding, + pendingRowIds, + flashingCells, }), - [tableRows, dataset.columns, columnWidths, selection.has, toggleRow, isBuilding], + [tableRows, dataset.columns, columnWidths, selection.has, toggleRow, isBuilding, pendingRowIds, flashingCells], ); return ( diff --git a/frontend/components/table/types.ts b/frontend/components/table/types.ts index 7014ab9..9ff7cc1 100644 --- a/frontend/components/table/types.ts +++ b/frontend/components/table/types.ts @@ -11,7 +11,7 @@ export interface DatasetMeta { _id: string; name: string; description: string; - status: "live" | "paused" | "building" | "failed"; + status: "live" | "paused" | "building" | "updating" | "failed"; lastStatusError?: string; cadence: string; columns: DatasetColumn[]; @@ -21,4 +21,5 @@ export interface DatasetRow { _id: string; _creationTime: number; data: Record; + updateStatus?: "pending"; } diff --git a/frontend/components/table/use-row-change-detection.ts b/frontend/components/table/use-row-change-detection.ts new file mode 100644 index 0000000..49ce219 --- /dev/null +++ b/frontend/components/table/use-row-change-detection.ts @@ -0,0 +1,75 @@ +import { useEffect, useRef, useState, useMemo } from "react"; +import type { DatasetRow } from "./types"; + +const FLASH_DURATION_MS = 1500; + +export function useRowChangeDetection(rows: DatasetRow[]) { + const prevRowsRef = useRef>(new Map()); + const flashTimersRef = useRef>>(new Set()); + const [flashingCells, setFlashingCells] = useState>(new Set()); + + useEffect(() => { + return () => { + for (const timer of flashTimersRef.current) clearTimeout(timer); + flashTimersRef.current.clear(); + }; + }, []); + + const pendingRowIds = useMemo(() => { + const set = new Set(); + for (const row of rows) { + if (row.updateStatus === "pending") { + set.add(row._id); + } + } + return set; + }, [rows]); + + useEffect(() => { + const prevMap = prevRowsRef.current; + const newFlashes = new Set(); + + for (const row of rows) { + const prev = prevMap.get(row._id); + if (!prev) continue; + + const wasPending = prev.updateStatus === "pending"; + const isNowClear = row.updateStatus !== "pending"; + + if (wasPending && isNowClear) { + for (const [key, newVal] of Object.entries(row.data)) { + const oldVal = prev.data[key]; + if (String(oldVal ?? "") !== String(newVal ?? "")) { + newFlashes.add(`${row._id}:${key}`); + } + } + } + } + + const nextMap = new Map(); + for (const row of rows) { + nextMap.set(row._id, { ...row, data: { ...row.data } }); + } + prevRowsRef.current = nextMap; + + if (newFlashes.size > 0) { + setFlashingCells((prev) => { + const merged = new Set(prev); + for (const key of newFlashes) merged.add(key); + return merged; + }); + + const timer = setTimeout(() => { + setFlashingCells((prev) => { + const next = new Set(prev); + for (const key of newFlashes) next.delete(key); + return next; + }); + flashTimersRef.current.delete(timer); + }, FLASH_DURATION_MS); + flashTimersRef.current.add(timer); + } + }, [rows]); + + return { flashingCells, pendingRowIds }; +} diff --git a/frontend/convex/datasetRows.ts b/frontend/convex/datasetRows.ts index 719bd5f..f69316d 100644 --- a/frontend/convex/datasetRows.ts +++ b/frontend/convex/datasetRows.ts @@ -64,6 +64,8 @@ export const insert = internalMutation({ datasetId: v.id("datasets"), data: v.record(v.string(), v.any()), sources: v.optional(v.array(v.string())), + rowSummary: v.optional(v.string()), + howFound: v.optional(v.string()), }, handler: async (ctx, args) => { const dataset = await ctx.db.get(args.datasetId); @@ -140,20 +142,19 @@ export const update = internalMutation({ id: v.id("datasetRows"), expectedDatasetId: v.id("datasets"), data: v.record(v.string(), v.any()), + sources: v.optional(v.array(v.string())), + rowSummary: v.optional(v.string()), + howFound: v.optional(v.string()), }, handler: async (ctx, args) => { - // 1. Capability scope check (security): atomically verifies the row - // exists AND belongs to expectedDatasetId. Throws otherwise. const existing = await assertRowInDataset( ctx, args.id, args.expectedDatasetId, ); - // 2. Quota: charge the dataset's owner for 1 row modification. await consumeQuotaForDataset(ctx, args.expectedDatasetId, 1); - // 3. Diff + history. const oldData = existing.data as Record; const newData = args.data; for (const [key, newVal] of Object.entries(newData)) { @@ -169,8 +170,14 @@ export const update = internalMutation({ } } - // 4. Patch. - await ctx.db.patch(args.id, { data: newData }); + const patch: Record = { + data: newData, + updateStatus: undefined, + }; + if (args.sources !== undefined) patch.sources = args.sources; + if (args.rowSummary !== undefined) patch.rowSummary = args.rowSummary; + if (args.howFound !== undefined) patch.howFound = args.howFound; + await ctx.db.patch(args.id, patch); }, }); @@ -258,6 +265,45 @@ export const remove = internalMutation({ }, }); +export const markForUpdate = internalMutation({ + args: { + datasetId: v.id("datasets"), + rowIds: v.optional(v.array(v.id("datasetRows"))), + }, + handler: async (ctx, args) => { + if (args.rowIds && args.rowIds.length > 0) { + let marked = 0; + for (const rowId of args.rowIds) { + const row = await ctx.db.get(rowId); + if (row && row.datasetId === args.datasetId) { + await ctx.db.patch(rowId, { updateStatus: "pending" as const }); + marked++; + } + } + return marked; + } + const rows = await ctx.db + .query("datasetRows") + .withIndex("by_dataset", (q) => q.eq("datasetId", args.datasetId)) + .collect(); + for (const row of rows) { + await ctx.db.patch(row._id, { updateStatus: "pending" as const }); + } + return rows.length; + }, +}); + +export const clearUpdateStatus = internalMutation({ + args: { + id: v.id("datasetRows"), + expectedDatasetId: v.id("datasets"), + }, + handler: async (ctx, args) => { + await assertRowInDataset(ctx, args.id, args.expectedDatasetId); + await ctx.db.patch(args.id, { updateStatus: undefined }); + }, +}); + /** * Admin-only row listing for a dataset. Used by the populate agent's * `list_rows` tool to see what's already been inserted in the dataset diff --git a/frontend/convex/datasets.ts b/frontend/convex/datasets.ts index 8637a39..cc7b301 100644 --- a/frontend/convex/datasets.ts +++ b/frontend/convex/datasets.ts @@ -138,6 +138,9 @@ export const beginPopulateInternal = internalMutation({ if (dataset.status === "building") { return { outcome: "already_building" as const }; } + if (dataset.status === "updating") { + return { outcome: "already_updating" as const }; + } await ctx.db.patch(dataset._id, { status: "building", @@ -147,6 +150,34 @@ export const beginPopulateInternal = internalMutation({ }, }); +export const beginUpdateInternal = internalMutation({ + args: { + id: v.id("datasets"), + ownerId: v.string(), + }, + handler: async (ctx, args) => { + const dataset = await ctx.db.get(args.id); + if (!dataset) { + return { outcome: "not_found" as const }; + } + if (dataset.ownerId !== args.ownerId) { + return { outcome: "forbidden" as const }; + } + if (dataset.status === "building") { + return { outcome: "already_building" as const }; + } + if (dataset.status === "updating") { + return { outcome: "already_updating" as const }; + } + + await ctx.db.patch(dataset._id, { + status: "updating", + lastStatusError: undefined, + }); + return { outcome: "started" as const }; + }, +}); + /** * Admin-only status transition. Used by the backend orchestration layer * to move a dataset between lifecycle states after a workflow completes. @@ -172,6 +203,7 @@ export const setStatusInternal = internalMutation({ v.literal("live"), v.literal("paused"), v.literal("building"), + v.literal("updating"), v.literal("failed"), ), lastStatusError: v.optional(v.string()), diff --git a/frontend/convex/schema.ts b/frontend/convex/schema.ts index 11e635a..05ef659 100644 --- a/frontend/convex/schema.ts +++ b/frontend/convex/schema.ts @@ -10,6 +10,7 @@ export default defineSchema({ v.literal("live"), v.literal("paused"), v.literal("building"), + v.literal("updating"), v.literal("failed") ), lastStatusError: v.optional(v.string()), @@ -63,6 +64,9 @@ export default defineSchema({ datasetId: v.id("datasets"), data: v.record(v.string(), v.any()), sources: v.optional(v.array(v.string())), + rowSummary: v.optional(v.string()), + howFound: v.optional(v.string()), + updateStatus: v.optional(v.literal("pending")), scrapeScript: v.optional(v.string()), }).index("by_dataset", ["datasetId"]), diff --git a/frontend/lib/analytics.ts b/frontend/lib/analytics.ts index 440587b..bb2cb4a 100644 --- a/frontend/lib/analytics.ts +++ b/frontend/lib/analytics.ts @@ -33,6 +33,7 @@ export const EVENTS = { DATASET_OPENED: "dataset_opened", DATASET_EXPORTED: "dataset_exported", DATASET_POPULATE_STARTED: "dataset_populate_started", + DATASET_UPDATE_STARTED: "dataset_update_started", // Creation flow DATASET_CREATION_STARTED: "dataset_creation_started", diff --git a/frontend/lib/backend.ts b/frontend/lib/backend.ts index 8043c5a..1bb78f1 100644 --- a/frontend/lib/backend.ts +++ b/frontend/lib/backend.ts @@ -90,14 +90,17 @@ export async function update( description: string, columns: PopulateColumn[], token: string, -): Promise { + rowIds?: string[], +): Promise { + const body: Record = { datasetId, datasetName, description, columns }; + if (rowIds && rowIds.length > 0) body.rowIds = rowIds; const res = await fetch(`${BACKEND_URL}/update`, { method: "POST", headers: { "Content-Type": "application/json", Authorization: `Bearer ${token}`, }, - body: JSON.stringify({ datasetId, datasetName, description, columns }), + body: JSON.stringify(body), }); if (!res.ok) {