Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions src/cli/repo_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ import {
import { S3Lock } from "../infrastructure/persistence/s3_lock.ts";
import { S3Client } from "../infrastructure/persistence/s3_client.ts";
import { FileLock } from "../infrastructure/persistence/file_lock.ts";
import type { DistributedLock } from "../domain/datastore/distributed_lock.ts";
import {
type DistributedLock,
LockTimeoutError,
} from "../domain/datastore/distributed_lock.ts";
import {
type CustomDatastoreConfig,
type DatastoreConfig,
Expand Down Expand Up @@ -582,7 +585,11 @@ export async function acquireModelLocks(
"Global lock held by {holder} — waiting for structural command to finish",
{ holder: globalInfo.holder },
);
// Poll until the global lock is released or expires (stale lock)
// Poll until the global lock is released or expires (stale lock).
// Timeout after 2x TTL to prevent indefinite hangs if staleness
// detection fails (e.g. clock skew, S3 consistency issues).
const globalWaitStart = Date.now();
const globalMaxWaitMs = (globalInfo.ttlMs ?? 30_000) * 2;
while (true) {
await new Promise((resolve) => setTimeout(resolve, 1_000));
const info = await globalLock.inspect();
Expand All @@ -596,6 +603,15 @@ export async function acquireModelLocks(
);
break;
}
// Hard timeout to prevent indefinite hangs
const globalElapsed = Date.now() - globalWaitStart;
if (globalElapsed >= globalMaxWaitMs) {
throw new LockTimeoutError(
".datastore.lock",
info,
globalElapsed,
);
}
}
logger.info`Global lock released, proceeding with per-model locks`;
}
Expand Down Expand Up @@ -654,7 +670,9 @@ export async function acquireModelLocks(
}
lockKeys.length = 0;

// Wait for global lock to be released
// Wait for global lock to be released (with timeout)
const retryWaitStart = Date.now();
const retryMaxWaitMs = (postAcquireGlobalInfo.ttlMs ?? 30_000) * 2;
while (true) {
await new Promise((resolve) => setTimeout(resolve, 1_000));
const info = await globalLock.inspect();
Expand All @@ -667,6 +685,14 @@ export async function acquireModelLocks(
);
break;
}
const retryElapsed = Date.now() - retryWaitStart;
if (retryElapsed >= retryMaxWaitMs) {
throw new LockTimeoutError(
".datastore.lock",
info,
retryElapsed,
);
}
}

// Restart the entire per-model lock acquisition from scratch
Expand Down
39 changes: 26 additions & 13 deletions src/infrastructure/persistence/file_lock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import type {
LockOptions,
} from "../../domain/datastore/distributed_lock.ts";
import { LockTimeoutError } from "../../domain/datastore/distributed_lock.ts";
import { getSwampLogger } from "../logging/logger.ts";

const DEFAULT_TTL_MS = 30_000;
const DEFAULT_RETRY_INTERVAL_MS = 1_000;
Expand Down Expand Up @@ -88,6 +89,17 @@ export class FileLock implements DistributedLock {
const nonce = crypto.randomUUID();

while (true) {
// Check timeout on every iteration — including retries after stale lock cleanup
const elapsed = Date.now() - startTime;
if (elapsed >= this.maxWaitMs) {
const existing = await this.readLockFile();
throw new LockTimeoutError(
this.lockPath,
existing,
elapsed,
);
}

const info = buildLockInfo(this.ttlMs, nonce);
const content = JSON.stringify(info, null, 2);

Expand Down Expand Up @@ -122,20 +134,10 @@ export class FileLock implements DistributedLock {
} catch {
// Another process may have already cleaned it up
}
continue; // Retry atomic create
continue; // Retry atomic create (timeout checked at top of loop)
}
}

// Check timeout
const elapsed = Date.now() - startTime;
if (elapsed >= this.maxWaitMs) {
throw new LockTimeoutError(
this.lockPath,
existing,
elapsed,
);
}

// Wait and retry
await new Promise((resolve) => setTimeout(resolve, this.retryIntervalMs));
}
Expand All @@ -150,8 +152,19 @@ export class FileLock implements DistributedLock {

try {
await Deno.remove(this.lockPath);
} catch {
// Best-effort — file may already be gone
} catch (error) {
if (!(error instanceof Deno.errors.NotFound)) {
// NotFound is expected (file already gone), but other errors
// indicate a real problem — log so operators can investigate
const logger = getSwampLogger(["datastore", "lock"]);
logger.warn(
"Failed to delete lock {path} during release: {error}",
{
path: this.lockPath,
error: error instanceof Error ? error.message : String(error),
},
);
}
}
}

Expand Down
26 changes: 18 additions & 8 deletions src/infrastructure/persistence/s3_lock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import type {
} from "../../domain/datastore/distributed_lock.ts";
import { LockTimeoutError } from "../../domain/datastore/distributed_lock.ts";
import type { S3Client } from "./s3_client.ts";
import { getSwampLogger } from "../logging/logger.ts";

const DEFAULT_TTL_MS = 30_000;
const DEFAULT_RETRY_INTERVAL_MS = 1_000;
Expand Down Expand Up @@ -96,6 +97,13 @@ export class S3Lock implements DistributedLock {
const nonce = crypto.randomUUID();

while (true) {
// Check timeout on every iteration — including retries after stale lock cleanup
const elapsed = Date.now() - startTime;
if (elapsed >= this.maxWaitMs) {
const existing = await this.readLock();
throw new LockTimeoutError(this.lockKey, existing, elapsed);
}

const info = buildLockInfo(this.ttlMs, nonce);
const body = encodeLockInfo(info);

Expand All @@ -122,17 +130,11 @@ export class S3Lock implements DistributedLock {
} catch {
// Another process may have already cleaned it up
}
continue; // Retry conditional write
continue; // Retry conditional write (timeout checked at top of loop)
}
}
}

// Check timeout
const elapsed = Date.now() - startTime;
if (elapsed >= this.maxWaitMs) {
throw new LockTimeoutError(this.lockKey, existing, elapsed);
}

// Wait and retry
await new Promise((resolve) => setTimeout(resolve, this.retryIntervalMs));
}
Expand All @@ -147,8 +149,16 @@ export class S3Lock implements DistributedLock {

try {
await this.s3.deleteObject(this.lockKey);
} catch {
} catch (error) {
// Best-effort release — if S3 is unreachable, the lock expires via TTL
const logger = getSwampLogger(["datastore", "lock"]);
logger.warn(
"Failed to delete lock {key} during release: {error}",
{
key: this.lockKey,
error: error instanceof Error ? error.message : String(error),
},
);
}
}

Expand Down
103 changes: 103 additions & 0 deletions src/infrastructure/persistence/s3_lock_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -247,3 +247,106 @@ Deno.test("S3Lock - custom lock key", async () => {

await lock.release();
});

Deno.test("S3Lock - times out when stale lock cannot be deleted", async () => {
  const mock = createMockS3Client();

  // Seed the store with an expired lock entry: acquired two minutes ago
  // with only a 5s TTL, so the acquirer will classify it as stale.
  const expired: LockInfo = {
    holder: "stale@host",
    hostname: "host",
    pid: 99999,
    acquiredAt: new Date(Date.now() - 120_000).toISOString(),
    ttlMs: 5000,
  };
  const payload = new TextEncoder().encode(JSON.stringify(expired, null, 2));
  mock.storage.set(".datastore.lock", payload);

  const mutable = mock as unknown as Record<string, unknown>;

  // headObject reports an ancient lastModified for the lock key;
  // all other keys fall through to the real mock implementation.
  const fallbackHead = mock.headObject.bind(mock);
  mutable.headObject = (key: string) =>
    key === ".datastore.lock"
      ? Promise.resolve({
          exists: true,
          size: payload.length,
          lastModified: new Date(Date.now() - 120_000),
        })
      : fallbackHead(key);

  // Every delete attempt rejects — models a persistently failing S3 delete,
  // so the stale lock can never be cleaned up.
  mutable.deleteObject = () =>
    Promise.reject(new Error("Simulated S3 delete failure"));

  const lock = new S3Lock(mock, {
    ttlMs: 5000,
    retryIntervalMs: 10,
    maxWaitMs: 200,
  });

  // acquire() must give up with LockTimeoutError instead of spinning forever.
  await assertRejects(() => lock.acquire(), LockTimeoutError);
});

Deno.test("S3Lock - timeout check fires even during stale lock retry loop", async () => {
  const mock = createMockS3Client();

  // Seed the store with an expired lock (acquired two minutes ago, 100ms TTL).
  const expired: LockInfo = {
    holder: "stale@host",
    hostname: "host",
    pid: 99999,
    acquiredAt: new Date(Date.now() - 120_000).toISOString(),
    ttlMs: 100,
  };
  const payload = new TextEncoder().encode(JSON.stringify(expired, null, 2));
  mock.storage.set(".datastore.lock", payload);

  const mutable = mock as unknown as Record<string, unknown>;

  // The lock key always reports an ancient lastModified; any other key
  // simply does not exist.
  mutable.headObject = (key: string) =>
    key === ".datastore.lock"
      ? Promise.resolve({
          exists: true,
          size: payload.length,
          lastModified: new Date(Date.now() - 120_000),
        })
      : Promise.resolve({ exists: false });

  // Deletes "succeed" without removing anything from storage — simulates a
  // delete marker under S3 versioning that never actually clears the object.
  mutable.deleteObject = () => Promise.resolve();

  // Conditional puts never win (lock "still exists" due to versioning), so
  // the acquire loop would spin forever unless the timeout check runs on
  // every iteration.
  mutable.putObjectConditional = () => Promise.resolve(false);

  const lock = new S3Lock(mock, {
    ttlMs: 100,
    retryIntervalMs: 10,
    maxWaitMs: 200,
  });

  const begin = Date.now();
  await assertRejects(() => lock.acquire(), LockTimeoutError);
  const took = Date.now() - begin;

  // Must time out near maxWaitMs rather than hang indefinitely.
  assertEquals(
    took < 1000,
    true,
    `Expected timeout within 1s, took ${took}ms`,
  );
});
Loading