diff --git a/CLAUDE.md b/CLAUDE.md index ab961ee..3482438 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -15,7 +15,7 @@ Graph-based workflow orchestrator with unified state model for NullClaw AI bot a | File | Role | |------|------| | `main.zig` | CLI args (`--port`, `--db`, `--config`, `--version`, `--export-manifest`, `--from-json`), HTTP accept loop, engine thread, tracker thread | -| `api.zig` | REST API routing and 30+ endpoint handlers (runs, workers, workflows, checkpoints, state, SSE stream, tracker) | +| `api.zig` | REST API routing and 30+ endpoint handlers (runs, workers, workflows, checkpoints, state, stream snapshots, tracker) | | `store.zig` | SQLite layer, CRUD methods for all tables, schema migrations (4 migration files) | | `engine.zig` | Graph-based state scheduler: tick loop, 7 node type handlers, checkpoints, reducers, goto, breakpoints, deferred nodes, reconciliation | | `state.zig` | Unified state model: 7 reducer types (last_value, append, merge, add, min, max, add_messages), overwrite bypass, ephemeral keys, state path resolution | @@ -46,7 +46,7 @@ Graph-based workflow orchestrator with unified state model for NullClaw AI bot a ```sh zig build # build -zig build test # unit tests (320 tests) +zig build test # unit tests (355 tests) zig build && bash tests/test_e2e.sh # e2e tests (requires Python 3 for mock workers) ./zig-out/bin/nullboiler --port 8080 --db nullboiler.db --config config.json ``` @@ -68,8 +68,8 @@ zig build && bash tests/test_e2e.sh # e2e tests (requires Python 3 for mock wo | POST | `/workers` | Register worker | | GET | `/workers` | List workers | | DELETE | `/workers/{id}` | Remove worker | -| POST | `/runs` | Create workflow run (legacy step-array or graph format) | -| GET | `/runs` | List runs (supports ?status= filter) | +| POST | `/runs` | Create workflow run from legacy step-array format | +| GET | `/runs` | List runs (supports ?status= and ?workflow_id= filters) | | GET | `/runs/{id}` | Get run details | | POST | 
`/runs/{id}/cancel` | Cancel run | | POST | `/runs/{id}/retry` | Retry failed run | @@ -82,7 +82,7 @@ zig build && bash tests/test_e2e.sh # e2e tests (requires Python 3 for mock wo | GET | `/runs/{id}/events` | List run events | | GET | `/runs/{id}/checkpoints` | List checkpoints for run | | GET | `/runs/{id}/checkpoints/{cpId}` | Get checkpoint details | -| GET | `/runs/{id}/stream` | SSE stream (supports ?mode=values\|updates\|tasks\|debug) | +| GET | `/runs/{id}/stream` | JSON stream snapshot (supports ?mode=values\|updates\|tasks\|debug\|custom and ?after_seq=) | | POST | `/workflows` | Create workflow definition | | GET | `/workflows` | List workflow definitions | | GET | `/workflows/{id}` | Get workflow definition | @@ -136,9 +136,12 @@ MQTT listener: (conditional, for async MQTT workers) Redis listener: (conditional, for async Redis workers) ``` -### SSE Streaming +### Stream Snapshots -5 modes for real-time consumption via `GET /runs/{id}/stream?mode=X`: +`GET /runs/{id}/stream` currently returns a JSON snapshot containing persisted +events and buffered in-memory stream events. It supports `mode=X` filtering and +`after_seq=N` cursors for independent consumers. The internal stream hub uses 5 +modes: - `values` -- full state after each step - `updates` -- node name + partial state updates - `tasks` -- task start/finish with metadata diff --git a/docs/api/README.md b/docs/api/README.md new file mode 100644 index 0000000..39323fd --- /dev/null +++ b/docs/api/README.md @@ -0,0 +1,134 @@ +# NullBoiler HTTP API + +This directory hosts the HTTP API contract for NullBoiler. + +- **Source of truth:** [`docs/openapi.yaml`](../openapi.yaml) — OpenAPI 3.1 +- **Maintainer roadmap reference:** [`reference/todo.md` P2-04](../../reference/todo.md) + +The spec covers all 36 HTTP operations exposed by `src/api.zig` and all +domain types from `src/types.zig`. 
+ +## At a glance + +| Group | Endpoints | +|---|---| +| Health, Metrics | `GET /health`, `GET /metrics` | +| Runs | `POST /runs`, `GET /runs`, `GET /runs/{id}`, `POST /runs/{id}/{cancel,retry,resume,replay,state}`, `POST /runs/fork` | +| Steps & Events | `GET /runs/{id}/steps`, `GET /runs/{id}/steps/{step_id}`, `GET /runs/{id}/events`, `GET /runs/{id}/stream` (JSON stream snapshot) | +| Checkpoints | `GET /runs/{id}/checkpoints`, `GET /runs/{id}/checkpoints/{cp_id}` | +| Workers | `POST /workers`, `GET /workers`, `DELETE /workers/{id}` | +| Workflows | full CRUD on `/workflows`, plus `validate`, `mermaid`, `run` | +| Tracker bridge | `GET /tracker/{status,tasks,stats,tasks/{id}}`, `POST /tracker/refresh` | +| Admin | `POST /admin/drain`, `GET /rate-limits` | +| Internal | `POST /internal/agent-events/{run_id}/{step_id}` (worker callback) | + +## Quick start + +### View the spec + +```bash +# Redoc (no install — uses npx) +npx @redocly/cli preview-docs docs/openapi.yaml + +# Swagger UI (Docker) +docker run --rm -p 8088:8080 \ + -e SWAGGER_JSON=/spec/openapi.yaml \ + -v "$(pwd)/docs:/spec" \ + swaggerapi/swagger-ui +# then open http://localhost:8088 +``` + +### Validate locally + +```bash +# Python +python -m pip install openapi-spec-validator +python -m openapi_spec_validator docs/openapi.yaml + +# Node +npx @apidevtools/swagger-cli validate docs/openapi.yaml + +# Redocly (also runs lint rules beyond bare OpenAPI) +npx @redocly/cli lint docs/openapi.yaml +``` + +### Generate client SDKs + +The spec is suitable for `openapi-generator-cli`. 
Recommended targets and +generators: + +```bash +# TypeScript (fetch-based, browser & node) +npx @openapitools/openapi-generator-cli generate \ + -i docs/openapi.yaml \ + -g typescript-fetch \ + -o sdks/typescript-fetch \ + --additional-properties=npmName=@nullboiler/client,supportsES6=true,typescriptThreePlus=true + +# Python (httpx async + sync) +npx @openapitools/openapi-generator-cli generate \ + -i docs/openapi.yaml \ + -g python \ + -o sdks/python \ + --additional-properties=packageName=nullboiler_client,projectName=nullboiler-client + +# Go +npx @openapitools/openapi-generator-cli generate \ + -i docs/openapi.yaml \ + -g go \ + -o sdks/go \ + --additional-properties=packageName=nullboiler,withGoMod=true +``` + +For first-class language coverage we recommend publishing each SDK from +its own repository (e.g. `nullboiler/nullboiler-ts-sdk`) and pinning a +spec version per release tag. + +## Conventions + +- **Ids** — opaque strings; do not parse (the `GET /runs` example shows a UUID-style id, but + the id format is not part of the contract). +- **Timestamps** — `*_ms` fields are milliseconds since the Unix epoch + (UTC), `int64`. +- **Errors** — every 4xx/5xx response uses the same envelope: + ```json + {"error": {"code": "bad_request", "message": "invalid JSON body"}} + ``` + See `ErrorDetail.code` for the closed enum of codes. +- **Idempotency** — `POST /runs` honors `Idempotency-Key` (preferred) or + `idempotency_key` body field. Stored-workflow launches via + `POST /workflows/{id}/run` do not currently implement idempotency. +- **Auth** — bearer token; `/health` and `/metrics` are public so that + load balancers and Prometheus scrapers can reach them without + provisioning a token. + +## Versioning the spec + +The spec carries the same `info.version` as `GET /health` returns. When +the API surface changes: + +1. Update `src/api.zig` and the matching tests. +2. Update `docs/openapi.yaml` and bump `info.version` in lockstep with + the next NullBoiler release. +3. 
Re-run `python -m openapi_spec_validator docs/openapi.yaml` (or the + Node equivalent) before committing. +4. Regenerate any vendored SDKs you ship. + +A future enhancement (P2-03 in `reference/todo.md`) is to validate the +spec against a running orchestrator in CI by hitting every endpoint with +a smoke client. + +## Provenance + +This spec was authored from the source of truth files on the `main` +branch: + +- `src/api.zig` — route table (`handleRequest`) and per-handler bodies +- `src/types.zig` — all enums and DB row types +- `src/strategy.zig` — strategy expansion semantics +- `src/workflow_validation.zig` and `src/engine.zig` — graph workflow shape + and validation rules +- `src/metrics.zig` — Prometheus exposition (used in `/metrics` example) + +If you change one of those files, update this spec. CI does not yet +diff them, so the discipline is currently social. diff --git a/docs/openapi.yaml b/docs/openapi.yaml new file mode 100644 index 0000000..75a9b3e --- /dev/null +++ b/docs/openapi.yaml @@ -0,0 +1,1903 @@ +openapi: 3.1.0 +info: + title: NullBoiler Orchestrator API + summary: Workflow orchestration HTTP API for the Null ecosystem. + description: | + NullBoiler is the orchestration engine of the Null ecosystem. It pulls work + from a tracker (or accepts ad-hoc runs), applies scheduling/routing + strategies, and dispatches steps to compatible worker runtimes (NullClaw, + OpenClaw-compatible, ZeroClaw, PicoClaw bridges). + + This document is the source of truth for the HTTP surface used by: + + - dashboards and operators talking to a single NullBoiler instance, + - automation/SDKs driving runs and workflows, + - integration tests that need to assert request/response shapes, + - generated client SDKs (TypeScript, Python, Go) — see + `docs/api/README.md` for the generator command. + + The spec was reverse-engineered from `src/api.zig` and `src/types.zig` + on the NullBoiler `main` branch and tracks the same version reported by + `GET /health`. 
+ + ## Endpoint availability + + A few endpoints are conditional on runtime configuration: + + - **`/tracker/*`** — only mounted when `tracker` is configured in + `nullboiler.config.json`. Otherwise the orchestrator runs in + tracker-less mode and these paths return `404`. + + ## Authentication + + All endpoints except `GET /health` and `GET /metrics` require a bearer + token when the orchestrator is started with `--api-token` or + `NULLBOILER_API_TOKEN`. When no token is configured, the API is open. + + ## Idempotency + + `POST /runs` accepts an idempotency key either as the + `Idempotency-Key` request header (preferred) or as `idempotency_key` + in the body. Replays of the same payload return `200` with + `idempotent_replay: true`. Replays with a different payload return + `409 conflict`. + + ## Drain mode + + `POST /admin/drain` puts the orchestrator into drain mode. While + draining, `POST /runs` returns `503` and existing runs continue to + completion. Drain mode is process-local and resets on restart. + version: "2026.3.2" + license: + name: MIT + identifier: MIT + contact: + name: NullBoiler maintainers + url: https://github.com/nullclaw/nullboiler + +servers: + - url: http://localhost:8080 + description: Local development (default port from docker-compose) + - url: http://localhost:3000 + description: Local development (default Dockerfile CMD port) + - url: https://{host} + description: Custom deployment + variables: + host: + default: nullboiler.example.com + +tags: + - name: Health + description: Liveness, version, and aggregate health probes. + - name: Metrics + description: Prometheus-format scrape endpoint for `/metrics`. + - name: Runs + description: Workflow run lifecycle — create, inspect, cancel, retry, replay, fork. + - name: Steps + description: Individual step inspection within a run. + - name: Events + description: Run-scoped event feed and stream snapshots. 
+ - name: Checkpoints + description: Run state snapshots used for resume, fork, and replay. + - name: Workers + description: Worker registry — register, list, delete worker runtimes. + - name: Workflows + description: Reusable workflow definitions (CRUD + validate + mermaid + run). + - name: Tracker + description: Bridge to NullTickets when running in tracker-driven mode. + - name: Admin + description: Operational controls — drain mode, rate limit inspection. + - name: Internal + description: Endpoints used by worker runtimes — not for external consumers. + +security: + - bearerAuth: [] + +paths: + /health: + get: + tags: [Health] + summary: Health probe + description: Public health probe. Always returns 200 with current version and counters. + operationId: getHealth + security: [] + responses: + '200': + description: Health snapshot + content: + application/json: + schema: + $ref: '#/components/schemas/HealthResponse' + examples: + ok: + value: + status: ok + version: "2026.3.2" + active_runs: 3 + total_workers: 2 + + /metrics: + get: + tags: [Metrics] + summary: Prometheus metrics + description: | + Prometheus exposition format. Public (no auth) so that scrapers can + reach it without provisioning a token. + operationId: getMetrics + security: [] + responses: + '200': + description: Metrics in text exposition format + content: + text/plain: + schema: + type: string + example: | + # TYPE nullboiler_http_requests_total counter + nullboiler_http_requests_total 1234 + # TYPE nullboiler_runs_created_total counter + nullboiler_runs_created_total 42 + # ... 9 more counters + + /admin/drain: + post: + tags: [Admin] + summary: Enter drain mode + description: | + Refuses new runs. Existing runs continue. Drain mode is process-local + and resets on restart. There is no inverse endpoint — restart the + orchestrator to leave drain mode. 
+ operationId: enableDrain + responses: + '200': + description: Drain mode enabled + content: + application/json: + schema: + type: object + required: [status] + properties: + status: + type: string + enum: [draining] + '500': + $ref: '#/components/responses/InternalError' + + /rate-limits: + get: + tags: [Admin] + summary: Inspect per-worker rate limit state + description: | + Returns rate limit state per worker_id. Empty array when no rate + limits are configured or none have been observed yet. Useful for + investigating dispatch backpressure. + operationId: listRateLimits + responses: + '200': + description: Array of per-worker rate limit snapshots + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/RateLimitEntry' + + /workers: + get: + tags: [Workers] + summary: List registered workers + operationId: listWorkers + responses: + '200': + description: Array of workers + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/Worker' + '500': + $ref: '#/components/responses/InternalError' + post: + tags: [Workers] + summary: Register a worker + description: | + Registers a worker with the orchestrator. Workers can also be + configured statically in `config.json` — those have `source: config`. + Registered workers have `source: registered`. 
+ operationId: registerWorker + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/WorkerRegistration' + responses: + '201': + description: Worker registered + content: + application/json: + schema: + type: object + required: [id, status, protocol] + properties: + id: + type: string + status: + type: string + enum: [active] + protocol: + $ref: '#/components/schemas/WorkerProtocol' + '400': + $ref: '#/components/responses/BadRequest' + '409': + description: Worker id already exists + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '500': + $ref: '#/components/responses/InternalError' + + /workers/{id}: + delete: + tags: [Workers] + summary: Remove a worker + operationId: deleteWorker + parameters: + - $ref: '#/components/parameters/WorkerId' + responses: + '200': + description: Worker removed + content: + application/json: + schema: + type: object + required: [ok] + properties: + ok: + type: boolean + enum: [true] + '500': + $ref: '#/components/responses/InternalError' + + /runs: + post: + tags: [Runs] + summary: Create a workflow run + description: | + Creates a new run from an inline workflow definition. Use + `/workflows/{id}/run` to launch a stored workflow instead. + + Idempotency: pass `Idempotency-Key` header (preferred) or + `idempotency_key` body field. Replays of the exact same payload + return `200` with `idempotent_replay: true`. Different payloads + with the same key return `409 conflict`. 
+ operationId: createRun + parameters: + - name: Idempotency-Key + in: header + required: false + schema: + type: string + minLength: 1 + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/RunCreate' + responses: + '200': + description: Idempotent replay of an existing run + content: + application/json: + schema: + $ref: '#/components/schemas/RunCreated' + examples: + replay: + value: + id: r_abc123 + status: completed + idempotent_replay: true + '201': + description: Run created + content: + application/json: + schema: + $ref: '#/components/schemas/RunCreated' + examples: + created: + value: + id: r_abc123 + status: running + idempotent_replay: false + '400': + $ref: '#/components/responses/BadRequest' + '409': + description: Idempotency conflict — same key, different payload + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + '503': + description: Orchestrator is in drain mode + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' + examples: + draining: + value: + error: + code: draining + message: orchestrator is draining and does not accept new runs + '500': + $ref: '#/components/responses/InternalError' + get: + tags: [Runs] + summary: List runs + description: | + Returns a paginated wrapper. Items are the condensed run shape + (id, status, optional idempotency_key, optional workflow_id, + timestamps) — use `GET /runs/{id}` for the full detail with + embedded steps, workflow snapshot, input, callbacks, and state. + operationId: listRuns + parameters: + - name: status + in: query + required: false + schema: + $ref: '#/components/schemas/RunStatus' + description: Filter by run status. + - name: workflow_id + in: query + required: false + schema: + type: string + description: Filter to runs created from a specific stored workflow. 
+ - name: limit + in: query + required: false + schema: + type: integer + minimum: 1 + maximum: 1000 + default: 100 + - name: offset + in: query + required: false + schema: + type: integer + minimum: 0 + maximum: 1000000000 + default: 0 + responses: + '200': + description: Paginated list of runs (newest first) + content: + application/json: + schema: + $ref: '#/components/schemas/RunListResponse' + examples: + page: + value: + items: + - id: ea25c0ac-9c45-441c-a1ff-d2f4602a3b2e + status: failed + created_at_ms: 1778183970318 + updated_at_ms: 1778183975351 + limit: 100 + offset: 0 + next_offset: 1 + has_more: false + + /runs/{id}: + get: + tags: [Runs] + summary: Get run detail + operationId: getRun + parameters: + - $ref: '#/components/parameters/RunId' + responses: + '200': + description: Run with embedded steps + content: + application/json: + schema: + $ref: '#/components/schemas/RunDetail' + '404': + $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/InternalError' + + /runs/{id}/cancel: + post: + tags: [Runs] + summary: Cancel a run + operationId: cancelRun + parameters: + - $ref: '#/components/parameters/RunId' + responses: + '200': + description: Cancellation accepted + content: + application/json: + schema: + $ref: '#/components/schemas/RunStatusResponse' + '404': + $ref: '#/components/responses/NotFound' + '409': + $ref: '#/components/responses/Conflict' + '500': + $ref: '#/components/responses/InternalError' + + /runs/{id}/retry: + post: + tags: [Runs] + summary: Retry a failed run + description: Resets failed steps to ready and moves the run back to `running`. 
+ operationId: retryRun + parameters: + - $ref: '#/components/parameters/RunId' + responses: + '200': + description: Run moved back to running + content: + application/json: + schema: + $ref: '#/components/schemas/RunStatusResponse' + '404': + $ref: '#/components/responses/NotFound' + '409': + $ref: '#/components/responses/Conflict' + '500': + $ref: '#/components/responses/InternalError' + + /runs/{id}/steps: + get: + tags: [Steps] + summary: List steps in a run + operationId: listRunSteps + parameters: + - $ref: '#/components/parameters/RunId' + responses: + '200': + description: Array of steps + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/Step' + '404': + $ref: '#/components/responses/NotFound' + + /runs/{id}/steps/{step_id}: + get: + tags: [Steps] + summary: Get step detail + operationId: getStep + parameters: + - $ref: '#/components/parameters/RunId' + - $ref: '#/components/parameters/StepId' + responses: + '200': + description: Step detail + content: + application/json: + schema: + $ref: '#/components/schemas/Step' + '404': + $ref: '#/components/responses/NotFound' + + /runs/{id}/events: + get: + tags: [Events] + summary: List run events + description: | + Polling-mode event feed backed by persisted run events. + operationId: listRunEvents + parameters: + - $ref: '#/components/parameters/RunId' + responses: + '200': + description: Array of events ordered by id ascending + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/Event' + '500': + $ref: '#/components/responses/InternalError' + + /runs/{id}/stream: + get: + tags: [Events] + summary: Snapshot run stream events + description: | + Returns the current run status/state, persisted events, and buffered + in-memory stream events as a regular JSON response. Use `after_seq` + to page through buffered stream events without sharing cursor state + between consumers. 
+ operationId: streamRunEvents + parameters: + - $ref: '#/components/parameters/RunId' + - name: mode + in: query + required: false + schema: + type: string + example: values,debug + description: Comma-separated stream modes to include (`values`, `updates`, `tasks`, `debug`, `custom`). Defaults to all modes. + - name: after_seq + in: query + required: false + schema: + type: integer + format: int64 + minimum: 0 + default: 0 + description: Return buffered stream events after this sequence number. + responses: + '200': + description: Stream snapshot + content: + application/json: + schema: + $ref: '#/components/schemas/StreamSnapshot' + '404': + $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/InternalError' + + /runs/{id}/checkpoints: + get: + tags: [Checkpoints] + summary: List checkpoints for a run + operationId: listCheckpoints + parameters: + - $ref: '#/components/parameters/RunId' + responses: + '200': + description: Array of checkpoints + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/Checkpoint' + '404': + $ref: '#/components/responses/NotFound' + + /runs/{id}/checkpoints/{cp_id}: + get: + tags: [Checkpoints] + summary: Get checkpoint detail + operationId: getCheckpoint + parameters: + - $ref: '#/components/parameters/RunId' + - $ref: '#/components/parameters/CheckpointId' + responses: + '200': + description: Checkpoint with full state + content: + application/json: + schema: + $ref: '#/components/schemas/Checkpoint' + '404': + $ref: '#/components/responses/NotFound' + + /runs/{id}/resume: + post: + tags: [Runs] + summary: Resume an interrupted run + description: | + Resumes the latest checkpoint for a run currently in `interrupted` + status. Optional `state_updates` are applied before the run is moved + back to `running`. 
+ operationId: resumeRun + parameters: + - $ref: '#/components/parameters/RunId' + requestBody: + required: false + content: + application/json: + schema: + type: object + properties: + state_updates: + type: object + additionalProperties: true + description: Optional state values to merge before resuming. + responses: + '200': + description: Run resumed + content: + application/json: + schema: + $ref: '#/components/schemas/RunStatusResponse' + '404': + $ref: '#/components/responses/NotFound' + '409': + $ref: '#/components/responses/Conflict' + '500': + $ref: '#/components/responses/InternalError' + + /runs/fork: + post: + tags: [Runs] + summary: Fork a run from a checkpoint into a new run + description: | + Creates a new run that branches from the parent's checkpoint. + Useful for what-if exploration without disturbing the parent run. + operationId: forkRun + requestBody: + required: true + content: + application/json: + schema: + type: object + required: [checkpoint_id] + properties: + checkpoint_id: + type: string + description: Existing checkpoint to fork from; the parent run is inferred from this checkpoint. + state_overrides: + type: object + additionalProperties: true + responses: + '201': + description: Forked run created + content: + application/json: + schema: + $ref: '#/components/schemas/ForkRunResponse' + '400': + $ref: '#/components/responses/BadRequest' + '404': + $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/InternalError' + + /runs/{id}/state: + post: + tags: [Runs] + summary: Inject state into a running run + description: | + Schedules an injection of state updates into the run. Optionally + delays application until after a specified step completes. 
+ operationId: injectRunState + parameters: + - $ref: '#/components/parameters/RunId' + requestBody: + required: true + content: + application/json: + schema: + type: object + required: [updates] + properties: + updates: + type: object + additionalProperties: true + description: State key/value updates to merge. + apply_after_step: + type: string + description: Optional def_step_id — defer until that step finishes. + responses: + '200': + description: State update applied immediately or queued for a later step + content: + application/json: + schema: + $ref: '#/components/schemas/StateInjectionResponse' + '400': + $ref: '#/components/responses/BadRequest' + '404': + $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/InternalError' + + /runs/{id}/replay: + post: + tags: [Runs] + summary: Replay a run from a checkpoint + description: | + Resets the existing run back to a checkpoint, deletes later steps and + checkpoints, and moves the same run back to `running`. + operationId: replayRun + parameters: + - $ref: '#/components/parameters/RunId' + requestBody: + required: true + content: + application/json: + schema: + type: object + anyOf: + - required: [from_checkpoint_id] + - required: [checkpoint_id] + properties: + from_checkpoint_id: + type: string + description: Canonical checkpoint id to replay from. + checkpoint_id: + type: string + description: Backward-compatible alias for `from_checkpoint_id`. 
+ responses: + '200': + description: Run reset to checkpoint and moved to running + content: + application/json: + schema: + $ref: '#/components/schemas/ReplayRunResponse' + '400': + $ref: '#/components/responses/BadRequest' + '404': + $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/InternalError' + + /workflows: + post: + tags: [Workflows] + summary: Store a reusable workflow definition + operationId: createWorkflow + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/WorkflowCreate' + responses: + '201': + description: Workflow stored + content: + application/json: + schema: + $ref: '#/components/schemas/WorkflowCreated' + '400': + $ref: '#/components/responses/BadRequest' + '500': + $ref: '#/components/responses/InternalError' + get: + tags: [Workflows] + summary: List stored workflows + operationId: listWorkflows + responses: + '200': + description: Array of workflows including definitions + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/Workflow' + + /workflows/{id}: + get: + tags: [Workflows] + summary: Get workflow with full definition + operationId: getWorkflow + parameters: + - $ref: '#/components/parameters/WorkflowId' + responses: + '200': + description: Workflow with definition + content: + application/json: + schema: + $ref: '#/components/schemas/Workflow' + '404': + $ref: '#/components/responses/NotFound' + put: + tags: [Workflows] + summary: Update workflow definition + description: | + Replaces the workflow definition. If `version` is omitted, the stored + version is preserved. Existing runs already created from older versions + are unaffected because they hold a frozen snapshot of the workflow. 
+ operationId: updateWorkflow + parameters: + - $ref: '#/components/parameters/WorkflowId' + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/WorkflowUpdate' + responses: + '200': + description: Updated workflow + content: + application/json: + schema: + $ref: '#/components/schemas/OkResponse' + '400': + $ref: '#/components/responses/BadRequest' + '404': + $ref: '#/components/responses/NotFound' + delete: + tags: [Workflows] + summary: Delete a workflow + operationId: deleteWorkflow + parameters: + - $ref: '#/components/parameters/WorkflowId' + responses: + '200': + description: Deleted + content: + application/json: + schema: + $ref: '#/components/schemas/OkResponse' + '404': + $ref: '#/components/responses/NotFound' + + /workflows/{id}/validate: + post: + tags: [Workflows] + summary: Validate a stored workflow definition + description: | + Runs graph-based workflow validation without creating a run. Returns + a list of validation errors (empty array on success) and, when + generation succeeds, a Mermaid diagram. + operationId: validateWorkflow + parameters: + - $ref: '#/components/parameters/WorkflowId' + responses: + '200': + description: Validation result + content: + application/json: + schema: + $ref: '#/components/schemas/ValidationResult' + '404': + $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/InternalError' + + /workflows/{id}/mermaid: + get: + tags: [Workflows] + summary: Render workflow as Mermaid diagram + description: Returns a Mermaid `flowchart` string suitable for embedding in docs. 
+ operationId: getWorkflowMermaid + parameters: + - $ref: '#/components/parameters/WorkflowId' + responses: + '200': + description: Mermaid source + content: + text/plain: + schema: + type: string + example: | + flowchart TD + start --> plan + plan --> build + build --> review + '404': + $ref: '#/components/responses/NotFound' + + /workflows/{id}/run: + post: + tags: [Workflows] + summary: Launch a run from a stored workflow + description: | + Convenience over `POST /runs` when the graph workflow definition is + already stored. The current implementation accepts an optional + `input` object and does not implement idempotency for this endpoint. + operationId: runWorkflow + parameters: + - $ref: '#/components/parameters/WorkflowId' + requestBody: + required: false + content: + application/json: + schema: + type: object + properties: + input: + type: object + additionalProperties: true + responses: + '201': + description: Run created + content: + application/json: + schema: + $ref: '#/components/schemas/RunStatusResponse' + '400': + $ref: '#/components/responses/BadRequest' + '404': + $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/InternalError' + + /tracker/status: + get: + tags: [Tracker] + summary: NullTickets bridge status + description: | + Reports whether NullBoiler is connected to a NullTickets tracker + and basic queue health. Returns `404 tracker_disabled` when no + tracker is configured. 
+ operationId: getTrackerStatus + responses: + '200': + description: Tracker bridge status + content: + application/json: + schema: + $ref: '#/components/schemas/TrackerStatus' + '404': + $ref: '#/components/responses/TrackerDisabled' + '500': + $ref: '#/components/responses/InternalError' + + /tracker/tasks: + get: + tags: [Tracker] + summary: List active tracker tasks + operationId: listTrackerTasks + responses: + '200': + description: Array of tracker tasks + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/TrackerTask' + '404': + $ref: '#/components/responses/TrackerDisabled' + '500': + $ref: '#/components/responses/InternalError' + + /tracker/tasks/{task_id}: + get: + tags: [Tracker] + summary: Tracker task detail + operationId: getTrackerTask + parameters: + - name: task_id + in: path + required: true + schema: + type: string + responses: + '200': + description: Tracker task detail + content: + application/json: + schema: + $ref: '#/components/schemas/TrackerTask' + '404': + $ref: '#/components/responses/NotFound' + '500': + $ref: '#/components/responses/InternalError' + + /tracker/stats: + get: + tags: [Tracker] + summary: Aggregate tracker statistics + operationId: getTrackerStats + responses: + '200': + description: Aggregate counts by state + content: + application/json: + schema: + $ref: '#/components/schemas/TrackerStats' + '404': + $ref: '#/components/responses/TrackerDisabled' + '500': + $ref: '#/components/responses/InternalError' + + /tracker/refresh: + post: + tags: [Tracker] + summary: Force a tracker poll cycle + description: | + Acknowledges an operator refresh request. The current handler returns + success immediately. 
+ operationId: refreshTracker + responses: + '200': + description: Refresh request acknowledged + content: + application/json: + schema: + $ref: '#/components/schemas/TrackerRefreshResponse' + + /internal/agent-events/{run_id}/{step_id}: + post: + tags: [Internal] + summary: Worker callback — agent event ingest + description: | + Workers post agent execution events here (tool calls, model + results, status updates). Not for external consumers. + operationId: postAgentEvent + parameters: + - name: run_id + in: path + required: true + schema: + type: string + - name: step_id + in: path + required: true + schema: + type: string + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/AgentEventPayload' + responses: + '200': + description: Event accepted + content: + application/json: + schema: + type: object + properties: + ok: { type: boolean, enum: [true] } + '400': + $ref: '#/components/responses/BadRequest' + '404': + $ref: '#/components/responses/NotFound' + +components: + securitySchemes: + bearerAuth: + type: http + scheme: bearer + bearerFormat: token + description: | + Configured via `--api-token` CLI flag or `NULLBOILER_API_TOKEN` + environment variable. Health and metrics endpoints are unauthenticated. + + parameters: + RunId: + name: id + in: path + required: true + schema: + type: string + description: Run identifier returned from `POST /runs`. 
+    StepId:
+      name: step_id
+      in: path
+      required: true
+      schema:
+        type: string
+    CheckpointId:
+      name: cp_id
+      in: path
+      required: true
+      schema:
+        type: string
+    WorkerId:
+      name: id
+      in: path
+      required: true
+      schema:
+        type: string
+    WorkflowId:
+      name: id
+      in: path
+      required: true
+      schema:
+        type: string
+
+  # Shared error responses. Every body below uses the ErrorResponse envelope.
+  responses:
+    BadRequest:
+      description: Validation error
+      content:
+        application/json:
+          schema:
+            $ref: '#/components/schemas/ErrorResponse'
+          examples:
+            invalid_json:
+              value:
+                error:
+                  code: bad_request
+                  message: invalid JSON body
+            missing_field:
+              value:
+                error:
+                  code: bad_request
+                  message: 'missing required field: steps'
+    NotFound:
+      description: Resource not found
+      content:
+        application/json:
+          schema:
+            $ref: '#/components/schemas/ErrorResponse'
+          examples:
+            not_found:
+              value:
+                error:
+                  code: not_found
+                  message: run not found
+    Conflict:
+      description: Request conflicts with current resource state
+      content:
+        application/json:
+          schema:
+            $ref: '#/components/schemas/ErrorResponse'
+          examples:
+            conflict:
+              value:
+                error:
+                  code: conflict
+                  message: run is not failed
+    TrackerDisabled:
+      description: Pull-mode tracker is not configured
+      content:
+        application/json:
+          schema:
+            $ref: '#/components/schemas/ErrorResponse'
+          examples:
+            disabled:
+              value:
+                error:
+                  code: tracker_disabled
+                  message: pull-mode tracker is not configured
+    InternalError:
+      description: Internal server error
+      content:
+        application/json:
+          schema:
+            $ref: '#/components/schemas/ErrorResponse'
+          examples:
+            oom:
+              value:
+                error:
+                  code: internal
+                  message: out of memory
+
+  # Fields that may be null list 'null' in `type`. OpenAPI 3.1 schemas follow
+  # JSON Schema 2020-12, which has no `nullable` keyword (it would be ignored).
+  schemas:
+    HealthResponse:
+      type: object
+      required: [status, version, active_runs, total_workers]
+      properties:
+        status:
+          type: string
+          enum: [ok]
+        version:
+          type: string
+          example: "2026.3.2"
+        active_runs:
+          type: integer
+          format: int64
+          minimum: 0
+        total_workers:
+          type: integer
+          format: int64
+          minimum: 0
+
+    # Envelope used by every error response: { "error": { code, message } }.
+    ErrorResponse:
+      type: object
+      required: [error]
+      properties:
+        error:
+          $ref: '#/components/schemas/ErrorDetail'
+
+    ErrorDetail:
+      type: object
+      required: [code, message]
+      properties:
+        code:
+          type: string
+          enum:
+            - bad_request
+            - conflict
+            - not_found
+            - internal
+            - draining
+            - unauthorized
+            - tracker_disabled
+        message:
+          type: string
+
+    RunStatus:
+      type: string
+      enum: [pending, running, interrupted, completed, failed, cancelled, forked]
+
+    StepStatus:
+      type: string
+      enum: [pending, ready, running, completed, failed, skipped, interrupted]
+
+    StepType:
+      type: string
+      enum: [task, route, interrupt, agent, send, transform, subgraph]
+
+    WorkerStatus:
+      type: string
+      enum: [active, draining, dead]
+
+    WorkerSource:
+      type: string
+      enum: [config, registered]
+
+    WorkerProtocol:
+      type: string
+      enum: [webhook, api_chat, openai_chat, mqtt, redis_stream, a2a]
+      description: |
+        - `webhook` — requires URL with explicit path (e.g. `/webhook`)
+        - `api_chat` / `openai_chat` — OpenAI-compatible chat completion
+        - `mqtt` — pub/sub MQTT broker
+        - `redis_stream` — Redis Streams
+        - `a2a` — Google Agent-to-Agent protocol
+
+    TrackerTaskState:
+      type: string
+      enum:
+        - claiming
+        - workspace_setup
+        - spawning
+        - running
+        - completing
+        - completed
+        - failed
+        - stalled
+        - cooldown
+        - removing
+
+    ReducerType:
+      type: string
+      enum: [last_value, append, merge, add, min, max, add_messages]
+
+    Worker:
+      type: object
+      required: [id, url, protocol, model, tags, max_concurrent, source, status, consecutive_failures, created_at_ms]
+      properties:
+        id: { type: string }
+        url: { type: string, format: uri }
+        protocol: { $ref: '#/components/schemas/WorkerProtocol' }
+        # Required but may be null.
+        model:
+          type: [string, 'null']
+        tags:
+          type: array
+          items:
+            type: string
+        max_concurrent:
+          type: integer
+          format: int64
+          minimum: 1
+        source: { $ref: '#/components/schemas/WorkerSource' }
+        status: { $ref: '#/components/schemas/WorkerStatus' }
+        consecutive_failures:
+          type: integer
+          format: int64
+          minimum: 0
+        circuit_open_until_ms:
+          type: [integer, 'null']
+          format: int64
+        last_error_text:
+          type: [string, 'null']
+        created_at_ms:
+          type: integer
+          format: int64
+
+    WorkerRegistration:
+      type: object
+      required: [id, url]
+      properties:
+        id:
+          type: string
+          minLength: 1
+          example: claw-1
+        url:
+          type: string
+          format: uri
+          description: Must include explicit path for `webhook` protocol (e.g. `http://nullclaw:3000/webhook`).
+          example: http://nullclaw:3000/webhook
+        token:
+          type: string
+          default: ""
+        protocol:
+          allOf:
+            - $ref: '#/components/schemas/WorkerProtocol'
+          default: webhook
+        model:
+          type: string
+          description: Required when `protocol` is `openai_chat`.
+        tags:
+          type: array
+          items:
+            type: string
+          default: []
+        max_concurrent:
+          type: integer
+          minimum: 1
+          default: 1
+
+    RunSummary:
+      type: object
+      description: |
+        Condensed run shape returned in list responses. Optional
+        `idempotency_key` and `workflow_id` fields are present only when
+        non-null on the row.
+      required: [id, status, created_at_ms, updated_at_ms]
+      properties:
+        id: { type: string }
+        status: { $ref: '#/components/schemas/RunStatus' }
+        idempotency_key:
+          type: string
+          description: Present only when set on the run.
+        workflow_id:
+          type: string
+          description: Present only when the run was launched from a stored workflow.
+        created_at_ms: { type: integer, format: int64 }
+        updated_at_ms: { type: integer, format: int64 }
+
+    # Offset-paginated list envelope for runs.
+    RunListResponse:
+      type: object
+      required: [items, limit, offset, next_offset, has_more]
+      properties:
+        items:
+          type: array
+          items:
+            $ref: '#/components/schemas/RunSummary'
+        limit:
+          type: integer
+          format: int64
+          minimum: 1
+          maximum: 1000
+          description: Page size echoed back.
+        offset:
+          type: integer
+          format: int64
+          minimum: 0
+          description: Page offset echoed back.
+        next_offset:
+          type: integer
+          format: int64
+          description: |
+            Pass as `offset` to the next request. Equals
+            `offset + items.length` on the last page.
+        has_more:
+          type: boolean
+          description: |
+            True when at least one more page exists. Computed by
+            fetching `limit + 1` rows and checking the overflow.
+
+    RunDetail:
+      type: object
+      required:
+        - id
+        - status
+        - created_at_ms
+        - updated_at_ms
+        - checkpoint_count
+        - total_input_tokens
+        - total_output_tokens
+        - total_tokens
+        - steps
+      properties:
+        id: { type: string }
+        status: { $ref: '#/components/schemas/RunStatus' }
+        idempotency_key:
+          type: string
+          description: Present only when the run was created with an idempotency key.
+        workflow_id:
+          type: string
+          description: Present only when the run was launched from a stored workflow.
+        error_text:
+          type: string
+          description: Present only when the run has an error.
+        started_at_ms: { type: integer, format: int64 }
+        ended_at_ms: { type: integer, format: int64 }
+        state_json:
+          type: object
+          additionalProperties: true
+          description: Present only when persisted state exists for the run.
+        checkpoint_count: { type: integer, format: int64, minimum: 0 }
+        total_input_tokens: { type: integer, format: int64, minimum: 0 }
+        total_output_tokens: { type: integer, format: int64, minimum: 0 }
+        total_tokens: { type: integer, format: int64, minimum: 0 }
+        created_at_ms: { type: integer, format: int64 }
+        updated_at_ms: { type: integer, format: int64 }
+        steps:
+          type: array
+          items:
+            $ref: '#/components/schemas/Step'
+
+    # Request body for creating a run from the legacy step-array format.
+    RunCreate:
+      type: object
+      required: [steps]
+      properties:
+        steps:
+          type: array
+          minItems: 1
+          items:
+            $ref: '#/components/schemas/StepDefinition'
+        strategy:
+          type: string
+          description: Name of a registered strategy (e.g. `sequential`, `parallel`).
+        idempotency_key:
+          type: string
+          minLength: 1
+        input:
+          type: object
+          additionalProperties: true
+        callbacks:
+          type: object
+          additionalProperties: true
+        config:
+          type: object
+          additionalProperties: true
+          description: Run-scoped config (`tracker_url`, `tracker_api_token`, etc.).
+        reduce:
+          type: object
+          description: Optional reducer step appended after parallel strategy.
+          properties:
+            id:
+              type: string
+              default: __reduce
+            prompt_template:
+              type: string
+
+    RunCreated:
+      type: object
+      required: [id, status, idempotent_replay]
+      properties:
+        id: { type: string }
+        status: { $ref: '#/components/schemas/RunStatus' }
+        idempotent_replay:
+          type: boolean
+
+    RunStatusResponse:
+      type: object
+      required: [id, status]
+      properties:
+        id: { type: string }
+        status: { $ref: '#/components/schemas/RunStatus' }
+
+    ForkRunResponse:
+      type: object
+      required: [id, status, forked_from_checkpoint]
+      properties:
+        id: { type: string }
+        status: { $ref: '#/components/schemas/RunStatus' }
+        forked_from_checkpoint: { type: string }
+
+    ReplayRunResponse:
+      type: object
+      required: [id, status, replayed_from_checkpoint]
+      properties:
+        id: { type: string }
+        status: { $ref: '#/components/schemas/RunStatus' }
+        replayed_from_checkpoint: { type: string }
+
+    StateInjectionResponse:
+      type: object
+      required: [applied]
+      properties:
+        applied:
+          type: boolean
+          description: True when updates were applied immediately.
+        pending:
+          type: boolean
+          description: Present and true when updates were queued until `apply_after_step`.
+
+    # Recursive: `steps` nests further StepDefinition objects.
+    StepDefinition:
+      type: object
+      required: [id]
+      properties:
+        id:
+          type: string
+          description: Definition step id (unique within the workflow).
+        type:
+          allOf:
+            - $ref: '#/components/schemas/StepType'
+          default: task
+        depends_on:
+          type: array
+          items:
+            type: string
+          description: Definition step ids this step depends on.
+        retry:
+          type: object
+          properties:
+            max_attempts:
+              type: integer
+              minimum: 1
+              default: 1
+        # NOTE(review): placed at step level (sibling of `retry`); confirm it
+        # is not meant to be nested under `retry.properties`.
+        timeout_ms:
+          type: integer
+          format: int64
+          minimum: 1
+        prompt_template:
+          type: string
+        worker:
+          type: string
+          description: Constrain dispatch to a specific worker id.
+        tags:
+          type: array
+          items: { type: string }
+          description: Worker-tag selector.
+        output_key:
+          type: string
+        output_mapping:
+          type: object
+          additionalProperties: true
+        items_key:
+          type: string
+          description: For `send` (fan-out) steps — state key holding the items array.
+        steps:
+          type: array
+          items:
+            $ref: '#/components/schemas/StepDefinition'
+          description: For `subgraph` or strategy-nested steps.
+      additionalProperties: true
+
+    # Runtime step row; optional fields may also be null.
+    Step:
+      type: object
+      required:
+        - id
+        - run_id
+        - def_step_id
+        - type
+        - status
+        - attempt
+        - max_attempts
+        - created_at_ms
+        - updated_at_ms
+        - iteration_index
+      properties:
+        id: { type: string }
+        run_id: { type: string }
+        def_step_id: { type: string }
+        type: { $ref: '#/components/schemas/StepType' }
+        status: { $ref: '#/components/schemas/StepStatus' }
+        worker_id:
+          type: [string, 'null']
+        output_json:
+          type: [object, 'null']
+          additionalProperties: true
+          description: Present only after a step has output.
+        error_text:
+          type: [string, 'null']
+        attempt: { type: integer, format: int64, minimum: 1 }
+        max_attempts: { type: integer, format: int64, minimum: 1 }
+        timeout_ms:
+          type: [integer, 'null']
+          format: int64
+        next_attempt_at_ms:
+          type: [integer, 'null']
+          format: int64
+        parent_step_id:
+          type: [string, 'null']
+        item_index:
+          type: [integer, 'null']
+          format: int64
+        created_at_ms: { type: integer, format: int64 }
+        updated_at_ms: { type: integer, format: int64 }
+        started_at_ms:
+          type: [integer, 'null']
+          format: int64
+        ended_at_ms:
+          type: [integer, 'null']
+          format: int64
+        child_run_id:
+          type: [string, 'null']
+        iteration_index: { type: integer, format: int64 }
+
+    Event:
+      type: object
+      required: [id, run_id, kind, ts_ms]
+      properties:
+        id: { type: integer, format: int64 }
+        run_id: { type: string }
+        step_id:
+          type: [string, 'null']
+        kind:
+          type: string
+          description: Event kind (e.g. `step.started`, `step.completed`, `run.completed`).
+        data:
+          type: object
+          additionalProperties: true
+        ts_ms: { type: integer, format: int64 }
+
+    StreamEvent:
+      type: object
+      required: [seq, event, mode, data]
+      properties:
+        seq: { type: integer, format: int64, minimum: 1 }
+        event: { type: string }
+        mode:
+          type: string
+          enum: [values, updates, tasks, debug, custom]
+        data:
+          type: object
+          additionalProperties: true
+
+    StreamPersistedEvent:
+      type: object
+      required: [kind, data, ts_ms]
+      properties:
+        kind: { type: string }
+        data:
+          type: object
+          additionalProperties: true
+        ts_ms: { type: integer, format: int64 }
+
+    # Snapshot body: persisted events plus sequenced stream events.
+    StreamSnapshot:
+      type: object
+      required: [status, events, stream_events, next_stream_seq, stream_oldest_seq, stream_gap]
+      properties:
+        status: { $ref: '#/components/schemas/RunStatus' }
+        state:
+          type: object
+          additionalProperties: true
+          description: Present only when persisted state exists for the run.
+        events:
+          type: array
+          items:
+            $ref: '#/components/schemas/StreamPersistedEvent'
+        stream_events:
+          type: array
+          items:
+            $ref: '#/components/schemas/StreamEvent'
+        next_stream_seq: { type: integer, format: int64, minimum: 0 }
+        stream_oldest_seq: { type: integer, format: int64, minimum: 0 }
+        stream_gap:
+          type: boolean
+          description: True when requested events are older than the retained buffer.
+ + Checkpoint: + type: object + required: [id, run_id, step_id, version, created_at_ms] + properties: + id: { type: string } + run_id: { type: string } + step_id: { type: string } + parent_id: + type: string + nullable: true + state: + type: object + additionalProperties: true + completed_nodes: + type: array + items: + type: string + version: { type: integer, format: int64 } + metadata: + type: object + additionalProperties: true + nullable: true + created_at_ms: { type: integer, format: int64 } + + Workflow: + type: object + required: [id, name, definition, version, created_at_ms, updated_at_ms] + properties: + id: { type: string } + name: { type: string } + definition: + $ref: '#/components/schemas/WorkflowDefinition' + version: { type: integer, format: int64, minimum: 1 } + created_at_ms: { type: integer, format: int64 } + updated_at_ms: { type: integer, format: int64 } + + WorkflowCreated: + type: object + required: [id, name, version] + properties: + id: { type: string } + name: { type: string } + version: { type: integer, format: int64 } + + OkResponse: + type: object + required: [ok] + properties: + ok: { type: boolean, enum: [true] } + + WorkflowDefinition: + type: object + description: Graph workflow definition consumed by the engine. + required: [nodes, edges] + properties: + nodes: + type: object + additionalProperties: + type: object + additionalProperties: true + description: Map of node name to node configuration. + edges: + type: array + items: + type: array + minItems: 2 + maxItems: 2 + items: + type: string + description: Directed edges such as `["__start__", "node"]` or `["router:yes", "next"]`. + state_schema: + type: object + additionalProperties: + type: object + additionalProperties: true + interrupt_before: + type: array + items: { type: string } + interrupt_after: + type: array + items: { type: string } + additionalProperties: true + + WorkflowCreate: + type: object + properties: + id: + type: string + minLength: 1 + description: Optional. 
Generated when omitted. + name: + type: string + default: untitled + version: + type: integer + format: int64 + minimum: 1 + default: 1 + definition_json: + $ref: '#/components/schemas/WorkflowDefinition' + additionalProperties: true + + WorkflowUpdate: + type: object + properties: + name: + type: string + default: untitled + version: + type: integer + format: int64 + minimum: 1 + description: Optional explicit version. When omitted, the existing version is preserved. + definition_json: + $ref: '#/components/schemas/WorkflowDefinition' + additionalProperties: true + + ValidationResult: + type: object + required: [valid, errors] + properties: + valid: + type: boolean + errors: + type: array + items: + type: object + required: [type, message] + properties: + type: + type: string + node: + type: string + key: + type: string + message: + type: string + mermaid: + type: string + description: Present when Mermaid generation succeeds. + + RateLimitEntry: + type: object + required: [worker_id, remaining, limit, reset_ms, updated_at_ms] + properties: + worker_id: { type: string } + remaining: { type: integer, format: int64, minimum: 0 } + limit: { type: integer, format: int64, minimum: 0 } + reset_ms: { type: integer, format: int64 } + updated_at_ms: { type: integer, format: int64 } + + TrackerStatus: + type: object + required: [mode, running_count, max_concurrent, completed_count, failed_count, poll_interval_ms, running] + properties: + mode: + type: string + enum: [pull] + running_count: { type: integer, format: int64, minimum: 0 } + max_concurrent: { type: integer, format: int64, minimum: 1 } + completed_count: { type: integer, format: int64, minimum: 0 } + failed_count: { type: integer, format: int64, minimum: 0 } + poll_interval_ms: { type: integer, format: int64, minimum: 1 } + running: + type: array + items: + $ref: '#/components/schemas/TrackerTask' + + TrackerTask: + type: object + required: + - task_id + - task_title + - pipeline_id + - agent_role + - execution + 
- current_turn + - max_turns + - started_at_ms + - last_activity_ms + - state + properties: + task_id: { type: string } + task_title: { type: string } + pipeline_id: { type: string } + agent_role: { type: string } + execution: + type: string + enum: [subprocess, dispatch] + current_turn: { type: integer, format: int64, minimum: 0 } + max_turns: { type: integer, format: int64, minimum: 1 } + started_at_ms: { type: integer, format: int64 } + last_activity_ms: { type: integer, format: int64 } + state: { $ref: '#/components/schemas/TrackerTaskState' } + + TrackerStats: + type: object + required: [running, completed, failed, total, max_concurrent] + properties: + running: { type: integer, format: int64, minimum: 0 } + completed: { type: integer, format: int64, minimum: 0 } + failed: { type: integer, format: int64, minimum: 0 } + total: { type: integer, format: int64, minimum: 0 } + max_concurrent: { type: integer, format: int64, minimum: 1 } + + TrackerRefreshResponse: + type: object + required: [status, message] + properties: + status: + type: string + enum: [ok] + message: + type: string + + AgentEventPayload: + type: object + properties: + iteration: + type: integer + format: int64 + minimum: 0 + tool: + type: string + nullable: true + args: + type: object + additionalProperties: true + nullable: true + result: + type: string + nullable: true + status: + type: string + default: running diff --git a/src/api.zig b/src/api.zig index 3ddc132..b5ee09e 100644 --- a/src/api.zig +++ b/src/api.zig @@ -232,7 +232,7 @@ pub fn handleRequest(ctx: *Context, method: []const u8, target: []const u8, body return handleInjectState(ctx, seg1.?, body); } - // ── SSE stream endpoint ───────────────────────────────────────── + // ── Stream snapshot endpoint ──────────────────────────────────── // GET /runs/{id}/stream if (is_get and eql(seg0, "runs") and seg1 != null and eql(seg2, "stream") and seg3 == null) { @@ -1506,9 +1506,10 @@ fn handleForkRun(ctx: *Context, body: []const u8) 
HttpResponse { }; const run_id_json = jsonQuoted(ctx.allocator, new_run_id) catch return jsonResponse(500, "{\"error\":{\"code\":\"internal\",\"message\":\"out of memory\"}}"); + const checkpoint_id_json = jsonQuoted(ctx.allocator, checkpoint_id) catch return jsonResponse(500, "{\"error\":{\"code\":\"internal\",\"message\":\"out of memory\"}}"); const resp = std.fmt.allocPrint(ctx.allocator, \\{{"id":{s},"status":"running","forked_from_checkpoint":{s}}} - , .{ run_id_json, checkpoint_id }) catch return jsonResponse(500, "{\"error\":{\"code\":\"internal\",\"message\":\"out of memory\"}}"); + , .{ run_id_json, checkpoint_id_json }) catch return jsonResponse(500, "{\"error\":{\"code\":\"internal\",\"message\":\"out of memory\"}}"); return jsonResponse(201, resp); } @@ -1630,12 +1631,12 @@ fn handleInjectState(ctx: *Context, run_id: []const u8, body: []const u8) HttpRe } } -// ── SSE Stream Handler ────────────────────────────────────────────── +// ── Stream Snapshot Handler ───────────────────────────────────────── fn handleStream(ctx: *Context, run_id: []const u8, target: []const u8) HttpResponse { // For now, return the current state and events as a regular JSON response. - // Full SSE streaming with held-open connections will be implemented - // when the threading model is wired in main.zig (Task 12). + // Held-open SSE is handled internally by the hub; the HTTP endpoint + // returns snapshots so independent consumers can use after_seq cursors. // // Supports ?mode=values,tasks,debug,updates,custom query param to filter // which streaming modes the client wants. Default: all modes. 
@@ -2667,6 +2668,30 @@ test "API: list runs supports workflow_id filter" { try std.testing.expect(std.mem.indexOf(u8, resp.body, "\"workflow_id\":\"wf_2\"") == null); } +test "API: fork run returns quoted checkpoint id" { + const allocator = std.testing.allocator; + var store = try Store.init(allocator, ":memory:"); + defer store.deinit(); + + var arena = std.heap.ArenaAllocator.init(allocator); + defer arena.deinit(); + + try store.createRunWithState("r1", null, "{\"nodes\":{}}", "{}", "{\"x\":1}"); + try store.createCheckpoint("cp-one", "r1", "step_a", null, "{\"x\":1}", "[\"step_a\"]", 1, null); + + var ctx = Context{ + .store = &store, + .allocator = arena.allocator(), + }; + + const resp = handleRequest(&ctx, "POST", "/runs/fork", "{\"checkpoint_id\":\"cp-one\"}"); + try std.testing.expectEqual(@as(u16, 201), resp.status_code); + + const parsed = try std.json.parseFromSlice(std.json.Value, allocator, resp.body, .{}); + defer parsed.deinit(); + try std.testing.expectEqualStrings("cp-one", parsed.value.object.get("forked_from_checkpoint").?.string); +} + test "API: replay run from checkpoint" { const allocator = std.testing.allocator; var store = try Store.init(allocator, ":memory:");