Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2221,6 +2221,10 @@ const memoryLanceDBProPlugin = {

const AUTO_RECALL_TIMEOUT_MS = parsePositiveInt(config.autoRecallTimeoutMs) ?? 5_000; // configurable; default raised from 3s to 5s for remote embedding APIs behind proxies
api.on("before_prompt_build", async (event: any, ctx: any) => {
// Skip auto-recall for sub-agent sessions — their context comes from the parent.
const sessionKey = typeof ctx?.sessionKey === "string" ? ctx.sessionKey : "";
if (sessionKey.includes(":subagent:")) return;

// Per-agent exclusion: skip auto-recall for agents in the exclusion list.
const agentId = resolveHookAgentId(ctx?.agentId, (event as any).sessionKey);
if (
Expand Down Expand Up @@ -3084,6 +3088,8 @@ const memoryLanceDBProPlugin = {

api.on("before_prompt_build", async (_event: any, ctx: any) => {
const sessionKey = typeof ctx.sessionKey === "string" ? ctx.sessionKey : "";
// Skip reflection injection for sub-agent sessions.
if (sessionKey.includes(":subagent:")) return;
if (isInternalReflectionSessionKey(sessionKey)) return;
if (reflectionInjectMode !== "inheritance-only" && reflectionInjectMode !== "inheritance+derived") return;
try {
Expand Down Expand Up @@ -3111,6 +3117,8 @@ const memoryLanceDBProPlugin = {

api.on("before_prompt_build", async (_event: any, ctx: any) => {
const sessionKey = typeof ctx.sessionKey === "string" ? ctx.sessionKey : "";
// Skip reflection injection for sub-agent sessions.
if (sessionKey.includes(":subagent:")) return;
if (isInternalReflectionSessionKey(sessionKey)) return;
const agentId = resolveHookAgentId(
typeof ctx.agentId === "string" ? ctx.agentId : undefined,
Expand Down
50 changes: 40 additions & 10 deletions src/access-tracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ export function computeEffectiveHalfLife(
*/
export class AccessTracker {
private readonly pending: Map<string, number> = new Map();
// Tracks retry count per ID so that delta is never amplified across failures.
private readonly _retryCount = new Map<string, number>();
private readonly _maxRetries = 5;
private debounceTimer: ReturnType<typeof setTimeout> | null = null;
private flushPromise: Promise<void> | null = null;
private readonly debounceMs: number;
Expand Down Expand Up @@ -291,10 +294,22 @@ export class AccessTracker {
this.clearTimer();
if (this.pending.size > 0) {
  this.logger.warn(
    `access-tracker: destroying with ${this.pending.size} pending writes — attempting final flush (3s timeout)`,
  );
  // Fire-and-forget final flush with a hard 3s timeout. Promise.race
  // guarantees we always clear pending/_retryCount even if the flush hangs
  // (e.g. a stuck store connection) — the timeout branch wins and the
  // finally-style cleanup below still runs.
  const flushWithTimeout = Promise.race([
    this.doFlush(),
    new Promise<void>((resolve) => setTimeout(resolve, 3_000)),
  ]);
  void flushWithTimeout.finally(() => {
    this.pending.clear();
    this._retryCount.clear();
  });
} else {
  // Nothing queued — just drop all bookkeeping synchronously.
  this.pending.clear();
  this._retryCount.clear();
}
}

// --------------------------------------------------------------------------
Expand All @@ -308,18 +323,33 @@ export class AccessTracker {
for (const [id, delta] of batch) {
  try {
    const current = await this.store.getById(id);
    if (!current) {
      // ID not found — memory was deleted or is outside the current scope.
      // Do NOT retry or warn; drop silently and clear any retry counter.
      this._retryCount.delete(id);
      continue;
    }

    const updatedMeta = buildUpdatedMetadata(current.metadata, delta);
    await this.store.update(id, { metadata: updatedMeta });
    this._retryCount.delete(id); // success — clear retry counter
  } catch (err) {
    const retryCount = (this._retryCount.get(id) ?? 0) + 1;
    if (retryCount > this._maxRetries) {
      // Exceeded max retries — drop the delta for good. The documented
      // logger contract only guarantees warn(), so fall back to it when
      // error() is absent rather than throwing inside the flush path.
      this._retryCount.delete(id);
      const logDrop =
        (this.logger as { error?: (...args: unknown[]) => void }).error?.bind(this.logger) ??
        this.logger.warn.bind(this.logger);
      logDrop(
        `access-tracker: dropping ${id.slice(0, 8)} after ${retryCount} failed retries`,
      );
    } else {
      this._retryCount.set(id, retryCount);
      // Requeue by MERGING with any accesses recorded while this flush was
      // in flight — overwriting would silently lose those fresh events.
      // The failed delta is re-added exactly once, so repeated failures
      // never amplify it.
      this.pending.set(id, (this.pending.get(id) ?? 0) + delta);
      this.logger.warn(
        `access-tracker: write-back failed for ${id.slice(0, 8)} (attempt ${retryCount}/${this._maxRetries}):`,
        err,
      );
    }
  }
}
}
Expand Down
21 changes: 18 additions & 3 deletions src/embedder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ class EmbeddingCache {
this.ttlMs = ttlMinutes * 60_000;
}

/** Purge every cache entry whose age exceeds the TTL. Invoked from set() when the cache is near capacity. */
private _evictExpired(): void {
  // Precompute the oldest acceptable creation time once, outside the loop.
  const cutoff = Date.now() - this.ttlMs;
  for (const [cacheKey, entry] of this.cache) {
    if (entry.createdAt < cutoff) {
      // Deleting during Map iteration is safe per the ES spec.
      this.cache.delete(cacheKey);
    }
  }
}

private key(text: string, task?: string): string {
const hash = createHash("sha256").update(`${task || ""}:${text}`).digest("hex").slice(0, 24);
return hash;
Expand All @@ -59,10 +69,15 @@ class EmbeddingCache {

/**
 * Insert (or refresh) a cached embedding vector for the given text/task pair.
 * When the cache is full, TTL eviction runs first to drop expired entries;
 * only if that frees no slot is the single oldest (insertion-order) entry
 * evicted, preventing unbounded growth from stale entries.
 */
set(text: string, task: string | undefined, vector: number[]): void {
  const k = this.key(text, task);
  if (this.cache.size >= this.maxSize) {
    this._evictExpired();
    // If TTL eviction didn't free a slot, evict the single oldest entry.
    if (this.cache.size >= this.maxSize) {
      const firstKey = this.cache.keys().next().value;
      if (firstKey !== undefined) this.cache.delete(firstKey);
    }
  }
  this.cache.set(k, { vector, createdAt: Date.now() });
}
Expand Down
2 changes: 1 addition & 1 deletion src/noise-prototypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const BUILTIN_NOISE_TEXTS: readonly string[] = [

const DEFAULT_THRESHOLD = 0.82;
const MAX_LEARNED_PROTOTYPES = 200;
// Lowered from 0.95: the 0.82–0.90 similarity band is where near-duplicate
// noise accumulates, so a tighter dedup threshold curbs noise-bank bloat.
const DEDUP_THRESHOLD = 0.90;

// ============================================================================
// NoisePrototypeBank
Expand Down
37 changes: 28 additions & 9 deletions src/retrieval-stats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,15 @@ interface QueryRecord {
}

export class RetrievalStatsCollector {
private _records: QueryRecord[] = [];
// Ring buffer: O(1) write, avoids O(n) Array.shift() GC pressure.
private _records: (QueryRecord | undefined)[] = [];
private _head = 0; // next write position
private _count = 0; // number of valid records
private readonly _maxRecords: number;

constructor(maxRecords = 1000) {
this._maxRecords = maxRecords;
this._records = new Array(maxRecords);
}

/**
Expand All @@ -55,18 +59,31 @@ export class RetrievalStatsCollector {
* @param source - Query source identifier (e.g. "manual", "auto-recall")
*/
recordQuery(trace: RetrievalTrace, source: string): void {
this._records.push({ trace, source });
// Evict oldest if over capacity
if (this._records.length > this._maxRecords) {
this._records.shift();
this._records[this._head] = { trace, source };
this._head = (this._head + 1) % this._maxRecords;
if (this._count < this._maxRecords) {
this._count++;
}
}

/** Snapshot of all recorded queries, oldest → newest. Consumed by getStats(). */
private _getRecords(): QueryRecord[] {
  if (this._count === 0) return [];
  // Before the buffer wraps, the oldest record sits at index 0; afterwards
  // it sits at the next write position (_head).
  const oldest = this._count < this._maxRecords ? 0 : this._head;
  const snapshot: QueryRecord[] = [];
  for (let offset = 0; offset < this._count; offset++) {
    const record = this._records[(oldest + offset) % this._maxRecords];
    if (record !== undefined) snapshot.push(record);
  }
  return snapshot;
}

/**
* Compute aggregate statistics from all recorded queries.
*/
getStats(): AggregateStats {
const n = this._records.length;
const records = this._getRecords();
const n = records.length;
if (n === 0) {
return {
totalQueries: 0,
Expand All @@ -90,7 +107,7 @@ export class RetrievalStatsCollector {
const queriesBySource: Record<string, number> = {};
const dropsByStage: Record<string, number> = {};

for (const { trace, source } of this._records) {
for (const { trace, source } of records) {
totalLatency += trace.totalMs;
totalResults += trace.finalCount;
latencies.push(trace.totalMs);
Expand Down Expand Up @@ -142,11 +159,13 @@ export class RetrievalStatsCollector {
* Reset all collected statistics.
*/
reset(): void {
  // Reallocate the backing buffer and rewind both ring-buffer cursors.
  this._records = new Array(this._maxRecords);
  this._head = 0;
  this._count = 0;
}

/** Number of queries currently held in the ring buffer (≤ maxRecords). */
get count(): number {
  return this._count;
}
}
31 changes: 18 additions & 13 deletions src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ export class MemoryStore {
private table: LanceDB.Table | null = null;
private initPromise: Promise<void> | null = null;
private ftsIndexCreated = false;
private updateQueue: Promise<void> = Promise.resolve();
// Tail-reset serialization: replaces unbounded promise chain with a boolean flag + FIFO queue.
private _updating = false;
private _waitQueue: Array<() => void> = [];

constructor(private readonly config: StoreConfig) { }

Expand Down Expand Up @@ -999,18 +1001,21 @@ export class MemoryStore {
}

/**
 * Run `action` with mutual exclusion against all other serialized updates.
 * Uses a boolean flag + FIFO wake queue instead of the old ever-growing
 * promise chain (which retained every settled link for the store's lifetime).
 *
 * @param action - Async mutation to run while holding the lock.
 * @returns The value resolved by `action`; rejections propagate to the caller.
 */
private async runSerializedUpdate<T>(action: () => Promise<T>): Promise<T> {
  // Re-check the flag in a loop after every wake-up: a brand-new caller may
  // acquire the flag between release and this waiter's microtask resuming,
  // so a single check would break mutual exclusion.
  while (this._updating) {
    await new Promise<void>((resolve) => {
      this._waitQueue.push(resolve);
    });
  }
  this._updating = true;
  try {
    return await action();
  } finally {
    this._updating = false;
    // Wake exactly one waiter (FIFO); it loops back to the check above.
    this._waitQueue.shift()?.();
  }
}

Expand Down
40 changes: 40 additions & 0 deletions test/issue598_smoke.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
 * Smoke test for: skip before_prompt_build hooks for subagent sessions
 * Bug: sub-agent sessions cause gateway blocking — hooks without subagent skip
 * run LanceDB I/O sequentially, blocking all other user sessions.
 *
 * Run: node test/issue598_smoke.mjs
 * Expected: all 3 hooks PASS
 */

import { readFileSync } from "fs";
import { fileURLToPath } from "url";
import { dirname, resolve } from "path";

// Locate index.ts relative to this test file (test/ -> plugin root) instead of
// a hard-coded absolute user path, so the test runs on any machine/checkout.
// MEMORY_LANCEDB_INDEX overrides the location when the layout differs.
const FILE =
  process.env.MEMORY_LANCEDB_INDEX ??
  resolve(dirname(fileURLToPath(import.meta.url)), "..", "index.ts");

let content;
try {
  content = readFileSync(FILE, "utf-8");
} catch (err) {
  // Missing file is an environment problem, not a regression — skip cleanly.
  console.error(`SKIP: cannot read ${FILE}:`, err?.message ?? err);
  process.exit(0);
}
const lines = content.split("\n");

// [hook_opens_line, guard_line, name] — line numbers pinned to index.ts.
const checks = [
  [2223, 2226, "auto-recall before_prompt_build"],
  [3084, 3087, "reflection-injector inheritance"],
  [3113, 3116, "reflection-injector derived"],
];

let pass = 0, fail = 0;
for (const [hookLine, guardLine, name] of checks) {
  const hookContent = (lines[hookLine - 1] || "").trim();
  const guardContent = (lines[guardLine - 1] || "").trim();
  if (hookContent.includes("before_prompt_build") && guardContent.includes(":subagent:")) {
    console.log(`PASS ${String(name).padEnd(40)} hook@${hookLine} guard@${guardLine}`);
    pass++;
  } else {
    console.log(`FAIL ${name}`);
    console.log(`  hook@${hookLine}: ${hookContent}`);
    console.log(`  guard@${guardLine}: ${guardContent}`);
    fail++;
  }
}

console.log(`\n${pass}/${pass + fail} checks passed`);
if (fail > 0) process.exit(1);
else console.log("ALL PASSED — subagent sessions skipped before async work");
Loading