From dd454538ae9b2fe3922dc3eb6e883e8b36852e71 Mon Sep 17 00:00:00 2001
From: truffle
Date: Sun, 10 May 2026 03:12:46 +0000
Subject: [PATCH] fix(memory): dedup exact-triple semantic facts on store

SemanticStore.store has only ever guarded duplicates via
findContradictions, which intentionally excludes same-object matches
(semantic.ts:131). Repeated extractions of the same user message produce
a fresh randomUUID per call (consolidation.ts:114, 132), so identical
(subject, predicate, object) facts accumulate as distinct points across
sessions.

Adds findExactDuplicate as a scroll-by-payload check on subject +
predicate + object + is_null:valid_until before upsert. When a
currently-valid fact already encodes the same triple, return its id and
skip the upsert.

Closes #125
---
NOTE(review): this patch was reflowed from a whitespace-collapsed copy in
which every angle-bracket span had been stripped. The generic type
arguments below (Record<string, unknown>, Array<Record<string, unknown>>,
Promise<string>, Promise<SemanticFact | null>, Promise<SemanticFact[]>)
are reconstructed from usage -- confirm them against the repository,
especially on context lines, before applying. The author address on the
From: line was lost the same way and has not been guessed. Added-line
counts match the hunk headers and the diffstat (143 + 9 + 22 = 174).

 src/memory/__tests__/semantic.test.ts | 143 ++++++++++++++++++++++++++
 src/memory/semantic.ts                |  31 ++++++
 2 files changed, 174 insertions(+)

diff --git a/src/memory/__tests__/semantic.test.ts b/src/memory/__tests__/semantic.test.ts
index f2f555dc..6116208a 100644
--- a/src/memory/__tests__/semantic.test.ts
+++ b/src/memory/__tests__/semantic.test.ts
@@ -272,4 +272,147 @@ describe("SemanticStore", () => {
     expect(payload.valid_until).toBeDefined();
     expect(typeof payload.valid_until).toBe("number");
   });
+
+  test("store() returns existing id and skips upsert when an exact duplicate is already stored", async () => {
+    const vec = make768dVector();
+    let upsertCalled = false;
+    let scrollFilter: Record<string, unknown> | null = null;
+
+    globalThis.fetch = mock((url: string | Request, init?: RequestInit) => {
+      const urlStr = typeof url === "string" ? url : url.url;
+
+      if (urlStr.includes("/api/embed")) {
+        return Promise.resolve(new Response(JSON.stringify({ embeddings: [vec] }), { status: 200 }));
+      }
+
+      if (urlStr.includes("/points/scroll")) {
+        if (init?.body) {
+          const body = JSON.parse(init.body as string) as Record<string, unknown>;
+          scrollFilter = body.filter as Record<string, unknown>;
+        }
+        return Promise.resolve(
+          new Response(
+            JSON.stringify({
+              result: {
+                points: [
+                  {
+                    id: "fact-existing",
+                    payload: {
+                      subject: "staging server",
+                      predicate: "runs on",
+                      object: "port 3001",
+                      natural_language: "The staging server runs on port 3001",
+                      valid_from: Date.now(),
+                      valid_until: null,
+                      confidence: 0.9,
+                      category: "domain_knowledge",
+                      version: 1,
+                    },
+                  },
+                ],
+              },
+            }),
+            { status: 200, headers: { "Content-Type": "application/json" } },
+          ),
+        );
+      }
+
+      if (urlStr.includes("/points?") && init?.method === "PUT") {
+        upsertCalled = true;
+      }
+
+      return Promise.resolve(new Response(JSON.stringify({ status: "ok" }), { status: 200 }));
+    }) as unknown as typeof fetch;
+
+    const qdrant = new QdrantClient(TEST_CONFIG);
+    const embedder = new EmbeddingClient(TEST_CONFIG);
+    const store = new SemanticStore(qdrant, embedder, TEST_CONFIG);
+
+    const fact = makeTestFact({ id: "fact-fresh-uuid" });
+    const id = await store.store(fact);
+
+    expect(id).toBe("fact-existing");
+    expect(upsertCalled).toBe(false);
+    expect(scrollFilter).not.toBeNull();
+    const must = (scrollFilter as unknown as { must?: Array<Record<string, unknown>> })?.must ?? [];
+    const matchKeys = must
+      .filter((c) => "match" in c)
+      .map((c) => (c as { key: string }).key)
+      .sort();
+    expect(matchKeys).toEqual(["object", "predicate", "subject"]);
+  });
+
+  test("store() proceeds with upsert when scroll returns no duplicate", async () => {
+    const vec = make768dVector();
+    let upsertCalled = false;
+
+    globalThis.fetch = mock((url: string | Request, init?: RequestInit) => {
+      const urlStr = typeof url === "string" ? url : url.url;
+
+      if (urlStr.includes("/api/embed")) {
+        return Promise.resolve(new Response(JSON.stringify({ embeddings: [vec] }), { status: 200 }));
+      }
+
+      if (urlStr.includes("/points/scroll")) {
+        return Promise.resolve(new Response(JSON.stringify({ result: { points: [] } }), { status: 200 }));
+      }
+
+      if (urlStr.includes("/points/query")) {
+        return Promise.resolve(new Response(JSON.stringify({ result: { points: [] } }), { status: 200 }));
+      }
+
+      if (urlStr.includes("/points?") && init?.method === "PUT") {
+        upsertCalled = true;
+      }
+
+      return Promise.resolve(new Response(JSON.stringify({ status: "ok" }), { status: 200 }));
+    }) as unknown as typeof fetch;
+
+    const qdrant = new QdrantClient(TEST_CONFIG);
+    const embedder = new EmbeddingClient(TEST_CONFIG);
+    const store = new SemanticStore(qdrant, embedder, TEST_CONFIG);
+
+    const fact = makeTestFact({ id: "fact-fresh-uuid" });
+    const id = await store.store(fact);
+
+    expect(id).toBe("fact-fresh-uuid");
+    expect(upsertCalled).toBe(true);
+  });
+
+  test("findExactDuplicate() filters scroll on subject + predicate + object and excludes invalidated facts", async () => {
+    let scrollFilter: Record<string, unknown> | null = null;
+
+    globalThis.fetch = mock((url: string | Request, init?: RequestInit) => {
+      const urlStr = typeof url === "string" ? url : url.url;
+
+      if (urlStr.includes("/points/scroll")) {
+        if (init?.body) {
+          const body = JSON.parse(init.body as string) as Record<string, unknown>;
+          scrollFilter = body.filter as Record<string, unknown>;
+        }
+        return Promise.resolve(new Response(JSON.stringify({ result: { points: [] } }), { status: 200 }));
+      }
+
+      return Promise.resolve(new Response(JSON.stringify({ status: "ok" }), { status: 200 }));
+    }) as unknown as typeof fetch;
+
+    const qdrant = new QdrantClient(TEST_CONFIG);
+    const embedder = new EmbeddingClient(TEST_CONFIG);
+    const store = new SemanticStore(qdrant, embedder, TEST_CONFIG);
+
+    const result = await store.findExactDuplicate(makeTestFact());
+
+    expect(result).toBeNull();
+    expect(scrollFilter).not.toBeNull();
+    const must = (scrollFilter as unknown as { must?: Array<Record<string, unknown>> })?.must ?? [];
+    const subjectClause = must.find((c) => (c as { key?: string }).key === "subject");
+    const predicateClause = must.find((c) => (c as { key?: string }).key === "predicate");
+    const objectClause = must.find((c) => (c as { key?: string }).key === "object");
+    const validClause = must.find((c) => "is_null" in c);
+
+    expect(subjectClause).toMatchObject({ match: { value: "staging server" } });
+    expect(predicateClause).toMatchObject({ match: { value: "runs on" } });
+    expect(objectClause).toMatchObject({ match: { value: "port 3001" } });
+    expect(validClause).toMatchObject({ is_null: { key: "valid_until" } });
+  });
 });
diff --git a/src/memory/semantic.ts b/src/memory/semantic.ts
index e332acbf..17f1bafa 100644
--- a/src/memory/semantic.ts
+++ b/src/memory/semantic.ts
@@ -48,6 +48,15 @@ export class SemanticStore {
   }
 
   async store(fact: SemanticFact): Promise<string> {
+    // If a currently-valid fact with the same subject + predicate + object already exists,
+    // return its id without inserting a duplicate row. Repeated extractions of the same user
+    // message would otherwise accumulate as separate points (each with a fresh randomUUID),
+    // since findContradictions intentionally ignores same-object matches.
+    const duplicate = await this.findExactDuplicate(fact);
+    if (duplicate) {
+      return duplicate.id;
+    }
+
     // Check for contradictions before storing
     const contradictions = await this.findContradictions(fact);
 
@@ -108,6 +117,28 @@ export class SemanticStore {
     return results.filter((r) => r.score >= minScore).map((r) => this.payloadToFact(r));
   }
 
+  /**
+   * Look up a currently-valid fact that already encodes the same (subject, predicate, object)
+   * triple. Returns the existing fact when one is present so callers can avoid inserting a
+   * second row, and `null` otherwise.
+   */
+  async findExactDuplicate(fact: SemanticFact): Promise<SemanticFact | null> {
+    const { points } = await this.qdrant.scroll(this.collectionName, {
+      limit: 1,
+      filter: {
+        must: [
+          { key: "subject", match: { value: fact.subject } },
+          { key: "predicate", match: { value: fact.predicate } },
+          { key: "object", match: { value: fact.object } },
+          { is_null: { key: "valid_until" } },
+        ],
+      },
+      withPayload: true,
+    });
+    if (points.length === 0) return null;
+    return this.payloadToFact(points[0]);
+  }
+
   async findContradictions(newFact: SemanticFact): Promise<SemanticFact[]> {
     // Search for facts with the same subject and predicate
     const queryText = `${newFact.subject} ${newFact.predicate}`;