Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/host-daemon/src/app.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ function createFetchRecorder(
heartbeatIntervalMs: 30000,
leaseTimeoutMs: 90000,
trackedThreadTargets: [],
liveEnvironmentIds: [],
},
{ status: 201 },
);
Expand Down Expand Up @@ -570,6 +571,7 @@ describe("createHostDaemonApp", () => {
heartbeatIntervalMs: 30000,
leaseTimeoutMs: 90000,
trackedThreadTargets: [],
liveEnvironmentIds: [],
},
{ status: 201 },
);
Expand Down
11 changes: 11 additions & 0 deletions apps/host-daemon/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,17 @@ export async function createHostDaemonApp(
onTerminalMessage: (message) => terminalManager.handleMessage(message),
onSessionOpened: (session) => {
sessionState.value = session.sessionId;
void runtimeManager
.reconcileLiveEnvironments(new Set(session.liveEnvironmentIds))
.catch((error) => {
options.logger.warn(
{
sessionId: session.sessionId,
...runtimeErrorLogFields(error),
},
"Failed to reconcile live environments after session opened",
);
});
runtimeManager.replaceTrackedThreadStorageTargets(
session.trackedThreadTargets,
);
Expand Down
10 changes: 10 additions & 0 deletions apps/host-daemon/src/command-dispatch-support.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,16 @@ export async function requireWorkspaceEnvironment(
return existing;
}

// A destroyed environment's worktree is gone. Reconnecting it here would
// re-provision a dead path and re-subscribe an FSEvents watcher on every
// poll, so refuse instead of resurrecting it.
if (runtimeManager.isEnvironmentDestroyed(args.environmentId)) {
throw new ExpectedCommandDispatchError(
"environment_destroyed",
`Environment ${args.environmentId} has been destroyed`,
);
}

return runtimeManager.ensureEnvironment({
environmentId: args.environmentId,
personalWorkspaceRoot: getPersonalWorkspaceRoot(args.dataDir),
Expand Down
8 changes: 8 additions & 0 deletions apps/host-daemon/src/command-dispatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,14 @@ const commandHandlers: CommandHandlerMap = {
if (error instanceof WorkspaceError && error.code === "path_not_found") {
return {};
}
// The daemon already tore this environment down; a repeated destroy is a
// no-op success rather than a resurrection attempt.
if (
error instanceof ExpectedCommandDispatchError &&
error.code === "environment_destroyed"
) {
return {};
}
throw error;
}
return {};
Expand Down
123 changes: 123 additions & 0 deletions apps/host-daemon/src/runtime-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,129 @@ describe("RuntimeManager", () => {
expect(workspace.destroy).toHaveBeenCalledTimes(1);
});

it("tombstones a destroyed environment so it is reported destroyed", async () => {
const stopWatchingStatus = vi.fn(() => undefined);
const { hostWatcher } = createFakeHostWatcher({
watchWorkspaceImplementation: (_args) => stopWatchingStatus,
});
const manager = new RuntimeManager({
hostWatcher,
provisionWorkspace: createProvisionWorkspaceMock("/tmp/env-1"),
createRuntime: vi.fn(() => createFakeRuntime()),
});

await manager.ensureEnvironment({
environmentId: "env-1",
workspacePath: "/tmp/env-1",
});
expect(manager.isEnvironmentDestroyed("env-1")).toBe(false);

await manager.destroyEnvironment("env-1");

expect(manager.isEnvironmentDestroyed("env-1")).toBe(true);
expect(manager.get("env-1")).toBeUndefined();
});

it("records a tombstone when destroying an environment with no live runtime", async () => {
const manager = new RuntimeManager({
provisionWorkspace: createProvisionWorkspaceMock("/tmp/missing"),
createRuntime: vi.fn(() => createFakeRuntime()),
});

await manager.destroyEnvironment("env-missing");

expect(manager.isEnvironmentDestroyed("env-missing")).toBe(true);
});

it("clears the destroyed tombstone when an environment is explicitly re-provisioned", async () => {
const manager = new RuntimeManager({
provisionWorkspace: createProvisionWorkspaceMock("/tmp/env-1"),
createRuntime: vi.fn(() => createFakeRuntime()),
});

await manager.ensureEnvironment({
environmentId: "env-1",
workspacePath: "/tmp/env-1",
});
await manager.destroyEnvironment("env-1");
expect(manager.isEnvironmentDestroyed("env-1")).toBe(true);

await manager.ensureEnvironment({
environmentId: "env-1",
workspacePath: "/tmp/env-1",
});

expect(manager.isEnvironmentDestroyed("env-1")).toBe(false);
expect(manager.get("env-1")).toBeDefined();
});

it("reconciles live environments by dropping watchers and runtimes for ones no longer live", async () => {
const stopWatchingStatusLive = vi.fn(() => undefined);
const stopWatchingStatusStale = vi.fn(() => undefined);
const runtimes: AgentRuntime[] = [];
const { hostWatcher } = createFakeHostWatcher({
watchWorkspaceImplementation: vi
.fn<WatchWorkspaceImplementation>()
.mockImplementationOnce((_args) => stopWatchingStatusLive)
.mockImplementationOnce((_args) => stopWatchingStatusStale),
});
const manager = new RuntimeManager({
hostWatcher,
provisionWorkspace: vi.fn(async (args: ProvisionWorkspaceArgs) =>
createFakeWorkspace(getProvisionWorkspacePath(args)),
),
createRuntime: vi.fn(() => {
const runtime = createFakeRuntime();
runtimes.push(runtime);
return runtime;
}),
});

await manager.ensureEnvironment({
environmentId: "env-live",
workspacePath: "/tmp/env-live",
});
await manager.ensureEnvironment({
environmentId: "env-stale",
workspacePath: "/tmp/env-stale",
});

await manager.reconcileLiveEnvironments(new Set(["env-live"]));

expect(manager.get("env-live")).toBeDefined();
expect(manager.get("env-stale")).toBeUndefined();
expect(stopWatchingStatusLive).not.toHaveBeenCalled();
expect(stopWatchingStatusStale).toHaveBeenCalledTimes(1);
expect(manager.isEnvironmentDestroyed("env-stale")).toBe(true);
expect(manager.isEnvironmentDestroyed("env-live")).toBe(false);
expect(runtimes[0]?.shutdown).not.toHaveBeenCalled();
expect(runtimes[1]?.shutdown).toHaveBeenCalledTimes(1);
});

it("does not drop environments with active work during reconcile, even when not listed live", async () => {
const stopWatchingStatus = vi.fn(() => undefined);
const { hostWatcher } = createFakeHostWatcher({
watchWorkspaceImplementation: (_args) => stopWatchingStatus,
});
const manager = new RuntimeManager({
hostWatcher,
provisionWorkspace: createProvisionWorkspaceMock("/tmp/env-busy"),
createRuntime: vi.fn(() => createFakeRuntime()),
});

await manager.ensureEnvironment({
environmentId: "env-busy",
workspacePath: "/tmp/env-busy",
});
manager.markThreadActive("env-busy", "thr-busy", "provider-thread-busy");

await manager.reconcileLiveEnvironments(new Set());

expect(manager.get("env-busy")).toBeDefined();
expect(stopWatchingStatus).not.toHaveBeenCalled();
expect(manager.isEnvironmentDestroyed("env-busy")).toBe(false);
});

it("installs the workspace status watcher once and reports workspace status changes", async () => {
const stopWatchingStatus = vi.fn(() => undefined);
let watchWorkspaceArgs: WatchWorkspaceArgs | undefined;
Expand Down
55 changes: 55 additions & 0 deletions apps/host-daemon/src/runtime-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ export class RuntimeManager {
private readonly baseShellEnv;
private readonly entries = new Map<string, RuntimeEntry>();
private readonly pendingEntries = new Map<string, Promise<RuntimeEntry>>();
// Environments this daemon has torn down. They must never be resurrected by a
// later workspace.* command, otherwise the daemon would re-provision a dead
// worktree and re-subscribe an FSEvents watcher against it on every poll.
private readonly destroyedEnvironmentIds = new Set<string>();
private readonly trackedThreadStorageTargets = new Map<
string,
ThreadStorageTarget
Expand Down Expand Up @@ -363,6 +367,10 @@ export class RuntimeManager {
return this.entries.get(environmentId);
}

isEnvironmentDestroyed(environmentId: string): boolean {
return this.destroyedEnvironmentIds.has(environmentId);
}

async getOrAwait(environmentId: string): Promise<RuntimeEntry | undefined> {
const existing = this.entries.get(environmentId);
if (existing) {
Expand Down Expand Up @@ -528,6 +536,11 @@ export class RuntimeManager {
}

async ensureEnvironment(args: EnsureEnvironmentArgs): Promise<RuntimeEntry> {
// Establishing a runtime for an environment means the server considers it
// live again (e.g. an explicit environment.provision). Lift any tombstone so
// the entry and its watcher are allowed to exist.
this.destroyedEnvironmentIds.delete(args.environmentId);

const existing = this.entries.get(args.environmentId);
if (existing) {
await this.applyExistingEnvironmentProvision({
Expand Down Expand Up @@ -575,6 +588,11 @@ export class RuntimeManager {
}

async destroyEnvironment(environmentId: string): Promise<void> {
// Tombstone first so the environment can never be resurrected by a later
// workspace.* command, even when there is currently no in-memory entry
// (e.g. a destroy retried after the runtime was already torn down).
this.destroyedEnvironmentIds.add(environmentId);

const existing = this.entries.get(environmentId);
const pending = this.pendingEntries.get(environmentId);
const entry = existing ?? (pending ? await pending : undefined);
Expand All @@ -591,6 +609,43 @@ export class RuntimeManager {
await entry.workspace.destroy();
}

/**
* Reconciles the in-memory runtime/watch set against the server's
* authoritative set of live (non-destroyed) environments, invoked on every
* session open. Drops the watcher and runtime for any idle environment the
* server no longer considers live — environments destroyed while this daemon
* was disconnected, whose environment.destroy command never reached it. Those
* stale entries are tombstoned so they cannot be resurrected. Environments
* with active threads or terminals are never dropped here; their lifecycle is
* owned by explicit stop/destroy commands, guarding against transient gaps in
* the live set.
*/
async reconcileLiveEnvironments(
liveEnvironmentIds: ReadonlySet<string>,
): Promise<void> {
const staleEntries = [...this.entries.values()].filter((entry) => {
if (liveEnvironmentIds.has(entry.environmentId)) {
return false;
}
const hasActiveThread = [...entry.threads.values()].some(
(thread) => thread.status === "active",
);
return !hasActiveThread && entry.terminals.size === 0;
});

for (const entry of staleEntries) {
this.destroyedEnvironmentIds.add(entry.environmentId);
this.entries.delete(entry.environmentId);
this.removeTrackedThreadStorageTargetsForEnvironment(entry.environmentId);
this.stopWatchingStatus(entry);
}
this.stopWatchingThreadStorageIfNoTrackedThreads();

await Promise.allSettled(
staleEntries.map((entry) => entry.runtime.shutdown()),
);
}

async evictIdleEnvironments(): Promise<string[]> {
// A pending environment creation is still active work. If we evict around
// it, the creation can resolve immediately after this sweep and resurrect
Expand Down
66 changes: 65 additions & 1 deletion apps/host-daemon/test/command/workspace-dispatch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,68 @@ describe("workspace command dispatch", () => {
]);
});

it("refuses to resurrect a destroyed environment for workspace.status", async () => {
const harness = createHarness({ workspacePath: "/tmp/env-destroyed" });

await dispatchCommand(
{
type: "environment.destroy",
environmentId: "env-destroyed",
workspaceContext: {
workspacePath: "/tmp/env-destroyed",
workspaceProvisionType: "unmanaged",
},
},
harness.dispatchOptions(),
);
expect(harness.workspaceState.destroyed).toBe(true);

const provisionsAfterDestroy = harness.provisions.length;
const statusReadsAfterDestroy = harness.workspaceState.statusReads;

await expect(
dispatchCommand(
{
type: "workspace.status",
environmentId: "env-destroyed",
workspaceContext: {
workspacePath: "/tmp/env-destroyed",
workspaceProvisionType: "unmanaged",
},
},
harness.dispatchOptions(),
),
).rejects.toMatchObject({ code: "environment_destroyed" });

// No resurrection: the destroyed environment is never re-provisioned and its
// workspace status is never read again, so no new watcher can be installed.
expect(harness.provisions.length).toBe(provisionsAfterDestroy);
expect(harness.workspaceState.statusReads).toBe(statusReadsAfterDestroy);
});

it("treats a repeated environment.destroy as idempotent success", async () => {
const harness = createHarness({
workspacePath: "/tmp/env-destroyed-twice",
});
const destroyCommand = {
type: "environment.destroy" as const,
environmentId: "env-destroyed-twice",
workspaceContext: {
workspacePath: "/tmp/env-destroyed-twice",
workspaceProvisionType: "unmanaged" as const,
},
};

await dispatchCommand(destroyCommand, harness.dispatchOptions());
const provisionsAfterFirstDestroy = harness.provisions.length;

await expect(
dispatchCommand(destroyCommand, harness.dispatchOptions()),
).resolves.toEqual({});
// The second destroy must not re-provision (resurrect) the workspace.
expect(harness.provisions.length).toBe(provisionsAfterFirstDestroy);
});

it("covers host.list_files", async () => {
const tempDir = await makeTempDir("bb-dispatch-host-list-files-");
await fs.writeFile(path.join(tempDir, "PREFERENCES.md"), "hello");
Expand Down Expand Up @@ -377,7 +439,9 @@ describe("workspace command dispatch", () => {
});

it("hides host.read_file_relative dotfiles when dotfiles are denied", async () => {
const tempDir = await makeTempDir("bb-dispatch-host-read-relative-dotfile-");
const tempDir = await makeTempDir(
"bb-dispatch-host-read-relative-dotfile-",
);
await fs.writeFile(path.join(tempDir, ".env"), "secret");
const harness = createHarness();

Expand Down
2 changes: 2 additions & 0 deletions apps/host-daemon/test/helpers/test-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ export interface CreateTestServerOptions {
heartbeatIntervalMs?: number;
interactiveRequestFailures?: number;
leaseTimeoutMs?: number;
liveEnvironmentIds?: string[];
requireTurnStartedForInteractiveRequests?: boolean;
requireTurnStartedForToolCalls?: boolean;
trackedThreadTargets?: HostDaemonTrackedThreadTarget[];
Expand Down Expand Up @@ -256,6 +257,7 @@ export async function createTestServer(
heartbeatIntervalMs: options.heartbeatIntervalMs ?? 25,
leaseTimeoutMs: options.leaseTimeoutMs ?? 1_000,
trackedThreadTargets: options.trackedThreadTargets ?? [],
liveEnvironmentIds: options.liveEnvironmentIds ?? [],
},
201,
);
Expand Down
Loading