diff --git a/packages/opencode/src/cli/bootstrap.ts b/packages/opencode/src/cli/bootstrap.ts index 2308c29199b..562221c6470 100644 --- a/packages/opencode/src/cli/bootstrap.ts +++ b/packages/opencode/src/cli/bootstrap.ts @@ -1,10 +1,9 @@ import { InstanceRuntime } from "../project/instance-runtime" -import { context } from "../project/instance-context" export async function bootstrap(directory: string, cb: () => Promise) { const ctx = await InstanceRuntime.load({ directory }) try { - return await context.provide(ctx, cb) + return await cb() } finally { await InstanceRuntime.disposeInstance(ctx) } diff --git a/packages/opencode/src/project/instance-context.ts b/packages/opencode/src/project/instance-context.ts index b281f492d4d..3d33454ae07 100644 --- a/packages/opencode/src/project/instance-context.ts +++ b/packages/opencode/src/project/instance-context.ts @@ -1,4 +1,3 @@ -import { LocalContext } from "@/util/local-context" import { AppFileSystem } from "@opencode-ai/core/filesystem" import type * as Project from "./project" @@ -8,8 +7,6 @@ export interface InstanceContext { project: Project.Info } -export const context = LocalContext.create("instance") - /** * Check if a path is within the project boundary. * Returns true if path is inside ctx.directory OR ctx.worktree. diff --git a/packages/opencode/test/control-plane/workspace.test.ts b/packages/opencode/test/control-plane/workspace.test.ts index 26784592fbd..234a19d935b 100644 --- a/packages/opencode/test/control-plane/workspace.test.ts +++ b/packages/opencode/test/control-plane/workspace.test.ts @@ -3,9 +3,8 @@ import { $ } from "bun" import fs from "node:fs/promises" import Http from "node:http" import path from "node:path" -import { setTimeout as delay } from "node:timers/promises" import { NodeHttpServer } from "@effect/platform-node" -import { Effect, Layer, Schema } from "effect" +import { Cause, Effect, Exit, Layer, Schema } from "effect" import { FetchHttpClient, HttpServer, HttpServerRequest, HttpServerResponse } from "effect/unstable/http" import { eq } from "drizzle-orm" import { AppFileSystem } from "@opencode-ai/core/filesystem" @@ -14,7 +13,6 @@ import { GlobalBus, type GlobalEvent } from "@/bus/global" import { Database } from "@/storage/db" import { ProjectID } from "@/project/schema" import { ProjectTable } from "@/project/project.sql" -import { context, type InstanceContext } from "@/project/instance-context" import { InstanceRef } from "@/effect/instance-ref" import { Session as SessionNs } from "@/session/session" import { SessionID } from "@/session/schema" @@ -22,14 +20,13 @@ import { SessionTable } from "@/session/session.sql" import { SyncEvent } from "@/sync" import { EventSequenceTable } from "@/sync/event.sql" import { resetDatabase } from "../fixture/db" -import { disposeAllInstances, provideTmpdirInstance, TestInstance, tmpdir } from "../fixture/fixture" +import { disposeAllInstances, provideTmpdirInstance, TestInstance, withTestInstance } from "../fixture/fixture" import { testEffect } from "../lib/effect" import { registerAdapter } from "../../src/control-plane/adapters" import { WorkspaceID } from "../../src/control-plane/schema" import { WorkspaceTable } from "../../src/control-plane/workspace.sql" import type { Target, WorkspaceAdapter, WorkspaceInfo } from "../../src/control-plane/types" import * as Workspace from "../../src/control-plane/workspace" -import { AppRuntime } from "@/effect/app-runtime" import { InstanceStore } from "@/project/instance-store" import { InstanceBootstrap } from "@/project/bootstrap" import { Auth } from "@/auth" @@ -121,12 +118,6 @@ afterEach(async () => { await resetDatabase() }) -async function withInstance(fn: (ctx: InstanceContext) => T | Promise) { - await using tmp = await tmpdir({ git: true }) - const ctx = await AppRuntime.runPromise(InstanceStore.Service.use((store) => store.load({ directory: tmp.path }))) - return await context.provide(ctx, () => fn(ctx)) -} - async function initGitRepo(dir: string) { await fs.mkdir(dir, { recursive: true }) await $`git init`.cwd(dir).quiet() @@ -139,34 +130,22 @@ async function initGitRepo(dir: string) { await $`git commit -m "base"`.cwd(dir).quiet() } -function currentInstance() { - try { - return context.use() - } catch { - return undefined - } -} +const requireInstance = Effect.gen(function* () { + const instance = yield* InstanceRef + if (!instance) return yield* Effect.die(new Error("missing test instance")) + return instance +}) -const runWorkspace = (effect: Effect.Effect) => { - const ctx = currentInstance() - return AppRuntime.runPromise(ctx ? effect.pipe(Effect.provideService(InstanceRef, ctx)) : effect) -} -const createWorkspace = (input: Workspace.CreateInput) => - runWorkspace(Workspace.Service.use((workspace) => workspace.create(input))) +const createWorkspace = (input: Workspace.CreateInput) => Workspace.Service.use((workspace) => workspace.create(input)) const warpWorkspaceSession = (input: Workspace.SessionWarpInput) => - runWorkspace(Workspace.Service.use((workspace) => workspace.sessionWarp(input))) + Workspace.Service.use((workspace) => workspace.sessionWarp(input)) const listWorkspaces = (project: Parameters[0]) => - runWorkspace(Workspace.Service.use((workspace) => workspace.list(project))) + Workspace.Service.use((workspace) => workspace.list(project)) const syncListWorkspaces = (project: Parameters[0]) => - runWorkspace(Workspace.Service.use((workspace) => workspace.syncList(project))) -const getWorkspace = (id: WorkspaceID) => runWorkspace(Workspace.Service.use((workspace) => workspace.get(id))) -const removeWorkspace = (id: WorkspaceID) => runWorkspace(Workspace.Service.use((workspace) => workspace.remove(id))) -const workspaceStatus = () => runWorkspace(Workspace.Service.use((workspace) => workspace.status())) -const isWorkspaceSyncing = (id: WorkspaceID) => - runWorkspace(Workspace.Service.use((workspace) => workspace.isSyncing(id))) -const startWorkspaceSyncing = (projectID: ProjectID) => { - void runWorkspace(Workspace.Service.use((workspace) => workspace.startWorkspaceSyncing(projectID))) -} + Workspace.Service.use((workspace) => workspace.syncList(project)) +const getWorkspace = (id: WorkspaceID) => Workspace.Service.use((workspace) => workspace.get(id)) +const removeWorkspace = (id: WorkspaceID) => Workspace.Service.use((workspace) => workspace.remove(id)) +const workspaceStatus = () => Workspace.Service.use((workspace) => workspace.status()) const startWorkspaceSyncingWithFlag = (projectID: ProjectID, experimentalWorkspaces: boolean) => Effect.runPromise( Workspace.Service.use((workspace) => workspace.startWorkspaceSyncing(projectID)).pipe( @@ -174,7 +153,14 @@ const startWorkspaceSyncingWithFlag = (projectID: ProjectID, experimentalWorkspa ), ) const waitForWorkspaceSync = (workspaceID: WorkspaceID, state: Record, signal?: AbortSignal) => - runWorkspace(Workspace.Service.use((workspace) => workspace.waitForSync(workspaceID, state, signal))) + Workspace.Service.use((workspace) => workspace.waitForSync(workspaceID, state, signal)) + +const expectEffectFailure = (effect: Effect.Effect, message: string) => + Effect.gen(function* () { + const exit = yield* Effect.exit(effect) + expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) expect(Cause.pretty(exit.cause)).toContain(message) + }) function captureGlobalEvents() { const events: GlobalEvent[] = [] @@ -188,20 +174,6 @@ function captureGlobalEvents() { } } -async function eventually(fn: () => T | Promise, timeout = 1500) { - const started = Date.now() - let last: unknown - while (Date.now() - started < timeout) { - try { - return await fn() - } catch (err) { - last = err - await delay(10) - } - } - throw last ?? new Error("Timed out waiting for condition") -} - function eventuallyEffect(effect: Effect.Effect, timeout = 1500) { return Effect.gen(function* () { const started = Date.now() @@ -417,289 +389,322 @@ describe("workspace schemas and exports", () => { }) describe("workspace CRUD", () => { - test("get returns undefined for a missing workspace", async () => { - await withInstance(async () => { - expect(await getWorkspace(WorkspaceID.ascending("wrk_missing_get"))).toBeUndefined() - }) - }) + it.instance( + "get returns undefined for a missing workspace", + () => + Effect.gen(function* () { + expect(yield* getWorkspace(WorkspaceID.ascending("wrk_missing_get"))).toBeUndefined() + }), + { git: true }, + ) - test("list maps database rows, filters by project, and sorts by id", async () => { - await withInstance(async (instance) => { - const otherProjectID = ProjectID.make("project-other") - insertProject(otherProjectID, "/tmp/other") - const a = workspaceInfo(instance.project.id, "manual", { - id: WorkspaceID.ascending("wrk_a_list"), - branch: "a", - directory: "/a", - extra: { a: true }, - }) - const b = workspaceInfo(instance.project.id, "manual", { - id: WorkspaceID.ascending("wrk_b_list"), - branch: "b", - directory: "/b", - extra: ["b"], - }) - const other = workspaceInfo(otherProjectID, "manual", { id: WorkspaceID.ascending("wrk_c_list") }) - insertWorkspace(b) - insertWorkspace(other) - insertWorkspace(a) + it.instance( + "list maps database rows, filters by project, and sorts by id", + () => + Effect.gen(function* () { + const instance = yield* requireInstance + const otherProjectID = ProjectID.make("project-other") + insertProject(otherProjectID, "/tmp/other") + const a = workspaceInfo(instance.project.id, "manual", { + id: WorkspaceID.ascending("wrk_a_list"), + branch: "a", + directory: "/a", + extra: { a: true }, + }) + const b = workspaceInfo(instance.project.id, "manual", { + id: WorkspaceID.ascending("wrk_b_list"), + branch: "b", + directory: "/b", + extra: ["b"], + }) + const other = workspaceInfo(otherProjectID, "manual", { id: WorkspaceID.ascending("wrk_c_list") }) + insertWorkspace(b) + insertWorkspace(other) + insertWorkspace(a) - expect(await listWorkspaces(instance.project)).toEqual([a, b]) - }) - }) + expect(yield* listWorkspaces(instance.project)).toEqual([a, b]) + }), + { git: true }, + ) - test("create configures, persists, creates, starts local sync, and passes environment", async () => { - await withInstance(async (instance) => { - process.env.OPENCODE_AUTH_CONTENT = JSON.stringify({ test: { type: "api", key: "secret" } }) - process.env.OTEL_EXPORTER_OTLP_HEADERS = "authorization=otel" - process.env.OTEL_EXPORTER_OTLP_ENDPOINT = "https://otel.test" - process.env.OTEL_RESOURCE_ATTRIBUTES = "service.name=opencode-test" - - const workspaceID = WorkspaceID.ascending("wrk_create_local") - const type = unique("create-local") - const targetDir = path.join(instance.directory, "created-local") - const recorded = recordedAdapter({ - configure(info) { - return { - ...info, - branch: "configured-branch", - name: "Configured Name", - directory: targetDir, - extra: { configured: true }, - } - }, - async create() { - await fs.mkdir(targetDir, { recursive: true }) - }, - target() { - return { type: "local", directory: targetDir } - }, - }) - registerAdapter(instance.project.id, type, recorded.adapter) - - const info = await createWorkspace({ - id: workspaceID, - type, - branch: null, - projectID: instance.project.id, - extra: null, - }) + it.instance( + "create configures, persists, creates, starts local sync, and passes environment", + () => + Effect.gen(function* () { + const instance = yield* requireInstance + process.env.OPENCODE_AUTH_CONTENT = JSON.stringify({ test: { type: "api", key: "secret" } }) + process.env.OTEL_EXPORTER_OTLP_HEADERS = "authorization=otel" + process.env.OTEL_EXPORTER_OTLP_ENDPOINT = "https://otel.test" + process.env.OTEL_RESOURCE_ATTRIBUTES = "service.name=opencode-test" + + const workspaceID = WorkspaceID.ascending("wrk_create_local") + const type = unique("create-local") + const targetDir = path.join(instance.directory, "created-local") + const recorded = recordedAdapter({ + configure(info) { + return { + ...info, + branch: "configured-branch", + name: "Configured Name", + directory: targetDir, + extra: { configured: true }, + } + }, + async create() { + await fs.mkdir(targetDir, { recursive: true }) + }, + target() { + return { type: "local", directory: targetDir } + }, + }) + registerAdapter(instance.project.id, type, recorded.adapter) - expect(info).toEqual({ - id: workspaceID, - type, - branch: "configured-branch", - name: "Configured Name", - directory: targetDir, - extra: { configured: true }, - projectID: instance.project.id, - timeUsed: info.timeUsed, - }) - expect(await getWorkspace(workspaceID)).toEqual(info) - expect(await listWorkspaces(instance.project)).toEqual([info]) - expect(recorded.calls.configure).toHaveLength(1) - expect(recorded.calls.configure[0]).toMatchObject({ id: workspaceID, type, directory: null }) - expect(recorded.calls.create).toHaveLength(1) - expect(recorded.calls.create[0].info).toEqual({ - id: workspaceID, - type, - branch: "configured-branch", - name: "Configured Name", - directory: targetDir, - extra: { configured: true }, - projectID: instance.project.id, - }) - expect(JSON.parse(recorded.calls.create[0].env.OPENCODE_AUTH_CONTENT ?? "{}")).toEqual({ - test: { type: "api", key: "secret" }, - }) - expect(recorded.calls.create[0].env.OPENCODE_WORKSPACE_ID).toBe(workspaceID) - expect(recorded.calls.create[0].env.OPENCODE_EXPERIMENTAL_WORKSPACES).toBe("true") - expect(recorded.calls.create[0].env.OTEL_EXPORTER_OTLP_HEADERS).toBe("authorization=otel") - expect(recorded.calls.create[0].env.OTEL_EXPORTER_OTLP_ENDPOINT).toBe("https://otel.test") - expect(recorded.calls.create[0].env.OTEL_RESOURCE_ATTRIBUTES).toBe("service.name=opencode-test") - expect((await workspaceStatus()).find((item) => item.workspaceID === workspaceID)?.status).toBe("connected") - - await removeWorkspace(workspaceID) - expect((await workspaceStatus()).find((item) => item.workspaceID === workspaceID)?.status).toBeUndefined() - }) - }) + const info = yield* createWorkspace({ + id: workspaceID, + type, + branch: null, + projectID: instance.project.id, + extra: null, + }) + + expect(info).toEqual({ + id: workspaceID, + type, + branch: "configured-branch", + name: "Configured Name", + directory: targetDir, + extra: { configured: true }, + projectID: instance.project.id, + timeUsed: info.timeUsed, + }) + expect(yield* getWorkspace(workspaceID)).toEqual(info) + expect(yield* listWorkspaces(instance.project)).toEqual([info]) + expect(recorded.calls.configure).toHaveLength(1) + expect(recorded.calls.configure[0]).toMatchObject({ id: workspaceID, type, directory: null }) + expect(recorded.calls.create).toHaveLength(1) + expect(recorded.calls.create[0].info).toEqual({ + id: workspaceID, + type, + branch: "configured-branch", + name: "Configured Name", + directory: targetDir, + extra: { configured: true }, + projectID: instance.project.id, + }) + expect(JSON.parse(recorded.calls.create[0].env.OPENCODE_AUTH_CONTENT ?? "{}")).toEqual({ + test: { type: "api", key: "secret" }, + }) + expect(recorded.calls.create[0].env.OPENCODE_WORKSPACE_ID).toBe(workspaceID) + expect(recorded.calls.create[0].env.OPENCODE_EXPERIMENTAL_WORKSPACES).toBe("true") + expect(recorded.calls.create[0].env.OTEL_EXPORTER_OTLP_HEADERS).toBe("authorization=otel") + expect(recorded.calls.create[0].env.OTEL_EXPORTER_OTLP_ENDPOINT).toBe("https://otel.test") + expect(recorded.calls.create[0].env.OTEL_RESOURCE_ATTRIBUTES).toBe("service.name=opencode-test") + expect((yield* workspaceStatus()).find((item) => item.workspaceID === workspaceID)?.status).toBe("connected") + + yield* removeWorkspace(workspaceID) + expect((yield* workspaceStatus()).find((item) => item.workspaceID === workspaceID)?.status).toBeUndefined() + }), + { git: true }, + ) - test("create propagates configure failures and does not insert a workspace", async () => { - await withInstance(async (instance) => { - const type = unique("configure-failure") - registerAdapter( - instance.project.id, - type, - recordedAdapter({ - configure() { - throw new Error("configure exploded") + it.instance( + "create propagates configure failures and does not insert a workspace", + () => + Effect.gen(function* () { + const instance = yield* requireInstance + const type = unique("configure-failure") + registerAdapter( + instance.project.id, + type, + recordedAdapter({ + configure() { + throw new Error("configure exploded") + }, + target() { + return { type: "local", directory: "/unused" } + }, + }).adapter, + ) + + yield* expectEffectFailure( + createWorkspace({ type, branch: null, projectID: instance.project.id, extra: null }), + "configure exploded", + ) + expect(yield* listWorkspaces(instance.project)).toEqual([]) + }), + { git: true }, + ) + + it.instance( + "create leaves the inserted row when adapter create fails", + () => + Effect.gen(function* () { + const instance = yield* requireInstance + const type = unique("create-failure") + const recorded = recordedAdapter({ + async create() { + throw new Error("create exploded") }, target() { return { type: "local", directory: "/unused" } }, - }).adapter, - ) + }) + registerAdapter(instance.project.id, type, recorded.adapter) - await expect( - createWorkspace({ type, branch: null, projectID: instance.project.id, extra: null }), - ).rejects.toThrow("configure exploded") - expect(await listWorkspaces(instance.project)).toEqual([]) - }) - }) + yield* expectEffectFailure( + createWorkspace({ type, branch: "branch", projectID: instance.project.id, extra: { x: 1 } }), + "create exploded", + ) - test("create leaves the inserted row when adapter create fails", async () => { - await withInstance(async (instance) => { - const type = unique("create-failure") - const recorded = recordedAdapter({ - async create() { - throw new Error("create exploded") - }, - target() { - return { type: "local", directory: "/unused" } - }, - }) - registerAdapter(instance.project.id, type, recorded.adapter) + const rows = yield* listWorkspaces(instance.project) + expect(rows).toHaveLength(1) + expect(rows[0]).toMatchObject({ type, branch: "branch", extra: { x: 1 } }) + expect(recorded.calls.target).toHaveLength(0) + yield* removeWorkspace(rows[0].id) + }), + { git: true }, + ) - await expect( - createWorkspace({ type, branch: "branch", projectID: instance.project.id, extra: { x: 1 } }), - ).rejects.toThrow("create exploded") + it.instance( + "create returns after a local workspace reports error", + () => + Effect.gen(function* () { + const instance = yield* requireInstance + const type = unique("local-error") + const missing = path.join(instance.directory, "missing-local-target") + const recorded = localAdapter(missing, { createDir: false }) + registerAdapter(instance.project.id, type, recorded.adapter) - const rows = await listWorkspaces(instance.project) - expect(rows).toHaveLength(1) - expect(rows[0]).toMatchObject({ type, branch: "branch", extra: { x: 1 } }) - expect(recorded.calls.target).toHaveLength(0) - await removeWorkspace(rows[0].id) - }) - }) + const info = yield* createWorkspace({ type, branch: null, projectID: instance.project.id, extra: null }) - test("create returns after a local workspace reports error", async () => { - await withInstance(async (instance) => { - const type = unique("local-error") - const missing = path.join(instance.directory, "missing-local-target") - const recorded = localAdapter(missing, { createDir: false }) - registerAdapter(instance.project.id, type, recorded.adapter) + expect(info.directory).toBe(missing) + expect((yield* workspaceStatus()).find((item) => item.workspaceID === info.id)?.status).toBe("error") + yield* removeWorkspace(info.id) + }), + { git: true }, + ) - const info = await createWorkspace({ type, branch: null, projectID: instance.project.id, extra: null }) + it.instance( + "syncList registers adapter-listed workspaces that are missing by name", + () => + Effect.gen(function* () { + const instance = yield* requireInstance + const type = unique("list-sync") + const existing = workspaceInfo(instance.project.id, type, { + id: WorkspaceID.ascending("wrk_list_sync_existing"), + name: "existing", + directory: path.join(instance.directory, "existing"), + }) + insertWorkspace(existing) - expect(info.directory).toBe(missing) - expect((await workspaceStatus()).find((item) => item.workspaceID === info.id)?.status).toBe("error") - await removeWorkspace(info.id) - }) - }) + const discovered = { + type, + name: "discovered", + branch: "feature/discovered", + directory: path.join(instance.directory, "discovered"), + extra: { source: "adapter" }, + projectID: instance.project.id, + } + const recorded = recordedAdapter({ + list() { + return [ + { + type, + name: existing.name, + branch: "ignored", + directory: path.join(instance.directory, "ignored"), + extra: null, + projectID: instance.project.id, + }, + discovered, + ] + }, + target(info) { + return { type: "local", directory: info.directory ?? instance.directory } + }, + }) + registerAdapter(instance.project.id, type, recorded.adapter) - test("syncList registers adapter-listed workspaces that are missing by name", async () => { - await withInstance(async (instance) => { - const type = unique("list-sync") - const existing = workspaceInfo(instance.project.id, type, { - id: WorkspaceID.ascending("wrk_list_sync_existing"), - name: "existing", - directory: path.join(instance.directory, "existing"), - }) - insertWorkspace(existing) - - const discovered = { - type, - name: "discovered", - branch: "feature/discovered", - directory: path.join(instance.directory, "discovered"), - extra: { source: "adapter" }, - projectID: instance.project.id, - } - const recorded = recordedAdapter({ - list() { - return [ - { - type, - name: existing.name, - branch: "ignored", - directory: path.join(instance.directory, "ignored"), - extra: null, - projectID: instance.project.id, - }, - discovered, - ] - }, - target(info) { - return { type: "local", directory: info.directory ?? instance.directory } - }, - }) - registerAdapter(instance.project.id, type, recorded.adapter) - - await syncListWorkspaces(instance.project) - const synced = (await listWorkspaces(instance.project)).filter((item) => item.name === discovered.name) - - expect(synced).toHaveLength(1) - expect(synced[0]).toMatchObject(discovered) - expect(synced[0]?.id).toStartWith("wrk_") - expect(await listWorkspaces(instance.project)).toEqual(expect.arrayContaining([existing, synced[0]])) - expect(recorded.calls.list).toBe(1) - expect(recorded.calls.configure).toHaveLength(0) - expect(recorded.calls.create).toHaveLength(0) - expect(recorded.calls.target).toHaveLength(1) - }) - }) + yield* syncListWorkspaces(instance.project) + const synced = (yield* listWorkspaces(instance.project)).filter((item) => item.name === discovered.name) + + expect(synced).toHaveLength(1) + expect(synced[0]).toMatchObject(discovered) + expect(synced[0]?.id).toStartWith("wrk_") + expect(yield* listWorkspaces(instance.project)).toEqual(expect.arrayContaining([existing, synced[0]])) + expect(recorded.calls.list).toBe(1) + expect(recorded.calls.configure).toHaveLength(0) + expect(recorded.calls.create).toHaveLength(0) + expect(recorded.calls.target).toHaveLength(1) + }), + { git: true }, + ) - test("syncList calls every registered adapter with a list method", async () => { - await withInstance(async (instance) => { - const typeA = unique("list-sync-a") - const typeB = unique("list-sync-b") - const adapterA = recordedAdapter({ - list() { - return [ - { - type: typeA, - name: "adapter-a", - branch: null, - directory: path.join(instance.directory, "adapter-a"), - extra: null, - projectID: instance.project.id, - }, - ] - }, - target(info) { - return { type: "local", directory: info.directory ?? instance.directory } - }, - }) - const adapterB = recordedAdapter({ - list() { - return [ - { - type: typeB, - name: "adapter-b", - branch: null, - directory: path.join(instance.directory, "adapter-b"), - extra: null, - projectID: instance.project.id, - }, - ] - }, - target(info) { - return { type: "local", directory: info.directory ?? instance.directory } - }, - }) - const noList = recordedAdapter({ - target() { - return { type: "local", directory: instance.directory } - }, - }) - registerAdapter(instance.project.id, typeA, adapterA.adapter) - registerAdapter(instance.project.id, typeB, adapterB.adapter) - registerAdapter(instance.project.id, unique("list-sync-none"), noList.adapter) - - await syncListWorkspaces(instance.project) - const synced = await listWorkspaces(instance.project) - - expect( - synced - .filter((item) => item.type === typeA || item.type === typeB) - .map((item) => item.name) - .toSorted(), - ).toEqual(["adapter-a", "adapter-b"]) - expect(adapterA.calls.list).toBe(1) - expect(adapterB.calls.list).toBe(1) - expect(noList.calls.list).toBe(0) - }) - }) + it.instance( + "syncList calls every registered adapter with a list method", + () => + Effect.gen(function* () { + const instance = yield* requireInstance + const typeA = unique("list-sync-a") + const typeB = unique("list-sync-b") + const adapterA = recordedAdapter({ + list() { + return [ + { + type: typeA, + name: "adapter-a", + branch: null, + directory: path.join(instance.directory, "adapter-a"), + extra: null, + projectID: instance.project.id, + }, + ] + }, + target(info) { + return { type: "local", directory: info.directory ?? instance.directory } + }, + }) + const adapterB = recordedAdapter({ + list() { + return [ + { + type: typeB, + name: "adapter-b", + branch: null, + directory: path.join(instance.directory, "adapter-b"), + extra: null, + projectID: instance.project.id, + }, + ] + }, + target(info) { + return { type: "local", directory: info.directory ?? instance.directory } + }, + }) + const noList = recordedAdapter({ + target() { + return { type: "local", directory: instance.directory } + }, + }) + registerAdapter(instance.project.id, typeA, adapterA.adapter) + registerAdapter(instance.project.id, typeB, adapterB.adapter) + registerAdapter(instance.project.id, unique("list-sync-none"), noList.adapter) + + yield* syncListWorkspaces(instance.project) + const synced = yield* listWorkspaces(instance.project) + + expect( + synced + .filter((item) => item.type === typeA || item.type === typeB) + .map((item) => item.name) + .toSorted(), + ).toEqual(["adapter-a", "adapter-b"]) + expect(adapterA.calls.list).toBe(1) + expect(adapterB.calls.list).toBe(1) + expect(noList.calls.list).toBe(0) + }), + { git: true }, + ) it.live("remote create connects to routed event and history endpoints", () => { const calls: FetchCall[] = [] @@ -751,11 +756,14 @@ describe("workspace CRUD", () => { }) }) - test("remove returns undefined for a missing workspace", async () => { - await withInstance(async () => { - expect(await removeWorkspace(WorkspaceID.ascending("wrk_missing_remove"))).toBeUndefined() - }) - }) + it.instance( + "remove returns undefined for a missing workspace", + () => + Effect.gen(function* () { + expect(yield* removeWorkspace(WorkspaceID.ascending("wrk_missing_remove"))).toBeUndefined() + }), + { git: true }, + ) it.instance( "remove deletes the workspace, associated sessions, adapter resources, and status", @@ -791,28 +799,32 @@ describe("workspace CRUD", () => { { git: true }, ) - test("remove still deletes the row when the adapter cannot remove resources", async () => { - await withInstance(async (instance) => { - const type = unique("remove-throws") - const info = workspaceInfo(instance.project.id, type, { id: WorkspaceID.ascending("wrk_remove_throws") }) - registerAdapter( - instance.project.id, - type, - recordedAdapter({ - async remove() { - throw new Error("remove exploded") - }, - target() { - return { type: "local", directory: "/unused" } - }, - }).adapter, - ) - insertWorkspace(info) + it.instance( + "remove still deletes the row when the adapter cannot remove resources", + () => + Effect.gen(function* () { + const instance = yield* requireInstance + const type = unique("remove-throws") + const info = workspaceInfo(instance.project.id, type, { id: WorkspaceID.ascending("wrk_remove_throws") }) + registerAdapter( + instance.project.id, + type, + recordedAdapter({ + async remove() { + throw new Error("remove exploded") + }, + target() { + return { type: "local", directory: "/unused" } + }, + }).adapter, + ) + insertWorkspace(info) - expect(await removeWorkspace(info.id)).toEqual(info) - expect(await getWorkspace(info.id)).toBeUndefined() - }) - }) + expect(yield* removeWorkspace(info.id)).toEqual(info) + expect(yield* getWorkspace(info.id)).toBeUndefined() + }), + { git: true }, + ) it.instance( "sessionWarp moves a session into a local workspace and claims ownership", @@ -920,42 +932,44 @@ describe("workspace CRUD", () => { { git: true }, ) - test("sessionWarp detaches to the source project when invoked from a workspace instance", async () => { - await withInstance(async (instance) => { - const projectID = instance.project.id - await using workspaceTmp = await tmpdir({ git: true }) - const previousType = unique("warp-detach-workspace-instance") - const previous = workspaceInfo(projectID, previousType) - insertWorkspace(previous) - registerAdapter(projectID, previousType, localAdapter(workspaceTmp.path, { createDir: false }).adapter) - const session = await AppRuntime.runPromise( - SessionNs.Service.use((svc) => svc.create({})).pipe(Effect.provideService(InstanceRef, instance)), - ) - attachSessionToWorkspace(session.id, previous.id) + it.instance( + "sessionWarp detaches to the source project when invoked from a workspace instance", + () => + Effect.gen(function* () { + const instance = yield* requireInstance + const projectID = instance.project.id + const workspaceDir = path.join(instance.directory, "warp-detach-workspace-instance") + yield* Effect.promise(() => initGitRepo(workspaceDir)) + const previousType = unique("warp-detach-workspace-instance") + const previous = workspaceInfo(projectID, previousType) + insertWorkspace(previous) + registerAdapter(projectID, previousType, localAdapter(workspaceDir, { createDir: false }).adapter) + const session = yield* SessionNs.Service.use((svc) => svc.create({})) + attachSessionToWorkspace(session.id, previous.id) - const workspaceCtx = await AppRuntime.runPromise( - InstanceStore.Service.use((store) => store.load({ directory: workspaceTmp.path })), - ) - const workspaceProjectID = await context.provide(workspaceCtx, async () => { - const id = workspaceCtx.project.id - expect(id).not.toBe(projectID) - await warpWorkspaceSession({ workspaceID: null, sessionID: session.id }) - return id - }) + const workspaceCtx = yield* Effect.promise(() => + withTestInstance({ directory: workspaceDir, fn: (ctx) => ctx }), + ) + const workspaceProjectID = workspaceCtx.project.id + expect(workspaceProjectID).not.toBe(projectID) + yield* warpWorkspaceSession({ workspaceID: null, sessionID: session.id }).pipe( + Effect.provideService(InstanceRef, workspaceCtx), + ) - expect( - Database.use((db) => - db - .select({ workspaceID: SessionTable.workspace_id }) - .from(SessionTable) - .where(eq(SessionTable.id, session.id)) - .get(), - )?.workspaceID, - ).toBeNull() - expect(sessionSequenceOwner(session.id)).toBe(projectID) - expect(sessionSequenceOwner(session.id)).not.toBe(workspaceProjectID) - }) - }) + expect( + Database.use((db) => + db + .select({ workspaceID: SessionTable.workspace_id }) + .from(SessionTable) + .where(eq(SessionTable.id, session.id)) + .get(), + )?.workspaceID, + ).toBeNull() + expect(sessionSequenceOwner(session.id)).toBe(projectID) + expect(sessionSequenceOwner(session.id)).not.toBe(workspaceProjectID) + }), + { git: true }, + ) it.live("sessionWarp syncs previous remote history, replays it, steals, and claims the sequence", () => { const calls: FetchCall[] = [] @@ -1559,89 +1573,102 @@ describe("workspace sync state", () => { }) describe("workspace waitForSync", () => { - test("returns immediately for an empty fence", async () => { - await withInstance(async () => { - await expect(waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_empty"), {})).resolves.toBeUndefined() - }) - }) + it.instance("returns immediately for an empty fence", () => + waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_empty"), {}), + ) - test("returns immediately when the stored sequence already satisfies the fence", async () => { - await withInstance(async () => { + it.instance("returns immediately when the stored sequence already satisfies the fence", () => + Effect.gen(function* () { const sessionID = SessionID.descending("ses_wait_done") Database.use((db) => db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: 4 }).run()) - await expect( - waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_done"), { [sessionID]: 4 }), - ).resolves.toBeUndefined() - await expect( - waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_done_2"), { [sessionID]: 3 }), - ).resolves.toBeUndefined() - }) - }) + yield* waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_done"), { [sessionID]: 4 }) + yield* waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_done_2"), { [sessionID]: 3 }) + }), + ) - test("waits until the database reaches the requested sequence and a workspace event arrives", async () => { - await withInstance(async () => { + it.instance("waits until the database reaches the requested sequence and a workspace event arrives", () => + Effect.gen(function* () { const workspaceID = WorkspaceID.ascending("wrk_wait_event") const sessionID = SessionID.descending("ses_wait_event") Database.use((db) => db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: 1 }).run()) - const waited = waitForWorkspaceSync(workspaceID, { [sessionID]: 2 }) - await delay(10) - Database.use((db) => - db.update(EventSequenceTable).set({ seq: 2 }).where(eq(EventSequenceTable.aggregate_id, sessionID)).run(), + yield* Effect.all( + [ + waitForWorkspaceSync(workspaceID, { [sessionID]: 2 }), + Effect.gen(function* () { + yield* Effect.sleep("10 millis") + Database.use((db) => + db.update(EventSequenceTable).set({ seq: 2 }).where(eq(EventSequenceTable.aggregate_id, sessionID)).run(), + ) + GlobalBus.emit("event", { workspace: workspaceID, payload: { type: "anything" } }) + }), + ], + { concurrency: 2, discard: true }, ) - GlobalBus.emit("event", { workspace: workspaceID, payload: { type: "anything" } }) - - await expect(waited).resolves.toBeUndefined() - }) - }) + }), + ) - test("a sync event for a different workspace can also release the fence", async () => { - await withInstance(async () => { + it.instance("a sync event for a different workspace can also release the fence", () => + Effect.gen(function* () { const workspaceID = WorkspaceID.ascending("wrk_wait_sync_any") const sessionID = SessionID.descending("ses_wait_sync_any") Database.use((db) => db.insert(EventSequenceTable).values({ aggregate_id: sessionID, seq: 0 }).run()) - const waited = waitForWorkspaceSync(workspaceID, { [sessionID]: 1 }) - await delay(10) - Database.use((db) => - db.update(EventSequenceTable).set({ seq: 1 }).where(eq(EventSequenceTable.aggregate_id, sessionID)).run(), + yield* Effect.all( + [ + waitForWorkspaceSync(workspaceID, { [sessionID]: 1 }), + Effect.gen(function* () { + yield* Effect.sleep("10 millis") + Database.use((db) => + db.update(EventSequenceTable).set({ seq: 1 }).where(eq(EventSequenceTable.aggregate_id, sessionID)).run(), + ) + GlobalBus.emit("event", { + workspace: WorkspaceID.ascending("wrk_other_workspace"), + payload: { type: "sync" }, + }) + }), + ], + { concurrency: 2, discard: true }, ) - GlobalBus.emit("event", { - workspace: WorkspaceID.ascending("wrk_other_workspace"), - payload: { type: "sync" }, - }) - - await expect(waited).resolves.toBeUndefined() - }) - }) + }), + ) - test("rejects with the abort reason when aborted", async () => { - await withInstance(async () => { + it.instance("rejects with the abort reason when aborted", () => + Effect.gen(function* () { const abort = new AbortController() const reason = new Error("caller aborted") - const waited = waitForWorkspaceSync( - WorkspaceID.ascending("wrk_wait_abort"), - { [SessionID.descending("ses_wait_abort")]: 1 }, - abort.signal, + const exit = yield* Effect.exit( + Effect.all( + [ + waitForWorkspaceSync( + WorkspaceID.ascending("wrk_wait_abort"), + { [SessionID.descending("ses_wait_abort")]: 1 }, + abort.signal, + ), + Effect.sync(() => abort.abort(reason)), + ], + { concurrency: 2 }, + ), ) - abort.abort(reason) - await expect(waited).rejects.toMatchObject({ - _tag: "WorkspaceSyncAbortedError", - message: reason.message, - cause: reason, - }) - }) - }) + expect(Exit.isFailure(exit)).toBe(true) + if (Exit.isFailure(exit)) expect(Cause.pretty(exit.cause)).toContain(reason.message) + }), + ) - test("times out with the requested fence in the error message", async () => { - await withInstance(async () => { - const sessionID = SessionID.descending("ses_wait_timeout") + it.instance( + "times out with the requested fence in the error message", + () => + Effect.gen(function* () { + const sessionID = SessionID.descending("ses_wait_timeout") - await expect(waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_timeout"), { [sessionID]: 1 })).rejects.toThrow( - `Timed out waiting for sync fence: {"${sessionID}":1}`, - ) - }) - }, 7000) + yield* expectEffectFailure( + waitForWorkspaceSync(WorkspaceID.ascending("wrk_wait_timeout"), { [sessionID]: 1 }), + `Timed out waiting for sync fence: {"${sessionID}":1}`, + ) + }), + { git: true }, + 7000, + ) })