Merged
4 changes: 4 additions & 0 deletions CLAUDE.md
@@ -13,6 +13,7 @@ Frontend on :3500, backend on :3501, Mastra Studio on :4111, Convex dashboard on
- `CLERK_SECRET_KEY` — from Clerk API Keys
- `CLERK_JWT_ISSUER_DOMAIN` — your Frontend API URL (e.g. `https://your-app.clerk.accounts.dev`)
4. Add an OpenRouter API key to the root `.env` file: `OPENROUTER_API_KEY=sk-or-...` (get one at https://openrouter.ai/settings/keys). Docker Compose reads the root `.env` and passes it to the backend and Mastra containers.
4b. Add a TinyFish API key to the root `.env` file: `TINYFISH_API_KEY=...` (get one at https://agent.tinyfish.ai/api-keys). This enables the populate agent to search the web and fetch page content.
5. Run `make dev` — this starts all Docker services AND pushes Convex functions automatically.
6. Generate a Convex admin key (first run only): `docker compose exec convex ./generate_admin_key.sh` and add it as `CONVEX_SELF_HOSTED_ADMIN_KEY` in `frontend/.env.local`, then re-run `make dev`.

@@ -28,6 +29,8 @@ Backend is Fastify + Mastra. Fastify serves the HTTP API (Clerk JWT auth on prot

The schema inference pipeline: frontend calls `POST /infer-schema` → Fastify verifies the Clerk JWT → calls `inferSchema()` in `backend/src/pipeline/schema-inference.ts` → Claude Sonnet 4.6 via OpenRouter → returns a Zod-validated `DatasetSchema` → frontend maps it to editable columns in the wizard.

The populate pipeline: frontend calls `POST /populate` with `{ datasetId, datasetName, description, columns }` → Fastify verifies the Clerk JWT → triggers `populateWorkflow` which: (1) clears existing rows, (2) builds a prompt from the schema, (3) runs the populate agent (Claude Sonnet 4.6) which searches the web via TinyFish APIs, then inserts rows into Convex one by one. Rows appear in realtime on the frontend via Convex reactive queries.
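For illustration, the request described above could be assembled on the client roughly as follows. This is a sketch, not the project's frontend code: the `ColumnSpec` fields, the `buildPopulateRequest` helper, and sending the Clerk JWT as a `Bearer` token are assumptions; the authoritative body shape is whatever `datasetContextSchema` validates.

```typescript
// Hypothetical column shape; the real one is defined by `datasetContextSchema`.
interface ColumnSpec {
  name: string;
  type: string;
}

// Body fields named in the pipeline description above.
interface PopulateBody {
  datasetId: string;
  datasetName: string;
  description: string;
  columns: ColumnSpec[];
}

interface PopulateRequest {
  url: string;
  init: { method: "POST"; headers: Record<string, string>; body: string };
}

// Builds the request without sending it, so it can be inspected or tested.
function buildPopulateRequest(baseUrl: string, jwt: string, body: PopulateBody): PopulateRequest {
  return {
    // Strip trailing slashes so the path is not doubled.
    url: `${baseUrl.replace(/\/+$/, "")}/populate`,
    init: {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        // Assumption: the backend reads the Clerk session JWT from this header.
        Authorization: `Bearer ${jwt}`,
      },
      body: JSON.stringify(body),
    },
  };
}

const populateRequest = buildPopulateRequest("http://localhost:3501/", "<clerk-session-jwt>", {
  datasetId: "<dataset-id>",
  datasetName: "Example dataset",
  description: "Placeholder description",
  columns: [{ name: "Name", type: "string" }],
});
// To send it: await fetch(populateRequest.url, populateRequest.init);
```

Because the handler awaits the whole workflow before replying, a caller would likely rely on the Convex reactive query for progress rather than on the HTTP response.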

Convex functions use `ctx.auth.getUserIdentity()` to get the authenticated user. The `ownerId` field on datasets stores `identity.subject` (Clerk user ID). Do not pass `ownerId` from the client.

## Environment Variables
@@ -36,6 +39,7 @@ Docker Compose interpolates variables from the root `.env` file. Key variables:
- `NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY`, `CLERK_SECRET_KEY` — shared by frontend and backend
- `OPENROUTER_API_KEY` — used by backend and Mastra for AI model calls
- `CONVEX_SELF_HOSTED_ADMIN_KEY` — used by backend for system-level Convex writes
- `TINYFISH_API_KEY` — used by the populate agent for web search and fetch (get one at https://agent.tinyfish.ai/api-keys)

The backend container maps `NEXT_PUBLIC_CLERK_PUBLISHABLE_KEY` → `CLERK_PUBLISHABLE_KEY` (see `docker-compose.dev.yml`).

4 changes: 4 additions & 0 deletions backend/.env.example
@@ -14,3 +14,7 @@ CLERK_PUBLISHABLE_KEY=
# OpenRouter API key — required by schema inference.
# Generate at https://openrouter.ai/settings/keys
OPENROUTER_API_KEY=sk-or-...

# TinyFish API key — used by the populate agent for web search and fetch.
# Generate at https://agent.tinyfish.ai/api-keys
TINYFISH_API_KEY=
17 changes: 13 additions & 4 deletions backend/CLAUDE.md
@@ -9,6 +9,7 @@ Fastify serves the backend API on :3501. Protected routes use Clerk JWT verifica
Routes:
- `GET /health` — public health check
- `POST /infer-schema` — protected. Accepts `{ prompt: string }`, returns a `DatasetSchema`. Calls `inferSchema()` from the pipeline.
- `POST /populate` — protected. Accepts a `DatasetContext` (datasetId, name, description, columns). Triggers the populate workflow which clears existing rows, then uses an AI agent to search the web and insert real data.

To add a new protected route, register it inside the scoped plugin in `src/index.ts` that has `requireAuth` as a preHandler. Use `req.auth.userId` for the authenticated user — never trust user-supplied IDs in the body.

@@ -22,22 +23,30 @@ The pipeline is a pure function (`inferSchema(prompt) → DatasetSchema`). It is

`src/mastra/` — wraps pipelines into Mastra workflows. Runs as a separate Docker service on :4111 with `mastra dev`, which provides a Studio UI for inspecting and testing workflows.

- - `src/mastra/index.ts` — registers workflows with the `Mastra` instance
+ - `src/mastra/index.ts` — registers agents and workflows with the `Mastra` instance
- `src/mastra/workflows/infer-schema.ts` — `inferSchemaWorkflow`, a single-step workflow wrapping `inferSchema()`
- `src/mastra/workflows/populate.ts` — `populateWorkflow`, 3-step workflow: clear rows → build prompt → run populate agent
- `src/mastra/agents/populate.ts` — `populateAgent`, an AI agent (Claude Sonnet 4.6 via OpenRouter) with 7 tools for database CRUD and web access
- `src/mastra/tools/dataset-tools.ts` — 5 Convex-backed tools: `insert_row`, `list_rows`, `get_row`, `update_row`, `delete_row`
- `src/mastra/tools/web-tools.ts` — 2 TinyFish API tools: `search_web`, `fetch_page`

The populate agent uses `createStep(agent, { maxSteps: 80 })` to allow enough tool-call rounds for web research + row insertion.

All tools return structured error messages (not thrown exceptions) so the agent can self-correct.
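The "structured error, not thrown exception" convention can be sketched independently of Mastra. The helper below is hypothetical (`runAsToolResult` and `ToolResult` are not names from this PR); it only shows the general pattern of catching a failure and returning it as data the agent can read.

```typescript
// Result shape mirroring the `{ success, error? }` objects the tools return.
type ToolResult = { success: true } | { success: false; error: string };

// Runs `action` and converts any thrown error into a structured result,
// so the caller (here, an LLM agent) sees a message instead of a crash.
async function runAsToolResult(
  label: string,
  action: () => Promise<void>,
): Promise<ToolResult> {
  try {
    await action();
    return { success: true };
  } catch (err) {
    const msg = err instanceof Error ? err.message : String(err);
    return { success: false, error: `${label} failed: ${msg}` };
  }
}

// Example usage (inside an async function):
//   const result = await runAsToolResult("Insert", async () => {
//     throw new Error("Dataset not found");
//   });
//   // result is { success: false, error: "Insert failed: Dataset not found" }
```

The trade-off is that failures no longer propagate on their own, so anything the agent cannot fix itself still needs to be logged or surfaced by the workflow.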

Mastra uses `HOST` and `PORT` env vars for binding. In Docker, `HOST=0.0.0.0` is required.

## Convex

- Writes to Convex via `ConvexHttpClient` in `src/convex.ts`. Import `{ convex, api }` from `./convex.js` to call Convex mutations and queries. The `api` types are re-exported from the frontend's generated Convex code.
-
- The `tsconfig.json` includes `../frontend/convex` so TypeScript can resolve the generated types.
+ Writes to Convex via `ConvexHttpClient` in `src/convex.ts`. Import `{ convex, api, internal }` from `./convex.js` to call Convex mutations and queries. Uses `anyApi` from `convex/server` as an untyped proxy — this avoids cross-project imports from the frontend's generated code, which don't work in Docker containers. Admin key is set via `setAdminAuth()` for internal mutations.
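To make "untyped proxy" concrete, here is a toy illustration of how property access can be turned into a function reference at runtime. It is not Convex's implementation of `anyApi`; `makePathProxy`, `pathOf`, and the dotted-path format are invented for this sketch.

```typescript
// Every property access yields another proxy, so any path type-checks.
type PathProxy = { [key: string]: PathProxy };

// Private key used to read back the recorded path.
const PATH = Symbol("path");

function makePathProxy(segments: string[] = []): PathProxy {
  return new Proxy({} as PathProxy, {
    get(_target, prop) {
      if (prop === PATH) return segments;
      // Ignore other symbol lookups (e.g. from logging or inspection).
      if (typeof prop === "symbol") return undefined;
      // Record the accessed name and keep going.
      return makePathProxy([...segments, prop]);
    },
  });
}

// Reads the path a proxy has accumulated, joined with dots.
function pathOf(ref: PathProxy): string {
  return (ref as unknown as Record<symbol, string[]>)[PATH].join(".");
}

const toyApi = makePathProxy();
const insertRef = pathOf(toyApi.datasetRows.insert); // "datasetRows.insert"
```

The cost of this style is that a misspelled function name is only caught when the call reaches the server, not at compile time.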

## Environment

Required env vars (see `.env.example`):
- `CONVEX_URL` — Convex instance URL
- `CONVEX_SELF_HOSTED_ADMIN_KEY` — for system-level Convex writes (internal mutations)
- `CLERK_SECRET_KEY`, `CLERK_PUBLISHABLE_KEY` — for JWT verification
- `OPENROUTER_API_KEY` — for AI model calls
- `TINYFISH_API_KEY` — for web search and fetch (populate agent). Get one at https://agent.tinyfish.ai/api-keys

In Docker, these are interpolated from the root `.env` file via `docker-compose.dev.yml`.
10 changes: 6 additions & 4 deletions backend/src/convex.ts
@@ -1,4 +1,5 @@
import { ConvexHttpClient } from "convex/browser";
import { anyApi } from "convex/server";

import { env } from "./env.js";

@@ -16,11 +17,12 @@ import { env } from "./env.js";
* ✗ NEVER use this to act "on behalf of a user". For user-initiated work,
* the frontend should call Convex directly with the user's Clerk JWT.
*
- * If admin key is missing, this client can still call PUBLIC functions but
- * will fail closed on internal ones (which is the desired behavior — better
- * to error than to silently degrade).
+ * `anyApi` is an untyped proxy that resolves function references at runtime.
+ * Full types come from the frontend's generated code (included via tsconfig)
+ * and are available in the IDE, but the Docker container doesn't need them.
*/
- export { api, internal } from "../../frontend/convex/_generated/api.js";
+ export const api = anyApi;
+ export const internal = anyApi;

export const convex = new ConvexHttpClient(env.CONVEX_URL);

41 changes: 41 additions & 0 deletions backend/src/index.ts
@@ -4,6 +4,9 @@ import fastifyCors from "@fastify/cors";
import { env } from "./env.js";
import clerkAuthPlugin, { requireAuth } from "./clerk-auth.js";
import { inferSchema } from "./pipeline/schema-inference.js";
import { datasetContextSchema } from "./pipeline/populate.js";
import { populateWorkflow } from "./mastra/workflows/populate.js";
import { convex, api } from "./convex.js";

const fastify = Fastify({ logger: true });

@@ -47,6 +50,44 @@ await fastify.register(async (instance) => {
return reply.code(502).send({ error: "Schema inference failed. Please try again." });
}
});

instance.post("/populate", async (req, reply) => {
const parsed = datasetContextSchema.safeParse(req.body);
if (!parsed.success) {
return reply.code(400).send({
error: "Invalid request",
details: parsed.error.flatten().fieldErrors,
});
}

try {
const dataset = await convex.query(api.datasets.get, { id: parsed.data.datasetId });
if (!dataset) {
return reply.code(404).send({ error: "Dataset not found" });
}
if (dataset.ownerId !== req.auth.userId) {
return reply.code(403).send({ error: "Not authorized to populate this dataset" });
}

const run = await populateWorkflow.createRun();
const result = await run.start({ inputData: parsed.data });

req.log.info({ workflowStatus: result.status, steps: JSON.stringify(result.steps).slice(0, 2000) }, "Populate workflow completed");

if (result.status !== "success") {
throw new Error(`Workflow ended with status: ${result.status}`);
}

return { success: true, result: result.result };
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
if (msg.includes("validator") || msg.includes("Invalid")) {
return reply.code(400).send({ error: "Invalid datasetId" });
}
Comment on lines +83 to +86


⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Narrow invalid-dataset detection to dataset lookup failures only.

Line 84 matches any error containing "Invalid", so downstream workflow/tool failures can be mislabeled as 400 Invalid datasetId. That can hide real server-side failures and return the wrong status code.

Suggested fix
-    try {
-      const dataset = await convex.query(api.datasets.get, { id: parsed.data.datasetId });
-      if (!dataset) {
-        return reply.code(404).send({ error: "Dataset not found" });
-      }
-      if (dataset.ownerId !== req.auth.userId) {
-        return reply.code(403).send({ error: "Not authorized to populate this dataset" });
-      }
-
-      const run = await populateWorkflow.createRun();
-      const result = await run.start({ inputData: parsed.data });
+    let dataset;
+    try {
+      dataset = await convex.query(api.datasets.get, { id: parsed.data.datasetId });
+    } catch (err) {
+      const msg = err instanceof Error ? err.message : String(err);
+      if (msg.includes("validator") || msg.includes("Invalid")) {
+        return reply.code(400).send({ error: "Invalid datasetId" });
+      }
+      req.log.error(err, "Dataset lookup failed");
+      return reply.code(502).send({ error: "Failed to populate dataset. Please try again." });
+    }
+
+    if (!dataset) {
+      return reply.code(404).send({ error: "Dataset not found" });
+    }
+    if (dataset.ownerId !== req.auth.userId) {
+      return reply.code(403).send({ error: "Not authorized to populate this dataset" });
+    }
+
+    try {
+      const run = await populateWorkflow.createRun();
+      const result = await run.start({ inputData: parsed.data });
@@
-    } catch (err) {
-      const msg = err instanceof Error ? err.message : String(err);
-      if (msg.includes("validator") || msg.includes("Invalid")) {
-        return reply.code(400).send({ error: "Invalid datasetId" });
-      }
+    } catch (err) {
       req.log.error(err, "Populate failed");
       return reply.code(502).send({ error: "Failed to populate dataset. Please try again." });
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/src/index.ts` around lines 83 - 86, Only treat the error as a
client-side "Invalid datasetId" when it actually comes from the dataset lookup:
change the condition that currently checks msg.includes("Invalid") to a stricter
check such as msg === "Invalid datasetId" or msg.includes("Invalid datasetId")
and/or verify the error originates from the dataset lookup function (e.g.,
getDatasetById) or has a specific error.name/marker; leave other errors to fall
through so they return the appropriate server error status instead of
reply.code(400).send({ error: "Invalid datasetId" }).
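One way to picture the suggestion is a small mapping function that takes the failing stage into account, so only lookup failures can become a 400. This is a sketch of the idea rather than the code that was merged; `Stage` and `statusForError` are hypothetical names, while the status codes and messages are taken from the handler above.

```typescript
// Where in the handler the error was thrown.
type Stage = "lookup" | "workflow";

interface ErrorReply {
  code: number;
  error: string;
}

// Maps an error to an HTTP reply. Substring matching is only trusted for
// the dataset lookup; workflow and tool failures always map to 502.
function statusForError(stage: Stage, err: unknown): ErrorReply {
  const msg = err instanceof Error ? err.message : String(err);
  if (stage === "lookup" && (msg.includes("validator") || msg.includes("Invalid"))) {
    return { code: 400, error: "Invalid datasetId" };
  }
  return { code: 502, error: "Failed to populate dataset. Please try again." };
}

// A tool error that happens to contain "Invalid" is no longer reported as a bad ID.
const workflowReply = statusForError("workflow", new Error("Invalid tool input"));
const lookupReply = statusForError("lookup", new Error("ArgumentValidationError: validator failed"));
```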

req.log.error(err, "Populate failed");
return reply.code(502).send({ error: "Failed to populate dataset. Please try again." });
}
});
Comment thread
coderabbitai[bot] marked this conversation as resolved.
});

try {
36 changes: 36 additions & 0 deletions backend/src/mastra/agents/populate.ts
@@ -0,0 +1,36 @@
import { Agent } from "@mastra/core/agent";
import { createOpenRouter } from "@openrouter/ai-sdk-provider";
import {
insertRowTool,
listRowsTool,
getRowTool,
updateRowTool,
deleteRowTool,
} from "../tools/dataset-tools.js";
import { searchWebTool, fetchPageTool } from "../tools/web-tools.js";

const openrouter = createOpenRouter({
apiKey: process.env.OPENROUTER_API_KEY!,
});

export const populateAgent = new Agent({
id: "populate-agent",
name: "Dataset Populate Agent",
instructions: `You fill datasets with real data. Here's how:

1. Search the web for data that fits the dataset topic.
2. Fetch 1-2 pages to get details.
3. Call insert_row for each row using what you found. Don't stop until you've inserted all the rows asked for.

If you can't find enough real data, make up realistic data to fill the rest. Every row must be inserted with insert_row.`,
model: openrouter("anthropic/claude-sonnet-4-6"),
tools: {
insert_row: insertRowTool,
list_rows: listRowsTool,
get_row: getRowTool,
update_row: updateRowTool,
delete_row: deleteRowTool,
search_web: searchWebTool,
fetch_page: fetchPageTool,
},
});
5 changes: 4 additions & 1 deletion backend/src/mastra/index.ts
@@ -1,6 +1,9 @@
import { Mastra } from "@mastra/core/mastra";
import { inferSchemaWorkflow } from "./workflows/infer-schema.js";
import { populateWorkflow } from "./workflows/populate.js";
import { populateAgent } from "./agents/populate.js";

export const mastra = new Mastra({
- workflows: { inferSchemaWorkflow },
+ agents: { populateAgent },
+ workflows: { inferSchemaWorkflow, populateWorkflow },
});
161 changes: 161 additions & 0 deletions backend/src/mastra/tools/dataset-tools.ts
@@ -0,0 +1,161 @@
import { createTool } from "@mastra/core/tools";
import { z } from "zod";
import { convex, api, internal } from "../../convex.js";

const resultSchema = z.object({
success: z.boolean(),
error: z.string().optional(),
});

function cleanDataKeys(data: Record<string, unknown>): Record<string, unknown> {
const cleaned: Record<string, unknown> = {};
for (const [key, value] of Object.entries(data)) {
cleaned[key.replace(/^["`]+|["`]+$/g, "")] = value;
}
return cleaned;
}

export const insertRowTool = createTool({
id: "insert_row",
description:
"Insert a single row into the dataset. Call this each time you have a row ready — don't wait to batch them.",
inputSchema: z.object({
datasetId: z.string(),
data: z.record(z.string(), z.any()),
}),
outputSchema: resultSchema,
execute: async ({ datasetId, data }) => {
if (!datasetId) return { success: false, error: "datasetId is required." };
if (!data || Object.keys(data).length === 0)
return { success: false, error: "data is required and must have at least one key. Pass an object like { \"Column Name\": value }." };

const cleanedData = cleanDataKeys(data);
console.log(`[insert_row] Inserting row into ${datasetId} (${Object.keys(cleanedData).length} columns)`);
try {
await convex.mutation(internal.datasetRows.insert, { datasetId, data: cleanedData });
console.log(`[insert_row] Row inserted successfully`);
return { success: true };
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
console.error(`[insert_row] Failed:`, msg);
if (msg.includes("not found"))
return { success: false, error: `Dataset "${datasetId}" not found. Check the datasetId is correct.` };
if (msg.includes("validator"))
return { success: false, error: `Data validation failed: ${msg}. Check that your data keys are plain strings and values match expected types.` };
return { success: false, error: `Insert failed: ${msg}` };
}
},
});

export const listRowsTool = createTool({
id: "list_rows",
description:
"Read all rows in the dataset. Returns an array of row objects, each with _id and data fields.",
inputSchema: z.object({
datasetId: z.string(),
}),
outputSchema: z.object({ rows: z.array(z.any()).optional(), error: z.string().optional() }),
execute: async ({ datasetId }) => {
if (!datasetId) return { error: "datasetId is required." };

console.log(`[list_rows] Reading all rows for dataset ${datasetId}`);
try {
const rows = await convex.query(api.datasetRows.listByDataset, { datasetId });
console.log(`[list_rows] Found ${rows.length} rows`);
return { rows };
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
console.error(`[list_rows] Failed:`, msg);
if (msg.includes("not found"))
return { error: `Dataset "${datasetId}" not found. Check the datasetId.` };
return { error: `List rows failed: ${msg}` };
}
},
});

export const getRowTool = createTool({
id: "get_row",
description:
"Read a single row by its ID. Returns the row object with _id and data fields, or an error if not found.",
inputSchema: z.object({
rowId: z.string(),
}),
outputSchema: z.object({ row: z.any().optional(), error: z.string().optional() }),
execute: async ({ rowId }) => {
if (!rowId) return { error: "rowId is required." };

console.log(`[get_row] Reading row ${rowId}`);
try {
const row = await convex.query(internal.datasetRows.get, { id: rowId });
if (!row) return { error: `Row "${rowId}" not found. It may have been deleted.` };
console.log(`[get_row] Found`);
return { row };
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
console.error(`[get_row] Failed:`, msg);
if (msg.includes("validator") || msg.includes("Invalid"))
return { error: `Invalid row ID format: "${rowId}". Row IDs look like "jd7..." — they are Convex document IDs.` };
return { error: `Get row failed: ${msg}` };
}
},
});

export const updateRowTool = createTool({
id: "update_row",
description:
"Update an existing row by its ID. Pass the full updated data object. Changes are tracked in history.",
inputSchema: z.object({
rowId: z.string(),
data: z.record(z.string(), z.any()),
}),
outputSchema: resultSchema,
execute: async ({ rowId, data }) => {
if (!rowId) return { success: false, error: "rowId is required." };
if (!data || Object.keys(data).length === 0)
return { success: false, error: "data is required. Pass the full updated row data object." };

const cleanedData = cleanDataKeys(data);
console.log(`[update_row] Updating row ${rowId} (${Object.keys(cleanedData).length} columns)`);
try {
await convex.mutation(internal.datasetRows.update, { id: rowId, data: cleanedData });
console.log(`[update_row] Row updated successfully`);
return { success: true };
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
console.error(`[update_row] Failed:`, msg);
if (msg.includes("Row not found") || msg.includes("not found"))
return { success: false, error: `Row "${rowId}" not found. Use list_rows to see existing row IDs.` };
if (msg.includes("validator") || msg.includes("Invalid"))
return { success: false, error: `Invalid input: ${msg}. Check that rowId is a valid Convex ID and data keys are plain strings.` };
return { success: false, error: `Update failed: ${msg}` };
}
},
});

export const deleteRowTool = createTool({
id: "delete_row",
description:
"Delete a single row by its ID. This is permanent.",
inputSchema: z.object({
rowId: z.string(),
}),
outputSchema: resultSchema,
execute: async ({ rowId }) => {
if (!rowId) return { success: false, error: "rowId is required." };

console.log(`[delete_row] Deleting row ${rowId}`);
try {
await convex.mutation(internal.datasetRows.remove, { id: rowId });
console.log(`[delete_row] Row deleted successfully`);
return { success: true };
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
console.error(`[delete_row] Failed:`, msg);
if (msg.includes("not found"))
return { success: false, error: `Row "${rowId}" not found. It may have already been deleted.` };
if (msg.includes("validator") || msg.includes("Invalid"))
return { success: false, error: `Invalid row ID format: "${rowId}". Use list_rows to find valid row IDs.` };
return { success: false, error: `Delete failed: ${msg}` };
}
},
});
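As a quick check of what `cleanDataKeys` in this file does, the snippet below copies the helper and runs it on a few sample keys. The sample data is made up; the behaviour shown follows directly from the regular expression, which only strips double quotes and backticks at the start and end of a key.

```typescript
// Copied from dataset-tools.ts above so the example is self-contained.
function cleanDataKeys(data: Record<string, unknown>): Record<string, unknown> {
  const cleaned: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(data)) {
    cleaned[key.replace(/^["`]+|["`]+$/g, "")] = value;
  }
  return cleaned;
}

const cleanedExample = cleanDataKeys({
  '"Name"': "Ada", // wrapped in double quotes
  "`City`": "London", // wrapped in backticks
  'Said "hi" once': true, // quotes in the middle are left alone
});
// Keys are now: Name, City, Said "hi" once

// Two keys that clean to the same name collapse; the later entry wins.
const collided = cleanDataKeys({ '"Name"': 1, Name: 2 });
// collided is { Name: 2 }
```

The collision case is worth knowing about if the model ever emits both a quoted and an unquoted variant of the same column in one call.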