diff --git a/packages/relay/src/queue.ts b/packages/relay/src/queue.ts index e681b7e..3b8543b 100644 --- a/packages/relay/src/queue.ts +++ b/packages/relay/src/queue.ts @@ -588,7 +588,7 @@ export async function defaultCountWorkingBoxes( sinceCreateMs: msSince(b.createdAt), }; }); - let count = countWorkingSlots(entries, idleGraceMs); + const count = countWorkingSlots(entries, idleGraceMs); let jobs: QueueJob[]; try { @@ -596,49 +596,85 @@ export async function defaultCountWorkingBoxes( } catch { return count; } + return count + countInFlightCreateJobs(jobs, registeredIds); +} + +/** + * Count `running` queue jobs whose box isn't yet accounted for elsewhere — an + * in-flight create that occupies a concurrency slot before its box exists. + * Shared by both gates: the caller passes the set of box ids it already counted + * (relay registry ids for the working gate, state.json ids for the running + * gate), so a job is added only while its box is invisible to that source. A + * job whose worker pid is dead is skipped (its box, if any, is counted directly; + * the orphan-recovery sweep flips the stale manifest to `failed`). + */ +export function countInFlightCreateJobs(jobs: QueueJob[], accountedBoxIds: Set): number { + let n = 0; for (const j of jobs) { if (j.status !== 'running') continue; - if (j.boxId && registeredIds.has(j.boxId)) continue; // counted via its box entry + if (j.boxId && accountedBoxIds.has(j.boxId)) continue; // counted via its box if (typeof j.pid === 'number' && !processAlive(j.pid)) continue; // dead worker - count += 1; // in-flight box creation — occupies a slot until it registers + n += 1; } - return count; + return n; } const RUNNING_COUNT_CACHE_MS = 3_000; -let runningCountCache: { value: number; expiresAt: number } | null = null; +let boxStateCache: { boxCount: number; stateIds: Set; expiresAt: number } | null = null; /** - * Cross-provider count of boxes whose runtime state is `running`. Reads - * `~/.agentbox/state.json`, then per-provider: + * Cross-provider count of boxes whose runtime state is `running`, plus the + * in-flight queue jobs whose box isn't in `state.json` yet. + * + * The box-state portion reads `~/.agentbox/state.json`, then per-provider: * - docker: `docker inspect --format {{.State.Status}}` shell-out (cheap). * - non-docker (daytona/hetzner/…): counted as running. Cloud providers * don't expose a cheap synchronous probe; the autopause/queue loops would * pay an SDK round-trip per box per tick otherwise. Tracked as a v1 * limitation: a paused cloud box still counts against the slot cap until * it's destroyed. Acceptable because cloud pause is rare today. - * Result cached for 3s — multiple slot decisions in one tick share the count. + * That portion is cached for 3s so multiple slot decisions in one tick share it. + * + * The in-flight term is recomputed every call (a cheap `loadQueue` fs read), NOT + * cached: a job the scheduler just flipped to `running` hasn't created its box + * yet (~25s on cloud, image pull on docker), so it's absent from `state.json`. + * Without counting it, the per-tick "start as many as fit" loop would re-select + * the same free slot for the next job and over-start past `--max-running`. */ export async function defaultCountRunningBoxes(): Promise { + const { boxCount, stateIds } = await cachedBoxState(); + let jobs: QueueJob[]; + try { + jobs = await loadQueue(); + } catch { + return boxCount; + } + return boxCount + countInFlightCreateJobs(jobs, stateIds); +} + +async function cachedBoxState(): Promise<{ boxCount: number; stateIds: Set }> { const now = Date.now(); - if (runningCountCache && runningCountCache.expiresAt > now) { - return runningCountCache.value; + if (boxStateCache && boxStateCache.expiresAt > now) { + return { boxCount: boxStateCache.boxCount, stateIds: boxStateCache.stateIds }; } - const value = await uncachedCountRunningBoxes(); - runningCountCache = { value, expiresAt: now + RUNNING_COUNT_CACHE_MS }; - return value; + const fresh = await uncachedBoxStateCount(); + boxStateCache = { ...fresh, expiresAt: now + RUNNING_COUNT_CACHE_MS }; + return fresh; } -async function uncachedCountRunningBoxes(): Promise { +/** Running-box count from `state.json` plus the set of those boxes' ids (for + * in-flight-job dedup). Read errors / no boxes → zero and an empty set. */ +async function uncachedBoxStateCount(): Promise<{ boxCount: number; stateIds: Set }> { let boxes: BoxRecord[]; try { boxes = (await readState(STATE_FILE)).boxes; } catch { - return 0; + return { boxCount: 0, stateIds: new Set() }; } - if (boxes.length === 0) return 0; + const stateIds = new Set(boxes.map((b) => b.id)); + if (boxes.length === 0) return { boxCount: 0, stateIds }; - let count = 0; + let boxCount = 0; const dockerBoxes: BoxRecord[] = []; for (const b of boxes) { const provider = b.provider ?? 'docker'; @@ -647,16 +683,16 @@ async function uncachedCountRunningBoxes(): Promise { } else { // Optimistic: count cloud boxes as running. See note on // {@link defaultCountRunningBoxes} for why. - count += 1; + boxCount += 1; } } if (dockerBoxes.length > 0) { const states = await Promise.all(dockerBoxes.map((b) => inspectDockerState(b.container))); for (const s of states) { - if (s === 'running') count += 1; + if (s === 'running') boxCount += 1; } } - return count; + return { boxCount, stateIds }; } function inspectDockerState(containerName: string): Promise<'running' | 'other'> { diff --git a/packages/relay/test/queue.test.ts b/packages/relay/test/queue.test.ts index f4fd29b..baf62fb 100644 --- a/packages/relay/test/queue.test.ts +++ b/packages/relay/test/queue.test.ts @@ -2,6 +2,7 @@ import { readFile, rm, writeFile } from 'node:fs/promises'; import { join } from 'node:path'; import { describe, expect, it } from 'vitest'; import { + countInFlightCreateJobs, countWorkingSlots, defaultCountWorkingBoxes, loadQueue, @@ -276,6 +277,49 @@ describe('defaultCountWorkingBoxes', () => { }); }); +describe('countInFlightCreateJobs', () => { + // A pid that's essentially never alive — process.kill(pid, 0) → ESRCH. + const DEAD_PID = 2_147_483_646; + + it('counts a running job whose box is not yet accounted for', () => { + // No boxId yet (worker hasn't created the box) → in-flight, occupies a slot. + const jobs = [job({ id: 'a', status: 'running', startedAt: '2024-01-01T00:00:01.000Z' })]; + expect(countInFlightCreateJobs(jobs, new Set())).toBe(1); + }); + + it('does not count a running job once its box id is accounted for', () => { + // boxId present AND in the accounted set → counted via its box, not here. + const jobs = [job({ id: 'a', status: 'running', boxId: 'box1' })]; + expect(countInFlightCreateJobs(jobs, new Set(['box1']))).toBe(0); + // Same job, box NOT in the set → still in-flight, counted. + expect(countInFlightCreateJobs(jobs, new Set(['other']))).toBe(1); + }); + + it('ignores non-running jobs', () => { + const jobs = [ + job({ id: 'q', status: 'queued' }), + job({ id: 'd', status: 'done' }), + job({ id: 'f', status: 'failed' }), + ]; + expect(countInFlightCreateJobs(jobs, new Set())).toBe(0); + }); + + it('skips a running job whose worker pid is dead', () => { + const jobs = [job({ id: 'a', status: 'running', pid: DEAD_PID })]; + expect(countInFlightCreateJobs(jobs, new Set())).toBe(0); + }); + + it('sums multiple in-flight jobs', () => { + const jobs = [ + job({ id: 'a', status: 'running' }), + job({ id: 'b', status: 'running', boxId: 'reg', pid: undefined }), + job({ id: 'c', status: 'running' }), + ]; + // 'b' is deduped via its box; 'a' and 'c' are in-flight. + expect(countInFlightCreateJobs(jobs, new Set(['reg']))).toBe(2); + }); +}); + describe('startQueueLoop working-agent gate', () => { const cfg = (over: Partial): QueueConfig => ({ enabled: true,