diff --git a/packages/drivers/src/duckdb.ts b/packages/drivers/src/duckdb.ts index 3ccca467a..867840d0a 100644 --- a/packages/drivers/src/duckdb.ts +++ b/packages/drivers/src/duckdb.ts @@ -56,33 +56,57 @@ export async function connect(config: ConnectionConfig): Promise { new Promise((resolve, reject) => { let resolved = false let timeout: ReturnType | undefined + let instance: any + // Sentinel for an open callback that fired synchronously (before + // `instance` was assigned): `undefined` = not yet fired, `null` = + // fired with success, `Error` = fired with failure. Replayed once + // `instance` exists. + let pendingOpen: Error | null | undefined const opts = accessMode ? { access_mode: accessMode } : undefined - const instance = new duckdb.Database( - dbPath, - opts, - (err: Error | null) => { - if (resolved) { if (instance && typeof instance.close === "function") instance.close(); return } - resolved = true - if (timeout) clearTimeout(timeout) - if (err) { - const msg = err.message || String(err) - if (msg.toLowerCase().includes("locked") || msg.includes("SQLITE_BUSY") || msg.includes("DUCKDB_LOCKED")) { - reject(new Error("DUCKDB_LOCKED")) - } else { - reject(err) - } + const closeQuietly = () => { + try { + if (instance && typeof instance.close === "function") instance.close() + } catch { + // best-effort cleanup of a half-open handle + } + } + const onOpen = (err: Error | null) => { + if (!instance) { + pendingOpen = err + return + } + if (resolved) { + closeQuietly() + return + } + resolved = true + if (timeout) clearTimeout(timeout) + if (err) { + // Open failed — release the half-open handle so it doesn't leak. + closeQuietly() + const msg = err.message || String(err) + if (msg.toLowerCase().includes("locked") || msg.includes("SQLITE_BUSY") || msg.includes("DUCKDB_LOCKED")) { + reject(new Error("DUCKDB_LOCKED")) } else { - resolve(instance) + reject(err) } - }, - ) - // Bun: native callback may not fire; fall back after 2s + } else { + resolve(instance) + } + } + instance = opts + ? new duckdb.Database(dbPath, opts, onOpen) + : new duckdb.Database(dbPath, onOpen) + // Bun: native callback may not fire; fall back after 2s. Arm the timer + // BEFORE replaying a synchronous callback so a sync resolve/reject can + // actually clear it (otherwise it lingers ~2s and delays process exit). timeout = setTimeout(() => { if (!resolved) { resolved = true reject(new Error(`Timed out opening DuckDB database "${dbPath}"`)) } }, 2000) + if (pendingOpen !== undefined) onOpen(pendingOpen) }) try { diff --git a/packages/drivers/test/driver-security.test.ts b/packages/drivers/test/driver-security.test.ts index 1fd4a9435..6a0f8d8c5 100644 --- a/packages/drivers/test/driver-security.test.ts +++ b/packages/drivers/test/driver-security.test.ts @@ -12,6 +12,10 @@ import { describe, test, expect, mock, beforeEach, afterEach } from "bun:test" // DuckDB: wrapDuckDBError + lock retry // --------------------------------------------------------------------------- describe("DuckDB driver", () => { + function openCallback(optsOrCb: any, cb?: (err: Error | null) => void): (err: Error | null) => void { + return typeof optsOrCb === "function" ? optsOrCb : cb! + } + // Test wrapDuckDBError logic inline (it's a closure, so we test via connect behavior) describe("wrapDuckDBError", () => { test("wraps SQLITE_BUSY errors with user-friendly message", async () => { @@ -28,8 +32,8 @@ describe("DuckDB driver", () => { mock.module("duckdb", () => ({ default: { Database: class { - constructor(_path: string, _opts: any, cb: (err: Error | null) => void) { - setTimeout(() => cb(null), 0) + constructor(_path: string, optsOrCb: any, cb?: (err: Error | null) => void) { + setTimeout(() => openCallback(optsOrCb, cb)(null), 0) } connect() { return mockDb.connect() @@ -62,8 +66,8 @@ describe("DuckDB driver", () => { mock.module("duckdb", () => ({ default: { Database: class { - constructor(_path: string, _opts: any, cb: (err: Error | null) => void) { - setTimeout(() => cb(null), 0) + constructor(_path: string, optsOrCb: any, cb?: (err: Error | null) => void) { + setTimeout(() => openCallback(optsOrCb, cb)(null), 0) } connect() { return { @@ -97,8 +101,8 @@ describe("DuckDB driver", () => { mock.module("duckdb", () => ({ default: { Database: class { - constructor(_path: string, _opts: any, cb: (err: Error | null) => void) { - setTimeout(() => cb(null), 0) + constructor(_path: string, optsOrCb: any, cb?: (err: Error | null) => void) { + setTimeout(() => openCallback(optsOrCb, cb)(null), 0) } connect() { return { @@ -132,17 +136,21 @@ describe("DuckDB driver", () => { describe("connect retry with READ_ONLY", () => { test("retries with READ_ONLY when file DB is locked on initial connect", async () => { let connectAttempts = 0 + const accessModes: Array = [] mock.module("duckdb", () => ({ default: { Database: class { - constructor(_path: string, opts: any, cb: (err: Error | null) => void) { + constructor(_path: string, optsOrCb: any, cb?: (err: Error | null) => void) { + const opts = typeof optsOrCb === "function" ? undefined : optsOrCb + const done = openCallback(optsOrCb, cb) + accessModes.push(opts?.access_mode) connectAttempts++ if (connectAttempts === 1 && !opts?.access_mode) { // First attempt fails with lock error - setTimeout(() => cb(new Error("DUCKDB_LOCKED: file is locked")), 0) + setTimeout(() => done(new Error("DUCKDB_LOCKED: file is locked")), 0) } else { // READ_ONLY retry succeeds - setTimeout(() => cb(null), 0) + setTimeout(() => done(null), 0) } } connect() { @@ -163,6 +171,10 @@ describe("DuckDB driver", () => { const connector = await connect({ type: "duckdb", path: "/tmp/test.duckdb" }) await connector.connect() expect(connectAttempts).toBe(2) // First failed, second succeeded in READ_ONLY + // The retry must specifically request READ_ONLY — two attempts alone don't + // prove the lock was worked around correctly. + expect(accessModes[0]).toBeUndefined() + expect(accessModes[1]).toBe("READ_ONLY") await connector.close() }) @@ -171,8 +183,8 @@ describe("DuckDB driver", () => { mock.module("duckdb", () => ({ default: { Database: class { - constructor(_path: string, _opts: any, cb: (err: Error | null) => void) { - setTimeout(() => cb(new Error("DUCKDB_LOCKED")), 0) + constructor(_path: string, optsOrCb: any, cb?: (err: Error | null) => void) { + setTimeout(() => openCallback(optsOrCb, cb)(new Error("DUCKDB_LOCKED")), 0) } connect() { return {} @@ -195,6 +207,89 @@ describe("DuckDB driver", () => { expect(e.message).toBe("DUCKDB_LOCKED") } }) + + test("handles native open callback invoked synchronously", async () => { + mock.module("duckdb", () => ({ + default: { + Database: class { + constructor(_path: string, optsOrCb: any, cb?: (err: Error | null) => void) { + openCallback(optsOrCb, cb)(null) + } + connect() { + return { + all: (_sql: string, cb: (err: Error | null, rows: any[]) => void) => { + cb(null, [{ ok: 1 }]) + }, + } + } + close(cb: any) { + if (cb) cb(null) + } + }, + }, + })) + + const { connect } = await import("../src/duckdb") + const connector = await connect({ type: "duckdb", path: ":memory:" }) + await connector.connect() + expect(await connector.execute("SELECT 1")).toMatchObject({ columns: ["ok"], rows: [[1]], row_count: 1 }) + await connector.close() + }) + + test("retries read-only when native open synchronously reports a file lock", async () => { + let connectAttempts = 0 + mock.module("duckdb", () => ({ + default: { + Database: class { + constructor(_path: string, optsOrCb: any, cb?: (err: Error | null) => void) { + const opts = typeof optsOrCb === "function" ? undefined : optsOrCb + const done = openCallback(optsOrCb, cb) + connectAttempts++ + done(connectAttempts === 1 && !opts?.access_mode ? new Error("DUCKDB_LOCKED: file is locked") : null) + } + connect() { + return { + all: (_sql: string, cb: (err: Error | null, rows: any[]) => void) => { + cb(null, [{ ok: 1 }]) + }, + } + } + close(cb: any) { + if (cb) cb(null) + } + }, + }, + })) + + const { connect } = await import("../src/duckdb") + const connector = await connect({ type: "duckdb", path: "/tmp/sync-lock.duckdb" }) + await connector.connect() + expect(connectAttempts).toBe(2) + expect(await connector.execute("SELECT 1")).toMatchObject({ columns: ["ok"], rows: [[1]], row_count: 1 }) + await connector.close() + }) + + test("propagates synchronous non-lock open errors without connecting undefined db", async () => { + mock.module("duckdb", () => ({ + default: { + Database: class { + constructor(_path: string, optsOrCb: any, cb?: (err: Error | null) => void) { + openCallback(optsOrCb, cb)(new Error("catalog is corrupt")) + } + connect() { + throw new Error("should not connect") + } + close(cb: any) { + if (cb) cb(null) + } + }, + }, + })) + + const { connect } = await import("../src/duckdb") + const connector = await connect({ type: "duckdb", path: "/tmp/corrupt.duckdb" }) + await expect(connector.connect()).rejects.toThrow("catalog is corrupt") + }) }) }) diff --git a/packages/opencode/src/altimate/native/types.ts b/packages/opencode/src/altimate/native/types.ts index 7b47f3068..f3e72e562 100644 --- a/packages/opencode/src/altimate/native/types.ts +++ b/packages/opencode/src/altimate/native/types.ts @@ -879,6 +879,7 @@ export interface AltimateCoreTestgenParams { export interface AltimateCoreEquivalenceParams { sql1: string sql2: string + dialect?: string schema_path?: string schema_context?: Record } diff --git a/packages/opencode/src/altimate/review/orchestrate.ts b/packages/opencode/src/altimate/review/orchestrate.ts index a72778a64..68cefafde 100644 --- a/packages/opencode/src/altimate/review/orchestrate.ts +++ b/packages/opencode/src/altimate/review/orchestrate.ts @@ -117,7 +117,7 @@ export interface ReviewRunner { impact(model: string): Promise grade(sql: string, dialect: string): Promise check(sql: string, dialect: string, baseSql?: string): Promise - equivalence(oldSql: string, newSql: string, dialect: string): Promise + equivalence(oldSql: string, newSql: string, dialect?: string): Promise detectPii(sql: string, dialect: string): Promise<{ columns: string[] }> /** * Lexical scan (reserved-word aliases + dialect operators) — the curated lists @@ -813,8 +813,23 @@ const STRUCTURAL_CATEGORY: Record = { coalesce_removed: "semantic_change", removed_predicate: "semantic_change", type_narrowing: "contract_violation", + join_key_regression: "join_risk", } +// PII classification thresholds for the diff-scoped lane. Names/addresses are +// common, lower-sensitivity PII that the broad name-pattern detector over-flags, +// so the precise lane requires higher confidence before surfacing them. Kept a +// named constant (not a magic number) and deliberately narrow: broadening the +// low-risk set to variants would suppress MORE real columns, hurting recall. +const PII_LOW_RISK_CLASS = /^(name|address)$/i +const PII_LOW_RISK_MIN_CONFIDENCE = 0.9 +const PII_DEFAULT_MIN_CONFIDENCE = 0.7 +// Column-name tokens the regex PII twins (`pii_into_mart`) key off — used to +// recover which column a coarse regex finding flagged so suppression can be +// column-aware (suppress only when core actually classified THAT column). +const PII_TOKEN_RE = + /\b(email|ssn|social_security|phone_number|first_name|last_name|full_name|street_address|date_of_birth|dob|passport|credit_card)\b/i + async function structuralChangeLane(ctx: ModelContext, runner: ReviewRunner): Promise { if (!runner.structuralDiff) return [] // Use the ENGINE (compiled) SQL — structural_diff parses with a real SQL parser, @@ -828,7 +843,11 @@ async function structuralChangeLane(ctx: ModelContext, runner: ReviewRunner): Pr const out: Finding[] = [] for (const f of raw) { const cat: ReviewCategory = STRUCTURAL_CATEGORY[f.rule] ?? "semantic_change" - const sev = clampSeverity(cat, "warning", "high") + // Only `join_risk` (SC010 join-key regression) is allowed to block: a changed + // join key silently corrupts row sets. Other structural rules core marks + // `error` stay advisory (warning) so this lane never over-blocks on a rule + // it wasn't designed to gate. + const sev = clampSeverity(cat, cat === "join_risk" && f.severity === "error" ? "critical" : "warning", "high") out.push( makeFinding({ severity: sev, @@ -915,29 +934,65 @@ async function siblingConsistencyLane(ctxs: ModelContext[], runner: ReviewRunner * only NEW PII (a column whose name appears in the added diff), so it never nags * about pre-existing PII columns the PR didn't touch. */ -async function piiClassifyLane(ctx: ModelContext, runner: ReviewRunner, dialect: string): Promise { - if (!runner.classifyPii) return [] +async function piiClassifyLane( + ctx: ModelContext, + runner: ReviewRunner, + dialect: string, +): Promise<{ findings: Finding[]; classifiedColumns: Set; completed: boolean }> { + const empty = { findings: [], classifiedColumns: new Set(), completed: false } + if (!runner.classifyPii) return empty const { file, engineNewSql } = ctx - if (!engineNewSql || file.status === "deleted") return [] + if (!engineNewSql || file.status === "deleted") return empty const model = modelNameFromPath(file.path) - if (!runner.columnLineage) return [] + if (!runner.columnLineage) return empty // Output columns the change INTRODUCED = head targets − base targets (precise; // a pre-existing column merely mentioned in the diff is NOT counted). - const headCols = [...new Set((await runner.columnLineage(engineNewSql, dialect)).map((e) => e.target).filter(Boolean))] - if (!headCols.length) return [] - const baseCols = ctx.engineOldSql - ? new Set((await runner.columnLineage(ctx.engineOldSql, dialect)).map((e) => e.target.toLowerCase())) - : undefined - const newCols = baseCols ? headCols.filter((c) => !baseCols.has(c.toLowerCase())) : headCols - if (!newCols.length) return [] - const pii = await runner.classifyPii(newCols) + let newCols: string[] + try { + const headCols = [...new Set((await runner.columnLineage(engineNewSql, dialect)).map((e) => e.target).filter(Boolean))] + // Zero head targets means lineage could NOT resolve this model's output + // columns — the classifier considered nothing, so this is uncertainty, not a + // confident "no new PII". Returning `completed: false` keeps the deterministic + // regex fallback alive (vs. the `!newCols.length` case below, where lineage + // DID resolve columns but none are newly introduced — that is genuinely done). + if (!headCols.length) return empty + const baseCols = ctx.engineOldSql + ? new Set((await runner.columnLineage(ctx.engineOldSql, dialect)).map((e) => e.target.toLowerCase())) + : undefined + newCols = baseCols ? headCols.filter((c) => !baseCols.has(c.toLowerCase())) : headCols + } catch { + return empty + } + if (!newCols.length) return { findings: [], classifiedColumns: new Set(), completed: true } + let pii + try { + pii = await runner.classifyPii(newCols) + } catch { + return empty + } + // Columns core's classifier actually CONSIDERED (returned a verdict for), even + // if below threshold. The broad fallback and the regex twins dedup against THIS + // set — not all introduced columns — so a column core simply MISSED (never + // classified) stays eligible for the broad name-pattern detector instead of + // being silently dropped. + const classifiedColumns = new Set(pii.map((p) => String(p.column ?? "").toLowerCase()).filter(Boolean)) const out: Finding[] = [] + // "Exposure" severity only escalates for published/broadly-read layers + // (marts/reporting). This is intentionally narrower than where grain tests + // apply (`missingGrainTestLane` also includes `intermediate`): an internal + // intermediate model surfacing PII is a warning, not a hard block. + const highExposureLayer = /(^|\/)(marts|reporting)\//.test(file.path) for (const p of pii) { - if (p.confidence < 0.7 || !p.column) continue + const classification = String(p.classification ?? "") + const lowRiskClass = PII_LOW_RISK_CLASS.test(classification) + const minConfidence = lowRiskClass ? PII_LOW_RISK_MIN_CONFIDENCE : PII_DEFAULT_MIN_CONFIDENCE + if (p.confidence < minConfidence || !p.column) continue const col = p.column.toLowerCase() + const highConfidence = p.confidence >= 0.9 + const severity: Severity = highExposureLayer && highConfidence && !lowRiskClass ? "critical" : "warning" out.push( makeFinding({ - severity: "warning", + severity: clampSeverity("pii_exposure", severity, highConfidence ? "high" : "medium"), category: "pii_exposure", title: `${model}: exposes ${p.classification} column \`${p.column}\``, body: @@ -946,13 +1001,13 @@ async function piiClassifyLane(ctx: ModelContext, runner: ReviewRunner, dialect: (p.masking ? `; suggested masking: \`${p.masking}\`.` : "."), file: file.path, model, - confidence: p.confidence >= 0.9 ? "high" : "medium", + confidence: highConfidence ? "high" : "medium", evidence: { tool: "altimate_core.classify_pii", result: p }, ruleKey: `pii_exposure:${col}`, }), ) } - return out + return { findings: out, classifiedColumns, completed: true } } function piiLane(file: ChangedFile & { kind: string }, columns: string[]): Finding[] { @@ -1045,6 +1100,13 @@ export async function runReview(input: OrchestrateInput): Promise() + const diffScopedPiiFindingFiles = new Set() + const classifiedPiiColumnsByFile = new Map>() for (const ctx of ctxByPath.values()) { const tasks: Promise[] = [] // Engine lanes consume COMPILED SQL (rendered by dbt) when available. @@ -1070,11 +1132,35 @@ export async function runReview(input: OrchestrateInput): Promise { + const diffScopedPii = await piiClassifyLane(currentCtx, input.runner, dialect) + if (diffScopedPii.completed) { + diffScopedPiiCompletedFiles.add(currentCtx.file.path) + classifiedPiiColumnsByFile.set(currentCtx.file.path, diffScopedPii.classifiedColumns) + } + if (diffScopedPii.findings.length) diffScopedPiiFindingFiles.add(currentCtx.file.path) + const findings = [...diffScopedPii.findings] + if (lanes.has("pii_exposure")) { + // The broad fallback covers output PII columns the precise lane did NOT + // classify (pre-existing columns, or ones core's classifier missed) — + // dedup against the columns core actually CONSIDERED, never all + // introduced columns, so a missed column still surfaces here. + const fallbackPii = diffScopedPii.completed + ? currentCtx.pii.filter((col) => !diffScopedPii.classifiedColumns.has(col.toLowerCase())) + : currentCtx.pii + findings.push(...piiLane(currentCtx.file, fallbackPii)) + } + return findings + })(), + ) // Deterministic dbt anti-pattern detectors run on RAW SQL + diff (need Jinja). if (lanes.has("dbt_patterns")) { all.push(detectModelPatterns(ctx.file, ctx.newSql, input.rubric)) @@ -1138,6 +1224,9 @@ export async function runReview(input: OrchestrateInput): Promise f.category === "pii_exposure" && f.evidence?.tool === "schema.detect_pii").map((f) => f.file), + ) // Per file, the concatenated rule names core's AST lint actually emitted — // so we suppress a regex twin only when core genuinely covered that concern. const coreRulesByFile = new Map() @@ -1160,13 +1249,43 @@ export async function runReview(input: OrchestrateInput): Promise { - if (!coreRanFiles.has(f.file)) return true const tool = f.evidence?.tool + const fallbackRule = String((f.evidence?.result as any)?.rule ?? "").replace(/_/g, "-") + // Dedup the coarse regex PII twins against the precise deterministic PII + // detectors — but only when a better detector genuinely COVERS the same + // column. Suppressing whenever the classifier merely "ran" hid real PII + // (a high-risk column core's classifier missed but the regex would catch, + // especially on tiers without the broad `pii_exposure` lane). + if ( + (tool === "dbt-patterns" || tool === "rule-catalog") && + f.category === "pii_exposure" && + (fallbackRule === "pii-into-mart" || fallbackRule === "select-pii-columns") + ) { + // The broad `schema.detect_pii` lane already flagged this file → redundant. + if (schemaPiiFiles.has(f.file)) return false + if (fallbackRule === "pii-into-mart") { + // `pii_into_mart` carries the matched column in `result.line`: suppress + // only when core's classifier actually CONSIDERED that column (flagged or + // deliberately muted). A column core never classified falls through. + const token = PII_TOKEN_RE.exec(String((f.evidence?.result as any)?.line ?? ""))?.[1]?.toLowerCase() + if (token && classifiedPiiColumnsByFile.get(f.file)?.has(token)) return false + // No recoverable column: defer to the prior file-level behavior so the + // twin isn't double-reported when core ran for the file. + if (!token && diffScopedPiiCompletedFiles.has(f.file)) return false + return true + } + // `select-pii-columns` doesn't preserve its column. Suppress only when a + // precise detector actually EMITTED a PII finding for the file (so the + // exposure is reported by the better source, not silently dropped). + if (diffScopedPiiFindingFiles.has(f.file)) return false + return true + } + if (!coreRanFiles.has(f.file)) return true if (tool !== "dbt-patterns" && tool !== "rule-catalog") return true // Structural twins: the check code survives on evidence.result.rule (ruleKey // is stripped by the zod schema). Suppress ONLY if core actually emitted the // equivalent rule for this file (else the issue would fall through). - const code = String((f.evidence?.result as any)?.rule ?? "").replace(/_/g, "-") + const code = fallbackRule const coreMatcher = CORE_AST_COVERED[code] if (coreMatcher && coreMatcher.test(coreRulesByFile.get(f.file) ?? "")) return false // Portability twins: drop a regex finding ONLY IF it is itself a portability diff --git a/packages/opencode/src/altimate/review/runner.ts b/packages/opencode/src/altimate/review/runner.ts index 1ffda22e6..f26b6add6 100644 --- a/packages/opencode/src/altimate/review/runner.ts +++ b/packages/opencode/src/altimate/review/runner.ts @@ -470,12 +470,13 @@ export function createDispatcherRunner(opts: DispatcherRunnerOptions): ReviewRun } }, - async equivalence(oldSql: string, newSql: string): Promise { + async equivalence(oldSql: string, newSql: string, dialect?: string): Promise { try { const schema = await resolveSchema() const res = await Dispatcher.call("altimate_core.equivalence", { sql1: oldSql, sql2: newSql, + dialect, schema_context: schema, }) const data = (res.data ?? {}) as Record diff --git a/packages/opencode/src/cli/cmd/review.ts b/packages/opencode/src/cli/cmd/review.ts index f3bd9094f..c271dbe5f 100644 --- a/packages/opencode/src/cli/cmd/review.ts +++ b/packages/opencode/src/cli/cmd/review.ts @@ -58,7 +58,11 @@ export const ReviewCommand = cmd({ manifestPath: args.manifest as string | undefined, mode: args.mode as ReviewMode | undefined, severityThreshold: args.severity as Severity | undefined, - noAi: args.ai === false, + // The flag is registered as `--no-ai`, so yargs sets `args.noAi`. No `ai` + // option is declared, so `--ai=false` is NOT a supported CLI flag; the + // `args.ai === false` check only covers programmatic callers that pass + // `ai: false` directly. + noAi: args.noAi === true || args.ai === false, }) if (args.output) await fs.writeFile(args.output as string, JSON.stringify(env, null, 2)) diff --git a/packages/opencode/test/altimate/data-diff-duckdb-e2e.test.ts b/packages/opencode/test/altimate/data-diff-duckdb-e2e.test.ts new file mode 100644 index 000000000..dfde505a5 --- /dev/null +++ b/packages/opencode/test/altimate/data-diff-duckdb-e2e.test.ts @@ -0,0 +1,94 @@ +import { afterAll, beforeAll, describe, expect, test } from "bun:test" + +import { runDataDiff } from "../../src/altimate/native/connections/data-diff" +import * as Registry from "../../src/altimate/native/connections/registry" + +const RUN = process.env.ALTIMATE_RUN_WAREHOUSE_E2E === "1" +const e2eTest = RUN ? test : test.skip + +describe("data.diff DuckDB e2e", () => { + beforeAll(() => { + process.env.ALTIMATE_TELEMETRY_DISABLED = "true" + }) + + afterAll(() => { + delete process.env.ALTIMATE_TELEMETRY_DISABLED + Registry.reset() + }) + + e2eTest("detects value and row deltas through a real DuckDB warehouse", async () => { + Registry.reset() + Registry.setConfigs({ duck_e2e: { type: "duckdb", path: ":memory:" } }) + const conn = await Registry.get("duck_e2e") + + await conn.execute("CREATE TABLE base_orders (order_id INTEGER, amount INTEGER)") + await conn.execute("CREATE TABLE head_orders (order_id INTEGER, amount INTEGER)") + await conn.execute("INSERT INTO base_orders VALUES (1, 100), (2, 200)") + await conn.execute("INSERT INTO head_orders VALUES (1, 100), (2, 250), (3, 300)") + + const result = await runDataDiff({ + source: "base_orders", + target: "head_orders", + key_columns: ["order_id"], + extra_columns: ["amount"], + source_warehouse: "duck_e2e", + target_warehouse: "duck_e2e", + algorithm: "joindiff", + }) + + expect(result.success).toBe(true) + expect((result.outcome as any)?.mode).toBe("diff") + expect((result.outcome as any)?.stats?.exclusive_table2).toBe(1) + expect((result.outcome as any)?.stats?.updated).toBe(1) + }) + + e2eTest("does not report differences for identical DuckDB tables", async () => { + Registry.reset() + Registry.setConfigs({ duck_e2e: { type: "duckdb", path: ":memory:" } }) + const conn = await Registry.get("duck_e2e") + + await conn.execute("CREATE TABLE base_orders (order_id INTEGER, amount INTEGER)") + await conn.execute("CREATE TABLE head_orders (order_id INTEGER, amount INTEGER)") + await conn.execute("INSERT INTO base_orders VALUES (1, 100), (2, 200)") + await conn.execute("INSERT INTO head_orders VALUES (1, 100), (2, 200)") + + const result = await runDataDiff({ + source: "base_orders", + target: "head_orders", + key_columns: ["order_id"], + extra_columns: ["amount"], + source_warehouse: "duck_e2e", + target_warehouse: "duck_e2e", + algorithm: "joindiff", + }) + + expect(result.success).toBe(true) + expect((result.outcome as any)?.stats?.exclusive_table1).toBe(0) + expect((result.outcome as any)?.stats?.exclusive_table2).toBe(0) + expect((result.outcome as any)?.stats?.updated).toBe(0) + }) + + e2eTest("auto-discovers comparable columns when extra_columns is omitted", async () => { + Registry.reset() + Registry.setConfigs({ duck_e2e: { type: "duckdb", path: ":memory:" } }) + const conn = await Registry.get("duck_e2e") + + await conn.execute("CREATE TABLE base_orders (order_id INTEGER, amount INTEGER, updated_at TIMESTAMP)") + await conn.execute("CREATE TABLE head_orders (order_id INTEGER, amount INTEGER, updated_at TIMESTAMP)") + await conn.execute("INSERT INTO base_orders VALUES (1, 100, TIMESTAMP '2026-01-01 00:00:00'), (2, 200, TIMESTAMP '2026-01-01 00:00:00')") + await conn.execute("INSERT INTO head_orders VALUES (1, 100, TIMESTAMP '2026-01-02 00:00:00'), (2, 250, TIMESTAMP '2026-01-02 00:00:00')") + + const result = await runDataDiff({ + source: "base_orders", + target: "head_orders", + key_columns: ["order_id"], + source_warehouse: "duck_e2e", + target_warehouse: "duck_e2e", + algorithm: "joindiff", + }) + + expect(result.success).toBe(true) + expect((result.outcome as any)?.stats?.updated).toBe(1) + expect(result.excluded_audit_columns).toContain("updated_at") + }) +}) diff --git a/packages/opencode/test/altimate/review-runner.test.ts b/packages/opencode/test/altimate/review-runner.test.ts index f055149fb..40e38c8ad 100644 --- a/packages/opencode/test/altimate/review-runner.test.ts +++ b/packages/opencode/test/altimate/review-runner.test.ts @@ -1,9 +1,17 @@ -import { describe, expect, test } from "bun:test" +import { afterEach, describe, expect, spyOn, test } from "bun:test" import { writeFileSync } from "node:fs" import path from "node:path" import { createDispatcherRunner } from "../../src/altimate/review/runner" +import { Dispatcher } from "../../src/altimate/native" import { tmpdir } from "../fixture/fixture" +let dispatcherSpy: ReturnType | undefined + +afterEach(() => { + dispatcherSpy?.mockRestore() + dispatcherSpy = undefined +}) + describe("review manifest loading", () => { test("loads a valid manifest without initializing the native dispatcher", async () => { await using tmp = await tmpdir() @@ -36,4 +44,47 @@ describe("review manifest loading", () => { testCount: 0, }) }) + + test("threads adapter dialect into core equivalence", async () => { + await using tmp = await tmpdir() + const manifestPath = path.join(tmp.path, "manifest.json") + writeFileSync( + manifestPath, + JSON.stringify({ + metadata: { adapter_type: "duckdb" }, + nodes: { + "model.demo.orders": { + resource_type: "model", + name: "orders", + original_file_path: "models/orders.sql", + config: { materialized: "table" }, + depends_on: { nodes: [] }, + columns: { id: { name: "id", data_type: "integer" } }, + }, + }, + sources: {}, + }), + ) + + let seenParams: any + dispatcherSpy = spyOn(Dispatcher, "call").mockImplementation((async (method: string, params: any) => { + expect(method).toBe("altimate_core.equivalence") + seenParams = params + return { + success: true, + data: { + equivalent: true, + confidence: 0.95, + differences: [], + validation_errors: [], + }, + } + }) as any) + + const runner = createDispatcherRunner({ manifestPath }) + const result = await runner.equivalence("select id from orders", "select id from orders", "duckdb") + + expect(result).toEqual({ decided: true, equivalent: true, differences: [], confidence: "high" }) + expect(seenParams.dialect).toBe("duckdb") + }) }) diff --git a/packages/opencode/test/altimate/review.test.ts b/packages/opencode/test/altimate/review.test.ts index 0faf6a191..0e94f0ff4 100644 --- a/packages/opencode/test/altimate/review.test.ts +++ b/packages/opencode/test/altimate/review.test.ts @@ -569,6 +569,130 @@ describe("orchestrate", () => { ).toBe(true) }) + test("core SC010 join-key regression maps to critical join_risk and blocks", async () => { + const oldSql = ` + select * + from orders + left join customers + on orders.customer_id = customers.customer_id + ` + const newSql = ` + select * + from orders + left join customers + on orders.order_id = customers.customer_id + ` + let sawBase = "" + let sawHead = "" + const runner: ReviewRunner = { + ...fakeRunner({}), + async structuralDiff(baseSql, headSql) { + sawBase = baseSql + sawHead = headSql + return [ + { + code: "SC010", + rule: "join_key_regression", + severity: "error", + message: + "A join key changed from matching the same identifier stem to `orders.order_id = customers.customer_id`.", + }, + ] + }, + } + const env = await runReview({ + changedFiles: [ + { + path: "models/marts/fct_customer_orders.sql", + status: "modified", + diff: "+on orders.order_id = customers.customer_id\n-on orders.customer_id = customers.customer_id\n", + }, + ], + config: { ...DEFAULT_REVIEW_CONFIG, reviewers: ["semantic_change"], ai: false }, + rubric: DEFAULT_RUBRIC, + mode: "gate", + runner, + getContent: async (_f, side) => (side === "new" ? newSql : oldSql), + getCompiled: async (_f, side) => (side === "new" ? newSql : oldSql), + generatedAt: "2026-06-08T00:00:00Z", + }) + expect(sawBase).toBe(oldSql) + expect(sawHead).toBe(newSql) + const f = env.findings.find( + (x) => x.evidence?.tool === "altimate_core.structural_diff" && (x.evidence?.result as any)?.code === "SC010", + ) + expect(f).toBeDefined() + expect(f!.severity).toBe("critical") + expect(f!.category).toBe("join_risk") + expect(env.verdict).toBe("REQUEST_CHANGES") + }) + + test("core structural diff quiet on CTE rename and bridge join countercases", async () => { + const runner: ReviewRunner = { + ...fakeRunner({}), + async structuralDiff() { + return [] + }, + } + const safeEnv = await runReview({ + changedFiles: [ + { + path: "models/marts/fct_customer_orders.sql", + status: "modified", + diff: + "+on order_records.customer_id = customer_records.customer_id\n" + + "-on orders.customer_id = customers.customer_id\n", + }, + ], + config: { ...DEFAULT_REVIEW_CONFIG, reviewers: ["semantic_change"], ai: false }, + rubric: DEFAULT_RUBRIC, + mode: "gate", + runner, + getContent: async (_f, side) => + side === "new" + ? "select * from order_records left join customer_records on order_records.customer_id = customer_records.customer_id" + : "select * from orders left join customers on orders.customer_id = customers.customer_id", + getCompiled: async (_f, side) => + side === "new" + ? "select * from order_records left join customer_records on order_records.customer_id = customer_records.customer_id" + : "select * from orders left join customers on orders.customer_id = customers.customer_id", + generatedAt: "2026-06-08T00:00:00Z", + }) + expect( + safeEnv.findings.some( + (x) => x.evidence?.tool === "altimate_core.structural_diff" && (x.evidence?.result as any)?.code === "SC010", + ), + ).toBe(false) + + const bridgeEnv = await runReview({ + changedFiles: [ + { + path: "models/marts/fct_order_items.sql", + status: "modified", + diff: "+on orders.order_id = order_items.order_id\n-on orders.customer_id = customers.customer_id\n", + }, + ], + config: { ...DEFAULT_REVIEW_CONFIG, reviewers: ["semantic_change"], ai: false }, + rubric: DEFAULT_RUBRIC, + mode: "gate", + runner, + getContent: async (_f, side) => + side === "new" + ? "select * from orders left join order_items on orders.order_id = order_items.order_id" + : "select * from orders left join customers on orders.customer_id = customers.customer_id", + getCompiled: async (_f, side) => + side === "new" + ? "select * from orders left join order_items on orders.order_id = order_items.order_id" + : "select * from orders left join customers on orders.customer_id = customers.customer_id", + generatedAt: "2026-06-08T00:00:00Z", + }) + expect( + bridgeEnv.findings.some( + (x) => x.evidence?.tool === "altimate_core.structural_diff" && (x.evidence?.result as any)?.code === "SC010", + ), + ).toBe(false) + }) + test("core L033 portability → regex portability twin suppressed, idempotency concern survives", async () => { const sql = "select getdate() as loaded_at from {{ ref('a') }}" const files: ChangedFile[] = [{ path: "models/marts/m.sql", status: "modified", diff: "+ , getdate() as loaded_at\n" }] @@ -1209,6 +1333,433 @@ describe("orchestrate", () => { expect(env.verdict).toBe("REQUEST_CHANGES") }) + test("diff-scoped core PII is the single blocker for newly introduced mart email", async () => { + const files: ChangedFile[] = [{ path: "models/marts/dim_customers.sql", status: "modified", diff: "+ email\n" }] + const runner: ReviewRunner = { + ...fakeRunner({}), + async detectPii() { + return { columns: ["email"] } + }, + async columnLineage(sql) { + return sql.includes("email") + ? [ + { source: "customers.customer_name", target: "customer_name" }, + { source: "customers.email", target: "email" }, + ] + : [{ source: "customers.customer_name", target: "customer_name" }] + }, + async classifyPii(columns) { + return columns.map((column) => ({ + column, + classification: "Email", + confidence: 0.95, + masking: "'***MASKED***'", + })) + }, + } + const env = await runReview({ + changedFiles: files, + config: { ...DEFAULT_REVIEW_CONFIG }, + rubric: DEFAULT_RUBRIC, + mode: "gate", + runner, + getContent: content("select customer_name, email from customers", "select customer_name from customers"), + }) + const pii = env.findings.filter((f) => f.category === "pii_exposure") + expect(pii).toHaveLength(1) + expect(pii[0].severity).toBe("critical") + expect(pii[0].evidence?.tool).toBe("altimate_core.classify_pii") + expect(env.verdict).toBe("REQUEST_CHANGES") + }) + + test("broad PII detector remains fallback for existing output columns", async () => { + const files: ChangedFile[] = [{ path: "models/marts/dim_customers.sql", status: "modified", diff: "+ lower(email) as email\n" }] + const runner: ReviewRunner = { + ...fakeRunner({}), + async detectPii() { + return { columns: ["email"] } + }, + async columnLineage(sql) { + return sql.includes("email") + ? [ + { source: "customers.customer_id", target: "customer_id" }, + { source: "customers.email", target: "email" }, + ] + : [{ source: "customers.customer_id", target: "customer_id" }] + }, + async classifyPii(columns) { + return columns.map((column) => ({ + column, + classification: "Email", + confidence: 0.95, + masking: "'***MASKED***'", + })) + }, + } + const env = await runReview({ + changedFiles: files, + config: { ...DEFAULT_REVIEW_CONFIG }, + rubric: DEFAULT_RUBRIC, + mode: "gate", + runner, + getContent: content("select customer_id, lower(email) as email from customers", "select customer_id, email from customers"), + }) + const pii = env.findings.filter((f) => f.category === "pii_exposure") + expect(pii).toHaveLength(1) + expect(pii[0].evidence?.tool).toBe("schema.detect_pii") + expect(env.verdict).toBe("REQUEST_CHANGES") + }) + + test("PII fallback survives core lineage failure instead of aborting review", async () => { + const files: ChangedFile[] = [{ path: "models/marts/dim_customers.sql", status: "modified", diff: "+ email\n" }] + const runner: ReviewRunner = { + ...fakeRunner({}), + async detectPii() { + return { columns: ["email"] } + }, + async columnLineage() { + throw new Error("lineage parser crashed") + }, + async classifyPii(columns) { + return columns.map((column) => ({ + column, + classification: "Email", + confidence: 0.95, + })) + }, + } + const env = await runReview({ + changedFiles: files, + config: { ...DEFAULT_REVIEW_CONFIG }, + rubric: DEFAULT_RUBRIC, + mode: "gate", + runner, + getContent: content("select customer_id, email from customers", "select customer_id from customers"), + }) + const pii = env.findings.filter((f) => f.category === "pii_exposure") + expect(pii).toHaveLength(1) + expect(pii[0].evidence?.tool).toBe("schema.detect_pii") + expect(env.verdict).toBe("REQUEST_CHANGES") + }) + + test("PII fallback survives core classification failure instead of hiding risk", async () => { + const files: ChangedFile[] = [{ path: "models/marts/dim_customers.sql", status: "modified", diff: "+ email\n" }] + const runner: ReviewRunner = { + ...fakeRunner({}), + async detectPii() { + return { columns: ["email"] } + }, + async columnLineage(sql) { + return sql.includes("email") + ? [ + { source: "customers.customer_id", target: "customer_id" }, + { source: "customers.email", target: "email" }, + ] + : [{ source: "customers.customer_id", target: "customer_id" }] + }, + async classifyPii() { + throw new Error("classifier unavailable") + }, + } + const env = await runReview({ + changedFiles: files, + config: { ...DEFAULT_REVIEW_CONFIG }, + rubric: DEFAULT_RUBRIC, + mode: "gate", + runner, + getContent: content("select customer_id, email from customers", "select customer_id from customers"), + }) + const pii = env.findings.filter((f) => f.category === "pii_exposure") + expect(pii).toHaveLength(1) + expect(pii[0].evidence?.tool).toBe("schema.detect_pii") + expect(env.verdict).toBe("REQUEST_CHANGES") + }) + + test("case-variant PII detector output does not duplicate core diff-scoped finding", async () => { + const files: ChangedFile[] = [{ path: "models/marts/dim_customers.sql", status: "modified", diff: "+ Email as EMAIL\n" }] + const runner: ReviewRunner = { + ...fakeRunner({}), + async detectPii() { + return { columns: ["EMAIL", "email"] } + }, + async columnLineage(sql) { + return sql.toLowerCase().includes("email") + ? [ + { source: "customers.customer_id", target: "customer_id" }, + { source: "customers.email", target: "EMAIL" }, + ] + : [{ source: "customers.customer_id", target: "customer_id" }] + }, + async classifyPii(columns) { + return columns.map((column) => ({ + column, + classification: "Email", + confidence: 0.95, + })) + }, + } + const env = await runReview({ + changedFiles: files, + config: { ...DEFAULT_REVIEW_CONFIG }, + rubric: DEFAULT_RUBRIC, + mode: "gate", + runner, + getContent: content("select customer_id, Email as EMAIL from customers", "select customer_id from customers"), + }) + const pii = env.findings.filter((f) => f.category === "pii_exposure") + expect(pii).toHaveLength(1) + expect(pii[0].evidence?.tool).toBe("altimate_core.classify_pii") + }) + + test("low-confidence name PII does not surface or fall back to regex PII comments", async () => { + const files: ChangedFile[] = [{ path: "models/marts/dim_customers.sql", status: "modified", diff: "+ first_name\n" }] + const runner: ReviewRunner = { + ...fakeRunner({}), + async detectPii() { + return { columns: ["first_name"] } + }, + async columnLineage(sql) { + return sql.includes("first_name") + ? [ + { source: "customers.customer_id", target: "customer_id" }, + { source: "customers.first_name", target: "first_name" }, + ] + : [{ source: "customers.customer_id", target: "customer_id" }] + }, + async classifyPii(columns) { + return columns.map((column) => ({ + column, + classification: "Name", + confidence: 0.85, + })) + }, + } + const env = await runReview({ + changedFiles: files, + config: { ...DEFAULT_REVIEW_CONFIG }, + rubric: DEFAULT_RUBRIC, + mode: "gate", + runner, + getContent: content("select customer_id, first_name from customers", "select customer_id from customers"), + }) + expect(env.findings.some((f) => f.category === "pii_exposure")).toBe(false) + }) + + test("diff-scoped PII recall: a column core's classifier MISSED still surfaces via the broad fallback", async () => { + // Regression guard: when core `classify_pii` returns NOTHING for an introduced + // column (it missed it), that column must remain eligible for the broad + // `schema.detect_pii` detector — it must not be silently dropped just because + // the diff-scoped lane ran for the file. + const files: ChangedFile[] = [{ path: "models/marts/dim_customers.sql", status: "modified", diff: "+ ssn\n" }] + const runner: ReviewRunner = { + ...fakeRunner({}), + async detectPii() { + return { columns: ["ssn"] } + }, + async columnLineage(sql) { + return sql.includes("ssn") + ? [ + { source: "customers.customer_id", target: "customer_id" }, + { source: "customers.ssn", target: "ssn" }, + ] + : [{ source: "customers.customer_id", target: "customer_id" }] + }, + async classifyPii() { + return [] + }, + } + const env = await runReview({ + changedFiles: files, + config: { ...DEFAULT_REVIEW_CONFIG, reviewers: ["pii_exposure"] }, + rubric: DEFAULT_RUBRIC, + mode: "gate", + runner, + getContent: content("select customer_id, ssn from customers", "select customer_id from customers"), + }) + const pii = env.findings.filter((f) => f.category === "pii_exposure") + expect(pii).toHaveLength(1) + expect(pii[0].evidence?.tool).toBe("schema.detect_pii") + expect(env.verdict).toBe("REQUEST_CHANGES") + }) + + test("core-missed high-risk PII still caught by the regex twin when the broad PII lane is off", async () => { + // Lower tiers (trivial/lite) run `dbt_patterns` but not `pii_exposure`. If core + // classification ran but missed a high-risk column, the regex twin is the only + // safety net — it must NOT be suppressed for a column core never classified. + const files: ChangedFile[] = [{ path: "models/marts/dim_customers.sql", status: "modified", diff: "+ ssn\n" }] + const runner: ReviewRunner = { + ...fakeRunner({}), + async detectPii() { + return { columns: ["ssn"] } + }, + async columnLineage(sql) { + return sql.includes("ssn") + ? [ + { source: "customers.customer_id", target: "customer_id" }, + { source: "customers.ssn", target: "ssn" }, + ] + : [{ source: "customers.customer_id", target: "customer_id" }] + }, + async classifyPii() { + return [] + }, + } + const env = await runReview({ + changedFiles: files, + config: { ...DEFAULT_REVIEW_CONFIG, reviewers: ["dbt_patterns"] }, + rubric: DEFAULT_RUBRIC, + mode: "gate", + runner, + getContent: content("select customer_id, ssn from customers", "select customer_id from customers"), + }) + const pii = env.findings.filter((f) => f.category === "pii_exposure") + expect( + pii.some((f) => f.evidence?.tool === "dbt-patterns" && (f.evidence?.result as any)?.rule === "pii_into_mart"), + ).toBe(true) + expect(env.verdict).toBe("REQUEST_CHANGES") + }) + + test("zero-target lineage does not mark the file PII-complete, so the regex twin survives", async () => { + // Regression guard: when `columnLineage` RESOLVES but yields no output targets + // (the parser couldn't derive columns — uncertainty, not "no new PII"), the + // diff-scoped lane must not flag the file as completed. Otherwise the file + // enters the PII-completed/classified sets and a deterministic regex twin could + // be suppressed for a column core never actually classified. + const files: ChangedFile[] = [{ path: "models/marts/dim_customers.sql", status: "modified", diff: "+ ssn\n" }] + const runner: ReviewRunner = { + ...fakeRunner({}), + async detectPii() { + return { columns: ["ssn"] } + }, + async columnLineage() { + // Lineage resolves but derives zero output columns for every input. + return [] + }, + async classifyPii() { + return [] + }, + } + const env = await runReview({ + changedFiles: files, + config: { ...DEFAULT_REVIEW_CONFIG, reviewers: ["dbt_patterns"] }, + rubric: DEFAULT_RUBRIC, + mode: "gate", + runner, + getContent: content("select customer_id, ssn from customers", "select customer_id from customers"), + }) + const pii = env.findings.filter((f) => f.category === "pii_exposure") + expect( + pii.some((f) => f.evidence?.tool === "dbt-patterns" && (f.evidence?.result as any)?.rule === "pii_into_mart"), + ).toBe(true) + expect(env.verdict).toBe("REQUEST_CHANGES") + }) + + test("core considered the matched column (muted as low-confidence) → the regex twin is suppressed", async () => { + // Inverse of the recall guards above: suppression must be COLUMN-aware, not + // merely "the classifier ran". Here core's `classify_pii` DID consider `ssn` + // but muted it (confidence below threshold → no diff-scoped finding). Because + // the column was considered, the coarse `pii_into_mart` regex twin for the + // same column is redundant and must be suppressed — otherwise the same column + // is double-reported. This exercises the token-gated suppression branch + // (`classifiedPiiColumnsByFile.has(token)`), distinct from the "never + // classified → twin survives" path. + const files: ChangedFile[] = [{ path: "models/marts/dim_customers.sql", status: "modified", diff: "+ ssn\n" }] + const runner: ReviewRunner = { + ...fakeRunner({}), + async detectPii() { + return { columns: ["ssn"] } + }, + async columnLineage(sql) { + return sql.includes("ssn") + ? [ + { source: "customers.customer_id", target: "customer_id" }, + { source: "customers.ssn", target: "ssn" }, + ] + : [{ source: "customers.customer_id", target: "customer_id" }] + }, + async classifyPii() { + // Considered `ssn` but is not confident → muted, no diff-scoped finding, + // yet `ssn` still lands in the classified-columns set. + return [{ column: "ssn", classification: "SSN", confidence: 0.1 }] + }, + } + const env = await runReview({ + changedFiles: files, + config: { ...DEFAULT_REVIEW_CONFIG, reviewers: ["dbt_patterns"] }, + rubric: DEFAULT_RUBRIC, + mode: "gate", + runner, + getContent: content("select customer_id, ssn from customers", "select customer_id from customers"), + }) + const piiIntoMart = env.findings.filter( + (f) => f.evidence?.tool === "dbt-patterns" && (f.evidence?.result as any)?.rule === "pii_into_mart", + ) + expect(piiIntoMart).toHaveLength(0) + }) + + test("structural error severity blocks only for join_risk; other rules stay advisory", async () => { + // SC003 group_by_change is `semantic_change`, not `join_risk`. Even at core + // `severity: "error"` it must NOT be escalated to critical — only join-key + // regressions are allowed to hard-block. + const oldSql = "select g, sum(x) as x from t group by g" + const newSql = "select g, h, sum(x) as x from t group by g, h" + const runner: ReviewRunner = { + ...fakeRunner({}), + async structuralDiff() { + return [{ code: "SC003", rule: "group_by_change", severity: "error", message: "grain changed" }] + }, + } + const env = await runReview({ + changedFiles: [{ path: "models/marts/fct_orders.sql", status: "modified", diff: "+group by g, h\n" }], + config: { ...DEFAULT_REVIEW_CONFIG, reviewers: ["semantic_change"], ai: false }, + rubric: DEFAULT_RUBRIC, + mode: "gate", + runner, + getContent: async (_f, side) => (side === "new" ? newSql : oldSql), + getCompiled: async (_f, side) => (side === "new" ? newSql : oldSql), + generatedAt: "2026-06-08T00:00:00Z", + }) + const f = env.findings.find( + (x) => x.evidence?.tool === "altimate_core.structural_diff" && (x.evidence?.result as any)?.code === "SC003", + ) + expect(f).toBeDefined() + expect(f!.severity).toBe("warning") + expect(f!.category).toBe("semantic_change") + }) + + test("join_key_regression below error severity stays advisory (warning, not critical)", async () => { + const oldSql = "select * from orders left join customers on orders.customer_id = customers.customer_id" + const newSql = "select * from orders left join customers on orders.order_id = customers.customer_id" + const runner: ReviewRunner = { + ...fakeRunner({}), + async structuralDiff() { + return [{ code: "SC010", rule: "join_key_regression", severity: "warning", message: "join key changed" }] + }, + } + const env = await runReview({ + changedFiles: [ + { + path: "models/marts/fct_customer_orders.sql", + status: "modified", + diff: "+on orders.order_id = customers.customer_id\n", + }, + ], + config: { ...DEFAULT_REVIEW_CONFIG, reviewers: ["semantic_change"], ai: false }, + rubric: DEFAULT_RUBRIC, + mode: "gate", + runner, + getContent: async (_f, side) => (side === "new" ? newSql : oldSql), + getCompiled: async (_f, side) => (side === "new" ? newSql : oldSql), + generatedAt: "2026-06-08T00:00:00Z", + }) + const f = env.findings.find( + (x) => x.evidence?.tool === "altimate_core.structural_diff" && (x.evidence?.result as any)?.code === "SC010", + ) + expect(f).toBeDefined() + expect(f!.severity).toBe("warning") + expect(f!.category).toBe("join_risk") + }) + test("clean change with no manifest → degraded, APPROVE/COMMENT, lint-only labeled", async () => { const files: ChangedFile[] = [{ path: "models/staging/stg_x.sql", status: "modified", diff: "+select 1\n" }] const runner: ReviewRunner = {