diff --git a/apps/host-daemon/src/app.test.ts b/apps/host-daemon/src/app.test.ts index 60c1fd215..e10937ad1 100644 --- a/apps/host-daemon/src/app.test.ts +++ b/apps/host-daemon/src/app.test.ts @@ -132,6 +132,7 @@ function createFetchRecorder( heartbeatIntervalMs: 30000, leaseTimeoutMs: 90000, trackedThreadTargets: [], + liveEnvironmentIds: [], }, { status: 201 }, ); @@ -570,6 +571,7 @@ describe("createHostDaemonApp", () => { heartbeatIntervalMs: 30000, leaseTimeoutMs: 90000, trackedThreadTargets: [], + liveEnvironmentIds: [], }, { status: 201 }, ); diff --git a/apps/host-daemon/src/app.ts b/apps/host-daemon/src/app.ts index e62e55108..dc4445a14 100644 --- a/apps/host-daemon/src/app.ts +++ b/apps/host-daemon/src/app.ts @@ -717,6 +717,17 @@ export async function createHostDaemonApp( onTerminalMessage: (message) => terminalManager.handleMessage(message), onSessionOpened: (session) => { sessionState.value = session.sessionId; + void runtimeManager + .reconcileLiveEnvironments(new Set(session.liveEnvironmentIds)) + .catch((error) => { + options.logger.warn( + { + sessionId: session.sessionId, + ...runtimeErrorLogFields(error), + }, + "Failed to reconcile live environments after session opened", + ); + }); runtimeManager.replaceTrackedThreadStorageTargets( session.trackedThreadTargets, ); diff --git a/apps/host-daemon/src/command-dispatch-support.ts b/apps/host-daemon/src/command-dispatch-support.ts index 13335e559..37f72d3bb 100644 --- a/apps/host-daemon/src/command-dispatch-support.ts +++ b/apps/host-daemon/src/command-dispatch-support.ts @@ -220,6 +220,16 @@ export async function requireWorkspaceEnvironment( return existing; } + // A destroyed environment's worktree is gone. Reconnecting it here would + // re-provision a dead path and re-subscribe an FSEvents watcher on every + // poll, so refuse instead of resurrecting it. + if (runtimeManager.isEnvironmentDestroyed(args.environmentId)) { + throw new ExpectedCommandDispatchError( + "environment_destroyed", + `Environment ${args.environmentId} has been destroyed`, + ); + } + return runtimeManager.ensureEnvironment({ environmentId: args.environmentId, personalWorkspaceRoot: getPersonalWorkspaceRoot(args.dataDir), diff --git a/apps/host-daemon/src/command-dispatch.ts b/apps/host-daemon/src/command-dispatch.ts index f93ac13b7..c595c86f6 100644 --- a/apps/host-daemon/src/command-dispatch.ts +++ b/apps/host-daemon/src/command-dispatch.ts @@ -343,6 +343,14 @@ const commandHandlers: CommandHandlerMap = { if (error instanceof WorkspaceError && error.code === "path_not_found") { return {}; } + // The daemon already tore this environment down; a repeated destroy is a + // no-op success rather than a resurrection attempt. + if ( + error instanceof ExpectedCommandDispatchError && + error.code === "environment_destroyed" + ) { + return {}; + } throw error; } return {}; diff --git a/apps/host-daemon/src/runtime-manager.test.ts b/apps/host-daemon/src/runtime-manager.test.ts index 8d3de2092..91cb3402f 100644 --- a/apps/host-daemon/src/runtime-manager.test.ts +++ b/apps/host-daemon/src/runtime-manager.test.ts @@ -600,6 +600,165 @@ describe("RuntimeManager", () => { expect(workspace.destroy).toHaveBeenCalledTimes(1); }); + it("tombstones a destroyed environment so it is reported destroyed", async () => { + const stopWatchingStatus = vi.fn(() => undefined); + const { hostWatcher } = createFakeHostWatcher({ + watchWorkspaceImplementation: (_args) => stopWatchingStatus, + }); + const manager = new RuntimeManager({ + hostWatcher, + provisionWorkspace: createProvisionWorkspaceMock("/tmp/env-1"), + createRuntime: vi.fn(() => createFakeRuntime()), + }); + + await manager.ensureEnvironment({ + environmentId: "env-1", + workspacePath: "/tmp/env-1", + }); + expect(manager.isEnvironmentDestroyed("env-1")).toBe(false); + + await manager.destroyEnvironment("env-1"); + + expect(manager.isEnvironmentDestroyed("env-1")).toBe(true); + expect(manager.get("env-1")).toBeUndefined(); + }); + + it("records a tombstone when destroying an environment with no live runtime", async () => { + const manager = new RuntimeManager({ + provisionWorkspace: createProvisionWorkspaceMock("/tmp/missing"), + createRuntime: vi.fn(() => createFakeRuntime()), + }); + + await manager.destroyEnvironment("env-missing"); + + expect(manager.isEnvironmentDestroyed("env-missing")).toBe(true); + }); + + it("clears the destroyed tombstone when an environment is explicitly re-provisioned", async () => { + const manager = new RuntimeManager({ + provisionWorkspace: createProvisionWorkspaceMock("/tmp/env-1"), + createRuntime: vi.fn(() => createFakeRuntime()), + }); + + await manager.ensureEnvironment({ + environmentId: "env-1", + workspacePath: "/tmp/env-1", + }); + await manager.destroyEnvironment("env-1"); + expect(manager.isEnvironmentDestroyed("env-1")).toBe(true); + + await manager.ensureEnvironment({ + environmentId: "env-1", + workspacePath: "/tmp/env-1", + }); + + expect(manager.isEnvironmentDestroyed("env-1")).toBe(false); + expect(manager.get("env-1")).toBeDefined(); + }); + + it("reconciles live environments by dropping watchers and runtimes for ones no longer live", async () => { + const stopWatchingStatusLive = vi.fn(() => undefined); + const stopWatchingStatusStale = vi.fn(() => undefined); + const runtimes: AgentRuntime[] = []; + const { hostWatcher } = createFakeHostWatcher({ + watchWorkspaceImplementation: vi + .fn() + .mockImplementationOnce((_args) => stopWatchingStatusLive) + .mockImplementationOnce((_args) => stopWatchingStatusStale), + }); + const manager = new RuntimeManager({ + hostWatcher, + provisionWorkspace: vi.fn(async (args: ProvisionWorkspaceArgs) => + createFakeWorkspace(getProvisionWorkspacePath(args)), + ), + createRuntime: vi.fn(() => { + const runtime = createFakeRuntime(); + runtimes.push(runtime); + return runtime; + }), + }); + + await manager.ensureEnvironment({ + environmentId: "env-live", + workspacePath: "/tmp/env-live", + }); + await manager.ensureEnvironment({ + environmentId: "env-stale", + workspacePath: "/tmp/env-stale", + }); + + await manager.reconcileLiveEnvironments(new Set(["env-live"])); + + expect(manager.get("env-live")).toBeDefined(); + expect(manager.get("env-stale")).toBeUndefined(); + expect(stopWatchingStatusLive).not.toHaveBeenCalled(); + expect(stopWatchingStatusStale).toHaveBeenCalledTimes(1); + expect(manager.isEnvironmentDestroyed("env-stale")).toBe(true); + expect(manager.isEnvironmentDestroyed("env-live")).toBe(false); + expect(runtimes[0]?.shutdown).not.toHaveBeenCalled(); + expect(runtimes[1]?.shutdown).toHaveBeenCalledTimes(1); + }); + + it("does not drop environments with active work during reconcile, even when not listed live", async () => { + const stopWatchingStatus = vi.fn(() => undefined); + const { hostWatcher } = createFakeHostWatcher({ + watchWorkspaceImplementation: (_args) => stopWatchingStatus, + }); + const manager = new RuntimeManager({ + hostWatcher, + provisionWorkspace: createProvisionWorkspaceMock("/tmp/env-busy"), + createRuntime: vi.fn(() => createFakeRuntime()), + }); + + await manager.ensureEnvironment({ + environmentId: "env-busy", + workspacePath: "/tmp/env-busy", + }); + manager.markThreadActive("env-busy", "thr-busy", "provider-thread-busy"); + + await manager.reconcileLiveEnvironments(new Set()); + + expect(manager.get("env-busy")).toBeDefined(); + expect(stopWatchingStatus).not.toHaveBeenCalled(); + expect(manager.isEnvironmentDestroyed("env-busy")).toBe(false); + }); + + it("lifts the tombstone for an environment the server reports live again during reconcile", async () => { + const stopWatchingStatus = vi.fn(() => undefined); + const { hostWatcher, watchWorkspace } = createFakeHostWatcher({ + watchWorkspaceImplementation: (_args) => stopWatchingStatus, + }); + const manager = new RuntimeManager({ + hostWatcher, + provisionWorkspace: createProvisionWorkspaceMock("/tmp/env-1"), + createRuntime: vi.fn(() => createFakeRuntime()), + }); + + await manager.ensureEnvironment({ + environmentId: "env-1", + workspacePath: "/tmp/env-1", + }); + await manager.destroyEnvironment("env-1"); + expect(manager.isEnvironmentDestroyed("env-1")).toBe(true); + expect(watchWorkspace).toHaveBeenCalledTimes(1); + + // A destroy whose teardown threw leaves the env tombstoned here while the + // server reverts it to ready; reconcile must heal it so it stops returning + // environment_destroyed for an env the server considers live. + await manager.reconcileLiveEnvironments(new Set(["env-1"])); + expect(manager.isEnvironmentDestroyed("env-1")).toBe(false); + + // Watchable again: a reconnecting workspace command re-provisions the env + // and installs a fresh watcher. + const entry = await manager.ensureEnvironment({ + environmentId: "env-1", + workspacePath: "/tmp/env-1", + }); + expect(entry.environmentId).toBe("env-1"); + expect(manager.get("env-1")).toBeDefined(); + expect(watchWorkspace).toHaveBeenCalledTimes(2); + }); + it("installs the workspace status watcher once and reports workspace status changes", async () => { const stopWatchingStatus = vi.fn(() => undefined); let watchWorkspaceArgs: WatchWorkspaceArgs | undefined; diff --git a/apps/host-daemon/src/runtime-manager.ts b/apps/host-daemon/src/runtime-manager.ts index d5ce9d0a7..42b44160b 100644 --- a/apps/host-daemon/src/runtime-manager.ts +++ b/apps/host-daemon/src/runtime-manager.ts @@ -218,6 +218,10 @@ export class RuntimeManager { private readonly baseShellEnv; private readonly entries = new Map(); private readonly pendingEntries = new Map>(); + // Environments this daemon has torn down. They must never be resurrected by a + // later workspace.* command, otherwise the daemon would re-provision a dead + // worktree and re-subscribe an FSEvents watcher against it on every poll. + private readonly destroyedEnvironmentIds = new Set(); private readonly trackedThreadStorageTargets = new Map< string, ThreadStorageTarget @@ -363,6 +367,10 @@ export class RuntimeManager { return this.entries.get(environmentId); } + isEnvironmentDestroyed(environmentId: string): boolean { + return this.destroyedEnvironmentIds.has(environmentId); + } + async getOrAwait(environmentId: string): Promise { const existing = this.entries.get(environmentId); if (existing) { @@ -528,6 +536,11 @@ export class RuntimeManager { } async ensureEnvironment(args: EnsureEnvironmentArgs): Promise { + // Establishing a runtime for an environment means the server considers it + // live again (e.g. an explicit environment.provision). Lift any tombstone so + // the entry and its watcher are allowed to exist. + this.destroyedEnvironmentIds.delete(args.environmentId); + const existing = this.entries.get(args.environmentId); if (existing) { await this.applyExistingEnvironmentProvision({ @@ -575,6 +588,11 @@ export class RuntimeManager { } async destroyEnvironment(environmentId: string): Promise { + // Tombstone first so the environment can never be resurrected by a later + // workspace.* command, even when there is currently no in-memory entry + // (e.g. a destroy retried after the runtime was already torn down). + this.destroyedEnvironmentIds.add(environmentId); + const existing = this.entries.get(environmentId); const pending = this.pendingEntries.get(environmentId); const entry = existing ?? (pending ? await pending : undefined); @@ -591,6 +609,55 @@ export class RuntimeManager { await entry.workspace.destroy(); } + /** + * Reconciles the in-memory runtime/watch set against the server's + * authoritative set of live (non-destroyed) environments, invoked on every + * session open. Drops the watcher and runtime for any idle environment the + * server no longer considers live — environments destroyed while this daemon + * was disconnected, whose environment.destroy command never reached it. Those + * stale entries are tombstoned so they cannot be resurrected. Environments + * with active threads or terminals are never dropped here; their lifecycle is + * owned by explicit stop/destroy commands, guarding against transient gaps in + * the live set. + * + * Conversely, any tombstoned environment the server now reports live is + * healed (its tombstone is lifted): a destroy whose teardown threw leaves the + * env tombstoned here while the server reverts it to ready, which would + * otherwise wedge every workspace.* command on `environment_destroyed` until + * a thread/terminal happened to re-provision it. + */ + async reconcileLiveEnvironments( + liveEnvironmentIds: ReadonlySet, + ): Promise { + for (const environmentId of [...this.destroyedEnvironmentIds]) { + if (liveEnvironmentIds.has(environmentId)) { + this.destroyedEnvironmentIds.delete(environmentId); + } + } + + const staleEntries = [...this.entries.values()].filter((entry) => { + if (liveEnvironmentIds.has(entry.environmentId)) { + return false; + } + const hasActiveThread = [...entry.threads.values()].some( + (thread) => thread.status === "active", + ); + return !hasActiveThread && entry.terminals.size === 0; + }); + + for (const entry of staleEntries) { + this.destroyedEnvironmentIds.add(entry.environmentId); + this.entries.delete(entry.environmentId); + this.removeTrackedThreadStorageTargetsForEnvironment(entry.environmentId); + this.stopWatchingStatus(entry); + } + this.stopWatchingThreadStorageIfNoTrackedThreads(); + + await Promise.allSettled( + staleEntries.map((entry) => entry.runtime.shutdown()), + ); + } + async evictIdleEnvironments(): Promise { // A pending environment creation is still active work. If we evict around // it, the creation can resolve immediately after this sweep and resurrect diff --git a/apps/host-daemon/test/command/workspace-dispatch.test.ts b/apps/host-daemon/test/command/workspace-dispatch.test.ts index bb175264d..5ef9351c5 100644 --- a/apps/host-daemon/test/command/workspace-dispatch.test.ts +++ b/apps/host-daemon/test/command/workspace-dispatch.test.ts @@ -108,6 +108,68 @@ describe("workspace command dispatch", () => { ]); }); + it("refuses to resurrect a destroyed environment for workspace.status", async () => { + const harness = createHarness({ workspacePath: "/tmp/env-destroyed" }); + + await dispatchCommand( + { + type: "environment.destroy", + environmentId: "env-destroyed", + workspaceContext: { + workspacePath: "/tmp/env-destroyed", + workspaceProvisionType: "unmanaged", + }, + }, + harness.dispatchOptions(), + ); + expect(harness.workspaceState.destroyed).toBe(true); + + const provisionsAfterDestroy = harness.provisions.length; + const statusReadsAfterDestroy = harness.workspaceState.statusReads; + + await expect( + dispatchCommand( + { + type: "workspace.status", + environmentId: "env-destroyed", + workspaceContext: { + workspacePath: "/tmp/env-destroyed", + workspaceProvisionType: "unmanaged", + }, + }, + harness.dispatchOptions(), + ), + ).rejects.toMatchObject({ code: "environment_destroyed" }); + + // No resurrection: the destroyed environment is never re-provisioned and its + // workspace status is never read again, so no new watcher can be installed. + expect(harness.provisions.length).toBe(provisionsAfterDestroy); + expect(harness.workspaceState.statusReads).toBe(statusReadsAfterDestroy); + }); + + it("treats a repeated environment.destroy as idempotent success", async () => { + const harness = createHarness({ + workspacePath: "/tmp/env-destroyed-twice", + }); + const destroyCommand = { + type: "environment.destroy" as const, + environmentId: "env-destroyed-twice", + workspaceContext: { + workspacePath: "/tmp/env-destroyed-twice", + workspaceProvisionType: "unmanaged" as const, + }, + }; + + await dispatchCommand(destroyCommand, harness.dispatchOptions()); + const provisionsAfterFirstDestroy = harness.provisions.length; + + await expect( + dispatchCommand(destroyCommand, harness.dispatchOptions()), + ).resolves.toEqual({}); + // The second destroy must not re-provision (resurrect) the workspace. + expect(harness.provisions.length).toBe(provisionsAfterFirstDestroy); + }); + it("covers host.list_files", async () => { const tempDir = await makeTempDir("bb-dispatch-host-list-files-"); await fs.writeFile(path.join(tempDir, "PREFERENCES.md"), "hello"); @@ -377,7 +439,9 @@ describe("workspace command dispatch", () => { }); it("hides host.read_file_relative dotfiles when dotfiles are denied", async () => { - const tempDir = await makeTempDir("bb-dispatch-host-read-relative-dotfile-"); + const tempDir = await makeTempDir( + "bb-dispatch-host-read-relative-dotfile-", + ); await fs.writeFile(path.join(tempDir, ".env"), "secret"); const harness = createHarness(); diff --git a/apps/host-daemon/test/helpers/test-server.ts b/apps/host-daemon/test/helpers/test-server.ts index 0b0df897c..d80e353d2 100644 --- a/apps/host-daemon/test/helpers/test-server.ts +++ b/apps/host-daemon/test/helpers/test-server.ts @@ -92,6 +92,7 @@ export interface CreateTestServerOptions { heartbeatIntervalMs?: number; interactiveRequestFailures?: number; leaseTimeoutMs?: number; + liveEnvironmentIds?: string[]; requireTurnStartedForInteractiveRequests?: boolean; requireTurnStartedForToolCalls?: boolean; trackedThreadTargets?: HostDaemonTrackedThreadTarget[]; @@ -256,6 +257,7 @@ export async function createTestServer( heartbeatIntervalMs: options.heartbeatIntervalMs ?? 25, leaseTimeoutMs: options.leaseTimeoutMs ?? 1_000, trackedThreadTargets: options.trackedThreadTargets ?? [], + liveEnvironmentIds: options.liveEnvironmentIds ?? [], }, 201, ); diff --git a/apps/server/src/internal/session.ts b/apps/server/src/internal/session.ts index 4906f9fce..4eaece9db 100644 --- a/apps/server/src/internal/session.ts +++ b/apps/server/src/internal/session.ts @@ -1,5 +1,6 @@ import { getActiveSession, + listLiveEnvironmentIdsOnHost, listThreadEnvironmentAssignmentsOnHost, openSession, upsertHost, @@ -108,12 +109,17 @@ export function registerInternalSessionRoutes(app: Hono, deps: AppDeps): void { threadId: target.threadId, })); + const liveEnvironmentIds = listLiveEnvironmentIdsOnHost(deps.db, { + hostId: daemon.hostId, + }); + return context.json( { sessionId: session.id, heartbeatIntervalMs: HEARTBEAT_INTERVAL_MS, leaseTimeoutMs: LEASE_TIMEOUT_MS, trackedThreadTargets, + liveEnvironmentIds, }, 201, ); diff --git a/apps/server/test/internal/internal-session-correctness.test.ts b/apps/server/test/internal/internal-session-correctness.test.ts index de8d98d45..f2a4e6881 100644 --- a/apps/server/test/internal/internal-session-correctness.test.ts +++ b/apps/server/test/internal/internal-session-correctness.test.ts @@ -13,6 +13,7 @@ import { buildHostDaemonWebSocketAuthorizationHeader, buildHostDaemonWebSocketProtocols, createHostDaemonClient, + hostDaemonSessionOpenResponseSchema, } from "@bb/host-daemon-contract"; import { describe, expect, it, vi } from "vitest"; import { ApiError } from "../../src/errors.js"; @@ -791,6 +792,56 @@ describe("internal session correctness", () => { } }); + it("reports only non-destroyed environments as live on session open", async () => { + const harness = await createTestAppHarness(); + try { + const { host } = seedHostSession(harness.deps, { + id: "host-live-environments", + }); + const { project } = seedProjectWithSource(harness.deps, { + hostId: host.id, + }); + const liveEnvironment = seedEnvironment(harness.deps, { + hostId: host.id, + projectId: project.id, + path: "/tmp/live-environment", + status: "ready", + }); + const destroyedEnvironment = seedEnvironment(harness.deps, { + hostId: host.id, + projectId: project.id, + path: "/tmp/destroyed-environment", + status: "destroyed", + }); + + const response = await harness.app.request("/internal/session/open", { + method: "POST", + headers: internalAuthHeaders(harness, { + hostId: host.id, + hostType: host.type, + }), + body: JSON.stringify({ + hostId: host.id, + instanceId: "instance-live-environments", + hostName: host.name, + hostType: host.type, + dataDir: "/tmp/host-daemon-live-environments", + protocolVersion: HOST_DAEMON_PROTOCOL_VERSION, + activeThreads: [], + }), + }); + + expect(response.status).toBe(201); + const body = hostDaemonSessionOpenResponseSchema.parse( + await readJson(response), + ); + expect(body.liveEnvironmentIds).toContain(liveEnvironment.id); + expect(body.liveEnvironmentIds).not.toContain(destroyedEnvironment.id); + } finally { + await harness.cleanup(); + } + }); + it("interrupts pending interactions when a replacement daemon session has a new instance id", async () => { const harness = await createTestAppHarness(); try { diff --git a/packages/db/src/data/environments.ts b/packages/db/src/data/environments.ts index 792a9f140..73d5c24eb 100644 --- a/packages/db/src/data/environments.ts +++ b/packages/db/src/data/environments.ts @@ -1,4 +1,4 @@ -import { and, eq, inArray } from "drizzle-orm"; +import { and, eq, inArray, ne } from "drizzle-orm"; import type { DiscoveredWorkspaceProperties, EnvironmentChangeKind, @@ -96,6 +96,33 @@ export function listEnvironments(db: DbConnection, projectId?: string) { return db.select().from(environments).all(); } +export interface ListLiveEnvironmentIdsOnHostArgs { + hostId: string; +} + +/** + * Returns the ids of every non-destroyed environment owned by a host. The host + * daemon uses this on session open to reconcile its in-memory watch/runtime set + * against the server's authoritative liveness, dropping watchers for + * environments that were destroyed while the daemon was disconnected. + */ +export function listLiveEnvironmentIdsOnHost( + db: EnvironmentReadConnection, + args: ListLiveEnvironmentIdsOnHostArgs, +): string[] { + return db + .select({ id: environments.id }) + .from(environments) + .where( + and( + eq(environments.hostId, args.hostId), + ne(environments.status, "destroyed"), + ), + ) + .all() + .map((row) => row.id); +} + export function listEnvironmentsByIds( db: DbConnection, environmentIds: readonly string[], diff --git a/packages/db/src/data/index.ts b/packages/db/src/data/index.ts index 234a507b5..234826f0b 100644 --- a/packages/db/src/data/index.ts +++ b/packages/db/src/data/index.ts @@ -207,11 +207,13 @@ export { findEnvironmentByHostPath, listEnvironments, listEnvironmentsByIds, + listLiveEnvironmentIdsOnHost, updateEnvironmentMetadata, deleteEnvironment, } from "./environments.js"; export type { CreateEnvironmentInput, + ListLiveEnvironmentIdsOnHostArgs, UpdateEnvironmentMetadataInput, } from "./environments.js"; diff --git a/packages/host-daemon-contract/src/session.ts b/packages/host-daemon-contract/src/session.ts index 20ac2eb74..76d46943a 100644 --- a/packages/host-daemon-contract/src/session.ts +++ b/packages/host-daemon-contract/src/session.ts @@ -83,6 +83,12 @@ export const hostDaemonSessionOpenResponseSchema = z heartbeatIntervalMs: z.number().int().positive(), leaseTimeoutMs: z.number().int().positive(), trackedThreadTargets: z.array(hostDaemonTrackedThreadTargetSchema), + // Ids of every non-destroyed environment the server owns for this host. The + // daemon reconciles its in-memory watch/runtime set against this list on + // session open, dropping watchers for environments destroyed while it was + // disconnected so it stops re-subscribing FSEvents watchers against dead + // worktrees. + liveEnvironmentIds: z.array(z.string().min(1)), }) .strict(); export type HostDaemonSessionOpenResponse = z.infer< diff --git a/packages/host-daemon-contract/test/contract.test.ts b/packages/host-daemon-contract/test/contract.test.ts index 09694488b..9338828c9 100644 --- a/packages/host-daemon-contract/test/contract.test.ts +++ b/packages/host-daemon-contract/test/contract.test.ts @@ -1398,6 +1398,7 @@ describe("host-daemon session schemas", () => { threadId: "thr_123", }, ], + liveEnvironmentIds: ["env_123"], }), ).toMatchObject({ sessionId: "session_123", @@ -1407,6 +1408,7 @@ describe("host-daemon session schemas", () => { threadId: "thr_123", }, ], + liveEnvironmentIds: ["env_123"], }); expect(() => @@ -1415,6 +1417,7 @@ describe("host-daemon session schemas", () => { heartbeatIntervalMs: 5_000, leaseTimeoutMs: 30_000, trackedThreadTargets: [], + liveEnvironmentIds: [], threadHighWaterMarks: { thr_123: 10 }, }), ).toThrow(); diff --git a/packages/host-watcher/src/workspace-status-watcher.ts b/packages/host-watcher/src/workspace-status-watcher.ts index a1b1dc45f..8f496c896 100644 --- a/packages/host-watcher/src/workspace-status-watcher.ts +++ b/packages/host-watcher/src/workspace-status-watcher.ts @@ -20,8 +20,11 @@ const WORKSPACE_STATUS_WATCH_DEBOUNCE_MS = 75; const WORKSPACE_STATUS_WATCH_MAX_WAIT_MS = 500; const WORKSPACE_STATUS_WATCH_RETRY_DELAY_MS = 250; const WORKSPACE_STATUS_WATCH_MAX_RETRY_DELAY_MS = 30_000; -const FSEVENTS_DROPPED_EVENTS_RESCAN_MESSAGE = - "File system must be re-scanned"; +// A subscription that never succeeds (e.g. a removed worktree whose path is +// gone for good) must not be retried forever. Give up after a bounded number of +// attempts; the daemon owns re-establishing watches for live environments. +const WORKSPACE_STATUS_WATCH_MAX_RETRY_ATTEMPTS = 60; +const FSEVENTS_DROPPED_EVENTS_RESCAN_MESSAGE = "File system must be re-scanned"; type ParcelWatcherSubscribe = typeof parcelWatcher.subscribe; type ParcelWatcherCallback = Parameters[1]; @@ -34,6 +37,7 @@ type ParcelWatcherEventBatch = Parameters[1]; interface WorkspaceStatusWatcherArgs extends WorkspaceStatusWatchArgs { cwd: string; debounceMs: number; + maxRetryAttempts: number; maxRetryDelayMs: number; maxWaitMs: number; retryDelayMs: number; @@ -199,6 +203,11 @@ export class WorkspaceStatusWatcher { if (this.disposed || this.metadataStartRetryTimer !== null) { return; } + if (this.metadataRetryAttempt >= this.args.maxRetryAttempts) { + // Bounded retry: stop probing for git metadata once the attempt budget is + // exhausted instead of looping forever against a path that never resolves. + return; + } this.metadataRetryAttempt += 1; this.metadataStartRetryTimer = setTimeout( () => { @@ -222,6 +231,18 @@ export class WorkspaceStatusWatcher { return; } const retryAttempt = (this.retryAttempts.get(spec.rootPath) ?? 0) + 1; + if (retryAttempt > this.args.maxRetryAttempts) { + // Bounded retry: a spec that never becomes watchable (e.g. a worktree that + // was removed for good) is abandoned instead of retried forever. Report + // once (deduped per root path) so the give-up is observable. + this.reportWatchError( + spec, + new Error( + `Workspace status watch gave up after ${this.args.maxRetryAttempts} attempts: ${spec.rootPath}`, + ), + ); + return; + } this.retryAttempts.set(spec.rootPath, retryAttempt); const retryTimer = setTimeout( () => { @@ -384,6 +405,7 @@ export function createWorkspaceStatusWatcher( return new WorkspaceStatusWatcher({ cwd: args.cwd, debounceMs: WORKSPACE_STATUS_WATCH_DEBOUNCE_MS, + maxRetryAttempts: WORKSPACE_STATUS_WATCH_MAX_RETRY_ATTEMPTS, maxRetryDelayMs: WORKSPACE_STATUS_WATCH_MAX_RETRY_DELAY_MS, maxWaitMs: WORKSPACE_STATUS_WATCH_MAX_WAIT_MS, onChange: args.onChange, diff --git a/packages/host-watcher/test/watch-status.test.ts b/packages/host-watcher/test/watch-status.test.ts index da6bc0efc..a79e1a55a 100644 --- a/packages/host-watcher/test/watch-status.test.ts +++ b/packages/host-watcher/test/watch-status.test.ts @@ -8,6 +8,7 @@ import parcelWatcher from "@parcel/watcher"; import { afterEach, describe, expect, it, vi } from "vitest"; import { createDeferredPromise } from "@bb/test-helpers"; import { watchWorkspaceStatus as watchWorkspaceStatusImpl } from "../src/watch-status.js"; +import { WorkspaceStatusWatcher } from "../src/workspace-status-watcher.js"; type WatchWorkspaceStatus = typeof watchWorkspaceStatusImpl; type WatchWorkspaceStatusArgs = Parameters[1]; @@ -434,11 +435,7 @@ describe.sequential("watchWorkspaceStatus", () => { const firstEventAt = Date.now(); const eventIntervalMs = 60; const earlyFlushGuardMs = 180; - for ( - let elapsedMs = 0; - elapsedMs < 600; - elapsedMs += eventIntervalMs - ) { + for (let elapsedMs = 0; elapsedMs < 600; elapsedMs += eventIntervalMs) { emitWorkspaceRootEvents([ { path: path.join(repoPath, "README.md"), @@ -488,11 +485,7 @@ describe.sequential("watchWorkspaceStatus", () => { type: "update", }, ]); - await waitForCallCount( - () => callbackCount, - 1, - WATCH_TEST_TIMEOUT_MS, - ); + await waitForCallCount(() => callbackCount, 1, WATCH_TEST_TIMEOUT_MS); expect(callbackCount).toBe(1); expect(watchErrors).toHaveLength(1); expect(watchErrors[0]).toContain(repoPath); @@ -506,11 +499,7 @@ describe.sequential("watchWorkspaceStatus", () => { type: "update", }, ]); - await waitForCallCount( - () => callbackCount, - 2, - WATCH_TEST_TIMEOUT_MS, - ); + await waitForCallCount(() => callbackCount, 2, WATCH_TEST_TIMEOUT_MS); expect(callbackCount).toBe(2); expect(watchErrors).toHaveLength(1); } finally { @@ -847,6 +836,56 @@ describe.sequential("watchWorkspaceStatus", () => { } }); + it("gives up workspace subscriptions after a bounded number of retries", async () => { + const repoPath = await initRepo(); + const workspaceRootPath = normalizeWatchPath(repoPath); + let workspaceRootSubscribeCount = 0; + vi.spyOn(parcelWatcher, "subscribe").mockImplementation( + async (...watchArgs: ParcelWatcherSubscribeArgs) => { + if (isWorkspaceRootSubscription(watchArgs, repoPath)) { + workspaceRootSubscribeCount += 1; + } + throw new Error("workspace subscription unavailable"); + }, + ); + + const maxRetryAttempts = 3; + const watchErrors: string[] = []; + const watcher = new WorkspaceStatusWatcher({ + cwd: repoPath, + debounceMs: 10, + maxRetryAttempts, + maxRetryDelayMs: 20, + maxWaitMs: 50, + retryDelayMs: 5, + onChange: () => undefined, + onWatchError: (error) => { + if (normalizeWatchPath(error.rootPath) === workspaceRootPath) { + watchErrors.push(error.message); + } + }, + }); + watcher.start(); + + try { + await waitForCallCount( + () => workspaceRootSubscribeCount, + maxRetryAttempts + 1, + WATCH_TEST_TIMEOUT_MS, + ); + // Bounded retry: after the attempt budget is exhausted the watcher must + // stop re-subscribing instead of looping forever against a dead path. + await ensureCallCountStays( + () => workspaceRootSubscribeCount, + maxRetryAttempts + 1, + ); + expect(workspaceRootSubscribeCount).toBe(maxRetryAttempts + 1); + expect(watchErrors.length).toBeGreaterThanOrEqual(1); + } finally { + watcher.dispose(); + } + }); + it("ignores shared common-dir index updates for detached worktree environments", async () => { const repoPath = await initRepo(); const worktreePath = await addDetachedWorktree(repoPath);