diff --git a/deno.json b/deno.json
index ff745ced..13305469 100644
--- a/deno.json
+++ b/deno.json
@@ -3,6 +3,7 @@
"version": "0.1.0",
"license": "AGPL-3.0-only",
"exports": "./main.ts",
+ "workspace": ["packages/client"],
"tasks": {
"dev": "deno run --unstable-bundle --allow-read --allow-write --allow-env --allow-run --allow-sys main.ts",
"test": "deno test --unstable-bundle --allow-read --allow-write --allow-env --allow-run --allow-net --allow-sys",
diff --git a/packages/client/client.ts b/packages/client/client.ts
new file mode 100644
index 00000000..b23e8c67
--- /dev/null
+++ b/packages/client/client.ts
@@ -0,0 +1,346 @@
+// Swamp, an Automation Framework
+// Copyright (C) 2026 System Initiative, Inc.
+//
+// This file is part of Swamp.
+//
+// Swamp is free software: you can redistribute it and\/or modify
+// it under the terms of the GNU Affero General Public License version 3
+// as published by the Free Software Foundation, with the Swamp
+// Extension and Definition Exception (found in the "COPYING-EXCEPTION"
+// file).
+//
+// Swamp is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with Swamp. If not, see .
+
+/**
+ * SwampClient — typed WebSocket client for the swamp serve API.
+ *
+ * Provides both callback-based and AsyncIterable-based consumption of
+ * workflow and model method execution event streams.
+ */
+
+import type {
+ ModelMethodRunEvent,
+ ModelMethodRunPayload,
+ ModelMethodRunView,
+ ServerMessage,
+ ServerRequest,
+ WorkflowRunEvent,
+ WorkflowRunPayload,
+ WorkflowRunView,
+} from "./protocol.ts";
+import {
+ type EventHandlers,
+ SwampClientError,
+ withDefaults,
+} from "./stream.ts";
+
+interface PendingRequest {
+ resolve: (value: T) => void;
+ reject: (error: Error) => void;
+ // deno-lint-ignore no-explicit-any
+ handlers: EventHandlers;
+ // deno-lint-ignore no-explicit-any
+ queue?: AsyncIterableQueue;
+}
+
+export class SwampClient {
+ private url: string;
+ private socket: WebSocket | null = null;
+ // deno-lint-ignore no-explicit-any
+ private pending = new Map>();
+ private connectPromise: Promise | null = null;
+
+ constructor(url: string) {
+ this.url = url;
+ }
+
+ /**
+ * Opens the WebSocket connection. Resolves when the connection is ready.
+ * Safe to call multiple times — returns the same promise if already connecting.
+ */
+ connect(): Promise {
+ if (this.socket?.readyState === WebSocket.OPEN) {
+ return Promise.resolve();
+ }
+ if (this.connectPromise) return this.connectPromise;
+
+ this.connectPromise = new Promise((resolve, reject) => {
+ const socket = new WebSocket(this.url);
+
+ socket.onopen = () => {
+ this.socket = socket;
+ this.connectPromise = null;
+ resolve();
+ };
+
+ socket.onerror = (event) => {
+ this.connectPromise = null;
+ const msg = event instanceof ErrorEvent
+ ? event.message
+ : "WebSocket connection failed";
+ reject(new Error(msg));
+ };
+
+ socket.onmessage = (event) => {
+ this.handleMessage(event.data as string);
+ };
+
+ socket.onclose = () => {
+ this.socket = null;
+ this.connectPromise = null;
+ // Reject all pending requests
+ for (const [id, pending] of this.pending) {
+ pending.reject(new Error("WebSocket closed"));
+ this.pending.delete(id);
+ }
+ };
+ });
+
+ return this.connectPromise;
+ }
+
+ /** Closes the WebSocket connection. */
+ close(): void {
+ this.socket?.close();
+ this.socket = null;
+ }
+
+ /**
+ * Runs a workflow and returns the completed event payload.
+ * Optionally dispatches events to partial handlers as they arrive.
+ */
+ async workflowRun(
+ payload: WorkflowRunPayload,
+ handlers?: Partial>,
+ ): Promise {
+ await this.connect();
+ const id = crypto.randomUUID();
+ const fullHandlers = handlers
+ ? withDefaults(handlers)
+ : withDefaults({});
+
+ return new Promise((resolve, reject) => {
+ this.pending.set(id, { resolve, reject, handlers: fullHandlers });
+ this.send({
+ type: "workflow.run",
+ id,
+ payload,
+ });
+ });
+ }
+
+ /**
+ * Runs a model method and returns the completed event payload.
+ * Optionally dispatches events to partial handlers as they arrive.
+ */
+ async modelMethodRun(
+ payload: ModelMethodRunPayload,
+ handlers?: Partial>,
+ ): Promise {
+ await this.connect();
+ const id = crypto.randomUUID();
+ const fullHandlers = handlers
+ ? withDefaults(handlers)
+ : withDefaults({});
+
+ return new Promise((resolve, reject) => {
+ this.pending.set(id, { resolve, reject, handlers: fullHandlers });
+ this.send({
+ type: "model.method.run",
+ id,
+ payload,
+ });
+ });
+ }
+
+ /**
+ * Returns an AsyncIterable of workflow run events.
+ */
+ async *workflowRunStream(
+ payload: WorkflowRunPayload,
+ ): AsyncGenerator {
+ await this.connect();
+ const id = crypto.randomUUID();
+ const queue = new AsyncIterableQueue();
+
+ this.pending.set(id, {
+ resolve: () => {},
+ reject: (err) => queue.error(err),
+ handlers: withDefaults({}, (event) => {
+ queue.push(event);
+ if (event.kind === "completed" || event.kind === "error") {
+ queue.done();
+ }
+ }),
+ queue,
+ });
+
+ this.send({ type: "workflow.run", id, payload });
+
+ try {
+ yield* queue;
+ } finally {
+ this.pending.delete(id);
+ }
+ }
+
+ /**
+ * Returns an AsyncIterable of model method run events.
+ */
+ async *modelMethodRunStream(
+ payload: ModelMethodRunPayload,
+ ): AsyncGenerator {
+ await this.connect();
+ const id = crypto.randomUUID();
+ const queue = new AsyncIterableQueue();
+
+ this.pending.set(id, {
+ resolve: () => {},
+ reject: (err) => queue.error(err),
+ handlers: withDefaults({}, (event) => {
+ queue.push(event);
+ if (event.kind === "completed" || event.kind === "error") {
+ queue.done();
+ }
+ }),
+ queue,
+ });
+
+ this.send({ type: "model.method.run", id, payload });
+
+ try {
+ yield* queue;
+ } finally {
+ this.pending.delete(id);
+ }
+ }
+
+ /** Cancels a running operation by its request id. */
+ cancel(id: string): void {
+ this.send({ type: "cancel", id });
+ }
+
+ private handleMessage(data: string): void {
+ let msg: ServerMessage;
+ try {
+ msg = JSON.parse(data) as ServerMessage;
+ } catch {
+ return;
+ }
+
+ const pending = this.pending.get(msg.id);
+ if (!pending) return;
+
+ if (msg.type === "error") {
+ const err = new SwampClientError(
+ msg.error.code,
+ msg.error.message,
+ msg.error.details,
+ );
+ this.pending.delete(msg.id);
+ pending.reject(err);
+ return;
+ }
+
+ if (msg.type === "event") {
+ const event = msg.event;
+
+ // Dispatch to handler
+ const handler = pending.handlers[event.kind];
+ if (handler) {
+ handler(event);
+ }
+
+ // Resolve/reject on terminal events
+ if (event.kind === "completed") {
+ this.pending.delete(msg.id);
+ pending.resolve(event.run);
+ } else if (event.kind === "error") {
+ this.pending.delete(msg.id);
+ // deno-lint-ignore no-explicit-any
+ const swampError = event.error as any;
+ pending.reject(
+ new SwampClientError(
+ swampError?.code ?? "unknown",
+ swampError?.message ?? "Unknown error",
+ swampError?.details,
+ ),
+ );
+ }
+ }
+ }
+
+ private send(request: ServerRequest): void {
+ if (!this.socket || this.socket.readyState !== WebSocket.OPEN) {
+ throw new Error("WebSocket is not connected");
+ }
+ this.socket.send(JSON.stringify(request));
+ }
+}
+
+/**
+ * Simple async iterable queue for bridging push-based WebSocket events
+ * into a pull-based AsyncGenerator.
+ */
+class AsyncIterableQueue implements AsyncIterable {
+ private buffer: T[] = [];
+ private resolve: ((value: IteratorResult) => void) | null = null;
+ private finished = false;
+ private err: Error | null = null;
+
+ push(value: T): void {
+ if (this.finished) return;
+ if (this.resolve) {
+ const r = this.resolve;
+ this.resolve = null;
+ r({ value, done: false });
+ } else {
+ this.buffer.push(value);
+ }
+ }
+
+ done(): void {
+ this.finished = true;
+ if (this.resolve) {
+ const r = this.resolve;
+ this.resolve = null;
+ r({ value: undefined as unknown as T, done: true });
+ }
+ }
+
+ error(err: Error): void {
+ this.err = err;
+ this.finished = true;
+ if (this.resolve) {
+ const r = this.resolve;
+ this.resolve = null;
+ r({ value: undefined as unknown as T, done: true });
+ }
+ }
+
+ [Symbol.asyncIterator](): AsyncIterator {
+ return {
+ next: (): Promise> => {
+ if (this.err) return Promise.reject(this.err);
+ if (this.buffer.length > 0) {
+ return Promise.resolve({ value: this.buffer.shift()!, done: false });
+ }
+ if (this.finished) {
+ return Promise.resolve({
+ value: undefined as unknown as T,
+ done: true,
+ });
+ }
+ return new Promise>((resolve) => {
+ this.resolve = resolve;
+ });
+ },
+ };
+ }
+}
diff --git a/packages/client/client_test.ts b/packages/client/client_test.ts
new file mode 100644
index 00000000..d3813bee
--- /dev/null
+++ b/packages/client/client_test.ts
@@ -0,0 +1,319 @@
+// Swamp, an Automation Framework
+// Copyright (C) 2026 System Initiative, Inc.
+//
+// This file is part of Swamp.
+//
+// Swamp is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License version 3
+// as published by the Free Software Foundation, with the Swamp
+// Extension and Definition Exception (found in the "COPYING-EXCEPTION"
+// file).
+//
+// Swamp is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with Swamp. If not, see .
+
+import { assertEquals, assertRejects } from "@std/assert";
+import { SwampClient } from "./client.ts";
+import { SwampClientError } from "./stream.ts";
+
+// ── Constructor ─────────────────────────────────────────────────────────
+
+Deno.test("SwampClient - constructor accepts a URL", () => {
+ const client = new SwampClient("ws://localhost:9876/ws");
+ // No error thrown — client is created in disconnected state
+ client.close(); // safe to call even when not connected
+});
+
+Deno.test("SwampClient - close is idempotent when not connected", () => {
+ const client = new SwampClient("ws://localhost:9876/ws");
+ client.close();
+ client.close();
+ // No error — close on a null socket is a no-op
+});
+
+// ── Integration with a local WebSocket server ───────────────────────────
+
+Deno.test("SwampClient - connect and close lifecycle", async () => {
+ const server = Deno.serve({ port: 0, onListen: () => {} }, (req) => {
+ if (req.headers.get("upgrade") === "websocket") {
+ const { socket, response } = Deno.upgradeWebSocket(req);
+ socket.onopen = () => {};
+ return response;
+ }
+ return new Response("not found", { status: 404 });
+ });
+
+ const addr = server.addr;
+ const client = new SwampClient(`ws://localhost:${addr.port}/ws`);
+
+ await client.connect();
+ // Calling connect again should return immediately (already open)
+ await client.connect();
+
+ client.close();
+ await server.shutdown();
+});
+
+Deno.test("SwampClient - connect rejects on refused connection", async () => {
+ // Port 1 is almost certainly not listening
+ const client = new SwampClient("ws://127.0.0.1:1/ws");
+
+ await assertRejects(
+ () => client.connect(),
+ Error,
+ );
+});
+
+Deno.test("SwampClient - workflowRun receives completed event", async () => {
+ const server = Deno.serve({ port: 0, onListen: () => {} }, (req) => {
+ if (req.headers.get("upgrade") === "websocket") {
+ const { socket, response } = Deno.upgradeWebSocket(req);
+ socket.onmessage = (e) => {
+ const msg = JSON.parse(e.data as string);
+ // Echo back a completed event
+ socket.send(JSON.stringify({
+ type: "event",
+ id: msg.id,
+ event: {
+ kind: "completed",
+ run: {
+ id: "run-1",
+ workflowId: "wf-1",
+ workflowName: "test-wf",
+ status: "succeeded",
+ jobs: [],
+ },
+ },
+ }));
+ };
+ return response;
+ }
+ return new Response("not found", { status: 404 });
+ });
+
+ const addr = server.addr;
+ const client = new SwampClient(`ws://localhost:${addr.port}/ws`);
+
+ const result = await client.workflowRun({
+ workflowIdOrName: "test-wf",
+ });
+ assertEquals(result.workflowName, "test-wf");
+ assertEquals(result.status, "succeeded");
+
+ client.close();
+ await server.shutdown();
+});
+
+Deno.test("SwampClient - workflowRun dispatches intermediate events to handlers", async () => {
+ const server = Deno.serve({ port: 0, onListen: () => {} }, (req) => {
+ if (req.headers.get("upgrade") === "websocket") {
+ const { socket, response } = Deno.upgradeWebSocket(req);
+ socket.onmessage = (e) => {
+ const msg = JSON.parse(e.data as string);
+ // Send a sequence of events
+ socket.send(JSON.stringify({
+ type: "event",
+ id: msg.id,
+ event: { kind: "validating_inputs" },
+ }));
+ socket.send(JSON.stringify({
+ type: "event",
+ id: msg.id,
+ event: { kind: "evaluating_workflow" },
+ }));
+ socket.send(JSON.stringify({
+ type: "event",
+ id: msg.id,
+ event: {
+ kind: "completed",
+ run: {
+ id: "r1",
+ workflowId: "w1",
+ workflowName: "wf",
+ status: "succeeded",
+ jobs: [],
+ },
+ },
+ }));
+ };
+ return response;
+ }
+ return new Response("not found", { status: 404 });
+ });
+
+ const addr = server.addr;
+ const client = new SwampClient(`ws://localhost:${addr.port}/ws`);
+
+ const seen: string[] = [];
+ await client.workflowRun(
+ { workflowIdOrName: "wf" },
+ {
+ validating_inputs: () => {
+ seen.push("validating");
+ },
+ evaluating_workflow: () => {
+ seen.push("evaluating");
+ },
+ },
+ );
+
+ assertEquals(seen, ["validating", "evaluating"]);
+
+ client.close();
+ await server.shutdown();
+});
+
+Deno.test("SwampClient - server error message rejects with SwampClientError", async () => {
+ const server = Deno.serve({ port: 0, onListen: () => {} }, (req) => {
+ if (req.headers.get("upgrade") === "websocket") {
+ const { socket, response } = Deno.upgradeWebSocket(req);
+ socket.onmessage = (e) => {
+ const msg = JSON.parse(e.data as string);
+ socket.send(JSON.stringify({
+ type: "error",
+ id: msg.id,
+ error: {
+ code: "not_found",
+ message: "Workflow not found",
+ },
+ }));
+ };
+ return response;
+ }
+ return new Response("not found", { status: 404 });
+ });
+
+ const addr = server.addr;
+ const client = new SwampClient(`ws://localhost:${addr.port}/ws`);
+
+ const err = await assertRejects(
+ () => client.workflowRun({ workflowIdOrName: "missing" }),
+ SwampClientError,
+ "Workflow not found",
+ );
+ assertEquals((err as SwampClientError).code, "not_found");
+
+ client.close();
+ await server.shutdown();
+});
+
+Deno.test("SwampClient - error event in stream rejects with SwampClientError", async () => {
+ const server = Deno.serve({ port: 0, onListen: () => {} }, (req) => {
+ if (req.headers.get("upgrade") === "websocket") {
+ const { socket, response } = Deno.upgradeWebSocket(req);
+ socket.onmessage = (e) => {
+ const msg = JSON.parse(e.data as string);
+ socket.send(JSON.stringify({
+ type: "event",
+ id: msg.id,
+ event: {
+ kind: "error",
+ error: {
+ code: "execution_failed",
+ message: "Method threw an exception",
+ },
+ },
+ }));
+ };
+ return response;
+ }
+ return new Response("not found", { status: 404 });
+ });
+
+ const addr = server.addr;
+ const client = new SwampClient(`ws://localhost:${addr.port}/ws`);
+
+ const err = await assertRejects(
+ () =>
+ client.modelMethodRun({
+ modelIdOrName: "test",
+ methodName: "run",
+ }),
+ SwampClientError,
+ "Method threw an exception",
+ );
+ assertEquals((err as SwampClientError).code, "execution_failed");
+
+ client.close();
+ await server.shutdown();
+});
+
+Deno.test("SwampClient - workflowRunStream yields events", async () => {
+ const server = Deno.serve({ port: 0, onListen: () => {} }, (req) => {
+ if (req.headers.get("upgrade") === "websocket") {
+ const { socket, response } = Deno.upgradeWebSocket(req);
+ socket.onmessage = (e) => {
+ const msg = JSON.parse(e.data as string);
+ socket.send(JSON.stringify({
+ type: "event",
+ id: msg.id,
+ event: { kind: "validating_inputs" },
+ }));
+ socket.send(JSON.stringify({
+ type: "event",
+ id: msg.id,
+ event: {
+ kind: "completed",
+ run: {
+ id: "r1",
+ workflowId: "w1",
+ workflowName: "stream-wf",
+ status: "succeeded",
+ jobs: [],
+ },
+ },
+ }));
+ };
+ return response;
+ }
+ return new Response("not found", { status: 404 });
+ });
+
+ const addr = server.addr;
+ const client = new SwampClient(`ws://localhost:${addr.port}/ws`);
+
+ const kinds: string[] = [];
+ for await (
+ const event of client.workflowRunStream({
+ workflowIdOrName: "stream-wf",
+ })
+ ) {
+ kinds.push(event.kind);
+ }
+
+ assertEquals(kinds, ["validating_inputs", "completed"]);
+
+ client.close();
+ await server.shutdown();
+});
+
+Deno.test("SwampClient - socket close rejects pending requests", async () => {
+ const server = Deno.serve({ port: 0, onListen: () => {} }, (req) => {
+ if (req.headers.get("upgrade") === "websocket") {
+ const { socket, response } = Deno.upgradeWebSocket(req);
+ socket.onmessage = () => {
+ // Close instead of responding
+ socket.close();
+ };
+ return response;
+ }
+ return new Response("not found", { status: 404 });
+ });
+
+ const addr = server.addr;
+ const client = new SwampClient(`ws://localhost:${addr.port}/ws`);
+
+ await assertRejects(
+ () => client.workflowRun({ workflowIdOrName: "test" }),
+ Error,
+ "WebSocket closed",
+ );
+
+ client.close();
+ await server.shutdown();
+});
diff --git a/packages/client/deno.json b/packages/client/deno.json
new file mode 100644
index 00000000..75fd5199
--- /dev/null
+++ b/packages/client/deno.json
@@ -0,0 +1,6 @@
+{
+ "name": "@systeminit/swamp-lib",
+ "version": "0.1.0",
+ "license": "AGPL-3.0-only",
+ "exports": "./mod.ts"
+}
diff --git a/packages/client/mod.ts b/packages/client/mod.ts
new file mode 100644
index 00000000..a937e984
--- /dev/null
+++ b/packages/client/mod.ts
@@ -0,0 +1,71 @@
+// Swamp, an Automation Framework
+// Copyright (C) 2026 System Initiative, Inc.
+//
+// This file is part of Swamp.
+//
+// Swamp is free software: you can redistribute it and\/or modify
+// it under the terms of the GNU Affero General Public License version 3
+// as published by the Free Software Foundation, with the Swamp
+// Extension and Definition Exception (found in the "COPYING-EXCEPTION"
+// file).
+//
+// Swamp is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with Swamp. If not, see .
+
+/**
+ * @systeminit/swamp-lib — TypeScript client for the swamp WebSocket API.
+ *
+ * ```typescript
+ * import { SwampClient } from "@systeminit/swamp-lib";
+ *
+ * const client = new SwampClient("ws://localhost:9090");
+ * await client.connect();
+ *
+ * // Callback-based: resolves with completed payload
+ * const run = await client.workflowRun(
+ * { workflowIdOrName: "my-workflow", inputs: { env: "dev" } },
+ * { started: (e) => console.log(`Run ${e.runId} started`) },
+ * );
+ *
+ * // AsyncIterable-based: for-await-of event stream
+ * for await (const event of client.workflowRunStream({ workflowIdOrName: "my-workflow" })) {
+ * console.log(event.kind);
+ * }
+ *
+ * client.close();
+ * ```
+ *
+ * @module
+ */
+
+export { SwampClient } from "./client.ts";
+
+// Protocol types
+export type {
+ ModelMethodRunEvent,
+ ModelMethodRunPayload,
+ ModelMethodRunView,
+ SerializedError,
+ SerializedEvent,
+ ServerMessage,
+ ServerRequest,
+ WorkflowRunEvent,
+ WorkflowRunPayload,
+ WorkflowRunView,
+} from "./protocol.ts";
+
+// Stream helpers
+export {
+ consumeStream,
+ type EventHandlers,
+ type HasTerminals,
+ result,
+ type StreamEvent,
+ SwampClientError,
+ withDefaults,
+} from "./stream.ts";
diff --git a/packages/client/protocol.ts b/packages/client/protocol.ts
new file mode 100644
index 00000000..102f225e
--- /dev/null
+++ b/packages/client/protocol.ts
@@ -0,0 +1,222 @@
+// Swamp, an Automation Framework
+// Copyright (C) 2026 System Initiative, Inc.
+//
+// This file is part of Swamp.
+//
+// Swamp is free software: you can redistribute it and\/or modify
+// it under the terms of the GNU Affero General Public License version 3
+// as published by the Free Software Foundation, with the Swamp
+// Extension and Definition Exception (found in the "COPYING-EXCEPTION"
+// file).
+//
+// Swamp is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with Swamp. If not, see .
+
+/**
+ * Wire protocol types for the swamp WebSocket API.
+ *
+ * These types mirror src/serve/protocol.ts in the swamp CLI. They are
+ * duplicated here so the client package has zero dependencies on the CLI
+ * source tree and can be published independently to JSR.
+ */
+
+// ── Inbound (client → server) ────────────────────────────────────────────
+
+export interface WorkflowRunPayload {
+ workflowIdOrName: string;
+ inputs?: Record;
+ lastEvaluated?: boolean;
+ driver?: string;
+ verbose?: boolean;
+ runtimeTags?: Record;
+}
+
+export interface ModelMethodRunPayload {
+ modelIdOrName: string;
+ methodName: string;
+ inputs?: Record;
+ lastEvaluated?: boolean;
+ driver?: string;
+ runtimeTags?: Record;
+}
+
+export type ServerRequest =
+ | { type: "workflow.run"; id: string; payload: WorkflowRunPayload }
+ | { type: "model.method.run"; id: string; payload: ModelMethodRunPayload }
+ | { type: "cancel"; id: string };
+
+// ── Outbound (server → client) ───────────────────────────────────────────
+
+export interface SerializedEvent {
+ kind: string;
+ [key: string]: unknown;
+}
+
+export interface SerializedError {
+ code: string;
+ message: string;
+ details?: unknown;
+}
+
+export type ServerMessage =
+ | { type: "event"; id: string; event: SerializedEvent }
+ | { type: "error"; id: string; error: SerializedError };
+
+// ── Event types (wire-format discriminated unions) ────────────────────────
+
+/** Events emitted during a workflow run. */
+export type WorkflowRunEvent =
+ | { kind: "validating_inputs" }
+ | { kind: "evaluating_workflow" }
+ | { kind: "started"; runId: string; workflowName: string }
+ | { kind: "job_started"; jobId: string }
+ | { kind: "job_completed"; jobId: string; status: string }
+ | { kind: "job_skipped"; jobId: string }
+ | { kind: "step_started"; jobId: string; stepId: string }
+ | { kind: "step_completed"; jobId: string; stepId: string }
+ | { kind: "step_skipped"; jobId: string; stepId: string }
+ | {
+ kind: "step_failed";
+ jobId: string;
+ stepId: string;
+ error: string;
+ allowedFailure?: boolean;
+ }
+ | {
+ kind: "model_resolved";
+ jobId: string;
+ stepId: string;
+ modelName: string;
+ modelType: string;
+ methodName: string;
+ }
+ | {
+ kind: "method_executing";
+ jobId: string;
+ stepId: string;
+ modelName: string;
+ methodName: string;
+ }
+ | {
+ kind: "method_output";
+ jobId: string;
+ stepId: string;
+ modelName: string;
+ methodName: string;
+ stream: "stdout" | "stderr";
+ line: string;
+ }
+ | {
+ kind: "method_event";
+ jobId: string;
+ stepId: string;
+ modelName: string;
+ methodName: string;
+ event: Record;
+ }
+ | { kind: "report_started"; reportName: string; scope: string }
+ | {
+ kind: "report_completed";
+ reportName: string;
+ scope: string;
+ markdown: string;
+ json: Record;
+ }
+ | { kind: "report_failed"; reportName: string; scope: string; error: string }
+ | { kind: "completed"; run: WorkflowRunView }
+ | { kind: "error"; error: SerializedError };
+
+/** Events emitted during a model method run. */
+export type ModelMethodRunEvent =
+ | { kind: "validating_inputs" }
+ | { kind: "resolving_model"; modelIdOrName: string }
+ | {
+ kind: "model_resolved";
+ modelName: string;
+ modelType: string;
+ methodName: string;
+ }
+ | {
+ kind: "env_var_warning";
+ modelName: string;
+ envVars: Array<{ path: string; envVar: string }>;
+ message: string;
+ }
+ | { kind: "evaluating_expressions"; lastEvaluated: boolean }
+ | { kind: "executing"; modelName: string; methodName: string }
+ | {
+ kind: "method_output";
+ modelName: string;
+ methodName: string;
+ stream: "stdout" | "stderr";
+ line: string;
+ }
+ | {
+ kind: "method_event";
+ modelName: string;
+ methodName: string;
+ event: Record;
+ }
+ | { kind: "data_artifact_saved"; name: string; path: string }
+ | { kind: "report_started"; reportName: string; scope: string }
+ | {
+ kind: "report_completed";
+ reportName: string;
+ scope: string;
+ markdown: string;
+ json: Record;
+ }
+ | { kind: "report_failed"; reportName: string; scope: string; error: string }
+ | { kind: "completed"; run: ModelMethodRunView }
+ | { kind: "error"; error: SerializedError };
+
+// ── Completed event payload types ────────────────────────────────────────
+
+export interface WorkflowRunView {
+ id: string;
+ workflowId: string;
+ workflowName: string;
+ status: string;
+ jobs: Array<{
+ name: string;
+ status: string;
+ steps: Array<{
+ name: string;
+ status: string;
+ error?: string;
+ duration?: number;
+ dataArtifacts?: Array<{
+ dataId: string;
+ name: string;
+ version: number;
+ tags: Record;
+ }>;
+ allowedFailure?: boolean;
+ }>;
+ duration?: number;
+ }>;
+ duration?: number;
+ path?: string;
+}
+
+export interface ModelMethodRunView {
+ modelId: string;
+ modelName: string;
+ modelType: string;
+ methodName: string;
+ status: string;
+ duration?: number;
+ outputId: string;
+ logFile?: string;
+ dataArtifacts: Array<{
+ id: string;
+ name: string;
+ path: string;
+ attributes?: Record;
+ }>;
+}
diff --git a/packages/client/stream.ts b/packages/client/stream.ts
new file mode 100644
index 00000000..fbe02ae8
--- /dev/null
+++ b/packages/client/stream.ts
@@ -0,0 +1,113 @@
+// Swamp, an Automation Framework
+// Copyright (C) 2026 System Initiative, Inc.
+//
+// This file is part of Swamp.
+//
+// Swamp is free software: you can redistribute it and\/or modify
+// it under the terms of the GNU Affero General Public License version 3
+// as published by the Free Software Foundation, with the Swamp
+// Extension and Definition Exception (found in the "COPYING-EXCEPTION"
+// file).
+//
+// Swamp is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with Swamp. If not, see .
+
+/**
+ * Client-side stream consumption helpers.
+ *
+ * These mirror the core helpers from libswamp/stream.ts so consumers can use
+ * the same patterns (consumeStream, result, withDefaults) without depending
+ * on the full swamp CLI.
+ */
+
+import type { SerializedError } from "./protocol.ts";
+
+/** Base constraint for stream events. */
+export type StreamEvent = { kind: string };
+
+/** Compile-time check that E includes both `completed` and `error` terminals. */
+export type HasTerminals = Extract<
+ E,
+ { kind: "completed" }
+> extends never ? never
+ : Extract extends never ? never
+ : E;
+
+/** Mapped type enforcing exhaustive handlers per event kind. */
+export type EventHandlers = {
+ [K in E["kind"]]: (
+ event: Extract,
+ ) => void | Promise;
+};
+
+/**
+ * Iterates a stream and dispatches each event to the matching handler.
+ * Handlers are exhaustiveness-checked at compile time.
+ */
+export async function consumeStream(
+ stream: AsyncIterable>,
+ handlers: EventHandlers,
+): Promise {
+ for await (const event of stream) {
+ const handler = handlers[event.kind as E["kind"]];
+ // deno-lint-ignore no-explicit-any
+ await handler(event as any);
+ }
+}
+
+/**
+ * Fast-forwards through a stream to the `completed` event.
+ * Throws a SwampClientError if an `error` event is encountered.
+ */
+export async function result(
+ stream: AsyncIterable>,
+): Promise> {
+ for await (const event of stream) {
+ if (event.kind === "completed") {
+ return event as unknown as Extract;
+ }
+ if (event.kind === "error") {
+ const error = (event as unknown as { error: SerializedError }).error;
+ throw new SwampClientError(error.code, error.message, error.details);
+ }
+ }
+ throw new Error("Stream ended without a completed or error event");
+}
+
+/**
+ * Fills missing handlers with no-ops (or a provided fallback).
+ */
+export function withDefaults(
+ partial: Partial>,
+ fallback?: (event: E) => void | Promise,
+): EventHandlers {
+ const noop = () => {};
+ return new Proxy(partial as EventHandlers, {
+ get(target, prop, receiver) {
+ const handler = Reflect.get(target, prop, receiver);
+ if (handler) return handler;
+ if (fallback) return fallback;
+ return noop;
+ },
+ });
+}
+
+/**
+ * Error thrown when the server sends an error event.
+ */
+export class SwampClientError extends Error {
+ readonly code: string;
+ readonly details?: unknown;
+
+ constructor(code: string, message: string, details?: unknown) {
+ super(message);
+ this.name = "SwampClientError";
+ this.code = code;
+ this.details = details;
+ }
+}
diff --git a/packages/client/stream_test.ts b/packages/client/stream_test.ts
new file mode 100644
index 00000000..7d276d22
--- /dev/null
+++ b/packages/client/stream_test.ts
@@ -0,0 +1,221 @@
+// Swamp, an Automation Framework
+// Copyright (C) 2026 System Initiative, Inc.
+//
+// This file is part of Swamp.
+//
+// Swamp is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License version 3
+// as published by the Free Software Foundation, with the Swamp
+// Extension and Definition Exception (found in the "COPYING-EXCEPTION"
+// file).
+//
+// Swamp is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with Swamp. If not, see .
+
+import { assertEquals, assertRejects } from "@std/assert";
+import {
+ consumeStream,
+ result,
+ SwampClientError,
+ withDefaults,
+} from "./stream.ts";
+import type { SerializedError } from "./protocol.ts";
+
+// ── Test event types ────────────────────────────────────────────────────
+
+type TestEvent =
+ | { kind: "started"; id: string }
+ | { kind: "progress"; percent: number }
+ | { kind: "completed"; result: string }
+ | { kind: "error"; error: SerializedError };
+
+/** Helper: creates an async iterable from an array. */
+async function* asyncOf(...items: T[]): AsyncGenerator {
+ for (const item of items) {
+ yield item;
+ }
+}
+
+// ── SwampClientError ────────────────────────────────────────────────────
+
+Deno.test("SwampClientError - constructor sets all fields", () => {
+ const err = new SwampClientError("not_found", "Model not found", {
+ id: "abc",
+ });
+ assertEquals(err.code, "not_found");
+ assertEquals(err.message, "Model not found");
+ assertEquals(err.details, { id: "abc" });
+ assertEquals(err.name, "SwampClientError");
+ assertEquals(err instanceof Error, true);
+});
+
+Deno.test("SwampClientError - details is optional", () => {
+ const err = new SwampClientError("cancelled", "Cancelled");
+ assertEquals(err.code, "cancelled");
+ assertEquals(err.details, undefined);
+});
+
+// ── withDefaults ────────────────────────────────────────────────────────
+
+Deno.test("withDefaults - provided handlers are used", () => {
+ const calls: string[] = [];
+ const handlers = withDefaults({
+ started: (e) => {
+ calls.push(`started:${e.id}`);
+ },
+ });
+
+ handlers.started({ kind: "started", id: "x" });
+ assertEquals(calls, ["started:x"]);
+});
+
+Deno.test("withDefaults - missing handlers get noop", () => {
+ const handlers = withDefaults({});
+ // Should not throw for unhandled kinds
+ handlers.progress({ kind: "progress", percent: 50 });
+ handlers.completed({ kind: "completed", result: "ok" });
+});
+
+Deno.test("withDefaults - fallback is called for missing handlers", () => {
+ const fallbackCalls: string[] = [];
+ const handlers = withDefaults({}, (event) => {
+ fallbackCalls.push(event.kind);
+ });
+
+ handlers.progress({ kind: "progress", percent: 50 });
+ handlers.completed({ kind: "completed", result: "ok" });
+ assertEquals(fallbackCalls, ["progress", "completed"]);
+});
+
+Deno.test("withDefaults - explicit handler takes precedence over fallback", () => {
+ const calls: string[] = [];
+ const handlers = withDefaults(
+ {
+ started: () => {
+ calls.push("explicit");
+ },
+ },
+ () => {
+ calls.push("fallback");
+ },
+ );
+
+ handlers.started({ kind: "started", id: "1" });
+ handlers.progress({ kind: "progress", percent: 0 });
+ assertEquals(calls, ["explicit", "fallback"]);
+});
+
+// ── consumeStream ───────────────────────────────────────────────────────
+
+Deno.test("consumeStream - dispatches all events to handlers", async () => {
+ const events: TestEvent[] = [
+ { kind: "started", id: "run-1" },
+ { kind: "progress", percent: 50 },
+ { kind: "completed", result: "done" },
+ ];
+
+ const seen: string[] = [];
+ await consumeStream(
+ asyncOf(...events),
+ withDefaults({
+ started: (e) => {
+ seen.push(`started:${e.id}`);
+ },
+ progress: (e) => {
+ seen.push(`progress:${e.percent}`);
+ },
+ completed: (e) => {
+ seen.push(`completed:${e.result}`);
+ },
+ }),
+ );
+
+ assertEquals(seen, ["started:run-1", "progress:50", "completed:done"]);
+});
+
+Deno.test("consumeStream - works with async handlers", async () => {
+ const events: TestEvent[] = [
+ { kind: "started", id: "a" },
+ { kind: "completed", result: "ok" },
+ ];
+
+ const seen: string[] = [];
+ await consumeStream(
+ asyncOf(...events),
+ withDefaults({
+ started: async (e) => {
+ await Promise.resolve();
+ seen.push(e.id);
+ },
+ }),
+ );
+
+ assertEquals(seen, ["a"]);
+});
+
+// ── result ──────────────────────────────────────────────────────────────
+
+Deno.test("result - returns completed event", async () => {
+ const events: TestEvent[] = [
+ { kind: "started", id: "r1" },
+ { kind: "progress", percent: 100 },
+ { kind: "completed", result: "success" },
+ ];
+
+ const completed = await result(asyncOf(...events));
+ assertEquals(completed.kind, "completed");
+ assertEquals(completed.result, "success");
+});
+
+Deno.test("result - throws SwampClientError on error event", async () => {
+ const events: TestEvent[] = [
+ { kind: "started", id: "r1" },
+ {
+ kind: "error",
+ error: {
+ code: "execution_failed",
+ message: "Method threw",
+ details: { exitCode: 1 },
+ },
+ },
+ ];
+
+ const err = await assertRejects(
+ () => result(asyncOf(...events)),
+ SwampClientError,
+ "Method threw",
+ );
+ assertEquals((err as SwampClientError).code, "execution_failed");
+ assertEquals((err as SwampClientError).details, { exitCode: 1 });
+});
+
+Deno.test("result - throws generic Error if stream ends without terminal", async () => {
+ const events: TestEvent[] = [
+ { kind: "started", id: "r1" },
+ { kind: "progress", percent: 50 },
+ ];
+
+ await assertRejects(
+ () => result(asyncOf(...events)),
+ Error,
+ "Stream ended without a completed or error event",
+ );
+});
+
+Deno.test("result - skips non-terminal events", async () => {
+ const events: TestEvent[] = [
+ { kind: "started", id: "r1" },
+ { kind: "progress", percent: 10 },
+ { kind: "progress", percent: 50 },
+ { kind: "progress", percent: 90 },
+ { kind: "completed", result: "all done" },
+ ];
+
+ const completed = await result(asyncOf(...events));
+ assertEquals(completed.result, "all done");
+});
diff --git a/src/cli/commands/serve.ts b/src/cli/commands/serve.ts
new file mode 100644
index 00000000..295e12a5
--- /dev/null
+++ b/src/cli/commands/serve.ts
@@ -0,0 +1,120 @@
+// Swamp, an Automation Framework
+// Copyright (C) 2026 System Initiative, Inc.
+//
+// This file is part of Swamp.
+//
+// Swamp is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License version 3
+// as published by the Free Software Foundation, with the Swamp
+// Extension and Definition Exception (found in the "COPYING-EXCEPTION"
+// file).
+//
+// Swamp is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with Swamp. If not, see .
+
+import { Command } from "@cliffy/command";
+import { createContext, type GlobalOptions } from "../context.ts";
+import { requireInitializedRepoUnlocked } from "../repo_context.ts";
+import { handleConnection } from "../../serve/connection.ts";
+import { getSwampLogger } from "../../infrastructure/logging/logger.ts";
+
+// deno-lint-ignore no-explicit-any
+type AnyOptions = any;
+
+const logger = getSwampLogger(["serve"]);
+
+export const serveCommand = new Command()
+ .name("serve")
+ .description("Start a WebSocket API server for workflow and model execution")
+ .option("--repo-dir ", "Repository directory", { default: "." })
+ .option("--port ", "Port to listen on", { default: 9090 })
+ .option("--host ", "Host to bind to", { default: "127.0.0.1" })
+ .action(async function (options: AnyOptions) {
+ const ctx = createContext(options as GlobalOptions, ["serve"]);
+ const repoDir = options.repoDir as string ?? ".";
+ const port = options.port as number;
+ const host = options.host as string;
+ const isJson = ctx.outputMode === "json";
+
+ ctx.logger.info`Initializing repository at ${repoDir}`;
+
+ const { repoDir: resolvedRepoDir, repoContext, datastoreConfig } =
+ await requireInitializedRepoUnlocked({
+ repoDir,
+ outputMode: ctx.outputMode,
+ });
+
+ if (host !== "127.0.0.1" && host !== "localhost") {
+ logger.warn(
+ "Binding to non-loopback address {host} — no authentication is enforced on WebSocket connections",
+ { host },
+ );
+ }
+ const connectionCtx = {
+ repoDir: resolvedRepoDir,
+ repoContext,
+ datastoreConfig,
+ };
+
+ const ac = new AbortController();
+
+ const server = Deno.serve(
+ {
+ port,
+ hostname: host,
+ signal: ac.signal,
+ onListen({ hostname, port: listenPort }) {
+ if (isJson) {
+ console.log(JSON.stringify({
+ status: "listening",
+ host: hostname,
+ port: listenPort,
+ url: `ws://${hostname}:${listenPort}`,
+ }));
+ } else {
+ logger.info("WebSocket API server listening on {host}:{port}", {
+ host: hostname,
+ port: listenPort,
+ });
+ }
+ },
+ },
+ (req) => {
+ // WebSocket upgrade (check first — upgrade requests are also GETs)
+ const upgrade = req.headers.get("upgrade") ?? "";
+ if (upgrade.toLowerCase() === "websocket") {
+ const { socket, response } = Deno.upgradeWebSocket(req);
+ handleConnection(socket, connectionCtx);
+ return response;
+ }
+
+ // Health check endpoint
+ if (req.method === "GET") {
+ const url = new URL(req.url);
+ if (url.pathname === "/" || url.pathname === "/health") {
+ return Response.json({ status: "ok", version: "1" });
+ }
+ }
+
+ return new Response("Not found", { status: 404 });
+ },
+ );
+
+ // Handle SIGINT/SIGTERM for graceful shutdown
+ const shutdown = () => {
+ if (isJson) {
+ console.log(JSON.stringify({ status: "stopped" }));
+ }
+ logger.info("Shutting down...");
+ ac.abort();
+ };
+ Deno.addSignalListener("SIGINT", shutdown);
+ Deno.addSignalListener("SIGTERM", shutdown);
+
+ await server.finished;
+ });
diff --git a/src/cli/commands/serve_test.ts b/src/cli/commands/serve_test.ts
new file mode 100644
index 00000000..443078a3
--- /dev/null
+++ b/src/cli/commands/serve_test.ts
@@ -0,0 +1,58 @@
+// Swamp, an Automation Framework
+// Copyright (C) 2026 System Initiative, Inc.
+//
+// This file is part of Swamp.
+//
+// Swamp is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License version 3
+// as published by the Free Software Foundation, with the Swamp
+// Extension and Definition Exception (found in the "COPYING-EXCEPTION"
+// file).
+//
+// Swamp is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with Swamp. If not, see .
+
+import { assertEquals } from "@std/assert";
+import { initializeLogging } from "../../infrastructure/logging/logger.ts";
+
+// Initialize logging for tests
+await initializeLogging({});
+
+Deno.test("serveCommand module loads", async () => {
+ const { serveCommand } = await import("./serve.ts");
+ assertEquals(serveCommand.getName(), "serve");
+});
+
+Deno.test("serveCommand has correct description", async () => {
+ const { serveCommand } = await import("./serve.ts");
+ assertEquals(
+ serveCommand.getDescription(),
+ "Start a WebSocket API server for workflow and model execution",
+ );
+});
+
+Deno.test("serveCommand has --port option", async () => {
+ const { serveCommand } = await import("./serve.ts");
+ const options = serveCommand.getOptions();
+ const portOpt = options.find((o) => o.name === "port");
+ assertEquals(portOpt !== undefined, true);
+});
+
+Deno.test("serveCommand has --host option", async () => {
+ const { serveCommand } = await import("./serve.ts");
+ const options = serveCommand.getOptions();
+ const hostOpt = options.find((o) => o.name === "host");
+ assertEquals(hostOpt !== undefined, true);
+});
+
+Deno.test("serveCommand has --repo-dir option", async () => {
+ const { serveCommand } = await import("./serve.ts");
+ const options = serveCommand.getOptions();
+ const repoDirOpt = options.find((o) => o.name === "repo-dir");
+ assertEquals(repoDirOpt !== undefined, true);
+});
diff --git a/src/cli/mod.ts b/src/cli/mod.ts
index a80f1346..344e08ce 100644
--- a/src/cli/mod.ts
+++ b/src/cli/mod.ts
@@ -44,6 +44,7 @@ import { extensionCommand } from "./commands/extension.ts";
import { summariseCommand } from "./commands/summarise.ts";
import { datastoreCommand } from "./commands/datastore.ts";
import { reportCommand } from "./commands/report.ts";
+import { serveCommand } from "./commands/serve.ts";
import { createHelpCommand } from "./commands/help.ts";
import { unknownCommandErrorHandler } from "./unknown_command_handler.ts";
import {
@@ -723,7 +724,8 @@ export async function runCli(args: string[]): Promise {
.command("extension", extensionCommand)
.command("summarise", summariseCommand)
.command("datastore", datastoreCommand)
- .command("report", reportCommand);
+ .command("report", reportCommand)
+ .command("serve", serveCommand);
// Register help command last — needs reference to the fully-built CLI tree
cli.command("help", createHelpCommand(cli));
diff --git a/src/serve/connection.ts b/src/serve/connection.ts
new file mode 100644
index 00000000..ddcd56a4
--- /dev/null
+++ b/src/serve/connection.ts
@@ -0,0 +1,375 @@
+// Swamp, an Automation Framework
+// Copyright (C) 2026 System Initiative, Inc.
+//
+// This file is part of Swamp.
+//
+// Swamp is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License version 3
+// as published by the Free Software Foundation, with the Swamp
+// Extension and Definition Exception (found in the "COPYING-EXCEPTION"
+// file).
+//
+// Swamp is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with Swamp. If not, see .
+
+/**
+ * Per-WebSocket connection handler. Dispatches incoming requests to libswamp
+ * operations and streams serialized events back to the client.
+ */
+
+import { z } from "zod";
+import type { RepositoryContext } from "../infrastructure/persistence/repository_factory.ts";
+import type { DatastoreConfig } from "../domain/datastore/datastore_config.ts";
+import {
+ createLibSwampContext,
+ modelMethodRun,
+ workflowRun,
+} from "../libswamp/mod.ts";
+import { createModelMethodRunDeps, createWorkflowRunDeps } from "./deps.ts";
+import { serializeEvent } from "./serializer.ts";
+import type {
+ ModelMethodRunPayload,
+ ServerMessage,
+ ServerRequest,
+ WorkflowRunPayload,
+} from "./protocol.ts";
+import { findDefinitionByIdOrName } from "../domain/models/model_lookup.ts";
+import { extractModelReferencesFromWorkflow } from "../domain/workflows/model_reference_extractor.ts";
+import { createWorkflowId } from "../domain/workflows/workflow_id.ts";
+import { acquireModelLocks } from "../cli/repo_context.ts";
+import { getSwampLogger } from "../infrastructure/logging/logger.ts";
+
+// ── Zod schemas for incoming WebSocket messages ─────────────────────────
+
+const WorkflowRunRequestSchema = z.object({
+ type: z.literal("workflow.run"),
+ id: z.string().min(1),
+ payload: z.object({
+ workflowIdOrName: z.string(),
+ inputs: z.record(z.string(), z.unknown()).optional(),
+ lastEvaluated: z.boolean().optional(),
+ driver: z.string().optional(),
+ verbose: z.boolean().optional(),
+ runtimeTags: z.record(z.string(), z.string()).optional(),
+ }),
+});
+
+const ModelMethodRunRequestSchema = z.object({
+ type: z.literal("model.method.run"),
+ id: z.string().min(1),
+ payload: z.object({
+ modelIdOrName: z.string(),
+ methodName: z.string(),
+ inputs: z.record(z.string(), z.unknown()).optional(),
+ lastEvaluated: z.boolean().optional(),
+ driver: z.string().optional(),
+ runtimeTags: z.record(z.string(), z.string()).optional(),
+ }),
+});
+
+const CancelRequestSchema = z.object({
+ type: z.literal("cancel"),
+ id: z.string().min(1),
+});
+
+const ServerRequestSchema = z.discriminatedUnion("type", [
+ WorkflowRunRequestSchema,
+ ModelMethodRunRequestSchema,
+ CancelRequestSchema,
+]);
+
+/**
+ * Validates a parsed JSON value against the ServerRequest schema.
+ * Returns the validated request on success, or a human-readable error string on failure.
+ */
+export function validateServerRequest(
+ data: unknown,
+): ServerRequest | string {
+ const result = ServerRequestSchema.safeParse(data);
+ if (result.success) {
+ return result.data as ServerRequest;
+ }
+ const issues = result.error.issues.map((i) =>
+ `${i.path.join(".")}: ${i.message}`
+ ).join("; ");
+ return `Invalid request: ${issues}`;
+}
+
+const logger = getSwampLogger(["serve", "connection"]);
+
+export interface ConnectionContext {
+ repoDir: string;
+ repoContext: RepositoryContext;
+ datastoreConfig: DatastoreConfig;
+}
+
+export function handleConnection(
+ socket: WebSocket,
+ ctx: ConnectionContext,
+): void {
+ const activeRequests = new Map();
+
+ socket.onmessage = (event) => {
+ handleMessage(socket, ctx, activeRequests, event);
+ };
+
+ socket.onclose = () => {
+ for (const controller of activeRequests.values()) {
+ controller.abort();
+ }
+ activeRequests.clear();
+ };
+
+ socket.onerror = (event) => {
+ logger.warn("WebSocket error: {error}", {
+ error: event instanceof ErrorEvent ? event.message : "unknown",
+ });
+ };
+}
+
+/**
+ * Parse, validate, and dispatch a single incoming WebSocket message.
+ * Exported for unit testing.
+ */
+export function handleMessage(
+ socket: WebSocket,
+ ctx: ConnectionContext,
+ activeRequests: Map,
+ event: MessageEvent,
+): void {
+ let parsed: unknown;
+ try {
+ parsed = JSON.parse(event.data as string);
+ } catch {
+ sendError(socket, "unknown", "invalid_request", "Invalid JSON");
+ return;
+ }
+
+ const validated = validateServerRequest(parsed);
+ if (typeof validated === "string") {
+ sendError(socket, "unknown", "invalid_request", validated);
+ return;
+ }
+
+ const request: ServerRequest = validated;
+
+ if (request.type === "cancel") {
+ const controller = activeRequests.get(request.id);
+ if (controller) {
+ controller.abort();
+ }
+ return;
+ }
+
+ if (activeRequests.has(request.id)) {
+ sendError(
+ socket,
+ request.id,
+ "duplicate_id",
+ `Request id '${request.id}' is already active`,
+ );
+ return;
+ }
+
+ const controller = new AbortController();
+ activeRequests.set(request.id, controller);
+
+ const task = request.type === "workflow.run"
+ ? handleWorkflowRun(
+ socket,
+ ctx,
+ request.id,
+ request.payload,
+ controller,
+ )
+ : handleModelMethodRun(
+ socket,
+ ctx,
+ request.id,
+ request.payload,
+ controller,
+ );
+
+ task.finally(() => activeRequests.delete(request.id));
+}
+
+async function handleWorkflowRun(
+ socket: WebSocket,
+ ctx: ConnectionContext,
+ requestId: string,
+ payload: WorkflowRunPayload,
+ controller: AbortController,
+): Promise {
+ let flushLocks: (() => Promise) | null = null;
+
+ try {
+ // Pre-lookup workflow for per-model lock acquisition
+ const workflowRepo = ctx.repoContext.workflowRepo;
+ const workflow = await workflowRepo.findByName(
+ payload.workflowIdOrName,
+ ) ?? await workflowRepo.findById(
+ createWorkflowId(payload.workflowIdOrName),
+ );
+
+ if (workflow) {
+ const modelRefs = await extractModelReferencesFromWorkflow(
+ workflow,
+ workflowRepo,
+ );
+ if (modelRefs !== null && modelRefs.length > 0) {
+ const resolvedModels: Array<{ modelType: string; modelId: string }> =
+ [];
+ for (const ref of modelRefs) {
+ const result = await findDefinitionByIdOrName(
+ ctx.repoContext.definitionRepo,
+ ref,
+ );
+ if (result) {
+ resolvedModels.push({
+ modelType: result.type.normalized,
+ modelId: result.definition.id,
+ });
+ }
+ }
+ if (resolvedModels.length > 0) {
+ flushLocks = await acquireModelLocks(
+ ctx.datastoreConfig,
+ resolvedModels,
+ ctx.repoDir,
+ );
+ }
+ }
+ }
+
+ const deps = createWorkflowRunDeps(ctx.repoDir, ctx.repoContext);
+ const libCtx = createLibSwampContext({ signal: controller.signal });
+
+ for await (
+ const event of workflowRun(libCtx, deps, {
+ workflowIdOrName: payload.workflowIdOrName,
+ inputs: payload.inputs,
+ lastEvaluated: payload.lastEvaluated,
+ driver: payload.driver,
+ verbose: payload.verbose,
+ runtimeTags: payload.runtimeTags,
+ })
+ ) {
+ if (socket.readyState !== WebSocket.OPEN) break;
+ const serialized = serializeEvent(
+ event as { kind: string; [key: string]: unknown },
+ );
+ send(socket, { type: "event", id: requestId, event: serialized });
+ }
+ } catch (error) {
+ if (error instanceof DOMException && error.name === "AbortError") {
+ sendError(socket, requestId, "cancelled", "Operation was cancelled");
+ } else {
+ const message = error instanceof Error ? error.message : String(error);
+ sendError(socket, requestId, "workflow_execution_failed", message);
+ }
+ } finally {
+ if (flushLocks) {
+ try {
+ await flushLocks();
+ } catch (releaseError) {
+ logger.warn("Failed to release locks: {error}", {
+ error: releaseError instanceof Error
+ ? releaseError.message
+ : String(releaseError),
+ });
+ }
+ }
+ }
+}
+
+async function handleModelMethodRun(
+ socket: WebSocket,
+ ctx: ConnectionContext,
+ requestId: string,
+ payload: ModelMethodRunPayload,
+ controller: AbortController,
+): Promise {
+ let flushLocks: (() => Promise) | null = null;
+
+ try {
+ // Pre-lookup for per-model lock acquisition
+ const preResult = await findDefinitionByIdOrName(
+ ctx.repoContext.definitionRepo,
+ payload.modelIdOrName,
+ );
+ if (preResult) {
+ flushLocks = await acquireModelLocks(
+ ctx.datastoreConfig,
+ [{
+ modelType: preResult.type.normalized,
+ modelId: preResult.definition.id,
+ }],
+ ctx.repoDir,
+ );
+ }
+
+ const deps = createModelMethodRunDeps(ctx.repoDir, ctx.repoContext);
+ const libCtx = createLibSwampContext({ signal: controller.signal });
+
+ for await (
+ const event of modelMethodRun(libCtx, deps, {
+ modelIdOrName: payload.modelIdOrName,
+ methodName: payload.methodName,
+ inputs: payload.inputs ?? {},
+ lastEvaluated: payload.lastEvaluated ?? false,
+ runtimeTags: payload.runtimeTags,
+ driver: payload.driver,
+ })
+ ) {
+ if (socket.readyState !== WebSocket.OPEN) break;
+ const serialized = serializeEvent(
+ event as { kind: string; [key: string]: unknown },
+ );
+ send(socket, { type: "event", id: requestId, event: serialized });
+ }
+ } catch (error) {
+ if (error instanceof DOMException && error.name === "AbortError") {
+ sendError(socket, requestId, "cancelled", "Operation was cancelled");
+ } else {
+ const message = error instanceof Error ? error.message : String(error);
+ sendError(
+ socket,
+ requestId,
+ "method_execution_failed",
+ message,
+ );
+ }
+ } finally {
+ if (flushLocks) {
+ try {
+ await flushLocks();
+ } catch (releaseError) {
+ logger.warn("Failed to release locks: {error}", {
+ error: releaseError instanceof Error
+ ? releaseError.message
+ : String(releaseError),
+ });
+ }
+ }
+ }
+}
+
+function send(socket: WebSocket, message: ServerMessage): void {
+ if (socket.readyState === WebSocket.OPEN) {
+ socket.send(JSON.stringify(message));
+ }
+}
+
+function sendError(
+ socket: WebSocket,
+ id: string,
+ code: string,
+ message: string,
+): void {
+ send(socket, { type: "error", id, error: { code, message } });
+}
diff --git a/src/serve/connection_test.ts b/src/serve/connection_test.ts
new file mode 100644
index 00000000..05cef0f9
--- /dev/null
+++ b/src/serve/connection_test.ts
@@ -0,0 +1,236 @@
+// Swamp, an Automation Framework
+// Copyright (C) 2026 System Initiative, Inc.
+//
+// This file is part of Swamp.
+//
+// Swamp is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License version 3
+// as published by the Free Software Foundation, with the Swamp
+// Extension and Definition Exception (found in the "COPYING-EXCEPTION"
+// file).
+//
+// Swamp is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with Swamp. If not, see .
+
+import { assertEquals } from "@std/assert";
+import { handleMessage, validateServerRequest } from "./connection.ts";
+import type { ConnectionContext } from "./connection.ts";
+import { initializeLogging } from "../infrastructure/logging/logger.ts";
+
+await initializeLogging({});
+
+// ── Mock WebSocket ──────────────────────────────────────────────────────
+
+interface MockSocket {
+ sent: string[];
+ closed: boolean;
+ readyState: number;
+ send(data: string): void;
+ close(): void;
+}
+
+function createMockSocket(): MockSocket {
+ return {
+ sent: [],
+ closed: false,
+ readyState: WebSocket.OPEN,
+ send(data: string) {
+ this.sent.push(data);
+ },
+ close() {
+ this.closed = true;
+ },
+ };
+}
+
+function parseSent(mock: MockSocket, index = 0): Record {
+ return JSON.parse(mock.sent[index]);
+}
+
+// Stub ConnectionContext — handleMessage only needs it for dispatch, and
+// workflow/model handlers won't be reached in validation-level tests.
+const stubCtx = {} as ConnectionContext;
+
+function makeEvent(data: string): MessageEvent {
+ return new MessageEvent("message", { data });
+}
+
+// ── validateServerRequest ───────────────────────────────────────────────
+
+Deno.test("validateServerRequest accepts a valid workflow.run request", () => {
+ const input = {
+ type: "workflow.run",
+ id: "req-1",
+ payload: { workflowIdOrName: "deploy" },
+ };
+ const result = validateServerRequest(input);
+ assertEquals(typeof result, "object");
+});
+
+Deno.test("validateServerRequest accepts a valid model.method.run request", () => {
+ const input = {
+ type: "model.method.run",
+ id: "req-2",
+ payload: { modelIdOrName: "my-model", methodName: "start" },
+ };
+ const result = validateServerRequest(input);
+ assertEquals(typeof result, "object");
+});
+
+Deno.test("validateServerRequest accepts a valid cancel request", () => {
+ const input = { type: "cancel", id: "req-3" };
+ const result = validateServerRequest(input);
+ assertEquals(typeof result, "object");
+});
+
+Deno.test("validateServerRequest rejects unknown type", () => {
+ const input = { type: "unknown.type", id: "req-4" };
+ const result = validateServerRequest(input);
+ assertEquals(typeof result, "string");
+});
+
+Deno.test("validateServerRequest rejects empty id", () => {
+ const input = { type: "cancel", id: "" };
+ const result = validateServerRequest(input);
+ assertEquals(typeof result, "string");
+});
+
+Deno.test("validateServerRequest rejects missing payload for workflow.run", () => {
+ const input = { type: "workflow.run", id: "req-5" };
+ const result = validateServerRequest(input);
+ assertEquals(typeof result, "string");
+});
+
+Deno.test("validateServerRequest rejects missing methodName for model.method.run", () => {
+ const input = {
+ type: "model.method.run",
+ id: "req-6",
+ payload: { modelIdOrName: "m" },
+ };
+ const result = validateServerRequest(input);
+ assertEquals(typeof result, "string");
+});
+
+// ── handleMessage: invalid JSON ─────────────────────────────────────────
+
+Deno.test("handleMessage sends error for invalid JSON", () => {
+ const mock = createMockSocket();
+ const active = new Map();
+
+ handleMessage(
+ mock as unknown as WebSocket,
+ stubCtx,
+ active,
+ makeEvent("not json{{{"),
+ );
+
+ assertEquals(mock.sent.length, 1);
+ const msg = parseSent(mock);
+ assertEquals(msg.type, "error");
+ assertEquals((msg.error as Record).code, "invalid_request");
+});
+
+// ── handleMessage: validation failure ───────────────────────────────────
+
+Deno.test("handleMessage sends error for invalid request shape", () => {
+ const mock = createMockSocket();
+ const active = new Map();
+
+ handleMessage(
+ mock as unknown as WebSocket,
+ stubCtx,
+ active,
+ makeEvent(JSON.stringify({ type: "bad", id: "x" })),
+ );
+
+ assertEquals(mock.sent.length, 1);
+ const msg = parseSent(mock);
+ assertEquals(msg.type, "error");
+ assertEquals((msg.error as Record).code, "invalid_request");
+});
+
+// ── handleMessage: cancel ───────────────────────────────────────────────
+
+Deno.test("handleMessage cancel aborts the matching controller", () => {
+ const mock = createMockSocket();
+ const controller = new AbortController();
+ const active = new Map([["req-10", controller]]);
+
+ handleMessage(
+ mock as unknown as WebSocket,
+ stubCtx,
+ active,
+ makeEvent(JSON.stringify({ type: "cancel", id: "req-10" })),
+ );
+
+ assertEquals(controller.signal.aborted, true);
+ // Cancel does not send a response
+ assertEquals(mock.sent.length, 0);
+});
+
+Deno.test("handleMessage cancel for unknown id is a no-op", () => {
+ const mock = createMockSocket();
+ const active = new Map();
+
+ handleMessage(
+ mock as unknown as WebSocket,
+ stubCtx,
+ active,
+ makeEvent(JSON.stringify({ type: "cancel", id: "nonexistent" })),
+ );
+
+ assertEquals(mock.sent.length, 0);
+});
+
+// ── handleMessage: duplicate request ID ─────────────────────────────────
+
+Deno.test("handleMessage rejects duplicate request ID", () => {
+ const mock = createMockSocket();
+ const active = new Map([
+ ["dup-1", new AbortController()],
+ ]);
+
+ handleMessage(
+ mock as unknown as WebSocket,
+ stubCtx,
+ active,
+ makeEvent(JSON.stringify({
+ type: "workflow.run",
+ id: "dup-1",
+ payload: { workflowIdOrName: "w" },
+ })),
+ );
+
+ assertEquals(mock.sent.length, 1);
+ const msg = parseSent(mock);
+ assertEquals(msg.type, "error");
+ assertEquals((msg.error as Record).code, "duplicate_id");
+});
+
+// ── handleMessage: unknown type not leaked ──────────────────────────────
+
+Deno.test("handleMessage does not leak unknown type value in error", () => {
+ const mock = createMockSocket();
+ const active = new Map();
+
+ handleMessage(
+ mock as unknown as WebSocket,
+ stubCtx,
+ active,
+ makeEvent(JSON.stringify({ type: "secret.op", id: "x" })),
+ );
+
+ assertEquals(mock.sent.length, 1);
+ const msg = parseSent(mock);
+ assertEquals(msg.type, "error");
+ // The error message should NOT contain the actual type value
+ const errorMessage = String(
+ (msg.error as Record).message,
+ );
+ assertEquals(errorMessage.includes("secret.op"), false);
+});
diff --git a/src/serve/deps.ts b/src/serve/deps.ts
new file mode 100644
index 00000000..02fd264c
--- /dev/null
+++ b/src/serve/deps.ts
@@ -0,0 +1,110 @@
+// Swamp, an Automation Framework
+// Copyright (C) 2026 System Initiative, Inc.
+//
+// This file is part of Swamp.
+//
+// Swamp is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License version 3
+// as published by the Free Software Foundation, with the Swamp
+// Extension and Definition Exception (found in the "COPYING-EXCEPTION"
+// file).
+//
+// Swamp is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with Swamp. If not, see .
+
+/**
+ * Factories that construct WorkflowRunDeps and ModelMethodRunDeps from a
+ * RepositoryContext. These mirror the patterns in the CLI commands but are
+ * decoupled from Cliffy options parsing.
+ */
+
+import { join } from "@std/path";
+import type { RepositoryContext } from "../infrastructure/persistence/repository_factory.ts";
+import type { ModelMethodRunDeps, WorkflowRunDeps } from "../libswamp/mod.ts";
+import { WorkflowExecutionService } from "../domain/workflows/execution_service.ts";
+import { createWorkflowId } from "../domain/workflows/workflow_id.ts";
+import { findDefinitionByIdOrName } from "../domain/models/model_lookup.ts";
+import { resolveModelType } from "../domain/extensions/extension_auto_resolver.ts";
+import { getAutoResolver } from "../domain/extensions/auto_resolver_context.ts";
+import { DefaultMethodExecutionService } from "../domain/models/method_execution_service.ts";
+import { VaultService } from "../domain/vaults/vault_service.ts";
+import { ExpressionEvaluationService } from "../domain/expressions/expression_evaluation_service.ts";
+import { runFileSink } from "../infrastructure/logging/logger.ts";
+import {
+ SWAMP_SUBDIRS,
+ swampPath,
+} from "../infrastructure/persistence/paths.ts";
+import { SecretRedactor } from "../domain/secrets/mod.ts";
+
+export function createWorkflowRunDeps(
+ repoDir: string,
+ repoContext: RepositoryContext,
+): WorkflowRunDeps {
+ return {
+ workflowRepo: repoContext.workflowRepo,
+ runRepo: repoContext.workflowRunRepo,
+ repoDir,
+ lookupWorkflow: async (repo, idOrName) => {
+ return await repo.findByName(idOrName) ??
+ await repo.findById(createWorkflowId(idOrName));
+ },
+ createExecutionService: (wfRepo, rnRepo, dir) =>
+ new WorkflowExecutionService(wfRepo, rnRepo, dir),
+ dataRepo: repoContext.unifiedDataRepo,
+ definitionRepo: repoContext.definitionRepo,
+ };
+}
+
+export function createModelMethodRunDeps(
+ repoDir: string,
+ repoContext: RepositoryContext,
+): ModelMethodRunDeps {
+ return {
+ repoDir,
+ lookupDefinition: (idOrName) =>
+ findDefinitionByIdOrName(repoContext.definitionRepo, idOrName),
+ getModelDef: (type) => resolveModelType(type, getAutoResolver()),
+ createEvaluationService: () =>
+ new ExpressionEvaluationService(
+ repoContext.definitionRepo,
+ repoDir,
+ { dataRepo: repoContext.unifiedDataRepo },
+ ),
+ loadEvaluatedDefinition: (type, name) =>
+ repoContext.evaluatedDefinitionRepo.findByName(type, name),
+ saveEvaluatedDefinition: (type, definition) =>
+ repoContext.evaluatedDefinitionRepo.save(type, definition),
+ createExecutionService: () => new DefaultMethodExecutionService(),
+ createVaultService: () => VaultService.fromRepository(repoDir),
+ dataRepo: repoContext.unifiedDataRepo,
+ definitionRepo: repoContext.definitionRepo,
+ outputRepo: repoContext.outputRepo,
+ createRunLog: async (modelType, method, definitionId) => {
+ const redactor = new SecretRedactor();
+ const timestamp = new Date().toISOString().replace(/[:.]/g, "-");
+ const logFilePath = join(
+ swampPath(repoDir, SWAMP_SUBDIRS.outputs),
+ modelType.normalized,
+ method,
+ `${definitionId}-${timestamp}.log`,
+ );
+ const logCategory: string[] = [];
+ await runFileSink.register(
+ logCategory,
+ logFilePath,
+ redactor,
+ swampPath(repoDir),
+ );
+ return {
+ logFilePath,
+ redactor,
+ cleanup: () => runFileSink.unregister(logCategory),
+ };
+ },
+ };
+}
diff --git a/src/serve/mod.ts b/src/serve/mod.ts
new file mode 100644
index 00000000..97115df7
--- /dev/null
+++ b/src/serve/mod.ts
@@ -0,0 +1,30 @@
+// Swamp, an Automation Framework
+// Copyright (C) 2026 System Initiative, Inc.
+//
+// This file is part of Swamp.
+//
+// Swamp is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License version 3
+// as published by the Free Software Foundation, with the Swamp
+// Extension and Definition Exception (found in the "COPYING-EXCEPTION"
+// file).
+//
+// Swamp is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with Swamp. If not, see .
+
+export type {
+ ModelMethodRunPayload,
+ SerializedError,
+ SerializedEvent,
+ ServerMessage,
+ ServerRequest,
+ WorkflowRunPayload,
+} from "./protocol.ts";
+export { serializeEvent, serializeSwampError } from "./serializer.ts";
+export { type ConnectionContext, handleConnection } from "./connection.ts";
+export { createModelMethodRunDeps, createWorkflowRunDeps } from "./deps.ts";
diff --git a/src/serve/protocol.ts b/src/serve/protocol.ts
new file mode 100644
index 00000000..8f7fd3b9
--- /dev/null
+++ b/src/serve/protocol.ts
@@ -0,0 +1,70 @@
+// Swamp, an Automation Framework
+// Copyright (C) 2026 System Initiative, Inc.
+//
+// This file is part of Swamp.
+//
+// Swamp is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License version 3
+// as published by the Free Software Foundation, with the Swamp
+// Extension and Definition Exception (found in the "COPYING-EXCEPTION"
+// file).
+//
+// Swamp is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with Swamp. If not, see .
+
+/**
+ * Wire protocol types for the swamp WebSocket API.
+ *
+ * Clients send ServerRequest messages, the server streams back ServerMessage
+ * events. Each request carries a client-assigned `id` that the server echoes
+ * on every response, enabling multiplexed operations on a single socket.
+ */
+
+// ── Inbound (client → server) ────────────────────────────────────────────
+
+export interface WorkflowRunPayload {
+ workflowIdOrName: string;
+ inputs?: Record;
+ lastEvaluated?: boolean;
+ driver?: string;
+ verbose?: boolean;
+ runtimeTags?: Record;
+}
+
+export interface ModelMethodRunPayload {
+ modelIdOrName: string;
+ methodName: string;
+ inputs?: Record;
+ lastEvaluated?: boolean;
+ driver?: string;
+ runtimeTags?: Record;
+}
+
+export type ServerRequest =
+ | { type: "workflow.run"; id: string; payload: WorkflowRunPayload }
+ | { type: "model.method.run"; id: string; payload: ModelMethodRunPayload }
+ | { type: "cancel"; id: string };
+
+// ── Outbound (server → client) ───────────────────────────────────────────
+
+/** A JSON-safe event from a libswamp async generator. */
+export interface SerializedEvent {
+ kind: string;
+ [key: string]: unknown;
+}
+
+/** Error details sent to the client. */
+export interface SerializedError {
+ code: string;
+ message: string;
+ details?: unknown;
+}
+
+export type ServerMessage =
+ | { type: "event"; id: string; event: SerializedEvent }
+ | { type: "error"; id: string; error: SerializedError };
diff --git a/src/serve/protocol_test.ts b/src/serve/protocol_test.ts
new file mode 100644
index 00000000..4a4c554b
--- /dev/null
+++ b/src/serve/protocol_test.ts
@@ -0,0 +1,208 @@
+// Swamp, an Automation Framework
+// Copyright (C) 2026 System Initiative, Inc.
+//
+// This file is part of Swamp.
+//
+// Swamp is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License version 3
+// as published by the Free Software Foundation, with the Swamp
+// Extension and Definition Exception (found in the "COPYING-EXCEPTION"
+// file).
+//
+// Swamp is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with Swamp. If not, see .
+
+import { assertEquals } from "@std/assert";
+import type {
+ ModelMethodRunPayload,
+ SerializedError,
+ SerializedEvent,
+ ServerMessage,
+ ServerRequest,
+ WorkflowRunPayload,
+} from "./protocol.ts";
+
+// ── WorkflowRunPayload ──────────────────────────────────────────────────
+
+Deno.test("WorkflowRunPayload - minimal payload", () => {
+ const payload: WorkflowRunPayload = {
+ workflowIdOrName: "deploy",
+ };
+ assertEquals(payload.workflowIdOrName, "deploy");
+ assertEquals(payload.inputs, undefined);
+ assertEquals(payload.verbose, undefined);
+});
+
+Deno.test("WorkflowRunPayload - full payload", () => {
+ const payload: WorkflowRunPayload = {
+ workflowIdOrName: "deploy",
+ inputs: { env: "prod" },
+ lastEvaluated: true,
+ driver: "docker",
+ verbose: true,
+ runtimeTags: { region: "us-east-1" },
+ };
+ assertEquals(payload.workflowIdOrName, "deploy");
+ assertEquals(payload.inputs, { env: "prod" });
+ assertEquals(payload.lastEvaluated, true);
+ assertEquals(payload.driver, "docker");
+ assertEquals(payload.verbose, true);
+ assertEquals(payload.runtimeTags, { region: "us-east-1" });
+});
+
+// ── ModelMethodRunPayload ───────────────────────────────────────────────
+
+Deno.test("ModelMethodRunPayload - minimal payload", () => {
+ const payload: ModelMethodRunPayload = {
+ modelIdOrName: "my-server",
+ methodName: "start",
+ };
+ assertEquals(payload.modelIdOrName, "my-server");
+ assertEquals(payload.methodName, "start");
+});
+
+Deno.test("ModelMethodRunPayload - full payload", () => {
+ const payload: ModelMethodRunPayload = {
+ modelIdOrName: "my-server",
+ methodName: "start",
+ inputs: { force: true },
+ lastEvaluated: false,
+ driver: "shell",
+ runtimeTags: { tier: "staging" },
+ };
+ assertEquals(payload.inputs, { force: true });
+ assertEquals(payload.driver, "shell");
+});
+
+// ── ServerRequest discriminated union ───────────────────────────────────
+
+Deno.test("ServerRequest - workflow.run variant", () => {
+ const req: ServerRequest = {
+ type: "workflow.run",
+ id: "req-1",
+ payload: { workflowIdOrName: "deploy" },
+ };
+ assertEquals(req.type, "workflow.run");
+ assertEquals(req.id, "req-1");
+ if (req.type === "workflow.run") {
+ assertEquals(req.payload.workflowIdOrName, "deploy");
+ }
+});
+
+Deno.test("ServerRequest - model.method.run variant", () => {
+ const req: ServerRequest = {
+ type: "model.method.run",
+ id: "req-2",
+ payload: { modelIdOrName: "db", methodName: "migrate" },
+ };
+ assertEquals(req.type, "model.method.run");
+ if (req.type === "model.method.run") {
+ assertEquals(req.payload.modelIdOrName, "db");
+ assertEquals(req.payload.methodName, "migrate");
+ }
+});
+
+Deno.test("ServerRequest - cancel variant", () => {
+ const req: ServerRequest = {
+ type: "cancel",
+ id: "req-3",
+ };
+ assertEquals(req.type, "cancel");
+ assertEquals(req.id, "req-3");
+});
+
+// ── SerializedEvent / SerializedError ───────────────────────────────────
+
+Deno.test("SerializedEvent - allows arbitrary extra properties", () => {
+ const event: SerializedEvent = {
+ kind: "step_completed",
+ jobId: "j1",
+ stepId: "s1",
+ };
+ assertEquals(event.kind, "step_completed");
+ assertEquals(event.jobId, "j1");
+});
+
+Deno.test("SerializedError - with and without details", () => {
+ const withDetails: SerializedError = {
+ code: "validation_error",
+ message: "Bad input",
+ details: { field: "name" },
+ };
+ assertEquals(withDetails.details, { field: "name" });
+
+ const withoutDetails: SerializedError = {
+ code: "not_found",
+ message: "Not found",
+ };
+ assertEquals(withoutDetails.details, undefined);
+});
+
+// ── ServerMessage discriminated union ───────────────────────────────────
+
+Deno.test("ServerMessage - event variant", () => {
+ const msg: ServerMessage = {
+ type: "event",
+ id: "req-1",
+ event: { kind: "started", runId: "r1" },
+ };
+ assertEquals(msg.type, "event");
+ if (msg.type === "event") {
+ assertEquals(msg.event.kind, "started");
+ }
+});
+
+Deno.test("ServerMessage - error variant", () => {
+ const msg: ServerMessage = {
+ type: "error",
+ id: "req-1",
+ error: { code: "internal", message: "Server error" },
+ };
+ assertEquals(msg.type, "error");
+ if (msg.type === "error") {
+ assertEquals(msg.error.code, "internal");
+ }
+});
+
+Deno.test("ServerMessage - discriminated union narrows correctly", () => {
+ const messages: ServerMessage[] = [
+ {
+ type: "event",
+ id: "1",
+ event: { kind: "completed", run: {} },
+ },
+ {
+ type: "error",
+ id: "2",
+ error: { code: "cancelled", message: "Cancelled" },
+ },
+ ];
+
+ const types = messages.map((m) => m.type);
+ assertEquals(types, ["event", "error"]);
+});
+
+Deno.test("ServerRequest - round-trips through JSON", () => {
+ const req: ServerRequest = {
+ type: "workflow.run",
+ id: "req-42",
+ payload: {
+ workflowIdOrName: "build",
+ inputs: { version: "1.0.0" },
+ verbose: true,
+ },
+ };
+ const json = JSON.stringify(req);
+ const parsed = JSON.parse(json) as ServerRequest;
+ assertEquals(parsed.type, "workflow.run");
+ assertEquals(parsed.id, "req-42");
+ if (parsed.type === "workflow.run") {
+ assertEquals(parsed.payload.workflowIdOrName, "build");
+ assertEquals(parsed.payload.inputs, { version: "1.0.0" });
+ }
+});
diff --git a/src/serve/serializer.ts b/src/serve/serializer.ts
new file mode 100644
index 00000000..d819dc6e
--- /dev/null
+++ b/src/serve/serializer.ts
@@ -0,0 +1,75 @@
+// Swamp, an Automation Framework
+// Copyright (C) 2026 System Initiative, Inc.
+//
+// This file is part of Swamp.
+//
+// Swamp is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License version 3
+// as published by the Free Software Foundation, with the Swamp
+// Extension and Definition Exception (found in the "COPYING-EXCEPTION"
+// file).
+//
+// Swamp is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with Swamp. If not, see .
+
+/**
+ * Converts libswamp events to JSON-safe objects for WebSocket transmission.
+ */
+
+import type { SwampError } from "../libswamp/mod.ts";
+import type { SerializedError, SerializedEvent } from "./protocol.ts";
+
+/**
+ * Serializes a libswamp event (WorkflowRunEvent or ModelMethodRunEvent) into
+ * a JSON-safe object. Handles non-serializable values like Error instances.
+ */
+export function serializeEvent(
+ event: { kind: string; [key: string]: unknown },
+): SerializedEvent {
+ if (event.kind === "error") {
+ const swampError = event.error as SwampError;
+ return {
+ kind: "error",
+ error: serializeSwampError(swampError),
+ };
+ }
+
+ // For all other events, do a JSON-safe clone that handles Error instances
+ return jsonSafeClone(event) as SerializedEvent;
+}
+
+/**
+ * Serializes a SwampError into a JSON-safe error object.
+ */
+export function serializeSwampError(error: SwampError): SerializedError {
+ return {
+ code: error.code,
+ message: error.message,
+ ...(error.details !== undefined && { details: error.details }),
+ };
+}
+
+/**
+ * Deep-clones an object, converting Error instances to plain objects.
+ */
+function jsonSafeClone(value: unknown): unknown {
+ if (value === null || value === undefined) return value;
+ if (value instanceof Error) {
+ return { message: value.message, stack: value.stack };
+ }
+ if (value instanceof Date) return value.toISOString();
+ if (Array.isArray(value)) return value.map(jsonSafeClone);
+ if (typeof value === "object") {
+ const result: Record = {};
+ for (const [k, v] of Object.entries(value)) {
+ result[k] = jsonSafeClone(v);
+ }
+ return result;
+ }
+ return value;
+}
diff --git a/src/serve/serializer_test.ts b/src/serve/serializer_test.ts
new file mode 100644
index 00000000..22f27386
--- /dev/null
+++ b/src/serve/serializer_test.ts
@@ -0,0 +1,195 @@
+// Swamp, an Automation Framework
+// Copyright (C) 2026 System Initiative, Inc.
+//
+// This file is part of Swamp.
+//
+// Swamp is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License version 3
+// as published by the Free Software Foundation, with the Swamp
+// Extension and Definition Exception (found in the "COPYING-EXCEPTION"
+// file).
+//
+// Swamp is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with Swamp. If not, see .
+
+import { assertEquals, assertExists } from "@std/assert";
+import { serializeEvent, serializeSwampError } from "./serializer.ts";
+import type { SwampError } from "../libswamp/mod.ts";
+
+// ── serializeSwampError ─────────────────────────────────────────────────
+
+Deno.test("serializeSwampError - basic error with code and message", () => {
+ const err: SwampError = {
+ code: "not_found",
+ message: "Model not found",
+ };
+ const result = serializeSwampError(err);
+ assertEquals(result, { code: "not_found", message: "Model not found" });
+});
+
+Deno.test("serializeSwampError - includes details when present", () => {
+ const err: SwampError = {
+ code: "validation_error",
+ message: "Invalid inputs",
+ details: { field: "name", reason: "required" },
+ };
+ const result = serializeSwampError(err);
+ assertEquals(result, {
+ code: "validation_error",
+ message: "Invalid inputs",
+ details: { field: "name", reason: "required" },
+ });
+});
+
+Deno.test("serializeSwampError - omits details when undefined", () => {
+ const err: SwampError = {
+ code: "cancelled",
+ message: "Operation cancelled",
+ details: undefined,
+ };
+ const result = serializeSwampError(err);
+ assertEquals(result, { code: "cancelled", message: "Operation cancelled" });
+ assertEquals("details" in result, false);
+});
+
+Deno.test("serializeSwampError - does not include cause (non-serializable)", () => {
+ const err: SwampError = {
+ code: "network",
+ message: "Connection refused",
+ cause: new Error("ECONNREFUSED"),
+ };
+ const result = serializeSwampError(err);
+ assertEquals(result, { code: "network", message: "Connection refused" });
+ assertEquals("cause" in result, false);
+});
+
+// ── serializeEvent with error kind ──────────────────────────────────────
+
+Deno.test("serializeEvent - error event serializes the SwampError", () => {
+ const swampError: SwampError = {
+ code: "not_authenticated",
+ message: "Not authenticated",
+ };
+ const result = serializeEvent({ kind: "error", error: swampError });
+ assertEquals(result, {
+ kind: "error",
+ error: { code: "not_authenticated", message: "Not authenticated" },
+ });
+});
+
+Deno.test("serializeEvent - error event with details", () => {
+ const swampError: SwampError = {
+ code: "validation_error",
+ message: "Bad input",
+ details: { missing: ["name"] },
+ };
+ const result = serializeEvent({ kind: "error", error: swampError });
+ assertEquals(result, {
+ kind: "error",
+ error: {
+ code: "validation_error",
+ message: "Bad input",
+ details: { missing: ["name"] },
+ },
+ });
+});
+
+// ── serializeEvent with non-error kinds (jsonSafeClone) ─────────────────
+
+Deno.test("serializeEvent - simple event passes through", () => {
+ const event = { kind: "started", runId: "abc-123", workflowName: "deploy" };
+ const result = serializeEvent(event);
+ assertEquals(result, {
+ kind: "started",
+ runId: "abc-123",
+ workflowName: "deploy",
+ });
+});
+
+Deno.test("serializeEvent - preserves null and undefined values", () => {
+ const event = { kind: "test", a: null, b: undefined };
+ const result = serializeEvent(event);
+ assertEquals((result as Record).a, null);
+ assertEquals((result as Record).b, undefined);
+});
+
+Deno.test("serializeEvent - converts Date to ISO string", () => {
+ const date = new Date("2026-03-27T12:00:00.000Z");
+ const event = { kind: "test", timestamp: date };
+ const result = serializeEvent(event);
+ assertEquals(
+ (result as Record).timestamp,
+ "2026-03-27T12:00:00.000Z",
+ );
+});
+
+Deno.test("serializeEvent - converts Error instances to plain objects", () => {
+ const error = new Error("something broke");
+ const event = { kind: "test", nested: { err: error } };
+ const result = serializeEvent(event);
+ const nested = (result as Record).nested as Record<
+ string,
+ unknown
+ >;
+ assertEquals(nested.err !== null && typeof nested.err === "object", true);
+ const errObj = nested.err as Record;
+ assertEquals(errObj.message, "something broke");
+ assertExists(errObj.stack);
+});
+
+Deno.test("serializeEvent - handles arrays", () => {
+ const event = { kind: "test", items: [1, "two", null] };
+ const result = serializeEvent(event);
+ assertEquals((result as Record).items, [1, "two", null]);
+});
+
+Deno.test("serializeEvent - handles nested objects", () => {
+ const event = {
+ kind: "completed",
+ run: {
+ id: "r1",
+ status: "succeeded",
+ jobs: [{ name: "build", steps: [] }],
+ },
+ };
+ const result = serializeEvent(event);
+ assertEquals(result, event);
+});
+
+Deno.test("serializeEvent - handles array with mixed types including Date and Error", () => {
+ const date = new Date("2026-01-01T00:00:00.000Z");
+ const error = new Error("fail");
+ const event = { kind: "test", mixed: [date, error, 42, "ok"] };
+ const result = serializeEvent(event);
+ const mixed = (result as Record).mixed as unknown[];
+ assertEquals(mixed[0], "2026-01-01T00:00:00.000Z");
+ assertEquals((mixed[1] as Record).message, "fail");
+ assertEquals(mixed[2], 42);
+ assertEquals(mixed[3], "ok");
+});
+
+Deno.test("serializeEvent - primitives pass through unchanged", () => {
+ const event = {
+ kind: "test",
+ num: 42,
+ str: "hello",
+ bool: true,
+ };
+ const result = serializeEvent(event);
+ assertEquals(result, event);
+});
+
+Deno.test("serializeEvent - deep clone does not mutate original", () => {
+ const inner = { value: "original" };
+ const event = { kind: "test", data: inner };
+ const result = serializeEvent(event);
+ (
+ (result as Record).data as Record
+ ).value = "modified";
+ assertEquals(inner.value, "original");
+});