diff --git a/CLAUDE.md b/CLAUDE.md index f5c93f6..4df3522 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -13,6 +13,7 @@ Frontend on :3500, backend on :3501, Mastra Studio on :4111, Convex dashboard on - `CLERK_SECRET_KEY` — from Clerk API Keys - `CLERK_JWT_ISSUER_DOMAIN` — your Frontend API URL (e.g. `https://your-app.clerk.accounts.dev`) 4. Add an OpenRouter API key to the root `.env` file: `OPENROUTER_API_KEY=sk-or-...` (get one at https://openrouter.ai/settings/keys). Docker Compose reads the root `.env` and passes it to the backend and Mastra containers. +4b. Add a TinyFish API key to the root `.env` file: `TINYFISH_API_KEY=...` (get one at https://agent.tinyfish.ai/api-keys). This enables the populate agent to search the web and fetch page content. 5. Run `make dev` — this starts all Docker services AND pushes Convex functions automatically. 6. Generate a Convex admin key (first run only): `docker compose exec convex ./generate_admin_key.sh` and add it as `CONVEX_SELF_HOSTED_ADMIN_KEY` in `frontend/.env.local`, then re-run `make dev`. @@ -28,6 +29,8 @@ Backend is Fastify + Mastra. Fastify serves the HTTP API (Clerk JWT auth on prot The schema inference pipeline: frontend calls `POST /infer-schema` → Fastify verifies the Clerk JWT → calls `inferSchema()` in `backend/src/pipeline/schema-inference.ts` → Claude Sonnet 4.6 via OpenRouter → returns a Zod-validated `DatasetSchema` → frontend maps it to editable columns in the wizard. +The populate pipeline: frontend calls `POST /populate` with `{ datasetId, datasetName, description, columns }` → Fastify verifies the Clerk JWT → triggers `populateWorkflow` which: (1) clears existing rows, (2) builds a prompt from the schema, (3) runs the populate agent (Claude Sonnet 4.6) which searches the web via TinyFish APIs, then inserts rows into Convex one by one. Rows appear in realtime on the frontend via Convex reactive queries. + Convex functions use `ctx.auth.getUserIdentity()` to get the authenticated user. The `ownerId` field on datasets stores `identity.subject` (Clerk user ID). Do not pass `ownerId` from the client. ## Environment Variables @@ -36,6 +39,7 @@ Docker Compose interpolates variables from the root `.env` file. Key variables: - `NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY`, `CLERK_SECRET_KEY` — shared by frontend and backend - `OPENROUTER_API_KEY` — used by backend and Mastra for AI model calls - `CONVEX_SELF_HOSTED_ADMIN_KEY` — used by backend for system-level Convex writes +- `TINYFISH_API_KEY` — used by the populate agent for web search and fetch (get one at https://agent.tinyfish.ai/api-keys) The backend container maps `NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY` → `CLERK_PUBLISHABLE_KEY` (see `docker-compose.dev.yml`). diff --git a/backend/.env.example b/backend/.env.example index f11f5c4..5f6f461 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -14,3 +14,7 @@ CLERK_PUBLISHABLE_KEY= # OpenRouter API key — required by schema inference. # Generate at https://openrouter.ai/settings/keys OPENROUTER_API_KEY=sk-or-... + +# TinyFish API key — used by the populate agent for web search and fetch. +# Generate at https://agent.tinyfish.ai/api-keys +TINYFISH_API_KEY= diff --git a/backend/CLAUDE.md b/backend/CLAUDE.md index 5299189..f5dccc5 100644 --- a/backend/CLAUDE.md +++ b/backend/CLAUDE.md @@ -9,6 +9,7 @@ Fastify serves the backend API on :3501. Protected routes use Clerk JWT verifica Routes: - `GET /health` — public health check - `POST /infer-schema` — protected. Accepts `{ prompt: string }`, returns a `DatasetSchema`. Calls `inferSchema()` from the pipeline. +- `POST /populate` — protected. Accepts a `DatasetContext` (datasetId, name, description, columns). Triggers the populate workflow which clears existing rows, then uses an AI agent to search the web and insert real data. To add a new protected route, register it inside the scoped plugin in `src/index.ts` that has `requireAuth` as a preHandler. Use `req.auth.userId` for the authenticated user — never trust user-supplied IDs in the body. @@ -22,22 +23,30 @@ The pipeline is a pure function (`inferSchema(prompt) → DatasetSchema`). It is `src/mastra/` — wraps pipelines into Mastra workflows. Runs as a separate Docker service on :4111 with `mastra dev`, which provides a Studio UI for inspecting and testing workflows. -- `src/mastra/index.ts` — registers workflows with the `Mastra` instance +- `src/mastra/index.ts` — registers agents and workflows with the `Mastra` instance - `src/mastra/workflows/infer-schema.ts` — `inferSchemaWorkflow`, a single-step workflow wrapping `inferSchema()` +- `src/mastra/workflows/populate.ts` — `populateWorkflow`, 3-step workflow: clear rows → build prompt → run populate agent +- `src/mastra/agents/populate.ts` — `populateAgent`, an AI agent (Claude Sonnet 4.6 via OpenRouter) with 7 tools for database CRUD and web access +- `src/mastra/tools/dataset-tools.ts` — 5 Convex-backed tools: `insert_row`, `list_rows`, `get_row`, `update_row`, `delete_row` +- `src/mastra/tools/web-tools.ts` — 2 TinyFish API tools: `search_web`, `fetch_page` + +The populate agent uses `createStep(agent, { maxSteps: 80 })` to allow enough tool-call rounds for web research + row insertion. + +All tools return structured error messages (not thrown exceptions) so the agent can self-correct. Mastra uses `HOST` and `PORT` env vars for binding. In Docker, `HOST=0.0.0.0` is required. ## Convex -Writes to Convex via `ConvexHttpClient` in `src/convex.ts`. Import `{ convex, api }` from `./convex.js` to call Convex mutations and queries. The `api` types are re-exported from the frontend's generated Convex code. - -The `tsconfig.json` includes `../frontend/convex` so TypeScript can resolve the generated types. +Writes to Convex via `ConvexHttpClient` in `src/convex.ts`. Import `{ convex, api, internal }` from `./convex.js` to call Convex mutations and queries. Uses `anyApi` from `convex/server` as an untyped proxy — this avoids cross-project imports from the frontend's generated code, which don't work in Docker containers. Admin key is set via `setAdminAuth()` for internal mutations. ## Environment Required env vars (see `.env.example`): - `CONVEX_URL` — Convex instance URL +- `CONVEX_SELF_HOSTED_ADMIN_KEY` — for system-level Convex writes (internal mutations) - `CLERK_SECRET_KEY`, `CLERK_PUBLISHABLE_KEY` — for JWT verification - `OPENROUTER_API_KEY` — for AI model calls +- `TINYFISH_API_KEY` — for web search and fetch (populate agent). Get one at https://agent.tinyfish.ai/api-keys In Docker, these are interpolated from the root `.env` file via `docker-compose.dev.yml`. diff --git a/backend/src/convex.ts b/backend/src/convex.ts index 84d8323..2b7e267 100644 --- a/backend/src/convex.ts +++ b/backend/src/convex.ts @@ -1,4 +1,5 @@ import { ConvexHttpClient } from "convex/browser"; +import { anyApi } from "convex/server"; import { env } from "./env.js"; @@ -16,11 +17,12 @@ import { env } from "./env.js"; * ✗ NEVER use this to act "on behalf of a user". For user-initiated work, * the frontend should call Convex directly with the user's Clerk JWT. * - * If admin key is missing, this client can still call PUBLIC functions but - * will fail closed on internal ones (which is the desired behavior — better - * to error than to silently degrade). + * `anyApi` is an untyped proxy that resolves function references at runtime. + * Full types come from the frontend's generated code (included via tsconfig) + * and are available in the IDE, but the Docker container doesn't need them. */ -export { api, internal } from "../../frontend/convex/_generated/api.js"; +export const api = anyApi; +export const internal = anyApi; export const convex = new ConvexHttpClient(env.CONVEX_URL); diff --git a/backend/src/index.ts b/backend/src/index.ts index 175dbf1..330ade1 100644 --- a/backend/src/index.ts +++ b/backend/src/index.ts @@ -4,6 +4,9 @@ import fastifyCors from "@fastify/cors"; import { env } from "./env.js"; import clerkAuthPlugin, { requireAuth } from "./clerk-auth.js"; import { inferSchema } from "./pipeline/schema-inference.js"; +import { datasetContextSchema } from "./pipeline/populate.js"; +import { populateWorkflow } from "./mastra/workflows/populate.js"; +import { convex, api } from "./convex.js"; const fastify = Fastify({ logger: true }); @@ -47,6 +50,44 @@ await fastify.register(async (instance) => { return reply.code(502).send({ error: "Schema inference failed. Please try again." }); } }); + + instance.post("/populate", async (req, reply) => { + const parsed = datasetContextSchema.safeParse(req.body); + if (!parsed.success) { + return reply.code(400).send({ + error: "Invalid request", + details: parsed.error.flatten().fieldErrors, + }); + } + + try { + const dataset = await convex.query(api.datasets.get, { id: parsed.data.datasetId }); + if (!dataset) { + return reply.code(404).send({ error: "Dataset not found" }); + } + if (dataset.ownerId !== req.auth.userId) { + return reply.code(403).send({ error: "Not authorized to populate this dataset" }); + } + + const run = await populateWorkflow.createRun(); + const result = await run.start({ inputData: parsed.data }); + + req.log.info({ workflowStatus: result.status, steps: JSON.stringify(result.steps).slice(0, 2000) }, "Populate workflow completed"); + + if (result.status !== "success") { + throw new Error(`Workflow ended with status: ${result.status}`); + } + + return { success: true, result: result.result }; + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + if (msg.includes("validator") || msg.includes("Invalid")) { + return reply.code(400).send({ error: "Invalid datasetId" }); + } + req.log.error(err, "Populate failed"); + return reply.code(502).send({ error: "Failed to populate dataset. Please try again." }); + } + }); }); try { diff --git a/backend/src/mastra/agents/populate.ts b/backend/src/mastra/agents/populate.ts new file mode 100644 index 0000000..2da84d0 --- /dev/null +++ b/backend/src/mastra/agents/populate.ts @@ -0,0 +1,36 @@ +import { Agent } from "@mastra/core/agent"; +import { createOpenRouter } from "@openrouter/ai-sdk-provider"; +import { + insertRowTool, + listRowsTool, + getRowTool, + updateRowTool, + deleteRowTool, +} from "../tools/dataset-tools.js"; +import { searchWebTool, fetchPageTool } from "../tools/web-tools.js"; + +const openrouter = createOpenRouter({ + apiKey: process.env.OPENROUTER_API_KEY!, +}); + +export const populateAgent = new Agent({ + id: "populate-agent", + name: "Dataset Populate Agent", + instructions: `You fill datasets with real data. Here's how: + +1. Search the web for data that fits the dataset topic. +2. Fetch 1-2 pages to get details. +3. Call insert_row for each row using what you found. Don't stop until you've inserted all the rows asked for. + +If you can't find enough real data, make up realistic data to fill the rest. Every row must be inserted with insert_row.`, + model: openrouter("anthropic/claude-sonnet-4-6"), + tools: { + insert_row: insertRowTool, + list_rows: listRowsTool, + get_row: getRowTool, + update_row: updateRowTool, + delete_row: deleteRowTool, + search_web: searchWebTool, + fetch_page: fetchPageTool, + }, +}); diff --git a/backend/src/mastra/index.ts b/backend/src/mastra/index.ts index 16d0bc9..9a7cae7 100644 --- a/backend/src/mastra/index.ts +++ b/backend/src/mastra/index.ts @@ -1,6 +1,9 @@ import { Mastra } from "@mastra/core/mastra"; import { inferSchemaWorkflow } from "./workflows/infer-schema.js"; +import { populateWorkflow } from "./workflows/populate.js"; +import { populateAgent } from "./agents/populate.js"; export const mastra = new Mastra({ - workflows: { inferSchemaWorkflow }, + agents: { populateAgent }, + workflows: { inferSchemaWorkflow, populateWorkflow }, }); diff --git a/backend/src/mastra/tools/dataset-tools.ts b/backend/src/mastra/tools/dataset-tools.ts new file mode 100644 index 0000000..d29c5ec --- /dev/null +++ b/backend/src/mastra/tools/dataset-tools.ts @@ -0,0 +1,161 @@ +import { createTool } from "@mastra/core/tools"; +import { z } from "zod"; +import { convex, api, internal } from "../../convex.js"; + +const resultSchema = z.object({ + success: z.boolean(), + error: z.string().optional(), +}); + +function cleanDataKeys(data: Record): Record { + const cleaned: Record = {}; + for (const [key, value] of Object.entries(data)) { + cleaned[key.replace(/^["`]+|["`]+$/g, "")] = value; + } + return cleaned; +} + +export const insertRowTool = createTool({ + id: "insert_row", + description: + "Insert a single row into the dataset. Call this each time you have a row ready — don't wait to batch them.", + inputSchema: z.object({ + datasetId: z.string(), + data: z.record(z.string(), z.any()), + }), + outputSchema: resultSchema, + execute: async ({ datasetId, data }) => { + if (!datasetId) return { success: false, error: "datasetId is required." }; + if (!data || Object.keys(data).length === 0) + return { success: false, error: "data is required and must have at least one key. Pass an object like { \"Column Name\": value }." }; + + const cleanedData = cleanDataKeys(data); + console.log(`[insert_row] Inserting row into ${datasetId} (${Object.keys(cleanedData).length} columns)`); + try { + await convex.mutation(internal.datasetRows.insert, { datasetId, data: cleanedData }); + console.log(`[insert_row] Row inserted successfully`); + return { success: true }; + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + console.error(`[insert_row] Failed:`, msg); + if (msg.includes("not found")) + return { success: false, error: `Dataset "${datasetId}" not found. Check the datasetId is correct.` }; + if (msg.includes("validator")) + return { success: false, error: `Data validation failed: ${msg}. Check that your data keys are plain strings and values match expected types.` }; + return { success: false, error: `Insert failed: ${msg}` }; + } + }, +}); + +export const listRowsTool = createTool({ + id: "list_rows", + description: + "Read all rows in the dataset. Returns an array of row objects, each with _id and data fields.", + inputSchema: z.object({ + datasetId: z.string(), + }), + outputSchema: z.object({ rows: z.array(z.any()).optional(), error: z.string().optional() }), + execute: async ({ datasetId }) => { + if (!datasetId) return { error: "datasetId is required." }; + + console.log(`[list_rows] Reading all rows for dataset ${datasetId}`); + try { + const rows = await convex.query(api.datasetRows.listByDataset, { datasetId }); + console.log(`[list_rows] Found ${rows.length} rows`); + return { rows }; + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + console.error(`[list_rows] Failed:`, msg); + if (msg.includes("not found")) + return { error: `Dataset "${datasetId}" not found. Check the datasetId.` }; + return { error: `List rows failed: ${msg}` }; + } + }, +}); + +export const getRowTool = createTool({ + id: "get_row", + description: + "Read a single row by its ID. Returns the row object with _id and data fields, or an error if not found.", + inputSchema: z.object({ + rowId: z.string(), + }), + outputSchema: z.object({ row: z.any().optional(), error: z.string().optional() }), + execute: async ({ rowId }) => { + if (!rowId) return { error: "rowId is required." }; + + console.log(`[get_row] Reading row ${rowId}`); + try { + const row = await convex.query(internal.datasetRows.get, { id: rowId }); + if (!row) return { error: `Row "${rowId}" not found. It may have been deleted.` }; + console.log(`[get_row] Found`); + return { row }; + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + console.error(`[get_row] Failed:`, msg); + if (msg.includes("validator") || msg.includes("Invalid")) + return { error: `Invalid row ID format: "${rowId}". Row IDs look like "jd7..." — they are Convex document IDs.` }; + return { error: `Get row failed: ${msg}` }; + } + }, +}); + +export const updateRowTool = createTool({ + id: "update_row", + description: + "Update an existing row by its ID. Pass the full updated data object. Changes are tracked in history.", + inputSchema: z.object({ + rowId: z.string(), + data: z.record(z.string(), z.any()), + }), + outputSchema: resultSchema, + execute: async ({ rowId, data }) => { + if (!rowId) return { success: false, error: "rowId is required." }; + if (!data || Object.keys(data).length === 0) + return { success: false, error: "data is required. Pass the full updated row data object." }; + + const cleanedData = cleanDataKeys(data); + console.log(`[update_row] Updating row ${rowId} (${Object.keys(cleanedData).length} columns)`); + try { + await convex.mutation(internal.datasetRows.update, { id: rowId, data: cleanedData }); + console.log(`[update_row] Row updated successfully`); + return { success: true }; + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + console.error(`[update_row] Failed:`, msg); + if (msg.includes("Row not found") || msg.includes("not found")) + return { success: false, error: `Row "${rowId}" not found. Use list_rows to see existing row IDs.` }; + if (msg.includes("validator") || msg.includes("Invalid")) + return { success: false, error: `Invalid input: ${msg}. Check that rowId is a valid Convex ID and data keys are plain strings.` }; + return { success: false, error: `Update failed: ${msg}` }; + } + }, +}); + +export const deleteRowTool = createTool({ + id: "delete_row", + description: + "Delete a single row by its ID. This is permanent.", + inputSchema: z.object({ + rowId: z.string(), + }), + outputSchema: resultSchema, + execute: async ({ rowId }) => { + if (!rowId) return { success: false, error: "rowId is required." }; + + console.log(`[delete_row] Deleting row ${rowId}`); + try { + await convex.mutation(internal.datasetRows.remove, { id: rowId }); + console.log(`[delete_row] Row deleted successfully`); + return { success: true }; + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + console.error(`[delete_row] Failed:`, msg); + if (msg.includes("not found")) + return { success: false, error: `Row "${rowId}" not found. It may have already been deleted.` }; + if (msg.includes("validator") || msg.includes("Invalid")) + return { success: false, error: `Invalid row ID format: "${rowId}". Use list_rows to find valid row IDs.` }; + return { success: false, error: `Delete failed: ${msg}` }; + } + }, +}); diff --git a/backend/src/mastra/tools/web-tools.ts b/backend/src/mastra/tools/web-tools.ts new file mode 100644 index 0000000..f0f112e --- /dev/null +++ b/backend/src/mastra/tools/web-tools.ts @@ -0,0 +1,162 @@ +import { createTool } from "@mastra/core/tools"; +import { z } from "zod"; + +const FETCH_TIMEOUT_MS = 30_000; + +const searchResultSchema = z.object({ + title: z.string(), + snippet: z.string(), + url: z.string(), +}); + +export const searchWebTool = createTool({ + id: "search_web", + description: + "Search the web for information. Returns a list of results with titles, snippets, and URLs. Use this to find real data for the dataset.", + inputSchema: z.object({ + query: z.string().describe("The search query"), + }), + outputSchema: z.object({ + results: z.array(searchResultSchema).optional(), + error: z.string().optional(), + }), + execute: async ({ query }) => { + if (!query?.trim()) + return { error: "query is required and cannot be empty." }; + + const apiKey = process.env.TINYFISH_API_KEY; + if (!apiKey) + return { error: "TINYFISH_API_KEY is not configured. Web search is unavailable — use synthetic data instead." }; + + const url = `https://api.search.tinyfish.ai?query=${encodeURIComponent(query)}`; + console.log(`[search_web] Searching: "${query}"`); + + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS); + try { + const res = await fetch(url, { + headers: { "X-API-Key": apiKey }, + signal: controller.signal, + }); + clearTimeout(timeout); + + if (!res.ok) { + const body = await res.text(); + console.error(`[search_web] API error ${res.status}:`, body.slice(0, 200)); + if (res.status === 429) + return { error: "Search rate limit hit. Wait a moment, or skip web search and use synthetic data." }; + if (res.status === 401) + return { error: "Invalid TINYFISH_API_KEY. Web search unavailable — use synthetic data." }; + return { error: `Search API returned HTTP ${res.status}. Try a different query or use synthetic data.` }; + } + + const data = await res.json(); + const results = (data.results ?? []).map((r: Record) => ({ + title: r.title as string, + snippet: r.snippet as string, + url: r.url as string, + })); + + console.log(`[search_web] Got ${results.length} results`); + if (results.length === 0) + return { results: [], error: "No results found for this query. Try a broader search or use synthetic data." }; + return { results }; + } catch (err) { + clearTimeout(timeout); + if (err instanceof Error && err.name === "AbortError") + return { error: "Search timed out. Skip web search and use synthetic data." }; + const msg = err instanceof Error ? err.message : String(err); + console.error(`[search_web] Failed:`, msg); + return { error: `Search failed: ${msg}. Skip web search and use synthetic data.` }; + } + }, +}); + +export const fetchPageTool = createTool({ + id: "fetch_page", + description: + "Fetch a web page and extract its content as clean markdown text. Use this after search_web to read the full content of a page.", + inputSchema: z.object({ + url: z.string().describe("The URL to fetch"), + }), + outputSchema: z.object({ + title: z.string().optional(), + text: z.string().optional(), + error: z.string().optional(), + }), + execute: async ({ url: targetUrl }) => { + if (!targetUrl?.trim()) + return { error: "url is required and cannot be empty." }; + if (!targetUrl.startsWith("http://") && !targetUrl.startsWith("https://")) + return { error: `Invalid URL "${targetUrl}". Must start with http:// or https://.` }; + + const apiKey = process.env.TINYFISH_API_KEY; + if (!apiKey) + return { error: "TINYFISH_API_KEY is not configured. Page fetch is unavailable — use data from search snippets instead." }; + + console.log(`[fetch_page] Fetching: ${targetUrl}`); + + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), FETCH_TIMEOUT_MS); + try { + const res = await fetch("https://api.fetch.tinyfish.ai", { + method: "POST", + headers: { + "Content-Type": "application/json", + "X-API-Key": apiKey, + }, + body: JSON.stringify({ urls: [targetUrl], format: "markdown" }), + signal: controller.signal, + }); + clearTimeout(timeout); + + if (!res.ok) { + const body = await res.text(); + console.error(`[fetch_page] API error ${res.status}:`, body.slice(0, 200)); + if (res.status === 429) + return { error: "Fetch rate limit hit. Use data from search snippets instead." }; + if (res.status === 401) + return { error: "Invalid TINYFISH_API_KEY. Page fetch unavailable." }; + return { error: `Fetch API returned HTTP ${res.status}. Try a different URL or use search snippet data.` }; + } + + const data = await res.json(); + + if (data.errors?.length > 0) { + const err = data.errors[0]; + console.log(`[fetch_page] Failed: ${err.error}`); + const hints: Record = { + bot_blocked: "This site blocks automated access. Use the search snippet data instead.", + timeout: "Page took too long to load. Try a different URL.", + target_unreachable: "Could not connect to this site. Try a different URL.", + page_not_found: "Page not found (404). The URL may be outdated. Try a different one.", + target_http_error: `Site returned HTTP ${err.status ?? "error"}. Try a different URL.`, + }; + return { error: hints[err.error] ?? `Fetch failed: ${err.error}. Try a different URL.` }; + } + + const page = data.results?.[0]; + if (!page?.text) + return { error: "Page loaded but had no extractable text content. Try a different URL." }; + + let text = page.text as string; + const MAX_CHARS = 15000; + if (text.length > MAX_CHARS) { + text = text.slice(0, MAX_CHARS) + `\n\n[Truncated — showing first ${MAX_CHARS} of ${page.text.length} chars]`; + } + + console.log(`[fetch_page] Got ${(page.text as string).length} chars from "${page.title}" (returning ${text.length})`); + return { + title: page.title as string | undefined, + text, + }; + } catch (err) { + clearTimeout(timeout); + if (err instanceof Error && err.name === "AbortError") + return { error: "Page fetch timed out. Try a different URL or use search snippet data." }; + const msg = err instanceof Error ? err.message : String(err); + console.error(`[fetch_page] Failed:`, msg); + return { error: `Fetch failed: ${msg}. Use data from search snippets instead.` }; + } + }, +}); diff --git a/backend/src/mastra/workflows/populate.ts b/backend/src/mastra/workflows/populate.ts new file mode 100644 index 0000000..03e8d3c --- /dev/null +++ b/backend/src/mastra/workflows/populate.ts @@ -0,0 +1,64 @@ +import { createStep, createWorkflow } from "@mastra/core/workflows"; +import { z } from "zod"; +import { datasetContextSchema } from "../../pipeline/populate.js"; +import { convex, internal } from "../../convex.js"; +import { populateAgent } from "../agents/populate.js"; + +const clearRowsStep = createStep({ + id: "clear-rows", + inputSchema: datasetContextSchema, + outputSchema: datasetContextSchema, + execute: async ({ inputData }) => { + console.log(`[clear-rows] Clearing rows for dataset ${inputData.datasetId}`); + await convex.mutation(internal.datasetRows.clearByDataset, { + datasetId: inputData.datasetId, + }); + console.log(`[clear-rows] Done`); + return inputData; + }, +}); + +const buildPromptStep = createStep({ + id: "build-prompt", + inputSchema: datasetContextSchema, + outputSchema: z.object({ prompt: z.string() }), + execute: async ({ inputData }) => { + const columnNames = inputData.columns.map((c) => c.name); + const columnsDesc = inputData.columns + .map( + (c) => + `- "${c.name}" (${c.type})${c.description ? `: ${c.description}` : ""}`, + ) + .join("\n"); + + const prompt = `Dataset ID: ${inputData.datasetId} +Dataset: ${inputData.datasetName} +Description: ${inputData.description} + +Columns: +${columnsDesc} + +When calling insert_row, the data object keys MUST be exactly these strings (no backticks, no extra quotes): +${JSON.stringify(columnNames)} + +Example insert_row call: +insert_row({ datasetId: "${inputData.datasetId}", data: { ${columnNames.map((n) => `"${n}": `).join(", ")} } }) + +Search the web for real data about this topic. Then call insert_row to fill in 10 rows. Use real data from your search. Fill in any gaps with realistic fake data.`; + + console.log(`[build-prompt] Built prompt for ${inputData.datasetName} (${inputData.columns.length} columns)`); + return { prompt }; + }, +}); + +const agentStep = createStep(populateAgent, { maxSteps: 80 }); + +export const populateWorkflow = createWorkflow({ + id: "populate-workflow", + inputSchema: datasetContextSchema, + outputSchema: z.object({ text: z.string() }), +}) + .then(clearRowsStep) + .then(buildPromptStep) + .then(agentStep) + .commit(); diff --git a/backend/src/pipeline/populate.ts b/backend/src/pipeline/populate.ts new file mode 100644 index 0000000..1524d34 --- /dev/null +++ b/backend/src/pipeline/populate.ts @@ -0,0 +1,16 @@ +import { z } from "zod"; + +export const populateColumnSchema = z.object({ + name: z.string(), + type: z.enum(["text", "number", "boolean", "url", "date"]), + description: z.optional(z.string()), +}); +export type PopulateColumn = z.infer; + +export const datasetContextSchema = z.object({ + datasetId: z.string().min(1), + datasetName: z.string(), + description: z.string(), + columns: z.array(populateColumnSchema).min(1), +}); +export type DatasetContext = z.infer; diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 00e18e5..7a0eec1 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -32,6 +32,7 @@ services: CLERK_SECRET_KEY: ${CLERK_SECRET_KEY:-} CLERK_PUBLISHABLE_KEY: ${NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY:-} OPENROUTER_API_KEY: ${OPENROUTER_API_KEY:-} + TINYFISH_API_KEY: ${TINYFISH_API_KEY:-} depends_on: convex: condition: service_healthy @@ -50,6 +51,7 @@ services: OPENROUTER_API_KEY: ${OPENROUTER_API_KEY:-} CONVEX_URL: http://convex:3210 CONVEX_SELF_HOSTED_ADMIN_KEY: ${CONVEX_SELF_HOSTED_ADMIN_KEY:-} + TINYFISH_API_KEY: ${TINYFISH_API_KEY:-} depends_on: convex: condition: service_healthy diff --git a/frontend/app/dataset/[id]/page.tsx b/frontend/app/dataset/[id]/page.tsx index 7cdb26a..84e4234 100644 --- a/frontend/app/dataset/[id]/page.tsx +++ b/frontend/app/dataset/[id]/page.tsx @@ -11,17 +11,19 @@ import { DatasetTable } from "@/components/table"; import { ThemeToggle } from "@/components/ThemeToggle"; import { StatusBadge } from "@/components/dataset/StatusBadge"; import { downloadCSV, downloadXLSX } from "@/lib/export"; +import { populate } from "@/lib/backend"; import { EVENTS, captureException, track } from "@/lib/analytics"; export default function DatasetPage() { const params = useParams(); const { isLoading } = useConvexAuth(); - const { userId } = useAuth(); + const { userId, getToken } = useAuth(); const [exporting, setExporting] = useState<"csv" | "xlsx" | null>(null); + const [populating, setPopulating] = useState(false); const datasetId = params.id as Id<"datasets">; - const dataset = useQuery(api.datasets.get, { id: datasetId }); - const rows = useQuery(api.datasetRows.listByDataset, { + const dataset = useQuery(api.datasets.get, isLoading ? "skip" : { id: datasetId }); + const rows = useQuery(api.datasetRows.listByDataset, isLoading ? "skip" : { datasetId, }); @@ -67,6 +69,35 @@ export default function DatasetPage() { } } + async function handlePopulate() { + if (!dataset || populating) return; + setPopulating(true); + try { + const token = await getToken(); + if (!token) throw new Error("Not authenticated"); + + await populate( + dataset._id, + dataset.name, + dataset.description, + dataset.columns, + token, + ); + track(EVENTS.DATASET_POPULATED, { + datasetId: dataset._id, + column_count: dataset.columns.length, + }); + } catch (err) { + console.error("[populate] failed", err); + captureException(err, { + operation: "dataset_populate", + datasetId: dataset._id, + }); + } finally { + setPopulating(false); + } + } + if (isLoading || dataset === undefined || rows === undefined) { return (
@@ -111,6 +142,13 @@ export default function DatasetPage() { > {exporting === "xlsx" ? "Exporting…" : "Export XLSX"} +
diff --git a/frontend/convex/datasetRows.ts b/frontend/convex/datasetRows.ts index b3d0f96..dc3f318 100644 --- a/frontend/convex/datasetRows.ts +++ b/frontend/convex/datasetRows.ts @@ -1,4 +1,4 @@ -import { query, internalMutation } from "./_generated/server.js"; +import { query, internalMutation, internalQuery } from "./_generated/server.js"; import { v } from "convex/values"; import { loadReadableDataset } from "./lib/authz.js"; @@ -72,6 +72,34 @@ export const update = internalMutation({ }, }); +export const clearByDataset = internalMutation({ + args: { datasetId: v.id("datasets") }, + handler: async (ctx, args) => { + const rows = await ctx.db + .query("datasetRows") + .withIndex("by_dataset", (q) => q.eq("datasetId", args.datasetId)) + .collect(); + for (const row of rows) { + await ctx.db.delete(row._id); + } + return rows.length; + }, +}); + +export const get = internalQuery({ + args: { id: v.id("datasetRows") }, + handler: async (ctx, args) => { + return await ctx.db.get(args.id); + }, +}); + +export const remove = internalMutation({ + args: { id: v.id("datasetRows") }, + handler: async (ctx, args) => { + await ctx.db.delete(args.id); + }, +}); + export const insertBatch = internalMutation({ args: { datasetId: v.id("datasets"), diff --git a/frontend/lib/analytics.ts b/frontend/lib/analytics.ts index 8f076cc..7b60702 100644 --- a/frontend/lib/analytics.ts +++ b/frontend/lib/analytics.ts @@ -32,6 +32,7 @@ export const EVENTS = { // Dataset interaction DATASET_OPENED: "dataset_opened", DATASET_EXPORTED: "dataset_exported", + DATASET_POPULATED: "dataset_populated", // Creation flow DATASET_CREATION_STARTED: "dataset_creation_started", diff --git a/frontend/lib/backend.ts b/frontend/lib/backend.ts index 9061ae3..c1e7142 100644 --- a/frontend/lib/backend.ts +++ b/frontend/lib/backend.ts @@ -17,6 +17,17 @@ export interface InferredColumn { nullable: boolean; } +export interface PopulateColumn { + name: string; + type: "text" | "number" | "boolean" | "url" | "date"; + description?: string; +} + +export interface PopulateResult { + success: boolean; + result: unknown; +} + const BACKEND_URL = process.env.NEXT_PUBLIC_BACKEND_URL || "http://localhost:3501"; @@ -41,3 +52,28 @@ export async function inferSchema( return res.json(); } + +export async function populate( + datasetId: string, + datasetName: string, + description: string, + columns: PopulateColumn[], + token: string, +): Promise { + const res = await fetch(`${BACKEND_URL}/populate`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${token}`, + }, + body: JSON.stringify({ datasetId, datasetName: datasetName, description, columns }), + }); + + if (!res.ok) { + const body = await res.json().catch(() => null); + const message = body?.error || `Backend error (${res.status})`; + throw new Error(message); + } + + return res.json(); +}