From c9be1c1fdd7aefa78b2717fc78041e50a8d7bbf0 Mon Sep 17 00:00:00 2001 From: Simantak Dabhade Date: Thu, 28 May 2026 09:46:38 -0700 Subject: [PATCH 1/3] Add update dataset workflow with per-row refresh agents and shimmer UI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the full "Update Dataset" feature: **Backend**: New refresh agent that re-fetches data from saved source URLs (falling back to web search), compares with existing values, and calls update_row when data has meaningfully changed. Runs as a Mastra workflow with JS-driven concurrency control (5 agents in parallel). Endpoint switched to async background execution matching the populate pattern. **Frontend**: Per-row shimmer overlay on cells being refreshed, green flash animation on cells that change, progressive de-shimmer as each row's agent completes. New "updating" dataset status (blue badge). Selective update support — selecting rows via checkboxes scopes the update to only those rows. **Provenance**: insert_row and update_row now persist sources, rowSummary, and howFound metadata alongside row data. howFound stores a step-by-step extraction guide that the refresh agent replays on subsequent updates. Co-Authored-By: Claude Opus 4.6 --- backend/src/index.ts | 163 +++++++++++++++--- backend/src/mastra/agents/investigate.ts | 8 +- backend/src/mastra/agents/refresh.ts | 74 ++++++++ backend/src/mastra/tools/dataset-tools.ts | 39 ++++- backend/src/mastra/workflows/update.ts | 159 ++++++++++++++++- backend/src/pipeline/populate.ts | 1 + frontend/app/dataset/[id]/page.tsx | 38 ++-- frontend/app/globals.css | 31 ++++ frontend/components/dataset/StatusBadge.tsx | 7 +- frontend/components/table/DataRow.tsx | 16 +- frontend/components/table/DatasetTable.tsx | 6 +- frontend/components/table/types.ts | 3 +- .../table/use-row-change-detection.ts | 67 +++++++ frontend/convex/datasetRows.ts | 56 +++++- frontend/convex/datasets.ts | 33 ++++ frontend/convex/schema.ts | 4 + frontend/lib/analytics.ts | 1 + frontend/lib/backend.ts | 7 +- 18 files changed, 642 insertions(+), 71 deletions(-) create mode 100644 backend/src/mastra/agents/refresh.ts create mode 100644 frontend/components/table/use-row-change-detection.ts diff --git a/backend/src/index.ts b/backend/src/index.ts index e4e6155..eaf9c1f 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -25,9 +25,18 @@ type DatasetPopulateBeginOutcome = | "started" | "not_found" | "forbidden" - | "already_building"; + | "already_building" + | "already_updating"; type PopulateWorkflowRun = Awaited>; +type DatasetUpdateBeginOutcome = + | "started" + | "not_found" + | "forbidden" + | "already_building" + | "already_updating"; +type UpdateWorkflowRun = Awaited>; + function statusErrorMessage(err: unknown): string { const message = err instanceof Error ? err.message : String(err); return message.slice(0, 500); @@ -130,6 +139,105 @@ async function sendDatasetReadyNotification({ } } +async function beginDatasetUpdate( + datasetId: string, + ownerId: string, +): Promise { + const claim = await convex.mutation(internal.datasets.beginUpdateInternal, { + id: datasetId, + ownerId, + }); + return claim.outcome; +} + +async function runUpdateWorkflowInBackground({ + input, + run, + authorizedUserId, + logger, + clerk, +}: { + input: DatasetContext; + run: UpdateWorkflowRun; + authorizedUserId: string; + logger: FastifyBaseLogger; + clerk: ClerkClient; +}): Promise { + const datasetId = input.datasetId; + + try { + const result = await run.start({ + inputData: { + ...input, + authContext: { + authorizedUserId, + workflowRunId: run.runId, + }, + }, + }); + + logger.info( + { + workflowStatus: result.status, + steps: JSON.stringify(result.steps).slice(0, 2000), + }, + "Update workflow completed", + ); + + if (result.status !== "success") { + throw new Error(`Workflow ended with status: ${result.status}`); + } + + const currentDataset = await convex.query(internal.datasets.getInternal, { + id: datasetId, + }); + if (!currentDataset) { + logger.info( + { datasetId }, + "Dataset no longer exists post-update; skipping status transition", + ); + return; + } + + await setDatasetPopulateStatus(datasetId, "live"); + + const rowCount = await convex.query( + internal.datasetRows.countByDataset, + { datasetId }, + ); + await sendDatasetReadyNotification({ + logger, + clerk, + userId: authorizedUserId, + datasetId, + datasetName: currentDataset.name, + rowCount, + }); + } catch (err) { + const lastStatusError = statusErrorMessage(err); + logger.error({ err, datasetId }, "Update background workflow failed"); + + try { + const currentDataset = await convex.query(internal.datasets.getInternal, { + id: datasetId, + }); + if (!currentDataset) { + logger.info( + { datasetId }, + "Dataset no longer exists after failed update; skipping failed status transition", + ); + return; + } + await setDatasetPopulateStatus(datasetId, "failed", lastStatusError); + } catch (statusErr) { + logger.error( + { err: statusErr, datasetId }, + "Failed to transition dataset status to 'failed' after update", + ); + } + } +} + async function runPopulateWorkflowInBackground({ input, run, @@ -350,34 +458,45 @@ await fastify.register(async (instance) => { return reply.code(401).send({ error: "Authentication required" }); } - const dataset = await convex.query(internal.datasets.getInternal, { - id: parsed.data.datasetId, - }); - if (!dataset) { + const updateOutcome = await beginDatasetUpdate( + parsed.data.datasetId, + auth.userId, + ); + + if (updateOutcome === "not_found") { return reply.code(404).send({ error: "Dataset not found" }); } - if (dataset.ownerId !== auth.userId) { + if (updateOutcome === "forbidden") { return reply.code(403).send({ error: "Not authorized to update this dataset" }); } + if (updateOutcome === "already_building") { + return reply.code(409).send({ error: "Dataset is being populated" }); + } + if (updateOutcome === "already_updating") { + return reply.code(409).send({ error: "Dataset is already being updated" }); + } + if (updateOutcome !== "started") { + throw new Error(`Unexpected update claim outcome: ${updateOutcome}`); + } - const run = await updateWorkflow.createRun(); - const result = await run.start({ - inputData: { - ...parsed.data, - authContext: { - authorizedUserId: auth.userId, - workflowRunId: run.runId, - }, - }, - }); - - req.log.info({ workflowStatus: result.status }, "Update workflow completed"); - - if (result.status !== "success") { - throw new Error(`Workflow ended with status: ${result.status}`); + let run: UpdateWorkflowRun; + try { + run = await updateWorkflow.createRun(); + } catch (runErr) { + req.log.error(runErr, "Failed to create update workflow run; releasing dataset claim"); + await setDatasetPopulateStatus(parsed.data.datasetId, "failed", statusErrorMessage(runErr)); + return reply.code(502).send({ error: "Failed to update dataset. Please try again." }); } - return { success: true, result: result.result }; + void runUpdateWorkflowInBackground({ + input: parsed.data, + run, + authorizedUserId: auth.userId, + logger: req.log, + clerk: req.server.clerk, + }); + + return reply.code(202).send({ success: true, runId: run.runId }); } catch (err) { const msg = err instanceof Error ? err.message : String(err); if (msg.includes("validator") || msg.includes("Invalid")) { diff --git a/backend/src/mastra/agents/investigate.ts b/backend/src/mastra/agents/investigate.ts index ad8ce1b..92f63da 100644 --- a/backend/src/mastra/agents/investigate.ts +++ b/backend/src/mastra/agents/investigate.ts @@ -33,19 +33,19 @@ RULES: TOOL CALL FORMAT — every tool call argument must be a JSON object wrapped in curly braces: search_web: {"query": "your search terms"} fetch_page: {"url": "https://example.com"} - insert_row: {"data": {${columnNames.map((n) => `"${n}": "value"`).join(", ")}}} + insert_row: {"data": {${columnNames.map((n) => `"${n}": "value"`).join(", ")}}, "sources": ["https://url-you-fetched.com"], "row_summary": "one line about this entity", "how_found": "step by step guide on how to extract the data so an agent in the future can do it too"} WORKFLOW: 1. Fetch 1-2 of the provided URLs to get real data (if URLs were given). 2. If you need more, run ONE search and fetch the best result. -3. Call insert_row with whatever real data you have. Use "" for missing fields. +3. Call insert_row with whatever real data you have. Use "" for mdissing fields. + Include "sources" (URLs you fetched), "row_summary" (one line about this entity), and "how_found" (a step by step guide on how you found this data. eg, 1. fetch the contents of this url "", 2. Look for the pricing field, and title name field, 3. etc...) 4. Write your final response: INSERTED: true/false SUMMARY: one line CLUES: hints for finding more entities REASON: why you succeeded or what was missing - -You are scoped to ONE dataset. Do not pass a datasetId to any tool.`; +`; } /** diff --git a/backend/src/mastra/agents/refresh.ts b/backend/src/mastra/agents/refresh.ts new file mode 100644 index 0000000..2215686 --- /dev/null +++ b/backend/src/mastra/agents/refresh.ts @@ -0,0 +1,74 @@ +import { Agent } from "@mastra/core/agent"; +import { createOpenRouter } from "@openrouter/ai-sdk-provider"; +import { buildPopulateTools } from "../tools/dataset-tools.js"; +import { searchWebTool, fetchPageTool } from "../tools/web-tools.js"; +import type { AuthContext } from "../workflows/populate.js"; +import type { PopulateColumn } from "../../pipeline/populate.js"; + +const openrouter = createOpenRouter({ + apiKey: process.env.OPENROUTER_API_KEY!, +}); + +function buildRefreshInstructions(columns: PopulateColumn[]): string { + const columnNames = columns.map((c) => c.name); + const columnsDesc = columns + .map( + (c) => + `- "${c.name}" (${c.type})${c.isPrimaryKey ? " [PRIMARY KEY]" : ""}${c.description ? `: ${c.description}` : ""}`, + ) + .join("\n"); + + return `You are refreshing data for one existing row. Be fast — you have very few steps. + +Columns: +${columnsDesc} + +RULES: +- You have at most 6 tool calls total. +- Start by following the steps in the "Previously found via" section — it describes exactly how the data was originally extracted (which URLs to fetch, which fields to look for). Reproduce those steps to get fresh data. +- If no "Previously found via" steps are provided, fall back to fetching the source URLs directly. +- If a source returns a 404, timeout, or is blocked, note it and move to the next. +- Compare the fetched data with the existing row data carefully. +- If data has MEANINGFULLY changed (not just formatting differences), call update_row with the FULL updated data object (all columns, not just changed ones), plus updated sources, row_summary, and how_found. +- If NO sources work (all 404/blocked), try ONE web search using the primary key values to find a current source. +- If the data is unchanged, do NOT call update_row. Just report your findings. +- Never fabricate values. If you can't verify a field, keep the existing value. + +TOOL CALL FORMAT — every tool call argument must be a JSON object wrapped in curly braces: + fetch_page: {"url": "https://example.com"} + search_web: {"query": "your search terms"} + update_row: {"rowId": "", "data": {${columnNames.map((n) => `"${n}": "value"`).join(", ")}}, "sources": ["https://..."], "row_summary": "one line about this entity", "how_found": "how you verified this data"} + +WORKFLOW: +1. Fetch the provided source URLs (1-2 calls). +2. Compare fetched data with existing row data. +3. If changed: call update_row with the full updated data. + If sources broken: try ONE search using primary key values, then fetch the best result. +4. Write your final response: + UPDATED: true/false + CHANGES: what changed (or "no changes") + REASON: why you updated or didn't +`; +} + +export function buildRefreshAgent( + authorizedDatasetId: string, + authContext: AuthContext, + columns: PopulateColumn[], +): Agent { + const { update_row } = buildPopulateTools( + authorizedDatasetId, + authContext, + ); + return new Agent({ + id: "refresh-agent", + name: "Dataset Refresh Agent", + instructions: buildRefreshInstructions(columns), + model: openrouter("qwen/qwen3.7-max"), + tools: { + update_row, + search_web: searchWebTool, + fetch_page: fetchPageTool, + }, + }); +} diff --git a/backend/src/mastra/tools/dataset-tools.ts b/backend/src/mastra/tools/dataset-tools.ts index a3ecac8..ae16438 100644 --- a/backend/src/mastra/tools/dataset-tools.ts +++ b/backend/src/mastra/tools/dataset-tools.ts @@ -127,9 +127,21 @@ export function buildPopulateTools( "Insert a single row into the dataset you are populating. Call this each time you have a row ready — don't wait to batch them.", inputSchema: z.object({ data: z.record(z.string(), z.any()), + sources: z + .array(z.string()) + .optional() + .describe("URLs you visited or used to gather data for this row"), + row_summary: z + .string() + .optional() + .describe("One-line summary of this entity"), + how_found: z + .string() + .optional() + .describe("Brief description of how you found and verified this data"), }), outputSchema: writeResultSchema, - execute: async ({ data }) => { + execute: async ({ data, sources, row_summary, how_found }) => { if (!data || Object.keys(data).length === 0) return { success: false, @@ -139,12 +151,15 @@ export function buildPopulateTools( const cleanedData = cleanDataKeys(data); console.log( - `[insert_row] ${logCtx} cols=${Object.keys(cleanedData).length}`, + `[insert_row] ${logCtx} cols=${Object.keys(cleanedData).length} sources=${sources?.length ?? 0}`, ); try { await convex.mutation(internal.datasetRows.insert, { datasetId: authorizedDatasetId, data: cleanedData, + ...(sources && sources.length > 0 ? { sources } : {}), + ...(row_summary ? { rowSummary: row_summary } : {}), + ...(how_found ? { howFound: how_found } : {}), }); return { success: true }; } catch (err) { @@ -246,9 +261,21 @@ export function buildPopulateTools( inputSchema: z.object({ rowId: z.string(), data: z.record(z.string(), z.any()), + sources: z + .array(z.string()) + .optional() + .describe("Updated source URLs where this data was verified"), + row_summary: z + .string() + .optional() + .describe("Updated one-line summary of this entity"), + how_found: z + .string() + .optional() + .describe("Brief description of how the updated data was found"), }), outputSchema: writeResultSchema, - execute: async ({ rowId, data }) => { + execute: async ({ rowId, data, sources, row_summary, how_found }) => { if (!rowId) return { success: false, error: "rowId is required." }; if (!data || Object.keys(data).length === 0) return { @@ -261,13 +288,13 @@ export function buildPopulateTools( `[update_row] ${logCtx} row=${rowId} cols=${Object.keys(cleanedData).length}`, ); try { - // expectedDatasetId pins the Convex-side atomic capability check. - // If `rowId` belongs to another dataset, the mutation throws - // "Row not found" — uniform with the get_row policy. await convex.mutation(internal.datasetRows.update, { id: rowId, expectedDatasetId: authorizedDatasetId, data: cleanedData, + ...(sources && sources.length > 0 ? { sources } : {}), + ...(row_summary ? { rowSummary: row_summary } : {}), + ...(how_found ? { howFound: how_found } : {}), }); return { success: true }; } catch (err) { diff --git a/backend/src/mastra/workflows/update.ts b/backend/src/mastra/workflows/update.ts index b320bd6..6fc041f 100644 --- a/backend/src/mastra/workflows/update.ts +++ b/backend/src/mastra/workflows/update.ts @@ -1,6 +1,8 @@ import { createStep, createWorkflow } from "@mastra/core/workflows"; import { z } from "zod"; -import { datasetContextSchema } from "../../pipeline/populate.js"; +import { datasetContextSchema, populateColumnSchema } from "../../pipeline/populate.js"; +import { convex, internal } from "../../convex.js"; +import { buildRefreshAgent } from "../agents/refresh.js"; import { authContextSchema } from "./populate.js"; export const updateInputSchema = datasetContextSchema.extend({ @@ -8,22 +10,163 @@ export const updateInputSchema = datasetContextSchema.extend({ }); export type UpdateInput = z.infer; -const updateStep = createStep({ - id: "update-dataset", +const rowSchema = z.object({ + _id: z.string(), + data: z.record(z.string(), z.any()), + sources: z.array(z.string()).optional(), + rowSummary: z.string().optional(), + howFound: z.string().optional(), +}); + +const markAndFetchOutputSchema = updateInputSchema.extend({ + rows: z.array(rowSchema), +}); + +const markAndFetchStep = createStep({ + id: "mark-and-fetch", inputSchema: updateInputSchema, - outputSchema: z.object({ message: z.string() }), + outputSchema: markAndFetchOutputSchema, execute: async ({ inputData }) => { + const selective = inputData.rowIds && inputData.rowIds.length > 0; console.log( - `[update-dataset] triggered dataset=${inputData.datasetId} user=${inputData.authContext.authorizedUserId} run=${inputData.authContext.workflowRunId}`, + `[mark-and-fetch] Marking ${selective ? inputData.rowIds!.length : "all"} rows for dataset ${inputData.datasetId}`, ); - return { message: "Update workflow triggered — logic not yet implemented." }; + + const markedCount = await convex.mutation(internal.datasetRows.markForUpdate, { + datasetId: inputData.datasetId, + ...(selective ? { rowIds: inputData.rowIds } : {}), + }); + + const rawRows = await convex.query(internal.datasetRows.listInternal, { + datasetId: inputData.datasetId, + }); + + let rows = (rawRows as Record[]).map((r) => ({ + _id: String(r._id), + data: (r.data ?? {}) as Record, + sources: r.sources as string[] | undefined, + rowSummary: r.rowSummary as string | undefined, + howFound: r.howFound as string | undefined, + })); + + if (selective) { + const selectedSet = new Set(inputData.rowIds); + rows = rows.filter((r) => selectedSet.has(r._id)); + } + + console.log(`[mark-and-fetch] Marked ${markedCount}, processing ${rows.length} rows`); + return { ...inputData, rows }; + }, +}); + +const refreshOutputSchema = z.object({ + updatedCount: z.number(), + totalCount: z.number(), + errors: z.number(), +}); + +const MAX_CONCURRENT = 5; + +async function processWithConcurrency( + items: T[], + handler: (item: T) => Promise, + max: number, +): Promise { + let idx = 0; + const workers = Array.from( + { length: Math.min(max, items.length) }, + async () => { + while (idx < items.length) { + const i = idx++; + await handler(items[i]); + } + }, + ); + await Promise.allSettled(workers); +} + +const refreshRowsStep = createStep({ + id: "refresh-rows", + inputSchema: markAndFetchOutputSchema, + outputSchema: refreshOutputSchema, + execute: async ({ inputData }) => { + const { datasetId, columns, authContext, rows } = inputData; + let updatedCount = 0; + let errors = 0; + + const pkColumns = columns.filter((c) => c.isPrimaryKey); + + async function processRow(row: z.infer) { + try { + const agent = buildRefreshAgent(datasetId, authContext, columns); + + const pkBlock = + pkColumns.length > 0 + ? pkColumns + .map((c) => `- ${c.name}: ${row.data[c.name] ?? ""}`) + .join("\n") + : "(no primary keys defined)"; + const existingDataBlock = Object.entries(row.data) + .map(([k, v]) => `- ${k}: ${v}`) + .join("\n"); + const sourcesBlock = + row.sources && row.sources.length > 0 + ? `\nSource URLs to check:\n${row.sources.map((s) => `- ${s}`).join("\n")}` + : "\nNo source URLs recorded — search the web using the primary key values."; + + const prompt = `Refresh this existing row and update it if the data has changed. + +Row ID: ${row._id} + +Primary keys: +${pkBlock} + +Existing data: +${existingDataBlock} +${sourcesBlock} +${row.rowSummary ? `\nPrevious summary: ${row.rowSummary}` : ""} +${row.howFound ? `\nPreviously found via: ${row.howFound}` : ""}`; + + const result = await agent.generate(prompt, { maxSteps: 10 }); + const text = result.text.toLowerCase(); + if (text.includes("updated: true")) { + updatedCount++; + } + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + console.error( + `[refresh-rows] Row ${row._id} failed: ${msg}`, + ); + errors++; + } finally { + try { + await convex.mutation(internal.datasetRows.clearUpdateStatus, { + id: row._id, + expectedDatasetId: datasetId, + }); + } catch { + // Row may have been deleted during the update. + } + } + } + + console.log( + `[refresh-rows] Processing ${rows.length} rows (max ${MAX_CONCURRENT} concurrent)`, + ); + await processWithConcurrency(rows, processRow, MAX_CONCURRENT); + console.log( + `[refresh-rows] Done: ${updatedCount} updated, ${errors} errors, ${rows.length - updatedCount - errors} unchanged`, + ); + + return { updatedCount, totalCount: rows.length, errors }; }, }); export const updateWorkflow = createWorkflow({ id: "update-workflow", inputSchema: updateInputSchema, - outputSchema: z.object({ message: z.string() }), + outputSchema: refreshOutputSchema, }) - .then(updateStep) + .then(markAndFetchStep) + .then(refreshRowsStep) .commit(); diff --git a/backend/src/pipeline/populate.ts b/backend/src/pipeline/populate.ts index 9f5f3a5..52a8a88 100644 --- a/backend/src/pipeline/populate.ts +++ b/backend/src/pipeline/populate.ts @@ -13,5 +13,6 @@ export const datasetContextSchema = z.object({ datasetName: z.string(), description: z.string(), columns: z.array(populateColumnSchema).min(1), + rowIds: z.array(z.string()).optional(), }); export type DatasetContext = z.infer; diff --git a/frontend/app/dataset/[id]/page.tsx b/frontend/app/dataset/[id]/page.tsx index cc98ef5..a196172 100644 --- a/frontend/app/dataset/[id]/page.tsx +++ b/frontend/app/dataset/[id]/page.tsx @@ -135,19 +135,29 @@ export default function DatasetPage() { } async function handleUpdate() { - if (!dataset || updating || dataset.status === "building") return; + if (!dataset || updating || dataset.status === "building" || dataset.status === "updating") return; setUpdating(true); try { const token = await getToken(); if (!token) throw new Error("Not authenticated"); - await update( + const selectedRowIds = selectedCount > 0 ? Array.from(selection.selected) : undefined; + const result = await update( dataset._id, dataset.name, dataset.description, dataset.columns, token, + selectedRowIds, ); + if (selectedCount > 0) selection.clear(); + track(EVENTS.DATASET_UPDATE_STARTED, { + datasetId: dataset._id, + column_count: dataset.columns.length, + row_count: selectedRowIds?.length ?? rows.length, + selective: selectedCount > 0, + runId: result.runId, + }); } catch (err) { console.error("[update] failed", err); captureException(err, { @@ -172,16 +182,20 @@ export default function DatasetPage() { // the "Dataset not found" UI. const exportDisabled = exporting !== null || rows.length === 0; - const isDatasetBuilding = dataset.status === "building"; - const updateDisabled = updating || isDatasetBuilding; - const populateDisabled = populating || isDatasetBuilding; - const updateLabel = isDatasetBuilding - ? "Building…" - : updating - ? "Updating…" - : "Update Dataset"; - const populateLabel = isDatasetBuilding - ? "Building…" + const isDatasetBusy = dataset.status === "building" || dataset.status === "updating"; + const updateDisabled = updating || isDatasetBusy; + const populateDisabled = populating || isDatasetBusy; + const updateLabel = dataset.status === "updating" + ? "Updating…" + : dataset.status === "building" + ? "Building…" + : updating + ? "Starting…" + : selectedCount > 0 + ? `Update (${selectedCount})` + : "Update Dataset"; + const populateLabel = isDatasetBusy + ? (dataset.status === "updating" ? "Updating…" : "Building…") : populating ? "Starting…" : dataset.status === "failed" diff --git a/frontend/app/globals.css b/frontend/app/globals.css index 1b9363a..10b5ef4 100644 --- a/frontend/app/globals.css +++ b/frontend/app/globals.css @@ -97,3 +97,34 @@ body { :root[data-theme="dark"] .group:hover .card-glow::before { opacity: 0.45; } + +@keyframes shimmer { + 0% { background-position: -200% 0; } + 100% { background-position: 200% 0; } +} + +@keyframes cell-flash { + 0% { background-color: rgba(16, 185, 129, 0.25); } + 100% { background-color: transparent; } +} + +.shimmer-overlay { + background: linear-gradient( + 90deg, + transparent 0%, + var(--foreground) 50%, + transparent 100% + ); + background-size: 200% 100%; + animation: shimmer 1.8s ease-in-out infinite; + opacity: 0.04; + pointer-events: none; +} + +:root[data-theme="dark"] .shimmer-overlay { + opacity: 0.06; +} + +.cell-flash { + animation: cell-flash 1.5s ease-out forwards; +} diff --git a/frontend/components/dataset/StatusBadge.tsx b/frontend/components/dataset/StatusBadge.tsx index 81f71d7..1a50b5b 100644 --- a/frontend/components/dataset/StatusBadge.tsx +++ b/frontend/components/dataset/StatusBadge.tsx @@ -1,9 +1,10 @@ -export type DatasetStatus = "live" | "paused" | "building" | "failed"; +export type DatasetStatus = "live" | "paused" | "building" | "updating" | "failed"; const STYLES: Record = { live: "border-emerald-600/20 bg-emerald-600/5 text-emerald-700 dark:text-emerald-400", paused: "border-border bg-background text-muted", building: "border-amber-600/20 bg-amber-600/5 text-amber-700 dark:text-amber-400", + updating: "border-blue-600/20 bg-blue-600/5 text-blue-700 dark:text-blue-400", failed: "border-red-600/20 bg-red-600/5 text-red-700 dark:text-red-400", }; @@ -11,6 +12,7 @@ const LABELS: Record = { live: "Live", paused: "Paused", building: "Building...", + updating: "Updating...", failed: "Failed", }; @@ -25,6 +27,9 @@ export function StatusBadge({ status }: { status: DatasetStatus }) { {status === "building" && ( )} + {status === "updating" && ( + + )} {status === "failed" && ( )} diff --git a/frontend/components/table/DataRow.tsx b/frontend/components/table/DataRow.tsx index b57c5a3..1fe0974 100644 --- a/frontend/components/table/DataRow.tsx +++ b/frontend/components/table/DataRow.tsx @@ -14,6 +14,8 @@ export interface DataRowData { isSelected: (id: string) => boolean; toggleRow: (id: string, shiftKey: boolean) => void; isBuilding: boolean; + pendingRowIds: Set; + flashingCells: Set; } function DataRowImpl({ @@ -25,7 +27,7 @@ function DataRowImpl({ index: number; style: CSSProperties; }) { - const { rows, columns, columnWidths, isSelected, toggleRow, isBuilding } = data; + const { rows, columns, columnWidths, isSelected, toggleRow, isBuilding, pendingRowIds, flashingCells } = data; const row = rows[index]; if (!row) { @@ -104,26 +106,24 @@ function DataRowImpl({ {columns.map((col, cellIdx) => { const width = floorWidth(columnWidths[cellIdx + 1] ?? 150); const value = row.original.data[col.name]; + const isPending = pendingRowIds.has(row.original._id); + const isFlashing = flashingCells.has(`${row.original._id}:${col.name}`); return (
+ {isPending &&
}
); })} diff --git a/frontend/components/table/DatasetTable.tsx b/frontend/components/table/DatasetTable.tsx index 18792da..bdf9d6a 100644 --- a/frontend/components/table/DatasetTable.tsx +++ b/frontend/components/table/DatasetTable.tsx @@ -13,6 +13,7 @@ import type { useSelection } from "./use-selection"; import { usePersistedColumnWidths } from "./use-persisted-widths"; import { TableHeader } from "./TableHeader"; import { DataRow, type DataRowData } from "./DataRow"; +import { useRowChangeDetection } from "./use-row-change-detection"; type Selection = ReturnType; @@ -112,6 +113,7 @@ export function DatasetTable({ const tableRows = table.getRowModel().rows; const columnWidths = useMemo(() => headers.map((h) => h.getSize()), [headers]); const isBuilding = dataset.status === "building"; + const { flashingCells, pendingRowIds } = useRowChangeDetection(rows); const displayCount = isBuilding ? Math.max(tableRows.length, GHOST_ROW_COUNT) : tableRows.length; const totalWidth = columnWidths.reduce((sum, w) => sum + w, 0); const tableContentWidth = totalWidth + LAST_COLUMN_RESIZE_GUTTER; @@ -140,8 +142,10 @@ export function DatasetTable({ isSelected: selection.has, toggleRow, isBuilding, + pendingRowIds, + flashingCells, }), - [tableRows, dataset.columns, columnWidths, selection.has, toggleRow, isBuilding], + [tableRows, dataset.columns, columnWidths, selection.has, toggleRow, isBuilding, pendingRowIds, flashingCells], ); return ( diff --git a/frontend/components/table/types.ts b/frontend/components/table/types.ts index 7014ab9..9ff7cc1 100644 --- a/frontend/components/table/types.ts +++ b/frontend/components/table/types.ts @@ -11,7 +11,7 @@ export interface DatasetMeta { _id: string; name: string; description: string; - status: "live" | "paused" | "building" | "failed"; + status: "live" | "paused" | "building" | "updating" | "failed"; lastStatusError?: string; cadence: string; columns: DatasetColumn[]; @@ -21,4 +21,5 @@ export interface DatasetRow { _id: string; _creationTime: number; data: Record; + updateStatus?: "pending"; } diff --git a/frontend/components/table/use-row-change-detection.ts b/frontend/components/table/use-row-change-detection.ts new file mode 100644 index 0000000..3add86d --- /dev/null +++ b/frontend/components/table/use-row-change-detection.ts @@ -0,0 +1,67 @@ +import { useEffect, useRef, useState, useMemo } from "react"; +import type { DatasetRow } from "./types"; + +const FLASH_DURATION_MS = 1500; + +export function useRowChangeDetection(rows: DatasetRow[]) { + const prevRowsRef = useRef>(new Map()); + const [flashingCells, setFlashingCells] = useState>(new Set()); + + const pendingRowIds = useMemo(() => { + const set = new Set(); + for (const row of rows) { + if (row.updateStatus === "pending") { + set.add(row._id); + } + } + return set; + }, [rows]); + + useEffect(() => { + const prevMap = prevRowsRef.current; + const newFlashes = new Set(); + + for (const row of rows) { + const prev = prevMap.get(row._id); + if (!prev) continue; + + const wasPending = prev.updateStatus === "pending"; + const isNowClear = row.updateStatus !== "pending"; + + if (wasPending && isNowClear) { + for (const [key, newVal] of Object.entries(row.data)) { + const oldVal = prev.data[key]; + if (String(oldVal ?? "") !== String(newVal ?? "")) { + newFlashes.add(`${row._id}:${key}`); + } + } + } + } + + const nextMap = new Map(); + for (const row of rows) { + nextMap.set(row._id, { ...row, data: { ...row.data } }); + } + prevRowsRef.current = nextMap; + + if (newFlashes.size > 0) { + setFlashingCells((prev) => { + const merged = new Set(prev); + for (const key of newFlashes) merged.add(key); + return merged; + }); + + const timer = setTimeout(() => { + setFlashingCells((prev) => { + const next = new Set(prev); + for (const key of newFlashes) next.delete(key); + return next; + }); + }, FLASH_DURATION_MS); + + return () => clearTimeout(timer); + } + }, [rows]); + + return { flashingCells, pendingRowIds }; +} diff --git a/frontend/convex/datasetRows.ts b/frontend/convex/datasetRows.ts index 719bd5f..80e3629 100644 --- a/frontend/convex/datasetRows.ts +++ b/frontend/convex/datasetRows.ts @@ -64,6 +64,8 @@ export const insert = internalMutation({ datasetId: v.id("datasets"), data: v.record(v.string(), v.any()), sources: v.optional(v.array(v.string())), + rowSummary: v.optional(v.string()), + howFound: v.optional(v.string()), }, handler: async (ctx, args) => { const dataset = await ctx.db.get(args.datasetId); @@ -140,20 +142,19 @@ export const update = internalMutation({ id: v.id("datasetRows"), expectedDatasetId: v.id("datasets"), data: v.record(v.string(), v.any()), + sources: v.optional(v.array(v.string())), + rowSummary: v.optional(v.string()), + howFound: v.optional(v.string()), }, handler: async (ctx, args) => { - // 1. Capability scope check (security): atomically verifies the row - // exists AND belongs to expectedDatasetId. Throws otherwise. const existing = await assertRowInDataset( ctx, args.id, args.expectedDatasetId, ); - // 2. Quota: charge the dataset's owner for 1 row modification. await consumeQuotaForDataset(ctx, args.expectedDatasetId, 1); - // 3. Diff + history. const oldData = existing.data as Record; const newData = args.data; for (const [key, newVal] of Object.entries(newData)) { @@ -169,8 +170,14 @@ export const update = internalMutation({ } } - // 4. Patch. - await ctx.db.patch(args.id, { data: newData }); + const patch: Record = { + data: newData, + updateStatus: undefined, + }; + if (args.sources !== undefined) patch.sources = args.sources; + if (args.rowSummary !== undefined) patch.rowSummary = args.rowSummary; + if (args.howFound !== undefined) patch.howFound = args.howFound; + await ctx.db.patch(args.id, patch); }, }); @@ -258,6 +265,43 @@ export const remove = internalMutation({ }, }); +export const markForUpdate = internalMutation({ + args: { + datasetId: v.id("datasets"), + rowIds: v.optional(v.array(v.id("datasetRows"))), + }, + handler: async (ctx, args) => { + if (args.rowIds && args.rowIds.length > 0) { + for (const rowId of args.rowIds) { + const row = await ctx.db.get(rowId); + if (row && row.datasetId === args.datasetId) { + await ctx.db.patch(rowId, { updateStatus: "pending" as const }); + } + } + return args.rowIds.length; + } + const rows = await ctx.db + .query("datasetRows") + .withIndex("by_dataset", (q) => q.eq("datasetId", args.datasetId)) + .collect(); + for (const row of rows) { + await ctx.db.patch(row._id, { updateStatus: "pending" as const }); + } + return rows.length; + }, +}); + +export const clearUpdateStatus = internalMutation({ + args: { + id: v.id("datasetRows"), + expectedDatasetId: v.id("datasets"), + }, + handler: async (ctx, args) => { + await assertRowInDataset(ctx, args.id, args.expectedDatasetId); + await ctx.db.patch(args.id, { updateStatus: undefined }); + }, +}); + /** * Admin-only row listing for a dataset. Used by the populate agent's * `list_rows` tool to see what's already been inserted in the dataset diff --git a/frontend/convex/datasets.ts b/frontend/convex/datasets.ts index 8637a39..2088652 100644 --- a/frontend/convex/datasets.ts +++ b/frontend/convex/datasets.ts @@ -138,6 +138,9 @@ export const beginPopulateInternal = internalMutation({ if (dataset.status === "building") { return { outcome: "already_building" as const }; } + if (dataset.status === "updating") { + return { outcome: "already_updating" as const }; + } await ctx.db.patch(dataset._id, { status: "building", @@ -147,6 +150,34 @@ export const beginPopulateInternal = internalMutation({ }, }); +export const beginUpdateInternal = internalMutation({ + args: { + id: v.id("datasets"), + ownerId: v.string(), + }, + handler: async (ctx, args) => { + const dataset = await ctx.db.get(args.id); + if (!dataset) { + return { outcome: "not_found" as const }; + } + if (dataset.ownerId !== args.ownerId) { + return { outcome: "forbidden" as const }; + } + if (dataset.status === "building") { + return { outcome: "already_building" as const }; + } + if (dataset.status === "updating") { + return { outcome: "already_updating" as const }; + } + + await ctx.db.patch(dataset._id, { + status: "updating", + lastStatusError: undefined, + }); + return { outcome: "started" as const }; + }, +}); + /** * Admin-only status transition. Used by the backend orchestration layer * to move a dataset between lifecycle states after a workflow completes. @@ -172,6 +203,7 @@ export const setStatusInternal = internalMutation({ v.literal("live"), v.literal("paused"), v.literal("building"), + v.literal("updating"), v.literal("failed"), ), lastStatusError: v.optional(v.string()), @@ -224,6 +256,7 @@ export const updateStatus = mutation({ v.literal("live"), v.literal("paused"), v.literal("building"), + v.literal("updating"), ), }, handler: async (ctx, args) => { diff --git a/frontend/convex/schema.ts b/frontend/convex/schema.ts index 11e635a..05ef659 100644 --- a/frontend/convex/schema.ts +++ b/frontend/convex/schema.ts @@ -10,6 +10,7 @@ export default defineSchema({ v.literal("live"), v.literal("paused"), v.literal("building"), + v.literal("updating"), v.literal("failed") ), lastStatusError: v.optional(v.string()), @@ -63,6 +64,9 @@ export default defineSchema({ datasetId: v.id("datasets"), data: v.record(v.string(), v.any()), sources: v.optional(v.array(v.string())), + rowSummary: v.optional(v.string()), + howFound: v.optional(v.string()), + updateStatus: v.optional(v.literal("pending")), scrapeScript: v.optional(v.string()), }).index("by_dataset", ["datasetId"]), diff --git a/frontend/lib/analytics.ts b/frontend/lib/analytics.ts index 440587b..bb2cb4a 100644 --- a/frontend/lib/analytics.ts +++ b/frontend/lib/analytics.ts @@ -33,6 +33,7 @@ export const EVENTS = { DATASET_OPENED: "dataset_opened", DATASET_EXPORTED: "dataset_exported", DATASET_POPULATE_STARTED: "dataset_populate_started", + DATASET_UPDATE_STARTED: "dataset_update_started", // Creation flow DATASET_CREATION_STARTED: "dataset_creation_started", diff --git a/frontend/lib/backend.ts b/frontend/lib/backend.ts index 8043c5a..1bb78f1 100644 --- a/frontend/lib/backend.ts +++ b/frontend/lib/backend.ts @@ -90,14 +90,17 @@ export async function update( description: string, columns: PopulateColumn[], token: string, -): Promise { + rowIds?: string[], +): Promise { + const body: Record = { datasetId, datasetName, description, columns }; + if (rowIds && rowIds.length > 0) body.rowIds = rowIds; const res = await fetch(`${BACKEND_URL}/update`, { method: "POST", headers: { "Content-Type": "application/json", Authorization: `Bearer ${token}`, }, - body: JSON.stringify({ datasetId, datasetName, description, columns }), + body: JSON.stringify(body), }); if (!res.ok) { From fa00cee0971b4fcd526213a2a0a2a7e466028a07 Mon Sep 17 00:00:00 2001 From: Simantak Dabhade Date: Thu, 28 May 2026 10:35:53 -0700 Subject: [PATCH 2/3] Address CodeRabbit review feedback (9 findings) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove "updating" from public updateStatus mutation (clients shouldn't set system-managed states) - Add workflowType param to sendDatasetReadyNotification so update runs report as "update" not "populate" in analytics - Revert dataset to "live" (not "failed") when createRun() throws since the update never actually started - Fix typo: "mdissing" → "missing" in investigate agent prompt - Use !== undefined checks for metadata fields in insert_row/update_row to allow intentional clears of stale provenance - Reject empty rowIds arrays at schema level (z.array().min(1).optional()) - Use ref-based flash timers to prevent cleanup from canceling pending flash removal timeouts on unrelated row changes - Return actual marked count (not requested count) from markForUpdate - Only suppress "not found" errors in clearUpdateStatus catch, log others Co-Authored-By: Claude Opus 4.6 --- backend/src/index.ts | 9 ++++++--- backend/src/mastra/agents/investigate.ts | 2 +- backend/src/mastra/tools/dataset-tools.ts | 12 ++++++------ backend/src/mastra/workflows/update.ts | 9 +++++++-- backend/src/pipeline/populate.ts | 2 +- .../components/table/use-row-change-detection.ts | 12 ++++++++++-- frontend/convex/datasetRows.ts | 4 +++- frontend/convex/datasets.ts | 1 - 8 files changed, 34 insertions(+), 17 deletions(-) diff --git a/backend/src/index.ts b/backend/src/index.ts index eaf9c1f..f3eb7e0 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -73,6 +73,7 @@ async function sendDatasetReadyNotification({ datasetId, datasetName, rowCount, + workflowType = "populate", }: { logger: FastifyBaseLogger; clerk: ClerkClient; @@ -80,12 +81,13 @@ async function sendDatasetReadyNotification({ datasetId: string; datasetName: string; rowCount: number; + workflowType?: "populate" | "update"; }): Promise { const baseProps = { datasetId, datasetName, rowCount, - workflowType: "populate" as const, + workflowType, }; try { @@ -212,6 +214,7 @@ async function runUpdateWorkflowInBackground({ datasetId, datasetName: currentDataset.name, rowCount, + workflowType: "update", }); } catch (err) { const lastStatusError = statusErrorMessage(err); @@ -483,8 +486,8 @@ await fastify.register(async (instance) => { try { run = await updateWorkflow.createRun(); } catch (runErr) { - req.log.error(runErr, "Failed to create update workflow run; releasing dataset claim"); - await setDatasetPopulateStatus(parsed.data.datasetId, "failed", statusErrorMessage(runErr)); + req.log.error(runErr, "Failed to create update workflow run; reverting dataset status"); + await setDatasetPopulateStatus(parsed.data.datasetId, "live"); return reply.code(502).send({ error: "Failed to update dataset. Please try again." }); } diff --git a/backend/src/mastra/agents/investigate.ts b/backend/src/mastra/agents/investigate.ts index 92f63da..10faaae 100644 --- a/backend/src/mastra/agents/investigate.ts +++ b/backend/src/mastra/agents/investigate.ts @@ -38,7 +38,7 @@ TOOL CALL FORMAT — every tool call argument must be a JSON object wrapped in c WORKFLOW: 1. Fetch 1-2 of the provided URLs to get real data (if URLs were given). 2. If you need more, run ONE search and fetch the best result. -3. Call insert_row with whatever real data you have. Use "" for mdissing fields. +3. Call insert_row with whatever real data you have. Use "" for missing fields. Include "sources" (URLs you fetched), "row_summary" (one line about this entity), and "how_found" (a step by step guide on how you found this data. eg, 1. fetch the contents of this url "", 2. Look for the pricing field, and title name field, 3. etc...) 4. Write your final response: INSERTED: true/false diff --git a/backend/src/mastra/tools/dataset-tools.ts b/backend/src/mastra/tools/dataset-tools.ts index ae16438..1fc016e 100644 --- a/backend/src/mastra/tools/dataset-tools.ts +++ b/backend/src/mastra/tools/dataset-tools.ts @@ -157,9 +157,9 @@ export function buildPopulateTools( await convex.mutation(internal.datasetRows.insert, { datasetId: authorizedDatasetId, data: cleanedData, - ...(sources && sources.length > 0 ? { sources } : {}), - ...(row_summary ? { rowSummary: row_summary } : {}), - ...(how_found ? { howFound: how_found } : {}), + ...(sources !== undefined ? { sources } : {}), + ...(row_summary !== undefined ? { rowSummary: row_summary } : {}), + ...(how_found !== undefined ? { howFound: how_found } : {}), }); return { success: true }; } catch (err) { @@ -292,9 +292,9 @@ export function buildPopulateTools( id: rowId, expectedDatasetId: authorizedDatasetId, data: cleanedData, - ...(sources && sources.length > 0 ? { sources } : {}), - ...(row_summary ? { rowSummary: row_summary } : {}), - ...(how_found ? { howFound: how_found } : {}), + ...(sources !== undefined ? { sources } : {}), + ...(row_summary !== undefined ? { rowSummary: row_summary } : {}), + ...(how_found !== undefined ? { howFound: how_found } : {}), }); return { success: true }; } catch (err) { diff --git a/backend/src/mastra/workflows/update.ts b/backend/src/mastra/workflows/update.ts index 6fc041f..51fe3b1 100644 --- a/backend/src/mastra/workflows/update.ts +++ b/backend/src/mastra/workflows/update.ts @@ -144,8 +144,13 @@ ${row.howFound ? `\nPreviously found via: ${row.howFound}` : ""}`; id: row._id, expectedDatasetId: datasetId, }); - } catch { - // Row may have been deleted during the update. + } catch (cleanupErr) { + const cleanupMsg = cleanupErr instanceof Error ? cleanupErr.message : String(cleanupErr); + if (/not found/i.test(cleanupMsg)) { + return; + } + console.error(`[refresh-rows] Failed to clear update status for row ${row._id}: ${cleanupMsg}`); + errors++; } } } diff --git a/backend/src/pipeline/populate.ts b/backend/src/pipeline/populate.ts index 52a8a88..55d37aa 100644 --- a/backend/src/pipeline/populate.ts +++ b/backend/src/pipeline/populate.ts @@ -13,6 +13,6 @@ export const datasetContextSchema = z.object({ datasetName: z.string(), description: z.string(), columns: z.array(populateColumnSchema).min(1), - rowIds: z.array(z.string()).optional(), + rowIds: z.array(z.string()).min(1).optional(), }); export type DatasetContext = z.infer; diff --git a/frontend/components/table/use-row-change-detection.ts b/frontend/components/table/use-row-change-detection.ts index 3add86d..49ce219 100644 --- a/frontend/components/table/use-row-change-detection.ts +++ b/frontend/components/table/use-row-change-detection.ts @@ -5,8 +5,16 @@ const FLASH_DURATION_MS = 1500; export function useRowChangeDetection(rows: DatasetRow[]) { const prevRowsRef = useRef>(new Map()); + const flashTimersRef = useRef>>(new Set()); const [flashingCells, setFlashingCells] = useState>(new Set()); + useEffect(() => { + return () => { + for (const timer of flashTimersRef.current) clearTimeout(timer); + flashTimersRef.current.clear(); + }; + }, []); + const pendingRowIds = useMemo(() => { const set = new Set(); for (const row of rows) { @@ -57,9 +65,9 @@ export function useRowChangeDetection(rows: DatasetRow[]) { for (const key of newFlashes) next.delete(key); return next; }); + flashTimersRef.current.delete(timer); }, FLASH_DURATION_MS); - - return () => clearTimeout(timer); + flashTimersRef.current.add(timer); } }, [rows]); diff --git a/frontend/convex/datasetRows.ts b/frontend/convex/datasetRows.ts index 80e3629..f69316d 100644 --- a/frontend/convex/datasetRows.ts +++ b/frontend/convex/datasetRows.ts @@ -272,13 +272,15 @@ export const markForUpdate = internalMutation({ }, handler: async (ctx, args) => { if (args.rowIds && args.rowIds.length > 0) { + let marked = 0; for (const rowId of args.rowIds) { const row = await ctx.db.get(rowId); if (row && row.datasetId === args.datasetId) { await ctx.db.patch(rowId, { updateStatus: "pending" as const }); + marked++; } } - return args.rowIds.length; + return marked; } const rows = await ctx.db .query("datasetRows") diff --git a/frontend/convex/datasets.ts b/frontend/convex/datasets.ts index 2088652..cc7b301 100644 --- a/frontend/convex/datasets.ts +++ b/frontend/convex/datasets.ts @@ -256,7 +256,6 @@ export const updateStatus = mutation({ v.literal("live"), v.literal("paused"), v.literal("building"), - v.literal("updating"), ), }, handler: async (ctx, args) => { From 6946c4544b1fc24e29c7abe56bc2236dcbe6a481 Mon Sep 17 00:00:00 2001 From: Simantak Dabhade Date: Thu, 28 May 2026 10:41:32 -0700 Subject: [PATCH 3/3] Remove double-count of errors on cleanup failure Cleanup errors in clearUpdateStatus should not increment the shared errors counter since the row may have already been counted from the main catch block. Co-Authored-By: Claude Opus 4.6 --- backend/src/mastra/workflows/update.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/src/mastra/workflows/update.ts b/backend/src/mastra/workflows/update.ts index 51fe3b1..c5d44cf 100644 --- a/backend/src/mastra/workflows/update.ts +++ b/backend/src/mastra/workflows/update.ts @@ -150,7 +150,6 @@ ${row.howFound ? `\nPreviously found via: ${row.howFound}` : ""}`; return; } console.error(`[refresh-rows] Failed to clear update status for row ${row._id}: ${cleanupMsg}`); - errors++; } } }