diff --git a/.github/workflows/publish-client.yml b/.github/workflows/publish-client.yml new file mode 100644 index 00000000..c0595366 --- /dev/null +++ b/.github/workflows/publish-client.yml @@ -0,0 +1,67 @@ +name: Publish Client Library + +on: + pull_request: + types: [closed] + branches: [main] + paths: + - "packages/client/**" + workflow_dispatch: + +concurrency: + group: publish-client + cancel-in-progress: false + +permissions: + contents: read + id-token: write + +jobs: + publish: + name: Publish to JSR + runs-on: ubuntu-latest + if: github.event.pull_request.merged == true || github.event_name == 'workflow_dispatch' + + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + ref: main + + - name: Setup Deno + uses: denoland/setup-deno@v2 + with: + deno-version: v2.x + + - name: Generate version + id: version + run: | + # Semantic version derived from date: MAJOR.YYYYMMDD.PATCH + VERSION="0.$(date -u +%Y%m%d).$(git rev-list --count HEAD -- packages/client)" + echo "version=$VERSION" >> "$GITHUB_OUTPUT" + echo "Version: $VERSION" + + - name: Stamp version + working-directory: packages/client + run: | + VERSION="${{ steps.version.outputs.version }}" + # Update version in deno.json + deno eval " + const config = JSON.parse(await Deno.readTextFile('deno.json')); + config.version = '${VERSION}'; + await Deno.writeTextFile('deno.json', JSON.stringify(config, null, 2) + '\n'); + " + echo "Updated deno.json:" + cat deno.json + + - name: Type check + working-directory: packages/client + run: deno check mod.ts + + - name: Run tests + working-directory: packages/client + run: deno test -A + + - name: Publish to JSR + working-directory: packages/client + run: deno publish --allow-dirty diff --git a/deno.json b/deno.json index ff745ced..13305469 100644 --- a/deno.json +++ b/deno.json @@ -3,6 +3,7 @@ "version": "0.1.0", "license": "AGPL-3.0-only", "exports": "./main.ts", + "workspace": ["packages/client"], "tasks": { "dev": "deno run --unstable-bundle --allow-read --allow-write --allow-env --allow-run --allow-sys main.ts", "test": "deno test --unstable-bundle --allow-read --allow-write --allow-env --allow-run --allow-net --allow-sys", diff --git a/packages/client/client.ts b/packages/client/client.ts new file mode 100644 index 00000000..b23e8c67 --- /dev/null +++ b/packages/client/client.ts @@ -0,0 +1,346 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and\/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +/** + * SwampClient — typed WebSocket client for the swamp serve API. + * + * Provides both callback-based and AsyncIterable-based consumption of + * workflow and model method execution event streams. + */ + +import type { + ModelMethodRunEvent, + ModelMethodRunPayload, + ModelMethodRunView, + ServerMessage, + ServerRequest, + WorkflowRunEvent, + WorkflowRunPayload, + WorkflowRunView, +} from "./protocol.ts"; +import { + type EventHandlers, + SwampClientError, + withDefaults, +} from "./stream.ts"; + +interface PendingRequest { + resolve: (value: T) => void; + reject: (error: Error) => void; + // deno-lint-ignore no-explicit-any + handlers: EventHandlers; + // deno-lint-ignore no-explicit-any + queue?: AsyncIterableQueue; +} + +export class SwampClient { + private url: string; + private socket: WebSocket | null = null; + // deno-lint-ignore no-explicit-any + private pending = new Map>(); + private connectPromise: Promise | null = null; + + constructor(url: string) { + this.url = url; + } + + /** + * Opens the WebSocket connection. Resolves when the connection is ready. + * Safe to call multiple times — returns the same promise if already connecting. + */ + connect(): Promise { + if (this.socket?.readyState === WebSocket.OPEN) { + return Promise.resolve(); + } + if (this.connectPromise) return this.connectPromise; + + this.connectPromise = new Promise((resolve, reject) => { + const socket = new WebSocket(this.url); + + socket.onopen = () => { + this.socket = socket; + this.connectPromise = null; + resolve(); + }; + + socket.onerror = (event) => { + this.connectPromise = null; + const msg = event instanceof ErrorEvent + ? event.message + : "WebSocket connection failed"; + reject(new Error(msg)); + }; + + socket.onmessage = (event) => { + this.handleMessage(event.data as string); + }; + + socket.onclose = () => { + this.socket = null; + this.connectPromise = null; + // Reject all pending requests + for (const [id, pending] of this.pending) { + pending.reject(new Error("WebSocket closed")); + this.pending.delete(id); + } + }; + }); + + return this.connectPromise; + } + + /** Closes the WebSocket connection. */ + close(): void { + this.socket?.close(); + this.socket = null; + } + + /** + * Runs a workflow and returns the completed event payload. + * Optionally dispatches events to partial handlers as they arrive. + */ + async workflowRun( + payload: WorkflowRunPayload, + handlers?: Partial>, + ): Promise { + await this.connect(); + const id = crypto.randomUUID(); + const fullHandlers = handlers + ? withDefaults(handlers) + : withDefaults({}); + + return new Promise((resolve, reject) => { + this.pending.set(id, { resolve, reject, handlers: fullHandlers }); + this.send({ + type: "workflow.run", + id, + payload, + }); + }); + } + + /** + * Runs a model method and returns the completed event payload. + * Optionally dispatches events to partial handlers as they arrive. + */ + async modelMethodRun( + payload: ModelMethodRunPayload, + handlers?: Partial>, + ): Promise { + await this.connect(); + const id = crypto.randomUUID(); + const fullHandlers = handlers + ? withDefaults(handlers) + : withDefaults({}); + + return new Promise((resolve, reject) => { + this.pending.set(id, { resolve, reject, handlers: fullHandlers }); + this.send({ + type: "model.method.run", + id, + payload, + }); + }); + } + + /** + * Returns an AsyncIterable of workflow run events. + */ + async *workflowRunStream( + payload: WorkflowRunPayload, + ): AsyncGenerator { + await this.connect(); + const id = crypto.randomUUID(); + const queue = new AsyncIterableQueue(); + + this.pending.set(id, { + resolve: () => {}, + reject: (err) => queue.error(err), + handlers: withDefaults({}, (event) => { + queue.push(event); + if (event.kind === "completed" || event.kind === "error") { + queue.done(); + } + }), + queue, + }); + + this.send({ type: "workflow.run", id, payload }); + + try { + yield* queue; + } finally { + this.pending.delete(id); + } + } + + /** + * Returns an AsyncIterable of model method run events. + */ + async *modelMethodRunStream( + payload: ModelMethodRunPayload, + ): AsyncGenerator { + await this.connect(); + const id = crypto.randomUUID(); + const queue = new AsyncIterableQueue(); + + this.pending.set(id, { + resolve: () => {}, + reject: (err) => queue.error(err), + handlers: withDefaults({}, (event) => { + queue.push(event); + if (event.kind === "completed" || event.kind === "error") { + queue.done(); + } + }), + queue, + }); + + this.send({ type: "model.method.run", id, payload }); + + try { + yield* queue; + } finally { + this.pending.delete(id); + } + } + + /** Cancels a running operation by its request id. */ + cancel(id: string): void { + this.send({ type: "cancel", id }); + } + + private handleMessage(data: string): void { + let msg: ServerMessage; + try { + msg = JSON.parse(data) as ServerMessage; + } catch { + return; + } + + const pending = this.pending.get(msg.id); + if (!pending) return; + + if (msg.type === "error") { + const err = new SwampClientError( + msg.error.code, + msg.error.message, + msg.error.details, + ); + this.pending.delete(msg.id); + pending.reject(err); + return; + } + + if (msg.type === "event") { + const event = msg.event; + + // Dispatch to handler + const handler = pending.handlers[event.kind]; + if (handler) { + handler(event); + } + + // Resolve/reject on terminal events + if (event.kind === "completed") { + this.pending.delete(msg.id); + pending.resolve(event.run); + } else if (event.kind === "error") { + this.pending.delete(msg.id); + // deno-lint-ignore no-explicit-any + const swampError = event.error as any; + pending.reject( + new SwampClientError( + swampError?.code ?? "unknown", + swampError?.message ?? "Unknown error", + swampError?.details, + ), + ); + } + } + } + + private send(request: ServerRequest): void { + if (!this.socket || this.socket.readyState !== WebSocket.OPEN) { + throw new Error("WebSocket is not connected"); + } + this.socket.send(JSON.stringify(request)); + } +} + +/** + * Simple async iterable queue for bridging push-based WebSocket events + * into a pull-based AsyncGenerator. + */ +class AsyncIterableQueue implements AsyncIterable { + private buffer: T[] = []; + private resolve: ((value: IteratorResult) => void) | null = null; + private finished = false; + private err: Error | null = null; + + push(value: T): void { + if (this.finished) return; + if (this.resolve) { + const r = this.resolve; + this.resolve = null; + r({ value, done: false }); + } else { + this.buffer.push(value); + } + } + + done(): void { + this.finished = true; + if (this.resolve) { + const r = this.resolve; + this.resolve = null; + r({ value: undefined as unknown as T, done: true }); + } + } + + error(err: Error): void { + this.err = err; + this.finished = true; + if (this.resolve) { + const r = this.resolve; + this.resolve = null; + r({ value: undefined as unknown as T, done: true }); + } + } + + [Symbol.asyncIterator](): AsyncIterator { + return { + next: (): Promise> => { + if (this.err) return Promise.reject(this.err); + if (this.buffer.length > 0) { + return Promise.resolve({ value: this.buffer.shift()!, done: false }); + } + if (this.finished) { + return Promise.resolve({ + value: undefined as unknown as T, + done: true, + }); + } + return new Promise>((resolve) => { + this.resolve = resolve; + }); + }, + }; + } +} diff --git a/packages/client/client_test.ts b/packages/client/client_test.ts new file mode 100644 index 00000000..d3813bee --- /dev/null +++ b/packages/client/client_test.ts @@ -0,0 +1,319 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +import { assertEquals, assertRejects } from "@std/assert"; +import { SwampClient } from "./client.ts"; +import { SwampClientError } from "./stream.ts"; + +// ── Constructor ───────────────────────────────────────────────────────── + +Deno.test("SwampClient - constructor accepts a URL", () => { + const client = new SwampClient("ws://localhost:9876/ws"); + // No error thrown — client is created in disconnected state + client.close(); // safe to call even when not connected +}); + +Deno.test("SwampClient - close is idempotent when not connected", () => { + const client = new SwampClient("ws://localhost:9876/ws"); + client.close(); + client.close(); + // No error — close on a null socket is a no-op +}); + +// ── Integration with a local WebSocket server ─────────────────────────── + +Deno.test("SwampClient - connect and close lifecycle", async () => { + const server = Deno.serve({ port: 0, onListen: () => {} }, (req) => { + if (req.headers.get("upgrade") === "websocket") { + const { socket, response } = Deno.upgradeWebSocket(req); + socket.onopen = () => {}; + return response; + } + return new Response("not found", { status: 404 }); + }); + + const addr = server.addr; + const client = new SwampClient(`ws://localhost:${addr.port}/ws`); + + await client.connect(); + // Calling connect again should return immediately (already open) + await client.connect(); + + client.close(); + await server.shutdown(); +}); + +Deno.test("SwampClient - connect rejects on refused connection", async () => { + // Port 1 is almost certainly not listening + const client = new SwampClient("ws://127.0.0.1:1/ws"); + + await assertRejects( + () => client.connect(), + Error, + ); +}); + +Deno.test("SwampClient - workflowRun receives completed event", async () => { + const server = Deno.serve({ port: 0, onListen: () => {} }, (req) => { + if (req.headers.get("upgrade") === "websocket") { + const { socket, response } = Deno.upgradeWebSocket(req); + socket.onmessage = (e) => { + const msg = JSON.parse(e.data as string); + // Echo back a completed event + socket.send(JSON.stringify({ + type: "event", + id: msg.id, + event: { + kind: "completed", + run: { + id: "run-1", + workflowId: "wf-1", + workflowName: "test-wf", + status: "succeeded", + jobs: [], + }, + }, + })); + }; + return response; + } + return new Response("not found", { status: 404 }); + }); + + const addr = server.addr; + const client = new SwampClient(`ws://localhost:${addr.port}/ws`); + + const result = await client.workflowRun({ + workflowIdOrName: "test-wf", + }); + assertEquals(result.workflowName, "test-wf"); + assertEquals(result.status, "succeeded"); + + client.close(); + await server.shutdown(); +}); + +Deno.test("SwampClient - workflowRun dispatches intermediate events to handlers", async () => { + const server = Deno.serve({ port: 0, onListen: () => {} }, (req) => { + if (req.headers.get("upgrade") === "websocket") { + const { socket, response } = Deno.upgradeWebSocket(req); + socket.onmessage = (e) => { + const msg = JSON.parse(e.data as string); + // Send a sequence of events + socket.send(JSON.stringify({ + type: "event", + id: msg.id, + event: { kind: "validating_inputs" }, + })); + socket.send(JSON.stringify({ + type: "event", + id: msg.id, + event: { kind: "evaluating_workflow" }, + })); + socket.send(JSON.stringify({ + type: "event", + id: msg.id, + event: { + kind: "completed", + run: { + id: "r1", + workflowId: "w1", + workflowName: "wf", + status: "succeeded", + jobs: [], + }, + }, + })); + }; + return response; + } + return new Response("not found", { status: 404 }); + }); + + const addr = server.addr; + const client = new SwampClient(`ws://localhost:${addr.port}/ws`); + + const seen: string[] = []; + await client.workflowRun( + { workflowIdOrName: "wf" }, + { + validating_inputs: () => { + seen.push("validating"); + }, + evaluating_workflow: () => { + seen.push("evaluating"); + }, + }, + ); + + assertEquals(seen, ["validating", "evaluating"]); + + client.close(); + await server.shutdown(); +}); + +Deno.test("SwampClient - server error message rejects with SwampClientError", async () => { + const server = Deno.serve({ port: 0, onListen: () => {} }, (req) => { + if (req.headers.get("upgrade") === "websocket") { + const { socket, response } = Deno.upgradeWebSocket(req); + socket.onmessage = (e) => { + const msg = JSON.parse(e.data as string); + socket.send(JSON.stringify({ + type: "error", + id: msg.id, + error: { + code: "not_found", + message: "Workflow not found", + }, + })); + }; + return response; + } + return new Response("not found", { status: 404 }); + }); + + const addr = server.addr; + const client = new SwampClient(`ws://localhost:${addr.port}/ws`); + + const err = await assertRejects( + () => client.workflowRun({ workflowIdOrName: "missing" }), + SwampClientError, + "Workflow not found", + ); + assertEquals((err as SwampClientError).code, "not_found"); + + client.close(); + await server.shutdown(); +}); + +Deno.test("SwampClient - error event in stream rejects with SwampClientError", async () => { + const server = Deno.serve({ port: 0, onListen: () => {} }, (req) => { + if (req.headers.get("upgrade") === "websocket") { + const { socket, response } = Deno.upgradeWebSocket(req); + socket.onmessage = (e) => { + const msg = JSON.parse(e.data as string); + socket.send(JSON.stringify({ + type: "event", + id: msg.id, + event: { + kind: "error", + error: { + code: "execution_failed", + message: "Method threw an exception", + }, + }, + })); + }; + return response; + } + return new Response("not found", { status: 404 }); + }); + + const addr = server.addr; + const client = new SwampClient(`ws://localhost:${addr.port}/ws`); + + const err = await assertRejects( + () => + client.modelMethodRun({ + modelIdOrName: "test", + methodName: "run", + }), + SwampClientError, + "Method threw an exception", + ); + assertEquals((err as SwampClientError).code, "execution_failed"); + + client.close(); + await server.shutdown(); +}); + +Deno.test("SwampClient - workflowRunStream yields events", async () => { + const server = Deno.serve({ port: 0, onListen: () => {} }, (req) => { + if (req.headers.get("upgrade") === "websocket") { + const { socket, response } = Deno.upgradeWebSocket(req); + socket.onmessage = (e) => { + const msg = JSON.parse(e.data as string); + socket.send(JSON.stringify({ + type: "event", + id: msg.id, + event: { kind: "validating_inputs" }, + })); + socket.send(JSON.stringify({ + type: "event", + id: msg.id, + event: { + kind: "completed", + run: { + id: "r1", + workflowId: "w1", + workflowName: "stream-wf", + status: "succeeded", + jobs: [], + }, + }, + })); + }; + return response; + } + return new Response("not found", { status: 404 }); + }); + + const addr = server.addr; + const client = new SwampClient(`ws://localhost:${addr.port}/ws`); + + const kinds: string[] = []; + for await ( + const event of client.workflowRunStream({ + workflowIdOrName: "stream-wf", + }) + ) { + kinds.push(event.kind); + } + + assertEquals(kinds, ["validating_inputs", "completed"]); + + client.close(); + await server.shutdown(); +}); + +Deno.test("SwampClient - socket close rejects pending requests", async () => { + const server = Deno.serve({ port: 0, onListen: () => {} }, (req) => { + if (req.headers.get("upgrade") === "websocket") { + const { socket, response } = Deno.upgradeWebSocket(req); + socket.onmessage = () => { + // Close instead of responding + socket.close(); + }; + return response; + } + return new Response("not found", { status: 404 }); + }); + + const addr = server.addr; + const client = new SwampClient(`ws://localhost:${addr.port}/ws`); + + await assertRejects( + () => client.workflowRun({ workflowIdOrName: "test" }), + Error, + "WebSocket closed", + ); + + client.close(); + await server.shutdown(); +}); diff --git a/packages/client/deno.json b/packages/client/deno.json new file mode 100644 index 00000000..75fd5199 --- /dev/null +++ b/packages/client/deno.json @@ -0,0 +1,6 @@ +{ + "name": "@systeminit/swamp-lib", + "version": "0.1.0", + "license": "AGPL-3.0-only", + "exports": "./mod.ts" +} diff --git a/packages/client/mod.ts b/packages/client/mod.ts new file mode 100644 index 00000000..a937e984 --- /dev/null +++ b/packages/client/mod.ts @@ -0,0 +1,71 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and\/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +/** + * @systeminit/swamp-lib — TypeScript client for the swamp WebSocket API. + * + * ```typescript + * import { SwampClient } from "@systeminit/swamp-lib"; + * + * const client = new SwampClient("ws://localhost:9090"); + * await client.connect(); + * + * // Callback-based: resolves with completed payload + * const run = await client.workflowRun( + * { workflowIdOrName: "my-workflow", inputs: { env: "dev" } }, + * { started: (e) => console.log(`Run ${e.runId} started`) }, + * ); + * + * // AsyncIterable-based: for-await-of event stream + * for await (const event of client.workflowRunStream({ workflowIdOrName: "my-workflow" })) { + * console.log(event.kind); + * } + * + * client.close(); + * ``` + * + * @module + */ + +export { SwampClient } from "./client.ts"; + +// Protocol types +export type { + ModelMethodRunEvent, + ModelMethodRunPayload, + ModelMethodRunView, + SerializedError, + SerializedEvent, + ServerMessage, + ServerRequest, + WorkflowRunEvent, + WorkflowRunPayload, + WorkflowRunView, +} from "./protocol.ts"; + +// Stream helpers +export { + consumeStream, + type EventHandlers, + type HasTerminals, + result, + type StreamEvent, + SwampClientError, + withDefaults, +} from "./stream.ts"; diff --git a/packages/client/protocol.ts b/packages/client/protocol.ts new file mode 100644 index 00000000..102f225e --- /dev/null +++ b/packages/client/protocol.ts @@ -0,0 +1,222 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and\/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +/** + * Wire protocol types for the swamp WebSocket API. + * + * These types mirror src/serve/protocol.ts in the swamp CLI. They are + * duplicated here so the client package has zero dependencies on the CLI + * source tree and can be published independently to JSR. + */ + +// ── Inbound (client → server) ──────────────────────────────────────────── + +export interface WorkflowRunPayload { + workflowIdOrName: string; + inputs?: Record; + lastEvaluated?: boolean; + driver?: string; + verbose?: boolean; + runtimeTags?: Record; +} + +export interface ModelMethodRunPayload { + modelIdOrName: string; + methodName: string; + inputs?: Record; + lastEvaluated?: boolean; + driver?: string; + runtimeTags?: Record; +} + +export type ServerRequest = + | { type: "workflow.run"; id: string; payload: WorkflowRunPayload } + | { type: "model.method.run"; id: string; payload: ModelMethodRunPayload } + | { type: "cancel"; id: string }; + +// ── Outbound (server → client) ─────────────────────────────────────────── + +export interface SerializedEvent { + kind: string; + [key: string]: unknown; +} + +export interface SerializedError { + code: string; + message: string; + details?: unknown; +} + +export type ServerMessage = + | { type: "event"; id: string; event: SerializedEvent } + | { type: "error"; id: string; error: SerializedError }; + +// ── Event types (wire-format discriminated unions) ──────────────────────── + +/** Events emitted during a workflow run. */ +export type WorkflowRunEvent = + | { kind: "validating_inputs" } + | { kind: "evaluating_workflow" } + | { kind: "started"; runId: string; workflowName: string } + | { kind: "job_started"; jobId: string } + | { kind: "job_completed"; jobId: string; status: string } + | { kind: "job_skipped"; jobId: string } + | { kind: "step_started"; jobId: string; stepId: string } + | { kind: "step_completed"; jobId: string; stepId: string } + | { kind: "step_skipped"; jobId: string; stepId: string } + | { + kind: "step_failed"; + jobId: string; + stepId: string; + error: string; + allowedFailure?: boolean; + } + | { + kind: "model_resolved"; + jobId: string; + stepId: string; + modelName: string; + modelType: string; + methodName: string; + } + | { + kind: "method_executing"; + jobId: string; + stepId: string; + modelName: string; + methodName: string; + } + | { + kind: "method_output"; + jobId: string; + stepId: string; + modelName: string; + methodName: string; + stream: "stdout" | "stderr"; + line: string; + } + | { + kind: "method_event"; + jobId: string; + stepId: string; + modelName: string; + methodName: string; + event: Record; + } + | { kind: "report_started"; reportName: string; scope: string } + | { + kind: "report_completed"; + reportName: string; + scope: string; + markdown: string; + json: Record; + } + | { kind: "report_failed"; reportName: string; scope: string; error: string } + | { kind: "completed"; run: WorkflowRunView } + | { kind: "error"; error: SerializedError }; + +/** Events emitted during a model method run. */ +export type ModelMethodRunEvent = + | { kind: "validating_inputs" } + | { kind: "resolving_model"; modelIdOrName: string } + | { + kind: "model_resolved"; + modelName: string; + modelType: string; + methodName: string; + } + | { + kind: "env_var_warning"; + modelName: string; + envVars: Array<{ path: string; envVar: string }>; + message: string; + } + | { kind: "evaluating_expressions"; lastEvaluated: boolean } + | { kind: "executing"; modelName: string; methodName: string } + | { + kind: "method_output"; + modelName: string; + methodName: string; + stream: "stdout" | "stderr"; + line: string; + } + | { + kind: "method_event"; + modelName: string; + methodName: string; + event: Record; + } + | { kind: "data_artifact_saved"; name: string; path: string } + | { kind: "report_started"; reportName: string; scope: string } + | { + kind: "report_completed"; + reportName: string; + scope: string; + markdown: string; + json: Record; + } + | { kind: "report_failed"; reportName: string; scope: string; error: string } + | { kind: "completed"; run: ModelMethodRunView } + | { kind: "error"; error: SerializedError }; + +// ── Completed event payload types ──────────────────────────────────────── + +export interface WorkflowRunView { + id: string; + workflowId: string; + workflowName: string; + status: string; + jobs: Array<{ + name: string; + status: string; + steps: Array<{ + name: string; + status: string; + error?: string; + duration?: number; + dataArtifacts?: Array<{ + dataId: string; + name: string; + version: number; + tags: Record; + }>; + allowedFailure?: boolean; + }>; + duration?: number; + }>; + duration?: number; + path?: string; +} + +export interface ModelMethodRunView { + modelId: string; + modelName: string; + modelType: string; + methodName: string; + status: string; + duration?: number; + outputId: string; + logFile?: string; + dataArtifacts: Array<{ + id: string; + name: string; + path: string; + attributes?: Record; + }>; +} diff --git a/packages/client/stream.ts b/packages/client/stream.ts new file mode 100644 index 00000000..fbe02ae8 --- /dev/null +++ b/packages/client/stream.ts @@ -0,0 +1,113 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and\/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +/** + * Client-side stream consumption helpers. + * + * These mirror the core helpers from libswamp/stream.ts so consumers can use + * the same patterns (consumeStream, result, withDefaults) without depending + * on the full swamp CLI. + */ + +import type { SerializedError } from "./protocol.ts"; + +/** Base constraint for stream events. */ +export type StreamEvent = { kind: string }; + +/** Compile-time check that E includes both `completed` and `error` terminals. */ +export type HasTerminals = Extract< + E, + { kind: "completed" } +> extends never ? never + : Extract extends never ? never + : E; + +/** Mapped type enforcing exhaustive handlers per event kind. */ +export type EventHandlers = { + [K in E["kind"]]: ( + event: Extract, + ) => void | Promise; +}; + +/** + * Iterates a stream and dispatches each event to the matching handler. + * Handlers are exhaustiveness-checked at compile time. + */ +export async function consumeStream( + stream: AsyncIterable>, + handlers: EventHandlers, +): Promise { + for await (const event of stream) { + const handler = handlers[event.kind as E["kind"]]; + // deno-lint-ignore no-explicit-any + await handler(event as any); + } +} + +/** + * Fast-forwards through a stream to the `completed` event. + * Throws a SwampClientError if an `error` event is encountered. + */ +export async function result( + stream: AsyncIterable>, +): Promise> { + for await (const event of stream) { + if (event.kind === "completed") { + return event as unknown as Extract; + } + if (event.kind === "error") { + const error = (event as unknown as { error: SerializedError }).error; + throw new SwampClientError(error.code, error.message, error.details); + } + } + throw new Error("Stream ended without a completed or error event"); +} + +/** + * Fills missing handlers with no-ops (or a provided fallback). + */ +export function withDefaults( + partial: Partial>, + fallback?: (event: E) => void | Promise, +): EventHandlers { + const noop = () => {}; + return new Proxy(partial as EventHandlers, { + get(target, prop, receiver) { + const handler = Reflect.get(target, prop, receiver); + if (handler) return handler; + if (fallback) return fallback; + return noop; + }, + }); +} + +/** + * Error thrown when the server sends an error event. + */ +export class SwampClientError extends Error { + readonly code: string; + readonly details?: unknown; + + constructor(code: string, message: string, details?: unknown) { + super(message); + this.name = "SwampClientError"; + this.code = code; + this.details = details; + } +} diff --git a/packages/client/stream_test.ts b/packages/client/stream_test.ts new file mode 100644 index 00000000..7d276d22 --- /dev/null +++ b/packages/client/stream_test.ts @@ -0,0 +1,221 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +import { assertEquals, assertRejects } from "@std/assert"; +import { + consumeStream, + result, + SwampClientError, + withDefaults, +} from "./stream.ts"; +import type { SerializedError } from "./protocol.ts"; + +// ── Test event types ──────────────────────────────────────────────────── + +type TestEvent = + | { kind: "started"; id: string } + | { kind: "progress"; percent: number } + | { kind: "completed"; result: string } + | { kind: "error"; error: SerializedError }; + +/** Helper: creates an async iterable from an array. */ +async function* asyncOf(...items: T[]): AsyncGenerator { + for (const item of items) { + yield item; + } +} + +// ── SwampClientError ──────────────────────────────────────────────────── + +Deno.test("SwampClientError - constructor sets all fields", () => { + const err = new SwampClientError("not_found", "Model not found", { + id: "abc", + }); + assertEquals(err.code, "not_found"); + assertEquals(err.message, "Model not found"); + assertEquals(err.details, { id: "abc" }); + assertEquals(err.name, "SwampClientError"); + assertEquals(err instanceof Error, true); +}); + +Deno.test("SwampClientError - details is optional", () => { + const err = new SwampClientError("cancelled", "Cancelled"); + assertEquals(err.code, "cancelled"); + assertEquals(err.details, undefined); +}); + +// ── withDefaults ──────────────────────────────────────────────────────── + +Deno.test("withDefaults - provided handlers are used", () => { + const calls: string[] = []; + const handlers = withDefaults({ + started: (e) => { + calls.push(`started:${e.id}`); + }, + }); + + handlers.started({ kind: "started", id: "x" }); + assertEquals(calls, ["started:x"]); +}); + +Deno.test("withDefaults - missing handlers get noop", () => { + const handlers = withDefaults({}); + // Should not throw for unhandled kinds + handlers.progress({ kind: "progress", percent: 50 }); + handlers.completed({ kind: "completed", result: "ok" }); +}); + +Deno.test("withDefaults - fallback is called for missing handlers", () => { + const fallbackCalls: string[] = []; + const handlers = withDefaults({}, (event) => { + fallbackCalls.push(event.kind); + }); + + handlers.progress({ kind: "progress", percent: 50 }); + handlers.completed({ kind: "completed", result: "ok" }); + assertEquals(fallbackCalls, ["progress", "completed"]); +}); + +Deno.test("withDefaults - explicit handler takes precedence over fallback", () => { + const calls: string[] = []; + const handlers = withDefaults( + { + started: () => { + calls.push("explicit"); + }, + }, + () => { + calls.push("fallback"); + }, + ); + + handlers.started({ kind: "started", id: "1" }); + handlers.progress({ kind: "progress", percent: 0 }); + assertEquals(calls, ["explicit", "fallback"]); +}); + +// ── consumeStream ─────────────────────────────────────────────────────── + +Deno.test("consumeStream - dispatches all events to handlers", async () => { + const events: TestEvent[] = [ + { kind: "started", id: "run-1" }, + { kind: "progress", percent: 50 }, + { kind: "completed", result: "done" }, + ]; + + const seen: string[] = []; + await consumeStream( + asyncOf(...events), + withDefaults({ + started: (e) => { + seen.push(`started:${e.id}`); + }, + progress: (e) => { + seen.push(`progress:${e.percent}`); + }, + completed: (e) => { + seen.push(`completed:${e.result}`); + }, + }), + ); + + assertEquals(seen, ["started:run-1", "progress:50", "completed:done"]); +}); + +Deno.test("consumeStream - works with async handlers", async () => { + const events: TestEvent[] = [ + { kind: "started", id: "a" }, + { kind: "completed", result: "ok" }, + ]; + + const seen: string[] = []; + await consumeStream( + asyncOf(...events), + withDefaults({ + started: async (e) => { + await Promise.resolve(); + seen.push(e.id); + }, + }), + ); + + assertEquals(seen, ["a"]); +}); + +// ── result ────────────────────────────────────────────────────────────── + +Deno.test("result - returns completed event", async () => { + const events: TestEvent[] = [ + { kind: "started", id: "r1" }, + { kind: "progress", percent: 100 }, + { kind: "completed", result: "success" }, + ]; + + const completed = await result(asyncOf(...events)); + assertEquals(completed.kind, "completed"); + assertEquals(completed.result, "success"); +}); + +Deno.test("result - throws SwampClientError on error event", async () => { + const events: TestEvent[] = [ + { kind: "started", id: "r1" }, + { + kind: "error", + error: { + code: "execution_failed", + message: "Method threw", + details: { exitCode: 1 }, + }, + }, + ]; + + const err = await assertRejects( + () => result(asyncOf(...events)), + SwampClientError, + "Method threw", + ); + assertEquals((err as SwampClientError).code, "execution_failed"); + assertEquals((err as SwampClientError).details, { exitCode: 1 }); +}); + +Deno.test("result - throws generic Error if stream ends without terminal", async () => { + const events: TestEvent[] = [ + { kind: "started", id: "r1" }, + { kind: "progress", percent: 50 }, + ]; + + await assertRejects( + () => result(asyncOf(...events)), + Error, + "Stream ended without a completed or error event", + ); +}); + +Deno.test("result - skips non-terminal events", async () => { + const events: TestEvent[] = [ + { kind: "started", id: "r1" }, + { kind: "progress", percent: 10 }, + { kind: "progress", percent: 50 }, + { kind: "progress", percent: 90 }, + { kind: "completed", result: "all done" }, + ]; + + const completed = await result(asyncOf(...events)); + assertEquals(completed.result, "all done"); +}); diff --git a/src/cli/commands/serve.ts b/src/cli/commands/serve.ts new file mode 100644 index 00000000..295e12a5 --- /dev/null +++ b/src/cli/commands/serve.ts @@ -0,0 +1,120 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +import { Command } from "@cliffy/command"; +import { createContext, type GlobalOptions } from "../context.ts"; +import { requireInitializedRepoUnlocked } from "../repo_context.ts"; +import { handleConnection } from "../../serve/connection.ts"; +import { getSwampLogger } from "../../infrastructure/logging/logger.ts"; + +// deno-lint-ignore no-explicit-any +type AnyOptions = any; + +const logger = getSwampLogger(["serve"]); + +export const serveCommand = new Command() + .name("serve") + .description("Start a WebSocket API server for workflow and model execution") + .option("--repo-dir ", "Repository directory", { default: "." }) + .option("--port ", "Port to listen on", { default: 9090 }) + .option("--host ", "Host to bind to", { default: "127.0.0.1" }) + .action(async function (options: AnyOptions) { + const ctx = createContext(options as GlobalOptions, ["serve"]); + const repoDir = options.repoDir as string ?? "."; + const port = options.port as number; + const host = options.host as string; + const isJson = ctx.outputMode === "json"; + + ctx.logger.info`Initializing repository at ${repoDir}`; + + const { repoDir: resolvedRepoDir, repoContext, datastoreConfig } = + await requireInitializedRepoUnlocked({ + repoDir, + outputMode: ctx.outputMode, + }); + + if (host !== "127.0.0.1" && host !== "localhost") { + logger.warn( + "Binding to non-loopback address {host} — no authentication is enforced on WebSocket connections", + { host }, + ); + } + const connectionCtx = { + repoDir: resolvedRepoDir, + repoContext, + datastoreConfig, + }; + + const ac = new AbortController(); + + const server = Deno.serve( + { + port, + hostname: host, + signal: ac.signal, + onListen({ hostname, port: listenPort }) { + if (isJson) { + console.log(JSON.stringify({ + status: "listening", + host: hostname, + port: listenPort, + url: `ws://${hostname}:${listenPort}`, + })); + } else { + logger.info("WebSocket API server listening on {host}:{port}", { + host: hostname, + port: listenPort, + }); + } + }, + }, + (req) => { + // WebSocket upgrade (check first — upgrade requests are also GETs) + const upgrade = req.headers.get("upgrade") ?? ""; + if (upgrade.toLowerCase() === "websocket") { + const { socket, response } = Deno.upgradeWebSocket(req); + handleConnection(socket, connectionCtx); + return response; + } + + // Health check endpoint + if (req.method === "GET") { + const url = new URL(req.url); + if (url.pathname === "/" || url.pathname === "/health") { + return Response.json({ status: "ok", version: "1" }); + } + } + + return new Response("Not found", { status: 404 }); + }, + ); + + // Handle SIGINT/SIGTERM for graceful shutdown + const shutdown = () => { + if (isJson) { + console.log(JSON.stringify({ status: "stopped" })); + } + logger.info("Shutting down..."); + ac.abort(); + }; + Deno.addSignalListener("SIGINT", shutdown); + Deno.addSignalListener("SIGTERM", shutdown); + + await server.finished; + }); diff --git a/src/cli/commands/serve_test.ts b/src/cli/commands/serve_test.ts new file mode 100644 index 00000000..443078a3 --- /dev/null +++ b/src/cli/commands/serve_test.ts @@ -0,0 +1,58 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +import { assertEquals } from "@std/assert"; +import { initializeLogging } from "../../infrastructure/logging/logger.ts"; + +// Initialize logging for tests +await initializeLogging({}); + +Deno.test("serveCommand module loads", async () => { + const { serveCommand } = await import("./serve.ts"); + assertEquals(serveCommand.getName(), "serve"); +}); + +Deno.test("serveCommand has correct description", async () => { + const { serveCommand } = await import("./serve.ts"); + assertEquals( + serveCommand.getDescription(), + "Start a WebSocket API server for workflow and model execution", + ); +}); + +Deno.test("serveCommand has --port option", async () => { + const { serveCommand } = await import("./serve.ts"); + const options = serveCommand.getOptions(); + const portOpt = options.find((o) => o.name === "port"); + assertEquals(portOpt !== undefined, true); +}); + +Deno.test("serveCommand has --host option", async () => { + const { serveCommand } = await import("./serve.ts"); + const options = serveCommand.getOptions(); + const hostOpt = options.find((o) => o.name === "host"); + assertEquals(hostOpt !== undefined, true); +}); + +Deno.test("serveCommand has --repo-dir option", async () => { + const { serveCommand } = await import("./serve.ts"); + const options = serveCommand.getOptions(); + const repoDirOpt = options.find((o) => o.name === "repo-dir"); + assertEquals(repoDirOpt !== undefined, true); +}); diff --git a/src/cli/mod.ts b/src/cli/mod.ts index a80f1346..344e08ce 100644 --- a/src/cli/mod.ts +++ b/src/cli/mod.ts @@ -44,6 +44,7 @@ import { extensionCommand } from "./commands/extension.ts"; import { summariseCommand } from "./commands/summarise.ts"; import { datastoreCommand } from "./commands/datastore.ts"; import { reportCommand } from "./commands/report.ts"; +import { serveCommand } from "./commands/serve.ts"; import { createHelpCommand } from "./commands/help.ts"; import { unknownCommandErrorHandler } from "./unknown_command_handler.ts"; import { @@ -723,7 +724,8 @@ export async function runCli(args: string[]): Promise { .command("extension", extensionCommand) .command("summarise", summariseCommand) .command("datastore", datastoreCommand) - .command("report", reportCommand); + .command("report", reportCommand) + .command("serve", serveCommand); // Register help command last — needs reference to the fully-built CLI tree cli.command("help", createHelpCommand(cli)); diff --git a/src/serve/connection.ts b/src/serve/connection.ts new file mode 100644 index 00000000..ddcd56a4 --- /dev/null +++ b/src/serve/connection.ts @@ -0,0 +1,375 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +/** + * Per-WebSocket connection handler. Dispatches incoming requests to libswamp + * operations and streams serialized events back to the client. + */ + +import { z } from "zod"; +import type { RepositoryContext } from "../infrastructure/persistence/repository_factory.ts"; +import type { DatastoreConfig } from "../domain/datastore/datastore_config.ts"; +import { + createLibSwampContext, + modelMethodRun, + workflowRun, +} from "../libswamp/mod.ts"; +import { createModelMethodRunDeps, createWorkflowRunDeps } from "./deps.ts"; +import { serializeEvent } from "./serializer.ts"; +import type { + ModelMethodRunPayload, + ServerMessage, + ServerRequest, + WorkflowRunPayload, +} from "./protocol.ts"; +import { findDefinitionByIdOrName } from "../domain/models/model_lookup.ts"; +import { extractModelReferencesFromWorkflow } from "../domain/workflows/model_reference_extractor.ts"; +import { createWorkflowId } from "../domain/workflows/workflow_id.ts"; +import { acquireModelLocks } from "../cli/repo_context.ts"; +import { getSwampLogger } from "../infrastructure/logging/logger.ts"; + +// ── Zod schemas for incoming WebSocket messages ───────────────────────── + +const WorkflowRunRequestSchema = z.object({ + type: z.literal("workflow.run"), + id: z.string().min(1), + payload: z.object({ + workflowIdOrName: z.string(), + inputs: z.record(z.string(), z.unknown()).optional(), + lastEvaluated: z.boolean().optional(), + driver: z.string().optional(), + verbose: z.boolean().optional(), + runtimeTags: z.record(z.string(), z.string()).optional(), + }), +}); + +const ModelMethodRunRequestSchema = z.object({ + type: z.literal("model.method.run"), + id: z.string().min(1), + payload: z.object({ + modelIdOrName: z.string(), + methodName: z.string(), + inputs: z.record(z.string(), z.unknown()).optional(), + lastEvaluated: z.boolean().optional(), + driver: z.string().optional(), + runtimeTags: z.record(z.string(), z.string()).optional(), + }), +}); + +const CancelRequestSchema = z.object({ + type: z.literal("cancel"), + id: z.string().min(1), +}); + +const ServerRequestSchema = z.discriminatedUnion("type", [ + WorkflowRunRequestSchema, + ModelMethodRunRequestSchema, + CancelRequestSchema, +]); + +/** + * Validates a parsed JSON value against the ServerRequest schema. + * Returns the validated request on success, or a human-readable error string on failure. + */ +export function validateServerRequest( + data: unknown, +): ServerRequest | string { + const result = ServerRequestSchema.safeParse(data); + if (result.success) { + return result.data as ServerRequest; + } + const issues = result.error.issues.map((i) => + `${i.path.join(".")}: ${i.message}` + ).join("; "); + return `Invalid request: ${issues}`; +} + +const logger = getSwampLogger(["serve", "connection"]); + +export interface ConnectionContext { + repoDir: string; + repoContext: RepositoryContext; + datastoreConfig: DatastoreConfig; +} + +export function handleConnection( + socket: WebSocket, + ctx: ConnectionContext, +): void { + const activeRequests = new Map(); + + socket.onmessage = (event) => { + handleMessage(socket, ctx, activeRequests, event); + }; + + socket.onclose = () => { + for (const controller of activeRequests.values()) { + controller.abort(); + } + activeRequests.clear(); + }; + + socket.onerror = (event) => { + logger.warn("WebSocket error: {error}", { + error: event instanceof ErrorEvent ? event.message : "unknown", + }); + }; +} + +/** + * Parse, validate, and dispatch a single incoming WebSocket message. + * Exported for unit testing. + */ +export function handleMessage( + socket: WebSocket, + ctx: ConnectionContext, + activeRequests: Map, + event: MessageEvent, +): void { + let parsed: unknown; + try { + parsed = JSON.parse(event.data as string); + } catch { + sendError(socket, "unknown", "invalid_request", "Invalid JSON"); + return; + } + + const validated = validateServerRequest(parsed); + if (typeof validated === "string") { + sendError(socket, "unknown", "invalid_request", validated); + return; + } + + const request: ServerRequest = validated; + + if (request.type === "cancel") { + const controller = activeRequests.get(request.id); + if (controller) { + controller.abort(); + } + return; + } + + if (activeRequests.has(request.id)) { + sendError( + socket, + request.id, + "duplicate_id", + `Request id '${request.id}' is already active`, + ); + return; + } + + const controller = new AbortController(); + activeRequests.set(request.id, controller); + + const task = request.type === "workflow.run" + ? handleWorkflowRun( + socket, + ctx, + request.id, + request.payload, + controller, + ) + : handleModelMethodRun( + socket, + ctx, + request.id, + request.payload, + controller, + ); + + task.finally(() => activeRequests.delete(request.id)); +} + +async function handleWorkflowRun( + socket: WebSocket, + ctx: ConnectionContext, + requestId: string, + payload: WorkflowRunPayload, + controller: AbortController, +): Promise { + let flushLocks: (() => Promise) | null = null; + + try { + // Pre-lookup workflow for per-model lock acquisition + const workflowRepo = ctx.repoContext.workflowRepo; + const workflow = await workflowRepo.findByName( + payload.workflowIdOrName, + ) ?? await workflowRepo.findById( + createWorkflowId(payload.workflowIdOrName), + ); + + if (workflow) { + const modelRefs = await extractModelReferencesFromWorkflow( + workflow, + workflowRepo, + ); + if (modelRefs !== null && modelRefs.length > 0) { + const resolvedModels: Array<{ modelType: string; modelId: string }> = + []; + for (const ref of modelRefs) { + const result = await findDefinitionByIdOrName( + ctx.repoContext.definitionRepo, + ref, + ); + if (result) { + resolvedModels.push({ + modelType: result.type.normalized, + modelId: result.definition.id, + }); + } + } + if (resolvedModels.length > 0) { + flushLocks = await acquireModelLocks( + ctx.datastoreConfig, + resolvedModels, + ctx.repoDir, + ); + } + } + } + + const deps = createWorkflowRunDeps(ctx.repoDir, ctx.repoContext); + const libCtx = createLibSwampContext({ signal: controller.signal }); + + for await ( + const event of workflowRun(libCtx, deps, { + workflowIdOrName: payload.workflowIdOrName, + inputs: payload.inputs, + lastEvaluated: payload.lastEvaluated, + driver: payload.driver, + verbose: payload.verbose, + runtimeTags: payload.runtimeTags, + }) + ) { + if (socket.readyState !== WebSocket.OPEN) break; + const serialized = serializeEvent( + event as { kind: string; [key: string]: unknown }, + ); + send(socket, { type: "event", id: requestId, event: serialized }); + } + } catch (error) { + if (error instanceof DOMException && error.name === "AbortError") { + sendError(socket, requestId, "cancelled", "Operation was cancelled"); + } else { + const message = error instanceof Error ? error.message : String(error); + sendError(socket, requestId, "workflow_execution_failed", message); + } + } finally { + if (flushLocks) { + try { + await flushLocks(); + } catch (releaseError) { + logger.warn("Failed to release locks: {error}", { + error: releaseError instanceof Error + ? releaseError.message + : String(releaseError), + }); + } + } + } +} + +async function handleModelMethodRun( + socket: WebSocket, + ctx: ConnectionContext, + requestId: string, + payload: ModelMethodRunPayload, + controller: AbortController, +): Promise { + let flushLocks: (() => Promise) | null = null; + + try { + // Pre-lookup for per-model lock acquisition + const preResult = await findDefinitionByIdOrName( + ctx.repoContext.definitionRepo, + payload.modelIdOrName, + ); + if (preResult) { + flushLocks = await acquireModelLocks( + ctx.datastoreConfig, + [{ + modelType: preResult.type.normalized, + modelId: preResult.definition.id, + }], + ctx.repoDir, + ); + } + + const deps = createModelMethodRunDeps(ctx.repoDir, ctx.repoContext); + const libCtx = createLibSwampContext({ signal: controller.signal }); + + for await ( + const event of modelMethodRun(libCtx, deps, { + modelIdOrName: payload.modelIdOrName, + methodName: payload.methodName, + inputs: payload.inputs ?? {}, + lastEvaluated: payload.lastEvaluated ?? false, + runtimeTags: payload.runtimeTags, + driver: payload.driver, + }) + ) { + if (socket.readyState !== WebSocket.OPEN) break; + const serialized = serializeEvent( + event as { kind: string; [key: string]: unknown }, + ); + send(socket, { type: "event", id: requestId, event: serialized }); + } + } catch (error) { + if (error instanceof DOMException && error.name === "AbortError") { + sendError(socket, requestId, "cancelled", "Operation was cancelled"); + } else { + const message = error instanceof Error ? error.message : String(error); + sendError( + socket, + requestId, + "method_execution_failed", + message, + ); + } + } finally { + if (flushLocks) { + try { + await flushLocks(); + } catch (releaseError) { + logger.warn("Failed to release locks: {error}", { + error: releaseError instanceof Error + ? releaseError.message + : String(releaseError), + }); + } + } + } +} + +function send(socket: WebSocket, message: ServerMessage): void { + if (socket.readyState === WebSocket.OPEN) { + socket.send(JSON.stringify(message)); + } +} + +function sendError( + socket: WebSocket, + id: string, + code: string, + message: string, +): void { + send(socket, { type: "error", id, error: { code, message } }); +} diff --git a/src/serve/connection_test.ts b/src/serve/connection_test.ts new file mode 100644 index 00000000..05cef0f9 --- /dev/null +++ b/src/serve/connection_test.ts @@ -0,0 +1,236 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +import { assertEquals } from "@std/assert"; +import { handleMessage, validateServerRequest } from "./connection.ts"; +import type { ConnectionContext } from "./connection.ts"; +import { initializeLogging } from "../infrastructure/logging/logger.ts"; + +await initializeLogging({}); + +// ── Mock WebSocket ────────────────────────────────────────────────────── + +interface MockSocket { + sent: string[]; + closed: boolean; + readyState: number; + send(data: string): void; + close(): void; +} + +function createMockSocket(): MockSocket { + return { + sent: [], + closed: false, + readyState: WebSocket.OPEN, + send(data: string) { + this.sent.push(data); + }, + close() { + this.closed = true; + }, + }; +} + +function parseSent(mock: MockSocket, index = 0): Record { + return JSON.parse(mock.sent[index]); +} + +// Stub ConnectionContext — handleMessage only needs it for dispatch, and +// workflow/model handlers won't be reached in validation-level tests. +const stubCtx = {} as ConnectionContext; + +function makeEvent(data: string): MessageEvent { + return new MessageEvent("message", { data }); +} + +// ── validateServerRequest ─────────────────────────────────────────────── + +Deno.test("validateServerRequest accepts a valid workflow.run request", () => { + const input = { + type: "workflow.run", + id: "req-1", + payload: { workflowIdOrName: "deploy" }, + }; + const result = validateServerRequest(input); + assertEquals(typeof result, "object"); +}); + +Deno.test("validateServerRequest accepts a valid model.method.run request", () => { + const input = { + type: "model.method.run", + id: "req-2", + payload: { modelIdOrName: "my-model", methodName: "start" }, + }; + const result = validateServerRequest(input); + assertEquals(typeof result, "object"); +}); + +Deno.test("validateServerRequest accepts a valid cancel request", () => { + const input = { type: "cancel", id: "req-3" }; + const result = validateServerRequest(input); + assertEquals(typeof result, "object"); +}); + +Deno.test("validateServerRequest rejects unknown type", () => { + const input = { type: "unknown.type", id: "req-4" }; + const result = validateServerRequest(input); + assertEquals(typeof result, "string"); +}); + +Deno.test("validateServerRequest rejects empty id", () => { + const input = { type: "cancel", id: "" }; + const result = validateServerRequest(input); + assertEquals(typeof result, "string"); +}); + +Deno.test("validateServerRequest rejects missing payload for workflow.run", () => { + const input = { type: "workflow.run", id: "req-5" }; + const result = validateServerRequest(input); + assertEquals(typeof result, "string"); +}); + +Deno.test("validateServerRequest rejects missing methodName for model.method.run", () => { + const input = { + type: "model.method.run", + id: "req-6", + payload: { modelIdOrName: "m" }, + }; + const result = validateServerRequest(input); + assertEquals(typeof result, "string"); +}); + +// ── handleMessage: invalid JSON ───────────────────────────────────────── + +Deno.test("handleMessage sends error for invalid JSON", () => { + const mock = createMockSocket(); + const active = new Map(); + + handleMessage( + mock as unknown as WebSocket, + stubCtx, + active, + makeEvent("not json{{{"), + ); + + assertEquals(mock.sent.length, 1); + const msg = parseSent(mock); + assertEquals(msg.type, "error"); + assertEquals((msg.error as Record).code, "invalid_request"); +}); + +// ── handleMessage: validation failure ─────────────────────────────────── + +Deno.test("handleMessage sends error for invalid request shape", () => { + const mock = createMockSocket(); + const active = new Map(); + + handleMessage( + mock as unknown as WebSocket, + stubCtx, + active, + makeEvent(JSON.stringify({ type: "bad", id: "x" })), + ); + + assertEquals(mock.sent.length, 1); + const msg = parseSent(mock); + assertEquals(msg.type, "error"); + assertEquals((msg.error as Record).code, "invalid_request"); +}); + +// ── handleMessage: cancel ─────────────────────────────────────────────── + +Deno.test("handleMessage cancel aborts the matching controller", () => { + const mock = createMockSocket(); + const controller = new AbortController(); + const active = new Map([["req-10", controller]]); + + handleMessage( + mock as unknown as WebSocket, + stubCtx, + active, + makeEvent(JSON.stringify({ type: "cancel", id: "req-10" })), + ); + + assertEquals(controller.signal.aborted, true); + // Cancel does not send a response + assertEquals(mock.sent.length, 0); +}); + +Deno.test("handleMessage cancel for unknown id is a no-op", () => { + const mock = createMockSocket(); + const active = new Map(); + + handleMessage( + mock as unknown as WebSocket, + stubCtx, + active, + makeEvent(JSON.stringify({ type: "cancel", id: "nonexistent" })), + ); + + assertEquals(mock.sent.length, 0); +}); + +// ── handleMessage: duplicate request ID ───────────────────────────────── + +Deno.test("handleMessage rejects duplicate request ID", () => { + const mock = createMockSocket(); + const active = new Map([ + ["dup-1", new AbortController()], + ]); + + handleMessage( + mock as unknown as WebSocket, + stubCtx, + active, + makeEvent(JSON.stringify({ + type: "workflow.run", + id: "dup-1", + payload: { workflowIdOrName: "w" }, + })), + ); + + assertEquals(mock.sent.length, 1); + const msg = parseSent(mock); + assertEquals(msg.type, "error"); + assertEquals((msg.error as Record).code, "duplicate_id"); +}); + +// ── handleMessage: unknown type not leaked ────────────────────────────── + +Deno.test("handleMessage does not leak unknown type value in error", () => { + const mock = createMockSocket(); + const active = new Map(); + + handleMessage( + mock as unknown as WebSocket, + stubCtx, + active, + makeEvent(JSON.stringify({ type: "secret.op", id: "x" })), + ); + + assertEquals(mock.sent.length, 1); + const msg = parseSent(mock); + assertEquals(msg.type, "error"); + // The error message should NOT contain the actual type value + const errorMessage = String( + (msg.error as Record).message, + ); + assertEquals(errorMessage.includes("secret.op"), false); +}); diff --git a/src/serve/deps.ts b/src/serve/deps.ts new file mode 100644 index 00000000..02fd264c --- /dev/null +++ b/src/serve/deps.ts @@ -0,0 +1,110 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +/** + * Factories that construct WorkflowRunDeps and ModelMethodRunDeps from a + * RepositoryContext. These mirror the patterns in the CLI commands but are + * decoupled from Cliffy options parsing. + */ + +import { join } from "@std/path"; +import type { RepositoryContext } from "../infrastructure/persistence/repository_factory.ts"; +import type { ModelMethodRunDeps, WorkflowRunDeps } from "../libswamp/mod.ts"; +import { WorkflowExecutionService } from "../domain/workflows/execution_service.ts"; +import { createWorkflowId } from "../domain/workflows/workflow_id.ts"; +import { findDefinitionByIdOrName } from "../domain/models/model_lookup.ts"; +import { resolveModelType } from "../domain/extensions/extension_auto_resolver.ts"; +import { getAutoResolver } from "../domain/extensions/auto_resolver_context.ts"; +import { DefaultMethodExecutionService } from "../domain/models/method_execution_service.ts"; +import { VaultService } from "../domain/vaults/vault_service.ts"; +import { ExpressionEvaluationService } from "../domain/expressions/expression_evaluation_service.ts"; +import { runFileSink } from "../infrastructure/logging/logger.ts"; +import { + SWAMP_SUBDIRS, + swampPath, +} from "../infrastructure/persistence/paths.ts"; +import { SecretRedactor } from "../domain/secrets/mod.ts"; + +export function createWorkflowRunDeps( + repoDir: string, + repoContext: RepositoryContext, +): WorkflowRunDeps { + return { + workflowRepo: repoContext.workflowRepo, + runRepo: repoContext.workflowRunRepo, + repoDir, + lookupWorkflow: async (repo, idOrName) => { + return await repo.findByName(idOrName) ?? + await repo.findById(createWorkflowId(idOrName)); + }, + createExecutionService: (wfRepo, rnRepo, dir) => + new WorkflowExecutionService(wfRepo, rnRepo, dir), + dataRepo: repoContext.unifiedDataRepo, + definitionRepo: repoContext.definitionRepo, + }; +} + +export function createModelMethodRunDeps( + repoDir: string, + repoContext: RepositoryContext, +): ModelMethodRunDeps { + return { + repoDir, + lookupDefinition: (idOrName) => + findDefinitionByIdOrName(repoContext.definitionRepo, idOrName), + getModelDef: (type) => resolveModelType(type, getAutoResolver()), + createEvaluationService: () => + new ExpressionEvaluationService( + repoContext.definitionRepo, + repoDir, + { dataRepo: repoContext.unifiedDataRepo }, + ), + loadEvaluatedDefinition: (type, name) => + repoContext.evaluatedDefinitionRepo.findByName(type, name), + saveEvaluatedDefinition: (type, definition) => + repoContext.evaluatedDefinitionRepo.save(type, definition), + createExecutionService: () => new DefaultMethodExecutionService(), + createVaultService: () => VaultService.fromRepository(repoDir), + dataRepo: repoContext.unifiedDataRepo, + definitionRepo: repoContext.definitionRepo, + outputRepo: repoContext.outputRepo, + createRunLog: async (modelType, method, definitionId) => { + const redactor = new SecretRedactor(); + const timestamp = new Date().toISOString().replace(/[:.]/g, "-"); + const logFilePath = join( + swampPath(repoDir, SWAMP_SUBDIRS.outputs), + modelType.normalized, + method, + `${definitionId}-${timestamp}.log`, + ); + const logCategory: string[] = []; + await runFileSink.register( + logCategory, + logFilePath, + redactor, + swampPath(repoDir), + ); + return { + logFilePath, + redactor, + cleanup: () => runFileSink.unregister(logCategory), + }; + }, + }; +} diff --git a/src/serve/mod.ts b/src/serve/mod.ts new file mode 100644 index 00000000..97115df7 --- /dev/null +++ b/src/serve/mod.ts @@ -0,0 +1,30 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +export type { + ModelMethodRunPayload, + SerializedError, + SerializedEvent, + ServerMessage, + ServerRequest, + WorkflowRunPayload, +} from "./protocol.ts"; +export { serializeEvent, serializeSwampError } from "./serializer.ts"; +export { type ConnectionContext, handleConnection } from "./connection.ts"; +export { createModelMethodRunDeps, createWorkflowRunDeps } from "./deps.ts"; diff --git a/src/serve/protocol.ts b/src/serve/protocol.ts new file mode 100644 index 00000000..8f7fd3b9 --- /dev/null +++ b/src/serve/protocol.ts @@ -0,0 +1,70 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +/** + * Wire protocol types for the swamp WebSocket API. + * + * Clients send ServerRequest messages, the server streams back ServerMessage + * events. Each request carries a client-assigned `id` that the server echoes + * on every response, enabling multiplexed operations on a single socket. + */ + +// ── Inbound (client → server) ──────────────────────────────────────────── + +export interface WorkflowRunPayload { + workflowIdOrName: string; + inputs?: Record; + lastEvaluated?: boolean; + driver?: string; + verbose?: boolean; + runtimeTags?: Record; +} + +export interface ModelMethodRunPayload { + modelIdOrName: string; + methodName: string; + inputs?: Record; + lastEvaluated?: boolean; + driver?: string; + runtimeTags?: Record; +} + +export type ServerRequest = + | { type: "workflow.run"; id: string; payload: WorkflowRunPayload } + | { type: "model.method.run"; id: string; payload: ModelMethodRunPayload } + | { type: "cancel"; id: string }; + +// ── Outbound (server → client) ─────────────────────────────────────────── + +/** A JSON-safe event from a libswamp async generator. */ +export interface SerializedEvent { + kind: string; + [key: string]: unknown; +} + +/** Error details sent to the client. */ +export interface SerializedError { + code: string; + message: string; + details?: unknown; +} + +export type ServerMessage = + | { type: "event"; id: string; event: SerializedEvent } + | { type: "error"; id: string; error: SerializedError }; diff --git a/src/serve/protocol_test.ts b/src/serve/protocol_test.ts new file mode 100644 index 00000000..4a4c554b --- /dev/null +++ b/src/serve/protocol_test.ts @@ -0,0 +1,208 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +import { assertEquals } from "@std/assert"; +import type { + ModelMethodRunPayload, + SerializedError, + SerializedEvent, + ServerMessage, + ServerRequest, + WorkflowRunPayload, +} from "./protocol.ts"; + +// ── WorkflowRunPayload ────────────────────────────────────────────────── + +Deno.test("WorkflowRunPayload - minimal payload", () => { + const payload: WorkflowRunPayload = { + workflowIdOrName: "deploy", + }; + assertEquals(payload.workflowIdOrName, "deploy"); + assertEquals(payload.inputs, undefined); + assertEquals(payload.verbose, undefined); +}); + +Deno.test("WorkflowRunPayload - full payload", () => { + const payload: WorkflowRunPayload = { + workflowIdOrName: "deploy", + inputs: { env: "prod" }, + lastEvaluated: true, + driver: "docker", + verbose: true, + runtimeTags: { region: "us-east-1" }, + }; + assertEquals(payload.workflowIdOrName, "deploy"); + assertEquals(payload.inputs, { env: "prod" }); + assertEquals(payload.lastEvaluated, true); + assertEquals(payload.driver, "docker"); + assertEquals(payload.verbose, true); + assertEquals(payload.runtimeTags, { region: "us-east-1" }); +}); + +// ── ModelMethodRunPayload ─────────────────────────────────────────────── + +Deno.test("ModelMethodRunPayload - minimal payload", () => { + const payload: ModelMethodRunPayload = { + modelIdOrName: "my-server", + methodName: "start", + }; + assertEquals(payload.modelIdOrName, "my-server"); + assertEquals(payload.methodName, "start"); +}); + +Deno.test("ModelMethodRunPayload - full payload", () => { + const payload: ModelMethodRunPayload = { + modelIdOrName: "my-server", + methodName: "start", + inputs: { force: true }, + lastEvaluated: false, + driver: "shell", + runtimeTags: { tier: "staging" }, + }; + assertEquals(payload.inputs, { force: true }); + assertEquals(payload.driver, "shell"); +}); + +// ── ServerRequest discriminated union ─────────────────────────────────── + +Deno.test("ServerRequest - workflow.run variant", () => { + const req: ServerRequest = { + type: "workflow.run", + id: "req-1", + payload: { workflowIdOrName: "deploy" }, + }; + assertEquals(req.type, "workflow.run"); + assertEquals(req.id, "req-1"); + if (req.type === "workflow.run") { + assertEquals(req.payload.workflowIdOrName, "deploy"); + } +}); + +Deno.test("ServerRequest - model.method.run variant", () => { + const req: ServerRequest = { + type: "model.method.run", + id: "req-2", + payload: { modelIdOrName: "db", methodName: "migrate" }, + }; + assertEquals(req.type, "model.method.run"); + if (req.type === "model.method.run") { + assertEquals(req.payload.modelIdOrName, "db"); + assertEquals(req.payload.methodName, "migrate"); + } +}); + +Deno.test("ServerRequest - cancel variant", () => { + const req: ServerRequest = { + type: "cancel", + id: "req-3", + }; + assertEquals(req.type, "cancel"); + assertEquals(req.id, "req-3"); +}); + +// ── SerializedEvent / SerializedError ─────────────────────────────────── + +Deno.test("SerializedEvent - allows arbitrary extra properties", () => { + const event: SerializedEvent = { + kind: "step_completed", + jobId: "j1", + stepId: "s1", + }; + assertEquals(event.kind, "step_completed"); + assertEquals(event.jobId, "j1"); +}); + +Deno.test("SerializedError - with and without details", () => { + const withDetails: SerializedError = { + code: "validation_error", + message: "Bad input", + details: { field: "name" }, + }; + assertEquals(withDetails.details, { field: "name" }); + + const withoutDetails: SerializedError = { + code: "not_found", + message: "Not found", + }; + assertEquals(withoutDetails.details, undefined); +}); + +// ── ServerMessage discriminated union ─────────────────────────────────── + +Deno.test("ServerMessage - event variant", () => { + const msg: ServerMessage = { + type: "event", + id: "req-1", + event: { kind: "started", runId: "r1" }, + }; + assertEquals(msg.type, "event"); + if (msg.type === "event") { + assertEquals(msg.event.kind, "started"); + } +}); + +Deno.test("ServerMessage - error variant", () => { + const msg: ServerMessage = { + type: "error", + id: "req-1", + error: { code: "internal", message: "Server error" }, + }; + assertEquals(msg.type, "error"); + if (msg.type === "error") { + assertEquals(msg.error.code, "internal"); + } +}); + +Deno.test("ServerMessage - discriminated union narrows correctly", () => { + const messages: ServerMessage[] = [ + { + type: "event", + id: "1", + event: { kind: "completed", run: {} }, + }, + { + type: "error", + id: "2", + error: { code: "cancelled", message: "Cancelled" }, + }, + ]; + + const types = messages.map((m) => m.type); + assertEquals(types, ["event", "error"]); +}); + +Deno.test("ServerRequest - round-trips through JSON", () => { + const req: ServerRequest = { + type: "workflow.run", + id: "req-42", + payload: { + workflowIdOrName: "build", + inputs: { version: "1.0.0" }, + verbose: true, + }, + }; + const json = JSON.stringify(req); + const parsed = JSON.parse(json) as ServerRequest; + assertEquals(parsed.type, "workflow.run"); + assertEquals(parsed.id, "req-42"); + if (parsed.type === "workflow.run") { + assertEquals(parsed.payload.workflowIdOrName, "build"); + assertEquals(parsed.payload.inputs, { version: "1.0.0" }); + } +}); diff --git a/src/serve/serializer.ts b/src/serve/serializer.ts new file mode 100644 index 00000000..d819dc6e --- /dev/null +++ b/src/serve/serializer.ts @@ -0,0 +1,75 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +/** + * Converts libswamp events to JSON-safe objects for WebSocket transmission. + */ + +import type { SwampError } from "../libswamp/mod.ts"; +import type { SerializedError, SerializedEvent } from "./protocol.ts"; + +/** + * Serializes a libswamp event (WorkflowRunEvent or ModelMethodRunEvent) into + * a JSON-safe object. Handles non-serializable values like Error instances. + */ +export function serializeEvent( + event: { kind: string; [key: string]: unknown }, +): SerializedEvent { + if (event.kind === "error") { + const swampError = event.error as SwampError; + return { + kind: "error", + error: serializeSwampError(swampError), + }; + } + + // For all other events, do a JSON-safe clone that handles Error instances + return jsonSafeClone(event) as SerializedEvent; +} + +/** + * Serializes a SwampError into a JSON-safe error object. + */ +export function serializeSwampError(error: SwampError): SerializedError { + return { + code: error.code, + message: error.message, + ...(error.details !== undefined && { details: error.details }), + }; +} + +/** + * Deep-clones an object, converting Error instances to plain objects. + */ +function jsonSafeClone(value: unknown): unknown { + if (value === null || value === undefined) return value; + if (value instanceof Error) { + return { message: value.message, stack: value.stack }; + } + if (value instanceof Date) return value.toISOString(); + if (Array.isArray(value)) return value.map(jsonSafeClone); + if (typeof value === "object") { + const result: Record = {}; + for (const [k, v] of Object.entries(value)) { + result[k] = jsonSafeClone(v); + } + return result; + } + return value; +} diff --git a/src/serve/serializer_test.ts b/src/serve/serializer_test.ts new file mode 100644 index 00000000..22f27386 --- /dev/null +++ b/src/serve/serializer_test.ts @@ -0,0 +1,195 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +import { assertEquals, assertExists } from "@std/assert"; +import { serializeEvent, serializeSwampError } from "./serializer.ts"; +import type { SwampError } from "../libswamp/mod.ts"; + +// ── serializeSwampError ───────────────────────────────────────────────── + +Deno.test("serializeSwampError - basic error with code and message", () => { + const err: SwampError = { + code: "not_found", + message: "Model not found", + }; + const result = serializeSwampError(err); + assertEquals(result, { code: "not_found", message: "Model not found" }); +}); + +Deno.test("serializeSwampError - includes details when present", () => { + const err: SwampError = { + code: "validation_error", + message: "Invalid inputs", + details: { field: "name", reason: "required" }, + }; + const result = serializeSwampError(err); + assertEquals(result, { + code: "validation_error", + message: "Invalid inputs", + details: { field: "name", reason: "required" }, + }); +}); + +Deno.test("serializeSwampError - omits details when undefined", () => { + const err: SwampError = { + code: "cancelled", + message: "Operation cancelled", + details: undefined, + }; + const result = serializeSwampError(err); + assertEquals(result, { code: "cancelled", message: "Operation cancelled" }); + assertEquals("details" in result, false); +}); + +Deno.test("serializeSwampError - does not include cause (non-serializable)", () => { + const err: SwampError = { + code: "network", + message: "Connection refused", + cause: new Error("ECONNREFUSED"), + }; + const result = serializeSwampError(err); + assertEquals(result, { code: "network", message: "Connection refused" }); + assertEquals("cause" in result, false); +}); + +// ── serializeEvent with error kind ────────────────────────────────────── + +Deno.test("serializeEvent - error event serializes the SwampError", () => { + const swampError: SwampError = { + code: "not_authenticated", + message: "Not authenticated", + }; + const result = serializeEvent({ kind: "error", error: swampError }); + assertEquals(result, { + kind: "error", + error: { code: "not_authenticated", message: "Not authenticated" }, + }); +}); + +Deno.test("serializeEvent - error event with details", () => { + const swampError: SwampError = { + code: "validation_error", + message: "Bad input", + details: { missing: ["name"] }, + }; + const result = serializeEvent({ kind: "error", error: swampError }); + assertEquals(result, { + kind: "error", + error: { + code: "validation_error", + message: "Bad input", + details: { missing: ["name"] }, + }, + }); +}); + +// ── serializeEvent with non-error kinds (jsonSafeClone) ───────────────── + +Deno.test("serializeEvent - simple event passes through", () => { + const event = { kind: "started", runId: "abc-123", workflowName: "deploy" }; + const result = serializeEvent(event); + assertEquals(result, { + kind: "started", + runId: "abc-123", + workflowName: "deploy", + }); +}); + +Deno.test("serializeEvent - preserves null and undefined values", () => { + const event = { kind: "test", a: null, b: undefined }; + const result = serializeEvent(event); + assertEquals((result as Record).a, null); + assertEquals((result as Record).b, undefined); +}); + +Deno.test("serializeEvent - converts Date to ISO string", () => { + const date = new Date("2026-03-27T12:00:00.000Z"); + const event = { kind: "test", timestamp: date }; + const result = serializeEvent(event); + assertEquals( + (result as Record).timestamp, + "2026-03-27T12:00:00.000Z", + ); +}); + +Deno.test("serializeEvent - converts Error instances to plain objects", () => { + const error = new Error("something broke"); + const event = { kind: "test", nested: { err: error } }; + const result = serializeEvent(event); + const nested = (result as Record).nested as Record< + string, + unknown + >; + assertEquals(nested.err !== null && typeof nested.err === "object", true); + const errObj = nested.err as Record; + assertEquals(errObj.message, "something broke"); + assertExists(errObj.stack); +}); + +Deno.test("serializeEvent - handles arrays", () => { + const event = { kind: "test", items: [1, "two", null] }; + const result = serializeEvent(event); + assertEquals((result as Record).items, [1, "two", null]); +}); + +Deno.test("serializeEvent - handles nested objects", () => { + const event = { + kind: "completed", + run: { + id: "r1", + status: "succeeded", + jobs: [{ name: "build", steps: [] }], + }, + }; + const result = serializeEvent(event); + assertEquals(result, event); +}); + +Deno.test("serializeEvent - handles array with mixed types including Date and Error", () => { + const date = new Date("2026-01-01T00:00:00.000Z"); + const error = new Error("fail"); + const event = { kind: "test", mixed: [date, error, 42, "ok"] }; + const result = serializeEvent(event); + const mixed = (result as Record).mixed as unknown[]; + assertEquals(mixed[0], "2026-01-01T00:00:00.000Z"); + assertEquals((mixed[1] as Record).message, "fail"); + assertEquals(mixed[2], 42); + assertEquals(mixed[3], "ok"); +}); + +Deno.test("serializeEvent - primitives pass through unchanged", () => { + const event = { + kind: "test", + num: 42, + str: "hello", + bool: true, + }; + const result = serializeEvent(event); + assertEquals(result, event); +}); + +Deno.test("serializeEvent - deep clone does not mutate original", () => { + const inner = { value: "original" }; + const event = { kind: "test", data: inner }; + const result = serializeEvent(event); + ( + (result as Record).data as Record + ).value = "modified"; + assertEquals(inner.value, "original"); +});