Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 56 additions & 20 deletions packages/relay/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -588,57 +588,93 @@ export async function defaultCountWorkingBoxes(
sinceCreateMs: msSince(b.createdAt),
};
});
let count = countWorkingSlots(entries, idleGraceMs);
const count = countWorkingSlots(entries, idleGraceMs);

let jobs: QueueJob[];
try {
jobs = await loadQueue();
} catch {
return count;
}
return count + countInFlightCreateJobs(jobs, registeredIds);
}

/**
* Count `running` queue jobs whose box isn't yet accounted for elsewhere — an
* in-flight create that occupies a concurrency slot before its box exists.
* Shared by both gates: the caller passes the set of box ids it already counted
* (relay registry ids for the working gate, state.json ids for the running
* gate), so a job is added only while its box is invisible to that source. A
* job whose worker pid is dead is skipped (its box, if any, is counted directly;
* the orphan-recovery sweep flips the stale manifest to `failed`).
*/
export function countInFlightCreateJobs(jobs: QueueJob[], accountedBoxIds: Set<string>): number {
let n = 0;
for (const j of jobs) {
if (j.status !== 'running') continue;
if (j.boxId && registeredIds.has(j.boxId)) continue; // counted via its box entry
if (j.boxId && accountedBoxIds.has(j.boxId)) continue; // counted via its box
if (typeof j.pid === 'number' && !processAlive(j.pid)) continue; // dead worker
count += 1; // in-flight box creation — occupies a slot until it registers
n += 1;
}
return count;
return n;
}

const RUNNING_COUNT_CACHE_MS = 3_000;
let runningCountCache: { value: number; expiresAt: number } | null = null;
let boxStateCache: { boxCount: number; stateIds: Set<string>; expiresAt: number } | null = null;

/**
* Cross-provider count of boxes whose runtime state is `running`. Reads
* `~/.agentbox/state.json`, then per-provider:
* Cross-provider count of boxes whose runtime state is `running`, plus the
* in-flight queue jobs whose box isn't in `state.json` yet.
*
* The box-state portion reads `~/.agentbox/state.json`, then per-provider:
* - docker: `docker inspect --format {{.State.Status}}` shell-out (cheap).
* - non-docker (daytona/hetzner/…): counted as running. Cloud providers
* don't expose a cheap synchronous probe; the autopause/queue loops would
* pay an SDK round-trip per box per tick otherwise. Tracked as a v1
* limitation: a paused cloud box still counts against the slot cap until
* it's destroyed. Acceptable because cloud pause is rare today.
* Result cached for 3s — multiple slot decisions in one tick share the count.
* That portion is cached for 3s so multiple slot decisions in one tick share it.
*
* The in-flight term is recomputed every call (a cheap `loadQueue` fs read), NOT
* cached: a job the scheduler just flipped to `running` hasn't created its box
* yet (~25s on cloud, image pull on docker), so it's absent from `state.json`.
* Without counting it, the per-tick "start as many as fit" loop would re-select
* the same free slot for the next job and over-start past `--max-running`.
*/
export async function defaultCountRunningBoxes(): Promise<number> {
const { boxCount, stateIds } = await cachedBoxState();
let jobs: QueueJob[];
try {
jobs = await loadQueue();
} catch {
return boxCount;
}
return boxCount + countInFlightCreateJobs(jobs, stateIds);
}

async function cachedBoxState(): Promise<{ boxCount: number; stateIds: Set<string> }> {
const now = Date.now();
if (runningCountCache && runningCountCache.expiresAt > now) {
return runningCountCache.value;
if (boxStateCache && boxStateCache.expiresAt > now) {
return { boxCount: boxStateCache.boxCount, stateIds: boxStateCache.stateIds };
}
const value = await uncachedCountRunningBoxes();
runningCountCache = { value, expiresAt: now + RUNNING_COUNT_CACHE_MS };
return value;
const fresh = await uncachedBoxStateCount();
boxStateCache = { ...fresh, expiresAt: now + RUNNING_COUNT_CACHE_MS };
return fresh;
}

async function uncachedCountRunningBoxes(): Promise<number> {
/** Running-box count from `state.json` plus the set of those boxes' ids (for
* in-flight-job dedup). Read errors / no boxes → zero and an empty set. */
async function uncachedBoxStateCount(): Promise<{ boxCount: number; stateIds: Set<string> }> {
let boxes: BoxRecord[];
try {
boxes = (await readState(STATE_FILE)).boxes;
} catch {
return 0;
return { boxCount: 0, stateIds: new Set() };
}
if (boxes.length === 0) return 0;
const stateIds = new Set(boxes.map((b) => b.id));
if (boxes.length === 0) return { boxCount: 0, stateIds };

let count = 0;
let boxCount = 0;
const dockerBoxes: BoxRecord[] = [];
for (const b of boxes) {
const provider = b.provider ?? 'docker';
Expand All @@ -647,16 +683,16 @@ async function uncachedCountRunningBoxes(): Promise<number> {
} else {
// Optimistic: count cloud boxes as running. See note on
// {@link defaultCountRunningBoxes} for why.
count += 1;
boxCount += 1;
}
}
if (dockerBoxes.length > 0) {
const states = await Promise.all(dockerBoxes.map((b) => inspectDockerState(b.container)));
for (const s of states) {
if (s === 'running') count += 1;
if (s === 'running') boxCount += 1;
}
}
return count;
return { boxCount, stateIds };
}

function inspectDockerState(containerName: string): Promise<'running' | 'other'> {
Expand Down
44 changes: 44 additions & 0 deletions packages/relay/test/queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { readFile, rm, writeFile } from 'node:fs/promises';
import { join } from 'node:path';
import { describe, expect, it } from 'vitest';
import {
countInFlightCreateJobs,
countWorkingSlots,
defaultCountWorkingBoxes,
loadQueue,
Expand Down Expand Up @@ -276,6 +277,49 @@ describe('defaultCountWorkingBoxes', () => {
});
});

describe('countInFlightCreateJobs', () => {
// A pid that's essentially never alive — process.kill(pid, 0) → ESRCH.
const DEAD_PID = 2_147_483_646;

it('counts a running job whose box is not yet accounted for', () => {
// No boxId yet (worker hasn't created the box) → in-flight, occupies a slot.
const jobs = [job({ id: 'a', status: 'running', startedAt: '2024-01-01T00:00:01.000Z' })];
expect(countInFlightCreateJobs(jobs, new Set())).toBe(1);
});

it('does not count a running job once its box id is accounted for', () => {
// boxId present AND in the accounted set → counted via its box, not here.
const jobs = [job({ id: 'a', status: 'running', boxId: 'box1' })];
expect(countInFlightCreateJobs(jobs, new Set(['box1']))).toBe(0);
// Same job, box NOT in the set → still in-flight, counted.
expect(countInFlightCreateJobs(jobs, new Set(['other']))).toBe(1);
});

it('ignores non-running jobs', () => {
const jobs = [
job({ id: 'q', status: 'queued' }),
job({ id: 'd', status: 'done' }),
job({ id: 'f', status: 'failed' }),
];
expect(countInFlightCreateJobs(jobs, new Set())).toBe(0);
});

it('skips a running job whose worker pid is dead', () => {
const jobs = [job({ id: 'a', status: 'running', pid: DEAD_PID })];
expect(countInFlightCreateJobs(jobs, new Set())).toBe(0);
});

it('sums multiple in-flight jobs', () => {
const jobs = [
job({ id: 'a', status: 'running' }),
job({ id: 'b', status: 'running', boxId: 'reg', pid: undefined }),
job({ id: 'c', status: 'running' }),
];
// 'b' is deduped via its box; 'a' and 'c' are in-flight.
expect(countInFlightCreateJobs(jobs, new Set(['reg']))).toBe(2);
});
});

describe('startQueueLoop working-agent gate', () => {
const cfg = (over: Partial<QueueConfig>): QueueConfig => ({
enabled: true,
Expand Down
Loading