diff --git a/src/cli/repo_context.ts b/src/cli/repo_context.ts index 75d0e612..14cb5139 100644 --- a/src/cli/repo_context.ts +++ b/src/cli/repo_context.ts @@ -54,7 +54,10 @@ import { import { S3Lock } from "../infrastructure/persistence/s3_lock.ts"; import { S3Client } from "../infrastructure/persistence/s3_client.ts"; import { FileLock } from "../infrastructure/persistence/file_lock.ts"; -import type { DistributedLock } from "../domain/datastore/distributed_lock.ts"; +import { + type DistributedLock, + LockTimeoutError, +} from "../domain/datastore/distributed_lock.ts"; import { type CustomDatastoreConfig, type DatastoreConfig, @@ -582,7 +585,11 @@ export async function acquireModelLocks( "Global lock held by {holder} — waiting for structural command to finish", { holder: globalInfo.holder }, ); - // Poll until the global lock is released or expires (stale lock) + // Poll until the global lock is released or expires (stale lock). + // Timeout after 2x TTL to prevent indefinite hangs if staleness + // detection fails (e.g. clock skew, S3 consistency issues). + const globalWaitStart = Date.now(); + const globalMaxWaitMs = (globalInfo.ttlMs ?? 30_000) * 2; while (true) { await new Promise((resolve) => setTimeout(resolve, 1_000)); const info = await globalLock.inspect(); @@ -596,6 +603,15 @@ export async function acquireModelLocks( ); break; } + // Hard timeout to prevent indefinite hangs + const globalElapsed = Date.now() - globalWaitStart; + if (globalElapsed >= globalMaxWaitMs) { + throw new LockTimeoutError( + ".datastore.lock", + info, + globalElapsed, + ); + } } logger.info`Global lock released, proceeding with per-model locks`; } @@ -654,7 +670,9 @@ export async function acquireModelLocks( } lockKeys.length = 0; - // Wait for global lock to be released + // Wait for global lock to be released (with timeout) + const retryWaitStart = Date.now(); + const retryMaxWaitMs = (postAcquireGlobalInfo.ttlMs ?? 
30_000) * 2; while (true) { await new Promise((resolve) => setTimeout(resolve, 1_000)); const info = await globalLock.inspect(); @@ -667,6 +685,14 @@ export async function acquireModelLocks( ); break; } + const retryElapsed = Date.now() - retryWaitStart; + if (retryElapsed >= retryMaxWaitMs) { + throw new LockTimeoutError( + ".datastore.lock", + info, + retryElapsed, + ); + } } // Restart the entire per-model lock acquisition from scratch diff --git a/src/infrastructure/persistence/file_lock.ts b/src/infrastructure/persistence/file_lock.ts index 78c23efe..c597788e 100644 --- a/src/infrastructure/persistence/file_lock.ts +++ b/src/infrastructure/persistence/file_lock.ts @@ -34,6 +34,7 @@ import type { LockOptions, } from "../../domain/datastore/distributed_lock.ts"; import { LockTimeoutError } from "../../domain/datastore/distributed_lock.ts"; +import { getSwampLogger } from "../logging/logger.ts"; const DEFAULT_TTL_MS = 30_000; const DEFAULT_RETRY_INTERVAL_MS = 1_000; @@ -88,6 +89,17 @@ export class FileLock implements DistributedLock { const nonce = crypto.randomUUID(); while (true) { + // Check timeout on every iteration — including retries after stale lock cleanup + const elapsed = Date.now() - startTime; + if (elapsed >= this.maxWaitMs) { + const existing = await this.readLockFile(); + throw new LockTimeoutError( + this.lockPath, + existing, + elapsed, + ); + } + const info = buildLockInfo(this.ttlMs, nonce); const content = JSON.stringify(info, null, 2); @@ -122,20 +134,10 @@ export class FileLock implements DistributedLock { } catch { // Another process may have already cleaned it up } - continue; // Retry atomic create + continue; // Retry atomic create (timeout checked at top of loop) } } - // Check timeout - const elapsed = Date.now() - startTime; - if (elapsed >= this.maxWaitMs) { - throw new LockTimeoutError( - this.lockPath, - existing, - elapsed, - ); - } - // Wait and retry await new Promise((resolve) => setTimeout(resolve, this.retryIntervalMs)); } 
@@ -150,8 +152,19 @@ export class FileLock implements DistributedLock { try { await Deno.remove(this.lockPath); - } catch { - // Best-effort — file may already be gone + } catch (error) { + if (!(error instanceof Deno.errors.NotFound)) { + // NotFound is expected (file already gone), but other errors + // indicate a real problem — log so operators can investigate + const logger = getSwampLogger(["datastore", "lock"]); + logger.warn( + "Failed to delete lock {path} during release: {error}", + { + path: this.lockPath, + error: error instanceof Error ? error.message : String(error), + }, + ); + } } } diff --git a/src/infrastructure/persistence/s3_lock.ts b/src/infrastructure/persistence/s3_lock.ts index 86bb40b7..f1696280 100644 --- a/src/infrastructure/persistence/s3_lock.ts +++ b/src/infrastructure/persistence/s3_lock.ts @@ -33,6 +33,7 @@ import type { } from "../../domain/datastore/distributed_lock.ts"; import { LockTimeoutError } from "../../domain/datastore/distributed_lock.ts"; import type { S3Client } from "./s3_client.ts"; +import { getSwampLogger } from "../logging/logger.ts"; const DEFAULT_TTL_MS = 30_000; const DEFAULT_RETRY_INTERVAL_MS = 1_000; @@ -96,6 +97,13 @@ export class S3Lock implements DistributedLock { const nonce = crypto.randomUUID(); while (true) { + // Check timeout on every iteration — including retries after stale lock cleanup + const elapsed = Date.now() - startTime; + if (elapsed >= this.maxWaitMs) { + const existing = await this.readLock(); + throw new LockTimeoutError(this.lockKey, existing, elapsed); + } + const info = buildLockInfo(this.ttlMs, nonce); const body = encodeLockInfo(info); @@ -122,17 +130,11 @@ export class S3Lock implements DistributedLock { } catch { // Another process may have already cleaned it up } - continue; // Retry conditional write + continue; // Retry conditional write (timeout checked at top of loop) } } } - // Check timeout - const elapsed = Date.now() - startTime; - if (elapsed >= this.maxWaitMs) { - throw 
new LockTimeoutError(this.lockKey, existing, elapsed); - } - // Wait and retry await new Promise((resolve) => setTimeout(resolve, this.retryIntervalMs)); } @@ -147,8 +149,16 @@ export class S3Lock implements DistributedLock { try { await this.s3.deleteObject(this.lockKey); - } catch { + } catch (error) { // Best-effort release — if S3 is unreachable, the lock expires via TTL + const logger = getSwampLogger(["datastore", "lock"]); + logger.warn( + "Failed to delete lock {key} during release: {error}", + { + key: this.lockKey, + error: error instanceof Error ? error.message : String(error), + }, + ); } } diff --git a/src/infrastructure/persistence/s3_lock_test.ts b/src/infrastructure/persistence/s3_lock_test.ts index 00326fa9..d3f35919 100644 --- a/src/infrastructure/persistence/s3_lock_test.ts +++ b/src/infrastructure/persistence/s3_lock_test.ts @@ -247,3 +247,106 @@ Deno.test("S3Lock - custom lock key", async () => { await lock.release(); }); + +Deno.test("S3Lock - times out when stale lock cannot be deleted", async () => { + const mock = createMockS3Client(); + + // Place a stale lock + const staleLock: LockInfo = { + holder: "stale@host", + hostname: "host", + pid: 99999, + acquiredAt: new Date(Date.now() - 120_000).toISOString(), + ttlMs: 5000, + }; + const body = new TextEncoder().encode(JSON.stringify(staleLock, null, 2)); + mock.storage.set(".datastore.lock", body); + + // Override headObject to return stale lastModified + const originalHead = mock.headObject.bind(mock); + (mock as unknown as Record<string, unknown>).headObject = (key: string) => { + if (key === ".datastore.lock") { + return Promise.resolve({ + exists: true, + size: body.length, + lastModified: new Date(Date.now() - 120_000), + }); + } + return originalHead(key); + }; + + // Override deleteObject to always fail — simulates persistent S3 delete failure + (mock as unknown as Record<string, unknown>).deleteObject = () => { + return Promise.reject(new Error("Simulated S3 delete failure")); + }; + + const lock = new S3Lock(mock, { 
+ ttlMs: 5000, + retryIntervalMs: 10, + maxWaitMs: 200, + }); + + await assertRejects( + () => lock.acquire(), + LockTimeoutError, + ); +}); + +Deno.test("S3Lock - timeout check fires even during stale lock retry loop", async () => { + const mock = createMockS3Client(); + + // Place a stale lock + const staleLock: LockInfo = { + holder: "stale@host", + hostname: "host", + pid: 99999, + acquiredAt: new Date(Date.now() - 120_000).toISOString(), + ttlMs: 100, + }; + const body = new TextEncoder().encode(JSON.stringify(staleLock, null, 2)); + mock.storage.set(".datastore.lock", body); + + // Override headObject to return stale lastModified + (mock as unknown as Record<string, unknown>).headObject = (key: string) => { + if (key === ".datastore.lock") { + return Promise.resolve({ + exists: true, + size: body.length, + lastModified: new Date(Date.now() - 120_000), + }); + } + return Promise.resolve({ exists: false }); + }; + + // deleteObject "succeeds" but lock reappears (simulates S3 versioning issue) + (mock as unknown as Record<string, unknown>).deleteObject = () => { + // Don't actually remove from storage — simulates delete marker not + // preventing conditional put from failing + return Promise.resolve(); + }; + + // putObjectConditional always fails (lock "still exists" due to versioning) + (mock as unknown as Record<string, unknown>).putObjectConditional = () => { + return Promise.resolve(false); + }; + + const lock = new S3Lock(mock, { + ttlMs: 100, + retryIntervalMs: 10, + maxWaitMs: 200, + }); + + const start = Date.now(); + await assertRejects( + () => lock.acquire(), + LockTimeoutError, + ); + const elapsed = Date.now() - start; + + // Should timeout within a reasonable margin of maxWaitMs, not hang forever + assertEquals( + elapsed < 1000, + true, + `Expected timeout within 1s, took ${elapsed}ms`, + ); +});