Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scripts/ci-test-manifest.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export const CI_TEST_MANIFEST = [
{ group: "packaging-and-workflow", runner: "node", file: "test/workflow-fork-guards.test.mjs", args: ["--test"] },
{ group: "storage-and-schema", runner: "node", file: "test/clawteam-scope.test.mjs", args: ["--test"] },
{ group: "storage-and-schema", runner: "node", file: "test/cross-process-lock.test.mjs", args: ["--test"] },
{ group: "core-regression", runner: "node", file: "test/lock-stress-test.mjs", args: ["--test"] },
{ group: "core-regression", runner: "node", file: "test/preference-slots.test.mjs", args: ["--test"] },
{ group: "core-regression", runner: "node", file: "test/is-latest-auto-supersede.test.mjs" },
{ group: "core-regression", runner: "node", file: "test/hook-dedup-phase1.test.mjs", args: ["--test"] },
Expand Down
1 change: 1 addition & 0 deletions scripts/verify-ci-test-manifest.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const EXPECTED_BASELINE = [
{ group: "packaging-and-workflow", runner: "node", file: "test/workflow-fork-guards.test.mjs", args: ["--test"] },
{ group: "storage-and-schema", runner: "node", file: "test/clawteam-scope.test.mjs", args: ["--test"] },
{ group: "storage-and-schema", runner: "node", file: "test/cross-process-lock.test.mjs", args: ["--test"] },
{ group: "core-regression", runner: "node", file: "test/lock-stress-test.mjs", args: ["--test"] },
{ group: "core-regression", runner: "node", file: "test/preference-slots.test.mjs", args: ["--test"] },
{ group: "core-regression", runner: "node", file: "test/is-latest-auto-supersede.test.mjs" },
{ group: "core-regression", runner: "node", file: "test/temporal-awareness.test.mjs", args: ["--test"] },
Expand Down
131 changes: 94 additions & 37 deletions src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import {
mkdirSync,
realpathSync,
lstatSync,
rmSync,
statSync,
unlinkSync,
} from "node:fs";
Expand Down Expand Up @@ -67,6 +66,11 @@ async function loadLockfile(): Promise<any> {
return lockfileModule;
}

/**
 * For unit testing: override the lockfile module with a mock.
 *
 * Assigns `module` to the module-level `lockfileModule` binding. This function
 * does not validate the argument and does not restore the previous value.
 *
 * NOTE(review): `loadLockfile()` returns `lockfileModule`, so presumably code
 * that obtains the lock library through `loadLockfile()` will receive the
 * injected object — confirm against the full `loadLockfile()` body.
 *
 * @param module - Replacement object to use as the lockfile module.
 */
export function __setLockfileModuleForTests(module: any): void {
lockfileModule = module;
}

export const loadLanceDB = async (): Promise<
typeof import("@lancedb/lancedb")
> => {
Expand Down Expand Up @@ -157,7 +161,7 @@ export function validateStoragePath(dbPath: string): string {
) {
throw err;
} else {
// Other lstat failures ??continue with original path
// Other lstat failures continue with original path
}
}

Expand Down Expand Up @@ -201,23 +205,27 @@ export class MemoryStore {
private table: LanceDB.Table | null = null;
private initPromise: Promise<void> | null = null;
private ftsIndexCreated = false;
// Tail-reset serialization: replaces unbounded promise chain with a boolean flag + FIFO queue.
private _updating = false;
private _waitQueue: Array<() => void> = [];
private updateQueue: Promise<void> = Promise.resolve();

constructor(private readonly config: StoreConfig) { }

private async runWithFileLock<T>(fn: () => Promise<T>): Promise<T> {
const lockfile = await loadLockfile();
const lockPath = join(this.config.dbPath, ".memory-write.lock");

// Ensure lock file exists before locking (proper-lockfile requires it)
if (!existsSync(lockPath)) {
try { mkdirSync(dirname(lockPath), { recursive: true }); } catch {}
try { const { writeFileSync } = await import("node:fs"); writeFileSync(lockPath, "", { flag: "wx" }); } catch {}
}

// Proactive cleanup of stale lock artifacts (fixes stale-lock ECOMPROMISED)
// 【修復 #415】調整 retries:max wait 從 ~3100ms → ~151秒
// 指數退避:1s, 2s, 4s, 8s, 16s, 30s×5,總計約 151 秒
// ECOMPROMISED 透過 onCompromised callback 觸發(非 throw),使用 flag 機制正確處理
let isCompromised = false;
let compromisedErr: unknown = null;
let fnSucceeded = false;
let fnError: unknown = null;

// Proactive cleanup of stale lock artifacts(from PR #626)
// 根本避免 >5 分鐘的 lock artifact 導致 ECOMPROMISED
if (existsSync(lockPath)) {
try {
const stat = statSync(lockPath);
Expand All @@ -231,10 +239,61 @@ export class MemoryStore {
}

const release = await lockfile.lock(lockPath, {
retries: { retries: 10, factor: 2, minTimeout: 200, maxTimeout: 5000 },
stale: 10000,
retries: {
retries: 10,
factor: 2,
minTimeout: 1000, // James 保守設定:避免高負載下過度密集重試
maxTimeout: 30000, // James 保守設定:支撐更久的 event loop 阻塞
},
stale: 10000, // 10 秒後視為 stale,觸發 ECOMPROMISED callback
// 注意:ECOMPROMISED 是 ambiguous degradation 訊號,mtime 無法區分
// "holder 崩潰" vs "holder event loop 阻塞",所以不嘗試區分
onCompromised: (err: unknown) => {
// 【修復 #415 關鍵】必須是同步 callback
// setLockAsCompromised() 不等待 Promise,async throw 無法傳回 caller
isCompromised = true;
compromisedErr = err;
},
});
try { return await fn(); } finally { await release(); }

try {
const result = await fn();
fnSucceeded = true;
return result;
} catch (e: unknown) {
fnError = e;
throw e;
} finally {
if (isCompromised) {
// fnError 優先:fn() 失敗時,fn 的錯誤比 compromised 重要
if (fnError !== null) {
throw fnError;
}
// fn() 尚未完成就 compromised → throw,讓 caller 知道要重試
if (!fnSucceeded) {
throw compromisedErr as Error;
}
// fn() 成功執行,但 lock 在執行期間被標記 compromised
// 正確行為:回傳成功結果(資料已寫入),明確告知 caller 不要重試
console.warn(
`[memory-lancedb-pro] Returning successful result despite compromised lock at "${lockPath}". ` +
`Callers must not retry this operation automatically.`,
);
// 【修復 #415】compromised 後 release() 會回 ERELEASED,忽略即可
// 重要:不要在這裡 return!否則 finally 的 return 會覆蓋 try 的 return 值
try {
await release();
} catch (e: unknown) {
if ((e as NodeJS.ErrnoException).code === 'ERELEASED') {
// ERELEASED 是預期行為,不做任何事,讓 try 的 return 值通過
} else {
throw e; // 其他錯誤照拋
}
}
} else {
await release();
}
}
}

get dbPath(): string {
Expand Down Expand Up @@ -297,24 +356,24 @@ export class MemoryStore {

if (missingColumns.length > 0) {
console.warn(
`memory-lancedb-pro: migrating legacy table ??adding columns: ${missingColumns.map((c) => c.name).join(", ")}`,
`memory-lancedb-pro: migrating legacy table adding columns: ${missingColumns.map((c) => c.name).join(", ")}`,
);
await table.addColumns(missingColumns);
console.log(
`memory-lancedb-pro: migration complete ??${missingColumns.length} column(s) added`,
`memory-lancedb-pro: migration complete ${missingColumns.length} column(s) added`,
);
}
} catch (err) {
const msg = String(err);
if (msg.includes("already exists")) {
// Concurrent initialization race ??another process already added the columns
// Concurrent initialization race another process already added the columns
console.log("memory-lancedb-pro: migration columns already exist (concurrent init)");
} else {
console.warn("memory-lancedb-pro: could not check/migrate table schema:", err);
}
}
} catch (_openErr) {
// Table doesn't exist yet ??create it
// Table doesn't exist yet create it
const schemaEntry: MemoryEntry = {
id: "__schema__",
text: "",
Expand All @@ -333,7 +392,7 @@ export class MemoryStore {
await table.delete('id = "__schema__"');
} catch (createErr) {
// Race: another caller (or eventual consistency) created the table
// between our failed openTable and this createTable ??just open it.
// between our failed openTable and this createTable just open it.
if (String(createErr).includes("already exists")) {
table = await db.openTable(TABLE_NAME);
} else {
Expand Down Expand Up @@ -408,9 +467,10 @@ export class MemoryStore {
return this.runWithFileLock(async () => {
try {
await this.table!.add([fullEntry]);
} catch (err: any) {
const code = err.code || "";
const message = err.message || String(err);
} catch (err: unknown) {
const e = err as { code?: string; message?: string };
const code = e.code || "";
const message = e.message || String(err);
throw new Error(
`Failed to store memory in "${this.config.dbPath}": ${code} ${message}`,
);
Expand Down Expand Up @@ -901,7 +961,7 @@ export class MemoryStore {
throw new Error(`Memory ${id} is outside accessible scopes`);
}

return this.runWithFileLock(async () => {
return this.runWithFileLock(() => this.runSerializedUpdate(async () => {
// Support both full UUID and short prefix (8+ hex chars), same as delete()
const uuidRegex =
/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
Expand Down Expand Up @@ -1016,25 +1076,22 @@ export class MemoryStore {
}

return updated;
});
}));
}

private async runSerializedUpdate<T>(action: () => Promise<T>): Promise<T> {
// Tail-reset: no infinite promise chain. Uses a boolean flag + FIFO queue.
if (!this._updating) {
this._updating = true;
try {
return await action();
} finally {
this._updating = false;
const next = this._waitQueue.shift();
if (next) next();
}
} else {
// Already busy — enqueue and wait for the current owner to signal done.
return new Promise<void>((resolve) => {
this._waitQueue.push(resolve);
}).then(() => this.runSerializedUpdate(action)) as Promise<T>;
const previous = this.updateQueue;
let release: (() => void) | undefined;
const lock = new Promise<void>((resolve) => {
release = resolve;
});
this.updateQueue = previous.then(() => lock);

await previous;
try {
return await action();
} finally {
release?.();
}
}

Expand Down
148 changes: 148 additions & 0 deletions test/lock-stress-test.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/**
* 高並發鎖壓力測試 v2(改良版)
* 測試重點:
* 1. 高並發寫入不會 crash(無 ECOMPROMISED)
* 2. 重負載下長等待不會導致 Gateway 崩潰
* 3. 資料完整性
*/
import { describe, it, before, after } from "node:test";
import assert from "node:assert/strict";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import jitiFactory from "jiti";

const jiti = jitiFactory(import.meta.url, { interopDefault: true });
const { MemoryStore } = jiti("../src/store.ts");

// Scratch directory that holds the per-test database folders used in this file.
let workDir;

// Create a fresh temp directory once, before any test runs.
before(() => {
  const prefix = join(tmpdir(), "memory-lancedb-pro-stress-v2-");
  workDir = mkdtempSync(prefix);
});

// Remove the scratch directory afterwards; nothing to do if it was never created.
after(() => {
  if (!workDir) return;
  rmSync(workDir, { recursive: true, force: true });
});

describe("高並發鎖壓力測試 v2", { concurrency: 1 }, () => {
// Test 1: moderate concurrency (3 writers at once) must not crash.
// Three workers each perform five sequential writes against the same store;
// no write may fail with ECOMPROMISED and at least 7 of the 15 must succeed.
it("中等並發寫入(3行程×5次)無 ECOMPROMISED crash", async () => {
  const WORKER_COUNT = 3;
  const OPS_PER_WORKER = 5;
  const store = new MemoryStore({ dbPath: join(workDir, "medium-concurrent"), vectorDim: 3 });
  // Every failed write, across all workers, for the diagnostic dump below.
  const failures = [];

  const runWorker = async (workerId) => {
    const outcomes = [];
    for (let i = 0; i < OPS_PER_WORKER; i++) {
      try {
        const saved = await store.store({
          text: `w${workerId}-${i}`,
          vector: [workerId * 10 + i, 0, 0],
          category: "stress",
          scope: "global",
          importance: 0.5,
          metadata: "{}",
        });
        outcomes.push({ ok: true, id: saved.id });
      } catch (err) {
        // The lock error may surface either as an error code or inside the message.
        const isEcomp = err.code === "ECOMPROMISED" || (err.message && err.message.includes("ECOMPROMISED"));
        failures.push({ workerId, i, code: err.code, msg: err.message, isEcomp });
        outcomes.push({ ok: false, error: err.message, isEcomp });
      }
    }
    return outcomes;
  };

  // Start all workers at the same time (ids 0, 1, 2).
  const perWorker = await Promise.all(
    Array.from({ length: WORKER_COUNT }, (_, workerId) => runWorker(workerId)),
  );
  const outcomes = perWorker.flat();
  const successCount = outcomes.filter((o) => o.ok).length;
  const ecompCount = outcomes.filter((o) => o.isEcomp).length;

  console.log(`\n [中等並發] 總操作: ${outcomes.length}, 成功: ${successCount}, ECOMPROMISED: ${ecompCount}`);
  for (const f of failures) {
    console.log(` Worker${f.workerId} op${f.i}: ${f.code || "?"} - ${f.msg}`);
  }

  // Core check: zero ECOMPROMISED failures.
  assert.strictEqual(ecompCount, 0, `不應有 ECOMPROMISED crash,但發生了 ${ecompCount} 次`);
  // Lower bound on successful writes.
  assert.ok(successCount >= 7, "起碼要有 7/15 成功");
});

// Test 2: genuinely concurrent requests — two writes are issued together via
// Promise.all so they contend for the lock. Both are expected to succeed:
// one right away, the other once the lock has been released.
it("並發寫入時兩個都成功(retry 機制正常運作)", async () => {
  const store = new MemoryStore({ dbPath: join(workDir, "concurrent-retry"), vectorDim: 3 });

  // Both entries share everything except their text and vector.
  const makeEntry = (text, vector) => ({
    text,
    vector,
    category: "fact",
    scope: "global",
    importance: 0.8,
    metadata: "{}",
  });

  // Issue both store requests before awaiting either.
  const startedAt = Date.now();
  const pending = [
    store.store(makeEntry("concurrent-1", [1, 0, 0])),
    store.store(makeEntry("concurrent-2", [0, 1, 0])),
  ];
  const [first, second] = await Promise.all(pending);
  const elapsed = Date.now() - startedAt;

  console.log(`\n [並發競爭] 耗時: ${elapsed}ms, id1=${first.id.slice(0, 8)}, id2=${second.id.slice(0, 8)}`);
  // F3 fix: explicitly assert that both requests succeeded.
  assert.ok(first.id, "第一個請求應該成功(不死、不拋 ECOMPROMISED)");
  assert.ok(second.id, "第二個請求應該成功(retry 後成功,不拋 ECOMPROMISED)");
  assert.ok(first.id !== second.id, "兩個請求應該產生不同 ID");
  // EF4 fix: explicitly assert the elapsed time stayed within a sane bound.
  assert.ok(elapsed < 30000, `並發鎖競爭應在合理時間內完成(< 30s),實際 ${elapsed}ms`);
});

// Test 3: data integrity after a batch of sequential writes. The writes are
// deliberately sequential, not concurrent. Every write must yield a unique id
// and the full set must be readable back through list().
it("批量寫入後所有資料都能正確讀回", async () => {
  const store = new MemoryStore({ dbPath: join(workDir, "bulk-integrity"), vectorDim: 3 });
  const COUNT = 20;
  const TIMEOUT_MS = 60_000; // EF4 fix: 60 s safety ceiling per write so CI cannot hang.

  const entries = [];
  for (let i = 0; i < COUNT; i++) {
    const opStart = Date.now();

    // Guard timer for this single write. Its handle is kept so it can be
    // cancelled once the race settles; an uncancelled timer would stay pending
    // for the full TIMEOUT_MS and keep the event loop alive after the test.
    let guardTimer;
    const guard = new Promise((_, reject) => {
      guardTimer = setTimeout(
        () => reject(new Error(`bulk[${i}] timeout after ${TIMEOUT_MS}ms`)),
        TIMEOUT_MS,
      );
    });

    let r;
    try {
      r = await Promise.race([
        store.store({
          text: `bulk-${i}`,
          vector: [i * 0.1, i * 0.2, i * 0.3],
          category: "fact",
          scope: "global",
          importance: 0.6,
          metadata: "{}",
        }),
        guard,
      ]);
    } finally {
      // Always cancel the guard, whether the write won, lost, or threw.
      clearTimeout(guardTimer);
    }

    const opElapsed = Date.now() - opStart;
    assert.ok(opElapsed < TIMEOUT_MS, `bulk[${i}] 單次寫入應在 ${TIMEOUT_MS}ms 內,實際 ${opElapsed}ms`);
    entries.push(r);
  }

  // Every write must have produced a distinct id.
  const ids = entries.map((e) => e.id);
  const uniqueIds = new Set(ids);
  assert.strictEqual(uniqueIds.size, COUNT, `應該有 ${COUNT} 個唯一 ID`);

  // All records must be readable back.
  const all = await store.list(undefined, undefined, 100, 0);
  assert.strictEqual(all.length, COUNT, `list 應該返回 ${COUNT} 筆記錄`);
});
});
Loading