diff --git a/index.ts b/index.ts index 4baf40f9..bad08306 100644 --- a/index.ts +++ b/index.ts @@ -2221,6 +2221,10 @@ const memoryLanceDBProPlugin = { const AUTO_RECALL_TIMEOUT_MS = parsePositiveInt(config.autoRecallTimeoutMs) ?? 5_000; // configurable; default raised from 3s to 5s for remote embedding APIs behind proxies api.on("before_prompt_build", async (event: any, ctx: any) => { + // Skip auto-recall for sub-agent sessions — their context comes from the parent. + const sessionKey = typeof ctx?.sessionKey === "string" ? ctx.sessionKey : ""; + if (sessionKey.includes(":subagent:")) return; + // Per-agent exclusion: skip auto-recall for agents in the exclusion list. const agentId = resolveHookAgentId(ctx?.agentId, (event as any).sessionKey); if ( @@ -3084,6 +3088,8 @@ const memoryLanceDBProPlugin = { api.on("before_prompt_build", async (_event: any, ctx: any) => { const sessionKey = typeof ctx.sessionKey === "string" ? ctx.sessionKey : ""; + // Skip reflection injection for sub-agent sessions. + if (sessionKey.includes(":subagent:")) return; if (isInternalReflectionSessionKey(sessionKey)) return; if (reflectionInjectMode !== "inheritance-only" && reflectionInjectMode !== "inheritance+derived") return; try { @@ -3111,6 +3117,8 @@ const memoryLanceDBProPlugin = { api.on("before_prompt_build", async (_event: any, ctx: any) => { const sessionKey = typeof ctx.sessionKey === "string" ? ctx.sessionKey : ""; + // Skip reflection injection for sub-agent sessions. + if (sessionKey.includes(":subagent:")) return; if (isInternalReflectionSessionKey(sessionKey)) return; const agentId = resolveHookAgentId( typeof ctx.agentId === "string" ? ctx.agentId : undefined, diff --git a/src/access-tracker.ts b/src/access-tracker.ts index cf023905..bbc33130 100644 --- a/src/access-tracker.ts +++ b/src/access-tracker.ts @@ -213,6 +213,9 @@ export function computeEffectiveHalfLife( */ export class AccessTracker { private readonly pending: Map = new Map(); + // Tracks retry count per ID so that delta is never amplified across failures. + private readonly _retryCount = new Map(); + private readonly _maxRetries = 5; private debounceTimer: ReturnType | null = null; private flushPromise: Promise | null = null; private readonly debounceMs: number; @@ -291,10 +294,22 @@ export class AccessTracker { this.clearTimer(); if (this.pending.size > 0) { this.logger.warn( - `access-tracker: destroying with ${this.pending.size} pending writes`, + `access-tracker: destroying with ${this.pending.size} pending writes — attempting final flush (3s timeout)`, ); + // Fire-and-forget final flush with a hard 3s timeout. Uses Promise.race + // to guarantee we always clear pending/_retryCount even if flush hangs. + const flushWithTimeout = Promise.race([ + this.doFlush(), + new Promise((resolve) => setTimeout(resolve, 3_000)), + ]); + void flushWithTimeout.finally(() => { + this.pending.clear(); + this._retryCount.clear(); + }); + } else { + this.pending.clear(); + this._retryCount.clear(); } - this.pending.clear(); } // -------------------------------------------------------------------------- @@ -308,18 +323,33 @@ export class AccessTracker { for (const [id, delta] of batch) { try { const current = await this.store.getById(id); - if (!current) continue; + if (!current) { + // ID not found — memory was deleted or outside current scope. + // Do NOT retry or warn; just drop silently and clear any retry counter. + this._retryCount.delete(id); + continue; + } const updatedMeta = buildUpdatedMetadata(current.metadata, delta); await this.store.update(id, { metadata: updatedMeta }); + this._retryCount.delete(id); // success — clear retry counter } catch (err) { - // Requeue failed delta for retry on next flush - const existing = this.pending.get(id) ?? 0; - this.pending.set(id, existing + delta); - this.logger.warn( - `access-tracker: write-back failed for ${id.slice(0, 8)}:`, - err, - ); + const retryCount = (this._retryCount.get(id) ?? 0) + 1; + if (retryCount > this._maxRetries) { + // Exceeded max retries — drop and log error. + this._retryCount.delete(id); + this.logger.error( + `access-tracker: dropping ${id.slice(0, 8)} after ${retryCount} failed retries`, + ); + } else { + this._retryCount.set(id, retryCount); + // Requeue with the original delta only (NOT accumulated) for next flush. + this.pending.set(id, delta); + this.logger.warn( + `access-tracker: write-back failed for ${id.slice(0, 8)} (attempt ${retryCount}/${this._maxRetries}):`, + err, + ); + } } } } diff --git a/src/embedder.ts b/src/embedder.ts index b881aa80..5013ef8e 100644 --- a/src/embedder.ts +++ b/src/embedder.ts @@ -33,6 +33,16 @@ class EmbeddingCache { this.ttlMs = ttlMinutes * 60_000; } + /** Remove all expired entries. Called on every set() when cache is near capacity. */ + private _evictExpired(): void { + const now = Date.now(); + for (const [k, entry] of this.cache) { + if (now - entry.createdAt > this.ttlMs) { + this.cache.delete(k); + } + } + } + private key(text: string, task?: string): string { const hash = createHash("sha256").update(`${task || ""}:${text}`).digest("hex").slice(0, 24); return hash; @@ -59,10 +69,15 @@ class EmbeddingCache { set(text: string, task: string | undefined, vector: number[]): void { const k = this.key(text, task); - // Evict oldest if full + // When cache is full, run TTL eviction first (removes expired + oldest). + // This prevents unbounded growth from stale entries while keeping writes O(1). if (this.cache.size >= this.maxSize) { - const firstKey = this.cache.keys().next().value; - if (firstKey !== undefined) this.cache.delete(firstKey); + this._evictExpired(); + // If eviction didn't free enough slots, evict the single oldest LRU entry. + if (this.cache.size >= this.maxSize) { + const firstKey = this.cache.keys().next().value; + if (firstKey !== undefined) this.cache.delete(firstKey); + } } this.cache.set(k, { vector, createdAt: Date.now() }); } diff --git a/src/noise-prototypes.ts b/src/noise-prototypes.ts index 4dc88270..4562ae72 100644 --- a/src/noise-prototypes.ts +++ b/src/noise-prototypes.ts @@ -40,7 +40,7 @@ const BUILTIN_NOISE_TEXTS: readonly string[] = [ const DEFAULT_THRESHOLD = 0.82; const MAX_LEARNED_PROTOTYPES = 200; -const DEDUP_THRESHOLD = 0.95; +const DEDUP_THRESHOLD = 0.90; // lowered from 0.95: reduces noise bank bloat (0.82-0.90 range is where near-duplicate noise accumulates) // ============================================================================ // NoisePrototypeBank diff --git a/src/retrieval-stats.ts b/src/retrieval-stats.ts index 60994040..ced21cb9 100644 --- a/src/retrieval-stats.ts +++ b/src/retrieval-stats.ts @@ -42,11 +42,15 @@ interface QueryRecord { } export class RetrievalStatsCollector { - private _records: QueryRecord[] = []; + // Ring buffer: O(1) write, avoids O(n) Array.shift() GC pressure. + private _records: (QueryRecord | undefined)[] = []; + private _head = 0; // next write position + private _count = 0; // number of valid records private readonly _maxRecords: number; constructor(maxRecords = 1000) { this._maxRecords = maxRecords; + this._records = new Array(maxRecords); } /** @@ -55,18 +59,31 @@ export class RetrievalStatsCollector { * @param source - Query source identifier (e.g. "manual", "auto-recall") */ recordQuery(trace: RetrievalTrace, source: string): void { - this._records.push({ trace, source }); - // Evict oldest if over capacity - if (this._records.length > this._maxRecords) { - this._records.shift(); + this._records[this._head] = { trace, source }; + this._head = (this._head + 1) % this._maxRecords; + if (this._count < this._maxRecords) { + this._count++; } } + /** Return records in insertion order (oldest → newest). Used by getStats(). */ + private _getRecords(): QueryRecord[] { + if (this._count === 0) return []; + const result: QueryRecord[] = []; + const start = this._count < this._maxRecords ? 0 : this._head; + for (let i = 0; i < this._count; i++) { + const rec = this._records[(start + i) % this._maxRecords]; + if (rec !== undefined) result.push(rec); + } + return result; + } + /** * Compute aggregate statistics from all recorded queries. */ getStats(): AggregateStats { - const n = this._records.length; + const records = this._getRecords(); + const n = records.length; if (n === 0) { return { totalQueries: 0, @@ -90,7 +107,7 @@ export class RetrievalStatsCollector { const queriesBySource: Record = {}; const dropsByStage: Record = {}; - for (const { trace, source } of this._records) { + for (const { trace, source } of records) { totalLatency += trace.totalMs; totalResults += trace.finalCount; latencies.push(trace.totalMs); @@ -142,11 +159,13 @@ export class RetrievalStatsCollector { * Reset all collected statistics. */ reset(): void { - this._records = []; + this._records = new Array(this._maxRecords); + this._head = 0; + this._count = 0; } /** Number of recorded queries. */ get count(): number { - return this._records.length; + return this._count; } } diff --git a/src/store.ts b/src/store.ts index f861f46f..32977965 100644 --- a/src/store.ts +++ b/src/store.ts @@ -198,7 +198,9 @@ export class MemoryStore { private table: LanceDB.Table | null = null; private initPromise: Promise | null = null; private ftsIndexCreated = false; - private updateQueue: Promise = Promise.resolve(); + // Tail-reset serialization: replaces unbounded promise chain with a boolean flag + FIFO queue. + private _updating = false; + private _waitQueue: Array<() => void> = []; constructor(private readonly config: StoreConfig) { } @@ -999,18 +1001,21 @@ export class MemoryStore { } private async runSerializedUpdate(action: () => Promise): Promise { - const previous = this.updateQueue; - let release: (() => void) | undefined; - const lock = new Promise((resolve) => { - release = resolve; - }); - this.updateQueue = previous.then(() => lock); - - await previous; - try { - return await action(); - } finally { - release?.(); + // Tail-reset: no infinite promise chain. Uses a boolean flag + FIFO queue. + if (!this._updating) { + this._updating = true; + try { + return await action(); + } finally { + this._updating = false; + const next = this._waitQueue.shift(); + if (next) next(); + } + } else { + // Already busy — enqueue and wait for the current owner to signal done. + return new Promise((resolve) => { + this._waitQueue.push(resolve); + }).then(() => this.runSerializedUpdate(action)) as Promise; } } diff --git a/test/issue598_smoke.mjs b/test/issue598_smoke.mjs new file mode 100644 index 00000000..28384d7b --- /dev/null +++ b/test/issue598_smoke.mjs @@ -0,0 +1,40 @@ +/** + * Smoke test for: skip before_prompt_build hooks for subagent sessions + * Bug: sub-agent sessions cause gateway blocking — hooks without subagent skip + * run LanceDB I/O sequentially, blocking all other user sessions. + * + * Run: node test/issue598_smoke.mjs + * Expected: all 3 hooks PASS + */ + +import { readFileSync } from "fs"; + +const FILE = "C:\\Users\\admin\\.openclaw\\extensions\\memory-lancedb-pro\\index.ts"; +const content = readFileSync(FILE, "utf-8"); +const lines = content.split("\n"); + +// [hook_opens_line, guard_line, name] +const checks = [ + [2223, 2226, "auto-recall before_prompt_build"], + [3084, 3087, "reflection-injector inheritance"], + [3113, 3116, "reflection-injector derived"], +]; + +let pass = 0, fail = 0; +for (const [hookLine, guardLine, name] of checks) { + const hookContent = (lines[hookLine - 1] || "").trim(); + const guardContent = (lines[guardLine - 1] || "").trim(); + if (hookContent.includes("before_prompt_build") && guardContent.includes(":subagent:")) { + console.log(`PASS ${name.padEnd(40)} hook@${hookLine} guard@${guardLine}`); + pass++; + } else { + console.log(`FAIL ${name}`); + console.log(` hook@${hookLine}: ${hookContent}`); + console.log(` guard@${guardLine}: ${guardContent}`); + fail++; + } +} + +console.log(`\n${pass}/${pass + fail} checks passed`); +if (fail > 0) process.exit(1); +else console.log("ALL PASSED — subagent sessions skipped before async work");