From bad98c4a76d930fca8b8fd986fe054bb2cae7fa3 Mon Sep 17 00:00:00 2001 From: buky Date: Mon, 18 May 2026 21:30:59 +0200 Subject: [PATCH] fix(db): wrap SET LOCAL hnsw.ef_search in $transaction (Phase 0e) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Same root cause as Phase 0a (PR #97): a SET LOCAL Postgres session variable and the subsequent query were issued as separate Prisma calls. With pool connections, the SET LOCAL can land on one connection and the SELECT on another — the ef_search tuning silently reverts to the server default before the vector search runs. Phase 0a fixed this for the org-context session variable in withOrgContext. Phase 0e applies the same $transaction wrap to all three SET LOCAL hnsw.ef_search call sites surfaced during STEP 1 audit: 1. src/lib/knowledge/search.ts:201 (searchKnowledgeBase) - Hot path: 6+ production callers (KB chat/eval API routes, kb-search-handler runtime, MCP tool exposure, rag-inject, codebase-rag SDLC). - Dynamic ef_search (40 | 60 | 100) based on query word count. - Vector search on KBChunk. 2. src/lib/memory/hot-cold-tier.ts:113 (getColdMemories) - No production callers in src/ today, but exported and reachable dynamically. Patched for consistency with the other two sites so the pattern is uniform across the codebase. - ef_search = 40 (hardcoded). - Vector search on AgentMemory. 3. src/lib/runtime/handlers/memory-read-handler.ts:168 (memoryReadHandler vector search path) - Runtime handler for the memory_read node type. Triggered for every agent run that includes a memory_read node. - ef_search = 40 (hardcoded). - Vector search on AgentMemory. PLAN-V2 §4.5 named only search.ts; the audit instruction in that section ('Apply same pattern to other SET LOCAL callers if discovered during STEP 1 audit') is honored here with all three sites in one PR. FORENSIC HAL-9 covers the same finding. Tests: - 63 existing tests still pass (search.test.ts utility tests: 30; hot-cold-tier.test.ts: 16; memory-read-handler.test.ts: 17). - 6 new regression-guard tests added in two new files: src/lib/knowledge/__tests__/search-vector-shape.test.ts (3) src/lib/runtime/handlers/__tests__/memory-read-handler-vector-shape.test.ts (3) - hot-cold-tier.test.ts mock extended with $transaction delegating to a tx mock; existing assertions on $executeRawUnsafe still hold via tx-aliased fn references. - Each shape test asserts: (a) $transaction is invoked, (b) SET LOCAL runs on the tx, not the outer prisma, (c) callback receives tx, (d) outer prisma's direct exec/query paths are NOT used. - Mocked tests verify SHAPE of the fix. They cannot verify pool-survival semantics; that needs a real Postgres integration harness (TODO comments left for Phase 1+). Risk: - search.ts is on the KB hot path. $transaction holds a pool connection for the duration of the callback (one SET LOCAL + one SELECT, both microsecond-millisecond). Should not materially affect pool exhaustion under load; verify in staging if concerned. - hot-cold-tier.ts getColdMemories has no production callers, so functional risk is zero. - memory-read-handler.ts vector search is per-agent-run when memory_read node fires. Similar transaction profile to search.ts. Out of scope (intentional): - Migrating $executeRawUnsafe -> $executeRaw on hot-cold-tier.ts and memory-read-handler.ts. Both keep the existing Unsafe variant; SET LOCAL takes no parameters, no SQL injection vector. Refactoring to safer APIs is a separate workstream. Verification: - vitest run (search.test.ts + search-vector-shape.test.ts + hot-cold-tier.test.ts + memory-read-handler.test.ts + memory-read-handler-vector-shape.test.ts): 69 passed - tsc --noEmit -p tsconfig.json: exit 0 Refs: - skill-rls-rollout-PLAN-V2.md §4.5 (Phase 0e bundled bug fix) - skill-rls-rollout-FORENSIC-REPORT.md HAL-9 - PR #97 (Phase 0a — same fix pattern for withOrgContext) - PR #98 (docs/rls-tech-debt.md — tracking) --- .../__tests__/search-vector-shape.test.ts | 139 +++++++++++++++ src/lib/knowledge/search.ts | 41 +++-- .../memory/__tests__/hot-cold-tier.test.ts | 39 ++++- src/lib/memory/hot-cold-tier.ts | 58 ++++--- .../memory-read-handler-vector-shape.test.ts | 163 ++++++++++++++++++ .../runtime/handlers/memory-read-handler.ts | 56 +++--- 6 files changed, 421 insertions(+), 75 deletions(-) create mode 100644 src/lib/knowledge/__tests__/search-vector-shape.test.ts create mode 100644 src/lib/runtime/handlers/__tests__/memory-read-handler-vector-shape.test.ts diff --git a/src/lib/knowledge/__tests__/search-vector-shape.test.ts b/src/lib/knowledge/__tests__/search-vector-shape.test.ts new file mode 100644 index 00000000..0255b2d9 --- /dev/null +++ b/src/lib/knowledge/__tests__/search-vector-shape.test.ts @@ -0,0 +1,139 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +// --------------------------------------------------------------------------- +// Phase 0e regression-guard tests for searchKnowledgeBase +// --------------------------------------------------------------------------- +// +// Background: searchKnowledgeBase previously ran two separate Prisma calls — +// await prisma.$executeRaw(`SET LOCAL hnsw.ef_search = …`) +// await prisma.$queryRaw(...) +// — which Prisma could distribute across two pool connections, silently +// reverting the ef_search tuning before the SELECT. +// +// Phase 0e patched this to: +// await prisma.$transaction(async (tx) => { +// await tx.$executeRaw(`SET LOCAL hnsw.ef_search = …`) +// return tx.$queryRaw(...) +// }) +// +// These tests verify the SHAPE of the fix: +// - $transaction is used (not direct prisma.$executeRaw) +// - SET LOCAL runs on the tx (not the outer client) +// - the SELECT runs on the same tx +// +// They cannot verify pool-survival semantics — that needs a real Postgres +// integration harness (TODO for Phase 1+). + +vi.mock("@/lib/prisma", () => { + // Factory must be self-contained — vitest hoists vi.mock to top of file, + // so we cannot reference top-level variables here. State is exposed via + // helpers below that read from the mocked module after import. + const tx = { + $executeRaw: vi.fn(), + $queryRaw: vi.fn(), + }; + return { + prisma: { + knowledgeBase: { + findUnique: vi.fn(), + }, + $transaction: vi.fn( + async (fn: (tx: typeof tx) => Promise) => fn(tx), + ), + // Legacy direct paths — must NOT be used by the patched implementation + $executeRaw: vi.fn(), + $queryRaw: vi.fn(), + _tx: tx, + }, + }; +}); + +vi.mock("@/lib/logger", () => ({ + logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn() }, +})); + +vi.mock("@/lib/observability/metrics", () => ({ + recordMetric: vi.fn(), +})); + +vi.mock("../embeddings", () => ({ + generateEmbedding: vi.fn(async () => new Array(1536).fill(0.1)), +})); + +vi.mock("../embedding-cache", () => ({ + getCachedQueryEmbedding: vi.fn(async () => null), + setCachedQueryEmbedding: vi.fn(async () => undefined), +})); + +import { searchKnowledgeBase } from "../search"; +import { prisma } from "@/lib/prisma"; + +// Pull mocked instances back out for assertions +const mockPrisma = prisma as unknown as { + $transaction: ReturnType; + $executeRaw: ReturnType; + $queryRaw: ReturnType; + _tx: { + $executeRaw: ReturnType; + $queryRaw: ReturnType; + }; +}; + +beforeEach(() => { + vi.clearAllMocks(); + mockPrisma._tx.$queryRaw.mockResolvedValue([]); +}); + +describe("searchKnowledgeBase — Phase 0e $transaction shape", () => { + it("opens a $transaction (does not run SET LOCAL on the outer prisma)", async () => { + await searchKnowledgeBase("kb-1", "hello world", 5); + + expect(mockPrisma.$transaction).toHaveBeenCalledOnce(); + + // The pre-Phase-0e bug shape (SET LOCAL on outer prisma) must not appear. + expect(mockPrisma.$executeRaw).not.toHaveBeenCalled(); + expect(mockPrisma.$queryRaw).not.toHaveBeenCalled(); + }); + + it("runs SET LOCAL hnsw.ef_search on the tx, before the SELECT", async () => { + const callOrder: string[] = []; + mockPrisma._tx.$executeRaw.mockImplementation(async () => { + callOrder.push("set_local"); + return 0; + }); + mockPrisma._tx.$queryRaw.mockImplementation(async () => { + callOrder.push("select"); + return []; + }); + + await searchKnowledgeBase("kb-1", "hello world", 5); + + expect(callOrder).toEqual(["set_local", "select"]); + expect(mockPrisma._tx.$executeRaw).toHaveBeenCalledOnce(); + expect(mockPrisma._tx.$queryRaw).toHaveBeenCalledOnce(); + }); + + it("returns whatever tx.$queryRaw returns, mapped through the result shape", async () => { + mockPrisma._tx.$queryRaw.mockResolvedValue([ + { + id: "chunk-A", + content: "result content", + similarity: 0.91, + sourceId: "src-1", + sourceName: "Doc", + sourceType: "pdf", + metadata: null, + }, + ]); + + const result = await searchKnowledgeBase("kb-1", "hello world", 5); + + expect(result).toHaveLength(1); + expect(result[0].chunkId).toBe("chunk-A"); + expect(result[0].similarity).toBe(0.91); + }); +}); + +// TODO(rls-phase-1): integration test against a real Postgres harness to +// verify ef_search actually survives onto the SELECT query — mocked tests +// only verify call shape, not the pool-survival semantics being fixed. diff --git a/src/lib/knowledge/search.ts b/src/lib/knowledge/search.ts index f7eeaa08..771dbd63 100644 --- a/src/lib/knowledge/search.ts +++ b/src/lib/knowledge/search.ts @@ -198,24 +198,31 @@ export async function searchKnowledgeBase( // ($1 syntax is rejected with "syntax error at or near $1"). // efSearch is always 40 | 60 | 100 — a computed integer, never user input — so // Prisma.raw() injection is safe here. - await prisma.$executeRaw(Prisma.sql`SET LOCAL hnsw.ef_search = ${Prisma.raw(String(efSearch))}`); - + // + // $transaction wrapper: SET LOCAL only persists for the lifetime of the + // current transaction. Without $transaction, Prisma's pool may run the + // SET LOCAL on one connection and the SELECT on another — the ef_search + // tuning would silently revert to the server default. Pin both on the + // same connection by wrapping in a single transaction. const searchStart = performance.now(); - const results = await prisma.$queryRaw( - Prisma.sql` - SELECT - c."id", c."content", - 1 - (c."embedding" <=> ${vectorStr}::vector) as similarity, - c."sourceId", s."name" as "sourceName", s."type" as "sourceType", c."metadata" - FROM "KBChunk" c - INNER JOIN "KBSource" s ON c."sourceId" = s."id" - WHERE s."knowledgeBaseId" = ${knowledgeBaseId} - AND s."status" = 'READY' - AND c."embedding" IS NOT NULL - ORDER BY c."embedding" <=> ${vectorStr}::vector - LIMIT ${topK} - ` - ); + const results = await prisma.$transaction(async (tx) => { + await tx.$executeRaw(Prisma.sql`SET LOCAL hnsw.ef_search = ${Prisma.raw(String(efSearch))}`); + return tx.$queryRaw( + Prisma.sql` + SELECT + c."id", c."content", + 1 - (c."embedding" <=> ${vectorStr}::vector) as similarity, + c."sourceId", s."name" as "sourceName", s."type" as "sourceType", c."metadata" + FROM "KBChunk" c + INNER JOIN "KBSource" s ON c."sourceId" = s."id" + WHERE s."knowledgeBaseId" = ${knowledgeBaseId} + AND s."status" = 'READY' + AND c."embedding" IS NOT NULL + ORDER BY c."embedding" <=> ${vectorStr}::vector + LIMIT ${topK} + ` + ); + }); const searchDurationMs = performance.now() - searchStart; recordMetric("kb.search.vector_query_ms", searchDurationMs, "ms", { knowledgeBaseId, diff --git a/src/lib/memory/__tests__/hot-cold-tier.test.ts b/src/lib/memory/__tests__/hot-cold-tier.test.ts index 88dd15e5..919c958d 100644 --- a/src/lib/memory/__tests__/hot-cold-tier.test.ts +++ b/src/lib/memory/__tests__/hot-cold-tier.test.ts @@ -1,15 +1,34 @@ import { describe, it, expect, vi, beforeEach } from "vitest"; // Mock prisma before importing module -vi.mock("@/lib/prisma", () => ({ - prisma: { - agentMemory: { - findMany: vi.fn(), - }, +vi.mock("@/lib/prisma", () => { + // Tx mock — receives the same shape we expect Prisma's TransactionClient + // to expose for the SET LOCAL + raw query pair in getColdMemories. + const tx = { $executeRawUnsafe: vi.fn(), $queryRawUnsafe: vi.fn(), - }, -})); + }; + return { + prisma: { + agentMemory: { + findMany: vi.fn(), + }, + // Legacy $executeRawUnsafe / $queryRawUnsafe references on the outer + // client are kept so tests that assert on them keep working. The new + // $transaction route is used by getColdMemories under the hot-cold-tier + // patch (Phase 0e) — the tx mock is what callbacks actually receive. + $executeRawUnsafe: tx.$executeRawUnsafe, + $queryRawUnsafe: tx.$queryRawUnsafe, + $transaction: vi.fn(async (fn: (tx: typeof tx_) => Promise) => + fn(tx as never), + ), + _tx: tx, + }, + }; +}); + +// Helper alias for the tx callback signature above +type tx_ = { $executeRawUnsafe: typeof vi.fn; $queryRawUnsafe: typeof vi.fn }; vi.mock("@/lib/logger", () => ({ logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn() }, @@ -35,8 +54,14 @@ import { import type { RuntimeContext } from "@/lib/runtime/types"; const mockFindMany = prisma.agentMemory.findMany as ReturnType; +// For getColdMemories: the patched (Phase 0e) implementation runs +// SET LOCAL and the raw SELECT inside prisma.$transaction(...), so they're +// invoked on the TX client, not the outer prisma. The mock above wires +// the tx's $executeRawUnsafe/$queryRawUnsafe to be the same fn references +// exposed on the outer prisma, so these aliases assert on either path. const mockQueryRaw = prisma.$queryRawUnsafe as ReturnType; const mockExecRaw = prisma.$executeRawUnsafe as ReturnType; +const mockTransaction = (prisma as unknown as { $transaction: ReturnType }).$transaction; function makeMemory(overrides: Record = {}) { return { diff --git a/src/lib/memory/hot-cold-tier.ts b/src/lib/memory/hot-cold-tier.ts index 8082988e..1b6e79ac 100644 --- a/src/lib/memory/hot-cold-tier.ts +++ b/src/lib/memory/hot-cold-tier.ts @@ -110,32 +110,38 @@ export async function getColdMemories( const vectorStr = `[${embedding.join(",")}]`; - await prisma.$executeRawUnsafe("SET LOCAL hnsw.ef_search = 40"); - - const results = await prisma.$queryRawUnsafe< - Array<{ - id: string; - key: string; - value: unknown; - category: string; - importance: number; - accessCount: number; - accessedAt: Date; - createdAt: Date; - similarity: number; - }> - >( - `SELECT id, key, value, category, importance, "accessCount", "accessedAt", "createdAt", - 1 - (embedding <=> $1::vector) as similarity - FROM "AgentMemory" - WHERE "agentId" = $2 - AND embedding IS NOT NULL - ORDER BY embedding <=> $1::vector - LIMIT $3`, - vectorStr, - agentId, - topK, - ); + // $transaction wrapper: SET LOCAL only persists for the lifetime of the + // current transaction. Without $transaction, Prisma's pool may run the + // SET LOCAL on one connection and the SELECT on another — the ef_search + // tuning would silently revert to the server default. Pin both on the + // same connection by wrapping in a single transaction. + const results = await prisma.$transaction(async (tx) => { + await tx.$executeRawUnsafe("SET LOCAL hnsw.ef_search = 40"); + return tx.$queryRawUnsafe< + Array<{ + id: string; + key: string; + value: unknown; + category: string; + importance: number; + accessCount: number; + accessedAt: Date; + createdAt: Date; + similarity: number; + }> + >( + `SELECT id, key, value, category, importance, "accessCount", "accessedAt", "createdAt", + 1 - (embedding <=> $1::vector) as similarity + FROM "AgentMemory" + WHERE "agentId" = $2 + AND embedding IS NOT NULL + ORDER BY embedding <=> $1::vector + LIMIT $3`, + vectorStr, + agentId, + topK, + ); + }); return results.filter((r) => r.similarity > 0.3); } catch (error) { diff --git a/src/lib/runtime/handlers/__tests__/memory-read-handler-vector-shape.test.ts b/src/lib/runtime/handlers/__tests__/memory-read-handler-vector-shape.test.ts new file mode 100644 index 00000000..e5c63e90 --- /dev/null +++ b/src/lib/runtime/handlers/__tests__/memory-read-handler-vector-shape.test.ts @@ -0,0 +1,163 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +// --------------------------------------------------------------------------- +// Phase 0e regression-guard tests for memoryReadHandler vector search path +// --------------------------------------------------------------------------- +// +// memoryReadHandler in "search" mode runs a SET LOCAL + raw SELECT against +// AgentMemory. Before Phase 0e these were two separate Prisma calls; the +// patch wraps them in $transaction so the ef_search session var pins to the +// same pool connection as the SELECT. +// +// These tests verify the SHAPE — $transaction is called, SET LOCAL runs on +// the tx, the SELECT runs on the same tx. They cannot verify pool-survival +// semantics (TODO: real Postgres integration for Phase 1+). +// +// The existing test file (memory-read-handler.test.ts) exercises the +// fallback path that runs when embedding generation fails; it does not +// touch the SET LOCAL line. This file complements it. + +vi.mock("@/lib/prisma", () => { + const tx = { + $executeRawUnsafe: vi.fn(), + $queryRawUnsafe: vi.fn(), + }; + return { + prisma: { + agentMemory: { + findUnique: vi.fn(), + findMany: vi.fn(), + update: vi.fn(), + updateMany: vi.fn().mockResolvedValue({ count: 0 }), + }, + $transaction: vi.fn( + async (fn: (tx: typeof tx) => Promise) => fn(tx), + ), + // Legacy direct paths — must NOT be used by the patched implementation + $executeRawUnsafe: vi.fn(), + $queryRawUnsafe: vi.fn(), + _tx: tx, + }, + }; +}); + +vi.mock("@/lib/logger", () => ({ + logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn() }, +})); + +vi.mock("@/lib/ai", () => ({ + getEmbeddingModel: vi.fn(() => "mock-embedding-model"), +})); + +vi.mock("ai", () => ({ + embed: vi.fn(async () => ({ + embedding: new Array(1536).fill(0.1), + })), +})); + +import { memoryReadHandler } from "../memory-read-handler"; +import { prisma } from "@/lib/prisma"; +import type { RuntimeContext } from "../../types"; +import type { FlowNode } from "@/types"; + +const mockPrisma = prisma as unknown as { + $transaction: ReturnType; + $executeRawUnsafe: ReturnType; + $queryRawUnsafe: ReturnType; + agentMemory: { + updateMany: ReturnType; + }; + _tx: { + $executeRawUnsafe: ReturnType; + $queryRawUnsafe: ReturnType; + }; +}; + +function makeNode(overrides: Partial = {}): FlowNode { + return { + id: "mem-r-1", + type: "memory_read", + position: { x: 0, y: 0 }, + data: { + label: "Memory Read", + mode: "search", + key: "", + category: "", + searchQuery: "find user notes", + outputVariable: "memory_result", + topK: 5, + ...overrides, + }, + }; +} + +function makeContext(overrides: Partial = {}): RuntimeContext { + return { + agentId: "agent-1", + conversationId: "conv-1", + variables: {}, + messageHistory: [], + flowContent: { nodes: [], edges: [], variables: [] }, + currentNodeId: null, + isNewConversation: false, + ...overrides, + }; +} + +beforeEach(() => { + vi.clearAllMocks(); + mockPrisma._tx.$queryRawUnsafe.mockResolvedValue([]); + mockPrisma.agentMemory.updateMany.mockResolvedValue({ count: 0 }); +}); + +describe("memoryReadHandler search mode — Phase 0e $transaction shape", () => { + it("opens a $transaction (does not run SET LOCAL on the outer prisma)", async () => { + await memoryReadHandler(makeNode(), makeContext()); + + expect(mockPrisma.$transaction).toHaveBeenCalledOnce(); + + // Pre-Phase-0e shape — must not appear + expect(mockPrisma.$executeRawUnsafe).not.toHaveBeenCalled(); + expect(mockPrisma.$queryRawUnsafe).not.toHaveBeenCalled(); + }); + + it("runs SET LOCAL hnsw.ef_search on the tx before the SELECT", async () => { + const callOrder: string[] = []; + mockPrisma._tx.$executeRawUnsafe.mockImplementation(async () => { + callOrder.push("set_local"); + return 0; + }); + mockPrisma._tx.$queryRawUnsafe.mockImplementation(async () => { + callOrder.push("select"); + return []; + }); + + await memoryReadHandler(makeNode(), makeContext()); + + expect(callOrder).toEqual(["set_local", "select"]); + expect(mockPrisma._tx.$executeRawUnsafe).toHaveBeenCalledWith( + "SET LOCAL hnsw.ef_search = 40", + ); + }); + + it("returns memory results from the tx.$queryRawUnsafe call", async () => { + mockPrisma._tx.$queryRawUnsafe.mockResolvedValue([ + { + id: "mem-A", + key: "user_pref", + value: "dark mode", + category: "general", + importance: 0.7, + similarity: 0.85, + }, + ]); + + const result = await memoryReadHandler(makeNode(), makeContext()); + + // Result should reach updatedVariables.memory_result + expect(result.updatedVariables?.memory_result).toBeDefined(); + }); +}); + +// TODO(rls-phase-1): integration test against a real Postgres harness to +// verify ef_search actually survives onto the SELECT query. diff --git a/src/lib/runtime/handlers/memory-read-handler.ts b/src/lib/runtime/handlers/memory-read-handler.ts index 987a1899..1232c1ac 100644 --- a/src/lib/runtime/handlers/memory-read-handler.ts +++ b/src/lib/runtime/handlers/memory-read-handler.ts @@ -164,31 +164,37 @@ export const memoryReadHandler: NodeHandler = async (node, context) => { const vectorStr = `[${embedding.join(",")}]`; - // Set HNSW ef_search for memory lookups (short lookups, speed-optimized) - await prisma.$executeRawUnsafe(`SET LOCAL hnsw.ef_search = 40`); - - // Cosine similarity search (accelerated by HNSW index) - const memories = await prisma.$queryRawUnsafe< - Array<{ - id: string; - key: string; - value: unknown; - category: string; - importance: number; - similarity: number; - }> - >( - `SELECT id, key, value, category, importance, - 1 - (embedding <=> $1::vector) as similarity - FROM "AgentMemory" - WHERE "agentId" = $2 - AND embedding IS NOT NULL - ORDER BY embedding <=> $1::vector - LIMIT $3`, - vectorStr, - context.agentId, - topK - ); + // Set HNSW ef_search for memory lookups (short lookups, speed-optimized). + // $transaction wrapper: SET LOCAL only persists for the lifetime of the + // current transaction. Without $transaction, Prisma's pool may run the + // SET LOCAL on one connection and the SELECT on another — the ef_search + // tuning would silently revert to the server default. Pin both on the + // same connection by wrapping in a single transaction. + const memories = await prisma.$transaction(async (tx) => { + await tx.$executeRawUnsafe(`SET LOCAL hnsw.ef_search = 40`); + // Cosine similarity search (accelerated by HNSW index) + return tx.$queryRawUnsafe< + Array<{ + id: string; + key: string; + value: unknown; + category: string; + importance: number; + similarity: number; + }> + >( + `SELECT id, key, value, category, importance, + 1 - (embedding <=> $1::vector) as similarity + FROM "AgentMemory" + WHERE "agentId" = $2 + AND embedding IS NOT NULL + ORDER BY embedding <=> $1::vector + LIMIT $3`, + vectorStr, + context.agentId, + topK + ); + }); // Update access tracking if (memories.length > 0) {