From 1fe202a74a601356e0ece0e5617e7daf69d71a2a Mon Sep 17 00:00:00 2001 From: pranavjanakiraman Date: Mon, 1 Jun 2026 15:20:50 -0700 Subject: [PATCH 1/2] Add dataset refresh cadence scheduling --- backend/src/env.ts | 12 ++ backend/src/index.ts | 175 ++++++++++++++++ docker-compose.dev.yml | 4 + frontend/app/dashboard/page.tsx | 10 +- frontend/app/dataset/[id]/page.tsx | 87 +++++++- frontend/app/dataset/new/page.tsx | 39 ++-- frontend/components/dataset/DatasetCard.tsx | 7 +- frontend/components/table/types.ts | 9 +- frontend/convex/datasets.ts | 212 +++++++++++++++++++- frontend/convex/modelConfig.ts | 9 +- frontend/convex/publicSeed.ts | 50 +++-- frontend/convex/schema.ts | 22 +- frontend/lib/refresh-cadence.ts | 23 +++ 13 files changed, 601 insertions(+), 58 deletions(-) create mode 100644 frontend/lib/refresh-cadence.ts diff --git a/backend/src/env.ts b/backend/src/env.ts index 084ffe0..89b7d85 100644 --- a/backend/src/env.ts +++ b/backend/src/env.ts @@ -56,4 +56,16 @@ export const env = { process.env.POSTHOG_HOST || process.env.NEXT_PUBLIC_POSTHOG_HOST || "https://us.i.posthog.com", + + REFRESH_SCHEDULER_ENABLED: + process.env.REFRESH_SCHEDULER_ENABLED !== "false", + REFRESH_SCHEDULER_POLL_MS: Number( + process.env.REFRESH_SCHEDULER_POLL_MS || 60_000, + ), + REFRESH_SCHEDULER_BATCH_SIZE: Number( + process.env.REFRESH_SCHEDULER_BATCH_SIZE || 5, + ), + REFRESH_SCHEDULER_STALE_AFTER_MS: Number( + process.env.REFRESH_SCHEDULER_STALE_AFTER_MS || 6 * 60 * 60 * 1000, + ), }; diff --git a/backend/src/index.ts b/backend/src/index.ts index 010df15..d642ac8 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -248,6 +248,73 @@ async function runUpdateWorkflowInBackground({ } } +async function runScheduledUpdateWorkflowInBackground({ + input, + run, + authorizedUserId, + logger, + modelConfig, +}: { + input: DatasetContext; + run: UpdateWorkflowRun; + authorizedUserId: string; + logger: FastifyBaseLogger; + modelConfig: { + schemaInference: string; + populateOrchestrator: string; + investigateSubagent: string; + }; +}): Promise { + const datasetId = input.datasetId; + + try { + const result = await run.start({ + inputData: { + ...input, + authContext: { + authorizedUserId, + workflowRunId: run.runId, + modelConfig, + }, + }, + }); + + logger.info( + { + datasetId, + workflowStatus: result.status, + steps: JSON.stringify(result.steps).slice(0, 2000), + }, + "Scheduled update workflow completed", + ); + + if (result.status !== "success") { + throw new Error(`Workflow ended with status: ${result.status}`); + } + + await convex.mutation(internal.datasets.completeScheduledRefreshInternal, { + id: datasetId, + now: Date.now(), + }); + } catch (err) { + const lastStatusError = statusErrorMessage(err); + logger.error({ err, datasetId }, "Scheduled update workflow failed"); + + try { + await convex.mutation(internal.datasets.failScheduledRefreshInternal, { + id: datasetId, + now: Date.now(), + lastStatusError, + }); + } catch (statusErr) { + logger.error( + { err: statusErr, datasetId }, + "Failed to record scheduled refresh failure", + ); + } + } +} + async function runPopulateWorkflowInBackground({ input, run, @@ -350,6 +417,110 @@ async function runPopulateWorkflowInBackground({ } } +async function backfillDatasetRefreshSettings( + logger: FastifyBaseLogger, +): Promise { + try { + const result = await convex.mutation( + internal.datasets.backfillRefreshSettings, + { defaultCadence: "daily" }, + ); + logger.info(result, "Dataset refresh settings backfill complete"); + } catch (err) { + logger.error({ err }, "Dataset refresh settings backfill failed"); + } +} + +function startLocalRefreshScheduler( + logger: FastifyBaseLogger, +): ReturnType | null { + if (!env.REFRESH_SCHEDULER_ENABLED) { + logger.info("Dataset refresh scheduler disabled"); + return null; + } + + let ticking = false; + + async function tick(): Promise { + if (ticking) return; + ticking = true; + + try { + const now = Date.now(); + const dueDatasets = await convex.query( + internal.datasets.listDueForRefreshInternal, + { + now, + limit: env.REFRESH_SCHEDULER_BATCH_SIZE, + }, + ); + + for (const dueDataset of dueDatasets as Array<{ _id: string }>) { + let run: UpdateWorkflowRun; + try { + run = await updateWorkflow.createRun(); + } catch (runErr) { + logger.error(runErr, "Failed to create scheduled update workflow run"); + continue; + } + + const claim = await convex.mutation( + internal.datasets.claimScheduledRefreshInternal, + { + id: dueDataset._id, + now: Date.now(), + runId: run.runId, + staleAfterMs: env.REFRESH_SCHEDULER_STALE_AFTER_MS, + }, + ); + + if (claim.outcome !== "started") { + logger.debug( + { datasetId: dueDataset._id, outcome: claim.outcome }, + "Skipped scheduled refresh claim", + ); + continue; + } + + const dataset = claim.dataset; + const { getModelConfig } = await import("./config/models.js"); + const modelConfig = await getModelConfig(dataset.ownerId); + + void runScheduledUpdateWorkflowInBackground({ + input: { + datasetId: dataset.datasetId, + datasetName: dataset.datasetName, + description: dataset.description, + columns: dataset.columns, + }, + run, + authorizedUserId: dataset.ownerId, + logger, + modelConfig, + }); + } + } catch (err) { + logger.error({ err }, "Dataset refresh scheduler tick failed"); + } finally { + ticking = false; + } + } + + void tick(); + const interval = setInterval(() => { + void tick(); + }, env.REFRESH_SCHEDULER_POLL_MS); + logger.info( + { + pollMs: env.REFRESH_SCHEDULER_POLL_MS, + batchSize: env.REFRESH_SCHEDULER_BATCH_SIZE, + staleAfterMs: env.REFRESH_SCHEDULER_STALE_AFTER_MS, + }, + "Dataset refresh scheduler started", + ); + return interval; +} + const fastify = Fastify({ logger: true }); await fastify.register(fastifyCors, { @@ -365,9 +536,13 @@ await fastify.register(fastifyCors, { // protected routes — see the example block below. await fastify.register(clerkAuthPlugin); +void backfillDatasetRefreshSettings(fastify.log); +const refreshScheduler = startLocalRefreshScheduler(fastify.log); + // Flush queued PostHog events on graceful shutdown so a SIGTERM mid-flight // doesn't drop the dataset_ready_email_sent capture from the last request. fastify.addHook("onClose", async () => { + if (refreshScheduler) clearInterval(refreshScheduler); await shutdownAnalytics(); }); diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 1afbf5b..1a12432 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -43,6 +43,10 @@ services: # When unset, backend analytics module no-ops. POSTHOG_KEY: ${NEXT_PUBLIC_POSTHOG_KEY:-} POSTHOG_HOST: ${NEXT_PUBLIC_POSTHOG_HOST:-https://us.i.posthog.com} + REFRESH_SCHEDULER_ENABLED: ${REFRESH_SCHEDULER_ENABLED:-true} + REFRESH_SCHEDULER_POLL_MS: ${REFRESH_SCHEDULER_POLL_MS:-60000} + REFRESH_SCHEDULER_BATCH_SIZE: ${REFRESH_SCHEDULER_BATCH_SIZE:-5} + REFRESH_SCHEDULER_STALE_AFTER_MS: ${REFRESH_SCHEDULER_STALE_AFTER_MS:-21600000} depends_on: convex: condition: service_healthy diff --git a/frontend/app/dashboard/page.tsx b/frontend/app/dashboard/page.tsx index 6432de2..e7d0e8e 100644 --- a/frontend/app/dashboard/page.tsx +++ b/frontend/app/dashboard/page.tsx @@ -6,7 +6,6 @@ import { useRouter } from "next/navigation"; import { useQuery, useConvexAuth } from "convex/react"; import { useUser, useClerk } from "@clerk/nextjs"; import { api } from "@/convex/_generated/api"; -import type { UserResource } from "@clerk/types"; import { DatasetCard, type DatasetCardData, @@ -15,6 +14,13 @@ import { useTheme } from "@/components/ThemeToggle"; import { QuotaBadge } from "@/components/QuotaBadge"; import { EVENTS, track } from "@/lib/analytics"; +type ProfileUser = { + fullName?: string | null; + firstName?: string | null; + primaryEmailAddress?: { emailAddress?: string | null } | null; + imageUrl?: string | null; +}; + export default function DashboardPage() { const { isAuthenticated, isLoading } = useConvexAuth(); const { user } = useUser(); @@ -252,7 +258,7 @@ function ProfileMenu({ user, onSignOut, }: { - user: UserResource | null | undefined; + user: ProfileUser | null | undefined; onSignOut: () => void; }) { const [open, setOpen] = useState(false); diff --git a/frontend/app/dataset/[id]/page.tsx b/frontend/app/dataset/[id]/page.tsx index a196172..d9fc537 100644 --- a/frontend/app/dataset/[id]/page.tsx +++ b/frontend/app/dataset/[id]/page.tsx @@ -3,9 +3,8 @@ import { useParams } from "next/navigation"; import Link from "next/link"; import { useCallback, useEffect, useMemo, useRef, useState } from "react"; -import { useQuery, useConvexAuth } from "convex/react"; +import { useMutation, useQuery, useConvexAuth } from "convex/react"; import { useAuth, useUser, useClerk } from "@clerk/nextjs"; -import type { UserResource } from "@clerk/types"; import { api } from "@/convex/_generated/api"; import type { Id } from "@/convex/_generated/dataModel"; import { DatasetTable } from "@/components/table"; @@ -15,6 +14,18 @@ import { StatusBadge } from "@/components/dataset/StatusBadge"; import { downloadCSV, downloadXLSX } from "@/lib/export"; import { populate, update } from "@/lib/backend"; import { EVENTS, captureException, track } from "@/lib/analytics"; +import { + REFRESH_CADENCE_OPTIONS, + refreshCadenceLabel, + type RefreshCadence, +} from "@/lib/refresh-cadence"; + +type ProfileUser = { + fullName?: string | null; + firstName?: string | null; + primaryEmailAddress?: { emailAddress?: string | null } | null; + imageUrl?: string | null; +}; export default function DatasetPage() { const params = useParams(); @@ -28,6 +39,7 @@ export default function DatasetPage() { const [exportOpen, setExportOpen] = useState(false); const [settingsOpen, setSettingsOpen] = useState(false); const [confirmPopulate, setConfirmPopulate] = useState(false); + const [savingRefreshCadence, setSavingRefreshCadence] = useState(false); const datasetId = params.id as Id<"datasets">; const dataset = useQuery( @@ -38,6 +50,7 @@ export default function DatasetPage() { api.datasetRows.listByDataset, authLoading ? "skip" : { datasetId }, ); + const updateRefreshSettings = useMutation(api.datasets.updateRefreshSettings); const rowIds = useMemo(() => (rows ?? []).map((r) => r._id), [rows]); const selection = useSelection(rowIds); @@ -154,7 +167,7 @@ export default function DatasetPage() { track(EVENTS.DATASET_UPDATE_STARTED, { datasetId: dataset._id, column_count: dataset.columns.length, - row_count: selectedRowIds?.length ?? rows.length, + row_count: selectedRowIds?.length ?? rows?.length ?? 0, selective: selectedCount > 0, runId: result.runId, }); @@ -169,6 +182,25 @@ export default function DatasetPage() { } } + async function handleRefreshCadenceChange(refreshCadence: RefreshCadence) { + if (!dataset || savingRefreshCadence || userId !== dataset.ownerId) return; + setSavingRefreshCadence(true); + try { + await updateRefreshSettings({ + id: dataset._id, + refreshCadence, + }); + } catch (err) { + console.error("[refresh cadence] failed", err); + captureException(err, { + operation: "dataset_refresh_cadence_update", + datasetId: dataset._id, + }); + } finally { + setSavingRefreshCadence(false); + } + } + if (authLoading || dataset === undefined || rows === undefined) { return (
@@ -183,6 +215,12 @@ export default function DatasetPage() { const exportDisabled = exporting !== null || rows.length === 0; const isDatasetBusy = dataset.status === "building" || dataset.status === "updating"; + const isOwner = userId === dataset.ownerId; + const displayDataset = { + ...dataset, + refreshCadence: dataset.refreshCadence ?? "daily", + refreshEnabled: dataset.refreshEnabled ?? true, + }; const updateDisabled = updating || isDatasetBusy; const populateDisabled = populating || isDatasetBusy; const updateLabel = dataset.status === "updating" @@ -239,11 +277,13 @@ export default function DatasetPage() { open={settingsOpen} onToggle={() => setSettingsOpen((o) => !o)} onClose={() => setSettingsOpen(false)} - cadence={dataset.cadence} + refreshCadence={displayDataset.refreshCadence} + refreshCadenceDisabled={!isOwner || savingRefreshCadence} updateLabel={updateLabel} updateDisabled={updateDisabled} populateLabel={populateLabel} populateDisabled={populateDisabled} + onRefreshCadenceChange={handleRefreshCadenceChange} onUpdate={() => { setSettingsOpen(false); handleUpdate(); }} onPopulate={() => { setSettingsOpen(false); @@ -288,7 +328,7 @@ export default function DatasetPage() {
void; onClose: () => void; - cadence: string; + refreshCadence: RefreshCadence; + refreshCadenceDisabled: boolean; updateLabel: string; updateDisabled: boolean; populateLabel: string; populateDisabled: boolean; + onRefreshCadenceChange: (refreshCadence: RefreshCadence) => void; onUpdate: () => void; onPopulate: () => void; }) { @@ -437,7 +481,7 @@ function SettingsDropdown({ {open && ( -
+
-
- {cadence} +
+
+ Refresh cadence +
+ {REFRESH_CADENCE_OPTIONS.map((option) => { + const selected = refreshCadence === option.value; + return ( + + ); + })}
)} @@ -473,7 +538,7 @@ function DatasetProfileMenu({ user, onSignOut, }: { - user: UserResource | null | undefined; + user: ProfileUser | null | undefined; onSignOut: () => void; }) { const [open, setOpen] = useState(false); diff --git a/frontend/app/dataset/new/page.tsx b/frontend/app/dataset/new/page.tsx index a8c7730..285cbde 100644 --- a/frontend/app/dataset/new/page.tsx +++ b/frontend/app/dataset/new/page.tsx @@ -8,6 +8,10 @@ import { useMutation, useConvexAuth } from "convex/react"; import { api } from "@/convex/_generated/api"; import { EVENTS, track } from "@/lib/analytics"; import { inferSchema, type InferredColumn } from "@/lib/backend"; +import { + REFRESH_CADENCE_OPTIONS, + type RefreshCadence, +} from "@/lib/refresh-cadence"; type ColumnType = "text" | "number" | "boolean" | "url" | "date"; @@ -20,25 +24,8 @@ interface ProposedColumn { isPrimaryKey: boolean; } -type Cadence = "30m" | "6h" | "12h" | "daily" | "weekly"; type Step = "describe" | "generating" | "review"; -const CADENCE_OPTIONS: { value: Cadence; label: string }[] = [ - { value: "30m", label: "Every 30 min" }, - { value: "6h", label: "Every 6 hours" }, - { value: "12h", label: "Every 12 hours" }, - { value: "daily", label: "Daily" }, - { value: "weekly", label: "Weekly" }, -]; - -const CADENCE_LABELS: Record = { - "30m": "Every 30 min", - "6h": "Every 6 hours", - "12h": "Every 12 hours", - daily: "Daily", - weekly: "Weekly", -}; - const COLUMN_TYPES: { value: ColumnType; label: string; icon: string }[] = [ { value: "text", label: "Text", icon: "≡" }, { value: "number", label: "Number", icon: "#" }, @@ -93,7 +80,7 @@ export default function NewDatasetPage() { const [step, setStep] = useState("describe"); const [prompt, setPrompt] = useState(""); - const [cadence, setCadence] = useState("daily"); + const [refreshCadence, setRefreshCadence] = useState("daily"); const [columns, setColumns] = useState([]); const [datasetName, setDatasetName] = useState(""); const [isCreating, setIsCreating] = useState(false); @@ -183,7 +170,7 @@ export default function NewDatasetPage() { datasetId = await createDataset({ name: datasetName, description: prompt, - cadence: CADENCE_LABELS[cadence], + refreshCadence, columns: columns.map((c) => ({ name: c.name, type: c.type, @@ -203,7 +190,13 @@ export default function NewDatasetPage() { setIsCreating(false); return; } - try { track(EVENTS.DATASET_CREATED, { datasetId, column_count: columns.length, cadence: CADENCE_LABELS[cadence] }); } catch {} + try { + track(EVENTS.DATASET_CREATED, { + datasetId, + column_count: columns.length, + refreshCadence, + }); + } catch {} router.push(`/dataset/${datasetId}`); } @@ -312,12 +305,12 @@ export default function NewDatasetPage() {
- {CADENCE_OPTIONS.map((opt) => ( + {REFRESH_CADENCE_OPTIONS.map((opt) => (