Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 15 additions & 44 deletions packages/appkit/src/database/introspector/diff.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,23 @@
import type { IntrospectedTable, IntrospectionResult } from "./types";

/** Severity of a drift entry. */
export type DriftSeverity = "info" | "warn" | "error";

/** A single drift entry. */
export interface DriftEntry {
/** The severity of the drift entry. */
severity: DriftSeverity;
/** The kind of drift entry. */
kind: "live-only" | "schema-only" | "type-mismatch";
/** The message of the drift entry. */
message: string;
}

/** A report of drift entries. */
export interface DriftReport {
/** Whether there is any drift. */
hasDrift: boolean;
/** The entries of the drift report. */
entries: DriftEntry[];
}

/** Diff two introspections and return a report of drift entries. */
/**
* TODO(rls): policies are not compared — `schemaToIntrospection` always
* returns `policies: []`, so any DB-side policy would show as `live-only`.
* Re-enable once the schema-builder declares policies.
*/
export function diffIntrospections(
live: IntrospectionResult,
declared: IntrospectionResult,
Expand Down Expand Up @@ -56,7 +52,6 @@ export function diffIntrospections(
return { hasDrift: entries.length > 0, entries };
}

/** Diff two tables and return a report of drift entries. */
function diffColumns(
key: string,
live: IntrospectedTable,
Expand Down Expand Up @@ -98,24 +93,15 @@ function diffColumns(
}
}

/** Get the key of a table. */
function tableKey(table: Pick<IntrospectedTable, "schema" | "name">): string {
return `${table.schema}.${table.name}`;
}

/**
* Compares the column contract beyond the raw Postgres type.
*
* Runtime writes and migrations depend on nullability, defaults, keys,
* generated columns, and FK actions, so drift detection must compare the
* metadata captured by introspection instead of stopping at `pgType`.
*
* Server-generated columns get special treatment: when both sides agree the
* column is server-generated, we skip `hasDefault` and `defaultExpression`
* comparisons because the live DB stores the literal `nextval(...)` /
* `GENERATED AS IDENTITY` expression while the schema models the same fact
* as `serverGenerated: true` metadata. Comparing them would produce noise on
* every introspect → verify roundtrip for serial / bigserial / identity PKs.
* Compare column metadata beyond `pgType` (nullable, default, PK, FK).
* Skip default/hasDefault when both sides are server-generated — the live DB
* stores `nextval(...)` / `GENERATED AS IDENTITY` while the schema flags it
* as `serverGenerated: true`; direct compare would noise-flag every serial PK.
*/
function diffColumnMetadata(
table: string,
Expand Down Expand Up @@ -184,7 +170,6 @@ function diffColumnMetadata(
}
}

/** Compare a field of a column and return a report of drift entries. */
function compareField(
table: string,
column: string,
Expand All @@ -203,10 +188,7 @@ function compareField(
});
}

/**
* Normalizes FK metadata into one comparable value so missing references and
* action changes produce a single readable drift entry.
*/
/** Flatten FK metadata to one comparable string for a single drift entry. */
function normalizeReference(
reference: IntrospectedTable["columns"][number]["references"],
): string {
Expand All @@ -223,17 +205,10 @@ function formatValue(value: unknown): string {
}

/**
* Strip the trivial `'literal'::type` cast Postgres emits around quoted
* string defaults so that `'member'::text` (live) compares equal to `member`
* (declared). Also unescapes `''` -> `'` inside the literal.
*
* Deliberately conservative:
* - Matches a SINGLE quoted literal followed by a single `::type` cast.
* - Does NOT touch expressions that contain `||`, function calls, or
* additional casts — those are kept verbatim and compared as-is so we
* don't claim equality between two non-trivially-different expressions
* and silently miss real drift. Example: `'foo'::text || 'bar'::text`
* and `'foobar'` stay distinct.
* Strip Postgres's `'literal'::type` cast so `'member'::text` (live) compares
* equal to `member` (declared); unescape `''` → `'`. Conservative: only one
* quoted literal + one cast; expressions with `||`, function calls, or extra
* casts pass through verbatim — better a false positive than missed drift.
*/
function normalizeDefaultExpression(
value: string | undefined,
Expand All @@ -245,10 +220,6 @@ function normalizeDefaultExpression(
return trimmed;
}

/**
* Matches `'literal'::type` where the literal is a single quoted string with
* `''` escaping and the type is a simple identifier (no parens, no `||`,
* no further casts).
*/
/** `'literal'::type` — single quoted string + simple type identifier only. */
const SIMPLE_CAST_LITERAL =
/^'((?:[^']|'')*)'::[a-zA-Z_][\w]*(?:\s*\(\s*\d+\s*\))?$/;
10 changes: 7 additions & 3 deletions packages/appkit/src/plugins/database/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@ export const STATEMENT_TIMEOUT_DEFAULT_MS = 15_000;
/** `application_name` per connection — surfaces in `pg_stat_activity`/Lakebase audit. */
export const APPLICATION_NAME = "appkit:database";

/** GUC name AppKit `SET`s on every OBO connection for RLS policies to read. */
export const DEFAULT_RLS_SESSION_VARIABLE = "app.user_id";

/**
* OBO pool defaults — small (one pool per user). Fan-out = `(1 + oboPoolMax) × max`;
* defaults cap at `(1+25)×4 + 10 ≈ 114` conns per instance.
* OBO pool defaults. `max=2` because a single user typically serializes HTTP
* requests; 2 conns covers occasional overlap without bloating fan-out.
* Combined with `oboPoolMax=100`, fan-out is `(1+100)×2 + 10 ≈ 212` conns.
*/
export const OBO_POOL_DEFAULTS = {
...POOL_DEFAULTS,
max: 4,
max: 2,
};

/** Default page size when no `?limit=` is given. */
Expand Down
28 changes: 17 additions & 11 deletions packages/appkit/src/plugins/database/entity-wiring.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { AuthenticationError, ConfigurationError } from "@/errors";
import { createLogger } from "@/logging/logger";
import {
APPLICATION_NAME,
DEFAULT_RLS_SESSION_VARIABLE,
OBO_POOL_DEFAULTS,
STATEMENT_TIMEOUT_DEFAULT_MS,
} from "./defaults";
Expand Down Expand Up @@ -160,14 +161,14 @@ function makeUserPoolRegistry(
user: identity.email,
workspaceClient: createUserWorkspaceClient(identity.token),
});
// Session-local `app.user_id` so `current_user_id()` RLS helpers resolve
// to the OBO user — safe at session scope since identity is invariant in
// this per-user pool. `statement_timeout` set here too so OBO queries get
// the same server-side cap as SP ones.
// Session-local GUC so RLS helpers resolve to the OBO user — safe at
// session scope since identity is invariant in this per-user pool.
// `statement_timeout` set here too so OBO matches SP server-side cap.
const statementTimeoutMs =
config.statementTimeoutMs ?? STATEMENT_TIMEOUT_DEFAULT_MS;
const sessionVariable =
config.rls?.sessionVariable ?? DEFAULT_RLS_SESSION_VARIABLE;
pool.on("connect", (client) => {
// Tag OBO conns in pg_stat_activity so operators can split SP vs OBO traffic.
client
.query(`SET application_name = '${APPLICATION_NAME}:obo'`)
.catch((err) => {
Expand All @@ -178,10 +179,14 @@ function makeUserPoolRegistry(
);
});
client
.query("SELECT set_config('app.user_id', $1, false)", [identity.email])
.query("SELECT set_config($1, $2, false)", [
sessionVariable,
identity.email,
])
.catch((err) => {
logger.error(
"Failed to set app.user_id on user pool connection for %s: %O",
"Failed to set %s on user pool connection for %s: %O",
sessionVariable,
tag,
err,
);
Expand Down Expand Up @@ -249,7 +254,7 @@ function resolveUserPoolIdentity(
if (email && token) return { email, token };

if (isDev) {
logger.warn(
logger.debug(
"Database OBO requested without x-forwarded-email/x-forwarded-access-token; falling back to service pool in development.",
);
return null;
Expand All @@ -275,9 +280,10 @@ function createUserWorkspaceClient(token: string): WorkspaceClient {
}

function normalizePoolMax(value: number | undefined): number {
// Default 25 keeps fan-out tractable on Lakebase tiers ((1+25)×4 + SP(10)
// ≈ 114 conns). Hot-OBO apps should raise explicitly after sizing the tier.
if (!Number.isFinite(value) || value === undefined) return 25;
// Default 100 active users per instance before LRU evicts; with
// OBO_POOL_DEFAULTS.max=2, fan-out is (1+100)×2 + SP(10) ≈ 212 conns.
// Sized for 1+ CU Lakebase tiers; tune up for hot OBO, down for 0.5 CU.
if (!Number.isFinite(value) || value === undefined) return 100;
return Math.max(1, Math.floor(value));
}

Expand Down
16 changes: 14 additions & 2 deletions packages/appkit/src/plugins/database/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,20 @@
},
"oboPoolMax": {
"type": "number",
"default": 25,
"description": "Maximum number of per-user OBO pools to keep open. Worst-case fan-out is (1 + oboPoolMax) × OBO_POOL_DEFAULTS.max + POOL_DEFAULTS.max connections per app instance."
"default": 100,
"description": "Maximum number of per-user OBO pools to keep open. Worst-case fan-out is (1 + oboPoolMax) × OBO_POOL_DEFAULTS.max + POOL_DEFAULTS.max connections per app instance (default 212)."
},
"rls": {
"type": "object",
"additionalProperties": false,
"description": "Row-level security tunables.",
"properties": {
"sessionVariable": {
"type": "string",
"default": "app.user_id",
"description": "GUC name AppKit SETs on every OBO connection. Override to align with existing policies that read another setting."
}
}
},
"cache": {
"type": "object",
Expand Down
63 changes: 61 additions & 2 deletions packages/appkit/src/plugins/database/tests/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -496,8 +496,67 @@ describe("DatabasePlugin", () => {
const client = { query: vi.fn(async () => ({})) };
handler(client);
expect(client.query).toHaveBeenCalledWith(
"SELECT set_config('app.user_id', $1, false)",
["alice@example.com"],
"SELECT set_config($1, $2, false)",
["app.user_id", "alice@example.com"],
);

if (originalHost === undefined) {
delete process.env.DATABRICKS_HOST;
} else {
process.env.DATABRICKS_HOST = originalHost;
}
});

test("rls.sessionVariable override flows into the SET on user pool connect", async () => {
const originalHost = process.env.DATABRICKS_HOST;
process.env.DATABRICKS_HOST = "https://example.cloud.databricks.com";
const servicePool = {
end: vi.fn(async () => undefined),
on: vi.fn(),
} as unknown as Pool;
const userPool = {
end: vi.fn(async () => undefined),
on: vi.fn(),
} as unknown as Pool;
vi.mocked(createLakebasePool)
.mockReturnValueOnce(servicePool)
.mockReturnValueOnce(userPool);
const schema = defineSchema(({ table }) => ({
user: table("user", { id: id(), email: text().notNull() }),
}));
vi.mocked(loadSchemaByConvention).mockResolvedValue({
schema,
schemaPath: "/app/config/database/schema.ts",
});

const plugin = createPlugin({
rls: { sessionVariable: "myapp.uid" },
});
await plugin.setup();
const exports = plugin.exports() as unknown as {
user: { asUser: (req: import("express").Request) => unknown };
};
const req = {
header: vi.fn((name: string) => {
if (name === "x-forwarded-email") return "alice@example.com";
if (name === "x-forwarded-access-token") return "tok-alice";
return undefined;
}),
} as unknown as import("express").Request;
exports.user.asUser(req);

const handler = vi
.mocked(userPool.on)
.mock.calls.find(
([event]) => event === "connect",
)?.[1] as unknown as (client: {
query: ReturnType<typeof vi.fn>;
}) => void;
const client = { query: vi.fn(async () => ({})) };
handler(client);
expect(client.query).toHaveBeenCalledWith(
"SELECT set_config($1, $2, false)",
["myapp.uid", "alice@example.com"],
);

if (originalHost === undefined) {
Expand Down
14 changes: 11 additions & 3 deletions packages/appkit/src/plugins/database/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ export interface IDatabaseConfig extends BasePluginConfig {
cache?: CacheSettings;
/**
* Maximum number of distinct per-user (OBO) pools the registry keeps alive
* at once. Each pool defaults to `OBO_POOL_DEFAULTS.max = 4` connections, so
* the worst-case fan-out is `(1 + oboPoolMax) × poolMax`. Defaults to 25
* tune up for hot OBO traffic, down for low-tier Lakebase plans.
* at once. Each pool defaults to `OBO_POOL_DEFAULTS.max = 2` connections, so
* worst-case fan-out is `(1 + oboPoolMax) × poolMax + 10`. Defaults to 100
* tune up for hot OBO traffic, down for 0.5 CU Lakebase tiers.
*/
oboPoolMax?: number;
/**
Expand All @@ -166,6 +166,14 @@ export interface IDatabaseConfig extends BasePluginConfig {
* timeout interceptor still applies on the client side.
*/
statementTimeoutMs?: number;
/** Row-level security tunables. */
rls?: {
/**
* GUC name AppKit `SET`s on every OBO connection. Override to align with
* existing policies that read another setting. Defaults to `app.user_id`.
*/
sessionVariable?: string;
};
/**
* When true, schema-load and drift-check failures during `setup()` are
* logged but do not throw. Defaults to false (fail closed). Useful in
Expand Down
1 change: 1 addition & 0 deletions packages/shared/src/cli/commands/db/__tests__/db.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ describe("dbCommand", () => {
"introspect",
"migration",
"migrate",
"rls",
"seed",
"setup:dev",
"types",
Expand Down
Loading